diff options
-rwxr-xr-x | Lib/mailbox.py | 485 | ||||
-rw-r--r-- | Lib/shlex.py | 23 |
2 files changed, 254 insertions, 254 deletions
diff --git a/Lib/mailbox.py b/Lib/mailbox.py index b6c16fe..b5a5692 100755 --- a/Lib/mailbox.py +++ b/Lib/mailbox.py @@ -7,273 +7,272 @@ import rfc822 import os class _Mailbox: + def __init__(self, fp): + self.fp = fp + self.seekp = 0 + + def seek(self, pos, whence=0): + if whence==1: # Relative to current position + self.pos = self.pos + pos + if whence==2: # Relative to file's end + self.pos = self.stop + pos + else: # Default - absolute position + self.pos = self.start + pos + + def next(self): + while 1: + self.fp.seek(self.seekp) + try: + self._search_start() + except EOFError: + self.seekp = self.fp.tell() + return None + start = self.fp.tell() + self._search_end() + self.seekp = stop = self.fp.tell() + if start <> stop: + break + return rfc822.Message(_Subfile(self.fp, start, stop)) - def __init__(self, fp): - self.fp = fp - self.seekp = 0 - - def seek(self, pos, whence=0): - if whence==1: # Relative to current position - self.pos = self.pos + pos - if whence==2: # Relative to file's end - self.pos = self.stop + pos - else: # Default - absolute position - self.pos = self.start + pos - - def next(self): - while 1: - self.fp.seek(self.seekp) - try: - self._search_start() - except EOFError: - self.seekp = self.fp.tell() - return None - start = self.fp.tell() - self._search_end() - self.seekp = stop = self.fp.tell() - if start <> stop: - break - return rfc822.Message(_Subfile(self.fp, start, stop)) class _Subfile: + def __init__(self, fp, start, stop): + self.fp = fp + self.start = start + self.stop = stop + self.pos = self.start + + def read(self, length = None): + if self.pos >= self.stop: + return '' + remaining = self.stop - self.pos + if length is None or length < 0: + length = remaining + elif length > remaining: + length = remaining + self.fp.seek(self.pos) + data = self.fp.read(length) + self.pos = self.fp.tell() + return data + + def readline(self, length = None): + if self.pos >= self.stop: + return '' + if length is None: + length = self.stop - self.pos + self.fp.seek(self.pos) + data = self.fp.readline(length) + self.pos = self.fp.tell() + return data + + def readlines(self, sizehint = -1): + lines = [] + while 1: + line = self.readline() + if not line: + break + lines.append(line) + if sizehint >= 0: + sizehint = sizehint - len(line) + if sizehint <= 0: + break + return lines + + def tell(self): + return self.pos - self.start + + def seek(self, pos, whence=0): + if whence == 0: + self.pos = self.start + pos + elif whence == 1: + self.pos = self.pos + pos + elif whence == 2: + self.pos = self.stop + pos + + def close(self): + del self.fp - def __init__(self, fp, start, stop): - self.fp = fp - self.start = start - self.stop = stop - self.pos = self.start - - def read(self, length = None): - if self.pos >= self.stop: - return '' - remaining = self.stop - self.pos - if length is None or length < 0: - length = remaining - elif length > remaining: - length = remaining - self.fp.seek(self.pos) - data = self.fp.read(length) - self.pos = self.fp.tell() - return data - - def readline(self, length = None): - if self.pos >= self.stop: - return '' - if length is None: - length = self.stop - self.pos - self.fp.seek(self.pos) - data = self.fp.readline(length) - self.pos = self.fp.tell() - return data - - def readlines(self, sizehint = -1): - lines = [] - while 1: - line = self.readline() - if not line: - break - lines.append(line) - if sizehint >= 0: - sizehint = sizehint - len(line) - if sizehint <= 0: - break - return lines - - def tell(self): - return self.pos - self.start - - def seek(self, pos, whence=0): - if whence == 0: - self.pos = self.start + pos - elif whence == 1: - self.pos = self.pos + pos - elif whence == 2: - self.pos = self.stop + pos - - def close(self): - del self.fp class UnixMailbox(_Mailbox): + def _search_start(self): + while 1: + pos = self.fp.tell() + line = self.fp.readline() + if not line: + raise EOFError + if line[:5] == 'From ' and self._isrealfromline(line): + self.fp.seek(pos) + return + + def _search_end(self): + self.fp.readline() # Throw away header line + while 1: + pos = self.fp.tell() + line = self.fp.readline() + if not line: + return + if line[:5] == 'From ' and self._isrealfromline(line): + self.fp.seek(pos) + return + + # An overridable mechanism to test for From-line-ness. + # You can either specify a different regular expression + # or define a whole new _isrealfromline() method. + # Note that this only gets called for lines starting with + # the 5 characters "From ". + + _fromlinepattern = r"From \s*[^\s]+\s+\w\w\w\s+\w\w\w\s+\d?\d\s+" \ + r"\d?\d:\d\d(:\d\d)?(\s+[^\s]+)?\s+\d\d\d\d\s*$" + _regexp = None + + def _isrealfromline(self, line): + if not self._regexp: + import re + self._regexp = re.compile(self._fromlinepattern) + return self._regexp.match(line) - def _search_start(self): - while 1: - pos = self.fp.tell() - line = self.fp.readline() - if not line: - raise EOFError - if line[:5] == 'From ' and self._isrealfromline(line): - self.fp.seek(pos) - return - - def _search_end(self): - self.fp.readline() # Throw away header line - while 1: - pos = self.fp.tell() - line = self.fp.readline() - if not line: - return - if line[:5] == 'From ' and self._isrealfromline(line): - self.fp.seek(pos) - return - - # An overridable mechanism to test for From-line-ness. - # You can either specify a different regular expression - # or define a whole new _isrealfromline() method. - # Note that this only gets called for lines starting with - # the 5 characters "From ". - - _fromlinepattern = r"From \s*[^\s]+\s+\w\w\w\s+\w\w\w\s+\d?\d\s+" \ - r"\d?\d:\d\d(:\d\d)?(\s+[^\s]+)?\s+\d\d\d\d\s*$" - _regexp = None - - def _isrealfromline(self, line): - if not self._regexp: - import re - self._regexp = re.compile(self._fromlinepattern) - return self._regexp.match(line) class MmdfMailbox(_Mailbox): + def _search_start(self): + while 1: + line = self.fp.readline() + if not line: + raise EOFError + if line[:5] == '\001\001\001\001\n': + return + + def _search_end(self): + while 1: + pos = self.fp.tell() + line = self.fp.readline() + if not line: + return + if line == '\001\001\001\001\n': + self.fp.seek(pos) + return - def _search_start(self): - while 1: - line = self.fp.readline() - if not line: - raise EOFError - if line[:5] == '\001\001\001\001\n': - return - - def _search_end(self): - while 1: - pos = self.fp.tell() - line = self.fp.readline() - if not line: - return - if line == '\001\001\001\001\n': - self.fp.seek(pos) - return class MHMailbox: + def __init__(self, dirname): + import re + pat = re.compile('^[0-9][0-9]*$') + self.dirname = dirname + files = os.listdir(self.dirname) + self.boxes = [] + for f in files: + if pat.match(f): + self.boxes.append(f) + + def next(self): + if not self.boxes: + return None + fn = self.boxes[0] + del self.boxes[0] + fp = open(os.path.join(self.dirname, fn)) + return rfc822.Message(fp) - def __init__(self, dirname): - import re - pat = re.compile('^[0-9][0-9]*$') - self.dirname = dirname - files = os.listdir(self.dirname) - self.boxes = [] - for f in files: - if pat.match(f): - self.boxes.append(f) - - def next(self): - if not self.boxes: - return None - fn = self.boxes[0] - del self.boxes[0] - fp = open(os.path.join(self.dirname, fn)) - return rfc822.Message(fp) class Maildir: + # Qmail directory mailbox + + def __init__(self, dirname): + import string + self.dirname = dirname + self.boxes = [] + + # check for new mail + newdir = os.path.join(self.dirname, 'new') + for file in os.listdir(newdir): + if len(string.split(file, '.')) > 2: + self.boxes.append(os.path.join(newdir, file)) + + # Now check for current mail in this maildir + curdir = os.path.join(self.dirname, 'cur') + for file in os.listdir(curdir): + if len(string.split(file, '.')) > 2: + self.boxes.append(os.path.join(curdir, file)) + + def next(self): + if not self.boxes: + return None + fn = self.boxes[0] + del self.boxes[0] + fp = open(os.path.join(self.dirname, fn)) + return rfc822.Message(fp) - # Qmail directory mailbox - - def __init__(self, dirname): - import string - self.dirname = dirname - self.boxes = [] - - # check for new mail - newdir = os.path.join(self.dirname, 'new') - for file in os.listdir(newdir): - if len(string.split(file, '.')) > 2: - self.boxes.append(os.path.join(newdir, file)) - - # Now check for current mail in this maildir - curdir = os.path.join(self.dirname, 'cur') - for file in os.listdir(curdir): - if len(string.split(file, '.')) > 2: - self.boxes.append(os.path.join(curdir, file)) - - def next(self): - if not self.boxes: - return None - fn = self.boxes[0] - del self.boxes[0] - fp = open(os.path.join(self.dirname, fn)) - return rfc822.Message(fp) class BabylMailbox(_Mailbox): + def _search_start(self): + while 1: + line = self.fp.readline() + if not line: + raise EOFError + if line == '*** EOOH ***\n': + return - def _search_start(self): - while 1: - line = self.fp.readline() - if not line: - raise EOFError - if line == '*** EOOH ***\n': - return - - def _search_end(self): - while 1: - pos = self.fp.tell() - line = self.fp.readline() - if not line: - return - if line == '\037\014\n': - self.fp.seek(pos) - return + def _search_end(self): + while 1: + pos = self.fp.tell() + line = self.fp.readline() + if not line: + return + if line == '\037\014\n': + self.fp.seek(pos) + return def _test(): - import time - import sys - import string - import os - - args = sys.argv[1:] - if not args: - for key in 'MAILDIR', 'MAIL', 'LOGNAME', 'USER': - if os.environ.has_key(key): - mbox = os.environ[key] - break - else: - print "$MAIL, $LOGNAME nor $USER set -- who are you?" - return - else: - mbox = args[0] - if mbox[:1] == '+': - mbox = os.environ['HOME'] + '/Mail/' + mbox[1:] - elif not '/' in mbox: - mbox = '/usr/mail/' + mbox - if os.path.isdir(mbox): - if os.path.isdir(os.path.join(mbox, 'cur')): - mb = Maildir(mbox) - else: - mb = MHMailbox(mbox) + import time + import sys + import string + import os + + args = sys.argv[1:] + if not args: + for key in 'MAILDIR', 'MAIL', 'LOGNAME', 'USER': + if os.environ.has_key(key): + mbox = os.environ[key] + break else: - fp = open(mbox, 'r') - mb = UnixMailbox(fp) - - msgs = [] - while 1: - msg = mb.next() - if msg is None: - break - msgs.append(msg) - if len(args) <= 1: - msg.fp = None - if len(args) > 1: - num = string.atoi(args[1]) - print 'Message %d body:'%num - msg = msgs[num-1] - msg.rewindbody() - sys.stdout.write(msg.fp.read()) + print "$MAIL, $LOGNAME nor $USER set -- who are you?" + return + else: + mbox = args[0] + if mbox[:1] == '+': + mbox = os.environ['HOME'] + '/Mail/' + mbox[1:] + elif not '/' in mbox: + mbox = '/usr/mail/' + mbox + if os.path.isdir(mbox): + if os.path.isdir(os.path.join(mbox, 'cur')): + mb = Maildir(mbox) else: - print 'Mailbox',mbox,'has',len(msgs),'messages:' - for msg in msgs: - f = msg.getheader('from') or "" - s = msg.getheader('subject') or "" - d = msg.getheader('date') or "" - print '%20.20s %18.18s %-30.30s'%(f, d[5:], s) + mb = MHMailbox(mbox) + else: + fp = open(mbox, 'r') + mb = UnixMailbox(fp) + + msgs = [] + while 1: + msg = mb.next() + if msg is None: + break + msgs.append(msg) + if len(args) <= 1: + msg.fp = None + if len(args) > 1: + num = string.atoi(args[1]) + print 'Message %d body:'%num + msg = msgs[num-1] + msg.rewindbody() + sys.stdout.write(msg.fp.read()) + else: + print 'Mailbox',mbox,'has',len(msgs),'messages:' + for msg in msgs: + f = msg.getheader('from') or "" + s = msg.getheader('subject') or "" + d = msg.getheader('date') or "" + print '%20.20s %18.18s %-30.30s'%(f, d[5:], s) if __name__ == '__main__': - _test() + _test() diff --git a/Lib/shlex.py b/Lib/shlex.py index 876dcdf..fd7535a 100644 --- a/Lib/shlex.py +++ b/Lib/shlex.py @@ -17,7 +17,8 @@ class shlex: self.instream = sys.stdin self.infile = None self.commenters = '#' - self.wordchars = 'abcdfeghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_' + self.wordchars = ('abcdfeghijklmnopqrstuvwxyz' + 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_') self.whitespace = ' \t\r\n' self.quotes = '\'"' self.state = ' ' @@ -25,7 +26,7 @@ class shlex: self.lineno = 1 self.debug = 0 self.token = '' - self.filestack = [] + self.filestack = [] self.source = None if self.debug: print 'shlex: reading from %s, line %d' \ @@ -45,7 +46,7 @@ class shlex: if self.debug >= 1: print "shlex: popping token " + `tok` return tok - # No pushback. Get a token. + # No pushback. Get a token. raw = self.read_token() # Handle inclusions while raw == self.source: @@ -88,18 +89,18 @@ class shlex: if self.debug >= 3: print "shlex: in state", repr(self.state), \ "I see character:", repr(nextchar) - if self.state == None: - self.token = ''; # past end of file + if self.state is None: + self.token = ''; # past end of file break elif self.state == ' ': if not nextchar: - self.state = None; # end of file + self.state = None; # end of file break elif nextchar in self.whitespace: if self.debug >= 2: print "shlex: I see whitespace in whitespace state" if self.token: - break # emit current token + break # emit current token else: continue elif nextchar in self.commenters: @@ -114,7 +115,7 @@ class shlex: else: self.token = nextchar if self.token: - break # emit current token + break # emit current token else: continue elif self.state in self.quotes: @@ -124,14 +125,14 @@ class shlex: break elif self.state == 'a': if not nextchar: - self.state = None; # end of file + self.state = None; # end of file break elif nextchar in self.whitespace: if self.debug >= 2: print "shlex: I see whitespace in word state" self.state = ' ' if self.token: - break # emit current token + break # emit current token else: continue elif nextchar in self.commenters: @@ -145,7 +146,7 @@ class shlex: print "shlex: I see punctuation in word state" self.state = ' ' if self.token: - break # emit current token + break # emit current token else: continue result = self.token |