aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKoushik Dutta <koushd@gmail.com>2014-07-27 19:53:50 -0700
committerKoushik Dutta <koushd@gmail.com>2014-07-27 19:53:50 -0700
commit432c80258b0d164f291cbb30a08d994b11f77ce4 (patch)
tree98b10f7d2828edb4cff0e647b61b2ce240eabc65
parent9e24bdf9a04b2d664424255bfba28965598a69cc (diff)
downloadAndroidAsync-432c80258b0d164f291cbb30a08d994b11f77ce4.tar.gz
AndroidAsync-432c80258b0d164f291cbb30a08d994b11f77ce4.tar.bz2
AndroidAsync-432c80258b0d164f291cbb30a08d994b11f77ce4.zip
refactor w/out framereader okio
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/PushParser.java3
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java60
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java223
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/HeaderReader.java69
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java.ignore (renamed from AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java)1
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/NameValueBlockReader.java119
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java918
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Variant.java4
8 files changed, 697 insertions, 700 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/PushParser.java b/AndroidAsync/src/com/koushikdutta/async/PushParser.java
index e02ac7f..51a79cc 100644
--- a/AndroidAsync/src/com/koushikdutta/async/PushParser.java
+++ b/AndroidAsync/src/com/koushikdutta/async/PushParser.java
@@ -237,8 +237,9 @@ public class PushParser implements DataCallback {
private ArrayList<Object> args = new ArrayList<Object>();
ByteOrder order = ByteOrder.BIG_ENDIAN;
- public void setOrder(ByteOrder order) {
+ public PushParser setOrder(ByteOrder order) {
this.order = order;
+ return this;
}
public PushParser(DataEmitter s) {
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java
index 7471615..d9feaf7 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java
@@ -5,31 +5,25 @@ import com.koushikdutta.async.AsyncSocket;
import com.koushikdutta.async.BufferedDataEmitter;
import com.koushikdutta.async.BufferedDataSink;
import com.koushikdutta.async.ByteBufferList;
-import com.koushikdutta.async.DataEmitter;
import com.koushikdutta.async.Util;
import com.koushikdutta.async.callback.CompletedCallback;
import com.koushikdutta.async.callback.DataCallback;
import com.koushikdutta.async.callback.WritableCallback;
import com.koushikdutta.async.future.SimpleFuture;
-import com.koushikdutta.async.http.Headers;
import com.koushikdutta.async.http.Protocol;
import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.ErrorCode;
import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.FrameReader;
import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.FrameWriter;
import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Header;
import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.HeadersMode;
-import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Http20Draft13;
import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Ping;
import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Settings;
import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Spdy3;
import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Variant;
import com.koushikdutta.async.http.spdy.okio.BufferedSink;
-import com.koushikdutta.async.http.spdy.okio.BufferedSource;
import com.koushikdutta.async.http.spdy.okio.ByteString;
import com.koushikdutta.async.http.spdy.okio.Okio;
-import junit.framework.Assert;
-
import java.io.IOException;
import java.util.Hashtable;
import java.util.Iterator;
@@ -42,21 +36,17 @@ import static com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Settings.DEF
* Created by koush on 7/16/14.
*/
public class AsyncSpdyConnection implements FrameReader.Handler {
- BufferedDataEmitter emitter;
AsyncSocket socket;
BufferedDataSink bufferedSocket;
FrameReader reader;
FrameWriter writer;
Variant variant;
-// SpdySocket zero = new SpdySocket(0, false, false, null);
- ByteBufferListSource source = new ByteBufferListSource();
ByteBufferListSink sink = new ByteBufferListSink() {
@Override
public void flush() throws IOException {
AsyncSpdyConnection.this.flush();
}
};
- BufferedSource bufferedSource;
BufferedSink bufferedSink;
Hashtable<Integer, SpdySocket> sockets = new Hashtable<Integer, SpdySocket>();
Protocol protocol;
@@ -286,16 +276,15 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
this.protocol = protocol;
this.socket = socket;
this.bufferedSocket = new BufferedDataSink(socket);
- emitter = new BufferedDataEmitter(socket);
- emitter.setDataCallback(callback);
if (protocol == Protocol.SPDY_3) {
variant = new Spdy3();
}
else if (protocol == Protocol.HTTP_2) {
- variant = new Http20Draft13();
+ throw new AssertionError("http20draft13");
+// variant = new Http20Draft13();
}
- reader = variant.newReader(bufferedSource = Okio.buffer(source), true);
+ reader = variant.newReader(socket, this, true);
writer = variant.newWriter(bufferedSink = Okio.buffer(sink), true);
boolean client = true;
@@ -313,22 +302,6 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
}
}
- DataCallback callback = new DataCallback() {
- @Override
- public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) {
- int needed;
- while ((needed = reader.canProcessFrame(bb)) > 0) {
- bb.get(source, needed);
- try {
- reader.nextFrame(AsyncSpdyConnection.this);
- }
- catch (IOException e) {
- throw new AssertionError(e);
- }
- }
- }
- };
-
/**
* Sends a connection header if the current variant requires it. This should
* be called after {@link Builder#build} for all new connections.
@@ -348,7 +321,7 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
}
@Override
- public void data(boolean inFinished, int streamId, BufferedSource source, int length) throws IOException {
+ public void data(boolean inFinished, int streamId, ByteBufferList source) {
if (pushedStream(streamId)) {
throw new AssertionError("push");
// pushDataLater(streamId, source, length, inFinished);
@@ -356,14 +329,17 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
}
SpdySocket socket = sockets.get(streamId);
if (socket == null) {
- writer.rstStream(streamId, ErrorCode.INVALID_STREAM);
- source.skip(length);
+ try {
+ writer.rstStream(streamId, ErrorCode.INVALID_STREAM);
+ }
+ catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ source.recycle();
return;
}
- if (source != this.bufferedSource || this.source.remaining() + source.buffer().size() != length)
- throw new AssertionError();
- source.buffer().readAll(socket.pending);
- this.source.get(socket.pending);
+ int length = source.remaining();
+ source.get(socket.pending);
socket.updateWindowRead(length);
Util.emitAllData(socket, socket.pending);
if (inFinished) {
@@ -562,4 +538,14 @@ public class AsyncSpdyConnection implements FrameReader.Handler {
@Override
public void alternateService(int streamId, String origin, ByteString protocol, String host, int port, long maxAge) {
}
+
+ @Override
+ public void error(Exception e) {
+ socket.close();
+ for (Iterator<Map.Entry<Integer, SpdySocket>> i = sockets.entrySet().iterator(); i.hasNext();) {
+ Map.Entry<Integer, SpdySocket> entry = i.next();
+ Util.end(entry.getValue(), e);
+ i.remove();
+ }
+ }
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java
index 19b6b77..3f457fb 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java
@@ -17,124 +17,129 @@
package com.koushikdutta.async.http.spdy.okhttp.internal.spdy;
import com.koushikdutta.async.ByteBufferList;
-import com.koushikdutta.async.http.spdy.okio.BufferedSource;
import com.koushikdutta.async.http.spdy.okio.ByteString;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
-/** Reads transport frames for SPDY/3 or HTTP/2. */
+/**
+ * Reads transport frames for SPDY/3 or HTTP/2.
+ */
public interface FrameReader extends Closeable {
- int canProcessFrame(ByteBufferList bb);
- void readConnectionPreface() throws IOException;
- boolean nextFrame(Handler handler) throws IOException;
+ void readConnectionPreface() throws IOException;
+// boolean nextFrame(Handler handler) throws IOException;
- public interface Handler {
- void data(boolean inFinished, int streamId, BufferedSource source, int length)
- throws IOException;
+ public interface Handler {
+ void error(Exception e);
+
+ void data(boolean inFinished, int streamId, ByteBufferList bb);
+
+ /**
+ * Create or update incoming headers, creating the corresponding streams
+ * if necessary. Frames that trigger this are SPDY SYN_STREAM, HEADERS, and
+ * SYN_REPLY, and HTTP/2 HEADERS and PUSH_PROMISE.
+ *
+ * @param outFinished true if the receiver should not send further frames.
+ * @param inFinished true if the sender will not send further frames.
+ * @param streamId the stream owning these headers.
+ * @param associatedStreamId the stream that triggered the sender to create
+ * this stream.
+ */
+ void headers(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId,
+ List<Header> headerBlock, HeadersMode headersMode);
+
+ void rstStream(int streamId, ErrorCode errorCode);
+
+ void settings(boolean clearPrevious, Settings settings);
+
+ /**
+ * HTTP/2 only.
+ */
+ void ackSettings();
+
+ /**
+ * Read a connection-level ping from the peer. {@code ack} indicates this
+ * is a reply. Payload parameters are different between SPDY/3 and HTTP/2.
+ * <p/>
+ * In SPDY/3, only the first {@code payload1} parameter is set. If the
+ * reader is a client, it is an unsigned even number. Likewise, a server
+ * will receive an odd number.
+ * <p/>
+ * In HTTP/2, both {@code payload1} and {@code payload2} parameters are
+ * set. The data is opaque binary, and there are no rules on the content.
+ */
+ void ping(boolean ack, int payload1, int payload2);
+
+ /**
+ * The peer tells us to stop creating streams. It is safe to replay
+ * streams with {@code ID > lastGoodStreamId} on a new connection. In-
+ * flight streams with {@code ID <= lastGoodStreamId} can only be replayed
+ * on a new connection if they are idempotent.
+ *
+ * @param lastGoodStreamId the last stream ID the peer processed before
+ * sending this message. If {@code lastGoodStreamId} is zero, the peer
+ * processed no frames.
+ * @param errorCode reason for closing the connection.
+ * @param debugData only valid for HTTP/2; opaque debug data to send.
+ */
+ void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData);
+
+ /**
+ * Notifies that an additional {@code windowSizeIncrement} bytes can be
+ * sent on {@code streamId}, or the connection if {@code streamId} is zero.
+ */
+ void windowUpdate(int streamId, long windowSizeIncrement);
+
+ /**
+ * Called when reading a headers or priority frame. This may be used to
+ * change the stream's weight from the default (16) to a new value.
+ *
+ * @param streamId stream which has a priority change.
+ * @param streamDependency the stream ID this stream is dependent on.
+ * @param weight relative proportion of priority in [1..256].
+ * @param exclusive inserts this stream ID as the sole child of
+ * {@code streamDependency}.
+ */
+ void priority(int streamId, int streamDependency, int weight, boolean exclusive);
- /**
- * Create or update incoming headers, creating the corresponding streams
- * if necessary. Frames that trigger this are SPDY SYN_STREAM, HEADERS, and
- * SYN_REPLY, and HTTP/2 HEADERS and PUSH_PROMISE.
- *
- * @param outFinished true if the receiver should not send further frames.
- * @param inFinished true if the sender will not send further frames.
- * @param streamId the stream owning these headers.
- * @param associatedStreamId the stream that triggered the sender to create
- * this stream.
- */
- void headers(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId,
- List<Header> headerBlock, HeadersMode headersMode);
- void rstStream(int streamId, ErrorCode errorCode);
- void settings(boolean clearPrevious, Settings settings);
-
- /** HTTP/2 only. */
- void ackSettings();
-
- /**
- * Read a connection-level ping from the peer. {@code ack} indicates this
- * is a reply. Payload parameters are different between SPDY/3 and HTTP/2.
- * <p>
- * In SPDY/3, only the first {@code payload1} parameter is set. If the
- * reader is a client, it is an unsigned even number. Likewise, a server
- * will receive an odd number.
- * <p>
- * In HTTP/2, both {@code payload1} and {@code payload2} parameters are
- * set. The data is opaque binary, and there are no rules on the content.
- */
- void ping(boolean ack, int payload1, int payload2);
-
- /**
- * The peer tells us to stop creating streams. It is safe to replay
- * streams with {@code ID > lastGoodStreamId} on a new connection. In-
- * flight streams with {@code ID <= lastGoodStreamId} can only be replayed
- * on a new connection if they are idempotent.
- *
- * @param lastGoodStreamId the last stream ID the peer processed before
- * sending this message. If {@code lastGoodStreamId} is zero, the peer
- * processed no frames.
- * @param errorCode reason for closing the connection.
- * @param debugData only valid for HTTP/2; opaque debug data to send.
- */
- void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData);
-
- /**
- * Notifies that an additional {@code windowSizeIncrement} bytes can be
- * sent on {@code streamId}, or the connection if {@code streamId} is zero.
- */
- void windowUpdate(int streamId, long windowSizeIncrement);
-
- /**
- * Called when reading a headers or priority frame. This may be used to
- * change the stream's weight from the default (16) to a new value.
- *
- * @param streamId stream which has a priority change.
- * @param streamDependency the stream ID this stream is dependent on.
- * @param weight relative proportion of priority in [1..256].
- * @param exclusive inserts this stream ID as the sole child of
- * {@code streamDependency}.
- */
- void priority(int streamId, int streamDependency, int weight, boolean exclusive);
-
- /**
- * HTTP/2 only. Receive a push promise header block.
- * <p>
- * A push promise contains all the headers that pertain to a server-initiated
- * request, and a {@code promisedStreamId} to which response frames will be
- * delivered. Push promise frames are sent as a part of the response to
- * {@code streamId}.
- *
- * @param streamId client-initiated stream ID. Must be an odd number.
- * @param promisedStreamId server-initiated stream ID. Must be an even
- * number.
- * @param requestHeaders minimally includes {@code :method}, {@code :scheme},
- * {@code :authority}, and (@code :path}.
- */
- void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
+ /**
+ * HTTP/2 only. Receive a push promise header block.
+ * <p/>
+ * A push promise contains all the headers that pertain to a server-initiated
+ * request, and a {@code promisedStreamId} to which response frames will be
+ * delivered. Push promise frames are sent as a part of the response to
+ * {@code streamId}.
+ *
+ * @param streamId client-initiated stream ID. Must be an odd number.
+ * @param promisedStreamId server-initiated stream ID. Must be an even
+ * number.
+ * @param requestHeaders minimally includes {@code :method}, {@code :scheme},
+ * {@code :authority}, and (@code :path}.
+ */
+ void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
throws IOException;
- /**
- * HTTP/2 only. Expresses that resources for the connection or a client-
- * initiated stream are available from a different network location or
- * protocol configuration.
- *
- * <p>See <a href="https://tools.ietf.org/html/draft-ietf-httpbis-alt-svc-01">alt-svc</a>
- *
- * @param streamId when a client-initiated stream ID (odd number), the
- * origin of this alternate service is the origin of the stream. When
- * zero, the origin is specified in the {@code origin} parameter.
- * @param origin when present, the
- * <a href="http://tools.ietf.org/html/rfc6454">origin</a> is typically
- * represented as a combination of scheme, host and port. When empty,
- * the origin is that of the {@code streamId}.
- * @param protocol an ALPN protocol, such as {@code h2}.
- * @param host an IP address or hostname.
- * @param port the IP port associated with the service.
- * @param maxAge time in seconds that this alternative is considered fresh.
- */
- void alternateService(int streamId, String origin, ByteString protocol, String host, int port,
- long maxAge);
- }
+ /**
+ * HTTP/2 only. Expresses that resources for the connection or a client-
+ * initiated stream are available from a different network location or
+ * protocol configuration.
+ * <p/>
+ * <p>See <a href="https://tools.ietf.org/html/draft-ietf-httpbis-alt-svc-01">alt-svc</a>
+ *
+ * @param streamId when a client-initiated stream ID (odd number), the
+ * origin of this alternate service is the origin of the stream. When
+ * zero, the origin is specified in the {@code origin} parameter.
+ * @param origin when present, the
+ * <a href="http://tools.ietf.org/html/rfc6454">origin</a> is typically
+ * represented as a combination of scheme, host and port. When empty,
+ * the origin is that of the {@code streamId}.
+ * @param protocol an ALPN protocol, such as {@code h2}.
+ * @param host an IP address or hostname.
+ * @param port the IP port associated with the service.
+ * @param maxAge time in seconds that this alternative is considered fresh.
+ */
+ void alternateService(int streamId, String origin, ByteString protocol, String host, int port,
+ long maxAge);
+ }
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/HeaderReader.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/HeaderReader.java
new file mode 100644
index 0000000..618f705
--- /dev/null
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/HeaderReader.java
@@ -0,0 +1,69 @@
+package com.koushikdutta.async.http.spdy.okhttp.internal.spdy;
+
+import com.koushikdutta.async.ByteBufferList;
+import com.koushikdutta.async.http.spdy.okio.ByteString;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+/**
+ * Created by koush on 7/27/14.
+ */
+public class HeaderReader {
+ Inflater inflater;
+ public HeaderReader() {
+ inflater = new Inflater() {
+ @Override public int inflate(byte[] buffer, int offset, int count)
+ throws DataFormatException {
+ int result = super.inflate(buffer, offset, count);
+ if (result == 0 && needsDictionary()) {
+ setDictionary(Spdy3.DICTIONARY);
+ result = super.inflate(buffer, offset, count);
+ }
+ return result;
+ }
+ };
+ }
+
+ public List<Header> readHeader(ByteBufferList bb, int length) throws IOException {
+ byte[] bytes = new byte[length];
+ bb.get(bytes);
+
+ inflater.setInput(bytes);
+
+ ByteBufferList source = new ByteBufferList().order(ByteOrder.BIG_ENDIAN);
+ while (!inflater.needsInput()) {
+ ByteBuffer b = ByteBufferList.obtain(8192);
+ try {
+ int read = inflater.inflate(b.array());
+ b.limit(read);
+ source.add(b);
+ }
+ catch (DataFormatException e) {
+ throw new IOException(e);
+ }
+ }
+
+ int numberOfPairs = source.getInt();
+ List<Header> entries = new ArrayList<Header>(numberOfPairs);
+ for (int i = 0; i < numberOfPairs; i++) {
+ ByteString name = readByteString(source).toAsciiLowercase();
+ ByteString values = readByteString(source);
+ if (name.size() == 0) throw new IOException("name.size == 0");
+ entries.add(new Header(name, values));
+ }
+ return entries;
+ }
+
+ private static ByteString readByteString(ByteBufferList source) {
+ int length = source.getInt();
+ byte[] bytes = new byte[length];
+ source.get(bytes);
+ return ByteString.of(bytes);
+ }
+}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java.ignore
index 1425cbb..c223197 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java.ignore
@@ -38,6 +38,7 @@ import static java.util.logging.Level.FINE;
* Read and write HTTP/2 v13 frames.
* <p>http://tools.ietf.org/html/draft-ietf-httpbis-http2-13
*/
+
public final class Http20Draft13 implements Variant {
private static final Logger logger = Logger.getLogger(Http20Draft13.class.getName());
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/NameValueBlockReader.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/NameValueBlockReader.java
deleted file mode 100644
index adc15f8..0000000
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/NameValueBlockReader.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright (C) 2013 Square, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.koushikdutta.async.http.spdy.okhttp.internal.spdy;
-
-import com.koushikdutta.async.http.spdy.okio.Buffer;
-import com.koushikdutta.async.http.spdy.okio.BufferedSource;
-import com.koushikdutta.async.http.spdy.okio.ByteString;
-import com.koushikdutta.async.http.spdy.okio.ForwardingSource;
-import com.koushikdutta.async.http.spdy.okio.InflaterSource;
-import com.koushikdutta.async.http.spdy.okio.Okio;
-import com.koushikdutta.async.http.spdy.okio.Source;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.zip.DataFormatException;
-import java.util.zip.Inflater;
-
-/**
- * Reads a SPDY/3 Name/Value header block. This class is made complicated by the
- * requirement that we're strict with which bytes we put in the compressed bytes
- * buffer. We need to put all compressed bytes into that buffer -- but no other
- * bytes.
- */
-class NameValueBlockReader {
- /** This source transforms compressed bytes into uncompressed bytes. */
- private final InflaterSource inflaterSource;
-
- /**
- * How many compressed bytes must be read into inflaterSource before
- * {@link #readNameValueBlock} returns.
- */
- private int compressedLimit;
-
- /** This source holds inflated bytes. */
- private final BufferedSource source;
-
- public NameValueBlockReader(BufferedSource source) {
- // Limit the inflater input stream to only those bytes in the Name/Value
- // block. We cut the inflater off at its source because we can't predict the
- // ratio of compressed bytes to uncompressed bytes.
- Source throttleSource = new ForwardingSource(source) {
- @Override public long read(Buffer sink, long byteCount) throws IOException {
- if (compressedLimit == 0) return -1; // Out of data for the current block.
- long read = super.read(sink, Math.min(byteCount, compressedLimit));
- if (read == -1) return -1;
- compressedLimit -= read;
- return read;
- }
- };
-
- // Subclass inflater to install a dictionary when it's needed.
- Inflater inflater = new Inflater() {
- @Override public int inflate(byte[] buffer, int offset, int count)
- throws DataFormatException {
- int result = super.inflate(buffer, offset, count);
- if (result == 0 && needsDictionary()) {
- setDictionary(Spdy3.DICTIONARY);
- result = super.inflate(buffer, offset, count);
- }
- return result;
- }
- };
-
- this.inflaterSource = new InflaterSource(throttleSource, inflater);
- this.source = Okio.buffer(inflaterSource);
- }
-
- public List<Header> readNameValueBlock(int length) throws IOException {
- this.compressedLimit += length;
-
- int numberOfPairs = source.readInt();
- if (numberOfPairs < 0) throw new IOException("numberOfPairs < 0: " + numberOfPairs);
- if (numberOfPairs > 1024) throw new IOException("numberOfPairs > 1024: " + numberOfPairs);
-
- List<Header> entries = new ArrayList<Header>(numberOfPairs);
- for (int i = 0; i < numberOfPairs; i++) {
- ByteString name = readByteString().toAsciiLowercase();
- ByteString values = readByteString();
- if (name.size() == 0) throw new IOException("name.size == 0");
- entries.add(new Header(name, values));
- }
-
- doneReading();
- return entries;
- }
-
- private ByteString readByteString() throws IOException {
- int length = source.readInt();
- return source.readByteString(length);
- }
-
- private void doneReading() throws IOException {
- // Move any outstanding unread bytes into the inflater. One side-effect of
- // deflate compression is that sometimes there are bytes remaining in the
- // stream after we've consumed all of the content.
- if (compressedLimit > 0) {
- inflaterSource.refill();
- if (compressedLimit != 0) throw new IOException("compressedLimit > 0: " + compressedLimit);
- }
- }
-
- public void close() throws IOException {
- source.close();
- }
-}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java
index e38f3f0..2868ff2 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java
@@ -16,11 +16,13 @@
package com.koushikdutta.async.http.spdy.okhttp.internal.spdy;
import com.koushikdutta.async.ByteBufferList;
+import com.koushikdutta.async.DataEmitter;
+import com.koushikdutta.async.DataEmitterReader;
+import com.koushikdutta.async.callback.DataCallback;
import com.koushikdutta.async.http.Protocol;
import com.koushikdutta.async.http.spdy.okhttp.internal.Util;
import com.koushikdutta.async.http.spdy.okio.Buffer;
import com.koushikdutta.async.http.spdy.okio.BufferedSink;
-import com.koushikdutta.async.http.spdy.okio.BufferedSource;
import com.koushikdutta.async.http.spdy.okio.ByteString;
import com.koushikdutta.async.http.spdy.okio.DeflaterSink;
import com.koushikdutta.async.http.spdy.okio.Okio;
@@ -29,7 +31,6 @@ import com.koushikdutta.async.util.Charsets;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.ProtocolException;
-import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.zip.Deflater;
@@ -41,477 +42,530 @@ import java.util.zip.Deflater;
*/
public final class Spdy3 implements Variant {
- @Override public Protocol getProtocol() {
- return Protocol.SPDY_3;
- }
-
- static final int TYPE_DATA = 0x0;
- static final int TYPE_SYN_STREAM = 0x1;
- static final int TYPE_SYN_REPLY = 0x2;
- static final int TYPE_RST_STREAM = 0x3;
- static final int TYPE_SETTINGS = 0x4;
- static final int TYPE_PING = 0x6;
- static final int TYPE_GOAWAY = 0x7;
- static final int TYPE_HEADERS = 0x8;
- static final int TYPE_WINDOW_UPDATE = 0x9;
-
- static final int FLAG_FIN = 0x1;
- static final int FLAG_UNIDIRECTIONAL = 0x2;
-
- static final int VERSION = 3;
-
- static final byte[] DICTIONARY;
- static {
- try {
- DICTIONARY = ("\u0000\u0000\u0000\u0007options\u0000\u0000\u0000\u0004hea"
- + "d\u0000\u0000\u0000\u0004post\u0000\u0000\u0000\u0003put\u0000\u0000\u0000\u0006dele"
- + "te\u0000\u0000\u0000\u0005trace\u0000\u0000\u0000\u0006accept\u0000\u0000\u0000"
- + "\u000Eaccept-charset\u0000\u0000\u0000\u000Faccept-encoding\u0000\u0000\u0000\u000Fa"
- + "ccept-language\u0000\u0000\u0000\raccept-ranges\u0000\u0000\u0000\u0003age\u0000"
- + "\u0000\u0000\u0005allow\u0000\u0000\u0000\rauthorization\u0000\u0000\u0000\rcache-co"
- + "ntrol\u0000\u0000\u0000\nconnection\u0000\u0000\u0000\fcontent-base\u0000\u0000"
- + "\u0000\u0010content-encoding\u0000\u0000\u0000\u0010content-language\u0000\u0000"
- + "\u0000\u000Econtent-length\u0000\u0000\u0000\u0010content-location\u0000\u0000\u0000"
- + "\u000Bcontent-md5\u0000\u0000\u0000\rcontent-range\u0000\u0000\u0000\fcontent-type"
- + "\u0000\u0000\u0000\u0004date\u0000\u0000\u0000\u0004etag\u0000\u0000\u0000\u0006expe"
- + "ct\u0000\u0000\u0000\u0007expires\u0000\u0000\u0000\u0004from\u0000\u0000\u0000"
- + "\u0004host\u0000\u0000\u0000\bif-match\u0000\u0000\u0000\u0011if-modified-since"
- + "\u0000\u0000\u0000\rif-none-match\u0000\u0000\u0000\bif-range\u0000\u0000\u0000"
- + "\u0013if-unmodified-since\u0000\u0000\u0000\rlast-modified\u0000\u0000\u0000\blocati"
- + "on\u0000\u0000\u0000\fmax-forwards\u0000\u0000\u0000\u0006pragma\u0000\u0000\u0000"
- + "\u0012proxy-authenticate\u0000\u0000\u0000\u0013proxy-authorization\u0000\u0000"
- + "\u0000\u0005range\u0000\u0000\u0000\u0007referer\u0000\u0000\u0000\u000Bretry-after"
- + "\u0000\u0000\u0000\u0006server\u0000\u0000\u0000\u0002te\u0000\u0000\u0000\u0007trai"
- + "ler\u0000\u0000\u0000\u0011transfer-encoding\u0000\u0000\u0000\u0007upgrade\u0000"
- + "\u0000\u0000\nuser-agent\u0000\u0000\u0000\u0004vary\u0000\u0000\u0000\u0003via"
- + "\u0000\u0000\u0000\u0007warning\u0000\u0000\u0000\u0010www-authenticate\u0000\u0000"
- + "\u0000\u0006method\u0000\u0000\u0000\u0003get\u0000\u0000\u0000\u0006status\u0000"
- + "\u0000\u0000\u0006200 OK\u0000\u0000\u0000\u0007version\u0000\u0000\u0000\bHTTP/1.1"
- + "\u0000\u0000\u0000\u0003url\u0000\u0000\u0000\u0006public\u0000\u0000\u0000\nset-coo"
- + "kie\u0000\u0000\u0000\nkeep-alive\u0000\u0000\u0000\u0006origin100101201202205206300"
- + "302303304305306307402405406407408409410411412413414415416417502504505203 Non-Authori"
- + "tative Information204 No Content301 Moved Permanently400 Bad Request401 Unauthorized"
- + "403 Forbidden404 Not Found500 Internal Server Error501 Not Implemented503 Service Un"
- + "availableJan Feb Mar Apr May Jun Jul Aug Sept Oct Nov Dec 00:00:00 Mon, Tue, Wed, Th"
- + "u, Fri, Sat, Sun, GMTchunked,text/html,image/png,image/jpg,image/gif,application/xml"
- + ",application/xhtml+xml,text/plain,text/javascript,publicprivatemax-age=gzip,deflate,"
- + "sdchcharset=utf-8charset=iso-8859-1,utf-,*,enq=0.").getBytes(Charsets.UTF_8.name());
- } catch (UnsupportedEncodingException e) {
- throw new AssertionError();
- }
- }
-
- @Override public FrameReader newReader(BufferedSource source, boolean client) {
- return new Reader(source, client);
- }
-
- @Override public FrameWriter newWriter(BufferedSink sink, boolean client) {
- return new Writer(sink, client);
- }
-
- @Override public int maxFrameSize() {
- return 16383;
- }
-
- /** Read spdy/3 frames. */
- static final class Reader implements FrameReader {
- private final BufferedSource source;
- private final boolean client;
- private final NameValueBlockReader headerBlockReader;
-
- Reader(BufferedSource source, boolean client) {
- this.source = source;
- this.headerBlockReader = new NameValueBlockReader(this.source);
- this.client = client;
+ @Override
+ public Protocol getProtocol() {
+ return Protocol.SPDY_3;
+ }
+
+ static final int TYPE_DATA = 0x0;
+ static final int TYPE_SYN_STREAM = 0x1;
+ static final int TYPE_SYN_REPLY = 0x2;
+ static final int TYPE_RST_STREAM = 0x3;
+ static final int TYPE_SETTINGS = 0x4;
+ static final int TYPE_PING = 0x6;
+ static final int TYPE_GOAWAY = 0x7;
+ static final int TYPE_HEADERS = 0x8;
+ static final int TYPE_WINDOW_UPDATE = 0x9;
+
+ static final int FLAG_FIN = 0x1;
+ static final int FLAG_UNIDIRECTIONAL = 0x2;
+
+ static final int VERSION = 3;
+
+ static final byte[] DICTIONARY;
+
+ static {
+ try {
+ DICTIONARY = ("\u0000\u0000\u0000\u0007options\u0000\u0000\u0000\u0004hea"
+ + "d\u0000\u0000\u0000\u0004post\u0000\u0000\u0000\u0003put\u0000\u0000\u0000\u0006dele"
+ + "te\u0000\u0000\u0000\u0005trace\u0000\u0000\u0000\u0006accept\u0000\u0000\u0000"
+ + "\u000Eaccept-charset\u0000\u0000\u0000\u000Faccept-encoding\u0000\u0000\u0000\u000Fa"
+ + "ccept-language\u0000\u0000\u0000\raccept-ranges\u0000\u0000\u0000\u0003age\u0000"
+ + "\u0000\u0000\u0005allow\u0000\u0000\u0000\rauthorization\u0000\u0000\u0000\rcache-co"
+ + "ntrol\u0000\u0000\u0000\nconnection\u0000\u0000\u0000\fcontent-base\u0000\u0000"
+ + "\u0000\u0010content-encoding\u0000\u0000\u0000\u0010content-language\u0000\u0000"
+ + "\u0000\u000Econtent-length\u0000\u0000\u0000\u0010content-location\u0000\u0000\u0000"
+ + "\u000Bcontent-md5\u0000\u0000\u0000\rcontent-range\u0000\u0000\u0000\fcontent-type"
+ + "\u0000\u0000\u0000\u0004date\u0000\u0000\u0000\u0004etag\u0000\u0000\u0000\u0006expe"
+ + "ct\u0000\u0000\u0000\u0007expires\u0000\u0000\u0000\u0004from\u0000\u0000\u0000"
+ + "\u0004host\u0000\u0000\u0000\bif-match\u0000\u0000\u0000\u0011if-modified-since"
+ + "\u0000\u0000\u0000\rif-none-match\u0000\u0000\u0000\bif-range\u0000\u0000\u0000"
+ + "\u0013if-unmodified-since\u0000\u0000\u0000\rlast-modified\u0000\u0000\u0000\blocati"
+ + "on\u0000\u0000\u0000\fmax-forwards\u0000\u0000\u0000\u0006pragma\u0000\u0000\u0000"
+ + "\u0012proxy-authenticate\u0000\u0000\u0000\u0013proxy-authorization\u0000\u0000"
+ + "\u0000\u0005range\u0000\u0000\u0000\u0007referer\u0000\u0000\u0000\u000Bretry-after"
+ + "\u0000\u0000\u0000\u0006server\u0000\u0000\u0000\u0002te\u0000\u0000\u0000\u0007trai"
+ + "ler\u0000\u0000\u0000\u0011transfer-encoding\u0000\u0000\u0000\u0007upgrade\u0000"
+ + "\u0000\u0000\nuser-agent\u0000\u0000\u0000\u0004vary\u0000\u0000\u0000\u0003via"
+ + "\u0000\u0000\u0000\u0007warning\u0000\u0000\u0000\u0010www-authenticate\u0000\u0000"
+ + "\u0000\u0006method\u0000\u0000\u0000\u0003get\u0000\u0000\u0000\u0006status\u0000"
+ + "\u0000\u0000\u0006200 OK\u0000\u0000\u0000\u0007version\u0000\u0000\u0000\bHTTP/1.1"
+ + "\u0000\u0000\u0000\u0003url\u0000\u0000\u0000\u0006public\u0000\u0000\u0000\nset-coo"
+ + "kie\u0000\u0000\u0000\nkeep-alive\u0000\u0000\u0000\u0006origin100101201202205206300"
+ + "302303304305306307402405406407408409410411412413414415416417502504505203 Non-Authori"
+ + "tative Information204 No Content301 Moved Permanently400 Bad Request401 Unauthorized"
+ + "403 Forbidden404 Not Found500 Internal Server Error501 Not Implemented503 Service Un"
+ + "availableJan Feb Mar Apr May Jun Jul Aug Sept Oct Nov Dec 00:00:00 Mon, Tue, Wed, Th"
+ + "u, Fri, Sat, Sun, GMTchunked,text/html,image/png,image/jpg,image/gif,application/xml"
+ + ",application/xhtml+xml,text/plain,text/javascript,publicprivatemax-age=gzip,deflate,"
+ + "sdchcharset=utf-8charset=iso-8859-1,utf-,*,enq=0.").getBytes(Charsets.UTF_8.name());
+ } catch (UnsupportedEncodingException e) {
+ throw new AssertionError();
+ }
}
- @Override public void readConnectionPreface() {
+ @Override
+ public FrameReader newReader(DataEmitter source, FrameReader.Handler handler, boolean client) {
+ return new Reader(source, handler, client);
}
- @Override
- public int canProcessFrame(ByteBufferList bb) {
- if (source.buffer().size() + bb.remaining() < 8)
- return 0;
- ByteBuffer peek = ByteBuffer.wrap(bb.peekBytes(8)).order(ByteOrder.BIG_ENDIAN);
- int w1 = peek.getInt();
- int w2 = peek.getInt();
-
- int length = (w2 & 0xffffff);
- if (bb.remaining() < 8 + length)
- return 0;
- return 8 + length;
- }
-
- /**
- * Send the next frame to {@code handler}. Returns true unless there are no
- * more frames on the stream.
- */
- @Override public boolean nextFrame(Handler handler) throws IOException {
- int w1;
- int w2;
- try {
- w1 = source.readInt();
- w2 = source.readInt();
- } catch (IOException e) {
- return false; // This might be a normal socket close.
- }
-
- boolean control = (w1 & 0x80000000) != 0;
- int flags = (w2 & 0xff000000) >>> 24;
- int length = (w2 & 0xffffff);
-
- if (control) {
- int version = (w1 & 0x7fff0000) >>> 16;
- int type = (w1 & 0xffff);
-
- if (version != 3) {
- throw new ProtocolException("version != 3: " + version);
- }
-
- switch (type) {
- case TYPE_SYN_STREAM:
- readSynStream(handler, flags, length);
- return true;
-
- case TYPE_SYN_REPLY:
- readSynReply(handler, flags, length);
- return true;
-
- case TYPE_RST_STREAM:
- readRstStream(handler, flags, length);
- return true;
-
- case TYPE_SETTINGS:
- readSettings(handler, flags, length);
- return true;
-
- case TYPE_PING:
- readPing(handler, flags, length);
- return true;
-
- case TYPE_GOAWAY:
- readGoAway(handler, flags, length);
- return true;
-
- case TYPE_HEADERS:
- readHeaders(handler, flags, length);
- return true;
-
- case TYPE_WINDOW_UPDATE:
- readWindowUpdate(handler, flags, length);
- return true;
-
- default:
- source.skip(length);
- return true;
- }
- } else {
- int streamId = w1 & 0x7fffffff;
- boolean inFinished = (flags & FLAG_FIN) != 0;
- handler.data(inFinished, streamId, source, length);
- return true;
- }
+ @Override
+ public FrameWriter newWriter(BufferedSink sink, boolean client) {
+ return new Writer(sink, client);
}
- private void readSynStream(Handler handler, int flags, int length) throws IOException {
- int w1 = source.readInt();
- int w2 = source.readInt();
- int streamId = w1 & 0x7fffffff;
- int associatedStreamId = w2 & 0x7fffffff;
- source.readShort(); // int priority = (s3 & 0xe000) >>> 13; int slot = s3 & 0xff;
- List<Header> headerBlock = headerBlockReader.readNameValueBlock(length - 10);
-
- boolean inFinished = (flags & FLAG_FIN) != 0;
- boolean outFinished = (flags & FLAG_UNIDIRECTIONAL) != 0;
- handler.headers(outFinished, inFinished, streamId, associatedStreamId, headerBlock,
- HeadersMode.SPDY_SYN_STREAM);
+ @Override
+ public int maxFrameSize() {
+ return 16383;
}
- private void readSynReply(Handler handler, int flags, int length) throws IOException {
- int w1 = source.readInt();
- int streamId = w1 & 0x7fffffff;
- List<Header> headerBlock = headerBlockReader.readNameValueBlock(length - 4);
- boolean inFinished = (flags & FLAG_FIN) != 0;
- handler.headers(false, inFinished, streamId, -1, headerBlock, HeadersMode.SPDY_REPLY);
- }
+ /**
+ * Read spdy/3 frames.
+ */
+ static final class Reader implements FrameReader {
+ private final HeaderReader headerReader = new HeaderReader();
+ private final DataEmitter emitter;
+ private final boolean client;
+ private final Handler handler;
+ private final DataEmitterReader reader;
+
+ Reader(DataEmitter emitter, Handler handler, boolean client) {
+ this.emitter = emitter;
+ this.handler = handler;
+ this.client = client;
+
+ reader = new DataEmitterReader();
+ parseFrameHeader();
+ }
- private void readRstStream(Handler handler, int flags, int length) throws IOException {
- if (length != 8) throw ioException("TYPE_RST_STREAM length: %d != 8", length);
- int streamId = source.readInt() & 0x7fffffff;
- int errorCodeInt = source.readInt();
- ErrorCode errorCode = ErrorCode.fromSpdy3Rst(errorCodeInt);
- if (errorCode == null) {
- throw ioException("TYPE_RST_STREAM unexpected error code: %d", errorCodeInt);
- }
- handler.rstStream(streamId, errorCode);
- }
+ private void parseFrameHeader() {
+ emitter.setDataCallback(reader);
+ reader.read(8, onFrame);
+ }
- private void readHeaders(Handler handler, int flags, int length) throws IOException {
- int w1 = source.readInt();
- int streamId = w1 & 0x7fffffff;
- List<Header> headerBlock = headerBlockReader.readNameValueBlock(length - 4);
- handler.headers(false, false, streamId, -1, headerBlock, HeadersMode.SPDY_HEADERS);
- }
+ int w1;
+ int w2;
+ int flags;
+ int length;
+ int streamId;
+ boolean inFinished;
+ private final DataCallback onFrame = new DataCallback() {
+ @Override
+ public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) {
+ bb.order(ByteOrder.BIG_ENDIAN);
+ w1 = bb.getInt();
+ w2 = bb.getInt();
+
+ boolean control = (w1 & 0x80000000) != 0;
+ flags = (w2 & 0xff000000) >>> 24;
+ length = (w2 & 0xffffff);
+
+ if (!control) {
+ streamId = w1 & 0x7fffffff;
+ inFinished = (flags & FLAG_FIN) != 0;
+ emitter.setDataCallback(onDataFrame);
+ }
+ else {
+ reader.read(length, onFullFrame);
+ }
+ }
+ };
+
+ private final DataCallback onDataFrame = new DataCallback() {
+ @Override
+ public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) {
+ int toRead = Math.min(bb.remaining(), length);
+ if (toRead < bb.remaining()) {
+ ByteBufferList partial = new ByteBufferList();
+ bb.get(partial, toRead);
+ bb = partial;
+ }
+
+ length -= toRead;
+ handler.data(length == 0 && inFinished, streamId, bb);
+
+ if (length == 0)
+ parseFrameHeader();
+ }
+ };
+
+ private final DataCallback onFullFrame = new DataCallback() {
+ @Override
+ public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) {
+ // queue up the next frame read
+ bb.order(ByteOrder.BIG_ENDIAN);
+
+ int version = (w1 & 0x7fff0000) >>> 16;
+ int type = (w1 & 0xffff);
+
+ try {
+ if (version != 3) {
+ throw new ProtocolException("version != 3: " + version);
+ }
+
+ switch (type) {
+ case TYPE_SYN_STREAM:
+ readSynStream(bb, flags, length);
+ break;
+
+ case TYPE_SYN_REPLY:
+ readSynReply(bb, flags, length);
+ break;
+
+ case TYPE_RST_STREAM:
+ readRstStream(bb, flags, length);
+ break;
+
+ case TYPE_SETTINGS:
+ readSettings(bb, flags, length);
+ break;
+
+ case TYPE_PING:
+ readPing(bb, flags, length);
+ break;
+
+ case TYPE_GOAWAY:
+ readGoAway(bb, flags, length);
+ break;
+
+ case TYPE_HEADERS:
+ readHeaders(bb, flags, length);
+ break;
+
+ case TYPE_WINDOW_UPDATE:
+ readWindowUpdate(bb, flags, length);
+ break;
+
+ default:
+ bb.recycle();
+ break;
+ }
+ parseFrameHeader();
+ }
+ catch (IOException e) {
+ handler.error(e);
+ }
+ }
+ };
+
+ @Override
+ public void readConnectionPreface() {
+ }
+ private void readSynStream(ByteBufferList source, int flags, int length) throws IOException {
+ int w1 = source.getInt();
+ int w2 = source.getInt();
+ int streamId = w1 & 0x7fffffff;
+ int associatedStreamId = w2 & 0x7fffffff;
+ source.getShort(); // int priority = (s3 & 0xe000) >>> 13; int slot = s3 & 0xff;
+ List<Header> headerBlock = headerReader.readHeader(source, length - 10);
+
+ boolean inFinished = (flags & FLAG_FIN) != 0;
+ boolean outFinished = (flags & FLAG_UNIDIRECTIONAL) != 0;
+ handler.headers(outFinished, inFinished, streamId, associatedStreamId, headerBlock,
+ HeadersMode.SPDY_SYN_STREAM);
+ }
- private void readWindowUpdate(Handler handler, int flags, int length) throws IOException {
- if (length != 8) throw ioException("TYPE_WINDOW_UPDATE length: %d != 8", length);
- int w1 = source.readInt();
- int w2 = source.readInt();
- int streamId = w1 & 0x7fffffff;
- long increment = w2 & 0x7fffffff;
- if (increment == 0) throw ioException("windowSizeIncrement was 0", increment);
- handler.windowUpdate(streamId, increment);
- }
+ private void readSynReply(ByteBufferList source, int flags, int length) throws IOException {
+ int w1 = source.getInt();
+ int streamId = w1 & 0x7fffffff;
+ List<Header> headerBlock = headerReader.readHeader(source, length - 4);
+ boolean inFinished = (flags & FLAG_FIN) != 0;
+ handler.headers(false, inFinished, streamId, -1, headerBlock, HeadersMode.SPDY_REPLY);
+ }
- private void readPing(Handler handler, int flags, int length) throws IOException {
- if (length != 4) throw ioException("TYPE_PING length: %d != 4", length);
- int id = source.readInt();
- boolean ack = client == ((id & 1) == 1);
- handler.ping(ack, id, 0);
- }
+ private void readRstStream(ByteBufferList source, int flags, int length) throws IOException {
+ if (length != 8) throw ioException("TYPE_RST_STREAM length: %d != 8", length);
+ int streamId = source.getInt() & 0x7fffffff;
+ int errorCodeInt = source.getInt();
+ ErrorCode errorCode = ErrorCode.fromSpdy3Rst(errorCodeInt);
+ if (errorCode == null) {
+ throw ioException("TYPE_RST_STREAM unexpected error code: %d", errorCodeInt);
+ }
+ handler.rstStream(streamId, errorCode);
+ }
- private void readGoAway(Handler handler, int flags, int length) throws IOException {
- if (length != 8) throw ioException("TYPE_GOAWAY length: %d != 8", length);
- int lastGoodStreamId = source.readInt() & 0x7fffffff;
- int errorCodeInt = source.readInt();
- ErrorCode errorCode = ErrorCode.fromSpdyGoAway(errorCodeInt);
- if (errorCode == null) {
- throw ioException("TYPE_GOAWAY unexpected error code: %d", errorCodeInt);
- }
- handler.goAway(lastGoodStreamId, errorCode, ByteString.EMPTY);
- }
+ private void readHeaders(ByteBufferList source, int flags, int length) throws IOException {
+ int w1 = source.getInt();
+ int streamId = w1 & 0x7fffffff;
+ List<Header> headerBlock = headerReader.readHeader(source, length - 4);
+ handler.headers(false, false, streamId, -1, headerBlock, HeadersMode.SPDY_HEADERS);
+ }
- private void readSettings(Handler handler, int flags, int length) throws IOException {
- int numberOfEntries = source.readInt();
- if (length != 4 + 8 * numberOfEntries) {
- throw ioException("TYPE_SETTINGS length: %d != 4 + 8 * %d", length, numberOfEntries);
- }
- Settings settings = new Settings();
- for (int i = 0; i < numberOfEntries; i++) {
- int w1 = source.readInt();
- int value = source.readInt();
- int idFlags = (w1 & 0xff000000) >>> 24;
- int id = w1 & 0xffffff;
- settings.set(id, idFlags, value);
- }
- boolean clearPrevious = (flags & Settings.FLAG_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS) != 0;
- handler.settings(clearPrevious, settings);
- }
+ private void readWindowUpdate(ByteBufferList source, int flags, int length) throws IOException {
+ if (length != 8) throw ioException("TYPE_WINDOW_UPDATE length: %d != 8", length);
+ int w1 = source.getInt();
+ int w2 = source.getInt();
+ int streamId = w1 & 0x7fffffff;
+ long increment = w2 & 0x7fffffff;
+ if (increment == 0) throw ioException("windowSizeIncrement was 0", increment);
+ handler.windowUpdate(streamId, increment);
+ }
- private static IOException ioException(String message, Object... args) throws IOException {
- throw new IOException(String.format(message, args));
- }
+ private void readPing(ByteBufferList source, int flags, int length) throws IOException {
+ if (length != 4) throw ioException("TYPE_PING length: %d != 4", length);
+ int id = source.getInt();
+ boolean ack = client == ((id & 1) == 1);
+ handler.ping(ack, id, 0);
+ }
- @Override public void close() throws IOException {
- headerBlockReader.close();
- }
- }
-
- /** Write spdy/3 frames. */
- static final class Writer implements FrameWriter {
- private final BufferedSink sink;
- private final Buffer headerBlockBuffer;
- private final BufferedSink headerBlockOut;
- private final boolean client;
- private boolean closed;
-
- Writer(BufferedSink sink, boolean client) {
- this.sink = sink;
- this.client = client;
-
- Deflater deflater = new Deflater();
- deflater.setDictionary(DICTIONARY);
- headerBlockBuffer = new Buffer();
- headerBlockOut = Okio.buffer(new DeflaterSink(headerBlockBuffer, deflater));
- }
+ private void readGoAway(ByteBufferList source, int flags, int length) throws IOException {
+ if (length != 8) throw ioException("TYPE_GOAWAY length: %d != 8", length);
+ int lastGoodStreamId = source.getInt() & 0x7fffffff;
+ int errorCodeInt = source.getInt();
+ ErrorCode errorCode = ErrorCode.fromSpdyGoAway(errorCodeInt);
+ if (errorCode == null) {
+ throw ioException("TYPE_GOAWAY unexpected error code: %d", errorCodeInt);
+ }
+ handler.goAway(lastGoodStreamId, errorCode, ByteString.EMPTY);
+ }
+
+ private void readSettings(ByteBufferList source, int flags, int length) throws IOException {
+ int numberOfEntries = source.getInt();
+ if (length != 4 + 8 * numberOfEntries) {
+ throw ioException("TYPE_SETTINGS length: %d != 4 + 8 * %d", length, numberOfEntries);
+ }
+ Settings settings = new Settings();
+ for (int i = 0; i < numberOfEntries; i++) {
+ int w1 = source.getInt();
+ int value = source.getInt();
+ int idFlags = (w1 & 0xff000000) >>> 24;
+ int id = w1 & 0xffffff;
+ settings.set(id, idFlags, value);
+ }
+ boolean clearPrevious = (flags & Settings.FLAG_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS) != 0;
+ handler.settings(clearPrevious, settings);
+ }
+
+ private static IOException ioException(String message, Object... args) throws IOException {
+ throw new IOException(String.format(message, args));
+ }
- @Override public void ackSettings() {
- // Do nothing: no ACK for SPDY/3 settings.
+ @Override
+ public void close() throws IOException {
+ }
}
- @Override
- public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
+ /**
+ * Write spdy/3 frames.
+ */
+ static final class Writer implements FrameWriter {
+ private final BufferedSink sink;
+ private final Buffer headerBlockBuffer;
+ private final BufferedSink headerBlockOut;
+ private final boolean client;
+ private boolean closed;
+
+ Writer(BufferedSink sink, boolean client) {
+ this.sink = sink;
+ this.client = client;
+
+ Deflater deflater = new Deflater();
+ deflater.setDictionary(DICTIONARY);
+ headerBlockBuffer = new Buffer();
+ headerBlockOut = Okio.buffer(new DeflaterSink(headerBlockBuffer, deflater));
+ }
+
+ @Override
+ public void ackSettings() {
+ // Do nothing: no ACK for SPDY/3 settings.
+ }
+
+ @Override
+ public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
throws IOException {
- // Do nothing: no push promise for SPDY/3.
- }
+ // Do nothing: no push promise for SPDY/3.
+ }
- @Override public synchronized void connectionPreface() {
- // Do nothing: no connection preface for SPDY/3.
- }
+ @Override
+ public synchronized void connectionPreface() {
+ // Do nothing: no connection preface for SPDY/3.
+ }
- @Override public synchronized void flush() throws IOException {
- if (closed) throw new IOException("closed");
- sink.flush();
- }
+ @Override
+ public synchronized void flush() throws IOException {
+ if (closed) throw new IOException("closed");
+ sink.flush();
+ }
- @Override public synchronized void synStream(boolean outFinished, boolean inFinished,
- int streamId, int associatedStreamId, List<Header> headerBlock)
+ @Override
+ public synchronized void synStream(boolean outFinished, boolean inFinished,
+ int streamId, int associatedStreamId, List<Header> headerBlock)
throws IOException {
- if (closed) throw new IOException("closed");
- writeNameValueBlockToBuffer(headerBlock);
- int length = (int) (10 + headerBlockBuffer.size());
- int type = TYPE_SYN_STREAM;
- int flags = (outFinished ? FLAG_FIN : 0) | (inFinished ? FLAG_UNIDIRECTIONAL : 0);
-
- int unused = 0;
- sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
- sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
- sink.writeInt(streamId & 0x7fffffff);
- sink.writeInt(associatedStreamId & 0x7fffffff);
- sink.writeShort((unused & 0x7) << 13 | (unused & 0x1f) << 8 | (unused & 0xff));
- sink.writeAll(headerBlockBuffer);
- sink.flush();
- }
+ if (closed) throw new IOException("closed");
+ writeNameValueBlockToBuffer(headerBlock);
+ int length = (int) (10 + headerBlockBuffer.size());
+ int type = TYPE_SYN_STREAM;
+ int flags = (outFinished ? FLAG_FIN : 0) | (inFinished ? FLAG_UNIDIRECTIONAL : 0);
+
+ int unused = 0;
+ sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
+ sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
+ sink.writeInt(streamId & 0x7fffffff);
+ sink.writeInt(associatedStreamId & 0x7fffffff);
+ sink.writeShort((unused & 0x7) << 13 | (unused & 0x1f) << 8 | (unused & 0xff));
+ sink.writeAll(headerBlockBuffer);
+ sink.flush();
+ }
- @Override public synchronized void synReply(boolean outFinished, int streamId,
- List<Header> headerBlock) throws IOException {
- if (closed) throw new IOException("closed");
- writeNameValueBlockToBuffer(headerBlock);
- int type = TYPE_SYN_REPLY;
- int flags = (outFinished ? FLAG_FIN : 0);
- int length = (int) (headerBlockBuffer.size() + 4);
-
- sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
- sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
- sink.writeInt(streamId & 0x7fffffff);
- sink.writeAll(headerBlockBuffer);
- sink.flush();
- }
+ @Override
+ public synchronized void synReply(boolean outFinished, int streamId,
+ List<Header> headerBlock) throws IOException {
+ if (closed) throw new IOException("closed");
+ writeNameValueBlockToBuffer(headerBlock);
+ int type = TYPE_SYN_REPLY;
+ int flags = (outFinished ? FLAG_FIN : 0);
+ int length = (int) (headerBlockBuffer.size() + 4);
+
+ sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
+ sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
+ sink.writeInt(streamId & 0x7fffffff);
+ sink.writeAll(headerBlockBuffer);
+ sink.flush();
+ }
- @Override public synchronized void headers(int streamId, List<Header> headerBlock)
+ @Override
+ public synchronized void headers(int streamId, List<Header> headerBlock)
throws IOException {
- if (closed) throw new IOException("closed");
- writeNameValueBlockToBuffer(headerBlock);
- int flags = 0;
- int type = TYPE_HEADERS;
- int length = (int) (headerBlockBuffer.size() + 4);
-
- sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
- sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
- sink.writeInt(streamId & 0x7fffffff);
- sink.writeAll(headerBlockBuffer);
- }
+ if (closed) throw new IOException("closed");
+ writeNameValueBlockToBuffer(headerBlock);
+ int flags = 0;
+ int type = TYPE_HEADERS;
+ int length = (int) (headerBlockBuffer.size() + 4);
+
+ sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
+ sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
+ sink.writeInt(streamId & 0x7fffffff);
+ sink.writeAll(headerBlockBuffer);
+ }
- @Override public synchronized void rstStream(int streamId, ErrorCode errorCode)
+ @Override
+ public synchronized void rstStream(int streamId, ErrorCode errorCode)
throws IOException {
- if (closed) throw new IOException("closed");
- if (errorCode.spdyRstCode == -1) throw new IllegalArgumentException();
- int flags = 0;
- int type = TYPE_RST_STREAM;
- int length = 8;
- sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
- sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
- sink.writeInt(streamId & 0x7fffffff);
- sink.writeInt(errorCode.spdyRstCode);
- sink.flush();
- }
+ if (closed) throw new IOException("closed");
+ if (errorCode.spdyRstCode == -1) throw new IllegalArgumentException();
+ int flags = 0;
+ int type = TYPE_RST_STREAM;
+ int length = 8;
+ sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
+ sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
+ sink.writeInt(streamId & 0x7fffffff);
+ sink.writeInt(errorCode.spdyRstCode);
+ sink.flush();
+ }
- @Override public synchronized void data(boolean outFinished, int streamId, Buffer source)
+ @Override
+ public synchronized void data(boolean outFinished, int streamId, Buffer source)
throws IOException {
- data(outFinished, streamId, source, (int) source.size());
- }
+ data(outFinished, streamId, source, (int) source.size());
+ }
- @Override public synchronized void data(boolean outFinished, int streamId, Buffer source,
- int byteCount) throws IOException {
- int flags = (outFinished ? FLAG_FIN : 0);
- sendDataFrame(streamId, flags, source, byteCount);
- }
+ @Override
+ public synchronized void data(boolean outFinished, int streamId, Buffer source,
+ int byteCount) throws IOException {
+ int flags = (outFinished ? FLAG_FIN : 0);
+ sendDataFrame(streamId, flags, source, byteCount);
+ }
- void sendDataFrame(int streamId, int flags, Buffer buffer, int byteCount)
+ void sendDataFrame(int streamId, int flags, Buffer buffer, int byteCount)
throws IOException {
- if (closed) throw new IOException("closed");
- if (byteCount > 0xffffffL) {
- throw new IllegalArgumentException("FRAME_TOO_LARGE max size is 16Mib: " + byteCount);
- }
- sink.writeInt(streamId & 0x7fffffff);
- sink.writeInt((flags & 0xff) << 24 | byteCount & 0xffffff);
- if (byteCount > 0) {
- sink.write(buffer, byteCount);
- }
- }
+ if (closed) throw new IOException("closed");
+ if (byteCount > 0xffffffL) {
+ throw new IllegalArgumentException("FRAME_TOO_LARGE max size is 16Mib: " + byteCount);
+ }
+ sink.writeInt(streamId & 0x7fffffff);
+ sink.writeInt((flags & 0xff) << 24 | byteCount & 0xffffff);
+ if (byteCount > 0) {
+ sink.write(buffer, byteCount);
+ }
+ }
- private void writeNameValueBlockToBuffer(List<Header> headerBlock) throws IOException {
- if (headerBlockBuffer.size() != 0) throw new IllegalStateException();
- headerBlockOut.writeInt(headerBlock.size());
- for (int i = 0, size = headerBlock.size(); i < size; i++) {
- ByteString name = headerBlock.get(i).name;
- headerBlockOut.writeInt(name.size());
- headerBlockOut.write(name);
- ByteString value = headerBlock.get(i).value;
- headerBlockOut.writeInt(value.size());
- headerBlockOut.write(value);
- }
- headerBlockOut.flush();
- }
+ private void writeNameValueBlockToBuffer(List<Header> headerBlock) throws IOException {
+ if (headerBlockBuffer.size() != 0) throw new IllegalStateException();
+ headerBlockOut.writeInt(headerBlock.size());
+ for (int i = 0, size = headerBlock.size(); i < size; i++) {
+ ByteString name = headerBlock.get(i).name;
+ headerBlockOut.writeInt(name.size());
+ headerBlockOut.write(name);
+ ByteString value = headerBlock.get(i).value;
+ headerBlockOut.writeInt(value.size());
+ headerBlockOut.write(value);
+ }
+ headerBlockOut.flush();
+ }
- @Override public synchronized void settings(Settings settings) throws IOException {
- if (closed) throw new IOException("closed");
- int type = TYPE_SETTINGS;
- int flags = 0;
- int size = settings.size();
- int length = 4 + size * 8;
- sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
- sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
- sink.writeInt(size);
- for (int i = 0; i <= Settings.COUNT; i++) {
- if (!settings.isSet(i)) continue;
- int settingsFlags = settings.flags(i);
- sink.writeInt((settingsFlags & 0xff) << 24 | (i & 0xffffff));
- sink.writeInt(settings.get(i));
- }
- sink.flush();
- }
+ @Override
+ public synchronized void settings(Settings settings) throws IOException {
+ if (closed) throw new IOException("closed");
+ int type = TYPE_SETTINGS;
+ int flags = 0;
+ int size = settings.size();
+ int length = 4 + size * 8;
+ sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
+ sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
+ sink.writeInt(size);
+ for (int i = 0; i <= Settings.COUNT; i++) {
+ if (!settings.isSet(i)) continue;
+ int settingsFlags = settings.flags(i);
+ sink.writeInt((settingsFlags & 0xff) << 24 | (i & 0xffffff));
+ sink.writeInt(settings.get(i));
+ }
+ sink.flush();
+ }
- @Override public synchronized void ping(boolean reply, int payload1, int payload2)
+ @Override
+ public synchronized void ping(boolean reply, int payload1, int payload2)
throws IOException {
- if (closed) throw new IOException("closed");
- boolean payloadIsReply = client != ((payload1 & 1) == 1);
- if (reply != payloadIsReply) throw new IllegalArgumentException("payload != reply");
- int type = TYPE_PING;
- int flags = 0;
- int length = 4;
- sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
- sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
- sink.writeInt(payload1);
- sink.flush();
- }
+ if (closed) throw new IOException("closed");
+ boolean payloadIsReply = client != ((payload1 & 1) == 1);
+ if (reply != payloadIsReply) throw new IllegalArgumentException("payload != reply");
+ int type = TYPE_PING;
+ int flags = 0;
+ int length = 4;
+ sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
+ sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
+ sink.writeInt(payload1);
+ sink.flush();
+ }
- @Override public synchronized void goAway(int lastGoodStreamId, ErrorCode errorCode,
- byte[] ignored) throws IOException {
- if (closed) throw new IOException("closed");
- if (errorCode.spdyGoAwayCode == -1) {
- throw new IllegalArgumentException("errorCode.spdyGoAwayCode == -1");
- }
- int type = TYPE_GOAWAY;
- int flags = 0;
- int length = 8;
- sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
- sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
- sink.writeInt(lastGoodStreamId);
- sink.writeInt(errorCode.spdyGoAwayCode);
- sink.flush();
- }
+ @Override
+ public synchronized void goAway(int lastGoodStreamId, ErrorCode errorCode,
+ byte[] ignored) throws IOException {
+ if (closed) throw new IOException("closed");
+ if (errorCode.spdyGoAwayCode == -1) {
+ throw new IllegalArgumentException("errorCode.spdyGoAwayCode == -1");
+ }
+ int type = TYPE_GOAWAY;
+ int flags = 0;
+ int length = 8;
+ sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
+ sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
+ sink.writeInt(lastGoodStreamId);
+ sink.writeInt(errorCode.spdyGoAwayCode);
+ sink.flush();
+ }
- @Override public synchronized void windowUpdate(int streamId, long increment)
+ @Override
+ public synchronized void windowUpdate(int streamId, long increment)
throws IOException {
- if (closed) throw new IOException("closed");
- if (increment == 0 || increment > 0x7fffffffL) {
- throw new IllegalArgumentException(
- "windowSizeIncrement must be between 1 and 0x7fffffff: " + increment);
- }
- int type = TYPE_WINDOW_UPDATE;
- int flags = 0;
- int length = 8;
- sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
- sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
- sink.writeInt(streamId);
- sink.writeInt((int) increment);
- sink.flush();
- }
+ if (closed) throw new IOException("closed");
+ if (increment == 0 || increment > 0x7fffffffL) {
+ throw new IllegalArgumentException(
+ "windowSizeIncrement must be between 1 and 0x7fffffff: " + increment);
+ }
+ int type = TYPE_WINDOW_UPDATE;
+ int flags = 0;
+ int length = 8;
+ sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff);
+ sink.writeInt((flags & 0xff) << 24 | length & 0xffffff);
+ sink.writeInt(streamId);
+ sink.writeInt((int) increment);
+ sink.flush();
+ }
- @Override public synchronized void close() throws IOException {
- closed = true;
- Util.closeAll(sink, headerBlockOut);
+ @Override
+ public synchronized void close() throws IOException {
+ closed = true;
+ Util.closeAll(sink, headerBlockOut);
+ }
}
- }
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Variant.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Variant.java
index 56994d1..eeb7c15 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Variant.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Variant.java
@@ -16,9 +16,9 @@
package com.koushikdutta.async.http.spdy.okhttp.internal.spdy;
+import com.koushikdutta.async.DataEmitter;
import com.koushikdutta.async.http.Protocol;
import com.koushikdutta.async.http.spdy.okio.BufferedSink;
-import com.koushikdutta.async.http.spdy.okio.BufferedSource;
/** A version and dialect of the framed socket protocol. */
public interface Variant {
@@ -29,7 +29,7 @@ public interface Variant {
/**
* @param client true if this is the HTTP client's reader, reading frames from a server.
*/
- FrameReader newReader(BufferedSource source, boolean client);
+ FrameReader newReader(DataEmitter source, FrameReader.Handler handler, boolean client);
/**
* @param client true if this is the HTTP client's writer, writing frames to a server.