diff options
author | Koushik Dutta <koushd@gmail.com> | 2014-07-24 19:42:25 -0700 |
---|---|---|
committer | Koushik Dutta <koushd@gmail.com> | 2014-07-24 19:42:25 -0700 |
commit | 7ce289952764e8a8a7a85c4f43658e06b369ac98 (patch) | |
tree | 8ad6999bda3b109579e08ac57bae3d6f2799d8be /AndroidAsync | |
parent | c3618aa1598ac3de0d39f199274812c5782798de (diff) | |
download | AndroidAsync-7ce289952764e8a8a7a85c4f43658e06b369ac98.tar.gz AndroidAsync-7ce289952764e8a8a7a85c4f43658e06b369ac98.tar.bz2 AndroidAsync-7ce289952764e8a8a7a85c4f43658e06b369ac98.zip |
decouple the http transport from the client. prep for spdy.
Diffstat (limited to 'AndroidAsync')
8 files changed, 177 insertions, 82 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java index 313a9fa..fb13811 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java @@ -66,11 +66,13 @@ public class AsyncHttpClient { AsyncSSLSocketMiddleware sslSocketMiddleware; AsyncSocketMiddleware socketMiddleware; + HttpTransportMiddleware httpTransportMiddleware; AsyncServer mServer; public AsyncHttpClient(AsyncServer server) { mServer = server; insertMiddleware(socketMiddleware = new AsyncSocketMiddleware(this)); insertMiddleware(sslSocketMiddleware = new AsyncSSLSocketMiddleware(this)); + insertMiddleware(httpTransportMiddleware = new HttpTransportMiddleware()); } @@ -274,7 +276,7 @@ public class AsyncHttpClient { if (cancel.isCancelled()) return; // 5) after request is sent, set a header timeout - if (cancel.timeoutRunnable != null && data.headers == null) { + if (cancel.timeoutRunnable != null && mHeaders == null) { mServer.removeAllCallbacks(cancel.scheduled); cancel.scheduled = mServer.postDelayed(cancel.timeoutRunnable, getTimeoutRemaining(request)); } @@ -282,14 +284,12 @@ public class AsyncHttpClient { @Override public void setDataEmitter(DataEmitter emitter) { - data.response = this; data.bodyEmitter = emitter; synchronized (mMiddleware) { for (AsyncHttpClientMiddleware middleware: mMiddleware) { middleware.onBodyDecoder(data); } } - mHeaders = data.headers; super.setDataEmitter(data.bodyEmitter); @@ -333,31 +333,25 @@ public class AsyncHttpClient { } protected void onHeadersReceived() { - try { - if (cancel.isCancelled()) - return; + super.onHeadersReceived(); + if (cancel.isCancelled()) + return; - // 7) on headers, cancel timeout - if (cancel.timeoutRunnable != null) - mServer.removeAllCallbacks(cancel.scheduled); + // 7) on headers, cancel timeout + if (cancel.timeoutRunnable != null) + mServer.removeAllCallbacks(cancel.scheduled); - // allow the middleware to massage the headers before the body is decoded - request.logv("Received headers:\n" + toString()); + // allow the middleware to massage the headers before the body is decoded + request.logv("Received headers:\n" + toString()); - data.headers = mHeaders; - synchronized (mMiddleware) { - for (AsyncHttpClientMiddleware middleware: mMiddleware) { - middleware.onHeadersReceived(data); - } + synchronized (mMiddleware) { + for (AsyncHttpClientMiddleware middleware: mMiddleware) { + middleware.onHeadersReceived(data); } - mHeaders = data.headers; - - // drop through, and setDataEmitter will be called for the body decoder. - // headers will be further massaged in there. - } - catch (Exception ex) { - reportConnectedCompleted(cancel, ex, null, request, callback); } + + // drop through, and setDataEmitter will be called for the body decoder. + // headers will be further massaged in there. } @Override @@ -390,7 +384,6 @@ public class AsyncHttpClient { } } - @Override public AsyncSocket detachSocket() { request.logd("Detaching socket"); @@ -406,7 +399,24 @@ public class AsyncHttpClient { } }; + data.sendHeadersCallback = new CompletedCallback() { + @Override + public void onCompleted(Exception ex) { + if (ex != null) + ret.report(ex); + else + ret.onHeadersSent(); + } + }; + data.response = ret; ret.setSocket(socket); + + synchronized (mMiddleware) { + for (AsyncHttpClientMiddleware middleware: mMiddleware) { + if (middleware.sendHeaders(data)) + break; + } + } } }; diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClientMiddleware.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClientMiddleware.java index c706e53..b198199 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClientMiddleware.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClientMiddleware.java @@ -2,6 +2,7 @@ package com.koushikdutta.async.http; import com.koushikdutta.async.AsyncSocket; import com.koushikdutta.async.DataEmitter; +import com.koushikdutta.async.DataSink; import com.koushikdutta.async.callback.CompletedCallback; import com.koushikdutta.async.callback.ConnectCallback; import com.koushikdutta.async.future.Cancellable; @@ -12,6 +13,19 @@ import com.koushikdutta.async.util.UntypedHashtable; * inspect, manipulate, and handle http requests. */ public interface AsyncHttpClientMiddleware { + public interface ResponseHead { + public String protocol(); + public String message(); + public int code(); + public ResponseHead protocol(String protocol); + public ResponseHead message(String message); + public ResponseHead code(int code); + public Headers headers(); + public ResponseHead headers(Headers headers); + public DataSink sink(); + public ResponseHead sink(DataSink sink); + } + public static class OnRequestData { public UntypedHashtable state = new UntypedHashtable(); public AsyncHttpRequest request; @@ -25,15 +39,15 @@ public interface AsyncHttpClientMiddleware { public static class SendHeaderData extends GetSocketData { public AsyncSocket socket; + public ResponseHead response; public CompletedCallback sendHeadersCallback; } public static class OnHeadersReceivedData extends SendHeaderData { - public Headers headers; +// public Headers headers; } public static class OnBodyData extends OnHeadersReceivedData { - public AsyncHttpResponse response; public DataEmitter bodyEmitter; } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponse.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponse.java index e771395..fa3b710 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponse.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponse.java @@ -8,9 +8,6 @@ public interface AsyncHttpResponse extends DataEmitter { public String protocol(); public String message(); public int code(); - public AsyncHttpResponse protocol(String protocol); - public AsyncHttpResponse message(String message); - public AsyncHttpResponse code(int code); public Headers headers(); public AsyncSocket detachSocket(); public AsyncHttpRequest getRequest(); diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java index fca0473..fa2bb11 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java @@ -18,7 +18,7 @@ import com.koushikdutta.async.http.filter.ChunkedOutputFilter; import java.io.IOException; import java.nio.charset.Charset; -abstract class AsyncHttpResponseImpl extends FilteredDataEmitter implements AsyncSocket, AsyncHttpResponse { +abstract class AsyncHttpResponseImpl extends FilteredDataEmitter implements AsyncSocket, AsyncHttpResponse, AsyncHttpClientMiddleware.ResponseHead { private AsyncHttpRequestBody mWriter; public AsyncSocket getSocket() { @@ -32,59 +32,31 @@ abstract class AsyncHttpResponseImpl extends FilteredDataEmitter implements Asyn void setSocket(AsyncSocket exchange) { mSocket = exchange; - if (mSocket == null) return; - mWriter = mRequest.getBody(); - if (mWriter != null) { - if (mRequest.getHeaders().get("Content-Type") == null) - mRequest.getHeaders().set("Content-Type", mWriter.getContentType()); - if (mWriter.length() >= 0) { - mRequest.getHeaders().set("Content-Length", String.valueOf(mWriter.length())); - mSink = mSocket; - } - else { - mRequest.getHeaders().set("Transfer-Encoding", "Chunked"); - mSink = new ChunkedOutputFilter(mSocket); - } - } - else { - mSink = mSocket; - } - mSocket.setEndCallback(mReporter); - mSocket.setClosedCallback(new CompletedCallback() { - @Override - public void onCompleted(Exception ex) { - // TODO: do we care? throw if socket is still writing or something? - } - }); - String rl = mRequest.getRequestLine().toString(); - String rs = mRequest.getHeaders().toPrefixString(rl); - mRequest.logv("\n" + rs); - Util.writeAll(exchange, rs.getBytes(), new CompletedCallback() { - @Override - public void onCompleted(Exception ex) { - if (mWriter != null) { - mWriter.write(mRequest, AsyncHttpResponseImpl.this, new CompletedCallback() { - @Override - public void onCompleted(Exception ex) { - onRequestCompleted(ex); - } - }); - } else { - onRequestCompleted(null); - } - } - }); + mWriter = mRequest.getBody(); LineEmitter liner = new LineEmitter(); exchange.setDataCallback(liner); liner.setLineCallback(mHeaderCallback); } + protected void onHeadersSent() { + if (mWriter != null) { + mWriter.write(mRequest, AsyncHttpResponseImpl.this, new CompletedCallback() { + @Override + public void onCompleted(Exception ex) { + onRequestCompleted(ex); + } + }); + } else { + onRequestCompleted(null); + } + } + protected void onRequestCompleted(Exception ex) { } @@ -100,7 +72,8 @@ abstract class AsyncHttpResponseImpl extends FilteredDataEmitter implements Asyn } }; - protected abstract void onHeadersReceived(); + protected void onHeadersReceived() { + } StringCallback mHeaderCallback = new StringCallback() { private Headers mRawHeaders = new Headers(); @@ -179,6 +152,12 @@ abstract class AsyncHttpResponseImpl extends FilteredDataEmitter implements Asyn return mHeaders; } + @Override + public AsyncHttpClientMiddleware.ResponseHead headers(Headers headers) { + mHeaders = headers; + return this; + } + int code; @Override public int code() { @@ -186,19 +165,19 @@ abstract class AsyncHttpResponseImpl extends FilteredDataEmitter implements Asyn } @Override - public AsyncHttpResponse code(int code) { + public AsyncHttpClientMiddleware.ResponseHead code(int code) { this.code = code; return this; } @Override - public AsyncHttpResponse protocol(String protocol) { + public AsyncHttpClientMiddleware.ResponseHead protocol(String protocol) { this.protocol = protocol; return this; } @Override - public AsyncHttpResponse message(String message) { + public AsyncHttpClientMiddleware.ResponseHead message(String message) { this.message = message; return this; } @@ -232,6 +211,18 @@ abstract class AsyncHttpResponseImpl extends FilteredDataEmitter implements Asyn } DataSink mSink; + + @Override + public DataSink sink() { + return mSink; + } + + @Override + public AsyncHttpClientMiddleware.ResponseHead sink(DataSink sink) { + mSink = sink; + return this; + } + @Override public void write(ByteBufferList bb) { assertContent(); diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncSocketMiddleware.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncSocketMiddleware.java index 37ffe5c..5c11684 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncSocketMiddleware.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncSocketMiddleware.java @@ -358,7 +358,7 @@ public class AsyncSocketMiddleware extends SimpleMiddleware { data.socket.close(); return; } - if (!HttpUtil.isKeepAlive(data.response.protocol(), data.headers) + if (!HttpUtil.isKeepAlive(data.response.protocol(), data.response.headers()) || !HttpUtil.isKeepAlive(Protocol.HTTP_1_1, data.request.getHeaders())) { data.request.logv("closing out socket (not keep alive)"); data.socket.close(); diff --git a/AndroidAsync/src/com/koushikdutta/async/http/HttpTransportMiddleware.java b/AndroidAsync/src/com/koushikdutta/async/http/HttpTransportMiddleware.java new file mode 100644 index 0000000..1ca6b5a --- /dev/null +++ b/AndroidAsync/src/com/koushikdutta/async/http/HttpTransportMiddleware.java @@ -0,0 +1,81 @@ +package com.koushikdutta.async.http; + +import com.koushikdutta.async.Util; +import com.koushikdutta.async.callback.CompletedCallback; +import com.koushikdutta.async.http.body.AsyncHttpRequestBody; +import com.koushikdutta.async.http.filter.ChunkedOutputFilter; + +/** + * Created by koush on 7/24/14. + */ +public class HttpTransportMiddleware extends SimpleMiddleware { + + @Override + public boolean sendHeaders(final SendHeaderData data) { + Protocol p = Protocol.get(data.protocol); + if (p != null && p != Protocol.HTTP_1_0 && p != Protocol.HTTP_1_1) + return super.sendHeaders(data); + + AsyncHttpRequest request = data.request; + AsyncHttpRequestBody requestBody = data.request.getBody(); + + if (requestBody != null) { + if (request.getHeaders().get("Content-Type") == null) + request.getHeaders().set("Content-Type", requestBody.getContentType()); + if (requestBody.length() >= 0) { + request.getHeaders().set("Content-Length", String.valueOf(requestBody.length())); + data.response.sink(data.socket); + } else { + request.getHeaders().set("Transfer-Encoding", "Chunked"); + data.response.sink(new ChunkedOutputFilter(data.socket)); + } + } + + String rl = request.getRequestLine().toString(); + String rs = request.getHeaders().toPrefixString(rl); + request.logv("\n" + rs); + + Util.writeAll(data.socket, rs.getBytes(), new CompletedCallback() { + @Override + public void onCompleted(Exception ex) { + data.sendHeadersCallback.onCompleted(ex); + } + }); + +// LineEmitter.StringCallback headerCallback = new LineEmitter.StringCallback() { +// Headers mRawHeaders = new Headers(); +// String statusLine; +// +// @Override +// public void onStringAvailable(String s) { +// try { +// if (statusLine == null) { +// statusLine = s; +// } +// else if (!"\r".equals(s)) { +// mRawHeaders.addLine(s); +// } +// else { +// String[] parts = statusLine.split(" ", 3); +// if (parts.length != 3) +// throw new Exception(new IOException("Not HTTP")); +// +// data.response.headers(mRawHeaders); +// data.response.protocol(parts[0]); +// data.response.code(Integer.parseInt(parts[1])); +// data.response.message(parts[2]); +// data.sendHeadersCallback.onCompleted(null); +// } +// } +// catch (Exception ex) { +// data.sendHeadersCallback.onCompleted(ex); +// } +// } +// }; +// +// LineEmitter liner = new LineEmitter(); +// data.socket.setDataCallback(liner); +// liner.setLineCallback(headerCallback); + return true; + } +} diff --git a/AndroidAsync/src/com/koushikdutta/async/http/Protocol.java b/AndroidAsync/src/com/koushikdutta/async/http/Protocol.java index 8e5a46c..4613428 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/Protocol.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/Protocol.java @@ -75,6 +75,8 @@ public enum Protocol { * Returns the protocol identified by {@code protocol}. */ public static Protocol get(String protocol) { + if (protocol == null) + return null; return protocols.get(protocol.toLowerCase()); } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/cache/ResponseCacheMiddleware.java b/AndroidAsync/src/com/koushikdutta/async/http/cache/ResponseCacheMiddleware.java index 447c81b..57e3895 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/cache/ResponseCacheMiddleware.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/cache/ResponseCacheMiddleware.java @@ -216,25 +216,25 @@ public class ResponseCacheMiddleware extends SimpleMiddleware { public void onBodyDecoder(OnBodyData data) { CachedSocket cached = com.koushikdutta.async.Util.getWrappedSocket(data.socket, CachedSocket.class); if (cached != null) { - data.headers.set(SERVED_FROM, CACHE); + data.response.headers().set(SERVED_FROM, CACHE); return; } CacheData cacheData = data.state.get("cache-data"); - RawHeaders rh = RawHeaders.fromMultimap(data.headers.getMultiMap()); + RawHeaders rh = RawHeaders.fromMultimap(data.response.headers().getMultiMap()); + rh.removeAll("Content-Length"); rh.setStatusLine(String.format("%s %s %s", data.response.protocol(), data.response.code(), data.response.message())); ResponseHeaders networkResponse = new ResponseHeaders(data.request.getUri(), rh); data.state.put("response-headers", networkResponse); if (cacheData != null) { if (cacheData.cachedResponseHeaders.validate(networkResponse)) { data.request.logi("Serving response from conditional cache"); - data.headers.removeAll("Content-Length"); ResponseHeaders combined = cacheData.cachedResponseHeaders.combine(networkResponse); - data.headers = new Headers(combined.getHeaders().toMultimap()); + data.response.headers(new Headers(combined.getHeaders().toMultimap())); data.response.code(combined.getHeaders().getResponseCode()); data.response.message(combined.getHeaders().getResponseMessage()); - data.headers.set(SERVED_FROM, CONDITIONAL_CACHE); + data.response.headers().set(SERVED_FROM, CONDITIONAL_CACHE); conditionalCacheHitCount++; CachedBodyEmitter bodySpewer = new CachedBodyEmitter(cacheData.candidate, cacheData.contentLength); |