diff options
3 files changed, 89 insertions, 57 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java index b562baf..d320aef 100644 --- a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java +++ b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java @@ -158,10 +158,10 @@ public class AsyncServer { @Override public void run() { try { - SocketAddress remote = new InetSocketAddress(host, port); - socket.connect(remote); SelectionKey ckey = sc.register(mSelector); ckey.attach(handler); + SocketAddress remote = new InetSocketAddress(host, port); + socket.connect(remote); } catch (Exception e) { handler.onConnectCompleted(e, null); @@ -298,50 +298,56 @@ public class AsyncServer { // process whatever keys are ready Set<SelectionKey> readyKeys = selector.selectedKeys(); for (SelectionKey key : readyKeys) { - if (key.isAcceptable()) { - ServerSocketChannel nextReady = (ServerSocketChannel) key.channel(); - SocketChannel sc = nextReady.accept(); - if (sc == null) - continue; - sc.configureBlocking(false); - SelectionKey ckey = sc.register(selector, SelectionKey.OP_READ); - ListenCallback serverHandler = (ListenCallback) key.attachment(); - AsyncSocketImpl handler = new AsyncSocketImpl(); - handler.attach(sc); - handler.mKey = ckey; - ckey.attach(handler); - serverHandler.onAccepted(handler); - } - else if (key.isReadable()) { - AsyncSocketImpl handler = (AsyncSocketImpl) key.attachment(); - int transmitted = handler.onReadable(); - server.onDataTransmitted(transmitted); - } - else if (key.isWritable()) { - AsyncSocketImpl handler = (AsyncSocketImpl) key.attachment(); - handler.onDataWritable(); - } - else if (key.isConnectable()) { - ConnectCallback handler = (ConnectCallback) key.attachment(); - SocketChannel sc = (SocketChannel) key.channel(); - key.interestOps(SelectionKey.OP_READ); - try { - sc.finishConnect(); - AsyncSocketImpl newHandler = new AsyncSocketImpl(); - newHandler.mKey = key; - newHandler.attach(sc); - key.attach(newHandler); - handler.onConnectCompleted(null, newHandler); + try { + if (key.isAcceptable()) { + ServerSocketChannel nextReady = (ServerSocketChannel) key.channel(); + SocketChannel sc = nextReady.accept(); + if (sc == null) + continue; + sc.configureBlocking(false); + SelectionKey ckey = sc.register(selector, SelectionKey.OP_READ); + ListenCallback serverHandler = (ListenCallback) key.attachment(); + AsyncSocketImpl handler = new AsyncSocketImpl(); + handler.attach(sc); + handler.mKey = ckey; + ckey.attach(handler); + serverHandler.onAccepted(handler); + } + else if (key.isReadable()) { + AsyncSocketImpl handler = (AsyncSocketImpl) key.attachment(); + int transmitted = handler.onReadable(); + server.onDataTransmitted(transmitted); + } + else if (key.isWritable()) { + AsyncSocketImpl handler = (AsyncSocketImpl) key.attachment(); + handler.onDataWritable(); + } + else if (key.isConnectable()) { + ConnectCallback handler = (ConnectCallback) key.attachment(); + SocketChannel sc = (SocketChannel) key.channel(); + key.interestOps(SelectionKey.OP_READ); + try { + sc.finishConnect(); + AsyncSocketImpl newHandler = new AsyncSocketImpl(); + newHandler.mKey = key; + newHandler.attach(sc); + key.attach(newHandler); + handler.onConnectCompleted(null, newHandler); + } + catch (Exception ex) { + key.cancel(); + sc.close(); + handler.onConnectCompleted(ex, null); + } } - catch (Exception ex) { - key.cancel(); - sc.close(); - handler.onConnectCompleted(ex, null); + else { + Log.i(LOGTAG, "wtf"); + Assert.fail(); } } - else { - Log.i(LOGTAG, "wtf"); - Assert.fail(); + catch (Exception ex) { + Log.i(LOGTAG, "inner loop exception"); + ex.printStackTrace(); } } readyKeys.clear(); diff --git a/AndroidAsync/src/com/koushikdutta/async/BufferedDataSink.java b/AndroidAsync/src/com/koushikdutta/async/BufferedDataSink.java index 4b176b7..3aa9330 100644 --- a/AndroidAsync/src/com/koushikdutta/async/BufferedDataSink.java +++ b/AndroidAsync/src/com/koushikdutta/async/BufferedDataSink.java @@ -24,23 +24,47 @@ public class BufferedDataSink implements DataSink { private void writePending() { mDataSink.write(mPendingWrites); + if (mPendingWrites.remaining() == 0) { + mPendingWrites = null; + } } - ByteBufferList mPendingWrites = new ByteBufferList(); + ByteBufferList mPendingWrites; @Override public void write(ByteBuffer bb) { - mPendingWrites.add(ByteBuffer.wrap(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())); - bb.position(0); - bb.limit(0); - writePending(); + if (mPendingWrites == null) { + mDataSink.write(bb); + if (bb.remaining() > 0) { + mPendingWrites = new ByteBufferList(); + mPendingWrites.add(bb); + bb.position(0); + bb.limit(0); + } + } + else { + mPendingWrites.add(ByteBuffer.wrap(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())); + bb.position(0); + bb.limit(0); + writePending(); + } } @Override public void write(ByteBufferList bb) { - mPendingWrites.add(bb); - bb.clear(); - writePending(); + if (mPendingWrites == null) { + mDataSink.write(bb); + if (bb.remaining() > 0) { + mPendingWrites = new ByteBufferList(); + mPendingWrites.add(bb); + } + bb.clear(); + } + else { + mPendingWrites.add(bb); + bb.clear(); + writePending(); + } } @Override diff --git a/AndroidAsync/src/com/koushikdutta/async/ByteBufferList.java b/AndroidAsync/src/com/koushikdutta/async/ByteBufferList.java index 8179e57..8836e46 100644 --- a/AndroidAsync/src/com/koushikdutta/async/ByteBufferList.java +++ b/AndroidAsync/src/com/koushikdutta/async/ByteBufferList.java @@ -87,15 +87,16 @@ public class ByteBufferList implements Iterable<ByteBuffer> { public ByteBuffer read(int count) { Assert.assertTrue(count <= remaining()); - if (count == 0) { - return ByteBuffer.wrap(new byte[0]); - } ByteBuffer first = mBuffers.peek(); - while (first.position() == first.limit()) { + while (first != null && first.position() == first.limit()) { mBuffers.remove(); first = mBuffers.peek(); } + + if (first == null) { + return ByteBuffer.wrap(new byte[0]); + } if (first.remaining() >= count) { return first; @@ -124,8 +125,9 @@ public class ByteBufferList implements Iterable<ByteBuffer> { public void trim() { // this clears out buffers that are empty in the beginning of the list - if (size() > 0) - read(0); + read(0); + if (remaining() == 0) + mBuffers = new LinkedList<ByteBuffer>(); } public void add(ByteBuffer b) { |