aboutsummaryrefslogtreecommitdiffstats
path: root/worker.go
diff options
context:
space:
mode:
authorShinichiro Hamaji <shinichiro.hamaji@gmail.com>2015-04-28 18:26:36 +0900
committerShinichiro Hamaji <shinichiro.hamaji@gmail.com>2015-04-28 18:29:51 +0900
commitdbc6c13f528ef1d1ded8281e737019b99d55d948 (patch)
tree6462ecc42353fd50b48852d02ba6c28d551c7f72 /worker.go
parent7a65e681c96cddad3ec2aa2007e067e67b713aa2 (diff)
downloadandroid_build_kati-dbc6c13f528ef1d1ded8281e737019b99d55d948.tar.gz
android_build_kati-dbc6c13f528ef1d1ded8281e737019b99d55d948.tar.bz2
android_build_kati-dbc6c13f528ef1d1ded8281e737019b99d55d948.zip
Introduce job queue
Diffstat (limited to 'worker.go')
-rw-r--r--worker.go59
1 files changed, 44 insertions, 15 deletions
diff --git a/worker.go b/worker.go
index 7639964..a444d4e 100644
--- a/worker.go
+++ b/worker.go
@@ -1,6 +1,7 @@
package main
import (
+ "container/heap"
"fmt"
"os/exec"
"strings"
@@ -43,6 +44,32 @@ type Worker struct {
doneChan chan bool
}
+type JobQueue []*Job
+
+func (jq JobQueue) Len() int { return len(jq) }
+
+func (jq JobQueue) Less(i, j int) bool {
+ // First come, first serve, for GNU make compatibility.
+ return jq[i].id < jq[j].id
+}
+
+func (jq JobQueue) Swap(i, j int) {
+ jq[i], jq[j] = jq[j], jq[i]
+}
+
+func (jq *JobQueue) Push(x interface{}) {
+ item := x.(*Job)
+ *jq = append(*jq, item)
+}
+
+func (jq *JobQueue) Pop() interface{} {
+ old := *jq
+ n := len(old)
+ item := old[n-1]
+ *jq = old[0 : n-1]
+ return item
+}
+
func NewWorker(wm *WorkerManager) *Worker {
w := &Worker{
wm: wm,
@@ -234,17 +261,10 @@ func (wm *WorkerManager) handleJobs() {
if len(wm.freeWorkers) == 0 {
return
}
- var j *Job
- // TODO(hamaji): This linear search is slow.
- for _, j2 := range wm.jobs {
- if j2.numDeps == 0 {
- j = j2
- break
- }
- }
- if j == nil {
+ if wm.readyQueue.Len() == 0 {
return
}
+ j := heap.Pop(&wm.readyQueue).(*Job)
j.numDeps = -1 // Do not let other workers pick this.
w := wm.freeWorkers[0]
wm.freeWorkers = wm.freeWorkers[1:]
@@ -259,11 +279,13 @@ func (wm *WorkerManager) updateParents(j *Job) {
if p.depsTs < j.outputTs {
p.depsTs = j.outputTs
}
+ wm.maybePushToReadyQueue(p)
}
}
type WorkerManager struct {
jobs []*Job
+ readyQueue JobQueue
jobChan chan *Job
resultChan chan JobResult
newDepChan chan NewDep
@@ -271,6 +293,8 @@ type WorkerManager struct {
doneChan chan bool
freeWorkers []*Worker
busyWorkers map[*Worker]bool
+
+ finishCnt int
}
func NewWorkerManager() *WorkerManager {
@@ -282,6 +306,7 @@ func NewWorkerManager() *WorkerManager {
doneChan: make(chan bool),
busyWorkers: make(map[*Worker]bool),
}
+ heap.Init(&wm.readyQueue)
for i := 0; i < jobsFlag; i++ {
w := NewWorker(wm)
wm.freeWorkers = append(wm.freeWorkers, w)
@@ -305,18 +330,20 @@ func exitStatus(err error) int {
}
func (wm *WorkerManager) hasTodo() bool {
- // TODO(hamaji): This linear search is slow.
- for _, j := range wm.jobs {
- if j.numDeps >= 0 {
- return true
- }
+ return wm.finishCnt != len(wm.jobs)
+}
+
+func (wm *WorkerManager) maybePushToReadyQueue(j *Job) {
+ if j.numDeps != 0 {
+ return
}
- return false
+ heap.Push(&wm.readyQueue, j)
}
func (wm *WorkerManager) handleNewDep(j *Job, neededBy *Job) {
if j.numDeps < 0 {
neededBy.numDeps--
+ wm.maybePushToReadyQueue(neededBy)
} else {
j.parents = append(j.parents, neededBy)
}
@@ -329,10 +356,12 @@ func (wm *WorkerManager) Run() {
case j := <-wm.jobChan:
j.id = len(wm.jobs)
wm.jobs = append(wm.jobs, j)
+ wm.maybePushToReadyQueue(j)
case jr := <-wm.resultChan:
delete(wm.busyWorkers, jr.w)
wm.freeWorkers = append(wm.freeWorkers, jr.w)
wm.updateParents(jr.j)
+ wm.finishCnt++
case af := <-wm.newDepChan:
wm.handleNewDep(af.j, af.neededBy)
case done = <-wm.waitChan: