summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorR David Murray <rdmurray@bitdance.com>2011-04-18 17:59:37 (GMT)
committerR David Murray <rdmurray@bitdance.com>2011-04-18 17:59:37 (GMT)
commit3edd22ac950d3a2bcc1ad2e5a83554970aef3369 (patch)
treeb4661afc1be45e0d072c1c83ab354b2362f05afb /Lib
parentce16be91dc68597b0c5bfc7b4b1c5136fe5697a6 (diff)
downloadcpython-3edd22ac950d3a2bcc1ad2e5a83554970aef3369.zip
cpython-3edd22ac950d3a2bcc1ad2e5a83554970aef3369.tar.gz
cpython-3edd22ac950d3a2bcc1ad2e5a83554970aef3369.tar.bz2
#11731: simplify/enhance parser/generator API by introducing policy objects.
This new interface will also allow for future planned enhancements in control over the parser/generator without requiring any additional complexity in the parser/generator API. Patch reviewed by Éric Araujo and Barry Warsaw.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/email/errors.py2
-rw-r--r--Lib/email/feedparser.py23
-rw-r--r--Lib/email/generator.py62
-rw-r--r--Lib/email/parser.py11
-rw-r--r--Lib/email/policy.py174
-rw-r--r--Lib/test/test_email/__init__.py2
-rw-r--r--Lib/test/test_email/test_email.py144
-rw-r--r--Lib/test/test_email/test_generator.py136
-rw-r--r--Lib/test/test_email/test_policy.py148
9 files changed, 653 insertions, 49 deletions
diff --git a/Lib/email/errors.py b/Lib/email/errors.py
index d52a624..c2ea7d4 100644
--- a/Lib/email/errors.py
+++ b/Lib/email/errors.py
@@ -32,7 +32,7 @@ class CharsetError(MessageError):
# These are parsing defects which the parser was able to work around.
-class MessageDefect:
+class MessageDefect(Exception):
"""Base class for a message defect."""
def __init__(self, line=None):
diff --git a/Lib/email/feedparser.py b/Lib/email/feedparser.py
index 1b752d0..60de49e 100644
--- a/Lib/email/feedparser.py
+++ b/Lib/email/feedparser.py
@@ -25,6 +25,7 @@ import re
from email import errors
from email import message
+from email import policy
NLCRE = re.compile('\r\n|\r|\n')
NLCRE_bol = re.compile('(\r\n|\r|\n)')
@@ -134,9 +135,16 @@ class BufferedSubFile(object):
class FeedParser:
"""A feed-style parser of email."""
- def __init__(self, _factory=message.Message):
- """_factory is called with no arguments to create a new message obj"""
+ def __init__(self, _factory=message.Message, *, policy=policy.default):
+ """_factory is called with no arguments to create a new message obj
+
+ The policy keyword specifies a policy object that controls a number of
+ aspects of the parser's operation. The default policy maintains
+ backward compatibility.
+
+ """
self._factory = _factory
+ self.policy = policy
self._input = BufferedSubFile()
self._msgstack = []
self._parse = self._parsegen().__next__
@@ -168,7 +176,8 @@ class FeedParser:
# Look for final set of defects
if root.get_content_maintype() == 'multipart' \
and not root.is_multipart():
- root.defects.append(errors.MultipartInvariantViolationDefect())
+ defect = errors.MultipartInvariantViolationDefect()
+ self.policy.handle_defect(root, defect)
return root
def _new_message(self):
@@ -281,7 +290,8 @@ class FeedParser:
# defined a boundary. That's a problem which we'll handle by
# reading everything until the EOF and marking the message as
# defective.
- self._cur.defects.append(errors.NoBoundaryInMultipartDefect())
+ defect = errors.NoBoundaryInMultipartDefect()
+ self.policy.handle_defect(self._cur, defect)
lines = []
for line in self._input:
if line is NeedMoreData:
@@ -385,7 +395,8 @@ class FeedParser:
# that as a defect and store the captured text as the payload.
# Everything from here to the EOF is epilogue.
if capturing_preamble:
- self._cur.defects.append(errors.StartBoundaryNotFoundDefect())
+ defect = errors.StartBoundaryNotFoundDefect()
+ self.policy.handle_defect(self._cur, defect)
self._cur.set_payload(EMPTYSTRING.join(preamble))
epilogue = []
for line in self._input:
@@ -437,7 +448,7 @@ class FeedParser:
# is illegal, so let's note the defect, store the illegal
# line, and ignore it for purposes of headers.
defect = errors.FirstHeaderLineIsContinuationDefect(line)
- self._cur.defects.append(defect)
+ self.policy.handle_defect(self._cur, defect)
continue
lastvalue.append(line)
continue
diff --git a/Lib/email/generator.py b/Lib/email/generator.py
index fdd34e4..d8b8fa9 100644
--- a/Lib/email/generator.py
+++ b/Lib/email/generator.py
@@ -13,8 +13,10 @@ import random
import warnings
from io import StringIO, BytesIO
+from email import policy
from email.header import Header
from email.message import _has_surrogates
+import email.charset as _charset
UNDERSCORE = '_'
NL = '\n' # XXX: no longer used by the code below.
@@ -33,7 +35,8 @@ class Generator:
# Public interface
#
- def __init__(self, outfp, mangle_from_=True, maxheaderlen=78):
+ def __init__(self, outfp, mangle_from_=True, maxheaderlen=None, *,
+ policy=policy.default):
"""Create the generator for message flattening.
outfp is the output file-like object for writing the message to. It
@@ -49,16 +52,23 @@ class Generator:
defined in the Header class. Set maxheaderlen to zero to disable
header wrapping. The default is 78, as recommended (but not required)
by RFC 2822.
+
+ The policy keyword specifies a policy object that controls a number of
+ aspects of the generator's operation. The default policy maintains
+ backward compatibility.
+
"""
self._fp = outfp
self._mangle_from_ = mangle_from_
- self._maxheaderlen = maxheaderlen
+ self._maxheaderlen = (maxheaderlen if maxheaderlen is not None else
+ policy.max_line_length)
+ self.policy = policy
def write(self, s):
# Just delegate to the file object
self._fp.write(s)
- def flatten(self, msg, unixfrom=False, linesep='\n'):
+ def flatten(self, msg, unixfrom=False, linesep=None):
r"""Print the message object tree rooted at msg to the output file
specified when the Generator instance was created.
@@ -70,17 +80,15 @@ class Generator:
Note that for subobjects, no From_ line is printed.
linesep specifies the characters used to indicate a new line in
- the output. The default value is the most useful for typical
- Python applications, but it can be set to \r\n to produce RFC-compliant
- line separators when needed.
+ the output. The default value is determined by the policy.
"""
# We use the _XXX constants for operating on data that comes directly
# from the msg, and _encoded_XXX constants for operating on data that
# has already been converted (to bytes in the BytesGenerator) and
# inserted into a temporary buffer.
- self._NL = linesep
- self._encoded_NL = self._encode(linesep)
+ self._NL = linesep if linesep is not None else self.policy.linesep
+ self._encoded_NL = self._encode(self._NL)
self._EMPTY = ''
self._encoded_EMTPY = self._encode('')
if unixfrom:
@@ -338,7 +346,10 @@ class BytesGenerator(Generator):
Functionally identical to the base Generator except that the output is
bytes and not string. When surrogates were used in the input to encode
- bytes, these are decoded back to bytes for output.
+ bytes, these are decoded back to bytes for output. If the policy has
+ must_be_7bit set true, then the message is transformed such that the
+ non-ASCII bytes are properly content transfer encoded, using the
+ charset unknown-8bit.
The outfp object must accept bytes in its write method.
"""
@@ -361,21 +372,22 @@ class BytesGenerator(Generator):
# strings with 8bit bytes.
for h, v in msg._headers:
self.write('%s: ' % h)
- if isinstance(v, Header):
- self.write(v.encode(maxlinelen=self._maxheaderlen)+NL)
- elif _has_surrogates(v):
- # If we have raw 8bit data in a byte string, we have no idea
- # what the encoding is. There is no safe way to split this
- # string. If it's ascii-subset, then we could do a normal
- # ascii split, but if it's multibyte then we could break the
- # string. There's no way to know so the least harm seems to
- # be to not split the string and risk it being too long.
- self.write(v+NL)
- else:
- # Header's got lots of smarts and this string is safe...
- header = Header(v, maxlinelen=self._maxheaderlen,
- header_name=h)
- self.write(header.encode(linesep=self._NL)+self._NL)
+ if isinstance(v, str):
+ if _has_surrogates(v):
+ if not self.policy.must_be_7bit:
+ # If we have raw 8bit data in a byte string, we have no idea
+ # what the encoding is. There is no safe way to split this
+ # string. If it's ascii-subset, then we could do a normal
+ # ascii split, but if it's multibyte then we could break the
+ # string. There's no way to know so the least harm seems to
+ # be to not split the string and risk it being too long.
+ self.write(v+NL)
+ continue
+ h = Header(v, charset=_charset.UNKNOWN8BIT, header_name=h)
+ else:
+ h = Header(v, header_name=h)
+ self.write(h.encode(linesep=self._NL,
+ maxlinelen=self._maxheaderlen)+self._NL)
# A blank line always separates headers from body
self.write(self._NL)
@@ -384,7 +396,7 @@ class BytesGenerator(Generator):
# just write it back out.
if msg._payload is None:
return
- if _has_surrogates(msg._payload):
+ if _has_surrogates(msg._payload) and not self.policy.must_be_7bit:
self.write(msg._payload)
else:
super(BytesGenerator,self)._handle_text(msg)
diff --git a/Lib/email/parser.py b/Lib/email/parser.py
index fc5090b..0f92160 100644
--- a/Lib/email/parser.py
+++ b/Lib/email/parser.py
@@ -11,11 +11,12 @@ from io import StringIO, TextIOWrapper
from email.feedparser import FeedParser
from email.message import Message
+from email import policy
class Parser:
- def __init__(self, _class=Message):
+ def __init__(self, _class=Message, *, policy=policy.default):
"""Parser of RFC 2822 and MIME email messages.
Creates an in-memory object tree representing the email message, which
@@ -30,8 +31,14 @@ class Parser:
_class is the class to instantiate for new message objects when they
must be created. This class must have a constructor that can take
zero arguments. Default is Message.Message.
+
+ The policy keyword specifies a policy object that controls a number of
+ aspects of the parser's operation. The default policy maintains
+ backward compatibility.
+
"""
self._class = _class
+ self.policy = policy
def parse(self, fp, headersonly=False):
"""Create a message structure from the data in a file.
@@ -41,7 +48,7 @@ class Parser:
parsing after reading the headers or not. The default is False,
meaning it parses the entire contents of the file.
"""
- feedparser = FeedParser(self._class)
+ feedparser = FeedParser(self._class, policy=self.policy)
if headersonly:
feedparser._set_headersonly()
while True:
diff --git a/Lib/email/policy.py b/Lib/email/policy.py
new file mode 100644
index 0000000..88877a2
--- /dev/null
+++ b/Lib/email/policy.py
@@ -0,0 +1,174 @@
+"""Policy framework for the email package.
+
+Allows fine grained feature control of how the package parses and emits data.
+"""
+
+__all__ = [
+ 'Policy',
+ 'default',
+ 'strict',
+ 'SMTP',
+ 'HTTP',
+ ]
+
+
+class _PolicyBase:
+
+ """Policy Object basic framework.
+
+ This class is useless unless subclassed. A subclass should define
+ class attributes with defaults for any values that are to be
+ managed by the Policy object. The constructor will then allow
+ non-default values to be set for these attributes at instance
+ creation time. The instance will be callable, taking these same
+ attributes keyword arguments, and returning a new instance
+ identical to the called instance except for those values changed
+ by the keyword arguments. Instances may be added, yielding new
+ instances with any non-default values from the right hand
+ operand overriding those in the left hand operand. That is,
+
+ A + B == A(<non-default values of B>)
+
+ The repr of an instance can be used to reconstruct the object
+ if and only if the repr of the values can be used to reconstruct
+ those values.
+
+ """
+
+ def __init__(self, **kw):
+ """Create new Policy, possibly overriding some defaults.
+
+ See class docstring for a list of overridable attributes.
+
+ """
+ for name, value in kw.items():
+ if hasattr(self, name):
+ super(_PolicyBase,self).__setattr__(name, value)
+ else:
+ raise TypeError(
+ "{!r} is an invalid keyword argument for {}".format(
+ name, self.__class__.__name__))
+
+ def __repr__(self):
+ args = [ "{}={!r}".format(name, value)
+ for name, value in self.__dict__.items() ]
+ return "{}({})".format(self.__class__.__name__, args if args else '')
+
+ def clone(self, **kw):
+ """Return a new instance with specified attributes changed.
+
+ The new instance has the same attribute values as the current object,
+ except for the changes passed in as keyword arguments.
+
+ """
+ for attr, value in self.__dict__.items():
+ if attr not in kw:
+ kw[attr] = value
+ return self.__class__(**kw)
+
+ def __setattr__(self, name, value):
+ if hasattr(self, name):
+ msg = "{!r} object attribute {!r} is read-only"
+ else:
+ msg = "{!r} object has no attribute {!r}"
+ raise AttributeError(msg.format(self.__class__.__name__, name))
+
+ def __add__(self, other):
+ """Non-default values from right operand override those from left.
+
+ The object returned is a new instance of the subclass.
+
+ """
+ return self.clone(**other.__dict__)
+
+
+class Policy(_PolicyBase):
+
+ """Controls for how messages are interpreted and formatted.
+
+ Most of the classes and many of the methods in the email package
+ accept Policy objects as parameters. A Policy object contains a set
+ of values and functions that control how input is interpreted and how
+ output is rendered. For example, the parameter 'raise_on_defect'
+ controls whether or not an RFC violation throws an error or not,
+ while 'max_line_length' controls the maximum length of output lines
+ when a Message is serialized.
+
+ Any valid attribute may be overridden when a Policy is created by
+ passing it as a keyword argument to the constructor. Policy
+ objects are immutable, but a new Policy object can be created
+ with only certain values changed by calling the Policy instance
+ with keyword arguments. Policy objects can also be added,
+ producing a new Policy object in which the non-default attributes
+ set in the right hand operand overwrite those specified in the
+ left operand.
+
+ Settable attributes:
+
+ raise_on_defect -- If true, then defects should be raised
+ as errors. Default False.
+
+ linesep -- string containing the value to use as
+ separation between output lines. Default '\n'.
+
+ must_be_7bit -- output must contain only 7bit clean data.
+ Default False.
+
+ max_line_length -- maximum length of lines, excluding 'linesep',
+ during serialization. None means no line
+ wrapping is done. Default is 78.
+
+ Methods:
+
+ register_defect(obj, defect)
+ defect is a Defect instance. The default implementation appends defect
+ to the objs 'defects' attribute.
+
+ handle_defect(obj, defect)
+ intended to be called by parser code that finds a defect. If
+ raise_on_defect is True, defect is raised as an error, otherwise
+ register_defect is called.
+
+ """
+
+ raise_on_defect = False
+ linesep = '\n'
+ must_be_7bit = False
+ max_line_length = 78
+
+ def handle_defect(self, obj, defect):
+ """Based on policy, either raise defect or call register_defect.
+
+ handle_defect(obj, defect)
+
+ defect should be a Defect subclass, but in any case must be an
+ Exception subclass. obj is the object on which the defect should be
+ registered if it is not raised. If the raise_on_defect is True, the
+ defect is raised as an error, otherwise the object and the defect are
+ passed to register_defect.
+
+ This class is intended to be called by parsers that discover defects,
+ and will not be called from code using the library unless that code is
+ implementing an alternate parser.
+
+ """
+ if self.raise_on_defect:
+ raise defect
+ self.register_defect(obj, defect)
+
+ def register_defect(self, obj, defect):
+ """Record 'defect' on 'obj'.
+
+ Called by handle_defect if raise_on_defect is False. This method is
+ part of the Policy API so that Policy subclasses can implement custom
+ defect handling. The default implementation calls the append method
+ of the defects attribute of obj.
+
+ """
+ obj.defects.append(defect)
+
+
+default = Policy()
+strict = default.clone(raise_on_defect=True)
+SMTP = default.clone(linesep='\r\n')
+HTTP = default.clone(linesep='\r\n', max_line_length=None)
diff --git a/Lib/test/test_email/__init__.py b/Lib/test/test_email/__init__.py
index 69be678..04fdf89 100644
--- a/Lib/test/test_email/__init__.py
+++ b/Lib/test/test_email/__init__.py
@@ -25,6 +25,8 @@ def openfile(filename, *args, **kws):
# Base test class
class TestEmailBase(unittest.TestCase):
+ maxDiff = None
+
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
self.addTypeEqualityFunc(bytes, self.assertBytesEqual)
diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py
index 46206c3..1657afc 100644
--- a/Lib/test/test_email/test_email.py
+++ b/Lib/test/test_email/test_email.py
@@ -1776,7 +1776,12 @@ YXNkZg==
# Test some badly formatted messages
-class TestNonConformant(TestEmailBase):
+class TestNonConformantBase:
+
+ def _msgobj(self, filename):
+ with openfile(filename) as fp:
+ return email.message_from_file(fp, policy=self.policy)
+
def test_parse_missing_minor_type(self):
eq = self.assertEqual
msg = self._msgobj('msg_14.txt')
@@ -1790,17 +1795,18 @@ class TestNonConformant(TestEmailBase):
# XXX We can probably eventually do better
inner = msg.get_payload(0)
unless(hasattr(inner, 'defects'))
- self.assertEqual(len(inner.defects), 1)
- unless(isinstance(inner.defects[0],
+ self.assertEqual(len(self.get_defects(inner)), 1)
+ unless(isinstance(self.get_defects(inner)[0],
errors.StartBoundaryNotFoundDefect))
def test_multipart_no_boundary(self):
unless = self.assertTrue
msg = self._msgobj('msg_25.txt')
unless(isinstance(msg.get_payload(), str))
- self.assertEqual(len(msg.defects), 2)
- unless(isinstance(msg.defects[0], errors.NoBoundaryInMultipartDefect))
- unless(isinstance(msg.defects[1],
+ self.assertEqual(len(self.get_defects(msg)), 2)
+ unless(isinstance(self.get_defects(msg)[0],
+ errors.NoBoundaryInMultipartDefect))
+ unless(isinstance(self.get_defects(msg)[1],
errors.MultipartInvariantViolationDefect))
def test_invalid_content_type(self):
@@ -1856,9 +1862,10 @@ counter to RFC 2822, there's no separating newline here
unless = self.assertTrue
msg = self._msgobj('msg_41.txt')
unless(hasattr(msg, 'defects'))
- self.assertEqual(len(msg.defects), 2)
- unless(isinstance(msg.defects[0], errors.NoBoundaryInMultipartDefect))
- unless(isinstance(msg.defects[1],
+ self.assertEqual(len(self.get_defects(msg)), 2)
+ unless(isinstance(self.get_defects(msg)[0],
+ errors.NoBoundaryInMultipartDefect))
+ unless(isinstance(self.get_defects(msg)[1],
errors.MultipartInvariantViolationDefect))
def test_missing_start_boundary(self):
@@ -1872,21 +1879,71 @@ counter to RFC 2822, there's no separating newline here
#
# [*] This message is missing its start boundary
bad = outer.get_payload(1).get_payload(0)
- self.assertEqual(len(bad.defects), 1)
- self.assertTrue(isinstance(bad.defects[0],
+ self.assertEqual(len(self.get_defects(bad)), 1)
+ self.assertTrue(isinstance(self.get_defects(bad)[0],
errors.StartBoundaryNotFoundDefect))
def test_first_line_is_continuation_header(self):
eq = self.assertEqual
m = ' Line 1\nLine 2\nLine 3'
- msg = email.message_from_string(m)
+ msg = email.message_from_string(m, policy=self.policy)
eq(msg.keys(), [])
eq(msg.get_payload(), 'Line 2\nLine 3')
- eq(len(msg.defects), 1)
- self.assertTrue(isinstance(msg.defects[0],
+ eq(len(self.get_defects(msg)), 1)
+ self.assertTrue(isinstance(self.get_defects(msg)[0],
errors.FirstHeaderLineIsContinuationDefect))
- eq(msg.defects[0].line, ' Line 1\n')
+ eq(self.get_defects(msg)[0].line, ' Line 1\n')
+
+
+class TestNonConformant(TestNonConformantBase, TestEmailBase):
+
+ policy=email.policy.default
+
+ def get_defects(self, obj):
+ return obj.defects
+
+
+class TestNonConformantCapture(TestNonConformantBase, TestEmailBase):
+
+ class CapturePolicy(email.policy.Policy):
+ captured = None
+ def register_defect(self, obj, defect):
+ self.captured.append(defect)
+
+ def setUp(self):
+ self.policy = self.CapturePolicy(captured=list())
+
+ def get_defects(self, obj):
+ return self.policy.captured
+
+class TestRaisingDefects(TestEmailBase):
+
+ def _msgobj(self, filename):
+ with openfile(filename) as fp:
+ return email.message_from_file(fp, policy=email.policy.strict)
+
+ def test_same_boundary_inner_outer(self):
+ with self.assertRaises(errors.StartBoundaryNotFoundDefect):
+ self._msgobj('msg_15.txt')
+
+ def test_multipart_no_boundary(self):
+ with self.assertRaises(errors.NoBoundaryInMultipartDefect):
+ self._msgobj('msg_25.txt')
+
+ def test_lying_multipart(self):
+ with self.assertRaises(errors.NoBoundaryInMultipartDefect):
+ self._msgobj('msg_41.txt')
+
+
+ def test_missing_start_boundary(self):
+ with self.assertRaises(errors.StartBoundaryNotFoundDefect):
+ self._msgobj('msg_42.txt')
+
+ def test_first_line_is_continuation_header(self):
+ m = ' Line 1\nLine 2\nLine 3'
+ with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect):
+ msg = email.message_from_string(m, policy=email.policy.strict)
# Test RFC 2047 header encoding and decoding
@@ -2997,6 +3054,25 @@ Here's the message body
g.flatten(msg, linesep='\r\n')
self.assertEqual(s.getvalue(), text)
+ def test_crlf_control_via_policy(self):
+ with openfile('msg_26.txt', newline='\n') as fp:
+ text = fp.read()
+ msg = email.message_from_string(text)
+ s = StringIO()
+ g = email.generator.Generator(s, policy=email.policy.SMTP)
+ g.flatten(msg)
+ self.assertEqual(s.getvalue(), text)
+
+ def test_flatten_linesep_overrides_policy(self):
+ # msg_27 is lf separated
+ with openfile('msg_27.txt', newline='\n') as fp:
+ text = fp.read()
+ msg = email.message_from_string(text)
+ s = StringIO()
+ g = email.generator.Generator(s, policy=email.policy.SMTP)
+ g.flatten(msg, linesep='\n')
+ self.assertEqual(s.getvalue(), text)
+
maxDiff = None
def test_multipart_digest_with_extra_mime_headers(self):
@@ -3463,6 +3539,44 @@ class Test8BitBytesHandling(unittest.TestCase):
g.flatten(msg)
self.assertEqual(s.getvalue(), source)
+ def test_crlf_control_via_policy(self):
+ # msg_26 is crlf terminated
+ with openfile('msg_26.txt', 'rb') as fp:
+ text = fp.read()
+ msg = email.message_from_bytes(text)
+ s = BytesIO()
+ g = email.generator.BytesGenerator(s, policy=email.policy.SMTP)
+ g.flatten(msg)
+ self.assertEqual(s.getvalue(), text)
+
+ def test_flatten_linesep_overrides_policy(self):
+ # msg_27 is lf separated
+ with openfile('msg_27.txt', 'rb') as fp:
+ text = fp.read()
+ msg = email.message_from_bytes(text)
+ s = BytesIO()
+ g = email.generator.BytesGenerator(s, policy=email.policy.SMTP)
+ g.flatten(msg, linesep='\n')
+ self.assertEqual(s.getvalue(), text)
+
+ def test_must_be_7bit_handles_unknown_8bit(self):
+ msg = email.message_from_bytes(self.non_latin_bin_msg)
+ out = BytesIO()
+ g = email.generator.BytesGenerator(out,
+ policy=email.policy.default.clone(must_be_7bit=True))
+ g.flatten(msg)
+ self.assertEqual(out.getvalue(),
+ self.non_latin_bin_msg_as7bit_wrapped.encode('ascii'))
+
+ def test_must_be_7bit_transforms_8bit_cte(self):
+ msg = email.message_from_bytes(self.latin_bin_msg)
+ out = BytesIO()
+ g = email.generator.BytesGenerator(out,
+ policy=email.policy.default.clone(must_be_7bit=True))
+ g.flatten(msg)
+ self.assertEqual(out.getvalue(),
+ self.latin_bin_msg_as7bit.encode('ascii'))
+
maxDiff = None
diff --git a/Lib/test/test_email/test_generator.py b/Lib/test/test_email/test_generator.py
new file mode 100644
index 0000000..35ca6c5
--- /dev/null
+++ b/Lib/test/test_email/test_generator.py
@@ -0,0 +1,136 @@
+import io
+import textwrap
+import unittest
+from email import message_from_string, message_from_bytes
+from email.generator import Generator, BytesGenerator
+from email import policy
+from test.test_email import TestEmailBase
+
+# XXX: move generator tests from test_email into here at some point.
+
+
+class TestGeneratorBase():
+
+ long_subject = {
+ 0: textwrap.dedent("""\
+ To: whom_it_may_concern@example.com
+ From: nobody_you_want_to_know@example.com
+ Subject: We the willing led by the unknowing are doing the
+ impossible for the ungrateful. We have done so much for so long with so little
+ we are now qualified to do anything with nothing.
+
+ None
+ """),
+ 40: textwrap.dedent("""\
+ To: whom_it_may_concern@example.com
+ From:\x20
+ nobody_you_want_to_know@example.com
+ Subject: We the willing led by the
+ unknowing are doing the
+ impossible for the ungrateful. We have
+ done so much for so long with so little
+ we are now qualified to do anything
+ with nothing.
+
+ None
+ """),
+ 20: textwrap.dedent("""\
+ To:\x20
+ whom_it_may_concern@example.com
+ From:\x20
+ nobody_you_want_to_know@example.com
+ Subject: We the
+ willing led by the
+ unknowing are doing
+ the
+ impossible for the
+ ungrateful. We have
+ done so much for so
+ long with so little
+ we are now
+ qualified to do
+ anything with
+ nothing.
+
+ None
+ """),
+ }
+ long_subject[100] = long_subject[0]
+
+ def maxheaderlen_parameter_test(self, n):
+ msg = self.msgmaker(self.long_subject[0])
+ s = self.ioclass()
+ g = self.genclass(s, maxheaderlen=n)
+ g.flatten(msg)
+ self.assertEqual(s.getvalue(), self.long_subject[n])
+
+ def test_maxheaderlen_parameter_0(self):
+ self.maxheaderlen_parameter_test(0)
+
+ def test_maxheaderlen_parameter_100(self):
+ self.maxheaderlen_parameter_test(100)
+
+ def test_maxheaderlen_parameter_40(self):
+ self.maxheaderlen_parameter_test(40)
+
+ def test_maxheaderlen_parameter_20(self):
+ self.maxheaderlen_parameter_test(20)
+
+ def maxheaderlen_policy_test(self, n):
+ msg = self.msgmaker(self.long_subject[0])
+ s = self.ioclass()
+ g = self.genclass(s, policy=policy.default.clone(max_line_length=n))
+ g.flatten(msg)
+ self.assertEqual(s.getvalue(), self.long_subject[n])
+
+ def test_maxheaderlen_policy_0(self):
+ self.maxheaderlen_policy_test(0)
+
+ def test_maxheaderlen_policy_100(self):
+ self.maxheaderlen_policy_test(100)
+
+ def test_maxheaderlen_policy_40(self):
+ self.maxheaderlen_policy_test(40)
+
+ def test_maxheaderlen_policy_20(self):
+ self.maxheaderlen_policy_test(20)
+
+ def maxheaderlen_parm_overrides_policy_test(self, n):
+ msg = self.msgmaker(self.long_subject[0])
+ s = self.ioclass()
+ g = self.genclass(s, maxheaderlen=n,
+ policy=policy.default.clone(max_line_length=10))
+ g.flatten(msg)
+ self.assertEqual(s.getvalue(), self.long_subject[n])
+
+ def test_maxheaderlen_parm_overrides_policy_0(self):
+ self.maxheaderlen_parm_overrides_policy_test(0)
+
+ def test_maxheaderlen_parm_overrides_policy_100(self):
+ self.maxheaderlen_parm_overrides_policy_test(100)
+
+ def test_maxheaderlen_parm_overrides_policy_40(self):
+ self.maxheaderlen_parm_overrides_policy_test(40)
+
+ def test_maxheaderlen_parm_overrides_policy_20(self):
+ self.maxheaderlen_parm_overrides_policy_test(20)
+
+
+class TestGenerator(TestGeneratorBase, TestEmailBase):
+
+ msgmaker = staticmethod(message_from_string)
+ genclass = Generator
+ ioclass = io.StringIO
+
+
+class TestBytesGenerator(TestGeneratorBase, TestEmailBase):
+
+ msgmaker = staticmethod(message_from_bytes)
+ genclass = BytesGenerator
+ ioclass = io.BytesIO
+ long_subject = {key: x.encode('ascii')
+ for key, x in TestGeneratorBase.long_subject.items()}
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/Lib/test/test_email/test_policy.py b/Lib/test/test_email/test_policy.py
new file mode 100644
index 0000000..086ce40
--- /dev/null
+++ b/Lib/test/test_email/test_policy.py
@@ -0,0 +1,148 @@
+import types
+import unittest
+import email.policy
+
+class PolicyAPITests(unittest.TestCase):
+
+ longMessage = True
+
+ # These default values are the ones set on email.policy.default.
+ # If any of these defaults change, the docs must be updated.
+ policy_defaults = {
+ 'max_line_length': 78,
+ 'linesep': '\n',
+ 'must_be_7bit': False,
+ 'raise_on_defect': False,
+ }
+
+ # For each policy under test, we give here the values of the attributes
+ # that are different from the defaults for that policy.
+ policies = {
+ email.policy.Policy(): {},
+ email.policy.default: {},
+ email.policy.SMTP: {'linesep': '\r\n'},
+ email.policy.HTTP: {'linesep': '\r\n', 'max_line_length': None},
+ email.policy.strict: {'raise_on_defect': True},
+ }
+
+ def test_defaults(self):
+ for policy, changed_defaults in self.policies.items():
+ expected = self.policy_defaults.copy()
+ expected.update(changed_defaults)
+ for attr, value in expected.items():
+ self.assertEqual(getattr(policy, attr), value,
+ ("change {} docs/docstrings if defaults have "
+ "changed").format(policy))
+
+ def test_all_attributes_covered(self):
+ for attr in dir(email.policy.default):
+ if (attr.startswith('_') or
+ isinstance(getattr(email.policy.Policy, attr),
+ types.FunctionType)):
+ continue
+ else:
+ self.assertIn(attr, self.policy_defaults,
+ "{} is not fully tested".format(attr))
+
+ def test_policy_is_immutable(self):
+ for policy in self.policies:
+ for attr in self.policy_defaults:
+ with self.assertRaisesRegex(AttributeError, attr+".*read-only"):
+ setattr(policy, attr, None)
+ with self.assertRaisesRegex(AttributeError, 'no attribute.*foo'):
+ policy.foo = None
+
+ def test_set_policy_attrs_when_calledl(self):
+ testattrdict = { attr: None for attr in self.policy_defaults }
+ for policyclass in self.policies:
+ policy = policyclass.clone(**testattrdict)
+ for attr in self.policy_defaults:
+ self.assertIsNone(getattr(policy, attr))
+
+ def test_reject_non_policy_keyword_when_called(self):
+ for policyclass in self.policies:
+ with self.assertRaises(TypeError):
+ policyclass(this_keyword_should_not_be_valid=None)
+ with self.assertRaises(TypeError):
+ policyclass(newtline=None)
+
+ def test_policy_addition(self):
+ expected = self.policy_defaults.copy()
+ p1 = email.policy.default.clone(max_line_length=100)
+ p2 = email.policy.default.clone(max_line_length=50)
+ added = p1 + p2
+ expected.update(max_line_length=50)
+ for attr, value in expected.items():
+ self.assertEqual(getattr(added, attr), value)
+ added = p2 + p1
+ expected.update(max_line_length=100)
+ for attr, value in expected.items():
+ self.assertEqual(getattr(added, attr), value)
+ added = added + email.policy.default
+ for attr, value in expected.items():
+ self.assertEqual(getattr(added, attr), value)
+
+ def test_register_defect(self):
+ class Dummy:
+ def __init__(self):
+ self.defects = []
+ obj = Dummy()
+ defect = object()
+ policy = email.policy.Policy()
+ policy.register_defect(obj, defect)
+ self.assertEqual(obj.defects, [defect])
+ defect2 = object()
+ policy.register_defect(obj, defect2)
+ self.assertEqual(obj.defects, [defect, defect2])
+
+ class MyObj:
+ def __init__(self):
+ self.defects = []
+
+ class MyDefect(Exception):
+ pass
+
+ def test_handle_defect_raises_on_strict(self):
+ foo = self.MyObj()
+ defect = self.MyDefect("the telly is broken")
+ with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
+ email.policy.strict.handle_defect(foo, defect)
+
+ def test_handle_defect_registers_defect(self):
+ foo = self.MyObj()
+ defect1 = self.MyDefect("one")
+ email.policy.default.handle_defect(foo, defect1)
+ self.assertEqual(foo.defects, [defect1])
+ defect2 = self.MyDefect("two")
+ email.policy.default.handle_defect(foo, defect2)
+ self.assertEqual(foo.defects, [defect1, defect2])
+
+ class MyPolicy(email.policy.Policy):
+ defects = []
+ def register_defect(self, obj, defect):
+ self.defects.append(defect)
+
+ def test_overridden_register_defect_still_raises(self):
+ foo = self.MyObj()
+ defect = self.MyDefect("the telly is broken")
+ with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
+ self.MyPolicy(raise_on_defect=True).handle_defect(foo, defect)
+
+ def test_overriden_register_defect_works(self):
+ foo = self.MyObj()
+ defect1 = self.MyDefect("one")
+ my_policy = self.MyPolicy()
+ my_policy.handle_defect(foo, defect1)
+ self.assertEqual(my_policy.defects, [defect1])
+ self.assertEqual(foo.defects, [])
+ defect2 = self.MyDefect("two")
+ my_policy.handle_defect(foo, defect2)
+ self.assertEqual(my_policy.defects, [defect1, defect2])
+ self.assertEqual(foo.defects, [])
+
+ # XXX: Need subclassing tests.
+ # For adding subclassed objects, make sure the usual rules apply (subclass
+ # wins), but that the order still works (right overrides left).
+
+if __name__ == '__main__':
+ unittest.main()