summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_email/__init__.py14
-rw-r--r--Lib/test/test_email/test_email.py180
-rw-r--r--Lib/test/test_email/test_generator.py85
-rw-r--r--Lib/test/test_email/test_parser.py276
-rw-r--r--Lib/test/test_email/test_policy.py114
5 files changed, 514 insertions, 155 deletions
diff --git a/Lib/test/test_email/__init__.py b/Lib/test/test_email/__init__.py
index d72b50e..b05fb3c 100644
--- a/Lib/test/test_email/__init__.py
+++ b/Lib/test/test_email/__init__.py
@@ -3,6 +3,8 @@ import sys
import unittest
import test.support
import email
+from email.message import Message
+from email._policybase import compat32
from test.test_email import __file__ as landmark
# Run all tests in package for '-m unittest test.test_email'
@@ -36,16 +38,26 @@ def openfile(filename, *args, **kws):
class TestEmailBase(unittest.TestCase):
maxDiff = None
+ # Currently the default policy is compat32. By setting that as the default
+ # here we make minimal changes in the test_email tests compared to their
+ # pre-3.3 state.
+ policy = compat32
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
self.addTypeEqualityFunc(bytes, self.assertBytesEqual)
+ # Backward compatibility to minimize test_email test changes.
ndiffAssertEqual = unittest.TestCase.assertEqual
def _msgobj(self, filename):
with openfile(filename) as fp:
- return email.message_from_file(fp)
+ return email.message_from_file(fp, policy=self.policy)
+
+ def _str_msg(self, string, message=Message, policy=None):
+ if policy is None:
+ policy = self.policy
+ return email.message_from_string(string, message, policy=policy)
def _bytes_repr(self, b):
return [repr(x) for x in b.splitlines(keepends=True)]
diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py
index b07f675..ac6ee65 100644
--- a/Lib/test/test_email/test_email.py
+++ b/Lib/test/test_email/test_email.py
@@ -16,6 +16,7 @@ from io import StringIO, BytesIO
from itertools import chain
import email
+import email.policy
from email.charset import Charset
from email.header import Header, decode_header, make_header
@@ -1805,11 +1806,7 @@ YXNkZg==
# Test some badly formatted messages
-class TestNonConformantBase:
-
- def _msgobj(self, filename):
- with openfile(filename) as fp:
- return email.message_from_file(fp, policy=self.policy)
+class TestNonConformant(TestEmailBase):
def test_parse_missing_minor_type(self):
eq = self.assertEqual
@@ -1818,24 +1815,26 @@ class TestNonConformantBase:
eq(msg.get_content_maintype(), 'text')
eq(msg.get_content_subtype(), 'plain')
+ # test_parser.TestMessageDefectDetectionBase
def test_same_boundary_inner_outer(self):
unless = self.assertTrue
msg = self._msgobj('msg_15.txt')
# XXX We can probably eventually do better
inner = msg.get_payload(0)
unless(hasattr(inner, 'defects'))
- self.assertEqual(len(self.get_defects(inner)), 1)
- unless(isinstance(self.get_defects(inner)[0],
+ self.assertEqual(len(inner.defects), 1)
+ unless(isinstance(inner.defects[0],
errors.StartBoundaryNotFoundDefect))
+ # test_parser.TestMessageDefectDetectionBase
def test_multipart_no_boundary(self):
unless = self.assertTrue
msg = self._msgobj('msg_25.txt')
unless(isinstance(msg.get_payload(), str))
- self.assertEqual(len(self.get_defects(msg)), 2)
- unless(isinstance(self.get_defects(msg)[0],
+ self.assertEqual(len(msg.defects), 2)
+ unless(isinstance(msg.defects[0],
errors.NoBoundaryInMultipartDefect))
- unless(isinstance(self.get_defects(msg)[1],
+ unless(isinstance(msg.defects[1],
errors.MultipartInvariantViolationDefect))
multipart_msg = textwrap.dedent("""\
@@ -1861,27 +1860,26 @@ class TestNonConformantBase:
--===============3344438784458119861==--
""")
+ # test_parser.TestMessageDefectDetectionBase
def test_multipart_invalid_cte(self):
- msg = email.message_from_string(
- self.multipart_msg.format("\nContent-Transfer-Encoding: base64"),
- policy = self.policy)
- self.assertEqual(len(self.get_defects(msg)), 1)
- self.assertIsInstance(self.get_defects(msg)[0],
+ msg = self._str_msg(
+ self.multipart_msg.format("\nContent-Transfer-Encoding: base64"))
+ self.assertEqual(len(msg.defects), 1)
+ self.assertIsInstance(msg.defects[0],
errors.InvalidMultipartContentTransferEncodingDefect)
+ # test_parser.TestMessageDefectDetectionBase
def test_multipart_no_cte_no_defect(self):
- msg = email.message_from_string(
- self.multipart_msg.format(''),
- policy = self.policy)
- self.assertEqual(len(self.get_defects(msg)), 0)
+ msg = self._str_msg(self.multipart_msg.format(''))
+ self.assertEqual(len(msg.defects), 0)
+ # test_parser.TestMessageDefectDetectionBase
def test_multipart_valid_cte_no_defect(self):
for cte in ('7bit', '8bit', 'BINary'):
- msg = email.message_from_string(
+ msg = self._str_msg(
self.multipart_msg.format(
- "\nContent-Transfer-Encoding: {}".format(cte)),
- policy = self.policy)
- self.assertEqual(len(self.get_defects(msg)), 0)
+ "\nContent-Transfer-Encoding: {}".format(cte)))
+ self.assertEqual(len(msg.defects), 0)
def test_invalid_content_type(self):
eq = self.assertEqual
@@ -1932,16 +1930,18 @@ Subject: here's something interesting
counter to RFC 2822, there's no separating newline here
""")
+ # test_parser.TestMessageDefectDetectionBase
def test_lying_multipart(self):
unless = self.assertTrue
msg = self._msgobj('msg_41.txt')
unless(hasattr(msg, 'defects'))
- self.assertEqual(len(self.get_defects(msg)), 2)
- unless(isinstance(self.get_defects(msg)[0],
+ self.assertEqual(len(msg.defects), 2)
+ unless(isinstance(msg.defects[0],
errors.NoBoundaryInMultipartDefect))
- unless(isinstance(self.get_defects(msg)[1],
+ unless(isinstance(msg.defects[1],
errors.MultipartInvariantViolationDefect))
+ # test_parser.TestMessageDefectDetectionBase
def test_missing_start_boundary(self):
outer = self._msgobj('msg_42.txt')
# The message structure is:
@@ -1953,71 +1953,21 @@ counter to RFC 2822, there's no separating newline here
#
# [*] This message is missing its start boundary
bad = outer.get_payload(1).get_payload(0)
- self.assertEqual(len(self.get_defects(bad)), 1)
- self.assertTrue(isinstance(self.get_defects(bad)[0],
+ self.assertEqual(len(bad.defects), 1)
+ self.assertTrue(isinstance(bad.defects[0],
errors.StartBoundaryNotFoundDefect))
+ # test_parser.TestMessageDefectDetectionBase
def test_first_line_is_continuation_header(self):
eq = self.assertEqual
m = ' Line 1\nLine 2\nLine 3'
- msg = email.message_from_string(m, policy=self.policy)
+ msg = email.message_from_string(m)
eq(msg.keys(), [])
eq(msg.get_payload(), 'Line 2\nLine 3')
- eq(len(self.get_defects(msg)), 1)
- self.assertTrue(isinstance(self.get_defects(msg)[0],
+ eq(len(msg.defects), 1)
+ self.assertTrue(isinstance(msg.defects[0],
errors.FirstHeaderLineIsContinuationDefect))
- eq(self.get_defects(msg)[0].line, ' Line 1\n')
-
-
-class TestNonConformant(TestNonConformantBase, TestEmailBase):
-
- policy=email.policy.default
-
- def get_defects(self, obj):
- return obj.defects
-
-
-class TestNonConformantCapture(TestNonConformantBase, TestEmailBase):
-
- class CapturePolicy(email.policy.Policy):
- captured = None
- def register_defect(self, obj, defect):
- self.captured.append(defect)
-
- def setUp(self):
- self.policy = self.CapturePolicy(captured=list())
-
- def get_defects(self, obj):
- return self.policy.captured
-
-
-class TestRaisingDefects(TestEmailBase):
-
- def _msgobj(self, filename):
- with openfile(filename) as fp:
- return email.message_from_file(fp, policy=email.policy.strict)
-
- def test_same_boundary_inner_outer(self):
- with self.assertRaises(errors.StartBoundaryNotFoundDefect):
- self._msgobj('msg_15.txt')
-
- def test_multipart_no_boundary(self):
- with self.assertRaises(errors.NoBoundaryInMultipartDefect):
- self._msgobj('msg_25.txt')
-
- def test_lying_multipart(self):
- with self.assertRaises(errors.NoBoundaryInMultipartDefect):
- self._msgobj('msg_41.txt')
-
-
- def test_missing_start_boundary(self):
- with self.assertRaises(errors.StartBoundaryNotFoundDefect):
- self._msgobj('msg_42.txt')
-
- def test_first_line_is_continuation_header(self):
- m = ' Line 1\nLine 2\nLine 3'
- with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect):
- msg = email.message_from_string(m, policy=email.policy.strict)
+ eq(msg.defects[0].line, ' Line 1\n')
# Test RFC 2047 header encoding and decoding
@@ -2610,6 +2560,13 @@ class TestMiscellaneous(TestEmailBase):
for subpart in msg.walk():
unless(isinstance(subpart, MyMessage))
+ def test_custom_message_does_not_require_arguments(self):
+ class MyMessage(Message):
+ def __init__(self):
+ super().__init__()
+ msg = self._str_msg("Subject: test\n\ntest", MyMessage)
+ self.assertTrue(isinstance(msg, MyMessage))
+
def test__all__(self):
module = __import__('email')
self.assertEqual(sorted(module.__all__), [
@@ -3137,25 +3094,6 @@ Here's the message body
g.flatten(msg, linesep='\r\n')
self.assertEqual(s.getvalue(), text)
- def test_crlf_control_via_policy(self):
- with openfile('msg_26.txt', newline='\n') as fp:
- text = fp.read()
- msg = email.message_from_string(text)
- s = StringIO()
- g = email.generator.Generator(s, policy=email.policy.SMTP)
- g.flatten(msg)
- self.assertEqual(s.getvalue(), text)
-
- def test_flatten_linesep_overrides_policy(self):
- # msg_27 is lf separated
- with openfile('msg_27.txt', newline='\n') as fp:
- text = fp.read()
- msg = email.message_from_string(text)
- s = StringIO()
- g = email.generator.Generator(s, policy=email.policy.SMTP)
- g.flatten(msg, linesep='\n')
- self.assertEqual(s.getvalue(), text)
-
maxDiff = None
def test_multipart_digest_with_extra_mime_headers(self):
@@ -3646,44 +3584,6 @@ class Test8BitBytesHandling(unittest.TestCase):
s.getvalue(),
'Subject: =?utf-8?b?xb5sdcWlb3XEjWvDvSBrxa/FiA==?=\r\n\r\n')
- def test_crlf_control_via_policy(self):
- # msg_26 is crlf terminated
- with openfile('msg_26.txt', 'rb') as fp:
- text = fp.read()
- msg = email.message_from_bytes(text)
- s = BytesIO()
- g = email.generator.BytesGenerator(s, policy=email.policy.SMTP)
- g.flatten(msg)
- self.assertEqual(s.getvalue(), text)
-
- def test_flatten_linesep_overrides_policy(self):
- # msg_27 is lf separated
- with openfile('msg_27.txt', 'rb') as fp:
- text = fp.read()
- msg = email.message_from_bytes(text)
- s = BytesIO()
- g = email.generator.BytesGenerator(s, policy=email.policy.SMTP)
- g.flatten(msg, linesep='\n')
- self.assertEqual(s.getvalue(), text)
-
- def test_must_be_7bit_handles_unknown_8bit(self):
- msg = email.message_from_bytes(self.non_latin_bin_msg)
- out = BytesIO()
- g = email.generator.BytesGenerator(out,
- policy=email.policy.default.clone(must_be_7bit=True))
- g.flatten(msg)
- self.assertEqual(out.getvalue(),
- self.non_latin_bin_msg_as7bit_wrapped.encode('ascii'))
-
- def test_must_be_7bit_transforms_8bit_cte(self):
- msg = email.message_from_bytes(self.latin_bin_msg)
- out = BytesIO()
- g = email.generator.BytesGenerator(out,
- policy=email.policy.default.clone(must_be_7bit=True))
- g.flatten(msg)
- self.assertEqual(out.getvalue(),
- self.latin_bin_msg_as7bit.encode('ascii'))
-
maxDiff = None
diff --git a/Lib/test/test_email/test_generator.py b/Lib/test/test_email/test_generator.py
index 35ca6c5..8f5fde7 100644
--- a/Lib/test/test_email/test_generator.py
+++ b/Lib/test/test_email/test_generator.py
@@ -11,6 +11,8 @@ from test.test_email import TestEmailBase
class TestGeneratorBase():
+ policy = policy.compat32
+
long_subject = {
0: textwrap.dedent("""\
To: whom_it_may_concern@example.com
@@ -58,11 +60,11 @@ class TestGeneratorBase():
long_subject[100] = long_subject[0]
def maxheaderlen_parameter_test(self, n):
- msg = self.msgmaker(self.long_subject[0])
+ msg = self.msgmaker(self.typ(self.long_subject[0]))
s = self.ioclass()
g = self.genclass(s, maxheaderlen=n)
g.flatten(msg)
- self.assertEqual(s.getvalue(), self.long_subject[n])
+ self.assertEqual(s.getvalue(), self.typ(self.long_subject[n]))
def test_maxheaderlen_parameter_0(self):
self.maxheaderlen_parameter_test(0)
@@ -77,11 +79,11 @@ class TestGeneratorBase():
self.maxheaderlen_parameter_test(20)
def maxheaderlen_policy_test(self, n):
- msg = self.msgmaker(self.long_subject[0])
+ msg = self.msgmaker(self.typ(self.long_subject[0]))
s = self.ioclass()
g = self.genclass(s, policy=policy.default.clone(max_line_length=n))
g.flatten(msg)
- self.assertEqual(s.getvalue(), self.long_subject[n])
+ self.assertEqual(s.getvalue(), self.typ(self.long_subject[n]))
def test_maxheaderlen_policy_0(self):
self.maxheaderlen_policy_test(0)
@@ -96,12 +98,12 @@ class TestGeneratorBase():
self.maxheaderlen_policy_test(20)
def maxheaderlen_parm_overrides_policy_test(self, n):
- msg = self.msgmaker(self.long_subject[0])
+ msg = self.msgmaker(self.typ(self.long_subject[0]))
s = self.ioclass()
g = self.genclass(s, maxheaderlen=n,
policy=policy.default.clone(max_line_length=10))
g.flatten(msg)
- self.assertEqual(s.getvalue(), self.long_subject[n])
+ self.assertEqual(s.getvalue(), self.typ(self.long_subject[n]))
def test_maxheaderlen_parm_overrides_policy_0(self):
self.maxheaderlen_parm_overrides_policy_test(0)
@@ -115,21 +117,84 @@ class TestGeneratorBase():
def test_maxheaderlen_parm_overrides_policy_20(self):
self.maxheaderlen_parm_overrides_policy_test(20)
+ def test_crlf_control_via_policy(self):
+ source = "Subject: test\r\n\r\ntest body\r\n"
+ expected = source
+ msg = self.msgmaker(self.typ(source))
+ s = self.ioclass()
+ g = self.genclass(s, policy=policy.SMTP)
+ g.flatten(msg)
+ self.assertEqual(s.getvalue(), self.typ(expected))
+
+ def test_flatten_linesep_overrides_policy(self):
+ source = "Subject: test\n\ntest body\n"
+ expected = source
+ msg = self.msgmaker(self.typ(source))
+ s = self.ioclass()
+ g = self.genclass(s, policy=policy.SMTP)
+ g.flatten(msg, linesep='\n')
+ self.assertEqual(s.getvalue(), self.typ(expected))
+
class TestGenerator(TestGeneratorBase, TestEmailBase):
- msgmaker = staticmethod(message_from_string)
genclass = Generator
ioclass = io.StringIO
+ typ = str
+
+ def msgmaker(self, msg, policy=None):
+ policy = self.policy if policy is None else policy
+ return message_from_string(msg, policy=policy)
class TestBytesGenerator(TestGeneratorBase, TestEmailBase):
- msgmaker = staticmethod(message_from_bytes)
genclass = BytesGenerator
ioclass = io.BytesIO
- long_subject = {key: x.encode('ascii')
- for key, x in TestGeneratorBase.long_subject.items()}
+ typ = lambda self, x: x.encode('ascii')
+
+ def msgmaker(self, msg, policy=None):
+ policy = self.policy if policy is None else policy
+ return message_from_bytes(msg, policy=policy)
+
+ def test_cte_type_7bit_handles_unknown_8bit(self):
+ source = ("Subject: Maintenant je vous présente mon "
+ "collègue\n\n").encode('utf-8')
+ expected = ('Subject: =?unknown-8bit?q?Maintenant_je_vous_pr=C3=A9sente_mon_'
+ 'coll=C3=A8gue?=\n\n').encode('ascii')
+ msg = message_from_bytes(source)
+ s = io.BytesIO()
+ g = BytesGenerator(s, policy=self.policy.clone(cte_type='7bit'))
+ g.flatten(msg)
+ self.assertEqual(s.getvalue(), expected)
+
+ def test_cte_type_7bit_transforms_8bit_cte(self):
+ source = textwrap.dedent("""\
+ From: foo@bar.com
+ To: Dinsdale
+ Subject: Nudge nudge, wink, wink
+ Mime-Version: 1.0
+ Content-Type: text/plain; charset="latin-1"
+ Content-Transfer-Encoding: 8bit
+
+ oh là là, know what I mean, know what I mean?
+ """).encode('latin1')
+ msg = message_from_bytes(source)
+ expected = textwrap.dedent("""\
+ From: foo@bar.com
+ To: Dinsdale
+ Subject: Nudge nudge, wink, wink
+ Mime-Version: 1.0
+ Content-Type: text/plain; charset="iso-8859-1"
+ Content-Transfer-Encoding: quoted-printable
+
+ oh l=E0 l=E0, know what I mean, know what I mean?
+ """).encode('ascii')
+ s = io.BytesIO()
+ g = BytesGenerator(s, policy=self.policy.clone(cte_type='7bit',
+ linesep='\n'))
+ g.flatten(msg)
+ self.assertEqual(s.getvalue(), expected)
if __name__ == '__main__':
diff --git a/Lib/test/test_email/test_parser.py b/Lib/test/test_email/test_parser.py
new file mode 100644
index 0000000..864e4c1
--- /dev/null
+++ b/Lib/test/test_email/test_parser.py
@@ -0,0 +1,276 @@
+import io
+import email
+import textwrap
+import unittest
+from email._policybase import Compat32
+from email import errors
+from email.message import Message
+from test.test_email import TestEmailBase
+
+
+class TestCustomMessage(TestEmailBase):
+
+ class MyMessage(Message):
+ def __init__(self, policy):
+ self.check_policy = policy
+ super().__init__()
+
+ MyPolicy = TestEmailBase.policy.clone(linesep='boo')
+
+ def test_custom_message_gets_policy_if_possible_from_string(self):
+ msg = email.message_from_string("Subject: bogus\n\nmsg\n",
+ self.MyMessage,
+ policy=self.MyPolicy)
+ self.assertTrue(isinstance(msg, self.MyMessage))
+ self.assertIs(msg.check_policy, self.MyPolicy)
+
+ def test_custom_message_gets_policy_if_possible_from_file(self):
+ source_file = io.StringIO("Subject: bogus\n\nmsg\n")
+ msg = email.message_from_file(source_file,
+ self.MyMessage,
+ policy=self.MyPolicy)
+ self.assertTrue(isinstance(msg, self.MyMessage))
+ self.assertIs(msg.check_policy, self.MyPolicy)
+
+ # XXX add tests for other functions that take Message arg.
+
+
+class TestMessageDefectDetectionBase:
+
+ dup_boundary_msg = textwrap.dedent("""\
+ Subject: XX
+ From: xx@xx.dk
+ To: XX
+ Mime-version: 1.0
+ Content-type: multipart/mixed;
+ boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: multipart/alternative;
+ boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: text/plain; charset="ISO-8859-1"
+ Content-transfer-encoding: quoted-printable
+
+ text
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: text/html; charset="ISO-8859-1"
+ Content-transfer-encoding: quoted-printable
+
+ <HTML></HTML>
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part--
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: image/gif; name="xx.gif";
+ Content-disposition: attachment
+ Content-transfer-encoding: base64
+
+ Some removed base64 encoded chars.
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part--
+
+ """)
+
+ def test_same_boundary_inner_outer(self):
+ # XXX better would be to actually detect the duplicate.
+ msg = self._str_msg(self.dup_boundary_msg)
+ inner = msg.get_payload(0)
+ self.assertTrue(hasattr(inner, 'defects'))
+ self.assertEqual(len(self.get_defects(inner)), 1)
+ self.assertTrue(isinstance(self.get_defects(inner)[0],
+ errors.StartBoundaryNotFoundDefect))
+
+ def test_same_boundary_inner_outer_raises_on_defect(self):
+ with self.assertRaises(errors.StartBoundaryNotFoundDefect):
+ self._str_msg(self.dup_boundary_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ no_boundary_msg = textwrap.dedent("""\
+ Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
+ From: foobar
+ Subject: broken mail
+ MIME-Version: 1.0
+ Content-Type: multipart/report; report-type=delivery-status;
+
+ --JAB03225.986577786/zinfandel.lacita.com
+
+ One part
+
+ --JAB03225.986577786/zinfandel.lacita.com
+ Content-Type: message/delivery-status
+
+ Header: Another part
+
+ --JAB03225.986577786/zinfandel.lacita.com--
+ """)
+
+ def test_multipart_no_boundary(self):
+ msg = self._str_msg(self.no_boundary_msg)
+ self.assertTrue(isinstance(msg.get_payload(), str))
+ self.assertEqual(len(self.get_defects(msg)), 2)
+ self.assertTrue(isinstance(self.get_defects(msg)[0],
+ errors.NoBoundaryInMultipartDefect))
+ self.assertTrue(isinstance(self.get_defects(msg)[1],
+ errors.MultipartInvariantViolationDefect))
+
+ def test_multipart_no_boundary_raise_on_defect(self):
+ with self.assertRaises(errors.NoBoundaryInMultipartDefect):
+ self._str_msg(self.no_boundary_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ multipart_msg = textwrap.dedent("""\
+ Date: Wed, 14 Nov 2007 12:56:23 GMT
+ From: foo@bar.invalid
+ To: foo@bar.invalid
+ Subject: Content-Transfer-Encoding: base64 and multipart
+ MIME-Version: 1.0
+ Content-Type: multipart/mixed;
+ boundary="===============3344438784458119861=="{}
+
+ --===============3344438784458119861==
+ Content-Type: text/plain
+
+ Test message
+
+ --===============3344438784458119861==
+ Content-Type: application/octet-stream
+ Content-Transfer-Encoding: base64
+
+ YWJj
+
+ --===============3344438784458119861==--
+ """)
+
+ def test_multipart_invalid_cte(self):
+ msg = self._str_msg(
+ self.multipart_msg.format("\nContent-Transfer-Encoding: base64"))
+ self.assertEqual(len(self.get_defects(msg)), 1)
+ self.assertIsInstance(self.get_defects(msg)[0],
+ errors.InvalidMultipartContentTransferEncodingDefect)
+
+ def test_multipart_invalid_cte_raise_on_defect(self):
+ with self.assertRaises(
+ errors.InvalidMultipartContentTransferEncodingDefect):
+ self._str_msg(
+ self.multipart_msg.format(
+ "\nContent-Transfer-Encoding: base64"),
+ policy=self.policy.clone(raise_on_defect=True))
+
+ def test_multipart_no_cte_no_defect(self):
+ msg = self._str_msg(self.multipart_msg.format(''))
+ self.assertEqual(len(self.get_defects(msg)), 0)
+
+ def test_multipart_valid_cte_no_defect(self):
+ for cte in ('7bit', '8bit', 'BINary'):
+ msg = self._str_msg(
+ self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
+ self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)
+
+ lying_multipart_msg = textwrap.dedent("""\
+ From: "Allison Dunlap" <xxx@example.com>
+ To: yyy@example.com
+ Subject: 64423
+ Date: Sun, 11 Jul 2004 16:09:27 -0300
+ MIME-Version: 1.0
+ Content-Type: multipart/alternative;
+
+ Blah blah blah
+ """)
+
+ def test_lying_multipart(self):
+ msg = self._str_msg(self.lying_multipart_msg)
+ self.assertTrue(hasattr(msg, 'defects'))
+ self.assertEqual(len(self.get_defects(msg)), 2)
+ self.assertTrue(isinstance(self.get_defects(msg)[0],
+ errors.NoBoundaryInMultipartDefect))
+ self.assertTrue(isinstance(self.get_defects(msg)[1],
+ errors.MultipartInvariantViolationDefect))
+
+ def test_lying_multipart_raise_on_defect(self):
+ with self.assertRaises(errors.NoBoundaryInMultipartDefect):
+ self._str_msg(self.lying_multipart_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ missing_start_boundary_msg = textwrap.dedent("""\
+ Content-Type: multipart/mixed; boundary="AAA"
+ From: Mail Delivery Subsystem <xxx@example.com>
+ To: yyy@example.com
+
+ --AAA
+
+ Stuff
+
+ --AAA
+ Content-Type: message/rfc822
+
+ From: webmaster@python.org
+ To: zzz@example.com
+ Content-Type: multipart/mixed; boundary="BBB"
+
+ --BBB--
+
+ --AAA--
+
+ """)
+
+ def test_missing_start_boundary(self):
+ # The message structure is:
+ #
+ # multipart/mixed
+ # text/plain
+ # message/rfc822
+ # multipart/mixed [*]
+ #
+ # [*] This message is missing its start boundary
+ outer = self._str_msg(self.missing_start_boundary_msg)
+ bad = outer.get_payload(1).get_payload(0)
+ self.assertEqual(len(self.get_defects(bad)), 1)
+ self.assertTrue(isinstance(self.get_defects(bad)[0],
+ errors.StartBoundaryNotFoundDefect))
+
+ def test_missing_start_boundary_raise_on_defect(self):
+ with self.assertRaises(errors.StartBoundaryNotFoundDefect):
+ self._str_msg(self.missing_start_boundary_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ def test_first_line_is_continuation_header(self):
+ msg = self._str_msg(' Line 1\nLine 2\nLine 3')
+ self.assertEqual(msg.keys(), [])
+ self.assertEqual(msg.get_payload(), 'Line 2\nLine 3')
+ self.assertEqual(len(self.get_defects(msg)), 1)
+ self.assertTrue(isinstance(self.get_defects(msg)[0],
+ errors.FirstHeaderLineIsContinuationDefect))
+ self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')
+
+ def test_first_line_is_continuation_header_raise_on_defect(self):
+ with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect):
+ self._str_msg(' Line 1\nLine 2\nLine 3',
+ policy=self.policy.clone(raise_on_defect=True))
+
+
+class TestMessageDefectDetection(TestMessageDefectDetectionBase, TestEmailBase):
+
+ def get_defects(self, obj):
+ return obj.defects
+
+
+class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase,
+ TestEmailBase):
+
+ class CapturePolicy(Compat32):
+ captured = None
+ def register_defect(self, obj, defect):
+ self.captured.append(defect)
+
+ def setUp(self):
+ self.policy = self.CapturePolicy(captured=list())
+
+ def get_defects(self, obj):
+ return self.policy.captured
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/Lib/test/test_email/test_policy.py b/Lib/test/test_email/test_policy.py
index 1c65901..07925a7 100644
--- a/Lib/test/test_email/test_policy.py
+++ b/Lib/test/test_email/test_policy.py
@@ -1,6 +1,10 @@
+import io
import types
+import textwrap
import unittest
import email.policy
+import email.parser
+import email.generator
class PolicyAPITests(unittest.TestCase):
@@ -11,14 +15,15 @@ class PolicyAPITests(unittest.TestCase):
policy_defaults = {
'max_line_length': 78,
'linesep': '\n',
- 'must_be_7bit': False,
+ 'cte_type': '8bit',
'raise_on_defect': False,
}
# For each policy under test, we give here the values of the attributes
# that are different from the defaults for that policy.
policies = {
- email.policy.Policy(): {},
+ email.policy.Compat32(): {},
+ email.policy.compat32: {},
email.policy.default: {},
email.policy.SMTP: {'linesep': '\r\n'},
email.policy.HTTP: {'linesep': '\r\n', 'max_line_length': None},
@@ -44,6 +49,18 @@ class PolicyAPITests(unittest.TestCase):
self.assertIn(attr, self.policy_defaults,
"{} is not fully tested".format(attr))
+ def test_abc(self):
+ with self.assertRaises(TypeError) as cm:
+ email.policy.Policy()
+ msg = str(cm.exception)
+ abstract_methods = ('fold',
+ 'fold_binary',
+ 'header_fetch_parse',
+ 'header_source_parse',
+ 'header_store_parse')
+ for method in abstract_methods:
+ self.assertIn(method, msg)
+
def test_policy_is_immutable(self):
for policy in self.policies:
for attr in self.policy_defaults:
@@ -88,7 +105,7 @@ class PolicyAPITests(unittest.TestCase):
self.defects = []
obj = Dummy()
defect = object()
- policy = email.policy.Policy()
+ policy = email.policy.Compat32()
policy.register_defect(obj, defect)
self.assertEqual(obj.defects, [defect])
defect2 = object()
@@ -117,7 +134,7 @@ class PolicyAPITests(unittest.TestCase):
email.policy.default.handle_defect(foo, defect2)
self.assertEqual(foo.defects, [defect1, defect2])
- class MyPolicy(email.policy.Policy):
+ class MyPolicy(email.policy.Compat32):
defects = None
def __init__(self, *args, **kw):
super().__init__(*args, defects=[], **kw)
@@ -146,5 +163,94 @@ class PolicyAPITests(unittest.TestCase):
# For adding subclassed objects, make sure the usual rules apply (subclass
# wins), but that the order still works (right overrides left).
+
+class TestPolicyPropagation(unittest.TestCase):
+
+ # The abstract methods are used by the parser but not by the wrapper
+ # functions that call it, so if the exception gets raised we know that the
+ # policy was actually propagated all the way to feedparser.
+ class MyPolicy(email.policy.Policy):
+ def badmethod(self, *args, **kw):
+ raise Exception("test")
+ fold = fold_binary = header_fetch_parser = badmethod
+ header_source_parse = header_store_parse = badmethod
+
+ def test_message_from_string(self):
+ with self.assertRaisesRegex(Exception, "^test$"):
+ email.message_from_string("Subject: test\n\n",
+ policy=self.MyPolicy)
+
+ def test_message_from_bytes(self):
+ with self.assertRaisesRegex(Exception, "^test$"):
+ email.message_from_bytes(b"Subject: test\n\n",
+ policy=self.MyPolicy)
+
+ def test_message_from_file(self):
+ f = io.StringIO('Subject: test\n\n')
+ with self.assertRaisesRegex(Exception, "^test$"):
+ email.message_from_file(f, policy=self.MyPolicy)
+
+ def test_message_from_binary_file(self):
+ f = io.BytesIO(b'Subject: test\n\n')
+ with self.assertRaisesRegex(Exception, "^test$"):
+ email.message_from_binary_file(f, policy=self.MyPolicy)
+
+ # These are redundant, but we need them for black-box completeness.
+
+ def test_parser(self):
+ p = email.parser.Parser(policy=self.MyPolicy)
+ with self.assertRaisesRegex(Exception, "^test$"):
+ p.parsestr('Subject: test\n\n')
+
+ def test_bytes_parser(self):
+ p = email.parser.BytesParser(policy=self.MyPolicy)
+ with self.assertRaisesRegex(Exception, "^test$"):
+ p.parsebytes(b'Subject: test\n\n')
+
+ # Now that we've established that all the parse methods get the
+ # policy in to feedparser, we can use message_from_string for
+ # the rest of the propagation tests.
+
+ def _make_msg(self, source='Subject: test\n\n', policy=None):
+ self.policy = email.policy.default.clone() if policy is None else policy
+ return email.message_from_string(source, policy=self.policy)
+
+ def test_parser_propagates_policy_to_message(self):
+ msg = self._make_msg()
+ self.assertIs(msg.policy, self.policy)
+
+ def test_parser_propagates_policy_to_sub_messages(self):
+ msg = self._make_msg(textwrap.dedent("""\
+ Subject: mime test
+ MIME-Version: 1.0
+ Content-Type: multipart/mixed, boundary="XXX"
+
+ --XXX
+ Content-Type: text/plain
+
+ test
+ --XXX
+ Content-Type: text/plain
+
+ test2
+ --XXX--
+ """))
+ for part in msg.walk():
+ self.assertIs(part.policy, self.policy)
+
+ def test_message_policy_propagates_to_generator(self):
+ msg = self._make_msg("Subject: test\nTo: foo\n\n",
+ policy=email.policy.default.clone(linesep='X'))
+ s = io.StringIO()
+ g = email.generator.Generator(s)
+ g.flatten(msg)
+ self.assertEqual(s.getvalue(), "Subject: testXTo: fooXX")
+
+ def test_message_policy_used_by_as_string(self):
+ msg = self._make_msg("Subject: test\nTo: foo\n\n",
+ policy=email.policy.default.clone(linesep='X'))
+ self.assertEqual(msg.as_string(), "Subject: testXTo: fooXX")
+
+
if __name__ == '__main__':
unittest.main()