changeset 34:6da9468a6879

The build master now gracefully exits by first terminating all active sessions. Fixes #7.
author cmlenz
date Tue, 21 Jun 2005 13:38:31 +0000
parents d8d44216258a
children 67631e1d4d45
files bitten/master.py bitten/slave.py bitten/util/beep.py
diffstat 3 files changed, 73 insertions(+), 46 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/master.py
+++ b/bitten/master.py
@@ -129,8 +129,7 @@
     try:
         master.run()
     except KeyboardInterrupt:
-        # FIXME: gracefully shutdown all active sessions
-        pass
+        master.quit()
 
 if __name__ == '__main__':
     main()
--- a/bitten/slave.py
+++ b/bitten/slave.py
@@ -96,7 +96,10 @@
     logging.getLogger().setLevel(options.loglevel)
 
     slave = Slave(host, port)
-    slave.run()
+    try:
+        slave.run()
+    except KeyboardInterrupt:
+        slave.quit()
 
 if __name__ == '__main__':
     main()
--- a/bitten/util/beep.py
+++ b/bitten/util/beep.py
@@ -64,6 +64,7 @@
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
         self.set_reuse_addr()
         self.bind((ip, port))
+        self.sessions = []
         self.profiles = {} # Mapping from URIs to ProfileHandler sub-classes
         self.eventqueue = []
         logging.debug('Listening to connections on %s:%d', ip, port)
@@ -85,7 +86,8 @@
         """Start a new BEEP session initiated by a peer."""
         conn, (ip, port) = self.accept()
         logging.debug('Connected to %s:%d', ip, port)
-        Session(self, conn, (ip, port), self.profiles, first_channelno=2)
+        self.sessions.append(Session(self, conn, (ip, port), self.profiles,
+                                     first_channelno=2))
 
     def run(self, timeout=15.0, granularity=5):
         """Start listening to incoming connections."""
@@ -120,6 +122,24 @@
         """
         bisect.insort(self.eventqueue, (int(time.time()) + delta, callback))
 
+    def quit(self):
+        if not self.sessions:
+            self.close()
+            return
+        def terminate_next_session(session=self, when=None):
+            session = self.sessions[-1]
+            def handle_ok():
+                if self.sessions:
+                    terminate_next_session()
+                else:
+                    self.close()
+            def handle_error(channelno, code, message):
+                logging.error('Failed to close channel %d', channelno)
+            logging.info('Should close session with %s', session.addr)
+            session.terminate(handle_ok=handle_ok)
+        self.schedule(0, terminate_next_session)
+        self.run(.5)
+
 
 class Session(asynchat.async_chat):
     """A BEEP session between two peers."""
@@ -146,16 +166,18 @@
         self.profiles = profiles or {}
         self.inbuf = []
         self.header = self.payload = None
-        
+
         self.channelno = cycle_through(first_channelno, 2147483647, step=2)
         self.channels = {0: Channel(self, 0, ManagementProfileHandler)}
 
-    def handle_close(self):
-        logging.debug('Connection closed by peer')
-        self.close()
-
-    def handle_connect(self):
-        """Called by asyncore when the connection is established."""
+    def close(self):
+        if self.listener:
+            logging.debug('Closing connection to %s:%s', self.addr[0],
+                          self.addr[1])
+            self.listener.sessions.remove(self)
+        else:
+            logging.info('Session terminated')
+        asynchat.async_chat.close(self)
 
     def handle_error(self):
         """Called by asyncore when an exception is raised."""
@@ -278,6 +300,27 @@
         logging.debug('Sending frame [%s]', header)
         self.push('\r\n'.join((header, 'END', '')))
 
+    def terminate(self, handle_ok=None, handle_error=None):
+        """Terminate the session by closing all channels."""
+        def close_next_channel():
+            channelno = max(self.channels.keys())
+            def _handle_ok():
+                if channelno == 0:
+                    if handle_ok is not None:
+                        handle_ok()
+                else:
+                    close_next_channel()
+            def _handle_error(code, message):
+                logging.error('Peer refused to close channel %d: %s (%d)',
+                              channelno, message, code)
+                if handle_error is not None:
+                    handle_error(channelno, code, message)
+                else:
+                    raise ProtocolError, '%s (%d)' % (message, code)
+            self.channels[0].profile.send_close(channelno, handle_ok=_handle_ok,
+                                                handle_error=_handle_error)
+        close_next_channel()
+
 
 class Initiator(Session):
     """Root class for BEEP peers in the initiating role."""
@@ -289,22 +332,18 @@
         @param port: The port to connect to
         @param profiles: A dictionary of the supported profiles, where the key
                          is the URI identifying the profile, and the value is a
-                         `Profile` instance that will handle the communication
-                         for that profile
+                         `ProfileHandler` sub-class that will be instantiated to
+                         handle the communication for that profile
         """
         Session.__init__(self, profiles=profiles or {})
-        self.terminated = False
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
         logging.debug('Connecting to %s:%s', ip, port)
-        try:
-            self.connect((ip, port))
-        except socket.error, e:
-            raise TerminateSession, 'Connection to %s:%d failed' % ip, port
+        self.addr = (ip, port)
+        self.connect(self.addr)
 
-    def handle_close(self):
-        """Called by asyncore when the socket has been closed."""
-        self.terminated = True
-        Session.handle_close(self)
+    def handle_connect(self):
+        """Called by asyncore when the connection is established."""
+        logging.debug('Connected to peer at %s:%s', self.addr[0], self.addr[1])
 
     def greeting_received(self, profiles):
         """Sub-classes should override this to start the channels they need.
@@ -318,28 +357,15 @@
         """Start this peer, which will try to connect to the server and send a
         greeting.
         """
-        while not self.terminated:
-            try:
-                asyncore.loop()
-                self.terminated = True
-            except (KeyboardInterrupt, TerminateSession), e:
-                logging.info('Terminating session')
-                self._quit()
-                time.sleep(.25)
+        try:
+            asyncore.loop()
+        except TerminateSession, e:
+            logging.info('Terminating session')
+            self.terminate()
 
-    def _quit(self):
-        channelno = max(self.channels.keys())
-        def handle_ok():
-            if channelno == 0:
-                self.terminated = True
-            else:
-                self._quit()
-        def handle_error(code, message):
-            logging.error('Peer refused to close channel %d: %s (%d)',
-                          channelno, message, code)
-            raise ProtocolError, '%s (%d)' % (message, code)
-        self.channels[0].profile.send_close(channelno, handle_ok=handle_ok,
-                                            handle_error=handle_error)
+    def quit(self):
+        self.terminate()
+        asyncore.loop(timeout=10)
 
 
 class Channel(object):
@@ -625,11 +651,10 @@
             if cmd == 'RPY':
                 logging.debug('Channel %d closed', channelno)
                 self.session.channels[channelno].close()
+                if not self.session.channels:
+                    self.session.close()
                 if handle_ok is not None:
                     handle_ok()
-                if not self.session.channels:
-                    logging.debug('Session terminated')
-                    self.session.close()
             elif cmd == 'ERR':
                 elem = xmlio.parse(message.get_payload())
                 text = elem.gettext()
Copyright (C) 2012-2017 Edgewall Software