From dbc6c13f528ef1d1ded8281e737019b99d55d948 Mon Sep 17 00:00:00 2001 From: Shinichiro Hamaji Date: Tue, 28 Apr 2015 18:26:36 +0900 Subject: Introduce job queue --- worker.go | 59 ++++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 44 insertions(+), 15 deletions(-) (limited to 'worker.go') diff --git a/worker.go b/worker.go index 7639964..a444d4e 100644 --- a/worker.go +++ b/worker.go @@ -1,6 +1,7 @@ package main import ( + "container/heap" "fmt" "os/exec" "strings" @@ -43,6 +44,32 @@ type Worker struct { doneChan chan bool } +type JobQueue []*Job + +func (jq JobQueue) Len() int { return len(jq) } + +func (jq JobQueue) Less(i, j int) bool { + // First come, first serve, for GNU make compatibility. + return jq[i].id < jq[j].id +} + +func (jq JobQueue) Swap(i, j int) { + jq[i], jq[j] = jq[j], jq[i] +} + +func (jq *JobQueue) Push(x interface{}) { + item := x.(*Job) + *jq = append(*jq, item) +} + +func (jq *JobQueue) Pop() interface{} { + old := *jq + n := len(old) + item := old[n-1] + *jq = old[0 : n-1] + return item +} + func NewWorker(wm *WorkerManager) *Worker { w := &Worker{ wm: wm, @@ -234,17 +261,10 @@ func (wm *WorkerManager) handleJobs() { if len(wm.freeWorkers) == 0 { return } - var j *Job - // TODO(hamaji): This linear search is slow. - for _, j2 := range wm.jobs { - if j2.numDeps == 0 { - j = j2 - break - } - } - if j == nil { + if wm.readyQueue.Len() == 0 { return } + j := heap.Pop(&wm.readyQueue).(*Job) j.numDeps = -1 // Do not let other workers pick this. w := wm.freeWorkers[0] wm.freeWorkers = wm.freeWorkers[1:] @@ -259,11 +279,13 @@ func (wm *WorkerManager) updateParents(j *Job) { if p.depsTs < j.outputTs { p.depsTs = j.outputTs } + wm.maybePushToReadyQueue(p) } } type WorkerManager struct { jobs []*Job + readyQueue JobQueue jobChan chan *Job resultChan chan JobResult newDepChan chan NewDep @@ -271,6 +293,8 @@ type WorkerManager struct { doneChan chan bool freeWorkers []*Worker busyWorkers map[*Worker]bool + + finishCnt int } func NewWorkerManager() *WorkerManager { @@ -282,6 +306,7 @@ func NewWorkerManager() *WorkerManager { doneChan: make(chan bool), busyWorkers: make(map[*Worker]bool), } + heap.Init(&wm.readyQueue) for i := 0; i < jobsFlag; i++ { w := NewWorker(wm) wm.freeWorkers = append(wm.freeWorkers, w) @@ -305,18 +330,20 @@ func exitStatus(err error) int { } func (wm *WorkerManager) hasTodo() bool { - // TODO(hamaji): This linear search is slow. - for _, j := range wm.jobs { - if j.numDeps >= 0 { - return true - } + return wm.finishCnt != len(wm.jobs) +} + +func (wm *WorkerManager) maybePushToReadyQueue(j *Job) { + if j.numDeps != 0 { + return } - return false + heap.Push(&wm.readyQueue, j) } func (wm *WorkerManager) handleNewDep(j *Job, neededBy *Job) { if j.numDeps < 0 { neededBy.numDeps-- + wm.maybePushToReadyQueue(neededBy) } else { j.parents = append(j.parents, neededBy) } @@ -329,10 +356,12 @@ func (wm *WorkerManager) Run() { case j := <-wm.jobChan: j.id = len(wm.jobs) wm.jobs = append(wm.jobs, j) + wm.maybePushToReadyQueue(j) case jr := <-wm.resultChan: delete(wm.busyWorkers, jr.w) wm.freeWorkers = append(wm.freeWorkers, jr.w) wm.updateParents(jr.j) + wm.finishCnt++ case af := <-wm.newDepChan: wm.handleNewDep(af.j, af.neededBy) case done = <-wm.waitChan: -- cgit v1.2.3