Mercurial > treepkg > treepkg
view treepkg/util.py @ 528:e51cf2947fab
log tag date
author | Bjoern Ricks <bricks@intevation.de> |
---|---|
date | Mon, 15 Nov 2010 15:15:52 +0000 |
parents | aa90ea7778a5 |
children |
line wrap: on
line source
# Copyright (C) 2007, 2008, 2010 by Intevation GmbH # Authors: # Bernhard Herzog <bh@intevation.de> # Bjoern Ricks <bjoern.ricks@intevation.de> # Andre Heinecke <andre.heinecke@intevation.de> # # This program is free software under the GPL (>=v2) # Read the file COPYING coming with the software for details. """Collection of functions that didn't fit elsewhere""" import os import re import tempfile import shutil import fnmatch import pwd import os.path try: from hashlib import md5 as new_md5 except ImportError: # fall back to md5 for Python versions before 2.5 from md5 import new as new_md5 import run from cmdexpand import cmdexpand def import_dotted_name(dotted_name): module = __import__(dotted_name) for name in dotted_name.split(".")[1:]: module = getattr(module, name) return module def extract_value_for_key(lines, key): """Parses a sequence of strings for a key and returns the associated value The function determines the first string in lines that starts with key. It returns the rest of the lines stripped of leading and trailing whitespace. If the key is not found in lines, the function returns None. """ for line in lines: if line.startswith(key): return line[len(key):].strip() def extract_lsm_version(lsm_file): return extract_value_for_key(open(lsm_file), "Version:") def debian_changelog_version(changelog): """Returns the newest version in a debian changelog.""" output = run.capture_output(["dpkg-parsechangelog", "-l" + changelog]) return extract_value_for_key(output.splitlines(), "Version:") def extract_cmakefile_version(cmakelist): """ Returns the version mentioned in a CMakeList.txt """ major = re.compile(r"VERSION_MAJOR\s+(\d+)", re.IGNORECASE) minor = re.compile(r"VERSION_MINOR\s+(\d+)", re.IGNORECASE) patch = re.compile(r"VERSION_PATCH\s+(\d+)", re.IGNORECASE) version = "" try: for line in open(cmakelist): major_match = major.match(line) minor_match = minor.match(line) patch_match = patch.match(line) if major_match: version = major_match.group(1) if minor_match and version: version += "." + minor_match.group(1) if patch_match: version += "." + patch_match.group(1) return version except: return "" def extract_configureac_version(configure_ac): match = re.match(r"m4_define\(\[?my_version\]?, \[([^]]+)\]\)", line) if match: return match.group(1) match = re.match(r"AC_INIT\([a-zA-Z_]+, ([0-9.]+)", line) if match: return match.group(1) return "" def ensure_directory(directory): """Creates directory and all its parents. Unlike os.makedirs, this function doesn't throw an exception """ if not os.path.isdir(directory): os.makedirs(directory) def listdir_abs(directory, pattern=None): """Like os.listdir, but returns a list of absolute pathnames. Optionally, a glob pattern can be given to restrict the names returned by the function. """ filenames = os.listdir(directory) if pattern is not None: filenames = fnmatch.filter(filenames, pattern) return [os.path.join(directory, filename) for filename in filenames] def copytree(src, dst, symlinks=False): """Recursively copy a directory tree using copy2(). This version is basically the same as the one in the shutil module in the python standard library, however, it's OK if the destination directory already exists. If the optional symlinks flag is true, symbolic links in the source tree result in symbolic links in the destination tree; if it is false, the contents of the files pointed to by symbolic links are copied. """ names = os.listdir(src) ensure_directory(dst) errors = [] for name in names: srcname = os.path.join(src, name) dstname = os.path.join(dst, name) try: if symlinks and os.path.islink(srcname): linkto = os.readlink(srcname) os.symlink(linkto, dstname) elif os.path.isdir(srcname): copytree(srcname, dstname, symlinks) else: shutil.copy2(srcname, dstname) # XXX What about devices, sockets etc.? except (IOError, os.error), why: errors.append((srcname, dstname, why)) if errors: raise Error, errors def writefile(filename, contents, permissions=None): """Write contents to filename in an atomic way. The contents are first written to a temporary file in the same directory as filename. Once the contents are written, the temporary file is closed and renamed to filename. The optional parameter permissions, if given, are the permissions for the new file. By default, or if the parameter is None, the default permissions set by the tempfile.mkstemp are used which means that the file is only readable for the user that created the file. The permissions value is used as the second parameter to os.chmod. """ dirname, basename = os.path.split(filename) fileno, tempname = tempfile.mkstemp("", basename, dirname) try: os.write(fileno, contents) if not contents.endswith("\n"): os.write(fileno, "\n") os.close(fileno) if permissions is not None: os.chmod(tempname, permissions) os.rename(tempname, filename) finally: if os.path.exists(tempname): os.remove(tempname) def replace_in_file(filename, pattern, replacement): """Replace all occurrences of pattern in a file with replacement. The file is modified in place. The search and replace is done with the re.sub function. The pattern and replacement parameter are passed through to re.sub unmodified, so their semantics are determined by re.sub. The return value is True if the contents of the file have been changed, False otherwise. """ contents = open(filename).read() modified = re.sub(pattern, replacement, contents) f = open(filename, "w") f.write(modified) f.close() return modified != contents def filenameproperty(filename, dir_attr="base_dir"): """Create a property for a directory or filename. If the filename is relative it is interpreted as relative to the value of the attribute of self named by dir_attr which defaults to 'base_dir'. """ def get(self): return os.path.join(getattr(self, dir_attr), filename) return property(get) def getuser(): """Returns the login name of the current user owning the proccess""" return pwd.getpwuid(os.getuid())[0] def md5sum(filename): """ calculates the md5sum of a file """ if not os.path.isfile(filename): raise RuntimeError("Could not create md5sum. File not found: %s" % filename) f = file(filename, 'rb') m = new_md5() while True: d = f.read(8096) if not d: break m.update(d) f.close() return m.hexdigest() def remove_trailing_slashes(s): return s.rstrip("/") def expand_filename(filename): """ Applies os.path.expanduser and os.path.expandvars to filename """ return os.path.expandvars(os.path.expanduser(filename)) def compress_all_logs(reference_log, cmd="gzip -9 $logfile"): """ Takes the path of a reference log file and compresses all files in same folder with the cmd command. """ if reference_log and os.path.exists(reference_log): log_dir = os.path.isdir(reference_log) and \ reference_log or os.path.dirname(reference_log) for log_file in [os.path.join(log_dir, f) for f in os.listdir(log_dir)]: if os.path.isfile(log_file): run.call(cmdexpand(cmd, logfile=log_file))