| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665 |
- import re
- import sys
- import warnings
- from collections.abc import Mapping, Sequence
- from enum import Enum
- from functools import _CacheInfo, lru_cache
- from ipaddress import ip_address
- from typing import (
- TYPE_CHECKING,
- Any,
- NoReturn,
- TypedDict,
- TypeVar,
- Union,
- cast,
- overload,
- )
- from urllib.parse import SplitResult, uses_relative
- import idna
- from multidict import MultiDict, MultiDictProxy, istr
- from propcache.api import under_cached_property as cached_property
- from ._parse import (
- USES_AUTHORITY,
- SplitURLType,
- make_netloc,
- query_to_pairs,
- split_netloc,
- split_url,
- unsplit_result,
- )
- from ._path import normalize_path, normalize_path_segments
- from ._query import (
- Query,
- QueryVariable,
- SimpleQuery,
- get_str_query,
- get_str_query_from_iterable,
- get_str_query_from_sequence_iterable,
- )
- from ._quoters import (
- FRAGMENT_QUOTER,
- FRAGMENT_REQUOTER,
- PATH_QUOTER,
- PATH_REQUOTER,
- PATH_SAFE_UNQUOTER,
- PATH_UNQUOTER,
- QS_UNQUOTER,
- QUERY_QUOTER,
- QUERY_REQUOTER,
- QUOTER,
- REQUOTER,
- UNQUOTER,
- human_quote,
- )
- try:
- from pydantic import GetCoreSchemaHandler, GetJsonSchemaHandler
- from pydantic.json_schema import JsonSchemaValue
- from pydantic_core import core_schema
- HAS_PYDANTIC = True
- except ImportError:
- HAS_PYDANTIC = False
# Port implied by each scheme when the URL does not spell one out.
DEFAULT_PORTS = {
    "http": 80,
    "https": 443,
    "ws": 80,
    "wss": 443,
    "ftp": 21,
}

# Immutable snapshot of the schemes listed in ``urllib.parse.uses_relative``.
USES_RELATIVE = frozenset(uses_relative)

# Special schemes (https://url.spec.whatwg.org/#special-scheme) must not
# have an empty host (https://url.spec.whatwg.org/#url-representation).
SCHEME_REQUIRES_HOST = frozenset({"http", "https", "ws", "wss", "ftp"})

# reg-name: unreserved / pct-encoded / sub-delims
# The pattern below matches anything that is *not* in those classes and is
# only applied to lower-cased ASCII values.
NOT_REG_NAME = re.compile(
    r"""
        # any character not in the unreserved or sub-delims sets, plus %
        # (validated with the additional check for pct-encoded sequences below)
        [^a-z0-9\-._~!$&'()*+,;=%]
    |
        # % only allowed if it is part of a pct-encoded
        # sequence of 2 hex digits.
        %(?![0-9a-f]{2})
    """,
    re.VERBOSE,
)

_T = TypeVar("_T")

# ``typing.Self`` exists from Python 3.11; older interpreters fall back to Any.
if sys.version_info < (3, 11):
    Self = Any
else:
    from typing import Self
class UndefinedType(Enum):
    """One-member enum whose member marks an argument as "not set"."""

    _singleton = 0


# The sentinel itself; looked up by value, which yields the sole member.
UNDEFINED = UndefinedType(0)
class CacheInfo(TypedDict):
    """Typed mapping of ``functools`` cache statistics for host handling.

    Every value is a ``functools._CacheInfo`` record; the keys name the
    cached helper each record belongs to.
    """

    # IDNA conversion caches.
    idna_encode: _CacheInfo
    idna_decode: _CacheInfo
    # IP literal parsing cache.
    ip_address: _CacheInfo
    # Host validation / encoding caches.
    host_validate: _CacheInfo
    encode_host: _CacheInfo
class _InternalURLCache(TypedDict, total=False):
    # Shape of the per-instance ``URL._cache`` dict.  ``total=False`` because
    # entries are filled in lazily; any key may be absent.

    # Whole-URL values.
    _val: SplitURLType
    _origin: "URL"
    absolute: bool
    hash: int

    # Scheme and authority.
    scheme: str
    raw_authority: str
    authority: str

    # User info.
    raw_user: str | None
    user: str | None
    raw_password: str | None
    password: str | None

    # Host and port.
    raw_host: str | None
    host: str | None
    host_subcomponent: str | None
    host_port_subcomponent: str | None
    port: int | None
    explicit_port: int | None

    # Path.
    raw_path: str
    path: str

    # Query.
    _parsed_query: list[tuple[str, str]]
    query: "MultiDictProxy[str]"
    raw_query_string: str
    query_string: str
    path_qs: str
    raw_path_qs: str

    # Fragment.
    raw_fragment: str
    fragment: str

    # Values derived from the path segments.
    raw_parts: tuple[str, ...]
    parts: tuple[str, ...]
    parent: "URL"
    raw_name: str
    name: str
    raw_suffix: str
    suffix: str
    raw_suffixes: tuple[str, ...]
    suffixes: tuple[str, ...]
def rewrite_module(obj: _T) -> _T:
    """Set ``obj.__module__`` to ``"yarl"`` and return the same object.

    Returning the argument makes the function usable as a decorator.
    """
    setattr(obj, "__module__", "yarl")
    return obj
@lru_cache
def encode_url(url_str: str) -> "URL":
    """Build a ``URL`` from a string whose components still need quoting.

    The string is split into its five components, the host is encoded, the
    user info, path, query and fragment are re-quoted, and the values that
    fall out of that work are stored in the new object's cache.

    Raises:
        ValueError: the authority has no host and the scheme is one of
            ``SCHEME_REQUIRES_HOST``.
    """
    prefilled: _InternalURLCache = {}
    host: str | None
    scheme, netloc, path, query, fragment = split_url(url_str)

    if netloc:
        if ":" in netloc or "@" in netloc or "[" in netloc:
            # Authority carries user info, a port or a bracketed host.
            username, password, host, port = split_netloc(netloc)
        else:
            host = netloc
            username = password = port = None

        if host is None:
            if scheme in SCHEME_REQUIRES_HOST:
                raise ValueError(
                    "Invalid URL: host is required for "
                    f"absolute urls with the {scheme} scheme"
                )
            host = ""

        host = _encode_host(host, validate_host=False)
        # The host encoder adds brackets back around IPv6 addresses, so
        # strip them for the cached raw host.
        prefilled["raw_host"] = host[1:-1] if "[" in host else host
        prefilled["explicit_port"] = port

        if username is None and password is None:
            # Fast path: no user info to quote.
            prefilled["raw_user"] = None
            prefilled["raw_password"] = None
            netloc = host if port is None else f"{host}:{port}"
        else:
            raw_user = REQUOTER(username) if username else username
            raw_password = REQUOTER(password) if password else password
            prefilled["raw_user"] = raw_user
            prefilled["raw_password"] = raw_password
            netloc = make_netloc(raw_user, raw_password, host, port)
    else:
        host = ""

    if path:
        path = PATH_REQUOTER(path)
        if netloc and "." in path:
            path = normalize_path(path)
    if query:
        query = QUERY_REQUOTER(query)
    if fragment:
        fragment = FRAGMENT_REQUOTER(fragment)

    prefilled["scheme"] = scheme
    prefilled["raw_path"] = "/" if not path and netloc else path
    prefilled["raw_query_string"] = query
    prefilled["raw_fragment"] = fragment

    url = object.__new__(URL)
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query
    url._fragment = fragment
    url._cache = prefilled
    return url
