asyncio.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. # -*- coding: utf-8 -*-
  2. from asyncio import AbstractEventLoop, ensure_future, Future, iscoroutine, wait
  3. from typing import Any, Callable, cast, Dict, Optional, Set, Tuple
  4. from pyee.base import EventEmitter
# Loose alias used to annotate the ``self`` parameter of the methods below.
Self = Any

# Names exported by a star-import of this module.
__all__ = ["AsyncIOEventEmitter"]
  7. class AsyncIOEventEmitter(EventEmitter):
  8. """An event emitter class which can run asyncio coroutines in addition to
  9. synchronous blocking functions. For example:
  10. ```py
  11. @ee.on('event')
  12. async def async_handler(*args, **kwargs):
  13. await returns_a_future()
  14. ```
  15. On emit, the event emitter will automatically schedule the coroutine using
  16. `asyncio.ensure_future` and the configured event loop (defaults to
  17. `asyncio.get_event_loop()`).
  18. Unlike the case with the EventEmitter, all exceptions raised by
  19. event handlers are automatically emitted on the `error` event. This is
  20. important for asyncio coroutines specifically but is also handled for
  21. synchronous functions for consistency.
  22. When `loop` is specified, the supplied event loop will be used when
  23. scheduling work with `ensure_future`. Otherwise, the default asyncio
  24. event loop is used.
  25. For asyncio coroutine event handlers, calling emit is non-blocking.
  26. In other words, you do not have to await any results from emit, and the
  27. coroutine is scheduled in a fire-and-forget fashion.
  28. """
  29. def __init__(self: Self, loop: Optional[AbstractEventLoop] = None) -> None:
  30. super(AsyncIOEventEmitter, self).__init__()
  31. self._loop: Optional[AbstractEventLoop] = loop
  32. self._waiting: Set[Future] = set()
  33. def emit(
  34. self: Self,
  35. event: str,
  36. *args: Any,
  37. **kwargs: Any,
  38. ) -> bool:
  39. """Emit `event`, passing `*args` and `**kwargs` to each attached
  40. function or coroutine. Returns `True` if any functions are attached to
  41. `event`; otherwise returns `False`.
  42. Example:
  43. ```py
  44. ee.emit('data', '00101001')
  45. ```
  46. Assuming `data` is an attached function, this will call
  47. `data('00101001')'`.
  48. When executing coroutine handlers, their respective futures will be
  49. stored in a "waiting" state. These futures may be waited on or
  50. canceled with `wait_for_complete` and `cancel`, respectively; and
  51. their status may be checked via the `complete` property.
  52. """
  53. return super().emit(event, *args, **kwargs)
  54. def _emit_run(
  55. self: Self,
  56. f: Callable,
  57. args: Tuple[Any, ...],
  58. kwargs: Dict[str, Any],
  59. ) -> None:
  60. try:
  61. coro: Any = f(*args, **kwargs)
  62. except Exception as exc:
  63. self.emit("error", exc)
  64. else:
  65. if iscoroutine(coro):
  66. if self._loop:
  67. # ensure_future is *extremely* cranky about the types here,
  68. # but this is relatively well-tested and I think the types
  69. # are more strict than they should be
  70. fut: Any = ensure_future(cast(Any, coro), loop=self._loop)
  71. else:
  72. fut = ensure_future(cast(Any, coro))
  73. elif isinstance(coro, Future):
  74. fut = cast(Any, coro)
  75. else:
  76. return
  77. def callback(f: Future) -> None:
  78. self._waiting.discard(f)
  79. if f.cancelled():
  80. return
  81. exc: Optional[BaseException] = f.exception()
  82. if exc:
  83. self.emit("error", exc)
  84. fut.add_done_callback(callback)
  85. self._waiting.add(fut)
  86. async def wait_for_complete(self: Self) -> None:
  87. """Waits for all pending tasks to complete. For example:
  88. ```py
  89. @ee.on('event')
  90. async def async_handler(*args, **kwargs):
  91. await returns_a_future()
  92. # Triggers execution of async_handler
  93. ee.emit('data', '00101001')
  94. await ee.wait_for_complete()
  95. # async_handler has completed execution
  96. ```
  97. This is useful if you're attempting a graceful shutdown of your
  98. application and want to ensure all coroutines have completed execution
  99. beforehand.
  100. """
  101. if self._waiting:
  102. await wait(self._waiting)
  103. def cancel(self: Self) -> None:
  104. """Cancel all pending tasks. For example:
  105. ```py
  106. @ee.on('event')
  107. async def async_handler(*args, **kwargs):
  108. await returns_a_future()
  109. # Triggers execution of async_handler
  110. ee.emit('data', '00101001')
  111. ee.cancel()
  112. # async_handler execution has been canceled
  113. ```
  114. This is useful if you're attempting to shut down your application and
  115. attempts at a graceful shutdown via `wait_for_complete` have failed.
  116. """
  117. for fut in self._waiting:
  118. if not fut.done() and not fut.cancelled():
  119. fut.cancel()
  120. self._waiting.clear()
  121. @property
  122. def complete(self: Self) -> bool:
  123. """When true, there are no pending tasks, and execution is complete.
  124. For example:
  125. ```py
  126. @ee.on('event')
  127. async def async_handler(*args, **kwargs):
  128. await returns_a_future()
  129. # Triggers execution of async_handler
  130. ee.emit('data', '00101001')
  131. # async_handler is still running, so this prints False
  132. print(ee.complete)
  133. await ee.wait_for_complete()
  134. # async_handler has completed execution, so this prints True
  135. print(ee.complete)
  136. ```
  137. """
  138. return not self._waiting