# Copyright (C) 2010 by Intevation GmbH
# Authors:
# Andre Heinecke
#
# This program is free software under the GPL (>=v2)
# Read the file COPYING coming with the software for details.

"""Collection of Git utility code"""

import os
import shutil
import re
import StringIO

import run

from run import capture_output
from cmdexpand import cmdexpand
from util import extract_value_for_key


class GitError(Exception):
    """Base class for Git specific errors raised by TreePKG"""


class GitRepository(object):
    """Describes a git repository"""

    def __init__(self, url, branch=None):
        """Initialize the git repository description

        Parameters:
          url -- The url of the repository

          branch -- The name of the remote branch to track; defaults to
                    master.  A value of the form "local:remote" also sets
                    the name of the local branch (the part before the
                    first colon); otherwise the local branch is "local".
        """
        self.url = url
        self.local_branch = "local"
        # use master as default
        self.branch = branch or "master"
        if ":" in self.branch:
            # Only the first two colon separated parts are used.
            parts = self.branch.split(":")
            self.local_branch = parts[0]
            self.branch = parts[1]

    def checkout(self, localdir):
        """Clones the repository at url into the localdir

        After cloning, the local tracking branch is created with
        checkout_branch if a remote branch name is set.
        """
        # Pass the values explicitly: the url lives on self, so it is not
        # part of locals().
        run.call(cmdexpand("git clone -q $url $localdir",
                           url=self.url, localdir=localdir))
        if self.branch:
            self.checkout_branch(localdir)

    def checkout_branch(self, localdir):
        """Creates the local branch tracking self.branch in localdir"""
        run.call(cmdexpand("git checkout -q --track -b $local $branch",
                           branch=self.branch, local=self.local_branch),
                 cwd=localdir)

    def export(self, localdir, destdir):
        """Exports the working copy in localdir to destdir"""
        # git checkout-index uses the prefix verbatim, so it has to end
        # with a path separator to name a directory.
        dest = destdir + os.sep
        run.call(cmdexpand("git checkout-index -a -f --prefix=$dest",
                           dest=dest),
                 cwd=localdir)

    def copy(self, localdir, destdir):
        """Copies the working copy to destdir (including .git dir)"""
        shutil.copytree(localdir, destdir)

    def _list_local_branches(self, localdir):
        """Returns the branch names printed by git branch in localdir

        The "* " marker in front of the current branch is removed.
        """
        output = run.capture_output(cmdexpand("git branch"), cwd=localdir)
        names = []
        for line in output.splitlines():
            name = line.strip()
            if name.startswith("*"):
                # drop the "* " marker of the current branch
                name = name[2:]
            names.append(name)
        return names

    def update(self, localdir):
        """Runs git pull on the localdir.

        Afterwards the local branch is created if git branch does not
        list it yet, and then it is checked out.
        """
        run.call(cmdexpand("git pull -q"), cwd=localdir)
        if self.local_branch not in self._list_local_branches(localdir):
            self.checkout_branch(localdir)
        # TODO: check if self.local_branch is the current branch
        # doesn't hurt if a checkout is done on the current branch
        if self.branch:
            run.call(cmdexpand("git checkout -q $branch",
                               branch=self.local_branch),
                     cwd=localdir)

    def last_changed_revision(self, localdir):
        """Returns the SHA1 sum of the latest commit in the working copy
        in localdir

        Raises GitError if git rev-parse yields no output object.
        """
        output = run.capture_output(cmdexpand("git rev-parse HEAD"),
                                    cwd=localdir)
        if output is None:
            raise GitError("Cannot determine last changed revision for %r"
                           % localdir)
        return output.strip()

    def check_working_copy(self, localdir):
        """FIXME STUB: Not implemented for git"""
        return None


