changeset 24:cad6d28b8975

* Proper separation between {{{beep.ProfileHandler}}} instances between different channels. * Unregister slaves on disconnect.
author cmlenz
date Fri, 17 Jun 2005 15:35:48 +0000
parents 1a7b9044b0a5
children 0f251596ba05
files bitten/master.py bitten/util/beep.py
diffstat 2 files changed, 40 insertions(+), 21 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/master.py
+++ b/bitten/master.py
@@ -19,11 +19,8 @@
 # Author: Christopher Lenz <cmlenz@gmx.de>
 
 import logging
-import os.path
-import time
 
 from trac.env import Environment
-
 from bitten import __version__ as VERSION
 from bitten.util import beep
 from bitten.util.xmlio import Element, parse_xml
@@ -35,7 +32,7 @@
 
     def __init__(self, env_path, ip, port):
         beep.Listener.__init__(self, ip, port)
-        self.profiles[OrchestrationProfileHandler.URI] = OrchestrationProfileHandler()
+        self.profiles[OrchestrationProfileHandler.URI] = OrchestrationProfileHandler
 
         self.env = Environment(env_path)
         self.youngest_rev = None
@@ -43,12 +40,11 @@
         self.schedule(self.TRIGGER_INTERVAL, self.check_trigger)
 
     def check_trigger(self, master, when):
-        logging.debug('Checking for build triggers... (%s)'
-                      % time.strftime('%x %X', time.localtime(when)))
+        logging.debug('Checking for build triggers...')
         repos = self.env.get_repository()
         repos.sync()
         if repos.youngest_rev != self.youngest_rev:
-            logging.debug('New changesets detected: %s'
+            logging.info('New changeset detected: [%s]'
                           % repos.youngest_rev)
             self.youngest_rev = repos.youngest_rev
         repos.close()
@@ -64,6 +60,11 @@
     def handle_connect(self):
         self.master = self.session.listener
         assert self.master
+        self.slave_name = None
+
+    def handle_disconnect(self):
+        del self.master.slaves[self.slave_name]
+        logging.info('Unregistered slave "%s"', self.slave_name)
 
     def handle_msg(self, msgno, msg):
         assert msg.get_content_type() == beep.BEEP_XML
@@ -79,12 +80,13 @@
                     os_family = child.family
                     os_version = child.version
 
-            self.master.slaves[elem.name] = self
+            self.slave_name = elem.name
+            self.master.slaves[self.slave_name] = self
 
             rpy = beep.MIMEMessage(Element('ok'), beep.BEEP_XML)
             self.channel.send_rpy(msgno, rpy)
-            logging.info('Registered slave %s (%s running %s %s [%s])',
-                         elem.name, platform, os, os_version, os_family)
+            logging.info('Registered slave "%s" (%s running %s %s [%s])',
+                         self.slave_name, platform, os, os_version, os_family)
 
 
 if __name__ == '__main__':
--- a/bitten/util/beep.py
+++ b/bitten/util/beep.py
@@ -63,7 +63,7 @@
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
         self.set_reuse_addr()
         self.bind((ip, port))
-        self.profiles = {}
+        self.profiles = {} # Mapping from URIs to ProfileHandler sub-classes
         self.eventqueue = []
         logging.debug('Listening to connections on %s:%d', ip, port)
         self.listen(5)
@@ -120,6 +120,18 @@
 
     def __init__(self, listener=None, conn=None, addr=None, profiles=None,
                  first_channelno=1):
+        """Create the BEEP session.
+        
+        @param listener: The `Listener` this session belongs to, or `None` for
+                         a session by the initiating peer
+        @param conn: The connection
+        @param addr: The address of the remote peer, a (IP-address, port) tuple
+        @param profiles: A dictionary of supported profiles; the keys are the
+                         URIs of the profiles, the values are corresponding
+                         sub-classes of `ProfileHandler`
+        @param first_channelno: The first channel number to request; 0 for the
+                                peer in the listening role, 1 for initiators
+        """
         asynchat.async_chat.__init__(self, conn)
         self.listener = listener
         self.addr = addr
@@ -130,7 +142,7 @@
         self.header = self.payload = None
         
         self.channelno = cycle_through(first_channelno, 2147483647, step=2)
-        self.channels = {0: Channel(self, 0, ManagementProfileHandler())}
+        self.channels = {0: Channel(self, 0, ManagementProfileHandler)}
 
     def handle_connect(self):
         pass
@@ -306,12 +318,12 @@
 class Channel(object):
     """A specific channel of a BEEP session."""
 
-    def __init__(self, session, channelno, profile):
+    def __init__(self, session, channelno, profile_cls):
         """Create the channel.
 
         @param session The `Session` object that the channel belongs to
         @param channelno The channel number
-        @param profile The associated `Profile` object
+        @param profile The associated `ProfileHandler` class
         """
         self.session = session
         self.channelno = channelno
@@ -326,12 +338,16 @@
         self.seqno = [serial(), serial()] # incoming, outgoing sequence numbers
         self.mime_parser = Parser()
 
-        self.profile = profile
+        self.profile = profile_cls()
         self.profile.session = self.session
         self.profile.channel = self
 
         self.profile.handle_connect()
 
+    def close(self):
+        self.profile.handle_disconnect()
+        del self.session.channels[self.channelno]
+
     def handle_seq_frame(self, ackno, window):
         """Process a TCP mapping frame (SEQ).
         
@@ -457,7 +473,9 @@
     def handle_connect(self):
         """Called when the channel this profile is associated with is
         initially started."""
-        pass
+
+    def handle_disconnect(self):
+        """Called when the channel this profile is associated with is closed."""
 
     def handle_msg(self, msgno, message):
         raise NotImplementedError
@@ -518,7 +536,7 @@
             if self.session.channels[channelno].msgnos:
                 self.send_error(msgno, 550, 'Channel waiting for replies')
                 return
-            del self.session.channels[channelno]
+            self.session.channels[channelno].close()
             message = MIMEMessage(Element('ok'), BEEP_XML)
             self.channel.send_rpy(msgno, message)
             if not self.session.channels:
@@ -551,7 +569,7 @@
         def handle_reply(cmd, msgno, message):
             if cmd == 'RPY':
                 logging.debug('Channel %d closed', channelno)
-                del self.session.channels[channelno]
+                self.session.channels[channelno].close()
                 if handle_ok is not None:
                     handle_ok()
                 if not self.session.channels:
@@ -580,12 +598,11 @@
         def handle_reply(cmd, msgno, message):
             if cmd == 'RPY':
                 elem = parse_xml(message.get_payload())
-                for profile in [p for p in profiles if p.URI == elem.uri]:
+                for cls in [cls for cls in profiles if cls.URI == elem.uri]:
                     logging.debug('Channel %d started with profile %s',
                                   channelno, elem.uri)
                     self.session.channels[channelno] = Channel(self.session,
-                                                               channelno,
-                                                               profile())
+                                                               channelno, cls)
                     break
                 if handle_ok is not None:
                     handle_ok(channelno, elem.uri)
Copyright (C) 2012-2017 Edgewall Software