diff options
Diffstat (limited to 'Lib/test/test_mailbox.py')
-rw-r--r-- | Lib/test/test_mailbox.py | 263 |
1 files changed, 217 insertions, 46 deletions
diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py index dd13c72..45abcc8 100644 --- a/Lib/test/test_mailbox.py +++ b/Lib/test/test_mailbox.py @@ -7,8 +7,10 @@ import email import email.message import re import io +import tempfile from test import support import unittest +import textwrap import mailbox import glob try: @@ -21,16 +23,16 @@ class TestBase(unittest.TestCase): def _check_sample(self, msg): # Inspect a mailbox.Message representation of the sample message - self.assertTrue(isinstance(msg, email.message.Message)) - self.assertTrue(isinstance(msg, mailbox.Message)) + self.assertIsInstance(msg, email.message.Message) + self.assertIsInstance(msg, mailbox.Message) for key, value in _sample_headers.items(): self.assertIn(value, msg.get_all(key)) self.assertTrue(msg.is_multipart()) self.assertEqual(len(msg.get_payload()), len(_sample_payloads)) for i, payload in enumerate(_sample_payloads): part = msg.get_payload(i) - self.assertTrue(isinstance(part, email.message.Message)) - self.assertFalse(isinstance(part, mailbox.Message)) + self.assertIsInstance(part, email.message.Message) + self.assertNotIsInstance(part, mailbox.Message) self.assertEqual(part.get_payload(), payload) def _delete_recursively(self, target): @@ -48,6 +50,8 @@ class TestBase(unittest.TestCase): class TestMailbox(TestBase): + maxDiff = None + _factory = None # Overridden by subclasses to reuse tests _template = 'From: foo\n\n%s' @@ -69,14 +73,124 @@ class TestMailbox(TestBase): self.assertEqual(len(self._box), 2) keys.append(self._box.add(email.message_from_string(_sample_message))) self.assertEqual(len(self._box), 3) - keys.append(self._box.add(io.StringIO(_sample_message))) + keys.append(self._box.add(io.BytesIO(_bytes_sample_message))) self.assertEqual(len(self._box), 4) keys.append(self._box.add(_sample_message)) self.assertEqual(len(self._box), 5) + keys.append(self._box.add(_bytes_sample_message)) + self.assertEqual(len(self._box), 6) + with self.assertWarns(DeprecationWarning): + keys.append(self._box.add( + io.TextIOWrapper(io.BytesIO(_bytes_sample_message)))) + self.assertEqual(len(self._box), 7) self.assertEqual(self._box.get_string(keys[0]), self._template % 0) - for i in (1, 2, 3, 4): + for i in (1, 2, 3, 4, 5, 6): self._check_sample(self._box[keys[i]]) + _nonascii_msg = textwrap.dedent("""\ + From: foo + Subject: Falinaptár házhozszállítással. Már rendeltél? + + 0 + """) + + def test_add_invalid_8bit_bytes_header(self): + key = self._box.add(self._nonascii_msg.encode('latin1')) + self.assertEqual(len(self._box), 1) + self.assertEqual(self._box.get_bytes(key), + self._nonascii_msg.encode('latin1')) + + def test_invalid_nonascii_header_as_string(self): + subj = self._nonascii_msg.splitlines()[1] + key = self._box.add(subj.encode('latin1')) + self.assertEqual(self._box.get_string(key), + 'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz' + 'YWwuIE3hciByZW5kZWx06Ww/?=\n\n') + + def test_add_nonascii_string_header_raises(self): + with self.assertRaisesRegex(ValueError, "ASCII-only"): + self._box.add(self._nonascii_msg) + self._box.flush() + self.assertEqual(len(self._box), 0) + self.assertMailboxEmpty() + + def test_add_that_raises_leaves_mailbox_empty(self): + # XXX This test will start failing when Message learns to handle + # non-ASCII string headers, and a different internal failure will + # need to be found or manufactured. + with self.assertRaises(ValueError): + self._box.add(email.message_from_string("From: Alphöso")) + self.assertEqual(len(self._box), 0) + self._box.close() + self.assertMailboxEmpty() + + _non_latin_bin_msg = textwrap.dedent("""\ + From: foo@bar.com + To: báz + Subject: Maintenant je vous présente mon collègue, le pouf célèbre + \tJean de Baddie + Mime-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + + Да, они летят. + """).encode('utf-8') + + def test_add_8bit_body(self): + key = self._box.add(self._non_latin_bin_msg) + self.assertEqual(self._box.get_bytes(key), + self._non_latin_bin_msg) + with self._box.get_file(key) as f: + self.assertEqual(f.read(), + self._non_latin_bin_msg.replace(b'\n', + os.linesep.encode())) + self.assertEqual(self._box[key].get_payload(), + "Да, они летят.\n") + + def test_add_binary_file(self): + with tempfile.TemporaryFile('wb+') as f: + f.write(_bytes_sample_message) + f.seek(0) + key = self._box.add(f) + # See issue 11062 + if not isinstance(self._box, mailbox.Babyl): + self.assertEqual(self._box.get_bytes(key).split(b'\n'), + _bytes_sample_message.split(b'\n')) + + def test_add_binary_nonascii_file(self): + with tempfile.TemporaryFile('wb+') as f: + f.write(self._non_latin_bin_msg) + f.seek(0) + key = self._box.add(f) + # See issue 11062 + if not isinstance(self._box, mailbox.Babyl): + self.assertEqual(self._box.get_bytes(key).split(b'\n'), + self._non_latin_bin_msg.split(b'\n')) + + def test_add_text_file_warns(self): + with tempfile.TemporaryFile('w+') as f: + f.write(_sample_message) + f.seek(0) + with self.assertWarns(DeprecationWarning): + key = self._box.add(f) + # See issue 11062 + if not isinstance(self._box, mailbox.Babyl): + self.assertEqual(self._box.get_bytes(key).split(b'\n'), + _bytes_sample_message.split(b'\n')) + + def test_add_StringIO_warns(self): + with self.assertWarns(DeprecationWarning): + key = self._box.add(io.StringIO(self._template % "0")) + self.assertEqual(self._box.get_string(key), self._template % "0") + + def test_add_nonascii_StringIO_raises(self): + with self.assertWarns(DeprecationWarning): + with self.assertRaisesRegex(ValueError, "ASCII-only"): + self._box.add(io.StringIO(self._nonascii_msg)) + self.assertEqual(len(self._box), 0) + self._box.close() + self.assertMailboxEmpty() + def test_remove(self): # Remove messages using remove() self._test_remove_or_delitem(self._box.remove) @@ -149,27 +263,38 @@ class TestMailbox(TestBase): key0 = self._box.add(self._template % 0) key1 = self._box.add(_sample_message) msg0 = self._box.get_message(key0) - self.assertTrue(isinstance(msg0, mailbox.Message)) + self.assertIsInstance(msg0, mailbox.Message) self.assertEqual(msg0['from'], 'foo') self.assertEqual(msg0.get_payload(), '0') self._check_sample(self._box.get_message(key1)) + def test_get_bytes(self): + # Get bytes representations of messages + key0 = self._box.add(self._template % 0) + key1 = self._box.add(_sample_message) + self.assertEqual(self._box.get_bytes(key0), + (self._template % 0).encode('ascii')) + self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message) + def test_get_string(self): # Get string representations of messages key0 = self._box.add(self._template % 0) key1 = self._box.add(_sample_message) self.assertEqual(self._box.get_string(key0), self._template % 0) - self.assertEqual(self._box.get_string(key1), _sample_message) + self.assertEqual(self._box.get_string(key1).split('\n'), + _sample_message.split('\n')) def test_get_file(self): # Get file representations of messages key0 = self._box.add(self._template % 0) key1 = self._box.add(_sample_message) - data0 = self._box.get_file(key0).read() - data1 = self._box.get_file(key1).read() - self.assertEqual(data0.replace(os.linesep, '\n'), + with self._box.get_file(key0) as file: + data0 = file.read() + with self._box.get_file(key1) as file: + data1 = file.read() + self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'), self._template % 0) - self.assertEqual(data1.replace(os.linesep, '\n'), + self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'), _sample_message) def test_iterkeys(self): @@ -403,11 +528,12 @@ class TestMailbox(TestBase): def test_dump_message(self): # Write message representations to disk for input in (email.message_from_string(_sample_message), - _sample_message, io.StringIO(_sample_message)): - output = io.StringIO() + _sample_message, io.BytesIO(_bytes_sample_message)): + output = io.BytesIO() self._box._dump_message(input, output) - self.assertEqual(output.getvalue(), _sample_message) - output = io.StringIO() + self.assertEqual(output.getvalue(), + _bytes_sample_message.replace(b'\n', os.linesep.encode())) + output = io.BytesIO() self.assertRaises(TypeError, lambda: self._box._dump_message(None, output)) @@ -437,6 +563,7 @@ class TestMailboxSuperclass(TestBase): self.assertRaises(NotImplementedError, lambda: box.__getitem__('')) self.assertRaises(NotImplementedError, lambda: box.get_message('')) self.assertRaises(NotImplementedError, lambda: box.get_string('')) + self.assertRaises(NotImplementedError, lambda: box.get_bytes('')) self.assertRaises(NotImplementedError, lambda: box.get_file('')) self.assertRaises(NotImplementedError, lambda: '' in box) self.assertRaises(NotImplementedError, lambda: box.__contains__('')) @@ -460,6 +587,9 @@ class TestMaildir(TestMailbox): if os.name in ('nt', 'os2') or sys.platform == 'cygwin': self._box.colon = '!' + def assertMailboxEmpty(self): + self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), []) + def test_add_MM(self): # Add a MaildirMessage instance msg = mailbox.MaildirMessage(self._template % 0) @@ -476,7 +606,7 @@ class TestMaildir(TestMailbox): msg.set_flags('RF') key = self._box.add(msg) msg_returned = self._box.get_message(key) - self.assertTrue(isinstance(msg_returned, mailbox.MaildirMessage)) + self.assertIsInstance(msg_returned, mailbox.MaildirMessage) self.assertEqual(msg_returned.get_subdir(), 'cur') self.assertEqual(msg_returned.get_flags(), 'FR') @@ -516,7 +646,7 @@ class TestMaildir(TestMailbox): box = mailbox.Maildir(self._path, factory=FakeMessage) box.colon = self._box.colon msg2 = box.get_message(key) - self.assertTrue(isinstance(msg2, FakeMessage)) + self.assertIsInstance(msg2, FakeMessage) def test_initialize_new(self): # Initialize a non-existent mailbox @@ -586,12 +716,10 @@ class TestMaildir(TestMailbox): # Remove old files from 'tmp' foo_path = os.path.join(self._path, 'tmp', 'foo') bar_path = os.path.join(self._path, 'tmp', 'bar') - f = open(foo_path, 'w') - f.write("@") - f.close() - f = open(bar_path, 'w') - f.write("@") - f.close() + with open(foo_path, 'w') as f: + f.write("@") + with open(bar_path, 'w') as f: + f.write("@") self._box.clean() self.assertTrue(os.path.exists(foo_path)) self.assertTrue(os.path.exists(bar_path)) @@ -640,9 +768,9 @@ class TestMaildir(TestMailbox): "Host name mismatch: '%s' should be '%s'" % (groups[4], hostname)) previous_groups = groups - tmp_file.write(_sample_message) + tmp_file.write(_bytes_sample_message) tmp_file.seek(0) - self.assertEqual(tmp_file.read(), _sample_message) + self.assertEqual(tmp_file.read(), _bytes_sample_message) tmp_file.close() file_count = len(os.listdir(os.path.join(self._path, "tmp"))) self.assertEqual(file_count, repetitions, @@ -779,17 +907,28 @@ class TestMaildir(TestMailbox): class _TestMboxMMDF(TestMailbox): def tearDown(self): + super().tearDown() self._box.close() self._delete_recursively(self._path) for lock_remnant in glob.glob(self._path + '.*'): support.unlink(lock_remnant) + def assertMailboxEmpty(self): + with open(self._path) as f: + self.assertEqual(f.readlines(), []) + def test_add_from_string(self): # Add a string starting with 'From ' to the mailbox key = self._box.add('From foo@bar blah\nFrom: foo\n\n0') self.assertEqual(self._box[key].get_from(), 'foo@bar blah') self.assertEqual(self._box[key].get_payload(), '0') + def test_add_from_bytes(self): + # Add a byte string starting with 'From ' to the mailbox + key = self._box.add(b'From foo@bar blah\nFrom: foo\n\n0') + self.assertEqual(self._box[key].get_from(), 'foo@bar blah') + self.assertEqual(self._box[key].get_payload(), '0') + def test_add_mbox_or_mmdf_message(self): # Add an mboxMessage or MMDFMessage for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): @@ -820,7 +959,8 @@ class _TestMboxMMDF(TestMailbox): self._box._file.seek(0) contents = self._box._file.read() self._box.close() - self.assertEqual(contents, open(self._path, 'r', newline='').read()) + with open(self._path, 'rb') as f: + self.assertEqual(contents, f.read()) self._box = self._factory(self._path) def test_lock_conflict(self): @@ -899,6 +1039,9 @@ class TestMH(TestMailbox): _factory = lambda self, path, factory=None: mailbox.MH(path, factory) + def assertMailboxEmpty(self): + self.assertEqual(os.listdir(self._path), ['.mh_sequences']) + def test_list_folders(self): # List folders self._box.add_folder('one') @@ -972,6 +1115,13 @@ class TestMH(TestMailbox): key0 = self._box.add(msg0) refmsg0 = self._box.get_message(key0) + def test_issue7627(self): + msg0 = mailbox.MHMessage(self._template % 0) + key0 = self._box.add(msg0) + self._box.lock() + self._box.remove(key0) + self._box.unlock() + def test_pack(self): # Pack the contents of the mailbox msg0 = mailbox.MHMessage(self._template % 0) @@ -1024,7 +1174,12 @@ class TestBabyl(TestMailbox): _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory) + def assertMailboxEmpty(self): + with open(self._path) as f: + self.assertEqual(f.readlines(), []) + def tearDown(self): + super().tearDown() self._box.close() self._delete_recursively(self._path) for lock_remnant in glob.glob(self._path + '.*'): @@ -1074,21 +1229,29 @@ class TestMessage(TestBase): def test_initialize_with_file(self): # Initialize based on contents of file - f = open(self._path, 'w+') - f.write(_sample_message) - f.seek(0) - msg = self._factory(f) - self._post_initialize_hook(msg) - self._check_sample(msg) - f.close() + with open(self._path, 'w+') as f: + f.write(_sample_message) + f.seek(0) + msg = self._factory(f) + self._post_initialize_hook(msg) + self._check_sample(msg) + + def test_initialize_with_binary_file(self): + # Initialize based on contents of binary file + with open(self._path, 'wb+') as f: + f.write(_bytes_sample_message) + f.seek(0) + msg = self._factory(f) + self._post_initialize_hook(msg) + self._check_sample(msg) def test_initialize_with_nothing(self): # Initialize without arguments msg = self._factory() self._post_initialize_hook(msg) - self.assertTrue(isinstance(msg, email.message.Message)) - self.assertTrue(isinstance(msg, mailbox.Message)) - self.assertTrue(isinstance(msg, self._factory)) + self.assertIsInstance(msg, email.message.Message) + self.assertIsInstance(msg, mailbox.Message) + self.assertIsInstance(msg, self._factory) self.assertEqual(msg.keys(), []) self.assertFalse(msg.is_multipart()) self.assertEqual(msg.get_payload(), None) @@ -1358,6 +1521,14 @@ class TestMessageConversion(TestBase): msg_plain = mailbox.Message(msg) self._check_sample(msg_plain) + def test_x_from_bytes(self): + # Convert all formats to Message + for class_ in (mailbox.Message, mailbox.MaildirMessage, + mailbox.mboxMessage, mailbox.MHMessage, + mailbox.BabylMessage, mailbox.MMDFMessage): + msg = class_(_bytes_sample_message) + self._check_sample(msg) + def test_x_to_invalid(self): # Convert all formats to an invalid format for class_ in (mailbox.Message, mailbox.MaildirMessage, @@ -1804,18 +1975,16 @@ class MaildirTestCase(unittest.TestCase): filename = ".".join((str(t), str(pid), "myhostname", "mydomain")) tmpname = os.path.join(self._dir, "tmp", filename) newname = os.path.join(self._dir, dir, filename) - fp = open(tmpname, "w") - self._msgfiles.append(tmpname) - if mbox: - fp.write(FROM_) - fp.write(DUMMY_MESSAGE) - fp.close() + with open(tmpname, "w") as fp: + self._msgfiles.append(tmpname) + if mbox: + fp.write(FROM_) + fp.write(DUMMY_MESSAGE) if hasattr(os, "link"): os.link(tmpname, newname) else: - fp = open(newname, "w") - fp.write(DUMMY_MESSAGE) - fp.close() + with open(newname, "w") as fp: + fp.write(DUMMY_MESSAGE) self._msgfiles.append(newname) return tmpname @@ -1905,6 +2074,8 @@ H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs --NMuMz9nt05w80d4+-- """ +_bytes_sample_message = _sample_message.encode('ascii') + _sample_headers = { "Return-Path":"<gkj@gregorykjohnson.com>", "X-Original-To":"gkj+person@localhost", |