current_thread_executor.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. import queue
  2. import sys
  3. import threading
  4. from concurrent.futures import Executor, Future
  5. from typing import TYPE_CHECKING, Any, Callable, TypeVar, Union
  6. if sys.version_info >= (3, 10):
  7. from typing import ParamSpec
  8. else:
  9. from typing_extensions import ParamSpec
# Generic placeholder type; not referenced in the visible part of this file.
_T = TypeVar("_T")
# Parameter specification of a submitted callable, so that the ``*args`` /
# ``**kwargs`` annotations below are tied to the callable's own signature.
_P = ParamSpec("_P")
# Return type of a submitted callable; also the result type of its Future.
_R = TypeVar("_R")
  13. class _WorkItem:
  14. """
  15. Represents an item needing to be run in the executor.
  16. Copied from ThreadPoolExecutor (but it's private, so we're not going to rely on importing it)
  17. """
  18. def __init__(
  19. self,
  20. future: "Future[_R]",
  21. fn: Callable[_P, _R],
  22. *args: _P.args,
  23. **kwargs: _P.kwargs,
  24. ):
  25. self.future = future
  26. self.fn = fn
  27. self.args = args
  28. self.kwargs = kwargs
  29. def run(self) -> None:
  30. __traceback_hide__ = True # noqa: F841
  31. if not self.future.set_running_or_notify_cancel():
  32. return
  33. try:
  34. result = self.fn(*self.args, **self.kwargs)
  35. except BaseException as exc:
  36. self.future.set_exception(exc)
  37. # Break a reference cycle with the exception 'exc'
  38. self = None # type: ignore[assignment]
  39. else:
  40. self.future.set_result(result)
  41. class CurrentThreadExecutor(Executor):
  42. """
  43. An Executor that actually runs code in the thread it is instantiated in.
  44. Passed to other threads running async code, so they can run sync code in
  45. the thread they came from.
  46. """
  47. def __init__(self) -> None:
  48. self._work_thread = threading.current_thread()
  49. self._work_queue: queue.Queue[Union[_WorkItem, "Future[Any]"]] = queue.Queue()
  50. self._broken = False
  51. def run_until_future(self, future: "Future[Any]") -> None:
  52. """
  53. Runs the code in the work queue until a result is available from the future.
  54. Should be run from the thread the executor is initialised in.
  55. """
  56. # Check we're in the right thread
  57. if threading.current_thread() != self._work_thread:
  58. raise RuntimeError(
  59. "You cannot run CurrentThreadExecutor from a different thread"
  60. )
  61. future.add_done_callback(self._work_queue.put)
  62. # Keep getting and running work items until we get the future we're waiting for
  63. # back via the future's done callback.
  64. try:
  65. while True:
  66. # Get a work item and run it
  67. work_item = self._work_queue.get()
  68. if work_item is future:
  69. return
  70. assert isinstance(work_item, _WorkItem)
  71. work_item.run()
  72. del work_item
  73. finally:
  74. self._broken = True
  75. def _submit(
  76. self,
  77. fn: Callable[_P, _R],
  78. *args: _P.args,
  79. **kwargs: _P.kwargs,
  80. ) -> "Future[_R]":
  81. # Check they're not submitting from the same thread
  82. if threading.current_thread() == self._work_thread:
  83. raise RuntimeError(
  84. "You cannot submit onto CurrentThreadExecutor from its own thread"
  85. )
  86. # Check they're not too late or the executor errored
  87. if self._broken:
  88. raise RuntimeError("CurrentThreadExecutor already quit or is broken")
  89. # Add to work queue
  90. f: "Future[_R]" = Future()
  91. work_item = _WorkItem(f, fn, *args, **kwargs)
  92. self._work_queue.put(work_item)
  93. # Return the future
  94. return f
  95. # Python 3.9+ has a new signature for submit with a "/" after `fn`, to enforce
  96. # it to be a positional argument. If we ignore[override] mypy on 3.9+ will be
  97. # happy but 3.7/3.8 will say that the ignore comment is unused, even when
  98. # defining them differently based on sys.version_info.
  99. # We should be able to remove this when we drop support for 3.7/3.8.
  100. if not TYPE_CHECKING:
  101. def submit(self, fn, *args, **kwargs):
  102. return self._submit(fn, *args, **kwargs)