changeset 42:efa525876b1e

Basic infrastructure for transmission of snapshot archives to build slaves. See #8.
author cmlenz
date Thu, 23 Jun 2005 20:35:54 +0000
parents 16b30ffc5fb9
children 8184c7ad896c
files bitten/master.py bitten/slave.py
diffstat 2 files changed, 85 insertions(+), 14 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/master.py
+++ b/bitten/master.py
@@ -19,10 +19,12 @@
 # Author: Christopher Lenz <cmlenz@gmx.de>
 
 import logging
+import os.path
 
 from trac.env import Environment
 from bitten import __version__ as VERSION
-from bitten.util import beep, xmlio
+from bitten.model import Build, Configuration
+from bitten.util import archive, beep, xmlio
 
 
 class Master(beep.Listener):
@@ -34,20 +36,52 @@
         self.profiles[OrchestrationProfileHandler.URI] = OrchestrationProfileHandler
 
         self.env = Environment(env_path)
-        self.youngest_rev = None
         self.slaves = {}
-        self.schedule(self.TRIGGER_INTERVAL, self.check_trigger)
+        self.schedule(self.TRIGGER_INTERVAL, self._check_build_triggers)
+        self.build_queue = {}
 
-    def check_trigger(self, master, when):
+    def _check_build_triggers(self, master, when):
+        self.schedule(self.TRIGGER_INTERVAL, self._check_build_triggers)
+        if not self.slaves:
+            return
+
         logging.debug('Checking for build triggers...')
         repos = self.env.get_repository()
-        repos.sync()
-        if repos.youngest_rev != self.youngest_rev:
-            logging.info('New changeset detected: [%s]'
-                          % repos.youngest_rev)
-            self.youngest_rev = repos.youngest_rev
-        repos.close()
-        self.schedule(self.TRIGGER_INTERVAL, self.check_trigger)
+        try:
+            repos.sync()
+
+            for config in Configuration.select(self.env):
+                node = repos.get_node(config.path)
+                if (node.path, node.rev) in self.build_queue:
+                    # Builds already pending
+                    continue
+
+                # Check whether the latest revision of that configuration has
+                # already been built
+                builds = list(Build.select(self.env, node.path, node.rev))
+                if not builds:
+                    logging.info('Enqueuing build of configuration "%s" as of revision [%s]',
+                                 config.name, node.rev)
+                    snapshot = archive.make_archive(self.env, repos, node.path,
+                                                    node.rev, config.name)
+                    logging.info('Created snapshot archive at %s' % snapshot)
+                    self.build_queue[(node.path, node.rev)] = (config, snapshot)
+        finally:
+            repos.close()
+
+        if self.build_queue:
+            self.schedule(5, self._check_build_queue)
+
+    def _check_build_queue(self, master, when):
+        if self.build_queue:
+            for path, rev in self.build_queue.keys():
+                config, snapshot = self.build_queue[(path, rev)]
+                logging.info('Building configuration "%s" as of revision [%s]',
+                             config.name, rev)
+                for slave in self.slaves.values():
+                    if not slave.building:
+                        slave.send_build(snapshot)
+                        break
 
 
 class OrchestrationProfileHandler(beep.ProfileHandler):
@@ -59,6 +93,7 @@
     def handle_connect(self, init_elem=None):
         self.master = self.session.listener
         assert self.master
+        self.building = False
         self.slave_name = None
 
     def handle_disconnect(self):
@@ -87,6 +122,26 @@
             logging.info('Registered slave "%s" (%s running %s %s [%s])',
                          self.slave_name, platform, os, os_version, os_family)
 
+    def send_build(self, archive_path, handle_reply=None):
+        logging.info('Initiating build on slave %s', self.slave_name)
+        self.building = True
+
+        def handle_reply(cmd, msgno, msg):
+            if cmd == 'ERR':
+                if msg.get_content_type() == beep.BEEP_XML:
+                    elem = xmlio.parse(msg.get_payload())
+                    if elem.tagname == 'error':
+                        logging.warning('Slave refused build request: %s (%d)',
+                                        elem.gettext(), int(elem.code))
+            logging.info('Build started')
+
+        archive_name = os.path.basename(archive_path)
+        message = beep.MIMEMessage(file(archive_path).read(),
+                                   content_type='application/tar',
+                                   content_disposition=archive_name,
+                                   content_encoding='gzip')
+        self.channel.send_msg(message, handle_reply=handle_reply)
+
 
 def main():
     from optparse import OptionParser
@@ -127,7 +182,7 @@
 
     master = Master(env_path, host, port)
     try:
-        master.run()
+        master.run(timeout=5.0)
     except KeyboardInterrupt:
         master.quit()
 
--- a/bitten/slave.py
+++ b/bitten/slave.py
@@ -21,6 +21,7 @@
 import logging
 import os
 import sys
+import tempfile
 import time
 
 from bitten import __version__ as VERSION
@@ -63,8 +64,23 @@
         self.channel.send_msg(beep.MIMEMessage(xml), handle_reply)
 
     def handle_msg(self, msgno, msg):
-        # TODO: Handle build initiation requests etc
-        pass
+        if msg.get_content_type() == in ('application/tar', 'application/zip'):
+            logging.info('Received snapshot')
+            workdir = tempfile.mkdtemp(prefix='bitten')
+            archive_name = msg.get('Content-Disposition', 'snapshot.tar.gz')
+            archive_path = os.path.join(workdir, archive_name)
+            file(archive_path, 'wb').write(msg.get_payload())
+            logging.info('Stored snapshot archive at %s', archive_path)
+
+            # TODO: Spawn the build process
+
+            xml = xmlio.Element('ok')
+            self.channel.send_rpy(msgno, beep.MIMEMessage(xml))
+            logging.info('Sent <ok/> in reply to build request')
+
+        else:
+            xml = xmlio.Element('error', code=500)['Sorry, what?']
+            self.channel.send_err(msgno, beep.MIMEMessage(xml))
 
 
 def main():
Copyright (C) 2012-2017 Edgewall Software