diff options
author | Justin Huntington <justinhuntington@gmail.com> | 2014-10-23 19:37:47 -0400 |
---|---|---|
committer | Justin Huntington <justinhuntington@gmail.com> | 2014-10-24 19:35:18 -0400 |
commit | 910e3c8b05e2f46115704d9b6049c1dc46dbb18a (patch) | |
tree | cd2e2eb9e01294dd0f3858a19744b36665484255 | |
parent | 806eb7a8b55fe345bbcd0a2a229333b0b5bf67a5 (diff) | |
download | AndroidAsync-910e3c8b05e2f46115704d9b6049c1dc46dbb18a.tar.gz AndroidAsync-910e3c8b05e2f46115704d9b6049c1dc46dbb18a.tar.bz2 AndroidAsync-910e3c8b05e2f46115704d9b6049c1dc46dbb18a.zip |
name threads created by AsyncServer.synchronousWorkers
-rw-r--r-- | AndroidAsync/src/com/koushikdutta/async/AsyncServer.java | 75 |
1 files changed, 55 insertions, 20 deletions
diff --git a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java index 4503964..1d7ecff 100644 --- a/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java +++ b/AndroidAsync/src/com/koushikdutta/async/AsyncServer.java @@ -31,7 +31,12 @@ import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; public class AsyncServer { public static final String LOGTAG = "NIO"; @@ -85,7 +90,7 @@ public class AsyncServer { catch (Throwable ex) { } } - + static AsyncServer mInstance = new AsyncServer(); public static AsyncServer getDefault() { return mInstance; @@ -114,7 +119,7 @@ public class AsyncServer { ckey.attach(handler); handler.setup(this, ckey); } - + public void removeAllCallbacks(Object scheduled) { synchronized (this) { mQueue.remove(scheduled); @@ -134,7 +139,7 @@ public class AsyncServer { } }); } - + public Object postDelayed(Runnable runnable, long delay) { Scheduled s; synchronized (this) { @@ -161,11 +166,11 @@ public class AsyncServer { } return s; } - + public Object post(Runnable runnable) { return postDelayed(runnable, 0); } - + public Object post(final CompletedCallback callback, final Exception e) { return post(new Runnable() { @Override @@ -174,7 +179,7 @@ public class AsyncServer { } }); } - + public void run(final Runnable runnable) { if (Thread.currentThread() == mAffinity) { post(runnable); @@ -263,7 +268,7 @@ public class AsyncServer { catch (Exception e) { } } - + protected void onDataReceived(int transmitted) { } @@ -335,7 +340,7 @@ public class AsyncServer { SocketChannel socket; ConnectCallback callback; } - + private ConnectFuture connectResolvedInetSocketAddress(final InetSocketAddress address, final ConnectCallback callback) { final ConnectFuture cancel = new ConnectFuture(); assert !address.isUnresolved(); @@ -396,7 +401,14 @@ public class AsyncServer { return connectSocket(InetSocketAddress.createUnresolved(host, port), callback); } - private static ExecutorService synchronousWorkers = Executors.newFixedThreadPool(4); + private static ExecutorService newSynchronousWorkers() { + ThreadFactory tf = new NamedThreadFactory("AsyncServer-worker-"); + ThreadPoolExecutor tpe = new ThreadPoolExecutor(4, 4, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), tf); + return tpe; + } + + private static ExecutorService synchronousWorkers = newSynchronousWorkers(); public Future<InetAddress[]> getAllByName(final String host) { final SimpleFuture<InetAddress[]> ret = new SimpleFuture<InetAddress[]>(); synchronousWorkers.execute(new Runnable() { @@ -487,7 +499,7 @@ public class AsyncServer { }); return handler; } - + public AsyncDatagramSocket connectDatagram(final SocketAddress remote) throws IOException { final DatagramChannel socket = DatagramChannel.open(); final AsyncDatagramSocket handler = new AsyncDatagramSocket(); @@ -509,7 +521,7 @@ public class AsyncServer { }); return handler; } - + final static WeakHashMap<Thread, AsyncServer> mServers = new WeakHashMap<Thread, AsyncServer>(); private boolean addMe() { @@ -527,7 +539,7 @@ public class AsyncServer { public static AsyncServer getCurrentThreadServer() { return mServers.get(Thread.currentThread()); } - + Thread mAffinity; private void run(boolean newThread) { final SelectorWrapper selector; @@ -596,10 +608,10 @@ public class AsyncServer { } return; } - + run(this, selector, queue); } - + private static void run(final AsyncServer server, final SelectorWrapper selector, final PriorityQueue<Scheduled> queue) { // Log.i(LOGTAG, "****AsyncServer is starting.****"); // at this point, this local queue and selector are owned @@ -666,11 +678,11 @@ public class AsyncServer { catch (Exception e) { } } - + private static final long QUEUE_EMPTY = Long.MAX_VALUE; private static long lockAndRunQueue(final AsyncServer server, final PriorityQueue<Scheduled> queue) { long wait = QUEUE_EMPTY; - + // find the first item we can actually run while (true) { Scheduled run = null; @@ -689,10 +701,10 @@ public class AsyncServer { } } } - + if (run == null) break; - + run.runnable.run(); } @@ -834,11 +846,11 @@ public class AsyncServer { } }); } - + public Thread getAffinity() { return mAffinity; } - + public boolean isAffinityThread() { return mAffinity == Thread.currentThread(); } @@ -847,4 +859,27 @@ public class AsyncServer { Thread affinity = mAffinity; return affinity == null || affinity == Thread.currentThread(); } + + private static class NamedThreadFactory implements ThreadFactory { + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + NamedThreadFactory(String namePrefix) { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + this.namePrefix = namePrefix; + } + + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + namePrefix + threadNumber.getAndIncrement(), 0); + if (t.isDaemon()) t.setDaemon(false); + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + } } |