# payload.py
  1. import asyncio
  2. import enum
  3. import io
  4. import json
  5. import mimetypes
  6. import os
  7. import sys
  8. import warnings
  9. from abc import ABC, abstractmethod
  10. from collections.abc import Iterable
  11. from itertools import chain
  12. from typing import (
  13. IO,
  14. TYPE_CHECKING,
  15. Any,
  16. Dict,
  17. Final,
  18. List,
  19. Optional,
  20. Set,
  21. TextIO,
  22. Tuple,
  23. Type,
  24. Union,
  25. )
  26. from multidict import CIMultiDict
  27. from . import hdrs
  28. from .abc import AbstractStreamWriter
  29. from .helpers import (
  30. _SENTINEL,
  31. content_disposition_header,
  32. guess_filename,
  33. parse_mimetype,
  34. sentinel,
  35. )
  36. from .streams import StreamReader
  37. from .typedefs import JSONEncoder, _CIMultiDict
# Public API of this module.
__all__ = (
    "PAYLOAD_REGISTRY",
    "get_payload",
    "payload_type",
    "Payload",
    "BytesPayload",
    "StringPayload",
    "IOBasePayload",
    "BytesIOPayload",
    "BufferedReaderPayload",
    "TextIOPayload",
    "StringIOPayload",
    "JsonPayload",
    "AsyncIterablePayload",
)
# Raw bytes bodies larger than this trigger a ResourceWarning (see BytesPayload).
TOO_LARGE_BYTES_BODY: Final[int] = 2**20  # 1 MB
# Default chunk size for streaming file-like payloads.
READ_SIZE: Final[int] = 2**16  # 64 KB
# Strong references to in-flight executor close() futures so they are not
# garbage collected before completion (futures are added in IOBasePayload._close
# and removed by a done-callback).
_CLOSE_FUTURES: Set[asyncio.Future[None]] = set()
class LookupError(Exception):
    """Raised when no payload factory is found for the given data type.

    NOTE: this shadows the builtin ``LookupError`` within this module's
    namespace; external callers reference it as ``payload.LookupError``.
    """
class Order(str, enum.Enum):
    """Registration priority for payload factories (see PayloadRegistry.register)."""

    normal = "normal"
    try_first = "try_first"
    try_last = "try_last"
def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
    """Build a Payload for *data* via the module-level PAYLOAD_REGISTRY."""
    return PAYLOAD_REGISTRY.get(data, *args, **kwargs)
def register_payload(
    factory: Type["Payload"], type: Any, *, order: Order = Order.normal
) -> None:
    """Register *factory* for *type* in the module-level PAYLOAD_REGISTRY."""
    PAYLOAD_REGISTRY.register(factory, type, order=order)
  68. class payload_type:
  69. def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
  70. self.type = type
  71. self.order = order
  72. def __call__(self, factory: Type["Payload"]) -> Type["Payload"]:
  73. register_payload(factory, self.type, order=self.order)
  74. return factory
# A payload factory is any Payload subclass.
PayloadType = Type["Payload"]
# (factory, type-or-types) pair as stored in the registry lists.
_PayloadRegistryItem = Tuple[PayloadType, Any]
  77. class PayloadRegistry:
  78. """Payload registry.
  79. note: we need zope.interface for more efficient adapter search
  80. """
  81. __slots__ = ("_first", "_normal", "_last", "_normal_lookup")
  82. def __init__(self) -> None:
  83. self._first: List[_PayloadRegistryItem] = []
  84. self._normal: List[_PayloadRegistryItem] = []
  85. self._last: List[_PayloadRegistryItem] = []
  86. self._normal_lookup: Dict[Any, PayloadType] = {}
  87. def get(
  88. self,
  89. data: Any,
  90. *args: Any,
  91. _CHAIN: "Type[chain[_PayloadRegistryItem]]" = chain,
  92. **kwargs: Any,
  93. ) -> "Payload":
  94. if self._first:
  95. for factory, type_ in self._first:
  96. if isinstance(data, type_):
  97. return factory(data, *args, **kwargs)
  98. # Try the fast lookup first
  99. if lookup_factory := self._normal_lookup.get(type(data)):
  100. return lookup_factory(data, *args, **kwargs)
  101. # Bail early if its already a Payload
  102. if isinstance(data, Payload):
  103. return data
  104. # Fallback to the slower linear search
  105. for factory, type_ in _CHAIN(self._normal, self._last):
  106. if isinstance(data, type_):
  107. return factory(data, *args, **kwargs)
  108. raise LookupError()
  109. def register(
  110. self, factory: PayloadType, type: Any, *, order: Order = Order.normal
  111. ) -> None:
  112. if order is Order.try_first:
  113. self._first.append((factory, type))
  114. elif order is Order.normal:
  115. self._normal.append((factory, type))
  116. if isinstance(type, Iterable):
  117. for t in type:
  118. self._normal_lookup[t] = factory
  119. else:
  120. self._normal_lookup[type] = factory
  121. elif order is Order.try_last:
  122. self._last.append((factory, type))
  123. else:
  124. raise ValueError(f"Unsupported order {order!r}")
