diff options
author | R David Murray <rdmurray@bitdance.com> | 2012-05-28 01:23:34 (GMT) |
---|---|---|
committer | R David Murray <rdmurray@bitdance.com> | 2012-05-28 01:23:34 (GMT) |
commit | 80e0aee95b8c4a7da8a1b794793a9e9537d021cf (patch) | |
tree | 36bc2ab00ee8552d10ea4b92f80f9bf9355a55fd | |
parent | adbdcdbd9527a3c4000cd4ff0678ff60151f1f79 (diff) | |
download | cpython-80e0aee95b8c4a7da8a1b794793a9e9537d021cf.zip cpython-80e0aee95b8c4a7da8a1b794793a9e9537d021cf.tar.gz cpython-80e0aee95b8c4a7da8a1b794793a9e9537d021cf.tar.bz2 |
#1672568: email now registers defects for base64 payload format errors.
Which also means that it is now producing *something* for any base64
payload, which is what leads to the couple of older test changes in
test_email. This is a slightly backward incompatible behavior change,
but the new behavior is so much more useful than the old (you can now
*reliably* detect errors, and any program that was detecting errors by
sniffing for a base64 return from get_payload(decode=True) and then doing
its own error-recovery decode will just get the error-recovery decode
right away). So this seems to me to be worth the small risk inherent
in this behavior change.
This patch also refactors the defect tests into a separate test file,
since they are no longer just parser tests.
-rw-r--r-- | Doc/library/email.errors.rst | 7 | ||||
-rw-r--r-- | Doc/library/email.message.rst | 8 | ||||
-rw-r--r-- | Lib/email/message.py | 12 | ||||
-rw-r--r-- | Lib/test/test_email/test_defect_handling.py | 304 | ||||
-rw-r--r-- | Lib/test/test_email/test_email.py | 32 | ||||
-rw-r--r-- | Lib/test/test_email/test_parser.py | 256 |
6 files changed, 344 insertions, 275 deletions
diff --git a/Doc/library/email.errors.rst b/Doc/library/email.errors.rst index b71ffa0..2bc3164 100644 --- a/Doc/library/email.errors.rst +++ b/Doc/library/email.errors.rst @@ -96,3 +96,10 @@ this class is *not* an exception! this defect, its :meth:`is_multipart` method may return false even though its content type claims to be :mimetype:`multipart`. +* :class:`InvalidBase64PaddingDefect` -- When decoding a block of base64 + encoded bytes, the padding was not correct. Enough padding is added to + perform the decode, but the resulting decoded bytes may be invalid. + +* :class:`InvalidBase64CharactersDefect` -- When decoding a block of base64 + encoded bytes, characters outside the base64 alphabet were encountered. + The characters are ignored, but the resulting decoded bytes may be invalid. diff --git a/Doc/library/email.message.rst b/Doc/library/email.message.rst index f685e54..59ab47d 100644 --- a/Doc/library/email.message.rst +++ b/Doc/library/email.message.rst @@ -111,10 +111,14 @@ Here are the methods of the :class:`Message` class: header. When ``True`` and the message is not a multipart, the payload will be decoded if this header's value is ``quoted-printable`` or ``base64``. If some other encoding is used, or :mailheader:`Content-Transfer-Encoding` - header is missing, or if the payload has bogus base64 data, the payload is + header is missing, the payload is returned as-is (undecoded). In all cases the returned value is binary data. If the message is a multipart and the *decode* flag is ``True``, - then ``None`` is returned. + then ``None`` is returned. If the payload is base64 and it was not + perfectly formed (missing padding, characters outside the base64 + alphabet), then an appropriate defect will be added to the message's + defect property (:class:`~email.errors.InvalidBase64PaddingDefect` or + :class:`~email.errors.InvalidBase64CharactersDefect`, respectively). 
When *decode* is ``False`` (the default) the body is returned as a string without decoding the :mailheader:`Content-Transfer-Encoding`. However, diff --git a/Lib/email/message.py b/Lib/email/message.py index 91976f1..62b82b7 100644 --- a/Lib/email/message.py +++ b/Lib/email/message.py @@ -17,6 +17,7 @@ from email import utils from email import errors from email._policybase import compat32 from email import charset as _charset +from email._encoded_words import decode_b Charset = _charset.Charset SEMISPACE = '; ' @@ -249,11 +250,12 @@ class Message: if cte == 'quoted-printable': return utils._qdecode(bpayload) elif cte == 'base64': - try: - return base64.b64decode(bpayload) - except binascii.Error: - # Incorrect padding - return bpayload + # XXX: this is a bit of a hack; decode_b should probably be factored + # out somewhere, but I haven't figured out where yet. + value, defects = decode_b(b''.join(bpayload.splitlines())) + for defect in defects: + self.policy.handle_defect(self, defect) + return value elif cte in ('x-uuencode', 'uuencode', 'uue', 'x-uue'): in_file = BytesIO(bpayload) out_file = BytesIO() diff --git a/Lib/test/test_email/test_defect_handling.py b/Lib/test/test_email/test_defect_handling.py new file mode 100644 index 0000000..d3df1e4 --- /dev/null +++ b/Lib/test/test_email/test_defect_handling.py @@ -0,0 +1,304 @@ +import textwrap +import unittest +from email._policybase import Compat32 +from email import errors +from test.test_email import TestEmailBase + + +class TestMessageDefectDetectionBase: + + dup_boundary_msg = textwrap.dedent("""\ + Subject: XX + From: xx@xx.dk + To: XX + Mime-version: 1.0 + Content-type: multipart/mixed; + boundary="MS_Mac_OE_3071477847_720252_MIME_Part" + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: multipart/alternative; + boundary="MS_Mac_OE_3071477847_720252_MIME_Part" + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: text/plain; charset="ISO-8859-1" + Content-transfer-encoding: quoted-printable + 
+ text + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: text/html; charset="ISO-8859-1" + Content-transfer-encoding: quoted-printable + + <HTML></HTML> + + --MS_Mac_OE_3071477847_720252_MIME_Part-- + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: image/gif; name="xx.gif"; + Content-disposition: attachment + Content-transfer-encoding: base64 + + Some removed base64 encoded chars. + + --MS_Mac_OE_3071477847_720252_MIME_Part-- + + """) + + def test_same_boundary_inner_outer(self): + # XXX better would be to actually detect the duplicate. + msg = self._str_msg(self.dup_boundary_msg) + inner = msg.get_payload(0) + self.assertTrue(hasattr(inner, 'defects')) + self.assertEqual(len(self.get_defects(inner)), 1) + self.assertTrue(isinstance(self.get_defects(inner)[0], + errors.StartBoundaryNotFoundDefect)) + + def test_same_boundary_inner_outer_raises_on_defect(self): + with self.assertRaises(errors.StartBoundaryNotFoundDefect): + self._str_msg(self.dup_boundary_msg, + policy=self.policy.clone(raise_on_defect=True)) + + no_boundary_msg = textwrap.dedent("""\ + Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800) + From: foobar + Subject: broken mail + MIME-Version: 1.0 + Content-Type: multipart/report; report-type=delivery-status; + + --JAB03225.986577786/zinfandel.lacita.com + + One part + + --JAB03225.986577786/zinfandel.lacita.com + Content-Type: message/delivery-status + + Header: Another part + + --JAB03225.986577786/zinfandel.lacita.com-- + """) + + def test_multipart_no_boundary(self): + msg = self._str_msg(self.no_boundary_msg) + self.assertTrue(isinstance(msg.get_payload(), str)) + self.assertEqual(len(self.get_defects(msg)), 2) + self.assertTrue(isinstance(self.get_defects(msg)[0], + errors.NoBoundaryInMultipartDefect)) + self.assertTrue(isinstance(self.get_defects(msg)[1], + errors.MultipartInvariantViolationDefect)) + + def test_multipart_no_boundary_raise_on_defect(self): + with self.assertRaises(errors.NoBoundaryInMultipartDefect): + 
self._str_msg(self.no_boundary_msg, + policy=self.policy.clone(raise_on_defect=True)) + + multipart_msg = textwrap.dedent("""\ + Date: Wed, 14 Nov 2007 12:56:23 GMT + From: foo@bar.invalid + To: foo@bar.invalid + Subject: Content-Transfer-Encoding: base64 and multipart + MIME-Version: 1.0 + Content-Type: multipart/mixed; + boundary="===============3344438784458119861=="{} + + --===============3344438784458119861== + Content-Type: text/plain + + Test message + + --===============3344438784458119861== + Content-Type: application/octet-stream + Content-Transfer-Encoding: base64 + + YWJj + + --===============3344438784458119861==-- + """) + + def test_multipart_invalid_cte(self): + msg = self._str_msg( + self.multipart_msg.format("\nContent-Transfer-Encoding: base64")) + self.assertEqual(len(self.get_defects(msg)), 1) + self.assertIsInstance(self.get_defects(msg)[0], + errors.InvalidMultipartContentTransferEncodingDefect) + + def test_multipart_invalid_cte_raise_on_defect(self): + with self.assertRaises( + errors.InvalidMultipartContentTransferEncodingDefect): + self._str_msg( + self.multipart_msg.format( + "\nContent-Transfer-Encoding: base64"), + policy=self.policy.clone(raise_on_defect=True)) + + def test_multipart_no_cte_no_defect(self): + msg = self._str_msg(self.multipart_msg.format('')) + self.assertEqual(len(self.get_defects(msg)), 0) + + def test_multipart_valid_cte_no_defect(self): + for cte in ('7bit', '8bit', 'BINary'): + msg = self._str_msg( + self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte)) + self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte) + + lying_multipart_msg = textwrap.dedent("""\ + From: "Allison Dunlap" <xxx@example.com> + To: yyy@example.com + Subject: 64423 + Date: Sun, 11 Jul 2004 16:09:27 -0300 + MIME-Version: 1.0 + Content-Type: multipart/alternative; + + Blah blah blah + """) + + def test_lying_multipart(self): + msg = self._str_msg(self.lying_multipart_msg) + self.assertTrue(hasattr(msg, 'defects')) + 
self.assertEqual(len(self.get_defects(msg)), 2) + self.assertTrue(isinstance(self.get_defects(msg)[0], + errors.NoBoundaryInMultipartDefect)) + self.assertTrue(isinstance(self.get_defects(msg)[1], + errors.MultipartInvariantViolationDefect)) + + def test_lying_multipart_raise_on_defect(self): + with self.assertRaises(errors.NoBoundaryInMultipartDefect): + self._str_msg(self.lying_multipart_msg, + policy=self.policy.clone(raise_on_defect=True)) + + missing_start_boundary_msg = textwrap.dedent("""\ + Content-Type: multipart/mixed; boundary="AAA" + From: Mail Delivery Subsystem <xxx@example.com> + To: yyy@example.com + + --AAA + + Stuff + + --AAA + Content-Type: message/rfc822 + + From: webmaster@python.org + To: zzz@example.com + Content-Type: multipart/mixed; boundary="BBB" + + --BBB-- + + --AAA-- + + """) + + def test_missing_start_boundary(self): + # The message structure is: + # + # multipart/mixed + # text/plain + # message/rfc822 + # multipart/mixed [*] + # + # [*] This message is missing its start boundary + outer = self._str_msg(self.missing_start_boundary_msg) + bad = outer.get_payload(1).get_payload(0) + self.assertEqual(len(self.get_defects(bad)), 1) + self.assertTrue(isinstance(self.get_defects(bad)[0], + errors.StartBoundaryNotFoundDefect)) + + def test_missing_start_boundary_raise_on_defect(self): + with self.assertRaises(errors.StartBoundaryNotFoundDefect): + self._str_msg(self.missing_start_boundary_msg, + policy=self.policy.clone(raise_on_defect=True)) + + def test_first_line_is_continuation_header(self): + msg = self._str_msg(' Line 1\nSubject: test\n\nbody') + self.assertEqual(msg.keys(), ['Subject']) + self.assertEqual(msg.get_payload(), 'body') + self.assertEqual(len(self.get_defects(msg)), 1) + self.assertDefectsEqual(self.get_defects(msg), + [errors.FirstHeaderLineIsContinuationDefect]) + self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n') + + def test_first_line_is_continuation_header_raise_on_defect(self): + with 
self.assertRaises(errors.FirstHeaderLineIsContinuationDefect): + self._str_msg(' Line 1\nSubject: test\n\nbody\n', + policy=self.policy.clone(raise_on_defect=True)) + + def test_missing_header_body_separator(self): + # Our heuristic if we see a line that doesn't look like a header (no + # leading whitespace but no ':') is to assume that the blank line that + # separates the header from the body is missing, and to stop parsing + # headers and start parsing the body. + msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n') + self.assertEqual(msg.keys(), ['Subject']) + self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n') + self.assertDefectsEqual(self.get_defects(msg), + [errors.MissingHeaderBodySeparatorDefect]) + + def test_missing_header_body_separator_raise_on_defect(self): + with self.assertRaises(errors.MissingHeaderBodySeparatorDefect): + self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n', + policy=self.policy.clone(raise_on_defect=True)) + + badly_padded_base64_payload = textwrap.dedent("""\ + Subject: test + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: base64 + + dmk + """) + + def test_bad_padding_in_base64_payload(self): + msg = self._str_msg(self.badly_padded_base64_payload) + self.assertEqual(msg.get_payload(decode=True), b'vi') + self.assertDefectsEqual(self.get_defects(msg), + [errors.InvalidBase64PaddingDefect]) + + def test_bad_padding_in_base64_payload_raise_on_defect(self): + msg = self._str_msg(self.badly_padded_base64_payload, + policy=self.policy.clone(raise_on_defect=True)) + with self.assertRaises(errors.InvalidBase64PaddingDefect): + msg.get_payload(decode=True) + + invalid_chars_in_base64_payload = textwrap.dedent("""\ + Subject: test + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: base64 + + dm\x01k=== + """) + + def test_invalid_chars_in_base64_payload(self): + msg = 
self._str_msg(self.invalid_chars_in_base64_payload) + self.assertEqual(msg.get_payload(decode=True), b'vi') + self.assertDefectsEqual(self.get_defects(msg), + [errors.InvalidBase64CharactersDefect]) + + def test_invalid_chars_in_base64_payload_raise_on_defect(self): + msg = self._str_msg(self.invalid_chars_in_base64_payload, + policy=self.policy.clone(raise_on_defect=True)) + with self.assertRaises(errors.InvalidBase64CharactersDefect): + msg.get_payload(decode=True) + + +class TestMessageDefectDetection(TestMessageDefectDetectionBase, TestEmailBase): + + def get_defects(self, obj): + return obj.defects + + +class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase, + TestEmailBase): + + class CapturePolicy(Compat32): + captured = None + def register_defect(self, obj, defect): + self.captured.append(defect) + + def setUp(self): + self.policy = self.CapturePolicy(captured=list()) + + def get_defects(self, obj): + return self.policy.captured + + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py index c04952c..5131b65 100644 --- a/Lib/test/test_email/test_email.py +++ b/Lib/test/test_email/test_email.py @@ -513,6 +513,7 @@ class TestMessageAPI(TestEmailBase): eq(msg.values(), ['One Hundred', 'Twenty', 'Three', 'Eleven']) self.assertRaises(KeyError, msg.replace_header, 'Fourth', 'Missing') + # test_defect_handling:test_invalid_chars_in_base64_payload def test_broken_base64_payload(self): x = 'AwDp0P7//y6LwKEAcPa/6Q=9' msg = Message() @@ -520,7 +521,10 @@ class TestMessageAPI(TestEmailBase): msg['content-transfer-encoding'] = 'base64' msg.set_payload(x) self.assertEqual(msg.get_payload(decode=True), - bytes(x, 'raw-unicode-escape')) + (b'\x03\x00\xe9\xd0\xfe\xff\xff.\x8b\xc0' + b'\xa1\x00p\xf6\xbf\xe9\x0f')) + self.assertIsInstance(msg.defects[0], + errors.InvalidBase64CharactersDefect) def test_broken_unicode_payload(self): # This test improves coverage but is not a compliance 
test. @@ -1815,7 +1819,7 @@ class TestNonConformant(TestEmailBase): eq(msg.get_content_maintype(), 'text') eq(msg.get_content_subtype(), 'plain') - # test_parser.TestMessageDefectDetectionBase + # test_defect_handling def test_same_boundary_inner_outer(self): unless = self.assertTrue msg = self._msgobj('msg_15.txt') @@ -1826,7 +1830,7 @@ class TestNonConformant(TestEmailBase): unless(isinstance(inner.defects[0], errors.StartBoundaryNotFoundDefect)) - # test_parser.TestMessageDefectDetectionBase + # test_defect_handling def test_multipart_no_boundary(self): unless = self.assertTrue msg = self._msgobj('msg_25.txt') @@ -1860,7 +1864,7 @@ class TestNonConformant(TestEmailBase): --===============3344438784458119861==-- """) - # test_parser.TestMessageDefectDetectionBase + # test_defect_handling def test_multipart_invalid_cte(self): msg = self._str_msg( self.multipart_msg.format("\nContent-Transfer-Encoding: base64")) @@ -1868,12 +1872,12 @@ class TestNonConformant(TestEmailBase): self.assertIsInstance(msg.defects[0], errors.InvalidMultipartContentTransferEncodingDefect) - # test_parser.TestMessageDefectDetectionBase + # test_defect_handling def test_multipart_no_cte_no_defect(self): msg = self._str_msg(self.multipart_msg.format('')) self.assertEqual(len(msg.defects), 0) - # test_parser.TestMessageDefectDetectionBase + # test_defect_handling def test_multipart_valid_cte_no_defect(self): for cte in ('7bit', '8bit', 'BINary'): msg = self._str_msg( @@ -1930,7 +1934,7 @@ Subject: here's something interesting counter to RFC 2822, there's no separating newline here """) - # test_parser.TestMessageDefectDetectionBase + # test_defect_handling def test_lying_multipart(self): unless = self.assertTrue msg = self._msgobj('msg_41.txt') @@ -1941,7 +1945,7 @@ counter to RFC 2822, there's no separating newline here unless(isinstance(msg.defects[1], errors.MultipartInvariantViolationDefect)) - # test_parser.TestMessageDefectDetectionBase + # test_defect_handling def 
test_missing_start_boundary(self): outer = self._msgobj('msg_42.txt') # The message structure is: @@ -1957,7 +1961,7 @@ counter to RFC 2822, there's no separating newline here self.assertTrue(isinstance(bad.defects[0], errors.StartBoundaryNotFoundDefect)) - # test_parser.TestMessageDefectDetectionBase + # test_defect_handling def test_first_line_is_continuation_header(self): eq = self.assertEqual m = ' Line 1\nSubject: test\n\nbody' @@ -3271,15 +3275,19 @@ class Test8BitBytesHandling(unittest.TestCase): self.assertEqual(msg.get_payload(decode=True), 'pöstál\n'.encode('utf-8')) + # test_defect_handling:test_invalid_chars_in_base64_payload def test_8bit_in_base64_body(self): - # Sticking an 8bit byte in a base64 block makes it undecodable by - # normal means, so the block is returned undecoded, but as bytes. + # If we get 8bit bytes in a base64 body, we can just ignore them + # as being outside the base64 alphabet and decode anyway. But + # we register a defect. m = self.bodytest_msg.format(charset='utf-8', cte='base64', bodyline='cMO2c3RhbAá=').encode('utf-8') msg = email.message_from_bytes(m) self.assertEqual(msg.get_payload(decode=True), - 'cMO2c3RhbAá=\n'.encode('utf-8')) + 'pöstal'.encode('utf-8')) + self.assertIsInstance(msg.defects[0], + errors.InvalidBase64CharactersDefect) def test_8bit_in_uuencode_body(self): # Sticking an 8bit byte in a uuencode block makes it undecodable by diff --git a/Lib/test/test_email/test_parser.py b/Lib/test/test_email/test_parser.py index f58e7c1..3abd11a 100644 --- a/Lib/test/test_email/test_parser.py +++ b/Lib/test/test_email/test_parser.py @@ -1,9 +1,6 @@ import io import email -import textwrap import unittest -from email._policybase import Compat32 -from email import errors from email.message import Message from test.test_email import TestEmailBase @@ -35,258 +32,5 @@ class TestCustomMessage(TestEmailBase): # XXX add tests for other functions that take Message arg. 
-class TestMessageDefectDetectionBase: - - dup_boundary_msg = textwrap.dedent("""\ - Subject: XX - From: xx@xx.dk - To: XX - Mime-version: 1.0 - Content-type: multipart/mixed; - boundary="MS_Mac_OE_3071477847_720252_MIME_Part" - - --MS_Mac_OE_3071477847_720252_MIME_Part - Content-type: multipart/alternative; - boundary="MS_Mac_OE_3071477847_720252_MIME_Part" - - --MS_Mac_OE_3071477847_720252_MIME_Part - Content-type: text/plain; charset="ISO-8859-1" - Content-transfer-encoding: quoted-printable - - text - - --MS_Mac_OE_3071477847_720252_MIME_Part - Content-type: text/html; charset="ISO-8859-1" - Content-transfer-encoding: quoted-printable - - <HTML></HTML> - - --MS_Mac_OE_3071477847_720252_MIME_Part-- - - --MS_Mac_OE_3071477847_720252_MIME_Part - Content-type: image/gif; name="xx.gif"; - Content-disposition: attachment - Content-transfer-encoding: base64 - - Some removed base64 encoded chars. - - --MS_Mac_OE_3071477847_720252_MIME_Part-- - - """) - - def test_same_boundary_inner_outer(self): - # XXX better would be to actually detect the duplicate. 
- msg = self._str_msg(self.dup_boundary_msg) - inner = msg.get_payload(0) - self.assertTrue(hasattr(inner, 'defects')) - self.assertEqual(len(self.get_defects(inner)), 1) - self.assertTrue(isinstance(self.get_defects(inner)[0], - errors.StartBoundaryNotFoundDefect)) - - def test_same_boundary_inner_outer_raises_on_defect(self): - with self.assertRaises(errors.StartBoundaryNotFoundDefect): - self._str_msg(self.dup_boundary_msg, - policy=self.policy.clone(raise_on_defect=True)) - - no_boundary_msg = textwrap.dedent("""\ - Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800) - From: foobar - Subject: broken mail - MIME-Version: 1.0 - Content-Type: multipart/report; report-type=delivery-status; - - --JAB03225.986577786/zinfandel.lacita.com - - One part - - --JAB03225.986577786/zinfandel.lacita.com - Content-Type: message/delivery-status - - Header: Another part - - --JAB03225.986577786/zinfandel.lacita.com-- - """) - - def test_multipart_no_boundary(self): - msg = self._str_msg(self.no_boundary_msg) - self.assertTrue(isinstance(msg.get_payload(), str)) - self.assertEqual(len(self.get_defects(msg)), 2) - self.assertTrue(isinstance(self.get_defects(msg)[0], - errors.NoBoundaryInMultipartDefect)) - self.assertTrue(isinstance(self.get_defects(msg)[1], - errors.MultipartInvariantViolationDefect)) - - def test_multipart_no_boundary_raise_on_defect(self): - with self.assertRaises(errors.NoBoundaryInMultipartDefect): - self._str_msg(self.no_boundary_msg, - policy=self.policy.clone(raise_on_defect=True)) - - multipart_msg = textwrap.dedent("""\ - Date: Wed, 14 Nov 2007 12:56:23 GMT - From: foo@bar.invalid - To: foo@bar.invalid - Subject: Content-Transfer-Encoding: base64 and multipart - MIME-Version: 1.0 - Content-Type: multipart/mixed; - boundary="===============3344438784458119861=="{} - - --===============3344438784458119861== - Content-Type: text/plain - - Test message - - --===============3344438784458119861== - Content-Type: application/octet-stream - 
Content-Transfer-Encoding: base64 - - YWJj - - --===============3344438784458119861==-- - """) - - def test_multipart_invalid_cte(self): - msg = self._str_msg( - self.multipart_msg.format("\nContent-Transfer-Encoding: base64")) - self.assertEqual(len(self.get_defects(msg)), 1) - self.assertIsInstance(self.get_defects(msg)[0], - errors.InvalidMultipartContentTransferEncodingDefect) - - def test_multipart_invalid_cte_raise_on_defect(self): - with self.assertRaises( - errors.InvalidMultipartContentTransferEncodingDefect): - self._str_msg( - self.multipart_msg.format( - "\nContent-Transfer-Encoding: base64"), - policy=self.policy.clone(raise_on_defect=True)) - - def test_multipart_no_cte_no_defect(self): - msg = self._str_msg(self.multipart_msg.format('')) - self.assertEqual(len(self.get_defects(msg)), 0) - - def test_multipart_valid_cte_no_defect(self): - for cte in ('7bit', '8bit', 'BINary'): - msg = self._str_msg( - self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte)) - self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte) - - lying_multipart_msg = textwrap.dedent("""\ - From: "Allison Dunlap" <xxx@example.com> - To: yyy@example.com - Subject: 64423 - Date: Sun, 11 Jul 2004 16:09:27 -0300 - MIME-Version: 1.0 - Content-Type: multipart/alternative; - - Blah blah blah - """) - - def test_lying_multipart(self): - msg = self._str_msg(self.lying_multipart_msg) - self.assertTrue(hasattr(msg, 'defects')) - self.assertEqual(len(self.get_defects(msg)), 2) - self.assertTrue(isinstance(self.get_defects(msg)[0], - errors.NoBoundaryInMultipartDefect)) - self.assertTrue(isinstance(self.get_defects(msg)[1], - errors.MultipartInvariantViolationDefect)) - - def test_lying_multipart_raise_on_defect(self): - with self.assertRaises(errors.NoBoundaryInMultipartDefect): - self._str_msg(self.lying_multipart_msg, - policy=self.policy.clone(raise_on_defect=True)) - - missing_start_boundary_msg = textwrap.dedent("""\ - Content-Type: multipart/mixed; boundary="AAA" - From: 
Mail Delivery Subsystem <xxx@example.com> - To: yyy@example.com - - --AAA - - Stuff - - --AAA - Content-Type: message/rfc822 - - From: webmaster@python.org - To: zzz@example.com - Content-Type: multipart/mixed; boundary="BBB" - - --BBB-- - - --AAA-- - - """) - - def test_missing_start_boundary(self): - # The message structure is: - # - # multipart/mixed - # text/plain - # message/rfc822 - # multipart/mixed [*] - # - # [*] This message is missing its start boundary - outer = self._str_msg(self.missing_start_boundary_msg) - bad = outer.get_payload(1).get_payload(0) - self.assertEqual(len(self.get_defects(bad)), 1) - self.assertTrue(isinstance(self.get_defects(bad)[0], - errors.StartBoundaryNotFoundDefect)) - - def test_missing_start_boundary_raise_on_defect(self): - with self.assertRaises(errors.StartBoundaryNotFoundDefect): - self._str_msg(self.missing_start_boundary_msg, - policy=self.policy.clone(raise_on_defect=True)) - - def test_first_line_is_continuation_header(self): - msg = self._str_msg(' Line 1\nSubject: test\n\nbody') - self.assertEqual(msg.keys(), ['Subject']) - self.assertEqual(msg.get_payload(), 'body') - self.assertEqual(len(self.get_defects(msg)), 1) - self.assertDefectsEqual(self.get_defects(msg), - [errors.FirstHeaderLineIsContinuationDefect]) - self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n') - - def test_first_line_is_continuation_header_raise_on_defect(self): - with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect): - self._str_msg(' Line 1\nSubject: test\n\nbody\n', - policy=self.policy.clone(raise_on_defect=True)) - - def test_missing_header_body_separator(self): - # Our heuristic if we see a line that doesn't look like a header (no - # leading whitespace but no ':') is to assume that the blank line that - # separates the header from the body is missing, and to stop parsing - # headers and start parsing the body. 
- msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n') - self.assertEqual(msg.keys(), ['Subject']) - self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n') - self.assertDefectsEqual(self.get_defects(msg), - [errors.MissingHeaderBodySeparatorDefect]) - - def test_missing_header_body_separator_raise_on_defect(self): - with self.assertRaises(errors.MissingHeaderBodySeparatorDefect): - self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n', - policy=self.policy.clone(raise_on_defect=True)) - - -class TestMessageDefectDetection(TestMessageDefectDetectionBase, TestEmailBase): - - def get_defects(self, obj): - return obj.defects - - -class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase, - TestEmailBase): - - class CapturePolicy(Compat32): - captured = None - def register_defect(self, obj, defect): - self.captured.append(defect) - - def setUp(self): - self.policy = self.CapturePolicy(captured=list()) - - def get_defects(self, obj): - return self.policy.captured - - if __name__ == '__main__': unittest.main() |