utils.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. from __future__ import annotations
  2. import importlib
  3. import logging
  4. import unicodedata
  5. from bisect import bisect_right
  6. from codecs import IncrementalDecoder
  7. from encodings.aliases import aliases
  8. from functools import lru_cache
  9. from re import findall
  10. from typing import Generator
  11. from _multibytecodec import ( # type: ignore[import-not-found,import]
  12. MultibyteIncrementalDecoder,
  13. )
  14. from .constant import (
  15. ENCODING_MARKS,
  16. IANA_SUPPORTED_SIMILAR,
  17. RE_POSSIBLE_ENCODING_INDICATION,
  18. UNICODE_RANGES_COMBINED,
  19. UNICODE_SECONDARY_RANGE_KEYWORD,
  20. UTF8_MAXIMAL_ALLOCATION,
  21. COMMON_CJK_CHARACTERS,
  22. _LATIN,
  23. _CJK,
  24. _HANGUL,
  25. _KATAKANA,
  26. _HIRAGANA,
  27. _THAI,
  28. _ARABIC,
  29. _ARABIC_ISOLATED_FORM,
  30. _ACCENT_KEYWORDS,
  31. _ACCENTUATED,
  32. )
  33. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  34. def _character_flags(character: str) -> int:
  35. """Compute all name-based classification flags with a single unicodedata.name() call."""
  36. try:
  37. desc: str = unicodedata.name(character)
  38. except ValueError:
  39. return 0
  40. flags: int = 0
  41. if "LATIN" in desc:
  42. flags |= _LATIN
  43. if "CJK" in desc:
  44. flags |= _CJK
  45. if "HANGUL" in desc:
  46. flags |= _HANGUL
  47. if "KATAKANA" in desc:
  48. flags |= _KATAKANA
  49. if "HIRAGANA" in desc:
  50. flags |= _HIRAGANA
  51. if "THAI" in desc:
  52. flags |= _THAI
  53. if "ARABIC" in desc:
  54. flags |= _ARABIC
  55. if "ISOLATED FORM" in desc:
  56. flags |= _ARABIC_ISOLATED_FORM
  57. for kw in _ACCENT_KEYWORDS:
  58. if kw in desc:
  59. flags |= _ACCENTUATED
  60. break
  61. return flags
  62. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  63. def is_accentuated(character: str) -> bool:
  64. return bool(_character_flags(character) & _ACCENTUATED)
  65. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  66. def remove_accent(character: str) -> str:
  67. decomposed: str = unicodedata.decomposition(character)
  68. if not decomposed:
  69. return character
  70. codes: list[str] = decomposed.split(" ")
  71. return chr(int(codes[0], 16))
  72. # Pre-built sorted lookup table for O(log n) binary search in unicode_range().
  73. # Each entry is (range_start, range_end_exclusive, range_name).
  74. _UNICODE_RANGES_SORTED: list[tuple[int, int, str]] = sorted(
  75. (ord_range.start, ord_range.stop, name)
  76. for name, ord_range in UNICODE_RANGES_COMBINED.items()
  77. )
  78. _UNICODE_RANGE_STARTS: list[int] = [e[0] for e in _UNICODE_RANGES_SORTED]
  79. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  80. def unicode_range(character: str) -> str | None:
  81. """
  82. Retrieve the Unicode range official name from a single character.
  83. """
  84. character_ord: int = ord(character)
  85. # Binary search: find the rightmost range whose start <= character_ord
  86. idx = bisect_right(_UNICODE_RANGE_STARTS, character_ord) - 1
  87. if idx >= 0:
  88. start, stop, name = _UNICODE_RANGES_SORTED[idx]
  89. if character_ord < stop:
  90. return name
  91. return None
  92. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  93. def is_latin(character: str) -> bool:
  94. return bool(_character_flags(character) & _LATIN)
  95. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  96. def is_punctuation(character: str) -> bool:
  97. character_category: str = unicodedata.category(character)
  98. if "P" in character_category:
  99. return True
  100. character_range: str | None = unicode_range(character)
  101. if character_range is None:
  102. return False
  103. return "Punctuation" in character_range
  104. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  105. def is_symbol(character: str) -> bool:
  106. character_category: str = unicodedata.category(character)
  107. if "S" in character_category or "N" in character_category:
  108. return True
  109. character_range: str | None = unicode_range(character)
  110. if character_range is None:
  111. return False
  112. return "Forms" in character_range and character_category != "Lo"
  113. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  114. def is_emoticon(character: str) -> bool:
  115. character_range: str | None = unicode_range(character)
  116. if character_range is None:
  117. return False
  118. return "Emoticons" in character_range or "Pictographs" in character_range
  119. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  120. def is_separator(character: str) -> bool:
  121. if character.isspace() or character in {"|", "+", "<", ">"}:
  122. return True
  123. character_category: str = unicodedata.category(character)
  124. return "Z" in character_category or character_category in {"Po", "Pd", "Pc"}
  125. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  126. def is_case_variable(character: str) -> bool:
  127. return character.islower() != character.isupper()
  128. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  129. def is_cjk(character: str) -> bool:
  130. return bool(_character_flags(character) & _CJK)
  131. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  132. def is_hiragana(character: str) -> bool:
  133. return bool(_character_flags(character) & _HIRAGANA)
  134. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  135. def is_katakana(character: str) -> bool:
  136. return bool(_character_flags(character) & _KATAKANA)
  137. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  138. def is_hangul(character: str) -> bool:
  139. return bool(_character_flags(character) & _HANGUL)
  140. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  141. def is_thai(character: str) -> bool:
  142. return bool(_character_flags(character) & _THAI)
  143. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  144. def is_arabic(character: str) -> bool:
  145. return bool(_character_flags(character) & _ARABIC)
  146. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  147. def is_arabic_isolated_form(character: str) -> bool:
  148. return bool(_character_flags(character) & _ARABIC_ISOLATED_FORM)
  149. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  150. def is_cjk_uncommon(character: str) -> bool:
  151. return character not in COMMON_CJK_CHARACTERS
  152. @lru_cache(maxsize=len(UNICODE_RANGES_COMBINED))
  153. def is_unicode_range_secondary(range_name: str) -> bool:
  154. return any(keyword in range_name for keyword in UNICODE_SECONDARY_RANGE_KEYWORD)
  155. @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
  156. def is_unprintable(character: str) -> bool:
  157. return (
  158. character.isspace() is False # includes \n \t \r \v
  159. and character.isprintable() is False
  160. and character != "\x1a" # Why? Its the ASCII substitute character.
  161. and character != "\ufeff" # bug discovered in Python,
  162. # Zero Width No-Break Space located in Arabic Presentation Forms-B, Unicode 1.1 not acknowledged as space.
  163. )
  164. def any_specified_encoding(
  165. sequence: bytes | bytearray, search_zone: int = 8192
  166. ) -> str | None:
  167. """
  168. Extract using ASCII-only decoder any specified encoding in the first n-bytes.
  169. """
  170. if not isinstance(sequence, (bytes, bytearray)):
  171. raise TypeError
  172. seq_len: int = len(sequence)
  173. results: list[str] = findall(
  174. RE_POSSIBLE_ENCODING_INDICATION,
  175. sequence[: min(seq_len, search_zone)].decode("ascii", errors="ignore"),
  176. )
  177. if len(results) == 0:
  178. return None
  179. for specified_encoding in results:
  180. specified_encoding = specified_encoding.lower().replace("-", "_")
  181. encoding_alias: str
  182. encoding_iana: str
  183. for encoding_alias, encoding_iana in aliases.items():
  184. if encoding_alias == specified_encoding:
  185. return encoding_iana
  186. if encoding_iana == specified_encoding:
  187. return encoding_iana
  188. return None
  189. @lru_cache(maxsize=128)
  190. def is_multi_byte_encoding(name: str) -> bool:
  191. """
  192. Verify is a specific encoding is a multi byte one based on it IANA name
  193. """
  194. return name in {
  195. "utf_8",
  196. "utf_8_sig",
  197. "utf_16",
  198. "utf_16_be",
  199. "utf_16_le",
  200. "utf_32",
  201. "utf_32_le",
  202. "utf_32_be",
  203. "utf_7",
  204. } or issubclass(
  205. importlib.import_module(f"encodings.{name}").IncrementalDecoder,
  206. MultibyteIncrementalDecoder,
  207. )
  208. def identify_sig_or_bom(sequence: bytes | bytearray) -> tuple[str | None, bytes]:
  209. """
  210. Identify and extract SIG/BOM in given sequence.
  211. """
  212. for iana_encoding in ENCODING_MARKS:
  213. marks: bytes | list[bytes] = ENCODING_MARKS[iana_encoding]
  214. if isinstance(marks, bytes):
  215. marks = [marks]
  216. for mark in marks:
  217. if sequence.startswith(mark):
  218. return iana_encoding, mark
  219. return None, b""
  220. def should_strip_sig_or_bom(iana_encoding: str) -> bool:
  221. return iana_encoding not in {"utf_16", "utf_32"}
  222. def iana_name(cp_name: str, strict: bool = True) -> str:
  223. """Returns the Python normalized encoding name (Not the IANA official name)."""
  224. cp_name = cp_name.lower().replace("-", "_")
  225. encoding_alias: str
  226. encoding_iana: str
  227. for encoding_alias, encoding_iana in aliases.items():
  228. if cp_name in [encoding_alias, encoding_iana]:
  229. return encoding_iana
  230. if strict:
  231. raise ValueError(f"Unable to retrieve IANA for '{cp_name}'")
  232. return cp_name
  233. def cp_similarity(iana_name_a: str, iana_name_b: str) -> float:
  234. if is_multi_byte_encoding(iana_name_a) or is_multi_byte_encoding(iana_name_b):
  235. return 0.0
  236. decoder_a = importlib.import_module(f"encodings.{iana_name_a}").IncrementalDecoder
  237. decoder_b = importlib.import_module(f"encodings.{iana_name_b}").IncrementalDecoder
  238. id_a: IncrementalDecoder = decoder_a(errors="ignore")
  239. id_b: IncrementalDecoder = decoder_b(errors="ignore")
  240. character_match_count: int = 0
  241. for i in range(256):
  242. to_be_decoded: bytes = bytes([i])
  243. if id_a.decode(to_be_decoded) == id_b.decode(to_be_decoded):
  244. character_match_count += 1
  245. return character_match_count / 256
  246. def is_cp_similar(iana_name_a: str, iana_name_b: str) -> bool:
  247. """
  248. Determine if two code page are at least 80% similar. IANA_SUPPORTED_SIMILAR dict was generated using
  249. the function cp_similarity.
  250. """
  251. return (
  252. iana_name_a in IANA_SUPPORTED_SIMILAR
  253. and iana_name_b in IANA_SUPPORTED_SIMILAR[iana_name_a]
  254. )
  255. def set_logging_handler(
  256. name: str = "charset_normalizer",
  257. level: int = logging.INFO,
  258. format_string: str = "%(asctime)s | %(levelname)s | %(message)s",
  259. ) -> None:
  260. logger = logging.getLogger(name)
  261. logger.setLevel(level)
  262. handler = logging.StreamHandler()
  263. handler.setFormatter(logging.Formatter(format_string))
  264. logger.addHandler(handler)
