changeset 227:014bc6c29dff

* Factor build queue logic into a class separate from the build master. * Support for multiple Trac environments. * Unit tests for the build queue.
author cmlenz
date Tue, 27 Sep 2005 23:14:48 +0000
parents 86836ec604f8
children a8c9dd7e3f71
files bitten/master.py bitten/model.py bitten/queue.py bitten/tests/__init__.py bitten/tests/queue.py
diffstat 5 files changed, 501 insertions(+), 196 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/master.py
+++ b/bitten/master.py
@@ -8,19 +8,19 @@
 # are also available at http://bitten.cmlenz.net/wiki/License.
 
 from datetime import datetime, timedelta
-from itertools import ifilter
 import logging
-import os.path
-import re
+import os
 try:
     set
 except NameError:
     from sets import Set as set
+import sys
 import time
 
 from trac.env import Environment
-from bitten.model import BuildConfig, TargetPlatform, Build, BuildStep, \
-                         BuildLog, Report
+from bitten.model import BuildConfig, Build, BuildStep, BuildLog, Report
+from bitten.queue import BuildQueue
+from bitten.trac_ext.main import BuildSystem
 from bitten.util import archive, beep, xmlio
 
 log = logging.getLogger('bitten.master')
@@ -30,189 +30,78 @@
 
 class Master(beep.Listener):
 
