| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- # -*- coding: utf-8 -*-
- from asyncio import AbstractEventLoop, ensure_future, Future, iscoroutine, wait
- from typing import Any, Callable, cast, Dict, Optional, Set, Tuple
- from pyee.base import EventEmitter
- Self = Any
- __all__ = ["AsyncIOEventEmitter"]
- class AsyncIOEventEmitter(EventEmitter):
- """An event emitter class which can run asyncio coroutines in addition to
- synchronous blocking functions. For example:
- ```py
- @ee.on('event')
- async def async_handler(*args, **kwargs):
- await returns_a_future()
- ```
- On emit, the event emitter will automatically schedule the coroutine using
- `asyncio.ensure_future` and the configured event loop (defaults to
- `asyncio.get_event_loop()`).
- Unlike the case with the EventEmitter, all exceptions raised by
- event handlers are automatically emitted on the `error` event. This is
- important for asyncio coroutines specifically but is also handled for
- synchronous functions for consistency.
- When `loop` is specified, the supplied event loop will be used when
- scheduling work with `ensure_future`. Otherwise, the default asyncio
- event loop is used.
- For asyncio coroutine event handlers, calling emit is non-blocking.
- In other words, you do not have to await any results from emit, and the
- coroutine is scheduled in a fire-and-forget fashion.
- """
- def __init__(self: Self, loop: Optional[AbstractEventLoop] = None) -> None:
- super(AsyncIOEventEmitter, self).__init__()
- self._loop: Optional[AbstractEventLoop] = loop
- self._waiting: Set[Future] = set()
- def emit(
- self: Self,
- event: str,
- *args: Any,
- **kwargs: Any,
- ) -> bool:
- """Emit `event`, passing `*args` and `**kwargs` to each attached
- function or coroutine. Returns `True` if any functions are attached to
- `event`; otherwise returns `False`.
- Example:
- ```py
- ee.emit('data', '00101001')
- ```
- Assuming `data` is an attached function, this will call
- `data('00101001')'`.
- When executing coroutine handlers, their respective futures will be
- stored in a "waiting" state. These futures may be waited on or
- canceled with `wait_for_complete` and `cancel`, respectively; and
- their status may be checked via the `complete` property.
- """
- return super().emit(event, *args, **kwargs)
- def _emit_run(
- self: Self,
- f: Callable,
- args: Tuple[Any, ...],
- kwargs: Dict[str, Any],
- ) -> None:
- try:
- coro: Any = f(*args, **kwargs)
- except Exception as exc:
- self.emit("error", exc)
- else:
- if iscoroutine(coro):
- if self._loop:
- # ensure_future is *extremely* cranky about the types here,
- # but this is relatively well-tested and I think the types
- # are more strict than they should be
- fut: Any = ensure_future(cast(Any, coro), loop=self._loop)
- else:
- fut = ensure_future(cast(Any, coro))
- elif isinstance(coro, Future):
- fut = cast(Any, coro)
- else:
- return
- def callback(f: Future) -> None:
- self._waiting.discard(f)
- if f.cancelled():
- return
- exc: Optional[BaseException] = f.exception()
- if exc:
- self.emit("error", exc)
- fut.add_done_callback(callback)
- self._waiting.add(fut)
- async def wait_for_complete(self: Self) -> None:
- """Waits for all pending tasks to complete. For example:
- ```py
- @ee.on('event')
- async def async_handler(*args, **kwargs):
- await returns_a_future()
- # Triggers execution of async_handler
- ee.emit('data', '00101001')
- await ee.wait_for_complete()
- # async_handler has completed execution
- ```
- This is useful if you're attempting a graceful shutdown of your
- application and want to ensure all coroutines have completed execution
- beforehand.
- """
- if self._waiting:
- await wait(self._waiting)
- def cancel(self: Self) -> None:
- """Cancel all pending tasks. For example:
- ```py
- @ee.on('event')
- async def async_handler(*args, **kwargs):
- await returns_a_future()
- # Triggers execution of async_handler
- ee.emit('data', '00101001')
- ee.cancel()
- # async_handler execution has been canceled
- ```
- This is useful if you're attempting to shut down your application and
- attempts at a graceful shutdown via `wait_for_complete` have failed.
- """
- for fut in self._waiting:
- if not fut.done() and not fut.cancelled():
- fut.cancel()
- self._waiting.clear()
- @property
- def complete(self: Self) -> bool:
- """When true, there are no pending tasks, and execution is complete.
- For example:
- ```py
- @ee.on('event')
- async def async_handler(*args, **kwargs):
- await returns_a_future()
- # Triggers execution of async_handler
- ee.emit('data', '00101001')
- # async_handler is still running, so this prints False
- print(ee.complete)
- await ee.wait_for_complete()
- # async_handler has completed execution, so this prints True
- print(ee.complete)
- ```
- """
- return not self._waiting
|