changeset 10:2269b705deb9

Improved the BEEP protocol implementation: * Callbacks for replying to messages. * Starting of channels implemented. * Some error handling (though not much yet). Also, added a sample client that sends a message using the echo protocol. Finally, added a simple proxy script that outputs the peer-to-peer communication to the console.
author cmlenz
date Fri, 10 Jun 2005 20:21:10 +0000
parents 5d8457aa2025
children 4d31bd0c3eba
files bitten/util/beep.py bitten/util/tests/beep.py scripts/beepclient.py scripts/proxy.py
diffstat 4 files changed, 406 insertions(+), 109 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/util/beep.py
+++ b/bitten/util/beep.py
@@ -28,14 +28,21 @@
     from cStringIO import StringIO
 except ImportError:
     from StringIO import StringIO
+import sys
 import traceback
 
 from bitten.util.xmlio import Element, parse_xml
 
 __all__ = ['Listener', 'Initiator', 'Profile']
 
+
 class Listener(asyncore.dispatcher):
-
+    """BEEP peer in the listener role.
+    
+    This peer opens a socket for listening to incoming connections. For each
+    connection, it opens a new session: an instance of `Session` that handle
+    communication with the connected peer.
+    """
     def __init__(self, ip, port):
         asyncore.dispatcher.__init__(self)
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -68,19 +75,21 @@
         pass
 
     def handle_accept(self):
+        """Start a new BEEP session."""
         conn, addr = self.accept()
         print 'Connected to %s:%s' % addr
         Session(conn, addr, self.profiles, first_channelno=2)
 
 
 class Session(asynchat.async_chat):
+    """A BEEP session between two peers."""
 
     def __init__(self, conn, addr, profiles, first_channelno=1):
         asynchat.async_chat.__init__(self, conn)
         self.addr = addr
         self.set_terminator('\r\n')
 
-        self.profiles = profiles
+        self.profiles = profiles or {}
         self.channels = {0: Channel(self, 0, ManagementProfile())}
         self.channelno = cycle_through(first_channelno, 2147483647, step=2)
         self.inbuf = []
@@ -89,10 +98,19 @@
     def handle_connect(self):
         pass
 
+    def handle_error(self):
+        t, v = sys.exc_info()[:2]
+        if t is SystemExit:
+            raise t, v
+        else:
+            asynchat.async_chat.handle_error(self)
+
     def collect_incoming_data(self, data):
         self.inbuf.append(data)
 
     def found_terminator(self):
+        """Called by async_chat when a terminator is found in the input
+        stream."""
         if self.header is None:
             self.header = ''.join(self.inbuf).split(' ')
             self.inbuf = []
@@ -121,28 +139,29 @@
             self.set_terminator('\r\n')
 
     def _handle_frame(self, header, payload):
-        try:
-            # Parse frame header
-            cmd = header[0]
-            if cmd == 'SEQ':
-                # RFC 3081 - need to digest and implement
-                raise NotImplementedError
-            channel = int(header[1])
-            msgno = int(header[2])
-            more = header[3] == '*'
-            seqno = int(header[4])
-            size = int(header[5])
-            if cmd == 'ANS':
-                ansno = int(header[6])
-            else:
-                ansno = None
-            self.channels[channel].handle_frame(cmd, msgno, more, seqno,
-                                                ansno, payload)
-        except Exception, e:
-            traceback.print_exc()
+        """Handle an incoming frame.
+        
+        This parses the frame header and decides which channel to pass it to.
+        """
+        cmd = header[0]
+        if cmd == 'SEQ':
+            # RFC 3081 - need to digest and implement
+            raise NotImplementedError
+        channel = int(header[1])
+        msgno = int(header[2])
+        more = header[3] == '*'
+        seqno = int(header[4])
+        size = int(header[5])
+        if cmd == 'ANS':
+            ansno = int(header[6])
+        else:
+            ansno = None
+        self.channels[channel].handle_frame(cmd, msgno, more, seqno,
+                                            ansno, payload)
 
