summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorGeorg Brandl <georg@python.org>2005-08-26 08:51:34 (GMT)
committerGeorg Brandl <georg@python.org>2005-08-26 08:51:34 (GMT)
commit5a650a253c89bf6a491fc67c441530969ee4bda8 (patch)
treec80a0e9eaaf700d58ad021f4fa95730e02c6c12e /Lib
parentb3f55f4a706113f24386afd4bb1262b44a1c70c0 (diff)
downloadcpython-5a650a253c89bf6a491fc67c441530969ee4bda8.zip
cpython-5a650a253c89bf6a491fc67c441530969ee4bda8.tar.gz
cpython-5a650a253c89bf6a491fc67c441530969ee4bda8.tar.bz2
patch [ 810023 ] Fix for off-by-one bug in urllib.URLopener.retrieve
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/test_urllib.py102
-rw-r--r--Lib/urllib.py14
2 files changed, 99 insertions, 17 deletions
diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py
index 1a32f53..3621476 100644
--- a/Lib/test/test_urllib.py
+++ b/Lib/test/test_urllib.py
@@ -6,6 +6,7 @@ import unittest
from test import test_support
import os
import mimetools
+import tempfile
import StringIO
def hexescape(char):
@@ -125,15 +126,53 @@ class urlretrieve_FileTests(unittest.TestCase):
"""Test urllib.urlretrieve() on local files"""
def setUp(self):
+ # Create a list of temporary files. Each item in the list is a file
+ # name (absolute path or relative to the current working directory).
+ # All files in this list will be deleted in the tearDown method. Note,
+ # this only helps to makes sure temporary files get deleted, but it
+ # does nothing about trying to close files that may still be open. It
+ # is the responsibility of the developer to properly close files even
+ # when exceptional conditions occur.
+ self.tempFiles = []
+
# Create a temporary file.
+ self.registerFileForCleanUp(test_support.TESTFN)
self.text = 'testing urllib.urlretrieve'
- FILE = file(test_support.TESTFN, 'wb')
- FILE.write(self.text)
- FILE.close()
+ try:
+ FILE = file(test_support.TESTFN, 'wb')
+ FILE.write(self.text)
+ FILE.close()
+ finally:
+ try: FILE.close()
+ except: pass
def tearDown(self):
- # Delete the temporary file.
- os.remove(test_support.TESTFN)
+ # Delete the temporary files.
+ for each in self.tempFiles:
+ try: os.remove(each)
+ except: pass
+
+ def constructLocalFileUrl(self, filePath):
+ return "file://%s" % urllib.pathname2url(os.path.abspath(filePath))
+
+ def createNewTempFile(self, data=""):
+ """Creates a new temporary file containing the specified data,
+ registers the file for deletion during the test fixture tear down, and
+ returns the absolute path of the file."""
+
+ newFd, newFilePath = tempfile.mkstemp()
+ try:
+ self.registerFileForCleanUp(newFilePath)
+ newFile = os.fdopen(newFd, "wb")
+ newFile.write(data)
+ newFile.close()
+ finally:
+ try: newFile.close()
+ except: pass
+ return newFilePath
+
+ def registerFileForCleanUp(self, fileName):
+ self.tempFiles.append(fileName)
def test_basic(self):
# Make sure that a local file just gets its own location returned and
@@ -147,15 +186,19 @@ class urlretrieve_FileTests(unittest.TestCase):
def test_copy(self):
# Test that setting the filename argument works.
second_temp = "%s.2" % test_support.TESTFN
- result = urllib.urlretrieve("file:%s" % test_support.TESTFN, second_temp)
+ self.registerFileForCleanUp(second_temp)
+ result = urllib.urlretrieve(self.constructLocalFileUrl(
+ test_support.TESTFN), second_temp)
self.assertEqual(second_temp, result[0])
self.assert_(os.path.exists(second_temp), "copy of the file was not "
"made")
FILE = file(second_temp, 'rb')
try:
text = FILE.read()
- finally:
FILE.close()
+ finally:
+ try: FILE.close()
+ except: pass
self.assertEqual(self.text, text)
def test_reporthook(self):
@@ -167,8 +210,49 @@ class urlretrieve_FileTests(unittest.TestCase):
self.assertEqual(count, count_holder[0])
count_holder[0] = count_holder[0] + 1
second_temp = "%s.2" % test_support.TESTFN
- urllib.urlretrieve(test_support.TESTFN, second_temp, hooktester)
- os.remove(second_temp)
+ self.registerFileForCleanUp(second_temp)
+ urllib.urlretrieve(self.constructLocalFileUrl(test_support.TESTFN),
+ second_temp, hooktester)
+
+ def test_reporthook_0_bytes(self):
+ # Test on zero length file. Should call reporthook only 1 time.
+ report = []
+ def hooktester(count, block_size, total_size, _report=report):
+ _report.append((count, block_size, total_size))
+ srcFileName = self.createNewTempFile()
+ urllib.urlretrieve(self.constructLocalFileUrl(srcFileName),
+ test_support.TESTFN, hooktester)
+ self.assertEqual(len(report), 1)
+ self.assertEqual(report[0][2], 0)
+
+ def test_reporthook_5_bytes(self):
+ # Test on 5 byte file. Should call reporthook only 2 times (once when
+ # the "network connection" is established and once when the block is
+ # read). Since the block size is 8192 bytes, only one block read is
+ # required to read the entire file.
+ report = []
+ def hooktester(count, block_size, total_size, _report=report):
+ _report.append((count, block_size, total_size))
+ srcFileName = self.createNewTempFile("x" * 5)
+ urllib.urlretrieve(self.constructLocalFileUrl(srcFileName),
+ test_support.TESTFN, hooktester)
+ self.assertEqual(len(report), 2)
+ self.assertEqual(report[0][1], 8192)
+ self.assertEqual(report[0][2], 5)
+
+ def test_reporthook_8193_bytes(self):
+ # Test on 8193 byte file. Should call reporthook only 3 times (once
+ # when the "network connection" is established, once for the next 8192
+ # bytes, and once for the last byte).
+ report = []
+ def hooktester(count, block_size, total_size, _report=report):
+ _report.append((count, block_size, total_size))
+ srcFileName = self.createNewTempFile("x" * 8193)
+ urllib.urlretrieve(self.constructLocalFileUrl(srcFileName),
+ test_support.TESTFN, hooktester)
+ self.assertEqual(len(report), 3)
+ self.assertEqual(report[0][1], 8192)
+ self.assertEqual(report[0][2], 8193)
class QuotingTests(unittest.TestCase):
"""Tests for urllib.quote() and urllib.quote_plus()
diff --git a/Lib/urllib.py b/Lib/urllib.py
index 4f1ebdd..1daa49d 100644
--- a/Lib/urllib.py
+++ b/Lib/urllib.py
@@ -234,19 +234,17 @@ class URLopener:
bs = 1024*8
size = -1
read = 0
- blocknum = 1
+ blocknum = 0
if reporthook:
if "content-length" in headers:
size = int(headers["Content-Length"])
- reporthook(0, bs, size)
- block = fp.read(bs)
- read += len(block)
- if reporthook:
- reporthook(1, bs, size)
- while block:
- tfp.write(block)
+ reporthook(blocknum, bs, size)
+ while 1:
block = fp.read(bs)
+ if block == "":
+ break
read += len(block)
+ tfp.write(block)
blocknum += 1
if reporthook:
reporthook(blocknum, bs, size)