aboutsummaryrefslogtreecommitdiffstats
path: root/gcc-4.8/libgo/go/old/netchan/import.go
diff options
context:
space:
mode:
Diffstat (limited to 'gcc-4.8/libgo/go/old/netchan/import.go')
-rw-r--r--gcc-4.8/libgo/go/old/netchan/import.go287
1 files changed, 0 insertions, 287 deletions
diff --git a/gcc-4.8/libgo/go/old/netchan/import.go b/gcc-4.8/libgo/go/old/netchan/import.go
deleted file mode 100644
index 50abaa9fa..000000000
--- a/gcc-4.8/libgo/go/old/netchan/import.go
+++ /dev/null
@@ -1,287 +0,0 @@
-// Copyright 2010 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package netchan
-
-import (
- "errors"
- "io"
- "log"
- "net"
- "reflect"
- "sync"
- "time"
-)
-
-// Import
-
-// impLog is a logging convenience function. The first argument must be a string.
-func impLog(args ...interface{}) {
- args[0] = "netchan import: " + args[0].(string)
- log.Print(args...)
-}
-
-// An Importer allows a set of channels to be imported from a single
-// remote machine/network port. A machine may have multiple
-// importers, even from the same machine/network port.
-type Importer struct {
- *encDec
- chanLock sync.Mutex // protects access to channel map
- names map[string]*netChan
- chans map[int]*netChan
- errors chan error
- maxId int
- mu sync.Mutex // protects remaining fields
- unacked int64 // number of unacknowledged sends.
- seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu
-}
-
-// NewImporter creates a new Importer object to import a set of channels
-// from the given connection. The Exporter must be available and serving when
-// the Importer is created.
-func NewImporter(conn io.ReadWriter) *Importer {
- imp := new(Importer)
- imp.encDec = newEncDec(conn)
- imp.chans = make(map[int]*netChan)
- imp.names = make(map[string]*netChan)
- imp.errors = make(chan error, 10)
- imp.unacked = 0
- go imp.run()
- return imp
-}
-
-// Import imports a set of channels from the given network and address.
-func Import(network, remoteaddr string) (*Importer, error) {
- conn, err := net.Dial(network, remoteaddr)
- if err != nil {
- return nil, err
- }
- return NewImporter(conn), nil
-}
-
-// shutdown closes all channels for which we are receiving data from the remote side.
-func (imp *Importer) shutdown() {
- imp.chanLock.Lock()
- for _, ich := range imp.chans {
- if ich.dir == Recv {
- ich.close()
- }
- }
- imp.chanLock.Unlock()
-}
-
-// Handle the data from a single imported data stream, which will
-// have the form
-// (response, data)*
-// The response identifies by name which channel is transmitting data.
-func (imp *Importer) run() {
- // Loop on responses; requests are sent by ImportNValues()
- hdr := new(header)
- hdrValue := reflect.ValueOf(hdr)
- ackHdr := new(header)
- err := new(error_)
- errValue := reflect.ValueOf(err)
- for {
- *hdr = header{}
- if e := imp.decode(hdrValue); e != nil {
- if e != io.EOF {
- impLog("header:", e)
- imp.shutdown()
- }
- return
- }
- switch hdr.PayloadType {
- case payData:
- // done lower in loop
- case payError:
- if e := imp.decode(errValue); e != nil {
- impLog("error:", e)
- return
- }
- if err.Error != "" {
- impLog("response error:", err.Error)
- select {
- case imp.errors <- errors.New(err.Error):
- continue // errors are not acknowledged
- default:
- imp.shutdown()
- return
- }
- }
- case payClosed:
- nch := imp.getChan(hdr.Id, false)
- if nch != nil {
- nch.close()
- }
- continue // closes are not acknowledged.
- case payAckSend:
- // we can receive spurious acks if the channel is
- // hung up, so we ask getChan to ignore any errors.
- nch := imp.getChan(hdr.Id, true)
- if nch != nil {
- nch.acked()
- imp.mu.Lock()
- imp.unacked--
- imp.mu.Unlock()
- }
- continue
- default:
- impLog("unexpected payload type:", hdr.PayloadType)
- return
- }
- nch := imp.getChan(hdr.Id, false)
- if nch == nil {
- continue
- }
- if nch.dir != Recv {
- impLog("cannot happen: receive from non-Recv channel")
- return
- }
- // Acknowledge receipt
- ackHdr.Id = hdr.Id
- ackHdr.SeqNum = hdr.SeqNum
- imp.encode(ackHdr, payAck, nil)
- // Create a new value for each received item.
- value := reflect.New(nch.ch.Type().Elem()).Elem()
- if e := imp.decode(value); e != nil {
- impLog("importer value decode:", e)
- return
- }
- nch.send(value)
- }
-}
-
-func (imp *Importer) getChan(id int, errOk bool) *netChan {
- imp.chanLock.Lock()
- ich := imp.chans[id]
- imp.chanLock.Unlock()
- if ich == nil {
- if !errOk {
- impLog("unknown id in netchan request: ", id)
- }
- return nil
- }
- return ich
-}
-
-// Errors returns a channel from which transmission and protocol errors
-// can be read. Clients of the importer are not required to read the error
-// channel for correct execution. However, if too many errors occur
-// without being read from the error channel, the importer will shut down.
-func (imp *Importer) Errors() chan error {
- return imp.errors
-}
-
-// Import imports a channel of the given type, size and specified direction.
-// It is equivalent to ImportNValues with a count of -1, meaning unbounded.
-func (imp *Importer) Import(name string, chT interface{}, dir Dir, size int) error {
- return imp.ImportNValues(name, chT, dir, size, -1)
-}
-
-// ImportNValues imports a channel of the given type and specified
-// direction and then receives or transmits up to n values on that
-// channel. A value of n==-1 implies an unbounded number of values. The
-// channel will have buffer space for size values, or 1 value if size < 1.
-// The channel to be bound to the remote site's channel is provided
-// in the call and may be of arbitrary channel type.
-// Despite the literal signature, the effective signature is
-// ImportNValues(name string, chT chan T, dir Dir, size, n int) error
-// Example usage:
-// imp, err := NewImporter("tcp", "netchanserver.mydomain.com:1234")
-// if err != nil { log.Fatal(err) }
-// ch := make(chan myType)
-// err = imp.ImportNValues("name", ch, Recv, 1, 1)
-// if err != nil { log.Fatal(err) }
-// fmt.Printf("%+v\n", <-ch)
-func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, n int) error {
- ch, err := checkChan(chT, dir)
- if err != nil {
- return err
- }
- imp.chanLock.Lock()
- defer imp.chanLock.Unlock()
- _, present := imp.names[name]
- if present {
- return errors.New("channel name already being imported:" + name)
- }
- if size < 1 {
- size = 1
- }
- id := imp.maxId
- imp.maxId++
- nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n))
- imp.names[name] = nch
- imp.chans[id] = nch
- // Tell the other side about this channel.
- hdr := &header{Id: id}
- req := &request{Name: name, Count: int64(n), Dir: dir, Size: size}
- if err = imp.encode(hdr, payRequest, req); err != nil {
- impLog("request encode:", err)
- return err
- }
- if dir == Send {
- go func() {
- for i := 0; n == -1 || i < n; i++ {
- val, ok := nch.recv()
- if !ok {
- if err = imp.encode(hdr, payClosed, nil); err != nil {
- impLog("error encoding client closed message:", err)
- }
- return
- }
- // We hold the lock during transmission to guarantee messages are
- // sent in order.
- imp.mu.Lock()
- imp.unacked++
- imp.seqLock.Lock()
- imp.mu.Unlock()
- if err = imp.encode(hdr, payData, val.Interface()); err != nil {
- impLog("error encoding client send:", err)
- return
- }
- imp.seqLock.Unlock()
- }
- }()
- }
- return nil
-}
-
-// Hangup disassociates the named channel from the Importer and closes
-// the channel. Messages in flight for the channel may be dropped.
-func (imp *Importer) Hangup(name string) error {
- imp.chanLock.Lock()
- defer imp.chanLock.Unlock()
- nc := imp.names[name]
- if nc == nil {
- return errors.New("netchan import: hangup: no such channel: " + name)
- }
- delete(imp.names, name)
- delete(imp.chans, nc.id)
- nc.close()
- return nil
-}
-
-func (imp *Importer) unackedCount() int64 {
- imp.mu.Lock()
- n := imp.unacked
- imp.mu.Unlock()
- return n
-}
-
-// Drain waits until all messages sent from this exporter/importer, including
-// those not yet sent to any server and possibly including those sent while
-// Drain was executing, have been received by the exporter. In short, it
-// waits until all the importer's messages have been received.
-// If the timeout (measured in nanoseconds) is positive and Drain takes
-// longer than that to complete, an error is returned.
-func (imp *Importer) Drain(timeout int64) error {
- deadline := time.Now().Add(time.Duration(timeout))
- for imp.unackedCount() > 0 {
- if timeout > 0 && time.Now().After(deadline) {
- return errors.New("timeout")
- }
- time.Sleep(100 * time.Millisecond)
- }
- return nil
-}