def cut_sequence_chunks(
    sequences: bytes | bytearray,
    encoding_iana: str,
    offsets: range,
    chunk_size: int,
    bom_or_sig_available: bool,
    strip_sig_or_bom: bool,
    sig_payload: bytes,
    is_multi_byte_decoder: bool,
    decoded_payload: str | None = None,
) -> Generator[str, None, None]:
    """
    Yield decoded text chunks of *sequences*, one per offset in *offsets*.

    When *decoded_payload* is available and the codec is single-byte, chunks
    are sliced directly from the already-decoded text. Otherwise each chunk is
    cut from the raw bytes and decoded on the fly; for multi-byte codecs the
    cut start is nudged backwards by up to 3 bytes when the decoded prefix
    does not re-align with *decoded_payload*, to avoid splitting a code point.
    """
    if decoded_payload and is_multi_byte_decoder is False:
        # Fast path: slice the pre-decoded text, no per-chunk decoding needed.
        for i in offsets:
            chunk = decoded_payload[i : i + chunk_size]
            if not chunk:
                break
            yield chunk
    else:
        for i in offsets:
            chunk_end = i + chunk_size
            # Skip offsets whose window overruns the data by more than 8 bytes
            # (slicing below clamps a small overrun on the final chunk).
            if chunk_end > len(sequences) + 8:
                continue
            cut_sequence = sequences[i : i + chunk_size]
            if bom_or_sig_available and strip_sig_or_bom is False:
                # Re-prefix the signature so the decoder sees a valid stream.
                cut_sequence = sig_payload + cut_sequence
            chunk = cut_sequence.decode(
                encoding_iana,
                errors="ignore" if is_multi_byte_decoder else "strict",
            )
            # multi-byte bad cutting detector and adjustment
            # not the cleanest way to perform that fix but clever enough for now.
            if is_multi_byte_decoder and i > 0:
                # Compare only a short prefix of the chunk (at most 16 chars).
                chunk_partial_size_chk: int = min(chunk_size, 16)
                if (
                    decoded_payload
                    and chunk[:chunk_partial_size_chk] not in decoded_payload
                ):
                    # Shift the cut start back one byte at a time (up to 3)
                    # until the decoded prefix matches the reference payload.
                    for j in range(i, i - 4, -1):
                        cut_sequence = sequences[j:chunk_end]
                        if bom_or_sig_available and strip_sig_or_bom is False:
                            cut_sequence = sig_payload + cut_sequence
                        chunk = cut_sequence.decode(encoding_iana, errors="ignore")
                        if chunk[:chunk_partial_size_chk] in decoded_payload:
                            break
            yield chunk