aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKoushik Dutta <koushd@gmail.com>2014-04-10 15:23:45 -0700
committerKoushik Dutta <koushd@gmail.com>2014-04-10 15:23:45 -0700
commit3168f78ba2cf3497cc87df8b48a962dc3f91583d (patch)
tree6db216d7c68661d27cee91cf3f97de3007516eb5
parent5f1b0660b64aa194a23b88568198b1648f961a9c (diff)
downloadAndroidAsync-3168f78ba2cf3497cc87df8b48a962dc3f91583d.tar.gz
AndroidAsync-3168f78ba2cf3497cc87df8b48a962dc3f91583d.tar.bz2
AndroidAsync-3168f78ba2cf3497cc87df8b48a962dc3f91583d.zip
AsyncServer: nslookup exceptions were not being passed to ConnectCallback
AsyncSocketMiddleware: merge keep alive socket hash with connection info hash. AsyncSocketMiddleware: only keep sockets alive for 5 minutes (tbd)
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/AsyncServer.java25
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/AsyncSocketMiddleware.java209
-rw-r--r--AndroidAsyncTest/src/com/koushikdutta/async/test/HttpClientTests.java4
3 files changed, 128 insertions, 110 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java
index 317bfcf..9a161cd 100644
--- a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java
+++ b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java
@@ -9,6 +9,7 @@ import com.koushikdutta.async.callback.ConnectCallback;
import com.koushikdutta.async.callback.ListenCallback;
import com.koushikdutta.async.future.Cancellable;
import com.koushikdutta.async.future.Future;
+import com.koushikdutta.async.future.FutureCallback;
import com.koushikdutta.async.future.SimpleFuture;
import com.koushikdutta.async.future.TransformFuture;
import com.koushikdutta.async.util.StreamUtility;
@@ -358,13 +359,24 @@ public class AsyncServer {
if (!remote.isUnresolved())
return connectResolvedInetSocketAddress(remote, callback);
- return getByName(remote.getHostName())
- .then(new TransformFuture<AsyncSocket, InetAddress>() {
+ final SimpleFuture<AsyncNetworkSocket> ret = new SimpleFuture<AsyncNetworkSocket>();
+
+ Future<InetAddress> lookup = getByName(remote.getHostName());
+ ret.setParent(lookup);
+ lookup
+ .setCallback(new FutureCallback<InetAddress>() {
@Override
- protected void transform(InetAddress result) throws Exception {
- setParent(connectResolvedInetSocketAddress(new InetSocketAddress(remote.getHostName(), remote.getPort()), callback));
+ public void onCompleted(Exception e, InetAddress result) {
+ if (e != null) {
+ callback.onConnectCompleted(e, null);
+ ret.setComplete(e);
+ return;
+ }
+
+ ret.setComplete(connectResolvedInetSocketAddress(new InetSocketAddress(remote.getHostName(), remote.getPort()), callback));
}
});
+ return ret;
}
public Cancellable connectSocket(final String host, final int port, final ConnectCallback callback) {
@@ -787,4 +799,9 @@ public class AsyncServer {
public boolean isAffinityThread() {
return mAffinity == Thread.currentThread();
}
+
+ public boolean isAffinityThreadOrStopped() {
+ Thread affinity = mAffinity;
+ return affinity == null || affinity == Thread.currentThread();
+ }
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncSocketMiddleware.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncSocketMiddleware.java
index 9b8d4a9..16cf1ed 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncSocketMiddleware.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncSocketMiddleware.java
@@ -16,17 +16,23 @@ import com.koushikdutta.async.future.TransformFuture;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
-import java.util.HashSet;
import java.util.Hashtable;
public class AsyncSocketMiddleware extends SimpleMiddleware {
String scheme;
int port;
+ // 5 min idle timeout
+ int idleTimeoutMs = 300 * 1000;
+
public AsyncSocketMiddleware(AsyncHttpClient client, String scheme, int port) {
mClient = client;
this.scheme = scheme;
this.port = port;
}
+
+ public void setIdleTimeoutMs(int idleTimeoutMs) {
+ this.idleTimeoutMs = idleTimeoutMs;
+ }
public int getSchemePort(URI uri) {
if (uri.getScheme() == null || !uri.getScheme().equals(scheme))
@@ -44,7 +50,6 @@ public class AsyncSocketMiddleware extends SimpleMiddleware {
}
AsyncHttpClient mClient;
- private Hashtable<String, HashSet<AsyncSocket>> mSockets = new Hashtable<String, HashSet<AsyncSocket>>();
protected ConnectCallback wrapCallback(ConnectCallback callback, URI uri, int port) {
return callback;
@@ -75,47 +80,34 @@ public class AsyncSocketMiddleware extends SimpleMiddleware {
proxyAddress = null;
}
- String computeLookup(URI uri, int port, AsyncHttpRequest request) {
+ String computeLookup(URI uri, int port, String proxyHost, int proxyPort) {
String proxy;
if (proxyHost != null)
proxy = proxyHost + ":" + proxyPort;
else
proxy = "";
- if (request.proxyHost != null)
- proxy = request.getProxyHost() + ":" + request.proxyPort;
+ if (proxyHost != null)
+ proxy = proxyHost + ":" + proxyPort;
return uri.getScheme() + "//" + uri.getHost() + ":" + port + "?proxy=" + proxy;
}
+ class IdleSocketHolder {
+ public IdleSocketHolder(AsyncSocket socket) {
+ this.socket = socket;
+ }
+ AsyncSocket socket;
+ long idleTime = System.currentTimeMillis();
+ }
+
static class ConnectionInfo {
int openCount;
ArrayDeque<GetSocketData> queue = new ArrayDeque<GetSocketData>();
+ ArrayDeque<IdleSocketHolder> sockets = new ArrayDeque<IdleSocketHolder>();
}
Hashtable<String, ConnectionInfo> connectionInfo = new Hashtable<String, ConnectionInfo>();
- private static String getConnectionKey(String scheme, String host, int port) {
- return scheme + "://" + host + ":" + port;
- }
-
- public int getOpenConnectionCount(String scheme, String host, int port) {
- String key = getConnectionKey(scheme, host, port);
- ConnectionInfo info = connectionInfo.get(key);
- if (info == null)
- return 0;
- return info.openCount;
- }
-
- private ConnectionInfo getConnectionInfo(String scheme, String host, int port) {
- String key = getConnectionKey(scheme, host, port);
- ConnectionInfo info = connectionInfo.get(key);
- if (info == null) {
- info = new ConnectionInfo();
- connectionInfo.put(key, info);
- }
- return info;
- }
-
int maxConnectionCount = Integer.MAX_VALUE;
public int getMaxConnectionCount() {
@@ -134,44 +126,38 @@ public class AsyncSocketMiddleware extends SimpleMiddleware {
return null;
}
- ConnectionInfo info = getConnectionInfo(uri.getScheme(), uri.getHost(), port);
- if (info.openCount >= maxConnectionCount) {
- // wait for a connection queue to free up
- SimpleCancellable queueCancel = new SimpleCancellable();
- info.queue.add(data);
- return queueCancel;
- }
+ final String lookup = computeLookup(uri, port, data.request.getProxyHost(), data.request.getProxyPort());
+ ConnectionInfo info = getOrCreateConnectionInfo(lookup);
+ synchronized (AsyncSocketMiddleware.this) {
+ if (info.openCount >= maxConnectionCount) {
+ // wait for a connection queue to free up
+ SimpleCancellable queueCancel = new SimpleCancellable();
+ info.queue.add(data);
+ return queueCancel;
+ }
- info.openCount++;
-
- final String lookup = computeLookup(uri, port, data.request);
-
- data.state.putBoolean(getClass().getCanonicalName() + ".owned", true);
-
- synchronized (this) {
- final HashSet<AsyncSocket> sockets = mSockets.get(lookup);
- if (sockets != null) {
- for (final AsyncSocket socket: sockets) {
- if (socket.isOpen()) {
- sockets.remove(socket);
- socket.setClosedCallback(null);
-
- mClient.getServer().post(new Runnable() {
- @Override
- public void run() {
- data.request.logd("Reusing keep-alive socket");
- data.connectCallback.onConnectCompleted(null, socket);
- }
- });
-
- // replace above code with immediate callback?
-// data.request.logd("Reusing keep-alive socket");
-// data.connectCallback.onConnectCompleted(null, socket);
-
- // just a noop/dummy, as this can't actually be cancelled.
- return new SimpleCancellable();
- }
+ info.openCount++;
+
+ data.state.putBoolean(getClass().getCanonicalName() + ".owned", true);
+
+ while (!info.sockets.isEmpty()) {
+ IdleSocketHolder idleSocketHolder = info.sockets.pop();
+ AsyncSocket socket = idleSocketHolder.socket;
+ if (idleSocketHolder.idleTime + idleTimeoutMs < System.currentTimeMillis()) {
+ socket.close();
+ continue;
}
+ if (!socket.isOpen())
+ continue;
+
+ // use this or the above?
+ data.request.logd("Reusing keep-alive socket");
+ data.connectCallback.onConnectCompleted(null, socket);
+
+ // just a noop/dummy, as this can't actually be cancelled.
+ SimpleCancellable ret = new SimpleCancellable();
+ ret.setComplete();
+ return ret;
}
}
@@ -265,14 +251,29 @@ public class AsyncSocketMiddleware extends SimpleMiddleware {
});
}
- public int getConnectionPoolCount() {
- int ret = 0;
- synchronized (this) {
- for (HashSet<AsyncSocket> sockets: mSockets.values()) {
- ret += sockets.size();
- }
+ private ConnectionInfo getOrCreateConnectionInfo(String lookup) {
+ ConnectionInfo info = connectionInfo.get(lookup);
+ if (info == null) {
+ info = new ConnectionInfo();
+ connectionInfo.put(lookup, info);
+ }
+ return info;
+ }
+
+ private void maybeCleanupConnectionInfo(String lookup) {
+ ConnectionInfo info = connectionInfo.get(lookup);
+ if (info == null)
+ return;
+ while (!info.sockets.isEmpty()) {
+ IdleSocketHolder idleSocketHolder = info.sockets.peekLast();
+ AsyncSocket socket = idleSocketHolder.socket;
+ if (idleSocketHolder.idleTime + idleTimeoutMs > System.currentTimeMillis())
+ break;
+ info.sockets.pop();
+ socket.close();
}
- return ret;
+ if (info.openCount == 0 && info.queue.isEmpty() && info.sockets.isEmpty())
+ connectionInfo.remove(lookup);
}
private void recycleSocket(final AsyncSocket socket, AsyncHttpRequest request) {
@@ -280,26 +281,24 @@ public class AsyncSocketMiddleware extends SimpleMiddleware {
return;
URI uri = request.getUri();
int port = getSchemePort(uri);
- String lookup = computeLookup(uri, port, request);
- // nothing here will block...
- synchronized (this) {
- HashSet<AsyncSocket> sockets = mSockets.get(lookup);
- if (sockets == null) {
- sockets = new HashSet<AsyncSocket>();
- mSockets.put(lookup, sockets);
- }
- final HashSet<AsyncSocket> ss = sockets;
- sockets.add(socket);
- socket.setClosedCallback(new CompletedCallback() {
- @Override
- public void onCompleted(Exception ex) {
- synchronized (AsyncSocketMiddleware.this) {
- ss.remove(socket);
- }
+ final String lookup = computeLookup(uri, port, request.getProxyHost(), request.getProxyPort());
+ final ArrayDeque<IdleSocketHolder> sockets;
+ final IdleSocketHolder idleSocketHolder = new IdleSocketHolder(socket);
+ synchronized (AsyncSocketMiddleware.this) {
+ ConnectionInfo info = getOrCreateConnectionInfo(lookup);
+ sockets = info.sockets;
+ sockets.push(idleSocketHolder);
+ }
+ socket.setClosedCallback(new CompletedCallback() {
+ @Override
+ public void onCompleted(Exception ex) {
+ synchronized (AsyncSocketMiddleware.this) {
+ sockets.remove(idleSocketHolder);
socket.setClosedCallback(null);
+ maybeCleanupConnectionInfo(lookup);
}
- });
- }
+ }
+ });
}
private void idleSocket(final AsyncSocket socket) {
@@ -323,23 +322,25 @@ public class AsyncSocketMiddleware extends SimpleMiddleware {
});
}
- private void nextConnection(URI uri) {
+ private void nextConnection(AsyncHttpRequest request) {
+ URI uri = request.getUri();
final int port = getSchemePort(uri);
- String key = getConnectionKey(uri.getScheme(), uri.getHost(), port);
- ConnectionInfo info = connectionInfo.get(key);
- if (info == null)
- return;
- --info.openCount;
- while (info.openCount < maxConnectionCount && info.queue.size() > 0) {
- GetSocketData gsd = info.queue.remove();
- SimpleCancellable socketCancellable = (SimpleCancellable)gsd.socketCancellable;
- if (socketCancellable.isCancelled())
- continue;
- Cancellable connect = getSocket(gsd);
- socketCancellable.setParent(connect);
+ String key = computeLookup(uri, port, request.getProxyHost(), request.getProxyPort());
+ synchronized (AsyncSocketMiddleware.this) {
+ ConnectionInfo info = connectionInfo.get(key);
+ if (info == null)
+ return;
+ --info.openCount;
+ while (info.openCount < maxConnectionCount && info.queue.size() > 0) {
+ GetSocketData gsd = info.queue.remove();
+ SimpleCancellable socketCancellable = (SimpleCancellable)gsd.socketCancellable;
+ if (socketCancellable.isCancelled())
+ continue;
+ Cancellable connect = getSocket(gsd);
+ socketCancellable.setParent(connect);
+ }
+ maybeCleanupConnectionInfo(key);
}
- if (info.queue.size() == 0 && info.openCount == 0)
- connectionInfo.remove(key);
}
@Override
@@ -365,7 +366,7 @@ public class AsyncSocketMiddleware extends SimpleMiddleware {
recycleSocket(data.socket, data.request);
}
finally {
- nextConnection(data.request.getUri());
+ nextConnection(data.request);
}
}
}
diff --git a/AndroidAsyncTest/src/com/koushikdutta/async/test/HttpClientTests.java b/AndroidAsyncTest/src/com/koushikdutta/async/test/HttpClientTests.java
index 2b612a1..a0b778f 100644
--- a/AndroidAsyncTest/src/com/koushikdutta/async/test/HttpClientTests.java
+++ b/AndroidAsyncTest/src/com/koushikdutta/async/test/HttpClientTests.java
@@ -136,7 +136,7 @@ public class HttpClientTests extends TestCase {
final Semaphore semaphore = new Semaphore(0);
final Md5 md5 = Md5.createInstance();
AsyncHttpGet get = new AsyncHttpGet(github);
-// get.setLogging("AsyncTest", Log.VERBOSE);
+ get.setLogging("AsyncTest", Log.VERBOSE);
client.execute(get, new HttpConnectCallback() {
@Override
public void onConnectCompleted(Exception ex, AsyncHttpResponse response) {
@@ -212,7 +212,7 @@ public class HttpClientTests extends TestCase {
Future<String> future;
public void testCancel() throws Exception {
- future = AsyncHttpClient.getDefaultInstance().executeString(new AsyncHttpGet("http://yahoo.com"), new StringCallback() {
+ future = client.executeString(new AsyncHttpGet("http://yahoo.com"), new StringCallback() {
@Override
public void onCompleted(Exception e, AsyncHttpResponse source, String result) {
fail();