changeset 785:8e76b8f6310a

Adding keepalives to the bitten client/server protocol. Keepalives now keep builds running as long as the slave is up, rather than having them time out independently of if the slave is working on them. This requires an update of both the master and slave to support keepalives-- older slaves (protocol version 3) will continue to work, but will not submit keepalives (and thus will retain previous behavior.) I've updated the PROTOCOL_VERSION field as older masters would kill builds for newer slaves that attempted to keepalive. We probably want to update test_create_build_protocol_wrong_version to test the matrix of allowed versions, but I expect the exception of allowing of older slaves to disappear when we next edit the protocol. Closes #411.
author wbell
date Thu, 29 Apr 2010 18:17:05 +0000
parents 15e26246cc0a
children 9a1a2faa6a79
files bitten/__init__.py bitten/master.py bitten/slave.py bitten/tests/master.py
diffstat 4 files changed, 103 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/__init__.py
+++ b/bitten/__init__.py
@@ -19,4 +19,4 @@
         pass
 
 # The master-slave protocol/configuration version
-PROTOCOL_VERSION = 3
+PROTOCOL_VERSION = 4
--- a/bitten/master.py
+++ b/bitten/master.py
@@ -128,6 +128,8 @@
 
         if req.args['collection'] == 'steps':
             return self._process_build_step(req, config, build)
+        elif req.args['collection'] == 'keepalive':
+            return self._process_keepalive(req, config, build)
         else:
             self._send_error(req, HTTP_NOT_FOUND,
                     "No such collection '%s'" % req.args['collection'])
@@ -164,7 +166,18 @@
             self._send_error(req, HTTP_BAD_REQUEST, 'XML parser error')
 
         slave_version = int(elem.attr.get('version', 1))
-        if slave_version != PROTOCOL_VERSION:
+
+        # FIXME: Remove version compatibility code.
+        # The initial difference between protocol version 3 and 4 is that
+        # the master allows keepalive requests-- the master must be
+        # at least 4 before slaves supporting version 4 are allowed. When
+        # the first force master/slave upgrade requirement comes in
+        # (or we bump the) version number again, remove this code.
+        if slave_version == 3 and PROTOCOL_VERSION == 4:
+            self.log.info('Allowing slave version %d to process build for '
+                          'compatibility. Upgrade slave to support build '
+                          'keepalives.', slave_version)
+        elif slave_version != PROTOCOL_VERSION:
             self._send_error(req, HTTP_BAD_REQUEST,
                     "Master-Slave version mismatch: master=%d, slave=%d" % \
                                 (PROTOCOL_VERSION, slave_version))
@@ -398,6 +411,20 @@
                             'Location': req.abs_href.builds(
                                     build.id, 'steps', stepname)})
 
+    def _process_keepalive(self, req, config, build):
+        build.last_activity = int(time.time())
+        build.update()
+
+        self.log.info('Slave %s build %d keepalive ("%s" as of [%s])',
+                      build.slave, build.id, build.config, build.rev)
+
+        body = 'Keepalive processed'
+        self._send_response(req, 200, body, {
+                            'Content-Type': 'text/plain',
+                            'Content-Length': str(len(body)),
+                            'Location': req.abs_href.builds(
+                                    build.id, 'keepalive')})
+
     def _start_new_step(self, build, stepname):
         """Creates the in-memory representation for a newly started
         step, ready to be persisted to the database.
--- a/bitten/slave.py
+++ b/bitten/slave.py
@@ -24,6 +24,8 @@
 import time
 import re
 import cookielib
+import threading
+import os
 from ConfigParser import MissingSectionHeaderError
 
 from bitten import PROTOCOL_VERSION
@@ -71,6 +73,60 @@
             self.method = self.has_data() and 'POST' or 'GET'
         return self.method
 
+class KeepAliveThread(threading.Thread):
+    "A thread to periodically send keep-alive messages to the master"
+    
+    def __init__(self, opener, build_url, single_build, keepalive_interval):
+        threading.Thread.__init__(self, None, None, "KeepaliveThread")
+        self.build_url = build_url
+        self.keepalive_interval = keepalive_interval
+        self.single_build = single_build
+        self.last_keepalive = int(time.time())
+        self.kill = False
+        self.opener = opener
+
+    def keepalive(self):
+        log.debug('Sending keepalive')
+        method = 'POST'
+        url = self.build_url + '/keepalive/'
+        body = None
+        shutdown = False
+        headers = {
+            'Content-Type': 'application/x-bitten+xml'
+            }
+            
+        log.debug('Sending %s request to %r', method, url)
+        req = SaneHTTPRequest(method, url, body, headers or {})
+        try:
+            return self.opener.open(req)
+        except urllib2.HTTPError, e:
+            # a conflict error lets us know that we've been
+            # invalidated. Ideally, we'd engineer something to stop any
+            # running steps in progress, but killing threads is tricky
+            # stuff. For now, we'll wait for whatever's going
+            # on to stop, and the main thread'll figure out that we've
+            # been invalidated.
+            log.warning('Server returned keepalive error %d: %s', e.code, e.msg)
+        except:
+            log.warning('Server returned unknown keepalive error')
+
+    def run(self):
+        log.debug('Keepalive thread starting.')
+        while (not self.kill):
+            now = int(time.time())
+            if (self.last_keepalive + self.keepalive_interval) < now:
+                self.keepalive()
+                self.last_keepalive = now
+            
+            time.sleep(1)
+        log.debug('Keepalive thread exiting.')
+
+    def stop(self):
+        log.debug('Stopping keepalive thread')
+        self.kill = True
+        self.join(30)
+        log.debug('Keepalive thread stopped')
+        
 
 class BuildSlave(object):
     """HTTP client implementation for the build slave."""
@@ -78,7 +134,8 @@
     def __init__(self, urls, name=None, config=None, dry_run=False,
                  work_dir=None, build_dir="build_${build}",
                  keep_files=False, single_build=False,
-                 poll_interval=300, username=None, password=None,
+                 poll_interval=300, keepalive_interval = 60,
+                 username=None, password=None,
                  dump_reports=False, no_loop=False, form_auth=False):
         """Create the build slave instance.
         
