# HG changeset patch # User cmlenz # Date 1119558954 0 # Node ID efa525876b1e2fb6f3b20e05f4856bc00a8f8dbe # Parent 16b30ffc5fb995f699c1164fbbf40b38b8efb356 Basic infrastructure for transmission of snapshot archives to build slaves. See #8. diff --git a/bitten/master.py b/bitten/master.py --- a/bitten/master.py +++ b/bitten/master.py @@ -19,10 +19,12 @@ # Author: Christopher Lenz import logging +import os.path from trac.env import Environment from bitten import __version__ as VERSION -from bitten.util import beep, xmlio +from bitten.model import Build, Configuration +from bitten.util import archive, beep, xmlio class Master(beep.Listener): @@ -34,20 +36,52 @@ self.profiles[OrchestrationProfileHandler.URI] = OrchestrationProfileHandler self.env = Environment(env_path) - self.youngest_rev = None self.slaves = {} - self.schedule(self.TRIGGER_INTERVAL, self.check_trigger) + self.schedule(self.TRIGGER_INTERVAL, self._check_build_triggers) + self.build_queue = {} - def check_trigger(self, master, when): + def _check_build_triggers(self, master, when): + self.schedule(self.TRIGGER_INTERVAL, self._check_build_triggers) + if not self.slaves: + return + logging.debug('Checking for build triggers...') repos = self.env.get_repository() - repos.sync() - if repos.youngest_rev != self.youngest_rev: - logging.info('New changeset detected: [%s]' - % repos.youngest_rev) - self.youngest_rev = repos.youngest_rev - repos.close() - self.schedule(self.TRIGGER_INTERVAL, self.check_trigger) + try: + repos.sync() + + for config in Configuration.select(self.env): + node = repos.get_node(config.path) + if (node.path, node.rev) in self.build_queue: + # Builds already pending + continue + + # Check whether the latest revision of that configuration has + # already been built + builds = list(Build.select(self.env, node.path, node.rev)) + if not builds: + logging.info('Enqueuing build of configuration "%s" as of revision [%s]', + config.name, node.rev) + snapshot = archive.make_archive(self.env, repos, node.path, + node.rev, config.name) + logging.info('Created snapshot archive at %s' % snapshot) + self.build_queue[(node.path, node.rev)] = (config, snapshot) + finally: + repos.close() + + if self.build_queue: + self.schedule(5, self._check_build_queue) + + def _check_build_queue(self, master, when): + if self.build_queue: + for path, rev in self.build_queue.keys(): + config, snapshot = self.build_queue[(path, rev)] + logging.info('Building configuration "%s" as of revision [%s]', + config.name, rev) + for slave in self.slaves.values(): + if not slave.building: + slave.send_build(snapshot) + break class OrchestrationProfileHandler(beep.ProfileHandler): @@ -59,6 +93,7 @@ def handle_connect(self, init_elem=None): self.master = self.session.listener assert self.master + self.building = False self.slave_name = None def handle_disconnect(self): @@ -87,6 +122,26 @@ logging.info('Registered slave "%s" (%s running %s %s [%s])', self.slave_name, platform, os, os_version, os_family) + def send_build(self, archive_path, handle_reply=None): + logging.info('Initiating build on slave %s', self.slave_name) + self.building = True + + def handle_reply(cmd, msgno, msg): + if cmd == 'ERR': + if msg.get_content_type() == beep.BEEP_XML: + elem = xmlio.parse(msg.get_payload()) + if elem.tagname == 'error': + logging.warning('Slave refused build request: %s (%d)', + elem.gettext(), int(elem.code)) + logging.info('Build started') + + archive_name = os.path.basename(archive_path) + message = beep.MIMEMessage(file(archive_path).read(), + content_type='application/tar', + content_disposition=archive_name, + content_encoding='gzip') + self.channel.send_msg(message, handle_reply=handle_reply) + def main(): from optparse import OptionParser @@ -127,7 +182,7 @@ master = Master(env_path, host, port) try: - master.run() + master.run(timeout=5.0) except KeyboardInterrupt: master.quit() diff --git a/bitten/slave.py b/bitten/slave.py --- a/bitten/slave.py +++ b/bitten/slave.py @@ -21,6 +21,7 @@ import logging import os import sys +import tempfile import time from bitten import __version__ as VERSION @@ -63,8 +64,23 @@ self.channel.send_msg(beep.MIMEMessage(xml), handle_reply) def handle_msg(self, msgno, msg): - # TODO: Handle build initiation requests etc - pass + if msg.get_content_type() == in ('application/tar', 'application/zip'): + logging.info('Received snapshot') + workdir = tempfile.mkdtemp(prefix='bitten') + archive_name = msg.get('Content-Disposition', 'snapshot.tar.gz') + archive_path = os.path.join(workdir, archive_name) + file(archive_path, 'wb').write(msg.get_payload()) + logging.info('Stored snapshot archive at %s', archive_path) + + # TODO: Spawn the build process + + xml = xmlio.Element('ok') + self.channel.send_rpy(msgno, beep.MIMEMessage(xml)) + logging.info('Sent in reply to build request') + + else: + xml = xmlio.Element('error', code=500)['Sorry, what?'] + self.channel.send_err(msgno, beep.MIMEMessage(xml)) def main():