view bitten/master.py @ 213:25f84dd9f159

* Refactoring of build recipes, the file format has changed slightly: * The namespace URIs of recipe command collections are now abstract, implementations are registered using setuptools entry points. * Commands for report generation are no longer nested in a `<reports>` sub-element, but are at the same level as normal commands. * Fixed linking to files from the test results and code coverage summarizers. * Windows file separators are normalized to a forward slash by recipe commands (thereby also fixing linking to the repository browser from report summaries). * Paths using backslashes as file separators are now recognized in build logs in the web interface, and linked to the repository browser. * The `generator` column in the build log and report tables now has the qualified name of the recipe command that generated the log messages or report data. * There's a database upgrade script to fix file separator normalization and generator values for existing reports and build logs.
author cmlenz
date Tue, 20 Sep 2005 22:16:41 +0000
parents e1a53e70b43f
children 014bc6c29dff
line wrap: on
line source
# -*- coding: iso8859-1 -*-
#
# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://bitten.cmlenz.net/wiki/License.

from datetime import datetime, timedelta
from itertools import ifilter
import logging
import os.path
import re
try:
    set
except NameError:
    from sets import Set as set
import time

from trac.env import Environment
from bitten.model import BuildConfig, TargetPlatform, Build, BuildStep, \
                         BuildLog, Report
from bitten.util import archive, beep, xmlio

log = logging.getLogger('bitten.master')

DEFAULT_CHECK_INTERVAL = 120 # 2 minutes


class Master(beep.Listener):

    def __init__(self, env_path, ip, port, adjust_timestamps=False,
                 check_interval=DEFAULT_CHECK_INTERVAL):
        beep.Listener.__init__(self, ip, port)
        self.profiles[OrchestrationProfileHandler.URI] = OrchestrationProfileHandler
        self.env = Environment(env_path)
        self.adjust_timestamps = adjust_timestamps
        self.check_interval = check_interval

        self.slaves = {}

        # path to generated snapshot archives, key is (config name, revision)
        self.snapshots = {}
        for config in BuildConfig.select(self.env):
            snapshots = archive.index(self.env, prefix=config.name)
            for rev, format, path in snapshots:
                self.snapshots[(config.name, rev, format)] = path

        self._cleanup_orphaned_builds()
        self.schedule(self.check_interval, self._check_build_triggers)

    def close(self):
        self._cleanup_orphaned_builds()
        beep.Listener.close(self)

    def _check_build_triggers(self, when):
        self.schedule(self.check_interval, self._check_build_triggers)

        repos = self.env.get_repository()
        try:
            repos.sync()

            db = self.env.get_db_cnx()
            for config in BuildConfig.select(self.env, db=db):
                log.debug('Checking for changes to "%s" at %s', config.label,
                          config.path)
                node = repos.get_node(config.path)
                for path, rev, chg in node.get_history():

                    # Don't follow moves/copies
                    if path != repos.normalize_path(config.path):
                        break

                    # Make sure the repository directory isn't empty at this
                    # revision
                    old_node = repos.get_node(path, rev)
                    is_empty = True
                    for entry in old_node.get_entries():
                        is_empty = False
                        break
                    if is_empty:
                        continue

                    enqueued = False
                    for platform in TargetPlatform.select(self.env,
                                                          config.name, db=db):
                        # Check whether the latest revision of the configuration
                        # has already been built on this platform
                        builds = Build.select(self.env, config.name, rev,
                                              platform.id, db=db)
                        if not list(builds):
                            log.info('Enqueuing build of configuration "%s" at '
                                     'revision [%s] on %s', config.name, rev,
                                     platform.name)
                            build = Build(self.env)
                            build.config = config.name
                            build.rev = str(rev)
                            build.rev_time = repos.get_changeset(rev).date
                            build.platform = platform.id
                            build.insert(db)
                            enqueued = True
                    if enqueued:
                        db.commit()
                        break
        finally:
            repos.close()

        self.schedule(self.check_interval * 0.2, self._check_build_queue)
        self.schedule(self.check_interval * 1.8, self._cleanup_snapshots)

    def _check_build_queue(self, when):
        if not self.slaves:
            return
        log.debug('Checking for pending builds...')
        for build in Build.select(self.env, status=Build.PENDING):
            config = BuildConfig.fetch(self.env, name=build.config)
            if not config.active:
                continue
            for slave in self.slaves.get(build.platform, []):
                active_builds = Build.select(self.env, slave=slave.name,
                                             status=Build.IN_PROGRESS)
                if not list(active_builds):
                    slave.send_initiation(build)
                    return

    def _cleanup_orphaned_builds(self):
        # Reset all in-progress builds
        db = self.env.get_db_cnx()
        for build in Build.select(self.env, status=Build.IN_PROGRESS, db=db):
            build.status = Build.PENDING
            build.slave = None
            build.slave_info = {}
            build.started = 0
            for step in list(BuildStep.select(self.env, build=build.id, db=db)):
                step.delete(db=db)
            build.update(db=db)
        db.commit()

    def _cleanup_snapshots(self, when):
        log.debug('Checking for unused snapshot archives...')
        for (config, rev, format), path in self.snapshots.items():
            keep = False
            for build in Build.select(self.env, config=config, rev=rev):
                if build.status not in (Build.SUCCESS, Build.FAILURE):
                    keep = True
                    break
            if not keep:
                log.info('Removing unused snapshot %s', path)
                os.unlink(path)
                del self.snapshots[(config, rev, format)]

    def get_snapshot(self, build, format):
        snapshot = self.snapshots.get((build.config, build.rev, format))
        if not snapshot:
            config = BuildConfig.fetch(self.env, build.config)
            snapshot = archive.pack(self.env, path=config.path, rev=build.rev,
                                    prefix=config.name, format=format)
            log.info('Prepared snapshot archive at %s' % snapshot)
            self.snapshots[(build.config, build.rev, format)] = snapshot
        return snapshot

    def register(self, handler):
        any_match = False
        for config in BuildConfig.select(self.env):
            for platform in TargetPlatform.select(self.env, config=config.name):
                if not platform.id in self.slaves:
                    self.slaves[platform.id] = set()
                match = True
                for propname, pattern in ifilter(None, platform.rules):
                    try:
                        propvalue = handler.info.get(propname)
                        if not propvalue or not re.match(pattern, propvalue):
                            match = False
                            break
                    except re.error:
                        log.error('Invalid platform matching pattern "%s"',
                                  pattern, exc_info=True)
                        match = False
                        break
                if match:
                    log.debug('Slave %s matched target platform %s',
                              handler.name, platform.name)
                    self.slaves[platform.id].add(handler)
                    any_match = True

        if not any_match:
            log.warning('Slave %s does not match any of the configured target '
                        'platforms', handler.name)
            return False

        self.schedule(self.check_interval * 0.2, self._check_build_queue)

        log.info('Registered slave "%s"', handler.name)
        return True

    def unregister(self, handler):
        for slaves in self.slaves.values():
            slaves.discard(handler)

        db = self.env.get_db_cnx()
        for build in Build.select(self.env, slave=handler.name,
                                  status=Build.IN_PROGRESS, db=db):
            log.info('Build %d ("%s" as of [%s]) cancelled by  %s', build.id,
                     build.rev, build.config, handler.name)
            for step in list(BuildStep.select(self.env, build=build.id)):
                step.delete(db=db)

            build.slave = None
            build.slave_info = {}
            build.status = Build.PENDING
            build.started = 0
            build.update(db=db)
            break
        db.commit()
        log.info('Unregistered slave "%s"', handler.name)


