streams.py 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758
  1. import asyncio
  2. import collections
  3. import warnings
  4. from typing import (
  5. Awaitable,
  6. Callable,
  7. Deque,
  8. Final,
  9. Generic,
  10. List,
  11. Optional,
  12. Tuple,
  13. TypeVar,
  14. )
  15. from .base_protocol import BaseProtocol
  16. from .helpers import (
  17. _EXC_SENTINEL,
  18. BaseTimerContext,
  19. TimerNoop,
  20. set_exception,
  21. set_result,
  22. )
  23. from .log import internal_logger
# Public API of this module.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
)

# Generic item type carried by DataQueue / AsyncStreamIterator.
_T = TypeVar("_T")
  31. class EofStream(Exception):
  32. """eof stream indication."""
  33. class AsyncStreamIterator(Generic[_T]):
  34. __slots__ = ("read_func",)
  35. def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
  36. self.read_func = read_func
  37. def __aiter__(self) -> "AsyncStreamIterator[_T]":
  38. return self
  39. async def __anext__(self) -> _T:
  40. try:
  41. rv = await self.read_func()
  42. except EofStream:
  43. raise StopAsyncIteration
  44. if rv == b"":
  45. raise StopAsyncIteration
  46. return rv
  47. class ChunkTupleAsyncStreamIterator:
  48. __slots__ = ("_stream",)
  49. def __init__(self, stream: "StreamReader") -> None:
  50. self._stream = stream
  51. def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
  52. return self
  53. async def __anext__(self) -> Tuple[bytes, bool]:
  54. rv = await self._stream.readchunk()
  55. if rv == (b"", False):
  56. raise StopAsyncIteration
  57. return rv
  58. class AsyncStreamReaderMixin:
  59. __slots__ = ()
  60. def __aiter__(self) -> AsyncStreamIterator[bytes]:
  61. return AsyncStreamIterator(self.readline) # type: ignore[attr-defined]
  62. def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
  63. """Returns an asynchronous iterator that yields chunks of size n."""
  64. return AsyncStreamIterator(lambda: self.read(n)) # type: ignore[attr-defined]
  65. def iter_any(self) -> AsyncStreamIterator[bytes]:
  66. """Yield all available data as soon as it is received."""
  67. return AsyncStreamIterator(self.readany) # type: ignore[attr-defined]
  68. def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
  69. """Yield chunks of data as they are received by the server.
  70. The yielded objects are tuples
  71. of (bytes, bool) as returned by the StreamReader.readchunk method.
  72. """
  73. return ChunkTupleAsyncStreamIterator(self) # type: ignore[arg-type]
