#!/usr/bin/python # # Copyright (C) 2014 Linaro # # This is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This file is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this software. If not, see . import argparse import atexit import fnmatch import glob import logging import os import shutil import subprocess import sys import git DEFAULT_BRANCH = "master" DEFAULT_BASE_PATH = "/srv/repositories" DEVNULL = open(os.devnull, "w") atexit.register(DEVNULL.close) log = logging.getLogger("create-bundle") def create_bundles(args): excludes = [] if args.exclude_list: with open(os.path.abspath(args.exclude_list)) as open_f: for line in open_f: line = line.strip() if line and not line.startswith("#"): excludes.append(os.path.join(args.base_path, line)) if args.exclude: for repo in args.exclude: excludes.append(os.path.join(args.base_path, repo)) if args.repo_list: with open(os.path.abspath(args.repo_list)) as open_f: for line in open_f: line = line.strip() if not line or line.startswith("#"): continue split_list = line.split(";") if len(split_list) > 1: repo = split_list[0].strip() branch = split_list[1].strip() else: repo = split_list[0].strip() branch = None repo = os.path.join(args.base_path, repo) log.debug("Read path-branch is: {0}-{1}".format(repo, branch)) for path in glob.glob(repo): if os.path.exists(path): traverse_git_dirs(path, branch, args.dry_run, excludes) else: log.error("Repository %s does not exists." % (path)) else: traverse_git_dirs( args.base_path, DEFAULT_BRANCH, args.dry_run, excludes) def traverse_git_dirs(directory, branch, dry_run, excludes=None): """Traverse git bare directories and create the bundle file.""" for dirname, subdirs, _ in os.walk(directory): log.debug("Traversing directory {0}".format(dirname)) if excludes and os.path.join(directory, dirname) in excludes: log.debug("Excluding repo {0}".format(dirname)) subdirs[:] = [] continue if dirname.endswith(".git"): log.debug("Found git bare repository in {0}".format(dirname)) subdirs[:] = [] git_create_bundle(dirname, branch, dry_run) def git_create_bundle(repo, branch, dry_run): """Create the clone.bundle file.""" branches = get_repo_branch(repo, branch) if branches: cmd_args = ["git", "bundle", "create", "clone.bundle.tmp"] cmd_args += branches log.debug("Running git command with args: {0}".format(cmd_args)) if not dry_run: proc = subprocess.Popen( cmd_args, cwd=repo, stdout=DEVNULL, stderr=DEVNULL) proc.wait() if proc.returncode != 0: log.error( "Error creating clone.bundle for repository {0}, with " "branches {1} (return code was: {2}.".format( repo, branches, proc.returncode) ) else: shutil.move( os.path.join(repo, "clone.bundle.tmp"), os.path.join(repo, "clone.bundle") ) else: log.error("No valid branch found for repository '{0}'.".format(repo)) def get_repo_branch(repo, branch): """Get the valid heads for the repo.""" if not branch: branch = DEFAULT_BRANCH repo = git.Repo(repo) branches = [x.name for x in repo.heads if fnmatch.fnmatch(x.name, branch)] if not branches: # If we do not have the provided or the default branch, try to get # linaro related ones at least. branches = [x.name for x in repo.heads if fnmatch.fnmatch(x.name, '*linaro*')] log.debug("Bundled branches are: {0}".format(branches)) return list(set(branches)) def setup_logging(debug): """Configure log.""" log.addHandler(logging.StreamHandler()) if debug: log.setLevel(logging.DEBUG) else: log.setLevel(logging.INFO) if __name__ == '__main__': parser = argparse.ArgumentParser(description="Create git bundle files.") parser.add_argument( '--base-path', help=("The base path where repositories are stored. " "Defaults to: {0}.".format(DEFAULT_BASE_PATH)), default=DEFAULT_BASE_PATH ) parser.add_argument( '--repo-list', help=("Path to a file with list of repositories to create the bundle " "for. If not provided, bundles will be created for all " "repositories.") ) parser.add_argument( "--exclude", action="append", help="Repository names to exclude. Relative paths." ) parser.add_argument( "--exclude-list", help=("Path to a file with list of repositories to exclude from " "creating the bundle.") ) parser.add_argument( "--dry-run", help="Do not perform any operations, just print the execution.", action="store_true", ) parser.add_argument( "--debug", action="store_true", help="Print debugging information." ) args = parser.parse_args() setup_logging(args.debug) if not os.path.exists(args.base_path): log.error( "Repositories base path '{0}' does not " "exist.".format(args.base_path) ) sys.exit(1) if args.repo_list and not os.path.isfile(os.path.abspath(args.repo_list)): log.error( "Repositories list file '{0}' does not exist or is not " "a file.".format(args.repo_list) ) sys.exit(1) if args.exclude_list and \ not os.path.isfile(os.path.abspath(args.exclude_list)): log.error( "Repositories exclude list file '{0}' does not exists or is not a" "file".format(args.exclude_list) ) sys.exit(1) create_bundles(args)