class OrchestrationProfileHandler(beep.ProfileHandler):
    """Handler for communication on the Bitten build orchestration profile from
    the perspective of the build master.
    """
    URI = 'http://bitten.cmlenz.net/beep/orchestration'

    def handle_connect(self):
        self.master = self.session.listener
        assert self.master
        self.env = self.master.env
        assert self.env
        self.name = None
        self.info = {}

    def handle_disconnect(self):
        self.master.unregister(self)

    def handle_msg(self, msgno, payload):
        assert payload.content_type == beep.BEEP_XML
        elem = xmlio.parse(payload.body)

        if elem.name == 'register':
            self.name = elem.attr['name']
            self.info[Build.IP_ADDRESS] = self.session.addr[0]
            for child in elem.children():
                if child.name == 'platform':
                    self.info[Build.MACHINE] = child.gettext()
                    self.info[Build.PROCESSOR] = child.attr.get('processor')
                elif child.name == 'os':
                    self.info[Build.OS_NAME] = child.gettext()
                    self.info[Build.OS_FAMILY] = child.attr.get('family')
                    self.info[Build.OS_VERSION] = child.attr.get('version')
                elif child.name == 'package':
                    for name, value in child.attr.items():
                        if name == 'name':
                            continue
                        self.info[child.attr['name'] + '.' + name] = value

            if not self.master.register(self):
                xml = xmlio.Element('error', code=550)[
                    'Nothing for you to build here, please move along'
                ]
                self.channel.send_err(msgno, beep.Payload(xml))
                return

            xml = xmlio.Element('ok')
            self.channel.send_rpy(msgno, beep.Payload(xml))

    def send_initiation(self, build):
        log.info('Initiating build of "%s" on slave %s', build.config,
                 self.name)

        def handle_reply(cmd, msgno, ansno, payload):
            if cmd == 'ERR':
                if payload.content_type == beep.BEEP_XML:
                    elem = xmlio.parse(payload.body)
                    if elem.name == 'error':
                        log.warning('Slave %s refused build request: %s (%d)',
                                    self.name, elem.gettext(),
                                    int(elem.attr['code']))
                return

            elem = xmlio.parse(payload.body)
            assert elem.name == 'proceed'
            type = encoding = None
            for child in elem.children('accept'):
                type, encoding = child.attr['type'], child.attr.get('encoding')
                if (type, encoding) in (('application/tar', 'gzip'),
                                        ('application/tar', 'bzip2'),
                                        ('application/tar', None),
                                        ('application/zip', None)):
                    break
                type = None
            if not type:
                xml = xmlio.Element('error', code=550)[
                    'None of the accepted archive formats supported'
                ]
                self.channel.send_err(beep.Payload(xml))
                return
            self.send_snapshot(build, type, encoding)

        config = BuildConfig.fetch(self.env, build.config)
        self.channel.send_msg(beep.Payload(config.recipe),
                              handle_reply=handle_reply)

    def send_snapshot(self, build, type, encoding):
        timestamp_delta = 0
        if self.master.adjust_timestamps:
            d = datetime.now() - timedelta(seconds=self.master.check_interval) \
                - datetime.fromtimestamp(build.rev_time)
            log.info('Warping timestamps by %s' % d)
            timestamp_delta = d.days * 86400 + d.seconds

        def handle_reply(cmd, msgno, ansno, payload):
            if cmd == 'ERR':
                assert payload.content_type == beep.BEEP_XML
                elem = xmlio.parse(payload.body)
                if elem.name == 'error':
                    log.warning('Slave %s refused to start build: %s (%d)',
                                self.name, elem.gettext(),
                                int(elem.attr['code']))

            elif cmd == 'ANS':
                assert payload.content_type == beep.BEEP_XML
                elem = xmlio.parse(payload.body)
                if elem.name == 'started':
                    self._build_started(build, elem, timestamp_delta)
                elif elem.name == 'step':
                    self._build_step_completed(build, elem, timestamp_delta)
                elif elem.name == 'completed':
                    self._build_completed(build, elem, timestamp_delta)
                elif elem.name == 'aborted':
                    self._build_aborted(build)
                elif elem.name == 'error':
                    build.status = Build.FAILURE

        snapshot_format = {
            ('application/tar', 'bzip2'): 'bzip2',
            ('application/tar', 'gzip'): 'gzip',
            ('application/tar', None): 'tar',
            ('application/zip', None): 'zip',
        }[(type, encoding)]
        snapshot_path = self.master.get_snapshot(build, snapshot_format)
        snapshot_name = os.path.basename(snapshot_path)
        message = beep.Payload(file(snapshot_path), content_type=type,
                               content_disposition=snapshot_name,
                               content_encoding=encoding)
        self.channel.send_msg(message, handle_reply=handle_reply)

    def _build_started(self, build, elem, timestamp_delta=None):
        build.slave = self.name
        build.slave_info.update(self.info)
        build.started = int(_parse_iso_datetime(elem.attr['time']))
        if timestamp_delta:
            build.started -= timestamp_delta
        build.status = Build.IN_PROGRESS
        log.info('Slave %s started build %d ("%s" as of [%s])',
                 self.name, build.id, build.config, build.rev)
        build.update()

    def _build_step_completed(self, build, elem, timestamp_delta=None):
        log.debug('Slave %s completed step "%s" with status %s', self.name,
                  elem.attr['id'], elem.attr['result'])

        db = self.env.get_db_cnx()

        step = BuildStep(self.env, build=build.id, name=elem.attr['id'],
                         description=elem.attr.get('description'))
        step.started = int(_parse_iso_datetime(elem.attr['time']))
        step.stopped = step.started + int(elem.attr['duration'])
        if timestamp_delta:
            step.started -= timestamp_delta
            step.stopped -= timestamp_delta
        if elem.attr['result'] == 'failure':
            log.warning('Step failed')
            step.status = BuildStep.FAILURE
        else:
            step.status = BuildStep.SUCCESS
        step.insert(db=db)

        for idx, log_elem in enumerate(elem.children('log')):
            build_log = BuildLog(self.env, build=build.id, step=step.name,
                                 generator=log_elem.attr.get('generator'),
                                 orderno=idx)
            for message_elem in log_elem.children('message'):
                build_log.messages.append((message_elem.attr['level'],
                                           message_elem.gettext()))
            build_log.insert(db=db)

        for report_elem in elem.children('report'):
            report = Report(self.env, build=build.id, step=step.name,
                            category=report_elem.attr.get('category'),
                            generator=report_elem.attr.get('generator'))
            for item_elem in report_elem.children():
                item = {'type': item_elem.name}
                item.update(item_elem.attr)
                for child_elem in item_elem.children():
                    item[child_elem.name] = child_elem.gettext()
                report.items.append(item)
            report.insert(db=db)

        db.commit()

    def _build_completed(self, build, elem, timestamp_delta=None):
        log.info('Slave %s completed build %d ("%s" as of [%s]) with status %s',
                 self.name, build.id, build.config, build.rev,
                 elem.attr['result'])

        build.stopped = int(_parse_iso_datetime(elem.attr['time']))
        if timestamp_delta:
            build.stopped -= timestamp_delta
        if elem.attr['result'] == 'failure':
            build.status = Build.FAILURE
        else:
            build.status = Build.SUCCESS
        build.update()

    def _build_aborted(self, build):
        log.info('Slave %s aborted build %d ("%s" as of [%s])',
                 self.name, build.id, build.config, build.rev)

        db = self.env.get_db_cnx()

        for step in list(BuildStep.select(self.env, build=build.id, db=db)):
            step.delete(db=db)

        build.slave = None
        build.started = 0
        build.status = Build.PENDING
        build.slave_info = {}
        build.update(db=db)

        db.commit()


