summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_email/test_email.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_email/test_email.py')
-rw-r--r--Lib/test/test_email/test_email.py144
1 files changed, 129 insertions, 15 deletions
diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py
index 46206c3..1657afc 100644
--- a/Lib/test/test_email/test_email.py
+++ b/Lib/test/test_email/test_email.py
@@ -1776,7 +1776,12 @@ YXNkZg==
# Test some badly formatted messages
-class TestNonConformant(TestEmailBase):
+class TestNonConformantBase:
+
+ def _msgobj(self, filename):
+ with openfile(filename) as fp:
+ return email.message_from_file(fp, policy=self.policy)
+
def test_parse_missing_minor_type(self):
eq = self.assertEqual
msg = self._msgobj('msg_14.txt')
@@ -1790,17 +1795,18 @@ class TestNonConformant(TestEmailBase):
# XXX We can probably eventually do better
inner = msg.get_payload(0)
unless(hasattr(inner, 'defects'))
- self.assertEqual(len(inner.defects), 1)
- unless(isinstance(inner.defects[0],
+ self.assertEqual(len(self.get_defects(inner)), 1)
+ unless(isinstance(self.get_defects(inner)[0],
errors.StartBoundaryNotFoundDefect))
def test_multipart_no_boundary(self):
unless = self.assertTrue
msg = self._msgobj('msg_25.txt')
unless(isinstance(msg.get_payload(), str))
- self.assertEqual(len(msg.defects), 2)
- unless(isinstance(msg.defects[0], errors.NoBoundaryInMultipartDefect))
- unless(isinstance(msg.defects[1],
+ self.assertEqual(len(self.get_defects(msg)), 2)
+ unless(isinstance(self.get_defects(msg)[0],
+ errors.NoBoundaryInMultipartDefect))
+ unless(isinstance(self.get_defects(msg)[1],
errors.MultipartInvariantViolationDefect))
def test_invalid_content_type(self):
@@ -1856,9 +1862,10 @@ counter to RFC 2822, there's no separating newline here
unless = self.assertTrue
msg = self._msgobj('msg_41.txt')
unless(hasattr(msg, 'defects'))
- self.assertEqual(len(msg.defects), 2)
- unless(isinstance(msg.defects[0], errors.NoBoundaryInMultipartDefect))
- unless(isinstance(msg.defects[1],
+ self.assertEqual(len(self.get_defects(msg)), 2)
+ unless(isinstance(self.get_defects(msg)[0],
+ errors.NoBoundaryInMultipartDefect))
+ unless(isinstance(self.get_defects(msg)[1],
errors.MultipartInvariantViolationDefect))
def test_missing_start_boundary(self):
@@ -1872,21 +1879,71 @@ counter to RFC 2822, there's no separating newline here
#
# [*] This message is missing its start boundary
bad = outer.get_payload(1).get_payload(0)
- self.assertEqual(len(bad.defects), 1)
- self.assertTrue(isinstance(bad.defects[0],
+ self.assertEqual(len(self.get_defects(bad)), 1)
+ self.assertTrue(isinstance(self.get_defects(bad)[0],
errors.StartBoundaryNotFoundDefect))
def test_first_line_is_continuation_header(self):
eq = self.assertEqual
m = ' Line 1\nLine 2\nLine 3'
- msg = email.message_from_string(m)
+ msg = email.message_from_string(m, policy=self.policy)
eq(msg.keys(), [])
eq(msg.get_payload(), 'Line 2\nLine 3')
- eq(len(msg.defects), 1)
- self.assertTrue(isinstance(msg.defects[0],
+ eq(len(self.get_defects(msg)), 1)
+ self.assertTrue(isinstance(self.get_defects(msg)[0],
errors.FirstHeaderLineIsContinuationDefect))
- eq(msg.defects[0].line, ' Line 1\n')
+ eq(self.get_defects(msg)[0].line, ' Line 1\n')
+
+
+class TestNonConformant(TestNonConformantBase, TestEmailBase):
+
+ policy=email.policy.default
+
+ def get_defects(self, obj):
+ return obj.defects
+
+
+class TestNonConformantCapture(TestNonConformantBase, TestEmailBase):
+
+ class CapturePolicy(email.policy.Policy):
+ captured = None
+ def register_defect(self, obj, defect):
+ self.captured.append(defect)
+
+ def setUp(self):
+ self.policy = self.CapturePolicy(captured=list())
+
+ def get_defects(self, obj):
+ return self.policy.captured
+
+class TestRaisingDefects(TestEmailBase):
+
+ def _msgobj(self, filename):
+ with openfile(filename) as fp:
+ return email.message_from_file(fp, policy=email.policy.strict)
+
+ def test_same_boundary_inner_outer(self):
+ with self.assertRaises(errors.StartBoundaryNotFoundDefect):
+ self._msgobj('msg_15.txt')
+
+ def test_multipart_no_boundary(self):
+ with self.assertRaises(errors.NoBoundaryInMultipartDefect):
+ self._msgobj('msg_25.txt')
+
+ def test_lying_multipart(self):
+ with self.assertRaises(errors.NoBoundaryInMultipartDefect):
+ self._msgobj('msg_41.txt')
+
+
+ def test_missing_start_boundary(self):
+ with self.assertRaises(errors.StartBoundaryNotFoundDefect):
+ self._msgobj('msg_42.txt')
+
+ def test_first_line_is_continuation_header(self):
+ m = ' Line 1\nLine 2\nLine 3'
+ with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect):
+ msg = email.message_from_string(m, policy=email.policy.strict)
# Test RFC 2047 header encoding and decoding
@@ -2997,6 +3054,25 @@ Here's the message body
g.flatten(msg, linesep='\r\n')
self.assertEqual(s.getvalue(), text)
+ def test_crlf_control_via_policy(self):
+ with openfile('msg_26.txt', newline='\n') as fp:
+ text = fp.read()
+ msg = email.message_from_string(text)
+ s = StringIO()
+ g = email.generator.Generator(s, policy=email.policy.SMTP)
+ g.flatten(msg)
+ self.assertEqual(s.getvalue(), text)
+
+ def test_flatten_linesep_overrides_policy(self):
+ # msg_27 is lf separated
+ with openfile('msg_27.txt', newline='\n') as fp:
+ text = fp.read()
+ msg = email.message_from_string(text)
+ s = StringIO()
+ g = email.generator.Generator(s, policy=email.policy.SMTP)
+ g.flatten(msg, linesep='\n')
+ self.assertEqual(s.getvalue(), text)
+
maxDiff = None
def test_multipart_digest_with_extra_mime_headers(self):
@@ -3463,6 +3539,44 @@ class Test8BitBytesHandling(unittest.TestCase):
g.flatten(msg)
self.assertEqual(s.getvalue(), source)
+ def test_crlf_control_via_policy(self):
+ # msg_26 is crlf terminated
+ with openfile('msg_26.txt', 'rb') as fp:
+ text = fp.read()
+ msg = email.message_from_bytes(text)
+ s = BytesIO()
+ g = email.generator.BytesGenerator(s, policy=email.policy.SMTP)
+ g.flatten(msg)
+ self.assertEqual(s.getvalue(), text)
+
+ def test_flatten_linesep_overrides_policy(self):
+ # msg_27 is lf separated
+ with openfile('msg_27.txt', 'rb') as fp:
+ text = fp.read()
+ msg = email.message_from_bytes(text)
+ s = BytesIO()
+ g = email.generator.BytesGenerator(s, policy=email.policy.SMTP)
+ g.flatten(msg, linesep='\n')
+ self.assertEqual(s.getvalue(), text)
+
+ def test_must_be_7bit_handles_unknown_8bit(self):
+ msg = email.message_from_bytes(self.non_latin_bin_msg)
+ out = BytesIO()
+ g = email.generator.BytesGenerator(out,
+ policy=email.policy.default.clone(must_be_7bit=True))
+ g.flatten(msg)
+ self.assertEqual(out.getvalue(),
+ self.non_latin_bin_msg_as7bit_wrapped.encode('ascii'))
+
+ def test_must_be_7bit_transforms_8bit_cte(self):
+ msg = email.message_from_bytes(self.latin_bin_msg)
+ out = BytesIO()
+ g = email.generator.BytesGenerator(out,
+ policy=email.policy.default.clone(must_be_7bit=True))
+ g.flatten(msg)
+ self.assertEqual(out.getvalue(),
+ self.latin_bin_msg_as7bit.encode('ascii'))
+
maxDiff = None