diff options
Diffstat (limited to 'Lib/test/test_file.py')
-rw-r--r-- | Lib/test/test_file.py | 168 |
1 files changed, 167 insertions, 1 deletions
diff --git a/Lib/test/test_file.py b/Lib/test/test_file.py index 3ae460c..aab3e70 100644 --- a/Lib/test/test_file.py +++ b/Lib/test/test_file.py @@ -1,9 +1,13 @@ import sys import os import unittest +import itertools +import time +import threading from array import array from weakref import proxy +from test import test_support from test.test_support import TESTFN, findfile, run_unittest from UserList import UserList @@ -339,11 +343,173 @@ class FileSubclassTests(unittest.TestCase): self.failUnless(f.subclass_closed) +class FileThreadingTests(unittest.TestCase): + # These tests check the ability to call various methods of file objects + # (including close()) concurrently without crashing the Python interpreter. + # See #815646, #595601 + + def setUp(self): + self.f = None + self.filename = TESTFN + with open(self.filename, "w") as f: + f.write("\n".join("0123456789")) + self._count_lock = threading.Lock() + self.close_count = 0 + self.close_success_count = 0 + + def tearDown(self): + if self.f: + try: + self.f.close() + except (EnvironmentError, ValueError): + pass + try: + os.remove(self.filename) + except EnvironmentError: + pass + + def _create_file(self): + self.f = open(self.filename, "w+") + + def _close_file(self): + with self._count_lock: + self.close_count += 1 + self.f.close() + with self._count_lock: + self.close_success_count += 1 + + def _close_and_reopen_file(self): + self._close_file() + # if close raises an exception thats fine, self.f remains valid so + # we don't need to reopen. + self._create_file() + + def _run_workers(self, func, nb_workers, duration=0.2): + with self._count_lock: + self.close_count = 0 + self.close_success_count = 0 + self.do_continue = True + threads = [] + try: + for i in range(nb_workers): + t = threading.Thread(target=func) + t.start() + threads.append(t) + for _ in xrange(100): + time.sleep(duration/100) + with self._count_lock: + if self.close_count-self.close_success_count > nb_workers+1: + if test_support.verbose: + print 'Q', + break + time.sleep(duration) + finally: + self.do_continue = False + for t in threads: + t.join() + + def _test_close_open_io(self, io_func, nb_workers=5): + def worker(): + self._create_file() + funcs = itertools.cycle(( + lambda: io_func(), + lambda: self._close_and_reopen_file(), + )) + for f in funcs: + if not self.do_continue: + break + try: + f() + except (IOError, ValueError): + pass + self._run_workers(worker, nb_workers) + if test_support.verbose: + # Useful verbose statistics when tuning this test to take + # less time to run but still ensuring that its still useful. + # + # the percent of close calls that raised an error + percent = 100. - 100.*self.close_success_count/self.close_count + print self.close_count, ('%.4f ' % percent), + + def test_close_open(self): + def io_func(): + pass + self._test_close_open_io(io_func) + + def test_close_open_flush(self): + def io_func(): + self.f.flush() + self._test_close_open_io(io_func) + + def test_close_open_iter(self): + def io_func(): + list(iter(self.f)) + self._test_close_open_io(io_func) + + def test_close_open_isatty(self): + def io_func(): + self.f.isatty() + self._test_close_open_io(io_func) + + def test_close_open_print(self): + def io_func(): + print >> self.f, '' + self._test_close_open_io(io_func) + + def test_close_open_read(self): + def io_func(): + self.f.read(0) + self._test_close_open_io(io_func) + + def test_close_open_readinto(self): + def io_func(): + a = array('c', 'xxxxx') + self.f.readinto(a) + self._test_close_open_io(io_func) + + def test_close_open_readline(self): + def io_func(): + self.f.readline() + self._test_close_open_io(io_func) + + def test_close_open_readlines(self): + def io_func(): + self.f.readlines() + self._test_close_open_io(io_func) + + def test_close_open_seek(self): + def io_func(): + self.f.seek(0, 0) + self._test_close_open_io(io_func) + + def test_close_open_tell(self): + def io_func(): + self.f.tell() + self._test_close_open_io(io_func) + + def test_close_open_truncate(self): + def io_func(): + self.f.truncate() + self._test_close_open_io(io_func) + + def test_close_open_write(self): + def io_func(): + self.f.write('') + self._test_close_open_io(io_func) + + def test_close_open_writelines(self): + def io_func(): + self.f.writelines('') + self._test_close_open_io(io_func) + + + def test_main(): # Historically, these tests have been sloppy about removing TESTFN. # So get rid of it no matter what. try: - run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests) + run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests, + FileThreadingTests) finally: if os.path.exists(TESTFN): os.unlink(TESTFN) |