# connector.py
  1. import asyncio
  2. import functools
  3. import random
  4. import socket
  5. import sys
  6. import traceback
  7. import warnings
  8. from collections import OrderedDict, defaultdict, deque
  9. from contextlib import suppress
  10. from http import HTTPStatus
  11. from itertools import chain, cycle, islice
  12. from time import monotonic
  13. from types import TracebackType
  14. from typing import (
  15. TYPE_CHECKING,
  16. Any,
  17. Awaitable,
  18. Callable,
  19. DefaultDict,
  20. Deque,
  21. Dict,
  22. Iterator,
  23. List,
  24. Literal,
  25. Optional,
  26. Sequence,
  27. Set,
  28. Tuple,
  29. Type,
  30. Union,
  31. cast,
  32. )
  33. import aiohappyeyeballs
  34. from aiohappyeyeballs import AddrInfoType, SocketFactoryType
  35. from . import hdrs, helpers
  36. from .abc import AbstractResolver, ResolveResult
  37. from .client_exceptions import (
  38. ClientConnectionError,
  39. ClientConnectorCertificateError,
  40. ClientConnectorDNSError,
  41. ClientConnectorError,
  42. ClientConnectorSSLError,
  43. ClientHttpProxyError,
  44. ClientProxyConnectionError,
  45. ServerFingerprintMismatch,
  46. UnixClientConnectorError,
  47. cert_errors,
  48. ssl_errors,
  49. )
  50. from .client_proto import ResponseHandler
  51. from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params
  52. from .helpers import (
  53. _SENTINEL,
  54. ceil_timeout,
  55. is_ip_address,
  56. noop,
  57. sentinel,
  58. set_exception,
  59. set_result,
  60. )
  61. from .log import client_logger
  62. from .resolver import DefaultResolver
  63. if sys.version_info >= (3, 12):
  64. from collections.abc import Buffer
  65. else:
  66. Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
  67. if TYPE_CHECKING:
  68. import ssl
  69. SSLContext = ssl.SSLContext
  70. else:
  71. try:
  72. import ssl
  73. SSLContext = ssl.SSLContext
  74. except ImportError: # pragma: no cover
  75. ssl = None # type: ignore[assignment]
  76. SSLContext = object # type: ignore[misc,assignment]
  77. EMPTY_SCHEMA_SET = frozenset({""})
  78. HTTP_SCHEMA_SET = frozenset({"http", "https"})
  79. WS_SCHEMA_SET = frozenset({"ws", "wss"})
  80. HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET | EMPTY_SCHEMA_SET
  81. HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET | WS_SCHEMA_SET
  82. NEEDS_CLEANUP_CLOSED = (3, 13, 0) <= sys.version_info < (
  83. 3,
  84. 13,
  85. 1,
  86. ) or sys.version_info < (3, 12, 7)
  87. # Cleanup closed is no longer needed after https://github.com/python/cpython/pull/118960
  88. # which first appeared in Python 3.12.7 and 3.13.1
  89. __all__ = (
  90. "BaseConnector",
  91. "TCPConnector",
  92. "UnixConnector",
  93. "NamedPipeConnector",
  94. "AddrInfoType",
  95. "SocketFactoryType",
  96. )
  97. if TYPE_CHECKING:
  98. from .client import ClientTimeout
  99. from .client_reqrep import ConnectionKey
  100. from .tracing import Trace
  101. class _DeprecationWaiter:
  102. __slots__ = ("_awaitable", "_awaited")
  103. def __init__(self, awaitable: Awaitable[Any]) -> None:
  104. self._awaitable = awaitable
  105. self._awaited = False
  106. def __await__(self) -> Any:
  107. self._awaited = True
  108. return self._awaitable.__await__()
  109. def __del__(self) -> None:
  110. if not self._awaited:
  111. warnings.warn(
  112. "Connector.close() is a coroutine, "
  113. "please use await connector.close()",
  114. DeprecationWarning,
  115. )
  116. async def _wait_for_close(waiters: List[Awaitable[object]]) -> None:
  117. """Wait for all waiters to finish closing."""
  118. results = await asyncio.gather(*waiters, return_exceptions=True)
  119. for res in results:
  120. if isinstance(res, Exception):
  121. client_logger.debug("Error while closing connector: %r", res)
  122. class Connection:
  123. _source_traceback = None
  124. def __init__(
  125. self,
  126. connector: "BaseConnector",
  127. key: "ConnectionKey",
  128. protocol: ResponseHandler,
  129. loop: asyncio.AbstractEventLoop,
  130. ) -> None:
  131. self._key = key
  132. self._connector = connector
  133. self._loop = loop
  134. self._protocol: Optional[ResponseHandler] = protocol
  135. self._callbacks: List[Callable[[], None]] = []
  136. if loop.get_debug():
  137. self._source_traceback = traceback.extract_stack(sys._getframe(1))
  138. def __repr__(self) -> str:
  139. return f"Connection<{self._key}>"
  140. def __del__(self, _warnings: Any = warnings) -> None:
  141. if self._protocol is not None:
  142. kwargs = {"source": self}
  143. _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, **kwargs)
  144. if self._loop.is_closed():
  145. return
  146. self._connector._release(self._key, self._protocol, should_close=True)
  147. context = {"client_connection": self, "message": "Unclosed connection"}
  148. if self._source_traceback is not None:
  149. context["source_traceback"] = self._source_traceback
  150. self._loop.call_exception_handler(context)
  151. def __bool__(self) -> Literal[True]:
  152. """Force subclasses to not be falsy, to make checks simpler."""
  153. return True
  154. @property
  155. def loop(self) -> asyncio.AbstractEventLoop:
  156. warnings.warn(
  157. "connector.loop property is deprecated", DeprecationWarning, stacklevel=2
  158. )
  159. return self._loop
  160. @property
  161. def transport(self) -> Optional[asyncio.Transport]:
  162. if self._protocol is None:
  163. return None
  164. return self._protocol.transport
  165. @property
  166. def protocol(self) -> Optional[ResponseHandler]:
  167. return self._protocol
  168. def add_callback(self, callback: Callable[[], None]) -> None:
  169. if callback is not None:
  170. self._callbacks.append(callback)
  171. def _notify_release(self) -> None:
  172. callbacks, self._callbacks = self._callbacks[:], []
  173. for cb in callbacks:
  174. with suppress(Exception):
  175. cb()
  176. def close(self) -> None:
  177. self._notify_release()
  178. if self._protocol is not None:
  179. self._connector._release(self._key, self._protocol, should_close=True)
  180. self._protocol = None
  181. def release(self) -> None:
  182. self._notify_release()
  183. if self._protocol is not None:
  184. self._connector._release(self._key, self._protocol)
  185. self._protocol = None
  186. @property
  187. def closed(self) -> bool:
  188. return self._protocol is None or not self._protocol.is_connected()
  189. class _ConnectTunnelConnection(Connection):
  190. """Special connection wrapper for CONNECT tunnels that must never be pooled.
  191. This connection wraps the proxy connection that will be upgraded with TLS.
  192. It must never be released to the pool because:
  193. 1. Its 'closed' future will never complete, causing session.close() to hang
  194. 2. It represents an intermediate state, not a reusable connection
  195. 3. The real connection (with TLS) will be created separately
  196. """
  197. def release(self) -> None:
  198. """Do nothing - don't pool or close the connection.
  199. These connections are an intermediate state during the CONNECT tunnel
  200. setup and will be cleaned up naturally after the TLS upgrade. If they
  201. were to be pooled, they would never be properly closed, causing
  202. session.close() to wait forever for their 'closed' future.
  203. """
  204. class _TransportPlaceholder:
  205. """placeholder for BaseConnector.connect function"""
  206. __slots__ = ("closed", "transport")
  207. def __init__(self, closed_future: asyncio.Future[Optional[Exception]]) -> None:
  208. """Initialize a placeholder for a transport."""
  209. self.closed = closed_future
  210. self.transport = None
  211. def close(self) -> None:
  212. """Close the placeholder."""
  213. def abort(self) -> None:
  214. """Abort the placeholder (does nothing)."""
  215. class BaseConnector:
  216. """Base connector class.
  217. keepalive_timeout - (optional) Keep-alive timeout.
  218. force_close - Set to True to force close and do reconnect
  219. after each request (and between redirects).
  220. limit - The total number of simultaneous connections.
  221. limit_per_host - Number of simultaneous connections to one host.
  222. enable_cleanup_closed - Enables clean-up closed ssl transports.
  223. Disabled by default.
  224. timeout_ceil_threshold - Trigger ceiling of timeout values when
  225. it's above timeout_ceil_threshold.
  226. loop - Optional event loop.
  227. """
  228. _closed = True # prevent AttributeError in __del__ if ctor was failed
  229. _source_traceback = None
  230. # abort transport after 2 seconds (cleanup broken connections)
  231. _cleanup_closed_period = 2.0
  232. allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET
  233. def __init__(
  234. self,
  235. *,
  236. keepalive_timeout: Union[object, None, float] = sentinel,
  237. force_close: bool = False,
  238. limit: int = 100,
  239. limit_per_host: int = 0,
  240. enable_cleanup_closed: bool = False,
  241. loop: Optional[asyncio.AbstractEventLoop] = None,
  242. timeout_ceil_threshold: float = 5,
  243. ) -> None:
  244. if force_close:
  245. if keepalive_timeout is not None and keepalive_timeout is not sentinel:
  246. raise ValueError(
  247. "keepalive_timeout cannot be set if force_close is True"
  248. )
  249. else:
  250. if keepalive_timeout is sentinel:
  251. keepalive_timeout = 15.0
  252. loop = loop or asyncio.get_running_loop()
  253. self._timeout_ceil_threshold = timeout_ceil_threshold
  254. self._closed = False
  255. if loop.get_debug():
  256. self._source_traceback = traceback.extract_stack(sys._getframe(1))
  257. # Connection pool of reusable connections.
  258. # We use a deque to store connections because it has O(1) popleft()
  259. # and O(1) append() operations to implement a FIFO queue.
  260. self._conns: DefaultDict[
  261. ConnectionKey, Deque[Tuple[ResponseHandler, float]]
  262. ] = defaultdict(deque)
  263. self._limit = limit
  264. self._limit_per_host = limit_per_host
  265. self._acquired: Set[ResponseHandler] = set()
  266. self._acquired_per_host: DefaultDict[ConnectionKey, Set[ResponseHandler]] = (
  267. defaultdict(set)
  268. )
  269. self._keepalive_timeout = cast(float, keepalive_timeout)
  270. self._force_close = force_close
  271. # {host_key: FIFO list of waiters}
  272. # The FIFO is implemented with an OrderedDict with None keys because
  273. # python does not have an ordered set.
  274. self._waiters: DefaultDict[
  275. ConnectionKey, OrderedDict[asyncio.Future[None], None]
  276. ] = defaultdict(OrderedDict)
  277. self._loop = loop
  278. self._factory = functools.partial(ResponseHandler, loop=loop)
  279. # start keep-alive connection cleanup task
  280. self._cleanup_handle: Optional[asyncio.TimerHandle] = None
  281. # start cleanup closed transports task
  282. self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None
  283. if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED:
  284. warnings.warn(
  285. "enable_cleanup_closed ignored because "
  286. "https://github.com/python/cpython/pull/118960 is fixed "
  287. f"in Python version {sys.version_info}",
  288. DeprecationWarning,
  289. stacklevel=2,
  290. )
  291. enable_cleanup_closed = False
  292. self._cleanup_closed_disabled = not enable_cleanup_closed
  293. self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
  294. self._placeholder_future: asyncio.Future[Optional[Exception]] = (
  295. loop.create_future()
  296. )
  297. self._placeholder_future.set_result(None)
  298. self._cleanup_closed()
  299. def __del__(self, _warnings: Any = warnings) -> None:
  300. if self._closed:
  301. return
  302. if not self._conns:
  303. return
  304. conns = [repr(c) for c in self._conns.values()]
  305. self._close()
  306. kwargs = {"source": self}
  307. _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
  308. context = {
  309. "connector": self,
  310. "connections": conns,
  311. "message": "Unclosed connector",
  312. }
  313. if self._source_traceback is not None:
  314. context["source_traceback"] = self._source_traceback
  315. self._loop.call_exception_handler(context)
  316. def __enter__(self) -> "BaseConnector":
  317. warnings.warn(
  318. '"with Connector():" is deprecated, '
  319. 'use "async with Connector():" instead',
  320. DeprecationWarning,
  321. )
  322. return self
  323. def __exit__(self, *exc: Any) -> None:
  324. self._close()
  325. async def __aenter__(self) -> "BaseConnector":
  326. return self
  327. async def __aexit__(
  328. self,
  329. exc_type: Optional[Type[BaseException]] = None,
  330. exc_value: Optional[BaseException] = None,
  331. exc_traceback: Optional[TracebackType] = None,
  332. ) -> None:
  333. await self.close()
  334. @property
  335. def force_close(self) -> bool:
  336. """Ultimately close connection on releasing if True."""
  337. return self._force_close
  338. @property
  339. def limit(self) -> int:
  340. """The total number for simultaneous connections.
  341. If limit is 0 the connector has no limit.
  342. The default limit size is 100.
  343. """
  344. return self._limit
  345. @property
  346. def limit_per_host(self) -> int:
  347. """The limit for simultaneous connections to the same endpoint.
  348. Endpoints are the same if they are have equal
  349. (host, port, is_ssl) triple.
  350. """
  351. return self._limit_per_host
  352. def _cleanup(self) -> None:
  353. """Cleanup unused transports."""
  354. if self._cleanup_handle:
  355. self._cleanup_handle.cancel()
  356. # _cleanup_handle should be unset, otherwise _release() will not
  357. # recreate it ever!
  358. self._cleanup_handle = None
  359. now = monotonic()
  360. timeout = self._keepalive_timeout
  361. if self._conns:
  362. connections = defaultdict(deque)
  363. deadline = now - timeout
  364. for key, conns in self._conns.items():
  365. alive: Deque[Tuple[ResponseHandler, float]] = deque()
  366. for proto, use_time in conns:
  367. if proto.is_connected() and use_time - deadline >= 0:
  368. alive.append((proto, use_time))
  369. continue
  370. transport = proto.transport
  371. proto.close()
  372. if not self._cleanup_closed_disabled and key.is_ssl:
  373. self._cleanup_closed_transports.append(transport)
  374. if alive:
  375. connections[key] = alive
  376. self._conns = connections
  377. if self._conns:
  378. self._cleanup_handle = helpers.weakref_handle(
  379. self,
  380. "_cleanup",
  381. timeout,
  382. self._loop,
  383. timeout_ceil_threshold=self._timeout_ceil_threshold,
  384. )
  385. def _cleanup_closed(self) -> None:
  386. """Double confirmation for transport close.
  387. Some broken ssl servers may leave socket open without proper close.
  388. """
  389. if self._cleanup_closed_handle:
  390. self._cleanup_closed_handle.cancel()
  391. for transport in self._cleanup_closed_transports:
  392. if transport is not None:
  393. transport.abort()
  394. self._cleanup_closed_transports = []
  395. if not self._cleanup_closed_disabled:
  396. self._cleanup_closed_handle = helpers.weakref_handle(
  397. self,
  398. "_cleanup_closed",
  399. self._cleanup_closed_period,
  400. self._loop,
  401. timeout_ceil_threshold=self._timeout_ceil_threshold,
  402. )
  403. def close(self, *, abort_ssl: bool = False) -> Awaitable[None]:
  404. """Close all opened transports.
  405. :param abort_ssl: If True, SSL connections will be aborted immediately
  406. without performing the shutdown handshake. This provides
  407. faster cleanup at the cost of less graceful disconnection.
  408. """
  409. if not (waiters := self._close(abort_ssl=abort_ssl)):
  410. # If there are no connections to close, we can return a noop
  411. # awaitable to avoid scheduling a task on the event loop.
  412. return _DeprecationWaiter(noop())
  413. coro = _wait_for_close(waiters)
  414. if sys.version_info >= (3, 12):
  415. # Optimization for Python 3.12, try to close connections
  416. # immediately to avoid having to schedule the task on the event loop.
  417. task = asyncio.Task(coro, loop=self._loop, eager_start=True)
  418. else:
  419. task = self._loop.create_task(coro)
  420. return _DeprecationWaiter(task)
  421. def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
  422. waiters: List[Awaitable[object]] = []
  423. if self._closed:
  424. return waiters
  425. self._closed = True
  426. try:
  427. if self._loop.is_closed():
  428. return waiters
  429. # cancel cleanup task
  430. if self._cleanup_handle:
  431. self._cleanup_handle.cancel()
  432. # cancel cleanup close task
  433. if self._cleanup_closed_handle:
  434. self._cleanup_closed_handle.cancel()
  435. for data in self._conns.values():
  436. for proto, _ in data:
  437. if (
  438. abort_ssl
  439. and proto.transport
  440. and proto.transport.get_extra_info("sslcontext") is not None
  441. ):
  442. proto.abort()
  443. else:
  444. proto.close()
  445. if closed := proto.closed:
  446. waiters.append(closed)
  447. for proto in self._acquired:
  448. if (
  449. abort_ssl
  450. and proto.transport
  451. and proto.transport.get_extra_info("sslcontext") is not None
  452. ):
  453. proto.abort()
  454. else:
  455. proto.close()
  456. if closed := proto.closed:
  457. waiters.append(closed)
  458. for transport in self._cleanup_closed_transports:
  459. if transport is not None:
  460. transport.abort()
  461. return waiters
  462. finally:
  463. self._conns.clear()
  464. self._acquired.clear()
  465. for keyed_waiters in self._waiters.values():
  466. for keyed_waiter in keyed_waiters:
  467. keyed_waiter.cancel()
  468. self._waiters.clear()
  469. self._cleanup_handle = None
  470. self._cleanup_closed_transports.clear()
  471. self._cleanup_closed_handle = None
  472. @property
  473. def closed(self) -> bool:
  474. """Is connector closed.
  475. A readonly property.
  476. """
  477. return self._closed
  478. def _available_connections(self, key: "ConnectionKey") -> int:
  479. """
  480. Return number of available connections.
  481. The limit, limit_per_host and the connection key are taken into account.
  482. If it returns less than 1 means that there are no connections
  483. available.
  484. """
  485. # check total available connections
  486. # If there are no limits, this will always return 1
  487. total_remain = 1
  488. if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0:
  489. return total_remain
  490. # check limit per host
  491. if host_remain := self._limit_per_host:
  492. if acquired := self._acquired_per_host.get(key):
  493. host_remain -= len(acquired)
  494. if total_remain > host_remain:
  495. return host_remain
  496. return total_remain
  497. def _update_proxy_auth_header_and_build_proxy_req(
  498. self, req: ClientRequest
  499. ) -> ClientRequest:
  500. """Set Proxy-Authorization header for non-SSL proxy requests and builds the proxy request for SSL proxy requests."""
  501. url = req.proxy
  502. assert url is not None
  503. headers: Dict[str, str] = {}
  504. if req.proxy_headers is not None:
  505. headers = req.proxy_headers # type: ignore[assignment]
  506. headers[hdrs.HOST] = req.headers[hdrs.HOST]
  507. proxy_req = ClientRequest(
  508. hdrs.METH_GET,
  509. url,
  510. headers=headers,
  511. auth=req.proxy_auth,
  512. loop=self._loop,
  513. ssl=req.ssl,
  514. )
  515. auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
  516. if auth is not None:
  517. if not req.is_ssl():
  518. req.headers[hdrs.PROXY_AUTHORIZATION] = auth
  519. else:
  520. proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth
  521. return proxy_req
  522. async def connect(
  523. self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
  524. ) -> Connection:
  525. """Get from pool or create new connection."""
  526. key = req.connection_key
  527. if (conn := await self._get(key, traces)) is not None:
  528. # If we do not have to wait and we can get a connection from the pool
  529. # we can avoid the timeout ceil logic and directly return the connection
  530. if req.proxy:
  531. self._update_proxy_auth_header_and_build_proxy_req(req)
  532. return conn
  533. async with ceil_timeout(timeout.connect, timeout.ceil_threshold):
  534. if self._available_connections(key) <= 0:
  535. await self._wait_for_available_connection(key, traces)
  536. if (conn := await self._get(key, traces)) is not None:
  537. if req.proxy:
  538. self._update_proxy_auth_header_and_build_proxy_req(req)
  539. return conn
  540. placeholder = cast(
  541. ResponseHandler, _TransportPlaceholder(self._placeholder_future)
  542. )
  543. self._acquired.add(placeholder)
  544. if self._limit_per_host:
  545. self._acquired_per_host[key].add(placeholder)
  546. try:
  547. # Traces are done inside the try block to ensure that the
  548. # that the placeholder is still cleaned up if an exception
  549. # is raised.
  550. if traces:
  551. for trace in traces:
  552. await trace.send_connection_create_start()
  553. proto = await self._create_connection(req, traces, timeout)
  554. if traces:
  555. for trace in traces:
  556. await trace.send_connection_create_end()
  557. except BaseException:
  558. self._release_acquired(key, placeholder)
  559. raise
  560. else:
  561. if self._closed:
  562. proto.close()
  563. raise ClientConnectionError("Connector is closed.")
  564. # The connection was successfully created, drop the placeholder
  565. # and add the real connection to the acquired set. There should
  566. # be no awaits after the proto is added to the acquired set
  567. # to ensure that the connection is not left in the acquired set
  568. # on cancellation.
  569. self._acquired.remove(placeholder)
  570. self._acquired.add(proto)
  571. if self._limit_per_host:
  572. acquired_per_host = self._acquired_per_host[key]
  573. acquired_per_host.remove(placeholder)
  574. acquired_per_host.add(proto)
  575. return Connection(self, key, proto, self._loop)
  576. async def _wait_for_available_connection(
  577. self, key: "ConnectionKey", traces: List["Trace"]
  578. ) -> None:
  579. """Wait for an available connection slot."""
  580. # We loop here because there is a race between
  581. # the connection limit check and the connection
  582. # being acquired. If the connection is acquired
  583. # between the check and the await statement, we
  584. # need to loop again to check if the connection
  585. # slot is still available.
  586. attempts = 0
  587. while True:
  588. fut: asyncio.Future[None] = self._loop.create_future()
  589. keyed_waiters = self._waiters[key]
  590. keyed_waiters[fut] = None
  591. if attempts:
  592. # If we have waited before, we need to move the waiter
  593. # to the front of the queue as otherwise we might get
  594. # starved and hit the timeout.
  595. keyed_waiters.move_to_end(fut, last=False)
  596. try:
  597. # Traces happen in the try block to ensure that the
  598. # the waiter is still cleaned up if an exception is raised.
  599. if traces:
  600. for trace in traces:
  601. await trace.send_connection_queued_start()
  602. await fut
  603. if traces:
  604. for trace in traces:
  605. await trace.send_connection_queued_end()
  606. finally:
  607. # pop the waiter from the queue if its still
  608. # there and not already removed by _release_waiter
  609. keyed_waiters.pop(fut, None)
  610. if not self._waiters.get(key, True):
  611. del self._waiters[key]
  612. if self._available_connections(key) > 0:
  613. break
  614. attempts += 1
  615. async def _get(
  616. self, key: "ConnectionKey", traces: List["Trace"]
  617. ) -> Optional[Connection]:
  618. """Get next reusable connection for the key or None.
  619. The connection will be marked as acquired.
  620. """
  621. if (conns := self._conns.get(key)) is None:
  622. return None
  623. t1 = monotonic()
  624. while conns:
  625. proto, t0 = conns.popleft()
  626. # We will we reuse the connection if its connected and
  627. # the keepalive timeout has not been exceeded
  628. if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
  629. if not conns:
  630. # The very last connection was reclaimed: drop the key
  631. del self._conns[key]
  632. self._acquired.add(proto)
  633. if self._limit_per_host:
  634. self._acquired_per_host[key].add(proto)
  635. if traces:
  636. for trace in traces:
  637. try:
  638. await trace.send_connection_reuseconn()
  639. except BaseException:
  640. self._release_acquired(key, proto)
  641. raise
  642. return Connection(self, key, proto, self._loop)
  643. # Connection cannot be reused, close it
  644. transport = proto.transport
  645. proto.close()
  646. # only for SSL transports
  647. if not self._cleanup_closed_disabled and key.is_ssl:
  648. self._cleanup_closed_transports.append(transport)
  649. # No more connections: drop the key
  650. del self._conns[key]
  651. return None
  652. def _release_waiter(self) -> None:
  653. """
  654. Iterates over all waiters until one to be released is found.
  655. The one to be released is not finished and
  656. belongs to a host that has available connections.
  657. """
  658. if not self._waiters:
  659. return
  660. # Having the dict keys ordered this avoids to iterate
  661. # at the same order at each call.
  662. queues = list(self._waiters)
  663. random.shuffle(queues)
  664. for key in queues:
  665. if self._available_connections(key) < 1:
  666. continue
  667. waiters = self._waiters[key]
  668. while waiters:
  669. waiter, _ = waiters.popitem(last=False)
  670. if not waiter.done():
  671. waiter.set_result(None)
  672. return
    def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
        """Release acquired connection.

        Removes *proto* from the acquired bookkeeping sets and wakes one
        waiter (if any) now that a slot has been freed.
        """
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._acquired.discard(proto)
        if self._limit_per_host and (conns := self._acquired_per_host.get(key)):
            conns.discard(proto)
            if not conns:
                # Last acquired connection for this host: drop the key so the
                # per-host map does not accumulate empty sets.
                del self._acquired_per_host[key]
        self._release_waiter()
    def _release(
        self,
        key: "ConnectionKey",
        protocol: ResponseHandler,
        *,
        should_close: bool = False,
    ) -> None:
        """Return *protocol* to the pool, or close it.

        The connection is closed when the connector is force-close, the
        caller requested it, or the protocol itself says it must not be
        reused; otherwise it is parked in ``self._conns`` with a timestamp
        for keep-alive expiry.
        """
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._release_acquired(key, protocol)

        if self._force_close or should_close or protocol.should_close:
            transport = protocol.transport
            protocol.close()
            # SSL transports are queued for the cleanup-closed machinery so
            # their shutdown can finish asynchronously.
            if key.is_ssl and not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
            return

        # Park the connection for reuse; monotonic() marks the start of its
        # keep-alive window.
        self._conns[key].append((protocol, monotonic()))

        if self._cleanup_handle is None:
            # Lazily schedule the periodic keep-alive cleanup; weakref_handle
            # avoids keeping the connector alive just for the timer.
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                self._keepalive_timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )
    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Establish a new connection; concrete connectors must override."""
        raise NotImplementedError()
  714. class _DNSCacheTable:
  715. def __init__(self, ttl: Optional[float] = None) -> None:
  716. self._addrs_rr: Dict[Tuple[str, int], Tuple[Iterator[ResolveResult], int]] = {}
  717. self._timestamps: Dict[Tuple[str, int], float] = {}
  718. self._ttl = ttl
  719. def __contains__(self, host: object) -> bool:
  720. return host in self._addrs_rr
  721. def add(self, key: Tuple[str, int], addrs: List[ResolveResult]) -> None:
  722. self._addrs_rr[key] = (cycle(addrs), len(addrs))
  723. if self._ttl is not None:
  724. self._timestamps[key] = monotonic()
  725. def remove(self, key: Tuple[str, int]) -> None:
  726. self._addrs_rr.pop(key, None)
  727. if self._ttl is not None:
  728. self._timestamps.pop(key, None)
  729. def clear(self) -> None:
  730. self._addrs_rr.clear()
  731. self._timestamps.clear()
  732. def next_addrs(self, key: Tuple[str, int]) -> List[ResolveResult]:
  733. loop, length = self._addrs_rr[key]
  734. addrs = list(islice(loop, length))
  735. # Consume one more element to shift internal state of `cycle`
  736. next(loop)
  737. return addrs
  738. def expired(self, key: Tuple[str, int]) -> bool:
  739. if self._ttl is None:
  740. return False
  741. return self._timestamps[key] + self._ttl < monotonic()
  742. def _make_ssl_context(verified: bool) -> SSLContext:
  743. """Create SSL context.
  744. This method is not async-friendly and should be called from a thread
  745. because it will load certificates from disk and do other blocking I/O.
  746. """
  747. if ssl is None:
  748. # No ssl support
  749. return None
  750. if verified:
  751. sslcontext = ssl.create_default_context()
  752. else:
  753. sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
  754. sslcontext.options |= ssl.OP_NO_SSLv2
  755. sslcontext.options |= ssl.OP_NO_SSLv3
  756. sslcontext.check_hostname = False
  757. sslcontext.verify_mode = ssl.CERT_NONE
  758. sslcontext.options |= ssl.OP_NO_COMPRESSION
  759. sslcontext.set_default_verify_paths()
  760. sslcontext.set_alpn_protocols(("http/1.1",))
  761. return sslcontext
# The default SSLContext objects are created at import time
# since they do blocking I/O to load certificates from disk,
# and imports should always be done before the event loop starts
# or in a thread.
# These are shared, module-level singletons reused by every TCPConnector.
_SSL_CONTEXT_VERIFIED = _make_ssl_context(True)
_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False)
class TCPConnector(BaseConnector):
    """TCP connector.

    verify_ssl - Set to True to check ssl certifications.
    fingerprint - Pass the binary sha256
        digest of the expected certificate in DER format to verify
        that the certificate the server presents matches. See also
        https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning
    resolver - Enable DNS lookups and use this
        resolver
    use_dns_cache - Use memory cache for DNS lookups.
    ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
    family - socket address family
    local_addr - local tuple of (host, port) to bind socket to
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
        Disabled by default.
    happy_eyeballs_delay - This is the “Connection Attempt Delay”
        as defined in RFC 8305. To disable
        the happy eyeballs algorithm, set to None.
    interleave - “First Address Family Count” as defined in RFC 8305
    loop - Optional event loop.
    socket_factory - A SocketFactoryType function that, if supplied,
        will be used to create sockets given an
        AddrInfoType.
    ssl_shutdown_timeout - DEPRECATED. Will be removed in aiohttp 4.0.
        Grace period for SSL shutdown handshake on TLS
        connections. Default is 0 seconds (immediate abort).
        This parameter allowed for a clean SSL shutdown by
        notifying the remote peer of connection closure,
        while avoiding excessive delays during connector cleanup.
        Note: Only takes effect on Python 3.11+.
    """

    # URL schemes this connector accepts in addition to the high-level set.
    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"})
    def __init__(
        self,
        *,
        verify_ssl: bool = True,
        fingerprint: Optional[bytes] = None,
        use_dns_cache: bool = True,
        ttl_dns_cache: Optional[int] = 10,
        family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC,
        ssl_context: Optional[SSLContext] = None,
        ssl: Union[bool, Fingerprint, SSLContext] = True,
        local_addr: Optional[Tuple[str, int]] = None,
        resolver: Optional[AbstractResolver] = None,
        keepalive_timeout: Union[None, float, object] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
        happy_eyeballs_delay: Optional[float] = 0.25,
        interleave: Optional[int] = None,
        socket_factory: Optional[SocketFactoryType] = None,
        ssl_shutdown_timeout: Union[_SENTINEL, None, float] = sentinel,
    ):
        """Initialize the TCP connector (see class docstring for parameters)."""
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            enable_cleanup_closed=enable_cleanup_closed,
            loop=loop,
            timeout_ceil_threshold=timeout_ceil_threshold,
        )

        # Collapse the legacy verify_ssl/ssl_context/fingerprint arguments
        # and the modern `ssl` argument into a single setting.
        self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)

        self._resolver: AbstractResolver
        if resolver is None:
            self._resolver = DefaultResolver(loop=self._loop)
            # We created the resolver, so we are responsible for closing it.
            self._resolver_owner = True
        else:
            self._resolver = resolver
            self._resolver_owner = False

        self._use_dns_cache = use_dns_cache
        self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
        # (host, port) -> futures of callers waiting on an in-flight lookup.
        self._throttle_dns_futures: Dict[
            Tuple[str, int], Set["asyncio.Future[None]"]
        ] = {}
        self._family = family
        self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
        self._happy_eyeballs_delay = happy_eyeballs_delay
        self._interleave = interleave
        # In-flight shielded DNS lookup tasks, cancelled on close().
        self._resolve_host_tasks: Set["asyncio.Task[List[ResolveResult]]"] = set()
        self._socket_factory = socket_factory
        self._ssl_shutdown_timeout: Optional[float]
        # Handle ssl_shutdown_timeout with warning for Python < 3.11
        if ssl_shutdown_timeout is sentinel:
            # Default: abort TLS connections immediately on close.
            self._ssl_shutdown_timeout = 0
        else:
            # Deprecation warning for ssl_shutdown_timeout parameter
            warnings.warn(
                "The ssl_shutdown_timeout parameter is deprecated and will be removed in aiohttp 4.0",
                DeprecationWarning,
                stacklevel=2,
            )
            # A non-zero timeout cannot be honoured before Python 3.11.
            if (
                sys.version_info < (3, 11)
                and ssl_shutdown_timeout is not None
                and ssl_shutdown_timeout != 0
            ):
                warnings.warn(
                    f"ssl_shutdown_timeout={ssl_shutdown_timeout} is ignored on Python < 3.11; "
                    "only ssl_shutdown_timeout=0 is supported. The timeout will be ignored.",
                    RuntimeWarning,
                    stacklevel=2,
                )
            self._ssl_shutdown_timeout = ssl_shutdown_timeout
  880. def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
  881. """Close all ongoing DNS calls."""
  882. for fut in chain.from_iterable(self._throttle_dns_futures.values()):
  883. fut.cancel()
  884. waiters = super()._close(abort_ssl=abort_ssl)
  885. for t in self._resolve_host_tasks:
  886. t.cancel()
  887. waiters.append(t)
  888. return waiters
    async def close(self, *, abort_ssl: bool = False) -> None:
        """
        Close all opened transports.

        :param abort_ssl: If True, SSL connections will be aborted immediately
            without performing the shutdown handshake. If False (default),
            the behavior is determined by ssl_shutdown_timeout:
            - If ssl_shutdown_timeout=0: connections are aborted
            - If ssl_shutdown_timeout>0: graceful shutdown is performed
        """
        # Close the resolver only if this connector created it.
        if self._resolver_owner:
            await self._resolver.close()
        # Use abort_ssl param if explicitly set, otherwise use ssl_shutdown_timeout default
        await super().close(abort_ssl=abort_ssl or self._ssl_shutdown_timeout == 0)
    @property
    def family(self) -> int:
        """Socket family like AF_INET."""
        return self._family
    @property
    def use_dns_cache(self) -> bool:
        """True if local DNS caching is enabled."""
        return self._use_dns_cache
  910. def clear_dns_cache(
  911. self, host: Optional[str] = None, port: Optional[int] = None
  912. ) -> None:
  913. """Remove specified host/port or clear all dns local cache."""
  914. if host is not None and port is not None:
  915. self._cached_hosts.remove((host, port))
  916. elif host is not None or port is not None:
  917. raise ValueError("either both host and port or none of them are allowed")
  918. else:
  919. self._cached_hosts.clear()
    async def _resolve_host(
        self, host: str, port: int, traces: Optional[Sequence["Trace"]] = None
    ) -> List[ResolveResult]:
        """Resolve host and return list of addresses.

        Fast paths: literal IP addresses bypass resolution entirely, and a
        fresh cache entry is returned immediately.  Concurrent lookups for
        the same (host, port) are throttled down to a single resolver call
        whose result is shared by every waiter.
        """
        if is_ip_address(host):
            # Literal address: no DNS needed; synthesize a single result.
            return [
                {
                    "hostname": host,
                    "host": host,
                    "port": port,
                    "family": self._family,
                    "proto": 0,
                    "flags": 0,
                }
            ]

        if not self._use_dns_cache:
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            res = await self._resolver.resolve(host, port, family=self._family)

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            return res

        key = (host, port)
        if key in self._cached_hosts and not self._cached_hosts.expired(key):
            # get result early, before any await (#4014)
            result = self._cached_hosts.next_addrs(key)

            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            return result

        futures: Set["asyncio.Future[None]"]
        #
        # If multiple connectors are resolving the same host, we wait
        # for the first one to resolve and then use the result for all of them.
        # We use a throttle to ensure that we only resolve the host once
        # and then use the result for all the waiters.
        #
        if key in self._throttle_dns_futures:
            # get futures early, before any await (#4014)
            futures = self._throttle_dns_futures[key]
            future: asyncio.Future[None] = self._loop.create_future()
            futures.add(future)
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            try:
                # Park until the in-flight lookup populates the cache.
                await future
            finally:
                futures.discard(future)
            return self._cached_hosts.next_addrs(key)

        # update dict early, before any await (#4014)
        self._throttle_dns_futures[key] = futures = set()
        # In this case we need to create a task to ensure that we can shield
        # the task from cancellation as cancelling this lookup should not cancel
        # the underlying lookup or else the cancel event will get broadcast to
        # all the waiters across all connections.
        #
        coro = self._resolve_host_with_throttle(key, host, port, futures, traces)
        loop = asyncio.get_running_loop()
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to send immediately
            resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True)
        else:
            resolved_host_task = loop.create_task(coro)

        if not resolved_host_task.done():
            # Track the task so close() can cancel and await it.
            self._resolve_host_tasks.add(resolved_host_task)
            resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)

        try:
            return await asyncio.shield(resolved_host_task)
        except asyncio.CancelledError:
            # The caller was cancelled, but the shielded lookup keeps running
            # for the other waiters; swallow its eventual outcome so the
            # event loop does not log "exception was never retrieved".
            def drop_exception(fut: "asyncio.Future[List[ResolveResult]]") -> None:
                with suppress(Exception, asyncio.CancelledError):
                    fut.result()

            resolved_host_task.add_done_callback(drop_exception)
            raise
    async def _resolve_host_with_throttle(
        self,
        key: Tuple[str, int],
        host: str,
        port: int,
        futures: Set["asyncio.Future[None]"],
        traces: Optional[Sequence["Trace"]],
    ) -> List[ResolveResult]:
        """Resolve host and set result for all waiters.

        This method must be run in a task and shielded from cancellation
        to avoid cancelling the underlying lookup.
        """
        try:
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_miss(host)

                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            addrs = await self._resolver.resolve(host, port, family=self._family)
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            # Cache first, then wake the waiters: they re-read the cache.
            self._cached_hosts.add(key, addrs)
            for fut in futures:
                set_result(fut, None)
        except BaseException as e:
            # any DNS exception is set for the waiters to raise the same exception.
            # This coro is always run in task that is shielded from cancellation so
            # we should never be propagating cancellation here.
            for fut in futures:
                set_exception(fut, e)
            raise
        finally:
            # The lookup is finished either way; stop throttling this key.
            self._throttle_dns_futures.pop(key)

        return self._cached_hosts.next_addrs(key)
  1032. async def _create_connection(
  1033. self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
  1034. ) -> ResponseHandler:
  1035. """Create connection.
  1036. Has same keyword arguments as BaseEventLoop.create_connection.
  1037. """
  1038. if req.proxy:
  1039. _, proto = await self._create_proxy_connection(req, traces, timeout)
  1040. else:
  1041. _, proto = await self._create_direct_connection(req, traces, timeout)
  1042. return proto
  1043. def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]:
  1044. """Logic to get the correct SSL context
  1045. 0. if req.ssl is false, return None
  1046. 1. if ssl_context is specified in req, use it
  1047. 2. if _ssl_context is specified in self, use it
  1048. 3. otherwise:
  1049. 1. if verify_ssl is not specified in req, use self.ssl_context
  1050. (will generate a default context according to self.verify_ssl)
  1051. 2. if verify_ssl is True in req, generate a default SSL context
  1052. 3. if verify_ssl is False in req, generate a SSL context that
  1053. won't verify
  1054. """
  1055. if not req.is_ssl():
  1056. return None
  1057. if ssl is None: # pragma: no cover
  1058. raise RuntimeError("SSL is not supported.")
  1059. sslcontext = req.ssl
  1060. if isinstance(sslcontext, ssl.SSLContext):
  1061. return sslcontext
  1062. if sslcontext is not True:
  1063. # not verified or fingerprinted
  1064. return _SSL_CONTEXT_UNVERIFIED
  1065. sslcontext = self._ssl
  1066. if isinstance(sslcontext, ssl.SSLContext):
  1067. return sslcontext
  1068. if sslcontext is not True:
  1069. # not verified or fingerprinted
  1070. return _SSL_CONTEXT_UNVERIFIED
  1071. return _SSL_CONTEXT_VERIFIED
  1072. def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]:
  1073. ret = req.ssl
  1074. if isinstance(ret, Fingerprint):
  1075. return ret
  1076. ret = self._ssl
  1077. if isinstance(ret, Fingerprint):
  1078. return ret
  1079. return None
    async def _wrap_create_connection(
        self,
        *args: Any,
        addr_infos: List[AddrInfoType],
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Open a TCP socket via happy-eyeballs, then build the transport.

        Maps low-level certificate/SSL/OS errors onto the aiohttp client
        exception hierarchy keyed by ``req.connection_key``.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                # RFC 8305 happy-eyeballs: race address families with the
                # configured delay/interleave and return the winning socket.
                sock = await aiohappyeyeballs.start_connection(
                    addr_infos=addr_infos,
                    local_addr_infos=self._local_addr_infos,
                    happy_eyeballs_delay=self._happy_eyeballs_delay,
                    interleave=self._interleave,
                    loop=self._loop,
                    socket_factory=self._socket_factory,
                )
                # Add ssl_shutdown_timeout for Python 3.11+ when SSL is used
                if (
                    kwargs.get("ssl")
                    and self._ssl_shutdown_timeout
                    and sys.version_info >= (3, 11)
                ):
                    kwargs["ssl_shutdown_timeout"] = self._ssl_shutdown_timeout
                return await self._loop.create_connection(*args, **kwargs, sock=sock)
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            # Timeouts surface as OSError subclasses with errno unset;
            # let them propagate unchanged.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
    async def _wrap_existing_connection(
        self,
        *args: Any,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Build a transport over an already-connected socket (passed via
        **kwargs), applying the same timeout and error mapping as
        :meth:`_wrap_create_connection`."""
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                return await self._loop.create_connection(*args, **kwargs)
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            # Timeouts surface as OSError subclasses with errno unset;
            # let them propagate unchanged.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
  1138. def _fail_on_no_start_tls(self, req: "ClientRequest") -> None:
  1139. """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``.
  1140. It is necessary for TLS-in-TLS so that it is possible to
  1141. send HTTPS queries through HTTPS proxies.
  1142. This doesn't affect regular HTTP requests, though.
  1143. """
  1144. if not req.is_ssl():
  1145. return
  1146. proxy_url = req.proxy
  1147. assert proxy_url is not None
  1148. if proxy_url.scheme != "https":
  1149. return
  1150. self._check_loop_for_start_tls()
    def _check_loop_for_start_tls(self) -> None:
        """Raise RuntimeError when the running loop has no ``start_tls()``.

        Probes the attribute instead of calling it; the AttributeError is
        chained as the cause of the RuntimeError.
        """
        try:
            self._loop.start_tls
        except AttributeError as attr_exc:
            raise RuntimeError(
                "An HTTPS request is being sent through an HTTPS proxy. "
                "This needs support for TLS in TLS but it is not implemented "
                "in your runtime for the stdlib asyncio.\n\n"
                "Please upgrade to Python 3.11 or higher. For more details, "
                "please see:\n"
                "* https://bugs.python.org/issue37179\n"
                "* https://github.com/python/cpython/pull/28073\n"
                "* https://docs.aiohttp.org/en/stable/"
                "client_advanced.html#proxy-support\n"
                "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            ) from attr_exc
  1167. def _loop_supports_start_tls(self) -> bool:
  1168. try:
  1169. self._check_loop_for_start_tls()
  1170. except RuntimeError:
  1171. return False
  1172. else:
  1173. return True
    def _warn_about_tls_in_tls(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
    ) -> None:
        """Issue a warning if the requested URL has HTTPS scheme."""
        if req.request_info.url.scheme != "https":
            return

        # Check if uvloop is being used, which supports TLS in TLS,
        # otherwise assume that asyncio's native transport is being used.
        if type(underlying_transport).__module__.startswith("uvloop"):
            return

        # Support in asyncio was added in Python 3.11 (bpo-44011)
        asyncio_supports_tls_in_tls = sys.version_info >= (3, 11) or getattr(
            underlying_transport,
            "_start_tls_compatible",
            False,
        )

        if asyncio_supports_tls_in_tls:
            return

        warnings.warn(
            "An HTTPS request is being sent through an HTTPS proxy. "
            "This support for TLS in TLS is known to be disabled "
            "in the stdlib asyncio (Python <3.11). This is why you'll probably see "
            "an error in the log below.\n\n"
            "It is possible to enable it via monkeypatching. "
            "For more details, see:\n"
            "* https://bugs.python.org/issue37179\n"
            "* https://github.com/python/cpython/pull/28073\n\n"
            "You can temporarily patch this as follows:\n"
            "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
            "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            RuntimeWarning,
            source=self,
            # Why `3`? At least 3 of the calls in the stack originate
            # from the methods in this class.  (Comment previously said
            # `4`, which did not match the value below.)
            stacklevel=3,
        )
    async def _start_tls_connection(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Wrap the raw TCP transport with TLS.

        Used for TLS-in-TLS (HTTPS through an HTTPS proxy): upgrades the
        established tunnel transport via ``loop.start_tls()`` and returns
        the new TLS transport together with a fresh protocol instance.
        """
        tls_proto = self._factory()  # Create a brand new proto for TLS
        sslcontext = self._get_ssl_context(req)
        if TYPE_CHECKING:
            # _start_tls_connection is unreachable in the current code path
            # if sslcontext is None.
            assert sslcontext is not None

        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                try:
                    # ssl_shutdown_timeout is only available in Python 3.11+
                    if sys.version_info >= (3, 11) and self._ssl_shutdown_timeout:
                        tls_transport = await self._loop.start_tls(
                            underlying_transport,
                            tls_proto,
                            sslcontext,
                            server_hostname=req.server_hostname or req.host,
                            ssl_handshake_timeout=timeout.total,
                            ssl_shutdown_timeout=self._ssl_shutdown_timeout,
                        )
                    else:
                        tls_transport = await self._loop.start_tls(
                            underlying_transport,
                            tls_proto,
                            sslcontext,
                            server_hostname=req.server_hostname or req.host,
                            ssl_handshake_timeout=timeout.total,
                        )
                except BaseException:
                    # We need to close the underlying transport since
                    # `start_tls()` probably failed before it had a
                    # chance to do this:
                    if self._ssl_shutdown_timeout == 0:
                        underlying_transport.abort()
                    else:
                        underlying_transport.close()
                    raise
                if isinstance(tls_transport, asyncio.Transport):
                    # Verify the server certificate fingerprint, if pinned.
                    fingerprint = self._get_fingerprint(req)
                    if fingerprint:
                        try:
                            fingerprint.check(tls_transport)
                        except ServerFingerprintMismatch:
                            tls_transport.close()
                            if not self._cleanup_closed_disabled:
                                self._cleanup_closed_transports.append(tls_transport)
                            raise
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            # Timeouts surface as OSError subclasses with errno unset;
            # let them propagate unchanged.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
        except TypeError as type_err:
            # Example cause looks like this:
            # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
            # object at 0x7f760615e460> is not supported by start_tls()
            raise ClientConnectionError(
                "Cannot initialize a TLS-in-TLS connection to host "
                f"{req.host!s}:{req.port:d} through an underlying connection "
                f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
                f"[{type_err!s}]"
            ) from type_err
        else:
            if tls_transport is None:
                msg = "Failed to start TLS (possibly caused by closing transport)"
                raise client_error(req.connection_key, OSError(msg))
            tls_proto.connection_made(
                tls_transport
            )  # Kick the state machine of the new TLS protocol

        return tls_transport, tls_proto
  1294. def _convert_hosts_to_addr_infos(
  1295. self, hosts: List[ResolveResult]
  1296. ) -> List[AddrInfoType]:
  1297. """Converts the list of hosts to a list of addr_infos.
  1298. The list of hosts is the result of a DNS lookup. The list of
  1299. addr_infos is the result of a call to `socket.getaddrinfo()`.
  1300. """
  1301. addr_infos: List[AddrInfoType] = []
  1302. for hinfo in hosts:
  1303. host = hinfo["host"]
  1304. is_ipv6 = ":" in host
  1305. family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
  1306. if self._family and self._family != family:
  1307. continue
  1308. addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"])
  1309. addr_infos.append(
  1310. (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)
  1311. )
  1312. return addr_infos
    async def _create_direct_connection(
        self,
        req: ClientRequest,
        traces: List["Trace"],
        timeout: "ClientTimeout",
        *,
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Resolve the target and connect, retrying across resolved addresses.

        Raises the last connection error when every resolved address fails.
        """
        sslcontext = self._get_ssl_context(req)
        fingerprint = self._get_fingerprint(req)

        host = req.url.raw_host
        assert host is not None
        # Replace multiple trailing dots with a single one.
        # A trailing dot is only present for fully-qualified domain names.
        # See https://github.com/aio-libs/aiohttp/pull/7364.
        if host.endswith(".."):
            host = host.rstrip(".") + "."
        port = req.port
        assert port is not None
        try:
            # Cancelling this lookup should not cancel the underlying lookup
            # or else the cancel event will get broadcast to all the waiters
            # across all connections.
            hosts = await self._resolve_host(host, port, traces=traces)
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            # in case of proxy it is not ClientProxyConnectionError
            # it is problem of resolving proxy ip itself
            raise ClientConnectorDNSError(req.connection_key, exc) from exc

        last_exc: Optional[Exception] = None
        addr_infos = self._convert_hosts_to_addr_infos(hosts)
        while addr_infos:
            # Strip trailing dots, certificates contain FQDN without dots.
            # See https://github.com/aio-libs/aiohttp/issues/3636
            server_hostname = (
                (req.server_hostname or host).rstrip(".") if sslcontext else None
            )

            try:
                transp, proto = await self._wrap_create_connection(
                    self._factory,
                    timeout=timeout,
                    ssl=sslcontext,
                    addr_infos=addr_infos,
                    server_hostname=server_hostname,
                    req=req,
                    client_error=client_error,
                )
            except (ClientConnectorError, asyncio.TimeoutError) as exc:
                # Drop the failed batch of addresses and try the remainder.
                last_exc = exc
                aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
                continue

            if req.is_ssl() and fingerprint:
                try:
                    fingerprint.check(transp)
                except ServerFingerprintMismatch as exc:
                    transp.close()
                    if not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transp)
                    last_exc = exc
                    # Remove the bad peer from the list of addr_infos
                    sock: socket.socket = transp.get_extra_info("socket")
                    bad_peer = sock.getpeername()
                    aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
                    continue

            return transp, proto
        else:
            # Loop exhausted without returning: every address failed.
            assert last_exc is not None
            raise last_exc
    async def _create_proxy_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Connect through an HTTP(S) proxy.

        Plain HTTP requests are simply sent over the proxy connection.
        HTTPS requests first establish a CONNECT tunnel and then upgrade it
        to TLS, either via ``loop.start_tls()`` or — on runtimes without it —
        by duplicating the raw socket and building a fresh TLS connection.
        """
        self._fail_on_no_start_tls(req)
        runtime_has_start_tls = self._loop_supports_start_tls()

        proxy_req = self._update_proxy_auth_header_and_build_proxy_req(req)

        # create connection to proxy server
        transport, proto = await self._create_direct_connection(
            proxy_req, [], timeout, client_error=ClientProxyConnectionError
        )

        if req.is_ssl():
            if runtime_has_start_tls:
                self._warn_about_tls_in_tls(transport, req)

            # For HTTPS requests over HTTP proxy
            # we must notify proxy to tunnel connection
            # so we send CONNECT command:
            # CONNECT www.python.org:443 HTTP/1.1
            # Host: www.python.org
            #
            # next we must do TLS handshake and so on
            # to do this we must wrap raw socket into secure one
            # asyncio handles this perfectly
            proxy_req.method = hdrs.METH_CONNECT
            proxy_req.url = req.url
            # The tunnelled connection is keyed on the target, not the proxy.
            key = req.connection_key._replace(
                proxy=None, proxy_auth=None, proxy_headers_hash=None
            )

            conn = _ConnectTunnelConnection(self, key, proto, self._loop)
            proxy_resp = await proxy_req.send(conn)
            try:
                protocol = conn._protocol
                assert protocol is not None

                # read_until_eof=True will ensure the connection isn't closed
                # once the response is received and processed allowing
                # START_TLS to work on the connection below.
                protocol.set_response_params(
                    read_until_eof=runtime_has_start_tls,
                    timeout_ceil_threshold=self._timeout_ceil_threshold,
                )
                resp = await proxy_resp.start(conn)
            except BaseException:
                proxy_resp.close()
                conn.close()
                raise
            else:
                # Detach the protocol so closing `conn` won't tear it down.
                conn._protocol = None
                try:
                    # Any non-2xx CONNECT status means the tunnel was refused.
                    if resp.status != 200:
                        message = resp.reason
                        if message is None:
                            message = HTTPStatus(resp.status).phrase
                        raise ClientHttpProxyError(
                            proxy_resp.request_info,
                            resp.history,
                            status=resp.status,
                            message=message,
                            headers=resp.headers,
                        )
                    if not runtime_has_start_tls:
                        rawsock = transport.get_extra_info("socket", default=None)
                        if rawsock is None:
                            raise RuntimeError(
                                "Transport does not expose socket instance"
                            )
                        # Duplicate the socket, so now we can close proxy transport
                        rawsock = rawsock.dup()
                except BaseException:
                    # It shouldn't be closed in `finally` because it's fed to
                    # `loop.start_tls()` and the docs say not to touch it after
                    # passing there.
                    transport.close()
                    raise
                finally:
                    if not runtime_has_start_tls:
                        transport.close()

                if not runtime_has_start_tls:
                    # HTTP proxy with support for upgrade to HTTPS
                    sslcontext = self._get_ssl_context(req)
                    return await self._wrap_existing_connection(
                        self._factory,
                        timeout=timeout,
                        ssl=sslcontext,
                        sock=rawsock,
                        server_hostname=req.host,
                        req=req,
                    )

                return await self._start_tls_connection(
                    # Access the old transport for the last time before it's
                    # closed and forgotten forever:
                    transport,
                    req=req,
                    timeout=timeout,
                )
            finally:
                proxy_resp.close()

        return transport, proto
  1478. class UnixConnector(BaseConnector):
  1479. """Unix socket connector.
  1480. path - Unix socket path.
  1481. keepalive_timeout - (optional) Keep-alive timeout.
  1482. force_close - Set to True to force close and do reconnect
  1483. after each request (and between redirects).
  1484. limit - The total number of simultaneous connections.
  1485. limit_per_host - Number of simultaneous connections to one host.
  1486. loop - Optional event loop.
  1487. """
  1488. allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"})
  1489. def __init__(
  1490. self,
  1491. path: str,
  1492. force_close: bool = False,
  1493. keepalive_timeout: Union[object, float, None] = sentinel,
  1494. limit: int = 100,
  1495. limit_per_host: int = 0,
  1496. loop: Optional[asyncio.AbstractEventLoop] = None,
  1497. ) -> None:
  1498. super().__init__(
  1499. force_close=force_close,
  1500. keepalive_timeout=keepalive_timeout,
  1501. limit=limit,
  1502. limit_per_host=limit_per_host,
  1503. loop=loop,
  1504. )
  1505. self._path = path
  1506. @property
  1507. def path(self) -> str:
  1508. """Path to unix socket."""
  1509. return self._path
  1510. async def _create_connection(
  1511. self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
  1512. ) -> ResponseHandler:
  1513. try:
  1514. async with ceil_timeout(
  1515. timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
  1516. ):
  1517. _, proto = await self._loop.create_unix_connection(
  1518. self._factory, self._path
  1519. )
  1520. except OSError as exc:
  1521. if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
  1522. raise
  1523. raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc
  1524. return proto
  1525. class NamedPipeConnector(BaseConnector):
  1526. """Named pipe connector.
  1527. Only supported by the proactor event loop.
  1528. See also: https://docs.python.org/3/library/asyncio-eventloop.html
  1529. path - Windows named pipe path.
  1530. keepalive_timeout - (optional) Keep-alive timeout.
  1531. force_close - Set to True to force close and do reconnect
  1532. after each request (and between redirects).
  1533. limit - The total number of simultaneous connections.
  1534. limit_per_host - Number of simultaneous connections to one host.
  1535. loop - Optional event loop.
  1536. """
  1537. allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"})
  1538. def __init__(
  1539. self,
  1540. path: str,
  1541. force_close: bool = False,
  1542. keepalive_timeout: Union[object, float, None] = sentinel,
  1543. limit: int = 100,
  1544. limit_per_host: int = 0,
  1545. loop: Optional[asyncio.AbstractEventLoop] = None,
  1546. ) -> None:
  1547. super().__init__(
  1548. force_close=force_close,
  1549. keepalive_timeout=keepalive_timeout,
  1550. limit=limit,
  1551. limit_per_host=limit_per_host,
  1552. loop=loop,
  1553. )
  1554. if not isinstance(
  1555. self._loop,
  1556. asyncio.ProactorEventLoop, # type: ignore[attr-defined]
  1557. ):
  1558. raise RuntimeError(
  1559. "Named Pipes only available in proactor loop under windows"
  1560. )
  1561. self._path = path
  1562. @property
  1563. def path(self) -> str:
  1564. """Path to the named pipe."""
  1565. return self._path
  1566. async def _create_connection(
  1567. self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
  1568. ) -> ResponseHandler:
  1569. try:
  1570. async with ceil_timeout(
  1571. timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
  1572. ):
  1573. _, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined]
  1574. self._factory, self._path
  1575. )
  1576. # the drain is required so that the connection_made is called
  1577. # and transport is set otherwise it is not set before the
  1578. # `assert conn.transport is not None`
  1579. # in client.py's _request method
  1580. await asyncio.sleep(0)
  1581. # other option is to manually set transport like
  1582. # `proto.transport = trans`
  1583. except OSError as exc:
  1584. if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
  1585. raise
  1586. raise ClientConnectorError(req.connection_key, exc) from exc
  1587. return cast(ResponseHandler, proto)