changeset 11:4d31bd0c3eba

Improved BEEP implementation.
author cmlenz
date Mon, 13 Jun 2005 09:27:04 +0000
parents 2269b705deb9
children fd802a55be55
files bitten/util/__init__.py bitten/util/beep.py bitten/util/xmlio.py scripts/beepclient.py scripts/echoclient.py
diffstat 4 files changed, 245 insertions(+), 165 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/util/__init__.py
+++ b/bitten/util/__init__.py
@@ -0,0 +1,19 @@
+# -*- coding: iso8859-1 -*-
+#
+# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
+#
+# Bitten is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# Trac is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+#
+# Author: Christopher Lenz <cmlenz@gmx.de>
--- a/bitten/util/beep.py
+++ b/bitten/util/beep.py
@@ -18,24 +18,33 @@
 #
 # Author: Christopher Lenz <cmlenz@gmx.de>
 
+"""Minimal implementation of the BEEP protocol (IETF RFC 3080) based on the
+`asyncore` module.
+
+Current limitations:
+ * No support for the TSL and SASL profiles.
+ * No support for mapping frames (SEQ frames for TCP mapping). 
+"""
+
 import asynchat
 import asyncore
 from email.Message import Message
 from email.Parser import Parser
-import mimetools
 import socket
-try:
-    from cStringIO import StringIO
-except ImportError:
-    from StringIO import StringIO
 import sys
-import traceback
 
 from bitten.util.xmlio import Element, parse_xml
 
 __all__ = ['Listener', 'Initiator', 'Profile']
 
 
+BEEP_XML = 'application/beep+xml'
+
+
+class ProtocolError(Exception):
+    """Generic root class for BEEP exceptions."""
+
+
 class Listener(asyncore.dispatcher):
     """BEEP peer in the listener role.
     
@@ -48,20 +57,9 @@
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
         self.set_reuse_addr()
         self.bind((ip, port))
-
         self.profiles = {}
-
         self.listen(5)
 
-        host, port = self.socket.getsockname()
-        if not ip:
-            ip = socket.gethostbyname (socket.gethostname())
-        try:
-            self.server_name = socket.gethostbyaddr(ip)[0]
-        except socket.error:
-            self.server_name = ip
-        print 'Listening on %s:%s' % (self.server_name, port)
-
     def writable(self):
         return False
 
@@ -90,10 +88,11 @@
         self.set_terminator('\r\n')
 
         self.profiles = profiles or {}
-        self.channels = {0: Channel(self, 0, ManagementProfile())}
-        self.channelno = cycle_through(first_channelno, 2147483647, step=2)
         self.inbuf = []
         self.header = self.payload = None
+        
+        self.channelno = cycle_through(first_channelno, 2147483647, step=2)
+        self.channels = {0: Channel(self, 0, ManagementProfile())}
 
     def handle_connect(self):
         pass
@@ -110,17 +109,27 @@
 
     def found_terminator(self):
         """Called by async_chat when a terminator is found in the input
-        stream."""
+        stream.
+        
+        Parse the incoming data depending on whether it terminated on the frame
+        header, playload or trailer. For the header, extract the payload size
+        parameter and use it as terminator or the payload. When the trailer has
+        been received, delegate to `_handle_frame()`.
+        """
         if self.header is None:
+            # Frame header received
             self.header = ''.join(self.inbuf).split(' ')
             self.inbuf = []
             if self.header[0] == 'SEQ':
                 # TCP mapping frame
                 raise NotImplementedError                
             else:
+                # Extract payload size to use as next terminator
                 try:
                     size = int(self.header[int(self.header[0] != 'ANS') - 2])
                 except ValueError:
+                    # TODO: Malformed frame... should we terminate the session
+                    # here?
                     self.header = None
                     return
                 if size == 0:
@@ -129,10 +138,12 @@
                 else:
                     self.set_terminator(size)
         elif self.payload is None:
+            # Frame payload received
             self.payload = ''.join(self.inbuf)
             self.inbuf = []
             self.set_terminator('END\r\n')
         else:
+            # Frame trailer received
             self._handle_frame(self.header, self.payload)
             self.header = self.payload = None
             self.inbuf = []
