# HG changeset patch # User cmlenz # Date 1118654824 0 # Node ID 4d31bd0c3eba7e0cac69707e7dd3b713428605aa # Parent 2269b705deb934e8e14c4c4bacd57622724a65fa Improved BEEP implementation. diff --git a/bitten/util/__init__.py b/bitten/util/__init__.py --- a/bitten/util/__init__.py +++ b/bitten/util/__init__.py @@ -0,0 +1,19 @@ +# -*- coding: iso8859-1 -*- +# +# Copyright (C) 2005 Christopher Lenz +# +# Bitten is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# Trac is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +# +# Author: Christopher Lenz diff --git a/bitten/util/beep.py b/bitten/util/beep.py --- a/bitten/util/beep.py +++ b/bitten/util/beep.py @@ -18,24 +18,33 @@ # # Author: Christopher Lenz +"""Minimal implementation of the BEEP protocol (IETF RFC 3080) based on the +`asyncore` module. + +Current limitations: + * No support for the TSL and SASL profiles. + * No support for mapping frames (SEQ frames for TCP mapping). +""" + import asynchat import asyncore from email.Message import Message from email.Parser import Parser -import mimetools import socket -try: - from cStringIO import StringIO -except ImportError: - from StringIO import StringIO import sys -import traceback from bitten.util.xmlio import Element, parse_xml __all__ = ['Listener', 'Initiator', 'Profile'] +BEEP_XML = 'application/beep+xml' + + +class ProtocolError(Exception): + """Generic root class for BEEP exceptions.""" + + class Listener(asyncore.dispatcher): """BEEP peer in the listener role. @@ -48,20 +57,9 @@ self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind((ip, port)) - self.profiles = {} - self.listen(5) - host, port = self.socket.getsockname() - if not ip: - ip = socket.gethostbyname (socket.gethostname()) - try: - self.server_name = socket.gethostbyaddr(ip)[0] - except socket.error: - self.server_name = ip - print 'Listening on %s:%s' % (self.server_name, port) - def writable(self): return False @@ -90,10 +88,11 @@ self.set_terminator('\r\n') self.profiles = profiles or {} - self.channels = {0: Channel(self, 0, ManagementProfile())} - self.channelno = cycle_through(first_channelno, 2147483647, step=2) self.inbuf = [] self.header = self.payload = None + + self.channelno = cycle_through(first_channelno, 2147483647, step=2) + self.channels = {0: Channel(self, 0, ManagementProfile())} def handle_connect(self): pass @@ -110,17 +109,27 @@ def found_terminator(self): """Called by async_chat when a terminator is found in the input - stream.""" + stream. + + Parse the incoming data depending on whether it terminated on the frame + header, playload or trailer. For the header, extract the payload size + parameter and use it as terminator or the payload. When the trailer has + been received, delegate to `_handle_frame()`. + """ if self.header is None: + # Frame header received self.header = ''.join(self.inbuf).split(' ') self.inbuf = [] if self.header[0] == 'SEQ': # TCP mapping frame raise NotImplementedError else: + # Extract payload size to use as next terminator try: size = int(self.header[int(self.header[0] != 'ANS') - 2]) except ValueError: + # TODO: Malformed frame... should we terminate the session + # here? self.header = None return if size == 0: @@ -129,10 +138,12 @@ else: self.set_terminator(size) elif self.payload is None: + # Frame payload received self.payload = ''.join(self.inbuf) self.inbuf = [] self.set_terminator('END\r\n') else: + # Frame trailer received self._handle_frame(self.header, self.payload) self.header = self.payload = None self.inbuf = [] @@ -143,21 +154,23 @@ This parses the frame header and decides which channel to pass it to. """ - cmd = header[0] - if cmd == 'SEQ': - # RFC 3081 - need to digest and implement - raise NotImplementedError - channel = int(header[1]) - msgno = int(header[2]) - more = header[3] == '*' - seqno = int(header[4]) - size = int(header[5]) - if cmd == 'ANS': - ansno = int(header[6]) - else: + msgno = None + channel = None + try: + cmd = header[0].upper() + channel = int(header[1]) + msgno = int(header[2]) + more = header[3] == '*' + seqno = int(header[4]) + size = int(header[5]) ansno = None - self.channels[channel].handle_frame(cmd, msgno, more, seqno, - ansno, payload) + if cmd == 'ANS': + ansno = int(header[6]) + self.channels[channel].handle_frame(cmd, msgno, more, seqno, + ansno, payload) + except (ValueError, TypeError, ProtocolError), e: + if channel == 0 and msgno is not None: + self.channels[0].profile.send_error(msgno, 550, e) def send_frame(self, cmd, channel, msgno, more, seqno, ansno=None, payload=''): @@ -170,43 +183,52 @@ header = ' '.join([str(hb) for hb in headerbits]) self.push('\r\n'.join((header, payload, 'END', ''))) - def start_channel(self, number, profile_uri): - profile = self.profiles[profile_uri] - channel = Channel(self, number, profile) - self.channels[number] = channel - return channel + +class Initiator(Session): + """Root class for BEEP peers in the initiating role.""" + + def __init__(self, ip, port, profiles=None): + """Create the BEEP session. + + @param ip: The IP address to connect to + @param port: The port to connect to + @param profiles: A dictionary of the supported profiles, where the key + is the URI identifying the profile, and the value is a + `Profile` instance that will handle the communication + for that profile + """ + Session.__init__(self, None, None, profiles or {}) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect((ip, port)) def greeting_received(self, profiles): - """Initiator sub-classes should override this to start the channels they - want. + """Sub-classes should override this to start the channels they need. @param profiles: A list of URIs of the profiles the peer claims to support. """ -class Initiator(Session): - - def __init__(self, ip, port, profiles=None, handle_greeting=None): - Session.__init__(self, None, None, profiles or {}) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect((ip, port)) - self.greeting_handler = handle_greeting - - class Channel(object): """A specific channel of a BEEP session.""" def __init__(self, session, channelno, profile): + """Create the channel. + + @param session The `Session` object that the channel belongs to + @param channelno The channel number + @param profile The associated `Profile` object + """ self.session = session self.channelno = channelno self.inqueue = {} + self.outqueue = [] self.reply_handlers = {} self.msgno = cycle_through(0, 2147483647) self.msgnos = {} # message numbers currently in use self.ansnos = {} # answer numbers keyed by msgno, each 0-2147483647 - self.seqno = [serial(), serial()] + self.seqno = [serial(), serial()] # incoming, outgoing sequence numbers self.mime_parser = Parser() self.profile = profile @@ -215,9 +237,18 @@ self.profile.handle_connect() def handle_frame(self, cmd, msgno, more, seqno, ansno, payload): + """Process a single data frame. + + @param cmd: The frame keyword (MSG, RPY, ERR, ANS or NUL) + @param msgno: The message number + @param more: `True` if more frames are pending for this message + @param seqno: Sequence number of the frame + @param ansno: The answer number for 'ANS' messages, otherwise `None` + @param payload: The frame payload as a string + """ # Validate and update sequence number if seqno != self.seqno[0]: - raise Exception, 'Out of sync with peer' # TODO: Be nice + raise ProtocolError, 'Out of sync with peer' # TODO: Be nice self.seqno[0] += len(payload) if more: @@ -296,123 +327,138 @@ class Profile(object): - """Abstract base class for handlers of specific BEEP profiles.""" + """Abstract base class for handlers of specific BEEP profiles. + + Concrete subclasses need to at least implement the `handle_msg()` method, + and may override any of the others. + """ def __init__(self): + """Create the profile.""" self.session = None self.channel = None def handle_connect(self): + """Called when the channel this profile is associated with is + initially started.""" pass def handle_msg(self, msgno, message): raise NotImplementedError def handle_rpy(self, msgno, message): - raise NotImplementedError + pass def handle_err(self, msgno, message): - raise NotImplementedError + pass def handle_ans(self, msgno, ansno, message): - raise NotImplementedError + pass def handle_nul(self, msgno): - raise NotImplementedError + pass class ManagementProfile(Profile): - CONTENT_TYPE = 'application/beep+xml' + """Implementation of the BEEP management profile.""" def handle_connect(self): + """Send a greeting reply directly after connecting to the peer.""" greeting = Element('greeting')[ [Element('profile', uri=k) for k in self.session.profiles.keys()] ] - self.channel.send_rpy(0, MIMEMessage(greeting, self.CONTENT_TYPE)) + self.channel.send_rpy(0, MIMEMessage(greeting, BEEP_XML)) def handle_msg(self, msgno, message): - assert message.get_content_type() == self.CONTENT_TYPE - root = parse_xml(message.get_payload()) - if root.name == 'start': - print 'Start channel %s' % root.number - for profile in root['profile']: + assert message.get_content_type() == BEEP_XML + elem = parse_xml(message.get_payload()) + + if elem.name == 'start': + for profile in elem['profile']: if profile.uri in self.session.profiles.keys(): - try: - self.session.start_channel(int(root.number), - profile.uri) - message = MIMEMessage(Element('profile', - uri=profile.uri), - self.CONTENT_TYPE) - self.channel.send_rpy(msgno, message) - return - except StandardError, e: - print e - message = MIMEMessage(Element('error', code=550)[ - 'All request profiles are unsupported' - ], self.CONTENT_TYPE) - self.channel.send_err(msgno, message) - elif root.name == 'close': - print 'Close channel %s' % root.number - message = MIMEMessage(Element('ok'), self.CONTENT_TYPE) + print 'Start channel %s for profile <%s>' % (elem.number, + profile.uri) + channel = Channel(self.session, int(elem.number), + self.session.profiles[profile.uri]) + self.session.channels[int(elem.number)] = channel + message = MIMEMessage(Element('profile', uri=profile.uri), + BEEP_XML) + self.channel.send_rpy(msgno, message) + return + self.send_error(msgno, 550, + 'All requested profiles are unsupported') + + elif elem.name == 'close': + channelno = int(elem.number) + if channelno == 0: + if len(self.session.channels) > 1: + self.send_error(msgno, 550, 'Other channels still open') + return + if self.session.channels[channelno].msgnos: + self.send_error(msgno, 550, 'Channel waiting for replies') + return + print 'Close channel %s' % (channelno) + del self.session.channels[channelno] + message = MIMEMessage(Element('ok'), BEEP_XML) self.channel.send_rpy(msgno, message) - # TODO: close the channel or, if channelno is 0, terminate the - # session... actually, that's done implicitly when the - # peer disconnects after receiving the + if channelno == 0: + self.session.close() def handle_rpy(self, msgno, message): - assert message.get_content_type() == self.CONTENT_TYPE - root = parse_xml(message.get_payload()) - if root.name == 'greeting': - print 'Greeting...' - profiles = [profile.uri for profile in root['profile']] - self.session.greeting_received(profiles) - elif root.name == 'profile': - # This means that the channel has been established with this - # profile... basically a channel_start_ok message - print 'Profile %s' % root.uri - elif root.name == 'ok': - # A close request for a channel has been accepted, so we can - # close it now - print 'OK' + assert message.get_content_type() == BEEP_XML + elem = parse_xml(message.get_payload()) + + if elem.name == 'greeting': + if isinstance(self.session, Initiator): + profiles = [profile.uri for profile in elem['profile']] + self.session.greeting_received(profiles) + + else: # and are handled by callbacks + self.send_error(msgno, 501, 'What are you replying to, son?') def handle_err(self, msgno, message): - assert message.get_content_type() == self.CONTENT_TYPE - root = parse_xml(message.get_payload()) - assert root.name == 'error' - print root.code + # Probably an error on connect, because other errors should get handled + # by the corresponding callbacks + # TODO: Terminate the session, I guess + assert message.get_content_type() == BEEP_XML + elem = parse_xml(message.get_payload()) + assert elem.name == 'error' + print elem.code - def close(self, channel=0, code=200, handle_ok=None, handle_error=None): + def send_close(self, channelno=0, code=200, handle_ok=None, + handle_error=None): def handle_reply(cmd, msgno, message): if handle_ok is not None and cmd == 'RPY': + del self.session.channels[channelno] handle_ok() if handle_error is not None and cmd == 'ERR': - root = parse_xml(message.get_payload()) - handle_error(int(root.code), root.gettext()) - xml = Element('close', number=channel, code=code) - return self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE), - handle_reply) + elem = parse_xml(message.get_payload()) + handle_error(int(elem.code), elem.gettext()) + xml = Element('close', number=channelno, code=code) + return self.channel.send_msg(MIMEMessage(xml, BEEP_XML), handle_reply) - def start(self, profiles, handle_ok=None, handle_error=None): + def send_error(self, msgno, code, message=''): + xml = Element('error', code=code)[message] + self.channel.send_err(msgno, MIMEMessage(xml, BEEP_XML)) + + def send_start(self, profiles, handle_ok=None, handle_error=None): channelno = self.session.channelno.next() def handle_reply(cmd, msgno, message): if handle_ok is not None and cmd == 'RPY': - root = parse_xml(message.get_payload()) - selected = None - for profile in profiles: - if profile.URI == root.uri: - selected = profile - break - self.session.channels[channelno] = Channel(self.session, - channelno, profile()) - handle_ok(channelno, root.uri) + elem = parse_xml(message.get_payload()) + for profile in [p for p in profiles if p.URI == elem.uri]: + self.session.channels[channelno] = Channel(self.session, + channelno, + profile()) + break + handle_ok(channelno, elem.uri) if handle_error is not None and cmd == 'ERR': - root = parse_xml(message.get_payload()) - handle_error(int(root.code), root.gettext()) + elem = parse_xml(message.get_payload()) + handle_error(int(elem.code), elem.gettext()) xml = Element('start', number=channelno)[ [Element('profile', uri=profile.URI) for profile in profiles] ] - return self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE), - handle_reply) + return self.channel.send_msg(MIMEMessage(xml, BEEP_XML), handle_reply) def cycle_through(start, stop=None, step=1): @@ -423,7 +469,7 @@ cur = start while True: yield cur - cur += 1 + cur += step if cur > stop: cur = start @@ -460,15 +506,14 @@ del self['MIME-Version'] -# Simple echo profile implementation for testing -class EchoProfile(Profile): - URI = 'http://www.eigenmagic.com/beep/ECHO' +if __name__ == '__main__': + # Simple echo profile implementation for testing + class EchoProfile(Profile): + URI = 'http://beepcore.org/beep/ECHO' - def handle_msg(self, msgno, message): - self.channel.send_rpy(msgno, message) + def handle_msg(self, msgno, message): + self.channel.send_rpy(msgno, message) - -if __name__ == '__main__': listener = Listener('127.0.0.1', 8000) listener.profiles[EchoProfile.URI] = EchoProfile() try: diff --git a/bitten/util/xmlio.py b/bitten/util/xmlio.py --- a/bitten/util/xmlio.py +++ b/bitten/util/xmlio.py @@ -133,9 +133,6 @@ def __init__(self, node): self.node = node - def __del__(self): - self.node.unlink() - name = property(fget=lambda self: self.node.tagName) def __getattr__(self, name): diff --git a/scripts/beepclient.py b/scripts/echoclient.py rename from scripts/beepclient.py rename to scripts/echoclient.py --- a/scripts/beepclient.py +++ b/scripts/echoclient.py @@ -6,6 +6,46 @@ from bitten.util.xmlio import Element, parse_xml +class StdinChannel(asyncore.file_dispatcher): + + def __init__(self, handle_read): + asyncore.file_dispatcher.__init__(self, sys.stdin.fileno()) + self.read_handler = handle_read + + def readable(self): + return True + + def handle_read(self): + data = self.recv(8192) + self.read_handler(data) + + def writable(self): + return False + + +class EchoProfile(Profile): + URI = 'http://beepcore.org/beep/ECHO' + + def handle_rpy(self, msgno, message): + print '\x1b[31m' + message.get_payload().rstrip() + '\x1b[0m' + + +class EchoClient(Initiator): + + channel = None + + def greeting_received(self, profiles): + def handle_ok(channelno, uri): + print 'Channel %d started for profile %s...' % (channelno, uri) + self.channel = channelno + def handle_error(code, message): + print>>sys.stderr, 'Error %d: %s' % (code, message) + if EchoProfile.URI in profiles: + self.channels[0].profile.send_start([EchoProfile], + handle_ok=handle_ok, + handle_error=handle_error) + + if __name__ == '__main__': host = 'localhost' port = 8000 @@ -14,46 +54,25 @@ if len(sys.argv) > 2: port = int(sys.argv[2]) - - class EchoProfile(Profile): - URI = 'http://www.eigenmagic.com/beep/ECHO' - - def handle_connect(self): - print 'Here we are!' - msgno = self.channel.send_msg(MIMEMessage('Hello peer!')) - - def handle_rpy(self, msgno, message): - print message.get_payload() - - - class EchoClient(Initiator): - - def handle_start_error(self, code, message): - print>>sys.stderr, 'Error %d: %s' % (code, message) - - def handle_start_ok(self, channelno, uri): - print 'Channel %d started for profile %s...' % (channelno, uri) - #self.channels[channelno].send_msg(MIMEMessage('Hello')) - - def greeting_received(self, profiles): - if EchoProfile.URI in profiles: - self.channels[0].profile.start([EchoProfile], - handle_ok=self.handle_start_ok, - handle_error=self.handle_start_error) - client = EchoClient(host, port) + def handle_input(data): + message = MIMEMessage(data, 'text/plain') + client.channels[client.channel].send_msg(message) + stdin = StdinChannel(handle_input) try: while client: try: asyncore.loop() except KeyboardInterrupt: + mgmt = client.channels[0].profile def handle_ok(): raise asyncore.ExitNow, 'Session terminated' def handle_error(code, message): - print>>sys.stderr, 'Peer refused to terminate session (%d)' \ - % code - client.channels[0].profile.close(handle_ok=handle_ok, - handle_error=handle_error) - time.sleep(.2) + print>>sys.stderr, \ + 'Peer refused to terminate session (%d): %s' \ + % (code, message) + mgmt.send_close(client.channel) + mgmt.send_close(handle_ok=handle_ok, handle_error=handle_error) + time.sleep(.25) except asyncore.ExitNow, e: print e