@@ -98,6 +155,8 @@
         :param poll_interval: the time in seconds to wait between requesting
                               builds from the build master (default is five
                               minutes)
+        :param keep_alive_interval: the time in seconds to wait between sending
+                                    keepalive heartbeats (default is 30 seconds)
         :param username: the username to use when authentication against the
                          build master is requested
         :param password: the password to use when authentication is needed
@@ -127,6 +186,7 @@
         self.single_build = single_build
         self.no_loop = no_loop
         self.poll_interval = poll_interval
+        self.keepalive_interval = keepalive_interval
         self.dump_reports = dump_reports
         self.cookiejar = cookielib.CookieJar()
         self.username = username \
@@ -169,8 +229,9 @@
                                         ).startswith('text/plain'):
                     content = e.read()
                 else:
-                    content = 'Unknown cause of error'
-                e.msg = '%s (%s)' % (e.msg, content)
+                    content = 'no message available'
+                log.debug('Server returned error %d: %s (%s)',
+                                        e.code, e.msg, content)
                 raise
             return e
 
@@ -294,7 +355,10 @@
         build_id = build_url and int(build_url.split('/')[-1]) or 0
         xml = xmlio.parse(fileobj)
         basedir = ''
+        keepalive_thread = KeepAliveThread(self.opener, build_url, self.single_build, self.keepalive_interval)
         try:
+            if not self.local:
+                keepalive_thread.start()
             recipe = Recipe(xml, os.path.join(self.work_dir, self.build_dir), 
                             self.config)
             basedir = recipe.ctxt.basedir
@@ -316,6 +380,7 @@
             if self.dry_run:
                 self._cancel_build(build_url)
         finally:
+            keepalive_thread.stop()
             if not self.keep_files and os.path.isdir(basedir):
                 log.debug('Removing build directory %s' % basedir)
                 _rmtree(basedir)
@@ -431,6 +496,7 @@
                      help='don\'t report results back to master')
     group.add_option('-i', '--interval', dest='interval', metavar='SECONDS',
                      type='int', help='time to wait between requesting builds')
+    group.add_option('-b', '--keepalive_interval', dest='keepalive_interval', metavar='SECONDS', type='int', help='time to wait between keepalive heartbeats')
     group = parser.add_option_group('logging')
     group.add_option('-l', '--log', dest='logfile', metavar='FILENAME',
                      help='write log messages to FILENAME')
@@ -443,7 +509,8 @@
 
     parser.set_defaults(dry_run=False, keep_files=False,
                         loglevel=logging.INFO, single_build=False, no_loop=False,
-                        dump_reports=False, interval=300, form_auth=False)
+                        dump_reports=False, interval=300, keepalive_interval=60,
+                        form_auth=False)
     options, args = parser.parse_args()
 
     if len(args) < 1:
@@ -477,6 +544,7 @@
                        single_build=options.single_build,
                        no_loop=options.no_loop,
                        poll_interval=options.interval,
+                       keepalive_interval=options.keepalive_interval,
                        username=options.username, password=options.password,
                        dump_reports=options.dump_reports,
                        form_auth=options.form_auth)
--- a/bitten/tests/master.py
+++ b/bitten/tests/master.py
@@ -178,7 +178,7 @@
         inbody = StringIO("""<slave name="hal" version="%d">
   <platform>Power Macintosh</platform>
   <os family="posix" version="8.1.0">Darwin</os>
-</slave>""" % (PROTOCOL_VERSION-1,))
+</slave>""" % (PROTOCOL_VERSION-2,))
         outheaders = {}
         outbody = StringIO()
         req = Mock(method='POST', base_path='', path_info='/builds',
@@ -197,7 +197,7 @@
 
         self.assertEqual(400, outheaders['Status'])
         self.assertEqual('Master-Slave version mismatch: master=%d, slave=%d' \
-                                % (PROTOCOL_VERSION, PROTOCOL_VERSION-1),
+                                % (PROTOCOL_VERSION, PROTOCOL_VERSION-2),
                             outbody.getvalue())
 
     def test_create_build_protocol_no_version(self):
Copyright (C) 2012-2017 Edgewall Software