# api.py
  1. from __future__ import annotations
  2. import logging
  3. from os import PathLike
  4. from typing import BinaryIO
  5. from .cd import (
  6. coherence_ratio,
  7. encoding_languages,
  8. mb_encoding_languages,
  9. merge_coherence_ratios,
  10. )
  11. from .constant import (
  12. IANA_SUPPORTED,
  13. IANA_SUPPORTED_SIMILAR,
  14. TOO_BIG_SEQUENCE,
  15. TOO_SMALL_SEQUENCE,
  16. TRACE,
  17. )
  18. from .md import mess_ratio
  19. from .models import CharsetMatch, CharsetMatches
  20. from .utils import (
  21. any_specified_encoding,
  22. cut_sequence_chunks,
  23. iana_name,
  24. identify_sig_or_bom,
  25. is_multi_byte_encoding,
  26. should_strip_sig_or_bom,
  27. )
  28. logger = logging.getLogger("charset_normalizer")
  29. explain_handler = logging.StreamHandler()
  30. explain_handler.setFormatter(
  31. logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
  32. )
  33. # Pre-compute a reordered encoding list: multibyte first, then single-byte.
  34. # This allows the mb_definitive_match optimization to fire earlier, skipping
  35. # all single-byte encodings for genuine CJK content. Multibyte codecs
  36. # hard-fail (UnicodeDecodeError) on single-byte data almost instantly, so
  37. # testing them first costs negligible time for non-CJK files.
  38. _mb_supported: list[str] = []
  39. _sb_supported: list[str] = []
  40. for _supported_enc in IANA_SUPPORTED:
  41. try:
  42. if is_multi_byte_encoding(_supported_enc):
  43. _mb_supported.append(_supported_enc)
  44. else:
  45. _sb_supported.append(_supported_enc)
  46. except ImportError:
  47. _sb_supported.append(_supported_enc)
  48. IANA_SUPPORTED_MB_FIRST: list[str] = _mb_supported + _sb_supported
