view contrib/bin/delete-old-debs.py @ 537:aeccb5774939

contrib: Small bug fixes in delete-old-debs.py.
author Sascha Teichmann <teichmann@intevation.de>
date Mon, 10 Jan 2011 11:36:55 +0000
parents 8a61185a3357
children 6fb5e8b74414
line wrap: on
line source
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2011 by Intevation GmbH
# Authors:
# Sascha L. Teichmann <sascha.teichmann@intevation.de>
#
# This program is free software under the GPL (>=v2)
# Read the file COPYING coming with the software for details.

import sys
import os
import re

import subprocess
import logging

from heapq import nsmallest

from optparse import OptionParser

# Module logger: warnings and above go to stderr; main() lowers the
# threshold to INFO when --verbose is given.
log = logging.getLogger(__name__)
log.setLevel(logging.WARNING)
log.addHandler(logging.StreamHandler(sys.stderr))

# Number of versions kept per package when -k/--keep is not given.
DEFAULT_KEEP = 3

# Matches a "Name: value" line. Raw string, so that \s is a regex escape
# and not an (invalid) string escape. Field names may contain digits and
# hyphens (e.g. "Installed-Size"); lines starting with whitespace
# do not match.
FIELD = re.compile(r"([a-zA-Z][a-zA-Z0-9-]*):\s*(.+)")

# Map the rich comparison methods to 'dpkg --compare-versions'.
# The ordering is reversed (< asks dpkg for gt, <= for ge and so on) so
# that heapq.nsmallest() returns the debs with the *highest* versions.
# == and != stay as they are: reversing an order does not change equality.
def _dpkg_compare(op):
    """Build a comparison method that asks dpkg if 'self op other' holds."""
    # 'op' is bound once per returned function. A lambda created directly
    # in the loop below would look the operator up when it is called and
    # therefore always see the last one of the loop.
    def compare(se, ot):
        # dpkg exits with status 0 if the relation is satisfied.
        return subprocess.call([
            "dpkg", "--compare-versions",
            se.version, op, ot.version]) == 0
    return compare


RICH_CMP = dict([
    ("__%s__" % a, _dpkg_compare(b))
    for a, b in (("eq", "eq"), ("ne", "ne"),
                 ("lt", "gt"), ("gt", "lt"),
                 ("le", "ge"), ("ge", "le"))])


class DebCmp(object):
    """Helper class to make deb files comparable
       by their versions.

       version -- version string of the package
       path    -- path of the deb file

       The comparison operators are taken from RICH_CMP, i.e. an
       instance with a higher version compares as smaller.
       Every comparison runs dpkg once.
    """

    def __init__(self, version, path):
        self.version = version
        self.path    = path


# Operators like < and == look up their special methods on the class,
# not on the instance, so the comparison methods have to be installed
# on DebCmp itself to have any effect.
for _name, _method in RICH_CMP.items():
    setattr(DebCmp, _name, _method)
del _name, _method


def deb_info(deb, fields=("Package", "Version")):
    """Extract some meta info from a deb file.

       Runs 'dpkg-deb -f' on the file and parses the "Name: value"
       lines it prints.

       deb    -- path of the deb file
       fields -- names of the control fields to query (any sequence)

       Returns a dict mapping field names to their values. Lines that
       do not look like "Name: value" are ignored, so the dict lacks
       entries (or is empty) when dpkg-deb prints nothing usable.
    """
    po = subprocess.Popen(
        ["dpkg-deb", "-f", deb] + list(fields),
        stdout=subprocess.PIPE)
    out = po.communicate()[0]
    # Under Python 3 the pipe delivers bytes, but FIELD is a text pattern.
    if not isinstance(out, str):
        out = out.decode("utf-8", "replace")
    return dict([m.groups()
                for m in map(FIELD.match, out.splitlines()) if m])


def oldest_debs(deb_dir, keep=DEFAULT_KEEP):
    """Given directory containing deb files this function
       returns the files that are older than the youngest
       keep-th per package.

       deb_dir -- directory to scan (not recursive)
       keep    -- number of debs per package that are not reported

       This is a generator yielding the paths of the debs to remove.
       Files whose package name or version cannot be determined are
       skipped with a warning and are never yielded.
    """

    log.info("scanning dir '%s'" % deb_dir)

    packages = {}

    found = 0
    for f in os.listdir(deb_dir):
        if not f.endswith(".deb"):
            continue
        deb = os.path.join(deb_dir, f)
        if not os.path.isfile(deb):
            continue
        info = deb_info(deb)
        try:
            package, version = info['Package'], info['Version']
        except KeyError:
            # Do not abort the whole run because of one unreadable file.
            log.warning("cannot read package info of '%s'" % deb)
            continue
        packages.setdefault(package, []).append(DebCmp(version, deb))
        found += 1
        # progress report every ten debs
        if (found % 10) == 0:
            log.info("%d debs found" % found)

    if log.isEnabledFor(logging.INFO):
        log.info("%d debs found" % found)
        log.info("number packages: %s" % len(packages))

    for debs in packages.values():
        if len(debs) > keep:
            # full sorting is not required
            # DebCmp orders higher versions first, so the 'smallest'
            # entries are the ones that stay.
            stay = frozenset([d.path for d in nsmallest(keep, debs)])

            for deb in debs:
                if deb.path not in stay:
                    yield deb.path


def main():
    """Command line entry point.

       Parses the options, then for every directory given as argument
       removes the debs reported by oldest_debs() together with a
       '.changes' file of the same base name, if there is one.
       With --dry-run nothing is removed.
    """
    usage = "usage: %prog [options] dir ..."
    parser = OptionParser(usage=usage)
    parser.add_option(
        "-v", "--verbose", action="store_true",
        dest="verbose",
        help="verbose output")
    parser.add_option(
        "-d", "--dry-run", action="store_true",
        dest="dry_run",
        help="don't remove the old deb files")
    parser.add_option(
        "-k", "--keep", action="store",
        dest="keep", type="int", default=DEFAULT_KEEP,
        help="number of files to keep. Default: %d" % DEFAULT_KEEP)

    options, args = parser.parse_args()

    if options.dry_run:
        # dry run: go through all the motions but leave the files alone
        def remove(path):
            return None
    else:
        remove = os.remove

    # always keep at least one deb per package
    keep = max(1, options.keep)
    if options.verbose:
        log.setLevel(logging.INFO)

    for deb_dir in args:

        if not os.path.isdir(deb_dir):
            log.warning("'%s' is not a directory" % deb_dir)
            continue

        for deb in oldest_debs(deb_dir, keep):
            log.info("remove '%s'" % deb)
            remove(deb)
            # foo.deb -> foo.changes
            changes = deb[:-3] + "changes"
            if os.path.isfile(changes):
                log.info("remove '%s'" % changes)
                remove(changes)


if __name__ == "__main__":
    main()
This site is hosted by Intevation GmbH (Datenschutzerklärung und Impressum | Privacy Policy and Imprint)