summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2010-09-29 15:03:40 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2010-09-29 15:03:40 (GMT)
commit69ab95105f5105b8337c757346c899d6c7e7d9bb (patch)
treeae499f9e4041d0d244368932a1d4b4c0b6cd7a49 /Lib
parent926f0da582a01f5c03bcc05919f5dbb6da37c01a (diff)
downloadcpython-69ab95105f5105b8337c757346c899d6c7e7d9bb.zip
cpython-69ab95105f5105b8337c757346c899d6c7e7d9bb.tar.gz
cpython-69ab95105f5105b8337c757346c899d6c7e7d9bb.tar.bz2
Issue #9360: Cleanup and improvements to the nntplib module. The API
now conforms to the philosophy of bytes and unicode separation in Python 3. A test suite has also been added.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/nntplib.py946
-rw-r--r--Lib/test/test_nntplib.py1091
2 files changed, 1747 insertions, 290 deletions
diff --git a/Lib/nntplib.py b/Lib/nntplib.py
index 6ac2fcb..1055d06 100644
--- a/Lib/nntplib.py
+++ b/Lib/nntplib.py
@@ -1,4 +1,7 @@
-"""An NNTP client class based on RFC 977: Network News Transfer Protocol.
+"""An NNTP client class based on:
+- RFC 977: Network News Transfer Protocol
+- RFC 2980: Common NNTP Extensions
+- RFC 3977: Network News Transfer Protocol (version 2)
Example:
@@ -27,15 +30,53 @@ are strings, not numbers, since they are rarely used for calculations.
# RFC 977 by Brian Kantor and Phil Lapsley.
# xover, xgtitle, xpath, date methods by Kevan Heydon
+# Incompatible changes from the 2.x nntplib:
+# - all commands are encoded as UTF-8 data (using the "surrogateescape"
+# error handler), except for raw message data (POST, IHAVE)
+# - all responses are decoded as UTF-8 data (using the "surrogateescape"
+# error handler), except for raw message data (ARTICLE, HEAD, BODY)
+# - the `file` argument to various methods is keyword-only
+#
+# - NNTP.date() returns a datetime object
+# - NNTP.newgroups() and NNTP.newnews() take a datetime (or date) object,
+# rather than a pair of (date, time) strings.
+# - NNTP.newgroups() and NNTP.list() return a list of GroupInfo named tuples
+# - NNTP.descriptions() returns a dict mapping group names to descriptions
+# - NNTP.xover() returns a list of dicts mapping field names (header or metadata)
+# to field values; each dict representing a message overview.
+# - NNTP.article(), NNTP.head() and NNTP.body() return a (response, ArticleInfo)
+# tuple.
+# - the "internal" methods have been marked private (they now start with
+# an underscore)
+
+# Other changes from the 2.x/3.1 nntplib:
+# - automatic querying of capabilities at connect
+# - New method NNTP.getcapabilities()
+# - New method NNTP.over()
+# - New helper function decode_header()
+# - NNTP.post() and NNTP.ihave() accept file objects, bytes-like objects and
+# arbitrary iterables yielding lines.
+# - An extensive test suite :-)
+
+# TODO:
+# - return structured data (GroupInfo etc.) everywhere
+# - support HDR
# Imports
import re
import socket
+import collections
+import datetime
+import warnings
-__all__ = ["NNTP","NNTPReplyError","NNTPTemporaryError",
- "NNTPPermanentError","NNTPProtocolError","NNTPDataError",
- "error_reply","error_temp","error_perm","error_proto",
- "error_data",]
+from email.header import decode_header as _email_decode_header
+from socket import _GLOBAL_DEFAULT_TIMEOUT
+
+__all__ = ["NNTP",
+ "NNTPReplyError", "NNTPTemporaryError", "NNTPPermanentError",
+ "NNTPProtocolError", "NNTPDataError",
+ "decode_header",
+ ]
# Exceptions raised when an error or invalid response is received
class NNTPError(Exception):
@@ -67,39 +108,189 @@ class NNTPDataError(NNTPError):
"""Error in response data"""
pass
-# for backwards compatibility
-error_reply = NNTPReplyError
-error_temp = NNTPTemporaryError
-error_perm = NNTPPermanentError
-error_proto = NNTPProtocolError
-error_data = NNTPDataError
-
-
# Standard port used by NNTP servers
NNTP_PORT = 119
# Response numbers that are followed by additional text (e.g. article)
-LONGRESP = [b'100', b'215', b'220', b'221', b'222', b'224', b'230', b'231', b'282']
-
+_LONGRESP = {
+ '100', # HELP
+ '101', # CAPABILITIES
+ '211', # LISTGROUP (also not multi-line with GROUP)
+ '215', # LIST
+ '220', # ARTICLE
+ '221', # HEAD, XHDR
+ '222', # BODY
+ '224', # OVER, XOVER
+ '225', # HDR
+ '230', # NEWNEWS
+ '231', # NEWGROUPS
+ '282', # XGTITLE
+}
+
+# Default decoded value for LIST OVERVIEW.FMT if not supported
+_DEFAULT_OVERVIEW_FMT = [
+ "subject", "from", "date", "message-id", "references", ":bytes", ":lines"]
+
+# Alternative names allowed in LIST OVERVIEW.FMT response
+_OVERVIEW_FMT_ALTERNATIVES = {
+ 'bytes': ':bytes',
+ 'lines': ':lines',
+}
# Line terminators (we always output CRLF, but accept any of CRLF, CR, LF)
-CRLF = b'\r\n'
+_CRLF = b'\r\n'
+GroupInfo = collections.namedtuple('GroupInfo',
+ ['group', 'last', 'first', 'flag'])
+ArticleInfo = collections.namedtuple('ArticleInfo',
+ ['number', 'message_id', 'lines'])
-# The class itself
-class NNTP:
- def __init__(self, host, port=NNTP_PORT, user=None, password=None,
- readermode=None, usenetrc=True):
+
+# Helper function(s)
+def decode_header(header_str):
+ """Takes an unicode string representing a munged header value
+ and decodes it as a (possibly non-ASCII) readable value."""
+ parts = []
+ for v, enc in _email_decode_header(header_str):
+ if isinstance(v, bytes):
+ parts.append(v.decode(enc or 'ascii'))
+ else:
+ parts.append(v)
+ return ' '.join(parts)
+
+def _parse_overview_fmt(lines):
+ """Parse a list of string representing the response to LIST OVERVIEW.FMT
+ and return a list of header/metadata names.
+ Raises NNTPDataError if the response is not compliant
+ (cf. RFC 3977, section 8.4)."""
+ fmt = []
+ for line in lines:
+ if line[0] == ':':
+ # Metadata name (e.g. ":bytes")
+ name, _, suffix = line[1:].partition(':')
+ name = ':' + name
+ else:
+ # Header name (e.g. "Subject:" or "Xref:full")
+ name, _, suffix = line.partition(':')
+ name = name.lower()
+ name = _OVERVIEW_FMT_ALTERNATIVES.get(name, name)
+ # Should we do something with the suffix?
+ fmt.append(name)
+ defaults = _DEFAULT_OVERVIEW_FMT
+ if len(fmt) < len(defaults):
+ raise NNTPDataError("LIST OVERVIEW.FMT response too short")
+ if fmt[:len(defaults)] != defaults:
+ raise NNTPDataError("LIST OVERVIEW.FMT redefines default fields")
+ return fmt
+
+def _parse_overview(lines, fmt, data_process_func=None):
+ """Parse the response to a OVER or XOVER command according to the
+ overview format `fmt`."""
+ n_defaults = len(_DEFAULT_OVERVIEW_FMT)
+ overview = []
+ for line in lines:
+ fields = {}
+ article_number, *tokens = line.split('\t')
+ article_number = int(article_number)
+ for i, token in enumerate(tokens):
+ if i >= len(fmt):
+ # XXX should we raise an error? Some servers might not
+ # support LIST OVERVIEW.FMT and still return additional
+ # headers.
+ continue
+ field_name = fmt[i]
+ is_metadata = field_name.startswith(':')
+ if i >= n_defaults and not is_metadata:
+ # Non-default header names are included in full in the response
+ h = field_name + ":"
+ if token[:len(h)].lower() != h:
+ raise NNTPDataError("OVER/XOVER response doesn't include "
+ "names of additional headers")
+ token = token[len(h):].lstrip(" ")
+ fields[fmt[i]] = token
+ overview.append((article_number, fields))
+ return overview
+
+def _parse_datetime(date_str, time_str=None):
+ """Parse a pair of (date, time) strings, and return a datetime object.
+ If only the date is given, it is assumed to be date and time
+ concatenated together (e.g. response to the DATE command).
+ """
+ if time_str is None:
+ time_str = date_str[-6:]
+ date_str = date_str[:-6]
+ hours = int(time_str[:2])
+ minutes = int(time_str[2:4])
+ seconds = int(time_str[4:])
+ year = int(date_str[:-4])
+ month = int(date_str[-4:-2])
+ day = int(date_str[-2:])
+ # RFC 3977 doesn't say how to interpret 2-char years. Assume that
+ # there are no dates before 1970 on Usenet.
+ if year < 70:
+ year += 2000
+ elif year < 100:
+ year += 1900
+ return datetime.datetime(year, month, day, hours, minutes, seconds)
+
+def _unparse_datetime(dt, legacy=False):
+ """Format a date or datetime object as a pair of (date, time) strings
+ in the format required by the NEWNEWS and NEWGROUPS commands. If a
+ date object is passed, the time is assumed to be midnight (00h00).
+
+ The returned representation depends on the legacy flag:
+ * if legacy is False (the default):
+ date has the YYYYMMDD format and time the HHMMSS format
+ * if legacy is True:
+ date has the YYMMDD format and time the HHMMSS format.
+ RFC 3977 compliant servers should understand both formats; therefore,
+ legacy is only needed when talking to old servers.
+ """
+ if not isinstance(dt, datetime.datetime):
+ time_str = "000000"
+ else:
+ time_str = "{0.hour:02d}{0.minute:02d}{0.second:02d}".format(dt)
+ y = dt.year
+ if legacy:
+ y = y % 100
+ date_str = "{0:02d}{1.month:02d}{1.day:02d}".format(y, dt)
+ else:
+ date_str = "{0:04d}{1.month:02d}{1.day:02d}".format(y, dt)
+ return date_str, time_str
+
+
+# The classes themselves
+class _NNTPBase:
+ # UTF-8 is the character set for all NNTP commands and responses: they
+ # are automatically encoded (when sending) and decoded (and receiving)
+ # by this class.
+ # However, some multi-line data blocks can contain arbitrary bytes (for
+ # example, latin-1 or utf-16 data in the body of a message). Commands
+ # taking (POST, IHAVE) or returning (HEAD, BODY, ARTICLE) raw message
+ # data will therefore only accept and produce bytes objects.
+ # Furthermore, since there could be non-compliant servers out there,
+ # we use 'surrogateescape' as the error handler for fault tolerance
+ # and easy round-tripping. This could be useful for some applications
+ # (e.g. NNTP gateways).
+
+ encoding = 'utf-8'
+ errors = 'surrogateescape'
+
+ def __init__(self, file, user=None, password=None,
+ readermode=None, usenetrc=True,
+ timeout=_GLOBAL_DEFAULT_TIMEOUT):
"""Initialize an instance. Arguments:
- - host: hostname to connect to
- - port: port to connect to (default the standard NNTP port)
+ - file: file-like object (open for read/write in binary mode)
- user: username to authenticate with
- password: password to use with username
- readermode: if true, send 'mode reader' command after
connecting.
+ - usenetrc: allow loading username and password from ~/.netrc file
+ if not specified explicitly
+ - timeout: timeout (in seconds) used for socket connections
readermode is sometimes necessary if you are connecting to an
NNTP server on the local machine and intend to call
@@ -107,12 +298,9 @@ class NNTP:
unexpected NNTPPermanentErrors, you might need to set
readermode.
"""
- self.host = host
- self.port = port
- self.sock = socket.create_connection((host, port))
- self.file = self.sock.makefile('rb')
+ self.file = file
self.debugging = 0
- self.welcome = self.getresp()
+ self.welcome = self._getresp()
# 'mode reader' is sometimes necessary to enable 'reader' mode.
# However, the order in which 'mode reader' and 'authinfo' need to
@@ -122,12 +310,12 @@ class NNTP:
readermode_afterauth = 0
if readermode:
try:
- self.welcome = self.shortcmd('mode reader')
+ self.welcome = self._shortcmd('mode reader')
except NNTPPermanentError:
# error 500, probably 'not implemented'
pass
except NNTPTemporaryError as e:
- if user and e.response.startswith(b'480'):
+ if user and e.response.startswith('480'):
# Need authorization before 'mode reader'
readermode_afterauth = 1
else:
@@ -144,29 +332,35 @@ class NNTP:
password = auth[2]
except IOError:
pass
- # Perform NNRP authentication if needed.
+ # Perform NNTP authentication if needed.
if user:
- resp = self.shortcmd('authinfo user '+user)
- if resp.startswith(b'381'):
+ resp = self._shortcmd('authinfo user '+user)
+ if resp.startswith('381'):
if not password:
raise NNTPReplyError(resp)
else:
- resp = self.shortcmd(
+ resp = self._shortcmd(
'authinfo pass '+password)
- if not resp.startswith(b'281'):
+ if not resp.startswith('281'):
raise NNTPPermanentError(resp)
if readermode_afterauth:
try:
- self.welcome = self.shortcmd('mode reader')
+ self.welcome = self._shortcmd('mode reader')
except NNTPPermanentError:
# error 500, probably 'not implemented'
pass
-
- # Get the welcome message from the server
- # (this is read and squirreled away by __init__()).
- # If the response code is 200, posting is allowed;
- # if it 201, posting is not allowed
+ # Inquire about capabilities (RFC 3977)
+ self.nntp_version = 1
+ try:
+ resp, caps = self.capabilities()
+ except NNTPPermanentError:
+ # Server doesn't support capabilities
+ self._caps = {}
+ else:
+ self._caps = caps
+ if 'VERSION' in caps:
+ self.nntp_version = int(caps['VERSION'][0])
def getwelcome(self):
"""Get the welcome message from the server
@@ -177,6 +371,12 @@ class NNTP:
if self.debugging: print('*welcome*', repr(self.welcome))
return self.welcome
+ def getcapabilities(self):
+ """Get the server capabilities, as read by __init__().
+ If the CAPABILITIES command is not supported, an empty dict is
+ returned."""
+ return self._caps
+
def set_debuglevel(self, level):
"""Set the debugging level. Argument 'level' means:
0: no debugging output (default)
@@ -186,121 +386,221 @@ class NNTP:
self.debugging = level
debug = set_debuglevel
- def putline(self, line):
- """Internal: send one line to the server, appending CRLF."""
- line = line + CRLF
+ def _putline(self, line):
+ """Internal: send one line to the server, appending CRLF.
+ The `line` must be a bytes-like object."""
+ line = line + _CRLF
if self.debugging > 1: print('*put*', repr(line))
- self.sock.sendall(line)
+ self.file.write(line)
+ self.file.flush()
- def putcmd(self, line):
- """Internal: send one command to the server (through putline())."""
+ def _putcmd(self, line):
+ """Internal: send one command to the server (through _putline()).
+ The `line` must be an unicode string."""
if self.debugging: print('*cmd*', repr(line))
- line = bytes(line, "ASCII")
- self.putline(line)
+ line = line.encode(self.encoding, self.errors)
+ self._putline(line)
- def getline(self):
- """Internal: return one line from the server, stripping CRLF.
- Raise EOFError if the connection is closed."""
+ def _getline(self, strip_crlf=True):
+ """Internal: return one line from the server, stripping _CRLF.
+ Raise EOFError if the connection is closed.
+ Returns a bytes object."""
line = self.file.readline()
if self.debugging > 1:
print('*get*', repr(line))
if not line: raise EOFError
- if line[-2:] == CRLF:
- line = line[:-2]
- elif line[-1:] in CRLF:
- line = line[:-1]
+ if strip_crlf:
+ if line[-2:] == _CRLF:
+ line = line[:-2]
+ elif line[-1:] in _CRLF:
+ line = line[:-1]
return line
- def getresp(self):
+ def _getresp(self):
"""Internal: get a response from the server.
- Raise various errors if the response indicates an error."""
- resp = self.getline()
+ Raise various errors if the response indicates an error.
+ Returns an unicode string."""
+ resp = self._getline()
if self.debugging: print('*resp*', repr(resp))
+ resp = resp.decode(self.encoding, self.errors)
c = resp[:1]
- if c == b'4':
+ if c == '4':
raise NNTPTemporaryError(resp)
- if c == b'5':
+ if c == '5':
raise NNTPPermanentError(resp)
- if c not in b'123':
+ if c not in '123':
raise NNTPProtocolError(resp)
return resp
- def getlongresp(self, file=None):
+ def _getlongresp(self, file=None):
"""Internal: get a response plus following text from the server.
- Raise various errors if the response indicates an error."""
+ Raise various errors if the response indicates an error.
+
+ Returns a (response, lines) tuple where `response` is an unicode
+ string and `lines` is a list of bytes objects.
+ If `file` is a file-like object, it must be open in binary mode.
+ """
openedFile = None
try:
# If a string was passed then open a file with that name
- if isinstance(file, str):
- openedFile = file = open(file, "w")
+ if isinstance(file, (str, bytes)):
+ openedFile = file = open(file, "wb")
- resp = self.getresp()
- if resp[:3] not in LONGRESP:
+ resp = self._getresp()
+ if resp[:3] not in _LONGRESP:
raise NNTPReplyError(resp)
- list = []
- while 1:
- line = self.getline()
- if line == b'.':
- break
- if line.startswith(b'..'):
- line = line[1:]
- if file:
- file.write(line + b'\n')
- else:
- list.append(line)
+
+ lines = []
+ if file is not None:
+ # XXX lines = None instead?
+ terminators = (b'.' + _CRLF, b'.\n')
+ while 1:
+ line = self._getline(False)
+ if line in terminators:
+ break
+ if line.startswith(b'..'):
+ line = line[1:]
+ file.write(line)
+ else:
+ terminator = b'.'
+ while 1:
+ line = self._getline()
+ if line == terminator:
+ break
+ if line.startswith(b'..'):
+ line = line[1:]
+ lines.append(line)
finally:
# If this method created the file, then it must close it
if openedFile:
openedFile.close()
- return resp, list
+ return resp, lines
- def shortcmd(self, line):
- """Internal: send a command and get the response."""
- self.putcmd(line)
- return self.getresp()
+ def _shortcmd(self, line):
+ """Internal: send a command and get the response.
+ Same return value as _getresp()."""
+ self._putcmd(line)
+ return self._getresp()
+
+ def _longcmd(self, line, file=None):
+ """Internal: send a command and get the response plus following text.
+ Same return value as _getlongresp()."""
+ self._putcmd(line)
+ return self._getlongresp(file)
+
+ def _longcmdstring(self, line, file=None):
+ """Internal: send a command and get the response plus following text.
+ Same as _longcmd() and _getlongresp(), except that the returned `lines`
+ are unicode strings rather than bytes objects.
+ """
+ self._putcmd(line)
+ resp, list = self._getlongresp(file)
+ return resp, [line.decode(self.encoding, self.errors)
+ for line in list]
+
+ def _getoverviewfmt(self):
+ """Internal: get the overview format. Queries the server if not
+ already done, else returns the cached value."""
+ try:
+ return self._cachedoverviewfmt
+ except AttributeError:
+ pass
+ try:
+ resp, lines = self._longcmdstring("LIST OVERVIEW.FMT")
+ except NNTPPermanentError:
+ # Not supported by server?
+ fmt = _DEFAULT_OVERVIEW_FMT[:]
+ else:
+ fmt = _parse_overview_fmt(lines)
+ self._cachedoverviewfmt = fmt
+ return fmt
- def longcmd(self, line, file=None):
- """Internal: send a command and get the response plus following text."""
- self.putcmd(line)
- return self.getlongresp(file)
+ def _grouplist(self, lines):
+ # Parse lines into "group last first flag"
+ return [GroupInfo(*line.split()) for line in lines]
- def newgroups(self, date, time, file=None):
- """Process a NEWGROUPS command. Arguments:
- - date: string 'yymmdd' indicating the date
- - time: string 'hhmmss' indicating the time
+ def capabilities(self):
+ """Process a CAPABILITIES command. Not supported by all servers.
Return:
- resp: server response if successful
- - list: list of newsgroup names"""
-
- return self.longcmd('NEWGROUPS ' + date + ' ' + time, file)
+ - caps: a dictionary mapping capability names to lists of tokens
+ (for example {'VERSION': ['2'], 'OVER': [], LIST: ['ACTIVE', 'HEADERS'] })
+ """
+ caps = {}
+ resp, lines = self._longcmdstring("CAPABILITIES")
+ for line in lines:
+ name, *tokens = line.split()
+ caps[name] = tokens
+ return resp, caps
- def newnews(self, group, date, time, file=None):
+ def newgroups(self, date, *, file=None):
+ """Process a NEWGROUPS command. Arguments:
+ - date: a date or datetime object
+ Return:
+ - resp: server response if successful
+ - list: list of newsgroup names
+ """
+ if not isinstance(date, (datetime.date, datetime.date)):
+ raise TypeError(
+ "the date parameter must be a date or datetime object, "
+ "not '{:40}'".format(date.__class__.__name__))
+ date_str, time_str = _unparse_datetime(date, self.nntp_version < 2)
+ cmd = 'NEWGROUPS {0} {1}'.format(date_str, time_str)
+ resp, lines = self._longcmdstring(cmd, file)
+ return resp, self._grouplist(lines)
+
+ def newnews(self, group, date, *, file=None):
"""Process a NEWNEWS command. Arguments:
- group: group name or '*'
- - date: string 'yymmdd' indicating the date
- - time: string 'hhmmss' indicating the time
+ - date: a date or datetime object
Return:
- resp: server response if successful
- - list: list of message ids"""
-
- cmd = 'NEWNEWS ' + group + ' ' + date + ' ' + time
- return self.longcmd(cmd, file)
-
- def list(self, file=None):
- """Process a LIST command. Return:
+ - list: list of message ids
+ """
+ if not isinstance(date, (datetime.date, datetime.date)):
+ raise TypeError(
+ "the date parameter must be a date or datetime object, "
+ "not '{:40}'".format(date.__class__.__name__))
+ date_str, time_str = _unparse_datetime(date, self.nntp_version < 2)
+ cmd = 'NEWNEWS {0} {1} {2}'.format(group, date_str, time_str)
+ return self._longcmdstring(cmd, file)
+
+ def list(self, *, file=None):
+ """Process a LIST command. Argument:
+ - file: Filename string or file object to store the result in
+ Returns:
- resp: server response if successful
- - list: list of (group, last, first, flag) (strings)"""
+ - list: list of (group, last, first, flag) (strings)
+ """
+ resp, lines = self._longcmdstring('LIST', file)
+ return resp, self._grouplist(lines)
- resp, list = self.longcmd('LIST', file)
- for i in range(len(list)):
- # Parse lines into "group last first flag"
- list[i] = tuple(list[i].split())
- return resp, list
+ def _getdescriptions(self, group_pattern, return_all):
+ line_pat = re.compile('^(?P<group>[^ \t]+)[ \t]+(.*)$')
+ # Try the more std (acc. to RFC2980) LIST NEWSGROUPS first
+ resp, lines = self._longcmdstring('LIST NEWSGROUPS ' + group_pattern)
+ if not resp.startswith('215'):
+ # Now the deprecated XGTITLE. This either raises an error
+ # or succeeds with the same output structure as LIST
+ # NEWSGROUPS.
+ resp, lines = self._longcmdstring('XGTITLE ' + group_pattern)
+ groups = {}
+ for raw_line in lines:
+ match = line_pat.search(raw_line.strip())
+ if match:
+ name, desc = match.group(1, 2)
+ if not return_all:
+ return desc
+ groups[name] = desc
+ if return_all:
+ return resp, groups
+ else:
+ # Nothing found
+ return ''
def description(self, group):
-
"""Get a description for a single group. If more than one
group matches ('group' is a pattern), return the first. If no
group matches, return an empty string.
@@ -311,42 +611,24 @@ class NNTP:
NOTE: This neither checks for a wildcard in 'group' nor does
it check whether the group actually exists."""
-
- resp, lines = self.descriptions(group)
- if len(lines) == 0:
- return b''
- else:
- return lines[0][1]
+ return self._getdescriptions(group, False)
def descriptions(self, group_pattern):
"""Get descriptions for a range of groups."""
- line_pat = re.compile(b'^(?P<group>[^ \t]+)[ \t]+(.*)$')
- # Try the more std (acc. to RFC2980) LIST NEWSGROUPS first
- resp, raw_lines = self.longcmd('LIST NEWSGROUPS ' + group_pattern)
- if not resp.startswith(b'215'):
- # Now the deprecated XGTITLE. This either raises an error
- # or succeeds with the same output structure as LIST
- # NEWSGROUPS.
- resp, raw_lines = self.longcmd('XGTITLE ' + group_pattern)
- lines = []
- for raw_line in raw_lines:
- match = line_pat.search(raw_line.strip())
- if match:
- lines.append(match.group(1, 2))
- return resp, lines
+ return self._getdescriptions(group_pattern, True)
def group(self, name):
"""Process a GROUP command. Argument:
- group: the group name
Returns:
- resp: server response if successful
- - count: number of articles (string)
- - first: first article number (string)
- - last: last article number (string)
- - name: the group name"""
-
- resp = self.shortcmd('GROUP ' + name)
- if not resp.startswith(b'211'):
+ - count: number of articles
+ - first: first article number
+ - last: last article number
+ - name: the group name
+ """
+ resp = self._shortcmd('GROUP ' + name)
+ if not resp.startswith('211'):
raise NNTPReplyError(resp)
words = resp.split()
count = first = last = 0
@@ -359,151 +641,177 @@ class NNTP:
last = words[3]
if n > 4:
name = words[4].lower()
- return resp, count, first, last, name
+ return resp, int(count), int(first), int(last), name
- def help(self, file=None):
- """Process a HELP command. Returns:
+ def help(self, *, file=None):
+ """Process a HELP command. Argument:
+ - file: Filename string or file object to store the result in
+ Returns:
- resp: server response if successful
- - list: list of strings"""
-
- return self.longcmd('HELP',file)
+ - list: list of strings returned by the server in response to the
+ HELP command
+ """
+ return self._longcmdstring('HELP', file)
- def statparse(self, resp):
- """Internal: parse the response of a STAT, NEXT or LAST command."""
- if not resp.startswith(b'22'):
+ def _statparse(self, resp):
+ """Internal: parse the response line of a STAT, NEXT, LAST,
+ ARTICLE, HEAD or BODY command."""
+ if not resp.startswith('22'):
raise NNTPReplyError(resp)
words = resp.split()
- nr = 0
- id = b''
- n = len(words)
- if n > 1:
- nr = words[1]
- if n > 2:
- id = words[2]
- return resp, nr, id
+ art_num = int(words[1])
+ message_id = words[2]
+ return resp, art_num, message_id
- def statcmd(self, line):
+ def _statcmd(self, line):
"""Internal: process a STAT, NEXT or LAST command."""
- resp = self.shortcmd(line)
- return self.statparse(resp)
+ resp = self._shortcmd(line)
+ return self._statparse(resp)
- def stat(self, id):
+ def stat(self, message_spec=None):
"""Process a STAT command. Argument:
- - id: article number or message id
+ - message_spec: article number or message id (if not specified,
+ the current article is selected)
Returns:
- resp: server response if successful
- - nr: the article number
- - id: the message id"""
-
- return self.statcmd('STAT {0}'.format(id))
+ - art_num: the article number
+ - message_id: the message id
+ """
+ if message_spec:
+ return self._statcmd('STAT {0}'.format(message_spec))
+ else:
+ return self._statcmd('STAT')
def next(self):
"""Process a NEXT command. No arguments. Return as for STAT."""
- return self.statcmd('NEXT')
+ return self._statcmd('NEXT')
def last(self):
"""Process a LAST command. No arguments. Return as for STAT."""
- return self.statcmd('LAST')
+ return self._statcmd('LAST')
- def artcmd(self, line, file=None):
+ def _artcmd(self, line, file=None):
"""Internal: process a HEAD, BODY or ARTICLE command."""
- resp, list = self.longcmd(line, file)
- resp, nr, id = self.statparse(resp)
- return resp, nr, id, list
+ resp, lines = self._longcmd(line, file)
+ resp, art_num, message_id = self._statparse(resp)
+ return resp, ArticleInfo(art_num, message_id, lines)
- def head(self, id):
+ def head(self, message_spec=None, *, file=None):
"""Process a HEAD command. Argument:
- - id: article number or message id
+ - message_spec: article number or message id
+ - file: filename string or file object to store the headers in
Returns:
- resp: server response if successful
- - nr: article number
- - id: message id
- - list: the lines of the article's header"""
-
- return self.artcmd('HEAD {0}'.format(id))
+ - ArticleInfo: (article number, message id, list of header lines)
+ """
+ if message_spec is not None:
+ cmd = 'HEAD {0}'.format(message_spec)
+ else:
+ cmd = 'HEAD'
+ return self._artcmd(cmd, file)
- def body(self, id, file=None):
+ def body(self, message_spec=None, *, file=None):
"""Process a BODY command. Argument:
- - id: article number or message id
- - file: Filename string or file object to store the article in
+ - message_spec: article number or message id
+ - file: filename string or file object to store the body in
Returns:
- resp: server response if successful
- - nr: article number
- - id: message id
- - list: the lines of the article's body or an empty list
- if file was used"""
-
- return self.artcmd('BODY {0}'.format(id), file)
+ - ArticleInfo: (article number, message id, list of body lines)
+ """
+ if message_spec is not None:
+ cmd = 'BODY {0}'.format(message_spec)
+ else:
+ cmd = 'BODY'
+ return self._artcmd(cmd, file)
- def article(self, id):
+ def article(self, message_spec=None, *, file=None):
"""Process an ARTICLE command. Argument:
- - id: article number or message id
+ - message_spec: article number or message id
+ - file: filename string or file object to store the article in
Returns:
- resp: server response if successful
- - nr: article number
- - id: message id
- - list: the lines of the article"""
-
- return self.artcmd('ARTICLE {0}'.format(id))
+ - ArticleInfo: (article number, message id, list of article lines)
+ """
+ if message_spec is not None:
+ cmd = 'ARTICLE {0}'.format(message_spec)
+ else:
+ cmd = 'ARTICLE'
+ return self._artcmd(cmd, file)
def slave(self):
"""Process a SLAVE command. Returns:
- - resp: server response if successful"""
-
- return self.shortcmd('SLAVE')
+ - resp: server response if successful
+ """
+ return self._shortcmd('SLAVE')
- def xhdr(self, hdr, str, file=None):
+ def xhdr(self, hdr, str, *, file=None):
"""Process an XHDR command (optional server extension). Arguments:
- hdr: the header type (e.g. 'subject')
- str: an article nr, a message id, or a range nr1-nr2
+ - file: Filename string or file object to store the result in
Returns:
- resp: server response if successful
- - list: list of (nr, value) strings"""
-
- pat = re.compile(b'^([0-9]+) ?(.*)\n?')
- resp, lines = self.longcmd('XHDR {0} {1}'.format(hdr, str), file)
- for i in range(len(lines)):
- line = lines[i]
+ - list: list of (nr, value) strings
+ """
+ pat = re.compile('^([0-9]+) ?(.*)\n?')
+ resp, lines = self._longcmdstring('XHDR {0} {1}'.format(hdr, str), file)
+ def remove_number(line):
m = pat.match(line)
- if m:
- lines[i] = m.group(1, 2)
- return resp, lines
+ return m.group(1, 2) if m else line
+ return resp, [remove_number(line) for line in lines]
- def xover(self, start, end, file=None):
+ def xover(self, start, end, *, file=None):
"""Process an XOVER command (optional server extension) Arguments:
- start: start of range
- end: end of range
+ - file: Filename string or file object to store the result in
+ Returns:
+ - resp: server response if successful
+ - list: list of dicts containing the response fields
+ """
+ resp, lines = self._longcmdstring('XOVER {0}-{1}'.format(start, end),
+ file)
+ fmt = self._getoverviewfmt()
+ return resp, _parse_overview(lines, fmt)
+
+ def over(self, message_spec, *, file=None):
+ """Process an OVER command. If the command isn't supported, fall
+ back to XOVER. Arguments:
+ - message_spec:
+ - either a message id, indicating the article to fetch
+ information about
+ - or a (start, end) tuple, indicating a range of article numbers;
+ if end is None, information up to the newest message will be
+ retrieved
+ - or None, indicating the current article number must be used
+ - file: Filename string or file object to store the result in
Returns:
- resp: server response if successful
- - list: list of (art-nr, subject, poster, date,
- id, references, size, lines)"""
+ - list: list of dicts containing the response fields
- resp, lines = self.longcmd('XOVER {0}-{1}'.format(start, end), file)
- xover_lines = []
- for line in lines:
- elem = line.split(b'\t')
- try:
- xover_lines.append((elem[0],
- elem[1],
- elem[2],
- elem[3],
- elem[4],
- elem[5].split(),
- elem[6],
- elem[7]))
- except IndexError:
- raise NNTPDataError(line)
- return resp,xover_lines
-
- def xgtitle(self, group, file=None):
+ NOTE: the "message id" form isn't supported by XOVER
+ """
+ cmd = 'OVER' if 'OVER' in self._caps else 'XOVER'
+ if isinstance(message_spec, (tuple, list)):
+ start, end = message_spec
+ cmd += ' {0}-{1}'.format(start, end or '')
+ elif message_spec is not None:
+ cmd = cmd + ' ' + message_spec
+ resp, lines = self._longcmdstring(cmd, file)
+ fmt = self._getoverviewfmt()
+ return resp, _parse_overview(lines, fmt)
+
+ def xgtitle(self, group, *, file=None):
"""Process an XGTITLE command (optional server extension) Arguments:
- group: group name wildcard (i.e. news.*)
Returns:
- resp: server response if successful
- list: list of (name,title) strings"""
-
- line_pat = re.compile(b'^([^ \t]+)[ \t]+(.*)$')
- resp, raw_lines = self.longcmd('XGTITLE ' + group, file)
+ warnings.warn("The XGTITLE extension is not actively used, "
+ "use descriptions() instead",
+ PendingDeprecationWarning, 2)
+ line_pat = re.compile('^([^ \t]+)[ \t]+(.*)$')
+ resp, raw_lines = self._longcmdstring('XGTITLE ' + group, file)
lines = []
for raw_line in raw_lines:
match = line_pat.search(raw_line.strip())
@@ -511,15 +819,18 @@ class NNTP:
lines.append(match.group(1, 2))
return resp, lines
- def xpath(self,id):
+ def xpath(self, id):
"""Process an XPATH command (optional server extension) Arguments:
- id: Message id of article
Returns:
resp: server response if successful
- path: directory path to article"""
+ path: directory path to article
+ """
+ warnings.warn("The XPATH extension is not actively used",
+ PendingDeprecationWarning, 2)
- resp = self.shortcmd('XPATH {0}'.format(id))
- if not resp.startswith(b'223'):
+ resp = self._shortcmd('XPATH {0}'.format(id))
+ if not resp.startswith('223'):
raise NNTPReplyError(resp)
try:
[resp_num, path] = resp.split()
@@ -528,89 +839,144 @@ class NNTP:
else:
return resp, path
- def date (self):
- """Process the DATE command. Arguments:
- None
+ def date(self):
+ """Process the DATE command.
Returns:
- resp: server response if successful
- date: Date suitable for newnews/newgroups commands etc.
- time: Time suitable for newnews/newgroups commands etc."""
-
- resp = self.shortcmd("DATE")
- if not resp.startswith(b'111'):
+ - resp: server response if successful
+ - date: datetime object
+ """
+ resp = self._shortcmd("DATE")
+ if not resp.startswith('111'):
raise NNTPReplyError(resp)
elem = resp.split()
if len(elem) != 2:
raise NNTPDataError(resp)
- date = elem[1][2:8]
- time = elem[1][-6:]
- if len(date) != 6 or len(time) != 6:
+ date = elem[1]
+ if len(date) != 14:
raise NNTPDataError(resp)
- return resp, date, time
+ return resp, _parse_datetime(date, None)
def _post(self, command, f):
- resp = self.shortcmd(command)
- # Raises error_??? if posting is not allowed
- if not resp.startswith(b'3'):
+ resp = self._shortcmd(command)
+ # Raises a specific exception if posting is not allowed
+ if not resp.startswith('3'):
raise NNTPReplyError(resp)
- while 1:
- line = f.readline()
- if not line:
- break
- if line.endswith(b'\n'):
- line = line[:-1]
+ if isinstance(f, (bytes, bytearray)):
+ f = f.splitlines()
+ # We don't use _putline() because:
+ # - we don't want additional CRLF if the file or iterable is already
+ # in the right format
+ # - we don't want a spurious flush() after each line is written
+ for line in f:
+ if not line.endswith(_CRLF):
+ line = line.rstrip(b"\r\n") + _CRLF
if line.startswith(b'.'):
line = b'.' + line
- self.putline(line)
- self.putline(b'.')
- return self.getresp()
+ self.file.write(line)
+ self.file.write(b".\r\n")
+ self.file.flush()
+ return self._getresp()
- def post(self, f):
+ def post(self, data):
"""Process a POST command. Arguments:
- - f: file containing the article
+ - data: bytes object, iterable or file containing the article
Returns:
- resp: server response if successful"""
- return self._post('POST', f)
+ return self._post('POST', data)
- def ihave(self, id, f):
+ def ihave(self, message_id, data):
"""Process an IHAVE command. Arguments:
- - id: message-id of the article
- - f: file containing the article
+ - message_id: message-id of the article
+ - data: file containing the article
Returns:
- resp: server response if successful
Note that if the server refuses the article an exception is raised."""
- return self._post('IHAVE {0}'.format(id), f)
+ return self._post('IHAVE {0}'.format(message_id), data)
+
+ def _close(self):
+ self.file.close()
+ del self.file
def quit(self):
"""Process a QUIT command and close the socket. Returns:
- resp: server response if successful"""
-
- resp = self.shortcmd('QUIT')
- self.file.close()
- self.sock.close()
- del self.file, self.sock
+ try:
+ resp = self._shortcmd('QUIT')
+ finally:
+ self._close()
return resp
+class NNTP(_NNTPBase):
+
+ def __init__(self, host, port=NNTP_PORT, user=None, password=None,
+ readermode=None, usenetrc=True,
+ timeout=_GLOBAL_DEFAULT_TIMEOUT):
+ """Initialize an instance. Arguments:
+ - host: hostname to connect to
+ - port: port to connect to (default the standard NNTP port)
+ - user: username to authenticate with
+ - password: password to use with username
+ - readermode: if true, send 'mode reader' command after
+ connecting.
+ - usenetrc: allow loading username and password from ~/.netrc file
+ if not specified explicitly
+ - timeout: timeout (in seconds) used for socket connections
+
+ readermode is sometimes necessary if you are connecting to an
+ NNTP server on the local machine and intend to call
+ reader-specific comamnds, such as `group'. If you get
+ unexpected NNTPPermanentErrors, you might need to set
+ readermode.
+ """
+ self.host = host
+ self.port = port
+ self.sock = socket.create_connection((host, port), timeout)
+ file = self.sock.makefile("rwb")
+ _NNTPBase.__init__(self, file, user, password,
+ readermode, usenetrc, timeout)
+
+ def _close(self):
+ try:
+ _NNTPBase._close(self)
+ finally:
+ self.sock.close()
+
+
# Test retrieval when run as a script.
-# Assumption: if there's a local news server, it's called 'news'.
-# Assumption: if user queries a remote news server, it's named
-# in the environment variable NNTPSERVER (used by slrn and kin)
-# and we want readermode off.
if __name__ == '__main__':
- import os
- newshost = 'news' and os.environ["NNTPSERVER"]
- if newshost.find('.') == -1:
- mode = 'readermode'
- else:
- mode = None
- s = NNTP(newshost, readermode=mode)
- resp, count, first, last, name = s.group('comp.lang.python')
- print(resp)
+ import argparse
+ from email.utils import parsedate
+
+ parser = argparse.ArgumentParser(description="""\
+ nntplib built-in demo - display the latest articles in a newsgroup""")
+ parser.add_argument('-g', '--group', default='gmane.comp.python.general',
+ help='group to fetch messages from (default: %(default)s)')
+ parser.add_argument('-s', '--server', default='news.gmane.org',
+ help='NNTP server hostname (default: %(default)s)')
+ parser.add_argument('-p', '--port', default=NNTP_PORT, type=int,
+ help='NNTP port number (default: %(default)s)')
+ parser.add_argument('-n', '--nb-articles', default=10, type=int,
+ help='number of articles to fetch (default: %(default)s)')
+ args = parser.parse_args()
+
+ s = NNTP(host=args.server, port=args.port)
+ resp, count, first, last, name = s.group(args.group)
print('Group', name, 'has', count, 'articles, range', first, 'to', last)
- resp, subs = s.xhdr('subject', '{0}-{1}'.format(first, last))
- print(resp)
- for item in subs:
- print("%7s %s" % item)
- resp = s.quit()
- print(resp)
+
+ def cut(s, lim):
+ if len(s) > lim:
+ s = s[:lim - 4] + "..."
+ return s
+
+ first = str(int(last) - args.nb_articles + 1)
+ resp, overviews = s.xover(first, last)
+ for artnum, over in overviews:
+ author = decode_header(over['from']).split('<', 1)[0]
+ subject = decode_header(over['subject'])
+ lines = int(over[':lines'])
+ print("{:7} {:20} {:42} ({})".format(
+ artnum, cut(author, 20), cut(subject, 42), lines)
+ )
+
+ s.quit()
diff --git a/Lib/test/test_nntplib.py b/Lib/test/test_nntplib.py
new file mode 100644
index 0000000..512bcd5
--- /dev/null
+++ b/Lib/test/test_nntplib.py
@@ -0,0 +1,1091 @@
+import io
+import datetime
+import textwrap
+import unittest
+import contextlib
+from test import support
+from nntplib import NNTP, GroupInfo
+import nntplib
+
+TIMEOUT = 30
+
+# TODO:
+# - test the `file` arg to more commands
+# - test error conditions
+
+
+class NetworkedNNTPTestsMixin:
+
+ def test_welcome(self):
+ welcome = self.server.getwelcome()
+ self.assertEqual(str, type(welcome))
+
+ def test_help(self):
+ resp, list = self.server.help()
+ self.assertTrue(resp.startswith("100 "), resp)
+ for line in list:
+ self.assertEqual(str, type(line))
+
+ def test_list(self):
+ resp, list = self.server.list()
+ if len(list) > 0:
+ self.assertEqual(GroupInfo, type(list[0]))
+ self.assertEqual(str, type(list[0].group))
+
+ def test_unknown_command(self):
+ with self.assertRaises(nntplib.NNTPPermanentError) as cm:
+ self.server._shortcmd("XYZZY")
+ resp = cm.exception.response
+ self.assertTrue(resp.startswith("500 "), resp)
+
+ def test_newgroups(self):
+ # gmane gets a constant influx of new groups. In order not to stress
+ # the server too much, we choose a recent date in the past.
+ dt = datetime.date.today() - datetime.timedelta(days=7)
+ resp, groups = self.server.newgroups(dt)
+ if len(groups) > 0:
+ self.assertIsInstance(groups[0], GroupInfo)
+ self.assertIsInstance(groups[0].group, str)
+
+ def test_description(self):
+ def _check_desc(desc):
+ # Sanity checks
+ self.assertIsInstance(desc, str)
+ self.assertNotIn(self.GROUP_NAME, desc)
+ desc = self.server.description(self.GROUP_NAME)
+ _check_desc(desc)
+ # Another sanity check
+ self.assertIn("Python", desc)
+ # With a pattern
+ desc = self.server.description(self.GROUP_PAT)
+ _check_desc(desc)
+ # Shouldn't exist
+ desc = self.server.description("zk.brrtt.baz")
+ self.assertEqual(desc, '')
+
+ def test_descriptions(self):
+ resp, descs = self.server.descriptions(self.GROUP_PAT)
+ # 215 for LIST NEWSGROUPS, 282 for XGTITLE
+ self.assertTrue(
+ resp.startswith("215 ") or resp.startswith("282 "), resp)
+ self.assertIsInstance(descs, dict)
+ desc = descs[self.GROUP_NAME]
+ self.assertEqual(desc, self.server.description(self.GROUP_NAME))
+
+ def test_group(self):
+ result = self.server.group(self.GROUP_NAME)
+ self.assertEqual(5, len(result))
+ resp, count, first, last, group = result
+ self.assertEqual(group, self.GROUP_NAME)
+ self.assertIsInstance(count, int)
+ self.assertIsInstance(first, int)
+ self.assertIsInstance(last, int)
+ self.assertLessEqual(first, last)
+ self.assertTrue(resp.startswith("211 "), resp)
+
+ def test_date(self):
+ resp, date = self.server.date()
+ self.assertIsInstance(date, datetime.datetime)
+ # Sanity check
+ self.assertGreaterEqual(date.year, 1995)
+ self.assertLessEqual(date.year, 2030)
+
+ def _check_art_dict(self, art_dict):
+ # Some sanity checks for a field dictionary returned by OVER / XOVER
+ self.assertIsInstance(art_dict, dict)
+ # NNTP has 7 mandatory fields
+ self.assertGreaterEqual(art_dict.keys(),
+ {"subject", "from", "date", "message-id",
+ "references", ":bytes", ":lines"}
+ )
+ for v in art_dict.values():
+ self.assertIsInstance(v, str)
+
+ def test_xover(self):
+ resp, count, first, last, name = self.server.group(self.GROUP_NAME)
+ resp, lines = self.server.xover(last, last)
+ art_num, art_dict = lines[0]
+ self.assertEqual(art_num, last)
+ self._check_art_dict(art_dict)
+
+ def test_over(self):
+ resp, count, first, last, name = self.server.group(self.GROUP_NAME)
+ start = last - 10
+ # The "start-" article range form
+ resp, lines = self.server.over((start, None))
+ art_num, art_dict = lines[0]
+ self._check_art_dict(art_dict)
+ # The "start-end" article range form
+ resp, lines = self.server.over((start, last))
+ art_num, art_dict = lines[-1]
+ self.assertEqual(art_num, last)
+ self._check_art_dict(art_dict)
+ # XXX The "message_id" form is unsupported by gmane
+ # 503 Overview by message-ID unsupported
+
+ def test_xhdr(self):
+ resp, count, first, last, name = self.server.group(self.GROUP_NAME)
+ resp, lines = self.server.xhdr('subject', last)
+ for line in lines:
+ self.assertEqual(str, type(line[1]))
+
+ def check_article_resp(self, resp, article, art_num=None):
+ self.assertIsInstance(article, nntplib.ArticleInfo)
+ if art_num is not None:
+ self.assertEqual(article.number, art_num)
+ for line in article.lines:
+ self.assertIsInstance(line, bytes)
+ # XXX this could exceptionally happen...
+ self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
+
+ def test_article_head_body(self):
+ resp, count, first, last, name = self.server.group(self.GROUP_NAME)
+ resp, head = self.server.head(last)
+ self.assertTrue(resp.startswith("221 "), resp)
+ self.check_article_resp(resp, head, last)
+ resp, body = self.server.body(last)
+ self.assertTrue(resp.startswith("222 "), resp)
+ self.check_article_resp(resp, body, last)
+ resp, article = self.server.article(last)
+ self.assertTrue(resp.startswith("220 "), resp)
+ self.check_article_resp(resp, article, last)
+ self.assertEqual(article.lines, head.lines + [b''] + body.lines)
+
+ def test_quit(self):
+ self.server.quit()
+ self.server = None
+
+
+class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
+ NNTP_HOST = 'news.gmane.org'
+ GROUP_NAME = 'gmane.comp.python.devel'
+ GROUP_PAT = 'gmane.comp.python.d*'
+
+ def setUp(self):
+ support.requires("network")
+ with support.transient_internet(self.NNTP_HOST):
+ self.server = NNTP(self.NNTP_HOST, timeout=TIMEOUT)
+
+ def tearDown(self):
+ if self.server is not None:
+ self.server.quit()
+
+ # Disabled with gmane as it produces too much data
+ test_list = None
+
+ def test_capabilities(self):
+ # As of this writing, gmane implements NNTP version 2 and has a
+ # couple of well-known capabilities. Just sanity check that we
+ # got them.
+ def _check_caps(caps):
+ caps_list = caps['LIST']
+ self.assertIsInstance(caps_list, (list, tuple))
+ self.assertIn('OVERVIEW.FMT', caps_list)
+ self.assertGreaterEqual(self.server.nntp_version, 2)
+ _check_caps(self.server.getcapabilities())
+ # This re-emits the command
+ resp, caps = self.server.capabilities()
+ _check_caps(caps)
+
+
+#
+# Non-networked tests using a local server (or something mocking it).
+#
+
+class _NNTPServerIO(io.RawIOBase):
+ """A raw IO object allowing NNTP commands to be received and processed
+ by a handler. The handler can push responses which can then be read
+ from the IO object."""
+
+ def __init__(self, handler):
+ io.RawIOBase.__init__(self)
+ # The channel from the client
+ self.c2s = io.BytesIO()
+ # The channel to the client
+ self.s2c = io.BytesIO()
+ self.handler = handler
+ self.handler.start(self.c2s.readline, self.push_data)
+
+ def readable(self):
+ return True
+
+ def writable(self):
+ return True
+
+ def push_data(self, data):
+ """Push (buffer) some data to send to the client."""
+ pos = self.s2c.tell()
+ self.s2c.seek(0, 2)
+ self.s2c.write(data)
+ self.s2c.seek(pos)
+
+ def write(self, b):
+ """The client sends us some data"""
+ pos = self.c2s.tell()
+ self.c2s.write(b)
+ self.c2s.seek(pos)
+ self.handler.process_pending()
+ return len(b)
+
+ def readinto(self, buf):
+ """The client wants to read a response"""
+ self.handler.process_pending()
+ b = self.s2c.read(len(buf))
+ n = len(b)
+ buf[:n] = b
+ return n
+
+
+class MockedNNTPTestsMixin:
+ # Override in derived classes
+ handler_class = None
+
+ def setUp(self):
+ super().setUp()
+ self.make_server()
+
+ def tearDown(self):
+ super().tearDown()
+ del self.server
+
+ def make_server(self, *args, **kwargs):
+ self.handler = self.handler_class()
+ self.sio = _NNTPServerIO(self.handler)
+ # Using BufferedRWPair instead of BufferedRandom ensures the file
+ # isn't seekable.
+ file = io.BufferedRWPair(self.sio, self.sio)
+ self.server = nntplib._NNTPBase(file, *args, **kwargs)
+ return self.server
+
+
+class NNTPv1Handler:
+ """A handler for RFC 977"""
+
+ welcome = "200 NNTP mock server"
+
+ def start(self, readline, push_data):
+ self.in_body = False
+ self.allow_posting = True
+ self._readline = readline
+ self._push_data = push_data
+ # Our welcome
+ self.handle_welcome()
+
+ def _decode(self, data):
+ return str(data, "utf-8", "surrogateescape")
+
+ def process_pending(self):
+ if self.in_body:
+ while True:
+ line = self._readline()
+ if not line:
+ return
+ self.body.append(line)
+ if line == b".\r\n":
+ break
+ try:
+ meth, tokens = self.body_callback
+ meth(*tokens, body=self.body)
+ finally:
+ self.body_callback = None
+ self.body = None
+ self.in_body = False
+ while True:
+ line = self._decode(self._readline())
+ if not line:
+ return
+ if not line.endswith("\r\n"):
+ raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
+ line = line[:-2]
+ cmd, *tokens = line.split()
+ #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
+ meth = getattr(self, "handle_" + cmd.upper(), None)
+ if meth is None:
+ self.handle_unknown()
+ else:
+ try:
+ meth(*tokens)
+ except Exception as e:
+ raise ValueError("command failed: {!r}".format(line)) from e
+ else:
+ if self.in_body:
+ self.body_callback = meth, tokens
+ self.body = []
+
+ def expect_body(self):
+ """Flag that the client is expected to post a request body"""
+ self.in_body = True
+
+ def push_data(self, data):
+ """Push some binary data"""
+ self._push_data(data)
+
+ def push_lit(self, lit):
+ """Push a string literal"""
+ lit = textwrap.dedent(lit)
+ lit = "\r\n".join(lit.splitlines()) + "\r\n"
+ lit = lit.encode('utf-8')
+ self.push_data(lit)
+
+ def handle_unknown(self):
+ self.push_lit("500 What?")
+
+ def handle_welcome(self):
+ self.push_lit(self.welcome)
+
+ def handle_QUIT(self):
+ self.push_lit("205 Bye!")
+
+ def handle_DATE(self):
+ self.push_lit("111 20100914001155")
+
+ def handle_GROUP(self, group):
+ if group == "fr.comp.lang.python":
+ self.push_lit("211 486 761 1265 fr.comp.lang.python")
+ else:
+ self.push_lit("411 No such group {}".format(group))
+
+ def handle_HELP(self):
+ self.push_lit("""\
+ 100 Legal commands
+ authinfo user Name|pass Password|generic <prog> <args>
+ date
+ help
+ Report problems to <root@example.org>
+ .""")
+
+ def handle_STAT(self, message_spec=None):
+ if message_spec is None:
+ self.push_lit("412 No newsgroup selected")
+ elif message_spec == "3000234":
+ self.push_lit("223 3000234 <45223423@example.com>")
+ elif message_spec == "<45223423@example.com>":
+ self.push_lit("223 0 <45223423@example.com>")
+ else:
+ self.push_lit("430 No Such Article Found")
+
+ def handle_NEXT(self):
+ self.push_lit("223 3000237 <668929@example.org> retrieved")
+
+ def handle_LAST(self):
+ self.push_lit("223 3000234 <45223423@example.com> retrieved")
+
+ def handle_LIST(self, action=None, param=None):
+ if action is None:
+ self.push_lit("""\
+ 215 Newsgroups in form "group high low flags".
+ comp.lang.python 0000052340 0000002828 y
+ comp.lang.python.announce 0000001153 0000000993 m
+ free.it.comp.lang.python 0000000002 0000000002 y
+ fr.comp.lang.python 0000001254 0000000760 y
+ free.it.comp.lang.python.learner 0000000000 0000000001 y
+ tw.bbs.comp.lang.python 0000000304 0000000304 y
+ .""")
+ elif action == "OVERVIEW.FMT":
+ self.push_lit("""\
+ 215 Order of fields in overview database.
+ Subject:
+ From:
+ Date:
+ Message-ID:
+ References:
+ Bytes:
+ Lines:
+ Xref:full
+ .""")
+ elif action == "NEWSGROUPS":
+ assert param is not None
+ if param == "comp.lang.python":
+ self.push_lit("""\
+ 215 Descriptions in form "group description".
+ comp.lang.python\tThe Python computer language.
+ .""")
+ elif param == "comp.lang.python*":
+ self.push_lit("""\
+ 215 Descriptions in form "group description".
+ comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
+ comp.lang.python\tThe Python computer language.
+ .""")
+ else:
+ self.push_lit("""\
+ 215 Descriptions in form "group description".
+ .""")
+ else:
+ self.push_lit('501 Unknown LIST keyword')
+
+ def handle_NEWNEWS(self, group, date_str, time_str):
+ # We hard code different return messages depending on passed
+ # argument and date syntax.
+ if (group == "comp.lang.python" and date_str == "20100913"
+ and time_str == "082004"):
+ # Date was passed in RFC 3977 format (NNTP "v2")
+ self.push_lit("""\
+ 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
+ <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
+ <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
+ .""")
+ elif (group == "comp.lang.python" and date_str == "100913"
+ and time_str == "082004"):
+ # Date was passed in RFC 977 format (NNTP "v1")
+ self.push_lit("""\
+ 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
+ <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
+ <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
+ .""")
+ else:
+ self.push_lit("""\
+ 230 An empty list of newsarticles follows
+ .""")
+ # (Note for experiments: many servers disable NEWNEWS.
+ # As of this writing, sicinfo3.epfl.ch doesn't.)
+
+ def handle_XOVER(self, message_spec):
+ if message_spec == "57-59":
+ self.push_lit(
+ "224 Overview information for 57-58 follows\n"
+ "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
+ "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
+ "\tSat, 19 Jun 2010 18:04:08 -0400"
+ "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
+ "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
+ "\tXref: news.gmane.org gmane.comp.python.authors:57"
+ "\n"
+ "58\tLooking for a few good bloggers"
+ "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
+ "\tThu, 22 Jul 2010 09:14:14 -0400"
+ "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
+ "\t\t6683\t16"
+ "\tXref: news.gmane.org gmane.comp.python.authors:58"
+ "\n"
+ # An UTF-8 overview line from fr.comp.lang.python
+ "59\tRe: Message d'erreur incompréhensible (par moi)"
+ "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
+ "\tWed, 15 Sep 2010 18:09:15 +0200"
+ "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
+ "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
+ "\tXref: saria.nerim.net fr.comp.lang.python:1265"
+ "\n"
+ ".\n")
+ else:
+ self.push_lit("""\
+ 224 No articles
+ .""")
+
+ def handle_POST(self, *, body=None):
+ if body is None:
+ if self.allow_posting:
+ self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
+ self.expect_body()
+ else:
+ self.push_lit("440 Posting not permitted")
+ else:
+ assert self.allow_posting
+ self.push_lit("240 Article received OK")
+ self.posted_body = body
+
+ def handle_IHAVE(self, message_id, *, body=None):
+ if body is None:
+ if (self.allow_posting and
+ message_id == "<i.am.an.article.you.will.want@example.com>"):
+ self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
+ self.expect_body()
+ else:
+ self.push_lit("435 Article not wanted")
+ else:
+ assert self.allow_posting
+ self.push_lit("235 Article transferred OK")
+ self.posted_body = body
+
+ sample_head = """\
+ From: "Demo User" <nobody@example.net>
+ Subject: I am just a test article
+ Content-Type: text/plain; charset=UTF-8; format=flowed
+ Message-ID: <i.am.an.article.you.will.want@example.com>"""
+
+ sample_body = """\
+ This is just a test article.
+ ..Here is a dot-starting line.
+
+ -- Signed by Andr\xe9."""
+
+ sample_article = sample_head + "\n\n" + sample_body
+
+ def handle_ARTICLE(self, message_spec=None):
+ if message_spec is None:
+ self.push_lit("220 3000237 <45223423@example.com>")
+ elif message_spec == "<45223423@example.com>":
+ self.push_lit("220 0 <45223423@example.com>")
+ elif message_spec == "3000234":
+ self.push_lit("220 3000234 <45223423@example.com>")
+ else:
+ self.push_lit("430 No Such Article Found")
+ return
+ self.push_lit(self.sample_article)
+ self.push_lit(".")
+
+ def handle_HEAD(self, message_spec=None):
+ if message_spec is None:
+ self.push_lit("221 3000237 <45223423@example.com>")
+ elif message_spec == "<45223423@example.com>":
+ self.push_lit("221 0 <45223423@example.com>")
+ elif message_spec == "3000234":
+ self.push_lit("221 3000234 <45223423@example.com>")
+ else:
+ self.push_lit("430 No Such Article Found")
+ return
+ self.push_lit(self.sample_head)
+ self.push_lit(".")
+
+ def handle_BODY(self, message_spec=None):
+ if message_spec is None:
+ self.push_lit("222 3000237 <45223423@example.com>")
+ elif message_spec == "<45223423@example.com>":
+ self.push_lit("222 0 <45223423@example.com>")
+ elif message_spec == "3000234":
+ self.push_lit("222 3000234 <45223423@example.com>")
+ else:
+ self.push_lit("430 No Such Article Found")
+ return
+ self.push_lit(self.sample_body)
+ self.push_lit(".")
+
+
+class NNTPv2Handler(NNTPv1Handler):
+ """A handler for RFC 3977 (NNTP "v2")"""
+
+ def handle_CAPABILITIES(self):
+ self.push_lit("""\
+ 101 Capability list:
+ VERSION 2
+ IMPLEMENTATION INN 2.5.1
+ AUTHINFO USER
+ HDR
+ LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
+ OVER
+ POST
+ READER
+ .""")
+
+ def handle_OVER(self, message_spec=None):
+ return self.handle_XOVER(message_spec)
+
+
+class NNTPv1v2TestsMixin:
+
+ def setUp(self):
+ super().setUp()
+
+ def test_welcome(self):
+ self.assertEqual(self.server.welcome, self.handler.welcome)
+
+ def test_date(self):
+ resp, date = self.server.date()
+ self.assertEqual(resp, "111 20100914001155")
+ self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
+
+ def test_quit(self):
+ self.assertFalse(self.sio.closed)
+ resp = self.server.quit()
+ self.assertEqual(resp, "205 Bye!")
+ self.assertTrue(self.sio.closed)
+
+ def test_help(self):
+ resp, help = self.server.help()
+ self.assertEqual(resp, "100 Legal commands")
+ self.assertEqual(help, [
+ ' authinfo user Name|pass Password|generic <prog> <args>',
+ ' date',
+ ' help',
+ 'Report problems to <root@example.org>',
+ ])
+
+ def test_list(self):
+ resp, groups = self.server.list()
+ self.assertEqual(len(groups), 6)
+ g = groups[1]
+ self.assertEqual(g,
+ GroupInfo("comp.lang.python.announce", "0000001153",
+ "0000000993", "m"))
+
+ def test_stat(self):
+ resp, art_num, message_id = self.server.stat(3000234)
+ self.assertEqual(resp, "223 3000234 <45223423@example.com>")
+ self.assertEqual(art_num, 3000234)
+ self.assertEqual(message_id, "<45223423@example.com>")
+ resp, art_num, message_id = self.server.stat("<45223423@example.com>")
+ self.assertEqual(resp, "223 0 <45223423@example.com>")
+ self.assertEqual(art_num, 0)
+ self.assertEqual(message_id, "<45223423@example.com>")
+ with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
+ self.server.stat("<non.existent.id>")
+ self.assertEqual(cm.exception.response, "430 No Such Article Found")
+ with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
+ self.server.stat()
+ self.assertEqual(cm.exception.response, "412 No newsgroup selected")
+
+ def test_next(self):
+ resp, art_num, message_id = self.server.next()
+ self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
+ self.assertEqual(art_num, 3000237)
+ self.assertEqual(message_id, "<668929@example.org>")
+
+ def test_last(self):
+ resp, art_num, message_id = self.server.last()
+ self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
+ self.assertEqual(art_num, 3000234)
+ self.assertEqual(message_id, "<45223423@example.com>")
+
+ def test_description(self):
+ desc = self.server.description("comp.lang.python")
+ self.assertEqual(desc, "The Python computer language.")
+ desc = self.server.description("comp.lang.pythonx")
+ self.assertEqual(desc, "")
+
+ def test_descriptions(self):
+ resp, groups = self.server.descriptions("comp.lang.python")
+ self.assertEqual(resp, '215 Descriptions in form "group description".')
+ self.assertEqual(groups, {
+ "comp.lang.python": "The Python computer language.",
+ })
+ resp, groups = self.server.descriptions("comp.lang.python*")
+ self.assertEqual(groups, {
+ "comp.lang.python": "The Python computer language.",
+ "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
+ })
+ resp, groups = self.server.descriptions("comp.lang.pythonx")
+ self.assertEqual(groups, {})
+
+ def test_group(self):
+ resp, count, first, last, group = self.server.group("fr.comp.lang.python")
+ self.assertTrue(resp.startswith("211 "), resp)
+ self.assertEqual(first, 761)
+ self.assertEqual(last, 1265)
+ self.assertEqual(count, 486)
+ self.assertEqual(group, "fr.comp.lang.python")
+ with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
+ self.server.group("comp.lang.python.devel")
+ exc = cm.exception
+ self.assertTrue(exc.response.startswith("411 No such group"),
+ exc.response)
+
+ def test_newnews(self):
+ # NEWNEWS comp.lang.python [20]100913 082004
+ dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
+ resp, ids = self.server.newnews("comp.lang.python", dt)
+ expected = (
+ "230 list of newsarticles (NNTP v{0}) "
+ "created after Mon Sep 13 08:20:04 2010 follows"
+ ).format(self.nntp_version)
+ self.assertEqual(resp, expected)
+ self.assertEqual(ids, [
+ "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
+ "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
+ ])
+ # NEWNEWS fr.comp.lang.python [20]100913 082004
+ dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
+ resp, ids = self.server.newnews("fr.comp.lang.python", dt)
+ self.assertEqual(resp, "230 An empty list of newsarticles follows")
+ self.assertEqual(ids, [])
+
+ def _check_article_body(self, lines):
+ self.assertEqual(len(lines), 4)
+ self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.")
+ self.assertEqual(lines[-2], b"")
+ self.assertEqual(lines[-3], b".Here is a dot-starting line.")
+ self.assertEqual(lines[-4], b"This is just a test article.")
+
+ def _check_article_head(self, lines):
+ self.assertEqual(len(lines), 4)
+ self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
+ self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
+
+ def _check_article_data(self, lines):
+ self.assertEqual(len(lines), 9)
+ self._check_article_head(lines[:4])
+ self._check_article_body(lines[-4:])
+ self.assertEqual(lines[4], b"")
+
+ def test_article(self):
+ # ARTICLE
+ resp, info = self.server.article()
+ self.assertEqual(resp, "220 3000237 <45223423@example.com>")
+ art_num, message_id, lines = info
+ self.assertEqual(art_num, 3000237)
+ self.assertEqual(message_id, "<45223423@example.com>")
+ self._check_article_data(lines)
+ # ARTICLE num
+ resp, info = self.server.article(3000234)
+ self.assertEqual(resp, "220 3000234 <45223423@example.com>")
+ art_num, message_id, lines = info
+ self.assertEqual(art_num, 3000234)
+ self.assertEqual(message_id, "<45223423@example.com>")
+ self._check_article_data(lines)
+ # ARTICLE id
+ resp, info = self.server.article("<45223423@example.com>")
+ self.assertEqual(resp, "220 0 <45223423@example.com>")
+ art_num, message_id, lines = info
+ self.assertEqual(art_num, 0)
+ self.assertEqual(message_id, "<45223423@example.com>")
+ self._check_article_data(lines)
+ # Non-existent id
+ with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
+ self.server.article("<non-existent@example.com>")
+ self.assertEqual(cm.exception.response, "430 No Such Article Found")
+
+ def test_article_file(self):
+ # With a "file" argument
+ f = io.BytesIO()
+ resp, info = self.server.article(file=f)
+ self.assertEqual(resp, "220 3000237 <45223423@example.com>")
+ art_num, message_id, lines = info
+ self.assertEqual(art_num, 3000237)
+ self.assertEqual(message_id, "<45223423@example.com>")
+ self.assertEqual(lines, [])
+ data = f.getvalue()
+ self.assertTrue(data.startswith(
+ b'From: "Demo User" <nobody@example.net>\r\n'
+ b'Subject: I am just a test article\r\n'
+ ), ascii(data))
+ self.assertTrue(data.endswith(
+ b'This is just a test article.\r\n'
+ b'.Here is a dot-starting line.\r\n'
+ b'\r\n'
+ b'-- Signed by Andr\xc3\xa9.\r\n'
+ ), ascii(data))
+
+ def test_head(self):
+ # HEAD
+ resp, info = self.server.head()
+ self.assertEqual(resp, "221 3000237 <45223423@example.com>")
+ art_num, message_id, lines = info
+ self.assertEqual(art_num, 3000237)
+ self.assertEqual(message_id, "<45223423@example.com>")
+ self._check_article_head(lines)
+ # HEAD num
+ resp, info = self.server.head(3000234)
+ self.assertEqual(resp, "221 3000234 <45223423@example.com>")
+ art_num, message_id, lines = info
+ self.assertEqual(art_num, 3000234)
+ self.assertEqual(message_id, "<45223423@example.com>")
+ self._check_article_head(lines)
+ # HEAD id
+ resp, info = self.server.head("<45223423@example.com>")
+ self.assertEqual(resp, "221 0 <45223423@example.com>")
+ art_num, message_id, lines = info
+ self.assertEqual(art_num, 0)
+ self.assertEqual(message_id, "<45223423@example.com>")
+ self._check_article_head(lines)
+ # Non-existent id
+ with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
+ self.server.head("<non-existent@example.com>")
+ self.assertEqual(cm.exception.response, "430 No Such Article Found")
+
+ def test_body(self):
+ # BODY
+ resp, info = self.server.body()
+ self.assertEqual(resp, "222 3000237 <45223423@example.com>")
+ art_num, message_id, lines = info
+ self.assertEqual(art_num, 3000237)
+ self.assertEqual(message_id, "<45223423@example.com>")
+ self._check_article_body(lines)
+ # BODY num
+ resp, info = self.server.body(3000234)
+ self.assertEqual(resp, "222 3000234 <45223423@example.com>")
+ art_num, message_id, lines = info
+ self.assertEqual(art_num, 3000234)
+ self.assertEqual(message_id, "<45223423@example.com>")
+ self._check_article_body(lines)
+ # BODY id
+ resp, info = self.server.body("<45223423@example.com>")
+ self.assertEqual(resp, "222 0 <45223423@example.com>")
+ art_num, message_id, lines = info
+ self.assertEqual(art_num, 0)
+ self.assertEqual(message_id, "<45223423@example.com>")
+ self._check_article_body(lines)
+ # Non-existent id
+ with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
+ self.server.body("<non-existent@example.com>")
+ self.assertEqual(cm.exception.response, "430 No Such Article Found")
+
+ def check_over_xover_resp(self, resp, overviews):
+ self.assertTrue(resp.startswith("224 "), resp)
+ self.assertEqual(len(overviews), 3)
+ art_num, over = overviews[0]
+ self.assertEqual(art_num, 57)
+ self.assertEqual(over, {
+ "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
+ "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
+ "date": "Sat, 19 Jun 2010 18:04:08 -0400",
+ "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
+ "references": "<hvalf7$ort$1@dough.gmane.org>",
+ ":bytes": "7103",
+ ":lines": "16",
+ "xref": "news.gmane.org gmane.comp.python.authors:57"
+ })
+ art_num, over = overviews[2]
+ self.assertEqual(over["subject"],
+ "Re: Message d'erreur incompréhensible (par moi)")
+
+ def test_xover(self):
+ resp, overviews = self.server.xover(57, 59)
+ self.check_over_xover_resp(resp, overviews)
+
+ def test_over(self):
+ # In NNTP "v1", this will fallback on XOVER
+ resp, overviews = self.server.over((57, 59))
+ self.check_over_xover_resp(resp, overviews)
+
+ sample_post = (
+ b'From: "Demo User" <nobody@example.net>\r\n'
+ b'Subject: I am just a test article\r\n'
+ b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
+ b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
+ b'\r\n'
+ b'This is just a test article.\r\n'
+ b'.Here is a dot-starting line.\r\n'
+ b'\r\n'
+ b'-- Signed by Andr\xc3\xa9.\r\n'
+ )
+
+ def _check_posted_body(self):
+ # Check the raw body as received by the server
+ lines = self.handler.posted_body
+ # One additional line for the "." terminator
+ self.assertEqual(len(lines), 10)
+ self.assertEqual(lines[-1], b'.\r\n')
+ self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
+ self.assertEqual(lines[-3], b'\r\n')
+ self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
+ self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
+
+ def _check_post_ihave_sub(self, func, *args, file_factory):
+ # First the prepared post with CRLF endings
+ post = self.sample_post
+ func_args = args + (file_factory(post),)
+ self.handler.posted_body = None
+ resp = func(*func_args)
+ self._check_posted_body()
+ # Then the same post with "normal" line endings - they should be
+ # converted by NNTP.post and NNTP.ihave.
+ post = self.sample_post.replace(b"\r\n", b"\n")
+ func_args = args + (file_factory(post),)
+ self.handler.posted_body = None
+ resp = func(*func_args)
+ self._check_posted_body()
+ return resp
+
+ def check_post_ihave(self, func, success_resp, *args):
+ # With a bytes object
+ resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
+ self.assertEqual(resp, success_resp)
+ # With a bytearray object
+ resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
+ self.assertEqual(resp, success_resp)
+ # With a file object
+ resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
+ self.assertEqual(resp, success_resp)
+ # With an iterable of terminated lines
+ def iterlines(b):
+ return iter(b.splitlines(True))
+ resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
+ self.assertEqual(resp, success_resp)
+ # With an iterable of non-terminated lines
+ def iterlines(b):
+ return iter(b.splitlines(False))
+ resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
+ self.assertEqual(resp, success_resp)
+
+ def test_post(self):
+ self.check_post_ihave(self.server.post, "240 Article received OK")
+ self.handler.allow_posting = False
+ with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
+ self.server.post(self.sample_post)
+ self.assertEqual(cm.exception.response,
+ "440 Posting not permitted")
+
+ def test_ihave(self):
+ self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
+ "<i.am.an.article.you.will.want@example.com>")
+ with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
+ self.server.ihave("<another.message.id>", self.sample_post)
+ self.assertEqual(cm.exception.response,
+ "435 Article not wanted")
+
+
+class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
+ """Tests an NNTP v1 server (no capabilities)."""
+
+ nntp_version = 1
+ handler_class = NNTPv1Handler
+
+ def test_caps(self):
+ caps = self.server.getcapabilities()
+ self.assertEqual(caps, {})
+ self.assertEqual(self.server.nntp_version, 1)
+
+
+class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
+ """Tests an NNTP v2 server (with capabilities)."""
+
+ nntp_version = 2
+ handler_class = NNTPv2Handler
+
+ def test_caps(self):
+ caps = self.server.getcapabilities()
+ self.assertEqual(caps, {
+ 'VERSION': ['2'],
+ 'IMPLEMENTATION': ['INN', '2.5.1'],
+ 'AUTHINFO': ['USER'],
+ 'HDR': [],
+ 'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
+ 'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
+ 'OVER': [],
+ 'POST': [],
+ 'READER': [],
+ })
+ self.assertEqual(self.server.nntp_version, 2)
+
+
+class MiscTests(unittest.TestCase):
+
+ def test_decode_header(self):
+ def gives(a, b):
+ self.assertEqual(nntplib.decode_header(a), b)
+ gives("" , "")
+ gives("a plain header", "a plain header")
+ gives(" with extra spaces ", " with extra spaces ")
+ gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
+ gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
+ " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
+ "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
+ gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
+ "Re: problème de matrice")
+ # A natively utf-8 header (found in the real world!)
+ gives("Re: Message d'erreur incompréhensible (par moi)",
+ "Re: Message d'erreur incompréhensible (par moi)")
+
+ def test_parse_overview_fmt(self):
+ # The minimal (default) response
+ lines = ["Subject:", "From:", "Date:", "Message-ID:",
+ "References:", ":bytes", ":lines"]
+ self.assertEqual(nntplib._parse_overview_fmt(lines),
+ ["subject", "from", "date", "message-id", "references",
+ ":bytes", ":lines"])
+ # The minimal response using alternative names
+ lines = ["Subject:", "From:", "Date:", "Message-ID:",
+ "References:", "Bytes:", "Lines:"]
+ self.assertEqual(nntplib._parse_overview_fmt(lines),
+ ["subject", "from", "date", "message-id", "references",
+ ":bytes", ":lines"])
+ # Variations in casing
+ lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
+ "References:", "BYTES:", "Lines:"]
+ self.assertEqual(nntplib._parse_overview_fmt(lines),
+ ["subject", "from", "date", "message-id", "references",
+ ":bytes", ":lines"])
+ # First example from RFC 3977
+ lines = ["Subject:", "From:", "Date:", "Message-ID:",
+ "References:", ":bytes", ":lines", "Xref:full",
+ "Distribution:full"]
+ self.assertEqual(nntplib._parse_overview_fmt(lines),
+ ["subject", "from", "date", "message-id", "references",
+ ":bytes", ":lines", "xref", "distribution"])
+ # Second example from RFC 3977
+ lines = ["Subject:", "From:", "Date:", "Message-ID:",
+ "References:", "Bytes:", "Lines:", "Xref:FULL",
+ "Distribution:FULL"]
+ self.assertEqual(nntplib._parse_overview_fmt(lines),
+ ["subject", "from", "date", "message-id", "references",
+ ":bytes", ":lines", "xref", "distribution"])
+ # A classic response from INN
+ lines = ["Subject:", "From:", "Date:", "Message-ID:",
+ "References:", "Bytes:", "Lines:", "Xref:full"]
+ self.assertEqual(nntplib._parse_overview_fmt(lines),
+ ["subject", "from", "date", "message-id", "references",
+ ":bytes", ":lines", "xref"])
+
+ def test_parse_overview(self):
+ fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
+ # First example from RFC 3977
+ lines = [
+ '3000234\tI am just a test article\t"Demo User" '
+ '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
+ '<45223423@example.com>\t<45454@example.net>\t1234\t'
+ '17\tXref: news.example.com misc.test:3000363',
+ ]
+ overview = nntplib._parse_overview(lines, fmt)
+ (art_num, fields), = overview
+ self.assertEqual(art_num, 3000234)
+ self.assertEqual(fields, {
+ 'subject': 'I am just a test article',
+ 'from': '"Demo User" <nobody@example.com>',
+ 'date': '6 Oct 1998 04:38:40 -0500',
+ 'message-id': '<45223423@example.com>',
+ 'references': '<45454@example.net>',
+ ':bytes': '1234',
+ ':lines': '17',
+ 'xref': 'news.example.com misc.test:3000363',
+ })
+
+ def test_parse_datetime(self):
+ def gives(a, b, *c):
+ self.assertEqual(nntplib._parse_datetime(a, b),
+ datetime.datetime(*c))
+ # Output of DATE command
+ gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
+ # Variations
+ gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
+ gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
+ gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
+
+ def test_unparse_datetime(self):
+ # Test non-legacy mode
+ # 1) with a datetime
+ def gives(y, M, d, h, m, s, date_str, time_str):
+ dt = datetime.datetime(y, M, d, h, m, s)
+ self.assertEqual(nntplib._unparse_datetime(dt),
+ (date_str, time_str))
+ self.assertEqual(nntplib._unparse_datetime(dt, False),
+ (date_str, time_str))
+ gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
+ gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
+ gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
+ # 2) with a date
+ def gives(y, M, d, date_str, time_str):
+ dt = datetime.date(y, M, d)
+ self.assertEqual(nntplib._unparse_datetime(dt),
+ (date_str, time_str))
+ self.assertEqual(nntplib._unparse_datetime(dt, False),
+ (date_str, time_str))
+ gives(1999, 6, 23, "19990623", "000000")
+ gives(2000, 6, 23, "20000623", "000000")
+ gives(2010, 6, 5, "20100605", "000000")
+
+ def test_unparse_datetime_legacy(self):
+ # Test legacy mode (RFC 977)
+ # 1) with a datetime
+ def gives(y, M, d, h, m, s, date_str, time_str):
+ dt = datetime.datetime(y, M, d, h, m, s)
+ self.assertEqual(nntplib._unparse_datetime(dt, True),
+ (date_str, time_str))
+ gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
+ gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
+ gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
+ # 2) with a date
+ def gives(y, M, d, date_str, time_str):
+ dt = datetime.date(y, M, d)
+ self.assertEqual(nntplib._unparse_datetime(dt, True),
+ (date_str, time_str))
+ gives(1999, 6, 23, "990623", "000000")
+ gives(2000, 6, 23, "000623", "000000")
+ gives(2010, 6, 5, "100605", "000000")
+
+
+def test_main():
+ support.run_unittest(MiscTests, NNTPv1Tests, NNTPv2Tests,
+ NetworkedNNTPTests
+ )
+
+
+if __name__ == "__main__":
+ test_main()