executor.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. # -*- coding: utf-8 -*-
  2. from concurrent.futures import Executor, Future, ThreadPoolExecutor
  3. from types import TracebackType
  4. from typing import Any, Callable, Dict, Optional, Tuple, Type
  5. from pyee.base import EventEmitter
  6. Self = Any
  7. __all__ = ["ExecutorEventEmitter"]
  8. class ExecutorEventEmitter(EventEmitter):
  9. """An event emitter class which runs handlers in a `concurrent.futures`
  10. executor.
  11. By default, this class creates a default `ThreadPoolExecutor`, but
  12. a custom executor may also be passed in explicitly to, for instance,
  13. use a `ProcessPoolExecutor` instead.
  14. This class runs all emitted events on the configured executor. Errors
  15. captured by the resulting Future are automatically emitted on the
  16. `error` event. This is unlike the EventEmitter, which have no error
  17. handling.
  18. The underlying executor may be shut down by calling the `shutdown`
  19. method. Alternately you can treat the event emitter as a context manager:
  20. ```py
  21. with ExecutorEventEmitter() as ee:
  22. # Underlying executor open
  23. @ee.on('data')
  24. def handler(data):
  25. print(data)
  26. ee.emit('event')
  27. # Underlying executor closed
  28. ```
  29. Since the function call is scheduled on an executor, emit is always
  30. non-blocking.
  31. No effort is made to ensure thread safety, beyond using an executor.
  32. """
  33. def __init__(self: Self, executor: Optional[Executor] = None) -> None:
  34. super(ExecutorEventEmitter, self).__init__()
  35. if executor:
  36. self._executor: Executor = executor
  37. else:
  38. self._executor = ThreadPoolExecutor()
  39. def _emit_run(
  40. self: Self,
  41. f: Callable,
  42. args: Tuple[Any, ...],
  43. kwargs: Dict[str, Any],
  44. ) -> None:
  45. future: Future = self._executor.submit(f, *args, **kwargs)
  46. @future.add_done_callback
  47. def _callback(f: Future) -> None:
  48. exc: Optional[BaseException] = f.exception()
  49. if isinstance(exc, Exception):
  50. self.emit("error", exc)
  51. elif exc is not None:
  52. raise exc
  53. def shutdown(self: Self, wait: bool = True) -> None:
  54. """Call `shutdown` on the internal executor."""
  55. self._executor.shutdown(wait=wait)
  56. def __enter__(self: Self) -> "ExecutorEventEmitter":
  57. return self
  58. def __exit__(
  59. self: Self, type: Type[Exception], value: Exception, traceback: TracebackType
  60. ) -> Optional[bool]:
  61. self.shutdown()
  62. return None