summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_email/test_parser.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_email/test_parser.py')
-rw-r--r--Lib/test/test_email/test_parser.py276
1 files changed, 276 insertions, 0 deletions
diff --git a/Lib/test/test_email/test_parser.py b/Lib/test/test_email/test_parser.py
new file mode 100644
index 0000000..864e4c1
--- /dev/null
+++ b/Lib/test/test_email/test_parser.py
@@ -0,0 +1,276 @@
+import io
+import email
+import textwrap
+import unittest
+from email._policybase import Compat32
+from email import errors
+from email.message import Message
+from test.test_email import TestEmailBase
+
+
+class TestCustomMessage(TestEmailBase):
+
+ class MyMessage(Message):
+ def __init__(self, policy):
+ self.check_policy = policy
+ super().__init__()
+
+ MyPolicy = TestEmailBase.policy.clone(linesep='boo')
+
+ def test_custom_message_gets_policy_if_possible_from_string(self):
+ msg = email.message_from_string("Subject: bogus\n\nmsg\n",
+ self.MyMessage,
+ policy=self.MyPolicy)
+ self.assertTrue(isinstance(msg, self.MyMessage))
+ self.assertIs(msg.check_policy, self.MyPolicy)
+
+ def test_custom_message_gets_policy_if_possible_from_file(self):
+ source_file = io.StringIO("Subject: bogus\n\nmsg\n")
+ msg = email.message_from_file(source_file,
+ self.MyMessage,
+ policy=self.MyPolicy)
+ self.assertTrue(isinstance(msg, self.MyMessage))
+ self.assertIs(msg.check_policy, self.MyPolicy)
+
+ # XXX add tests for other functions that take Message arg.
+
+
+class TestMessageDefectDetectionBase:
+
+ dup_boundary_msg = textwrap.dedent("""\
+ Subject: XX
+ From: xx@xx.dk
+ To: XX
+ Mime-version: 1.0
+ Content-type: multipart/mixed;
+ boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: multipart/alternative;
+ boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: text/plain; charset="ISO-8859-1"
+ Content-transfer-encoding: quoted-printable
+
+ text
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: text/html; charset="ISO-8859-1"
+ Content-transfer-encoding: quoted-printable
+
+ <HTML></HTML>
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part--
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: image/gif; name="xx.gif";
+ Content-disposition: attachment
+ Content-transfer-encoding: base64
+
+ Some removed base64 encoded chars.
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part--
+
+ """)
+
+ def test_same_boundary_inner_outer(self):
+ # XXX better would be to actually detect the duplicate.
+ msg = self._str_msg(self.dup_boundary_msg)
+ inner = msg.get_payload(0)
+ self.assertTrue(hasattr(inner, 'defects'))
+ self.assertEqual(len(self.get_defects(inner)), 1)
+ self.assertTrue(isinstance(self.get_defects(inner)[0],
+ errors.StartBoundaryNotFoundDefect))
+
+ def test_same_boundary_inner_outer_raises_on_defect(self):
+ with self.assertRaises(errors.StartBoundaryNotFoundDefect):
+ self._str_msg(self.dup_boundary_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ no_boundary_msg = textwrap.dedent("""\
+ Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
+ From: foobar
+ Subject: broken mail
+ MIME-Version: 1.0
+ Content-Type: multipart/report; report-type=delivery-status;
+
+ --JAB03225.986577786/zinfandel.lacita.com
+
+ One part
+
+ --JAB03225.986577786/zinfandel.lacita.com
+ Content-Type: message/delivery-status
+
+ Header: Another part
+
+ --JAB03225.986577786/zinfandel.lacita.com--
+ """)
+
+ def test_multipart_no_boundary(self):
+ msg = self._str_msg(self.no_boundary_msg)
+ self.assertTrue(isinstance(msg.get_payload(), str))
+ self.assertEqual(len(self.get_defects(msg)), 2)
+ self.assertTrue(isinstance(self.get_defects(msg)[0],
+ errors.NoBoundaryInMultipartDefect))
+ self.assertTrue(isinstance(self.get_defects(msg)[1],
+ errors.MultipartInvariantViolationDefect))
+
+ def test_multipart_no_boundary_raise_on_defect(self):
+ with self.assertRaises(errors.NoBoundaryInMultipartDefect):
+ self._str_msg(self.no_boundary_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ multipart_msg = textwrap.dedent("""\
+ Date: Wed, 14 Nov 2007 12:56:23 GMT
+ From: foo@bar.invalid
+ To: foo@bar.invalid
+ Subject: Content-Transfer-Encoding: base64 and multipart
+ MIME-Version: 1.0
+ Content-Type: multipart/mixed;
+ boundary="===============3344438784458119861=="{}
+
+ --===============3344438784458119861==
+ Content-Type: text/plain
+
+ Test message
+
+ --===============3344438784458119861==
+ Content-Type: application/octet-stream
+ Content-Transfer-Encoding: base64
+
+ YWJj
+
+ --===============3344438784458119861==--
+ """)
+
+ def test_multipart_invalid_cte(self):
+ msg = self._str_msg(
+ self.multipart_msg.format("\nContent-Transfer-Encoding: base64"))
+ self.assertEqual(len(self.get_defects(msg)), 1)
+ self.assertIsInstance(self.get_defects(msg)[0],
+ errors.InvalidMultipartContentTransferEncodingDefect)
+
+ def test_multipart_invalid_cte_raise_on_defect(self):
+ with self.assertRaises(
+ errors.InvalidMultipartContentTransferEncodingDefect):
+ self._str_msg(
+ self.multipart_msg.format(
+ "\nContent-Transfer-Encoding: base64"),
+ policy=self.policy.clone(raise_on_defect=True))
+
+ def test_multipart_no_cte_no_defect(self):
+ msg = self._str_msg(self.multipart_msg.format(''))
+ self.assertEqual(len(self.get_defects(msg)), 0)
+
+ def test_multipart_valid_cte_no_defect(self):
+ for cte in ('7bit', '8bit', 'BINary'):
+ msg = self._str_msg(
+ self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
+ self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)
+
+ lying_multipart_msg = textwrap.dedent("""\
+ From: "Allison Dunlap" <xxx@example.com>
+ To: yyy@example.com
+ Subject: 64423
+ Date: Sun, 11 Jul 2004 16:09:27 -0300
+ MIME-Version: 1.0
+ Content-Type: multipart/alternative;
+
+ Blah blah blah
+ """)
+
+ def test_lying_multipart(self):
+ msg = self._str_msg(self.lying_multipart_msg)
+ self.assertTrue(hasattr(msg, 'defects'))
+ self.assertEqual(len(self.get_defects(msg)), 2)
+ self.assertTrue(isinstance(self.get_defects(msg)[0],
+ errors.NoBoundaryInMultipartDefect))
+ self.assertTrue(isinstance(self.get_defects(msg)[1],
+ errors.MultipartInvariantViolationDefect))
+
+ def test_lying_multipart_raise_on_defect(self):
+ with self.assertRaises(errors.NoBoundaryInMultipartDefect):
+ self._str_msg(self.lying_multipart_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ missing_start_boundary_msg = textwrap.dedent("""\
+ Content-Type: multipart/mixed; boundary="AAA"
+ From: Mail Delivery Subsystem <xxx@example.com>
+ To: yyy@example.com
+
+ --AAA
+
+ Stuff
+
+ --AAA
+ Content-Type: message/rfc822
+
+ From: webmaster@python.org
+ To: zzz@example.com
+ Content-Type: multipart/mixed; boundary="BBB"
+
+ --BBB--
+
+ --AAA--
+
+ """)
+
+ def test_missing_start_boundary(self):
+ # The message structure is:
+ #
+ # multipart/mixed
+ # text/plain
+ # message/rfc822
+ # multipart/mixed [*]
+ #
+ # [*] This message is missing its start boundary
+ outer = self._str_msg(self.missing_start_boundary_msg)
+ bad = outer.get_payload(1).get_payload(0)
+ self.assertEqual(len(self.get_defects(bad)), 1)
+ self.assertTrue(isinstance(self.get_defects(bad)[0],
+ errors.StartBoundaryNotFoundDefect))
+
+ def test_missing_start_boundary_raise_on_defect(self):
+ with self.assertRaises(errors.StartBoundaryNotFoundDefect):
+ self._str_msg(self.missing_start_boundary_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ def test_first_line_is_continuation_header(self):
+ msg = self._str_msg(' Line 1\nLine 2\nLine 3')
+ self.assertEqual(msg.keys(), [])
+ self.assertEqual(msg.get_payload(), 'Line 2\nLine 3')
+ self.assertEqual(len(self.get_defects(msg)), 1)
+ self.assertTrue(isinstance(self.get_defects(msg)[0],
+ errors.FirstHeaderLineIsContinuationDefect))
+ self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')
+
+ def test_first_line_is_continuation_header_raise_on_defect(self):
+ with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect):
+ self._str_msg(' Line 1\nLine 2\nLine 3',
+ policy=self.policy.clone(raise_on_defect=True))
+
+
+class TestMessageDefectDetection(TestMessageDefectDetectionBase, TestEmailBase):
+
+ def get_defects(self, obj):
+ return obj.defects
+
+
+class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase,
+ TestEmailBase):
+
+ class CapturePolicy(Compat32):
+ captured = None
+ def register_defect(self, obj, defect):
+ self.captured.append(defect)
+
+ def setUp(self):
+ self.policy = self.CapturePolicy(captured=list())
+
+ def get_defects(self, obj):
+ return self.policy.captured
+
+
+if __name__ == '__main__':
+ unittest.main()