# HG changeset patch # User wbell # Date 1272565188 0 # Node ID 38e11e15d95e07801b6f6d6bf9447fb2572ef62e # Parent 064c293c9a5ebda7c874df7a4574dd7c2dedaaac Port of [864] to 0.6.x diff --git a/bitten/__init__.py b/bitten/__init__.py --- a/bitten/__init__.py +++ b/bitten/__init__.py @@ -19,4 +19,4 @@ pass # The master-slave protocol/configuration version -PROTOCOL_VERSION = 3 +PROTOCOL_VERSION = 4 diff --git a/bitten/master.py b/bitten/master.py --- a/bitten/master.py +++ b/bitten/master.py @@ -128,6 +128,8 @@ if req.args['collection'] == 'steps': return self._process_build_step(req, config, build) + elif req.args['collection'] == 'keepalive': + return self._process_keepalive(req, config, build) else: self._send_error(req, HTTP_NOT_FOUND, "No such collection '%s'" % req.args['collection']) @@ -164,7 +166,18 @@ self._send_error(req, HTTP_BAD_REQUEST, 'XML parser error') slave_version = int(elem.attr.get('version', 1)) - if slave_version != PROTOCOL_VERSION: + + # FIXME: Remove version compatibility code. + # The initial difference between protocol version 3 and 4 is that + # the master allows keepalive requests-- the master must be + # at least 4 before slaves supporting version 4 are allowed. When + # the first force master/slave upgrade requirement comes in + # (or we bump the) version number again, remove this code. + if slave_version == 3 and PROTOCOL_VERSION == 4: + self.log.info('Allowing slave version %d to process build for ' + 'compatibility. Upgrade slave to support build ' + 'keepalives.', slave_version) + elif slave_version != PROTOCOL_VERSION: self._send_error(req, HTTP_BAD_REQUEST, "Master-Slave version mismatch: master=%d, slave=%d" % \ (PROTOCOL_VERSION, slave_version)) @@ -398,6 +411,20 @@ 'Location': req.abs_href.builds( build.id, 'steps', stepname)}) + def _process_keepalive(self, req, config, build): + build.last_activity = int(time.time()) + build.update() + + self.log.info('Slave %s build %d keepalive ("%s" as of [%s])', + build.slave, build.id, build.config, build.rev) + + body = 'Keepalive processed' + self._send_response(req, 200, body, { + 'Content-Type': 'text/plain', + 'Content-Length': str(len(body)), + 'Location': req.abs_href.builds( + build.id, 'keepalive')}) + def _start_new_step(self, build, stepname): """Creates the in-memory representation for a newly started step, ready to be persisted to the database. diff --git a/bitten/slave.py b/bitten/slave.py --- a/bitten/slave.py +++ b/bitten/slave.py @@ -24,6 +24,8 @@ import time import re import cookielib +import threading +import os from ConfigParser import MissingSectionHeaderError from bitten import PROTOCOL_VERSION @@ -71,6 +73,60 @@ self.method = self.has_data() and 'POST' or 'GET' return self.method +class KeepAliveThread(threading.Thread): + "A thread to periodically send keep-alive messages to the master" + + def __init__(self, opener, build_url, single_build, keepalive_interval): + threading.Thread.__init__(self, None, None, "KeepaliveThread") + self.build_url = build_url + self.keepalive_interval = keepalive_interval + self.single_build = single_build + self.last_keepalive = int(time.time()) + self.kill = False + self.opener = opener + + def keepalive(self): + log.debug('Sending keepalive') + method = 'POST' + url = self.build_url + '/keepalive/' + body = None + shutdown = False + headers = { + 'Content-Type': 'application/x-bitten+xml' + } + + log.debug('Sending %s request to %r', method, url) + req = SaneHTTPRequest(method, url, body, headers or {}) + try: + return self.opener.open(req) + except urllib2.HTTPError, e: + # a conflict error lets us know that we've been + # invalidated. Ideally, we'd engineer something to stop any + # running steps in progress, but killing threads is tricky + # stuff. For now, we'll wait for whatever's going + # on to stop, and the main thread'll figure out that we've + # been invalidated. + log.warning('Server returned keepalive error %d: %s', e.code, e.msg) + except: + log.warning('Server returned unknown keepalive error') + + def run(self): + log.debug('Keepalive thread starting.') + while (not self.kill): + now = int(time.time()) + if (self.last_keepalive + self.keepalive_interval) < now: + self.keepalive() + self.last_keepalive = now + + time.sleep(1) + log.debug('Keepalive thread exiting.') + + def stop(self): + log.debug('Stopping keepalive thread') + self.kill = True + self.join(30) + log.debug('Keepalive thread stopped') + class BuildSlave(object): """HTTP client implementation for the build slave.""" @@ -78,7 +134,8 @@ def __init__(self, urls, name=None, config=None, dry_run=False, work_dir=None, build_dir="build_${build}", keep_files=False, single_build=False, - poll_interval=300, username=None, password=None, + poll_interval=300, keepalive_interval = 60, + username=None, password=None, dump_reports=False, no_loop=False, form_auth=False): """Create the build slave instance. @@ -98,6 +155,8 @@ :param poll_interval: the time in seconds to wait between requesting builds from the build master (default is five minutes) + :param keep_alive_interval: the time in seconds to wait between sending + keepalive heartbeats (default is 30 seconds) :param username: the username to use when authentication against the build master is requested :param password: the password to use when authentication is needed @@ -127,6 +186,7 @@ self.single_build = single_build self.no_loop = no_loop self.poll_interval = poll_interval + self.keepalive_interval = keepalive_interval self.dump_reports = dump_reports self.cookiejar = cookielib.CookieJar() self.username = username \ @@ -169,8 +229,9 @@ ).startswith('text/plain'): content = e.read() else: - content = 'Unknown cause of error' - e.msg = '%s (%s)' % (e.msg, content) + content = 'no message available' + log.debug('Server returned error %d: %s (%s)', + e.code, e.msg, content) raise return e @@ -294,7 +355,10 @@ build_id = build_url and int(build_url.split('/')[-1]) or 0 xml = xmlio.parse(fileobj) basedir = '' + keepalive_thread = KeepAliveThread(self.opener, build_url, self.single_build, self.keepalive_interval) try: + if not self.local: + keepalive_thread.start() recipe = Recipe(xml, os.path.join(self.work_dir, self.build_dir), self.config) basedir = recipe.ctxt.basedir @@ -316,6 +380,7 @@ if self.dry_run: self._cancel_build(build_url) finally: + keepalive_thread.stop() if not self.keep_files and os.path.isdir(basedir): log.debug('Removing build directory %s' % basedir) _rmtree(basedir) @@ -431,6 +496,7 @@ help='don\'t report results back to master') group.add_option('-i', '--interval', dest='interval', metavar='SECONDS', type='int', help='time to wait between requesting builds') + group.add_option('-b', '--keepalive_interval', dest='keepalive_interval', metavar='SECONDS', type='int', help='time to wait between keepalive heartbeats') group = parser.add_option_group('logging') group.add_option('-l', '--log', dest='logfile', metavar='FILENAME', help='write log messages to FILENAME') @@ -443,7 +509,8 @@ parser.set_defaults(dry_run=False, keep_files=False, loglevel=logging.INFO, single_build=False, no_loop=False, - dump_reports=False, interval=300, form_auth=False) + dump_reports=False, interval=300, keepalive_interval=60, + form_auth=False) options, args = parser.parse_args() if len(args) < 1: @@ -477,6 +544,7 @@ single_build=options.single_build, no_loop=options.no_loop, poll_interval=options.interval, + keepalive_interval=options.keepalive_interval, username=options.username, password=options.password, dump_reports=options.dump_reports, form_auth=options.form_auth) diff --git a/bitten/tests/master.py b/bitten/tests/master.py --- a/bitten/tests/master.py +++ b/bitten/tests/master.py @@ -178,7 +178,7 @@ inbody = StringIO(""" Power Macintosh Darwin -""" % (PROTOCOL_VERSION-1,)) +""" % (PROTOCOL_VERSION-2,)) outheaders = {} outbody = StringIO() req = Mock(method='POST', base_path='', path_info='/builds', @@ -197,7 +197,7 @@ self.assertEqual(400, outheaders['Status']) self.assertEqual('Master-Slave version mismatch: master=%d, slave=%d' \ - % (PROTOCOL_VERSION, PROTOCOL_VERSION-1), + % (PROTOCOL_VERSION, PROTOCOL_VERSION-2), outbody.getvalue()) def test_create_build_protocol_no_version(self):