aboutsummaryrefslogtreecommitdiffstats
path: root/AndroidAsync/src/com/koushikdutta/async/http/ResponseCacheMiddleware.java
diff options
context:
space:
mode:
Diffstat (limited to 'AndroidAsync/src/com/koushikdutta/async/http/ResponseCacheMiddleware.java')
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/http/ResponseCacheMiddleware.java155
1 files changed, 53 insertions, 102 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/http/ResponseCacheMiddleware.java b/AndroidAsync/src/com/koushikdutta/async/http/ResponseCacheMiddleware.java
index cfa693f..3715fb7 100644
--- a/AndroidAsync/src/com/koushikdutta/async/http/ResponseCacheMiddleware.java
+++ b/AndroidAsync/src/com/koushikdutta/async/http/ResponseCacheMiddleware.java
@@ -8,24 +8,23 @@ import com.koushikdutta.async.AsyncServer;
import com.koushikdutta.async.AsyncSocket;
import com.koushikdutta.async.ByteBufferList;
import com.koushikdutta.async.DataEmitter;
-import com.koushikdutta.async.DataEmitterBase;
import com.koushikdutta.async.FilteredDataEmitter;
import com.koushikdutta.async.Util;
import com.koushikdutta.async.callback.CompletedCallback;
import com.koushikdutta.async.callback.WritableCallback;
import com.koushikdutta.async.future.Cancellable;
import com.koushikdutta.async.future.SimpleCancellable;
-import com.koushikdutta.async.http.libcore.Charsets;
+import com.koushikdutta.async.util.Charsets;
import com.koushikdutta.async.http.libcore.RawHeaders;
import com.koushikdutta.async.http.libcore.ResponseHeaders;
import com.koushikdutta.async.http.libcore.ResponseSource;
import com.koushikdutta.async.http.libcore.StrictLineReader;
+import com.koushikdutta.async.util.Allocator;
import com.koushikdutta.async.util.FileCache;
import com.koushikdutta.async.util.StreamUtility;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -35,7 +34,6 @@ import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.CacheResponse;
-import java.net.URI;
import java.nio.ByteBuffer;
import java.security.cert.Certificate;
import java.security.cert.CertificateEncodingException;
@@ -45,6 +43,8 @@ import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Map;
+import javax.net.ssl.SSLEngine;
+
public class ResponseCacheMiddleware extends SimpleMiddleware {
public static final int ENTRY_METADATA = 0;
public static final int ENTRY_BODY = 1;
@@ -147,6 +147,9 @@ public class ResponseCacheMiddleware extends SimpleMiddleware {
RawHeaders rawResponseHeaders = RawHeaders.fromMultimap(responseHeadersMap);
ResponseHeaders cachedResponseHeaders = new ResponseHeaders(data.request.getUri(), rawResponseHeaders);
+ rawResponseHeaders.set("Content-Length", String.valueOf(contentLength));
+ rawResponseHeaders.removeAll("Content-Encoding");
+ rawResponseHeaders.removeAll("Transfer-Encoding");
cachedResponseHeaders.setLocalTimestamps(System.currentTimeMillis(), System.currentTimeMillis());
long now = System.currentTimeMillis();
@@ -155,9 +158,6 @@ public class ResponseCacheMiddleware extends SimpleMiddleware {
if (responseSource == ResponseSource.CACHE) {
data.request.logi("Response retrieved from cache");
final CachedSocket socket = entry.isHttps() ? new CachedSSLSocket(candidate, contentLength) : new CachedSocket(candidate, contentLength);
- rawResponseHeaders.removeAll("Content-Encoding");
- rawResponseHeaders.removeAll("Transfer-Encoding");
- rawResponseHeaders.set("Content-Length", String.valueOf(contentLength));
socket.pending.add(ByteBuffer.wrap(rawResponseHeaders.toHeaderString().getBytes()));
server.post(new Runnable() {
@@ -219,14 +219,14 @@ public class ResponseCacheMiddleware extends SimpleMiddleware {
if (cacheData != null) {
if (cacheData.cachedResponseHeaders.validate(data.headers)) {
data.request.logi("Serving response from conditional cache");
+ data.headers.getHeaders().removeAll("Content-Length");
data.headers = cacheData.cachedResponseHeaders.combine(data.headers);
data.headers.getHeaders().setStatusLine(cacheData.cachedResponseHeaders.getHeaders().getStatusLine());
data.headers.getHeaders().set(SERVED_FROM, CONDITIONAL_CACHE);
conditionalCacheHitCount++;
- BodySpewer bodySpewer = new BodySpewer(cacheData.contentLength);
- bodySpewer.cacheResponse = cacheData.candidate;
+ CachedBodyEmitter bodySpewer = new CachedBodyEmitter(cacheData.candidate, cacheData.contentLength);
bodySpewer.setDataEmitter(data.bodyEmitter);
data.bodyEmitter = bodySpewer;
bodySpewer.spew();
@@ -341,7 +341,7 @@ public class ResponseCacheMiddleware extends SimpleMiddleware {
while (!bb.isEmpty()) {
ByteBuffer b = bb.remove();
try {
- outputStream.write(b.array(), b.arrayOffset() + b.position(), b.remaining());
+ ByteBufferList.writeOutputStream(outputStream, b);
}
finally {
copy.add(b);
@@ -384,54 +384,62 @@ public class ResponseCacheMiddleware extends SimpleMiddleware {
}
}
- private static class BodySpewer extends FilteredDataEmitter {
- long contentLength;
+ private static class CachedBodyEmitter extends FilteredDataEmitter {
EntryCacheResponse cacheResponse;
- boolean first = true;
ByteBufferList pending = new ByteBufferList();
- boolean paused;
+ private boolean paused;
+ private Allocator allocator = new Allocator();
boolean allowEnd;
- public BodySpewer(long contentLength) {
- this.contentLength = contentLength;
+ public CachedBodyEmitter(EntryCacheResponse cacheResponse, long contentLength) {
+ this.cacheResponse = cacheResponse;
+ allocator.setCurrentAlloc((int)contentLength);
}
+ Runnable spewRunnable = new Runnable() {
+ @Override
+ public void run() {
+ spewInternal();
+ }
+ };
+
void spewInternal() {
if (pending.remaining() > 0) {
- com.koushikdutta.async.Util.emitAllData(BodySpewer.this, pending);
+ com.koushikdutta.async.Util.emitAllData(CachedBodyEmitter.this, pending);
if (pending.remaining() > 0)
return;
}
// fill pending
try {
- assert first;
- if (!first)
- return;
- first = false;
- ByteBuffer buffer = ByteBufferList.obtain((int)contentLength);
+ ByteBuffer buffer = allocator.allocate();
assert buffer.position() == 0;
- DataInputStream din = new DataInputStream(cacheResponse.getBody());
- din.readFully(buffer.array(), buffer.arrayOffset(), (int)contentLength);
- buffer.limit((int)contentLength);
+ FileInputStream din = cacheResponse.getBody();
+ int read = din.read(buffer.array(), buffer.arrayOffset(), buffer.capacity());
+ if (read == -1) {
+ ByteBufferList.reclaim(buffer);
+ allowEnd = true;
+ report(null);
+ return;
+ }
+ allocator.track(read);
+ buffer.limit(read);
pending.add(buffer);
- com.koushikdutta.async.Util.emitAllData(this, pending);
- assert din.read() == -1;
- allowEnd = true;
- report(null);
}
catch (IOException e) {
allowEnd = true;
report(e);
+ return;
}
+ com.koushikdutta.async.Util.emitAllData(this, pending);
+ if (pending.remaining() > 0)
+ return;
+ // this limits max throughput to 256k (aka max alloc) * 100 per second...
+ // roughly 25MB/s
+ getServer().postDelayed(spewRunnable, 10);
}
void spew() {
- getServer().post(new Runnable() {
- @Override
- public void run() {
- spewInternal();
- }
- });
+ getServer().post(spewRunnable);
}
@Override
@@ -447,6 +455,8 @@ public class ResponseCacheMiddleware extends SimpleMiddleware {
@Override
protected void report(Exception e) {
+ // a 304 response will immediate call report/end since there is no body.
+ // prevent this from happening by waiting for the actual body to be spit out.
if (!allowEnd)
return;
StreamUtility.closeQuietly(cacheResponse.getBody());
@@ -671,23 +681,23 @@ public class ResponseCacheMiddleware extends SimpleMiddleware {
}
@Override
+ public SSLEngine getSSLEngine() {
+ return null;
+ }
+
+ @Override
public X509Certificate[] getPeerCertificates() {
return null;
}
}
- private class CachedSocket extends DataEmitterBase implements AsyncSocket {
- EntryCacheResponse cacheResponse;
- long contentLength;
- boolean paused;
+ private class CachedSocket extends CachedBodyEmitter implements AsyncSocket {
boolean closed;
- boolean first = true;
- ByteBufferList pending = new ByteBufferList();
boolean open;
CompletedCallback closedCallback;
public CachedSocket(EntryCacheResponse cacheResponse, long contentLength) {
- this.cacheResponse = cacheResponse;
- this.contentLength = contentLength;
+ super(cacheResponse, contentLength);
+ allowEnd = true;
}
@Override
@@ -695,19 +705,8 @@ public class ResponseCacheMiddleware extends SimpleMiddleware {
}
@Override
- public boolean isChunked() {
- return false;
- }
-
- @Override
- public void pause() {
- paused = true;
- }
-
- @Override
protected void report(Exception e) {
super.report(e);
- StreamUtility.closeQuietly(cacheResponse.getBody());
if (closed)
return;
closed = true;
@@ -715,54 +714,6 @@ public class ResponseCacheMiddleware extends SimpleMiddleware {
closedCallback.onCompleted(e);
}
- void spewInternal() {
- if (pending.remaining() > 0) {
- com.koushikdutta.async.Util.emitAllData(CachedSocket.this, pending);
- if (pending.remaining() > 0)
- return;
- }
-
- // fill pending
- try {
- assert first;
- if (!first)
- return;
- first = false;
- ByteBuffer buffer = ByteBufferList.obtain((int)contentLength);
- assert buffer.position() == 0;
- DataInputStream din = new DataInputStream(cacheResponse.getBody());
- din.readFully(buffer.array(), buffer.arrayOffset(), (int)contentLength);
- buffer.limit((int)contentLength);
- pending.add(buffer);
- com.koushikdutta.async.Util.emitAllData(CachedSocket.this, pending);
- assert din.read() == -1;
- report(null);
- }
- catch (IOException e) {
- report(e);
- }
- }
-
- void spew() {
- getServer().post(new Runnable() {
- @Override
- public void run() {
- spewInternal();
- }
- });
- }
-
- @Override
- public void resume() {
- paused = false;
- spew();
- }
-
- @Override
- public boolean isPaused() {
- return paused;
- }
-
@Override
public void write(ByteBuffer bb) {
// it's gonna write headers and stuff... whatever