Mercurial > treepkg
view contrib/bin/delete-old-debs.py @ 563:a4efa91bc3b2
Also use option --config to pass the config file
author | Bjoern Ricks <bricks@intevation.de> |
---|---|
date | Fri, 02 Sep 2011 10:18:54 +0000 |
parents | 6fb5e8b74414 |
children |
line wrap: on
line source
#!/usr/bin/env python # -*- coding: UTF-8 -*- # # Copyright (C) 2011 by Intevation GmbH # Authors: # Sascha L. Teichmann <sascha.teichmann@intevation.de> # # This program is free software under the GPL (>=v2) # Read the file COPYING coming with the software for details. import sys import os import re import subprocess import logging #from heapq import nsmallest from optparse import OptionParser log = logging.getLogger(__name__) log.setLevel(logging.WARNING) log.addHandler(logging.StreamHandler(sys.stderr)) DEFAULT_KEEP = 3 FIELD = re.compile("([a-zA-Z]+):\s*(.+)") class DebCmp(object): """Helper class to make deb files comparable by there versions. """ def __init__(self, version, path): self.version = version self.path = path def __cmp__(self, other): if self.version == other.version: return 0 # switch lt and gt to reverse order in heap if (subprocess.call([ "dpkg", "--compare-versions", self.version, "gt", other.version]) == 0): return -1 if (subprocess.call([ "dpkg", "--compare-versions", self.version, "lt", other.version]) == 0): return +1 return 0 def deb_info(deb, fields=["Package", "Version"]): """Extract some meta info from a deb file.""" po = subprocess.Popen( ["dpkg-deb", "-f", deb] + fields, stdout=subprocess.PIPE) out = po.communicate()[0] return dict([m.groups() for m in map(FIELD.match, out.splitlines()) if m]) def oldest_debs(deb_dir, keep=DEFAULT_KEEP): """Given directory containing deb files this function returns the files that are older than the youngest keep-th per package. """ log.info("scanning dir '%s'" % deb_dir) packages = {} num = 1 for f in os.listdir(deb_dir): if not f.endswith(".deb"): continue deb = os.path.join(deb_dir, f) if not os.path.isfile(deb): continue info = deb_info(deb) packages.setdefault(info['Package'], []).append( DebCmp(info['Version'], deb)) if (num % 10) == 0: log.info("%d debs found" % (num-1)) num += 1 if log.isEnabledFor(logging.INFO): log.info("%d debs found" % (num-1)) log.info("number packages: %s" % len(packages)) for package, debs in packages.iteritems(): if len(debs) > keep: debs.sort() for deb in debs[keep:]: yield deb.path ## full sorting is not required #stay = frozenset([d.path for d in nsmallest(keep, debs)]) #for deb in debs: # if deb.path not in stay: # yield deb.path def main(): usage = "usage: %prog [options] dir ..." parser = OptionParser(usage=usage) parser.add_option( "-v", "--verbose", action="store_true", dest="verbose", help="verbose output") parser.add_option( "-d", "--dry-run", action="store_true", dest="dry_run", help="don't remove the old deb files") parser.add_option( "-k", "--keep", action="store", dest="keep", type="int", default=DEFAULT_KEEP, help="number of files to keep. Default: %d" % DEFAULT_KEEP) options, args = parser.parse_args() remove = options.dry_run and (lambda x: None) or os.remove keep = max(1, options.keep) if options.verbose: log.setLevel(logging.INFO) for deb_dir in args: if not os.path.isdir(deb_dir): log.warn("'%s' is not a directory" % deb_dir) continue for deb in oldest_debs(deb_dir, keep): log.info("remove '%s'" % deb) remove(deb) changes = deb[:-3] + "changes" if os.path.isfile(changes): log.info("remove '%s'" % changes) remove(changes) if __name__ == "__main__": main()