diff options
Diffstat (limited to 'AndroidAsync/src/com/koushikdutta/async/http/ResponseCacheMiddleware.java')
-rw-r--r-- | AndroidAsync/src/com/koushikdutta/async/http/ResponseCacheMiddleware.java | 155 |
1 files changed, 53 insertions, 102 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/ResponseCacheMiddleware.java b/AndroidAsync/src/com/koushikdutta/async/http/ResponseCacheMiddleware.java index cfa693f..3715fb7 100644 --- a/AndroidAsync/src/com/koushikdutta/async/http/ResponseCacheMiddleware.java +++ b/AndroidAsync/src/com/koushikdutta/async/http/ResponseCacheMiddleware.java @@ -8,24 +8,23 @@ import com.koushikdutta.async.AsyncServer; import com.koushikdutta.async.AsyncSocket; import com.koushikdutta.async.ByteBufferList; import com.koushikdutta.async.DataEmitter; -import com.koushikdutta.async.DataEmitterBase; import com.koushikdutta.async.FilteredDataEmitter; import com.koushikdutta.async.Util; import com.koushikdutta.async.callback.CompletedCallback; import com.koushikdutta.async.callback.WritableCallback; import com.koushikdutta.async.future.Cancellable; import com.koushikdutta.async.future.SimpleCancellable; -import com.koushikdutta.async.http.libcore.Charsets; +import com.koushikdutta.async.util.Charsets; import com.koushikdutta.async.http.libcore.RawHeaders; import com.koushikdutta.async.http.libcore.ResponseHeaders; import com.koushikdutta.async.http.libcore.ResponseSource; import com.koushikdutta.async.http.libcore.StrictLineReader; +import com.koushikdutta.async.util.Allocator; import com.koushikdutta.async.util.FileCache; import com.koushikdutta.async.util.StreamUtility; import java.io.BufferedWriter; import java.io.ByteArrayInputStream; -import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -35,7 +34,6 @@ import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; import java.net.CacheResponse; -import java.net.URI; import java.nio.ByteBuffer; import java.security.cert.Certificate; import java.security.cert.CertificateEncodingException; @@ -45,6 +43,8 @@ import java.security.cert.X509Certificate; import java.util.List; import java.util.Map; +import javax.net.ssl.SSLEngine; + public class ResponseCacheMiddleware extends SimpleMiddleware { public static final int ENTRY_METADATA = 0; public static final int ENTRY_BODY = 1; @@ -147,6 +147,9 @@ public class ResponseCacheMiddleware extends SimpleMiddleware { RawHeaders rawResponseHeaders = RawHeaders.fromMultimap(responseHeadersMap); ResponseHeaders cachedResponseHeaders = new ResponseHeaders(data.request.getUri(), rawResponseHeaders); + rawResponseHeaders.set("Content-Length", String.valueOf(contentLength)); + rawResponseHeaders.removeAll("Content-Encoding"); + rawResponseHeaders.removeAll("Transfer-Encoding"); cachedResponseHeaders.setLocalTimestamps(System.currentTimeMillis(), System.currentTimeMillis()); long now = System.currentTimeMillis(); @@ -155,9 +158,6 @@ public class ResponseCacheMiddleware extends SimpleMiddleware { if (responseSource == ResponseSource.CACHE) { data.request.logi("Response retrieved from cache"); final CachedSocket socket = entry.isHttps() ? new CachedSSLSocket(candidate, contentLength) : new CachedSocket(candidate, contentLength); - rawResponseHeaders.removeAll("Content-Encoding"); - rawResponseHeaders.removeAll("Transfer-Encoding"); - rawResponseHeaders.set("Content-Length", String.valueOf(contentLength)); socket.pending.add(ByteBuffer.wrap(rawResponseHeaders.toHeaderString().getBytes())); server.post(new Runnable() { @@ -219,14 +219,14 @@ public class ResponseCacheMiddleware extends SimpleMiddleware { if (cacheData != null) { if (cacheData.cachedResponseHeaders.validate(data.headers)) { data.request.logi("Serving response from conditional cache"); + data.headers.getHeaders().removeAll("Content-Length"); data.headers = cacheData.cachedResponseHeaders.combine(data.headers); data.headers.getHeaders().setStatusLine(cacheData.cachedResponseHeaders.getHeaders().getStatusLine()); data.headers.getHeaders().set(SERVED_FROM, CONDITIONAL_CACHE); conditionalCacheHitCount++; - BodySpewer bodySpewer = new BodySpewer(cacheData.contentLength); - bodySpewer.cacheResponse = cacheData.candidate; + CachedBodyEmitter bodySpewer = new CachedBodyEmitter(cacheData.candidate, cacheData.contentLength); bodySpewer.setDataEmitter(data.bodyEmitter); data.bodyEmitter = bodySpewer; bodySpewer.spew(); @@ -341,7 +341,7 @@ public class ResponseCacheMiddleware extends SimpleMiddleware { while (!bb.isEmpty()) { ByteBuffer b = bb.remove(); try { - outputStream.write(b.array(), b.arrayOffset() + b.position(), b.remaining()); + ByteBufferList.writeOutputStream(outputStream, b); } finally { copy.add(b); @@ -384,54 +384,62 @@ public class ResponseCacheMiddleware extends SimpleMiddleware { } } - private static class BodySpewer extends FilteredDataEmitter { - long contentLength; + private static class CachedBodyEmitter extends FilteredDataEmitter { EntryCacheResponse cacheResponse; - boolean first = true; ByteBufferList pending = new ByteBufferList(); - boolean paused; + private boolean paused; + private Allocator allocator = new Allocator(); boolean allowEnd; - public BodySpewer(long contentLength) { - this.contentLength = contentLength; + public CachedBodyEmitter(EntryCacheResponse cacheResponse, long contentLength) { + this.cacheResponse = cacheResponse; + allocator.setCurrentAlloc((int)contentLength); } + Runnable spewRunnable = new Runnable() { + @Override + public void run() { + spewInternal(); + } + }; + void spewInternal() { if (pending.remaining() > 0) { - com.koushikdutta.async.Util.emitAllData(BodySpewer.this, pending); + com.koushikdutta.async.Util.emitAllData(CachedBodyEmitter.this, pending); if (pending.remaining() > 0) return; } // fill pending try { - assert first; - if (!first) - return; - first = false; - ByteBuffer buffer = ByteBufferList.obtain((int)contentLength); + ByteBuffer buffer = allocator.allocate(); assert buffer.position() == 0; - DataInputStream din = new DataInputStream(cacheResponse.getBody()); - din.readFully(buffer.array(), buffer.arrayOffset(), (int)contentLength); - buffer.limit((int)contentLength); + FileInputStream din = cacheResponse.getBody(); + int read = din.read(buffer.array(), buffer.arrayOffset(), buffer.capacity()); + if (read == -1) { + ByteBufferList.reclaim(buffer); + allowEnd = true; + report(null); + return; + } + allocator.track(read); + buffer.limit(read); pending.add(buffer); - com.koushikdutta.async.Util.emitAllData(this, pending); - assert din.read() == -1; - allowEnd = true; - report(null); } catch (IOException e) { allowEnd = true; report(e); + return; } + com.koushikdutta.async.Util.emitAllData(this, pending); + if (pending.remaining() > 0) + return; + // this limits max throughput to 256k (aka max alloc) * 100 per second... + // roughly 25MB/s + getServer().postDelayed(spewRunnable, 10); } void spew() { - getServer().post(new Runnable() { - @Override - public void run() { - spewInternal(); - } - }); + getServer().post(spewRunnable); } @Override @@ -447,6 +455,8 @@ public class ResponseCacheMiddleware extends SimpleMiddleware { @Override protected void report(Exception e) { + // a 304 response will immediate call report/end since there is no body. + // prevent this from happening by waiting for the actual body to be spit out. if (!allowEnd) return; StreamUtility.closeQuietly(cacheResponse.getBody()); @@ -671,23 +681,23 @@ public class ResponseCacheMiddleware extends SimpleMiddleware { } @Override + public SSLEngine getSSLEngine() { + return null; + } + + @Override public X509Certificate[] getPeerCertificates() { return null; } } - private class CachedSocket extends DataEmitterBase implements AsyncSocket { - EntryCacheResponse cacheResponse; - long contentLength; - boolean paused; + private class CachedSocket extends CachedBodyEmitter implements AsyncSocket { boolean closed; - boolean first = true; - ByteBufferList pending = new ByteBufferList(); boolean open; CompletedCallback closedCallback; public CachedSocket(EntryCacheResponse cacheResponse, long contentLength) { - this.cacheResponse = cacheResponse; - this.contentLength = contentLength; + super(cacheResponse, contentLength); + allowEnd = true; } @Override @@ -695,19 +705,8 @@ public class ResponseCacheMiddleware extends SimpleMiddleware { } @Override - public boolean isChunked() { - return false; - } - - @Override - public void pause() { - paused = true; - } - - @Override protected void report(Exception e) { super.report(e); - StreamUtility.closeQuietly(cacheResponse.getBody()); if (closed) return; closed = true; @@ -715,54 +714,6 @@ public class ResponseCacheMiddleware extends SimpleMiddleware { closedCallback.onCompleted(e); } - void spewInternal() { - if (pending.remaining() > 0) { - com.koushikdutta.async.Util.emitAllData(CachedSocket.this, pending); - if (pending.remaining() > 0) - return; - } - - // fill pending - try { - assert first; - if (!first) - return; - first = false; - ByteBuffer buffer = ByteBufferList.obtain((int)contentLength); - assert buffer.position() == 0; - DataInputStream din = new DataInputStream(cacheResponse.getBody()); - din.readFully(buffer.array(), buffer.arrayOffset(), (int)contentLength); - buffer.limit((int)contentLength); - pending.add(buffer); - com.koushikdutta.async.Util.emitAllData(CachedSocket.this, pending); - assert din.read() == -1; - report(null); - } - catch (IOException e) { - report(e); - } - } - - void spew() { - getServer().post(new Runnable() { - @Override - public void run() { - spewInternal(); - } - }); - } - - @Override - public void resume() { - paused = false; - spew(); - } - - @Override - public boolean isPaused() { - return paused; - } - @Override public void write(ByteBuffer bb) { // it's gonna write headers and stuff... whatever |