summaryrefslogtreecommitdiffstats
path: root/Include/unicodeobject.h
stat options
Period:
Authors:

Commits per author per week (path 'Include/unicodeobject.h')

AuthorW09 2026W10 2026W11 2026W12 2026Total
Total00000
a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -1475,12 +1475,12 @@ class EventLoopTestsMixin: return len(data) test_utils.run_until(self.loop, lambda: reader(data) >= 1, - timeout=10) + timeout=support.SHORT_TIMEOUT) self.assertEqual(b'1', data) transport.write(b'2345') test_utils.run_until(self.loop, lambda: reader(data) >= 5, - timeout=10) + timeout=support.SHORT_TIMEOUT) self.assertEqual(b'12345', data) self.assertEqual('CONNECTED', proto.state) @@ -1531,27 +1531,29 @@ class EventLoopTestsMixin: return len(data) write_transport.write(b'1') - test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10) + test_utils.run_until(self.loop, lambda: reader(data) >= 1, + timeout=support.SHORT_TIMEOUT) self.assertEqual(b'1', data) self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) self.assertEqual('CONNECTED', write_proto.state) os.write(master, b'a') test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1, - timeout=10) + timeout=support.SHORT_TIMEOUT) self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) self.assertEqual(1, read_proto.nbytes) self.assertEqual('CONNECTED', write_proto.state) write_transport.write(b'2345') - test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10) + test_utils.run_until(self.loop, lambda: reader(data) >= 5, + timeout=support.SHORT_TIMEOUT) self.assertEqual(b'12345', data) self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) self.assertEqual('CONNECTED', write_proto.state) os.write(master, b'bcde') test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5, - timeout=10) + timeout=support.SHORT_TIMEOUT) self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) self.assertEqual(5, read_proto.nbytes) self.assertEqual('CONNECTED', write_proto.state) diff --git a/Lib/test/test_asyncio/test_sslproto.py b/Lib/test/test_asyncio/test_sslproto.py index 8e9d4c5..c716eac 100644 --- a/Lib/test/test_asyncio/test_sslproto.py +++ b/Lib/test/test_asyncio/test_sslproto.py @@ -274,7 +274,8 @@ class BaseStartTLS(func_tests.FunctionalTestCaseMixin): with self.tcp_server(serve, timeout=self.TIMEOUT) as srv: self.loop.run_until_complete( - asyncio.wait_for(client(srv.addr), timeout=10)) + asyncio.wait_for(client(srv.addr), + timeout=support.SHORT_TIMEOUT)) # No garbage is left if SSL is closed uncleanly client_context = weakref.ref(client_context) @@ -335,7 +336,8 @@ class BaseStartTLS(func_tests.FunctionalTestCaseMixin): with self.tcp_server(serve, timeout=self.TIMEOUT) as srv: self.loop.run_until_complete( - asyncio.wait_for(client(srv.addr), timeout=10)) + asyncio.wait_for(client(srv.addr), + timeout=support.SHORT_TIMEOUT)) # No garbage is left for SSL client from loop.create_connection, even # if user stores the SSLTransport in corresponding protocol instance @@ -491,7 +493,8 @@ class BaseStartTLS(func_tests.FunctionalTestCaseMixin): with self.tcp_server(serve, timeout=self.TIMEOUT) as srv: self.loop.run_until_complete( - asyncio.wait_for(client(srv.addr), timeout=10)) + asyncio.wait_for(client(srv.addr), + timeout=support.SHORT_TIMEOUT)) def test_start_tls_server_1(self): HELLO_MSG = b'1' * self.PAYLOAD_SIZE @@ -619,7 +622,7 @@ class BaseStartTLS(func_tests.FunctionalTestCaseMixin): *addr, ssl=client_sslctx, server_hostname='', - ssl_handshake_timeout=10.0), + ssl_handshake_timeout=support.SHORT_TIMEOUT), 0.5) with self.tcp_server(server, diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py index 6d8a6e2..804db91 100644 --- a/Lib/test/test_asyncio/utils.py +++ b/Lib/test/test_asyncio/utils.py @@ -107,7 +107,7 @@ def run_briefly(loop): gen.close() -def run_until(loop, pred, timeout=30): +def run_until(loop, pred, timeout=support.SHORT_TIMEOUT): deadline = time.monotonic() + timeout while not pred(): if timeout is not None: diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index ff9a493..8b75185 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -1,9 +1,9 @@ -import test.support +from test import support # Skip tests if _multiprocessing wasn't built. -test.support.import_module('_multiprocessing') +support.import_module('_multiprocessing') # Skip tests if sem_open implementation is broken. -test.support.import_module('multiprocessing.synchronize') +support.import_module('multiprocessing.synchronize') from test.support.script_helper import assert_python_ok @@ -101,11 +101,11 @@ def make_dummy_object(_): class BaseTestCase(unittest.TestCase): def setUp(self): - self._thread_key = test.support.threading_setup() + self._thread_key = support.threading_setup() def tearDown(self): - test.support.reap_children() - test.support.threading_cleanup(*self._thread_key) + support.reap_children() + support.threading_cleanup(*self._thread_key) class ExecutorMixin: @@ -132,7 +132,7 @@ class ExecutorMixin: self.executor = None dt = time.monotonic() - self.t1 - if test.support.verbose: + if support.verbose: print("%.2fs" % dt, end=' ') self.assertLess(dt, 300, "synchronization issue: test lasted too long") @@ -712,7 +712,7 @@ class ExecutorTest: self.executor.map(str, [2] * (self.worker_count + 1)) self.executor.shutdown() - @test.support.cpython_only + @support.cpython_only def test_no_stale_references(self): # Issue #16284: check that the executors don't unnecessarily hang onto # references. @@ -724,7 +724,7 @@ class ExecutorTest: self.executor.submit(my_object.my_method) del my_object - collected = my_object_collected.wait(timeout=5.0) + collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT) self.assertTrue(collected, "Stale reference not collected within timeout.") @@ -836,7 +836,7 @@ class ProcessPoolExecutorTest(ExecutorTest): self.assertIs(type(cause), futures.process._RemoteTraceback) self.assertIn('raise RuntimeError(123) # some comment', cause.tb) - with test.support.captured_stderr() as f1: + with support.captured_stderr() as f1: try: raise exc except RuntimeError: @@ -929,7 +929,7 @@ class ErrorAtUnpickle(object): class ExecutorDeadlockTest: - TIMEOUT = 15 + TIMEOUT = support.SHORT_TIMEOUT @classmethod def _sleep_id(cls, x, delay): @@ -994,7 +994,7 @@ class ExecutorDeadlockTest: for func, args, error, name in crash_cases: with self.subTest(name): # The captured_stderr reduces the noise in the test report - with test.support.captured_stderr(): + with support.captured_stderr(): executor = self.executor_type( max_workers=2, mp_context=get_context(self.ctx)) res = executor.submit(func, *args) @@ -1061,7 +1061,7 @@ class FutureTests(BaseTestCase): self.assertTrue(was_cancelled) def test_done_callback_raises(self): - with test.support.captured_stderr() as stderr: + with support.captured_stderr() as stderr: raising_was_called = False fn_was_called = False @@ -1116,7 +1116,7 @@ class FutureTests(BaseTestCase): self.assertTrue(was_cancelled) def test_done_callback_raises_already_succeeded(self): - with test.support.captured_stderr() as stderr: + with support.captured_stderr() as stderr: def raising_fn(callback_future): raise Exception('doh!') @@ -1235,7 +1235,8 @@ class FutureTests(BaseTestCase): t = threading.Thread(target=notification) t.start() - self.assertRaises(futures.CancelledError, f1.result, timeout=5) + self.assertRaises(futures.CancelledError, + f1.result, timeout=support.SHORT_TIMEOUT) t.join() def test_exception_with_timeout(self): @@ -1264,7 +1265,7 @@ class FutureTests(BaseTestCase): t = threading.Thread(target=notification) t.start() - self.assertTrue(isinstance(f1.exception(timeout=5), OSError)) + self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError)) t.join() def test_multiple_set_result(self): @@ -1300,12 +1301,12 @@ _threads_key = None def setUpModule(): global _threads_key - _threads_key = test.support.threading_setup() + _threads_key = support.threading_setup() def tearDownModule(): - test.support.threading_cleanup(*_threads_key) - test.support.reap_children() + support.threading_cleanup(*_threads_key) + support.reap_children() # cleanup multiprocessing multiprocessing.process._cleanup() @@ -1315,7 +1316,7 @@ def tearDownModule(): # bpo-37421: Explicitly call _run_finalizers() to remove immediately # temporary directories created by multiprocessing.util.get_temp_dir(). multiprocessing.util._run_finalizers() - test.support.gc_collect() + support.gc_collect() if __name__ == "__main__": diff --git a/Lib/test/test_fork1.py b/Lib/test/test_fork1.py index 2ab856f..ce0a05a 100644 --- a/Lib/test/test_fork1.py +++ b/Lib/test/test_fork1.py @@ -10,15 +10,15 @@ import time import unittest from test.fork_wait import ForkWait -from test.support import reap_children, get_attribute, verbose +from test import support # Skip test if fork does not exist. -get_attribute(os, 'fork') +support.get_attribute(os, 'fork') class ForkTest(ForkWait): def wait_impl(self, cpid): - deadline = time.monotonic() + 10.0 + deadline = time.monotonic() + support.SHORT_TIMEOUT while time.monotonic() <= deadline: # waitpid() shouldn't hang, but some of the buildbots seem to hang # in the forking tests. This is an attempt to fix the problem. @@ -56,7 +56,7 @@ class ForkTest(ForkWait): if m == complete_module: os._exit(0) else: - if verbose > 1: + if support.verbose > 1: print("Child encountered partial module") os._exit(1) else: @@ -90,7 +90,7 @@ class ForkTest(ForkWait): imp.release_lock() except RuntimeError: if in_child: - if verbose > 1: + if support.verbose > 1: print("RuntimeError in child") os._exit(1) raise @@ -105,7 +105,7 @@ class ForkTest(ForkWait): def tearDownModule(): - reap_children() + support.reap_children() if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_pydoc.py b/Lib/test/test_pydoc.py index b803b8b..ebd8d4a 100644 --- a/Lib/test/test_pydoc.py +++ b/Lib/test/test_pydoc.py @@ -1336,7 +1336,7 @@ class PydocServerTest(unittest.TestCase): self.assertIn('0.0.0.0', serverthread.docserver.address) starttime = time.monotonic() - timeout = 1 #seconds + timeout = test.supp