-    def send_data(self, cmd, channel, msgno, more, seqno, ansno=None,
+    def send_frame(self, cmd, channel, msgno, more, seqno, ansno=None,
                   payload=''):
+        """Send the specified data frame to the peer."""
         headerbits = [cmd, channel, msgno, more and '*' or '.', seqno,
                       len(payload)]
         if cmd == 'ANS':
@@ -151,13 +170,28 @@
         header = ' '.join([str(hb) for hb in headerbits])
         self.push('\r\n'.join((header, payload, 'END', '')))
 
+    def start_channel(self, number, profile_uri):
+        profile = self.profiles[profile_uri]
+        channel = Channel(self, number, profile)
+        self.channels[number] = channel
+        return channel
+
+    def greeting_received(self, profiles):
+        """Initiator sub-classes should override this to start the channels they
+        want.
+        
+        @param profiles: A list of URIs of the profiles the peer claims to
+                         support.
+        """
+
 
 class Initiator(Session):
 
-    def __init__(self, ip, port, profiles=None):
+    def __init__(self, ip, port, profiles=None, handle_greeting=None):
         Session.__init__(self, None, None, profiles or {})
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
         self.connect((ip, port))
+        self.greeting_handler = handle_greeting
 
 
 class Channel(object):
@@ -167,7 +201,7 @@
         self.session = session
         self.channelno = channelno
         self.inqueue = {}
-        self.outqueue = {}
+        self.reply_handlers = {}
 
         self.msgno = cycle_through(0, 2147483647)
         self.msgnos = {} # message numbers currently in use
@@ -181,22 +215,42 @@
         self.profile.handle_connect()
 
     def handle_frame(self, cmd, msgno, more, seqno, ansno, payload):
-        if seqno != self.seqno[0]: # Validate and update sequence number
+        # Validate and update sequence number
+        if seqno != self.seqno[0]:
             raise Exception, 'Out of sync with peer' # TODO: Be nice
         self.seqno[0] += len(payload)
-        if more: # More of this message pending, so push it on the queue
+
+        if more:
+            # More of this message pending, so push it on the queue
             self.inqueue.setdefault(msgno, []).append(payload)
-        else: # Complete message received, so handle it
-            if msgno in self.inqueue.keys(): # Recombine queued messages
-                payload = ''.join(self.inqueue[msgno]) + payload
-                del self.inqueue[msgno]
-            if cmd == 'RPY' and msgno in self.msgnos.keys():
-                # Final reply using this message number, so dealloc
-                del self.msgnos[msgno]
-            message = None
-            if payload:
-                message = self.mime_parser.parsestr(payload)
-            self.profile.handle_message(cmd, msgno, message)
+            return
+
+        # Complete message received, so handle it
+        if msgno in self.inqueue.keys():
+            # Recombine queued messages
+            payload = ''.join(self.inqueue[msgno]) + payload
+            del self.inqueue[msgno]
+        if cmd == 'RPY' and msgno in self.msgnos.keys():
+            # Final reply using this message number, so dealloc
+            del self.msgnos[msgno]
+        message = None
+        if payload:
+            message = self.mime_parser.parsestr(payload)
+
+        if cmd == 'MSG':
+            self.profile.handle_msg(msgno, message)
+        else:
+            if msgno in self.reply_handlers.keys():
+                self.reply_handlers[msgno](cmd, msgno, message)
+                del self.reply_handlers[msgno]
+            elif cmd == 'RPY':
+                self.profile.handle_rpy(msgno, message)
+            elif cmd == 'ERR':
+                self.profile.handle_err(msgno, message)
+            elif cmd == 'ANS':
+                self.profile.handle_ans(msgno, ansno, message)
+            elif cmd == 'NUL':
+                self.profile.handle_nul(msgno)
 
     def _send(self, cmd, msgno, ansno=None, message=None):
         # TODO: Fragment and queue the message if necessary;
@@ -204,18 +258,21 @@
         payload = ''
         if message is not None:
             payload = message.as_string()
-        self.session.send_data(cmd, self.channelno, msgno, False,
-                               self.seqno[1].value, payload=payload,
-                               ansno=ansno)
+        self.session.send_frame(cmd, self.channelno, msgno, False,
+                                self.seqno[1].value, payload=payload,
+                                ansno=ansno)
         self.seqno[1] += len(payload)
 
-    def send_msg(self, message):
+    def send_msg(self, message, handle_reply=None):
         while True: # Find a unique message number
             msgno = self.msgno.next()
             if msgno not in self.msgnos.keys():
                 break
         self.msgnos[msgno] = True # Flag the chosen message number as in use
+        if handle_reply is not None:
+            self.reply_handlers[msgno] = handle_reply
         self._send('MSG', msgno, None, message)
+        return msgno
 
     def send_rpy(self, msgno, message):
         self._send('RPY', msgno, None, message)
@@ -229,16 +286,17 @@
             self.ansnos[msgno] = ansno
         else:
             ansno = self.ansnos[msgno]
-        self._send('ANS', msgno, ansno.next(), message)
+        next_ansno = ansno.next()
+        self._send('ANS', msgno, next_ansno, message)
+        return next_ansno
 
     def send_nul(self, msgno):
         self._send('NUL', msgno)
+        del self.ansnos[msgno] # dealloc answer numbers for the message
 
 
 class Profile(object):
     """Abstract base class for handlers of specific BEEP profiles."""
-    # TODO: This is pretty thin... would a meta-class for declarative definition
-    #       of profiles work here? 
 
     def __init__(self):
         self.session = None
@@ -247,69 +305,114 @@
     def handle_connect(self):
         pass
 
-    def handle_message(self, cmd, message):
+    def handle_msg(self, msgno, message):
+        raise NotImplementedError
+
+    def handle_rpy(self, msgno, message):
+        raise NotImplementedError
+
+    def handle_err(self, msgno, message):
+        raise NotImplementedError
+
+    def handle_ans(self, msgno, ansno, message):
+        raise NotImplementedError
+
+    def handle_nul(self, msgno):
         raise NotImplementedError
 
 
 class ManagementProfile(Profile):
     CONTENT_TYPE = 'application/beep+xml'
 
-    def __init__(self):
-        Profile.__init__(self)
-        self.state = 'init'
-
     def handle_connect(self):
         greeting = Element('greeting')[
             [Element('profile', uri=k) for k in self.session.profiles.keys()]
         ]
         self.channel.send_rpy(0, MIMEMessage(greeting, self.CONTENT_TYPE))
 
-    def handle_message(self, cmd, msgno, message):
+    def handle_msg(self, msgno, message):
         assert message.get_content_type() == self.CONTENT_TYPE
         root = parse_xml(message.get_payload())
-        if cmd == 'MSG':
-            if root.name == 'start':
-                print 'Start channel %s' % root.number
-                for profile in root['profile']:
-                    if uri in self.session.profiles.keys():
+        if root.name == 'start':
+            print 'Start channel %s' % root.number
+            for profile in root['profile']:
+                if profile.uri in self.session.profiles.keys():
+                    try:
+                        self.session.start_channel(int(root.number),
+                                                   profile.uri)
                         message = MIMEMessage(Element('profile',
                                                       uri=profile.uri),
                                               self.CONTENT_TYPE)
                         self.channel.send_rpy(msgno, message)
-                        # TODO: create the channel
                         return
-                # TODO: send error (unsupported profile)
-            elif root.name == 'close':
-                print 'Close channel %s' % root.number
-                message = MIMEMessage(Element('ok'), self.CONTENT_TYPE)
-                self.channel.send_rpy(msgno, message)
-                # TODO: close the channel, or if channelno is 0, terminate the
-                #       session... actually, that's done implicitly when the
-                #       peer disconnects after receiving the <ok/>
-        elif cmd == 'RPY':
-            if root.name == 'greeting':
-                print 'Greeting...'
-                for profile in root['profile']:
-                    print '  profile %s' % profile.uri
-                # TODO: invoke handler handle_greeting(profiles) or something
-            elif root.name == 'profile':
-                # This means that the channel has been established with this
-                # profile... basically a channel_start_ok message
-                print 'Profile %s' % root.uri
-            elif root.name == 'ok':
-                # A close request for a channel has been accepted, so we can
-                # close it now
-                print 'OK'
+                    except StandardError, e:
+                        print e
+            message = MIMEMessage(Element('error', code=550)[
+                'All request profiles are unsupported'
+            ], self.CONTENT_TYPE)
+            self.channel.send_err(msgno, message)
+        elif root.name == 'close':
+            print 'Close channel %s' % root.number
+            message = MIMEMessage(Element('ok'), self.CONTENT_TYPE)
+            self.channel.send_rpy(msgno, message)
+            # TODO: close the channel or, if channelno is 0, terminate the
+            #       session... actually, that's done implicitly when the
+            #       peer disconnects after receiving the <ok/>
 
-    def close(self, channel=0, code=200):
-        xml = Element('close', number=channel, code=code)
-        self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE))
+    def handle_rpy(self, msgno, message):
+        assert message.get_content_type() == self.CONTENT_TYPE
+        root = parse_xml(message.get_payload())
+        if root.name == 'greeting':
+            print 'Greeting...'
+            profiles = [profile.uri for profile in root['profile']]
+            self.session.greeting_received(profiles)
+        elif root.name == 'profile':
+            # This means that the channel has been established with this
+            # profile... basically a channel_start_ok message
+            print 'Profile %s' % root.uri
+        elif root.name == 'ok':
+            # A close request for a channel has been accepted, so we can
+            # close it now
+            print 'OK'
 
