You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
376 lines
15 KiB
376 lines
15 KiB
diff --git a/Lib/shutil.py b/Lib/shutil.py |
|
index 420802f..d0ff2ef 100644 |
|
--- a/Lib/shutil.py |
|
+++ b/Lib/shutil.py |
|
@@ -446,17 +446,24 @@ def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None): |
|
zip_filename, base_dir) |
|
|
|
if not dry_run: |
|
- zip = zipfile.ZipFile(zip_filename, "w", |
|
- compression=zipfile.ZIP_DEFLATED) |
|
- |
|
- for dirpath, dirnames, filenames in os.walk(base_dir): |
|
- for name in filenames: |
|
- path = os.path.normpath(os.path.join(dirpath, name)) |
|
- if os.path.isfile(path): |
|
- zip.write(path, path) |
|
+ with zipfile.ZipFile(zip_filename, "w", |
|
+ compression=zipfile.ZIP_DEFLATED) as zf: |
|
+ path = os.path.normpath(base_dir) |
|
+ zf.write(path, path) |
|
+ if logger is not None: |
|
+ logger.info("adding '%s'", path) |
|
+ for dirpath, dirnames, filenames in os.walk(base_dir): |
|
+ for name in sorted(dirnames): |
|
+ path = os.path.normpath(os.path.join(dirpath, name)) |
|
+ zf.write(path, path) |
|
if logger is not None: |
|
logger.info("adding '%s'", path) |
|
- zip.close() |
|
+ for name in filenames: |
|
+ path = os.path.normpath(os.path.join(dirpath, name)) |
|
+ if os.path.isfile(path): |
|
+ zf.write(path, path) |
|
+ if logger is not None: |
|
+ logger.info("adding '%s'", path) |
|
|
|
return zip_filename |
|
|
|
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py |
|
index 9bdb724..9238489 100644 |
|
--- a/Lib/test/test_shutil.py |
|
+++ b/Lib/test/test_shutil.py |
|
@@ -10,13 +10,13 @@ import os.path |
|
import errno |
|
from os.path import splitdrive |
|
from distutils.spawn import find_executable, spawn |
|
-from shutil import (_make_tarball, _make_zipfile, make_archive, |
|
+from shutil import (make_archive, |
|
register_archive_format, unregister_archive_format, |
|
get_archive_formats) |
|
import tarfile |
|
import warnings |
|
|
|
-from test import test_support |
|
+from test import test_support as support |
|
from test.test_support import TESTFN, check_warnings, captured_stdout |
|
|
|
TESTFN2 = TESTFN + "2" |
|
@@ -372,139 +372,135 @@ class TestShutil(unittest.TestCase): |
|
@unittest.skipUnless(zlib, "requires zlib") |
|
def test_make_tarball(self): |
|
# creating something to tar |
|
- tmpdir = self.mkdtemp() |
|
- self.write_file([tmpdir, 'file1'], 'xxx') |
|
- self.write_file([tmpdir, 'file2'], 'xxx') |
|
- os.mkdir(os.path.join(tmpdir, 'sub')) |
|
- self.write_file([tmpdir, 'sub', 'file3'], 'xxx') |
|
+ root_dir, base_dir = self._create_files('') |
|
|
|
tmpdir2 = self.mkdtemp() |
|
# force shutil to create the directory |
|
os.rmdir(tmpdir2) |
|
- unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0], |
|
+ unittest.skipUnless(splitdrive(root_dir)[0] == splitdrive(tmpdir2)[0], |
|
"source and target should be on same drive") |
|
|
|
base_name = os.path.join(tmpdir2, 'archive') |
|
|
|
# working with relative paths to avoid tar warnings |
|
- old_dir = os.getcwd() |
|
- os.chdir(tmpdir) |
|
- try: |
|
- _make_tarball(splitdrive(base_name)[1], '.') |
|
- finally: |
|
- os.chdir(old_dir) |
|
+ make_archive(splitdrive(base_name)[1], 'gztar', root_dir, '.') |
|
|
|
# check if the compressed tarball was created |
|
tarball = base_name + '.tar.gz' |
|
- self.assertTrue(os.path.exists(tarball)) |
|
+ self.assertTrue(os.path.isfile(tarball)) |
|
+ self.assertTrue(tarfile.is_tarfile(tarball)) |
|
+ with tarfile.open(tarball, 'r:gz') as tf: |
|
+ self.assertEqual(sorted(tf.getnames()), |
|
+ ['.', './file1', './file2', |
|
+ './sub', './sub/file3', './sub2']) |
|
|
|
# trying an uncompressed one |
|
base_name = os.path.join(tmpdir2, 'archive') |
|
- old_dir = os.getcwd() |
|
- os.chdir(tmpdir) |
|
- try: |
|
- _make_tarball(splitdrive(base_name)[1], '.', compress=None) |
|
- finally: |
|
- os.chdir(old_dir) |
|
+ make_archive(splitdrive(base_name)[1], 'tar', root_dir, '.') |
|
tarball = base_name + '.tar' |
|
- self.assertTrue(os.path.exists(tarball)) |
|
+ self.assertTrue(os.path.isfile(tarball)) |
|
+ self.assertTrue(tarfile.is_tarfile(tarball)) |
|
+ with tarfile.open(tarball, 'r') as tf: |
|
+ self.assertEqual(sorted(tf.getnames()), |
|
+ ['.', './file1', './file2', |
|
+ './sub', './sub/file3', './sub2']) |
|
|
|
def _tarinfo(self, path): |
|
- tar = tarfile.open(path) |
|
- try: |
|
+ with tarfile.open(path) as tar: |
|
names = tar.getnames() |
|
names.sort() |
|
return tuple(names) |
|
- finally: |
|
- tar.close() |
|
|
|
- def _create_files(self): |
|
+ def _create_files(self, base_dir='dist'): |
|
# creating something to tar |
|
- tmpdir = self.mkdtemp() |
|
- dist = os.path.join(tmpdir, 'dist') |
|
- os.mkdir(dist) |
|
- self.write_file([dist, 'file1'], 'xxx') |
|
- self.write_file([dist, 'file2'], 'xxx') |
|
+ root_dir = self.mkdtemp() |
|
+ dist = os.path.join(root_dir, base_dir) |
|
+ if not os.path.isdir(dist): |
|
+ os.makedirs(dist) |
|
+ self.write_file((dist, 'file1'), 'xxx') |
|
+ self.write_file((dist, 'file2'), 'xxx') |
|
os.mkdir(os.path.join(dist, 'sub')) |
|
- self.write_file([dist, 'sub', 'file3'], 'xxx') |
|
+ self.write_file((dist, 'sub', 'file3'), 'xxx') |
|
os.mkdir(os.path.join(dist, 'sub2')) |
|
- tmpdir2 = self.mkdtemp() |
|
- base_name = os.path.join(tmpdir2, 'archive') |
|
- return tmpdir, tmpdir2, base_name |
|
+ if base_dir: |
|
+ self.write_file((root_dir, 'outer'), 'xxx') |
|
+ return root_dir, base_dir |
|
|
|
@unittest.skipUnless(zlib, "Requires zlib") |
|
- @unittest.skipUnless(find_executable('tar') and find_executable('gzip'), |
|
+ @unittest.skipUnless(find_executable('tar'), |
|
'Need the tar command to run') |
|
def test_tarfile_vs_tar(self): |
|
- tmpdir, tmpdir2, base_name = self._create_files() |
|
- old_dir = os.getcwd() |
|
- os.chdir(tmpdir) |
|
- try: |
|
- _make_tarball(base_name, 'dist') |
|
- finally: |
|
- os.chdir(old_dir) |
|
+ root_dir, base_dir = self._create_files() |
|
+ base_name = os.path.join(self.mkdtemp(), 'archive') |
|
+ make_archive(base_name, 'gztar', root_dir, base_dir) |
|
|
|
# check if the compressed tarball was created |
|
tarball = base_name + '.tar.gz' |
|
- self.assertTrue(os.path.exists(tarball)) |
|
+ self.assertTrue(os.path.isfile(tarball)) |
|
|
|
# now create another tarball using `tar` |
|
- tarball2 = os.path.join(tmpdir, 'archive2.tar.gz') |
|
- tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist'] |
|
- gzip_cmd = ['gzip', '-f9', 'archive2.tar'] |
|
- old_dir = os.getcwd() |
|
- os.chdir(tmpdir) |
|
- try: |
|
- with captured_stdout() as s: |
|
- spawn(tar_cmd) |
|
- spawn(gzip_cmd) |
|
- finally: |
|
- os.chdir(old_dir) |
|
+ tarball2 = os.path.join(root_dir, 'archive2.tar') |
|
+ tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir] |
|
+ with support.change_cwd(root_dir), captured_stdout(): |
|
+ spawn(tar_cmd) |
|
|
|
- self.assertTrue(os.path.exists(tarball2)) |
|
+ self.assertTrue(os.path.isfile(tarball2)) |
|
# let's compare both tarballs |
|
self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2)) |
|
|
|
# trying an uncompressed one |
|
- base_name = os.path.join(tmpdir2, 'archive') |
|
- old_dir = os.getcwd() |
|
- os.chdir(tmpdir) |
|
- try: |
|
- _make_tarball(base_name, 'dist', compress=None) |
|
- finally: |
|
- os.chdir(old_dir) |
|
+ make_archive(base_name, 'tar', root_dir, base_dir) |
|
tarball = base_name + '.tar' |
|
- self.assertTrue(os.path.exists(tarball)) |
|
+ self.assertTrue(os.path.isfile(tarball)) |
|
|
|
# now for a dry_run |
|
- base_name = os.path.join(tmpdir2, 'archive') |
|
- old_dir = os.getcwd() |
|
- os.chdir(tmpdir) |
|
- try: |
|
- _make_tarball(base_name, 'dist', compress=None, dry_run=True) |
|
- finally: |
|
- os.chdir(old_dir) |
|
+ make_archive(base_name, 'tar', root_dir, base_dir, dry_run=True) |
|
tarball = base_name + '.tar' |
|
- self.assertTrue(os.path.exists(tarball)) |
|
+ self.assertTrue(os.path.isfile(tarball)) |
|
|
|
@unittest.skipUnless(zlib, "Requires zlib") |
|
@unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run') |
|
def test_make_zipfile(self): |
|
- # creating something to tar |
|
- tmpdir = self.mkdtemp() |
|
- self.write_file([tmpdir, 'file1'], 'xxx') |
|
- self.write_file([tmpdir, 'file2'], 'xxx') |
|
+ # creating something to zip |
|
+ root_dir, base_dir = self._create_files() |
|
+ base_name = os.path.join(self.mkdtemp(), 'archive') |
|
|
|
- tmpdir2 = self.mkdtemp() |
|
- # force shutil to create the directory |
|
- os.rmdir(tmpdir2) |
|
- base_name = os.path.join(tmpdir2, 'archive') |
|
- _make_zipfile(base_name, tmpdir) |
|
+ res = make_archive(base_name, 'zip', root_dir, base_dir) |
|
|
|
- # check if the compressed tarball was created |
|
- tarball = base_name + '.zip' |
|
- self.assertTrue(os.path.exists(tarball)) |
|
+ self.assertEqual(res, base_name + '.zip') |
|
+ self.assertTrue(os.path.isfile(res)) |
|
+ self.assertTrue(zipfile.is_zipfile(res)) |
|
+ with zipfile.ZipFile(res) as zf: |
|
+ self.assertEqual(sorted(zf.namelist()), |
|
+ ['dist/', 'dist/file1', 'dist/file2', |
|
+ 'dist/sub/', 'dist/sub/file3', 'dist/sub2/']) |
|
|
|
+ @unittest.skipUnless(zlib, "Requires zlib") |
|
+ @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run') |
|
+ @unittest.skipUnless(find_executable('zip'), |
|
+ 'Need the zip command to run') |
|
+ def test_zipfile_vs_zip(self): |
|
+ root_dir, base_dir = self._create_files() |
|
+ base_name = os.path.join(self.mkdtemp(), 'archive') |
|
+ archive = make_archive(base_name, 'zip', root_dir, base_dir) |
|
+ |
|
+ # check if ZIP file was created |
|
+ self.assertEqual(archive, base_name + '.zip') |
|
+ self.assertTrue(os.path.isfile(archive)) |
|
+ |
|
+ # now create another ZIP file using `zip` |
|
+ archive2 = os.path.join(root_dir, 'archive2.zip') |
|
+ zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir] |
|
+ with support.change_cwd(root_dir): |
|
+ spawn(zip_cmd) |
|
+ |
|
+ self.assertTrue(os.path.isfile(archive2)) |
|
+ # let's compare both ZIP files |
|
+ with zipfile.ZipFile(archive) as zf: |
|
+ names = zf.namelist() |
|
+ with zipfile.ZipFile(archive2) as zf: |
|
+ names2 = zf.namelist() |
|
+ self.assertEqual(sorted(names), sorted(names2)) |
|
|
|
def test_make_archive(self): |
|
tmpdir = self.mkdtemp() |
|
@@ -521,39 +517,36 @@ class TestShutil(unittest.TestCase): |
|
else: |
|
group = owner = 'root' |
|
|
|
- base_dir, root_dir, base_name = self._create_files() |
|
- base_name = os.path.join(self.mkdtemp() , 'archive') |
|
+ root_dir, base_dir = self._create_files() |
|
+ base_name = os.path.join(self.mkdtemp(), 'archive') |
|
res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner, |
|
group=group) |
|
- self.assertTrue(os.path.exists(res)) |
|
+ self.assertTrue(os.path.isfile(res)) |
|
|
|
res = make_archive(base_name, 'zip', root_dir, base_dir) |
|
- self.assertTrue(os.path.exists(res)) |
|
+ self.assertTrue(os.path.isfile(res)) |
|
|
|
res = make_archive(base_name, 'tar', root_dir, base_dir, |
|
owner=owner, group=group) |
|
- self.assertTrue(os.path.exists(res)) |
|
+ self.assertTrue(os.path.isfile(res)) |
|
|
|
res = make_archive(base_name, 'tar', root_dir, base_dir, |
|
owner='kjhkjhkjg', group='oihohoh') |
|
- self.assertTrue(os.path.exists(res)) |
|
+ self.assertTrue(os.path.isfile(res)) |
|
|
|
@unittest.skipUnless(zlib, "Requires zlib") |
|
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") |
|
def test_tarfile_root_owner(self): |
|
- tmpdir, tmpdir2, base_name = self._create_files() |
|
- old_dir = os.getcwd() |
|
- os.chdir(tmpdir) |
|
+ root_dir, base_dir = self._create_files() |
|
+ base_name = os.path.join(self.mkdtemp(), 'archive') |
|
group = grp.getgrgid(0)[0] |
|
owner = pwd.getpwuid(0)[0] |
|
- try: |
|
- archive_name = _make_tarball(base_name, 'dist', compress=None, |
|
- owner=owner, group=group) |
|
- finally: |
|
- os.chdir(old_dir) |
|
+ with support.change_cwd(root_dir): |
|
+ archive_name = make_archive(base_name, 'gztar', root_dir, 'dist', |
|
+ owner=owner, group=group) |
|
|
|
# check if the compressed tarball was created |
|
- self.assertTrue(os.path.exists(archive_name)) |
|
+ self.assertTrue(os.path.isfile(archive_name)) |
|
|
|
# now checks the rights |
|
archive = tarfile.open(archive_name) |
|
@@ -859,7 +852,7 @@ class TestCopyFile(unittest.TestCase): |
|
|
|
|
|
def test_main(): |
|
- test_support.run_unittest(TestShutil, TestMove, TestCopyFile) |
|
+ support.run_unittest(TestShutil, TestMove, TestCopyFile) |
|
|
|
if __name__ == '__main__': |
|
test_main() |
|
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py |
|
index 42c1b4d..98a9275 100644 |
|
--- a/Lib/test/test_support.py |
|
+++ b/Lib/test/test_support.py |
|
@@ -491,6 +491,33 @@ TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) |
|
SAVEDCWD = os.getcwd() |
|
|
|
@contextlib.contextmanager |
|
+def change_cwd(path, quiet=False): |
|
+ """Return a context manager that changes the current working directory. |
|
+ |
|
+ Arguments: |
|
+ |
|
+ path: the directory to use as the temporary current working directory. |
|
+ |
|
+ quiet: if False (the default), the context manager raises an exception |
|
+ on error. Otherwise, it issues only a warning and keeps the current |
|
+ working directory the same. |
|
+ |
|
+ """ |
|
+ saved_dir = os.getcwd() |
|
+ try: |
|
+ os.chdir(path) |
|
+ except OSError: |
|
+ if not quiet: |
|
+ raise |
|
+ warnings.warn('tests may fail, unable to change CWD to: ' + path, |
|
+ RuntimeWarning, stacklevel=3) |
|
+ try: |
|
+ yield os.getcwd() |
|
+ finally: |
|
+ os.chdir(saved_dir) |
|
+ |
|
+ |
|
+@contextlib.contextmanager |
|
def temp_cwd(name='tempcwd', quiet=False): |
|
""" |
|
Context manager that creates a temporary directory and set it as CWD.
|
|
|