You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
275 lines
9.0 KiB
275 lines
9.0 KiB
#!/usr/bin/env python |
|
|
|
"""Misc. useful functionality used by the rest of this package. |
|
|
|
This module provides common functionality used by the other modules in |
|
this package. |
|
|
|
""" |
|
|
|
import sys |
|
import os |
|
import subprocess |
|
|
|
try:
    from subprocess import CalledProcessError
except ImportError:
    # Fallback for interpreters whose subprocess module does not provide
    # CalledProcessError.  Adapted from python2.7:subprocess.py.
    class CalledProcessError(Exception):
        """Raised when a process run by check_call() or check_output()
        returns a non-zero exit status.

        Attributes:
            returncode -- the exit status of the process
            cmd        -- the command that was run
            output     -- the captured output, or None when nothing was
                          captured
        """

        def __init__(self, returncode, cmd, output=None):
            self.returncode = returncode
            self.cmd = cmd
            # 'output' mirrors the attribute of the python2.7 class so
            # that callers can rely on it regardless of which class is
            # in use.
            self.output = output

        def __str__(self):
            return "Command '%s' returned non-zero exit status %d" % (
                self.cmd, self.returncode)
|
|
|
|
|
# Whether or not to show debug messages.  debug() checks this flag on every
# call, so it may be toggled at runtime.
DEBUG = False
|
|
|
def notify(msg, *args):
    """Write a message, followed by a newline, to stderr.

    msg is a %-style format string and args are the values interpolated
    into it.  The interpolation is applied even when args is empty, so a
    literal percent sign in msg must be written as '%%'.
    """
    # sys.stderr.write() works on both Python 2 and Python 3, unlike the
    # Python 2-only "print >> sys.stderr" statement.
    sys.stderr.write("%s\n" % (msg % args,))
|
|
|
def debug(msg, *args):
    """Write a debug message to stderr, but only when DEBUG is enabled.

    msg is a %-style format string and args are the values interpolated
    into it.  When DEBUG is false nothing is formatted or written.
    """
    if not DEBUG:
        return
    # sys.stderr.write() works on both Python 2 and Python 3, unlike the
    # Python 2-only "print >> sys.stderr" statement.
    sys.stderr.write("%s\n" % (msg % args,))
|
|
|
def error(msg, *args):
    """Write an error message, prefixed with "ERROR: ", to stderr.

    msg is a %-style format string and args are the values interpolated
    into it.  A newline is appended to the message.
    """
    # sys.stderr.write() works on both Python 2 and Python 3, unlike the
    # Python 2-only "print >> sys.stderr" statement.
    sys.stderr.write("ERROR: %s\n" % (msg % args,))
|
|
|
def warn(msg, *args):
    """Write a warning message, prefixed with "warning: ", to stderr.

    msg is a %-style format string and args are the values interpolated
    into it.  A newline is appended to the message.
    """
    # sys.stderr.write() works on both Python 2 and Python 3, unlike the
    # Python 2-only "print >> sys.stderr" statement.
    sys.stderr.write("warning: %s\n" % (msg % args,))
|
|
|
def die(msg, *args):
    """Print an error message to stderr and exit the program.

    The message is reported through error(); the process then terminates
    with exit status 1.
    """
    error(msg, *args)
    raise SystemExit(1)
