summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_email
diff options
context:
space:
mode:
authorR David Murray <rdmurray@bitdance.com>2012-05-25 19:01:48 (GMT)
committerR David Murray <rdmurray@bitdance.com>2012-05-25 19:01:48 (GMT)
commitc27e52265b7ff4aa57dc357c289cce8c9dd0fec3 (patch)
treeb2a25260b0aa89d0a4db3c0d2f91c8cb5e68d51a /Lib/test/test_email
parent9242c1378f77214f5b9b90149861cb13ca986fb0 (diff)
downloadcpython-c27e52265b7ff4aa57dc357c289cce8c9dd0fec3.zip
cpython-c27e52265b7ff4aa57dc357c289cce8c9dd0fec3.tar.gz
cpython-c27e52265b7ff4aa57dc357c289cce8c9dd0fec3.tar.bz2
#14731: refactor email policy framework.
This patch primarily does two things: (1) it adds some internal-interface methods to Policy that allow for Policy to control the parsing and folding of headers in such a way that we can construct a backward compatibility policy that is 100% compatible with the 3.2 API, while allowing a new policy to implement the email6 API. (2) it adds that backward compatibility policy and refactors the test suite so that the only differences between the 3.2 test_email.py file and the 3.3 test_email.py file is some small changes in test framework and the addition of tests for bugs fixed that apply to the 3.2 API. There are some additional teaks, such as moving just the code needed for the compatibility policy into _policybase, so that the library code can import only _policybase. That way the new code that will be added for email6 will only get imported when a non-compatibility policy is imported.
Diffstat (limited to 'Lib/test/test_email')
-rw-r--r--Lib/test/test_email/__init__.py14
-rw-r--r--Lib/test/test_email/test_email.py180
-rw-r--r--Lib/test/test_email/test_generator.py85
-rw-r--r--Lib/test/test_email/test_parser.py276
-rw-r--r--Lib/test/test_email/test_policy.py114
5 files changed, 514 insertions, 155 deletions
diff --git a/Lib/test/test_email/__init__.py b/Lib/test/test_email/__init__.py
index d72b50e..b05fb3c 100644
--- a/Lib/test/test_email/__init__.py
+++ b/Lib/test/test_email/__init__.py
@@ -3,6 +3,8 @@ import sys
import unittest
import test.support
import email
+from email.message import Message
+from email._policybase import compat32
from test.test_email import __file__ as landmark
# Run all tests in package for '-m unittest test.test_email'
@@ -36,16 +38,26 @@ def openfile(filename, *args, **kws):
class TestEmailBase(unittest.TestCase):
maxDiff = None
+ # Currently the default policy is compat32. By setting that as the default
+ # here we make minimal changes in the test_email tests compared to their
+ # pre-3.3 state.
+ policy = compat32
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
self.addTypeEqualityFunc(bytes, self.assertBytesEqual)
+ # Backward compatibility to minimize test_email test changes.
ndiffAssertEqual = unittest.TestCase.assertEqual
def _msgobj(self, filename):
with openfile(filename) as fp:
- return email.message_from_file(fp)
+ return email.message_from_file(fp, policy=self.policy)
+
+ def _str_msg(self, string, message=Message, policy=None):
+ if policy is None:
+ policy = self.policy
+ return email.message_from_string(string, message, policy=policy)
def _bytes_repr(self, b):
return [repr(x) for x in b.splitlines(keepends=True)]
diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py
index b07f675..ac6ee65 100644
--- a/Lib/test/test_email/test_email.py
+++ b/Lib/test/test_email/test_email.py
@@ -16,6 +16,7 @@ from io import StringIO, BytesIO
from itertools import chain
import email
+import email.policy
from email.charset import Charset
from email.header import Header, decode_header, make_header
@@ -1805,11 +1806,7 @@ YXNkZg==
# Test some badly formatted messages
-class TestNonConformantBase:
-
- def _msgobj(self, filename):
- with openfile(filename) as fp:
- return email.message_from_file(fp, policy=self.policy)
+class TestNonConformant(TestEmailBase):
def test_parse_missing_minor_type(self):
eq = self.assertEqual
@@ -1818,24 +1815,26 @@ class TestNonConformantBase:
eq(msg.get_content_maintype(), 'text')
eq(msg.get_content_subtype(), 'plain')
+ # test_parser.TestMessageDefectDetectionBase
def test_same_boundary_inner_outer(self):
unless = self.assertTrue
msg = self._msgobj('msg_15.txt')
# XXX We can probably eventually do better
inner = msg.get_payload(0)
unless(hasattr(inner, 'defects'))
- self.assertEqual(len(self.get_defects(inner)), 1)
- unless(isinstance(self.get_defects(inner)[0],
+ self.assertEqual(len(inner.defects), 1)
+ unless(isinstance(inner.defects[0],
errors.StartBoundaryNotFoundDefect))
+ # test_parser.TestMessageDefectDetectionBase
def test_multipart_no_boundary(self):
unless = self.assertTrue
msg = self._msgobj('msg_25.txt')
unless(isinstance(msg.get_payload(), str))
- self.assertEqual(len(self.get_defects(msg)), 2)
- unless(isinstance(self.get_defects(msg)[0],
+ self.assertEqual(len(msg.defects), 2)
+ unless(isinstance(msg.defects[0],
errors.NoBoundaryInMultipartDefect))
- unless(isinstance(self.get_defects(msg)[1],
+ unless(isinstance(msg.defects[1],
errors.MultipartInvariantViolationDefect))
multipart_msg = textwrap.dedent("""\
@@ -1861,27 +1860,26 @@ class TestNonConformantBase:
--===============3344438784458119861==--
""")
+ # test_parser.TestMessageDefectDetectionBase
def test_multipart_invalid_cte(self):
- msg = email.message_from_string(
- self.multipart_msg.format("\nContent-Transfer-Encoding: base64"),
- policy = self.policy)
- self.assertEqual(len(self.get_defects(msg)), 1)
- self.assertIsInstance(self.get_defects(msg)[0],
+ msg = self._str_msg(
+ self.multipart_msg.format("\nContent-Transfer-Encoding: base64"))
+ self.assertEqual(len(msg.defects), 1)
+ self.assertIsInstance(msg.defects[0],
errors.InvalidMultipartContentTransferEncodingDefect)
+ # test_parser.TestMessageDefectDetectionBase
def test_multipart_no_cte_no_defect(self):
- msg = email.message_from_string(
- self.multipart_msg.format(''),
- policy = self.policy)
- self.assertEqual(len(self.get_defects(msg)), 0)
+ msg = self._str_msg(self.multipart_msg.format(''))
+ self.assertEqual(len(msg.defects), 0)
+ # test_parser.TestMessageDefectDetectionBase
def test_multipart_valid_cte_no_defect(self):
for cte in ('7bit', '8bit', 'BINary'):
- msg = email.message_from_string(
+ msg = self._str_msg(
self.multipart_msg.format(
- "\nContent-Transfer-Encoding: {}".format(cte)),
- policy = self.policy)
- self.assertEqual(len(self.get_defects(msg)), 0)
+ "\nContent-Transfer-Encoding: {}".format(cte)))
+ self.assertEqual(len(msg.defects), 0)
def test_invalid_content_type(self):
eq = self.assertEqual
@@ -1932,16 +1930,18 @@ Subject: here's something interesting
counter to RFC 2822, there's no separating newline here
""")
+ # test_parser.TestMessageDefectDetectionBase
def test_lying_multipart(self):
unless = self.assertTrue
msg = self._msgobj('msg_41.txt')
unless(hasattr(msg, 'defects'))
- self.assertEqual(len(self.get_defects(msg)), 2)
- unless(isinstance(self.get_defects(msg)[0],
+ self.assertEqual(len(msg.defects), 2)
+ unless(isinstance(msg.defects[0],
errors.NoBoundaryInMultipartDefect))
- unless(isinstance(self.get_defects(msg)[1],
+ unless(isinstance(msg.defects[1],
errors.MultipartInvariantViolationDefect))
+ # test_parser.TestMessageDefectDetectionBase
def test_missing_start_boundary(self):
outer = self._msgobj('msg_42.txt')
# The message structure is:
@@ -1953,71 +1953,21 @@ counter to RFC 2822, there's no separating newline here
#
# [*] This message is missing its start boundary
bad = outer.get_payload(1).get_payload(0)
- self.assertEqual(len(self.get_defects(bad)), 1)
- self.assertTrue(isinstance(self.get_defects(bad)[0],
+ self.assertEqual(len(bad.defects), 1)
+ self.assertTrue(isinstance(bad.defects[0],
errors.StartBoundaryNotFoundDefect))
+ # test_parser.TestMessageDefectDetectionBase
def test_first_line_is_continuation_header(self):
eq = self.assertEqual
m = ' Line 1\nLine 2\nLine 3'
- msg = email.message_from_string(m, policy=self.policy)
+ msg = email.message_from_string(m)
eq(msg.keys(), [])
eq(msg.get_payload(), 'Line 2\nLine 3')
- eq(len(self.get_defects(msg)), 1)
- self.assertTrue(isinstance(self.get_defects(msg)[0],
+ eq(len(msg.defects), 1)
+ self.assertTrue(isinstance(msg.defects[0],
errors.FirstHeaderLineIsContinuationDefect))
- eq(self.get_defects(msg)[0].line, ' Line 1\n')
-
-
-class TestNonConformant(TestNonConformantBase, TestEmailBase):
-
- policy=email.policy.default
-
- def get_defects(self, obj):
- return obj.defects
-
-
-class TestNonConformantCapture(TestNonConformantBase, TestEmailBase):
-
- class CapturePolicy(email.policy.Policy):
- captured = None
- def register_defect(self, obj, defect):
- self.captured.append(defect)
-
- def setUp(self):
- self.policy = self.CapturePolicy(captured=list())
-
- def get_defects(self, obj):
- return self.policy.captured
-
-
-class TestRaisingDefects(TestEmailBase):
-
- def _msgobj(self, filename):
- with openfile(filename) as fp:
- return email.message_from_file(fp, policy=email.policy.strict)
-
- def test_same_boundary_inner_outer(self):
- with self.assertRaises(errors.StartBoundaryNotFoundDefect):
- self._msgobj('msg_15.txt')
-
- def test_multipart_no_boundary(self):
- with self.assertRaises(errors.NoBoundaryInMultipartDefect):
- self._msgobj('msg_25.txt')
-
- def test_lying_multipart(self):
- with self.assertRaises(errors.NoBoundaryInMultipartDefect):
- self._msgobj('msg_41.txt')
-
-
- def test_missing_start_boundary(self):
- with self.assertRaises(errors.StartBoundaryNotFoundDefect):
- self._msgobj('msg_42.txt')
-
- def test_first_line_is_continuation_header(self):
- m = ' Line 1\nLine 2\nLine 3'
- with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect):
- msg = email.message_from_string(m, policy=email.policy.strict)
+ eq(msg.defects[0].line, ' Line 1\n')
# Test RFC 2047 header encoding and decoding
@@ -2610,6 +2560,13 @@ class TestMiscellaneous(TestEmailBase):
for subpart in msg.walk():
unless(isinstance(subpart, MyMessage))
+ def test_custom_message_does_not_require_arguments(self):
+ class MyMessage(Message):
+ def __init__(self):
+ super().__init__()
+ msg = self._str_msg("Subject: test\n\ntest", MyMessage)
+ self.assertTrue(isinstance(msg, MyMessage))
+
def test__all__(self):
module = __import__('email')
self.assertEqual(sorted(module.__all__), [
@@ -3137,25 +3094,6 @@ Here's the message body
g.flatten(msg, linesep='\r\n')
self.assertEqual(s.getvalue(), text)
- def test_crlf_control_via_policy(self):
- with openfile('msg_26.txt', newline='\n') as fp:
- text = fp.read()
- msg = email.message_from_string(text)
- s = StringIO()
- g = email.generator.Generator(s, policy=email.policy.SMTP)
- g.flatten(msg)
- self.assertEqual(s.getvalue(), text)
-
- def test_flatten_linesep_overrides_policy(self):
- # msg_27 is lf separated
- with openfile('msg_27.txt', newline='\n') as fp:
- text = fp.read()
- msg = email.message_from_string(text)
- s = StringIO()
- g = email.generator.Generator(s, policy=email.policy.SMTP)
- g.flatten(msg, linesep='\n')
- self.assertEqual(s.getvalue(), text)
-
maxDiff = None
def test_multipart_digest_with_extra_mime_headers(self):
@@ -3646,44 +3584,6 @@ class Test8BitBytesHandling(unittest.TestCase):
s.getvalue(),
'Subject: =?utf-8?b?xb5sdcWlb3XEjWvDvSBrxa/FiA==?=\r\n\r\n')
- def test_crlf_control_via_policy(self):
- # msg_26 is crlf terminated
- with openfile('msg_26.txt', 'rb') as fp:
- text = fp.read()
- msg = email.message_from_bytes(text)
- s = BytesIO()
- g = email.generator.BytesGenerator(s, policy=email.policy.SMTP)
- g.flatten(msg)
- self.assertEqual(s.getvalue(), text)
-
- def test_flatten_linesep_overrides_policy(self):
- # msg_27 is lf separated
- with openfile('msg_27.txt', 'rb') as fp:
- text = fp.read()
- msg = email.message_from_bytes(text)
- s = BytesIO()
- g = email.generator.BytesGenerator(s, policy=email.policy.SMTP)
- g.flatten(msg, linesep='\n')
- self.assertEqual(s.getvalue(), text)
-
- def test_must_be_7bit_handles_unknown_8bit(self):
- msg = email.message_from_bytes(self.non_latin_bin_msg)
- out = BytesIO()
- g = email.generator.BytesGenerator(out,
- policy=email.policy.default.clone(must_be_7bit=True))
- g.flatten(msg)
- self.assertEqual(out.getvalue(),
- self.non_latin_bin_msg_as7bit_wrapped.encode('ascii'))
-
- def test_must_be_7bit_transforms_8bit_cte(self):
- msg = email.message_from_bytes(self.latin_bin_msg)
- out = BytesIO()
- g = email.generator.BytesGenerator(out,
- policy=email.policy.default.clone(must_be_7bit=True))
- g.flatten(msg)
- self.assertEqual(out.getvalue(),
- self.latin_bin_msg_as7bit.encode('ascii'))
-
maxDiff = None
diff --git a/Lib/test/test_email/test_generator.py b/Lib/test/test_email/test_generator.py
index 35ca6c5..8f5fde7 100644
--- a/Lib/test/test_email/test_generator.py
+++ b/Lib/test/test_email/test_generator.py
@@ -11,6 +11,8 @@ from test.test_email import TestEmailBase
class TestGeneratorBase():
+ policy = policy.compat32
+
long_subject = {
0: textwrap.dedent("""\
To: whom_it_may_concern@example.com
@@ -58,11 +60,11 @@ class TestGeneratorBase():
long_subject[100] = long_subject[0]
def maxheaderlen_parameter_test(self, n):
- msg = self.msgmaker(self.long_subject[0])
+ msg = self.msgmaker(self.typ(self.long_subject[0]))
s = self.ioclass()
g = self.genclass(s, maxheaderlen=n)
g.flatten(msg)
- self.assertEqual(s.getvalue(), self.long_subject[n])
+ self.assertEqual(s.getvalue(), self.typ(self.long_subject[n]))
def test_maxheaderlen_parameter_0(self):
self.maxheaderlen_parameter_test(0)
@@ -77,11 +79,11 @@ class TestGeneratorBase():
self.maxheaderlen_parameter_test(20)
def maxheaderlen_policy_test(self, n):
- msg = self.msgmaker(self.long_subject[0])
+ msg = self.msgmaker(self.typ(self.long_subject[0]))
s = self.ioclass()
g = self.genclass(s, policy=policy.default.clone(max_line_length=n))
g.flatten(msg)
- self.assertEqual(s.getvalue(), self.long_subject[n])
+ self.assertEqual(s.getvalue(), self.typ(self.long_subject[n]))
def test_maxheaderlen_policy_0(self):
self.maxheaderlen_policy_test(0)
@@ -96,12 +98,12 @@ class TestGeneratorBase():
self.maxheaderlen_policy_test(20)
def maxheaderlen_parm_overrides_policy_test(self, n):
- msg = self.msgmaker(self.long_subject[0])
+ msg = self.msgmaker(self.typ(self.long_subject[0]))
s = self.ioclass()
g = self.genclass(s, maxheaderlen=n,
policy=policy.default.clone(max_line_length=10))
g.flatten(msg)
- self.assertEqual(s.getvalue(), self.long_subject[n])
+ self.assertEqual(s.getvalue(), self.typ(self.long_subject[n]))
def test_maxheaderlen_parm_overrides_policy_0(self):
self.maxheaderlen_parm_overrides_policy_test(0)
@@ -115,21 +117,84 @@ class TestGeneratorBase():
def test_maxheaderlen_parm_overrides_policy_20(self):
self.maxheaderlen_parm_overrides_policy_test(20)
+ def test_crlf_control_via_policy(self):
+ source = "Subject: test\r\n\r\ntest body\r\n"
+ expected = source
+ msg = self.msgmaker(self.typ(source))
+ s = self.ioclass()
+ g = self.genclass(s, policy=policy.SMTP)
+ g.flatten(msg)
+ self.assertEqual(s.getvalue(), self.typ(expected))
+
+ def test_flatten_linesep_overrides_policy(self):
+ source = "Subject: test\n\ntest body\n"
+ expected = source
+ msg = self.msgmaker(self.typ(source))
+ s = self.ioclass()
+ g = self.genclass(s, policy=policy.SMTP)
+ g.flatten(msg, linesep='\n')
+ self.assertEqual(s.getvalue(), self.typ(expected))
+
class TestGenerator(TestGeneratorBase, TestEmailBase):
- msgmaker = staticmethod(message_from_string)
genclass = Generator
ioclass = io.StringIO
+ typ = str
+
+ def msgmaker(self, msg, policy=None):
+ policy = self.policy if policy is None else policy
+ return message_from_string(msg, policy=policy)
class TestBytesGenerator(TestGeneratorBase, TestEmailBase):
- msgmaker = staticmethod(message_from_bytes)
genclass = BytesGenerator
ioclass = io.BytesIO
- long_subject = {key: x.encode('ascii')
- for key, x in TestGeneratorBase.long_subject.items()}
+ typ = lambda self, x: x.encode('ascii')
+
+ def msgmaker(self, msg, policy=None):
+ policy = self.policy if policy is None else policy
+ return message_from_bytes(msg, policy=policy)
+
+ def test_cte_type_7bit_handles_unknown_8bit(self):
+ source = ("Subject: Maintenant je vous présente mon "
+ "collègue\n\n").encode('utf-8')
+ expected = ('Subject: =?unknown-8bit?q?Maintenant_je_vous_pr=C3=A9sente_mon_'
+ 'coll=C3=A8gue?=\n\n').encode('ascii')
+ msg = message_from_bytes(source)
+ s = io.BytesIO()
+ g = BytesGenerator(s, policy=self.policy.clone(cte_type='7bit'))
+ g.flatten(msg)
+ self.assertEqual(s.getvalue(), expected)
+
+ def test_cte_type_7bit_transforms_8bit_cte(self):
+ source = textwrap.dedent("""\
+ From: foo@bar.com
+ To: Dinsdale
+ Subject: Nudge nudge, wink, wink
+ Mime-Version: 1.0
+ Content-Type: text/plain; charset="latin-1"
+ Content-Transfer-Encoding: 8bit
+
+ oh là là, know what I mean, know what I mean?
+ """).encode('latin1')
+ msg = message_from_bytes(source)
+ expected = textwrap.dedent("""\
+ From: foo@bar.com
+ To: Dinsdale
+ Subject: Nudge nudge, wink, wink
+ Mime-Version: 1.0
+ Content-Type: text/plain; charset="iso-8859-1"
+ Content-Transfer-Encoding: quoted-printable
+
+ oh l=E0 l=E0, know what I mean, know what I mean?
+ """).encode('ascii')
+ s = io.BytesIO()
+ g = BytesGenerator(s, policy=self.policy.clone(cte_type='7bit',
+ linesep='\n'))
+ g.flatten(msg)
+ self.assertEqual(s.getvalue(), expected)
if __name__ == '__main__':
diff --git a/Lib/test/test_email/test_parser.py b/Lib/test/test_email/test_parser.py
new file mode 100644
index 0000000..864e4c1
--- /dev/null
+++ b/Lib/test/test_email/test_parser.py
@@ -0,0 +1,276 @@
+import io
+import email
+import textwrap
+import unittest
+from email._policybase import Compat32
+from email import errors
+from email.message import Message
+from test.test_email import TestEmailBase
+
+
+class TestCustomMessage(TestEmailBase):
+
+ class MyMessage(Message):
+ def __init__(self, policy):
+ self.check_policy = policy
+ super().__init__()
+
+ MyPolicy = TestEmailBase.policy.clone(linesep='boo')
+
+ def test_custom_message_gets_policy_if_possible_from_string(self):
+ msg = email.message_from_string("Subject: bogus\n\nmsg\n",
+ self.MyMessage,
+ policy=self.MyPolicy)
+ self.assertTrue(isinstance(msg, self.MyMessage))
+ self.assertIs(msg.check_policy, self.MyPolicy)
+
+ def test_custom_message_gets_policy_if_possible_from_file(self):
+ source_file = io.StringIO("Subject: bogus\n\nmsg\n")
+ msg = email.message_from_file(source_file,
+ self.MyMessage,
+ policy=self.MyPolicy)
+ self.assertTrue(isinstance(msg, self.MyMessage))
+ self.assertIs(msg.check_policy, self.MyPolicy)
+
+ # XXX add tests for other functions that take Message arg.
+
+
+class TestMessageDefectDetectionBase:
+
+ dup_boundary_msg = textwrap.dedent("""\
+ Subject: XX
+ From: xx@xx.dk
+ To: XX
+ Mime-version: 1.0
+ Content-type: multipart/mixed;
+ boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: multipart/alternative;
+ boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: text/plain; charset="ISO-8859-1"
+ Content-transfer-encoding: quoted-printable
+
+ text
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: text/html; charset="ISO-8859-1"
+ Content-transfer-encoding: quoted-printable
+
+ <HTML></HTML>
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part--
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: image/gif; name="xx.gif";
+ Content-disposition: attachment
+ Content-transfer-encoding: base64
+
+ Some removed base64 encoded chars.
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part--
+
+ """)
+
+ def test_same_boundary_inner_outer(self):
+ # XXX better would be to actually detect the duplicate.
+ msg = self._str_msg(self.dup_boundary_msg)
+ inner = msg.get_payload(0)
+ self.assertTrue(hasattr(inner, 'defects'))
+ self.assertEqual(len(self.get_defects(inner)), 1)
+ self.assertTrue(isinstance(self.get_defects(inner)[0],
+ errors.StartBoundaryNotFoundDefect))
+
+ def test_same_boundary_inner_outer_raises_on_defect(self):
+ with self.assertRaises(errors.StartBoundaryNotFoundDefect):
+ self._str_msg(self.dup_boundary_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ no_boundary_msg = textwrap.dedent("""\
+ Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
+ From: foobar
+ Subject: broken mail
+ MIME-Version: 1.0
+ Content-Type: multipart/report; report-type=delivery-status;
+
+ --JAB03225.986577786/zinfandel.lacita.com
+
+ One part
+
+ --JAB03225.986577786/zinfandel.lacita.com
+ Content-Type: message/delivery-status
+
+ Header: Another part
+
+ --JAB03225.986577786/zinfandel.lacita.com--
+ """)
+
+ def test_multipart_no_boundary(self):
+ msg = self._str_msg(self.no_boundary_msg)
+ self.assertTrue(isinstance(msg.get_payload(), str))
+ self.assertEqual(len(self.get_defects(msg)), 2)
+ self.assertTrue(isinstance(self.get_defects(msg)[0],
+ errors.NoBoundaryInMultipartDefect))
+ self.assertTrue(isinstance(self.get_defects(msg)[1],
+ errors.MultipartInvariantViolationDefect))
+
+ def test_multipart_no_boundary_raise_on_defect(self):
+ with self.assertRaises(errors.NoBoundaryInMultipartDefect):
+ self._str_msg(self.no_boundary_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ multipart_msg = textwrap.dedent("""\
+ Date: Wed, 14 Nov 2007 12:56:23 GMT
+ From: foo@bar.invalid
+ To: foo@bar.invalid
+ Subject: Content-Transfer-Encoding: base64 and multipart
+ MIME-Version: 1.0
+ Content-Type: multipart/mixed;
+ boundary="===============3344438784458119861=="{}
+
+ --===============3344438784458119861==
+ Content-Type: text/plain
+
+ Test message
+
+ --===============3344438784458119861==
+ Content-Type: application/octet-stream
+ Content-Transfer-Encoding: base64
+
+ YWJj
+
+ --===============3344438784458119861==--
+ """)
+
+ def test_multipart_invalid_cte(self):
+ msg = self._str_msg(
+ self.multipart_msg.format("\nContent-Transfer-Encoding: base64"))
+ self.assertEqual(len(self.get_defects(msg)), 1)
+ self.assertIsInstance(self.get_defects(msg)[0],
+ errors.InvalidMultipartContentTransferEncodingDefect)
+
+ def test_multipart_invalid_cte_raise_on_defect(self):
+ with self.assertRaises(
+ errors.InvalidMultipartContentTransferEncodingDefect):
+ self._str_msg(
+ self.multipart_msg.format(
+ "\nContent-Transfer-Encoding: base64"),
+ policy=self.policy.clone(raise_on_defect=True))
+
+ def test_multipart_no_cte_no_defect(self):
+ msg = self._str_msg(self.multipart_msg.format(''))
+ self.assertEqual(len(self.get_defects(msg)), 0)
+
+ def test_multipart_valid_cte_no_defect(self):
+ for cte in ('7bit', '8bit', 'BINary'):
+ msg = self._str_msg(
+ self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
+ self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)
+
+ lying_multipart_msg = textwrap.dedent("""\
+ From: "Allison Dunlap" <xxx@example.com>
+ To: yyy@example.com
+ Subject: 64423
+ Date: Sun, 11 Jul 2004 16:09:27 -0300
+ MIME-Version: 1.0
+ Content-Type: multipart/alternative;
+
+ Blah blah blah
+ """)
+
+ def test_lying_multipart(self):
+ msg = self._str_msg(self.lying_multipart_msg)
+ self.assertTrue(hasattr(msg, 'defects'))
+ self.assertEqual(len(self.get_defects(msg)), 2)
+ self.assertTrue(isinstance(self.get_defects(msg)[0],
+ errors.NoBoundaryInMultipartDefect))
+ self.assertTrue(isinstance(self.get_defects(msg)[1],
+ errors.MultipartInvariantViolationDefect))
+
+ def test_lying_multipart_raise_on_defect(self):
+ with self.assertRaises(errors.NoBoundaryInMultipartDefect):
+ self._str_msg(self.lying_multipart_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ missing_start_boundary_msg = textwrap.dedent("""\
+ Content-Type: multipart/mixed; boundary="AAA"
+ From: Mail Delivery Subsystem <xxx@example.com>
+ To: yyy@example.com
+
+ --AAA
+
+ Stuff
+
+ --AAA
+ Content-Type: message/rfc822
+
+ From: webmaster@python.org
+ To: zzz@example.com
+ Content-Type: multipart/mixed; boundary="BBB"
+
+ --BBB--
+
+ --AAA--
+
+ """)
+
+ def test_missing_start_boundary(self):
+ # The message structure is:
+ #
+ # multipart/mixed
+ # text/plain
+ # message/rfc822
+ # multipart/mixed [*]
+ #
+ # [*] This message is missing its start boundary
+ outer = self._str_msg(self.missing_start_boundary_msg)
+ bad = outer.get_payload(1).get_payload(0)
+ self.assertEqual(len(self.get_defects(bad)), 1)
+ self.assertTrue(isinstance(self.get_defects(bad)[0],
+ errors.StartBoundaryNotFoundDefect))
+
+ def test_missing_start_boundary_raise_on_defect(self):
+ with self.assertRaises(errors.StartBoundaryNotFoundDefect):
+ self._str_msg(self.missing_start_boundary_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ def test_first_line_is_continuation_header(self):
+ msg = self._str_msg(' Line 1\nLine 2\nLine 3')
+ self.assertEqual(msg.keys(), [])
+ self.assertEqual(msg.get_payload(), 'Line 2\nLine 3')
+ self.assertEqual(len(self.get_defects(msg)), 1)
+ self.assertTrue(isinstance(self.get_defects(msg)[0],
+ errors.FirstHeaderLineIsContinuationDefect))
+ self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')
+
+ def test_first_line_is_continuation_header_raise_on_defect(self):
+ with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect):
+ self._str_msg(' Line 1\nLine 2\nLine 3',
+ policy=self.policy.clone(raise_on_defect=True))
+
+
+class TestMessageDefectDetection(TestMessageDefectDetectionBase, TestEmailBase):
+
+ def get_defects(self, obj):
+ return obj.defects
+
+
+class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase,
+ TestEmailBase):
+
+ class CapturePolicy(Compat32):
+ captured = None
+ def register_defect(self, obj, defect):
+ self.captured.append(defect)
+
+ def setUp(self):
+ self.policy = self.CapturePolicy(captured=list())
+
+ def get_defects(self, obj):
+ return self.policy.captured
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/Lib/test/test_email/test_policy.py b/Lib/test/test_email/test_policy.py
index 1c65901..07925a7 100644
--- a/Lib/test/test_email/test_policy.py
+++ b/Lib/test/test_email/test_policy.py
@@ -1,6 +1,10 @@
+import io
import types
+import textwrap
import unittest
import email.policy
+import email.parser
+import email.generator
class PolicyAPITests(unittest.TestCase):
@@ -11,14 +15,15 @@ class PolicyAPITests(unittest.TestCase):
policy_defaults = {
'max_line_length': 78,
'linesep': '\n',
- 'must_be_7bit': False,
+ 'cte_type': '8bit',
'raise_on_defect': False,
}
# For each policy under test, we give here the values of the attributes
# that are different from the defaults for that policy.
policies = {
- email.policy.Policy(): {},
+ email.policy.Compat32(): {},
+ email.policy.compat32: {},
email.policy.default: {},
email.policy.SMTP: {'linesep': '\r\n'},
email.policy.HTTP: {'linesep': '\r\n', 'max_line_length': None},
@@ -44,6 +49,18 @@ class PolicyAPITests(unittest.TestCase):
self.assertIn(attr, self.policy_defaults,
"{} is not fully tested".format(attr))
+ def test_abc(self):
+ with self.assertRaises(TypeError) as cm:
+ email.policy.Policy()
+ msg = str(cm.exception)
+ abstract_methods = ('fold',
+ 'fold_binary',
+ 'header_fetch_parse',
+ 'header_source_parse',
+ 'header_store_parse')
+ for method in abstract_methods:
+ self.assertIn(method, msg)
+
def test_policy_is_immutable(self):
for policy in self.policies:
for attr in self.policy_defaults:
@@ -88,7 +105,7 @@ class PolicyAPITests(unittest.TestCase):
self.defects = []
obj = Dummy()
defect = object()
- policy = email.policy.Policy()
+ policy = email.policy.Compat32()
policy.register_defect(obj, defect)
self.assertEqual(obj.defects, [defect])
defect2 = object()
@@ -117,7 +134,7 @@ class PolicyAPITests(unittest.TestCase):
email.policy.default.handle_defect(foo, defect2)
self.assertEqual(foo.defects, [defect1, defect2])
- class MyPolicy(email.policy.Policy):
+ class MyPolicy(email.policy.Compat32):
defects = None
def __init__(self, *args, **kw):
super().__init__(*args, defects=[], **kw)
@@ -146,5 +163,94 @@ class PolicyAPITests(unittest.TestCase):
# For adding subclassed objects, make sure the usual rules apply (subclass
# wins), but that the order still works (right overrides left).
+
+class TestPolicyPropagation(unittest.TestCase):
+
+ # The abstract methods are used by the parser but not by the wrapper
+ # functions that call it, so if the exception gets raised we know that the
+ # policy was actually propagated all the way to feedparser.
+ class MyPolicy(email.policy.Policy):
+ def badmethod(self, *args, **kw):
+ raise Exception("test")
+ fold = fold_binary = header_fetch_parser = badmethod
+ header_source_parse = header_store_parse = badmethod
+
+ def test_message_from_string(self):
+ with self.assertRaisesRegex(Exception, "^test$"):
+ email.message_from_string("Subject: test\n\n",
+ policy=self.MyPolicy)
+
+ def test_message_from_bytes(self):
+ with self.assertRaisesRegex(Exception, "^test$"):
+ email.message_from_bytes(b"Subject: test\n\n",
+ policy=self.MyPolicy)
+
+ def test_message_from_file(self):
+ f = io.StringIO('Subject: test\n\n')
+ with self.assertRaisesRegex(Exception, "^test$"):
+ email.message_from_file(f, policy=self.MyPolicy)
+
+ def test_message_from_binary_file(self):
+ f = io.BytesIO(b'Subject: test\n\n')
+ with self.assertRaisesRegex(Exception, "^test$"):
+ email.message_from_binary_file(f, policy=self.MyPolicy)
+
+ # These are redundant, but we need them for black-box completeness.
+
+ def test_parser(self):
+ p = email.parser.Parser(policy=self.MyPolicy)
+ with self.assertRaisesRegex(Exception, "^test$"):
+ p.parsestr('Subject: test\n\n')
+
+ def test_bytes_parser(self):
+ p = email.parser.BytesParser(policy=self.MyPolicy)
+ with self.assertRaisesRegex(Exception, "^test$"):
+ p.parsebytes(b'Subject: test\n\n')
+
+ # Now that we've established that all the parse methods get the
+ # policy in to feedparser, we can use message_from_string for
+ # the rest of the propagation tests.
+
+ def _make_msg(self, source='Subject: test\n\n', policy=None):
+ self.policy = email.policy.default.clone() if policy is None else policy
+ return email.message_from_string(source, policy=self.policy)
+
+ def test_parser_propagates_policy_to_message(self):
+ msg = self._make_msg()
+ self.assertIs(msg.policy, self.policy)
+
+ def test_parser_propagates_policy_to_sub_messages(self):
+ msg = self._make_msg(textwrap.dedent("""\
+ Subject: mime test
+ MIME-Version: 1.0
+ Content-Type: multipart/mixed, boundary="XXX"
+
+ --XXX
+ Content-Type: text/plain
+
+ test
+ --XXX
+ Content-Type: text/plain
+
+ test2
+ --XXX--
+ """))
+ for part in msg.walk():
+ self.assertIs(part.policy, self.policy)
+
+ def test_message_policy_propagates_to_generator(self):
+ msg = self._make_msg("Subject: test\nTo: foo\n\n",
+ policy=email.policy.default.clone(linesep='X'))
+ s = io.StringIO()
+ g = email.generator.Generator(s)
+ g.flatten(msg)
+ self.assertEqual(s.getvalue(), "Subject: testXTo: fooXX")
+
+ def test_message_policy_used_by_as_string(self):
+ msg = self._make_msg("Subject: test\nTo: foo\n\n",
+ policy=email.policy.default.clone(linesep='X'))
+ self.assertEqual(msg.as_string(), "Subject: testXTo: fooXX")
+
+
if __name__ == '__main__':
unittest.main()