# path: root/Lib/test/test_threadedtempfile.py
# blob: 73834673846b1b8171c591f29286b3d1df35ccaf
"""
Create and delete FILES_PER_THREAD temp files (via tempfile.TemporaryFile)
in each of NUM_THREADS threads, recording the number of successes and
failures.  A failure is a bug in tempfile, and may be due to:

+ Trying to create more than one tempfile with the same name.
+ Trying to delete a tempfile that no longer exists.
+ Something we've never seen before.

By default, NUM_THREADS == 20 and FILES_PER_THREAD == 50.  This is enough to
create about 150 failures per run under Win98SE in 2.0, and runs pretty
quickly. Guido reports needing to boost FILES_PER_THREAD to 500 before
provoking a 2.0 failure under Linux.
"""

# Number of TempFileGreedy worker threads the test starts.
NUM_THREADS = 20
# Number of temporary files each worker thread creates and closes.
FILES_PER_THREAD = 50

import _thread as thread # If this fails, we can't test this module
import threading
import tempfile

from test.support import threading_setup, threading_cleanup, run_unittest
import unittest
import io
from traceback import print_exc

# Gate shared by all worker threads: each one blocks on it at the top of
# run(), and the test sets it once every thread has been started.
startEvent = threading.Event()

class TempFileGreedy(threading.Thread):
    """Worker thread that creates and closes FILES_PER_THREAD temporary files.

    Each attempt is tallied in ``ok_count`` or ``error_count``; the traceback
    of every failed attempt is appended to the ``errors`` text buffer, which
    is created when the thread starts running.
    """

    # Class-level defaults; the "+=" in run() rebinds them on the instance,
    # so every thread ends up with its own counters.
    error_count = 0
    ok_count = 0

    def run(self):
        """Wait for startEvent, then create/close the files and tally results."""
        self.errors = io.StringIO()
        # Block until the test releases all workers together.
        startEvent.wait()
        for _ in range(FILES_PER_THREAD):
            try:
                # Leaving the with-block closes the temporary file.
                with tempfile.TemporaryFile("w+b"):
                    pass
            except Exception:
                # Record ordinary failures only; unlike a bare "except:",
                # this does not swallow KeyboardInterrupt or SystemExit.
                self.error_count += 1
                print_exc(file=self.errors)
            else:
                self.ok_count += 1


class ThreadedTempFileTest(unittest.TestCase):
    """Exercise tempfile.TemporaryFile from NUM_THREADS concurrent threads."""

    def test_main(self):
        """Run all workers to completion and require that none of them failed.

        Starts NUM_THREADS TempFileGreedy threads, releases them through
        startEvent, joins them, and then asserts that no thread recorded an
        error and that the total number of successful create/close cycles is
        NUM_THREADS * FILES_PER_THREAD.
        """
        threads = []
        thread_info = threading_setup()

        for _ in range(NUM_THREADS):
            t = TempFileGreedy()
            threads.append(t)
            t.start()

        # Every worker is now blocked on the event; release them all.
        startEvent.set()

        ok = 0
        errors = []
        for t in threads:
            t.join()
            ok += t.ok_count
            if t.error_count:
                # Thread.name is the supported accessor; Thread has no
                # get_name() method, so the old call raised AttributeError
                # exactly when a failure needed to be reported.
                errors.append(str(t.name) + str(t.errors.getvalue()))

        threading_cleanup(*thread_info)

        msg = "Errors: errors %d ok %d\n%s" % (len(errors), ok,
            '\n'.join(errors))
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(errors, [], msg)
        self.assertEqual(ok, NUM_THREADS * FILES_PER_THREAD)

def test_main():
    """Run the ThreadedTempFileTest case through test.support.run_unittest."""
    test_classes = (ThreadedTempFileTest,)
    run_unittest(*test_classes)

# Run the test when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()