1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
package com.koushikdutta.async;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import junit.framework.Assert;
import com.koushikdutta.async.callback.CompletedCallback;
import com.koushikdutta.async.callback.DataCallback;
import com.koushikdutta.async.callback.WritableCallback;
public class Util {
public static void emitAllData(DataEmitter emitter, ByteBufferList list) {
int remaining;
DataCallback handler = null;
while (!emitter.isPaused() && (handler = emitter.getDataCallback()) != null && (remaining = list.remaining()) > 0) {
handler.onDataAvailable(emitter, list);
if (remaining == list.remaining() && handler == emitter.getDataCallback()) {
Assert.fail("mDataHandler failed to consume data, yet remains the mDataHandler.");
break;
}
}
if (list.remaining() != 0 && !emitter.isPaused()) {
System.out.println("Data: " + list.peekString());
System.out.println("handler: " + handler);
Assert.fail();
}
}
public static void emitAllData(DataEmitter emitter, ByteBuffer b) {
ByteBufferList list = new ByteBufferList();
list.add(b);
emitAllData(emitter, list);
// previous call makes sure list is empty,
// so this is safe to clear
b.position(b.limit());
}
public static void pump(final InputStream is, final DataSink ds, final CompletedCallback callback) {
final WritableCallback cb = new WritableCallback() {
private void close() {
try {
is.close();
}
catch (IOException e) {
e.printStackTrace();
}
}
byte[] buffer = new byte[8192];
ByteBuffer pending = ByteBuffer.wrap(buffer);
{
pending.limit(pending.position());
}
@Override
public void onWriteable() {
try {
int remaining;
// long start = System.currentTimeMillis();
do {
if (pending.remaining() == 0) {
int read = is.read(buffer);
if (read == -1) {
close();
callback.onCompleted(null);
return;
}
pending.position(0);
pending.limit(read);
}
remaining = pending.remaining();
ds.write(pending);
}
while (remaining != pending.remaining());
}
catch (Exception e) {
close();
callback.onCompleted(e);
return;
}
}
};
ds.setWriteableCallback(cb);
ds.setClosedCallback(callback);
cb.onWriteable();
}
public static void pump(final DataEmitter emitter, final DataSink sink, final CompletedCallback callback) {
emitter.setDataCallback(new DataCallback() {
@Override
public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) {
sink.write(bb);
if (bb.remaining() > 0)
emitter.pause();
}
});
sink.setWriteableCallback(new WritableCallback() {
@Override
public void onWriteable() {
emitter.resume();
}
});
emitter.setEndCallback(callback);
sink.setClosedCallback(callback);
}
public static void stream(AsyncSocket s1, AsyncSocket s2, CompletedCallback callback) {
pump(s1, s2, callback);
pump(s2, s1, callback);
}
public static void pump(final File file, final DataSink ds, final CompletedCallback callback) {
try {
if (file == null || ds == null) {
callback.onCompleted(null);
return;
}
final InputStream is = new FileInputStream(file);
pump(is, ds, new CompletedCallback() {
@Override
public void onCompleted(Exception ex) {
try {
is.close();
callback.onCompleted(ex);
}
catch (IOException e) {
callback.onCompleted(e);
}
}
});
}
catch (Exception e) {
callback.onCompleted(e);
}
}
public static void writeAll(final DataSink sink, final ByteBufferList bb, final CompletedCallback callback) {
sink.setWriteableCallback(new WritableCallback() {
@Override
public void onWriteable() {
if (bb.remaining() == 0)
return;
sink.write(bb);
if (bb.remaining() == 0 && callback != null)
callback.onCompleted(null);
}
});
sink.write(bb);
if (bb.remaining() == 0 && callback != null)
callback.onCompleted(null);
}
public static void writeAll(DataSink sink, byte[] bytes, CompletedCallback callback) {
ByteBuffer bb = ByteBuffer.wrap(bytes);
ByteBufferList bbl = new ByteBufferList();
bbl.add(bb);
writeAll(sink, bbl, callback);
}
public static AsyncSocket getWrappedSocket(AsyncSocket socket, Class wrappedClass) {
if (wrappedClass.isInstance(socket))
return socket;
while (socket instanceof WrapperSocket) {
socket = ((WrapperSocket)socket).getSocket();
if (wrappedClass.isInstance(socket))
return socket;
}
return null;
}
}
|