@lru_cache
def pre_encoded_url(url_str: str) -> "URL":
    """Build a ``URL`` from an already-encoded string.

    The five components returned by ``split_url`` are stored as-is; no
    quoting or normalization happens here.
    """
    url = object.__new__(URL)
    (
        url._scheme,
        url._netloc,
        url._path,
        url._query,
        url._fragment,
    ) = split_url(url_str)
    url._cache = {}
    return url
@lru_cache
def build_pre_encoded_url(
    scheme: str,
    authority: str,
    user: str | None,
    password: str | None,
    host: str,
    port: int | None,
    path: str,
    query_string: str,
    fragment: str,
) -> "URL":
    """Assemble a ``URL`` from components that are already encoded.

    ``authority`` wins when non-empty. Otherwise the netloc is built from
    ``user``/``password``/``host``/``port``, dropping a port equal to the
    scheme's default. With neither authority nor host the netloc is empty.
    """
    if authority:
        netloc = authority
    elif not host:
        netloc = ""
    else:
        if port is not None and port == DEFAULT_PORTS.get(scheme):
            port = None
        if user is None and password is None:
            netloc = host if port is None else f"{host}:{port}"
        else:
            netloc = make_netloc(user, password, host, port)

    url = object.__new__(URL)
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query_string
    url._fragment = fragment
    url._cache = {}
    return url
def from_parts_uncached(
    scheme: str, netloc: str, path: str, query: str, fragment: str
) -> "URL":
    """Create a ``URL`` directly from its five components, without caching.

    The values are stored verbatim; nothing is quoted or validated.
    """
    url = object.__new__(URL)
    url._scheme, url._netloc, url._path = scheme, netloc, path
    url._query, url._fragment = query, fragment
    url._cache = {}
    return url


# Memoized variant of ``from_parts_uncached``.
from_parts = lru_cache(from_parts_uncached)
- @rewrite_module
- class URL:
- # Don't derive from str
- # follow pathlib.Path design
- # probably URL will not suffer from pathlib problems:
- # it's intended for libraries like aiohttp,
- # not to be passed into standard library functions like os.open etc.
- # URL grammar (RFC 3986)
- # pct-encoded = "%" HEXDIG HEXDIG
- # reserved = gen-delims / sub-delims
- # gen-delims = ":" / "/" / "?" / "#" / "[" / "]" / "@"
- # sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
- # / "*" / "+" / "," / ";" / "="
- # unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
- # URI = scheme ":" hier-part [ "?" query ] [ "#" fragment ]
- # hier-part = "//" authority path-abempty
- # / path-absolute
- # / path-rootless
- # / path-empty
- # scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )
- # authority = [ userinfo "@" ] host [ ":" port ]
- # userinfo = *( unreserved / pct-encoded / sub-delims / ":" )
- # host = IP-literal / IPv4address / reg-name
- # IP-literal = "[" ( IPv6address / IPvFuture ) "]"
- # IPvFuture = "v" 1*HEXDIG "." 1*( unreserved / sub-delims / ":" )
- # IPv6address = 6( h16 ":" ) ls32
- # / "::" 5( h16 ":" ) ls32
- # / [ h16 ] "::" 4( h16 ":" ) ls32
- # / [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32
- # / [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32
- # / [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32
- # / [ *4( h16 ":" ) h16 ] "::" ls32
- # / [ *5( h16 ":" ) h16 ] "::" h16
- # / [ *6( h16 ":" ) h16 ] "::"
- # ls32 = ( h16 ":" h16 ) / IPv4address
- # ; least-significant 32 bits of address
- # h16 = 1*4HEXDIG
- # ; 16 bits of address represented in hexadecimal
- # IPv4address = dec-octet "." dec-octet "." dec-octet "." dec-octet
- # dec-octet = DIGIT ; 0-9
- # / %x31-39 DIGIT ; 10-99
- # / "1" 2DIGIT ; 100-199
- # / "2" %x30-34 DIGIT ; 200-249
- # / "25" %x30-35 ; 250-255
- # reg-name = *( unreserved / pct-encoded / sub-delims )
- # port = *DIGIT
- # path = path-abempty ; begins with "/" or is empty
- # / path-absolute ; begins with "/" but not "//"
- # / path-noscheme ; begins with a non-colon segment
- # / path-rootless ; begins with a segment
- # / path-empty ; zero characters
- # path-abempty = *( "/" segment )
- # path-absolute = "/" [ segment-nz *( "/" segment ) ]
- # path-noscheme = segment-nz-nc *( "/" segment )
- # path-rootless = segment-nz *( "/" segment )
- # path-empty = 0<pchar>
- # segment = *pchar
- # segment-nz = 1*pchar
- # segment-nz-nc = 1*( unreserved / pct-encoded / sub-delims / "@" )
- # ; non-zero-length segment without any colon ":"
- # pchar = unreserved / pct-encoded / sub-delims / ":" / "@"
- # query = *( pchar / "/" / "?" )
- # fragment = *( pchar / "/" / "?" )
- # URI-reference = URI / relative-ref
- # relative-ref = relative-part [ "?" query ] [ "#" fragment ]
- # relative-part = "//" authority path-abempty
- # / path-absolute
- # / path-noscheme
- # / path-empty
- # absolute-URI = scheme ":" hier-part [ "?" query ]
- __slots__ = ("_cache", "_scheme", "_netloc", "_path", "_query", "_fragment")
- _cache: _InternalURLCache
- _scheme: str
- _netloc: str
- _path: str
- _query: str
- _fragment: str
def __new__(
    cls,
    val: Union[str, SplitResult, "URL", UndefinedType] = UNDEFINED,
    *,
    encoded: bool = False,
    strict: bool | None = None,
) -> "URL":
    """Create a URL from a string, a ``SplitResult`` or another ``URL``.

    ``encoded=True`` means *val* is already encoded and is stored as-is.
    ``strict`` is ignored; passing it only triggers a warning.

    Raises:
        ValueError: a ``SplitResult`` was given without ``encoded=True``.
        TypeError: *val* is of an unsupported type.
    """
    if strict is not None:  # pragma: no cover
        warnings.warn("strict parameter is ignored")

    val_type = type(val)
    if val_type is str:
        # Exact ``str``: hand straight to the cached parsers.
        if encoded:
            return pre_encoded_url(val)
        return encode_url(val)
    if val_type is cls:
        return val
    if val_type is SplitResult:
        if encoded:
            return from_parts(*val)
        raise ValueError("Cannot apply decoding to SplitResult")
    if isinstance(val, str):
        # ``str`` subclass: convert to a plain ``str`` first.
        plain = str(val)
        if encoded:
            return pre_encoded_url(plain)
        return encode_url(plain)
    if val is UNDEFINED:
        # No value may mean unpickling is in progress.  ``__setstate__``
        # will then mutate the new object, so it must not be one that
        # lives in the ``pre_encoded_url`` / ``encode_url`` caches.
        url = object.__new__(URL)
        url._scheme = url._netloc = url._path = url._query = url._fragment = ""
        url._cache = {}
        return url
    raise TypeError("Constructor parameter should be str")
