diff options
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/test_email/__init__.py | 14 | ||||
-rw-r--r-- | Lib/test/test_email/test_email.py | 180 | ||||
-rw-r--r-- | Lib/test/test_email/test_generator.py | 85 | ||||
-rw-r--r-- | Lib/test/test_email/test_parser.py | 276 | ||||
-rw-r--r-- | Lib/test/test_email/test_policy.py | 114 |
5 files changed, 514 insertions, 155 deletions
diff --git a/Lib/test/test_email/__init__.py b/Lib/test/test_email/__init__.py index d72b50e..b05fb3c 100644 --- a/Lib/test/test_email/__init__.py +++ b/Lib/test/test_email/__init__.py @@ -3,6 +3,8 @@ import sys import unittest import test.support import email +from email.message import Message +from email._policybase import compat32 from test.test_email import __file__ as landmark # Run all tests in package for '-m unittest test.test_email' @@ -36,16 +38,26 @@ def openfile(filename, *args, **kws): class TestEmailBase(unittest.TestCase): maxDiff = None + # Currently the default policy is compat32. By setting that as the default + # here we make minimal changes in the test_email tests compared to their + # pre-3.3 state. + policy = compat32 def __init__(self, *args, **kw): super().__init__(*args, **kw) self.addTypeEqualityFunc(bytes, self.assertBytesEqual) + # Backward compatibility to minimize test_email test changes. ndiffAssertEqual = unittest.TestCase.assertEqual def _msgobj(self, filename): with openfile(filename) as fp: - return email.message_from_file(fp) + return email.message_from_file(fp, policy=self.policy) + + def _str_msg(self, string, message=Message, policy=None): + if policy is None: + policy = self.policy + return email.message_from_string(string, message, policy=policy) def _bytes_repr(self, b): return [repr(x) for x in b.splitlines(keepends=True)] diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py index b07f675..ac6ee65 100644 --- a/Lib/test/test_email/test_email.py +++ b/Lib/test/test_email/test_email.py @@ -16,6 +16,7 @@ from io import StringIO, BytesIO from itertools import chain import email +import email.policy from email.charset import Charset from email.header import Header, decode_header, make_header @@ -1805,11 +1806,7 @@ YXNkZg== # Test some badly formatted messages -class TestNonConformantBase: - - def _msgobj(self, filename): - with openfile(filename) as fp: - return email.message_from_file(fp, policy=self.policy) +class TestNonConformant(TestEmailBase): def test_parse_missing_minor_type(self): eq = self.assertEqual @@ -1818,24 +1815,26 @@ class TestNonConformantBase: eq(msg.get_content_maintype(), 'text') eq(msg.get_content_subtype(), 'plain') + # test_parser.TestMessageDefectDetectionBase def test_same_boundary_inner_outer(self): unless = self.assertTrue msg = self._msgobj('msg_15.txt') # XXX We can probably eventually do better inner = msg.get_payload(0) unless(hasattr(inner, 'defects')) - self.assertEqual(len(self.get_defects(inner)), 1) - unless(isinstance(self.get_defects(inner)[0], + self.assertEqual(len(inner.defects), 1) + unless(isinstance(inner.defects[0], errors.StartBoundaryNotFoundDefect)) + # test_parser.TestMessageDefectDetectionBase def test_multipart_no_boundary(self): unless = self.assertTrue msg = self._msgobj('msg_25.txt') unless(isinstance(msg.get_payload(), str)) - self.assertEqual(len(self.get_defects(msg)), 2) - unless(isinstance(self.get_defects(msg)[0], + self.assertEqual(len(msg.defects), 2) + unless(isinstance(msg.defects[0], errors.NoBoundaryInMultipartDefect)) - unless(isinstance(self.get_defects(msg)[1], + unless(isinstance(msg.defects[1], errors.MultipartInvariantViolationDefect)) multipart_msg = textwrap.dedent("""\ @@ -1861,27 +1860,26 @@ class TestNonConformantBase: --===============3344438784458119861==-- """) + # test_parser.TestMessageDefectDetectionBase def test_multipart_invalid_cte(self): - msg = email.message_from_string( - self.multipart_msg.format("\nContent-Transfer-Encoding: base64"), - policy = self.policy) - self.assertEqual(len(self.get_defects(msg)), 1) - self.assertIsInstance(self.get_defects(msg)[0], + msg = self._str_msg( + self.multipart_msg.format("\nContent-Transfer-Encoding: base64")) + self.assertEqual(len(msg.defects), 1) + self.assertIsInstance(msg.defects[0], errors.InvalidMultipartContentTransferEncodingDefect) + # test_parser.TestMessageDefectDetectionBase def test_multipart_no_cte_no_defect(self): - msg = email.message_from_string( - self.multipart_msg.format(''), - policy = self.policy) - self.assertEqual(len(self.get_defects(msg)), 0) + msg = self._str_msg(self.multipart_msg.format('')) + self.assertEqual(len(msg.defects), 0) + # test_parser.TestMessageDefectDetectionBase def test_multipart_valid_cte_no_defect(self): for cte in ('7bit', '8bit', 'BINary'): - msg = email.message_from_string( + msg = self._str_msg( self.multipart_msg.format( - "\nContent-Transfer-Encoding: {}".format(cte)), - policy = self.policy) - self.assertEqual(len(self.get_defects(msg)), 0) + "\nContent-Transfer-Encoding: {}".format(cte))) + self.assertEqual(len(msg.defects), 0) def test_invalid_content_type(self): eq = self.assertEqual @@ -1932,16 +1930,18 @@ Subject: here's something interesting counter to RFC 2822, there's no separating newline here """) + # test_parser.TestMessageDefectDetectionBase def test_lying_multipart(self): unless = self.assertTrue msg = self._msgobj('msg_41.txt') unless(hasattr(msg, 'defects')) - self.assertEqual(len(self.get_defects(msg)), 2) - unless(isinstance(self.get_defects(msg)[0], + self.assertEqual(len(msg.defects), 2) + unless(isinstance(msg.defects[0], errors.NoBoundaryInMultipartDefect)) - unless(isinstance(self.get_defects(msg)[1], + unless(isinstance(msg.defects[1], errors.MultipartInvariantViolationDefect)) + # test_parser.TestMessageDefectDetectionBase def test_missing_start_boundary(self): outer = self._msgobj('msg_42.txt') # The message structure is: @@ -1953,71 +1953,21 @@ counter to RFC 2822, there's no separating newline here # # [*] This message is missing its start boundary bad = outer.get_payload(1).get_payload(0) - self.assertEqual(len(self.get_defects(bad)), 1) - self.assertTrue(isinstance(self.get_defects(bad)[0], + self.assertEqual(len(bad.defects), 1) + self.assertTrue(isinstance(bad.defects[0], errors.StartBoundaryNotFoundDefect)) + # test_parser.TestMessageDefectDetectionBase def test_first_line_is_continuation_header(self): eq = self.assertEqual m = ' Line 1\nLine 2\nLine 3' - msg = email.message_from_string(m, policy=self.policy) + msg = email.message_from_string(m) eq(msg.keys(), []) eq(msg.get_payload(), 'Line 2\nLine 3') - eq(len(self.get_defects(msg)), 1) - self.assertTrue(isinstance(self.get_defects(msg)[0], + eq(len(msg.defects), 1) + self.assertTrue(isinstance(msg.defects[0], errors.FirstHeaderLineIsContinuationDefect)) - eq(self.get_defects(msg)[0].line, ' Line 1\n') - - -class TestNonConformant(TestNonConformantBase, TestEmailBase): - - policy=email.policy.default - - def get_defects(self, obj): - return obj.defects - - -class TestNonConformantCapture(TestNonConformantBase, TestEmailBase): - - class CapturePolicy(email.policy.Policy): - captured = None - def register_defect(self, obj, defect): - self.captured.append(defect) - - def setUp(self): - self.policy = self.CapturePolicy(captured=list()) - - def get_defects(self, obj): - return self.policy.captured - - -class TestRaisingDefects(TestEmailBase): - - def _msgobj(self, filename): - with openfile(filename) as fp: - return email.message_from_file(fp, policy=email.policy.strict) - - def test_same_boundary_inner_outer(self): - with self.assertRaises(errors.StartBoundaryNotFoundDefect): - self._msgobj('msg_15.txt') - - def test_multipart_no_boundary(self): - with self.assertRaises(errors.NoBoundaryInMultipartDefect): - self._msgobj('msg_25.txt') - - def test_lying_multipart(self): - with self.assertRaises(errors.NoBoundaryInMultipartDefect): - self._msgobj('msg_41.txt') - - - def test_missing_start_boundary(self): - with self.assertRaises(errors.StartBoundaryNotFoundDefect): - self._msgobj('msg_42.txt') - - def test_first_line_is_continuation_header(self): - m = ' Line 1\nLine 2\nLine 3' - with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect): - msg = email.message_from_string(m, policy=email.policy.strict) + eq(msg.defects[0].line, ' Line 1\n') # Test RFC 2047 header encoding and decoding @@ -2610,6 +2560,13 @@ class TestMiscellaneous(TestEmailBase): for subpart in msg.walk(): unless(isinstance(subpart, MyMessage)) + def test_custom_message_does_not_require_arguments(self): + class MyMessage(Message): + def __init__(self): + super().__init__() + msg = self._str_msg("Subject: test\n\ntest", MyMessage) + self.assertTrue(isinstance(msg, MyMessage)) + def test__all__(self): module = __import__('email') self.assertEqual(sorted(module.__all__), [ @@ -3137,25 +3094,6 @@ Here's the message body g.flatten(msg, linesep='\r\n') self.assertEqual(s.getvalue(), text) - def test_crlf_control_via_policy(self): - with openfile('msg_26.txt', newline='\n') as fp: - text = fp.read() - msg = email.message_from_string(text) - s = StringIO() - g = email.generator.Generator(s, policy=email.policy.SMTP) - g.flatten(msg) - self.assertEqual(s.getvalue(), text) - - def test_flatten_linesep_overrides_policy(self): - # msg_27 is lf separated - with openfile('msg_27.txt', newline='\n') as fp: - text = fp.read() - msg = email.message_from_string(text) - s = StringIO() - g = email.generator.Generator(s, policy=email.policy.SMTP) - g.flatten(msg, linesep='\n') - self.assertEqual(s.getvalue(), text) - maxDiff = None def test_multipart_digest_with_extra_mime_headers(self): @@ -3646,44 +3584,6 @@ class Test8BitBytesHandling(unittest.TestCase): s.getvalue(), 'Subject: =?utf-8?b?xb5sdcWlb3XEjWvDvSBrxa/FiA==?=\r\n\r\n') - def test_crlf_control_via_policy(self): - # msg_26 is crlf terminated - with openfile('msg_26.txt', 'rb') as fp: - text = fp.read() - msg = email.message_from_bytes(text) - s = BytesIO() - g = email.generator.BytesGenerator(s, policy=email.policy.SMTP) - g.flatten(msg) - self.assertEqual(s.getvalue(), text) - - def test_flatten_linesep_overrides_policy(self): - # msg_27 is lf separated - with openfile('msg_27.txt', 'rb') as fp: - text = fp.read() - msg = email.message_from_bytes(text) - s = BytesIO() - g = email.generator.BytesGenerator(s, policy=email.policy.SMTP) - g.flatten(msg, linesep='\n') - self.assertEqual(s.getvalue(), text) - - def test_must_be_7bit_handles_unknown_8bit(self): - msg = email.message_from_bytes(self.non_latin_bin_msg) - out = BytesIO() - g = email.generator.BytesGenerator(out, - policy=email.policy.default.clone(must_be_7bit=True)) - g.flatten(msg) - self.assertEqual(out.getvalue(), - self.non_latin_bin_msg_as7bit_wrapped.encode('ascii')) - - def test_must_be_7bit_transforms_8bit_cte(self): - msg = email.message_from_bytes(self.latin_bin_msg) - out = BytesIO() - g = email.generator.BytesGenerator(out, - policy=email.policy.default.clone(must_be_7bit=True)) - g.flatten(msg) - self.assertEqual(out.getvalue(), - self.latin_bin_msg_as7bit.encode('ascii')) - maxDiff = None diff --git a/Lib/test/test_email/test_generator.py b/Lib/test/test_email/test_generator.py index 35ca6c5..8f5fde7 100644 --- a/Lib/test/test_email/test_generator.py +++ b/Lib/test/test_email/test_generator.py @@ -11,6 +11,8 @@ from test.test_email import TestEmailBase class TestGeneratorBase(): + policy = policy.compat32 + long_subject = { 0: textwrap.dedent("""\ To: whom_it_may_concern@example.com @@ -58,11 +60,11 @@ class TestGeneratorBase(): long_subject[100] = long_subject[0] def maxheaderlen_parameter_test(self, n): - msg = self.msgmaker(self.long_subject[0]) + msg = self.msgmaker(self.typ(self.long_subject[0])) s = self.ioclass() g = self.genclass(s, maxheaderlen=n) g.flatten(msg) - self.assertEqual(s.getvalue(), self.long_subject[n]) + self.assertEqual(s.getvalue(), self.typ(self.long_subject[n])) def test_maxheaderlen_parameter_0(self): self.maxheaderlen_parameter_test(0) @@ -77,11 +79,11 @@ class TestGeneratorBase(): self.maxheaderlen_parameter_test(20) def maxheaderlen_policy_test(self, n): - msg = self.msgmaker(self.long_subject[0]) + msg = self.msgmaker(self.typ(self.long_subject[0])) s = self.ioclass() g = self.genclass(s, policy=policy.default.clone(max_line_length=n)) g.flatten(msg) - self.assertEqual(s.getvalue(), self.long_subject[n]) + self.assertEqual(s.getvalue(), self.typ(self.long_subject[n])) def test_maxheaderlen_policy_0(self): self.maxheaderlen_policy_test(0) @@ -96,12 +98,12 @@ class TestGeneratorBase(): self.maxheaderlen_policy_test(20) def maxheaderlen_parm_overrides_policy_test(self, n): - msg = self.msgmaker(self.long_subject[0]) + msg = self.msgmaker(self.typ(self.long_subject[0])) s = self.ioclass() g = self.genclass(s, maxheaderlen=n, policy=policy.default.clone(max_line_length=10)) g.flatten(msg) - self.assertEqual(s.getvalue(), self.long_subject[n]) + self.assertEqual(s.getvalue(), self.typ(self.long_subject[n])) def test_maxheaderlen_parm_overrides_policy_0(self): self.maxheaderlen_parm_overrides_policy_test(0) @@ -115,21 +117,84 @@ class TestGeneratorBase(): def test_maxheaderlen_parm_overrides_policy_20(self): self.maxheaderlen_parm_overrides_policy_test(20) + def test_crlf_control_via_policy(self): + source = "Subject: test\r\n\r\ntest body\r\n" + expected = source + msg = self.msgmaker(self.typ(source)) + s = self.ioclass() + g = self.genclass(s, policy=policy.SMTP) + g.flatten(msg) + self.assertEqual(s.getvalue(), self.typ(expected)) + + def test_flatten_linesep_overrides_policy(self): + source = "Subject: test\n\ntest body\n" + expected = source + msg = self.msgmaker(self.typ(source)) + s = self.ioclass() + g = self.genclass(s, policy=policy.SMTP) + g.flatten(msg, linesep='\n') + self.assertEqual(s.getvalue(), self.typ(expected)) + class TestGenerator(TestGeneratorBase, TestEmailBase): - msgmaker = staticmethod(message_from_string) genclass = Generator ioclass = io.StringIO + typ = str + + def msgmaker(self, msg, policy=None): + policy = self.policy if policy is None else policy + return message_from_string(msg, policy=policy) class TestBytesGenerator(TestGeneratorBase, TestEmailBase): - msgmaker = staticmethod(message_from_bytes) genclass = BytesGenerator ioclass = io.BytesIO - long_subject = {key: x.encode('ascii') - for key, x in TestGeneratorBase.long_subject.items()} + typ = lambda self, x: x.encode('ascii') + + def msgmaker(self, msg, policy=None): + policy = self.policy if policy is None else policy + return message_from_bytes(msg, policy=policy) + + def test_cte_type_7bit_handles_unknown_8bit(self): + source = ("Subject: Maintenant je vous présente mon " + "collègue\n\n").encode('utf-8') + expected = ('Subject: =?unknown-8bit?q?Maintenant_je_vous_pr=C3=A9sente_mon_' + 'coll=C3=A8gue?=\n\n').encode('ascii') + msg = message_from_bytes(source) + s = io.BytesIO() + g = BytesGenerator(s, policy=self.policy.clone(cte_type='7bit')) + g.flatten(msg) + self.assertEqual(s.getvalue(), expected) + + def test_cte_type_7bit_transforms_8bit_cte(self): + source = textwrap.dedent("""\ + From: foo@bar.com + To: Dinsdale + Subject: Nudge nudge, wink, wink + Mime-Version: 1.0 + Content-Type: text/plain; charset="latin-1" + Content-Transfer-Encoding: 8bit + + oh là là, know what I mean, know what I mean? + """).encode('latin1') + msg = message_from_bytes(source) + expected = textwrap.dedent("""\ + From: foo@bar.com + To: Dinsdale + Subject: Nudge nudge, wink, wink + Mime-Version: 1.0 + Content-Type: text/plain; charset="iso-8859-1" + Content-Transfer-Encoding: quoted-printable + + oh l=E0 l=E0, know what I mean, know what I mean? + """).encode('ascii') + s = io.BytesIO() + g = BytesGenerator(s, policy=self.policy.clone(cte_type='7bit', + linesep='\n')) + g.flatten(msg) + self.assertEqual(s.getvalue(), expected) if __name__ == '__main__': diff --git a/Lib/test/test_email/test_parser.py b/Lib/test/test_email/test_parser.py new file mode 100644 index 0000000..864e4c1 --- /dev/null +++ b/Lib/test/test_email/test_parser.py @@ -0,0 +1,276 @@ +import io +import email +import textwrap +import unittest +from email._policybase import Compat32 +from email import errors +from email.message import Message +from test.test_email import TestEmailBase + + +class TestCustomMessage(TestEmailBase): + + class MyMessage(Message): + def __init__(self, policy): + self.check_policy = policy + super().__init__() + + MyPolicy = TestEmailBase.policy.clone(linesep='boo') + + def test_custom_message_gets_policy_if_possible_from_string(self): + msg = email.message_from_string("Subject: bogus\n\nmsg\n", + self.MyMessage, + policy=self.MyPolicy) + self.assertTrue(isinstance(msg, self.MyMessage)) + self.assertIs(msg.check_policy, self.MyPolicy) + + def test_custom_message_gets_policy_if_possible_from_file(self): + source_file = io.StringIO("Subject: bogus\n\nmsg\n") + msg = email.message_from_file(source_file, + self.MyMessage, + policy=self.MyPolicy) + self.assertTrue(isinstance(msg, self.MyMessage)) + self.assertIs(msg.check_policy, self.MyPolicy) + + # XXX add tests for other functions that take Message arg. + + +class TestMessageDefectDetectionBase: + + dup_boundary_msg = textwrap.dedent("""\ + Subject: XX + From: xx@xx.dk + To: XX + Mime-version: 1.0 + Content-type: multipart/mixed; + boundary="MS_Mac_OE_3071477847_720252_MIME_Part" + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: multipart/alternative; + boundary="MS_Mac_OE_3071477847_720252_MIME_Part" + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: text/plain; charset="ISO-8859-1" + Content-transfer-encoding: quoted-printable + + text + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: text/html; charset="ISO-8859-1" + Content-transfer-encoding: quoted-printable + + <HTML></HTML> + + --MS_Mac_OE_3071477847_720252_MIME_Part-- + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: image/gif; name="xx.gif"; + Content-disposition: attachment + Content-transfer-encoding: base64 + + Some removed base64 encoded chars. + + --MS_Mac_OE_3071477847_720252_MIME_Part-- + + """) + + def test_same_boundary_inner_outer(self): + # XXX better would be to actually detect the duplicate. + msg = self._str_msg(self.dup_boundary_msg) + inner = msg.get_payload(0) + self.assertTrue(hasattr(inner, 'defects')) + self.assertEqual(len(self.get_defects(inner)), 1) + self.assertTrue(isinstance(self.get_defects(inner)[0], + errors.StartBoundaryNotFoundDefect)) + + def test_same_boundary_inner_outer_raises_on_defect(self): + with self.assertRaises(errors.StartBoundaryNotFoundDefect): + self._str_msg(self.dup_boundary_msg, + policy=self.policy.clone(raise_on_defect=True)) + + no_boundary_msg = textwrap.dedent("""\ + Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800) + From: foobar + Subject: broken mail + MIME-Version: 1.0 + Content-Type: multipart/report; report-type=delivery-status; + + --JAB03225.986577786/zinfandel.lacita.com + + One part + + --JAB03225.986577786/zinfandel.lacita.com + Content-Type: message/delivery-status + + Header: Another part + + --JAB03225.986577786/zinfandel.lacita.com-- + """) + + def test_multipart_no_boundary(self): + msg = self._str_msg(self.no_boundary_msg) + self.assertTrue(isinstance(msg.get_payload(), str)) + self.assertEqual(len(self.get_defects(msg)), 2) + self.assertTrue(isinstance(self.get_defects(msg)[0], + errors.NoBoundaryInMultipartDefect)) + self.assertTrue(isinstance(self.get_defects(msg)[1], + errors.MultipartInvariantViolationDefect)) + + def test_multipart_no_boundary_raise_on_defect(self): + with self.assertRaises(errors.NoBoundaryInMultipartDefect): + self._str_msg(self.no_boundary_msg, + policy=self.policy.clone(raise_on_defect=True)) + + multipart_msg = textwrap.dedent("""\ + Date: Wed, 14 Nov 2007 12:56:23 GMT + From: foo@bar.invalid + To: foo@bar.invalid + Subject: Content-Transfer-Encoding: base64 and multipart + MIME-Version: 1.0 + Content-Type: multipart/mixed; + boundary="===============3344438784458119861=="{} + + --===============3344438784458119861== + Content-Type: text/plain + + Test message + + --===============3344438784458119861== + Content-Type: application/octet-stream + Content-Transfer-Encoding: base64 + + YWJj + + --===============3344438784458119861==-- + """) + + def test_multipart_invalid_cte(self): + msg = self._str_msg( + self.multipart_msg.format("\nContent-Transfer-Encoding: base64")) + self.assertEqual(len(self.get_defects(msg)), 1) + self.assertIsInstance(self.get_defects(msg)[0], + errors.InvalidMultipartContentTransferEncodingDefect) + + def test_multipart_invalid_cte_raise_on_defect(self): + with self.assertRaises( + errors.InvalidMultipartContentTransferEncodingDefect): + self._str_msg( + self.multipart_msg.format( + "\nContent-Transfer-Encoding: base64"), + policy=self.policy.clone(raise_on_defect=True)) + + def test_multipart_no_cte_no_defect(self): + msg = self._str_msg(self.multipart_msg.format('')) + self.assertEqual(len(self.get_defects(msg)), 0) + + def test_multipart_valid_cte_no_defect(self): + for cte in ('7bit', '8bit', 'BINary'): + msg = self._str_msg( + self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte)) + self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte) + + lying_multipart_msg = textwrap.dedent("""\ + From: "Allison Dunlap" <xxx@example.com> + To: yyy@example.com + Subject: 64423 + Date: Sun, 11 Jul 2004 16:09:27 -0300 + MIME-Version: 1.0 + Content-Type: multipart/alternative; + + Blah blah blah + """) + + def test_lying_multipart(self): + msg = self._str_msg(self.lying_multipart_msg) + self.assertTrue(hasattr(msg, 'defects')) + self.assertEqual(len(self.get_defects(msg)), 2) + self.assertTrue(isinstance(self.get_defects(msg)[0], + errors.NoBoundaryInMultipartDefect)) + self.assertTrue(isinstance(self.get_defects(msg)[1], + errors.MultipartInvariantViolationDefect)) + + def test_lying_multipart_raise_on_defect(self): + with self.assertRaises(errors.NoBoundaryInMultipartDefect): + self._str_msg(self.lying_multipart_msg, + policy=self.policy.clone(raise_on_defect=True)) + + missing_start_boundary_msg = textwrap.dedent("""\ + Content-Type: multipart/mixed; boundary="AAA" + From: Mail Delivery Subsystem <xxx@example.com> + To: yyy@example.com + + --AAA + + Stuff + + --AAA + Content-Type: message/rfc822 + + From: webmaster@python.org + To: zzz@example.com + Content-Type: multipart/mixed; boundary="BBB" + + --BBB-- + + --AAA-- + + """) + + def test_missing_start_boundary(self): + # The message structure is: + # + # multipart/mixed + # text/plain + # message/rfc822 + # multipart/mixed [*] + # + # [*] This message is missing its start boundary + outer = self._str_msg(self.missing_start_boundary_msg) + bad = outer.get_payload(1).get_payload(0) + self.assertEqual(len(self.get_defects(bad)), 1) + self.assertTrue(isinstance(self.get_defects(bad)[0], + errors.StartBoundaryNotFoundDefect)) + + def test_missing_start_boundary_raise_on_defect(self): + with self.assertRaises(errors.StartBoundaryNotFoundDefect): + self._str_msg(self.missing_start_boundary_msg, + policy=self.policy.clone(raise_on_defect=True)) + + def test_first_line_is_continuation_header(self): + msg = self._str_msg(' Line 1\nLine 2\nLine 3') + self.assertEqual(msg.keys(), []) + self.assertEqual(msg.get_payload(), 'Line 2\nLine 3') + self.assertEqual(len(self.get_defects(msg)), 1) + self.assertTrue(isinstance(self.get_defects(msg)[0], + errors.FirstHeaderLineIsContinuationDefect)) + self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n') + + def test_first_line_is_continuation_header_raise_on_defect(self): + with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect): + self._str_msg(' Line 1\nLine 2\nLine 3', + policy=self.policy.clone(raise_on_defect=True)) + + +class TestMessageDefectDetection(TestMessageDefectDetectionBase, TestEmailBase): + + def get_defects(self, obj): + return obj.defects + + +class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase, + TestEmailBase): + + class CapturePolicy(Compat32): + captured = None + def register_defect(self, obj, defect): + self.captured.append(defect) + + def setUp(self): + self.policy = self.CapturePolicy(captured=list()) + + def get_defects(self, obj): + return self.policy.captured + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_email/test_policy.py b/Lib/test/test_email/test_policy.py index 1c65901..07925a7 100644 --- a/Lib/test/test_email/test_policy.py +++ b/Lib/test/test_email/test_policy.py @@ -1,6 +1,10 @@ +import io import types +import textwrap import unittest import email.policy +import email.parser +import email.generator class PolicyAPITests(unittest.TestCase): @@ -11,14 +15,15 @@ class PolicyAPITests(unittest.TestCase): policy_defaults = { 'max_line_length': 78, 'linesep': '\n', - 'must_be_7bit': False, + 'cte_type': '8bit', 'raise_on_defect': False, } # For each policy under test, we give here the values of the attributes # that are different from the defaults for that policy. policies = { - email.policy.Policy(): {}, + email.policy.Compat32(): {}, + email.policy.compat32: {}, email.policy.default: {}, email.policy.SMTP: {'linesep': '\r\n'}, email.policy.HTTP: {'linesep': '\r\n', 'max_line_length': None}, @@ -44,6 +49,18 @@ class PolicyAPITests(unittest.TestCase): self.assertIn(attr, self.policy_defaults, "{} is not fully tested".format(attr)) + def test_abc(self): + with self.assertRaises(TypeError) as cm: + email.policy.Policy() + msg = str(cm.exception) + abstract_methods = ('fold', + 'fold_binary', + 'header_fetch_parse', + 'header_source_parse', + 'header_store_parse') + for method in abstract_methods: + self.assertIn(method, msg) + def test_policy_is_immutable(self): for policy in self.policies: for attr in self.policy_defaults: @@ -88,7 +105,7 @@ class PolicyAPITests(unittest.TestCase): self.defects = [] obj = Dummy() defect = object() - policy = email.policy.Policy() + policy = email.policy.Compat32() policy.register_defect(obj, defect) self.assertEqual(obj.defects, [defect]) defect2 = object() @@ -117,7 +134,7 @@ class PolicyAPITests(unittest.TestCase): email.policy.default.handle_defect(foo, defect2) self.assertEqual(foo.defects, [defect1, defect2]) - class MyPolicy(email.policy.Policy): + class MyPolicy(email.policy.Compat32): defects = None def __init__(self, *args, **kw): super().__init__(*args, defects=[], **kw) @@ -146,5 +163,94 @@ class PolicyAPITests(unittest.TestCase): # For adding subclassed objects, make sure the usual rules apply (subclass # wins), but that the order still works (right overrides left). + +class TestPolicyPropagation(unittest.TestCase): + + # The abstract methods are used by the parser but not by the wrapper + # functions that call it, so if the exception gets raised we know that the + # policy was actually propagated all the way to feedparser. + class MyPolicy(email.policy.Policy): + def badmethod(self, *args, **kw): + raise Exception("test") + fold = fold_binary = header_fetch_parser = badmethod + header_source_parse = header_store_parse = badmethod + + def test_message_from_string(self): + with self.assertRaisesRegex(Exception, "^test$"): + email.message_from_string("Subject: test\n\n", + policy=self.MyPolicy) + + def test_message_from_bytes(self): + with self.assertRaisesRegex(Exception, "^test$"): + email.message_from_bytes(b"Subject: test\n\n", + policy=self.MyPolicy) + + def test_message_from_file(self): + f = io.StringIO('Subject: test\n\n') + with self.assertRaisesRegex(Exception, "^test$"): + email.message_from_file(f, policy=self.MyPolicy) + + def test_message_from_binary_file(self): + f = io.BytesIO(b'Subject: test\n\n') + with self.assertRaisesRegex(Exception, "^test$"): + email.message_from_binary_file(f, policy=self.MyPolicy) + + # These are redundant, but we need them for black-box completeness. + + def test_parser(self): + p = email.parser.Parser(policy=self.MyPolicy) + with self.assertRaisesRegex(Exception, "^test$"): + p.parsestr('Subject: test\n\n') + + def test_bytes_parser(self): + p = email.parser.BytesParser(policy=self.MyPolicy) + with self.assertRaisesRegex(Exception, "^test$"): + p.parsebytes(b'Subject: test\n\n') + + # Now that we've established that all the parse methods get the + # policy in to feedparser, we can use message_from_string for + # the rest of the propagation tests. + + def _make_msg(self, source='Subject: test\n\n', policy=None): + self.policy = email.policy.default.clone() if policy is None else policy + return email.message_from_string(source, policy=self.policy) + + def test_parser_propagates_policy_to_message(self): + msg = self._make_msg() + self.assertIs(msg.policy, self.policy) + + def test_parser_propagates_policy_to_sub_messages(self): + msg = self._make_msg(textwrap.dedent("""\ + Subject: mime test + MIME-Version: 1.0 + Content-Type: multipart/mixed, boundary="XXX" + + --XXX + Content-Type: text/plain + + test + --XXX + Content-Type: text/plain + + test2 + --XXX-- + """)) + for part in msg.walk(): + self.assertIs(part.policy, self.policy) + + def test_message_policy_propagates_to_generator(self): + msg = self._make_msg("Subject: test\nTo: foo\n\n", + policy=email.policy.default.clone(linesep='X')) + s = io.StringIO() + g = email.generator.Generator(s) + g.flatten(msg) + self.assertEqual(s.getvalue(), "Subject: testXTo: fooXX") + + def test_message_policy_used_by_as_string(self): + msg = self._make_msg("Subject: test\nTo: foo\n\n", + policy=email.policy.default.clone(linesep='X')) + self.assertEqual(msg.as_string(), "Subject: testXTo: fooXX") + + if __name__ == '__main__': unittest.main() |