Mercurial > bitten > bitten-test
changeset 18:591a5a836ecc
* {{{beep.Listener}}} now has an event loop (based on code mostly from medusa)
* The Bitten master now opens a Trac environment and checks for changes to the repository every 15 seconds.
* {{{beep.Profile}}} renamed to {{{beep.ProfileHandler}}}
author | cmlenz |
---|---|
date | Fri, 17 Jun 2005 09:09:07 +0000 |
parents | 97219c9c70bf |
children | 9db5f8eddb0d |
files | bitten/master.py bitten/slave.py bitten/util/beep.py bitten/util/tests/beep.py |
diffstat | 4 files changed, 88 insertions(+), 20 deletions(-) [+] |
line wrap: on
line diff
--- a/bitten/master.py +++ b/bitten/master.py @@ -18,21 +18,50 @@ # # Author: Christopher Lenz <cmlenz@gmx.de> -import asyncore import getopt import logging import os.path import sys +import time + +from trac.env import Environment from bitten.util import beep from bitten.util.xmlio import Element, parse_xml -class BittenProfileHandler(beep.Profile): +class Master(beep.Listener): + + TRIGGER_INTERVAL = 10 + + def __init__(self, env_path, ip, port): + beep.Listener.__init__(self, ip, port) + self.profiles[BittenProfileHandler.URI] = BittenProfileHandler() + + self.env = Environment(env_path) + self.youngest_rev = None + self.slaves = {} + self.schedule(self.TRIGGER_INTERVAL, self.check_trigger) + + def check_trigger(self, master, when): + logging.debug('Checking for build triggers... (%s)' + % time.strftime('%x %X', time.localtime(when))) + repos = self.env.get_repository() + repos.sync() + if repos.youngest_rev != self.youngest_rev: + logging.debug('New changesets detected: %s' + % repos.youngest_rev) + self.youngest_rev = repos.youngest_rev + repos.close() + self.schedule(self.TRIGGER_INTERVAL, self.check_trigger) + + +class BittenProfileHandler(beep.ProfileHandler): URI = 'http://bitten.cmlenz.net/beep-profile/' - def __init__(self): - beep.Profile.__init__(self) + def handle_connect(self): + self.master = self.session.listener + assert self.master def handle_msg(self, msgno, msg): assert msg.get_content_type() == beep.BEEP_XML @@ -48,9 +77,11 @@ os_family = child.family os_version = child.version + self.master.slaves[elem.name] = self + rpy = beep.MIMEMessage(Element('ok'), beep.BEEP_XML) self.channel.send_rpy(msgno, rpy) - logging.info('Registered slave %s (%s running %s %s [%s])', + logging.info('Registered slave %s (%s running %s %s [%s])', elem.name, platform, os, os_version, os_family) @@ -58,13 +89,15 @@ options, args = getopt.getopt(sys.argv[1:], 'p:dvq', ['port=', 'debug', 'verbose', 'quiet']) if len(args) < 1: - print>>sys.stderr, 'usage: %s [options] ENV_PATH' % os.path.basename(sys.argv[0]) + print>>sys.stderr, 'usage: %s [options] ENV_PATH' \ + % os.path.basename(sys.argv[0]) print>>sys.stderr print>>sys.stderr, 'Valid options:' print>>sys.stderr, ' -p [--port] arg\tport number to use (default: 7633)' print>>sys.stderr, ' -q [--quiet]\tprint as little as possible' print>>sys.stderr, ' -v [--verbose]\tprint as much as possible' sys.exit(2) + env_path = args[0] port = 7633 loglevel = logging.WARNING @@ -74,6 +107,7 @@ port = int(arg) except ValueError: print>>sys.stderr, 'Port must be an integer' + sys.exit(2) elif opt in ('-d', '--debug'): loglevel = logging.DEBUG elif opt in ('-v', '--verbose'): @@ -82,9 +116,8 @@ loglevel = logging.ERROR logging.getLogger().setLevel(loglevel) - listener = beep.Listener('localhost', port) - listener.profiles[BittenProfileHandler.URI] = BittenProfileHandler() + master = Master(env_path, 'localhost', port) try: - asyncore.loop() + master.run() except KeyboardInterrupt: pass
--- a/bitten/slave.py +++ b/bitten/slave.py @@ -18,7 +18,6 @@ # # Author: Christopher Lenz <cmlenz@gmx.de> -import asyncore import getopt import logging import os @@ -46,7 +45,7 @@ handle_ok=self.channel_started) -class BittenProfileHandler(beep.Profile): +class BittenProfileHandler(beep.ProfileHandler): """Handles communication on the Bitten profile from the client perspective. """ URI = 'http://bitten.cmlenz.net/beep-profile/' @@ -90,7 +89,11 @@ host = args[0] if len(args) > 1: - port = int(args[1]) + try: + port = int(args[1]) + except ValueError: + print>>sys.stderr, 'Port must be an integer' + sys.exit(2) else: port = 7633
--- a/bitten/util/beep.py +++ b/bitten/util/beep.py @@ -28,6 +28,7 @@ import asynchat import asyncore +import bisect from email.Message import Message from email.Parser import Parser import logging @@ -63,6 +64,7 @@ self.set_reuse_addr() self.bind((ip, port)) self.profiles = {} + self.eventqueue = [] logging.debug('Listening to connections on %s:%d', ip, port) self.listen(5) @@ -82,14 +84,44 @@ """Start a new BEEP session.""" conn, (ip, port) = self.accept() logging.debug('Connected to %s:%d', ip, port) - Session(conn, (ip, port), self.profiles, first_channelno=2) + Session(self, conn, (ip, port), self.profiles, first_channelno=2) + + def run(self, timeout=15.0, granularity=5): + socket_map = asyncore.socket_map + last_event_check = 0 + while socket_map: + now = int(time.time()) + if (now - last_event_check) >= granularity: + last_event_check = now + fired = [] + # yuck. i want my lisp. + i = j = 0 + while i < len(self.eventqueue): + when, what = self.eventqueue[i] + if now >= when: + fired.append(what) + j = i + 1 + else: + break + i = i + 1 + if fired: + self.eventqueue = self.eventqueue[j:] + for what in fired: + what (self, now) + asyncore.poll(timeout) + + def schedule (self, delta, callback): + now = int(time.time()) + bisect.insort(self.eventqueue, (now + delta, callback)) class Session(asynchat.async_chat): """A BEEP session between two peers.""" - def __init__(self, conn, addr, profiles, first_channelno=1): + def __init__(self, listener=None, conn=None, addr=None, profiles=None, + first_channelno=1): asynchat.async_chat.__init__(self, conn) + self.listener = listener self.addr = addr self.set_terminator('\r\n') @@ -98,7 +130,7 @@ self.header = self.payload = None self.channelno = cycle_through(first_channelno, 2147483647, step=2) - self.channels = {0: Channel(self, 0, ManagementProfile())} + self.channels = {0: Channel(self, 0, ManagementProfileHandler())} def handle_connect(self): pass @@ -214,7 +246,7 @@ `Profile` instance that will handle the communication for that profile """ - Session.__init__(self, None, None, profiles or {}) + Session.__init__(self, profiles=profiles or {}) self.terminated = False self.create_socket(socket.AF_INET, socket.SOCK_STREAM) logging.debug('Connecting to %s:%s', ip, port) @@ -378,7 +410,7 @@ del self.ansnos[msgno] # dealloc answer numbers for the message -class Profile(object): +class ProfileHandler(object): """Abstract base class for handlers of specific BEEP profiles. Concrete subclasses need to at least implement the `handle_msg()` method, @@ -411,7 +443,7 @@ pass -class ManagementProfile(Profile): +class ManagementProfileHandler(ProfileHandler): """Implementation of the BEEP management profile.""" def handle_connect(self):
--- a/bitten/util/tests/beep.py +++ b/bitten/util/tests/beep.py @@ -15,7 +15,7 @@ payload.strip())) -class MockProfile(object): +class MockProfileHandler(object): def __init__(self): self.handled_messages = [] @@ -36,7 +36,7 @@ def setUp(self): self.session = MockSession() - self.profile = MockProfile() + self.profile = MockProfileHandler() def test_handle_single_msg_frame(self): """