def _parse_iso_datetime(string):
    """Minimal parser for ISO date-time strings.
    
    Return the time as floating point number. Only handles UTC timestamps
    without time zone information."""
    try:
        string = string.split('.', 1)[0] # strip out microseconds
        secs = time.mktime(time.strptime(string, '%Y-%m-%dT%H:%M:%S'))
        tzoffset = time.timezone
        if time.daylight:
            tzoffset = time.altzone
        return secs - tzoffset
    except ValueError, e:
        raise ValueError, 'Invalid ISO date/time %s (%s)' % (string, e)

def main():
    from bitten import __version__ as VERSION
    from optparse import OptionParser

    # Parse command-line arguments
    parser = OptionParser(usage='usage: %prog [options] env-path',
                          version='%%prog %s' % VERSION)
    parser.add_option('-p', '--port', action='store', type='int', dest='port',
                      help='port number to use')
    parser.add_option('-H', '--host', action='store', dest='host',
                      metavar='HOSTNAME',
                      help='the host name or IP address to bind to')
    parser.add_option('-l', '--log', dest='logfile', metavar='FILENAME',
                      help='write log messages to FILENAME')
    parser.add_option('-i', '--interval', dest='interval', metavar='SECONDS',
                      default=DEFAULT_CHECK_INTERVAL, type='int',
                      help='poll interval for changeset detection')
    parser.add_option('--timewarp', action='store_const', dest='timewarp',
                      const=True,
                      help='adjust timestamps of builds to be neat the '
                           'timestamps of the corresponding changesets')
    parser.add_option('--debug', action='store_const', dest='loglevel',
                      const=logging.DEBUG, help='enable debugging output')
    parser.add_option('-v', '--verbose', action='store_const', dest='loglevel',
                      const=logging.INFO, help='print as much as possible')
    parser.add_option('-q', '--quiet', action='store_const', dest='loglevel',
                      const=logging.ERROR, help='print as little as possible')
    parser.set_defaults(port=7633, loglevel=logging.WARNING)
    options, args = parser.parse_args()

    if len(args) < 1:
        parser.error('incorrect number of arguments')
    env_path = args[0]

    # Configure logging
    logger = logging.getLogger('bitten')
    logger.setLevel(options.loglevel)
    handler = logging.StreamHandler()
    if options.logfile:
        handler.setLevel(logging.WARNING)
    else:
        handler.setLevel(options.loglevel)
    formatter = logging.Formatter('%(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    if options.logfile:
        handler = logging.FileHandler(options.logfile)
        handler.setLevel(options.loglevel)
        formatter = logging.Formatter('%(asctime)s [%(name)s] %(levelname)s: '
                                      '%(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    port = options.port
    if not (1 <= port <= 65535):
        parser.error('port must be an integer in the range 1-65535')

    host = options.host
    if not host:
        import socket
        ip = socket.gethostbyname(socket.gethostname())
        try:
            host = socket.gethostbyaddr(ip)[0]
        except socket.error, e:
            log.warning('Reverse host name lookup failed (%s)', e)
            host = ip

    master = Master(env_path, host, port, adjust_timestamps=options.timewarp,
                    check_interval=options.interval)
    try:
        master.run(timeout=5.0)
    except KeyboardInterrupt:
        master.quit()

if __name__ == '__main__':
    main()
Copyright (C) 2012-2017 Edgewall Software