diff options
author | Koushik Dutta <koushd@gmail.com> | 2014-04-10 15:23:45 -0700 |
---|---|---|
committer | Koushik Dutta <koushd@gmail.com> | 2014-04-10 15:23:45 -0700 |
commit | 3168f78ba2cf3497cc87df8b48a962dc3f91583d (patch) | |
tree | 6db216d7c68661d27cee91cf3f97de3007516eb5 | |
parent | 5f1b0660b64aa194a23b88568198b1648f961a9c (diff) | |
download | AndroidAsync-3168f78ba2cf3497cc87df8b48a962dc3f91583d.tar.gz AndroidAsync-3168f78ba2cf3497cc87df8b48a962dc3f91583d.tar.bz2 AndroidAsync-3168f78ba2cf3497cc87df8b48a962dc3f91583d.zip |
AsyncServer: nslookup exceptions were not being passed to ConnectCallback
AsyncSocketMiddleware: merge keep alive socket hash with connection info hash.
AsyncSocketMiddleware: only keep sockets alive for 5 minutes (tbd)
3 files changed, 128 insertions, 110 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java index 317bfcf..9a161cd 100644 --- a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java +++ b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java @@ -9,6 +9,7 @@ import com.koushikdutta.async.callback.ConnectCallback; import com.koushikdutta.async.callback.ListenCallback; import com.koushikdutta.async.future.Cancellable; import com.koushikdutta.async.future.Future; +import com.koushikdutta.async.future.FutureCallback; import com.koushikdutta.async.future.SimpleFuture; import com.koushikdutta.async.future.TransformFuture; import com.koushikdutta.async.util.StreamUtility; @@ -358,13 +359,24 @@ public class AsyncServer { if (!remote.isUnresolved()) return connectResolvedInetSocketAddress(remote, callback); - return getByName(remote.getHostName()) - .then(new TransformFuture<AsyncSocket, InetAddress>() { + final SimpleFuture<AsyncNetworkSocket> ret = new SimpleFuture<AsyncNetworkSocket>(); + + Future<InetAddress> lookup = getByName(remote.getHostName()); + ret.setParent(lookup); + lookup + .setCallback(new FutureCallback<InetAddress>() { @Override - protected void transform(InetAddress result) throws Exception { - setParent(connectResolvedInetSocketAddress(new InetSocketAddress(remote.getHostName(), remote.getPort()), callback)); + public void onCompleted(Exception e, InetAddress result) { + if (e != null) { + callback.onConnectCompleted(e, null); + ret.setComplete(e); + return; + } + + ret.setComplete(connectResolvedInetSocketAddress(new InetSocketAddress(remote.getHostName(), remote.getPort()), callback)); } }); + return ret; } public Cancellable connectSocket(final String host, final int port, final ConnectCallback callback) { @@ -787,4 +799,9 @@ public class AsyncServer { public boolean isAffinityThread() { return mAffinity == Thread.currentThread(); } + + public boolean isAffinityThreadOrStopped() { + Thread affinity = mAffinity; + return affinity == null || affinity == Thread.currentThread(); + } } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncSocketMiddleware.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncSocketMiddleware.java index 9b8d4a9..16cf1ed 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncSocketMiddleware.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncSocketMiddleware.java @@ -16,17 +16,23 @@ import com.koushikdutta.async.future.TransformFuture; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; -import java.util.HashSet; import java.util.Hashtable; public class AsyncSocketMiddleware extends SimpleMiddleware { String scheme; int port; + // 5 min idle timeout + int idleTimeoutMs = 300 * 1000; + public AsyncSocketMiddleware(AsyncHttpClient client, String scheme, int port) { mClient = client; this.scheme = scheme; this.port = port; } + + public void setIdleTimeoutMs(int idleTimeoutMs) { + this.idleTimeoutMs = idleTimeoutMs; + } public int getSchemePort(URI uri) { if (uri.getScheme() == null || !uri.getScheme().equals(scheme)) @@ -44,7 +50,6 @@ public class AsyncSocketMiddleware extends SimpleMiddleware { } AsyncHttpClient mClient; - private Hashtable<String, HashSet<AsyncSocket>> mSockets = new Hashtable<String, HashSet<AsyncSocket>>(); protected ConnectCallback wrapCallback(ConnectCallback callback, URI uri, int port) { return callback; @@ -75,47 +80,34 @@ public class AsyncSocketMiddleware extends SimpleMiddleware { proxyAddress = null; } - String computeLookup(URI uri, int port, AsyncHttpRequest request) { + String computeLookup(URI uri, int port, String proxyHost, int proxyPort) { String proxy; if (proxyHost != null) proxy = proxyHost + ":" + proxyPort; else proxy = ""; - if (request.proxyHost != null) - proxy = request.getProxyHost() + ":" + request.proxyPort; + if (proxyHost != null) + proxy = proxyHost + ":" + proxyPort; return uri.getScheme() + "//" + uri.getHost() + ":" + port + "?proxy=" + proxy; } + class IdleSocketHolder { + public IdleSocketHolder(AsyncSocket socket) { + this.socket = socket; + } + AsyncSocket socket; + long idleTime = System.currentTimeMillis(); + } + static class ConnectionInfo { int openCount; ArrayDeque<GetSocketData> queue = new ArrayDeque<GetSocketData>(); + ArrayDeque<IdleSocketHolder> sockets = new ArrayDeque<IdleSocketHolder>(); } Hashtable<String, ConnectionInfo> connectionInfo = new Hashtable<String, ConnectionInfo>(); - private static String getConnectionKey(String scheme, String host, int port) { - return scheme + "://" + host + ":" + port; - } - - public int getOpenConnectionCount(String scheme, String host, int port) { - String key = getConnectionKey(scheme, host, port); - ConnectionInfo info = connectionInfo.get(key); - if (info == null) - return 0; - return info.openCount; - } - - private ConnectionInfo getConnectionInfo(String scheme, String host, int port) { - String key = getConnectionKey(scheme, host, port); - ConnectionInfo info = connectionInfo.get(key); - if (info == null) { - info = new ConnectionInfo(); - connectionInfo.put(key, info); - } - return info; - } - int maxConnectionCount = Integer.MAX_VALUE; public int getMaxConnectionCount() { @@ -134,44 +126,38 @@ public class AsyncSocketMiddleware extends SimpleMiddleware { return null; } - ConnectionInfo info = getConnectionInfo(uri.getScheme(), uri.getHost(), port); - if (info.openCount >= maxConnectionCount) { - // wait for a connection queue to free up - SimpleCancellable queueCancel = new SimpleCancellable(); - info.queue.add(data); - return queueCancel; - } + final String lookup = computeLookup(uri, port, data.request.getProxyHost(), data.request.getProxyPort()); + ConnectionInfo info = getOrCreateConnectionInfo(lookup); + synchronized (AsyncSocketMiddleware.this) { + if (info.openCount >= maxConnectionCount) { + // wait for a connection queue to free up + SimpleCancellable queueCancel = new SimpleCancellable(); + info.queue.add(data); + return queueCancel; + } - info.openCount++; - - final String lookup = computeLookup(uri, port, data.request); - - data.state.putBoolean(getClass().getCanonicalName() + ".owned", true); - - synchronized (this) { - final HashSet<AsyncSocket> sockets = mSockets.get(lookup); - if (sockets != null) { - for (final AsyncSocket socket: sockets) { - if (socket.isOpen()) { - sockets.remove(socket); - socket.setClosedCallback(null); - - mClient.getServer().post(new Runnable() { - @Override - public void run() { - data.request.logd("Reusing keep-alive socket"); - data.connectCallback.onConnectCompleted(null, socket); - } - }); - - // replace above code with immediate callback? -// data.request.logd("Reusing keep-alive socket"); -// data.connectCallback.onConnectCompleted(null, socket); - - // just a noop/dummy, as this can't actually be cancelled. - return new SimpleCancellable(); - } + info.openCount++; + + data.state.putBoolean(getClass().getCanonicalName() + ".owned", true); + + while (!info.sockets.isEmpty()) { + IdleSocketHolder idleSocketHolder = info.sockets.pop(); + AsyncSocket socket = idleSocketHolder.socket; + if (idleSocketHolder.idleTime + idleTimeoutMs < System.currentTimeMillis()) { + socket.close(); + continue; } + if (!socket.isOpen()) + continue; + + // use this or the above? + data.request.logd("Reusing keep-alive socket"); + data.connectCallback.onConnectCompleted(null, socket); + + // just a noop/dummy, as this can't actually be cancelled. + SimpleCancellable ret = new SimpleCancellable(); + ret.setComplete(); + return ret; } } @@ -265,14 +251,29 @@ public class AsyncSocketMiddleware extends SimpleMiddleware { }); } - public int getConnectionPoolCount() { - int ret = 0; - synchronized (this) { - for (HashSet<AsyncSocket> sockets: mSockets.values()) { - ret += sockets.size(); - } + private ConnectionInfo getOrCreateConnectionInfo(String lookup) { + ConnectionInfo info = connectionInfo.get(lookup); + if (info == null) { + info = new ConnectionInfo(); + connectionInfo.put(lookup, info); + } + return info; + } + + private void maybeCleanupConnectionInfo(String lookup) { + ConnectionInfo info = connectionInfo.get(lookup); + if (info == null) + return; + while (!info.sockets.isEmpty()) { + IdleSocketHolder idleSocketHolder = info.sockets.peekLast(); + AsyncSocket socket = idleSocketHolder.socket; + if (idleSocketHolder.idleTime + idleTimeoutMs > System.currentTimeMillis()) + break; + info.sockets.pop(); + socket.close(); } - return ret; + if (info.openCount == 0 && info.queue.isEmpty() && info.sockets.isEmpty()) + connectionInfo.remove(lookup); } private void recycleSocket(final AsyncSocket socket, AsyncHttpRequest request) { @@ -280,26 +281,24 @@ public class AsyncSocketMiddleware extends SimpleMiddleware { return; URI uri = request.getUri(); int port = getSchemePort(uri); - String lookup = computeLookup(uri, port, request); - // nothing here will block... - synchronized (this) { - HashSet<AsyncSocket> sockets = mSockets.get(lookup); - if (sockets == null) { - sockets = new HashSet<AsyncSocket>(); - mSockets.put(lookup, sockets); - } - final HashSet<AsyncSocket> ss = sockets; - sockets.add(socket); - socket.setClosedCallback(new CompletedCallback() { - @Override - public void onCompleted(Exception ex) { - synchronized (AsyncSocketMiddleware.this) { - ss.remove(socket); - } + final String lookup = computeLookup(uri, port, request.getProxyHost(), request.getProxyPort()); + final ArrayDeque<IdleSocketHolder> sockets; + final IdleSocketHolder idleSocketHolder = new IdleSocketHolder(socket); + synchronized (AsyncSocketMiddleware.this) { + ConnectionInfo info = getOrCreateConnectionInfo(lookup); + sockets = info.sockets; + sockets.push(idleSocketHolder); + } + socket.setClosedCallback(new CompletedCallback() { + @Override + public void onCompleted(Exception ex) { + synchronized (AsyncSocketMiddleware.this) { + sockets.remove(idleSocketHolder); socket.setClosedCallback(null); + maybeCleanupConnectionInfo(lookup); } - }); - } + } + }); } private void idleSocket(final AsyncSocket socket) { @@ -323,23 +322,25 @@ public class AsyncSocketMiddleware extends SimpleMiddleware { }); } - private void nextConnection(URI uri) { + private void nextConnection(AsyncHttpRequest request) { + URI uri = request.getUri(); final int port = getSchemePort(uri); - String key = getConnectionKey(uri.getScheme(), uri.getHost(), port); - ConnectionInfo info = connectionInfo.get(key); - if (info == null) - return; - --info.openCount; - while (info.openCount < maxConnectionCount && info.queue.size() > 0) { - GetSocketData gsd = info.queue.remove(); - SimpleCancellable socketCancellable = (SimpleCancellable)gsd.socketCancellable; - if (socketCancellable.isCancelled()) - continue; - Cancellable connect = getSocket(gsd); - socketCancellable.setParent(connect); + String key = computeLookup(uri, port, request.getProxyHost(), request.getProxyPort()); + synchronized (AsyncSocketMiddleware.this) { + ConnectionInfo info = connectionInfo.get(key); + if (info == null) + return; + --info.openCount; + while (info.openCount < maxConnectionCount && info.queue.size() > 0) { + GetSocketData gsd = info.queue.remove(); + SimpleCancellable socketCancellable = (SimpleCancellable)gsd.socketCancellable; + if (socketCancellable.isCancelled()) + continue; + Cancellable connect = getSocket(gsd); + socketCancellable.setParent(connect); + } + maybeCleanupConnectionInfo(key); } - if (info.queue.size() == 0 && info.openCount == 0) - connectionInfo.remove(key); } @Override @@ -365,7 +366,7 @@ public class AsyncSocketMiddleware extends SimpleMiddleware { recycleSocket(data.socket, data.request); } finally { - nextConnection(data.request.getUri()); + nextConnection(data.request); } } } diff --git a/AndroidAsyncTest/src/com/koushikdutta/async/test/HttpClientTests.java b/AndroidAsyncTest/src/com/koushikdutta/async/test/HttpClientTests.java index 2b612a1..a0b778f 100644 --- a/AndroidAsyncTest/src/com/koushikdutta/async/test/HttpClientTests.java +++ b/AndroidAsyncTest/src/com/koushikdutta/async/test/HttpClientTests.java @@ -136,7 +136,7 @@ public class HttpClientTests extends TestCase { final Semaphore semaphore = new Semaphore(0); final Md5 md5 = Md5.createInstance(); AsyncHttpGet get = new AsyncHttpGet(github); -// get.setLogging("AsyncTest", Log.VERBOSE); + get.setLogging("AsyncTest", Log.VERBOSE); client.execute(get, new HttpConnectCallback() { @Override public void onConnectCompleted(Exception ex, AsyncHttpResponse response) { @@ -212,7 +212,7 @@ public class HttpClientTests extends TestCase { Future<String> future; public void testCancel() throws Exception { - future = AsyncHttpClient.getDefaultInstance().executeString(new AsyncHttpGet("http://yahoo.com"), new StringCallback() { + future = client.executeString(new AsyncHttpGet("http://yahoo.com"), new StringCallback() { @Override public void onCompleted(Exception e, AsyncHttpResponse source, String result) { fail(); |