diff options
Diffstat (limited to 'Lib/test/test_zipfile')
-rw-r--r-- | Lib/test/test_zipfile/_context.py | 30 | ||||
-rw-r--r-- | Lib/test/test_zipfile/_func_timeout_compat.py | 8 | ||||
-rw-r--r-- | Lib/test/test_zipfile/_itertools.py | 38 | ||||
-rw-r--r-- | Lib/test/test_zipfile/_support.py | 9 | ||||
-rw-r--r-- | Lib/test/test_zipfile/test_complexity.py | 24 | ||||
-rw-r--r-- | Lib/test/test_zipfile/test_path.py | 22 |
6 files changed, 81 insertions, 50 deletions
diff --git a/Lib/test/test_zipfile/_context.py b/Lib/test/test_zipfile/_context.py deleted file mode 100644 index 348798a..0000000 --- a/Lib/test/test_zipfile/_context.py +++ /dev/null @@ -1,30 +0,0 @@ -import contextlib -import time - - -class DeadlineExceeded(Exception): - pass - - -class TimedContext(contextlib.ContextDecorator): - """ - A context that will raise DeadlineExceeded if the - max duration is reached during the execution. - - >>> TimedContext(1)(time.sleep)(.1) - >>> TimedContext(0)(time.sleep)(.1) - Traceback (most recent call last): - ... - tests._context.DeadlineExceeded: (..., 0) - """ - - def __init__(self, max_duration: int): - self.max_duration = max_duration - - def __enter__(self): - self.start = time.monotonic() - - def __exit__(self, *err): - duration = time.monotonic() - self.start - if duration > self.max_duration: - raise DeadlineExceeded(duration, self.max_duration) diff --git a/Lib/test/test_zipfile/_func_timeout_compat.py b/Lib/test/test_zipfile/_func_timeout_compat.py deleted file mode 100644 index b1f2b26..0000000 --- a/Lib/test/test_zipfile/_func_timeout_compat.py +++ /dev/null @@ -1,8 +0,0 @@ -try: - from func_timeout import func_set_timeout as set_timeout -except ImportError: # pragma: no cover - # provide a fallback that doesn't actually time out - from ._context import TimedContext as set_timeout - - -__all__ = ['set_timeout'] diff --git a/Lib/test/test_zipfile/_itertools.py b/Lib/test/test_zipfile/_itertools.py index 74f01fe..f735dd2 100644 --- a/Lib/test/test_zipfile/_itertools.py +++ b/Lib/test/test_zipfile/_itertools.py @@ -1,4 +1,6 @@ import itertools +from collections import deque +from itertools import islice # from jaraco.itertools 6.3.0 @@ -39,3 +41,39 @@ def always_iterable(obj, base_type=(str, bytes)): return iter(obj) except TypeError: return iter((obj,)) + + +# from more_itertools v9.0.0 +def consume(iterator, n=None): + """Advance *iterable* by *n* steps. If *n* is ``None``, consume it + entirely. + Efficiently exhausts an iterator without returning values. Defaults to + consuming the whole iterator, but an optional second argument may be + provided to limit consumption. + >>> i = (x for x in range(10)) + >>> next(i) + 0 + >>> consume(i, 3) + >>> next(i) + 4 + >>> consume(i) + >>> next(i) + Traceback (most recent call last): + File "<stdin>", line 1, in <module> + StopIteration + If the iterator has fewer items remaining than the provided limit, the + whole iterator will be consumed. + >>> i = (x for x in range(3)) + >>> consume(i, 5) + >>> next(i) + Traceback (most recent call last): + File "<stdin>", line 1, in <module> + StopIteration + """ + # Use functions that consume iterators at C speed. + if n is None: + # feed the entire iterator into a zero-length deque + deque(iterator, maxlen=0) + else: + # advance to the empty slice starting at position n + next(islice(iterator, n, n), None) diff --git a/Lib/test/test_zipfile/_support.py b/Lib/test/test_zipfile/_support.py new file mode 100644 index 0000000..1afdf3b --- /dev/null +++ b/Lib/test/test_zipfile/_support.py @@ -0,0 +1,9 @@ +import importlib +import unittest + + +def import_or_skip(name): + try: + return importlib.import_module(name) + except ImportError: # pragma: no cover + raise unittest.SkipTest(f'Unable to import {name}') diff --git a/Lib/test/test_zipfile/test_complexity.py b/Lib/test/test_zipfile/test_complexity.py new file mode 100644 index 0000000..3432dc3 --- /dev/null +++ b/Lib/test/test_zipfile/test_complexity.py @@ -0,0 +1,24 @@ +import unittest +import string +import zipfile + +from ._functools import compose +from ._itertools import consume + +from ._support import import_or_skip + + +big_o = import_or_skip('big_o') + + +class TestComplexity(unittest.TestCase): + def test_implied_dirs_performance(self): + best, others = big_o.big_o( + compose(consume, zipfile.CompleteDirs._implied_dirs), + lambda size: [ + '/'.join(string.ascii_lowercase + str(n)) for n in range(size) + ], + max_n=1000, + min_n=1, + ) + assert best <= big_o.complexities.Linear diff --git a/Lib/test/test_zipfile/test_path.py b/Lib/test/test_zipfile/test_path.py index 53cbef1..aff91e5 100644 --- a/Lib/test/test_zipfile/test_path.py +++ b/Lib/test/test_zipfile/test_path.py @@ -3,7 +3,6 @@ import itertools import contextlib import pathlib import pickle -import string import sys import unittest import zipfile @@ -12,7 +11,6 @@ from ._functools import compose from ._itertools import Counter from ._test_params import parameterize, Invoked -from ._func_timeout_compat import set_timeout from test.support.os_helper import temp_dir @@ -22,9 +20,6 @@ class jaraco: Counter = Counter -consume = tuple - - def add_dirs(zf): """ Given a writable zip file zf, inject directory entries for @@ -330,12 +325,6 @@ class TestPath(unittest.TestCase): # Check the file iterated all items assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES - # timeout disabled due to #102209 - # @set_timeout(3) - def test_implied_dirs_performance(self): - data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)] - zipfile.CompleteDirs._implied_dirs(data) - @pass_alpharep def test_read_does_not_close(self, alpharep): alpharep = self.zipfile_ondisk(alpharep) @@ -513,7 +502,7 @@ class TestPath(unittest.TestCase): saved_1 = pickle.dumps(zipfile.Path(zipfile_ondisk, at=subpath)) restored_1 = pickle.loads(saved_1) first, *rest = restored_1.iterdir() - assert first.read_text().startswith('content of ') + assert first.read_text(encoding='utf-8').startswith('content of ') @pass_alpharep def test_extract_orig_with_implied_dirs(self, alpharep): @@ -525,3 +514,12 @@ class TestPath(unittest.TestCase): # wrap the zipfile for its side effect zipfile.Path(zf) zf.extractall(source_path.parent) + + @pass_alpharep + def test_getinfo_missing(self, alpharep): + """ + Validate behavior of getinfo on original zipfile after wrapping. + """ + zipfile.Path(alpharep) + with self.assertRaises(KeyError): + alpharep.getinfo('does-not-exist') |