-    def __init__(self, env_path, ip, port, adjust_timestamps=False,
+    def __init__(self, envs, ip, port, adjust_timestamps=False,
                  check_interval=DEFAULT_CHECK_INTERVAL):
         beep.Listener.__init__(self, ip, port)
-        self.profiles[OrchestrationProfileHandler.URI] = OrchestrationProfileHandler
-        self.env = Environment(env_path)
+        self.profiles[OrchestrationProfileHandler.URI] = \
+                OrchestrationProfileHandler
         self.adjust_timestamps = adjust_timestamps
         self.check_interval = check_interval
-
-        self.slaves = {}
+        self.handlers = {} # Map of connected slaves keyed by name
 
-        # path to generated snapshot archives, key is (config name, revision)
-        self.snapshots = {}
-        for config in BuildConfig.select(self.env):
-            snapshots = archive.index(self.env, prefix=config.name)
-            for rev, format, path in snapshots:
-                self.snapshots[(config.name, rev, format)] = path
+        self.queues = []
+        for env in envs:
+            self.queues.append(BuildQueue(env))
 
-        self._cleanup_orphaned_builds()
-        self.schedule(self.check_interval, self._check_build_triggers)
+        self.schedule(self.check_interval, self._enqueue_builds)
 
     def close(self):
-        self._cleanup_orphaned_builds()
+        for queue in self.queues:
+            queue.reset_orphaned_builds()
         beep.Listener.close(self)
 
-    def _check_build_triggers(self, when):
-        self.schedule(self.check_interval, self._check_build_triggers)
-
-        repos = self.env.get_repository()
-        try:
-            repos.sync()
-
-            db = self.env.get_db_cnx()
-            for config in BuildConfig.select(self.env, db=db):
-                log.debug('Checking for changes to "%s" at %s', config.label,
-                          config.path)
-                node = repos.get_node(config.path)
-                for path, rev, chg in node.get_history():
-
-                    # Don't follow moves/copies
-                    if path != repos.normalize_path(config.path):
-                        break
-
-                    # Make sure the repository directory isn't empty at this
-                    # revision
-                    old_node = repos.get_node(path, rev)
-                    is_empty = True
-                    for entry in old_node.get_entries():
-                        is_empty = False
-                        break
-                    if is_empty:
-                        continue
-
-                    enqueued = False
-                    for platform in TargetPlatform.select(self.env,
-                                                          config.name, db=db):
-                        # Check whether the latest revision of the configuration
-                        # has already been built on this platform
-                        builds = Build.select(self.env, config.name, rev,
-                                              platform.id, db=db)
-                        if not list(builds):
-                            log.info('Enqueuing build of configuration "%s" at '
-                                     'revision [%s] on %s', config.name, rev,
-                                     platform.name)
-                            build = Build(self.env)
-                            build.config = config.name
-                            build.rev = str(rev)
-                            build.rev_time = repos.get_changeset(rev).date
-                            build.platform = platform.id
-                            build.insert(db)
-                            enqueued = True
-                    if enqueued:
-                        db.commit()
-                        break
-        finally:
-            repos.close()
+    def _cleanup(self, when):
+        for queue in self.queues:
+            queue.remove_unused_snapshots()
 
-        self.schedule(self.check_interval * 0.2, self._check_build_queue)
-        self.schedule(self.check_interval * 1.8, self._cleanup_snapshots)
-
-    def _check_build_queue(self, when):
-        if not self.slaves:
-            return
-        log.debug('Checking for pending builds...')
-        for build in Build.select(self.env, status=Build.PENDING):
-            config = BuildConfig.fetch(self.env, name=build.config)
-            if not config.active:
-                continue
-            for slave in self.slaves.get(build.platform, []):
-                active_builds = Build.select(self.env, slave=slave.name,
-                                             status=Build.IN_PROGRESS)
-                if not list(active_builds):
-                    slave.send_initiation(build)
-                    return
+    def _enqueue_builds(self, when):
+        self.schedule(self.check_interval, self._enqueue_builds)
 
-    def _cleanup_orphaned_builds(self):
-        # Reset all in-progress builds
-        db = self.env.get_db_cnx()
-        for build in Build.select(self.env, status=Build.IN_PROGRESS, db=db):
-            build.status = Build.PENDING
-            build.slave = None
-            build.slave_info = {}
-            build.started = 0
-            for step in list(BuildStep.select(self.env, build=build.id, db=db)):
-                step.delete(db=db)
-            build.update(db=db)
-        db.commit()
+        for queue in self.queues:
+            queue.populate()
 
-    def _cleanup_snapshots(self, when):
-        log.debug('Checking for unused snapshot archives...')
-        for (config, rev, format), path in self.snapshots.items():
-            keep = False
-            for build in Build.select(self.env, config=config, rev=rev):
-                if build.status not in (Build.SUCCESS, Build.FAILURE):
-                    keep = True
-                    break
-            if not keep:
-                log.info('Removing unused snapshot %s', path)
-                os.unlink(path)
-                del self.snapshots[(config, rev, format)]
+        self.schedule(self.check_interval * 0.2, self._initiate_builds)
+        self.schedule(self.check_interval * 1.8, self._cleanup)
 
-    def get_snapshot(self, build, format):
-        snapshot = self.snapshots.get((build.config, build.rev, format))
-        if not snapshot:
-            config = BuildConfig.fetch(self.env, build.config)
-            snapshot = archive.pack(self.env, path=config.path, rev=build.rev,
-                                    prefix=config.name, format=format)
-            log.info('Prepared snapshot archive at %s' % snapshot)
-            self.snapshots[(build.config, build.rev, format)] = snapshot
-        return snapshot
+    def _initiate_builds(self, when):
+        available_slaves = set([name for name in self.handlers
+                                if not self.handlers[name].building])
+
+        for queue in self.queues[:]:
+            build, slave = queue.get_next_pending_build(available_slaves)
+            if build:
+                self.handlers[slave].send_initiation(queue, build)
+                available_slaves.discard(slave)
+
+                # Round robin
+                self.queues.remove(queue)
+                self.queues.append(queue)
 
     def register(self, handler):
         any_match = False
-        for config in BuildConfig.select(self.env):
-            for platform in TargetPlatform.select(self.env, config=config.name):
-                if not platform.id in self.slaves:
-                    self.slaves[platform.id] = set()
-                match = True
-                for propname, pattern in ifilter(None, platform.rules):
-                    try:
-                        propvalue = handler.info.get(propname)
-                        if not propvalue or not re.match(pattern, propvalue):
-                            match = False
-                            break
-                    except re.error:
-                        log.error('Invalid platform matching pattern "%s"',
-                                  pattern, exc_info=True)
-                        match = False
-                        break
-                if match:
-                    log.debug('Slave %s matched target platform %s',
-                              handler.name, platform.name)
-                    self.slaves[platform.id].add(handler)
-                    any_match = True
+        for queue in self.queues:
+            if queue.register_slave(handler.name, handler.info):
+                any_match = True
 
         if not any_match:
             log.warning('Slave %s does not match any of the configured target '
                         'platforms', handler.name)
             return False
 
-        self.schedule(self.check_interval * 0.2, self._check_build_queue)
+        self.handlers[handler.name] = handler
+        self.schedule(self.check_interval * 0.2, self._initiate_builds)
 
         log.info('Registered slave "%s"', handler.name)
         return True
 
     def unregister(self, handler):
-        for slaves in self.slaves.values():
-            slaves.discard(handler)
+        if handler.name not in self.handlers:
+            return
 
-        db = self.env.get_db_cnx()
-        for build in Build.select(self.env, slave=handler.name,
-                                  status=Build.IN_PROGRESS, db=db):
-            log.info('Build %d ("%s" as of [%s]) cancelled by  %s', build.id,
-                     build.rev, build.config, handler.name)
-            for step in list(BuildStep.select(self.env, build=build.id)):
-                step.delete(db=db)
+        for queue in self.queues:
+            queue.unregister_slave(handler.name)
+        del self.handlers[handler.name]
 
-            build.slave = None
-            build.slave_info = {}
-            build.status = Build.PENDING
-            build.started = 0
-            build.update(db=db)
-            break
-        db.commit()
         log.info('Unregistered slave "%s"', handler.name)
 
 
@@ -225,9 +114,8 @@
     def handle_connect(self):
         self.master = self.session.listener
         assert self.master
-        self.env = self.master.env
-        assert self.env
         self.name = None
+        self.building = False
         self.info = {}
 
     def handle_disconnect(self):
@@ -264,9 +152,10 @@
             xml = xmlio.Element('ok')
             self.channel.send_rpy(msgno, beep.Payload(xml))
 
-    def send_initiation(self, build):
+    def send_initiation(self, queue, build):
         log.info('Initiating build of "%s" on slave %s', build.config,
                  self.name)
+        self.building = True
 
         def handle_reply(cmd, msgno, ansno, payload):
             if cmd == 'ERR':
@@ -276,6 +165,7 @@
                         log.warning('Slave %s refused build request: %s (%d)',
                                     self.name, elem.gettext(),
                                     int(elem.attr['code']))
+                self.building = False
                 return
 
             elem = xmlio.parse(payload.body)
@@ -294,14 +184,15 @@
                     'None of the accepted archive formats supported'
                 ]
                 self.channel.send_err(beep.Payload(xml))
