diff options
author | Koushik Dutta <koushd@gmail.com> | 2014-11-03 14:43:14 -0800 |
---|---|---|
committer | Koushik Dutta <koushd@gmail.com> | 2014-11-03 14:43:14 -0800 |
commit | 372f5b330598d0ab6cc50f790148fb8f4c1641da (patch) | |
tree | 1946d893833b7704ab1e28545bf705b9ac0a1f3b /AndroidAsync | |
parent | 170efe3b6157d93839549413d181d5696883dbba (diff) | |
parent | 4d15ec93c0321516704cb738d3f1fa1c1d68892a (diff) | |
download | AndroidAsync-372f5b330598d0ab6cc50f790148fb8f4c1641da.tar.gz AndroidAsync-372f5b330598d0ab6cc50f790148fb8f4c1641da.tar.bz2 AndroidAsync-372f5b330598d0ab6cc50f790148fb8f4c1641da.zip |
Merge remote-tracking branch 'origin/master' into spdy
Conflicts:
AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java
AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServer.java
AndroidAsync/src/com/koushikdutta/async/http/socketio/transport/WebSocketTransport.java
AndroidAsync/test/src/com/koushikdutta/async/test/WebSocketTests.java
Diffstat (limited to 'AndroidAsync')
11 files changed, 159 insertions, 50 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java index 4503964..4c94844 100644 --- a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java +++ b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java @@ -31,7 +31,12 @@ import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; public class AsyncServer { public static final String LOGTAG = "NIO"; @@ -85,7 +90,7 @@ public class AsyncServer { catch (Throwable ex) { } } - + static AsyncServer mInstance = new AsyncServer(); public static AsyncServer getDefault() { return mInstance; @@ -114,7 +119,7 @@ public class AsyncServer { ckey.attach(handler); handler.setup(this, ckey); } - + public void removeAllCallbacks(Object scheduled) { synchronized (this) { mQueue.remove(scheduled); @@ -134,7 +139,7 @@ public class AsyncServer { } }); } - + public Object postDelayed(Runnable runnable, long delay) { Scheduled s; synchronized (this) { @@ -161,11 +166,11 @@ public class AsyncServer { } return s; } - + public Object post(Runnable runnable) { return postDelayed(runnable, 0); } - + public Object post(final CompletedCallback callback, final Exception e) { return post(new Runnable() { @Override @@ -174,7 +179,7 @@ public class AsyncServer { } }); } - + public void run(final Runnable runnable) { if (Thread.currentThread() == mAffinity) { post(runnable); @@ -263,7 +268,7 @@ public class AsyncServer { catch (Exception e) { } } - + protected void onDataReceived(int transmitted) { } @@ -335,7 +340,7 @@ public class AsyncServer { SocketChannel socket; ConnectCallback callback; } - + private ConnectFuture connectResolvedInetSocketAddress(final InetSocketAddress address, final ConnectCallback callback) { final ConnectFuture cancel = new ConnectFuture(); assert !address.isUnresolved(); @@ -396,7 +401,14 @@ public class AsyncServer { return connectSocket(InetSocketAddress.createUnresolved(host, port), callback); } - private static ExecutorService synchronousWorkers = Executors.newFixedThreadPool(4); + private static ExecutorService newSynchronousWorkers() { + ThreadFactory tf = new NamedThreadFactory("AsyncServer-worker-"); + ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 4, 10L, + TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), tf); + return tpe; + } + + private static ExecutorService synchronousWorkers = newSynchronousWorkers(); public Future<InetAddress[]> getAllByName(final String host) { final SimpleFuture<InetAddress[]> ret = new SimpleFuture<InetAddress[]>(); synchronousWorkers.execute(new Runnable() { @@ -487,7 +499,7 @@ public class AsyncServer { }); return handler; } - + public AsyncDatagramSocket connectDatagram(final SocketAddress remote) throws IOException { final DatagramChannel socket = DatagramChannel.open(); final AsyncDatagramSocket handler = new AsyncDatagramSocket(); @@ -509,7 +521,7 @@ public class AsyncServer { }); return handler; } - + final static WeakHashMap<Thread, AsyncServer> mServers = new WeakHashMap<Thread, AsyncServer>(); private boolean addMe() { @@ -527,7 +539,7 @@ public class AsyncServer { public static AsyncServer getCurrentThreadServer() { return mServers.get(Thread.currentThread()); } - + Thread mAffinity; private void run(boolean newThread) { final SelectorWrapper selector; @@ -596,10 +608,10 @@ public class AsyncServer { } return; } - + run(this, selector, queue); } - + private static void run(final AsyncServer server, final SelectorWrapper selector, final PriorityQueue<Scheduled> queue) { // Log.i(LOGTAG, "****AsyncServer is starting.****"); // at this point, this local queue and selector are owned @@ -666,11 +678,11 @@ public class AsyncServer { catch (Exception e) { } } - + private static final long QUEUE_EMPTY = Long.MAX_VALUE; private static long lockAndRunQueue(final AsyncServer server, final PriorityQueue<Scheduled> queue) { long wait = QUEUE_EMPTY; - + // find the first item we can actually run while (true) { Scheduled run = null; @@ -689,10 +701,10 @@ public class AsyncServer { } } } - + if (run == null) break; - + run.runnable.run(); } @@ -834,11 +846,11 @@ public class AsyncServer { } }); } - + public Thread getAffinity() { return mAffinity; } - + public boolean isAffinityThread() { return mAffinity == Thread.currentThread(); } @@ -847,4 +859,27 @@ public class AsyncServer { Thread affinity = mAffinity; return affinity == null || affinity == Thread.currentThread(); } + + private static class NamedThreadFactory implements ThreadFactory { + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + NamedThreadFactory(String namePrefix) { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + this.namePrefix = namePrefix; + } + + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + namePrefix + threadNumber.getAndIncrement(), 0); + if (t.isDaemon()) t.setDaemon(false); + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + } } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java index c6b77e1..e2e1404 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java @@ -297,6 +297,7 @@ public class AsyncHttpClient { request.getHeaders().set("Content-Type", request.getBody().getContentType()); } + final Exception unsupportedURI; synchronized (mMiddleware) { for (AsyncHttpClientMiddleware middleware: mMiddleware) { Cancellable socketCancellable = middleware.getSocket(data); @@ -306,8 +307,9 @@ public class AsyncHttpClient { return; } } + unsupportedURI = new IllegalArgumentException("invalid uri="+request.getUri()+" middlewares="+mMiddleware); } - reportConnectedCompleted(cancel, new IllegalArgumentException("invalid uri"), null, request, callback); + reportConnectedCompleted(cancel, unsupportedURI, null, request, callback); } private void executeSocket(final AsyncHttpRequest request, final int redirectCount, diff --git a/AndroidAsync/src/com/koushikdutta/async/http/body/UrlEncodedFormBody.java b/AndroidAsync/src/com/koushikdutta/async/http/body/UrlEncodedFormBody.java index a3fad54..8ffba53 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/body/UrlEncodedFormBody.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/body/UrlEncodedFormBody.java @@ -60,7 +60,7 @@ public class UrlEncodedFormBody implements AsyncHttpRequestBody<Multimap> { public static final String CONTENT_TYPE = "application/x-www-form-urlencoded"; @Override public String getContentType() { - return CONTENT_TYPE + "; charset=utf8"; + return CONTENT_TYPE + "; charset=utf-8"; } @Override diff --git a/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServer.java b/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServer.java index 4d37838..8343965 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServer.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServer.java @@ -300,7 +300,7 @@ public class AsyncHttpServer { } public static interface WebSocketRequestCallback { - public void onConnected(WebSocket webSocket, Headers headers); + public void onConnected(WebSocket webSocket, AsyncHttpServerRequest request); } public void websocket(String regex, final WebSocketRequestCallback callback) { @@ -333,15 +333,11 @@ public class AsyncHttpServer { response.end(); return; } - callback.onConnected(createWebSocket(request, response), request.getHeaders()); + callback.onConnected(new WebSocketImpl(request, response), request); } }); } - protected WebSocket createWebSocket(final AsyncHttpServerRequest request, final AsyncHttpServerResponse response) { - return new WebSocketImpl(request, response); - } - public void get(String regex, HttpServerRequestCallback callback) { addAction(AsyncHttpGet.METHOD, regex, callback); } @@ -351,7 +347,7 @@ public class AsyncHttpServer { } public static android.util.Pair<Integer, InputStream> getAssetStream(final Context context, String asset) { - AssetManager am = context.getAssets(); + AssetManager am = context.getResources().getAssets(); try { InputStream is = am.open(asset); return new android.util.Pair<Integer, InputStream>(is.available(), is); @@ -392,12 +388,16 @@ public class AsyncHttpServer { return null; } + private String replacePrefix(Matcher m) { + return m.group(0).substring(m.end(1)); + } + public void directory(Context context, String regex, final String assetPath) { final Context _context = context.getApplicationContext(); addAction(AsyncHttpGet.METHOD, regex, new HttpServerRequestCallback() { @Override public void onRequest(AsyncHttpServerRequest request, final AsyncHttpServerResponse response) { - String path = request.getMatcher().replaceAll(""); + String path = replacePrefix(request.getMatcher()); android.util.Pair<Integer, InputStream> pair = getAssetStream(_context, assetPath + path); if (pair == null || pair.second == null) { response.code(404); @@ -420,7 +420,7 @@ public class AsyncHttpServer { addAction(AsyncHttpHead.METHOD, regex, new HttpServerRequestCallback() { @Override public void onRequest(AsyncHttpServerRequest request, final AsyncHttpServerResponse response) { - String path = request.getMatcher().replaceAll(""); + String path = replacePrefix(request.getMatcher()); android.util.Pair<Integer, InputStream> pair = getAssetStream(_context, assetPath + path); if (pair == null || pair.second == null) { response.code(404); @@ -447,7 +447,7 @@ public class AsyncHttpServer { addAction("GET", regex, new HttpServerRequestCallback() { @Override public void onRequest(AsyncHttpServerRequest request, final AsyncHttpServerResponse response) { - String path = request.getMatcher().replaceAll(""); + String path = replacePrefix(request.getMatcher()); File file = new File(directory, path); if (file.isDirectory() && list) { diff --git a/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServerResponseImpl.java b/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServerResponseImpl.java index 2e7d2ad..1454a85 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServerResponseImpl.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServerResponseImpl.java @@ -230,13 +230,13 @@ public class AsyncHttpServerResponseImpl implements AsyncHttpServerResponse { public void send(String string) { String contentType = mRawHeaders.get("Content-Type"); if (contentType == null) - contentType = "text/html; charset=utf8"; + contentType = "text/html; charset=utf-8"; send(contentType, string); } @Override public void send(JSONObject json) { - send("application/json; charset=utf8", json.toString()); + send("application/json; charset=utf-8", json.toString()); } @Override diff --git a/AndroidAsync/src/com/koushikdutta/async/http/socketio/SocketIOConnection.java b/AndroidAsync/src/com/koushikdutta/async/http/socketio/SocketIOConnection.java index fb4da56..0407f22 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/socketio/SocketIOConnection.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/socketio/SocketIOConnection.java @@ -30,6 +30,7 @@ import java.util.Hashtable; class SocketIOConnection { AsyncHttpClient httpClient; int heartbeat; + long reconnectDelay; ArrayList<SocketIOClient> clients = new ArrayList<SocketIOClient>(); SocketIOTransport transport; SocketIORequest request; @@ -37,6 +38,7 @@ class SocketIOConnection { public SocketIOConnection(AsyncHttpClient httpClient, SocketIORequest request) { this.httpClient = httpClient; this.request = request; + this.reconnectDelay = this.request.config.reconnectDelay; } public boolean isConnected() { @@ -104,12 +106,12 @@ class SocketIOConnection { request.logi("Reconnecting socket.io"); - Cancellable connecting = httpClient.executeString(request, null) + connecting = httpClient.executeString(request, null) .then(new TransformFuture<SocketIOTransport, String>() { @Override protected void transform(String result) throws Exception { String[] parts = result.split(":"); - String session = parts[0]; + final String sessionId = parts[0]; if (!"".equals(parts[1])) heartbeat = Integer.parseInt(parts[1]) / 2 * 1000; else @@ -122,7 +124,7 @@ class SocketIOConnection { if (set.contains("websocket")) { final String sessionUrl = Uri.parse(request.getUri().toString()).buildUpon() - .appendPath("websocket").appendPath(session) + .appendPath("websocket").appendPath(sessionId) .build().toString(); httpClient.websocket(sessionUrl, null, null) @@ -133,14 +135,14 @@ class SocketIOConnection { transport.setComplete(e); return; } - transport.setComplete(new WebSocketTransport(result)); + transport.setComplete(new WebSocketTransport(result, sessionId)); } }); } else if (set.contains("xhr-polling")) { final String sessionUrl = Uri.parse(request.getUri().toString()).buildUpon() - .appendPath("xhr-polling").appendPath(session) + .appendPath("xhr-polling").appendPath(sessionId) .build().toString(); - XHRPollingTransport xhrPolling = new XHRPollingTransport(httpClient, sessionUrl); + XHRPollingTransport xhrPolling = new XHRPollingTransport(httpClient, sessionUrl, sessionId); transport.setComplete(xhrPolling); } else { throw new SocketIOException("transport not supported"); @@ -157,7 +159,7 @@ class SocketIOConnection { return; } - reconnectDelay = 1000L; + reconnectDelay = request.config.reconnectDelay; SocketIOConnection.this.transport = result; attach(); } @@ -215,11 +217,23 @@ class SocketIOConnection { public void run() { reconnect(null); } - }, reconnectDelay); - reconnectDelay *= 2; + }, nextReconnectDelay(reconnectDelay)); + + reconnectDelay = reconnectDelay * 2; + if (request.config.reconnectDelayMax > 0L) { + reconnectDelay = Math.min(reconnectDelay, request.config.reconnectDelayMax); + } + } + + private long nextReconnectDelay(long targetDelay) { + if (targetDelay < 2L || targetDelay > (Long.MAX_VALUE >> 1) || + !request.config.randomizeReconnectDelay) + { + return targetDelay; + } + return (targetDelay >> 1) + (long) (targetDelay * Math.random()); } - long reconnectDelay = 1000L; private void reportDisconnect(final Exception ex) { if (ex != null) { request.loge("socket.io disconnected", ex); diff --git a/AndroidAsync/src/com/koushikdutta/async/http/socketio/SocketIORequest.java b/AndroidAsync/src/com/koushikdutta/async/http/socketio/SocketIORequest.java index 49ff2da..c5bc873 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/socketio/SocketIORequest.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/socketio/SocketIORequest.java @@ -1,7 +1,6 @@ package com.koushikdutta.async.http.socketio; import android.net.Uri; -import android.text.TextUtils; import com.koushikdutta.async.http.AsyncHttpPost; @@ -10,6 +9,11 @@ public class SocketIORequest extends AsyncHttpPost { this(uri, ""); } + Config config; + public Config getConfig() { + return config; + } + String endpoint; public String getEndpoint() { return endpoint; @@ -25,8 +29,45 @@ public class SocketIORequest extends AsyncHttpPost { } public SocketIORequest(String uri, String endpoint, String query) { + this(uri, endpoint, query, null); + } + + public SocketIORequest(String uri, String endpoint, String query, Config config) { super(Uri.parse(uri + (query == null ? "" : "?" + query)).buildUpon().encodedPath("/socket.io/1/").build().toString()); + this.config = (config != null) ? config : new Config(); this.endpoint = endpoint; this.query = query; } + + public static class Config { + boolean randomizeReconnectDelay = false; + public void setRandomizeReconnectDelay(boolean randomizeReconnectDelay) { + this.randomizeReconnectDelay = randomizeReconnectDelay; + } + public boolean isRandomizeReconnectDelay() { + return randomizeReconnectDelay; + } + + long reconnectDelay = 1000L; + public void setReconnectDelay(long reconnectDelay) { + if (reconnectDelay < 0L) { + throw new IllegalArgumentException("reconnectDelay must be >= 0"); + } + this.reconnectDelay = reconnectDelay; + } + public long getReconnectDelay() { + return reconnectDelay; + } + + long reconnectDelayMax = 0L; + public void setReconnectDelayMax(long reconnectDelayMax) { + if (reconnectDelay < 0L) { + throw new IllegalArgumentException("reconnectDelayMax must be >= 0"); + } + this.reconnectDelayMax = reconnectDelayMax; + } + public long getReconnectDelayMax() { + return reconnectDelayMax; + } + } } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/socketio/transport/SocketIOTransport.java b/AndroidAsync/src/com/koushikdutta/async/http/socketio/transport/SocketIOTransport.java index 4b5a26d..66219d7 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/socketio/transport/SocketIOTransport.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/socketio/transport/SocketIOTransport.java @@ -35,4 +35,6 @@ public interface SocketIOTransport { * @return */ public boolean heartbeats(); + + public String getSessionId(); } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/socketio/transport/WebSocketTransport.java b/AndroidAsync/src/com/koushikdutta/async/http/socketio/transport/WebSocketTransport.java index 5514ecd..3be07e7 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/socketio/transport/WebSocketTransport.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/socketio/transport/WebSocketTransport.java @@ -8,10 +8,11 @@ import com.koushikdutta.async.http.WebSocket; public class WebSocketTransport implements SocketIOTransport { private WebSocket webSocket; private StringCallback stringCallback; + private String sessionId; - public WebSocketTransport(WebSocket webSocket) { + public WebSocketTransport(WebSocket webSocket, String sessionId) { this.webSocket = webSocket; - + this.sessionId = sessionId; this.webSocket.setDataCallback(new DataCallback.NullDataCallback()); } @@ -63,4 +64,10 @@ public class WebSocketTransport implements SocketIOTransport { public boolean heartbeats() { return true; } + + @Override + public String getSessionId() { + return this.sessionId; + } } + diff --git a/AndroidAsync/src/com/koushikdutta/async/http/socketio/transport/XHRPollingTransport.java b/AndroidAsync/src/com/koushikdutta/async/http/socketio/transport/XHRPollingTransport.java index e9a03f6..dc6aca5 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/socketio/transport/XHRPollingTransport.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/socketio/transport/XHRPollingTransport.java @@ -18,13 +18,15 @@ public class XHRPollingTransport implements SocketIOTransport { private StringCallback stringCallback; private CompletedCallback closedCallback; private boolean connected; + private String sessionId; private static final String SEPARATOR = "\ufffd"; - public XHRPollingTransport(AsyncHttpClient client, String sessionUrl) { + public XHRPollingTransport(AsyncHttpClient client, String sessionUrl, String sessionId) { this.client = client; this.sessionUrl = Uri.parse(sessionUrl); - + this.sessionId = sessionId; + doLongPolling(); connected = true; } @@ -135,4 +137,9 @@ public class XHRPollingTransport implements SocketIOTransport { public boolean heartbeats() { return false; } + + @Override + public String getSessionId() { + return this.sessionId; + } } diff --git a/AndroidAsync/test/src/com/koushikdutta/async/test/WebSocketTests.java b/AndroidAsync/test/src/com/koushikdutta/async/test/WebSocketTests.java index 03de7da..55d9a3e 100644 --- a/AndroidAsync/test/src/com/koushikdutta/async/test/WebSocketTests.java +++ b/AndroidAsync/test/src/com/koushikdutta/async/test/WebSocketTests.java @@ -9,6 +9,7 @@ import com.koushikdutta.async.http.WebSocket; import com.koushikdutta.async.http.WebSocket.StringCallback; import com.koushikdutta.async.http.server.AsyncHttpServer; import com.koushikdutta.async.http.server.AsyncHttpServer.WebSocketRequestCallback; +import com.koushikdutta.async.http.server.AsyncHttpServerRequest; import junit.framework.TestCase; @@ -34,7 +35,7 @@ public class WebSocketTests extends TestCase { httpServer.websocket("/ws", new WebSocketRequestCallback() { @Override - public void onConnected(final WebSocket webSocket, Headers headers) { + public void onConnected(final WebSocket webSocket, AsyncHttpServerRequest request) { webSocket.setStringCallback(new StringCallback() { @Override public void onStringAvailable(String s) { |