def from_bytes(
    sequences: bytes | bytearray,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possibles charset usable to render str objects.
    If there is no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512o each to assess the mess and coherence of a given sequence.
    And will give up a particular code page after 20% of measured mess. Those criteria are customizable at will.

    The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page
    but never take it for granted. Can improve the performance.

    You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that
    purpose.

    This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
    By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
    toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
    Custom logging format and handler can be set manually.
    """
    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {}".format(
                type(sequences)
            )
        )

    if explain:
        # Temporarily attach the debug handler; every return path below
        # removes it and restores the previous logger level.
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:  # Defensive: ensure exit path clean handler
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        # Normalize user-supplied names to their IANA form for comparison.
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    # Small payload: a single chunk covering the whole sequence.
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    # Shrink chunk_size so `steps` chunks still fit inside the payload.
    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    prioritized_encodings: list[str] = []

    # Encoding declared inside the payload itself (e.g. XML/HTML meta),
    # only honored when preemptive_behaviour is enabled.
    specified_encoding: str | None = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    tested: set[str] = set()
    tested_but_hard_failure: list[str] = []
    tested_but_soft_failure: list[str] = []
    soft_failure_skip: set[str] = set()
    success_fast_tracked: set[str] = set()

    # Cache for decoded payload deduplication: hash(decoded_payload) -> (mean_mess_ratio, cd_ratios_merged, passed)
    # When multiple encodings decode to the exact same string, we can skip the expensive
    # mess_ratio and coherence_ratio analysis and reuse the results from the first encoding.
    payload_result_cache: dict[int, tuple[float, list[tuple[str, float]], bool]] = {}

    # When a definitive result (chaos=0.0 and good coherence) is found after testing
    # the prioritized encodings (ascii, utf_8), we can significantly reduce the remaining
    # work. Encodings that target completely different language families (e.g., Cyrillic
    # when the definitive match is Latin) are skipped entirely.
    # Additionally, for same-family encodings that pass chaos probing, we reuse the
    # definitive match's coherence ratios instead of recomputing them — a major savings
    # since coherence_ratio accounts for ~30% of total time on slow Latin files.
    definitive_match_found: bool = False
    definitive_target_languages: set[str] = set()

    # After the definitive match fires, we cap the number of additional same-family
    # single-byte encodings that pass chaos probing. Once we've accumulated enough
    # good candidates (N), further same-family SB encodings are unlikely to produce
    # a better best() result and just waste mess_ratio + coherence_ratio time.
    # The first encoding to trigger the definitive match is NOT counted (it's already in).
    post_definitive_sb_success_count: int = 0
    POST_DEFINITIVE_SB_CAP: int = 7

    # When a non-UTF multibyte encoding passes chaos probing with significant multibyte
    # content (decoded length < 98% of raw length), skip all remaining single-byte encodings.
    # Rationale: multi-byte decoders (CJK) have strict byte-sequence validation — if they
    # decode without error AND pass chaos probing with substantial multibyte content, the
    # data is genuinely multibyte encoded. Single-byte encodings will always decode (every
    # byte maps to something) but waste time on mess_ratio before failing.
    # The 98% threshold prevents false triggers on files that happen to have a few valid
    # multibyte pairs (e.g., cp424/_ude_1.txt where big5 decodes with 99% ratio).
    mb_definitive_match_found: bool = False

    fallback_ascii: CharsetMatch | None = None
    fallback_u8: CharsetMatch | None = None
    fallback_specified: CharsetMatch | None = None

    results: CharsetMatches = CharsetMatches()
    early_stop_results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    for encoding_iana in prioritized_encodings + IANA_SUPPORTED_MB_FIRST:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        # Duplicates can occur: prioritized entries reappear in the full list.
        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: str | None = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        # Skip encodings similar to ones that already soft-failed (high mess ratio).
        # Checked BEFORE the expensive decode attempt.
        if encoding_iana in soft_failure_skip:
            logger.log(
                TRACE,
                "%s is deemed too similar to a code page that was already considered unsuited. Continuing!",
                encoding_iana,
            )
            continue

        # Skip encodings that were already fast-tracked from a similar successful encoding.
        if encoding_iana in success_fast_tracked:
            logger.log(
                TRACE,
                "Skipping %s: already fast-tracked from a similar successful encoding.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):  # Defensive:
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        # When we've already found a definitive match (chaos=0.0 with good coherence)
        # after testing the prioritized encodings, skip encodings that target
        # completely different language families. This avoids running expensive
        # mess_ratio + coherence_ratio on clearly unrelated candidates (e.g., Cyrillic
        # when the definitive match is Latin-based).
        if definitive_match_found:
            if not is_multi_byte_decoder:
                enc_languages = set(encoding_languages(encoding_iana))
            else:
                enc_languages = set(mb_encoding_languages(encoding_iana))
            if not enc_languages.intersection(definitive_target_languages):
                logger.log(
                    TRACE,
                    "Skipping %s: definitive match already found, this encoding targets different languages (%s vs %s).",
                    encoding_iana,
                    enc_languages,
                    definitive_target_languages,
                )
                continue

        # After the definitive match, cap the number of additional same-family
        # single-byte encodings that pass chaos probing. This avoids testing the
        # tail of rare, low-value same-family encodings (mac_iceland, cp860, etc.)
        # that almost never change best() but each cost ~1-2ms of mess_ratio + coherence.
        if (
            definitive_match_found
            and not is_multi_byte_decoder
            and post_definitive_sb_success_count >= POST_DEFINITIVE_SB_CAP
        ):
            logger.log(
                TRACE,
                "Skipping %s: already accumulated %d same-family results after definitive match (cap=%d).",
                encoding_iana,
                post_definitive_sb_success_count,
                POST_DEFINITIVE_SB_CAP,
            )
            continue

        # When a multibyte encoding with significant multibyte content has already
        # passed chaos probing, skip all single-byte encodings. They will either fail
        # chaos probing (wasting mess_ratio time) or produce inferior results.
        if mb_definitive_match_found and not is_multi_byte_decoder:
            logger.log(
                TRACE,
                "Skipping single-byte %s: multi-byte definitive match already found.",
                encoding_iana,
            )
            continue

        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                # Very large payload + single-byte codec: validate only the
                # first 500 kB here; the tail is re-checked strictly below.
                str(
                    (
                        sequences[: int(50e4)]
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) : int(50e4)]
                    ),
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    (
                        sequences
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) :]
                    ),
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        # Chunk start offsets across the payload (skipping the BOM/SIG bytes).
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        # Payload-hash deduplication: if another encoding already decoded to the
        # exact same string, reuse its mess_ratio and coherence results entirely.
        # This is strictly more general than the old IANA_SUPPORTED_SIMILAR approach
        # because it catches ALL identical decoding, not just pre-mapped ones.
        if decoded_payload is not None and not is_multi_byte_decoder:
            payload_hash: int = hash(decoded_payload)
            cached = payload_result_cache.get(payload_hash)
            if cached is not None:
                cached_mess, cached_cd, cached_passed = cached
                if cached_passed:
                    # The previous encoding with identical output passed chaos probing.
                    fast_match = CharsetMatch(
                        sequences,
                        encoding_iana,
                        cached_mess,
                        bom_or_sig_available,
                        cached_cd,
                        (
                            decoded_payload
                            if (
                                is_too_large_sequence is False
                                or encoding_iana
                                in [specified_encoding, "ascii", "utf_8"]
                            )
                            else None
                        ),
                        preemptive_declaration=specified_encoding,
                    )
                    results.append(fast_match)
                    success_fast_tracked.add(encoding_iana)
                    logger.log(
                        TRACE,
                        "%s fast-tracked (identical decoded payload to a prior encoding, chaos=%f %%).",
                        encoding_iana,
                        round(cached_mess * 100, ndigits=3),
                    )
                    # Mirror of the main-path early-stop logic below, driven
                    # by the cached mess ratio.
                    if (
                        encoding_iana in [specified_encoding, "ascii", "utf_8"]
                        and cached_mess < 0.1
                    ):
                        if cached_mess == 0.0:
                            logger.debug(
                                "Encoding detection: %s is most likely the one.",
                                fast_match.encoding,
                            )
                            if explain:
                                logger.removeHandler(explain_handler)
                                logger.setLevel(previous_logger_level)
                            return CharsetMatches([fast_match])
                        early_stop_results.append(fast_match)
                        if (
                            len(early_stop_results)
                            and (specified_encoding is None or specified_encoding in tested)
                            and "ascii" in tested
                            and "utf_8" in tested
                        ):
                            probable_result: CharsetMatch = early_stop_results.best()  # type: ignore[assignment]
                            logger.debug(
                                "Encoding detection: %s is most likely the one.",
                                probable_result.encoding,
                            )
                            if explain:
                                logger.removeHandler(explain_handler)
                                logger.setLevel(previous_logger_level)
                            return CharsetMatches([probable_result])
                    continue
                else:
                    # The previous encoding with identical output failed chaos probing.
                    tested_but_soft_failure.append(encoding_iana)
                    logger.log(
                        TRACE,
                        "%s fast-skipped (identical decoded payload to a prior encoding that failed chaos probing).",
                        encoding_iana,
                    )
                    # Prepare fallbacks for special encodings even when skipped.
                    if enable_fallback and encoding_iana in [
                        "ascii",
                        "utf_8",
                        specified_encoding,
                        "utf_16",
                        "utf_32",
                    ]:
                        fallback_entry = CharsetMatch(
                            sequences,
                            encoding_iana,
                            threshold,
                            bom_or_sig_available,
                            [],
                            decoded_payload,
                            preemptive_declaration=specified_encoding,
                        )
                        if encoding_iana == specified_encoding:
                            fallback_specified = fallback_entry
                        elif encoding_iana == "ascii":
                            fallback_ascii = fallback_entry
                        else:
                            fallback_u8 = fallback_entry
                    continue

        # Tolerate up to a quarter of the chunks (minimum 2) exceeding the
        # mess threshold before giving up on this encoding.
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: list[str] = []
        md_ratios = []

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests passes
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            if encoding_iana in IANA_SUPPORTED_SIMILAR:
                soft_failure_skip.update(IANA_SUPPORTED_SIMILAR[encoding_iana])
            # Cache this soft-failure so identical decoding from other encodings
            # can be skipped immediately.
            if decoded_payload is not None and not is_multi_byte_decoder:
                payload_result_cache.setdefault(
                    hash(decoded_payload), (mean_mess_ratio, [], False)
                )
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana
                in ["ascii", "utf_8", specified_encoding, "utf_16", "utf_32"]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences,
                    encoding_iana,
                    threshold,
                    bom_or_sig_available,
                    [],
                    decoded_payload,
                    preemptive_declaration=specified_encoding,
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        if not is_multi_byte_decoder:
            target_languages: list[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # Run coherence detection on all chunks. We previously tried limiting to
        # 1-2 chunks for post-definitive encodings to save time, but this caused
        # coverage regressions by producing unrepresentative coherence scores.
        # The SB cap and language-family skip optimizations provide sufficient
        # speedup without sacrificing coherence accuracy.
        if encoding_iana != "ascii":
            # We shall skip the CD when its about ASCII
            # Most of the time its not relevant to run "language-detection" on it.
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)
            cd_ratios_merged = merge_coherence_ratios(cd_ratios)
        else:
            # cd_ratios is empty here, so this merge yields an empty result.
            cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        current_match = CharsetMatch(
            sequences,
            encoding_iana,
            mean_mess_ratio,
            bom_or_sig_available,
            cd_ratios_merged,
            (
                decoded_payload
                if (
                    is_too_large_sequence is False
                    or encoding_iana in [specified_encoding, "ascii", "utf_8"]
                )
                else None
            ),
            preemptive_declaration=specified_encoding,
        )

        results.append(current_match)

        # Cache the successful result for payload-hash deduplication.
        if decoded_payload is not None and not is_multi_byte_decoder:
            payload_result_cache.setdefault(
                hash(decoded_payload),
                (mean_mess_ratio, cd_ratios_merged, True),
            )

        # Count post-definitive same-family SB successes for the early termination cap.
        # Only count low-mess encodings (< 2%) toward the cap. High-mess encodings are
        # marginal results that shouldn't prevent better-quality candidates from being
        # tested. For example, iso8859_4 (mess=0%) should not be skipped just because
        # 7 high-mess Latin encodings (cp1252 at 8%, etc.) were tried first.
        if (
            definitive_match_found
            and not is_multi_byte_decoder
            and mean_mess_ratio < 0.02
        ):
            post_definitive_sb_success_count += 1

        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            # If md says nothing to worry about, then... stop immediately!
            if mean_mess_ratio == 0.0:
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    current_match.encoding,
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)
                return CharsetMatches([current_match])

            early_stop_results.append(current_match)

            if (
                len(early_stop_results)
                and (specified_encoding is None or specified_encoding in tested)
                and "ascii" in tested
                and "utf_8" in tested
            ):
                probable_result = early_stop_results.best()  # type: ignore[assignment]
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    probable_result.encoding,  # type: ignore[union-attr]
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)
                return CharsetMatches([probable_result])

        # Once we find a result with good coherence (>= 0.5) after testing the
        # prioritized encodings (ascii, utf_8), activate "definitive mode": skip
        # encodings that target completely different language families. This avoids
        # running expensive mess_ratio + coherence_ratio on clearly unrelated
        # candidates (e.g., Cyrillic encodings when the match is Latin-based).
        # We require coherence >= 0.5 to avoid false positives (e.g., cp1251 decoding
        # Hebrew text with 0.0 chaos but wrong language detection at coherence 0.33).
        if not definitive_match_found and not is_multi_byte_decoder:
            best_coherence = (
                max((v for _, v in cd_ratios_merged), default=0.0)
                if cd_ratios_merged
                else 0.0
            )
            if best_coherence >= 0.5 and "ascii" in tested and "utf_8" in tested:
                definitive_match_found = True
                definitive_target_languages.update(target_languages)
                logger.log(
                    TRACE,
                    "Definitive match found: %s (chaos=%.3f, coherence=%.2f). Encodings targeting different language families will be skipped.",
                    encoding_iana,
                    mean_mess_ratio,
                    best_coherence,
                )

        # When a non-UTF multibyte encoding passes chaos probing with significant
        # multibyte content (decoded < 98% of raw), activate mb_definitive_match.
        # This skips all remaining single-byte encodings which would either soft-fail
        # (running expensive mess_ratio for nothing) or produce inferior results.
        if (
            not mb_definitive_match_found
            and is_multi_byte_decoder
            and multi_byte_bonus
            and decoded_payload is not None
            and len(decoded_payload) < length * 0.98
            and encoding_iana
            not in {
                "utf_8",
                "utf_8_sig",
                "utf_16",
                "utf_16_be",
                "utf_16_le",
                "utf_32",
                "utf_32_be",
                "utf_32_le",
                "utf_7",
            }
            and "ascii" in tested
            and "utf_8" in tested
        ):
            mb_definitive_match_found = True
            logger.log(
                TRACE,
                "Multi-byte definitive match: %s (chaos=%.3f, decoded=%d/%d=%.1f%%). Single-byte encodings will be skipped.",
                encoding_iana,
                mean_mess_ratio,
                len(decoded_payload),
                length,
                len(decoded_payload) / length * 100,
            )

        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:  # Defensive: ensure exit path clean handler
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif (
            (fallback_u8 and fallback_ascii is None)
            or (
                fallback_u8
                and fallback_ascii
                and fallback_u8.fingerprint != fallback_ascii.fingerprint
            )
            or (fallback_u8 is not None)
        ):
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results
  761. def from_fp(
  762. fp: BinaryIO,
  763. steps: int = 5,
  764. chunk_size: int = 512,
  765. threshold: float = 0.20,
  766. cp_isolation: list[str] | None = None,
  767. cp_exclusion: list[str] | None = None,
  768. preemptive_behaviour: bool = True,
  769. explain: bool = False,
  770. language_threshold: float = 0.1,
  771. enable_fallback: bool = True,
  772. ) -> CharsetMatches:
  773. """
  774. Same thing than the function from_bytes but using a file pointer that is already ready.
  775. Will not close the file pointer.
  776. """
  777. return from_bytes(
  778. fp.read(),
  779. steps,
  780. chunk_size,
  781. threshold,
  782. cp_isolation,
  783. cp_exclusion,
  784. preemptive_behaviour,
  785. explain,
  786. language_threshold,
  787. enable_fallback,
  788. )
  789. def from_path(
  790. path: str | bytes | PathLike, # type: ignore[type-arg]
  791. steps: int = 5,
  792. chunk_size: int = 512,
  793. threshold: float = 0.20,
  794. cp_isolation: list[str] | None = None,
  795. cp_exclusion: list[str] | None = None,
  796. preemptive_behaviour: bool = True,
  797. explain: bool = False,
  798. language_threshold: float = 0.1,
  799. enable_fallback: bool = True,
  800. ) -> CharsetMatches:
  801. """
  802. Same thing than the function from_bytes but with one extra step. Opening and reading given file path in binary mode.
  803. Can raise IOError.
  804. """
  805. with open(path, "rb") as fp:
  806. return from_fp(
  807. fp,
  808. steps,
  809. chunk_size,
  810. threshold,
  811. cp_isolation,
  812. cp_exclusion,
  813. preemptive_behaviour,
  814. explain,
  815. language_threshold,
  816. enable_fallback,
  817. )
  818. def is_binary(
  819. fp_or_path_or_payload: PathLike | str | BinaryIO | bytes, # type: ignore[type-arg]
  820. steps: int = 5,
  821. chunk_size: int = 512,
  822. threshold: float = 0.20,
  823. cp_isolation: list[str] | None = None,
  824. cp_exclusion: list[str] | None = None,
  825. preemptive_behaviour: bool = True,
  826. explain: bool = False,
  827. language_threshold: float = 0.1,
  828. enable_fallback: bool = False,
  829. ) -> bool:
  830. """
  831. Detect if the given input (file, bytes, or path) points to a binary file. aka. not a string.
  832. Based on the same main heuristic algorithms and default kwargs at the sole exception that fallbacks match
  833. are disabled to be stricter around ASCII-compatible but unlikely to be a string.
  834. """
  835. if isinstance(fp_or_path_or_payload, (str, PathLike)):
  836. guesses = from_path(
  837. fp_or_path_or_payload,
  838. steps=steps,
  839. chunk_size=chunk_size,
  840. threshold=threshold,
  841. cp_isolation=cp_isolation,
  842. cp_exclusion=cp_exclusion,
  843. preemptive_behaviour=preemptive_behaviour,
  844. explain=explain,
  845. language_threshold=language_threshold,
  846. enable_fallback=enable_fallback,
  847. )
  848. elif isinstance(
  849. fp_or_path_or_payload,
  850. (
  851. bytes,
  852. bytearray,
  853. ),
  854. ):
  855. guesses = from_bytes(
  856. fp_or_path_or_payload,
  857. steps=steps,
  858. chunk_size=chunk_size,
  859. threshold=threshold,
  860. cp_isolation=cp_isolation,
  861. cp_exclusion=cp_exclusion,
  862. preemptive_behaviour=preemptive_behaviour,
  863. explain=explain,
  864. language_threshold=language_threshold,
  865. enable_fallback=enable_fallback,
  866. )
  867. else:
  868. guesses = from_fp(
  869. fp_or_path_or_payload,
  870. steps=steps,
  871. chunk_size=chunk_size,
  872. threshold=threshold,
  873. cp_isolation=cp_isolation,
  874. cp_exclusion=cp_exclusion,
  875. preemptive_behaviour=preemptive_behaviour,
  876. explain=explain,
  877. language_threshold=language_threshold,
  878. enable_fallback=enable_fallback,
  879. )
  880. return not guesses