@classmethod
def build(
    cls,
    *,
    scheme: str = "",
    authority: str = "",
    user: str | None = None,
    password: str | None = None,
    host: str = "",
    port: int | None = None,
    path: str = "",
    query: Query | None = None,
    query_string: str = "",
    fragment: str = "",
    encoded: bool = False,
) -> "URL":
    """Create a new URL from individual components.

    ``authority`` is mutually exclusive with ``user``/``password``/
    ``host``/``port``, and ``query`` with ``query_string``. With
    ``encoded=True`` the components are used as given; otherwise host,
    path, query string and fragment are quoted.

    Raises:
        ValueError: conflicting arguments, a port without a host, or a
            path that does not start with ``/`` while an authority is set.
        TypeError: ``port`` is not an int, or one of the string arguments
            is ``None``.
    """
    if authority and (user or password or host or port):
        raise ValueError(
            'Can\'t mix "authority" with "user", "password", "host" or "port".'
        )
    if port is not None and not isinstance(port, int):
        raise TypeError(f"The port is required to be int, got {type(port)!r}.")
    if port and not host:
        raise ValueError('Can\'t build URL with "port" but without "host".')
    if query and query_string:
        raise ValueError('Only one of "query" or "query_string" should be passed')
    required = (scheme, authority, host, path, query_string, fragment)
    if any(arg is None for arg in required):
        raise TypeError(
            'NoneType is illegal for "scheme", "authority", "host", "path", '
            '"query_string", and "fragment" args, use empty string instead.'
        )

    if query:
        query_string = get_str_query(query) or ""

    if encoded:
        return build_pre_encoded_url(
            scheme,
            authority,
            user,
            password,
            host,
            port,
            path,
            query_string,
            fragment,
        )

    # Work out the encoded host; ``None`` means "no authority at all".
    encoded_host: str | None = None
    if authority:
        user, password, encoded_host, port = split_netloc(authority)
        if encoded_host:
            encoded_host = _encode_host(encoded_host, validate_host=False)
        else:
            encoded_host = ""
    elif host:
        encoded_host = _encode_host(host, validate_host=True)

    if encoded_host is None:
        netloc = ""
    else:
        if port is not None and port == DEFAULT_PORTS.get(scheme):
            # A default port is not rendered.
            port = None
        if user is None and password is None:
            netloc = encoded_host if port is None else f"{encoded_host}:{port}"
        else:
            netloc = make_netloc(user, password, encoded_host, port, True)

    if path:
        path = PATH_QUOTER(path)
    if path and netloc:
        if "." in path:
            path = normalize_path(path)
        if path[0] != "/":
            raise ValueError(
                "Path in a URL with authority should "
                "start with a slash ('/') if set"
            )

    if query_string and not query:
        # A ``query`` mapping was already turned into an encoded string.
        query_string = QUERY_QUOTER(query_string)

    url = object.__new__(URL)
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query_string
    url._fragment = FRAGMENT_QUOTER(fragment) if fragment else fragment
    url._cache = {}
    return url
def __init_subclass__(cls) -> NoReturn:
    """Reject any attempt to subclass ``URL`` by raising ``TypeError``."""
    message = f"Inheriting a class {cls!r} from URL is forbidden"
    raise TypeError(message)
def __str__(self) -> str:
    """Render the URL as a string.

    An empty path becomes ``/`` when there is an authority followed by a
    query or fragment, and an explicit port equal to the scheme's default
    is left out.
    """
    path = self._path
    if not path and self._netloc and (self._query or self._fragment):
        path = "/"

    port = self.explicit_port
    if port is None or port != DEFAULT_PORTS.get(self._scheme):
        netloc = self._netloc
    else:
        # Port normalization: pass None so the default port is not rendered.
        # https://datatracker.ietf.org/doc/html/rfc3986.html#section-6.2.3
        host = self.host_subcomponent
        netloc = make_netloc(self.raw_user, self.raw_password, host, None)

    return unsplit_result(self._scheme, netloc, path, self._query, self._fragment)
def __repr__(self) -> str:
    """Return ``ClassName('<url string>')``."""
    return f"{type(self).__name__}('{self!s}')"
def __bytes__(self) -> bytes:
    """Return the string form of the URL encoded as ASCII bytes."""
    text = str(self)
    return text.encode("ascii")
def __eq__(self, other: object) -> bool:
    """Compare two URLs component by component.

    An empty path next to a non-empty netloc is treated as ``/``.  Returns
    ``NotImplemented`` when *other* is not exactly a ``URL``.
    """
    if type(other) is not URL:
        return NotImplemented
    if self._scheme != other._scheme or self._netloc != other._netloc:
        return False
    own_path = self._path if self._path or not self._netloc else "/"
    other_path = other._path if other._path or not other._netloc else "/"
    if own_path != other_path:
        return False
    return self._query == other._query and self._fragment == other._fragment
def __hash__(self) -> int:
    """Return the hash of the five components, memoized in ``_cache``.

    The path is normalized the same way ``__eq__`` does it, so equal URLs
    hash equally.
    """
    cached = self._cache.get("hash")
    if cached is not None:
        return cached
    path = self._path if self._path or not self._netloc else "/"
    value = hash((self._scheme, self._netloc, path, self._query, self._fragment))
    self._cache["hash"] = value
    return value
def __le__(self, other: object) -> bool:
    """Order by the ``_val`` tuples; only exact ``URL`` operands compare."""
    if type(other) is URL:
        return self._val <= other._val
    return NotImplemented
def __lt__(self, other: object) -> bool:
    """Order by the ``_val`` tuples; only exact ``URL`` operands compare."""
    if type(other) is URL:
        return self._val < other._val
    return NotImplemented
def __ge__(self, other: object) -> bool:
    """Order by the ``_val`` tuples; only exact ``URL`` operands compare."""
    if type(other) is URL:
        return self._val >= other._val
    return NotImplemented
def __gt__(self, other: object) -> bool:
    """Order by the ``_val`` tuples; only exact ``URL`` operands compare."""
    if type(other) is URL:
        return self._val > other._val
    return NotImplemented
def __truediv__(self, name: str) -> "URL":
    """Implement ``url / "segment"`` by appending *name* as a child path.

    Non-string operands yield ``NotImplemented``.
    """
    if isinstance(name, str):
        return self._make_child((str(name),))
    return NotImplemented  # type: ignore[unreachable]
def __mod__(self, query: Query) -> "URL":
    """Implement ``url % query`` as ``url.update_query(query)``."""
    updated = self.update_query(query)
    return updated
def __bool__(self) -> bool:
    """True when netloc, path, query or fragment is non-empty.

    The scheme is not taken into account.
    """
    return any((self._netloc, self._path, self._query, self._fragment))
def __getstate__(self) -> tuple[SplitResult]:
    """Return the pickle state: a 1-tuple holding a ``SplitResult``."""
    split = tuple.__new__(SplitResult, self._val)
    return (split,)
def __setstate__(
    self, state: tuple[SplitURLType] | tuple[None, _InternalURLCache]
) -> None:
    """Restore the five components from pickle *state* and reset the cache.

    Two layouts are accepted: ``(None, {"_val": ...})`` (default style
    pickle) and a tuple whose first item is the component tuple.
    """
    if state[0] is None and isinstance(state[1], dict):
        # default style pickle
        components = state[1]["_val"]
    else:
        _extra: list[object]
        components, *_extra = state
    (
        self._scheme,
        self._netloc,
        self._path,
        self._query,
        self._fragment,
    ) = components
    self._cache = {}
def _cache_netloc(self) -> None:
    """Split the netloc once and store all four parts in ``_cache``."""
    raw_user, raw_password, raw_host, explicit_port = split_netloc(self._netloc)
    cache = self._cache
    cache["raw_user"] = raw_user
    cache["raw_password"] = raw_password
    cache["raw_host"] = raw_host
    cache["explicit_port"] = explicit_port
def is_absolute(self) -> bool:
    """Tell whether the URL is absolute.

    Returns the value of the ``absolute`` property.  It is preferred to
    read that property directly, as it is cached.
    """
    result = self.absolute
    return result