@@ -143,21 +154,23 @@
         
         This parses the frame header and decides which channel to pass it to.
         """
-        cmd = header[0]
-        if cmd == 'SEQ':
-            # RFC 3081 - need to digest and implement
-            raise NotImplementedError
-        channel = int(header[1])
-        msgno = int(header[2])
-        more = header[3] == '*'
-        seqno = int(header[4])
-        size = int(header[5])
-        if cmd == 'ANS':
-            ansno = int(header[6])
-        else:
+        msgno = None
+        channel = None
+        try:
+            cmd = header[0].upper()
+            channel = int(header[1])
+            msgno = int(header[2])
+            more = header[3] == '*'
+            seqno = int(header[4])
+            size = int(header[5])
             ansno = None
-        self.channels[channel].handle_frame(cmd, msgno, more, seqno,
-                                            ansno, payload)
+            if cmd == 'ANS':
+                ansno = int(header[6])
+            self.channels[channel].handle_frame(cmd, msgno, more, seqno,
+                                                ansno, payload)
+        except (ValueError, TypeError, ProtocolError), e:
+            if channel == 0 and msgno is not None:
+                self.channels[0].profile.send_error(msgno, 550, e)
 
     def send_frame(self, cmd, channel, msgno, more, seqno, ansno=None,
                   payload=''):
@@ -170,43 +183,52 @@
         header = ' '.join([str(hb) for hb in headerbits])
         self.push('\r\n'.join((header, payload, 'END', '')))
 
-    def start_channel(self, number, profile_uri):
-        profile = self.profiles[profile_uri]
-        channel = Channel(self, number, profile)
-        self.channels[number] = channel
-        return channel
+
+class Initiator(Session):
+    """Root class for BEEP peers in the initiating role."""
+
+    def __init__(self, ip, port, profiles=None):
+        """Create the BEEP session.
+        
+        @param ip: The IP address to connect to
+        @param port: The port to connect to
+        @param profiles: A dictionary of the supported profiles, where the key
+                         is the URI identifying the profile, and the value is a
+                         `Profile` instance that will handle the communication
+                         for that profile
+        """
+        Session.__init__(self, None, None, profiles or {})
+        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.connect((ip, port))
 
     def greeting_received(self, profiles):
-        """Initiator sub-classes should override this to start the channels they
-        want.
+        """Sub-classes should override this to start the channels they need.
         
         @param profiles: A list of URIs of the profiles the peer claims to
                          support.
         """
 
 
-class Initiator(Session):
-
-    def __init__(self, ip, port, profiles=None, handle_greeting=None):
-        Session.__init__(self, None, None, profiles or {})
-        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.connect((ip, port))
-        self.greeting_handler = handle_greeting
-
-
 class Channel(object):
     """A specific channel of a BEEP session."""
 
     def __init__(self, session, channelno, profile):
+        """Create the channel.
+
+        @param session The `Session` object that the channel belongs to
+        @param channelno The channel number
+        @param profile The associated `Profile` object
+        """
         self.session = session
         self.channelno = channelno
         self.inqueue = {}
+        self.outqueue = []
         self.reply_handlers = {}
 
         self.msgno = cycle_through(0, 2147483647)
         self.msgnos = {} # message numbers currently in use
         self.ansnos = {} # answer numbers keyed by msgno, each 0-2147483647
-        self.seqno = [serial(), serial()]
+        self.seqno = [serial(), serial()] # incoming, outgoing sequence numbers
         self.mime_parser = Parser()
 
         self.profile = profile
@@ -215,9 +237,18 @@
         self.profile.handle_connect()
 
     def handle_frame(self, cmd, msgno, more, seqno, ansno, payload):
+        """Process a single data frame.
+
+        @param cmd: The frame keyword (MSG, RPY, ERR, ANS or NUL)
+        @param msgno: The message number
+        @param more: `True` if more frames are pending for this message
+        @param seqno: Sequence number of the frame
+        @param ansno: The answer number for 'ANS' messages, otherwise `None`
+        @param payload: The frame payload as a string
+        """
         # Validate and update sequence number
         if seqno != self.seqno[0]:
-            raise Exception, 'Out of sync with peer' # TODO: Be nice
+            raise ProtocolError, 'Out of sync with peer' # TODO: Be nice
         self.seqno[0] += len(payload)
 
         if more:
@@ -296,123 +327,138 @@
 
 
 class Profile(object):
