changeset 786:38e11e15d95e 0.6.x

Port of [864] to 0.6.x
author wbell
date Thu, 29 Apr 2010 18:19:48 +0000
parents 064c293c9a5e
children da6f033cc1d2
files bitten/__init__.py bitten/master.py bitten/slave.py bitten/tests/master.py
diffstat 4 files changed, 103 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/__init__.py
+++ b/bitten/__init__.py
@@ -19,4 +19,4 @@
         pass
 
 # The master-slave protocol/configuration version
-PROTOCOL_VERSION = 3
+PROTOCOL_VERSION = 4
--- a/bitten/master.py
+++ b/bitten/master.py
@@ -128,6 +128,8 @@
 
         if req.args['collection'] == 'steps':
             return self._process_build_step(req, config, build)
+        elif req.args['collection'] == 'keepalive':
+            return self._process_keepalive(req, config, build)
         else:
             self._send_error(req, HTTP_NOT_FOUND,
                     "No such collection '%s'" % req.args['collection'])
@@ -164,7 +166,18 @@
             self._send_error(req, HTTP_BAD_REQUEST, 'XML parser error')
 
         slave_version = int(elem.attr.get('version', 1))
-        if slave_version != PROTOCOL_VERSION:
+
+        # FIXME: Remove version compatibility code.
+        # The initial difference between protocol version 3 and 4 is that
+        # the master allows keepalive requests-- the master must be
+        # at least 4 before slaves supporting version 4 are allowed. When
+        # the first force master/slave upgrade requirement comes in
+        # (or we bump the) version number again, remove this code.
+        if slave_version == 3 and PROTOCOL_VERSION == 4:
+            self.log.info('Allowing slave version %d to process build for '
+                          'compatibility. Upgrade slave to support build '
+                          'keepalives.', slave_version)
+        elif slave_version != PROTOCOL_VERSION:
             self._send_error(req, HTTP_BAD_REQUEST,
                     "Master-Slave version mismatch: master=%d, slave=%d" % \
                                 (PROTOCOL_VERSION, slave_version))
@@ -398,6 +411,20 @@
                             'Location': req.abs_href.builds(
                                     build.id, 'steps', stepname)})
 
+    def _process_keepalive(self, req, config, build):
+        build.last_activity = int(time.time())
+        build.update()
+
+        self.log.info('Slave %s build %d keepalive ("%s" as of [%s])',
+                      build.slave, build.id, build.config, build.rev)
+
+        body = 'Keepalive processed'
+        self._send_response(req, 200, body, {
+                            'Content-Type': 'text/plain',
+                            'Content-Length': str(len(body)),
+                            'Location': req.abs_href.builds(
+                                    build.id, 'keepalive')})
+
     def _start_new_step(self, build, stepname):
         """Creates the in-memory representation for a newly started
         step, ready to be persisted to the database.
--- a/bitten/slave.py
+++ b/bitten/slave.py
@@ -24,6 +24,8 @@
 import time
 import re
 import cookielib
+import threading
+import os
 from ConfigParser import MissingSectionHeaderError
 
 from bitten import PROTOCOL_VERSION
@@ -71,6 +73,60 @@
             self.method = self.has_data() and 'POST' or 'GET'
         return self.method
 
+class KeepAliveThread(threading.Thread):
+    "A thread to periodically send keep-alive messages to the master"
+    
+    def __init__(self, opener, build_url, single_build, keepalive_interval):
+        threading.Thread.__init__(self, None, None, "KeepaliveThread")
+        self.build_url = build_url
+        self.keepalive_interval = keepalive_interval
+        self.single_build = single_build
+        self.last_keepalive = int(time.time())
+        self.kill = False
+        self.opener = opener
+
+    def keepalive(self):
+        log.debug('Sending keepalive')
+        method = 'POST'
+        url = self.build_url + '/keepalive/'
+        body = None
+        shutdown = False
+        headers = {
+            'Content-Type': 'application/x-bitten+xml'
+            }
+            
+        log.debug('Sending %s request to %r', method, url)
+        req = SaneHTTPRequest(method, url, body, headers or {})
+        try:
+            return self.opener.open(req)
+        except urllib2.HTTPError, e:
+            # a conflict error lets us know that we've been
+            # invalidated. Ideally, we'd engineer something to stop any
+            # running steps in progress, but killing threads is tricky
+            # stuff. For now, we'll wait for whatever's going
+            # on to stop, and the main thread'll figure out that we've
+            # been invalidated.
+            log.warning('Server returned keepalive error %d: %s', e.code, e.msg)
+        except:
+            log.warning('Server returned unknown keepalive error')
+
+    def run(self):
+        log.debug('Keepalive thread starting.')
+        while (not self.kill):
+            now = int(time.time())
+            if (self.last_keepalive + self.keepalive_interval) < now:
+                self.keepalive()
+                self.last_keepalive = now
+            
+            time.sleep(1)
+        log.debug('Keepalive thread exiting.')
+
+    def stop(self):
+        log.debug('Stopping keepalive thread')
+        self.kill = True
+        self.join(30)
+        log.debug('Keepalive thread stopped')
+        
 
 class BuildSlave(object):
     """HTTP client implementation for the build slave."""
@@ -78,7 +134,8 @@
     def __init__(self, urls, name=None, config=None, dry_run=False,
                  work_dir=None, build_dir="build_${build}",
                  keep_files=False, single_build=False,
-                 poll_interval=300, username=None, password=None,
+                 poll_interval=300, keepalive_interval = 60,
+                 username=None, password=None,
                  dump_reports=False, no_loop=False, form_auth=False):
         """Create the build slave instance.
         
