diff options
Diffstat (limited to 'Lib/test/test_email')
-rw-r--r-- | Lib/test/test_email/__init__.py | 2 | ||||
-rw-r--r-- | Lib/test/test_email/test_email.py | 144 | ||||
-rw-r--r-- | Lib/test/test_email/test_generator.py | 136 | ||||
-rw-r--r-- | Lib/test/test_email/test_policy.py | 148 |
4 files changed, 415 insertions, 15 deletions
diff --git a/Lib/test/test_email/__init__.py b/Lib/test/test_email/__init__.py index 69be678..04fdf89 100644 --- a/Lib/test/test_email/__init__.py +++ b/Lib/test/test_email/__init__.py @@ -25,6 +25,8 @@ def openfile(filename, *args, **kws): # Base test class class TestEmailBase(unittest.TestCase): + maxDiff = None + def __init__(self, *args, **kw): super().__init__(*args, **kw) self.addTypeEqualityFunc(bytes, self.assertBytesEqual) diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py index 46206c3..1657afc 100644 --- a/Lib/test/test_email/test_email.py +++ b/Lib/test/test_email/test_email.py @@ -1776,7 +1776,12 @@ YXNkZg== # Test some badly formatted messages -class TestNonConformant(TestEmailBase): +class TestNonConformantBase: + + def _msgobj(self, filename): + with openfile(filename) as fp: + return email.message_from_file(fp, policy=self.policy) + def test_parse_missing_minor_type(self): eq = self.assertEqual msg = self._msgobj('msg_14.txt') @@ -1790,17 +1795,18 @@ class TestNonConformant(TestEmailBase): # XXX We can probably eventually do better inner = msg.get_payload(0) unless(hasattr(inner, 'defects')) - self.assertEqual(len(inner.defects), 1) - unless(isinstance(inner.defects[0], + self.assertEqual(len(self.get_defects(inner)), 1) + unless(isinstance(self.get_defects(inner)[0], errors.StartBoundaryNotFoundDefect)) def test_multipart_no_boundary(self): unless = self.assertTrue msg = self._msgobj('msg_25.txt') unless(isinstance(msg.get_payload(), str)) - self.assertEqual(len(msg.defects), 2) - unless(isinstance(msg.defects[0], errors.NoBoundaryInMultipartDefect)) - unless(isinstance(msg.defects[1], + self.assertEqual(len(self.get_defects(msg)), 2) + unless(isinstance(self.get_defects(msg)[0], + errors.NoBoundaryInMultipartDefect)) + unless(isinstance(self.get_defects(msg)[1], errors.MultipartInvariantViolationDefect)) def test_invalid_content_type(self): @@ -1856,9 +1862,10 @@ counter to RFC 2822, there's no separating newline here unless = self.assertTrue msg = self._msgobj('msg_41.txt') unless(hasattr(msg, 'defects')) - self.assertEqual(len(msg.defects), 2) - unless(isinstance(msg.defects[0], errors.NoBoundaryInMultipartDefect)) - unless(isinstance(msg.defects[1], + self.assertEqual(len(self.get_defects(msg)), 2) + unless(isinstance(self.get_defects(msg)[0], + errors.NoBoundaryInMultipartDefect)) + unless(isinstance(self.get_defects(msg)[1], errors.MultipartInvariantViolationDefect)) def test_missing_start_boundary(self): @@ -1872,21 +1879,71 @@ counter to RFC 2822, there's no separating newline here # # [*] This message is missing its start boundary bad = outer.get_payload(1).get_payload(0) - self.assertEqual(len(bad.defects), 1) - self.assertTrue(isinstance(bad.defects[0], + self.assertEqual(len(self.get_defects(bad)), 1) + self.assertTrue(isinstance(self.get_defects(bad)[0], errors.StartBoundaryNotFoundDefect)) def test_first_line_is_continuation_header(self): eq = self.assertEqual m = ' Line 1\nLine 2\nLine 3' - msg = email.message_from_string(m) + msg = email.message_from_string(m, policy=self.policy) eq(msg.keys(), []) eq(msg.get_payload(), 'Line 2\nLine 3') - eq(len(msg.defects), 1) - self.assertTrue(isinstance(msg.defects[0], + eq(len(self.get_defects(msg)), 1) + self.assertTrue(isinstance(self.get_defects(msg)[0], errors.FirstHeaderLineIsContinuationDefect)) - eq(msg.defects[0].line, ' Line 1\n') + eq(self.get_defects(msg)[0].line, ' Line 1\n') + + +class TestNonConformant(TestNonConformantBase, TestEmailBase): + + policy=email.policy.default + + def get_defects(self, obj): + return obj.defects + + +class TestNonConformantCapture(TestNonConformantBase, TestEmailBase): + + class CapturePolicy(email.policy.Policy): + captured = None + def register_defect(self, obj, defect): + self.captured.append(defect) + + def setUp(self): + self.policy = self.CapturePolicy(captured=list()) + + def get_defects(self, obj): + return self.policy.captured + +class TestRaisingDefects(TestEmailBase): + + def _msgobj(self, filename): + with openfile(filename) as fp: + return email.message_from_file(fp, policy=email.policy.strict) + + def test_same_boundary_inner_outer(self): + with self.assertRaises(errors.StartBoundaryNotFoundDefect): + self._msgobj('msg_15.txt') + + def test_multipart_no_boundary(self): + with self.assertRaises(errors.NoBoundaryInMultipartDefect): + self._msgobj('msg_25.txt') + + def test_lying_multipart(self): + with self.assertRaises(errors.NoBoundaryInMultipartDefect): + self._msgobj('msg_41.txt') + + + def test_missing_start_boundary(self): + with self.assertRaises(errors.StartBoundaryNotFoundDefect): + self._msgobj('msg_42.txt') + + def test_first_line_is_continuation_header(self): + m = ' Line 1\nLine 2\nLine 3' + with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect): + msg = email.message_from_string(m, policy=email.policy.strict) # Test RFC 2047 header encoding and decoding @@ -2997,6 +3054,25 @@ Here's the message body g.flatten(msg, linesep='\r\n') self.assertEqual(s.getvalue(), text) + def test_crlf_control_via_policy(self): + with openfile('msg_26.txt', newline='\n') as fp: + text = fp.read() + msg = email.message_from_string(text) + s = StringIO() + g = email.generator.Generator(s, policy=email.policy.SMTP) + g.flatten(msg) + self.assertEqual(s.getvalue(), text) + + def test_flatten_linesep_overrides_policy(self): + # msg_27 is lf separated + with openfile('msg_27.txt', newline='\n') as fp: + text = fp.read() + msg = email.message_from_string(text) + s = StringIO() + g = email.generator.Generator(s, policy=email.policy.SMTP) + g.flatten(msg, linesep='\n') + self.assertEqual(s.getvalue(), text) + maxDiff = None def test_multipart_digest_with_extra_mime_headers(self): @@ -3463,6 +3539,44 @@ class Test8BitBytesHandling(unittest.TestCase): g.flatten(msg) self.assertEqual(s.getvalue(), source) + def test_crlf_control_via_policy(self): + # msg_26 is crlf terminated + with openfile('msg_26.txt', 'rb') as fp: + text = fp.read() + msg = email.message_from_bytes(text) + s = BytesIO() + g = email.generator.BytesGenerator(s, policy=email.policy.SMTP) + g.flatten(msg) + self.assertEqual(s.getvalue(), text) + + def test_flatten_linesep_overrides_policy(self): + # msg_27 is lf separated + with openfile('msg_27.txt', 'rb') as fp: + text = fp.read() + msg = email.message_from_bytes(text) + s = BytesIO() + g = email.generator.BytesGenerator(s, policy=email.policy.SMTP) + g.flatten(msg, linesep='\n') + self.assertEqual(s.getvalue(), text) + + def test_must_be_7bit_handles_unknown_8bit(self): + msg = email.message_from_bytes(self.non_latin_bin_msg) + out = BytesIO() + g = email.generator.BytesGenerator(out, + policy=email.policy.default.clone(must_be_7bit=True)) + g.flatten(msg) + self.assertEqual(out.getvalue(), + self.non_latin_bin_msg_as7bit_wrapped.encode('ascii')) + + def test_must_be_7bit_transforms_8bit_cte(self): + msg = email.message_from_bytes(self.latin_bin_msg) + out = BytesIO() + g = email.generator.BytesGenerator(out, + policy=email.policy.default.clone(must_be_7bit=True)) + g.flatten(msg) + self.assertEqual(out.getvalue(), + self.latin_bin_msg_as7bit.encode('ascii')) + maxDiff = None diff --git a/Lib/test/test_email/test_generator.py b/Lib/test/test_email/test_generator.py new file mode 100644 index 0000000..35ca6c5 --- /dev/null +++ b/Lib/test/test_email/test_generator.py @@ -0,0 +1,136 @@ +import io +import textwrap +import unittest +from email import message_from_string, message_from_bytes +from email.generator import Generator, BytesGenerator +from email import policy +from test.test_email import TestEmailBase + +# XXX: move generator tests from test_email into here at some point. + + +class TestGeneratorBase(): + + long_subject = { + 0: textwrap.dedent("""\ + To: whom_it_may_concern@example.com + From: nobody_you_want_to_know@example.com + Subject: We the willing led by the unknowing are doing the + impossible for the ungrateful. We have done so much for so long with so little + we are now qualified to do anything with nothing. + + None + """), + 40: textwrap.dedent("""\ + To: whom_it_may_concern@example.com + From:\x20 + nobody_you_want_to_know@example.com + Subject: We the willing led by the + unknowing are doing the + impossible for the ungrateful. We have + done so much for so long with so little + we are now qualified to do anything + with nothing. + + None + """), + 20: textwrap.dedent("""\ + To:\x20 + whom_it_may_concern@example.com + From:\x20 + nobody_you_want_to_know@example.com + Subject: We the + willing led by the + unknowing are doing + the + impossible for the + ungrateful. We have + done so much for so + long with so little + we are now + qualified to do + anything with + nothing. + + None + """), + } + long_subject[100] = long_subject[0] + + def maxheaderlen_parameter_test(self, n): + msg = self.msgmaker(self.long_subject[0]) + s = self.ioclass() + g = self.genclass(s, maxheaderlen=n) + g.flatten(msg) + self.assertEqual(s.getvalue(), self.long_subject[n]) + + def test_maxheaderlen_parameter_0(self): + self.maxheaderlen_parameter_test(0) + + def test_maxheaderlen_parameter_100(self): + self.maxheaderlen_parameter_test(100) + + def test_maxheaderlen_parameter_40(self): + self.maxheaderlen_parameter_test(40) + + def test_maxheaderlen_parameter_20(self): + self.maxheaderlen_parameter_test(20) + + def maxheaderlen_policy_test(self, n): + msg = self.msgmaker(self.long_subject[0]) + s = self.ioclass() + g = self.genclass(s, policy=policy.default.clone(max_line_length=n)) + g.flatten(msg) + self.assertEqual(s.getvalue(), self.long_subject[n]) + + def test_maxheaderlen_policy_0(self): + self.maxheaderlen_policy_test(0) + + def test_maxheaderlen_policy_100(self): + self.maxheaderlen_policy_test(100) + + def test_maxheaderlen_policy_40(self): + self.maxheaderlen_policy_test(40) + + def test_maxheaderlen_policy_20(self): + self.maxheaderlen_policy_test(20) + + def maxheaderlen_parm_overrides_policy_test(self, n): + msg = self.msgmaker(self.long_subject[0]) + s = self.ioclass() + g = self.genclass(s, maxheaderlen=n, + policy=policy.default.clone(max_line_length=10)) + g.flatten(msg) + self.assertEqual(s.getvalue(), self.long_subject[n]) + + def test_maxheaderlen_parm_overrides_policy_0(self): + self.maxheaderlen_parm_overrides_policy_test(0) + + def test_maxheaderlen_parm_overrides_policy_100(self): + self.maxheaderlen_parm_overrides_policy_test(100) + + def test_maxheaderlen_parm_overrides_policy_40(self): + self.maxheaderlen_parm_overrides_policy_test(40) + + def test_maxheaderlen_parm_overrides_policy_20(self): + self.maxheaderlen_parm_overrides_policy_test(20) + + +class TestGenerator(TestGeneratorBase, TestEmailBase): + + msgmaker = staticmethod(message_from_string) + genclass = Generator + ioclass = io.StringIO + + +class TestBytesGenerator(TestGeneratorBase, TestEmailBase): + + msgmaker = staticmethod(message_from_bytes) + genclass = BytesGenerator + ioclass = io.BytesIO + long_subject = {key: x.encode('ascii') + for key, x in TestGeneratorBase.long_subject.items()} + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_email/test_policy.py b/Lib/test/test_email/test_policy.py new file mode 100644 index 0000000..086ce40 --- /dev/null +++ b/Lib/test/test_email/test_policy.py @@ -0,0 +1,148 @@ +import types +import unittest +import email.policy + +class PolicyAPITests(unittest.TestCase): + + longMessage = True + + # These default values are the ones set on email.policy.default. + # If any of these defaults change, the docs must be updated. + policy_defaults = { + 'max_line_length': 78, + 'linesep': '\n', + 'must_be_7bit': False, + 'raise_on_defect': False, + } + + # For each policy under test, we give here the values of the attributes + # that are different from the defaults for that policy. + policies = { + email.policy.Policy(): {}, + email.policy.default: {}, + email.policy.SMTP: {'linesep': '\r\n'}, + email.policy.HTTP: {'linesep': '\r\n', 'max_line_length': None}, + email.policy.strict: {'raise_on_defect': True}, + } + + def test_defaults(self): + for policy, changed_defaults in self.policies.items(): + expected = self.policy_defaults.copy() + expected.update(changed_defaults) + for attr, value in expected.items(): + self.assertEqual(getattr(policy, attr), value, + ("change {} docs/docstrings if defaults have " + "changed").format(policy)) + + def test_all_attributes_covered(self): + for attr in dir(email.policy.default): + if (attr.startswith('_') or + isinstance(getattr(email.policy.Policy, attr), + types.FunctionType)): + continue + else: + self.assertIn(attr, self.policy_defaults, + "{} is not fully tested".format(attr)) + + def test_policy_is_immutable(self): + for policy in self.policies: + for attr in self.policy_defaults: + with self.assertRaisesRegex(AttributeError, attr+".*read-only"): + setattr(policy, attr, None) + with self.assertRaisesRegex(AttributeError, 'no attribute.*foo'): + policy.foo = None + + def test_set_policy_attrs_when_calledl(self): + testattrdict = { attr: None for attr in self.policy_defaults } + for policyclass in self.policies: + policy = policyclass.clone(**testattrdict) + for attr in self.policy_defaults: + self.assertIsNone(getattr(policy, attr)) + + def test_reject_non_policy_keyword_when_called(self): + for policyclass in self.policies: + with self.assertRaises(TypeError): + policyclass(this_keyword_should_not_be_valid=None) + with self.assertRaises(TypeError): + policyclass(newtline=None) + + def test_policy_addition(self): + expected = self.policy_defaults.copy() + p1 = email.policy.default.clone(max_line_length=100) + p2 = email.policy.default.clone(max_line_length=50) + added = p1 + p2 + expected.update(max_line_length=50) + for attr, value in expected.items(): + self.assertEqual(getattr(added, attr), value) + added = p2 + p1 + expected.update(max_line_length=100) + for attr, value in expected.items(): + self.assertEqual(getattr(added, attr), value) + added = added + email.policy.default + for attr, value in expected.items(): + self.assertEqual(getattr(added, attr), value) + + def test_register_defect(self): + class Dummy: + def __init__(self): + self.defects = [] + obj = Dummy() + defect = object() + policy = email.policy.Policy() + policy.register_defect(obj, defect) + self.assertEqual(obj.defects, [defect]) + defect2 = object() + policy.register_defect(obj, defect2) + self.assertEqual(obj.defects, [defect, defect2]) + + class MyObj: + def __init__(self): + self.defects = [] + + class MyDefect(Exception): + pass + + def test_handle_defect_raises_on_strict(self): + foo = self.MyObj() + defect = self.MyDefect("the telly is broken") + with self.assertRaisesRegex(self.MyDefect, "the telly is broken"): + email.policy.strict.handle_defect(foo, defect) + + def test_handle_defect_registers_defect(self): + foo = self.MyObj() + defect1 = self.MyDefect("one") + email.policy.default.handle_defect(foo, defect1) + self.assertEqual(foo.defects, [defect1]) + defect2 = self.MyDefect("two") + email.policy.default.handle_defect(foo, defect2) + self.assertEqual(foo.defects, [defect1, defect2]) + + class MyPolicy(email.policy.Policy): + defects = [] + def register_defect(self, obj, defect): + self.defects.append(defect) + + def test_overridden_register_defect_still_raises(self): + foo = self.MyObj() + defect = self.MyDefect("the telly is broken") + with self.assertRaisesRegex(self.MyDefect, "the telly is broken"): + self.MyPolicy(raise_on_defect=True).handle_defect(foo, defect) + + def test_overriden_register_defect_works(self): + foo = self.MyObj() + defect1 = self.MyDefect("one") + my_policy = self.MyPolicy() + my_policy.handle_defect(foo, defect1) + self.assertEqual(my_policy.defects, [defect1]) + self.assertEqual(foo.defects, []) + defect2 = self.MyDefect("two") + my_policy.handle_defect(foo, defect2) + self.assertEqual(my_policy.defects, [defect1, defect2]) + self.assertEqual(foo.defects, []) + + # XXX: Need subclassing tests. + # For adding subclassed objects, make sure the usual rules apply (subclass + # wins), but that the order still works (right overrides left). + +if __name__ == '__main__': + unittest.main() |