diff options
author | R. David Murray <rdmurray@bitdance.com> | 2010-10-08 15:55:28 (GMT) |
---|---|---|
committer | R. David Murray <rdmurray@bitdance.com> | 2010-10-08 15:55:28 (GMT) |
commit | 96fd54eaec700cc50e5960f45ee79bc25c2c48c5 (patch) | |
tree | 4e4fc3f48d8957b6b0fccc372410e8374ce4fb70 /Lib | |
parent | 59fdd6736bbf1ba14083a4bb777abaefc364f876 (diff) | |
download | cpython-96fd54eaec700cc50e5960f45ee79bc25c2c48c5.zip cpython-96fd54eaec700cc50e5960f45ee79bc25c2c48c5.tar.gz cpython-96fd54eaec700cc50e5960f45ee79bc25c2c48c5.tar.bz2 |
#4661: add bytes parsing and generation to email (email version bump to 5.1.0)
The work on this is not 100% complete, but everything is present to
allow real-world testing of the code. The only remaining major todo
item is to (hopefully!) enhance the handling of non-ASCII bytes in headers
converted to unicode by RFC2047 encoding them rather than replacing them with
'?'s.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/email/__init__.py | 19 | ||||
-rw-r--r-- | Lib/email/feedparser.py | 7 | ||||
-rw-r--r-- | Lib/email/generator.py | 186 | ||||
-rw-r--r-- | Lib/email/message.py | 98 | ||||
-rw-r--r-- | Lib/email/parser.py | 46 | ||||
-rw-r--r-- | Lib/email/test/test_email.py | 272 |
6 files changed, 554 insertions, 74 deletions
diff --git a/Lib/email/__init__.py b/Lib/email/__init__.py index 8702212..c54a2c7 100644 --- a/Lib/email/__init__.py +++ b/Lib/email/__init__.py @@ -4,7 +4,7 @@ """A package for parsing, handling, and generating email messages.""" -__version__ = '5.0.0' +__version__ = '5.1.0' __all__ = [ 'base64mime', @@ -16,7 +16,9 @@ __all__ = [ 'iterators', 'message', 'message_from_file', + 'message_from_binary_file', 'message_from_string', + 'message_from_bytes', 'mime', 'parser', 'quoprimime', @@ -36,6 +38,13 @@ def message_from_string(s, *args, **kws): from email.parser import Parser return Parser(*args, **kws).parsestr(s) +def message_from_bytes(s, *args, **kws): + """Parse a bytes string into a Message object model. + + Optional _class and strict are passed to the Parser constructor. + """ + from email.parser import BytesParser + return BytesParser(*args, **kws).parsebytes(s) def message_from_file(fp, *args, **kws): """Read a file and parse its contents into a Message object model. @@ -44,3 +53,11 @@ def message_from_file(fp, *args, **kws): """ from email.parser import Parser return Parser(*args, **kws).parse(fp) + +def message_from_binary_file(fp, *args, **kws): + """Read a binary file and parse its contents into a Message object model. + + Optional _class and strict are passed to the Parser constructor. + """ + from email.parser import Parser + return BytesParser(*args, **kws).parse(fp) diff --git a/Lib/email/feedparser.py b/Lib/email/feedparser.py index 8db70b3..de8750d 100644 --- a/Lib/email/feedparser.py +++ b/Lib/email/feedparser.py @@ -482,3 +482,10 @@ class FeedParser: if lastheader: # XXX reconsider the joining of folded lines self._cur[lastheader] = EMPTYSTRING.join(lastvalue).rstrip('\r\n') + + +class BytesFeedParser(FeedParser): + """Like FeedParser, but feed accepts bytes.""" + + def feed(self, data): + super().feed(data.decode('ascii', 'surrogateescape')) diff --git a/Lib/email/generator.py b/Lib/email/generator.py index e05b67d..40b95c4 100644 --- a/Lib/email/generator.py +++ b/Lib/email/generator.py @@ -12,8 +12,9 @@ import time import random import warnings -from io import StringIO +from io import StringIO, BytesIO from email.header import Header +from email.message import _has_surrogates UNDERSCORE = '_' NL = '\n' @@ -72,7 +73,7 @@ class Generator: ufrom = msg.get_unixfrom() if not ufrom: ufrom = 'From nobody ' + time.ctime(time.time()) - print(ufrom, file=self._fp) + self.write(ufrom + NL) self._write(msg) def clone(self, fp): @@ -83,6 +84,29 @@ class Generator: # Protected interface - undocumented ;/ # + # Note that we use 'self.write' when what we are writing is coming from + # the source, and self._fp.write when what we are writing is coming from a + # buffer (because the Bytes subclass has already had a chance to transform + # the data in its write method in that case). This is an entirely + # pragmatic split determined by experiment; we could be more general by + # always using write and having the Bytes subclass write method detect when + # it has already transformed the input; but, since this whole thing is a + # hack anyway this seems good enough. + + # We use these class constants when we need to manipulate data that has + # already been written to a buffer (ex: constructing a re to check the + # boundary), and the module level NL constant when adding new output to a + # buffer via self.write, because 'write' always takes strings. + # Having write always take strings makes the code simpler, but there are + # a few occasions when we need to write previously created data back + # to the buffer or to a new buffer; for those cases we use self._fp.write. + _NL = NL + _EMPTY = '' + + def _new_buffer(self): + # BytesGenerator overrides this to return BytesIO. + return StringIO() + def _write(self, msg): # We can't write the headers yet because of the following scenario: # say a multipart message includes the boundary string somewhere in @@ -91,13 +115,13 @@ class Generator: # parameter. # # The way we do this, so as to make the _handle_*() methods simpler, - # is to cache any subpart writes into a StringIO. The we write the - # headers and the StringIO contents. That way, subpart handlers can + # is to cache any subpart writes into a buffer. The we write the + # headers and the buffer contents. That way, subpart handlers can # Do The Right Thing, and can still modify the Content-Type: header if # necessary. oldfp = self._fp try: - self._fp = sfp = StringIO() + self._fp = sfp = self._new_buffer() self._dispatch(msg) finally: self._fp = oldfp @@ -132,16 +156,16 @@ class Generator: def _write_headers(self, msg): for h, v in msg.items(): - print('%s:' % h, end=' ', file=self._fp) + self.write('%s: ' % h) if isinstance(v, Header): - print(v.encode(maxlinelen=self._maxheaderlen), file=self._fp) + self.write(v.encode(maxlinelen=self._maxheaderlen)+NL) else: # Header's got lots of smarts, so use it. header = Header(v, maxlinelen=self._maxheaderlen, header_name=h) - print(header.encode(), file=self._fp) + self.write(header.encode()+NL) # A blank line always separates headers from body - print(file=self._fp) + self.write(NL) # # Handlers for writing types and subtypes @@ -153,9 +177,15 @@ class Generator: return if not isinstance(payload, str): raise TypeError('string payload expected: %s' % type(payload)) + if _has_surrogates(msg._payload): + charset = msg.get_param('charset') + if charset is not None: + del msg['content-transfer-encoding'] + msg.set_payload(payload, charset) + payload = msg.get_payload() if self._mangle_from_: payload = fcre.sub('>From ', payload) - self._fp.write(payload) + self.write(payload) # Default body handler _writeBody = _handle_text @@ -170,21 +200,21 @@ class Generator: subparts = [] elif isinstance(subparts, str): # e.g. a non-strict parse of a message with no starting boundary. - self._fp.write(subparts) + self.write(subparts) return elif not isinstance(subparts, list): # Scalar payload subparts = [subparts] for part in subparts: - s = StringIO() + s = self._new_buffer() g = self.clone(s) g.flatten(part, unixfrom=False) msgtexts.append(s.getvalue()) # Now make sure the boundary we've selected doesn't appear in any of # the message texts. - alltext = NL.join(msgtexts) + alltext = self._NL.join(msgtexts) # BAW: What about boundaries that are wrapped in double-quotes? - boundary = msg.get_boundary(failobj=_make_boundary(alltext)) + boundary = msg.get_boundary(failobj=self._make_boundary(alltext)) # If we had to calculate a new boundary because the body text # contained that string, set the new boundary. We don't do it # unconditionally because, while set_boundary() preserves order, it @@ -195,9 +225,9 @@ class Generator: msg.set_boundary(boundary) # If there's a preamble, write it out, with a trailing CRLF if msg.preamble is not None: - print(msg.preamble, file=self._fp) + self.write(msg.preamble + NL) # dash-boundary transport-padding CRLF - print('--' + boundary, file=self._fp) + self.write('--' + boundary + NL) # body-part if msgtexts: self._fp.write(msgtexts.pop(0)) @@ -206,14 +236,14 @@ class Generator: # --> CRLF body-part for body_part in msgtexts: # delimiter transport-padding CRLF - print('\n--' + boundary, file=self._fp) + self.write('\n--' + boundary + NL) # body-part self._fp.write(body_part) # close-delimiter transport-padding - self._fp.write('\n--' + boundary + '--') + self.write('\n--' + boundary + '--') if msg.epilogue is not None: - print(file=self._fp) - self._fp.write(msg.epilogue) + self.write(NL) + self.write(msg.epilogue) def _handle_multipart_signed(self, msg): # The contents of signed parts has to stay unmodified in order to keep @@ -232,23 +262,23 @@ class Generator: # block and the boundary. Sigh. blocks = [] for part in msg.get_payload(): - s = StringIO() + s = self._new_buffer() g = self.clone(s) g.flatten(part, unixfrom=False) text = s.getvalue() - lines = text.split('\n') + lines = text.split(self._NL) # Strip off the unnecessary trailing empty line - if lines and lines[-1] == '': - blocks.append(NL.join(lines[:-1])) + if lines and lines[-1] == self._EMPTY: + blocks.append(self._NL.join(lines[:-1])) else: blocks.append(text) # Now join all the blocks with an empty line. This has the lovely # effect of separating each block with an empty line, but not adding # an extra one after the last one. - self._fp.write(NL.join(blocks)) + self._fp.write(self._NL.join(blocks)) def _handle_message(self, msg): - s = StringIO() + s = self._new_buffer() g = self.clone(s) # The payload of a message/rfc822 part should be a multipart sequence # of length 1. The zeroth element of the list should be the Message @@ -265,6 +295,90 @@ class Generator: payload = s.getvalue() self._fp.write(payload) + # This used to be a module level function; we use a classmethod for this + # and _compile_re so we can continue to provide the module level function + # for backward compatibility by doing + # _make_boudary = Generator._make_boundary + # at the end of the module. It *is* internal, so we could drop that... + @classmethod + def _make_boundary(cls, text=None): + # Craft a random boundary. If text is given, ensure that the chosen + # boundary doesn't appear in the text. + token = random.randrange(sys.maxsize) + boundary = ('=' * 15) + (_fmt % token) + '==' + if text is None: + return boundary + b = boundary + counter = 0 + while True: + cre = cls._compile_re('^--' + re.escape(b) + '(--)?$', re.MULTILINE) + if not cre.search(text): + break + b = boundary + '.' + str(counter) + counter += 1 + return b + + @classmethod + def _compile_re(cls, s, flags): + return re.compile(s, flags) + + +class BytesGenerator(Generator): + """Generates a bytes version of a Message object tree. + + Functionally identical to the base Generator except that the output is + bytes and not string. When surrogates were used in the input to encode + bytes, these are decoded back to bytes for output. + + The outfp object must accept bytes in its write method. + """ + + # Bytes versions of these constants for use in manipulating data from + # the BytesIO buffer. + _NL = NL.encode('ascii') + _EMPTY = b'' + + def write(self, s): + self._fp.write(s.encode('ascii', 'surrogateescape')) + + def _new_buffer(self): + return BytesIO() + + def _write_headers(self, msg): + # This is almost the same as the string version, except for handling + # strings with 8bit bytes. + for h, v in msg._headers: + self.write('%s: ' % h) + if isinstance(v, Header): + self.write(v.encode(maxlinelen=self._maxheaderlen)+NL) + elif _has_surrogates(v): + # If we have raw 8bit data in a byte string, we have no idea + # what the encoding is. There is no safe way to split this + # string. If it's ascii-subset, then we could do a normal + # ascii split, but if it's multibyte then we could break the + # string. There's no way to know so the least harm seems to + # be to not split the string and risk it being too long. + self.write(v+NL) + else: + # Header's got lots of smarts and this string is safe... + header = Header(v, maxlinelen=self._maxheaderlen, + header_name=h) + self.write(header.encode()+NL) + # A blank line always separates headers from body + self.write(NL) + + def _handle_text(self, msg): + # If the string has surrogates the original source was bytes, so + # just write it back out. + if _has_surrogates(msg._payload): + self.write(msg._payload) + else: + super(BytesGenerator,self)._handle_text(msg) + + @classmethod + def _compile_re(cls, s, flags): + return re.compile(s.encode('ascii'), flags) + _FMT = '[Non-text (%(type)s) part of message omitted, filename %(filename)s]' @@ -325,23 +439,9 @@ class DecodedGenerator(Generator): -# Helper +# Helper used by Generator._make_boundary _width = len(repr(sys.maxsize-1)) _fmt = '%%0%dd' % _width -def _make_boundary(text=None): - # Craft a random boundary. If text is given, ensure that the chosen - # boundary doesn't appear in the text. - token = random.randrange(sys.maxsize) - boundary = ('=' * 15) + (_fmt % token) + '==' - if text is None: - return boundary - b = boundary - counter = 0 - while True: - cre = re.compile('^--' + re.escape(b) + '(--)?$', re.MULTILINE) - if not cre.search(text): - break - b = boundary + '.' + str(counter) - counter += 1 - return b +# Backward compatibility +_make_boundary = Generator._make_boundary diff --git a/Lib/email/message.py b/Lib/email/message.py index 923b26c..a835ce2 100644 --- a/Lib/email/message.py +++ b/Lib/email/message.py @@ -24,8 +24,26 @@ SEMISPACE = '; ' # existence of which force quoting of the parameter value. tspecials = re.compile(r'[ \(\)<>@,;:\\"/\[\]\?=]') +# How to figure out if we are processing strings that come from a byte +# source with undecodable characters. +_has_surrogates = re.compile( + '([^\ud800-\udbff]|\A)[\udc00-\udfff]([^\udc00-\udfff]|\Z)').search + # Helper functions +def _sanitize_surrogates(value): + # If the value contains surrogates, re-decode and replace the original + # non-ascii bytes with '?'s. Used to sanitize header values before letting + # them escape as strings. + if not isinstance(value, str): + # Header object + return value + if _has_surrogates(value): + original_bytes = value.encode('ascii', 'surrogateescape') + return original_bytes.decode('ascii', 'replace').replace('\ufffd', '?') + else: + return value + def _splitparam(param): # Split header parameters. BAW: this may be too simple. It isn't # strictly RFC 2045 (section 5.1) compliant, but it catches most headers @@ -184,44 +202,72 @@ class Message: If the message is a multipart and the decode flag is True, then None is returned. """ - if i is None: - payload = self._payload - elif not isinstance(self._payload, list): + # Here is the logic table for this code, based on the email5.0.0 code: + # i decode is_multipart result + # ------ ------ ------------ ------------------------------ + # None True True None + # i True True None + # None False True _payload (a list) + # i False True _payload element i (a Message) + # i False False error (not a list) + # i True False error (not a list) + # None False False _payload + # None True False _payload decoded (bytes) + # Note that Barry planned to factor out the 'decode' case, but that + # isn't so easy now that we handle the 8 bit data, which needs to be + # converted in both the decode and non-decode path. + if self.is_multipart(): + if decode: + return None + if i is None: + return self._payload + else: + return self._payload[i] + # For backward compatibility, Use isinstance and this error message + # instead of the more logical is_multipart test. + if i is not None and not isinstance(self._payload, list): raise TypeError('Expected list, got %s' % type(self._payload)) - else: - payload = self._payload[i] + payload = self._payload + cte = self.get('content-transfer-encoding', '').lower() + # payload can be bytes here, (I wonder if that is actually a bug?) + if isinstance(payload, str): + if _has_surrogates(payload): + bpayload = payload.encode('ascii', 'surrogateescape') + if not decode: + try: + payload = bpayload.decode(self.get_param('charset', 'ascii'), 'replace') + except LookupError: + payload = bpayload.decode('ascii', 'replace') + elif decode: + try: + bpayload = payload.encode('ascii') + except UnicodeError: + # This won't happen for RFC compliant messages (messages + # containing only ASCII codepoints in the unicode input). + # If it does happen, turn the string into bytes in a way + # guaranteed not to fail. + bpayload = payload.encode('raw-unicode-escape') if not decode: return payload - # Decoded payloads always return bytes. XXX split this part out into - # a new method called .get_decoded_payload(). - if self.is_multipart(): - return None - cte = self.get('content-transfer-encoding', '').lower() if cte == 'quoted-printable': - if isinstance(payload, str): - payload = payload.encode('ascii') - return utils._qdecode(payload) + return utils._qdecode(bpayload) elif cte == 'base64': try: - if isinstance(payload, str): - payload = payload.encode('ascii') - return base64.b64decode(payload) + return base64.b64decode(bpayload) except binascii.Error: # Incorrect padding - pass + return bpayload elif cte in ('x-uuencode', 'uuencode', 'uue', 'x-uue'): - in_file = BytesIO(payload.encode('ascii')) + in_file = BytesIO(bpayload) out_file = BytesIO() try: uu.decode(in_file, out_file, quiet=True) return out_file.getvalue() except uu.Error: # Some decoding problem - pass - # Is there a better way to do this? We can't use the bytes - # constructor. + return bpayload if isinstance(payload, str): - return payload.encode('raw-unicode-escape') + return bpayload return payload def set_payload(self, payload, charset=None): @@ -340,7 +386,7 @@ class Message: Any fields deleted and re-inserted are always appended to the header list. """ - return [v for k, v in self._headers] + return [_sanitize_surrogates(v) for k, v in self._headers] def items(self): """Get all the message's header fields and values. @@ -350,7 +396,7 @@ class Message: Any fields deleted and re-inserted are always appended to the header list. """ - return self._headers[:] + return [(k, _sanitize_surrogates(v)) for k, v in self._headers] def get(self, name, failobj=None): """Get a header value. @@ -361,7 +407,7 @@ class Message: name = name.lower() for k, v in self._headers: if k.lower() == name: - return v + return _sanitize_surrogates(v) return failobj # @@ -381,7 +427,7 @@ class Message: name = name.lower() for k, v in self._headers: if k.lower() == name: - values.append(v) + values.append(_sanitize_surrogates(v)) if not values: return failobj return values diff --git a/Lib/email/parser.py b/Lib/email/parser.py index 06014e2..b83e0f7 100644 --- a/Lib/email/parser.py +++ b/Lib/email/parser.py @@ -7,7 +7,7 @@ __all__ = ['Parser', 'HeaderParser'] import warnings -from io import StringIO +from io import StringIO, TextIOWrapper from email.feedparser import FeedParser from email.message import Message @@ -89,3 +89,47 @@ class HeaderParser(Parser): def parsestr(self, text, headersonly=True): return Parser.parsestr(self, text, True) + + +class BytesParser: + + def __init__(self, *args, **kw): + """Parser of binary RFC 2822 and MIME email messages. + + Creates an in-memory object tree representing the email message, which + can then be manipulated and turned over to a Generator to return the + textual representation of the message. + + The input must be formatted as a block of RFC 2822 headers and header + continuation lines, optionally preceeded by a `Unix-from' header. The + header block is terminated either by the end of the input or by a + blank line. + + _class is the class to instantiate for new message objects when they + must be created. This class must have a constructor that can take + zero arguments. Default is Message.Message. + """ + self.parser = Parser(*args, **kw) + + def parse(self, fp, headersonly=False): + """Create a message structure from the data in a binary file. + + Reads all the data from the file and returns the root of the message + structure. Optional headersonly is a flag specifying whether to stop + parsing after reading the headers or not. The default is False, + meaning it parses the entire contents of the file. + """ + fp = TextIOWrapper(fp, encoding='ascii', errors='surrogateescape') + return self.parser.parse(fp, headersonly) + + + def parsebytes(self, text, headersonly=False): + """Create a message structure from a byte string. + + Returns the root of the message structure. Optional headersonly is a + flag specifying whether to stop parsing after reading the headers or + not. The default is False, meaning it parses the entire contents of + the file. + """ + text = text.decode('ASCII', errors='surrogateescape') + return self.parser.parsestr(text, headersonly) diff --git a/Lib/email/test/test_email.py b/Lib/email/test/test_email.py index 8f95125..e5e51c6 100644 --- a/Lib/email/test/test_email.py +++ b/Lib/email/test/test_email.py @@ -9,8 +9,9 @@ import base64 import difflib import unittest import warnings +import textwrap -from io import StringIO +from io import StringIO, BytesIO from itertools import chain import email @@ -34,7 +35,7 @@ from email import iterators from email import base64mime from email import quoprimime -from test.support import findfile, run_unittest +from test.support import findfile, run_unittest, unlink from email.test import __file__ as landmark @@ -2070,6 +2071,10 @@ class TestIdempotent(TestEmailBase): msg, text = self._msgobj('msg_36.txt') self._idempotent(msg, text) + def test_message_signed_idempotent(self): + msg, text = self._msgobj('msg_45.txt') + self._idempotent(msg, text) + def test_content_type(self): eq = self.assertEquals unless = self.assertTrue @@ -2186,7 +2191,8 @@ class TestMiscellaneous(TestEmailBase): all.sort() self.assertEqual(all, [ 'base64mime', 'charset', 'encoders', 'errors', 'generator', - 'header', 'iterators', 'message', 'message_from_file', + 'header', 'iterators', 'message', 'message_from_binary_file', + 'message_from_bytes', 'message_from_file', 'message_from_string', 'mime', 'parser', 'quoprimime', 'utils', ]) @@ -2687,6 +2693,266 @@ Here's the message body self.assertTrue(msg.get_payload(0).get_payload().endswith('\r\n')) +class Test8BitBytesHandling(unittest.TestCase): + # In Python3 all input is string, but that doesn't work if the actual input + # uses an 8bit transfer encoding. To hack around that, in email 5.1 we + # decode byte streams using the surrogateescape error handler, and + # reconvert to binary at appropriate places if we detect surrogates. This + # doesn't allow us to transform headers with 8bit bytes (they get munged), + # but it does allow us to parse and preserve them, and to decode body + # parts that use an 8bit CTE. + + bodytest_msg = textwrap.dedent("""\ + From: foo@bar.com + To: baz + Mime-Version: 1.0 + Content-Type: text/plain; charset={charset} + Content-Transfer-Encoding: {cte} + + {bodyline} + """) + + def test_known_8bit_CTE(self): + m = self.bodytest_msg.format(charset='utf-8', + cte='8bit', + bodyline='pöstal').encode('utf-8') + msg = email.message_from_bytes(m) + self.assertEqual(msg.get_payload(), "pöstal\n") + self.assertEqual(msg.get_payload(decode=True), + "pöstal\n".encode('utf-8')) + + def test_unknown_8bit_CTE(self): + m = self.bodytest_msg.format(charset='notavalidcharset', + cte='8bit', + bodyline='pöstal').encode('utf-8') + msg = email.message_from_bytes(m) + self.assertEqual(msg.get_payload(), "p��stal\n") + self.assertEqual(msg.get_payload(decode=True), + "pöstal\n".encode('utf-8')) + + def test_8bit_in_quopri_body(self): + # This is non-RFC compliant data...without 'decode' the library code + # decodes the body using the charset from the headers, and because the + # source byte really is utf-8 this works. This is likely to fail + # against real dirty data (ie: produce mojibake), but the data is + # invalid anyway so it is as good a guess as any. But this means that + # this test just confirms the current behavior; that behavior is not + # necessarily the best possible behavior. With 'decode' it is + # returning the raw bytes, so that test should be of correct behavior, + # or at least produce the same result that email4 did. + m = self.bodytest_msg.format(charset='utf-8', + cte='quoted-printable', + bodyline='p=C3=B6stál').encode('utf-8') + msg = email.message_from_bytes(m) + self.assertEqual(msg.get_payload(), 'p=C3=B6stál\n') + self.assertEqual(msg.get_payload(decode=True), + 'pöstál\n'.encode('utf-8')) + + def test_invalid_8bit_in_non_8bit_cte_uses_replace(self): + # This is similar to the previous test, but proves that if the 8bit + # byte is undecodeable in the specified charset, it gets replaced + # by the unicode 'unknown' character. Again, this may or may not + # be the ideal behavior. Note that if decode=False none of the + # decoders will get involved, so this is the only test we need + # for this behavior. + m = self.bodytest_msg.format(charset='ascii', + cte='quoted-printable', + bodyline='p=C3=B6stál').encode('utf-8') + msg = email.message_from_bytes(m) + self.assertEqual(msg.get_payload(), 'p=C3=B6st��l\n') + self.assertEqual(msg.get_payload(decode=True), + 'pöstál\n'.encode('utf-8')) + + def test_8bit_in_base64_body(self): + # Sticking an 8bit byte in a base64 block makes it undecodable by + # normal means, so the block is returned undecoded, but as bytes. + m = self.bodytest_msg.format(charset='utf-8', + cte='base64', + bodyline='cMO2c3RhbAá=').encode('utf-8') + msg = email.message_from_bytes(m) + self.assertEqual(msg.get_payload(decode=True), + 'cMO2c3RhbAá=\n'.encode('utf-8')) + + def test_8bit_in_uuencode_body(self): + # Sticking an 8bit byte in a uuencode block makes it undecodable by + # normal means, so the block is returned undecoded, but as bytes. + m = self.bodytest_msg.format(charset='utf-8', + cte='uuencode', + bodyline='<,.V<W1A; á ').encode('utf-8') + msg = email.message_from_bytes(m) + self.assertEqual(msg.get_payload(decode=True), + '<,.V<W1A; á \n'.encode('utf-8')) + + + headertest_msg = textwrap.dedent("""\ + From: foo@bar.com + To: báz + Subject: Maintenant je vous présente mon collègue, le pouf célèbre + \tJean de Baddie + From: göst + + Yes, they are flying. + """).encode('utf-8') + + def test_get_8bit_header(self): + msg = email.message_from_bytes(self.headertest_msg) + self.assertEqual(msg.get('to'), 'b??z') + self.assertEqual(msg['to'], 'b??z') + + def test_print_8bit_headers(self): + msg = email.message_from_bytes(self.headertest_msg) + self.assertEqual(str(msg), + self.headertest_msg.decode( + 'ascii', 'replace').replace('�', '?')) + + def test_values_with_8bit_headers(self): + msg = email.message_from_bytes(self.headertest_msg) + self.assertListEqual(msg.values(), + ['foo@bar.com', + 'b??z', + 'Maintenant je vous pr??sente mon ' + 'coll??gue, le pouf c??l??bre\n' + '\tJean de Baddie', + "g??st"]) + + def test_items_with_8bit_headers(self): + msg = email.message_from_bytes(self.headertest_msg) + self.assertListEqual(msg.items(), + [('From', 'foo@bar.com'), + ('To', 'b??z'), + ('Subject', 'Maintenant je vous pr??sente mon ' + 'coll??gue, le pouf c??l??bre\n' + '\tJean de Baddie'), + ('From', 'g??st')]) + + def test_get_all_with_8bit_headers(self): + msg = email.message_from_bytes(self.headertest_msg) + self.assertListEqual(msg.get_all('from'), + ['foo@bar.com', + 'g??st']) + + non_latin_bin_msg = textwrap.dedent("""\ + From: foo@bar.com + To: báz + Subject: Maintenant je vous présente mon collègue, le pouf célèbre + \tJean de Baddie + Mime-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + + Да, они летят. + """).encode('utf-8') + + def test_bytes_generator(self): + msg = email.message_from_bytes(self.non_latin_bin_msg) + out = BytesIO() + email.generator.BytesGenerator(out).flatten(msg) + self.assertEqual(out.getvalue(), self.non_latin_bin_msg) + + # XXX: ultimately the '?' should turn into CTE encoded bytes + # using 'unknown-8bit' charset. + non_latin_bin_msg_as7bit = textwrap.dedent("""\ + From: foo@bar.com + To: b??z + Subject: Maintenant je vous pr??sente mon coll??gue, le pouf c??l??bre + \tJean de Baddie + Mime-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: base64 + + 0JTQsCwg0L7QvdC4INC70LXRgtGP0YIuCg== + """) + + def test_generator_handles_8bit(self): + msg = email.message_from_bytes(self.non_latin_bin_msg) + out = StringIO() + email.generator.Generator(out).flatten(msg) + self.assertEqual(out.getvalue(), self.non_latin_bin_msg_as7bit) + + def test_bytes_generator_with_unix_from(self): + # The unixfrom contains a current date, so we can't check it + # literally. Just make sure the first word is 'From' and the + # rest of the message matches the input. + msg = email.message_from_bytes(self.non_latin_bin_msg) + out = BytesIO() + email.generator.BytesGenerator(out).flatten(msg, unixfrom=True) + lines = out.getvalue().split(b'\n') + self.assertEqual(lines[0].split()[0], b'From') + self.assertEqual(b'\n'.join(lines[1:]), self.non_latin_bin_msg) + + def test_message_from_binary_file(self): + fn = 'test.msg' + self.addCleanup(unlink, fn) + with open(fn, 'wb') as testfile: + testfile.write(self.non_latin_bin_msg) + m = email.parser.BytesParser().parse(open(fn, 'rb')) + self.assertEqual(str(m), self.non_latin_bin_msg_as7bit) + + latin_bin_msg = textwrap.dedent("""\ + From: foo@bar.com + To: Dinsdale + Subject: Nudge nudge, wink, wink + Mime-Version: 1.0 + Content-Type: text/plain; charset="latin-1" + Content-Transfer-Encoding: 8bit + + oh là là, know what I mean, know what I mean? + """).encode('latin-1') + + latin_bin_msg_as7bit = textwrap.dedent("""\ + From: foo@bar.com + To: Dinsdale + Subject: Nudge nudge, wink, wink + Mime-Version: 1.0 + Content-Type: text/plain; charset="iso-8859-1" + Content-Transfer-Encoding: quoted-printable + + oh l=E0 l=E0, know what I mean, know what I mean? + """) + + def test_string_generator_reencodes_to_quopri_when_appropriate(self): + m = email.message_from_bytes(self.latin_bin_msg) + self.assertEqual(str(m), self.latin_bin_msg_as7bit) + + def test_decoded_generator_emits_unicode_body(self): + m = email.message_from_bytes(self.latin_bin_msg) + out = StringIO() + email.generator.DecodedGenerator(out).flatten(m) + #DecodedHeader output contains an extra blank line compared + #to the input message. RDM: not sure if this is a bug or not, + #but it is not specific to the 8bit->7bit conversion. + self.assertEqual(out.getvalue(), + self.latin_bin_msg.decode('latin-1')+'\n') + + def test_bytes_feedparser(self): + bfp = email.feedparser.BytesFeedParser() + for i in range(0, len(self.latin_bin_msg), 10): + bfp.feed(self.latin_bin_msg[i:i+10]) + m = bfp.close() + self.assertEqual(str(m), self.latin_bin_msg_as7bit) + + +class TestBytesGeneratorIdempotent(TestIdempotent): + + def _msgobj(self, filename): + with openfile(filename, 'rb') as fp: + data = fp.read() + msg = email.message_from_bytes(data) + return msg, data + + def _idempotent(self, msg, data): + b = BytesIO() + g = email.generator.BytesGenerator(b, maxheaderlen=0) + g.flatten(msg) + self.assertEqual(data, b.getvalue()) + + maxDiff = None + + def assertEqual(self, str1, str2): + self.assertListEqual(str1.split(b'\n'), str2.split(b'\n')) + + + class TestBase64(unittest.TestCase): def test_len(self): eq = self.assertEqual |