Mercurial > treepkg
view contrib/bin/copy-latest-pkgs.py @ 577:7a9841e4958f
don't forget to import shlex
author | Bjoern Ricks <bricks@intevation.de> |
---|---|
date | Sat, 03 Sep 2011 11:48:00 +0000 |
parents | 247a10201cdd |
children |
line wrap: on
line source
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2011 by Intevation GmbH
# Authors:
# Sascha L. Teichmann <sascha.teichmann@intevation.de>
#
# This program is free software under the GPL (>=v2)
# Read the file COPYING coming with the software for details.

"""Copy (or hard link) the newest Debian packages of a package tree.

The source directory is scanned as ``<src>/<arch>/<track>/*.deb``.  For
every package name found in a track only the deb with the highest version
is transferred to ``<dst>/<arch>/<track>/``.  Afterwards the files in
``<src>/source/<track>/`` whose version part matches one of the transferred
deb versions are transferred to ``<dst>/source/<track>/``.
"""

import os
import re
import sys
import subprocess
import logging
import traceback

from optparse import OptionParser
from shutil import copyfile

log = logging.getLogger(__name__)
log.setLevel(logging.WARNING)
log.addHandler(logging.StreamHandler(sys.stderr))

# User name the script insists on unless --no-saegewerker is given.
SAEGEWERKER = "saegewerker"

# "Name: value" line as printed by dpkg-deb -f.
FIELD = re.compile(r"([a-zA-Z]+):\s*(.+)")
# Version with a leading "<digits>:" epoch; group 1 is the rest.
EPOCH = re.compile(r"(?:\d+:)(.+)")
# Everything in front of the first "-" of a version string.
UNSHARP = re.compile(r"([^-]+)")


class DebCmp(object):
    """Helper class to make deb files comparable by their versions.

    Ordering is delegated to ``dpkg --compare-versions``.  Both the
    Python 2 ``__cmp__`` protocol and the rich comparison methods are
    provided so that ``max()`` and sorting work on Python 2 and 3.
    """

    def __init__(self, version, path):
        self.version = version
        self.path = path

    def _dpkg_compare(self, op, other):
        """Return True if dpkg confirms 'self.version <op> other.version'."""
        return subprocess.call([
            "dpkg", "--compare-versions",
            self.version, op, other.version]) == 0

    def __cmp__(self, other):
        # Identical strings need no external process.
        if self.version == other.version:
            return 0
        if self._dpkg_compare("gt", other):
            return +1
        if self._dpkg_compare("lt", other):
            return -1
        # Neither greater nor less (or dpkg failed): treat as equal.
        return 0

    def __eq__(self, other):
        if not isinstance(other, DebCmp):
            return NotImplemented
        return self.__cmp__(other) == 0

    def __ne__(self, other):
        if not isinstance(other, DebCmp):
            return NotImplemented
        return self.__cmp__(other) != 0

    def __lt__(self, other):
        if not isinstance(other, DebCmp):
            return NotImplemented
        return self.__cmp__(other) < 0

    def __le__(self, other):
        if not isinstance(other, DebCmp):
            return NotImplemented
        return self.__cmp__(other) <= 0

    def __gt__(self, other):
        if not isinstance(other, DebCmp):
            return NotImplemented
        return self.__cmp__(other) > 0

    def __ge__(self, other):
        if not isinstance(other, DebCmp):
            return NotImplemented
        return self.__cmp__(other) >= 0

    def __str__(self):
        return "version: %s / path: %s" % (self.version, self.path)


def deb_info(deb, fields=("Package", "Version")):
    """Extract some meta info from a deb file.

    Runs ``dpkg-deb -f <deb> <fields...>`` and returns a dict mapping
    each field name found in the output to its value.  Lines that do not
    look like ``Name: value`` are ignored, so the dict may lack fields.
    """
    po = subprocess.Popen(
        ["dpkg-deb", "-f", deb] + list(fields),
        stdout=subprocess.PIPE,
        # Text output, so the str pattern FIELD also works on Python 3.
        universal_newlines=True)
    out = po.communicate()[0]
    return dict(
        m.groups() for m in map(FIELD.match, out.splitlines()) if m)


def _latest_debs(src):
    """Return {arch: {track: [DebCmp, ...]}} with the newest deb per package.

    Every sub directory of 'src' except 'source' is taken as an arch,
    every sub directory of an arch as a track.
    """
    archs = {}
    for arch in os.listdir(src):
        if arch == 'source':
            continue
        arch_dir = os.path.join(src, arch)
        if not os.path.isdir(arch_dir):
            continue
        log.info("found arch: '%s'", arch)
        tracks = {}
        for track in os.listdir(arch_dir):
            track_dir = os.path.join(arch_dir, track)
            if not os.path.isdir(track_dir):
                continue
            packages = {}
            log.info("track dir: '%s'", track_dir)
            for f in os.listdir(track_dir):
                if not f.endswith(".deb"):
                    continue
                deb_path = os.path.join(track_dir, f)
                if not os.path.isfile(deb_path):
                    continue
                info = deb_info(deb_path)
                try:
                    name = info['Package']
                    deb_cmp = DebCmp(info['Version'], deb_path)
                except KeyError:
                    # dpkg-deb gave no usable output for this file; skip
                    # it instead of aborting the whole run.
                    log.warning(
                        "cannot read package meta data of '%s'", deb_path)
                    continue
                packages.setdefault(name, []).append(deb_cmp)
            if packages:
                tracks[track] = [max(debs) for debs in packages.values()]
        archs[arch] = tracks
    return archs