-    def start(self, profiles):
-        xml = Element('start', number=self.session.channelno.next())[
-            [Element('profile', uri=uri) for uri in profiles]
+    def handle_err(self, msgno, message):
+        assert message.get_content_type() == self.CONTENT_TYPE
+        root = parse_xml(message.get_payload())
+        assert root.name == 'error'
+        print root.code
+
+    def close(self, channel=0, code=200, handle_ok=None, handle_error=None):
+        def handle_reply(cmd, msgno, message):
+            if handle_ok is not None and cmd == 'RPY':
+                handle_ok()
+            if handle_error is not None and cmd == 'ERR':
+                root = parse_xml(message.get_payload())
+                handle_error(int(root.code), root.gettext())
+        xml = Element('close', number=channel, code=code)
+        return self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE),
+                                     handle_reply)
+
+    def start(self, profiles, handle_ok=None, handle_error=None):
+        channelno = self.session.channelno.next()
+        def handle_reply(cmd, msgno, message):
+            if handle_ok is not None and cmd == 'RPY':
+                root = parse_xml(message.get_payload())
+                selected = None
+                for profile in profiles:
+                    if profile.URI == root.uri:
+                        selected = profile
+                        break
+                self.session.channels[channelno] = Channel(self.session,
+                                                           channelno, profile())
+                handle_ok(channelno, root.uri)
+            if handle_error is not None and cmd == 'ERR':
+                root = parse_xml(message.get_payload())
+                handle_error(int(root.code), root.gettext())
+        xml = Element('start', number=channelno)[
+            [Element('profile', uri=profile.URI) for profile in profiles]
         ]
