summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_email
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_email')
-rw-r--r--Lib/test/test_email/__init__.py26
-rw-r--r--Lib/test/test_email/test_contentmanager.py796
-rw-r--r--Lib/test/test_email/test_email.py42
-rw-r--r--Lib/test/test_email/test_headerregistry.py14
-rw-r--r--Lib/test/test_email/test_message.py758
-rw-r--r--Lib/test/test_email/test_policy.py1
-rw-r--r--Lib/test/test_email/torture_test.py2
7 files changed, 1630 insertions, 9 deletions
diff --git a/Lib/test/test_email/__init__.py b/Lib/test/test_email/__init__.py
index f206ace..d8896ee 100644
--- a/Lib/test/test_email/__init__.py
+++ b/Lib/test/test_email/__init__.py
@@ -2,6 +2,7 @@ import os
import sys
import unittest
import test.support
+import collections
import email
from email.message import Message
from email._policybase import compat32
@@ -42,6 +43,8 @@ class TestEmailBase(unittest.TestCase):
# here we make minimal changes in the test_email tests compared to their
# pre-3.3 state.
policy = compat32
+ # Likewise, the default message object is Message.
+ message = Message
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
@@ -54,11 +57,23 @@ class TestEmailBase(unittest.TestCase):
with openfile(filename) as fp:
return email.message_from_file(fp, policy=self.policy)
- def _str_msg(self, string, message=Message, policy=None):
+ def _str_msg(self, string, message=None, policy=None):
if policy is None:
policy = self.policy
+ if message is None:
+ message = self.message
return email.message_from_string(string, message, policy=policy)
+ def _bytes_msg(self, bytestring, message=None, policy=None):
+ if policy is None:
+ policy = self.policy
+ if message is None:
+ message = self.message
+ return email.message_from_bytes(bytestring, message, policy=policy)
+
+ def _make_message(self):
+ return self.message(policy=self.policy)
+
def _bytes_repr(self, b):
return [repr(x) for x in b.splitlines(keepends=True)]
@@ -123,6 +138,7 @@ def parameterize(cls):
"""
paramdicts = {}
+ testers = collections.defaultdict(list)
for name, attr in cls.__dict__.items():
if name.endswith('_params'):
if not hasattr(attr, 'keys'):
@@ -134,7 +150,15 @@ def parameterize(cls):
d[n] = x
attr = d
paramdicts[name[:-7] + '_as_'] = attr
+ if '_as_' in name:
+ testers[name.split('_as_')[0] + '_as_'].append(name)
testfuncs = {}
+ for name in paramdicts:
+ if name not in testers:
+ raise ValueError("No tester found for {}".format(name))
+ for name in testers:
+ if name not in paramdicts:
+ raise ValueError("No params found for {}".format(name))
for name, attr in cls.__dict__.items():
for paramsname, paramsdict in paramdicts.items():
if name.startswith(paramsname):
diff --git a/Lib/test/test_email/test_contentmanager.py b/Lib/test/test_email/test_contentmanager.py
new file mode 100644
index 0000000..cdb04e4
--- /dev/null
+++ b/Lib/test/test_email/test_contentmanager.py
@@ -0,0 +1,796 @@
+import unittest
+from test.test_email import TestEmailBase, parameterize
+import textwrap
+from email import policy
+from email.message import EmailMessage
+from email.contentmanager import ContentManager, raw_data_manager
+
+
+@parameterize
+class TestContentManager(TestEmailBase):
+
+ policy = policy.default
+ message = EmailMessage
+
+ get_key_params = {
+ 'full_type': (1, 'text/plain',),
+ 'maintype_only': (2, 'text',),
+ 'null_key': (3, '',),
+ }
+
+ def get_key_as_get_content_key(self, order, key):
+ def foo_getter(msg, foo=None):
+ bar = msg['X-Bar-Header']
+ return foo, bar
+ cm = ContentManager()
+ cm.add_get_handler(key, foo_getter)
+ m = self._make_message()
+ m['Content-Type'] = 'text/plain'
+ m['X-Bar-Header'] = 'foo'
+ self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo'))
+
+ def get_key_as_get_content_key_order(self, order, key):
+ def bar_getter(msg):
+ return msg['X-Bar-Header']
+ def foo_getter(msg):
+ return msg['X-Foo-Header']
+ cm = ContentManager()
+ cm.add_get_handler(key, foo_getter)
+ for precedence, key in self.get_key_params.values():
+ if precedence > order:
+ cm.add_get_handler(key, bar_getter)
+ m = self._make_message()
+ m['Content-Type'] = 'text/plain'
+ m['X-Bar-Header'] = 'bar'
+ m['X-Foo-Header'] = 'foo'
+ self.assertEqual(cm.get_content(m), ('foo'))
+
+ def test_get_content_raises_if_unknown_mimetype_and_no_default(self):
+ cm = ContentManager()
+ m = self._make_message()
+ m['Content-Type'] = 'text/plain'
+ with self.assertRaisesRegex(KeyError, 'text/plain'):
+ cm.get_content(m)
+
+ class BaseThing(str):
+ pass
+ baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing'
+ class Thing(BaseThing):
+ pass
+ testobject_full_path = __name__ + '.' + 'TestContentManager.Thing'
+
+ set_key_params = {
+ 'type': (0, Thing,),
+ 'full_path': (1, testobject_full_path,),
+ 'qualname': (2, 'TestContentManager.Thing',),
+ 'name': (3, 'Thing',),
+ 'base_type': (4, BaseThing,),
+ 'base_full_path': (5, baseobject_full_path,),
+ 'base_qualname': (6, 'TestContentManager.BaseThing',),
+ 'base_name': (7, 'BaseThing',),
+ 'str_type': (8, str,),
+ 'str_full_path': (9, 'builtins.str',),
+ 'str_name': (10, 'str',), # str name and qualname are the same
+ 'null_key': (11, None,),
+ }
+
+ def set_key_as_set_content_key(self, order, key):
+ def foo_setter(msg, obj, foo=None):
+ msg['X-Foo-Header'] = foo
+ msg.set_payload(obj)
+ cm = ContentManager()
+ cm.add_set_handler(key, foo_setter)
+ m = self._make_message()
+ msg_obj = self.Thing()
+ cm.set_content(m, msg_obj, foo='bar')
+ self.assertEqual(m['X-Foo-Header'], 'bar')
+ self.assertEqual(m.get_payload(), msg_obj)
+
+ def set_key_as_set_content_key_order(self, order, key):
+ def foo_setter(msg, obj):
+ msg['X-FooBar-Header'] = 'foo'
+ msg.set_payload(obj)
+ def bar_setter(msg, obj):
+ msg['X-FooBar-Header'] = 'bar'
+ cm = ContentManager()
+ cm.add_set_handler(key, foo_setter)
+ for precedence, key in self.get_key_params.values():
+ if precedence > order:
+ cm.add_set_handler(key, bar_setter)
+ m = self._make_message()
+ msg_obj = self.Thing()
+ cm.set_content(m, msg_obj)
+ self.assertEqual(m['X-FooBar-Header'], 'foo')
+ self.assertEqual(m.get_payload(), msg_obj)
+
+ def test_set_content_raises_if_unknown_type_and_no_default(self):
+ cm = ContentManager()
+ m = self._make_message()
+ msg_obj = self.Thing()
+ with self.assertRaisesRegex(KeyError, self.testobject_full_path):
+ cm.set_content(m, msg_obj)
+
+ def test_set_content_raises_if_called_on_multipart(self):
+ cm = ContentManager()
+ m = self._make_message()
+ m['Content-Type'] = 'multipart/foo'
+ with self.assertRaises(TypeError):
+ cm.set_content(m, 'test')
+
+ def test_set_content_calls_clear_content(self):
+ m = self._make_message()
+ m['Content-Foo'] = 'bar'
+ m['Content-Type'] = 'text/html'
+ m['To'] = 'test'
+ m.set_payload('abc')
+ cm = ContentManager()
+ cm.add_set_handler(str, lambda *args, **kw: None)
+ m.set_content('xyz', content_manager=cm)
+ self.assertIsNone(m['Content-Foo'])
+ self.assertIsNone(m['Content-Type'])
+ self.assertEqual(m['To'], 'test')
+ self.assertIsNone(m.get_payload())
+
+
+@parameterize
+class TestRawDataManager(TestEmailBase):
+ # Note: these tests are dependent on the order in which headers are added
+ # to the message objects by the code. There's no defined ordering in
+ # RFC5322/MIME, so this makes the tests more fragile than the standards
+ # require. However, if the header order changes it is best to understand
+ # *why*, and make sure it isn't a subtle bug in whatever change was
+ # applied.
+
+ policy = policy.default.clone(max_line_length=60,
+ content_manager=raw_data_manager)
+ message = EmailMessage
+
+ def test_get_text_plain(self):
+ m = self._str_msg(textwrap.dedent("""\
+ Content-Type: text/plain
+
+ Basic text.
+ """))
+ self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n")
+
+ def test_get_text_html(self):
+ m = self._str_msg(textwrap.dedent("""\
+ Content-Type: text/html
+
+ <p>Basic text.</p>
+ """))
+ self.assertEqual(raw_data_manager.get_content(m),
+ "<p>Basic text.</p>\n")
+
+ def test_get_text_plain_latin1(self):
+ m = self._bytes_msg(textwrap.dedent("""\
+ Content-Type: text/plain; charset=latin1
+
+ Basìc tëxt.
+ """).encode('latin1'))
+ self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
+
+ def test_get_text_plain_latin1_quoted_printable(self):
+ m = self._str_msg(textwrap.dedent("""\
+ Content-Type: text/plain; charset="latin-1"
+ Content-Transfer-Encoding: quoted-printable
+
+ Bas=ECc t=EBxt.
+ """))
+ self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
+
+ def test_get_text_plain_utf8_base64(self):
+ m = self._str_msg(textwrap.dedent("""\
+ Content-Type: text/plain; charset="utf8"
+ Content-Transfer-Encoding: base64
+
+ QmFzw6xjIHTDq3h0Lgo=
+ """))
+ self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
+
+ def test_get_text_plain_bad_utf8_quoted_printable(self):
+ m = self._str_msg(textwrap.dedent("""\
+ Content-Type: text/plain; charset="utf8"
+ Content-Transfer-Encoding: quoted-printable
+
+ Bas=c3=acc t=c3=abxt=fd.
+ """))
+ self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt�.\n")
+
+ def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self):
+ m = self._str_msg(textwrap.dedent("""\
+ Content-Type: text/plain; charset="utf8"
+ Content-Transfer-Encoding: quoted-printable
+
+ Bas=c3=acc t=c3=abxt=fd.
+ """))
+ self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
+ "Basìc tëxt.\n")
+
+ def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self):
+ m = self._str_msg(textwrap.dedent("""\
+ Content-Type: text/plain; charset="utf8"
+ Content-Transfer-Encoding: base64
+
+ QmFzw6xjIHTDq3h0Lgo\xFF=
+ """))
+ self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
+ "Basìc tëxt.\n")
+
+ def test_get_text_invalid_keyword(self):
+ m = self._str_msg(textwrap.dedent("""\
+ Content-Type: text/plain
+
+ Basic text.
+ """))
+ with self.assertRaises(TypeError):
+ raw_data_manager.get_content(m, foo='ignore')
+
+ def test_get_non_text(self):
+ template = textwrap.dedent("""\
+ Content-Type: {}
+ Content-Transfer-Encoding: base64
+
+ Ym9ndXMgZGF0YQ==
+ """)
+ for maintype in 'audio image video application'.split():
+ with self.subTest(maintype=maintype):
+ m = self._str_msg(template.format(maintype+'/foo'))
+ self.assertEqual(raw_data_manager.get_content(m), b"bogus data")
+
+ def test_get_non_text_invalid_keyword(self):
+ m = self._str_msg(textwrap.dedent("""\
+ Content-Type: image/jpg
+ Content-Transfer-Encoding: base64
+
+ Ym9ndXMgZGF0YQ==
+ """))
+ with self.assertRaises(TypeError):
+ raw_data_manager.get_content(m, errors='ignore')
+
+ def test_get_raises_on_multipart(self):
+ m = self._str_msg(textwrap.dedent("""\
+ Content-Type: multipart/mixed; boundary="==="
+
+ --===
+ --===--
+ """))
+ with self.assertRaises(KeyError):
+ raw_data_manager.get_content(m)
+
+ def test_get_message_rfc822_and_external_body(self):
+ template = textwrap.dedent("""\
+ Content-Type: message/{}
+
+ To: foo@example.com
+ From: bar@example.com
+ Subject: example
+
+ an example message
+ """)
+ for subtype in 'rfc822 external-body'.split():
+ with self.subTest(subtype=subtype):
+ m = self._str_msg(template.format(subtype))
+ sub_msg = raw_data_manager.get_content(m)
+ self.assertIsInstance(sub_msg, self.message)
+ self.assertEqual(raw_data_manager.get_content(sub_msg),
+ "an example message\n")
+ self.assertEqual(sub_msg['to'], 'foo@example.com')
+ self.assertEqual(sub_msg['from'].addresses[0].username, 'bar')
+
+ def test_get_message_non_rfc822_or_external_body_yields_bytes(self):
+ m = self._str_msg(textwrap.dedent("""\
+ Content-Type: message/partial
+
+ To: foo@example.com
+ From: bar@example.com
+ Subject: example
+
+ The real body is in another message.
+ """))
+ self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex')
+
+ def test_set_text_plain(self):
+ m = self._make_message()
+ content = "Simple message.\n"
+ raw_data_manager.set_content(m, content)
+ self.assertEqual(str(m), textwrap.dedent("""\
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: 7bit
+
+ Simple message.
+ """))
+ self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+ self.assertEqual(m.get_content(), content)
+
+ def test_set_text_html(self):
+ m = self._make_message()
+ content = "<p>Simple message.</p>\n"
+ raw_data_manager.set_content(m, content, subtype='html')
+ self.assertEqual(str(m), textwrap.dedent("""\
+ Content-Type: text/html; charset="utf-8"
+ Content-Transfer-Encoding: 7bit
+
+ <p>Simple message.</p>
+ """))
+ self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+ self.assertEqual(m.get_content(), content)
+
+ def test_set_text_charset_latin_1(self):
+ m = self._make_message()
+ content = "Simple message.\n"
+ raw_data_manager.set_content(m, content, charset='latin-1')
+ self.assertEqual(str(m), textwrap.dedent("""\
+ Content-Type: text/plain; charset="iso-8859-1"
+ Content-Transfer-Encoding: 7bit
+
+ Simple message.
+ """))
+ self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+ self.assertEqual(m.get_content(), content)
+
+ def test_set_text_short_line_minimal_non_ascii_heuristics(self):
+ m = self._make_message()
+ content = "et là il est monté sur moi et il commence à m'éto.\n"
+ raw_data_manager.set_content(m, content)
+ self.assertEqual(bytes(m), textwrap.dedent("""\
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: 8bit
+
+ et là il est monté sur moi et il commence à m'éto.
+ """).encode('utf-8'))
+ self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+ self.assertEqual(m.get_content(), content)
+
+ def test_set_text_long_line_minimal_non_ascii_heuristics(self):
+ m = self._make_message()
+ content = ("j'ai un problème de python. il est sorti de son"
+ " vivarium. et là il est monté sur moi et il commence"
+ " à m'éto.\n")
+ raw_data_manager.set_content(m, content)
+ self.assertEqual(bytes(m), textwrap.dedent("""\
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: quoted-printable
+
+ j'ai un probl=C3=A8me de python. il est sorti de son vivari=
+ um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
+ =C3=A0 m'=C3=A9to.
+ """).encode('utf-8'))
+ self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+ self.assertEqual(m.get_content(), content)
+
+ def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self):
+ m = self._make_message()
+ content = '\n'*10 + (
+ "j'ai un problème de python. il est sorti de son"
+ " vivarium. et là il est monté sur moi et il commence"
+ " à m'éto.\n")
+ raw_data_manager.set_content(m, content)
+ self.assertEqual(bytes(m), textwrap.dedent("""\
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: quoted-printable
+ """ + '\n'*10 + """
+ j'ai un probl=C3=A8me de python. il est sorti de son vivari=
+ um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
+ =C3=A0 m'=C3=A9to.
+ """).encode('utf-8'))
+ self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+ self.assertEqual(m.get_content(), content)
+
+ def test_set_text_maximal_non_ascii_heuristics(self):
+ m = self._make_message()
+ content = "áàäéèęöő.\n"
+ raw_data_manager.set_content(m, content)
+ self.assertEqual(bytes(m), textwrap.dedent("""\
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: 8bit
+
+ áàäéèęöő.
+ """).encode('utf-8'))
+ self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+ self.assertEqual(m.get_content(), content)
+
+ def test_set_text_11_lines_maximal_non_ascii_heuristics(self):
+ m = self._make_message()
+ content = '\n'*10 + "áàäéèęöő.\n"
+ raw_data_manager.set_content(m, content)
+ self.assertEqual(bytes(m), textwrap.dedent("""\
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: 8bit
+ """ + '\n'*10 + """
+ áàäéèęöő.
+ """).encode('utf-8'))
+ self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+ self.assertEqual(m.get_content(), content)
+
+ def test_set_text_long_line_maximal_non_ascii_heuristics(self):
+ m = self._make_message()
+ content = ("áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
+ "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
+ "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
+ raw_data_manager.set_content(m, content)
+ self.assertEqual(bytes(m), textwrap.dedent("""\
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: base64
+
+ w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD
+ tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo
+ xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD
+ qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg
+ w6TDqcOoxJnDtsWRLgo=
+ """).encode('utf-8'))
+ self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+ self.assertEqual(m.get_content(), content)
+
+ def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self):
+ # Yes, it chooses "wrong" here. It's a heuristic. So this result
+ # could change if we come up with a better heuristic.
+ m = self._make_message()
+ content = ('\n'*10 +
+ "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
+ "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
+ "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
+ raw_data_manager.set_content(m, "\n"*10 +
+ "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
+ "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
+ "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
+ self.assertEqual(bytes(m), textwrap.dedent("""\
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: quoted-printable
+ """ + '\n'*10 + """
+ =C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=
+ =A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=
+ =C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=
+ =A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=
+ =C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=
+ =91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=
+ =C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=
+ =A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=
+ =C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=
+ =99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=
+ =C5=91.
+ """).encode('utf-8'))
+ self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+ self.assertEqual(m.get_content(), content)
+
+ def test_set_text_non_ascii_with_cte_7bit_raises(self):
+ m = self._make_message()
+ with self.assertRaises(UnicodeError):
+ raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit')
+
+ def test_set_text_non_ascii_with_charset_ascii_raises(self):
+ m = self._make_message()
+ with self.assertRaises(UnicodeError):
+ raw_data_manager.set_content(m,"áàäéèęöő.\n", charset='ascii')
+
+ def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self):
+ m = self._make_message()
+ with self.assertRaises(UnicodeError):
+ raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit', charset='ascii')
+
+ def test_set_message(self):
+ m = self._make_message()
+ m['Subject'] = "Forwarded message"
+ content = self._make_message()
+ content['To'] = 'python@vivarium.org'
+ content['From'] = 'police@monty.org'
+ content['Subject'] = "get back in your box"
+ content.set_content("Or face the comfy chair.")
+ raw_data_manager.set_content(m, content)
+ self.assertEqual(str(m), textwrap.dedent("""\
+ Subject: Forwarded message
+ Content-Type: message/rfc822
+ Content-Transfer-Encoding: 8bit
+
+ To: python@vivarium.org
+ From: police@monty.org
+ Subject: get back in your box
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: 7bit
+ MIME-Version: 1.0
+
+ Or face the comfy chair.
+ """))
+ payload = m.get_payload(0)
+ self.assertIsInstance(payload, self.message)
+ self.assertEqual(str(payload), str(content))
+ self.assertIsInstance(m.get_content(), self.message)
+ self.assertEqual(str(m.get_content()), str(content))
+
+ def test_set_message_with_non_ascii_and_coercion_to_7bit(self):
+ m = self._make_message()
+ m['Subject'] = "Escape report"
+ content = self._make_message()
+ content['To'] = 'police@monty.org'
+ content['From'] = 'victim@monty.org'
+ content['Subject'] = "Help"
+ content.set_content("j'ai un problème de python. il est sorti de son"
+ " vivarium.")
+ raw_data_manager.set_content(m, content)
+ self.assertEqual(bytes(m), textwrap.dedent("""\
+ Subject: Escape report
+ Content-Type: message/rfc822
+ Content-Transfer-Encoding: 8bit
+
+ To: police@monty.org
+ From: victim@monty.org
+ Subject: Help
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: 8bit
+ MIME-Version: 1.0
+
+ j'ai un problème de python. il est sorti de son vivarium.
+ """).encode('utf-8'))
+ # The choice of base64 for the body encoding is because generator
+ # doesn't bother with heuristics and uses it unconditionally for utf-8
+ # text.
+ # XXX: the first cte should be 7bit, too...that's a generator bug.
+ # XXX: the line length in the body also looks like a generator bug.
+ self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length),
+ textwrap.dedent("""\
+ Subject: Escape report
+ Content-Type: message/rfc822
+ Content-Transfer-Encoding: 8bit
+
+ To: police@monty.org
+ From: victim@monty.org
+ Subject: Help
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: base64
+ MIME-Version: 1.0
+
+ aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt
+ Lgo=
+ """))
+ self.assertIsInstance(m.get_content(), self.message)
+ self.assertEqual(str(m.get_content()), str(content))
+
+ def test_set_message_invalid_cte_raises(self):
+ m = self._make_message()
+ content = self._make_message()
+ for cte in 'quoted-printable base64'.split():
+ for subtype in 'rfc822 external-body'.split():
+ with self.subTest(cte=cte, subtype=subtype):
+ with self.assertRaises(ValueError) as ar:
+ m.set_content(content, subtype, cte=cte)
+ exc = str(ar.exception)
+ self.assertIn(cte, exc)
+ self.assertIn(subtype, exc)
+ subtype = 'external-body'
+ for cte in '8bit binary'.split():
+ with self.subTest(cte=cte, subtype=subtype):
+ with self.assertRaises(ValueError) as ar:
+ m.set_content(content, subtype, cte=cte)
+ exc = str(ar.exception)
+ self.assertIn(cte, exc)
+ self.assertIn(subtype, exc)
+
+ def test_set_image_jpg(self):
+ for content in (b"bogus content",
+ bytearray(b"bogus content"),
+ memoryview(b"bogus content")):
+ with self.subTest(content=content):
+ m = self._make_message()
+ raw_data_manager.set_content(m, content, 'image', 'jpeg')
+ self.assertEqual(str(m), textwrap.dedent("""\
+ Content-Type: image/jpeg
+ Content-Transfer-Encoding: base64
+
+ Ym9ndXMgY29udGVudA==
+ """))
+ self.assertEqual(m.get_payload(decode=True), content)
+ self.assertEqual(m.get_content(), content)
+
+ def test_set_audio_aif_with_quoted_printable_cte(self):
+ # Why you would use qp, I don't know, but it is technically supported.
+ # XXX: the incorrect line length is because binascii.b2a_qp doesn't
+ # support a line length parameter, but we must use it to get newline
+ # encoding.
+ # XXX: what about that lack of tailing newline? Do we actually handle
+ # that correctly in all cases? That is, if the *source* has an
+ # unencoded newline, do we add an extra newline to the returned payload
+ # or not? And can that actually be disambiguated based on the RFC?
+ m = self._make_message()
+ content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
+ m.set_content(content, 'audio', 'aif', cte='quoted-printable')
+ self.assertEqual(bytes(m), textwrap.dedent("""\
+ Content-Type: audio/aif
+ Content-Transfer-Encoding: quoted-printable
+ MIME-Version: 1.0
+
+ b=FFgus=09con=0At=0Dent=20zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz=
+ zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz""").encode('latin-1'))
+ self.assertEqual(m.get_payload(decode=True), content)
+ self.assertEqual(m.get_content(), content)
+
+ def test_set_video_mpeg_with_binary_cte(self):
+ m = self._make_message()
+ content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
+ m.set_content(content, 'video', 'mpeg', cte='binary')
+ self.assertEqual(bytes(m), textwrap.dedent("""\
+ Content-Type: video/mpeg
+ Content-Transfer-Encoding: binary
+ MIME-Version: 1.0
+
+ """).encode('ascii') +
+ # XXX: the second \n ought to be a \r, but generator gets it wrong.
+ # THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE.
+ b'b\xFFgus\tcon\nt\nent zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz' +
+ b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz')
+ self.assertEqual(m.get_payload(decode=True), content)
+ self.assertEqual(m.get_content(), content)
+
+ def test_set_application_octet_stream_with_8bit_cte(self):
+ # In 8bit mode, univeral line end logic applies. It is up to the
+ # application to make sure the lines are short enough; we don't check.
+ m = self._make_message()
+ content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n'
+ m.set_content(content, 'application', 'octet-stream', cte='8bit')
+ self.assertEqual(bytes(m), textwrap.dedent("""\
+ Content-Type: application/octet-stream
+ Content-Transfer-Encoding: 8bit
+ MIME-Version: 1.0
+
+ """).encode('ascii') +
+ b'b\xFFgus\tcon\nt\nent\n' +
+ b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz\n')
+ self.assertEqual(m.get_payload(decode=True), content)
+ self.assertEqual(m.get_content(), content)
+
+ def test_set_headers_from_header_objects(self):
+ m = self._make_message()
+ content = "Simple message.\n"
+ header_factory = self.policy.header_factory
+ raw_data_manager.set_content(m, content, headers=(
+ header_factory("To", "foo@example.com"),
+ header_factory("From", "foo@example.com"),
+ header_factory("Subject", "I'm talking to myself.")))
+ self.assertEqual(str(m), textwrap.dedent("""\
+ Content-Type: text/plain; charset="utf-8"
+ To: foo@example.com
+ From: foo@example.com
+ Subject: I'm talking to myself.
+ Content-Transfer-Encoding: 7bit
+
+ Simple message.
+ """))
+
+ def test_set_headers_from_strings(self):
+ m = self._make_message()
+ content = "Simple message.\n"
+ raw_data_manager.set_content(m, content, headers=(
+ "X-Foo-Header: foo",
+ "X-Bar-Header: bar",))
+ self.assertEqual(str(m), textwrap.dedent("""\
+ Content-Type: text/plain; charset="utf-8"
+ X-Foo-Header: foo
+ X-Bar-Header: bar
+ Content-Transfer-Encoding: 7bit
+
+ Simple message.
+ """))
+
+ def test_set_headers_with_invalid_duplicate_string_header_raises(self):
+ m = self._make_message()
+ content = "Simple message.\n"
+ with self.assertRaisesRegex(ValueError, 'Content-Type'):
+ raw_data_manager.set_content(m, content, headers=(
+ "Content-Type: foo/bar",)
+ )
+
+ def test_set_headers_with_invalid_duplicate_header_header_raises(self):
+ m = self._make_message()
+ content = "Simple message.\n"
+ header_factory = self.policy.header_factory
+ with self.assertRaisesRegex(ValueError, 'Content-Type'):
+ raw_data_manager.set_content(m, content, headers=(
+ header_factory("Content-Type", " foo/bar"),)
+ )
+
+ def test_set_headers_with_defective_string_header_raises(self):
+ m = self._make_message()
+ content = "Simple message.\n"
+ with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
+ raw_data_manager.set_content(m, content, headers=(
+ 'To: a@fairly@@invalid@address',)
+ )
+ print(m['To'].defects)
+
+ def test_set_headers_with_defective_header_header_raises(self):
+ m = self._make_message()
+ content = "Simple message.\n"
+ header_factory = self.policy.header_factory
+ with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
+ raw_data_manager.set_content(m, content, headers=(
+ header_factory('To', 'a@fairly@@invalid@address'),)
+ )
+ print(m['To'].defects)
+
+ def test_set_disposition_inline(self):
+ m = self._make_message()
+ m.set_content('foo', disposition='inline')
+ self.assertEqual(m['Content-Disposition'], 'inline')
+
+ def test_set_disposition_attachment(self):
+ m = self._make_message()
+ m.set_content('foo', disposition='attachment')
+ self.assertEqual(m['Content-Disposition'], 'attachment')
+
+ def test_set_disposition_foo(self):
+ m = self._make_message()
+ m.set_content('foo', disposition='foo')
+ self.assertEqual(m['Content-Disposition'], 'foo')
+
+ # XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that
+ # would cause 'foo' above to raise.
+
+ def test_set_filename(self):
+ m = self._make_message()
+ m.set_content('foo', filename='bar.txt')
+ self.assertEqual(m['Content-Disposition'],
+ 'attachment; filename="bar.txt"')
+
+ def test_set_filename_and_disposition_inline(self):
+ m = self._make_message()
+ m.set_content('foo', disposition='inline', filename='bar.txt')
+ self.assertEqual(m['Content-Disposition'], 'inline; filename="bar.txt"')
+
+ def test_set_non_ascii_filename(self):
+ m = self._make_message()
+ m.set_content('foo', filename='ábárî.txt')
+ self.assertEqual(bytes(m), textwrap.dedent("""\
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: 7bit
+ Content-Disposition: attachment;
+ filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt
+ MIME-Version: 1.0
+
+ foo
+ """).encode('ascii'))
+
+ content_object_params = {
+ 'text_plain': ('content', ()),
+ 'text_html': ('content', ('html',)),
+ 'application_octet_stream': (b'content',
+ ('application', 'octet_stream')),
+ 'image_jpeg': (b'content', ('image', 'jpeg')),
+ 'message_rfc822': (message(), ()),
+ 'message_external_body': (message(), ('external-body',)),
+ }
+
+ def content_object_as_header_receiver(self, obj, mimetype):
+ m = self._make_message()
+ m.set_content(obj, *mimetype, headers=(
+ 'To: foo@example.com',
+ 'From: bar@simple.net'))
+ self.assertEqual(m['to'], 'foo@example.com')
+ self.assertEqual(m['from'], 'bar@simple.net')
+
+ def content_object_as_disposition_inline_receiver(self, obj, mimetype):
+ m = self._make_message()
+ m.set_content(obj, *mimetype, disposition='inline')
+ self.assertEqual(m['Content-Disposition'], 'inline')
+
+ def content_object_as_non_ascii_filename_receiver(self, obj, mimetype):
+ m = self._make_message()
+ m.set_content(obj, *mimetype, disposition='inline', filename='bár.txt')
+ self.assertEqual(m['Content-Disposition'], 'inline; filename="bár.txt"')
+ self.assertEqual(m.get_filename(), "bár.txt")
+ self.assertEqual(m['Content-Disposition'].params['filename'], "bár.txt")
+
+ def content_object_as_cid_receiver(self, obj, mimetype):
+ m = self._make_message()
+ m.set_content(obj, *mimetype, cid='some_random_stuff')
+ self.assertEqual(m['Content-ID'], 'some_random_stuff')
+
+ def content_object_as_params_receiver(self, obj, mimetype):
+ m = self._make_message()
+ params = {'foo': 'bár', 'abc': 'xyz'}
+ m.set_content(obj, *mimetype, params=params)
+ if isinstance(obj, str):
+ params['charset'] = 'utf-8'
+ self.assertEqual(m['Content-Type'].params, params)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py
index 73ec2a6..d1e234d 100644
--- a/Lib/test/test_email/test_email.py
+++ b/Lib/test/test_email/test_email.py
@@ -281,15 +281,42 @@ class TestMessageAPI(TestEmailBase):
self.assertIn('TO', msg)
def test_as_string(self):
- eq = self.ndiffAssertEqual
msg = self._msgobj('msg_01.txt')
with openfile('msg_01.txt') as fp:
text = fp.read()
- eq(text, str(msg))
+ self.assertEqual(text, str(msg))
fullrepr = msg.as_string(unixfrom=True)
lines = fullrepr.split('\n')
self.assertTrue(lines[0].startswith('From '))
- eq(text, NL.join(lines[1:]))
+ self.assertEqual(text, NL.join(lines[1:]))
+
+ def test_as_string_policy(self):
+ msg = self._msgobj('msg_01.txt')
+ newpolicy = msg.policy.clone(linesep='\r\n')
+ fullrepr = msg.as_string(policy=newpolicy)
+ s = StringIO()
+ g = Generator(s, policy=newpolicy)
+ g.flatten(msg)
+ self.assertEqual(fullrepr, s.getvalue())
+
+ def test_as_bytes(self):
+ msg = self._msgobj('msg_01.txt')
+ with openfile('msg_01.txt') as fp:
+ data = fp.read().encode('ascii')
+ self.assertEqual(data, bytes(msg))
+ fullrepr = msg.as_bytes(unixfrom=True)
+ lines = fullrepr.split(b'\n')
+ self.assertTrue(lines[0].startswith(b'From '))
+ self.assertEqual(data, b'\n'.join(lines[1:]))
+
+ def test_as_bytes_policy(self):
+ msg = self._msgobj('msg_01.txt')
+ newpolicy = msg.policy.clone(linesep='\r\n')
+ fullrepr = msg.as_bytes(policy=newpolicy)
+ s = BytesIO()
+ g = BytesGenerator(s,policy=newpolicy)
+ g.flatten(msg)
+ self.assertEqual(fullrepr, s.getvalue())
# test_headerregistry.TestContentTypeHeader.bad_params
def test_bad_param(self):
@@ -742,8 +769,15 @@ class TestEncoders(unittest.TestCase):
# whose output character set is 7bit gets a transfer-encoding
# of 7bit.
eq = self.assertEqual
- msg = MIMEText('文', _charset='euc-jp')
+ msg = MIMEText('文\n', _charset='euc-jp')
eq(msg['content-transfer-encoding'], '7bit')
+ eq(msg.as_string(), textwrap.dedent("""\
+ MIME-Version: 1.0
+ Content-Type: text/plain; charset="iso-2022-jp"
+ Content-Transfer-Encoding: 7bit
+
+ \x1b$BJ8\x1b(B
+ """))
def test_qp_encode_latin1(self):
msg = MIMEText('\xe1\xf6\n', 'text', 'ISO-8859-1')
diff --git a/Lib/test/test_email/test_headerregistry.py b/Lib/test/test_email/test_headerregistry.py
index f829f83..f2df372 100644
--- a/Lib/test/test_email/test_headerregistry.py
+++ b/Lib/test/test_email/test_headerregistry.py
@@ -661,7 +661,7 @@ class TestContentTypeHeader(TestHeaderBase):
'text/plain; name="ascii_is_the_default"'),
'rfc2231_bad_character_in_charset_parameter_value': (
- "text/plain; charset*=ascii''utf-8%E2%80%9D",
+ "text/plain; charset*=ascii''utf-8%F1%F2%F3",
'text/plain',
'text',
'plain',
@@ -669,6 +669,18 @@ class TestContentTypeHeader(TestHeaderBase):
[errors.UndecodableBytesDefect],
'text/plain; charset="utf-8\uFFFD\uFFFD\uFFFD"'),
+ 'rfc2231_utf_8_in_supposedly_ascii_charset_parameter_value': (
+ "text/plain; charset*=ascii''utf-8%E2%80%9D",
+ 'text/plain',
+ 'text',
+ 'plain',
+ {'charset': 'utf-8”'},
+ [errors.UndecodableBytesDefect],
+ 'text/plain; charset="utf-8”"',
+ ),
+ # XXX: if the above were *re*folded, it would get tagged as utf-8
+ # instead of ascii in the param, since it now contains non-ASCII.
+
'rfc2231_encoded_then_unencoded_segments': (
('application/x-foo;'
'\tname*0*="us-ascii\'en-us\'My";'
diff --git a/Lib/test/test_email/test_message.py b/Lib/test/test_email/test_message.py
index 8cc3f80..c761c62 100644
--- a/Lib/test/test_email/test_message.py
+++ b/Lib/test/test_email/test_message.py
@@ -1,6 +1,13 @@
import unittest
-from email import policy
-from test.test_email import TestEmailBase
+import textwrap
+from email import policy, message_from_string
+from email.message import EmailMessage, MIMEPart
+from test.test_email import TestEmailBase, parameterize
+
+
+# Helper.
+def first(iterable):
+ return next(filter(lambda x: x is not None, iterable), None)
class Test(TestEmailBase):
@@ -13,6 +20,753 @@ class Test(TestEmailBase):
with self.assertRaises(ValueError):
m['To'] = 'xyz@abc'
+ def test_rfc2043_auto_decoded_and_emailmessage_used(self):
+ m = message_from_string(textwrap.dedent("""\
+ Subject: Ayons asperges pour le =?utf-8?q?d=C3=A9jeuner?=
+ From: =?utf-8?q?Pep=C3=A9?= Le Pew <pepe@example.com>
+ To: "Penelope Pussycat" <"penelope@example.com">
+ MIME-Version: 1.0
+ Content-Type: text/plain; charset="utf-8"
+
+ sample text
+ """), policy=policy.default)
+ self.assertEqual(m['subject'], "Ayons asperges pour le déjeuner")
+ self.assertEqual(m['from'], "Pepé Le Pew <pepe@example.com>")
+ self.assertIsInstance(m, EmailMessage)
+
+
+@parameterize
+class TestEmailMessageBase:
+
+ policy = policy.default
+
+ # The first argument is a triple (related, html, plain) of indices into the
+ # list returned by 'walk' called on a Message constructed from the third.
+ # The indices indicate which part should match the corresponding part-type
+ # when passed to get_body (ie: the "first" part of that type in the
+ # message). The second argument is a list of indices into the 'walk' list
+ # of the attachments that should be returned by a call to
+ # 'iter_attachments'. The third argument is a list of indices into 'walk'
+ # that should be returned by a call to 'iter_parts'. Note that the first
+ # item returned by 'walk' is the Message itself.
+
+ message_params = {
+
+ 'empty_message': (
+ (None, None, 0),
+ (),
+ (),
+ ""),
+
+ 'non_mime_plain': (
+ (None, None, 0),
+ (),
+ (),
+ textwrap.dedent("""\
+ To: foo@example.com
+
+ simple text body
+ """)),
+
+ 'mime_non_text': (
+ (None, None, None),
+ (),
+ (),
+ textwrap.dedent("""\
+ To: foo@example.com
+ MIME-Version: 1.0
+ Content-Type: image/jpg
+
+ bogus body.
+ """)),
+
+ 'plain_html_alternative': (
+ (None, 2, 1),
+ (),
+ (1, 2),
+ textwrap.dedent("""\
+ To: foo@example.com
+ MIME-Version: 1.0
+ Content-Type: multipart/alternative; boundary="==="
+
+ preamble
+
+ --===
+ Content-Type: text/plain
+
+ simple body
+
+ --===
+ Content-Type: text/html
+
+ <p>simple body</p>
+ --===--
+ """)),
+
+ 'plain_html_mixed': (
+ (None, 2, 1),
+ (),
+ (1, 2),
+ textwrap.dedent("""\
+ To: foo@example.com
+ MIME-Version: 1.0
+ Content-Type: multipart/mixed; boundary="==="
+
+ preamble
+
+ --===
+ Content-Type: text/plain
+
+ simple body
+
+ --===
+ Content-Type: text/html
+
+ <p>simple body</p>
+
+ --===--
+ """)),
+
+ 'plain_html_attachment_mixed': (
+ (None, None, 1),
+ (2,),
+ (1, 2),
+ textwrap.dedent("""\
+ To: foo@example.com
+ MIME-Version: 1.0
+ Content-Type: multipart/mixed; boundary="==="
+
+ --===
+ Content-Type: text/plain
+
+ simple body
+
+ --===
+ Content-Type: text/html
+ Content-Disposition: attachment
+
+ <p>simple body</p>
+
+ --===--
+ """)),
+
+ 'html_text_attachment_mixed': (
+ (None, 2, None),
+ (1,),
+ (1, 2),
+ textwrap.dedent("""\
+ To: foo@example.com
+ MIME-Version: 1.0
+ Content-Type: multipart/mixed; boundary="==="
+
+ --===
+ Content-Type: text/plain
+ Content-Disposition: AtTaChment
+
+ simple body
+
+ --===
+ Content-Type: text/html
+
+ <p>simple body</p>
+
+ --===--
+ """)),
+
+ 'html_text_attachment_inline_mixed': (
+ (None, 2, 1),
+ (),
+ (1, 2),
+ textwrap.dedent("""\
+ To: foo@example.com
+ MIME-Version: 1.0
+ Content-Type: multipart/mixed; boundary="==="
+
+ --===
+ Content-Type: text/plain
+ Content-Disposition: InLine
+
+ simple body
+
+ --===
+ Content-Type: text/html
+ Content-Disposition: inline
+
+ <p>simple body</p>
+
+ --===--
+ """)),
+
+ # RFC 2387
+ 'related': (
+ (0, 1, None),
+ (2,),
+ (1, 2),
+ textwrap.dedent("""\
+ To: foo@example.com
+ MIME-Version: 1.0
+ Content-Type: multipart/related; boundary="==="; type=text/html
+
+ --===
+ Content-Type: text/html
+
+ <p>simple body</p>
+
+ --===
+ Content-Type: image/jpg
+ Content-ID: <image1>
+
+ bogus data
+
+ --===--
+ """)),
+
+ # This message structure will probably never be seen in the wild, but
+ # it proves we distinguish between text parts based on 'start'. The
+ # content would not, of course, actually work :)
+ 'related_with_start': (
+ (0, 2, None),
+ (1,),
+ (1, 2),
+ textwrap.dedent("""\
+ To: foo@example.com
+ MIME-Version: 1.0
+ Content-Type: multipart/related; boundary="==="; type=text/html;
+ start="<body>"
+
+ --===
+ Content-Type: text/html
+ Content-ID: <include>
+
+ useless text
+
+ --===
+ Content-Type: text/html
+ Content-ID: <body>
+
+ <p>simple body</p>
+ <!--#include file="<include>"-->
+
+ --===--
+ """)),
+
+
+ 'mixed_alternative_plain_related': (
+ (3, 4, 2),
+ (6, 7),
+ (1, 6, 7),
+ textwrap.dedent("""\
+ To: foo@example.com
+ MIME-Version: 1.0
+ Content-Type: multipart/mixed; boundary="==="
+
+ --===
+ Content-Type: multipart/alternative; boundary="+++"
+
+ --+++
+ Content-Type: text/plain
+
+ simple body
+
+ --+++
+ Content-Type: multipart/related; boundary="___"
+
+ --___
+ Content-Type: text/html
+
+ <p>simple body</p>
+
+ --___
+ Content-Type: image/jpg
+ Content-ID: <image1@cid>
+
+ bogus jpg body
+
+ --___--
+
+ --+++--
+
+ --===
+ Content-Type: image/jpg
+ Content-Disposition: attachment
+
+ bogus jpg body
+
+ --===
+ Content-Type: image/jpg
+ Content-Disposition: AttacHmenT
+
+ another bogus jpg body
+
+ --===--
+ """)),
+
+ # This structure suggested by Stephen J. Turnbull...may not exist/be
+ # supported in the wild, but we want to support it.
+ 'mixed_related_alternative_plain_html': (
+ (1, 4, 3),
+ (6, 7),
+ (1, 6, 7),
+ textwrap.dedent("""\
+ To: foo@example.com
+ MIME-Version: 1.0
+ Content-Type: multipart/mixed; boundary="==="
+
+ --===
+ Content-Type: multipart/related; boundary="+++"
+
+ --+++
+ Content-Type: multipart/alternative; boundary="___"
+
+ --___
+ Content-Type: text/plain
+
+ simple body
+
+ --___
+ Content-Type: text/html
+
+ <p>simple body</p>
+
+ --___--
+
+ --+++
+ Content-Type: image/jpg
+ Content-ID: <image1@cid>
+
+ bogus jpg body
+
+ --+++--
+
+ --===
+ Content-Type: image/jpg
+ Content-Disposition: attachment
+
+ bogus jpg body
+
+ --===
+ Content-Type: image/jpg
+ Content-Disposition: attachment
+
+ another bogus jpg body
+
+ --===--
+ """)),
+
+ # Same thing, but proving we only look at the root part, which is the
+ # first one if there isn't any start parameter. That is, this is a
+ # broken related.
+ 'mixed_related_alternative_plain_html_wrong_order': (
+ (1, None, None),
+ (6, 7),
+ (1, 6, 7),
+ textwrap.dedent("""\
+ To: foo@example.com
+ MIME-Version: 1.0
+ Content-Type: multipart/mixed; boundary="==="
+
+ --===
+ Content-Type: multipart/related; boundary="+++"
+
+ --+++
+ Content-Type: image/jpg
+ Content-ID: <image1@cid>
+
+ bogus jpg body
+
+ --+++
+ Content-Type: multipart/alternative; boundary="___"
+
+ --___
+ Content-Type: text/plain
+
+ simple body
+
+ --___
+ Content-Type: text/html
+
+ <p>simple body</p>
+
+ --___--
+
+ --+++--
+
+ --===
+ Content-Type: image/jpg
+ Content-Disposition: attachment
+
+ bogus jpg body
+
+ --===
+ Content-Type: image/jpg
+ Content-Disposition: attachment
+
+ another bogus jpg body
+
+ --===--
+ """)),
+
+ 'message_rfc822': (
+ (None, None, None),
+ (),
+ (),
+ textwrap.dedent("""\
+ To: foo@example.com
+ MIME-Version: 1.0
+ Content-Type: message/rfc822
+
+ To: bar@example.com
+ From: robot@examp.com
+
+ this is a message body.
+ """)),
+
+ 'mixed_text_message_rfc822': (
+ (None, None, 1),
+ (2,),
+ (1, 2),
+ textwrap.dedent("""\
+ To: foo@example.com
+ MIME-Version: 1.0
+ Content-Type: multipart/mixed; boundary="==="
+
+ --===
+ Content-Type: text/plain
+
+ Your message has bounced, ser.
+
+ --===
+ Content-Type: message/rfc822
+
+ To: bar@example.com
+ From: robot@examp.com
+
+ this is a message body.
+
+ --===--
+ """)),
+
+ }
+
+ def message_as_get_body(self, body_parts, attachments, parts, msg):
+ m = self._str_msg(msg)
+ allparts = list(m.walk())
+ expected = [None if n is None else allparts[n] for n in body_parts]
+ related = 0; html = 1; plain = 2
+ self.assertEqual(m.get_body(), first(expected))
+ self.assertEqual(m.get_body(preferencelist=(
+ 'related', 'html', 'plain')),
+ first(expected))
+ self.assertEqual(m.get_body(preferencelist=('related', 'html')),
+ first(expected[related:html+1]))
+ self.assertEqual(m.get_body(preferencelist=('related', 'plain')),
+ first([expected[related], expected[plain]]))
+ self.assertEqual(m.get_body(preferencelist=('html', 'plain')),
+ first(expected[html:plain+1]))
+ self.assertEqual(m.get_body(preferencelist=['related']),
+ expected[related])
+ self.assertEqual(m.get_body(preferencelist=['html']), expected[html])
+ self.assertEqual(m.get_body(preferencelist=['plain']), expected[plain])
+ self.assertEqual(m.get_body(preferencelist=('plain', 'html')),
+ first(expected[plain:html-1:-1]))
+ self.assertEqual(m.get_body(preferencelist=('plain', 'related')),
+ first([expected[plain], expected[related]]))
+ self.assertEqual(m.get_body(preferencelist=('html', 'related')),
+ first(expected[html::-1]))
+ self.assertEqual(m.get_body(preferencelist=('plain', 'html', 'related')),
+ first(expected[::-1]))
+ self.assertEqual(m.get_body(preferencelist=('html', 'plain', 'related')),
+ first([expected[html],
+ expected[plain],
+ expected[related]]))
+
+ def message_as_iter_attachment(self, body_parts, attachments, parts, msg):
+ m = self._str_msg(msg)
+ allparts = list(m.walk())
+ attachments = [allparts[n] for n in attachments]
+ self.assertEqual(list(m.iter_attachments()), attachments)
+
+ def message_as_iter_parts(self, body_parts, attachments, parts, msg):
+ m = self._str_msg(msg)
+ allparts = list(m.walk())
+ parts = [allparts[n] for n in parts]
+ self.assertEqual(list(m.iter_parts()), parts)
+
+ class _TestContentManager:
+ def get_content(self, msg, *args, **kw):
+ return msg, args, kw
+ def set_content(self, msg, *args, **kw):
+ self.msg = msg
+ self.args = args
+ self.kw = kw
+
+ def test_get_content_with_cm(self):
+ m = self._str_msg('')
+ cm = self._TestContentManager()
+ self.assertEqual(m.get_content(content_manager=cm), (m, (), {}))
+ msg, args, kw = m.get_content('foo', content_manager=cm, bar=1, k=2)
+ self.assertEqual(msg, m)
+ self.assertEqual(args, ('foo',))
+ self.assertEqual(kw, dict(bar=1, k=2))
+
+ def test_get_content_default_cm_comes_from_policy(self):
+ p = policy.default.clone(content_manager=self._TestContentManager())
+ m = self._str_msg('', policy=p)
+ self.assertEqual(m.get_content(), (m, (), {}))
+ msg, args, kw = m.get_content('foo', bar=1, k=2)
+ self.assertEqual(msg, m)
+ self.assertEqual(args, ('foo',))
+ self.assertEqual(kw, dict(bar=1, k=2))
+
+ def test_set_content_with_cm(self):
+ m = self._str_msg('')
+ cm = self._TestContentManager()
+ m.set_content(content_manager=cm)
+ self.assertEqual(cm.msg, m)
+ self.assertEqual(cm.args, ())
+ self.assertEqual(cm.kw, {})
+ m.set_content('foo', content_manager=cm, bar=1, k=2)
+ self.assertEqual(cm.msg, m)
+ self.assertEqual(cm.args, ('foo',))
+ self.assertEqual(cm.kw, dict(bar=1, k=2))
+
+ def test_set_content_default_cm_comes_from_policy(self):
+ cm = self._TestContentManager()
+ p = policy.default.clone(content_manager=cm)
+ m = self._str_msg('', policy=p)
+ m.set_content()
+ self.assertEqual(cm.msg, m)
+ self.assertEqual(cm.args, ())
+ self.assertEqual(cm.kw, {})
+ m.set_content('foo', bar=1, k=2)
+ self.assertEqual(cm.msg, m)
+ self.assertEqual(cm.args, ('foo',))
+ self.assertEqual(cm.kw, dict(bar=1, k=2))
+
+ # outcome is whether xxx_method should raise ValueError error when called
+ # on multipart/subtype. Blank outcome means it depends on xxx (add
+ # succeeds, make raises). Note: 'none' means there are content-type
+ # headers but payload is None...this happening in practice would be very
+ # unusual, so treating it as if there were content seems reasonable.
+ # method subtype outcome
+ subtype_params = (
+ ('related', 'no_content', 'succeeds'),
+ ('related', 'none', 'succeeds'),
+ ('related', 'plain', 'succeeds'),
+ ('related', 'related', ''),
+ ('related', 'alternative', 'raises'),
+ ('related', 'mixed', 'raises'),
+ ('alternative', 'no_content', 'succeeds'),
+ ('alternative', 'none', 'succeeds'),
+ ('alternative', 'plain', 'succeeds'),
+ ('alternative', 'related', 'succeeds'),
+ ('alternative', 'alternative', ''),
+ ('alternative', 'mixed', 'raises'),
+ ('mixed', 'no_content', 'succeeds'),
+ ('mixed', 'none', 'succeeds'),
+ ('mixed', 'plain', 'succeeds'),
+ ('mixed', 'related', 'succeeds'),
+ ('mixed', 'alternative', 'succeeds'),
+ ('mixed', 'mixed', ''),
+ )
+
+ def _make_subtype_test_message(self, subtype):
+ m = self.message()
+ payload = None
+ msg_headers = [
+ ('To', 'foo@bar.com'),
+ ('From', 'bar@foo.com'),
+ ]
+ if subtype != 'no_content':
+ ('content-shadow', 'Logrus'),
+ msg_headers.append(('X-Random-Header', 'Corwin'))
+ if subtype == 'text':
+ payload = ''
+ msg_headers.append(('Content-Type', 'text/plain'))
+ m.set_payload('')
+ elif subtype != 'no_content':
+ payload = []
+ msg_headers.append(('Content-Type', 'multipart/' + subtype))
+ msg_headers.append(('X-Trump', 'Random'))
+ m.set_payload(payload)
+ for name, value in msg_headers:
+ m[name] = value
+ return m, msg_headers, payload
+
+ def _check_disallowed_subtype_raises(self, m, method_name, subtype, method):
+ with self.assertRaises(ValueError) as ar:
+ getattr(m, method)()
+ exc_text = str(ar.exception)
+ self.assertIn(subtype, exc_text)
+ self.assertIn(method_name, exc_text)
+
+ def _check_make_multipart(self, m, msg_headers, payload):
+ count = 0
+ for name, value in msg_headers:
+ if not name.lower().startswith('content-'):
+ self.assertEqual(m[name], value)
+ count += 1
+ self.assertEqual(len(m), count+1) # +1 for new Content-Type
+ part = next(m.iter_parts())
+ count = 0
+ for name, value in msg_headers:
+ if name.lower().startswith('content-'):
+ self.assertEqual(part[name], value)
+ count += 1
+ self.assertEqual(len(part), count)
+ self.assertEqual(part.get_payload(), payload)
+
+ def subtype_as_make(self, method, subtype, outcome):
+ m, msg_headers, payload = self._make_subtype_test_message(subtype)
+ make_method = 'make_' + method
+ if outcome in ('', 'raises'):
+ self._check_disallowed_subtype_raises(m, method, subtype, make_method)
+ return
+ getattr(m, make_method)()
+ self.assertEqual(m.get_content_maintype(), 'multipart')
+ self.assertEqual(m.get_content_subtype(), method)
+ if subtype == 'no_content':
+ self.assertEqual(len(m.get_payload()), 0)
+ self.assertEqual(m.items(),
+ msg_headers + [('Content-Type',
+ 'multipart/'+method)])
+ else:
+ self.assertEqual(len(m.get_payload()), 1)
+ self._check_make_multipart(m, msg_headers, payload)
+
+ def subtype_as_make_with_boundary(self, method, subtype, outcome):
+ # Doing all variation is a bit of overkill...
+ m = self.message()
+ if outcome in ('', 'raises'):
+ m['Content-Type'] = 'multipart/' + subtype
+ with self.assertRaises(ValueError) as cm:
+ getattr(m, 'make_' + method)()
+ return
+ if subtype == 'plain':
+ m['Content-Type'] = 'text/plain'
+ elif subtype != 'no_content':
+ m['Content-Type'] = 'multipart/' + subtype
+ getattr(m, 'make_' + method)(boundary="abc")
+ self.assertTrue(m.is_multipart())
+ self.assertEqual(m.get_boundary(), 'abc')
+
+ def test_policy_on_part_made_by_make_comes_from_message(self):
+ for method in ('make_related', 'make_alternative', 'make_mixed'):
+ m = self.message(policy=self.policy.clone(content_manager='foo'))
+ m['Content-Type'] = 'text/plain'
+ getattr(m, method)()
+ self.assertEqual(m.get_payload(0).policy.content_manager, 'foo')
+
+ class _TestSetContentManager:
+ def set_content(self, msg, content, *args, **kw):
+ msg['Content-Type'] = 'text/plain'
+ msg.set_payload(content)
+
+ def subtype_as_add(self, method, subtype, outcome):
+ m, msg_headers, payload = self._make_subtype_test_message(subtype)
+ cm = self._TestSetContentManager()
+ add_method = 'add_attachment' if method=='mixed' else 'add_' + method
+ if outcome == 'raises':
+ self._check_disallowed_subtype_raises(m, method, subtype, add_method)
+ return
+ getattr(m, add_method)('test', content_manager=cm)
+ self.assertEqual(m.get_content_maintype(), 'multipart')
+ self.assertEqual(m.get_content_subtype(), method)
+ if method == subtype or subtype == 'no_content':
+ self.assertEqual(len(m.get_payload()), 1)
+ for name, value in msg_headers:
+ self.assertEqual(m[name], value)
+ part = m.get_payload()[0]
+ else:
+ self.assertEqual(len(m.get_payload()), 2)
+ self._check_make_multipart(m, msg_headers, payload)
+ part = m.get_payload()[1]
+ self.assertEqual(part.get_content_type(), 'text/plain')
+ self.assertEqual(part.get_payload(), 'test')
+ if method=='mixed':
+ self.assertEqual(part['Content-Disposition'], 'attachment')
+ elif method=='related':
+ self.assertEqual(part['Content-Disposition'], 'inline')
+ else:
+ # Otherwise we don't guess.
+ self.assertIsNone(part['Content-Disposition'])
+
+ class _TestSetRaisingContentManager:
+ def set_content(self, msg, content, *args, **kw):
+ raise Exception('test')
+
+ def test_default_content_manager_for_add_comes_from_policy(self):
+ cm = self._TestSetRaisingContentManager()
+ m = self.message(policy=self.policy.clone(content_manager=cm))
+ for method in ('add_related', 'add_alternative', 'add_attachment'):
+ with self.assertRaises(Exception) as ar:
+ getattr(m, method)('')
+ self.assertEqual(str(ar.exception), 'test')
+
+ def message_as_clear(self, body_parts, attachments, parts, msg):
+ m = self._str_msg(msg)
+ m.clear()
+ self.assertEqual(len(m), 0)
+ self.assertEqual(list(m.items()), [])
+ self.assertIsNone(m.get_payload())
+ self.assertEqual(list(m.iter_parts()), [])
+
+ def message_as_clear_content(self, body_parts, attachments, parts, msg):
+ m = self._str_msg(msg)
+ expected_headers = [h for h in m.keys()
+ if not h.lower().startswith('content-')]
+ m.clear_content()
+ self.assertEqual(list(m.keys()), expected_headers)
+ self.assertIsNone(m.get_payload())
+ self.assertEqual(list(m.iter_parts()), [])
+
+ def test_is_attachment(self):
+ m = self._make_message()
+ self.assertFalse(m.is_attachment)
+ m['Content-Disposition'] = 'inline'
+ self.assertFalse(m.is_attachment)
+ m.replace_header('Content-Disposition', 'attachment')
+ self.assertTrue(m.is_attachment)
+ m.replace_header('Content-Disposition', 'AtTachMent')
+ self.assertTrue(m.is_attachment)
+
+
+
+class TestEmailMessage(TestEmailMessageBase, TestEmailBase):
+ message = EmailMessage
+
+ def test_set_content_adds_MIME_Version(self):
+ m = self._str_msg('')
+ cm = self._TestContentManager()
+ self.assertNotIn('MIME-Version', m)
+ m.set_content(content_manager=cm)
+ self.assertEqual(m['MIME-Version'], '1.0')
+
+ class _MIME_Version_adding_CM:
+ def set_content(self, msg, *args, **kw):
+ msg['MIME-Version'] = '1.0'
+
+ def test_set_content_does_not_duplicate_MIME_Version(self):
+ m = self._str_msg('')
+ cm = self._MIME_Version_adding_CM()
+ self.assertNotIn('MIME-Version', m)
+ m.set_content(content_manager=cm)
+ self.assertEqual(m['MIME-Version'], '1.0')
+
+
+class TestMIMEPart(TestEmailMessageBase, TestEmailBase):
+ # Doing the full test run here may seem a bit redundant, since the two
+ # classes are almost identical. But what if they drift apart? So we do
+ # the full tests so that any future drift doesn't introduce bugs.
+ message = MIMEPart
+
+ def test_set_content_does_not_add_MIME_Version(self):
+ m = self._str_msg('')
+ cm = self._TestContentManager()
+ self.assertNotIn('MIME-Version', m)
+ m.set_content(content_manager=cm)
+ self.assertNotIn('MIME-Version', m)
+
if __name__ == '__main__':
unittest.main()
diff --git a/Lib/test/test_email/test_policy.py b/Lib/test/test_email/test_policy.py
index 983bd49..06ad5f2 100644
--- a/Lib/test/test_email/test_policy.py
+++ b/Lib/test/test_email/test_policy.py
@@ -30,6 +30,7 @@ class PolicyAPITests(unittest.TestCase):
'raise_on_defect': False,
'header_factory': email.policy.EmailPolicy.header_factory,
'refold_source': 'long',
+ 'content_manager': email.policy.EmailPolicy.content_manager,
})
# For each policy under test, we give here what we expect the defaults to
diff --git a/Lib/test/test_email/torture_test.py b/Lib/test/test_email/torture_test.py
index 544b1bb..19cf64f 100644
--- a/Lib/test/test_email/torture_test.py
+++ b/Lib/test/test_email/torture_test.py
@@ -27,7 +27,7 @@ def openfile(filename):
# Prevent this test from running in the Python distro
try:
openfile('crispin-torture.txt')
-except IOError:
+except OSError:
raise TestSkipped