-    """Abstract base class for handlers of specific BEEP profiles."""
+    """Abstract base class for handlers of specific BEEP profiles.
+    
+    Concrete subclasses need to at least implement the `handle_msg()` method,
+    and may override any of the others.
+    """
 
     def __init__(self):
+        """Create the profile."""
         self.session = None
         self.channel = None
 
     def handle_connect(self):
+        """Called when the channel this profile is associated with is
+        initially started."""
         pass
 
     def handle_msg(self, msgno, message):
         raise NotImplementedError
 
     def handle_rpy(self, msgno, message):
-        raise NotImplementedError
+        pass
 
     def handle_err(self, msgno, message):
-        raise NotImplementedError
+        pass
 
     def handle_ans(self, msgno, ansno, message):
-        raise NotImplementedError
+        pass
 
     def handle_nul(self, msgno):
-        raise NotImplementedError
+        pass
 
 
 class ManagementProfile(Profile):
-    CONTENT_TYPE = 'application/beep+xml'
+    """Implementation of the BEEP management profile."""
 
     def handle_connect(self):
+        """Send a greeting reply directly after connecting to the peer."""
         greeting = Element('greeting')[
             [Element('profile', uri=k) for k in self.session.profiles.keys()]
         ]
-        self.channel.send_rpy(0, MIMEMessage(greeting, self.CONTENT_TYPE))
+        self.channel.send_rpy(0, MIMEMessage(greeting, BEEP_XML))
 
     def handle_msg(self, msgno, message):
-        assert message.get_content_type() == self.CONTENT_TYPE
-        root = parse_xml(message.get_payload())
-        if root.name == 'start':
-            print 'Start channel %s' % root.number
-            for profile in root['profile']:
+        assert message.get_content_type() == BEEP_XML
+        elem = parse_xml(message.get_payload())
+
+        if elem.name == 'start':
+            for profile in elem['profile']:
                 if profile.uri in self.session.profiles.keys():
-                    try:
-                        self.session.start_channel(int(root.number),
-                                                   profile.uri)
-                        message = MIMEMessage(Element('profile',
-                                                      uri=profile.uri),
-                                              self.CONTENT_TYPE)
-                        self.channel.send_rpy(msgno, message)
-                        return
-                    except StandardError, e:
-                        print e
-            message = MIMEMessage(Element('error', code=550)[
-                'All request profiles are unsupported'
-            ], self.CONTENT_TYPE)
-            self.channel.send_err(msgno, message)
-        elif root.name == 'close':
-            print 'Close channel %s' % root.number
-            message = MIMEMessage(Element('ok'), self.CONTENT_TYPE)
+                    print 'Start channel %s for profile <%s>' % (elem.number,
+                                                                 profile.uri)
+                    channel = Channel(self.session, int(elem.number),
+                                      self.session.profiles[profile.uri])
+                    self.session.channels[int(elem.number)] = channel
+                    message = MIMEMessage(Element('profile', uri=profile.uri),
+                                          BEEP_XML)
+                    self.channel.send_rpy(msgno, message)
+                    return
+            self.send_error(msgno, 550,
+                            'All requested profiles are unsupported')
+
+        elif elem.name == 'close':
+            channelno = int(elem.number)
+            if channelno == 0:
+                if len(self.session.channels) > 1:
+                    self.send_error(msgno, 550, 'Other channels still open')
+                    return
+            if self.session.channels[channelno].msgnos:
+                self.send_error(msgno, 550, 'Channel waiting for replies')
+                return
+            print 'Close channel %s' % (channelno)
+            del self.session.channels[channelno]
+            message = MIMEMessage(Element('ok'), BEEP_XML)
             self.channel.send_rpy(msgno, message)
-            # TODO: close the channel or, if channelno is 0, terminate the
-            #       session... actually, that's done implicitly when the
-            #       peer disconnects after receiving the <ok/>
+            if channelno == 0:
+                self.session.close()
 
     def handle_rpy(self, msgno, message):