@@ -98,6 +155,8 @@
         :param poll_interval: the time in seconds to wait between requesting
                               builds from the build master (default is five
                               minutes)
+        :param keep_alive_interval: the time in seconds to wait between sending
+                                    keepalive heartbeats (default is 30 seconds)
         :param username: the username to use when authentication against the
                          build master is requested
         :param password: the password to use when authentication is needed
@@ -127,6 +186,7 @@
         self.single_build = single_build
         self.no_loop = no_loop
         self.poll_interval = poll_interval
+        self.keepalive_interval = keepalive_interval
         self.dump_reports = dump_reports
         self.cookiejar = cookielib.CookieJar()
         self.username = username \
@@ -169,8 +229,9 @@
                                         ).startswith('text/plain'):
                     content = e.read()
                 else:
-                    content = 'Unknown cause of error'
-                e.msg = '%s (%s)' % (e.msg, content)
+                    content = 'no message available'
+                log.debug('Server returned error %d: %s (%s)',
+                                        e.code, e.msg, content)
                 raise
             return e
 
@@ -294,7 +355,10 @@
         build_id = build_url and int(build_url.split('/')[-1]) or 0
         xml = xmlio.parse(fileobj)
         basedir = ''
+        keepalive_thread = KeepAliveThread(self.opener, build_url, self.single_build, self.keepalive_interval)
         try:
+            if not self.local:
+                keepalive_thread.start()
             recipe = Recipe(xml, os.path.join(self.work_dir, self.build_dir), 
                             self.config)
             basedir = recipe.ctxt.basedir
@@ -316,6 +380,7 @@
             if self.dry_run:
                 self._cancel_build(build_url)
         finally:
+            keepalive_thread.stop()
             if not self.keep_files and os.path.isdir(basedir):
                 log.debug('Removing build directory %s' % basedir)
                 _rmtree(basedir)
@@ -431,6 +496,7 @@
                      help='don\'t report results back to master')
     group.add_option('-i', '--interval', dest='interval', metavar='SECONDS',
                      type='int', help='time to wait between requesting builds')
+    group.add_option('-b', '--keepalive_interval', dest='keepalive_interval', metavar='SECONDS', type='int', help='time to wait between keepalive heartbeats')
     group = parser.add_option_group('logging')
     group.add_option('-l', '--log', dest='logfile', metavar='FILENAME',
                      help='write log messages to FILENAME')
@@ -443,7 +509,8 @@
 
     parser.set_defaults(dry_run=False, keep_files=False,
                         loglevel=logging.INFO, single_build=False, no_loop=False,
-                        dump_reports=False, interval=300, form_auth=False)
+                        dump_reports=False, interval=300, keepalive_interval=60,
+                        form_auth=False)
     options, args = parser.parse_args()
 
     if len(args) < 1:
@@ -477,6 +544,7 @@
                        single_build=options.single_build,
                        no_loop=options.no_loop,
                        poll_interval=options.interval,
+                       keepalive_interval=options.keepalive_interval,
                        username=options.username, password=options.password,
                        dump_reports=options.dump_reports,
                        form_auth=options.form_auth)
--- a/bitten/tests/master.py
+++ b/bitten/tests/master.py
@@ -178,7 +178,7 @@
         inbody = StringIO("""<slave name="hal" version="%d">
   <platform>Power Macintosh</platform>
   <os family="posix" version="8.1.0">Darwin</os>
-</slave>""" % (PROTOCOL_VERSION-1,))
+</slave>""" % (PROTOCOL_VERSION-2,))
         outheaders = {}
         outbody = StringIO()
         req = Mock(method='POST', base_path='', path_info='/builds',
@@ -197,7 +197,7 @@
 
         self.assertEqual(400, outheaders['Status'])
         self.assertEqual('Master-Slave version mismatch: master=%d, slave=%d' \
-                                % (PROTOCOL_VERSION, PROTOCOL_VERSION-1),
+                                % (PROTOCOL_VERSION, PROTOCOL_VERSION-2),
                             outbody.getvalue())
 
     def test_create_build_protocol_no_version(self):
Copyright (C) 2012-2017 Edgewall Software