diff options
author | Koushik Dutta <koushd@gmail.com> | 2013-03-19 21:44:54 -0700 |
---|---|---|
committer | Koushik Dutta <koushd@gmail.com> | 2013-03-19 21:44:54 -0700 |
commit | d8fa4bba0c2e8962cb348ce459c94763b88ee651 (patch) | |
tree | 46c983d144e7160bb7f6224edb12af0288d860fa | |
parent | 3638a69e3640bd2dbd7e004ba51840ddd01bca57 (diff) | |
download | AndroidAsync-d8fa4bba0c2e8962cb348ce459c94763b88ee651.tar.gz AndroidAsync-d8fa4bba0c2e8962cb348ce459c94763b88ee651.tar.bz2 AndroidAsync-d8fa4bba0c2e8962cb348ce459c94763b88ee651.zip |
wip
9 files changed, 260 insertions, 54 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java index 43c2dd4..b198a88 100644 --- a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java +++ b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java @@ -123,7 +123,9 @@ public class AsyncServer { } } - private static void runQueue(LinkedList<Scheduled> queue) { + private static final long DEFAULT_WAIT = 100; + private static long runQueue(LinkedList<Scheduled> queue) { + long wait = DEFAULT_WAIT; long now = System.currentTimeMillis(); LinkedList<Scheduled> later = null; while (queue.size() > 0) { @@ -131,6 +133,7 @@ public class AsyncServer { if (s.time < now) s.runnable.run(); else { + wait = Math.min(wait, s.time - now); if (later == null) later = new LinkedList<AsyncServer.Scheduled>(); later.add(s); @@ -138,6 +141,8 @@ public class AsyncServer { } if (later != null) queue.addAll(later); + + return wait; } private static class Scheduled { @@ -235,15 +240,17 @@ public class AsyncServer { return new SimpleCancelable() { @Override public Cancelable cancel() { - synchronized (this) { - super.cancel(); - try { - sc.close(); - } - catch (IOException e) { + run(new Runnable() { + @Override + public void run() { + try { + sc.close(); + } + catch (IOException e) { + } } - return this; - } + }); + return this; } }; } @@ -410,13 +417,13 @@ public class AsyncServer { } } - private static void lockAndRunQueue(AsyncServer server, LinkedList<Scheduled> queue) { + private static long lockAndRunQueue(AsyncServer server, LinkedList<Scheduled> queue) { LinkedList<Scheduled> copy; synchronized (server) { copy = new LinkedList<Scheduled>(queue); queue.clear(); } - runQueue(copy); + return runQueue(copy); } private static void runLoop(AsyncServer server, Selector selector, LinkedList<Scheduled> queue, boolean keepRunning) throws IOException { @@ -424,7 +431,7 @@ public class AsyncServer { boolean needsSelect = true; // run the queue to populate the selector with keys - lockAndRunQueue(server, queue); + long wait = lockAndRunQueue(server, queue); synchronized (server) { // select now to see if anything is ready immediately. this // also clears the canceled key queue. @@ -444,7 +451,7 @@ public class AsyncServer { if (needsSelect) { // nothing to select immediately but there so let's block and wait. - selector.select(100); + selector.select(wait); } // process whatever keys are ready diff --git a/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java b/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java index fa501f7..4c7f8e0 100644 --- a/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java +++ b/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java @@ -8,11 +8,27 @@ public class BufferedDataEmitter implements DataEmitter, DataCallback { public BufferedDataEmitter(DataEmitter emitter) { mEmitter = emitter; mEmitter.setDataCallback(this); + + mEmitter.setEndCallback(new CompletedCallback() { + @Override + public void onCompleted(Exception ex) { + mEnded = true; + mEndException = ex; + if (mBuffers.remaining() == 0 && mEndCallback != null) + mEndCallback.onCompleted(ex); + } + }); } + boolean mEnded = false; + Exception mEndException; + public void onDataAvailable() { if (mDataCallback != null && !mPaused && mBuffers.remaining() > 0) mDataCallback.onDataAvailable(this, mBuffers); + + if (mEnded && mBuffers.remaining() == 0) + mEndCallback.onCompleted(mEndException); } ByteBufferList mBuffers = new ByteBufferList(); @@ -60,13 +76,15 @@ public class BufferedDataEmitter implements DataEmitter, DataCallback { return mPaused; } + + CompletedCallback mEndCallback; @Override public void setEndCallback(CompletedCallback callback) { - mEmitter.setEndCallback(callback); + mEndCallback = callback; } @Override public CompletedCallback getEndCallback() { - return mEmitter.getEndCallback(); + return mEndCallback; } } diff --git a/AndroidAsync/src/com/koushikdutta/async/FilteredDataEmitter.java b/AndroidAsync/src/com/koushikdutta/async/FilteredDataEmitter.java new file mode 100644 index 0000000..c4409c6 --- /dev/null +++ b/AndroidAsync/src/com/koushikdutta/async/FilteredDataEmitter.java @@ -0,0 +1,71 @@ +package com.koushikdutta.async; + +import com.koushikdutta.async.callback.CompletedCallback; +import com.koushikdutta.async.callback.DataCallback; + +public class FilteredDataEmitter implements DataEmitter { + DataEmitter mEmitter; + public DataEmitter getDataEmitter() { + return mEmitter; + } + + public void setDataEmitter(DataEmitter emitter) { + if (mEmitter != null) { + mEmitter.setDataCallback(null); + } + mEmitter = emitter; + mEmitter.setDataCallback(new DataCallback() { + @Override + public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) { + FilteredDataEmitter.this.onDataAvailable(emitter, bb); + } + }); + } + + protected void onDataAvailable(DataEmitter emitter, ByteBufferList bb) { + Util.emitAllData(this, bb); + // if there's data after the emitting, and it is paused... the underlying implementation + // is obligated to cache the byte buffer list. + } + + DataCallback mDataCallback; + @Override + public void setDataCallback(DataCallback callback) { + mDataCallback = callback; + } + + @Override + public DataCallback getDataCallback() { + return mDataCallback; + } + + @Override + public boolean isChunked() { + return mEmitter.isChunked(); + } + + @Override + public void pause() { + mEmitter.pause(); + } + + @Override + public void resume() { + mEmitter.resume(); + } + + @Override + public boolean isPaused() { + return mEmitter.isPaused(); + } + + @Override + public void setEndCallback(CompletedCallback callback) { + mEmitter.setEndCallback(callback); + } + + @Override + public CompletedCallback getEndCallback() { + return mEmitter.getEndCallback(); + } +} diff --git a/AndroidAsync/src/com/koushikdutta/async/WrapperSocket.java b/AndroidAsync/src/com/koushikdutta/async/WrapperSocket.java new file mode 100644 index 0000000..41afe36 --- /dev/null +++ b/AndroidAsync/src/com/koushikdutta/async/WrapperSocket.java @@ -0,0 +1,104 @@ +package com.koushikdutta.async; + +import java.nio.ByteBuffer; + +import com.koushikdutta.async.callback.CompletedCallback; +import com.koushikdutta.async.callback.DataCallback; +import com.koushikdutta.async.callback.WritableCallback; + +public class WrapperSocket implements AsyncSocket { + private AsyncSocket mSocket; + public void setSocket(AsyncSocket socket) { + mSocket = socket; + } + + public AsyncSocket getSocket() { + return mSocket; + } + + @Override + public void setDataCallback(DataCallback callback) { + mSocket.setDataCallback(callback); + } + + @Override + public DataCallback getDataCallback() { + return mSocket.getDataCallback(); + } + + @Override + public boolean isChunked() { + return mSocket.isChunked(); + } + + @Override + public void pause() { + mSocket.pause(); + } + + @Override + public void resume() { + mSocket.resume(); + } + + @Override + public boolean isPaused() { + return mSocket.isPaused(); + } + + @Override + public void setEndCallback(CompletedCallback callback) { + mSocket.setEndCallback(callback); + } + + @Override + public CompletedCallback getEndCallback() { + return mSocket.getEndCallback(); + } + + @Override + public void write(ByteBuffer bb) { + mSocket.write(bb); + } + + @Override + public void write(ByteBufferList bb) { + mSocket.write(bb); + } + + @Override + public void setWriteableCallback(WritableCallback handler) { + mSocket.setWriteableCallback(handler); + } + + @Override + public WritableCallback getWriteableCallback() { + return getWriteableCallback(); + } + + @Override + public boolean isOpen() { + return mSocket.isOpen(); + } + + @Override + public void close() { + mSocket.close(); + } + + @Override + public void setClosedCallback(CompletedCallback handler) { + mSocket.setClosedCallback(handler); + } + + @Override + public CompletedCallback getClosedCallback() { + return mSocket.getClosedCallback(); + } + + @Override + public AsyncServer getServer() { + return mSocket.getServer(); + } + +} diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java index ca654dd..5a9b6a7 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java @@ -9,6 +9,7 @@ import java.net.URI; import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Hashtable; +import java.util.concurrent.CancellationException; import java.util.concurrent.TimeoutException; import org.json.JSONException; @@ -54,12 +55,33 @@ public class AsyncHttpClient { } public Cancelable execute(final AsyncHttpRequest request, final HttpConnectCallback callback) { - CancelableImpl ret = new CancelableImpl(); + CancelableImpl ret; execute(request, callback, 0, ret = new CancelableImpl()); return ret; } + private static final String LOGTAG = "AsyncHttp"; private static class CancelableImpl extends SimpleCancelable { + public HttpConnectCallback callback; + public Cancelable socketCancelable; + public AsyncSocket socket; + + @Override + public Cancelable cancel() { + super.cancel(); + + if (socketCancelable != null) { + socketCancelable.cancel(); + } + + if (socket != null) + socket.close(); + + // call this? + callback.onConnectCompleted(new CancellationException(), null); + + return this; + } } private void reportConnectedCompleted(CancelableImpl cancel, Exception ex, AsyncHttpResponseImpl response, final HttpConnectCallback callback) { @@ -88,7 +110,6 @@ public class AsyncHttpClient { final int finalPort = port; final InternalConnectCallback socketConnected = new InternalConnectCallback() { - AsyncSocket cancelSocket; Object scheduled; { if (request.getTimeout() > 0) { @@ -96,8 +117,6 @@ public class AsyncHttpClient { @Override public void run() { cancel.cancel(); - if (cancelSocket != null) - cancelSocket.close(); reportConnectedCompleted(cancel, new TimeoutException(), null, callback); } }, request.getTimeout()); @@ -114,7 +133,7 @@ public class AsyncHttpClient { socket.close(); return; } - cancelSocket = socket; + cancel.socket = socket; if (ex != null) { reportConnectedCompleted(cancel, ex, null, callback); return; @@ -233,7 +252,7 @@ public class AsyncHttpClient { HashSet<AsyncSocket> sockets = mSockets.get(lookup); if (sockets != null) { -// synchronized (sockets) { + synchronized (sockets) { for (final AsyncSocket socket: sockets) { if (socket.isOpen()) { sockets.remove(socket); @@ -241,7 +260,7 @@ public class AsyncHttpClient { mServer.post(new Runnable() { @Override public void run() { - Log.i("Async", "Reusing socket."); +// Log.i(LOGTAG, "Reusing socket."); socketConnected.reused = true; socketConnected.onConnectCompleted(null, socket); } @@ -249,9 +268,9 @@ public class AsyncHttpClient { return; } } -// } + } } - mServer.connectSocket(uri.getHost(), port, socketConnected); + cancel.socketCancelable = mServer.connectSocket(uri.getHost(), port, socketConnected); } public Cancelable execute(URI uri, final HttpConnectCallback callback) { @@ -373,7 +392,7 @@ public class AsyncHttpClient { public Cancelable execute(AsyncHttpRequest req, final String filename, final FileCallback callback) { final Handler handler = Looper.myLooper() == null ? null : new Handler(); final File file = new File(filename); - final CancelableRequest cancel = new CancelableRequest() { + final CancelableImpl cancel = new CancelableImpl() { @Override public Cancelable cancel() { Cancelable ret = super.cancel(); @@ -404,7 +423,6 @@ public class AsyncHttpClient { invoke(handler, callback, AsyncServer.getDefault(), response, ex, null); return; } - cancel.response = response; final int contentLength = response.getHeaders().getContentLength(); @@ -439,20 +457,9 @@ public class AsyncHttpClient { return cancel; } - private static class CancelableRequest extends CancelableImpl { - AsyncHttpResponse response; - @Override - public Cancelable cancel() { - Cancelable ret = super.cancel(); - if (response != null) - response.close(); - return ret; - } - } - private Cancelable execute(AsyncHttpRequest req, final RequestCallback callback, final ResultConvert convert) { final Handler handler = Looper.myLooper() == null ? null : new Handler(); - final CancelableRequest cancel = new CancelableRequest(); + final CancelableImpl cancel = new CancelableImpl(); execute(req, new HttpConnectCallback() { int mDownloaded = 0; ByteBufferList buffer = new ByteBufferList(); @@ -462,7 +469,6 @@ public class AsyncHttpClient { invoke(handler, callback, AsyncServer.getDefault(), response, ex, null); return; } - cancel.response = response; final int contentLength = response.getHeaders().getContentLength(); diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClientMiddleware.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClientMiddleware.java new file mode 100644 index 0000000..59756f8 --- /dev/null +++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClientMiddleware.java @@ -0,0 +1,8 @@ +package com.koushikdutta.async.http; + +import com.koushikdutta.async.AsyncSocket; + +public interface AsyncHttpClientMiddleware { + public AsyncSocket getSocket(final AsyncHttpRequest request, final HttpConnectCallback callback); + public AsyncSocket onSocket(AsyncSocket socket); +} diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java index c5de2e8..1719f42 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpResponseImpl.java @@ -130,9 +130,6 @@ abstract class AsyncHttpResponseImpl extends FilteredDataCallback implements Asy mSocket.setClosedCallback(null); mSocket.setEndCallback(null); mCompleted = true; -// System.out.println("closing up shop"); -// if (mCompletedCallback != null) -// mCompletedCallback.onCompleted(e); } private AsyncHttpRequest mRequest; @@ -143,17 +140,6 @@ abstract class AsyncHttpResponseImpl extends FilteredDataCallback implements Asy } boolean mCompleted = false; -// -// CompletedCallback mCompletedCallback; -// @Override -// public void setCompletedCallback(CompletedCallback handler) { -// mCompletedCallback = handler; -// } -// -// @Override -// public CompletedCallback getCompletedCallback() { -// return mCompletedCallback; -// } @Override public ResponseHeaders getHeaders() { diff --git a/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServerResponseImpl.java b/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServerResponseImpl.java index 0d31260..d822292 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServerResponseImpl.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/server/AsyncHttpServerResponseImpl.java @@ -59,7 +59,7 @@ public class AsyncHttpServerResponseImpl implements AsyncHttpServerResponse { } boolean mHasWritten = false; - FilteredDataSink mChunker; + ChunkedOutputFilter mChunker; void initFirstWrite() { if (mHasWritten) return; diff --git a/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java b/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java index d7c30eb..a15ce92 100644 --- a/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java +++ b/AndroidAsyncSample/src/com/koushikdutta/async/sample/MainActivity.java @@ -56,6 +56,12 @@ public class MainActivity extends Activity { private void getFile(final ImageView iv, String url, final String filename) { AsyncHttpClient.getDefaultInstance().get(url, filename, new AsyncHttpClient.FileCallback() { @Override + public void onProgress(AsyncHttpResponse response, int downloaded, int total) { + response.pause(); + super.onProgress(response, downloaded, total); + } + + @Override public void onCompleted(Exception e, AsyncHttpResponse response, File result) { if (e != null) { e.printStackTrace(); |