blob: 37bc4966418e2abd88a12bf5a3b7e3cd860c274c (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
package com.koushikdutta.async;
import junit.framework.Assert;
import com.koushikdutta.async.callback.DataCallback;
public class DataEmitterStream implements AsyncInputStream {
DataCallback mPendingRead;
int mPendingReadLength;
ByteBufferList mPendingData = new ByteBufferList();
@Override
public void read(int count, DataCallback callback) {
Assert.assertNull(mPendingRead);
mPendingReadLength = count;
mPendingRead = callback;
mPendingData = new ByteBufferList();
mEmitter.setDataCallback(mMyHandler);
}
DataCallback mMyHandler = new DataCallback() {
@Override
public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) {
// if we're registered for data, we must be waiting for a read
Assert.assertNotNull(mPendingRead);
do {
int need = Math.min(bb.remaining(), mPendingReadLength - mPendingData.remaining());
mPendingData.add(bb.get(need));
}
while (handlePendingData() && mPendingRead != null);
}
};
private boolean handlePendingData() {
if (mPendingReadLength > mPendingData.remaining())
return false;
DataCallback pendingRead = mPendingRead;
mPendingRead = null;
pendingRead.onDataAvailable(mEmitter, mPendingData);
return true;
}
DataEmitter mEmitter;
public DataEmitterStream(DataEmitter emitter) {
Assert.assertFalse(emitter.isChunked());
mEmitter = emitter;
}
}
|