diff options
author | Koushik Dutta <koushd@gmail.com> | 2014-07-27 19:53:50 -0700 |
---|---|---|
committer | Koushik Dutta <koushd@gmail.com> | 2014-07-27 19:53:50 -0700 |
commit | 432c80258b0d164f291cbb30a08d994b11f77ce4 (patch) | |
tree | 98b10f7d2828edb4cff0e647b61b2ce240eabc65 | |
parent | 9e24bdf9a04b2d664424255bfba28965598a69cc (diff) | |
download | AndroidAsync-432c80258b0d164f291cbb30a08d994b11f77ce4.tar.gz AndroidAsync-432c80258b0d164f291cbb30a08d994b11f77ce4.tar.bz2 AndroidAsync-432c80258b0d164f291cbb30a08d994b11f77ce4.zip |
refactor w/out framereader okio
-rw-r--r-- | AndroidAsync/src/com/koushikdutta/async/PushParser.java | 3 | ||||
-rw-r--r-- | AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java | 60 | ||||
-rw-r--r-- | AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java | 223 | ||||
-rw-r--r-- | AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/HeaderReader.java | 69 | ||||
-rw-r--r-- | AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java.ignore (renamed from AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java) | 1 | ||||
-rw-r--r-- | AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/NameValueBlockReader.java | 119 | ||||
-rw-r--r-- | AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java | 918 | ||||
-rw-r--r-- | AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Variant.java | 4 |
8 files changed, 697 insertions, 700 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/PushParser.java b/AndroidAsync/src/com/koushikdutta/async/PushParser.java index e02ac7f..51a79cc 100644 --- a/AndroidAsync/src/com/koushikdutta/async/PushParser.java +++ b/AndroidAsync/src/com/koushikdutta/async/PushParser.java @@ -237,8 +237,9 @@ public class PushParser implements DataCallback { private ArrayList<Object> args = new ArrayList<Object>(); ByteOrder order = ByteOrder.BIG_ENDIAN; - public void setOrder(ByteOrder order) { + public PushParser setOrder(ByteOrder order) { this.order = order; + return this; } public PushParser(DataEmitter s) { diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java index 7471615..d9feaf7 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/AsyncSpdyConnection.java @@ -5,31 +5,25 @@ import com.koushikdutta.async.AsyncSocket; import com.koushikdutta.async.BufferedDataEmitter; import com.koushikdutta.async.BufferedDataSink; import com.koushikdutta.async.ByteBufferList; -import com.koushikdutta.async.DataEmitter; import com.koushikdutta.async.Util; import com.koushikdutta.async.callback.CompletedCallback; import com.koushikdutta.async.callback.DataCallback; import com.koushikdutta.async.callback.WritableCallback; import com.koushikdutta.async.future.SimpleFuture; -import com.koushikdutta.async.http.Headers; import com.koushikdutta.async.http.Protocol; import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.ErrorCode; import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.FrameReader; import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.FrameWriter; import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Header; import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.HeadersMode; -import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Http20Draft13; import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Ping; import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Settings; import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Spdy3; import com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Variant; import com.koushikdutta.async.http.spdy.okio.BufferedSink; -import com.koushikdutta.async.http.spdy.okio.BufferedSource; import com.koushikdutta.async.http.spdy.okio.ByteString; import com.koushikdutta.async.http.spdy.okio.Okio; -import junit.framework.Assert; - import java.io.IOException; import java.util.Hashtable; import java.util.Iterator; @@ -42,21 +36,17 @@ import static com.koushikdutta.async.http.spdy.okhttp.internal.spdy.Settings.DEF * Created by koush on 7/16/14. */ public class AsyncSpdyConnection implements FrameReader.Handler { - BufferedDataEmitter emitter; AsyncSocket socket; BufferedDataSink bufferedSocket; FrameReader reader; FrameWriter writer; Variant variant; -// SpdySocket zero = new SpdySocket(0, false, false, null); - ByteBufferListSource source = new ByteBufferListSource(); ByteBufferListSink sink = new ByteBufferListSink() { @Override public void flush() throws IOException { AsyncSpdyConnection.this.flush(); } }; - BufferedSource bufferedSource; BufferedSink bufferedSink; Hashtable<Integer, SpdySocket> sockets = new Hashtable<Integer, SpdySocket>(); Protocol protocol; @@ -286,16 +276,15 @@ public class AsyncSpdyConnection implements FrameReader.Handler { this.protocol = protocol; this.socket = socket; this.bufferedSocket = new BufferedDataSink(socket); - emitter = new BufferedDataEmitter(socket); - emitter.setDataCallback(callback); if (protocol == Protocol.SPDY_3) { variant = new Spdy3(); } else if (protocol == Protocol.HTTP_2) { - variant = new Http20Draft13(); + throw new AssertionError("http20draft13"); +// variant = new Http20Draft13(); } - reader = variant.newReader(bufferedSource = Okio.buffer(source), true); + reader = variant.newReader(socket, this, true); writer = variant.newWriter(bufferedSink = Okio.buffer(sink), true); boolean client = true; @@ -313,22 +302,6 @@ public class AsyncSpdyConnection implements FrameReader.Handler { } } - DataCallback callback = new DataCallback() { - @Override - public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) { - int needed; - while ((needed = reader.canProcessFrame(bb)) > 0) { - bb.get(source, needed); - try { - reader.nextFrame(AsyncSpdyConnection.this); - } - catch (IOException e) { - throw new AssertionError(e); - } - } - } - }; - /** * Sends a connection header if the current variant requires it. This should * be called after {@link Builder#build} for all new connections. @@ -348,7 +321,7 @@ public class AsyncSpdyConnection implements FrameReader.Handler { } @Override - public void data(boolean inFinished, int streamId, BufferedSource source, int length) throws IOException { + public void data(boolean inFinished, int streamId, ByteBufferList source) { if (pushedStream(streamId)) { throw new AssertionError("push"); // pushDataLater(streamId, source, length, inFinished); @@ -356,14 +329,17 @@ public class AsyncSpdyConnection implements FrameReader.Handler { } SpdySocket socket = sockets.get(streamId); if (socket == null) { - writer.rstStream(streamId, ErrorCode.INVALID_STREAM); - source.skip(length); + try { + writer.rstStream(streamId, ErrorCode.INVALID_STREAM); + } + catch (IOException e) { + throw new AssertionError(e); + } + source.recycle(); return; } - if (source != this.bufferedSource || this.source.remaining() + source.buffer().size() != length) - throw new AssertionError(); - source.buffer().readAll(socket.pending); - this.source.get(socket.pending); + int length = source.remaining(); + source.get(socket.pending); socket.updateWindowRead(length); Util.emitAllData(socket, socket.pending); if (inFinished) { @@ -562,4 +538,14 @@ public class AsyncSpdyConnection implements FrameReader.Handler { @Override public void alternateService(int streamId, String origin, ByteString protocol, String host, int port, long maxAge) { } + + @Override + public void error(Exception e) { + socket.close(); + for (Iterator<Map.Entry<Integer, SpdySocket>> i = sockets.entrySet().iterator(); i.hasNext();) { + Map.Entry<Integer, SpdySocket> entry = i.next(); + Util.end(entry.getValue(), e); + i.remove(); + } + } } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java index 19b6b77..3f457fb 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/FrameReader.java @@ -17,124 +17,129 @@ package com.koushikdutta.async.http.spdy.okhttp.internal.spdy; import com.koushikdutta.async.ByteBufferList; -import com.koushikdutta.async.http.spdy.okio.BufferedSource; import com.koushikdutta.async.http.spdy.okio.ByteString; import java.io.Closeable; import java.io.IOException; import java.util.List; -/** Reads transport frames for SPDY/3 or HTTP/2. */ +/** + * Reads transport frames for SPDY/3 or HTTP/2. + */ public interface FrameReader extends Closeable { - int canProcessFrame(ByteBufferList bb); - void readConnectionPreface() throws IOException; - boolean nextFrame(Handler handler) throws IOException; + void readConnectionPreface() throws IOException; +// boolean nextFrame(Handler handler) throws IOException; - public interface Handler { - void data(boolean inFinished, int streamId, BufferedSource source, int length) - throws IOException; + public interface Handler { + void error(Exception e); + + void data(boolean inFinished, int streamId, ByteBufferList bb); + + /** + * Create or update incoming headers, creating the corresponding streams + * if necessary. Frames that trigger this are SPDY SYN_STREAM, HEADERS, and + * SYN_REPLY, and HTTP/2 HEADERS and PUSH_PROMISE. + * + * @param outFinished true if the receiver should not send further frames. + * @param inFinished true if the sender will not send further frames. + * @param streamId the stream owning these headers. + * @param associatedStreamId the stream that triggered the sender to create + * this stream. + */ + void headers(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, + List<Header> headerBlock, HeadersMode headersMode); + + void rstStream(int streamId, ErrorCode errorCode); + + void settings(boolean clearPrevious, Settings settings); + + /** + * HTTP/2 only. + */ + void ackSettings(); + + /** + * Read a connection-level ping from the peer. {@code ack} indicates this + * is a reply. Payload parameters are different between SPDY/3 and HTTP/2. + * <p/> + * In SPDY/3, only the first {@code payload1} parameter is set. If the + * reader is a client, it is an unsigned even number. Likewise, a server + * will receive an odd number. + * <p/> + * In HTTP/2, both {@code payload1} and {@code payload2} parameters are + * set. The data is opaque binary, and there are no rules on the content. + */ + void ping(boolean ack, int payload1, int payload2); + + /** + * The peer tells us to stop creating streams. It is safe to replay + * streams with {@code ID > lastGoodStreamId} on a new connection. In- + * flight streams with {@code ID <= lastGoodStreamId} can only be replayed + * on a new connection if they are idempotent. + * + * @param lastGoodStreamId the last stream ID the peer processed before + * sending this message. If {@code lastGoodStreamId} is zero, the peer + * processed no frames. + * @param errorCode reason for closing the connection. + * @param debugData only valid for HTTP/2; opaque debug data to send. + */ + void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData); + + /** + * Notifies that an additional {@code windowSizeIncrement} bytes can be + * sent on {@code streamId}, or the connection if {@code streamId} is zero. + */ + void windowUpdate(int streamId, long windowSizeIncrement); + + /** + * Called when reading a headers or priority frame. This may be used to + * change the stream's weight from the default (16) to a new value. + * + * @param streamId stream which has a priority change. + * @param streamDependency the stream ID this stream is dependent on. + * @param weight relative proportion of priority in [1..256]. + * @param exclusive inserts this stream ID as the sole child of + * {@code streamDependency}. + */ + void priority(int streamId, int streamDependency, int weight, boolean exclusive); - /** - * Create or update incoming headers, creating the corresponding streams - * if necessary. Frames that trigger this are SPDY SYN_STREAM, HEADERS, and - * SYN_REPLY, and HTTP/2 HEADERS and PUSH_PROMISE. - * - * @param outFinished true if the receiver should not send further frames. - * @param inFinished true if the sender will not send further frames. - * @param streamId the stream owning these headers. - * @param associatedStreamId the stream that triggered the sender to create - * this stream. - */ - void headers(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, - List<Header> headerBlock, HeadersMode headersMode); - void rstStream(int streamId, ErrorCode errorCode); - void settings(boolean clearPrevious, Settings settings); - - /** HTTP/2 only. */ - void ackSettings(); - - /** - * Read a connection-level ping from the peer. {@code ack} indicates this - * is a reply. Payload parameters are different between SPDY/3 and HTTP/2. - * <p> - * In SPDY/3, only the first {@code payload1} parameter is set. If the - * reader is a client, it is an unsigned even number. Likewise, a server - * will receive an odd number. - * <p> - * In HTTP/2, both {@code payload1} and {@code payload2} parameters are - * set. The data is opaque binary, and there are no rules on the content. - */ - void ping(boolean ack, int payload1, int payload2); - - /** - * The peer tells us to stop creating streams. It is safe to replay - * streams with {@code ID > lastGoodStreamId} on a new connection. In- - * flight streams with {@code ID <= lastGoodStreamId} can only be replayed - * on a new connection if they are idempotent. - * - * @param lastGoodStreamId the last stream ID the peer processed before - * sending this message. If {@code lastGoodStreamId} is zero, the peer - * processed no frames. - * @param errorCode reason for closing the connection. - * @param debugData only valid for HTTP/2; opaque debug data to send. - */ - void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData); - - /** - * Notifies that an additional {@code windowSizeIncrement} bytes can be - * sent on {@code streamId}, or the connection if {@code streamId} is zero. - */ - void windowUpdate(int streamId, long windowSizeIncrement); - - /** - * Called when reading a headers or priority frame. This may be used to - * change the stream's weight from the default (16) to a new value. - * - * @param streamId stream which has a priority change. - * @param streamDependency the stream ID this stream is dependent on. - * @param weight relative proportion of priority in [1..256]. - * @param exclusive inserts this stream ID as the sole child of - * {@code streamDependency}. - */ - void priority(int streamId, int streamDependency, int weight, boolean exclusive); - - /** - * HTTP/2 only. Receive a push promise header block. - * <p> - * A push promise contains all the headers that pertain to a server-initiated - * request, and a {@code promisedStreamId} to which response frames will be - * delivered. Push promise frames are sent as a part of the response to - * {@code streamId}. - * - * @param streamId client-initiated stream ID. Must be an odd number. - * @param promisedStreamId server-initiated stream ID. Must be an even - * number. - * @param requestHeaders minimally includes {@code :method}, {@code :scheme}, - * {@code :authority}, and (@code :path}. - */ - void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) + /** + * HTTP/2 only. Receive a push promise header block. + * <p/> + * A push promise contains all the headers that pertain to a server-initiated + * request, and a {@code promisedStreamId} to which response frames will be + * delivered. Push promise frames are sent as a part of the response to + * {@code streamId}. + * + * @param streamId client-initiated stream ID. Must be an odd number. + * @param promisedStreamId server-initiated stream ID. Must be an even + * number. + * @param requestHeaders minimally includes {@code :method}, {@code :scheme}, + * {@code :authority}, and (@code :path}. + */ + void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) throws IOException; - /** - * HTTP/2 only. Expresses that resources for the connection or a client- - * initiated stream are available from a different network location or - * protocol configuration. - * - * <p>See <a href="https://tools.ietf.org/html/draft-ietf-httpbis-alt-svc-01">alt-svc</a> - * - * @param streamId when a client-initiated stream ID (odd number), the - * origin of this alternate service is the origin of the stream. When - * zero, the origin is specified in the {@code origin} parameter. - * @param origin when present, the - * <a href="http://tools.ietf.org/html/rfc6454">origin</a> is typically - * represented as a combination of scheme, host and port. When empty, - * the origin is that of the {@code streamId}. - * @param protocol an ALPN protocol, such as {@code h2}. - * @param host an IP address or hostname. - * @param port the IP port associated with the service. - * @param maxAge time in seconds that this alternative is considered fresh. - */ - void alternateService(int streamId, String origin, ByteString protocol, String host, int port, - long maxAge); - } + /** + * HTTP/2 only. Expresses that resources for the connection or a client- + * initiated stream are available from a different network location or + * protocol configuration. + * <p/> + * <p>See <a href="https://tools.ietf.org/html/draft-ietf-httpbis-alt-svc-01">alt-svc</a> + * + * @param streamId when a client-initiated stream ID (odd number), the + * origin of this alternate service is the origin of the stream. When + * zero, the origin is specified in the {@code origin} parameter. + * @param origin when present, the + * <a href="http://tools.ietf.org/html/rfc6454">origin</a> is typically + * represented as a combination of scheme, host and port. When empty, + * the origin is that of the {@code streamId}. + * @param protocol an ALPN protocol, such as {@code h2}. + * @param host an IP address or hostname. + * @param port the IP port associated with the service. + * @param maxAge time in seconds that this alternative is considered fresh. + */ + void alternateService(int streamId, String origin, ByteString protocol, String host, int port, + long maxAge); + } } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/HeaderReader.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/HeaderReader.java new file mode 100644 index 0000000..618f705 --- /dev/null +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/HeaderReader.java @@ -0,0 +1,69 @@ +package com.koushikdutta.async.http.spdy.okhttp.internal.spdy; + +import com.koushikdutta.async.ByteBufferList; +import com.koushikdutta.async.http.spdy.okio.ByteString; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; + +/** + * Created by koush on 7/27/14. + */ +public class HeaderReader { + Inflater inflater; + public HeaderReader() { + inflater = new Inflater() { + @Override public int inflate(byte[] buffer, int offset, int count) + throws DataFormatException { + int result = super.inflate(buffer, offset, count); + if (result == 0 && needsDictionary()) { + setDictionary(Spdy3.DICTIONARY); + result = super.inflate(buffer, offset, count); + } + return result; + } + }; + } + + public List<Header> readHeader(ByteBufferList bb, int length) throws IOException { + byte[] bytes = new byte[length]; + bb.get(bytes); + + inflater.setInput(bytes); + + ByteBufferList source = new ByteBufferList().order(ByteOrder.BIG_ENDIAN); + while (!inflater.needsInput()) { + ByteBuffer b = ByteBufferList.obtain(8192); + try { + int read = inflater.inflate(b.array()); + b.limit(read); + source.add(b); + } + catch (DataFormatException e) { + throw new IOException(e); + } + } + + int numberOfPairs = source.getInt(); + List<Header> entries = new ArrayList<Header>(numberOfPairs); + for (int i = 0; i < numberOfPairs; i++) { + ByteString name = readByteString(source).toAsciiLowercase(); + ByteString values = readByteString(source); + if (name.size() == 0) throw new IOException("name.size == 0"); + entries.add(new Header(name, values)); + } + return entries; + } + + private static ByteString readByteString(ByteBufferList source) { + int length = source.getInt(); + byte[] bytes = new byte[length]; + source.get(bytes); + return ByteString.of(bytes); + } +} diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java.ignore index 1425cbb..c223197 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Http20Draft13.java.ignore @@ -38,6 +38,7 @@ import static java.util.logging.Level.FINE; * Read and write HTTP/2 v13 frames. * <p>http://tools.ietf.org/html/draft-ietf-httpbis-http2-13 */ + public final class Http20Draft13 implements Variant { private static final Logger logger = Logger.getLogger(Http20Draft13.class.getName()); diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/NameValueBlockReader.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/NameValueBlockReader.java deleted file mode 100644 index adc15f8..0000000 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/NameValueBlockReader.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright (C) 2013 Square, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.koushikdutta.async.http.spdy.okhttp.internal.spdy; - -import com.koushikdutta.async.http.spdy.okio.Buffer; -import com.koushikdutta.async.http.spdy.okio.BufferedSource; -import com.koushikdutta.async.http.spdy.okio.ByteString; -import com.koushikdutta.async.http.spdy.okio.ForwardingSource; -import com.koushikdutta.async.http.spdy.okio.InflaterSource; -import com.koushikdutta.async.http.spdy.okio.Okio; -import com.koushikdutta.async.http.spdy.okio.Source; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.zip.DataFormatException; -import java.util.zip.Inflater; - -/** - * Reads a SPDY/3 Name/Value header block. This class is made complicated by the - * requirement that we're strict with which bytes we put in the compressed bytes - * buffer. We need to put all compressed bytes into that buffer -- but no other - * bytes. - */ -class NameValueBlockReader { - /** This source transforms compressed bytes into uncompressed bytes. */ - private final InflaterSource inflaterSource; - - /** - * How many compressed bytes must be read into inflaterSource before - * {@link #readNameValueBlock} returns. - */ - private int compressedLimit; - - /** This source holds inflated bytes. */ - private final BufferedSource source; - - public NameValueBlockReader(BufferedSource source) { - // Limit the inflater input stream to only those bytes in the Name/Value - // block. We cut the inflater off at its source because we can't predict the - // ratio of compressed bytes to uncompressed bytes. - Source throttleSource = new ForwardingSource(source) { - @Override public long read(Buffer sink, long byteCount) throws IOException { - if (compressedLimit == 0) return -1; // Out of data for the current block. - long read = super.read(sink, Math.min(byteCount, compressedLimit)); - if (read == -1) return -1; - compressedLimit -= read; - return read; - } - }; - - // Subclass inflater to install a dictionary when it's needed. - Inflater inflater = new Inflater() { - @Override public int inflate(byte[] buffer, int offset, int count) - throws DataFormatException { - int result = super.inflate(buffer, offset, count); - if (result == 0 && needsDictionary()) { - setDictionary(Spdy3.DICTIONARY); - result = super.inflate(buffer, offset, count); - } - return result; - } - }; - - this.inflaterSource = new InflaterSource(throttleSource, inflater); - this.source = Okio.buffer(inflaterSource); - } - - public List<Header> readNameValueBlock(int length) throws IOException { - this.compressedLimit += length; - - int numberOfPairs = source.readInt(); - if (numberOfPairs < 0) throw new IOException("numberOfPairs < 0: " + numberOfPairs); - if (numberOfPairs > 1024) throw new IOException("numberOfPairs > 1024: " + numberOfPairs); - - List<Header> entries = new ArrayList<Header>(numberOfPairs); - for (int i = 0; i < numberOfPairs; i++) { - ByteString name = readByteString().toAsciiLowercase(); - ByteString values = readByteString(); - if (name.size() == 0) throw new IOException("name.size == 0"); - entries.add(new Header(name, values)); - } - - doneReading(); - return entries; - } - - private ByteString readByteString() throws IOException { - int length = source.readInt(); - return source.readByteString(length); - } - - private void doneReading() throws IOException { - // Move any outstanding unread bytes into the inflater. One side-effect of - // deflate compression is that sometimes there are bytes remaining in the - // stream after we've consumed all of the content. - if (compressedLimit > 0) { - inflaterSource.refill(); - if (compressedLimit != 0) throw new IOException("compressedLimit > 0: " + compressedLimit); - } - } - - public void close() throws IOException { - source.close(); - } -} diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java index e38f3f0..2868ff2 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Spdy3.java @@ -16,11 +16,13 @@ package com.koushikdutta.async.http.spdy.okhttp.internal.spdy; import com.koushikdutta.async.ByteBufferList; +import com.koushikdutta.async.DataEmitter; +import com.koushikdutta.async.DataEmitterReader; +import com.koushikdutta.async.callback.DataCallback; import com.koushikdutta.async.http.Protocol; import com.koushikdutta.async.http.spdy.okhttp.internal.Util; import com.koushikdutta.async.http.spdy.okio.Buffer; import com.koushikdutta.async.http.spdy.okio.BufferedSink; -import com.koushikdutta.async.http.spdy.okio.BufferedSource; import com.koushikdutta.async.http.spdy.okio.ByteString; import com.koushikdutta.async.http.spdy.okio.DeflaterSink; import com.koushikdutta.async.http.spdy.okio.Okio; @@ -29,7 +31,6 @@ import com.koushikdutta.async.util.Charsets; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.ProtocolException; -import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.List; import java.util.zip.Deflater; @@ -41,477 +42,530 @@ import java.util.zip.Deflater; */ public final class Spdy3 implements Variant { - @Override public Protocol getProtocol() { - return Protocol.SPDY_3; - } - - static final int TYPE_DATA = 0x0; - static final int TYPE_SYN_STREAM = 0x1; - static final int TYPE_SYN_REPLY = 0x2; - static final int TYPE_RST_STREAM = 0x3; - static final int TYPE_SETTINGS = 0x4; - static final int TYPE_PING = 0x6; - static final int TYPE_GOAWAY = 0x7; - static final int TYPE_HEADERS = 0x8; - static final int TYPE_WINDOW_UPDATE = 0x9; - - static final int FLAG_FIN = 0x1; - static final int FLAG_UNIDIRECTIONAL = 0x2; - - static final int VERSION = 3; - - static final byte[] DICTIONARY; - static { - try { - DICTIONARY = ("\u0000\u0000\u0000\u0007options\u0000\u0000\u0000\u0004hea" - + "d\u0000\u0000\u0000\u0004post\u0000\u0000\u0000\u0003put\u0000\u0000\u0000\u0006dele" - + "te\u0000\u0000\u0000\u0005trace\u0000\u0000\u0000\u0006accept\u0000\u0000\u0000" - + "\u000Eaccept-charset\u0000\u0000\u0000\u000Faccept-encoding\u0000\u0000\u0000\u000Fa" - + "ccept-language\u0000\u0000\u0000\raccept-ranges\u0000\u0000\u0000\u0003age\u0000" - + "\u0000\u0000\u0005allow\u0000\u0000\u0000\rauthorization\u0000\u0000\u0000\rcache-co" - + "ntrol\u0000\u0000\u0000\nconnection\u0000\u0000\u0000\fcontent-base\u0000\u0000" - + "\u0000\u0010content-encoding\u0000\u0000\u0000\u0010content-language\u0000\u0000" - + "\u0000\u000Econtent-length\u0000\u0000\u0000\u0010content-location\u0000\u0000\u0000" - + "\u000Bcontent-md5\u0000\u0000\u0000\rcontent-range\u0000\u0000\u0000\fcontent-type" - + "\u0000\u0000\u0000\u0004date\u0000\u0000\u0000\u0004etag\u0000\u0000\u0000\u0006expe" - + "ct\u0000\u0000\u0000\u0007expires\u0000\u0000\u0000\u0004from\u0000\u0000\u0000" - + "\u0004host\u0000\u0000\u0000\bif-match\u0000\u0000\u0000\u0011if-modified-since" - + "\u0000\u0000\u0000\rif-none-match\u0000\u0000\u0000\bif-range\u0000\u0000\u0000" - + "\u0013if-unmodified-since\u0000\u0000\u0000\rlast-modified\u0000\u0000\u0000\blocati" - + "on\u0000\u0000\u0000\fmax-forwards\u0000\u0000\u0000\u0006pragma\u0000\u0000\u0000" - + "\u0012proxy-authenticate\u0000\u0000\u0000\u0013proxy-authorization\u0000\u0000" - + "\u0000\u0005range\u0000\u0000\u0000\u0007referer\u0000\u0000\u0000\u000Bretry-after" - + "\u0000\u0000\u0000\u0006server\u0000\u0000\u0000\u0002te\u0000\u0000\u0000\u0007trai" - + "ler\u0000\u0000\u0000\u0011transfer-encoding\u0000\u0000\u0000\u0007upgrade\u0000" - + "\u0000\u0000\nuser-agent\u0000\u0000\u0000\u0004vary\u0000\u0000\u0000\u0003via" - + "\u0000\u0000\u0000\u0007warning\u0000\u0000\u0000\u0010www-authenticate\u0000\u0000" - + "\u0000\u0006method\u0000\u0000\u0000\u0003get\u0000\u0000\u0000\u0006status\u0000" - + "\u0000\u0000\u0006200 OK\u0000\u0000\u0000\u0007version\u0000\u0000\u0000\bHTTP/1.1" - + "\u0000\u0000\u0000\u0003url\u0000\u0000\u0000\u0006public\u0000\u0000\u0000\nset-coo" - + "kie\u0000\u0000\u0000\nkeep-alive\u0000\u0000\u0000\u0006origin100101201202205206300" - + "302303304305306307402405406407408409410411412413414415416417502504505203 Non-Authori" - + "tative Information204 No Content301 Moved Permanently400 Bad Request401 Unauthorized" - + "403 Forbidden404 Not Found500 Internal Server Error501 Not Implemented503 Service Un" - + "availableJan Feb Mar Apr May Jun Jul Aug Sept Oct Nov Dec 00:00:00 Mon, Tue, Wed, Th" - + "u, Fri, Sat, Sun, GMTchunked,text/html,image/png,image/jpg,image/gif,application/xml" - + ",application/xhtml+xml,text/plain,text/javascript,publicprivatemax-age=gzip,deflate," - + "sdchcharset=utf-8charset=iso-8859-1,utf-,*,enq=0.").getBytes(Charsets.UTF_8.name()); - } catch (UnsupportedEncodingException e) { - throw new AssertionError(); - } - } - - @Override public FrameReader newReader(BufferedSource source, boolean client) { - return new Reader(source, client); - } - - @Override public FrameWriter newWriter(BufferedSink sink, boolean client) { - return new Writer(sink, client); - } - - @Override public int maxFrameSize() { - return 16383; - } - - /** Read spdy/3 frames. */ - static final class Reader implements FrameReader { - private final BufferedSource source; - private final boolean client; - private final NameValueBlockReader headerBlockReader; - - Reader(BufferedSource source, boolean client) { - this.source = source; - this.headerBlockReader = new NameValueBlockReader(this.source); - this.client = client; + @Override + public Protocol getProtocol() { + return Protocol.SPDY_3; + } + + static final int TYPE_DATA = 0x0; + static final int TYPE_SYN_STREAM = 0x1; + static final int TYPE_SYN_REPLY = 0x2; + static final int TYPE_RST_STREAM = 0x3; + static final int TYPE_SETTINGS = 0x4; + static final int TYPE_PING = 0x6; + static final int TYPE_GOAWAY = 0x7; + static final int TYPE_HEADERS = 0x8; + static final int TYPE_WINDOW_UPDATE = 0x9; + + static final int FLAG_FIN = 0x1; + static final int FLAG_UNIDIRECTIONAL = 0x2; + + static final int VERSION = 3; + + static final byte[] DICTIONARY; + + static { + try { + DICTIONARY = ("\u0000\u0000\u0000\u0007options\u0000\u0000\u0000\u0004hea" + + "d\u0000\u0000\u0000\u0004post\u0000\u0000\u0000\u0003put\u0000\u0000\u0000\u0006dele" + + "te\u0000\u0000\u0000\u0005trace\u0000\u0000\u0000\u0006accept\u0000\u0000\u0000" + + "\u000Eaccept-charset\u0000\u0000\u0000\u000Faccept-encoding\u0000\u0000\u0000\u000Fa" + + "ccept-language\u0000\u0000\u0000\raccept-ranges\u0000\u0000\u0000\u0003age\u0000" + + "\u0000\u0000\u0005allow\u0000\u0000\u0000\rauthorization\u0000\u0000\u0000\rcache-co" + + "ntrol\u0000\u0000\u0000\nconnection\u0000\u0000\u0000\fcontent-base\u0000\u0000" + + "\u0000\u0010content-encoding\u0000\u0000\u0000\u0010content-language\u0000\u0000" + + "\u0000\u000Econtent-length\u0000\u0000\u0000\u0010content-location\u0000\u0000\u0000" + + "\u000Bcontent-md5\u0000\u0000\u0000\rcontent-range\u0000\u0000\u0000\fcontent-type" + + "\u0000\u0000\u0000\u0004date\u0000\u0000\u0000\u0004etag\u0000\u0000\u0000\u0006expe" + + "ct\u0000\u0000\u0000\u0007expires\u0000\u0000\u0000\u0004from\u0000\u0000\u0000" + + "\u0004host\u0000\u0000\u0000\bif-match\u0000\u0000\u0000\u0011if-modified-since" + + "\u0000\u0000\u0000\rif-none-match\u0000\u0000\u0000\bif-range\u0000\u0000\u0000" + + "\u0013if-unmodified-since\u0000\u0000\u0000\rlast-modified\u0000\u0000\u0000\blocati" + + "on\u0000\u0000\u0000\fmax-forwards\u0000\u0000\u0000\u0006pragma\u0000\u0000\u0000" + + "\u0012proxy-authenticate\u0000\u0000\u0000\u0013proxy-authorization\u0000\u0000" + + "\u0000\u0005range\u0000\u0000\u0000\u0007referer\u0000\u0000\u0000\u000Bretry-after" + + "\u0000\u0000\u0000\u0006server\u0000\u0000\u0000\u0002te\u0000\u0000\u0000\u0007trai" + + "ler\u0000\u0000\u0000\u0011transfer-encoding\u0000\u0000\u0000\u0007upgrade\u0000" + + "\u0000\u0000\nuser-agent\u0000\u0000\u0000\u0004vary\u0000\u0000\u0000\u0003via" + + "\u0000\u0000\u0000\u0007warning\u0000\u0000\u0000\u0010www-authenticate\u0000\u0000" + + "\u0000\u0006method\u0000\u0000\u0000\u0003get\u0000\u0000\u0000\u0006status\u0000" + + "\u0000\u0000\u0006200 OK\u0000\u0000\u0000\u0007version\u0000\u0000\u0000\bHTTP/1.1" + + "\u0000\u0000\u0000\u0003url\u0000\u0000\u0000\u0006public\u0000\u0000\u0000\nset-coo" + + "kie\u0000\u0000\u0000\nkeep-alive\u0000\u0000\u0000\u0006origin100101201202205206300" + + "302303304305306307402405406407408409410411412413414415416417502504505203 Non-Authori" + + "tative Information204 No Content301 Moved Permanently400 Bad Request401 Unauthorized" + + "403 Forbidden404 Not Found500 Internal Server Error501 Not Implemented503 Service Un" + + "availableJan Feb Mar Apr May Jun Jul Aug Sept Oct Nov Dec 00:00:00 Mon, Tue, Wed, Th" + + "u, Fri, Sat, Sun, GMTchunked,text/html,image/png,image/jpg,image/gif,application/xml" + + ",application/xhtml+xml,text/plain,text/javascript,publicprivatemax-age=gzip,deflate," + + "sdchcharset=utf-8charset=iso-8859-1,utf-,*,enq=0.").getBytes(Charsets.UTF_8.name()); + } catch (UnsupportedEncodingException e) { + throw new AssertionError(); + } } - @Override public void readConnectionPreface() { + @Override + public FrameReader newReader(DataEmitter source, FrameReader.Handler handler, boolean client) { + return new Reader(source, handler, client); } - @Override - public int canProcessFrame(ByteBufferList bb) { - if (source.buffer().size() + bb.remaining() < 8) - return 0; - ByteBuffer peek = ByteBuffer.wrap(bb.peekBytes(8)).order(ByteOrder.BIG_ENDIAN); - int w1 = peek.getInt(); - int w2 = peek.getInt(); - - int length = (w2 & 0xffffff); - if (bb.remaining() < 8 + length) - return 0; - return 8 + length; - } - - /** - * Send the next frame to {@code handler}. Returns true unless there are no - * more frames on the stream. - */ - @Override public boolean nextFrame(Handler handler) throws IOException { - int w1; - int w2; - try { - w1 = source.readInt(); - w2 = source.readInt(); - } catch (IOException e) { - return false; // This might be a normal socket close. - } - - boolean control = (w1 & 0x80000000) != 0; - int flags = (w2 & 0xff000000) >>> 24; - int length = (w2 & 0xffffff); - - if (control) { - int version = (w1 & 0x7fff0000) >>> 16; - int type = (w1 & 0xffff); - - if (version != 3) { - throw new ProtocolException("version != 3: " + version); - } - - switch (type) { - case TYPE_SYN_STREAM: - readSynStream(handler, flags, length); - return true; - - case TYPE_SYN_REPLY: - readSynReply(handler, flags, length); - return true; - - case TYPE_RST_STREAM: - readRstStream(handler, flags, length); - return true; - - case TYPE_SETTINGS: - readSettings(handler, flags, length); - return true; - - case TYPE_PING: - readPing(handler, flags, length); - return true; - - case TYPE_GOAWAY: - readGoAway(handler, flags, length); - return true; - - case TYPE_HEADERS: - readHeaders(handler, flags, length); - return true; - - case TYPE_WINDOW_UPDATE: - readWindowUpdate(handler, flags, length); - return true; - - default: - source.skip(length); - return true; - } - } else { - int streamId = w1 & 0x7fffffff; - boolean inFinished = (flags & FLAG_FIN) != 0; - handler.data(inFinished, streamId, source, length); - return true; - } + @Override + public FrameWriter newWriter(BufferedSink sink, boolean client) { + return new Writer(sink, client); } - private void readSynStream(Handler handler, int flags, int length) throws IOException { - int w1 = source.readInt(); - int w2 = source.readInt(); - int streamId = w1 & 0x7fffffff; - int associatedStreamId = w2 & 0x7fffffff; - source.readShort(); // int priority = (s3 & 0xe000) >>> 13; int slot = s3 & 0xff; - List<Header> headerBlock = headerBlockReader.readNameValueBlock(length - 10); - - boolean inFinished = (flags & FLAG_FIN) != 0; - boolean outFinished = (flags & FLAG_UNIDIRECTIONAL) != 0; - handler.headers(outFinished, inFinished, streamId, associatedStreamId, headerBlock, - HeadersMode.SPDY_SYN_STREAM); + @Override + public int maxFrameSize() { + return 16383; } - private void readSynReply(Handler handler, int flags, int length) throws IOException { - int w1 = source.readInt(); - int streamId = w1 & 0x7fffffff; - List<Header> headerBlock = headerBlockReader.readNameValueBlock(length - 4); - boolean inFinished = (flags & FLAG_FIN) != 0; - handler.headers(false, inFinished, streamId, -1, headerBlock, HeadersMode.SPDY_REPLY); - } + /** + * Read spdy/3 frames. + */ + static final class Reader implements FrameReader { + private final HeaderReader headerReader = new HeaderReader(); + private final DataEmitter emitter; + private final boolean client; + private final Handler handler; + private final DataEmitterReader reader; + + Reader(DataEmitter emitter, Handler handler, boolean client) { + this.emitter = emitter; + this.handler = handler; + this.client = client; + + reader = new DataEmitterReader(); + parseFrameHeader(); + } - private void readRstStream(Handler handler, int flags, int length) throws IOException { - if (length != 8) throw ioException("TYPE_RST_STREAM length: %d != 8", length); - int streamId = source.readInt() & 0x7fffffff; - int errorCodeInt = source.readInt(); - ErrorCode errorCode = ErrorCode.fromSpdy3Rst(errorCodeInt); - if (errorCode == null) { - throw ioException("TYPE_RST_STREAM unexpected error code: %d", errorCodeInt); - } - handler.rstStream(streamId, errorCode); - } + private void parseFrameHeader() { + emitter.setDataCallback(reader); + reader.read(8, onFrame); + } - private void readHeaders(Handler handler, int flags, int length) throws IOException { - int w1 = source.readInt(); - int streamId = w1 & 0x7fffffff; - List<Header> headerBlock = headerBlockReader.readNameValueBlock(length - 4); - handler.headers(false, false, streamId, -1, headerBlock, HeadersMode.SPDY_HEADERS); - } + int w1; + int w2; + int flags; + int length; + int streamId; + boolean inFinished; + private final DataCallback onFrame = new DataCallback() { + @Override + public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) { + bb.order(ByteOrder.BIG_ENDIAN); + w1 = bb.getInt(); + w2 = bb.getInt(); + + boolean control = (w1 & 0x80000000) != 0; + flags = (w2 & 0xff000000) >>> 24; + length = (w2 & 0xffffff); + + if (!control) { + streamId = w1 & 0x7fffffff; + inFinished = (flags & FLAG_FIN) != 0; + emitter.setDataCallback(onDataFrame); + } + else { + reader.read(length, onFullFrame); + } + } + }; + + private final DataCallback onDataFrame = new DataCallback() { + @Override + public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) { + int toRead = Math.min(bb.remaining(), length); + if (toRead < bb.remaining()) { + ByteBufferList partial = new ByteBufferList(); + bb.get(partial, toRead); + bb = partial; + } + + length -= toRead; + handler.data(length == 0 && inFinished, streamId, bb); + + if (length == 0) + parseFrameHeader(); + } + }; + + private final DataCallback onFullFrame = new DataCallback() { + @Override + public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) { + // queue up the next frame read + bb.order(ByteOrder.BIG_ENDIAN); + + int version = (w1 & 0x7fff0000) >>> 16; + int type = (w1 & 0xffff); + + try { + if (version != 3) { + throw new ProtocolException("version != 3: " + version); + } + + switch (type) { + case TYPE_SYN_STREAM: + readSynStream(bb, flags, length); + break; + + case TYPE_SYN_REPLY: + readSynReply(bb, flags, length); + break; + + case TYPE_RST_STREAM: + readRstStream(bb, flags, length); + break; + + case TYPE_SETTINGS: + readSettings(bb, flags, length); + break; + + case TYPE_PING: + readPing(bb, flags, length); + break; + + case TYPE_GOAWAY: + readGoAway(bb, flags, length); + break; + + case TYPE_HEADERS: + readHeaders(bb, flags, length); + break; + + case TYPE_WINDOW_UPDATE: + readWindowUpdate(bb, flags, length); + break; + + default: + bb.recycle(); + break; + } + parseFrameHeader(); + } + catch (IOException e) { + handler.error(e); + } + } + }; + + @Override + public void readConnectionPreface() { + } + private void readSynStream(ByteBufferList source, int flags, int length) throws IOException { + int w1 = source.getInt(); + int w2 = source.getInt(); + int streamId = w1 & 0x7fffffff; + int associatedStreamId = w2 & 0x7fffffff; + source.getShort(); // int priority = (s3 & 0xe000) >>> 13; int slot = s3 & 0xff; + List<Header> headerBlock = headerReader.readHeader(source, length - 10); + + boolean inFinished = (flags & FLAG_FIN) != 0; + boolean outFinished = (flags & FLAG_UNIDIRECTIONAL) != 0; + handler.headers(outFinished, inFinished, streamId, associatedStreamId, headerBlock, + HeadersMode.SPDY_SYN_STREAM); + } - private void readWindowUpdate(Handler handler, int flags, int length) throws IOException { - if (length != 8) throw ioException("TYPE_WINDOW_UPDATE length: %d != 8", length); - int w1 = source.readInt(); - int w2 = source.readInt(); - int streamId = w1 & 0x7fffffff; - long increment = w2 & 0x7fffffff; - if (increment == 0) throw ioException("windowSizeIncrement was 0", increment); - handler.windowUpdate(streamId, increment); - } + private void readSynReply(ByteBufferList source, int flags, int length) throws IOException { + int w1 = source.getInt(); + int streamId = w1 & 0x7fffffff; + List<Header> headerBlock = headerReader.readHeader(source, length - 4); + boolean inFinished = (flags & FLAG_FIN) != 0; + handler.headers(false, inFinished, streamId, -1, headerBlock, HeadersMode.SPDY_REPLY); + } - private void readPing(Handler handler, int flags, int length) throws IOException { - if (length != 4) throw ioException("TYPE_PING length: %d != 4", length); - int id = source.readInt(); - boolean ack = client == ((id & 1) == 1); - handler.ping(ack, id, 0); - } + private void readRstStream(ByteBufferList source, int flags, int length) throws IOException { + if (length != 8) throw ioException("TYPE_RST_STREAM length: %d != 8", length); + int streamId = source.getInt() & 0x7fffffff; + int errorCodeInt = source.getInt(); + ErrorCode errorCode = ErrorCode.fromSpdy3Rst(errorCodeInt); + if (errorCode == null) { + throw ioException("TYPE_RST_STREAM unexpected error code: %d", errorCodeInt); + } + handler.rstStream(streamId, errorCode); + } - private void readGoAway(Handler handler, int flags, int length) throws IOException { - if (length != 8) throw ioException("TYPE_GOAWAY length: %d != 8", length); - int lastGoodStreamId = source.readInt() & 0x7fffffff; - int errorCodeInt = source.readInt(); - ErrorCode errorCode = ErrorCode.fromSpdyGoAway(errorCodeInt); - if (errorCode == null) { - throw ioException("TYPE_GOAWAY unexpected error code: %d", errorCodeInt); - } - handler.goAway(lastGoodStreamId, errorCode, ByteString.EMPTY); - } + private void readHeaders(ByteBufferList source, int flags, int length) throws IOException { + int w1 = source.getInt(); + int streamId = w1 & 0x7fffffff; + List<Header> headerBlock = headerReader.readHeader(source, length - 4); + handler.headers(false, false, streamId, -1, headerBlock, HeadersMode.SPDY_HEADERS); + } - private void readSettings(Handler handler, int flags, int length) throws IOException { - int numberOfEntries = source.readInt(); - if (length != 4 + 8 * numberOfEntries) { - throw ioException("TYPE_SETTINGS length: %d != 4 + 8 * %d", length, numberOfEntries); - } - Settings settings = new Settings(); - for (int i = 0; i < numberOfEntries; i++) { - int w1 = source.readInt(); - int value = source.readInt(); - int idFlags = (w1 & 0xff000000) >>> 24; - int id = w1 & 0xffffff; - settings.set(id, idFlags, value); - } - boolean clearPrevious = (flags & Settings.FLAG_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS) != 0; - handler.settings(clearPrevious, settings); - } + private void readWindowUpdate(ByteBufferList source, int flags, int length) throws IOException { + if (length != 8) throw ioException("TYPE_WINDOW_UPDATE length: %d != 8", length); + int w1 = source.getInt(); + int w2 = source.getInt(); + int streamId = w1 & 0x7fffffff; + long increment = w2 & 0x7fffffff; + if (increment == 0) throw ioException("windowSizeIncrement was 0", increment); + handler.windowUpdate(streamId, increment); + } - private static IOException ioException(String message, Object... args) throws IOException { - throw new IOException(String.format(message, args)); - } + private void readPing(ByteBufferList source, int flags, int length) throws IOException { + if (length != 4) throw ioException("TYPE_PING length: %d != 4", length); + int id = source.getInt(); + boolean ack = client == ((id & 1) == 1); + handler.ping(ack, id, 0); + } - @Override public void close() throws IOException { - headerBlockReader.close(); - } - } - - /** Write spdy/3 frames. */ - static final class Writer implements FrameWriter { - private final BufferedSink sink; - private final Buffer headerBlockBuffer; - private final BufferedSink headerBlockOut; - private final boolean client; - private boolean closed; - - Writer(BufferedSink sink, boolean client) { - this.sink = sink; - this.client = client; - - Deflater deflater = new Deflater(); - deflater.setDictionary(DICTIONARY); - headerBlockBuffer = new Buffer(); - headerBlockOut = Okio.buffer(new DeflaterSink(headerBlockBuffer, deflater)); - } + private void readGoAway(ByteBufferList source, int flags, int length) throws IOException { + if (length != 8) throw ioException("TYPE_GOAWAY length: %d != 8", length); + int lastGoodStreamId = source.getInt() & 0x7fffffff; + int errorCodeInt = source.getInt(); + ErrorCode errorCode = ErrorCode.fromSpdyGoAway(errorCodeInt); + if (errorCode == null) { + throw ioException("TYPE_GOAWAY unexpected error code: %d", errorCodeInt); + } + handler.goAway(lastGoodStreamId, errorCode, ByteString.EMPTY); + } + + private void readSettings(ByteBufferList source, int flags, int length) throws IOException { + int numberOfEntries = source.getInt(); + if (length != 4 + 8 * numberOfEntries) { + throw ioException("TYPE_SETTINGS length: %d != 4 + 8 * %d", length, numberOfEntries); + } + Settings settings = new Settings(); + for (int i = 0; i < numberOfEntries; i++) { + int w1 = source.getInt(); + int value = source.getInt(); + int idFlags = (w1 & 0xff000000) >>> 24; + int id = w1 & 0xffffff; + settings.set(id, idFlags, value); + } + boolean clearPrevious = (flags & Settings.FLAG_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS) != 0; + handler.settings(clearPrevious, settings); + } + + private static IOException ioException(String message, Object... args) throws IOException { + throw new IOException(String.format(message, args)); + } - @Override public void ackSettings() { - // Do nothing: no ACK for SPDY/3 settings. + @Override + public void close() throws IOException { + } } - @Override - public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) + /** + * Write spdy/3 frames. + */ + static final class Writer implements FrameWriter { + private final BufferedSink sink; + private final Buffer headerBlockBuffer; + private final BufferedSink headerBlockOut; + private final boolean client; + private boolean closed; + + Writer(BufferedSink sink, boolean client) { + this.sink = sink; + this.client = client; + + Deflater deflater = new Deflater(); + deflater.setDictionary(DICTIONARY); + headerBlockBuffer = new Buffer(); + headerBlockOut = Okio.buffer(new DeflaterSink(headerBlockBuffer, deflater)); + } + + @Override + public void ackSettings() { + // Do nothing: no ACK for SPDY/3 settings. + } + + @Override + public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) throws IOException { - // Do nothing: no push promise for SPDY/3. - } + // Do nothing: no push promise for SPDY/3. + } - @Override public synchronized void connectionPreface() { - // Do nothing: no connection preface for SPDY/3. - } + @Override + public synchronized void connectionPreface() { + // Do nothing: no connection preface for SPDY/3. + } - @Override public synchronized void flush() throws IOException { - if (closed) throw new IOException("closed"); - sink.flush(); - } + @Override + public synchronized void flush() throws IOException { + if (closed) throw new IOException("closed"); + sink.flush(); + } - @Override public synchronized void synStream(boolean outFinished, boolean inFinished, - int streamId, int associatedStreamId, List<Header> headerBlock) + @Override + public synchronized void synStream(boolean outFinished, boolean inFinished, + int streamId, int associatedStreamId, List<Header> headerBlock) throws IOException { - if (closed) throw new IOException("closed"); - writeNameValueBlockToBuffer(headerBlock); - int length = (int) (10 + headerBlockBuffer.size()); - int type = TYPE_SYN_STREAM; - int flags = (outFinished ? FLAG_FIN : 0) | (inFinished ? FLAG_UNIDIRECTIONAL : 0); - - int unused = 0; - sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); - sink.writeInt(streamId & 0x7fffffff); - sink.writeInt(associatedStreamId & 0x7fffffff); - sink.writeShort((unused & 0x7) << 13 | (unused & 0x1f) << 8 | (unused & 0xff)); - sink.writeAll(headerBlockBuffer); - sink.flush(); - } + if (closed) throw new IOException("closed"); + writeNameValueBlockToBuffer(headerBlock); + int length = (int) (10 + headerBlockBuffer.size()); + int type = TYPE_SYN_STREAM; + int flags = (outFinished ? FLAG_FIN : 0) | (inFinished ? FLAG_UNIDIRECTIONAL : 0); + + int unused = 0; + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(streamId & 0x7fffffff); + sink.writeInt(associatedStreamId & 0x7fffffff); + sink.writeShort((unused & 0x7) << 13 | (unused & 0x1f) << 8 | (unused & 0xff)); + sink.writeAll(headerBlockBuffer); + sink.flush(); + } - @Override public synchronized void synReply(boolean outFinished, int streamId, - List<Header> headerBlock) throws IOException { - if (closed) throw new IOException("closed"); - writeNameValueBlockToBuffer(headerBlock); - int type = TYPE_SYN_REPLY; - int flags = (outFinished ? FLAG_FIN : 0); - int length = (int) (headerBlockBuffer.size() + 4); - - sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); - sink.writeInt(streamId & 0x7fffffff); - sink.writeAll(headerBlockBuffer); - sink.flush(); - } + @Override + public synchronized void synReply(boolean outFinished, int streamId, + List<Header> headerBlock) throws IOException { + if (closed) throw new IOException("closed"); + writeNameValueBlockToBuffer(headerBlock); + int type = TYPE_SYN_REPLY; + int flags = (outFinished ? FLAG_FIN : 0); + int length = (int) (headerBlockBuffer.size() + 4); + + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(streamId & 0x7fffffff); + sink.writeAll(headerBlockBuffer); + sink.flush(); + } - @Override public synchronized void headers(int streamId, List<Header> headerBlock) + @Override + public synchronized void headers(int streamId, List<Header> headerBlock) throws IOException { - if (closed) throw new IOException("closed"); - writeNameValueBlockToBuffer(headerBlock); - int flags = 0; - int type = TYPE_HEADERS; - int length = (int) (headerBlockBuffer.size() + 4); - - sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); - sink.writeInt(streamId & 0x7fffffff); - sink.writeAll(headerBlockBuffer); - } + if (closed) throw new IOException("closed"); + writeNameValueBlockToBuffer(headerBlock); + int flags = 0; + int type = TYPE_HEADERS; + int length = (int) (headerBlockBuffer.size() + 4); + + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(streamId & 0x7fffffff); + sink.writeAll(headerBlockBuffer); + } - @Override public synchronized void rstStream(int streamId, ErrorCode errorCode) + @Override + public synchronized void rstStream(int streamId, ErrorCode errorCode) throws IOException { - if (closed) throw new IOException("closed"); - if (errorCode.spdyRstCode == -1) throw new IllegalArgumentException(); - int flags = 0; - int type = TYPE_RST_STREAM; - int length = 8; - sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); - sink.writeInt(streamId & 0x7fffffff); - sink.writeInt(errorCode.spdyRstCode); - sink.flush(); - } + if (closed) throw new IOException("closed"); + if (errorCode.spdyRstCode == -1) throw new IllegalArgumentException(); + int flags = 0; + int type = TYPE_RST_STREAM; + int length = 8; + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(streamId & 0x7fffffff); + sink.writeInt(errorCode.spdyRstCode); + sink.flush(); + } - @Override public synchronized void data(boolean outFinished, int streamId, Buffer source) + @Override + public synchronized void data(boolean outFinished, int streamId, Buffer source) throws IOException { - data(outFinished, streamId, source, (int) source.size()); - } + data(outFinished, streamId, source, (int) source.size()); + } - @Override public synchronized void data(boolean outFinished, int streamId, Buffer source, - int byteCount) throws IOException { - int flags = (outFinished ? FLAG_FIN : 0); - sendDataFrame(streamId, flags, source, byteCount); - } + @Override + public synchronized void data(boolean outFinished, int streamId, Buffer source, + int byteCount) throws IOException { + int flags = (outFinished ? FLAG_FIN : 0); + sendDataFrame(streamId, flags, source, byteCount); + } - void sendDataFrame(int streamId, int flags, Buffer buffer, int byteCount) + void sendDataFrame(int streamId, int flags, Buffer buffer, int byteCount) throws IOException { - if (closed) throw new IOException("closed"); - if (byteCount > 0xffffffL) { - throw new IllegalArgumentException("FRAME_TOO_LARGE max size is 16Mib: " + byteCount); - } - sink.writeInt(streamId & 0x7fffffff); - sink.writeInt((flags & 0xff) << 24 | byteCount & 0xffffff); - if (byteCount > 0) { - sink.write(buffer, byteCount); - } - } + if (closed) throw new IOException("closed"); + if (byteCount > 0xffffffL) { + throw new IllegalArgumentException("FRAME_TOO_LARGE max size is 16Mib: " + byteCount); + } + sink.writeInt(streamId & 0x7fffffff); + sink.writeInt((flags & 0xff) << 24 | byteCount & 0xffffff); + if (byteCount > 0) { + sink.write(buffer, byteCount); + } + } - private void writeNameValueBlockToBuffer(List<Header> headerBlock) throws IOException { - if (headerBlockBuffer.size() != 0) throw new IllegalStateException(); - headerBlockOut.writeInt(headerBlock.size()); - for (int i = 0, size = headerBlock.size(); i < size; i++) { - ByteString name = headerBlock.get(i).name; - headerBlockOut.writeInt(name.size()); - headerBlockOut.write(name); - ByteString value = headerBlock.get(i).value; - headerBlockOut.writeInt(value.size()); - headerBlockOut.write(value); - } - headerBlockOut.flush(); - } + private void writeNameValueBlockToBuffer(List<Header> headerBlock) throws IOException { + if (headerBlockBuffer.size() != 0) throw new IllegalStateException(); + headerBlockOut.writeInt(headerBlock.size()); + for (int i = 0, size = headerBlock.size(); i < size; i++) { + ByteString name = headerBlock.get(i).name; + headerBlockOut.writeInt(name.size()); + headerBlockOut.write(name); + ByteString value = headerBlock.get(i).value; + headerBlockOut.writeInt(value.size()); + headerBlockOut.write(value); + } + headerBlockOut.flush(); + } - @Override public synchronized void settings(Settings settings) throws IOException { - if (closed) throw new IOException("closed"); - int type = TYPE_SETTINGS; - int flags = 0; - int size = settings.size(); - int length = 4 + size * 8; - sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); - sink.writeInt(size); - for (int i = 0; i <= Settings.COUNT; i++) { - if (!settings.isSet(i)) continue; - int settingsFlags = settings.flags(i); - sink.writeInt((settingsFlags & 0xff) << 24 | (i & 0xffffff)); - sink.writeInt(settings.get(i)); - } - sink.flush(); - } + @Override + public synchronized void settings(Settings settings) throws IOException { + if (closed) throw new IOException("closed"); + int type = TYPE_SETTINGS; + int flags = 0; + int size = settings.size(); + int length = 4 + size * 8; + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(size); + for (int i = 0; i <= Settings.COUNT; i++) { + if (!settings.isSet(i)) continue; + int settingsFlags = settings.flags(i); + sink.writeInt((settingsFlags & 0xff) << 24 | (i & 0xffffff)); + sink.writeInt(settings.get(i)); + } + sink.flush(); + } - @Override public synchronized void ping(boolean reply, int payload1, int payload2) + @Override + public synchronized void ping(boolean reply, int payload1, int payload2) throws IOException { - if (closed) throw new IOException("closed"); - boolean payloadIsReply = client != ((payload1 & 1) == 1); - if (reply != payloadIsReply) throw new IllegalArgumentException("payload != reply"); - int type = TYPE_PING; - int flags = 0; - int length = 4; - sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); - sink.writeInt(payload1); - sink.flush(); - } + if (closed) throw new IOException("closed"); + boolean payloadIsReply = client != ((payload1 & 1) == 1); + if (reply != payloadIsReply) throw new IllegalArgumentException("payload != reply"); + int type = TYPE_PING; + int flags = 0; + int length = 4; + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(payload1); + sink.flush(); + } - @Override public synchronized void goAway(int lastGoodStreamId, ErrorCode errorCode, - byte[] ignored) throws IOException { - if (closed) throw new IOException("closed"); - if (errorCode.spdyGoAwayCode == -1) { - throw new IllegalArgumentException("errorCode.spdyGoAwayCode == -1"); - } - int type = TYPE_GOAWAY; - int flags = 0; - int length = 8; - sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); - sink.writeInt(lastGoodStreamId); - sink.writeInt(errorCode.spdyGoAwayCode); - sink.flush(); - } + @Override + public synchronized void goAway(int lastGoodStreamId, ErrorCode errorCode, + byte[] ignored) throws IOException { + if (closed) throw new IOException("closed"); + if (errorCode.spdyGoAwayCode == -1) { + throw new IllegalArgumentException("errorCode.spdyGoAwayCode == -1"); + } + int type = TYPE_GOAWAY; + int flags = 0; + int length = 8; + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(lastGoodStreamId); + sink.writeInt(errorCode.spdyGoAwayCode); + sink.flush(); + } - @Override public synchronized void windowUpdate(int streamId, long increment) + @Override + public synchronized void windowUpdate(int streamId, long increment) throws IOException { - if (closed) throw new IOException("closed"); - if (increment == 0 || increment > 0x7fffffffL) { - throw new IllegalArgumentException( - "windowSizeIncrement must be between 1 and 0x7fffffff: " + increment); - } - int type = TYPE_WINDOW_UPDATE; - int flags = 0; - int length = 8; - sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); - sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); - sink.writeInt(streamId); - sink.writeInt((int) increment); - sink.flush(); - } + if (closed) throw new IOException("closed"); + if (increment == 0 || increment > 0x7fffffffL) { + throw new IllegalArgumentException( + "windowSizeIncrement must be between 1 and 0x7fffffff: " + increment); + } + int type = TYPE_WINDOW_UPDATE; + int flags = 0; + int length = 8; + sink.writeInt(0x80000000 | (VERSION & 0x7fff) << 16 | type & 0xffff); + sink.writeInt((flags & 0xff) << 24 | length & 0xffffff); + sink.writeInt(streamId); + sink.writeInt((int) increment); + sink.flush(); + } - @Override public synchronized void close() throws IOException { - closed = true; - Util.closeAll(sink, headerBlockOut); + @Override + public synchronized void close() throws IOException { + closed = true; + Util.closeAll(sink, headerBlockOut); + } } - } } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Variant.java b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Variant.java index 56994d1..eeb7c15 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Variant.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/spdy/okhttp/internal/spdy/Variant.java @@ -16,9 +16,9 @@ package com.koushikdutta.async.http.spdy.okhttp.internal.spdy; +import com.koushikdutta.async.DataEmitter; import com.koushikdutta.async.http.Protocol; import com.koushikdutta.async.http.spdy.okio.BufferedSink; -import com.koushikdutta.async.http.spdy.okio.BufferedSource; /** A version and dialect of the framed socket protocol. */ public interface Variant { @@ -29,7 +29,7 @@ public interface Variant { /** * @param client true if this is the HTTP client's reader, reading frames from a server. */ - FrameReader newReader(BufferedSource source, boolean client); + FrameReader newReader(DataEmitter source, FrameReader.Handler handler, boolean client); /** * @param client true if this is the HTTP client's writer, writing frames to a server. |