cmlenz@13: # -*- coding: iso8859-1 -*- cmlenz@13: # cmlenz@13: # Copyright (C) 2005 Christopher Lenz cmlenz@13: # cmlenz@13: # Bitten is free software; you can redistribute it and/or cmlenz@13: # modify it under the terms of the GNU General Public License as cmlenz@13: # published by the Free Software Foundation; either version 2 of the cmlenz@13: # License, or (at your option) any later version. cmlenz@13: # cmlenz@13: # Trac is distributed in the hope that it will be useful, cmlenz@13: # but WITHOUT ANY WARRANTY; without even the implied warranty of cmlenz@13: # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU cmlenz@13: # General Public License for more details. cmlenz@13: # cmlenz@13: # You should have received a copy of the GNU General Public License cmlenz@13: # along with this program; if not, write to the Free Software cmlenz@13: # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. cmlenz@13: # cmlenz@13: # Author: Christopher Lenz cmlenz@13: cmlenz@15: import logging cmlenz@42: import os.path cmlenz@47: import time cmlenz@18: cmlenz@18: from trac.env import Environment cmlenz@19: from bitten import __version__ as VERSION cmlenz@45: from bitten.model import Build, BuildConfig cmlenz@42: from bitten.util import archive, beep, xmlio cmlenz@13: cmlenz@13: cmlenz@18: class Master(beep.Listener): cmlenz@18: cmlenz@18: TRIGGER_INTERVAL = 10 cmlenz@18: cmlenz@18: def __init__(self, env_path, ip, port): cmlenz@18: beep.Listener.__init__(self, ip, port) cmlenz@24: self.profiles[OrchestrationProfileHandler.URI] = OrchestrationProfileHandler cmlenz@18: self.env = Environment(env_path) cmlenz@47: cmlenz@18: self.slaves = {} cmlenz@47: cmlenz@47: # path to generated snapshot archives, key is (config name, revision) cmlenz@47: self.snapshots = {} cmlenz@47: cmlenz@42: self.schedule(self.TRIGGER_INTERVAL, self._check_build_triggers) cmlenz@47: cmlenz@47: def close(self): cmlenz@47: # Remove all pending builds cmlenz@47: for build in Build.select(self.env, status=Build.PENDING): cmlenz@47: build.delete() cmlenz@47: beep.Listener.close(self) cmlenz@18: cmlenz@42: def _check_build_triggers(self, master, when): cmlenz@42: self.schedule(self.TRIGGER_INTERVAL, self._check_build_triggers) cmlenz@42: cmlenz@24: logging.debug('Checking for build triggers...') cmlenz@18: repos = self.env.get_repository() cmlenz@42: try: cmlenz@42: repos.sync() cmlenz@42: cmlenz@45: for config in BuildConfig.select(self.env): cmlenz@42: node = repos.get_node(config.path) cmlenz@42: cmlenz@42: # Check whether the latest revision of that configuration has cmlenz@42: # already been built cmlenz@47: builds = Build.select(self.env, config.name, node.rev) cmlenz@47: if not list(builds): cmlenz@42: snapshot = archive.make_archive(self.env, repos, node.path, cmlenz@42: node.rev, config.name) cmlenz@42: logging.info('Created snapshot archive at %s' % snapshot) cmlenz@47: self.snapshots[(config.name, str(node.rev))] = snapshot cmlenz@47: cmlenz@47: logging.info('Enqueuing build of configuration "%s" as of revision [%s]', cmlenz@47: config.name, node.rev) cmlenz@47: build = Build(self.env) cmlenz@47: build.config = config.name cmlenz@47: build.rev = node.rev cmlenz@47: build.insert() cmlenz@42: finally: cmlenz@42: repos.close() cmlenz@42: cmlenz@47: self.schedule(5, self._check_build_queue) cmlenz@42: cmlenz@42: def _check_build_queue(self, master, when): cmlenz@47: if not self.slaves: cmlenz@47: return cmlenz@47: logging.info('Checking for pending builds...') cmlenz@47: for build in Build.select(self.env, status=Build.PENDING): cmlenz@47: logging.info('Building configuration "%s" as of revision [%s]', cmlenz@47: build.config, build.rev) cmlenz@47: snapshot = self.snapshots[(build.config, build.rev)] cmlenz@47: for slave in self.slaves.values(): cmlenz@47: active_builds = Build.select(self.env, slave=slave.name, cmlenz@47: status=Build.IN_PROGRESS) cmlenz@47: if not list(active_builds): cmlenz@47: slave.send_build(build, snapshot) cmlenz@47: break cmlenz@18: cmlenz@18: cmlenz@19: class OrchestrationProfileHandler(beep.ProfileHandler): cmlenz@19: """Handler for communication on the Bitten build orchestration profile from cmlenz@19: the perspective of the build master. cmlenz@19: """ cmlenz@19: URI = 'http://bitten.cmlenz.net/beep/orchestration' cmlenz@13: cmlenz@29: def handle_connect(self, init_elem=None): cmlenz@18: self.master = self.session.listener cmlenz@18: assert self.master cmlenz@42: self.building = False cmlenz@47: self.name = None cmlenz@24: cmlenz@24: def handle_disconnect(self): cmlenz@47: del self.master.slaves[self.name] cmlenz@47: logging.info('Unregistered slave "%s"', self.name) cmlenz@47: if self.building: cmlenz@47: for build in Build.select(self.master.env, slave=self.name, cmlenz@47: status=Build.IN_PROGRESS): cmlenz@47: logging.info('Build [%s] of "%s" by %s cancelled', build.rev, cmlenz@47: build.config, self.name) cmlenz@47: build.slave = None cmlenz@47: build.status = Build.PENDING cmlenz@47: build.time = None cmlenz@47: build.update() cmlenz@47: break cmlenz@13: cmlenz@13: def handle_msg(self, msgno, msg): cmlenz@13: assert msg.get_content_type() == beep.BEEP_XML cmlenz@29: elem = xmlio.parse(msg.get_payload()) cmlenz@13: cmlenz@13: if elem.tagname == 'register': cmlenz@13: platform, os, os_family, os_version = None, None, None, None cmlenz@13: for child in elem['*']: cmlenz@13: if child.tagname == 'platform': cmlenz@13: platform = child.gettext() cmlenz@13: elif child.tagname == 'os': cmlenz@13: os = child.gettext() cmlenz@13: os_family = child.family cmlenz@13: os_version = child.version cmlenz@14: cmlenz@47: self.name = elem.name cmlenz@47: self.master.slaves[self.name] = self cmlenz@18: cmlenz@29: xml = xmlio.Element('ok') cmlenz@29: self.channel.send_rpy(msgno, beep.MIMEMessage(xml)) cmlenz@24: logging.info('Registered slave "%s" (%s running %s %s [%s])', cmlenz@47: self.name, platform, os, os_version, os_family) cmlenz@13: cmlenz@47: def send_build(self, build, snapshot_path, handle_reply=None): cmlenz@47: logging.info('Initiating build on slave %s', self.name) cmlenz@42: self.building = True cmlenz@42: cmlenz@42: def handle_reply(cmd, msgno, msg): cmlenz@42: if cmd == 'ERR': cmlenz@42: if msg.get_content_type() == beep.BEEP_XML: cmlenz@42: elem = xmlio.parse(msg.get_payload()) cmlenz@42: if elem.tagname == 'error': cmlenz@42: logging.warning('Slave refused build request: %s (%d)', cmlenz@42: elem.gettext(), int(elem.code)) cmlenz@47: build.slave = self.name cmlenz@47: build.time = int(time.time()) cmlenz@47: build.status = Build.IN_PROGRESS cmlenz@47: build.update() cmlenz@42: logging.info('Build started') cmlenz@42: cmlenz@47: # TODO: should not block while reading the file; rather stream it using cmlenz@47: # asyncore push_with_producer() cmlenz@47: snapshot_name = os.path.basename(snapshot_path) cmlenz@47: message = beep.MIMEMessage(file(snapshot_path).read(), cmlenz@42: content_type='application/tar', cmlenz@47: content_disposition=snapshot_name, cmlenz@42: content_encoding='gzip') cmlenz@42: self.channel.send_msg(message, handle_reply=handle_reply) cmlenz@42: cmlenz@13: cmlenz@31: def main(): cmlenz@19: from optparse import OptionParser cmlenz@19: cmlenz@19: parser = OptionParser(usage='usage: %prog [options] env-path', cmlenz@19: version='%%prog %s' % VERSION) cmlenz@19: parser.add_option('-p', '--port', action='store', type='int', dest='port', cmlenz@19: help='port number to use') cmlenz@19: parser.add_option('-H', '--host', action='store', dest='host', cmlenz@33: help='the host name or IP address to bind to') cmlenz@19: parser.add_option('--debug', action='store_const', dest='loglevel', cmlenz@19: const=logging.DEBUG, help='enable debugging output') cmlenz@19: parser.add_option('-v', '--verbose', action='store_const', dest='loglevel', cmlenz@19: const=logging.INFO, help='print as much as possible') cmlenz@19: parser.add_option('-q', '--quiet', action='store_const', dest='loglevel', cmlenz@19: const=logging.ERROR, help='print as little as possible') cmlenz@19: parser.set_defaults(port=7633, loglevel=logging.WARNING) cmlenz@19: options, args = parser.parse_args() cmlenz@19: cmlenz@13: if len(args) < 1: cmlenz@19: parser.error('incorrect number of arguments') cmlenz@18: env_path = args[0] cmlenz@13: cmlenz@19: logging.getLogger().setLevel(options.loglevel) cmlenz@19: port = options.port cmlenz@19: if not (1 <= port <= 65535): cmlenz@19: parser.error('port must be an integer in the range 1-65535') cmlenz@13: cmlenz@19: host = options.host cmlenz@19: if not host: cmlenz@19: import socket cmlenz@19: ip = socket.gethostbyname(socket.gethostname()) cmlenz@19: try: cmlenz@19: host = socket.gethostbyaddr(ip)[0] cmlenz@19: except socket.error, e: cmlenz@19: logging.warning('Reverse host name lookup failed (%s)', e) cmlenz@19: host = ip cmlenz@19: cmlenz@19: master = Master(env_path, host, port) cmlenz@13: try: cmlenz@42: master.run(timeout=5.0) cmlenz@13: except KeyboardInterrupt: cmlenz@34: master.quit() cmlenz@31: cmlenz@31: if __name__ == '__main__': cmlenz@33: main()