| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242 |
- import enum
- import functools
- import reprlib
- import sys
- from array import array
- from collections.abc import (
- ItemsView,
- Iterable,
- Iterator,
- KeysView,
- Mapping,
- ValuesView,
- )
- from dataclasses import dataclass
- from typing import (
- TYPE_CHECKING,
- Any,
- ClassVar,
- Generic,
- NoReturn,
- Optional,
- TypeVar,
- Union,
- cast,
- overload,
- )
- from ._abc import MDArg, MultiMapping, MutableMultiMapping, SupportsKeys
- if sys.version_info >= (3, 11):
- from typing import Self
- else:
- from typing_extensions import Self
- class istr(str):
- """Case insensitive str."""
- __is_istr__ = True
- __istr_identity__: Optional[str] = None
- _V = TypeVar("_V")
- _T = TypeVar("_T")
- _SENTINEL = enum.Enum("_SENTINEL", "sentinel")
- sentinel = _SENTINEL.sentinel
- _version = array("Q", [0])
- class _Iter(Generic[_T]):
- __slots__ = ("_size", "_iter")
- def __init__(self, size: int, iterator: Iterator[_T]):
- self._size = size
- self._iter = iterator
- def __iter__(self) -> Self:
- return self
- def __next__(self) -> _T:
- return next(self._iter)
- def __length_hint__(self) -> int:
- return self._size
- class _ViewBase(Generic[_V]):
- def __init__(
- self,
- md: "MultiDict[_V]",
- ):
- self._md = md
- def __len__(self) -> int:
- return len(self._md)
- class _ItemsView(_ViewBase[_V], ItemsView[str, _V]):
- def __contains__(self, item: object) -> bool:
- if not isinstance(item, (tuple, list)) or len(item) != 2:
- return False
- key, value = item
- try:
- identity = self._md._identity(key)
- except TypeError:
- return False
- hash_ = hash(identity)
- for slot, idx, e in self._md._keys.iter_hash(hash_):
- if e.identity == identity and value == e.value:
- return True
- return False
- def __iter__(self) -> _Iter[tuple[str, _V]]:
- return _Iter(len(self), self._iter(self._md._version))
- def _iter(self, version: int) -> Iterator[tuple[str, _V]]:
- for e in self._md._keys.iter_entries():
- if version != self._md._version:
- raise RuntimeError("Dictionary changed during iteration")
- yield self._md._key(e.key), e.value
- @reprlib.recursive_repr()
- def __repr__(self) -> str:
- lst = []
- for e in self._md._keys.iter_entries():
- lst.append(f"'{e.key}': {e.value!r}")
- body = ", ".join(lst)
- return f"<{self.__class__.__name__}({body})>"
- def _parse_item(
- self, arg: Union[tuple[str, _V], _T]
- ) -> Optional[tuple[int, str, str, _V]]:
- if not isinstance(arg, tuple):
- return None
- if len(arg) != 2:
- return None
- try:
- identity = self._md._identity(arg[0])
- return (hash(identity), identity, arg[0], arg[1])
- except TypeError:
- return None
- def _tmp_set(self, it: Iterable[_T]) -> set[tuple[str, _V]]:
- tmp = set()
- for arg in it:
- item = self._parse_item(arg)
- if item is None:
- continue
- else:
- tmp.add((item[1], item[3]))
- return tmp
- def __and__(self, other: Iterable[Any]) -> set[tuple[str, _V]]:
- ret = set()
- try:
- it = iter(other)
- except TypeError:
- return NotImplemented
- for arg in it:
- item = self._parse_item(arg)
- if item is None:
- continue
- hash_, identity, key, value = item
- for slot, idx, e in self._md._keys.iter_hash(hash_):
- e.hash = -1
- if e.identity == identity and e.value == value:
- ret.add((e.key, e.value))
- self._md._keys.restore_hash(hash_)
- return ret
- def __rand__(self, other: Iterable[_T]) -> set[_T]:
- ret = set()
- try:
- it = iter(other)
- except TypeError:
- return NotImplemented
- for arg in it:
- item = self._parse_item(arg)
- if item is None:
- continue
- hash_, identity, key, value = item
- for slot, idx, e in self._md._keys.iter_hash(hash_):
- if e.identity == identity and e.value == value:
- ret.add(arg)
- break
- return ret
- def __or__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]:
- ret: set[Union[tuple[str, _V], _T]] = set(self)
- try:
- it = iter(other)
- except TypeError:
- return NotImplemented
- for arg in it:
- item: Optional[tuple[int, str, str, _V]] = self._parse_item(arg)
- if item is None:
- ret.add(arg)
- continue
- hash_, identity, key, value = item
- for slot, idx, e in self._md._keys.iter_hash(hash_):
- if e.identity == identity and e.value == value: # pragma: no branch
- break
- else:
- ret.add(arg)
- return ret
- def __ror__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]:
- try:
- ret: set[Union[tuple[str, _V], _T]] = set(other)
- except TypeError:
- return NotImplemented
- tmp = self._tmp_set(ret)
- for e in self._md._keys.iter_entries():
- if (e.identity, e.value) not in tmp:
- ret.add((e.key, e.value))
- return ret
- def __sub__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]:
- ret: set[Union[tuple[str, _V], _T]] = set()
- try:
- it = iter(other)
- except TypeError:
- return NotImplemented
- tmp = self._tmp_set(it)
- for e in self._md._keys.iter_entries():
- if (e.identity, e.value) not in tmp:
- ret.add((e.key, e.value))
- return ret
- def __rsub__(self, other: Iterable[_T]) -> set[_T]:
- ret: set[_T] = set()
- try:
- it = iter(other)
- except TypeError:
- return NotImplemented
- for arg in it:
- item = self._parse_item(arg)
- if item is None:
- ret.add(arg)
- continue
- hash_, identity, key, value = item
- for slot, idx, e in self._md._keys.iter_hash(hash_):
- if e.identity == identity and e.value == value: # pragma: no branch
- break
- else:
- ret.add(arg)
- return ret
- def __xor__(self, other: Iterable[_T]) -> set[Union[tuple[str, _V], _T]]:
- try:
- rgt = set(other)
- except TypeError:
- return NotImplemented
- ret: set[Union[tuple[str, _V], _T]] = self - rgt
- ret |= rgt - self
- return ret
- __rxor__ = __xor__
- def isdisjoint(self, other: Iterable[tuple[str, _V]]) -> bool:
- for arg in other:
- item = self._parse_item(arg)
- if item is None:
- continue
- hash_, identity, key, value = item
- for slot, idx, e in self._md._keys.iter_hash(hash_):
- if e.identity == identity and e.value == value: # pragma: no branch
- return False
- return True
- class _ValuesView(_ViewBase[_V], ValuesView[_V]):
- def __contains__(self, value: object) -> bool:
- for e in self._md._keys.iter_entries():
- if e.value == value:
- return True
- return False
- def __iter__(self) -> _Iter[_V]:
- return _Iter(len(self), self._iter(self._md._version))
- def _iter(self, version: int) -> Iterator[_V]:
- for e in self._md._keys.iter_entries():
- if version != self._md._version:
- raise RuntimeError("Dictionary changed during iteration")
- yield e.value
- @reprlib.recursive_repr()
- def __repr__(self) -> str:
- lst = []
- for e in self._md._keys.iter_entries():
- lst.append(repr(e.value))
- body = ", ".join(lst)
- return f"<{self.__class__.__name__}({body})>"
- class _KeysView(_ViewBase[_V], KeysView[str]):
- def __contains__(self, key: object) -> bool:
- if not isinstance(key, str):
- return False
- identity = self._md._identity(key)
- hash_ = hash(identity)
- for slot, idx, e in self._md._keys.iter_hash(hash_):
- if e.identity == identity: # pragma: no branch
- return True
- return False
- def __iter__(self) -> _Iter[str]:
- return _Iter(len(self), self._iter(self._md._version))
- def _iter(self, version: int) -> Iterator[str]:
- for e in self._md._keys.iter_entries():
- if version != self._md._version:
- raise RuntimeError("Dictionary changed during iteration")
- yield self._md._key(e.key)
- def __repr__(self) -> str:
- lst = []
- for e in self._md._keys.iter_entries():
- lst.append(f"'{e.key}'")
- body = ", ".join(lst)
- return f"<{self.__class__.__name__}({body})>"
- def __and__(self, other: Iterable[object]) -> set[str]:
- ret = set()
- try:
- it = iter(other)
- except TypeError:
- return NotImplemented
- for key in it:
- if not isinstance(key, str):
- continue
- identity = self._md._identity(key)
- hash_ = hash(identity)
- for slot, idx, e in self._md._keys.iter_hash(hash_):
- if e.identity == identity: # pragma: no branch
- ret.add(e.key)
- break
- return ret
- def __rand__(self, other: Iterable[_T]) -> set[_T]:
- ret = set()
- try:
- it = iter(other)
- except TypeError:
- return NotImplemented
- for key in it:
- if not isinstance(key, str):
- continue
- if key in self._md:
- ret.add(key)
- return cast(set[_T], ret)
- def __or__(self, other: Iterable[_T]) -> set[Union[str, _T]]:
- ret: set[Union[str, _T]] = set(self)
- try:
- it = iter(other)
- except TypeError:
- return NotImplemented
- for key in it:
- if not isinstance(key, str):
- ret.add(key)
- continue
- if key not in self._md:
- ret.add(key)
- return ret
- def __ror__(self, other: Iterable[_T]) -> set[Union[str, _T]]:
- try:
- ret: set[Union[str, _T]] = set(other)
- except TypeError:
- return NotImplemented
- tmp = set()
- for key in ret:
- if not isinstance(key, str):
- continue
- identity = self._md._identity(key)
- tmp.add(identity)
- for e in self._md._keys.iter_entries():
- if e.identity not in tmp:
- ret.add(e.key)
- return ret
- def __sub__(self, other: Iterable[object]) -> set[str]:
- ret = set(self)
- try:
- it = iter(other)
- except TypeError:
- return NotImplemented
- for key in it:
- if not isinstance(key, str):
- continue
- identity = self._md._identity(key)
- hash_ = hash(identity)
- for slot, idx, e in self._md._keys.iter_hash(hash_):
- if e.identity == identity: # pragma: no branch
- ret.discard(e.key)
- break
- return ret
- def __rsub__(self, other: Iterable[_T]) -> set[_T]:
- try:
- ret: set[_T] = set(other)
- except TypeError:
- return NotImplemented
- for key in other:
- if not isinstance(key, str):
- continue
- if key in self._md:
- ret.discard(key) # type: ignore[arg-type]
- return ret
- def __xor__(self, other: Iterable[_T]) -> set[Union[str, _T]]:
- try:
- rgt = set(other)
- except TypeError:
- return NotImplemented
- ret: set[Union[str, _T]] = self - rgt # type: ignore[assignment]
- ret |= rgt - self
- return ret
- __rxor__ = __xor__
- def isdisjoint(self, other: Iterable[object]) -> bool:
- for key in other:
- if not isinstance(key, str):
- continue
- if key in self._md:
- return False
- return True
- class _CSMixin:
- _ci: ClassVar[bool] = False
- def _key(self, key: str) -> str:
- return key
- def _identity(self, key: str) -> str:
- if isinstance(key, str):
- return key
- else:
- raise TypeError("MultiDict keys should be either str or subclasses of str")
- class _CIMixin:
- _ci: ClassVar[bool] = True
- def _key(self, key: str) -> str:
- if type(key) is istr:
- return key
- else:
- return istr(key)
- def _identity(self, key: str) -> str:
- if isinstance(key, istr):
- ret = key.__istr_identity__
- if ret is None:
- ret = key.lower()
- key.__istr_identity__ = ret
- return ret
- if isinstance(key, str):
- return key.lower()
- else:
- raise TypeError("MultiDict keys should be either str or subclasses of str")
- def estimate_log2_keysize(n: int) -> int:
- # 7 == HT_MINSIZE - 1
- return (((n * 3 + 1) // 2) | 7).bit_length()
- @dataclass
- class _Entry(Generic[_V]):
- hash: int
- identity: str
- key: str
- value: _V
- @dataclass
- class _HtKeys(Generic[_V]): # type: ignore[misc]
- LOG_MINSIZE: ClassVar[int] = 3
- MINSIZE: ClassVar[int] = 8
- PREALLOCATED_INDICES: ClassVar[dict[int, array]] = { # type: ignore[type-arg]
- log2_size: array(
- "b" if log2_size < 8 else "h", (-1 for i in range(1 << log2_size))
- )
- for log2_size in range(3, 10)
- }
- log2_size: int
- usable: int
- indices: array # type: ignore[type-arg] # in py3.9 array is not generic
- entries: list[Optional[_Entry[_V]]]
- @functools.cached_property
- def nslots(self) -> int:
- return 1 << self.log2_size
- @functools.cached_property
- def mask(self) -> int:
- return self.nslots - 1
- if sys.implementation.name != "pypy":
- def __sizeof__(self) -> int:
- return (
- object.__sizeof__(self)
- + sys.getsizeof(self.indices)
- + sys.getsizeof(self.entries)
- )
- @classmethod
- def new(cls, log2_size: int, entries: list[Optional[_Entry[_V]]]) -> Self:
- size = 1 << log2_size
- usable = (size << 1) // 3
- if log2_size < 10:
- indices = cls.PREALLOCATED_INDICES[log2_size].__copy__()
- elif log2_size < 16:
- indices = array("h", (-1 for i in range(size)))
- elif log2_size < 32:
- indices = array("l", (-1 for i in range(size)))
- else: # pragma: no cover # don't test huge multidicts
- indices = array("q", (-1 for i in range(size)))
- ret = cls(
- log2_size=log2_size,
- usable=usable,
- indices=indices,
- entries=entries,
- )
- return ret
- def clone(self) -> "_HtKeys[_V]":
- entries = [
- _Entry(e.hash, e.identity, e.key, e.value) if e is not None else None
- for e in self.entries
- ]
- return _HtKeys(
- log2_size=self.log2_size,
- usable=self.usable,
- indices=self.indices.__copy__(),
- entries=entries,
- )
- def build_indices(self, update: bool) -> None:
- mask = self.mask
- indices = self.indices
- for idx, e in enumerate(self.entries):
- assert e is not None
- hash_ = e.hash
- if update:
- if hash_ == -1:
- hash_ = hash(e.identity)
- else:
- assert hash_ != -1
- i = hash_ & mask
- perturb = hash_ & sys.maxsize
- while indices[i] != -1:
- perturb >>= 5
- i = mask & (i * 5 + perturb + 1)
- indices[i] = idx
- def find_empty_slot(self, hash_: int) -> int:
- mask = self.mask
- indices = self.indices
- i = hash_ & mask
- perturb = hash_ & sys.maxsize
- ix = indices[i]
- while ix != -1:
- perturb >>= 5
- i = (i * 5 + perturb + 1) & mask
- ix = indices[i]
- return i
- def iter_hash(self, hash_: int) -> Iterator[tuple[int, int, _Entry[_V]]]:
- mask = self.mask
- indices = self.indices
- entries = self.entries
- i = hash_ & mask
- perturb = hash_ & sys.maxsize
- ix = indices[i]
- while ix != -1:
- if ix != -2:
- e = entries[ix]
- if e.hash == hash_:
- yield i, ix, e
- perturb >>= 5
- i = (i * 5 + perturb + 1) & mask
- ix = indices[i]
- def del_idx(self, hash_: int, idx: int) -> None:
- mask = self.mask
- indices = self.indices
- i = hash_ & mask
- perturb = hash_ & sys.maxsize
- ix = indices[i]
- while ix != idx:
- perturb >>= 5
- i = (i * 5 + perturb + 1) & mask
- ix = indices[i]
- indices[i] = -2
- def iter_entries(self) -> Iterator[_Entry[_V]]:
- return filter(None, self.entries)
- def restore_hash(self, hash_: int) -> None:
- mask = self.mask
- indices = self.indices
- entries = self.entries
- i = hash_ & mask
- perturb = hash_ & sys.maxsize
- ix = indices[i]
- while ix != -1:
- if ix != -2:
- entry = entries[ix]
- if entry.hash == -1:
- entry.hash = hash_
- perturb >>= 5
- i = (i * 5 + perturb + 1) & mask
- ix = indices[i]
- class MultiDict(_CSMixin, MutableMultiMapping[_V]):
- """Dictionary with the support for duplicate keys."""
- __slots__ = ("_keys", "_used", "_version")
- def __init__(self, arg: MDArg[_V] = None, /, **kwargs: _V):
- self._used = 0
- v = _version
- v[0] += 1
- self._version = v[0]
- if not kwargs:
- md = None
- if isinstance(arg, MultiDictProxy):
- md = arg._md
- elif isinstance(arg, MultiDict):
- md = arg
- if md is not None and md._ci is self._ci:
- self._from_md(md)
- return
- it = self._parse_args(arg, kwargs)
- log2_size = estimate_log2_keysize(cast(int, next(it)))
- if log2_size > 17: # pragma: no cover
- # Don't overallocate really huge keys space in init
- log2_size = 17
- self._keys: _HtKeys[_V] = _HtKeys.new(log2_size, [])
- self._extend_items(cast(Iterator[_Entry[_V]], it))
- def _from_md(self, md: "MultiDict[_V]") -> None:
- # Copy everything as-is without compacting the new multidict,
- # otherwise it requires reindexing
- self._keys = md._keys.clone()
- self._used = md._used
- @overload
- def getall(self, key: str) -> list[_V]: ...
- @overload
- def getall(self, key: str, default: _T) -> Union[list[_V], _T]: ...
- def getall(
- self, key: str, default: Union[_T, _SENTINEL] = sentinel
- ) -> Union[list[_V], _T]:
- """Return a list of all values matching the key."""
- identity = self._identity(key)
- hash_ = hash(identity)
- res = []
- restore = []
- for slot, idx, e in self._keys.iter_hash(hash_):
- if e.identity == identity: # pragma: no branch
- res.append(e.value)
- e.hash = -1
- restore.append(idx)
- if res:
- entries = self._keys.entries
- for idx in restore:
- entries[idx].hash = hash_ # type: ignore[union-attr]
- return res
- if not res and default is not sentinel:
- return default
- raise KeyError("Key not found: %r" % key)
- @overload
- def getone(self, key: str) -> _V: ...
- @overload
- def getone(self, key: str, default: _T) -> Union[_V, _T]: ...
- def getone(
- self, key: str, default: Union[_T, _SENTINEL] = sentinel
- ) -> Union[_V, _T]:
- """Get first value matching the key.
- Raises KeyError if the key is not found and no default is provided.
- """
- identity = self._identity(key)
- hash_ = hash(identity)
- for slot, idx, e in self._keys.iter_hash(hash_):
- if e.identity == identity: # pragma: no branch
- return e.value
- if default is not sentinel:
- return default
- raise KeyError("Key not found: %r" % key)
- # Mapping interface #
- def __getitem__(self, key: str) -> _V:
- return self.getone(key)
- @overload
- def get(self, key: str, /) -> Union[_V, None]: ...
- @overload
- def get(self, key: str, /, default: _T) -> Union[_V, _T]: ...
- def get(self, key: str, default: Union[_T, None] = None) -> Union[_V, _T, None]:
- """Get first value matching the key.
- If the key is not found, returns the default (or None if no default is provided)
- """
- return self.getone(key, default)
- def __iter__(self) -> Iterator[str]:
- return iter(self.keys())
- def __len__(self) -> int:
- return self._used
- def keys(self) -> KeysView[str]:
- """Return a new view of the dictionary's keys."""
- return _KeysView(self)
- def items(self) -> ItemsView[str, _V]:
- """Return a new view of the dictionary's items *(key, value) pairs)."""
- return _ItemsView(self)
- def values(self) -> _ValuesView[_V]:
- """Return a new view of the dictionary's values."""
- return _ValuesView(self)
- def __eq__(self, other: object) -> bool:
- if not isinstance(other, Mapping):
- return NotImplemented
- if isinstance(other, MultiDictProxy):
- return self == other._md
- if isinstance(other, MultiDict):
- lft = self._keys
- rht = other._keys
- if self._used != other._used:
- return False
- for e1, e2 in zip(lft.iter_entries(), rht.iter_entries()):
- if e1.identity != e2.identity or e1.value != e2.value:
- return False
- return True
- if self._used != len(other):
- return False
- for k, v in self.items():
- nv = other.get(k, sentinel)
- if v != nv:
- return False
- return True
- def __contains__(self, key: object) -> bool:
- if not isinstance(key, str):
- return False
- identity = self._identity(key)
- hash_ = hash(identity)
- for slot, idx, e in self._keys.iter_hash(hash_):
- if e.identity == identity: # pragma: no branch
- return True
- return False
- @reprlib.recursive_repr()
- def __repr__(self) -> str:
- body = ", ".join(f"'{e.key}': {e.value!r}" for e in self._keys.iter_entries())
- return f"<{self.__class__.__name__}({body})>"
- if sys.implementation.name != "pypy":
- def __sizeof__(self) -> int:
- return object.__sizeof__(self) + sys.getsizeof(self._keys)
- def __reduce__(self) -> tuple[type[Self], tuple[list[tuple[str, _V]]]]:
- return (self.__class__, (list(self.items()),))
- def add(self, key: str, value: _V) -> None:
- identity = self._identity(key)
- hash_ = hash(identity)
- self._add_with_hash(_Entry(hash_, identity, key, value))
- self._incr_version()
- def copy(self) -> Self:
- """Return a copy of itself."""
- cls = self.__class__
- return cls(self)
- __copy__ = copy
- def extend(self, arg: MDArg[_V] = None, /, **kwargs: _V) -> None:
- """Extend current MultiDict with more values.
- This method must be used instead of update.
- """
- it = self._parse_args(arg, kwargs)
- newsize = self._used + cast(int, next(it))
- self._resize(estimate_log2_keysize(newsize), False)
- self._extend_items(cast(Iterator[_Entry[_V]], it))
- def _parse_args(
- self,
- arg: MDArg[_V],
- kwargs: Mapping[str, _V],
- ) -> Iterator[Union[int, _Entry[_V]]]:
- identity_func = self._identity
- if arg:
- if isinstance(arg, MultiDictProxy):
- arg = arg._md
- if isinstance(arg, MultiDict):
- yield len(arg) + len(kwargs)
- if self._ci is not arg._ci:
- for e in arg._keys.iter_entries():
- identity = identity_func(e.key)
- yield _Entry(hash(identity), identity, e.key, e.value)
- else:
- for e in arg._keys.iter_entries():
- yield _Entry(e.hash, e.identity, e.key, e.value)
- if kwargs:
- for key, value in kwargs.items():
- identity = identity_func(key)
- yield _Entry(hash(identity), identity, key, value)
- else:
- if hasattr(arg, "keys"):
- arg = cast(SupportsKeys[_V], arg)
- arg = [(k, arg[k]) for k in arg.keys()]
- if kwargs:
- arg = list(arg)
- arg.extend(list(kwargs.items()))
- try:
- yield len(arg) + len(kwargs) # type: ignore[arg-type]
- except TypeError:
- yield 0
- for pos, item in enumerate(arg):
- if not len(item) == 2:
- raise ValueError(
- f"multidict update sequence element #{pos}"
- f"has length {len(item)}; 2 is required"
- )
- identity = identity_func(item[0])
- yield _Entry(hash(identity), identity, item[0], item[1])
- else:
- yield len(kwargs)
- for key, value in kwargs.items():
- identity = identity_func(key)
- yield _Entry(hash(identity), identity, key, value)
- def _extend_items(self, items: Iterable[_Entry[_V]]) -> None:
- for e in items:
- self._add_with_hash(e)
- self._incr_version()
- def clear(self) -> None:
- """Remove all items from MultiDict."""
- self._used = 0
- self._keys = _HtKeys.new(_HtKeys.LOG_MINSIZE, [])
- self._incr_version()
- # Mapping interface #
- def __setitem__(self, key: str, value: _V) -> None:
- identity = self._identity(key)
- hash_ = hash(identity)
- found = False
- for slot, idx, e in self._keys.iter_hash(hash_):
- if e.identity == identity: # pragma: no branch
- if not found:
- e.key = key
- e.value = value
- e.hash = -1
- found = True
- self._incr_version()
- elif e.hash != -1: # pragma: no branch
- self._del_at(slot, idx)
- if not found:
- self._add_with_hash(_Entry(hash_, identity, key, value))
- else:
- self._keys.restore_hash(hash_)
- def __delitem__(self, key: str) -> None:
- found = False
- identity = self._identity(key)
- hash_ = hash(identity)
- for slot, idx, e in self._keys.iter_hash(hash_):
- if e.identity == identity: # pragma: no branch
- self._del_at(slot, idx)
- found = True
- if not found:
- raise KeyError(key)
- else:
- self._incr_version()
- @overload
- def setdefault(
- self: "MultiDict[Union[_T, None]]", key: str, default: None = None
- ) -> Union[_T, None]: ...
- @overload
- def setdefault(self, key: str, default: _V) -> _V: ...
- def setdefault(self, key: str, default: Union[_V, None] = None) -> Union[_V, None]: # type: ignore[misc]
- """Return value for key, set value to default if key is not present."""
- identity = self._identity(key)
- hash_ = hash(identity)
- for slot, idx, e in self._keys.iter_hash(hash_):
- if e.identity == identity: # pragma: no branch
- return e.value
- self.add(key, default) # type: ignore[arg-type]
- return default
- @overload
- def popone(self, key: str) -> _V: ...
- @overload
- def popone(self, key: str, default: _T) -> Union[_V, _T]: ...
- def popone(
- self, key: str, default: Union[_T, _SENTINEL] = sentinel
- ) -> Union[_V, _T]:
- """Remove specified key and return the corresponding value.
- If key is not found, d is returned if given, otherwise
- KeyError is raised.
- """
- identity = self._identity(key)
- hash_ = hash(identity)
- for slot, idx, e in self._keys.iter_hash(hash_):
- if e.identity == identity: # pragma: no branch
- value = e.value
- self._del_at(slot, idx)
- self._incr_version()
- return value
- if default is sentinel:
- raise KeyError(key)
- else:
- return default
- # Type checking will inherit signature for pop() if we don't confuse it here.
- if not TYPE_CHECKING:
- pop = popone
- @overload
- def popall(self, key: str) -> list[_V]: ...
- @overload
- def popall(self, key: str, default: _T) -> Union[list[_V], _T]: ...
- def popall(
- self, key: str, default: Union[_T, _SENTINEL] = sentinel
- ) -> Union[list[_V], _T]:
- """Remove all occurrences of key and return the list of corresponding
- values.
- If key is not found, default is returned if given, otherwise
- KeyError is raised.
- """
- found = False
- identity = self._identity(key)
- hash_ = hash(identity)
- ret = []
- for slot, idx, e in self._keys.iter_hash(hash_):
- if e.identity == identity: # pragma: no branch
- found = True
- ret.append(e.value)
- self._del_at(slot, idx)
- self._incr_version()
- if not found:
- if default is sentinel:
- raise KeyError(key)
- else:
- return default
- else:
- return ret
- def popitem(self) -> tuple[str, _V]:
- """Remove and return an arbitrary (key, value) pair."""
- if self._used <= 0:
- raise KeyError("empty multidict")
- pos = len(self._keys.entries) - 1
- entry = self._keys.entries.pop()
- while entry is None:
- pos -= 1
- entry = self._keys.entries.pop()
- ret = self._key(entry.key), entry.value
- self._keys.del_idx(entry.hash, pos)
- self._used -= 1
- self._incr_version()
- return ret
- def update(self, arg: MDArg[_V] = None, /, **kwargs: _V) -> None:
- """Update the dictionary, overwriting existing keys."""
- it = self._parse_args(arg, kwargs)
- newsize = self._used + cast(int, next(it))
- log2_size = estimate_log2_keysize(newsize)
- if log2_size > 17: # pragma: no cover
- # Don't overallocate really huge keys space in update,
- # duplicate keys could reduce the resulting anount of entries
- log2_size = 17
- if log2_size > self._keys.log2_size:
- self._resize(log2_size, False)
- try:
- self._update_items(cast(Iterator[_Entry[_V]], it))
- finally:
- self._post_update()
- def _update_items(self, items: Iterator[_Entry[_V]]) -> None:
- for entry in items:
- found = False
- hash_ = entry.hash
- identity = entry.identity
- for slot, idx, e in self._keys.iter_hash(hash_):
- if e.identity == identity: # pragma: no branch
- if not found:
- found = True
- e.key = entry.key
- e.value = entry.value
- e.hash = -1
- else:
- self._del_at_for_upd(e)
- if not found:
- self._add_with_hash_for_upd(entry)
- def _post_update(self) -> None:
- keys = self._keys
- indices = keys.indices
- entries = keys.entries
- for slot in range(keys.nslots):
- idx = indices[slot]
- if idx >= 0:
- e2 = entries[idx]
- assert e2 is not None
- if e2.key is None:
- entries[idx] = None
- indices[slot] = -2
- self._used -= 1
- if e2.hash == -1:
- e2.hash = hash(e2.identity)
- self._incr_version()
- def merge(self, arg: MDArg[_V] = None, /, **kwargs: _V) -> None:
- """Merge into the dictionary, adding non-existing keys."""
- it = self._parse_args(arg, kwargs)
- newsize = self._used + cast(int, next(it))
- log2_size = estimate_log2_keysize(newsize)
- if log2_size > 17: # pragma: no cover
- # Don't overallocate really huge keys space in update,
- # duplicate keys could reduce the resulting anount of entries
- log2_size = 17
- if log2_size > self._keys.log2_size:
- self._resize(log2_size, False)
- try:
- self._merge_items(cast(Iterator[_Entry[_V]], it))
- finally:
- self._post_update()
- def _merge_items(self, items: Iterator[_Entry[_V]]) -> None:
- for entry in items:
- hash_ = entry.hash
- identity = entry.identity
- for slot, idx, e in self._keys.iter_hash(hash_):
- if e.identity == identity: # pragma: no branch
- break
- else:
- self._add_with_hash_for_upd(entry)
- def _incr_version(self) -> None:
- v = _version
- v[0] += 1
- self._version = v[0]
- def _resize(self, log2_newsize: int, update: bool) -> None:
- oldkeys = self._keys
- newentries = self._used
- if len(oldkeys.entries) == newentries:
- entries = oldkeys.entries
- else:
- entries = [e for e in oldkeys.entries if e is not None]
- newkeys: _HtKeys[_V] = _HtKeys.new(log2_newsize, entries)
- newkeys.usable -= newentries
- newkeys.build_indices(update)
- self._keys = newkeys
- def _add_with_hash(self, entry: _Entry[_V]) -> None:
- if self._keys.usable <= 0:
- self._resize((self._used * 3 | _HtKeys.MINSIZE - 1).bit_length(), False)
- keys = self._keys
- slot = keys.find_empty_slot(entry.hash)
- keys.indices[slot] = len(keys.entries)
- keys.entries.append(entry)
- self._incr_version()
- self._used += 1
- keys.usable -= 1
- def _add_with_hash_for_upd(self, entry: _Entry[_V]) -> None:
- if self._keys.usable <= 0:
- self._resize((self._used * 3 | _HtKeys.MINSIZE - 1).bit_length(), True)
- keys = self._keys
- slot = keys.find_empty_slot(entry.hash)
- keys.indices[slot] = len(keys.entries)
- entry.hash = -1
- keys.entries.append(entry)
- self._incr_version()
- self._used += 1
- keys.usable -= 1
- def _del_at(self, slot: int, idx: int) -> None:
- self._keys.entries[idx] = None
- self._keys.indices[slot] = -2
- self._used -= 1
- def _del_at_for_upd(self, entry: _Entry[_V]) -> None:
- entry.key = None # type: ignore[assignment]
- entry.value = None # type: ignore[assignment]
- class CIMultiDict(_CIMixin, MultiDict[_V]):
- """Dictionary with the support for duplicate case-insensitive keys."""
- class MultiDictProxy(_CSMixin, MultiMapping[_V]):
- """Read-only proxy for MultiDict instance."""
- __slots__ = ("_md",)
- _md: MultiDict[_V]
- def __init__(self, arg: Union[MultiDict[_V], "MultiDictProxy[_V]"]):
- if not isinstance(arg, (MultiDict, MultiDictProxy)):
- raise TypeError(
- f"ctor requires MultiDict or MultiDictProxy instance, not {type(arg)}"
- )
- if isinstance(arg, MultiDictProxy):
- self._md = arg._md
- else:
- self._md = arg
- def __reduce__(self) -> NoReturn:
- raise TypeError(f"can't pickle {self.__class__.__name__} objects")
- @overload
- def getall(self, key: str) -> list[_V]: ...
- @overload
- def getall(self, key: str, default: _T) -> Union[list[_V], _T]: ...
- def getall(
- self, key: str, default: Union[_T, _SENTINEL] = sentinel
- ) -> Union[list[_V], _T]:
- """Return a list of all values matching the key."""
- if default is not sentinel:
- return self._md.getall(key, default)
- else:
- return self._md.getall(key)
- @overload
- def getone(self, key: str) -> _V: ...
- @overload
- def getone(self, key: str, default: _T) -> Union[_V, _T]: ...
- def getone(
- self, key: str, default: Union[_T, _SENTINEL] = sentinel
- ) -> Union[_V, _T]:
- """Get first value matching the key.
- Raises KeyError if the key is not found and no default is provided.
- """
- if default is not sentinel:
- return self._md.getone(key, default)
- else:
- return self._md.getone(key)
- # Mapping interface #
- def __getitem__(self, key: str) -> _V:
- return self.getone(key)
- @overload
- def get(self, key: str, /) -> Union[_V, None]: ...
- @overload
- def get(self, key: str, /, default: _T) -> Union[_V, _T]: ...
- def get(self, key: str, default: Union[_T, None] = None) -> Union[_V, _T, None]:
- """Get first value matching the key.
- If the key is not found, returns the default (or None if no default is provided)
- """
- return self._md.getone(key, default)
- def __iter__(self) -> Iterator[str]:
- return iter(self._md.keys())
- def __len__(self) -> int:
- return len(self._md)
- def keys(self) -> KeysView[str]:
- """Return a new view of the dictionary's keys."""
- return self._md.keys()
- def items(self) -> ItemsView[str, _V]:
- """Return a new view of the dictionary's items *(key, value) pairs)."""
- return self._md.items()
- def values(self) -> _ValuesView[_V]:
- """Return a new view of the dictionary's values."""
- return self._md.values()
- def __eq__(self, other: object) -> bool:
- return self._md == other
- def __contains__(self, key: object) -> bool:
- return key in self._md
- @reprlib.recursive_repr()
- def __repr__(self) -> str:
- body = ", ".join(f"'{k}': {v!r}" for k, v in self.items())
- return f"<{self.__class__.__name__}({body})>"
- def copy(self) -> MultiDict[_V]:
- """Return a copy of itself."""
- return MultiDict(self._md)
- class CIMultiDictProxy(_CIMixin, MultiDictProxy[_V]):
- """Read-only proxy for CIMultiDict instance."""
- def __init__(self, arg: Union[MultiDict[_V], MultiDictProxy[_V]]):
- if not isinstance(arg, (CIMultiDict, CIMultiDictProxy)):
- raise TypeError(
- "ctor requires CIMultiDict or CIMultiDictProxy instance"
- f", not {type(arg)}"
- )
- super().__init__(arg)
- def copy(self) -> CIMultiDict[_V]:
- """Return a copy of itself."""
- return CIMultiDict(self._md)
- def getversion(md: Union[MultiDict[object], MultiDictProxy[object]]) -> int:
- if isinstance(md, MultiDictProxy):
- md = md._md
- elif not isinstance(md, MultiDict):
- raise TypeError("Parameter should be multidict or proxy")
- return md._version
|