aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorShinichiro Hamaji <shinichiro.hamaji@gmail.com>2015-05-13 17:03:20 +0900
committerShinichiro Hamaji <shinichiro.hamaji@gmail.com>2015-05-13 17:06:44 +0900
commitcedc5c87f6366c8ea9c14f0bf95e0257222f55ca (patch)
treec93bf6f16d0eceaa0bf27f2c1afb0fb918e9f5db
parent5e07dd38678eb8ca7fb3157a882c28d0d12fa8f0 (diff)
downloadandroid_build_kati-cedc5c87f6366c8ea9c14f0bf95e0257222f55ca.tar.gz
android_build_kati-cedc5c87f6366c8ea9c14f0bf95e0257222f55ca.tar.bz2
android_build_kati-cedc5c87f6366c8ea9c14f0bf95e0257222f55ca.zip
Add para.go and para_test.go
-rw-r--r--Makefile4
-rw-r--r--exec.go2
-rw-r--r--main.go19
-rw-r--r--para.cc260
-rw-r--r--para.go165
-rw-r--r--para_test.go41
-rw-r--r--worker.go77
7 files changed, 520 insertions, 48 deletions
diff --git a/Makefile b/Makefile
index 7cc7dbb..30ae73d 100644
--- a/Makefile
+++ b/Makefile
@@ -5,11 +5,11 @@ all: kati go_test para
kati: $(GOSRC)
env $(shell go env) go build -o $@ *.go
-go_test: $(GOSRC)
+go_test: $(GOSRC) para
env $(shell go env) go test *.go
para: para.cc
- $(CXX) -std=c++11 -g -O -MMD -o $@ $<
+ $(CXX) -std=c++11 -g -O -W -Wall -MMD -o $@ $<
test: all
ruby runtest.rb
diff --git a/exec.go b/exec.go
index 4ba8302..4da02a7 100644
--- a/exec.go
+++ b/exec.go
@@ -292,7 +292,7 @@ func EvalCommands(nodes []*DepNode, vars Vars) {
runners, hasIO := ex.createRunners(n, true)
if hasIO {
ioCnt++
- if ioCnt % 100 == 0 {
+ if ioCnt%100 == 0 {
LogStats("%d/%d rules have IO", ioCnt, i+1)
}
continue
diff --git a/main.go b/main.go
index 5eabbb2..33af0bb 100644
--- a/main.go
+++ b/main.go
@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"os"
+ "path/filepath"
"runtime"
"runtime/pprof"
"strings"
@@ -29,6 +30,9 @@ var (
syntaxCheckOnlyFlag bool
queryFlag string
eagerCmdEvalFlag bool
+ useParaFlag bool
+
+ katiDir string
)
func parseFlags() {
@@ -51,6 +55,7 @@ func parseFlags() {
flag.BoolVar(&katiStatsFlag, "kati_stats", false, "Show a bunch of statistics")
flag.BoolVar(&katiEvalStatsFlag, "kati_eval_stats", false, "Show eval statistics")
flag.BoolVar(&eagerCmdEvalFlag, "eager_cmd_eval", false, "Eval commands first.")
+ flag.BoolVar(&useParaFlag, "use_para", false, "Use para.")
flag.BoolVar(&syntaxCheckOnlyFlag, "c", false, "Syntax check only.")
flag.StringVar(&queryFlag, "query", "", "Show the target info")
flag.Parse()
@@ -188,8 +193,22 @@ func getDepGraph(clvars []string, targets []string) ([]*DepNode, Vars) {
return nodes, vars
}
+func findKatiDir() {
+ switch runtime.GOOS {
+ case "linux":
+ kati, err := os.Readlink("/proc/self/exe")
+ if err != nil {
+ panic(err)
+ }
+ katiDir = filepath.Dir(kati)
+ default:
+ panic(fmt.Sprintf("unknown OS: %s", runtime.GOOS))
+ }
+}
+
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
+ findKatiDir()
parseFlags()
if cpuprofile != "" {
f, err := os.Create(cpuprofile)
diff --git a/para.cc b/para.cc
index d756c82..90a67de 100644
--- a/para.cc
+++ b/para.cc
@@ -1,6 +1,7 @@
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
+#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
@@ -29,23 +30,42 @@ using namespace std;
class Para;
struct Task {
+ Task() : echo(false), ignore_error(false) {}
+ ~Task() {
+ if (stdout_pipe[0] >= 0)
+ PCHECK(close(stdout_pipe[0]));
+ if (stderr_pipe[0] >= 0)
+ PCHECK(close(stderr_pipe[0]));
+ }
+
+ string output;
string shell;
string cmd;
+ bool echo;
+ bool ignore_error;
+
pid_t pid;
+ int stdout_pipe[2];
+ int stderr_pipe[2];
+ string stdout_buf;
+ string stderr_buf;
+ int status;
};
class TaskProvider {
public:
virtual ~TaskProvider() {}
virtual int GetFD() = 0;
- virtual void PollFD(Para* para) = 0;
+ virtual void PollFD(Para* para, int fd) = 0;
+ virtual void OnFinished(Task* t) = 0;
};
class Para {
public:
Para(TaskProvider* provider, int num_jobs);
+ ~Para();
- void AddTask(const Task& task);
+ void AddTask(Task* task);
void Loop();
@@ -58,7 +78,7 @@ class Para {
void RunCommands();
TaskProvider* provider_;
- int num_jobs_;
+ size_t num_jobs_;
int sig_pipe_[2];
int provider_fd_;
queue<Task*> tasks_;
@@ -70,7 +90,19 @@ class StdinTaskProvider : public TaskProvider {
public:
virtual ~StdinTaskProvider() {}
virtual int GetFD() { return STDIN_FILENO; }
- virtual void PollFD(Para* para);
+ virtual void PollFD(Para* para, int fd);
+ virtual void OnFinished(Task* t);
+
+ private:
+ string buf_;
+};
+
+class KatiTaskProvider : public TaskProvider {
+ public:
+ virtual ~KatiTaskProvider() {}
+ virtual int GetFD() { return STDIN_FILENO; }
+ virtual void PollFD(Para* para, int fd);
+ virtual void OnFinished(Task* t);
private:
string buf_;
@@ -88,21 +120,44 @@ Para::Para(TaskProvider* provider, int num_jobs)
sigemptyset(&sigmask);
sigaddset(&sigmask, SIGCHLD);
PCHECK(sigprocmask(SIG_BLOCK, &sigmask, NULL));
- PCHECK(signal(SIGCHLD, &Para::WakeUp));
+ CHECK(signal(SIGCHLD, &Para::WakeUp) != SIG_ERR);
provider_fd_ = provider_->GetFD();
}
-void Para::AddTask(const Task& task) {
- Task* t = new Task(task);
- t->pid = 0;
- tasks_.push(t);
+Para::~Para() {
+ PCHECK(close(sig_pipe_[0]));
+ PCHECK(close(sig_pipe_[1]));
+}
+
+void Para::AddTask(Task* task) {
+ task->pid = 0;
+ tasks_.push(task);
}
static void SetFd(int fd, fd_set* fdset, int* nfds) {
+ if (fd < 0)
+ return;
FD_SET(fd, fdset);
*nfds = max(*nfds, fd);
}
+static void readOutput(int* fd, string* buf) {
+ char b[4096];
+ ssize_t r = read(*fd, b, sizeof(b));
+ if (r < 0 && errno != EINTR)
+ return;
+ PCHECK(r);
+ if (r == 0) {
+ PCHECK(close(*fd));
+ *fd = -1;
+ return;
+ }
+
+ size_t l = buf->size();
+ buf->resize(l + r);
+ memcpy(&((*buf)[l]), b, r);
+}
+
void Para::Loop() {
sigset_t sigmask;
sigemptyset(&sigmask);
@@ -114,17 +169,29 @@ void Para::Loop() {
if (!done_)
SetFd(provider_fd_, &rd, &nfds);
SetFd(sig_pipe_[0], &rd, &nfds);
+ for (Task* t : running_) {
+ SetFd(t->stdout_pipe[0], &rd, &nfds);
+ SetFd(t->stderr_pipe[0], &rd, &nfds);
+ }
int r = pselect(nfds, &rd, NULL, NULL, NULL, &sigmask);
PCHECK(r && errno != EINTR);
- if (FD_ISSET(provider_fd_, &rd)) {
- provider_->PollFD(this);
- }
if (FD_ISSET(sig_pipe_[0], &rd)) {
WaitChildren();
+ RunCommands();
+ continue;
+ }
+ if (FD_ISSET(provider_fd_, &rd)) {
+ provider_->PollFD(this, provider_fd_);
+ RunCommands();
+ continue;
+ }
+ for (Task* t : running_) {
+ if (t->stdout_pipe[0] >= 0 && FD_ISSET(t->stdout_pipe[0], &rd))
+ readOutput(&t->stdout_pipe[0], &t->stdout_buf);
+ if (t->stderr_pipe[0] >= 0 && FD_ISSET(t->stderr_pipe[0], &rd))
+ readOutput(&t->stderr_pipe[0], &t->stderr_buf);
}
-
- RunCommands();
}
}
@@ -138,31 +205,51 @@ void Para::WakeUp(int) {
}
void Para::WaitChildren() {
+ char c = 0;
+ PCHECK(read(sig_pipe_[0], &c, 1));
+ CHECK(c == 42);
+
vector<Task*> finished;
for (Task* task : running_) {
- int status;
- pid_t pid = waitpid(task->pid, &status, WNOHANG);
+ pid_t pid = waitpid(task->pid, &task->status, WNOHANG);
PCHECK(pid);
if (pid == 0) {
continue;
}
CHECK(pid == task->pid);
+
+ while (task->stdout_pipe[0] >= 0)
+ readOutput(&task->stdout_pipe[0], &task->stdout_buf);
+ while (task->stderr_pipe[0] >= 0)
+ readOutput(&task->stderr_pipe[0], &task->stderr_buf);
+
// TODO: Handle error.
finished.push_back(task);
}
for (Task* task : finished) {
running_.erase(task);
+ provider_->OnFinished(task);
delete task;
}
}
void Para::RunCommands() {
- while (!tasks_.empty() && running_.size() < num_jobs_) {
+ while (!tasks_.empty() && (running_.size() < num_jobs_ || num_jobs_ == 0)) {
Task* task = tasks_.front();
tasks_.pop();
+
+ PCHECK(pipe(task->stdout_pipe));
+ PCHECK(pipe(task->stderr_pipe));
task->pid = fork();
if (task->pid == 0) {
+ PCHECK(close(task->stdout_pipe[0]));
+ PCHECK(close(task->stderr_pipe[0]));
+ PCHECK(dup2(task->stdout_pipe[1], STDOUT_FILENO));
+ PCHECK(dup2(task->stderr_pipe[1], STDERR_FILENO));
+ PCHECK(close(task->stdout_pipe[1]));
+ PCHECK(close(task->stderr_pipe[1]));
+
const char* args[] = {
task->shell.c_str(),
"-c",
@@ -172,14 +259,16 @@ void Para::RunCommands() {
PCHECK(execvp(args[0], const_cast<char* const*>(args)));
abort();
}
+
+ PCHECK(close(task->stdout_pipe[1]));
+ PCHECK(close(task->stderr_pipe[1]));
running_.insert(task);
}
}
-void StdinTaskProvider::PollFD(Para* para) {
- const int BUF_SIZE = 4096;
- char buf[BUF_SIZE];
- ssize_t r = read(STDIN_FILENO, buf, BUF_SIZE);
+void StdinTaskProvider::PollFD(Para* para, int fd) {
+ char buf[4096];
+ ssize_t r = read(fd, buf, sizeof(buf));
PCHECK(r);
if (r == 0) {
para->Done();
@@ -193,19 +282,136 @@ void StdinTaskProvider::PollFD(Para* para) {
break;
}
- Task task;
- task.shell = "/bin/sh";
- task.cmd = buf_.substr(0, index);
- if (task.cmd.empty())
+ Task* task = new Task();
+ task->shell = "/bin/sh";
+ task->cmd = buf_.substr(0, index);
+ if (task->cmd.empty())
continue;
para->AddTask(task);
buf_ = buf_.substr(index + 1);
}
}
+void StdinTaskProvider::OnFinished(Task* t) {
+ fprintf(stdout, "%s", t->stdout_buf.c_str());
+ fprintf(stderr, "%s", t->stderr_buf.c_str());
+}
+
+struct Runner {
+ string output;
+ string cmd;
+ string shell;
+ bool echo;
+ bool ignore_error;
+};
+
+static void recvData(int fd, void* d, size_t sz) {
+ size_t s = 0;
+ while (s < sz) {
+ ssize_t r = read(fd, reinterpret_cast<char*>(d) + s, sz - s);
+ if (r < 0 && errno == EINTR)
+ continue;
+ PCHECK(r);
+ if (r == 0) {
+ exit(1);
+ }
+ s += r;
+ }
+}
+
+static int recvInt(int fd) {
+ int v;
+ recvData(fd, &v, sizeof(v));
+ return v;
+}
+
+static void recvString(int fd, string* s) {
+ int l = recvInt(fd);
+ s->resize(l);
+ recvData(fd, &((*s)[0]), l);
+}
+
+static void recvTasks(int fd, vector<Task*>* tasks) {
+ int l = recvInt(fd);
+ for (int i = 0; i < l; i++) {
+ Task* r = new Task();
+ recvString(fd, &r->output);
+ recvString(fd, &r->cmd);
+ recvString(fd, &r->shell);
+ r->echo = recvInt(fd);
+ r->ignore_error = recvInt(fd);
+ tasks->push_back(r);
+ }
+}
+
+void KatiTaskProvider::PollFD(Para* para, int fd) {
+ vector<Task*> tasks;
+ recvTasks(fd, &tasks);
+ for (Task* t : tasks) {
+ para->AddTask(t);
+ }
+}
+
+static void sendData(int fd, const void* d, size_t sz) {
+ size_t s = 0;
+ while (s < sz) {
+ ssize_t r = write(fd, reinterpret_cast<const char*>(d) + s, sz - s);
+ if (r < 0 && errno == EINTR)
+ continue;
+ PCHECK(r);
+ if (r == 0) {
+ exit(1);
+ }
+ s += r;
+ }
+}
+
+static void sendInt(int fd, int v) {
+ sendData(fd, &v, sizeof(v));
+}
+
+static void sendString(int fd, const string& s) {
+ sendInt(fd, s.size());
+ sendData(fd, s.data(), s.size());
+}
+
+static void sendResult(int fd, Task* t) {
+ sendString(fd, t->output);
+ sendString(fd, t->stdout_buf);
+ sendString(fd, t->stderr_buf);
+ sendInt(fd, t->status);
+}
+
+void KatiTaskProvider::OnFinished(Task* t) {
+ sendResult(STDOUT_FILENO, t);
+}
+
+int GetNumCpus() {
+ // TODO: Implement.
+ return 4;
+}
+
int main(int argc, char* argv[]) {
+ int num_jobs = -1;
+ bool from_kati = false;
+ for (int i = 1; i < argc; i++) {
+ char* arg = argv[i];
+ if (!strncmp(arg, "-j", 2)) {
+ num_jobs = atoi(arg + 2);
+ } else if (!strcmp(arg, "--kati")) {
+ from_kati = true;
+ }
+ }
+ if (num_jobs < 0) {
+ num_jobs = GetNumCpus();
+ }
+
TaskProvider* provider = NULL;
- provider = new StdinTaskProvider();
- Para para(provider, 4);
+ if (from_kati) {
+ provider = new KatiTaskProvider();
+ } else {
+ provider = new StdinTaskProvider();
+ }
+ Para para(provider, num_jobs);
para.Loop();
}
diff --git a/para.go b/para.go
new file mode 100644
index 0000000..6985a68
--- /dev/null
+++ b/para.go
@@ -0,0 +1,165 @@
+package main
+
+import (
+ "bufio"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "os/exec"
+ "path/filepath"
+)
+
+func btoi(b bool) int {
+ if b {
+ return 1
+ } else {
+ return 0
+ }
+}
+
+func sendMsg(w io.Writer, data []byte) {
+ for len(data) != 0 {
+ written, err := w.Write(data)
+ if err == io.EOF {
+ return
+ }
+ if err != nil {
+ panic(err)
+ }
+ data = data[written:]
+ }
+}
+
+func sendInt(w io.Writer, i int) {
+ v := int32(i)
+ binary.Write(w, binary.LittleEndian, &v)
+}
+
+func sendString(w io.Writer, s string) {
+ sendInt(w, len(s))
+ sendMsg(w, []byte(s))
+}
+
+func sendRunners(w io.Writer, runners []runner) {
+ sendInt(w, len(runners))
+ for _, r := range runners {
+ sendString(w, r.output)
+ sendString(w, r.cmd)
+ sendString(w, r.shell)
+ sendInt(w, btoi(r.echo))
+ sendInt(w, btoi(r.ignoreError))
+ }
+}
+
+type ParaResult struct {
+ output string
+ stdout string
+ stderr string
+ status int
+}
+
+func recvInt(r *bufio.Reader) (int, error) {
+ var v int32
+ err := binary.Read(r, binary.LittleEndian, &v)
+ return int(v), err
+}
+
+func recvString(r *bufio.Reader) (string, error) {
+ l, err := recvInt(r)
+ if err != nil {
+ return "", err
+ }
+ buf := make([]byte, l)
+ read := 0
+ for read < len(buf) {
+ r, err := r.Read(buf[read:])
+ if err != nil {
+ return "", err
+ }
+ read += r
+ }
+ return string(buf), nil
+}
+
+func recvResult(r *bufio.Reader) (*ParaResult, error) {
+ output, err := recvString(r)
+ if err != nil {
+ return nil, err
+ }
+ stdout, err := recvString(r)
+ if err != nil {
+ return nil, err
+ }
+ stderr, err := recvString(r)
+ if err != nil {
+ return nil, err
+ }
+ status, err := recvInt(r)
+ if err != nil {
+ return nil, err
+ }
+ return &ParaResult{
+ output: output,
+ stdout: stdout,
+ stderr: stderr,
+ status: status,
+ }, nil
+}
+
+type ParaWorker struct {
+ para *exec.Cmd
+ paraChan chan *ParaResult
+ stdin io.WriteCloser
+ stdout *bufio.Reader
+ doneChan chan bool
+}
+
+func NewParaWorker(paraChan chan *ParaResult) *ParaWorker {
+ bin := filepath.Join(katiDir, "para")
+ para := exec.Command(bin, fmt.Sprintf("-j%d", jobsFlag), "--kati")
+ stdin, err := para.StdinPipe()
+ if err != nil {
+ panic(err)
+ }
+ stdout, err := para.StdoutPipe()
+ if err != nil {
+ panic(err)
+ }
+ err = para.Start()
+ if err != nil {
+ panic(err)
+ }
+ return &ParaWorker{
+ para: para,
+ paraChan: paraChan,
+ stdin: stdin,
+ stdout: bufio.NewReader(stdout),
+ doneChan: make(chan bool),
+ }
+}
+
+func (para *ParaWorker) Run() {
+ for {
+ r, err := recvResult(para.stdout)
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ panic(err)
+ }
+ para.paraChan <- r
+ }
+ para.para.Process.Kill()
+ para.para.Process.Wait()
+ para.doneChan <- true
+}
+
+func (para *ParaWorker) Wait() {
+ fmt.Printf("wait\n")
+ para.stdin.Close()
+ <-para.doneChan
+}
+
+func (para *ParaWorker) RunCommand(runners []runner) {
+ sendRunners(para.stdin, runners)
+}
diff --git a/para_test.go b/para_test.go
new file mode 100644
index 0000000..1475ce2
--- /dev/null
+++ b/para_test.go
@@ -0,0 +1,41 @@
+package main
+
+import (
+ "fmt"
+ "path/filepath"
+ "testing"
+)
+
+func TestPara(t *testing.T) {
+ cwd, err := filepath.Abs(".")
+ if err != nil {
+ panic(err)
+ }
+ katiDir = cwd
+
+ paraChan := make(chan *ParaResult)
+ para := NewParaWorker(paraChan)
+ go para.Run()
+
+ num_tasks := 100
+ for i := 0; i < num_tasks; i++ {
+ runners := []runner{
+ {
+ output: fmt.Sprintf("%d", i),
+ cmd: fmt.Sprintf("echo test%d 2>&1", i),
+ shell: "/bin/sh",
+ },
+ }
+ para.RunCommand(runners)
+ }
+
+ var results []*ParaResult
+ for len(results) != num_tasks {
+ select {
+ case r := <-paraChan:
+ results = append(results, r)
+ }
+ }
+
+ para.Wait()
+}
diff --git a/worker.go b/worker.go
index 5d3472f..f0220f6 100644
--- a/worker.go
+++ b/worker.go
@@ -3,6 +3,7 @@ package main
import (
"container/heap"
"fmt"
+ "os"
"os/exec"
"strings"
"syscall"
@@ -224,18 +225,24 @@ func (j Job) build() {
func (wm *WorkerManager) handleJobs() {
for {
- if len(wm.freeWorkers) == 0 {
+ if !useParaFlag && len(wm.freeWorkers) == 0 {
return
}
if wm.readyQueue.Len() == 0 {
return
}
j := heap.Pop(&wm.readyQueue).(*Job)
- j.numDeps = -1 // Do not let other workers pick this.
- w := wm.freeWorkers[0]
- wm.freeWorkers = wm.freeWorkers[1:]
- wm.busyWorkers[w] = true
- w.jobChan <- j
+
+ if useParaFlag {
+ wm.runnings[j.n.Output] = j
+ wm.para.RunCommand(j.createRunners())
+ } else {
+ j.numDeps = -1 // Do not let other workers pick this.
+ w := wm.freeWorkers[0]
+ wm.freeWorkers = wm.freeWorkers[1:]
+ wm.busyWorkers[w] = true
+ w.jobChan <- j
+ }
}
}
@@ -259,6 +266,10 @@ type WorkerManager struct {
doneChan chan bool
freeWorkers []*Worker
busyWorkers map[*Worker]bool
+ ex *Executor
+ para *ParaWorker
+ paraChan chan *ParaResult
+ runnings map[string]*Job
finishCnt int
}
@@ -272,12 +283,21 @@ func NewWorkerManager() *WorkerManager {
doneChan: make(chan bool),
busyWorkers: make(map[*Worker]bool),
}
- heap.Init(&wm.readyQueue)
- for i := 0; i < jobsFlag; i++ {
- w := NewWorker(wm)
- wm.freeWorkers = append(wm.freeWorkers, w)
- go w.Run()
+
+ if useParaFlag {
+ wm.runnings = make(map[string]*Job)
+ wm.paraChan = make(chan *ParaResult)
+ wm.para = NewParaWorker(wm.paraChan)
+ go wm.para.Run()
+ } else {
+ wm.busyWorkers = make(map[*Worker]bool)
+ for i := 0; i < jobsFlag; i++ {
+ w := NewWorker(wm)
+ wm.freeWorkers = append(wm.freeWorkers, w)
+ go w.Run()
+ }
}
+ heap.Init(&wm.readyQueue)
go wm.Run()
return wm
}
@@ -320,7 +340,7 @@ func (wm *WorkerManager) handleNewDep(j *Job, neededBy *Job) {
func (wm *WorkerManager) Run() {
done := false
- for wm.hasTodo() || len(wm.busyWorkers) > 0 || !done {
+ for wm.hasTodo() || len(wm.busyWorkers) > 0 || len(wm.runnings) > 0 || !done {
select {
case j := <-wm.jobChan:
j.id = len(wm.jobs) + 1
@@ -333,17 +353,38 @@ func (wm *WorkerManager) Run() {
wm.finishCnt++
case af := <-wm.newDepChan:
wm.handleNewDep(af.j, af.neededBy)
+ case pr := <-wm.paraChan:
+ os.Stdout.Write([]byte(pr.stdout))
+ os.Stderr.Write([]byte(pr.stderr))
+ j := wm.runnings[pr.output]
+ wm.updateParents(j)
+ delete(wm.runnings, pr.output)
+ wm.finishCnt++
case done = <-wm.waitChan:
}
wm.handleJobs()
- Log("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers))
- }
- for _, w := range wm.freeWorkers {
- w.Wait()
+ if useParaFlag {
+ numBusy := len(wm.runnings)
+ if numBusy > jobsFlag {
+ numBusy = jobsFlag
+ }
+ Log("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), jobsFlag-numBusy, numBusy)
+ } else {
+ Log("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers))
+ }
}
- for w := range wm.busyWorkers {
- w.Wait()
+
+ if useParaFlag {
+ Log("Wait for para to finish")
+ wm.para.Wait()
+ } else {
+ for _, w := range wm.freeWorkers {
+ w.Wait()
+ }
+ for w := range wm.busyWorkers {
+ w.Wait()
+ }
}
wm.doneChan <- true
}