-        self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE))
+        return self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE),
+                                     handle_reply)
 
 
 def cycle_through(start, stop=None, step=1):
@@ -357,8 +460,17 @@
         del self['MIME-Version']
 
 
+# Simple echo profile implementation for testing
+class EchoProfile(Profile):
+    URI = 'http://www.eigenmagic.com/beep/ECHO'
+
+    def handle_msg(self, msgno, message):
+        self.channel.send_rpy(msgno, message)
+
+
 if __name__ == '__main__':
     listener = Listener('127.0.0.1', 8000)
+    listener.profiles[EchoProfile.URI] = EchoProfile()
     try:
         asyncore.loop()
     except KeyboardInterrupt:
--- a/bitten/util/tests/beep.py
+++ b/bitten/util/tests/beep.py
@@ -9,8 +9,8 @@
     def __init__(self):
         self.sent_messages = []
 
-    def send_data(self, cmd, channel, msgno, more, seqno, ansno=None,
-                  payload=''):
+    def send_frame(self, cmd, channel, msgno, more, seqno, ansno=None,
+                   payload=''):
         self.sent_messages.append((cmd, channel, msgno, more, seqno, ansno,
                                    payload.strip()))
 
@@ -23,8 +23,13 @@
     def handle_connect(self):
         pass
 
-    def handle_message(self, cmd, msgno, message):
-        self.handled_messages.append((cmd, msgno, message.as_string().strip()))
+    def handle_msg(self, msgno, message):
+        text = message.as_string().strip()
+        self.handled_messages.append(('MSG', msgno, text))
+
+    def handle_rpy(self, msgno, message):
+        text = message.as_string().strip()
+        self.handled_messages.append(('RPY', msgno, text))
 
 
 class ChannelTestCase(unittest.TestCase):
@@ -61,10 +66,10 @@
         corresponding message number (0) is reserved.
         """
         channel = beep.Channel(self.session, 0, self.profile)
-        channel.send_msg(beep.MIMEMessage('foo bar'))
-        self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'),
+        msgno = channel.send_msg(beep.MIMEMessage('foo bar'))
+        self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'),
                          self.session.sent_messages[0])
-        assert 0 in channel.msgnos.keys()
+        assert msgno in channel.msgnos.keys()
 
     def test_send_message_msgno_inc(self):
         """
