# HG changeset patch # User cmlenz # Date 1186130927 0 # Node ID 026d9aa41b85a14e5a9e9a658f016a35eb796084 # Parent 7801dddec1a18781c884d0e937582e501bfe188b Merged HTTP branch into trunk. diff --git a/bitten/build/config.py b/bitten/build/config.py --- a/bitten/build/config.py +++ b/bitten/build/config.py @@ -149,7 +149,7 @@ _VAR_RE = re.compile(r'\$\{(?P\w[\w.]*?\w)(?:\:(?P.+))?\}') - def interpolate(self, text): + def interpolate(self, text, **vars): """Interpolate configuration properties into a string. Properties can be referenced in the text using the notation @@ -160,11 +160,14 @@ provided, the reference is not replaced at all. @param text: the string containing variable references + @param vars: extra variables to use for the interpolation """ def _replace(m): refname = m.group('ref') if refname in self: return self[refname] + elif refname in vars: + return vars[refname] elif m.group('def'): return m.group('def') else: diff --git a/bitten/build/svntools.py b/bitten/build/svntools.py new file mode 100644 --- /dev/null +++ b/bitten/build/svntools.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2007 Christopher Lenz +# All rights reserved. +# +# This software is licensed as described in the file COPYING, which +# you should have received as part of this distribution. The terms +# are also available at http://bitten.cmlenz.net/wiki/License. + +"""Recipe commands for Subversion.""" + +import logging +import posixpath + +log = logging.getLogger('bitten.build.svntools') + +def checkout(ctxt, url, path=None, revision=None): + """Perform a checkout from a Subversion repository. + + @param ctxt: the build context + @type ctxt: an instance of L{bitten.recipe.Context} + @param url: the URL of the repository + @param path: the path inside the repository + @param revision: the revision to check out + """ + args = ['checkout'] + if revision: + args += ['-r', revision] + if path: + url = posixpath.join(url, path) + args += [url, '.'] + + from bitten.build import shtools + returncode = shtools.execute(ctxt, file_='svn', args=args) + if returncode != 0: + ctxt.error('svn checkout failed (%s)' % returncode) + +def export(ctxt, url, path=None, revision=None): + """Perform an export from a Subversion repository. + + @param ctxt: the build context + @type ctxt: an instance of L{bitten.recipe.Context} + @param url: the URL of the repository + @param path: the path inside the repository + @param revision: the revision to check out + """ + args = ['export', '--force'] + if revision: + args += ['-r', revision] + if path: + url = posixpath.join(url, path) + args += [url, '.'] + + from bitten.build import shtools + returncode = shtools.execute(ctxt, file_='svn', args=args) + if returncode != 0: + ctxt.error('svn export failed (%s)' % returncode) + +def update(ctxt, revision=None): + """Update the local working copy from the Subversion repository. + + @param ctxt: the build context + @type ctxt: an instance of L{bitten.recipe.Context} + @param revision: the revision to check out + """ + args = ['update'] + if revision: + args += ['-r', revision] + + from bitten.build import shtools + returncode = shtools.execute(ctxt, file_='svn', args=args) + if returncode != 0: + ctxt.error('svn update failed (%s)' % returncode) diff --git a/bitten/master.py b/bitten/master.py --- a/bitten/master.py +++ b/bitten/master.py @@ -7,17 +7,13 @@ # you should have received as part of this distribution. The terms # are also available at http://bitten.cmlenz.net/wiki/License. -"""Build master implementation. - -This module is runnable as a script to launch the build master. The build -master starts a single process that handles connections to any number of build -slaves. -""" +"""Build master implementation.""" import calendar from datetime import datetime, timedelta import logging import os +import re try: set except NameError: @@ -25,295 +21,177 @@ import sys import time +from trac.config import BoolOption, IntOption +from trac.core import * from trac.env import Environment +from trac.web import IRequestHandler, HTTPBadRequest, HTTPConflict, \ + HTTPForbidden, HTTPMethodNotAllowed, HTTPNotFound, \ + RequestDone + from bitten.model import BuildConfig, Build, BuildStep, BuildLog, Report from bitten.queue import BuildQueue +from bitten.recipe import Recipe from bitten.trac_ext.main import BuildSystem -from bitten.util import beep, xmlio - -log = logging.getLogger('bitten.master') - -DEFAULT_CHECK_INTERVAL = 120 # 2 minutes - - -class Master(beep.Listener): - """BEEP listener implementation for the build master.""" - - def __init__(self, envs, ip, port, build_all=False, - adjust_timestamps=False, - check_interval=DEFAULT_CHECK_INTERVAL): - beep.Listener.__init__(self, ip, port) - self.profiles[OrchestrationProfileHandler.URI] = \ - OrchestrationProfileHandler - self.adjust_timestamps = adjust_timestamps - self.check_interval = check_interval - self.handlers = {} # Map of connected slaves keyed by name - - self.queues = [] - for env in envs: - self.queues.append(BuildQueue(env, build_all=build_all)) - - self.schedule(self.check_interval, self._enqueue_builds) - - def close(self): - for queue in self.queues: - queue.reset_orphaned_builds() - beep.Listener.close(self) - - def _enqueue_builds(self): - self.schedule(self.check_interval, self._enqueue_builds) - - for queue in self.queues: - queue.populate() - - self.schedule(self.check_interval * 0.2, self._initiate_builds) - - def _initiate_builds(self): - available_slaves = set([name for name in self.handlers - if not self.handlers[name].building]) - for idx, queue in enumerate(self.queues[:]): - build, slave = queue.get_next_pending_build(available_slaves) - if build: - self.handlers[slave].send_initiation(queue, build) - available_slaves.discard(slave) - self.queues.append(self.queues.pop(idx)) # Round robin - - def register(self, handler): - if handler.name in self.handlers: - # The slave is for some reason still registered... this shouldn't - # happen in theory, but apparently it does in the real world (see - # #106). We simply unregister it before trying to register it - # again. - self.unregister(handler) - - any_match = False - for queue in self.queues: - if queue.register_slave(handler.name, handler.info): - any_match = True - - if not any_match: - log.warning('Slave %s does not match any of the configured target ' - 'platforms', handler.name) - return False - - self.handlers[handler.name] = handler - self.schedule(self.check_interval * 0.2, self._initiate_builds) - - log.info('Registered slave "%s"', handler.name) - return True - - def unregister(self, handler): - if handler.name not in self.handlers: - return - - for queue in self.queues: - if queue.unregister_slave(handler.name): - for build in list(Build.select(queue.env, slave=handler.name, - status=Build.IN_PROGRESS)): - handler._build_aborted(queue, build) - - del self.handlers[handler.name] - - log.info('Unregistered slave "%s"', handler.name) +from bitten.util import xmlio -class OrchestrationProfileHandler(beep.ProfileHandler): - """Handler for communication on the Bitten build orchestration profile from - the perspective of the build master. - - An instance of this class is associated with exactly one remote build slave. - """ - URI = 'http://bitten.cmlenz.net/beep/orchestration' - - def handle_connect(self): - self.master = self.session.listener - assert self.master - self.name = None - self.building = False - self.info = {} - - def handle_disconnect(self): - self.master.unregister(self) - - def handle_msg(self, msgno, payload): - assert payload.content_type == beep.BEEP_XML - elem = xmlio.parse(payload.body) - - if elem.name == 'register': - self.name = elem.attr['name'] - self.info[Build.IP_ADDRESS] = self.session.addr[0] - for child in elem.children(): - if child.name == 'platform': - self.info[Build.MACHINE] = child.gettext() - self.info[Build.PROCESSOR] = child.attr.get('processor') - elif child.name == 'os': - self.info[Build.OS_NAME] = child.gettext() - self.info[Build.OS_FAMILY] = child.attr.get('family') - self.info[Build.OS_VERSION] = child.attr.get('version') - elif child.name == 'package': - for name, value in child.attr.items(): - if name == 'name': - continue - self.info[child.attr['name'] + '.' + name] = value - - if not self.master.register(self): - raise beep.ProtocolError(550, 'Nothing for you to build here, ' - 'please move along') - - xml = xmlio.Element('ok') - self.channel.send_rpy(msgno, beep.Payload(xml)) +class BuildMaster(Component): + """BEEP listener implementation for the build master.""" - def send_initiation(self, queue, build): - log.info('Initiating build of %d ("%s" as of [%s]) on slave %s', - build.id, build.config, build.rev, self.name) - - build.slave = self.name - build.slave_info.update(self.info) - build.status = Build.IN_PROGRESS - build.update() - self.building = True - - config = BuildConfig.fetch(queue.env, build.config) - - def handle_reply(cmd, msgno, ansno, payload): - if cmd == 'ERR': - if payload.content_type == beep.BEEP_XML: - elem = xmlio.parse(payload.body) - if elem.name == 'error': - log.warning('Slave %s refused build request: %s (%d)', - self.name, elem.gettext(), - int(elem.attr['code'])) - self.building = False - self._build_aborted(queue, build) - return + implements(IRequestHandler) - elem = xmlio.parse(payload.body) - if elem.name != 'proceed': - raise beep.ProtocolError(500) + # Configuration options - snapshots = queue.snapshots[config.name] - snapshot = snapshots.get(build.rev) - if not snapshot: - # Request a snapshot for this build, and schedule a poll - # function that kicks off the snapshot transmission once the - # archive has been completely built - worker = snapshots.create(build.rev) - def _check_snapshot(): - worker.join(.5) - if worker.isAlive(): - self.master.schedule(2, _check_snapshot) - else: - if self.name not in self.master.handlers: - # The slave disconnected while we were building - # the archive - return - snapshot = snapshots.get(build.rev) - if snapshot is None: - log.error('Failed to create snapshot archive for ' - '%s@%s', config.path, build.rev) - return - self.send_snapshot(queue, build, snapshot) - _check_snapshot() - else: - self.send_snapshot(queue, build, snapshot) + adjust_timestamps = BoolOption('bitten', 'adjust_timestamps', False, doc= + """Whether the timestamps of builds should be adjusted to be close ' + to the timestamps of the corresponding changesets.""") + + build_all = BoolOption('bitten', 'build_all', False, doc= + """Whether to request builds of older revisions even if a younger + revision has already been built.""") + + slave_timeout = IntOption('bitten', 'slave_timeout', 3600, doc= + """The time in seconds after which a build is cancelled if the slave + does not report progress.""") + + # IRequestHandler methods + + def match_request(self, req): + match = re.match(r'/builds(?:/(\d+)(?:/(\w+)/([^/]+))?)?$', + req.path_info) + if match: + if match.group(1): + req.args['id'] = match.group(1) + req.args['collection'] = match.group(2) + req.args['member'] = match.group(3) + return True + + def process_request(self, req): + req.perm.assert_permission('BUILD_EXEC') + + if 'id' not in req.args: + if req.method != 'POST': + raise HTTPMethodNotAllowed('Method not allowed') + return self._process_build_creation(req) + + build = Build.fetch(self.env, req.args['id']) + if not build: + raise HTTPNotFound('No such build') + config = BuildConfig.fetch(self.env, build.config) + + if not req.args['collection']: + return self._process_build_initiation(req, config, build) + + if req.method != 'PUT': + raise HTTPMethodNotAllowed('Method not allowed') + + if req.args['collection'] == 'steps': + return self._process_build_step(req, config, build, + req.args['member']) + else: + raise HTTPNotFound('No such collection') + + def _process_build_creation(self, req): + queue = BuildQueue(self.env, build_all=self.build_all) + queue.populate() + + try: + elem = xmlio.parse(req.read()) + except xmlio.ParseError, e: + raise HTTPBadRequest('XML parser error') + + name = elem.attr['name'] + properties = {Build.IP_ADDRESS: req.remote_addr} + self.log.info('Build slave %r connected from %s', name, req.remote_addr) + + for child in elem.children(): + if child.name == 'platform': + properties[Build.MACHINE] = child.gettext() + properties[Build.PROCESSOR] = child.attr.get('processor') + elif child.name == 'os': + properties[Build.OS_NAME] = child.gettext() + properties[Build.OS_FAMILY] = child.attr.get('family') + properties[Build.OS_VERSION] = child.attr.get('version') + elif child.name == 'package': + for name, value in child.attr.items(): + if name == 'name': + continue + properties[child.attr['name'] + '.' + name] = value + + build = queue.get_build_for_slave(name, properties) + if not build: + req.send_response(204) + req.send_header('Content-Type', 'text/plain') + req.write('No pending builds') + raise RequestDone + + req.send_response(201) + req.send_header('Content-Type', 'text/plain') + req.send_header('Location', req.abs_href.builds(build.id)) + req.write('Build pending') + raise RequestDone + + def _process_build_initiation(self, req, config, build): + build.started = int(time.time()) + build.update() xml = xmlio.parse(config.recipe) - xml.attr['project'] = os.path.basename(queue.env.path) - self.channel.send_msg(beep.Payload(xml), - handle_reply=handle_reply, force_flush=True) - - def send_snapshot(self, queue, build, snapshot): - timestamp_delta = 0 - if self.master.adjust_timestamps: - d = datetime.now() - timedelta(seconds=self.master.check_interval) \ - - datetime.fromtimestamp(build.rev_time) - log.info('Warping timestamps by %s', d) - timestamp_delta = d.days * 86400 + d.seconds - - def handle_reply(cmd, msgno, ansno, payload): - if cmd == 'ERR': - if payload.content_type != beep.BEEP_XML: - raise beep.ProtocolError(500) - elem = xmlio.parse(payload.body) - if elem.name == 'error': - log.warning('Slave %s refused to start build: %s (%d)', - self.name, elem.gettext(), - int(elem.attr['code'])) - self.building = False - self._build_aborted(queue, build) - - elif cmd == 'ANS': - if payload.content_type != beep.BEEP_XML: - raise beep.ProtocolError(500) - elem = xmlio.parse(payload.body) + xml.attr['path'] = config.path + xml.attr['revision'] = build.rev + body = str(xml) - # verify that the build hasn't been modified out from - # under us-- we don't want to accept any build information - # from builds that have been invalidated or claimed by - # other slaves. - current_build = Build.fetch(queue.env, build.id) - if current_build.status != Build.IN_PROGRESS or \ - current_build.slave != self.name: - raise beep.ProtocolError(550, 'Build %s has been ' - 'invalidated, will not accept completed steps from ' - 'slave %s' % (build.id, self.name)) - - if elem.name == 'started': - self._build_started(queue, build, elem, timestamp_delta) - elif elem.name == 'step': - self._build_step_completed(queue, build, elem, - timestamp_delta) - elif elem.name == 'completed': - self._build_completed(queue, build, elem, timestamp_delta) - elif elem.name == 'aborted': - self._build_aborted(queue, build) - elif elem.name == 'error': - build.status = Build.FAILURE - - elif cmd == 'NUL': - self.building = False + req.send_response(200) + req.send_header('Content-Type', 'application/x-bitten+xml') + req.send_header('Content-Length', str(len(body))) + req.send_header('Content-Disposition', + 'attachment; filename=recipe_%s_r%s.xml' % + (config.name, build.rev)) + req.write(body) + raise RequestDone - snapshot_name = os.path.basename(snapshot) - message = beep.Payload(file(snapshot, 'rb'), - content_type='application/tar', - content_encoding='bzip2', - content_disposition=snapshot_name) - self.channel.send_msg(message, handle_reply=handle_reply) - - def _build_started(self, queue, build, elem, timestamp_delta=None): - build.started = int(_parse_iso_datetime(elem.attr['time'])) - if timestamp_delta: - build.started -= timestamp_delta - build.update() + def _process_build_step(self, req, config, build, stepname): + step = BuildStep.fetch(self.env, build=build.id, name=stepname) + if step: + raise HTTPConflict('Build step already exists') - log.info('Slave %s started build %d ("%s" as of [%s])', - self.name, build.id, build.config, build.rev) - for listener in BuildSystem(queue.env).listeners: - listener.build_started(build) - - def _build_step_completed(self, queue, build, elem, timestamp_delta=None): - log.debug('Slave %s completed step "%s" with status %s', self.name, - elem.attr['id'], elem.attr['result']) + recipe = Recipe(xmlio.parse(config.recipe)) + index = None + current_step = None + for num, recipe_step in enumerate(recipe): + if recipe_step.id == stepname: + index = num + current_step = recipe_step + if index is None: + raise HTTPForbidden('No such build step') + last_step = index == num - db = queue.env.get_db_cnx() + try: + elem = xmlio.parse(req.read()) + except xmlio.ParseError, e: + raise HTTPBadRequest('XML parser error') - step = BuildStep(queue.env, build=build.id, name=elem.attr['id'], - description=elem.attr.get('description')) - step.started = int(_parse_iso_datetime(elem.attr['time'])) - step.stopped = step.started + int(elem.attr['duration']) - if timestamp_delta: - step.started -= timestamp_delta - step.stopped -= timestamp_delta - if elem.attr['result'] == 'failure': - log.warning('Build %s Step %s failed', build.id, elem.attr['id']) + self.log.debug('Slave %s completed step %d (%s) with status %s', + build.slave, index, stepname, elem.attr['status']) + + db = self.env.get_db_cnx() + + step = BuildStep(self.env, build=build.id, name=stepname) + try: + step.started = int(_parse_iso_datetime(elem.attr['time'])) + step.stopped = step.started + float(elem.attr['duration']) + except ValueError, e: + raise HTTPBadRequest(e.args[0]) + if elem.attr['status'] == 'failure': + self.log.warning('Build %s step %s failed', build.id, stepname) step.status = BuildStep.FAILURE else: step.status = BuildStep.SUCCESS step.errors += [error.gettext() for error in elem.children('error')] step.insert(db=db) + # Collect log messages from the request body for idx, log_elem in enumerate(elem.children('log')): - build_log = BuildLog(queue.env, build=build.id, step=step.name, + build_log = BuildLog(self.env, build=build.id, step=step.name, generator=log_elem.attr.get('generator'), orderno=idx) for message_elem in log_elem.children('message'): @@ -321,8 +199,9 @@ message_elem.gettext())) build_log.insert(db=db) + # Collect report data from the request body for report_elem in elem.children('report'): - report = Report(queue.env, build=build.id, step=step.name, + report = Report(self.env, build=build.id, step=step.name, category=report_elem.attr.get('category'), generator=report_elem.attr.get('generator')) for item_elem in report_elem.children(): @@ -333,42 +212,40 @@ report.items.append(item) report.insert(db=db) + # If this was the last step in the recipe we mark the build as + # completed + if last_step or step.status == BuildStep.FAILURE and \ + current_step.onerror == 'fail': + self.log.info('Slave %s completed build %d ("%s" as of [%s])', + build.slave, build.id, build.config, build.rev) + build.stopped = step.stopped + + # Determine overall outcome of the build by checking the outcome + # of the individual steps against the "onerror" specification of + # each step in the recipe + for num, recipe_step in enumerate(recipe): + step = BuildStep.fetch(self.env, build.id, recipe_step.id) + if step.status == BuildStep.FAILURE: + if recipe_step.onerror != 'ignore': + build.status = Build.FAILURE + break + else: + build.status = Build.SUCCESS + + build.update(db=db) + db.commit() - def _build_completed(self, queue, build, elem, timestamp_delta=None): - build.stopped = int(_parse_iso_datetime(elem.attr['time'])) - if timestamp_delta: - build.stopped -= timestamp_delta - if elem.attr['result'] == 'failure': - build.status = Build.FAILURE - else: - build.status = Build.SUCCESS - build.update() - - log.info('Slave %s completed build %d ("%s" as of [%s]) with status %s', - self.name, build.id, build.config, build.rev, - build.status == Build.FAILURE and 'FAILURE' or 'SUCCESS') - for listener in BuildSystem(queue.env).listeners: - listener.build_completed(build) + if last_step: + for listener in BuildSystem(self.env).listeners: + listener.build_completed(build) - def _build_aborted(self, queue, build): - log.info('Slave %s aborted build %d ("%s" as of [%s])', - self.name, build.id, build.config, build.rev) - for listener in BuildSystem(queue.env).listeners: - listener.build_aborted(build) - - db = queue.env.get_db_cnx() - - for step in list(BuildStep.select(queue.env, build=build.id, db=db)): - step.delete(db=db) - - build.slave = None - build.slave_info = {} - build.started = 0 - build.status = Build.PENDING - build.update(db=db) - - db.commit() + body = 'Build step processed' + req.send_response(200) + req.send_header('Content-Type', 'text/plain') + req.send_header('Content-Length', str(len(body))) + req.write(body) + raise RequestDone def _parse_iso_datetime(string): @@ -380,106 +257,4 @@ string = string.split('.', 1)[0] # strip out microseconds return calendar.timegm(time.strptime(string, '%Y-%m-%dT%H:%M:%S')) except ValueError, e: - raise ValueError('Invalid ISO date/time %s (%s)' % (string, e)) - -def main(): - """Main entry point for running the build master.""" - from bitten import __version__ as VERSION - from optparse import OptionParser - - # Parse command-line arguments - parser = OptionParser(usage='usage: %prog [options] ENV_PATHS', - version='%%prog %s' % VERSION) - parser.add_option('-p', '--port', action='store', type='int', dest='port', - help='port number to use') - parser.add_option('-H', '--host', action='store', dest='host', - metavar='HOSTNAME', - help='the host name or IP address to bind to') - parser.add_option('-l', '--log', dest='logfile', metavar='FILENAME', - help='write log messages to FILENAME') - parser.add_option('-i', '--interval', dest='interval', metavar='SECONDS', - default=DEFAULT_CHECK_INTERVAL, type='int', - help='poll interval for changeset detection') - parser.add_option('--build-all', action='store_true', dest='buildall', - help='build older revisions even when a build for a ' - 'newer revision has already been performed') - parser.add_option('--timewarp', action='store_true', dest='timewarp', - help='adjust timestamps of builds to be near the ' - 'timestamps of the corresponding changesets') - parser.add_option('--debug', action='store_const', dest='loglevel', - const=logging.DEBUG, help='enable debugging output') - parser.add_option('-v', '--verbose', action='store_const', dest='loglevel', - const=logging.INFO, help='print as much as possible') - parser.add_option('-q', '--quiet', action='store_const', dest='loglevel', - const=logging.ERROR, help='print as little as possible') - parser.set_defaults(port=7633, loglevel=logging.WARNING) - options, args = parser.parse_args() - - if len(args) < 1: - parser.error('incorrect number of arguments') - - # Configure logging - logger = logging.getLogger('bitten') - logger.setLevel(options.loglevel) - handler = logging.StreamHandler() - if options.logfile: - handler.setLevel(logging.WARNING) - else: - handler.setLevel(options.loglevel) - formatter = logging.Formatter('%(message)s') - handler.setFormatter(formatter) - logger.addHandler(handler) - if options.logfile: - handler = logging.FileHandler(options.logfile) - handler.setLevel(options.loglevel) - formatter = logging.Formatter('%(asctime)s [%(name)s] %(levelname)s: ' - '%(message)s') - handler.setFormatter(formatter) - logger.addHandler(handler) - - port = options.port - if not (1 <= port <= 65535): - parser.error('port must be an integer in the range 1-65535') - - host = options.host - if not host: - import socket - ip = socket.gethostbyname(socket.gethostname()) - try: - host = socket.gethostbyaddr(ip)[0] - except socket.error, e: - log.warning('Reverse host name lookup failed (%s)', e) - host = ip - - envs = [] - env_names = set() - for env_path in [os.path.normpath(arg) for arg in args]: - if not os.path.isdir(env_path): - log.warning('Ignoring %s: not a directory', env_path) - continue - env_name = os.path.basename(env_path) - if env_name in env_names: - log.warning('Ignoring %s: duplicate project name "%s"', env_path, - env_name) - continue - env_names.add(env_name) - env = Environment(env_path) - if BuildSystem(env): - if env.needs_upgrade(): - log.warning('Environment at %s needs to be upgraded', env.path) - continue - envs.append(env) - if not envs: - log.error('None of the specified environments has support for Bitten') - sys.exit(2) - - master = Master(envs, host, port, build_all=options.buildall, - adjust_timestamps=options.timewarp, - check_interval=options.interval) - try: - master.run(timeout=5.0) - except KeyboardInterrupt: - master.quit() - -if __name__ == '__main__': - main() + raise ValueError('Invalid ISO date/time %r' % string) diff --git a/bitten/queue.py b/bitten/queue.py --- a/bitten/queue.py +++ b/bitten/queue.py @@ -23,7 +23,6 @@ import re from bitten.model import BuildConfig, TargetPlatform, Build, BuildStep -from bitten.snapshot import SnapshotManager log = logging.getLogger('bitten.queue') @@ -84,9 +83,8 @@ class BuildQueue(object): """Enapsulates the build queue of an environment. - A build queue manages the the registration of build slaves, creation and - removal of snapshot archives, and detection of repository revisions that - need to be built. + A build queue manages the the registration of build slaves and detection of + repository revisions that need to be built. """ def __init__(self, env, build_all=False): @@ -96,21 +94,14 @@ @param build_all: whether older revisions should be built """ self.env = env + self.log = env.log self.build_all = build_all - self.slaves = {} # Sets of slave names keyed by target platform ID - - # Snapshot managers, keyed by build config name - self.snapshots = {} - for config in BuildConfig.select(self.env, include_inactive=True): - self.snapshots[config.name] = SnapshotManager(config) - - self.reset_orphaned_builds() # Build scheduling - def get_next_pending_build(self, available_slaves): - """Check whether one of the pending builds can be built by one of the - available build slaves. + def get_build_for_slave(self, name, properties): + """Check whether one of the pending builds can be built by the build + slave. If such a build is found, this method returns a C{(build, slave)} tuple, where C{build} is the L{bitten.model.Build} object and C{slave} @@ -119,88 +110,71 @@ Otherwise, this function will return C{(None, None)} """ log.debug('Checking for pending builds...') - if len(available_slaves) == 0: - log.debug('No available slaves.') - return None, None + db = self.env.get_db_cnx() repos = self.env.get_repository() - # delete any old builds. - builds_to_delete = [] - try: - for build in Build.select(self.env, status=Build.PENDING): - if self.should_delete_build(build, repos): - builds_to_delete.append(build) - finally: - db = self.env.get_db_cnx() - for build in builds_to_delete: - build.delete(db=db) - - # Rather than just take the first build available to - # this slave by version number, we'd like to ensure that - # all the most recent revisions of each config are built - # before we do any older ones. If all the most recent - # revisions are done/in progress for our set of available - # slaves, we'll just fall back to processing the remaining - # builds in descending revision order. First thing we'll do is - # figure out the newest revision that has a build for each config. - - # now make sure all the newest revisions of each config that can be - # built are in-progress or done. - for config in BuildConfig.select(self.env): - # need to loop to get all target platforms of the - # newest revision - newest_rev = -1 - for build in Build.select(self.env, config.name): - if build.rev < newest_rev: - break - if self.should_delete_build(build, repos): - continue - newest_rev = build.rev + self.reset_orphaned_builds() - if build.status == Build.PENDING: - slaves = self.slaves.get(build.platform, []) - for idx, slave in enumerate([name for name in slaves - if name in available_slaves]): - slaves.append(slaves.pop(idx)) # Round robin - return build, slave - - # now just assign anyone who's left - for build in Build.select(self.env, status=Build.PENDING): + # Iterate through pending builds by descending revision timestamp, to + # avoid the first configuration/platform getting all the builds + platforms = [p.id for p in self.match_slave(name, properties)] + build = None + builds_to_delete = [] + for build in Build.select(self.env, status=Build.PENDING, db=db): if self.should_delete_build(build, repos): - continue - # Find a slave for the build platform that is not already building - # something else - slaves = self.slaves.get(build.platform, []) - for idx, slave in enumerate([name for name in slaves if name - in available_slaves]): - slaves.append(slaves.pop(idx)) # Round robin - return build, slave + builds_to_delete.append(build) + elif build.platform in platforms: + break + else: + self.log.debug('No pending builds.') + return None - log.debug('No pending builds.') - return None, None + # delete any obsolete builds + for build in builds_to_delete: + build.delete(db=db) - def should_delete_build(self, build, repos): - # Ignore pending builds for deactived build configs - config = BuildConfig.fetch(self.env, build.config) - if not config.active: - log.info('Dropping build of configuration "%s" at ' - 'revision [%s] on "%s" because the configuration is ' - 'deactivated', config.name, build.rev, TargetPlatform.fetch(self.env, build.platform).name) - return True - # Stay within the revision limits of the build config - if (config.min_rev and repos.rev_older_than(build.rev, - config.min_rev)) \ - or (config.max_rev and repos.rev_older_than(config.max_rev, - build.rev)): - # This minimum and/or maximum revision has changed since - # this build was enqueued, so drop it - log.info('Dropping build of configuration "%s" at ' - 'revision [%s] on "%s" because it is outside of the ' - 'revision range of the configuration', config.name, - build.rev, TargetPlatform.fetch(self.env, build.platform).name) - return True - return False + if build: + build.slave = name + build.slave_info.update(properties) + build.status = Build.IN_PROGRESS + build.update(db=db) + + if build or builds_to_delete: + db.commit() + + return build + + def match_slave(self, name, properties): + """Match a build slave against available target platforms. + + @param name: The name of the slave + @param properties: A dict containing the properties of the slave + @return: the list of platforms the slave matched + """ + platforms = [] + + for config in BuildConfig.select(self.env): + for platform in TargetPlatform.select(self.env, config=config.name): + match = True + for propname, pattern in ifilter(None, platform.rules): + try: + propvalue = properties.get(propname) + if not propvalue or not re.match(pattern, propvalue): + match = False + break + except re.error: + self.log.error('Invalid platform matching pattern "%s"', + pattern, exc_info=True) + match = False + break + if match: + self.log.debug('Slave %s matched target platform %r of ' + 'build configuration %r', name, + platform.name, config.name) + platforms.append(platform) + + return platforms def populate(self): """Add a build for the next change on each build configuration to the @@ -217,21 +191,26 @@ db = self.env.get_db_cnx() builds = [] + for config in BuildConfig.select(self.env, db=db): for platform, rev, build in collect_changes(repos, config, db): if build is None: - log.info('Enqueuing build of configuration "%s" at ' - 'revision [%s] on %s', config.name, rev, - platform.name) + self.log.info('Enqueuing build of configuration "%s" at ' + 'revision [%s] on %s', config.name, rev, + platform.name) build = Build(self.env, config=config.name, platform=platform.id, rev=str(rev), - rev_time = repos.get_changeset(rev).date) + rev_time=repos.get_changeset(rev).date) builds.append(build) break - elif not self.build_all: + if not self.build_all: + self.log.debug('Ignoring older revisions for configuration ' + '%r', config.name) break + for build in builds: build.insert(db=db) + db.commit() def reset_orphaned_builds(self): @@ -252,53 +231,27 @@ build.update(db=db) db.commit() - # Slave registry + def should_delete_build(self, build, repos): + # Ignore pending builds for deactived build configs + config = BuildConfig.fetch(self.env, build.config) + if not config.active: + log.info('Dropping build of configuration "%s" at ' + 'revision [%s] on "%s" because the configuration is ' + 'deactivated', config.name, build.rev, + TargetPlatform.fetch(self.env, build.platform).name) + return True - def register_slave(self, name, properties): - """Register a build slave with the queue. - - This method tries to match the slave against the configured target - platforms. Only if it matches at least one platform will the - registration be successful. - - @param name: The name of the slave - @param properties: A dict containing the properties of the slave - @return: Whether the registration was successful - """ - any_match = False - for config in BuildConfig.select(self.env): - for platform in TargetPlatform.select(self.env, config=config.name): - match = True - for propname, pattern in ifilter(None, platform.rules): - try: - propvalue = properties.get(propname) - if not propvalue or not re.match(pattern, propvalue): - match = False - break - except re.error: - log.error('Invalid platform matching pattern "%s"', - pattern, exc_info=True) - match = False - break - if match: - log.debug('Slave %s matched target platform "%s"', name, - platform.name) - self.slaves.setdefault(platform.id, []).append(name) - any_match = True - return any_match + # Stay within the revision limits of the build config + if (config.min_rev and repos.rev_older_than(build.rev, + config.min_rev)) \ + or (config.max_rev and repos.rev_older_than(config.max_rev, + build.rev)): + # This minimum and/or maximum revision has changed since + # this build was enqueued, so drop it + log.info('Dropping build of configuration "%s" at revision [%s] on ' + '"%s" because it is outside of the revision range of the ' + 'configuration', config.name, build.rev, + TargetPlatform.fetch(self.env, build.platform).name) + return True - def unregister_slave(self, name): - """Unregister a build slave. - - This method removes the slave from the registry, and also resets any - in-progress builds by this slave to PENDING state. - - @param name: The name of the slave - @return: C{True} if the slave was registered for this build queue, - C{False} otherwise - """ - for slaves in self.slaves.values(): - if name in slaves: - slaves.remove(name) - return True return False diff --git a/bitten/recipe.py b/bitten/recipe.py --- a/bitten/recipe.py +++ b/bitten/recipe.py @@ -41,7 +41,7 @@ step = None # The current step generator = None # The current generator (namespace#name) - def __init__(self, basedir, config=None): + def __init__(self, basedir, config=None, vars=None): """Initialize the context. @param basedir: a string containing the working directory for the build @@ -50,6 +50,7 @@ """ self.basedir = os.path.realpath(basedir) self.config = config or Configuration() + self.vars = vars or {} self.output = [] def run(self, step, namespace, name, attr): @@ -81,7 +82,8 @@ if keyword.iskeyword(name) or name in __builtins__: name = name + '_' return name - args = dict([(escape(name), self.config.interpolate(attr[name])) + args = dict([(escape(name), + self.config.interpolate(attr[name], **self.vars)) for name in attr]) self.generator = qname @@ -172,6 +174,9 @@ self.description = elem.attr.get('description') self.onerror = elem.attr.get('onerror', 'fail') + def __repr__(self): + return '<%s %r>' % (type(self).__name__, self.id) + def execute(self, ctxt): """Execute this step in the given context. @@ -215,7 +220,9 @@ @type config: an instance of L{bitten.build.config.Configuration} """ assert isinstance(xml, xmlio.ParsedElement) - self.ctxt = Context(basedir, config) + vars = dict([(name, value) for name, value in xml.attr.items() + if not name.startswith('xmlns')]) + self.ctxt = Context(basedir, config, vars) self._root = xml def __iter__(self): diff --git a/bitten/slave.py b/bitten/slave.py --- a/bitten/slave.py +++ b/bitten/slave.py @@ -10,6 +10,7 @@ """Implementation of the build slave.""" from datetime import datetime +import httplib import logging import os import platform @@ -18,25 +19,27 @@ except NameError: from sets import Set as set import shutil +import socket import tempfile -import tarfile +import time +import urlparse from bitten.build import BuildError from bitten.build.config import Configuration from bitten.recipe import Recipe, InvalidRecipeError -from bitten.util import beep, xmlio +from bitten.util import xmlio log = logging.getLogger('bitten.slave') -class Slave(beep.Initiator): +class BuildSlave(object): """BEEP initiator implementation for the build slave.""" - def __init__(self, ip, port, name=None, config=None, dry_run=False, + def __init__(self, url, name=None, config=None, dry_run=False, work_dir=None, keep_files=False, single_build=False): """Create the build slave instance. - @param ip: Host name or IP address of the build master to connect to + @param url: The URL of the build master @param port: TCP port number of the build master to connect to @param name: The name with which this slave should identify itself @param config: The slave configuration @@ -48,9 +51,11 @@ @param single_build: Whether this slave should exit after completing a single build, or continue processing builds forever """ - beep.Initiator.__init__(self, ip, port) + self.url = url + if name is None: + name = platform.node().split('.', 1)[0].lower() self.name = name - self.config = config + self.config = Configuration(config) self.dry_run = dry_run if not work_dir: work_dir = tempfile.mkdtemp(prefix='bitten') @@ -60,48 +65,41 @@ self.keep_files = keep_files self.single_build = single_build - def greeting_received(self, profiles): - """Start a channel for the build orchestration profile, if advertised - by the peer. - - Otherwise, terminate the session. - """ - if OrchestrationProfileHandler.URI not in profiles: - err = 'Peer does not support the Bitten orchestration profile' - log.error(err) - raise beep.TerminateSession(err) - self.channels[0].profile.send_start([OrchestrationProfileHandler]) - - -class OrchestrationProfileHandler(beep.ProfileHandler): - """Handler for communication on the Bitten build orchestration profile from - the perspective of the build slave. - """ - URI = 'http://bitten.cmlenz.net/beep/orchestration' + def request(self, method, url, body=None, headers=None): + scheme, host, path, query, fragment = urlparse.urlsplit(url) + scheme = (scheme or 'http').lower() - def handle_connect(self): - """Register with the build master.""" - self.build_xml = None + if scheme == 'https': + conn = httplib.HTTPSConnection(host) + else: + conn = httplib.HTTPConnection(host) - def handle_reply(cmd, msgno, ansno, payload): - if cmd == 'ERR': - if payload.content_type == beep.BEEP_XML: - elem = xmlio.parse(payload.body) - if elem.name == 'error': - log.error('Slave registration failed: %s (%d)', - elem.gettext(), int(elem.attr['code'])) - raise beep.TerminateSession('Registration failed!') - log.info('Registration successful') + if headers is None: + headers = {} + if body is None: + body = '' + headers['Content-Length'] = len(body) + conn.request(method.upper(), path, body, headers) + return conn.getresponse() - self.config = Configuration(self.session.config) - if self.session.name is not None: - node = self.session.name - else: - node = platform.node().split('.', 1)[0].lower() + def run(self): + while True: + try: + try: + self._create_build() + except socket.error, e: + log.error(e) + raise ExitSlave() + except ExitSlave: + break + time.sleep(30) - log.info('Registering with build master as %s', node) - log.debug('Properties: %s', self.config.properties) - xml = xmlio.Element('register', name=node)[ + def quit(self): + log.info('Shutting down') + raise ExitSlave() + + def _create_build(self): + xml = xmlio.Element('slave', name=self.name)[ xmlio.Element('platform', processor=self.config['processor'])[ self.config['machine'] ], @@ -110,182 +108,97 @@ self.config['os'] ], ] - log.debug('Packages: %s', self.config.packages) + + log.debug('Configured packages: %s', self.config.packages) for package, properties in self.config.packages.items(): xml.append(xmlio.Element('package', name=package, **properties)) - self.channel.send_msg(beep.Payload(xml), handle_reply, True) - - def handle_msg(self, msgno, payload): - """Handle either a build initiation or the transmission of a snapshot - archive. - - @param msgno: The identifier of the BEEP message - @param payload: The payload of the message - """ - if payload.content_type == beep.BEEP_XML: - elem = xmlio.parse(payload.body) - if elem.name == 'build': - # Received a build request - self.build_xml = elem - xml = xmlio.Element('proceed') - self.channel.send_rpy(msgno, beep.Payload(xml)) - - elif payload.content_type == 'application/tar' and \ - payload.content_encoding == 'bzip2': - # Received snapshot archive for build - project_name = self.build_xml.attr.get('project', 'default') - project_dir = os.path.join(self.session.work_dir, project_name) - if not os.path.exists(project_dir): - os.mkdir(project_dir) - - archive_name = payload.content_disposition - if not archive_name: - archive_name = 'snapshot.tar.bz2' - archive_path = os.path.join(project_dir, archive_name) - - archive_file = file(archive_path, 'wb') - try: - shutil.copyfileobj(payload.body, archive_file) - finally: - archive_file.close() - basedir = self.unpack_snapshot(project_dir, archive_name) - - try: - recipe = Recipe(self.build_xml, basedir, self.config) - self.execute_build(msgno, recipe) - finally: - if not self.session.keep_files: - shutil.rmtree(basedir) - os.remove(archive_path) - if self.session.single_build: - log.info('Exiting after single build completion.') - self.session.quit() - - def unpack_snapshot(self, project_dir, archive_name): - """Unpack a snapshot archive. - - @param project_dir: Base directory for builds for the project - @param archive_name: Name of the archive file - """ - path = os.path.join(project_dir, archive_name) - log.debug('Received snapshot archive: %s', path) - try: - tar_file = tarfile.open(path, 'r:bz2') - tar_file.chown = lambda *args: None # Don't chown extracted members - basedir = None - try: - for tarinfo in tar_file: - if tarinfo.isfile() or tarinfo.isdir(): - if tarinfo.name.startswith('/') or '..' in tarinfo.name: - continue - tar_file.extract(tarinfo, project_dir) - if basedir is None: - basedir = tarinfo.name.split('/', 1)[0] - finally: - tar_file.close() - - basedir = os.path.join(project_dir, basedir) - log.debug('Unpacked snapshot to %s' % basedir) - return basedir - - except tarfile.TarError, e: - log.error('Could not unpack archive %s: %s', path, e, exc_info=True) - raise beep.ProtocolError(550, 'Could not unpack archive (%s)' % e) + body = str(xml) + log.debug('Sending slave configuration: %s', body) + resp = self.request('POST', self.url, body, { + 'Content-Length': len(body), + 'Content-Type': 'application/x-bitten+xml' + }) - def execute_build(self, msgno, recipe): - """Execute a build. - - Execute every step in the recipe, and report the outcome of each - step back to the server using an ANS message. - - @param msgno: The identifier of the snapshot transmission message - @param recipe: The recipe object - @type recipe: an instance of L{bitten.recipe.Recipe} - """ - log.info('Building in directory %s', recipe.ctxt.basedir) - try: - if not self.session.dry_run: - xml = xmlio.Element('started', - time=datetime.utcnow().isoformat()) - self.channel.send_ans(msgno, beep.Payload(xml)) + if resp.status == 201: + self._initiate_build(resp.getheader('location')) + elif resp.status == 204: + log.info(resp.read()) + else: + log.error('Unexpected response (%d): %s', resp.status, resp.reason) + raise ExitSlave() - failed = False - for step in recipe: - log.info('Executing build step "%s"', step.id) - started = datetime.utcnow() - try: - xml = xmlio.Element('step', id=step.id, - description=step.description, - time=started.isoformat()) - step_failed = False - try: - for type, category, generator, output in \ - step.execute(recipe.ctxt): - if type == Recipe.ERROR: - step_failed = True - xml.append(xmlio.Element(type, category=category, - generator=generator)[ - output - ]) - except BuildError, e: - log.error('Build step %s failed (%s)', step.id, e) - failed = step_failed = True - except Exception, e: - log.error('Internal error in build step %s', - step.id, exc_info=True) - failed = step_failed = True - xml.attr['duration'] = (datetime.utcnow() - started).seconds - if step_failed: - xml.attr['result'] = 'failure' - log.warning('Build step %s failed', step.id) - else: - xml.attr['result'] = 'success' - log.info('Build step %s completed successfully', - step.id) - if not self.session.dry_run: - self.channel.send_ans(msgno, beep.Payload(xml)) - if step_failed and step.onerror == 'fail': - log.warning('Stopping build due to failure') - break - except InvalidRecipeError, e: - log.warning('Build step %s failed: %s', step.id, e) - duration = datetime.utcnow() - started + def _initiate_build(self, build_url): + build_id = int(build_url.split('/')[-1]) + log.info('Build %s pending at %s', build_id, build_url) + resp = self.request('GET', build_url) + if resp.status == 200: + xml = xmlio.parse(resp) + basedir = os.path.join(self.work_dir, 'build_%d' % build_id) + if not os.path.exists(basedir): + os.mkdir(basedir) + try: + recipe = Recipe(xml, basedir, self.config) + self._execute_build(build_url, recipe) + finally: + if not self.keep_files: + log.debug('Removing build directory %s' % basedir) + shutil.rmtree(basedir) + if self.single_build: + log.info('Exiting after single build completed.') + raise ExitSlave() + else: + log.error('Unexpected response (%d): %s', resp.status, resp.reason) + raise ExitSlave() + + def _execute_build(self, build_url, recipe): + for step in recipe: + log.info('Executing build step %r', step.id) + if not self._execute_step(build_url, recipe, step): + log.warning('Stopping build due to failure') + break + else: + log.warning('Build completed') + + def _execute_step(self, build_url, recipe, step): + failed = False + started = datetime.utcnow() + xml = xmlio.Element('result', time=started.isoformat()) + try: + for type, category, generator, output in \ + step.execute(recipe.ctxt): + if type == Recipe.ERROR: failed = True - xml = xmlio.Element('step', id=step.id, result='failure', - description=step.description, - time=started.isoformat(), - duration=duration.seconds)[ - xmlio.Element('error')[e] - ] - if not self.session.dry_run: - self.channel.send_ans(msgno, beep.Payload(xml)) + xml.append(xmlio.Element(type, category=category, + generator=generator)[ + output + ]) + except BuildError, e: + log.error('Build step %r failed (%s)', step.id, e) + failed = True + except Exception, e: + log.error('Internal error in build step %r', step.id, exc_info=True) + failed = True + xml.attr['duration'] = (datetime.utcnow() - started).seconds + if failed: + xml.attr['status'] = 'failure' + log.warning('Build step %r failed', step.id) + else: + xml.attr['status'] = 'success' + log.info('Build step %s completed successfully', step.id) - if failed: - log.warning('Build failed') - else: - log.info('Build completed successfully') - if not self.session.dry_run: - xml = xmlio.Element('completed', time=datetime.utcnow().isoformat(), - result=['success', 'failure'][failed]) - self.channel.send_ans(msgno, beep.Payload(xml)) + resp = self.request('PUT', build_url + '/steps/' + step.id, str(xml), { + 'Content-Type': 'application/x-bitten+xml' + }) + if resp.status != 200: + log.error('Unexpected response (%d): %s', resp.status, resp.reason) - self.channel.send_nul(msgno) - else: - xml = xmlio.Element('error', code=550)['Dry run'] - self.channel.send_err(msgno, beep.Payload(xml)) + return not failed or step.onerror != 'fail' - except InvalidRecipeError, e: - xml = xmlio.Element('error')[e] - self.channel.send_ans(msgno, beep.Payload(xml)) - self.channel.send_nul(msgno) - except (KeyboardInterrupt, SystemExit), e: - xml = xmlio.Element('aborted')['Build cancelled'] - self.channel.send_ans(msgno, beep.Payload(xml)) - self.channel.send_nul(msgno) - - raise beep.TerminateSession('Cancelled') +class ExitSlave(Exception): + """Exception used internally by the slave to signal that the slave process + should be stopped.""" def main(): @@ -293,7 +206,7 @@ from bitten import __version__ as VERSION from optparse import OptionParser - parser = OptionParser(usage='usage: %prog [options] host [port]', + parser = OptionParser(usage='usage: %prog [options] url', version='%%prog %s' % VERSION) parser.add_option('--name', action='store', dest='name', help='name of this slave (defaults to host name)') @@ -314,23 +227,16 @@ const=logging.INFO, help='print as much as possible') parser.add_option('-q', '--quiet', action='store_const', dest='loglevel', const=logging.ERROR, help='print as little as possible') - parser.add_option('-s', '--single', action='store_const', dest='single_build', - const=logging.ERROR, help='exit after completing a single build') + parser.add_option('-s', '--single', action='store_true', + dest='single_build', + help='exit after completing a single build') parser.set_defaults(dry_run=False, keep_files=False, loglevel=logging.WARNING, single_build=False) options, args = parser.parse_args() if len(args) < 1: parser.error('incorrect number of arguments') - host = args[0] - if len(args) > 1: - try: - port = int(args[1]) - assert (1 <= port <= 65535), 'port number out of range' - except (AssertionError, ValueError): - parser.error('port must be an integer in the range 1-65535') - else: - port = 7633 + url = args[0] logger = logging.getLogger('bitten') logger.setLevel(options.loglevel) @@ -347,16 +253,20 @@ handler.setFormatter(formatter) logger.addHandler(handler) - slave = Slave(host, port, name=options.name, config=options.config, - dry_run=options.dry_run, work_dir=options.work_dir, - keep_files=options.keep_files, - single_build=options.single_build) + slave = BuildSlave(url, name=options.name, config=options.config, + dry_run=options.dry_run, work_dir=options.work_dir, + keep_files=options.keep_files, + single_build=options.single_build) try: - slave.run() - except KeyboardInterrupt: - slave.quit() + try: + slave.run() + except KeyboardInterrupt: + slave.quit() + except ExitSlave: + pass - if not options.keep_files and os.path.isdir(slave.work_dir): + if not options.work_dir: + log.debug('Removing temporary directory %s' % slave.work_dir) shutil.rmtree(slave.work_dir) if __name__ == '__main__': diff --git a/bitten/snapshot.py b/bitten/snapshot.py deleted file mode 100644 --- a/bitten/snapshot.py +++ /dev/null @@ -1,325 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2005 Christopher Lenz -# All rights reserved. -# -# This software is licensed as described in the file COPYING, which -# you should have received as part of this distribution. The terms -# are also available at http://bitten.cmlenz.net/wiki/License. - -"""Snapshot archive management. - -Snapshots of the code base are stored in the Trac environment as tar archives, -compressed using bzip2. - -These files use the naming convention C{[config_name]_r[revision].tar.bz2} so -they can be located programmatically after creation, and associated with the -build config and revision they apply to. - -These snapshot files are accompanied by a checksum file (using MD5). Any archive -file with no accompanying checksum is considered incomplete or invalid. - -For larger code bases, these snapshots may be relatively expensive to create. -Most of the time is spent in walking the repository directory and reading the -files it contains. To avoid blocking the build master while snapshots are -created, this is done in a worker thread. - -As snapshot archives are often very similar to each other for subsequent -revisions, an attempt is made to avoid the creation of new archives from -scratch. Instead, the build master keeps the most recently used archives (MRU -cache) and will build new archives based on the deltas provided by the version -control system. Using the nearest existing snapshot as the base, deleted files -and directories are removed from the snapshot, added files/directories are -added, and modified files are updated. -""" - -from datetime import datetime -import logging -import os -import posixpath -try: - import threading -except ImportError: - import dummy_threading as threading -import time -import tarfile - -from bitten.util import md5sum - -__all__ = ['SnapshotManager'] - -log = logging.getLogger('bitten.snapshot') - -MAX_SNAPSHOTS = 10 -SNAPSHOTS_DIR = 'snapshots' - - -class SnapshotManager(object): - """Manages snapshot archives for a specific build configuration.""" - - def __init__(self, config): - """Create the snapshot manager. - - @param config: the build configuration - @type config: an instance of L{bitten.model.BuildConfig} - """ - assert config and config.exists, 'Build configuration does not exist' - self.env = config.env - self.config = config - - self.prefix = config.name - self.directory = self.env.config.get('bitten', 'snapshots_dir', - os.path.join(self.env.path, - SNAPSHOTS_DIR)) - self.limit = int(self.env.config.get('bitten', 'max_snapshots', - MAX_SNAPSHOTS)) - - # Create the snapshots directory if it doesn't already exist - if not os.path.exists(self.directory): - os.mkdir(self.directory) - - # Make sure we have permissions to write to the directory - if not os.access(self.directory, os.R_OK + os.W_OK): - raise IOError('Insufficient permissions to create snapshots in %s' % - self.directory) - - # Collect a list of all existing snapshot archives - self._index = [] - for snapshot in self._scan(): - self._index.append(snapshot) - self._lock = threading.RLock() - self._cleanup() - - self._workers = {} - - def _scan(self): - """Find all existing snapshots in the directory.""" - for filename in [f for f in os.listdir(self.directory) - if f.startswith(self.prefix)]: - if not filename.endswith('.tar.bz2'): - continue - rest = filename[len(self.prefix):-8] - if not rest.startswith('_r'): - continue - rev = rest[2:] - - filepath = os.path.join(self.directory, filename) - try: - md5sum.validate(filepath) - except md5sum.IntegrityError, e: - log.warning('Integrity error checking %s (%s)', filepath, e) - os.remove(filepath) - continue - mtime = os.path.getmtime(filepath) - - yield mtime, rev, filepath - - def _cleanup(self, limit=None): - """Remove obsolete snapshots to preserve disk space. - - @param limit: the maximum number of snapshot archives to keep; defaults - to the value of the C{max_snapshots} value in C{trac.ini}. - """ - self._lock.acquire() - try: - self._index.sort(lambda a, b: -cmp(a[0], b[0])) - limit = limit or self.limit - if len(self._index) > limit: - for mtime, rev, path in self._index[limit:]: - log.debug('Removing snapshot %s', path) - os.remove(path) - md5file = path + '.md5' - if os.path.isfile(md5file): - os.remove(md5file) - else: - md5file = os.path.splitext(path)[0] + '.md5' - if os.path.isfile(md5file): - os.remove(md5file) - self._index = self._index[:limit] - finally: - self._lock.release() - - def create(self, rev): - """Create a new snapshot archive for the specified revision. - - The archive is created in a worker thread. The return value of this - function is the thread object. The caller may use this object to check - for completion of the operation. - - @param rev: the repository revision for which a snapshot archive should - be created - """ - self._lock.acquire() - try: - repos = self.env.get_repository() - new_root = repos.get_node(self.config.path or '/', rev) - assert new_root.isdir, '"%s" is not a directory' % self.config.path - - if new_root.rev in self._workers: - return self._workers[new_root.rev] - - new_prefix = self.prefix + '_r' + str(rev) - filename = new_prefix + '.tar.bz2' - new_filepath = os.path.join(self.directory, filename) - if os.path.exists(new_filepath): - raise IOError('Snapshot file already exists at %s' % - new_filepath) - - self._cleanup(self.limit - 1) - - existing = self._get_closest_match(repos, new_root) - if existing: - base_rev, base_filepath = existing - base_root = repos.get_node(self.config.path or '/', base_rev) - base_prefix = self.prefix + '_r' + str(base_rev) - else: - base_root = base_filepath = base_prefix = None - - worker = threading.Thread(target=self._create, - args=(repos, new_root, new_filepath, - new_prefix, base_root, - base_filepath, base_prefix), - name='Create snapshot %s' % filename) - worker.start() - self._workers[new_root.rev] = worker - return worker - finally: - self._lock.release() - - def _create(self, repos, new_root, new_filepath, new_prefix, base_root=None, - base_filepath=None, base_prefix=None): - """Actually create a snapshot archive. - - This is used internally from the C{create()} function and executed in a - worker thread. - """ - log.debug('Preparing snapshot archive for %s@%s', new_root.path, - new_root.rev) - if base_root: - base_rev = repos.next_rev(base_root.rev) - base_tar = tarfile.open(base_filepath, 'r:bz2') - base_tar.posix = False - new_tar = tarfile.open(new_filepath, 'w:bz2') - new_tar.posix = False - - def _add_entry(node): - name = node.path[len(self.config.path):] - if name.startswith('/'): - name = name[1:] - new_path = posixpath.join(new_prefix, name) - - # tarfile can't handle unicode - if isinstance(new_path, unicode): - new_path = new_path.encode('utf-8') - - if node.isdir: - log.debug('Adding directory %s/ to archive', name) - new_info = tarfile.TarInfo(new_path) - new_info.type = tarfile.DIRTYPE - new_info.mode = 0755 - new_tar.addfile(new_info) - - for entry in node.get_entries(): - _add_entry(entry) - time.sleep(.1) # be nice - else: - copy_base = False - if base_root and repos.has_node(node.path, base_root.rev): - base_node = repos.get_node(node.path, base_root.rev) - copy_base = base_node.rev == node.rev - - if copy_base: - # Copy entry from base archive - base_path = posixpath.join(base_prefix, name) - base_info = base_tar.getmember(base_path) - base_info.name = new_path - fileobj = base_tar.extractfile(base_info) - new_tar.addfile(base_info, fileobj) - fileobj.close() - del base_info, fileobj - - else: - # Create entry from repository - new_info = tarfile.TarInfo(new_path) - new_info.type = tarfile.REGTYPE - new_info.mtime = node.last_modified - if isinstance(new_info.mtime, datetime): - new_info.mtime = time.mktime(new_info.mtime.timetuple()) - new_info.size = node.content_length - - # FIXME: Subversion specific! This should really be an - # executable flag provided by Trac's versioncontrol - # API - if 'svn:executable' in node.get_properties(): - new_info.mode = 0755 - else: - new_info.mode = 0644 - - new_tar.addfile(new_info, node.get_content()) - - try: - _add_entry(new_root) - finally: - new_tar.close() - if base_root: - base_tar.close() - - # Create MD5 checksum file - md5sum.write(new_filepath) - - self._lock.acquire() - try: - self._index.append((os.path.getmtime(new_filepath), new_root.rev, - new_filepath)) - del self._workers[new_root.rev] - finally: - self._lock.release() - log.info('Prepared snapshot archive at %s', new_filepath) - - def get(self, rev): - """Returns the path to an already existing snapshot archive for the - specified revision. - - If no snapshot exists for the revision, this function returns C{None}. - - @param rev: the repository revision for which a snapshot archive should - be retrieved - """ - self._lock.acquire() - try: - for mtime, srev, path in self._index: - if str(rev) == str(srev): - return path - return None - finally: - self._lock.release() - - def _get_closest_match(self, repos, root): - """Determine which existing snapshot archive is closest to the - requested repository revision.""" - self._lock.acquire() - try: - distances = [] # (distance, rev) tuples - - for mtime, srev, path in self._index: - distance = 0 - srev = repos.normalize_rev(srev) - get_next = repos.next_rev - if repos.rev_older_than(root.rev, srev): - get_next = repos.previous_rev - nrev = srev - while nrev != root.rev: - distance += 1 - nrev = get_next(nrev) - if nrev is None: - distance = 0 - break - if distance: - distances.append((distance, srev, path)) - - if not distances: - return None - distances.sort() - return distances[0][1:] - finally: - self._lock.release() diff --git a/bitten/tests/__init__.py b/bitten/tests/__init__.py --- a/bitten/tests/__init__.py +++ b/bitten/tests/__init__.py @@ -9,18 +9,18 @@ import unittest -from bitten.tests import model, recipe, queue, slave, snapshot +from bitten.tests import master, model, recipe, queue, slave from bitten.build import tests as build from bitten.util import tests as util from bitten.trac_ext import tests as trac_ext def suite(): suite = unittest.TestSuite() + suite.addTest(master.suite()) suite.addTest(model.suite()) suite.addTest(recipe.suite()) suite.addTest(queue.suite()) suite.addTest(slave.suite()) - suite.addTest(snapshot.suite()) suite.addTest(build.suite()) suite.addTest(trac_ext.suite()) suite.addTest(util.suite()) diff --git a/bitten/tests/master.py b/bitten/tests/master.py new file mode 100644 --- /dev/null +++ b/bitten/tests/master.py @@ -0,0 +1,573 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2005 Christopher Lenz +# All rights reserved. +# +# This software is licensed as described in the file COPYING, which +# you should have received as part of this distribution. The terms +# are also available at http://bitten.cmlenz.net/wiki/License. + +from datetime import datetime +import re +import shutil +from StringIO import StringIO +import tempfile +import unittest + +from trac.perm import PermissionCache, PermissionSystem +from trac.test import EnvironmentStub, Mock +from trac.web.api import HTTPBadRequest, HTTPMethodNotAllowed, HTTPNotFound, \ + RequestDone +from trac.web.href import Href + +from bitten.master import BuildMaster +from bitten.model import BuildConfig, TargetPlatform, Build, BuildStep, \ + BuildLog, Report, schema +from bitten.trac_ext.compat import schema_to_sql +from bitten.trac_ext.main import BuildSystem + + +class BuildMasterTestCase(unittest.TestCase): + + def setUp(self): + self.env = EnvironmentStub() + self.env.path = tempfile.mkdtemp() + + PermissionSystem(self.env).grant_permission('hal', 'BUILD_EXEC') + + # Create tables + db = self.env.get_db_cnx() + cursor = db.cursor() + for table in schema: + for stmt in schema_to_sql(self.env, db, table): + cursor.execute(stmt) + + self.repos = Mock() + self.env.get_repository = lambda authname=None: self.repos + + def tearDown(self): + shutil.rmtree(self.env.path) + + def test_create_build(self): + BuildConfig(self.env, 'test', path='somepath', active=True).insert() + platform = TargetPlatform(self.env, config='test', name="Unix") + platform.rules.append(('family', 'posix')) + platform.insert() + + self.repos = Mock( + get_node=lambda path, rev=None: Mock( + get_entries=lambda: [Mock(), Mock()], + get_history=lambda: [('somepath', 123, 'edit'), + ('somepath', 121, 'edit'), + ('somepath', 120, 'edit')] + ), + get_changeset=lambda rev: Mock(date=42), + normalize_path=lambda path: path, + rev_older_than=lambda rev1, rev2: rev1 < rev2 + ) + + inheaders = {'Content-Type': 'application/x-bitten+xml'} + inbody = StringIO(""" + Power Macintosh + Darwin +""") + outheaders = {} + outbody = StringIO() + req = Mock(method='POST', base_path='', path_info='/builds', + href=Href('/trac'), abs_href=Href('http://example.org/trac'), + remote_addr='127.0.0.1', args={}, + perm=PermissionCache(self.env, 'hal'), + get_header=lambda x: inheaders.get(x), read=inbody.read, + send_response=lambda x: outheaders.setdefault('Status', x), + send_header=lambda x, y: outheaders.setdefault(x, y), + write=outbody.write) + + module = BuildMaster(self.env) + assert module.match_request(req) + try: + module.process_request(req) + self.fail('Expected RequestDone') + except RequestDone: + self.assertEqual(201, outheaders['Status']) + self.assertEqual('text/plain', outheaders['Content-Type']) + location = outheaders['Location'] + mo = re.match('http://example.org/trac/builds/(\d+)', location) + assert mo, 'Location was %r' % location + self.assertEqual('Build pending', outbody.getvalue()) + build = Build.fetch(self.env, int(mo.group(1))) + self.assertEqual(Build.IN_PROGRESS, build.status) + self.assertEqual('hal', build.slave) + + def test_create_build_invalid_xml(self): + inheaders = {'Content-Type': 'application/x-bitten+xml'} + inbody = StringIO('') + req = Mock(method='POST', base_path='', path_info='/builds', + href=Href('/trac'), remote_addr='127.0.0.1', args={}, + perm=PermissionCache(self.env, 'hal'), + get_header=lambda x: inheaders.get(x), read=inbody.read) + + module = BuildMaster(self.env) + assert module.match_request(req) + try: + module.process_request(req) + self.fail('Expected HTTPBadRequest') + except HTTPBadRequest, e: + self.assertEqual('XML parser error', e.detail) + + def test_create_build_no_post(self): + req = Mock(method='GET', base_path='', path_info='/builds', + href=Href('/trac'), remote_addr='127.0.0.1', args={}, + perm=PermissionCache(self.env, 'hal')) + module = BuildMaster(self.env) + assert module.match_request(req) + try: + module.process_request(req) + self.fail('Expected HTTPMethodNotAllowed') + except HTTPMethodNotAllowed, e: + self.assertEqual('Method not allowed', e.detail) + + def test_create_build_no_match(self): + inheaders = {'Content-Type': 'application/x-bitten+xml'} + inbody = StringIO(""" + Power Macintosh + Darwin +""") + outheaders = {} + outbody = StringIO() + req = Mock(method='POST', base_path='', path_info='/builds', + href=Href('/trac'), remote_addr='127.0.0.1', args={}, + perm=PermissionCache(self.env, 'hal'), + get_header=lambda x: inheaders.get(x), read=inbody.read, + send_response=lambda x: outheaders.setdefault('Status', x), + send_header=lambda x, y: outheaders.setdefault(x, y), + write=outbody.write) + + module = BuildMaster(self.env) + assert module.match_request(req) + try: + module.process_request(req) + self.fail('Expected RequestDone') + except RequestDone: + self.assertEqual(204, outheaders['Status']) + self.assertEqual('text/plain', outheaders['Content-Type']) + self.assertEqual('No pending builds', outbody.getvalue()) + + def test_initiate_build(self): + config = BuildConfig(self.env, 'test', path='somepath', active=True, + recipe='') + config.insert() + platform = TargetPlatform(self.env, config='test', name="Unix") + platform.rules.append(('family', 'posix')) + platform.insert() + build = Build(self.env, 'test', '123', platform.id, slave='hal', + rev_time=42) + build.insert() + + outheaders = {} + outbody = StringIO() + + req = Mock(method='GET', base_path='', + path_info='/builds/%d' % build.id, + href=Href('/trac'), remote_addr='127.0.0.1', args={}, + perm=PermissionCache(self.env, 'hal'), + send_response=lambda x: outheaders.setdefault('Status', x), + send_header=lambda x, y: outheaders.setdefault(x, y), + write=outbody.write) + + module = BuildMaster(self.env) + assert module.match_request(req) + try: + module.process_request(req) + self.fail('Expected RequestDone') + except RequestDone: + self.assertEqual(200, outheaders['Status']) + self.assertEqual('39', outheaders['Content-Length']) + self.assertEqual('application/x-bitten+xml', + outheaders['Content-Type']) + self.assertEqual('attachment; filename=recipe_test_r123.xml', + outheaders['Content-Disposition']) + self.assertEqual('', + outbody.getvalue()) + + # Make sure the started timestamp has been set + build = Build.fetch(self.env, build.id) + assert build.started + + def test_initiate_build_no_such_build(self): + req = Mock(method='GET', base_path='', + path_info='/builds/123', href=Href('/trac'), + remote_addr='127.0.0.1', args={}, + perm=PermissionCache(self.env, 'hal')) + + module = BuildMaster(self.env) + assert module.match_request(req) + try: + module.process_request(req) + self.fail('Expected HTTPNotFound') + except HTTPNotFound, e: + self.assertEqual('No such build', e.detail) + + def test_process_unknown_collection(self): + BuildConfig(self.env, 'test', path='somepath', active=True, + recipe='').insert() + build = Build(self.env, 'test', '123', 1, slave='hal', rev_time=42) + build.insert() + + req = Mock(method='PUT', base_path='', + path_info='/builds/%d/files/abc.zip' % build.id, + href=Href('/trac'), remote_addr='127.0.0.1', args={}, + perm=PermissionCache(self.env, 'hal')) + + module = BuildMaster(self.env) + assert module.match_request(req) + try: + module.process_request(req) + self.fail('Expected HTTPNotFound') + except HTTPNotFound, e: + self.assertEqual('No such collection', e.detail) + + def test_process_build_step_success(self): + recipe = """ + + +""" + BuildConfig(self.env, 'test', path='somepath', active=True, + recipe=recipe).insert() + build = Build(self.env, 'test', '123', 1, slave='hal', rev_time=42, + started=42) + build.insert() + + inbody = StringIO(""" +""") + outheaders = {} + outbody = StringIO() + req = Mock(method='PUT', base_path='', + path_info='/builds/%d/steps/foo' % build.id, + href=Href('/trac'), remote_addr='127.0.0.1', args={}, + perm=PermissionCache(self.env, 'hal'), + read=inbody.read, + send_response=lambda x: outheaders.setdefault('Status', x), + send_header=lambda x, y: outheaders.setdefault(x, y), + write=outbody.write) + module = BuildMaster(self.env) + assert module.match_request(req) + try: + module.process_request(req) + self.fail('Expected RequestDone') + except RequestDone: + self.assertEqual(200, outheaders['Status']) + self.assertEqual('20', outheaders['Content-Length']) + self.assertEqual('text/plain', outheaders['Content-Type']) + self.assertEqual('Build step processed', outbody.getvalue()) + + build = Build.fetch(self.env, build.id) + self.assertEqual(Build.SUCCESS, build.status) + assert build.stopped + assert build.stopped > build.started + + steps = list(BuildStep.select(self.env, build.id)) + self.assertEqual(1, len(steps)) + self.assertEqual('foo', steps[0].name) + self.assertEqual(BuildStep.SUCCESS, steps[0].status) + + def test_process_build_step_success_with_log(self): + recipe = """ + + +""" + BuildConfig(self.env, 'test', path='somepath', active=True, + recipe=recipe).insert() + build = Build(self.env, 'test', '123', 1, slave='hal', rev_time=42, + started=42) + build.insert() + + inbody = StringIO(""" + + Doing stuff + Ouch that hurt + +""") + outheaders = {} + outbody = StringIO() + req = Mock(method='PUT', base_path='', + path_info='/builds/%d/steps/foo' % build.id, + href=Href('/trac'), remote_addr='127.0.0.1', args={}, + perm=PermissionCache(self.env, 'hal'), + read=inbody.read, + send_response=lambda x: outheaders.setdefault('Status', x), + send_header=lambda x, y: outheaders.setdefault(x, y), + write=outbody.write) + module = BuildMaster(self.env) + assert module.match_request(req) + try: + module.process_request(req) + self.fail('Expected RequestDone') + except RequestDone: + self.assertEqual(200, outheaders['Status']) + self.assertEqual('20', outheaders['Content-Length']) + self.assertEqual('text/plain', outheaders['Content-Type']) + self.assertEqual('Build step processed', outbody.getvalue()) + + build = Build.fetch(self.env, build.id) + self.assertEqual(Build.SUCCESS, build.status) + assert build.stopped + assert build.stopped > build.started + + steps = list(BuildStep.select(self.env, build.id)) + self.assertEqual(1, len(steps)) + self.assertEqual('foo', steps[0].name) + self.assertEqual(BuildStep.SUCCESS, steps[0].status) + + logs = list(BuildLog.select(self.env, build=build.id, step='foo')) + self.assertEqual(1, len(logs)) + self.assertEqual('http://bitten.cmlenz.net/tools/python#unittest', + logs[0].generator) + self.assertEqual(2, len(logs[0].messages)) + self.assertEqual((u'info', u'Doing stuff'), logs[0].messages[0]) + self.assertEqual((u'error', u'Ouch that hurt'), logs[0].messages[1]) + + def test_process_build_step_success_with_report(self): + recipe = """ + + +""" + BuildConfig(self.env, 'test', path='somepath', active=True, + recipe=recipe).insert() + build = Build(self.env, 'test', '123', 1, slave='hal', rev_time=42, + started=42) + build.insert() + + inbody = StringIO(""" + + + Doing my thing + + +""") + outheaders = {} + outbody = StringIO() + req = Mock(method='PUT', base_path='', + path_info='/builds/%d/steps/foo' % build.id, + href=Href('/trac'), remote_addr='127.0.0.1', args={}, + perm=PermissionCache(self.env, 'hal'), + read=inbody.read, + send_response=lambda x: outheaders.setdefault('Status', x), + send_header=lambda x, y: outheaders.setdefault(x, y), + write=outbody.write) + module = BuildMaster(self.env) + assert module.match_request(req) + try: + module.process_request(req) + self.fail('Expected RequestDone') + except RequestDone: + self.assertEqual(200, outheaders['Status']) + self.assertEqual('20', outheaders['Content-Length']) + self.assertEqual('text/plain', outheaders['Content-Type']) + self.assertEqual('Build step processed', outbody.getvalue()) + + build = Build.fetch(self.env, build.id) + self.assertEqual(Build.SUCCESS, build.status) + assert build.stopped + assert build.stopped > build.started + + steps = list(BuildStep.select(self.env, build.id)) + self.assertEqual(1, len(steps)) + self.assertEqual('foo', steps[0].name) + self.assertEqual(BuildStep.SUCCESS, steps[0].status) + + reports = list(Report.select(self.env, build=build.id, step='foo')) + self.assertEqual(1, len(reports)) + self.assertEqual('test', reports[0].category) + self.assertEqual('http://bitten.cmlenz.net/tools/python#unittest', + reports[0].generator) + self.assertEqual(1, len(reports[0].items)) + self.assertEqual({ + 'fixture': 'my.Fixture', + 'file': 'my/test/file.py', + 'stdout': 'Doing my thing', + 'type': 'test', + }, reports[0].items[0]) + + def test_process_build_step_failure(self): + recipe = """ + + +""" + BuildConfig(self.env, 'test', path='somepath', active=True, + recipe=recipe).insert() + build = Build(self.env, 'test', '123', 1, slave='hal', rev_time=42, + started=42) + build.insert() + + inbody = StringIO(""" +""") + outheaders = {} + outbody = StringIO() + req = Mock(method='PUT', base_path='', + path_info='/builds/%d/steps/foo' % build.id, + href=Href('/trac'), remote_addr='127.0.0.1', args={}, + perm=PermissionCache(self.env, 'hal'), + read=inbody.read, + send_response=lambda x: outheaders.setdefault('Status', x), + send_header=lambda x, y: outheaders.setdefault(x, y), + write=outbody.write) + module = BuildMaster(self.env) + assert module.match_request(req) + try: + module.process_request(req) + self.fail('Expected RequestDone') + except RequestDone: + self.assertEqual(200, outheaders['Status']) + self.assertEqual('20', outheaders['Content-Length']) + self.assertEqual('text/plain', outheaders['Content-Type']) + self.assertEqual('Build step processed', outbody.getvalue()) + + build = Build.fetch(self.env, build.id) + self.assertEqual(Build.FAILURE, build.status) + assert build.stopped + assert build.stopped > build.started + + steps = list(BuildStep.select(self.env, build.id)) + self.assertEqual(1, len(steps)) + self.assertEqual('foo', steps[0].name) + self.assertEqual(BuildStep.FAILURE, steps[0].status) + + def test_process_build_step_failure_ignored(self): + recipe = """ + + +""" + BuildConfig(self.env, 'test', path='somepath', active=True, + recipe=recipe).insert() + build = Build(self.env, 'test', '123', 1, slave='hal', rev_time=42, + started=42) + build.insert() + + inbody = StringIO(""" +""") + outheaders = {} + outbody = StringIO() + req = Mock(method='PUT', base_path='', + path_info='/builds/%d/steps/foo' % build.id, + href=Href('/trac'), remote_addr='127.0.0.1', args={}, + perm=PermissionCache(self.env, 'hal'), + read=inbody.read, + send_response=lambda x: outheaders.setdefault('Status', x), + send_header=lambda x, y: outheaders.setdefault(x, y), + write=outbody.write) + module = BuildMaster(self.env) + assert module.match_request(req) + try: + module.process_request(req) + self.fail('Expected RequestDone') + except RequestDone: + self.assertEqual(200, outheaders['Status']) + self.assertEqual('20', outheaders['Content-Length']) + self.assertEqual('text/plain', outheaders['Content-Type']) + self.assertEqual('Build step processed', outbody.getvalue()) + + build = Build.fetch(self.env, build.id) + self.assertEqual(Build.SUCCESS, build.status) + assert build.stopped + assert build.stopped > build.started + + steps = list(BuildStep.select(self.env, build.id)) + self.assertEqual(1, len(steps)) + self.assertEqual('foo', steps[0].name) + self.assertEqual(BuildStep.FAILURE, steps[0].status) + + def test_process_build_step_invalid_xml(self): + recipe = """ + + +""" + BuildConfig(self.env, 'test', path='somepath', active=True, + recipe=recipe).insert() + build = Build(self.env, 'test', '123', 1, slave='hal', rev_time=42, + started=42) + build.insert() + + inbody = StringIO("""""") + req = Mock(method='PUT', base_path='', + path_info='/builds/%d/steps/foo' % build.id, + href=Href('/trac'), remote_addr='127.0.0.1', args={}, + perm=PermissionCache(self.env, 'hal'), + read=inbody.read) + + module = BuildMaster(self.env) + assert module.match_request(req) + try: + module.process_request(req) + self.fail('Expected HTTPBadRequest') + except HTTPBadRequest, e: + self.assertEqual('XML parser error', e.detail) + + def test_process_build_step_invalid_datetime(self): + recipe = """ + + +""" + BuildConfig(self.env, 'test', path='somepath', active=True, + recipe=recipe).insert() + build = Build(self.env, 'test', '123', 1, slave='hal', rev_time=42, + started=42) + build.insert() + + inbody = StringIO(""" +""") + req = Mock(method='PUT', base_path='', + path_info='/builds/%d/steps/foo' % build.id, + href=Href('/trac'), remote_addr='127.0.0.1', args={}, + perm=PermissionCache(self.env, 'hal'), + read=inbody.read) + + module = BuildMaster(self.env) + assert module.match_request(req) + try: + module.process_request(req) + self.fail('Expected HTTPBadRequest') + except HTTPBadRequest, e: + self.assertEqual("Invalid ISO date/time 'sometime tomorrow maybe'", + e.detail) + + def test_process_build_step_no_put(self): + BuildConfig(self.env, 'test', path='somepath', active=True, + recipe='').insert() + build = Build(self.env, 'test', '123', 1, slave='hal', rev_time=42, + started=42) + build.insert() + + req = Mock(method='POST', base_path='', + path_info='/builds/%d/steps/foo' % build.id, + href=Href('/trac'), remote_addr='127.0.0.1', args={}, + perm=PermissionCache(self.env, 'hal')) + + module = BuildMaster(self.env) + assert module.match_request(req) + try: + module.process_request(req) + self.fail('Expected HTTPMethodNotAllowed') + except HTTPMethodNotAllowed, e: + self.assertEqual('Method not allowed', e.detail) + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(BuildMasterTestCase, 'test')) + return suite + +if __name__ == '__main__': + unittest.main(defaultTest='suite') diff --git a/bitten/tests/queue.py b/bitten/tests/queue.py --- a/bitten/tests/queue.py +++ b/bitten/tests/queue.py @@ -136,21 +136,22 @@ def tearDown(self): shutil.rmtree(self.env.path) - def test_next_pending_build(self): + def test_get_build_for_slave(self): """ Make sure that a pending build of an activated configuration is scheduled for a slave that matches the target platform. """ BuildConfig(self.env, 'test', active=True).insert() - build = Build(self.env, config='test', platform=1, rev=123, rev_time=42, - status=Build.PENDING) + platform = TargetPlatform(self.env, config='test', name='Foo') + platform.insert() + build = Build(self.env, config='test', platform=platform.id, rev=123, + rev_time=42, status=Build.PENDING) build.insert() build_id = build.id queue = BuildQueue(self.env) - queue.slaves[1] = ['foobar'] - build, slave = queue.get_next_pending_build(['foobar', 'dummy']) - self.assertEqual((build_id, 'foobar'), (build.id, slave)) + build = queue.get_build_for_slave('foobar', {}) + self.assertEqual(build_id, build.id) def test_next_pending_build_no_matching_slave(self): """ @@ -164,9 +165,8 @@ build_id = build.id queue = BuildQueue(self.env) - queue.slaves[2] = ['foobar'] - build, slave = queue.get_next_pending_build(['foobar', 'dummy']) - self.assertEqual((None, None), (build, slave)) + build = queue.get_build_for_slave('foobar', {}) + self.assertEqual(None, build) def test_next_pending_build_inactive_config(self): """ @@ -180,34 +180,20 @@ build.insert() queue = BuildQueue(self.env) - queue.slaves[1] = ['foobar'] - build, slave = queue.get_next_pending_build(['foobar', 'dummy']) - self.assertEqual((None, None), (build, slave)) + build = queue.get_build_for_slave('foobar', {}) + self.assertEqual(None, build) - def test_next_pending_build_slave_round_robin(self): - """ - Verify that if a slave is selected for a pending build, it is moved to - the end of the slave list for that target platform. - """ - BuildConfig(self.env, 'test', active=True).insert() - build = Build(self.env, config='test', platform=1, - rev=123, rev_time=42, status=Build.PENDING) - build.insert() - - queue = BuildQueue(self.env) - queue.slaves[1] = ['foo', 'bar', 'baz'] - build, slave = queue.get_next_pending_build(['foo']) - self.assertEqual(['bar', 'baz', 'foo'], queue.slaves[1]) - - def test_register_slave_match_simple(self): + def test_match_slave_match(self): BuildConfig(self.env, 'test', active=True).insert() platform = TargetPlatform(self.env, config='test', name="Unix") platform.rules.append(('family', 'posix')) platform.insert() + platform_id = platform.id queue = BuildQueue(self.env) - assert queue.register_slave('foo', {'family': 'posix'}) - self.assertEqual(['foo'], queue.slaves[platform.id]) + platforms = queue.match_slave('foo', {'family': 'posix'}) + self.assertEqual(1, len(platforms)) + self.assertEqual(platform_id, platforms[0].id) def test_register_slave_match_simple_fail(self): BuildConfig(self.env, 'test', active=True).insert() @@ -216,18 +202,20 @@ platform.insert() queue = BuildQueue(self.env) - assert not queue.register_slave('foo', {'family': 'nt'}) - self.assertRaises(KeyError, queue.slaves.__getitem__, platform.id) + platforms = queue.match_slave('foo', {'family': 'nt'}) + self.assertEqual([], platforms) def test_register_slave_match_regexp(self): BuildConfig(self.env, 'test', active=True).insert() platform = TargetPlatform(self.env, config='test', name="Unix") platform.rules.append(('version', '8\.\d\.\d')) platform.insert() + platform_id = platform.id queue = BuildQueue(self.env) - assert queue.register_slave('foo', {'version': '8.2.0'}) - self.assertEqual(['foo'], queue.slaves[platform.id]) + platforms = queue.match_slave('foo', {'version': '8.2.0'}) + self.assertEqual(1, len(platforms)) + self.assertEqual(platform_id, platforms[0].id) def test_register_slave_match_regexp_multi(self): BuildConfig(self.env, 'test', active=True).insert() @@ -235,10 +223,12 @@ platform.rules.append(('os', '^Linux')) platform.rules.append(('processor', '^[xi]\d?86$')) platform.insert() + platform_id = platform.id queue = BuildQueue(self.env) - assert queue.register_slave('foo', {'os': 'Linux', 'processor': 'i686'}) - self.assertEqual(['foo'], queue.slaves[platform.id]) + platforms = queue.match_slave('foo', {'os': 'Linux', 'processor': 'i686'}) + self.assertEqual(1, len(platforms)) + self.assertEqual(platform_id, platforms[0].id) def test_register_slave_match_regexp_fail(self): BuildConfig(self.env, 'test', active=True).insert() @@ -247,8 +237,8 @@ platform.insert() queue = BuildQueue(self.env) - assert not queue.register_slave('foo', {'version': '7.8.1'}) - self.assertRaises(KeyError, queue.slaves.__getitem__, platform.id) + platforms = queue.match_slave('foo', {'version': '7.8.1'}) + self.assertEqual([], platforms) def test_register_slave_match_regexp_invalid(self): BuildConfig(self.env, 'test', active=True).insert() @@ -257,34 +247,8 @@ platform.insert() queue = BuildQueue(self.env) - assert not queue.register_slave('foo', {'version': '7.8.1'}) - self.assertRaises(KeyError, queue.slaves.__getitem__, platform.id) - - def test_unregister_slave_no_builds(self): - queue = BuildQueue(self.env) - queue.slaves[1] = ['foo', 'bar'] - queue.slaves[2] = ['baz'] - queue.unregister_slave('bar') - self.assertEqual(['foo'], queue.slaves[1]) - self.assertEqual(['baz'], queue.slaves[2]) - - def test_unregister_slave_in_progress_build(self): - build = Build(self.env, config='test', platform=1, rev=123, rev_time=42, - slave='foo', status=Build.IN_PROGRESS) - build.insert() - - queue = BuildQueue(self.env) - queue.slaves[1] = ['foo', 'bar'] - queue.slaves[2] = ['baz'] - queue.unregister_slave('bar') - self.assertEqual(['foo'], queue.slaves[1]) - self.assertEqual(['baz'], queue.slaves[2]) - - build = Build.fetch(self.env, id=build.id) - self.assertEqual(Build.PENDING, build.status) - self.assertEqual('', build.slave) - self.assertEqual({}, build.slave_info) - self.assertEqual(0, build.started) + platforms = queue.match_slave('foo', {'version': '7.8.1'}) + self.assertEqual([], platforms) def suite(): diff --git a/bitten/tests/slave.py b/bitten/tests/slave.py --- a/bitten/tests/slave.py +++ b/bitten/tests/slave.py @@ -14,15 +14,14 @@ import zipfile from trac.test import Mock -from bitten.slave import Slave, OrchestrationProfileHandler -from bitten.util import beep +from bitten.slave import BuildSlave -class OrchestrationProfileHandlerTestCase(unittest.TestCase): +class BuildSlaveTestCase(unittest.TestCase): def setUp(self): self.work_dir = tempfile.mkdtemp(prefix='bitten_test') - self.slave = Slave(None, None, work_dir=self.work_dir) + self.slave = Slave(None, work_dir=self.work_dir) self.handler = OrchestrationProfileHandler(Mock(session=self.slave)) def tearDown(self): @@ -34,23 +33,10 @@ fd.close() return filename - def test_unpack_invalid_snapshot(self): - """ - Verify handling of `TarError` exceptions when trying to unpack an - invalid .tar.bz2 file. - """ - path = self._create_file('invalid.tar.bz2') - tarbz2 = file(path, 'w') - tarbz2.write('INVALID') - tarbz2.close() - self.assertRaises(beep.ProtocolError, self.handler.unpack_snapshot, - os.path.dirname(path), 'invalid.tar.bz2') - def suite(): suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(OrchestrationProfileHandlerTestCase, - 'test')) + suite.addTest(unittest.makeSuite(BuildSlaveTestCase, 'test')) return suite if __name__ == '__main__': diff --git a/bitten/tests/snapshot.py b/bitten/tests/snapshot.py deleted file mode 100644 --- a/bitten/tests/snapshot.py +++ /dev/null @@ -1,205 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2005 Christopher Lenz -# All rights reserved. -# -# This software is licensed as described in the file COPYING, which -# you should have received as part of this distribution. The terms -# are also available at http://bitten.cmlenz.net/wiki/License. - -import md5 -import os -import shutil -import tarfile -import tempfile -import unittest - -from trac.test import EnvironmentStub, Mock -from bitten.model import BuildConfig -from bitten.snapshot import SnapshotManager -from bitten.trac_ext.compat import schema_to_sql -from bitten.util import md5sum - - -class SnapshotManagerTestCase(unittest.TestCase): - - def setUp(self): - self.env = EnvironmentStub() - self.env.path = tempfile.mkdtemp(prefix='bitten_test') - os.mkdir(os.path.join(self.env.path, 'snapshots')) - db = self.env.get_db_cnx() - cursor = db.cursor() - for table in BuildConfig._schema: - for stmt in schema_to_sql(self.env, db, table): - cursor.execute(stmt) - db.commit() - self.config = BuildConfig(self.env, name='foo', path='trunk') - self.config.insert() - - def tearDown(self): - shutil.rmtree(self.env.path) - - def _create_file(self, path, create_md5sum=True): - filename = os.path.join(self.env.path, path) - fileobj = file(filename, 'w') - fileobj.close() - if create_md5sum: - md5sum.write(filename) - return filename - - def test_empty(self): - snapshots = SnapshotManager(self.config) - self.assertEqual([], snapshots._index) - self.assertEqual(None, snapshots.get(123)) - - def test_get(self): - path1 = self._create_file(os.path.join('snapshots', 'foo_r123.tar.bz2')) - path2 = self._create_file(os.path.join('snapshots', 'foo_r124.tar.bz2')) - snapshots = SnapshotManager(self.config) - self.assertEqual(path1, snapshots.get(123)) - self.assertEqual(path2, snapshots.get(124)) - - def test_get_prefix_match(self): - path1 = self._create_file(os.path.join('snapshots', 'foo_r123.tar.bz2')) - self._create_file(os.path.join('snapshots', 'bar_r124.tar.bz2')) - snapshots = SnapshotManager(self.config) - self.assertEqual(path1, snapshots.get(123)) - self.assertEqual(None, snapshots.get(124)) - - def test_get_wrong_extension(self): - path1 = self._create_file(os.path.join('snapshots', 'foo_r123.tar.bz2')) - self._create_file(os.path.join('snapshots', 'foo_r124.doc')) - snapshots = SnapshotManager(self.config) - self.assertEqual(path1, snapshots.get(123)) - self.assertEqual(None, snapshots.get(124)) - - def test_get_missing_rev(self): - path1 = self._create_file(os.path.join('snapshots', 'foo_r123.tar.bz2')) - self._create_file(os.path.join('snapshots', 'foo124.doc')) - snapshots = SnapshotManager(self.config) - self.assertEqual(path1, snapshots.get(123)) - self.assertEqual(None, snapshots.get(124)) - - def test_get_missing_md5sum(self): - path1 = self._create_file(os.path.join('snapshots', 'foo_r123.tar.bz2')) - self._create_file(os.path.join('snapshots', 'foo_r124.zip'), - create_md5sum=False) - snapshots = SnapshotManager(self.config) - self.assertEqual(path1, snapshots.get(123)) - self.assertEqual(None, snapshots.get(124)) - - def test_get_wrong_md5sum(self): - path1 = self._create_file(os.path.join('snapshots', 'foo_r123.tar.bz2')) - path2 = self._create_file(os.path.join('snapshots', 'foo_r124.tar.bz2'), - create_md5sum=False) - - md5sum.write(path1, path2 + '.md5') - snapshots = SnapshotManager(self.config) - self.assertEqual(path1, snapshots.get(123)) - self.assertEqual(None, snapshots.get(124)) - - def test_cleanup_on_init(self): - self.env.config.set('bitten', 'max_snapshots', '3') - path1 = self._create_file(os.path.join('snapshots', 'foo_r123.tar.bz2')) - path2 = self._create_file(os.path.join('snapshots', 'foo_r124.tar.bz2')) - path3 = self._create_file(os.path.join('snapshots', 'foo_r125.tar.bz2')) - self._create_file(os.path.join('snapshots', 'foo_r126.tar.bz2')) - snapshots = SnapshotManager(self.config) - self.assertEqual(path1, snapshots.get(123)) - self.assertEqual(path2, snapshots.get(124)) - self.assertEqual(path3, snapshots.get(125)) - self.assertEqual(None, snapshots.get(126)) - - def test_cleanup_explicit(self): - path1 = self._create_file(os.path.join('snapshots', 'foo_r123.tar.bz2')) - path2 = self._create_file(os.path.join('snapshots', 'foo_r124.tar.bz2')) - path3 = self._create_file(os.path.join('snapshots', 'foo_r125.tar.bz2')) - snapshots = SnapshotManager(self.config) - path4 = self._create_file(os.path.join('snapshots', 'foo_r126.tar.bz2')) - snapshots._index.append((os.path.getmtime(path4), 126, path4)) - snapshots._cleanup(3) - self.assertEqual(path1, snapshots.get(123)) - self.assertEqual(path2, snapshots.get(124)) - self.assertEqual(path3, snapshots.get(125)) - self.assertEqual(None, snapshots.get(126)) - - def test_create_not_a_directory(self): - repos = Mock(get_node=lambda path, rev: Mock(isdir=False)) - self.env.get_repository = lambda authname=None: repos - snapshots = SnapshotManager(self.config) - self.assertRaises(AssertionError, snapshots.create, 123) - - def test_create_empty(self): - root_dir = Mock(isdir=True, get_entries=lambda: [], path='trunk', - rev=123) - repos = Mock(get_node=lambda path, rev: root_dir) - self.env.get_repository = lambda authname=None: repos - snapshots = SnapshotManager(self.config) - snapshots.create(123).join() - path = snapshots.get(123) - assert path is not None - assert path.endswith('foo_r123.tar.bz2') - entries = tarfile.open(path, 'r:bz2').getmembers() - self.assertEqual(1, len(entries)) - self.assertEqual('foo_r123/', entries[0].name) - - def test_create_empty_dir(self): - empty_dir = Mock(isdir=True, get_entries=lambda: [], path='trunk/empty') - root_dir = Mock(isdir=True, get_entries=lambda: [empty_dir], - path='trunk', rev=123) - repos = Mock(get_node=lambda path, rev: root_dir) - self.env.get_repository = lambda authname=None: repos - snapshots = SnapshotManager(self.config) - snapshots.create(123).join() - path = snapshots.get(123) - assert path is not None - assert path.endswith('foo_r123.tar.bz2') - entries = tarfile.open(path, 'r:bz2').getmembers() - self.assertEqual(2, len(entries)) - self.assertEqual('foo_r123/', entries[0].name) - self.assertEqual('foo_r123/empty/', entries[1].name) - - def test_get_closest_match_backward(self): - path1 = self._create_file(os.path.join('snapshots', 'foo_r123.tar.bz2')) - path2 = self._create_file(os.path.join('snapshots', 'foo_r124.tar.bz2')) - - empty_dir = Mock(isdir=True, get_entries=lambda: [], path='trunk/empty') - root_dir = Mock(isdir=True, get_entries=lambda: [empty_dir], - path='trunk', rev=125) - repos = Mock(get_node=lambda path, rev: root_dir, - normalize_rev=lambda rev: int(rev), - rev_older_than=lambda rev1, rev2: rev1 < rev2, - next_rev=lambda rev: int(rev) + 1, - previous_rev=lambda rev: int(rev) - 1) - self.env.get_repository = lambda authname=None: repos - - snapshots = SnapshotManager(self.config) - match = snapshots._get_closest_match(repos, root_dir) - self.assertEqual((124, path2), match) - - def test_get_closest_match_forward(self): - path1 = self._create_file(os.path.join('snapshots', 'foo_r123.tar.bz2')) - path2 = self._create_file(os.path.join('snapshots', 'foo_r124.tar.bz2')) - - empty_dir = Mock(isdir=True, get_entries=lambda: [], path='trunk/empty') - root_dir = Mock(isdir=True, get_entries=lambda: [empty_dir], - path='trunk', rev=122) - repos = Mock(get_node=lambda path, rev: root_dir, - normalize_rev=lambda rev: int(rev), - rev_older_than=lambda rev1, rev2: rev1 < rev2, - next_rev=lambda rev: int(rev) + 1, - previous_rev=lambda rev: int(rev) - 1) - self.env.get_repository = lambda authname=None: repos - - snapshots = SnapshotManager(self.config) - match = snapshots._get_closest_match(repos, root_dir) - self.assertEqual((123, path1), match) - - -def suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(SnapshotManagerTestCase, 'test')) - return suite - -if __name__ == '__main__': - unittest.main(defaultTest='suite') diff --git a/bitten/trac_ext/main.py b/bitten/trac_ext/main.py --- a/bitten/trac_ext/main.py +++ b/bitten/trac_ext/main.py @@ -76,7 +76,8 @@ # IPermissionRequestor methods def get_permission_actions(self): - actions = ['BUILD_VIEW', 'BUILD_CREATE', 'BUILD_MODIFY', 'BUILD_DELETE'] + actions = ['BUILD_VIEW', 'BUILD_CREATE', 'BUILD_MODIFY', 'BUILD_DELETE', + 'BUILD_EXEC'] return actions + [('BUILD_ADMIN', actions)] # IWikiSyntaxProvider methods diff --git a/bitten/util/beep.py b/bitten/util/beep.py deleted file mode 100644 --- a/bitten/util/beep.py +++ /dev/null @@ -1,947 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2005 Christopher Lenz -# All rights reserved. -# -# This software is licensed as described in the file COPYING, which -# you should have received as part of this distribution. The terms -# are also available at http://bitten.cmlenz.net/wiki/License. - -"""Minimal implementation of the BEEP protocol (IETF RFC 3080) based on the -`asyncore` module. - -Current limitations: - - No support for the TLS and SASL profiles. - - No support for mapping frames (SEQ frames for TCP mapping). - - No localization support (xml:lang attribute). -""" - -import asynchat -import asyncore -from datetime import datetime, timedelta -import email -from heapq import heappop, heappush -import logging -import socket -try: - set -except NameError: - from sets import Set as set -try: - from cStringIO import StringIO -except ImportError: - from StringIO import StringIO -import sys - -from bitten.util import xmlio - -__all__ = ['Listener', 'Initiator', 'Payload', 'ProfileHandler', - 'ProtocolError'] - -BEEP_XML = 'application/beep+xml' - -log = logging.getLogger('bitten.beep') - - -class ProtocolError(Exception): - """Generic root class for BEEP errors.""" - - _default_messages = { - 421: 'Service Not Available', - 450: 'Requested Action Not Taken', - 451: 'Requested Action Aborted', - 454: 'Temporary Authentication Failure', - 500: 'General Syntax Error', - 501: 'Syntax Error In Parameters', - 504: 'Parameter Not Implemented', - 530: 'Authentication Required', - 534: 'Authentication Mechanism Insufficient', - 535: 'Authentication Failure', - 537: 'Action Not Authorised For User', - 538: 'Authentication Mechanism Requires Encryption', - 550: 'Requested Action Not Taken', - 553: 'Parameter Invalid', - 554: 'Transaction Failed' - } - - def __init__(self, code, message=None): - """Create the error. - - A numeric error code must be provided as the first parameter. A message - can be provided; if it is omitted, a standard error message will be - used in case the error code is known. - - @param code: The error code - @param message: An error message (optional) - """ - if message is None: - message = ProtocolError._default_messages.get(code) - Exception.__init__(self, 'BEEP error %d (%s)' % (code, message)) - self.code = code - self.message = message - self.local = True - - def from_xml(cls, xml): - """Create an error object from the given XML element. - - @param xml: The XML element representing the error - @type xml: An instance of L{bitten.util.xmlio.ParsedElement} - @return: The created C{ProtocolError} object - """ - elem = xmlio.parse(xml) - obj = cls(int(elem.attr['code']), elem.gettext()) - obj.local = False - return obj - from_xml = classmethod(from_xml) - - def to_xml(self): - """Create an XML representation of the error. - - @return: The created XML element - """ - return xmlio.Element('error', code=self.code)[self.message] - - -class TerminateSession(Exception): - """Signal termination of a session.""" - - -class EventLoop(object): - """An `asyncore` event loop. - - This will interrupt the normal asyncore loop at configurable intervals, and - execute scheduled callbacks. - """ - - def __init__(self): - """Create the event loop.""" - self.eventqueue = [] - - def run(self, timeout=15.0, granularity=5): - """Start the event loop.""" - granularity = timedelta(seconds=granularity) - socket_map = asyncore.socket_map - last_event_check = datetime.min - try: - while socket_map: - now = datetime.now() - if now - last_event_check >= granularity: - last_event_check = now - while self.eventqueue: - when, callback = heappop(self.eventqueue) - if now < when: - heappush(self.eventqueue, (when, callback)) - log.debug('Checking done %d events.', - len(self.eventqueue)) - break - try: - callback() - except Exception: - log.error('Exception caught firing callback %s. ' - 'Ignoring.', callback.__name__) - try: - asyncore.loop(timeout, True, None, 1) - except Exception: - log.error('Exception caught in asyncore.loop, ignoring.'); - except: - log.error('Exception caught in run()'); - - - def schedule(self, delta, callback): - """Schedule a function to be called. - - @param delta: The number of seconds after which the callback should be - invoked - @param callback: The function to call - """ - when = datetime.now() + timedelta(seconds=delta) - log.debug('Scheduling event %s to run at %s', callback.__name__, when) - - heappush(self.eventqueue, (when, callback)) - - -class Listener(EventLoop, asyncore.dispatcher): - """BEEP peer in the listener role. - - This peer opens a socket for listening to incoming connections. For each - connection, it opens a new session: an instance of `Session` that handle - communication with the connected peer. - """ - def __init__(self, ip, port): - """Create the listener. - - @param ip: The IP address or host name to bind to - @type ip: a string - @param port: The TCP port number to bind to - @type port: an int - """ - EventLoop.__init__(self) - asyncore.dispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.set_reuse_addr() - self.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - self.bind((ip, port)) - self.sessions = [] - self.profiles = {} # Mapping from URIs to ProfileHandler sub-classes - log.debug('Listening to connections on %s:%d', ip, port) - self.listen(5) - - def writable(self): - """Called by asyncore to determine whether the channel is writable.""" - return False - - def handle_read(self): - """Called by asyncore to signal data available for reading.""" - - def readable(self): - """Called by asyncore to determine whether the channel is readable.""" - return True - - def handle_accept(self): - """Start a new BEEP session initiated by a peer.""" - conn, (ip, port) = self.accept() - log.debug('Connected to %s:%d', ip, port) - self.sessions.append(Session(self, conn, (ip, port), self.profiles, - first_channelno=2)) - - def quit(self): - """Shutdown the listener, attempting to gracefully quit all active BEEP - sessions by first closing their respective channels.""" - if not self.sessions: - self.close() - return - def terminate_next_session(): - session = self.sessions[-1] - def handle_ok(): - if self.sessions: - terminate_next_session() - else: - self.close() - def handle_error(channelno, code, message): - log.error('Failed to close channel %d', channelno) - log.debug('Closing session with %s', session.addr) - session.terminate(handle_ok=handle_ok, handle_error=handle_error) - self.schedule(0, terminate_next_session) - self.run(.5) - - -class Session(asynchat.async_chat): - """A BEEP session between two peers.""" - - def __init__(self, listener=None, conn=None, addr=None, profiles=None, - first_channelno=1): - """Create the BEEP session. - - @param listener: The `Listener` this session belongs to, or `None` for - a session by the initiating peer - @param conn: The connection - @param addr: The address of the remote peer, a (IP-address, port) tuple - @param profiles: A dictionary of supported profiles; the keys are the - URIs of the profiles, the values are corresponding - sub-classes of `ProfileHandler` - @param first_channelno: The first channel number to request; 0 for the - peer in the listening role, 1 for initiators - """ - asynchat.async_chat.__init__(self, conn) - if conn: - self.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - - self.listener = listener - self.addr = addr - self.set_terminator('\r\n') - - self.profiles = profiles or {} - self.inbuf = [] - self.header = self.payload = None - - self.channelno = cycle_through(first_channelno, 2147483647, step=2) - self.channels = {0: Channel(self, 0, ManagementProfileHandler)} - - def close(self): - if self.listener: - log.debug('Closing connection to %s:%s', self.addr[0], self.addr[1]) - self.listener.sessions.remove(self) - else: - log.info('Session terminated') - asynchat.async_chat.close(self) - - def handle_close(self): - log.warning('Peer %s:%s closed connection' % self.addr) - channels = self.channels.keys() - channels.reverse() - for channelno in channels: - self.channels[channelno].close() - asynchat.async_chat.handle_close(self) - - def handle_error(self): - """Called by asyncore when an exception is raised.""" - cls, value = sys.exc_info()[:2] - if cls is TerminateSession: - raise - log.exception(value) - - def collect_incoming_data(self, data): - """Called by async_chat when data is received. - - Buffer the data and wait until a terminator is found.""" - self.inbuf.append(data) - - def found_terminator(self): - """Called by async_chat when a terminator is found in the input - stream. - - Parse the incoming data depending on whether it terminated on the frame - header, playload or trailer. For the header, extract the payload size - parameter and use it as terminator or the payload. When the trailer has - been received, delegate to `_handle_frame()`. - """ - if self.header is None: - # Frame header received - self.header = ''.join(self.inbuf).split(' ') - self.inbuf = [] - if self.header[0] == 'SEQ': - size = 0 - else: - # Extract payload size to use as next terminator - try: - size = int(self.header[int(self.header[0] != 'ANS') - 2]) - except ValueError: - log.error('Malformed frame header: [%s]', - ' '.join(self.header), exc_info=True) - self.header = None - raise TerminateSession('Malformed frame header') - if size == 0: - self.payload = '' - self.set_terminator('END\r\n') - else: - self.set_terminator(size) - elif self.payload is None: - # Frame payload received - self.payload = ''.join(self.inbuf) - self.inbuf = [] - self.set_terminator('END\r\n') - else: - # Frame trailer received - try: - self._handle_frame(self.header, self.payload) - finally: - self.header = self.payload = None - self.inbuf = [] - self.set_terminator('\r\n') - - def _handle_frame(self, header, payload): - """Handle an incoming frame. - - This parses the frame header and decides which channel to pass it to. - """ - log.debug('Handling frame [%s]', ' '.join(header)) - msgno = None - channel = None - try: - cmd = header[0].upper() - channel = int(header[1]) - if cmd == 'SEQ': - ackno = int(header[2]) - window = int(header[3]) - self.channels[channel].handle_seq_frame(ackno, window) - else: - assert cmd in ('MSG', 'RPY', 'ERR', 'ANS', 'NUL') - msgno = int(header[2]) - assert header[3] in ('*', '.') - more = header[3] == '*' - seqno = int(header[4]) - ansno = None - if cmd == 'ANS': - ansno = int(header[6]) - try: - self.channels[channel].handle_data_frame(cmd, msgno, more, - seqno, ansno, - payload) - except ProtocolError, e: - log.error(e) - if e.local and msgno is not None: - self.channels[channel].send_err(msgno, - Payload(e.to_xml())) - - except (AssertionError, IndexError, TypeError, ValueError), e: - log.error('Malformed frame', exc_info=True) - raise TerminateSession('Malformed frame header') - - def terminate(self, handle_ok=None, handle_error=None): - """Terminate the session by closing all channels.""" - def close_next_channel(): - channelno = max(self.channels.keys()) - def _handle_ok(): - if channelno == 0: - if handle_ok is not None: - handle_ok() - else: - close_next_channel() - def _handle_error(code, message): - log.error('Peer refused to close channel %d: %s (%d)', - channelno, message, code) - if handle_error is not None: - handle_error(channelno, code, message) - else: - raise ProtocolError(code, message) - self.channels[0].profile.send_close(channelno, handle_ok=_handle_ok, - handle_error=_handle_error) - close_next_channel() - - -class Initiator(EventLoop, Session): - """Root class for BEEP peers in the initiating role.""" - - def __init__(self, ip, port, profiles=None): - """Create the BEEP session. - - @param ip: The IP address to connect to - @param port: The port to connect to - @param profiles: A dictionary of the supported profiles, where the key - is the URI identifying the profile, and the value is a - `ProfileHandler` sub-class that will be instantiated to - handle the communication for that profile - """ - EventLoop.__init__(self) - Session.__init__(self, profiles=profiles or {}) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - log.debug('Connecting to %s:%s', ip, port) - if ip and port: - self.addr = (ip, port) - self.connect(self.addr) - - def handle_connect(self): - """Called by asyncore when the connection is established.""" - log.debug('Connected to peer at %s:%s', self.addr[0], self.addr[1]) - - def greeting_received(self, profiles): - """Sub-classes should override this to start the channels they need. - - @param profiles: A list of URIs of the profiles the peer claims to - support. - """ - - def quit(self): - """Stops the build slave, attempting to gracefully quit the session by - closing all active channels.""" - self.terminate() - asyncore.loop(timeout=10) - - -class Channel(object): - """A specific channel of a BEEP session.""" - - def __init__(self, session, channelno, profile_cls): - """Create the channel. - - @param session: The L{Session} object that the channel belongs to - @param channelno: The channel number - @param profile_cls: The associated L{ProfileHandler} class - """ - self.session = session - self.channelno = channelno - self.windowsize = 4096 - self.inqueue = {} - self.reply_handlers = {} - - self.msgno = cycle_through(0, 2147483647) - self.msgnos = set() # message numbers currently in use - self.ansnos = {} # answer numbers keyed by msgno, each 0-2147483647 - - # incoming, outgoing sequence numbers - self.seqno = [SerialNumber(), SerialNumber()] - - self.profile = profile_cls(self) - self.profile.handle_connect() - - def close(self): - """Close the channel.""" - self.profile.handle_disconnect() - del self.session.channels[self.channelno] - - def handle_seq_frame(self, ackno, window): - """Process a TCP mapping frame (SEQ). - - @param ackno: the value of the next sequence number that the sender is - expecting to receive on this channel - @param window: window size, the number of payload octets per frame that - the sender is expecting to receive on this channel - """ - self.windowsize = window - - def handle_data_frame(self, cmd, msgno, more, seqno, ansno, payload): - """Process a single data frame. - - @param cmd: The frame keyword (MSG, RPY, ERR, ANS or NUL) - @param msgno: The message number - @param more: `True` if more frames are pending for this message - @param seqno: Sequence number of the frame - @param ansno: The answer number for 'ANS' messages, otherwise `None` - @param payload: The frame payload as a string - """ - # Validate and update sequence number - if seqno != self.seqno[0]: - raise TerminateSession('Out of sync with peer') - self.seqno[0] += len(payload) - - if more: - # More of this message pending, so push it on the queue - self.inqueue.setdefault(msgno, []).append(payload) - return - - # Complete message received, so handle it - if msgno in self.inqueue: - # Recombine queued messages - payload = ''.join(self.inqueue[msgno]) + payload - del self.inqueue[msgno] - if cmd in ('ERR', 'RPY', 'NUL') and msgno in self.msgnos: - # Final reply using this message number, so dealloc - self.msgnos.remove(msgno) - if payload: - payload = Payload.parse(payload) - else: - payload = None - - if cmd == 'MSG': - self.profile.handle_msg(msgno, payload) - else: - if msgno in self.reply_handlers: - try: - self.reply_handlers[msgno](cmd, msgno, ansno, payload) - finally: - if cmd != 'ANS': - del self.reply_handlers[msgno] - elif cmd == 'RPY': - self.profile.handle_rpy(msgno, payload) - elif cmd == 'ERR': - self.profile.handle_err(msgno, payload) - elif cmd == 'ANS': - self.profile.handle_ans(msgno, ansno, payload) - elif cmd == 'NUL': - self.profile.handle_nul(msgno) - - def send_msg(self, payload, handle_reply=None, force_flush=True): - """Send a MSG frame to the peer. - - @param payload: The message payload (a `Payload` instance) - @param handle_reply: A function that is called when a reply to this - message is received - @param force_flush: Flush the beep channel after sending the message - @return: the message number assigned to the message - """ - while True: # Find a unique message number - msgno = self.msgno.next() - if msgno not in self.msgnos: - break - self.msgnos.add(msgno) # Flag the chosen message number as in use - if handle_reply is not None: - assert callable(handle_reply), 'Reply handler must be callable' - self.reply_handlers[msgno] = handle_reply - if force_flush == True: - self.send_with_producer(FrameProducer(self, 'MSG', msgno, None, - payload)) - else: - self.session.push_with_producer(FrameProducer(self, 'MSG', msgno, None, payload)) - - return msgno - - def send_rpy(self, msgno, payload): - """Send a RPY frame to the peer. - - @param msgno: The number of the message this reply is in reference to - @param payload: The message payload (a `Payload` instance) - """ - self.send_with_producer(FrameProducer(self, 'RPY', msgno, None, - payload)) - - def send_err(self, msgno, payload): - """Send an ERR frame to the peer. - - @param msgno: The number of the message this reply is in reference to - @param payload: The message payload (a `Payload` instance) - """ - self.send_with_producer(FrameProducer(self, 'ERR', msgno, None, - payload)) - - def send_ans(self, msgno, payload): - """Send an ANS frame to the peer. - - @param msgno: The number of the message this reply is in reference to - @param payload: The message payload (a `Payload` instance) - @return: the answer number assigned to the answer - """ - ansnos = self.ansnos.setdefault(msgno, cycle_through(0, 2147483647)) - next_ansno = ansnos.next() - self.send_with_producer(FrameProducer(self, 'ANS', msgno, - next_ansno, payload)) - return next_ansno - - def send_nul(self, msgno): - """Send a NUL frame to the peer. - - @param msgno: The number of the message this reply is in reference to - """ - self.send_with_producer(FrameProducer(self, 'NUL', msgno)) - del self.ansnos[msgno] # dealloc answer numbers for the message - - def send_with_producer(self, fp): - """Sends a message contained in the given FrameProducer to the peer, - ensuring the message is flushed before continuing. - """ - # push with producer seems to send the first frame out the door - self.session.push_with_producer(fp) - # if there are any more, make sure they get out as well. - if not fp.done: - while not fp.done: - asyncore.loop(count=1) - # make sure to flush the last bit. - asyncore.loop(count=1) - -class ProfileHandler(object): - """Abstract base class for handlers of specific BEEP profiles. - - Concrete subclasses need to at least implement the `handle_msg()` method, - and may override any of the others. - """ - - def __init__(self, channel): - """Create the profile.""" - self.session = channel.session - self.channel = channel - - def handle_connect(self): - """Called when the channel this profile is associated with is - initially started.""" - - def handle_disconnect(self): - """Called when the channel this profile is associated with is closed.""" - - def handle_msg(self, msgno, payload): - """Handle a MSG frame. - - @param msgno: The message identifier - @param payload: The C{Payload} of the message - """ - raise NotImplementedError - - def handle_rpy(self, msgno, payload): - """Handle a RPY frame. - - @param msgno: The identifier of the referenced message - @param payload: The C{Payload} of the message - """ - pass - - def handle_err(self, msgno, payload): - """Handle an ERR frame. - - @param msgno: The identifier of the referenced message - @param payload: The C{Payload} of the message - """ - pass - - def handle_ans(self, msgno, ansno, payload): - """Handle an ANS frame. - - @param msgno: The identifier of the referenced message - @param ansno: The answer number - @param payload: The C{Payload} of the message - """ - pass - - def handle_nul(self, msgno): - """Handle a NUL frame. - - @param msgno: The identifier of the referenced message - """ - pass - - -class ManagementProfileHandler(ProfileHandler): - """Implementation of the BEEP management profile.""" - - def handle_connect(self): - """Send a greeting reply directly after connecting to the peer.""" - profile_uris = self.session.profiles.keys() - log.debug('Send greeting with profiles: %s', profile_uris) - xml = xmlio.Element('greeting')[ - [xmlio.Element('profile', uri=uri) for uri in profile_uris] - ] - self.channel.send_rpy(0, Payload(xml)) - - def handle_msg(self, msgno, payload): - """Handle an incoming message.""" - assert payload and payload.content_type == BEEP_XML - elem = xmlio.parse(payload.body) - - if elem.name == 'start': - channelno = int(elem.attr['number']) - if channelno in self.session.channels: - raise ProtocolError(550, 'Channel already in use') - for profile in elem.children('profile'): - if profile.attr['uri'] in self.session.profiles: - log.debug('Start channel %s for profile <%s>', - elem.attr['number'], profile.attr['uri']) - channel = Channel(self.session, channelno, - self.session.profiles[profile.attr['uri']]) - self.session.channels[channelno] = channel - xml = xmlio.Element('profile', uri=profile.attr['uri']) - self.channel.send_rpy(msgno, Payload(xml)) - return - raise ProtocolError(550, - 'None of the requested profiles is supported') - - elif elem.name == 'close': - channelno = int(elem.attr['number']) - if not channelno in self.session.channels: - raise ProtocolError(550, 'Channel not open') - if channelno == 0: - if len(self.session.channels) > 1: - raise ProtocolError(550, 'Other channels still open') - if self.session.channels[channelno].msgnos: - raise ProtocolError(550, 'Channel waiting for replies') - self.session.channels[channelno].close() - self.channel.send_rpy(msgno, Payload(xmlio.Element('ok'))) - if not self.session.channels: - self.session.close() - - def handle_rpy(self, msgno, payload): - """Handle a positive reply.""" - if payload.content_type == BEEP_XML: - elem = xmlio.parse(payload.body) - if elem.name == 'greeting': - if isinstance(self.session, Initiator): - profiles = [p.attr['uri'] for p in elem.children('profile')] - self.session.greeting_received(profiles) - - def handle_err(self, msgno, payload): - """Handle a negative reply.""" - # Probably an error on connect, because other errors should get handled - # by the corresponding callbacks - # TODO: Terminate the session, I guess - if payload.content_type == BEEP_XML: - raise ProtocolError.from_xml(payload.body) - - def send_close(self, channelno=0, code=200, handle_ok=None, - handle_error=None): - """Send a request to close a channel to the peer.""" - def handle_reply(cmd, msgno, ansno, payload): - if cmd == 'RPY': - log.debug('Channel %d closed', channelno) - self.session.channels[channelno].close() - if not self.session.channels: - self.session.close() - if handle_ok is not None: - handle_ok() - elif cmd == 'ERR': - error = ProtocolError.from_xml(payload.body) - log.debug('Peer refused to start channel %d: %s (%d)', - channelno, error.message, error.code) - if handle_error is not None: - handle_error(error.code, error.message) - - log.debug('Requesting closure of channel %d', channelno) - xml = xmlio.Element('close', number=channelno, code=code) - return self.channel.send_msg(Payload(xml), handle_reply, True) - - def send_start(self, profiles, handle_ok=None, handle_error=None): - """Send a request to start a new channel to the peer. - - @param profiles: A list of profiles to request for the channel, each - element being an instance of a L{ProfileHandler} subclass - @param handle_ok: An optional callback function that will be invoked - when the channel has been successfully started - @param handle_error: An optional callback function that will be invoked - when the peer refuses to start the channel - """ - channelno = self.session.channelno.next() - def handle_reply(cmd, msgno, ansno, payload): - if cmd == 'RPY': - elem = xmlio.parse(payload.body) - for cls in [p for p in profiles if p.URI == elem.attr['uri']]: - log.debug('Channel %d started with profile %s', channelno, - elem.attr['uri']) - self.session.channels[channelno] = Channel(self.session, - channelno, cls) - break - if handle_ok is not None: - handle_ok(channelno, elem.attr['uri']) - elif cmd == 'ERR': - elem = xmlio.parse(payload.body) - text = elem.gettext() - code = int(elem.attr['code']) - log.debug('Peer refused to start channel %d: %s (%d)', - channelno, text, code) - if handle_error is not None: - handle_error(code, text) - - log.debug('Requesting start of channel %d with profiles %s', channelno, - [profile.URI for profile in profiles]) - xml = xmlio.Element('start', number=channelno)[ - [xmlio.Element('profile', uri=profile.URI) for profile in profiles] - ] - return self.channel.send_msg(Payload(xml), handle_reply, True) - - -class Payload(object): - """MIME message for transmission as payload with BEEP.""" - - def __init__(self, data=None, content_type=BEEP_XML, - content_disposition=None, content_encoding=None): - """Initialize the payload object. - - @param data: The body of the MIME message. This can be either: - - a string, - - a file-like object providing a C{read()} function, - - an XML element, or - - C{None} - @param content_type: the MIME type of the payload - @param content_disposition: the filename for disposition of the data - @param content_encoding: the encoding of the data - """ - self._hdr_buf = None - self.content_type = content_type - self.content_disposition = content_disposition - self.content_encoding = content_encoding - - if data is None: - data = '' - if isinstance(data, (xmlio.Element, xmlio.ParsedElement)): - self.body = StringIO(str(data)) - elif isinstance(data, basestring): - self.body = StringIO(data) - else: - assert hasattr(data, 'read'), \ - 'Payload data %s must provide a `read` method' % data - self.body = data - - def read(self, size=None): - """Return the specified range of the MIME message. - - @param size: the number of bytes to read - """ - if self._hdr_buf is None: - hdrs = [] - if self.content_type: - hdrs.append('Content-Type: ' + self.content_type) - if self.content_disposition: - hdrs.append('Content-Disposition: ' + self.content_disposition) - if self.content_encoding: - hdrs.append('Content-Transfer-Encoding: ' + - self.content_encoding) - hdrs.append('') - self._hdr_buf = '\r\n'.join(hdrs) + '\r\n' - if isinstance(self._hdr_buf, unicode): - self._hdr_buf = self._hdr_buf.encode('utf8') - - ret_buf = '' - if len(self._hdr_buf): - if size is not None and len(self._hdr_buf) > size: - ret_buf = self._hdr_buf[:size] - self._hdr_buf = self._hdr_buf[size:] - return ret_buf - ret_buf = self._hdr_buf - self._hdr_buf = '' - - if not self.body.closed: - ret_buf = ret_buf + self.body.read((size or -1) - len(ret_buf)) - if size is None or len(ret_buf) < size: - self.body.close() - - return ret_buf - - def parse(cls, string): - """Create a C{Payload} object from the given string data. - - @param string: The string containing the MIME message. - """ - message = email.message_from_string(string) - content_type = message.get('Content-Type') - content_disposition = message.get('Content-Disposition') - content_encoding = message.get('Content-Transfer-Encoding') - return Payload(message.get_payload(), content_type, - content_disposition, content_encoding) - parse = classmethod(parse) - - -class FrameProducer(object): - """Internal class that emits the frames of a BEEP message, based on the - C{asynchat} C{push_with_producer()} protocol. - """ - def __init__(self, channel, cmd, msgno, ansno=None, payload=None): - """Initialize the frame producer. - - @param channel: The channel the message is to be sent on - @param cmd: The BEEP command/keyword (MSG, RPY, ERR, ANS or NUL) - @param msgno: The message number - @param ansno: The answer number (only for ANS messages) - @param payload: The message payload - @type payload: an instance of L{Payload} - """ - self.session = channel.session - self.channel = channel - self.cmd = cmd - self.msgno = msgno - self.ansno = ansno - - self.payload = payload - self.done = False - - def more(self): - """Called by `async_chat` when the producer has been pushed on the - producer FIFO and the channel is about to write.""" - if self.done: - return '' - - if self.payload: - data = self.payload.read(self.channel.windowsize) - if len(data) < self.channel.windowsize: - self.done = True - else: - data = '' - self.done = True - - headerbits = [self.cmd, self.channel.channelno, self.msgno, - self.done and '.' or '*', self.channel.seqno[1].value, - len(data)] - if self.cmd == 'ANS': - assert self.ansno is not None - headerbits.append(self.ansno) - header = ' '.join([str(bit) for bit in headerbits]) - log.debug('Sending frame [%s]', header) - frame = '\r\n'.join((header, data + 'END', '')) - self.channel.seqno[1] += len(data) - - return frame - - -def cycle_through(start, stop=None, step=1): - """Utility generator that cycles through a defined range of numbers.""" - if stop is None: - stop = start - start = 0 - cur = start - while True: - yield cur - cur += step - if cur > stop: - cur = start - - -class SerialNumber(object): - """Serial number (RFC 1982).""" - def __init__(self, limit=4294967295L): - self.value = 0L - self.limit = limit - - def __ne__(self, num): - return self.value != num - - def __eq__(self, num): - return self.value == num - - def __iadd__(self, num): - self.value += num - if self.value > self.limit: - self.value -= self.limit - return self diff --git a/bitten/util/tests/__init__.py b/bitten/util/tests/__init__.py --- a/bitten/util/tests/__init__.py +++ b/bitten/util/tests/__init__.py @@ -11,11 +11,10 @@ import unittest from bitten.util import xmlio -from bitten.util.tests import beep, md5sum +from bitten.util.tests import md5sum def suite(): suite = unittest.TestSuite() - suite.addTest(beep.suite()) suite.addTest(md5sum.suite()) suite.addTest(doctest.DocTestSuite(xmlio)) return suite diff --git a/bitten/util/tests/beep.py b/bitten/util/tests/beep.py deleted file mode 100644 --- a/bitten/util/tests/beep.py +++ /dev/null @@ -1,607 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2005 Christopher Lenz -# All rights reserved. -# -# This software is licensed as described in the file COPYING, which -# you should have received as part of this distribution. The terms -# are also available at http://bitten.cmlenz.net/wiki/License. - -import unittest - -from bitten.util import beep, xmlio - - -class MockSession(beep.Initiator): - - def __init__(self): - self.closed = False - self.profiles = {} - self.sent_messages = [] - self.channelno = beep.cycle_through(1, 2147483647, step=2) - self.channels = {0: beep.Channel(self, 0, - beep.ManagementProfileHandler)} - del self.sent_messages[0] # Clear out the management greeting - self.channels[0].seqno = [beep.SerialNumber(), beep.SerialNumber()] - - def close(self): - self.closed = True - - def push_with_producer(self, producer): - assert not self.closed - while True: - frame = producer.more() - if not frame: - break - header, rest = frame.split('\r\n', 1) - header = header.split(' ') - cmd = header[0].upper() - channel = int(header[1]) - msgno = int(header[2]) - more = header[3] == '*' - seqno = int(header[4]) - size = int(header[5]) - ansno = None - if cmd == 'ANS': - ansno = int(header[6]) - if size == 0: - payload = '' - else: - payload = rest[:size] - rest = rest[size:] - self.sent_messages.append((cmd, channel, msgno, more, seqno, ansno, - payload.strip())) - assert rest == 'END\r\n' - - -class MockProfileHandler(object): - URI = 'http://example.com/mock' - - def __init__(self, channel): - self.handled_messages = [] - self.init_elem = None - - def handle_connect(self, init_elem=None): - self.init_elem = init_elem - - def handle_disconnect(self): - pass - - def handle_msg(self, msgno, message): - text = message.read().strip() - self.handled_messages.append(('MSG', msgno, text, None)) - - def handle_rpy(self, msgno, message): - text = message.read().strip() - self.handled_messages.append(('RPY', msgno, text, None)) - - def handle_err(self, msgno, message): - text = message.read().strip() - self.handled_messages.append(('ERR', msgno, text, None)) - - def handle_ans(self, msgno, ansno, message): - text = message.read().strip() - self.handled_messages.append(('ANS', msgno, text, ansno)) - - def handle_nul(self, msgno): - self.handled_messages.append(('NUL', msgno, '', None)) - - -class SessionTestCase(unittest.TestCase): - """Unit tests for the `beep.Session` class.""" - - def setUp(self): - self.session = beep.Session() - - def test_malformed_frame_invalid_keyword(self): - self.session.collect_incoming_data('XYZ 0 0 . 0 0') - self.session.found_terminator() - self.assertRaises(beep.TerminateSession, self.session.found_terminator) - - def test_malformed_frame_invalid_parameter1(self): - self.session.collect_incoming_data('MSG x y . z 0') - self.session.found_terminator() - self.assertRaises(beep.TerminateSession, self.session.found_terminator) - - def test_malformed_frame_invalid_parameter2(self): - self.session.collect_incoming_data('MSG 0 0 + 0 0') - self.session.found_terminator() - self.assertRaises(beep.TerminateSession, self.session.found_terminator) - - def test_malformed_frame_missing_size_param(self): - self.session.collect_incoming_data('MSG 0 0 . 0') - self.session.found_terminator() - self.assertRaises(beep.TerminateSession, self.session.found_terminator) - - def test_malformed_frame_missing_ansno_param(self): - self.session.collect_incoming_data('ANS 0 0 . 0 0') - self.session.found_terminator() - self.assertRaises(beep.TerminateSession, self.session.found_terminator) - - -class ChannelTestCase(unittest.TestCase): - - def setUp(self): - self.session = MockSession() - - def test_handle_single_msg_frame(self): - """ - Verify that the channel correctly passes a single frame MSG to the - profile. - """ - channel = beep.Channel(self.session, 0, MockProfileHandler) - channel.handle_data_frame('MSG', 0, False, 0, None, 'foo bar') - self.assertEqual(('MSG', 0, 'foo bar', None), - channel.profile.handled_messages[0]) - - def test_handle_segmented_msg_frames(self): - """ - Verify that the channel combines two segmented messages and passed the - recombined message to the profile. - """ - channel = beep.Channel(self.session, 0, MockProfileHandler) - channel.handle_data_frame('MSG', 0, True, 0, None, 'foo ') - channel.handle_data_frame('MSG', 0, False, 4, None, 'bar') - self.assertEqual(('MSG', 0, 'foo bar', None), - channel.profile.handled_messages[0]) - - def test_handle_out_of_sync_frame(self): - """ - Verify that the channel detects out-of-sync frames and bails. - """ - channel = beep.Channel(self.session, 0, MockProfileHandler) - channel.handle_data_frame('MSG', 0, False, 0L, None, 'foo bar') - # The next sequence number should be 8; send 12 instead - self.assertRaises(beep.TerminateSession, channel.handle_data_frame, - 'MSG', 0, False, 12L, None, 'foo baz') - - def test_send_single_frame_message(self): - """ - Verify that the channel passes a sent message up to the session for - transmission with the correct parameters. Also assert that the - corresponding message number (0) is reserved. - """ - channel = beep.Channel(self.session, 0, MockProfileHandler) - msgno = channel.send_msg(beep.Payload('foo bar', None)) - self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), - self.session.sent_messages[0]) - assert msgno in channel.msgnos - - def test_send_frames_seqno_incrementing(self): - """ - Verify that the sequence numbers of outgoing frames are incremented as - expected. - """ - channel = beep.Channel(self.session, 0, MockProfileHandler) - channel.send_msg(beep.Payload('foo bar', None)) - channel.send_rpy(0, beep.Payload('nil', None)) - self.assertEqual(('MSG', 0, 0, False, 0, None, 'foo bar'), - self.session.sent_messages[0]) - self.assertEqual(('RPY', 0, 0, False, 9, None, 'nil'), - self.session.sent_messages[1]) - - def test_send_message_msgno_incrementing(self): - """ - Verify that the message number is incremented for subsequent outgoing - messages. - """ - channel = beep.Channel(self.session, 0, MockProfileHandler) - msgno = channel.send_msg(beep.Payload('foo bar', None)) - assert msgno == 0 - self.assertEqual(('MSG', 0, msgno, False, 0, None, 'foo bar'), - self.session.sent_messages[0]) - assert msgno in channel.msgnos - msgno = channel.send_msg(beep.Payload('foo baz', None)) - assert msgno == 1 - self.assertEqual(('MSG', 0, msgno, False, 9, None, 'foo baz'), - self.session.sent_messages[1]) - assert msgno in channel.msgnos - - def test_send_reply(self): - """ - Verify that sending an ANS message is processed correctly. - """ - channel = beep.Channel(self.session, 0, MockProfileHandler) - channel.send_rpy(0, beep.Payload('foo bar', None)) - self.assertEqual(('RPY', 0, 0, False, 0L, None, 'foo bar'), - self.session.sent_messages[0]) - - def test_message_and_reply(self): - """ - Verify that a message number is deallocated after a final "RPY" reply - has been received. - """ - channel = beep.Channel(self.session, 0, MockProfileHandler) - msgno = channel.send_msg(beep.Payload('foo bar', None)) - self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), - self.session.sent_messages[0]) - assert msgno in channel.msgnos - channel.handle_data_frame('RPY', msgno, False, 0, None, '42') - self.assertEqual(('RPY', msgno, '42', None), - channel.profile.handled_messages[0]) - assert msgno not in channel.msgnos - - def test_message_and_error(self): - """ - Verify that a message number is deallocated after a final "ERR" reply - has been received. - """ - channel = beep.Channel(self.session, 0, MockProfileHandler) - msgno = channel.send_msg(beep.Payload('foo bar', None)) - self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), - self.session.sent_messages[0]) - assert msgno in channel.msgnos - channel.handle_data_frame('ERR', msgno, False, 0, None, '42') - self.assertEqual(('ERR', msgno, '42', None), - channel.profile.handled_messages[0]) - assert msgno not in channel.msgnos - - def test_message_and_ans_nul(self): - """ - Verify that a message number is deallocated after a final "NUL" reply - has been received. - """ - channel = beep.Channel(self.session, 0, MockProfileHandler) - msgno = channel.send_msg(beep.Payload('foo bar', None)) - self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), - self.session.sent_messages[0]) - assert msgno in channel.msgnos - channel.handle_data_frame('ANS', msgno, False, 0, 0, '42') - self.assertEqual(('ANS', msgno, '42', 0), - channel.profile.handled_messages[0]) - assert msgno in channel.msgnos - channel.handle_data_frame('NUL', msgno, False, 2, None, '42') - self.assertEqual(('NUL', msgno, '', None), - channel.profile.handled_messages[1]) - assert msgno not in channel.msgnos - - def test_send_error(self): - """ - Verify that sending an ERR message is processed correctly. - """ - channel = beep.Channel(self.session, 0, MockProfileHandler) - channel.send_err(0, beep.Payload('oops', None)) - self.assertEqual(('ERR', 0, 0, False, 0L, None, 'oops'), - self.session.sent_messages[0]) - - def test_send_answers(self): - """ - Verify that sending an ANS message is processed correctly. - """ - channel = beep.Channel(self.session, 0, MockProfileHandler) - ansno = channel.send_ans(0, beep.Payload('foo bar', None)) - assert ansno == 0 - self.assertEqual(('ANS', 0, 0, False, 0, ansno, 'foo bar'), - self.session.sent_messages[0]) - assert 0 in channel.ansnos - ansno = channel.send_ans(0, beep.Payload('foo baz', None)) - assert ansno == 1 - self.assertEqual(('ANS', 0, 0, False, 9, ansno, 'foo baz'), - self.session.sent_messages[1]) - assert 0 in channel.ansnos - channel.send_nul(0) - self.assertEqual(('NUL', 0, 0, False, 18, None, ''), - self.session.sent_messages[2]) - assert 0 not in channel.ansnos - - -class ManagementProfileHandlerTestCase(unittest.TestCase): - - def setUp(self): - self.session = MockSession() - self.channel = self.session.channels[0] - self.profile = self.channel.profile - - def test_send_greeting(self): - """ - Verify that the management profile sends a greeting reply when - initialized. - """ - self.profile.handle_connect() - self.assertEqual(1, len(self.session.sent_messages)) - xml = xmlio.Element('greeting') - message = beep.Payload(xml).read() - self.assertEqual(('RPY', 0, 0, False, 0, None, message), - self.session.sent_messages[0]) - - def test_send_greeting_with_profile(self): - """ - Verify that the management profile sends a greeting with a list of - supported profiles reply when initialized. - """ - self.session.profiles[MockProfileHandler.URI] = MockProfileHandler - self.profile.handle_connect() - self.assertEqual(1, len(self.session.sent_messages)) - xml = xmlio.Element('greeting')[ - xmlio.Element('profile', uri=MockProfileHandler.URI) - ] - message = beep.Payload(xml).read() - self.assertEqual(('RPY', 0, 0, False, 0, None, message), - self.session.sent_messages[0]) - - def test_handle_greeting(self): - """ - Verify that the management profile calls the greeting_received() method - of the initiator session. - """ - def greeting_received(profiles): - greeting_received.called = True - self.assertEqual(['test'], profiles) - greeting_received.called = False - self.session.greeting_received = greeting_received - xml = xmlio.Element('greeting')[xmlio.Element('profile', uri='test')] - message = beep.Payload(xml).read() - self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) - assert greeting_received.called - - def test_handle_start(self): - self.session.profiles[MockProfileHandler.URI] = MockProfileHandler - xml = xmlio.Element('start', number=2)[ - xmlio.Element('profile', uri=MockProfileHandler.URI), - xmlio.Element('profile', uri='http://example.com/bogus') - ] - self.profile.handle_msg(0, beep.Payload(xml)) - - assert 2 in self.session.channels - xml = xmlio.Element('profile', uri=MockProfileHandler.URI) - message = beep.Payload(xml).read() - self.assertEqual(('RPY', 0, 0, False, 0, None, message), - self.session.sent_messages[0]) - - def test_handle_start_unsupported_profile(self): - self.session.profiles[MockProfileHandler.URI] = MockProfileHandler - xml = xmlio.Element('start', number=2)[ - xmlio.Element('profile', uri='http://example.com/foo'), - xmlio.Element('profile', uri='http://example.com/bar') - ] - self.assertRaises(beep.ProtocolError, self.profile.handle_msg, 0, - beep.Payload(xml)) - assert 2 not in self.session.channels - - def test_handle_start_channel_in_use(self): - self.session.channels[2] = beep.Channel(self.session, 2, - MockProfileHandler) - orig_profile = self.session.channels[2].profile - self.session.profiles[MockProfileHandler.URI] = MockProfileHandler - xml = xmlio.Element('start', number=2)[ - xmlio.Element('profile', uri=MockProfileHandler.URI) - ] - self.assertRaises(beep.ProtocolError, self.profile.handle_msg, 0, - beep.Payload(xml)) - assert self.session.channels[2].profile is orig_profile - - def test_handle_close(self): - self.session.channels[1] = beep.Channel(self.session, 1, - MockProfileHandler) - xml = xmlio.Element('close', number=1, code=200) - self.profile.handle_msg(0, beep.Payload(xml)) - - assert 1 not in self.session.channels - xml = xmlio.Element('ok') - message = beep.Payload(xml).read() - self.assertEqual(('RPY', 0, 0, False, 0, None, message), - self.session.sent_messages[0]) - - def test_handle_close_session(self): - xml = xmlio.Element('close', number=0, code=200) - self.profile.handle_msg(0, beep.Payload(xml)) - - assert 1 not in self.session.channels - xml = xmlio.Element('ok') - message = beep.Payload(xml).read() - self.assertEqual(('RPY', 0, 0, False, 0, None, message), - self.session.sent_messages[0]) - assert self.session.closed - - def test_handle_close_channel_not_open(self): - xml = xmlio.Element('close', number=1, code=200) - self.assertRaises(beep.ProtocolError, self.profile.handle_msg, 0, - beep.Payload(xml)) - - def test_handle_close_channel_busy(self): - self.session.channels[1] = beep.Channel(self.session, 1, - MockProfileHandler) - self.session.channels[1].send_msg(beep.Payload('test')) - assert self.session.channels[1].msgnos - - xml = xmlio.Element('close', number=1, code=200) - self.assertRaises(beep.ProtocolError, self.profile.handle_msg, 0, - beep.Payload(xml)) - - def test_handle_close_session_busy(self): - self.session.channels[1] = beep.Channel(self.session, 1, - MockProfileHandler) - - xml = xmlio.Element('close', number=0, code=200) - self.assertRaises(beep.ProtocolError, self.profile.handle_msg, 0, - beep.Payload(xml)) - - def test_send_start(self): - """ - Verify that a request is sent correctly. - """ - self.profile.send_start([MockProfileHandler]) - xml = xmlio.Element('start', number="1")[ - xmlio.Element('profile', uri=MockProfileHandler.URI) - ] - message = beep.Payload(xml).read() - self.assertEqual(('MSG', 0, 0, False, 0, None, message), - self.session.sent_messages[0]) - - def test_send_start_ok(self): - """ - Verify that a positive reply to a request is handled correctly, - and the channel is created. - """ - self.profile.send_start([MockProfileHandler]) - xml = xmlio.Element('profile', uri=MockProfileHandler.URI) - message = beep.Payload(xml).read() - self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) - assert isinstance(self.session.channels[1].profile, MockProfileHandler) - - def test_send_start_error(self): - """ - Verify that a negative reply to a request is handled correctly, - and no channel gets created. - """ - self.profile.send_start([MockProfileHandler]) - xml = xmlio.Element('error', code=500)['ouch'] - message = beep.Payload(xml).read() - self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) - assert 1 not in self.session.channels - - def test_send_start_ok_with_callback(self): - """ - Verify that user-supplied callback for positive replies is invoked - when a reply is received in response to a request. - """ - def handle_ok(channelno, profile_uri): - self.assertEqual(1, channelno) - self.assertEqual(MockProfileHandler.URI, profile_uri) - handle_ok.called = True - handle_ok.called = False - def handle_error(code, text): - handle_error.called = True - handle_error.called = False - self.profile.send_start([MockProfileHandler], handle_ok=handle_ok, - handle_error=handle_error) - - xml = xmlio.Element('profile', uri=MockProfileHandler.URI) - message = beep.Payload(xml).read() - self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) - assert isinstance(self.session.channels[1].profile, MockProfileHandler) - assert handle_ok.called - assert not handle_error.called - - def test_send_start_error_with_callback(self): - """ - Verify that user-supplied callback for negative replies is invoked - when an error is received in response to a request. - """ - def handle_ok(channelno, profile_uri): - handle_ok.called = True - handle_ok.called = False - def handle_error(code, text): - self.assertEqual(500, code) - self.assertEqual('ouch', text) - handle_error.called = True - handle_error.called = False - self.profile.send_start([MockProfileHandler], handle_ok=handle_ok, - handle_error=handle_error) - - xml = xmlio.Element('error', code=500)['ouch'] - message = beep.Payload(xml).read() - self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) - assert 1 not in self.session.channels - assert not handle_ok.called - assert handle_error.called - - def test_send_close(self): - """ - Verify that a request is sent correctly. - """ - self.profile.send_close(1, code=200) - xml = xmlio.Element('close', number=1, code=200) - message = beep.Payload(xml).read() - self.assertEqual(('MSG', 0, 0, False, 0, None, message), - self.session.sent_messages[0]) - - def test_send_close_ok(self): - """ - Verify that a positive reply to a request is handled correctly, - and the channel is closed. - """ - self.session.channels[1] = beep.Channel(self.session, 1, - MockProfileHandler) - self.profile.send_close(1, code=200) - - xml = xmlio.Element('ok') - message = beep.Payload(xml).read() - self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) - assert 1 not in self.session.channels - - def test_send_close_session_ok(self): - """ - Verify that a positive reply to a request is handled correctly, - and the channel is closed. - """ - self.profile.send_close(0, code=200) - - xml = xmlio.Element('ok') - message = beep.Payload(xml).read() - self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) - assert 0 not in self.session.channels - assert self.session.closed - - def test_send_close_error(self): - """ - Verify that a negative reply to a request is handled correctly, - and the channel stays open. - """ - self.session.channels[1] = beep.Channel(self.session, 1, - MockProfileHandler) - self.profile.send_close(1, code=200) - - xml = xmlio.Element('error', code=500)['ouch'] - message = beep.Payload(xml).read() - self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) - assert 1 in self.session.channels - - def test_send_close_ok_with_callback(self): - """ - Verify that user-supplied callback for positive replies is invoked - when an reply is received in response to a request. - """ - self.session.channels[1] = beep.Channel(self.session, 1, - MockProfileHandler) - def handle_ok(): - handle_ok.called = True - handle_ok.called = False - def handle_error(code, text): - handle_error.called = True - handle_error.called = False - self.profile.send_close(1, code=200, handle_ok=handle_ok, - handle_error=handle_error) - - xml = xmlio.Element('profile', uri=MockProfileHandler.URI) - message = beep.Payload(xml).read() - self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) - assert 1 not in self.session.channels - assert handle_ok.called - assert not handle_error.called - - def test_send_close_error_with_callback(self): - """ - Verify that user-supplied callback for negative replies is invoked - when an error is received in response to a request. - """ - self.session.channels[1] = beep.Channel(self.session, 1, - MockProfileHandler) - def handle_ok(channelno, profile_uri): - handle_ok.called = True - handle_ok.called = False - def handle_error(code, text): - self.assertEqual(500, code) - self.assertEqual('ouch', text) - handle_error.called = True - handle_error.called = False - self.profile.send_close(1, code=200, handle_ok=handle_ok, - handle_error=handle_error) - - xml = xmlio.Element('error', code=500)['ouch'] - message = beep.Payload(xml).read() - self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) - assert 1 in self.session.channels - assert not handle_ok.called - assert handle_error.called - - -def suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(SessionTestCase, 'test')) - suite.addTest(unittest.makeSuite(ChannelTestCase, 'test')) - suite.addTest(unittest.makeSuite(ManagementProfileHandlerTestCase, 'test')) - return suite - -if __name__ == '__main__': - unittest.main(defaultTest='suite') diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -37,6 +37,7 @@ ], 'trac.plugins': [ 'bitten.main = bitten.trac_ext.main', + 'bitten.master = bitten.master', 'bitten.web_ui = bitten.trac_ext.web_ui', 'bitten.summarizers = bitten.trac_ext.summarizers', 'bitten.charts = bitten.trac_ext.charts' @@ -56,6 +57,9 @@ NS + 'python#pylint = bitten.build.pythontools:pylint', NS + 'python#trace = bitten.build.pythontools:trace', NS + 'python#unittest = bitten.build.pythontools:unittest', + NS + 'svn#checkout = bitten.build.svntools:checkout', + NS + 'svn#export = bitten.build.svntools:export', + NS + 'svn#update = bitten.build.svntools:update', NS + 'xml#transform = bitten.build.xmltools:transform' ] },