|
|
|
|
|
class ProgressIndicator(object):

    """Simple progress indicator.

    Displayed as a spinning character by default, but can be customized
    by passing custom messages that override the spinning character.

    Every update starts with a carriage return and is padded to the width
    of the previous update, so that a shorter message fully overwrites a
    longer one on the same line.

    """

    # Characters cycled through when no custom message is given.
    States = ("|", "/", "-", "\\")

    def __init__(self, prefix="", f=sys.stdout):
        """Create a new ProgressIndicator, bound to the given file object."""
        self.n = 0             # Simple progress counter
        self.f = f             # Progress is written to this file object
        self.prev_len = 0      # Length of previous msg (to be overwritten)
        self.prefix = prefix   # Prefix prepended to each progress message
        self.prefix_lens = []  # Stack of prefix string lengths

    def pushprefix(self, prefix):
        """Append the given prefix onto the prefix stack."""
        # Remember the current length so popprefix() can truncate back.
        self.prefix_lens.append(len(self.prefix))
        self.prefix += prefix

    def popprefix(self):
        """Remove the last prefix from the prefix stack.

        Raises IndexError when no prefix has been pushed.
        """
        prev_len = self.prefix_lens.pop()
        self.prefix = self.prefix[:prev_len]

    def _flush(self):
        """Flush the bound file object, if it supports flushing."""
        flush = getattr(self.f, "flush", None)
        if flush is not None:
            flush()

    def __call__(self, msg=None, lf=False):
        """Indicate progress, possibly with a custom message.

        msg -- text to show after the prefix; when None, the next
               character from States is shown instead
        lf  -- when true, terminate the line after writing the message
        """
        if msg is None:
            msg = self.States[self.n % len(self.States)]
        msg = self.prefix + msg
        # write() is used instead of the Python 2-only "print >>" statement
        # so that the class also works on Python 3.
        self.f.write("\r%-*s" % (self.prev_len, msg))
        self.prev_len = len(msg.expandtabs())
        if lf:
            self.f.write("\n")
            self.prev_len = 0
        # Without a flush, an update that does not end in a newline may
        # sit in the output buffer and never become visible.
        self._flush()
        self.n += 1

    def finish(self, msg="done", noprefix=False):
        """Finalize progress indication with the given message.

        When noprefix is true the prefix is cleared before the final
        message is written.
        """
        if noprefix:
            self.prefix = ""
        self(msg, True)
|
|
|
|
|
def start_command(args, cwd=None, shell=False, add_env=None,
                  stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                  stderr=subprocess.PIPE):
    """Launch the given command and return its subprocess.Popen object.

    A thin convenience layer over subprocess.Popen:

    args     -- the command to run, passed straight to Popen
    cwd      -- working directory for the child (None keeps the current)
    shell    -- passed to Popen as its 'shell' argument
    add_env  -- optional mapping of extra environment variables; when
                given, the child gets a copy of os.environ updated with
                these entries, otherwise Popen is given env=None
    stdin, stdout, stderr -- passed to Popen; all default to pipes

    The child is always started with bufsize=1 and
    universal_newlines=True.

    """
    if add_env is None:
        child_env = None
    else:
        child_env = os.environ.copy()
        child_env.update(add_env)

    popen_options = dict(
        bufsize=1,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        shell=shell,
        env=child_env,
        universal_newlines=True,
    )
    return subprocess.Popen(args, **popen_options)
|
|
|
|
|
def run_command(args, cwd=None, shell=False, add_env=None,
                flag_error=True):
    """Run the given command to completion, and return its results.

    This provides a simpler interface to the subprocess module.

    The results are formatted as a 3-tuple: (exit_code, output, errors)

    If flag_error is enabled, error messages will be produced if the
    subprocess terminated with a non-zero exit code and/or stderr
    output.

    The other arguments are passed on to start_command().

    """
    process = start_command(args, cwd, shell, add_env)
    (output, errors) = process.communicate()
    exit_code = process.returncode

    if flag_error and (errors or exit_code):
        # 'args' may be a single command string (e.g. with shell=True).
        # Joining a string would insert a space between every character,
        # so only sequences of arguments are joined.
        try:
            string_types = basestring  # Python 2
        except NameError:
            string_types = str         # Python 3
        if isinstance(args, string_types):
            cmdline = args
        else:
            cmdline = " ".join(args)

        if errors:
            error("'%s' returned errors:\n---\n%s---", cmdline, errors)
        if exit_code:
            error("'%s' returned exit code %i", cmdline, exit_code)

    return (exit_code, output, errors)
|
|
|
|
|
# from python2.7:subprocess.py |
|
def call(*popenargs, **kwargs):
    """Run a command, wait for it to finish, and return its exit status.

    All positional and keyword arguments are handed unchanged to the
    subprocess.Popen constructor.  Example:

    retcode = call(["ls", "-l"])
    """
    child = subprocess.Popen(*popenargs, **kwargs)
    return child.wait()
|
|
|
|
|
# from python2.7:subprocess.py |
|
def check_call(*popenargs, **kwargs):
    """Run a command and wait for it to finish, insisting on success.

    Returns 0 when the command exits with status zero.  Otherwise a
    CalledProcessError is raised whose returncode attribute holds the
    exit status and whose cmd attribute holds the command (the 'args'
    keyword argument if given, else the first positional argument).

    The arguments are the same as for the Popen constructor.  Example:

    check_call(["ls", "-l"])
    """
    status = call(*popenargs, **kwargs)
    if not status:
        return 0

    failed_cmd = kwargs.get("args")
    if failed_cmd is None:
        failed_cmd = popenargs[0]
    raise CalledProcessError(status, failed_cmd)