def _ensure_dir(path, dry_run, report):
    """Create directory 'path' if missing; return False if that failed.

    Failures are passed as traceback text to 'report' (a logger method).
    In dry-run mode nothing is created and True is returned.
    """
    if dry_run or os.path.exists(path):
        return True
    try:
        os.makedirs(path)
    except Exception:
        report(traceback.format_exc())
        return False
    return True


def _place_file(copy, action, src_path, dst_path, dry_run):
    """Transfer 'src_path' to 'dst_path' with 'copy'; return success.

    An already existing destination file is removed first.  In dry-run
    mode the action is only logged and reported as successful.
    """
    log.info(action % (src_path, dst_path))
    if dry_run:
        return True
    if os.path.isfile(dst_path):
        try:
            os.remove(dst_path)
        except Exception:
            log.warning(traceback.format_exc())
            return False
    try:
        copy(src_path, dst_path)
    except Exception:
        log.error(traceback.format_exc())
        return False
    return True


def copy_pkgs(src, dst, options):
    """Transfer the newest debs and their matching source files.

    src     -- directory laid out as <arch>/<track>/*.deb plus an
               optional source/<track>/ directory
    dst     -- existing directory receiving the same layout
    options -- object with the attribute 'no_hardlinks' (copy instead of
               hard link) and optionally 'dry_run' (only log the actions)

    Failures on single files or directories are logged and skipped.
    """
    # getattr keeps callers working whose options object has no dry_run.
    dry_run = getattr(options, "dry_run", False)

    archs = _latest_debs(src)

    if options.no_hardlinks:
        copy, verb = copyfile, "copy"
    else:
        copy, verb = os.link, "link"
    action = "%s %%s -> %%s" % verb

    # track name -> set of epoch-less versions of the transferred debs
    track_versions = {}

    for arch, tracks in archs.items():
        log.debug("writing arch '%s'", arch)
        for track, debs in tracks.items():
            log.debug(" writing track '%s'", track)
            dst_dir = os.path.join(dst, arch, track)
            if not _ensure_dir(dst_dir, dry_run, log.warning):
                continue
            track_ver = track_versions.setdefault(track, set())
            for deb in debs:
                dst_path = os.path.join(
                    dst_dir, os.path.basename(deb.path))
                if not _place_file(
                        copy, action, deb.path, dst_path, dry_run):
                    continue
                # Only versions of debs that were really transferred are
                # remembered, with the epoch stripped.
                ver = deb.version
                m = EPOCH.match(ver)
                if m:
                    ver = m.group(1)
                track_ver.add(ver)

    src_source_dir = os.path.join(src, "source")
    if not os.path.isdir(src_source_dir):
        log.info("no source dir found")
        return

    dst_source_dir = os.path.join(dst, "source")

    for track in os.listdir(src_source_dir):
        versions = track_versions.get(track)
        if versions is None:
            continue
        track_path = os.path.join(src_source_dir, track)
        if not os.path.isdir(track_path):
            continue
        log.info("found source track: %s", track)
        exact = tuple(versions)
        # Versions cut at the first "-"; used for *.tar.gz files.
        unsharp = tuple(
            m.group(1) for m in map(UNSHARP.match, versions) if m)
        dst_track_dir = os.path.join(dst_source_dir, track)
        for f in os.listdir(track_path):
            f_path = os.path.join(track_path, f)
            if not os.path.isfile(f_path):
                continue
            # Text after the first "_" of the file name must start with
            # one of the known versions.
            cand = f.split("_", 1)
            if len(cand) < 2:
                continue
            prefixes = unsharp if f.endswith(".tar.gz") else exact
            if not cand[1].startswith(prefixes):
                continue
            if not _ensure_dir(dst_track_dir, dry_run, log.error):
                continue
            dst_f = os.path.join(dst_track_dir, f)
            _place_file(copy, action, f_path, dst_f, dry_run)


def build_parser():
    """Return the OptionParser describing the command line of the script."""
    usage = "usage: %prog [options] src-dir dst-dir"
    parser = OptionParser(usage=usage)
    parser.add_option(
        "-v", "--verbose", action="store_true",
        dest="verbose",
        help="verbose output")
    parser.add_option(
        "-d", "--dry-run", action="store_true",
        dest="dry_run", default=False,
        help="don't copy the deb files")
    parser.add_option(
        "-n", "--no-saegewerker", action="store_true",
        dest="no_saegewerker", default=False,
        help="Don't force run as '%s'" % SAEGEWERKER)
    parser.add_option(
        # store_true: giving the flag must switch copying on.
        "-l", "--no-hardlinks", action="store_true",
        dest="no_hardlinks", default=False,
        help="copy files instead of hard linking")
    return parser


def main():
    """Parse the command line, validate it and run copy_pkgs.

    Exits with status 1 on missing arguments, non-directory arguments or
    when not run as SAEGEWERKER without --no-saegewerker.
    """
    parser = build_parser()
    options, args = parser.parse_args()

    if len(args) < 2:
        log.error("need at least two arguments")
        sys.exit(1)

    src, dst = args[0], args[1]

    for d in (src, dst):
        if not os.path.isdir(d):
            log.error("'%s' is not a directory." % d)
            sys.exit(1)

    # A dry run only logs at INFO level, so it implies verbose output.
    if options.verbose or options.dry_run:
        log.setLevel(logging.INFO)

    # .get(): USER may be unset, e.g. when started from cron.
    if (not options.no_saegewerker
            and os.environ.get('USER') != SAEGEWERKER):
        log.error("Need to run as '%s'" % SAEGEWERKER)
        sys.exit(1)

    copy_pkgs(src, dst, options)


if __name__ == '__main__':
    main()