aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/AsyncServer.java1
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/AsyncSocket.java2
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/AsyncSocketImpl.java53
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java1
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/BufferedDataSink.java12
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/Util.java11
6 files changed, 61 insertions, 19 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java
index d320aef..d505ce7 100644
--- a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java
+++ b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java
@@ -272,6 +272,7 @@ public class AsyncServer {
}
private static void runLoop(AsyncServer server, Selector selector, LinkedList<Runnable> queue, boolean keepRunning) throws IOException {
+// Log.i(LOGTAG, "Keys: " + selector.keys().size());
boolean needsSelect = true;
synchronized (server) {
// run the queue to populate the selector with keys
diff --git a/AndroidAsync/src/com/koushikdutta/async/AsyncSocket.java b/AndroidAsync/src/com/koushikdutta/async/AsyncSocket.java
index 82d4b08..063709a 100644
--- a/AndroidAsync/src/com/koushikdutta/async/AsyncSocket.java
+++ b/AndroidAsync/src/com/koushikdutta/async/AsyncSocket.java
@@ -3,4 +3,6 @@ package com.koushikdutta.async;
public interface AsyncSocket extends DataExchange, CloseableData, ExceptionEmitter {
public boolean isConnected();
+ public void pause();
+ public void resume();
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/AsyncSocketImpl.java b/AndroidAsync/src/com/koushikdutta/async/AsyncSocketImpl.java
index 2c06b10..4f38ca2 100644
--- a/AndroidAsync/src/com/koushikdutta/async/AsyncSocketImpl.java
+++ b/AndroidAsync/src/com/koushikdutta/async/AsyncSocketImpl.java
@@ -7,6 +7,7 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import junit.framework.Assert;
+import android.util.Log;
import com.koushikdutta.async.callback.ClosedCallback;
import com.koushikdutta.async.callback.DataCallback;
@@ -85,17 +86,26 @@ class AsyncSocketImpl implements AsyncSocket {
}
}
+ int mToAlloc = 0;
int onReadable() {
int total = 0;
try {
boolean closed = false;
ByteBufferList list = new ByteBufferList();
+ ByteBuffer b = null;
+ // keep track of the max mount read during this read cycle
+ // so we can be quicker about allocations during the next
+ // time this socket reads.
+ int maxRead = 0;
while (true) {
- ByteBuffer b = ByteBuffer.allocate(2 << 10);
- list.add(b);
+ if (b == null) {
+ b = ByteBuffer.allocate(Math.min(Math.max(mToAlloc, 2 << 11), 1024 * 1024));
+ }
+ else {
+ b = ByteBuffer.allocate(Math.min(b.capacity() * 2, 1024 * 1024));
+ }
int read = mChannel.read(b);
- b.limit(b.position());
- b.position(0);
+ maxRead = Math.max(read, maxRead);
if (read < 0) {
close();
closed = true;
@@ -105,24 +115,21 @@ class AsyncSocketImpl implements AsyncSocket {
}
if (read <= 0)
break;
- if (mChannel.isChunked()) {
- Assert.assertNotNull(mDataHandler);
- mDataHandler.onDataAvailable(this, list);
+
+ mToAlloc = read;
+ b.limit(b.position());
+ b.position(0);
+ list.add(b);
+ if (mChannel.isChunked() || b.capacity() == 1024 * 1024) {
+ Util.emitAllData(this, list);
list = new ByteBufferList();
}
}
+
+ mToAlloc = maxRead;
+
if (!mChannel.isChunked()) {
-// Util.emitAllData(this, list);
- int remaining;
- while (mDataHandler != null && (remaining = list.remaining()) > 0) {
- DataCallback handler = mDataHandler;
- mDataHandler.onDataAvailable(this, list);
- if (remaining == list.remaining() && handler == mDataHandler) {
- Assert.fail("mDataHandler failed to consume data, yet remains the mDataHandler.");
- break;
- }
- }
-// Assert.assertEquals(list.remaining(), 0);
+ Util.emitAllData(this, list);
}
if (closed)
@@ -207,4 +214,14 @@ class AsyncSocketImpl implements AsyncSocket {
public boolean isConnected() {
return mChannel.isConnected();
}
+
+ @Override
+ public void pause() {
+ mKey.interestOps(~SelectionKey.OP_READ & mKey.interestOps());
+ }
+
+ @Override
+ public void resume() {
+ mKey.interestOps(SelectionKey.OP_READ | mKey.interestOps());
+ }
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java b/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java
index 50e1289..f00ad63 100644
--- a/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java
+++ b/AndroidAsync/src/com/koushikdutta/async/BufferedDataEmitter.java
@@ -36,5 +36,4 @@ public class BufferedDataEmitter implements DataEmitter, DataCallback {
onDataAvailable();
}
-
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/BufferedDataSink.java b/AndroidAsync/src/com/koushikdutta/async/BufferedDataSink.java
index 3aa9330..a21103b 100644
--- a/AndroidAsync/src/com/koushikdutta/async/BufferedDataSink.java
+++ b/AndroidAsync/src/com/koushikdutta/async/BufferedDataSink.java
@@ -3,6 +3,7 @@ package com.koushikdutta.async;
import java.nio.ByteBuffer;
import junit.framework.Assert;
+import android.util.Log;
import com.koushikdutta.async.callback.WritableCallback;
@@ -23,9 +24,11 @@ public class BufferedDataSink implements DataSink {
}
private void writePending() {
+// Log.i("NIO", "Writing to buffer...");
mDataSink.write(mPendingWrites);
if (mPendingWrites.remaining() == 0) {
mPendingWrites = null;
+ onFlushed();
}
}
@@ -77,4 +80,13 @@ public class BufferedDataSink implements DataSink {
Assert.fail("BufferingDataSink is always writeable.");
return null;
}
+
+ public int remaining() {
+ if (mPendingWrites == null)
+ return 0;
+ return mPendingWrites.remaining();
+ }
+
+ public void onFlushed() {
+ }
}
diff --git a/AndroidAsync/src/com/koushikdutta/async/Util.java b/AndroidAsync/src/com/koushikdutta/async/Util.java
index 5a7c09d..7e0b443 100644
--- a/AndroidAsync/src/com/koushikdutta/async/Util.java
+++ b/AndroidAsync/src/com/koushikdutta/async/Util.java
@@ -1,5 +1,7 @@
package com.koushikdutta.async;
+import java.nio.ByteBuffer;
+
import junit.framework.Assert;
import com.koushikdutta.async.callback.DataCallback;
@@ -17,4 +19,13 @@ public class Util {
}
Assert.assertEquals(list.remaining(), 0);
}
+
+ public static void emitAllData(DataEmitter emitter, ByteBuffer b) {
+ ByteBufferList list = new ByteBufferList();
+ list.add(b);
+ emitAllData(emitter, list);
+ // previous call makes sure list is empty,
+ // so this is safe to clear
+ b.position(b.limit());
+ }
}