summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_zipfile
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_zipfile')
-rw-r--r--Lib/test/test_zipfile/_context.py30
-rw-r--r--Lib/test/test_zipfile/_func_timeout_compat.py8
-rw-r--r--Lib/test/test_zipfile/_itertools.py38
-rw-r--r--Lib/test/test_zipfile/_support.py9
-rw-r--r--Lib/test/test_zipfile/test_complexity.py24
-rw-r--r--Lib/test/test_zipfile/test_path.py22
6 files changed, 81 insertions, 50 deletions
diff --git a/Lib/test/test_zipfile/_context.py b/Lib/test/test_zipfile/_context.py
deleted file mode 100644
index 348798a..0000000
--- a/Lib/test/test_zipfile/_context.py
+++ /dev/null
@@ -1,30 +0,0 @@
-import contextlib
-import time
-
-
-class DeadlineExceeded(Exception):
- pass
-
-
-class TimedContext(contextlib.ContextDecorator):
- """
- A context that will raise DeadlineExceeded if the
- max duration is reached during the execution.
-
- >>> TimedContext(1)(time.sleep)(.1)
- >>> TimedContext(0)(time.sleep)(.1)
- Traceback (most recent call last):
- ...
- tests._context.DeadlineExceeded: (..., 0)
- """
-
- def __init__(self, max_duration: int):
- self.max_duration = max_duration
-
- def __enter__(self):
- self.start = time.monotonic()
-
- def __exit__(self, *err):
- duration = time.monotonic() - self.start
- if duration > self.max_duration:
- raise DeadlineExceeded(duration, self.max_duration)
diff --git a/Lib/test/test_zipfile/_func_timeout_compat.py b/Lib/test/test_zipfile/_func_timeout_compat.py
deleted file mode 100644
index b1f2b26..0000000
--- a/Lib/test/test_zipfile/_func_timeout_compat.py
+++ /dev/null
@@ -1,8 +0,0 @@
-try:
- from func_timeout import func_set_timeout as set_timeout
-except ImportError: # pragma: no cover
- # provide a fallback that doesn't actually time out
- from ._context import TimedContext as set_timeout
-
-
-__all__ = ['set_timeout']
diff --git a/Lib/test/test_zipfile/_itertools.py b/Lib/test/test_zipfile/_itertools.py
index 74f01fe..f735dd2 100644
--- a/Lib/test/test_zipfile/_itertools.py
+++ b/Lib/test/test_zipfile/_itertools.py
@@ -1,4 +1,6 @@
import itertools
+from collections import deque
+from itertools import islice
# from jaraco.itertools 6.3.0
@@ -39,3 +41,39 @@ def always_iterable(obj, base_type=(str, bytes)):
return iter(obj)
except TypeError:
return iter((obj,))
+
+
+# from more_itertools v9.0.0
+def consume(iterator, n=None):
+ """Advance *iterable* by *n* steps. If *n* is ``None``, consume it
+ entirely.
+ Efficiently exhausts an iterator without returning values. Defaults to
+ consuming the whole iterator, but an optional second argument may be
+ provided to limit consumption.
+ >>> i = (x for x in range(10))
+ >>> next(i)
+ 0
+ >>> consume(i, 3)
+ >>> next(i)
+ 4
+ >>> consume(i)
+ >>> next(i)
+ Traceback (most recent call last):
+ File "<stdin>", line 1, in <module>
+ StopIteration
+ If the iterator has fewer items remaining than the provided limit, the
+ whole iterator will be consumed.
+ >>> i = (x for x in range(3))
+ >>> consume(i, 5)
+ >>> next(i)
+ Traceback (most recent call last):
+ File "<stdin>", line 1, in <module>
+ StopIteration
+ """
+ # Use functions that consume iterators at C speed.
+ if n is None:
+ # feed the entire iterator into a zero-length deque
+ deque(iterator, maxlen=0)
+ else:
+ # advance to the empty slice starting at position n
+ next(islice(iterator, n, n), None)
diff --git a/Lib/test/test_zipfile/_support.py b/Lib/test/test_zipfile/_support.py
new file mode 100644
index 0000000..1afdf3b
--- /dev/null
+++ b/Lib/test/test_zipfile/_support.py
@@ -0,0 +1,9 @@
+import importlib
+import unittest
+
+
+def import_or_skip(name):
+ try:
+ return importlib.import_module(name)
+ except ImportError: # pragma: no cover
+ raise unittest.SkipTest(f'Unable to import {name}')
diff --git a/Lib/test/test_zipfile/test_complexity.py b/Lib/test/test_zipfile/test_complexity.py
new file mode 100644
index 0000000..3432dc3
--- /dev/null
+++ b/Lib/test/test_zipfile/test_complexity.py
@@ -0,0 +1,24 @@
+import unittest
+import string
+import zipfile
+
+from ._functools import compose
+from ._itertools import consume
+
+from ._support import import_or_skip
+
+
+big_o = import_or_skip('big_o')
+
+
+class TestComplexity(unittest.TestCase):
+ def test_implied_dirs_performance(self):
+ best, others = big_o.big_o(
+ compose(consume, zipfile.CompleteDirs._implied_dirs),
+ lambda size: [
+ '/'.join(string.ascii_lowercase + str(n)) for n in range(size)
+ ],
+ max_n=1000,
+ min_n=1,
+ )
+ assert best <= big_o.complexities.Linear
diff --git a/Lib/test/test_zipfile/test_path.py b/Lib/test/test_zipfile/test_path.py
index 53cbef1..aff91e5 100644
--- a/Lib/test/test_zipfile/test_path.py
+++ b/Lib/test/test_zipfile/test_path.py
@@ -3,7 +3,6 @@ import itertools
import contextlib
import pathlib
import pickle
-import string
import sys
import unittest
import zipfile
@@ -12,7 +11,6 @@ from ._functools import compose
from ._itertools import Counter
from ._test_params import parameterize, Invoked
-from ._func_timeout_compat import set_timeout
from test.support.os_helper import temp_dir
@@ -22,9 +20,6 @@ class jaraco:
Counter = Counter
-consume = tuple
-
-
def add_dirs(zf):
"""
Given a writable zip file zf, inject directory entries for
@@ -330,12 +325,6 @@ class TestPath(unittest.TestCase):
# Check the file iterated all items
assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES
- # timeout disabled due to #102209
- # @set_timeout(3)
- def test_implied_dirs_performance(self):
- data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
- zipfile.CompleteDirs._implied_dirs(data)
-
@pass_alpharep
def test_read_does_not_close(self, alpharep):
alpharep = self.zipfile_ondisk(alpharep)
@@ -513,7 +502,7 @@ class TestPath(unittest.TestCase):
saved_1 = pickle.dumps(zipfile.Path(zipfile_ondisk, at=subpath))
restored_1 = pickle.loads(saved_1)
first, *rest = restored_1.iterdir()
- assert first.read_text().startswith('content of ')
+ assert first.read_text(encoding='utf-8').startswith('content of ')
@pass_alpharep
def test_extract_orig_with_implied_dirs(self, alpharep):
@@ -525,3 +514,12 @@ class TestPath(unittest.TestCase):
# wrap the zipfile for its side effect
zipfile.Path(zf)
zf.extractall(source_path.parent)
+
+ @pass_alpharep
+ def test_getinfo_missing(self, alpharep):
+ """
+ Validate behavior of getinfo on original zipfile after wrapping.
+ """
+ zipfile.Path(alpharep)
+ with self.assertRaises(KeyError):
+ alpharep.getinfo('does-not-exist')