aboutsummaryrefslogtreecommitdiffstats
path: root/AndroidAsync/src/com/koushikdutta/async/DataEmitterStream.java
blob: 37bc4966418e2abd88a12bf5a3b7e3cd860c274c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.koushikdutta.async;

import junit.framework.Assert;

import com.koushikdutta.async.callback.DataCallback;

/**
 * {@link AsyncInputStream} backed by a {@link DataEmitter}.
 *
 * <p>Each {@link #read(int, DataCallback)} request collects the data emitted by the
 * emitter until the requested count (as measured by {@code ByteBufferList.remaining()})
 * has been accumulated, then hands the accumulated buffer to the request's callback.
 * Only one read may be outstanding at a time; this is enforced by assertion.
 */
public class DataEmitterStream implements AsyncInputStream {
    /** Source of the data; receives {@link #mMyHandler} as its data callback on every read. */
    DataEmitter mEmitter;

    /** Callback of the outstanding read, or {@code null} when no read is outstanding. */
    DataCallback mPendingRead;
    /** Count requested by the outstanding read. */
    int mPendingReadLength;
    /** Data accumulated so far for the outstanding read; replaced on every read. */
    ByteBufferList mPendingData = new ByteBufferList();

    /**
     * Handler registered on the emitter. Moves data from the emitted buffer into
     * {@link #mPendingData}, never taking more than the outstanding read still needs,
     * and keeps going for as long as reads complete and a new read is outstanding.
     */
    DataCallback mMyHandler = new DataCallback() {
        @Override
        public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) {
            // if we're registered for data, we must be waiting for a read
            Assert.assertNotNull(mPendingRead);
            while (true) {
                int available = bb.remaining();
                int missing = mPendingReadLength - mPendingData.remaining();
                mPendingData.add(bb.get(Math.min(available, missing)));

                // Not enough accumulated yet: stop and wait for the next emission.
                if (!completePendingRead()) {
                    return;
                }
                // The read completed and no new read is outstanding: stop consuming.
                if (mPendingRead == null) {
                    return;
                }
            }
        }
    };

    /**
     * Creates a stream that reads from {@code emitter}.
     *
     * @param emitter the data source; asserted not to be chunked
     */
    public DataEmitterStream(DataEmitter emitter) {
        Assert.assertFalse(emitter.isChunked());
        mEmitter = emitter;
    }

    /**
     * Requests {@code count} units of data. Asserts that no other read is outstanding,
     * starts a fresh accumulation buffer, and registers this stream's handler as the
     * emitter's data callback.
     *
     * @param count    amount of data to accumulate before invoking {@code callback}
     * @param callback invoked with the emitter and the accumulated buffer once
     *                 {@code count} has been reached
     */
    @Override
    public void read(int count, DataCallback callback) {
        Assert.assertNull(mPendingRead);
        mPendingData = new ByteBufferList();
        mPendingRead = callback;
        mPendingReadLength = count;
        mEmitter.setDataCallback(mMyHandler);
    }

    /**
     * Delivers the accumulated data to the outstanding read's callback if enough has
     * been collected.
     *
     * @return {@code true} if the callback was invoked, {@code false} if more data is
     *         still needed
     */
    private boolean completePendingRead() {
        if (mPendingData.remaining() < mPendingReadLength) {
            return false;
        }

        // Clear the outstanding read before invoking the callback so the callback can
        // issue a new read() without tripping the "no read outstanding" assertion.
        DataCallback reader = mPendingRead;
        mPendingRead = null;
        reader.onDataAvailable(mEmitter, mPendingData);
        return true;
    }
}