view bitten/master.py @ 239:bc7b77236011

Add MD5-based integrity checks for the snapshot archives maintained by the build master. If an archive is corrupted (for example by interruption of the archive creation), the build master will detect this because the MD5 checksum file is either missing, or does not match. Closes #56. Thanks to Chandler Carruth for the suggestion!
author cmlenz
date Sun, 02 Oct 2005 13:02:03 +0000
parents a8c9dd7e3f71
children 372d1de2e3ec
line wrap: on
line source
# -*- coding: iso8859-1 -*-
#
# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://bitten.cmlenz.net/wiki/License.

from datetime import datetime, timedelta
import logging
import os
try:
    set
except NameError:
    from sets import Set as set
import sys
import time

from trac.env import Environment
from bitten.model import BuildConfig, Build, BuildStep, BuildLog, Report
from bitten.queue import BuildQueue
from bitten.trac_ext.main import BuildSystem
from bitten.util import archive, beep, xmlio

log = logging.getLogger('bitten.master')

DEFAULT_CHECK_INTERVAL = 120 # 2 minutes


class Master(beep.Listener):

    def __init__(self, envs, ip, port, adjust_timestamps=False,
                 check_interval=DEFAULT_CHECK_INTERVAL):
        beep.Listener.__init__(self, ip, port)
        self.profiles[OrchestrationProfileHandler.URI] = \
                OrchestrationProfileHandler
        self.adjust_timestamps = adjust_timestamps
        self.check_interval = check_interval
        self.handlers = {} # Map of connected slaves keyed by name

        self.queues = []
        for env in envs:
            self.queues.append(BuildQueue(env))

        self.schedule(self.check_interval, self._enqueue_builds)

    def close(self):
        for queue in self.queues:
            queue.reset_orphaned_builds()
        beep.Listener.close(self)

    def _cleanup(self, when):
        for queue in self.queues:
            queue.remove_unused_snapshots()

    def _enqueue_builds(self, when):
        self.schedule(self.check_interval, self._enqueue_builds)

        for queue in self.queues:
            queue.populate()

        self.schedule(self.check_interval * 0.2, self._initiate_builds)
        self.schedule(self.check_interval * 1.8, self._cleanup)

    def _initiate_builds(self, when):
        available_slaves = set([name for name in self.handlers
                                if not self.handlers[name].building])

        for idx, queue in enumerate(self.queues[:]):
            build, slave = queue.get_next_pending_build(available_slaves)
            if build:
                self.handlers[slave].send_initiation(queue, build)
                available_slaves.discard(slave)
                self.queues.append(self.queues.pop(idx)) # Round robin

    def register(self, handler):
        any_match = False
        for queue in self.queues:
            if queue.register_slave(handler.name, handler.info):
                any_match = True

        if not any_match:
            log.warning('Slave %s does not match any of the configured target '
                        'platforms', handler.name)
            return False

        self.handlers[handler.name] = handler
        self.schedule(self.check_interval * 0.2, self._initiate_builds)

        log.info('Registered slave "%s"', handler.name)
        return True

    def unregister(self, handler):
        if handler.name not in self.handlers:
            return

        for queue in self.queues:
            queue.unregister_slave(handler.name)
        del self.handlers[handler.name]

        log.info('Unregistered slave "%s"', handler.name)


