aboutsummaryrefslogtreecommitdiffstats
path: root/test_futures.py
diff options
context:
space:
mode:
Diffstat (limited to 'test_futures.py')
-rw-r--r--test_futures.py804
1 files changed, 804 insertions, 0 deletions
diff --git a/test_futures.py b/test_futures.py
new file mode 100644
index 0000000..d5b3499
--- /dev/null
+++ b/test_futures.py
@@ -0,0 +1,804 @@
+import os
+import subprocess
+import sys
+import threading
+import functools
+import contextlib
+import logging
+import re
+import time
+import gc
+import traceback
+from StringIO import StringIO
+from test import test_support
+
+from concurrent import futures
+from concurrent.futures._base import (
+ PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
+from concurrent.futures.thread import cpu_count
+
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+
+def reap_threads(func):
+ """Use this function when threads are being used. This will
+ ensure that the threads are cleaned up even when the test fails.
+ If threading is unavailable this function does nothing.
+ """
+ @functools.wraps(func)
+ def decorator(*args):
+ key = test_support.threading_setup()
+ try:
+ return func(*args)
+ finally:
+ test_support.threading_cleanup(*key)
+ return decorator
+
+
+# Executing the interpreter in a subprocess
+def _assert_python(expected_success, *args, **env_vars):
+ cmd_line = [sys.executable]
+ if not env_vars:
+ cmd_line.append('-E')
+ # Need to preserve the original environment, for in-place testing of
+ # shared library builds.
+ env = os.environ.copy()
+ # But a special flag that can be set to override -- in this case, the
+ # caller is responsible to pass the full environment.
+ if env_vars.pop('__cleanenv', None):
+ env = {}
+ env.update(env_vars)
+ cmd_line.extend(args)
+ p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ env=env)
+ try:
+ out, err = p.communicate()
+ finally:
+ subprocess._cleanup()
+ p.stdout.close()
+ p.stderr.close()
+ rc = p.returncode
+ err = strip_python_stderr(err)
+ if (rc and expected_success) or (not rc and not expected_success):
+ raise AssertionError(
+ "Process return code is %d, "
+ "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
+ return rc, out, err
+
+
+def assert_python_ok(*args, **env_vars):
+ """
+ Assert that running the interpreter with `args` and optional environment
+ variables `env_vars` is ok and return a (return code, stdout, stderr) tuple.
+ """
+ return _assert_python(True, *args, **env_vars)
+
+
+def strip_python_stderr(stderr):
+ """Strip the stderr of a Python process from potential debug output
+ emitted by the interpreter.
+
+ This will typically be run on the result of the communicate() method
+ of a subprocess.Popen object.
+ """
+ stderr = re.sub(r"\[\d+ refs\]\r?\n?$".encode(), "".encode(), stderr).strip()
+ return stderr
+
+
+@contextlib.contextmanager
+def captured_stderr():
+ """Return a context manager used by captured_stdout/stdin/stderr
+ that temporarily replaces the sys stream *stream_name* with a StringIO."""
+ logging_stream = StringIO()
+ handler = logging.StreamHandler(logging_stream)
+ logging.root.addHandler(handler)
+
+ try:
+ yield logging_stream
+ finally:
+ logging.root.removeHandler(handler)
+
+
+def create_future(state=PENDING, exception=None, result=None):
+ f = Future()
+ f._state = state
+ f._exception = exception
+ f._result = result
+ return f
+
+
+PENDING_FUTURE = create_future(state=PENDING)
+RUNNING_FUTURE = create_future(state=RUNNING)
+CANCELLED_FUTURE = create_future(state=CANCELLED)
+CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
+EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
+SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
+
+
+def mul(x, y):
+ return x * y
+
+
+def sleep_and_raise(t):
+ time.sleep(t)
+ raise Exception('this is an exception')
+
+def sleep_and_print(t, msg):
+ time.sleep(t)
+ print(msg)
+ sys.stdout.flush()
+
+
+class ExecutorMixin:
+ worker_count = 5
+
+ def setUp(self):
+ self.t1 = time.time()
+ try:
+ self.executor = self.executor_type(max_workers=self.worker_count)
+ except NotImplementedError:
+ e = sys.exc_info()[1]
+ self.skipTest(str(e))
+ self._prime_executor()
+
+ def tearDown(self):
+ self.executor.shutdown(wait=True)
+ dt = time.time() - self.t1
+ if test_support.verbose:
+ print("%.2fs" % dt)
+ self.assertLess(dt, 60, "synchronization issue: test lasted too long")
+
+ def _prime_executor(self):
+ # Make sure that the executor is ready to do work before running the
+ # tests. This should reduce the probability of timeouts in the tests.
+ futures = [self.executor.submit(time.sleep, 0.1)
+ for _ in range(self.worker_count)]
+
+ for f in futures:
+ f.result()
+
+
+class ThreadPoolMixin(ExecutorMixin):
+ executor_type = futures.ThreadPoolExecutor
+
+
+class ProcessPoolMixin(ExecutorMixin):
+ executor_type = futures.ProcessPoolExecutor
+
+
+class ExecutorShutdownTest(unittest.TestCase):
+ def test_run_after_shutdown(self):
+ self.executor.shutdown()
+ self.assertRaises(RuntimeError,
+ self.executor.submit,
+ pow, 2, 5)
+
+ def test_interpreter_shutdown(self):
+ # Test the atexit hook for shutdown of worker threads and processes
+ rc, out, err = assert_python_ok('-c', """if 1:
+ from concurrent.futures import %s
+ from time import sleep
+ from test_futures import sleep_and_print
+ t = %s(5)
+ t.submit(sleep_and_print, 1.0, "apple")
+ """ % (self.executor_type.__name__, self.executor_type.__name__))
+ # Errors in atexit hooks don't change the process exit code, check
+ # stderr manually.
+ self.assertFalse(err)
+ self.assertEqual(out.strip(), "apple".encode())
+
+ def test_hang_issue12364(self):
+ fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
+ self.executor.shutdown()
+ for f in fs:
+ f.result()
+
+
+class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
+ def _prime_executor(self):
+ pass
+
+ def test_threads_terminate(self):
+ self.executor.submit(mul, 21, 2)
+ self.executor.submit(mul, 6, 7)
+ self.executor.submit(mul, 3, 14)
+ self.assertEqual(len(self.executor._threads), 3)
+ self.executor.shutdown()
+ for t in self.executor._threads:
+ t.join()
+
+ def test_context_manager_shutdown(self):
+ with futures.ThreadPoolExecutor(max_workers=5) as e:
+ executor = e
+ self.assertEqual(list(e.map(abs, range(-5, 5))),
+ [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
+
+ for t in executor._threads:
+ t.join()
+
+ def test_del_shutdown(self):
+ executor = futures.ThreadPoolExecutor(max_workers=5)
+ executor.map(abs, range(-5, 5))
+ threads = executor._threads
+ del executor
+ gc.collect()
+
+ for t in threads:
+ t.join()
+
+ def test_thread_names_assigned(self):
+ executor = futures.ThreadPoolExecutor(
+ max_workers=5, thread_name_prefix='SpecialPool')
+ executor.map(abs, range(-5, 5))
+ threads = executor._threads
+ del executor
+ gc.collect()
+
+ for t in threads:
+ self.assertRegexpMatches(t.name, r'^SpecialPool_[0-4]$')
+ t.join()
+
+ def test_thread_names_default(self):
+ executor = futures.ThreadPoolExecutor(max_workers=5)
+ executor.map(abs, range(-5, 5))
+ threads = executor._threads
+ del executor
+ gc.collect()
+
+ for t in threads:
+ # Ensure that our default name is reasonably sane and unique when
+ # no thread_name_prefix was supplied.
+ self.assertRegexpMatches(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
+ t.join()
+
+
+class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
+ def _prime_executor(self):
+ pass
+
+ def test_processes_terminate(self):
+ self.executor.submit(mul, 21, 2)
+ self.executor.submit(mul, 6, 7)
+ self.executor.submit(mul, 3, 14)
+ self.assertEqual(len(self.executor._processes), 5)
+ processes = self.executor._processes
+ self.executor.shutdown()
+
+ for p in processes:
+ p.join()
+
+ def test_context_manager_shutdown(self):
+ with futures.ProcessPoolExecutor(max_workers=5) as e:
+ processes = e._processes
+ self.assertEqual(list(e.map(abs, range(-5, 5))),
+ [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
+
+ for p in processes:
+ p.join()
+
+ def test_del_shutdown(self):
+ executor = futures.ProcessPoolExecutor(max_workers=5)
+ list(executor.map(abs, range(-5, 5)))
+ queue_management_thread = executor._queue_management_thread
+ processes = executor._processes
+ del executor
+ gc.collect()
+
+ queue_management_thread.join()
+ for p in processes:
+ p.join()
+
+
+class WaitTests(unittest.TestCase):
+
+ def test_first_completed(self):
+ future1 = self.executor.submit(mul, 21, 2)
+ future2 = self.executor.submit(time.sleep, 1.5)
+
+ done, not_done = futures.wait(
+ [CANCELLED_FUTURE, future1, future2],
+ return_when=futures.FIRST_COMPLETED)
+
+ self.assertEqual(set([future1]), done)
+ self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
+
+ def test_first_completed_some_already_completed(self):
+ future1 = self.executor.submit(time.sleep, 1.5)
+
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
+ return_when=futures.FIRST_COMPLETED)
+
+ self.assertEqual(
+ set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
+ finished)
+ self.assertEqual(set([future1]), pending)
+
+ def test_first_exception(self):
+ future1 = self.executor.submit(mul, 2, 21)
+ future2 = self.executor.submit(sleep_and_raise, 1.5)
+ future3 = self.executor.submit(time.sleep, 3)
+
+ finished, pending = futures.wait(
+ [future1, future2, future3],
+ return_when=futures.FIRST_EXCEPTION)
+
+ self.assertEqual(set([future1, future2]), finished)
+ self.assertEqual(set([future3]), pending)
+
+ def test_first_exception_some_already_complete(self):
+ future1 = self.executor.submit(divmod, 21, 0)
+ future2 = self.executor.submit(time.sleep, 1.5)
+
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1, future2],
+ return_when=futures.FIRST_EXCEPTION)
+
+ self.assertEqual(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1]), finished)
+ self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
+
+ def test_first_exception_one_already_failed(self):
+ future1 = self.executor.submit(time.sleep, 2)
+
+ finished, pending = futures.wait(
+ [EXCEPTION_FUTURE, future1],
+ return_when=futures.FIRST_EXCEPTION)
+
+ self.assertEqual(set([EXCEPTION_FUTURE]), finished)
+ self.assertEqual(set([future1]), pending)
+
+ def test_all_completed(self):
+ future1 = self.executor.submit(divmod, 2, 0)
+ future2 = self.executor.submit(mul, 2, 21)
+
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ future1,
+ future2],
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEqual(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ future1,
+ future2]), finished)
+ self.assertEqual(set(), pending)
+
+ def test_timeout(self):
+ future1 = self.executor.submit(mul, 6, 7)
+ future2 = self.executor.submit(time.sleep, 3)
+
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2],
+ timeout=1.5,
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1]), finished)
+ self.assertEqual(set([future2]), pending)
+
+
+class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
+
+ def test_pending_calls_race(self):
+ # Issue #14406: multi-threaded race condition when waiting on all
+ # futures.
+ event = threading.Event()
+ def future_func():
+ event.wait()
+ oldswitchinterval = sys.getcheckinterval()
+ sys.setcheckinterval(1)
+ try:
+ fs = set(self.executor.submit(future_func) for i in range(100))
+ event.set()
+ futures.wait(fs, return_when=futures.ALL_COMPLETED)
+ finally:
+ sys.setcheckinterval(oldswitchinterval)
+
+
+class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
+ pass
+
+
+class AsCompletedTests(unittest.TestCase):
+ # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
+ def test_no_timeout(self):
+ future1 = self.executor.submit(mul, 2, 21)
+ future2 = self.executor.submit(mul, 7, 6)
+
+ completed = set(futures.as_completed(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2]))
+ self.assertEqual(set(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2]),
+ completed)
+
+ def test_zero_timeout(self):
+ future1 = self.executor.submit(time.sleep, 2)
+ completed_futures = set()
+ try:
+ for future in futures.as_completed(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1],
+ timeout=0):
+ completed_futures.add(future)
+ except futures.TimeoutError:
+ pass
+
+ self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE]),
+ completed_futures)
+
+ def test_duplicate_futures(self):
+ # Issue 20367. Duplicate futures should not raise exceptions or give
+ # duplicate responses.
+ future1 = self.executor.submit(time.sleep, 2)
+ completed = [f for f in futures.as_completed([future1,future1])]
+ self.assertEqual(len(completed), 1)
+
+
+class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
+ pass
+
+
+class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
+ pass
+
+
+class ExecutorTest(unittest.TestCase):
+ # Executor.shutdown() and context manager usage is tested by
+ # ExecutorShutdownTest.
+ def test_submit(self):
+ future = self.executor.submit(pow, 2, 8)
+ self.assertEqual(256, future.result())
+
+ def test_submit_keyword(self):
+ future = self.executor.submit(mul, 2, y=8)
+ self.assertEqual(16, future.result())
+
+ def test_map(self):
+ self.assertEqual(
+ list(self.executor.map(pow, range(10), range(10))),
+ list(map(pow, range(10), range(10))))
+
+ def test_map_exception(self):
+ i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
+ self.assertEqual(next(i), (0, 1))
+ self.assertEqual(next(i), (0, 1))
+ self.assertRaises(ZeroDivisionError, next, i)
+
+ def test_map_timeout(self):
+ results = []
+ try:
+ for i in self.executor.map(time.sleep,
+ [0, 0, 3],
+ timeout=1.5):
+ results.append(i)
+ except futures.TimeoutError:
+ pass
+ else:
+ self.fail('expected TimeoutError')
+
+ self.assertEqual([None, None], results)
+
+ def test_max_workers_negative(self):
+ for number in (0, -1):
+ with self.assertRaises(ValueError) as cm:
+ self.executor_type(max_workers=number)
+
+ assert str(cm.exception) == "max_workers must be greater than 0"
+
+
+class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
+ def test_map_submits_without_iteration(self):
+ """Tests verifying issue 11777."""
+ finished = []
+ def record_finished(n):
+ finished.append(n)
+
+ self.executor.map(record_finished, range(10))
+ self.executor.shutdown(wait=True)
+ self.assertEqual(len(finished), 10)
+
+ def test_default_workers(self):
+ executor = self.executor_type()
+ self.assertEqual(executor._max_workers,
+ (cpu_count() or 1) * 5)
+
+
+class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
+ pass
+
+
+class FutureTests(unittest.TestCase):
+ def test_done_callback_with_result(self):
+ callback_result = [None]
+ def fn(callback_future):
+ callback_result[0] = callback_future.result()
+
+ f = Future()
+ f.add_done_callback(fn)
+ f.set_result(5)
+ self.assertEqual(5, callback_result[0])
+
+ def test_done_callback_with_exception(self):
+ callback_exception = [None]
+ def fn(callback_future):
+ callback_exception[0] = callback_future.exception()
+
+ f = Future()
+ f.add_done_callback(fn)
+ f.set_exception(Exception('test'))
+ self.assertEqual(('test',), callback_exception[0].args)
+
+ def test_done_callback_with_cancel(self):
+ was_cancelled = [None]
+ def fn(callback_future):
+ was_cancelled[0] = callback_future.cancelled()
+
+ f = Future()
+ f.add_done_callback(fn)
+ self.assertTrue(f.cancel())
+ self.assertTrue(was_cancelled[0])
+
+ def test_done_callback_raises(self):
+ with captured_stderr() as stderr:
+ raising_was_called = [False]
+ raising_old_style_was_called = [False]
+ fn_was_called = [False]
+
+ def raising_fn(callback_future):
+ raising_was_called[0] = True
+ raise Exception('doh!')
+
+ def raising_old_style_fn(callback_future):
+ raising_old_style_was_called[0] = True
+ class OldStyle: # Does not inherit from object
+ def __str__(self):
+ return 'doh!'
+ raise OldStyle()
+
+ def fn(callback_future):
+ fn_was_called[0] = True
+
+ f = Future()
+ f.add_done_callback(raising_fn)
+ f.add_done_callback(raising_old_style_fn)
+ f.add_done_callback(fn)
+ f.set_result(5)
+ self.assertTrue(raising_was_called)
+ self.assertTrue(raising_old_style_was_called)
+ self.assertTrue(fn_was_called)
+ self.assertIn('Exception: doh!', stderr.getvalue())
+ self.assertIn('OldStyle: doh!', stderr.getvalue())
+
+ def test_done_callback_already_successful(self):
+ callback_result = [None]
+ def fn(callback_future):
+ callback_result[0] = callback_future.result()
+
+ f = Future()
+ f.set_result(5)
+ f.add_done_callback(fn)
+ self.assertEqual(5, callback_result[0])
+
+ def test_done_callback_already_failed(self):
+ callback_exception = [None]
+ def fn(callback_future):
+ callback_exception[0] = callback_future.exception()
+
+ f = Future()
+ f.set_exception(Exception('test'))
+ f.add_done_callback(fn)
+ self.assertEqual(('test',), callback_exception[0].args)
+
+ def test_done_callback_already_cancelled(self):
+ was_cancelled = [None]
+ def fn(callback_future):
+ was_cancelled[0] = callback_future.cancelled()
+
+ f = Future()
+ self.assertTrue(f.cancel())
+ f.add_done_callback(fn)
+ self.assertTrue(was_cancelled[0])
+
+ def test_repr(self):
+ self.assertRegexpMatches(repr(PENDING_FUTURE),
+ '<Future at 0x[0-9a-f]+L? state=pending>')
+ self.assertRegexpMatches(repr(RUNNING_FUTURE),
+ '<Future at 0x[0-9a-f]+L? state=running>')
+ self.assertRegexpMatches(repr(CANCELLED_FUTURE),
+ '<Future at 0x[0-9a-f]+L? state=cancelled>')
+ self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
+ '<Future at 0x[0-9a-f]+L? state=cancelled>')
+ self.assertRegexpMatches(
+ repr(EXCEPTION_FUTURE),
+ '<Future at 0x[0-9a-f]+L? state=finished raised IOError>')
+ self.assertRegexpMatches(
+ repr(SUCCESSFUL_FUTURE),
+ '<Future at 0x[0-9a-f]+L? state=finished returned int>')
+
+ def test_cancel(self):
+ f1 = create_future(state=PENDING)
+ f2 = create_future(state=RUNNING)
+ f3 = create_future(state=CANCELLED)
+ f4 = create_future(state=CANCELLED_AND_NOTIFIED)
+ f5 = create_future(state=FINISHED, exception=IOError())
+ f6 = create_future(state=FINISHED, result=5)
+
+ self.assertTrue(f1.cancel())
+ self.assertEqual(f1._state, CANCELLED)
+
+ self.assertFalse(f2.cancel())
+ self.assertEqual(f2._state, RUNNING)
+
+ self.assertTrue(f3.cancel())
+ self.assertEqual(f3._state, CANCELLED)
+
+ self.assertTrue(f4.cancel())
+ self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
+
+ self.assertFalse(f5.cancel())
+ self.assertEqual(f5._state, FINISHED)
+
+ self.assertFalse(f6.cancel())
+ self.assertEqual(f6._state, FINISHED)
+
+ def test_cancelled(self):
+ self.assertFalse(PENDING_FUTURE.cancelled())
+ self.assertFalse(RUNNING_FUTURE.cancelled())
+ self.assertTrue(CANCELLED_FUTURE.cancelled())
+ self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
+ self.assertFalse(EXCEPTION_FUTURE.cancelled())
+ self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
+
+ def test_done(self):
+ self.assertFalse(PENDING_FUTURE.done())
+ self.assertFalse(RUNNING_FUTURE.done())
+ self.assertTrue(CANCELLED_FUTURE.done())
+ self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
+ self.assertTrue(EXCEPTION_FUTURE.done())
+ self.assertTrue(SUCCESSFUL_FUTURE.done())
+
+ def test_running(self):
+ self.assertFalse(PENDING_FUTURE.running())
+ self.assertTrue(RUNNING_FUTURE.running())
+ self.assertFalse(CANCELLED_FUTURE.running())
+ self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
+ self.assertFalse(EXCEPTION_FUTURE.running())
+ self.assertFalse(SUCCESSFUL_FUTURE.running())
+
+ def test_result_with_timeout(self):
+ self.assertRaises(futures.TimeoutError,
+ PENDING_FUTURE.result, timeout=0)
+ self.assertRaises(futures.TimeoutError,
+ RUNNING_FUTURE.result, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_FUTURE.result, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
+ self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
+ self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
+
+ def test_result_with_success(self):
+ # TODO(brian@sweetapp.com): This test is timing dependant.
+ def notification():
+ # Wait until the main thread is waiting for the result.
+ time.sleep(1)
+ f1.set_result(42)
+
+ f1 = create_future(state=PENDING)
+ t = threading.Thread(target=notification)
+ t.start()
+
+ self.assertEqual(f1.result(timeout=5), 42)
+
+ def test_result_with_cancel(self):
+ # TODO(brian@sweetapp.com): This test is timing dependant.
+ def notification():
+ # Wait until the main thread is waiting for the result.
+ time.sleep(1)
+ f1.cancel()
+
+ f1 = create_future(state=PENDING)
+ t = threading.Thread(target=notification)
+ t.start()
+
+ self.assertRaises(futures.CancelledError, f1.result, timeout=5)
+
+ def test_exception_with_timeout(self):
+ self.assertRaises(futures.TimeoutError,
+ PENDING_FUTURE.exception, timeout=0)
+ self.assertRaises(futures.TimeoutError,
+ RUNNING_FUTURE.exception, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_FUTURE.exception, timeout=0)
+ self.assertRaises(futures.CancelledError,
+ CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
+ self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
+ IOError))
+ self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
+
+ def test_exception_with_success(self):
+ def notification():
+ # Wait until the main thread is waiting for the exception.
+ time.sleep(1)
+ with f1._condition:
+ f1._state = FINISHED
+ f1._exception = IOError()
+ f1._condition.notify_all()
+
+ f1 = create_future(state=PENDING)
+ t = threading.Thread(target=notification)
+ t.start()
+
+ self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
+
+ def test_old_style_exception(self):
+ class OldStyle: # Does not inherit from object
+ def __str__(self):
+ return 'doh!'
+ callback_exc_info = [None]
+ def fn(callback_future):
+ callback_exc_info[0] = callback_future.exception_info()
+ f = Future()
+ f.add_done_callback(fn)
+ try:
+ raise OldStyle()
+ except OldStyle:
+ want_exc_info = sys.exc_info()
+ f.set_exception_info(*want_exc_info[1:])
+ self.assertEqual(f.exception_info(), want_exc_info[1:])
+ self.assertEqual(callback_exc_info[0], want_exc_info[1:])
+ try:
+ f.result()
+ except OldStyle:
+ got_exc_info = sys.exc_info()
+ else:
+ self.fail('OldStyle exception not raised')
+ self.assertEqual(got_exc_info[:2], want_exc_info[:2])
+ got_tb = traceback.extract_tb(got_exc_info[2])
+ want_tb = traceback.extract_tb(want_exc_info[2])
+ self.assertEqual(got_tb[-len(want_tb):], want_tb)
+
+@reap_threads
+def test_main():
+ try:
+ test_support.run_unittest(ProcessPoolExecutorTest,
+ ThreadPoolExecutorTest,
+ ProcessPoolWaitTests,
+ ThreadPoolWaitTests,
+ ProcessPoolAsCompletedTests,
+ ThreadPoolAsCompletedTests,
+ FutureTests,
+ ProcessPoolShutdownTest,
+ ThreadPoolShutdownTest)
+ finally:
+ test_support.reap_children()
+
+if __name__ == "__main__":
+ test_main()