import itertools import unittest from test.support import import_helper, threading_helper from test.support.threading_helper import run_concurrently zlib = import_helper.import_module("zlib") from test.test_zlib import HAMLET_SCENE NTHREADS = 10 @threading_helper.requires_working_threading() class TestZlib(unittest.TestCase): def test_compressor(self): comp = zlib.compressobj() # First compress() outputs zlib header header = comp.compress(HAMLET_SCENE) self.assertGreater(len(header), 0) def worker(): # it should return empty bytes as it buffers data internally data = comp.compress(HAMLET_SCENE) self.assertEqual(data, b"") run_concurrently(worker_func=worker, nthreads=NTHREADS - 1) full_compressed = header + comp.flush() decompressed = zlib.decompress(full_compressed) # The decompressed data should be HAMLET_SCENE repeated NTHREADS times self.assertEqual(decompressed, HAMLET_SCENE * NTHREADS) def test_decompressor_concurrent_attribute_reads(self): input_data = HAMLET_SCENE * NTHREADS compressed = zlib.compress(input_data) decomp = zlib.decompressobj() decomp_size_per_loop = len(input_data) // 1000 decompressed_parts = [] def decomp_worker(): # Decompress in chunks, which updates eof, unused_data, unconsumed_tail decompressed_parts.append( decomp.decompress(compressed, decomp_size_per_loop) ) while decomp.unconsumed_tail: decompressed_parts.append( decomp.decompress( decomp.unconsumed_tail, decomp_size_per_loop ) ) def decomp_attr_reader(): # Read attributes concurrently while another thread decompresses for _ in range(1000): _ = decomp.unused_data _ = decomp.unconsumed_tail _ = decomp.eof counter = itertools.count() def worker(): # First thread decompresses, others read attributes if next(counter) == 0: decomp_worker() else: decomp_attr_reader() run_concurrently(worker_func=worker, nthreads=NTHREADS) self.assertTrue(decomp.eof) self.assertEqual(decomp.unused_data, b"") decompressed = b"".join(decompressed_parts) self.assertEqual(decompressed, HAMLET_SCENE * NTHREADS) if __name__ == "__main__": unittest.main()