diff options
author | Shinichiro Hamaji <shinichiro.hamaji@gmail.com> | 2015-05-13 17:03:20 +0900 |
---|---|---|
committer | Shinichiro Hamaji <shinichiro.hamaji@gmail.com> | 2015-05-13 17:06:44 +0900 |
commit | cedc5c87f6366c8ea9c14f0bf95e0257222f55ca (patch) | |
tree | c93bf6f16d0eceaa0bf27f2c1afb0fb918e9f5db /worker.go | |
parent | 5e07dd38678eb8ca7fb3157a882c28d0d12fa8f0 (diff) | |
download | android_build_kati-cedc5c87f6366c8ea9c14f0bf95e0257222f55ca.tar.gz android_build_kati-cedc5c87f6366c8ea9c14f0bf95e0257222f55ca.tar.bz2 android_build_kati-cedc5c87f6366c8ea9c14f0bf95e0257222f55ca.zip |
Add para.go and para_test.go
Diffstat (limited to 'worker.go')
-rw-r--r-- | worker.go | 77 |
1 files changed, 59 insertions, 18 deletions
@@ -3,6 +3,7 @@ package main import ( "container/heap" "fmt" + "os" "os/exec" "strings" "syscall" @@ -224,18 +225,24 @@ func (j Job) build() { func (wm *WorkerManager) handleJobs() { for { - if len(wm.freeWorkers) == 0 { + if !useParaFlag && len(wm.freeWorkers) == 0 { return } if wm.readyQueue.Len() == 0 { return } j := heap.Pop(&wm.readyQueue).(*Job) - j.numDeps = -1 // Do not let other workers pick this. - w := wm.freeWorkers[0] - wm.freeWorkers = wm.freeWorkers[1:] - wm.busyWorkers[w] = true - w.jobChan <- j + + if useParaFlag { + wm.runnings[j.n.Output] = j + wm.para.RunCommand(j.createRunners()) + } else { + j.numDeps = -1 // Do not let other workers pick this. + w := wm.freeWorkers[0] + wm.freeWorkers = wm.freeWorkers[1:] + wm.busyWorkers[w] = true + w.jobChan <- j + } } } @@ -259,6 +266,10 @@ type WorkerManager struct { doneChan chan bool freeWorkers []*Worker busyWorkers map[*Worker]bool + ex *Executor + para *ParaWorker + paraChan chan *ParaResult + runnings map[string]*Job finishCnt int } @@ -272,12 +283,21 @@ func NewWorkerManager() *WorkerManager { doneChan: make(chan bool), busyWorkers: make(map[*Worker]bool), } - heap.Init(&wm.readyQueue) - for i := 0; i < jobsFlag; i++ { - w := NewWorker(wm) - wm.freeWorkers = append(wm.freeWorkers, w) - go w.Run() + + if useParaFlag { + wm.runnings = make(map[string]*Job) + wm.paraChan = make(chan *ParaResult) + wm.para = NewParaWorker(wm.paraChan) + go wm.para.Run() + } else { + wm.busyWorkers = make(map[*Worker]bool) + for i := 0; i < jobsFlag; i++ { + w := NewWorker(wm) + wm.freeWorkers = append(wm.freeWorkers, w) + go w.Run() + } } + heap.Init(&wm.readyQueue) go wm.Run() return wm } @@ -320,7 +340,7 @@ func (wm *WorkerManager) handleNewDep(j *Job, neededBy *Job) { func (wm *WorkerManager) Run() { done := false - for wm.hasTodo() || len(wm.busyWorkers) > 0 || !done { + for wm.hasTodo() || len(wm.busyWorkers) > 0 || len(wm.runnings) > 0 || !done { select { case j := <-wm.jobChan: j.id = len(wm.jobs) + 1 @@ -333,17 +353,38 @@ func (wm *WorkerManager) Run() { wm.finishCnt++ case af := <-wm.newDepChan: wm.handleNewDep(af.j, af.neededBy) + case pr := <-wm.paraChan: + os.Stdout.Write([]byte(pr.stdout)) + os.Stderr.Write([]byte(pr.stderr)) + j := wm.runnings[pr.output] + wm.updateParents(j) + delete(wm.runnings, pr.output) + wm.finishCnt++ case done = <-wm.waitChan: } wm.handleJobs() - Log("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers)) - } - for _, w := range wm.freeWorkers { - w.Wait() + if useParaFlag { + numBusy := len(wm.runnings) + if numBusy > jobsFlag { + numBusy = jobsFlag + } + Log("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), jobsFlag-numBusy, numBusy) + } else { + Log("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers)) + } } - for w := range wm.busyWorkers { - w.Wait() + + if useParaFlag { + Log("Wait for para to finish") + wm.para.Wait() + } else { + for _, w := range wm.freeWorkers { + w.Wait() + } + for w := range wm.busyWorkers { + w.Wait() + } } wm.doneChan <- true } |