twisted.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. # -*- coding: utf-8 -*-
  2. from asyncio import iscoroutine
  3. from typing import Any, Callable, cast, Dict, Optional, Tuple
  4. from twisted.internet.defer import Deferred, ensureDeferred
  5. from twisted.python.failure import Failure
  6. from pyee.base import EventEmitter, PyeeError
  7. Self = Any
  8. __all__ = ["TwistedEventEmitter"]
  9. class TwistedEventEmitter(EventEmitter):
  10. """An event emitter class which can run twisted coroutines and handle
  11. returned Deferreds, in addition to synchronous blocking functions. For
  12. example:
  13. ```py
  14. @ee.on('event')
  15. @inlineCallbacks
  16. def async_handler(*args, **kwargs):
  17. yield returns_a_deferred()
  18. ```
  19. or:
  20. ```py
  21. @ee.on('event')
  22. async def async_handler(*args, **kwargs):
  23. await returns_a_deferred()
  24. ```
  25. When async handlers fail, Failures are first emitted on the `failure`
  26. event. If there are no `failure` handlers, the Failure's associated
  27. exception is then emitted on the `error` event. If there are no `error`
  28. handlers, the exception is raised. For consistency, when handlers raise
  29. errors synchronously, they're captured, wrapped in a Failure and treated
  30. as an async failure. This is unlike the behavior of EventEmitter,
  31. which have no special error handling.
  32. For twisted coroutine event handlers, calling emit is non-blocking.
  33. In other words, you do not have to await any results from emit, and the
  34. coroutine is scheduled in a fire-and-forget fashion.
  35. Similar behavior occurs for "sync" functions which return Deferreds.
  36. """
  37. def __init__(self: Self) -> None:
  38. super(TwistedEventEmitter, self).__init__()
  39. def _emit_run(
  40. self: Self,
  41. f: Callable,
  42. args: Tuple[Any, ...],
  43. kwargs: Dict[str, Any],
  44. ) -> None:
  45. d: Optional[Deferred[Any]] = None
  46. try:
  47. result = f(*args, **kwargs)
  48. except Exception:
  49. self.emit("failure", Failure())
  50. else:
  51. if iscoroutine(result):
  52. d = ensureDeferred(result)
  53. elif isinstance(result, Deferred):
  54. d = result
  55. elif not d:
  56. return
  57. def errback(failure: Failure) -> None:
  58. if failure:
  59. self.emit("failure", failure)
  60. d.addErrback(errback)
  61. def _emit_handle_potential_error(self: Self, event: str, error: Any) -> None:
  62. if event == "failure":
  63. if isinstance(error, Failure):
  64. try:
  65. error.raiseException()
  66. except Exception as exc:
  67. self.emit("error", exc)
  68. elif isinstance(error, Exception):
  69. self.emit("error", error)
  70. else:
  71. self.emit("error", PyeeError(f"Unexpected failure object: {error}"))
  72. else:
  73. cast(Any, super(TwistedEventEmitter, self))._emit_handle_potential_error(
  74. event, error
  75. )