# HG changeset patch # User cmlenz # Date 1118954258 0 # Node ID 1733c601d2f8a58976a8b8b21f37d77225982155 # Parent 21aa17f975228cad5e741a1dff1c0d07661d88e4 Refactored the asyncore loop and shutdown procedure into {{{beep.Initiator}}}. diff --git a/bitten/master.py b/bitten/master.py --- a/bitten/master.py +++ b/bitten/master.py @@ -27,7 +27,7 @@ from bitten.util.xmlio import Element, parse_xml -class BittenProfile(beep.Profile): +class BittenProfileHandler(beep.Profile): URI = 'http://bitten.cmlenz.net/beep-profile/' def __init__(self): @@ -46,6 +46,7 @@ os = child.gettext() os_family = child.family os_version = child.version + rpy = beep.MIMEMessage(Element('ok'), beep.BEEP_XML) self.channel.send_rpy(msgno, rpy) print 'Connected to %s (%s running %s %s [%s])' \ @@ -67,7 +68,7 @@ port = 7633 listener = beep.Listener('localhost', port) - listener.profiles[BittenProfile.URI] = BittenProfile() + listener.profiles[BittenProfileHandler.URI] = BittenProfileHandler() try: asyncore.loop() except KeyboardInterrupt: diff --git a/bitten/slave.py b/bitten/slave.py --- a/bitten/slave.py +++ b/bitten/slave.py @@ -31,6 +31,7 @@ class Slave(beep.Initiator): channelno = None # The channel number used by the bitten profile + terminated = False def channel_started(self, channelno, profile_uri): if profile_uri == BittenProfileHandler.URI: @@ -41,6 +42,7 @@ self.channels[0].profile.send_start([BittenProfileHandler], handle_ok=self.channel_started) + class BittenProfileHandler(beep.Profile): """Handles communication on the Bitten profile from the client perspective. """ @@ -98,18 +100,6 @@ slave = Slave(host, port) try: - try: - asyncore.loop() - except KeyboardInterrupt, beep.TerminateSession: - def handle_ok(): - raise asyncore.ExitNow, 'Session terminated' - def handle_error(code, message): - print>>sys.stderr, \ - 'Build master refused to terminate session (%d): %s' \ - % (code, message) - slave.channels[0].profile.send_close(slave.channelno) - slave.channels[0].profile.send_close(handle_ok=handle_ok, - handle_error=handle_error) - time.sleep(.25) + slave.run() except beep.TerminateSession, e: - print e + print 'Session terminated:', e diff --git a/bitten/util/beep.py b/bitten/util/beep.py --- a/bitten/util/beep.py +++ b/bitten/util/beep.py @@ -32,6 +32,7 @@ from email.Parser import Parser import socket import sys +import time from bitten.util.xmlio import Element, parse_xml @@ -151,10 +152,12 @@ self.set_terminator('END\r\n') else: # Frame trailer received - self._handle_frame(self.header, self.payload) - self.header = self.payload = None - self.inbuf = [] - self.set_terminator('\r\n') + try: + self._handle_frame(self.header, self.payload) + finally: + self.header = self.payload = None + self.inbuf = [] + self.set_terminator('\r\n') def _handle_frame(self, header, payload): """Handle an incoming frame. @@ -205,9 +208,13 @@ for that profile """ Session.__init__(self, None, None, profiles or {}) + self.terminated = False self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect((ip, port)) + def handle_close(self): + self.terminated = True + def greeting_received(self, profiles): """Sub-classes should override this to start the channels they need. @@ -215,6 +222,31 @@ support. """ + def run(self): + """Start this peer, which will try to connect to the server and send a + greeting. + """ + while not self.terminated: + try: + asyncore.loop() + print 'Normal exit' + self.terminated = True + except (KeyboardInterrupt, TerminateSession): + self._quit() + time.sleep(.25) + + def _quit(self): + channelno = max(self.channels.keys()) + def handle_ok(): + if channelno == 0: + self.terminated = True + else: + self._quit() + def handle_error(code, message): + raise ProtocolError, '%s (%d)' % (message, code) + self.channels[0].profile.send_close(channelno, handle_ok=handle_ok, + handle_error=handle_error) + class Channel(object): """A specific channel of a BEEP session.""" @@ -397,6 +429,9 @@ elif elem.tagname == 'close': channelno = int(elem.number) + if not channelno in self.session.channels: + self.send_error(msgno, 550, 'Channel not open') + return if channelno == 0: if len(self.session.channels) > 1: self.send_error(msgno, 550, 'Other channels still open') @@ -404,11 +439,10 @@ if self.session.channels[channelno].msgnos: self.send_error(msgno, 550, 'Channel waiting for replies') return - print 'Close channel %s' % (channelno) del self.session.channels[channelno] message = MIMEMessage(Element('ok'), BEEP_XML) self.channel.send_rpy(msgno, message) - if channelno == 0: + if not self.session.channels: self.session.close() def handle_rpy(self, msgno, message): @@ -430,19 +464,22 @@ assert message.get_content_type() == BEEP_XML elem = parse_xml(message.get_payload()) assert elem.tagname == 'error' - print elem.code + print 'Received error in response to message #%d: %s (%s)' \ + % (msgno, elem.gettext(), elem.code) def send_close(self, channelno=0, code=200, handle_ok=None, handle_error=None): - def handle_reply(cmd, msgno, message): - if handle_ok is not None and cmd == 'RPY': + if cmd == 'RPY': del self.session.channels[channelno] - handle_ok() - if handle_error is not None and cmd == 'ERR': + if handle_ok is not None: + handle_ok() + if not self.session.channels: + self.session.close() + elif cmd == 'ERR': elem = parse_xml(message.get_payload()) - handle_error(int(elem.code), elem.gettext()) - + if handle_error is not None: + handle_error(int(elem.code), elem.gettext()) xml = Element('close', number=channelno, code=code) return self.channel.send_msg(MIMEMessage(xml, BEEP_XML), handle_reply) @@ -452,7 +489,6 @@ def send_start(self, profiles, handle_ok=None, handle_error=None): channelno = self.session.channelno.next() - def handle_reply(cmd, msgno, message): if handle_ok is not None and cmd == 'RPY': elem = parse_xml(message.get_payload()) @@ -472,6 +508,18 @@ return self.channel.send_msg(MIMEMessage(xml, BEEP_XML), handle_reply) +class MIMEMessage(Message): + """Simplified construction of generic MIME messages for transmission as + payload with BEEP.""" + + def __init__(self, payload, content_type=None): + Message.__init__(self) + if content_type: + self.set_type(content_type) + self.set_payload(str(payload)) + del self['MIME-Version'] + + def cycle_through(start, stop=None, step=1): """Utility generator that cycles through a defined range of numbers.""" if stop is None: @@ -503,31 +551,3 @@ if self.value > self.limit: self.value -= self.limit return self - - -class MIMEMessage(Message): - """Simplified construction of generic MIME messages for transmission as - payload with BEEP.""" - - def __init__(self, payload, content_type=None): - Message.__init__(self) - if content_type: - self.set_type(content_type) - self.set_payload(str(payload)) - del self['MIME-Version'] - - -if __name__ == '__main__': - # Simple echo profile implementation for testing - class EchoProfile(Profile): - URI = 'http://beepcore.org/beep/ECHO' - - def handle_msg(self, msgno, message): - self.channel.send_rpy(msgno, message) - - listener = Listener('127.0.0.1', 8000) - listener.profiles[EchoProfile.URI] = EchoProfile() - try: - asyncore.loop() - except KeyboardInterrupt: - pass