class Payload(ABC):
    """Abstract base class for objects that can be written to an
    AbstractStreamWriter as a body."""

    # Used when no explicit Content-Type is supplied or guessable.
    _default_content_type: str = "application/octet-stream"
    # Byte size of the payload, or None when unknown.
    _size: Optional[int] = None
    _consumed: bool = False  # Default: payload has not been consumed yet
    _autoclose: bool = False  # Default: assume resource needs explicit closing

    def __init__(
        self,
        value: Any,
        headers: Optional[
            Union[_CIMultiDict, Dict[str, str], Iterable[Tuple[str, str]]]
        ] = None,
        content_type: Union[str, None, _SENTINEL] = sentinel,
        filename: Optional[str] = None,
        encoding: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the payload.

        Args:
            value: The raw value this payload wraps.
            headers: Extra item headers merged in after Content-Type is chosen.
            content_type: Explicit Content-Type value; ``sentinel`` or ``None``
                means derive it from *filename* (via mimetypes) or fall back
                to ``_default_content_type``.
            filename: Optional filename used for Content-Type guessing.
            encoding: Optional text encoding recorded on the payload.
        """
        self._encoding = encoding
        self._filename = filename
        self._headers: _CIMultiDict = CIMultiDict()
        self._value = value
        if content_type is not sentinel and content_type is not None:
            # Explicit Content-Type supplied by the caller wins.
            self._headers[hdrs.CONTENT_TYPE] = content_type
        elif self._filename is not None:
            # mimetypes.guess_file_type is only available on Python 3.13+.
            if sys.version_info >= (3, 13):
                guesser = mimetypes.guess_file_type
            else:
                guesser = mimetypes.guess_type
            content_type = guesser(self._filename)[0]
            if content_type is None:
                content_type = self._default_content_type
            self._headers[hdrs.CONTENT_TYPE] = content_type
        else:
            self._headers[hdrs.CONTENT_TYPE] = self._default_content_type
        if headers:
            self._headers.update(headers)

    @property
    def size(self) -> Optional[int]:
        """Size of the payload in bytes.

        Returns the number of bytes that will be transmitted when the payload
        is written. For string payloads, this is the size after encoding to bytes,
        not the length of the string.
        """
        return self._size

    @property
    def filename(self) -> Optional[str]:
        """Filename of the payload."""
        return self._filename

    @property
    def headers(self) -> _CIMultiDict:
        """Custom item headers"""
        return self._headers

    @property
    def _binary_headers(self) -> bytes:
        # Headers serialized as "Name: value\r\n" pairs followed by the
        # blank line that terminates a header block.
        return (
            "".join([k + ": " + v + "\r\n" for k, v in self.headers.items()]).encode(
                "utf-8"
            )
            + b"\r\n"
        )

    @property
    def encoding(self) -> Optional[str]:
        """Payload encoding"""
        return self._encoding

    @property
    def content_type(self) -> str:
        """Content type"""
        return self._headers[hdrs.CONTENT_TYPE]

    @property
    def consumed(self) -> bool:
        """Whether the payload has been consumed and cannot be reused."""
        return self._consumed

    @property
    def autoclose(self) -> bool:
        """
        Whether the payload can close itself automatically.

        Returns True if the payload has no file handles or resources that need
        explicit closing. If False, callers must await close() to release resources.
        """
        return self._autoclose

    def set_content_disposition(
        self,
        disptype: str,
        quote_fields: bool = True,
        _charset: str = "utf-8",
        **params: Any,
    ) -> None:
        """Sets ``Content-Disposition`` header."""
        self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header(
            disptype, quote_fields=quote_fields, _charset=_charset, **params
        )

    @abstractmethod
    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        This is named decode() to allow compatibility with bytes objects.
        """

    @abstractmethod
    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This is a legacy method that writes the entire payload without length constraints.

        Important:
            For new implementations, use write_with_length() instead of this method.

            This method is maintained for backwards compatibility and will eventually
            delegate to write_with_length(writer, None) in all implementations.

            All payload subclasses must override this method for backwards compatibility,
            but new code should use write_with_length for more flexibility and control.
        """

    # write_with_length is new in aiohttp 3.12
    # it should be overridden by subclasses
    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method allows writing payload content with a specific length constraint,
        which is particularly useful for HTTP responses with Content-Length header.

        Note:
            This is the base implementation that provides backwards compatibility
            for subclasses that don't override this method. Specific payload types
            should override this method to implement proper length-constrained writing.
        """
        # Backwards compatibility for subclasses that don't override this method
        # and for the default implementation
        await self.write(writer)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This is a convenience method that calls decode() and encodes the result
        to bytes using the specified encoding.
        """
        # Use instance encoding if available, otherwise use parameter
        actual_encoding = self._encoding or encoding
        return self.decode(actual_encoding, errors).encode(actual_encoding)

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility with code that
        needs to clean up payloads synchronously. In the future, we will
        drop this method and only support the async close() method.

        WARNING: This method must be safe to call from within the event loop
        without blocking. Subclasses should not perform any blocking I/O here.

        WARNING: This method must be called from within an event loop for
        certain payload types (e.g., IOBasePayload). Calling it outside an
        event loop may raise RuntimeError.
        """
        # This is a no-op by default, but subclasses can override it
        # for non-blocking cleanup operations.

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.

        In the future, this will be the only close method supported.
        """
        self._close()
  287. class BytesPayload(Payload):
  288. _value: bytes
  289. # _consumed = False (inherited) - Bytes are immutable and can be reused
  290. _autoclose = True # No file handle, just bytes in memory
  291. def __init__(
  292. self, value: Union[bytes, bytearray, memoryview], *args: Any, **kwargs: Any
  293. ) -> None:
  294. if "content_type" not in kwargs:
  295. kwargs["content_type"] = "application/octet-stream"
  296. super().__init__(value, *args, **kwargs)
  297. if isinstance(value, memoryview):
  298. self._size = value.nbytes
  299. elif isinstance(value, (bytes, bytearray)):
  300. self._size = len(value)
  301. else:
  302. raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")
  303. if self._size > TOO_LARGE_BYTES_BODY:
  304. kwargs = {"source": self}
  305. warnings.warn(
  306. "Sending a large body directly with raw bytes might"
  307. " lock the event loop. You should probably pass an "
  308. "io.BytesIO object instead",
  309. ResourceWarning,
  310. **kwargs,
  311. )
  312. def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
  313. return self._value.decode(encoding, errors)
  314. async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
  315. """
  316. Return bytes representation of the value.
  317. This method returns the raw bytes content of the payload.
  318. It is equivalent to accessing the _value attribute directly.
  319. """
  320. return self._value
  321. async def write(self, writer: AbstractStreamWriter) -> None:
  322. """
  323. Write the entire bytes payload to the writer stream.
  324. Args:
  325. writer: An AbstractStreamWriter instance that handles the actual writing
  326. This method writes the entire bytes content without any length constraint.
  327. Note:
  328. For new implementations that need length control, use write_with_length().
  329. This method is maintained for backwards compatibility and is equivalent
  330. to write_with_length(writer, None).
  331. """
  332. await writer.write(self._value)
  333. async def write_with_length(
  334. self, writer: AbstractStreamWriter, content_length: Optional[int]
  335. ) -> None:
  336. """
  337. Write bytes payload with a specific content length constraint.
  338. Args:
  339. writer: An AbstractStreamWriter instance that handles the actual writing
  340. content_length: Maximum number of bytes to write (None for unlimited)
  341. This method writes either the entire byte sequence or a slice of it
  342. up to the specified content_length. For BytesPayload, this operation
  343. is performed efficiently using array slicing.
  344. """
  345. if content_length is not None:
  346. await writer.write(self._value[:content_length])
  347. else:
  348. await writer.write(self._value)
  349. class StringPayload(BytesPayload):
  350. def __init__(
  351. self,
  352. value: str,
  353. *args: Any,
  354. encoding: Optional[str] = None,
  355. content_type: Optional[str] = None,
  356. **kwargs: Any,
  357. ) -> None:
  358. if encoding is None:
  359. if content_type is None:
  360. real_encoding = "utf-8"
  361. content_type = "text/plain; charset=utf-8"
  362. else:
  363. mimetype = parse_mimetype(content_type)
  364. real_encoding = mimetype.parameters.get("charset", "utf-8")
  365. else:
  366. if content_type is None:
  367. content_type = "text/plain; charset=%s" % encoding
  368. real_encoding = encoding
  369. super().__init__(
  370. value.encode(real_encoding),
  371. encoding=real_encoding,
  372. content_type=content_type,
  373. *args,
  374. **kwargs,
  375. )
  376. class StringIOPayload(StringPayload):
  377. def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
  378. super().__init__(value.read(), *args, **kwargs)
class IOBasePayload(Payload):
    """Payload wrapping a binary file-like object, streamed in chunks through
    the event loop's default executor."""

    _value: io.IOBase
    # _consumed = False (inherited) - File can be re-read from the same position
    # File offset captured on first use so the payload can be replayed.
    _start_position: Optional[int] = None
    # _autoclose = False (inherited) - Has file handle that needs explicit closing

    def __init__(
        self, value: IO[Any], disposition: str = "attachment", *args: Any, **kwargs: Any
    ) -> None:
        """Initialize the payload; a filename is guessed from *value* when not
        given, and a Content-Disposition header is set from it unless one is
        already present."""
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)
        super().__init__(value, *args, **kwargs)
        if self._filename is not None and disposition is not None:
            if hdrs.CONTENT_DISPOSITION not in self.headers:
                self.set_content_disposition(disposition, filename=self._filename)

    def _set_or_restore_start_position(self) -> None:
        """Set or restore the start position of the file-like object."""
        if self._start_position is None:
            try:
                self._start_position = self._value.tell()
            except (OSError, AttributeError):
                self._consumed = True  # Cannot seek, mark as consumed
            return
        try:
            self._value.seek(self._start_position)
        except (OSError, AttributeError):
            # Failed to seek back - mark as consumed since we've already read
            self._consumed = True

    def _read_and_available_len(
        self, remaining_content_len: Optional[int]
    ) -> Tuple[Optional[int], bytes]:
        """
        Read the file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in this operation.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if size cannot be determined)
            - The first chunk of bytes read from the file object

        This method is optimized to perform both size calculation and initial read
        in a single operation, which is executed in a single executor job to minimize
        context switches and file operations when streaming content.
        """
        self._set_or_restore_start_position()
        size = self.size  # Call size only once since it does I/O
        return size, self._value.read(
            min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
        )

    def _read(self, remaining_content_len: Optional[int]) -> bytes:
        """
        Read a chunk of data from the file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object, respecting the
            remaining_content_len limit if specified.

        This method is used for subsequent reads during streaming after
        the initial _read_and_available_len call has been made.
        """
        return self._value.read(remaining_content_len or READ_SIZE)  # type: ignore[no-any-return]

    @property
    def size(self) -> Optional[int]:
        """
        Size of the payload in bytes.

        Returns the total size of the payload content from the initial position.
        This ensures consistent Content-Length for requests, including 307/308 redirects
        where the same payload instance is reused.

        Returns None if the size cannot be determined (e.g., for unseekable streams).
        """
        try:
            # Store the start position on first access.
            # This is critical when the same payload instance is reused (e.g., 307/308
            # redirects). Without storing the initial position, after the payload is
            # read once, the file position would be at EOF, which would cause the
            # size calculation to return 0 (file_size - EOF position).
            # By storing the start position, we ensure the size calculation always
            # returns the correct total size for any subsequent use.
            if self._start_position is None:
                self._start_position = self._value.tell()
            # Return the total size from the start position
            # This ensures Content-Length is correct even after reading
            return os.fstat(self._value.fileno()).st_size - self._start_position
        except (AttributeError, OSError):
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire file-like payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire file content without any length constraint.
        It delegates to write_with_length() with no length limit for implementation
        consistency.

        Note:
            For new implementations that need length control, use write_with_length() directly.
            This method is maintained for backwards compatibility with existing code.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write file-like payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method implements optimized streaming of file content with length constraints:

        1. File reading is performed in a thread pool to avoid blocking the event loop
        2. Content is read and written in chunks to maintain memory efficiency
        3. Writing stops when either:
           - All available file content has been written (when size is known)
           - The specified content_length has been reached
        4. File resources are properly closed even if the operation is cancelled

        The implementation carefully handles both known-size and unknown-size payloads,
        as well as constrained and unconstrained content lengths.
        """
        loop = asyncio.get_running_loop()
        total_written_len = 0
        remaining_content_len = content_length
        # Get initial data and available length
        available_len, chunk = await loop.run_in_executor(
            None, self._read_and_available_len, remaining_content_len
        )
        # Process data chunks until done
        while chunk:
            chunk_len = len(chunk)
            # Write data with or without length constraint
            if remaining_content_len is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_content_len])
                # NOTE: the full chunk_len (not the possibly-truncated write
                # length) is subtracted; when truncation happens, the counter
                # drops to <= 0 and _should_stop_writing() ends the loop.
                remaining_content_len -= chunk_len
            total_written_len += chunk_len
            # Check if we're done writing
            if self._should_stop_writing(
                available_len, total_written_len, remaining_content_len
            ):
                return
            # Read next chunk
            chunk = await loop.run_in_executor(
                None,
                self._read,
                (
                    min(READ_SIZE, remaining_content_len)
                    if remaining_content_len is not None
                    else READ_SIZE
                ),
            )

    def _should_stop_writing(
        self,
        available_len: Optional[int],
        total_written_len: int,
        remaining_content_len: Optional[int],
    ) -> bool:
        """
        Determine if we should stop writing data.

        Args:
            available_len: Known size of the payload if available (None if unknown)
            total_written_len: Number of bytes already written
            remaining_content_len: Remaining bytes to be written for content-length limited responses

        Returns:
            True if we should stop writing data, based on either:
            - Having written all available data (when size is known)
            - Having written all requested content (when content-length is specified)
        """
        return (available_len is not None and total_written_len >= available_len) or (
            remaining_content_len is not None and remaining_content_len <= 0
        )

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        This method exists only for backwards
        compatibility. Use the async close() method instead.

        WARNING: This method MUST be called from within an event loop.
        Calling it outside an event loop will raise RuntimeError.
        """
        # Skip if already consumed
        if self._consumed:
            return
        self._consumed = True  # Mark as consumed to prevent further writes
        # Schedule file closing without awaiting to prevent cancellation issues
        loop = asyncio.get_running_loop()
        close_future = loop.run_in_executor(None, self._value.close)
        # Hold a strong reference to the future to prevent it from being
        # garbage collected before it completes.
        _CLOSE_FUTURES.add(close_future)
        close_future.add_done_callback(_CLOSE_FUTURES.remove)

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.
        """
        self._close()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        return self._read_all().decode(encoding, errors)

    def _read_all(self) -> bytes:
        """Read the entire file-like object and return its content as bytes."""
        self._set_or_restore_start_position()
        # Use readlines() to ensure we get all content
        return b"".join(self._value.readlines())

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire file content and returns it as bytes.
        It is equivalent to reading the file-like object directly.
        The file reading is performed in an executor to avoid blocking the event loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._read_all)
class TextIOPayload(IOBasePayload):
    """IOBasePayload over a text-mode file; content is encoded to bytes on read."""

    _value: io.TextIOBase
    # _autoclose = False (inherited) - Has text file handle that needs explicit closing

    def __init__(
        self,
        value: TextIO,
        *args: Any,
        encoding: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the payload, resolving encoding and Content-Type:
        an explicit encoding wins; otherwise the charset from *content_type*
        is used; UTF-8 is the final default."""
        if encoding is None:
            if content_type is None:
                encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
        super().__init__(
            value,
            content_type=content_type,
            encoding=encoding,
            *args,
            **kwargs,
        )

    def _read_and_available_len(
        self, remaining_content_len: Optional[int]
    ) -> Tuple[Optional[int], bytes]:
        """
        Read the text file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in this operation.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if size cannot be determined)
            - The first chunk of bytes read from the file object, encoded using the payload's encoding

        This method is optimized to perform both size calculation and initial read
        in a single operation, which is executed in a single executor job to minimize
        context switches and file operations when streaming content.

        Note:
            TextIOPayload handles encoding of the text content before writing it
            to the stream. If no encoding is specified, UTF-8 is used as the default.
        """
        self._set_or_restore_start_position()
        size = self.size
        # NOTE(review): ``size`` is a byte count while read() takes a character
        # count, so for multi-byte encodings the chunking granularity differs
        # slightly — confirm this is acceptable for all encodings in use.
        chunk = self._value.read(
            min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
        )
        return size, chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def _read(self, remaining_content_len: Optional[int]) -> bytes:
        """
        Read a chunk of data from the text file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object and encoded using the payload's
            encoding. The data is automatically converted from text to bytes.

        This method is used for subsequent reads during streaming after
        the initial _read_and_available_len call has been made. It properly
        handles text encoding, converting the text content to bytes using
        the specified encoding (or UTF-8 if none was provided).
        """
        chunk = self._value.read(remaining_content_len or READ_SIZE)
        return chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        NOTE: the *encoding* and *errors* arguments are ignored here — the
        underlying file object is already text and is read back as-is.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire text file content and returns it as bytes.
        It encodes the text content using the specified encoding.
        The file reading is performed in an executor to avoid blocking the event loop.
        """
        loop = asyncio.get_running_loop()
        # Use instance encoding if available, otherwise use parameter
        actual_encoding = self._encoding or encoding

        def _read_and_encode() -> bytes:
            self._set_or_restore_start_position()
            # TextIO read() always returns the full content
            return self._value.read().encode(actual_encoding, errors)

        return await loop.run_in_executor(None, _read_and_encode)
  685. class BytesIOPayload(IOBasePayload):
  686. _value: io.BytesIO
  687. _size: int # Always initialized in __init__
  688. _autoclose = True # BytesIO is in-memory, safe to auto-close
  689. def __init__(self, value: io.BytesIO, *args: Any, **kwargs: Any) -> None:
  690. super().__init__(value, *args, **kwargs)
  691. # Calculate size once during initialization
  692. self._size = len(self._value.getbuffer()) - self._value.tell()
  693. @property
  694. def size(self) -> int:
  695. """Size of the payload in bytes.
  696. Returns the number of bytes in the BytesIO buffer that will be transmitted.
  697. This is calculated once during initialization for efficiency.
  698. """
  699. return self._size
  700. def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
  701. self._set_or_restore_start_position()
  702. return self._value.read().decode(encoding, errors)
  703. async def write(self, writer: AbstractStreamWriter) -> None:
  704. return await self.write_with_length(writer, None)
  705. async def write_with_length(
  706. self, writer: AbstractStreamWriter, content_length: Optional[int]
  707. ) -> None:
  708. """
  709. Write BytesIO payload with a specific content length constraint.
  710. Args:
  711. writer: An AbstractStreamWriter instance that handles the actual writing
  712. content_length: Maximum number of bytes to write (None for unlimited)
  713. This implementation is specifically optimized for BytesIO objects:
  714. 1. Reads content in chunks to maintain memory efficiency
  715. 2. Yields control back to the event loop periodically to prevent blocking
  716. when dealing with large BytesIO objects
  717. 3. Respects content_length constraints when specified
  718. 4. Properly cleans up by closing the BytesIO object when done or on error
  719. The periodic yielding to the event loop is important for maintaining
  720. responsiveness when processing large in-memory buffers.
  721. """
  722. self._set_or_restore_start_position()
  723. loop_count = 0
  724. remaining_bytes = content_length
  725. while chunk := self._value.read(READ_SIZE):
  726. if loop_count > 0:
  727. # Avoid blocking the event loop
  728. # if they pass a large BytesIO object
  729. # and we are not in the first iteration
  730. # of the loop
  731. await asyncio.sleep(0)
  732. if remaining_bytes is None:
  733. await writer.write(chunk)
  734. else:
  735. await writer.write(chunk[:remaining_bytes])
  736. remaining_bytes -= len(chunk)
  737. if remaining_bytes <= 0:
  738. return
  739. loop_count += 1
  740. async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
  741. """
  742. Return bytes representation of the value.
  743. This method reads the entire BytesIO content and returns it as bytes.
  744. It is equivalent to accessing the _value attribute directly.
  745. """
  746. self._set_or_restore_start_position()
  747. return self._value.read()
  748. async def close(self) -> None:
  749. """
  750. Close the BytesIO payload.
  751. This does nothing since BytesIO is in-memory and does not require explicit closing.
  752. """
  753. class BufferedReaderPayload(IOBasePayload):
  754. _value: io.BufferedIOBase
  755. # _autoclose = False (inherited) - Has buffered file handle that needs explicit closing
  756. def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
  757. self._set_or_restore_start_position()
  758. return self._value.read().decode(encoding, errors)
  759. class JsonPayload(BytesPayload):
  760. def __init__(
  761. self,
  762. value: Any,
  763. encoding: str = "utf-8",
  764. content_type: str = "application/json",
  765. dumps: JSONEncoder = json.dumps,
  766. *args: Any,
  767. **kwargs: Any,
  768. ) -> None:
  769. super().__init__(
  770. dumps(value).encode(encoding),
  771. content_type=content_type,
  772. encoding=encoding,
  773. *args,
  774. **kwargs,
  775. )
  776. if TYPE_CHECKING:
  777. from typing import AsyncIterable, AsyncIterator
  778. _AsyncIterator = AsyncIterator[bytes]
  779. _AsyncIterable = AsyncIterable[bytes]
  780. else:
  781. from collections.abc import AsyncIterable, AsyncIterator
  782. _AsyncIterator = AsyncIterator
  783. _AsyncIterable = AsyncIterable
class AsyncIterablePayload(Payload):
    # Live iterator over the wrapped async iterable; set to None once it
    # has been exhausted.
    _iter: Optional[_AsyncIterator] = None
    _value: _AsyncIterable
    # Chunks captured by as_bytes() so the payload can be replayed later.
    _cached_chunks: Optional[List[bytes]] = None
    # _consumed stays False to allow reuse with cached content
    _autoclose = True  # Iterator doesn't need explicit closing

    def __init__(self, value: _AsyncIterable, *args: Any, **kwargs: Any) -> None:
        """Wrap an async iterable of bytes chunks as a payload.

        Raises:
            TypeError: if *value* does not implement the
                ``collections.abc.AsyncIterable`` interface.
        """
        if not isinstance(value, AsyncIterable):
            raise TypeError(
                "value argument must support "
                "collections.abc.AsyncIterable interface, "
                "got {!r}".format(type(value))
            )
        # Default content type when the caller did not supply one.
        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"
        super().__init__(value, *args, **kwargs)
        # Obtain the iterator eagerly so streaming can start immediately.
        self._iter = value.__aiter__()

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire async iterable payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method iterates through the async iterable and writes each chunk
        to the writer without any length constraint.

        Note:
            For new implementations that need length control, use write_with_length() directly.
            This method is maintained for backwards compatibility with existing code.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write async iterable payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This implementation handles streaming of async iterable content with length constraints:

        1. If cached chunks are available, writes from them
        2. Otherwise iterates through the async iterable one chunk at a time
        3. Respects content_length constraints when specified
        4. Does NOT generate cache - that's done by as_bytes()
        """
        # If we have cached chunks, use them
        if self._cached_chunks is not None:
            remaining_bytes = content_length
            for chunk in self._cached_chunks:
                if remaining_bytes is None:
                    await writer.write(chunk)
                elif remaining_bytes > 0:
                    await writer.write(chunk[:remaining_bytes])
                    remaining_bytes -= len(chunk)
                else:
                    # Cached replay can stop early: nothing to exhaust.
                    break
            return
        # If iterator is exhausted and we don't have cached chunks, nothing to write
        if self._iter is None:
            return
        # Stream from the iterator
        remaining_bytes = content_length
        try:
            while True:
                # The anext() builtin only exists on Python 3.10+.
                if sys.version_info >= (3, 10):
                    chunk = await anext(self._iter)
                else:
                    chunk = await self._iter.__anext__()
                if remaining_bytes is None:
                    await writer.write(chunk)
                # If we have a content length limit
                elif remaining_bytes > 0:
                    await writer.write(chunk[:remaining_bytes])
                    remaining_bytes -= len(chunk)
                # We still want to exhaust the iterator even
                # if we have reached the content length limit
                # since the file handle may not get closed by
                # the iterator if we don't do this
        except StopAsyncIteration:
            # Iterator is exhausted
            self._iter = None
            self._consumed = True  # Mark as consumed when streamed without caching

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the payload content as a string if cached chunks are available."""
        if self._cached_chunks is not None:
            return b"".join(self._cached_chunks).decode(encoding, errors)
        # Without a cache the content would have to be consumed from the
        # iterator, which decode() (a sync method) cannot do.
        raise TypeError("Unable to decode - content not cached. Call as_bytes() first.")

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire async iterable content and returns it as bytes.
        It generates and caches the chunks for future reuse.
        """
        # If we have cached chunks, return them joined
        if self._cached_chunks is not None:
            return b"".join(self._cached_chunks)
        # If iterator is exhausted and no cache, return empty
        if self._iter is None:
            return b""
        # Read all chunks and cache them
        chunks: List[bytes] = []
        async for chunk in self._iter:
            chunks.append(chunk)
        # Iterator is exhausted, cache the chunks
        self._iter = None
        self._cached_chunks = chunks
        # Keep _consumed as False to allow reuse with cached chunks
        return b"".join(chunks)
  890. class StreamReaderPayload(AsyncIterablePayload):
  891. def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None:
  892. super().__init__(value.iter_any(), *args, **kwargs)
PAYLOAD_REGISTRY = PayloadRegistry()
# Register payload factories from most specific to most general type, so
# that e.g. io.BytesIO matches BytesIOPayload before the io.IOBase
# catch-all below.
PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview))
PAYLOAD_REGISTRY.register(StringPayload, str)
PAYLOAD_REGISTRY.register(StringIOPayload, io.StringIO)
PAYLOAD_REGISTRY.register(TextIOPayload, io.TextIOBase)
PAYLOAD_REGISTRY.register(BytesIOPayload, io.BytesIO)
PAYLOAD_REGISTRY.register(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom))
# Fallback for any other file-like object.
PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase)
PAYLOAD_REGISTRY.register(StreamReaderPayload, StreamReader)
# try_last for giving a chance to more specialized async iterables like
# multipart.BodyPartReaderPayload override the default
PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)