From da36214fc4de9b073d3e6d9562e588d553509ee1 Mon Sep 17 00:00:00 2001 From: Jessica James Date: Fri, 18 Apr 2025 17:23:30 -0400 Subject: [PATCH 1/8] GH-115512: Optimize peak memory usage and runtime for large emails --- Lib/email/feedparser.py | 466 ++++++++++++++++++++++++------ Lib/email/parser.py | 41 ++- Lib/test/test_email/test_email.py | 204 +++++++++++++ Misc/ACKS | 1 + 4 files changed, 610 insertions(+), 102 deletions(-) diff --git a/Lib/email/feedparser.py b/Lib/email/feedparser.py index b2bc4afc1cc26f..26c226f589c4a0 100644 --- a/Lib/email/feedparser.py +++ b/Lib/email/feedparser.py @@ -22,6 +22,7 @@ __all__ = ['FeedParser', 'BytesFeedParser'] import re +import sys from email import errors from email._policybase import compat32 @@ -52,31 +53,30 @@ class BufferedSubFile(object): simple abstraction -- it parses until EOF closes the current message. """ def __init__(self): - # Text stream of the last partial line pushed into this object. - # See issue 22233 for why this is a text stream and not a list. - self._partial = StringIO(newline='') + self._partial: list[str] = [] + self._dangling_partial: bool = False # A deque of full, pushed lines - self._lines = deque() + self._lines: deque[str] = deque() # The stack of false-EOF checking predicates. self._eofstack = [] # A flag indicating whether the file has been closed or not. - self._closed = False + self._closed: bool = False + self._dump_destination: deque[str]|None = None + self._dump_result: str|None = None - def push_eof_matcher(self, pred): + def push_eof_matcher(self, pred) -> None: self._eofstack.append(pred) def pop_eof_matcher(self): return self._eofstack.pop() - def close(self): + def close(self) -> None: # Don't forget any trailing partial line. - self._partial.seek(0) - self.pushlines(self._partial.readlines()) - self._partial.seek(0) - self._partial.truncate() + if self._partial: + self._flush_partial() self._closed = True - def readline(self): + def readline(self) -> str|object: if not self._lines: if self._closed: return '' @@ -87,51 +87,351 @@ def readline(self): # RFC 2046, section 5.1.2 requires us to recognize outer level # boundaries at any level of inner nesting. Do this, but be sure it's # in the order of most to least nested. - for ateof in reversed(self._eofstack): - if ateof(line): - # We're at the false EOF. But push the last line back first. - self._lines.appendleft(line) - return '' + if self._check_eofstack(line): + # We're at the false EOF. But push the last line back first. + self._lines.appendleft(line) + return '' + return line - def unreadline(self, line): + def _check_eofstack(self, data, start=0, end=sys.maxsize) -> bool: + for ateof in reversed(self._eofstack): + if ateof(data, start, end): + # We're at the false EOF. + return True + + return False + + def unreadline(self, line) -> None: # Let the consumer push a line back into the buffer. assert line is not NeedMoreData self._lines.appendleft(line) - def push(self, data): + def _flush_partial(self) -> None: + line = EMPTYSTRING.join(self._partial) + if not line: + pass + elif self._dump_destination is None: + # We're not dumping data. Just flush the partial to lines, as normal + self._lines.append(line) + elif self._check_eofstack(line): + # We were dumping, but we've now reached the end of the dump. Push our line and stop dumping. 
+ self._dump_destination = None + self._lines.append(line) + else: + # We're still dumping; push to dump + self._dump_destination.append(line) + + self._partial.clear() + self._dangling_partial = False + + def push(self, data) -> None: """Push some new data into this object.""" - self._partial.write(data) - if '\n' not in data and '\r' not in data: + if not data: + return + + # If we're dumping, and we don't have anything that will ever tell us to terminate, simply dump everything + if self._can_dump_data(data): + self._dump_destination.append(data) + return + + self._push_data(data) + + def _can_dump_data(self, data) -> bool: + if self._dump_destination is None: + # We're not dumping data + return False + + # We're dumping; check for easy optimizations + if not self._eofstack: + # There's nothing that will ever tell us to stop dumping. Go ahead and dump the entire `data` object. + # This does absolute wonders for large non-multipart emails. + assert not self._lines + assert not self._dangling_partial + assert not self._partial + return True + + # We can't dump this blob if we have pending partial data + if self._partial: + return False + + all_boundary_matches = True + for pred in self._eofstack: + if not hasattr(pred, 'is_boundary_match'): + all_boundary_matches = False + break + + if all_boundary_matches and '-' not in data: + # We eventually need to stop, but we only care about boundary matches, and there's no boundaries + # here. Dump the entire `data` object. This does wonders for multipart emails with large parts. + assert not self._lines + return True + + # We're still dumping, but there's a potential boundary marker or EOF or similar issue. Force a proper parse. + return False + + def _can_dump_partial(self, line, start: int=0, end: int=sys.maxsize) -> bool: + # Very similar to _can_dump_data above, except we can make some additional assumptions for partials/lines. + # This should only ever be checked when we have a new partial line, in which case we have no partial, + # or when checking the partial itself, in which case it'll always be the first part + assert not self._partial or line is self._partial[0] + + if self._dump_destination is None: + # We're not dumping data + return False + + # We're dumping. There should be absolutely no other pending lines, because those should've been dumped. + assert not self._lines + if not self._eofstack: + # There's nothing that will ever tell us to stop dumping. Dump away. + return True + + all_boundary_matches = True + for pred in self._eofstack: + if not hasattr(pred, 'is_boundary_match'): + all_boundary_matches = False + break + + if all_boundary_matches and not line.startswith("-", start, end): + # We eventually need to stop, but we only care about boundary matches, and there's no boundaries + # here. Dump the entire `data` object. This does wonders for multipart emails with large parts. + return True + + # We're still dumping, but there's a potential boundary marker or EOF or similar issue. Force a proper parse. + return False + + def _is_dump_midline(self): + if not self._dump_destination: + return False + + assert self._dump_destination[-1] # Never push empty strings to _dump_destination + return self._dump_destination[-1][-1] not in ('\n', '\r') + + def _push_data(self, data) -> None: + # Find first newline character in the data + unl_start_index = BufferedSubFile._find_unl(data) + if unl_start_index < 0: # No new complete lines, wait for more. 
+ # Check to see if we had a previous dangling partial newline + if self._dangling_partial: + # We previously pushed a dangling line expecting a \n to follow, however we received other data instead. + # Therefore, that \r does actually terminate a line. Go ahead and push it. + self._flush_partial() + + # No lines in data to push; wait for more data + if self._is_dump_midline(): + assert not self._partial + self._dump_destination.append(data) + else: + self._partial.append(data) return - # Crack into lines, preserving the linesep characters. - self._partial.seek(0) - parts = self._partial.readlines() - self._partial.seek(0) - self._partial.truncate() - - # If the last element of the list does not end in a newline, then treat - # it as a partial line. We only check for '\n' here because a line - # ending with '\r' might be a line that was split in the middle of a - # '\r\n' sequence (see bugs 1555570 and 1721862). - if not parts[-1].endswith('\n'): - self._partial.write(parts.pop()) - self.pushlines(parts) - - def pushlines(self, lines): + data_start_index = 0 + + # Complete our previous/partial line + if self._partial: + # Check to see if we had any dangling newlines in our partial, and handle if appropriate + if self._dangling_partial: + # We had a previously dangling line; this is either a \n (completion), or some other char (termination) + if data[0] != NL: + # "\r" -- push what we had, as it has been terminated; data_start_index = 0 + self._flush_partial() + else: + # "\r\n" -- append \n and push it; data_start_index = 1 + self._partial.append(NL) + self._flush_partial() + data_start_index = 1 + + # Find the next newline + unl_start_index = BufferedSubFile._find_unl(data, data_start_index) + # Fall through + else: + # Our partial has no dangling newline; complete our partial with the new line and push it + unl_end_index = BufferedSubFile._find_unl_end(data, unl_start_index) + if unl_end_index < 0: + # The newline is incomplete; append data and return + self._partial.append(data) + self._dangling_partial = True + return + + # We have a complete line; append it and flush _partial + self._partial.append(data[data_start_index:unl_end_index]) + self._flush_partial() + data_start_index = unl_end_index + + # Find the next newline + unl_start_index = BufferedSubFile._find_unl(data, data_start_index) + # Fall through + + # _partial is now guaranteed to point to be empty + # data_start_index is an index which points to the start of the next line + # unl_start_index is an index which points to the start of the next newline character, if there is one + self._push_data_no_partial(data, data_start_index, unl_start_index) + + def _push_data_no_partial(self, data, data_start_index: int, unl_start_index: int) -> None: + # _partial is now guaranteed to point to be empty + # data_start_index is an index which points to the start of the next line + # unl_start_index is an index which points to the start of the next newline character, if there is one + + # Process any remaining whole lines in data + if unl_start_index < 0: + # Push right to the partial if there's no lines + if data_start_index < len(data): + assert data_start_index >= 0 + partial_line = data[data_start_index:] + if self._is_dump_midline() or self._can_dump_partial(partial_line): + self._dump_destination.append(partial_line) + else: + self._partial = [partial_line] + if data[-1] == '\r': + self._dangling_partial = True + elif self._dump_destination is None and unl_start_index < len(data) // 2: + # If it looks like we're going to be doing a 
lot of splits/joins, just go ahead and use StringIO, for speed + # If we had some sort of "StringViewIO" to avoid the copy, this would be significantly more efficient + # This code block, and the "else" code block below, functionally do the exact same thing, except this path + # makes no attempt to handle dumping data + sio = StringIO(data, '') + sio.seek(data_start_index) + lines = sio.readlines() + if lines: + if data[-1] != '\n': + self._partial.append(lines.pop()) + if data[-1] == '\r': + self._dangling_partial = True + + self.pushlines(lines) + else: + # If we're not, let's keep it in Python + dump_data_start = None if self._dump_destination is None else data_start_index + while unl_start_index >= 0: + unl_end_index = BufferedSubFile._find_unl_end(data, unl_start_index) + if unl_end_index < 0: + # Incomplete line ending; break to update our partial and return + self._dangling_partial = True + break + + # We have an easy line; push it + if self._dump_destination is not None: + # We have a window into a line. Make sure it's not EOF, and continue as long as it's not + if self._check_eofstack(data, data_start_index, unl_end_index): + # This line is "EOF". This is the end of our dump data! Push the dump data. + self._dump_destination.append(data[dump_data_start:data_start_index]) + + # Also push our line, since we already have it + self._lines.append(data[data_start_index:unl_end_index]) + + # Mark dump complete + self._dump_destination = None + #else: # This line didn't mark the end. Keep going. + else: + # We're not dumping. Just go ahead and push the line + self._lines.append(data[data_start_index:unl_end_index]) + + # Update our iterators + data_start_index = unl_end_index + unl_start_index = BufferedSubFile._find_unl(data, data_start_index) + + # If we're still dumping, push everything that isn't going into the partial to the dump + if self._dump_destination is not None: + # If we're able to safely flush the partial, go ahead and do that too + # We don't care about self._is_dump_midline() here, because data_start_index always represents the + # start of a new line, always + if self._can_dump_partial(data, data_start_index): + self._dump_destination.append(data[dump_data_start:]) + + # We've consumed the partial; flush any partial-related state we may have set + self._dangling_partial = False + return # skip the _partial.append below, because it's already been consumed + else: + self._dump_destination.append(data[dump_data_start:data_start_index]) + + # If we have any partial data leftover, go ahead and set it + if data_start_index < len(data): + self._partial.append(data[data_start_index:]) + + def pushlines(self, lines) -> None: + # This method is not documented on docs.python.org self._lines.extend(lines) def __iter__(self): return self - def __next__(self): + def __next__(self) -> str|object: line = self.readline() if line == '': raise StopIteration return line + def _get_dump(self, start_value:str|None = None): + _dump_destination = deque() + self._dump_destination = _dump_destination + + if start_value: + _dump_destination.append(start_value) + + # Flush our current _lines to _dump_destination + needs_more_data = False + for line in self: + if line is NeedMoreData: + needs_more_data = True + break + _dump_destination.append(line) + + # Pull in more data, if we need more + if needs_more_data: + # Flush our partial, if we can + if self._partial and self._can_dump_partial(self._partial[0]): + assert self._partial[0] # We shouldn't ever push empty strings to _partial + 
_dump_destination.extend(self._partial) + self._partial.clear() + self._dangling_partial = False + + # Pull in more data until we're told to stop + while not self._closed and self._dump_destination is not None: + yield NeedMoreData + + # Flush our final dump string to _dump_result + self._dump_destination = None + self._dump_result = EMPTYSTRING.join(_dump_destination) + + def _pop_dump(self) -> str: + result = self._dump_result + self._dump_result = None + return result + + @staticmethod + def _find_unl(data, start=0) -> int: + # Like str.find(), but for universal newlines + # Originally, this iterated over the string, however just calling find() twice is drastically faster + # This could be sped up by replacing with a similar function in C, so we don't pass over the string twice. + cr_index = data.find('\r', start) + if cr_index < 0: + return data.find(NL, start) + + nl_index = data.find(NL, start, cr_index) + return nl_index if nl_index >= 0 else cr_index + + @staticmethod + def _find_unl_end(data, start) -> int: + # A helper function which returns the 1-past-the-end index of a universal newline + # This could be sped up by replacing with a similar function in C. + #assert data[start] in '\r\n' + + # \n is always end of line + if data.startswith(NL, start): + return start + 1 + + # \r\n is always end of line + if data.startswith(NL, start + 1): + return start + 2 + + # End of string; we can't know if a \n follows, so no universal line end + if start + 1 >= len(data): + return -1 + + # This is a \r followed by some other non-newline character + return start + 1 + class FeedParser: """A feed-style parser of email.""" @@ -161,23 +461,23 @@ def __init__(self, _factory=None, *, policy=compat32): self._old_style_factory = True self._input = BufferedSubFile() self._msgstack = [] - self._parse = self._parsegen().__next__ + self._parse = self._parsegen().__next__ # Interesting trick which replaces yield values with return values self._cur = None self._last = None self._headersonly = False # Non-public interface for supporting Parser's headersonly flag - def _set_headersonly(self): + def _set_headersonly(self) -> None: self._headersonly = True - def feed(self, data): + def feed(self, data) -> None: """Push more data into the parser.""" self._input.push(data) self._call_parse() - def _call_parse(self): + def _call_parse(self) -> None: try: - self._parse() + self._parse() # Return value is always NeedMoreData or None, but discarded here in either case except StopIteration: pass @@ -194,7 +494,7 @@ def close(self): self.policy.handle_defect(root, defect) return root - def _new_message(self): + def _new_message(self) -> None: if self._old_style_factory: msg = self._factory() else: @@ -215,7 +515,7 @@ def _pop_message(self): self._cur = None return retval - def _parsegen(self): + def _parsegen(self): # yields: NeedMoreData # Create a new message and start by parsing headers. self._new_message() headers = [] @@ -242,16 +542,8 @@ def _parsegen(self): # necessary in the older parser, which could raise errors. All # remaining lines in the input are thrown into the message body. 
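# [Editorial illustration, not part of the patch] The line
# `self._parse = self._parsegen().__next__` noted above drives _parsegen()
# as a push parser: feed() resumes the generator, each NeedMoreData yield
# suspends it, and StopIteration marks completion. A minimal standalone
# sketch of that trick, with made-up names (NEED_MORE, parser_gen, feed):

NEED_MORE = object()

def parser_gen(buffer):
    # Suspend whenever the buffer runs dry; the driver feeds more and resumes.
    while True:
        while not buffer:
            yield NEED_MORE
        item = buffer.pop(0)
        if item is None:          # end-of-input marker
            return
        print("parsed:", item)

buffer = []
parse_step = parser_gen(buffer).__next__

def feed(item):
    buffer.append(item)
    try:
        parse_step()              # resume parsing; the yielded value is discarded
    except StopIteration:
        pass

feed("line 1")
feed("line 2")
feed(None)                        # parser finishes cleanly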
if self._headersonly: - lines = [] - while True: - line = self._input.readline() - if line is NeedMoreData: - yield NeedMoreData - continue - if line == '': - break - lines.append(line) - self._cur.set_payload(EMPTYSTRING.join(lines)) + yield from self._input._get_dump() + self._cur.set_payload(self._input._pop_dump()) return if self._cur.get_content_type() == 'message/delivery-status': # message/delivery-status contains blocks of headers separated by @@ -311,13 +603,8 @@ def _parsegen(self): # defective. defect = errors.NoBoundaryInMultipartDefect() self.policy.handle_defect(self._cur, defect) - lines = [] - for line in self._input: - if line is NeedMoreData: - yield NeedMoreData - continue - lines.append(line) - self._cur.set_payload(EMPTYSTRING.join(lines)) + yield from self._input._get_dump() + self._cur.set_payload(self._input._pop_dump()) return # Make sure a valid content type was specified per RFC 2045:6.4. if (str(self._cur.get('content-transfer-encoding', '8bit')).lower() @@ -329,10 +616,11 @@ def _parsegen(self): # this onto the input stream until we've scanned past the # preamble. separator = '--' + boundary - def boundarymatch(line): - if not line.startswith(separator): + def boundarymatch(line, pos: int = 0, endpos: int = sys.maxsize): + if not line.startswith(separator, pos, endpos): return None - return boundaryendRE.match(line, len(separator)) + return boundaryendRE.match(line, pos + len(separator), endpos) + boundarymatch.is_boundary_match = True capturing_preamble = True preamble = [] linesep = False @@ -424,12 +712,11 @@ def boundarymatch(line): defect = errors.StartBoundaryNotFoundDefect() self.policy.handle_defect(self._cur, defect) self._cur.set_payload(EMPTYSTRING.join(preamble)) - epilogue = [] for line in self._input: if line is NeedMoreData: yield NeedMoreData continue - self._cur.epilogue = EMPTYSTRING.join(epilogue) + self._cur.epilogue = '' return # If we're not processing the preamble, then we might have seen # EOF without seeing that end boundary...that is also a defect. @@ -440,36 +727,29 @@ def boundarymatch(line): # Everything from here to the EOF is epilogue. If the end boundary # ended in a newline, we'll need to make sure the epilogue isn't # None - if linesep: - epilogue = [''] - else: - epilogue = [] - for line in self._input: - if line is NeedMoreData: - yield NeedMoreData - continue - epilogue.append(line) - # Any CRLF at the front of the epilogue is not technically part of - # the epilogue. Also, watch out for an empty string epilogue, - # which means a single newline. - if epilogue: - firstline = epilogue[0] - bolmo = NLCRE_bol.match(firstline) - if bolmo: - epilogue[0] = firstline[len(bolmo.group(0)):] - self._cur.epilogue = EMPTYSTRING.join(epilogue) + first_line = '' + if not linesep: + for line in self._input: + if line is NeedMoreData: + yield NeedMoreData + continue + + first_line = line + if first_line: + bolmo = NLCRE_bol.match(first_line) + if bolmo: + first_line = first_line[len(bolmo.group(0)):] + break + + yield from self._input._get_dump(first_line) + self._cur.epilogue = self._input._pop_dump() return # Otherwise, it's some non-multipart type, so the entire rest of the # file contents becomes the payload. 
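# [Editorial illustration, not part of the patch] The reworked boundarymatch
# above takes pos/endpos so the eofstack predicates can test a line that is
# still embedded in a larger buffer without slicing it out first, and the
# is_boundary_match attribute lets BufferedSubFile recognize "boundary-only"
# predicates. A standalone sketch under those assumptions (the boundary
# string and regex below are illustrative, patterned after boundaryendRE):
import re
import sys

_end_re = re.compile(r'(?P<end>--)?(?P<ws>[ \t]*)(?P<linesep>\r\n|\r|\n)?$')
_separator = '--BOUNDARY'

def boundarymatch(line, pos=0, endpos=sys.maxsize):
    if not line.startswith(_separator, pos, endpos):
        return None
    return _end_re.match(line, pos + len(_separator), endpos)
boundarymatch.is_boundary_match = True

data = "body line\r\n--BOUNDARY\r\nnext part\r\n"
start = data.index('--BOUNDARY')
end = start + len('--BOUNDARY\r\n')
assert boundarymatch(data, start, end)            # boundary seen through a window
assert boundarymatch("not a boundary\r\n") is None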
- lines = [] - for line in self._input: - if line is NeedMoreData: - yield NeedMoreData - continue - lines.append(line) - self._cur.set_payload(EMPTYSTRING.join(lines)) + yield from self._input._get_dump() + self._cur.set_payload(self._input._pop_dump()) - def _parse_headers(self, lines): + def _parse_headers(self, lines) -> None: # Passed a list of lines that make up the headers for the current msg lastheader = '' lastvalue = [] @@ -533,5 +813,5 @@ def _parse_headers(self, lines): class BytesFeedParser(FeedParser): """Like FeedParser, but feed accepts bytes.""" - def feed(self, data): + def feed(self, data) -> None: super().feed(data.decode('ascii', 'surrogateescape')) diff --git a/Lib/email/parser.py b/Lib/email/parser.py index 039f03cba74fa0..a182daebec0092 100644 --- a/Lib/email/parser.py +++ b/Lib/email/parser.py @@ -12,6 +12,8 @@ from email.feedparser import FeedParser, BytesFeedParser from email._policybase import compat32 +_FEED_CHUNK_SIZE = 8192 + class Parser: def __init__(self, _class=None, *, policy=compat32): @@ -38,6 +40,18 @@ def __init__(self, _class=None, *, policy=compat32): self._class = _class self.policy = policy + def _parse_chunks(self, chunk_generator, headersonly=False): + """Internal method / implementation detail + + Parses chunks from a chunk generator into a FeedParser, returning the result + """ + feedparser = FeedParser(self._class, policy=self.policy) + if headersonly: + feedparser._set_headersonly() + for data in chunk_generator: + feedparser.feed(data) + return feedparser.close() + def parse(self, fp, headersonly=False): """Create a message structure from the data in a file. @@ -46,12 +60,12 @@ def parse(self, fp, headersonly=False): parsing after reading the headers or not. The default is False, meaning it parses the entire contents of the file. """ - feedparser = FeedParser(self._class, policy=self.policy) - if headersonly: - feedparser._set_headersonly() - while data := fp.read(8192): - feedparser.feed(data) - return feedparser.close() + def _fp_get_chunks(): + while data := fp.read(_FEED_CHUNK_SIZE): + yield data + _chunk_generator = _fp_get_chunks() + + return self._parse_chunks(_chunk_generator, headersonly) def parsestr(self, text, headersonly=False): """Create a message structure from a string. @@ -61,7 +75,12 @@ def parsestr(self, text, headersonly=False): not. The default is False, meaning it parses the entire contents of the file. """ - return self.parse(StringIO(text), headersonly=headersonly) + _chunk_generator = ( + text[offset:offset + _FEED_CHUNK_SIZE] + for offset in range(0, len(text), _FEED_CHUNK_SIZE) + ) + + return self._parse_chunks(_chunk_generator, headersonly) class HeaderParser(Parser): @@ -115,8 +134,12 @@ def parsebytes(self, text, headersonly=False): not. The default is False, meaning it parses the entire contents of the file. 
""" - text = text.decode('ASCII', errors='surrogateescape') - return self.parser.parsestr(text, headersonly) + _chunk_generator = ( + text[offset:offset + _FEED_CHUNK_SIZE].decode('ASCII', errors='surrogateescape') + for offset in range(0, len(text), _FEED_CHUNK_SIZE) + ) + + return self.parser._parse_chunks(_chunk_generator, headersonly) class BytesHeaderParser(BytesParser): diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py index 7b14305f997e5d..b02931e97b7f27 100644 --- a/Lib/test/test_email/test_email.py +++ b/Lib/test/test_email/test_email.py @@ -4600,6 +4600,210 @@ def _idempotent(self, msg, data, unixfrom=False): g.flatten(msg, unixfrom=unixfrom, linesep=self.linesep) self.assertEqual(data, b.getvalue()) +class TestFeedParserTrickle(TestEmailBase): + @staticmethod + def _msgobj_trickle(filename, trickle_size=2, force_linetype="\r\n"): + # Trickle data into the feed parser, one character at a time + with openfile(filename, encoding="utf-8") as fp: + file_str = fp.read() + file_str = file_str.replace("\r\n", "\n").replace("\r", "\n").replace("\n", force_linetype) + + feedparser = FeedParser() + for index in range(0, len(file_str), trickle_size): + feedparser.feed(file_str[index:index + trickle_size]) + return feedparser.close() + + def _validate_msg10_msgobj(self, msg, line_end): + if isinstance(line_end, str): + line_end = line_end.encode() + eq = self.assertEqual + # The outer message is a multipart + eq(msg.get_payload(decode=True), None) + # Subpart 1 is 7bit encoded + eq(msg.get_payload(0).get_payload(decode=True), + b'This is a 7bit encoded message.' + line_end) + # Subpart 2 is quopri + eq(msg.get_payload(1).get_payload(decode=True), + b'\xa1This is a Quoted Printable encoded message!' + line_end) + # Subpart 3 is base64 + eq(msg.get_payload(2).get_payload(decode=True), + b'This is a Base64 encoded message.') + # Subpart 4 is base64 with a trailing newline, which + # used to be stripped (issue 7143). + eq(msg.get_payload(3).get_payload(decode=True), + b'This is a Base64 encoded message.\n') + # Subpart 5 has no Content-Transfer-Encoding: header. + eq(msg.get_payload(4).get_payload(decode=True), + b'This has no Content-Transfer-Encoding: header.' + line_end) + + def test_trickle_1chr_crlf(self): + msg = self._msgobj_trickle('msg_10.txt', 1, '\r\n') + self._validate_msg10_msgobj(msg, '\r\n') + + def test_trickle_1chr_cr(self): + msg = self._msgobj_trickle('msg_10.txt', 1, '\r') + self._validate_msg10_msgobj(msg, '\r') + + def test_trickle_1chr_lf(self): + msg = self._msgobj_trickle('msg_10.txt', 1, '\n') + self._validate_msg10_msgobj(msg, '\n') + + def test_trickle_2chr_crlf(self): + # During initial testing, it was realized that an edge case was missed around dangling newlines. + # This helps test that behavior, as it is not otherwise covered by tests. 
+ msg = self._msgobj_trickle('msg_10.txt', 2, '\r\n') + self._validate_msg10_msgobj(msg, '\r\n') + + def test_trickle_2chr_cr(self): + msg = self._msgobj_trickle('msg_10.txt', 2, '\r') + self._validate_msg10_msgobj(msg, '\r') + + def test_trickle_2chr_lf(self): + msg = self._msgobj_trickle('msg_10.txt', 2, '\n') + self._validate_msg10_msgobj(msg, '\n') + + def test_trickle_3chr_crlf(self): + msg = self._msgobj_trickle('msg_10.txt', 3, '\r\n') + self._validate_msg10_msgobj(msg, '\r\n') + + def test_trickle_3chr_cr(self): + msg = self._msgobj_trickle('msg_10.txt', 3, '\r') + self._validate_msg10_msgobj(msg, '\r') + + def test_trickle_3chr_lf(self): + msg = self._msgobj_trickle('msg_10.txt', 3, '\n') + self._validate_msg10_msgobj(msg, '\n') + + +class TestPeakMemoryUsage(unittest.TestCase): + + maxDiff = None + SMALLER_CHUNK_SIZE = 1024 + + def _msg_bytes(self, filename): + with openfile(filename, 'rb') as fp: + data = fp.read() + return data + + def _make_plaintext_msg_bytes(self, min_size): + # Get msg_01 as our baseline + msg_bytes = self._msg_bytes('msg_01.txt') + if len(msg_bytes) < min_size: + # Make it bigger + msg_bytes = msg_bytes * ((min_size // len(msg_bytes)) + 1) + msg_bytes = msg_bytes[:min_size] # Truncate it to min_size + assert len(msg_bytes) >= min_size + + match = re.search(rb'(\r|\n|\r\n){2}', msg_bytes) + self.assertIsNotNone(match) + expected_payload = msg_bytes[match.end():] + + return msg_bytes, expected_payload + + def _measure_message_from_bytes(self, msg_bytes): + import tracemalloc + + # Call email.message_from_bytes, gathering some memory usage stats in the process + tracemalloc.start() + start_time = time.perf_counter() + msgobj = email.message_from_bytes(msg_bytes, policy=email.policy.default) + end_time = time.perf_counter() + after_bytes, after_peak_bytes = tracemalloc.get_traced_memory() + tracemalloc.stop() + + # "How many bytes did we allocate, that were ultimately discarded?" + peak_overhead = after_peak_bytes - after_bytes + + # "How large was that overhead, relative to the size of the message?" 
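# [Editorial illustration, not part of the patch] The measurement idea used
# here: tracemalloc's peak counter minus the memory still live after parsing
# gives the transient overhead, which is then scaled by the input size. A
# minimal standalone sketch (function name and sample message are made up):
import email
import email.policy
import tracemalloc

def peak_overhead_ratio(msg_bytes):
    tracemalloc.start()
    msg = email.message_from_bytes(msg_bytes, policy=email.policy.default)
    current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    # Bytes allocated while parsing but discarded afterwards, relative to
    # the size of the input message.
    return msg, (peak - current) / len(msg_bytes)

sample = b"From: a@example.invalid\r\n\r\n" + b"x" * 100_000 + b"\r\n"
msg, ratio = peak_overhead_ratio(sample)
assert msg.get_content().startswith("x")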
+ overhead_ratio = peak_overhead / len(msg_bytes) if len(msg_bytes) > 0 else None + + return msgobj, peak_overhead, overhead_ratio, end_time - start_time + + def _base64_encode(self, bytes_to_encode, one_line=True): + base64_str = base64mime.body_encode(bytes_to_encode) + if one_line: + base64_str = "".join(base64_str.splitlines()) + return base64_str + + _multipart_msg_base = textwrap.dedent("""\ + Date: Wed, 14 Nov 2007 12:56:23 GMT + From: foo@bar.invalid + To: foo@bar.invalid + Subject: Content-Transfer-Encoding: base64 and multipart + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="BOUNDARY" + + --BOUNDARY + Content-Type: text/plain + + Test message + + --BOUNDARY + Content-Type: application/octet-stream + Content-Transfer-Encoding: base64 + + {} + --BOUNDARY-- + """) + + def _make_junk_bytes(self, bytes_length): + junk_data = bytearray(bytes_length) + for i in range(len(junk_data)): + junk_data[i] = i % 256 + return bytes(junk_data) + + def _make_junk_base64(self, bytes_length, one_line=True): + junk_bytes = self._make_junk_bytes(bytes_length) + return self._base64_encode(junk_bytes, one_line), junk_bytes + + _LARGE_EMAIL_BYTE_SIZE = 1024*1024*10 # 10 MiB + + def test_message_from_bytes_plaintext(self): + # Generate a 10MiB plaintext email + msg_bytes, expected_payload = self._make_plaintext_msg_bytes(self._LARGE_EMAIL_BYTE_SIZE) + + # Parse it, collecting stats + msgobj, peak_overhead, overhead_ratio, time_taken = self._measure_message_from_bytes(msg_bytes) + + # Verify the message payload/content is correct. + self.assertEqual(msgobj.get_payload(decode=True), expected_payload) + self.assertEqual(msgobj.get_content(), expected_payload.decode()) + + # overhead_ratio at time of writing: 1.0102445602416992 + self.assertLess(overhead_ratio, 1.05) + + def test_message_from_bytes_large_attachment_body_encoded(self): + # Generate a 10 MiB attachment + attachment_base64, attachment_bytes = self._make_junk_base64(self._LARGE_EMAIL_BYTE_SIZE, False) + multipart_msg_bytes = self._multipart_msg_base.format(attachment_base64).encode() + + # Parse it, collecting stats + msgobj, peak_overhead, overhead_ratio, time_taken = self._measure_message_from_bytes(multipart_msg_bytes) + + # Verify the message payload/content is correct. + attachment_msg = msgobj.get_payload(1) + self.assertEqual(attachment_msg.get_content(), attachment_bytes) + self.assertEqual(attachment_msg.get_payload(decode=False), attachment_base64) + + # overhead_ratio at time of writing: 1.0088957315722829 - 85.0565% decrease + self.assertLess(overhead_ratio, 1.05) + + def test_message_from_bytes_large_attachment_one_line(self): + # Generate a 10 MiB attachment + attachment_base64, attachment_bytes = self._make_junk_base64(self._LARGE_EMAIL_BYTE_SIZE, True) + multipart_msg_bytes = self._multipart_msg_base.format(attachment_base64).encode() + + # Parse it, collecting stats + msgobj, peak_overhead, overhead_ratio, time_taken = self._measure_message_from_bytes(multipart_msg_bytes) + + # Verify the message payload/content is correct. 
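# [Editorial illustration, not part of the patch] The accessors the checks
# that follow rely on: get_payload(decode=False) returns the transfer-encoded
# text, while get_content() / get_payload(decode=True) return the decoded
# bytes. A tiny standalone check (the sample part below is made up):
import email
import email.policy

raw = (b"Content-Type: application/octet-stream\r\n"
       b"Content-Transfer-Encoding: base64\r\n"
       b"\r\n"
       b"aGVsbG8gd29ybGQ=\r\n")
part = email.message_from_bytes(raw, policy=email.policy.default)
assert part.get_payload(decode=True) == b"hello world"
assert part.get_content() == b"hello world"
assert part.get_payload(decode=False).strip() == "aGVsbG8gd29ybGQ="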
+ attachment_msg = msgobj.get_payload(1) + self.assertEqual(attachment_msg.get_content(), attachment_bytes) + self.assertEqual(attachment_msg.get_payload(decode=False), attachment_base64) + + # overhead_ratio at time of writing: 1.0077472351610626 - 89.2775% decrease + self.assertLess(overhead_ratio, 1.05) + class TestBytesGeneratorIdempotentNL(BaseTestBytesGeneratorIdempotent, TestIdempotent): diff --git a/Misc/ACKS b/Misc/ACKS index 25542d01de695c..228459f0350b56 100644 --- a/Misc/ACKS +++ b/Misc/ACKS @@ -875,6 +875,7 @@ Jeffrey C. Jacobs Kevin Jacobs Kjetil Jacobsen Shantanu Jain +Jessica A. James Bertrand Janin Geert Jansen Jack Jansen From c2eb551cccda1661b901d3288b3ec83375b3ddd3 Mon Sep 17 00:00:00 2001 From: Jessica James Date: Fri, 18 Apr 2025 18:11:56 -0400 Subject: [PATCH 2/8] Add NEWS entry per bot --- .../Library/2025-04-18-18-11-15.gh-issue-115512.oE6Jkw.rst | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2025-04-18-18-11-15.gh-issue-115512.oE6Jkw.rst diff --git a/Misc/NEWS.d/next/Library/2025-04-18-18-11-15.gh-issue-115512.oE6Jkw.rst b/Misc/NEWS.d/next/Library/2025-04-18-18-11-15.gh-issue-115512.oE6Jkw.rst new file mode 100644 index 00000000000000..ced37ad143576e --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-04-18-18-11-15.gh-issue-115512.oE6Jkw.rst @@ -0,0 +1,6 @@ +Substantially improved memory usage and performance when parsing email text +in :mod:`email`. Primarily reduces memory usage in +:func:`email.message_from_bytes`, :func:`email.message_from_string`, +:class:`email.parser.Parser`, :class:`email.parser.ByteParser`, +:class:`email.feedparser.FeedParser`, +:class:`email.feedparser.BytesFeedParser` From d6233e90adce57e04800523b3301734b91a4c65a Mon Sep 17 00:00:00 2001 From: Jessica James Date: Fri, 18 Apr 2025 18:21:13 -0400 Subject: [PATCH 3/8] Correct class references --- .../Library/2025-04-18-18-11-15.gh-issue-115512.oE6Jkw.rst | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Misc/NEWS.d/next/Library/2025-04-18-18-11-15.gh-issue-115512.oE6Jkw.rst b/Misc/NEWS.d/next/Library/2025-04-18-18-11-15.gh-issue-115512.oE6Jkw.rst index ced37ad143576e..80fd2f47385cc3 100644 --- a/Misc/NEWS.d/next/Library/2025-04-18-18-11-15.gh-issue-115512.oE6Jkw.rst +++ b/Misc/NEWS.d/next/Library/2025-04-18-18-11-15.gh-issue-115512.oE6Jkw.rst @@ -1,6 +1,5 @@ Substantially improved memory usage and performance when parsing email text in :mod:`email`. Primarily reduces memory usage in :func:`email.message_from_bytes`, :func:`email.message_from_string`, -:class:`email.parser.Parser`, :class:`email.parser.ByteParser`, -:class:`email.feedparser.FeedParser`, -:class:`email.feedparser.BytesFeedParser` +:class:`email.parser.Parser`, :class:`email.parser.BytesParser`, +:class:`email.parser.FeedParser`, :class:`email.parser.BytesFeedParser` From 530f6d442dae842a08f8a4c5a06428db13533d8b Mon Sep 17 00:00:00 2001 From: Jessica James Date: Fri, 18 Apr 2025 19:31:48 -0400 Subject: [PATCH 4/8] Comment: remove annotations --- Lib/email/feedparser.py | 58 ++++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/Lib/email/feedparser.py b/Lib/email/feedparser.py index 26c226f589c4a0..ef4ebac82746e1 100644 --- a/Lib/email/feedparser.py +++ b/Lib/email/feedparser.py @@ -53,30 +53,30 @@ class BufferedSubFile(object): simple abstraction -- it parses until EOF closes the current message. 
""" def __init__(self): - self._partial: list[str] = [] - self._dangling_partial: bool = False + self._partial = [] + self._dangling_partial = False # A deque of full, pushed lines - self._lines: deque[str] = deque() + self._lines = deque() # The stack of false-EOF checking predicates. self._eofstack = [] # A flag indicating whether the file has been closed or not. - self._closed: bool = False - self._dump_destination: deque[str]|None = None - self._dump_result: str|None = None + self._closed = False + self._dump_destination = None + self._dump_result = None - def push_eof_matcher(self, pred) -> None: + def push_eof_matcher(self, pred): self._eofstack.append(pred) def pop_eof_matcher(self): return self._eofstack.pop() - def close(self) -> None: + def close(self): # Don't forget any trailing partial line. if self._partial: self._flush_partial() self._closed = True - def readline(self) -> str|object: + def readline(self): if not self._lines: if self._closed: return '' @@ -94,7 +94,7 @@ def readline(self) -> str|object: return line - def _check_eofstack(self, data, start=0, end=sys.maxsize) -> bool: + def _check_eofstack(self, data, start=0, end=sys.maxsize): for ateof in reversed(self._eofstack): if ateof(data, start, end): # We're at the false EOF. @@ -102,12 +102,12 @@ def _check_eofstack(self, data, start=0, end=sys.maxsize) -> bool: return False - def unreadline(self, line) -> None: + def unreadline(self, line): # Let the consumer push a line back into the buffer. assert line is not NeedMoreData self._lines.appendleft(line) - def _flush_partial(self) -> None: + def _flush_partial(self): line = EMPTYSTRING.join(self._partial) if not line: pass @@ -125,7 +125,7 @@ def _flush_partial(self) -> None: self._partial.clear() self._dangling_partial = False - def push(self, data) -> None: + def push(self, data): """Push some new data into this object.""" if not data: return @@ -137,7 +137,7 @@ def push(self, data) -> None: self._push_data(data) - def _can_dump_data(self, data) -> bool: + def _can_dump_data(self, data): if self._dump_destination is None: # We're not dumping data return False @@ -170,7 +170,7 @@ def _can_dump_data(self, data) -> bool: # We're still dumping, but there's a potential boundary marker or EOF or similar issue. Force a proper parse. return False - def _can_dump_partial(self, line, start: int=0, end: int=sys.maxsize) -> bool: + def _can_dump_partial(self, line, start=0, end=sys.maxsize): # Very similar to _can_dump_data above, except we can make some additional assumptions for partials/lines. 
# This should only ever be checked when we have a new partial line, in which case we have no partial, # or when checking the partial itself, in which case it'll always be the first part @@ -207,7 +207,7 @@ def _is_dump_midline(self): assert self._dump_destination[-1] # Never push empty strings to _dump_destination return self._dump_destination[-1][-1] not in ('\n', '\r') - def _push_data(self, data) -> None: + def _push_data(self, data): # Find first newline character in the data unl_start_index = BufferedSubFile._find_unl(data) if unl_start_index < 0: @@ -268,7 +268,7 @@ def _push_data(self, data) -> None: # unl_start_index is an index which points to the start of the next newline character, if there is one self._push_data_no_partial(data, data_start_index, unl_start_index) - def _push_data_no_partial(self, data, data_start_index: int, unl_start_index: int) -> None: + def _push_data_no_partial(self, data, data_start_index, unl_start_index): # _partial is now guaranteed to point to be empty # data_start_index is an index which points to the start of the next line # unl_start_index is an index which points to the start of the next newline character, if there is one @@ -349,14 +349,14 @@ def _push_data_no_partial(self, data, data_start_index: int, unl_start_index: in if data_start_index < len(data): self._partial.append(data[data_start_index:]) - def pushlines(self, lines) -> None: + def pushlines(self, lines): # This method is not documented on docs.python.org self._lines.extend(lines) def __iter__(self): return self - def __next__(self) -> str|object: + def __next__(self): line = self.readline() if line == '': raise StopIteration @@ -394,13 +394,13 @@ def _get_dump(self, start_value:str|None = None): self._dump_destination = None self._dump_result = EMPTYSTRING.join(_dump_destination) - def _pop_dump(self) -> str: + def _pop_dump(self): result = self._dump_result self._dump_result = None return result @staticmethod - def _find_unl(data, start=0) -> int: + def _find_unl(data, start=0): # Like str.find(), but for universal newlines # Originally, this iterated over the string, however just calling find() twice is drastically faster # This could be sped up by replacing with a similar function in C, so we don't pass over the string twice. @@ -412,7 +412,7 @@ def _find_unl(data, start=0) -> int: return nl_index if nl_index >= 0 else cr_index @staticmethod - def _find_unl_end(data, start) -> int: + def _find_unl_end(data, start): # A helper function which returns the 1-past-the-end index of a universal newline # This could be sped up by replacing with a similar function in C. 
#assert data[start] in '\r\n' @@ -467,15 +467,15 @@ def __init__(self, _factory=None, *, policy=compat32): self._headersonly = False # Non-public interface for supporting Parser's headersonly flag - def _set_headersonly(self) -> None: + def _set_headersonly(self): self._headersonly = True - def feed(self, data) -> None: + def feed(self, data): """Push more data into the parser.""" self._input.push(data) self._call_parse() - def _call_parse(self) -> None: + def _call_parse(self): try: self._parse() # Return value is always NeedMoreData or None, but discarded here in either case except StopIteration: @@ -494,7 +494,7 @@ def close(self): self.policy.handle_defect(root, defect) return root - def _new_message(self) -> None: + def _new_message(self): if self._old_style_factory: msg = self._factory() else: @@ -616,7 +616,7 @@ def _parsegen(self): # yields: NeedMoreData # this onto the input stream until we've scanned past the # preamble. separator = '--' + boundary - def boundarymatch(line, pos: int = 0, endpos: int = sys.maxsize): + def boundarymatch(line, pos = 0, endpos = sys.maxsize): if not line.startswith(separator, pos, endpos): return None return boundaryendRE.match(line, pos + len(separator), endpos) @@ -749,7 +749,7 @@ def boundarymatch(line, pos: int = 0, endpos: int = sys.maxsize): yield from self._input._get_dump() self._cur.set_payload(self._input._pop_dump()) - def _parse_headers(self, lines) -> None: + def _parse_headers(self, lines): # Passed a list of lines that make up the headers for the current msg lastheader = '' lastvalue = [] @@ -813,5 +813,5 @@ def _parse_headers(self, lines) -> None: class BytesFeedParser(FeedParser): """Like FeedParser, but feed accepts bytes.""" - def feed(self, data) -> None: + def feed(self, data): super().feed(data.decode('ascii', 'surrogateescape')) From 13ebb398c80c93372fbea31c85f5347d1e5a9dba Mon Sep 17 00:00:00 2001 From: Jessica James Date: Sat, 19 Apr 2025 13:21:26 -0400 Subject: [PATCH 5/8] Comment: 80-rule, remove some comments --- Lib/email/feedparser.py | 158 +++++++++++++++++++--------------------- Lib/email/parser.py | 5 +- 2 files changed, 77 insertions(+), 86 deletions(-) diff --git a/Lib/email/feedparser.py b/Lib/email/feedparser.py index ef4ebac82746e1..9926770d53e047 100644 --- a/Lib/email/feedparser.py +++ b/Lib/email/feedparser.py @@ -112,10 +112,10 @@ def _flush_partial(self): if not line: pass elif self._dump_destination is None: - # We're not dumping data. Just flush the partial to lines, as normal + # We're not dumping data. Just flush the partial to lines self._lines.append(line) elif self._check_eofstack(line): - # We were dumping, but we've now reached the end of the dump. Push our line and stop dumping. + # We were dumping, but we've now reached the end of the dump. self._dump_destination = None self._lines.append(line) else: @@ -130,7 +130,6 @@ def push(self, data): if not data: return - # If we're dumping, and we don't have anything that will ever tell us to terminate, simply dump everything if self._can_dump_data(data): self._dump_destination.append(data) return @@ -139,12 +138,11 @@ def push(self, data): def _can_dump_data(self, data): if self._dump_destination is None: - # We're not dumping data return False # We're dumping; check for easy optimizations if not self._eofstack: - # There's nothing that will ever tell us to stop dumping. Go ahead and dump the entire `data` object. + # There's nothing that will ever tell us to stop dumping. # This does absolute wonders for large non-multipart emails. 
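# [Editorial illustration, not part of the patch] The fast paths that
# _can_dump_data() checks for: with no false-EOF predicates at all, or with
# only boundary matchers and a chunk that cannot contain a boundary marker,
# the whole chunk can be dumped without splitting it into lines. A standalone
# sketch with made-up names:

def can_dump_whole_chunk(eofstack, data):
    if not eofstack:
        return True                     # nothing can ever stop the dump
    if any(not hasattr(pred, 'is_boundary_match') for pred in eofstack):
        return False                    # a non-boundary predicate might fire
    return '-' not in data              # no possible "--boundary" in here

def _pred(line, pos=0, endpos=None):
    return line.startswith('--BOUNDARY', pos)
_pred.is_boundary_match = True

assert can_dump_whole_chunk([_pred], "AAAA" * 2048)
assert not can_dump_whole_chunk([_pred], "data with a - in it")
assert can_dump_whole_chunk([], "anything at all")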
assert not self._lines assert not self._dangling_partial @@ -155,56 +153,44 @@ def _can_dump_data(self, data): if self._partial: return False - all_boundary_matches = True for pred in self._eofstack: if not hasattr(pred, 'is_boundary_match'): - all_boundary_matches = False - break - - if all_boundary_matches and '-' not in data: - # We eventually need to stop, but we only care about boundary matches, and there's no boundaries - # here. Dump the entire `data` object. This does wonders for multipart emails with large parts. - assert not self._lines - return True - - # We're still dumping, but there's a potential boundary marker or EOF or similar issue. Force a proper parse. - return False + # We can't blindly dump entire chunks, if we're interested in + # more than just boundaries + return False + + # We only care about boundaries; we can dump as long as there's no + # potential boundaries. + return '-' not in data def _can_dump_partial(self, line, start=0, end=sys.maxsize): - # Very similar to _can_dump_data above, except we can make some additional assumptions for partials/lines. - # This should only ever be checked when we have a new partial line, in which case we have no partial, - # or when checking the partial itself, in which case it'll always be the first part + # Very similar to _can_dump_data above, except we can make some + # additional assumptions for partials/lines. assert not self._partial or line is self._partial[0] if self._dump_destination is None: - # We're not dumping data return False - # We're dumping. There should be absolutely no other pending lines, because those should've been dumped. + # We're dumping. There should be absolutely no other pending lines, + # because those should've been dumped. assert not self._lines if not self._eofstack: - # There's nothing that will ever tell us to stop dumping. Dump away. + # There's nothing that will ever tell us to stop dumping. Dump away return True all_boundary_matches = True for pred in self._eofstack: if not hasattr(pred, 'is_boundary_match'): - all_boundary_matches = False - break - - if all_boundary_matches and not line.startswith("-", start, end): - # We eventually need to stop, but we only care about boundary matches, and there's no boundaries - # here. Dump the entire `data` object. This does wonders for multipart emails with large parts. - return True + return False - # We're still dumping, but there's a potential boundary marker or EOF or similar issue. Force a proper parse. - return False + # We only care about boundaries; we can dump as long as there's no + # potential boundaries. + return not line.startswith("-", start, end) def _is_dump_midline(self): if not self._dump_destination: return False - assert self._dump_destination[-1] # Never push empty strings to _dump_destination return self._dump_destination[-1][-1] not in ('\n', '\r') def _push_data(self, data): @@ -214,8 +200,9 @@ def _push_data(self, data): # No new complete lines, wait for more. # Check to see if we had a previous dangling partial newline if self._dangling_partial: - # We previously pushed a dangling line expecting a \n to follow, however we received other data instead. - # Therefore, that \r does actually terminate a line. Go ahead and push it. + # We previously pushed a dangling line expecting \n to follow, + # however we received other data instead. Therefore, that \r + # does actually terminate a line. Go ahead and push it. 
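# [Editorial illustration, not part of the patch] The dangling-'\r' case the
# comment above handles, exercised through the public FeedParser API by
# feeding one character at a time so every '\r\n' is split across feeds:
from email.feedparser import FeedParser

text = "Subject: split newlines\r\n\r\nline one\r\nline two\r\n"
parser = FeedParser()
for ch in text:
    parser.feed(ch)
msg = parser.close()
assert msg["Subject"] == "split newlines"
assert msg.get_payload() == "line one\r\nline two\r\n"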
self._flush_partial() # No lines in data to push; wait for more data @@ -230,24 +217,23 @@ def _push_data(self, data): # Complete our previous/partial line if self._partial: - # Check to see if we had any dangling newlines in our partial, and handle if appropriate if self._dangling_partial: - # We had a previously dangling line; this is either a \n (completion), or some other char (termination) if data[0] != NL: - # "\r" -- push what we had, as it has been terminated; data_start_index = 0 + # "\r" -- push what we had, it's been terminated self._flush_partial() else: - # "\r\n" -- append \n and push it; data_start_index = 1 + # "\r\n" -- append \n to complete it and push self._partial.append(NL) self._flush_partial() data_start_index = 1 # Find the next newline - unl_start_index = BufferedSubFile._find_unl(data, data_start_index) - # Fall through + unl_start_index = BufferedSubFile._find_unl( + data, data_start_index) else: - # Our partial has no dangling newline; complete our partial with the new line and push it - unl_end_index = BufferedSubFile._find_unl_end(data, unl_start_index) + # Complete our partial with the new line and push it + unl_end_index = BufferedSubFile._find_unl_end( + data, unl_start_index) if unl_end_index < 0: # The newline is incomplete; append data and return self._partial.append(data) @@ -260,36 +246,37 @@ def _push_data(self, data): data_start_index = unl_end_index # Find the next newline - unl_start_index = BufferedSubFile._find_unl(data, data_start_index) - # Fall through + unl_start_index = BufferedSubFile._find_unl( + data, data_start_index) # _partial is now guaranteed to point to be empty - # data_start_index is an index which points to the start of the next line - # unl_start_index is an index which points to the start of the next newline character, if there is one + # data_start_index is an index which points to the start of next line + # unl_start_index is the start of the next newline character, or -1 self._push_data_no_partial(data, data_start_index, unl_start_index) def _push_data_no_partial(self, data, data_start_index, unl_start_index): - # _partial is now guaranteed to point to be empty - # data_start_index is an index which points to the start of the next line - # unl_start_index is an index which points to the start of the next newline character, if there is one - # Process any remaining whole lines in data if unl_start_index < 0: # Push right to the partial if there's no lines if data_start_index < len(data): assert data_start_index >= 0 partial_line = data[data_start_index:] - if self._is_dump_midline() or self._can_dump_partial(partial_line): + if self._is_dump_midline() \ + or self._can_dump_partial(partial_line): self._dump_destination.append(partial_line) else: self._partial = [partial_line] if data[-1] == '\r': self._dangling_partial = True - elif self._dump_destination is None and unl_start_index < len(data) // 2: - # If it looks like we're going to be doing a lot of splits/joins, just go ahead and use StringIO, for speed - # If we had some sort of "StringViewIO" to avoid the copy, this would be significantly more efficient - # This code block, and the "else" code block below, functionally do the exact same thing, except this path - # makes no attempt to handle dumping data + elif self._dump_destination is None \ + and unl_start_index < len(data) // 2: + # If it looks like we're going to be doing a lot of splits/joins, + # just go ahead and use StringIO, for speed + # If we had some sort of "StringViewIO" to avoid the copy, this + # would 
be significantly more efficient + # This code block, and the "else" code block below, functionally do + # the exact same thing, except this path makes no attempt to handle + # dumping data sio = StringIO(data, '') sio.seek(data_start_index) lines = sio.readlines() @@ -301,26 +288,28 @@ def _push_data_no_partial(self, data, data_start_index, unl_start_index): self.pushlines(lines) else: - # If we're not, let's keep it in Python - dump_data_start = None if self._dump_destination is None else data_start_index + dump_data_start = None if self._dump_destination is None \ + else data_start_index while unl_start_index >= 0: - unl_end_index = BufferedSubFile._find_unl_end(data, unl_start_index) + unl_end_index = BufferedSubFile._find_unl_end( + data, unl_start_index) if unl_end_index < 0: - # Incomplete line ending; break to update our partial and return + # Incomplete line ending; break to just update our partial self._dangling_partial = True break # We have an easy line; push it if self._dump_destination is not None: - # We have a window into a line. Make sure it's not EOF, and continue as long as it's not - if self._check_eofstack(data, data_start_index, unl_end_index): - # This line is "EOF". This is the end of our dump data! Push the dump data. - self._dump_destination.append(data[dump_data_start:data_start_index]) + # We have a window into a line. Make sure it's not EOF + if self._check_eofstack( + data, data_start_index, unl_end_index): + # This line is "EOF". This is the end of our dump data + self._dump_destination.append( + data[dump_data_start:data_start_index]) # Also push our line, since we already have it - self._lines.append(data[data_start_index:unl_end_index]) - - # Mark dump complete + self._lines.append( + data[data_start_index:unl_end_index]) self._dump_destination = None #else: # This line didn't mark the end. Keep going. 
else: @@ -329,21 +318,23 @@ def _push_data_no_partial(self, data, data_start_index, unl_start_index): # Update our iterators data_start_index = unl_end_index - unl_start_index = BufferedSubFile._find_unl(data, data_start_index) + unl_start_index = BufferedSubFile._find_unl( + data, data_start_index) - # If we're still dumping, push everything that isn't going into the partial to the dump if self._dump_destination is not None: - # If we're able to safely flush the partial, go ahead and do that too - # We don't care about self._is_dump_midline() here, because data_start_index always represents the - # start of a new line, always + # Push everything that isn't going into the partial to the dump + # If we're able to safely flush the partial, do that too + # We don't care about self._is_dump_midline() here, because + # data_start_index always represents the start of a new line if self._can_dump_partial(data, data_start_index): self._dump_destination.append(data[dump_data_start:]) - # We've consumed the partial; flush any partial-related state we may have set + # Flush any partial-related state we may have set self._dangling_partial = False - return # skip the _partial.append below, because it's already been consumed + return # skip the _partial.append below else: - self._dump_destination.append(data[dump_data_start:data_start_index]) + self._dump_destination.append( + data[dump_data_start:data_start_index]) # If we have any partial data leftover, go ahead and set it if data_start_index < len(data): @@ -381,7 +372,6 @@ def _get_dump(self, start_value:str|None = None): if needs_more_data: # Flush our partial, if we can if self._partial and self._can_dump_partial(self._partial[0]): - assert self._partial[0] # We shouldn't ever push empty strings to _partial _dump_destination.extend(self._partial) self._partial.clear() self._dangling_partial = False @@ -402,8 +392,9 @@ def _pop_dump(self): @staticmethod def _find_unl(data, start=0): # Like str.find(), but for universal newlines - # Originally, this iterated over the string, however just calling find() twice is drastically faster - # This could be sped up by replacing with a similar function in C, so we don't pass over the string twice. + # Originally, this iterated over the string, however this is faster + # This could be sped up by replacing with a similar function in C, + # so we don't pass over the string twice. cr_index = data.find('\r', start) if cr_index < 0: return data.find(NL, start) @@ -413,9 +404,8 @@ def _find_unl(data, start=0): @staticmethod def _find_unl_end(data, start): - # A helper function which returns the 1-past-the-end index of a universal newline + # Returns the 1-past-the-end index of a universal newline # This could be sped up by replacing with a similar function in C. 
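# [Editorial illustration, not part of the patch] Standalone versions of the
# two helpers described above, to show the technique: two str.find() calls
# locate the earliest of '\r'/'\n', and the end finder reports -1 for a
# trailing '\r' that might still be completed by '\n' in the next chunk.
# Names are made up:

def find_unl(data, start=0):
    cr = data.find('\r', start)
    if cr < 0:
        return data.find('\n', start)
    nl = data.find('\n', start, cr)
    return nl if nl >= 0 else cr

def find_unl_end(data, start):
    if data.startswith('\n', start):
        return start + 1                # '\n' ends the line
    if data.startswith('\n', start + 1):
        return start + 2                # '\r\n' ends the line
    if start + 1 >= len(data):
        return -1                       # dangling '\r': wait for more data
    return start + 1                    # lone '\r' followed by other data

assert find_unl("ab\r\ncd") == 2 and find_unl_end("ab\r\ncd", 2) == 4
assert find_unl("ab\rcd") == 2 and find_unl_end("ab\rcd", 2) == 3
assert find_unl_end("ab\r", 2) == -1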
- #assert data[start] in '\r\n' # \n is always end of line if data.startswith(NL, start): @@ -425,7 +415,7 @@ def _find_unl_end(data, start): if data.startswith(NL, start + 1): return start + 2 - # End of string; we can't know if a \n follows, so no universal line end + # End of data; we can't know if a \n follows, so no universal line end if start + 1 >= len(data): return -1 @@ -461,7 +451,7 @@ def __init__(self, _factory=None, *, policy=compat32): self._old_style_factory = True self._input = BufferedSubFile() self._msgstack = [] - self._parse = self._parsegen().__next__ # Interesting trick which replaces yield values with return values + self._parse = self._parsegen().__next__ self._cur = None self._last = None self._headersonly = False @@ -477,7 +467,7 @@ def feed(self, data): def _call_parse(self): try: - self._parse() # Return value is always NeedMoreData or None, but discarded here in either case + self._parse() except StopIteration: pass diff --git a/Lib/email/parser.py b/Lib/email/parser.py index a182daebec0092..2914fac09894be 100644 --- a/Lib/email/parser.py +++ b/Lib/email/parser.py @@ -43,7 +43,7 @@ def __init__(self, _class=None, *, policy=compat32): def _parse_chunks(self, chunk_generator, headersonly=False): """Internal method / implementation detail - Parses chunks from a chunk generator into a FeedParser, returning the result + Parses chunks from a chunk generator into a FeedParser """ feedparser = FeedParser(self._class, policy=self.policy) if headersonly: @@ -135,7 +135,8 @@ def parsebytes(self, text, headersonly=False): the file. """ _chunk_generator = ( - text[offset:offset + _FEED_CHUNK_SIZE].decode('ASCII', errors='surrogateescape') + text[offset:offset + _FEED_CHUNK_SIZE].decode( + 'ASCII', errors='surrogateescape') for offset in range(0, len(text), _FEED_CHUNK_SIZE) ) From 2f6002eb253197556f5ddb17577210960207049a Mon Sep 17 00:00:00 2001 From: Jessica James Date: Sat, 19 Apr 2025 13:32:01 -0400 Subject: [PATCH 6/8] Comment: Remove TestPeakMemoryUsage --- Lib/test/test_email/test_email.py | 142 +----------------------------- 1 file changed, 2 insertions(+), 140 deletions(-) diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py index b02931e97b7f27..abfa3f7ddfaa73 100644 --- a/Lib/test/test_email/test_email.py +++ b/Lib/test/test_email/test_email.py @@ -4606,7 +4606,8 @@ def _msgobj_trickle(filename, trickle_size=2, force_linetype="\r\n"): # Trickle data into the feed parser, one character at a time with openfile(filename, encoding="utf-8") as fp: file_str = fp.read() - file_str = file_str.replace("\r\n", "\n").replace("\r", "\n").replace("\n", force_linetype) + file_str = file_str.replace("\r\n", "\n").replace("\r", "\n") \ + .replace("\n", force_linetype) feedparser = FeedParser() for index in range(0, len(file_str), trickle_size): @@ -4617,22 +4618,15 @@ def _validate_msg10_msgobj(self, msg, line_end): if isinstance(line_end, str): line_end = line_end.encode() eq = self.assertEqual - # The outer message is a multipart eq(msg.get_payload(decode=True), None) - # Subpart 1 is 7bit encoded eq(msg.get_payload(0).get_payload(decode=True), b'This is a 7bit encoded message.' + line_end) - # Subpart 2 is quopri eq(msg.get_payload(1).get_payload(decode=True), b'\xa1This is a Quoted Printable encoded message!' + line_end) - # Subpart 3 is base64 eq(msg.get_payload(2).get_payload(decode=True), b'This is a Base64 encoded message.') - # Subpart 4 is base64 with a trailing newline, which - # used to be stripped (issue 7143). 
From 2f6002eb253197556f5ddb17577210960207049a Mon Sep 17 00:00:00 2001
From: Jessica James
Date: Sat, 19 Apr 2025 13:32:01 -0400
Subject: [PATCH 6/8] Comment: Remove TestPeakMemoryUsage

---
 Lib/test/test_email/test_email.py | 142 +-----------------------------
 1 file changed, 2 insertions(+), 140 deletions(-)

diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py
index b02931e97b7f27..abfa3f7ddfaa73 100644
--- a/Lib/test/test_email/test_email.py
+++ b/Lib/test/test_email/test_email.py
@@ -4606,7 +4606,8 @@ def _msgobj_trickle(filename, trickle_size=2, force_linetype="\r\n"):
         # Trickle data into the feed parser, one character at a time
         with openfile(filename, encoding="utf-8") as fp:
             file_str = fp.read()
-        file_str = file_str.replace("\r\n", "\n").replace("\r", "\n").replace("\n", force_linetype)
+        file_str = file_str.replace("\r\n", "\n").replace("\r", "\n") \
+            .replace("\n", force_linetype)
 
         feedparser = FeedParser()
         for index in range(0, len(file_str), trickle_size):
@@ -4617,22 +4618,15 @@ def _validate_msg10_msgobj(self, msg, line_end):
         if isinstance(line_end, str):
             line_end = line_end.encode()
         eq = self.assertEqual
-        # The outer message is a multipart
         eq(msg.get_payload(decode=True), None)
-        # Subpart 1 is 7bit encoded
         eq(msg.get_payload(0).get_payload(decode=True),
            b'This is a 7bit encoded message.' + line_end)
-        # Subpart 2 is quopri
         eq(msg.get_payload(1).get_payload(decode=True),
            b'\xa1This is a Quoted Printable encoded message!' + line_end)
-        # Subpart 3 is base64
         eq(msg.get_payload(2).get_payload(decode=True),
            b'This is a Base64 encoded message.')
-        # Subpart 4 is base64 with a trailing newline, which
-        # used to be stripped (issue 7143).
         eq(msg.get_payload(3).get_payload(decode=True),
            b'This is a Base64 encoded message.\n')
-        # Subpart 5 has no Content-Transfer-Encoding: header.
         eq(msg.get_payload(4).get_payload(decode=True),
            b'This has no Content-Transfer-Encoding: header.' + line_end)
 
@@ -4649,8 +4643,6 @@ def test_trickle_1chr_lf(self):
         self._validate_msg10_msgobj(msg, '\n')
 
     def test_trickle_2chr_crlf(self):
-        # During initial testing, it was realized that an edge case was missed around dangling newlines.
-        # This helps test that behavior, as it is not otherwise covered by tests.
         msg = self._msgobj_trickle('msg_10.txt', 2, '\r\n')
         self._validate_msg10_msgobj(msg, '\r\n')
 
@@ -4675,136 +4667,6 @@ def test_trickle_3chr_lf(self):
         self._validate_msg10_msgobj(msg, '\n')
 
 
-class TestPeakMemoryUsage(unittest.TestCase):
-
-    maxDiff = None
-    SMALLER_CHUNK_SIZE = 1024
-
-    def _msg_bytes(self, filename):
-        with openfile(filename, 'rb') as fp:
-            data = fp.read()
-        return data
-
-    def _make_plaintext_msg_bytes(self, min_size):
-        # Get msg_01 as our baseline
-        msg_bytes = self._msg_bytes('msg_01.txt')
-        if len(msg_bytes) < min_size:
-            # Make it bigger
-            msg_bytes = msg_bytes * ((min_size // len(msg_bytes)) + 1)
-            msg_bytes = msg_bytes[:min_size]  # Truncate it to min_size
-        assert len(msg_bytes) >= min_size
-
-        match = re.search(rb'(\r|\n|\r\n){2}', msg_bytes)
-        self.assertIsNotNone(match)
-        expected_payload = msg_bytes[match.end():]
-
-        return msg_bytes, expected_payload
-
-    def _measure_message_from_bytes(self, msg_bytes):
-        import tracemalloc
-
-        # Call email.message_from_bytes, gathering some memory usage stats in the process
-        tracemalloc.start()
-        start_time = time.perf_counter()
-        msgobj = email.message_from_bytes(msg_bytes, policy=email.policy.default)
-        end_time = time.perf_counter()
-        after_bytes, after_peak_bytes = tracemalloc.get_traced_memory()
-        tracemalloc.stop()
-
-        # "How many bytes did we allocate, that were ultimately discarded?"
-        peak_overhead = after_peak_bytes - after_bytes
-
-        # "How large was that overhead, relative to the size of the message?"
-        overhead_ratio = peak_overhead / len(msg_bytes) if len(msg_bytes) > 0 else None
-
-        return msgobj, peak_overhead, overhead_ratio, end_time - start_time
-
-    def _base64_encode(self, bytes_to_encode, one_line=True):
-        base64_str = base64mime.body_encode(bytes_to_encode)
-        if one_line:
-            base64_str = "".join(base64_str.splitlines())
-        return base64_str
-
-    _multipart_msg_base = textwrap.dedent("""\
-        Date: Wed, 14 Nov 2007 12:56:23 GMT
-        From: foo@bar.invalid
-        To: foo@bar.invalid
-        Subject: Content-Transfer-Encoding: base64 and multipart
-        MIME-Version: 1.0
-        Content-Type: multipart/mixed; boundary="BOUNDARY"
-
-        --BOUNDARY
-        Content-Type: text/plain
-
-        Test message
-
-        --BOUNDARY
-        Content-Type: application/octet-stream
-        Content-Transfer-Encoding: base64
-
-        {}
-        --BOUNDARY--
-        """)
-
-    def _make_junk_bytes(self, bytes_length):
-        junk_data = bytearray(bytes_length)
-        for i in range(len(junk_data)):
-            junk_data[i] = i % 256
-        return bytes(junk_data)
-
-    def _make_junk_base64(self, bytes_length, one_line=True):
-        junk_bytes = self._make_junk_bytes(bytes_length)
-        return self._base64_encode(junk_bytes, one_line), junk_bytes
-
-    _LARGE_EMAIL_BYTE_SIZE = 1024*1024*10  # 10 MiB
-
-    def test_message_from_bytes_plaintext(self):
-        # Generate a 10MiB plaintext email
-        msg_bytes, expected_payload = self._make_plaintext_msg_bytes(self._LARGE_EMAIL_BYTE_SIZE)
-
-        # Parse it, collecting stats
-        msgobj, peak_overhead, overhead_ratio, time_taken = self._measure_message_from_bytes(msg_bytes)
-
-        # Verify the message payload/content is correct.
-        self.assertEqual(msgobj.get_payload(decode=True), expected_payload)
-        self.assertEqual(msgobj.get_content(), expected_payload.decode())
-
-        # overhead_ratio at time of writing: 1.0102445602416992
-        self.assertLess(overhead_ratio, 1.05)
-
-    def test_message_from_bytes_large_attachment_body_encoded(self):
-        # Generate a 10 MiB attachment
-        attachment_base64, attachment_bytes = self._make_junk_base64(self._LARGE_EMAIL_BYTE_SIZE, False)
-        multipart_msg_bytes = self._multipart_msg_base.format(attachment_base64).encode()
-
-        # Parse it, collecting stats
-        msgobj, peak_overhead, overhead_ratio, time_taken = self._measure_message_from_bytes(multipart_msg_bytes)
-
-        # Verify the message payload/content is correct.
-        attachment_msg = msgobj.get_payload(1)
-        self.assertEqual(attachment_msg.get_content(), attachment_bytes)
-        self.assertEqual(attachment_msg.get_payload(decode=False), attachment_base64)
-
-        # overhead_ratio at time of writing: 1.0088957315722829 - 85.0565% decrease
-        self.assertLess(overhead_ratio, 1.05)
-
-    def test_message_from_bytes_large_attachment_one_line(self):
-        # Generate a 10 MiB attachment
-        attachment_base64, attachment_bytes = self._make_junk_base64(self._LARGE_EMAIL_BYTE_SIZE, True)
-        multipart_msg_bytes = self._multipart_msg_base.format(attachment_base64).encode()
-
-        # Parse it, collecting stats
-        msgobj, peak_overhead, overhead_ratio, time_taken = self._measure_message_from_bytes(multipart_msg_bytes)
-
-        # Verify the message payload/content is correct.
-        attachment_msg = msgobj.get_payload(1)
-        self.assertEqual(attachment_msg.get_content(), attachment_bytes)
-        self.assertEqual(attachment_msg.get_payload(decode=False), attachment_base64)
-
-        # overhead_ratio at time of writing: 1.0077472351610626 - 89.2775% decrease
-        self.assertLess(overhead_ratio, 1.05)
-
-
 class TestBytesGeneratorIdempotentNL(BaseTestBytesGeneratorIdempotent, TestIdempotent):
     linesep = '\n'

From 4fa675568447e75ff23a02bdd06c2b6cce752d8f Mon Sep 17 00:00:00 2001
From: Jessica James
Date: Sat, 19 Apr 2025 13:41:09 -0400
Subject: [PATCH 7/8] Lint: Remove spaces

---
 Lib/email/feedparser.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Lib/email/feedparser.py b/Lib/email/feedparser.py
index 9926770d53e047..1f4204a10426e8 100644
--- a/Lib/email/feedparser.py
+++ b/Lib/email/feedparser.py
@@ -158,7 +158,7 @@ def _can_dump_data(self, data):
             # We can't blindly dump entire chunks, if we're interested in
             # more than just boundaries
             return False
-        
+
         # We only care about boundaries; we can dump as long as there's no
         # potential boundaries.
         return '-' not in data

From 4f3622733d274169c9582aa7bf2ce4af942f7cd8 Mon Sep 17 00:00:00 2001
From: Jessica James
Date: Sat, 19 Apr 2025 15:47:10 -0400
Subject: [PATCH 8/8] Missed a comment which served as an annotation

---
 Lib/email/feedparser.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Lib/email/feedparser.py b/Lib/email/feedparser.py
index 1f4204a10426e8..af09c560edd948 100644
--- a/Lib/email/feedparser.py
+++ b/Lib/email/feedparser.py
@@ -505,7 +505,7 @@ def _pop_message(self):
             self._cur = None
         return retval
 
-    def _parsegen(self):  # yields: NeedMoreData
+    def _parsegen(self):
         # Create a new message and start by parsing headers.
         self._new_message()
         headers = []
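A closing note on the _parsegen().__next__ binding that this last hunk touches: the feed parser drives a generator one step at a time, so a yield inside _parsegen() surfaces to the caller as the return value of the bound __next__. A minimal, standalone sketch of that pattern (not the email module's actual state machine) looks like this:

    NeedMoreData = object()  # sentinel meaning "the buffer ran dry"

    class IncrementalParser:
        def __init__(self):
            self._buffer = []
            self._results = []
            # Calling self._parse() resumes the generator; whatever it
            # yields becomes the return value seen by the caller.
            self._parse = self._parsegen().__next__

        def feed(self, data):
            self._buffer.append(data)
            try:
                self._parse()
            except StopIteration:
                pass

        def _parsegen(self):
            while True:
                while not self._buffer:
                    # Suspend until feed() pushes more data and resumes us.
                    yield NeedMoreData
                self._results.append(self._buffer.pop(0))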