diff options
author | Guido van Rossum <guido@python.org> | 2007-08-30 01:15:14 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2007-08-30 01:15:14 (GMT) |
commit | 8b3febef2f96c35e9aad9db2ef499db040fdefae (patch) | |
tree | 6bc3322d80780a8d57d845b350aad9fbe250d5de /Lib/email/header.py | |
parent | 21b731fb7798218a0e59e6db204d1d43d2a1e820 (diff) | |
download | cpython-8b3febef2f96c35e9aad9db2ef499db040fdefae.zip cpython-8b3febef2f96c35e9aad9db2ef499db040fdefae.tar.gz cpython-8b3febef2f96c35e9aad9db2ef499db040fdefae.tar.bz2 |
Copying the email package back, despite its failings.
Diffstat (limited to 'Lib/email/header.py')
-rw-r--r-- | Lib/email/header.py | 520 |
1 files changed, 520 insertions, 0 deletions
diff --git a/Lib/email/header.py b/Lib/email/header.py new file mode 100644 index 0000000..1d97f8f --- /dev/null +++ b/Lib/email/header.py @@ -0,0 +1,520 @@ +# Copyright (C) 2002-2007 Python Software Foundation +# Author: Ben Gertzfield, Barry Warsaw +# Contact: email-sig@python.org + +"""Header encoding and decoding functionality.""" + +__all__ = [ + 'Header', + 'decode_header', + 'make_header', + ] + +import re +import binascii + +import email.quoprimime +import email.base64mime + +from email.errors import HeaderParseError +from email.charset import Charset + +NL = '\n' +SPACE = ' ' +BSPACE = b' ' +SPACE8 = ' ' * 8 +EMPTYSTRING = '' + +MAXLINELEN = 76 + +USASCII = Charset('us-ascii') +UTF8 = Charset('utf-8') + +# Match encoded-word strings in the form =?charset?q?Hello_World?= +ecre = re.compile(r''' + =\? # literal =? + (?P<charset>[^?]*?) # non-greedy up to the next ? is the charset + \? # literal ? + (?P<encoding>[qb]) # either a "q" or a "b", case insensitive + \? # literal ? + (?P<encoded>.*?) # non-greedy up to the next ?= is the encoded string + \?= # literal ?= + (?=[ \t]|$) # whitespace or the end of the string + ''', re.VERBOSE | re.IGNORECASE | re.MULTILINE) + +# Field name regexp, including trailing colon, but not separating whitespace, +# according to RFC 2822. Character range is from tilde to exclamation mark. +# For use with .match() +fcre = re.compile(r'[\041-\176]+:$') + + + +# Helpers +_max_append = email.quoprimime._max_append + + + +def decode_header(header): + """Decode a message header value without converting charset. + + Returns a list of (string, charset) pairs containing each of the decoded + parts of the header. Charset is None for non-encoded parts of the header, + otherwise a lower-case string containing the name of the character set + specified in the encoded string. + + An email.Errors.HeaderParseError may be raised when certain decoding error + occurs (e.g. a base64 decoding exception). + """ + # If no encoding, just return the header with no charset. + if not ecre.search(header): + return [(header, None)] + # First step is to parse all the encoded parts into triplets of the form + # (encoded_string, encoding, charset). For unencoded strings, the last + # two parts will be None. + words = [] + for line in header.splitlines(): + parts = ecre.split(line) + while parts: + unencoded = parts.pop(0).strip() + if unencoded: + words.append((unencoded, None, None)) + if parts: + charset = parts.pop(0).lower() + encoding = parts.pop(0).lower() + encoded = parts.pop(0) + words.append((encoded, encoding, charset)) + # The next step is to decode each encoded word by applying the reverse + # base64 or quopri transformation. decoded_words is now a list of the + # form (decoded_word, charset). + decoded_words = [] + for encoded_string, encoding, charset in words: + if encoding is None: + # This is an unencoded word. + decoded_words.append((encoded_string, charset)) + elif encoding == 'q': + word = email.quoprimime.header_decode(encoded_string) + decoded_words.append((word, charset)) + elif encoding == 'b': + try: + word = email.base64mime.decode(encoded_string) + except binascii.Error: + raise HeaderParseError('Base64 decoding error') + else: + decoded_words.append((word, charset)) + else: + raise AssertionError('Unexpected encoding: ' + encoding) + # Now convert all words to bytes and collapse consecutive runs of + # similarly encoded words. + collapsed = [] + last_word = last_charset = None + for word, charset in decoded_words: + if isinstance(word, str): + word = bytes(ord(c) for c in word) + if last_word is None: + last_word = word + last_charset = charset + elif charset != last_charset: + collapsed.append((last_word, last_charset)) + last_word = word + last_charset = charset + elif last_charset is None: + last_word += BSPACE + word + else: + last_word += word + collapsed.append((last_word, last_charset)) + return collapsed + + + +def make_header(decoded_seq, maxlinelen=None, header_name=None, + continuation_ws=' '): + """Create a Header from a sequence of pairs as returned by decode_header() + + decode_header() takes a header value string and returns a sequence of + pairs of the format (decoded_string, charset) where charset is the string + name of the character set. + + This function takes one of those sequence of pairs and returns a Header + instance. Optional maxlinelen, header_name, and continuation_ws are as in + the Header constructor. + """ + h = Header(maxlinelen=maxlinelen, header_name=header_name, + continuation_ws=continuation_ws) + for s, charset in decoded_seq: + # None means us-ascii but we can simply pass it on to h.append() + if charset is not None and not isinstance(charset, Charset): + charset = Charset(charset) + h.append(s, charset) + return h + + + +class Header: + def __init__(self, s=None, charset=None, + maxlinelen=None, header_name=None, + continuation_ws=' ', errors='strict'): + """Create a MIME-compliant header that can contain many character sets. + + Optional s is the initial header value. If None, the initial header + value is not set. You can later append to the header with .append() + method calls. s may be a byte string or a Unicode string, but see the + .append() documentation for semantics. + + Optional charset serves two purposes: it has the same meaning as the + charset argument to the .append() method. It also sets the default + character set for all subsequent .append() calls that omit the charset + argument. If charset is not provided in the constructor, the us-ascii + charset is used both as s's initial charset and as the default for + subsequent .append() calls. + + The maximum line length can be specified explicit via maxlinelen. For + splitting the first line to a shorter value (to account for the field + header which isn't included in s, e.g. `Subject') pass in the name of + the field in header_name. The default maxlinelen is 76. + + continuation_ws must be RFC 2822 compliant folding whitespace (usually + either a space or a hard tab) which will be prepended to continuation + lines. + + errors is passed through to the .append() call. + """ + if charset is None: + charset = USASCII + elif not isinstance(charset, Charset): + charset = Charset(charset) + self._charset = charset + self._continuation_ws = continuation_ws + self._chunks = [] + if s is not None: + self.append(s, charset, errors) + if maxlinelen is None: + maxlinelen = MAXLINELEN + self._maxlinelen = maxlinelen + if header_name is None: + self._headerlen = 0 + else: + # Take the separating colon and space into account. + self._headerlen = len(header_name) + 2 + + def __str__(self): + """Return the string value of the header.""" + uchunks = [] + lastcs = None + for s, charset in self._chunks: + # We must preserve spaces between encoded and non-encoded word + # boundaries, which means for us we need to add a space when we go + # from a charset to None/us-ascii, or from None/us-ascii to a + # charset. Only do this for the second and subsequent chunks. + nextcs = charset + if uchunks: + if lastcs not in (None, 'us-ascii'): + if nextcs in (None, 'us-ascii'): + uchunks.append(SPACE) + nextcs = None + elif nextcs not in (None, 'us-ascii'): + uchunks.append(SPACE) + lastcs = nextcs + uchunks.append(s) + return EMPTYSTRING.join(uchunks) + + # Rich comparison operators for equality only. BAW: does it make sense to + # have or explicitly disable <, <=, >, >= operators? + def __eq__(self, other): + # other may be a Header or a string. Both are fine so coerce + # ourselves to a string, swap the args and do another comparison. + return other == self.encode() + + def __ne__(self, other): + return not self == other + + def append(self, s, charset=None, errors='strict'): + """Append a string to the MIME header. + + Optional charset, if given, should be a Charset instance or the name + of a character set (which will be converted to a Charset instance). A + value of None (the default) means that the charset given in the + constructor is used. + + s may be a byte string or a Unicode string. If it is a byte string + (i.e. isinstance(s, str) is true), then charset is the encoding of + that byte string, and a UnicodeError will be raised if the string + cannot be decoded with that charset. If s is a Unicode string, then + charset is a hint specifying the character set of the characters in + the string. In this case, when producing an RFC 2822 compliant header + using RFC 2047 rules, the Unicode string will be encoded using the + following charsets in order: us-ascii, the charset hint, utf-8. The + first character set not to provoke a UnicodeError is used. + + Optional `errors' is passed as the third argument to any unicode() or + ustr.encode() call. + """ + if charset is None: + charset = self._charset + elif not isinstance(charset, Charset): + charset = Charset(charset) + if isinstance(s, str): + # Convert the string from the input character set to the output + # character set and store the resulting bytes and the charset for + # composition later. + input_charset = charset.input_codec or 'us-ascii' + input_bytes = s.encode(input_charset, errors) + else: + # We already have the bytes we will store internally. + input_bytes = s + # Ensure that the bytes we're storing can be decoded to the output + # character set, otherwise an early error is thrown. + output_charset = charset.output_codec or 'us-ascii' + output_string = input_bytes.decode(output_charset, errors) + self._chunks.append((output_string, charset)) + + def encode(self, splitchars=';, \t'): + """Encode a message header into an RFC-compliant format. + + There are many issues involved in converting a given string for use in + an email header. Only certain character sets are readable in most + email clients, and as header strings can only contain a subset of + 7-bit ASCII, care must be taken to properly convert and encode (with + Base64 or quoted-printable) header strings. In addition, there is a + 75-character length limit on any given encoded header field, so + line-wrapping must be performed, even with double-byte character sets. + + This method will do its best to convert the string to the correct + character set used in email, and encode and line wrap it safely with + the appropriate scheme for that character set. + + If the given charset is not known or an error occurs during + conversion, this function will return the header untouched. + + Optional splitchars is a string containing characters to split long + ASCII lines on, in rough support of RFC 2822's `highest level + syntactic breaks'. This doesn't affect RFC 2047 encoded lines. + """ + self._normalize() + formatter = _ValueFormatter(self._headerlen, self._maxlinelen, + self._continuation_ws, splitchars) + for string, charset in self._chunks: + lines = string.splitlines() + for line in lines: + formatter.feed(line, charset) + if len(lines) > 1: + formatter.newline() + return str(formatter) + + def _normalize(self): + # Normalize the chunks so that all runs of identical charsets get + # collapsed into a single unicode string. You need a space between + # encoded words, or between encoded and unencoded words. + chunks = [] + last_charset = None + last_chunk = [] + for string, charset in self._chunks: + if charset == last_charset: + last_chunk.append(string) + else: + if last_charset is not None: + chunks.append((SPACE.join(last_chunk), last_charset)) + if last_charset != USASCII or charset != USASCII: + chunks.append((' ', USASCII)) + last_chunk = [string] + last_charset = charset + if last_chunk: + chunks.append((SPACE.join(last_chunk), last_charset)) + self._chunks = chunks + + + +class _ValueFormatter: + def __init__(self, headerlen, maxlen, continuation_ws, splitchars): + self._maxlen = maxlen + self._continuation_ws = continuation_ws + self._continuation_ws_len = len(continuation_ws.replace('\t', SPACE8)) + self._splitchars = splitchars + self._lines = [] + self._current_line = _Accumulator(headerlen) + + def __str__(self): + self.newline() + return NL.join(self._lines) + + def newline(self): + if len(self._current_line) > 0: + self._lines.append(str(self._current_line)) + self._current_line.reset() + + def feed(self, string, charset): + # If the string itself fits on the current line in its encoded format, + # then add it now and be done with it. + encoded_string = charset.header_encode(string) + if len(encoded_string) + len(self._current_line) <= self._maxlen: + self._current_line.push(encoded_string) + return + # Attempt to split the line at the highest-level syntactic break + # possible. Note that we don't have a lot of smarts about field + # syntax; we just try to break on semi-colons, then commas, then + # whitespace. Eventually, we'll allow this to be pluggable. + for ch in self._splitchars: + if ch in string: + break + else: + # We can't split the string to fit on the current line, so just + # put it on a line by itself. + self._lines.append(str(self._current_line)) + self._current_line.reset(self._continuation_ws) + self._current_line.push(encoded_string) + return + self._spliterate(string, ch, charset) + + def _spliterate(self, string, ch, charset): + holding = _Accumulator(transformfunc=charset.header_encode) + # Split the line on the split character, preserving it. If the split + # character is whitespace RFC 2822 $2.2.3 requires us to fold on the + # whitespace, so that the line leads with the original whitespace we + # split on. However, if a higher syntactic break is used instead + # (e.g. comma or semicolon), the folding should happen after the split + # character. But then in that case, we need to add our own + # continuation whitespace -- although won't that break unfolding? + for part, splitpart, nextpart in _spliterator(ch, string): + if not splitpart: + # No splitpart means this is the last chunk. Put this part + # either on the current line or the next line depending on + # whether it fits. + holding.push(part) + if len(holding) + len(self._current_line) <= self._maxlen: + # It fits, but we're done. + self._current_line.push(str(holding)) + else: + # It doesn't fit, but we're done. Before pushing a new + # line, watch out for the current line containing only + # whitespace. + holding.pop() + if len(self._current_line) == 0 and ( + len(holding) == 0 or str(holding).isspace()): + # Don't start a new line. + holding.push(part) + part = None + self._current_line.push(str(holding)) + self._lines.append(str(self._current_line)) + if part is None: + self._current_line.reset() + else: + holding.reset(part) + self._current_line.reset(str(holding)) + return + elif not nextpart: + # There must be some trailing split characters because we + # found a split character but no next part. In this case we + # must treat the thing to fit as the part + splitpart because + # if splitpart is whitespace it's not allowed to be the only + # thing on the line, and if it's not whitespace we must split + # after the syntactic break. In either case, we're done. + holding_prelen = len(holding) + holding.push(part + splitpart) + if len(holding) + len(self._current_line) <= self._maxlen: + self._current_line.push(str(holding)) + elif holding_prelen == 0: + # This is the only chunk left so it has to go on the + # current line. + self._current_line.push(str(holding)) + else: + save_part = holding.pop() + self._current_line.push(str(holding)) + self._lines.append(str(self._current_line)) + holding.reset(save_part) + self._current_line.reset(str(holding)) + return + elif not part: + # We're leading with a split character. See if the splitpart + # and nextpart fits on the current line. + holding.push(splitpart + nextpart) + holding_len = len(holding) + # We know we're not leaving the nextpart on the stack. + holding.pop() + if holding_len + len(self._current_line) <= self._maxlen: + holding.push(splitpart) + else: + # It doesn't fit. Since there's no current part really + # the best we can do is start a new line and push the + # split part onto it. + self._current_line.push(str(holding)) + holding.reset() + if len(self._current_line) > 0 and self._lines: + self._lines.append(str(self._current_line)) + self._current_line.reset() + holding.push(splitpart) + else: + # All three parts are present. First let's see if all three + # parts will fit on the current line. If so, we don't need to + # split it. + holding.push(part + splitpart + nextpart) + holding_len = len(holding) + # Pop the part because we'll push nextpart on the next + # iteration through the loop. + holding.pop() + if holding_len + len(self._current_line) <= self._maxlen: + holding.push(part + splitpart) + else: + # The entire thing doesn't fit. See if we need to split + # before or after the split characters. + if splitpart.isspace(): + # Split before whitespace. Remember that the + # whitespace becomes the continuation whitespace of + # the next line so it goes to current_line not holding. + holding.push(part) + self._current_line.push(str(holding)) + holding.reset() + self._lines.append(str(self._current_line)) + self._current_line.reset(splitpart) + else: + # Split after non-whitespace. The continuation + # whitespace comes from the instance variable. + holding.push(part + splitpart) + self._current_line.push(str(holding)) + holding.reset() + self._lines.append(str(self._current_line)) + if nextpart[0].isspace(): + self._current_line.reset() + else: + self._current_line.reset(self._continuation_ws) + # Get the last of the holding part + self._current_line.push(str(holding)) + + + +def _spliterator(character, string): + parts = list(reversed(re.split('(%s)' % character, string))) + while parts: + part = parts.pop() + splitparts = (parts.pop() if parts else None) + nextpart = (parts.pop() if parts else None) + yield (part, splitparts, nextpart) + if nextpart is not None: + parts.append(nextpart) + + +class _Accumulator: + def __init__(self, initial_size=0, transformfunc=None): + self._initial_size = initial_size + if transformfunc is None: + self._transformfunc = lambda string: string + else: + self._transformfunc = transformfunc + self._current = [] + + def push(self, string): + self._current.append(string) + + def pop(self): + return self._current.pop() + + def __len__(self): + return len(str(self)) + self._initial_size + + def __str__(self): + return self._transformfunc(EMPTYSTRING.join(self._current)) + + def reset(self, string=None): + self._current = [] + self._current_len = 0 + self._initial_size = 0 + if string is not None: + self.push(string) |