diff options
Diffstat (limited to 'golang/kati/worker.go')
| -rw-r--r-- | golang/kati/worker.go | 368 |
1 files changed, 368 insertions, 0 deletions
diff --git a/golang/kati/worker.go b/golang/kati/worker.go new file mode 100644 index 0000000..a339d23 --- /dev/null +++ b/golang/kati/worker.go @@ -0,0 +1,368 @@ +// Copyright 2015 Google Inc. All rights reserved +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kati + +import ( + "container/heap" + "errors" + "fmt" + "os" + "os/exec" + "syscall" + "time" + + "github.com/golang/glog" +) + +var ( + errNothingDone = errors.New("nothing done") +) + +type job struct { + n *DepNode + ex *Executor + parents []*job + outputTs int64 + numDeps int + depsTs int64 + id int + + runners []runner +} + +type jobResult struct { + j *job + w *worker + err error +} + +type newDep struct { + j *job + neededBy *job +} + +type worker struct { + wm *workerManager + jobChan chan *job + waitChan chan bool + doneChan chan bool +} + +type jobQueue []*job + +func (jq jobQueue) Len() int { return len(jq) } +func (jq jobQueue) Swap(i, j int) { jq[i], jq[j] = jq[j], jq[i] } + +func (jq jobQueue) Less(i, j int) bool { + // First come, first serve, for GNU make compatibility. + return jq[i].id < jq[j].id +} + +func (jq *jobQueue) Push(x interface{}) { + item := x.(*job) + *jq = append(*jq, item) +} + +func (jq *jobQueue) Pop() interface{} { + old := *jq + n := len(old) + item := old[n-1] + *jq = old[0 : n-1] + return item +} + +func newWorker(wm *workerManager) *worker { + w := &worker{ + wm: wm, + jobChan: make(chan *job), + waitChan: make(chan bool), + doneChan: make(chan bool), + } + return w +} + +func (w *worker) Run() { + done := false + for !done { + select { + case j := <-w.jobChan: + err := j.build() + w.wm.ReportResult(w, j, err) + case done = <-w.waitChan: + } + } + w.doneChan <- true +} + +func (w *worker) PostJob(j *job) { + w.jobChan <- j +} + +func (w *worker) Wait() { + w.waitChan <- true + <-w.doneChan +} + +func (j *job) createRunners() ([]runner, error) { + runners, _, err := createRunners(j.ex.ctx, j.n) + return runners, err +} + +// TODO(ukai): use time.Time? +func getTimestamp(filename string) int64 { + st, err := os.Stat(filename) + if err != nil { + return -2 + } + return st.ModTime().Unix() +} + +func (j *job) build() error { + if j.n.IsPhony { + j.outputTs = -2 // trigger cmd even if all inputs don't exist. + } else { + j.outputTs = getTimestamp(j.n.Output) + } + + if !j.n.HasRule { + if j.outputTs >= 0 || j.n.IsPhony { + return errNothingDone + } + if len(j.parents) == 0 { + return fmt.Errorf("*** No rule to make target %q.", j.n.Output) + } + return fmt.Errorf("*** No rule to make target %q, needed by %q.", j.n.Output, j.parents[0].n.Output) + } + + if j.outputTs >= j.depsTs { + // TODO: stats. + return errNothingDone + } + + rr, err := j.createRunners() + if err != nil { + return err + } + if len(rr) == 0 { + return errNothingDone + } + for _, r := range rr { + err := r.run(j.n.Output) + glog.Warningf("cmd result for %q: %v", j.n.Output, err) + if err != nil { + exit := exitStatus(err) + return fmt.Errorf("*** [%s] Error %d", j.n.Output, exit) + } + } + + if j.n.IsPhony { + j.outputTs = time.Now().Unix() + } else { + j.outputTs = getTimestamp(j.n.Output) + if j.outputTs < 0 { + j.outputTs = time.Now().Unix() + } + } + return nil +} + +func (wm *workerManager) handleJobs() error { + for { + if len(wm.freeWorkers) == 0 { + return nil + } + if wm.readyQueue.Len() == 0 { + return nil + } + j := heap.Pop(&wm.readyQueue).(*job) + glog.V(1).Infof("run: %s", j.n.Output) + + j.numDeps = -1 // Do not let other workers pick this. + w := wm.freeWorkers[0] + wm.freeWorkers = wm.freeWorkers[1:] + wm.busyWorkers[w] = true + w.jobChan <- j + } +} + +func (wm *workerManager) updateParents(j *job) { + for _, p := range j.parents { + p.numDeps-- + glog.V(1).Infof("child: %s (%d)", p.n.Output, p.numDeps) + if p.depsTs < j.outputTs { + p.depsTs = j.outputTs + } + wm.maybePushToReadyQueue(p) + } +} + +type workerManager struct { + maxJobs int + jobs []*job + readyQueue jobQueue + jobChan chan *job + resultChan chan jobResult + newDepChan chan newDep + stopChan chan bool + waitChan chan bool + doneChan chan error + freeWorkers []*worker + busyWorkers map[*worker]bool + ex *Executor + runnings map[string]*job + + finishCnt int + skipCnt int +} + +func newWorkerManager(numJobs int) (*workerManager, error) { + wm := &workerManager{ + maxJobs: numJobs, + jobChan: make(chan *job), + resultChan: make(chan jobResult), + newDepChan: make(chan newDep), + stopChan: make(chan bool), + waitChan: make(chan bool), + doneChan: make(chan error), + busyWorkers: make(map[*worker]bool), + } + + wm.busyWorkers = make(map[*worker]bool) + for i := 0; i < numJobs; i++ { + w := newWorker(wm) + wm.freeWorkers = append(wm.freeWorkers, w) + go w.Run() + } + heap.Init(&wm.readyQueue) + go wm.Run() + return wm, nil +} + +func exitStatus(err error) int { + if err == nil { + return 0 + } + exit := 1 + if err, ok := err.(*exec.ExitError); ok { + if w, ok := err.ProcessState.Sys().(syscall.WaitStatus); ok { + return w.ExitStatus() + } + } + return exit +} + +func (wm *workerManager) hasTodo() bool { + return wm.finishCnt != len(wm.jobs) +} + +func (wm *workerManager) maybePushToReadyQueue(j *job) { + if j.numDeps != 0 { + return + } + heap.Push(&wm.readyQueue, j) + glog.V(1).Infof("ready: %s", j.n.Output) +} + +func (wm *workerManager) handleNewDep(j *job, neededBy *job) { + if j.numDeps < 0 { + neededBy.numDeps-- + if neededBy.id > 0 { + panic("FIXME: already in WM... can this happen?") + } + } else { + j.parents = append(j.parents, neededBy) + } +} + +func (wm *workerManager) Run() { + done := false + var err error +Loop: + for wm.hasTodo() || len(wm.busyWorkers) > 0 || len(wm.runnings) > 0 || !done { + select { + case j := <-wm.jobChan: + glog.V(1).Infof("wait: %s (%d)", j.n.Output, j.numDeps) + j.id = len(wm.jobs) + 1 + wm.jobs = append(wm.jobs, j) + wm.maybePushToReadyQueue(j) + case jr := <-wm.resultChan: + glog.V(1).Infof("done: %s", jr.j.n.Output) + delete(wm.busyWorkers, jr.w) + wm.freeWorkers = append(wm.freeWorkers, jr.w) + wm.updateParents(jr.j) + wm.finishCnt++ + if jr.err == errNothingDone { + wm.skipCnt++ + jr.err = nil + } + if jr.err != nil { + err = jr.err + close(wm.stopChan) + break Loop + } + case af := <-wm.newDepChan: + wm.handleNewDep(af.j, af.neededBy) + glog.V(1).Infof("dep: %s (%d) %s", af.neededBy.n.Output, af.neededBy.numDeps, af.j.n.Output) + case done = <-wm.waitChan: + } + err = wm.handleJobs() + if err != nil { + break Loop + } + + glog.V(1).Infof("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers)) + } + if !done { + <-wm.waitChan + } + + for _, w := range wm.freeWorkers { + w.Wait() + } + for w := range wm.busyWorkers { + w.Wait() + } + wm.doneChan <- err +} + +func (wm *workerManager) PostJob(j *job) error { + select { + case wm.jobChan <- j: + return nil + case <-wm.stopChan: + return errors.New("worker manager stopped") + } +} + +func (wm *workerManager) ReportResult(w *worker, j *job, err error) { + select { + case wm.resultChan <- jobResult{w: w, j: j, err: err}: + case <-wm.stopChan: + } +} + +func (wm *workerManager) ReportNewDep(j *job, neededBy *job) { + select { + case wm.newDepChan <- newDep{j: j, neededBy: neededBy}: + case <-wm.stopChan: + } +} + +func (wm *workerManager) Wait() (int, error) { + wm.waitChan <- true + err := <-wm.doneChan + glog.V(2).Infof("finish %d skip %d", wm.finishCnt, wm.skipCnt) + return wm.finishCnt - wm.skipCnt, err +} |
