summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_mailbox.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_mailbox.py')
-rw-r--r--Lib/test/test_mailbox.py263
1 files changed, 217 insertions, 46 deletions
diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py
index dd13c72..45abcc8 100644
--- a/Lib/test/test_mailbox.py
+++ b/Lib/test/test_mailbox.py
@@ -7,8 +7,10 @@ import email
import email.message
import re
import io
+import tempfile
from test import support
import unittest
+import textwrap
import mailbox
import glob
try:
@@ -21,16 +23,16 @@ class TestBase(unittest.TestCase):
def _check_sample(self, msg):
# Inspect a mailbox.Message representation of the sample message
- self.assertTrue(isinstance(msg, email.message.Message))
- self.assertTrue(isinstance(msg, mailbox.Message))
+ self.assertIsInstance(msg, email.message.Message)
+ self.assertIsInstance(msg, mailbox.Message)
for key, value in _sample_headers.items():
self.assertIn(value, msg.get_all(key))
self.assertTrue(msg.is_multipart())
self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
for i, payload in enumerate(_sample_payloads):
part = msg.get_payload(i)
- self.assertTrue(isinstance(part, email.message.Message))
- self.assertFalse(isinstance(part, mailbox.Message))
+ self.assertIsInstance(part, email.message.Message)
+ self.assertNotIsInstance(part, mailbox.Message)
self.assertEqual(part.get_payload(), payload)
def _delete_recursively(self, target):
@@ -48,6 +50,8 @@ class TestBase(unittest.TestCase):
class TestMailbox(TestBase):
+ maxDiff = None
+
_factory = None # Overridden by subclasses to reuse tests
_template = 'From: foo\n\n%s'
@@ -69,14 +73,124 @@ class TestMailbox(TestBase):
self.assertEqual(len(self._box), 2)
keys.append(self._box.add(email.message_from_string(_sample_message)))
self.assertEqual(len(self._box), 3)
- keys.append(self._box.add(io.StringIO(_sample_message)))
+ keys.append(self._box.add(io.BytesIO(_bytes_sample_message)))
self.assertEqual(len(self._box), 4)
keys.append(self._box.add(_sample_message))
self.assertEqual(len(self._box), 5)
+ keys.append(self._box.add(_bytes_sample_message))
+ self.assertEqual(len(self._box), 6)
+ with self.assertWarns(DeprecationWarning):
+ keys.append(self._box.add(
+ io.TextIOWrapper(io.BytesIO(_bytes_sample_message))))
+ self.assertEqual(len(self._box), 7)
self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
- for i in (1, 2, 3, 4):
+ for i in (1, 2, 3, 4, 5, 6):
self._check_sample(self._box[keys[i]])
+ _nonascii_msg = textwrap.dedent("""\
+ From: foo
+ Subject: Falinaptár házhozszállítással. Már rendeltél?
+
+ 0
+ """)
+
+ def test_add_invalid_8bit_bytes_header(self):
+ key = self._box.add(self._nonascii_msg.encode('latin1'))
+ self.assertEqual(len(self._box), 1)
+ self.assertEqual(self._box.get_bytes(key),
+ self._nonascii_msg.encode('latin1'))
+
+ def test_invalid_nonascii_header_as_string(self):
+ subj = self._nonascii_msg.splitlines()[1]
+ key = self._box.add(subj.encode('latin1'))
+ self.assertEqual(self._box.get_string(key),
+ 'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz'
+ 'YWwuIE3hciByZW5kZWx06Ww/?=\n\n')
+
+ def test_add_nonascii_string_header_raises(self):
+ with self.assertRaisesRegex(ValueError, "ASCII-only"):
+ self._box.add(self._nonascii_msg)
+ self._box.flush()
+ self.assertEqual(len(self._box), 0)
+ self.assertMailboxEmpty()
+
+ def test_add_that_raises_leaves_mailbox_empty(self):
+ # XXX This test will start failing when Message learns to handle
+ # non-ASCII string headers, and a different internal failure will
+ # need to be found or manufactured.
+ with self.assertRaises(ValueError):
+ self._box.add(email.message_from_string("From: Alphöso"))
+ self.assertEqual(len(self._box), 0)
+ self._box.close()
+ self.assertMailboxEmpty()
+
+ _non_latin_bin_msg = textwrap.dedent("""\
+ From: foo@bar.com
+ To: báz
+ Subject: Maintenant je vous présente mon collègue, le pouf célèbre
+ \tJean de Baddie
+ Mime-Version: 1.0
+ Content-Type: text/plain; charset="utf-8"
+ Content-Transfer-Encoding: 8bit
+
+ Да, они летят.
+ """).encode('utf-8')
+
+ def test_add_8bit_body(self):
+ key = self._box.add(self._non_latin_bin_msg)
+ self.assertEqual(self._box.get_bytes(key),
+ self._non_latin_bin_msg)
+ with self._box.get_file(key) as f:
+ self.assertEqual(f.read(),
+ self._non_latin_bin_msg.replace(b'\n',
+ os.linesep.encode()))
+ self.assertEqual(self._box[key].get_payload(),
+ "Да, они летят.\n")
+
+ def test_add_binary_file(self):
+ with tempfile.TemporaryFile('wb+') as f:
+ f.write(_bytes_sample_message)
+ f.seek(0)
+ key = self._box.add(f)
+ # See issue 11062
+ if not isinstance(self._box, mailbox.Babyl):
+ self.assertEqual(self._box.get_bytes(key).split(b'\n'),
+ _bytes_sample_message.split(b'\n'))
+
+ def test_add_binary_nonascii_file(self):
+ with tempfile.TemporaryFile('wb+') as f:
+ f.write(self._non_latin_bin_msg)
+ f.seek(0)
+ key = self._box.add(f)
+ # See issue 11062
+ if not isinstance(self._box, mailbox.Babyl):
+ self.assertEqual(self._box.get_bytes(key).split(b'\n'),
+ self._non_latin_bin_msg.split(b'\n'))
+
+ def test_add_text_file_warns(self):
+ with tempfile.TemporaryFile('w+') as f:
+ f.write(_sample_message)
+ f.seek(0)
+ with self.assertWarns(DeprecationWarning):
+ key = self._box.add(f)
+ # See issue 11062
+ if not isinstance(self._box, mailbox.Babyl):
+ self.assertEqual(self._box.get_bytes(key).split(b'\n'),
+ _bytes_sample_message.split(b'\n'))
+
+ def test_add_StringIO_warns(self):
+ with self.assertWarns(DeprecationWarning):
+ key = self._box.add(io.StringIO(self._template % "0"))
+ self.assertEqual(self._box.get_string(key), self._template % "0")
+
+ def test_add_nonascii_StringIO_raises(self):
+ with self.assertWarns(DeprecationWarning):
+ with self.assertRaisesRegex(ValueError, "ASCII-only"):
+ self._box.add(io.StringIO(self._nonascii_msg))
+ self.assertEqual(len(self._box), 0)
+ self._box.close()
+ self.assertMailboxEmpty()
+
def test_remove(self):
# Remove messages using remove()
self._test_remove_or_delitem(self._box.remove)
@@ -149,27 +263,38 @@ class TestMailbox(TestBase):
key0 = self._box.add(self._template % 0)
key1 = self._box.add(_sample_message)
msg0 = self._box.get_message(key0)
- self.assertTrue(isinstance(msg0, mailbox.Message))
+ self.assertIsInstance(msg0, mailbox.Message)
self.assertEqual(msg0['from'], 'foo')
self.assertEqual(msg0.get_payload(), '0')
self._check_sample(self._box.get_message(key1))
+ def test_get_bytes(self):
+ # Get bytes representations of messages
+ key0 = self._box.add(self._template % 0)
+ key1 = self._box.add(_sample_message)
+ self.assertEqual(self._box.get_bytes(key0),
+ (self._template % 0).encode('ascii'))
+ self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message)
+
def test_get_string(self):
# Get string representations of messages
key0 = self._box.add(self._template % 0)
key1 = self._box.add(_sample_message)
self.assertEqual(self._box.get_string(key0), self._template % 0)
- self.assertEqual(self._box.get_string(key1), _sample_message)
+ self.assertEqual(self._box.get_string(key1).split('\n'),
+ _sample_message.split('\n'))
def test_get_file(self):
# Get file representations of messages
key0 = self._box.add(self._template % 0)
key1 = self._box.add(_sample_message)
- data0 = self._box.get_file(key0).read()
- data1 = self._box.get_file(key1).read()
- self.assertEqual(data0.replace(os.linesep, '\n'),
+ with self._box.get_file(key0) as file:
+ data0 = file.read()
+ with self._box.get_file(key1) as file:
+ data1 = file.read()
+ self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'),
self._template % 0)
- self.assertEqual(data1.replace(os.linesep, '\n'),
+ self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'),
_sample_message)
def test_iterkeys(self):
@@ -403,11 +528,12 @@ class TestMailbox(TestBase):
def test_dump_message(self):
# Write message representations to disk
for input in (email.message_from_string(_sample_message),
- _sample_message, io.StringIO(_sample_message)):
- output = io.StringIO()
+ _sample_message, io.BytesIO(_bytes_sample_message)):
+ output = io.BytesIO()
self._box._dump_message(input, output)
- self.assertEqual(output.getvalue(), _sample_message)
- output = io.StringIO()
+ self.assertEqual(output.getvalue(),
+ _bytes_sample_message.replace(b'\n', os.linesep.encode()))
+ output = io.BytesIO()
self.assertRaises(TypeError,
lambda: self._box._dump_message(None, output))
@@ -437,6 +563,7 @@ class TestMailboxSuperclass(TestBase):
self.assertRaises(NotImplementedError, lambda: box.__getitem__(''))
self.assertRaises(NotImplementedError, lambda: box.get_message(''))
self.assertRaises(NotImplementedError, lambda: box.get_string(''))
+ self.assertRaises(NotImplementedError, lambda: box.get_bytes(''))
self.assertRaises(NotImplementedError, lambda: box.get_file(''))
self.assertRaises(NotImplementedError, lambda: '' in box)
self.assertRaises(NotImplementedError, lambda: box.__contains__(''))
@@ -460,6 +587,9 @@ class TestMaildir(TestMailbox):
if os.name in ('nt', 'os2') or sys.platform == 'cygwin':
self._box.colon = '!'
+ def assertMailboxEmpty(self):
+ self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), [])
+
def test_add_MM(self):
# Add a MaildirMessage instance
msg = mailbox.MaildirMessage(self._template % 0)
@@ -476,7 +606,7 @@ class TestMaildir(TestMailbox):
msg.set_flags('RF')
key = self._box.add(msg)
msg_returned = self._box.get_message(key)
- self.assertTrue(isinstance(msg_returned, mailbox.MaildirMessage))
+ self.assertIsInstance(msg_returned, mailbox.MaildirMessage)
self.assertEqual(msg_returned.get_subdir(), 'cur')
self.assertEqual(msg_returned.get_flags(), 'FR')
@@ -516,7 +646,7 @@ class TestMaildir(TestMailbox):
box = mailbox.Maildir(self._path, factory=FakeMessage)
box.colon = self._box.colon
msg2 = box.get_message(key)
- self.assertTrue(isinstance(msg2, FakeMessage))
+ self.assertIsInstance(msg2, FakeMessage)
def test_initialize_new(self):
# Initialize a non-existent mailbox
@@ -586,12 +716,10 @@ class TestMaildir(TestMailbox):
# Remove old files from 'tmp'
foo_path = os.path.join(self._path, 'tmp', 'foo')
bar_path = os.path.join(self._path, 'tmp', 'bar')
- f = open(foo_path, 'w')
- f.write("@")
- f.close()
- f = open(bar_path, 'w')
- f.write("@")
- f.close()
+ with open(foo_path, 'w') as f:
+ f.write("@")
+ with open(bar_path, 'w') as f:
+ f.write("@")
self._box.clean()
self.assertTrue(os.path.exists(foo_path))
self.assertTrue(os.path.exists(bar_path))
@@ -640,9 +768,9 @@ class TestMaildir(TestMailbox):
"Host name mismatch: '%s' should be '%s'" %
(groups[4], hostname))
previous_groups = groups
- tmp_file.write(_sample_message)
+ tmp_file.write(_bytes_sample_message)
tmp_file.seek(0)
- self.assertEqual(tmp_file.read(), _sample_message)
+ self.assertEqual(tmp_file.read(), _bytes_sample_message)
tmp_file.close()
file_count = len(os.listdir(os.path.join(self._path, "tmp")))
self.assertEqual(file_count, repetitions,
@@ -779,17 +907,28 @@ class TestMaildir(TestMailbox):
class _TestMboxMMDF(TestMailbox):
def tearDown(self):
+ super().tearDown()
self._box.close()
self._delete_recursively(self._path)
for lock_remnant in glob.glob(self._path + '.*'):
support.unlink(lock_remnant)
+ def assertMailboxEmpty(self):
+ with open(self._path) as f:
+ self.assertEqual(f.readlines(), [])
+
def test_add_from_string(self):
# Add a string starting with 'From ' to the mailbox
key = self._box.add('From foo@bar blah\nFrom: foo\n\n0')
self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
self.assertEqual(self._box[key].get_payload(), '0')
+ def test_add_from_bytes(self):
+ # Add a byte string starting with 'From ' to the mailbox
+ key = self._box.add(b'From foo@bar blah\nFrom: foo\n\n0')
+ self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
+ self.assertEqual(self._box[key].get_payload(), '0')
+
def test_add_mbox_or_mmdf_message(self):
# Add an mboxMessage or MMDFMessage
for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
@@ -820,7 +959,8 @@ class _TestMboxMMDF(TestMailbox):
self._box._file.seek(0)
contents = self._box._file.read()
self._box.close()
- self.assertEqual(contents, open(self._path, 'r', newline='').read())
+ with open(self._path, 'rb') as f:
+ self.assertEqual(contents, f.read())
self._box = self._factory(self._path)
def test_lock_conflict(self):
@@ -899,6 +1039,9 @@ class TestMH(TestMailbox):
_factory = lambda self, path, factory=None: mailbox.MH(path, factory)
+ def assertMailboxEmpty(self):
+ self.assertEqual(os.listdir(self._path), ['.mh_sequences'])
+
def test_list_folders(self):
# List folders
self._box.add_folder('one')
@@ -972,6 +1115,13 @@ class TestMH(TestMailbox):
key0 = self._box.add(msg0)
refmsg0 = self._box.get_message(key0)
+ def test_issue7627(self):
+ msg0 = mailbox.MHMessage(self._template % 0)
+ key0 = self._box.add(msg0)
+ self._box.lock()
+ self._box.remove(key0)
+ self._box.unlock()
+
def test_pack(self):
# Pack the contents of the mailbox
msg0 = mailbox.MHMessage(self._template % 0)
@@ -1024,7 +1174,12 @@ class TestBabyl(TestMailbox):
_factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
+ def assertMailboxEmpty(self):
+ with open(self._path) as f:
+ self.assertEqual(f.readlines(), [])
+
def tearDown(self):
+ super().tearDown()
self._box.close()
self._delete_recursively(self._path)
for lock_remnant in glob.glob(self._path + '.*'):
@@ -1074,21 +1229,29 @@ class TestMessage(TestBase):
def test_initialize_with_file(self):
# Initialize based on contents of file
- f = open(self._path, 'w+')
- f.write(_sample_message)
- f.seek(0)
- msg = self._factory(f)
- self._post_initialize_hook(msg)
- self._check_sample(msg)
- f.close()
+ with open(self._path, 'w+') as f:
+ f.write(_sample_message)
+ f.seek(0)
+ msg = self._factory(f)
+ self._post_initialize_hook(msg)
+ self._check_sample(msg)
+
+ def test_initialize_with_binary_file(self):
+ # Initialize based on contents of binary file
+ with open(self._path, 'wb+') as f:
+ f.write(_bytes_sample_message)
+ f.seek(0)
+ msg = self._factory(f)
+ self._post_initialize_hook(msg)
+ self._check_sample(msg)
def test_initialize_with_nothing(self):
# Initialize without arguments
msg = self._factory()
self._post_initialize_hook(msg)
- self.assertTrue(isinstance(msg, email.message.Message))
- self.assertTrue(isinstance(msg, mailbox.Message))
- self.assertTrue(isinstance(msg, self._factory))
+ self.assertIsInstance(msg, email.message.Message)
+ self.assertIsInstance(msg, mailbox.Message)
+ self.assertIsInstance(msg, self._factory)
self.assertEqual(msg.keys(), [])
self.assertFalse(msg.is_multipart())
self.assertEqual(msg.get_payload(), None)
@@ -1358,6 +1521,14 @@ class TestMessageConversion(TestBase):
msg_plain = mailbox.Message(msg)
self._check_sample(msg_plain)
+ def test_x_from_bytes(self):
+ # Convert all formats to Message
+ for class_ in (mailbox.Message, mailbox.MaildirMessage,
+ mailbox.mboxMessage, mailbox.MHMessage,
+ mailbox.BabylMessage, mailbox.MMDFMessage):
+ msg = class_(_bytes_sample_message)
+ self._check_sample(msg)
+
def test_x_to_invalid(self):
# Convert all formats to an invalid format
for class_ in (mailbox.Message, mailbox.MaildirMessage,
@@ -1804,18 +1975,16 @@ class MaildirTestCase(unittest.TestCase):
filename = ".".join((str(t), str(pid), "myhostname", "mydomain"))
tmpname = os.path.join(self._dir, "tmp", filename)
newname = os.path.join(self._dir, dir, filename)
- fp = open(tmpname, "w")
- self._msgfiles.append(tmpname)
- if mbox:
- fp.write(FROM_)
- fp.write(DUMMY_MESSAGE)
- fp.close()
+ with open(tmpname, "w") as fp:
+ self._msgfiles.append(tmpname)
+ if mbox:
+ fp.write(FROM_)
+ fp.write(DUMMY_MESSAGE)
if hasattr(os, "link"):
os.link(tmpname, newname)
else:
- fp = open(newname, "w")
- fp.write(DUMMY_MESSAGE)
- fp.close()
+ with open(newname, "w") as fp:
+ fp.write(DUMMY_MESSAGE)
self._msgfiles.append(newname)
return tmpname
@@ -1905,6 +2074,8 @@ H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
--NMuMz9nt05w80d4+--
"""
+_bytes_sample_message = _sample_message.encode('ascii')
+
_sample_headers = {
"Return-Path":"<gkj@gregorykjohnson.com>",
"X-Original-To":"gkj+person@localhost",