diff options
-rw-r--r-- | Lib/codecs.py | 9 | ||||
-rw-r--r-- | Lib/test/test_codecs.py | 91 |
2 files changed, 91 insertions, 9 deletions
diff --git a/Lib/codecs.py b/Lib/codecs.py index bd415fe..862f910 100644 --- a/Lib/codecs.py +++ b/Lib/codecs.py @@ -310,6 +310,15 @@ class StreamReader(Codec): data = data[1:] if data: self.atcr = data.endswith(u"\r") + # If we're at a "\r" (and are allowed to read more), read one + # extra character (which might be a "\n") to get a proper + # line ending (If the stream is temporarily exhausted we return + # the wrong line ending, but at least we won't generate a bogus + # second line. + if self.atcr and size is None: + data += self.read(size=1, chars=1) + self.atcr = data.endswith(u"\r") + line += data lines = line.splitlines(True) if lines: diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index e6dba34..692e9b5 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -73,15 +73,88 @@ class ReadTest(unittest.TestCase): # reader has to look ahead whether this is a lone \r or a \r\n for size in xrange(80): for lineend in u"\n \r\n \r \u2028".split(): - s = size*u"a" + lineend + u"xxx\n" - self.assertEqual( - getreader(s).readline(keepends=True), - size*u"a" + lineend, - ) - self.assertEqual( - getreader(s).readline(keepends=False), - size*u"a", - ) + s = 10*(size*u"a" + lineend + u"xxx\n") + reader = getreader(s) + for i in xrange(10): + self.assertEqual( + reader.readline(keepends=True), + size*u"a" + lineend, + ) + reader = getreader(s) + for i in xrange(10): + self.assertEqual( + reader.readline(keepends=False), + size*u"a", + ) + + def test_bug1175396(self): + s = [ + '<%!--===================================================\r\n', + ' BLOG index page: show recent articles,\r\n', + ' today\'s articles, or articles of a specific date.\r\n', + '========================================================--%>\r\n', + '<%@inputencoding="ISO-8859-1"%>\r\n', + '<%@pagetemplate=TEMPLATE.y%>\r\n', + '<%@import=import frog.util, frog%>\r\n', + '<%@import=import frog.objects%>\r\n', + '<%@import=from frog.storageerrors import StorageError%>\r\n', + '<%\r\n', + '\r\n', + 'import logging\r\n', + 'log=logging.getLogger("Snakelets.logger")\r\n', + '\r\n', + '\r\n', + 'user=self.SessionCtx.user\r\n', + 'storageEngine=self.SessionCtx.storageEngine\r\n', + '\r\n', + '\r\n', + 'def readArticlesFromDate(date, count=None):\r\n', + ' entryids=storageEngine.listBlogEntries(date)\r\n', + ' entryids.reverse() # descending\r\n', + ' if count:\r\n', + ' entryids=entryids[:count]\r\n', + ' try:\r\n', + ' return [ frog.objects.BlogEntry.load(storageEngine, date, Id) for Id in entryids ]\r\n', + ' except StorageError,x:\r\n', + ' log.error("Error loading articles: "+str(x))\r\n', + ' self.abort("cannot load articles")\r\n', + '\r\n', + 'showdate=None\r\n', + '\r\n', + 'arg=self.Request.getArg()\r\n', + 'if arg=="today":\r\n', + ' #-------------------- TODAY\'S ARTICLES\r\n', + ' self.write("<h2>Today\'s articles</h2>")\r\n', + ' showdate = frog.util.isodatestr() \r\n', + ' entries = readArticlesFromDate(showdate)\r\n', + 'elif arg=="active":\r\n', + ' #-------------------- ACTIVE ARTICLES redirect\r\n', + ' self.Yredirect("active.y")\r\n', + 'elif arg=="login":\r\n', + ' #-------------------- LOGIN PAGE redirect\r\n', + ' self.Yredirect("login.y")\r\n', + 'elif arg=="date":\r\n', + ' #-------------------- ARTICLES OF A SPECIFIC DATE\r\n', + ' showdate = self.Request.getParameter("date")\r\n', + ' self.write("<h2>Articles written on %s</h2>"% frog.util.mediumdatestr(showdate))\r\n', + ' entries = readArticlesFromDate(showdate)\r\n', + 'else:\r\n', + ' #-------------------- RECENT ARTICLES\r\n', + ' self.write("<h2>Recent articles</h2>")\r\n', + ' dates=storageEngine.listBlogEntryDates()\r\n', + ' if dates:\r\n', + ' entries=[]\r\n', + ' SHOWAMOUNT=10\r\n', + ' for showdate in dates:\r\n', + ' entries.extend( readArticlesFromDate(showdate, SHOWAMOUNT-len(entries)) )\r\n', + ' if len(entries)>=SHOWAMOUNT:\r\n', + ' break\r\n', + ' \r\n', + ] + stream = StringIO.StringIO("".join(s).encode(self.encoding)) + reader = codecs.getreader(self.encoding)(stream) + for (i, line) in enumerate(reader): + self.assertEqual(line, s[i]) def test_readlinequeue(self): q = Queue() |