From 9604e66660bfe5066a88e3eb560a5846c620e8de Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 30 Aug 2007 03:46:43 +0000 Subject: Oops. I copied a slightly older version of the email package from the sandbox. This should restore the email package in the py3k branch to exactly what's in the sandbox. This wipes out 1-2 fixes made post-copy, which I'll re-apply shortly. --- Lib/email/base64mime.py | 105 +++-------- Lib/email/charset.py | 189 +++++++++---------- Lib/email/generator.py | 8 +- Lib/email/header.py | 136 +++++++++---- Lib/email/message.py | 20 +- Lib/email/quoprimime.py | 99 +++++----- Lib/email/test/test_email.py | 441 ++++++++++++++++++++++++++----------------- Lib/email/utils.py | 12 +- 8 files changed, 536 insertions(+), 474 deletions(-) diff --git a/Lib/email/base64mime.py b/Lib/email/base64mime.py index e309f30..0035b79 100644 --- a/Lib/email/base64mime.py +++ b/Lib/email/base64mime.py @@ -25,7 +25,6 @@ module. """ __all__ = [ - 'base64_len', 'body_decode', 'body_encode', 'decode', @@ -33,12 +32,13 @@ __all__ = [ 'encode', 'encodestring', 'header_encode', + 'header_length', ] import re +from base64 import b64encode from binascii import b2a_base64, a2b_base64 -from email.utils import fix_eols CRLF = '\r\n' NL = '\n' @@ -50,11 +50,10 @@ MISC_LEN = 7 # Helpers -def base64_len(s): +def header_length(bytearray): """Return the length of s when it is encoded with base64.""" - groups_of_3, leftover = divmod(len(s), 3) + groups_of_3, leftover = divmod(len(bytearray), 3) # 4 bytes out for each 3 bytes (or nonzero fraction thereof) in. - # Thanks, Tim! n = groups_of_3 * 4 if leftover: n += 4 @@ -62,74 +61,26 @@ def base64_len(s): -def header_encode(header, charset='iso-8859-1', keep_eols=False, - maxlinelen=76, eol=NL): +def header_encode(header_bytes, charset='iso-8859-1'): """Encode a single header line with Base64 encoding in a given charset. - Defined in RFC 2045, this Base64 encoding is identical to normal Base64 - encoding, except that each line must be intelligently wrapped (respecting - the Base64 encoding), and subsequent lines must start with a space. - charset names the character set to use to encode the header. It defaults - to iso-8859-1. - - End-of-line characters (\\r, \\n, \\r\\n) will be automatically converted - to the canonical email line separator \\r\\n unless the keep_eols - parameter is True (the default is False). - - Each line of the header will be terminated in the value of eol, which - defaults to "\\n". Set this to "\\r\\n" if you are using the result of - this function directly in email. - - The resulting string will be in the form: - - "=?charset?b?WW/5ciBtYXp66XLrIHf8eiBhIGhhbXBzdGHuciBBIFlv+XIgbWF6euly?=\\n - =?charset?b?6yB3/HogYSBoYW1wc3Rh7nIgQkMgWW/5ciBtYXp66XLrIHf8eiBhIGhh?=" - - with each line wrapped at, at most, maxlinelen characters (defaults to 76 - characters). + to iso-8859-1. Base64 encoding is defined in RFC 2045. """ # Return empty headers unchanged - if not header: - return header - - if not keep_eols: - header = fix_eols(header) - - # Base64 encode each line, in encoded chunks no greater than maxlinelen in - # length, after the RFC chrome is added in. - base64ed = [] - max_encoded = maxlinelen - len(charset) - MISC_LEN - max_unencoded = max_encoded * 3 // 4 - - for i in range(0, len(header), max_unencoded): - base64ed.append(b2a_base64(header[i:i+max_unencoded])) - - # Now add the RFC chrome to each encoded chunk - lines = [] - for line in base64ed: - # Ignore the last character of each line if it is a newline - if line[-1] == ord(NL): - line = line[:-1] - # Add the chrome - lines.append('=?%s?b?%s?=' % (charset, line)) - # Glue the lines together and return it. BAW: should we be able to - # specify the leading whitespace in the joiner? - joiner = eol + ' ' - return joiner.join(lines) + if not header_bytes: + return str(header_bytes) + encoded = b64encode(header_bytes) + return '=?%s?b?%s?=' % (charset, encoded) -def encode(s, binary=True, maxlinelen=76, eol=NL): +def body_encode(s, maxlinelen=76, eol=NL): """Encode a string with base64. Each line will be wrapped at, at most, maxlinelen characters (defaults to 76 characters). - If binary is False, end-of-line characters will be converted to the - canonical email end-of-line sequence \\r\\n. Otherwise they will be left - verbatim (this is the default). - Each line of encoded text will end with eol, which defaults to "\\n". Set this to "\r\n" if you will be using the result of this function directly in an email. @@ -137,9 +88,6 @@ def encode(s, binary=True, maxlinelen=76, eol=NL): if not s: return s - if not binary: - s = fix_eols(s) - encvec = [] max_unencoded = maxlinelen * 3 // 4 for i in range(0, len(s), max_unencoded): @@ -152,25 +100,26 @@ def encode(s, binary=True, maxlinelen=76, eol=NL): return EMPTYSTRING.join(encvec) -# For convenience and backwards compatibility w/ standard base64 module -body_encode = encode -encodestring = encode - - -def decode(string): +def decode(s, convert_eols=False): """Decode a raw base64 string, returning a bytes object. - This function does not parse a full MIME header value encoded with base64 - (like =?iso-8895-1?b?bmloISBuaWgh?=) -- use the high level - email.Header class for that functionality. + If convert_eols is set to a string value, all canonical email linefeeds, + e.g. "\\r\\n", in the decoded text will be converted to the value of + convert_eols. os.linesep is a good choice for convert_eols if you are + decoding a text attachment. + + This function does not parse a full MIME header value encoded with + base64 (like =?iso-8895-1?b?bmloISBuaWgh?=) -- please use the high + level email.Header class for that functionality. """ - if not string: - return bytes() - elif isinstance(string, str): - return a2b_base64(string.encode('raw-unicode-escape')) - else: - return a2b_base64(string) + if not s: + return s + + dec = a2b_base64(s) + if convert_eols: + return dec.replace(CRLF, convert_eols) + return dec # For convenience and backwards compatibility w/ standard base64 module diff --git a/Lib/email/charset.py b/Lib/email/charset.py index 1435ee5..9e5ee67 100644 --- a/Lib/email/charset.py +++ b/Lib/email/charset.py @@ -9,6 +9,8 @@ __all__ = [ 'add_codec', ] +from functools import partial + import email.base64mime import email.quoprimime @@ -23,9 +25,10 @@ BASE64 = 2 # Base64 SHORTEST = 3 # the shorter of QP and base64, but only for headers # In "=?charset?q?hello_world?=", the =?, ?q?, and ?= add up to 7 -MISC_LEN = 7 +RFC2047_CHROME_LEN = 7 DEFAULT_CHARSET = 'us-ascii' +EMPTYSTRING = '' @@ -259,63 +262,6 @@ class Charset: else: return encode_7or8bit - def convert(self, s): - """Convert a string from the input_codec to the output_codec.""" - if self.input_codec != self.output_codec: - rawbytes = bytes(ord(c) for c in s) - decoded = rawbytes.decode(self.input_codec) - encoded = decoded.encode(self.output_codec) - return str(encoded) - else: - return s - - def to_splittable(self, s): - """Convert a possibly multibyte string to a safely splittable format. - - Uses the input_codec to try and convert the string to Unicode, so it - can be safely split on character boundaries (even for multibyte - characters). - - Returns the string as-is if it isn't known how to convert it to - Unicode with the input_charset. - - Characters that could not be converted to Unicode will be replaced - with the Unicode replacement character U+FFFD. - """ - if isinstance(s, str) or self.input_codec is None: - return s - try: - return str(s, self.input_codec, 'replace') - except LookupError: - # Input codec not installed on system, so return the original - # string unchanged. - return s - - def from_splittable(self, ustr, to_output=True): - """Convert a splittable string back into an encoded string. - - Uses the proper codec to try and convert the string from Unicode back - into an encoded format. Return the string as-is if it is not Unicode, - or if it could not be converted from Unicode. - - Characters that could not be converted from Unicode will be replaced - with an appropriate character (usually '?'). - - If to_output is True (the default), uses output_codec to convert to an - encoded format. If to_output is False, uses input_codec. - """ - if to_output: - codec = self.output_codec - else: - codec = self.input_codec - if not isinstance(ustr, str) or codec is None: - return ustr - try: - return str(ustr.encode(codec, 'replace')) - except LookupError: - # Output codec not installed - return ustr - def get_output_charset(self): """Return the output character set. @@ -324,66 +270,115 @@ class Charset: """ return self.output_charset or self.input_charset - def encoded_header_len(self, s): - """Return the length of the encoded header string.""" - cset = self.get_output_charset() - # The len(s) of a 7bit encoding is len(s) - if self.header_encoding == BASE64: - return email.base64mime.base64_len(s) + len(cset) + MISC_LEN - elif self.header_encoding == QP: - return email.quoprimime.header_quopri_len(s) + len(cset) + MISC_LEN - elif self.header_encoding == SHORTEST: - lenb64 = email.base64mime.base64_len(s) - lenqp = email.quoprimime.header_quopri_len(s) - return min(lenb64, lenqp) + len(cset) + MISC_LEN - else: - return len(s) - def header_encode(self, string): """Header-encode a string by converting it first to bytes. - :param string: A unicode string for the header. This must be - encodable to bytes using the current character set's `output_codec`. - The type of encoding (base64 or quoted-printable) will be based on this charset's `header_encoding`. + + :param string: A unicode string for the header. It must be possible + to encode this string to bytes using the character set's + output codec. + :return: The encoded string, with RFC 2047 chrome. """ codec = self.output_codec or 'us-ascii' charset = self.get_output_charset() header_bytes = string.encode(codec) # 7bit/8bit encodings return the string unchanged (modulo conversions) + encoder_module = self._get_encoder(header_bytes) + if encoder_module is None: + return string + return encoder_module.header_encode(header_bytes, codec) + + def header_encode_lines(self, string, maxlengths): + """Header-encode a string by converting it first to bytes. + + This is similar to `header_encode()` except that the string is fit + into maximum line lengths as given by the arguments. + + :param string: A unicode string for the header. It must be possible + to encode this string to bytes using the character set's + output codec. + :param maxlengths: Maximum line length iterator. Each element + returned from this iterator will provide the next maximum line + length. This parameter is used as an argument to built-in next() + and should never be exhausted. The maximum line lengths should + not count the RFC 2047 chrome. These line lengths are only a + hint; the splitter does the best it can. + :param firstmaxlen: The maximum line length of the first line. If + None (the default), then `maxlen` is used for the first line. + :return: Lines of encoded strings, each with RFC 2047 chrome. + """ + # See which encoding we should use. + codec = self.output_codec or 'us-ascii' + header_bytes = string.encode(codec) + encoder_module = self._get_encoder(header_bytes) + encoder = partial(encoder_module.header_encode, charset=str(self)) + # Calculate the number of characters that the RFC 2047 chrome will + # contribute to each line. + charset = self.get_output_charset() + extra = len(charset) + RFC2047_CHROME_LEN + # Now comes the hard part. We must encode bytes but we can't split on + # bytes because some character sets are variable length and each + # encoded word must stand on its own. So the problem is you have to + # encode to bytes to figure out this word's length, but you must split + # on characters. This causes two problems: first, we don't know how + # many octets a specific substring of unicode characters will get + # encoded to, and second, we don't know how many ASCII characters + # those octets will get encoded to. Unless we try it. Which seems + # inefficient. In the interest of being correct rather than fast (and + # in the hope that there will be few encoded headers in any such + # message), brute force it. :( + lines = [] + current_line = [] + maxlen = next(maxlengths) - extra + for character in string: + current_line.append(character) + this_line = EMPTYSTRING.join(current_line) + length = encoder_module.header_length(this_line.encode(charset)) + if length > maxlen: + # This last character doesn't fit so pop it off. + current_line.pop() + # Does nothing fit on the first line? + if not lines and not current_line: + lines.append(None) + else: + separator = (' ' if lines else '') + joined_line = EMPTYSTRING.join(current_line) + header_bytes = joined_line.encode(codec) + lines.append(encoder(header_bytes)) + current_line = [character] + maxlen = next(maxlengths) - extra + joined_line = EMPTYSTRING.join(current_line) + header_bytes = joined_line.encode(codec) + lines.append(encoder(header_bytes)) + return lines + + def _get_encoder(self, header_bytes): if self.header_encoding == BASE64: - encoder = email.base64mime.header_encode + return email.base64mime elif self.header_encoding == QP: - encoder = email.quoprimime.header_encode + return email.quoprimime elif self.header_encoding == SHORTEST: - lenb64 = email.base64mime.base64_len(header_bytes) - lenqp = email.quoprimime.header_quopri_len(header_bytes) - if lenb64 < lenqp: - encoder = email.base64mime.header_encode + len64 = email.base64mime.header_length(header_bytes) + lenqp = email.quoprimime.header_length(header_bytes) + if len64 < lenqp: + return email.base64mime else: - encoder = email.quoprimime.header_encode + return email.quoprimime else: - return string - return encoder(header_bytes, codec) + return None - def body_encode(self, s, convert=True): - """Body-encode a string and convert it to output_charset. - - If convert is True (the default), the string will be converted from - the input charset to output charset automatically. Unlike - header_encode(), there are no issues with byte boundaries and - multibyte charsets in email bodies, so this is usually pretty safe. + def body_encode(self, string): + """Body-encode a string by converting it first to bytes. The type of encoding (base64 or quoted-printable) will be based on self.body_encoding. """ - if convert: - s = self.convert(s) # 7bit/8bit encodings return the string unchanged (module conversions) if self.body_encoding is BASE64: - return email.base64mime.body_encode(s) + return email.base64mime.body_encode(string) elif self.body_encoding is QP: - return email.quoprimime.body_encode(s) + return email.quoprimime.body_encode(string) else: - return s + return string diff --git a/Lib/email/generator.py b/Lib/email/generator.py index 1352ede..2d6a191 100644 --- a/Lib/email/generator.py +++ b/Lib/email/generator.py @@ -133,12 +133,8 @@ class Generator: def _write_headers(self, msg): for h, v in msg.items(): print('%s:' % h, end=' ', file=self._fp) - if self._maxheaderlen == 0: - # Explicit no-wrapping - print(v, file=self._fp) - elif isinstance(v, Header): - # Header instances know what to do - print(v.encode(), file=self._fp) + if isinstance(v, Header): + print(v.encode(maxlinelen=self._maxheaderlen), file=self._fp) else: # Header's got lots of smarts, so use it. header = Header(v, maxlinelen=self._maxheaderlen, diff --git a/Lib/email/header.py b/Lib/email/header.py index 1d97f8f..e03e42d 100644 --- a/Lib/email/header.py +++ b/Lib/email/header.py @@ -25,10 +25,11 @@ BSPACE = b' ' SPACE8 = ' ' * 8 EMPTYSTRING = '' -MAXLINELEN = 76 +MAXLINELEN = 78 USASCII = Charset('us-ascii') UTF8 = Charset('utf-8') +TRANSITIONAL_SPACE = object() # Match encoded-word strings in the form =?charset?q?Hello_World?= ecre = re.compile(r''' @@ -109,7 +110,7 @@ def decode_header(header): last_word = last_charset = None for word, charset in decoded_words: if isinstance(word, str): - word = bytes(ord(c) for c in word) + word = bytes(word, 'raw-unicode-escape') if last_word is None: last_word = word last_charset = charset @@ -170,7 +171,8 @@ class Header: The maximum line length can be specified explicit via maxlinelen. For splitting the first line to a shorter value (to account for the field header which isn't included in s, e.g. `Subject') pass in the name of - the field in header_name. The default maxlinelen is 76. + the field in header_name. The default maxlinelen is 78 as recommended + by RFC 2822. continuation_ws must be RFC 2822 compliant folding whitespace (usually either a space or a hard tab) which will be prepended to continuation @@ -198,9 +200,10 @@ class Header: def __str__(self): """Return the string value of the header.""" + self._normalize() uchunks = [] lastcs = None - for s, charset in self._chunks: + for string, charset in self._chunks: # We must preserve spaces between encoded and non-encoded word # boundaries, which means for us we need to add a space when we go # from a charset to None/us-ascii, or from None/us-ascii to a @@ -214,15 +217,16 @@ class Header: elif nextcs not in (None, 'us-ascii'): uchunks.append(SPACE) lastcs = nextcs - uchunks.append(s) + uchunks.append(string) return EMPTYSTRING.join(uchunks) # Rich comparison operators for equality only. BAW: does it make sense to # have or explicitly disable <, <=, >, >= operators? def __eq__(self, other): # other may be a Header or a string. Both are fine so coerce - # ourselves to a string, swap the args and do another comparison. - return other == self.encode() + # ourselves to a unicode (of the unencoded header value), swap the + # args and do another comparison. + return other == str(self) def __ne__(self, other): return not self == other @@ -267,7 +271,7 @@ class Header: output_string = input_bytes.decode(output_charset, errors) self._chunks.append((output_string, charset)) - def encode(self, splitchars=';, \t'): + def encode(self, splitchars=';, \t', maxlinelen=None): """Encode a message header into an RFC-compliant format. There are many issues involved in converting a given string for use in @@ -290,7 +294,14 @@ class Header: syntactic breaks'. This doesn't affect RFC 2047 encoded lines. """ self._normalize() - formatter = _ValueFormatter(self._headerlen, self._maxlinelen, + if maxlinelen is None: + maxlinelen = self._maxlinelen + # A maxlinelen of 0 means don't wrap. For all practical purposes, + # choosing a huge number here accomplishes that and makes the + # _ValueFormatter algorithm much simpler. + if maxlinelen == 0: + maxlinelen = 1000000 + formatter = _ValueFormatter(self._headerlen, maxlinelen, self._continuation_ws, splitchars) for string, charset in self._chunks: lines = string.splitlines() @@ -301,9 +312,8 @@ class Header: return str(formatter) def _normalize(self): - # Normalize the chunks so that all runs of identical charsets get - # collapsed into a single unicode string. You need a space between - # encoded words, or between encoded and unencoded words. + # Step 1: Normalize the chunks so that all runs of identical charsets + # get collapsed into a single unicode string. chunks = [] last_charset = None last_chunk = [] @@ -313,8 +323,6 @@ class Header: else: if last_charset is not None: chunks.append((SPACE.join(last_chunk), last_charset)) - if last_charset != USASCII or charset != USASCII: - chunks.append((' ', USASCII)) last_chunk = [string] last_charset = charset if last_chunk: @@ -333,6 +341,10 @@ class _ValueFormatter: self._current_line = _Accumulator(headerlen) def __str__(self): + # Remove the trailing TRANSITIONAL_SPACE + last_line = self._current_line.pop() + if last_line is not TRANSITIONAL_SPACE: + self._current_line.push(last_line) self.newline() return NL.join(self._lines) @@ -348,24 +360,66 @@ class _ValueFormatter: if len(encoded_string) + len(self._current_line) <= self._maxlen: self._current_line.push(encoded_string) return - # Attempt to split the line at the highest-level syntactic break - # possible. Note that we don't have a lot of smarts about field + # If the charset has no header encoding (i.e. it is an ASCII encoding) + # then we must split the header at the "highest level syntactic break" + # possible. Note that we don't have a lot of smarts about field # syntax; we just try to break on semi-colons, then commas, then - # whitespace. Eventually, we'll allow this to be pluggable. - for ch in self._splitchars: - if ch in string: - break - else: - # We can't split the string to fit on the current line, so just - # put it on a line by itself. - self._lines.append(str(self._current_line)) - self._current_line.reset(self._continuation_ws) - self._current_line.push(encoded_string) + # whitespace. Eventually, this should be pluggable. + if charset.header_encoding is None: + for ch in self._splitchars: + if ch in string: + break + else: + ch = None + # If there's no available split character then regardless of + # whether the string fits on the line, we have to put it on a line + # by itself. + if ch is None: + if not self._current_line.is_onlyws(): + self._lines.append(str(self._current_line)) + self._current_line.reset(self._continuation_ws) + self._current_line.push(encoded_string) + else: + self._ascii_split(string, ch) return - self._spliterate(string, ch, charset) - - def _spliterate(self, string, ch, charset): - holding = _Accumulator(transformfunc=charset.header_encode) + # Otherwise, we're doing either a Base64 or a quoted-printable + # encoding which means we don't need to split the line on syntactic + # breaks. We can basically just find enough characters to fit on the + # current line, minus the RFC 2047 chrome. What makes this trickier + # though is that we have to split at octet boundaries, not character + # boundaries but it's only safe to split at character boundaries so at + # best we can only get close. + encoded_lines = charset.header_encode_lines(string, self._maxlengths()) + # The first element extends the current line, but if it's None then + # nothing more fit on the current line so start a new line. + try: + first_line = encoded_lines.pop(0) + except IndexError: + # There are no encoded lines, so we're done. + return + if first_line is not None: + self._current_line.push(first_line) + self._lines.append(str(self._current_line)) + self._current_line.reset(self._continuation_ws) + try: + last_line = encoded_lines.pop() + except IndexError: + # There was only one line. + return + self._current_line.push(last_line) + self._current_line.push(TRANSITIONAL_SPACE) + # Everything else are full lines in themselves. + for line in encoded_lines: + self._lines.append(self._continuation_ws + line) + + def _maxlengths(self): + # The first line's length. + yield self._maxlen - len(self._current_line) + while True: + yield self._maxlen - self._continuation_ws_len + + def _ascii_split(self, string, ch): + holding = _Accumulator() # Split the line on the split character, preserving it. If the split # character is whitespace RFC 2822 $2.2.3 requires us to fold on the # whitespace, so that the line leads with the original whitespace we @@ -387,8 +441,7 @@ class _ValueFormatter: # line, watch out for the current line containing only # whitespace. holding.pop() - if len(self._current_line) == 0 and ( - len(holding) == 0 or str(holding).isspace()): + if self._current_line.is_onlyws() and holding.is_onlyws(): # Don't start a new line. holding.push(part) part = None @@ -492,12 +545,8 @@ def _spliterator(character, string): class _Accumulator: - def __init__(self, initial_size=0, transformfunc=None): + def __init__(self, initial_size=0): self._initial_size = initial_size - if transformfunc is None: - self._transformfunc = lambda string: string - else: - self._transformfunc = transformfunc self._current = [] def push(self, string): @@ -507,14 +556,21 @@ class _Accumulator: return self._current.pop() def __len__(self): - return len(str(self)) + self._initial_size + return sum((len(string) + for string in self._current + if string is not TRANSITIONAL_SPACE), + self._initial_size) def __str__(self): - return self._transformfunc(EMPTYSTRING.join(self._current)) + return EMPTYSTRING.join( + (' ' if string is TRANSITIONAL_SPACE else string) + for string in self._current) def reset(self, string=None): self._current = [] - self._current_len = 0 self._initial_size = 0 if string is not None: self.push(string) + + def is_onlyws(self): + return len(self) == 0 or str(self).isspace() diff --git a/Lib/email/message.py b/Lib/email/message.py index ad795f9..50d6604 100644 --- a/Lib/email/message.py +++ b/Lib/email/message.py @@ -13,9 +13,9 @@ import warnings from io import BytesIO, StringIO # Intrapackage imports -import email.charset from email import utils from email import errors +from email.charset import Charset SEMISPACE = '; ' @@ -201,7 +201,7 @@ class Message: # Incorrect padding pass elif cte in ('x-uuencode', 'uuencode', 'uue', 'x-uue'): - in_file = BytesIO((payload + '\n').encode('raw-unicode-escape')) + in_file = BytesIO(bytes(payload + '\n')) out_file = BytesIO() try: uu.decode(in_file, out_file, quiet=True) @@ -211,7 +211,7 @@ class Message: pass # Is there a better way to do this? We can't use the bytes # constructor. - return bytes(ord(c) for c in payload) + return bytes(payload, 'raw-unicode-escape') def set_payload(self, payload, charset=None): """Set the payload to the given value. @@ -236,18 +236,13 @@ class Message: and encoded properly, if needed, when generating the plain text representation of the message. MIME headers (MIME-Version, Content-Type, Content-Transfer-Encoding) will be added as needed. - """ if charset is None: self.del_param('charset') self._charset = None return - if isinstance(charset, basestring): - charset = email.charset.Charset(charset) - if not isinstance(charset, email.charset.Charset): - raise TypeError(charset) - # BAW: should we accept strings that can serve as arguments to the - # Charset constructor? + if not isinstance(charset, Charset): + charset = Charset(charset) self._charset = charset if 'MIME-Version' not in self: self.add_header('MIME-Version', '1.0') @@ -256,7 +251,7 @@ class Message: charset=charset.get_output_charset()) else: self.set_param('charset', charset.get_output_charset()) - if str(charset) != charset.get_output_charset(): + if charset != charset.get_output_charset(): self._payload = charset.body_encode(self._payload) if 'Content-Transfer-Encoding' not in self: cte = charset.get_body_encoding() @@ -757,8 +752,7 @@ class Message: # LookupError will be raised if the charset isn't known to # Python. UnicodeError will be raised if the encoded text # contains a character not in the charset. - as_bytes = charset[2].encode('raw-unicode-escape') - charset = str(as_bytes, pcharset) + charset = str(bytes(charset[2]), pcharset) except (LookupError, UnicodeError): charset = charset[2] # charset characters must be in us-ascii range diff --git a/Lib/email/quoprimime.py b/Lib/email/quoprimime.py index dfd3ccb..01de769 100644 --- a/Lib/email/quoprimime.py +++ b/Lib/email/quoprimime.py @@ -29,16 +29,14 @@ wrapping issues, use the email.Header module. __all__ = [ 'body_decode', 'body_encode', - 'body_quopri_check', - 'body_quopri_len', + 'body_length', 'decode', 'decodestring', 'encode', 'encodestring', 'header_decode', 'header_encode', - 'header_quopri_check', - 'header_quopri_len', + 'header_length', 'quote', 'unquote', ] @@ -46,54 +44,65 @@ __all__ = [ import re from string import ascii_letters, digits, hexdigits -from email.utils import fix_eols CRLF = '\r\n' NL = '\n' EMPTYSTRING = '' -# See also Charset.py -MISC_LEN = 7 +# Build a mapping of octets to the expansion of that octet. Since we're only +# going to have 256 of these things, this isn't terribly inefficient +# space-wise. Remember that headers and bodies have different sets of safe +# characters. Initialize both maps with the full expansion, and then override +# the safe bytes with the more compact form. +_QUOPRI_HEADER_MAP = dict((c, '=%02X' % c) for c in range(256)) +_QUOPRI_BODY_MAP = _QUOPRI_HEADER_MAP.copy() -HEADER_SAFE_BYTES = (b'-!*+/ ' + - ascii_letters.encode('raw-unicode-escape') + - digits.encode('raw-unicode-escape')) +# Safe header bytes which need no encoding. +for c in b'-!*+/' + bytes(ascii_letters) + bytes(digits): + _QUOPRI_HEADER_MAP[c] = chr(c) +# Headers have one other special encoding; spaces become underscores. +_QUOPRI_HEADER_MAP[ord(' ')] = '_' -BODY_SAFE_BYTES = (b' !"#$%&\'()*+,-./0123456789:;<>' - b'?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`' - b'abcdefghijklmnopqrstuvwxyz{|}~\t') +# Safe body bytes which need no encoding. +for c in (b' !"#$%&\'()*+,-./0123456789:;<>' + b'?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`' + b'abcdefghijklmnopqrstuvwxyz{|}~\t'): + _QUOPRI_BODY_MAP[c] = chr(c) # Helpers -def header_quopri_check(c): - """Return True if the character should be escaped with header quopri.""" - return c not in HEADER_SAFE_BYTES +def header_check(octet): + """Return True if the octet should be escaped with header quopri.""" + return chr(octet) != _QUOPRI_HEADER_MAP[octet] -def body_quopri_check(c): - """Return True if the character should be escaped with body quopri.""" - return c not in BODY_SAFE_BYTES +def body_check(octet): + """Return True if the octet should be escaped with body quopri.""" + return chr(octet) != _QUOPRI_BODY_MAP[octet] -def header_quopri_len(bytearray): - """Return the length of bytearray when it is encoded with header quopri. +def header_length(bytearray): + """Return a header quoted-printable encoding length. Note that this does not include any RFC 2047 chrome added by `header_encode()`. + + :param bytearray: An array of bytes (a.k.a. octets). + :return: The length in bytes of the byte array when it is encoded with + quoted-printable for headers. """ - count = 0 - for c in bytearray: - count += (3 if header_quopri_check(c) else 1) - return count + return sum(len(_QUOPRI_HEADER_MAP[octet]) for octet in bytearray) + +def body_length(bytearray): + """Return a body quoted-printable encoding length. -def body_quopri_len(bytearray): - """Return the length of bytearray when it is encoded with body quopri.""" - count = 0 - for c in bytearray: - count += (3 if body_quopri_check(c) else 1) - return count + :param bytearray: An array of bytes (a.k.a. octets). + :return: The length in bytes of the byte array when it is encoded with + quoted-printable for bodies. + """ + return sum(len(_QUOPRI_BODY_MAP[octet]) for octet in bytearray) def _max_append(L, s, maxlen, extra=''): @@ -133,29 +142,17 @@ def header_encode(header_bytes, charset='iso-8859-1'): return str(header_bytes) # Iterate over every byte, encoding if necessary. encoded = [] - for character in header_bytes: - # Space may be represented as _ instead of =20 for readability - if character == ord(' '): - encoded.append('_') - # These characters can be included verbatim. - elif not header_quopri_check(character): - encoded.append(chr(character)) - # Otherwise, replace with hex value like =E2 - else: - encoded.append('=%02X' % character) + for octet in header_bytes: + encoded.append(_QUOPRI_HEADER_MAP[octet]) # Now add the RFC chrome to each encoded chunk and glue the chunks # together. return '=?%s?q?%s?=' % (charset, EMPTYSTRING.join(encoded)) -def encode(body, binary=False, maxlinelen=76, eol=NL): +def body_encode(body, maxlinelen=76, eol=NL): """Encode with quoted-printable, wrapping at maxlinelen characters. - If binary is False (the default), end-of-line characters will be converted - to the canonical email end-of-line sequence \\r\\n. Otherwise they will - be left verbatim. - Each line of encoded text will end with eol, which defaults to "\\n". Set this to "\\r\\n" if you will be using the result of this function directly in an email. @@ -168,9 +165,6 @@ def encode(body, binary=False, maxlinelen=76, eol=NL): if not body: return body - if not binary: - body = fix_eols(body) - # BAW: We're accumulating the body text by string concatenation. That # can't be very efficient, but I don't have time now to rewrite it. It # just feels like this algorithm could be more efficient. @@ -195,7 +189,7 @@ def encode(body, binary=False, maxlinelen=76, eol=NL): for j in range(linelen): c = line[j] prev = c - if body_quopri_check(c): + if body_check(c): c = quote(c) elif j+1 == linelen: # Check for whitespace at end of line; special case @@ -231,11 +225,6 @@ def encode(body, binary=False, maxlinelen=76, eol=NL): return encoded_body -# For convenience and backwards compatibility w/ standard base64 module -body_encode = encode -encodestring = encode - - # BAW: I'm not sure if the intent was for the signature of this function to be # the same as base64MIME.decode() or not... diff --git a/Lib/email/test/test_email.py b/Lib/email/test/test_email.py index 981441c..78d702d 100644 --- a/Lib/email/test/test_email.py +++ b/Lib/email/test/test_email.py @@ -482,7 +482,7 @@ class TestMessageAPI(TestEmailBase): msg['content-transfer-encoding'] = 'base64' msg.set_payload(x) self.assertEqual(msg.get_payload(decode=True), - bytes(ord(c) for c in x)) + bytes(x, 'raw-unicode-escape')) @@ -580,31 +580,31 @@ bug demonstration g = Generator(sfp) g.flatten(msg) eq(sfp.getvalue(), """\ -Subject: =?iso-8859-1?q?Die_Mieter_treten_hier_ein_werden_mit_einem_Foerd?= - =?iso-8859-1?q?erband_komfortabel_den_Korridor_entlang=2C_an_s=FCdl=FCndi?= - =?iso-8859-1?q?schen_Wandgem=E4lden_vorbei=2C_gegen_die_rotierenden_Kling?= - =?iso-8859-1?q?en_bef=F6rdert=2E_?= =?iso-8859-2?q?Finan=E8ni_met?= - =?iso-8859-2?q?ropole_se_hroutily_pod_tlakem_jejich_d=F9vtipu=2E=2E_?= - =?utf-8?b?5q2j56K644Gr6KiA44GG44Go57+76Kiz44Gv44GV44KM44Gm44GE?= - =?utf-8?b?44G+44Gb44KT44CC5LiA6YOo44Gv44OJ44Kk44OE6Kqe44Gn44GZ44GM44CB?= - =?utf-8?b?44GC44Go44Gv44Gn44Gf44KJ44KB44Gn44GZ44CC5a6f6Zqb44Gr44Gv44CM?= - =?utf-8?q?Wenn_ist_das_Nunstuck_git_und_Slotermeyer=3F_Ja!_Beiherhund_das?= - =?utf-8?b?IE9kZXIgZGllIEZsaXBwZXJ3YWxkdCBnZXJzcHV0LuOAjeOBqOiogOOBow==?= - =?utf-8?b?44Gm44GE44G+44GZ44CC?= +Subject: =?iso-8859-1?q?Die_Mieter_treten_hier_ein_werden_mit_einem_Foerderb?= + =?iso-8859-1?q?and_komfortabel_den_Korridor_entlang=2C_an_s=FCdl=FCndischen?= + =?iso-8859-1?q?_Wandgem=E4lden_vorbei=2C_gegen_die_rotierenden_Klingen_bef?= + =?iso-8859-1?q?=F6rdert=2E_?= =?iso-8859-2?q?Finan=E8ni_metropole_se_hrouti?= + =?iso-8859-2?q?ly_pod_tlakem_jejich_d=F9vtipu=2E=2E_?= =?utf-8?b?5q2j56K6?= + =?utf-8?b?44Gr6KiA44GG44Go57+76Kiz44Gv44GV44KM44Gm44GE44G+44Gb44KT44CC5LiA?= + =?utf-8?b?6YOo44Gv44OJ44Kk44OE6Kqe44Gn44GZ44GM44CB44GC44Go44Gv44Gn44Gf44KJ?= + =?utf-8?b?44KB44Gn44GZ44CC5a6f6Zqb44Gr44Gv44CMV2VubiBpc3QgZGFzIE51bnN0dWNr?= + =?utf-8?b?IGdpdCB1bmQgU2xvdGVybWV5ZXI/IEphISBCZWloZXJodW5kIGRhcyBPZGVyIGRp?= + =?utf-8?b?ZSBGbGlwcGVyd2FsZHQgZ2Vyc3B1dC7jgI3jgajoqIDjgaPjgabjgYTjgb7jgZk=?= + =?utf-8?b?44CC?= """) - eq(h.encode(), """\ -=?iso-8859-1?q?Die_Mieter_treten_hier_ein_werden_mit_einem_Foerd?= - =?iso-8859-1?q?erband_komfortabel_den_Korridor_entlang=2C_an_s=FCdl=FCndi?= - =?iso-8859-1?q?schen_Wandgem=E4lden_vorbei=2C_gegen_die_rotierenden_Kling?= - =?iso-8859-1?q?en_bef=F6rdert=2E_?= =?iso-8859-2?q?Finan=E8ni_met?= - =?iso-8859-2?q?ropole_se_hroutily_pod_tlakem_jejich_d=F9vtipu=2E=2E_?= - =?utf-8?b?5q2j56K644Gr6KiA44GG44Go57+76Kiz44Gv44GV44KM44Gm44GE?= - =?utf-8?b?44G+44Gb44KT44CC5LiA6YOo44Gv44OJ44Kk44OE6Kqe44Gn44GZ44GM44CB?= - =?utf-8?b?44GC44Go44Gv44Gn44Gf44KJ44KB44Gn44GZ44CC5a6f6Zqb44Gr44Gv44CM?= - =?utf-8?q?Wenn_ist_das_Nunstuck_git_und_Slotermeyer=3F_Ja!_Beiherhund_das?= - =?utf-8?b?IE9kZXIgZGllIEZsaXBwZXJ3YWxkdCBnZXJzcHV0LuOAjeOBqOiogOOBow==?= - =?utf-8?b?44Gm44GE44G+44GZ44CC?=""") + eq(h.encode(maxlinelen=76), """\ +=?iso-8859-1?q?Die_Mieter_treten_hier_ein_werden_mit_einem_Foerde?= + =?iso-8859-1?q?rband_komfortabel_den_Korridor_entlang=2C_an_s=FCdl=FCndis?= + =?iso-8859-1?q?chen_Wandgem=E4lden_vorbei=2C_gegen_die_rotierenden_Klinge?= + =?iso-8859-1?q?n_bef=F6rdert=2E_?= =?iso-8859-2?q?Finan=E8ni_metropole_se?= + =?iso-8859-2?q?_hroutily_pod_tlakem_jejich_d=F9vtipu=2E=2E_?= + =?utf-8?b?5q2j56K644Gr6KiA44GG44Go57+76Kiz44Gv44GV44KM44Gm44GE44G+44Gb?= + =?utf-8?b?44KT44CC5LiA6YOo44Gv44OJ44Kk44OE6Kqe44Gn44GZ44GM44CB44GC44Go?= + =?utf-8?b?44Gv44Gn44Gf44KJ44KB44Gn44GZ44CC5a6f6Zqb44Gr44Gv44CMV2VubiBp?= + =?utf-8?b?c3QgZGFzIE51bnN0dWNrIGdpdCB1bmQgU2xvdGVybWV5ZXI/IEphISBCZWlo?= + =?utf-8?b?ZXJodW5kIGRhcyBPZGVyIGRpZSBGbGlwcGVyd2FsZHQgZ2Vyc3B1dC7jgI0=?= + =?utf-8?b?44Go6KiA44Gj44Gm44GE44G+44GZ44CC?=""") def test_long_header_encode(self): eq = self.ndiffAssertEqual @@ -674,9 +674,14 @@ Test""") def test_no_split_long_header(self): eq = self.ndiffAssertEqual hstr = 'References: ' + 'x' * 80 - h = Header(hstr, continuation_ws='\t') + h = Header(hstr) + # These come on two lines because Headers are really field value + # classes and don't really know about their field names. eq(h.encode(), """\ -References: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx""") +References: + xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx""") + h = Header('x' * 80) + eq(h.encode(), 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx') def test_splitting_multiple_long_lines(self): eq = self.ndiffAssertEqual @@ -722,10 +727,17 @@ from modemcable093.139-201-24.que.mc.videotron.ca ([24.201.139.93] h = Header('Britische Regierung gibt', 'iso-8859-1', header_name='Subject') h.append('gr\xfcnes Licht f\xfcr Offshore-Windkraftprojekte') + eq(h.encode(maxlinelen=76), """\ +=?iso-8859-1?q?Britische_Regierung_gibt_gr=FCnes_Licht_f=FCr_Offs?= + =?iso-8859-1?q?hore-Windkraftprojekte?=""") msg['Subject'] = h - eq(msg.as_string(), """\ -Subject: =?iso-8859-1?q?Britische_Regierung_gibt_gr=FCnes_Licht_f=FCr?= - =?iso-8859-1?q?Offshore-Windkraftprojekte?= + eq(msg.as_string(maxheaderlen=76), """\ +Subject: =?iso-8859-1?q?Britische_Regierung_gibt_gr=FCnes_Licht_f=FCr_Offs?= + =?iso-8859-1?q?hore-Windkraftprojekte?= + +""") + eq(msg.as_string(maxheaderlen=0), """\ +Subject: =?iso-8859-1?q?Britische_Regierung_gibt_gr=FCnes_Licht_f=FCr_Offshore-Windkraftprojekte?= """) @@ -748,10 +760,10 @@ Reply-To: Britische Regierung gibt gr\xfcnes Licht f\xfcr Offshore-Windkraftproj msg = Message() msg['To'] = to eq(msg.as_string(maxheaderlen=78), '''\ -To: "Someone Test #A" , , +To: "Someone Test #A" ,, \t"Someone Test #B" , -\t"Someone Test #C" , -\t"Someone Test #D" + "Someone Test #C" , + "Someone Test #D" ''') @@ -760,7 +772,7 @@ To: "Someone Test #A" , , s = 'This is an example of string which has almost the limit of header length.' h = Header(s) h.append('Add another line.') - eq(h.encode(), """\ + eq(h.encode(maxlinelen=76), """\ This is an example of string which has almost the limit of header length. Add another line.""") @@ -775,14 +787,17 @@ This is an example of string which has almost the limit of header length. def test_long_field_name(self): eq = self.ndiffAssertEqual fn = 'X-Very-Very-Very-Long-Header-Name' - gs = "Die Mieter treten hier ein werden mit einem Foerderband komfortabel den Korridor entlang, an s\xfcdl\xfcndischen Wandgem\xe4lden vorbei, gegen die rotierenden Klingen bef\xf6rdert. " + gs = ('Die Mieter treten hier ein werden mit einem Foerderband ' + 'komfortabel den Korridor entlang, an s\xfcdl\xfcndischen ' + 'Wandgem\xe4lden vorbei, gegen die rotierenden Klingen ' + 'bef\xf6rdert. ') h = Header(gs, 'iso-8859-1', header_name=fn) # BAW: this seems broken because the first line is too long - eq(h.encode(), """\ -=?iso-8859-1?q?Die_Mieter_treten_hier_?= - =?iso-8859-1?q?ein_werden_mit_einem_Foerderband_komfortabel_den_Korridor_?= - =?iso-8859-1?q?entlang=2C_an_s=FCdl=FCndischen_Wandgem=E4lden_vorbei=2C_g?= - =?iso-8859-1?q?egen_die_rotierenden_Klingen_bef=F6rdert=2E_?=""") + eq(h.encode(maxlinelen=76), """\ +=?iso-8859-1?q?Die_Mieter_treten_hier_e?= + =?iso-8859-1?q?in_werden_mit_einem_Foerderband_komfortabel_den_Korridor_e?= + =?iso-8859-1?q?ntlang=2C_an_s=FCdl=FCndischen_Wandgem=E4lden_vorbei=2C_ge?= + =?iso-8859-1?q?gen_die_rotierenden_Klingen_bef=F6rdert=2E_?=""") def test_long_received_header(self): h = ('from FOO.TLD (vizworld.acl.foo.tld [123.452.678.9]) ' @@ -811,9 +826,9 @@ Received-2: from FOO.TLD (vizworld.acl.foo.tld [123.452.678.9]) by msg['Received-2'] = h self.ndiffAssertEqual(msg.as_string(maxheaderlen=78), """\ Received-1: <15975.17901.207240.414604@sgigritzmann1.mathematik.tu-muenchen.de> -\t(David Bremner's message of "Thu, 6 Mar 2003 13:58:21 +0100") + (David Bremner's message of \"Thu, 6 Mar 2003 13:58:21 +0100\") Received-2: <15975.17901.207240.414604@sgigritzmann1.mathematik.tu-muenchen.de> -\t(David Bremner's message of "Thu, 6 Mar 2003 13:58:21 +0100") + (David Bremner's message of \"Thu, 6 Mar 2003 13:58:21 +0100\") """) @@ -837,12 +852,12 @@ Face-2: iVBORw0KGgoAAAANSUhEUgAAADAAAAAwBAMAAAClLOS0AAAAGFBMVEUAAAAkHiJeRUIcGBi9 eq = self.ndiffAssertEqual m = ('Received: from siimage.com ' '([172.25.1.3]) by zima.siliconimage.com with ' - 'Microsoft SMTPSVC(5.0.2195.4905);' - '\tWed, 16 Oct 2002 07:41:11 -0700') + 'Microsoft SMTPSVC(5.0.2195.4905); ' + 'Wed, 16 Oct 2002 07:41:11 -0700') msg = email.message_from_string(m) eq(msg.as_string(maxheaderlen=78), '''\ Received: from siimage.com ([172.25.1.3]) by zima.siliconimage.com with -\tMicrosoft SMTPSVC(5.0.2195.4905); Wed, 16 Oct 2002 07:41:11 -0700 + Microsoft SMTPSVC(5.0.2195.4905); Wed, 16 Oct 2002 07:41:11 -0700 ''') @@ -1519,7 +1534,7 @@ counter to RFC 2822, there's no separating newline here # Test RFC 2047 header encoding and decoding -class TestRFC2047(unittest.TestCase): +class TestRFC2047(TestEmailBase): def test_rfc2047_multiline(self): eq = self.assertEqual s = """Re: =?mac-iceland?q?r=8Aksm=9Arg=8Cs?= baz @@ -1533,9 +1548,9 @@ class TestRFC2047(unittest.TestCase): header = make_header(dh) eq(str(header), 'Re: r\xe4ksm\xf6rg\xe5s baz foo bar r\xe4ksm\xf6rg\xe5s') - eq(header.encode(), - """Re: =?mac-iceland?q?r=8Aksm=9Arg=8Cs?= baz foo bar - =?mac-iceland?q?r=8Aksm=9Arg=8Cs?=""") + self.ndiffAssertEqual(header.encode(), """\ +Re: =?mac-iceland?q?r=8Aksm=9Arg=8Cs?= baz foo bar =?mac-iceland?q?r=8Aksm?= + =?mac-iceland?q?=9Arg=8Cs?=""") def test_whitespace_eater_unicode(self): eq = self.assertEqual @@ -2185,14 +2200,6 @@ Foo utils.formataddr(('A Silly; Person', 'person@dom.ain')), r'"A Silly; Person" ') - def test_fix_eols(self): - eq = self.assertEqual - eq(utils.fix_eols('hello'), 'hello') - eq(utils.fix_eols('hello\n'), 'hello\r\n') - eq(utils.fix_eols('hello\r'), 'hello\r\n') - eq(utils.fix_eols('hello\r\n'), 'hello\r\n') - eq(utils.fix_eols('hello\n\r'), 'hello\r\n\r\n') - def test_charset_richcomparisons(self): eq = self.assertEqual ne = self.failIfEqual @@ -2518,8 +2525,8 @@ Here's the message body class TestBase64(unittest.TestCase): def test_len(self): eq = self.assertEqual - eq(base64mime.base64_len('hello'), - len(base64mime.encode('hello', eol=''))) + eq(base64mime.header_length('hello'), + len(base64mime.body_encode('hello', eol=''))) for size in range(15): if size == 0 : bsize = 0 elif size <= 3 : bsize = 4 @@ -2527,22 +2534,24 @@ class TestBase64(unittest.TestCase): elif size <= 9 : bsize = 12 elif size <= 12: bsize = 16 else : bsize = 20 - eq(base64mime.base64_len('x'*size), bsize) + eq(base64mime.header_length('x' * size), bsize) def test_decode(self): eq = self.assertEqual - eq(base64mime.decode(''), b'') + eq(base64mime.decode(''), '') eq(base64mime.decode('aGVsbG8='), b'hello') + eq(base64mime.decode('aGVsbG8=', 'X'), b'hello') + eq(base64mime.decode('aGVsbG8NCndvcmxk\n', 'X'), b'helloXworld') def test_encode(self): eq = self.assertEqual - eq(base64mime.encode(''), '') - eq(base64mime.encode('hello'), 'aGVsbG8=\n') + eq(base64mime.body_encode(''), '') + eq(base64mime.body_encode('hello'), 'aGVsbG8=\n') # Test the binary flag - eq(base64mime.encode('hello\n'), 'aGVsbG8K\n') - eq(base64mime.encode('hello\n', 0), 'aGVsbG8NCg==\n') + eq(base64mime.body_encode('hello\n'), 'aGVsbG8K\n') + eq(base64mime.body_encode('hello\n', 0), 'aGVsbG8NCg==\n') # Test the maxlinelen arg - eq(base64mime.encode('xxxx ' * 20, maxlinelen=40), """\ + eq(base64mime.body_encode('xxxx ' * 20, maxlinelen=40), """\ eHh4eCB4eHh4IHh4eHggeHh4eCB4eHh4IHh4eHgg eHh4eCB4eHh4IHh4eHggeHh4eCB4eHh4IHh4eHgg eHh4eCB4eHh4IHh4eHggeHh4eCB4eHh4IHh4eHgg @@ -2560,26 +2569,11 @@ eHh4eCB4eHh4IA==\r eq = self.assertEqual he = base64mime.header_encode eq(he('hello'), '=?iso-8859-1?b?aGVsbG8=?=') - eq(he('hello\nworld'), '=?iso-8859-1?b?aGVsbG8NCndvcmxk?=') + eq(he('hello\r\nworld'), '=?iso-8859-1?b?aGVsbG8NCndvcmxk?=') + eq(he('hello\nworld'), '=?iso-8859-1?b?aGVsbG8Kd29ybGQ=?=') # Test the charset option eq(he('hello', charset='iso-8859-2'), '=?iso-8859-2?b?aGVsbG8=?=') eq(he('hello\nworld'), '=?iso-8859-1?b?aGVsbG8Kd29ybGQ=?=') - # Test the maxlinelen argument - eq(he('xxxx ' * 20, maxlinelen=40), """\ -=?iso-8859-1?b?eHh4eCB4eHh4IHh4eHggeHg=?= - =?iso-8859-1?b?eHggeHh4eCB4eHh4IHh4eHg=?= - =?iso-8859-1?b?IHh4eHggeHh4eCB4eHh4IHg=?= - =?iso-8859-1?b?eHh4IHh4eHggeHh4eCB4eHg=?= - =?iso-8859-1?b?eCB4eHh4IHh4eHggeHh4eCA=?= - =?iso-8859-1?b?eHh4eCB4eHh4IHh4eHgg?=""") - # Test the eol argument - eq(he('xxxx ' * 20, maxlinelen=40, eol='\r\n'), """\ -=?iso-8859-1?b?eHh4eCB4eHh4IHh4eHggeHg=?=\r - =?iso-8859-1?b?eHggeHh4eCB4eHh4IHh4eHg=?=\r - =?iso-8859-1?b?IHh4eHggeHh4eCB4eHh4IHg=?=\r - =?iso-8859-1?b?eHh4IHh4eHggeHh4eCB4eHg=?=\r - =?iso-8859-1?b?eCB4eHh4IHh4eHggeHh4eCA=?=\r - =?iso-8859-1?b?eHh4eCB4eHh4IHh4eHgg?=""") @@ -2591,7 +2585,7 @@ class TestQuopri(unittest.TestCase): range(ord('a'), ord('z') + 1), range(ord('A'), ord('Z') + 1), range(ord('0'), ord('9') + 1), - (c for c in b'!*+-/ '))) + (c for c in b'!*+-/'))) # Set of characters (as byte integers) that do need to be encoded in # headers. self.hnon = [c for c in range(256) if c not in self.hlit] @@ -2606,46 +2600,53 @@ class TestQuopri(unittest.TestCase): self.bnon = [c for c in range(256) if c not in self.blit] assert len(self.blit) + len(self.bnon) == 256 - def test_header_quopri_check(self): + def test_quopri_header_check(self): for c in self.hlit: - self.failIf(quoprimime.header_quopri_check(c)) + self.failIf(quoprimime.header_check(c), + 'Should not be header quopri encoded: %s' % chr(c)) for c in self.hnon: - self.failUnless(quoprimime.header_quopri_check(c)) + self.failUnless(quoprimime.header_check(c), + 'Should be header quopri encoded: %s' % chr(c)) - def test_body_quopri_check(self): + def test_quopri_body_check(self): for c in self.blit: - self.failIf(quoprimime.body_quopri_check(c)) + self.failIf(quoprimime.body_check(c), + 'Should not be body quopri encoded: %s' % chr(c)) for c in self.bnon: - self.failUnless(quoprimime.body_quopri_check(c)) + self.failUnless(quoprimime.body_check(c), + 'Should be body quopri encoded: %s' % chr(c)) def test_header_quopri_len(self): eq = self.assertEqual - eq(quoprimime.header_quopri_len(b'hello'), 5) - # RFC 2047 chrome is not included in header_quopri_len(). + eq(quoprimime.header_length(b'hello'), 5) + # RFC 2047 chrome is not included in header_length(). eq(len(quoprimime.header_encode(b'hello', charset='xxx')), - quoprimime.header_quopri_len(b'hello') + + quoprimime.header_length(b'hello') + # =?xxx?q?...?= means 10 extra characters 10) - eq(quoprimime.header_quopri_len(b'h@e@l@l@o@'), 20) - # RFC 2047 chrome is not included in header_quopri_len(). + eq(quoprimime.header_length(b'h@e@l@l@o@'), 20) + # RFC 2047 chrome is not included in header_length(). eq(len(quoprimime.header_encode(b'h@e@l@l@o@', charset='xxx')), - quoprimime.header_quopri_len(b'h@e@l@l@o@') + + quoprimime.header_length(b'h@e@l@l@o@') + # =?xxx?q?...?= means 10 extra characters 10) for c in self.hlit: - eq(quoprimime.header_quopri_len(bytes([c])), 1, + eq(quoprimime.header_length(bytes([c])), 1, 'expected length 1 for %r' % chr(c)) for c in self.hnon: - eq(quoprimime.header_quopri_len(bytes([c])), 3, + # Space is special; it's encoded to _ + if c == ord(' '): + continue + eq(quoprimime.header_length(bytes([c])), 3, 'expected length 3 for %r' % chr(c)) + eq(quoprimime.header_length(b' '), 1) def test_body_quopri_len(self): eq = self.assertEqual - bql = quoprimime.body_quopri_len for c in self.blit: - eq(bql(c), 1) + eq(quoprimime.body_length(bytes([c])), 1) for c in self.bnon: - eq(bql(c), 3) + eq(quoprimime.body_length(bytes([c])), 3) def test_quote_unquote_idempotent(self): for x in range(256): @@ -2670,22 +2671,23 @@ class TestQuopri(unittest.TestCase): def test_encode(self): eq = self.assertEqual - eq(quoprimime.encode(''), '') - eq(quoprimime.encode('hello'), 'hello') + eq(quoprimime.body_encode(''), '') + eq(quoprimime.body_encode('hello'), 'hello') # Test the binary flag - eq(quoprimime.encode('hello\r\nworld'), 'hello\nworld') - eq(quoprimime.encode('hello\r\nworld', 0), 'hello\nworld') + eq(quoprimime.body_encode('hello\r\nworld'), 'hello\nworld') + eq(quoprimime.body_encode('hello\r\nworld', 0), 'hello\nworld') # Test the maxlinelen arg - eq(quoprimime.encode('xxxx ' * 20, maxlinelen=40), """\ + eq(quoprimime.body_encode('xxxx ' * 20, maxlinelen=40), """\ xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx= xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxx= x xxxx xxxx xxxx xxxx=20""") # Test the eol argument - eq(quoprimime.encode('xxxx ' * 20, maxlinelen=40, eol='\r\n'), """\ + eq(quoprimime.body_encode('xxxx ' * 20, maxlinelen=40, eol='\r\n'), + """\ xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx=\r xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxx=\r x xxxx xxxx xxxx xxxx=20""") - eq(quoprimime.encode("""\ + eq(quoprimime.body_encode("""\ one line two line"""), """\ @@ -2704,17 +2706,16 @@ class TestCharset(unittest.TestCase): except KeyError: pass - def test_idempotent(self): + def test_codec_encodeable(self): eq = self.assertEqual # Make sure us-ascii = no Unicode conversion c = Charset('us-ascii') - s = 'Hello World!' - sp = c.to_splittable(s) - eq(s, c.from_splittable(sp)) - # test 8-bit idempotency with us-ascii + eq(c.header_encode('Hello World!'), 'Hello World!') + # Test 8-bit idempotency with us-ascii s = '\xa4\xa2\xa4\xa4\xa4\xa6\xa4\xa8\xa4\xaa' - sp = c.to_splittable(s) - eq(s, c.from_splittable(sp)) + self.assertRaises(UnicodeError, c.header_encode, s) + c = Charset('utf-8') + eq(c.header_encode(s), '=?utf-8?b?wqTCosKkwqTCpMKmwqTCqMKkwqo=?=') def test_body_encode(self): eq = self.assertEqual @@ -2801,43 +2802,46 @@ class TestHeader(TestEmailBase): h = Header(g_head, g) h.append(cz_head, cz) h.append(utf8_head, utf8) - enc = h.encode() + enc = h.encode(maxlinelen=76) eq(enc, """\ -=?iso-8859-1?q?Die_Mieter_treten_hier_ein_werden_mit_einem_Foerderband_ko?= - =?iso-8859-1?q?mfortabel_den_Korridor_entlang=2C_an_s=FCdl=FCndischen_Wan?= - =?iso-8859-1?q?dgem=E4lden_vorbei=2C_gegen_die_rotierenden_Klingen_bef=F6?= - =?iso-8859-1?q?rdert=2E_?= =?iso-8859-2?q?Finan=E8ni_metropole_se_hroutily?= +=?iso-8859-1?q?Die_Mieter_treten_hier_ein_werden_mit_einem_Foerderband_kom?= + =?iso-8859-1?q?fortabel_den_Korridor_entlang=2C_an_s=FCdl=FCndischen_Wand?= + =?iso-8859-1?q?gem=E4lden_vorbei=2C_gegen_die_rotierenden_Klingen_bef=F6r?= + =?iso-8859-1?q?dert=2E_?= =?iso-8859-2?q?Finan=E8ni_metropole_se_hroutily?= =?iso-8859-2?q?_pod_tlakem_jejich_d=F9vtipu=2E=2E_?= =?utf-8?b?5q2j56K6?= =?utf-8?b?44Gr6KiA44GG44Go57+76Kiz44Gv44GV44KM44Gm44GE44G+44Gb44KT44CC?= =?utf-8?b?5LiA6YOo44Gv44OJ44Kk44OE6Kqe44Gn44GZ44GM44CB44GC44Go44Gv44Gn?= =?utf-8?b?44Gf44KJ44KB44Gn44GZ44CC5a6f6Zqb44Gr44Gv44CMV2VubiBpc3QgZGFz?= - =?utf-8?q?_Nunstuck_git_und_Slotermeyer=3F_Ja!_Beiherhund_das_Oder_die_Fl?= - =?utf-8?b?aXBwZXJ3YWxkdCBnZXJzcHV0LuOAjeOBqOiogOOBo+OBpuOBhOOBvuOBmQ==?= - =?utf-8?b?44CC?=""") - eq(decode_header(enc), - [(g_head, "iso-8859-1"), (cz_head, "iso-8859-2"), - (utf8_head, "utf-8")]) + =?utf-8?b?IE51bnN0dWNrIGdpdCB1bmQgU2xvdGVybWV5ZXI/IEphISBCZWloZXJodW5k?= + =?utf-8?b?IGRhcyBPZGVyIGRpZSBGbGlwcGVyd2FsZHQgZ2Vyc3B1dC7jgI3jgajoqIA=?= + =?utf-8?b?44Gj44Gm44GE44G+44GZ44CC?=""") + decoded = decode_header(enc) + eq(len(decoded), 3) + eq(decoded[0], (g_head, 'iso-8859-1')) + eq(decoded[1], (cz_head, 'iso-8859-2')) + eq(decoded[2], (utf8_head.encode('utf-8'), 'utf-8')) ustr = str(h) - eq(ustr.encode('utf-8'), - 'Die Mieter treten hier ein werden mit einem Foerderband ' - 'komfortabel den Korridor entlang, an s\xc3\xbcdl\xc3\xbcndischen ' - 'Wandgem\xc3\xa4lden vorbei, gegen die rotierenden Klingen ' - 'bef\xc3\xb6rdert. Finan\xc4\x8dni metropole se hroutily pod ' - 'tlakem jejich d\xc5\xafvtipu.. \xe6\xad\xa3\xe7\xa2\xba\xe3\x81' - '\xab\xe8\xa8\x80\xe3\x81\x86\xe3\x81\xa8\xe7\xbf\xbb\xe8\xa8\xb3' - '\xe3\x81\xaf\xe3\x81\x95\xe3\x82\x8c\xe3\x81\xa6\xe3\x81\x84\xe3' - '\x81\xbe\xe3\x81\x9b\xe3\x82\x93\xe3\x80\x82\xe4\xb8\x80\xe9\x83' - '\xa8\xe3\x81\xaf\xe3\x83\x89\xe3\x82\xa4\xe3\x83\x84\xe8\xaa\x9e' - '\xe3\x81\xa7\xe3\x81\x99\xe3\x81\x8c\xe3\x80\x81\xe3\x81\x82\xe3' - '\x81\xa8\xe3\x81\xaf\xe3\x81\xa7\xe3\x81\x9f\xe3\x82\x89\xe3\x82' - '\x81\xe3\x81\xa7\xe3\x81\x99\xe3\x80\x82\xe5\xae\x9f\xe9\x9a\x9b' - '\xe3\x81\xab\xe3\x81\xaf\xe3\x80\x8cWenn ist das Nunstuck git ' - 'und Slotermeyer? Ja! Beiherhund das Oder die Flipperwaldt ' - 'gersput.\xe3\x80\x8d\xe3\x81\xa8\xe8\xa8\x80\xe3\x81\xa3\xe3\x81' - '\xa6\xe3\x81\x84\xe3\x81\xbe\xe3\x81\x99\xe3\x80\x82') + eq(ustr, + (b'Die Mieter treten hier ein werden mit einem Foerderband ' + b'komfortabel den Korridor entlang, an s\xc3\xbcdl\xc3\xbcndischen ' + b'Wandgem\xc3\xa4lden vorbei, gegen die rotierenden Klingen ' + b'bef\xc3\xb6rdert. Finan\xc4\x8dni metropole se hroutily pod ' + b'tlakem jejich d\xc5\xafvtipu.. \xe6\xad\xa3\xe7\xa2\xba\xe3\x81' + b'\xab\xe8\xa8\x80\xe3\x81\x86\xe3\x81\xa8\xe7\xbf\xbb\xe8\xa8\xb3' + b'\xe3\x81\xaf\xe3\x81\x95\xe3\x82\x8c\xe3\x81\xa6\xe3\x81\x84\xe3' + b'\x81\xbe\xe3\x81\x9b\xe3\x82\x93\xe3\x80\x82\xe4\xb8\x80\xe9\x83' + b'\xa8\xe3\x81\xaf\xe3\x83\x89\xe3\x82\xa4\xe3\x83\x84\xe8\xaa\x9e' + b'\xe3\x81\xa7\xe3\x81\x99\xe3\x81\x8c\xe3\x80\x81\xe3\x81\x82\xe3' + b'\x81\xa8\xe3\x81\xaf\xe3\x81\xa7\xe3\x81\x9f\xe3\x82\x89\xe3\x82' + b'\x81\xe3\x81\xa7\xe3\x81\x99\xe3\x80\x82\xe5\xae\x9f\xe9\x9a\x9b' + b'\xe3\x81\xab\xe3\x81\xaf\xe3\x80\x8cWenn ist das Nunstuck git ' + b'und Slotermeyer? Ja! Beiherhund das Oder die Flipperwaldt ' + b'gersput.\xe3\x80\x8d\xe3\x81\xa8\xe8\xa8\x80\xe3\x81\xa3\xe3\x81' + b'\xa6\xe3\x81\x84\xe3\x81\xbe\xe3\x81\x99\xe3\x80\x82' + ).decode('utf-8')) # Test make_header() newh = make_header(decode_header(enc)) - eq(newh, enc) + eq(newh, h) def test_empty_header_encode(self): h = Header() @@ -2848,7 +2852,7 @@ class TestHeader(TestEmailBase): h = Header() eq(h, '') h.append('foo', Charset('iso-8859-1')) - eq(h, '=?iso-8859-1?q?foo?=') + eq(h, 'foo') def test_explicit_maxlinelen(self): eq = self.ndiffAssertEqual @@ -2869,39 +2873,128 @@ A very long line that must get split to something other than at the eq(h.encode(), hstr) eq(str(h), hstr) - def test_long_splittables_with_trailing_spaces(self): + def test_quopri_splittable(self): eq = self.ndiffAssertEqual h = Header(charset='iso-8859-1', maxlinelen=20) - h.append('xxxx ' * 20) - eq(h.encode(), """\ -=?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx?= - =?iso-8859-1?q?xxxx_?=""") + x = 'xxxx ' * 20 + h.append(x) + s = h.encode() + eq(s, """\ +=?iso-8859-1?q?xxx?= + =?iso-8859-1?q?x_?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?_x?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?x_?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?_x?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?x_?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?_x?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?x_?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?_x?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?x_?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?_x?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?x_?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?_x?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?x_?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?_x?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?x_?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?_x?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?x_?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?_x?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?x_?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?xx?= + =?iso-8859-1?q?_?=""") + eq(x, str(make_header(decode_header(s)))) h = Header(charset='iso-8859-1', maxlinelen=40) h.append('xxxx ' * 20) - eq(h.encode(), """\ -=?iso-8859-1?q?xxxx_xxxx_xxxx_xxxx?= - =?iso-8859-1?q?xxxx_xxxx_xxxx_xxxx?= - =?iso-8859-1?q?xxxx_xxxx_xxxx_xxxx?= - =?iso-8859-1?q?xxxx_xxxx_xxxx_xxxx?= - =?iso-8859-1?q?xxxx_xxxx_xxxx_xxxx_?=""") + s = h.encode() + eq(s, """\ +=?iso-8859-1?q?xxxx_xxxx_xxxx_xxxx_xxx?= + =?iso-8859-1?q?x_xxxx_xxxx_xxxx_xxxx_?= + =?iso-8859-1?q?xxxx_xxxx_xxxx_xxxx_xx?= + =?iso-8859-1?q?xx_xxxx_xxxx_xxxx_xxxx?= + =?iso-8859-1?q?_xxxx_xxxx_?=""") + eq(x, str(make_header(decode_header(s)))) + + def test_base64_splittable(self): + eq = self.ndiffAssertEqual + h = Header(charset='koi8-r', maxlinelen=20) + x = 'xxxx ' * 20 + h.append(x) + s = h.encode() + eq(s, """\ +=?koi8-r?b?eHh4?= + =?koi8-r?b?eCB4?= + =?koi8-r?b?eHh4?= + =?koi8-r?b?IHh4?= + =?koi8-r?b?eHgg?= + =?koi8-r?b?eHh4?= + =?koi8-r?b?eCB4?= + =?koi8-r?b?eHh4?= + =?koi8-r?b?IHh4?= + =?koi8-r?b?eHgg?= + =?koi8-r?b?eHh4?= + =?koi8-r?b?eCB4?= + =?koi8-r?b?eHh4?= + =?koi8-r?b?IHh4?= + =?koi8-r?b?eHgg?= + =?koi8-r?b?eHh4?= + =?koi8-r?b?eCB4?= + =?koi8-r?b?eHh4?= + =?koi8-r?b?IHh4?= + =?koi8-r?b?eHgg?= + =?koi8-r?b?eHh4?= + =?koi8-r?b?eCB4?= + =?koi8-r?b?eHh4?= + =?koi8-r?b?IHh4?= + =?koi8-r?b?eHgg?= + =?koi8-r?b?eHh4?= + =?koi8-r?b?eCB4?= + =?koi8-r?b?eHh4?= + =?koi8-r?b?IHh4?= + =?koi8-r?b?eHgg?= + =?koi8-r?b?eHh4?= + =?koi8-r?b?eCB4?= + =?koi8-r?b?eHh4?= + =?koi8-r?b?IA==?=""") + eq(x, str(make_header(decode_header(s)))) + h = Header(charset='koi8-r', maxlinelen=40) + h.append(x) + s = h.encode() + eq(s, """\ +=?koi8-r?b?eHh4eCB4eHh4IHh4eHggeHh4?= + =?koi8-r?b?eCB4eHh4IHh4eHggeHh4eCB4?= + =?koi8-r?b?eHh4IHh4eHggeHh4eCB4eHh4?= + =?koi8-r?b?IHh4eHggeHh4eCB4eHh4IHh4?= + =?koi8-r?b?eHggeHh4eCB4eHh4IHh4eHgg?= + =?koi8-r?b?eHh4eCB4eHh4IA==?=""") + eq(x, str(make_header(decode_header(s)))) def test_us_ascii_header(self): eq = self.assertEqual @@ -2915,7 +3008,7 @@ A very long line that must get split to something other than at the eq = self.assertEqual h = Header() h.append('hello', 'iso-8859-1') - eq(h, '=?iso-8859-1?q?hello?=') + eq(h, 'hello') ## def test_unicode_error(self): ## raises = self.assertRaises diff --git a/Lib/email/utils.py b/Lib/email/utils.py index 404cd96..5771209 100644 --- a/Lib/email/utils.py +++ b/Lib/email/utils.py @@ -71,16 +71,6 @@ def _bdecode(s): -def fix_eols(s): - """Replace all line-ending characters with \r\n.""" - # Fix newlines with no preceding carriage return - s = re.sub(r'(?