diff options
author | R David Murray <rdmurray@bitdance.com> | 2012-05-25 19:01:48 (GMT) |
---|---|---|
committer | R David Murray <rdmurray@bitdance.com> | 2012-05-25 19:01:48 (GMT) |
commit | c27e52265b7ff4aa57dc357c289cce8c9dd0fec3 (patch) | |
tree | b2a25260b0aa89d0a4db3c0d2f91c8cb5e68d51a /Lib/test/test_email | |
parent | 9242c1378f77214f5b9b90149861cb13ca986fb0 (diff) | |
download | cpython-c27e52265b7ff4aa57dc357c289cce8c9dd0fec3.zip cpython-c27e52265b7ff4aa57dc357c289cce8c9dd0fec3.tar.gz cpython-c27e52265b7ff4aa57dc357c289cce8c9dd0fec3.tar.bz2 |
#14731: refactor email policy framework.
This patch primarily does two things: (1) it adds some internal-interface
methods to Policy that allow for Policy to control the parsing and folding of
headers in such a way that we can construct a backward compatibility policy
that is 100% compatible with the 3.2 API, while allowing a new policy to
implement the email6 API. (2) it adds that backward compatibility policy and
refactors the test suite so that the only differences between the 3.2
test_email.py file and the 3.3 test_email.py file is some small changes in
test framework and the addition of tests for bugs fixed that apply to the 3.2
API.
There are some additional teaks, such as moving just the code needed for the
compatibility policy into _policybase, so that the library code can import
only _policybase. That way the new code that will be added for email6
will only get imported when a non-compatibility policy is imported.
Diffstat (limited to 'Lib/test/test_email')
-rw-r--r-- | Lib/test/test_email/__init__.py | 14 | ||||
-rw-r--r-- | Lib/test/test_email/test_email.py | 180 | ||||
-rw-r--r-- | Lib/test/test_email/test_generator.py | 85 | ||||
-rw-r--r-- | Lib/test/test_email/test_parser.py | 276 | ||||
-rw-r--r-- | Lib/test/test_email/test_policy.py | 114 |
5 files changed, 514 insertions, 155 deletions
diff --git a/Lib/test/test_email/__init__.py b/Lib/test/test_email/__init__.py index d72b50e..b05fb3c 100644 --- a/Lib/test/test_email/__init__.py +++ b/Lib/test/test_email/__init__.py @@ -3,6 +3,8 @@ import sys import unittest import test.support import email +from email.message import Message +from email._policybase import compat32 from test.test_email import __file__ as landmark # Run all tests in package for '-m unittest test.test_email' @@ -36,16 +38,26 @@ def openfile(filename, *args, **kws): class TestEmailBase(unittest.TestCase): maxDiff = None + # Currently the default policy is compat32. By setting that as the default + # here we make minimal changes in the test_email tests compared to their + # pre-3.3 state. + policy = compat32 def __init__(self, *args, **kw): super().__init__(*args, **kw) self.addTypeEqualityFunc(bytes, self.assertBytesEqual) + # Backward compatibility to minimize test_email test changes. ndiffAssertEqual = unittest.TestCase.assertEqual def _msgobj(self, filename): with openfile(filename) as fp: - return email.message_from_file(fp) + return email.message_from_file(fp, policy=self.policy) + + def _str_msg(self, string, message=Message, policy=None): + if policy is None: + policy = self.policy + return email.message_from_string(string, message, policy=policy) def _bytes_repr(self, b): return [repr(x) for x in b.splitlines(keepends=True)] diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py index b07f675..ac6ee65 100644 --- a/Lib/test/test_email/test_email.py +++ b/Lib/test/test_email/test_email.py @@ -16,6 +16,7 @@ from io import StringIO, BytesIO from itertools import chain import email +import email.policy from email.charset import Charset from email.header import Header, decode_header, make_header @@ -1805,11 +1806,7 @@ YXNkZg== # Test some badly formatted messages -class TestNonConformantBase: - - def _msgobj(self, filename): - with openfile(filename) as fp: - return email.message_from_file(fp, policy=self.policy) +class TestNonConformant(TestEmailBase): def test_parse_missing_minor_type(self): eq = self.assertEqual @@ -1818,24 +1815,26 @@ class TestNonConformantBase: eq(msg.get_content_maintype(), 'text') eq(msg.get_content_subtype(), 'plain') + # test_parser.TestMessageDefectDetectionBase def test_same_boundary_inner_outer(self): unless = self.assertTrue msg = self._msgobj('msg_15.txt') # XXX We can probably eventually do better inner = msg.get_payload(0) unless(hasattr(inner, 'defects')) - self.assertEqual(len(self.get_defects(inner)), 1) - unless(isinstance(self.get_defects(inner)[0], + self.assertEqual(len(inner.defects), 1) + unless(isinstance(inner.defects[0], errors.StartBoundaryNotFoundDefect)) + # test_parser.TestMessageDefectDetectionBase def test_multipart_no_boundary(self): unless = self.assertTrue msg = self._msgobj('msg_25.txt') unless(isinstance(msg.get_payload(), str)) - self.assertEqual(len(self.get_defects(msg)), 2) - unless(isinstance(self.get_defects(msg)[0], + self.assertEqual(len(msg.defects), 2) + unless(isinstance(msg.defects[0], errors.NoBoundaryInMultipartDefect)) - unless(isinstance(self.get_defects(msg)[1], + unless(isinstance(msg.defects[1], errors.MultipartInvariantViolationDefect)) multipart_msg = textwrap.dedent("""\ @@ -1861,27 +1860,26 @@ class TestNonConformantBase: --===============3344438784458119861==-- """) + # test_parser.TestMessageDefectDetectionBase def test_multipart_invalid_cte(self): - msg = email.message_from_string( - self.multipart_msg.format("\nContent-Transfer-Encoding: base64"), - policy = self.policy) - self.assertEqual(len(self.get_defects(msg)), 1) - self.assertIsInstance(self.get_defects(msg)[0], + msg = self._str_msg( + self.multipart_msg.format("\nContent-Transfer-Encoding: base64")) + self.assertEqual(len(msg.defects), 1) + self.assertIsInstance(msg.defects[0], errors.InvalidMultipartContentTransferEncodingDefect) + # test_parser.TestMessageDefectDetectionBase def test_multipart_no_cte_no_defect(self): - msg = email.message_from_string( - self.multipart_msg.format(''), - policy = self.policy) - self.assertEqual(len(self.get_defects(msg)), 0) + msg = self._str_msg(self.multipart_msg.format('')) + self.assertEqual(len(msg.defects), 0) + # test_parser.TestMessageDefectDetectionBase def test_multipart_valid_cte_no_defect(self): for cte in ('7bit', '8bit', 'BINary'): - msg = email.message_from_string( + msg = self._str_msg( self.multipart_msg.format( - "\nContent-Transfer-Encoding: {}".format(cte)), - policy = self.policy) - self.assertEqual(len(self.get_defects(msg)), 0) + "\nContent-Transfer-Encoding: {}".format(cte))) + self.assertEqual(len(msg.defects), 0) def test_invalid_content_type(self): eq = self.assertEqual @@ -1932,16 +1930,18 @@ Subject: here's something interesting counter to RFC 2822, there's no separating newline here """) + # test_parser.TestMessageDefectDetectionBase def test_lying_multipart(self): unless = self.assertTrue msg = self._msgobj('msg_41.txt') unless(hasattr(msg, 'defects')) - self.assertEqual(len(self.get_defects(msg)), 2) - unless(isinstance(self.get_defects(msg)[0], + self.assertEqual(len(msg.defects), 2) + unless(isinstance(msg.defects[0], errors.NoBoundaryInMultipartDefect)) - unless(isinstance(self.get_defects(msg)[1], + unless(isinstance(msg.defects[1], errors.MultipartInvariantViolationDefect)) + # test_parser.TestMessageDefectDetectionBase def test_missing_start_boundary(self): outer = self._msgobj('msg_42.txt') # The message structure is: @@ -1953,71 +1953,21 @@ counter to RFC 2822, there's no separating newline here # # [*] This message is missing its start boundary bad = outer.get_payload(1).get_payload(0) - self.assertEqual(len(self.get_defects(bad)), 1) - self.assertTrue(isinstance(self.get_defects(bad)[0], + self.assertEqual(len(bad.defects), 1) + self.assertTrue(isinstance(bad.defects[0], errors.StartBoundaryNotFoundDefect)) + # test_parser.TestMessageDefectDetectionBase def test_first_line_is_continuation_header(self): eq = self.assertEqual m = ' Line 1\nLine 2\nLine 3' - msg = email.message_from_string(m, policy=self.policy) + msg = email.message_from_string(m) eq(msg.keys(), []) eq(msg.get_payload(), 'Line 2\nLine 3') - eq(len(self.get_defects(msg)), 1) - self.assertTrue(isinstance(self.get_defects(msg)[0], + eq(len(msg.defects), 1) + self.assertTrue(isinstance(msg.defects[0], errors.FirstHeaderLineIsContinuationDefect)) - eq(self.get_defects(msg)[0].line, ' Line 1\n') - - -class TestNonConformant(TestNonConformantBase, TestEmailBase): - - policy=email.policy.default - - def get_defects(self, obj): - return obj.defects - - -class TestNonConformantCapture(TestNonConformantBase, TestEmailBase): - - class CapturePolicy(email.policy.Policy): - captured = None - def register_defect(self, obj, defect): - self.captured.append(defect) - - def setUp(self): - self.policy = self.CapturePolicy(captured=list()) - - def get_defects(self, obj): - return self.policy.captured - - -class TestRaisingDefects(TestEmailBase): - - def _msgobj(self, filename): - with openfile(filename) as fp: - return email.message_from_file(fp, policy=email.policy.strict) - - def test_same_boundary_inner_outer(self): - with self.assertRaises(errors.StartBoundaryNotFoundDefect): - self._msgobj('msg_15.txt') - - def test_multipart_no_boundary(self): - with self.assertRaises(errors.NoBoundaryInMultipartDefect): - self._msgobj('msg_25.txt') - - def test_lying_multipart(self): - with self.assertRaises(errors.NoBoundaryInMultipartDefect): - self._msgobj('msg_41.txt') - - - def test_missing_start_boundary(self): - with self.assertRaises(errors.StartBoundaryNotFoundDefect): - self._msgobj('msg_42.txt') - - def test_first_line_is_continuation_header(self): - m = ' Line 1\nLine 2\nLine 3' - with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect): - msg = email.message_from_string(m, policy=email.policy.strict) + eq(msg.defects[0].line, ' Line 1\n') # Test RFC 2047 header encoding and decoding @@ -2610,6 +2560,13 @@ class TestMiscellaneous(TestEmailBase): for subpart in msg.walk(): unless(isinstance(subpart, MyMessage)) + def test_custom_message_does_not_require_arguments(self): + class MyMessage(Message): + def __init__(self): + super().__init__() + msg = self._str_msg("Subject: test\n\ntest", MyMessage) + self.assertTrue(isinstance(msg, MyMessage)) + def test__all__(self): module = __import__('email') self.assertEqual(sorted(module.__all__), [ @@ -3137,25 +3094,6 @@ Here's the message body g.flatten(msg, linesep='\r\n') self.assertEqual(s.getvalue(), text) - def test_crlf_control_via_policy(self): - with openfile('msg_26.txt', newline='\n') as fp: - text = fp.read() - msg = email.message_from_string(text) - s = StringIO() - g = email.generator.Generator(s, policy=email.policy.SMTP) - g.flatten(msg) - self.assertEqual(s.getvalue(), text) - - def test_flatten_linesep_overrides_policy(self): - # msg_27 is lf separated - with openfile('msg_27.txt', newline='\n') as fp: - text = fp.read() - msg = email.message_from_string(text) - s = StringIO() - g = email.generator.Generator(s, policy=email.policy.SMTP) - g.flatten(msg, linesep='\n') - self.assertEqual(s.getvalue(), text) - maxDiff = None def test_multipart_digest_with_extra_mime_headers(self): @@ -3646,44 +3584,6 @@ class Test8BitBytesHandling(unittest.TestCase): s.getvalue(), 'Subject: =?utf-8?b?xb5sdcWlb3XEjWvDvSBrxa/FiA==?=\r\n\r\n') - def test_crlf_control_via_policy(self): - # msg_26 is crlf terminated - with openfile('msg_26.txt', 'rb') as fp: - text = fp.read() - msg = email.message_from_bytes(text) - s = BytesIO() - g = email.generator.BytesGenerator(s, policy=email.policy.SMTP) - g.flatten(msg) - self.assertEqual(s.getvalue(), text) - - def test_flatten_linesep_overrides_policy(self): - # msg_27 is lf separated - with openfile('msg_27.txt', 'rb') as fp: - text = fp.read() - msg = email.message_from_bytes(text) - s = BytesIO() - g = email.generator.BytesGenerator(s, policy=email.policy.SMTP) - g.flatten(msg, linesep='\n') - self.assertEqual(s.getvalue(), text) - - def test_must_be_7bit_handles_unknown_8bit(self): - msg = email.message_from_bytes(self.non_latin_bin_msg) - out = BytesIO() - g = email.generator.BytesGenerator(out, - policy=email.policy.default.clone(must_be_7bit=True)) - g.flatten(msg) - self.assertEqual(out.getvalue(), - self.non_latin_bin_msg_as7bit_wrapped.encode('ascii')) - - def test_must_be_7bit_transforms_8bit_cte(self): - msg = email.message_from_bytes(self.latin_bin_msg) - out = BytesIO() - g = email.generator.BytesGenerator(out, - policy=email.policy.default.clone(must_be_7bit=True)) - g.flatten(msg) - self.assertEqual(out.getvalue(), - self.latin_bin_msg_as7bit.encode('ascii')) - maxDiff = None diff --git a/Lib/test/test_email/test_generator.py b/Lib/test/test_email/test_generator.py index 35ca6c5..8f5fde7 100644 --- a/Lib/test/test_email/test_generator.py +++ b/Lib/test/test_email/test_generator.py @@ -11,6 +11,8 @@ from test.test_email import TestEmailBase class TestGeneratorBase(): + policy = policy.compat32 + long_subject = { 0: textwrap.dedent("""\ To: whom_it_may_concern@example.com @@ -58,11 +60,11 @@ class TestGeneratorBase(): long_subject[100] = long_subject[0] def maxheaderlen_parameter_test(self, n): - msg = self.msgmaker(self.long_subject[0]) + msg = self.msgmaker(self.typ(self.long_subject[0])) s = self.ioclass() g = self.genclass(s, maxheaderlen=n) g.flatten(msg) - self.assertEqual(s.getvalue(), self.long_subject[n]) + self.assertEqual(s.getvalue(), self.typ(self.long_subject[n])) def test_maxheaderlen_parameter_0(self): self.maxheaderlen_parameter_test(0) @@ -77,11 +79,11 @@ class TestGeneratorBase(): self.maxheaderlen_parameter_test(20) def maxheaderlen_policy_test(self, n): - msg = self.msgmaker(self.long_subject[0]) + msg = self.msgmaker(self.typ(self.long_subject[0])) s = self.ioclass() g = self.genclass(s, policy=policy.default.clone(max_line_length=n)) g.flatten(msg) - self.assertEqual(s.getvalue(), self.long_subject[n]) + self.assertEqual(s.getvalue(), self.typ(self.long_subject[n])) def test_maxheaderlen_policy_0(self): self.maxheaderlen_policy_test(0) @@ -96,12 +98,12 @@ class TestGeneratorBase(): self.maxheaderlen_policy_test(20) def maxheaderlen_parm_overrides_policy_test(self, n): - msg = self.msgmaker(self.long_subject[0]) + msg = self.msgmaker(self.typ(self.long_subject[0])) s = self.ioclass() g = self.genclass(s, maxheaderlen=n, policy=policy.default.clone(max_line_length=10)) g.flatten(msg) - self.assertEqual(s.getvalue(), self.long_subject[n]) + self.assertEqual(s.getvalue(), self.typ(self.long_subject[n])) def test_maxheaderlen_parm_overrides_policy_0(self): self.maxheaderlen_parm_overrides_policy_test(0) @@ -115,21 +117,84 @@ class TestGeneratorBase(): def test_maxheaderlen_parm_overrides_policy_20(self): self.maxheaderlen_parm_overrides_policy_test(20) + def test_crlf_control_via_policy(self): + source = "Subject: test\r\n\r\ntest body\r\n" + expected = source + msg = self.msgmaker(self.typ(source)) + s = self.ioclass() + g = self.genclass(s, policy=policy.SMTP) + g.flatten(msg) + self.assertEqual(s.getvalue(), self.typ(expected)) + + def test_flatten_linesep_overrides_policy(self): + source = "Subject: test\n\ntest body\n" + expected = source + msg = self.msgmaker(self.typ(source)) + s = self.ioclass() + g = self.genclass(s, policy=policy.SMTP) + g.flatten(msg, linesep='\n') + self.assertEqual(s.getvalue(), self.typ(expected)) + class TestGenerator(TestGeneratorBase, TestEmailBase): - msgmaker = staticmethod(message_from_string) genclass = Generator ioclass = io.StringIO + typ = str + + def msgmaker(self, msg, policy=None): + policy = self.policy if policy is None else policy + return message_from_string(msg, policy=policy) class TestBytesGenerator(TestGeneratorBase, TestEmailBase): - msgmaker = staticmethod(message_from_bytes) genclass = BytesGenerator ioclass = io.BytesIO - long_subject = {key: x.encode('ascii') - for key, x in TestGeneratorBase.long_subject.items()} + typ = lambda self, x: x.encode('ascii') + + def msgmaker(self, msg, policy=None): + policy = self.policy if policy is None else policy + return message_from_bytes(msg, policy=policy) + + def test_cte_type_7bit_handles_unknown_8bit(self): + source = ("Subject: Maintenant je vous présente mon " + "collègue\n\n").encode('utf-8') + expected = ('Subject: =?unknown-8bit?q?Maintenant_je_vous_pr=C3=A9sente_mon_' + 'coll=C3=A8gue?=\n\n').encode('ascii') + msg = message_from_bytes(source) + s = io.BytesIO() + g = BytesGenerator(s, policy=self.policy.clone(cte_type='7bit')) + g.flatten(msg) + self.assertEqual(s.getvalue(), expected) + + def test_cte_type_7bit_transforms_8bit_cte(self): + source = textwrap.dedent("""\ + From: foo@bar.com + To: Dinsdale + Subject: Nudge nudge, wink, wink + Mime-Version: 1.0 + Content-Type: text/plain; charset="latin-1" + Content-Transfer-Encoding: 8bit + + oh là là, know what I mean, know what I mean? + """).encode('latin1') + msg = message_from_bytes(source) + expected = textwrap.dedent("""\ + From: foo@bar.com + To: Dinsdale + Subject: Nudge nudge, wink, wink + Mime-Version: 1.0 + Content-Type: text/plain; charset="iso-8859-1" + Content-Transfer-Encoding: quoted-printable + + oh l=E0 l=E0, know what I mean, know what I mean? + """).encode('ascii') + s = io.BytesIO() + g = BytesGenerator(s, policy=self.policy.clone(cte_type='7bit', + linesep='\n')) + g.flatten(msg) + self.assertEqual(s.getvalue(), expected) if __name__ == '__main__': diff --git a/Lib/test/test_email/test_parser.py b/Lib/test/test_email/test_parser.py new file mode 100644 index 0000000..864e4c1 --- /dev/null +++ b/Lib/test/test_email/test_parser.py @@ -0,0 +1,276 @@ +import io +import email +import textwrap +import unittest +from email._policybase import Compat32 +from email import errors +from email.message import Message +from test.test_email import TestEmailBase + + +class TestCustomMessage(TestEmailBase): + + class MyMessage(Message): + def __init__(self, policy): + self.check_policy = policy + super().__init__() + + MyPolicy = TestEmailBase.policy.clone(linesep='boo') + + def test_custom_message_gets_policy_if_possible_from_string(self): + msg = email.message_from_string("Subject: bogus\n\nmsg\n", + self.MyMessage, + policy=self.MyPolicy) + self.assertTrue(isinstance(msg, self.MyMessage)) + self.assertIs(msg.check_policy, self.MyPolicy) + + def test_custom_message_gets_policy_if_possible_from_file(self): + source_file = io.StringIO("Subject: bogus\n\nmsg\n") + msg = email.message_from_file(source_file, + self.MyMessage, + policy=self.MyPolicy) + self.assertTrue(isinstance(msg, self.MyMessage)) + self.assertIs(msg.check_policy, self.MyPolicy) + + # XXX add tests for other functions that take Message arg. + + +class TestMessageDefectDetectionBase: + + dup_boundary_msg = textwrap.dedent("""\ + Subject: XX + From: xx@xx.dk + To: XX + Mime-version: 1.0 + Content-type: multipart/mixed; + boundary="MS_Mac_OE_3071477847_720252_MIME_Part" + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: multipart/alternative; + boundary="MS_Mac_OE_3071477847_720252_MIME_Part" + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: text/plain; charset="ISO-8859-1" + Content-transfer-encoding: quoted-printable + + text + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: text/html; charset="ISO-8859-1" + Content-transfer-encoding: quoted-printable + + <HTML></HTML> + + --MS_Mac_OE_3071477847_720252_MIME_Part-- + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: image/gif; name="xx.gif"; + Content-disposition: attachment + Content-transfer-encoding: base64 + + Some removed base64 encoded chars. + + --MS_Mac_OE_3071477847_720252_MIME_Part-- + + """) + + def test_same_boundary_inner_outer(self): + # XXX better would be to actually detect the duplicate. + msg = self._str_msg(self.dup_boundary_msg) + inner = msg.get_payload(0) + self.assertTrue(hasattr(inner, 'defects')) + self.assertEqual(len(self.get_defects(inner)), 1) + self.assertTrue(isinstance(self.get_defects(inner)[0], + errors.StartBoundaryNotFoundDefect)) + + def test_same_boundary_inner_outer_raises_on_defect(self): + with self.assertRaises(errors.StartBoundaryNotFoundDefect): + self._str_msg(self.dup_boundary_msg, + policy=self.policy.clone(raise_on_defect=True)) + + no_boundary_msg = textwrap.dedent("""\ + Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800) + From: foobar + Subject: broken mail + MIME-Version: 1.0 + Content-Type: multipart/report; report-type=delivery-status; + + --JAB03225.986577786/zinfandel.lacita.com + + One part + + --JAB03225.986577786/zinfandel.lacita.com + Content-Type: message/delivery-status + + Header: Another part + + --JAB03225.986577786/zinfandel.lacita.com-- + """) + + def test_multipart_no_boundary(self): + msg = self._str_msg(self.no_boundary_msg) + self.assertTrue(isinstance(msg.get_payload(), str)) + self.assertEqual(len(self.get_defects(msg)), 2) + self.assertTrue(isinstance(self.get_defects(msg)[0], + errors.NoBoundaryInMultipartDefect)) + self.assertTrue(isinstance(self.get_defects(msg)[1], + errors.MultipartInvariantViolationDefect)) + + def test_multipart_no_boundary_raise_on_defect(self): + with self.assertRaises(errors.NoBoundaryInMultipartDefect): + self._str_msg(self.no_boundary_msg, + policy=self.policy.clone(raise_on_defect=True)) + + multipart_msg = textwrap.dedent("""\ + Date: Wed, 14 Nov 2007 12:56:23 GMT + From: foo@bar.invalid + To: foo@bar.invalid + Subject: Content-Transfer-Encoding: base64 and multipart + MIME-Version: 1.0 + Content-Type: multipart/mixed; + boundary="===============3344438784458119861=="{} + + --===============3344438784458119861== + Content-Type: text/plain + + Test message + + --===============3344438784458119861== + Content-Type: application/octet-stream + Content-Transfer-Encoding: base64 + + YWJj + + --===============3344438784458119861==-- + """) + + def test_multipart_invalid_cte(self): + msg = self._str_msg( + self.multipart_msg.format("\nContent-Transfer-Encoding: base64")) + self.assertEqual(len(self.get_defects(msg)), 1) + self.assertIsInstance(self.get_defects(msg)[0], + errors.InvalidMultipartContentTransferEncodingDefect) + + def test_multipart_invalid_cte_raise_on_defect(self): + with self.assertRaises( + errors.InvalidMultipartContentTransferEncodingDefect): + self._str_msg( + self.multipart_msg.format( + "\nContent-Transfer-Encoding: base64"), + policy=self.policy.clone(raise_on_defect=True)) + + def test_multipart_no_cte_no_defect(self): + msg = self._str_msg(self.multipart_msg.format('')) + self.assertEqual(len(self.get_defects(msg)), 0) + + def test_multipart_valid_cte_no_defect(self): + for cte in ('7bit', '8bit', 'BINary'): + msg = self._str_msg( + self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte)) + self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte) + + lying_multipart_msg = textwrap.dedent("""\ + From: "Allison Dunlap" <xxx@example.com> + To: yyy@example.com + Subject: 64423 + Date: Sun, 11 Jul 2004 16:09:27 -0300 + MIME-Version: 1.0 + Content-Type: multipart/alternative; + + Blah blah blah + """) + + def test_lying_multipart(self): + msg = self._str_msg(self.lying_multipart_msg) + self.assertTrue(hasattr(msg, 'defects')) + self.assertEqual(len(self.get_defects(msg)), 2) + self.assertTrue(isinstance(self.get_defects(msg)[0], + errors.NoBoundaryInMultipartDefect)) + self.assertTrue(isinstance(self.get_defects(msg)[1], + errors.MultipartInvariantViolationDefect)) + + def test_lying_multipart_raise_on_defect(self): + with self.assertRaises(errors.NoBoundaryInMultipartDefect): + self._str_msg(self.lying_multipart_msg, + policy=self.policy.clone(raise_on_defect=True)) + + missing_start_boundary_msg = textwrap.dedent("""\ + Content-Type: multipart/mixed; boundary="AAA" + From: Mail Delivery Subsystem <xxx@example.com> + To: yyy@example.com + + --AAA + + Stuff + + --AAA + Content-Type: message/rfc822 + + From: webmaster@python.org + To: zzz@example.com + Content-Type: multipart/mixed; boundary="BBB" + + --BBB-- + + --AAA-- + + """) + + def test_missing_start_boundary(self): + # The message structure is: + # + # multipart/mixed + # text/plain + # message/rfc822 + # multipart/mixed [*] + # + # [*] This message is missing its start boundary + outer = self._str_msg(self.missing_start_boundary_msg) + bad = outer.get_payload(1).get_payload(0) + self.assertEqual(len(self.get_defects(bad)), 1) + self.assertTrue(isinstance(self.get_defects(bad)[0], + errors.StartBoundaryNotFoundDefect)) + + def test_missing_start_boundary_raise_on_defect(self): + with self.assertRaises(errors.StartBoundaryNotFoundDefect): + self._str_msg(self.missing_start_boundary_msg, + policy=self.policy.clone(raise_on_defect=True)) + + def test_first_line_is_continuation_header(self): + msg = self._str_msg(' Line 1\nLine 2\nLine 3') + self.assertEqual(msg.keys(), []) + self.assertEqual(msg.get_payload(), 'Line 2\nLine 3') + self.assertEqual(len(self.get_defects(msg)), 1) + self.assertTrue(isinstance(self.get_defects(msg)[0], + errors.FirstHeaderLineIsContinuationDefect)) + self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n') + + def test_first_line_is_continuation_header_raise_on_defect(self): + with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect): + self._str_msg(' Line 1\nLine 2\nLine 3', + policy=self.policy.clone(raise_on_defect=True)) + + +class TestMessageDefectDetection(TestMessageDefectDetectionBase, TestEmailBase): + + def get_defects(self, obj): + return obj.defects + + +class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase, + TestEmailBase): + + class CapturePolicy(Compat32): + captured = None + def register_defect(self, obj, defect): + self.captured.append(defect) + + def setUp(self): + self.policy = self.CapturePolicy(captured=list()) + + def get_defects(self, obj): + return self.policy.captured + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_email/test_policy.py b/Lib/test/test_email/test_policy.py index 1c65901..07925a7 100644 --- a/Lib/test/test_email/test_policy.py +++ b/Lib/test/test_email/test_policy.py @@ -1,6 +1,10 @@ +import io import types +import textwrap import unittest import email.policy +import email.parser +import email.generator class PolicyAPITests(unittest.TestCase): @@ -11,14 +15,15 @@ class PolicyAPITests(unittest.TestCase): policy_defaults = { 'max_line_length': 78, 'linesep': '\n', - 'must_be_7bit': False, + 'cte_type': '8bit', 'raise_on_defect': False, } # For each policy under test, we give here the values of the attributes # that are different from the defaults for that policy. policies = { - email.policy.Policy(): {}, + email.policy.Compat32(): {}, + email.policy.compat32: {}, email.policy.default: {}, email.policy.SMTP: {'linesep': '\r\n'}, email.policy.HTTP: {'linesep': '\r\n', 'max_line_length': None}, @@ -44,6 +49,18 @@ class PolicyAPITests(unittest.TestCase): self.assertIn(attr, self.policy_defaults, "{} is not fully tested".format(attr)) + def test_abc(self): + with self.assertRaises(TypeError) as cm: + email.policy.Policy() + msg = str(cm.exception) + abstract_methods = ('fold', + 'fold_binary', + 'header_fetch_parse', + 'header_source_parse', + 'header_store_parse') + for method in abstract_methods: + self.assertIn(method, msg) + def test_policy_is_immutable(self): for policy in self.policies: for attr in self.policy_defaults: @@ -88,7 +105,7 @@ class PolicyAPITests(unittest.TestCase): self.defects = [] obj = Dummy() defect = object() - policy = email.policy.Policy() + policy = email.policy.Compat32() policy.register_defect(obj, defect) self.assertEqual(obj.defects, [defect]) defect2 = object() @@ -117,7 +134,7 @@ class PolicyAPITests(unittest.TestCase): email.policy.default.handle_defect(foo, defect2) self.assertEqual(foo.defects, [defect1, defect2]) - class MyPolicy(email.policy.Policy): + class MyPolicy(email.policy.Compat32): defects = None def __init__(self, *args, **kw): super().__init__(*args, defects=[], **kw) @@ -146,5 +163,94 @@ class PolicyAPITests(unittest.TestCase): # For adding subclassed objects, make sure the usual rules apply (subclass # wins), but that the order still works (right overrides left). + +class TestPolicyPropagation(unittest.TestCase): + + # The abstract methods are used by the parser but not by the wrapper + # functions that call it, so if the exception gets raised we know that the + # policy was actually propagated all the way to feedparser. + class MyPolicy(email.policy.Policy): + def badmethod(self, *args, **kw): + raise Exception("test") + fold = fold_binary = header_fetch_parser = badmethod + header_source_parse = header_store_parse = badmethod + + def test_message_from_string(self): + with self.assertRaisesRegex(Exception, "^test$"): + email.message_from_string("Subject: test\n\n", + policy=self.MyPolicy) + + def test_message_from_bytes(self): + with self.assertRaisesRegex(Exception, "^test$"): + email.message_from_bytes(b"Subject: test\n\n", + policy=self.MyPolicy) + + def test_message_from_file(self): + f = io.StringIO('Subject: test\n\n') + with self.assertRaisesRegex(Exception, "^test$"): + email.message_from_file(f, policy=self.MyPolicy) + + def test_message_from_binary_file(self): + f = io.BytesIO(b'Subject: test\n\n') + with self.assertRaisesRegex(Exception, "^test$"): + email.message_from_binary_file(f, policy=self.MyPolicy) + + # These are redundant, but we need them for black-box completeness. + + def test_parser(self): + p = email.parser.Parser(policy=self.MyPolicy) + with self.assertRaisesRegex(Exception, "^test$"): + p.parsestr('Subject: test\n\n') + + def test_bytes_parser(self): + p = email.parser.BytesParser(policy=self.MyPolicy) + with self.assertRaisesRegex(Exception, "^test$"): + p.parsebytes(b'Subject: test\n\n') + + # Now that we've established that all the parse methods get the + # policy in to feedparser, we can use message_from_string for + # the rest of the propagation tests. + + def _make_msg(self, source='Subject: test\n\n', policy=None): + self.policy = email.policy.default.clone() if policy is None else policy + return email.message_from_string(source, policy=self.policy) + + def test_parser_propagates_policy_to_message(self): + msg = self._make_msg() + self.assertIs(msg.policy, self.policy) + + def test_parser_propagates_policy_to_sub_messages(self): + msg = self._make_msg(textwrap.dedent("""\ + Subject: mime test + MIME-Version: 1.0 + Content-Type: multipart/mixed, boundary="XXX" + + --XXX + Content-Type: text/plain + + test + --XXX + Content-Type: text/plain + + test2 + --XXX-- + """)) + for part in msg.walk(): + self.assertIs(part.policy, self.policy) + + def test_message_policy_propagates_to_generator(self): + msg = self._make_msg("Subject: test\nTo: foo\n\n", + policy=email.policy.default.clone(linesep='X')) + s = io.StringIO() + g = email.generator.Generator(s) + g.flatten(msg) + self.assertEqual(s.getvalue(), "Subject: testXTo: fooXX") + + def test_message_policy_used_by_as_string(self): + msg = self._make_msg("Subject: test\nTo: foo\n\n", + policy=email.policy.default.clone(linesep='X')) + self.assertEqual(msg.as_string(), "Subject: testXTo: fooXX") + + if __name__ == '__main__': unittest.main() |