aboutsummaryrefslogtreecommitdiffstats
path: root/AndroidAsync
diff options
context:
space:
mode:
authorJustin Huntington <justinhuntington@gmail.com>2014-10-23 19:37:47 -0400
committerJustin Huntington <justinhuntington@gmail.com>2014-10-24 19:35:18 -0400
commit910e3c8b05e2f46115704d9b6049c1dc46dbb18a (patch)
treecd2e2eb9e01294dd0f3858a19744b36665484255 /AndroidAsync
parent806eb7a8b55fe345bbcd0a2a229333b0b5bf67a5 (diff)
downloadAndroidAsync-910e3c8b05e2f46115704d9b6049c1dc46dbb18a.tar.gz
AndroidAsync-910e3c8b05e2f46115704d9b6049c1dc46dbb18a.tar.bz2
AndroidAsync-910e3c8b05e2f46115704d9b6049c1dc46dbb18a.zip
name threads created by AsyncServer.synchronousWorkers
Diffstat (limited to 'AndroidAsync')
-rw-r--r--AndroidAsync/src/com/koushikdutta/async/AsyncServer.java75
1 files changed, 55 insertions, 20 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java
index 4503964..1d7ecff 100644
--- a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java
+++ b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java
@@ -31,7 +31,12 @@ import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
public class AsyncServer {
public static final String LOGTAG = "NIO";
@@ -85,7 +90,7 @@ public class AsyncServer {
catch (Throwable ex) {
}
}
-
+
static AsyncServer mInstance = new AsyncServer();
public static AsyncServer getDefault() {
return mInstance;
@@ -114,7 +119,7 @@ public class AsyncServer {
ckey.attach(handler);
handler.setup(this, ckey);
}
-
+
public void removeAllCallbacks(Object scheduled) {
synchronized (this) {
mQueue.remove(scheduled);
@@ -134,7 +139,7 @@ public class AsyncServer {
}
});
}
-
+
public Object postDelayed(Runnable runnable, long delay) {
Scheduled s;
synchronized (this) {
@@ -161,11 +166,11 @@ public class AsyncServer {
}
return s;
}
-
+
public Object post(Runnable runnable) {
return postDelayed(runnable, 0);
}
-
+
public Object post(final CompletedCallback callback, final Exception e) {
return post(new Runnable() {
@Override
@@ -174,7 +179,7 @@ public class AsyncServer {
}
});
}
-
+
public void run(final Runnable runnable) {
if (Thread.currentThread() == mAffinity) {
post(runnable);
@@ -263,7 +268,7 @@ public class AsyncServer {
catch (Exception e) {
}
}
-
+
protected void onDataReceived(int transmitted) {
}
@@ -335,7 +340,7 @@ public class AsyncServer {
SocketChannel socket;
ConnectCallback callback;
}
-
+
private ConnectFuture connectResolvedInetSocketAddress(final InetSocketAddress address, final ConnectCallback callback) {
final ConnectFuture cancel = new ConnectFuture();
assert !address.isUnresolved();
@@ -396,7 +401,14 @@ public class AsyncServer {
return connectSocket(InetSocketAddress.createUnresolved(host, port), callback);
}
- private static ExecutorService synchronousWorkers = Executors.newFixedThreadPool(4);
+ private static ExecutorService newSynchronousWorkers() {
+ ThreadFactory tf = new NamedThreadFactory("AsyncServer-worker-");
+ ThreadPoolExecutor tpe = new ThreadPoolExecutor(4, 4, 0L,
+ TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), tf);
+ return tpe;
+ }
+
+ private static ExecutorService synchronousWorkers = newSynchronousWorkers();
public Future<InetAddress[]> getAllByName(final String host) {
final SimpleFuture<InetAddress[]> ret = new SimpleFuture<InetAddress[]>();
synchronousWorkers.execute(new Runnable() {
@@ -487,7 +499,7 @@ public class AsyncServer {
});
return handler;
}
-
+
public AsyncDatagramSocket connectDatagram(final SocketAddress remote) throws IOException {
final DatagramChannel socket = DatagramChannel.open();
final AsyncDatagramSocket handler = new AsyncDatagramSocket();
@@ -509,7 +521,7 @@ public class AsyncServer {
});
return handler;
}
-
+
final static WeakHashMap<Thread, AsyncServer> mServers = new WeakHashMap<Thread, AsyncServer>();
private boolean addMe() {
@@ -527,7 +539,7 @@ public class AsyncServer {
public static AsyncServer getCurrentThreadServer() {
return mServers.get(Thread.currentThread());
}
-
+
Thread mAffinity;
private void run(boolean newThread) {
final SelectorWrapper selector;
@@ -596,10 +608,10 @@ public class AsyncServer {
}
return;
}
-
+
run(this, selector, queue);
}
-
+
private static void run(final AsyncServer server, final SelectorWrapper selector, final PriorityQueue<Scheduled> queue) {
// Log.i(LOGTAG, "****AsyncServer is starting.****");
// at this point, this local queue and selector are owned
@@ -666,11 +678,11 @@ public class AsyncServer {
catch (Exception e) {
}
}
-
+
private static final long QUEUE_EMPTY = Long.MAX_VALUE;
private static long lockAndRunQueue(final AsyncServer server, final PriorityQueue<Scheduled> queue) {
long wait = QUEUE_EMPTY;
-
+
// find the first item we can actually run
while (true) {
Scheduled run = null;
@@ -689,10 +701,10 @@ public class AsyncServer {
}
}
}
-
+
if (run == null)
break;
-
+
run.runnable.run();
}
@@ -834,11 +846,11 @@ public class AsyncServer {
}
});
}
-
+
public Thread getAffinity() {
return mAffinity;
}
-
+
public boolean isAffinityThread() {
return mAffinity == Thread.currentThread();
}
@@ -847,4 +859,27 @@ public class AsyncServer {
Thread affinity = mAffinity;
return affinity == null || affinity == Thread.currentThread();
}
+
+ private static class NamedThreadFactory implements ThreadFactory {
+ private final ThreadGroup group;
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private final String namePrefix;
+
+ NamedThreadFactory(String namePrefix) {
+ SecurityManager s = System.getSecurityManager();
+ group = (s != null) ? s.getThreadGroup() :
+ Thread.currentThread().getThreadGroup();
+ this.namePrefix = namePrefix;
+ }
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(group, r,
+ namePrefix + threadNumber.getAndIncrement(), 0);
+ if (t.isDaemon()) t.setDaemon(false);
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+ return t;
+ }
+ }
}