def is_default_port(self) -> bool:
    """Tell whether the URL uses the default port of its scheme.

    True for e.g. 'http://python.org' or 'http://python.org:80'.
    False for relative URLs.
    """
    explicit = self.explicit_port
    if explicit is not None:
        return explicit == DEFAULT_PORTS.get(self._scheme)
    # Without an explicit port the default port is in use, unless the URL
    # is relative and therefore has no implicit port at all.
    return self._netloc != ""
def origin(self) -> "URL":
    """Return a URL made of scheme, host and port only.

    User, password, path, query and fragment are dropped.  The work is
    done by the cached ``_origin`` property.
    """
    # TODO: add a keyword-only option for keeping user/pass maybe?
    origin_url = self._origin
    return origin_url
@cached_property
def _val(self) -> SplitURLType:
    """The five URL components as one tuple (scheme first, fragment last)."""
    components = (
        self._scheme,
        self._netloc,
        self._path,
        self._query,
        self._fragment,
    )
    return components
@cached_property
def _origin(self) -> "URL":
    """Return a URL made of scheme, host and port only.

    User, password, path, query and fragment are dropped.  ``self`` is
    returned when it already has none of those.

    Raises:
        ValueError: the URL has no netloc or no scheme.
    """
    netloc = self._netloc
    if not netloc:
        raise ValueError("URL should be absolute")
    scheme = self._scheme
    if not scheme:
        raise ValueError("URL should have scheme")
    if "@" in netloc:
        # User info present: rebuild the netloc from host and port alone.
        bare_host = self.host_subcomponent
        netloc = make_netloc(None, None, bare_host, self.explicit_port)
    elif not (self._path or self._query or self._fragment):
        return self
    return from_parts(scheme, netloc, "", "", "")
def relative(self) -> "URL":
    """Return the URL with scheme, user, password, host and port removed.

    Raises:
        ValueError: the URL has no netloc.
    """
    if self._netloc:
        return from_parts("", "", self._path, self._query, self._fragment)
    raise ValueError("URL should be absolute")
@cached_property
def absolute(self) -> bool:
    """Whether the URL is absolute.

    True for absolute URLs (having scheme or starting with //),
    False otherwise.
    """
    # The netloc is an empty string for relative URLs.  Testing it is
    # cheaper than going through the host properties, which have to parse
    # the netloc first.
    has_netloc = self._netloc != ""
    return has_netloc
@cached_property
def scheme(self) -> str:
    """The URL scheme.

    Empty string for relative URLs or URLs starting with //.
    """
    value = self._scheme
    return value
@cached_property
def raw_authority(self) -> str:
    """The authority part in encoded form (the stored netloc).

    Empty string for relative URLs.
    """
    netloc = self._netloc
    return netloc
@cached_property
def authority(self) -> str:
    """The authority part in decoded form.

    Built from the decoded ``user``, ``password`` and ``host`` plus
    ``port``.  Empty string for relative URLs.
    """
    user, password = self.user, self.password
    host, port = self.host, self.port
    return make_netloc(user, password, host, port)
@cached_property
def raw_user(self) -> str | None:
    """The user part in encoded form, or None if it is missing."""
    # not .username
    self._cache_netloc()
    cache = self._cache
    return cache["raw_user"]
@cached_property
def user(self) -> str | None:
    """The user part in decoded form, or None if it is missing."""
    encoded = self.raw_user
    return None if encoded is None else UNQUOTER(encoded)
@cached_property
def raw_password(self) -> str | None:
    """The password part in encoded form, or None if it is missing."""
    self._cache_netloc()
    cache = self._cache
    return cache["raw_password"]
@cached_property
def password(self) -> str | None:
    """The password part in decoded form, or None if it is missing."""
    encoded = self.raw_password
    return None if encoded is None else UNQUOTER(encoded)
@cached_property
def raw_host(self) -> str | None:
    """The host part in encoded form.

    None for relative URLs.

    For IPv6 addresses prefer ``host_subcomponent``, which returns the
    host with its surrounding brackets.
    """
    # Use host instead of hostname for sake of shortness
    # May add .hostname prop later
    self._cache_netloc()
    cache = self._cache
    return cache["raw_host"]
@cached_property
def host(self) -> str | None:
    """The host part in decoded form.

    None for relative URLs.
    """
    encoded = self.raw_host
    if encoded is None:
        return None
    # A host ending in a digit or containing a colon is handled as an IP
    # address; IP addresses are never IDNA encoded.
    if encoded[-1:].isdigit() or ":" in encoded:
        return encoded
    return _idna_decode(encoded)
@cached_property
def host_subcomponent(self) -> str | None:
    """The host subcomponent of the URL, bracketed when it holds a colon.

    None for relative URLs.

    https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2

    `IP-literal = "[" ( IPv6address / IPvFuture  ) "]"`

    Examples:
    - `http://example.com:8080` -> `example.com`
    - `http://example.com:80` -> `example.com`
    - `https://127.0.0.1:8443` -> `127.0.0.1`
    - `https://[::1]:8443` -> `[::1]`
    - `http://[::1]` -> `[::1]`
    """
    encoded = self.raw_host
    if encoded is None:
        return None
    if ":" in encoded:
        return f"[{encoded}]"
    return encoded
@cached_property
def host_port_subcomponent(self) -> str | None:
    """Return the host and port subcomponent part of URL.

    Trailing dots are removed from the host part.

    This value is suitable for use in the Host header of an HTTP request.

    None for relative URLs.

    https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2

    `IP-literal = "[" ( IPv6address / IPvFuture  ) "]"`

    https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.3

    port = *DIGIT

    Examples:
    - `http://example.com:8080` -> `example.com:8080`
    - `http://example.com:80` -> `example.com`
    - `http://example.com.:80` -> `example.com`
    - `https://127.0.0.1:8443` -> `127.0.0.1:8443`
    - `https://[::1]:8443` -> `[::1]:8443`
    - `http://[::1]` -> `[::1]`
    """
    if (raw := self.raw_host) is None:
        return None
    # The cached raw host may be an empty string (``encode_url`` falls back
    # to "" for a host-less authority when the scheme does not require a
    # host), so ``raw[-1]`` could raise IndexError.  ``str.endswith`` is
    # safe on an empty string.
    if raw.endswith("."):
        # Remove all trailing dots from the netloc as while
        # they are valid FQDNs in DNS, TLS validation fails.
        # See https://github.com/aio-libs/aiohttp/issues/3636.
        # To avoid string manipulation we only call rstrip if
        # the last character is a dot.
        raw = raw.rstrip(".")
    port = self.explicit_port
    if port is None or port == DEFAULT_PORTS.get(self._scheme):
        return f"[{raw}]" if ":" in raw else raw
    return f"[{raw}]:{port}" if ":" in raw else f"{raw}:{port}"
@cached_property
def port(self) -> int | None:
    """The port, falling back to the scheme's default port.

    None for relative URLs, and for URLs without an explicit port whose
    scheme has no entry in ``DEFAULT_PORTS``.
    """
    explicit = self.explicit_port
    if explicit is None:
        return DEFAULT_PORTS.get(self._scheme)
    return explicit
@cached_property
def explicit_port(self) -> int | None:
    """The port as written in the URL, without scheme-based fallback.

    None for relative URLs or URLs without explicit port.
    """
    self._cache_netloc()
    cache = self._cache
    return cache["explicit_port"]
@cached_property
def raw_path(self) -> str:
    """The path in encoded form.

    ``/`` for absolute URLs without path part.
    """
    if not self._path and self._netloc:
        return "/"
    return self._path
@cached_property
def path(self) -> str:
    """The path in decoded form.

    ``/`` for absolute URLs without path part.
    """
    if self._path:
        return PATH_UNQUOTER(self._path)
    if self._netloc:
        return "/"
    return ""
