Source code for anybox.recipe.odoo.vcs.bzr

import os
import logging
import subprocess
import urlparse
import urllib
from StringIO import StringIO
from copy import deepcopy

from zc.buildout import UserError
from ..utils import use_or_open
from ..utils import working_directory_keeper
from ..utils import check_output
from .base import SUBPROCESS_ENV
from .base import BaseRepo
from .base import update_check_call
from .base import clone_check_call
from .base import UpdateError
from .base import CloneError

logger = logging.getLogger(__name__)

try:
    from bzrlib.plugins.launchpad.lp_directory import LaunchpadDirectory
except ImportError:
    LPDIR = None
else:
    LPDIR = LaunchpadDirectory()


[docs]class BzrBranch(BaseRepo): """Represent a Bazaar branch tied to a reference branch.""" vcs_control_dir = '.bzr' vcs_official_name = 'Bazaar' def __init__(self, *a, **kw): super(BzrBranch, self).__init__(*a, **kw) if self.options.get('bzr-init') == "ligthweight-checkout": logger.warn("The 'ligthweight-checkout' *misspelling* is " "deprecated as of version 1.7.1 of this buildout " "recipe. " "Please fix it as 'lightweight-checkout', as it will " "probably disappear in version 1.8.") self.options['bzr-init'] = 'lightweight-checkout' if self.url.startswith('lp:') and not self.offline: if LPDIR is None: raise RuntimeError( "To use launchpad locations (lp:), bzrlib must be " "importable. Please also take care that it's the same " "or working exactly as the one behind the bzr executable") # first arg (name) of look_up is acturally ignored url = LPDIR.look_up('', self.url) parsed = list(urlparse.urlparse(url)) parsed[2] = urllib.quote(parsed[2]) self.url = urlparse.urlunparse(parsed)
[docs] def conf_file_path(self): return os.path.join(self.target_dir, '.bzr', 'branch', 'branch.conf')
[docs] def parse_conf(self, from_file=None): """Return a dict of paths from standard conf (or the given file-like) Reference: http://doc.bazaar.canonical.com/bzr.0.18/configuration.htm >>> from pprint import pprint >>> branch = BzrBranch('', '') >>> pprint(branch.parse_conf(StringIO(os.linesep.join([ ... "parent_location = /some/path", ... "submit_location = /other/path"])))) {'parent_location': '/some/path', 'submit_location': '/other/path'} """ with use_or_open(from_file, self.conf_file_path()) as conffile: return dict((name.strip(), url.strip()) for name, url in ( line.split('=', 1) for line in conffile if not line.startswith('#') and '=' in line)) with working_directory_keeper: os.chdir(self.target_dir)
[docs] def write_conf(self, conf, to_file=None): """Write counterpart to :meth:`read_conf` """ lines = ('%s = %s' % (k, v) + os.linesep for k, v in conf.items()) with use_or_open(to_file, self.conf_file_path(), 'w') as conffile: conffile.writelines(lines)
[docs] def update_conf(self): """Update branch.conf. :return bool: ``True`` if parent URL has changed (see lp:1320198) """ try: conf = self.parse_conf() except IOError: logger.error("Cannot read branch.conf of Bazaar branch at %s " "Proceeding anyway, remote URLs shall not be " "updated if needed. Please check that the branch " "is in good shape.", self.target_dir) return old_parent = conf['parent_location'] if old_parent == self.url or self.url.startswith('lp:'): return False self.previous_conf = deepcopy(conf) count = 1 while True: save = 'buildout_save_parent_location_%d' % count if save not in conf: conf[save] = old_parent break count += 1 conf['parent_location'] = self.url self.write_conf(conf) return True
[docs] def rollback_conf(self): """Reset branch.conf to state before latest update_conf changes. Only changes done through the same instance are taken into account. """ previous_conf = getattr(self, 'previous_conf', None) if previous_conf is None: return logger.info("Rollbacking branch.conf for target %r", self.target_dir) self.write_conf(previous_conf)
[docs] def uncommitted_changes(self): """True if we have uncommitted changes.""" return bool(check_output(['bzr', 'status', self.target_dir], env=SUBPROCESS_ENV))
[docs] def revision_id(self, revspec): """Convert revision number (revno) to globally unique revision id. :param str revspec: any revision specification :returns str: revision id specification (directly usable as -r argument) """ testament = check_output(['bzr', 'testament', '--strict', '-r', revspec, self.target_dir]) prefix = 'revision-id:' for line in testament.splitlines(): if line.startswith(prefix): return 'revid:' + line[len(prefix):].strip()
[docs] def parents(self, as_revno=False, pip_compatible=False): """Return current revision. :param as_revno: if ``True``, the revno will be returned. By default, a full revision id is issued (see :meth:`revision_id`) :param pip_compatible: currently, setting this to ``True`` forces ``as_revno`` to ``True`` (pip URL syntax for bzr does not allow revids, because of the ``@`` in bzr revids) This method will not detect pending merges, but :meth:`uncommitted_changes` will, and that is enough for freeze/extract features. """ revno = check_output(['bzr', 'revno', '--tree', self.target_dir], env=SUBPROCESS_ENV).strip() if pip_compatible: as_revno = True if as_revno: return [revno] return [self.revision_id(revno)]
[docs] def clean(self): if not os.path.exists(self.target_dir): # not branched yet, there's nothing to clean return subprocess.check_call(['bzr', 'clean-tree', '--ignored', '--force']) with working_directory_keeper: os.chdir(self.target_dir) subprocess.check_call(['bzr', 'clean-tree', '--ignored', '--force'])
[docs] def revert(self, revision): logger.info("Reverting bzr repo at %s to revision %r", self.target_dir, revision) with working_directory_keeper: os.chdir(self.target_dir) subprocess.check_call(['bzr', 'revert', '-r', revision])
def _update(self, revision): """Update existing branch at target dir to given revision. raise UpdateError in case of problems.""" update_check_call(['bzr', 'up', '-r', revision, self.target_dir], env=SUBPROCESS_ENV) logger.info("Updated %r to revision %s", self.target_dir, revision)
[docs] def get_revid(self, revision): """Convert a locally available revision to a revid. :param str revision: any valid revision string. :raises: :class:`LookupError` if not actually available. """ with working_directory_keeper: os.chdir(self.target_dir) try: log = check_output( ['bzr', 'log', '--show-ids', '-r', revision], env=SUBPROCESS_ENV) except subprocess.CalledProcessError as exc: if exc.returncode != 3: raise raise LookupError( "could not find revision id for %r" % revision) prefix = 'revision-id:' for line in log.split(os.linesep): if line.startswith(prefix): return line[len(prefix):].strip() raise LookupError("could not find revision id for %r" % revision)
[docs] def is_revno(self, revspec, fixed=False): """True iff revspec is a fixed revision number. Valid revision numbers are integers separated by dots. :param fixed: if ``True``, it is further checked that integers are positive. """ revno_prefix = 'revno:' if revspec.startswith(revno_prefix): # GR I checked on Debian's 2.6.0~bzr6526-1, revno:-1 # is in practice accepted by bzr, so we have to check return self.is_revno(revspec[len(revno_prefix):]) for part in revspec.strip().split('.'): try: part = int(part) except ValueError: return False else: if fixed and part <= 0: return False return True
[docs] def is_fixed_revision(self, revstr): """True iff the given revision string is for a fixed revision.""" revstr = revstr.strip() # one never knows if revstr.startswith('revid:') or revstr.startswith('tag:'): return True if not revstr or revstr.startswith('last:'): return False if self.is_revno(revstr, fixed=True): return True
[docs] def is_local_fixed_revision(self, revstr): if not self.is_fixed_revision(revstr): return False try: self.get_revid(revstr) except LookupError: return False else: return True
[docs] def get_update(self, revision): """Ensure that target_dir is a branch of url at specified revision. If target_dir already exists, does a simple pull. Offline-mode: no branch nor pull, but update. In all cases, an attempt to update is performed before any pull Special case: if the 'merge' option is True, merge revision into current branch. """ target_dir = self.target_dir offline = self.offline clear_locks = self.clear_locks if not os.path.exists(target_dir) or \ self.options.get("bzr-init") == 'merge': try: self._branch(revision) except CloneError: if not revision: raise logger.warning("First attempt of branching to %s at revision " "%r failed. Retrying in two steps.", target_dir, revision) # it really happens, see # https://bugs.launchpad.net/anybox.recipe.openerp/+bug/1204573 self._branch(None) self._update(revision) else: # TODO what if bzr source is actually local fs ? if clear_locks: yes = StringIO() yes.write('y') yes.seek(0) logger.info("Break-lock for branch %s ...", target_dir) # GR newer versions of bzr have a --force option, but this call # works also for older ones (fortunately we don't need a pty) p = subprocess.Popen(['bzr', 'break-lock', target_dir], subprocess.PIPE) out, err = p.communicate(input='y') if p.returncode != 0: raise subprocess.CalledProcessError( p.returncode, repr(['bzr', 'break-lock', target_dir])) parent_changed = self.update_conf() unsafe_revno = parent_changed and self.is_revno(revision) fixed_rev = self.is_fixed_revision(revision) init_opt = self.options.get('bzr-init') if fixed_rev and not unsafe_revno: if (offline and init_opt == 'lightweight-checkout'): logger.warning("Offline mode, no update for lightweight " "checkout at %s on revision %r", self.target_dir, revision) return try: self._update(revision) return except UpdateError: if offline: raise if offline: if parent_changed and (unsafe_revno or not fixed_rev): self.rollback_conf() raise UserError( "Change of parent URL with live or revno revision " "specification: %r is forbidden in offline mode. " "If that revno is common to old and new remote " "branch, consider using revision IDs " "instead." % revision) logger.info("Offline mode, no pull for revision %r", revision) else: self._pull() if not (offline and init_opt in ('stacked-branch', 'lightweight-checkout')): self._update(revision)
def _branch(self, revision): """ Branch or checkout remote repository """ target_dir = self.target_dir url = self.url offline = self.offline # TODO case of local url ? if offline: raise IOError( "bzr branch %s does not exist; cannot branch it from " "%s (offline mode)" % (target_dir, url)) options = self.options if "bzr-init" in options and "bzr-stacked-branches" in options: raise Exception( "Both options 'bzr-init' and " "'bzr-stacked-branches' are mutually exclusive. " "Prefer 'bzr-init'.") default = "branch" if "bzr-stacked-branches" in options: logger.warning("'bzr-stacked-branches' option is deprecated. " "Replace by bzr-init=stacked-branch") default = "stacked-branch" bzr_opt = options.get("bzr-init", default) branch_cmd = ['bzr'] if bzr_opt == "branch": branch_cmd.append("branch") logger.info("Branching %s ...", url) elif bzr_opt == "stacked-branch": branch_cmd.extend(["branch", "--stacked"]) logger.info("Stacked branching %s ...", url) elif bzr_opt == "lightweight-checkout": branch_cmd.extend(["checkout", "--lightweight"]) logger.info("Lightweight checkout %s ...", url) elif bzr_opt == "merge": branch_cmd.extend(["merge", "--force"]) logger.info("Merging %s into %s ...", url, self.target_dir) else: raise Exception("Unsupported option %r" % bzr_opt) if revision: branch_cmd.extend(['-r', revision]) if bzr_opt == "merge": branch_cmd.extend([url, '-d', target_dir]) else: branch_cmd.extend([url, target_dir]) clone_check_call(branch_cmd, env=SUBPROCESS_ENV) def _pull(self): if self.options.get('bzr-init') == 'lightweight-checkout': logger.info("Update lightweight checkout at %s ...", self.target_dir) update_check_call(['bzr', 'update', self.target_dir], env=SUBPROCESS_ENV) else: logger.info("Pull for branch %s ...", self.target_dir) update_check_call(['bzr', 'pull', '-d', self.target_dir], env=SUBPROCESS_ENV)
[docs] def archive(self, target_path): with working_directory_keeper: os.chdir(self.target_dir) subprocess.check_call(['bzr', 'export', target_path])