From c2c07fa10ce1d5d636ababe380d830dfecef43cf Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Thu, 9 Apr 1998 13:51:46 +0000 Subject: Piers Lauder's IMAP module. --- Lib/imaplib.py | 792 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 792 insertions(+) create mode 100644 Lib/imaplib.py diff --git a/Lib/imaplib.py b/Lib/imaplib.py new file mode 100644 index 0000000..e6754ef --- /dev/null +++ b/Lib/imaplib.py @@ -0,0 +1,792 @@ +"""IMAP4 client. + +Based on RFC 2060. + +Author: Piers Lauder December 1997. + +Public class: IMAP4 +Public variable: Debug +Public functions: Internaldate2tuple + Int2AP + ParseFlags + Time2Internaldate +""" + +import os, re, socket, string, time + +# Globals + +CRLF = '\r\n' +Debug = 0 +IMAP4_PORT = 143 + +# Commands + +Commands = { + # name valid states + 'APPEND': ('AUTH', 'SELECTED'), + 'AUTHENTICATE': ('NONAUTH',), + 'CAPABILITY': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'), + 'CHECK': ('SELECTED',), + 'CLOSE': ('SELECTED',), + 'COPY': ('SELECTED',), + 'CREATE': ('AUTH', 'SELECTED'), + 'DELETE': ('AUTH', 'SELECTED'), + 'EXAMINE': ('AUTH', 'SELECTED'), + 'EXPUNGE': ('SELECTED',), + 'FETCH': ('SELECTED',), + 'LIST': ('AUTH', 'SELECTED'), + 'LOGIN': ('NONAUTH',), + 'LOGOUT': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'), + 'LSUB': ('AUTH', 'SELECTED'), + 'NOOP': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'), + 'RENAME': ('AUTH', 'SELECTED'), + 'SEARCH': ('SELECTED',), + 'SELECT': ('AUTH', 'SELECTED'), + 'STATUS': ('AUTH', 'SELECTED'), + 'STORE': ('SELECTED',), + 'SUBSCRIBE': ('AUTH', 'SELECTED'), + 'UID': ('SELECTED',), + 'UNSUBSCRIBE': ('AUTH', 'SELECTED'), + } + +# Patterns to match server responses + +Continuation = re.compile(r'\+ (?P.*)') +Flags = re.compile(r'.*FLAGS \((?P[^\)]*)\)') +InternalDate = re.compile(r'.*INTERNALDATE "' + r'(?P[ 123][0-9])-(?P[A-Z][a-z][a-z])-(?P[0-9][0-9][0-9][0-9])' + r' (?P[0-9][0-9]):(?P[0-9][0-9]):(?P[0-9][0-9])' + r' (?P[-+])(?P[0-9][0-9])(?P[0-9][0-9])' + r'"') +Literal = re.compile(r'(?P.*) {(?P\d+)}$') +Response_code = re.compile(r'\[(?P[A-Z-]+)( (?P[^\]]*))?\]') +Untagged_response = re.compile(r'\* (?P[A-Z-]+) (?P.*)') +Untagged_status = re.compile(r'\* (?P\d+) (?P[A-Z-]+)( (?P.*))?') + + + +class IMAP4: + + """IMAP4 client class. + + Instantiate with: IMAP4([host[, port]]) + + host - host's name (default: localhost); + port - port number (default: standard IMAP4 port). + + All IMAP4rev1 commands are supported by methods of the same + name (in lower-case). Each command returns a tuple: (type, [data, ...]) + where 'type' is usually 'OK' or 'NO', and 'data' is either the + text from the tagged response, or untagged results from command. + + Errors raise the exception class .error(""). + IMAP4 server errors raise .abort(""), + which is a sub-class of 'error'. + """ + + class error(Exception): pass # Logical errors - debug required + class abort(error): pass # Service errors - close and retry + COUNT = [0] # Count instantiations + + + def __init__(self, host = '', port = IMAP4_PORT): + self.host = host + self.port = port + self.debug = Debug + self.state = 'LOGOUT' + self.tagged_commands = {} # Tagged commands awaiting response + self.untagged_responses = {} # {typ: [data, ...], ...} + self.continuation_response = '' # Last continuation response + self.tagnum = 0 + + # Open socket to server. + + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.connect(self.host, self.port) + self.file = self.sock.makefile('r') + + # Create unique tag for this session, + # and compile tagged response matcher. + + self.COUNT[0] = self.COUNT[0] + 1 + self.tagpre = Int2AP((os.getpid()<<8)+self.COUNT[0]) + self.tagre = re.compile(r'(?P' + + self.tagpre + + r'\d+) (?P[A-Z]+) (?P.*)') + + # Get server welcome message, + # request and store CAPABILITY response. + + if __debug__ and self.debug >= 1: + print '\tnew IMAP4 connection, tag=%s' % self.tagpre + + self.welcome = self._get_response() + if self.untagged_responses.has_key('PREAUTH'): + self.state = 'AUTH' + elif self.untagged_responses.has_key('OK'): + self.state = 'NONAUTH' +# elif self.untagged_responses.has_key('BYE'): + else: + raise self.error(self.welcome) + + cap = 'CAPABILITY' + self._simple_command(cap) + if not self.untagged_responses.has_key(cap): + raise self.error('no CAPABILITY response from server') + self.capabilities = tuple(string.split(self.untagged_responses[cap][-1])) + if not 'IMAP4REV1' in self.capabilities: + raise self.error('server not IMAP4REV1 compliant') + + if __debug__ and self.debug >= 3: + print '\tCAPABILITIES: %s' % `self.capabilities` + + + def __getattr__(self, attr): + """Allow UPPERCASE variants of all following IMAP4 commands.""" + if Commands.has_key(attr): + return eval("self.%s" % string.lower(attr)) + raise AttributeError("Unknown IMAP4 command: '%s'" % attr) + + + # Public methods + + + def append(self, mailbox, flags, date_time, message): + """Append message to named mailbox. + + (typ, [data]) = .append(mailbox, flags, date_time, message) + """ + name = 'APPEND' + if flags: + flags = '(%s)' % flags + else: + flags = None + if date_time: + date_time = Time2Internaldate(date_time) + else: + date_time = None + tag = self._command(name, mailbox, flags, date_time, message) + return self._command_complete(name, tag) + + + def authenticate(self, func): + """Authenticate command - requires response processing. + + UNIMPLEMENTED + """ + raise self.error('UNIMPLEMENTED') + + + def check(self): + """Checkpoint mailbox on server. + + (typ, [data]) = .check() + """ + return self._simple_command('CHECK') + + + def close(self): + """Close currently selected mailbox. + Deleted messages are removed from writable mailbox. + This is the recommended command before 'LOGOUT'. + + (typ, [data]) = .close() + """ + try: + try: typ, dat = self._simple_command('CLOSE') + except EOFError: typ, dat = None, [None] + finally: + self.state = 'AUTH' + return typ, dat + + + def copy(self, message_set, new_mailbox): + """Copy 'message_set' messages onto end of 'new_mailbox'. + + (typ, [data]) = .copy(message_set, new_mailbox) + """ + return self._simple_command('COPY', message_set, new_mailbox) + + + def create(self, mailbox): + """Create new mailbox. + + (typ, [data]) = .create(mailbox) + """ + return self._simple_command('CREATE', mailbox) + + + def delete(self, mailbox): + """Delete old mailbox. + + (typ, [data]) = .delete(mailbox) + """ + return self._simple_command('DELETE', mailbox) + + + def expunge(self): + """Permanently remove deleted items from selected mailbox. + Generates 'EXPUNGE' response for each deleted message. + + (typ, [data]) = .expunge() + + 'data' is list of 'EXPUNGE'd message numbers in order received. + """ + name = 'EXPUNGE' + typ, dat = self._simple_command(name) + return self._untagged_response(typ, name) + + + def fetch(self, message_set, message_parts): + """Fetch (parts of) messages. + + (typ, [data, ...]) = .fetch(message_set, message_parts) + + 'data' are tuples of message part envelope and data. + """ + name = 'FETCH' + typ, dat = self._simple_command(name, message_set, message_parts) + return self._untagged_response(typ, name) + + + def list(self, directory='""', pattern='*'): + """List mailbox names in directory matching pattern. + + (typ, [data]) = .list(directory='""', pattern='*') + + 'data' is list of LIST responses. + """ + name = 'LIST' + typ, dat = self._simple_command(name, directory, pattern) + return self._untagged_response(typ, name) + + + def login(self, user, password): + """Identify client using plaintext password. + + (typ, [data]) = .list(user, password) + """ + if not 'AUTH=LOGIN' in self.capabilities: + raise self.error("server doesn't allow LOGIN authorisation") + typ, dat = self._simple_command('LOGIN', user, password) + if typ != 'OK': + raise self.error(dat) + self.state = 'AUTH' + return typ, dat + + + def logout(self): + """Shutdown connection to server. + + (typ, [data]) = .logout() + + Returns server 'BYE' response. + """ + self.state = 'LOGOUT' + try: typ, dat = self._simple_command('LOGOUT') + except EOFError: typ, dat = None, [None] + self.file.close() + self.sock.close() + if self.untagged_responses.has_key('BYE'): + return 'BYE', self.untagged_responses['BYE'] + return typ, dat + + + def lsub(self, directory='""', pattern='*'): + """List 'subscribed' mailbox names in directory matching pattern. + + (typ, [data, ...]) = .lsub(directory='""', pattern='*') + + 'data' are tuples of message part envelope and data. + """ + name = 'LSUB' + typ, dat = self._simple_command(name, directory, pattern) + return self._untagged_response(typ, name) + + + def recent(self): + """Prompt server for an update. + + (typ, [data]) = .recent() + + 'data' is None if no new messages, + else value of RECENT response. + """ + name = 'RECENT' + typ, dat = self._untagged_response('OK', name) + if dat[-1]: + return typ, dat + typ, dat = self._simple_command('NOOP') + return self._untagged_response(typ, name) + + + def rename(self, oldmailbox, newmailbox): + """Rename old mailbox name to new. + + (typ, data) = .rename(oldmailbox, newmailbox) + """ + return self._simple_command('RENAME', oldmailbox, newmailbox) + + + def response(self, code): + """Return data for response 'code' if received, or None. + + (code, [data]) = .response(code) + """ + return code, self.untagged_responses.get(code, [None]) + + + def search(self, charset, criteria): + """Search mailbox for matching messages. + + (typ, [data]) = .search(charset, criteria) + + 'data' is space separated list of matching message numbers. + """ + name = 'SEARCH' + if charset: + charset = 'CHARSET ' + charset + typ, dat = self._simple_command(name, charset, criteria) + return self._untagged_response(typ, name) + + + def select(self, mailbox='INBOX', readonly=None): + """Select a mailbox. + + (typ, [data]) = .select(mailbox='INBOX', readonly=None) + + 'data' is count of messages in mailbox ('EXISTS' response). + """ + # Mandated responses are ('FLAGS', 'EXISTS', 'RECENT', 'UIDVALIDITY') + # Remove immediately interesting responses + for r in ('EXISTS', 'READ-WRITE'): + if self.untagged_responses.has_key(r): + del self.untagged_responses[r] + if readonly: + name = 'EXAMINE' + else: + name = 'SELECT' + typ, dat = self._simple_command(name, mailbox) + if typ == 'OK': + self.state = 'SELECTED' + elif typ == 'NO': + self.state = 'AUTH' + if not readonly and not self.untagged_responses.has_key('READ-WRITE'): + raise self.error('%s is not writable' % mailbox) + return typ, self.untagged_responses.get('EXISTS', [None]) + + + def status(self, mailbox, names): + """Request named status conditions for mailbox. + + (typ, [data]) = .status(mailbox, names) + """ + name = 'STATUS' + typ, dat = self._simple_command(name, mailbox, names) + return self._untagged_response(typ, name) + + + def store(self, message_set, command, flag_list): + """Alters flag dispositions for messages in mailbox. + + (typ, [data]) = .store(message_set, command, flag_list) + """ + command = '%s %s' % (command, flag_list) + typ, dat = self._simple_command('STORE', message_set, command) + return self._untagged_response(typ, 'FETCH') + + + def subscribe(self, mailbox): + """Subscribe to new mailbox. + + (typ, [data]) = .subscribe(mailbox) + """ + return self._simple_command('SUBSCRIBE', mailbox) + + + def uid(self, command, args): + """Execute "command args" with messages identified by UID, + rather than message number. + + (typ, [data]) = .uid(command, args) + + Returns response appropriate to 'command'. + """ + name = 'UID' + typ, dat = self._simple_command('UID', command, args) + if command == 'SEARCH': + name = 'SEARCH' + else: + name = 'FETCH' + typ, dat2 = self._untagged_response(typ, name) + if dat2[-1]: dat = dat2 + return typ, dat + + + def unsubscribe(self, mailbox): + """Unsubscribe from old mailbox. + + (typ, [data]) = .unsubscribe(mailbox) + """ + return self._simple_command('UNSUBSCRIBE', mailbox) + + + def xatom(self, name, arg1=None, arg2=None): + """Allow simple extension commands + notified by server in CAPABILITY response. + + (typ, [data]) = .xatom(name, arg1=None, arg2=None) + """ + if name[0] != 'X' or not name in self.capabilities: + raise self.error('unknown extension command: %s' % name) + return self._simple_command(name, arg1, arg2) + + + + # Private methods + + + def _append_untagged(self, typ, dat): + + if self.untagged_responses.has_key(typ): + self.untagged_responses[typ].append(dat) + else: + self.untagged_responses[typ] = [dat] + + if __debug__ and self.debug >= 5: + print '\tuntagged_responses[%s] += %.20s..' % (typ, `dat`) + + + def _command(self, name, dat1=None, dat2=None, dat3=None, literal=None): + + if self.state not in Commands[name]: + raise self.error( + 'command %s illegal in state %s' % (name, self.state)) + + tag = self._new_tag() + data = '%s %s' % (tag, name) + for d in (dat1, dat2, dat3): + if d is not None: data = '%s %s' % (data, d) + if literal is not None: + data = '%s {%s}' % (data, len(literal)) + + try: + self.sock.send('%s%s' % (data, CRLF)) + except socket.error, val: + raise self.abort('socket error: %s' % val) + + if __debug__ and self.debug >= 4: + print '\t> %s' % data + + if literal is None: + return tag + + # Wait for continuation response + + while self._get_response(): + if self.tagged_commands[tag]: # BAD/NO? + return tag + + # Send literal + + if __debug__ and self.debug >= 4: + print '\twrite literal size %s' % len(literal) + + try: + self.sock.send(literal) + self.sock.send(CRLF) + except socket.error, val: + raise self.abort('socket error: %s' % val) + + return tag + + + def _command_complete(self, name, tag): + try: + typ, data = self._get_tagged_response(tag) + except self.abort, val: + raise self.abort('command: %s => %s' % (name, val)) + except self.error, val: + raise self.error('command: %s => %s' % (name, val)) + if self.untagged_responses.has_key('BYE') and name != 'LOGOUT': + raise self.abort(self.untagged_responses['BYE'][-1]) + if typ == 'BAD': + raise self.error('%s command error: %s %s' % (name, typ, data)) + return typ, data + + + def _get_response(self): + + # Read response and store. + # + # Returns None for continuation responses, + # otherwise first response line received + + # Protocol mandates all lines terminated by CRLF. + + resp = self._get_line()[:-2] + + # Command completion response? + + if self._match(self.tagre, resp): + tag = self.mo.group('tag') + if not self.tagged_commands.has_key(tag): + raise self.abort('unexpected tagged response: %s' % resp) + + typ = self.mo.group('type') + dat = self.mo.group('data') + self.tagged_commands[tag] = (typ, [dat]) + else: + dat2 = None + + # '*' (untagged) responses? + + if not self._match(Untagged_response, resp): + if self._match(Untagged_status, resp): + dat2 = self.mo.group('data2') + + if self.mo is None: + # Only other possibility is '+' (continuation) rsponse... + + if self._match(Continuation, resp): + self.continuation_response = self.mo.group('data') + return None # NB: indicates continuation + + raise self.abort('unexpected response: %s' % resp) + + typ = self.mo.group('type') + dat = self.mo.group('data') + if dat2: dat = dat + ' ' + dat2 + + # Is there a literal to come? + + while self._match(Literal, dat): + + # Read literal direct from connection. + + size = string.atoi(self.mo.group('size')) + if __debug__ and self.debug >= 4: + print '\tread literal size %s' % size + data = self.file.read(size) + + # Store response with literal as tuple + + self._append_untagged(typ, (dat, data)) + + # Read trailer - possibly containing another literal + + dat = self._get_line()[:-2] + + self._append_untagged(typ, dat) + + # Bracketed response information? + + if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat): + self._append_untagged(self.mo.group('type'), self.mo.group('data')) + + return resp + + + def _get_tagged_response(self, tag): + + while 1: + result = self.tagged_commands[tag] + if result is not None: + del self.tagged_commands[tag] + return result + self._get_response() + + + def _get_line(self): + + line = self.file.readline() + if not line: + raise EOFError + + # Protocol mandates all lines terminated by CRLF + + if __debug__ and self.debug >= 4: + print '\t< %s' % line[:-2] + return line + + + def _match(self, cre, s): + + # Run compiled regular expression match method on 's'. + # Save result, return success. + + self.mo = cre.match(s) + if __debug__ and self.mo is not None and self.debug >= 5: + print "\tmatched r'%s' => %s" % (cre.pattern, `self.mo.groups()`) + return self.mo is not None + + + def _new_tag(self): + + tag = '%s%s' % (self.tagpre, self.tagnum) + self.tagnum = self.tagnum + 1 + self.tagged_commands[tag] = None + return tag + + + def _simple_command(self, name, dat1=None, dat2=None): + + return self._command_complete(name, self._command(name, dat1, dat2)) + + + def _untagged_response(self, typ, name): + + if not self.untagged_responses.has_key(name): + return typ, [None] + data = self.untagged_responses[name] + del self.untagged_responses[name] + return typ, data + + + +Mon2num = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, + 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12} + +def Internaldate2tuple(resp): + + """ + Convert IMAP4 INTERNALDATE to UT. + + Returns Python time module tuple. + """ + + mo = InternalDate.match(resp) + if not mo: + return None + + mon = Mon2num[mo.group('mon')] + zonen = mo.group('zonen') + + for name in ('day', 'year', 'hour', 'min', 'sec', 'zoneh', 'zonem'): + exec "%s = string.atoi(mo.group('%s'))" % (name, name) + + # INTERNALDATE timezone must be subtracted to get UT + + zone = (zoneh*60 + zonem)*60 + if zonen == '-': + zone = -zone + + tt = (year, mon, day, hour, min, sec, -1, -1, -1) + + utc = time.mktime(tt) + + # Following is necessary because the time module has no 'mkgmtime'. + # 'mktime' assumes arg in local timezone, so adds timezone/altzone. + + lt = time.localtime(utc) + if time.daylight and lt[-1]: + zone = zone + time.altzone + else: + zone = zone + time.timezone + + return time.localtime(utc - zone) + + + +def Int2AP(num): + + """Convert integer to A-P string representation. """ + + val = ''; AP = 'ABCDEFGHIJKLMNOP' + while num: + num, mod = divmod(num, 16) + val = AP[mod] + val + return val + + + +def ParseFlags(resp): + + """Convert IMAP4 flags response to python tuple. """ + + mo = Flags.match(resp) + if not mo: + return () + + return tuple(string.split(mo.group('flags'))) + + +def Time2Internaldate(date_time): + + """Convert 'date_time' to IMAP4 INTERNALDATE representation. + + Return string in form: '"DD-Mmm-YYYY HH:MM:SS +HHMM"' + """ + + dttype = type(date_time) + if dttype is type(1): + tt = time.localtime(date_time) + elif dttype is type(()): + tt = date_time + elif dttype is type(""): + return date_time # Assume in correct format + else: raise ValueError + + dt = time.strftime("%d-%b-%Y %H:%M:%S", tt) + if dt[0] == '0': + dt = ' ' + dt[1:] + if time.daylight and tt[-1]: + zone = -time.altzone + else: + zone = -time.timezone + return '"' + dt + " %+02d%02d" % divmod(zone/60, 60) + '"' + + + +if __debug__ and __name__ == '__main__': + + import getpass + USER = getpass.getuser() + PASSWD = getpass.getpass() + + test_seq1 = ( + ('login', (USER, PASSWD)), + ('create', ('/tmp/xxx',)), + ('rename', ('/tmp/xxx', '/tmp/yyy')), + ('CREATE', ('/tmp/yyz',)), + ('append', ('/tmp/yyz', None, None, 'From: anon@x.y.z\n\ndata...')), + ('select', ('/tmp/yyz',)), + ('recent', ()), + ('uid', ('SEARCH', 'ALL')), + ('fetch', ('1', '(INTERNALDATE RFC822)')), + ('store', ('1', 'FLAGS', '(\Deleted)')), + ('expunge', ()), + ('close', ()), + ) + + test_seq2 = ( + ('select', ()), + ('response',('UIDVALIDITY',)), + ('uid', ('SEARCH', 'ALL')), + ('recent', ()), + ('response', ('EXISTS',)), + ('logout', ()), + ) + + def run(cmd, args): + typ, dat = apply(eval('M.%s' % cmd), args) + print ' %s %s\n => %s %s' % (cmd, args, typ, dat) + return dat + + Debug = 4 + M = IMAP4() + + for cmd,args in test_seq1: + run(cmd, args) + + for ml in M.list('/tmp/', 'yy%')[1]: + path = string.split(ml)[-1] + print '%s %s' % M.delete(path) + + for cmd,args in test_seq2: + dat = run(cmd, args) + + if (cmd,args) == ('uid', ('SEARCH', 'ALL')): + uid = string.split(dat[0])[-1] + run('uid', ('FETCH', '%s (FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822)' % uid)) -- cgit v0.12