@@ -72,14 +77,25 @@
         messages.
         """
         channel = beep.Channel(self.session, 0, self.profile)
-        channel.send_msg(beep.MIMEMessage('foo bar'))
-        self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'),
+        msgno = channel.send_msg(beep.MIMEMessage('foo bar'))
+        assert msgno == 0
+        self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'),
                          self.session.sent_messages[0])
-        assert 0 in channel.msgnos.keys()
-        channel.send_msg(beep.MIMEMessage('foo baz'))
-        self.assertEqual(('MSG', 0, 1, False, 8L, None, 'foo baz'),
+        assert msgno in channel.msgnos.keys()
+        msgno = channel.send_msg(beep.MIMEMessage('foo baz'))
+        assert msgno == 1
+        self.assertEqual(('MSG', 0, msgno, False, 8L, None, 'foo baz'),
                          self.session.sent_messages[1])
-        assert 1 in channel.msgnos.keys()
+        assert msgno in channel.msgnos.keys()
+
+    def test_send_reply(self):
+        """
+        Verify that sending an ANS message is processed correctly.
+        """
+        channel = beep.Channel(self.session, 0, self.profile)
+        channel.send_rpy(0, beep.MIMEMessage('foo bar'))
+        self.assertEqual(('RPY', 0, 0, False, 0L, None, 'foo bar'),
+                         self.session.sent_messages[0])
 
     def test_message_and_reply(self):
         """
