diff bitten/master.py @ 47:083e848088ee

* Improvements to the model classes, and a couple of unit tests. * The build master now stores information about ongoing builds in the Trac database. * The web interface displays the status of ongoing builds.
author cmlenz
date Fri, 24 Jun 2005 15:35:23 +0000
parents 80bc0fae3ed1
children 757aa3bf9594
line wrap: on
line diff
--- a/bitten/master.py
+++ b/bitten/master.py
@@ -20,6 +20,7 @@
 
 import logging
 import os.path
+import time
 
 from trac.env import Environment
 from bitten import __version__ as VERSION
@@ -34,16 +35,23 @@
     def __init__(self, env_path, ip, port):
         beep.Listener.__init__(self, ip, port)
         self.profiles[OrchestrationProfileHandler.URI] = OrchestrationProfileHandler
-
         self.env = Environment(env_path)
+
         self.slaves = {}
+
+        # path to generated snapshot archives, key is (config name, revision)
+        self.snapshots = {}
+
         self.schedule(self.TRIGGER_INTERVAL, self._check_build_triggers)
-        self.build_queue = {}
+
+    def close(self):
+        # Remove all pending builds
+        for build in Build.select(self.env, status=Build.PENDING):
+            build.delete()
+        beep.Listener.close(self)
 
     def _check_build_triggers(self, master, when):
         self.schedule(self.TRIGGER_INTERVAL, self._check_build_triggers)
-        if not self.slaves:
-            return
 
         logging.debug('Checking for build triggers...')
         repos = self.env.get_repository()
@@ -52,36 +60,41 @@
 
             for config in BuildConfig.select(self.env):
                 node = repos.get_node(config.path)
-                if (node.path, node.rev) in self.build_queue:
-                    # Builds already pending
-                    continue
 
                 # Check whether the latest revision of that configuration has
                 # already been built
-                builds = list(Build.select(self.env, node.path, node.rev))
-                if not builds:
-                    logging.info('Enqueuing build of configuration "%s" as of revision [%s]',
-                                 config.name, node.rev)
+                builds = Build.select(self.env, config.name, node.rev)
+                if not list(builds):
                     snapshot = archive.make_archive(self.env, repos, node.path,
                                                     node.rev, config.name)
                     logging.info('Created snapshot archive at %s' % snapshot)
-                    self.build_queue[(node.path, node.rev)] = (config, snapshot)
+                    self.snapshots[(config.name, str(node.rev))] = snapshot
+
+                    logging.info('Enqueuing build of configuration "%s" as of revision [%s]',
+                                 config.name, node.rev)
+                    build = Build(self.env)
+                    build.config = config.name
+                    build.rev = node.rev
+                    build.insert()
         finally:
             repos.close()
 
-        if self.build_queue:
-            self.schedule(5, self._check_build_queue)
+        self.schedule(5, self._check_build_queue)
 
     def _check_build_queue(self, master, when):
-        if self.build_queue:
-            for path, rev in self.build_queue.keys():
-                config, snapshot = self.build_queue[(path, rev)]
-                logging.info('Building configuration "%s" as of revision [%s]',
-                             config.name, rev)
-                for slave in self.slaves.values():
-                    if not slave.building:
-                        slave.send_build(snapshot)
-                        break
+        if not self.slaves:
+            return
+        logging.info('Checking for pending builds...')
+        for build in Build.select(self.env, status=Build.PENDING):
+            logging.info('Building configuration "%s" as of revision [%s]',
+                         build.config, build.rev)
+            snapshot = self.snapshots[(build.config, build.rev)]
+            for slave in self.slaves.values():
+                active_builds = Build.select(self.env, slave=slave.name,
+                                             status=Build.IN_PROGRESS)
+                if not list(active_builds):
+                    slave.send_build(build, snapshot)
+                    break
 
 
 class OrchestrationProfileHandler(beep.ProfileHandler):
@@ -94,11 +107,21 @@
         self.master = self.session.listener
         assert self.master
         self.building = False
-        self.slave_name = None
+        self.name = None
 
     def handle_disconnect(self):
-        del self.master.slaves[self.slave_name]
-        logging.info('Unregistered slave "%s"', self.slave_name)
+        del self.master.slaves[self.name]
+        logging.info('Unregistered slave "%s"', self.name)
+        if self.building:
+            for build in Build.select(self.master.env, slave=self.name,
+                                      status=Build.IN_PROGRESS):
+                logging.info('Build [%s] of "%s" by %s cancelled', build.rev,
+                             build.config, self.name)
+                build.slave = None
+                build.status = Build.PENDING
+                build.time = None
+                build.update()
+                break
 
     def handle_msg(self, msgno, msg):
         assert msg.get_content_type() == beep.BEEP_XML
@@ -114,16 +137,16 @@
                     os_family = child.family
                     os_version = child.version
 
-            self.slave_name = elem.name
-            self.master.slaves[self.slave_name] = self
+            self.name = elem.name
+            self.master.slaves[self.name] = self
 
             xml = xmlio.Element('ok')
             self.channel.send_rpy(msgno, beep.MIMEMessage(xml))
             logging.info('Registered slave "%s" (%s running %s %s [%s])',
-                         self.slave_name, platform, os, os_version, os_family)
+                         self.name, platform, os, os_version, os_family)
 
-    def send_build(self, archive_path, handle_reply=None):
-        logging.info('Initiating build on slave %s', self.slave_name)
+    def send_build(self, build, snapshot_path, handle_reply=None):
+        logging.info('Initiating build on slave %s', self.name)
         self.building = True
 
         def handle_reply(cmd, msgno, msg):
@@ -133,12 +156,18 @@
                     if elem.tagname == 'error':
                         logging.warning('Slave refused build request: %s (%d)',
                                         elem.gettext(), int(elem.code))
+            build.slave = self.name
+            build.time = int(time.time())
+            build.status = Build.IN_PROGRESS
+            build.update()
             logging.info('Build started')
 
-        archive_name = os.path.basename(archive_path)
-        message = beep.MIMEMessage(file(archive_path).read(),
+        # TODO: should not block while reading the file; rather stream it using
+        #       asyncore push_with_producer()
+        snapshot_name = os.path.basename(snapshot_path)
+        message = beep.MIMEMessage(file(snapshot_path).read(),
                                    content_type='application/tar',
-                                   content_disposition=archive_name,
+                                   content_disposition=snapshot_name,
                                    content_encoding='gzip')
         self.channel.send_msg(message, handle_reply=handle_reply)
 
Copyright (C) 2012-2017 Edgewall Software