-
-
Notifications
You must be signed in to change notification settings - Fork 2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor MultipartWriter to use Payload
- Loading branch information
Showing
12 changed files
with
787 additions
and
863 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
import io | ||
from urllib.parse import urlencode | ||
|
||
from multidict import MultiDict, MultiDictProxy | ||
|
||
from . import hdrs, multipart, payload | ||
from .helpers import guess_filename | ||
|
||
__all__ = ('FormData',) | ||
|
||
|
||
class FormData: | ||
"""Helper class for multipart/form-data and | ||
application/x-www-form-urlencoded body generation.""" | ||
|
||
def __init__(self, fields=(), quote_fields=True): | ||
self._writer = multipart.MultipartWriter('form-data') | ||
self._fields = [] | ||
self._is_multipart = False | ||
self._quote_fields = quote_fields | ||
|
||
if isinstance(fields, dict): | ||
fields = list(fields.items()) | ||
elif not isinstance(fields, (list, tuple)): | ||
fields = (fields,) | ||
self.add_fields(*fields) | ||
|
||
def add_field(self, name, value, *, content_type=None, filename=None, | ||
content_transfer_encoding=None): | ||
|
||
if isinstance(value, io.IOBase): | ||
self._is_multipart = True | ||
elif isinstance(value, (bytes, bytearray, memoryview)): | ||
if filename is None and content_transfer_encoding is None: | ||
filename = name | ||
|
||
type_options = MultiDict({'name': name}) | ||
if filename is not None and not isinstance(filename, str): | ||
raise TypeError('filename must be an instance of str. ' | ||
'Got: %s' % filename) | ||
if filename is None and isinstance(value, io.IOBase): | ||
filename = guess_filename(value, name) | ||
if filename is not None: | ||
type_options['filename'] = filename | ||
self._is_multipart = True | ||
|
||
headers = {} | ||
if content_type is not None: | ||
if not isinstance(content_type, str): | ||
raise TypeError('content_type must be an instance of str. ' | ||
'Got: %s' % content_type) | ||
headers[hdrs.CONTENT_TYPE] = content_type | ||
self._is_multipart = True | ||
if content_transfer_encoding is not None: | ||
if not isinstance(content_transfer_encoding, str): | ||
raise TypeError('content_transfer_encoding must be an instance' | ||
' of str. Got: %s' % content_transfer_encoding) | ||
headers[hdrs.CONTENT_TRANSFER_ENCODING] = content_transfer_encoding | ||
self._is_multipart = True | ||
|
||
self._fields.append((type_options, headers, value)) | ||
|
||
def add_fields(self, *fields): | ||
to_add = list(fields) | ||
|
||
while to_add: | ||
rec = to_add.pop(0) | ||
|
||
if isinstance(rec, io.IOBase): | ||
k = guess_filename(rec, 'unknown') | ||
self.add_field(k, rec) | ||
|
||
elif isinstance(rec, (MultiDictProxy, MultiDict)): | ||
to_add.extend(rec.items()) | ||
|
||
elif isinstance(rec, (list, tuple)) and len(rec) == 2: | ||
k, fp = rec | ||
self.add_field(k, fp) | ||
|
||
else: | ||
raise TypeError('Only io.IOBase, multidict and (name, file) ' | ||
'pairs allowed, use .add_field() for passing ' | ||
'more complex parameters, got {!r}' | ||
.format(rec)) | ||
|
||
def _gen_form_urlencoded(self, encoding): | ||
# form data (x-www-form-urlencoded) | ||
data = [] | ||
for type_options, _, value in self._fields: | ||
data.append((type_options['name'], value)) | ||
|
||
return payload.BytesPayload( | ||
urlencode(data, doseq=True).encode(encoding), | ||
content_type='application/x-www-form-urlencoded') | ||
|
||
def _gen_form_data(self, encoding): | ||
"""Encode a list of fields using the multipart/form-data MIME format""" | ||
for dispparams, headers, value in self._fields: | ||
if hdrs.CONTENT_TYPE in headers: | ||
part = payload.get_payload( | ||
value, content_type=headers[hdrs.CONTENT_TYPE], | ||
headers=headers, encoding=encoding) | ||
else: | ||
part = payload.get_payload( | ||
value, headers=headers, encoding=encoding) | ||
if dispparams: | ||
part.set_content_disposition( | ||
'form-data', quote_fields=self._quote_fields, **dispparams | ||
) | ||
# FIXME cgi.FieldStorage doesn't likes body parts with | ||
# Content-Length which were sent via chunked transfer encoding | ||
part.headers.pop(hdrs.CONTENT_LENGTH, None) | ||
|
||
self._writer.append_payload(part) | ||
|
||
return self._writer | ||
|
||
def __call__(self, encoding): | ||
if self._is_multipart: | ||
return self._gen_form_data(encoding) | ||
else: | ||
return self._gen_form_urlencoded(encoding) |
Oops, something went wrong.