From f45ea46fe8f1d22ac7602f280f9c4e4deb6562ba Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Wed, 24 Sep 2014 23:41:28 -0400 Subject: asyncio: Reverting 69d474dab479 as issue #21645 is now closed and debug is no longer needed --- Lib/test/test_asyncio/test_streams.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index 8adc3b2..73a375a 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -596,12 +596,6 @@ class StreamReaderTests(test_utils.TestCase): code = """\ import os, sys -try: - import faulthandler -except ImportError: - pass -else: - faulthandler.dump_traceback_later(60, exit=True) fd = int(sys.argv[1]) os.write(fd, b'data') os.close(fd) -- cgit v0.12 orm method='get' action=''> https://github.com/python/cpython.git
summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_dummy_threading.py
blob: 2f59d5f9811144b4451f01699bddaeffa483bb0c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from test import test_support
import unittest
import dummy_threading as _threading
import time

class DummyThreadingTestCase(unittest.TestCase):

    class TestThread(_threading.Thread):

        def run(self):
            global running
            global sema
            global mutex
            # Uncomment if testing another module, such as the real 'threading'
            # module.
            #delay = random.random() * 2
            delay = 0
            if test_support.verbose:
                print 'task', self.name, 'will run for', delay, 'sec'
            sema.acquire()
            mutex.acquire()
            running += 1
            if test_support.verbose:
                print running, 'tasks are running'
            mutex.release()
            time.sleep(delay)
            if test_support.verbose:
                print 'task', self.name, 'done'
            mutex.acquire()
            running -= 1
            if test_support.verbose:
                print self.name, 'is finished.', running, 'tasks are running'
            mutex.release()
            sema.release()

    def setUp(self):
        self.numtasks = 10
        global sema
        sema = _threading.BoundedSemaphore(value=3)
        global mutex
        mutex = _threading.RLock()
        global running
        running = 0
        self.threads = []

    def test_tasks(self):
        for i in range(self.numtasks):
            t = self.TestThread(name="<thread %d>"%i)
            self.threads.append(t)
            t.start()

        if test_support.verbose:
            print 'waiting for all tasks to complete'
        for t in self.threads:
            t.join()
        if test_support.verbose:
            print 'all tasks done'

def test_main():
    test_support.run_unittest(DummyThreadingTestCase)

if __name__ == '__main__':
    test_main()