diff options
7 files changed, 149 insertions, 127 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/FilteredDataCallback.java b/AndroidAsync/src/com/koushikdutta/async/FilteredDataCallback.java index 6ad3154..a450e84 100644 --- a/AndroidAsync/src/com/koushikdutta/async/FilteredDataCallback.java +++ b/AndroidAsync/src/com/koushikdutta/async/FilteredDataCallback.java @@ -43,6 +43,7 @@ public class FilteredDataCallback implements DataEmitter, DataCallback, Complete @Override public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) { + Assert.assertNull(pending); Assert.assertNotNull(mDataCallback); Util.emitAllData(this, bb); if (bb.remaining() > 0) diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java index f36a91d..5dc1cbf 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java @@ -22,25 +22,22 @@ import com.koushikdutta.async.AsyncServer; import com.koushikdutta.async.AsyncSocket; import com.koushikdutta.async.ByteBufferList; import com.koushikdutta.async.DataEmitter; -import com.koushikdutta.async.NullDataCallback; import com.koushikdutta.async.callback.ClosedCallback; import com.koushikdutta.async.callback.CompletedCallback; import com.koushikdutta.async.callback.ConnectCallback; import com.koushikdutta.async.callback.DataCallback; import com.koushikdutta.async.callback.RequestCallback; import com.koushikdutta.async.http.libcore.RawHeaders; +import com.koushikdutta.async.http.server.WebSocket; +import com.koushikdutta.async.http.server.WebSocketImpl; import com.koushikdutta.async.stream.OutputStreamDataCallback; public class AsyncHttpClient { private static abstract class InternalConnectCallback implements ConnectCallback { - AsyncSocket exchange; + boolean reused = false; } - - private static class SocketExchange { - AsyncSocket socket; - AsyncSocket exchange; - } - private static Hashtable<String, HashSet<SocketExchange>> mSockets = new Hashtable<String, HashSet<SocketExchange>>(); + + private static Hashtable<String, HashSet<AsyncSocket>> mSockets = new Hashtable<String, HashSet<AsyncSocket>>(); public static void execute(final AsyncHttpRequest request, final HttpConnectCallback callback) { execute(AsyncServer.getDefault(), request, callback); @@ -72,7 +69,7 @@ public class AsyncHttpClient { final InternalConnectCallback socketConnected = new InternalConnectCallback() { @Override - public void onConnectCompleted(Exception ex, final AsyncSocket socket) { + public void onConnectCompleted(Exception ex, AsyncSocket socket) { if (ex != null) { callback.onConnectCompleted(ex, null); return; @@ -80,8 +77,6 @@ public class AsyncHttpClient { final AsyncHttpResponseImpl ret = new AsyncHttpResponseImpl(request) { boolean keepalive = false; protected void onHeadersReceived() { - super.onHeadersReceived(); - try { RawHeaders headers = getRawHeaders(); @@ -93,7 +88,7 @@ public class AsyncHttpClient { AsyncHttpRequest newReq = new AsyncHttpRequest(new URI(headers.get("Location")), request.getMethod()); execute(server, newReq, callback, redirectCount + 1); - setDataCallback(new NullDataCallback()); +// setDataCallback(new NullDataCallback()); return; } @@ -104,63 +99,73 @@ public class AsyncHttpClient { } }; - protected void onCompleted(Exception ex) { - super.onCompleted(ex); + @Override + protected void report(Exception ex) { + if (mSocket == null) + return; + super.report(ex); if (!keepalive) { - socket.close(); + mSocket.close(); } else { - HashSet<SocketExchange> sockets = mSockets.get(lookup); + HashSet<AsyncSocket> sockets = mSockets.get(lookup); if (sockets == null) { - sockets = new HashSet<SocketExchange>(); + sockets = new HashSet<AsyncSocket>(); mSockets.put(lookup, sockets); } - final HashSet<SocketExchange> ss = sockets; + final HashSet<AsyncSocket> ss = sockets; synchronized (sockets) { - SocketExchange se = new SocketExchange(); - se.socket = socket; - se.exchange = exchange; - sockets.add(se); - socket.setClosedCallback(new ClosedCallback() { + sockets.add(mSocket); + mSocket.setClosedCallback(new ClosedCallback() { @Override public void onClosed() { synchronized (ss) { - ss.remove(socket); + ss.remove(mSocket); } - socket.setClosedCallback(null); + mSocket.setClosedCallback(null); } }); } } - }; + } + + + @Override + public AsyncSocket detachSocket() { + mSocket.setWriteableCallback(null); + mSocket.setClosedCallback(null); + mSocket.setCompletedCallback(null); + mSocket.setDataCallback(null); + AsyncSocket socket = mSocket; + mSocket = null; + return socket; + } }; // socket and exchange are the same for regular http // but different for https (ssl) // the exchange will be a wrapper around socket that does // ssl translation. - if (exchange == null) { - exchange = socket; + if (!reused) { if (request.getUri().getScheme().equals("https")) { - exchange = new AsyncSSLSocket(socket, uri.getHost(), finalPort); + socket = new AsyncSSLSocket(socket, uri.getHost(), finalPort); } } - ret.setSocket(socket, exchange); + ret.setSocket(socket); } }; - HashSet<SocketExchange> sockets = mSockets.get(lookup); + HashSet<AsyncSocket> sockets = mSockets.get(lookup); if (sockets != null) { synchronized (sockets) { - for (final SocketExchange se: sockets) { - final AsyncSocket socket = se.socket; + for (final AsyncSocket socket: sockets) { if (socket.isConnected()) { - sockets.remove(se); + sockets.remove(socket); socket.setClosedCallback(null); server.post(new Runnable() { @Override public void run() { - socketConnected.exchange = se.exchange; + socketConnected.reused = true; socketConnected.onConnectCompleted(null, socket); } }); @@ -299,23 +304,23 @@ public class AsyncHttpClient { } } - public static class WebSocketCallback implements CompletedCallback { - @Override - public void onCompleted(Exception ex) { - } + public static interface WebSocketCallback { + public void onCompleted(Exception ex, WebSocket webSocket); } public static void websocket(String uri, final WebSocketCallback callback) { try { - execute(new AsyncHttpGet(uri), new HttpConnectCallback() { + final AsyncHttpGet get = new AsyncHttpGet(uri); + WebSocketImpl.addWebSocketUpgradeHeaders(get.getHeaders().getHeaders()); + execute(get, new HttpConnectCallback() { @Override public void onConnectCompleted(Exception ex, AsyncHttpResponse response) { -// response.get + callback.onCompleted(null, WebSocketImpl.finishHandshake(get.getHeaders().getHeaders(), response)); } }); } catch (URISyntaxException e) { - callback.onCompleted(e); + callback.onCompleted(e, null); } } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponse.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponse.java index 7761dcd..24b515d 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponse.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponse.java @@ -1,5 +1,6 @@ package com.koushikdutta.async.http; +import com.koushikdutta.async.AsyncSocket; import com.koushikdutta.async.CompletedEmitter; import com.koushikdutta.async.DataEmitter; import com.koushikdutta.async.DataSink; @@ -11,4 +12,5 @@ public interface AsyncHttpResponse extends DataEmitter, DataSink, CompletedEmitt public CompletedCallback getCompletedCallback(); public ResponseHeaders getHeaders(); public void end(); + public AsyncSocket detachSocket(); } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java index ed5c5ac..0372af8 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java @@ -19,16 +19,15 @@ import com.koushikdutta.async.http.filter.ChunkedOutputFilter; import com.koushikdutta.async.http.libcore.RawHeaders; import com.koushikdutta.async.http.libcore.ResponseHeaders; -public class AsyncHttpResponseImpl extends FilteredDataCallback implements AsyncHttpResponse { +abstract class AsyncHttpResponseImpl extends FilteredDataCallback implements AsyncHttpResponse { private RawHeaders mRawHeaders = new RawHeaders(); RawHeaders getRawHeaders() { return mRawHeaders; } private AsyncHttpRequestBody mWriter; - void setSocket(AsyncSocket socket, AsyncSocket exchange) { - mSocket = socket; - mExchange = exchange; + void setSocket(AsyncSocket exchange) { + mSocket = exchange; mWriter = mRequest.getBody(); if (mWriter != null) { @@ -66,17 +65,7 @@ public class AsyncHttpResponseImpl extends FilteredDataCallback implements Async } }; - protected void onHeadersReceived() { - mHeaders = new ResponseHeaders(mRequest.getUri(), mRawHeaders); - - if (mHeaders.getContentLength() == 0) { - report(null); - return; - } - - DataCallback callback = Util.getBodyDecoder(this, mRawHeaders, mReporter); - mExchange.setDataCallback(callback); - } + protected abstract void onHeadersReceived(); StringCallback mHeaderCallback = new StringCallback() { @Override @@ -89,7 +78,13 @@ public class AsyncHttpResponseImpl extends FilteredDataCallback implements Async mRawHeaders.addLine(s); } else { + mHeaders = new ResponseHeaders(mRequest.getUri(), mRawHeaders); onHeadersReceived(); + // socket may get detached after headers (websocket) + if (mSocket == null) + return; + DataCallback callback = Util.getBodyDecoder(AsyncHttpResponseImpl.this, mRawHeaders, mReporter); + mSocket.setDataCallback(callback); } } catch (Exception ex) { @@ -101,48 +96,45 @@ public class AsyncHttpResponseImpl extends FilteredDataCallback implements Async @Override protected void report(Exception e) { super.report(e); - onCompleted(e); - } - - private AsyncSocket mSocket; - private AsyncHttpRequest mRequest; - private AsyncSocket mExchange; - private ResponseHeaders mHeaders; - public AsyncHttpResponseImpl(AsyncHttpRequest request) { - mRequest = request; - } - boolean mCompleted = false; - protected void onCompleted(Exception ex) { // DISCONNECT. EVERYTHING. // should not get any data after this point... // if so, eat it and disconnect. - mExchange.setDataCallback(new NullDataCallback() { + mSocket.setDataCallback(new NullDataCallback() { @Override public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) { super.onDataAvailable(emitter, bb); mSocket.close(); } }); - mExchange.setWriteableCallback(null); + mSocket.setWriteableCallback(null); mSocket.setClosedCallback(null); mSocket.setCompletedCallback(null); mCompleted = true; // System.out.println("closing up shop"); - if (mCompletedCallback != null) - mCompletedCallback.onCompleted(ex); +// if (mCompletedCallback != null) +// mCompletedCallback.onCompleted(e); } - CompletedCallback mCompletedCallback; - @Override - public void setCompletedCallback(CompletedCallback handler) { - mCompletedCallback = handler; + private AsyncHttpRequest mRequest; + AsyncSocket mSocket; + private ResponseHeaders mHeaders; + public AsyncHttpResponseImpl(AsyncHttpRequest request) { + mRequest = request; } - @Override - public CompletedCallback getCompletedCallback() { - return mCompletedCallback; - } + boolean mCompleted = false; +// +// CompletedCallback mCompletedCallback; +// @Override +// public void setCompletedCallback(CompletedCallback handler) { +// mCompletedCallback = handler; +// } +// +// @Override +// public CompletedCallback getCompletedCallback() { +// return mCompletedCallback; +// } @Override public ResponseHeaders getHeaders() { diff --git a/AndroidAsync/src/com/koushikdutta/async/http/Util.java b/AndroidAsync/src/com/koushikdutta/async/http/Util.java index 01c8a05..89af1d2 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/Util.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/Util.java @@ -56,6 +56,11 @@ public class Util { if (-1 != contentLength) { if (contentLength < 0) { reporter.onCompleted(new Exception("not using chunked encoding, and no content-length found.")); + return callback; + } + if (contentLength == 0) { + reporter.onCompleted(null); + return callback; } // System.out.println("Content len: " + contentLength); FilteredDataCallback contentLengthWatcher = new FilteredDataCallback() { diff --git a/AndroidAsync/src/com/koushikdutta/async/http/server/WebSocketImpl.java b/AndroidAsync/src/com/koushikdutta/async/http/server/WebSocketImpl.java index af463d0..516e3da 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/server/WebSocketImpl.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/server/WebSocketImpl.java @@ -2,6 +2,7 @@ package com.koushikdutta.async.http.server; import java.nio.ByteBuffer; import java.security.MessageDigest; +import java.util.UUID; import android.util.Base64; @@ -9,6 +10,8 @@ import com.koushikdutta.async.AsyncSocket; import com.koushikdutta.async.BufferedDataSink; import com.koushikdutta.async.callback.ClosedCallback; import com.koushikdutta.async.callback.CompletedCallback; +import com.koushikdutta.async.http.AsyncHttpResponse; +import com.koushikdutta.async.http.libcore.RawHeaders; public class WebSocketImpl implements WebSocket { private static String SHA1(String text) { @@ -25,34 +28,8 @@ public class WebSocketImpl implements WebSocket { final static String MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; - private AsyncSocket mSocket; - BufferedDataSink mSink; - public WebSocketImpl(AsyncHttpServerRequest request, AsyncHttpServerResponse response) { - mSocket = request.getSocket(); - mSink = new BufferedDataSink(mSocket); - - mSocket.setClosedCallback(new ClosedCallback() { - @Override - public void onClosed() { - if (WebSocketImpl.this.mClosedCallback != null) - WebSocketImpl.this.mClosedCallback.onClosed(); - } - }); - - String key = request.getHeaders().getHeaders().get("Sec-WebSocket-Key"); - String concat = key + MAGIC; - String sha1 = SHA1(concat); - String origin = request.getHeaders().getHeaders().get("Origin"); - - response.responseCode(101); - response.getHeaders().getHeaders().set("Upgrade", "WebSocket"); - response.getHeaders().getHeaders().set("Connection", "Upgrade"); - response.getHeaders().getHeaders().set("Sec-WebSocket-Accept", sha1); -// if (origin != null) -// response.getHeaders().getHeaders().set("Access-Control-Allow-Origin", "http://" + origin); - response.writeHead(); - - mParser = new HybiParser(request) { + private void setupParser() { + mParser = new HybiParser(mSocket) { @Override protected void report(Exception ex) { if (WebSocketImpl.this.mExceptionCallback != null) @@ -76,6 +53,64 @@ public class WebSocketImpl implements WebSocket { } }; mParser.setMasking(false); + if (mSocket.isPaused()) + mSocket.resume(); + } + + private AsyncSocket mSocket; + BufferedDataSink mSink; + public WebSocketImpl(AsyncHttpServerRequest request, AsyncHttpServerResponse response) { + this(request.getSocket()); + + String key = request.getHeaders().getHeaders().get("Sec-WebSocket-Key"); + String concat = key + MAGIC; + String sha1 = SHA1(concat); + String origin = request.getHeaders().getHeaders().get("Origin"); + + response.responseCode(101); + response.getHeaders().getHeaders().set("Upgrade", "WebSocket"); + response.getHeaders().getHeaders().set("Connection", "Upgrade"); + response.getHeaders().getHeaders().set("Sec-WebSocket-Accept", sha1); +// if (origin != null) +// response.getHeaders().getHeaders().set("Access-Control-Allow-Origin", "http://" + origin); + response.writeHead(); + + setupParser(); + } + + public static void addWebSocketUpgradeHeaders(RawHeaders headers) { + final String key = UUID.randomUUID().toString(); + headers.set("Sec-WebSocket-Key", key); + headers.set("Connection", "Upgrade"); + headers.set("Upgrade", "websocket"); + } + + public WebSocketImpl(AsyncSocket socket) { + mSocket = socket; + mSink = new BufferedDataSink(mSocket); + + mSocket.setClosedCallback(new ClosedCallback() { + @Override + public void onClosed() { + if (WebSocketImpl.this.mClosedCallback != null) + WebSocketImpl.this.mClosedCallback.onClosed(); + } + }); + } + + public static WebSocket finishHandshake(RawHeaders requestHeaders, AsyncHttpResponse response) { + if (response == null) + return null; + if (response.getHeaders().getHeaders().getResponseCode() != 101) + return null; + if (!"websocket".equalsIgnoreCase(response.getHeaders().getHeaders().get("Upgrade"))) + return null; + + // TODO: verify accept hash Sec-WebSocket-Accept + + WebSocketImpl ret = new WebSocketImpl(response.detachSocket()); + ret.setupParser(); + return ret; } HybiParser mParser; diff --git a/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java b/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java index f9f1a0e..a335726 100644 --- a/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java +++ b/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java @@ -21,10 +21,6 @@ import com.koushikdutta.async.http.AsyncHttpClient; import com.koushikdutta.async.http.AsyncHttpPost; import com.koushikdutta.async.http.AsyncHttpResponse; import com.koushikdutta.async.http.UrlEncodedFormBody; -import com.koushikdutta.async.http.server.AsyncHttpServer; -import com.koushikdutta.async.http.server.WebSocket; -import com.koushikdutta.async.http.server.WebSocket.StringCallback; -import com.koushikdutta.async.http.server.WebSocketCallback; public class MainActivity extends Activity { ImageView rommanager; @@ -49,21 +45,7 @@ public class MainActivity extends Activity { tether = (ImageView)findViewById(R.id.tether); desksms = (ImageView)findViewById(R.id.desksms); chart = (ImageView)findViewById(R.id.chart); - - server.listen(4500); - server.websocket("/", new WebSocketCallback() { - @Override - public void onConnected(WebSocket webSocket) { - webSocket.setStringCallback(new StringCallback() { - @Override - public void onStringAvailable(String s) { - System.out.println("String: " + s); - } - }); - } - }); } - AsyncHttpServer server = new AsyncHttpServer(); @Override public boolean onCreateOptionsMenu(Menu menu) { |