summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorKristjan Valur Jonsson <sweskman@gmail.com>2011-03-30 11:39:24 (GMT)
committerKristjan Valur Jonsson <sweskman@gmail.com>2011-03-30 11:39:24 (GMT)
commit3c136e19b9de13f8d4163a0feb3af54ab8b8c765 (patch)
tree8a0c2faa0a95fc1ddaf994f9b8cc6ecd88b547d9 /Lib
parent978da33c7a07bf133d144a7ad342de7e20777250 (diff)
parent010a94848943b543dd54661e7e3857f19aabd741 (diff)
downloadcpython-3c136e19b9de13f8d4163a0feb3af54ab8b8c765.zip
cpython-3c136e19b9de13f8d4163a0feb3af54ab8b8c765.tar.gz
cpython-3c136e19b9de13f8d4163a0feb3af54ab8b8c765.tar.bz2
Merge
Diffstat (limited to 'Lib')
-rw-r--r--Lib/email/parser.py24
-rw-r--r--Lib/logging/__init__.py8
-rw-r--r--Lib/test/test_logging.py537
-rw-r--r--Lib/test/test_urllib.py16
-rw-r--r--Lib/test/test_urllib2.py24
-rw-r--r--Lib/urllib/request.py27
6 files changed, 587 insertions, 49 deletions
diff --git a/Lib/email/parser.py b/Lib/email/parser.py
index 6caaff5..ef051fa 100644
--- a/Lib/email/parser.py
+++ b/Lib/email/parser.py
@@ -15,7 +15,7 @@ from email.message import Message
class Parser:
- def __init__(self, *args, **kws):
+ def __init__(self, _class=Message):
"""Parser of RFC 2822 and MIME email messages.
Creates an in-memory object tree representing the email message, which
@@ -31,27 +31,7 @@ class Parser:
must be created. This class must have a constructor that can take
zero arguments. Default is Message.Message.
"""
- if len(args) >= 1:
- if '_class' in kws:
- raise TypeError("Multiple values for keyword arg '_class'")
- kws['_class'] = args[0]
- if len(args) == 2:
- if 'strict' in kws:
- raise TypeError("Multiple values for keyword arg 'strict'")
- kws['strict'] = args[1]
- if len(args) > 2:
- raise TypeError('Too many arguments')
- if '_class' in kws:
- self._class = kws['_class']
- del kws['_class']
- else:
- self._class = Message
- if 'strict' in kws:
- warnings.warn("'strict' argument is deprecated (and ignored)",
- DeprecationWarning, 2)
- del kws['strict']
- if kws:
- raise TypeError('Unexpected keyword arguments')
+ self._class = _class
def parse(self, fp, headersonly=False):
"""Create a message structure from the data in a file.
diff --git a/Lib/logging/__init__.py b/Lib/logging/__init__.py
index e4b34a1..7757a82 100644
--- a/Lib/logging/__init__.py
+++ b/Lib/logging/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2001-2010 by Vinay Sajip. All Rights Reserved.
+# Copyright 2001-2011 by Vinay Sajip. All Rights Reserved.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee is hereby granted,
@@ -18,7 +18,7 @@
Logging package for Python. Based on PEP 282 and comments thereto in
comp.lang.python, and influenced by Apache's log4j system.
-Copyright (C) 2001-2010 Vinay Sajip. All Rights Reserved.
+Copyright (C) 2001-2011 Vinay Sajip. All Rights Reserved.
To use, simply 'import logging' and log away!
"""
@@ -1826,10 +1826,10 @@ class NullHandler(Handler):
package.
"""
def handle(self, record):
- pass
+ """Stub."""
def emit(self, record):
- pass
+ """Stub."""
def createLock(self):
self.lock = None
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index b62545c..bfb2404 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -40,7 +40,7 @@ from socketserver import ThreadingTCPServer, StreamRequestHandler
import struct
import sys
import tempfile
-from test.support import captured_stdout, run_with_locale, run_unittest
+from test.support import captured_stdout, run_with_locale, run_unittest, patch
import textwrap
import unittest
import warnings
@@ -1082,28 +1082,39 @@ class WarningsTest(BaseTest):
def test_warnings(self):
with warnings.catch_warnings():
logging.captureWarnings(True)
- try:
- warnings.filterwarnings("always", category=UserWarning)
- file = io.StringIO()
- h = logging.StreamHandler(file)
- logger = logging.getLogger("py.warnings")
- logger.addHandler(h)
- warnings.warn("I'm warning you...")
- logger.removeHandler(h)
- s = file.getvalue()
- h.close()
- self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)
-
- #See if an explicit file uses the original implementation
- file = io.StringIO()
- warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
- file, "Dummy line")
- s = file.getvalue()
- file.close()
- self.assertEqual(s,
- "dummy.py:42: UserWarning: Explicit\n Dummy line\n")
- finally:
- logging.captureWarnings(False)
+ self.addCleanup(lambda: logging.captureWarnings(False))
+ warnings.filterwarnings("always", category=UserWarning)
+ stream = io.StringIO()
+ h = logging.StreamHandler(stream)
+ logger = logging.getLogger("py.warnings")
+ logger.addHandler(h)
+ warnings.warn("I'm warning you...")
+ logger.removeHandler(h)
+ s = stream.getvalue()
+ h.close()
+ self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)
+
+ #See if an explicit file uses the original implementation
+ a_file = io.StringIO()
+ warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
+ a_file, "Dummy line")
+ s = a_file.getvalue()
+ a_file.close()
+ self.assertEqual(s,
+ "dummy.py:42: UserWarning: Explicit\n Dummy line\n")
+
+ def test_warnings_no_handlers(self):
+ with warnings.catch_warnings():
+ logging.captureWarnings(True)
+ self.addCleanup(lambda: logging.captureWarnings(False))
+
+ # confirm our assumption: no loggers are set
+ logger = logging.getLogger("py.warnings")
+ assert logger.handlers == []
+
+ warnings.showwarning("Explicit", UserWarning, "dummy.py", 42)
+ self.assertTrue(len(logger.handlers) == 1)
+ self.assertIsInstance(logger.handlers[0], logging.NullHandler)
def formatFunc(format, datefmt=None):
@@ -2007,6 +2018,11 @@ class ManagerTest(BaseTest):
self.assertEqual(logged, ['should appear in logged'])
+ def test_set_log_record_factory(self):
+ man = logging.Manager(None)
+ expected = object()
+ man.setLogRecordFactory(expected)
+ self.assertEqual(man.logRecordFactory, expected)
class ChildLoggerTest(BaseTest):
def test_child_loggers(self):
@@ -2198,6 +2214,479 @@ class LastResortTest(BaseTest):
logging.raiseExceptions = old_raise_exceptions
+class FakeHandler:
+
+ def __init__(self, identifier, called):
+ for method in ('acquire', 'flush', 'close', 'release'):
+ setattr(self, method, self.record_call(identifier, method, called))
+
+ def record_call(self, identifier, method_name, called):
+ def inner():
+ called.append('{} - {}'.format(identifier, method_name))
+ return inner
+
+
+class RecordingHandler(logging.NullHandler):
+
+ def __init__(self, *args, **kwargs):
+ super(RecordingHandler, self).__init__(*args, **kwargs)
+ self.records = []
+
+ def handle(self, record):
+ """Keep track of all the emitted records."""
+ self.records.append(record)
+
+
+class ShutdownTest(BaseTest):
+
+ """Tets suite for the shutdown method."""
+
+ def setUp(self):
+ super(ShutdownTest, self).setUp()
+ self.called = []
+
+ raise_exceptions = logging.raiseExceptions
+ self.addCleanup(lambda: setattr(logging, 'raiseExceptions', raise_exceptions))
+
+ def raise_error(self, error):
+ def inner():
+ raise error()
+ return inner
+
+ def test_no_failure(self):
+ # create some fake handlers
+ handler0 = FakeHandler(0, self.called)
+ handler1 = FakeHandler(1, self.called)
+ handler2 = FakeHandler(2, self.called)
+
+ # create live weakref to those handlers
+ handlers = map(logging.weakref.ref, [handler0, handler1, handler2])
+
+ logging.shutdown(handlerList=list(handlers))
+
+ expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release',
+ '1 - acquire', '1 - flush', '1 - close', '1 - release',
+ '0 - acquire', '0 - flush', '0 - close', '0 - release']
+ self.assertEqual(expected, self.called)
+
+ def _test_with_failure_in_method(self, method, error):
+ handler = FakeHandler(0, self.called)
+ setattr(handler, method, self.raise_error(error))
+ handlers = [logging.weakref.ref(handler)]
+
+ logging.shutdown(handlerList=list(handlers))
+
+ self.assertEqual('0 - release', self.called[-1])
+
+ def test_with_ioerror_in_acquire(self):
+ self._test_with_failure_in_method('acquire', IOError)
+
+ def test_with_ioerror_in_flush(self):
+ self._test_with_failure_in_method('flush', IOError)
+
+ def test_with_ioerror_in_close(self):
+ self._test_with_failure_in_method('close', IOError)
+
+ def test_with_valueerror_in_acquire(self):
+ self._test_with_failure_in_method('acquire', ValueError)
+
+ def test_with_valueerror_in_flush(self):
+ self._test_with_failure_in_method('flush', ValueError)
+
+ def test_with_valueerror_in_close(self):
+ self._test_with_failure_in_method('close', ValueError)
+
+ def test_with_other_error_in_acquire_without_raise(self):
+ logging.raiseExceptions = False
+ self._test_with_failure_in_method('acquire', IndexError)
+
+ def test_with_other_error_in_flush_without_raise(self):
+ logging.raiseExceptions = False
+ self._test_with_failure_in_method('flush', IndexError)
+
+ def test_with_other_error_in_close_without_raise(self):
+ logging.raiseExceptions = False
+ self._test_with_failure_in_method('close', IndexError)
+
+ def test_with_other_error_in_acquire_with_raise(self):
+ logging.raiseExceptions = True
+ self.assertRaises(IndexError, self._test_with_failure_in_method,
+ 'acquire', IndexError)
+
+ def test_with_other_error_in_flush_with_raise(self):
+ logging.raiseExceptions = True
+ self.assertRaises(IndexError, self._test_with_failure_in_method,
+ 'flush', IndexError)
+
+ def test_with_other_error_in_close_with_raise(self):
+ logging.raiseExceptions = True
+ self.assertRaises(IndexError, self._test_with_failure_in_method,
+ 'close', IndexError)
+
+
+class ModuleLevelMiscTest(BaseTest):
+
+ """Tets suite for some module level methods."""
+
+ def test_disable(self):
+ old_disable = logging.root.manager.disable
+ # confirm our assumptions are correct
+ assert old_disable == 0
+ self.addCleanup(lambda: logging.disable(old_disable))
+
+ logging.disable(83)
+ self.assertEqual(logging.root.manager.disable, 83)
+
+ def _test_log(self, method, level=None):
+ called = []
+ patch(self, logging, 'basicConfig',
+ lambda *a, **kw: called.append(a, kw))
+
+ recording = RecordingHandler()
+ logging.root.addHandler(recording)
+
+ log_method = getattr(logging, method)
+ if level is not None:
+ log_method(level, "test me: %r", recording)
+ else:
+ log_method("test me: %r", recording)
+
+ self.assertEqual(len(recording.records), 1)
+ record = recording.records[0]
+ self.assertEqual(record.getMessage(), "test me: %r" % recording)
+
+ expected_level = level if level is not None else getattr(logging, method.upper())
+ self.assertEqual(record.levelno, expected_level)
+
+ # basicConfig was not called!
+ self.assertEqual(called, [])
+
+ def test_log(self):
+ self._test_log('log', logging.ERROR)
+
+ def test_debug(self):
+ self._test_log('debug')
+
+ def test_info(self):
+ self._test_log('info')
+
+ def test_warning(self):
+ self._test_log('warning')
+
+ def test_error(self):
+ self._test_log('error')
+
+ def test_critical(self):
+ self._test_log('critical')
+
+ def test_set_logger_class(self):
+ self.assertRaises(TypeError, logging.setLoggerClass, object)
+
+ class MyLogger(logging.Logger):
+ pass
+
+ logging.setLoggerClass(MyLogger)
+ self.assertEqual(logging.getLoggerClass(), MyLogger)
+
+ logging.setLoggerClass(logging.Logger)
+ self.assertEqual(logging.getLoggerClass(), logging.Logger)
+
+
+class BasicConfigTest(unittest.TestCase):
+
+ """Tets suite for logging.basicConfig."""
+
+ def setUp(self):
+ super(BasicConfigTest, self).setUp()
+ handlers = logging.root.handlers
+ self.addCleanup(lambda: setattr(logging.root, 'handlers', handlers))
+ logging.root.handlers = []
+
+ def tearDown(self):
+ logging.shutdown()
+ super(BasicConfigTest, self).tearDown()
+
+ def test_no_kwargs(self):
+ logging.basicConfig()
+
+ # handler defaults to a StreamHandler to sys.stderr
+ self.assertEqual(len(logging.root.handlers), 1)
+ handler = logging.root.handlers[0]
+ self.assertIsInstance(handler, logging.StreamHandler)
+ self.assertEqual(handler.stream, sys.stderr)
+
+ formatter = handler.formatter
+ # format defaults to logging.BASIC_FORMAT
+ self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT)
+ # datefmt defaults to None
+ self.assertIsNone(formatter.datefmt)
+ # style defaults to %
+ self.assertIsInstance(formatter._style, logging.PercentStyle)
+
+ # level is not explicitely set
+ self.assertEqual(logging.root.level, logging.WARNING)
+
+ def test_filename(self):
+ logging.basicConfig(filename='test.log')
+
+ self.assertEqual(len(logging.root.handlers), 1)
+ handler = logging.root.handlers[0]
+ self.assertIsInstance(handler, logging.FileHandler)
+
+ expected = logging.FileHandler('test.log', 'a')
+ self.addCleanup(expected.close)
+ self.assertEqual(handler.stream.mode, expected.stream.mode)
+ self.assertEqual(handler.stream.name, expected.stream.name)
+
+ def test_filemode(self):
+ logging.basicConfig(filename='test.log', filemode='wb')
+
+ handler = logging.root.handlers[0]
+ expected = logging.FileHandler('test.log', 'wb')
+ self.addCleanup(expected.close)
+ self.assertEqual(handler.stream.mode, expected.stream.mode)
+
+ def test_stream(self):
+ stream = io.StringIO()
+ self.addCleanup(stream.close)
+ logging.basicConfig(stream=stream)
+
+ self.assertEqual(len(logging.root.handlers), 1)
+ handler = logging.root.handlers[0]
+ self.assertIsInstance(handler, logging.StreamHandler)
+ self.assertEqual(handler.stream, stream)
+
+ def test_format(self):
+ logging.basicConfig(format='foo')
+
+ formatter = logging.root.handlers[0].formatter
+ self.assertEqual(formatter._style._fmt, 'foo')
+
+ def test_datefmt(self):
+ logging.basicConfig(datefmt='bar')
+
+ formatter = logging.root.handlers[0].formatter
+ self.assertEqual(formatter.datefmt, 'bar')
+
+ def test_style(self):
+ logging.basicConfig(style='$')
+
+ formatter = logging.root.handlers[0].formatter
+ self.assertIsInstance(formatter._style, logging.StringTemplateStyle)
+
+ def test_level(self):
+ old_level = logging.root.level
+ self.addCleanup(lambda: logging.root.setLevel(old_level))
+
+ logging.basicConfig(level=57)
+ self.assertEqual(logging.root.level, 57)
+
+ def _test_log(self, method, level=None):
+ # logging.root has no handlers so basicConfig should be called
+ called = []
+
+ old_basic_config = logging.basicConfig
+ def my_basic_config(*a, **kw):
+ old_basic_config()
+ old_level = logging.root.level
+ logging.root.setLevel(100) # avoid having messages in stderr
+ self.addCleanup(lambda: logging.root.setLevel(old_level))
+ called.append((a, kw))
+
+ patch(self, logging, 'basicConfig', my_basic_config)
+
+ log_method = getattr(logging, method)
+ if level is not None:
+ log_method(level, "test me")
+ else:
+ log_method("test me")
+
+ # basicConfig was called with no arguments
+ self.assertEqual(called, [((), {})])
+
+ def test_log(self):
+ self._test_log('log', logging.WARNING)
+
+ def test_debug(self):
+ self._test_log('debug')
+
+ def test_info(self):
+ self._test_log('info')
+
+ def test_warning(self):
+ self._test_log('warning')
+
+ def test_error(self):
+ self._test_log('error')
+
+ def test_critical(self):
+ self._test_log('critical')
+
+
+class LoggerAdapterTest(unittest.TestCase):
+
+ def setUp(self):
+ super(LoggerAdapterTest, self).setUp()
+ old_handler_list = logging._handlerList[:]
+
+ self.recording = RecordingHandler()
+ self.logger = logging.root
+ self.logger.addHandler(self.recording)
+ self.addCleanup(lambda: self.logger.removeHandler(self.recording))
+ self.addCleanup(self.recording.close)
+
+ def cleanup():
+ logging._handlerList[:] = old_handler_list
+
+ self.addCleanup(cleanup)
+ self.addCleanup(logging.shutdown)
+ self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)
+
+ def test_exception(self):
+ msg = 'testing exception: %r'
+ exc = None
+ try:
+ assert False
+ except AssertionError as e:
+ exc = e
+ self.adapter.exception(msg, self.recording)
+
+ self.assertEqual(len(self.recording.records), 1)
+ record = self.recording.records[0]
+ self.assertEqual(record.levelno, logging.ERROR)
+ self.assertEqual(record.msg, msg)
+ self.assertEqual(record.args, (self.recording,))
+ self.assertEqual(record.exc_info,
+ (exc.__class__, exc, exc.__traceback__))
+
+ def test_critical(self):
+ msg = 'critical test! %r'
+ self.adapter.critical(msg, self.recording)
+
+ self.assertEqual(len(self.recording.records), 1)
+ record = self.recording.records[0]
+ self.assertEqual(record.levelno, logging.CRITICAL)
+ self.assertEqual(record.msg, msg)
+ self.assertEqual(record.args, (self.recording,))
+
+ def test_is_enabled_for(self):
+ old_disable = self.adapter.logger.manager.disable
+ self.adapter.logger.manager.disable = 33
+ self.addCleanup(lambda: setattr(self.adapter.logger.manager,
+ 'disable', old_disable))
+ self.assertFalse(self.adapter.isEnabledFor(32))
+
+ def test_has_handlers(self):
+ self.assertTrue(self.adapter.hasHandlers())
+
+ for handler in self.logger.handlers:
+ self.logger.removeHandler(handler)
+ assert not self.logger.hasHandlers()
+
+ self.assertFalse(self.adapter.hasHandlers())
+
+
+class LoggerTest(BaseTest):
+
+ def setUp(self):
+ super(LoggerTest, self).setUp()
+ self.recording = RecordingHandler()
+ self.logger = logging.Logger(name='blah')
+ self.logger.addHandler(self.recording)
+ self.addCleanup(lambda: self.logger.removeHandler(self.recording))
+ self.addCleanup(self.recording.close)
+ self.addCleanup(logging.shutdown)
+
+ def test_set_invalid_level(self):
+ self.assertRaises(TypeError, self.logger.setLevel, object())
+
+ def test_exception(self):
+ msg = 'testing exception: %r'
+ exc = None
+ try:
+ assert False
+ except AssertionError as e:
+ exc = e
+ self.logger.exception(msg, self.recording)
+
+ self.assertEqual(len(self.recording.records), 1)
+ record = self.recording.records[0]
+ self.assertEqual(record.levelno, logging.ERROR)
+ self.assertEqual(record.msg, msg)
+ self.assertEqual(record.args, (self.recording,))
+ self.assertEqual(record.exc_info,
+ (exc.__class__, exc, exc.__traceback__))
+
+ def test_log_invalid_level_with_raise(self):
+ old_raise = logging.raiseExceptions
+ self.addCleanup(lambda: setattr(logging, 'raiseExecptions', old_raise))
+
+ logging.raiseExceptions = True
+ self.assertRaises(TypeError, self.logger.log, '10', 'test message')
+
+ def test_log_invalid_level_no_raise(self):
+ old_raise = logging.raiseExceptions
+ self.addCleanup(lambda: setattr(logging, 'raiseExecptions', old_raise))
+
+ logging.raiseExceptions = False
+ self.logger.log('10', 'test message') # no exception happens
+
+ def test_find_caller_with_stack_info(self):
+ called = []
+ patch(self, logging.traceback, 'print_stack',
+ lambda f, file: called.append(file.getvalue()))
+
+ self.logger.findCaller(stack_info=True)
+
+ self.assertEqual(len(called), 1)
+ self.assertEqual('Stack (most recent call last):\n', called[0])
+
+ def test_make_record_with_extra_overwrite(self):
+ name = 'my record'
+ level = 13
+ fn = lno = msg = args = exc_info = func = sinfo = None
+ rv = logging._logRecordFactory(name, level, fn, lno, msg, args,
+ exc_info, func, sinfo)
+
+ for key in ('message', 'asctime') + tuple(rv.__dict__.keys()):
+ extra = {key: 'some value'}
+ self.assertRaises(KeyError, self.logger.makeRecord, name, level,
+ fn, lno, msg, args, exc_info,
+ extra=extra, sinfo=sinfo)
+
+ def test_make_record_with_extra_no_overwrite(self):
+ name = 'my record'
+ level = 13
+ fn = lno = msg = args = exc_info = func = sinfo = None
+ extra = {'valid_key': 'some value'}
+ result = self.logger.makeRecord(name, level, fn, lno, msg, args,
+ exc_info, extra=extra, sinfo=sinfo)
+ self.assertIn('valid_key', result.__dict__)
+
+ def test_has_handlers(self):
+ self.assertTrue(self.logger.hasHandlers())
+
+ for handler in self.logger.handlers:
+ self.logger.removeHandler(handler)
+ assert not self.logger.hasHandlers()
+
+ self.assertFalse(self.logger.hasHandlers())
+
+ def test_has_handlers_no_propagate(self):
+ child_logger = logging.getLogger('blah.child')
+ child_logger.propagate = False
+ assert child_logger.handlers == []
+
+ self.assertFalse(child_logger.hasHandlers())
+
+ def test_is_enabled_for(self):
+ old_disable = self.logger.manager.disable
+ self.logger.manager.disable = 23
+ self.addCleanup(lambda: setattr(self.logger.manager,
+ 'disable', old_disable))
+ self.assertFalse(self.logger.isEnabledFor(22))
+
+
class BaseFileTest(BaseTest):
"Base class for handler tests that write log files"
@@ -2319,6 +2808,8 @@ def test_main():
EncodingTest, WarningsTest, ConfigDictTest, ManagerTest,
FormatterTest,
LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest,
+ ShutdownTest, ModuleLevelMiscTest, BasicConfigTest,
+ LoggerAdapterTest, LoggerTest,
RotatingFileHandlerTest,
LastResortTest,
TimedRotatingFileHandlerTest
diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py
index 25ebeee..b85005f 100644
--- a/Lib/test/test_urllib.py
+++ b/Lib/test/test_urllib.py
@@ -2,6 +2,7 @@
import urllib.parse
import urllib.request
+import urllib.error
import http.client
import email.message
import io
@@ -206,6 +207,21 @@ Content-Type: text/html; charset=iso-8859-1
finally:
self.unfakehttp()
+ def test_invalid_redirect(self):
+ # urlopen() should raise IOError for many error codes.
+ self.fakehttp(b'''HTTP/1.1 302 Found
+Date: Wed, 02 Jan 2008 03:03:54 GMT
+Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
+Location: file://guidocomputer.athome.com:/python/license
+Connection: close
+Content-Type: text/html; charset=iso-8859-1
+''')
+ try:
+ self.assertRaises(urllib.error.HTTPError, urlopen,
+ "http://python.org/")
+ finally:
+ self.unfakehttp()
+
def test_empty_socket(self):
# urlopen() raises IOError if the underlying socket does not send any
# data. (#1680230)
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py
index 1d0d98c..113c10d 100644
--- a/Lib/test/test_urllib2.py
+++ b/Lib/test/test_urllib2.py
@@ -10,6 +10,7 @@ import urllib.request
# The proxy bypass method imported below has logic specific to the OSX
# proxy config data structure but is testable on all platforms.
from urllib.request import Request, OpenerDirector, _proxy_bypass_macosx_sysconf
+import urllib.error
# XXX
# Request
@@ -1031,6 +1032,29 @@ class HandlerTests(unittest.TestCase):
self.assertEqual(count,
urllib.request.HTTPRedirectHandler.max_redirections)
+
+ def test_invalid_redirect(self):
+ from_url = "http://example.com/a.html"
+ valid_schemes = ['http','https','ftp']
+ invalid_schemes = ['file','imap','ldap']
+ schemeless_url = "example.com/b.html"
+ h = urllib.request.HTTPRedirectHandler()
+ o = h.parent = MockOpener()
+ req = Request(from_url)
+ req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
+
+ for scheme in invalid_schemes:
+ invalid_url = scheme + '://' + schemeless_url
+ self.assertRaises(urllib.error.HTTPError, h.http_error_302,
+ req, MockFile(), 302, "Security Loophole",
+ MockHeaders({"location": invalid_url}))
+
+ for scheme in valid_schemes:
+ valid_url = scheme + '://' + schemeless_url
+ h.http_error_302(req, MockFile(), 302, "That's fine",
+ MockHeaders({"location": valid_url}))
+ self.assertEqual(o.req.get_full_url(), valid_url)
+
def test_cookie_redirect(self):
# cookies shouldn't leak into redirected requests
from http.cookiejar import CookieJar
diff --git a/Lib/urllib/request.py b/Lib/urllib/request.py
index 0aa7a77..304bf59 100644
--- a/Lib/urllib/request.py
+++ b/Lib/urllib/request.py
@@ -545,6 +545,17 @@ class HTTPRedirectHandler(BaseHandler):
# fix a possible malformed URL
urlparts = urlparse(newurl)
+
+ # For security reasons we don't allow redirection to anything other
+ # than http, https or ftp.
+
+ if not urlparts.scheme in ('http', 'https', 'ftp'):
+ raise HTTPError(newurl, code,
+ msg +
+ " - Redirection to url '%s' is not allowed" %
+ newurl,
+ headers, fp)
+
if not urlparts.path:
urlparts = list(urlparts)
urlparts[2] = "/"
@@ -1903,8 +1914,24 @@ class FancyURLopener(URLopener):
return
void = fp.read()
fp.close()
+
# In case the server sent a relative URL, join with original:
newurl = urljoin(self.type + ":" + url, newurl)
+
+ urlparts = urlparse(newurl)
+
+ # For security reasons, we don't allow redirection to anything other
+ # than http, https and ftp.
+
+ # We are using newer HTTPError with older redirect_internal method
+ # This older method will get deprecated in 3.3
+
+ if not urlparts.scheme in ('http', 'https', 'ftp'):
+ raise HTTPError(newurl, errcode,
+ errmsg +
+ " Redirection to url '%s' is not allowed." % newurl,
+ headers, fp)
+
return self.open(newurl)
def http_error_301(self, url, fp, errcode, errmsg, headers, data=None):