class OrchestrationProfileHandler(beep.ProfileHandler):
    """Handler for communication on the Bitten build orchestration profile from
    the perspective of the build master.
    """
    URI = 'http://bitten.cmlenz.net/beep/orchestration'

    def handle_connect(self):
        self.master = self.session.listener
        assert self.master
        self.name = None
        self.building = False
        self.info = {}

    def handle_disconnect(self):
        self.master.unregister(self)

    def handle_msg(self, msgno, payload):
        assert payload.content_type == beep.BEEP_XML
        elem = xmlio.parse(payload.body)

        if elem.name == 'register':
            self.name = elem.attr['name']
            self.info[Build.IP_ADDRESS] = self.session.addr[0]
            for child in elem.children():
                if child.name == 'platform':
                    self.info[Build.MACHINE] = child.gettext()
                    self.info[Build.PROCESSOR] = child.attr.get('processor')
                elif child.name == 'os':
                    self.info[Build.OS_NAME] = child.gettext()
                    self.info[Build.OS_FAMILY] = child.attr.get('family')
                    self.info[Build.OS_VERSION] = child.attr.get('version')
                elif child.name == 'package':
                    for name, value in child.attr.items():
                        if name == 'name':
                            continue
                        self.info[child.attr['name'] + '.' + name] = value

            if not self.master.register(self):
                xml = xmlio.Element('error', code=550)[
                    'Nothing for you to build here, please move along'
                ]
                self.channel.send_err(msgno, beep.Payload(xml))
                return

            xml = xmlio.Element('ok')
            self.channel.send_rpy(msgno, beep.Payload(xml))

    def send_initiation(self, queue, build):
        log.info('Initiating build of "%s" on slave %s', build.config,
                 self.name)
        self.building = True

        def handle_reply(cmd, msgno, ansno, payload):
            if cmd == 'ERR':
                if payload.content_type == beep.BEEP_XML:
                    elem = xmlio.parse(payload.body)
                    if elem.name == 'error':
                        log.warning('Slave %s refused build request: %s (%d)',
                                    self.name, elem.gettext(),
                                    int(elem.attr['code']))
                self.building = False
                return

            elem = xmlio.parse(payload.body)
            assert elem.name == 'proceed'
            type = encoding = None
            for child in elem.children('accept'):
                type, encoding = child.attr['type'], child.attr.get('encoding')
                if (type, encoding) in (('application/tar', 'gzip'),
                                        ('application/tar', 'bzip2'),
                                        ('application/tar', None),
                                        ('application/zip', None)):
                    break
                type = None
            if not type:
                xml = xmlio.Element('error', code=550)[
                    'None of the accepted archive formats supported'
                ]
                self.channel.send_err(beep.Payload(xml))
                self.building = False
                return
            self.send_snapshot(queue, build, type, encoding)

        config = BuildConfig.fetch(queue.env, build.config)
        self.channel.send_msg(beep.Payload(config.recipe),
                              handle_reply=handle_reply)

    def send_snapshot(self, queue, build, type, encoding):
        timestamp_delta = 0
        if self.master.adjust_timestamps:
            d = datetime.now() - timedelta(seconds=self.master.check_interval) \
                - datetime.fromtimestamp(build.rev_time)
            log.info('Warping timestamps by %s' % d)
            timestamp_delta = d.days * 86400 + d.seconds

        def handle_reply(cmd, msgno, ansno, payload):
            if cmd == 'ERR':
                assert payload.content_type == beep.BEEP_XML
                elem = xmlio.parse(payload.body)
                if elem.name == 'error':
                    log.warning('Slave %s refused to start build: %s (%d)',
                                self.name, elem.gettext(),
                                int(elem.attr['code']))
                self.building = False

            elif cmd == 'ANS':
                assert payload.content_type == beep.BEEP_XML
                elem = xmlio.parse(payload.body)
                if elem.name == 'started':
                    self._build_started(queue, build, elem, timestamp_delta)
                elif elem.name == 'step':
                    self._build_step_completed(queue, build, elem,
                                               timestamp_delta)
                elif elem.name == 'completed':
                    self._build_completed(queue, build, elem, timestamp_delta)
                elif elem.name == 'aborted':
                    self._build_aborted(queue, build)
                elif elem.name == 'error':
                    build.status = Build.FAILURE

            elif cmd == 'NUL':
                self.building = False

        snapshot_format = {
            ('application/tar', 'bzip2'): 'bzip2',
            ('application/tar', 'gzip'): 'gzip',
            ('application/tar', None): 'tar',
            ('application/zip', None): 'zip',
        }[(type, encoding)]
        snapshot_path = queue.get_snapshot(build, snapshot_format, create=True)
        snapshot_name = os.path.basename(snapshot_path)
        message = beep.Payload(file(snapshot_path, 'rb'), content_type=type,
                               content_disposition=snapshot_name,
                               content_encoding=encoding)
        self.channel.send_msg(message, handle_reply=handle_reply)

    def _build_started(self, queue, build, elem, timestamp_delta=None):
        build.slave = self.name
        build.slave_info.update(self.info)
        build.started = int(_parse_iso_datetime(elem.attr['time']))
        if timestamp_delta:
            build.started -= timestamp_delta
        build.status = Build.IN_PROGRESS
        log.info('Slave %s started build %d ("%s" as of [%s])',
                 self.name, build.id, build.config, build.rev)
        build.update()

    def _build_step_completed(self, queue, build, elem, timestamp_delta=None):
        log.debug('Slave %s completed step "%s" with status %s', self.name,
                  elem.attr['id'], elem.attr['result'])

        db = queue.env.get_db_cnx()

        step = BuildStep(queue.env, build=build.id, name=elem.attr['id'],
                         description=elem.attr.get('description'))
        step.started = int(_parse_iso_datetime(elem.attr['time']))
        step.stopped = step.started + int(elem.attr['duration'])
        if timestamp_delta:
            step.started -= timestamp_delta
            step.stopped -= timestamp_delta
        if elem.attr['result'] == 'failure':
            log.warning('Step failed')
            step.status = BuildStep.FAILURE
        else:
            step.status = BuildStep.SUCCESS
        step.insert(db=db)

        for idx, log_elem in enumerate(elem.children('log')):
            build_log = BuildLog(queue.env, build=build.id, step=step.name,
                                 generator=log_elem.attr.get('generator'),
                                 orderno=idx)
            for message_elem in log_elem.children('message'):
                build_log.messages.append((message_elem.attr['level'],
                                           message_elem.gettext()))
            build_log.insert(db=db)

        for report_elem in elem.children('report'):
            report = Report(queue.env, build=build.id, step=step.name,
                            category=report_elem.attr.get('category'),
                            generator=report_elem.attr.get('generator'))
            for item_elem in report_elem.children():
                item = {'type': item_elem.name}
                item.update(item_elem.attr)
                for child_elem in item_elem.children():
                    item[child_elem.name] = child_elem.gettext()
                report.items.append(item)
            report.insert(db=db)

        db.commit()

    def _build_completed(self, queue, build, elem, timestamp_delta=None):
        log.info('Slave %s completed build %d ("%s" as of [%s]) with status %s',
                 self.name, build.id, build.config, build.rev,
                 elem.attr['result'])

        build.stopped = int(_parse_iso_datetime(elem.attr['time']))
        if timestamp_delta:
            build.stopped -= timestamp_delta
        if elem.attr['result'] == 'failure':
            build.status = Build.FAILURE
        else:
            build.status = Build.SUCCESS
        build.update()

    def _build_aborted(self, queue, build):
        log.info('Slave %s aborted build %d ("%s" as of [%s])',
                 self.name, build.id, build.config, build.rev)

        db = queue.env.get_db_cnx()

        for step in list(BuildStep.select(queue.env, build=build.id, db=db)):
            step.delete(db=db)

        build.slave = None
        build.started = 0
        build.status = Build.PENDING
        build.slave_info = {}
        build.update(db=db)

        db.commit()


