diff options
author | Koushik Dutta <koushd@gmail.com> | 2014-07-26 19:27:30 -0700 |
---|---|---|
committer | Koushik Dutta <koushd@gmail.com> | 2014-07-26 19:27:30 -0700 |
commit | f9ac08876bac04052cce8b78fd05e5cf5fe2fe0e (patch) | |
tree | ff2323ebb0d21b2fd8e6f6a2022ca1bfe81d7648 /AndroidAsync | |
parent | 893442cba5cc5c20ea0be7953860513ab7e4e325 (diff) | |
download | AndroidAsync-f9ac08876bac04052cce8b78fd05e5cf5fe2fe0e.tar.gz AndroidAsync-f9ac08876bac04052cce8b78fd05e5cf5fe2fe0e.tar.bz2 AndroidAsync-f9ac08876bac04052cce8b78fd05e5cf5fe2fe0e.zip |
spdy is working
Diffstat (limited to 'AndroidAsync')
15 files changed, 607 insertions, 305 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/Util.java b/AndroidAsync/src/com/koushikdutta/async/Util.java index 5dc0359..5e26662 100644 --- a/AndroidAsync/src/com/koushikdutta/async/Util.java +++ b/AndroidAsync/src/com/koushikdutta/async/Util.java @@ -222,4 +222,26 @@ public class Util { } return null; } + + public static void end(DataEmitter emitter, Exception e) { + if (emitter == null) + return; + end(emitter.getEndCallback(), e); + } + + public static void end(CompletedCallback end, Exception e) { + if (end != null) + end.onCompleted(e); + } + + public static void writable(DataSink emitter) { + if (emitter == null) + return; + writable(emitter.getWriteableCallback()); + } + + public static void writable(WritableCallback writable) { + if (writable != null) + writable.onWriteable(); + } } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncSSLSocketMiddleware.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncSSLSocketMiddleware.java index 59fa0f7..01f3fbe 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncSSLSocketMiddleware.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncSSLSocketMiddleware.java @@ -68,7 +68,7 @@ public class AsyncSSLSocketMiddleware extends AsyncSocketMiddleware { return sslEngine; } - protected AsyncSSLSocketWrapper.HandshakeCallback createHandshakeCallback(final ConnectCallback callback) { + protected AsyncSSLSocketWrapper.HandshakeCallback createHandshakeCallback(GetSocketData data, final ConnectCallback callback) { return new AsyncSSLSocketWrapper.HandshakeCallback() { @Override public void onHandshakeCompleted(Exception e, AsyncSSLSocket socket) { @@ -77,15 +77,15 @@ public class AsyncSSLSocketMiddleware extends AsyncSocketMiddleware { }; } - protected void tryHandshake(final ConnectCallback callback, AsyncSocket socket, final Uri uri, final int port) { + protected void tryHandshake(AsyncSocket socket, GetSocketData data, final Uri uri, final int port, final ConnectCallback callback) { AsyncSSLSocketWrapper.handshake(socket, uri.getHost(), port, createConfiguredSSLEngine(uri.getHost(), port), trustManagers, hostnameVerifier, true, - createHandshakeCallback(callback)); + createHandshakeCallback(data, callback)); } @Override - protected ConnectCallback wrapCallback(GetSocketData data, final Uri uri, final int port, final boolean proxied, final ConnectCallback callback) { + protected ConnectCallback wrapCallback(final GetSocketData data, final Uri uri, final int port, final boolean proxied, final ConnectCallback callback) { return new ConnectCallback() { @Override public void onConnectCompleted(Exception ex, final AsyncSocket socket) { @@ -95,7 +95,7 @@ public class AsyncSSLSocketMiddleware extends AsyncSocketMiddleware { } if (!proxied) { - tryHandshake(callback, socket, uri, port); + tryHandshake(socket, data, uri, port, callback); return; } @@ -127,7 +127,7 @@ public class AsyncSSLSocketMiddleware extends AsyncSocketMiddleware { socket.setDataCallback(null); socket.setEndCallback(null); if (TextUtils.isEmpty(s.trim())) { - tryHandshake(callback, socket, uri, port); + tryHandshake(socket, data, uri, port, callback); } else { callback.onConnectCompleted(new IOException("unknown second status line"), socket); diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java index 18e856c..066f92a 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java @@ -3,12 +3,15 @@ package com.koushikdutta.async.http.spdy; import com.koushikdutta.async.AsyncServer; import com.koushikdutta.async.AsyncSocket; import com.koushikdutta.async.BufferedDataEmitter; +import com.koushikdutta.async.BufferedDataSink; import com.koushikdutta.async.ByteBufferList; import com.koushikdutta.async.DataEmitter; import com.koushikdutta.async.Util; import com.koushikdutta.async.callback.CompletedCallback; import com.koushikdutta.async.callback.DataCallback; import com.koushikdutta.async.callback.WritableCallback; +import com.koushikdutta.async.future.SimpleFuture; +import com.koushikdutta.async.http.Headers; import com.koushikdutta.async.http.Protocol; import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.ErrorCode; import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.FrameReader; @@ -20,8 +23,12 @@ import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Ping; import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Settings; import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Spdy3; import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Variant; +import com.koushikdutta.async.http.spdy.okio.BufferedSink; import com.koushikdutta.async.http.spdy.okio.BufferedSource; import com.koushikdutta.async.http.spdy.okio.ByteString; +import com.koushikdutta.async.http.spdy.okio.Okio; + +import junit.framework.Assert; import java.io.IOException; import java.util.Hashtable; @@ -37,31 +44,126 @@ import static com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Settings.DEF public class AsyncSpdyConnection implements FrameReader.Handler { BufferedDataEmitter emitter; AsyncSocket socket; + BufferedDataSink bufferedSocket; FrameReader reader; FrameWriter writer; Variant variant; - SpdySocket zero = new SpdySocket(0, false, false, null); +// SpdySocket zero = new SpdySocket(0, false, false, null); ByteBufferListSource source = new ByteBufferListSource(); + ByteBufferListSink sink = new ByteBufferListSink() { + @Override + public void flush() throws IOException { + AsyncSpdyConnection.this.flush(); + } + }; + BufferedSource bufferedSource; + BufferedSink bufferedSink; Hashtable<Integer, SpdySocket> sockets = new Hashtable<Integer, SpdySocket>(); Protocol protocol; boolean client = true; - private class SpdySocket implements AsyncSocket { + public void flush() { + bufferedSocket.write(sink); + } + + /** + * Returns a new locally-initiated stream. + * + * @param out true to create an output stream that we can use to send data to the remote peer. + * Corresponds to {@code FLAG_FIN}. + * @param in true to create an input stream that the remote peer can use to send data to us. + * Corresponds to {@code FLAG_UNIDIRECTIONAL}. + */ + public SpdySocket newStream(List<Header> requestHeaders, boolean out, boolean in) throws IOException { + return newStream(0, requestHeaders, out, in); + } + + private SpdySocket newStream(int associatedStreamId, List<Header> requestHeaders, boolean out, + boolean in) throws IOException { + boolean outFinished = !out; + boolean inFinished = !in; + SpdySocket socket; + int streamId; + + if (shutdown) { + throw new IOException("shutdown"); + } + + streamId = nextStreamId; + nextStreamId += 2; + socket = new SpdySocket(streamId, outFinished, inFinished, requestHeaders); + if (socket.isOpen()) { + sockets.put(streamId, socket); +// setIdle(false); + } + if (associatedStreamId == 0) { + writer.synStream(outFinished, inFinished, streamId, associatedStreamId, + requestHeaders); + } else if (client) { + throw new IllegalArgumentException("client streams shouldn't have associated stream IDs"); + } else { // HTTP/2 has a PUSH_PROMISE frame. + writer.pushPromise(associatedStreamId, streamId, requestHeaders); + } + + if (!out) { + writer.flush(); + } + + return socket; + } + + int totalWindowRead; + void updateWindowRead(int length) { + totalWindowRead += length; + if (totalWindowRead >= okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) { + try { + writer.windowUpdate(0, totalWindowRead); + } + catch (IOException e) { + throw new AssertionError(e); + } + totalWindowRead = 0; + } + } + + public class SpdySocket implements AsyncSocket { long bytesLeftInWriteWindow; WritableCallback writable; final int id; CompletedCallback closedCallback; CompletedCallback endCallback; DataCallback dataCallback; - ByteBufferList pending = new ByteBufferList(); + ByteBufferListSink pending = new ByteBufferListSink(); + SimpleFuture<List<Header>> headers = new SimpleFuture<List<Header>>(); + boolean isOpen = true; + int totalWindowRead; - public SpdySocket(int id, boolean outFinished, boolean inFinished, List<Header> headerBlock) { - this.id = id; + public SimpleFuture<List<Header>> headers() { + return headers; } - private void report(Exception e) { - if (endCallback != null) - endCallback.onCompleted(e); + void updateWindowRead(int length) { + totalWindowRead += length; + if (totalWindowRead >= okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) { + try { + writer.windowUpdate(id, totalWindowRead); + } + catch (IOException e) { + throw new AssertionError(e); + } + totalWindowRead = 0; + } + AsyncSpdyConnection.this.updateWindowRead(length); + } + + public SpdySocket(int id, boolean outFinished, boolean inFinished, List<Header> headerBlock) { + this.id = id; + try { + writer.windowUpdate(id, DEFAULT_INITIAL_WINDOW_SIZE); + } + catch (IOException e) { + throw new AssertionError(e); + } } public boolean isLocallyInitiated() { @@ -72,8 +174,8 @@ public class AsyncSpdyConnection implements FrameReader.Handler { public void addBytesToWriteWindow(long delta) { long prev = bytesLeftInWriteWindow; bytesLeftInWriteWindow += delta; - if (writable != null && bytesLeftInWriteWindow > 0 && prev <= 0) - writable.onWriteable(); + if (bytesLeftInWriteWindow > 0 && prev <= 0) + Util.writable(writable); } @Override @@ -109,7 +211,7 @@ public class AsyncSpdyConnection implements FrameReader.Handler { @Override public void close() { - + isOpen = false; } @Override @@ -134,7 +236,7 @@ public class AsyncSpdyConnection implements FrameReader.Handler { @Override public void write(ByteBufferList bb) { - + System.out.println("writing!"); } @Override @@ -149,7 +251,7 @@ public class AsyncSpdyConnection implements FrameReader.Handler { @Override public boolean isOpen() { - return true; + return isOpen; } @Override @@ -165,11 +267,20 @@ public class AsyncSpdyConnection implements FrameReader.Handler { public CompletedCallback getClosedCallback() { return closedCallback; } + + public void receiveHeaders(List<Header> headers, HeadersMode headerMode) { + this.headers.setComplete(headers); + } } + final Settings okHttpSettings = new Settings(); + private int nextPingId; + private static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024; + public AsyncSpdyConnection(AsyncSocket socket, Protocol protocol) { this.protocol = protocol; this.socket = socket; + this.bufferedSocket = new BufferedDataSink(socket); emitter = new BufferedDataEmitter(socket); emitter.setDataCallback(callback); @@ -179,24 +290,53 @@ public class AsyncSpdyConnection implements FrameReader.Handler { else if (protocol == Protocol.HTTP_2) { variant = new Http20Draft13(); } - reader = variant.newReader(source, true); + reader = variant.newReader(bufferedSource = Okio.buffer(source), true); + writer = variant.newWriter(bufferedSink = Okio.buffer(sink), true); + + boolean client = true; + nextStreamId = client ? 1 : 2; + if (client && protocol == Protocol.HTTP_2) { + nextStreamId += 2; // In HTTP/2, 1 on client is reserved for Upgrade. + } + nextPingId = client ? 1 : 2; + // Flow control was designed more for servers, or proxies than edge clients. + // If we are a client, set the flow control window to 16MiB. This avoids + // thrashing window updates every 64KiB, yet small enough to avoid blowing + // up the heap. + if (client) { + okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, OKHTTP_CLIENT_WINDOW_SIZE); + } } DataCallback callback = new DataCallback() { @Override public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) { - bb.get(source); - if (!reader.canProcessFrame(source)) - return; - try { - reader.nextFrame(AsyncSpdyConnection.this); - } - catch (IOException e) { - throw new AssertionError(e); + int needed; + while ((needed = reader.canProcessFrame(bb)) > 0) { + bb.get(source, needed); + try { + reader.nextFrame(AsyncSpdyConnection.this); + } + catch (IOException e) { + throw new AssertionError(e); + } } } }; + /** + * Sends a connection header if the current variant requires it. This should + * be called after {@link Builder#build} for all new connections. + */ + public void sendConnectionPreface() throws IOException { + writer.connectionPreface(); + writer.settings(okHttpSettings); + int windowSize = okHttpSettings.getInitialWindowSize(Settings.DEFAULT_INITIAL_WINDOW_SIZE); + if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) { + writer.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE); + } + } + /** Even, positive numbered streams are pushed streams in HTTP/2. */ private boolean pushedStream(int streamId) { return protocol == Protocol.HTTP_2 && streamId != 0 && (streamId & 1) == 0; @@ -215,12 +355,16 @@ public class AsyncSpdyConnection implements FrameReader.Handler { source.skip(length); return; } - if (source != this.source) + if (source != this.bufferedSource || this.source.remaining() + source.buffer().size() != length) throw new AssertionError(); - this.source.get(socket.pending, length); + source.buffer().readAll(socket.pending); + this.source.get(socket.pending); + socket.updateWindowRead(length); Util.emitAllData(socket, socket.pending); if (inFinished) { - socket.report(null); + sockets.remove(streamId); + socket.close(); + Util.end(socket, null); } } @@ -228,7 +372,6 @@ public class AsyncSpdyConnection implements FrameReader.Handler { private int nextStreamId; @Override public void headers(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode) { - /* if (pushedStream(streamId)) { throw new AssertionError("push"); // pushHeadersLater(streamId, headerBlock, inFinished); @@ -258,25 +401,34 @@ public class AsyncSpdyConnection implements FrameReader.Handler { // If the stream ID is in the client's namespace, assume it's already closed. if (streamId % 2 == nextStreamId % 2) return; + throw new AssertionError("unexpected receive stream"); + // Create a stream. - socket = new SpdySocket(streamId, outFinished, inFinished, headerBlock); - lastGoodStreamId = streamId; - sockets.put(streamId, socket); - handler.receive(newStream); - return; +// socket = new SpdySocket(streamId, outFinished, inFinished, headerBlock); +// lastGoodStreamId = streamId; +// sockets.put(streamId, socket); +// handler.receive(newStream); +// return; } // The headers claim to be for a new stream, but we already have one. if (headersMode.failIfStreamPresent()) { - stream.closeLater(ErrorCode.PROTOCOL_ERROR); - removeStream(streamId); + try { + writer.rstStream(streamId, ErrorCode.INVALID_STREAM); + } + catch (IOException e) { + throw new AssertionError(e); + } + sockets.remove(streamId); return; } // Update an existing stream. - stream.receiveHeaders(headerBlock, headersMode); - if (inFinished) stream.receiveFin(); - */ + socket.receiveHeaders(headerBlock, headersMode); + if (inFinished) { + sockets.remove(streamId); + Util.end(socket, null); + } } @Override @@ -288,10 +440,11 @@ public class AsyncSpdyConnection implements FrameReader.Handler { } SpdySocket rstStream = sockets.remove(streamId); if (rstStream != null) { - rstStream.report(new IOException(errorCode.toString())); + Util.end(rstStream, new IOException(errorCode.toString())); } } + long bytesLeftInWriteWindow; Settings peerSettings = new Settings(); private boolean receivedInitialPeerSettings = false; @Override @@ -310,7 +463,7 @@ public class AsyncSpdyConnection implements FrameReader.Handler { if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) { delta = peerInitialWindowSize - priorWriteWindowSize; if (!receivedInitialPeerSettings) { - zero.addBytesToWriteWindow(delta); + addBytesToWriteWindow(delta); receivedInitialPeerSettings = true; } } @@ -319,8 +472,21 @@ public class AsyncSpdyConnection implements FrameReader.Handler { } } + void addBytesToWriteWindow(long delta) { + bytesLeftInWriteWindow += delta; + for (SpdySocket socket: sockets.values()) { + Util.writable(socket); + } + } + @Override public void ackSettings() { + try { + writer.ackSettings(); + } + catch (IOException e) { + throw new AssertionError(e); + } } private Map<Integer, Ping> pings; @@ -362,7 +528,7 @@ public class AsyncSpdyConnection implements FrameReader.Handler { Map.Entry<Integer, SpdySocket> entry = i.next(); int streamId = entry.getKey(); if (streamId > lastGoodStreamId && entry.getValue().isLocallyInitiated()) { - entry.getValue().report(new IOException(ErrorCode.REFUSED_STREAM.toString())); + Util.end(entry.getValue(), new IOException(ErrorCode.REFUSED_STREAM.toString())); i.remove(); } } @@ -370,25 +536,25 @@ public class AsyncSpdyConnection implements FrameReader.Handler { @Override public void windowUpdate(int streamId, long windowSizeIncrement) { - System.out.println("fff"); - + if (streamId == 0) { + addBytesToWriteWindow(windowSizeIncrement); + return; + } + SpdySocket socket = sockets.get(streamId); + if (socket != null) + socket.addBytesToWriteWindow(windowSizeIncrement); } @Override public void priority(int streamId, int streamDependency, int weight, boolean exclusive) { - System.out.println("fff"); - } @Override public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) throws IOException { - System.out.println("fff"); - + throw new AssertionError("pushPromise"); } @Override public void alternateService(int streamId, String origin, ByteString protocol, String host, int port, long maxAge) { - System.out.println("fff"); - } } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSink.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSink.java new file mode 100644 index 0000000..aa9da9a --- /dev/null +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSink.java @@ -0,0 +1,52 @@ +package com.koushikdutta.async.http.spdy; + +import com.koushikdutta.async.ByteBufferList; +import com.koushikdutta.async.http.spdy.okio.Buffer; +import com.koushikdutta.async.http.spdy.okio.Segment; +import com.koushikdutta.async.http.spdy.okio.SegmentPool; +import com.koushikdutta.async.http.spdy.okio.Sink; +import com.koushikdutta.async.http.spdy.okio.Timeout; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Created by koush on 7/25/14. + */ +public class ByteBufferListSink extends ByteBufferList implements Sink { + @Override + public void write(Buffer source, long byteCount) throws IOException { + Segment s = source.head; + while (byteCount > 0) { + int toCopy = (int) Math.min(byteCount, s.limit - s.pos); + ByteBuffer b = obtain(toCopy); + b.put(s.data, s.pos, toCopy); + b.flip(); + add(b); + + s.pos += toCopy; + source.size -= toCopy; + byteCount -= toCopy; + + if (s.pos == s.limit) { + Segment toRecycle = s; + source.head = s = toRecycle.pop(); + SegmentPool.getInstance().recycle(toRecycle); + } + } + } + + @Override + public void flush() throws IOException { + } + + @Override + public Timeout timeout() { + return Timeout.NONE; + } + + @Override + public void close() throws IOException { + recycle(); + } +} diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSource.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSource.java index 5d57c77..13803b3 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSource.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSource.java @@ -2,173 +2,39 @@ package com.koushikdutta.async.http.spdy; import com.koushikdutta.async.ByteBufferList; import com.koushikdutta.async.http.spdy.okio.Buffer; -import com.koushikdutta.async.http.spdy.okio.BufferedSource; -import com.koushikdutta.async.http.spdy.okio.ByteString; -import com.koushikdutta.async.http.spdy.okio.Sink; +import com.koushikdutta.async.http.spdy.okio.Source; import com.koushikdutta.async.http.spdy.okio.Timeout; -import com.koushikdutta.async.util.Charsets; import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteOrder; -import java.nio.charset.Charset; +import java.nio.ByteBuffer; /** - * Created by koush on 7/17/14. + * Created by koush on 7/25/14. */ -public class ByteBufferListSource extends ByteBufferList implements BufferedSource { - @Override - public Buffer buffer() { - return null; - } - - @Override - public boolean exhausted() throws IOException { - return !hasRemaining(); - } - - @Override - public void require(long byteCount) throws IOException { - if (remaining() < byteCount) - throw new AssertionError("out of data"); - } - - @Override - public byte readByte() throws IOException { - return order(ByteOrder.BIG_ENDIAN).get(); - } - - @Override - public short readShort() throws IOException { - return order(ByteOrder.BIG_ENDIAN).getShort(); - } - - @Override - public short readShortLe() throws IOException { - return order(ByteOrder.LITTLE_ENDIAN).getShort(); - } - - @Override - public int readInt() throws IOException { - return order(ByteOrder.BIG_ENDIAN).getInt(); - } - - @Override - public int readIntLe() throws IOException { - return order(ByteOrder.LITTLE_ENDIAN).getInt(); - } - - @Override - public long readLong() throws IOException { - return order(ByteOrder.BIG_ENDIAN).getLong(); - } - - @Override - public long readLongLe() throws IOException { - return order(ByteOrder.LITTLE_ENDIAN).getLong(); - } - - @Override - public void skip(long byteCount) throws IOException { - if (byteCount > Integer.MAX_VALUE) - throw new AssertionError("too much skippy, use less peanut butter"); - read(new byte[(int)byteCount]); - } - - @Override - public ByteString readByteString() throws IOException { - return readByteString(remaining()); - } - - @Override - public ByteString readByteString(long byteCount) throws IOException { - return ByteString.of(readByteArray(byteCount)); - } - - @Override - public byte[] readByteArray() throws IOException { - return getAllByteArray(); - } - - @Override - public byte[] readByteArray(long byteCount) throws IOException { - byte[] ret = new byte[(int)byteCount]; - get(ret); - return ret; - } - - @Override - public int read(byte[] sink) throws IOException { - return read(sink, 0, sink.length); - } - - @Override - public void readFully(byte[] sink) throws IOException { - read(sink, 0, sink.length); - } - - @Override - public int read(byte[] sink, int offset, int byteCount) throws IOException { - get(sink, offset, byteCount); - return byteCount; - } - - @Override - public void readFully(Buffer sink, long byteCount) throws IOException { - throw new AssertionError("not implemented"); - } - - @Override - public long readAll(Sink sink) throws IOException { - throw new AssertionError("not implemented"); - } - - @Override - public String readUtf8() throws IOException { - return readUtf8(remaining()); - } - - @Override - public String readUtf8(long byteCount) throws IOException { - return new String(readByteArray(byteCount), Charsets.UTF_8); - } - - @Override - public String readUtf8Line() throws IOException { - throw new AssertionError("not implemented"); - } - - @Override - public String readUtf8LineStrict() throws IOException { - throw new AssertionError("not implemented"); - } - - @Override - public String readString(long byteCount, Charset charset) throws IOException { - return new String(readByteArray(byteCount), charset); - } - - @Override - public long indexOf(byte b) throws IOException { - throw new AssertionError("not implemented"); - } - - @Override - public InputStream inputStream() { - throw new AssertionError("not implemented"); - } - +public class ByteBufferListSource extends ByteBufferList implements Source { @Override public long read(Buffer sink, long byteCount) throws IOException { - throw new AssertionError("not implemented"); + if (!hasRemaining()) + throw new AssertionError("empty!"); + int total = 0; + while (total < byteCount && hasRemaining()) { + ByteBuffer b = remove(); + int toRead = (int)Math.min(byteCount - total, b.remaining()); + total += toRead; + sink.write(b.array(), b.arrayOffset() + b.position(), toRead); + b.position(b.position() + toRead); + addFirst(b); + } + return total; } @Override public Timeout timeout() { - return null; + return Timeout.NONE; } @Override public void close() throws IOException { + recycle(); } } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyMiddleware.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyMiddleware.java index 35d4257..3ebe802 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyMiddleware.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyMiddleware.java @@ -1,19 +1,32 @@ package com.koushikdutta.async.http.spdy; +import android.net.Uri; + import com.koushikdutta.async.AsyncSSLSocket; import com.koushikdutta.async.AsyncSSLSocketWrapper; import com.koushikdutta.async.ByteBufferList; import com.koushikdutta.async.callback.ConnectCallback; import com.koushikdutta.async.future.Cancellable; +import com.koushikdutta.async.future.FutureCallback; +import com.koushikdutta.async.future.SimpleCancellable; +import com.koushikdutta.async.future.TransformFuture; import com.koushikdutta.async.http.AsyncHttpClient; +import com.koushikdutta.async.http.AsyncHttpClientMiddleware; import com.koushikdutta.async.http.AsyncSSLEngineConfigurator; import com.koushikdutta.async.http.AsyncSSLSocketMiddleware; +import com.koushikdutta.async.http.Headers; +import com.koushikdutta.async.http.Multimap; import com.koushikdutta.async.http.Protocol; +import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Header; +import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.SpdyConnection; import com.koushikdutta.async.util.Charsets; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Hashtable; +import java.util.List; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; @@ -41,8 +54,15 @@ public class SpdyMiddleware extends AsyncSSLSocketMiddleware { return ret; } + private static String requestPath(Uri uri) { + String pathAndQuery = uri.getPath(); + if (pathAndQuery == null) return "/"; + if (!pathAndQuery.startsWith("/")) return "/" + pathAndQuery; + return pathAndQuery; + } + @Override - protected AsyncSSLSocketWrapper.HandshakeCallback createHandshakeCallback(final ConnectCallback callback) { + protected AsyncSSLSocketWrapper.HandshakeCallback createHandshakeCallback(final GetSocketData data, final ConnectCallback callback) { return new AsyncSSLSocketWrapper.HandshakeCallback() { @Override public void onHandshakeCompleted(Exception e, AsyncSSLSocket socket) { @@ -53,8 +73,23 @@ public class SpdyMiddleware extends AsyncSSLSocketMiddleware { try { long ptr = (Long)sslNativePointer.get(socket.getSSLEngine()); byte[] proto = (byte[])nativeGetAlpnNegotiatedProtocol.invoke(null, ptr); + if (proto == null) { + callback.onConnectCompleted(null, socket); + return; + } String protoString = new String(proto); - AsyncSpdyConnection connection = new AsyncSpdyConnection(socket, Protocol.get(protoString)); + Protocol p = Protocol.get(protoString); + if (p == null) { + callback.onConnectCompleted(null, socket); + return; + } + final AsyncSpdyConnection connection = new AsyncSpdyConnection(socket, Protocol.get(protoString)); + connection.sendConnectionPreface(); + connection.flush(); + + connections.put(data.request.getUri().getHost(), connection); + + newSocket(data, connection, callback); } catch (Exception ex) { socket.close(); @@ -64,6 +99,83 @@ public class SpdyMiddleware extends AsyncSSLSocketMiddleware { }; } + private void newSocket(GetSocketData data, final AsyncSpdyConnection connection, final ConnectCallback callback) { + final ArrayList<Header> headers = new ArrayList<Header>(); + headers.add(new Header(Header.TARGET_METHOD, data.request.getMethod())); + headers.add(new Header(Header.TARGET_PATH, requestPath(data.request.getUri()))); + String host = data.request.getHeaders().get("Host"); + if (Protocol.SPDY_3 == connection.protocol) { + headers.add(new Header(Header.VERSION, "HTTP/1.1")); + headers.add(new Header(Header.TARGET_HOST, host)); + } else if (Protocol.HTTP_2 == connection.protocol) { + headers.add(new Header(Header.TARGET_AUTHORITY, host)); // Optional in HTTP/2 + } else { + throw new AssertionError(); + } + headers.add(new Header(Header.TARGET_SCHEME, data.request.getUri().getScheme())); + + Multimap mm = data.request.getHeaders().getMultiMap(); + for (String key: mm.keySet()) { + if (SpdyTransport.isProhibitedHeader(connection.protocol, key)) + continue; + for (String value: mm.get(key)) { + headers.add(new Header(key.toLowerCase(), value)); + } + } + + connection.socket.getServer().postDelayed(new Runnable() { + @Override + public void run() { + try { + AsyncSpdyConnection.SpdySocket spdy = connection.newStream(headers, false, true); + connection.flush(); + callback.onConnectCompleted(null, spdy); + } + catch (Exception e) { + throw new AssertionError(e); + } + } + }, 1000); + } + + @Override + public boolean exchangeHeaders(final ExchangeHeaderData data) { + if (!(data.socket instanceof AsyncSpdyConnection.SpdySocket)) + return false; + + // headers were already sent as part of the socket being opened. + data.sendHeadersCallback.onCompleted(null); + + final AsyncSpdyConnection.SpdySocket spdySocket = (AsyncSpdyConnection.SpdySocket)data.socket; + spdySocket.headers() + .then(new TransformFuture<Headers, List<Header>>() { + @Override + protected void transform(List<Header> result) throws Exception { + Headers headers = new Headers(); + for (Header header: result) { + String key = header.name.utf8(); + String value = header.value.utf8(); + headers.add(key, value); + } + String status = headers.remove(Header.RESPONSE_STATUS.utf8()); + String[] statusParts = status.split(" ", 2); + data.response.code(Integer.parseInt(statusParts[0])); + data.response.message(statusParts[1]); + data.response.protocol(headers.remove(Header.VERSION.utf8())); + data.response.headers(headers); + setComplete(headers); + } + }) + .setCallback(new FutureCallback<Headers>() { + @Override + public void onCompleted(Exception e, Headers result) { + data.receiveHeadersCallback.onCompleted(e); + data.response.emitter(spdySocket); + } + }); + return true; + } + private void configure(SSLEngine engine, String host, int port) { if (!initialized) { initialized = true; @@ -144,6 +256,7 @@ public class SpdyMiddleware extends AsyncSSLSocketMiddleware { Field useSni; Method nativeGetNpnNegotiatedProtocol; Method nativeGetAlpnNegotiatedProtocol; + Hashtable<String, AsyncSpdyConnection> connections = new Hashtable<String, AsyncSpdyConnection>(); @Override public void setSSLContext(SSLContext sslContext) { @@ -153,6 +266,24 @@ public class SpdyMiddleware extends AsyncSSLSocketMiddleware { @Override public Cancellable getSocket(GetSocketData data) { - return super.getSocket(data); + final Uri uri = data.request.getUri(); + final int port = getSchemePort(data.request.getUri()); + if (port == -1) { + return null; + } + + // can we use an existing connection to satisfy this, or do we need a new one? + String host = uri.getHost(); + AsyncSpdyConnection conn = connections.get(host); + if (conn == null || !conn.socket.isOpen()) { + connections.remove(host); + return super.getSocket(data); + } + + newSocket(data, conn, data.connectCallback); + + SimpleCancellable ret = new SimpleCancellable(); + ret.setComplete(); + return ret; } }
\ No newline at end of file diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyTransport.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyTransport.java new file mode 100644 index 0000000..e915a06 --- /dev/null +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyTransport.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2012 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.koushikdutta.async.http.spdy; + + +import com.koushikdutta.async.http.Protocol; +import com.koushikdutta.async.http.spdy.okhttp.internal.Util; +import com.koushikdutta.async.http.spdy.okio.ByteString; + +import java.util.List; + + +final class SpdyTransport { + /** See http://www.chromium.org/spdy/spdy-protocol/spdy-protocol-draft3-1#TOC-3.2.1-Request. */ + private static final List<String> SPDY_3_PROHIBITED_HEADERS = Util.immutableList( + "accept-encoding", + "user-agent", + "accept", + + "connection", + "host", + "keep-alive", + "proxy-connection", + "transfer-encoding"); + + /** See http://tools.ietf.org/html/draft-ietf-httpbis-http2-09#section-8.1.3. */ + private static final List<String> HTTP_2_PROHIBITED_HEADERS = Util.immutableList( + "connection", + "host", + "keep-alive", + "proxy-connection", + "te", + "transfer-encoding", + "encoding", + "upgrade"); + + /** When true, this header should not be emitted or consumed. */ + static boolean isProhibitedHeader(Protocol protocol, String name) { + if (protocol == Protocol.SPDY_3) { + return SPDY_3_PROHIBITED_HEADERS.contains(name.toLowerCase()); + } else if (protocol == Protocol.HTTP_2) { + return HTTP_2_PROHIBITED_HEADERS.contains(name.toLowerCase()); + } else { + throw new AssertionError(protocol); + } + } +} diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java index 5305f63..19b6b77 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java @@ -26,7 +26,7 @@ import java.util.List; /** Reads transport frames for SPDY/3 or HTTP/2. */ public interface FrameReader extends Closeable { - boolean canProcessFrame(ByteBufferList bb); + int canProcessFrame(ByteBufferList bb); void readConnectionPreface() throws IOException; boolean nextFrame(Handler handler) throws IOException; diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java index 88958ef..1425cbb 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java @@ -96,14 +96,16 @@ public final class Http20Draft13 implements Variant { final HpackDraft08.Reader hpackReader; @Override - public boolean canProcessFrame(ByteBufferList bb) { + public int canProcessFrame(ByteBufferList bb) { if (bb.remaining() < 4) - return false; + return 0; bb.order(ByteOrder.BIG_ENDIAN); int w1 = bb.peekInt(); short length = (short) ((w1 & 0x3fff0000) >> 16); // 14-bit unsigned == MAX_FRAME_SIZE - return bb.remaining() >= 8 + length; + if (bb.remaining() < 8 + length) + return 0; + return 8 + length; } Reader(BufferedSource source, int headerTableSize, boolean client) { diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Settings.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Settings.java index 4b332f6..1b96f64 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Settings.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Settings.java @@ -53,7 +53,7 @@ public final class Settings { /** spdy/3: Retransmission rate. Percentage */ static final int DOWNLOAD_RETRANS_RATE = 6; /** Window size in bytes. */ - static final int INITIAL_WINDOW_SIZE = 7; + public static final int INITIAL_WINDOW_SIZE = 7; /** spdy/3: Window size in bytes. */ static final int CLIENT_CERTIFICATE_VECTOR_SIZE = 8; /** Flow control options. */ @@ -82,7 +82,7 @@ public final class Settings { Arrays.fill(values, 0); } - Settings set(int id, int idFlags, int value) { + public Settings set(int id, int idFlags, int value) { if (id >= values.length) { return this; // Discard unknown settings. } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java index 72a3df2..fe19935 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java @@ -128,15 +128,17 @@ public final class Spdy3 implements Variant { } @Override - public boolean canProcessFrame(ByteBufferList bb) { - if (bb.remaining() < 8) - return false; + public int canProcessFrame(ByteBufferList bb) { + if (source.buffer().size() + bb.remaining() < 8) + return 0; ByteBuffer peek = ByteBuffer.wrap(bb.peekBytes(8)).order(ByteOrder.BIG_ENDIAN); - peek.getInt(); + int w1 = peek.getInt(); int w2 = peek.getInt(); int length = (w2 & 0xffffff); - return bb.remaining() >= 8 + length; + if (bb.remaining() < 8 + length) + return 0; + return 8 + length; } /** diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Buffer.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Buffer.java index 4ac22b1..bb6852d 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Buffer.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Buffer.java @@ -45,8 +45,8 @@ import static com.koushikdutta.async.http.spdy.okio.Util.reverseBytesLong; * This class avoids zero-fill and GC churn by pooling byte arrays. */ public final class Buffer implements BufferedSource, BufferedSink, Cloneable { - Segment head; - long size; + public Segment head; + public long size; public Buffer() { } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/InflaterSource.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/InflaterSource.java index ce50796..76f7cc0 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/InflaterSource.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/InflaterSource.java @@ -25,99 +25,99 @@ import java.util.zip.Inflater; * to decompress data read from another source. */ public final class InflaterSource implements Source { - private final BufferedSource source; - private final Inflater inflater; + private final BufferedSource source; + private final Inflater inflater; - /** - * When we call Inflater.setInput(), the inflater keeps our byte array until - * it needs input again. This tracks how many bytes the inflater is currently - * holding on to. - */ - private int bufferBytesHeldByInflater; - private boolean closed; + /** + * When we call Inflater.setInput(), the inflater keeps our byte array until + * it needs input again. This tracks how many bytes the inflater is currently + * holding on to. + */ + private int bufferBytesHeldByInflater; + private boolean closed; - public InflaterSource(Source source, Inflater inflater) { - this(Okio.buffer(source), inflater); - } + public InflaterSource(Source source, Inflater inflater) { + this(Okio.buffer(source), inflater); + } - /** - * This package-private constructor shares a buffer with its trusted caller. - * In general we can't share a BufferedSource because the inflater holds input - * bytes until they are inflated. - */ - InflaterSource(BufferedSource source, Inflater inflater) { - if (source == null) throw new IllegalArgumentException("source == null"); - if (inflater == null) throw new IllegalArgumentException("inflater == null"); - this.source = source; - this.inflater = inflater; - } + /** + * This package-private constructor shares a buffer with its trusted caller. + * In general we can't share a BufferedSource because the inflater holds input + * bytes until they are inflated. + */ + InflaterSource(BufferedSource source, Inflater inflater) { + if (source == null) throw new IllegalArgumentException("source == null"); + if (inflater == null) throw new IllegalArgumentException("inflater == null"); + this.source = source; + this.inflater = inflater; + } - @Override public long read( - Buffer sink, long byteCount) throws IOException { - if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); - if (closed) throw new IllegalStateException("closed"); - if (byteCount == 0) return 0; + @Override public long read( + Buffer sink, long byteCount) throws IOException { + if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); + if (closed) throw new IllegalStateException("closed"); + if (byteCount == 0) return 0; - while (true) { - boolean sourceExhausted = refill(); + while (true) { + boolean sourceExhausted = refill(); - // Decompress the inflater's compressed data into the sink. - try { - Segment tail = sink.writableSegment(1); - int bytesInflated = inflater.inflate(tail.data, tail.limit, Segment.SIZE - tail.limit); - if (bytesInflated > 0) { - tail.limit += bytesInflated; - sink.size += bytesInflated; - return bytesInflated; + // Decompress the inflater's compressed data into the sink. + try { + Segment tail = sink.writableSegment(1); + int bytesInflated = inflater.inflate(tail.data, tail.limit, Segment.SIZE - tail.limit); + if (bytesInflated > 0) { + tail.limit += bytesInflated; + sink.size += bytesInflated; + return bytesInflated; + } + if (inflater.finished() || inflater.needsDictionary()) { + releaseInflatedBytes(); + return -1; + } + if (sourceExhausted) throw new EOFException("source exhausted prematurely"); + } catch (DataFormatException e) { + throw new IOException(e); + } } - if (inflater.finished() || inflater.needsDictionary()) { - releaseInflatedBytes(); - return -1; - } - if (sourceExhausted) throw new EOFException("source exhausted prematurely"); - } catch (DataFormatException e) { - throw new IOException(e); - } } - } - /** - * Refills the inflater with compressed data if it needs input. (And only if - * it needs input). Returns true if the inflater required input but the source - * was exhausted. - */ - public boolean refill() throws IOException { - if (!inflater.needsInput()) return false; + /** + * Refills the inflater with compressed data if it needs input. (And only if + * it needs input). Returns true if the inflater required input but the source + * was exhausted. + */ + public boolean refill() throws IOException { + if (!inflater.needsInput()) return false; - releaseInflatedBytes(); - if (inflater.getRemaining() != 0) throw new IllegalStateException("?"); // TODO: possible? + releaseInflatedBytes(); + if (inflater.getRemaining() != 0) throw new IllegalStateException("?"); // TODO: possible? - // If there are compressed bytes in the source, assign them to the inflater. - if (source.exhausted()) return true; + // If there are compressed bytes in the source, assign them to the inflater. + if (source.exhausted()) return true; - // Assign buffer bytes to the inflater. - byte[] data = source.readByteArray(); - bufferBytesHeldByInflater = data.length; - inflater.setInput(data, 0, bufferBytesHeldByInflater); - return false; - } + // Assign buffer bytes to the inflater. + Segment head = source.buffer().head; + bufferBytesHeldByInflater = head.limit - head.pos; + inflater.setInput(head.data, head.pos, bufferBytesHeldByInflater); + return false; + } - /** When the inflater has processed compressed data, remove it from the buffer. */ - private void releaseInflatedBytes() throws IOException { - if (bufferBytesHeldByInflater == 0) return; - int toRelease = bufferBytesHeldByInflater - inflater.getRemaining(); - bufferBytesHeldByInflater -= toRelease; - source.skip(toRelease); - } + /** When the inflater has processed compressed data, remove it from the buffer. */ + private void releaseInflatedBytes() throws IOException { + if (bufferBytesHeldByInflater == 0) return; + int toRelease = bufferBytesHeldByInflater - inflater.getRemaining(); + bufferBytesHeldByInflater -= toRelease; + source.skip(toRelease); + } - @Override public Timeout timeout() { - return source.timeout(); - } + @Override public Timeout timeout() { + return source.timeout(); + } - @Override public void close() throws IOException { - if (closed) return; - inflater.end(); - closed = true; - source.close(); - } + @Override public void close() throws IOException { + if (closed) return; + inflater.end(); + closed = true; + source.close(); + } } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Segment.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Segment.java index 501343a..9c289ef 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Segment.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Segment.java @@ -31,13 +31,13 @@ public final class Segment { // TODO: Is 2 KiB a good default segment size? static final int SIZE = 2048; - final byte[] data = new byte[SIZE]; + public final byte[] data = new byte[SIZE]; /** The next byte of application data byte to read in this segment. */ - int pos; + public int pos; /** The first byte of available data ready to be written to. */ - int limit; + public int limit; /** Next segment in a linked or circularly-linked list. */ Segment next; diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/SegmentPool.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/SegmentPool.java index f410c8c..58d362b 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/SegmentPool.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/SegmentPool.java @@ -51,7 +51,7 @@ public final class SegmentPool { return new Segment(); // Pool is empty. Don't zero-fill while holding a lock. } - void recycle(Segment segment) { + public void recycle(Segment segment) { if (segment.next != null || segment.prev != null) throw new IllegalArgumentException(); synchronized (this) { if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full. |