class GitWorkingCopy(object):
    """Represents a checkout of a git repository"""

    def __init__(self, repository, localdir, logger=None):
        """
        Initialize the working copy.
        Parameters:
          repository -- The GitRepository instance describing the
                        repository
          localdir -- The directory for the working copy
          logger -- logging object to use for some info/debug messages
        """
        self.repository = repository
        self.localdir = localdir
        self.logger = logger

    def log_info(self, *args):
        """Forwards args to logger.info if a logger was given"""
        if self.logger is not None:
            self.logger.info(*args)

    def update_or_checkout(self, revision=0):
        """Updates the working copy or creates by checking out the repository.

        An existing localdir/.git means update.  Otherwise the repository
        is cloned, which raises GitError if localdir already exists.
        Revision number included for compatibility; it is not used.
        """
        gitdir = os.path.join(self.localdir, ".git")
        branch = self.repository.branch
        if os.path.exists(gitdir):
            self.log_info("Updating the working copy in %r for repo "
                          "%s and branch %s", self.localdir,
                          self.repository.url,
                          branch)
            self.repository.update(self.localdir)
            return

        # TODO: better check if localdir contains files
        if os.path.exists(self.localdir):
            raise GitError("Working copy dir %s already exists. "
                           " files. Can't checkout from %s"
                           % (self.localdir, self.repository.url))
        self.log_info("The working copy in %r doesn't exist yet."
                      " Checking out branch %s from %r",
                      self.localdir, branch, self.repository.url)
        self.repository.checkout(self.localdir)

    def export(self, destdir):
        """Exports the working copy to destdir"""
        self.repository.export(self.localdir, destdir)

    def export_tag(self, url, destdir, revision=None):
        """Export tag to destdir

        Currently this only exports the working copy; url and revision
        are not used.
        """
        self.export(destdir)

    def last_changed_revision(self):
        """Returns the last changed rev of the working copy"""
        return self.get_revision()

    def list_tags(self, pattern):
        """Returns the lines printed by git tag -l pattern as a list"""
        output = run.capture_output(cmdexpand("git tag -l $pattern",
                                              pattern=pattern),
                                    cwd=self.localdir)
        return output.splitlines()

    def get_revision(self, refname="HEAD"):
        """Return the SHA1 sum git rev-parse reports for refname

        Raises GitError if git rev-parse yields no output object.
        """
        output = run.capture_output(cmdexpand("git rev-parse $refname",
                                              refname=refname),
                                    cwd=self.localdir)
        if output is None:
            raise GitError("Cannot determine revision for %r"
                           % self.localdir)
        return output.strip()

    def get_short_revision(self, refname="HEAD"):
        """Return the first seven characters of the SHA1 sum of refname"""
        return self.get_revision(refname)[:7]


class TagDetector(object):
    """Class to detect tags from a git repository

    The tags are found using the parameters:
      url -- The url of the git repository to use
      pattern -- The pattern handed to git tag -l to select the tags
      localdir -- The directory of the working copy used for the lookup
    """

    def __init__(self, url, pattern, localdir):
        self.url = url
        self.pattern = pattern
        repo = GitRepository(url)
        self.workingcopy = GitWorkingCopy(repo, localdir)

    def list_tags(self):
        """Updates the working copy and returns the matching tags sorted"""
        self.workingcopy.update_or_checkout()
        tags = self.workingcopy.list_tags(self.pattern)
        return sorted(tags)

    def newest_tag_revision(self):
        """Returns the tuple (tag, revision) for the last tag in sort order

        Returns (None, None) if there is no matching tag or the revision
        of the tag cannot be determined.
        """
        candidates = self.list_tags()
        if not candidates:
            return (None, None)
        newest = candidates[-1]
        try:
            return (newest, self.workingcopy.get_revision(newest))
        except GitError:
            # Deliberately best effort: an unresolvable tag counts as
            # "no tag found".
            return (None, None)

    def tag_pkg_parameters(self, tag_name):
        """Returns the tuple (date, 1) derived from tag_name

        The date is the eight digit group found in the tag name.
        Raises GitError if tag_name does not match the expected pattern.
        """
        # FIXME: Don't hardcode the regex
        # (an earlier variant also required a literal "." right before
        # the date)
        match = re.search(r"enterprise[^.]*\.[^.]*"
                          r"(?P<date>[0-9]{8})",
                          tag_name)
        if match is None:
            raise GitError("Cannot determine tag parameters from %s"
                           % tag_name)
        return (match.group("date"), 1)