summaryrefslogtreecommitdiffstats
path: root/Lib/imaplib.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/imaplib.py')
-rw-r--r--Lib/imaplib.py1637
1 files changed, 818 insertions, 819 deletions
diff --git a/Lib/imaplib.py b/Lib/imaplib.py
index 4e34654..954ecf6 100644
--- a/Lib/imaplib.py
+++ b/Lib/imaplib.py
@@ -1,71 +1,70 @@
-
"""IMAP4 client.
Based on RFC 2060.
-Public class: IMAP4
-Public variable: Debug
-Public functions: Internaldate2tuple
- Int2AP
- ParseFlags
- Time2Internaldate
+Public class: IMAP4
+Public variable: Debug
+Public functions: Internaldate2tuple
+ Int2AP
+ ParseFlags
+ Time2Internaldate
"""
# Author: Piers Lauder <piers@cs.su.oz.au> December 1997.
-#
+#
# Authentication code contributed by Donn Cave <donn@u.washington.edu> June 1998.
__version__ = "2.39"
import binascii, re, socket, string, time, random, sys
-# Globals
+# Globals
CRLF = '\r\n'
Debug = 0
IMAP4_PORT = 143
-AllowedVersions = ('IMAP4REV1', 'IMAP4') # Most recent first
+AllowedVersions = ('IMAP4REV1', 'IMAP4') # Most recent first
-# Commands
+# Commands
Commands = {
- # name valid states
- 'APPEND': ('AUTH', 'SELECTED'),
- 'AUTHENTICATE': ('NONAUTH',),
- 'CAPABILITY': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
- 'CHECK': ('SELECTED',),
- 'CLOSE': ('SELECTED',),
- 'COPY': ('SELECTED',),
- 'CREATE': ('AUTH', 'SELECTED'),
- 'DELETE': ('AUTH', 'SELECTED'),
- 'EXAMINE': ('AUTH', 'SELECTED'),
- 'EXPUNGE': ('SELECTED',),
- 'FETCH': ('SELECTED',),
- 'LIST': ('AUTH', 'SELECTED'),
- 'LOGIN': ('NONAUTH',),
- 'LOGOUT': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
- 'LSUB': ('AUTH', 'SELECTED'),
- 'NOOP': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
- 'PARTIAL': ('SELECTED',),
- 'RENAME': ('AUTH', 'SELECTED'),
- 'SEARCH': ('SELECTED',),
- 'SELECT': ('AUTH', 'SELECTED'),
- 'STATUS': ('AUTH', 'SELECTED'),
- 'STORE': ('SELECTED',),
- 'SUBSCRIBE': ('AUTH', 'SELECTED'),
- 'UID': ('SELECTED',),
- 'UNSUBSCRIBE': ('AUTH', 'SELECTED'),
- }
-
-# Patterns to match server responses
+ # name valid states
+ 'APPEND': ('AUTH', 'SELECTED'),
+ 'AUTHENTICATE': ('NONAUTH',),
+ 'CAPABILITY': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
+ 'CHECK': ('SELECTED',),
+ 'CLOSE': ('SELECTED',),
+ 'COPY': ('SELECTED',),
+ 'CREATE': ('AUTH', 'SELECTED'),
+ 'DELETE': ('AUTH', 'SELECTED'),
+ 'EXAMINE': ('AUTH', 'SELECTED'),
+ 'EXPUNGE': ('SELECTED',),
+ 'FETCH': ('SELECTED',),
+ 'LIST': ('AUTH', 'SELECTED'),
+ 'LOGIN': ('NONAUTH',),
+ 'LOGOUT': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
+ 'LSUB': ('AUTH', 'SELECTED'),
+ 'NOOP': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
+ 'PARTIAL': ('SELECTED',),
+ 'RENAME': ('AUTH', 'SELECTED'),
+ 'SEARCH': ('SELECTED',),
+ 'SELECT': ('AUTH', 'SELECTED'),
+ 'STATUS': ('AUTH', 'SELECTED'),
+ 'STORE': ('SELECTED',),
+ 'SUBSCRIBE': ('AUTH', 'SELECTED'),
+ 'UID': ('SELECTED',),
+ 'UNSUBSCRIBE': ('AUTH', 'SELECTED'),
+ }
+
+# Patterns to match server responses
Continuation = re.compile(r'\+( (?P<data>.*))?')
Flags = re.compile(r'.*FLAGS \((?P<flags>[^\)]*)\)')
InternalDate = re.compile(r'.*INTERNALDATE "'
- r'(?P<day>[ 123][0-9])-(?P<mon>[A-Z][a-z][a-z])-(?P<year>[0-9][0-9][0-9][0-9])'
- r' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])'
- r' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])'
- r'"')
+ r'(?P<day>[ 123][0-9])-(?P<mon>[A-Z][a-z][a-z])-(?P<year>[0-9][0-9][0-9][0-9])'
+ r' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])'
+ r' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])'
+ r'"')
Literal = re.compile(r'.*{(?P<size>\d+)}$')
Response_code = re.compile(r'\[(?P<type>[A-Z-]+)( (?P<data>[^\]]*))?\]')
Untagged_response = re.compile(r'\* (?P<type>[A-Z-]+)( (?P<data>.*))?')
@@ -75,1045 +74,1045 @@ Untagged_status = re.compile(r'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*)
class IMAP4:
- """IMAP4 client class.
-
- Instantiate with: IMAP4([host[, port]])
-
- host - host's name (default: localhost);
- port - port number (default: standard IMAP4 port).
-
- All IMAP4rev1 commands are supported by methods of the same
- name (in lower-case).
-
- All arguments to commands are converted to strings, except for
- AUTHENTICATE, and the last argument to APPEND which is passed as
- an IMAP4 literal. If necessary (the string contains any
- non-printing characters or white-space and isn't enclosed with
- either parentheses or double quotes) each string is quoted.
- However, the 'password' argument to the LOGIN command is always
- quoted. If you want to avoid having an argument string quoted
- (eg: the 'flags' argument to STORE) then enclose the string in
- parentheses (eg: "(\Deleted)").
+ """IMAP4 client class.
+
+ Instantiate with: IMAP4([host[, port]])
+
+ host - host's name (default: localhost);
+ port - port number (default: standard IMAP4 port).
+
+ All IMAP4rev1 commands are supported by methods of the same
+ name (in lower-case).
+
+ All arguments to commands are converted to strings, except for
+ AUTHENTICATE, and the last argument to APPEND which is passed as
+ an IMAP4 literal. If necessary (the string contains any
+ non-printing characters or white-space and isn't enclosed with
+ either parentheses or double quotes) each string is quoted.
+ However, the 'password' argument to the LOGIN command is always
+ quoted. If you want to avoid having an argument string quoted
+ (eg: the 'flags' argument to STORE) then enclose the string in
+ parentheses (eg: "(\Deleted)").
- Each command returns a tuple: (type, [data, ...]) where 'type'
- is usually 'OK' or 'NO', and 'data' is either the text from the
- tagged response, or untagged results from command.
+ Each command returns a tuple: (type, [data, ...]) where 'type'
+ is usually 'OK' or 'NO', and 'data' is either the text from the
+ tagged response, or untagged results from command.
- Errors raise the exception class <instance>.error("<reason>").
- IMAP4 server errors raise <instance>.abort("<reason>"),
- which is a sub-class of 'error'. Mailbox status changes
- from READ-WRITE to READ-ONLY raise the exception class
- <instance>.readonly("<reason>"), which is a sub-class of 'abort'.
+ Errors raise the exception class <instance>.error("<reason>").
+ IMAP4 server errors raise <instance>.abort("<reason>"),
+ which is a sub-class of 'error'. Mailbox status changes
+ from READ-WRITE to READ-ONLY raise the exception class
+ <instance>.readonly("<reason>"), which is a sub-class of 'abort'.
- "error" exceptions imply a program error.
- "abort" exceptions imply the connection should be reset, and
- the command re-tried.
- "readonly" exceptions imply the command should be re-tried.
+ "error" exceptions imply a program error.
+ "abort" exceptions imply the connection should be reset, and
+ the command re-tried.
+ "readonly" exceptions imply the command should be re-tried.
- Note: to use this module, you must read the RFCs pertaining
- to the IMAP4 protocol, as the semantics of the arguments to
- each IMAP4 command are left to the invoker, not to mention
- the results.
- """
+ Note: to use this module, you must read the RFCs pertaining
+ to the IMAP4 protocol, as the semantics of the arguments to
+ each IMAP4 command are left to the invoker, not to mention
+ the results.
+ """
- class error(Exception): pass # Logical errors - debug required
- class abort(error): pass # Service errors - close and retry
- class readonly(abort): pass # Mailbox status changed to READ-ONLY
+ class error(Exception): pass # Logical errors - debug required
+ class abort(error): pass # Service errors - close and retry
+ class readonly(abort): pass # Mailbox status changed to READ-ONLY
- mustquote = re.compile(r"[^\w!#$%&'*+,.:;<=>?^`|~-]")
+ mustquote = re.compile(r"[^\w!#$%&'*+,.:;<=>?^`|~-]")
- def __init__(self, host = '', port = IMAP4_PORT):
- self.host = host
- self.port = port
- self.debug = Debug
- self.state = 'LOGOUT'
- self.literal = None # A literal argument to a command
- self.tagged_commands = {} # Tagged commands awaiting response
- self.untagged_responses = {} # {typ: [data, ...], ...}
- self.continuation_response = '' # Last continuation response
- self.is_readonly = None # READ-ONLY desired state
- self.tagnum = 0
+ def __init__(self, host = '', port = IMAP4_PORT):
+ self.host = host
+ self.port = port
+ self.debug = Debug
+ self.state = 'LOGOUT'
+ self.literal = None # A literal argument to a command
+ self.tagged_commands = {} # Tagged commands awaiting response
+ self.untagged_responses = {} # {typ: [data, ...], ...}
+ self.continuation_response = '' # Last continuation response
+ self.is_readonly = None # READ-ONLY desired state
+ self.tagnum = 0
- # Open socket to server.
+ # Open socket to server.
- self.open(host, port)
+ self.open(host, port)
- # Create unique tag for this session,
- # and compile tagged response matcher.
+ # Create unique tag for this session,
+ # and compile tagged response matcher.
- self.tagpre = Int2AP(random.randint(0, 31999))
- self.tagre = re.compile(r'(?P<tag>'
- + self.tagpre
- + r'\d+) (?P<type>[A-Z]+) (?P<data>.*)')
+ self.tagpre = Int2AP(random.randint(0, 31999))
+ self.tagre = re.compile(r'(?P<tag>'
+ + self.tagpre
+ + r'\d+) (?P<type>[A-Z]+) (?P<data>.*)')
- # Get server welcome message,
- # request and store CAPABILITY response.
+ # Get server welcome message,
+ # request and store CAPABILITY response.
- if __debug__:
- if self.debug >= 1:
- _mesg('new IMAP4 connection, tag=%s' % self.tagpre)
+ if __debug__:
+ if self.debug >= 1:
+ _mesg('new IMAP4 connection, tag=%s' % self.tagpre)
- self.welcome = self._get_response()
- if self.untagged_responses.has_key('PREAUTH'):
- self.state = 'AUTH'
- elif self.untagged_responses.has_key('OK'):
- self.state = 'NONAUTH'
- else:
- raise self.error(self.welcome)
+ self.welcome = self._get_response()
+ if self.untagged_responses.has_key('PREAUTH'):
+ self.state = 'AUTH'
+ elif self.untagged_responses.has_key('OK'):
+ self.state = 'NONAUTH'
+ else:
+ raise self.error(self.welcome)
- cap = 'CAPABILITY'
- self._simple_command(cap)
- if not self.untagged_responses.has_key(cap):
- raise self.error('no CAPABILITY response from server')
- self.capabilities = tuple(string.split(string.upper(self.untagged_responses[cap][-1])))
+ cap = 'CAPABILITY'
+ self._simple_command(cap)
+ if not self.untagged_responses.has_key(cap):
+ raise self.error('no CAPABILITY response from server')
+ self.capabilities = tuple(string.split(string.upper(self.untagged_responses[cap][-1])))
- if __debug__:
- if self.debug >= 3:
- _mesg('CAPABILITIES: %s' % `self.capabilities`)
+ if __debug__:
+ if self.debug >= 3:
+ _mesg('CAPABILITIES: %s' % `self.capabilities`)
- for version in AllowedVersions:
- if not version in self.capabilities:
- continue
- self.PROTOCOL_VERSION = version
- return
+ for version in AllowedVersions:
+ if not version in self.capabilities:
+ continue
+ self.PROTOCOL_VERSION = version
+ return
- raise self.error('server not IMAP4 compliant')
+ raise self.error('server not IMAP4 compliant')
- def __getattr__(self, attr):
- # Allow UPPERCASE variants of IMAP4 command methods.
- if Commands.has_key(attr):
- return eval("self.%s" % string.lower(attr))
- raise AttributeError("Unknown IMAP4 command: '%s'" % attr)
-
+ def __getattr__(self, attr):
+ # Allow UPPERCASE variants of IMAP4 command methods.
+ if Commands.has_key(attr):
+ return eval("self.%s" % string.lower(attr))
+ raise AttributeError("Unknown IMAP4 command: '%s'" % attr)
+
- # Public methods
+ # Public methods
- def open(self, host, port):
- """Setup 'self.sock' and 'self.file'."""
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.connect((self.host, self.port))
- self.file = self.sock.makefile('r')
+ def open(self, host, port):
+ """Setup 'self.sock' and 'self.file'."""
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.connect((self.host, self.port))
+ self.file = self.sock.makefile('r')
- def recent(self):
- """Return most recent 'RECENT' responses if any exist,
- else prompt server for an update using the 'NOOP' command.
+ def recent(self):
+ """Return most recent 'RECENT' responses if any exist,
+ else prompt server for an update using the 'NOOP' command.
- (typ, [data]) = <instance>.recent()
+ (typ, [data]) = <instance>.recent()
- 'data' is None if no new messages,
- else list of RECENT responses, most recent last.
- """
- name = 'RECENT'
- typ, dat = self._untagged_response('OK', [None], name)
- if dat[-1]:
- return typ, dat
- typ, dat = self.noop() # Prod server for response
- return self._untagged_response(typ, dat, name)
+ 'data' is None if no new messages,
+ else list of RECENT responses, most recent last.
+ """
+ name = 'RECENT'
+ typ, dat = self._untagged_response('OK', [None], name)
+ if dat[-1]:
+ return typ, dat
+ typ, dat = self.noop() # Prod server for response
+ return self._untagged_response(typ, dat, name)
- def response(self, code):
- """Return data for response 'code' if received, or None.
+ def response(self, code):
+ """Return data for response 'code' if received, or None.
- Old value for response 'code' is cleared.
+ Old value for response 'code' is cleared.
- (code, [data]) = <instance>.response(code)
- """
- return self._untagged_response(code, [None], string.upper(code))
+ (code, [data]) = <instance>.response(code)
+ """
+ return self._untagged_response(code, [None], string.upper(code))
- def socket(self):
- """Return socket instance used to connect to IMAP4 server.
+ def socket(self):
+ """Return socket instance used to connect to IMAP4 server.
- socket = <instance>.socket()
- """
- return self.sock
+ socket = <instance>.socket()
+ """
+ return self.sock
- # IMAP4 commands
+ # IMAP4 commands
- def append(self, mailbox, flags, date_time, message):
- """Append message to named mailbox.
+ def append(self, mailbox, flags, date_time, message):
+ """Append message to named mailbox.
- (typ, [data]) = <instance>.append(mailbox, flags, date_time, message)
+ (typ, [data]) = <instance>.append(mailbox, flags, date_time, message)
- All args except `message' can be None.
- """
- name = 'APPEND'
- if not mailbox:
- mailbox = 'INBOX'
- if flags:
- if (flags[0],flags[-1]) != ('(',')'):
- flags = '(%s)' % flags
- else:
- flags = None
- if date_time:
- date_time = Time2Internaldate(date_time)
- else:
- date_time = None
- self.literal = message
- return self._simple_command(name, mailbox, flags, date_time)
+ All args except `message' can be None.
+ """
+ name = 'APPEND'
+ if not mailbox:
+ mailbox = 'INBOX'
+ if flags:
+ if (flags[0],flags[-1]) != ('(',')'):
+ flags = '(%s)' % flags
+ else:
+ flags = None
+ if date_time:
+ date_time = Time2Internaldate(date_time)
+ else:
+ date_time = None
+ self.literal = message
+ return self._simple_command(name, mailbox, flags, date_time)
- def authenticate(self, mechanism, authobject):
- """Authenticate command - requires response processing.
+ def authenticate(self, mechanism, authobject):
+ """Authenticate command - requires response processing.
- 'mechanism' specifies which authentication mechanism is to
- be used - it must appear in <instance>.capabilities in the
- form AUTH=<mechanism>.
+ 'mechanism' specifies which authentication mechanism is to
+ be used - it must appear in <instance>.capabilities in the
+ form AUTH=<mechanism>.
- 'authobject' must be a callable object:
+ 'authobject' must be a callable object:
- data = authobject(response)
+ data = authobject(response)
- It will be called to process server continuation responses.
- It should return data that will be encoded and sent to server.
- It should return None if the client abort response '*' should
- be sent instead.
- """
- mech = string.upper(mechanism)
- cap = 'AUTH=%s' % mech
- if not cap in self.capabilities:
- raise self.error("Server doesn't allow %s authentication." % mech)
- self.literal = _Authenticator(authobject).process
- typ, dat = self._simple_command('AUTHENTICATE', mech)
- if typ != 'OK':
- raise self.error(dat[-1])
- self.state = 'AUTH'
- return typ, dat
+ It will be called to process server continuation responses.
+ It should return data that will be encoded and sent to server.
+ It should return None if the client abort response '*' should
+ be sent instead.
+ """
+ mech = string.upper(mechanism)
+ cap = 'AUTH=%s' % mech
+ if not cap in self.capabilities:
+ raise self.error("Server doesn't allow %s authentication." % mech)
+ self.literal = _Authenticator(authobject).process
+ typ, dat = self._simple_command('AUTHENTICATE', mech)
+ if typ != 'OK':
+ raise self.error(dat[-1])
+ self.state = 'AUTH'
+ return typ, dat
- def check(self):
- """Checkpoint mailbox on server.
+ def check(self):
+ """Checkpoint mailbox on server.
- (typ, [data]) = <instance>.check()
- """
- return self._simple_command('CHECK')
+ (typ, [data]) = <instance>.check()
+ """
+ return self._simple_command('CHECK')
- def close(self):
- """Close currently selected mailbox.
+ def close(self):
+ """Close currently selected mailbox.
- Deleted messages are removed from writable mailbox.
- This is the recommended command before 'LOGOUT'.
+ Deleted messages are removed from writable mailbox.
+ This is the recommended command before 'LOGOUT'.
- (typ, [data]) = <instance>.close()
- """
- try:
- typ, dat = self._simple_command('CLOSE')
- finally:
- self.state = 'AUTH'
- return typ, dat
+ (typ, [data]) = <instance>.close()
+ """
+ try:
+ typ, dat = self._simple_command('CLOSE')
+ finally:
+ self.state = 'AUTH'
+ return typ, dat
- def copy(self, message_set, new_mailbox):
- """Copy 'message_set' messages onto end of 'new_mailbox'.
+ def copy(self, message_set, new_mailbox):
+ """Copy 'message_set' messages onto end of 'new_mailbox'.
- (typ, [data]) = <instance>.copy(message_set, new_mailbox)
- """
- return self._simple_command('COPY', message_set, new_mailbox)
+ (typ, [data]) = <instance>.copy(message_set, new_mailbox)
+ """
+ return self._simple_command('COPY', message_set, new_mailbox)
- def create(self, mailbox):
- """Create new mailbox.
+ def create(self, mailbox):
+ """Create new mailbox.
- (typ, [data]) = <instance>.create(mailbox)
- """
- return self._simple_command('CREATE', mailbox)
+ (typ, [data]) = <instance>.create(mailbox)
+ """
+ return self._simple_command('CREATE', mailbox)
- def delete(self, mailbox):
- """Delete old mailbox.
+ def delete(self, mailbox):
+ """Delete old mailbox.
- (typ, [data]) = <instance>.delete(mailbox)
- """
- return self._simple_command('DELETE', mailbox)
+ (typ, [data]) = <instance>.delete(mailbox)
+ """
+ return self._simple_command('DELETE', mailbox)
- def expunge(self):
- """Permanently remove deleted items from selected mailbox.
+ def expunge(self):
+ """Permanently remove deleted items from selected mailbox.
- Generates 'EXPUNGE' response for each deleted message.
+ Generates 'EXPUNGE' response for each deleted message.
- (typ, [data]) = <instance>.expunge()
+ (typ, [data]) = <instance>.expunge()
- 'data' is list of 'EXPUNGE'd message numbers in order received.
- """
- name = 'EXPUNGE'
- typ, dat = self._simple_command(name)
- return self._untagged_response(typ, dat, name)
+ 'data' is list of 'EXPUNGE'd message numbers in order received.
+ """
+ name = 'EXPUNGE'
+ typ, dat = self._simple_command(name)
+ return self._untagged_response(typ, dat, name)
- def fetch(self, message_set, message_parts):
- """Fetch (parts of) messages.
+ def fetch(self, message_set, message_parts):
+ """Fetch (parts of) messages.
- (typ, [data, ...]) = <instance>.fetch(message_set, message_parts)
+ (typ, [data, ...]) = <instance>.fetch(message_set, message_parts)
- 'message_parts' should be a string of selected parts
- enclosed in parentheses, eg: "(UID BODY[TEXT])".
+ 'message_parts' should be a string of selected parts
+ enclosed in parentheses, eg: "(UID BODY[TEXT])".
- 'data' are tuples of message part envelope and data.
- """
- name = 'FETCH'
- typ, dat = self._simple_command(name, message_set, message_parts)
- return self._untagged_response(typ, dat, name)
+ 'data' are tuples of message part envelope and data.
+ """
+ name = 'FETCH'
+ typ, dat = self._simple_command(name, message_set, message_parts)
+ return self._untagged_response(typ, dat, name)
- def list(self, directory='""', pattern='*'):
- """List mailbox names in directory matching pattern.
+ def list(self, directory='""', pattern='*'):
+ """List mailbox names in directory matching pattern.
- (typ, [data]) = <instance>.list(directory='""', pattern='*')
+ (typ, [data]) = <instance>.list(directory='""', pattern='*')
- 'data' is list of LIST responses.
- """
- name = 'LIST'
- typ, dat = self._simple_command(name, directory, pattern)
- return self._untagged_response(typ, dat, name)
+ 'data' is list of LIST responses.
+ """
+ name = 'LIST'
+ typ, dat = self._simple_command(name, directory, pattern)
+ return self._untagged_response(typ, dat, name)
- def login(self, user, password):
- """Identify client using plaintext password.
+ def login(self, user, password):
+ """Identify client using plaintext password.
- (typ, [data]) = <instance>.login(user, password)
+ (typ, [data]) = <instance>.login(user, password)
- NB: 'password' will be quoted.
- """
- #if not 'AUTH=LOGIN' in self.capabilities:
- # raise self.error("Server doesn't allow LOGIN authentication." % mech)
- typ, dat = self._simple_command('LOGIN', user, self._quote(password))
- if typ != 'OK':
- raise self.error(dat[-1])
- self.state = 'AUTH'
- return typ, dat
+ NB: 'password' will be quoted.
+ """
+ #if not 'AUTH=LOGIN' in self.capabilities:
+ # raise self.error("Server doesn't allow LOGIN authentication." % mech)
+ typ, dat = self._simple_command('LOGIN', user, self._quote(password))
+ if typ != 'OK':
+ raise self.error(dat[-1])
+ self.state = 'AUTH'
+ return typ, dat
- def logout(self):
- """Shutdown connection to server.
+ def logout(self):
+ """Shutdown connection to server.
- (typ, [data]) = <instance>.logout()
+ (typ, [data]) = <instance>.logout()
- Returns server 'BYE' response.
- """
- self.state = 'LOGOUT'
- try: typ, dat = self._simple_command('LOGOUT')
- except: typ, dat = 'NO', ['%s: %s' % sys.exc_info()[:2]]
- self.file.close()
- self.sock.close()
- if self.untagged_responses.has_key('BYE'):
- return 'BYE', self.untagged_responses['BYE']
- return typ, dat
+ Returns server 'BYE' response.
+ """
+ self.state = 'LOGOUT'
+ try: typ, dat = self._simple_command('LOGOUT')
+ except: typ, dat = 'NO', ['%s: %s' % sys.exc_info()[:2]]
+ self.file.close()
+ self.sock.close()
+ if self.untagged_responses.has_key('BYE'):
+ return 'BYE', self.untagged_responses['BYE']
+ return typ, dat
- def lsub(self, directory='""', pattern='*'):
- """List 'subscribed' mailbox names in directory matching pattern.
+ def lsub(self, directory='""', pattern='*'):
+ """List 'subscribed' mailbox names in directory matching pattern.
- (typ, [data, ...]) = <instance>.lsub(directory='""', pattern='*')
+ (typ, [data, ...]) = <instance>.lsub(directory='""', pattern='*')
- 'data' are tuples of message part envelope and data.
- """
- name = 'LSUB'
- typ, dat = self._simple_command(name, directory, pattern)
- return self._untagged_response(typ, dat, name)
+ 'data' are tuples of message part envelope and data.
+ """
+ name = 'LSUB'
+ typ, dat = self._simple_command(name, directory, pattern)
+ return self._untagged_response(typ, dat, name)
- def noop(self):
- """Send NOOP command.
+ def noop(self):
+ """Send NOOP command.
- (typ, data) = <instance>.noop()
- """
- if __debug__:
- if self.debug >= 3:
- _dump_ur(self.untagged_responses)
- return self._simple_command('NOOP')
+ (typ, data) = <instance>.noop()
+ """
+ if __debug__:
+ if self.debug >= 3:
+ _dump_ur(self.untagged_responses)
+ return self._simple_command('NOOP')
- def partial(self, message_num, message_part, start, length):
- """Fetch truncated part of a message.
+ def partial(self, message_num, message_part, start, length):
+ """Fetch truncated part of a message.
- (typ, [data, ...]) = <instance>.partial(message_num, message_part, start, length)
+ (typ, [data, ...]) = <instance>.partial(message_num, message_part, start, length)
- 'data' is tuple of message part envelope and data.
- """
- name = 'PARTIAL'
- typ, dat = self._simple_command(name, message_num, message_part, start, length)
- return self._untagged_response(typ, dat, 'FETCH')
+ 'data' is tuple of message part envelope and data.
+ """
+ name = 'PARTIAL'
+ typ, dat = self._simple_command(name, message_num, message_part, start, length)
+ return self._untagged_response(typ, dat, 'FETCH')
- def rename(self, oldmailbox, newmailbox):
- """Rename old mailbox name to new.
+ def rename(self, oldmailbox, newmailbox):
+ """Rename old mailbox name to new.
- (typ, data) = <instance>.rename(oldmailbox, newmailbox)
- """
- return self._simple_command('RENAME', oldmailbox, newmailbox)
+ (typ, data) = <instance>.rename(oldmailbox, newmailbox)
+ """
+ return self._simple_command('RENAME', oldmailbox, newmailbox)
- def search(self, charset, *criteria):
- """Search mailbox for matching messages.
+ def search(self, charset, *criteria):
+ """Search mailbox for matching messages.
- (typ, [data]) = <instance>.search(charset, criterium, ...)
+ (typ, [data]) = <instance>.search(charset, criterium, ...)
- 'data' is space separated list of matching message numbers.
- """
- name = 'SEARCH'
- if charset:
- charset = 'CHARSET ' + charset
- typ, dat = apply(self._simple_command, (name, charset) + criteria)
- return self._untagged_response(typ, dat, name)
+ 'data' is space separated list of matching message numbers.
+ """
+ name = 'SEARCH'
+ if charset:
+ charset = 'CHARSET ' + charset
+ typ, dat = apply(self._simple_command, (name, charset) + criteria)
+ return self._untagged_response(typ, dat, name)
- def select(self, mailbox='INBOX', readonly=None):
- """Select a mailbox.
+ def select(self, mailbox='INBOX', readonly=None):
+ """Select a mailbox.
- Flush all untagged responses.
+ Flush all untagged responses.
- (typ, [data]) = <instance>.select(mailbox='INBOX', readonly=None)
+ (typ, [data]) = <instance>.select(mailbox='INBOX', readonly=None)
- 'data' is count of messages in mailbox ('EXISTS' response).
- """
- # Mandated responses are ('FLAGS', 'EXISTS', 'RECENT', 'UIDVALIDITY')
- self.untagged_responses = {} # Flush old responses.
- self.is_readonly = readonly
- if readonly:
- name = 'EXAMINE'
- else:
- name = 'SELECT'
- typ, dat = self._simple_command(name, mailbox)
- if typ != 'OK':
- self.state = 'AUTH' # Might have been 'SELECTED'
- return typ, dat
- self.state = 'SELECTED'
- if self.untagged_responses.has_key('READ-ONLY') \
- and not readonly:
- if __debug__:
- if self.debug >= 1:
- _dump_ur(self.untagged_responses)
- raise self.readonly('%s is not writable' % mailbox)
- return typ, self.untagged_responses.get('EXISTS', [None])
+ 'data' is count of messages in mailbox ('EXISTS' response).
+ """
+ # Mandated responses are ('FLAGS', 'EXISTS', 'RECENT', 'UIDVALIDITY')
+ self.untagged_responses = {} # Flush old responses.
+ self.is_readonly = readonly
+ if readonly:
+ name = 'EXAMINE'
+ else:
+ name = 'SELECT'
+ typ, dat = self._simple_command(name, mailbox)
+ if typ != 'OK':
+ self.state = 'AUTH' # Might have been 'SELECTED'
+ return typ, dat
+ self.state = 'SELECTED'
+ if self.untagged_responses.has_key('READ-ONLY') \
+ and not readonly:
+ if __debug__:
+ if self.debug >= 1:
+ _dump_ur(self.untagged_responses)
+ raise self.readonly('%s is not writable' % mailbox)
+ return typ, self.untagged_responses.get('EXISTS', [None])
- def status(self, mailbox, names):
- """Request named status conditions for mailbox.
+ def status(self, mailbox, names):
+ """Request named status conditions for mailbox.
- (typ, [data]) = <instance>.status(mailbox, names)
- """
- name = 'STATUS'
- if self.PROTOCOL_VERSION == 'IMAP4':
- raise self.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name)
- typ, dat = self._simple_command(name, mailbox, names)
- return self._untagged_response(typ, dat, name)
+ (typ, [data]) = <instance>.status(mailbox, names)
+ """
+ name = 'STATUS'
+ if self.PROTOCOL_VERSION == 'IMAP4':
+ raise self.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name)
+ typ, dat = self._simple_command(name, mailbox, names)
+ return self._untagged_response(typ, dat, name)
- def store(self, message_set, command, flags):
- """Alters flag dispositions for messages in mailbox.
+ def store(self, message_set, command, flags):
+ """Alters flag dispositions for messages in mailbox.
- (typ, [data]) = <instance>.store(message_set, command, flags)
- """
- if (flags[0],flags[-1]) != ('(',')'):
- flags = '(%s)' % flags # Avoid quoting the flags
- typ, dat = self._simple_command('STORE', message_set, command, flags)
- return self._untagged_response(typ, dat, 'FETCH')
+ (typ, [data]) = <instance>.store(message_set, command, flags)
+ """
+ if (flags[0],flags[-1]) != ('(',')'):
+ flags = '(%s)' % flags # Avoid quoting the flags
+ typ, dat = self._simple_command('STORE', message_set, command, flags)
+ return self._untagged_response(typ, dat, 'FETCH')
- def subscribe(self, mailbox):
- """Subscribe to new mailbox.
+ def subscribe(self, mailbox):
+ """Subscribe to new mailbox.
- (typ, [data]) = <instance>.subscribe(mailbox)
- """
- return self._simple_command('SUBSCRIBE', mailbox)
+ (typ, [data]) = <instance>.subscribe(mailbox)
+ """
+ return self._simple_command('SUBSCRIBE', mailbox)
- def uid(self, command, *args):
- """Execute "command arg ..." with messages identified by UID,
- rather than message number.
+ def uid(self, command, *args):
+ """Execute "command arg ..." with messages identified by UID,
+ rather than message number.
- (typ, [data]) = <instance>.uid(command, arg1, arg2, ...)
+ (typ, [data]) = <instance>.uid(command, arg1, arg2, ...)
- Returns response appropriate to 'command'.
- """
- command = string.upper(command)
- if not Commands.has_key(command):
- raise self.error("Unknown IMAP4 UID command: %s" % command)
- if self.state not in Commands[command]:
- raise self.error('command %s illegal in state %s'
- % (command, self.state))
- name = 'UID'
- typ, dat = apply(self._simple_command, (name, command) + args)
- if command == 'SEARCH':
- name = 'SEARCH'
- else:
- name = 'FETCH'
- return self._untagged_response(typ, dat, name)
+ Returns response appropriate to 'command'.
+ """
+ command = string.upper(command)
+ if not Commands.has_key(command):
+ raise self.error("Unknown IMAP4 UID command: %s" % command)
+ if self.state not in Commands[command]:
+ raise self.error('command %s illegal in state %s'
+ % (command, self.state))
+ name = 'UID'
+ typ, dat = apply(self._simple_command, (name, command) + args)
+ if command == 'SEARCH':
+ name = 'SEARCH'
+ else:
+ name = 'FETCH'
+ return self._untagged_response(typ, dat, name)
- def unsubscribe(self, mailbox):
- """Unsubscribe from old mailbox.
+ def unsubscribe(self, mailbox):
+ """Unsubscribe from old mailbox.
- (typ, [data]) = <instance>.unsubscribe(mailbox)
- """
- return self._simple_command('UNSUBSCRIBE', mailbox)
-
-
- def xatom(self, name, *args):
- """Allow simple extension commands
- notified by server in CAPABILITY response.
+ (typ, [data]) = <instance>.unsubscribe(mailbox)
+ """
+ return self._simple_command('UNSUBSCRIBE', mailbox)
+
+
+ def xatom(self, name, *args):
+ """Allow simple extension commands
+ notified by server in CAPABILITY response.
- (typ, [data]) = <instance>.xatom(name, arg, ...)
- """
- if name[0] != 'X' or not name in self.capabilities:
- raise self.error('unknown extension command: %s' % name)
- return apply(self._simple_command, (name,) + args)
+ (typ, [data]) = <instance>.xatom(name, arg, ...)
+ """
+ if name[0] != 'X' or not name in self.capabilities:
+ raise self.error('unknown extension command: %s' % name)
+ return apply(self._simple_command, (name,) + args)
-
- # Private methods
-
-
- def _append_untagged(self, typ, dat):
-
- if dat is None: dat = ''
- ur = self.untagged_responses
- if __debug__:
- if self.debug >= 5:
- _mesg('untagged_responses[%s] %s += ["%s"]' %
- (typ, len(ur.get(typ,'')), dat))
- if ur.has_key(typ):
- ur[typ].append(dat)
- else:
- ur[typ] = [dat]
+
+ # Private methods
+
+
+ def _append_untagged(self, typ, dat):
+
+ if dat is None: dat = ''
+ ur = self.untagged_responses
+ if __debug__:
+ if self.debug >= 5:
+ _mesg('untagged_responses[%s] %s += ["%s"]' %
+ (typ, len(ur.get(typ,'')), dat))
+ if ur.has_key(typ):
+ ur[typ].append(dat)
+ else:
+ ur[typ] = [dat]
- def _check_bye(self):
- bye = self.untagged_responses.get('BYE')
- if bye:
- raise self.abort(bye[-1])
+ def _check_bye(self):
+ bye = self.untagged_responses.get('BYE')
+ if bye:
+ raise self.abort(bye[-1])
- def _command(self, name, *args):
+ def _command(self, name, *args):
- if self.state not in Commands[name]:
- self.literal = None
- raise self.error(
- 'command %s illegal in state %s' % (name, self.state))
+ if self.state not in Commands[name]:
+ self.literal = None
+ raise self.error(
+ 'command %s illegal in state %s' % (name, self.state))
- for typ in ('OK', 'NO', 'BAD'):
- if self.untagged_responses.has_key(typ):
- del self.untagged_responses[typ]
+ for typ in ('OK', 'NO', 'BAD'):
+ if self.untagged_responses.has_key(typ):
+ del self.untagged_responses[typ]
- if self.untagged_responses.has_key('READ-ONLY') \
- and not self.is_readonly:
- raise self.readonly('mailbox status changed to READ-ONLY')
+ if self.untagged_responses.has_key('READ-ONLY') \
+ and not self.is_readonly:
+ raise self.readonly('mailbox status changed to READ-ONLY')
- tag = self._new_tag()
- data = '%s %s' % (tag, name)
- for arg in args:
- if arg is None: continue
- data = '%s %s' % (data, self._checkquote(arg))
+ tag = self._new_tag()
+ data = '%s %s' % (tag, name)
+ for arg in args:
+ if arg is None: continue
+ data = '%s %s' % (data, self._checkquote(arg))
- literal = self.literal
- if literal is not None:
- self.literal = None
- if type(literal) is type(self._command):
- literator = literal
- else:
- literator = None
- data = '%s {%s}' % (data, len(literal))
+ literal = self.literal
+ if literal is not None:
+ self.literal = None
+ if type(literal) is type(self._command):
+ literator = literal
+ else:
+ literator = None
+ data = '%s {%s}' % (data, len(literal))
- if __debug__:
- if self.debug >= 4:
- _mesg('> %s' % data)
- else:
- _log('> %s' % data)
+ if __debug__:
+ if self.debug >= 4:
+ _mesg('> %s' % data)
+ else:
+ _log('> %s' % data)
- try:
- self.sock.send('%s%s' % (data, CRLF))
- except socket.error, val:
- raise self.abort('socket error: %s' % val)
+ try:
+ self.sock.send('%s%s' % (data, CRLF))
+ except socket.error, val:
+ raise self.abort('socket error: %s' % val)
- if literal is None:
- return tag
+ if literal is None:
+ return tag
- while 1:
- # Wait for continuation response
+ while 1:
+ # Wait for continuation response
- while self._get_response():
- if self.tagged_commands[tag]: # BAD/NO?
- return tag
+ while self._get_response():
+ if self.tagged_commands[tag]: # BAD/NO?
+ return tag
- # Send literal
+ # Send literal
- if literator:
- literal = literator(self.continuation_response)
+ if literator:
+ literal = literator(self.continuation_response)
- if __debug__:
- if self.debug >= 4:
- _mesg('write literal size %s' % len(literal))
+ if __debug__:
+ if self.debug >= 4:
+ _mesg('write literal size %s' % len(literal))
- try:
- self.sock.send(literal)
- self.sock.send(CRLF)
- except socket.error, val:
- raise self.abort('socket error: %s' % val)
+ try:
+ self.sock.send(literal)
+ self.sock.send(CRLF)
+ except socket.error, val:
+ raise self.abort('socket error: %s' % val)
- if not literator:
- break
+ if not literator:
+ break
- return tag
+ return tag
- def _command_complete(self, name, tag):
- self._check_bye()
- try:
- typ, data = self._get_tagged_response(tag)
- except self.abort, val:
- raise self.abort('command: %s => %s' % (name, val))
- except self.error, val:
- raise self.error('command: %s => %s' % (name, val))
- self._check_bye()
- if typ == 'BAD':
- raise self.error('%s command error: %s %s' % (name, typ, data))
- return typ, data
+ def _command_complete(self, name, tag):
+ self._check_bye()
+ try:
+ typ, data = self._get_tagged_response(tag)
+ except self.abort, val:
+ raise self.abort('command: %s => %s' % (name, val))
+ except self.error, val:
+ raise self.error('command: %s => %s' % (name, val))
+ self._check_bye()
+ if typ == 'BAD':
+ raise self.error('%s command error: %s %s' % (name, typ, data))
+ return typ, data
- def _get_response(self):
+ def _get_response(self):
- # Read response and store.
- #
- # Returns None for continuation responses,
- # otherwise first response line received.
+ # Read response and store.
+ #
+ # Returns None for continuation responses,
+ # otherwise first response line received.
- resp = self._get_line()
+ resp = self._get_line()
- # Command completion response?
+ # Command completion response?
- if self._match(self.tagre, resp):
- tag = self.mo.group('tag')
- if not self.tagged_commands.has_key(tag):
- raise self.abort('unexpected tagged response: %s' % resp)
+ if self._match(self.tagre, resp):
+ tag = self.mo.group('tag')
+ if not self.tagged_commands.has_key(tag):
+ raise self.abort('unexpected tagged response: %s' % resp)
- typ = self.mo.group('type')
- dat = self.mo.group('data')
- self.tagged_commands[tag] = (typ, [dat])
- else:
- dat2 = None
+ typ = self.mo.group('type')
+ dat = self.mo.group('data')
+ self.tagged_commands[tag] = (typ, [dat])
+ else:
+ dat2 = None
- # '*' (untagged) responses?
+ # '*' (untagged) responses?
- if not self._match(Untagged_response, resp):
- if self._match(Untagged_status, resp):
- dat2 = self.mo.group('data2')
+ if not self._match(Untagged_response, resp):
+ if self._match(Untagged_status, resp):
+ dat2 = self.mo.group('data2')
- if self.mo is None:
- # Only other possibility is '+' (continuation) response...
+ if self.mo is None:
+ # Only other possibility is '+' (continuation) response...
- if self._match(Continuation, resp):
- self.continuation_response = self.mo.group('data')
- return None # NB: indicates continuation
+ if self._match(Continuation, resp):
+ self.continuation_response = self.mo.group('data')
+ return None # NB: indicates continuation
- raise self.abort("unexpected response: '%s'" % resp)
+ raise self.abort("unexpected response: '%s'" % resp)
- typ = self.mo.group('type')
- dat = self.mo.group('data')
- if dat is None: dat = '' # Null untagged response
- if dat2: dat = dat + ' ' + dat2
+ typ = self.mo.group('type')
+ dat = self.mo.group('data')
+ if dat is None: dat = '' # Null untagged response
+ if dat2: dat = dat + ' ' + dat2
- # Is there a literal to come?
+ # Is there a literal to come?
- while self._match(Literal, dat):
+ while self._match(Literal, dat):
- # Read literal direct from connection.
+ # Read literal direct from connection.
- size = string.atoi(self.mo.group('size'))
- if __debug__:
- if self.debug >= 4:
- _mesg('read literal size %s' % size)
- data = self.file.read(size)
+ size = string.atoi(self.mo.group('size'))
+ if __debug__:
+ if self.debug >= 4:
+ _mesg('read literal size %s' % size)
+ data = self.file.read(size)
- # Store response with literal as tuple
+ # Store response with literal as tuple
- self._append_untagged(typ, (dat, data))
+ self._append_untagged(typ, (dat, data))
- # Read trailer - possibly containing another literal
+ # Read trailer - possibly containing another literal
- dat = self._get_line()
+ dat = self._get_line()
- self._append_untagged(typ, dat)
+ self._append_untagged(typ, dat)
- # Bracketed response information?
+ # Bracketed response information?
- if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat):
- self._append_untagged(self.mo.group('type'), self.mo.group('data'))
+ if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat):
+ self._append_untagged(self.mo.group('type'), self.mo.group('data'))
- if __debug__:
- if self.debug >= 1 and typ in ('NO', 'BAD', 'BYE'):
- _mesg('%s response: %s' % (typ, dat))
+ if __debug__:
+ if self.debug >= 1 and typ in ('NO', 'BAD', 'BYE'):
+ _mesg('%s response: %s' % (typ, dat))
- return resp
+ return resp
- def _get_tagged_response(self, tag):
+ def _get_tagged_response(self, tag):
- while 1:
- result = self.tagged_commands[tag]
- if result is not None:
- del self.tagged_commands[tag]
- return result
+ while 1:
+ result = self.tagged_commands[tag]
+ if result is not None:
+ del self.tagged_commands[tag]
+ return result
- # Some have reported "unexpected response" exceptions.
- # Note that ignoring them here causes loops.
- # Instead, send me details of the unexpected response and
- # I'll update the code in `_get_response()'.
+ # Some have reported "unexpected response" exceptions.
+ # Note that ignoring them here causes loops.
+ # Instead, send me details of the unexpected response and
+ # I'll update the code in `_get_response()'.
- try:
- self._get_response()
- except self.abort, val:
- if __debug__:
- if self.debug >= 1:
- print_log()
- raise
+ try:
+ self._get_response()
+ except self.abort, val:
+ if __debug__:
+ if self.debug >= 1:
+ print_log()
+ raise
- def _get_line(self):
+ def _get_line(self):
- line = self.file.readline()
- if not line:
- raise self.abort('socket error: EOF')
+ line = self.file.readline()
+ if not line:
+ raise self.abort('socket error: EOF')
- # Protocol mandates all lines terminated by CRLF
+ # Protocol mandates all lines terminated by CRLF
- line = line[:-2]
- if __debug__:
- if self.debug >= 4:
- _mesg('< %s' % line)
- else:
- _log('< %s' % line)
- return line
+ line = line[:-2]
+ if __debug__:
+ if self.debug >= 4:
+ _mesg('< %s' % line)
+ else:
+ _log('< %s' % line)
+ return line
- def _match(self, cre, s):
+ def _match(self, cre, s):
- # Run compiled regular expression match method on 's'.
- # Save result, return success.
+ # Run compiled regular expression match method on 's'.
+ # Save result, return success.
- self.mo = cre.match(s)
- if __debug__:
- if self.mo is not None and self.debug >= 5:
- _mesg("\tmatched r'%s' => %s" % (cre.pattern, `self.mo.groups()`))
- return self.mo is not None
+ self.mo = cre.match(s)
+ if __debug__:
+ if self.mo is not None and self.debug >= 5:
+ _mesg("\tmatched r'%s' => %s" % (cre.pattern, `self.mo.groups()`))
+ return self.mo is not None
- def _new_tag(self):
+ def _new_tag(self):
- tag = '%s%s' % (self.tagpre, self.tagnum)
- self.tagnum = self.tagnum + 1
- self.tagged_commands[tag] = None
- return tag
+ tag = '%s%s' % (self.tagpre, self.tagnum)
+ self.tagnum = self.tagnum + 1
+ self.tagged_commands[tag] = None
+ return tag
- def _checkquote(self, arg):
+ def _checkquote(self, arg):
- # Must quote command args if non-alphanumeric chars present,
- # and not already quoted.
+ # Must quote command args if non-alphanumeric chars present,
+ # and not already quoted.
- if type(arg) is not type(''):
- return arg
- if (arg[0],arg[-1]) in (('(',')'),('"','"')):
- return arg
- if self.mustquote.search(arg) is None:
- return arg
- return self._quote(arg)
+ if type(arg) is not type(''):
+ return arg
+ if (arg[0],arg[-1]) in (('(',')'),('"','"')):
+ return arg
+ if self.mustquote.search(arg) is None:
+ return arg
+ return self._quote(arg)
- def _quote(self, arg):
+ def _quote(self, arg):
- arg = string.replace(arg, '\\', '\\\\')
- arg = string.replace(arg, '"', '\\"')
+ arg = string.replace(arg, '\\', '\\\\')
+ arg = string.replace(arg, '"', '\\"')
- return '"%s"' % arg
+ return '"%s"' % arg
- def _simple_command(self, name, *args):
+ def _simple_command(self, name, *args):
- return self._command_complete(name, apply(self._command, (name,) + args))
+ return self._command_complete(name, apply(self._command, (name,) + args))
- def _untagged_response(self, typ, dat, name):
+ def _untagged_response(self, typ, dat, name):
- if typ == 'NO':
- return typ, dat
- if not self.untagged_responses.has_key(name):
- return typ, [None]
- data = self.untagged_responses[name]
- if __debug__:
- if self.debug >= 5:
- _mesg('untagged_responses[%s] => %s' % (name, data))
- del self.untagged_responses[name]
- return typ, data
+ if typ == 'NO':
+ return typ, dat
+ if not self.untagged_responses.has_key(name):
+ return typ, [None]
+ data = self.untagged_responses[name]
+ if __debug__:
+ if self.debug >= 5:
+ _mesg('untagged_responses[%s] => %s' % (name, data))
+ del self.untagged_responses[name]
+ return typ, data
class _Authenticator:
- """Private class to provide en/decoding
- for base64-based authentication conversation.
- """
-
- def __init__(self, mechinst):
- self.mech = mechinst # Callable object to provide/process data
-
- def process(self, data):
- ret = self.mech(self.decode(data))
- if ret is None:
- return '*' # Abort conversation
- return self.encode(ret)
-
- def encode(self, inp):
- #
- # Invoke binascii.b2a_base64 iteratively with
- # short even length buffers, strip the trailing
- # line feed from the result and append. "Even"
- # means a number that factors to both 6 and 8,
- # so when it gets to the end of the 8-bit input
- # there's no partial 6-bit output.
- #
- oup = ''
- while inp:
- if len(inp) > 48:
- t = inp[:48]
- inp = inp[48:]
- else:
- t = inp
- inp = ''
- e = binascii.b2a_base64(t)
- if e:
- oup = oup + e[:-1]
- return oup
-
- def decode(self, inp):
- if not inp:
- return ''
- return binascii.a2b_base64(inp)
-
+ """Private class to provide en/decoding
+ for base64-based authentication conversation.
+ """
+
+ def __init__(self, mechinst):
+ self.mech = mechinst # Callable object to provide/process data
+
+ def process(self, data):
+ ret = self.mech(self.decode(data))
+ if ret is None:
+ return '*' # Abort conversation
+ return self.encode(ret)
+
+ def encode(self, inp):
+ #
+ # Invoke binascii.b2a_base64 iteratively with
+ # short even length buffers, strip the trailing
+ # line feed from the result and append. "Even"
+ # means a number that factors to both 6 and 8,
+ # so when it gets to the end of the 8-bit input
+ # there's no partial 6-bit output.
+ #
+ oup = ''
+ while inp:
+ if len(inp) > 48:
+ t = inp[:48]
+ inp = inp[48:]
+ else:
+ t = inp
+ inp = ''
+ e = binascii.b2a_base64(t)
+ if e:
+ oup = oup + e[:-1]
+ return oup
+
+ def decode(self, inp):
+ if not inp:
+ return ''
+ return binascii.a2b_base64(inp)
+
Mon2num = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
- 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}
+ 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}
def Internaldate2tuple(resp):
- """Convert IMAP4 INTERNALDATE to UT.
+ """Convert IMAP4 INTERNALDATE to UT.
- Returns Python time module tuple.
- """
+ Returns Python time module tuple.
+ """
- mo = InternalDate.match(resp)
- if not mo:
- return None
+ mo = InternalDate.match(resp)
+ if not mo:
+ return None
- mon = Mon2num[mo.group('mon')]
- zonen = mo.group('zonen')
+ mon = Mon2num[mo.group('mon')]
+ zonen = mo.group('zonen')
- for name in ('day', 'year', 'hour', 'min', 'sec', 'zoneh', 'zonem'):
- exec "%s = string.atoi(mo.group('%s'))" % (name, name)
+ for name in ('day', 'year', 'hour', 'min', 'sec', 'zoneh', 'zonem'):
+ exec "%s = string.atoi(mo.group('%s'))" % (name, name)
- # INTERNALDATE timezone must be subtracted to get UT
+ # INTERNALDATE timezone must be subtracted to get UT
- zone = (zoneh*60 + zonem)*60
- if zonen == '-':
- zone = -zone
+ zone = (zoneh*60 + zonem)*60
+ if zonen == '-':
+ zone = -zone
- tt = (year, mon, day, hour, min, sec, -1, -1, -1)
+ tt = (year, mon, day, hour, min, sec, -1, -1, -1)
- utc = time.mktime(tt)
+ utc = time.mktime(tt)
- # Following is necessary because the time module has no 'mkgmtime'.
- # 'mktime' assumes arg in local timezone, so adds timezone/altzone.
+ # Following is necessary because the time module has no 'mkgmtime'.
+ # 'mktime' assumes arg in local timezone, so adds timezone/altzone.
- lt = time.localtime(utc)
- if time.daylight and lt[-1]:
- zone = zone + time.altzone
- else:
- zone = zone + time.timezone
+ lt = time.localtime(utc)
+ if time.daylight and lt[-1]:
+ zone = zone + time.altzone
+ else:
+ zone = zone + time.timezone
- return time.localtime(utc - zone)
+ return time.localtime(utc - zone)
def Int2AP(num):
- """Convert integer to A-P string representation."""
+ """Convert integer to A-P string representation."""
- val = ''; AP = 'ABCDEFGHIJKLMNOP'
- num = int(abs(num))
- while num:
- num, mod = divmod(num, 16)
- val = AP[mod] + val
- return val
+ val = ''; AP = 'ABCDEFGHIJKLMNOP'
+ num = int(abs(num))
+ while num:
+ num, mod = divmod(num, 16)
+ val = AP[mod] + val
+ return val
def ParseFlags(resp):
- """Convert IMAP4 flags response to python tuple."""
+ """Convert IMAP4 flags response to python tuple."""
- mo = Flags.match(resp)
- if not mo:
- return ()
+ mo = Flags.match(resp)
+ if not mo:
+ return ()
- return tuple(string.split(mo.group('flags')))
+ return tuple(string.split(mo.group('flags')))
def Time2Internaldate(date_time):
- """Convert 'date_time' to IMAP4 INTERNALDATE representation.
+ """Convert 'date_time' to IMAP4 INTERNALDATE representation.
- Return string in form: '"DD-Mmm-YYYY HH:MM:SS +HHMM"'
- """
+ Return string in form: '"DD-Mmm-YYYY HH:MM:SS +HHMM"'
+ """
- dttype = type(date_time)
- if dttype is type(1) or dttype is type(1.1):
- tt = time.localtime(date_time)
- elif dttype is type(()):
- tt = date_time
- elif dttype is type(""):
- return date_time # Assume in correct format
- else: raise ValueError
+ dttype = type(date_time)
+ if dttype is type(1) or dttype is type(1.1):
+ tt = time.localtime(date_time)
+ elif dttype is type(()):
+ tt = date_time
+ elif dttype is type(""):
+ return date_time # Assume in correct format
+ else: raise ValueError
- dt = time.strftime("%d-%b-%Y %H:%M:%S", tt)
- if dt[0] == '0':
- dt = ' ' + dt[1:]
- if time.daylight and tt[-1]:
- zone = -time.altzone
- else:
- zone = -time.timezone
- return '"' + dt + " %+02d%02d" % divmod(zone/60, 60) + '"'
+ dt = time.strftime("%d-%b-%Y %H:%M:%S", tt)
+ if dt[0] == '0':
+ dt = ' ' + dt[1:]
+ if time.daylight and tt[-1]:
+ zone = -time.altzone
+ else:
+ zone = -time.timezone
+ return '"' + dt + " %+02d%02d" % divmod(zone/60, 60) + '"'
if __debug__:
- def _mesg(s, secs=None):
- if secs is None:
- secs = time.time()
- tm = time.strftime('%M:%S', time.localtime(secs))
- sys.stderr.write(' %s.%02d %s\n' % (tm, (secs*100)%100, s))
- sys.stderr.flush()
+ def _mesg(s, secs=None):
+ if secs is None:
+ secs = time.time()
+ tm = time.strftime('%M:%S', time.localtime(secs))
+ sys.stderr.write(' %s.%02d %s\n' % (tm, (secs*100)%100, s))
+ sys.stderr.flush()
- def _dump_ur(dict):
- # Dump untagged responses (in `dict').
- l = dict.items()
- if not l: return
- t = '\n\t\t'
- j = string.join
- l = map(lambda x,j=j:'%s: "%s"' % (x[0], x[1][0] and j(x[1], '" "') or ''), l)
- _mesg('untagged responses dump:%s%s' % (t, j(l, t)))
+ def _dump_ur(dict):
+ # Dump untagged responses (in `dict').
+ l = dict.items()
+ if not l: return
+ t = '\n\t\t'
+ j = string.join
+ l = map(lambda x,j=j:'%s: "%s"' % (x[0], x[1][0] and j(x[1], '" "') or ''), l)
+ _mesg('untagged responses dump:%s%s' % (t, j(l, t)))
- _cmd_log = [] # Last `_cmd_log_len' interactions
- _cmd_log_len = 10
+ _cmd_log = [] # Last `_cmd_log_len' interactions
+ _cmd_log_len = 10
- def _log(line):
- # Keep log of last `_cmd_log_len' interactions for debugging.
- if len(_cmd_log) == _cmd_log_len:
- del _cmd_log[0]
- _cmd_log.append((time.time(), line))
+ def _log(line):
+ # Keep log of last `_cmd_log_len' interactions for debugging.
+ if len(_cmd_log) == _cmd_log_len:
+ del _cmd_log[0]
+ _cmd_log.append((time.time(), line))
- def print_log():
- _mesg('last %d IMAP4 interactions:' % len(_cmd_log))
- for secs,line in _cmd_log:
- _mesg(line, secs)
+ def print_log():
+ _mesg('last %d IMAP4 interactions:' % len(_cmd_log))
+ for secs,line in _cmd_log:
+ _mesg(line, secs)
if __name__ == '__main__':
- import getopt, getpass, sys
-
- try:
- optlist, args = getopt.getopt(sys.argv[1:], 'd:')
- except getopt.error, val:
- pass
-
- for opt,val in optlist:
- if opt == '-d':
- Debug = int(val)
-
- if not args: args = ('',)
-
- host = args[0]
-
- USER = getpass.getuser()
- PASSWD = getpass.getpass("IMAP password for %s on %s" % (USER, host or "localhost"))
-
- test_mesg = 'From: %s@localhost\nSubject: IMAP4 test\n\ndata...\n' % USER
- test_seq1 = (
- ('login', (USER, PASSWD)),
- ('create', ('/tmp/xxx 1',)),
- ('rename', ('/tmp/xxx 1', '/tmp/yyy')),
- ('CREATE', ('/tmp/yyz 2',)),
- ('append', ('/tmp/yyz 2', None, None, test_mesg)),
- ('list', ('/tmp', 'yy*')),
- ('select', ('/tmp/yyz 2',)),
- ('search', (None, 'SUBJECT', 'test')),
- ('partial', ('1', 'RFC822', 1, 1024)),
- ('store', ('1', 'FLAGS', '(\Deleted)')),
- ('expunge', ()),
- ('recent', ()),
- ('close', ()),
- )
-
- test_seq2 = (
- ('select', ()),
- ('response',('UIDVALIDITY',)),
- ('uid', ('SEARCH', 'ALL')),
- ('response', ('EXISTS',)),
- ('append', (None, None, None, test_mesg)),
- ('recent', ()),
- ('logout', ()),
- )
-
- def run(cmd, args):
- _mesg('%s %s' % (cmd, args))
- typ, dat = apply(eval('M.%s' % cmd), args)
- _mesg('%s => %s %s' % (cmd, typ, dat))
- return dat
-
- try:
- M = IMAP4(host)
- _mesg('PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION)
-
- for cmd,args in test_seq1:
- run(cmd, args)
-
- for ml in run('list', ('/tmp/', 'yy%')):
- mo = re.match(r'.*"([^"]+)"$', ml)
- if mo: path = mo.group(1)
- else: path = string.split(ml)[-1]
- run('delete', (path,))
-
- for cmd,args in test_seq2:
- dat = run(cmd, args)
-
- if (cmd,args) != ('uid', ('SEARCH', 'ALL')):
- continue
-
- uid = string.split(dat[-1])
- if not uid: continue
- run('uid', ('FETCH', '%s' % uid[-1],
- '(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822.TEXT)'))
-
- print '\nAll tests OK.'
-
- except:
- print '\nTests failed.'
-
- if not Debug:
- print '''
+ import getopt, getpass, sys
+
+ try:
+ optlist, args = getopt.getopt(sys.argv[1:], 'd:')
+ except getopt.error, val:
+ pass
+
+ for opt,val in optlist:
+ if opt == '-d':
+ Debug = int(val)
+
+ if not args: args = ('',)
+
+ host = args[0]
+
+ USER = getpass.getuser()
+ PASSWD = getpass.getpass("IMAP password for %s on %s" % (USER, host or "localhost"))
+
+ test_mesg = 'From: %s@localhost\nSubject: IMAP4 test\n\ndata...\n' % USER
+ test_seq1 = (
+ ('login', (USER, PASSWD)),
+ ('create', ('/tmp/xxx 1',)),
+ ('rename', ('/tmp/xxx 1', '/tmp/yyy')),
+ ('CREATE', ('/tmp/yyz 2',)),
+ ('append', ('/tmp/yyz 2', None, None, test_mesg)),
+ ('list', ('/tmp', 'yy*')),
+ ('select', ('/tmp/yyz 2',)),
+ ('search', (None, 'SUBJECT', 'test')),
+ ('partial', ('1', 'RFC822', 1, 1024)),
+ ('store', ('1', 'FLAGS', '(\Deleted)')),
+ ('expunge', ()),
+ ('recent', ()),
+ ('close', ()),
+ )
+
+ test_seq2 = (
+ ('select', ()),
+ ('response',('UIDVALIDITY',)),
+ ('uid', ('SEARCH', 'ALL')),
+ ('response', ('EXISTS',)),
+ ('append', (None, None, None, test_mesg)),
+ ('recent', ()),
+ ('logout', ()),
+ )
+
+ def run(cmd, args):
+ _mesg('%s %s' % (cmd, args))
+ typ, dat = apply(eval('M.%s' % cmd), args)
+ _mesg('%s => %s %s' % (cmd, typ, dat))
+ return dat
+
+ try:
+ M = IMAP4(host)
+ _mesg('PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION)
+
+ for cmd,args in test_seq1:
+ run(cmd, args)
+
+ for ml in run('list', ('/tmp/', 'yy%')):
+ mo = re.match(r'.*"([^"]+)"$', ml)
+ if mo: path = mo.group(1)
+ else: path = string.split(ml)[-1]
+ run('delete', (path,))
+
+ for cmd,args in test_seq2:
+ dat = run(cmd, args)
+
+ if (cmd,args) != ('uid', ('SEARCH', 'ALL')):
+ continue
+
+ uid = string.split(dat[-1])
+ if not uid: continue
+ run('uid', ('FETCH', '%s' % uid[-1],
+ '(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822.TEXT)'))
+
+ print '\nAll tests OK.'
+
+ except:
+ print '\nTests failed.'
+
+ if not Debug:
+ print '''
If you would like to see debugging output,
try: %s -d5
''' % sys.argv[0]
- raise
+ raise