summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorR David Murray <rdmurray@bitdance.com>2012-05-28 01:23:34 (GMT)
committerR David Murray <rdmurray@bitdance.com>2012-05-28 01:23:34 (GMT)
commit80e0aee95b8c4a7da8a1b794793a9e9537d021cf (patch)
tree36bc2ab00ee8552d10ea4b92f80f9bf9355a55fd
parentadbdcdbd9527a3c4000cd4ff0678ff60151f1f79 (diff)
downloadcpython-80e0aee95b8c4a7da8a1b794793a9e9537d021cf.zip
cpython-80e0aee95b8c4a7da8a1b794793a9e9537d021cf.tar.gz
cpython-80e0aee95b8c4a7da8a1b794793a9e9537d021cf.tar.bz2
#1672568: email now registers defects for base64 payload format errors.
Which also means that it is now producing *something* for any base64 payload, which is what leads to the couple of older test changes in test_email. This is a slightly backward incompatible behavior change, but the new behavior is so much more useful than the old (you can now *reliably* detect errors, and any program that was detecting errors by sniffing for a base64 return from get_payload(decode=True) and then doing its own error-recovery decode will just get the error-recovery decode right away). So this seems to me to be worth the small risk inherent in this behavior change. This patch also refactors the defect tests into a separate test file, since they are no longer just parser tests.
-rw-r--r--Doc/library/email.errors.rst7
-rw-r--r--Doc/library/email.message.rst8
-rw-r--r--Lib/email/message.py12
-rw-r--r--Lib/test/test_email/test_defect_handling.py304
-rw-r--r--Lib/test/test_email/test_email.py32
-rw-r--r--Lib/test/test_email/test_parser.py256
6 files changed, 344 insertions, 275 deletions
diff --git a/Doc/library/email.errors.rst b/Doc/library/email.errors.rst
index b71ffa0..2bc3164 100644
--- a/Doc/library/email.errors.rst
+++ b/Doc/library/email.errors.rst
@@ -96,3 +96,10 @@ this class is *not* an exception!
this defect, its :meth:`is_multipart` method may return false even though its
content type claims to be :mimetype:`multipart`.
+* :class:`InvalidBase64PaddingDefect` -- When decoding a block of base64
+ encoded bytes, the padding was not correct. Enough padding is added to
+ perform the decode, but the resulting decoded bytes may be invalid.
+
+* :class:`InvalidBase64CharactersDefect` -- When decoding a block of base64
+ encoded bytes, characters outside the base64 alphabet were encountered.
+ The characters are ignored, but the resulting decoded bytes may be invalid.
diff --git a/Doc/library/email.message.rst b/Doc/library/email.message.rst
index f685e54..59ab47d 100644
--- a/Doc/library/email.message.rst
+++ b/Doc/library/email.message.rst
@@ -111,10 +111,14 @@ Here are the methods of the :class:`Message` class:
header. When ``True`` and the message is not a multipart, the payload will
be decoded if this header's value is ``quoted-printable`` or ``base64``.
If some other encoding is used, or :mailheader:`Content-Transfer-Encoding`
- header is missing, or if the payload has bogus base64 data, the payload is
+ header is missing, the payload is
returned as-is (undecoded). In all cases the returned value is binary
data. If the message is a multipart and the *decode* flag is ``True``,
- then ``None`` is returned.
+ then ``None`` is returned. If the payload is base64 and it was not
+ perfectly formed (missing padding, characters outside the base64
+ alphabet), then an appropriate defect will be added to the message's
+ defect property (:class:`~email.errors.InvalidBase64PaddingDefect` or
+ :class:`~email.errors.InvalidBase64CharactersDefect`, respectively).
When *decode* is ``False`` (the default) the body is returned as a string
without decoding the :mailheader:`Content-Transfer-Encoding`. However,
diff --git a/Lib/email/message.py b/Lib/email/message.py
index 91976f1..62b82b7 100644
--- a/Lib/email/message.py
+++ b/Lib/email/message.py
@@ -17,6 +17,7 @@ from email import utils
from email import errors
from email._policybase import compat32
from email import charset as _charset
+from email._encoded_words import decode_b
Charset = _charset.Charset
SEMISPACE = '; '
@@ -249,11 +250,12 @@ class Message:
if cte == 'quoted-printable':
return utils._qdecode(bpayload)
elif cte == 'base64':
- try:
- return base64.b64decode(bpayload)
- except binascii.Error:
- # Incorrect padding
- return bpayload
+ # XXX: this is a bit of a hack; decode_b should probably be factored
+ # out somewhere, but I haven't figured out where yet.
+ value, defects = decode_b(b''.join(bpayload.splitlines()))
+ for defect in defects:
+ self.policy.handle_defect(self, defect)
+ return value
elif cte in ('x-uuencode', 'uuencode', 'uue', 'x-uue'):
in_file = BytesIO(bpayload)
out_file = BytesIO()
diff --git a/Lib/test/test_email/test_defect_handling.py b/Lib/test/test_email/test_defect_handling.py
new file mode 100644
index 0000000..d3df1e4
--- /dev/null
+++ b/Lib/test/test_email/test_defect_handling.py
@@ -0,0 +1,304 @@
+import textwrap
+import unittest
+from email._policybase import Compat32
+from email import errors
+from test.test_email import TestEmailBase
+
+
+class TestMessageDefectDetectionBase:
+
+ dup_boundary_msg = textwrap.dedent("""\
+ Subject: XX
+ From: xx@xx.dk
+ To: XX
+ Mime-version: 1.0
+ Content-type: multipart/mixed;
+ boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: multipart/alternative;
+ boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: text/plain; charset="ISO-8859-1"
+ Content-transfer-encoding: quoted-printable
+
+ text
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: text/html; charset="ISO-8859-1"
+ Content-transfer-encoding: quoted-printable
+
+ <HTML></HTML>
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part--
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part
+ Content-type: image/gif; name="xx.gif";
+ Content-disposition: attachment
+ Content-transfer-encoding: base64
+
+ Some removed base64 encoded chars.
+
+ --MS_Mac_OE_3071477847_720252_MIME_Part--
+
+ """)
+
+ def test_same_boundary_inner_outer(self):
+ # XXX better would be to actually detect the duplicate.
+ msg = self._str_msg(self.dup_boundary_msg)
+ inner = msg.get_payload(0)
+ self.assertTrue(hasattr(inner, 'defects'))
+ self.assertEqual(len(self.get_defects(inner)), 1)
+ self.assertTrue(isinstance(self.get_defects(inner)[0],
+ errors.StartBoundaryNotFoundDefect))
+
+ def test_same_boundary_inner_outer_raises_on_defect(self):
+ with self.assertRaises(errors.StartBoundaryNotFoundDefect):
+ self._str_msg(self.dup_boundary_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ no_boundary_msg = textwrap.dedent("""\
+ Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
+ From: foobar
+ Subject: broken mail
+ MIME-Version: 1.0
+ Content-Type: multipart/report; report-type=delivery-status;
+
+ --JAB03225.986577786/zinfandel.lacita.com
+
+ One part
+
+ --JAB03225.986577786/zinfandel.lacita.com
+ Content-Type: message/delivery-status
+
+ Header: Another part
+
+ --JAB03225.986577786/zinfandel.lacita.com--
+ """)
+
+ def test_multipart_no_boundary(self):
+ msg = self._str_msg(self.no_boundary_msg)
+ self.assertTrue(isinstance(msg.get_payload(), str))
+ self.assertEqual(len(self.get_defects(msg)), 2)
+ self.assertTrue(isinstance(self.get_defects(msg)[0],
+ errors.NoBoundaryInMultipartDefect))
+ self.assertTrue(isinstance(self.get_defects(msg)[1],
+ errors.MultipartInvariantViolationDefect))
+
+ def test_multipart_no_boundary_raise_on_defect(self):
+ with self.assertRaises(errors.NoBoundaryInMultipartDefect):
+ self._str_msg(self.no_boundary_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ multipart_msg = textwrap.dedent("""\
+ Date: Wed, 14 Nov 2007 12:56:23 GMT
+ From: foo@bar.invalid
+ To: foo@bar.invalid
+ Subject: Content-Transfer-Encoding: base64 and multipart
+ MIME-Version: 1.0
+ Content-Type: multipart/mixed;
+ boundary="===============3344438784458119861=="{}
+
+ --===============3344438784458119861==
+ Content-Type: text/plain
+
+ Test message
+
+ --===============3344438784458119861==
+ Content-Type: application/octet-stream
+ Content-Transfer-Encoding: base64
+
+ YWJj
+
+ --===============3344438784458119861==--
+ """)
+
+ def test_multipart_invalid_cte(self):
+ msg = self._str_msg(
+ self.multipart_msg.format("\nContent-Transfer-Encoding: base64"))
+ self.assertEqual(len(self.get_defects(msg)), 1)
+ self.assertIsInstance(self.get_defects(msg)[0],
+ errors.InvalidMultipartContentTransferEncodingDefect)
+
+ def test_multipart_invalid_cte_raise_on_defect(self):
+ with self.assertRaises(
+ errors.InvalidMultipartContentTransferEncodingDefect):
+ self._str_msg(
+ self.multipart_msg.format(
+ "\nContent-Transfer-Encoding: base64"),
+ policy=self.policy.clone(raise_on_defect=True))
+
+ def test_multipart_no_cte_no_defect(self):
+ msg = self._str_msg(self.multipart_msg.format(''))
+ self.assertEqual(len(self.get_defects(msg)), 0)
+
+ def test_multipart_valid_cte_no_defect(self):
+ for cte in ('7bit', '8bit', 'BINary'):
+ msg = self._str_msg(
+ self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
+ self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)
+
+ lying_multipart_msg = textwrap.dedent("""\
+ From: "Allison Dunlap" <xxx@example.com>
+ To: yyy@example.com
+ Subject: 64423
+ Date: Sun, 11 Jul 2004 16:09:27 -0300
+ MIME-Version: 1.0
+ Content-Type: multipart/alternative;
+
+ Blah blah blah
+ """)
+
+ def test_lying_multipart(self):
+ msg = self._str_msg(self.lying_multipart_msg)
+ self.assertTrue(hasattr(msg, 'defects'))
+ self.assertEqual(len(self.get_defects(msg)), 2)
+ self.assertTrue(isinstance(self.get_defects(msg)[0],
+ errors.NoBoundaryInMultipartDefect))
+ self.assertTrue(isinstance(self.get_defects(msg)[1],
+ errors.MultipartInvariantViolationDefect))
+
+ def test_lying_multipart_raise_on_defect(self):
+ with self.assertRaises(errors.NoBoundaryInMultipartDefect):
+ self._str_msg(self.lying_multipart_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ missing_start_boundary_msg = textwrap.dedent("""\
+ Content-Type: multipart/mixed; boundary="AAA"
+ From: Mail Delivery Subsystem <xxx@example.com>
+ To: yyy@example.com
+
+ --AAA
+
+ Stuff
+
+ --AAA
+ Content-Type: message/rfc822
+
+ From: webmaster@python.org
+ To: zzz@example.com
+ Content-Type: multipart/mixed; boundary="BBB"
+
+ --BBB--
+
+ --AAA--
+
+ """)
+
+ def test_missing_start_boundary(self):
+ # The message structure is:
+ #
+ # multipart/mixed
+ # text/plain
+ # message/rfc822
+ # multipart/mixed [*]
+ #
+ # [*] This message is missing its start boundary
+ outer = self._str_msg(self.missing_start_boundary_msg)
+ bad = outer.get_payload(1).get_payload(0)
+ self.assertEqual(len(self.get_defects(bad)), 1)
+ self.assertTrue(isinstance(self.get_defects(bad)[0],
+ errors.StartBoundaryNotFoundDefect))
+
+ def test_missing_start_boundary_raise_on_defect(self):
+ with self.assertRaises(errors.StartBoundaryNotFoundDefect):
+ self._str_msg(self.missing_start_boundary_msg,
+ policy=self.policy.clone(raise_on_defect=True))
+
+ def test_first_line_is_continuation_header(self):
+ msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
+ self.assertEqual(msg.keys(), ['Subject'])
+ self.assertEqual(msg.get_payload(), 'body')
+ self.assertEqual(len(self.get_defects(msg)), 1)
+ self.assertDefectsEqual(self.get_defects(msg),
+ [errors.FirstHeaderLineIsContinuationDefect])
+ self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')
+
+ def test_first_line_is_continuation_header_raise_on_defect(self):
+ with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect):
+ self._str_msg(' Line 1\nSubject: test\n\nbody\n',
+ policy=self.policy.clone(raise_on_defect=True))
+
+ def test_missing_header_body_separator(self):
+ # Our heuristic if we see a line that doesn't look like a header (no
+ # leading whitespace but no ':') is to assume that the blank line that
+ # separates the header from the body is missing, and to stop parsing
+ # headers and start parsing the body.
+ msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
+ self.assertEqual(msg.keys(), ['Subject'])
+ self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
+ self.assertDefectsEqual(self.get_defects(msg),
+ [errors.MissingHeaderBodySeparatorDefect])
+
+ def test_missing_header_body_separator_raise_on_defect(self):
+ with self.assertRaises(errors.MissingHeaderBodySeparatorDefect):
+ self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n',
+ policy=self.policy.clone(raise_on_defect=True))
+
+ badly_padded_base64_payload = textwrap.dedent("""\
+ Subject: test
+ MIME-Version: 1.0
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: base64
+
+ dmk
+ """)
+
+ def test_bad_padding_in_base64_payload(self):
+ msg = self._str_msg(self.badly_padded_base64_payload)
+ self.assertEqual(msg.get_payload(decode=True), b'vi')
+ self.assertDefectsEqual(self.get_defects(msg),
+ [errors.InvalidBase64PaddingDefect])
+
+ def test_bad_padding_in_base64_payload_raise_on_defect(self):
+ msg = self._str_msg(self.badly_padded_base64_payload,
+ policy=self.policy.clone(raise_on_defect=True))
+ with self.assertRaises(errors.InvalidBase64PaddingDefect):
+ msg.get_payload(decode=True)
+
+ invalid_chars_in_base64_payload = textwrap.dedent("""\
+ Subject: test
+ MIME-Version: 1.0
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: base64
+
+ dm\x01k===
+ """)
+
+ def test_invalid_chars_in_base64_payload(self):
+ msg = self._str_msg(self.invalid_chars_in_base64_payload)
+ self.assertEqual(msg.get_payload(decode=True), b'vi')
+ self.assertDefectsEqual(self.get_defects(msg),
+ [errors.InvalidBase64CharactersDefect])
+
+ def test_invalid_chars_in_base64_payload_raise_on_defect(self):
+ msg = self._str_msg(self.invalid_chars_in_base64_payload,
+ policy=self.policy.clone(raise_on_defect=True))
+ with self.assertRaises(errors.InvalidBase64CharactersDefect):
+ msg.get_payload(decode=True)
+
+
+class TestMessageDefectDetection(TestMessageDefectDetectionBase, TestEmailBase):
+
+ def get_defects(self, obj):
+ return obj.defects
+
+
+class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase,
+ TestEmailBase):
+
+ class CapturePolicy(Compat32):
+ captured = None
+ def register_defect(self, obj, defect):
+ self.captured.append(defect)
+
+ def setUp(self):
+ self.policy = self.CapturePolicy(captured=list())
+
+ def get_defects(self, obj):
+ return self.policy.captured
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py
index c04952c..5131b65 100644
--- a/Lib/test/test_email/test_email.py
+++ b/Lib/test/test_email/test_email.py
@@ -513,6 +513,7 @@ class TestMessageAPI(TestEmailBase):
eq(msg.values(), ['One Hundred', 'Twenty', 'Three', 'Eleven'])
self.assertRaises(KeyError, msg.replace_header, 'Fourth', 'Missing')
+ # test_defect_handling:test_invalid_chars_in_base64_payload
def test_broken_base64_payload(self):
x = 'AwDp0P7//y6LwKEAcPa/6Q=9'
msg = Message()
@@ -520,7 +521,10 @@ class TestMessageAPI(TestEmailBase):
msg['content-transfer-encoding'] = 'base64'
msg.set_payload(x)
self.assertEqual(msg.get_payload(decode=True),
- bytes(x, 'raw-unicode-escape'))
+ (b'\x03\x00\xe9\xd0\xfe\xff\xff.\x8b\xc0'
+ b'\xa1\x00p\xf6\xbf\xe9\x0f'))
+ self.assertIsInstance(msg.defects[0],
+ errors.InvalidBase64CharactersDefect)
def test_broken_unicode_payload(self):
# This test improves coverage but is not a compliance test.
@@ -1815,7 +1819,7 @@ class TestNonConformant(TestEmailBase):
eq(msg.get_content_maintype(), 'text')
eq(msg.get_content_subtype(), 'plain')
- # test_parser.TestMessageDefectDetectionBase
+ # test_defect_handling
def test_same_boundary_inner_outer(self):
unless = self.assertTrue
msg = self._msgobj('msg_15.txt')
@@ -1826,7 +1830,7 @@ class TestNonConformant(TestEmailBase):
unless(isinstance(inner.defects[0],
errors.StartBoundaryNotFoundDefect))
- # test_parser.TestMessageDefectDetectionBase
+ # test_defect_handling
def test_multipart_no_boundary(self):
unless = self.assertTrue
msg = self._msgobj('msg_25.txt')
@@ -1860,7 +1864,7 @@ class TestNonConformant(TestEmailBase):
--===============3344438784458119861==--
""")
- # test_parser.TestMessageDefectDetectionBase
+ # test_defect_handling
def test_multipart_invalid_cte(self):
msg = self._str_msg(
self.multipart_msg.format("\nContent-Transfer-Encoding: base64"))
@@ -1868,12 +1872,12 @@ class TestNonConformant(TestEmailBase):
self.assertIsInstance(msg.defects[0],
errors.InvalidMultipartContentTransferEncodingDefect)
- # test_parser.TestMessageDefectDetectionBase
+ # test_defect_handling
def test_multipart_no_cte_no_defect(self):
msg = self._str_msg(self.multipart_msg.format(''))
self.assertEqual(len(msg.defects), 0)
- # test_parser.TestMessageDefectDetectionBase
+ # test_defect_handling
def test_multipart_valid_cte_no_defect(self):
for cte in ('7bit', '8bit', 'BINary'):
msg = self._str_msg(
@@ -1930,7 +1934,7 @@ Subject: here's something interesting
counter to RFC 2822, there's no separating newline here
""")
- # test_parser.TestMessageDefectDetectionBase
+ # test_defect_handling
def test_lying_multipart(self):
unless = self.assertTrue
msg = self._msgobj('msg_41.txt')
@@ -1941,7 +1945,7 @@ counter to RFC 2822, there's no separating newline here
unless(isinstance(msg.defects[1],
errors.MultipartInvariantViolationDefect))
- # test_parser.TestMessageDefectDetectionBase
+ # test_defect_handling
def test_missing_start_boundary(self):
outer = self._msgobj('msg_42.txt')
# The message structure is:
@@ -1957,7 +1961,7 @@ counter to RFC 2822, there's no separating newline here
self.assertTrue(isinstance(bad.defects[0],
errors.StartBoundaryNotFoundDefect))
- # test_parser.TestMessageDefectDetectionBase
+ # test_defect_handling
def test_first_line_is_continuation_header(self):
eq = self.assertEqual
m = ' Line 1\nSubject: test\n\nbody'
@@ -3271,15 +3275,19 @@ class Test8BitBytesHandling(unittest.TestCase):
self.assertEqual(msg.get_payload(decode=True),
'pöstál\n'.encode('utf-8'))
+ # test_defect_handling:test_invalid_chars_in_base64_payload
def test_8bit_in_base64_body(self):
- # Sticking an 8bit byte in a base64 block makes it undecodable by
- # normal means, so the block is returned undecoded, but as bytes.
+ # If we get 8bit bytes in a base64 body, we can just ignore them
+ # as being outside the base64 alphabet and decode anyway. But
+ # we register a defect.
m = self.bodytest_msg.format(charset='utf-8',
cte='base64',
bodyline='cMO2c3RhbAá=').encode('utf-8')
msg = email.message_from_bytes(m)
self.assertEqual(msg.get_payload(decode=True),
- 'cMO2c3RhbAá=\n'.encode('utf-8'))
+ 'pöstal'.encode('utf-8'))
+ self.assertIsInstance(msg.defects[0],
+ errors.InvalidBase64CharactersDefect)
def test_8bit_in_uuencode_body(self):
# Sticking an 8bit byte in a uuencode block makes it undecodable by
diff --git a/Lib/test/test_email/test_parser.py b/Lib/test/test_email/test_parser.py
index f58e7c1..3abd11a 100644
--- a/Lib/test/test_email/test_parser.py
+++ b/Lib/test/test_email/test_parser.py
@@ -1,9 +1,6 @@
import io
import email
-import textwrap
import unittest
-from email._policybase import Compat32
-from email import errors
from email.message import Message
from test.test_email import TestEmailBase
@@ -35,258 +32,5 @@ class TestCustomMessage(TestEmailBase):
# XXX add tests for other functions that take Message arg.
-class TestMessageDefectDetectionBase:
-
- dup_boundary_msg = textwrap.dedent("""\
- Subject: XX
- From: xx@xx.dk
- To: XX
- Mime-version: 1.0
- Content-type: multipart/mixed;
- boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
-
- --MS_Mac_OE_3071477847_720252_MIME_Part
- Content-type: multipart/alternative;
- boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
-
- --MS_Mac_OE_3071477847_720252_MIME_Part
- Content-type: text/plain; charset="ISO-8859-1"
- Content-transfer-encoding: quoted-printable
-
- text
-
- --MS_Mac_OE_3071477847_720252_MIME_Part
- Content-type: text/html; charset="ISO-8859-1"
- Content-transfer-encoding: quoted-printable
-
- <HTML></HTML>
-
- --MS_Mac_OE_3071477847_720252_MIME_Part--
-
- --MS_Mac_OE_3071477847_720252_MIME_Part
- Content-type: image/gif; name="xx.gif";
- Content-disposition: attachment
- Content-transfer-encoding: base64
-
- Some removed base64 encoded chars.
-
- --MS_Mac_OE_3071477847_720252_MIME_Part--
-
- """)
-
- def test_same_boundary_inner_outer(self):
- # XXX better would be to actually detect the duplicate.
- msg = self._str_msg(self.dup_boundary_msg)
- inner = msg.get_payload(0)
- self.assertTrue(hasattr(inner, 'defects'))
- self.assertEqual(len(self.get_defects(inner)), 1)
- self.assertTrue(isinstance(self.get_defects(inner)[0],
- errors.StartBoundaryNotFoundDefect))
-
- def test_same_boundary_inner_outer_raises_on_defect(self):
- with self.assertRaises(errors.StartBoundaryNotFoundDefect):
- self._str_msg(self.dup_boundary_msg,
- policy=self.policy.clone(raise_on_defect=True))
-
- no_boundary_msg = textwrap.dedent("""\
- Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
- From: foobar
- Subject: broken mail
- MIME-Version: 1.0
- Content-Type: multipart/report; report-type=delivery-status;
-
- --JAB03225.986577786/zinfandel.lacita.com
-
- One part
-
- --JAB03225.986577786/zinfandel.lacita.com
- Content-Type: message/delivery-status
-
- Header: Another part
-
- --JAB03225.986577786/zinfandel.lacita.com--
- """)
-
- def test_multipart_no_boundary(self):
- msg = self._str_msg(self.no_boundary_msg)
- self.assertTrue(isinstance(msg.get_payload(), str))
- self.assertEqual(len(self.get_defects(msg)), 2)
- self.assertTrue(isinstance(self.get_defects(msg)[0],
- errors.NoBoundaryInMultipartDefect))
- self.assertTrue(isinstance(self.get_defects(msg)[1],
- errors.MultipartInvariantViolationDefect))
-
- def test_multipart_no_boundary_raise_on_defect(self):
- with self.assertRaises(errors.NoBoundaryInMultipartDefect):
- self._str_msg(self.no_boundary_msg,
- policy=self.policy.clone(raise_on_defect=True))
-
- multipart_msg = textwrap.dedent("""\
- Date: Wed, 14 Nov 2007 12:56:23 GMT
- From: foo@bar.invalid
- To: foo@bar.invalid
- Subject: Content-Transfer-Encoding: base64 and multipart
- MIME-Version: 1.0
- Content-Type: multipart/mixed;
- boundary="===============3344438784458119861=="{}
-
- --===============3344438784458119861==
- Content-Type: text/plain
-
- Test message
-
- --===============3344438784458119861==
- Content-Type: application/octet-stream
- Content-Transfer-Encoding: base64
-
- YWJj
-
- --===============3344438784458119861==--
- """)
-
- def test_multipart_invalid_cte(self):
- msg = self._str_msg(
- self.multipart_msg.format("\nContent-Transfer-Encoding: base64"))
- self.assertEqual(len(self.get_defects(msg)), 1)
- self.assertIsInstance(self.get_defects(msg)[0],
- errors.InvalidMultipartContentTransferEncodingDefect)
-
- def test_multipart_invalid_cte_raise_on_defect(self):
- with self.assertRaises(
- errors.InvalidMultipartContentTransferEncodingDefect):
- self._str_msg(
- self.multipart_msg.format(
- "\nContent-Transfer-Encoding: base64"),
- policy=self.policy.clone(raise_on_defect=True))
-
- def test_multipart_no_cte_no_defect(self):
- msg = self._str_msg(self.multipart_msg.format(''))
- self.assertEqual(len(self.get_defects(msg)), 0)
-
- def test_multipart_valid_cte_no_defect(self):
- for cte in ('7bit', '8bit', 'BINary'):
- msg = self._str_msg(
- self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
- self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)
-
- lying_multipart_msg = textwrap.dedent("""\
- From: "Allison Dunlap" <xxx@example.com>
- To: yyy@example.com
- Subject: 64423
- Date: Sun, 11 Jul 2004 16:09:27 -0300
- MIME-Version: 1.0
- Content-Type: multipart/alternative;
-
- Blah blah blah
- """)
-
- def test_lying_multipart(self):
- msg = self._str_msg(self.lying_multipart_msg)
- self.assertTrue(hasattr(msg, 'defects'))
- self.assertEqual(len(self.get_defects(msg)), 2)
- self.assertTrue(isinstance(self.get_defects(msg)[0],
- errors.NoBoundaryInMultipartDefect))
- self.assertTrue(isinstance(self.get_defects(msg)[1],
- errors.MultipartInvariantViolationDefect))
-
- def test_lying_multipart_raise_on_defect(self):
- with self.assertRaises(errors.NoBoundaryInMultipartDefect):
- self._str_msg(self.lying_multipart_msg,
- policy=self.policy.clone(raise_on_defect=True))
-
- missing_start_boundary_msg = textwrap.dedent("""\
- Content-Type: multipart/mixed; boundary="AAA"
- From: Mail Delivery Subsystem <xxx@example.com>
- To: yyy@example.com
-
- --AAA
-
- Stuff
-
- --AAA
- Content-Type: message/rfc822
-
- From: webmaster@python.org
- To: zzz@example.com
- Content-Type: multipart/mixed; boundary="BBB"
-
- --BBB--
-
- --AAA--
-
- """)
-
- def test_missing_start_boundary(self):
- # The message structure is:
- #
- # multipart/mixed
- # text/plain
- # message/rfc822
- # multipart/mixed [*]
- #
- # [*] This message is missing its start boundary
- outer = self._str_msg(self.missing_start_boundary_msg)
- bad = outer.get_payload(1).get_payload(0)
- self.assertEqual(len(self.get_defects(bad)), 1)
- self.assertTrue(isinstance(self.get_defects(bad)[0],
- errors.StartBoundaryNotFoundDefect))
-
- def test_missing_start_boundary_raise_on_defect(self):
- with self.assertRaises(errors.StartBoundaryNotFoundDefect):
- self._str_msg(self.missing_start_boundary_msg,
- policy=self.policy.clone(raise_on_defect=True))
-
- def test_first_line_is_continuation_header(self):
- msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
- self.assertEqual(msg.keys(), ['Subject'])
- self.assertEqual(msg.get_payload(), 'body')
- self.assertEqual(len(self.get_defects(msg)), 1)
- self.assertDefectsEqual(self.get_defects(msg),
- [errors.FirstHeaderLineIsContinuationDefect])
- self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')
-
- def test_first_line_is_continuation_header_raise_on_defect(self):
- with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect):
- self._str_msg(' Line 1\nSubject: test\n\nbody\n',
- policy=self.policy.clone(raise_on_defect=True))
-
- def test_missing_header_body_separator(self):
- # Our heuristic if we see a line that doesn't look like a header (no
- # leading whitespace but no ':') is to assume that the blank line that
- # separates the header from the body is missing, and to stop parsing
- # headers and start parsing the body.
- msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
- self.assertEqual(msg.keys(), ['Subject'])
- self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
- self.assertDefectsEqual(self.get_defects(msg),
- [errors.MissingHeaderBodySeparatorDefect])
-
- def test_missing_header_body_separator_raise_on_defect(self):
- with self.assertRaises(errors.MissingHeaderBodySeparatorDefect):
- self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n',
- policy=self.policy.clone(raise_on_defect=True))
-
-
-class TestMessageDefectDetection(TestMessageDefectDetectionBase, TestEmailBase):
-
- def get_defects(self, obj):
- return obj.defects
-
-
-class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase,
- TestEmailBase):
-
- class CapturePolicy(Compat32):
- captured = None
- def register_defect(self, obj, defect):
- self.captured.append(defect)
-
- def setUp(self):
- self.policy = self.CapturePolicy(captured=list())
-
- def get_defects(self, obj):
- return self.policy.captured
-
-
if __name__ == '__main__':
unittest.main()