formdata.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. import io
  2. import warnings
  3. from typing import Any, Iterable, List, Optional
  4. from urllib.parse import urlencode
  5. from multidict import MultiDict, MultiDictProxy
  6. from . import hdrs, multipart, payload
  7. from .helpers import guess_filename
  8. from .payload import Payload
  9. __all__ = ("FormData",)
  10. class FormData:
  11. """Helper class for form body generation.
  12. Supports multipart/form-data and application/x-www-form-urlencoded.
  13. """
  14. def __init__(
  15. self,
  16. fields: Iterable[Any] = (),
  17. quote_fields: bool = True,
  18. charset: Optional[str] = None,
  19. *,
  20. default_to_multipart: bool = False,
  21. ) -> None:
  22. self._writer = multipart.MultipartWriter("form-data")
  23. self._fields: List[Any] = []
  24. self._is_multipart = default_to_multipart
  25. self._quote_fields = quote_fields
  26. self._charset = charset
  27. if isinstance(fields, dict):
  28. fields = list(fields.items())
  29. elif not isinstance(fields, (list, tuple)):
  30. fields = (fields,)
  31. self.add_fields(*fields)
  32. @property
  33. def is_multipart(self) -> bool:
  34. return self._is_multipart
  35. def add_field(
  36. self,
  37. name: str,
  38. value: Any,
  39. *,
  40. content_type: Optional[str] = None,
  41. filename: Optional[str] = None,
  42. content_transfer_encoding: Optional[str] = None,
  43. ) -> None:
  44. if isinstance(value, io.IOBase):
  45. self._is_multipart = True
  46. elif isinstance(value, (bytes, bytearray, memoryview)):
  47. msg = (
  48. "In v4, passing bytes will no longer create a file field. "
  49. "Please explicitly use the filename parameter or pass a BytesIO object."
  50. )
  51. if filename is None and content_transfer_encoding is None:
  52. warnings.warn(msg, DeprecationWarning)
  53. filename = name
  54. type_options: MultiDict[str] = MultiDict({"name": name})
  55. if filename is not None and not isinstance(filename, str):
  56. raise TypeError("filename must be an instance of str. Got: %s" % filename)
  57. if filename is None and isinstance(value, io.IOBase):
  58. filename = guess_filename(value, name)
  59. if filename is not None:
  60. type_options["filename"] = filename
  61. self._is_multipart = True
  62. headers = {}
  63. if content_type is not None:
  64. if not isinstance(content_type, str):
  65. raise TypeError(
  66. "content_type must be an instance of str. Got: %s" % content_type
  67. )
  68. headers[hdrs.CONTENT_TYPE] = content_type
  69. self._is_multipart = True
  70. if content_transfer_encoding is not None:
  71. if not isinstance(content_transfer_encoding, str):
  72. raise TypeError(
  73. "content_transfer_encoding must be an instance"
  74. " of str. Got: %s" % content_transfer_encoding
  75. )
  76. msg = (
  77. "content_transfer_encoding is deprecated. "
  78. "To maintain compatibility with v4 please pass a BytesPayload."
  79. )
  80. warnings.warn(msg, DeprecationWarning)
  81. self._is_multipart = True
  82. self._fields.append((type_options, headers, value))
  83. def add_fields(self, *fields: Any) -> None:
  84. to_add = list(fields)
  85. while to_add:
  86. rec = to_add.pop(0)
  87. if isinstance(rec, io.IOBase):
  88. k = guess_filename(rec, "unknown")
  89. self.add_field(k, rec) # type: ignore[arg-type]
  90. elif isinstance(rec, (MultiDictProxy, MultiDict)):
  91. to_add.extend(rec.items())
  92. elif isinstance(rec, (list, tuple)) and len(rec) == 2:
  93. k, fp = rec
  94. self.add_field(k, fp)
  95. else:
  96. raise TypeError(
  97. "Only io.IOBase, multidict and (name, file) "
  98. "pairs allowed, use .add_field() for passing "
  99. "more complex parameters, got {!r}".format(rec)
  100. )
  101. def _gen_form_urlencoded(self) -> payload.BytesPayload:
  102. # form data (x-www-form-urlencoded)
  103. data = []
  104. for type_options, _, value in self._fields:
  105. data.append((type_options["name"], value))
  106. charset = self._charset if self._charset is not None else "utf-8"
  107. if charset == "utf-8":
  108. content_type = "application/x-www-form-urlencoded"
  109. else:
  110. content_type = "application/x-www-form-urlencoded; charset=%s" % charset
  111. return payload.BytesPayload(
  112. urlencode(data, doseq=True, encoding=charset).encode(),
  113. content_type=content_type,
  114. )
  115. def _gen_form_data(self) -> multipart.MultipartWriter:
  116. """Encode a list of fields using the multipart/form-data MIME format"""
  117. for dispparams, headers, value in self._fields:
  118. try:
  119. if hdrs.CONTENT_TYPE in headers:
  120. part = payload.get_payload(
  121. value,
  122. content_type=headers[hdrs.CONTENT_TYPE],
  123. headers=headers,
  124. encoding=self._charset,
  125. )
  126. else:
  127. part = payload.get_payload(
  128. value, headers=headers, encoding=self._charset
  129. )
  130. except Exception as exc:
  131. raise TypeError(
  132. "Can not serialize value type: %r\n "
  133. "headers: %r\n value: %r" % (type(value), headers, value)
  134. ) from exc
  135. if dispparams:
  136. part.set_content_disposition(
  137. "form-data", quote_fields=self._quote_fields, **dispparams
  138. )
  139. # FIXME cgi.FieldStorage doesn't likes body parts with
  140. # Content-Length which were sent via chunked transfer encoding
  141. assert part.headers is not None
  142. part.headers.popall(hdrs.CONTENT_LENGTH, None)
  143. self._writer.append_payload(part)
  144. self._fields.clear()
  145. return self._writer
  146. def __call__(self) -> Payload:
  147. if self._is_multipart:
  148. return self._gen_form_data()
  149. else:
  150. return self._gen_form_urlencoded()