+                self.building = False
                 return
-            self.send_snapshot(build, type, encoding)
+            self.send_snapshot(queue, build, type, encoding)
 
-        config = BuildConfig.fetch(self.env, build.config)
+        config = BuildConfig.fetch(queue.env, build.config)
         self.channel.send_msg(beep.Payload(config.recipe),
                               handle_reply=handle_reply)
 
-    def send_snapshot(self, build, type, encoding):
+    def send_snapshot(self, queue, build, type, encoding):
         timestamp_delta = 0
         if self.master.adjust_timestamps:
             d = datetime.now() - timedelta(seconds=self.master.check_interval) \
@@ -317,35 +208,40 @@
                     log.warning('Slave %s refused to start build: %s (%d)',
                                 self.name, elem.gettext(),
                                 int(elem.attr['code']))
+                self.building = False
 
             elif cmd == 'ANS':
                 assert payload.content_type == beep.BEEP_XML
                 elem = xmlio.parse(payload.body)
                 if elem.name == 'started':
-                    self._build_started(build, elem, timestamp_delta)
+                    self._build_started(queue, build, elem, timestamp_delta)
                 elif elem.name == 'step':
-                    self._build_step_completed(build, elem, timestamp_delta)
+                    self._build_step_completed(queue, build, elem,
+                                               timestamp_delta)
                 elif elem.name == 'completed':
-                    self._build_completed(build, elem, timestamp_delta)
+                    self._build_completed(queue, build, elem, timestamp_delta)
                 elif elem.name == 'aborted':
-                    self._build_aborted(build)
+                    self._build_aborted(queue, build)
                 elif elem.name == 'error':
                     build.status = Build.FAILURE
 
+            elif cmd == 'NUL':
+                self.building = False
+
         snapshot_format = {
             ('application/tar', 'bzip2'): 'bzip2',
             ('application/tar', 'gzip'): 'gzip',
             ('application/tar', None): 'tar',
             ('application/zip', None): 'zip',
         }[(type, encoding)]
-        snapshot_path = self.master.get_snapshot(build, snapshot_format)
+        snapshot_path = queue.get_snapshot(build, snapshot_format, create=True)
         snapshot_name = os.path.basename(snapshot_path)
