diff options
Diffstat (limited to 'Lib/packaging/tests/test_install.py')
-rw-r--r-- | Lib/packaging/tests/test_install.py | 391 |
1 files changed, 391 insertions, 0 deletions
diff --git a/Lib/packaging/tests/test_install.py b/Lib/packaging/tests/test_install.py new file mode 100644 index 0000000..cc1f5d3 --- /dev/null +++ b/Lib/packaging/tests/test_install.py @@ -0,0 +1,391 @@ +"""Tests for the packaging.install module.""" +import os +import logging +from tempfile import mkstemp +from sysconfig import is_python_build + +from packaging import install +from packaging.pypi.xmlrpc import Client +from packaging.metadata import Metadata +from packaging.tests.support import (LoggingCatcher, TempdirManager, unittest, + fake_dec) +try: + import threading + from packaging.tests.pypi_server import use_xmlrpc_server +except ImportError: + threading = None + use_xmlrpc_server = fake_dec + + +class InstalledDist: + """Distribution object, represent distributions currently installed on the + system""" + def __init__(self, name, version, deps): + self.metadata = Metadata() + self.name = name + self.version = version + self.metadata['Name'] = name + self.metadata['Version'] = version + self.metadata['Requires-Dist'] = deps + + def __repr__(self): + return '<InstalledDist %r>' % self.metadata['Name'] + + +class ToInstallDist: + """Distribution that will be installed""" + + def __init__(self, files=False): + self._files = files + self.install_called = False + self.install_called_with = {} + self.uninstall_called = False + self._real_files = [] + self.name = "fake" + self.version = "fake" + if files: + for f in range(0, 3): + fp, fn = mkstemp() + os.close(fp) + self._real_files.append(fn) + + def _unlink_installed_files(self): + if self._files: + for fn in self._real_files: + os.unlink(fn) + + def list_installed_files(self, **args): + if self._files: + return self._real_files + + def get_install(self, **args): + return self.list_installed_files() + + +class MagicMock: + def __init__(self, return_value=None, raise_exception=False): + self.called = False + self._times_called = 0 + self._called_with = [] + self._return_value = return_value + self._raise = raise_exception + + def __call__(self, *args, **kwargs): + self.called = True + self._times_called = self._times_called + 1 + self._called_with.append((args, kwargs)) + iterable = hasattr(self._raise, '__iter__') + if self._raise: + if ((not iterable and self._raise) + or self._raise[self._times_called - 1]): + raise Exception + return self._return_value + + def called_with(self, *args, **kwargs): + return (args, kwargs) in self._called_with + + +def get_installed_dists(dists): + """Return a list of fake installed dists. + The list is name, version, deps""" + objects = [] + for name, version, deps in dists: + objects.append(InstalledDist(name, version, deps)) + return objects + + +class TestInstall(LoggingCatcher, TempdirManager, unittest.TestCase): + def _get_client(self, server, *args, **kwargs): + return Client(server.full_address, *args, **kwargs) + + def _get_results(self, output): + """return a list of results""" + installed = [(o.name, str(o.version)) for o in output['install']] + remove = [(o.name, str(o.version)) for o in output['remove']] + conflict = [(o.name, str(o.version)) for o in output['conflict']] + return installed, remove, conflict + + @unittest.skipIf(threading is None, 'needs threading') + @use_xmlrpc_server() + def test_existing_deps(self, server): + # Test that the installer get the dependencies from the metadatas + # and ask the index for this dependencies. + # In this test case, we have choxie that is dependent from towel-stuff + # 0.1, which is in-turn dependent on bacon <= 0.2: + # choxie -> towel-stuff -> bacon. + # Each release metadata is not provided in metadata 1.2. + client = self._get_client(server) + archive_path = '%s/distribution.tar.gz' % server.full_address + server.xmlrpc.set_distributions([ + {'name': 'choxie', + 'version': '2.0.0.9', + 'requires_dist': ['towel-stuff (0.1)'], + 'url': archive_path}, + {'name': 'towel-stuff', + 'version': '0.1', + 'requires_dist': ['bacon (<= 0.2)'], + 'url': archive_path}, + {'name': 'bacon', + 'version': '0.1', + 'requires_dist': [], + 'url': archive_path}, + ]) + installed = get_installed_dists([('bacon', '0.1', [])]) + output = install.get_infos("choxie", index=client, + installed=installed) + + # we don't have installed bacon as it's already installed system-wide + self.assertEqual(0, len(output['remove'])) + self.assertEqual(2, len(output['install'])) + readable_output = [(o.name, str(o.version)) + for o in output['install']] + self.assertIn(('towel-stuff', '0.1'), readable_output) + self.assertIn(('choxie', '2.0.0.9'), readable_output) + + @unittest.skipIf(threading is None, 'needs threading') + @use_xmlrpc_server() + def test_upgrade_existing_deps(self, server): + client = self._get_client(server) + archive_path = '%s/distribution.tar.gz' % server.full_address + server.xmlrpc.set_distributions([ + {'name': 'choxie', + 'version': '2.0.0.9', + 'requires_dist': ['towel-stuff (0.1)'], + 'url': archive_path}, + {'name': 'towel-stuff', + 'version': '0.1', + 'requires_dist': ['bacon (>= 0.2)'], + 'url': archive_path}, + {'name': 'bacon', + 'version': '0.2', + 'requires_dist': [], + 'url': archive_path}, + ]) + + output = install.get_infos("choxie", index=client, + installed=get_installed_dists([('bacon', '0.1', [])])) + installed = [(o.name, str(o.version)) for o in output['install']] + + # we need bacon 0.2, but 0.1 is installed. + # So we expect to remove 0.1 and to install 0.2 instead. + remove = [(o.name, str(o.version)) for o in output['remove']] + self.assertIn(('choxie', '2.0.0.9'), installed) + self.assertIn(('towel-stuff', '0.1'), installed) + self.assertIn(('bacon', '0.2'), installed) + self.assertIn(('bacon', '0.1'), remove) + self.assertEqual(0, len(output['conflict'])) + + @unittest.skipIf(threading is None, 'needs threading') + @use_xmlrpc_server() + def test_conflicts(self, server): + # Tests that conflicts are detected + client = self._get_client(server) + archive_path = '%s/distribution.tar.gz' % server.full_address + + # choxie depends on towel-stuff, which depends on bacon. + server.xmlrpc.set_distributions([ + {'name': 'choxie', + 'version': '2.0.0.9', + 'requires_dist': ['towel-stuff (0.1)'], + 'url': archive_path}, + {'name': 'towel-stuff', + 'version': '0.1', + 'requires_dist': ['bacon (>= 0.2)'], + 'url': archive_path}, + {'name': 'bacon', + 'version': '0.2', + 'requires_dist': [], + 'url': archive_path}, + ]) + + # name, version, deps. + already_installed = [('bacon', '0.1', []), + ('chicken', '1.1', ['bacon (0.1)'])] + output = install.get_infos( + 'choxie', index=client, + installed=get_installed_dists(already_installed)) + + # we need bacon 0.2, but 0.1 is installed. + # So we expect to remove 0.1 and to install 0.2 instead. + installed, remove, conflict = self._get_results(output) + self.assertIn(('choxie', '2.0.0.9'), installed) + self.assertIn(('towel-stuff', '0.1'), installed) + self.assertIn(('bacon', '0.2'), installed) + self.assertIn(('bacon', '0.1'), remove) + self.assertIn(('chicken', '1.1'), conflict) + + @unittest.skipIf(threading is None, 'needs threading') + @use_xmlrpc_server() + def test_installation_unexisting_project(self, server): + # Test that the isntalled raises an exception if the project does not + # exists. + client = self._get_client(server) + self.assertRaises(install.InstallationException, + install.get_infos, + 'unexisting project', index=client) + + def test_move_files(self): + # test that the files are really moved, and that the new path is + # returned. + path = self.mkdtemp() + newpath = self.mkdtemp() + files = [os.path.join(path, str(x)) for x in range(1, 20)] + for f in files: + open(f, 'ab+').close() + output = [o for o in install._move_files(files, newpath)] + + # check that output return the list of old/new places + for file_ in files: + name = os.path.split(file_)[-1] + newloc = os.path.join(newpath, name) + self.assertIn((file_, newloc), output) + + # remove the files + for f in [o[1] for o in output]: # o[1] is the new place + os.remove(f) + + def test_update_infos(self): + tests = [[ + {'foo': ['foobar', 'foo', 'baz'], 'baz': ['foo', 'foo']}, + {'foo': ['additional_content', 'yeah'], 'baz': ['test', 'foo']}, + {'foo': ['foobar', 'foo', 'baz', 'additional_content', 'yeah'], + 'baz': ['foo', 'foo', 'test', 'foo']}, + ]] + + for dict1, dict2, expect in tests: + install._update_infos(dict1, dict2) + for key in expect: + self.assertEqual(expect[key], dict1[key]) + + def test_install_dists_rollback(self): + # if one of the distribution installation fails, call uninstall on all + # installed distributions. + + old_install_dist = install._install_dist + old_uninstall = getattr(install, 'uninstall', None) + + install._install_dist = MagicMock(return_value=[], + raise_exception=(False, True)) + install.remove = MagicMock() + try: + d1 = ToInstallDist() + d2 = ToInstallDist() + path = self.mkdtemp() + self.assertRaises(Exception, install.install_dists, [d1, d2], path) + self.assertTrue(install._install_dist.called_with(d1, path)) + self.assertTrue(install.remove.called) + finally: + install._install_dist = old_install_dist + install.remove = old_uninstall + + def test_install_dists_success(self): + old_install_dist = install._install_dist + install._install_dist = MagicMock(return_value=[]) + try: + # test that the install method is called on each distributions + d1 = ToInstallDist() + d2 = ToInstallDist() + + # should call install + path = self.mkdtemp() + install.install_dists([d1, d2], path) + for dist in (d1, d2): + self.assertTrue(install._install_dist.called_with(dist, path)) + finally: + install._install_dist = old_install_dist + + def test_install_from_infos_conflict(self): + # assert conflicts raise an exception + self.assertRaises(install.InstallationConflict, + install.install_from_infos, + conflicts=[ToInstallDist()]) + + def test_install_from_infos_remove_success(self): + old_install_dists = install.install_dists + install.install_dists = lambda x, y=None: None + try: + dists = [] + for i in range(2): + dists.append(ToInstallDist(files=True)) + install.install_from_infos(remove=dists) + + # assert that the files have been removed + for dist in dists: + for f in dist.list_installed_files(): + self.assertFalse(os.path.exists(f)) + finally: + install.install_dists = old_install_dists + + def test_install_from_infos_remove_rollback(self): + old_install_dist = install._install_dist + old_uninstall = getattr(install, 'uninstall', None) + + install._install_dist = MagicMock(return_value=[], + raise_exception=(False, True)) + install.uninstall = MagicMock() + try: + # assert that if an error occurs, the removed files are restored. + remove = [] + for i in range(2): + remove.append(ToInstallDist(files=True)) + to_install = [ToInstallDist(), ToInstallDist()] + temp_dir = self.mkdtemp() + + self.assertRaises(Exception, install.install_from_infos, + install_path=temp_dir, install=to_install, + remove=remove) + # assert that the files are in the same place + # assert that the files have been removed + for dist in remove: + for f in dist.list_installed_files(): + self.assertTrue(os.path.exists(f)) + dist._unlink_installed_files() + finally: + install._install_dist = old_install_dist + install.uninstall = old_uninstall + + def test_install_from_infos_install_succes(self): + old_install_dist = install._install_dist + install._install_dist = MagicMock([]) + try: + # assert that the distribution can be installed + install_path = "my_install_path" + to_install = [ToInstallDist(), ToInstallDist()] + + install.install_from_infos(install_path, install=to_install) + for dist in to_install: + install._install_dist.called_with(install_path) + finally: + install._install_dist = old_install_dist + + def test_install_permission_denied(self): + # if we don't have access to the installation path, we should abort + # immediately + project = os.path.join(os.path.dirname(__file__), 'package.tgz') + + # when running from an uninstalled build, a warning is emitted and the + # installation is not attempted + if is_python_build(): + self.assertFalse(install.install(project)) + self.assertEqual(1, len(self.get_logs(logging.ERROR))) + return + + install_path = self.mkdtemp() + old_get_path = install.get_path + install.get_path = lambda path: install_path + old_mod = os.stat(install_path).st_mode + os.chmod(install_path, 0) + try: + self.assertFalse(install.install(project)) + finally: + os.chmod(install_path, old_mod) + install.get_path = old_get_path + + +def test_suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(TestInstall)) + return suite + +if __name__ == '__main__': + unittest.main(defaultTest='test_suite') |