def _parse_iso_datetime(string):
    """Minimal parser for ISO date-time strings.
    
    Return the time as floating point number. Only handles UTC timestamps
    without time zone information."""
    try:
        string = string.split('.', 1)[0] # strip out microseconds
        secs = time.mktime(time.strptime(string, '%Y-%m-%dT%H:%M:%S'))
        tzoffset = time.timezone
        if time.daylight:
            tzoffset = time.altzone
        return secs - tzoffset
    except ValueError, e:
        raise ValueError, 'Invalid ISO date/time %s (%s)' % (string, e)

def main():
    from bitten import __version__ as VERSION
    from optparse import OptionParser

    # Parse command-line arguments
    parser = OptionParser(usage='usage: %prog [options] ENV_PATHS',
                          version='%%prog %s' % VERSION)
    parser.add_option('-p', '--port', action='store', type='int', dest='port',
                      help='port number to use')
    parser.add_option('-H', '--host', action='store', dest='host',
                      metavar='HOSTNAME',
                      help='the host name or IP address to bind to')
    parser.add_option('-l', '--log', dest='logfile', metavar='FILENAME',
                      help='write log messages to FILENAME')
    parser.add_option('-i', '--interval', dest='interval', metavar='SECONDS',
                      default=DEFAULT_CHECK_INTERVAL, type='int',
                      help='poll interval for changeset detection')
    parser.add_option('--timewarp', action='store_const', dest='timewarp',
                      const=True,
                      help='adjust timestamps of builds to be neat the '
                           'timestamps of the corresponding changesets')
    parser.add_option('--debug', action='store_const', dest='loglevel',
                      const=logging.DEBUG, help='enable debugging output')
    parser.add_option('-v', '--verbose', action='store_const', dest='loglevel',
                      const=logging.INFO, help='print as much as possible')
    parser.add_option('-q', '--quiet', action='store_const', dest='loglevel',
                      const=logging.ERROR, help='print as little as possible')
    parser.set_defaults(port=7633, loglevel=logging.WARNING)
    options, args = parser.parse_args()

    if len(args) < 1:
        parser.error('incorrect number of arguments')

    # Configure logging
    logger = logging.getLogger('bitten')
    logger.setLevel(options.loglevel)
    handler = logging.StreamHandler()
    if options.logfile:
        handler.setLevel(logging.WARNING)
    else:
        handler.setLevel(options.loglevel)
    formatter = logging.Formatter('%(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    if options.logfile:
        handler = logging.FileHandler(options.logfile)
        handler.setLevel(options.loglevel)
        formatter = logging.Formatter('%(asctime)s [%(name)s] %(levelname)s: '
                                      '%(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    port = options.port
    if not (1 <= port <= 65535):
        parser.error('port must be an integer in the range 1-65535')

    host = options.host
    if not host:
        import socket
        ip = socket.gethostbyname(socket.gethostname())
        try:
            host = socket.gethostbyaddr(ip)[0]
        except socket.error, e:
            log.warning('Reverse host name lookup failed (%s)', e)
            host = ip

    envs = []
    for arg in args:
        if not os.path.isdir(arg):
            log.warning('Ignoring %s: not a directory', arg)
            continue
        env = Environment(arg)
        if BuildSystem(env):
            if env.needs_upgrade():
                log.warning('Environment at %s needs to be upgraded', env.path)
                continue
            envs.append(env)
    if not envs:
        log.error('None of the specified environments has support for Bitten')
        sys.exit(2)

    master = Master(envs, host, port, adjust_timestamps=options.timewarp,
                    check_interval=options.interval)
    try:
        master.run(timeout=5.0)
    except KeyboardInterrupt:
        master.quit()

if __name__ == '__main__':
    main()
Copyright (C) 2012-2017 Edgewall Software