Mercurial > treepkg
view contrib/bin/delete-old-debs.py @ 537:aeccb5774939
contrib: Small bug fixes in delete-old-debs.py.
author | Sascha Teichmann <teichmann@intevation.de> |
---|---|
date | Mon, 10 Jan 2011 11:36:55 +0000 |
parents | 8a61185a3357 |
children | 6fb5e8b74414 |
line wrap: on
line source
#!/usr/bin/env python # -*- coding: UTF-8 -*- # # Copyright (C) 2011 by Intevation GmbH # Authors: # Sascha L. Teichmann <sascha.teichmann@intevation.de> # # This program is free software under the GPL (>=v2) # Read the file COPYING coming with the software for details. import sys import os import re import subprocess import logging from heapq import nsmallest from optparse import OptionParser log = logging.getLogger(__name__) log.setLevel(logging.WARNING) log.addHandler(logging.StreamHandler(sys.stderr)) DEFAULT_KEEP = 3 FIELD = re.compile("([a-zA-Z]+):\s*(.+)") # map rich comparison to 'dpkg --compare-versions' # map == to !=, < to >= and so on to reverse order in heap. RICH_CMP = dict([ ("__%s__" % a, lambda se, ot: subprocess.call([ "dpkg", "--compare-versions", se.version, b, ot.version]) == 0) for a, b in (("eq", "ne"), ("ne", "eq"), ("lt", "ge"), ("gt", "le"), ("le", "gt"), ("ge", "lt"))]) class DebCmp(object): """Helper class to make deb files comparable by there versions. """ def __init__(self, version, path): self.version = version self.path = path self.__dict__.update(RICH_CMP) def deb_info(deb, fields=["Package", "Version"]): """Extract some meta info from a deb file.""" po = subprocess.Popen( ["dpkg-deb", "-f", deb] + fields, stdout=subprocess.PIPE) out = po.communicate()[0] return dict([m.groups() for m in map(FIELD.match, out.splitlines()) if m]) def oldest_debs(deb_dir, keep=DEFAULT_KEEP): """Given directory containing deb files this function returns the files that are older than the youngest keep-th per package. """ log.info("scanning dir '%s'" % deb_dir) packages = {} num = 1 for f in os.listdir(deb_dir): if not f.endswith(".deb"): continue deb = os.path.join(deb_dir, f) if not os.path.isfile(deb): continue info = deb_info(deb) packages.setdefault(info['Package'], []).append( DebCmp(info['Version'], deb)) if (num % 10) == 0: log.info("%d debs found" % (num-1)) num += 1 if log.isEnabledFor(logging.INFO): log.info("%d debs found" % (num-1)) log.info("number packages: %s" % len(packages)) for package, debs in packages.iteritems(): if len(debs) > keep: # full sorting is not required stay = frozenset([d.path for d in nsmallest(keep, debs)]) for deb in debs: if deb.path not in stay: yield deb.path def main(): usage = "usage: %prog [options] dir ..." parser = OptionParser(usage=usage) parser.add_option( "-v", "--verbose", action="store_true", dest="verbose", help="verbose output") parser.add_option( "-d", "--dry-run", action="store_true", dest="dry_run", help="don't remove the old deb files") parser.add_option( "-k", "--keep", action="store", dest="keep", type="int", default=DEFAULT_KEEP, help="number of files to keep. Default: %d" % DEFAULT_KEEP) options, args = parser.parse_args() remove = options.dry_run and (lambda x: None) or os.remove keep = max(1, options.keep) if options.verbose: log.setLevel(logging.INFO) for deb_dir in args: if not os.path.isdir(deb_dir): log.warn("'%s' is not a directory" % deb_dir) continue for deb in oldest_debs(deb_dir, keep): log.info("remove '%s'" % deb) remove(deb) changes = deb[:-3] + "changes" if os.path.isfile(changes): log.info("remove '%s'" % changes) remove(changes) if __name__ == "__main__": main()