diff --git a/ofxtools/header.py b/ofxtools/header.py index 1f7bbba..9d26c8b 100644 --- a/ofxtools/header.py +++ b/ofxtools/header.py @@ -261,14 +261,26 @@ def parse_header(source: BinaryIO) -> Tuple[OFXHeaderType, str]: """ logger.info("Parsing OFX header") - # Skip any empty lines at the beginning - while True: + # Skip empty lines at the beginning + # If we can't find any content in the first 8 lines, there's + # something wrong with the data. + found_header = False + for _ in range(8): + # Remember the position within the file where the header begins + # We'll need this, plus the offset to the end of the regex, to seek() to the + # start of the body tag soup when parsing OFXv1 + header_start = source.tell() + # OFX header is read by nice clean machines, not meatbags - # should not contain 💩, 漢字, or what have you. line = source.readline().decode("ascii") if line.strip(): + found_header = True break + if not found_header: + raise OFXHeaderError(f"Invalid OFX header - {source}") + # If the first non-empty line contains an XML declaration, it's OFX v2 xml_match = XML_REGEX.match(line) if xml_match: @@ -289,9 +301,10 @@ def parse_header(source: BinaryIO) -> Tuple[OFXHeaderType, str]: rawheader = line + "\n" # First line is OFXHEADER; need to read next 8 lines for a fixed # total of 9 fields required by OFX v1 spec. - for n in range(8): + for _ in range(8): rawheader += source.readline().decode("ascii") - header, header_end_index = OFXHeaderV1.parse(rawheader) + + header, header_end_offset = OFXHeaderV1.parse(rawheader) # Input source stream position should have advanced to the beginning of # the OFX body tag soup, which is where subsequent calls @@ -299,13 +312,13 @@ def parse_header(source: BinaryIO) -> Tuple[OFXHeaderType, str]: # # The seek call will correct the position when \r newline character is used # (Issue #84) - source.seek(header_end_index) + source.seek(header_start + header_end_offset) # Decode the OFX data body according to the encoding declared # in the OFX header - message = source.read().decode(header.codec) + message = source.read().decode(header.codec).strip() - return header, message.strip() + return header, message def make_header( diff --git a/tests/test_header.py b/tests/test_header.py index 3c9392b..432bf12 100644 --- a/tests/test_header.py +++ b/tests/test_header.py @@ -390,6 +390,46 @@ def testNoLineBreaksAnywhere(self): self.assertEqual(body, body_) + def testHeaderlessDoomScroll(self): + # Some FIs apparently send endless empty lines with no header. + # To prevent that, we allow a maximum of 7 empty lines + # before getting down to brass tacks. + header = "\r\n" * 7 + header += ( + "OFXHEADER: 100\r\n" + "DATA: OFXSGML\r\n" + "VERSION: 160\r\n" + "SECURITY: NONE\r\n" + "ENCODING: USASCII\r\n" + "CHARSET: NONE\r\n" + "COMPRESSION: NONE\r\n" + "OLDFILEUID: NONE\r\n" + "NEWFILEUID: NONE\r\n" + ) + + ofx = header + self.body + ofx = BytesIO(ofx.encode("utf8")) + ofxheader, body = ofxtools.header.parse_header(ofx) + + self.assertEqual(ofxheader.ofxheader, 100) + self.assertEqual(ofxheader.data, "OFXSGML") + self.assertEqual(ofxheader.version, 160) + self.assertEqual(ofxheader.security, "NONE") + self.assertEqual(ofxheader.encoding, "USASCII") + self.assertEqual(ofxheader.charset, "NONE") + self.assertEqual(ofxheader.compression, "NONE") + self.assertEqual(ofxheader.oldfileuid, "NONE") + self.assertEqual(ofxheader.newfileuid, "NONE") + + self.assertEqual(body, self.body) + + # 8 empty lines before header should fail + header = "\r\n" + header + ofx = header + self.body + ofx = BytesIO(ofx.encode("utf8")) + with self.assertRaises(ofxtools.header.OFXHeaderError): + ofxtools.header.parse_header(ofx) + class OFXHeaderV2TestCase(unittest.TestCase, OFXHeaderTestMixin): headerClass = ofxtools.header.OFXHeaderV2