diff options
9 files changed, 45 insertions, 28 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/AsyncNetworkSocket.java b/AndroidAsync/src/com/koushikdutta/async/AsyncNetworkSocket.java index 9d8fbe5..c3fa1d5 100644 --- a/AndroidAsync/src/com/koushikdutta/async/AsyncNetworkSocket.java +++ b/AndroidAsync/src/com/koushikdutta/async/AsyncNetworkSocket.java @@ -45,7 +45,7 @@ public class AsyncNetworkSocket implements AsyncSocket { } public void onDataWritable() { - assert mWriteableHandler != null; +// assert mWriteableHandler != null; if (mWriteableHandler != null) mWriteableHandler.onWriteable(); } @@ -295,6 +295,7 @@ public class AsyncNetworkSocket implements AsyncSocket { if (mPaused) return; + mPaused = true; try { mKey.interestOps(~SelectionKey.OP_READ & mKey.interestOps()); diff --git a/AndroidAsync/src/com/koushikdutta/async/ByteBufferList.java b/AndroidAsync/src/com/koushikdutta/async/ByteBufferList.java index 6aa8d78..ee914ba 100644 --- a/AndroidAsync/src/com/koushikdutta/async/ByteBufferList.java +++ b/AndroidAsync/src/com/koushikdutta/async/ByteBufferList.java @@ -154,7 +154,6 @@ public class ByteBufferList { mBuffers.addFirst(b); assert subset.capacity() >= need; assert subset.position() == 0; - assert into.remaining() == length; break; } else { @@ -187,7 +186,7 @@ public class ByteBufferList { private ByteBuffer read(int count) { if (remaining() < count) - throw new IllegalArgumentException("count"); + throw new IllegalArgumentException("count : " + remaining() + "/" + count); ByteBuffer first = mBuffers.peek(); while (first != null && !first.hasRemaining()) { @@ -421,7 +420,6 @@ public class ByteBufferList { public static ByteBuffer obtain(int size) { if (size <= maxItem) { - assert Thread.currentThread() != Looper.getMainLooper().getThread(); PriorityQueue<ByteBuffer> r = getReclaimed(); if (r != null) { synchronized (LOCK) { diff --git a/AndroidAsync/src/com/koushikdutta/async/Util.java b/AndroidAsync/src/com/koushikdutta/async/Util.java index 25f46db..b26669b 100644 --- a/AndroidAsync/src/com/koushikdutta/async/Util.java +++ b/AndroidAsync/src/com/koushikdutta/async/Util.java @@ -22,7 +22,7 @@ public class Util { // not all the data was consumed... // call byteBufferList.recycle() or read all the data to prevent this assertion. // this is nice to have, as it identifies protocol or parsing errors. - System.out.println("Data: " + list.peekString()); +// System.out.println("Data: " + list.peekString()); System.out.println("handler: " + handler); assert false; throw new RuntimeException("mDataHandler failed to consume data, yet remains the mDataHandler."); @@ -32,8 +32,9 @@ public class Util { // not all the data was consumed... // call byteBufferList.recycle() or read all the data to prevent this assertion. // this is nice to have, as it identifies protocol or parsing errors. - System.out.println("Data: " + list.peekString()); +// System.out.println("Data: " + list.peekString()); System.out.println("handler: " + handler); + System.out.println("emitter: " + emitter); assert false; throw new RuntimeException("mDataHandler failed to consume data, yet remains the mDataHandler."); } @@ -112,13 +113,11 @@ public class Util { } public static void pump(final DataEmitter emitter, final DataSink sink, final CompletedCallback callback) { - final ByteBufferList pending = new ByteBufferList(); final DataCallback dataCallback = new DataCallback() { @Override public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) { - bb.get(pending); - sink.write(pending); - if (pending.remaining() > 0) + sink.write(bb); + if (bb.remaining() > 0) emitter.pause(); } }; diff --git a/AndroidAsync/src/com/koushikdutta/async/dns/Dns.java b/AndroidAsync/src/com/koushikdutta/async/dns/Dns.java index 61ea1c4..53cf575 100644 --- a/AndroidAsync/src/com/koushikdutta/async/dns/Dns.java +++ b/AndroidAsync/src/com/koushikdutta/async/dns/Dns.java @@ -197,6 +197,7 @@ public class Dns { dgram = server.connectDatagram(new InetSocketAddress("8.8.8.8", 53)); } else { + System.out.println("multicast dns..."); dgram = AsyncServer.getDefault().openDatagram(new InetSocketAddress(5353), true); Field field = DatagramSocket.class.getDeclaredField("impl"); field.setAccessible(true); @@ -217,9 +218,9 @@ public class Dns { @Override public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) { try { - System.out.println(dgram.getRemoteAddress()); - DnsResponse response; - System.out.println(response = parse(bb)); +// System.out.println(dgram.getRemoteAddress()); + DnsResponse response = parse(bb); +// System.out.println(response); response.source = dgram.getRemoteAddress(); if (!multicast) { diff --git a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java index d839b00..f18adf9 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/AsyncHttpClient.java @@ -116,7 +116,7 @@ public class AsyncHttpClient { } if (complete) { callback.onConnectCompleted(ex, response); - assert ex != null || response.getSocket() == null || response.getDataCallback() != null; + assert ex != null || response.getSocket() == null || response.getDataCallback() != null || response.isPaused(); return; } @@ -191,6 +191,7 @@ public class AsyncHttpClient { data.connectCallback = new ConnectCallback() { @Override public void onConnectCompleted(Exception ex, AsyncSocket socket) { + request.logv("socket connected"); if (cancel.isCancelled()) { if (socket != null) socket.close(); @@ -221,6 +222,7 @@ public class AsyncHttpClient { final AsyncHttpResponseImpl ret = new AsyncHttpResponseImpl(request) { @Override protected void onRequestCompleted(Exception ex) { + request.logv("request completed"); if (cancel.isCancelled()) return; // 5) after request is sent, set a header timeout diff --git a/AndroidAsync/src/com/koushikdutta/async/http/filter/ContentLengthFilter.java b/AndroidAsync/src/com/koushikdutta/async/http/filter/ContentLengthFilter.java index 7ecca51..5ca2489 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/filter/ContentLengthFilter.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/filter/ContentLengthFilter.java @@ -22,13 +22,19 @@ public class ContentLengthFilter extends FilteredDataEmitter { @Override public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) { assert totalRead < contentLength; + int remaining = bb.remaining(); int toRead = Math.min(contentLength - totalRead, remaining); bb.get(transformed, toRead); - totalRead += transformed.remaining(); + int beforeRead = transformed.remaining(); + super.onDataAvailable(emitter, transformed); + + totalRead += (beforeRead - transformed.remaining()); + transformed.get(bb); + if (totalRead == contentLength) report(null); } diff --git a/AndroidAsync/src/com/koushikdutta/async/http/server/UnknownRequestBody.java b/AndroidAsync/src/com/koushikdutta/async/http/server/UnknownRequestBody.java index 60d9c10..a4c81d4 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/server/UnknownRequestBody.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/server/UnknownRequestBody.java @@ -3,6 +3,7 @@ package com.koushikdutta.async.http.server; import com.koushikdutta.async.DataEmitter; import com.koushikdutta.async.DataSink; import com.koushikdutta.async.NullDataCallback; +import com.koushikdutta.async.Util; import com.koushikdutta.async.callback.CompletedCallback; import com.koushikdutta.async.callback.DataCallback; import com.koushikdutta.async.http.AsyncHttpRequest; @@ -13,9 +14,18 @@ public class UnknownRequestBody implements AsyncHttpRequestBody<Void> { mContentType = contentType; } + int length = -1; + public UnknownRequestBody(DataEmitter emitter, String contentType, int length) { + mContentType = contentType; + this.emitter = emitter; + this.length = length; + } + @Override - public void write(AsyncHttpRequest request, DataSink sink, final CompletedCallback completed) { - assert false; + public void write(final AsyncHttpRequest request, DataSink sink, final CompletedCallback completed) { + Util.pump(emitter, sink, completed); + if (emitter.isPaused()) + emitter.resume(); } private String mContentType; @@ -31,7 +41,7 @@ public class UnknownRequestBody implements AsyncHttpRequestBody<Void> { @Override public int length() { - return -1; + return length; } @Override diff --git a/AndroidAsyncTest/src/com/koushikdutta/async/test/DnsTests.java b/AndroidAsyncTest/src/com/koushikdutta/async/test/DnsTests.java index 425646e..c69c172 100644 --- a/AndroidAsyncTest/src/com/koushikdutta/async/test/DnsTests.java +++ b/AndroidAsyncTest/src/com/koushikdutta/async/test/DnsTests.java @@ -61,14 +61,14 @@ public class DnsTests extends TestCase { // ((DatagramSocket)dgram.getSocket()).setBroadcast(true); - final Semaphore semaphore = new Semaphore(0); - Dns.multicastLookup("_airplay._tcp.local", new FutureCallback<DnsResponse>() { - @Override - public void onCompleted(Exception e, DnsResponse result) { -// semaphore.release(); - } - }); - - semaphore.tryAcquire(1000000, TimeUnit.MILLISECONDS); +// final Semaphore semaphore = new Semaphore(0); +// Dns.multicastLookup("_airplay._tcp.local", new FutureCallback<DnsResponse>() { +// @Override +// public void onCompleted(Exception e, DnsResponse result) { +//// semaphore.release(); +// } +// }); +// +// semaphore.tryAcquire(1000000, TimeUnit.MILLISECONDS); } } diff --git a/AndroidAsyncTest/src/com/koushikdutta/async/test/HttpClientTests.java b/AndroidAsyncTest/src/com/koushikdutta/async/test/HttpClientTests.java index 8c969d1..a60b76e 100644 --- a/AndroidAsyncTest/src/com/koushikdutta/async/test/HttpClientTests.java +++ b/AndroidAsyncTest/src/com/koushikdutta/async/test/HttpClientTests.java @@ -271,7 +271,7 @@ public class HttpClientTests extends TestCase { .setCallback(new FutureCallback<File>() { @Override public void onCompleted(Exception e, File result) { - fail(); + assertTrue(e instanceof CancellationException); } }); |