// Copyright 2009 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Pipe adapter to connect code expecting an io.Reader // with code expecting an io.Writer. package io import ( "errors" "sync" ) // ErrClosedPipe is the error used for read or write operations on a closed pipe. var ErrClosedPipe = errors.New("io: read/write on closed pipe") type pipeResult struct { n int err error } // A pipe is the shared pipe structure underlying PipeReader and PipeWriter. type pipe struct { rl sync.Mutex // gates readers one at a time wl sync.Mutex // gates writers one at a time l sync.Mutex // protects remaining fields data []byte // data remaining in pending write rwait sync.Cond // waiting reader wwait sync.Cond // waiting writer rerr error // if reader closed, error to give writes werr error // if writer closed, error to give reads } func (p *pipe) read(b []byte) (n int, err error) { // One reader at a time. p.rl.Lock() defer p.rl.Unlock() p.l.Lock() defer p.l.Unlock() for { if p.rerr != nil { return 0, ErrClosedPipe } if p.data != nil { break } if p.werr != nil { return 0, p.werr } p.rwait.Wait() } n = copy(b, p.data) p.data = p.data[n:] if len(p.data) == 0 { p.data = nil p.wwait.Signal() } return } var zero [0]byte func (p *pipe) write(b []byte) (n int, err error) { // pipe uses nil to mean not available if b == nil { b = zero[:] } // One writer at a time. p.wl.Lock() defer p.wl.Unlock() p.l.Lock() defer p.l.Unlock() p.data = b p.rwait.Signal() for { if p.data == nil { break } if p.rerr != nil { err = p.rerr break } if p.werr != nil { err = ErrClosedPipe } p.wwait.Wait() } n = len(b) - len(p.data) p.data = nil // in case of rerr or werr return } func (p *pipe) rclose(err error) { if err == nil { err = ErrClosedPipe } p.l.Lock() defer p.l.Unlock() p.rerr = err p.rwait.Signal() p.wwait.Signal() } func (p *pipe) wclose(err error) { if err == nil { err = EOF } p.l.Lock() defer p.l.Unlock() p.werr = err p.rwait.Signal() p.wwait.Signal() } // A PipeReader is the read half of a pipe. type PipeReader struct { p *pipe } // Read implements the standard Read interface: // it reads data from the pipe, blocking until a writer // arrives or the write end is closed. // If the write end is closed with an error, that error is // returned as err; otherwise err is EOF. func (r *PipeReader) Read(data []byte) (n int, err error) { return r.p.read(data) } // Close closes the reader; subsequent writes to the // write half of the pipe will return the error ErrClosedPipe. func (r *PipeReader) Close() error { return r.CloseWithError(nil) } // CloseWithError closes the reader; subsequent writes // to the write half of the pipe will return the error err. func (r *PipeReader) CloseWithError(err error) error { r.p.rclose(err) return nil } // A PipeWriter is the write half of a pipe. type PipeWriter struct { p *pipe } // Write implements the standard Write interface: // it writes data to the pipe, blocking until readers // have consumed all the data or the read end is closed. // If the read end is closed with an error, that err is // returned as err; otherwise err is ErrClosedPipe. func (w *PipeWriter) Write(data []byte) (n int, err error) { return w.p.write(data) } // Close closes the writer; subsequent reads from the // read half of the pipe will return no bytes and EOF. func (w *PipeWriter) Close() error { return w.CloseWithError(nil) } // CloseWithError closes the writer; subsequent reads from the // read half of the pipe will return no bytes and the error err. func (w *PipeWriter) CloseWithError(err error) error { w.p.wclose(err) return nil } // Pipe creates a synchronous in-memory pipe. // It can be used to connect code expecting an io.Reader // with code expecting an io.Writer. // Reads on one end are matched with writes on the other, // copying data directly between the two; there is no internal buffering. // It is safe to call Read and Write in parallel with each other or with // Close. Close will complete once pending I/O is done. Parallel calls to // Read, and parallel calls to Write, are also safe: // the individual calls will be gated sequentially. func Pipe() (*PipeReader, *PipeWriter) { p := new(pipe) p.rwait.L = &p.l p.wwait.L = &p.l r := &PipeReader{p} w := &PipeWriter{p} return r, w }