class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    __slots__ = (
        "_protocol",
        "_low_water",
        "_high_water",
        "_low_water_chunks",
        "_high_water_chunks",
        "_loop",
        "_size",
        "_cursor",
        "_http_chunk_splits",
        "_buffer",
        "_buffer_offset",
        "_eof",
        "_waiter",
        "_eof_waiter",
        "_exception",
        "_timer",
        "_eof_callbacks",
        "_eof_counter",
        "total_bytes",
        "total_compressed_bytes",
    )

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._protocol = protocol
        # Byte watermarks for flow control: reading is paused above
        # _high_water (see feed_data) and resumed below _low_water
        # (see _read_nowait_chunk).
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        # Ensure high_water_chunks >= 3 so it's always > low_water_chunks.
        self._high_water_chunks = max(3, limit // 4)
        # Use max(2, ...) because there's always at least 1 chunk split remaining
        # (the current position), so we need low_water >= 2 to allow resume.
        self._low_water_chunks = max(2, self._high_water_chunks // 2)
        self._loop = loop
        # Number of bytes currently buffered (not yet consumed).
        self._size = 0
        # Logical read position from the start of the body.
        self._cursor = 0
        # Byte offsets of HTTP chunk boundaries; None unless chunked
        # transfer receiving has been started.
        self._http_chunk_splits: Optional[Deque[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        # Read offset into self._buffer[0] (partially consumed chunk).
        self._buffer_offset = 0
        self._eof = False
        # Future linking feed_data()/feed_eof() to a pending read.
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []
        self._eof_counter = 0
        self.total_bytes = 0
        self.total_compressed_bytes: Optional[int] = None

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the (low, high) byte watermarks used for flow control."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        """Return the stored exception, if any."""
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Store *exc* and fail any pending read/EOF waiters with it."""
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run at EOF (immediately if already at EOF)."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark the stream as ended, wake waiters and run EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        # No more data will arrive, so the transport may resume.
        if self._protocol._reading_paused:
            self._protocol.resume_reading()

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until feed_eof() is called (returns at once if already EOF)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    @property
    def total_raw_bytes(self) -> int:
        # Bytes as received on the wire: the compressed count when one was
        # recorded, otherwise the plain total.
        if self.total_compressed_bytes is None:
            return self.total_bytes
        return self.total_compressed_bytes

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Materialize any partially consumed head chunk before prepending.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset:]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Append *data* to the buffer, wake a pending reader, apply backpressure."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        # Pause the transport once the buffered size exceeds the high watermark.
        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        """Enable tracking of HTTP chunk boundaries (idempotent)."""
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when some data was already fed"
                )
            self._http_chunk_splits = collections.deque()

    def end_http_chunk_receiving(self) -> None:
        """Record the end of an HTTP chunk at the current byte position."""
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # If we get too many small chunks before self._high_water is reached, then any
        # .read() call becomes computationally expensive, and could block the event loop
        # for too long, hence an additional self._high_water_chunks here.
        if (
            len(self._http_chunk_splits) > self._high_water_chunks
            and not self._protocol._reading_paused
        ):
            self._protocol.pause_reading()

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        """Wait for feed_data()/feed_eof() to deliver more input."""
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            with self._timer:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        """Read one line, where "line" is a sequence of bytes ending with \\n."""
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        """Read data until *separator* is found (separator included) or EOF.

        Raises ValueError if the accumulated data exceeds the high watermark
        before the separator is seen.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                # find() result +1 so that 0 means "not found" (falsy).
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(
                    ichar - offset + seplen - 1 if ichar else -1
                )
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes; read everything until EOF when n < 0."""
        if self._exception is not None:
            raise self._exception

        # migration problem; with DataQueue you have to catch
        # EofStream exception, so common way is to run payload.read() inside
        # infinite loop. what can cause real infinite loop with StreamReader
        # lets keep this code one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Read whatever data is available (at least one byte unless EOF)."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.popleft()
                if pos == self._cursor:
                    # Boundary exactly at the current position: empty chunk end.
                    return (b"", True)
                if pos > self._cursor:
                    # Return data up to the chunk boundary.
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
                # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes or raise asyncio.IncompleteReadError."""
        if self._exception is not None:
            raise self._exception

        blocks: List[bytes] = []
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read buffered data without waiting for more."""
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Consume up to *n* bytes from the FIRST buffered chunk only."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            # Chunk is larger than requested: advance the offset only.
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            # Consume the remainder of a partially read chunk.
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            # Consume a whole untouched chunk.
            data = self._buffer.popleft()

        data_len = len(data)
        self._size -= data_len
        self._cursor += data_len

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.popleft()

        # Resume the transport once both the byte count and the number of
        # pending chunk boundaries drop below their low watermarks.
        if (
            self._protocol._reading_paused
            and self._size < self._low_water
            and (
                self._http_chunk_splits is None
                or len(self._http_chunk_splits) < self._low_water_chunks
            )
        ):
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
  475. class EmptyStreamReader(StreamReader): # lgtm [py/missing-call-to-init]
  476. __slots__ = ("_read_eof_chunk",)
  477. def __init__(self) -> None:
  478. self._read_eof_chunk = False
  479. self.total_bytes = 0
  480. def __repr__(self) -> str:
  481. return "<%s>" % self.__class__.__name__
  482. def exception(self) -> Optional[BaseException]:
  483. return None
  484. def set_exception(
  485. self,
  486. exc: BaseException,
  487. exc_cause: BaseException = _EXC_SENTINEL,
  488. ) -> None:
  489. pass
  490. def on_eof(self, callback: Callable[[], None]) -> None:
  491. try:
  492. callback()
  493. except Exception:
  494. internal_logger.exception("Exception in eof callback")
  495. def feed_eof(self) -> None:
  496. pass
  497. def is_eof(self) -> bool:
  498. return True
  499. def at_eof(self) -> bool:
  500. return True
  501. async def wait_eof(self) -> None:
  502. return
  503. def feed_data(self, data: bytes, n: int = 0) -> None:
  504. pass
  505. async def readline(self) -> bytes:
  506. return b""
  507. async def read(self, n: int = -1) -> bytes:
  508. return b""
  509. # TODO add async def readuntil
  510. async def readany(self) -> bytes:
  511. return b""
  512. async def readchunk(self) -> Tuple[bytes, bool]:
  513. if not self._read_eof_chunk:
  514. self._read_eof_chunk = True
  515. return (b"", False)
  516. return (b"", True)
  517. async def readexactly(self, n: int) -> bytes:
  518. raise asyncio.IncompleteReadError(b"", n)
  519. def read_nowait(self, n: int = -1) -> bytes:
  520. return b""
# Shared singleton payload representing an empty (already-at-EOF) body.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()
  522. class DataQueue(Generic[_T]):
  523. """DataQueue is a general-purpose blocking queue with one reader."""
  524. def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
  525. self._loop = loop
  526. self._eof = False
  527. self._waiter: Optional[asyncio.Future[None]] = None
  528. self._exception: Optional[BaseException] = None
  529. self._buffer: Deque[Tuple[_T, int]] = collections.deque()
  530. def __len__(self) -> int:
  531. return len(self._buffer)
  532. def is_eof(self) -> bool:
  533. return self._eof
  534. def at_eof(self) -> bool:
  535. return self._eof and not self._buffer
  536. def exception(self) -> Optional[BaseException]:
  537. return self._exception
  538. def set_exception(
  539. self,
  540. exc: BaseException,
  541. exc_cause: BaseException = _EXC_SENTINEL,
  542. ) -> None:
  543. self._eof = True
  544. self._exception = exc
  545. if (waiter := self._waiter) is not None:
  546. self._waiter = None
  547. set_exception(waiter, exc, exc_cause)
  548. def feed_data(self, data: _T, size: int = 0) -> None:
  549. self._buffer.append((data, size))
  550. if (waiter := self._waiter) is not None:
  551. self._waiter = None
  552. set_result(waiter, None)
  553. def feed_eof(self) -> None:
  554. self._eof = True
  555. if (waiter := self._waiter) is not None:
  556. self._waiter = None
  557. set_result(waiter, None)
  558. async def read(self) -> _T:
  559. if not self._buffer and not self._eof:
  560. assert not self._waiter
  561. self._waiter = self._loop.create_future()
  562. try:
  563. await self._waiter
  564. except (asyncio.CancelledError, asyncio.TimeoutError):
  565. self._waiter = None
  566. raise
  567. if self._buffer:
  568. data, _ = self._buffer.popleft()
  569. return data
  570. if self._exception is not None:
  571. raise self._exception
  572. raise EofStream
  573. def __aiter__(self) -> AsyncStreamIterator[_T]:
  574. return AsyncStreamIterator(self.read)
  575. class FlowControlDataQueue(DataQueue[_T]):
  576. """FlowControlDataQueue resumes and pauses an underlying stream.
  577. It is a destination for parsed data.
  578. This class is deprecated and will be removed in version 4.0.
  579. """
  580. def __init__(
  581. self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
  582. ) -> None:
  583. super().__init__(loop=loop)
  584. self._size = 0
  585. self._protocol = protocol
  586. self._limit = limit * 2
  587. def feed_data(self, data: _T, size: int = 0) -> None:
  588. super().feed_data(data, size)
  589. self._size += size
  590. if self._size > self._limit and not self._protocol._reading_paused:
  591. self._protocol.pause_reading()
  592. async def read(self) -> _T:
  593. if not self._buffer and not self._eof:
  594. assert not self._waiter
  595. self._waiter = self._loop.create_future()
  596. try:
  597. await self._waiter
  598. except (asyncio.CancelledError, asyncio.TimeoutError):
  599. self._waiter = None
  600. raise
  601. if self._buffer:
  602. data, size = self._buffer.popleft()
  603. self._size -= size
  604. if self._size < self._limit and self._protocol._reading_paused:
  605. self._protocol.resume_reading()
  606. return data
  607. if self._exception is not None:
  608. raise self._exception
  609. raise EofStream