@cached_property
def path_safe(self) -> str:
    """The path in decoded form, keeping ``/`` and ``%`` encoded.

    ``/`` for absolute URLs without path part.

    / (%2F) and % (%25) are not decoded
    """
    raw = self._path
    if not raw:
        return "/" if self._netloc else ""
    return PATH_SAFE_UNQUOTER(raw)
@cached_property
def _parsed_query(self) -> list[tuple[str, str]]:
    """The query string turned into pairs by ``query_to_pairs``."""
    pairs = query_to_pairs(self._query)
    return pairs
@cached_property
def query(self) -> "MultiDictProxy[str]":
    """The parsed query parameters, decoded, as a ``MultiDictProxy``.

    Empty value if URL has no query part.
    """
    params = MultiDict(self._parsed_query)
    return MultiDictProxy(params)
@cached_property
def raw_query_string(self) -> str:
    """The query part in encoded form.

    Empty string if query is missing.
    """
    encoded = self._query
    return encoded
@cached_property
def query_string(self) -> str:
    """The query part in decoded form.

    Empty string if query is missing.
    """
    if not self._query:
        return ""
    return QS_UNQUOTER(self._query)
@cached_property
def path_qs(self) -> str:
    """The decoded path, followed by ``?`` and the decoded query if any."""
    decoded_query = self.query_string
    decoded_path = self.path
    if decoded_query:
        return f"{decoded_path}?{decoded_query}"
    return decoded_path
@cached_property
def raw_path_qs(self) -> str:
    """The encoded path, followed by ``?`` and the encoded query if any.

    An empty path next to a non-empty netloc is rendered as ``/``.
    """
    encoded_path = self._path if self._path or not self._netloc else "/"
    encoded_query = self._query
    if encoded_query:
        return f"{encoded_path}?{encoded_query}"
    return encoded_path
@cached_property
def raw_fragment(self) -> str:
    """The fragment part in encoded form.

    Empty string if fragment is missing.
    """
    encoded = self._fragment
    return encoded
@cached_property
def fragment(self) -> str:
    """The fragment part in decoded form.

    Empty string if fragment is missing.
    """
    if not self._fragment:
        return ""
    return UNQUOTER(self._fragment)
@cached_property
def raw_parts(self) -> tuple[str, ...]:
    """The encoded *path* split into a tuple of parts.

    ('/',) for absolute URLs if *path* is missing.  A path that is rooted
    (the URL has a netloc, or the path starts with a slash) gets ``"/"``
    as its first part.
    """
    path = self._path
    has_netloc = bool(self._netloc)
    if has_netloc and not path:
        return ("/",)
    if has_netloc or path[:1] == "/":
        # Drop the first character and split the remainder.
        return ("/", *path[1:].split("/"))
    return tuple(path.split("/"))
@cached_property
def parts(self) -> tuple[str, ...]:
    """The decoded *path* split into a tuple of parts.

    ('/',) for absolute URLs if *path* is missing.
    """
    return tuple(map(UNQUOTER, self.raw_parts))
@cached_property
def parent(self) -> "URL":
    """A URL with the last path part, the query and the fragment removed.

    When the path is empty or ``/`` only the query and fragment are
    stripped, and ``self`` is returned if there is nothing to strip.
    """
    path = self._path
    if path and path != "/":
        # Keep everything before the last slash ("" when there is none).
        head = path.rpartition("/")[0]
        return from_parts(self._scheme, self._netloc, head, "", "")
    if self._fragment or self._query:
        return from_parts(self._scheme, self._netloc, path, "", "")
    return self
@cached_property
def raw_name(self) -> str:
    """The last item of ``raw_parts``.

    For URLs with a netloc the first item of ``raw_parts`` does not count;
    the result is an empty string when nothing follows it.
    """
    segments = self.raw_parts
    if not self._netloc:
        return segments[-1]
    return segments[-1] if len(segments) > 1 else ""
@cached_property
def name(self) -> str:
    """Last path segment with percent-escapes decoded (unquoted ``raw_name``)."""
    encoded_name = self.raw_name
    return UNQUOTER(encoded_name)
@cached_property
def raw_suffix(self) -> str:
    """Encoded file extension of ``raw_name``, including the leading dot.

    Empty when the name has no dot, starts with its only dot, or ends
    with a dot.
    """
    stem, dot, extension = self.raw_name.rpartition(".")
    # All three pieces must be present: "a.txt" qualifies, ".rc" and "a." do not.
    if dot and stem and extension:
        return dot + extension
    return ""
@cached_property
def suffix(self) -> str:
    """File extension of the name with percent-escapes decoded."""
    encoded_suffix = self.raw_suffix
    return UNQUOTER(encoded_suffix)
@cached_property
def raw_suffixes(self) -> tuple[str, ...]:
    """All encoded extensions of ``raw_name``, each with its leading dot.

    A name ending in a dot has no suffixes; leading dots are ignored.
    """
    last_segment = self.raw_name
    if last_segment[-1:] == ".":
        return ()
    # The first piece is the stem; every later piece is an extension.
    _stem, *extensions = last_segment.lstrip(".").split(".")
    return tuple(f".{extension}" for extension in extensions)
@cached_property
def suffixes(self) -> tuple[str, ...]:
    """All extensions of the name, each unquoted individually."""
    decoded = []
    for raw_extension in self.raw_suffixes:
        decoded.append(UNQUOTER(raw_extension))
    return tuple(decoded)
def _make_child(self, paths: "Sequence[str]", encoded: bool = False) -> "URL":
    """Build a URL whose path is the current path extended by *paths*.

    Existing empty segments are preserved but no new ones are created,
    apart from a trailing empty segment on the final element of *paths*.
    Query and fragment are dropped.  Each element is quoted unless
    *encoded* is true.

    Raises ValueError if any element of *paths* starts with a slash.
    """
    # Segments are collected back-to-front and flipped once at the end.
    collected: list[str] = []
    has_dot = False
    for position, piece in enumerate(reversed(paths)):
        if piece and piece[0] == "/":
            raise ValueError(
                f"Appending path {piece!r} starting from slash is forbidden"
            )
        # Quote each new piece on its own: the existing path is already
        # quoted, so quoting the combined result would double-quote it.
        if not encoded:
            piece = PATH_QUOTER(piece)
        if "." in piece:
            has_dot = True
        chunk = piece.split("/")
        chunk.reverse()
        # position 0 is the final element of *paths*; only it may keep a
        # trailing empty segment (which sits first after the reversal).
        if position != 0 and chunk[0] == "":
            del chunk[0]
        collected.extend(chunk)

    base_path = self._path
    if base_path:
        base_segments = base_path.split("/")
        # A base path ending in "/" leaves a trailing empty segment that
        # must go before the new segments are attached.
        if base_segments[-1] == "":
            base_segments.pop()
        base_segments.reverse()
        collected.extend(base_segments)

    netloc = self._netloc
    # With a network location the path has to be rooted: make sure the
    # (reversed) list ends with the empty segment that yields the leading "/".
    if netloc and collected and collected[-1] != "":
        collected.append("")
    collected.reverse()

    if netloc and has_dot:
        new_path = "/".join(normalize_path_segments(collected))
        # Normalization may have eaten the leading slash; restore it.
        if new_path and new_path[0] != "/":
            new_path = f"/{new_path}"
    else:
        new_path = "/".join(collected)
    return from_parts(self._scheme, netloc, new_path, "", "")
