Mercurial > treepkg
view contrib/bin/delete-old-debs.py @ 557:9824e409388b
Refactor git branching
If a checkout is already available and the branch is changed in
the config, the git command would always fail because it doesn't know
which branch to track. Therefore, always check whether the branch is
available locally and, if it is not, check out the remote branch.
author | Bjoern Ricks <bricks@intevation.de> |
---|---|
date | Fri, 02 Sep 2011 08:45:28 +0000 |
parents | 6fb5e8b74414 |
children |
line wrap: on
line source
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2011 by Intevation GmbH
# Authors:
# Sascha L. Teichmann <sascha.teichmann@intevation.de>
#
# This program is free software under the GPL (>=v2)
# Read the file COPYING coming with the software for details.

"""Delete all but the newest deb files of each package in the given dirs.

For every directory named on the command line the deb files are grouped
by their ``Package`` control field, ordered by ``Version`` (as judged by
``dpkg --compare-versions``), and everything beyond the newest ``--keep``
files per package is removed together with a same-named ``.changes``
file if one exists.
"""

import sys
import os
import re
import subprocess
import logging

from optparse import OptionParser

log = logging.getLogger(__name__)
log.setLevel(logging.WARNING)
log.addHandler(logging.StreamHandler(sys.stderr))

# Number of deb files kept per package unless -k/--keep says otherwise.
DEFAULT_KEEP = 3

# One "Name: value" line of dpkg-deb output.  Field names may contain
# digits and hyphens (e.g. "Installed-Size"); continuation lines start
# with a blank and therefore never match.
FIELD = re.compile(r"([A-Za-z0-9][A-Za-z0-9-]*):\s*(.+)")


class DebCmp(object):
    """Helper class to make deb files comparable by their versions.

    The ordering is deliberately reversed: an instance with a *newer*
    version compares as *smaller*, so an ascending sort lists the newest
    deb first.

    Attributes:
        version: the Version control field of the deb file.
        path: the path of the deb file.
    """

    def __init__(self, version, path):
        self.version = version
        self.path = path

    def __cmp__(self, other):
        """Three-way comparison used directly by Python 2.

        Returns -1 if self is the newer version, +1 if it is the older
        one and 0 if the versions are equal or dpkg reports neither
        "gt" nor "lt".
        """
        # Identical strings need no dpkg process at all.
        if self.version == other.version:
            return 0
        # switch lt and gt to reverse order in heap
        if (subprocess.call([
            "dpkg", "--compare-versions",
            self.version, "gt", other.version]) == 0):
            return -1
        if (subprocess.call([
            "dpkg", "--compare-versions",
            self.version, "lt", other.version]) == 0):
            return +1
        return 0

    def __lt__(self, other):
        """Rich comparison so that sorting also works on Python 3.

        Python 3 ignores __cmp__; list.sort() only needs __lt__.
        """
        return self.__cmp__(other) < 0


def deb_info(deb, fields=("Package", "Version")):
    """Extract some meta info from a deb file.

    Args:
        deb: path of the deb file.
        fields: names of the control fields to query.

    Returns:
        A dict mapping field names to their values.  Fields that
        dpkg-deb does not print are missing from the dict, so it is
        empty if nothing usable was printed.
    """
    fields = list(fields)
    po = subprocess.Popen(
        ["dpkg-deb", "-f", deb] + fields,
        stdout=subprocess.PIPE)
    out = po.communicate()[0]
    # Python 3 hands back bytes; Python 2 already gives a str.
    if not isinstance(out, str):
        out = out.decode("utf-8", "replace")
    if len(fields) == 1:
        # When asked for exactly one field dpkg-deb prints the bare
        # value without the "Name: " prefix, so FIELD cannot match it.
        value = out.strip()
        if value:
            return {fields[0]: value}
        return {}
    return dict(
        m.groups() for m in map(FIELD.match, out.splitlines()) if m)


def oldest_debs(deb_dir, keep=DEFAULT_KEEP):
    """Yield the paths of the deb files in deb_dir that should go away.

    Given a directory containing deb files this generator yields, per
    package, the files that are older than the youngest keep-th one.
    Files whose Package/Version fields cannot be read are reported with
    a warning and never yielded.

    Args:
        deb_dir: directory to scan (not recursive).
        keep: number of newest deb files to retain per package.
    """
    log.info("scanning dir '%s'" % deb_dir)

    packages = {}
    found = 0
    for f in os.listdir(deb_dir):
        if not f.endswith(".deb"):
            continue
        deb = os.path.join(deb_dir, f)
        if not os.path.isfile(deb):
            continue
        info = deb_info(deb)
        if 'Package' not in info or 'Version' not in info:
            # Better to keep an unreadable file than to abort the run
            # or to delete something we know nothing about.
            log.warning("cannot read package info of '%s'" % deb)
            continue
        packages.setdefault(info['Package'], []).append(
            DebCmp(info['Version'], deb))
        found += 1
        if (found % 10) == 0:
            log.info("%d debs found" % found)

    if log.isEnabledFor(logging.INFO):
        log.info("%d debs found" % found)
        log.info("number packages: %s" % len(packages))

    for debs in packages.values():
        if len(debs) > keep:
            # DebCmp sorts newest first, so the tail holds the old ones.
            # A full sort is more than strictly needed (picking the
            # 'keep' smallest would do), but the lists are short.
            debs.sort()
            for deb in debs[keep:]:
                yield deb.path


def main():
    """Parse the command line and remove the old debs of each directory."""
    usage = "usage: %prog [options] dir ..."
    parser = OptionParser(usage=usage)
    parser.add_option(
        "-v", "--verbose", action="store_true",
        dest="verbose",
        help="verbose output")
    parser.add_option(
        "-d", "--dry-run", action="store_true",
        dest="dry_run",
        help="don't remove the old deb files")
    parser.add_option(
        "-k", "--keep", action="store",
        dest="keep", type="int", default=DEFAULT_KEEP,
        help="number of files to keep. Default: %d" % DEFAULT_KEEP)

    options, args = parser.parse_args()

    # In dry-run mode "removing" is a no-op; everything else stays equal.
    remove = (lambda x: None) if options.dry_run else os.remove
    # Never allow wiping every version of a package.
    keep = max(1, options.keep)

    if options.verbose:
        log.setLevel(logging.INFO)

    for deb_dir in args:
        if not os.path.isdir(deb_dir):
            log.warning("'%s' is not a directory" % deb_dir)
            continue
        for deb in oldest_debs(deb_dir, keep):
            log.info("remove '%s'" % deb)
            remove(deb)
            # "foo.deb" -> "foo.changes"
            changes = deb[:-3] + "changes"
            if os.path.isfile(changes):
                log.info("remove '%s'" % changes)
                remove(changes)


if __name__ == "__main__":
    main()