aboutsummaryrefslogtreecommitdiffstats
path: root/golang/kati/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'golang/kati/worker.go')
-rw-r--r--golang/kati/worker.go368
1 files changed, 368 insertions, 0 deletions
diff --git a/golang/kati/worker.go b/golang/kati/worker.go
new file mode 100644
index 0000000..a339d23
--- /dev/null
+++ b/golang/kati/worker.go
@@ -0,0 +1,368 @@
+// Copyright 2015 Google Inc. All rights reserved
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kati
+
+import (
+ "container/heap"
+ "errors"
+ "fmt"
+ "os"
+ "os/exec"
+ "syscall"
+ "time"
+
+ "github.com/golang/glog"
+)
+
+var (
+ errNothingDone = errors.New("nothing done")
+)
+
+type job struct {
+ n *DepNode
+ ex *Executor
+ parents []*job
+ outputTs int64
+ numDeps int
+ depsTs int64
+ id int
+
+ runners []runner
+}
+
+type jobResult struct {
+ j *job
+ w *worker
+ err error
+}
+
+type newDep struct {
+ j *job
+ neededBy *job
+}
+
+type worker struct {
+ wm *workerManager
+ jobChan chan *job
+ waitChan chan bool
+ doneChan chan bool
+}
+
+type jobQueue []*job
+
+func (jq jobQueue) Len() int { return len(jq) }
+func (jq jobQueue) Swap(i, j int) { jq[i], jq[j] = jq[j], jq[i] }
+
+func (jq jobQueue) Less(i, j int) bool {
+ // First come, first serve, for GNU make compatibility.
+ return jq[i].id < jq[j].id
+}
+
+func (jq *jobQueue) Push(x interface{}) {
+ item := x.(*job)
+ *jq = append(*jq, item)
+}
+
+func (jq *jobQueue) Pop() interface{} {
+ old := *jq
+ n := len(old)
+ item := old[n-1]
+ *jq = old[0 : n-1]
+ return item
+}
+
+func newWorker(wm *workerManager) *worker {
+ w := &worker{
+ wm: wm,
+ jobChan: make(chan *job),
+ waitChan: make(chan bool),
+ doneChan: make(chan bool),
+ }
+ return w
+}
+
+func (w *worker) Run() {
+ done := false
+ for !done {
+ select {
+ case j := <-w.jobChan:
+ err := j.build()
+ w.wm.ReportResult(w, j, err)
+ case done = <-w.waitChan:
+ }
+ }
+ w.doneChan <- true
+}
+
+func (w *worker) PostJob(j *job) {
+ w.jobChan <- j
+}
+
+func (w *worker) Wait() {
+ w.waitChan <- true
+ <-w.doneChan
+}
+
+func (j *job) createRunners() ([]runner, error) {
+ runners, _, err := createRunners(j.ex.ctx, j.n)
+ return runners, err
+}
+
+// TODO(ukai): use time.Time?
+func getTimestamp(filename string) int64 {
+ st, err := os.Stat(filename)
+ if err != nil {
+ return -2
+ }
+ return st.ModTime().Unix()
+}
+
+func (j *job) build() error {
+ if j.n.IsPhony {
+ j.outputTs = -2 // trigger cmd even if all inputs don't exist.
+ } else {
+ j.outputTs = getTimestamp(j.n.Output)
+ }
+
+ if !j.n.HasRule {
+ if j.outputTs >= 0 || j.n.IsPhony {
+ return errNothingDone
+ }
+ if len(j.parents) == 0 {
+ return fmt.Errorf("*** No rule to make target %q.", j.n.Output)
+ }
+ return fmt.Errorf("*** No rule to make target %q, needed by %q.", j.n.Output, j.parents[0].n.Output)
+ }
+
+ if j.outputTs >= j.depsTs {
+ // TODO: stats.
+ return errNothingDone
+ }
+
+ rr, err := j.createRunners()
+ if err != nil {
+ return err
+ }
+ if len(rr) == 0 {
+ return errNothingDone
+ }
+ for _, r := range rr {
+ err := r.run(j.n.Output)
+ glog.Warningf("cmd result for %q: %v", j.n.Output, err)
+ if err != nil {
+ exit := exitStatus(err)
+ return fmt.Errorf("*** [%s] Error %d", j.n.Output, exit)
+ }
+ }
+
+ if j.n.IsPhony {
+ j.outputTs = time.Now().Unix()
+ } else {
+ j.outputTs = getTimestamp(j.n.Output)
+ if j.outputTs < 0 {
+ j.outputTs = time.Now().Unix()
+ }
+ }
+ return nil
+}
+
+func (wm *workerManager) handleJobs() error {
+ for {
+ if len(wm.freeWorkers) == 0 {
+ return nil
+ }
+ if wm.readyQueue.Len() == 0 {
+ return nil
+ }
+ j := heap.Pop(&wm.readyQueue).(*job)
+ glog.V(1).Infof("run: %s", j.n.Output)
+
+ j.numDeps = -1 // Do not let other workers pick this.
+ w := wm.freeWorkers[0]
+ wm.freeWorkers = wm.freeWorkers[1:]
+ wm.busyWorkers[w] = true
+ w.jobChan <- j
+ }
+}
+
+func (wm *workerManager) updateParents(j *job) {
+ for _, p := range j.parents {
+ p.numDeps--
+ glog.V(1).Infof("child: %s (%d)", p.n.Output, p.numDeps)
+ if p.depsTs < j.outputTs {
+ p.depsTs = j.outputTs
+ }
+ wm.maybePushToReadyQueue(p)
+ }
+}
+
+type workerManager struct {
+ maxJobs int
+ jobs []*job
+ readyQueue jobQueue
+ jobChan chan *job
+ resultChan chan jobResult
+ newDepChan chan newDep
+ stopChan chan bool
+ waitChan chan bool
+ doneChan chan error
+ freeWorkers []*worker
+ busyWorkers map[*worker]bool
+ ex *Executor
+ runnings map[string]*job
+
+ finishCnt int
+ skipCnt int
+}
+
+func newWorkerManager(numJobs int) (*workerManager, error) {
+ wm := &workerManager{
+ maxJobs: numJobs,
+ jobChan: make(chan *job),
+ resultChan: make(chan jobResult),
+ newDepChan: make(chan newDep),
+ stopChan: make(chan bool),
+ waitChan: make(chan bool),
+ doneChan: make(chan error),
+ busyWorkers: make(map[*worker]bool),
+ }
+
+ wm.busyWorkers = make(map[*worker]bool)
+ for i := 0; i < numJobs; i++ {
+ w := newWorker(wm)
+ wm.freeWorkers = append(wm.freeWorkers, w)
+ go w.Run()
+ }
+ heap.Init(&wm.readyQueue)
+ go wm.Run()
+ return wm, nil
+}
+
+func exitStatus(err error) int {
+ if err == nil {
+ return 0
+ }
+ exit := 1
+ if err, ok := err.(*exec.ExitError); ok {
+ if w, ok := err.ProcessState.Sys().(syscall.WaitStatus); ok {
+ return w.ExitStatus()
+ }
+ }
+ return exit
+}
+
+func (wm *workerManager) hasTodo() bool {
+ return wm.finishCnt != len(wm.jobs)
+}
+
+func (wm *workerManager) maybePushToReadyQueue(j *job) {
+ if j.numDeps != 0 {
+ return
+ }
+ heap.Push(&wm.readyQueue, j)
+ glog.V(1).Infof("ready: %s", j.n.Output)
+}
+
+func (wm *workerManager) handleNewDep(j *job, neededBy *job) {
+ if j.numDeps < 0 {
+ neededBy.numDeps--
+ if neededBy.id > 0 {
+ panic("FIXME: already in WM... can this happen?")
+ }
+ } else {
+ j.parents = append(j.parents, neededBy)
+ }
+}
+
+func (wm *workerManager) Run() {
+ done := false
+ var err error
+Loop:
+ for wm.hasTodo() || len(wm.busyWorkers) > 0 || len(wm.runnings) > 0 || !done {
+ select {
+ case j := <-wm.jobChan:
+ glog.V(1).Infof("wait: %s (%d)", j.n.Output, j.numDeps)
+ j.id = len(wm.jobs) + 1
+ wm.jobs = append(wm.jobs, j)
+ wm.maybePushToReadyQueue(j)
+ case jr := <-wm.resultChan:
+ glog.V(1).Infof("done: %s", jr.j.n.Output)
+ delete(wm.busyWorkers, jr.w)
+ wm.freeWorkers = append(wm.freeWorkers, jr.w)
+ wm.updateParents(jr.j)
+ wm.finishCnt++
+ if jr.err == errNothingDone {
+ wm.skipCnt++
+ jr.err = nil
+ }
+ if jr.err != nil {
+ err = jr.err
+ close(wm.stopChan)
+ break Loop
+ }
+ case af := <-wm.newDepChan:
+ wm.handleNewDep(af.j, af.neededBy)
+ glog.V(1).Infof("dep: %s (%d) %s", af.neededBy.n.Output, af.neededBy.numDeps, af.j.n.Output)
+ case done = <-wm.waitChan:
+ }
+ err = wm.handleJobs()
+ if err != nil {
+ break Loop
+ }
+
+ glog.V(1).Infof("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers))
+ }
+ if !done {
+ <-wm.waitChan
+ }
+
+ for _, w := range wm.freeWorkers {
+ w.Wait()
+ }
+ for w := range wm.busyWorkers {
+ w.Wait()
+ }
+ wm.doneChan <- err
+}
+
+func (wm *workerManager) PostJob(j *job) error {
+ select {
+ case wm.jobChan <- j:
+ return nil
+ case <-wm.stopChan:
+ return errors.New("worker manager stopped")
+ }
+}
+
+func (wm *workerManager) ReportResult(w *worker, j *job, err error) {
+ select {
+ case wm.resultChan <- jobResult{w: w, j: j, err: err}:
+ case <-wm.stopChan:
+ }
+}
+
+func (wm *workerManager) ReportNewDep(j *job, neededBy *job) {
+ select {
+ case wm.newDepChan <- newDep{j: j, neededBy: neededBy}:
+ case <-wm.stopChan:
+ }
+}
+
+func (wm *workerManager) Wait() (int, error) {
+ wm.waitChan <- true
+ err := <-wm.doneChan
+ glog.V(2).Infof("finish %d skip %d", wm.finishCnt, wm.skipCnt)
+ return wm.finishCnt - wm.skipCnt, err
+}