diff options
6 files changed, 61 insertions, 19 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java index d320aef..d505ce7 100644 --- a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java +++ b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java @@ -272,6 +272,7 @@ public class AsyncServer { } private static void runLoop(AsyncServer server, Selector selector, LinkedList<Runnable> queue, boolean keepRunning) throws IOException { +// Log.i(LOGTAG, "Keys: " + selector.keys().size()); boolean needsSelect = true; synchronized (server) { // run the queue to populate the selector with keys diff --git a/AndroidAsync/src/com/koushikdutta/async/AsyncSocket.java b/AndroidAsync/src/com/koushikdutta/async/AsyncSocket.java index 82d4b08..063709a 100644 --- a/AndroidAsync/src/com/koushikdutta/async/AsyncSocket.java +++ b/AndroidAsync/src/com/koushikdutta/async/AsyncSocket.java @@ -3,4 +3,6 @@ package com.koushikdutta.async; public interface AsyncSocket extends DataExchange, CloseableData, ExceptionEmitter { public boolean isConnected(); + public void pause(); + public void resume(); } diff --git a/AndroidAsync/src/com/koushikdutta/async/AsyncSocketImpl.java b/AndroidAsync/src/com/koushikdutta/async/AsyncSocketImpl.java index 2c06b10..4f38ca2 100644 --- a/AndroidAsync/src/com/koushikdutta/async/AsyncSocketImpl.java +++ b/AndroidAsync/src/com/koushikdutta/async/AsyncSocketImpl.java @@ -7,6 +7,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import junit.framework.Assert; +import android.util.Log; import com.koushikdutta.async.callback.ClosedCallback; import com.koushikdutta.async.callback.DataCallback; @@ -85,17 +86,26 @@ class AsyncSocketImpl implements AsyncSocket { } } + int mToAlloc = 0; int onReadable() { int total = 0; try { boolean closed = false; ByteBufferList list = new ByteBufferList(); + ByteBuffer b = null; + // keep track of the max mount read during this read cycle + // so we can be quicker about allocations during the next + // time this socket reads. + int maxRead = 0; while (true) { - ByteBuffer b = ByteBuffer.allocate(2 << 10); - list.add(b); + if (b == null) { + b = ByteBuffer.allocate(Math.min(Math.max(mToAlloc, 2 << 11), 1024 * 1024)); + } + else { + b = ByteBuffer.allocate(Math.min(b.capacity() * 2, 1024 * 1024)); + } int read = mChannel.read(b); - b.limit(b.position()); - b.position(0); + maxRead = Math.max(read, maxRead); if (read < 0) { close(); closed = true; @@ -105,24 +115,21 @@ class AsyncSocketImpl implements AsyncSocket { } if (read <= 0) break; - if (mChannel.isChunked()) { - Assert.assertNotNull(mDataHandler); - mDataHandler.onDataAvailable(this, list); + + mToAlloc = read; + b.limit(b.position()); + b.position(0); + list.add(b); + if (mChannel.isChunked() || b.capacity() == 1024 * 1024) { + Util.emitAllData(this, list); list = new ByteBufferList(); } } + + mToAlloc = maxRead; + if (!mChannel.isChunked()) { -// Util.emitAllData(this, list); - int remaining; - while (mDataHandler != null && (remaining = list.remaining()) > 0) { - DataCallback handler = mDataHandler; - mDataHandler.onDataAvailable(this, list); - if (remaining == list.remaining() && handler == mDataHandler) { - Assert.fail("mDataHandler failed to consume data, yet remains the mDataHandler."); - break; - } - } -// Assert.assertEquals(list.remaining(), 0); + Util.emitAllData(this, list); } if (closed) @@ -207,4 +214,14 @@ class AsyncSocketImpl implements AsyncSocket { public boolean isConnected() { return mChannel.isConnected(); } + + @Override + public void pause() { + mKey.interestOps(~SelectionKey.OP_READ & mKey.interestOps()); + } + + @Override + public void resume() { + mKey.interestOps(SelectionKey.OP_READ | mKey.interestOps()); + } } diff --git a/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java b/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java index 50e1289..f00ad63 100644 --- a/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java +++ b/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java @@ -36,5 +36,4 @@ public class BufferedDataEmitter implements DataEmitter, DataCallback { onDataAvailable(); } - } diff --git a/AndroidAsync/src/com/koushikdutta/async/BufferedDataSink.java b/AndroidAsync/src/com/koushikdutta/async/BufferedDataSink.java index 3aa9330..a21103b 100644 --- a/AndroidAsync/src/com/koushikdutta/async/BufferedDataSink.java +++ b/AndroidAsync/src/com/koushikdutta/async/BufferedDataSink.java @@ -3,6 +3,7 @@ package com.koushikdutta.async; import java.nio.ByteBuffer; import junit.framework.Assert; +import android.util.Log; import com.koushikdutta.async.callback.WritableCallback; @@ -23,9 +24,11 @@ public class BufferedDataSink implements DataSink { } private void writePending() { +// Log.i("NIO", "Writing to buffer..."); mDataSink.write(mPendingWrites); if (mPendingWrites.remaining() == 0) { mPendingWrites = null; + onFlushed(); } } @@ -77,4 +80,13 @@ public class BufferedDataSink implements DataSink { Assert.fail("BufferingDataSink is always writeable."); return null; } + + public int remaining() { + if (mPendingWrites == null) + return 0; + return mPendingWrites.remaining(); + } + + public void onFlushed() { + } } diff --git a/AndroidAsync/src/com/koushikdutta/async/Util.java b/AndroidAsync/src/com/koushikdutta/async/Util.java index 5a7c09d..7e0b443 100644 --- a/AndroidAsync/src/com/koushikdutta/async/Util.java +++ b/AndroidAsync/src/com/koushikdutta/async/Util.java @@ -1,5 +1,7 @@ package com.koushikdutta.async; +import java.nio.ByteBuffer; + import junit.framework.Assert; import com.koushikdutta.async.callback.DataCallback; @@ -17,4 +19,13 @@ public class Util { } Assert.assertEquals(list.remaining(), 0); } + + public static void emitAllData(DataEmitter emitter, ByteBuffer b) { + ByteBufferList list = new ByteBufferList(); + list.add(b); + emitAllData(emitter, list); + // previous call makes sure list is empty, + // so this is safe to clear + b.position(b.limit()); + } } |