// Copyright 2015 Google Inc. All rights reserved // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package kati import ( "container/heap" "errors" "fmt" "os" "os/exec" "syscall" "time" "github.com/golang/glog" ) var ( errNothingDone = errors.New("nothing done") ) type job struct { n *DepNode ex *Executor parents []*job outputTs int64 numDeps int depsTs int64 id int runners []runner } type jobResult struct { j *job w *worker err error } type newDep struct { j *job neededBy *job } type worker struct { wm *workerManager jobChan chan *job waitChan chan bool doneChan chan bool } type jobQueue []*job func (jq jobQueue) Len() int { return len(jq) } func (jq jobQueue) Swap(i, j int) { jq[i], jq[j] = jq[j], jq[i] } func (jq jobQueue) Less(i, j int) bool { // First come, first serve, for GNU make compatibility. return jq[i].id < jq[j].id } func (jq *jobQueue) Push(x interface{}) { item := x.(*job) *jq = append(*jq, item) } func (jq *jobQueue) Pop() interface{} { old := *jq n := len(old) item := old[n-1] *jq = old[0 : n-1] return item } func newWorker(wm *workerManager) *worker { w := &worker{ wm: wm, jobChan: make(chan *job), waitChan: make(chan bool), doneChan: make(chan bool), } return w } func (w *worker) Run() { done := false for !done { select { case j := <-w.jobChan: err := j.build() w.wm.ReportResult(w, j, err) case done = <-w.waitChan: } } w.doneChan <- true } func (w *worker) PostJob(j *job) { w.jobChan <- j } func (w *worker) Wait() { w.waitChan <- true <-w.doneChan } func (j *job) createRunners() ([]runner, error) { runners, _, err := createRunners(j.ex.ctx, j.n) return runners, err } // TODO(ukai): use time.Time? func getTimestamp(filename string) int64 { st, err := os.Stat(filename) if err != nil { return -2 } return st.ModTime().Unix() } func (j *job) build() error { if j.n.IsPhony { j.outputTs = -2 // trigger cmd even if all inputs don't exist. } else { j.outputTs = getTimestamp(j.n.Output) } if !j.n.HasRule { if j.outputTs >= 0 || j.n.IsPhony { return errNothingDone } if len(j.parents) == 0 { return fmt.Errorf("*** No rule to make target %q.", j.n.Output) } return fmt.Errorf("*** No rule to make target %q, needed by %q.", j.n.Output, j.parents[0].n.Output) } if j.outputTs >= j.depsTs { // TODO: stats. return errNothingDone } rr, err := j.createRunners() if err != nil { return err } if len(rr) == 0 { return errNothingDone } for _, r := range rr { err := r.run(j.n.Output) glog.Warningf("cmd result for %q: %v", j.n.Output, err) if err != nil { exit := exitStatus(err) return fmt.Errorf("*** [%s] Error %d", j.n.Output, exit) } } if j.n.IsPhony { j.outputTs = time.Now().Unix() } else { j.outputTs = getTimestamp(j.n.Output) if j.outputTs < 0 { j.outputTs = time.Now().Unix() } } return nil } func (wm *workerManager) handleJobs() error { for { if len(wm.freeWorkers) == 0 { return nil } if wm.readyQueue.Len() == 0 { return nil } j := heap.Pop(&wm.readyQueue).(*job) glog.V(1).Infof("run: %s", j.n.Output) j.numDeps = -1 // Do not let other workers pick this. w := wm.freeWorkers[0] wm.freeWorkers = wm.freeWorkers[1:] wm.busyWorkers[w] = true w.jobChan <- j } } func (wm *workerManager) updateParents(j *job) { for _, p := range j.parents { p.numDeps-- glog.V(1).Infof("child: %s (%d)", p.n.Output, p.numDeps) if p.depsTs < j.outputTs { p.depsTs = j.outputTs } wm.maybePushToReadyQueue(p) } } type workerManager struct { maxJobs int jobs []*job readyQueue jobQueue jobChan chan *job resultChan chan jobResult newDepChan chan newDep stopChan chan bool waitChan chan bool doneChan chan error freeWorkers []*worker busyWorkers map[*worker]bool ex *Executor runnings map[string]*job finishCnt int skipCnt int } func newWorkerManager(numJobs int) (*workerManager, error) { wm := &workerManager{ maxJobs: numJobs, jobChan: make(chan *job), resultChan: make(chan jobResult), newDepChan: make(chan newDep), stopChan: make(chan bool), waitChan: make(chan bool), doneChan: make(chan error), busyWorkers: make(map[*worker]bool), } wm.busyWorkers = make(map[*worker]bool) for i := 0; i < numJobs; i++ { w := newWorker(wm) wm.freeWorkers = append(wm.freeWorkers, w) go w.Run() } heap.Init(&wm.readyQueue) go wm.Run() return wm, nil } func exitStatus(err error) int { if err == nil { return 0 } exit := 1 if err, ok := err.(*exec.ExitError); ok { if w, ok := err.ProcessState.Sys().(syscall.WaitStatus); ok { return w.ExitStatus() } } return exit } func (wm *workerManager) hasTodo() bool { return wm.finishCnt != len(wm.jobs) } func (wm *workerManager) maybePushToReadyQueue(j *job) { if j.numDeps != 0 { return } heap.Push(&wm.readyQueue, j) glog.V(1).Infof("ready: %s", j.n.Output) } func (wm *workerManager) handleNewDep(j *job, neededBy *job) { if j.numDeps < 0 { neededBy.numDeps-- if neededBy.id > 0 { panic("FIXME: already in WM... can this happen?") } } else { j.parents = append(j.parents, neededBy) } } func (wm *workerManager) Run() { done := false var err error Loop: for wm.hasTodo() || len(wm.busyWorkers) > 0 || len(wm.runnings) > 0 || !done { select { case j := <-wm.jobChan: glog.V(1).Infof("wait: %s (%d)", j.n.Output, j.numDeps) j.id = len(wm.jobs) + 1 wm.jobs = append(wm.jobs, j) wm.maybePushToReadyQueue(j) case jr := <-wm.resultChan: glog.V(1).Infof("done: %s", jr.j.n.Output) delete(wm.busyWorkers, jr.w) wm.freeWorkers = append(wm.freeWorkers, jr.w) wm.updateParents(jr.j) wm.finishCnt++ if jr.err == errNothingDone { wm.skipCnt++ jr.err = nil } if jr.err != nil { err = jr.err close(wm.stopChan) break Loop } case af := <-wm.newDepChan: wm.handleNewDep(af.j, af.neededBy) glog.V(1).Infof("dep: %s (%d) %s", af.neededBy.n.Output, af.neededBy.numDeps, af.j.n.Output) case done = <-wm.waitChan: } err = wm.handleJobs() if err != nil { break Loop } glog.V(1).Infof("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers)) } if !done { <-wm.waitChan } for _, w := range wm.freeWorkers { w.Wait() } for w := range wm.busyWorkers { w.Wait() } wm.doneChan <- err } func (wm *workerManager) PostJob(j *job) error { select { case wm.jobChan <- j: return nil case <-wm.stopChan: return errors.New("worker manager stopped") } } func (wm *workerManager) ReportResult(w *worker, j *job, err error) { select { case wm.resultChan <- jobResult{w: w, j: j, err: err}: case <-wm.stopChan: } } func (wm *workerManager) ReportNewDep(j *job, neededBy *job) { select { case wm.newDepChan <- newDep{j: j, neededBy: neededBy}: case <-wm.stopChan: } } func (wm *workerManager) Wait() (int, error) { wm.waitChan <- true err := <-wm.doneChan glog.V(2).Infof("finish %d skip %d", wm.finishCnt, wm.skipCnt) return wm.finishCnt - wm.skipCnt, err }