aboutsummaryrefslogtreecommitdiffstats
path: root/AndroidAsync
diff options
context:
space:
mode:
authorKoushik Dutta <koushd@gmail.com>2014-07-26 19:27:30 -0700
committerKoushik Dutta <koushd@gmail.com>2014-07-26 19:27:30 -0700
commitf9ac08876bac04052cce8b78fd05e5cf5fe2fe0e (patch)
treeff2323ebb0d21b2fd8e6f6a2022ca1bfe81d7648 /AndroidAsync
parent893442cba5cc5c20ea0be7953860513ab7e4e325 (diff)
downloadAndroidAsync-f9ac08876bac04052cce8b78fd05e5cf5fe2fe0e.tar.gz
AndroidAsync-f9ac08876bac04052cce8b78fd05e5cf5fe2fe0e.tar.bz2
AndroidAsync-f9ac08876bac04052cce8b78fd05e5cf5fe2fe0e.zip
spdy is working
Diffstat (limited to 'AndroidAsync')
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/Util.java22
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/AsyncSSLSocketMiddleware.java12
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java260
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSink.java52
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSource.java170
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyMiddleware.java137
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyTransport.java61
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java2
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java8
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Settings.java4
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java12
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Buffer.java4
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/InflaterSource.java160
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Segment.java6
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/SegmentPool.java2
15 files changed, 607 insertions, 305 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/Util.java b/AndroidAsync/src/com/koushikdutta/async/Util.java
index 5dc0359..5e26662 100644
--- a/AndroidAsync/src/com/koushikdutta/async/Util.java
+++ b/AndroidAsync/src/com/koushikdutta/async/Util.java
@@ -222,4 +222,26 @@ public class Util {
}
return null;
}
+
+ public static void end(DataEmitter emitter, Exception e) {
+ if (emitter == null)
+ return;
+ end(emitter.getEndCallback(), e);
+ }
+
+ public static void end(CompletedCallback end, Exception e) {
+ if (end != null)
+ end.onCompleted(e);
+ }
+
+ public static void writable(DataSink emitter) {
+ if (emitter == null)
+ return;
+ writable(emitter.getWriteableCallback());
+ }
+
+ public static void writable(WritableCallback writable) {
+ if (writable != null)
+ writable.onWriteable();
+ }
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncSSLSocketMiddleware.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncSSLSocketMiddleware.java
index 59fa0f7..01f3fbe 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncSSLSocketMiddleware.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncSSLSocketMiddleware.java
@@ -68,7 +68,7 @@ public class AsyncSSLSocketMiddleware extends AsyncSocketMiddleware {
return sslEngine;
}
- protected AsyncSSLSocketWrapper.HandshakeCallback createHandshakeCallback(final ConnectCallback callback) {
+ protected AsyncSSLSocketWrapper.HandshakeCallback createHandshakeCallback(GetSocketData data, final ConnectCallback callback) {
return new AsyncSSLSocketWrapper.HandshakeCallback() {
@Override
public void onHandshakeCompleted(Exception e, AsyncSSLSocket socket) {
@@ -77,15 +77,15 @@ public class AsyncSSLSocketMiddleware extends AsyncSocketMiddleware {
};
}
- protected void tryHandshake(final ConnectCallback callback, AsyncSocket socket, final Uri uri, final int port) {
+ protected void tryHandshake(AsyncSocket socket, GetSocketData data, final Uri uri, final int port, final ConnectCallback callback) {
AsyncSSLSocketWrapper.handshake(socket, uri.getHost(), port,
createConfiguredSSLEngine(uri.getHost(), port),
trustManagers, hostnameVerifier, true,
- createHandshakeCallback(callback));
+ createHandshakeCallback(data, callback));
}
@Override
- protected ConnectCallback wrapCallback(GetSocketData data, final Uri uri, final int port, final boolean proxied, final ConnectCallback callback) {
+ protected ConnectCallback wrapCallback(final GetSocketData data, final Uri uri, final int port, final boolean proxied, final ConnectCallback callback) {
return new ConnectCallback() {
@Override
public void onConnectCompleted(Exception ex, final AsyncSocket socket) {
@@ -95,7 +95,7 @@ public class AsyncSSLSocketMiddleware extends AsyncSocketMiddleware {
}
if (!proxied) {
- tryHandshake(callback, socket, uri, port);
+ tryHandshake(socket, data, uri, port, callback);
return;
}
@@ -127,7 +127,7 @@ public class AsyncSSLSocketMiddleware extends AsyncSocketMiddleware {
socket.setDataCallback(null);
socket.setEndCallback(null);
if (TextUtils.isEmpty(s.trim())) {
- tryHandshake(callback, socket, uri, port);
+ tryHandshake(socket, data, uri, port, callback);
}
else {
callback.onConnectCompleted(new IOException("unknown second status line"), socket);
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java
index 18e856c..066f92a 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java
@@ -3,12 +3,15 @@ package com.koushikdutta.async.http.spdy;
import com.koushikdutta.async.AsyncServer;
import com.koushikdutta.async.AsyncSocket;
import com.koushikdutta.async.BufferedDataEmitter;
+import com.koushikdutta.async.BufferedDataSink;
import com.koushikdutta.async.ByteBufferList;
import com.koushikdutta.async.DataEmitter;
import com.koushikdutta.async.Util;
import com.koushikdutta.async.callback.CompletedCallback;
import com.koushikdutta.async.callback.DataCallback;
import com.koushikdutta.async.callback.WritableCallback;
+import com.koushikdutta.async.future.SimpleFuture;
+import com.koushikdutta.async.http.Headers;
import com.koushikdutta.async.http.Protocol;
import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.ErrorCode;
import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.FrameReader;
@@ -20,8 +23,12 @@ import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Ping;
import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Settings;
import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Spdy3;
import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Variant;
+import com.koushikdutta.async.http.spdy.okio.BufferedSink;
import com.koushikdutta.async.http.spdy.okio.BufferedSource;
import com.koushikdutta.async.http.spdy.okio.ByteString;
+import com.koushikdutta.async.http.spdy.okio.Okio;
+
+import junit.framework.Assert;
import java.io.IOException;
import java.util.Hashtable;
@@ -37,31 +44,126 @@ import static com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Settings.DEF
public class AsyncSpdyConnection implements FrameReader.Handler {
BufferedDataEmitter emitter;
AsyncSocket socket;
+ BufferedDataSink bufferedSocket;
FrameReader reader;
FrameWriter writer;
Variant variant;
- SpdySocket zero = new SpdySocket(0, false, false, null);
+// SpdySocket zero = new SpdySocket(0, false, false, null);
ByteBufferListSource source = new ByteBufferListSource();
+ ByteBufferListSink sink = new ByteBufferListSink() {
+ @Override
+ public void flush() throws IOException {
+ AsyncSpdyConnection.this.flush();
+ }
+ };
+ BufferedSource bufferedSource;
+ BufferedSink bufferedSink;
Hashtable<Integer, SpdySocket> sockets = new Hashtable<Integer, SpdySocket>();
Protocol protocol;
boolean client = true;
- private class SpdySocket implements AsyncSocket {
+ public void flush() {
+ bufferedSocket.write(sink);
+ }
+
+ /**
+ * Returns a new locally-initiated stream.
+ *
+ * @param out true to create an output stream that we can use to send data to the remote peer.
+ * Corresponds to {@code FLAG_FIN}.
+ * @param in true to create an input stream that the remote peer can use to send data to us.
+ * Corresponds to {@code FLAG_UNIDIRECTIONAL}.
+ */
+ public SpdySocket newStream(List<Header> requestHeaders, boolean out, boolean in) throws IOException {
+ return newStream(0, requestHeaders, out, in);
+ }
+
+ private SpdySocket newStream(int associatedStreamId, List<Header> requestHeaders, boolean out,
+ boolean in) throws IOException {
+ boolean outFinished = !out;
+ boolean inFinished = !in;
+ SpdySocket socket;
+ int streamId;
+
+ if (shutdown) {
+ throw new IOException("shutdown");
+ }
+
+ streamId = nextStreamId;
+ nextStreamId += 2;
+ socket = new SpdySocket(streamId, outFinished, inFinished, requestHeaders);
+ if (socket.isOpen()) {
+ sockets.put(streamId, socket);
+// setIdle(false);
+ }
+ if (associatedStreamId == 0) {
+ writer.synStream(outFinished, inFinished, streamId, associatedStreamId,
+ requestHeaders);
+ } else if (client) {
+ throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
+ } else { // HTTP/2 has a PUSH_PROMISE frame.
+ writer.pushPromise(associatedStreamId, streamId, requestHeaders);
+ }
+
+ if (!out) {
+ writer.flush();
+ }
+
+ return socket;
+ }
+
+ int totalWindowRead;
+ void updateWindowRead(int length) {
+ totalWindowRead += length;
+ if (totalWindowRead >= okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) {
+ try {
+ writer.windowUpdate(0, totalWindowRead);
+ }
+ catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ totalWindowRead = 0;
+ }
+ }
+
+ public class SpdySocket implements AsyncSocket {
long bytesLeftInWriteWindow;
WritableCallback writable;
final int id;
CompletedCallback closedCallback;
CompletedCallback endCallback;
DataCallback dataCallback;
- ByteBufferList pending = new ByteBufferList();
+ ByteBufferListSink pending = new ByteBufferListSink();
+ SimpleFuture<List<Header>> headers = new SimpleFuture<List<Header>>();
+ boolean isOpen = true;
+ int totalWindowRead;
- public SpdySocket(int id, boolean outFinished, boolean inFinished, List<Header> headerBlock) {
- this.id = id;
+ public SimpleFuture<List<Header>> headers() {
+ return headers;
}
- private void report(Exception e) {
- if (endCallback != null)
- endCallback.onCompleted(e);
+ void updateWindowRead(int length) {
+ totalWindowRead += length;
+ if (totalWindowRead >= okHttpSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE) / 2) {
+ try {
+ writer.windowUpdate(id, totalWindowRead);
+ }
+ catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ totalWindowRead = 0;
+ }
+ AsyncSpdyConnection.this.updateWindowRead(length);
+ }
+
+ public SpdySocket(int id, boolean outFinished, boolean inFinished, List<Header> headerBlock) {
+ this.id = id;
+ try {
+ writer.windowUpdate(id, DEFAULT_INITIAL_WINDOW_SIZE);
+ }
+ catch (IOException e) {
+ throw new AssertionError(e);
+ }
}
public boolean isLocallyInitiated() {
@@ -72,8 +174,8 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
public void addBytesToWriteWindow(long delta) {
long prev = bytesLeftInWriteWindow;
bytesLeftInWriteWindow += delta;
- if (writable != null && bytesLeftInWriteWindow > 0 && prev <= 0)
- writable.onWriteable();
+ if (bytesLeftInWriteWindow > 0 && prev <= 0)
+ Util.writable(writable);
}
@Override
@@ -109,7 +211,7 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
@Override
public void close() {
-
+ isOpen = false;
}
@Override
@@ -134,7 +236,7 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
@Override
public void write(ByteBufferList bb) {
-
+ System.out.println("writing!");
}
@Override
@@ -149,7 +251,7 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
@Override
public boolean isOpen() {
- return true;
+ return isOpen;
}
@Override
@@ -165,11 +267,20 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
public CompletedCallback getClosedCallback() {
return closedCallback;
}
+
+ public void receiveHeaders(List<Header> headers, HeadersMode headerMode) {
+ this.headers.setComplete(headers);
+ }
}
+ final Settings okHttpSettings = new Settings();
+ private int nextPingId;
+ private static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024;
+
public AsyncSpdyConnection(AsyncSocket socket, Protocol protocol) {
this.protocol = protocol;
this.socket = socket;
+ this.bufferedSocket = new BufferedDataSink(socket);
emitter = new BufferedDataEmitter(socket);
emitter.setDataCallback(callback);
@@ -179,24 +290,53 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
else if (protocol == Protocol.HTTP_2) {
variant = new Http20Draft13();
}
- reader = variant.newReader(source, true);
+ reader = variant.newReader(bufferedSource = Okio.buffer(source), true);
+ writer = variant.newWriter(bufferedSink = Okio.buffer(sink), true);
+
+ boolean client = true;
+ nextStreamId = client ? 1 : 2;
+ if (client && protocol == Protocol.HTTP_2) {
+ nextStreamId += 2; // In HTTP/2, 1 on client is reserved for Upgrade.
+ }
+ nextPingId = client ? 1 : 2;
+ // Flow control was designed more for servers, or proxies than edge clients.
+ // If we are a client, set the flow control window to 16MiB. This avoids
+ // thrashing window updates every 64KiB, yet small enough to avoid blowing
+ // up the heap.
+ if (client) {
+ okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, OKHTTP_CLIENT_WINDOW_SIZE);
+ }
}
DataCallback callback = new DataCallback() {
@Override
public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) {
- bb.get(source);
- if (!reader.canProcessFrame(source))
- return;
- try {
- reader.nextFrame(AsyncSpdyConnection.this);
- }
- catch (IOException e) {
- throw new AssertionError(e);
+ int needed;
+ while ((needed = reader.canProcessFrame(bb)) > 0) {
+ bb.get(source, needed);
+ try {
+ reader.nextFrame(AsyncSpdyConnection.this);
+ }
+ catch (IOException e) {
+ throw new AssertionError(e);
+ }
}
}
};
+ /**
+ * Sends a connection header if the current variant requires it. This should
+ * be called after {@link Builder#build} for all new connections.
+ */
+ public void sendConnectionPreface() throws IOException {
+ writer.connectionPreface();
+ writer.settings(okHttpSettings);
+ int windowSize = okHttpSettings.getInitialWindowSize(Settings.DEFAULT_INITIAL_WINDOW_SIZE);
+ if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) {
+ writer.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE);
+ }
+ }
+
/** Even, positive numbered streams are pushed streams in HTTP/2. */
private boolean pushedStream(int streamId) {
return protocol == Protocol.HTTP_2 && streamId != 0 && (streamId & 1) == 0;
@@ -215,12 +355,16 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
source.skip(length);
return;
}
- if (source != this.source)
+ if (source != this.bufferedSource || this.source.remaining() + source.buffer().size() != length)
throw new AssertionError();
- this.source.get(socket.pending, length);
+ source.buffer().readAll(socket.pending);
+ this.source.get(socket.pending);
+ socket.updateWindowRead(length);
Util.emitAllData(socket, socket.pending);
if (inFinished) {
- socket.report(null);
+ sockets.remove(streamId);
+ socket.close();
+ Util.end(socket, null);
}
}
@@ -228,7 +372,6 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
private int nextStreamId;
@Override
public void headers(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode) {
- /*
if (pushedStream(streamId)) {
throw new AssertionError("push");
// pushHeadersLater(streamId, headerBlock, inFinished);
@@ -258,25 +401,34 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
// If the stream ID is in the client's namespace, assume it's already closed.
if (streamId % 2 == nextStreamId % 2) return;
+ throw new AssertionError("unexpected receive stream");
+
// Create a stream.
- socket = new SpdySocket(streamId, outFinished, inFinished, headerBlock);
- lastGoodStreamId = streamId;
- sockets.put(streamId, socket);
- handler.receive(newStream);
- return;
+// socket = new SpdySocket(streamId, outFinished, inFinished, headerBlock);
+// lastGoodStreamId = streamId;
+// sockets.put(streamId, socket);
+// handler.receive(newStream);
+// return;
}
// The headers claim to be for a new stream, but we already have one.
if (headersMode.failIfStreamPresent()) {
- stream.closeLater(ErrorCode.PROTOCOL_ERROR);
- removeStream(streamId);
+ try {
+ writer.rstStream(streamId, ErrorCode.INVALID_STREAM);
+ }
+ catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ sockets.remove(streamId);
return;
}
// Update an existing stream.
- stream.receiveHeaders(headerBlock, headersMode);
- if (inFinished) stream.receiveFin();
- */
+ socket.receiveHeaders(headerBlock, headersMode);
+ if (inFinished) {
+ sockets.remove(streamId);
+ Util.end(socket, null);
+ }
}
@Override
@@ -288,10 +440,11 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
}
SpdySocket rstStream = sockets.remove(streamId);
if (rstStream != null) {
- rstStream.report(new IOException(errorCode.toString()));
+ Util.end(rstStream, new IOException(errorCode.toString()));
}
}
+ long bytesLeftInWriteWindow;
Settings peerSettings = new Settings();
private boolean receivedInitialPeerSettings = false;
@Override
@@ -310,7 +463,7 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) {
delta = peerInitialWindowSize - priorWriteWindowSize;
if (!receivedInitialPeerSettings) {
- zero.addBytesToWriteWindow(delta);
+ addBytesToWriteWindow(delta);
receivedInitialPeerSettings = true;
}
}
@@ -319,8 +472,21 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
}
}
+ void addBytesToWriteWindow(long delta) {
+ bytesLeftInWriteWindow += delta;
+ for (SpdySocket socket: sockets.values()) {
+ Util.writable(socket);
+ }
+ }
+
@Override
public void ackSettings() {
+ try {
+ writer.ackSettings();
+ }
+ catch (IOException e) {
+ throw new AssertionError(e);
+ }
}
private Map<Integer, Ping> pings;
@@ -362,7 +528,7 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
Map.Entry<Integer, SpdySocket> entry = i.next();
int streamId = entry.getKey();
if (streamId > lastGoodStreamId && entry.getValue().isLocallyInitiated()) {
- entry.getValue().report(new IOException(ErrorCode.REFUSED_STREAM.toString()));
+ Util.end(entry.getValue(), new IOException(ErrorCode.REFUSED_STREAM.toString()));
i.remove();
}
}
@@ -370,25 +536,25 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
@Override
public void windowUpdate(int streamId, long windowSizeIncrement) {
- System.out.println("fff");
-
+ if (streamId == 0) {
+ addBytesToWriteWindow(windowSizeIncrement);
+ return;
+ }
+ SpdySocket socket = sockets.get(streamId);
+ if (socket != null)
+ socket.addBytesToWriteWindow(windowSizeIncrement);
}
@Override
public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
- System.out.println("fff");
-
}
@Override
public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) throws IOException {
- System.out.println("fff");
-
+ throw new AssertionError("pushPromise");
}
@Override
public void alternateService(int streamId, String origin, ByteString protocol, String host, int port, long maxAge) {
- System.out.println("fff");
-
}
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSink.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSink.java
new file mode 100644
index 0000000..aa9da9a
--- /dev/null
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSink.java
@@ -0,0 +1,52 @@
+package com.koushikdutta.async.http.spdy;
+
+import com.koushikdutta.async.ByteBufferList;
+import com.koushikdutta.async.http.spdy.okio.Buffer;
+import com.koushikdutta.async.http.spdy.okio.Segment;
+import com.koushikdutta.async.http.spdy.okio.SegmentPool;
+import com.koushikdutta.async.http.spdy.okio.Sink;
+import com.koushikdutta.async.http.spdy.okio.Timeout;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Created by koush on 7/25/14.
+ */
+public class ByteBufferListSink extends ByteBufferList implements Sink {
+ @Override
+ public void write(Buffer source, long byteCount) throws IOException {
+ Segment s = source.head;
+ while (byteCount > 0) {
+ int toCopy = (int) Math.min(byteCount, s.limit - s.pos);
+ ByteBuffer b = obtain(toCopy);
+ b.put(s.data, s.pos, toCopy);
+ b.flip();
+ add(b);
+
+ s.pos += toCopy;
+ source.size -= toCopy;
+ byteCount -= toCopy;
+
+ if (s.pos == s.limit) {
+ Segment toRecycle = s;
+ source.head = s = toRecycle.pop();
+ SegmentPool.getInstance().recycle(toRecycle);
+ }
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ }
+
+ @Override
+ public Timeout timeout() {
+ return Timeout.NONE;
+ }
+
+ @Override
+ public void close() throws IOException {
+ recycle();
+ }
+}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSource.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSource.java
index 5d57c77..13803b3 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSource.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/ByteBufferListSource.java
@@ -2,173 +2,39 @@ package com.koushikdutta.async.http.spdy;
import com.koushikdutta.async.ByteBufferList;
import com.koushikdutta.async.http.spdy.okio.Buffer;
-import com.koushikdutta.async.http.spdy.okio.BufferedSource;
-import com.koushikdutta.async.http.spdy.okio.ByteString;
-import com.koushikdutta.async.http.spdy.okio.Sink;
+import com.koushikdutta.async.http.spdy.okio.Source;
import com.koushikdutta.async.http.spdy.okio.Timeout;
-import com.koushikdutta.async.util.Charsets;
import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteOrder;
-import java.nio.charset.Charset;
+import java.nio.ByteBuffer;
/**
- * Created by koush on 7/17/14.
+ * Created by koush on 7/25/14.
*/
-public class ByteBufferListSource extends ByteBufferList implements BufferedSource {
- @Override
- public Buffer buffer() {
- return null;
- }
-
- @Override
- public boolean exhausted() throws IOException {
- return !hasRemaining();
- }
-
- @Override
- public void require(long byteCount) throws IOException {
- if (remaining() < byteCount)
- throw new AssertionError("out of data");
- }
-
- @Override
- public byte readByte() throws IOException {
- return order(ByteOrder.BIG_ENDIAN).get();
- }
-
- @Override
- public short readShort() throws IOException {
- return order(ByteOrder.BIG_ENDIAN).getShort();
- }
-
- @Override
- public short readShortLe() throws IOException {
- return order(ByteOrder.LITTLE_ENDIAN).getShort();
- }
-
- @Override
- public int readInt() throws IOException {
- return order(ByteOrder.BIG_ENDIAN).getInt();
- }
-
- @Override
- public int readIntLe() throws IOException {
- return order(ByteOrder.LITTLE_ENDIAN).getInt();
- }
-
- @Override
- public long readLong() throws IOException {
- return order(ByteOrder.BIG_ENDIAN).getLong();
- }
-
- @Override
- public long readLongLe() throws IOException {
- return order(ByteOrder.LITTLE_ENDIAN).getLong();
- }
-
- @Override
- public void skip(long byteCount) throws IOException {
- if (byteCount > Integer.MAX_VALUE)
- throw new AssertionError("too much skippy, use less peanut butter");
- read(new byte[(int)byteCount]);
- }
-
- @Override
- public ByteString readByteString() throws IOException {
- return readByteString(remaining());
- }
-
- @Override
- public ByteString readByteString(long byteCount) throws IOException {
- return ByteString.of(readByteArray(byteCount));
- }
-
- @Override
- public byte[] readByteArray() throws IOException {
- return getAllByteArray();
- }
-
- @Override
- public byte[] readByteArray(long byteCount) throws IOException {
- byte[] ret = new byte[(int)byteCount];
- get(ret);
- return ret;
- }
-
- @Override
- public int read(byte[] sink) throws IOException {
- return read(sink, 0, sink.length);
- }
-
- @Override
- public void readFully(byte[] sink) throws IOException {
- read(sink, 0, sink.length);
- }
-
- @Override
- public int read(byte[] sink, int offset, int byteCount) throws IOException {
- get(sink, offset, byteCount);
- return byteCount;
- }
-
- @Override
- public void readFully(Buffer sink, long byteCount) throws IOException {
- throw new AssertionError("not implemented");
- }
-
- @Override
- public long readAll(Sink sink) throws IOException {
- throw new AssertionError("not implemented");
- }
-
- @Override
- public String readUtf8() throws IOException {
- return readUtf8(remaining());
- }
-
- @Override
- public String readUtf8(long byteCount) throws IOException {
- return new String(readByteArray(byteCount), Charsets.UTF_8);
- }
-
- @Override
- public String readUtf8Line() throws IOException {
- throw new AssertionError("not implemented");
- }
-
- @Override
- public String readUtf8LineStrict() throws IOException {
- throw new AssertionError("not implemented");
- }
-
- @Override
- public String readString(long byteCount, Charset charset) throws IOException {
- return new String(readByteArray(byteCount), charset);
- }
-
- @Override
- public long indexOf(byte b) throws IOException {
- throw new AssertionError("not implemented");
- }
-
- @Override
- public InputStream inputStream() {
- throw new AssertionError("not implemented");
- }
-
+public class ByteBufferListSource extends ByteBufferList implements Source {
@Override
public long read(Buffer sink, long byteCount) throws IOException {
- throw new AssertionError("not implemented");
+ if (!hasRemaining())
+ throw new AssertionError("empty!");
+ int total = 0;
+ while (total < byteCount && hasRemaining()) {
+ ByteBuffer b = remove();
+ int toRead = (int)Math.min(byteCount - total, b.remaining());
+ total += toRead;
+ sink.write(b.array(), b.arrayOffset() + b.position(), toRead);
+ b.position(b.position() + toRead);
+ addFirst(b);
+ }
+ return total;
}
@Override
public Timeout timeout() {
- return null;
+ return Timeout.NONE;
}
@Override
public void close() throws IOException {
+ recycle();
}
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyMiddleware.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyMiddleware.java
index 35d4257..3ebe802 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyMiddleware.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyMiddleware.java
@@ -1,19 +1,32 @@
package com.koushikdutta.async.http.spdy;
+import android.net.Uri;
+
import com.koushikdutta.async.AsyncSSLSocket;
import com.koushikdutta.async.AsyncSSLSocketWrapper;
import com.koushikdutta.async.ByteBufferList;
import com.koushikdutta.async.callback.ConnectCallback;
import com.koushikdutta.async.future.Cancellable;
+import com.koushikdutta.async.future.FutureCallback;
+import com.koushikdutta.async.future.SimpleCancellable;
+import com.koushikdutta.async.future.TransformFuture;
import com.koushikdutta.async.http.AsyncHttpClient;
+import com.koushikdutta.async.http.AsyncHttpClientMiddleware;
import com.koushikdutta.async.http.AsyncSSLEngineConfigurator;
import com.koushikdutta.async.http.AsyncSSLSocketMiddleware;
+import com.koushikdutta.async.http.Headers;
+import com.koushikdutta.async.http.Multimap;
import com.koushikdutta.async.http.Protocol;
+import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Header;
+import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.SpdyConnection;
import com.koushikdutta.async.util.Charsets;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
@@ -41,8 +54,15 @@ public class SpdyMiddleware extends AsyncSSLSocketMiddleware {
return ret;
}
+ private static String requestPath(Uri uri) {
+ String pathAndQuery = uri.getPath();
+ if (pathAndQuery == null) return "/";
+ if (!pathAndQuery.startsWith("/")) return "/" + pathAndQuery;
+ return pathAndQuery;
+ }
+
@Override
- protected AsyncSSLSocketWrapper.HandshakeCallback createHandshakeCallback(final ConnectCallback callback) {
+ protected AsyncSSLSocketWrapper.HandshakeCallback createHandshakeCallback(final GetSocketData data, final ConnectCallback callback) {
return new AsyncSSLSocketWrapper.HandshakeCallback() {
@Override
public void onHandshakeCompleted(Exception e, AsyncSSLSocket socket) {
@@ -53,8 +73,23 @@ public class SpdyMiddleware extends AsyncSSLSocketMiddleware {
try {
long ptr = (Long)sslNativePointer.get(socket.getSSLEngine());
byte[] proto = (byte[])nativeGetAlpnNegotiatedProtocol.invoke(null, ptr);
+ if (proto == null) {
+ callback.onConnectCompleted(null, socket);
+ return;
+ }
String protoString = new String(proto);
- AsyncSpdyConnection connection = new AsyncSpdyConnection(socket, Protocol.get(protoString));
+ Protocol p = Protocol.get(protoString);
+ if (p == null) {
+ callback.onConnectCompleted(null, socket);
+ return;
+ }
+ final AsyncSpdyConnection connection = new AsyncSpdyConnection(socket, Protocol.get(protoString));
+ connection.sendConnectionPreface();
+ connection.flush();
+
+ connections.put(data.request.getUri().getHost(), connection);
+
+ newSocket(data, connection, callback);
}
catch (Exception ex) {
socket.close();
@@ -64,6 +99,83 @@ public class SpdyMiddleware extends AsyncSSLSocketMiddleware {
};
}
+ private void newSocket(GetSocketData data, final AsyncSpdyConnection connection, final ConnectCallback callback) {
+ final ArrayList<Header> headers = new ArrayList<Header>();
+ headers.add(new Header(Header.TARGET_METHOD, data.request.getMethod()));
+ headers.add(new Header(Header.TARGET_PATH, requestPath(data.request.getUri())));
+ String host = data.request.getHeaders().get("Host");
+ if (Protocol.SPDY_3 == connection.protocol) {
+ headers.add(new Header(Header.VERSION, "HTTP/1.1"));
+ headers.add(new Header(Header.TARGET_HOST, host));
+ } else if (Protocol.HTTP_2 == connection.protocol) {
+ headers.add(new Header(Header.TARGET_AUTHORITY, host)); // Optional in HTTP/2
+ } else {
+ throw new AssertionError();
+ }
+ headers.add(new Header(Header.TARGET_SCHEME, data.request.getUri().getScheme()));
+
+ Multimap mm = data.request.getHeaders().getMultiMap();
+ for (String key: mm.keySet()) {
+ if (SpdyTransport.isProhibitedHeader(connection.protocol, key))
+ continue;
+ for (String value: mm.get(key)) {
+ headers.add(new Header(key.toLowerCase(), value));
+ }
+ }
+
+ connection.socket.getServer().postDelayed(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ AsyncSpdyConnection.SpdySocket spdy = connection.newStream(headers, false, true);
+ connection.flush();
+ callback.onConnectCompleted(null, spdy);
+ }
+ catch (Exception e) {
+ throw new AssertionError(e);
+ }
+ }
+ }, 1000);
+ }
+
+ @Override
+ public boolean exchangeHeaders(final ExchangeHeaderData data) {
+ if (!(data.socket instanceof AsyncSpdyConnection.SpdySocket))
+ return false;
+
+ // headers were already sent as part of the socket being opened.
+ data.sendHeadersCallback.onCompleted(null);
+
+ final AsyncSpdyConnection.SpdySocket spdySocket = (AsyncSpdyConnection.SpdySocket)data.socket;
+ spdySocket.headers()
+ .then(new TransformFuture<Headers, List<Header>>() {
+ @Override
+ protected void transform(List<Header> result) throws Exception {
+ Headers headers = new Headers();
+ for (Header header: result) {
+ String key = header.name.utf8();
+ String value = header.value.utf8();
+ headers.add(key, value);
+ }
+ String status = headers.remove(Header.RESPONSE_STATUS.utf8());
+ String[] statusParts = status.split(" ", 2);
+ data.response.code(Integer.parseInt(statusParts[0]));
+ data.response.message(statusParts[1]);
+ data.response.protocol(headers.remove(Header.VERSION.utf8()));
+ data.response.headers(headers);
+ setComplete(headers);
+ }
+ })
+ .setCallback(new FutureCallback<Headers>() {
+ @Override
+ public void onCompleted(Exception e, Headers result) {
+ data.receiveHeadersCallback.onCompleted(e);
+ data.response.emitter(spdySocket);
+ }
+ });
+ return true;
+ }
+
private void configure(SSLEngine engine, String host, int port) {
if (!initialized) {
initialized = true;
@@ -144,6 +256,7 @@ public class SpdyMiddleware extends AsyncSSLSocketMiddleware {
Field useSni;
Method nativeGetNpnNegotiatedProtocol;
Method nativeGetAlpnNegotiatedProtocol;
+ Hashtable<String, AsyncSpdyConnection> connections = new Hashtable<String, AsyncSpdyConnection>();
@Override
public void setSSLContext(SSLContext sslContext) {
@@ -153,6 +266,24 @@ public class SpdyMiddleware extends AsyncSSLSocketMiddleware {
@Override
public Cancellable getSocket(GetSocketData data) {
- return super.getSocket(data);
+ final Uri uri = data.request.getUri();
+ final int port = getSchemePort(data.request.getUri());
+ if (port == -1) {
+ return null;
+ }
+
+ // can we use an existing connection to satisfy this, or do we need a new one?
+ String host = uri.getHost();
+ AsyncSpdyConnection conn = connections.get(host);
+ if (conn == null || !conn.socket.isOpen()) {
+ connections.remove(host);
+ return super.getSocket(data);
+ }
+
+ newSocket(data, conn, data.connectCallback);
+
+ SimpleCancellable ret = new SimpleCancellable();
+ ret.setComplete();
+ return ret;
}
} \ No newline at end of file
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyTransport.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyTransport.java
new file mode 100644
index 0000000..e915a06
--- /dev/null
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/SpdyTransport.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2012 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License";
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.koushikdutta.async.http.spdy;
+
+
+import com.koushikdutta.async.http.Protocol;
+import com.koushikdutta.async.http.spdy.okhttp.internal.Util;
+import com.koushikdutta.async.http.spdy.okio.ByteString;
+
+import java.util.List;
+
+
+final class SpdyTransport {
+ /** See http://www.chromium.org/spdy/spdy-protocol/spdy-protocol-draft3-1#TOC-3.2.1-Request. */
+ private static final List<String> SPDY_3_PROHIBITED_HEADERS = Util.immutableList(
+ "accept-encoding",
+ "user-agent",
+ "accept",
+
+ "connection",
+ "host",
+ "keep-alive",
+ "proxy-connection",
+ "transfer-encoding");
+
+ /** See http://tools.ietf.org/html/draft-ietf-httpbis-http2-09#section-8.1.3. */
+ private static final List<String> HTTP_2_PROHIBITED_HEADERS = Util.immutableList(
+ "connection",
+ "host",
+ "keep-alive",
+ "proxy-connection",
+ "te",
+ "transfer-encoding",
+ "encoding",
+ "upgrade");
+
+ /** When true, this header should not be emitted or consumed. */
+ static boolean isProhibitedHeader(Protocol protocol, String name) {
+ if (protocol == Protocol.SPDY_3) {
+ return SPDY_3_PROHIBITED_HEADERS.contains(name.toLowerCase());
+ } else if (protocol == Protocol.HTTP_2) {
+ return HTTP_2_PROHIBITED_HEADERS.contains(name.toLowerCase());
+ } else {
+ throw new AssertionError(protocol);
+ }
+ }
+}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java
index 5305f63..19b6b77 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java
@@ -26,7 +26,7 @@ import java.util.List;
/** Reads transport frames for SPDY/3 or HTTP/2. */
public interface FrameReader extends Closeable {
- boolean canProcessFrame(ByteBufferList bb);
+ int canProcessFrame(ByteBufferList bb);
void readConnectionPreface() throws IOException;
boolean nextFrame(Handler handler) throws IOException;
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java
index 88958ef..1425cbb 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java
@@ -96,14 +96,16 @@ public final class Http20Draft13 implements Variant {
final HpackDraft08.Reader hpackReader;
@Override
- public boolean canProcessFrame(ByteBufferList bb) {
+ public int canProcessFrame(ByteBufferList bb) {
if (bb.remaining() < 4)
- return false;
+ return 0;
bb.order(ByteOrder.BIG_ENDIAN);
int w1 = bb.peekInt();
short length = (short) ((w1 & 0x3fff0000) >> 16); // 14-bit unsigned == MAX_FRAME_SIZE
- return bb.remaining() >= 8 + length;
+ if (bb.remaining() < 8 + length)
+ return 0;
+ return 8 + length;
}
Reader(BufferedSource source, int headerTableSize, boolean client) {
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Settings.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Settings.java
index 4b332f6..1b96f64 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Settings.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Settings.java
@@ -53,7 +53,7 @@ public final class Settings {
/** spdy/3: Retransmission rate. Percentage */
static final int DOWNLOAD_RETRANS_RATE = 6;
/** Window size in bytes. */
- static final int INITIAL_WINDOW_SIZE = 7;
+ public static final int INITIAL_WINDOW_SIZE = 7;
/** spdy/3: Window size in bytes. */
static final int CLIENT_CERTIFICATE_VECTOR_SIZE = 8;
/** Flow control options. */
@@ -82,7 +82,7 @@ public final class Settings {
Arrays.fill(values, 0);
}
- Settings set(int id, int idFlags, int value) {
+ public Settings set(int id, int idFlags, int value) {
if (id >= values.length) {
return this; // Discard unknown settings.
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java
index 72a3df2..fe19935 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java
@@ -128,15 +128,17 @@ public final class Spdy3 implements Variant {
}
@Override
- public boolean canProcessFrame(ByteBufferList bb) {
- if (bb.remaining() < 8)
- return false;
+ public int canProcessFrame(ByteBufferList bb) {
+ if (source.buffer().size() + bb.remaining() < 8)
+ return 0;
ByteBuffer peek = ByteBuffer.wrap(bb.peekBytes(8)).order(ByteOrder.BIG_ENDIAN);
- peek.getInt();
+ int w1 = peek.getInt();
int w2 = peek.getInt();
int length = (w2 & 0xffffff);
- return bb.remaining() >= 8 + length;
+ if (bb.remaining() < 8 + length)
+ return 0;
+ return 8 + length;
}
/**
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Buffer.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Buffer.java
index 4ac22b1..bb6852d 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Buffer.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Buffer.java
@@ -45,8 +45,8 @@ import static com.koushikdutta.async.http.spdy.okio.Util.reverseBytesLong;
* This class avoids zero-fill and GC churn by pooling byte arrays.
*/
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
- Segment head;
- long size;
+ public Segment head;
+ public long size;
public Buffer() {
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/InflaterSource.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/InflaterSource.java
index ce50796..76f7cc0 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/InflaterSource.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/InflaterSource.java
@@ -25,99 +25,99 @@ import java.util.zip.Inflater;
* to decompress data read from another source.
*/
public final class InflaterSource implements Source {
- private final BufferedSource source;
- private final Inflater inflater;
+ private final BufferedSource source;
+ private final Inflater inflater;
- /**
- * When we call Inflater.setInput(), the inflater keeps our byte array until
- * it needs input again. This tracks how many bytes the inflater is currently
- * holding on to.
- */
- private int bufferBytesHeldByInflater;
- private boolean closed;
+ /**
+ * When we call Inflater.setInput(), the inflater keeps our byte array until
+ * it needs input again. This tracks how many bytes the inflater is currently
+ * holding on to.
+ */
+ private int bufferBytesHeldByInflater;
+ private boolean closed;
- public InflaterSource(Source source, Inflater inflater) {
- this(Okio.buffer(source), inflater);
- }
+ public InflaterSource(Source source, Inflater inflater) {
+ this(Okio.buffer(source), inflater);
+ }
- /**
- * This package-private constructor shares a buffer with its trusted caller.
- * In general we can't share a BufferedSource because the inflater holds input
- * bytes until they are inflated.
- */
- InflaterSource(BufferedSource source, Inflater inflater) {
- if (source == null) throw new IllegalArgumentException("source == null");
- if (inflater == null) throw new IllegalArgumentException("inflater == null");
- this.source = source;
- this.inflater = inflater;
- }
+ /**
+ * This package-private constructor shares a buffer with its trusted caller.
+ * In general we can't share a BufferedSource because the inflater holds input
+ * bytes until they are inflated.
+ */
+ InflaterSource(BufferedSource source, Inflater inflater) {
+ if (source == null) throw new IllegalArgumentException("source == null");
+ if (inflater == null) throw new IllegalArgumentException("inflater == null");
+ this.source = source;
+ this.inflater = inflater;
+ }
- @Override public long read(
- Buffer sink, long byteCount) throws IOException {
- if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
- if (closed) throw new IllegalStateException("closed");
- if (byteCount == 0) return 0;
+ @Override public long read(
+ Buffer sink, long byteCount) throws IOException {
+ if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
+ if (closed) throw new IllegalStateException("closed");
+ if (byteCount == 0) return 0;
- while (true) {
- boolean sourceExhausted = refill();
+ while (true) {
+ boolean sourceExhausted = refill();
- // Decompress the inflater's compressed data into the sink.
- try {
- Segment tail = sink.writableSegment(1);
- int bytesInflated = inflater.inflate(tail.data, tail.limit, Segment.SIZE - tail.limit);
- if (bytesInflated > 0) {
- tail.limit += bytesInflated;
- sink.size += bytesInflated;
- return bytesInflated;
+ // Decompress the inflater's compressed data into the sink.
+ try {
+ Segment tail = sink.writableSegment(1);
+ int bytesInflated = inflater.inflate(tail.data, tail.limit, Segment.SIZE - tail.limit);
+ if (bytesInflated > 0) {
+ tail.limit += bytesInflated;
+ sink.size += bytesInflated;
+ return bytesInflated;
+ }
+ if (inflater.finished() || inflater.needsDictionary()) {
+ releaseInflatedBytes();
+ return -1;
+ }
+ if (sourceExhausted) throw new EOFException("source exhausted prematurely");
+ } catch (DataFormatException e) {
+ throw new IOException(e);
+ }
}
- if (inflater.finished() || inflater.needsDictionary()) {
- releaseInflatedBytes();
- return -1;
- }
- if (sourceExhausted) throw new EOFException("source exhausted prematurely");
- } catch (DataFormatException e) {
- throw new IOException(e);
- }
}
- }
- /**
- * Refills the inflater with compressed data if it needs input. (And only if
- * it needs input). Returns true if the inflater required input but the source
- * was exhausted.
- */
- public boolean refill() throws IOException {
- if (!inflater.needsInput()) return false;
+ /**
+ * Refills the inflater with compressed data if it needs input. (And only if
+ * it needs input). Returns true if the inflater required input but the source
+ * was exhausted.
+ */
+ public boolean refill() throws IOException {
+ if (!inflater.needsInput()) return false;
- releaseInflatedBytes();
- if (inflater.getRemaining() != 0) throw new IllegalStateException("?"); // TODO: possible?
+ releaseInflatedBytes();
+ if (inflater.getRemaining() != 0) throw new IllegalStateException("?"); // TODO: possible?
- // If there are compressed bytes in the source, assign them to the inflater.
- if (source.exhausted()) return true;
+ // If there are compressed bytes in the source, assign them to the inflater.
+ if (source.exhausted()) return true;
- // Assign buffer bytes to the inflater.
- byte[] data = source.readByteArray();
- bufferBytesHeldByInflater = data.length;
- inflater.setInput(data, 0, bufferBytesHeldByInflater);
- return false;
- }
+ // Assign buffer bytes to the inflater.
+ Segment head = source.buffer().head;
+ bufferBytesHeldByInflater = head.limit - head.pos;
+ inflater.setInput(head.data, head.pos, bufferBytesHeldByInflater);
+ return false;
+ }
- /** When the inflater has processed compressed data, remove it from the buffer. */
- private void releaseInflatedBytes() throws IOException {
- if (bufferBytesHeldByInflater == 0) return;
- int toRelease = bufferBytesHeldByInflater - inflater.getRemaining();
- bufferBytesHeldByInflater -= toRelease;
- source.skip(toRelease);
- }
+ /** When the inflater has processed compressed data, remove it from the buffer. */
+ private void releaseInflatedBytes() throws IOException {
+ if (bufferBytesHeldByInflater == 0) return;
+ int toRelease = bufferBytesHeldByInflater - inflater.getRemaining();
+ bufferBytesHeldByInflater -= toRelease;
+ source.skip(toRelease);
+ }
- @Override public Timeout timeout() {
- return source.timeout();
- }
+ @Override public Timeout timeout() {
+ return source.timeout();
+ }
- @Override public void close() throws IOException {
- if (closed) return;
- inflater.end();
- closed = true;
- source.close();
- }
+ @Override public void close() throws IOException {
+ if (closed) return;
+ inflater.end();
+ closed = true;
+ source.close();
+ }
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Segment.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Segment.java
index 501343a..9c289ef 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Segment.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/Segment.java
@@ -31,13 +31,13 @@ public final class Segment {
// TODO: Is 2 KiB a good default segment size?
static final int SIZE = 2048;
- final byte[] data = new byte[SIZE];
+ public final byte[] data = new byte[SIZE];
/** The next byte of application data byte to read in this segment. */
- int pos;
+ public int pos;
/** The first byte of available data ready to be written to. */
- int limit;
+ public int limit;
/** Next segment in a linked or circularly-linked list. */
Segment next;
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/SegmentPool.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/SegmentPool.java
index f410c8c..58d362b 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/SegmentPool.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okio/SegmentPool.java
@@ -51,7 +51,7 @@ public final class SegmentPool {
return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}
- void recycle(Segment segment) {
+ public void recycle(Segment segment) {
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
synchronized (this) {
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.