def with_scheme(self, scheme: str) -> "URL":
    """Return a new URL with the scheme replaced (lower-cased).

    Raises TypeError if *scheme* is not a str, and ValueError when a URL
    without a network location is given a scheme that requires a host.
    """
    # N.B. doesn't cleanup query/fragment
    if not isinstance(scheme, str):
        raise TypeError("Invalid scheme type")
    new_scheme = scheme.lower()
    authority = self._netloc
    if not authority and new_scheme in SCHEME_REQUIRES_HOST:
        raise ValueError(
            "scheme replacement is not allowed for "
            f"relative URLs for the {new_scheme} scheme"
        )
    return from_parts(new_scheme, authority, self._path, self._query, self._fragment)
- def with_user(self, user: str | None) -> "URL":
- """Return a new URL with user replaced.
- Autoencode user if needed.
- Clear user/password if user is None.
- """
- # N.B. doesn't cleanup query/fragment
- if user is None:
- password = None
- elif isinstance(user, str):
- user = QUOTER(user)
- password = self.raw_password
- else:
- raise TypeError("Invalid user type")
- if not (netloc := self._netloc):
- raise ValueError("user replacement is not allowed for relative URLs")
- encoded_host = self.host_subcomponent or ""
- netloc = make_netloc(user, password, encoded_host, self.explicit_port)
- return from_parts(self._scheme, netloc, self._path, self._query, self._fragment)
- def with_password(self, password: str | None) -> "URL":
- """Return a new URL with password replaced.
- Autoencode password if needed.
- Clear password if argument is None.
- """
- # N.B. doesn't cleanup query/fragment
- if password is None:
- pass
- elif isinstance(password, str):
- password = QUOTER(password)
- else:
- raise TypeError("Invalid password type")
- if not (netloc := self._netloc):
- raise ValueError("password replacement is not allowed for relative URLs")
- encoded_host = self.host_subcomponent or ""
- port = self.explicit_port
- netloc = make_netloc(self.raw_user, password, encoded_host, port)
- return from_parts(self._scheme, netloc, self._path, self._query, self._fragment)
def with_host(self, host: str) -> "URL":
    """Return a new URL with the host replaced.

    The host is encoded and validated.  Changing the host of a relative
    URL is not allowed; use .join() instead.

    Raises TypeError for a non-str host and ValueError for a relative
    URL or an empty host.
    """
    # N.B. doesn't cleanup query/fragment
    if not isinstance(host, str):
        raise TypeError("Invalid host type")
    if not self._netloc:
        raise ValueError("host replacement is not allowed for relative URLs")
    if not host:
        raise ValueError("host removing is not allowed")
    # host is known to be non-empty here, so it is always encoded.
    encoded_host = _encode_host(host, validate_host=True)
    new_netloc = make_netloc(
        self.raw_user, self.raw_password, encoded_host, self.explicit_port
    )
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )
- def with_port(self, port: int | None) -> "URL":
- """Return a new URL with port replaced.
- Clear port to default if None is passed.
- """
- # N.B. doesn't cleanup query/fragment
- if port is not None:
- if isinstance(port, bool) or not isinstance(port, int):
- raise TypeError(f"port should be int or None, got {type(port)}")
- if not (0 <= port <= 65535):
- raise ValueError(f"port must be between 0 and 65535, got {port}")
- if not (netloc := self._netloc):
- raise ValueError("port replacement is not allowed for relative URLs")
- encoded_host = self.host_subcomponent or ""
- netloc = make_netloc(self.raw_user, self.raw_password, encoded_host, port)
- return from_parts(self._scheme, netloc, self._path, self._query, self._fragment)
def with_path(
    self,
    path: str,
    *,
    encoded: bool = False,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with the path replaced.

    Unless *encoded* is true the path is quoted and, for URLs with a
    network location, normalized when it contains a dot.  A non-empty
    path always gets a leading slash.  Query and fragment are dropped
    unless *keep_query* / *keep_fragment* are set.
    """
    authority = self._netloc
    if not encoded:
        path = PATH_QUOTER(path)
        if authority and "." in path:
            path = normalize_path(path)
    if path and path[0] != "/":
        path = f"/{path}"
    return from_parts(
        self._scheme,
        authority,
        path,
        self._query if keep_query else "",
        self._fragment if keep_fragment else "",
    )
@overload
def with_query(self, query: Query) -> "URL": ...


@overload
def with_query(self, **kwargs: QueryVariable) -> "URL": ...


def with_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with the query part replaced.

    Accepts any Mapping (e.g. dict, multidict.MultiDict instances)
    or str; the argument is encoded if needed.
    A sequence of (key, value) pairs is supported as well.
    Alternatively the query can be given as keyword arguments.
    Passing None clears the query.
    """
    # N.B. doesn't cleanup query/fragment
    encoded_query = get_str_query(*args, **kwargs)
    if not encoded_query:
        # Normalize a missing query to the empty string.
        encoded_query = ""
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, encoded_query, self._fragment
    )
@overload
def extend_query(self, query: Query) -> "URL": ...


@overload
def extend_query(self, **kwargs: QueryVariable) -> "URL": ...


def extend_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with the given query appended to the existing one.

    Existing query parameters are never removed or replaced.  When the
    supplied query is empty the URL itself is returned.

    Example:
    >>> url = URL('http://example.com/?a=1&b=2')
    >>> url.extend_query(a=3, c=4)
    URL('http://example.com/?a=1&b=2&a=3&c=4')
    """
    addition = get_str_query(*args, **kwargs)
    if not addition:
        return self
    existing = self._query
    if not existing:
        combined = addition
    elif existing[-1] == "&":
        # Both sides are already encoded, so plain concatenation is enough;
        # the existing query already supplies the separator.
        combined = existing + addition
    else:
        combined = f"{existing}&{addition}"
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, combined, self._fragment
    )
@overload
def update_query(self, query: Query) -> "URL": ...


@overload
def update_query(self, **kwargs: QueryVariable) -> "URL": ...


def update_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with the query part updated.

    Parameters present in the supplied query overwrite existing ones;
    other existing parameters are kept.  None clears the query, and an
    empty (falsy) query leaves it unchanged.

    Raises ValueError unless exactly one positional query or only
    keyword arguments are given, and TypeError for unsupported query
    types (including bytes, bytearray and memoryview).

    Example:
    >>> url = URL('http://example.com/?a=1&b=2')
    >>> url.update_query(a=3, c=4)
    URL('http://example.com/?a=3&b=2&c=4')
    """
    in_query: (
        str
        | Mapping[str, QueryVariable]
        | Sequence[tuple[str | istr, SimpleQuery]]
        | None
    )
    if kwargs and args:
        raise ValueError("Either kwargs or single query parameter must be present")
    if kwargs:
        in_query = kwargs
    elif len(args) == 1:
        (in_query,) = args
    else:
        raise ValueError("Either kwargs or single query parameter must be present")

    if in_query is None:
        new_query = ""
    elif not in_query:
        new_query = self._query
    elif isinstance(in_query, Mapping):
        from_mapping: MultiDict[QueryVariable] = MultiDict(self._parsed_query)
        from_mapping.update(in_query)
        new_query = get_str_query_from_sequence_iterable(from_mapping.items())
    elif isinstance(in_query, str):
        from_string: MultiDict[str] = MultiDict(self._parsed_query)
        from_string.update(query_to_pairs(in_query))
        new_query = get_str_query_from_iterable(from_string.items())
    elif isinstance(in_query, (bytes, bytearray, memoryview)):
        raise TypeError(
            "Invalid query type: bytes, bytearray and memoryview are forbidden"
        )
    elif isinstance(in_query, Sequence):
        # A list of pairs is not expected to carry sequence values; only
        # mappings such as the builtin `dict`, which cannot map one key to
        # several values, go through the sequence-aware path above.
        if TYPE_CHECKING:
            in_query = cast(
                Sequence[tuple[Union[str, istr], SimpleQuery]], in_query
            )
        from_pairs: MultiDict[SimpleQuery] = MultiDict(self._parsed_query)
        from_pairs.update(in_query)
        new_query = get_str_query_from_iterable(from_pairs.items())
    else:
        raise TypeError(
            "Invalid query type: only str, mapping or "
            "sequence of (key, value) pairs is allowed"
        )
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, new_query, self._fragment
    )
