diff options
Diffstat (limited to 'gcc-4.8/libgo/go/old/netchan/import.go')
-rw-r--r-- | gcc-4.8/libgo/go/old/netchan/import.go | 287 |
1 files changed, 0 insertions, 287 deletions
diff --git a/gcc-4.8/libgo/go/old/netchan/import.go b/gcc-4.8/libgo/go/old/netchan/import.go deleted file mode 100644 index 50abaa9fa..000000000 --- a/gcc-4.8/libgo/go/old/netchan/import.go +++ /dev/null @@ -1,287 +0,0 @@ -// Copyright 2010 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package netchan - -import ( - "errors" - "io" - "log" - "net" - "reflect" - "sync" - "time" -) - -// Import - -// impLog is a logging convenience function. The first argument must be a string. -func impLog(args ...interface{}) { - args[0] = "netchan import: " + args[0].(string) - log.Print(args...) -} - -// An Importer allows a set of channels to be imported from a single -// remote machine/network port. A machine may have multiple -// importers, even from the same machine/network port. -type Importer struct { - *encDec - chanLock sync.Mutex // protects access to channel map - names map[string]*netChan - chans map[int]*netChan - errors chan error - maxId int - mu sync.Mutex // protects remaining fields - unacked int64 // number of unacknowledged sends. - seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu -} - -// NewImporter creates a new Importer object to import a set of channels -// from the given connection. The Exporter must be available and serving when -// the Importer is created. -func NewImporter(conn io.ReadWriter) *Importer { - imp := new(Importer) - imp.encDec = newEncDec(conn) - imp.chans = make(map[int]*netChan) - imp.names = make(map[string]*netChan) - imp.errors = make(chan error, 10) - imp.unacked = 0 - go imp.run() - return imp -} - -// Import imports a set of channels from the given network and address. -func Import(network, remoteaddr string) (*Importer, error) { - conn, err := net.Dial(network, remoteaddr) - if err != nil { - return nil, err - } - return NewImporter(conn), nil -} - -// shutdown closes all channels for which we are receiving data from the remote side. -func (imp *Importer) shutdown() { - imp.chanLock.Lock() - for _, ich := range imp.chans { - if ich.dir == Recv { - ich.close() - } - } - imp.chanLock.Unlock() -} - -// Handle the data from a single imported data stream, which will -// have the form -// (response, data)* -// The response identifies by name which channel is transmitting data. -func (imp *Importer) run() { - // Loop on responses; requests are sent by ImportNValues() - hdr := new(header) - hdrValue := reflect.ValueOf(hdr) - ackHdr := new(header) - err := new(error_) - errValue := reflect.ValueOf(err) - for { - *hdr = header{} - if e := imp.decode(hdrValue); e != nil { - if e != io.EOF { - impLog("header:", e) - imp.shutdown() - } - return - } - switch hdr.PayloadType { - case payData: - // done lower in loop - case payError: - if e := imp.decode(errValue); e != nil { - impLog("error:", e) - return - } - if err.Error != "" { - impLog("response error:", err.Error) - select { - case imp.errors <- errors.New(err.Error): - continue // errors are not acknowledged - default: - imp.shutdown() - return - } - } - case payClosed: - nch := imp.getChan(hdr.Id, false) - if nch != nil { - nch.close() - } - continue // closes are not acknowledged. - case payAckSend: - // we can receive spurious acks if the channel is - // hung up, so we ask getChan to ignore any errors. - nch := imp.getChan(hdr.Id, true) - if nch != nil { - nch.acked() - imp.mu.Lock() - imp.unacked-- - imp.mu.Unlock() - } - continue - default: - impLog("unexpected payload type:", hdr.PayloadType) - return - } - nch := imp.getChan(hdr.Id, false) - if nch == nil { - continue - } - if nch.dir != Recv { - impLog("cannot happen: receive from non-Recv channel") - return - } - // Acknowledge receipt - ackHdr.Id = hdr.Id - ackHdr.SeqNum = hdr.SeqNum - imp.encode(ackHdr, payAck, nil) - // Create a new value for each received item. - value := reflect.New(nch.ch.Type().Elem()).Elem() - if e := imp.decode(value); e != nil { - impLog("importer value decode:", e) - return - } - nch.send(value) - } -} - -func (imp *Importer) getChan(id int, errOk bool) *netChan { - imp.chanLock.Lock() - ich := imp.chans[id] - imp.chanLock.Unlock() - if ich == nil { - if !errOk { - impLog("unknown id in netchan request: ", id) - } - return nil - } - return ich -} - -// Errors returns a channel from which transmission and protocol errors -// can be read. Clients of the importer are not required to read the error -// channel for correct execution. However, if too many errors occur -// without being read from the error channel, the importer will shut down. -func (imp *Importer) Errors() chan error { - return imp.errors -} - -// Import imports a channel of the given type, size and specified direction. -// It is equivalent to ImportNValues with a count of -1, meaning unbounded. -func (imp *Importer) Import(name string, chT interface{}, dir Dir, size int) error { - return imp.ImportNValues(name, chT, dir, size, -1) -} - -// ImportNValues imports a channel of the given type and specified -// direction and then receives or transmits up to n values on that -// channel. A value of n==-1 implies an unbounded number of values. The -// channel will have buffer space for size values, or 1 value if size < 1. -// The channel to be bound to the remote site's channel is provided -// in the call and may be of arbitrary channel type. -// Despite the literal signature, the effective signature is -// ImportNValues(name string, chT chan T, dir Dir, size, n int) error -// Example usage: -// imp, err := NewImporter("tcp", "netchanserver.mydomain.com:1234") -// if err != nil { log.Fatal(err) } -// ch := make(chan myType) -// err = imp.ImportNValues("name", ch, Recv, 1, 1) -// if err != nil { log.Fatal(err) } -// fmt.Printf("%+v\n", <-ch) -func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, n int) error { - ch, err := checkChan(chT, dir) - if err != nil { - return err - } - imp.chanLock.Lock() - defer imp.chanLock.Unlock() - _, present := imp.names[name] - if present { - return errors.New("channel name already being imported:" + name) - } - if size < 1 { - size = 1 - } - id := imp.maxId - imp.maxId++ - nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n)) - imp.names[name] = nch - imp.chans[id] = nch - // Tell the other side about this channel. - hdr := &header{Id: id} - req := &request{Name: name, Count: int64(n), Dir: dir, Size: size} - if err = imp.encode(hdr, payRequest, req); err != nil { - impLog("request encode:", err) - return err - } - if dir == Send { - go func() { - for i := 0; n == -1 || i < n; i++ { - val, ok := nch.recv() - if !ok { - if err = imp.encode(hdr, payClosed, nil); err != nil { - impLog("error encoding client closed message:", err) - } - return - } - // We hold the lock during transmission to guarantee messages are - // sent in order. - imp.mu.Lock() - imp.unacked++ - imp.seqLock.Lock() - imp.mu.Unlock() - if err = imp.encode(hdr, payData, val.Interface()); err != nil { - impLog("error encoding client send:", err) - return - } - imp.seqLock.Unlock() - } - }() - } - return nil -} - -// Hangup disassociates the named channel from the Importer and closes -// the channel. Messages in flight for the channel may be dropped. -func (imp *Importer) Hangup(name string) error { - imp.chanLock.Lock() - defer imp.chanLock.Unlock() - nc := imp.names[name] - if nc == nil { - return errors.New("netchan import: hangup: no such channel: " + name) - } - delete(imp.names, name) - delete(imp.chans, nc.id) - nc.close() - return nil -} - -func (imp *Importer) unackedCount() int64 { - imp.mu.Lock() - n := imp.unacked - imp.mu.Unlock() - return n -} - -// Drain waits until all messages sent from this exporter/importer, including -// those not yet sent to any server and possibly including those sent while -// Drain was executing, have been received by the exporter. In short, it -// waits until all the importer's messages have been received. -// If the timeout (measured in nanoseconds) is positive and Drain takes -// longer than that to complete, an error is returned. -func (imp *Importer) Drain(timeout int64) error { - deadline := time.Now().Add(time.Duration(timeout)) - for imp.unackedCount() > 0 { - if timeout > 0 && time.Now().After(deadline) { - return errors.New("timeout") - } - time.Sleep(100 * time.Millisecond) - } - return nil -} |