# HG changeset patch # User cmlenz # Date 1119022548 0 # Node ID cad6d28b89752817e101ab575cb4c1f74aad1aed # Parent 1a7b9044b0a56101cd96c3c1d8a9f6b3d8cc43e2 * Proper separation between {{{beep.ProfileHandler}}} instances between different channels. * Unregister slaves on disconnect. diff --git a/bitten/master.py b/bitten/master.py --- a/bitten/master.py +++ b/bitten/master.py @@ -19,11 +19,8 @@ # Author: Christopher Lenz import logging -import os.path -import time from trac.env import Environment - from bitten import __version__ as VERSION from bitten.util import beep from bitten.util.xmlio import Element, parse_xml @@ -35,7 +32,7 @@ def __init__(self, env_path, ip, port): beep.Listener.__init__(self, ip, port) - self.profiles[OrchestrationProfileHandler.URI] = OrchestrationProfileHandler() + self.profiles[OrchestrationProfileHandler.URI] = OrchestrationProfileHandler self.env = Environment(env_path) self.youngest_rev = None @@ -43,12 +40,11 @@ self.schedule(self.TRIGGER_INTERVAL, self.check_trigger) def check_trigger(self, master, when): - logging.debug('Checking for build triggers... (%s)' - % time.strftime('%x %X', time.localtime(when))) + logging.debug('Checking for build triggers...') repos = self.env.get_repository() repos.sync() if repos.youngest_rev != self.youngest_rev: - logging.debug('New changesets detected: %s' + logging.info('New changeset detected: [%s]' % repos.youngest_rev) self.youngest_rev = repos.youngest_rev repos.close() @@ -64,6 +60,11 @@ def handle_connect(self): self.master = self.session.listener assert self.master + self.slave_name = None + + def handle_disconnect(self): + del self.master.slaves[self.slave_name] + logging.info('Unregistered slave "%s"', self.slave_name) def handle_msg(self, msgno, msg): assert msg.get_content_type() == beep.BEEP_XML @@ -79,12 +80,13 @@ os_family = child.family os_version = child.version - self.master.slaves[elem.name] = self + self.slave_name = elem.name + self.master.slaves[self.slave_name] = self rpy = beep.MIMEMessage(Element('ok'), beep.BEEP_XML) self.channel.send_rpy(msgno, rpy) - logging.info('Registered slave %s (%s running %s %s [%s])', - elem.name, platform, os, os_version, os_family) + logging.info('Registered slave "%s" (%s running %s %s [%s])', + self.slave_name, platform, os, os_version, os_family) if __name__ == '__main__': diff --git a/bitten/util/beep.py b/bitten/util/beep.py --- a/bitten/util/beep.py +++ b/bitten/util/beep.py @@ -63,7 +63,7 @@ self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind((ip, port)) - self.profiles = {} + self.profiles = {} # Mapping from URIs to ProfileHandler sub-classes self.eventqueue = [] logging.debug('Listening to connections on %s:%d', ip, port) self.listen(5) @@ -120,6 +120,18 @@ def __init__(self, listener=None, conn=None, addr=None, profiles=None, first_channelno=1): + """Create the BEEP session. + + @param listener: The `Listener` this session belongs to, or `None` for + a session by the initiating peer + @param conn: The connection + @param addr: The address of the remote peer, a (IP-address, port) tuple + @param profiles: A dictionary of supported profiles; the keys are the + URIs of the profiles, the values are corresponding + sub-classes of `ProfileHandler` + @param first_channelno: The first channel number to request; 0 for the + peer in the listening role, 1 for initiators + """ asynchat.async_chat.__init__(self, conn) self.listener = listener self.addr = addr @@ -130,7 +142,7 @@ self.header = self.payload = None self.channelno = cycle_through(first_channelno, 2147483647, step=2) - self.channels = {0: Channel(self, 0, ManagementProfileHandler())} + self.channels = {0: Channel(self, 0, ManagementProfileHandler)} def handle_connect(self): pass @@ -306,12 +318,12 @@ class Channel(object): """A specific channel of a BEEP session.""" - def __init__(self, session, channelno, profile): + def __init__(self, session, channelno, profile_cls): """Create the channel. @param session The `Session` object that the channel belongs to @param channelno The channel number - @param profile The associated `Profile` object + @param profile The associated `ProfileHandler` class """ self.session = session self.channelno = channelno @@ -326,12 +338,16 @@ self.seqno = [serial(), serial()] # incoming, outgoing sequence numbers self.mime_parser = Parser() - self.profile = profile + self.profile = profile_cls() self.profile.session = self.session self.profile.channel = self self.profile.handle_connect() + def close(self): + self.profile.handle_disconnect() + del self.session.channels[self.channelno] + def handle_seq_frame(self, ackno, window): """Process a TCP mapping frame (SEQ). @@ -457,7 +473,9 @@ def handle_connect(self): """Called when the channel this profile is associated with is initially started.""" - pass + + def handle_disconnect(self): + """Called when the channel this profile is associated with is closed.""" def handle_msg(self, msgno, message): raise NotImplementedError @@ -518,7 +536,7 @@ if self.session.channels[channelno].msgnos: self.send_error(msgno, 550, 'Channel waiting for replies') return - del self.session.channels[channelno] + self.session.channels[channelno].close() message = MIMEMessage(Element('ok'), BEEP_XML) self.channel.send_rpy(msgno, message) if not self.session.channels: @@ -551,7 +569,7 @@ def handle_reply(cmd, msgno, message): if cmd == 'RPY': logging.debug('Channel %d closed', channelno) - del self.session.channels[channelno] + self.session.channels[channelno].close() if handle_ok is not None: handle_ok() if not self.session.channels: @@ -580,12 +598,11 @@ def handle_reply(cmd, msgno, message): if cmd == 'RPY': elem = parse_xml(message.get_payload()) - for profile in [p for p in profiles if p.URI == elem.uri]: + for cls in [cls for cls in profiles if cls.URI == elem.uri]: logging.debug('Channel %d started with profile %s', channelno, elem.uri) self.session.channels[channelno] = Channel(self.session, - channelno, - profile()) + channelno, cls) break if handle_ok is not None: handle_ok(channelno, elem.uri)