diff --git a/Lib/shutil.py b/Lib/shutil.py index 420802f..d0ff2ef 100644 --- a/Lib/shutil.py +++ b/Lib/shutil.py @@ -446,17 +446,24 @@ def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None): zip_filename, base_dir) if not dry_run: - zip = zipfile.ZipFile(zip_filename, "w", - compression=zipfile.ZIP_DEFLATED) - - for dirpath, dirnames, filenames in os.walk(base_dir): - for name in filenames: - path = os.path.normpath(os.path.join(dirpath, name)) - if os.path.isfile(path): - zip.write(path, path) + with zipfile.ZipFile(zip_filename, "w", + compression=zipfile.ZIP_DEFLATED) as zf: + path = os.path.normpath(base_dir) + zf.write(path, path) + if logger is not None: + logger.info("adding '%s'", path) + for dirpath, dirnames, filenames in os.walk(base_dir): + for name in sorted(dirnames): + path = os.path.normpath(os.path.join(dirpath, name)) + zf.write(path, path) if logger is not None: logger.info("adding '%s'", path) - zip.close() + for name in filenames: + path = os.path.normpath(os.path.join(dirpath, name)) + if os.path.isfile(path): + zf.write(path, path) + if logger is not None: + logger.info("adding '%s'", path) return zip_filename diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py index 9bdb724..9238489 100644 --- a/Lib/test/test_shutil.py +++ b/Lib/test/test_shutil.py @@ -10,13 +10,13 @@ import os.path import errno from os.path import splitdrive from distutils.spawn import find_executable, spawn -from shutil import (_make_tarball, _make_zipfile, make_archive, +from shutil import (make_archive, register_archive_format, unregister_archive_format, get_archive_formats) import tarfile import warnings -from test import test_support +from test import test_support as support from test.test_support import TESTFN, check_warnings, captured_stdout TESTFN2 = TESTFN + "2" @@ -372,139 +372,135 @@ class TestShutil(unittest.TestCase): @unittest.skipUnless(zlib, "requires zlib") def test_make_tarball(self): # creating something to tar - tmpdir = self.mkdtemp() - self.write_file([tmpdir, 'file1'], 'xxx') - self.write_file([tmpdir, 'file2'], 'xxx') - os.mkdir(os.path.join(tmpdir, 'sub')) - self.write_file([tmpdir, 'sub', 'file3'], 'xxx') + root_dir, base_dir = self._create_files('') tmpdir2 = self.mkdtemp() # force shutil to create the directory os.rmdir(tmpdir2) - unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0], + unittest.skipUnless(splitdrive(root_dir)[0] == splitdrive(tmpdir2)[0], "source and target should be on same drive") base_name = os.path.join(tmpdir2, 'archive') # working with relative paths to avoid tar warnings - old_dir = os.getcwd() - os.chdir(tmpdir) - try: - _make_tarball(splitdrive(base_name)[1], '.') - finally: - os.chdir(old_dir) + make_archive(splitdrive(base_name)[1], 'gztar', root_dir, '.') # check if the compressed tarball was created tarball = base_name + '.tar.gz' - self.assertTrue(os.path.exists(tarball)) + self.assertTrue(os.path.isfile(tarball)) + self.assertTrue(tarfile.is_tarfile(tarball)) + with tarfile.open(tarball, 'r:gz') as tf: + self.assertEqual(sorted(tf.getnames()), + ['.', './file1', './file2', + './sub', './sub/file3', './sub2']) # trying an uncompressed one base_name = os.path.join(tmpdir2, 'archive') - old_dir = os.getcwd() - os.chdir(tmpdir) - try: - _make_tarball(splitdrive(base_name)[1], '.', compress=None) - finally: - os.chdir(old_dir) + make_archive(splitdrive(base_name)[1], 'tar', root_dir, '.') tarball = base_name + '.tar' - self.assertTrue(os.path.exists(tarball)) + self.assertTrue(os.path.isfile(tarball)) + self.assertTrue(tarfile.is_tarfile(tarball)) + with tarfile.open(tarball, 'r') as tf: + self.assertEqual(sorted(tf.getnames()), + ['.', './file1', './file2', + './sub', './sub/file3', './sub2']) def _tarinfo(self, path): - tar = tarfile.open(path) - try: + with tarfile.open(path) as tar: names = tar.getnames() names.sort() return tuple(names) - finally: - tar.close() - def _create_files(self): + def _create_files(self, base_dir='dist'): # creating something to tar - tmpdir = self.mkdtemp() - dist = os.path.join(tmpdir, 'dist') - os.mkdir(dist) - self.write_file([dist, 'file1'], 'xxx') - self.write_file([dist, 'file2'], 'xxx') + root_dir = self.mkdtemp() + dist = os.path.join(root_dir, base_dir) + if not os.path.isdir(dist): + os.makedirs(dist) + self.write_file((dist, 'file1'), 'xxx') + self.write_file((dist, 'file2'), 'xxx') os.mkdir(os.path.join(dist, 'sub')) - self.write_file([dist, 'sub', 'file3'], 'xxx') + self.write_file((dist, 'sub', 'file3'), 'xxx') os.mkdir(os.path.join(dist, 'sub2')) - tmpdir2 = self.mkdtemp() - base_name = os.path.join(tmpdir2, 'archive') - return tmpdir, tmpdir2, base_name + if base_dir: + self.write_file((root_dir, 'outer'), 'xxx') + return root_dir, base_dir @unittest.skipUnless(zlib, "Requires zlib") - @unittest.skipUnless(find_executable('tar') and find_executable('gzip'), + @unittest.skipUnless(find_executable('tar'), 'Need the tar command to run') def test_tarfile_vs_tar(self): - tmpdir, tmpdir2, base_name = self._create_files() - old_dir = os.getcwd() - os.chdir(tmpdir) - try: - _make_tarball(base_name, 'dist') - finally: - os.chdir(old_dir) + root_dir, base_dir = self._create_files() + base_name = os.path.join(self.mkdtemp(), 'archive') + make_archive(base_name, 'gztar', root_dir, base_dir) # check if the compressed tarball was created tarball = base_name + '.tar.gz' - self.assertTrue(os.path.exists(tarball)) + self.assertTrue(os.path.isfile(tarball)) # now create another tarball using `tar` - tarball2 = os.path.join(tmpdir, 'archive2.tar.gz') - tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist'] - gzip_cmd = ['gzip', '-f9', 'archive2.tar'] - old_dir = os.getcwd() - os.chdir(tmpdir) - try: - with captured_stdout() as s: - spawn(tar_cmd) - spawn(gzip_cmd) - finally: - os.chdir(old_dir) + tarball2 = os.path.join(root_dir, 'archive2.tar') + tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir] + with support.change_cwd(root_dir), captured_stdout(): + spawn(tar_cmd) - self.assertTrue(os.path.exists(tarball2)) + self.assertTrue(os.path.isfile(tarball2)) # let's compare both tarballs self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2)) # trying an uncompressed one - base_name = os.path.join(tmpdir2, 'archive') - old_dir = os.getcwd() - os.chdir(tmpdir) - try: - _make_tarball(base_name, 'dist', compress=None) - finally: - os.chdir(old_dir) + make_archive(base_name, 'tar', root_dir, base_dir) tarball = base_name + '.tar' - self.assertTrue(os.path.exists(tarball)) + self.assertTrue(os.path.isfile(tarball)) # now for a dry_run - base_name = os.path.join(tmpdir2, 'archive') - old_dir = os.getcwd() - os.chdir(tmpdir) - try: - _make_tarball(base_name, 'dist', compress=None, dry_run=True) - finally: - os.chdir(old_dir) + make_archive(base_name, 'tar', root_dir, base_dir, dry_run=True) tarball = base_name + '.tar' - self.assertTrue(os.path.exists(tarball)) + self.assertTrue(os.path.isfile(tarball)) @unittest.skipUnless(zlib, "Requires zlib") @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run') def test_make_zipfile(self): - # creating something to tar - tmpdir = self.mkdtemp() - self.write_file([tmpdir, 'file1'], 'xxx') - self.write_file([tmpdir, 'file2'], 'xxx') + # creating something to zip + root_dir, base_dir = self._create_files() + base_name = os.path.join(self.mkdtemp(), 'archive') - tmpdir2 = self.mkdtemp() - # force shutil to create the directory - os.rmdir(tmpdir2) - base_name = os.path.join(tmpdir2, 'archive') - _make_zipfile(base_name, tmpdir) + res = make_archive(base_name, 'zip', root_dir, base_dir) - # check if the compressed tarball was created - tarball = base_name + '.zip' - self.assertTrue(os.path.exists(tarball)) + self.assertEqual(res, base_name + '.zip') + self.assertTrue(os.path.isfile(res)) + self.assertTrue(zipfile.is_zipfile(res)) + with zipfile.ZipFile(res) as zf: + self.assertEqual(sorted(zf.namelist()), + ['dist/', 'dist/file1', 'dist/file2', + 'dist/sub/', 'dist/sub/file3', 'dist/sub2/']) + @unittest.skipUnless(zlib, "Requires zlib") + @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run') + @unittest.skipUnless(find_executable('zip'), + 'Need the zip command to run') + def test_zipfile_vs_zip(self): + root_dir, base_dir = self._create_files() + base_name = os.path.join(self.mkdtemp(), 'archive') + archive = make_archive(base_name, 'zip', root_dir, base_dir) + + # check if ZIP file was created + self.assertEqual(archive, base_name + '.zip') + self.assertTrue(os.path.isfile(archive)) + + # now create another ZIP file using `zip` + archive2 = os.path.join(root_dir, 'archive2.zip') + zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir] + with support.change_cwd(root_dir): + spawn(zip_cmd) + + self.assertTrue(os.path.isfile(archive2)) + # let's compare both ZIP files + with zipfile.ZipFile(archive) as zf: + names = zf.namelist() + with zipfile.ZipFile(archive2) as zf: + names2 = zf.namelist() + self.assertEqual(sorted(names), sorted(names2)) def test_make_archive(self): tmpdir = self.mkdtemp() @@ -521,39 +517,36 @@ class TestShutil(unittest.TestCase): else: group = owner = 'root' - base_dir, root_dir, base_name = self._create_files() - base_name = os.path.join(self.mkdtemp() , 'archive') + root_dir, base_dir = self._create_files() + base_name = os.path.join(self.mkdtemp(), 'archive') res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner, group=group) - self.assertTrue(os.path.exists(res)) + self.assertTrue(os.path.isfile(res)) res = make_archive(base_name, 'zip', root_dir, base_dir) - self.assertTrue(os.path.exists(res)) + self.assertTrue(os.path.isfile(res)) res = make_archive(base_name, 'tar', root_dir, base_dir, owner=owner, group=group) - self.assertTrue(os.path.exists(res)) + self.assertTrue(os.path.isfile(res)) res = make_archive(base_name, 'tar', root_dir, base_dir, owner='kjhkjhkjg', group='oihohoh') - self.assertTrue(os.path.exists(res)) + self.assertTrue(os.path.isfile(res)) @unittest.skipUnless(zlib, "Requires zlib") @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") def test_tarfile_root_owner(self): - tmpdir, tmpdir2, base_name = self._create_files() - old_dir = os.getcwd() - os.chdir(tmpdir) + root_dir, base_dir = self._create_files() + base_name = os.path.join(self.mkdtemp(), 'archive') group = grp.getgrgid(0)[0] owner = pwd.getpwuid(0)[0] - try: - archive_name = _make_tarball(base_name, 'dist', compress=None, - owner=owner, group=group) - finally: - os.chdir(old_dir) + with support.change_cwd(root_dir): + archive_name = make_archive(base_name, 'gztar', root_dir, 'dist', + owner=owner, group=group) # check if the compressed tarball was created - self.assertTrue(os.path.exists(archive_name)) + self.assertTrue(os.path.isfile(archive_name)) # now checks the rights archive = tarfile.open(archive_name) @@ -859,7 +852,7 @@ class TestCopyFile(unittest.TestCase): def test_main(): - test_support.run_unittest(TestShutil, TestMove, TestCopyFile) + support.run_unittest(TestShutil, TestMove, TestCopyFile) if __name__ == '__main__': test_main() diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index 42c1b4d..98a9275 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -491,6 +491,33 @@ TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) SAVEDCWD = os.getcwd() @contextlib.contextmanager +def change_cwd(path, quiet=False): + """Return a context manager that changes the current working directory. + + Arguments: + + path: the directory to use as the temporary current working directory. + + quiet: if False (the default), the context manager raises an exception + on error. Otherwise, it issues only a warning and keeps the current + working directory the same. + + """ + saved_dir = os.getcwd() + try: + os.chdir(path) + except OSError: + if not quiet: + raise + warnings.warn('tests may fail, unable to change CWD to: ' + path, + RuntimeWarning, stacklevel=3) + try: + yield os.getcwd() + finally: + os.chdir(saved_dir) + + +@contextlib.contextmanager def temp_cwd(name='tempcwd', quiet=False): """ Context manager that creates a temporary directory and set it as CWD.