# HG changeset patch # User cmlenz # Date 1118434870 0 # Node ID 2269b705deb934e8e14c4c4bacd57622724a65fa # Parent 5d8457aa20259153fe7d1302dc1271f26af662ed Improved the BEEP protocol implementation: * Callbacks for replying to messages. * Starting of channels implemented. * Some error handling (though not much yet). Also, added a sample client that sends a message using the echo protocol. Finally, added a simple proxy script that outputs the peer-to-peer communication to the console. diff --git a/bitten/util/beep.py b/bitten/util/beep.py --- a/bitten/util/beep.py +++ b/bitten/util/beep.py @@ -28,14 +28,21 @@ from cStringIO import StringIO except ImportError: from StringIO import StringIO +import sys import traceback from bitten.util.xmlio import Element, parse_xml __all__ = ['Listener', 'Initiator', 'Profile'] + class Listener(asyncore.dispatcher): - + """BEEP peer in the listener role. + + This peer opens a socket for listening to incoming connections. For each + connection, it opens a new session: an instance of `Session` that handle + communication with the connected peer. + """ def __init__(self, ip, port): asyncore.dispatcher.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) @@ -68,19 +75,21 @@ pass def handle_accept(self): + """Start a new BEEP session.""" conn, addr = self.accept() print 'Connected to %s:%s' % addr Session(conn, addr, self.profiles, first_channelno=2) class Session(asynchat.async_chat): + """A BEEP session between two peers.""" def __init__(self, conn, addr, profiles, first_channelno=1): asynchat.async_chat.__init__(self, conn) self.addr = addr self.set_terminator('\r\n') - self.profiles = profiles + self.profiles = profiles or {} self.channels = {0: Channel(self, 0, ManagementProfile())} self.channelno = cycle_through(first_channelno, 2147483647, step=2) self.inbuf = [] @@ -89,10 +98,19 @@ def handle_connect(self): pass + def handle_error(self): + t, v = sys.exc_info()[:2] + if t is SystemExit: + raise t, v + else: + asynchat.async_chat.handle_error(self) + def collect_incoming_data(self, data): self.inbuf.append(data) def found_terminator(self): + """Called by async_chat when a terminator is found in the input + stream.""" if self.header is None: self.header = ''.join(self.inbuf).split(' ') self.inbuf = [] @@ -121,28 +139,29 @@ self.set_terminator('\r\n') def _handle_frame(self, header, payload): - try: - # Parse frame header - cmd = header[0] - if cmd == 'SEQ': - # RFC 3081 - need to digest and implement - raise NotImplementedError - channel = int(header[1]) - msgno = int(header[2]) - more = header[3] == '*' - seqno = int(header[4]) - size = int(header[5]) - if cmd == 'ANS': - ansno = int(header[6]) - else: - ansno = None - self.channels[channel].handle_frame(cmd, msgno, more, seqno, - ansno, payload) - except Exception, e: - traceback.print_exc() + """Handle an incoming frame. + + This parses the frame header and decides which channel to pass it to. + """ + cmd = header[0] + if cmd == 'SEQ': + # RFC 3081 - need to digest and implement + raise NotImplementedError + channel = int(header[1]) + msgno = int(header[2]) + more = header[3] == '*' + seqno = int(header[4]) + size = int(header[5]) + if cmd == 'ANS': + ansno = int(header[6]) + else: + ansno = None + self.channels[channel].handle_frame(cmd, msgno, more, seqno, + ansno, payload) - def send_data(self, cmd, channel, msgno, more, seqno, ansno=None, + def send_frame(self, cmd, channel, msgno, more, seqno, ansno=None, payload=''): + """Send the specified data frame to the peer.""" headerbits = [cmd, channel, msgno, more and '*' or '.', seqno, len(payload)] if cmd == 'ANS': @@ -151,13 +170,28 @@ header = ' '.join([str(hb) for hb in headerbits]) self.push('\r\n'.join((header, payload, 'END', ''))) + def start_channel(self, number, profile_uri): + profile = self.profiles[profile_uri] + channel = Channel(self, number, profile) + self.channels[number] = channel + return channel + + def greeting_received(self, profiles): + """Initiator sub-classes should override this to start the channels they + want. + + @param profiles: A list of URIs of the profiles the peer claims to + support. + """ + class Initiator(Session): - def __init__(self, ip, port, profiles=None): + def __init__(self, ip, port, profiles=None, handle_greeting=None): Session.__init__(self, None, None, profiles or {}) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect((ip, port)) + self.greeting_handler = handle_greeting class Channel(object): @@ -167,7 +201,7 @@ self.session = session self.channelno = channelno self.inqueue = {} - self.outqueue = {} + self.reply_handlers = {} self.msgno = cycle_through(0, 2147483647) self.msgnos = {} # message numbers currently in use @@ -181,22 +215,42 @@ self.profile.handle_connect() def handle_frame(self, cmd, msgno, more, seqno, ansno, payload): - if seqno != self.seqno[0]: # Validate and update sequence number + # Validate and update sequence number + if seqno != self.seqno[0]: raise Exception, 'Out of sync with peer' # TODO: Be nice self.seqno[0] += len(payload) - if more: # More of this message pending, so push it on the queue + + if more: + # More of this message pending, so push it on the queue self.inqueue.setdefault(msgno, []).append(payload) - else: # Complete message received, so handle it - if msgno in self.inqueue.keys(): # Recombine queued messages - payload = ''.join(self.inqueue[msgno]) + payload - del self.inqueue[msgno] - if cmd == 'RPY' and msgno in self.msgnos.keys(): - # Final reply using this message number, so dealloc - del self.msgnos[msgno] - message = None - if payload: - message = self.mime_parser.parsestr(payload) - self.profile.handle_message(cmd, msgno, message) + return + + # Complete message received, so handle it + if msgno in self.inqueue.keys(): + # Recombine queued messages + payload = ''.join(self.inqueue[msgno]) + payload + del self.inqueue[msgno] + if cmd == 'RPY' and msgno in self.msgnos.keys(): + # Final reply using this message number, so dealloc + del self.msgnos[msgno] + message = None + if payload: + message = self.mime_parser.parsestr(payload) + + if cmd == 'MSG': + self.profile.handle_msg(msgno, message) + else: + if msgno in self.reply_handlers.keys(): + self.reply_handlers[msgno](cmd, msgno, message) + del self.reply_handlers[msgno] + elif cmd == 'RPY': + self.profile.handle_rpy(msgno, message) + elif cmd == 'ERR': + self.profile.handle_err(msgno, message) + elif cmd == 'ANS': + self.profile.handle_ans(msgno, ansno, message) + elif cmd == 'NUL': + self.profile.handle_nul(msgno) def _send(self, cmd, msgno, ansno=None, message=None): # TODO: Fragment and queue the message if necessary; @@ -204,18 +258,21 @@ payload = '' if message is not None: payload = message.as_string() - self.session.send_data(cmd, self.channelno, msgno, False, - self.seqno[1].value, payload=payload, - ansno=ansno) + self.session.send_frame(cmd, self.channelno, msgno, False, + self.seqno[1].value, payload=payload, + ansno=ansno) self.seqno[1] += len(payload) - def send_msg(self, message): + def send_msg(self, message, handle_reply=None): while True: # Find a unique message number msgno = self.msgno.next() if msgno not in self.msgnos.keys(): break self.msgnos[msgno] = True # Flag the chosen message number as in use + if handle_reply is not None: + self.reply_handlers[msgno] = handle_reply self._send('MSG', msgno, None, message) + return msgno def send_rpy(self, msgno, message): self._send('RPY', msgno, None, message) @@ -229,16 +286,17 @@ self.ansnos[msgno] = ansno else: ansno = self.ansnos[msgno] - self._send('ANS', msgno, ansno.next(), message) + next_ansno = ansno.next() + self._send('ANS', msgno, next_ansno, message) + return next_ansno def send_nul(self, msgno): self._send('NUL', msgno) + del self.ansnos[msgno] # dealloc answer numbers for the message class Profile(object): """Abstract base class for handlers of specific BEEP profiles.""" - # TODO: This is pretty thin... would a meta-class for declarative definition - # of profiles work here? def __init__(self): self.session = None @@ -247,69 +305,114 @@ def handle_connect(self): pass - def handle_message(self, cmd, message): + def handle_msg(self, msgno, message): + raise NotImplementedError + + def handle_rpy(self, msgno, message): + raise NotImplementedError + + def handle_err(self, msgno, message): + raise NotImplementedError + + def handle_ans(self, msgno, ansno, message): + raise NotImplementedError + + def handle_nul(self, msgno): raise NotImplementedError class ManagementProfile(Profile): CONTENT_TYPE = 'application/beep+xml' - def __init__(self): - Profile.__init__(self) - self.state = 'init' - def handle_connect(self): greeting = Element('greeting')[ [Element('profile', uri=k) for k in self.session.profiles.keys()] ] self.channel.send_rpy(0, MIMEMessage(greeting, self.CONTENT_TYPE)) - def handle_message(self, cmd, msgno, message): + def handle_msg(self, msgno, message): assert message.get_content_type() == self.CONTENT_TYPE root = parse_xml(message.get_payload()) - if cmd == 'MSG': - if root.name == 'start': - print 'Start channel %s' % root.number - for profile in root['profile']: - if uri in self.session.profiles.keys(): + if root.name == 'start': + print 'Start channel %s' % root.number + for profile in root['profile']: + if profile.uri in self.session.profiles.keys(): + try: + self.session.start_channel(int(root.number), + profile.uri) message = MIMEMessage(Element('profile', uri=profile.uri), self.CONTENT_TYPE) self.channel.send_rpy(msgno, message) - # TODO: create the channel return - # TODO: send error (unsupported profile) - elif root.name == 'close': - print 'Close channel %s' % root.number - message = MIMEMessage(Element('ok'), self.CONTENT_TYPE) - self.channel.send_rpy(msgno, message) - # TODO: close the channel, or if channelno is 0, terminate the - # session... actually, that's done implicitly when the - # peer disconnects after receiving the - elif cmd == 'RPY': - if root.name == 'greeting': - print 'Greeting...' - for profile in root['profile']: - print ' profile %s' % profile.uri - # TODO: invoke handler handle_greeting(profiles) or something - elif root.name == 'profile': - # This means that the channel has been established with this - # profile... basically a channel_start_ok message - print 'Profile %s' % root.uri - elif root.name == 'ok': - # A close request for a channel has been accepted, so we can - # close it now - print 'OK' + except StandardError, e: + print e + message = MIMEMessage(Element('error', code=550)[ + 'All request profiles are unsupported' + ], self.CONTENT_TYPE) + self.channel.send_err(msgno, message) + elif root.name == 'close': + print 'Close channel %s' % root.number + message = MIMEMessage(Element('ok'), self.CONTENT_TYPE) + self.channel.send_rpy(msgno, message) + # TODO: close the channel or, if channelno is 0, terminate the + # session... actually, that's done implicitly when the + # peer disconnects after receiving the - def close(self, channel=0, code=200): - xml = Element('close', number=channel, code=code) - self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE)) + def handle_rpy(self, msgno, message): + assert message.get_content_type() == self.CONTENT_TYPE + root = parse_xml(message.get_payload()) + if root.name == 'greeting': + print 'Greeting...' + profiles = [profile.uri for profile in root['profile']] + self.session.greeting_received(profiles) + elif root.name == 'profile': + # This means that the channel has been established with this + # profile... basically a channel_start_ok message + print 'Profile %s' % root.uri + elif root.name == 'ok': + # A close request for a channel has been accepted, so we can + # close it now + print 'OK' - def start(self, profiles): - xml = Element('start', number=self.session.channelno.next())[ - [Element('profile', uri=uri) for uri in profiles] + def handle_err(self, msgno, message): + assert message.get_content_type() == self.CONTENT_TYPE + root = parse_xml(message.get_payload()) + assert root.name == 'error' + print root.code + + def close(self, channel=0, code=200, handle_ok=None, handle_error=None): + def handle_reply(cmd, msgno, message): + if handle_ok is not None and cmd == 'RPY': + handle_ok() + if handle_error is not None and cmd == 'ERR': + root = parse_xml(message.get_payload()) + handle_error(int(root.code), root.gettext()) + xml = Element('close', number=channel, code=code) + return self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE), + handle_reply) + + def start(self, profiles, handle_ok=None, handle_error=None): + channelno = self.session.channelno.next() + def handle_reply(cmd, msgno, message): + if handle_ok is not None and cmd == 'RPY': + root = parse_xml(message.get_payload()) + selected = None + for profile in profiles: + if profile.URI == root.uri: + selected = profile + break + self.session.channels[channelno] = Channel(self.session, + channelno, profile()) + handle_ok(channelno, root.uri) + if handle_error is not None and cmd == 'ERR': + root = parse_xml(message.get_payload()) + handle_error(int(root.code), root.gettext()) + xml = Element('start', number=channelno)[ + [Element('profile', uri=profile.URI) for profile in profiles] ] - self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE)) + return self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE), + handle_reply) def cycle_through(start, stop=None, step=1): @@ -357,8 +460,17 @@ del self['MIME-Version'] +# Simple echo profile implementation for testing +class EchoProfile(Profile): + URI = 'http://www.eigenmagic.com/beep/ECHO' + + def handle_msg(self, msgno, message): + self.channel.send_rpy(msgno, message) + + if __name__ == '__main__': listener = Listener('127.0.0.1', 8000) + listener.profiles[EchoProfile.URI] = EchoProfile() try: asyncore.loop() except KeyboardInterrupt: diff --git a/bitten/util/tests/beep.py b/bitten/util/tests/beep.py --- a/bitten/util/tests/beep.py +++ b/bitten/util/tests/beep.py @@ -9,8 +9,8 @@ def __init__(self): self.sent_messages = [] - def send_data(self, cmd, channel, msgno, more, seqno, ansno=None, - payload=''): + def send_frame(self, cmd, channel, msgno, more, seqno, ansno=None, + payload=''): self.sent_messages.append((cmd, channel, msgno, more, seqno, ansno, payload.strip())) @@ -23,8 +23,13 @@ def handle_connect(self): pass - def handle_message(self, cmd, msgno, message): - self.handled_messages.append((cmd, msgno, message.as_string().strip())) + def handle_msg(self, msgno, message): + text = message.as_string().strip() + self.handled_messages.append(('MSG', msgno, text)) + + def handle_rpy(self, msgno, message): + text = message.as_string().strip() + self.handled_messages.append(('RPY', msgno, text)) class ChannelTestCase(unittest.TestCase): @@ -61,10 +66,10 @@ corresponding message number (0) is reserved. """ channel = beep.Channel(self.session, 0, self.profile) - channel.send_msg(beep.MIMEMessage('foo bar')) - self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'), + msgno = channel.send_msg(beep.MIMEMessage('foo bar')) + self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), self.session.sent_messages[0]) - assert 0 in channel.msgnos.keys() + assert msgno in channel.msgnos.keys() def test_send_message_msgno_inc(self): """ @@ -72,14 +77,25 @@ messages. """ channel = beep.Channel(self.session, 0, self.profile) - channel.send_msg(beep.MIMEMessage('foo bar')) - self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'), + msgno = channel.send_msg(beep.MIMEMessage('foo bar')) + assert msgno == 0 + self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), self.session.sent_messages[0]) - assert 0 in channel.msgnos.keys() - channel.send_msg(beep.MIMEMessage('foo baz')) - self.assertEqual(('MSG', 0, 1, False, 8L, None, 'foo baz'), + assert msgno in channel.msgnos.keys() + msgno = channel.send_msg(beep.MIMEMessage('foo baz')) + assert msgno == 1 + self.assertEqual(('MSG', 0, msgno, False, 8L, None, 'foo baz'), self.session.sent_messages[1]) - assert 1 in channel.msgnos.keys() + assert msgno in channel.msgnos.keys() + + def test_send_reply(self): + """ + Verify that sending an ANS message is processed correctly. + """ + channel = beep.Channel(self.session, 0, self.profile) + channel.send_rpy(0, beep.MIMEMessage('foo bar')) + self.assertEqual(('RPY', 0, 0, False, 0L, None, 'foo bar'), + self.session.sent_messages[0]) def test_message_and_reply(self): """ @@ -87,25 +103,42 @@ received. """ channel = beep.Channel(self.session, 0, self.profile) - channel.send_msg(beep.MIMEMessage('foo bar')) - self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'), + msgno = channel.send_msg(beep.MIMEMessage('foo bar')) + self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), self.session.sent_messages[0]) - assert 0 in channel.msgnos.keys() - channel.handle_frame('RPY', 0, False, 0, None, '42') - self.assertEqual(('RPY', 0, '42'), self.profile.handled_messages[0]) - assert 0 not in channel.msgnos.keys() + assert msgno in channel.msgnos.keys() + channel.handle_frame('RPY', msgno, False, 0, None, '42') + self.assertEqual(('RPY', msgno, '42'), self.profile.handled_messages[0]) + assert msgno not in channel.msgnos.keys() - def test_send_answer(self): + def test_send_error(self): + """ + Verify that sending an ERR message is processed correctly. + """ + channel = beep.Channel(self.session, 0, self.profile) + channel.send_err(0, beep.MIMEMessage('oops')) + self.assertEqual(('ERR', 0, 0, False, 0L, None, 'oops'), + self.session.sent_messages[0]) + + def test_send_answers(self): """ Verify that sending an ANS message is processed correctly. """ channel = beep.Channel(self.session, 0, self.profile) - channel.send_ans(0, beep.MIMEMessage('foo bar')) - self.assertEqual(('ANS', 0, 0, False, 0L, 0, 'foo bar'), + ansno = channel.send_ans(0, beep.MIMEMessage('foo bar')) + assert ansno == 0 + self.assertEqual(('ANS', 0, 0, False, 0L, ansno, 'foo bar'), self.session.sent_messages[0]) - channel.send_ans(0, beep.MIMEMessage('foo baz')) - self.assertEqual(('ANS', 0, 0, False, 8L, 1, 'foo baz'), + assert 0 in channel.ansnos.keys() + ansno = channel.send_ans(0, beep.MIMEMessage('foo baz')) + assert ansno == 1 + self.assertEqual(('ANS', 0, 0, False, 8L, ansno, 'foo baz'), self.session.sent_messages[1]) + assert 0 in channel.ansnos.keys() + channel.send_nul(0) + self.assertEqual(('NUL', 0, 0, False, 16L, None, ''), + self.session.sent_messages[2]) + assert 0 not in channel.ansnos.keys() def suite(): diff --git a/scripts/beepclient.py b/scripts/beepclient.py new file mode 100644 --- /dev/null +++ b/scripts/beepclient.py @@ -0,0 +1,59 @@ +import asyncore +import sys +import time + +from bitten.util.beep import Initiator, MIMEMessage, Profile +from bitten.util.xmlio import Element, parse_xml + + +if __name__ == '__main__': + host = 'localhost' + port = 8000 + if len(sys.argv) > 1: + host = sys.argv[1] + if len(sys.argv) > 2: + port = int(sys.argv[2]) + + + class EchoProfile(Profile): + URI = 'http://www.eigenmagic.com/beep/ECHO' + + def handle_connect(self): + print 'Here we are!' + msgno = self.channel.send_msg(MIMEMessage('Hello peer!')) + + def handle_rpy(self, msgno, message): + print message.get_payload() + + + class EchoClient(Initiator): + + def handle_start_error(self, code, message): + print>>sys.stderr, 'Error %d: %s' % (code, message) + + def handle_start_ok(self, channelno, uri): + print 'Channel %d started for profile %s...' % (channelno, uri) + #self.channels[channelno].send_msg(MIMEMessage('Hello')) + + def greeting_received(self, profiles): + if EchoProfile.URI in profiles: + self.channels[0].profile.start([EchoProfile], + handle_ok=self.handle_start_ok, + handle_error=self.handle_start_error) + + client = EchoClient(host, port) + try: + while client: + try: + asyncore.loop() + except KeyboardInterrupt: + def handle_ok(): + raise asyncore.ExitNow, 'Session terminated' + def handle_error(code, message): + print>>sys.stderr, 'Peer refused to terminate session (%d)' \ + % code + client.channels[0].profile.close(handle_ok=handle_ok, + handle_error=handle_error) + time.sleep(.2) + except asyncore.ExitNow, e: + print e diff --git a/scripts/proxy.py b/scripts/proxy.py new file mode 100644 --- /dev/null +++ b/scripts/proxy.py @@ -0,0 +1,93 @@ +# Based on the proxy module from the Medusa project +# Used for inspecting the communication between two BEEP peers + +import asynchat +import asyncore +import socket +import sys + + +class proxy_server(asyncore.dispatcher): + + def __init__(self, host, port): + asyncore.dispatcher.__init__ (self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + self.there = (host, port) + here = ('', port + 1) + self.bind(here) + self.listen(5) + + def handle_accept(self): + proxy_receiver(self, self.accept()) + + +class proxy_sender(asynchat.async_chat): + + def __init__(self, receiver, address): + asynchat.async_chat.__init__(self) + self.receiver = receiver + self.set_terminator(None) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.buffer = '' + self.set_terminator('\r\n') + self.connect(address) + print 'L:', '' + + def handle_connect(self): + print 'L:', '' + + def collect_incoming_data(self, data): + self.buffer = self.buffer + data + + def found_terminator(self): + data = self.buffer + self.buffer = '' + for line in data.splitlines(): + print 'L:', '\x1b[35m' + line + '\x1b[0m' + self.receiver.push(data + '\r\n') + + def handle_close(self): + self.receiver.close() + self.close() + + +class proxy_receiver(asynchat.async_chat): + + channel_counter = 0 + + def __init__(self, server, (conn, addr)): + asynchat.async_chat.__init__(self, conn) + self.set_terminator('\r\n') + self.server = server + self.id = self.channel_counter + self.channel_counter = self.channel_counter + 1 + self.sender = proxy_sender (self, server.there) + self.sender.id = self.id + self.buffer = '' + + def collect_incoming_data (self, data): + self.buffer = self.buffer + data + + def found_terminator(self): + data = self.buffer + self.buffer = '' + for line in data.splitlines(): + print 'I:', '\x1b[34m' + line + '\x1b[0m' + self.sender.push (data + '\r\n') + + def handle_connect(self): + print 'I:', '' + + def handle_close(self): + print 'I:', '' + self.sender.close() + self.close() + + +if __name__ == '__main__': + if len(sys.argv) < 3: + print 'Usage: %s ' % sys.argv[0] + else: + ps = proxy_server(sys.argv[1], int(sys.argv[2])) + asyncore.loop()