aboutsummaryrefslogtreecommitdiffstats
path: root/worker.go
diff options
context:
space:
mode:
authorShinichiro Hamaji <shinichiro.hamaji@gmail.com>2015-05-13 17:03:20 +0900
committerShinichiro Hamaji <shinichiro.hamaji@gmail.com>2015-05-13 17:06:44 +0900
commitcedc5c87f6366c8ea9c14f0bf95e0257222f55ca (patch)
treec93bf6f16d0eceaa0bf27f2c1afb0fb918e9f5db /worker.go
parent5e07dd38678eb8ca7fb3157a882c28d0d12fa8f0 (diff)
downloadandroid_build_kati-cedc5c87f6366c8ea9c14f0bf95e0257222f55ca.tar.gz
android_build_kati-cedc5c87f6366c8ea9c14f0bf95e0257222f55ca.tar.bz2
android_build_kati-cedc5c87f6366c8ea9c14f0bf95e0257222f55ca.zip
Add para.go and para_test.go
Diffstat (limited to 'worker.go')
-rw-r--r--worker.go77
1 files changed, 59 insertions, 18 deletions
diff --git a/worker.go b/worker.go
index 5d3472f..f0220f6 100644
--- a/worker.go
+++ b/worker.go
@@ -3,6 +3,7 @@ package main
import (
"container/heap"
"fmt"
+ "os"
"os/exec"
"strings"
"syscall"
@@ -224,18 +225,24 @@ func (j Job) build() {
func (wm *WorkerManager) handleJobs() {
for {
- if len(wm.freeWorkers) == 0 {
+ if !useParaFlag && len(wm.freeWorkers) == 0 {
return
}
if wm.readyQueue.Len() == 0 {
return
}
j := heap.Pop(&wm.readyQueue).(*Job)
- j.numDeps = -1 // Do not let other workers pick this.
- w := wm.freeWorkers[0]
- wm.freeWorkers = wm.freeWorkers[1:]
- wm.busyWorkers[w] = true
- w.jobChan <- j
+
+ if useParaFlag {
+ wm.runnings[j.n.Output] = j
+ wm.para.RunCommand(j.createRunners())
+ } else {
+ j.numDeps = -1 // Do not let other workers pick this.
+ w := wm.freeWorkers[0]
+ wm.freeWorkers = wm.freeWorkers[1:]
+ wm.busyWorkers[w] = true
+ w.jobChan <- j
+ }
}
}
@@ -259,6 +266,10 @@ type WorkerManager struct {
doneChan chan bool
freeWorkers []*Worker
busyWorkers map[*Worker]bool
+ ex *Executor
+ para *ParaWorker
+ paraChan chan *ParaResult
+ runnings map[string]*Job
finishCnt int
}
@@ -272,12 +283,21 @@ func NewWorkerManager() *WorkerManager {
doneChan: make(chan bool),
busyWorkers: make(map[*Worker]bool),
}
- heap.Init(&wm.readyQueue)
- for i := 0; i < jobsFlag; i++ {
- w := NewWorker(wm)
- wm.freeWorkers = append(wm.freeWorkers, w)
- go w.Run()
+
+ if useParaFlag {
+ wm.runnings = make(map[string]*Job)
+ wm.paraChan = make(chan *ParaResult)
+ wm.para = NewParaWorker(wm.paraChan)
+ go wm.para.Run()
+ } else {
+ wm.busyWorkers = make(map[*Worker]bool)
+ for i := 0; i < jobsFlag; i++ {
+ w := NewWorker(wm)
+ wm.freeWorkers = append(wm.freeWorkers, w)
+ go w.Run()
+ }
}
+ heap.Init(&wm.readyQueue)
go wm.Run()
return wm
}
@@ -320,7 +340,7 @@ func (wm *WorkerManager) handleNewDep(j *Job, neededBy *Job) {
func (wm *WorkerManager) Run() {
done := false
- for wm.hasTodo() || len(wm.busyWorkers) > 0 || !done {
+ for wm.hasTodo() || len(wm.busyWorkers) > 0 || len(wm.runnings) > 0 || !done {
select {
case j := <-wm.jobChan:
j.id = len(wm.jobs) + 1
@@ -333,17 +353,38 @@ func (wm *WorkerManager) Run() {
wm.finishCnt++
case af := <-wm.newDepChan:
wm.handleNewDep(af.j, af.neededBy)
+ case pr := <-wm.paraChan:
+ os.Stdout.Write([]byte(pr.stdout))
+ os.Stderr.Write([]byte(pr.stderr))
+ j := wm.runnings[pr.output]
+ wm.updateParents(j)
+ delete(wm.runnings, pr.output)
+ wm.finishCnt++
case done = <-wm.waitChan:
}
wm.handleJobs()
- Log("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers))
- }
- for _, w := range wm.freeWorkers {
- w.Wait()
+ if useParaFlag {
+ numBusy := len(wm.runnings)
+ if numBusy > jobsFlag {
+ numBusy = jobsFlag
+ }
+ Log("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), jobsFlag-numBusy, numBusy)
+ } else {
+ Log("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers))
+ }
}
- for w := range wm.busyWorkers {
- w.Wait()
+
+ if useParaFlag {
+ Log("Wait for para to finish")
+ wm.para.Wait()
+ } else {
+ for _, w := range wm.freeWorkers {
+ w.Wait()
+ }
+ for w := range wm.busyWorkers {
+ w.Wait()
+ }
}
wm.doneChan <- true
}