@@ -87,25 +103,42 @@
         received.
         """
         channel = beep.Channel(self.session, 0, self.profile)
-        channel.send_msg(beep.MIMEMessage('foo bar'))
-        self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'),
+        msgno = channel.send_msg(beep.MIMEMessage('foo bar'))
+        self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'),
                          self.session.sent_messages[0])
-        assert 0 in channel.msgnos.keys()
-        channel.handle_frame('RPY', 0, False, 0, None, '42')
-        self.assertEqual(('RPY', 0, '42'), self.profile.handled_messages[0])
-        assert 0 not in channel.msgnos.keys()
+        assert msgno in channel.msgnos.keys()
+        channel.handle_frame('RPY', msgno, False, 0, None, '42')
+        self.assertEqual(('RPY', msgno, '42'), self.profile.handled_messages[0])
+        assert msgno not in channel.msgnos.keys()
 
-    def test_send_answer(self):
+    def test_send_error(self):
+        """
+        Verify that sending an ERR message is processed correctly.
+        """
+        channel = beep.Channel(self.session, 0, self.profile)
+        channel.send_err(0, beep.MIMEMessage('oops'))
+        self.assertEqual(('ERR', 0, 0, False, 0L, None, 'oops'),
+                         self.session.sent_messages[0])
+
+    def test_send_answers(self):
         """
         Verify that sending an ANS message is processed correctly.
         """
         channel = beep.Channel(self.session, 0, self.profile)
-        channel.send_ans(0, beep.MIMEMessage('foo bar'))
-        self.assertEqual(('ANS', 0, 0, False, 0L, 0, 'foo bar'),
+        ansno = channel.send_ans(0, beep.MIMEMessage('foo bar'))
+        assert ansno == 0
+        self.assertEqual(('ANS', 0, 0, False, 0L, ansno, 'foo bar'),
                          self.session.sent_messages[0])
-        channel.send_ans(0, beep.MIMEMessage('foo baz'))
-        self.assertEqual(('ANS', 0, 0, False, 8L, 1, 'foo baz'),
+        assert 0 in channel.ansnos.keys()
+        ansno = channel.send_ans(0, beep.MIMEMessage('foo baz'))
+        assert ansno == 1
+        self.assertEqual(('ANS', 0, 0, False, 8L, ansno, 'foo baz'),
                          self.session.sent_messages[1])
+        assert 0 in channel.ansnos.keys()
+        channel.send_nul(0)
+        self.assertEqual(('NUL', 0, 0, False, 16L, None, ''),
+                         self.session.sent_messages[2])
+        assert 0 not in channel.ansnos.keys()
 
 
 def suite():
new file mode 100644
--- /dev/null
+++ b/scripts/beepclient.py
@@ -0,0 +1,59 @@
+import asyncore
+import sys
+import time
+
+from bitten.util.beep import Initiator, MIMEMessage, Profile
+from bitten.util.xmlio import Element, parse_xml
+
+
+if __name__ == '__main__':
+    host = 'localhost'
+    port = 8000
+    if len(sys.argv) > 1:
+        host = sys.argv[1]
+        if len(sys.argv) > 2:
+            port = int(sys.argv[2])
+
+
+    class EchoProfile(Profile):
+        URI = 'http://www.eigenmagic.com/beep/ECHO'
+
+        def handle_connect(self):
+            print 'Here we are!'
+            msgno = self.channel.send_msg(MIMEMessage('Hello peer!'))
+
+        def handle_rpy(self, msgno, message):
+            print message.get_payload()
+
+
+    class EchoClient(Initiator):
+
+        def handle_start_error(self, code, message):
+            print>>sys.stderr, 'Error %d: %s' % (code, message)
+
+        def handle_start_ok(self, channelno, uri):
+            print 'Channel %d started for profile %s...' % (channelno, uri)
+            #self.channels[channelno].send_msg(MIMEMessage('Hello'))
+
+        def greeting_received(self, profiles):
+            if EchoProfile.URI in profiles:
+                self.channels[0].profile.start([EchoProfile],
+                                               handle_ok=self.handle_start_ok,
+                                               handle_error=self.handle_start_error)
+
+    client = EchoClient(host, port)
+    try:
+        while client:
+            try:
+                asyncore.loop()
+            except KeyboardInterrupt:
+                def handle_ok():
+                    raise asyncore.ExitNow, 'Session terminated'
+                def handle_error(code, message):
+                    print>>sys.stderr, 'Peer refused to terminate session (%d)' \
+                                       % code
+                client.channels[0].profile.close(handle_ok=handle_ok,
+                                                 handle_error=handle_error)
+                time.sleep(.2)
+    except asyncore.ExitNow, e:
+        print e
new file mode 100644
--- /dev/null
+++ b/scripts/proxy.py
@@ -0,0 +1,93 @@
+# Based on the proxy module from the Medusa project
+# Used for inspecting the communication between two BEEP peers
+
+import asynchat
+import asyncore
+import socket
+import sys
+
+
+class proxy_server(asyncore.dispatcher):
+    
+    def __init__(self, host, port):
+        asyncore.dispatcher.__init__ (self)
+        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.set_reuse_addr()
+        self.there = (host, port)
+        here = ('', port + 1)
+        self.bind(here)
+        self.listen(5)
+
+    def handle_accept(self):
+        proxy_receiver(self, self.accept())
+
+
+class proxy_sender(asynchat.async_chat):
+
+    def __init__(self, receiver, address):
+        asynchat.async_chat.__init__(self)
+        self.receiver = receiver
+        self.set_terminator(None)
+        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.buffer = ''
+        self.set_terminator('\r\n')
+        self.connect(address)
+        print 'L:', '<wait for incoming connections>'
+
+    def handle_connect(self):
+        print 'L:', '<open connection>'
+
+    def collect_incoming_data(self, data):
+        self.buffer = self.buffer + data
+
+    def found_terminator(self):
+        data = self.buffer
+        self.buffer = ''
+        for line in data.splitlines():
+            print 'L:', '\x1b[35m' + line + '\x1b[0m'
+        self.receiver.push(data + '\r\n')
+
+    def handle_close(self):
+         self.receiver.close()
+         self.close()
+
+
+class proxy_receiver(asynchat.async_chat):
+
+    channel_counter = 0
+
+    def __init__(self, server, (conn, addr)):
+        asynchat.async_chat.__init__(self, conn)
+        self.set_terminator('\r\n')
+        self.server = server
+        self.id = self.channel_counter
+        self.channel_counter = self.channel_counter + 1
+        self.sender = proxy_sender (self, server.there)
+        self.sender.id = self.id
+        self.buffer = ''
+
+    def collect_incoming_data (self, data):
+        self.buffer = self.buffer + data
+
+    def found_terminator(self):
+        data = self.buffer
+        self.buffer = ''
+        for line in data.splitlines():
+            print 'I:', '\x1b[34m' + line + '\x1b[0m'
+        self.sender.push (data + '\r\n')
+
+    def handle_connect(self):
+        print 'I:', '<open connection>'
+
+    def handle_close(self):
+         print 'I:', '<close connection>'
+         self.sender.close()
+         self.close()
+
+
+if __name__ == '__main__':
+    if len(sys.argv) < 3:
+        print 'Usage: %s <server-host> <server-port>' % sys.argv[0]
+    else:
+        ps = proxy_server(sys.argv[1], int(sys.argv[2]))
+        asyncore.loop()
Copyright (C) 2012-2017 Edgewall Software