From 744bb2b8d146eaba4d073cf58e35a60903e06de8 Mon Sep 17 00:00:00 2001 From: Fumitoshi Ukai Date: Thu, 25 Jun 2015 00:10:52 +0900 Subject: go gettable for github.com/google/kati --- worker.go | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) (limited to 'worker.go') diff --git a/worker.go b/worker.go index 3e61db7..fdd095a 100644 --- a/worker.go +++ b/worker.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package kati import ( "container/heap" @@ -87,7 +87,7 @@ func (jq *JobQueue) Pop() interface{} { return item } -func NewWorker(wm *WorkerManager) *Worker { +func newWorker(wm *WorkerManager) *Worker { w := &Worker{ wm: wm, jobChan: make(chan *Job), @@ -154,7 +154,7 @@ func newRunner(r runner, s string) runner { } switch s[0] { case '@': - if !dryRunFlag { + if !DryRunFlag { r.echo = false } s = s[1:] @@ -171,10 +171,10 @@ func newRunner(r runner, s string) runner { } func (r runner) run(output string) error { - if r.echo || dryRunFlag { + if r.echo || DryRunFlag { fmt.Printf("%s\n", r.cmd) } - if dryRunFlag { + if DryRunFlag { return nil } args := []string{r.shell, "-c", r.cmd} @@ -250,7 +250,7 @@ func (j *Job) build() { func (wm *WorkerManager) handleJobs() { for { - if !useParaFlag && len(wm.freeWorkers) == 0 { + if wm.para == nil && len(wm.freeWorkers) == 0 { return } if wm.readyQueue.Len() == 0 { @@ -259,7 +259,7 @@ func (wm *WorkerManager) handleJobs() { j := heap.Pop(&wm.readyQueue).(*Job) Logf("run: %s", j.n.Output) - if useParaFlag { + if wm.para != nil { j.runners = j.createRunners() if len(j.runners) == 0 { wm.updateParents(j) @@ -290,6 +290,7 @@ func (wm *WorkerManager) updateParents(j *Job) { } type WorkerManager struct { + maxJobs int jobs []*Job readyQueue JobQueue jobChan chan *Job @@ -307,8 +308,9 @@ type WorkerManager struct { finishCnt int } -func NewWorkerManager() *WorkerManager { +func NewWorkerManager(numJobs int, paraPath string) *WorkerManager { wm := &WorkerManager{ + maxJobs: numJobs, jobChan: make(chan *Job), resultChan: make(chan JobResult), newDepChan: make(chan NewDep), @@ -317,15 +319,15 @@ func NewWorkerManager() *WorkerManager { busyWorkers: make(map[*Worker]bool), } - if useParaFlag { + if paraPath != "" { wm.runnings = make(map[string]*Job) wm.paraChan = make(chan *ParaResult) - wm.para = NewParaWorker(wm.paraChan) + wm.para = newParaWorker(wm.paraChan, numJobs, paraPath) go wm.para.Run() } else { wm.busyWorkers = make(map[*Worker]bool) - for i := 0; i < jobsFlag; i++ { - w := NewWorker(wm) + for i := 0; i < numJobs; i++ { + w := newWorker(wm) wm.freeWorkers = append(wm.freeWorkers, w) go w.Run() } @@ -393,7 +395,7 @@ func (wm *WorkerManager) Run() { if pr.status < 0 && pr.signal < 0 { j := wm.runnings[pr.output] for _, r := range j.runners { - if r.echo || dryRunFlag { + if r.echo || DryRunFlag { fmt.Printf("%s\n", r.cmd) } } @@ -409,18 +411,18 @@ func (wm *WorkerManager) Run() { } wm.handleJobs() - if useParaFlag { + if wm.para != nil { numBusy := len(wm.runnings) - if numBusy > jobsFlag { - numBusy = jobsFlag + if numBusy > wm.maxJobs { + numBusy = wm.maxJobs } - Logf("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), jobsFlag-numBusy, numBusy) + Logf("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), wm.maxJobs-numBusy, numBusy) } else { Logf("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers)) } } - if useParaFlag { + if wm.para != nil { Logf("Wait for para to finish") wm.para.Wait() } else { -- cgit v1.2.3