Mercurial > bitten > bitten-test
diff bitten/master.py @ 392:026d9aa41b85
Merged HTTP branch into trunk.
author | cmlenz |
---|---|
date | Fri, 03 Aug 2007 08:48:47 +0000 |
parents | 0df178e07fdb |
children | a10942252ebc |
line wrap: on
line diff
--- a/bitten/master.py +++ b/bitten/master.py @@ -7,17 +7,13 @@ # you should have received as part of this distribution. The terms # are also available at http://bitten.cmlenz.net/wiki/License. -"""Build master implementation. - -This module is runnable as a script to launch the build master. The build -master starts a single process that handles connections to any number of build -slaves. -""" +"""Build master implementation.""" import calendar from datetime import datetime, timedelta import logging import os +import re try: set except NameError: @@ -25,295 +21,177 @@ import sys import time +from trac.config import BoolOption, IntOption +from trac.core import * from trac.env import Environment +from trac.web import IRequestHandler, HTTPBadRequest, HTTPConflict, \ + HTTPForbidden, HTTPMethodNotAllowed, HTTPNotFound, \ + RequestDone + from bitten.model import BuildConfig, Build, BuildStep, BuildLog, Report from bitten.queue import BuildQueue +from bitten.recipe import Recipe from bitten.trac_ext.main import BuildSystem -from bitten.util import beep, xmlio - -log = logging.getLogger('bitten.master') - -DEFAULT_CHECK_INTERVAL = 120 # 2 minutes - - -class Master(beep.Listener): - """BEEP listener implementation for the build master.""" - - def __init__(self, envs, ip, port, build_all=False, - adjust_timestamps=False, - check_interval=DEFAULT_CHECK_INTERVAL): - beep.Listener.__init__(self, ip, port) - self.profiles[OrchestrationProfileHandler.URI] = \ - OrchestrationProfileHandler - self.adjust_timestamps = adjust_timestamps - self.check_interval = check_interval - self.handlers = {} # Map of connected slaves keyed by name - - self.queues = [] - for env in envs: - self.queues.append(BuildQueue(env, build_all=build_all)) - - self.schedule(self.check_interval, self._enqueue_builds) - - def close(self): - for queue in self.queues: - queue.reset_orphaned_builds() - beep.Listener.close(self) - - def _enqueue_builds(self): - self.schedule(self.check_interval, self._enqueue_builds) - - for queue in self.queues: - queue.populate() - - self.schedule(self.check_interval * 0.2, self._initiate_builds) - - def _initiate_builds(self): - available_slaves = set([name for name in self.handlers - if not self.handlers[name].building]) - for idx, queue in enumerate(self.queues[:]): - build, slave = queue.get_next_pending_build(available_slaves) - if build: - self.handlers[slave].send_initiation(queue, build) - available_slaves.discard(slave) - self.queues.append(self.queues.pop(idx)) # Round robin - - def register(self, handler): - if handler.name in self.handlers: - # The slave is for some reason still registered... this shouldn't - # happen in theory, but apparently it does in the real world (see - # #106). We simply unregister it before trying to register it - # again. - self.unregister(handler) - - any_match = False - for queue in self.queues: - if queue.register_slave(handler.name, handler.info): - any_match = True - - if not any_match: - log.warning('Slave %s does not match any of the configured target ' - 'platforms', handler.name) - return False - - self.handlers[handler.name] = handler - self.schedule(self.check_interval * 0.2, self._initiate_builds) - - log.info('Registered slave "%s"', handler.name) - return True - - def unregister(self, handler): - if handler.name not in self.handlers: - return - - for queue in self.queues: - if queue.unregister_slave(handler.name): - for build in list(Build.select(queue.env, slave=handler.name, - status=Build.IN_PROGRESS)): - handler._build_aborted(queue, build) - - del self.handlers[handler.name] - - log.info('Unregistered slave "%s"', handler.name) +from bitten.util import xmlio -class OrchestrationProfileHandler(beep.ProfileHandler): - """Handler for communication on the Bitten build orchestration profile from - the perspective of the build master. - - An instance of this class is associated with exactly one remote build slave. - """ - URI = 'http://bitten.cmlenz.net/beep/orchestration' - - def handle_connect(self): - self.master = self.session.listener - assert self.master - self.name = None - self.building = False - self.info = {} - - def handle_disconnect(self): - self.master.unregister(self) - - def handle_msg(self, msgno, payload): - assert payload.content_type == beep.BEEP_XML - elem = xmlio.parse(payload.body) - - if elem.name == 'register': - self.name = elem.attr['name'] - self.info[Build.IP_ADDRESS] = self.session.addr[0] - for child in elem.children(): - if child.name == 'platform': - self.info[Build.MACHINE] = child.gettext() - self.info[Build.PROCESSOR] = child.attr.get('processor') - elif child.name == 'os': - self.info[Build.OS_NAME] = child.gettext() - self.info[Build.OS_FAMILY] = child.attr.get('family') - self.info[Build.OS_VERSION] = child.attr.get('version') - elif child.name == 'package': - for name, value in child.attr.items(): - if name == 'name': - continue - self.info[child.attr['name'] + '.' + name] = value - - if not self.master.register(self): - raise beep.ProtocolError(550, 'Nothing for you to build here, ' - 'please move along') - - xml = xmlio.Element('ok') - self.channel.send_rpy(msgno, beep.Payload(xml)) +class BuildMaster(Component): + """BEEP listener implementation for the build master.""" - def send_initiation(self, queue, build): - log.info('Initiating build of %d ("%s" as of [%s]) on slave %s', - build.id, build.config, build.rev, self.name) - - build.slave = self.name - build.slave_info.update(self.info) - build.status = Build.IN_PROGRESS - build.update() - self.building = True - - config = BuildConfig.fetch(queue.env, build.config) - - def handle_reply(cmd, msgno, ansno, payload): - if cmd == 'ERR': - if payload.content_type == beep.BEEP_XML: - elem = xmlio.parse(payload.body) - if elem.name == 'error': - log.warning('Slave %s refused build request: %s (%d)', - self.name, elem.gettext(), - int(elem.attr['code'])) - self.building = False - self._build_aborted(queue, build) - return + implements(IRequestHandler) - elem = xmlio.parse(payload.body) - if elem.name != 'proceed': - raise beep.ProtocolError(500) + # Configuration options - snapshots = queue.snapshots[config.name] - snapshot = snapshots.get(build.rev) - if not snapshot: - # Request a snapshot for this build, and schedule a poll - # function that kicks off the snapshot transmission once the - # archive has been completely built - worker = snapshots.create(build.rev) - def _check_snapshot(): - worker.join(.5) - if worker.isAlive(): - self.master.schedule(2, _check_snapshot) - else: - if self.name not in self.master.handlers: - # The slave disconnected while we were building - # the archive - return - snapshot = snapshots.get(build.rev) - if snapshot is None: - log.error('Failed to create snapshot archive for ' - '%s@%s', config.path, build.rev) - return - self.send_snapshot(queue, build, snapshot) - _check_snapshot() - else: - self.send_snapshot(queue, build, snapshot) + adjust_timestamps = BoolOption('bitten', 'adjust_timestamps', False, doc= + """Whether the timestamps of builds should be adjusted to be close ' + to the timestamps of the corresponding changesets.""") + + build_all = BoolOption('bitten', 'build_all', False, doc= + """Whether to request builds of older revisions even if a younger + revision has already been built.""") + + slave_timeout = IntOption('bitten', 'slave_timeout', 3600, doc= + """The time in seconds after which a build is cancelled if the slave + does not report progress.""") + + # IRequestHandler methods + + def match_request(self, req): + match = re.match(r'/builds(?:/(\d+)(?:/(\w+)/([^/]+))?)?$', + req.path_info) + if match: + if match.group(1): + req.args['id'] = match.group(1) + req.args['collection'] = match.group(2) + req.args['member'] = match.group(3) + return True + + def process_request(self, req): + req.perm.assert_permission('BUILD_EXEC') + + if 'id' not in req.args: + if req.method != 'POST': + raise HTTPMethodNotAllowed('Method not allowed') + return self._process_build_creation(req) + + build = Build.fetch(self.env, req.args['id']) + if not build: + raise HTTPNotFound('No such build') + config = BuildConfig.fetch(self.env, build.config) + + if not req.args['collection']: + return self._process_build_initiation(req, config, build) + + if req.method != 'PUT': + raise HTTPMethodNotAllowed('Method not allowed') + + if req.args['collection'] == 'steps': + return self._process_build_step(req, config, build, + req.args['member']) + else: + raise HTTPNotFound('No such collection') + + def _process_build_creation(self, req): + queue = BuildQueue(self.env, build_all=self.build_all) + queue.populate() + + try: + elem = xmlio.parse(req.read()) + except xmlio.ParseError, e: + raise HTTPBadRequest('XML parser error') + + name = elem.attr['name'] + properties = {Build.IP_ADDRESS: req.remote_addr} + self.log.info('Build slave %r connected from %s', name, req.remote_addr) + + for child in elem.children(): + if child.name == 'platform': + properties[Build.MACHINE] = child.gettext() + properties[Build.PROCESSOR] = child.attr.get('processor') + elif child.name == 'os': + properties[Build.OS_NAME] = child.gettext() + properties[Build.OS_FAMILY] = child.attr.get('family') + properties[Build.OS_VERSION] = child.attr.get('version') + elif child.name == 'package': + for name, value in child.attr.items(): + if name == 'name': + continue + properties[child.attr['name'] + '.' + name] = value + + build = queue.get_build_for_slave(name, properties) + if not build: + req.send_response(204) + req.send_header('Content-Type', 'text/plain') + req.write('No pending builds') + raise RequestDone + + req.send_response(201) + req.send_header('Content-Type', 'text/plain') + req.send_header('Location', req.abs_href.builds(build.id)) + req.write('Build pending') + raise RequestDone + + def _process_build_initiation(self, req, config, build): + build.started = int(time.time()) + build.update() xml = xmlio.parse(config.recipe) - xml.attr['project'] = os.path.basename(queue.env.path) - self.channel.send_msg(beep.Payload(xml), - handle_reply=handle_reply, force_flush=True) - - def send_snapshot(self, queue, build, snapshot): - timestamp_delta = 0 - if self.master.adjust_timestamps: - d = datetime.now() - timedelta(seconds=self.master.check_interval) \ - - datetime.fromtimestamp(build.rev_time) - log.info('Warping timestamps by %s', d) - timestamp_delta = d.days * 86400 + d.seconds - - def handle_reply(cmd, msgno, ansno, payload): - if cmd == 'ERR': - if payload.content_type != beep.BEEP_XML: - raise beep.ProtocolError(500) - elem = xmlio.parse(payload.body) - if elem.name == 'error': - log.warning('Slave %s refused to start build: %s (%d)', - self.name, elem.gettext(), - int(elem.attr['code'])) - self.building = False - self._build_aborted(queue, build) - - elif cmd == 'ANS': - if payload.content_type != beep.BEEP_XML: - raise beep.ProtocolError(500) - elem = xmlio.parse(payload.body) + xml.attr['path'] = config.path + xml.attr['revision'] = build.rev + body = str(xml) - # verify that the build hasn't been modified out from - # under us-- we don't want to accept any build information - # from builds that have been invalidated or claimed by - # other slaves. - current_build = Build.fetch(queue.env, build.id) - if current_build.status != Build.IN_PROGRESS or \ - current_build.slave != self.name: - raise beep.ProtocolError(550, 'Build %s has been ' - 'invalidated, will not accept completed steps from ' - 'slave %s' % (build.id, self.name)) - - if elem.name == 'started': - self._build_started(queue, build, elem, timestamp_delta) - elif elem.name == 'step': - self._build_step_completed(queue, build, elem, - timestamp_delta) - elif elem.name == 'completed': - self._build_completed(queue, build, elem, timestamp_delta) - elif elem.name == 'aborted': - self._build_aborted(queue, build) - elif elem.name == 'error': - build.status = Build.FAILURE - - elif cmd == 'NUL': - self.building = False + req.send_response(200) + req.send_header('Content-Type', 'application/x-bitten+xml') + req.send_header('Content-Length', str(len(body))) + req.send_header('Content-Disposition', + 'attachment; filename=recipe_%s_r%s.xml' % + (config.name, build.rev)) + req.write(body) + raise RequestDone - snapshot_name = os.path.basename(snapshot) - message = beep.Payload(file(snapshot, 'rb'), - content_type='application/tar', - content_encoding='bzip2', - content_disposition=snapshot_name) - self.channel.send_msg(message, handle_reply=handle_reply) - - def _build_started(self, queue, build, elem, timestamp_delta=None): - build.started = int(_parse_iso_datetime(elem.attr['time'])) - if timestamp_delta: - build.started -= timestamp_delta - build.update() + def _process_build_step(self, req, config, build, stepname): + step = BuildStep.fetch(self.env, build=build.id, name=stepname) + if step: + raise HTTPConflict('Build step already exists') - log.info('Slave %s started build %d ("%s" as of [%s])', - self.name, build.id, build.config, build.rev) - for listener in BuildSystem(queue.env).listeners: - listener.build_started(build) - - def _build_step_completed(self, queue, build, elem, timestamp_delta=None): - log.debug('Slave %s completed step "%s" with status %s', self.name, - elem.attr['id'], elem.attr['result']) + recipe = Recipe(xmlio.parse(config.recipe)) + index = None + current_step = None + for num, recipe_step in enumerate(recipe): + if recipe_step.id == stepname: + index = num + current_step = recipe_step + if index is None: + raise HTTPForbidden('No such build step') + last_step = index == num - db = queue.env.get_db_cnx() + try: + elem = xmlio.parse(req.read()) + except xmlio.ParseError, e: + raise HTTPBadRequest('XML parser error') - step = BuildStep(queue.env, build=build.id, name=elem.attr['id'], - description=elem.attr.get('description')) - step.started = int(_parse_iso_datetime(elem.attr['time'])) - step.stopped = step.started + int(elem.attr['duration']) - if timestamp_delta: - step.started -= timestamp_delta - step.stopped -= timestamp_delta - if elem.attr['result'] == 'failure': - log.warning('Build %s Step %s failed', build.id, elem.attr['id']) + self.log.debug('Slave %s completed step %d (%s) with status %s', + build.slave, index, stepname, elem.attr['status']) + + db = self.env.get_db_cnx() + + step = BuildStep(self.env, build=build.id, name=stepname) + try: + step.started = int(_parse_iso_datetime(elem.attr['time'])) + step.stopped = step.started + float(elem.attr['duration']) + except ValueError, e: + raise HTTPBadRequest(e.args[0]) + if elem.attr['status'] == 'failure': + self.log.warning('Build %s step %s failed', build.id, stepname) step.status = BuildStep.FAILURE else: step.status = BuildStep.SUCCESS step.errors += [error.gettext() for error in elem.children('error')] step.insert(db=db) + # Collect log messages from the request body for idx, log_elem in enumerate(elem.children('log')): - build_log = BuildLog(queue.env, build=build.id, step=step.name, + build_log = BuildLog(self.env, build=build.id, step=step.name, generator=log_elem.attr.get('generator'), orderno=idx) for message_elem in log_elem.children('message'): @@ -321,8 +199,9 @@ message_elem.gettext())) build_log.insert(db=db) + # Collect report data from the request body for report_elem in elem.children('report'): - report = Report(queue.env, build=build.id, step=step.name, + report = Report(self.env, build=build.id, step=step.name, category=report_elem.attr.get('category'), generator=report_elem.attr.get('generator')) for item_elem in report_elem.children(): @@ -333,42 +212,40 @@ report.items.append(item) report.insert(db=db) + # If this was the last step in the recipe we mark the build as + # completed + if last_step or step.status == BuildStep.FAILURE and \ + current_step.onerror == 'fail': + self.log.info('Slave %s completed build %d ("%s" as of [%s])', + build.slave, build.id, build.config, build.rev) + build.stopped = step.stopped + + # Determine overall outcome of the build by checking the outcome + # of the individual steps against the "onerror" specification of + # each step in the recipe + for num, recipe_step in enumerate(recipe): + step = BuildStep.fetch(self.env, build.id, recipe_step.id) + if step.status == BuildStep.FAILURE: + if recipe_step.onerror != 'ignore': + build.status = Build.FAILURE + break + else: + build.status = Build.SUCCESS + + build.update(db=db) + db.commit() - def _build_completed(self, queue, build, elem, timestamp_delta=None): - build.stopped = int(_parse_iso_datetime(elem.attr['time'])) - if timestamp_delta: - build.stopped -= timestamp_delta - if elem.attr['result'] == 'failure': - build.status = Build.FAILURE - else: - build.status = Build.SUCCESS - build.update() - - log.info('Slave %s completed build %d ("%s" as of [%s]) with status %s', - self.name, build.id, build.config, build.rev, - build.status == Build.FAILURE and 'FAILURE' or 'SUCCESS') - for listener in BuildSystem(queue.env).listeners: - listener.build_completed(build) + if last_step: + for listener in BuildSystem(self.env).listeners: + listener.build_completed(build) - def _build_aborted(self, queue, build): - log.info('Slave %s aborted build %d ("%s" as of [%s])', - self.name, build.id, build.config, build.rev) - for listener in BuildSystem(queue.env).listeners: - listener.build_aborted(build) - - db = queue.env.get_db_cnx() - - for step in list(BuildStep.select(queue.env, build=build.id, db=db)): - step.delete(db=db) - - build.slave = None - build.slave_info = {} - build.started = 0 - build.status = Build.PENDING - build.update(db=db) - - db.commit() + body = 'Build step processed' + req.send_response(200) + req.send_header('Content-Type', 'text/plain') + req.send_header('Content-Length', str(len(body))) + req.write(body) + raise RequestDone def _parse_iso_datetime(string): @@ -380,106 +257,4 @@ string = string.split('.', 1)[0] # strip out microseconds return calendar.timegm(time.strptime(string, '%Y-%m-%dT%H:%M:%S')) except ValueError, e: - raise ValueError('Invalid ISO date/time %s (%s)' % (string, e)) - -def main(): - """Main entry point for running the build master.""" - from bitten import __version__ as VERSION - from optparse import OptionParser - - # Parse command-line arguments - parser = OptionParser(usage='usage: %prog [options] ENV_PATHS', - version='%%prog %s' % VERSION) - parser.add_option('-p', '--port', action='store', type='int', dest='port', - help='port number to use') - parser.add_option('-H', '--host', action='store', dest='host', - metavar='HOSTNAME', - help='the host name or IP address to bind to') - parser.add_option('-l', '--log', dest='logfile', metavar='FILENAME', - help='write log messages to FILENAME') - parser.add_option('-i', '--interval', dest='interval', metavar='SECONDS', - default=DEFAULT_CHECK_INTERVAL, type='int', - help='poll interval for changeset detection') - parser.add_option('--build-all', action='store_true', dest='buildall', - help='build older revisions even when a build for a ' - 'newer revision has already been performed') - parser.add_option('--timewarp', action='store_true', dest='timewarp', - help='adjust timestamps of builds to be near the ' - 'timestamps of the corresponding changesets') - parser.add_option('--debug', action='store_const', dest='loglevel', - const=logging.DEBUG, help='enable debugging output') - parser.add_option('-v', '--verbose', action='store_const', dest='loglevel', - const=logging.INFO, help='print as much as possible') - parser.add_option('-q', '--quiet', action='store_const', dest='loglevel', - const=logging.ERROR, help='print as little as possible') - parser.set_defaults(port=7633, loglevel=logging.WARNING) - options, args = parser.parse_args() - - if len(args) < 1: - parser.error('incorrect number of arguments') - - # Configure logging - logger = logging.getLogger('bitten') - logger.setLevel(options.loglevel) - handler = logging.StreamHandler() - if options.logfile: - handler.setLevel(logging.WARNING) - else: - handler.setLevel(options.loglevel) - formatter = logging.Formatter('%(message)s') - handler.setFormatter(formatter) - logger.addHandler(handler) - if options.logfile: - handler = logging.FileHandler(options.logfile) - handler.setLevel(options.loglevel) - formatter = logging.Formatter('%(asctime)s [%(name)s] %(levelname)s: ' - '%(message)s') - handler.setFormatter(formatter) - logger.addHandler(handler) - - port = options.port - if not (1 <= port <= 65535): - parser.error('port must be an integer in the range 1-65535') - - host = options.host - if not host: - import socket - ip = socket.gethostbyname(socket.gethostname()) - try: - host = socket.gethostbyaddr(ip)[0] - except socket.error, e: - log.warning('Reverse host name lookup failed (%s)', e) - host = ip - - envs = [] - env_names = set() - for env_path in [os.path.normpath(arg) for arg in args]: - if not os.path.isdir(env_path): - log.warning('Ignoring %s: not a directory', env_path) - continue - env_name = os.path.basename(env_path) - if env_name in env_names: - log.warning('Ignoring %s: duplicate project name "%s"', env_path, - env_name) - continue - env_names.add(env_name) - env = Environment(env_path) - if BuildSystem(env): - if env.needs_upgrade(): - log.warning('Environment at %s needs to be upgraded', env.path) - continue - envs.append(env) - if not envs: - log.error('None of the specified environments has support for Bitten') - sys.exit(2) - - master = Master(envs, host, port, build_all=options.buildall, - adjust_timestamps=options.timewarp, - check_interval=options.interval) - try: - master.run(timeout=5.0) - except KeyboardInterrupt: - master.quit() - -if __name__ == '__main__': - main() + raise ValueError('Invalid ISO date/time %r' % string)