view bitten/master.py @ 116:86439c2aa6d6

Store report data in BDB XML database. Closes #31.
author cmlenz
date Sun, 07 Aug 2005 18:12:43 +0000
parents 16d69eb6e047
children 2f0f2f006526
# -*- coding: iso8859-1 -*-
#
# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
#
# Bitten is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Bitten is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#
# Author: Christopher Lenz <cmlenz@gmx.de>

from datetime import datetime, timedelta
from itertools import ifilter
import logging
import os.path
import re
try:
    set
except NameError:
    from sets import Set as set
import time

from trac.env import Environment
from bitten.model import BuildConfig, TargetPlatform, Build, BuildStep, BuildLog
from bitten.store import ReportStore
from bitten.util import archive, beep, xmlio

log = logging.getLogger('bitten.master')

DEFAULT_CHECK_INTERVAL = 120 # 2 minutes


class Master(beep.Listener):

    def __init__(self, env_path, ip, port,
                 check_interval=DEFAULT_CHECK_INTERVAL):
        beep.Listener.__init__(self, ip, port)
        self.profiles[OrchestrationProfileHandler.URI] = OrchestrationProfileHandler
        self.env = Environment(env_path)
        self.check_interval = check_interval

        self.slaves = {}

        # path to generated snapshot archives, keyed by (config name, revision, format)
        self.snapshots = {}
        for config in BuildConfig.select(self.env):
            snapshots = archive.index(self.env, prefix=config.name)
            for rev, format, path in snapshots:
                self.snapshots[(config.name, rev, format)] = path

        self._cleanup_orphaned_builds()
        self.schedule(self.check_interval, self._check_build_triggers)

    def close(self):
        self._cleanup_orphaned_builds()
        beep.Listener.close(self)

    def _check_build_triggers(self, when):
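        """Poll the repository for new changesets, and enqueue a build for
        every target platform of a configuration that has not yet been built
        at the corresponding revision."""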
        self.schedule(self.check_interval, self._check_build_triggers)

        repos = self.env.get_repository()
        try:
            repos.sync()

            db = self.env.get_db_cnx()
            for config in BuildConfig.select(self.env, db=db):
                log.debug('Checking for changes to "%s" at %s', config.label,
                          config.path)
                node = repos.get_node(config.path)
                for path, rev, chg in node.get_history():
                    enqueued = False
                    for platform in TargetPlatform.select(self.env,
                                                          config.name, db=db):
                        # Check whether this revision of the configuration has
                        # already been built on this platform
                        builds = Build.select(self.env, config.name, rev,
                                              platform.id, db=db)
                        if not list(builds):
                            log.info('Enqueuing build of configuration "%s" at '
                                     'revision [%s] on %s', config.name, rev,
                                     platform.name)
                            build = Build(self.env)
                            build.config = config.name
                            build.rev = str(rev)
                            build.rev_time = repos.get_changeset(rev).date
                            build.platform = platform.id
                            build.insert(db)
                            enqueued = True
                    if enqueued:
                        db.commit()
                        break
        finally:
            repos.close()

        self.schedule(self.check_interval * 0.2, self._check_build_queue)
        self.schedule(self.check_interval * 1.8, self._cleanup_snapshots)

    def _check_build_queue(self, when):
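        """Look for pending builds and try to dispatch them to slaves that
        match the target platform and are not already busy with another
        build."""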
        if not self.slaves:
            return
        log.debug('Checking for pending builds...')
        for build in Build.select(self.env, status=Build.PENDING):
            for slave in self.slaves.get(build.platform, []):
                active_builds = Build.select(self.env, slave=slave.name,
                                             status=Build.IN_PROGRESS)
                if not list(active_builds):
                    slave.send_initiation(build)
                    return

    def _cleanup_orphaned_builds(self):
        # Delete any builds left in a pending or in-progress state from a
        # previous run; they will be re-enqueued by the next trigger check
        db = self.env.get_db_cnx()
        for build in Build.select(self.env, status=Build.IN_PROGRESS, db=db):
            build.status = Build.PENDING
            build.update(db=db)
        for build in Build.select(self.env, status=Build.PENDING, db=db):
            for step in BuildStep.select(self.env, build=build.id, db=db):
                for build_log in BuildLog.select(self.env, build=build.id,
                                                 step=step.name, db=db):
                    build_log.delete(db=db)
                step.delete(db=db)
            build.delete(db=db)
        db.commit()

    def _cleanup_snapshots(self, when):
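        """Remove snapshot archives that are no longer needed because all
        builds referring to them have finished."""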
        log.debug('Checking for unused snapshot archives...')
        for (config, rev, format), path in self.snapshots.items():
            keep = False
            for build in Build.select(self.env, config=config, rev=rev):
                if build.status not in (Build.SUCCESS, Build.FAILURE):
                    keep = True
                    break
            if not keep:
                log.info('Removing unused snapshot %s', path)
                os.unlink(path)
                del self.snapshots[(config, rev, format)]

    def get_snapshot(self, build, format):
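        """Return the path of a snapshot archive for the given build in the
        requested format, creating and caching the archive if it does not
        exist yet."""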
        snapshot = self.snapshots.get((build.config, build.rev, format))
        if not snapshot:
            config = BuildConfig.fetch(self.env, build.config)
            snapshot = archive.pack(self.env, path=config.path, rev=build.rev,
                                    prefix=config.name, format=format)
            log.info('Prepared snapshot archive at %s', snapshot)
            self.snapshots[(build.config, build.rev, format)] = snapshot
        return snapshot

    def register(self, handler):
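        """Associate the connecting slave with every target platform whose
        rules match the information it reported, and return whether it
        matched at least one platform."""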
        any_match = False
        for config in BuildConfig.select(self.env):
            for platform in TargetPlatform.select(self.env, config=config.name):
                if platform.id not in self.slaves:
                    self.slaves[platform.id] = set()
                match = True
                for property, pattern in ifilter(None, platform.rules):
                    try:
                        if not re.match(pattern, handler.info.get(property)):
                            match = False
                            break
                    except re.error, e:
                        log.error('Invalid platform matching pattern "%s"',
                                  pattern, exc_info=True)
                        match = False
                        break
                if match:
                    log.debug('Slave %s matched target platform %s',
                              handler.name, platform.name)
                    self.slaves[platform.id].add(handler)
                    any_match = True

        if not any_match:
            log.warning('Slave %s does not match any of the configured target '
                        'platforms', handler.name)
            return False

        self.schedule(self.check_interval * 0.2, self._check_build_queue)

        log.info('Registered slave "%s"', handler.name)
        return True

    def unregister(self, handler):
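        """Remove the slave from all target platforms and put any build it
        was working on back into the queue."""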
        for slaves in self.slaves.values():
            slaves.discard(handler)

        for build in Build.select(self.env, slave=handler.name,
                                  status=Build.IN_PROGRESS):
            log.info('Build [%s] of "%s" by %s cancelled', build.rev,
                     build.config, handler.name)
            build.slave = None
            build.status = Build.PENDING
            build.started = 0
            build.update()
            break
        log.info('Unregistered slave "%s"', handler.name)


class OrchestrationProfileHandler(beep.ProfileHandler):
    """Handler for communication on the Bitten build orchestration profile from
    the perspective of the build master.
    """
    URI = 'http://bitten.cmlenz.net/beep/orchestration'

    def handle_connect(self):
        self.master = self.session.listener
        assert self.master
        self.env = self.master.env
        assert self.env
        self.name = None
        self.info = {}

    def handle_disconnect(self):
        self.master.unregister(self)

    def handle_msg(self, msgno, payload):
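        """Process an XML message received from the slave; currently only the
        <register> request, which carries the slave's platform and operating
        system information, is handled."""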
        assert payload.content_type == beep.BEEP_XML
        elem = xmlio.parse(payload.body)

        if elem.name == 'register':
            self.name = elem.attr['name']
            for child in elem.children():
                if child.name == 'platform':
                    self.info[Build.MACHINE] = child.gettext()
                    self.info[Build.PROCESSOR] = child.attr.get('processor')
                elif child.name == 'os':
                    self.info[Build.OS_NAME] = child.gettext()
                    self.info[Build.OS_FAMILY] = child.attr.get('family')
                    self.info[Build.OS_VERSION] = child.attr.get('version')
            self.info[Build.IP_ADDRESS] = self.session.addr[0]

            if not self.master.register(self):
                xml = xmlio.Element('error', code=550)[
                    'Nothing for you to build here, please move along'
                ]
                self.channel.send_err(msgno, beep.Payload(xml))
                return

            xml = xmlio.Element('ok')
            self.channel.send_rpy(msgno, beep.Payload(xml))

    def send_initiation(self, build):
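        """Ask the slave to start a build; if it replies that it will
        proceed, choose an archive format it accepts and send the snapshot of
        the repository."""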
        log.info('Initiating build of "%s" on slave %s', build.config,
                 self.name)

        def handle_reply(cmd, msgno, ansno, payload):
            if cmd == 'ERR':
                if payload.content_type == beep.BEEP_XML:
                    elem = xmlio.parse(payload.body)
                    if elem.name == 'error':
                        log.warning('Slave %s refused build request: %s (%d)',
                                    self.name, elem.gettext(),
                                    int(elem.attr['code']))
                return

            elem = xmlio.parse(payload.body)
            assert elem.name == 'proceed'
            type = encoding = None
            for child in elem.children('accept'):
                type, encoding = child.attr['type'], child.attr.get('encoding')
                if (type, encoding) in (('application/tar', 'gzip'),
                                        ('application/tar', 'bzip2'),
                                        ('application/tar', None),
                                        ('application/zip', None)):
                    break
                type = None
            if not type:
                xml = xmlio.Element('error', code=550)[
                    'None of the accepted archive formats supported'
                ]
                self.channel.send_err(msgno, beep.Payload(xml))
                return
            self.send_snapshot(build, type, encoding)

        xml = xmlio.Element('build', recipe='recipe.xml')
        self.channel.send_msg(beep.Payload(xml), handle_reply=handle_reply)

    def send_snapshot(self, build, type, encoding):
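        """Send the snapshot archive to the slave and handle the progress
        messages it returns while building (started, step, completed, aborted
        or error)."""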

        def handle_reply(cmd, msgno, ansno, payload):
            if cmd == 'ERR':
                assert payload.content_type == beep.BEEP_XML
                elem = xmlio.parse(payload.body)
                if elem.name == 'error':
                    log.warning('Slave %s did not accept archive: %s (%d)',
                                self.name, elem.gettext(),
                                int(elem.attr['code']))

            elif cmd == 'ANS':
                assert payload.content_type == beep.BEEP_XML
                db = self.env.get_db_cnx()
                elem = xmlio.parse(payload.body)
                if elem.name == 'started':
                    self._build_started(db, build, elem)
                elif elem.name == 'step':
                    self._build_step_completed(db, build, elem)
                elif elem.name == 'completed':
                    self._build_completed(db, build, elem)
                elif elem.name == 'aborted':
                    self._build_aborted(db, build, elem)
                elif elem.name == 'error':
                    build.status = Build.FAILURE
                build.update(db=db)
                db.commit()

        snapshot_format = {
            ('application/tar', 'bzip2'): 'bzip2',
            ('application/tar', 'gzip'): 'gzip',
            ('application/tar', None): 'tar',
            ('application/zip', None): 'zip',
        }[(type, encoding)]
        snapshot_path = self.master.get_snapshot(build, snapshot_format)
        snapshot_name = os.path.basename(snapshot_path)
        message = beep.Payload(file(snapshot_path), content_type=type,
                               content_disposition=snapshot_name,
                               content_encoding=encoding)
        self.channel.send_msg(message, handle_reply=handle_reply)

    def _build_started(self, db, build, elem):
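        """Mark the build as in progress and record which slave is running it
        and when it started."""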
        build.slave = self.name
        build.slave_info.update(self.info)
        build.started = int(_parse_iso_datetime(elem.attr['time']))
        build.status = Build.IN_PROGRESS
        log.info('Slave %s started build %d ("%s" as of [%s])',
                 self.name, build.id, build.config, build.rev)

    def _build_step_completed(self, db, build, elem):
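        """Store the completed build step reported by the slave, together
        with its log messages and any generated reports."""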
        log.debug('Slave completed step "%s"', elem.attr['id'])
        step = BuildStep(self.env, build=build.id, name=elem.attr['id'],
                         description=elem.attr.get('description'))
        step.started = int(_parse_iso_datetime(elem.attr['time']))
        step.stopped = step.started + int(elem.attr['duration'])
        if elem.attr['result'] == 'failure':
            log.warning('Step failed: %s', elem.gettext())
            step.status = BuildStep.FAILURE
        else:
            step.status = BuildStep.SUCCESS
        step.insert(db=db)

        # Store any log messages reported for the step
        level_map = {'debug': BuildLog.DEBUG, 'info': BuildLog.INFO,
                     'warning': BuildLog.WARNING, 'error': BuildLog.ERROR}
        for log_elem in elem.children('log'):
            build_log = BuildLog(self.env, build=build.id, step=step.name,
                                 type=log_elem.attr.get('type'))
            for message_elem in log_elem.children('message'):
                build_log.messages.append((message_elem.attr['level'],
                                           message_elem.gettext()))
            build_log.insert(db=db)

        store = ReportStore(self.env)
        for report in elem.children('report'):
            store.store_report(build, step, report)

    def _build_completed(self, db, build, elem):
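        """Record when the build finished and whether it succeeded or
        failed."""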
        log.info('Slave %s completed build %d ("%s" as of [%s])', self.name,
                 build.id, build.config, build.rev)
        build.stopped = int(_parse_iso_datetime(elem.attr['time']))
        if elem.attr['result'] == 'failure':
            build.status = Build.FAILURE
        else:
            build.status = Build.SUCCESS

    def _build_aborted(self, db, build, elem):
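        """Put the build back into the queue and remove any steps that were
        already stored for it."""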
        log.info('Slave "%s" aborted build %d ("%s" as of [%s])',
                 self.name, build.id, build.config, build.rev)
        build.slave = None
        build.started = 0
        build.status = Build.PENDING
        build.slave_info = {}
        for step in BuildStep.select(self.env, build=build.id, db=db):
            step.delete(db=db)


def _parse_iso_datetime(string):
    """Minimal parser for ISO date-time strings.
    
    Return the time as floating point number. Only handles UTC timestamps
    without time zone information."""
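    # For example, a value such as '2005-08-07T18:12:43.123456' is accepted;
    # the fractional seconds are stripped before parsing.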
    try:
        string = string.split('.', 1)[0] # strip out microseconds
        secs = time.mktime(time.strptime(string, '%Y-%m-%dT%H:%M:%S'))
        tzoffset = time.timezone
        if time.daylight:
            tzoffset = time.altzone
        return secs - tzoffset
    except ValueError, e:
        raise ValueError, 'Invalid ISO date/time %s (%s)' % (string, e)

def main():
    from bitten import __version__ as VERSION
    from optparse import OptionParser

    # Parse command-line arguments
    parser = OptionParser(usage='usage: %prog [options] env-path',
                          version='%%prog %s' % VERSION)
    parser.add_option('-p', '--port', action='store', type='int', dest='port',
                      help='port number to use')
    parser.add_option('-H', '--host', action='store', dest='host',
                      metavar='HOSTNAME',
                      help='the host name or IP address to bind to')
    parser.add_option('-l', '--log', dest='logfile', metavar='FILENAME',
                      help='write log messages to FILENAME')
    parser.add_option('-i', '--interval', dest='interval', metavar='SECONDS',
                      default=DEFAULT_CHECK_INTERVAL, type='int',
                      help='poll interval for changeset detection')
    parser.add_option('--debug', action='store_const', dest='loglevel',
                      const=logging.DEBUG, help='enable debugging output')
    parser.add_option('-v', '--verbose', action='store_const', dest='loglevel',
                      const=logging.INFO, help='print as much as possible')
    parser.add_option('-q', '--quiet', action='store_const', dest='loglevel',
                      const=logging.ERROR, help='print as little as possible')
    parser.set_defaults(port=7633, loglevel=logging.WARNING)
    options, args = parser.parse_args()

    if len(args) < 1:
        parser.error('incorrect number of arguments')
    env_path = args[0]

    # Configure logging
    log = logging.getLogger('bitten')
    log.setLevel(options.loglevel)
    handler = logging.StreamHandler()
    if options.logfile:
        handler.setLevel(logging.WARNING)
    else:
        handler.setLevel(options.loglevel)
    formatter = logging.Formatter('%(message)s')
    handler.setFormatter(formatter)
    log.addHandler(handler)
    if options.logfile:
        handler = logging.FileHandler(options.logfile)
        handler.setLevel(options.loglevel)
        formatter = logging.Formatter('%(asctime)s [%(name)s] %(levelname)s: '
                                      '%(message)s')
        handler.setFormatter(formatter)
        log.addHandler(handler)

    port = options.port
    if not (1 <= port <= 65535):
        parser.error('port must be an integer in the range 1-65535')

    host = options.host
    if not host:
        import socket
        ip = socket.gethostbyname(socket.gethostname())
        try:
            host = socket.gethostbyaddr(ip)[0]
        except socket.error, e:
            log.warning('Reverse host name lookup failed (%s)', e)
            host = ip

    master = Master(env_path, host, port, check_interval=options.interval)
    try:
        master.run(timeout=5.0)
    except KeyboardInterrupt:
        master.quit()

if __name__ == '__main__':
    main()