view contrib/bin/copy-latest-pkgs.py @ 547:70fa7debabed

Check out the git branch on update. This may be necessary if a treepkg config has changed and a branch is used instead of master.
author Bjoern Ricks <bricks@intevation.de>
date Wed, 02 Feb 2011 10:26:02 +0000
parents 247a10201cdd
children
line wrap: on
line source
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2011 by Intevation GmbH
# Authors:
# Sascha L. Teichmann <sascha.teichmann@intevation.de>
#
# This program is free software under the GPL (>=v2)
# Read the file COPYING coming with the software for details.

import os
import re
import sys
import subprocess
import logging
import traceback

from optparse import OptionParser
from shutil   import copyfile

# Module logger: writes to stderr and only shows warnings and errors
# unless the level is lowered later on.
log = logging.getLogger(__name__)
log.setLevel(logging.WARNING)
log.addHandler(logging.StreamHandler(sys.stderr))

# User name the script insists on running as (unless overridden).
SAEGEWERKER = "saegewerker"

# Raw strings keep the regex escapes (\s, \d) out of Python's own
# string-escape processing.
# "Name: value" line as printed by dpkg-deb -f.
FIELD   = re.compile(r"([a-zA-Z]+):\s*(.+)")
# Leading "<digits>:" epoch prefix of a version; group 1 is the remainder.
EPOCH   = re.compile(r"(?:\d+:)(.+)")
# Everything up to (not including) the first '-' of a version string.
UNSHARP = re.compile(r"([^-]+)")


class DebCmp(object):
    """Pairs a deb file's path with its version so that deb files
       can be ordered by their versions.

       The ordering itself is delegated to 'dpkg --compare-versions'.
    """

    def __init__(self, version, path):
        self.version = version
        self.path    = path

    def __cmp__(self, other):
        mine, theirs = self.version, other.version

        # Identical strings need no external process.
        if mine == theirs:
            return 0

        # Ask dpkg first for "greater", then for "less"; an exit status
        # of 0 means the relation holds.
        for relation, result in (("gt", 1), ("lt", -1)):
            status = subprocess.call(
                ["dpkg", "--compare-versions", mine, relation, theirs])
            if status == 0:
                return result

        # Neither relation was confirmed: treat the versions as equal.
        return 0

    def __str__(self):
        return "version: %s / path: %s" % (self.version, self.path)


def deb_info(deb, fields=None):
    """Extract some meta info from a deb file.

    Runs 'dpkg-deb -f' on the file and parses its "Name: value"
    output lines.

    deb    -- path of the deb file to inspect.
    fields -- iterable of control field names to query; defaults to
              "Package" and "Version".

    Returns a dict mapping field names to their values. Output lines
    that do not match FIELD are ignored, so the dict may lack
    requested fields (or be empty).
    """
    # A None sentinel avoids sharing one mutable default list
    # between calls.
    if fields is None:
        fields = ["Package", "Version"]
    po = subprocess.Popen(
        ["dpkg-deb", "-f", deb] + list(fields),
        stdout=subprocess.PIPE)
    out = po.communicate()[0]
    matches = (FIELD.match(line) for line in out.splitlines())
    return dict(m.groups() for m in matches if m)


def _latest_debs(track_dir):
    """Return one DebCmp per package in track_dir: the highest version."""
    packages = {}
    for name in os.listdir(track_dir):
        if not name.endswith(".deb"):
            continue
        deb_path = os.path.join(track_dir, name)
        if not os.path.isfile(deb_path):
            continue

        info = deb_info(deb_path)
        if 'Package' not in info or 'Version' not in info:
            # One unreadable package must not abort the whole run.
            log.warning("skipping '%s': no package/version info" % deb_path)
            continue

        packages.setdefault(info['Package'], []).append(
            DebCmp(info['Version'], deb_path))

    return [max(debs) for debs in packages.values()]


def _ensure_dir(path, report, dry_run):
    """Create directory path if missing; return False (after calling
    report with the traceback) when that fails."""
    if dry_run or os.path.exists(path):
        return True
    try:
        os.makedirs(path)
    except Exception:
        report(traceback.format_exc())
        return False
    return True


def _transfer(copy, action, src_path, dst_path, dry_run):
    """Log and perform one copy/link, replacing an existing dst_path.

    Returns True on success (or in dry-run mode), False if removing the
    old file or creating the new one failed; failures are logged.
    """
    log.info(action % (src_path, dst_path))
    if dry_run:
        return True

    if os.path.isfile(dst_path):
        try:
            os.remove(dst_path)
        except Exception:
            log.warning(traceback.format_exc())
            return False
    try:
        copy(src_path, dst_path)
    except Exception:
        log.error(traceback.format_exc())
        return False
    return True


