diff options
-rw-r--r-- | Lib/imaplib.py | 153 |
1 files changed, 115 insertions, 38 deletions
diff --git a/Lib/imaplib.py b/Lib/imaplib.py index 5cf0df7..1c9be25 100644 --- a/Lib/imaplib.py +++ b/Lib/imaplib.py @@ -14,8 +14,9 @@ Public functions: Internaldate2tuple # # Authentication code contributed by Donn Cave <donn@u.washington.edu> June 1998. # String method conversion by ESR, February 2001. +# GET/SETACL contributed by Anthony Baxter <anthony@interlink.com.au> April 2001. -__version__ = "2.40" +__version__ = "2.47" import binascii, re, socket, time, random, sys @@ -44,21 +45,24 @@ Commands = { 'EXAMINE': ('AUTH', 'SELECTED'), 'EXPUNGE': ('SELECTED',), 'FETCH': ('SELECTED',), + 'GETACL': ('AUTH', 'SELECTED'), 'LIST': ('AUTH', 'SELECTED'), 'LOGIN': ('NONAUTH',), 'LOGOUT': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'), 'LSUB': ('AUTH', 'SELECTED'), + 'NAMESPACE': ('AUTH', 'SELECTED'), 'NOOP': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'), 'PARTIAL': ('SELECTED',), 'RENAME': ('AUTH', 'SELECTED'), 'SEARCH': ('SELECTED',), 'SELECT': ('AUTH', 'SELECTED'), + 'SETACL': ('AUTH', 'SELECTED'), + 'SORT': ('SELECTED',), 'STATUS': ('AUTH', 'SELECTED'), 'STORE': ('SELECTED',), 'SUBSCRIBE': ('AUTH', 'SELECTED'), 'UID': ('SELECTED',), 'UNSUBSCRIBE': ('AUTH', 'SELECTED'), - 'NAMESPACE': ('AUTH', 'SELECTED'), } # Patterns to match server responses @@ -155,6 +159,7 @@ class IMAP4: if __debug__: if self.debug >= 1: + _mesg('imaplib version %s' % __version__) _mesg('new IMAP4 connection, tag=%s' % self.tagpre) self.welcome = self._get_response() @@ -187,21 +192,57 @@ class IMAP4: def __getattr__(self, attr): # Allow UPPERCASE variants of IMAP4 command methods. if Commands.has_key(attr): - return eval("self.%s" % attr.lower()) + return getattr(self, attr.lower()) raise AttributeError("Unknown IMAP4 command: '%s'" % attr) - # Public methods + # Overridable methods def open(self, host, port): - """Setup 'self.sock' and 'self.file'.""" + """Setup connection to remote server on "host:port". + This connection will be used by the routines: + read, readline, send, shutdown. + """ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.host, self.port)) self.file = self.sock.makefile('r') + def read(self, size): + """Read 'size' bytes from remote.""" + return self.file.read(size) + + + def readline(self): + """Read line from remote.""" + return self.file.readline() + + + def send(self, data): + """Send data to remote.""" + self.sock.send(data) + + + def shutdown(self): + """Close I/O established in "open".""" + self.file.close() + self.sock.close() + + + def socket(self): + """Return socket instance used to connect to IMAP4 server. + + socket = <instance>.socket() + """ + return self.sock + + + + # Utility methods + + def recent(self): """Return most recent 'RECENT' responses if any exist, else prompt server for an update using the 'NOOP' command. @@ -229,14 +270,6 @@ class IMAP4: return self._untagged_response(code, [None], code.upper()) - def socket(self): - """Return socket instance used to connect to IMAP4 server. - - socket = <instance>.socket() - """ - return self.sock - - # IMAP4 commands @@ -368,6 +401,15 @@ class IMAP4: return self._untagged_response(typ, dat, name) + def getacl(self, mailbox): + """Get the ACLs for a mailbox. + + (typ, [data]) = <instance>.getacl(mailbox) + """ + typ, dat = self._simple_command('GETACL', mailbox) + return self._untagged_response(typ, dat, 'ACL') + + def list(self, directory='""', pattern='*'): """List mailbox names in directory matching pattern. @@ -406,8 +448,7 @@ class IMAP4: self.state = 'LOGOUT' try: typ, dat = self._simple_command('LOGOUT') except: typ, dat = 'NO', ['%s: %s' % sys.exc_info()[:2]] - self.file.close() - self.sock.close() + self.shutdown() if self.untagged_responses.has_key('BYE'): return 'BYE', self.untagged_responses['BYE'] return typ, dat @@ -425,6 +466,16 @@ class IMAP4: return self._untagged_response(typ, dat, name) + def namespace(self): + """ Returns IMAP namespaces ala rfc2342 + + (typ, [data, ...]) = <instance>.namespace() + """ + name = 'NAMESPACE' + typ, dat = self._simple_command(name) + return self._untagged_response(typ, dat, name) + + def noop(self): """Send NOOP command. @@ -465,8 +516,9 @@ class IMAP4: """ name = 'SEARCH' if charset: - charset = 'CHARSET ' + charset - typ, dat = apply(self._simple_command, (name, charset) + criteria) + typ, dat = apply(self._simple_command, (name, 'CHARSET', charset) + criteria) + else: + typ, dat = apply(self._simple_command, (name,) + criteria) return self._untagged_response(typ, dat, name) @@ -500,14 +552,36 @@ class IMAP4: return typ, self.untagged_responses.get('EXISTS', [None]) + def setacl(self, mailbox, who, what): + """Set a mailbox acl. + + (typ, [data]) = <instance>.create(mailbox, who, what) + """ + return self._simple_command('SETACL', mailbox, who, what) + + + def sort(self, sort_criteria, charset, *search_criteria): + """IMAP4rev1 extension SORT command. + + (typ, [data]) = <instance>.sort(sort_criteria, charset, search_criteria, ...) + """ + name = 'SORT' + #if not name in self.capabilities: # Let the server decide! + # raise self.error('unimplemented extension command: %s' % name) + if (sort_criteria[0],sort_criteria[-1]) != ('(',')'): + sort_criteria = '(%s)' % sort_criteria + typ, dat = apply(self._simple_command, (name, sort_criteria, charset) + search_criteria) + return self._untagged_response(typ, dat, name) + + def status(self, mailbox, names): """Request named status conditions for mailbox. (typ, [data]) = <instance>.status(mailbox, names) """ name = 'STATUS' - if self.PROTOCOL_VERSION == 'IMAP4': - raise self.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name) + #if self.PROTOCOL_VERSION == 'IMAP4': # Let the server decide! + # raise self.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name) typ, dat = self._simple_command(name, mailbox, names) return self._untagged_response(typ, dat, name) @@ -547,8 +621,8 @@ class IMAP4: % (command, self.state)) name = 'UID' typ, dat = apply(self._simple_command, (name, command) + args) - if command == 'SEARCH': - name = 'SEARCH' + if command in ('SEARCH', 'SORT'): + name = command else: name = 'FETCH' return self._untagged_response(typ, dat, name) @@ -566,18 +640,19 @@ class IMAP4: """Allow simple extension commands notified by server in CAPABILITY response. + Assumes command is legal in current state. + (typ, [data]) = <instance>.xatom(name, arg, ...) + + Returns response appropriate to extension command `name'. """ - if name[0] != 'X' or not name in self.capabilities: - raise self.error('unknown extension command: %s' % name) + name = name.upper() + #if not name in self.capabilities: # Let the server decide! + # raise self.error('unknown extension command: %s' % name) + if not Commands.has_key(name): + Commands[name] = (self.state,) return apply(self._simple_command, (name,) + args) - def namespace(self): - """ Returns IMAP namespaces ala rfc2342 - """ - name = 'NAMESPACE' - typ, dat = self._simple_command(name) - return self._untagged_response(typ, dat, name) # Private methods @@ -640,8 +715,8 @@ class IMAP4: _log('> %s' % data) try: - self.sock.send('%s%s' % (data, CRLF)) - except socket.error, val: + self.send('%s%s' % (data, CRLF)) + except (socket.error, OSError), val: raise self.abort('socket error: %s' % val) if literal is None: @@ -664,9 +739,9 @@ class IMAP4: _mesg('write literal size %s' % len(literal)) try: - self.sock.send(literal) - self.sock.send(CRLF) - except socket.error, val: + self.send(literal) + self.send(CRLF) + except (socket.error, OSError), val: raise self.abort('socket error: %s' % val) if not literator: @@ -741,7 +816,7 @@ class IMAP4: if __debug__: if self.debug >= 4: _mesg('read literal size %s' % size) - data = self.file.read(size) + data = self.read(size) # Store response with literal as tuple @@ -789,7 +864,7 @@ class IMAP4: def _get_line(self): - line = self.file.readline() + line = self.readline() if not line: raise self.abort('socket error: EOF') @@ -1059,7 +1134,7 @@ if __name__ == '__main__': host = args[0] USER = getpass.getuser() - PASSWD = getpass.getpass("IMAP password for %s on %s:" % (USER, host or "localhost")) + PASSWD = getpass.getpass("IMAP password for %s on %s: " % (USER, host or "localhost")) test_mesg = 'From: %s@localhost\nSubject: IMAP4 test\n\ndata...\n' % USER test_seq1 = ( @@ -1073,6 +1148,7 @@ if __name__ == '__main__': ('search', (None, 'SUBJECT', 'test')), ('partial', ('1', 'RFC822', 1, 1024)), ('store', ('1', 'FLAGS', '(\Deleted)')), + ('namespace', ()), ('expunge', ()), ('recent', ()), ('close', ()), @@ -1090,13 +1166,14 @@ if __name__ == '__main__': def run(cmd, args): _mesg('%s %s' % (cmd, args)) - typ, dat = apply(eval('M.%s' % cmd), args) + typ, dat = apply(getattr(M, cmd), args) _mesg('%s => %s %s' % (cmd, typ, dat)) return dat try: M = IMAP4(host) _mesg('PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION) + _mesg('CAPABILITIES = %s' % `M.capabilities`) for cmd,args in test_seq1: run(cmd, args) |