def without_query_params(self, *query_params: str) -> "URL":
    """Return a URL whose query lacks the given keys.

    The URL itself is returned when none of the keys is present.
    """
    current = self.query
    doomed = set(query_params) & current.keys()
    if not doomed:
        return self
    survivors = tuple(
        (key, value) for key, value in current.items() if key not in doomed
    )
    return self.with_query(survivors)
- def with_fragment(self, fragment: str | None) -> "URL":
- """Return a new URL with fragment replaced.
- Autoencode fragment if needed.
- Clear fragment to default if None is passed.
- """
- # N.B. doesn't cleanup query/fragment
- if fragment is None:
- raw_fragment = ""
- elif not isinstance(fragment, str):
- raise TypeError("Invalid fragment type")
- else:
- raw_fragment = FRAGMENT_QUOTER(fragment)
- if self._fragment == raw_fragment:
- return self
- return from_parts(
- self._scheme, self._netloc, self._path, self._query, raw_fragment
- )
def with_name(
    self,
    name: str,
    *,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with the name (last path segment) replaced.

    The name is quoted if needed.  Query and fragment are dropped unless
    *keep_query* / *keep_fragment* are set.

    Raises TypeError for a non-str name and ValueError if the name
    contains a slash or is ``.`` / ``..``.
    """
    # N.B. DOES cleanup query/fragment
    if not isinstance(name, str):
        raise TypeError("Invalid name type")
    if "/" in name:
        raise ValueError("Slash in name is not allowed")
    quoted_name = PATH_QUOTER(name)
    if quoted_name in (".", ".."):
        raise ValueError(". and .. values are forbidden")

    segments = list(self.raw_parts)
    authority = self._netloc
    if authority and len(segments) == 1:
        # Only the root marker is present: the name becomes a new segment.
        segments.append(quoted_name)
    else:
        segments[-1] = quoted_name
    if authority or segments[0] == "/":
        # Blank the root marker so the join below produces a single leading "/".
        segments[0] = ""

    return from_parts(
        self._scheme,
        authority,
        "/".join(segments),
        self._query if keep_query else "",
        self._fragment if keep_fragment else "",
    )
def with_suffix(
    self,
    suffix: str,
    *,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with the suffix (file extension of the name) replaced.

    The suffix is quoted if needed; an empty suffix removes the current
    one.  Query and fragment are dropped unless *keep_query* /
    *keep_fragment* are set.

    Raises TypeError for a non-str suffix and ValueError if the suffix
    is malformed, the URL has an empty name, or the resulting name is
    ``.`` / ``..``.
    """
    if not isinstance(suffix, str):
        raise TypeError("Invalid suffix type")
    # A non-empty suffix must start with a dot, be more than the dot
    # alone, and must not contain a path separator.
    if (suffix and suffix[0] != ".") or suffix == "." or "/" in suffix:
        raise ValueError(f"Invalid suffix {suffix!r}")
    current_name = self.raw_name
    if not current_name:
        raise ValueError(f"{self!r} has an empty name")
    current_suffix = self.raw_suffix
    quoted_suffix = PATH_QUOTER(suffix)
    if current_suffix:
        stem = current_name[: -len(current_suffix)]
    else:
        stem = current_name
    new_name = stem + quoted_suffix
    if new_name in (".", ".."):
        raise ValueError(". and .. values are forbidden")

    segments = list(self.raw_parts)
    authority = self._netloc
    if authority and len(segments) == 1:
        segments.append(new_name)
    else:
        segments[-1] = new_name
    if authority or segments[0] == "/":
        # Blank the root marker so the join below produces a single leading "/".
        segments[0] = ""

    return from_parts(
        self._scheme,
        authority,
        "/".join(segments),
        self._query if keep_query else "",
        self._fragment if keep_fragment else "",
    )
def join(self, url: "URL") -> "URL":
    """Join URLs

    Construct a full ("absolute") URL by combining a "base URL"
    (self) with another URL (url).

    Informally, this uses components of the base URL, in
    particular the addressing scheme, the network location and
    (part of) the path, to provide missing components in the
    relative URL.

    Raises TypeError if *url* is not exactly a URL instance.
    """
    if type(url) is not URL:
        raise TypeError("url should be URL")

    scheme = url._scheme or self._scheme
    if scheme != self._scheme or scheme not in USES_RELATIVE:
        return url

    # scheme is in uses_authority as uses_authority is a superset of uses_relative
    if (join_netloc := url._netloc) and scheme in USES_AUTHORITY:
        return from_parts(scheme, join_netloc, url._path, url._query, url._fragment)

    orig_path = self._path
    if join_path := url._path:
        if join_path[0] == "/":
            path = join_path
        elif not orig_path:
            path = f"/{join_path}"
        elif orig_path[-1] == "/":
            path = f"{orig_path}{join_path}"
        else:
            # Merge: keep everything of the base path up to and including
            # its last "/", then append the reference path.  This works on
            # the raw (encoded) path on purpose: going through the decoded
            # parts would put unescaped characters (e.g. a decoded %2F or
            # %20) back into what is stored as an encoded path.
            path = orig_path[: orig_path.rfind("/") + 1] + join_path
        path = normalize_path(path) if "." in path else path
    else:
        path = orig_path

    return from_parts(
        scheme,
        self._netloc,
        path,
        # The reference's query/fragment win whenever it supplies a path
        # or the respective component itself.
        url._query if join_path or url._query else self._query,
        url._fragment if join_path or url._fragment else self._fragment,
    )
def joinpath(self, *other: str, encoded: bool = False) -> "URL":
    """Return a new URL with the elements of *other* appended to the path.

    Delegates to ``_make_child``; *encoded* is passed through unchanged.
    """
    child = self._make_child(other, encoded=encoded)
    return child
def human_repr(self) -> str:
    """Return the URL as a decoded, human readable string.

    Every component is decoded and then passed through ``human_quote``
    with the set of characters that stay unsafe for that component.
    """
    userinfo_unsafe = "#/:?@[]"
    query_unsafe = "#&+;="

    user = human_quote(self.user, userinfo_unsafe)
    password = human_quote(self.password, userinfo_unsafe)
    host = self.host
    if host and ":" in host:
        # A colon in the host gets it wrapped in brackets.
        host = f"[{host}]"
    path = human_quote(self.path, "#?")
    if TYPE_CHECKING:
        assert path is not None
    pairs = (
        f"{human_quote(key, query_unsafe)}={human_quote(value, query_unsafe)}"
        for key, value in self.query.items()
    )
    query_string = "&".join(pairs)
    fragment = human_quote(self.fragment, "")
    if TYPE_CHECKING:
        assert fragment is not None
    netloc = make_netloc(user, password, host, self.explicit_port)
    return unsplit_result(self._scheme, netloc, path, query_string, fragment)
if HAS_PYDANTIC:  # pragma: no cover
    # Borrowed from https://docs.pydantic.dev/latest/concepts/types/#handling-third-party-types
    @classmethod
    def __get_pydantic_json_schema__(
        cls, core_schema: core_schema.CoreSchema, handler: GetJsonSchemaHandler
    ) -> JsonSchemaValue:
        """Describe the URL in JSON Schema as a string with the ``uri`` format."""
        return {"type": "string", "format": "uri"}

    @classmethod
    def __get_pydantic_core_schema__(
        cls, source_type: type[Self] | type[str], handler: GetCoreSchemaHandler
    ) -> core_schema.CoreSchema:
        """Build the pydantic core schema for URL fields.

        JSON input must be a string that is then passed to ``URL``;
        Python input may be a URL instance or such a string.
        Serialization goes through ``str``.
        """
        str_to_url = core_schema.chain_schema(
            [
                core_schema.str_schema(),
                core_schema.no_info_plain_validator_function(URL),
            ]
        )
        python_side = core_schema.union_schema(
            [
                # check if it's an instance first before doing any further work
                core_schema.is_instance_schema(URL),
                str_to_url,
            ]
        )
        to_string = core_schema.plain_serializer_function_ser_schema(str)
        return core_schema.json_or_python_schema(
            json_schema=str_to_url,
            python_schema=python_side,
            serialization=to_string,
        )
# Default LRU cache sizes: the first is used for the IDNA encode/decode
# caches, the second for the host-encoding cache.
_DEFAULT_IDNA_SIZE = 256
_DEFAULT_ENCODE_SIZE = 512
@lru_cache(_DEFAULT_IDNA_SIZE)
def _idna_decode(raw: str) -> str:
    """Decode an ASCII host name, preferring the ``idna`` package.

    Falls back to the built-in ``idna`` codec when the package raises
    UnicodeError.
    """
    try:
        decoded = idna.decode(raw.encode("ascii"))
    except UnicodeError:  # e.g. '::1'
        decoded = raw.encode("ascii").decode("idna")
    return decoded
@lru_cache(_DEFAULT_IDNA_SIZE)
def _idna_encode(host: str) -> str:
    """Encode a host name to ASCII, preferring the ``idna`` package (UTS 46 mode).

    Falls back to the built-in ``idna`` codec when the package raises
    UnicodeError.
    """
    try:
        encoded = idna.encode(host, uts46=True).decode("ascii")
    except UnicodeError:
        encoded = host.encode("idna").decode("ascii")
    return encoded
@lru_cache(_DEFAULT_ENCODE_SIZE)
def _encode_host(host: str, validate_host: bool) -> str:
    """Encode the host part of a URL.

    IP addresses are returned in compressed form (IPv6 in brackets, any
    ``%zone`` kept), ASCII names are lower-cased and, when
    *validate_host* is true, checked against ``NOT_REG_NAME``; other
    names are IDNA-encoded.

    Raises ValueError when validation finds a forbidden character.
    """
    # Parsing IP addresses is far slower than anything else done here,
    # so it is only attempted when the host looks like one: it ends with
    # a digit or contains a colon.
    #
    # IP addresses can look like:
    # https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2
    # - 127.0.0.1 (last character is a digit)
    # - 2001:db8::ff00:42:8329 (contains a colon)
    # - 2001:db8::ff00:42:8329%eth0 (contains a colon)
    # - [2001:db8::ff00:42:8329] (contains a colon -- brackets should
    #                             have been removed before it gets here)
    # Rare IP address formats are not supported per:
    # https://datatracker.ietf.org/doc/html/rfc3986#section-7.4
    if host and (host[-1].isdigit() or ":" in host):
        address, has_zone, zone = host.partition("%")
        try:
            parsed_ip = ip_address(address)
        except ValueError:
            # Not an IP address after all; handle it as a name below.
            pass
        else:
            compressed = parsed_ip.compressed
            if has_zone:
                compressed = f"{compressed}%{zone}"
            return f"[{compressed}]" if parsed_ip.version == 6 else compressed

    # IDNA encoding is slow, so it is only used for non-ASCII names.
    if not host.isascii():
        return _idna_encode(host)

    # Invalid characters are checked explicitly here; _idna_encode() does
    # this itself for non-ASCII host names.
    lowered = host.lower()
    if validate_host:
        invalid = NOT_REG_NAME.search(lowered)
        if invalid:
            value = invalid.group()
            pos = invalid.start()
            extra = ""
            if value == "@" or (value == ":" and "@" in lowered[pos:]):
                # this looks like an authority string
                extra = (
                    ", if the value includes a username or password, "
                    "use 'authority' instead of 'host'"
                )
            raise ValueError(
                f"Host {lowered!r} cannot contain {value!r} (at position {pos}){extra}"
            ) from None
    return lowered
@rewrite_module
def cache_clear() -> None:
    """Clear all LRU caches (IDNA encode, IDNA decode and host encoding)."""
    for cached_function in (_idna_encode, _idna_decode, _encode_host):
        cached_function.cache_clear()
@rewrite_module
def cache_info() -> CacheInfo:
    """Report cache statistics.

    The ``ip_address`` and ``host_validate`` entries report the same
    statistics as ``encode_host``: all three come from the
    host-encoding cache.
    """
    encode_host_stats = _encode_host.cache_info()
    return {
        "idna_encode": _idna_encode.cache_info(),
        "idna_decode": _idna_decode.cache_info(),
        "ip_address": encode_host_stats,
        "host_validate": encode_host_stats,
        "encode_host": encode_host_stats,
    }
@rewrite_module
def cache_configure(
    *,
    idna_encode_size: int | None = _DEFAULT_IDNA_SIZE,
    idna_decode_size: int | None = _DEFAULT_IDNA_SIZE,
    ip_address_size: int | None | UndefinedType = UNDEFINED,
    host_validate_size: int | None | UndefinedType = UNDEFINED,
    encode_host_size: int | None | UndefinedType = UNDEFINED,
) -> None:
    """Configure LRU cache sizes.

    Each size is passed to ``lru_cache`` as its maximum size, so None
    means an unbounded cache.  The caches are rebuilt, which discards
    their current contents.

    ``ip_address_size`` and ``host_validate_size`` are deprecated: they
    emit a DeprecationWarning and are folded into ``encode_host_size``.
    If any of the three is None the host-encoding cache is unbounded;
    otherwise the largest supplied value is used, or the default when
    none is supplied.
    """
    global _idna_decode, _idna_encode, _encode_host
    # ip_address_size, host_validate_size are no longer
    # used, but are kept for backwards compatibility.
    if ip_address_size is not UNDEFINED or host_validate_size is not UNDEFINED:
        warnings.warn(
            "cache_configure() no longer accepts the "
            "ip_address_size or host_validate_size arguments, "
            "they are used to set the encode_host_size instead "
            "and will be removed in the future",
            DeprecationWarning,
            stacklevel=2,
        )

    if encode_host_size is not None:
        for size in (ip_address_size, host_validate_size):
            if size is UNDEFINED:
                continue
            if size is None:
                # Unbounded wins over any number.  Stop here: carrying on
                # would compare a later int size against None and raise
                # TypeError in max().
                encode_host_size = None
                break
            if encode_host_size is UNDEFINED:
                encode_host_size = size
            else:
                if TYPE_CHECKING:
                    assert isinstance(size, int)
                    assert isinstance(encode_host_size, int)
                encode_host_size = max(size, encode_host_size)

    if encode_host_size is UNDEFINED:
        encode_host_size = _DEFAULT_ENCODE_SIZE

    # Re-wrap the undecorated functions with fresh caches of the new sizes.
    _encode_host = lru_cache(encode_host_size)(_encode_host.__wrapped__)
    _idna_decode = lru_cache(idna_decode_size)(_idna_decode.__wrapped__)
    _idna_encode = lru_cache(idna_encode_size)(_idna_encode.__wrapped__)
|