Mercurial > treepkg
view contrib/bin/delete-old-debs.py @ 570:44c0f8404983
Refactor git pull command out of update
Tag MUST NOT use update, because update always changes the current local branch!
For listing the tags it's enough to pull the latest repo changes
author | Bjoern Ricks <bricks@intevation.de> |
---|---|
date | Fri, 02 Sep 2011 11:46:29 +0000 |
parents | 6fb5e8b74414 |
children |
line wrap: on
line source
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2011 by Intevation GmbH
# Authors:
# Sascha L. Teichmann <sascha.teichmann@intevation.de>
#
# This program is free software under the GPL (>=v2)
# Read the file COPYING coming with the software for details.

"""Remove old .deb files from directories, keeping the newest per package.

For every directory given on the command line the .deb files are grouped
by package name, ordered by version with the help of dpkg, and all but the
newest ``--keep`` files of each package are removed together with a
same-named ``.changes`` file if one exists.
"""

import sys
import os
import re
import subprocess
import logging

from optparse import OptionParser

log = logging.getLogger(__name__)
log.setLevel(logging.WARNING)
log.addHandler(logging.StreamHandler(sys.stderr))

# Number of deb files kept per package unless overridden with --keep.
DEFAULT_KEEP = 3

# Matches one "Name: value" line as printed by "dpkg-deb -f".
FIELD = re.compile(r"([a-zA-Z]+):\s*(.+)")


class DebCmp(object):
    """Helper class to make deb files comparable by their versions.

    The ordering is deliberately reversed: an instance with a *newer*
    version compares as *smaller*, so an ascending sort (or a min-heap)
    yields the newest package first.
    """

    def __init__(self, version, path):
        self.version = version
        self.path = path

    def __cmp__(self, other):
        """Three-way comparison by version, newest first.

        Returns -1 if this version is newer than the other one, +1 if it
        is older and 0 if both are equal or dpkg confirms neither order.
        """
        if self.version == other.version:
            return 0
        # switch lt and gt to reverse order in heap
        if subprocess.call([
                "dpkg", "--compare-versions",
                self.version, "gt", other.version]) == 0:
            return -1
        if subprocess.call([
                "dpkg", "--compare-versions",
                self.version, "lt", other.version]) == 0:
            return +1
        return 0

    def __lt__(self, other):
        """Rich comparison used by sort(); delegates to __cmp__.

        Python 3 ignores __cmp__, so without this method sorting a list
        of DebCmp objects raises a TypeError there.
        """
        return self.__cmp__(other) < 0


def deb_info(deb, fields=("Package", "Version")):
    """Extract some meta info from a deb file.

    Runs ``dpkg-deb -f`` on *deb* for the given control *fields* and
    returns a dict mapping field name to value for every output line
    matching FIELD. Lines that do not match are ignored, so the dict is
    empty when dpkg-deb prints nothing usable.

    NOTE(review): per the dpkg-deb man page the "Name: " prefix is only
    printed when more than one field is requested, so a single-element
    *fields* would presumably yield an empty dict -- confirm before
    relying on it.
    """
    po = subprocess.Popen(
        ["dpkg-deb", "-f", deb] + list(fields),
        stdout=subprocess.PIPE,
        # Text mode: on Python 3 the pipe would otherwise deliver bytes,
        # which cannot be matched against the str pattern FIELD.
        universal_newlines=True)
    out = po.communicate()[0]
    matches = (FIELD.match(line) for line in out.splitlines())
    return dict(m.groups() for m in matches if m)


def oldest_debs(deb_dir, keep=DEFAULT_KEEP):
    """Yield the paths of the deb files in *deb_dir* that may be removed.

    The regular files ending in ".deb" are grouped by package name. For
    each package with more than *keep* files, every file that is not
    among the *keep* newest versions is yielded. Files whose package name
    or version cannot be read are reported with a warning and skipped,
    hence never yielded.
    """
    log.info("scanning dir '%s'", deb_dir)
    packages = {}
    found = 0
    for name in os.listdir(deb_dir):
        if not name.endswith(".deb"):
            continue
        deb = os.path.join(deb_dir, name)
        if not os.path.isfile(deb):
            continue
        info = deb_info(deb)
        try:
            package, version = info['Package'], info['Version']
        except KeyError:
            # dpkg-deb could not read the control data; leave the file
            # alone instead of aborting the whole run.
            log.warning("cannot read package info from '%s'", deb)
            continue
        packages.setdefault(package, []).append(DebCmp(version, deb))
        found += 1
        # Progress report after every tenth package file.
        if found % 10 == 0:
            log.info("%d debs found", found)

    log.info("%d debs found", found)
    log.info("number packages: %s", len(packages))

    for debs in packages.values():
        if len(debs) > keep:
            # DebCmp sorts newest first, so everything after the first
            # `keep` entries is old. A full sort is more than strictly
            # required (heapq.nsmallest(keep, debs) would identify the
            # files to keep), but it keeps the code simple.
            debs.sort()
            for deb in debs[keep:]:
                yield deb.path


def main():
    """Parse the command line and remove the old debs of each directory."""
    usage = "usage: %prog [options] dir ..."
    parser = OptionParser(usage=usage)
    parser.add_option(
        "-v", "--verbose", action="store_true",
        dest="verbose",
        help="verbose output")
    parser.add_option(
        "-d", "--dry-run", action="store_true",
        dest="dry_run",
        help="don't remove the old deb files")
    parser.add_option(
        "-k", "--keep", action="store",
        dest="keep", type="int", default=DEFAULT_KEEP,
        help="number of files to keep. Default: %d" % DEFAULT_KEEP)

    options, args = parser.parse_args()

    # In dry-run mode the removal is replaced by a no-op.
    remove = (lambda path: None) if options.dry_run else os.remove
    # Always keep at least one file per package.
    keep = max(1, options.keep)

    if options.verbose:
        log.setLevel(logging.INFO)

    for deb_dir in args:
        if not os.path.isdir(deb_dir):
            log.warning("'%s' is not a directory", deb_dir)
            continue
        for deb in oldest_debs(deb_dir, keep):
            log.info("remove '%s'", deb)
            remove(deb)
            # "foo.deb" -> "foo.changes"
            changes = deb[:-3] + "changes"
            if os.path.isfile(changes):
                log.info("remove '%s'", changes)
                remove(changes)


if __name__ == "__main__":
    main()