summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/nntplib.py259
-rw-r--r--Lib/test/test_nntplib.py100
2 files changed, 259 insertions, 100 deletions
diff --git a/Lib/nntplib.py b/Lib/nntplib.py
index a09c065..79611b3 100644
--- a/Lib/nntplib.py
+++ b/Lib/nntplib.py
@@ -69,6 +69,13 @@ import collections
import datetime
import warnings
+try:
+ import ssl
+except ImportError:
+ _have_ssl = False
+else:
+ _have_ssl = True
+
from email.header import decode_header as _email_decode_header
from socket import _GLOBAL_DEFAULT_TIMEOUT
@@ -111,7 +118,7 @@ class NNTPDataError(NNTPError):
# Standard port used by NNTP servers
NNTP_PORT = 119
-
+NNTP_SSL_PORT = 563
# Response numbers that are followed by additional text (e.g. article)
_LONGRESP = {
@@ -263,6 +270,23 @@ def _unparse_datetime(dt, legacy=False):
return date_str, time_str
+if _have_ssl:
+
+ def _encrypt_on(sock, context):
+ """Wrap a socket in SSL/TLS. Arguments:
+ - sock: Socket to wrap
+ - context: SSL context to use for the encrypted connection
+ Returns:
+ - sock: New, encrypted socket.
+ """
+ # Generate a default SSL context if none was passed.
+ if context is None:
+ context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ # SSLv2 considered harmful.
+ context.options |= ssl.OP_NO_SSLv2
+ return context.wrap_socket(sock)
+
+
# The classes themselves
class _NNTPBase:
# UTF-8 is the character set for all NNTP commands and responses: they
@@ -280,18 +304,13 @@ class _NNTPBase:
encoding = 'utf-8'
errors = 'surrogateescape'
- def __init__(self, file, host, user=None, password=None,
- readermode=None, usenetrc=True,
- timeout=_GLOBAL_DEFAULT_TIMEOUT):
+ def __init__(self, file, host,
+ readermode=None, timeout=_GLOBAL_DEFAULT_TIMEOUT):
"""Initialize an instance. Arguments:
- file: file-like object (open for read/write in binary mode)
- host: hostname of the server (used if `usenetrc` is True)
- - user: username to authenticate with
- - password: password to use with username
- readermode: if true, send 'mode reader' command after
connecting.
- - usenetrc: allow loading username and password from ~/.netrc file
- if not specified explicitly
- timeout: timeout (in seconds) used for socket connections
readermode is sometimes necessary if you are connecting to an
@@ -300,74 +319,32 @@ class _NNTPBase:
unexpected NNTPPermanentErrors, you might need to set
readermode.
"""
+ self.host = host
self.file = file
self.debugging = 0
self.welcome = self._getresp()
- # 'mode reader' is sometimes necessary to enable 'reader' mode.
- # However, the order in which 'mode reader' and 'authinfo' need to
- # arrive differs between some NNTP servers. Try to send
- # 'mode reader', and if it fails with an authorization failed
- # error, try again after sending authinfo.
- readermode_afterauth = 0
+ # 'MODE READER' is sometimes necessary to enable 'reader' mode.
+ # However, the order in which 'MODE READER' and 'AUTHINFO' need to
+ # arrive differs between some NNTP servers. If _setreadermode() fails
+ # with an authorization failed error, it will set this to True;
+ # the login() routine will interpret that as a request to try again
+ # after performing its normal function.
+ self.readermode_afterauth = False
if readermode:
- try:
- self.welcome = self._shortcmd('mode reader')
- except NNTPPermanentError:
- # error 500, probably 'not implemented'
- pass
- except NNTPTemporaryError as e:
- if user and e.response.startswith('480'):
- # Need authorization before 'mode reader'
- readermode_afterauth = 1
- else:
- raise
- # If no login/password was specified, try to get them from ~/.netrc
- # Presume that if .netc has an entry, NNRP authentication is required.
- try:
- if usenetrc and not user:
- import netrc
- credentials = netrc.netrc()
- auth = credentials.authenticators(host)
- if auth:
- user = auth[0]
- password = auth[2]
- except IOError:
- pass
- # Perform NNTP authentication if needed.
- if user:
- resp = self._shortcmd('authinfo user '+user)
- if resp.startswith('381'):
- if not password:
- raise NNTPReplyError(resp)
- else:
- resp = self._shortcmd(
- 'authinfo pass '+password)
- if not resp.startswith('281'):
- raise NNTPPermanentError(resp)
- if readermode_afterauth:
- try:
- self.welcome = self._shortcmd('mode reader')
- except NNTPPermanentError:
- # error 500, probably 'not implemented'
- pass
-
- # Inquire about capabilities (RFC 3977)
- self.nntp_version = 1
- self.nntp_implementation = None
- try:
- resp, caps = self.capabilities()
- except NNTPPermanentError:
- # Server doesn't support capabilities
- self._caps = {}
- else:
- self._caps = caps
- if 'VERSION' in caps:
- # The server can advertise several supported versions,
- # choose the highest.
- self.nntp_version = max(map(int, caps['VERSION']))
- if 'IMPLEMENTATION' in caps:
- self.nntp_implementation = ' '.join(caps['IMPLEMENTATION'])
+ self._setreadermode()
+
+ # RFC 4642 2.2.2: Both the client and the server MUST know if there is
+ # a TLS session active. A client MUST NOT attempt to start a TLS
+ # session if a TLS session is already active.
+ self.tls_on = False
+
+ # Inquire about capabilities (RFC 3977).
+ self._caps = None
+ self.getcapabilities()
+
+ # Log in and encryption setup order is left to subclasses.
+ self.authenticated = False
def getwelcome(self):
"""Get the welcome message from the server
@@ -382,6 +359,22 @@ class _NNTPBase:
"""Get the server capabilities, as read by __init__().
If the CAPABILITIES command is not supported, an empty dict is
returned."""
+ if self._caps is None:
+ self.nntp_version = 1
+ self.nntp_implementation = None
+ try:
+ resp, caps = self.capabilities()
+ except NNTPPermanentError:
+ # Server doesn't support capabilities
+ self._caps = {}
+ else:
+ self._caps = caps
+ if 'VERSION' in caps:
+ # The server can advertise several supported versions,
+ # choose the highest.
+ self.nntp_version = max(map(int, caps['VERSION']))
+ if 'IMPLEMENTATION' in caps:
+ self.nntp_implementation = ' '.join(caps['IMPLEMENTATION'])
return self._caps
def set_debuglevel(self, level):
@@ -918,6 +911,77 @@ class _NNTPBase:
self._close()
return resp
+ def login(self, user=None, password=None, usenetrc=True):
+ if self.authenticated:
+ raise ValueError("Already logged in.")
+ if not user and not usenetrc:
+ raise ValueError(
+ "At least one of `user` and `usenetrc` must be specified")
+ # If no login/password was specified but netrc was requested,
+ # try to get them from ~/.netrc
+ # Presume that if .netrc has an entry, NNRP authentication is required.
+ try:
+ if usenetrc and not user:
+ import netrc
+ credentials = netrc.netrc()
+ auth = credentials.authenticators(self.host)
+ if auth:
+ user = auth[0]
+ password = auth[2]
+ except IOError:
+ pass
+ # Perform NNTP authentication if needed.
+ if not user:
+ return
+ resp = self._shortcmd('authinfo user ' + user)
+ if resp.startswith('381'):
+ if not password:
+ raise NNTPReplyError(resp)
+ else:
+ resp = self._shortcmd('authinfo pass ' + password)
+ if not resp.startswith('281'):
+ raise NNTPPermanentError(resp)
+ # Attempt to send mode reader if it was requested after login.
+ if self.readermode_afterauth:
+ self._setreadermode()
+
+ def _setreadermode(self):
+ try:
+ self.welcome = self._shortcmd('mode reader')
+ except NNTPPermanentError:
+ # Error 5xx, probably 'not implemented'
+ pass
+ except NNTPTemporaryError as e:
+ if e.response.startswith('480'):
+ # Need authorization before 'mode reader'
+ self.readermode_afterauth = True
+ else:
+ raise
+
+ if _have_ssl:
+ def starttls(self, context=None):
+ """Process a STARTTLS command. Arguments:
+ - context: SSL context to use for the encrypted connection
+ """
+ # Per RFC 4642, STARTTLS MUST NOT be sent after authentication or if
+ # a TLS session already exists.
+ if self.tls_on:
+ raise ValueError("TLS is already enabled.")
+ if self.authenticated:
+ raise ValueError("TLS cannot be started after authentication.")
+ resp = self._shortcmd('STARTTLS')
+ if resp.startswith('382'):
+ self.file.close()
+ self.sock = _encrypt_on(self.sock, context)
+ self.file = self.sock.makefile("rwb")
+ self.tls_on = True
+ # Capabilities may change after TLS starts up, so ask for them
+ # again.
+ self._caps = None
+ self.getcapabilities()
+ else:
+ raise NNTPError("TLS failed to start.")
+
class NNTP(_NNTPBase):
@@ -945,8 +1009,10 @@ class NNTP(_NNTPBase):
self.port = port
self.sock = socket.create_connection((host, port), timeout)
file = self.sock.makefile("rwb")
- _NNTPBase.__init__(self, file, host, user, password,
- readermode, usenetrc, timeout)
+ _NNTPBase.__init__(self, file, host,
+ readermode, timeout)
+ if user or usenetrc:
+ self.login(user, password, usenetrc)
def _close(self):
try:
@@ -955,6 +1021,33 @@ class NNTP(_NNTPBase):
self.sock.close()
+if _have_ssl:
+ class NNTP_SSL(_NNTPBase):
+
+ def __init__(self, host, port=NNTP_SSL_PORT,
+ user=None, password=None, ssl_context=None,
+ readermode=None, usenetrc=True,
+ timeout=_GLOBAL_DEFAULT_TIMEOUT):
+ """This works identically to NNTP.__init__, except for the change
+ in default port and the `ssl_context` argument for SSL connections.
+ """
+ self.sock = socket.create_connection((host, port), timeout)
+ self.sock = _encrypt_on(self.sock, ssl_context)
+ file = self.sock.makefile("rwb")
+ _NNTPBase.__init__(self, file, host,
+ readermode=readermode, timeout=timeout)
+ if user or usenetrc:
+ self.login(user, password, usenetrc)
+
+ def _close(self):
+ try:
+ _NNTPBase._close(self)
+ finally:
+ self.sock.close()
+
+ __all__.append("NNTP_SSL")
+
+
# Test retrieval when run as a script.
if __name__ == '__main__':
import argparse
@@ -966,13 +1059,27 @@ if __name__ == '__main__':
help='group to fetch messages from (default: %(default)s)')
parser.add_argument('-s', '--server', default='news.gmane.org',
help='NNTP server hostname (default: %(default)s)')
- parser.add_argument('-p', '--port', default=NNTP_PORT, type=int,
- help='NNTP port number (default: %(default)s)')
+ parser.add_argument('-p', '--port', default=-1, type=int,
+ help='NNTP port number (default: %s / %s)' % (NNTP_PORT, NNTP_SSL_PORT))
parser.add_argument('-n', '--nb-articles', default=10, type=int,
help='number of articles to fetch (default: %(default)s)')
+ parser.add_argument('-S', '--ssl', action='store_true', default=False,
+ help='use NNTP over SSL')
args = parser.parse_args()
- s = NNTP(host=args.server, port=args.port)
+ port = args.port
+ if not args.ssl:
+ if port == -1:
+ port = NNTP_PORT
+ s = NNTP(host=args.server, port=port)
+ else:
+ if port == -1:
+ port = NNTP_SSL_PORT
+ s = NNTP_SSL(host=args.server, port=port)
+
+ caps = s.getcapabilities()
+ if 'STARTTLS' in caps:
+ s.starttls()
resp, count, first, last, name = s.group(args.group)
print('Group', name, 'has', count, 'articles, range', first, 'to', last)
diff --git a/Lib/test/test_nntplib.py b/Lib/test/test_nntplib.py
index 6380084..6aa0cb5 100644
--- a/Lib/test/test_nntplib.py
+++ b/Lib/test/test_nntplib.py
@@ -4,8 +4,10 @@ import textwrap
import unittest
import contextlib
from test import support
-from nntplib import NNTP, GroupInfo
+from nntplib import NNTP, GroupInfo, _have_ssl
import nntplib
+if _have_ssl:
+ import ssl
TIMEOUT = 30
@@ -106,7 +108,7 @@ class NetworkedNNTPTestsMixin:
"references", ":bytes", ":lines"}
)
for v in art_dict.values():
- self.assertIsInstance(v, str)
+ self.assertIsInstance(v, (str, type(None)))
def test_xover(self):
resp, count, first, last, name = self.server.group(self.GROUP_NAME)
@@ -162,26 +164,19 @@ class NetworkedNNTPTestsMixin:
self.server.quit()
self.server = None
-
-class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
- NNTP_HOST = 'news.gmane.org'
- GROUP_NAME = 'gmane.comp.python.devel'
- GROUP_PAT = 'gmane.comp.python.d*'
-
- def setUp(self):
- support.requires("network")
- with support.transient_internet(self.NNTP_HOST):
- self.server = NNTP(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)
-
- def tearDown(self):
- if self.server is not None:
- self.server.quit()
-
- # Disabled with gmane as it produces too much data
- test_list = None
+ def test_login(self):
+ baduser = "notarealuser"
+ badpw = "notarealpassword"
+ # Check that bogus credentials cause failure
+ self.assertRaises(nntplib.NNTPError, self.server.login,
+ user=baduser, password=badpw, usenetrc=False)
+ # FIXME: We should check that correct credentials succeed, but that
+ # would require valid details for some server somewhere to be in the
+ # test suite, I think. Gmane is anonymous, at least as used for the
+ # other tests.
def test_capabilities(self):
- # As of this writing, gmane implements NNTP version 2 and has a
+ # The server under test implements NNTP version 2 and has a
# couple of well-known capabilities. Just sanity check that we
# got them.
def _check_caps(caps):
@@ -194,6 +189,63 @@ class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
resp, caps = self.server.capabilities()
_check_caps(caps)
+ if _have_ssl:
+ def test_starttls(self):
+ file = self.server.file
+ sock = self.server.sock
+ try:
+ self.server.starttls()
+ except nntplib.NNTPPermanentError:
+ self.skipTest("STARTTLS not supported by server.")
+ else:
+ # Check that the socket and internal pseudo-file really were
+ # changed.
+ self.assertNotEqual(file, self.server.file)
+ self.assertNotEqual(sock, self.server.sock)
+ # Check that the new socket really is an SSL one
+ self.assertIsInstance(self.server.sock, ssl.SSLSocket)
+ # Check that trying starttls when it's already active fails.
+ self.assertRaises(ValueError, self.server.starttls)
+
+
+class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
+ # This server supports STARTTLS (gmane doesn't)
+ NNTP_HOST = 'news.trigofacile.com'
+ GROUP_NAME = 'fr.comp.lang.python'
+ GROUP_PAT = 'fr.comp.lang.*'
+
+ def setUp(self):
+ support.requires("network")
+ with support.transient_internet(self.NNTP_HOST):
+ self.server = NNTP(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)
+
+ def tearDown(self):
+ if self.server is not None:
+ self.server.quit()
+
+
+if _have_ssl:
+ class NetworkedNNTP_SSLTests(NetworkedNNTPTestsMixin, unittest.TestCase):
+ NNTP_HOST = 'snews.gmane.org'
+ GROUP_NAME = 'gmane.comp.python.devel'
+ GROUP_PAT = 'gmane.comp.python.d*'
+
+ def setUp(self):
+ support.requires("network")
+ with support.transient_internet(self.NNTP_HOST):
+ self.server = nntplib.NNTP_SSL(self.NNTP_HOST, timeout=TIMEOUT,
+ usenetrc=False)
+
+ def tearDown(self):
+ if self.server is not None:
+ self.server.quit()
+
+ # Disabled with gmane as it produces too much data
+ test_list = None
+
+ # Disabled as the connection will already be encrypted.
+ test_starttls = None
+
#
# Non-networked tests using a local server (or something mocking it).
@@ -261,7 +313,6 @@ class MockedNNTPTestsMixin:
# Using BufferedRWPair instead of BufferedRandom ensures the file
# isn't seekable.
file = io.BufferedRWPair(self.sio, self.sio)
- kwargs.setdefault('usenetrc', False)
self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
return self.server
@@ -1134,9 +1185,10 @@ class MiscTests(unittest.TestCase):
def test_main():
- support.run_unittest(MiscTests, NNTPv1Tests, NNTPv2Tests,
- NetworkedNNTPTests
- )
+ tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, NetworkedNNTPTests]
+ if _have_ssl:
+ tests.append(NetworkedNNTP_SSLTests)
+ support.run_unittest(*tests)
if __name__ == "__main__":