summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorAndrew M. Kuchling <amk@amk.ca>2006-04-22 02:32:43 (GMT)
committerAndrew M. Kuchling <amk@amk.ca>2006-04-22 02:32:43 (GMT)
commit1da4a947190802f5825bb72761916bad59cc503e (patch)
tree38596017508db44fd969af2765aaa03f81ecf037 /Lib
parent81efcf6833c97a5488454c83209d253a178eec92 (diff)
downloadcpython-1da4a947190802f5825bb72761916bad59cc503e.zip
cpython-1da4a947190802f5825bb72761916bad59cc503e.tar.gz
cpython-1da4a947190802f5825bb72761916bad59cc503e.tar.bz2
Add Gregory K. Johnson's revised version of mailbox.py (funded by
the 2005 Summer of Code). The revision adds a number of new mailbox classes that support adding and removing messages; these classes also support mailbox locking and default to using email.Message instead of rfc822.Message. The old mailbox classes are largely left alone for backward compatibility. The exception is the Maildir class, which was present in the old module and now inherits from the new classes. The Maildir class's interface is pretty simple, though, so I think it'll be compatible with existing code. (The change to the NEWS file also adds a missing word to a different news item, which unfortunately required rewrapping the line.)
Diffstat (limited to 'Lib')
-rwxr-xr-xLib/mailbox.py2037
-rw-r--r--Lib/test/test_mailbox.py1672
2 files changed, 3550 insertions, 159 deletions
diff --git a/Lib/mailbox.py b/Lib/mailbox.py
index c89c1a4..ac87a51 100755
--- a/Lib/mailbox.py
+++ b/Lib/mailbox.py
@@ -1,93 +1,1909 @@
#! /usr/bin/env python
-"""Classes to handle Unix style, MMDF style, and MH style mailboxes."""
+"""Read/write support for Maildir, mbox, MH, Babyl, and MMDF mailboxes."""
-
-import rfc822
import os
+import time
+import calendar
+import socket
+import errno
+import copy
+import email
+import email.Message
+import email.Generator
+import rfc822
+import StringIO
+try:
+ import fnctl
+except ImportError:
+ fcntl = None
-__all__ = ["UnixMailbox","MmdfMailbox","MHMailbox","Maildir","BabylMailbox",
- "PortableUnixMailbox"]
+__all__ = [ 'Mailbox', 'Maildir', 'mbox', 'MH', 'Babyl', 'MMDF',
+ 'Message', 'MaildirMessage', 'mboxMessage', 'MHMessage',
+ 'BabylMessage', 'MMDFMessage', 'UnixMailbox',
+ 'PortableUnixMailbox', 'MmdfMailbox', 'MHMailbox', 'BabylMailbox' ]
-class _Mailbox:
- def __init__(self, fp, factory=rfc822.Message):
- self.fp = fp
- self.seekp = 0
- self.factory = factory
+class Mailbox:
+ """A group of messages in a particular place."""
+
+ def __init__(self, path, factory=None, create=True):
+ """Initialize a Mailbox instance."""
+ self._path = os.path.abspath(os.path.expanduser(path))
+ self._factory = factory
+
+ def add(self, message):
+ """Add message and return assigned key."""
+ raise NotImplementedError('Method must be implemented by subclass')
+
+ def remove(self, key):
+ """Remove the keyed message; raise KeyError if it doesn't exist."""
+ raise NotImplementedError('Method must be implemented by subclass')
+
+ def __delitem__(self, key):
+ self.remove(key)
+
+ def discard(self, key):
+ """If the keyed message exists, remove it."""
+ try:
+ self.remove(key)
+ except KeyError:
+ pass
+
+ def __setitem__(self, key, message):
+ """Replace the keyed message; raise KeyError if it doesn't exist."""
+ raise NotImplementedError('Method must be implemented by subclass')
+
+ def get(self, key, default=None):
+ """Return the keyed message, or default if it doesn't exist."""
+ try:
+ return self.__getitem__(key)
+ except KeyError:
+ return default
+
+ def __getitem__(self, key):
+ """Return the keyed message; raise KeyError if it doesn't exist."""
+ if not self._factory:
+ return self.get_message(key)
+ else:
+ return self._factory(self.get_file(key))
+
+ def get_message(self, key):
+ """Return a Message representation or raise a KeyError."""
+ raise NotImplementedError('Method must be implemented by subclass')
+
+ def get_string(self, key):
+ """Return a string representation or raise a KeyError."""
+ raise NotImplementedError('Method must be implemented by subclass')
+
+ def get_file(self, key):
+ """Return a file-like representation or raise a KeyError."""
+ raise NotImplementedError('Method must be implemented by subclass')
+
+ def iterkeys(self):
+ """Return an iterator over keys."""
+ raise NotImplementedError('Method must be implemented by subclass')
+
+ def keys(self):
+ """Return a list of keys."""
+ return list(self.iterkeys())
+
+ def itervalues(self):
+ """Return an iterator over all messages."""
+ for key in self.iterkeys():
+ try:
+ value = self[key]
+ except KeyError:
+ continue
+ yield value
def __iter__(self):
- return iter(self.next, None)
+ return self.itervalues()
+
+ def values(self):
+ """Return a list of messages. Memory intensive."""
+ return list(self.itervalues())
+
+ def iteritems(self):
+ """Return an iterator over (key, message) tuples."""
+ for key in self.iterkeys():
+ try:
+ value = self[key]
+ except KeyError:
+ continue
+ yield (key, value)
+
+ def items(self):
+ """Return a list of (key, message) tuples. Memory intensive."""
+ return list(self.iteritems())
+
+ def has_key(self, key):
+ """Return True if the keyed message exists, False otherwise."""
+ raise NotImplementedError('Method must be implemented by subclass')
+
+ def __contains__(self, key):
+ return self.has_key(key)
+
+ def __len__(self):
+ """Return a count of messages in the mailbox."""
+ raise NotImplementedError('Method must be implemented by subclass')
+
+ def clear(self):
+ """Delete all messages."""
+ for key in self.iterkeys():
+ self.discard(key)
+
+ def pop(self, key, default=None):
+ """Delete the keyed message and return it, or default."""
+ try:
+ result = self[key]
+ except KeyError:
+ return default
+ self.discard(key)
+ return result
+
+ def popitem(self):
+ """Delete an arbitrary (key, message) pair and return it."""
+ for key in self.iterkeys():
+ return (key, self.pop(key)) # This is only run once.
+ else:
+ raise KeyError('No messages in mailbox')
+
+ def update(self, arg=None):
+ """Change the messages that correspond to certain keys."""
+ if hasattr(arg, 'iteritems'):
+ source = arg.iteritems()
+ elif hasattr(arg, 'items'):
+ source = arg.items()
+ else:
+ source = arg
+ bad_key = False
+ for key, message in source:
+ try:
+ self[key] = message
+ except KeyError:
+ bad_key = True
+ if bad_key:
+ raise KeyError('No message with key(s)')
+
+ def flush(self):
+ """Write any pending changes to the disk."""
+ raise NotImplementedError('Method must be implemented by subclass')
+
+ def lock(self):
+ """Lock the mailbox."""
+ raise NotImplementedError('Method must be implemented by subclass')
+
+ def unlock(self):
+ """Unlock the mailbox if it is locked."""
+ raise NotImplementedError('Method must be implemented by subclass')
+
+ def close(self):
+ """Flush and close the mailbox."""
+ raise NotImplementedError('Method must be implemented by subclass')
+
+ def _dump_message(self, message, target, mangle_from_=False):
+ # Most files are opened in binary mode to allow predictable seeking.
+ # To get native line endings on disk, the user-friendly \n line endings
+ # used in strings and by email.Message are translated here.
+ """Dump message contents to target file."""
+ if isinstance(message, email.Message.Message):
+ buffer = StringIO.StringIO()
+ gen = email.Generator.Generator(buffer, mangle_from_, 0)
+ gen.flatten(message)
+ buffer.seek(0)
+ target.write(buffer.read().replace('\n', os.linesep))
+ elif isinstance(message, str):
+ if mangle_from_:
+ message = message.replace('\nFrom ', '\n>From ')
+ message = message.replace('\n', os.linesep)
+ target.write(message)
+ elif hasattr(message, 'read'):
+ while True:
+ line = message.readline()
+ if line == '':
+ break
+ if mangle_from_ and line.startswith('From '):
+ line = '>From ' + line[5:]
+ line = line.replace('\n', os.linesep)
+ target.write(line)
+ else:
+ raise TypeError('Invalid message type: %s' % type(message))
+
+
+class Maildir(Mailbox):
+ """A qmail-style Maildir mailbox."""
+
+ colon = ':'
+
+ def __init__(self, dirname, factory=rfc822.Message, create=True):
+ """Initialize a Maildir instance."""
+ Mailbox.__init__(self, dirname, factory, create)
+ if not os.path.exists(self._path):
+ if create:
+ os.mkdir(self._path, 0700)
+ os.mkdir(os.path.join(self._path, 'tmp'), 0700)
+ os.mkdir(os.path.join(self._path, 'new'), 0700)
+ os.mkdir(os.path.join(self._path, 'cur'), 0700)
+ else:
+ raise NoSuchMailboxError(self._path)
+ self._toc = {}
+
+ def add(self, message):
+ """Add message and return assigned key."""
+ tmp_file = self._create_tmp()
+ try:
+ self._dump_message(message, tmp_file)
+ finally:
+ tmp_file.close()
+ if isinstance(message, MaildirMessage):
+ subdir = message.get_subdir()
+ suffix = self.colon + message.get_info()
+ if suffix == self.colon:
+ suffix = ''
+ else:
+ subdir = 'new'
+ suffix = ''
+ uniq = os.path.basename(tmp_file.name).split(self.colon)[0]
+ dest = os.path.join(self._path, subdir, uniq + suffix)
+ os.rename(tmp_file.name, dest)
+ if isinstance(message, MaildirMessage):
+ os.utime(dest, (os.path.getatime(dest), message.get_date()))
+ return uniq
+
+ def remove(self, key):
+ """Remove the keyed message; raise KeyError if it doesn't exist."""
+ os.remove(os.path.join(self._path, self._lookup(key)))
+
+ def discard(self, key):
+ """If the keyed message exists, remove it."""
+ # This overrides an inapplicable implementation in the superclass.
+ try:
+ self.remove(key)
+ except KeyError:
+ pass
+ except OSError, e:
+ if e.errno == errno.ENOENT:
+ pass
+ else:
+ raise
+
+ def __setitem__(self, key, message):
+ """Replace the keyed message; raise KeyError if it doesn't exist."""
+ old_subpath = self._lookup(key)
+ temp_key = self.add(message)
+ temp_subpath = self._lookup(temp_key)
+ if isinstance(message, MaildirMessage):
+ # temp's subdir and suffix were specified by message.
+ dominant_subpath = temp_subpath
+ else:
+ # temp's subdir and suffix were defaults from add().
+ dominant_subpath = old_subpath
+ subdir = os.path.dirname(dominant_subpath)
+ if self.colon in dominant_subpath:
+ suffix = self.colon + dominant_subpath.split(self.colon)[-1]
+ else:
+ suffix = ''
+ self.discard(key)
+ new_path = os.path.join(self._path, subdir, key + suffix)
+ os.rename(os.path.join(self._path, temp_subpath), new_path)
+ if isinstance(message, MaildirMessage):
+ os.utime(new_path, (os.path.getatime(new_path),
+ message.get_date()))
+
+ def get_message(self, key):
+ """Return a Message representation or raise a KeyError."""
+ subpath = self._lookup(key)
+ f = file(os.path.join(self._path, subpath), 'r')
+ try:
+ msg = MaildirMessage(f)
+ finally:
+ f.close()
+ subdir, name = os.path.split(subpath)
+ msg.set_subdir(subdir)
+ if self.colon in name:
+ msg.set_info(name.split(self.colon)[-1])
+ msg.set_date(os.path.getmtime(os.path.join(self._path, subpath)))
+ return msg
+
+ def get_string(self, key):
+ """Return a string representation or raise a KeyError."""
+ f = file(os.path.join(self._path, self._lookup(key)), 'r')
+ try:
+ return f.read()
+ finally:
+ f.close()
+
+ def get_file(self, key):
+ """Return a file-like representation or raise a KeyError."""
+ f = file(os.path.join(self._path, self._lookup(key)), 'rb')
+ return _ProxyFile(f)
+
+ def iterkeys(self):
+ """Return an iterator over keys."""
+ self._refresh()
+ for key in self._toc:
+ try:
+ self._lookup(key)
+ except KeyError:
+ continue
+ yield key
+
+ def has_key(self, key):
+ """Return True if the keyed message exists, False otherwise."""
+ self._refresh()
+ return key in self._toc
+
+ def __len__(self):
+ """Return a count of messages in the mailbox."""
+ self._refresh()
+ return len(self._toc)
+
+ def flush(self):
+ """Write any pending changes to disk."""
+ return # Maildir changes are always written immediately.
+
+ def lock(self):
+ """Lock the mailbox."""
+ return
+
+ def unlock(self):
+ """Unlock the mailbox if it is locked."""
+ return
+
+ def close(self):
+ """Flush and close the mailbox."""
+ return
+
+ def list_folders(self):
+ """Return a list of folder names."""
+ result = []
+ for entry in os.listdir(self._path):
+ if len(entry) > 1 and entry[0] == '.' and \
+ os.path.isdir(os.path.join(self._path, entry)):
+ result.append(entry[1:])
+ return result
+
+ def get_folder(self, folder):
+ """Return a Maildir instance for the named folder."""
+ return Maildir(os.path.join(self._path, '.' + folder), create=False)
+
+ def add_folder(self, folder):
+ """Create a folder and return a Maildir instance representing it."""
+ path = os.path.join(self._path, '.' + folder)
+ result = Maildir(path)
+ maildirfolder_path = os.path.join(path, 'maildirfolder')
+ if not os.path.exists(maildirfolder_path):
+ os.close(os.open(maildirfolder_path, os.O_CREAT | os.O_WRONLY))
+ return result
+
+ def remove_folder(self, folder):
+ """Delete the named folder, which must be empty."""
+ path = os.path.join(self._path, '.' + folder)
+ for entry in os.listdir(os.path.join(path, 'new')) + \
+ os.listdir(os.path.join(path, 'cur')):
+ if len(entry) < 1 or entry[0] != '.':
+ raise NotEmptyError('Folder contains message(s): %s' % folder)
+ for entry in os.listdir(path):
+ if entry != 'new' and entry != 'cur' and entry != 'tmp' and \
+ os.path.isdir(os.path.join(path, entry)):
+ raise NotEmptyError("Folder contains subdirectory '%s': %s" %
+ (folder, entry))
+ for root, dirs, files in os.walk(path, topdown=False):
+ for entry in files:
+ os.remove(os.path.join(root, entry))
+ for entry in dirs:
+ os.rmdir(os.path.join(root, entry))
+ os.rmdir(path)
+
+ def clean(self):
+ """Delete old files in "tmp"."""
+ now = time.time()
+ for entry in os.listdir(os.path.join(self._path, 'tmp')):
+ path = os.path.join(self._path, 'tmp', entry)
+ if now - os.path.getatime(path) > 129600: # 60 * 60 * 36
+ os.remove(path)
+
+ _count = 1 # This is used to generate unique file names.
+
+ def _create_tmp(self):
+ """Create a file in the tmp subdirectory and open and return it."""
+ now = time.time()
+ hostname = socket.gethostname()
+ if '/' in hostname:
+ hostname = hostname.replace('/', r'\057')
+ if ':' in hostname:
+ hostname = hostname.replace(':', r'\072')
+ uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
+ Maildir._count, hostname)
+ path = os.path.join(self._path, 'tmp', uniq)
+ try:
+ os.stat(path)
+ except OSError, e:
+ if e.errno == errno.ENOENT:
+ Maildir._count += 1
+ return file(path, 'wb+')
+ else:
+ raise
+ else:
+ raise ExternalClashError('Name clash prevented file creation: %s' %
+ path)
+
+ def _refresh(self):
+ """Update table of contents mapping."""
+ self._toc = {}
+ for subdir in ('new', 'cur'):
+ for entry in os.listdir(os.path.join(self._path, subdir)):
+ uniq = entry.split(self.colon)[0]
+ self._toc[uniq] = os.path.join(subdir, entry)
+
+ def _lookup(self, key):
+ """Use TOC to return subpath for given key, or raise a KeyError."""
+ try:
+ if os.path.exists(os.path.join(self._path, self._toc[key])):
+ return self._toc[key]
+ except KeyError:
+ pass
+ self._refresh()
+ try:
+ return self._toc[key]
+ except KeyError:
+ raise KeyError('No message with key: %s' % key)
+ # This method is for backward compatibility only.
def next(self):
- while 1:
- self.fp.seek(self.seekp)
+ """Return the next message in a one-time iteration."""
+ if not hasattr(self, '_onetime_keys'):
+ self._onetime_keys = self.iterkeys()
+ while True:
try:
- self._search_start()
- except EOFError:
- self.seekp = self.fp.tell()
+ return self[self._onetime_keys.next()]
+ except StopIteration:
return None
- start = self.fp.tell()
- self._search_end()
- self.seekp = stop = self.fp.tell()
- if start != stop:
+ except KeyError:
+ continue
+
+
+class _singlefileMailbox(Mailbox):
+ """A single-file mailbox."""
+
+ def __init__(self, path, factory=None, create=True):
+ """Initialize a single-file mailbox."""
+ Mailbox.__init__(self, path, factory, create)
+ try:
+ f = file(self._path, 'rb+')
+ except IOError, e:
+ if e.errno == errno.ENOENT:
+ if create:
+ f = file(self._path, 'wb+')
+ else:
+ raise NoSuchMailboxError(self._path)
+ elif e.errno == errno.EACCES:
+ f = file(self._path, 'rb')
+ else:
+ raise
+ self._file = f
+ self._toc = None
+ self._next_key = 0
+ self._pending = False # No changes require rewriting the file.
+ self._locked = False
+
+ def add(self, message):
+ """Add message and return assigned key."""
+ self._lookup()
+ self._toc[self._next_key] = self._append_message(message)
+ self._next_key += 1
+ self._pending = True
+ return self._next_key - 1
+
+ def remove(self, key):
+ """Remove the keyed message; raise KeyError if it doesn't exist."""
+ self._lookup(key)
+ del self._toc[key]
+ self._pending = True
+
+ def __setitem__(self, key, message):
+ """Replace the keyed message; raise KeyError if it doesn't exist."""
+ self._lookup(key)
+ self._toc[key] = self._append_message(message)
+ self._pending = True
+
+ def iterkeys(self):
+ """Return an iterator over keys."""
+ self._lookup()
+ for key in self._toc.keys():
+ yield key
+
+ def has_key(self, key):
+ """Return True if the keyed message exists, False otherwise."""
+ self._lookup()
+ return key in self._toc
+
+ def __len__(self):
+ """Return a count of messages in the mailbox."""
+ self._lookup()
+ return len(self._toc)
+
+ def lock(self):
+ """Lock the mailbox."""
+ if not self._locked:
+ _lock_file(self._file)
+ self._locked = True
+
+ def unlock(self):
+ """Unlock the mailbox if it is locked."""
+ if self._locked:
+ _unlock_file(self._file)
+ self._locked = False
+
+ def flush(self):
+ """Write any pending changes to disk."""
+ if not self._pending:
+ return
+ self._lookup()
+ new_file = _create_temporary(self._path)
+ try:
+ new_toc = {}
+ self._pre_mailbox_hook(new_file)
+ for key in sorted(self._toc.keys()):
+ start, stop = self._toc[key]
+ self._file.seek(start)
+ self._pre_message_hook(new_file)
+ new_start = new_file.tell()
+ while True:
+ buffer = self._file.read(min(4096,
+ stop - self._file.tell()))
+ if buffer == '':
+ break
+ new_file.write(buffer)
+ new_toc[key] = (new_start, new_file.tell())
+ self._post_message_hook(new_file)
+ except:
+ new_file.close()
+ os.remove(new_file.name)
+ raise
+ new_file.close()
+ self._file.close()
+ try:
+ os.rename(new_file.name, self._path)
+ except OSError, e:
+ if e.errno == errno.EEXIST:
+ os.remove(self._path)
+ os.rename(new_file.name, self._path)
+ else:
+ raise
+ self._file = file(self._path, 'rb+')
+ self._toc = new_toc
+ self._pending = False
+ if self._locked:
+ _lock_file(new_file, dotlock=False)
+
+ def _pre_mailbox_hook(self, f):
+ """Called before writing the mailbox to file f."""
+ return
+
+ def _pre_message_hook(self, f):
+ """Called before writing each message to file f."""
+ return
+
+ def _post_message_hook(self, f):
+ """Called after writing each message to file f."""
+ return
+
+ def close(self):
+ """Flush and close the mailbox."""
+ self.flush()
+ if self._locked:
+ self.unlock()
+ self._file.close()
+
+ def _lookup(self, key=None):
+ """Return (start, stop) or raise KeyError."""
+ if self._toc is None:
+ self._generate_toc()
+ if key is not None:
+ try:
+ return self._toc[key]
+ except KeyError:
+ raise KeyError('No message with key: %s' % key)
+
+ def _append_message(self, message):
+ """Append message to mailbox and return (start, stop) offsets."""
+ self._file.seek(0, 2)
+ self._pre_message_hook(self._file)
+ offsets = self._install_message(message)
+ self._post_message_hook(self._file)
+ self._file.flush()
+ return offsets
+
+
+
+class _mboxMMDF(_singlefileMailbox):
+ """An mbox or MMDF mailbox."""
+
+ _mangle_from_ = True
+
+ def get_message(self, key):
+ """Return a Message representation or raise a KeyError."""
+ start, stop = self._lookup(key)
+ self._file.seek(start)
+ from_line = self._file.readline().replace(os.linesep, '')
+ string = self._file.read(stop - self._file.tell())
+ msg = self._message_factory(string.replace(os.linesep, '\n'))
+ msg.set_from(from_line[5:])
+ return msg
+
+ def get_string(self, key, from_=False):
+ """Return a string representation or raise a KeyError."""
+ start, stop = self._lookup(key)
+ self._file.seek(start)
+ if not from_:
+ self._file.readline()
+ string = self._file.read(stop - self._file.tell())
+ return string.replace(os.linesep, '\n')
+
+ def get_file(self, key, from_=False):
+ """Return a file-like representation or raise a KeyError."""
+ start, stop = self._lookup(key)
+ self._file.seek(start)
+ if not from_:
+ self._file.readline()
+ return _PartialFile(self._file, self._file.tell(), stop)
+
+ def _install_message(self, message):
+ """Format a message and blindly write to self._file."""
+ from_line = None
+ if isinstance(message, str) and message.startswith('From '):
+ newline = message.find('\n')
+ if newline != -1:
+ from_line = message[:newline]
+ message = message[newline + 1:]
+ else:
+ from_line = message
+ message = ''
+ elif isinstance(message, _mboxMMDFMessage):
+ from_line = 'From ' + message.get_from()
+ elif isinstance(message, email.Message.Message):
+ from_line = message.get_unixfrom() # May be None.
+ if from_line is None:
+ from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
+ start = self._file.tell()
+ self._file.write(from_line + os.linesep)
+ self._dump_message(message, self._file, self._mangle_from_)
+ stop = self._file.tell()
+ return (start, stop)
+
+
+class mbox(_mboxMMDF):
+ """A classic mbox mailbox."""
+
+ _mangle_from_ = True
+
+ def __init__(self, path, factory=None, create=True):
+ """Initialize an mbox mailbox."""
+ self._message_factory = mboxMessage
+ _mboxMMDF.__init__(self, path, factory, create)
+
+ def _pre_message_hook(self, f):
+ """Called before writing each message to file f."""
+ if f.tell() != 0:
+ f.write(os.linesep)
+
+ def _generate_toc(self):
+ """Generate key-to-(start, stop) table of contents."""
+ starts, stops = [], []
+ self._file.seek(0)
+ while True:
+ line_pos = self._file.tell()
+ line = self._file.readline()
+ if line.startswith('From '):
+ if len(stops) < len(starts):
+ stops.append(line_pos - len(os.linesep))
+ starts.append(line_pos)
+ elif line == '':
+ stops.append(line_pos)
break
- return self.factory(_Subfile(self.fp, start, stop))
+ self._toc = dict(enumerate(zip(starts, stops)))
+ self._next_key = len(self._toc)
-class _Subfile:
+class MMDF(_mboxMMDF):
+ """An MMDF mailbox."""
- def __init__(self, fp, start, stop):
- self.fp = fp
- self.start = start
- self.stop = stop
- self.pos = self.start
+ def __init__(self, path, factory=None, create=True):
+ """Initialize an MMDF mailbox."""
+ self._message_factory = MMDFMessage
+ _mboxMMDF.__init__(self, path, factory, create)
+ def _pre_message_hook(self, f):
+ """Called before writing each message to file f."""
+ f.write('\001\001\001\001' + os.linesep)
- def _read(self, length, read_function):
- if self.pos >= self.stop:
- return ''
- remaining = self.stop - self.pos
- if length is None or length < 0 or length > remaining:
- length = remaining
- self.fp.seek(self.pos)
- data = read_function(length)
- self.pos = self.fp.tell()
- return data
-
- def read(self, length = None):
- return self._read(length, self.fp.read)
-
- def readline(self, length = None):
- return self._read(length, self.fp.readline)
-
- def readlines(self, sizehint = -1):
- lines = []
- while 1:
- line = self.readline()
- if not line:
+ def _post_message_hook(self, f):
+ """Called after writing each message to file f."""
+ f.write(os.linesep + '\001\001\001\001' + os.linesep)
+
+ def _generate_toc(self):
+ """Generate key-to-(start, stop) table of contents."""
+ starts, stops = [], []
+ self._file.seek(0)
+ next_pos = 0
+ while True:
+ line_pos = next_pos
+ line = self._file.readline()
+ next_pos = self._file.tell()
+ if line.startswith('\001\001\001\001' + os.linesep):
+ starts.append(next_pos)
+ while True:
+ line_pos = next_pos
+ line = self._file.readline()
+ next_pos = self._file.tell()
+ if line == '\001\001\001\001' + os.linesep:
+ stops.append(line_pos - len(os.linesep))
+ break
+ elif line == '':
+ stops.append(line_pos)
+ break
+ elif line == '':
+ break
+ self._toc = dict(enumerate(zip(starts, stops)))
+ self._next_key = len(self._toc)
+
+
+class MH(Mailbox):
+ """An MH mailbox."""
+
+ def __init__(self, path, factory=None, create=True):
+ """Initialize an MH instance."""
+ Mailbox.__init__(self, path, factory, create)
+ if not os.path.exists(self._path):
+ if create:
+ os.mkdir(self._path, 0700)
+ os.close(os.open(os.path.join(self._path, '.mh_sequences'),
+ os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0600))
+ else:
+ raise NoSuchMailboxError(self._path)
+ self._locked = False
+
+ def add(self, message):
+ """Add message and return assigned key."""
+ keys = self.keys()
+ if len(keys) == 0:
+ new_key = 1
+ else:
+ new_key = max(keys) + 1
+ new_path = os.path.join(self._path, str(new_key))
+ f = _create_carefully(new_path)
+ try:
+ if self._locked:
+ _lock_file(f)
+ try:
+ self._dump_message(message, f)
+ if isinstance(message, MHMessage):
+ self._dump_sequences(message, new_key)
+ finally:
+ if self._locked:
+ _unlock_file(f)
+ finally:
+ f.close()
+ return new_key
+
+ def remove(self, key):
+ """Remove the keyed message; raise KeyError if it doesn't exist."""
+ path = os.path.join(self._path, str(key))
+ try:
+ f = file(path, 'rb+')
+ except IOError, e:
+ if e.errno == errno.ENOENT:
+ raise KeyError('No message with key: %s' % key)
+ else:
+ raise
+ try:
+ if self._locked:
+ _lock_file(f)
+ try:
+ f.close()
+ os.remove(os.path.join(self._path, str(key)))
+ finally:
+ if self._locked:
+ _unlock_file(f)
+ finally:
+ f.close()
+
+ def __setitem__(self, key, message):
+ """Replace the keyed message; raise KeyError if it doesn't exist."""
+ path = os.path.join(self._path, str(key))
+ try:
+ f = file(path, 'rb+')
+ except IOError, e:
+ if e.errno == errno.ENOENT:
+ raise KeyError('No message with key: %s' % key)
+ else:
+ raise
+ try:
+ if self._locked:
+ _lock_file(f)
+ try:
+ os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
+ self._dump_message(message, f)
+ if isinstance(message, MHMessage):
+ self._dump_sequences(message, key)
+ finally:
+ if self._locked:
+ _unlock_file(f)
+ finally:
+ f.close()
+
+ def get_message(self, key):
+ """Return a Message representation or raise a KeyError."""
+ try:
+ if self._locked:
+ f = file(os.path.join(self._path, str(key)), 'r+')
+ else:
+ f = file(os.path.join(self._path, str(key)), 'r')
+ except IOError, e:
+ if e.errno == errno.ENOENT:
+ raise KeyError('No message with key: %s' % key)
+ else:
+ raise
+ try:
+ if self._locked:
+ _lock_file(f)
+ try:
+ msg = MHMessage(f)
+ finally:
+ if self._locked:
+ _unlock_file(f)
+ finally:
+ f.close()
+ for name, key_list in self.get_sequences():
+ if key in key_list:
+ msg.add_sequence(name)
+ return msg
+
+ def get_string(self, key):
+ """Return a string representation or raise a KeyError."""
+ try:
+ if self._locked:
+ f = file(os.path.join(self._path, str(key)), 'r+')
+ else:
+ f = file(os.path.join(self._path, str(key)), 'r')
+ except IOError, e:
+ if e.errno == errno.ENOENT:
+ raise KeyError('No message with key: %s' % key)
+ else:
+ raise
+ try:
+ if self._locked:
+ _lock_file(f)
+ try:
+ return f.read()
+ finally:
+ if self._locked:
+ _unlock_file(f)
+ finally:
+ f.close()
+
+ def get_file(self, key):
+ """Return a file-like representation or raise a KeyError."""
+ try:
+ f = file(os.path.join(self._path, str(key)), 'rb')
+ except IOError, e:
+ if e.errno == errno.ENOENT:
+ raise KeyError('No message with key: %s' % key)
+ else:
+ raise
+ return _ProxyFile(f)
+
+ def iterkeys(self):
+ """Return an iterator over keys."""
+ return iter(sorted(int(entry) for entry in os.listdir(self._path)
+ if entry.isdigit()))
+
+ def has_key(self, key):
+ """Return True if the keyed message exists, False otherwise."""
+ return os.path.exists(os.path.join(self._path, str(key)))
+
+ def __len__(self):
+ """Return a count of messages in the mailbox."""
+ return len(list(self.iterkeys()))
+
+ def lock(self):
+ """Lock the mailbox."""
+ if not self._locked:
+ self._file = file(os.path.join(self._path, '.mh_sequences'), 'rb+')
+ _lock_file(self._file)
+ self._locked = True
+
+ def unlock(self):
+ """Unlock the mailbox if it is locked."""
+ if self._locked:
+ _unlock_file(self._file)
+ self._file.close()
+ del self._file
+ self._locked = False
+
+ def flush(self):
+ """Write any pending changes to the disk."""
+ return
+
+ def close(self):
+ """Flush and close the mailbox."""
+ if self._locked:
+ self.unlock()
+
+ def list_folders(self):
+ """Return a list of folder names."""
+ result = []
+ for entry in os.listdir(self._path):
+ if os.path.isdir(os.path.join(self._path, entry)):
+ result.append(entry)
+ return result
+
+ def get_folder(self, folder):
+ """Return an MH instance for the named folder."""
+ return MH(os.path.join(self._path, folder), create=False)
+
+ def add_folder(self, folder):
+ """Create a folder and return an MH instance representing it."""
+ return MH(os.path.join(self._path, folder))
+
+ def remove_folder(self, folder):
+ """Delete the named folder, which must be empty."""
+ path = os.path.join(self._path, folder)
+ entries = os.listdir(path)
+ if entries == ['.mh_sequences']:
+ os.remove(os.path.join(path, '.mh_sequences'))
+ elif entries == []:
+ pass
+ else:
+ raise NotEmptyError('Folder not empty: %s' % self._path)
+ os.rmdir(path)
+
+ def get_sequences(self):
+ """Return a name-to-key-list dictionary to define each sequence."""
+ results = {}
+ f = file(os.path.join(self._path, '.mh_sequences'), 'r')
+ try:
+ all_keys = set(self.keys())
+ for line in f:
+ try:
+ name, contents = line.split(':')
+ keys = set()
+ for spec in contents.split():
+ if spec.isdigit():
+ keys.add(int(spec))
+ else:
+ start, stop = (int(x) for x in spec.split('-'))
+ keys.update(range(start, stop + 1))
+ results[name] = [key for key in sorted(keys) \
+ if key in all_keys]
+ if len(results[name]) == 0:
+ del results[name]
+ except ValueError:
+ raise FormatError('Invalid sequence specification: %s' %
+ line.rstrip())
+ finally:
+ f.close()
+ return results
+
+ def set_sequences(self, sequences):
+ """Set sequences using the given name-to-key-list dictionary."""
+ f = file(os.path.join(self._path, '.mh_sequences'), 'r+')
+ try:
+ os.close(os.open(f.name, os.O_WRONLY | os.O_TRUNC))
+ for name, keys in sequences.iteritems():
+ if len(keys) == 0:
+ continue
+ f.write('%s:' % name)
+ prev = None
+ completing = False
+ for key in sorted(set(keys)):
+ if key - 1 == prev:
+ if not completing:
+ completing = True
+ f.write('-')
+ elif completing:
+ completing = False
+ f.write('%s %s' % (prev, key))
+ else:
+ f.write(' %s' % key)
+ prev = key
+ if completing:
+ f.write(str(prev) + '\n')
+ else:
+ f.write('\n')
+ finally:
+ f.close()
+
+ def pack(self):
+ """Re-name messages to eliminate numbering gaps. Invalidates keys."""
+ sequences = self.get_sequences()
+ prev = 0
+ changes = []
+ for key in self.iterkeys():
+ if key - 1 != prev:
+ changes.append((key, prev + 1))
+ f = file(os.path.join(self._path, str(key)), 'r+')
+ try:
+ if self._locked:
+ _lock_file(f)
+ try:
+ if hasattr(os, 'link'):
+ os.link(os.path.join(self._path, str(key)),
+ os.path.join(self._path, str(prev + 1)))
+ os.unlink(os.path.join(self._path, str(key)))
+ else:
+ f.close()
+ os.rename(os.path.join(self._path, str(key)),
+ os.path.join(self._path, str(prev + 1)))
+ finally:
+ if self._locked:
+ _unlock_file(f)
+ finally:
+ f.close()
+ prev += 1
+ self._next_key = prev + 1
+ if len(changes) == 0:
+ return
+ for name, key_list in sequences.items():
+ for old, new in changes:
+ if old in key_list:
+ key_list[key_list.index(old)] = new
+ self.set_sequences(sequences)
+
+ def _dump_sequences(self, message, key):
+ """Inspect a new MHMessage and update sequences appropriately."""
+ pending_sequences = message.get_sequences()
+ all_sequences = self.get_sequences()
+ for name, key_list in all_sequences.iteritems():
+ if name in pending_sequences:
+ key_list.append(key)
+ elif key in key_list:
+ del key_list[key_list.index(key)]
+ for sequence in pending_sequences:
+ if sequence not in all_sequences:
+ all_sequences[sequence] = [key]
+ self.set_sequences(all_sequences)
+
+
+class Babyl(_singlefileMailbox):
+ """An Rmail-style Babyl mailbox."""
+
+ _special_labels = frozenset(('unseen', 'deleted', 'filed', 'answered',
+ 'forwarded', 'edited', 'resent'))
+
+ def __init__(self, path, factory=None, create=True):
+ """Initialize a Babyl mailbox."""
+ _singlefileMailbox.__init__(self, path, factory, create)
+ self._labels = {}
+
+ def add(self, message):
+ """Add message and return assigned key."""
+ key = _singlefileMailbox.add(self, message)
+ if isinstance(message, BabylMessage):
+ self._labels[key] = message.get_labels()
+ return key
+
+ def remove(self, key):
+ """Remove the keyed message; raise KeyError if it doesn't exist."""
+ _singlefileMailbox.remove(self, key)
+ if key in self._labels:
+ del self._labels[key]
+
+ def __setitem__(self, key, message):
+ """Replace the keyed message; raise KeyError if it doesn't exist."""
+ _singlefileMailbox.__setitem__(self, key, message)
+ if isinstance(message, BabylMessage):
+ self._labels[key] = message.get_labels()
+
+ def get_message(self, key):
+ """Return a Message representation or raise a KeyError."""
+ start, stop = self._lookup(key)
+ self._file.seek(start)
+ self._file.readline() # Skip '1,' line specifying labels.
+ original_headers = StringIO.StringIO()
+ while True:
+ line = self._file.readline()
+ if line == '*** EOOH ***' + os.linesep or line == '':
+ break
+ original_headers.write(line.replace(os.linesep, '\n'))
+ visible_headers = StringIO.StringIO()
+ while True:
+ line = self._file.readline()
+ if line == os.linesep or line == '':
+ break
+ visible_headers.write(line.replace(os.linesep, '\n'))
+ body = self._file.read(stop - self._file.tell()).replace(os.linesep,
+ '\n')
+ msg = BabylMessage(original_headers.getvalue() + body)
+ msg.set_visible(visible_headers.getvalue())
+ if key in self._labels:
+ msg.set_labels(self._labels[key])
+ return msg
+
+ def get_string(self, key):
+ """Return a string representation or raise a KeyError."""
+ start, stop = self._lookup(key)
+ self._file.seek(start)
+ self._file.readline() # Skip '1,' line specifying labels.
+ original_headers = StringIO.StringIO()
+ while True:
+ line = self._file.readline()
+ if line == '*** EOOH ***' + os.linesep or line == '':
+ break
+ original_headers.write(line.replace(os.linesep, '\n'))
+ while True:
+ line = self._file.readline()
+ if line == os.linesep or line == '':
+ break
+ return original_headers.getvalue() + \
+ self._file.read(stop - self._file.tell()).replace(os.linesep,
+ '\n')
+
+ def get_file(self, key):
+ """Return a file-like representation or raise a KeyError."""
+ return StringIO.StringIO(self.get_string(key).replace('\n',
+ os.linesep))
+
+ def get_labels(self):
+ """Return a list of user-defined labels in the mailbox."""
+ self._lookup()
+ labels = set()
+ for label_list in self._labels.values():
+ labels.update(label_list)
+ labels.difference_update(self._special_labels)
+ return list(labels)
+
+ def _generate_toc(self):
+ """Generate key-to-(start, stop) table of contents."""
+ starts, stops = [], []
+ self._file.seek(0)
+ next_pos = 0
+ label_lists = []
+ while True:
+ line_pos = next_pos
+ line = self._file.readline()
+ next_pos = self._file.tell()
+ if line == '\037\014' + os.linesep:
+ if len(stops) < len(starts):
+ stops.append(line_pos - len(os.linesep))
+ starts.append(next_pos)
+ labels = [label.strip() for label
+ in self._file.readline()[1:].split(',')
+ if label.strip() != '']
+ label_lists.append(labels)
+ elif line == '\037' or line == '\037' + os.linesep:
+ if len(stops) < len(starts):
+ stops.append(line_pos - len(os.linesep))
+ elif line == '':
+ stops.append(line_pos - len(os.linesep))
break
- lines.append(line)
- if sizehint >= 0:
- sizehint = sizehint - len(line)
+ self._toc = dict(enumerate(zip(starts, stops)))
+ self._labels = dict(enumerate(label_lists))
+ self._next_key = len(self._toc)
+
+ def _pre_mailbox_hook(self, f):
+ """Called before writing the mailbox to file f."""
+ f.write('BABYL OPTIONS:%sVersion: 5%sLabels:%s%s\037' %
+ (os.linesep, os.linesep, ','.join(self.get_labels()),
+ os.linesep))
+
+ def _pre_message_hook(self, f):
+ """Called before writing each message to file f."""
+ f.write('\014' + os.linesep)
+
+ def _post_message_hook(self, f):
+ """Called after writing each message to file f."""
+ f.write(os.linesep + '\037')
+
+ def _install_message(self, message):
+ """Write message contents and return (start, stop)."""
+ start = self._file.tell()
+ if isinstance(message, BabylMessage):
+ special_labels = []
+ labels = []
+ for label in message.get_labels():
+ if label in self._special_labels:
+ special_labels.append(label)
+ else:
+ labels.append(label)
+ self._file.write('1')
+ for label in special_labels:
+ self._file.write(', ' + label)
+ self._file.write(',,')
+ for label in labels:
+ self._file.write(' ' + label + ',')
+ self._file.write(os.linesep)
+ else:
+ self._file.write('1,,' + os.linesep)
+ if isinstance(message, email.Message.Message):
+ orig_buffer = StringIO.StringIO()
+ orig_generator = email.Generator.Generator(orig_buffer, False, 0)
+ orig_generator.flatten(message)
+ orig_buffer.seek(0)
+ while True:
+ line = orig_buffer.readline()
+ self._file.write(line.replace('\n', os.linesep))
+ if line == '\n' or line == '':
+ break
+ self._file.write('*** EOOH ***' + os.linesep)
+ if isinstance(message, BabylMessage):
+ vis_buffer = StringIO.StringIO()
+ vis_generator = email.Generator.Generator(vis_buffer, False, 0)
+ vis_generator.flatten(message.get_visible())
+ while True:
+ line = vis_buffer.readline()
+ self._file.write(line.replace('\n', os.linesep))
+ if line == '\n' or line == '':
+ break
+ else:
+ orig_buffer.seek(0)
+ while True:
+ line = orig_buffer.readline()
+ self._file.write(line.replace('\n', os.linesep))
+ if line == '\n' or line == '':
+ break
+ while True:
+ buffer = orig_buffer.read(4096) # Buffer size is arbitrary.
+ if buffer == '':
+ break
+ self._file.write(buffer.replace('\n', os.linesep))
+ elif isinstance(message, str):
+ body_start = message.find('\n\n') + 2
+ if body_start - 2 != -1:
+ self._file.write(message[:body_start].replace('\n',
+ os.linesep))
+ self._file.write('*** EOOH ***' + os.linesep)
+ self._file.write(message[:body_start].replace('\n',
+ os.linesep))
+ self._file.write(message[body_start:].replace('\n',
+ os.linesep))
+ else:
+ self._file.write('*** EOOH ***' + os.linesep + os.linesep)
+ self._file.write(message.replace('\n', os.linesep))
+ elif hasattr(message, 'readline'):
+ original_pos = message.tell()
+ first_pass = True
+ while True:
+ line = message.readline()
+ self._file.write(line.replace('\n', os.linesep))
+ if line == '\n' or line == '':
+ self._file.write('*** EOOH ***' + os.linesep)
+ if first_pass:
+ first_pass = False
+ message.seek(original_pos)
+ else:
+ break
+ while True:
+ buffer = message.read(4096) # Buffer size is arbitrary.
+ if buffer == '':
+ break
+ self._file.write(buffer.replace('\n', os.linesep))
+ else:
+ raise TypeError('Invalid message type: %s' % type(message))
+ stop = self._file.tell()
+ return (start, stop)
+
+
+class Message(email.Message.Message):
+ """Message with mailbox-format-specific properties."""
+
+ def __init__(self, message=None):
+ """Initialize a Message instance."""
+ if isinstance(message, email.Message.Message):
+ self._become_message(copy.deepcopy(message))
+ if isinstance(message, Message):
+ message._explain_to(self)
+ elif isinstance(message, str):
+ self._become_message(email.message_from_string(message))
+ elif hasattr(message, "read"):
+ self._become_message(email.message_from_file(message))
+ elif message is None:
+ email.Message.Message.__init__(self)
+ else:
+ raise TypeError('Invalid message type: %s' % type(message))
+
+ def _become_message(self, message):
+ """Assume the non-format-specific state of message."""
+ for name in ('_headers', '_unixfrom', '_payload', '_charset',
+ 'preamble', 'epilogue', 'defects', '_default_type'):
+ self.__dict__[name] = message.__dict__[name]
+
+ def _explain_to(self, message):
+ """Copy format-specific state to message insofar as possible."""
+ if isinstance(message, Message):
+ return # There's nothing format-specific to explain.
+ else:
+ raise TypeError('Cannot convert to specified type')
+
+
+class MaildirMessage(Message):
+ """Message with Maildir-specific properties."""
+
+ def __init__(self, message=None):
+ """Initialize a MaildirMessage instance."""
+ self._subdir = 'new'
+ self._info = ''
+ self._date = time.time()
+ Message.__init__(self, message)
+
+ def get_subdir(self):
+ """Return 'new' or 'cur'."""
+ return self._subdir
+
+ def set_subdir(self, subdir):
+ """Set subdir to 'new' or 'cur'."""
+ if subdir == 'new' or subdir == 'cur':
+ self._subdir = subdir
+ else:
+ raise ValueError("subdir must be 'new' or 'cur': %s" % subdir)
+
+ def get_flags(self):
+ """Return as a string the flags that are set."""
+ if self._info.startswith('2,'):
+ return self._info[2:]
+ else:
+ return ''
+
+ def set_flags(self, flags):
+ """Set the given flags and unset all others."""
+ self._info = '2,' + ''.join(sorted(flags))
+
+ def add_flag(self, flag):
+ """Set the given flag(s) without changing others."""
+ self.set_flags(''.join(set(self.get_flags()) | set(flag)))
+
+ def remove_flag(self, flag):
+ """Unset the given string flag(s) without changing others."""
+ if self.get_flags() != '':
+ self.set_flags(''.join(set(self.get_flags()) - set(flag)))
+
+ def get_date(self):
+ """Return delivery date of message, in seconds since the epoch."""
+ return self._date
+
+ def set_date(self, date):
+ """Set delivery date of message, in seconds since the epoch."""
+ try:
+ self._date = float(date)
+ except ValueError:
+ raise TypeError("can't convert to float: %s" % date)
+
+ def get_info(self):
+ """Get the message's "info" as a string."""
+ return self._info
+
+ def set_info(self, info):
+ """Set the message's "info" string."""
+ if isinstance(info, str):
+ self._info = info
+ else:
+ raise TypeError('info must be a string: %s' % type(info))
+
+ def _explain_to(self, message):
+ """Copy Maildir-specific state to message insofar as possible."""
+ if isinstance(message, MaildirMessage):
+ message.set_flags(self.get_flags())
+ message.set_subdir(self.get_subdir())
+ message.set_date(self.get_date())
+ elif isinstance(message, _mboxMMDFMessage):
+ flags = set(self.get_flags())
+ if 'S' in flags:
+ message.add_flag('R')
+ if self.get_subdir() == 'cur':
+ message.add_flag('O')
+ if 'T' in flags:
+ message.add_flag('D')
+ if 'F' in flags:
+ message.add_flag('F')
+ if 'R' in flags:
+ message.add_flag('A')
+ message.set_from('MAILER-DAEMON', time.gmtime(self.get_date()))
+ elif isinstance(message, MHMessage):
+ flags = set(self.get_flags())
+ if 'S' not in flags:
+ message.add_sequence('unseen')
+ if 'R' in flags:
+ message.add_sequence('replied')
+ if 'F' in flags:
+ message.add_sequence('flagged')
+ elif isinstance(message, BabylMessage):
+ flags = set(self.get_flags())
+ if 'S' not in flags:
+ message.add_label('unseen')
+ if 'T' in flags:
+ message.add_label('deleted')
+ if 'R' in flags:
+ message.add_label('answered')
+ if 'P' in flags:
+ message.add_label('forwarded')
+ elif isinstance(message, Message):
+ pass
+ else:
+ raise TypeError('Cannot convert to specified type: %s' %
+ type(message))
+
+
+class _mboxMMDFMessage(Message):
+ """Message with mbox- or MMDF-specific properties."""
+
+ def __init__(self, message=None):
+ """Initialize an mboxMMDFMessage instance."""
+ self.set_from('MAILER-DAEMON', True)
+ if isinstance(message, email.Message.Message):
+ unixfrom = message.get_unixfrom()
+ if unixfrom is not None and unixfrom.startswith('From '):
+ self.set_from(unixfrom[5:])
+ Message.__init__(self, message)
+
+ def get_from(self):
+ """Return contents of "From " line."""
+ return self._from
+
+ def set_from(self, from_, time_=None):
+ """Set "From " line, formatting and appending time_ if specified."""
+ if time_ is not None:
+ if time_ is True:
+ time_ = time.gmtime()
+ from_ += ' ' + time.asctime(time_)
+ self._from = from_
+
+ def get_flags(self):
+ """Return as a string the flags that are set."""
+ return self.get('Status', '') + self.get('X-Status', '')
+
+ def set_flags(self, flags):
+ """Set the given flags and unset all others."""
+ flags = set(flags)
+ status_flags, xstatus_flags = '', ''
+ for flag in ('R', 'O'):
+ if flag in flags:
+ status_flags += flag
+ flags.remove(flag)
+ for flag in ('D', 'F', 'A'):
+ if flag in flags:
+ xstatus_flags += flag
+ flags.remove(flag)
+ xstatus_flags += ''.join(sorted(flags))
+ try:
+ self.replace_header('Status', status_flags)
+ except KeyError:
+ self.add_header('Status', status_flags)
+ try:
+ self.replace_header('X-Status', xstatus_flags)
+ except KeyError:
+ self.add_header('X-Status', xstatus_flags)
+
+ def add_flag(self, flag):
+ """Set the given flag(s) without changing others."""
+ self.set_flags(''.join(set(self.get_flags()) | set(flag)))
+
+ def remove_flag(self, flag):
+ """Unset the given string flag(s) without changing others."""
+ if 'Status' in self or 'X-Status' in self:
+ self.set_flags(''.join(set(self.get_flags()) - set(flag)))
+
+ def _explain_to(self, message):
+ """Copy mbox- or MMDF-specific state to message insofar as possible."""
+ if isinstance(message, MaildirMessage):
+ flags = set(self.get_flags())
+ if 'O' in flags:
+ message.set_subdir('cur')
+ if 'F' in flags:
+ message.add_flag('F')
+ if 'A' in flags:
+ message.add_flag('R')
+ if 'R' in flags:
+ message.add_flag('S')
+ if 'D' in flags:
+ message.add_flag('T')
+ del message['status']
+ del message['x-status']
+ maybe_date = ' '.join(self.get_from().split()[-5:])
+ try:
+ message.set_date(calendar.timegm(time.strptime(maybe_date,
+ '%a %b %d %H:%M:%S %Y')))
+ except (ValueError, OverflowError):
+ pass
+ elif isinstance(message, _mboxMMDFMessage):
+ message.set_flags(self.get_flags())
+ message.set_from(self.get_from())
+ elif isinstance(message, MHMessage):
+ flags = set(self.get_flags())
+ if 'R' not in flags:
+ message.add_sequence('unseen')
+ if 'A' in flags:
+ message.add_sequence('replied')
+ if 'F' in flags:
+ message.add_sequence('flagged')
+ del message['status']
+ del message['x-status']
+ elif isinstance(message, BabylMessage):
+ flags = set(self.get_flags())
+ if 'R' not in flags:
+ message.add_label('unseen')
+ if 'D' in flags:
+ message.add_label('deleted')
+ if 'A' in flags:
+ message.add_label('answered')
+ del message['status']
+ del message['x-status']
+ elif isinstance(message, Message):
+ pass
+ else:
+ raise TypeError('Cannot convert to specified type: %s' %
+ type(message))
+
+
+class mboxMessage(_mboxMMDFMessage):
+ """Message with mbox-specific properties."""
+
+
+class MHMessage(Message):
+ """Message with MH-specific properties."""
+
+ def __init__(self, message=None):
+ """Initialize an MHMessage instance."""
+ self._sequences = []
+ Message.__init__(self, message)
+
+ def get_sequences(self):
+ """Return a list of sequences that include the message."""
+ return self._sequences[:]
+
+ def set_sequences(self, sequences):
+ """Set the list of sequences that include the message."""
+ self._sequences = list(sequences)
+
+ def add_sequence(self, sequence):
+ """Add sequence to list of sequences including the message."""
+ if isinstance(sequence, str):
+ if not sequence in self._sequences:
+ self._sequences.append(sequence)
+ else:
+ raise TypeError('sequence must be a string: %s' % type(sequence))
+
+ def remove_sequence(self, sequence):
+ """Remove sequence from the list of sequences including the message."""
+ try:
+ self._sequences.remove(sequence)
+ except ValueError:
+ pass
+
+ def _explain_to(self, message):
+ """Copy MH-specific state to message insofar as possible."""
+ if isinstance(message, MaildirMessage):
+ sequences = set(self.get_sequences())
+ if 'unseen' in sequences:
+ message.set_subdir('cur')
+ else:
+ message.set_subdir('cur')
+ message.add_flag('S')
+ if 'flagged' in sequences:
+ message.add_flag('F')
+ if 'replied' in sequences:
+ message.add_flag('R')
+ elif isinstance(message, _mboxMMDFMessage):
+ sequences = set(self.get_sequences())
+ if 'unseen' not in sequences:
+ message.add_flag('RO')
+ else:
+ message.add_flag('O')
+ if 'flagged' in sequences:
+ message.add_flag('F')
+ if 'replied' in sequences:
+ message.add_flag('A')
+ elif isinstance(message, MHMessage):
+ for sequence in self.get_sequences():
+ message.add_sequence(sequence)
+ elif isinstance(message, BabylMessage):
+ sequences = set(self.get_sequences())
+ if 'unseen' in sequences:
+ message.add_label('unseen')
+ if 'replied' in sequences:
+ message.add_label('answered')
+ elif isinstance(message, Message):
+ pass
+ else:
+ raise TypeError('Cannot convert to specified type: %s' %
+ type(message))
+
+
+class BabylMessage(Message):
+ """Message with Babyl-specific properties."""
+
+ def __init__(self, message=None):
+ """Initialize an BabylMessage instance."""
+ self._labels = []
+ self._visible = Message()
+ Message.__init__(self, message)
+
+ def get_labels(self):
+ """Return a list of labels on the message."""
+ return self._labels[:]
+
+ def set_labels(self, labels):
+ """Set the list of labels on the message."""
+ self._labels = list(labels)
+
+ def add_label(self, label):
+ """Add label to list of labels on the message."""
+ if isinstance(label, str):
+ if label not in self._labels:
+ self._labels.append(label)
+ else:
+ raise TypeError('label must be a string: %s' % type(label))
+
+ def remove_label(self, label):
+ """Remove label from the list of labels on the message."""
+ try:
+ self._labels.remove(label)
+ except ValueError:
+ pass
+
+ def get_visible(self):
+ """Return a Message representation of visible headers."""
+ return Message(self._visible)
+
+ def set_visible(self, visible):
+ """Set the Message representation of visible headers."""
+ self._visible = Message(visible)
+
+ def update_visible(self):
+ """Update and/or sensibly generate a set of visible headers."""
+ for header in self._visible.keys():
+ if header in self:
+ self._visible.replace_header(header, self[header])
+ else:
+ del self._visible[header]
+ for header in ('Date', 'From', 'Reply-To', 'To', 'CC', 'Subject'):
+ if header in self and header not in self._visible:
+ self._visible[header] = self[header]
+
+ def _explain_to(self, message):
+ """Copy Babyl-specific state to message insofar as possible."""
+ if isinstance(message, MaildirMessage):
+ labels = set(self.get_labels())
+ if 'unseen' in labels:
+ message.set_subdir('cur')
+ else:
+ message.set_subdir('cur')
+ message.add_flag('S')
+ if 'forwarded' in labels or 'resent' in labels:
+ message.add_flag('P')
+ if 'answered' in labels:
+ message.add_flag('R')
+ if 'deleted' in labels:
+ message.add_flag('T')
+ elif isinstance(message, _mboxMMDFMessage):
+ labels = set(self.get_labels())
+ if 'unseen' not in labels:
+ message.add_flag('RO')
+ else:
+ message.add_flag('O')
+ if 'deleted' in labels:
+ message.add_flag('D')
+ if 'answered' in labels:
+ message.add_flag('A')
+ elif isinstance(message, MHMessage):
+ labels = set(self.get_labels())
+ if 'unseen' in labels:
+ message.add_sequence('unseen')
+ if 'answered' in labels:
+ message.add_sequence('replied')
+ elif isinstance(message, BabylMessage):
+ message.set_visible(self.get_visible())
+ for label in self.get_labels():
+ message.add_label(label)
+ elif isinstance(message, Message):
+ pass
+ else:
+ raise TypeError('Cannot convert to specified type: %s' %
+ type(message))
+
+
+class MMDFMessage(_mboxMMDFMessage):
+ """Message with MMDF-specific properties."""
+
+
+class _ProxyFile:
+ """A read-only wrapper of a file."""
+
+ def __init__(self, f, pos=None):
+ """Initialize a _ProxyFile."""
+ self._file = f
+ if pos is None:
+ self._pos = f.tell()
+ else:
+ self._pos = pos
+
+ def read(self, size=None):
+ """Read bytes."""
+ return self._read(size, self._file.read)
+
+ def readline(self, size=None):
+ """Read a line."""
+ return self._read(size, self._file.readline)
+
+ def readlines(self, sizehint=None):
+ """Read multiple lines."""
+ result = []
+ for line in self:
+ result.append(line)
+ if sizehint is not None:
+ sizehint -= len(line)
if sizehint <= 0:
break
- return lines
+ return result
+
+ def __iter__(self):
+ """Iterate over lines."""
+ return iter(self.readline, "")
def tell(self):
- return self.pos - self.start
+ """Return the position."""
+ return self._pos
+
+ def seek(self, offset, whence=0):
+ """Change position."""
+ if whence == 1:
+ self._file.seek(self._pos)
+ self._file.seek(offset, whence)
+ self._pos = self._file.tell()
- def seek(self, pos, whence=0):
+ def close(self):
+ """Close the file."""
+ del self._file
+
+ def _read(self, size, read_method):
+ """Read size bytes using read_method."""
+ if size is None:
+ size = -1
+ self._file.seek(self._pos)
+ result = read_method(size)
+ self._pos = self._file.tell()
+ return result
+
+
+class _PartialFile(_ProxyFile):
+ """A read-only wrapper of part of a file."""
+
+ def __init__(self, f, start=None, stop=None):
+ """Initialize a _PartialFile."""
+ _ProxyFile.__init__(self, f, start)
+ self._start = start
+ self._stop = stop
+
+ def tell(self):
+ """Return the position with respect to start."""
+ return _ProxyFile.tell(self) - self._start
+
+ def seek(self, offset, whence=0):
+ """Change position, possibly with respect to start or stop."""
if whence == 0:
- self.pos = self.start + pos
- elif whence == 1:
- self.pos = self.pos + pos
+ self._pos = self._start
+ whence = 1
elif whence == 2:
- self.pos = self.stop + pos
+ self._pos = self._stop
+ whence = 1
+ _ProxyFile.seek(self, offset, whence)
- def close(self):
- del self.fp
+ def _read(self, size, read_method):
+ """Read size bytes using read_method, honoring start and stop."""
+ remaining = self._stop - self._pos
+ if remaining <= 0:
+ return ''
+ if size is None or size < 0 or size > remaining:
+ size = remaining
+ return _ProxyFile._read(self, size, read_method)
+
+
+def _lock_file(f, dotlock=True):
+ """Lock file f using lockf, flock, and dot locking."""
+ dotlock_done = False
+ try:
+ if fcntl:
+ try:
+ fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError, e:
+ if e.errno == errno.EAGAIN:
+ raise ExternalClashError('lockf: lock unavailable: %s' %
+ f.name)
+ else:
+ raise
+ try:
+ fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError, e:
+ if e.errno == errno.EWOULDBLOCK:
+ raise ExternalClashError('flock: lock unavailable: %s' %
+ f.name)
+ else:
+ raise
+ if dotlock:
+ try:
+ pre_lock = _create_temporary(f.name + '.lock')
+ pre_lock.close()
+ except IOError, e:
+ if e.errno == errno.EACCES:
+ return # Without write access, just skip dotlocking.
+ else:
+ raise
+ try:
+ if hasattr(os, 'link'):
+ os.link(pre_lock.name, f.name + '.lock')
+ dotlock_done = True
+ os.unlink(pre_lock.name)
+ else:
+ os.rename(pre_lock.name, f.name + '.lock')
+ dotlock_done = True
+ except OSError, e:
+ if e.errno == errno.EEXIST:
+ os.remove(pre_lock.name)
+ raise ExternalClashError('dot lock unavailable: %s' %
+ f.name)
+ else:
+ raise
+ except:
+ if fcntl:
+ fcntl.lockf(f, fcntl.LOCK_UN)
+ fcntl.flock(f, fcntl.LOCK_UN)
+ if dotlock_done:
+ os.remove(f.name + '.lock')
+ raise
+
+def _unlock_file(f):
+ """Unlock file f using lockf, flock, and dot locking."""
+ if fcntl:
+ fcntl.lockf(f, fcntl.LOCK_UN)
+ fcntl.flock(f, fcntl.LOCK_UN)
+ if os.path.exists(f.name + '.lock'):
+ os.remove(f.name + '.lock')
+
+def _create_carefully(path):
+ """Create a file if it doesn't exist and open for reading and writing."""
+ fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
+ try:
+ return file(path, 'rb+')
+ finally:
+ os.close(fd)
+
+def _create_temporary(path):
+ """Create a temp file based on path and open for reading and writing."""
+ return _create_carefully('%s.%s.%s.%s' % (path, int(time.time()),
+ socket.gethostname(),
+ os.getpid()))
+
+
+## Start: classes from the original module (for backward compatibility).
+
+# Note that the Maildir class, whose name is unchanged, itself offers a next()
+# method for backward compatibility.
+
+class _Mailbox:
+
+ def __init__(self, fp, factory=rfc822.Message):
+ self.fp = fp
+ self.seekp = 0
+ self.factory = factory
+ def __iter__(self):
+ return iter(self.next, None)
+
+ def next(self):
+ while 1:
+ self.fp.seek(self.seekp)
+ try:
+ self._search_start()
+ except EOFError:
+ self.seekp = self.fp.tell()
+ return None
+ start = self.fp.tell()
+ self._search_end()
+ self.seekp = stop = self.fp.tell()
+ if start != stop:
+ break
+ return self.factory(_PartialFile(self.fp, start, stop))
# Recommended to use PortableUnixMailbox instead!
class UnixMailbox(_Mailbox):
@@ -213,36 +2029,6 @@ class MHMailbox:
return msg
-class Maildir:
- # Qmail directory mailbox
-
- def __init__(self, dirname, factory=rfc822.Message):
- self.dirname = dirname
- self.factory = factory
-
- # check for new mail
- newdir = os.path.join(self.dirname, 'new')
- boxes = [os.path.join(newdir, f)
- for f in os.listdir(newdir) if f[0] != '.']
-
- # Now check for current mail in this maildir
- curdir = os.path.join(self.dirname, 'cur')
- boxes += [os.path.join(curdir, f)
- for f in os.listdir(curdir) if f[0] != '.']
- boxes.reverse()
- self.boxes = boxes
-
- def __iter__(self):
- return iter(self.next, None)
-
- def next(self):
- if not self.boxes:
- return None
- fn = self.boxes.pop()
- fp = open(fn)
- return self.factory(fp)
-
-
class BabylMailbox(_Mailbox):
def _search_start(self):
@@ -263,59 +2049,20 @@ class BabylMailbox(_Mailbox):
self.fp.seek(pos)
return
+## End: classes from the original module (for backward compatibility).
-def _test():
- import sys
- args = sys.argv[1:]
- if not args:
- for key in 'MAILDIR', 'MAIL', 'LOGNAME', 'USER':
- if key in os.environ:
- mbox = os.environ[key]
- break
- else:
- print "$MAIL, $LOGNAME nor $USER set -- who are you?"
- return
- else:
- mbox = args[0]
- if mbox[:1] == '+':
- mbox = os.environ['HOME'] + '/Mail/' + mbox[1:]
- elif not '/' in mbox:
- if os.path.isfile('/var/mail/' + mbox):
- mbox = '/var/mail/' + mbox
- else:
- mbox = '/usr/mail/' + mbox
- if os.path.isdir(mbox):
- if os.path.isdir(os.path.join(mbox, 'cur')):
- mb = Maildir(mbox)
- else:
- mb = MHMailbox(mbox)
- else:
- fp = open(mbox, 'r')
- mb = PortableUnixMailbox(fp)
-
- msgs = []
- while 1:
- msg = mb.next()
- if msg is None:
- break
- msgs.append(msg)
- if len(args) <= 1:
- msg.fp = None
- if len(args) > 1:
- num = int(args[1])
- print 'Message %d body:'%num
- msg = msgs[num-1]
- msg.rewindbody()
- sys.stdout.write(msg.fp.read())
- else:
- print 'Mailbox',mbox,'has',len(msgs),'messages:'
- for msg in msgs:
- f = msg.getheader('from') or ""
- s = msg.getheader('subject') or ""
- d = msg.getheader('date') or ""
- print '-%20.20s %20.20s %-30.30s'%(f, d[5:], s)
-
-
-if __name__ == '__main__':
- _test()
+class Error(Exception):
+ """Raised for module-specific errors."""
+
+class NoSuchMailboxError(Error):
+ """The specified mailbox does not exist and won't be created."""
+
+class NotEmptyError(Error):
+ """The specified mailbox is not empty and deletion was requested."""
+
+class ExternalClashError(Error):
+ """Another process caused an action to fail."""
+
+class FormatError(Error):
+ """A file appears to have an invalid format."""
diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py
index 77d39a6..83c2443 100644
--- a/Lib/test/test_mailbox.py
+++ b/Lib/test/test_mailbox.py
@@ -1,15 +1,1572 @@
-import mailbox
import os
import time
-import unittest
+import stat
+import socket
+import email
+import email.Message
+import rfc822
+import re
+import StringIO
from test import test_support
-
-# cleanup earlier tests
+import unittest
+import mailbox
+import glob
try:
- os.unlink(test_support.TESTFN)
-except os.error:
+ import fcntl
+except ImportError:
pass
+
+class TestBase(unittest.TestCase):
+
+ def _check_sample(self, msg):
+ # Inspect a mailbox.Message representation of the sample message
+ self.assert_(isinstance(msg, email.Message.Message))
+ self.assert_(isinstance(msg, mailbox.Message))
+ for key, value in _sample_headers.iteritems():
+ self.assert_(value in msg.get_all(key))
+ self.assert_(msg.is_multipart())
+ self.assert_(len(msg.get_payload()) == len(_sample_payloads))
+ for i, payload in enumerate(_sample_payloads):
+ part = msg.get_payload(i)
+ self.assert_(isinstance(part, email.Message.Message))
+ self.assert_(not isinstance(part, mailbox.Message))
+ self.assert_(part.get_payload() == payload)
+
+ def _delete_recursively(self, target):
+ # Delete a file or delete a directory recursively
+ if os.path.isdir(target):
+ for path, dirs, files in os.walk(target, topdown=False):
+ for name in files:
+ os.remove(os.path.join(path, name))
+ for name in dirs:
+ os.rmdir(os.path.join(path, name))
+ os.rmdir(target)
+ elif os.path.exists(target):
+ os.remove(target)
+
+
+class TestMailbox(TestBase):
+
+ _factory = None # Overridden by subclasses to reuse tests
+ _template = 'From: foo\n\n%s'
+
+ def setUp(self):
+ self._path = test_support.TESTFN
+ self._box = self._factory(self._path)
+
+ def tearDown(self):
+ self._box.close()
+ self._delete_recursively(self._path)
+
+ def test_add(self):
+ # Add copies of a sample message
+ keys = []
+ keys.append(self._box.add(self._template % 0))
+ self.assert_(len(self._box) == 1)
+ keys.append(self._box.add(mailbox.Message(_sample_message)))
+ self.assert_(len(self._box) == 2)
+ keys.append(self._box.add(email.message_from_string(_sample_message)))
+ self.assert_(len(self._box) == 3)
+ keys.append(self._box.add(StringIO.StringIO(_sample_message)))
+ self.assert_(len(self._box) == 4)
+ keys.append(self._box.add(_sample_message))
+ self.assert_(len(self._box) == 5)
+ self.assert_(self._box.get_string(keys[0]) == self._template % 0)
+ for i in (1, 2, 3, 4):
+ self._check_sample(self._box[keys[i]])
+
+ def test_remove(self):
+ # Remove messages using remove()
+ self._test_remove_or_delitem(self._box.remove)
+
+ def test_delitem(self):
+ # Remove messages using __delitem__()
+ self._test_remove_or_delitem(self._box.__delitem__)
+
+ def _test_remove_or_delitem(self, method):
+ # (Used by test_remove() and test_delitem().)
+ key0 = self._box.add(self._template % 0)
+ key1 = self._box.add(self._template % 1)
+ self.assert_(len(self._box) == 2)
+ method(key0)
+ l = len(self._box)
+ self.assert_(l == 1, "actual l: %s" % l)
+ self.assertRaises(KeyError, lambda: self._box[key0])
+ self.assertRaises(KeyError, lambda: method(key0))
+ self.assert_(self._box.get_string(key1) == self._template % 1)
+ key2 = self._box.add(self._template % 2)
+ self.assert_(len(self._box) == 2)
+ method(key2)
+ l = len(self._box)
+ self.assert_(l == 1, "actual l: %s" % l)
+ self.assertRaises(KeyError, lambda: self._box[key2])
+ self.assertRaises(KeyError, lambda: method(key2))
+ self.assert_(self._box.get_string(key1) == self._template % 1)
+ method(key1)
+ self.assert_(len(self._box) == 0)
+ self.assertRaises(KeyError, lambda: self._box[key1])
+ self.assertRaises(KeyError, lambda: method(key1))
+
+ def test_discard(self, repetitions=10):
+ # Discard messages
+ key0 = self._box.add(self._template % 0)
+ key1 = self._box.add(self._template % 1)
+ self.assert_(len(self._box) == 2)
+ self._box.discard(key0)
+ self.assert_(len(self._box) == 1)
+ self.assertRaises(KeyError, lambda: self._box[key0])
+ self._box.discard(key0)
+ self.assert_(len(self._box) == 1)
+ self.assertRaises(KeyError, lambda: self._box[key0])
+
+ def test_get(self):
+ # Retrieve messages using get()
+ key0 = self._box.add(self._template % 0)
+ msg = self._box.get(key0)
+ self.assert_(msg['from'] == 'foo')
+ self.assert_(msg.get_payload() == '0')
+ self.assert_(self._box.get('foo') is None)
+ self.assert_(self._box.get('foo', False) is False)
+ self._box.close()
+ self._box = self._factory(self._path, factory=rfc822.Message)
+ key1 = self._box.add(self._template % 1)
+ msg = self._box.get(key1)
+ self.assert_(msg['from'] == 'foo')
+ self.assert_(msg.fp.read() == '1')
+
+ def test_getitem(self):
+ # Retrieve message using __getitem__()
+ key0 = self._box.add(self._template % 0)
+ msg = self._box[key0]
+ self.assert_(msg['from'] == 'foo')
+ self.assert_(msg.get_payload() == '0')
+ self.assertRaises(KeyError, lambda: self._box['foo'])
+ self._box.discard(key0)
+ self.assertRaises(KeyError, lambda: self._box[key0])
+
+ def test_get_message(self):
+ # Get Message representations of messages
+ key0 = self._box.add(self._template % 0)
+ key1 = self._box.add(_sample_message)
+ msg0 = self._box.get_message(key0)
+ self.assert_(isinstance(msg0, mailbox.Message))
+ self.assert_(msg0['from'] == 'foo')
+ self.assert_(msg0.get_payload() == '0')
+ self._check_sample(self._box.get_message(key1))
+
+ def test_get_string(self):
+ # Get string representations of messages
+ key0 = self._box.add(self._template % 0)
+ key1 = self._box.add(_sample_message)
+ self.assert_(self._box.get_string(key0) == self._template % 0)
+ self.assert_(self._box.get_string(key1) == _sample_message)
+
+ def test_get_file(self):
+ # Get file representations of messages
+ key0 = self._box.add(self._template % 0)
+ key1 = self._box.add(_sample_message)
+ self.assert_(self._box.get_file(key0).read().replace(os.linesep, '\n')
+ == self._template % 0)
+ self.assert_(self._box.get_file(key1).read().replace(os.linesep, '\n')
+ == _sample_message)
+
+ def test_iterkeys(self):
+ # Get keys using iterkeys()
+ self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False)
+
+ def test_keys(self):
+ # Get keys using keys()
+ self._check_iteration(self._box.keys, do_keys=True, do_values=False)
+
+ def test_itervalues(self):
+ # Get values using itervalues()
+ self._check_iteration(self._box.itervalues, do_keys=False,
+ do_values=True)
+
+ def test_iter(self):
+ # Get values using __iter__()
+ self._check_iteration(self._box.__iter__, do_keys=False,
+ do_values=True)
+
+ def test_values(self):
+ # Get values using values()
+ self._check_iteration(self._box.values, do_keys=False, do_values=True)
+
+ def test_iteritems(self):
+ # Get keys and values using iteritems()
+ self._check_iteration(self._box.iteritems, do_keys=True,
+ do_values=True)
+
+ def test_items(self):
+ # Get keys and values using items()
+ self._check_iteration(self._box.items, do_keys=True, do_values=True)
+
+ def _check_iteration(self, method, do_keys, do_values, repetitions=10):
+ for value in method():
+ self.fail("Not empty")
+ keys, values = [], []
+ for i in xrange(repetitions):
+ keys.append(self._box.add(self._template % i))
+ values.append(self._template % i)
+ if do_keys and not do_values:
+ returned_keys = list(method())
+ elif do_values and not do_keys:
+ returned_values = list(method())
+ else:
+ returned_keys, returned_values = [], []
+ for key, value in method():
+ returned_keys.append(key)
+ returned_values.append(value)
+ if do_keys:
+ self.assert_(len(keys) == len(returned_keys))
+ self.assert_(set(keys) == set(returned_keys))
+ if do_values:
+ count = 0
+ for value in returned_values:
+ self.assert_(value['from'] == 'foo')
+ self.assert_(int(value.get_payload()) < repetitions)
+ count += 1
+ self.assert_(len(values) == count)
+
+ def test_has_key(self):
+ # Check existence of keys using has_key()
+ self._test_has_key_or_contains(self._box.has_key)
+
+ def test_contains(self):
+ # Check existence of keys using __contains__()
+ self._test_has_key_or_contains(self._box.__contains__)
+
+ def _test_has_key_or_contains(self, method):
+ # (Used by test_has_key() and test_contains().)
+ self.assert_(not method('foo'))
+ key0 = self._box.add(self._template % 0)
+ self.assert_(method(key0))
+ self.assert_(not method('foo'))
+ key1 = self._box.add(self._template % 1)
+ self.assert_(method(key1))
+ self.assert_(method(key0))
+ self.assert_(not method('foo'))
+ self._box.remove(key0)
+ self.assert_(not method(key0))
+ self.assert_(method(key1))
+ self.assert_(not method('foo'))
+ self._box.remove(key1)
+ self.assert_(not method(key1))
+ self.assert_(not method(key0))
+ self.assert_(not method('foo'))
+
+ def test_len(self, repetitions=10):
+ # Get message count
+ keys = []
+ for i in xrange(repetitions):
+ self.assert_(len(self._box) == i)
+ keys.append(self._box.add(self._template % i))
+ self.assert_(len(self._box) == i + 1)
+ for i in xrange(repetitions):
+ self.assert_(len(self._box) == repetitions - i)
+ self._box.remove(keys[i])
+ self.assert_(len(self._box) == repetitions - i - 1)
+
+ def test_set_item(self):
+ # Modify messages using __setitem__()
+ key0 = self._box.add(self._template % 'original 0')
+ self.assert_(self._box.get_string(key0) == \
+ self._template % 'original 0')
+ key1 = self._box.add(self._template % 'original 1')
+ self.assert_(self._box.get_string(key1) == \
+ self._template % 'original 1')
+ self._box[key0] = self._template % 'changed 0'
+ self.assert_(self._box.get_string(key0) == \
+ self._template % 'changed 0')
+ self._box[key1] = self._template % 'changed 1'
+ self.assert_(self._box.get_string(key1) == \
+ self._template % 'changed 1')
+ self._box[key0] = _sample_message
+ self._check_sample(self._box[key0])
+ self._box[key1] = self._box[key0]
+ self._check_sample(self._box[key1])
+ self._box[key0] = self._template % 'original 0'
+ self.assert_(self._box.get_string(key0) ==
+ self._template % 'original 0')
+ self._check_sample(self._box[key1])
+ self.assertRaises(KeyError,
+ lambda: self._box.__setitem__('foo', 'bar'))
+ self.assertRaises(KeyError, lambda: self._box['foo'])
+ self.assert_(len(self._box) == 2)
+
+ def test_clear(self, iterations=10):
+ # Remove all messages using clear()
+ keys = []
+ for i in xrange(iterations):
+ self._box.add(self._template % i)
+ for i, key in enumerate(keys):
+ self.assert_(self._box.get_string(key) == self._template % i)
+ self._box.clear()
+ self.assert_(len(self._box) == 0)
+ for i, key in enumerate(keys):
+ self.assertRaises(KeyError, lambda: self._box.get_string(key))
+
+ def test_pop(self):
+ # Get and remove a message using pop()
+ key0 = self._box.add(self._template % 0)
+ self.assert_(key0 in self._box)
+ key1 = self._box.add(self._template % 1)
+ self.assert_(key1 in self._box)
+ self.assert_(self._box.pop(key0).get_payload() == '0')
+ self.assert_(key0 not in self._box)
+ self.assert_(key1 in self._box)
+ key2 = self._box.add(self._template % 2)
+ self.assert_(key2 in self._box)
+ self.assert_(self._box.pop(key2).get_payload() == '2')
+ self.assert_(key2 not in self._box)
+ self.assert_(key1 in self._box)
+ self.assert_(self._box.pop(key1).get_payload() == '1')
+ self.assert_(key1 not in self._box)
+ self.assert_(len(self._box) == 0)
+
+ def test_popitem(self, iterations=10):
+ # Get and remove an arbitrary (key, message) using popitem()
+ keys = []
+ for i in xrange(10):
+ keys.append(self._box.add(self._template % i))
+ seen = []
+ for i in xrange(10):
+ key, msg = self._box.popitem()
+ self.assert_(key in keys)
+ self.assert_(key not in seen)
+ seen.append(key)
+ self.assert_(int(msg.get_payload()) == keys.index(key))
+ self.assert_(len(self._box) == 0)
+ for key in keys:
+ self.assertRaises(KeyError, lambda: self._box[key])
+
+ def test_update(self):
+ # Modify multiple messages using update()
+ key0 = self._box.add(self._template % 'original 0')
+ key1 = self._box.add(self._template % 'original 1')
+ key2 = self._box.add(self._template % 'original 2')
+ self._box.update({key0: self._template % 'changed 0',
+ key2: _sample_message})
+ self.assert_(len(self._box) == 3)
+ self.assert_(self._box.get_string(key0) ==
+ self._template % 'changed 0')
+ self.assert_(self._box.get_string(key1) ==
+ self._template % 'original 1')
+ self._check_sample(self._box[key2])
+ self._box.update([(key2, self._template % 'changed 2'),
+ (key1, self._template % 'changed 1'),
+ (key0, self._template % 'original 0')])
+ self.assert_(len(self._box) == 3)
+ self.assert_(self._box.get_string(key0) ==
+ self._template % 'original 0')
+ self.assert_(self._box.get_string(key1) ==
+ self._template % 'changed 1')
+ self.assert_(self._box.get_string(key2) ==
+ self._template % 'changed 2')
+ self.assertRaises(KeyError,
+ lambda: self._box.update({'foo': 'bar',
+ key0: self._template % "changed 0"}))
+ self.assert_(len(self._box) == 3)
+ self.assert_(self._box.get_string(key0) ==
+ self._template % "changed 0")
+ self.assert_(self._box.get_string(key1) ==
+ self._template % "changed 1")
+ self.assert_(self._box.get_string(key2) ==
+ self._template % "changed 2")
+
+ def test_flush(self):
+ # Write changes to disk
+ self._test_flush_or_close(self._box.flush)
+
+ def test_lock_unlock(self):
+ # Lock and unlock the mailbox
+ self.assert_(not os.path.exists(self._get_lock_path()))
+ self._box.lock()
+ self.assert_(os.path.exists(self._get_lock_path()))
+ self._box.unlock()
+ self.assert_(not os.path.exists(self._get_lock_path()))
+
+ def test_close(self):
+ # Close mailbox and flush changes to disk
+ self._test_flush_or_close(self._box.close)
+
+ def _test_flush_or_close(self, method):
+ contents = [self._template % i for i in xrange(3)]
+ self._box.add(contents[0])
+ self._box.add(contents[1])
+ self._box.add(contents[2])
+ method()
+ self._box = self._factory(self._path)
+ keys = self._box.keys()
+ self.assert_(len(keys) == 3)
+ for key in keys:
+ self.assert_(self._box.get_string(key) in contents)
+
+ def test_dump_message(self):
+ # Write message representations to disk
+ for input in (email.message_from_string(_sample_message),
+ _sample_message, StringIO.StringIO(_sample_message)):
+ output = StringIO.StringIO()
+ self._box._dump_message(input, output)
+ self.assert_(output.getvalue() ==
+ _sample_message.replace('\n', os.linesep))
+ output = StringIO.StringIO()
+ self.assertRaises(TypeError,
+ lambda: self._box._dump_message(None, output))
+
+ def _get_lock_path(self):
+ # Return the path of the dot lock file. May be overridden.
+ return self._path + '.lock'
+
+
+class TestMailboxSuperclass(TestBase):
+
+ def test_notimplemented(self):
+ # Test that all Mailbox methods raise NotImplementedException.
+ box = mailbox.Mailbox('path')
+ self.assertRaises(NotImplementedError, lambda: box.add(''))
+ self.assertRaises(NotImplementedError, lambda: box.remove(''))
+ self.assertRaises(NotImplementedError, lambda: box.__delitem__(''))
+ self.assertRaises(NotImplementedError, lambda: box.discard(''))
+ self.assertRaises(NotImplementedError, lambda: box.__setitem__('', ''))
+ self.assertRaises(NotImplementedError, lambda: box.iterkeys())
+ self.assertRaises(NotImplementedError, lambda: box.keys())
+ self.assertRaises(NotImplementedError, lambda: box.itervalues().next())
+ self.assertRaises(NotImplementedError, lambda: box.__iter__().next())
+ self.assertRaises(NotImplementedError, lambda: box.values())
+ self.assertRaises(NotImplementedError, lambda: box.iteritems().next())
+ self.assertRaises(NotImplementedError, lambda: box.items())
+ self.assertRaises(NotImplementedError, lambda: box.get(''))
+ self.assertRaises(NotImplementedError, lambda: box.__getitem__(''))
+ self.assertRaises(NotImplementedError, lambda: box.get_message(''))
+ self.assertRaises(NotImplementedError, lambda: box.get_string(''))
+ self.assertRaises(NotImplementedError, lambda: box.get_file(''))
+ self.assertRaises(NotImplementedError, lambda: box.has_key(''))
+ self.assertRaises(NotImplementedError, lambda: box.__contains__(''))
+ self.assertRaises(NotImplementedError, lambda: box.__len__())
+ self.assertRaises(NotImplementedError, lambda: box.clear())
+ self.assertRaises(NotImplementedError, lambda: box.pop(''))
+ self.assertRaises(NotImplementedError, lambda: box.popitem())
+ self.assertRaises(NotImplementedError, lambda: box.update((('', ''),)))
+ self.assertRaises(NotImplementedError, lambda: box.flush())
+ self.assertRaises(NotImplementedError, lambda: box.lock())
+ self.assertRaises(NotImplementedError, lambda: box.unlock())
+ self.assertRaises(NotImplementedError, lambda: box.close())
+
+
+class TestMaildir(TestMailbox):
+
+ _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
+
+ def setUp(self):
+ TestMailbox.setUp(self)
+ if os.name == 'nt':
+ self._box.colon = '!'
+
+ def test_add_MM(self):
+ # Add a MaildirMessage instance
+ msg = mailbox.MaildirMessage(self._template % 0)
+ msg.set_subdir('cur')
+ msg.set_info('foo')
+ key = self._box.add(msg)
+ self.assert_(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' %
+ (key, self._box.colon))))
+
+ def test_get_MM(self):
+ # Get a MaildirMessage instance
+ msg = mailbox.MaildirMessage(self._template % 0)
+ msg.set_subdir('cur')
+ msg.set_flags('RF')
+ key = self._box.add(msg)
+ msg_returned = self._box.get_message(key)
+ self.assert_(isinstance(msg_returned, mailbox.MaildirMessage))
+ self.assert_(msg_returned.get_subdir() == 'cur')
+ self.assert_(msg_returned.get_flags() == 'FR')
+
+ def test_set_MM(self):
+ # Set with a MaildirMessage instance
+ msg0 = mailbox.MaildirMessage(self._template % 0)
+ msg0.set_flags('TP')
+ key = self._box.add(msg0)
+ msg_returned = self._box.get_message(key)
+ self.assert_(msg_returned.get_subdir() == 'new')
+ self.assert_(msg_returned.get_flags() == 'PT')
+ msg1 = mailbox.MaildirMessage(self._template % 1)
+ self._box[key] = msg1
+ msg_returned = self._box.get_message(key)
+ self.assert_(msg_returned.get_subdir() == 'new')
+ self.assert_(msg_returned.get_flags() == '')
+ self.assert_(msg_returned.get_payload() == '1')
+ msg2 = mailbox.MaildirMessage(self._template % 2)
+ msg2.set_info('2,S')
+ self._box[key] = msg2
+ self._box[key] = self._template % 3
+ msg_returned = self._box.get_message(key)
+ self.assert_(msg_returned.get_subdir() == 'new')
+ self.assert_(msg_returned.get_flags() == 'S')
+ self.assert_(msg_returned.get_payload() == '3')
+
+ def test_initialize_new(self):
+ # Initialize a non-existent mailbox
+ self.tearDown()
+ self._box = mailbox.Maildir(self._path)
+ self._check_basics(factory=rfc822.Message)
+ self._delete_recursively(self._path)
+ self._box = self._factory(self._path, factory=None)
+ self._check_basics()
+
+ def test_initialize_existing(self):
+ # Initialize an existing mailbox
+ self.tearDown()
+ for subdir in '', 'tmp', 'new', 'cur':
+ os.mkdir(os.path.join(self._path, subdir))
+ self._box = mailbox.Maildir(self._path)
+ self._check_basics(factory=rfc822.Message)
+ self._box = mailbox.Maildir(self._path, factory=None)
+ self._check_basics()
+
+ def _check_basics(self, factory=None):
+ # (Used by test_open_new() and test_open_existing().)
+ self.assertEqual(self._box._path, os.path.abspath(self._path))
+ self.assertEqual(self._box._factory, factory)
+ for subdir in '', 'tmp', 'new', 'cur':
+ path = os.path.join(self._path, subdir)
+ mode = os.stat(path)[stat.ST_MODE]
+ self.assert_(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)
+
+ def test_list_folders(self):
+ # List folders
+ self._box.add_folder('one')
+ self._box.add_folder('two')
+ self._box.add_folder('three')
+ self.assert_(len(self._box.list_folders()) == 3)
+ self.assert_(set(self._box.list_folders()) ==
+ set(('one', 'two', 'three')))
+
+ def test_get_folder(self):
+ # Open folders
+ self._box.add_folder('foo.bar')
+ folder0 = self._box.get_folder('foo.bar')
+ folder0.add(self._template % 'bar')
+ self.assert_(os.path.isdir(os.path.join(self._path, '.foo.bar')))
+ folder1 = self._box.get_folder('foo.bar')
+ self.assert_(folder1.get_string(folder1.keys()[0]) == \
+ self._template % 'bar')
+
+ def test_add_and_remove_folders(self):
+ # Delete folders
+ self._box.add_folder('one')
+ self._box.add_folder('two')
+ self.assert_(len(self._box.list_folders()) == 2)
+ self.assert_(set(self._box.list_folders()) == set(('one', 'two')))
+ self._box.remove_folder('one')
+ self.assert_(len(self._box.list_folders()) == 1)
+ self.assert_(set(self._box.list_folders()) == set(('two',)))
+ self._box.add_folder('three')
+ self.assert_(len(self._box.list_folders()) == 2)
+ self.assert_(set(self._box.list_folders()) == set(('two', 'three')))
+ self._box.remove_folder('three')
+ self.assert_(len(self._box.list_folders()) == 1)
+ self.assert_(set(self._box.list_folders()) == set(('two',)))
+ self._box.remove_folder('two')
+ self.assert_(len(self._box.list_folders()) == 0)
+ self.assert_(self._box.list_folders() == [])
+
+ def test_clean(self):
+ # Remove old files from 'tmp'
+ foo_path = os.path.join(self._path, 'tmp', 'foo')
+ bar_path = os.path.join(self._path, 'tmp', 'bar')
+ file(foo_path, 'w').close()
+ file(bar_path, 'w').close()
+ self._box.clean()
+ self.assert_(os.path.exists(foo_path))
+ self.assert_(os.path.exists(bar_path))
+ foo_stat = os.stat(foo_path)
+ os.utime(os.path.join(foo_path), (time.time() - 129600 - 2,
+ foo_stat.st_mtime))
+ self._box.clean()
+ self.assert_(not os.path.exists(foo_path))
+ self.assert_(os.path.exists(bar_path))
+
+ def test_create_tmp(self, repetitions=10):
+ # Create files in tmp directory
+ hostname = socket.gethostname()
+ if '/' in hostname:
+ hostname = hostname.replace('/', r'\057')
+ if ':' in hostname:
+ hostname = hostname.replace(':', r'\072')
+ pid = os.getpid()
+ pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"
+ r"Q(?P<Q>\d+)\.(?P<host>[^:/]+)")
+ previous_groups = None
+ for x in xrange(repetitions):
+ tmp_file = self._box._create_tmp()
+ head, tail = os.path.split(tmp_file.name)
+ self.assertEqual(head, os.path.abspath(os.path.join(self._path,
+ "tmp")),
+ "File in wrong location: '%s'" % head)
+ match = pattern.match(tail)
+ self.assert_(match != None, "Invalid file name: '%s'" % tail)
+ groups = match.groups()
+ if previous_groups != None:
+ self.assert_(int(groups[0] >= previous_groups[0]),
+ "Non-monotonic seconds: '%s' before '%s'" %
+ (previous_groups[0], groups[0]))
+ self.assert_(int(groups[1] >= previous_groups[1]) or
+ groups[0] != groups[1],
+ "Non-monotonic milliseconds: '%s' before '%s'" %
+ (previous_groups[1], groups[1]))
+ self.assert_(int(groups[2]) == pid,
+ "Process ID mismatch: '%s' should be '%s'" %
+ (groups[2], pid))
+ self.assert_(int(groups[3]) == int(previous_groups[3]) + 1,
+ "Non-sequential counter: '%s' before '%s'" %
+ (previous_groups[3], groups[3]))
+ self.assert_(groups[4] == hostname,
+ "Host name mismatch: '%s' should be '%s'" %
+ (groups[4], hostname))
+ previous_groups = groups
+ tmp_file.write(_sample_message)
+ tmp_file.seek(0)
+ self.assert_(tmp_file.read() == _sample_message)
+ tmp_file.close()
+ file_count = len(os.listdir(os.path.join(self._path, "tmp")))
+ self.assert_(file_count == repetitions,
+ "Wrong file count: '%s' should be '%s'" %
+ (file_count, repetitions))
+
+ def test_refresh(self):
+ # Update the table of contents
+ self.assert_(self._box._toc == {})
+ key0 = self._box.add(self._template % 0)
+ key1 = self._box.add(self._template % 1)
+ self.assert_(self._box._toc == {})
+ self._box._refresh()
+ self.assert_(self._box._toc == {key0: os.path.join('new', key0),
+ key1: os.path.join('new', key1)})
+ key2 = self._box.add(self._template % 2)
+ self.assert_(self._box._toc == {key0: os.path.join('new', key0),
+ key1: os.path.join('new', key1)})
+ self._box._refresh()
+ self.assert_(self._box._toc == {key0: os.path.join('new', key0),
+ key1: os.path.join('new', key1),
+ key2: os.path.join('new', key2)})
+
+ def test_lookup(self):
+ # Look up message subpaths in the TOC
+ self.assertRaises(KeyError, lambda: self._box._lookup('foo'))
+ key0 = self._box.add(self._template % 0)
+ self.assert_(self._box._lookup(key0) == os.path.join('new', key0))
+ os.remove(os.path.join(self._path, 'new', key0))
+ self.assert_(self._box._toc == {key0: os.path.join('new', key0)})
+ self.assertRaises(KeyError, lambda: self._box._lookup(key0))
+ self.assert_(self._box._toc == {})
+
+ def test_lock_unlock(self):
+ # Lock and unlock the mailbox. For Maildir, this does nothing.
+ self._box.lock()
+ self._box.unlock()
+
+
+class _TestMboxMMDF(TestMailbox):
+
+ def tearDown(self):
+ self._box.close()
+ self._delete_recursively(self._path)
+ for lock_remnant in glob.glob(self._path + '.*'):
+ os.remove(lock_remnant)
+
+ def test_add_from_string(self):
+ # Add a string starting with 'From ' to the mailbox
+ key = self._box.add('From foo@bar blah\nFrom: foo\n\n0')
+ self.assert_(self._box[key].get_from() == 'foo@bar blah')
+ self.assert_(self._box[key].get_payload() == '0')
+
+ def test_add_mbox_or_mmdf_message(self):
+ # Add an mboxMessage or MMDFMessage
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg = class_('From foo@bar blah\nFrom: foo\n\n0')
+ key = self._box.add(msg)
+
+ def test_open_close_open(self):
+ # Open and inspect previously-created mailbox
+ values = [self._template % i for i in xrange(3)]
+ for value in values:
+ self._box.add(value)
+ self._box.close()
+ mtime = os.path.getmtime(self._path)
+ self._box = self._factory(self._path)
+ self.assert_(len(self._box) == 3)
+ for key in self._box.iterkeys():
+ self.assert_(self._box.get_string(key) in values)
+ self._box.close()
+ self.assert_(mtime == os.path.getmtime(self._path))
+
+ def test_add_and_close(self):
+ # Verifying that closing a mailbox doesn't change added items
+ self._box.add(_sample_message)
+ for i in xrange(3):
+ self._box.add(self._template % i)
+ self._box.add(_sample_message)
+ self._box._file.flush()
+ self._box._file.seek(0)
+ contents = self._box._file.read()
+ self._box.close()
+ self.assert_(contents == file(self._path, 'rb').read())
+ self._box = self._factory(self._path)
+
+
+class TestMbox(_TestMboxMMDF):
+
+ _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
+
+
+class TestMMDF(_TestMboxMMDF):
+
+ _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)
+
+
+class TestMH(TestMailbox):
+
+ _factory = lambda self, path, factory=None: mailbox.MH(path, factory)
+
+ def test_list_folders(self):
+ # List folders
+ self._box.add_folder('one')
+ self._box.add_folder('two')
+ self._box.add_folder('three')
+ self.assert_(len(self._box.list_folders()) == 3)
+ self.assert_(set(self._box.list_folders()) ==
+ set(('one', 'two', 'three')))
+
+ def test_get_folder(self):
+ # Open folders
+ self._box.add_folder('foo.bar')
+ folder0 = self._box.get_folder('foo.bar')
+ folder0.add(self._template % 'bar')
+ self.assert_(os.path.isdir(os.path.join(self._path, 'foo.bar')))
+ folder1 = self._box.get_folder('foo.bar')
+ self.assert_(folder1.get_string(folder1.keys()[0]) == \
+ self._template % 'bar')
+
+ def test_add_and_remove_folders(self):
+ # Delete folders
+ self._box.add_folder('one')
+ self._box.add_folder('two')
+ self.assert_(len(self._box.list_folders()) == 2)
+ self.assert_(set(self._box.list_folders()) == set(('one', 'two')))
+ self._box.remove_folder('one')
+ self.assert_(len(self._box.list_folders()) == 1)
+ self.assert_(set(self._box.list_folders()) == set(('two',)))
+ self._box.add_folder('three')
+ self.assert_(len(self._box.list_folders()) == 2)
+ self.assert_(set(self._box.list_folders()) == set(('two', 'three')))
+ self._box.remove_folder('three')
+ self.assert_(len(self._box.list_folders()) == 1)
+ self.assert_(set(self._box.list_folders()) == set(('two',)))
+ self._box.remove_folder('two')
+ self.assert_(len(self._box.list_folders()) == 0)
+ self.assert_(self._box.list_folders() == [])
+
+ def test_sequences(self):
+ # Get and set sequences
+ self.assert_(self._box.get_sequences() == {})
+ msg0 = mailbox.MHMessage(self._template % 0)
+ msg0.add_sequence('foo')
+ key0 = self._box.add(msg0)
+ self.assert_(self._box.get_sequences() == {'foo':[key0]})
+ msg1 = mailbox.MHMessage(self._template % 1)
+ msg1.set_sequences(['bar', 'replied', 'foo'])
+ key1 = self._box.add(msg1)
+ self.assert_(self._box.get_sequences() ==
+ {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]})
+ msg0.set_sequences(['flagged'])
+ self._box[key0] = msg0
+ self.assert_(self._box.get_sequences() ==
+ {'foo':[key1], 'bar':[key1], 'replied':[key1],
+ 'flagged':[key0]})
+ self._box.remove(key1)
+ self.assert_(self._box.get_sequences() == {'flagged':[key0]})
+
+ def test_pack(self):
+ # Pack the contents of the mailbox
+ msg0 = mailbox.MHMessage(self._template % 0)
+ msg1 = mailbox.MHMessage(self._template % 1)
+ msg2 = mailbox.MHMessage(self._template % 2)
+ msg3 = mailbox.MHMessage(self._template % 3)
+ msg0.set_sequences(['foo', 'unseen'])
+ msg1.set_sequences(['foo'])
+ msg2.set_sequences(['foo', 'flagged'])
+ msg3.set_sequences(['foo', 'bar', 'replied'])
+ key0 = self._box.add(msg0)
+ key1 = self._box.add(msg1)
+ key2 = self._box.add(msg2)
+ key3 = self._box.add(msg3)
+ self.assert_(self._box.get_sequences() ==
+ {'foo':[key0,key1,key2,key3], 'unseen':[key0],
+ 'flagged':[key2], 'bar':[key3], 'replied':[key3]})
+ self._box.remove(key2)
+ self.assert_(self._box.get_sequences() ==
+ {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3],
+ 'replied':[key3]})
+ self._box.pack()
+ self.assert_(self._box.keys() == [1, 2, 3])
+ key0 = key0
+ key1 = key0 + 1
+ key2 = key1 + 1
+ self.assert_(self._box.get_sequences() ==
+ {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]})
+
+ def _get_lock_path(self):
+ return os.path.join(self._path, '.mh_sequences.lock')
+
+
+class TestBabyl(TestMailbox):
+
+ _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
+
+ def tearDown(self):
+ self._box.close()
+ self._delete_recursively(self._path)
+ for lock_remnant in glob.glob(self._path + '.*'):
+ os.remove(lock_remnant)
+
+ def test_labels(self):
+ # Get labels from the mailbox
+ self.assert_(self._box.get_labels() == [])
+ msg0 = mailbox.BabylMessage(self._template % 0)
+ msg0.add_label('foo')
+ key0 = self._box.add(msg0)
+ self.assert_(self._box.get_labels() == ['foo'])
+ msg1 = mailbox.BabylMessage(self._template % 1)
+ msg1.set_labels(['bar', 'answered', 'foo'])
+ key1 = self._box.add(msg1)
+ self.assert_(set(self._box.get_labels()) == set(['foo', 'bar']))
+ msg0.set_labels(['blah', 'filed'])
+ self._box[key0] = msg0
+ self.assert_(set(self._box.get_labels()) ==
+ set(['foo', 'bar', 'blah']))
+ self._box.remove(key1)
+ self.assert_(set(self._box.get_labels()) == set(['blah']))
+
+
+class TestMessage(TestBase):
+
+ _factory = mailbox.Message # Overridden by subclasses to reuse tests
+
+ def setUp(self):
+ self._path = test_support.TESTFN
+
+ def tearDown(self):
+ self._delete_recursively(self._path)
+
+ def test_initialize_with_eMM(self):
+ # Initialize based on email.Message.Message instance
+ eMM = email.message_from_string(_sample_message)
+ msg = self._factory(eMM)
+ self._post_initialize_hook(msg)
+ self._check_sample(msg)
+
+ def test_initialize_with_string(self):
+ # Initialize based on string
+ msg = self._factory(_sample_message)
+ self._post_initialize_hook(msg)
+ self._check_sample(msg)
+
+ def test_initialize_with_file(self):
+ # Initialize based on contents of file
+ f = open(self._path, 'w+')
+ f.write(_sample_message)
+ f.seek(0)
+ msg = self._factory(f)
+ self._post_initialize_hook(msg)
+ self._check_sample(msg)
+ f.close()
+
+ def test_initialize_with_nothing(self):
+ # Initialize without arguments
+ msg = self._factory()
+ self._post_initialize_hook(msg)
+ self.assert_(isinstance(msg, email.Message.Message))
+ self.assert_(isinstance(msg, mailbox.Message))
+ self.assert_(isinstance(msg, self._factory))
+ self.assert_(msg.keys() == [])
+ self.assert_(not msg.is_multipart())
+ self.assert_(msg.get_payload() == None)
+
+ def test_initialize_incorrectly(self):
+ # Initialize with invalid argument
+ self.assertRaises(TypeError, lambda: self._factory(object()))
+
+ def test_become_message(self):
+ # Take on the state of another message
+ eMM = email.message_from_string(_sample_message)
+ msg = self._factory()
+ msg._become_message(eMM)
+ self._check_sample(msg)
+
+ def test_explain_to(self):
+ # Copy self's format-specific data to other message formats.
+ # This test is superficial; better ones are in TestMessageConversion.
+ msg = self._factory()
+ for class_ in (mailbox.Message, mailbox.MaildirMessage,
+ mailbox.mboxMessage, mailbox.MHMessage,
+ mailbox.BabylMessage, mailbox.MMDFMessage):
+ other_msg = class_()
+ msg._explain_to(other_msg)
+ other_msg = email.Message.Message()
+ self.assertRaises(TypeError, lambda: msg._explain_to(other_msg))
+
+ def _post_initialize_hook(self, msg):
+ # Overridden by subclasses to check extra things after initialization
+ pass
+
+
+class TestMaildirMessage(TestMessage):
+
+ _factory = mailbox.MaildirMessage
+
+ def _post_initialize_hook(self, msg):
+ self.assert_(msg._subdir == 'new')
+ self.assert_(msg._info == '')
+
+ def test_subdir(self):
+ # Use get_subdir() and set_subdir()
+ msg = mailbox.MaildirMessage(_sample_message)
+ self.assert_(msg.get_subdir() == 'new')
+ msg.set_subdir('cur')
+ self.assert_(msg.get_subdir() == 'cur')
+ msg.set_subdir('new')
+ self.assert_(msg.get_subdir() == 'new')
+ self.assertRaises(ValueError, lambda: msg.set_subdir('tmp'))
+ self.assert_(msg.get_subdir() == 'new')
+ msg.set_subdir('new')
+ self.assert_(msg.get_subdir() == 'new')
+ self._check_sample(msg)
+
+ def test_flags(self):
+ # Use get_flags(), set_flags(), add_flag(), remove_flag()
+ msg = mailbox.MaildirMessage(_sample_message)
+ self.assert_(msg.get_flags() == '')
+ self.assert_(msg.get_subdir() == 'new')
+ msg.set_flags('F')
+ self.assert_(msg.get_subdir() == 'new')
+ self.assert_(msg.get_flags() == 'F')
+ msg.set_flags('SDTP')
+ self.assert_(msg.get_flags() == 'DPST')
+ msg.add_flag('FT')
+ self.assert_(msg.get_flags() == 'DFPST')
+ msg.remove_flag('TDRP')
+ self.assert_(msg.get_flags() == 'FS')
+ self.assert_(msg.get_subdir() == 'new')
+ self._check_sample(msg)
+
+ def test_date(self):
+ # Use get_date() and set_date()
+ msg = mailbox.MaildirMessage(_sample_message)
+ self.assert_(abs(msg.get_date() - time.time()) < 60)
+ msg.set_date(0.0)
+ self.assert_(msg.get_date() == 0.0)
+
+ def test_info(self):
+ # Use get_info() and set_info()
+ msg = mailbox.MaildirMessage(_sample_message)
+ self.assert_(msg.get_info() == '')
+ msg.set_info('1,foo=bar')
+ self.assert_(msg.get_info() == '1,foo=bar')
+ self.assertRaises(TypeError, lambda: msg.set_info(None))
+ self._check_sample(msg)
+
+ def test_info_and_flags(self):
+ # Test interaction of info and flag methods
+ msg = mailbox.MaildirMessage(_sample_message)
+ self.assert_(msg.get_info() == '')
+ msg.set_flags('SF')
+ self.assert_(msg.get_flags() == 'FS')
+ self.assert_(msg.get_info() == '2,FS')
+ msg.set_info('1,')
+ self.assert_(msg.get_flags() == '')
+ self.assert_(msg.get_info() == '1,')
+ msg.remove_flag('RPT')
+ self.assert_(msg.get_flags() == '')
+ self.assert_(msg.get_info() == '1,')
+ msg.add_flag('D')
+ self.assert_(msg.get_flags() == 'D')
+ self.assert_(msg.get_info() == '2,D')
+ self._check_sample(msg)
+
+
+class _TestMboxMMDFMessage(TestMessage):
+
+ _factory = mailbox._mboxMMDFMessage
+
+ def _post_initialize_hook(self, msg):
+ self._check_from(msg)
+
+ def test_initialize_with_unixfrom(self):
+ # Initialize with a message that already has a _unixfrom attribute
+ msg = mailbox.Message(_sample_message)
+ msg.set_unixfrom('From foo@bar blah')
+ msg = mailbox.mboxMessage(msg)
+ self.assert_(msg.get_from() == 'foo@bar blah', msg.get_from())
+
+ def test_from(self):
+ # Get and set "From " line
+ msg = mailbox.mboxMessage(_sample_message)
+ self._check_from(msg)
+ msg.set_from('foo bar')
+ self.assert_(msg.get_from() == 'foo bar')
+ msg.set_from('foo@bar', True)
+ self._check_from(msg, 'foo@bar')
+ msg.set_from('blah@temp', time.localtime())
+ self._check_from(msg, 'blah@temp')
+
+ def test_flags(self):
+ # Use get_flags(), set_flags(), add_flag(), remove_flag()
+ msg = mailbox.mboxMessage(_sample_message)
+ self.assert_(msg.get_flags() == '')
+ msg.set_flags('F')
+ self.assert_(msg.get_flags() == 'F')
+ msg.set_flags('XODR')
+ self.assert_(msg.get_flags() == 'RODX')
+ msg.add_flag('FA')
+ self.assert_(msg.get_flags() == 'RODFAX')
+ msg.remove_flag('FDXA')
+ self.assert_(msg.get_flags() == 'RO')
+ self._check_sample(msg)
+
+ def _check_from(self, msg, sender=None):
+ # Check contents of "From " line
+ if sender is None:
+ sender = "MAILER-DAEMON"
+ self.assert_(re.match(sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:"
+ r"\d{2} \d{4}", msg.get_from()) is not None)
+
+
+class TestMboxMessage(_TestMboxMMDFMessage):
+
+ _factory = mailbox.mboxMessage
+
+
+class TestMHMessage(TestMessage):
+
+ _factory = mailbox.MHMessage
+
+ def _post_initialize_hook(self, msg):
+ self.assert_(msg._sequences == [])
+
+ def test_sequences(self):
+ # Get, set, join, and leave sequences
+ msg = mailbox.MHMessage(_sample_message)
+ self.assert_(msg.get_sequences() == [])
+ msg.set_sequences(['foobar'])
+ self.assert_(msg.get_sequences() == ['foobar'])
+ msg.set_sequences([])
+ self.assert_(msg.get_sequences() == [])
+ msg.add_sequence('unseen')
+ self.assert_(msg.get_sequences() == ['unseen'])
+ msg.add_sequence('flagged')
+ self.assert_(msg.get_sequences() == ['unseen', 'flagged'])
+ msg.add_sequence('flagged')
+ self.assert_(msg.get_sequences() == ['unseen', 'flagged'])
+ msg.remove_sequence('unseen')
+ self.assert_(msg.get_sequences() == ['flagged'])
+ msg.add_sequence('foobar')
+ self.assert_(msg.get_sequences() == ['flagged', 'foobar'])
+ msg.remove_sequence('replied')
+ self.assert_(msg.get_sequences() == ['flagged', 'foobar'])
+ msg.set_sequences(['foobar', 'replied'])
+ self.assert_(msg.get_sequences() == ['foobar', 'replied'])
+
+
+class TestBabylMessage(TestMessage):
+
+ _factory = mailbox.BabylMessage
+
+ def _post_initialize_hook(self, msg):
+ self.assert_(msg._labels == [])
+
+ def test_labels(self):
+ # Get, set, join, and leave labels
+ msg = mailbox.BabylMessage(_sample_message)
+ self.assert_(msg.get_labels() == [])
+ msg.set_labels(['foobar'])
+ self.assert_(msg.get_labels() == ['foobar'])
+ msg.set_labels([])
+ self.assert_(msg.get_labels() == [])
+ msg.add_label('filed')
+ self.assert_(msg.get_labels() == ['filed'])
+ msg.add_label('resent')
+ self.assert_(msg.get_labels() == ['filed', 'resent'])
+ msg.add_label('resent')
+ self.assert_(msg.get_labels() == ['filed', 'resent'])
+ msg.remove_label('filed')
+ self.assert_(msg.get_labels() == ['resent'])
+ msg.add_label('foobar')
+ self.assert_(msg.get_labels() == ['resent', 'foobar'])
+ msg.remove_label('unseen')
+ self.assert_(msg.get_labels() == ['resent', 'foobar'])
+ msg.set_labels(['foobar', 'answered'])
+ self.assert_(msg.get_labels() == ['foobar', 'answered'])
+
+ def test_visible(self):
+ # Get, set, and update visible headers
+ msg = mailbox.BabylMessage(_sample_message)
+ visible = msg.get_visible()
+ self.assert_(visible.keys() == [])
+ self.assert_(visible.get_payload() is None)
+ visible['User-Agent'] = 'FooBar 1.0'
+ visible['X-Whatever'] = 'Blah'
+ self.assert_(msg.get_visible().keys() == [])
+ msg.set_visible(visible)
+ visible = msg.get_visible()
+ self.assert_(visible.keys() == ['User-Agent', 'X-Whatever'])
+ self.assert_(visible['User-Agent'] == 'FooBar 1.0')
+ self.assert_(visible['X-Whatever'] == 'Blah')
+ self.assert_(visible.get_payload() is None)
+ msg.update_visible()
+ self.assert_(visible.keys() == ['User-Agent', 'X-Whatever'])
+ self.assert_(visible.get_payload() is None)
+ visible = msg.get_visible()
+ self.assert_(visible.keys() == ['User-Agent', 'Date', 'From', 'To',
+ 'Subject'])
+ for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
+ self.assert_(visible[header] == msg[header])
+
+
+class TestMMDFMessage(_TestMboxMMDFMessage):
+
+ _factory = mailbox.MMDFMessage
+
+
+class TestMessageConversion(TestBase):
+
+ def test_plain_to_x(self):
+ # Convert Message to all formats
+ for class_ in (mailbox.Message, mailbox.MaildirMessage,
+ mailbox.mboxMessage, mailbox.MHMessage,
+ mailbox.BabylMessage, mailbox.MMDFMessage):
+ msg_plain = mailbox.Message(_sample_message)
+ msg = class_(msg_plain)
+ self._check_sample(msg)
+
+ def test_x_to_plain(self):
+ # Convert all formats to Message
+ for class_ in (mailbox.Message, mailbox.MaildirMessage,
+ mailbox.mboxMessage, mailbox.MHMessage,
+ mailbox.BabylMessage, mailbox.MMDFMessage):
+ msg = class_(_sample_message)
+ msg_plain = mailbox.Message(msg)
+ self._check_sample(msg_plain)
+
+ def test_x_to_invalid(self):
+ # Convert all formats to an invalid format
+ for class_ in (mailbox.Message, mailbox.MaildirMessage,
+ mailbox.mboxMessage, mailbox.MHMessage,
+ mailbox.BabylMessage, mailbox.MMDFMessage):
+ self.assertRaises(TypeError, lambda: class_(False))
+
+ def test_maildir_to_maildir(self):
+ # Convert MaildirMessage to MaildirMessage
+ msg_maildir = mailbox.MaildirMessage(_sample_message)
+ msg_maildir.set_flags('DFPRST')
+ msg_maildir.set_subdir('cur')
+ date = msg_maildir.get_date()
+ msg = mailbox.MaildirMessage(msg_maildir)
+ self._check_sample(msg)
+ self.assert_(msg.get_flags() == 'DFPRST')
+ self.assert_(msg.get_subdir() == 'cur')
+ self.assert_(msg.get_date() == date)
+
+ def test_maildir_to_mboxmmdf(self):
+ # Convert MaildirMessage to mboxmessage and MMDFMessage
+ pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'),
+ ('T', 'D'), ('DFPRST', 'RDFA'))
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg_maildir = mailbox.MaildirMessage(_sample_message)
+ msg_maildir.set_date(0.0)
+ for setting, result in pairs:
+ msg_maildir.set_flags(setting)
+ msg = class_(msg_maildir)
+ self.assert_(msg.get_flags() == result)
+ self.assert_(msg.get_from() == 'MAILER-DAEMON %s' %
+ time.asctime(time.gmtime(0.0)))
+ msg_maildir.set_subdir('cur')
+ self.assert_(class_(msg_maildir).get_flags() == 'RODFA')
+
+ def test_maildir_to_mh(self):
+ # Convert MaildirMessage to MHMessage
+ msg_maildir = mailbox.MaildirMessage(_sample_message)
+ pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']),
+ ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []),
+ ('T', ['unseen']), ('DFPRST', ['replied', 'flagged']))
+ for setting, result in pairs:
+ msg_maildir.set_flags(setting)
+ self.assert_(mailbox.MHMessage(msg_maildir).get_sequences() == \
+ result)
+
+ def test_maildir_to_babyl(self):
+ # Convert MaildirMessage to Babyl
+ msg_maildir = mailbox.MaildirMessage(_sample_message)
+ pairs = (('D', ['unseen']), ('F', ['unseen']),
+ ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']),
+ ('S', []), ('T', ['unseen', 'deleted']),
+ ('DFPRST', ['deleted', 'answered', 'forwarded']))
+ for setting, result in pairs:
+ msg_maildir.set_flags(setting)
+ self.assert_(mailbox.BabylMessage(msg_maildir).get_labels() == \
+ result)
+
+ def test_mboxmmdf_to_maildir(self):
+ # Convert mboxMessage and MMDFMessage to MaildirMessage
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg_mboxMMDF = class_(_sample_message)
+ msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0))
+ pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'),
+ ('RODFA', 'FRST'))
+ for setting, result in pairs:
+ msg_mboxMMDF.set_flags(setting)
+ msg = mailbox.MaildirMessage(msg_mboxMMDF)
+ self.assert_(msg.get_flags() == result)
+ self.assert_(msg.get_date() == 0.0, msg.get_date())
+ msg_mboxMMDF.set_flags('O')
+ self.assert_(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir() == \
+ 'cur')
+
+ def test_mboxmmdf_to_mboxmmdf(self):
+ # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg_mboxMMDF = class_(_sample_message)
+ msg_mboxMMDF.set_flags('RODFA')
+ msg_mboxMMDF.set_from('foo@bar')
+ for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg2 = class2_(msg_mboxMMDF)
+ self.assert_(msg2.get_flags() == 'RODFA')
+ self.assert_(msg2.get_from() == 'foo@bar')
+
+ def test_mboxmmdf_to_mh(self):
+ # Convert mboxMessage and MMDFMessage to MHMessage
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg_mboxMMDF = class_(_sample_message)
+ pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']),
+ ('F', ['unseen', 'flagged']),
+ ('A', ['unseen', 'replied']),
+ ('RODFA', ['replied', 'flagged']))
+ for setting, result in pairs:
+ msg_mboxMMDF.set_flags(setting)
+ self.assert_(mailbox.MHMessage(msg_mboxMMDF).get_sequences() \
+ == result)
+
+ def test_mboxmmdf_to_babyl(self):
+ # Convert mboxMessage and MMDFMessage to BabylMessage
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg = class_(_sample_message)
+ pairs = (('R', []), ('O', ['unseen']),
+ ('D', ['unseen', 'deleted']), ('F', ['unseen']),
+ ('A', ['unseen', 'answered']),
+ ('RODFA', ['deleted', 'answered']))
+ for setting, result in pairs:
+ msg.set_flags(setting)
+ self.assert_(mailbox.BabylMessage(msg).get_labels() == result)
+
+ def test_mh_to_maildir(self):
+ # Convert MHMessage to MaildirMessage
+ pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS'))
+ for setting, result in pairs:
+ msg = mailbox.MHMessage(_sample_message)
+ msg.add_sequence(setting)
+ self.assert_(mailbox.MaildirMessage(msg).get_flags() == result)
+ self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur')
+ msg = mailbox.MHMessage(_sample_message)
+ msg.add_sequence('unseen')
+ msg.add_sequence('replied')
+ msg.add_sequence('flagged')
+ self.assert_(mailbox.MaildirMessage(msg).get_flags() == 'FR')
+ self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur')
+
+ def test_mh_to_mboxmmdf(self):
+ # Convert MHMessage to mboxMessage and MMDFMessage
+ pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF'))
+ for setting, result in pairs:
+ msg = mailbox.MHMessage(_sample_message)
+ msg.add_sequence(setting)
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ self.assert_(class_(msg).get_flags() == result)
+ msg = mailbox.MHMessage(_sample_message)
+ msg.add_sequence('unseen')
+ msg.add_sequence('replied')
+ msg.add_sequence('flagged')
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ self.assert_(class_(msg).get_flags() == 'OFA')
+
+ def test_mh_to_mh(self):
+ # Convert MHMessage to MHMessage
+ msg = mailbox.MHMessage(_sample_message)
+ msg.add_sequence('unseen')
+ msg.add_sequence('replied')
+ msg.add_sequence('flagged')
+ self.assert_(mailbox.MHMessage(msg).get_sequences() == \
+ ['unseen', 'replied', 'flagged'])
+
+ def test_mh_to_babyl(self):
+ # Convert MHMessage to BabylMessage
+ pairs = (('unseen', ['unseen']), ('replied', ['answered']),
+ ('flagged', []))
+ for setting, result in pairs:
+ msg = mailbox.MHMessage(_sample_message)
+ msg.add_sequence(setting)
+ self.assert_(mailbox.BabylMessage(msg).get_labels() == result)
+ msg = mailbox.MHMessage(_sample_message)
+ msg.add_sequence('unseen')
+ msg.add_sequence('replied')
+ msg.add_sequence('flagged')
+ self.assert_(mailbox.BabylMessage(msg).get_labels() == \
+ ['unseen', 'answered'])
+
+ def test_babyl_to_maildir(self):
+ # Convert BabylMessage to MaildirMessage
+ pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'),
+ ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'),
+ ('resent', 'PS'))
+ for setting, result in pairs:
+ msg = mailbox.BabylMessage(_sample_message)
+ msg.add_label(setting)
+ self.assert_(mailbox.MaildirMessage(msg).get_flags() == result)
+ self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur')
+ msg = mailbox.BabylMessage(_sample_message)
+ for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
+ 'edited', 'resent'):
+ msg.add_label(label)
+ self.assert_(mailbox.MaildirMessage(msg).get_flags() == 'PRT')
+ self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur')
+
+ def test_babyl_to_mboxmmdf(self):
+ # Convert BabylMessage to mboxMessage and MMDFMessage
+ pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'),
+ ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'),
+ ('resent', 'RO'))
+ for setting, result in pairs:
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ msg = mailbox.BabylMessage(_sample_message)
+ msg.add_label(setting)
+ self.assert_(class_(msg).get_flags() == result)
+ msg = mailbox.BabylMessage(_sample_message)
+ for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
+ 'edited', 'resent'):
+ msg.add_label(label)
+ for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
+ self.assert_(class_(msg).get_flags() == 'ODA')
+
+ def test_babyl_to_mh(self):
+ # Convert BabylMessage to MHMessage
+ pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []),
+ ('answered', ['replied']), ('forwarded', []), ('edited', []),
+ ('resent', []))
+ for setting, result in pairs:
+ msg = mailbox.BabylMessage(_sample_message)
+ msg.add_label(setting)
+ self.assert_(mailbox.MHMessage(msg).get_sequences() == result)
+ msg = mailbox.BabylMessage(_sample_message)
+ for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
+ 'edited', 'resent'):
+ msg.add_label(label)
+ self.assert_(mailbox.MHMessage(msg).get_sequences() == \
+ ['unseen', 'replied'])
+
+ def test_babyl_to_babyl(self):
+ # Convert BabylMessage to BabylMessage
+ msg = mailbox.BabylMessage(_sample_message)
+ msg.update_visible()
+ for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
+ 'edited', 'resent'):
+ msg.add_label(label)
+ msg2 = mailbox.BabylMessage(msg)
+ self.assert_(msg2.get_labels() == ['unseen', 'deleted', 'filed',
+ 'answered', 'forwarded', 'edited',
+ 'resent'])
+ self.assert_(msg.get_visible().keys() == msg2.get_visible().keys())
+ for key in msg.get_visible().keys():
+ self.assert_(msg.get_visible()[key] == msg2.get_visible()[key])
+
+
+class TestProxyFileBase(TestBase):
+
+ def _test_read(self, proxy):
+ # Read by byte
+ proxy.seek(0)
+ self.assert_(proxy.read() == 'bar')
+ proxy.seek(1)
+ self.assert_(proxy.read() == 'ar')
+ proxy.seek(0)
+ self.assert_(proxy.read(2) == 'ba')
+ proxy.seek(1)
+ self.assert_(proxy.read(-1) == 'ar')
+ proxy.seek(2)
+ self.assert_(proxy.read(1000) == 'r')
+
+ def _test_readline(self, proxy):
+ # Read by line
+ proxy.seek(0)
+ self.assert_(proxy.readline() == 'foo' + os.linesep)
+ self.assert_(proxy.readline() == 'bar' + os.linesep)
+ self.assert_(proxy.readline() == 'fred' + os.linesep)
+ self.assert_(proxy.readline() == 'bob')
+ proxy.seek(2)
+ self.assert_(proxy.readline() == 'o' + os.linesep)
+ proxy.seek(6 + 2 * len(os.linesep))
+ self.assert_(proxy.readline() == 'fred' + os.linesep)
+ proxy.seek(6 + 2 * len(os.linesep))
+ self.assert_(proxy.readline(2) == 'fr')
+ self.assert_(proxy.readline(-10) == 'ed' + os.linesep)
+
+ def _test_readlines(self, proxy):
+ # Read multiple lines
+ proxy.seek(0)
+ self.assert_(proxy.readlines() == ['foo' + os.linesep,
+ 'bar' + os.linesep,
+ 'fred' + os.linesep, 'bob'])
+ proxy.seek(0)
+ self.assert_(proxy.readlines(2) == ['foo' + os.linesep])
+ proxy.seek(3 + len(os.linesep))
+ self.assert_(proxy.readlines(4 + len(os.linesep)) ==
+ ['bar' + os.linesep, 'fred' + os.linesep])
+ proxy.seek(3)
+ self.assert_(proxy.readlines(1000) == [os.linesep, 'bar' + os.linesep,
+ 'fred' + os.linesep, 'bob'])
+
+ def _test_iteration(self, proxy):
+ # Iterate by line
+ proxy.seek(0)
+ iterator = iter(proxy)
+ self.assert_(iterator.next() == 'foo' + os.linesep)
+ self.assert_(iterator.next() == 'bar' + os.linesep)
+ self.assert_(iterator.next() == 'fred' + os.linesep)
+ self.assert_(iterator.next() == 'bob')
+ self.assertRaises(StopIteration, lambda: iterator.next())
+
+ def _test_seek_and_tell(self, proxy):
+ # Seek and use tell to check position
+ proxy.seek(3)
+ self.assert_(proxy.tell() == 3)
+ self.assert_(proxy.read(len(os.linesep)) == os.linesep)
+ proxy.seek(2, 1)
+ self.assert_(proxy.read(1 + len(os.linesep)) == 'r' + os.linesep)
+ proxy.seek(-3 - len(os.linesep), 2)
+ self.assert_(proxy.read(3) == 'bar')
+ proxy.seek(2, 0)
+ self.assert_(proxy.read() == 'o' + os.linesep + 'bar' + os.linesep)
+ proxy.seek(100)
+ self.assert_(proxy.read() == '')
+
+ def _test_close(self, proxy):
+ # Close a file
+ proxy.close()
+ self.assertRaises(AttributeError, lambda: proxy.close())
+
+
+class TestProxyFile(TestProxyFileBase):
+
+ def setUp(self):
+ self._path = test_support.TESTFN
+ self._file = file(self._path, 'wb+')
+
+ def tearDown(self):
+ self._file.close()
+ self._delete_recursively(self._path)
+
+ def test_initialize(self):
+ # Initialize and check position
+ self._file.write('foo')
+ pos = self._file.tell()
+ proxy0 = mailbox._ProxyFile(self._file)
+ self.assert_(proxy0.tell() == pos)
+ self.assert_(self._file.tell() == pos)
+ proxy1 = mailbox._ProxyFile(self._file, 0)
+ self.assert_(proxy1.tell() == 0)
+ self.assert_(self._file.tell() == pos)
+
+ def test_read(self):
+ self._file.write('bar')
+ self._test_read(mailbox._ProxyFile(self._file))
+
+ def test_readline(self):
+ self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
+ os.linesep))
+ self._test_readline(mailbox._ProxyFile(self._file))
+
+ def test_readlines(self):
+ self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
+ os.linesep))
+ self._test_readlines(mailbox._ProxyFile(self._file))
+
+ def test_iteration(self):
+ self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
+ os.linesep))
+ self._test_iteration(mailbox._ProxyFile(self._file))
+
+ def test_seek_and_tell(self):
+ self._file.write('foo%sbar%s' % (os.linesep, os.linesep))
+ self._test_seek_and_tell(mailbox._ProxyFile(self._file))
+
+ def test_close(self):
+ self._file.write('foo%sbar%s' % (os.linesep, os.linesep))
+ self._test_close(mailbox._ProxyFile(self._file))
+
+
+class TestPartialFile(TestProxyFileBase):
+
+ def setUp(self):
+ self._path = test_support.TESTFN
+ self._file = file(self._path, 'wb+')
+
+ def tearDown(self):
+ self._file.close()
+ self._delete_recursively(self._path)
+
+ def test_initialize(self):
+ # Initialize and check position
+ self._file.write('foo' + os.linesep + 'bar')
+ pos = self._file.tell()
+ proxy = mailbox._PartialFile(self._file, 2, 5)
+ self.assert_(proxy.tell() == 0)
+ self.assert_(self._file.tell() == pos)
+
+ def test_read(self):
+ self._file.write('***bar***')
+ self._test_read(mailbox._PartialFile(self._file, 3, 6))
+
+ def test_readline(self):
+ self._file.write('!!!!!foo%sbar%sfred%sbob!!!!!' %
+ (os.linesep, os.linesep, os.linesep))
+ self._test_readline(mailbox._PartialFile(self._file, 5,
+ 18 + 3 * len(os.linesep)))
+
+ def test_readlines(self):
+ self._file.write('foo%sbar%sfred%sbob?????' %
+ (os.linesep, os.linesep, os.linesep))
+ self._test_readlines(mailbox._PartialFile(self._file, 0,
+ 13 + 3 * len(os.linesep)))
+
+ def test_iteration(self):
+ self._file.write('____foo%sbar%sfred%sbob####' %
+ (os.linesep, os.linesep, os.linesep))
+ self._test_iteration(mailbox._PartialFile(self._file, 4,
+ 17 + 3 * len(os.linesep)))
+
+ def test_seek_and_tell(self):
+ self._file.write('(((foo%sbar%s$$$' % (os.linesep, os.linesep))
+ self._test_seek_and_tell(mailbox._PartialFile(self._file, 3,
+ 9 + 2 * len(os.linesep)))
+
+ def test_close(self):
+ self._file.write('&foo%sbar%s^' % (os.linesep, os.linesep))
+ self._test_close(mailbox._PartialFile(self._file, 1,
+ 6 + 3 * len(os.linesep)))
+
+
+## Start: tests from the original module (for backward compatibility).
+
FROM_ = "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n"
DUMMY_MESSAGE = """\
From: some.body@dummy.domain
@@ -65,15 +1622,15 @@ class MaildirTestCase(unittest.TestCase):
# Test for regression on bug #117490:
# Make sure the boxes attribute actually gets set.
self.mbox = mailbox.Maildir(test_support.TESTFN)
- self.assert_(hasattr(self.mbox, "boxes"))
- self.assert_(len(self.mbox.boxes) == 0)
+ #self.assert_(hasattr(self.mbox, "boxes"))
+ #self.assert_(len(self.mbox.boxes) == 0)
self.assert_(self.mbox.next() is None)
self.assert_(self.mbox.next() is None)
def test_nonempty_maildir_cur(self):
self.createMessage("cur")
self.mbox = mailbox.Maildir(test_support.TESTFN)
- self.assert_(len(self.mbox.boxes) == 1)
+ #self.assert_(len(self.mbox.boxes) == 1)
self.assert_(self.mbox.next() is not None)
self.assert_(self.mbox.next() is None)
self.assert_(self.mbox.next() is None)
@@ -81,7 +1638,7 @@ class MaildirTestCase(unittest.TestCase):
def test_nonempty_maildir_new(self):
self.createMessage("new")
self.mbox = mailbox.Maildir(test_support.TESTFN)
- self.assert_(len(self.mbox.boxes) == 1)
+ #self.assert_(len(self.mbox.boxes) == 1)
self.assert_(self.mbox.next() is not None)
self.assert_(self.mbox.next() is None)
self.assert_(self.mbox.next() is None)
@@ -90,7 +1647,7 @@ class MaildirTestCase(unittest.TestCase):
self.createMessage("cur")
self.createMessage("new")
self.mbox = mailbox.Maildir(test_support.TESTFN)
- self.assert_(len(self.mbox.boxes) == 2)
+ #self.assert_(len(self.mbox.boxes) == 2)
self.assert_(self.mbox.next() is not None)
self.assert_(self.mbox.next() is not None)
self.assert_(self.mbox.next() is None)
@@ -108,12 +1665,99 @@ class MaildirTestCase(unittest.TestCase):
self.assertEqual(len(str(msg)), len(FROM_)+len(DUMMY_MESSAGE))
self.assertEqual(n, 1)
- # XXX We still need more tests!
+## End: classes from the original module (for backward compatibility).
+
+
+_sample_message = """\
+Return-Path: <gkj@gregorykjohnson.com>
+X-Original-To: gkj+person@localhost
+Delivered-To: gkj+person@localhost
+Received: from localhost (localhost [127.0.0.1])
+ by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
+ for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
+Delivered-To: gkj@sundance.gregorykjohnson.com
+Received: from localhost [127.0.0.1]
+ by localhost with POP3 (fetchmail-6.2.5)
+ for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
+Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
+ by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
+ for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
+Received: by andy.gregorykjohnson.com (Postfix, from userid 1000)
+ id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
+Date: Wed, 13 Jul 2005 17:23:11 -0400
+From: "Gregory K. Johnson" <gkj@gregorykjohnson.com>
+To: gkj@gregorykjohnson.com
+Subject: Sample message
+Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com>
+Mime-Version: 1.0
+Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+"
+Content-Disposition: inline
+User-Agent: Mutt/1.5.9i
+
+
+--NMuMz9nt05w80d4+
+Content-Type: text/plain; charset=us-ascii
+Content-Disposition: inline
+
+This is a sample message.
+
+--
+Gregory K. Johnson
+
+--NMuMz9nt05w80d4+
+Content-Type: application/octet-stream
+Content-Disposition: attachment; filename="text.gz"
+Content-Transfer-Encoding: base64
+
+H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
+3FYlAAAA
+
+--NMuMz9nt05w80d4+--
+"""
+
+_sample_headers = {
+ "Return-Path":"<gkj@gregorykjohnson.com>",
+ "X-Original-To":"gkj+person@localhost",
+ "Delivered-To":"gkj+person@localhost",
+ "Received":"""from localhost (localhost [127.0.0.1])
+ by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
+ for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
+ "Delivered-To":"gkj@sundance.gregorykjohnson.com",
+ "Received":"""from localhost [127.0.0.1]
+ by localhost with POP3 (fetchmail-6.2.5)
+ for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
+ "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
+ by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
+ for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
+ "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000)
+ id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
+ "Date":"Wed, 13 Jul 2005 17:23:11 -0400",
+ "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""",
+ "To":"gkj@gregorykjohnson.com",
+ "Subject":"Sample message",
+ "Mime-Version":"1.0",
+ "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""",
+ "Content-Disposition":"inline",
+ "User-Agent": "Mutt/1.5.9i" }
+
+_sample_payloads = ("""This is a sample message.
+
+--
+Gregory K. Johnson
+""",
+"""H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
+3FYlAAAA
+""")
def test_main():
- test_support.run_unittest(MaildirTestCase)
+ tests = (TestMailboxSuperclass, TestMaildir, TestMbox, TestMMDF, TestMH,
+ TestBabyl, TestMessage, TestMaildirMessage, TestMboxMessage,
+ TestMHMessage, TestBabylMessage, TestMMDFMessage,
+ TestMessageConversion, TestProxyFile, TestPartialFile,
+ MaildirTestCase)
+ test_support.run_unittest(*tests)
-if __name__ == "__main__":
+if __name__ == '__main__':
test_main()