# web_response.py -- HTTP response classes
import asyncio
import collections.abc
import datetime
import enum
import json
import math
import time
import warnings
from concurrent.futures import Executor
from http import HTTPStatus
from http.cookies import SimpleCookie
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterator,
    MutableMapping,
    Optional,
    Union,
    cast,
)

from multidict import CIMultiDict, istr

from . import hdrs, payload
from .abc import AbstractStreamWriter
from .compression_utils import ZLibCompressor
from .helpers import (
    ETAG_ANY,
    QUOTED_ETAG_RE,
    ETag,
    HeadersMixin,
    must_be_empty_body,
    parse_http_date,
    rfc822_formatted_time,
    sentinel,
    should_remove_content_length,
    validate_etag_value,
)
from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11
from .payload import Payload
from .typedefs import JSONEncoder, LooseHeaders
  41. REASON_PHRASES = {http_status.value: http_status.phrase for http_status in HTTPStatus}
  42. LARGE_BODY_SIZE = 1024**2
  43. __all__ = ("ContentCoding", "StreamResponse", "Response", "json_response")
  44. if TYPE_CHECKING:
  45. from .web_request import BaseRequest
  46. BaseClass = MutableMapping[str, Any]
  47. else:
  48. BaseClass = collections.abc.MutableMapping
  49. # TODO(py311): Convert to StrEnum for wider use
  50. class ContentCoding(enum.Enum):
  51. # The content codings that we have support for.
  52. #
  53. # Additional registered codings are listed at:
  54. # https://www.iana.org/assignments/http-parameters/http-parameters.xhtml#content-coding
  55. deflate = "deflate"
  56. gzip = "gzip"
  57. identity = "identity"
  58. CONTENT_CODINGS = {coding.value: coding for coding in ContentCoding}
  59. ############################################################
  60. # HTTP Response classes
  61. ############################################################
  62. class StreamResponse(BaseClass, HeadersMixin):
  63. _body: Union[None, bytes, bytearray, Payload]
  64. _length_check = True
  65. _body = None
  66. _keep_alive: Optional[bool] = None
  67. _chunked: bool = False
  68. _compression: bool = False
  69. _compression_strategy: Optional[int] = None
  70. _compression_force: Optional[ContentCoding] = None
  71. _req: Optional["BaseRequest"] = None
  72. _payload_writer: Optional[AbstractStreamWriter] = None
  73. _eof_sent: bool = False
  74. _must_be_empty_body: Optional[bool] = None
  75. _body_length = 0
  76. _cookies: Optional[SimpleCookie] = None
  77. _send_headers_immediately = True
  78. def __init__(
  79. self,
  80. *,
  81. status: int = 200,
  82. reason: Optional[str] = None,
  83. headers: Optional[LooseHeaders] = None,
  84. _real_headers: Optional[CIMultiDict[str]] = None,
  85. ) -> None:
  86. """Initialize a new stream response object.
  87. _real_headers is an internal parameter used to pass a pre-populated
  88. headers object. It is used by the `Response` class to avoid copying
  89. the headers when creating a new response object. It is not intended
  90. to be used by external code.
  91. """
  92. self._state: Dict[str, Any] = {}
  93. if _real_headers is not None:
  94. self._headers = _real_headers
  95. elif headers is not None:
  96. self._headers: CIMultiDict[str] = CIMultiDict(headers)
  97. else:
  98. self._headers = CIMultiDict()
  99. self._set_status(status, reason)
  100. @property
  101. def prepared(self) -> bool:
  102. return self._eof_sent or self._payload_writer is not None
  103. @property
  104. def task(self) -> "Optional[asyncio.Task[None]]":
  105. if self._req:
  106. return self._req.task
  107. else:
  108. return None
  109. @property
  110. def status(self) -> int:
  111. return self._status
  112. @property
  113. def chunked(self) -> bool:
  114. return self._chunked
  115. @property
  116. def compression(self) -> bool:
  117. return self._compression
  118. @property
  119. def reason(self) -> str:
  120. return self._reason
  121. def set_status(
  122. self,
  123. status: int,
  124. reason: Optional[str] = None,
  125. ) -> None:
  126. assert (
  127. not self.prepared
  128. ), "Cannot change the response status code after the headers have been sent"
  129. self._set_status(status, reason)
  130. def _set_status(self, status: int, reason: Optional[str]) -> None:
  131. self._status = int(status)
  132. if reason is None:
  133. reason = REASON_PHRASES.get(self._status, "")
  134. elif "\n" in reason:
  135. raise ValueError("Reason cannot contain \\n")
  136. self._reason = reason
  137. @property
  138. def keep_alive(self) -> Optional[bool]:
  139. return self._keep_alive
  140. def force_close(self) -> None:
  141. self._keep_alive = False
  142. @property
  143. def body_length(self) -> int:
  144. return self._body_length
  145. @property
  146. def output_length(self) -> int:
  147. warnings.warn("output_length is deprecated", DeprecationWarning)
  148. assert self._payload_writer
  149. return self._payload_writer.buffer_size
  150. def enable_chunked_encoding(self, chunk_size: Optional[int] = None) -> None:
  151. """Enables automatic chunked transfer encoding."""
  152. if hdrs.CONTENT_LENGTH in self._headers:
  153. raise RuntimeError(
  154. "You can't enable chunked encoding when a content length is set"
  155. )
  156. if chunk_size is not None:
  157. warnings.warn("Chunk size is deprecated #1615", DeprecationWarning)
  158. self._chunked = True
  159. def enable_compression(
  160. self,
  161. force: Optional[Union[bool, ContentCoding]] = None,
  162. strategy: Optional[int] = None,
  163. ) -> None:
  164. """Enables response compression encoding."""
  165. # Backwards compatibility for when force was a bool <0.17.
  166. if isinstance(force, bool):
  167. force = ContentCoding.deflate if force else ContentCoding.identity
  168. warnings.warn(
  169. "Using boolean for force is deprecated #3318", DeprecationWarning
  170. )
  171. elif force is not None:
  172. assert isinstance(
  173. force, ContentCoding
  174. ), "force should one of None, bool or ContentEncoding"
  175. self._compression = True
  176. self._compression_force = force
  177. self._compression_strategy = strategy
  178. @property
  179. def headers(self) -> "CIMultiDict[str]":
  180. return self._headers
  181. @property
  182. def cookies(self) -> SimpleCookie:
  183. if self._cookies is None:
  184. self._cookies = SimpleCookie()
  185. return self._cookies
  186. def set_cookie(
  187. self,
  188. name: str,
  189. value: str,
  190. *,
  191. expires: Optional[str] = None,
  192. domain: Optional[str] = None,
  193. max_age: Optional[Union[int, str]] = None,
  194. path: str = "/",
  195. secure: Optional[bool] = None,
  196. httponly: Optional[bool] = None,
  197. version: Optional[str] = None,
  198. samesite: Optional[str] = None,
  199. partitioned: Optional[bool] = None,
  200. ) -> None:
  201. """Set or update response cookie.
  202. Sets new cookie or updates existent with new value.
  203. Also updates only those params which are not None.
  204. """
  205. if self._cookies is None:
  206. self._cookies = SimpleCookie()
  207. self._cookies[name] = value
  208. c = self._cookies[name]
  209. if expires is not None:
  210. c["expires"] = expires
  211. elif c.get("expires") == "Thu, 01 Jan 1970 00:00:00 GMT":
  212. del c["expires"]
  213. if domain is not None:
  214. c["domain"] = domain
  215. if max_age is not None:
  216. c["max-age"] = str(max_age)
  217. elif "max-age" in c:
  218. del c["max-age"]
  219. c["path"] = path
  220. if secure is not None:
  221. c["secure"] = secure
  222. if httponly is not None:
  223. c["httponly"] = httponly
  224. if version is not None:
  225. c["version"] = version
  226. if samesite is not None:
  227. c["samesite"] = samesite
  228. if partitioned is not None:
  229. c["partitioned"] = partitioned
  230. def del_cookie(
  231. self,
  232. name: str,
  233. *,
  234. domain: Optional[str] = None,
  235. path: str = "/",
  236. secure: Optional[bool] = None,
  237. httponly: Optional[bool] = None,
  238. samesite: Optional[str] = None,
  239. ) -> None:
  240. """Delete cookie.
  241. Creates new empty expired cookie.
  242. """
  243. # TODO: do we need domain/path here?
  244. if self._cookies is not None:
  245. self._cookies.pop(name, None)
  246. self.set_cookie(
  247. name,
  248. "",
  249. max_age=0,
  250. expires="Thu, 01 Jan 1970 00:00:00 GMT",
  251. domain=domain,
  252. path=path,
  253. secure=secure,
  254. httponly=httponly,
  255. samesite=samesite,
  256. )
  257. @property
  258. def content_length(self) -> Optional[int]:
  259. # Just a placeholder for adding setter
  260. return super().content_length
  261. @content_length.setter
  262. def content_length(self, value: Optional[int]) -> None:
  263. if value is not None:
  264. value = int(value)
  265. if self._chunked:
  266. raise RuntimeError(
  267. "You can't set content length when chunked encoding is enable"
  268. )
  269. self._headers[hdrs.CONTENT_LENGTH] = str(value)
  270. else:
  271. self._headers.pop(hdrs.CONTENT_LENGTH, None)
  272. @property
  273. def content_type(self) -> str:
  274. # Just a placeholder for adding setter
  275. return super().content_type
  276. @content_type.setter
  277. def content_type(self, value: str) -> None:
  278. self.content_type # read header values if needed
  279. self._content_type = str(value)
  280. self._generate_content_type_header()
  281. @property
  282. def charset(self) -> Optional[str]:
  283. # Just a placeholder for adding setter
  284. return super().charset
  285. @charset.setter
  286. def charset(self, value: Optional[str]) -> None:
  287. ctype = self.content_type # read header values if needed
  288. if ctype == "application/octet-stream":
  289. raise RuntimeError(
  290. "Setting charset for application/octet-stream "
  291. "doesn't make sense, setup content_type first"
  292. )
  293. assert self._content_dict is not None
  294. if value is None:
  295. self._content_dict.pop("charset", None)
  296. else:
  297. self._content_dict["charset"] = str(value).lower()
  298. self._generate_content_type_header()
  299. @property
  300. def last_modified(self) -> Optional[datetime.datetime]:
  301. """The value of Last-Modified HTTP header, or None.
  302. This header is represented as a `datetime` object.
  303. """
  304. return parse_http_date(self._headers.get(hdrs.LAST_MODIFIED))
  305. @last_modified.setter
  306. def last_modified(
  307. self, value: Optional[Union[int, float, datetime.datetime, str]]
  308. ) -> None:
  309. if value is None:
  310. self._headers.pop(hdrs.LAST_MODIFIED, None)
  311. elif isinstance(value, (int, float)):
  312. self._headers[hdrs.LAST_MODIFIED] = time.strftime(
  313. "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(math.ceil(value))
  314. )
  315. elif isinstance(value, datetime.datetime):
  316. self._headers[hdrs.LAST_MODIFIED] = time.strftime(
  317. "%a, %d %b %Y %H:%M:%S GMT", value.utctimetuple()
  318. )
  319. elif isinstance(value, str):
  320. self._headers[hdrs.LAST_MODIFIED] = value
  321. else:
  322. msg = f"Unsupported type for last_modified: {type(value).__name__}"
  323. raise TypeError(msg)
  324. @property
  325. def etag(self) -> Optional[ETag]:
  326. quoted_value = self._headers.get(hdrs.ETAG)
  327. if not quoted_value:
  328. return None
  329. elif quoted_value == ETAG_ANY:
  330. return ETag(value=ETAG_ANY)
  331. match = QUOTED_ETAG_RE.fullmatch(quoted_value)
  332. if not match:
  333. return None
  334. is_weak, value = match.group(1, 2)
  335. return ETag(
  336. is_weak=bool(is_weak),
  337. value=value,
  338. )
  339. @etag.setter
  340. def etag(self, value: Optional[Union[ETag, str]]) -> None:
  341. if value is None:
  342. self._headers.pop(hdrs.ETAG, None)
  343. elif (isinstance(value, str) and value == ETAG_ANY) or (
  344. isinstance(value, ETag) and value.value == ETAG_ANY
  345. ):
  346. self._headers[hdrs.ETAG] = ETAG_ANY
  347. elif isinstance(value, str):
  348. validate_etag_value(value)
  349. self._headers[hdrs.ETAG] = f'"{value}"'
  350. elif isinstance(value, ETag) and isinstance(value.value, str):
  351. validate_etag_value(value.value)
  352. hdr_value = f'W/"{value.value}"' if value.is_weak else f'"{value.value}"'
  353. self._headers[hdrs.ETAG] = hdr_value
  354. else:
  355. raise ValueError(
  356. f"Unsupported etag type: {type(value)}. "
  357. f"etag must be str, ETag or None"
  358. )
  359. def _generate_content_type_header(
  360. self, CONTENT_TYPE: istr = hdrs.CONTENT_TYPE
  361. ) -> None:
  362. assert self._content_dict is not None
  363. assert self._content_type is not None
  364. params = "; ".join(f"{k}={v}" for k, v in self._content_dict.items())
  365. if params:
  366. ctype = self._content_type + "; " + params
  367. else:
  368. ctype = self._content_type
  369. self._headers[CONTENT_TYPE] = ctype
  370. async def _do_start_compression(self, coding: ContentCoding) -> None:
  371. if coding is ContentCoding.identity:
  372. return
  373. assert self._payload_writer is not None
  374. self._headers[hdrs.CONTENT_ENCODING] = coding.value
  375. self._payload_writer.enable_compression(
  376. coding.value, self._compression_strategy
  377. )
  378. # Compressed payload may have different content length,
  379. # remove the header
  380. self._headers.popall(hdrs.CONTENT_LENGTH, None)
  381. async def _start_compression(self, request: "BaseRequest") -> None:
  382. if self._compression_force:
  383. await self._do_start_compression(self._compression_force)
  384. return
  385. # Encoding comparisons should be case-insensitive
  386. # https://www.rfc-editor.org/rfc/rfc9110#section-8.4.1
  387. accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower()
  388. for value, coding in CONTENT_CODINGS.items():
  389. if value in accept_encoding:
  390. await self._do_start_compression(coding)
  391. return
  392. async def prepare(self, request: "BaseRequest") -> Optional[AbstractStreamWriter]:
  393. if self._eof_sent:
  394. return None
  395. if self._payload_writer is not None:
  396. return self._payload_writer
  397. self._must_be_empty_body = must_be_empty_body(request.method, self.status)
  398. return await self._start(request)
  399. async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
  400. self._req = request
  401. writer = self._payload_writer = request._payload_writer
  402. await self._prepare_headers()
  403. await request._prepare_hook(self)
  404. await self._write_headers()
  405. return writer
  406. async def _prepare_headers(self) -> None:
  407. request = self._req
  408. assert request is not None
  409. writer = self._payload_writer
  410. assert writer is not None
  411. keep_alive = self._keep_alive
  412. if keep_alive is None:
  413. keep_alive = request.keep_alive
  414. self._keep_alive = keep_alive
  415. version = request.version
  416. headers = self._headers
  417. if self._cookies:
  418. for cookie in self._cookies.values():
  419. value = cookie.output(header="")[1:]
  420. headers.add(hdrs.SET_COOKIE, value)
  421. if self._compression:
  422. await self._start_compression(request)
  423. if self._chunked:
  424. if version != HttpVersion11:
  425. raise RuntimeError(
  426. "Using chunked encoding is forbidden "
  427. "for HTTP/{0.major}.{0.minor}".format(request.version)
  428. )
  429. if not self._must_be_empty_body:
  430. writer.enable_chunking()
  431. headers[hdrs.TRANSFER_ENCODING] = "chunked"
  432. elif self._length_check: # Disabled for WebSockets
  433. writer.length = self.content_length
  434. if writer.length is None:
  435. if version >= HttpVersion11:
  436. if not self._must_be_empty_body:
  437. writer.enable_chunking()
  438. headers[hdrs.TRANSFER_ENCODING] = "chunked"
  439. elif not self._must_be_empty_body:
  440. keep_alive = False
  441. # HTTP 1.1: https://tools.ietf.org/html/rfc7230#section-3.3.2
  442. # HTTP 1.0: https://tools.ietf.org/html/rfc1945#section-10.4
  443. if self._must_be_empty_body:
  444. if hdrs.CONTENT_LENGTH in headers and should_remove_content_length(
  445. request.method, self.status
  446. ):
  447. del headers[hdrs.CONTENT_LENGTH]
  448. # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-10
  449. # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-13
  450. if hdrs.TRANSFER_ENCODING in headers:
  451. del headers[hdrs.TRANSFER_ENCODING]
  452. elif (writer.length if self._length_check else self.content_length) != 0:
  453. # https://www.rfc-editor.org/rfc/rfc9110#section-8.3-5
  454. headers.setdefault(hdrs.CONTENT_TYPE, "application/octet-stream")
  455. headers.setdefault(hdrs.DATE, rfc822_formatted_time())
  456. headers.setdefault(hdrs.SERVER, SERVER_SOFTWARE)
  457. # connection header
  458. if hdrs.CONNECTION not in headers:
  459. if keep_alive:
  460. if version == HttpVersion10:
  461. headers[hdrs.CONNECTION] = "keep-alive"
  462. elif version == HttpVersion11:
  463. headers[hdrs.CONNECTION] = "close"
  464. async def _write_headers(self) -> None:
  465. request = self._req
  466. assert request is not None
  467. writer = self._payload_writer
  468. assert writer is not None
  469. # status line
  470. version = request.version
  471. status_line = f"HTTP/{version[0]}.{version[1]} {self._status} {self._reason}"
  472. await writer.write_headers(status_line, self._headers)
  473. # Send headers immediately if not opted into buffering
  474. if self._send_headers_immediately:
  475. writer.send_headers()
  476. async def write(self, data: Union[bytes, bytearray, memoryview]) -> None:
  477. assert isinstance(
  478. data, (bytes, bytearray, memoryview)
  479. ), "data argument must be byte-ish (%r)" % type(data)
  480. if self._eof_sent:
  481. raise RuntimeError("Cannot call write() after write_eof()")
  482. if self._payload_writer is None:
  483. raise RuntimeError("Cannot call write() before prepare()")
  484. await self._payload_writer.write(data)
  485. async def drain(self) -> None:
  486. assert not self._eof_sent, "EOF has already been sent"
  487. assert self._payload_writer is not None, "Response has not been started"
  488. warnings.warn(
  489. "drain method is deprecated, use await resp.write()",
  490. DeprecationWarning,
  491. stacklevel=2,
  492. )
  493. await self._payload_writer.drain()
  494. async def write_eof(self, data: bytes = b"") -> None:
  495. assert isinstance(
  496. data, (bytes, bytearray, memoryview)
  497. ), "data argument must be byte-ish (%r)" % type(data)
  498. if self._eof_sent:
  499. return
  500. assert self._payload_writer is not None, "Response has not been started"
  501. await self._payload_writer.write_eof(data)
  502. self._eof_sent = True
  503. self._req = None
  504. self._body_length = self._payload_writer.output_size
  505. self._payload_writer = None
  506. def __repr__(self) -> str:
  507. if self._eof_sent:
  508. info = "eof"
  509. elif self.prepared:
  510. assert self._req is not None
  511. info = f"{self._req.method} {self._req.path} "
  512. else:
  513. info = "not prepared"
  514. return f"<{self.__class__.__name__} {self.reason} {info}>"
  515. def __getitem__(self, key: str) -> Any:
  516. return self._state[key]
  517. def __setitem__(self, key: str, value: Any) -> None:
  518. self._state[key] = value
  519. def __delitem__(self, key: str) -> None:
  520. del self._state[key]
  521. def __len__(self) -> int:
  522. return len(self._state)
  523. def __iter__(self) -> Iterator[str]:
  524. return iter(self._state)
  525. def __hash__(self) -> int:
  526. return hash(id(self))
  527. def __eq__(self, other: object) -> bool:
  528. return self is other
  529. def __bool__(self) -> bool:
  530. return True
  531. class Response(StreamResponse):
  532. _compressed_body: Optional[bytes] = None
  533. _send_headers_immediately = False
  534. def __init__(
  535. self,
  536. *,
  537. body: Any = None,
  538. status: int = 200,
  539. reason: Optional[str] = None,
  540. text: Optional[str] = None,
  541. headers: Optional[LooseHeaders] = None,
  542. content_type: Optional[str] = None,
  543. charset: Optional[str] = None,
  544. zlib_executor_size: Optional[int] = None,
  545. zlib_executor: Optional[Executor] = None,
  546. ) -> None:
  547. if body is not None and text is not None:
  548. raise ValueError("body and text are not allowed together")
  549. if headers is None:
  550. real_headers: CIMultiDict[str] = CIMultiDict()
  551. else:
  552. real_headers = CIMultiDict(headers)
  553. if content_type is not None and "charset" in content_type:
  554. raise ValueError("charset must not be in content_type argument")
  555. if text is not None:
  556. if hdrs.CONTENT_TYPE in real_headers:
  557. if content_type or charset:
  558. raise ValueError(
  559. "passing both Content-Type header and "
  560. "content_type or charset params "
  561. "is forbidden"
  562. )
  563. else:
  564. # fast path for filling headers
  565. if not isinstance(text, str):
  566. raise TypeError("text argument must be str (%r)" % type(text))
  567. if content_type is None:
  568. content_type = "text/plain"
  569. if charset is None:
  570. charset = "utf-8"
  571. real_headers[hdrs.CONTENT_TYPE] = content_type + "; charset=" + charset
  572. body = text.encode(charset)
  573. text = None
  574. elif hdrs.CONTENT_TYPE in real_headers:
  575. if content_type is not None or charset is not None:
  576. raise ValueError(
  577. "passing both Content-Type header and "
  578. "content_type or charset params "
  579. "is forbidden"
  580. )
  581. elif content_type is not None:
  582. if charset is not None:
  583. content_type += "; charset=" + charset
  584. real_headers[hdrs.CONTENT_TYPE] = content_type
  585. super().__init__(status=status, reason=reason, _real_headers=real_headers)
  586. if text is not None:
  587. self.text = text
  588. else:
  589. self.body = body
  590. self._zlib_executor_size = zlib_executor_size
  591. self._zlib_executor = zlib_executor
  592. @property
  593. def body(self) -> Optional[Union[bytes, Payload]]:
  594. return self._body
  595. @body.setter
  596. def body(self, body: Any) -> None:
  597. if body is None:
  598. self._body = None
  599. elif isinstance(body, (bytes, bytearray)):
  600. self._body = body
  601. else:
  602. try:
  603. self._body = body = payload.PAYLOAD_REGISTRY.get(body)
  604. except payload.LookupError:
  605. raise ValueError("Unsupported body type %r" % type(body))
  606. headers = self._headers
  607. # set content-type
  608. if hdrs.CONTENT_TYPE not in headers:
  609. headers[hdrs.CONTENT_TYPE] = body.content_type
  610. # copy payload headers
  611. if body.headers:
  612. for key, value in body.headers.items():
  613. if key not in headers:
  614. headers[key] = value
  615. self._compressed_body = None
  616. @property
  617. def text(self) -> Optional[str]:
  618. if self._body is None:
  619. return None
  620. # Note: When _body is a Payload (e.g. FilePayload), this may do blocking I/O
  621. # This is generally safe as most common payloads (BytesPayload, StringPayload)
  622. # don't do blocking I/O, but be careful with file-based payloads
  623. return self._body.decode(self.charset or "utf-8")
  624. @text.setter
  625. def text(self, text: str) -> None:
  626. assert text is None or isinstance(
  627. text, str
  628. ), "text argument must be str (%r)" % type(text)
  629. if self.content_type == "application/octet-stream":
  630. self.content_type = "text/plain"
  631. if self.charset is None:
  632. self.charset = "utf-8"
  633. self._body = text.encode(self.charset)
  634. self._compressed_body = None
  635. @property
  636. def content_length(self) -> Optional[int]:
  637. if self._chunked:
  638. return None
  639. if hdrs.CONTENT_LENGTH in self._headers:
  640. return int(self._headers[hdrs.CONTENT_LENGTH])
  641. if self._compressed_body is not None:
  642. # Return length of the compressed body
  643. return len(self._compressed_body)
  644. elif isinstance(self._body, Payload):
  645. # A payload without content length, or a compressed payload
  646. return None
  647. elif self._body is not None:
  648. return len(self._body)
  649. else:
  650. return 0
  651. @content_length.setter
  652. def content_length(self, value: Optional[int]) -> None:
  653. raise RuntimeError("Content length is set automatically")
  654. async def write_eof(self, data: bytes = b"") -> None:
  655. if self._eof_sent:
  656. return
  657. if self._compressed_body is None:
  658. body: Optional[Union[bytes, Payload]] = self._body
  659. else:
  660. body = self._compressed_body
  661. assert not data, f"data arg is not supported, got {data!r}"
  662. assert self._req is not None
  663. assert self._payload_writer is not None
  664. if body is None or self._must_be_empty_body:
  665. await super().write_eof()
  666. elif isinstance(self._body, Payload):
  667. await self._body.write(self._payload_writer)
  668. await self._body.close()
  669. await super().write_eof()
  670. else:
  671. await super().write_eof(cast(bytes, body))
  672. async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
  673. if hdrs.CONTENT_LENGTH in self._headers:
  674. if should_remove_content_length(request.method, self.status):
  675. del self._headers[hdrs.CONTENT_LENGTH]
  676. elif not self._chunked:
  677. if isinstance(self._body, Payload):
  678. if (size := self._body.size) is not None:
  679. self._headers[hdrs.CONTENT_LENGTH] = str(size)
  680. else:
  681. body_len = len(self._body) if self._body else "0"
  682. # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-7
  683. if body_len != "0" or (
  684. self.status != 304 and request.method not in hdrs.METH_HEAD_ALL
  685. ):
  686. self._headers[hdrs.CONTENT_LENGTH] = str(body_len)
  687. return await super()._start(request)
  688. async def _do_start_compression(self, coding: ContentCoding) -> None:
  689. if self._chunked or isinstance(self._body, Payload):
  690. return await super()._do_start_compression(coding)
  691. if coding is ContentCoding.identity:
  692. return
  693. # Instead of using _payload_writer.enable_compression,
  694. # compress the whole body
  695. compressor = ZLibCompressor(
  696. encoding=coding.value,
  697. max_sync_chunk_size=self._zlib_executor_size,
  698. executor=self._zlib_executor,
  699. )
  700. assert self._body is not None
  701. if self._zlib_executor_size is None and len(self._body) > LARGE_BODY_SIZE:
  702. warnings.warn(
  703. "Synchronous compression of large response bodies "
  704. f"({len(self._body)} bytes) might block the async event loop. "
  705. "Consider providing a custom value to zlib_executor_size/"
  706. "zlib_executor response properties or disabling compression on it."
  707. )
  708. self._compressed_body = (
  709. await compressor.compress(self._body) + compressor.flush()
  710. )
  711. self._headers[hdrs.CONTENT_ENCODING] = coding.value
  712. self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body))
  713. def json_response(
  714. data: Any = sentinel,
  715. *,
  716. text: Optional[str] = None,
  717. body: Optional[bytes] = None,
  718. status: int = 200,
  719. reason: Optional[str] = None,
  720. headers: Optional[LooseHeaders] = None,
  721. content_type: str = "application/json",
  722. dumps: JSONEncoder = json.dumps,
  723. ) -> Response:
  724. if data is not sentinel:
  725. if text or body:
  726. raise ValueError("only one of data, text, or body should be specified")
  727. else:
  728. text = dumps(data)
  729. return Response(
  730. text=text,
  731. body=body,
  732. status=status,
  733. reason=reason,
  734. headers=headers,
  735. content_type=content_type,
  736. )