-        assert message.get_content_type() == self.CONTENT_TYPE
-        root = parse_xml(message.get_payload())
-        if root.name == 'greeting':
-            print 'Greeting...'
-            profiles = [profile.uri for profile in root['profile']]
-            self.session.greeting_received(profiles)
-        elif root.name == 'profile':
-            # This means that the channel has been established with this
-            # profile... basically a channel_start_ok message
-            print 'Profile %s' % root.uri
-        elif root.name == 'ok':
-            # A close request for a channel has been accepted, so we can
-            # close it now
-            print 'OK'
+        assert message.get_content_type() == BEEP_XML
+        elem = parse_xml(message.get_payload())
+
+        if elem.name == 'greeting':
+            if isinstance(self.session, Initiator):
+                profiles = [profile.uri for profile in elem['profile']]
+                self.session.greeting_received(profiles)
+
+        else: # <profile/> and <ok/> are handled by callbacks
+            self.send_error(msgno, 501, 'What are you replying to, son?')
 
     def handle_err(self, msgno, message):
-        assert message.get_content_type() == self.CONTENT_TYPE
-        root = parse_xml(message.get_payload())
-        assert root.name == 'error'
-        print root.code
+        # Probably an error on connect, because other errors should get handled
+        # by the corresponding callbacks
+        # TODO: Terminate the session, I guess
+        assert message.get_content_type() == BEEP_XML
+        elem = parse_xml(message.get_payload())
+        assert elem.name == 'error'
+        print elem.code
 
-    def close(self, channel=0, code=200, handle_ok=None, handle_error=None):
+    def send_close(self, channelno=0, code=200, handle_ok=None,
+                   handle_error=None):
         def handle_reply(cmd, msgno, message):
             if handle_ok is not None and cmd == 'RPY':
+                del self.session.channels[channelno]
                 handle_ok()
             if handle_error is not None and cmd == 'ERR':
-                root = parse_xml(message.get_payload())
-                handle_error(int(root.code), root.gettext())
-        xml = Element('close', number=channel, code=code)
-        return self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE),
-                                     handle_reply)
+                elem = parse_xml(message.get_payload())
+                handle_error(int(elem.code), elem.gettext())
+        xml = Element('close', number=channelno, code=code)
+        return self.channel.send_msg(MIMEMessage(xml, BEEP_XML), handle_reply)
 
-    def start(self, profiles, handle_ok=None, handle_error=None):
+    def send_error(self, msgno, code, message=''):
+        xml = Element('error', code=code)[message]
+        self.channel.send_err(msgno, MIMEMessage(xml, BEEP_XML))
+
+    def send_start(self, profiles, handle_ok=None, handle_error=None):
         channelno = self.session.channelno.next()
         def handle_reply(cmd, msgno, message):
             if handle_ok is not None and cmd == 'RPY':
-                root = parse_xml(message.get_payload())
-                selected = None
-                for profile in profiles:
-                    if profile.URI == root.uri:
-                        selected = profile
-                        break
-                self.session.channels[channelno] = Channel(self.session,
-                                                           channelno, profile())
-                handle_ok(channelno, root.uri)
+                elem = parse_xml(message.get_payload())
+                for profile in [p for p in profiles if p.URI == elem.uri]:
+                    self.session.channels[channelno] = Channel(self.session,
+                                                               channelno,
+                                                               profile())
+                    break
+                handle_ok(channelno, elem.uri)
             if handle_error is not None and cmd == 'ERR':
-                root = parse_xml(message.get_payload())
-                handle_error(int(root.code), root.gettext())
+                elem = parse_xml(message.get_payload())
+                handle_error(int(elem.code), elem.gettext())
         xml = Element('start', number=channelno)[
             [Element('profile', uri=profile.URI) for profile in profiles]
         ]
-        return self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE),
-                                     handle_reply)
+        return self.channel.send_msg(MIMEMessage(xml, BEEP_XML), handle_reply)
 
 
 def cycle_through(start, stop=None, step=1):
@@ -423,7 +469,7 @@
     cur = start
     while True:
         yield cur
-        cur += 1
+        cur += step
         if cur  > stop:
             cur = start
 
@@ -460,15 +506,14 @@
         del self['MIME-Version']
 
 
-# Simple echo profile implementation for testing
-class EchoProfile(Profile):
-    URI = 'http://www.eigenmagic.com/beep/ECHO'
+if __name__ == '__main__':
+    # Simple echo profile implementation for testing
+    class EchoProfile(Profile):
+        URI = 'http://beepcore.org/beep/ECHO'
 
-    def handle_msg(self, msgno, message):
-        self.channel.send_rpy(msgno, message)
+        def handle_msg(self, msgno, message):
+            self.channel.send_rpy(msgno, message)
 
