aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/FilteredDataCallback.java1
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java89
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponse.java2
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java70
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/Util.java5
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/server/WebSocketImpl.java91
-rw-r--r--AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java18
7 files changed, 149 insertions, 127 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/FilteredDataCallback.java b/AndroidAsync/src/com/koushikdutta/async/FilteredDataCallback.java
index 6ad3154..a450e84 100644
--- a/AndroidAsync/src/com/koushikdutta/async/FilteredDataCallback.java
+++ b/AndroidAsync/src/com/koushikdutta/async/FilteredDataCallback.java
@@ -43,6 +43,7 @@ public class FilteredDataCallback implements DataEmitter, DataCallback, Complete
@Override
public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) {
+ Assert.assertNull(pending);
Assert.assertNotNull(mDataCallback);
Util.emitAllData(this, bb);
if (bb.remaining() > 0)
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java
index f36a91d..5dc1cbf 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java
@@ -22,25 +22,22 @@ import com.koushikdutta.async.AsyncServer;
import com.koushikdutta.async.AsyncSocket;
import com.koushikdutta.async.ByteBufferList;
import com.koushikdutta.async.DataEmitter;
-import com.koushikdutta.async.NullDataCallback;
import com.koushikdutta.async.callback.ClosedCallback;
import com.koushikdutta.async.callback.CompletedCallback;
import com.koushikdutta.async.callback.ConnectCallback;
import com.koushikdutta.async.callback.DataCallback;
import com.koushikdutta.async.callback.RequestCallback;
import com.koushikdutta.async.http.libcore.RawHeaders;
+import com.koushikdutta.async.http.server.WebSocket;
+import com.koushikdutta.async.http.server.WebSocketImpl;
import com.koushikdutta.async.stream.OutputStreamDataCallback;
public class AsyncHttpClient {
private static abstract class InternalConnectCallback implements ConnectCallback {
- AsyncSocket exchange;
+ boolean reused = false;
}
-
- private static class SocketExchange {
- AsyncSocket socket;
- AsyncSocket exchange;
- }
- private static Hashtable<String, HashSet<SocketExchange>> mSockets = new Hashtable<String, HashSet<SocketExchange>>();
+
+ private static Hashtable<String, HashSet<AsyncSocket>> mSockets = new Hashtable<String, HashSet<AsyncSocket>>();
public static void execute(final AsyncHttpRequest request, final HttpConnectCallback callback) {
execute(AsyncServer.getDefault(), request, callback);
@@ -72,7 +69,7 @@ public class AsyncHttpClient {
final InternalConnectCallback socketConnected = new InternalConnectCallback() {
@Override
- public void onConnectCompleted(Exception ex, final AsyncSocket socket) {
+ public void onConnectCompleted(Exception ex, AsyncSocket socket) {
if (ex != null) {
callback.onConnectCompleted(ex, null);
return;
@@ -80,8 +77,6 @@ public class AsyncHttpClient {
final AsyncHttpResponseImpl ret = new AsyncHttpResponseImpl(request) {
boolean keepalive = false;
protected void onHeadersReceived() {
- super.onHeadersReceived();
-
try {
RawHeaders headers = getRawHeaders();
@@ -93,7 +88,7 @@ public class AsyncHttpClient {
AsyncHttpRequest newReq = new AsyncHttpRequest(new URI(headers.get("Location")), request.getMethod());
execute(server, newReq, callback, redirectCount + 1);
- setDataCallback(new NullDataCallback());
+// setDataCallback(new NullDataCallback());
return;
}
@@ -104,63 +99,73 @@ public class AsyncHttpClient {
}
};
- protected void onCompleted(Exception ex) {
- super.onCompleted(ex);
+ @Override
+ protected void report(Exception ex) {
+ if (mSocket == null)
+ return;
+ super.report(ex);
if (!keepalive) {
- socket.close();
+ mSocket.close();
}
else {
- HashSet<SocketExchange> sockets = mSockets.get(lookup);
+ HashSet<AsyncSocket> sockets = mSockets.get(lookup);
if (sockets == null) {
- sockets = new HashSet<SocketExchange>();
+ sockets = new HashSet<AsyncSocket>();
mSockets.put(lookup, sockets);
}
- final HashSet<SocketExchange> ss = sockets;
+ final HashSet<AsyncSocket> ss = sockets;
synchronized (sockets) {
- SocketExchange se = new SocketExchange();
- se.socket = socket;
- se.exchange = exchange;
- sockets.add(se);
- socket.setClosedCallback(new ClosedCallback() {
+ sockets.add(mSocket);
+ mSocket.setClosedCallback(new ClosedCallback() {
@Override
public void onClosed() {
synchronized (ss) {
- ss.remove(socket);
+ ss.remove(mSocket);
}
- socket.setClosedCallback(null);
+ mSocket.setClosedCallback(null);
}
});
}
}
- };
+ }
+
+
+ @Override
+ public AsyncSocket detachSocket() {
+ mSocket.setWriteableCallback(null);
+ mSocket.setClosedCallback(null);
+ mSocket.setCompletedCallback(null);
+ mSocket.setDataCallback(null);
+ AsyncSocket socket = mSocket;
+ mSocket = null;
+ return socket;
+ }
};
// socket and exchange are the same for regular http
// but different for https (ssl)
// the exchange will be a wrapper around socket that does
// ssl translation.
- if (exchange == null) {
- exchange = socket;
+ if (!reused) {
if (request.getUri().getScheme().equals("https")) {
- exchange = new AsyncSSLSocket(socket, uri.getHost(), finalPort);
+ socket = new AsyncSSLSocket(socket, uri.getHost(), finalPort);
}
}
- ret.setSocket(socket, exchange);
+ ret.setSocket(socket);
}
};
- HashSet<SocketExchange> sockets = mSockets.get(lookup);
+ HashSet<AsyncSocket> sockets = mSockets.get(lookup);
if (sockets != null) {
synchronized (sockets) {
- for (final SocketExchange se: sockets) {
- final AsyncSocket socket = se.socket;
+ for (final AsyncSocket socket: sockets) {
if (socket.isConnected()) {
- sockets.remove(se);
+ sockets.remove(socket);
socket.setClosedCallback(null);
server.post(new Runnable() {
@Override
public void run() {
- socketConnected.exchange = se.exchange;
+ socketConnected.reused = true;
socketConnected.onConnectCompleted(null, socket);
}
});
@@ -299,23 +304,23 @@ public class AsyncHttpClient {
}
}
- public static class WebSocketCallback implements CompletedCallback {
- @Override
- public void onCompleted(Exception ex) {
- }
+ public static interface WebSocketCallback {
+ public void onCompleted(Exception ex, WebSocket webSocket);
}
public static void websocket(String uri, final WebSocketCallback callback) {
try {
- execute(new AsyncHttpGet(uri), new HttpConnectCallback() {
+ final AsyncHttpGet get = new AsyncHttpGet(uri);
+ WebSocketImpl.addWebSocketUpgradeHeaders(get.getHeaders().getHeaders());
+ execute(get, new HttpConnectCallback() {
@Override
public void onConnectCompleted(Exception ex, AsyncHttpResponse response) {
-// response.get
+ callback.onCompleted(null, WebSocketImpl.finishHandshake(get.getHeaders().getHeaders(), response));
}
});
}
catch (URISyntaxException e) {
- callback.onCompleted(e);
+ callback.onCompleted(e, null);
}
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponse.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponse.java
index 7761dcd..24b515d 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponse.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponse.java
@@ -1,5 +1,6 @@
package com.koushikdutta.async.http;
+import com.koushikdutta.async.AsyncSocket;
import com.koushikdutta.async.CompletedEmitter;
import com.koushikdutta.async.DataEmitter;
import com.koushikdutta.async.DataSink;
@@ -11,4 +12,5 @@ public interface AsyncHttpResponse extends DataEmitter, DataSink, CompletedEmitt
public CompletedCallback getCompletedCallback();
public ResponseHeaders getHeaders();
public void end();
+ public AsyncSocket detachSocket();
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java
index ed5c5ac..0372af8 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java
@@ -19,16 +19,15 @@ import com.koushikdutta.async.http.filter.ChunkedOutputFilter;
import com.koushikdutta.async.http.libcore.RawHeaders;
import com.koushikdutta.async.http.libcore.ResponseHeaders;
-public class AsyncHttpResponseImpl extends FilteredDataCallback implements AsyncHttpResponse {
+abstract class AsyncHttpResponseImpl extends FilteredDataCallback implements AsyncHttpResponse {
private RawHeaders mRawHeaders = new RawHeaders();
RawHeaders getRawHeaders() {
return mRawHeaders;
}
private AsyncHttpRequestBody mWriter;
- void setSocket(AsyncSocket socket, AsyncSocket exchange) {
- mSocket = socket;
- mExchange = exchange;
+ void setSocket(AsyncSocket exchange) {
+ mSocket = exchange;
mWriter = mRequest.getBody();
if (mWriter != null) {
@@ -66,17 +65,7 @@ public class AsyncHttpResponseImpl extends FilteredDataCallback implements Async
}
};
- protected void onHeadersReceived() {
- mHeaders = new ResponseHeaders(mRequest.getUri(), mRawHeaders);
-
- if (mHeaders.getContentLength() == 0) {
- report(null);
- return;
- }
-
- DataCallback callback = Util.getBodyDecoder(this, mRawHeaders, mReporter);
- mExchange.setDataCallback(callback);
- }
+ protected abstract void onHeadersReceived();
StringCallback mHeaderCallback = new StringCallback() {
@Override
@@ -89,7 +78,13 @@ public class AsyncHttpResponseImpl extends FilteredDataCallback implements Async
mRawHeaders.addLine(s);
}
else {
+ mHeaders = new ResponseHeaders(mRequest.getUri(), mRawHeaders);
onHeadersReceived();
+ // socket may get detached after headers (websocket)
+ if (mSocket == null)
+ return;
+ DataCallback callback = Util.getBodyDecoder(AsyncHttpResponseImpl.this, mRawHeaders, mReporter);
+ mSocket.setDataCallback(callback);
}
}
catch (Exception ex) {
@@ -101,48 +96,45 @@ public class AsyncHttpResponseImpl extends FilteredDataCallback implements Async
@Override
protected void report(Exception e) {
super.report(e);
- onCompleted(e);
- }
-
- private AsyncSocket mSocket;
- private AsyncHttpRequest mRequest;
- private AsyncSocket mExchange;
- private ResponseHeaders mHeaders;
- public AsyncHttpResponseImpl(AsyncHttpRequest request) {
- mRequest = request;
- }
- boolean mCompleted = false;
- protected void onCompleted(Exception ex) {
// DISCONNECT. EVERYTHING.
// should not get any data after this point...
// if so, eat it and disconnect.
- mExchange.setDataCallback(new NullDataCallback() {
+ mSocket.setDataCallback(new NullDataCallback() {
@Override
public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) {
super.onDataAvailable(emitter, bb);
mSocket.close();
}
});
- mExchange.setWriteableCallback(null);
+ mSocket.setWriteableCallback(null);
mSocket.setClosedCallback(null);
mSocket.setCompletedCallback(null);
mCompleted = true;
// System.out.println("closing up shop");
- if (mCompletedCallback != null)
- mCompletedCallback.onCompleted(ex);
+// if (mCompletedCallback != null)
+// mCompletedCallback.onCompleted(e);
}
- CompletedCallback mCompletedCallback;
- @Override
- public void setCompletedCallback(CompletedCallback handler) {
- mCompletedCallback = handler;
+ private AsyncHttpRequest mRequest;
+ AsyncSocket mSocket;
+ private ResponseHeaders mHeaders;
+ public AsyncHttpResponseImpl(AsyncHttpRequest request) {
+ mRequest = request;
}
- @Override
- public CompletedCallback getCompletedCallback() {
- return mCompletedCallback;
- }
+ boolean mCompleted = false;
+//
+// CompletedCallback mCompletedCallback;
+// @Override
+// public void setCompletedCallback(CompletedCallback handler) {
+// mCompletedCallback = handler;
+// }
+//
+// @Override
+// public CompletedCallback getCompletedCallback() {
+// return mCompletedCallback;
+// }
@Override
public ResponseHeaders getHeaders() {
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/Util.java b/AndroidAsync/src/com/koushikdutta/async/http/Util.java
index 01c8a05..89af1d2 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/Util.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/Util.java
@@ -56,6 +56,11 @@ public class Util {
if (-1 != contentLength) {
if (contentLength < 0) {
reporter.onCompleted(new Exception("not using chunked encoding, and no content-length found."));
+ return callback;
+ }
+ if (contentLength == 0) {
+ reporter.onCompleted(null);
+ return callback;
}
// System.out.println("Content len: " + contentLength);
FilteredDataCallback contentLengthWatcher = new FilteredDataCallback() {
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/server/WebSocketImpl.java b/AndroidAsync/src/com/koushikdutta/async/http/server/WebSocketImpl.java
index af463d0..516e3da 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/server/WebSocketImpl.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/server/WebSocketImpl.java
@@ -2,6 +2,7 @@ package com.koushikdutta.async.http.server;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
+import java.util.UUID;
import android.util.Base64;
@@ -9,6 +10,8 @@ import com.koushikdutta.async.AsyncSocket;
import com.koushikdutta.async.BufferedDataSink;
import com.koushikdutta.async.callback.ClosedCallback;
import com.koushikdutta.async.callback.CompletedCallback;
+import com.koushikdutta.async.http.AsyncHttpResponse;
+import com.koushikdutta.async.http.libcore.RawHeaders;
public class WebSocketImpl implements WebSocket {
private static String SHA1(String text) {
@@ -25,34 +28,8 @@ public class WebSocketImpl implements WebSocket {
final static String MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
- private AsyncSocket mSocket;
- BufferedDataSink mSink;
- public WebSocketImpl(AsyncHttpServerRequest request, AsyncHttpServerResponse response) {
- mSocket = request.getSocket();
- mSink = new BufferedDataSink(mSocket);
-
- mSocket.setClosedCallback(new ClosedCallback() {
- @Override
- public void onClosed() {
- if (WebSocketImpl.this.mClosedCallback != null)
- WebSocketImpl.this.mClosedCallback.onClosed();
- }
- });
-
- String key = request.getHeaders().getHeaders().get("Sec-WebSocket-Key");
- String concat = key + MAGIC;
- String sha1 = SHA1(concat);
- String origin = request.getHeaders().getHeaders().get("Origin");
-
- response.responseCode(101);
- response.getHeaders().getHeaders().set("Upgrade", "WebSocket");
- response.getHeaders().getHeaders().set("Connection", "Upgrade");
- response.getHeaders().getHeaders().set("Sec-WebSocket-Accept", sha1);
-// if (origin != null)
-// response.getHeaders().getHeaders().set("Access-Control-Allow-Origin", "http://" + origin);
- response.writeHead();
-
- mParser = new HybiParser(request) {
+ private void setupParser() {
+ mParser = new HybiParser(mSocket) {
@Override
protected void report(Exception ex) {
if (WebSocketImpl.this.mExceptionCallback != null)
@@ -76,6 +53,64 @@ public class WebSocketImpl implements WebSocket {
}
};
mParser.setMasking(false);
+ if (mSocket.isPaused())
+ mSocket.resume();
+ }
+
+ private AsyncSocket mSocket;
+ BufferedDataSink mSink;
+ public WebSocketImpl(AsyncHttpServerRequest request, AsyncHttpServerResponse response) {
+ this(request.getSocket());
+
+ String key = request.getHeaders().getHeaders().get("Sec-WebSocket-Key");
+ String concat = key + MAGIC;
+ String sha1 = SHA1(concat);
+ String origin = request.getHeaders().getHeaders().get("Origin");
+
+ response.responseCode(101);
+ response.getHeaders().getHeaders().set("Upgrade", "WebSocket");
+ response.getHeaders().getHeaders().set("Connection", "Upgrade");
+ response.getHeaders().getHeaders().set("Sec-WebSocket-Accept", sha1);
+// if (origin != null)
+// response.getHeaders().getHeaders().set("Access-Control-Allow-Origin", "http://" + origin);
+ response.writeHead();
+
+ setupParser();
+ }
+
+ public static void addWebSocketUpgradeHeaders(RawHeaders headers) {
+ final String key = UUID.randomUUID().toString();
+ headers.set("Sec-WebSocket-Key", key);
+ headers.set("Connection", "Upgrade");
+ headers.set("Upgrade", "websocket");
+ }
+
+ public WebSocketImpl(AsyncSocket socket) {
+ mSocket = socket;
+ mSink = new BufferedDataSink(mSocket);
+
+ mSocket.setClosedCallback(new ClosedCallback() {
+ @Override
+ public void onClosed() {
+ if (WebSocketImpl.this.mClosedCallback != null)
+ WebSocketImpl.this.mClosedCallback.onClosed();
+ }
+ });
+ }
+
+ public static WebSocket finishHandshake(RawHeaders requestHeaders, AsyncHttpResponse response) {
+ if (response == null)
+ return null;
+ if (response.getHeaders().getHeaders().getResponseCode() != 101)
+ return null;
+ if (!"websocket".equalsIgnoreCase(response.getHeaders().getHeaders().get("Upgrade")))
+ return null;
+
+ // TODO: verify accept hash Sec-WebSocket-Accept
+
+ WebSocketImpl ret = new WebSocketImpl(response.detachSocket());
+ ret.setupParser();
+ return ret;
}
HybiParser mParser;
diff --git a/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java b/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java
index f9f1a0e..a335726 100644
--- a/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java
+++ b/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java
@@ -21,10 +21,6 @@ import com.koushikdutta.async.http.AsyncHttpClient;
import com.koushikdutta.async.http.AsyncHttpPost;
import com.koushikdutta.async.http.AsyncHttpResponse;
import com.koushikdutta.async.http.UrlEncodedFormBody;
-import com.koushikdutta.async.http.server.AsyncHttpServer;
-import com.koushikdutta.async.http.server.WebSocket;
-import com.koushikdutta.async.http.server.WebSocket.StringCallback;
-import com.koushikdutta.async.http.server.WebSocketCallback;
public class MainActivity extends Activity {
ImageView rommanager;
@@ -49,21 +45,7 @@ public class MainActivity extends Activity {
tether = (ImageView)findViewById(R.id.tether);
desksms = (ImageView)findViewById(R.id.desksms);
chart = (ImageView)findViewById(R.id.chart);
-
- server.listen(4500);
- server.websocket("/", new WebSocketCallback() {
- @Override
- public void onConnected(WebSocket webSocket) {
- webSocket.setStringCallback(new StringCallback() {
- @Override
- public void onStringAvailable(String s) {
- System.out.println("String: " + s);
- }
- });
- }
- });
}
- AsyncHttpServer server = new AsyncHttpServer();
@Override
public boolean onCreateOptionsMenu(Menu menu) {