aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKoushik Dutta <koushd@gmail.com>2013-03-19 21:44:54 -0700
committerKoushik Dutta <koushd@gmail.com>2013-03-19 21:44:54 -0700
commitd8fa4bba0c2e8962cb348ce459c94763b88ee651 (patch)
tree46c983d144e7160bb7f6224edb12af0288d860fa
parent3638a69e3640bd2dbd7e004ba51840ddd01bca57 (diff)
downloadAndroidAsync-d8fa4bba0c2e8962cb348ce459c94763b88ee651.tar.gz
AndroidAsync-d8fa4bba0c2e8962cb348ce459c94763b88ee651.tar.bz2
AndroidAsync-d8fa4bba0c2e8962cb348ce459c94763b88ee651.zip
wip
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/AsyncServer.java33
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java22
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/FilteredDataEmitter.java71
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/WrapperSocket.java104
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java54
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClientMiddleware.java8
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java14
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServerResponseImpl.java2
-rw-r--r--AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java6
9 files changed, 260 insertions, 54 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java
index 43c2dd4..b198a88 100644
--- a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java
+++ b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java
@@ -123,7 +123,9 @@ public class AsyncServer {
}
}
- private static void runQueue(LinkedList<Scheduled> queue) {
+ private static final long DEFAULT_WAIT = 100;
+ private static long runQueue(LinkedList<Scheduled> queue) {
+ long wait = DEFAULT_WAIT;
long now = System.currentTimeMillis();
LinkedList<Scheduled> later = null;
while (queue.size() > 0) {
@@ -131,6 +133,7 @@ public class AsyncServer {
if (s.time < now)
s.runnable.run();
else {
+ wait = Math.min(wait, s.time - now);
if (later == null)
later = new LinkedList<AsyncServer.Scheduled>();
later.add(s);
@@ -138,6 +141,8 @@ public class AsyncServer {
}
if (later != null)
queue.addAll(later);
+
+ return wait;
}
private static class Scheduled {
@@ -235,15 +240,17 @@ public class AsyncServer {
return new SimpleCancelable() {
@Override
public Cancelable cancel() {
- synchronized (this) {
- super.cancel();
- try {
- sc.close();
- }
- catch (IOException e) {
+ run(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ sc.close();
+ }
+ catch (IOException e) {
+ }
}
- return this;
- }
+ });
+ return this;
}
};
}
@@ -410,13 +417,13 @@ public class AsyncServer {
}
}
- private static void lockAndRunQueue(AsyncServer server, LinkedList<Scheduled> queue) {
+ private static long lockAndRunQueue(AsyncServer server, LinkedList<Scheduled> queue) {
LinkedList<Scheduled> copy;
synchronized (server) {
copy = new LinkedList<Scheduled>(queue);
queue.clear();
}
- runQueue(copy);
+ return runQueue(copy);
}
private static void runLoop(AsyncServer server, Selector selector, LinkedList<Scheduled> queue, boolean keepRunning) throws IOException {
@@ -424,7 +431,7 @@ public class AsyncServer {
boolean needsSelect = true;
// run the queue to populate the selector with keys
- lockAndRunQueue(server, queue);
+ long wait = lockAndRunQueue(server, queue);
synchronized (server) {
// select now to see if anything is ready immediately. this
// also clears the canceled key queue.
@@ -444,7 +451,7 @@ public class AsyncServer {
if (needsSelect) {
// nothing to select immediately but there so let's block and wait.
- selector.select(100);
+ selector.select(wait);
}
// process whatever keys are ready
diff --git a/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java b/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java
index fa501f7..4c7f8e0 100644
--- a/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java
+++ b/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java
@@ -8,11 +8,27 @@ public class BufferedDataEmitter implements DataEmitter, DataCallback {
public BufferedDataEmitter(DataEmitter emitter) {
mEmitter = emitter;
mEmitter.setDataCallback(this);
+
+ mEmitter.setEndCallback(new CompletedCallback() {
+ @Override
+ public void onCompleted(Exception ex) {
+ mEnded = true;
+ mEndException = ex;
+ if (mBuffers.remaining() == 0 && mEndCallback != null)
+ mEndCallback.onCompleted(ex);
+ }
+ });
}
+ boolean mEnded = false;
+ Exception mEndException;
+
public void onDataAvailable() {
if (mDataCallback != null && !mPaused && mBuffers.remaining() > 0)
mDataCallback.onDataAvailable(this, mBuffers);
+
+ if (mEnded && mBuffers.remaining() == 0)
+ mEndCallback.onCompleted(mEndException);
}
ByteBufferList mBuffers = new ByteBufferList();
@@ -60,13 +76,15 @@ public class BufferedDataEmitter implements DataEmitter, DataCallback {
return mPaused;
}
+
+ CompletedCallback mEndCallback;
@Override
public void setEndCallback(CompletedCallback callback) {
- mEmitter.setEndCallback(callback);
+ mEndCallback = callback;
}
@Override
public CompletedCallback getEndCallback() {
- return mEmitter.getEndCallback();
+ return mEndCallback;
}
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/FilteredDataEmitter.java b/AndroidAsync/src/com/koushikdutta/async/FilteredDataEmitter.java
new file mode 100644
index 0000000..c4409c6
--- /dev/null
+++ b/AndroidAsync/src/com/koushikdutta/async/FilteredDataEmitter.java
@@ -0,0 +1,71 @@
+package com.koushikdutta.async;
+
+import com.koushikdutta.async.callback.CompletedCallback;
+import com.koushikdutta.async.callback.DataCallback;
+
+public class FilteredDataEmitter implements DataEmitter {
+ DataEmitter mEmitter;
+ public DataEmitter getDataEmitter() {
+ return mEmitter;
+ }
+
+ public void setDataEmitter(DataEmitter emitter) {
+ if (mEmitter != null) {
+ mEmitter.setDataCallback(null);
+ }
+ mEmitter = emitter;
+ mEmitter.setDataCallback(new DataCallback() {
+ @Override
+ public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) {
+ FilteredDataEmitter.this.onDataAvailable(emitter, bb);
+ }
+ });
+ }
+
+ protected void onDataAvailable(DataEmitter emitter, ByteBufferList bb) {
+ Util.emitAllData(this, bb);
+ // if there's data after the emitting, and it is paused... the underlying implementation
+ // is obligated to cache the byte buffer list.
+ }
+
+ DataCallback mDataCallback;
+ @Override
+ public void setDataCallback(DataCallback callback) {
+ mDataCallback = callback;
+ }
+
+ @Override
+ public DataCallback getDataCallback() {
+ return mDataCallback;
+ }
+
+ @Override
+ public boolean isChunked() {
+ return mEmitter.isChunked();
+ }
+
+ @Override
+ public void pause() {
+ mEmitter.pause();
+ }
+
+ @Override
+ public void resume() {
+ mEmitter.resume();
+ }
+
+ @Override
+ public boolean isPaused() {
+ return mEmitter.isPaused();
+ }
+
+ @Override
+ public void setEndCallback(CompletedCallback callback) {
+ mEmitter.setEndCallback(callback);
+ }
+
+ @Override
+ public CompletedCallback getEndCallback() {
+ return mEmitter.getEndCallback();
+ }
+}
diff --git a/AndroidAsync/src/com/koushikdutta/async/WrapperSocket.java b/AndroidAsync/src/com/koushikdutta/async/WrapperSocket.java
new file mode 100644
index 0000000..41afe36
--- /dev/null
+++ b/AndroidAsync/src/com/koushikdutta/async/WrapperSocket.java
@@ -0,0 +1,104 @@
+package com.koushikdutta.async;
+
+import java.nio.ByteBuffer;
+
+import com.koushikdutta.async.callback.CompletedCallback;
+import com.koushikdutta.async.callback.DataCallback;
+import com.koushikdutta.async.callback.WritableCallback;
+
+public class WrapperSocket implements AsyncSocket {
+ private AsyncSocket mSocket;
+ public void setSocket(AsyncSocket socket) {
+ mSocket = socket;
+ }
+
+ public AsyncSocket getSocket() {
+ return mSocket;
+ }
+
+ @Override
+ public void setDataCallback(DataCallback callback) {
+ mSocket.setDataCallback(callback);
+ }
+
+ @Override
+ public DataCallback getDataCallback() {
+ return mSocket.getDataCallback();
+ }
+
+ @Override
+ public boolean isChunked() {
+ return mSocket.isChunked();
+ }
+
+ @Override
+ public void pause() {
+ mSocket.pause();
+ }
+
+ @Override
+ public void resume() {
+ mSocket.resume();
+ }
+
+ @Override
+ public boolean isPaused() {
+ return mSocket.isPaused();
+ }
+
+ @Override
+ public void setEndCallback(CompletedCallback callback) {
+ mSocket.setEndCallback(callback);
+ }
+
+ @Override
+ public CompletedCallback getEndCallback() {
+ return mSocket.getEndCallback();
+ }
+
+ @Override
+ public void write(ByteBuffer bb) {
+ mSocket.write(bb);
+ }
+
+ @Override
+ public void write(ByteBufferList bb) {
+ mSocket.write(bb);
+ }
+
+ @Override
+ public void setWriteableCallback(WritableCallback handler) {
+ mSocket.setWriteableCallback(handler);
+ }
+
+ @Override
+ public WritableCallback getWriteableCallback() {
+ return getWriteableCallback();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return mSocket.isOpen();
+ }
+
+ @Override
+ public void close() {
+ mSocket.close();
+ }
+
+ @Override
+ public void setClosedCallback(CompletedCallback handler) {
+ mSocket.setClosedCallback(handler);
+ }
+
+ @Override
+ public CompletedCallback getClosedCallback() {
+ return mSocket.getClosedCallback();
+ }
+
+ @Override
+ public AsyncServer getServer() {
+ return mSocket.getServer();
+ }
+
+}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java
index ca654dd..5a9b6a7 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java
@@ -9,6 +9,7 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Hashtable;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
import org.json.JSONException;
@@ -54,12 +55,33 @@ public class AsyncHttpClient {
}
public Cancelable execute(final AsyncHttpRequest request, final HttpConnectCallback callback) {
- CancelableImpl ret = new CancelableImpl();
+ CancelableImpl ret;
execute(request, callback, 0, ret = new CancelableImpl());
return ret;
}
+ private static final String LOGTAG = "AsyncHttp";
private static class CancelableImpl extends SimpleCancelable {
+ public HttpConnectCallback callback;
+ public Cancelable socketCancelable;
+ public AsyncSocket socket;
+
+ @Override
+ public Cancelable cancel() {
+ super.cancel();
+
+ if (socketCancelable != null) {
+ socketCancelable.cancel();
+ }
+
+ if (socket != null)
+ socket.close();
+
+ // call this?
+ callback.onConnectCompleted(new CancellationException(), null);
+
+ return this;
+ }
}
private void reportConnectedCompleted(CancelableImpl cancel, Exception ex, AsyncHttpResponseImpl response, final HttpConnectCallback callback) {
@@ -88,7 +110,6 @@ public class AsyncHttpClient {
final int finalPort = port;
final InternalConnectCallback socketConnected = new InternalConnectCallback() {
- AsyncSocket cancelSocket;
Object scheduled;
{
if (request.getTimeout() > 0) {
@@ -96,8 +117,6 @@ public class AsyncHttpClient {
@Override
public void run() {
cancel.cancel();
- if (cancelSocket != null)
- cancelSocket.close();
reportConnectedCompleted(cancel, new TimeoutException(), null, callback);
}
}, request.getTimeout());
@@ -114,7 +133,7 @@ public class AsyncHttpClient {
socket.close();
return;
}
- cancelSocket = socket;
+ cancel.socket = socket;
if (ex != null) {
reportConnectedCompleted(cancel, ex, null, callback);
return;
@@ -233,7 +252,7 @@ public class AsyncHttpClient {
HashSet<AsyncSocket> sockets = mSockets.get(lookup);
if (sockets != null) {
-// synchronized (sockets) {
+ synchronized (sockets) {
for (final AsyncSocket socket: sockets) {
if (socket.isOpen()) {
sockets.remove(socket);
@@ -241,7 +260,7 @@ public class AsyncHttpClient {
mServer.post(new Runnable() {
@Override
public void run() {
- Log.i("Async", "Reusing socket.");
+// Log.i(LOGTAG, "Reusing socket.");
socketConnected.reused = true;
socketConnected.onConnectCompleted(null, socket);
}
@@ -249,9 +268,9 @@ public class AsyncHttpClient {
return;
}
}
-// }
+ }
}
- mServer.connectSocket(uri.getHost(), port, socketConnected);
+ cancel.socketCancelable = mServer.connectSocket(uri.getHost(), port, socketConnected);
}
public Cancelable execute(URI uri, final HttpConnectCallback callback) {
@@ -373,7 +392,7 @@ public class AsyncHttpClient {
public Cancelable execute(AsyncHttpRequest req, final String filename, final FileCallback callback) {
final Handler handler = Looper.myLooper() == null ? null : new Handler();
final File file = new File(filename);
- final CancelableRequest cancel = new CancelableRequest() {
+ final CancelableImpl cancel = new CancelableImpl() {
@Override
public Cancelable cancel() {
Cancelable ret = super.cancel();
@@ -404,7 +423,6 @@ public class AsyncHttpClient {
invoke(handler, callback, AsyncServer.getDefault(), response, ex, null);
return;
}
- cancel.response = response;
final int contentLength = response.getHeaders().getContentLength();
@@ -439,20 +457,9 @@ public class AsyncHttpClient {
return cancel;
}
- private static class CancelableRequest extends CancelableImpl {
- AsyncHttpResponse response;
- @Override
- public Cancelable cancel() {
- Cancelable ret = super.cancel();
- if (response != null)
- response.close();
- return ret;
- }
- }
-
private Cancelable execute(AsyncHttpRequest req, final RequestCallback callback, final ResultConvert convert) {
final Handler handler = Looper.myLooper() == null ? null : new Handler();
- final CancelableRequest cancel = new CancelableRequest();
+ final CancelableImpl cancel = new CancelableImpl();
execute(req, new HttpConnectCallback() {
int mDownloaded = 0;
ByteBufferList buffer = new ByteBufferList();
@@ -462,7 +469,6 @@ public class AsyncHttpClient {
invoke(handler, callback, AsyncServer.getDefault(), response, ex, null);
return;
}
- cancel.response = response;
final int contentLength = response.getHeaders().getContentLength();
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClientMiddleware.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClientMiddleware.java
new file mode 100644
index 0000000..59756f8
--- /dev/null
+++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClientMiddleware.java
@@ -0,0 +1,8 @@
+package com.koushikdutta.async.http;
+
+import com.koushikdutta.async.AsyncSocket;
+
+public interface AsyncHttpClientMiddleware {
+ public AsyncSocket getSocket(final AsyncHttpRequest request, final HttpConnectCallback callback);
+ public AsyncSocket onSocket(AsyncSocket socket);
+}
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java
index c5de2e8..1719f42 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java
@@ -130,9 +130,6 @@ abstract class AsyncHttpResponseImpl extends FilteredDataCallback implements Asy
mSocket.setClosedCallback(null);
mSocket.setEndCallback(null);
mCompleted = true;
-// System.out.println("closing up shop");
-// if (mCompletedCallback != null)
-// mCompletedCallback.onCompleted(e);
}
private AsyncHttpRequest mRequest;
@@ -143,17 +140,6 @@ abstract class AsyncHttpResponseImpl extends FilteredDataCallback implements Asy
}
boolean mCompleted = false;
-//
-// CompletedCallback mCompletedCallback;
-// @Override
-// public void setCompletedCallback(CompletedCallback handler) {
-// mCompletedCallback = handler;
-// }
-//
-// @Override
-// public CompletedCallback getCompletedCallback() {
-// return mCompletedCallback;
-// }
@Override
public ResponseHeaders getHeaders() {
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServerResponseImpl.java b/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServerResponseImpl.java
index 0d31260..d822292 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServerResponseImpl.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServerResponseImpl.java
@@ -59,7 +59,7 @@ public class AsyncHttpServerResponseImpl implements AsyncHttpServerResponse {
}
boolean mHasWritten = false;
- FilteredDataSink mChunker;
+ ChunkedOutputFilter mChunker;
void initFirstWrite() {
if (mHasWritten)
return;
diff --git a/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java b/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java
index d7c30eb..a15ce92 100644
--- a/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java
+++ b/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java
@@ -56,6 +56,12 @@ public class MainActivity extends Activity {
private void getFile(final ImageView iv, String url, final String filename) {
AsyncHttpClient.getDefaultInstance().get(url, filename, new AsyncHttpClient.FileCallback() {
@Override
+ public void onProgress(AsyncHttpResponse response, int downloaded, int total) {
+ response.pause();
+ super.onProgress(response, downloaded, total);
+ }
+
+ @Override
public void onCompleted(Exception e, AsyncHttpResponse response, File result) {
if (e != null) {
e.printStackTrace();