Mercurial > bitten > bitten-test
diff bitten/master.py @ 47:083e848088ee
* Improvements to the model classes, and a couple of unit tests.
* The build master now stores information about ongoing builds in the Trac database.
* The web interface displays the status of ongoing builds.
author | cmlenz |
---|---|
date | Fri, 24 Jun 2005 15:35:23 +0000 |
parents | 80bc0fae3ed1 |
children | 757aa3bf9594 |
line wrap: on
line diff
--- a/bitten/master.py +++ b/bitten/master.py @@ -20,6 +20,7 @@ import logging import os.path +import time from trac.env import Environment from bitten import __version__ as VERSION @@ -34,16 +35,23 @@ def __init__(self, env_path, ip, port): beep.Listener.__init__(self, ip, port) self.profiles[OrchestrationProfileHandler.URI] = OrchestrationProfileHandler - self.env = Environment(env_path) + self.slaves = {} + + # path to generated snapshot archives, key is (config name, revision) + self.snapshots = {} + self.schedule(self.TRIGGER_INTERVAL, self._check_build_triggers) - self.build_queue = {} + + def close(self): + # Remove all pending builds + for build in Build.select(self.env, status=Build.PENDING): + build.delete() + beep.Listener.close(self) def _check_build_triggers(self, master, when): self.schedule(self.TRIGGER_INTERVAL, self._check_build_triggers) - if not self.slaves: - return logging.debug('Checking for build triggers...') repos = self.env.get_repository() @@ -52,36 +60,41 @@ for config in BuildConfig.select(self.env): node = repos.get_node(config.path) - if (node.path, node.rev) in self.build_queue: - # Builds already pending - continue # Check whether the latest revision of that configuration has # already been built - builds = list(Build.select(self.env, node.path, node.rev)) - if not builds: - logging.info('Enqueuing build of configuration "%s" as of revision [%s]', - config.name, node.rev) + builds = Build.select(self.env, config.name, node.rev) + if not list(builds): snapshot = archive.make_archive(self.env, repos, node.path, node.rev, config.name) logging.info('Created snapshot archive at %s' % snapshot) - self.build_queue[(node.path, node.rev)] = (config, snapshot) + self.snapshots[(config.name, str(node.rev))] = snapshot + + logging.info('Enqueuing build of configuration "%s" as of revision [%s]', + config.name, node.rev) + build = Build(self.env) + build.config = config.name + build.rev = node.rev + build.insert() finally: repos.close() - if self.build_queue: - self.schedule(5, self._check_build_queue) + self.schedule(5, self._check_build_queue) def _check_build_queue(self, master, when): - if self.build_queue: - for path, rev in self.build_queue.keys(): - config, snapshot = self.build_queue[(path, rev)] - logging.info('Building configuration "%s" as of revision [%s]', - config.name, rev) - for slave in self.slaves.values(): - if not slave.building: - slave.send_build(snapshot) - break + if not self.slaves: + return + logging.info('Checking for pending builds...') + for build in Build.select(self.env, status=Build.PENDING): + logging.info('Building configuration "%s" as of revision [%s]', + build.config, build.rev) + snapshot = self.snapshots[(build.config, build.rev)] + for slave in self.slaves.values(): + active_builds = Build.select(self.env, slave=slave.name, + status=Build.IN_PROGRESS) + if not list(active_builds): + slave.send_build(build, snapshot) + break class OrchestrationProfileHandler(beep.ProfileHandler): @@ -94,11 +107,21 @@ self.master = self.session.listener assert self.master self.building = False - self.slave_name = None + self.name = None def handle_disconnect(self): - del self.master.slaves[self.slave_name] - logging.info('Unregistered slave "%s"', self.slave_name) + del self.master.slaves[self.name] + logging.info('Unregistered slave "%s"', self.name) + if self.building: + for build in Build.select(self.master.env, slave=self.name, + status=Build.IN_PROGRESS): + logging.info('Build [%s] of "%s" by %s cancelled', build.rev, + build.config, self.name) + build.slave = None + build.status = Build.PENDING + build.time = None + build.update() + break def handle_msg(self, msgno, msg): assert msg.get_content_type() == beep.BEEP_XML @@ -114,16 +137,16 @@ os_family = child.family os_version = child.version - self.slave_name = elem.name - self.master.slaves[self.slave_name] = self + self.name = elem.name + self.master.slaves[self.name] = self xml = xmlio.Element('ok') self.channel.send_rpy(msgno, beep.MIMEMessage(xml)) logging.info('Registered slave "%s" (%s running %s %s [%s])', - self.slave_name, platform, os, os_version, os_family) + self.name, platform, os, os_version, os_family) - def send_build(self, archive_path, handle_reply=None): - logging.info('Initiating build on slave %s', self.slave_name) + def send_build(self, build, snapshot_path, handle_reply=None): + logging.info('Initiating build on slave %s', self.name) self.building = True def handle_reply(cmd, msgno, msg): @@ -133,12 +156,18 @@ if elem.tagname == 'error': logging.warning('Slave refused build request: %s (%d)', elem.gettext(), int(elem.code)) + build.slave = self.name + build.time = int(time.time()) + build.status = Build.IN_PROGRESS + build.update() logging.info('Build started') - archive_name = os.path.basename(archive_path) - message = beep.MIMEMessage(file(archive_path).read(), + # TODO: should not block while reading the file; rather stream it using + # asyncore push_with_producer() + snapshot_name = os.path.basename(snapshot_path) + message = beep.MIMEMessage(file(snapshot_path).read(), content_type='application/tar', - content_disposition=archive_name, + content_disposition=snapshot_name, content_encoding='gzip') self.channel.send_msg(message, handle_reply=handle_reply)