trio.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. # -*- coding: utf-8 -*-
  2. from contextlib import AbstractAsyncContextManager, asynccontextmanager
  3. from types import TracebackType
  4. from typing import (
  5. Any,
  6. AsyncGenerator,
  7. Awaitable,
  8. Callable,
  9. cast,
  10. Dict,
  11. Optional,
  12. Tuple,
  13. Type,
  14. )
  15. import trio
  16. from pyee.base import EventEmitter, PyeeError
  # Public names exported by this module.
  __all__ = ["TrioEventEmitter"]

  # Alias of ``Any`` used to annotate the ``self`` parameter of methods.
  Self = Any

  # Short alias for trio's nursery type, used in annotations in this module.
  Nursery = trio.Nursery
  class TrioEventEmitter(EventEmitter):
      """An event emitter class which can run trio tasks in a trio nursery.

      By default, this class will lazily create both a nursery manager (the
      object returned from `trio.open_nursery()`) and a nursery (the object
      yielded by using the nursery manager as an async context manager). It is
      also possible to supply an existing nursery manager via the `manager`
      argument, or an existing nursery via the `nursery` argument.

      Instances of TrioEventEmitter are themselves async context managers, so
      that they may manage the lifecycle of the underlying trio nursery. For
      example, typical usage of this library may look something like this:

      ```py
      async with TrioEventEmitter() as ee:
          # Underlying nursery is instantiated and ready to go
          @ee.on('data')
          async def handler(data):
              print(data)

          ee.emit('data', 'some data')

      # Underlying nursery and manager have been cleaned up
      ```

      Unlike the case with the EventEmitter, all exceptions raised by event
      handlers are automatically emitted on the `error` event. This is
      important for trio coroutines specifically but is also handled for
      synchronous functions for consistency.

      For trio coroutine event handlers, calling emit is non-blocking. In other
      words, you should not attempt to await emit; the coroutine is scheduled
      in a fire-and-forget fashion.
      """

      def __init__(
          self: Self,
          nursery: Optional[Nursery] = None,
          manager: Optional["AbstractAsyncContextManager[trio.Nursery]"] = None,
      ):
          """Set up the emitter with a nursery, a nursery manager, or neither.

          Args:
              nursery: An already-open nursery to schedule handlers on.
              manager: A nursery manager that is entered when the emitter's
                  context is entered. When neither argument is given, a
                  manager is created with `trio.open_nursery()`.

          Raises:
              PyeeError: If both `nursery` and `manager` are supplied.
          """
          super().__init__()
          self._nursery: Optional[Nursery] = None
          self._manager: Optional["AbstractAsyncContextManager[trio.Nursery]"] = None
          # Initialised here so that __aexit__ without a prior __aenter__ raises
          # the intended PyeeError rather than an AttributeError.
          self._context: Optional[
              "AbstractAsyncContextManager[TrioEventEmitter]"
          ] = None

          if nursery is not None and manager is not None:
              raise PyeeError(
                  "You may either pass a nursery or a nursery manager " "but not both"
              )

          if nursery is not None:
              self._nursery = nursery
          elif manager is not None:
              self._manager = manager
          else:
              self._manager = trio.open_nursery()

      def _async_runner(
          self: Self,
          f: Callable,
          args: Tuple[Any, ...],
          kwargs: Dict[str, Any],
      ) -> Callable[[], Awaitable[None]]:
          """Wrap a handler call in a zero-argument async function.

          The returned function calls `f(*args, **kwargs)`, awaits the result
          when it is awaitable, and emits any `Exception` raised along the way
          on the `error` event.
          """
          # Function-scope import keeps this block self-contained.
          import inspect

          async def runner() -> None:
              try:
                  result = f(*args, **kwargs)
                  # Synchronous handlers return a plain value; awaiting it
                  # would raise a spurious TypeError, so only await awaitables.
                  if inspect.isawaitable(result):
                      await result
              except Exception as exc:
                  self.emit("error", exc)

          return runner

      def _emit_run(
          self: Self,
          f: Callable,
          args: Tuple[Any, ...],
          kwargs: Dict[str, Any],
      ) -> None:
          """Schedule handler `f` on the nursery without waiting for it.

          Raises:
              PyeeError: If no nursery is currently attached to the emitter.
          """
          if self._nursery is None:
              raise PyeeError("Uninitialized trio nursery")
          self._nursery.start_soon(self._async_runner(f, args, kwargs))

      @asynccontextmanager
      async def context(
          self: Self,
      ) -> AsyncGenerator["TrioEventEmitter", None]:
          """Returns an async contextmanager which manages the underlying
          nursery to the EventEmitter. The `TrioEventEmitter`'s
          async context management methods are implemented using this
          function, but it may also be used directly for clarity.

          Raises:
              PyeeError: If the emitter has neither a nursery nor a nursery
                  manager.
          """
          if self._nursery is not None:
              # A nursery is already attached; there is nothing to open here.
              yield self
          elif self._manager is not None:
              async with self._manager as nursery:
                  self._nursery = nursery
                  yield self
          else:
              raise PyeeError("Uninitialized nursery or nursery manager")

      async def __aenter__(self: Self) -> "TrioEventEmitter":
          """Enter the context returned by `context()` and return its result."""
          self._context = self.context()
          return await cast(Any, self._context).__aenter__()

      async def __aexit__(
          self: Self,
          type: Optional[Type[BaseException]],
          value: Optional[BaseException],
          traceback: Optional[TracebackType],
      ) -> Optional[bool]:
          """Exit the context entered by `__aenter__` and drop emitter state.

          Returns:
              Whatever the underlying context manager's `__aexit__` returns.

          Raises:
              PyeeError: If no context is currently entered.
          """
          if self._context is None:
              raise PyeeError("Attempting to exit uninitialized context")
          try:
              return await self._context.__aexit__(type, value, traceback)
          finally:
              # Reset even when the exit re-raises, so no stale context,
              # nursery or manager stays attached to the emitter.
              self._context = None
              self._nursery = None
              self._manager = None