-
-if __name__ == '__main__':
     listener = Listener('127.0.0.1', 8000)
     listener.profiles[EchoProfile.URI] = EchoProfile()
     try:
--- a/bitten/util/xmlio.py
+++ b/bitten/util/xmlio.py
@@ -133,9 +133,6 @@
     def __init__(self, node):
         self.node = node
 
-    def __del__(self):
-        self.node.unlink()
-
     name = property(fget=lambda self: self.node.tagName)
 
     def __getattr__(self, name):
rename from scripts/beepclient.py
rename to scripts/echoclient.py
--- a/scripts/beepclient.py
+++ b/scripts/echoclient.py
@@ -6,6 +6,46 @@
 from bitten.util.xmlio import Element, parse_xml
 
 
+class StdinChannel(asyncore.file_dispatcher):
+
+    def __init__(self, handle_read):
+        asyncore.file_dispatcher.__init__(self, sys.stdin.fileno())
+        self.read_handler = handle_read
+
+    def readable(self):
+        return True
+
+    def handle_read(self):
+        data = self.recv(8192)
+        self.read_handler(data)
+
+    def writable(self):
+        return False
+
+
+class EchoProfile(Profile):
+    URI = 'http://beepcore.org/beep/ECHO'
+
+    def handle_rpy(self, msgno, message):
+        print '\x1b[31m' + message.get_payload().rstrip() + '\x1b[0m'
+
+
+class EchoClient(Initiator):
+
+    channel = None
+
+    def greeting_received(self, profiles):
+        def handle_ok(channelno, uri):
+            print 'Channel %d started for profile %s...' % (channelno, uri)
+            self.channel = channelno
+        def handle_error(code, message):
+            print>>sys.stderr, 'Error %d: %s' % (code, message)
+        if EchoProfile.URI in profiles:
+            self.channels[0].profile.send_start([EchoProfile],
+                                                handle_ok=handle_ok,
+                                                handle_error=handle_error)
+
+
 if __name__ == '__main__':
     host = 'localhost'
     port = 8000
@@ -14,46 +54,25 @@
         if len(sys.argv) > 2:
             port = int(sys.argv[2])
 
-
-    class EchoProfile(Profile):
-        URI = 'http://www.eigenmagic.com/beep/ECHO'
-
-        def handle_connect(self):
-            print 'Here we are!'
-            msgno = self.channel.send_msg(MIMEMessage('Hello peer!'))
-
-        def handle_rpy(self, msgno, message):
-            print message.get_payload()
-
-
-    class EchoClient(Initiator):
-
-        def handle_start_error(self, code, message):
-            print>>sys.stderr, 'Error %d: %s' % (code, message)
-
-        def handle_start_ok(self, channelno, uri):
-            print 'Channel %d started for profile %s...' % (channelno, uri)
-            #self.channels[channelno].send_msg(MIMEMessage('Hello'))
-
-        def greeting_received(self, profiles):
-            if EchoProfile.URI in profiles:
-                self.channels[0].profile.start([EchoProfile],
-                                               handle_ok=self.handle_start_ok,
-                                               handle_error=self.handle_start_error)
-
     client = EchoClient(host, port)
+    def handle_input(data):
+        message = MIMEMessage(data, 'text/plain')
+        client.channels[client.channel].send_msg(message)
+    stdin = StdinChannel(handle_input)
     try:
         while client:
             try:
                 asyncore.loop()
             except KeyboardInterrupt:
+                mgmt = client.channels[0].profile
                 def handle_ok():
                     raise asyncore.ExitNow, 'Session terminated'
                 def handle_error(code, message):
-                    print>>sys.stderr, 'Peer refused to terminate session (%d)' \
-                                       % code
-                client.channels[0].profile.close(handle_ok=handle_ok,
-                                                 handle_error=handle_error)
-                time.sleep(.2)
+                    print>>sys.stderr, \
+                        'Peer refused to terminate session (%d): %s' \
+                        % (code, message)
+                mgmt.send_close(client.channel)
+                mgmt.send_close(handle_ok=handle_ok, handle_error=handle_error)
+                time.sleep(.25)
     except asyncore.ExitNow, e:
         print e
Copyright (C) 2012-2017 Edgewall Software