summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/_pyio.py5
-rw-r--r--Lib/test/test_io.py33
2 files changed, 30 insertions, 8 deletions
diff --git a/Lib/_pyio.py b/Lib/_pyio.py
index 3961969..b04d23a 100644
--- a/Lib/_pyio.py
+++ b/Lib/_pyio.py
@@ -1503,6 +1503,11 @@ class TextIOWrapper(TextIOBase):
if not isinstance(encoding, str):
raise ValueError("invalid encoding: %r" % encoding)
+ if not codecs.lookup(encoding)._is_text_encoding:
+ msg = ("%r is not a text encoding; "
+ "use codecs.open() to handle arbitrary codecs")
+ raise LookupError(msg % encoding)
+
if errors is None:
errors = "strict"
else:
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 355a33e..d5b274c 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -1929,6 +1929,15 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertRaises(TypeError, t.__init__, b, newline=42)
self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
+ def test_non_text_encoding_codecs_are_rejected(self):
+ # Ensure the constructor complains if passed a codec that isn't
+ # marked as a text encoding
+ # http://bugs.python.org/issue20404
+ r = self.BytesIO()
+ b = self.BufferedWriter(r)
+ with self.assertRaisesRegex(LookupError, "is not a text encoding"):
+ self.TextIOWrapper(b, encoding="hex")
+
def test_detach(self):
r = self.BytesIO()
b = self.BufferedWriter(r)
@@ -2579,15 +2588,22 @@ class TextIOWrapperTest(unittest.TestCase):
def test_illegal_decoder(self):
# Issue #17106
+ # Bypass the early encoding check added in issue 20404
+ def _make_illegal_wrapper():
+ quopri = codecs.lookup("quopri")
+ quopri._is_text_encoding = True
+ try:
+ t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
+ newline='\n', encoding="quopri")
+ finally:
+ quopri._is_text_encoding = False
+ return t
# Crash when decoder returns non-string
- t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
- encoding='quopri_codec')
+ t = _make_illegal_wrapper()
self.assertRaises(TypeError, t.read, 1)
- t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
- encoding='quopri_codec')
+ t = _make_illegal_wrapper()
self.assertRaises(TypeError, t.readline)
- t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
- encoding='quopri_codec')
+ t = _make_illegal_wrapper()
self.assertRaises(TypeError, t.read)
def _check_create_at_shutdown(self, **kwargs):
@@ -2616,8 +2632,7 @@ class TextIOWrapperTest(unittest.TestCase):
if err:
# Can error out with a RuntimeError if the module state
# isn't found.
- self.assertIn("RuntimeError: could not find io module state",
- err.decode())
+ self.assertIn(self.shutdown_error, err.decode())
else:
self.assertEqual("ok", out.decode().strip())
@@ -2630,6 +2645,7 @@ class TextIOWrapperTest(unittest.TestCase):
class CTextIOWrapperTest(TextIOWrapperTest):
io = io
+ shutdown_error = "RuntimeError: could not find io module state"
def test_initialization(self):
r = self.BytesIO(b"\xc3\xa9\n\n")
@@ -2674,6 +2690,7 @@ class CTextIOWrapperTest(TextIOWrapperTest):
class PyTextIOWrapperTest(TextIOWrapperTest):
io = pyio
+ shutdown_error = "LookupError: unknown encoding: ascii"
class IncrementalNewlineDecoderTest(unittest.TestCase):