# HG changeset patch # User cmlenz # Date 1119361111 0 # Node ID 6da9468a6879b3085cccd7bb0f007819ff37ab6a # Parent d8d44216258a6cd24ad5e909fc2485ee370c8b26 The build master now gracefully exits by first terminating all active sessions. Fixes #7. diff --git a/bitten/master.py b/bitten/master.py --- a/bitten/master.py +++ b/bitten/master.py @@ -129,8 +129,7 @@ try: master.run() except KeyboardInterrupt: - # FIXME: gracefully shutdown all active sessions - pass + master.quit() if __name__ == '__main__': main() diff --git a/bitten/slave.py b/bitten/slave.py --- a/bitten/slave.py +++ b/bitten/slave.py @@ -96,7 +96,10 @@ logging.getLogger().setLevel(options.loglevel) slave = Slave(host, port) - slave.run() + try: + slave.run() + except KeyboardInterrupt: + slave.quit() if __name__ == '__main__': main() diff --git a/bitten/util/beep.py b/bitten/util/beep.py --- a/bitten/util/beep.py +++ b/bitten/util/beep.py @@ -64,6 +64,7 @@ self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind((ip, port)) + self.sessions = [] self.profiles = {} # Mapping from URIs to ProfileHandler sub-classes self.eventqueue = [] logging.debug('Listening to connections on %s:%d', ip, port) @@ -85,7 +86,8 @@ """Start a new BEEP session initiated by a peer.""" conn, (ip, port) = self.accept() logging.debug('Connected to %s:%d', ip, port) - Session(self, conn, (ip, port), self.profiles, first_channelno=2) + self.sessions.append(Session(self, conn, (ip, port), self.profiles, + first_channelno=2)) def run(self, timeout=15.0, granularity=5): """Start listening to incoming connections.""" @@ -120,6 +122,24 @@ """ bisect.insort(self.eventqueue, (int(time.time()) + delta, callback)) + def quit(self): + if not self.sessions: + self.close() + return + def terminate_next_session(session=self, when=None): + session = self.sessions[-1] + def handle_ok(): + if self.sessions: + terminate_next_session() + else: + self.close() + def handle_error(channelno, code, message): + logging.error('Failed to close channel %d', channelno) + logging.info('Should close session with %s', session.addr) + session.terminate(handle_ok=handle_ok) + self.schedule(0, terminate_next_session) + self.run(.5) + class Session(asynchat.async_chat): """A BEEP session between two peers.""" @@ -146,16 +166,18 @@ self.profiles = profiles or {} self.inbuf = [] self.header = self.payload = None - + self.channelno = cycle_through(first_channelno, 2147483647, step=2) self.channels = {0: Channel(self, 0, ManagementProfileHandler)} - def handle_close(self): - logging.debug('Connection closed by peer') - self.close() - - def handle_connect(self): - """Called by asyncore when the connection is established.""" + def close(self): + if self.listener: + logging.debug('Closing connection to %s:%s', self.addr[0], + self.addr[1]) + self.listener.sessions.remove(self) + else: + logging.info('Session terminated') + asynchat.async_chat.close(self) def handle_error(self): """Called by asyncore when an exception is raised.""" @@ -278,6 +300,27 @@ logging.debug('Sending frame [%s]', header) self.push('\r\n'.join((header, 'END', ''))) + def terminate(self, handle_ok=None, handle_error=None): + """Terminate the session by closing all channels.""" + def close_next_channel(): + channelno = max(self.channels.keys()) + def _handle_ok(): + if channelno == 0: + if handle_ok is not None: + handle_ok() + else: + close_next_channel() + def _handle_error(code, message): + logging.error('Peer refused to close channel %d: %s (%d)', + channelno, message, code) + if handle_error is not None: + handle_error(channelno, code, message) + else: + raise ProtocolError, '%s (%d)' % (message, code) + self.channels[0].profile.send_close(channelno, handle_ok=_handle_ok, + handle_error=_handle_error) + close_next_channel() + class Initiator(Session): """Root class for BEEP peers in the initiating role.""" @@ -289,22 +332,18 @@ @param port: The port to connect to @param profiles: A dictionary of the supported profiles, where the key is the URI identifying the profile, and the value is a - `Profile` instance that will handle the communication - for that profile + `ProfileHandler` sub-class that will be instantiated to + handle the communication for that profile """ Session.__init__(self, profiles=profiles or {}) - self.terminated = False self.create_socket(socket.AF_INET, socket.SOCK_STREAM) logging.debug('Connecting to %s:%s', ip, port) - try: - self.connect((ip, port)) - except socket.error, e: - raise TerminateSession, 'Connection to %s:%d failed' % ip, port + self.addr = (ip, port) + self.connect(self.addr) - def handle_close(self): - """Called by asyncore when the socket has been closed.""" - self.terminated = True - Session.handle_close(self) + def handle_connect(self): + """Called by asyncore when the connection is established.""" + logging.debug('Connected to peer at %s:%s', self.addr[0], self.addr[1]) def greeting_received(self, profiles): """Sub-classes should override this to start the channels they need. @@ -318,28 +357,15 @@ """Start this peer, which will try to connect to the server and send a greeting. """ - while not self.terminated: - try: - asyncore.loop() - self.terminated = True - except (KeyboardInterrupt, TerminateSession), e: - logging.info('Terminating session') - self._quit() - time.sleep(.25) + try: + asyncore.loop() + except TerminateSession, e: + logging.info('Terminating session') + self.terminate() - def _quit(self): - channelno = max(self.channels.keys()) - def handle_ok(): - if channelno == 0: - self.terminated = True - else: - self._quit() - def handle_error(code, message): - logging.error('Peer refused to close channel %d: %s (%d)', - channelno, message, code) - raise ProtocolError, '%s (%d)' % (message, code) - self.channels[0].profile.send_close(channelno, handle_ok=handle_ok, - handle_error=handle_error) + def quit(self): + self.terminate() + asyncore.loop(timeout=10) class Channel(object): @@ -625,11 +651,10 @@ if cmd == 'RPY': logging.debug('Channel %d closed', channelno) self.session.channels[channelno].close() + if not self.session.channels: + self.session.close() if handle_ok is not None: handle_ok() - if not self.session.channels: - logging.debug('Session terminated') - self.session.close() elif cmd == 'ERR': elem = xmlio.parse(message.get_payload()) text = elem.gettext()