def copy_pkgs(src, dst, options):
    """Copy (or hard link) the latest packages from src to dst.

    src is scanned as <src>/<arch>/<track>/*.deb. For every package in
    a track only the deb with the highest version is placed into
    <dst>/<arch>/<track>/. Afterwards files from <src>/source/<track>/
    whose name part after the first '_' starts with one of the
    transferred versions are placed into <dst>/source/<track>/.

    options -- parsed command line options; 'no_hardlinks' selects
               copying instead of hard linking, 'dry_run' (if present)
               only logs the actions without touching dst.

    Failures of single file operations are logged and skipped.
    """
    dry_run = getattr(options, "dry_run", False)

    # Pass 1: find the latest debs per arch and track.
    archs = {}

    for arch in os.listdir(src):
        if arch == 'source':
            continue
        arch_dir = os.path.join(src, arch)
        if not os.path.isdir(arch_dir):
            continue
        log.info("found arch: '%s'" % arch)

        tracks = {}

        for track in os.listdir(arch_dir):
            track_dir = os.path.join(arch_dir, track)
            if not os.path.isdir(track_dir):
                continue

            log.info("track dir: '%s'" % track_dir)
            latest = _latest_debs(track_dir)
            if latest:
                tracks[track] = latest

        archs[arch] = tracks

    if options.no_hardlinks:
        copy, verb = copyfile, "copy"
    else:
        copy, verb = os.link, "link"
    action = "%s %%s -> %%s" % verb

    # Pass 2: transfer the debs and remember which versions went into
    # which track (with the epoch prefix stripped).
    track_versions = {}

    for arch, tracks in archs.items():
        log.debug("writing arch '%s'" % arch)
        for track, debs in tracks.items():
            log.debug("  writing track '%s'" % track)
            dst_dir = os.path.join(dst, arch, track)
            if not _ensure_dir(dst_dir, log.warning, dry_run):
                continue

            track_ver = track_versions.setdefault(track, set())

            for deb in debs:
                dst_path = os.path.join(dst_dir, os.path.basename(deb.path))
                if not _transfer(copy, action, deb.path, dst_path, dry_run):
                    continue

                ver = deb.version
                m = EPOCH.match(ver)
                if m:
                    ver = m.group(1)

                track_ver.add(ver)

    # Pass 3: transfer the source files belonging to those versions.
    src_source_dir = os.path.join(src, "source")
    if not os.path.isdir(src_source_dir):
        log.info("no source dir found")
        return

    dst_source_dir = os.path.join(dst, "source")

    for track in os.listdir(src_source_dir):
        versions = track_versions.get(track)
        if versions is None:
            continue
        track_path = os.path.join(src_source_dir, track)
        if not os.path.isdir(track_path):
            continue
        log.info("found source track: %s" % track)

        # Versions cut at the first '-'; used for *.tar.gz files
        # (presumably upstream tarballs without a revision suffix).
        unsharp = [m.group(1) for m in map(UNSHARP.match, versions) if m]
        dst_track_dir = os.path.join(dst_source_dir, track)

        for f in os.listdir(track_path):
            f_path = os.path.join(track_path, f)
            if not os.path.isfile(f_path):
                continue
            cand = f.split("_", 1)
            if len(cand) < 2:
                continue
            cand = cand[1]

            wanted = unsharp if f.endswith(".tar.gz") else versions
            if not any(cand.startswith(version) for version in wanted):
                continue

            if not _ensure_dir(dst_track_dir, log.error, dry_run):
                continue

            _transfer(copy, action, f_path,
                      os.path.join(dst_track_dir, f), dry_run)


def main():
    """Command line entry point.

    Parses the options, checks that both positional arguments are
    existing directories and that the script runs as the expected
    user (unless --no-saegewerker is given), then calls copy_pkgs().
    Exits with status 1 on any failed check.
    """
    usage = "usage: %prog [options] src-dir dst-dir"
    parser = OptionParser(usage=usage)
    parser.add_option(
        "-v", "--verbose", action="store_true",
        dest="verbose",
        help="verbose output")
    parser.add_option(
        "-d", "--dry-run", action="store_true",
        dest="dry_run", default=False,
        help="don't copy the deb files")
    parser.add_option(
        "-n", "--no-saegewerker", action="store_true",
        dest="no_saegewerker", default=False,
        help="Don't force run as '%s'" % SAEGEWERKER)
    # store_true: with store_false and default=False the flag could
    # never switch copying on.
    parser.add_option(
        "-l", "--no-hardlinks", action="store_true",
        dest="no_hardlinks", default=False,
        help="copy files instead of hard linking")

    options, args = parser.parse_args()

    if len(args) < 2:
        log.error("need at least two arguments")
        sys.exit(1)

    src, dst = args[0], args[1]

    for d in (src, dst):
        if not os.path.isdir(d):
            log.error("'%s' is not a directory." % d)
            sys.exit(1)

    if options.verbose:
        log.setLevel(logging.INFO)

    # .get(): USER may be unset (e.g. under cron); that counts as
    # "not the expected user" instead of raising KeyError.
    if not options.no_saegewerker and os.environ.get('USER') != SAEGEWERKER:
        log.error("Need to run as '%s'" % SAEGEWERKER)
        sys.exit(1)

    copy_pkgs(src, dst, options)


# Run main() only when executed as a script, not when imported.
if "__main__" == __name__:
    main()
This site is hosted by Intevation GmbH (Datenschutzerklärung und Impressum | Privacy Policy and Imprint)