-        message = beep.Payload(file(snapshot_path), content_type=type,
+        message = beep.Payload(file(snapshot_path, 'rb'), content_type=type,
                                content_disposition=snapshot_name,
                                content_encoding=encoding)
         self.channel.send_msg(message, handle_reply=handle_reply)
 
-    def _build_started(self, build, elem, timestamp_delta=None):
+    def _build_started(self, queue, build, elem, timestamp_delta=None):
         build.slave = self.name
         build.slave_info.update(self.info)
         build.started = int(_parse_iso_datetime(elem.attr['time']))
@@ -356,13 +252,13 @@
                  self.name, build.id, build.config, build.rev)
         build.update()
 
-    def _build_step_completed(self, build, elem, timestamp_delta=None):
+    def _build_step_completed(self, queue, build, elem, timestamp_delta=None):
         log.debug('Slave %s completed step "%s" with status %s', self.name,
                   elem.attr['id'], elem.attr['result'])
 
-        db = self.env.get_db_cnx()
+        db = queue.env.get_db_cnx()
 
-        step = BuildStep(self.env, build=build.id, name=elem.attr['id'],
+        step = BuildStep(queue.env, build=build.id, name=elem.attr['id'],
                          description=elem.attr.get('description'))
         step.started = int(_parse_iso_datetime(elem.attr['time']))
         step.stopped = step.started + int(elem.attr['duration'])
@@ -377,7 +273,7 @@
         step.insert(db=db)
 
         for idx, log_elem in enumerate(elem.children('log')):
-            build_log = BuildLog(self.env, build=build.id, step=step.name,
+            build_log = BuildLog(queue.env, build=build.id, step=step.name,
                                  generator=log_elem.attr.get('generator'),
                                  orderno=idx)
             for message_elem in log_elem.children('message'):
@@ -386,7 +282,7 @@
             build_log.insert(db=db)
 
         for report_elem in elem.children('report'):
-            report = Report(self.env, build=build.id, step=step.name,
+            report = Report(queue.env, build=build.id, step=step.name,
                             category=report_elem.attr.get('category'),
                             generator=report_elem.attr.get('generator'))
             for item_elem in report_elem.children():
@@ -399,7 +295,7 @@
 
         db.commit()
 
-    def _build_completed(self, build, elem, timestamp_delta=None):
+    def _build_completed(self, queue, build, elem, timestamp_delta=None):
         log.info('Slave %s completed build %d ("%s" as of [%s]) with status %s',
                  self.name, build.id, build.config, build.rev,
                  elem.attr['result'])
@@ -413,13 +309,13 @@
             build.status = Build.SUCCESS
         build.update()
 
-    def _build_aborted(self, build):
+    def _build_aborted(self, queue, build):
         log.info('Slave %s aborted build %d ("%s" as of [%s])',
                  self.name, build.id, build.config, build.rev)
 
-        db = self.env.get_db_cnx()
+        db = queue.env.get_db_cnx()
 
-        for step in list(BuildStep.select(self.env, build=build.id, db=db)):
+        for step in list(BuildStep.select(queue.env, build=build.id, db=db)):
             step.delete(db=db)
 
         build.slave = None
@@ -451,7 +347,7 @@
     from optparse import OptionParser
 
     # Parse command-line arguments
-    parser = OptionParser(usage='usage: %prog [options] env-path',
+    parser = OptionParser(usage='usage: %prog [options] ENV_PATHS',
                           version='%%prog %s' % VERSION)
     parser.add_option('-p', '--port', action='store', type='int', dest='port',
                       help='port number to use')
@@ -478,7 +374,6 @@
 
     if len(args) < 1:
         parser.error('incorrect number of arguments')
-    env_path = args[0]
 
     # Configure logging
     logger = logging.getLogger('bitten')
@@ -513,7 +408,19 @@
             log.warning('Reverse host name lookup failed (%s)', e)
             host = ip
 
-    master = Master(env_path, host, port, adjust_timestamps=options.timewarp,
+    envs = []
+    for arg in args:
+        env = Environment(arg)
+        if BuildSystem(env):
+            if env.needs_upgrade():
+                log.warning('Environment at %s needs to be upgraded', env.path)
+                continue
+            envs.append(env)
+    if not envs:
+        log.error('None of the specified environments has support for Bitten')
+        sys.exit(2)
+
+    master = Master(envs, host, port, adjust_timestamps=options.timewarp,
                     check_interval=options.interval)
     try:
         master.run(timeout=5.0)
--- a/bitten/model.py
+++ b/bitten/model.py
@@ -181,14 +181,14 @@
         ]
     ]
 
-    def __init__(self, env, id=None, config=None, name=None):
+    def __init__(self, env, config=None, name=None):
         """Initialize a new target platform with the specified attributes.
 
         To actually create this platform in the database, the `insert` method
         needs to be called.
         """
         self.env = env
-        self.id = id
+        self.id = None
         self.config = config
         self.name = name
         self.rules = []
@@ -272,7 +272,8 @@
         if not row:
             return None
 
-        platform = TargetPlatform(env, id=id, config=row[0], name=row[1])
+        platform = TargetPlatform(env, config=row[0], name=row[1])
+        platform.id = id
         cursor.execute("SELECT propname,pattern FROM bitten_rule "
                        "WHERE id=%s ORDER BY orderno", (id,))
         for propname, pattern in cursor:
@@ -336,25 +337,24 @@
     MACHINE = 'machine'
     PROCESSOR = 'processor'
 
-    def __init__(self, env, id=None, config=None, rev=None, platform=None,
-                 slave=None, started=0, stopped=0, rev_time=0,
-                 status=PENDING):
+    def __init__(self, env, config=None, rev=None, platform=None, slave=None,
+                 started=0, stopped=0, rev_time=0, status=PENDING):
         """Initialize a new build with the specified attributes.
 
         To actually create this build in the database, the `insert` method needs
         to be called.
         """
         self.env = env
-        self.slave_info = {}
-        self.id = id
+        self.id = None
         self.config = config
-        self.rev = rev
+        self.rev = rev and str(rev) or None
         self.platform = platform
         self.slave = slave
         self.started = started or 0
         self.stopped = stopped or 0
         self.rev_time = rev_time
         self.status = status
+        self.slave_info = {}
 
     exists = property(fget=lambda self: self.id is not None)
     completed = property(fget=lambda self: self.status != Build.IN_PROGRESS)
@@ -447,10 +447,11 @@
         if not row:
             return None
 
-        build = Build(env, id=int(id), config=row[0], rev=row[1],
-                      rev_time=int(row[2]), platform=int(row[3]),
-                      slave=row[4], started=row[5] and int(row[5]) or 0,
+        build = Build(env, config=row[0], rev=row[1], rev_time=int(row[2]),
+                      platform=int(row[3]), slave=row[4],
+                      started=row[5] and int(row[5]) or 0,
                       stopped=row[6] and int(row[6]) or 0, status=row[7])
+        build.id = int(id)
         cursor.execute("SELECT propname,propvalue FROM bitten_slave "
                        "WHERE build=%s", (id,))
         for propname, propvalue in cursor:
new file mode 100644
--- /dev/null
+++ b/bitten/queue.py
@@ -0,0 +1,204 @@
+# -*- coding: iso8859-1 -*-
+#
+# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
+# All rights reserved.
+#
+# This software is licensed as described in the file COPYING, which
+# you should have received as part of this distribution. The terms
+# are also available at http://bitten.cmlenz.net/wiki/License.
+
+from itertools import ifilter
+import logging
+import os
+import re
+
+from bitten.model import BuildConfig, TargetPlatform, Build, BuildStep
+from bitten.util import archive
+
+log = logging.getLogger('bitten.queue')
+
+
+class BuildQueue(object):
+    """Enapsulates the build queue of an environment."""
+
+    def __init__(self, env):
+        self.env = env
+        self.slaves = {} # Sets of slave names keyed by target platform ID
+
+        # path to generated snapshot archives, key is (config name, revision)
+        self.snapshots = {}
+        for config in BuildConfig.select(self.env):
+            snapshots = archive.index(self.env, prefix=config.name)
+            for rev, format, path in snapshots:
+                self.snapshots[(config.name, rev, format)] = path
+
+        self.reset_orphaned_builds()
+        self.remove_unused_snapshots()
+
+    # Build scheduling
+
+    def get_next_pending_build(self, available_slaves):
+        """Check whether one of the pending builds can be built by one of the
+        available build slaves.
+        
+        If such a build is found, this method returns a `(build, slave)` tuple,
+        where `build` is the `Build` object and `slave` is the name of the
+        build slave.
+
+        Otherwise, this function will return `(None, None)`.
+        """
+        log.debug('Checking for pending builds...')
+
+        for build in Build.select(self.env, status=Build.PENDING):
+
+            # Ignore pending builds for deactived build configs
+            config = BuildConfig.fetch(self.env, name=build.config)
+            if not config.active:
+                continue
+
+            # Find a slave for the build platform that is not already building
+            # something else
+            slaves = self.slaves.get(build.platform, [])
+            for slave in [name for name in slaves if name in available_slaves]:
+                slaves.remove(slave)
+                slaves.append(slave)
+                return build, slave
+
+        return None, None
+
+    def populate(self):
+        repos = self.env.get_repository()
+        try:
+            repos.sync()
+
+            db = self.env.get_db_cnx()
+            for config in BuildConfig.select(self.env, db=db):
+                log.debug('Checking for changes to "%s" at %s', config.label,
+                          config.path)
+                node = repos.get_node(config.path)
+                for path, rev, chg in node.get_history():
+
+                    # Don't follow moves/copies
+                    if path != repos.normalize_path(config.path):
+                        break
+
+                    # Make sure the repository directory isn't empty at this
+                    # revision
+                    old_node = repos.get_node(path, rev)
+                    is_empty = True
+                    for entry in old_node.get_entries():
+                        is_empty = False
+                        break
+                    if is_empty:
+                        continue
+
+                    enqueued = False
+                    for platform in TargetPlatform.select(self.env,
+                                                          config.name, db=db):
+                        # Check whether this revision of the configuration has
+                        # already been built on this platform
+                        builds = Build.select(self.env, config.name, rev,
+                                              platform.id, db=db)
+                        if not list(builds):
+                            log.info('Enqueuing build of configuration "%s" at '
+                                     'revision [%s] on %s', config.name, rev,
+                                     platform.name)
+                            build = Build(self.env)
+                            build.config = config.name
+                            build.rev = str(rev)
+                            build.rev_time = repos.get_changeset(rev).date
+                            build.platform = platform.id
+                            build.insert(db)
+                            enqueued = True
+                    if enqueued:
+                        db.commit()
+                        break
+        finally:
+            repos.close()
+
+    def reset_orphaned_builds(self):
+        # Reset all in-progress builds
+        db = self.env.get_db_cnx()
+        for build in Build.select(self.env, status=Build.IN_PROGRESS, db=db):
+            build.status = Build.PENDING
+            build.slave = None
+            build.slave_info = {}
+            build.started = 0
+            for step in list(BuildStep.select(self.env, build=build.id, db=db)):
+                step.delete(db=db)
+            build.update(db=db)
+        db.commit()
+
+    # Snapshot management
+
+    def get_snapshot(self, build, format, create=False):
+        snapshot = self.snapshots.get((build.config, build.rev, format))
+        if create and snapshot is None:
+            config = BuildConfig.fetch(self.env, build.config)
+            snapshot = archive.pack(self.env, path=config.path, rev=build.rev,
+                                    prefix=config.name, format=format)
+            log.info('Prepared snapshot archive at %s' % snapshot)
+            self.snapshots[(build.config, build.rev, format)] = snapshot
+        return snapshot
+
+    def remove_unused_snapshots(self):
+        log.debug('Checking for unused snapshot archives...')
+        for (config, rev, format), path in self.snapshots.items():
+            keep = False
+            for build in Build.select(self.env, config=config, rev=rev):
+                if build.status not in (Build.SUCCESS, Build.FAILURE):
+                    keep = True
+                    break
+            if not keep:
+                log.info('Removing unused snapshot %s', path)
+                os.remove(path)
+                del self.snapshots[(config, rev, format)]
+
+    # Slave registry
+
+    def register_slave(self, name, properties):
+        any_match = False
+        for config in BuildConfig.select(self.env):
+            for platform in TargetPlatform.select(self.env, config=config.name):
+                if not platform.id in self.slaves:
+                    self.slaves[platform.id] = []
+                match = True
+                for propname, pattern in ifilter(None, platform.rules):
+                    try:
+                        propvalue = properties.get(propname)
+                        if not propvalue or not re.match(pattern, propvalue):
+                            match = False
+                            break
+                    except re.error:
+                        log.error('Invalid platform matching pattern "%s"',
+                                  pattern, exc_info=True)
+                        match = False
+                        break
+                if match:
+                    log.debug('Slave %s matched target platform "%s"', name,
+                              platform.name)
+                    self.slaves[platform.id].append(name)
+                    any_match = True
+        return any_match
+
+    def unregister_slave(self, name):
+        for slaves in self.slaves.values():
+            if name in slaves:
+                slaves.remove(name)
+
+        db = self.env.get_db_cnx()
+        for build in Build.select(self.env, slave=name,
+                                  status=Build.IN_PROGRESS, db=db):
+            log.info('Build %d ("%s" as of [%s]) cancelled by  %s', build.id,
+                     build.rev, build.config, name)
+            for step in list(BuildStep.select(self.env, build=build.id)):
+                step.delete(db=db)
+
+            build.slave = None
+            build.slave_info = {}
+            build.status = Build.PENDING
+            build.started = 0
+            build.update(db=db)
+            break
+
+        db.commit()
--- a/bitten/tests/__init__.py
+++ b/bitten/tests/__init__.py
@@ -9,7 +9,7 @@
 
 import unittest
 
-from bitten.tests import model, recipe
+from bitten.tests import model, recipe, queue
 from bitten.build import tests as build
 from bitten.util import tests as util
 from bitten.trac_ext import tests as trac_ext
@@ -18,6 +18,7 @@
     suite = unittest.TestSuite()
     suite.addTest(model.suite())
     suite.addTest(recipe.suite())
+    suite.addTest(queue.suite())
     suite.addTest(build.suite())
     suite.addTest(trac_ext.suite())
     suite.addTest(util.suite())
new file mode 100644
--- /dev/null
+++ b/bitten/tests/queue.py
@@ -0,0 +1,192 @@
+# -*- coding: iso8859-1 -*-
+#
+# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
+# All rights reserved.
+#
+# This software is licensed as described in the file COPYING, which
+# you should have received as part of this distribution. The terms
+# are also available at http://bitten.cmlenz.net/wiki/License.
+
+import os
+import shutil
+import tempfile
+import unittest
+
+from trac.test import EnvironmentStub
+from bitten.model import BuildConfig, TargetPlatform, Build, BuildStep, schema
+from bitten.queue import BuildQueue
+
+
+class BuildQueueTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.env = EnvironmentStub()
+        self.env.path = tempfile.mkdtemp()
+        os.mkdir(os.path.join(self.env.path, 'snapshots'))
+        db = self.env.get_db_cnx()
+        cursor = db.cursor()
+        for table in schema:
+            for stmt in db.to_sql(table):
+                cursor.execute(stmt)
+        db.commit()
+
+    def tearDown(self):
+        shutil.rmtree(self.env.path)
+
+    def test_next_pending_build(self):
+        """
+        Make sure that a pending build of an activated configuration is
+        scheduled for a slave that matches the target platform.
+        """
+        BuildConfig(self.env, 'test', active=True).insert()
+        build = Build(self.env, config='test', platform=1, rev=123, rev_time=42,
+                      status=Build.PENDING)
+        build.insert()
+        build_id = build.id
+
+        queue = BuildQueue(self.env)
+        queue.slaves[1] = ['foobar']
+        build, slave = queue.get_next_pending_build(['foobar', 'dummy'])
+        self.assertEqual((build_id, 'foobar'), (build.id, slave))
+
+    def test_next_pending_build_no_matching_slave(self):
+        """
+        Make sure that builds for which there is no slave matching the target
+        platform are not scheduled.
+        """
+        BuildConfig(self.env, 'test', active=True).insert()
+        build = Build(self.env, config='test', platform=1, rev=123, rev_time=42,
+                      status=Build.PENDING)
+        build.insert()
+        build_id = build.id
+
+        queue = BuildQueue(self.env)
+        queue.slaves[2] = ['foobar']
+        build, slave = queue.get_next_pending_build(['foobar', 'dummy'])
+        self.assertEqual((None, None), (build, slave))
+
+    def test_next_pending_build_inactive_config(self):
+        """
+        Make sure that builds for a deactived build config are not scheduled.
+        """
+        BuildConfig(self.env, 'test').insert()
+        build = Build(self.env, config='test', platform=1, rev=123, rev_time=42,
+                      status=Build.PENDING)
+        build.insert()
+
+        queue = BuildQueue(self.env)
+        queue.slaves[1] = ['foobar']
+        build, slave = queue.get_next_pending_build(['foobar', 'dummy'])
+        self.assertEqual((None, None), (build, slave))
+
+    def test_next_pending_build_slave_round_robin(self):
+        """
+        Verify that if a slave is selected for a pending build, it is moved to
+        the end of the slave list for that target platform.
+        """
+        BuildConfig(self.env, 'test', active=True).insert()
+        build = Build(self.env, config='test', platform=1, rev=123, rev_time=42,
+                      status=Build.PENDING)
+        build.insert()
+
+        queue = BuildQueue(self.env)
+        queue.slaves[1] = ['foo', 'bar', 'baz']
+        build, slave = queue.get_next_pending_build(['foo'])
+        self.assertEqual(['bar', 'baz', 'foo'], queue.slaves[1])
+
+    def test_register_slave_match_simple(self):
+        BuildConfig(self.env, 'test', active=True).insert()
+        platform = TargetPlatform(self.env, config='test', name="Unix")
+        platform.rules.append(('family', 'posix'))
+        platform.insert()
+
+        queue = BuildQueue(self.env)
+        assert queue.register_slave('foo', {'family': 'posix'})
+        self.assertEqual(['foo'], queue.slaves[platform.id])
+
+    def test_register_slave_match_simple_fail(self):
+        BuildConfig(self.env, 'test', active=True).insert()
+        platform = TargetPlatform(self.env, config='test', name="Unix")
+        platform.rules.append(('family', 'posix'))
+        platform.insert()
+
+        queue = BuildQueue(self.env)
+        assert not queue.register_slave('foo', {'family': 'nt'})
+        self.assertEqual([], queue.slaves[platform.id])
+
+    def test_register_slave_match_regexp(self):
+        BuildConfig(self.env, 'test', active=True).insert()
+        platform = TargetPlatform(self.env, config='test', name="Unix")
+        platform.rules.append(('version', '8\.\d\.\d'))
+        platform.insert()
+
+        queue = BuildQueue(self.env)
+        assert queue.register_slave('foo', {'version': '8.2.0'})
+        self.assertEqual(['foo'], queue.slaves[platform.id])
+
+    def test_register_slave_match_regexp_fail(self):
+        BuildConfig(self.env, 'test', active=True).insert()
+        platform = TargetPlatform(self.env, config='test', name="Unix")
+        platform.rules.append(('version', '8\.\d\.\d'))
+        platform.insert()
+
+        queue = BuildQueue(self.env)
+        assert not queue.register_slave('foo', {'version': '7.8.1'})
+        self.assertEqual([], queue.slaves[platform.id])
+
+    def test_register_slave_match_regexp_invalid(self):
+        BuildConfig(self.env, 'test', active=True).insert()
+        platform = TargetPlatform(self.env, config='test', name="Unix")
+        platform.rules.append(('version', '8(\.\d'))
+        platform.insert()
+
+        queue = BuildQueue(self.env)
+        assert not queue.register_slave('foo', {'version': '7.8.1'})
+        self.assertEqual([], queue.slaves[platform.id])
+
+    def test_unregister_slave_no_builds(self):
+        queue = BuildQueue(self.env)
+        queue.slaves[1] = ['foo', 'bar']
+        queue.slaves[2] = ['baz']
+        queue.unregister_slave('bar')
+        self.assertEqual(['foo'], queue.slaves[1])
+        self.assertEqual(['baz'], queue.slaves[2])
+
+    def test_unregister_slave_in_progress_build(self):
+        build = Build(self.env, config='test', platform=1, rev=123, rev_time=42,
+                      slave='foo', status=Build.IN_PROGRESS)
+        build.insert()
+
+        queue = BuildQueue(self.env)
+        queue.slaves[1] = ['foo', 'bar']
+        queue.slaves[2] = ['baz']
+        queue.unregister_slave('bar')
+        self.assertEqual(['foo'], queue.slaves[1])
+        self.assertEqual(['baz'], queue.slaves[2])
+
+        build = Build.fetch(self.env, id=build.id)
+        self.assertEqual(Build.PENDING, build.status)
+        self.assertEqual('', build.slave)
+        self.assertEqual({}, build.slave_info)
+        self.assertEqual(0, build.started)
+
+    def test_get_existing_snapshot(self):
+        BuildConfig(self.env, 'test', active=True).insert()
+        build = Build(self.env, config='test', platform=1, rev=123, rev_time=42,
+                      status=Build.PENDING)
+        build.insert()
+        snapshot = os.path.join(self.env.path, 'snapshots', 'test_r123.zip')
+        snapshot_file = file(snapshot, 'w')
+        snapshot_file.close()
+
+        queue = BuildQueue(self.env)
+        self.assertEqual(snapshot, queue.get_snapshot(build, 'zip'))
+
+
+def suite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(BuildQueueTestCase, 'test'))
+    return suite
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='suite')
Copyright (C) 2012-2017 Edgewall Software