client_reqrep.py 52 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536
  1. import asyncio
  2. import codecs
  3. import contextlib
  4. import functools
  5. import io
  6. import re
  7. import sys
  8. import traceback
  9. import warnings
  10. from collections.abc import Mapping
  11. from hashlib import md5, sha1, sha256
  12. from http.cookies import Morsel, SimpleCookie
  13. from types import MappingProxyType, TracebackType
  14. from typing import (
  15. TYPE_CHECKING,
  16. Any,
  17. Callable,
  18. Dict,
  19. Iterable,
  20. List,
  21. Literal,
  22. NamedTuple,
  23. Optional,
  24. Tuple,
  25. Type,
  26. Union,
  27. )
  28. import attr
  29. from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
  30. from yarl import URL
  31. from . import hdrs, helpers, http, multipart, payload
  32. from ._cookie_helpers import (
  33. parse_cookie_header,
  34. parse_set_cookie_headers,
  35. preserve_morsel_with_coded_value,
  36. )
  37. from .abc import AbstractStreamWriter
  38. from .client_exceptions import (
  39. ClientConnectionError,
  40. ClientOSError,
  41. ClientResponseError,
  42. ContentTypeError,
  43. InvalidURL,
  44. ServerFingerprintMismatch,
  45. )
  46. from .compression_utils import HAS_BROTLI, HAS_ZSTD
  47. from .formdata import FormData
  48. from .helpers import (
  49. _SENTINEL,
  50. BaseTimerContext,
  51. BasicAuth,
  52. HeadersMixin,
  53. TimerNoop,
  54. noop,
  55. reify,
  56. sentinel,
  57. set_exception,
  58. set_result,
  59. )
  60. from .http import (
  61. SERVER_SOFTWARE,
  62. HttpVersion,
  63. HttpVersion10,
  64. HttpVersion11,
  65. StreamWriter,
  66. )
  67. from .streams import StreamReader
  68. from .typedefs import (
  69. DEFAULT_JSON_DECODER,
  70. JSONDecoder,
  71. LooseCookies,
  72. LooseHeaders,
  73. Query,
  74. RawHeaders,
  75. )
  76. if TYPE_CHECKING:
  77. import ssl
  78. from ssl import SSLContext
  79. else:
  80. try:
  81. import ssl
  82. from ssl import SSLContext
  83. except ImportError: # pragma: no cover
  84. ssl = None # type: ignore[assignment]
  85. SSLContext = object # type: ignore[misc,assignment]
  86. __all__ = ("ClientRequest", "ClientResponse", "RequestInfo", "Fingerprint")
  87. if TYPE_CHECKING:
  88. from .client import ClientSession
  89. from .connector import Connection
  90. from .tracing import Trace
  91. _CONNECTION_CLOSED_EXCEPTION = ClientConnectionError("Connection closed")
  92. _CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")
  93. json_re = re.compile(r"^application/(?:[\w.+-]+?\+)?json")
  94. def _gen_default_accept_encoding() -> str:
  95. encodings = [
  96. "gzip",
  97. "deflate",
  98. ]
  99. if HAS_BROTLI:
  100. encodings.append("br")
  101. if HAS_ZSTD:
  102. encodings.append("zstd")
  103. return ", ".join(encodings)
  104. @attr.s(auto_attribs=True, frozen=True, slots=True)
  105. class ContentDisposition:
  106. type: Optional[str]
  107. parameters: "MappingProxyType[str, str]"
  108. filename: Optional[str]
  109. class _RequestInfo(NamedTuple):
  110. url: URL
  111. method: str
  112. headers: "CIMultiDictProxy[str]"
  113. real_url: URL
  114. class RequestInfo(_RequestInfo):
  115. def __new__(
  116. cls,
  117. url: URL,
  118. method: str,
  119. headers: "CIMultiDictProxy[str]",
  120. real_url: Union[URL, _SENTINEL] = sentinel,
  121. ) -> "RequestInfo":
  122. """Create a new RequestInfo instance.
  123. For backwards compatibility, the real_url parameter is optional.
  124. """
  125. return tuple.__new__(
  126. cls, (url, method, headers, url if real_url is sentinel else real_url)
  127. )
  128. class Fingerprint:
  129. HASHFUNC_BY_DIGESTLEN = {
  130. 16: md5,
  131. 20: sha1,
  132. 32: sha256,
  133. }
  134. def __init__(self, fingerprint: bytes) -> None:
  135. digestlen = len(fingerprint)
  136. hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen)
  137. if not hashfunc:
  138. raise ValueError("fingerprint has invalid length")
  139. elif hashfunc is md5 or hashfunc is sha1:
  140. raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.")
  141. self._hashfunc = hashfunc
  142. self._fingerprint = fingerprint
  143. @property
  144. def fingerprint(self) -> bytes:
  145. return self._fingerprint
  146. def check(self, transport: asyncio.Transport) -> None:
  147. if not transport.get_extra_info("sslcontext"):
  148. return
  149. sslobj = transport.get_extra_info("ssl_object")
  150. cert = sslobj.getpeercert(binary_form=True)
  151. got = self._hashfunc(cert).digest()
  152. if got != self._fingerprint:
  153. host, port, *_ = transport.get_extra_info("peername")
  154. raise ServerFingerprintMismatch(self._fingerprint, got, host, port)
  155. if ssl is not None:
  156. SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None))
  157. else: # pragma: no cover
  158. SSL_ALLOWED_TYPES = (bool, type(None))
  159. def _merge_ssl_params(
  160. ssl: Union["SSLContext", bool, Fingerprint],
  161. verify_ssl: Optional[bool],
  162. ssl_context: Optional["SSLContext"],
  163. fingerprint: Optional[bytes],
  164. ) -> Union["SSLContext", bool, Fingerprint]:
  165. if ssl is None:
  166. ssl = True # Double check for backwards compatibility
  167. if verify_ssl is not None and not verify_ssl:
  168. warnings.warn(
  169. "verify_ssl is deprecated, use ssl=False instead",
  170. DeprecationWarning,
  171. stacklevel=3,
  172. )
  173. if ssl is not True:
  174. raise ValueError(
  175. "verify_ssl, ssl_context, fingerprint and ssl "
  176. "parameters are mutually exclusive"
  177. )
  178. else:
  179. ssl = False
  180. if ssl_context is not None:
  181. warnings.warn(
  182. "ssl_context is deprecated, use ssl=context instead",
  183. DeprecationWarning,
  184. stacklevel=3,
  185. )
  186. if ssl is not True:
  187. raise ValueError(
  188. "verify_ssl, ssl_context, fingerprint and ssl "
  189. "parameters are mutually exclusive"
  190. )
  191. else:
  192. ssl = ssl_context
  193. if fingerprint is not None:
  194. warnings.warn(
  195. "fingerprint is deprecated, use ssl=Fingerprint(fingerprint) instead",
  196. DeprecationWarning,
  197. stacklevel=3,
  198. )
  199. if ssl is not True:
  200. raise ValueError(
  201. "verify_ssl, ssl_context, fingerprint and ssl "
  202. "parameters are mutually exclusive"
  203. )
  204. else:
  205. ssl = Fingerprint(fingerprint)
  206. if not isinstance(ssl, SSL_ALLOWED_TYPES):
  207. raise TypeError(
  208. "ssl should be SSLContext, bool, Fingerprint or None, "
  209. "got {!r} instead.".format(ssl)
  210. )
  211. return ssl
  212. _SSL_SCHEMES = frozenset(("https", "wss"))
  213. # ConnectionKey is a NamedTuple because it is used as a key in a dict
  214. # and a set in the connector. Since a NamedTuple is a tuple it uses
  215. # the fast native tuple __hash__ and __eq__ implementation in CPython.
  216. class ConnectionKey(NamedTuple):
  217. # the key should contain an information about used proxy / TLS
  218. # to prevent reusing wrong connections from a pool
  219. host: str
  220. port: Optional[int]
  221. is_ssl: bool
  222. ssl: Union[SSLContext, bool, Fingerprint]
  223. proxy: Optional[URL]
  224. proxy_auth: Optional[BasicAuth]
  225. proxy_headers_hash: Optional[int] # hash(CIMultiDict)
  226. def _is_expected_content_type(
  227. response_content_type: str, expected_content_type: str
  228. ) -> bool:
  229. if expected_content_type == "application/json":
  230. return json_re.match(response_content_type) is not None
  231. return expected_content_type in response_content_type
  232. def _warn_if_unclosed_payload(payload: payload.Payload, stacklevel: int = 2) -> None:
  233. """Warn if the payload is not closed.
  234. Callers must check that the body is a Payload before calling this method.
  235. Args:
  236. payload: The payload to check
  237. stacklevel: Stack level for the warning (default 2 for direct callers)
  238. """
  239. if not payload.autoclose and not payload.consumed:
  240. warnings.warn(
  241. "The previous request body contains unclosed resources. "
  242. "Use await request.update_body() instead of setting request.body "
  243. "directly to properly close resources and avoid leaks.",
  244. ResourceWarning,
  245. stacklevel=stacklevel,
  246. )
  247. class ClientResponse(HeadersMixin):
  248. # Some of these attributes are None when created,
  249. # but will be set by the start() method.
  250. # As the end user will likely never see the None values, we cheat the types below.
  251. # from the Status-Line of the response
  252. version: Optional[HttpVersion] = None # HTTP-Version
  253. status: int = None # type: ignore[assignment] # Status-Code
  254. reason: Optional[str] = None # Reason-Phrase
  255. content: StreamReader = None # type: ignore[assignment] # Payload stream
  256. _body: Optional[bytes] = None
  257. _headers: CIMultiDictProxy[str] = None # type: ignore[assignment]
  258. _history: Tuple["ClientResponse", ...] = ()
  259. _raw_headers: RawHeaders = None # type: ignore[assignment]
  260. _connection: Optional["Connection"] = None # current connection
  261. _cookies: Optional[SimpleCookie] = None
  262. _raw_cookie_headers: Optional[Tuple[str, ...]] = None
  263. _continue: Optional["asyncio.Future[bool]"] = None
  264. _source_traceback: Optional[traceback.StackSummary] = None
  265. _session: Optional["ClientSession"] = None
  266. # set up by ClientRequest after ClientResponse object creation
  267. # post-init stage allows to not change ctor signature
  268. _closed = True # to allow __del__ for non-initialized properly response
  269. _released = False
  270. _in_context = False
  271. _resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8"
  272. __writer: Optional["asyncio.Task[None]"] = None
  273. def __init__(
  274. self,
  275. method: str,
  276. url: URL,
  277. *,
  278. writer: "Optional[asyncio.Task[None]]",
  279. continue100: Optional["asyncio.Future[bool]"],
  280. timer: BaseTimerContext,
  281. request_info: RequestInfo,
  282. traces: List["Trace"],
  283. loop: asyncio.AbstractEventLoop,
  284. session: "ClientSession",
  285. ) -> None:
  286. # URL forbids subclasses, so a simple type check is enough.
  287. assert type(url) is URL
  288. self.method = method
  289. self._real_url = url
  290. self._url = url.with_fragment(None) if url.raw_fragment else url
  291. if writer is not None:
  292. self._writer = writer
  293. if continue100 is not None:
  294. self._continue = continue100
  295. self._request_info = request_info
  296. self._timer = timer if timer is not None else TimerNoop()
  297. self._cache: Dict[str, Any] = {}
  298. self._traces = traces
  299. self._loop = loop
  300. # Save reference to _resolve_charset, so that get_encoding() will still
  301. # work after the response has finished reading the body.
  302. # TODO: Fix session=None in tests (see ClientRequest.__init__).
  303. if session is not None:
  304. # store a reference to session #1985
  305. self._session = session
  306. self._resolve_charset = session._resolve_charset
  307. if loop.get_debug():
  308. self._source_traceback = traceback.extract_stack(sys._getframe(1))
  309. def __reset_writer(self, _: object = None) -> None:
  310. self.__writer = None
  311. @property
  312. def _writer(self) -> Optional["asyncio.Task[None]"]:
  313. """The writer task for streaming data.
  314. _writer is only provided for backwards compatibility
  315. for subclasses that may need to access it.
  316. """
  317. return self.__writer
  318. @_writer.setter
  319. def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
  320. """Set the writer task for streaming data."""
  321. if self.__writer is not None:
  322. self.__writer.remove_done_callback(self.__reset_writer)
  323. self.__writer = writer
  324. if writer is None:
  325. return
  326. if writer.done():
  327. # The writer is already done, so we can clear it immediately.
  328. self.__writer = None
  329. else:
  330. writer.add_done_callback(self.__reset_writer)
  331. @property
  332. def cookies(self) -> SimpleCookie:
  333. if self._cookies is None:
  334. if self._raw_cookie_headers is not None:
  335. # Parse cookies for response.cookies (SimpleCookie for backward compatibility)
  336. cookies = SimpleCookie()
  337. # Use parse_set_cookie_headers for more lenient parsing that handles
  338. # malformed cookies better than SimpleCookie.load
  339. cookies.update(parse_set_cookie_headers(self._raw_cookie_headers))
  340. self._cookies = cookies
  341. else:
  342. self._cookies = SimpleCookie()
  343. return self._cookies
  344. @cookies.setter
  345. def cookies(self, cookies: SimpleCookie) -> None:
  346. self._cookies = cookies
  347. # Generate raw cookie headers from the SimpleCookie
  348. if cookies:
  349. self._raw_cookie_headers = tuple(
  350. morsel.OutputString() for morsel in cookies.values()
  351. )
  352. else:
  353. self._raw_cookie_headers = None
  354. @reify
  355. def url(self) -> URL:
  356. return self._url
  357. @reify
  358. def url_obj(self) -> URL:
  359. warnings.warn("Deprecated, use .url #1654", DeprecationWarning, stacklevel=2)
  360. return self._url
  361. @reify
  362. def real_url(self) -> URL:
  363. return self._real_url
  364. @reify
  365. def host(self) -> str:
  366. assert self._url.host is not None
  367. return self._url.host
  368. @reify
  369. def headers(self) -> "CIMultiDictProxy[str]":
  370. return self._headers
  371. @reify
  372. def raw_headers(self) -> RawHeaders:
  373. return self._raw_headers
  374. @reify
  375. def request_info(self) -> RequestInfo:
  376. return self._request_info
  377. @reify
  378. def content_disposition(self) -> Optional[ContentDisposition]:
  379. raw = self._headers.get(hdrs.CONTENT_DISPOSITION)
  380. if raw is None:
  381. return None
  382. disposition_type, params_dct = multipart.parse_content_disposition(raw)
  383. params = MappingProxyType(params_dct)
  384. filename = multipart.content_disposition_filename(params)
  385. return ContentDisposition(disposition_type, params, filename)
  386. def __del__(self, _warnings: Any = warnings) -> None:
  387. if self._closed:
  388. return
  389. if self._connection is not None:
  390. self._connection.release()
  391. self._cleanup_writer()
  392. if self._loop.get_debug():
  393. kwargs = {"source": self}
  394. _warnings.warn(f"Unclosed response {self!r}", ResourceWarning, **kwargs)
  395. context = {"client_response": self, "message": "Unclosed response"}
  396. if self._source_traceback:
  397. context["source_traceback"] = self._source_traceback
  398. self._loop.call_exception_handler(context)
  399. def __repr__(self) -> str:
  400. out = io.StringIO()
  401. ascii_encodable_url = str(self.url)
  402. if self.reason:
  403. ascii_encodable_reason = self.reason.encode(
  404. "ascii", "backslashreplace"
  405. ).decode("ascii")
  406. else:
  407. ascii_encodable_reason = "None"
  408. print(
  409. "<ClientResponse({}) [{} {}]>".format(
  410. ascii_encodable_url, self.status, ascii_encodable_reason
  411. ),
  412. file=out,
  413. )
  414. print(self.headers, file=out)
  415. return out.getvalue()
  416. @property
  417. def connection(self) -> Optional["Connection"]:
  418. return self._connection
  419. @reify
  420. def history(self) -> Tuple["ClientResponse", ...]:
  421. """A sequence of of responses, if redirects occurred."""
  422. return self._history
  423. @reify
  424. def links(self) -> "MultiDictProxy[MultiDictProxy[Union[str, URL]]]":
  425. links_str = ", ".join(self.headers.getall("link", []))
  426. if not links_str:
  427. return MultiDictProxy(MultiDict())
  428. links: MultiDict[MultiDictProxy[Union[str, URL]]] = MultiDict()
  429. for val in re.split(r",(?=\s*<)", links_str):
  430. match = re.match(r"\s*<(.*)>(.*)", val)
  431. if match is None: # pragma: no cover
  432. # the check exists to suppress mypy error
  433. continue
  434. url, params_str = match.groups()
  435. params = params_str.split(";")[1:]
  436. link: MultiDict[Union[str, URL]] = MultiDict()
  437. for param in params:
  438. match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M)
  439. if match is None: # pragma: no cover
  440. # the check exists to suppress mypy error
  441. continue
  442. key, _, value, _ = match.groups()
  443. link.add(key, value)
  444. key = link.get("rel", url)
  445. link.add("url", self.url.join(URL(url)))
  446. links.add(str(key), MultiDictProxy(link))
  447. return MultiDictProxy(links)
  448. async def start(self, connection: "Connection") -> "ClientResponse":
  449. """Start response processing."""
  450. self._closed = False
  451. self._protocol = connection.protocol
  452. self._connection = connection
  453. with self._timer:
  454. while True:
  455. # read response
  456. try:
  457. protocol = self._protocol
  458. message, payload = await protocol.read() # type: ignore[union-attr]
  459. except http.HttpProcessingError as exc:
  460. raise ClientResponseError(
  461. self.request_info,
  462. self.history,
  463. status=exc.code,
  464. message=exc.message,
  465. headers=exc.headers,
  466. ) from exc
  467. if message.code < 100 or message.code > 199 or message.code == 101:
  468. break
  469. if self._continue is not None:
  470. set_result(self._continue, True)
  471. self._continue = None
  472. # payload eof handler
  473. payload.on_eof(self._response_eof)
  474. # response status
  475. self.version = message.version
  476. self.status = message.code
  477. self.reason = message.reason
  478. # headers
  479. self._headers = message.headers # type is CIMultiDictProxy
  480. self._raw_headers = message.raw_headers # type is Tuple[bytes, bytes]
  481. # payload
  482. self.content = payload
  483. # cookies
  484. if cookie_hdrs := self.headers.getall(hdrs.SET_COOKIE, ()):
  485. # Store raw cookie headers for CookieJar
  486. self._raw_cookie_headers = tuple(cookie_hdrs)
  487. return self
  488. def _response_eof(self) -> None:
  489. if self._closed:
  490. return
  491. # protocol could be None because connection could be detached
  492. protocol = self._connection and self._connection.protocol
  493. if protocol is not None and protocol.upgraded:
  494. return
  495. self._closed = True
  496. self._cleanup_writer()
  497. self._release_connection()
  498. @property
  499. def closed(self) -> bool:
  500. return self._closed
  501. def close(self) -> None:
  502. if not self._released:
  503. self._notify_content()
  504. self._closed = True
  505. if self._loop is None or self._loop.is_closed():
  506. return
  507. self._cleanup_writer()
  508. if self._connection is not None:
  509. self._connection.close()
  510. self._connection = None
  511. def release(self) -> Any:
  512. if not self._released:
  513. self._notify_content()
  514. self._closed = True
  515. self._cleanup_writer()
  516. self._release_connection()
  517. return noop()
  518. @property
  519. def ok(self) -> bool:
  520. """Returns ``True`` if ``status`` is less than ``400``, ``False`` if not.
  521. This is **not** a check for ``200 OK`` but a check that the response
  522. status is under 400.
  523. """
  524. return 400 > self.status
  525. def raise_for_status(self) -> None:
  526. if not self.ok:
  527. # reason should always be not None for a started response
  528. assert self.reason is not None
  529. # If we're in a context we can rely on __aexit__() to release as the
  530. # exception propagates.
  531. if not self._in_context:
  532. self.release()
  533. raise ClientResponseError(
  534. self.request_info,
  535. self.history,
  536. status=self.status,
  537. message=self.reason,
  538. headers=self.headers,
  539. )
  540. def _release_connection(self) -> None:
  541. if self._connection is not None:
  542. if self.__writer is None:
  543. self._connection.release()
  544. self._connection = None
  545. else:
  546. self.__writer.add_done_callback(lambda f: self._release_connection())
  547. async def _wait_released(self) -> None:
  548. if self.__writer is not None:
  549. try:
  550. await self.__writer
  551. except asyncio.CancelledError:
  552. if (
  553. sys.version_info >= (3, 11)
  554. and (task := asyncio.current_task())
  555. and task.cancelling()
  556. ):
  557. raise
  558. self._release_connection()
  559. def _cleanup_writer(self) -> None:
  560. if self.__writer is not None:
  561. self.__writer.cancel()
  562. self._session = None
  563. def _notify_content(self) -> None:
  564. content = self.content
  565. if content and content.exception() is None:
  566. set_exception(content, _CONNECTION_CLOSED_EXCEPTION)
  567. self._released = True
  568. async def wait_for_close(self) -> None:
  569. if self.__writer is not None:
  570. try:
  571. await self.__writer
  572. except asyncio.CancelledError:
  573. if (
  574. sys.version_info >= (3, 11)
  575. and (task := asyncio.current_task())
  576. and task.cancelling()
  577. ):
  578. raise
  579. self.release()
  580. async def read(self) -> bytes:
  581. """Read response payload."""
  582. if self._body is None:
  583. try:
  584. self._body = await self.content.read()
  585. for trace in self._traces:
  586. await trace.send_response_chunk_received(
  587. self.method, self.url, self._body
  588. )
  589. except BaseException:
  590. self.close()
  591. raise
  592. elif self._released: # Response explicitly released
  593. raise ClientConnectionError("Connection closed")
  594. protocol = self._connection and self._connection.protocol
  595. if protocol is None or not protocol.upgraded:
  596. await self._wait_released() # Underlying connection released
  597. return self._body
  598. def get_encoding(self) -> str:
  599. ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
  600. mimetype = helpers.parse_mimetype(ctype)
  601. encoding = mimetype.parameters.get("charset")
  602. if encoding:
  603. with contextlib.suppress(LookupError, ValueError):
  604. return codecs.lookup(encoding).name
  605. if mimetype.type == "application" and (
  606. mimetype.subtype == "json" or mimetype.subtype == "rdap"
  607. ):
  608. # RFC 7159 states that the default encoding is UTF-8.
  609. # RFC 7483 defines application/rdap+json
  610. return "utf-8"
  611. if self._body is None:
  612. raise RuntimeError(
  613. "Cannot compute fallback encoding of a not yet read body"
  614. )
  615. return self._resolve_charset(self, self._body)
  616. async def text(self, encoding: Optional[str] = None, errors: str = "strict") -> str:
  617. """Read response payload and decode."""
  618. if self._body is None:
  619. await self.read()
  620. if encoding is None:
  621. encoding = self.get_encoding()
  622. return self._body.decode(encoding, errors=errors) # type: ignore[union-attr]
  623. async def json(
  624. self,
  625. *,
  626. encoding: Optional[str] = None,
  627. loads: JSONDecoder = DEFAULT_JSON_DECODER,
  628. content_type: Optional[str] = "application/json",
  629. ) -> Any:
  630. """Read and decodes JSON response."""
  631. if self._body is None:
  632. await self.read()
  633. if content_type:
  634. ctype = self.headers.get(hdrs.CONTENT_TYPE, "").lower()
  635. if not _is_expected_content_type(ctype, content_type):
  636. raise ContentTypeError(
  637. self.request_info,
  638. self.history,
  639. status=self.status,
  640. message=(
  641. "Attempt to decode JSON with unexpected mimetype: %s" % ctype
  642. ),
  643. headers=self.headers,
  644. )
  645. stripped = self._body.strip() # type: ignore[union-attr]
  646. if not stripped:
  647. return None
  648. if encoding is None:
  649. encoding = self.get_encoding()
  650. return loads(stripped.decode(encoding))
  651. async def __aenter__(self) -> "ClientResponse":
  652. self._in_context = True
  653. return self
  654. async def __aexit__(
  655. self,
  656. exc_type: Optional[Type[BaseException]],
  657. exc_val: Optional[BaseException],
  658. exc_tb: Optional[TracebackType],
  659. ) -> None:
  660. self._in_context = False
  661. # similar to _RequestContextManager, we do not need to check
  662. # for exceptions, response object can close connection
  663. # if state is broken
  664. self.release()
  665. await self.wait_for_close()
  666. class ClientRequest:
  667. GET_METHODS = {
  668. hdrs.METH_GET,
  669. hdrs.METH_HEAD,
  670. hdrs.METH_OPTIONS,
  671. hdrs.METH_TRACE,
  672. }
  673. POST_METHODS = {hdrs.METH_PATCH, hdrs.METH_POST, hdrs.METH_PUT}
  674. ALL_METHODS = GET_METHODS.union(POST_METHODS).union({hdrs.METH_DELETE})
  675. DEFAULT_HEADERS = {
  676. hdrs.ACCEPT: "*/*",
  677. hdrs.ACCEPT_ENCODING: _gen_default_accept_encoding(),
  678. }
  679. # Type of body depends on PAYLOAD_REGISTRY, which is dynamic.
  680. _body: Union[None, payload.Payload] = None
  681. auth = None
  682. response = None
  683. __writer: Optional["asyncio.Task[None]"] = None # async task for streaming data
  684. # These class defaults help create_autospec() work correctly.
  685. # If autospec is improved in future, maybe these can be removed.
  686. url = URL()
  687. method = "GET"
  688. _continue = None # waiter future for '100 Continue' response
  689. _skip_auto_headers: Optional["CIMultiDict[None]"] = None
  690. # N.B.
  691. # Adding __del__ method with self._writer closing doesn't make sense
  692. # because _writer is instance method, thus it keeps a reference to self.
  693. # Until writer has finished finalizer will not be called.
  694. def __init__(
  695. self,
  696. method: str,
  697. url: URL,
  698. *,
  699. params: Query = None,
  700. headers: Optional[LooseHeaders] = None,
  701. skip_auto_headers: Optional[Iterable[str]] = None,
  702. data: Any = None,
  703. cookies: Optional[LooseCookies] = None,
  704. auth: Optional[BasicAuth] = None,
  705. version: http.HttpVersion = http.HttpVersion11,
  706. compress: Union[str, bool, None] = None,
  707. chunked: Optional[bool] = None,
  708. expect100: bool = False,
  709. loop: Optional[asyncio.AbstractEventLoop] = None,
  710. response_class: Optional[Type["ClientResponse"]] = None,
  711. proxy: Optional[URL] = None,
  712. proxy_auth: Optional[BasicAuth] = None,
  713. timer: Optional[BaseTimerContext] = None,
  714. session: Optional["ClientSession"] = None,
  715. ssl: Union[SSLContext, bool, Fingerprint] = True,
  716. proxy_headers: Optional[LooseHeaders] = None,
  717. traces: Optional[List["Trace"]] = None,
  718. trust_env: bool = False,
  719. server_hostname: Optional[str] = None,
  720. ):
  721. if loop is None:
  722. loop = asyncio.get_event_loop()
  723. if match := _CONTAINS_CONTROL_CHAR_RE.search(method):
  724. raise ValueError(
  725. f"Method cannot contain non-token characters {method!r} "
  726. f"(found at least {match.group()!r})"
  727. )
  728. # URL forbids subclasses, so a simple type check is enough.
  729. assert type(url) is URL, url
  730. if proxy is not None:
  731. assert type(proxy) is URL, proxy
  732. # FIXME: session is None in tests only, need to fix tests
  733. # assert session is not None
  734. if TYPE_CHECKING:
  735. assert session is not None
  736. self._session = session
  737. if params:
  738. url = url.extend_query(params)
  739. self.original_url = url
  740. self.url = url.with_fragment(None) if url.raw_fragment else url
  741. self.method = method.upper()
  742. self.chunked = chunked
  743. self.compress = compress
  744. self.loop = loop
  745. self.length = None
  746. if response_class is None:
  747. real_response_class = ClientResponse
  748. else:
  749. real_response_class = response_class
  750. self.response_class: Type[ClientResponse] = real_response_class
  751. self._timer = timer if timer is not None else TimerNoop()
  752. self._ssl = ssl if ssl is not None else True
  753. self.server_hostname = server_hostname
  754. if loop.get_debug():
  755. self._source_traceback = traceback.extract_stack(sys._getframe(1))
  756. self.update_version(version)
  757. self.update_host(url)
  758. self.update_headers(headers)
  759. self.update_auto_headers(skip_auto_headers)
  760. self.update_cookies(cookies)
  761. self.update_content_encoding(data)
  762. self.update_auth(auth, trust_env)
  763. self.update_proxy(proxy, proxy_auth, proxy_headers)
  764. self.update_body_from_data(data)
  765. if data is not None or self.method not in self.GET_METHODS:
  766. self.update_transfer_encoding()
  767. self.update_expect_continue(expect100)
  768. self._traces = [] if traces is None else traces
  769. def __reset_writer(self, _: object = None) -> None:
  770. self.__writer = None
  771. def _get_content_length(self) -> Optional[int]:
  772. """Extract and validate Content-Length header value.
  773. Returns parsed Content-Length value or None if not set.
  774. Raises ValueError if header exists but cannot be parsed as an integer.
  775. """
  776. if hdrs.CONTENT_LENGTH not in self.headers:
  777. return None
  778. content_length_hdr = self.headers[hdrs.CONTENT_LENGTH]
  779. try:
  780. return int(content_length_hdr)
  781. except ValueError:
  782. raise ValueError(
  783. f"Invalid Content-Length header: {content_length_hdr}"
  784. ) from None
  785. @property
  786. def skip_auto_headers(self) -> CIMultiDict[None]:
  787. return self._skip_auto_headers or CIMultiDict()
  788. @property
  789. def _writer(self) -> Optional["asyncio.Task[None]"]:
  790. return self.__writer
  791. @_writer.setter
  792. def _writer(self, writer: "asyncio.Task[None]") -> None:
  793. if self.__writer is not None:
  794. self.__writer.remove_done_callback(self.__reset_writer)
  795. self.__writer = writer
  796. writer.add_done_callback(self.__reset_writer)
  797. def is_ssl(self) -> bool:
  798. return self.url.scheme in _SSL_SCHEMES
  799. @property
  800. def ssl(self) -> Union["SSLContext", bool, Fingerprint]:
  801. return self._ssl
  802. @property
  803. def connection_key(self) -> ConnectionKey:
  804. if proxy_headers := self.proxy_headers:
  805. h: Optional[int] = hash(tuple(proxy_headers.items()))
  806. else:
  807. h = None
  808. url = self.url
  809. return tuple.__new__(
  810. ConnectionKey,
  811. (
  812. url.raw_host or "",
  813. url.port,
  814. url.scheme in _SSL_SCHEMES,
  815. self._ssl,
  816. self.proxy,
  817. self.proxy_auth,
  818. h,
  819. ),
  820. )
  821. @property
  822. def host(self) -> str:
  823. ret = self.url.raw_host
  824. assert ret is not None
  825. return ret
  826. @property
  827. def port(self) -> Optional[int]:
  828. return self.url.port
  829. @property
  830. def body(self) -> Union[payload.Payload, Literal[b""]]:
  831. """Request body."""
  832. # empty body is represented as bytes for backwards compatibility
  833. return self._body or b""
  834. @body.setter
  835. def body(self, value: Any) -> None:
  836. """Set request body with warning for non-autoclose payloads.
  837. WARNING: This setter must be called from within an event loop and is not
  838. thread-safe. Setting body outside of an event loop may raise RuntimeError
  839. when closing file-based payloads.
  840. DEPRECATED: Direct assignment to body is deprecated and will be removed
  841. in a future version. Use await update_body() instead for proper resource
  842. management.
  843. """
  844. # Close existing payload if present
  845. if self._body is not None:
  846. # Warn if the payload needs manual closing
  847. # stacklevel=3: user code -> body setter -> _warn_if_unclosed_payload
  848. _warn_if_unclosed_payload(self._body, stacklevel=3)
  849. # NOTE: In the future, when we remove sync close support,
  850. # this setter will need to be removed and only the async
  851. # update_body() method will be available. For now, we call
  852. # _close() for backwards compatibility.
  853. self._body._close()
  854. self._update_body(value)
  855. @property
  856. def request_info(self) -> RequestInfo:
  857. headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
  858. # These are created on every request, so we use a NamedTuple
  859. # for performance reasons. We don't use the RequestInfo.__new__
  860. # method because it has a different signature which is provided
  861. # for backwards compatibility only.
  862. return tuple.__new__(
  863. RequestInfo, (self.url, self.method, headers, self.original_url)
  864. )
  865. @property
  866. def session(self) -> "ClientSession":
  867. """Return the ClientSession instance.
  868. This property provides access to the ClientSession that initiated
  869. this request, allowing middleware to make additional requests
  870. using the same session.
  871. """
  872. return self._session
  873. def update_host(self, url: URL) -> None:
  874. """Update destination host, port and connection type (ssl)."""
  875. # get host/port
  876. if not url.raw_host:
  877. raise InvalidURL(url)
  878. # basic auth info
  879. if url.raw_user or url.raw_password:
  880. self.auth = helpers.BasicAuth(url.user or "", url.password or "")
  881. def update_version(self, version: Union[http.HttpVersion, str]) -> None:
  882. """Convert request version to two elements tuple.
  883. parser HTTP version '1.1' => (1, 1)
  884. """
  885. if isinstance(version, str):
  886. v = [part.strip() for part in version.split(".", 1)]
  887. try:
  888. version = http.HttpVersion(int(v[0]), int(v[1]))
  889. except ValueError:
  890. raise ValueError(
  891. f"Can not parse http version number: {version}"
  892. ) from None
  893. self.version = version
  894. def update_headers(self, headers: Optional[LooseHeaders]) -> None:
  895. """Update request headers."""
  896. self.headers: CIMultiDict[str] = CIMultiDict()
  897. # Build the host header
  898. host = self.url.host_port_subcomponent
  899. # host_port_subcomponent is None when the URL is a relative URL.
  900. # but we know we do not have a relative URL here.
  901. assert host is not None
  902. self.headers[hdrs.HOST] = host
  903. if not headers:
  904. return
  905. if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
  906. headers = headers.items()
  907. for key, value in headers: # type: ignore[misc]
  908. # A special case for Host header
  909. if key in hdrs.HOST_ALL:
  910. self.headers[key] = value
  911. else:
  912. self.headers.add(key, value)
  913. def update_auto_headers(self, skip_auto_headers: Optional[Iterable[str]]) -> None:
  914. if skip_auto_headers is not None:
  915. self._skip_auto_headers = CIMultiDict(
  916. (hdr, None) for hdr in sorted(skip_auto_headers)
  917. )
  918. used_headers = self.headers.copy()
  919. used_headers.extend(self._skip_auto_headers) # type: ignore[arg-type]
  920. else:
  921. # Fast path when there are no headers to skip
  922. # which is the most common case.
  923. used_headers = self.headers
  924. for hdr, val in self.DEFAULT_HEADERS.items():
  925. if hdr not in used_headers:
  926. self.headers[hdr] = val
  927. if hdrs.USER_AGENT not in used_headers:
  928. self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE
  929. def update_cookies(self, cookies: Optional[LooseCookies]) -> None:
  930. """Update request cookies header."""
  931. if not cookies:
  932. return
  933. c = SimpleCookie()
  934. if hdrs.COOKIE in self.headers:
  935. # parse_cookie_header for RFC 6265 compliant Cookie header parsing
  936. c.update(parse_cookie_header(self.headers.get(hdrs.COOKIE, "")))
  937. del self.headers[hdrs.COOKIE]
  938. if isinstance(cookies, Mapping):
  939. iter_cookies = cookies.items()
  940. else:
  941. iter_cookies = cookies # type: ignore[assignment]
  942. for name, value in iter_cookies:
  943. if isinstance(value, Morsel):
  944. # Use helper to preserve coded_value exactly as sent by server
  945. c[name] = preserve_morsel_with_coded_value(value)
  946. else:
  947. c[name] = value # type: ignore[assignment]
  948. self.headers[hdrs.COOKIE] = c.output(header="", sep=";").strip()
  949. def update_content_encoding(self, data: Any) -> None:
  950. """Set request content encoding."""
  951. if not data:
  952. # Don't compress an empty body.
  953. self.compress = None
  954. return
  955. if self.headers.get(hdrs.CONTENT_ENCODING):
  956. if self.compress:
  957. raise ValueError(
  958. "compress can not be set if Content-Encoding header is set"
  959. )
  960. elif self.compress:
  961. if not isinstance(self.compress, str):
  962. self.compress = "deflate"
  963. self.headers[hdrs.CONTENT_ENCODING] = self.compress
  964. self.chunked = True # enable chunked, no need to deal with length
  965. def update_transfer_encoding(self) -> None:
  966. """Analyze transfer-encoding header."""
  967. te = self.headers.get(hdrs.TRANSFER_ENCODING, "").lower()
  968. if "chunked" in te:
  969. if self.chunked:
  970. raise ValueError(
  971. "chunked can not be set "
  972. 'if "Transfer-Encoding: chunked" header is set'
  973. )
  974. elif self.chunked:
  975. if hdrs.CONTENT_LENGTH in self.headers:
  976. raise ValueError(
  977. "chunked can not be set if Content-Length header is set"
  978. )
  979. self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
  980. def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None:
  981. """Set basic auth."""
  982. if auth is None:
  983. auth = self.auth
  984. if auth is None:
  985. return
  986. if not isinstance(auth, helpers.BasicAuth):
  987. raise TypeError("BasicAuth() tuple is required instead")
  988. self.headers[hdrs.AUTHORIZATION] = auth.encode()
  989. def update_body_from_data(self, body: Any, _stacklevel: int = 3) -> None:
  990. """Update request body from data."""
  991. if self._body is not None:
  992. _warn_if_unclosed_payload(self._body, stacklevel=_stacklevel)
  993. if body is None:
  994. self._body = None
  995. # Set Content-Length to 0 when body is None for methods that expect a body
  996. if (
  997. self.method not in self.GET_METHODS
  998. and not self.chunked
  999. and hdrs.CONTENT_LENGTH not in self.headers
  1000. ):
  1001. self.headers[hdrs.CONTENT_LENGTH] = "0"
  1002. return
  1003. # FormData
  1004. maybe_payload = body() if isinstance(body, FormData) else body
  1005. try:
  1006. body_payload = payload.PAYLOAD_REGISTRY.get(maybe_payload, disposition=None)
  1007. except payload.LookupError:
  1008. body_payload = FormData(maybe_payload)() # type: ignore[arg-type]
  1009. self._body = body_payload
  1010. # enable chunked encoding if needed
  1011. if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers:
  1012. if (size := body_payload.size) is not None:
  1013. self.headers[hdrs.CONTENT_LENGTH] = str(size)
  1014. else:
  1015. self.chunked = True
  1016. # copy payload headers
  1017. assert body_payload.headers
  1018. headers = self.headers
  1019. skip_headers = self._skip_auto_headers
  1020. for key, value in body_payload.headers.items():
  1021. if key in headers or (skip_headers is not None and key in skip_headers):
  1022. continue
  1023. headers[key] = value
  1024. def _update_body(self, body: Any) -> None:
  1025. """Update request body after its already been set."""
  1026. # Remove existing Content-Length header since body is changing
  1027. if hdrs.CONTENT_LENGTH in self.headers:
  1028. del self.headers[hdrs.CONTENT_LENGTH]
  1029. # Remove existing Transfer-Encoding header to avoid conflicts
  1030. if self.chunked and hdrs.TRANSFER_ENCODING in self.headers:
  1031. del self.headers[hdrs.TRANSFER_ENCODING]
  1032. # Now update the body using the existing method
  1033. # Called from _update_body, add 1 to stacklevel from caller
  1034. self.update_body_from_data(body, _stacklevel=4)
  1035. # Update transfer encoding headers if needed (same logic as __init__)
  1036. if body is not None or self.method not in self.GET_METHODS:
  1037. self.update_transfer_encoding()
  1038. async def update_body(self, body: Any) -> None:
  1039. """
  1040. Update request body and close previous payload if needed.
  1041. This method safely updates the request body by first closing any existing
  1042. payload to prevent resource leaks, then setting the new body.
  1043. IMPORTANT: Always use this method instead of setting request.body directly.
  1044. Direct assignment to request.body will leak resources if the previous body
  1045. contains file handles, streams, or other resources that need cleanup.
  1046. Args:
  1047. body: The new body content. Can be:
  1048. - bytes/bytearray: Raw binary data
  1049. - str: Text data (will be encoded using charset from Content-Type)
  1050. - FormData: Form data that will be encoded as multipart/form-data
  1051. - Payload: A pre-configured payload object
  1052. - AsyncIterable: An async iterable of bytes chunks
  1053. - File-like object: Will be read and sent as binary data
  1054. - None: Clears the body
  1055. Usage:
  1056. # CORRECT: Use update_body
  1057. await request.update_body(b"new request data")
  1058. # WRONG: Don't set body directly
  1059. # request.body = b"new request data" # This will leak resources!
  1060. # Update with form data
  1061. form_data = FormData()
  1062. form_data.add_field('field', 'value')
  1063. await request.update_body(form_data)
  1064. # Clear body
  1065. await request.update_body(None)
  1066. Note:
  1067. This method is async because it may need to close file handles or
  1068. other resources associated with the previous payload. Always await
  1069. this method to ensure proper cleanup.
  1070. Warning:
  1071. Setting request.body directly is highly discouraged and can lead to:
  1072. - Resource leaks (unclosed file handles, streams)
  1073. - Memory leaks (unreleased buffers)
  1074. - Unexpected behavior with streaming payloads
  1075. It is not recommended to change the payload type in middleware. If the
  1076. body was already set (e.g., as bytes), it's best to keep the same type
  1077. rather than converting it (e.g., to str) as this may result in unexpected
  1078. behavior.
  1079. See Also:
  1080. - update_body_from_data: Synchronous body update without cleanup
  1081. - body property: Direct body access (STRONGLY DISCOURAGED)
  1082. """
  1083. # Close existing payload if it exists and needs closing
  1084. if self._body is not None:
  1085. await self._body.close()
  1086. self._update_body(body)
  1087. def update_expect_continue(self, expect: bool = False) -> None:
  1088. if expect:
  1089. self.headers[hdrs.EXPECT] = "100-continue"
  1090. elif (
  1091. hdrs.EXPECT in self.headers
  1092. and self.headers[hdrs.EXPECT].lower() == "100-continue"
  1093. ):
  1094. expect = True
  1095. if expect:
  1096. self._continue = self.loop.create_future()
  1097. def update_proxy(
  1098. self,
  1099. proxy: Optional[URL],
  1100. proxy_auth: Optional[BasicAuth],
  1101. proxy_headers: Optional[LooseHeaders],
  1102. ) -> None:
  1103. self.proxy = proxy
  1104. if proxy is None:
  1105. self.proxy_auth = None
  1106. self.proxy_headers = None
  1107. return
  1108. if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
  1109. raise ValueError("proxy_auth must be None or BasicAuth() tuple")
  1110. self.proxy_auth = proxy_auth
  1111. if proxy_headers is not None and not isinstance(
  1112. proxy_headers, (MultiDict, MultiDictProxy)
  1113. ):
  1114. proxy_headers = CIMultiDict(proxy_headers)
  1115. self.proxy_headers = proxy_headers
  1116. async def write_bytes(
  1117. self,
  1118. writer: AbstractStreamWriter,
  1119. conn: "Connection",
  1120. content_length: Optional[int] = None,
  1121. ) -> None:
  1122. """
  1123. Write the request body to the connection stream.
  1124. This method handles writing different types of request bodies:
  1125. 1. Payload objects (using their specialized write_with_length method)
  1126. 2. Bytes/bytearray objects
  1127. 3. Iterable body content
  1128. Args:
  1129. writer: The stream writer to write the body to
  1130. conn: The connection being used for this request
  1131. content_length: Optional maximum number of bytes to write from the body
  1132. (None means write the entire body)
  1133. The method properly handles:
  1134. - Waiting for 100-Continue responses if required
  1135. - Content length constraints for chunked encoding
  1136. - Error handling for network issues, cancellation, and other exceptions
  1137. - Signaling EOF and timeout management
  1138. Raises:
  1139. ClientOSError: When there's an OS-level error writing the body
  1140. ClientConnectionError: When there's a general connection error
  1141. asyncio.CancelledError: When the operation is cancelled
  1142. """
  1143. # 100 response
  1144. if self._continue is not None:
  1145. # Force headers to be sent before waiting for 100-continue
  1146. writer.send_headers()
  1147. await writer.drain()
  1148. await self._continue
  1149. protocol = conn.protocol
  1150. assert protocol is not None
  1151. try:
  1152. # This should be a rare case but the
  1153. # self._body can be set to None while
  1154. # the task is being started or we wait above
  1155. # for the 100-continue response.
  1156. # The more likely case is we have an empty
  1157. # payload, but 100-continue is still expected.
  1158. if self._body is not None:
  1159. await self._body.write_with_length(writer, content_length)
  1160. except OSError as underlying_exc:
  1161. reraised_exc = underlying_exc
  1162. # Distinguish between timeout and other OS errors for better error reporting
  1163. exc_is_not_timeout = underlying_exc.errno is not None or not isinstance(
  1164. underlying_exc, asyncio.TimeoutError
  1165. )
  1166. if exc_is_not_timeout:
  1167. reraised_exc = ClientOSError(
  1168. underlying_exc.errno,
  1169. f"Can not write request body for {self.url !s}",
  1170. )
  1171. set_exception(protocol, reraised_exc, underlying_exc)
  1172. except asyncio.CancelledError:
  1173. # Body hasn't been fully sent, so connection can't be reused
  1174. conn.close()
  1175. raise
  1176. except Exception as underlying_exc:
  1177. set_exception(
  1178. protocol,
  1179. ClientConnectionError(
  1180. "Failed to send bytes into the underlying connection "
  1181. f"{conn !s}: {underlying_exc!r}",
  1182. ),
  1183. underlying_exc,
  1184. )
  1185. else:
  1186. # Successfully wrote the body, signal EOF and start response timeout
  1187. await writer.write_eof()
  1188. protocol.start_timeout()
  1189. async def send(self, conn: "Connection") -> "ClientResponse":
  1190. # Specify request target:
  1191. # - CONNECT request must send authority form URI
  1192. # - not CONNECT proxy must send absolute form URI
  1193. # - most common is origin form URI
  1194. if self.method == hdrs.METH_CONNECT:
  1195. connect_host = self.url.host_subcomponent
  1196. assert connect_host is not None
  1197. path = f"{connect_host}:{self.url.port}"
  1198. elif self.proxy and not self.is_ssl():
  1199. path = str(self.url)
  1200. else:
  1201. path = self.url.raw_path_qs
  1202. protocol = conn.protocol
  1203. assert protocol is not None
  1204. writer = StreamWriter(
  1205. protocol,
  1206. self.loop,
  1207. on_chunk_sent=(
  1208. functools.partial(self._on_chunk_request_sent, self.method, self.url)
  1209. if self._traces
  1210. else None
  1211. ),
  1212. on_headers_sent=(
  1213. functools.partial(self._on_headers_request_sent, self.method, self.url)
  1214. if self._traces
  1215. else None
  1216. ),
  1217. )
  1218. if self.compress:
  1219. writer.enable_compression(self.compress) # type: ignore[arg-type]
  1220. if self.chunked is not None:
  1221. writer.enable_chunking()
  1222. # set default content-type
  1223. if (
  1224. self.method in self.POST_METHODS
  1225. and (
  1226. self._skip_auto_headers is None
  1227. or hdrs.CONTENT_TYPE not in self._skip_auto_headers
  1228. )
  1229. and hdrs.CONTENT_TYPE not in self.headers
  1230. ):
  1231. self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"
  1232. v = self.version
  1233. if hdrs.CONNECTION not in self.headers:
  1234. if conn._connector.force_close:
  1235. if v == HttpVersion11:
  1236. self.headers[hdrs.CONNECTION] = "close"
  1237. elif v == HttpVersion10:
  1238. self.headers[hdrs.CONNECTION] = "keep-alive"
  1239. # status + headers
  1240. status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}"
  1241. # Buffer headers for potential coalescing with body
  1242. await writer.write_headers(status_line, self.headers)
  1243. task: Optional["asyncio.Task[None]"]
  1244. if self._body or self._continue is not None or protocol.writing_paused:
  1245. coro = self.write_bytes(writer, conn, self._get_content_length())
  1246. if sys.version_info >= (3, 12):
  1247. # Optimization for Python 3.12, try to write
  1248. # bytes immediately to avoid having to schedule
  1249. # the task on the event loop.
  1250. task = asyncio.Task(coro, loop=self.loop, eager_start=True)
  1251. else:
  1252. task = self.loop.create_task(coro)
  1253. if task.done():
  1254. task = None
  1255. else:
  1256. self._writer = task
  1257. else:
  1258. # We have nothing to write because
  1259. # - there is no body
  1260. # - the protocol does not have writing paused
  1261. # - we are not waiting for a 100-continue response
  1262. protocol.start_timeout()
  1263. writer.set_eof()
  1264. task = None
  1265. response_class = self.response_class
  1266. assert response_class is not None
  1267. self.response = response_class(
  1268. self.method,
  1269. self.original_url,
  1270. writer=task,
  1271. continue100=self._continue,
  1272. timer=self._timer,
  1273. request_info=self.request_info,
  1274. traces=self._traces,
  1275. loop=self.loop,
  1276. session=self._session,
  1277. )
  1278. return self.response
  1279. async def close(self) -> None:
  1280. if self.__writer is not None:
  1281. try:
  1282. await self.__writer
  1283. except asyncio.CancelledError:
  1284. if (
  1285. sys.version_info >= (3, 11)
  1286. and (task := asyncio.current_task())
  1287. and task.cancelling()
  1288. ):
  1289. raise
  1290. def terminate(self) -> None:
  1291. if self.__writer is not None:
  1292. if not self.loop.is_closed():
  1293. self.__writer.cancel()
  1294. self.__writer.remove_done_callback(self.__reset_writer)
  1295. self.__writer = None
  1296. async def _on_chunk_request_sent(self, method: str, url: URL, chunk: bytes) -> None:
  1297. for trace in self._traces:
  1298. await trace.send_request_chunk_sent(method, url, chunk)
  1299. async def _on_headers_request_sent(
  1300. self, method: str, url: URL, headers: "CIMultiDict[str]"
  1301. ) -> None:
  1302. for trace in self._traces:
  1303. await trace.send_request_headers(method, url, headers)