summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorAmaury Forgeot d'Arc <amauryfa@gmail.com>2009-08-29 23:19:16 (GMT)
committerAmaury Forgeot d'Arc <amauryfa@gmail.com>2009-08-29 23:19:16 (GMT)
commitaf0312af7a279ca1ec067728cd71cf8269b4f597 (patch)
tree6fbdbfa29e47c4e71759d20b9c08e10c55aecad8 /Lib
parentf5d2879ab87d7096b0f50b25e9fd9ac5fb77f239 (diff)
downloadcpython-af0312af7a279ca1ec067728cd71cf8269b4f597.zip
cpython-af0312af7a279ca1ec067728cd71cf8269b4f597.tar.gz
cpython-af0312af7a279ca1ec067728cd71cf8269b4f597.tar.bz2
Merged revisions 74582 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/branches/py3k ................ r74582 | amaury.forgeotdarc | 2009-08-30 01:00:38 +0200 (dim., 30 août 2009) | 10 lines Merged revisions 74581 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r74581 | amaury.forgeotdarc | 2009-08-29 20:14:40 +0200 (sam., 29 août 2009) | 3 lines #6750: TextIOWrapped could duplicate output when several threads write to it. this affect text files opened with io.open(), and the print() function of py3k ........ ................
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/test_io.py21
1 files changed, 21 insertions, 0 deletions
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index a9094d9..150eff4 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -2051,6 +2051,27 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(f.errors, "replace")
+ def test_threads_write(self):
+ # Issue6750: concurrent writes could duplicate data
+ event = threading.Event()
+ with self.open(support.TESTFN, "w", buffering=1) as f:
+ def run(n):
+ text = "Thread%03d\n" % n
+ event.wait()
+ f.write(text)
+ threads = [threading.Thread(target=lambda n=x: run(n))
+ for x in range(20)]
+ for t in threads:
+ t.start()
+ time.sleep(0.02)
+ event.set()
+ for t in threads:
+ t.join()
+ with self.open(support.TESTFN) as f:
+ content = f.read()
+ for n in range(20):
+ self.assertEquals(content.count("Thread%03d\n" % n), 1)
+
class CTextIOWrapperTest(TextIOWrapperTest):
def test_initialization(self):