|
|
|
|
|
# from python2.7:subprocess.py |
|
def check_output(*popenargs, **kwargs):
    r"""Run command with arguments and return its standard output.

    If the exit code was non-zero it raises a CalledProcessError.  The
    CalledProcessError object will have the return code in the returncode
    attribute and the captured output in the output attribute.

    The arguments are the same as for the Popen constructor.  Example:

    >>> check_output(["ls", "-l", "/dev/null"])
    'crw-rw-rw- 1 root root 1, 3 Oct 18  2007 /dev/null\n'

    The stdout argument is not allowed as it is used internally; passing
    it raises ValueError.
    To capture standard error in the result, use stderr=subprocess.STDOUT.

    >>> check_output(["/bin/sh", "-c",
    ...               "ls -l non_existent_file ; exit 0"],
    ...              stderr=subprocess.STDOUT)
    'ls: non_existent_file: No such file or directory\n'
    """
    if 'stdout' in kwargs:
        raise ValueError('stdout argument not allowed, it will be overridden.')
    process = subprocess.Popen(*popenargs, stdout=subprocess.PIPE, **kwargs)
    output, unused_err = process.communicate()
    retcode = process.poll()
    if retcode:
        cmd = kwargs.get("args")
        if cmd is None:
            cmd = popenargs[0]
        # Use the module-level CalledProcessError (the same name that
        # check_call() raises) and attach the output as an attribute
        # rather than a constructor argument, because older versions of
        # the exception class do not accept an 'output' parameter.
        failure = CalledProcessError(retcode, cmd)
        failure.output = output
        raise failure
    return output
|
|
|
|
|
def file_reader_method(missing_ok=False):
    """Decorator for simplifying reading of files.

    If missing_ok is True, a failure to open a file for reading will
    not raise the usual IOError, but instead the wrapped method will be
    called with f == None.  The method must in this case properly
    handle f == None.

    """
    import functools

    def _wrap(method):
        """Teach given method to handle both filenames and file objects.

        The given method must take a file object as its second argument
        (the first argument being 'self', of course).  This decorator
        will take a filename given as the second argument and promote
        it to a file object.  Anything that already has a read()
        method is treated as a file object and passed through as-is.

        """
        @functools.wraps(method)
        def _wrapped_method(self, filename, *args, **kwargs):
            # Detect file objects by capability rather than by the
            # Python 2-only 'file' type, so that file-like objects are
            # accepted and the code also runs on Python 3.
            if hasattr(filename, "read"):
                # The caller owns this object; never close it here.
                return method(self, filename, *args, **kwargs)

            try:
                f = open(filename, 'r')
            except IOError:
                if not missing_ok:
                    raise
                f = None
            try:
                return method(self, f, *args, **kwargs)
            finally:
                # Only close what was opened here.
                if f is not None:
                    f.close()
        return _wrapped_method
    return _wrap
|
|
|
|
|
def file_writer_method(method):
    """Decorator for simplifying writing of files.

    Enables the given method to handle both filenames and file objects.

    The given method must take a file object as its second argument
    (the first argument being 'self', of course).  This decorator will
    take a filename given as the second argument and promote it to a
    file object opened for writing, creating the containing directory
    first if necessary.  Anything that already has a write() method is
    treated as a file object and passed through as-is.

    """
    import functools

    @functools.wraps(method)
    def _new_method(self, filename, *args, **kwargs):
        # Detect file objects by capability rather than by the Python
        # 2-only 'file' type, so that file-like objects are accepted and
        # the code also runs on Python 3.
        if hasattr(filename, "write"):
            # The caller owns this object; never close it here.
            return method(self, filename, *args, **kwargs)

        # Make sure the containing directory exists.  A bare filename has
        # an empty dirname, which refers to the current directory and
        # must not be handed to os.makedirs().
        parent_dir = os.path.dirname(filename)
        if parent_dir and not os.path.isdir(parent_dir):
            os.makedirs(parent_dir)
        f = open(filename, 'w')
        try:
            return method(self, f, *args, **kwargs)
        finally:
            f.close()
    return _new_method
|
|
|