From 7428a3442372800a86a67bb4e778a3622269ee52 Mon Sep 17 00:00:00 2001 From: ecederstrand Date: Sun, 23 Jul 2023 11:42:40 +0200 Subject: [PATCH] fix: Handle invalid XML also in streaming XML parser. Fixes #1200 --- exchangelib/util.py | 16 +++++++++++----- tests/test_util.py | 9 ++++++++- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/exchangelib/util.py b/exchangelib/util.py index 4f3605a6..00c90554 100644 --- a/exchangelib/util.py +++ b/exchangelib/util.py @@ -68,6 +68,7 @@ def __init__(self, msg, data): # Regex of UTF-8 control characters that are illegal in XML 1.0 (and XML 1.1). # See https://stackoverflow.com/a/22273639/219640 _ILLEGAL_XML_CHARS_RE = re.compile("[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F-\x84\x86-\x9F\uFDD0-\uFDDF\uFFFE\uFFFF]") +_ILLEGAL_XML_ESCAPE_CHARS_RE = re.compile(rb"&(#[0-9]+;?|#[xX][0-9a-fA-F]+;?)") # Could match the above better # XML namespaces SOAPNS = "http://schemas.xmlsoap.org/soap/envelope/" @@ -268,6 +269,10 @@ def safe_xml_value(value, replacement="?"): return _ILLEGAL_XML_CHARS_RE.sub(replacement, value) +def sanitize_xml(data, replacement=b"?"): + return _ILLEGAL_XML_ESCAPE_CHARS_RE.sub(replacement, data) + + def create_element(name, attrs=None, nsmap=None): if ":" in name: ns, name = name.split(":") @@ -362,19 +367,20 @@ def parse(self, r): collected_data = [] while buffer: if not self.element_found: - collected_data += buffer + collected_data.extend(buffer) yield from self.feed(buffer) buffer = file.read(self._bufsize) # Any remaining data in self.buffer should be padding chars now self.buffer = None self.close() if not self.element_found: - data = bytes(collected_data) - raise ElementNotFound("The element to be streamed from was not found", data=bytes(data)) + raise ElementNotFound("The element to be streamed from was not found", data=bytes(collected_data)) def feed(self, data, isFinal=0): - """Yield the current content of the character buffer.""" - DefusedExpatParser.feed(self, data=data, isFinal=isFinal) + """Yield the current content of the character buffer. The input XML may contain illegal characters. The lxml + parser handles this gracefully with the 'recover' option, but ExpatParser doesn't have this option. Remove + illegal characters before parsing.""" + DefusedExpatParser.feed(self, data=sanitize_xml(data), isFinal=isFinal) return self._decode_buffer() def _decode_buffer(self): diff --git a/tests/test_util.py b/tests/test_util.py index 46f99301..5f449f70 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -24,6 +24,7 @@ DocumentYielder, ParseError, PrettyXmlHandler, + StreamingBase64Parser, chunkify, get_domain, get_redirect_url, @@ -129,8 +130,9 @@ def test_get_redirect_url(self, m): def test_to_xml(self): to_xml(b'') + to_xml(b'&broken') + to_xml(b'') to_xml(BOM_UTF8 + b'') - to_xml(BOM_UTF8 + b'&broken') with self.assertRaises(ParseError): to_xml(b"foo") @@ -166,6 +168,11 @@ def test_xml_to_str(self): with self.assertRaises(AttributeError): xml_to_str("XXX", encoding=None, xml_declaration=True) + def test_streaming_parser(self): + StreamingBase64Parser().feed(b"SomeName.png", 1) + # Test that we can handle invalid chars in the streaming parser + StreamingBase64Parser().feed(b"SomeName.png", 1) + def test_anonymizing_handler(self): h = AnonymizingXmlHandler(forbidden_strings=("XXX", "yyy")) self.assertEqual(