diff options
Diffstat (limited to 'Lib/test/test_email')
-rw-r--r-- | Lib/test/test_email/__init__.py | 26 | ||||
-rw-r--r-- | Lib/test/test_email/test_contentmanager.py | 796 | ||||
-rw-r--r-- | Lib/test/test_email/test_email.py | 42 | ||||
-rw-r--r-- | Lib/test/test_email/test_headerregistry.py | 14 | ||||
-rw-r--r-- | Lib/test/test_email/test_message.py | 758 | ||||
-rw-r--r-- | Lib/test/test_email/test_policy.py | 1 | ||||
-rw-r--r-- | Lib/test/test_email/torture_test.py | 2 |
7 files changed, 1630 insertions, 9 deletions
diff --git a/Lib/test/test_email/__init__.py b/Lib/test/test_email/__init__.py index f206ace..d8896ee 100644 --- a/Lib/test/test_email/__init__.py +++ b/Lib/test/test_email/__init__.py @@ -2,6 +2,7 @@ import os import sys import unittest import test.support +import collections import email from email.message import Message from email._policybase import compat32 @@ -42,6 +43,8 @@ class TestEmailBase(unittest.TestCase): # here we make minimal changes in the test_email tests compared to their # pre-3.3 state. policy = compat32 + # Likewise, the default message object is Message. + message = Message def __init__(self, *args, **kw): super().__init__(*args, **kw) @@ -54,11 +57,23 @@ class TestEmailBase(unittest.TestCase): with openfile(filename) as fp: return email.message_from_file(fp, policy=self.policy) - def _str_msg(self, string, message=Message, policy=None): + def _str_msg(self, string, message=None, policy=None): if policy is None: policy = self.policy + if message is None: + message = self.message return email.message_from_string(string, message, policy=policy) + def _bytes_msg(self, bytestring, message=None, policy=None): + if policy is None: + policy = self.policy + if message is None: + message = self.message + return email.message_from_bytes(bytestring, message, policy=policy) + + def _make_message(self): + return self.message(policy=self.policy) + def _bytes_repr(self, b): return [repr(x) for x in b.splitlines(keepends=True)] @@ -123,6 +138,7 @@ def parameterize(cls): """ paramdicts = {} + testers = collections.defaultdict(list) for name, attr in cls.__dict__.items(): if name.endswith('_params'): if not hasattr(attr, 'keys'): @@ -134,7 +150,15 @@ def parameterize(cls): d[n] = x attr = d paramdicts[name[:-7] + '_as_'] = attr + if '_as_' in name: + testers[name.split('_as_')[0] + '_as_'].append(name) testfuncs = {} + for name in paramdicts: + if name not in testers: + raise ValueError("No tester found for {}".format(name)) + for name in testers: + if name not in paramdicts: + raise ValueError("No params found for {}".format(name)) for name, attr in cls.__dict__.items(): for paramsname, paramsdict in paramdicts.items(): if name.startswith(paramsname): diff --git a/Lib/test/test_email/test_contentmanager.py b/Lib/test/test_email/test_contentmanager.py new file mode 100644 index 0000000..cdb04e4 --- /dev/null +++ b/Lib/test/test_email/test_contentmanager.py @@ -0,0 +1,796 @@ +import unittest +from test.test_email import TestEmailBase, parameterize +import textwrap +from email import policy +from email.message import EmailMessage +from email.contentmanager import ContentManager, raw_data_manager + + +@parameterize +class TestContentManager(TestEmailBase): + + policy = policy.default + message = EmailMessage + + get_key_params = { + 'full_type': (1, 'text/plain',), + 'maintype_only': (2, 'text',), + 'null_key': (3, '',), + } + + def get_key_as_get_content_key(self, order, key): + def foo_getter(msg, foo=None): + bar = msg['X-Bar-Header'] + return foo, bar + cm = ContentManager() + cm.add_get_handler(key, foo_getter) + m = self._make_message() + m['Content-Type'] = 'text/plain' + m['X-Bar-Header'] = 'foo' + self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo')) + + def get_key_as_get_content_key_order(self, order, key): + def bar_getter(msg): + return msg['X-Bar-Header'] + def foo_getter(msg): + return msg['X-Foo-Header'] + cm = ContentManager() + cm.add_get_handler(key, foo_getter) + for precedence, key in self.get_key_params.values(): + if precedence > order: + cm.add_get_handler(key, bar_getter) + m = self._make_message() + m['Content-Type'] = 'text/plain' + m['X-Bar-Header'] = 'bar' + m['X-Foo-Header'] = 'foo' + self.assertEqual(cm.get_content(m), ('foo')) + + def test_get_content_raises_if_unknown_mimetype_and_no_default(self): + cm = ContentManager() + m = self._make_message() + m['Content-Type'] = 'text/plain' + with self.assertRaisesRegex(KeyError, 'text/plain'): + cm.get_content(m) + + class BaseThing(str): + pass + baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing' + class Thing(BaseThing): + pass + testobject_full_path = __name__ + '.' + 'TestContentManager.Thing' + + set_key_params = { + 'type': (0, Thing,), + 'full_path': (1, testobject_full_path,), + 'qualname': (2, 'TestContentManager.Thing',), + 'name': (3, 'Thing',), + 'base_type': (4, BaseThing,), + 'base_full_path': (5, baseobject_full_path,), + 'base_qualname': (6, 'TestContentManager.BaseThing',), + 'base_name': (7, 'BaseThing',), + 'str_type': (8, str,), + 'str_full_path': (9, 'builtins.str',), + 'str_name': (10, 'str',), # str name and qualname are the same + 'null_key': (11, None,), + } + + def set_key_as_set_content_key(self, order, key): + def foo_setter(msg, obj, foo=None): + msg['X-Foo-Header'] = foo + msg.set_payload(obj) + cm = ContentManager() + cm.add_set_handler(key, foo_setter) + m = self._make_message() + msg_obj = self.Thing() + cm.set_content(m, msg_obj, foo='bar') + self.assertEqual(m['X-Foo-Header'], 'bar') + self.assertEqual(m.get_payload(), msg_obj) + + def set_key_as_set_content_key_order(self, order, key): + def foo_setter(msg, obj): + msg['X-FooBar-Header'] = 'foo' + msg.set_payload(obj) + def bar_setter(msg, obj): + msg['X-FooBar-Header'] = 'bar' + cm = ContentManager() + cm.add_set_handler(key, foo_setter) + for precedence, key in self.get_key_params.values(): + if precedence > order: + cm.add_set_handler(key, bar_setter) + m = self._make_message() + msg_obj = self.Thing() + cm.set_content(m, msg_obj) + self.assertEqual(m['X-FooBar-Header'], 'foo') + self.assertEqual(m.get_payload(), msg_obj) + + def test_set_content_raises_if_unknown_type_and_no_default(self): + cm = ContentManager() + m = self._make_message() + msg_obj = self.Thing() + with self.assertRaisesRegex(KeyError, self.testobject_full_path): + cm.set_content(m, msg_obj) + + def test_set_content_raises_if_called_on_multipart(self): + cm = ContentManager() + m = self._make_message() + m['Content-Type'] = 'multipart/foo' + with self.assertRaises(TypeError): + cm.set_content(m, 'test') + + def test_set_content_calls_clear_content(self): + m = self._make_message() + m['Content-Foo'] = 'bar' + m['Content-Type'] = 'text/html' + m['To'] = 'test' + m.set_payload('abc') + cm = ContentManager() + cm.add_set_handler(str, lambda *args, **kw: None) + m.set_content('xyz', content_manager=cm) + self.assertIsNone(m['Content-Foo']) + self.assertIsNone(m['Content-Type']) + self.assertEqual(m['To'], 'test') + self.assertIsNone(m.get_payload()) + + +@parameterize +class TestRawDataManager(TestEmailBase): + # Note: these tests are dependent on the order in which headers are added + # to the message objects by the code. There's no defined ordering in + # RFC5322/MIME, so this makes the tests more fragile than the standards + # require. However, if the header order changes it is best to understand + # *why*, and make sure it isn't a subtle bug in whatever change was + # applied. + + policy = policy.default.clone(max_line_length=60, + content_manager=raw_data_manager) + message = EmailMessage + + def test_get_text_plain(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain + + Basic text. + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n") + + def test_get_text_html(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/html + + <p>Basic text.</p> + """)) + self.assertEqual(raw_data_manager.get_content(m), + "<p>Basic text.</p>\n") + + def test_get_text_plain_latin1(self): + m = self._bytes_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset=latin1 + + Basìc tëxt. + """).encode('latin1')) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") + + def test_get_text_plain_latin1_quoted_printable(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="latin-1" + Content-Transfer-Encoding: quoted-printable + + Bas=ECc t=EBxt. + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") + + def test_get_text_plain_utf8_base64(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: base64 + + QmFzw6xjIHTDq3h0Lgo= + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n") + + def test_get_text_plain_bad_utf8_quoted_printable(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: quoted-printable + + Bas=c3=acc t=c3=abxt=fd. + """)) + self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt�.\n") + + def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: quoted-printable + + Bas=c3=acc t=c3=abxt=fd. + """)) + self.assertEqual(raw_data_manager.get_content(m, errors='ignore'), + "Basìc tëxt.\n") + + def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain; charset="utf8" + Content-Transfer-Encoding: base64 + + QmFzw6xjIHTDq3h0Lgo\xFF= + """)) + self.assertEqual(raw_data_manager.get_content(m, errors='ignore'), + "Basìc tëxt.\n") + + def test_get_text_invalid_keyword(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: text/plain + + Basic text. + """)) + with self.assertRaises(TypeError): + raw_data_manager.get_content(m, foo='ignore') + + def test_get_non_text(self): + template = textwrap.dedent("""\ + Content-Type: {} + Content-Transfer-Encoding: base64 + + Ym9ndXMgZGF0YQ== + """) + for maintype in 'audio image video application'.split(): + with self.subTest(maintype=maintype): + m = self._str_msg(template.format(maintype+'/foo')) + self.assertEqual(raw_data_manager.get_content(m), b"bogus data") + + def test_get_non_text_invalid_keyword(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: image/jpg + Content-Transfer-Encoding: base64 + + Ym9ndXMgZGF0YQ== + """)) + with self.assertRaises(TypeError): + raw_data_manager.get_content(m, errors='ignore') + + def test_get_raises_on_multipart(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: multipart/mixed; boundary="===" + + --=== + --===-- + """)) + with self.assertRaises(KeyError): + raw_data_manager.get_content(m) + + def test_get_message_rfc822_and_external_body(self): + template = textwrap.dedent("""\ + Content-Type: message/{} + + To: foo@example.com + From: bar@example.com + Subject: example + + an example message + """) + for subtype in 'rfc822 external-body'.split(): + with self.subTest(subtype=subtype): + m = self._str_msg(template.format(subtype)) + sub_msg = raw_data_manager.get_content(m) + self.assertIsInstance(sub_msg, self.message) + self.assertEqual(raw_data_manager.get_content(sub_msg), + "an example message\n") + self.assertEqual(sub_msg['to'], 'foo@example.com') + self.assertEqual(sub_msg['from'].addresses[0].username, 'bar') + + def test_get_message_non_rfc822_or_external_body_yields_bytes(self): + m = self._str_msg(textwrap.dedent("""\ + Content-Type: message/partial + + To: foo@example.com + From: bar@example.com + Subject: example + + The real body is in another message. + """)) + self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex') + + def test_set_text_plain(self): + m = self._make_message() + content = "Simple message.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(str(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 7bit + + Simple message. + """)) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_html(self): + m = self._make_message() + content = "<p>Simple message.</p>\n" + raw_data_manager.set_content(m, content, subtype='html') + self.assertEqual(str(m), textwrap.dedent("""\ + Content-Type: text/html; charset="utf-8" + Content-Transfer-Encoding: 7bit + + <p>Simple message.</p> + """)) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_charset_latin_1(self): + m = self._make_message() + content = "Simple message.\n" + raw_data_manager.set_content(m, content, charset='latin-1') + self.assertEqual(str(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="iso-8859-1" + Content-Transfer-Encoding: 7bit + + Simple message. + """)) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_short_line_minimal_non_ascii_heuristics(self): + m = self._make_message() + content = "et là il est monté sur moi et il commence à m'éto.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + + et là il est monté sur moi et il commence à m'éto. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_long_line_minimal_non_ascii_heuristics(self): + m = self._make_message() + content = ("j'ai un problème de python. il est sorti de son" + " vivarium. et là il est monté sur moi et il commence" + " à m'éto.\n") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: quoted-printable + + j'ai un probl=C3=A8me de python. il est sorti de son vivari= + um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence = + =C3=A0 m'=C3=A9to. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self): + m = self._make_message() + content = '\n'*10 + ( + "j'ai un problème de python. il est sorti de son" + " vivarium. et là il est monté sur moi et il commence" + " à m'éto.\n") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: quoted-printable + """ + '\n'*10 + """ + j'ai un probl=C3=A8me de python. il est sorti de son vivari= + um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence = + =C3=A0 m'=C3=A9to. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_maximal_non_ascii_heuristics(self): + m = self._make_message() + content = "áàäéèęöő.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + + áàäéèęöő. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_11_lines_maximal_non_ascii_heuristics(self): + m = self._make_message() + content = '\n'*10 + "áàäéèęöő.\n" + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + """ + '\n'*10 + """ + áàäéèęöő. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_long_line_maximal_non_ascii_heuristics(self): + m = self._make_message() + content = ("áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: base64 + + w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD + tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo + xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD + qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg + w6TDqcOoxJnDtsWRLgo= + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self): + # Yes, it chooses "wrong" here. It's a heuristic. So this result + # could change if we come up with a better heuristic. + m = self._make_message() + content = ('\n'*10 + + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") + raw_data_manager.set_content(m, "\n"*10 + + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő" + "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n") + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: quoted-printable + """ + '\n'*10 + """ + =C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3= + =A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4= + =C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3= + =A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99= + =C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5= + =91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1= + =C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3= + =A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9= + =C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4= + =99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6= + =C5=91. + """).encode('utf-8')) + self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content) + self.assertEqual(m.get_content(), content) + + def test_set_text_non_ascii_with_cte_7bit_raises(self): + m = self._make_message() + with self.assertRaises(UnicodeError): + raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit') + + def test_set_text_non_ascii_with_charset_ascii_raises(self): + m = self._make_message() + with self.assertRaises(UnicodeError): + raw_data_manager.set_content(m,"áàäéèęöő.\n", charset='ascii') + + def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self): + m = self._make_message() + with self.assertRaises(UnicodeError): + raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit', charset='ascii') + + def test_set_message(self): + m = self._make_message() + m['Subject'] = "Forwarded message" + content = self._make_message() + content['To'] = 'python@vivarium.org' + content['From'] = 'police@monty.org' + content['Subject'] = "get back in your box" + content.set_content("Or face the comfy chair.") + raw_data_manager.set_content(m, content) + self.assertEqual(str(m), textwrap.dedent("""\ + Subject: Forwarded message + Content-Type: message/rfc822 + Content-Transfer-Encoding: 8bit + + To: python@vivarium.org + From: police@monty.org + Subject: get back in your box + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 7bit + MIME-Version: 1.0 + + Or face the comfy chair. + """)) + payload = m.get_payload(0) + self.assertIsInstance(payload, self.message) + self.assertEqual(str(payload), str(content)) + self.assertIsInstance(m.get_content(), self.message) + self.assertEqual(str(m.get_content()), str(content)) + + def test_set_message_with_non_ascii_and_coercion_to_7bit(self): + m = self._make_message() + m['Subject'] = "Escape report" + content = self._make_message() + content['To'] = 'police@monty.org' + content['From'] = 'victim@monty.org' + content['Subject'] = "Help" + content.set_content("j'ai un problème de python. il est sorti de son" + " vivarium.") + raw_data_manager.set_content(m, content) + self.assertEqual(bytes(m), textwrap.dedent("""\ + Subject: Escape report + Content-Type: message/rfc822 + Content-Transfer-Encoding: 8bit + + To: police@monty.org + From: victim@monty.org + Subject: Help + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + MIME-Version: 1.0 + + j'ai un problème de python. il est sorti de son vivarium. + """).encode('utf-8')) + # The choice of base64 for the body encoding is because generator + # doesn't bother with heuristics and uses it unconditionally for utf-8 + # text. + # XXX: the first cte should be 7bit, too...that's a generator bug. + # XXX: the line length in the body also looks like a generator bug. + self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length), + textwrap.dedent("""\ + Subject: Escape report + Content-Type: message/rfc822 + Content-Transfer-Encoding: 8bit + + To: police@monty.org + From: victim@monty.org + Subject: Help + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: base64 + MIME-Version: 1.0 + + aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt + Lgo= + """)) + self.assertIsInstance(m.get_content(), self.message) + self.assertEqual(str(m.get_content()), str(content)) + + def test_set_message_invalid_cte_raises(self): + m = self._make_message() + content = self._make_message() + for cte in 'quoted-printable base64'.split(): + for subtype in 'rfc822 external-body'.split(): + with self.subTest(cte=cte, subtype=subtype): + with self.assertRaises(ValueError) as ar: + m.set_content(content, subtype, cte=cte) + exc = str(ar.exception) + self.assertIn(cte, exc) + self.assertIn(subtype, exc) + subtype = 'external-body' + for cte in '8bit binary'.split(): + with self.subTest(cte=cte, subtype=subtype): + with self.assertRaises(ValueError) as ar: + m.set_content(content, subtype, cte=cte) + exc = str(ar.exception) + self.assertIn(cte, exc) + self.assertIn(subtype, exc) + + def test_set_image_jpg(self): + for content in (b"bogus content", + bytearray(b"bogus content"), + memoryview(b"bogus content")): + with self.subTest(content=content): + m = self._make_message() + raw_data_manager.set_content(m, content, 'image', 'jpeg') + self.assertEqual(str(m), textwrap.dedent("""\ + Content-Type: image/jpeg + Content-Transfer-Encoding: base64 + + Ym9ndXMgY29udGVudA== + """)) + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_audio_aif_with_quoted_printable_cte(self): + # Why you would use qp, I don't know, but it is technically supported. + # XXX: the incorrect line length is because binascii.b2a_qp doesn't + # support a line length parameter, but we must use it to get newline + # encoding. + # XXX: what about that lack of tailing newline? Do we actually handle + # that correctly in all cases? That is, if the *source* has an + # unencoded newline, do we add an extra newline to the returned payload + # or not? And can that actually be disambiguated based on the RFC? + m = self._make_message() + content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100 + m.set_content(content, 'audio', 'aif', cte='quoted-printable') + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: audio/aif + Content-Transfer-Encoding: quoted-printable + MIME-Version: 1.0 + + b=FFgus=09con=0At=0Dent=20zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz= + zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz""").encode('latin-1')) + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_video_mpeg_with_binary_cte(self): + m = self._make_message() + content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100 + m.set_content(content, 'video', 'mpeg', cte='binary') + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: video/mpeg + Content-Transfer-Encoding: binary + MIME-Version: 1.0 + + """).encode('ascii') + + # XXX: the second \n ought to be a \r, but generator gets it wrong. + # THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE. + b'b\xFFgus\tcon\nt\nent zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz' + + b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz') + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_application_octet_stream_with_8bit_cte(self): + # In 8bit mode, univeral line end logic applies. It is up to the + # application to make sure the lines are short enough; we don't check. + m = self._make_message() + content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n' + m.set_content(content, 'application', 'octet-stream', cte='8bit') + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: application/octet-stream + Content-Transfer-Encoding: 8bit + MIME-Version: 1.0 + + """).encode('ascii') + + b'b\xFFgus\tcon\nt\nent\n' + + b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz\n') + self.assertEqual(m.get_payload(decode=True), content) + self.assertEqual(m.get_content(), content) + + def test_set_headers_from_header_objects(self): + m = self._make_message() + content = "Simple message.\n" + header_factory = self.policy.header_factory + raw_data_manager.set_content(m, content, headers=( + header_factory("To", "foo@example.com"), + header_factory("From", "foo@example.com"), + header_factory("Subject", "I'm talking to myself."))) + self.assertEqual(str(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + To: foo@example.com + From: foo@example.com + Subject: I'm talking to myself. + Content-Transfer-Encoding: 7bit + + Simple message. + """)) + + def test_set_headers_from_strings(self): + m = self._make_message() + content = "Simple message.\n" + raw_data_manager.set_content(m, content, headers=( + "X-Foo-Header: foo", + "X-Bar-Header: bar",)) + self.assertEqual(str(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + X-Foo-Header: foo + X-Bar-Header: bar + Content-Transfer-Encoding: 7bit + + Simple message. + """)) + + def test_set_headers_with_invalid_duplicate_string_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + with self.assertRaisesRegex(ValueError, 'Content-Type'): + raw_data_manager.set_content(m, content, headers=( + "Content-Type: foo/bar",) + ) + + def test_set_headers_with_invalid_duplicate_header_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + header_factory = self.policy.header_factory + with self.assertRaisesRegex(ValueError, 'Content-Type'): + raw_data_manager.set_content(m, content, headers=( + header_factory("Content-Type", " foo/bar"),) + ) + + def test_set_headers_with_defective_string_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'): + raw_data_manager.set_content(m, content, headers=( + 'To: a@fairly@@invalid@address',) + ) + print(m['To'].defects) + + def test_set_headers_with_defective_header_header_raises(self): + m = self._make_message() + content = "Simple message.\n" + header_factory = self.policy.header_factory + with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'): + raw_data_manager.set_content(m, content, headers=( + header_factory('To', 'a@fairly@@invalid@address'),) + ) + print(m['To'].defects) + + def test_set_disposition_inline(self): + m = self._make_message() + m.set_content('foo', disposition='inline') + self.assertEqual(m['Content-Disposition'], 'inline') + + def test_set_disposition_attachment(self): + m = self._make_message() + m.set_content('foo', disposition='attachment') + self.assertEqual(m['Content-Disposition'], 'attachment') + + def test_set_disposition_foo(self): + m = self._make_message() + m.set_content('foo', disposition='foo') + self.assertEqual(m['Content-Disposition'], 'foo') + + # XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that + # would cause 'foo' above to raise. + + def test_set_filename(self): + m = self._make_message() + m.set_content('foo', filename='bar.txt') + self.assertEqual(m['Content-Disposition'], + 'attachment; filename="bar.txt"') + + def test_set_filename_and_disposition_inline(self): + m = self._make_message() + m.set_content('foo', disposition='inline', filename='bar.txt') + self.assertEqual(m['Content-Disposition'], 'inline; filename="bar.txt"') + + def test_set_non_ascii_filename(self): + m = self._make_message() + m.set_content('foo', filename='ábárî.txt') + self.assertEqual(bytes(m), textwrap.dedent("""\ + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 7bit + Content-Disposition: attachment; + filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt + MIME-Version: 1.0 + + foo + """).encode('ascii')) + + content_object_params = { + 'text_plain': ('content', ()), + 'text_html': ('content', ('html',)), + 'application_octet_stream': (b'content', + ('application', 'octet_stream')), + 'image_jpeg': (b'content', ('image', 'jpeg')), + 'message_rfc822': (message(), ()), + 'message_external_body': (message(), ('external-body',)), + } + + def content_object_as_header_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, headers=( + 'To: foo@example.com', + 'From: bar@simple.net')) + self.assertEqual(m['to'], 'foo@example.com') + self.assertEqual(m['from'], 'bar@simple.net') + + def content_object_as_disposition_inline_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, disposition='inline') + self.assertEqual(m['Content-Disposition'], 'inline') + + def content_object_as_non_ascii_filename_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, disposition='inline', filename='bár.txt') + self.assertEqual(m['Content-Disposition'], 'inline; filename="bár.txt"') + self.assertEqual(m.get_filename(), "bár.txt") + self.assertEqual(m['Content-Disposition'].params['filename'], "bár.txt") + + def content_object_as_cid_receiver(self, obj, mimetype): + m = self._make_message() + m.set_content(obj, *mimetype, cid='some_random_stuff') + self.assertEqual(m['Content-ID'], 'some_random_stuff') + + def content_object_as_params_receiver(self, obj, mimetype): + m = self._make_message() + params = {'foo': 'bár', 'abc': 'xyz'} + m.set_content(obj, *mimetype, params=params) + if isinstance(obj, str): + params['charset'] = 'utf-8' + self.assertEqual(m['Content-Type'].params, params) + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py index 73ec2a6..d1e234d 100644 --- a/Lib/test/test_email/test_email.py +++ b/Lib/test/test_email/test_email.py @@ -281,15 +281,42 @@ class TestMessageAPI(TestEmailBase): self.assertIn('TO', msg) def test_as_string(self): - eq = self.ndiffAssertEqual msg = self._msgobj('msg_01.txt') with openfile('msg_01.txt') as fp: text = fp.read() - eq(text, str(msg)) + self.assertEqual(text, str(msg)) fullrepr = msg.as_string(unixfrom=True) lines = fullrepr.split('\n') self.assertTrue(lines[0].startswith('From ')) - eq(text, NL.join(lines[1:])) + self.assertEqual(text, NL.join(lines[1:])) + + def test_as_string_policy(self): + msg = self._msgobj('msg_01.txt') + newpolicy = msg.policy.clone(linesep='\r\n') + fullrepr = msg.as_string(policy=newpolicy) + s = StringIO() + g = Generator(s, policy=newpolicy) + g.flatten(msg) + self.assertEqual(fullrepr, s.getvalue()) + + def test_as_bytes(self): + msg = self._msgobj('msg_01.txt') + with openfile('msg_01.txt') as fp: + data = fp.read().encode('ascii') + self.assertEqual(data, bytes(msg)) + fullrepr = msg.as_bytes(unixfrom=True) + lines = fullrepr.split(b'\n') + self.assertTrue(lines[0].startswith(b'From ')) + self.assertEqual(data, b'\n'.join(lines[1:])) + + def test_as_bytes_policy(self): + msg = self._msgobj('msg_01.txt') + newpolicy = msg.policy.clone(linesep='\r\n') + fullrepr = msg.as_bytes(policy=newpolicy) + s = BytesIO() + g = BytesGenerator(s,policy=newpolicy) + g.flatten(msg) + self.assertEqual(fullrepr, s.getvalue()) # test_headerregistry.TestContentTypeHeader.bad_params def test_bad_param(self): @@ -742,8 +769,15 @@ class TestEncoders(unittest.TestCase): # whose output character set is 7bit gets a transfer-encoding # of 7bit. eq = self.assertEqual - msg = MIMEText('文', _charset='euc-jp') + msg = MIMEText('文\n', _charset='euc-jp') eq(msg['content-transfer-encoding'], '7bit') + eq(msg.as_string(), textwrap.dedent("""\ + MIME-Version: 1.0 + Content-Type: text/plain; charset="iso-2022-jp" + Content-Transfer-Encoding: 7bit + + \x1b$BJ8\x1b(B + """)) def test_qp_encode_latin1(self): msg = MIMEText('\xe1\xf6\n', 'text', 'ISO-8859-1') diff --git a/Lib/test/test_email/test_headerregistry.py b/Lib/test/test_email/test_headerregistry.py index f829f83..f2df372 100644 --- a/Lib/test/test_email/test_headerregistry.py +++ b/Lib/test/test_email/test_headerregistry.py @@ -661,7 +661,7 @@ class TestContentTypeHeader(TestHeaderBase): 'text/plain; name="ascii_is_the_default"'), 'rfc2231_bad_character_in_charset_parameter_value': ( - "text/plain; charset*=ascii''utf-8%E2%80%9D", + "text/plain; charset*=ascii''utf-8%F1%F2%F3", 'text/plain', 'text', 'plain', @@ -669,6 +669,18 @@ class TestContentTypeHeader(TestHeaderBase): [errors.UndecodableBytesDefect], 'text/plain; charset="utf-8\uFFFD\uFFFD\uFFFD"'), + 'rfc2231_utf_8_in_supposedly_ascii_charset_parameter_value': ( + "text/plain; charset*=ascii''utf-8%E2%80%9D", + 'text/plain', + 'text', + 'plain', + {'charset': 'utf-8”'}, + [errors.UndecodableBytesDefect], + 'text/plain; charset="utf-8”"', + ), + # XXX: if the above were *re*folded, it would get tagged as utf-8 + # instead of ascii in the param, since it now contains non-ASCII. + 'rfc2231_encoded_then_unencoded_segments': ( ('application/x-foo;' '\tname*0*="us-ascii\'en-us\'My";' diff --git a/Lib/test/test_email/test_message.py b/Lib/test/test_email/test_message.py index 8cc3f80..c761c62 100644 --- a/Lib/test/test_email/test_message.py +++ b/Lib/test/test_email/test_message.py @@ -1,6 +1,13 @@ import unittest -from email import policy -from test.test_email import TestEmailBase +import textwrap +from email import policy, message_from_string +from email.message import EmailMessage, MIMEPart +from test.test_email import TestEmailBase, parameterize + + +# Helper. +def first(iterable): + return next(filter(lambda x: x is not None, iterable), None) class Test(TestEmailBase): @@ -13,6 +20,753 @@ class Test(TestEmailBase): with self.assertRaises(ValueError): m['To'] = 'xyz@abc' + def test_rfc2043_auto_decoded_and_emailmessage_used(self): + m = message_from_string(textwrap.dedent("""\ + Subject: Ayons asperges pour le =?utf-8?q?d=C3=A9jeuner?= + From: =?utf-8?q?Pep=C3=A9?= Le Pew <pepe@example.com> + To: "Penelope Pussycat" <"penelope@example.com"> + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + + sample text + """), policy=policy.default) + self.assertEqual(m['subject'], "Ayons asperges pour le déjeuner") + self.assertEqual(m['from'], "Pepé Le Pew <pepe@example.com>") + self.assertIsInstance(m, EmailMessage) + + +@parameterize +class TestEmailMessageBase: + + policy = policy.default + + # The first argument is a triple (related, html, plain) of indices into the + # list returned by 'walk' called on a Message constructed from the third. + # The indices indicate which part should match the corresponding part-type + # when passed to get_body (ie: the "first" part of that type in the + # message). The second argument is a list of indices into the 'walk' list + # of the attachments that should be returned by a call to + # 'iter_attachments'. The third argument is a list of indices into 'walk' + # that should be returned by a call to 'iter_parts'. Note that the first + # item returned by 'walk' is the Message itself. + + message_params = { + + 'empty_message': ( + (None, None, 0), + (), + (), + ""), + + 'non_mime_plain': ( + (None, None, 0), + (), + (), + textwrap.dedent("""\ + To: foo@example.com + + simple text body + """)), + + 'mime_non_text': ( + (None, None, None), + (), + (), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: image/jpg + + bogus body. + """)), + + 'plain_html_alternative': ( + (None, 2, 1), + (), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/alternative; boundary="===" + + preamble + + --=== + Content-Type: text/plain + + simple body + + --=== + Content-Type: text/html + + <p>simple body</p> + --===-- + """)), + + 'plain_html_mixed': ( + (None, 2, 1), + (), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + preamble + + --=== + Content-Type: text/plain + + simple body + + --=== + Content-Type: text/html + + <p>simple body</p> + + --===-- + """)), + + 'plain_html_attachment_mixed': ( + (None, None, 1), + (2,), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: text/plain + + simple body + + --=== + Content-Type: text/html + Content-Disposition: attachment + + <p>simple body</p> + + --===-- + """)), + + 'html_text_attachment_mixed': ( + (None, 2, None), + (1,), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: text/plain + Content-Disposition: AtTaChment + + simple body + + --=== + Content-Type: text/html + + <p>simple body</p> + + --===-- + """)), + + 'html_text_attachment_inline_mixed': ( + (None, 2, 1), + (), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: text/plain + Content-Disposition: InLine + + simple body + + --=== + Content-Type: text/html + Content-Disposition: inline + + <p>simple body</p> + + --===-- + """)), + + # RFC 2387 + 'related': ( + (0, 1, None), + (2,), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/related; boundary="==="; type=text/html + + --=== + Content-Type: text/html + + <p>simple body</p> + + --=== + Content-Type: image/jpg + Content-ID: <image1> + + bogus data + + --===-- + """)), + + # This message structure will probably never be seen in the wild, but + # it proves we distinguish between text parts based on 'start'. The + # content would not, of course, actually work :) + 'related_with_start': ( + (0, 2, None), + (1,), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/related; boundary="==="; type=text/html; + start="<body>" + + --=== + Content-Type: text/html + Content-ID: <include> + + useless text + + --=== + Content-Type: text/html + Content-ID: <body> + + <p>simple body</p> + <!--#include file="<include>"--> + + --===-- + """)), + + + 'mixed_alternative_plain_related': ( + (3, 4, 2), + (6, 7), + (1, 6, 7), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: multipart/alternative; boundary="+++" + + --+++ + Content-Type: text/plain + + simple body + + --+++ + Content-Type: multipart/related; boundary="___" + + --___ + Content-Type: text/html + + <p>simple body</p> + + --___ + Content-Type: image/jpg + Content-ID: <image1@cid> + + bogus jpg body + + --___-- + + --+++-- + + --=== + Content-Type: image/jpg + Content-Disposition: attachment + + bogus jpg body + + --=== + Content-Type: image/jpg + Content-Disposition: AttacHmenT + + another bogus jpg body + + --===-- + """)), + + # This structure suggested by Stephen J. Turnbull...may not exist/be + # supported in the wild, but we want to support it. + 'mixed_related_alternative_plain_html': ( + (1, 4, 3), + (6, 7), + (1, 6, 7), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: multipart/related; boundary="+++" + + --+++ + Content-Type: multipart/alternative; boundary="___" + + --___ + Content-Type: text/plain + + simple body + + --___ + Content-Type: text/html + + <p>simple body</p> + + --___-- + + --+++ + Content-Type: image/jpg + Content-ID: <image1@cid> + + bogus jpg body + + --+++-- + + --=== + Content-Type: image/jpg + Content-Disposition: attachment + + bogus jpg body + + --=== + Content-Type: image/jpg + Content-Disposition: attachment + + another bogus jpg body + + --===-- + """)), + + # Same thing, but proving we only look at the root part, which is the + # first one if there isn't any start parameter. That is, this is a + # broken related. + 'mixed_related_alternative_plain_html_wrong_order': ( + (1, None, None), + (6, 7), + (1, 6, 7), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: multipart/related; boundary="+++" + + --+++ + Content-Type: image/jpg + Content-ID: <image1@cid> + + bogus jpg body + + --+++ + Content-Type: multipart/alternative; boundary="___" + + --___ + Content-Type: text/plain + + simple body + + --___ + Content-Type: text/html + + <p>simple body</p> + + --___-- + + --+++-- + + --=== + Content-Type: image/jpg + Content-Disposition: attachment + + bogus jpg body + + --=== + Content-Type: image/jpg + Content-Disposition: attachment + + another bogus jpg body + + --===-- + """)), + + 'message_rfc822': ( + (None, None, None), + (), + (), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: message/rfc822 + + To: bar@example.com + From: robot@examp.com + + this is a message body. + """)), + + 'mixed_text_message_rfc822': ( + (None, None, 1), + (2,), + (1, 2), + textwrap.dedent("""\ + To: foo@example.com + MIME-Version: 1.0 + Content-Type: multipart/mixed; boundary="===" + + --=== + Content-Type: text/plain + + Your message has bounced, ser. + + --=== + Content-Type: message/rfc822 + + To: bar@example.com + From: robot@examp.com + + this is a message body. + + --===-- + """)), + + } + + def message_as_get_body(self, body_parts, attachments, parts, msg): + m = self._str_msg(msg) + allparts = list(m.walk()) + expected = [None if n is None else allparts[n] for n in body_parts] + related = 0; html = 1; plain = 2 + self.assertEqual(m.get_body(), first(expected)) + self.assertEqual(m.get_body(preferencelist=( + 'related', 'html', 'plain')), + first(expected)) + self.assertEqual(m.get_body(preferencelist=('related', 'html')), + first(expected[related:html+1])) + self.assertEqual(m.get_body(preferencelist=('related', 'plain')), + first([expected[related], expected[plain]])) + self.assertEqual(m.get_body(preferencelist=('html', 'plain')), + first(expected[html:plain+1])) + self.assertEqual(m.get_body(preferencelist=['related']), + expected[related]) + self.assertEqual(m.get_body(preferencelist=['html']), expected[html]) + self.assertEqual(m.get_body(preferencelist=['plain']), expected[plain]) + self.assertEqual(m.get_body(preferencelist=('plain', 'html')), + first(expected[plain:html-1:-1])) + self.assertEqual(m.get_body(preferencelist=('plain', 'related')), + first([expected[plain], expected[related]])) + self.assertEqual(m.get_body(preferencelist=('html', 'related')), + first(expected[html::-1])) + self.assertEqual(m.get_body(preferencelist=('plain', 'html', 'related')), + first(expected[::-1])) + self.assertEqual(m.get_body(preferencelist=('html', 'plain', 'related')), + first([expected[html], + expected[plain], + expected[related]])) + + def message_as_iter_attachment(self, body_parts, attachments, parts, msg): + m = self._str_msg(msg) + allparts = list(m.walk()) + attachments = [allparts[n] for n in attachments] + self.assertEqual(list(m.iter_attachments()), attachments) + + def message_as_iter_parts(self, body_parts, attachments, parts, msg): + m = self._str_msg(msg) + allparts = list(m.walk()) + parts = [allparts[n] for n in parts] + self.assertEqual(list(m.iter_parts()), parts) + + class _TestContentManager: + def get_content(self, msg, *args, **kw): + return msg, args, kw + def set_content(self, msg, *args, **kw): + self.msg = msg + self.args = args + self.kw = kw + + def test_get_content_with_cm(self): + m = self._str_msg('') + cm = self._TestContentManager() + self.assertEqual(m.get_content(content_manager=cm), (m, (), {})) + msg, args, kw = m.get_content('foo', content_manager=cm, bar=1, k=2) + self.assertEqual(msg, m) + self.assertEqual(args, ('foo',)) + self.assertEqual(kw, dict(bar=1, k=2)) + + def test_get_content_default_cm_comes_from_policy(self): + p = policy.default.clone(content_manager=self._TestContentManager()) + m = self._str_msg('', policy=p) + self.assertEqual(m.get_content(), (m, (), {})) + msg, args, kw = m.get_content('foo', bar=1, k=2) + self.assertEqual(msg, m) + self.assertEqual(args, ('foo',)) + self.assertEqual(kw, dict(bar=1, k=2)) + + def test_set_content_with_cm(self): + m = self._str_msg('') + cm = self._TestContentManager() + m.set_content(content_manager=cm) + self.assertEqual(cm.msg, m) + self.assertEqual(cm.args, ()) + self.assertEqual(cm.kw, {}) + m.set_content('foo', content_manager=cm, bar=1, k=2) + self.assertEqual(cm.msg, m) + self.assertEqual(cm.args, ('foo',)) + self.assertEqual(cm.kw, dict(bar=1, k=2)) + + def test_set_content_default_cm_comes_from_policy(self): + cm = self._TestContentManager() + p = policy.default.clone(content_manager=cm) + m = self._str_msg('', policy=p) + m.set_content() + self.assertEqual(cm.msg, m) + self.assertEqual(cm.args, ()) + self.assertEqual(cm.kw, {}) + m.set_content('foo', bar=1, k=2) + self.assertEqual(cm.msg, m) + self.assertEqual(cm.args, ('foo',)) + self.assertEqual(cm.kw, dict(bar=1, k=2)) + + # outcome is whether xxx_method should raise ValueError error when called + # on multipart/subtype. Blank outcome means it depends on xxx (add + # succeeds, make raises). Note: 'none' means there are content-type + # headers but payload is None...this happening in practice would be very + # unusual, so treating it as if there were content seems reasonable. + # method subtype outcome + subtype_params = ( + ('related', 'no_content', 'succeeds'), + ('related', 'none', 'succeeds'), + ('related', 'plain', 'succeeds'), + ('related', 'related', ''), + ('related', 'alternative', 'raises'), + ('related', 'mixed', 'raises'), + ('alternative', 'no_content', 'succeeds'), + ('alternative', 'none', 'succeeds'), + ('alternative', 'plain', 'succeeds'), + ('alternative', 'related', 'succeeds'), + ('alternative', 'alternative', ''), + ('alternative', 'mixed', 'raises'), + ('mixed', 'no_content', 'succeeds'), + ('mixed', 'none', 'succeeds'), + ('mixed', 'plain', 'succeeds'), + ('mixed', 'related', 'succeeds'), + ('mixed', 'alternative', 'succeeds'), + ('mixed', 'mixed', ''), + ) + + def _make_subtype_test_message(self, subtype): + m = self.message() + payload = None + msg_headers = [ + ('To', 'foo@bar.com'), + ('From', 'bar@foo.com'), + ] + if subtype != 'no_content': + ('content-shadow', 'Logrus'), + msg_headers.append(('X-Random-Header', 'Corwin')) + if subtype == 'text': + payload = '' + msg_headers.append(('Content-Type', 'text/plain')) + m.set_payload('') + elif subtype != 'no_content': + payload = [] + msg_headers.append(('Content-Type', 'multipart/' + subtype)) + msg_headers.append(('X-Trump', 'Random')) + m.set_payload(payload) + for name, value in msg_headers: + m[name] = value + return m, msg_headers, payload + + def _check_disallowed_subtype_raises(self, m, method_name, subtype, method): + with self.assertRaises(ValueError) as ar: + getattr(m, method)() + exc_text = str(ar.exception) + self.assertIn(subtype, exc_text) + self.assertIn(method_name, exc_text) + + def _check_make_multipart(self, m, msg_headers, payload): + count = 0 + for name, value in msg_headers: + if not name.lower().startswith('content-'): + self.assertEqual(m[name], value) + count += 1 + self.assertEqual(len(m), count+1) # +1 for new Content-Type + part = next(m.iter_parts()) + count = 0 + for name, value in msg_headers: + if name.lower().startswith('content-'): + self.assertEqual(part[name], value) + count += 1 + self.assertEqual(len(part), count) + self.assertEqual(part.get_payload(), payload) + + def subtype_as_make(self, method, subtype, outcome): + m, msg_headers, payload = self._make_subtype_test_message(subtype) + make_method = 'make_' + method + if outcome in ('', 'raises'): + self._check_disallowed_subtype_raises(m, method, subtype, make_method) + return + getattr(m, make_method)() + self.assertEqual(m.get_content_maintype(), 'multipart') + self.assertEqual(m.get_content_subtype(), method) + if subtype == 'no_content': + self.assertEqual(len(m.get_payload()), 0) + self.assertEqual(m.items(), + msg_headers + [('Content-Type', + 'multipart/'+method)]) + else: + self.assertEqual(len(m.get_payload()), 1) + self._check_make_multipart(m, msg_headers, payload) + + def subtype_as_make_with_boundary(self, method, subtype, outcome): + # Doing all variation is a bit of overkill... + m = self.message() + if outcome in ('', 'raises'): + m['Content-Type'] = 'multipart/' + subtype + with self.assertRaises(ValueError) as cm: + getattr(m, 'make_' + method)() + return + if subtype == 'plain': + m['Content-Type'] = 'text/plain' + elif subtype != 'no_content': + m['Content-Type'] = 'multipart/' + subtype + getattr(m, 'make_' + method)(boundary="abc") + self.assertTrue(m.is_multipart()) + self.assertEqual(m.get_boundary(), 'abc') + + def test_policy_on_part_made_by_make_comes_from_message(self): + for method in ('make_related', 'make_alternative', 'make_mixed'): + m = self.message(policy=self.policy.clone(content_manager='foo')) + m['Content-Type'] = 'text/plain' + getattr(m, method)() + self.assertEqual(m.get_payload(0).policy.content_manager, 'foo') + + class _TestSetContentManager: + def set_content(self, msg, content, *args, **kw): + msg['Content-Type'] = 'text/plain' + msg.set_payload(content) + + def subtype_as_add(self, method, subtype, outcome): + m, msg_headers, payload = self._make_subtype_test_message(subtype) + cm = self._TestSetContentManager() + add_method = 'add_attachment' if method=='mixed' else 'add_' + method + if outcome == 'raises': + self._check_disallowed_subtype_raises(m, method, subtype, add_method) + return + getattr(m, add_method)('test', content_manager=cm) + self.assertEqual(m.get_content_maintype(), 'multipart') + self.assertEqual(m.get_content_subtype(), method) + if method == subtype or subtype == 'no_content': + self.assertEqual(len(m.get_payload()), 1) + for name, value in msg_headers: + self.assertEqual(m[name], value) + part = m.get_payload()[0] + else: + self.assertEqual(len(m.get_payload()), 2) + self._check_make_multipart(m, msg_headers, payload) + part = m.get_payload()[1] + self.assertEqual(part.get_content_type(), 'text/plain') + self.assertEqual(part.get_payload(), 'test') + if method=='mixed': + self.assertEqual(part['Content-Disposition'], 'attachment') + elif method=='related': + self.assertEqual(part['Content-Disposition'], 'inline') + else: + # Otherwise we don't guess. + self.assertIsNone(part['Content-Disposition']) + + class _TestSetRaisingContentManager: + def set_content(self, msg, content, *args, **kw): + raise Exception('test') + + def test_default_content_manager_for_add_comes_from_policy(self): + cm = self._TestSetRaisingContentManager() + m = self.message(policy=self.policy.clone(content_manager=cm)) + for method in ('add_related', 'add_alternative', 'add_attachment'): + with self.assertRaises(Exception) as ar: + getattr(m, method)('') + self.assertEqual(str(ar.exception), 'test') + + def message_as_clear(self, body_parts, attachments, parts, msg): + m = self._str_msg(msg) + m.clear() + self.assertEqual(len(m), 0) + self.assertEqual(list(m.items()), []) + self.assertIsNone(m.get_payload()) + self.assertEqual(list(m.iter_parts()), []) + + def message_as_clear_content(self, body_parts, attachments, parts, msg): + m = self._str_msg(msg) + expected_headers = [h for h in m.keys() + if not h.lower().startswith('content-')] + m.clear_content() + self.assertEqual(list(m.keys()), expected_headers) + self.assertIsNone(m.get_payload()) + self.assertEqual(list(m.iter_parts()), []) + + def test_is_attachment(self): + m = self._make_message() + self.assertFalse(m.is_attachment) + m['Content-Disposition'] = 'inline' + self.assertFalse(m.is_attachment) + m.replace_header('Content-Disposition', 'attachment') + self.assertTrue(m.is_attachment) + m.replace_header('Content-Disposition', 'AtTachMent') + self.assertTrue(m.is_attachment) + + + +class TestEmailMessage(TestEmailMessageBase, TestEmailBase): + message = EmailMessage + + def test_set_content_adds_MIME_Version(self): + m = self._str_msg('') + cm = self._TestContentManager() + self.assertNotIn('MIME-Version', m) + m.set_content(content_manager=cm) + self.assertEqual(m['MIME-Version'], '1.0') + + class _MIME_Version_adding_CM: + def set_content(self, msg, *args, **kw): + msg['MIME-Version'] = '1.0' + + def test_set_content_does_not_duplicate_MIME_Version(self): + m = self._str_msg('') + cm = self._MIME_Version_adding_CM() + self.assertNotIn('MIME-Version', m) + m.set_content(content_manager=cm) + self.assertEqual(m['MIME-Version'], '1.0') + + +class TestMIMEPart(TestEmailMessageBase, TestEmailBase): + # Doing the full test run here may seem a bit redundant, since the two + # classes are almost identical. But what if they drift apart? So we do + # the full tests so that any future drift doesn't introduce bugs. + message = MIMEPart + + def test_set_content_does_not_add_MIME_Version(self): + m = self._str_msg('') + cm = self._TestContentManager() + self.assertNotIn('MIME-Version', m) + m.set_content(content_manager=cm) + self.assertNotIn('MIME-Version', m) + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_email/test_policy.py b/Lib/test/test_email/test_policy.py index 983bd49..06ad5f2 100644 --- a/Lib/test/test_email/test_policy.py +++ b/Lib/test/test_email/test_policy.py @@ -30,6 +30,7 @@ class PolicyAPITests(unittest.TestCase): 'raise_on_defect': False, 'header_factory': email.policy.EmailPolicy.header_factory, 'refold_source': 'long', + 'content_manager': email.policy.EmailPolicy.content_manager, }) # For each policy under test, we give here what we expect the defaults to diff --git a/Lib/test/test_email/torture_test.py b/Lib/test/test_email/torture_test.py index 544b1bb..19cf64f 100644 --- a/Lib/test/test_email/torture_test.py +++ b/Lib/test/test_email/torture_test.py @@ -27,7 +27,7 @@ def openfile(filename): # Prevent this test from running in the Python distro try: openfile('crispin-torture.txt') -except IOError: +except OSError: raise TestSkipped |