diff options
author | Shinichiro Hamaji <shinichiro.hamaji@gmail.com> | 2015-05-13 17:03:20 +0900 |
---|---|---|
committer | Shinichiro Hamaji <shinichiro.hamaji@gmail.com> | 2015-05-13 17:06:44 +0900 |
commit | cedc5c87f6366c8ea9c14f0bf95e0257222f55ca (patch) | |
tree | c93bf6f16d0eceaa0bf27f2c1afb0fb918e9f5db | |
parent | 5e07dd38678eb8ca7fb3157a882c28d0d12fa8f0 (diff) | |
download | android_build_kati-cedc5c87f6366c8ea9c14f0bf95e0257222f55ca.tar.gz android_build_kati-cedc5c87f6366c8ea9c14f0bf95e0257222f55ca.tar.bz2 android_build_kati-cedc5c87f6366c8ea9c14f0bf95e0257222f55ca.zip |
Add para.go and para_test.go
-rw-r--r-- | Makefile | 4 | ||||
-rw-r--r-- | exec.go | 2 | ||||
-rw-r--r-- | main.go | 19 | ||||
-rw-r--r-- | para.cc | 260 | ||||
-rw-r--r-- | para.go | 165 | ||||
-rw-r--r-- | para_test.go | 41 | ||||
-rw-r--r-- | worker.go | 77 |
7 files changed, 520 insertions, 48 deletions
@@ -5,11 +5,11 @@ all: kati go_test para kati: $(GOSRC) env $(shell go env) go build -o $@ *.go -go_test: $(GOSRC) +go_test: $(GOSRC) para env $(shell go env) go test *.go para: para.cc - $(CXX) -std=c++11 -g -O -MMD -o $@ $< + $(CXX) -std=c++11 -g -O -W -Wall -MMD -o $@ $< test: all ruby runtest.rb @@ -292,7 +292,7 @@ func EvalCommands(nodes []*DepNode, vars Vars) { runners, hasIO := ex.createRunners(n, true) if hasIO { ioCnt++ - if ioCnt % 100 == 0 { + if ioCnt%100 == 0 { LogStats("%d/%d rules have IO", ioCnt, i+1) } continue @@ -5,6 +5,7 @@ import ( "flag" "fmt" "os" + "path/filepath" "runtime" "runtime/pprof" "strings" @@ -29,6 +30,9 @@ var ( syntaxCheckOnlyFlag bool queryFlag string eagerCmdEvalFlag bool + useParaFlag bool + + katiDir string ) func parseFlags() { @@ -51,6 +55,7 @@ func parseFlags() { flag.BoolVar(&katiStatsFlag, "kati_stats", false, "Show a bunch of statistics") flag.BoolVar(&katiEvalStatsFlag, "kati_eval_stats", false, "Show eval statistics") flag.BoolVar(&eagerCmdEvalFlag, "eager_cmd_eval", false, "Eval commands first.") + flag.BoolVar(&useParaFlag, "use_para", false, "Use para.") flag.BoolVar(&syntaxCheckOnlyFlag, "c", false, "Syntax check only.") flag.StringVar(&queryFlag, "query", "", "Show the target info") flag.Parse() @@ -188,8 +193,22 @@ func getDepGraph(clvars []string, targets []string) ([]*DepNode, Vars) { return nodes, vars } +func findKatiDir() { + switch runtime.GOOS { + case "linux": + kati, err := os.Readlink("/proc/self/exe") + if err != nil { + panic(err) + } + katiDir = filepath.Dir(kati) + default: + panic(fmt.Sprintf("unknown OS: %s", runtime.GOOS)) + } +} + func main() { runtime.GOMAXPROCS(runtime.NumCPU()) + findKatiDir() parseFlags() if cpuprofile != "" { f, err := os.Create(cpuprofile) @@ -1,6 +1,7 @@ #include <signal.h> #include <stdio.h> #include <stdlib.h> +#include <string.h> #include <sys/types.h> #include <sys/wait.h> #include <unistd.h> @@ -29,23 +30,42 @@ using namespace std; class Para; struct Task { + Task() : echo(false), ignore_error(false) {} + ~Task() { + if (stdout_pipe[0] >= 0) + PCHECK(close(stdout_pipe[0])); + if (stderr_pipe[0] >= 0) + PCHECK(close(stderr_pipe[0])); + } + + string output; string shell; string cmd; + bool echo; + bool ignore_error; + pid_t pid; + int stdout_pipe[2]; + int stderr_pipe[2]; + string stdout_buf; + string stderr_buf; + int status; }; class TaskProvider { public: virtual ~TaskProvider() {} virtual int GetFD() = 0; - virtual void PollFD(Para* para) = 0; + virtual void PollFD(Para* para, int fd) = 0; + virtual void OnFinished(Task* t) = 0; }; class Para { public: Para(TaskProvider* provider, int num_jobs); + ~Para(); - void AddTask(const Task& task); + void AddTask(Task* task); void Loop(); @@ -58,7 +78,7 @@ class Para { void RunCommands(); TaskProvider* provider_; - int num_jobs_; + size_t num_jobs_; int sig_pipe_[2]; int provider_fd_; queue<Task*> tasks_; @@ -70,7 +90,19 @@ class StdinTaskProvider : public TaskProvider { public: virtual ~StdinTaskProvider() {} virtual int GetFD() { return STDIN_FILENO; } - virtual void PollFD(Para* para); + virtual void PollFD(Para* para, int fd); + virtual void OnFinished(Task* t); + + private: + string buf_; +}; + +class KatiTaskProvider : public TaskProvider { + public: + virtual ~KatiTaskProvider() {} + virtual int GetFD() { return STDIN_FILENO; } + virtual void PollFD(Para* para, int fd); + virtual void OnFinished(Task* t); private: string buf_; @@ -88,21 +120,44 @@ Para::Para(TaskProvider* provider, int num_jobs) sigemptyset(&sigmask); sigaddset(&sigmask, SIGCHLD); PCHECK(sigprocmask(SIG_BLOCK, &sigmask, NULL)); - PCHECK(signal(SIGCHLD, &Para::WakeUp)); + CHECK(signal(SIGCHLD, &Para::WakeUp) != SIG_ERR); provider_fd_ = provider_->GetFD(); } -void Para::AddTask(const Task& task) { - Task* t = new Task(task); - t->pid = 0; - tasks_.push(t); +Para::~Para() { + PCHECK(close(sig_pipe_[0])); + PCHECK(close(sig_pipe_[1])); +} + +void Para::AddTask(Task* task) { + task->pid = 0; + tasks_.push(task); } static void SetFd(int fd, fd_set* fdset, int* nfds) { + if (fd < 0) + return; FD_SET(fd, fdset); *nfds = max(*nfds, fd); } +static void readOutput(int* fd, string* buf) { + char b[4096]; + ssize_t r = read(*fd, b, sizeof(b)); + if (r < 0 && errno != EINTR) + return; + PCHECK(r); + if (r == 0) { + PCHECK(close(*fd)); + *fd = -1; + return; + } + + size_t l = buf->size(); + buf->resize(l + r); + memcpy(&((*buf)[l]), b, r); +} + void Para::Loop() { sigset_t sigmask; sigemptyset(&sigmask); @@ -114,17 +169,29 @@ void Para::Loop() { if (!done_) SetFd(provider_fd_, &rd, &nfds); SetFd(sig_pipe_[0], &rd, &nfds); + for (Task* t : running_) { + SetFd(t->stdout_pipe[0], &rd, &nfds); + SetFd(t->stderr_pipe[0], &rd, &nfds); + } int r = pselect(nfds, &rd, NULL, NULL, NULL, &sigmask); PCHECK(r && errno != EINTR); - if (FD_ISSET(provider_fd_, &rd)) { - provider_->PollFD(this); - } if (FD_ISSET(sig_pipe_[0], &rd)) { WaitChildren(); + RunCommands(); + continue; + } + if (FD_ISSET(provider_fd_, &rd)) { + provider_->PollFD(this, provider_fd_); + RunCommands(); + continue; + } + for (Task* t : running_) { + if (t->stdout_pipe[0] >= 0 && FD_ISSET(t->stdout_pipe[0], &rd)) + readOutput(&t->stdout_pipe[0], &t->stdout_buf); + if (t->stderr_pipe[0] >= 0 && FD_ISSET(t->stderr_pipe[0], &rd)) + readOutput(&t->stderr_pipe[0], &t->stderr_buf); } - - RunCommands(); } } @@ -138,31 +205,51 @@ void Para::WakeUp(int) { } void Para::WaitChildren() { + char c = 0; + PCHECK(read(sig_pipe_[0], &c, 1)); + CHECK(c == 42); + vector<Task*> finished; for (Task* task : running_) { - int status; - pid_t pid = waitpid(task->pid, &status, WNOHANG); + pid_t pid = waitpid(task->pid, &task->status, WNOHANG); PCHECK(pid); if (pid == 0) { continue; } CHECK(pid == task->pid); + + while (task->stdout_pipe[0] >= 0) + readOutput(&task->stdout_pipe[0], &task->stdout_buf); + while (task->stderr_pipe[0] >= 0) + readOutput(&task->stderr_pipe[0], &task->stderr_buf); + // TODO: Handle error. finished.push_back(task); } for (Task* task : finished) { running_.erase(task); + provider_->OnFinished(task); delete task; } } void Para::RunCommands() { - while (!tasks_.empty() && running_.size() < num_jobs_) { + while (!tasks_.empty() && (running_.size() < num_jobs_ || num_jobs_ == 0)) { Task* task = tasks_.front(); tasks_.pop(); + + PCHECK(pipe(task->stdout_pipe)); + PCHECK(pipe(task->stderr_pipe)); task->pid = fork(); if (task->pid == 0) { + PCHECK(close(task->stdout_pipe[0])); + PCHECK(close(task->stderr_pipe[0])); + PCHECK(dup2(task->stdout_pipe[1], STDOUT_FILENO)); + PCHECK(dup2(task->stderr_pipe[1], STDERR_FILENO)); + PCHECK(close(task->stdout_pipe[1])); + PCHECK(close(task->stderr_pipe[1])); + const char* args[] = { task->shell.c_str(), "-c", @@ -172,14 +259,16 @@ void Para::RunCommands() { PCHECK(execvp(args[0], const_cast<char* const*>(args))); abort(); } + + PCHECK(close(task->stdout_pipe[1])); + PCHECK(close(task->stderr_pipe[1])); running_.insert(task); } } -void StdinTaskProvider::PollFD(Para* para) { - const int BUF_SIZE = 4096; - char buf[BUF_SIZE]; - ssize_t r = read(STDIN_FILENO, buf, BUF_SIZE); +void StdinTaskProvider::PollFD(Para* para, int fd) { + char buf[4096]; + ssize_t r = read(fd, buf, sizeof(buf)); PCHECK(r); if (r == 0) { para->Done(); @@ -193,19 +282,136 @@ void StdinTaskProvider::PollFD(Para* para) { break; } - Task task; - task.shell = "/bin/sh"; - task.cmd = buf_.substr(0, index); - if (task.cmd.empty()) + Task* task = new Task(); + task->shell = "/bin/sh"; + task->cmd = buf_.substr(0, index); + if (task->cmd.empty()) continue; para->AddTask(task); buf_ = buf_.substr(index + 1); } } +void StdinTaskProvider::OnFinished(Task* t) { + fprintf(stdout, "%s", t->stdout_buf.c_str()); + fprintf(stderr, "%s", t->stderr_buf.c_str()); +} + +struct Runner { + string output; + string cmd; + string shell; + bool echo; + bool ignore_error; +}; + +static void recvData(int fd, void* d, size_t sz) { + size_t s = 0; + while (s < sz) { + ssize_t r = read(fd, reinterpret_cast<char*>(d) + s, sz - s); + if (r < 0 && errno == EINTR) + continue; + PCHECK(r); + if (r == 0) { + exit(1); + } + s += r; + } +} + +static int recvInt(int fd) { + int v; + recvData(fd, &v, sizeof(v)); + return v; +} + +static void recvString(int fd, string* s) { + int l = recvInt(fd); + s->resize(l); + recvData(fd, &((*s)[0]), l); +} + +static void recvTasks(int fd, vector<Task*>* tasks) { + int l = recvInt(fd); + for (int i = 0; i < l; i++) { + Task* r = new Task(); + recvString(fd, &r->output); + recvString(fd, &r->cmd); + recvString(fd, &r->shell); + r->echo = recvInt(fd); + r->ignore_error = recvInt(fd); + tasks->push_back(r); + } +} + +void KatiTaskProvider::PollFD(Para* para, int fd) { + vector<Task*> tasks; + recvTasks(fd, &tasks); + for (Task* t : tasks) { + para->AddTask(t); + } +} + +static void sendData(int fd, const void* d, size_t sz) { + size_t s = 0; + while (s < sz) { + ssize_t r = write(fd, reinterpret_cast<const char*>(d) + s, sz - s); + if (r < 0 && errno == EINTR) + continue; + PCHECK(r); + if (r == 0) { + exit(1); + } + s += r; + } +} + +static void sendInt(int fd, int v) { + sendData(fd, &v, sizeof(v)); +} + +static void sendString(int fd, const string& s) { + sendInt(fd, s.size()); + sendData(fd, s.data(), s.size()); +} + +static void sendResult(int fd, Task* t) { + sendString(fd, t->output); + sendString(fd, t->stdout_buf); + sendString(fd, t->stderr_buf); + sendInt(fd, t->status); +} + +void KatiTaskProvider::OnFinished(Task* t) { + sendResult(STDOUT_FILENO, t); +} + +int GetNumCpus() { + // TODO: Implement. + return 4; +} + int main(int argc, char* argv[]) { + int num_jobs = -1; + bool from_kati = false; + for (int i = 1; i < argc; i++) { + char* arg = argv[i]; + if (!strncmp(arg, "-j", 2)) { + num_jobs = atoi(arg + 2); + } else if (!strcmp(arg, "--kati")) { + from_kati = true; + } + } + if (num_jobs < 0) { + num_jobs = GetNumCpus(); + } + TaskProvider* provider = NULL; - provider = new StdinTaskProvider(); - Para para(provider, 4); + if (from_kati) { + provider = new KatiTaskProvider(); + } else { + provider = new StdinTaskProvider(); + } + Para para(provider, num_jobs); para.Loop(); } @@ -0,0 +1,165 @@ +package main + +import ( + "bufio" + "encoding/binary" + "fmt" + "io" + "os/exec" + "path/filepath" +) + +func btoi(b bool) int { + if b { + return 1 + } else { + return 0 + } +} + +func sendMsg(w io.Writer, data []byte) { + for len(data) != 0 { + written, err := w.Write(data) + if err == io.EOF { + return + } + if err != nil { + panic(err) + } + data = data[written:] + } +} + +func sendInt(w io.Writer, i int) { + v := int32(i) + binary.Write(w, binary.LittleEndian, &v) +} + +func sendString(w io.Writer, s string) { + sendInt(w, len(s)) + sendMsg(w, []byte(s)) +} + +func sendRunners(w io.Writer, runners []runner) { + sendInt(w, len(runners)) + for _, r := range runners { + sendString(w, r.output) + sendString(w, r.cmd) + sendString(w, r.shell) + sendInt(w, btoi(r.echo)) + sendInt(w, btoi(r.ignoreError)) + } +} + +type ParaResult struct { + output string + stdout string + stderr string + status int +} + +func recvInt(r *bufio.Reader) (int, error) { + var v int32 + err := binary.Read(r, binary.LittleEndian, &v) + return int(v), err +} + +func recvString(r *bufio.Reader) (string, error) { + l, err := recvInt(r) + if err != nil { + return "", err + } + buf := make([]byte, l) + read := 0 + for read < len(buf) { + r, err := r.Read(buf[read:]) + if err != nil { + return "", err + } + read += r + } + return string(buf), nil +} + +func recvResult(r *bufio.Reader) (*ParaResult, error) { + output, err := recvString(r) + if err != nil { + return nil, err + } + stdout, err := recvString(r) + if err != nil { + return nil, err + } + stderr, err := recvString(r) + if err != nil { + return nil, err + } + status, err := recvInt(r) + if err != nil { + return nil, err + } + return &ParaResult{ + output: output, + stdout: stdout, + stderr: stderr, + status: status, + }, nil +} + +type ParaWorker struct { + para *exec.Cmd + paraChan chan *ParaResult + stdin io.WriteCloser + stdout *bufio.Reader + doneChan chan bool +} + +func NewParaWorker(paraChan chan *ParaResult) *ParaWorker { + bin := filepath.Join(katiDir, "para") + para := exec.Command(bin, fmt.Sprintf("-j%d", jobsFlag), "--kati") + stdin, err := para.StdinPipe() + if err != nil { + panic(err) + } + stdout, err := para.StdoutPipe() + if err != nil { + panic(err) + } + err = para.Start() + if err != nil { + panic(err) + } + return &ParaWorker{ + para: para, + paraChan: paraChan, + stdin: stdin, + stdout: bufio.NewReader(stdout), + doneChan: make(chan bool), + } +} + +func (para *ParaWorker) Run() { + for { + r, err := recvResult(para.stdout) + if err == io.EOF { + break + } + if err != nil { + panic(err) + } + para.paraChan <- r + } + para.para.Process.Kill() + para.para.Process.Wait() + para.doneChan <- true +} + +func (para *ParaWorker) Wait() { + fmt.Printf("wait\n") + para.stdin.Close() + <-para.doneChan +} + +func (para *ParaWorker) RunCommand(runners []runner) { + sendRunners(para.stdin, runners) +} diff --git a/para_test.go b/para_test.go new file mode 100644 index 0000000..1475ce2 --- /dev/null +++ b/para_test.go @@ -0,0 +1,41 @@ +package main + +import ( + "fmt" + "path/filepath" + "testing" +) + +func TestPara(t *testing.T) { + cwd, err := filepath.Abs(".") + if err != nil { + panic(err) + } + katiDir = cwd + + paraChan := make(chan *ParaResult) + para := NewParaWorker(paraChan) + go para.Run() + + num_tasks := 100 + for i := 0; i < num_tasks; i++ { + runners := []runner{ + { + output: fmt.Sprintf("%d", i), + cmd: fmt.Sprintf("echo test%d 2>&1", i), + shell: "/bin/sh", + }, + } + para.RunCommand(runners) + } + + var results []*ParaResult + for len(results) != num_tasks { + select { + case r := <-paraChan: + results = append(results, r) + } + } + + para.Wait() +} @@ -3,6 +3,7 @@ package main import ( "container/heap" "fmt" + "os" "os/exec" "strings" "syscall" @@ -224,18 +225,24 @@ func (j Job) build() { func (wm *WorkerManager) handleJobs() { for { - if len(wm.freeWorkers) == 0 { + if !useParaFlag && len(wm.freeWorkers) == 0 { return } if wm.readyQueue.Len() == 0 { return } j := heap.Pop(&wm.readyQueue).(*Job) - j.numDeps = -1 // Do not let other workers pick this. - w := wm.freeWorkers[0] - wm.freeWorkers = wm.freeWorkers[1:] - wm.busyWorkers[w] = true - w.jobChan <- j + + if useParaFlag { + wm.runnings[j.n.Output] = j + wm.para.RunCommand(j.createRunners()) + } else { + j.numDeps = -1 // Do not let other workers pick this. + w := wm.freeWorkers[0] + wm.freeWorkers = wm.freeWorkers[1:] + wm.busyWorkers[w] = true + w.jobChan <- j + } } } @@ -259,6 +266,10 @@ type WorkerManager struct { doneChan chan bool freeWorkers []*Worker busyWorkers map[*Worker]bool + ex *Executor + para *ParaWorker + paraChan chan *ParaResult + runnings map[string]*Job finishCnt int } @@ -272,12 +283,21 @@ func NewWorkerManager() *WorkerManager { doneChan: make(chan bool), busyWorkers: make(map[*Worker]bool), } - heap.Init(&wm.readyQueue) - for i := 0; i < jobsFlag; i++ { - w := NewWorker(wm) - wm.freeWorkers = append(wm.freeWorkers, w) - go w.Run() + + if useParaFlag { + wm.runnings = make(map[string]*Job) + wm.paraChan = make(chan *ParaResult) + wm.para = NewParaWorker(wm.paraChan) + go wm.para.Run() + } else { + wm.busyWorkers = make(map[*Worker]bool) + for i := 0; i < jobsFlag; i++ { + w := NewWorker(wm) + wm.freeWorkers = append(wm.freeWorkers, w) + go w.Run() + } } + heap.Init(&wm.readyQueue) go wm.Run() return wm } @@ -320,7 +340,7 @@ func (wm *WorkerManager) handleNewDep(j *Job, neededBy *Job) { func (wm *WorkerManager) Run() { done := false - for wm.hasTodo() || len(wm.busyWorkers) > 0 || !done { + for wm.hasTodo() || len(wm.busyWorkers) > 0 || len(wm.runnings) > 0 || !done { select { case j := <-wm.jobChan: j.id = len(wm.jobs) + 1 @@ -333,17 +353,38 @@ func (wm *WorkerManager) Run() { wm.finishCnt++ case af := <-wm.newDepChan: wm.handleNewDep(af.j, af.neededBy) + case pr := <-wm.paraChan: + os.Stdout.Write([]byte(pr.stdout)) + os.Stderr.Write([]byte(pr.stderr)) + j := wm.runnings[pr.output] + wm.updateParents(j) + delete(wm.runnings, pr.output) + wm.finishCnt++ case done = <-wm.waitChan: } wm.handleJobs() - Log("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers)) - } - for _, w := range wm.freeWorkers { - w.Wait() + if useParaFlag { + numBusy := len(wm.runnings) + if numBusy > jobsFlag { + numBusy = jobsFlag + } + Log("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), jobsFlag-numBusy, numBusy) + } else { + Log("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers)) + } } - for w := range wm.busyWorkers { - w.Wait() + + if useParaFlag { + Log("Wait for para to finish") + wm.para.Wait() + } else { + for _, w := range wm.freeWorkers { + w.Wait() + } + for w := range wm.busyWorkers { + w.Wait() + } } wm.doneChan <- true } |