# HG changeset patch
# User cmlenz
# Date 1118434870 0
# Node ID 2269b705deb934e8e14c4c4bacd57622724a65fa
# Parent 5d8457aa20259153fe7d1302dc1271f26af662ed
Improved the BEEP protocol implementation:
* Callbacks for replying to messages.
* Starting of channels implemented.
* Some error handling (though not much yet).
Also, added a sample client that sends a message using the echo protocol.
Finally, added a simple proxy script that outputs the peer-to-peer communication to the console.
diff --git a/bitten/util/beep.py b/bitten/util/beep.py
--- a/bitten/util/beep.py
+++ b/bitten/util/beep.py
@@ -28,14 +28,21 @@
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
+import sys
import traceback
from bitten.util.xmlio import Element, parse_xml
__all__ = ['Listener', 'Initiator', 'Profile']
+
class Listener(asyncore.dispatcher):
-
+ """BEEP peer in the listener role.
+
+ This peer opens a socket for listening to incoming connections. For each
+ connection, it opens a new session: an instance of `Session` that handle
+ communication with the connected peer.
+ """
def __init__(self, ip, port):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -68,19 +75,21 @@
pass
def handle_accept(self):
+ """Start a new BEEP session."""
conn, addr = self.accept()
print 'Connected to %s:%s' % addr
Session(conn, addr, self.profiles, first_channelno=2)
class Session(asynchat.async_chat):
+ """A BEEP session between two peers."""
def __init__(self, conn, addr, profiles, first_channelno=1):
asynchat.async_chat.__init__(self, conn)
self.addr = addr
self.set_terminator('\r\n')
- self.profiles = profiles
+ self.profiles = profiles or {}
self.channels = {0: Channel(self, 0, ManagementProfile())}
self.channelno = cycle_through(first_channelno, 2147483647, step=2)
self.inbuf = []
@@ -89,10 +98,19 @@
def handle_connect(self):
pass
+ def handle_error(self):
+ t, v = sys.exc_info()[:2]
+ if t is SystemExit:
+ raise t, v
+ else:
+ asynchat.async_chat.handle_error(self)
+
def collect_incoming_data(self, data):
self.inbuf.append(data)
def found_terminator(self):
+ """Called by async_chat when a terminator is found in the input
+ stream."""
if self.header is None:
self.header = ''.join(self.inbuf).split(' ')
self.inbuf = []
@@ -121,28 +139,29 @@
self.set_terminator('\r\n')
def _handle_frame(self, header, payload):
- try:
- # Parse frame header
- cmd = header[0]
- if cmd == 'SEQ':
- # RFC 3081 - need to digest and implement
- raise NotImplementedError
- channel = int(header[1])
- msgno = int(header[2])
- more = header[3] == '*'
- seqno = int(header[4])
- size = int(header[5])
- if cmd == 'ANS':
- ansno = int(header[6])
- else:
- ansno = None
- self.channels[channel].handle_frame(cmd, msgno, more, seqno,
- ansno, payload)
- except Exception, e:
- traceback.print_exc()
+ """Handle an incoming frame.
+
+ This parses the frame header and decides which channel to pass it to.
+ """
+ cmd = header[0]
+ if cmd == 'SEQ':
+ # RFC 3081 - need to digest and implement
+ raise NotImplementedError
+ channel = int(header[1])
+ msgno = int(header[2])
+ more = header[3] == '*'
+ seqno = int(header[4])
+ size = int(header[5])
+ if cmd == 'ANS':
+ ansno = int(header[6])
+ else:
+ ansno = None
+ self.channels[channel].handle_frame(cmd, msgno, more, seqno,
+ ansno, payload)
- def send_data(self, cmd, channel, msgno, more, seqno, ansno=None,
+ def send_frame(self, cmd, channel, msgno, more, seqno, ansno=None,
payload=''):
+ """Send the specified data frame to the peer."""
headerbits = [cmd, channel, msgno, more and '*' or '.', seqno,
len(payload)]
if cmd == 'ANS':
@@ -151,13 +170,28 @@
header = ' '.join([str(hb) for hb in headerbits])
self.push('\r\n'.join((header, payload, 'END', '')))
+ def start_channel(self, number, profile_uri):
+ profile = self.profiles[profile_uri]
+ channel = Channel(self, number, profile)
+ self.channels[number] = channel
+ return channel
+
+ def greeting_received(self, profiles):
+ """Initiator sub-classes should override this to start the channels they
+ want.
+
+ @param profiles: A list of URIs of the profiles the peer claims to
+ support.
+ """
+
class Initiator(Session):
- def __init__(self, ip, port, profiles=None):
+ def __init__(self, ip, port, profiles=None, handle_greeting=None):
Session.__init__(self, None, None, profiles or {})
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect((ip, port))
+ self.greeting_handler = handle_greeting
class Channel(object):
@@ -167,7 +201,7 @@
self.session = session
self.channelno = channelno
self.inqueue = {}
- self.outqueue = {}
+ self.reply_handlers = {}
self.msgno = cycle_through(0, 2147483647)
self.msgnos = {} # message numbers currently in use
@@ -181,22 +215,42 @@
self.profile.handle_connect()
def handle_frame(self, cmd, msgno, more, seqno, ansno, payload):
- if seqno != self.seqno[0]: # Validate and update sequence number
+ # Validate and update sequence number
+ if seqno != self.seqno[0]:
raise Exception, 'Out of sync with peer' # TODO: Be nice
self.seqno[0] += len(payload)
- if more: # More of this message pending, so push it on the queue
+
+ if more:
+ # More of this message pending, so push it on the queue
self.inqueue.setdefault(msgno, []).append(payload)
- else: # Complete message received, so handle it
- if msgno in self.inqueue.keys(): # Recombine queued messages
- payload = ''.join(self.inqueue[msgno]) + payload
- del self.inqueue[msgno]
- if cmd == 'RPY' and msgno in self.msgnos.keys():
- # Final reply using this message number, so dealloc
- del self.msgnos[msgno]
- message = None
- if payload:
- message = self.mime_parser.parsestr(payload)
- self.profile.handle_message(cmd, msgno, message)
+ return
+
+ # Complete message received, so handle it
+ if msgno in self.inqueue.keys():
+ # Recombine queued messages
+ payload = ''.join(self.inqueue[msgno]) + payload
+ del self.inqueue[msgno]
+ if cmd == 'RPY' and msgno in self.msgnos.keys():
+ # Final reply using this message number, so dealloc
+ del self.msgnos[msgno]
+ message = None
+ if payload:
+ message = self.mime_parser.parsestr(payload)
+
+ if cmd == 'MSG':
+ self.profile.handle_msg(msgno, message)
+ else:
+ if msgno in self.reply_handlers.keys():
+ self.reply_handlers[msgno](cmd, msgno, message)
+ del self.reply_handlers[msgno]
+ elif cmd == 'RPY':
+ self.profile.handle_rpy(msgno, message)
+ elif cmd == 'ERR':
+ self.profile.handle_err(msgno, message)
+ elif cmd == 'ANS':
+ self.profile.handle_ans(msgno, ansno, message)
+ elif cmd == 'NUL':
+ self.profile.handle_nul(msgno)
def _send(self, cmd, msgno, ansno=None, message=None):
# TODO: Fragment and queue the message if necessary;
@@ -204,18 +258,21 @@
payload = ''
if message is not None:
payload = message.as_string()
- self.session.send_data(cmd, self.channelno, msgno, False,
- self.seqno[1].value, payload=payload,
- ansno=ansno)
+ self.session.send_frame(cmd, self.channelno, msgno, False,
+ self.seqno[1].value, payload=payload,
+ ansno=ansno)
self.seqno[1] += len(payload)
- def send_msg(self, message):
+ def send_msg(self, message, handle_reply=None):
while True: # Find a unique message number
msgno = self.msgno.next()
if msgno not in self.msgnos.keys():
break
self.msgnos[msgno] = True # Flag the chosen message number as in use
+ if handle_reply is not None:
+ self.reply_handlers[msgno] = handle_reply
self._send('MSG', msgno, None, message)
+ return msgno
def send_rpy(self, msgno, message):
self._send('RPY', msgno, None, message)
@@ -229,16 +286,17 @@
self.ansnos[msgno] = ansno
else:
ansno = self.ansnos[msgno]
- self._send('ANS', msgno, ansno.next(), message)
+ next_ansno = ansno.next()
+ self._send('ANS', msgno, next_ansno, message)
+ return next_ansno
def send_nul(self, msgno):
self._send('NUL', msgno)
+ del self.ansnos[msgno] # dealloc answer numbers for the message
class Profile(object):
"""Abstract base class for handlers of specific BEEP profiles."""
- # TODO: This is pretty thin... would a meta-class for declarative definition
- # of profiles work here?
def __init__(self):
self.session = None
@@ -247,69 +305,114 @@
def handle_connect(self):
pass
- def handle_message(self, cmd, message):
+ def handle_msg(self, msgno, message):
+ raise NotImplementedError
+
+ def handle_rpy(self, msgno, message):
+ raise NotImplementedError
+
+ def handle_err(self, msgno, message):
+ raise NotImplementedError
+
+ def handle_ans(self, msgno, ansno, message):
+ raise NotImplementedError
+
+ def handle_nul(self, msgno):
raise NotImplementedError
class ManagementProfile(Profile):
CONTENT_TYPE = 'application/beep+xml'
- def __init__(self):
- Profile.__init__(self)
- self.state = 'init'
-
def handle_connect(self):
greeting = Element('greeting')[
[Element('profile', uri=k) for k in self.session.profiles.keys()]
]
self.channel.send_rpy(0, MIMEMessage(greeting, self.CONTENT_TYPE))
- def handle_message(self, cmd, msgno, message):
+ def handle_msg(self, msgno, message):
assert message.get_content_type() == self.CONTENT_TYPE
root = parse_xml(message.get_payload())
- if cmd == 'MSG':
- if root.name == 'start':
- print 'Start channel %s' % root.number
- for profile in root['profile']:
- if uri in self.session.profiles.keys():
+ if root.name == 'start':
+ print 'Start channel %s' % root.number
+ for profile in root['profile']:
+ if profile.uri in self.session.profiles.keys():
+ try:
+ self.session.start_channel(int(root.number),
+ profile.uri)
message = MIMEMessage(Element('profile',
uri=profile.uri),
self.CONTENT_TYPE)
self.channel.send_rpy(msgno, message)
- # TODO: create the channel
return
- # TODO: send error (unsupported profile)
- elif root.name == 'close':
- print 'Close channel %s' % root.number
- message = MIMEMessage(Element('ok'), self.CONTENT_TYPE)
- self.channel.send_rpy(msgno, message)
- # TODO: close the channel, or if channelno is 0, terminate the
- # session... actually, that's done implicitly when the
- # peer disconnects after receiving the
- elif cmd == 'RPY':
- if root.name == 'greeting':
- print 'Greeting...'
- for profile in root['profile']:
- print ' profile %s' % profile.uri
- # TODO: invoke handler handle_greeting(profiles) or something
- elif root.name == 'profile':
- # This means that the channel has been established with this
- # profile... basically a channel_start_ok message
- print 'Profile %s' % root.uri
- elif root.name == 'ok':
- # A close request for a channel has been accepted, so we can
- # close it now
- print 'OK'
+ except StandardError, e:
+ print e
+ message = MIMEMessage(Element('error', code=550)[
+ 'All request profiles are unsupported'
+ ], self.CONTENT_TYPE)
+ self.channel.send_err(msgno, message)
+ elif root.name == 'close':
+ print 'Close channel %s' % root.number
+ message = MIMEMessage(Element('ok'), self.CONTENT_TYPE)
+ self.channel.send_rpy(msgno, message)
+ # TODO: close the channel or, if channelno is 0, terminate the
+ # session... actually, that's done implicitly when the
+ # peer disconnects after receiving the
- def close(self, channel=0, code=200):
- xml = Element('close', number=channel, code=code)
- self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE))
+ def handle_rpy(self, msgno, message):
+ assert message.get_content_type() == self.CONTENT_TYPE
+ root = parse_xml(message.get_payload())
+ if root.name == 'greeting':
+ print 'Greeting...'
+ profiles = [profile.uri for profile in root['profile']]
+ self.session.greeting_received(profiles)
+ elif root.name == 'profile':
+ # This means that the channel has been established with this
+ # profile... basically a channel_start_ok message
+ print 'Profile %s' % root.uri
+ elif root.name == 'ok':
+ # A close request for a channel has been accepted, so we can
+ # close it now
+ print 'OK'
- def start(self, profiles):
- xml = Element('start', number=self.session.channelno.next())[
- [Element('profile', uri=uri) for uri in profiles]
+ def handle_err(self, msgno, message):
+ assert message.get_content_type() == self.CONTENT_TYPE
+ root = parse_xml(message.get_payload())
+ assert root.name == 'error'
+ print root.code
+
+ def close(self, channel=0, code=200, handle_ok=None, handle_error=None):
+ def handle_reply(cmd, msgno, message):
+ if handle_ok is not None and cmd == 'RPY':
+ handle_ok()
+ if handle_error is not None and cmd == 'ERR':
+ root = parse_xml(message.get_payload())
+ handle_error(int(root.code), root.gettext())
+ xml = Element('close', number=channel, code=code)
+ return self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE),
+ handle_reply)
+
+ def start(self, profiles, handle_ok=None, handle_error=None):
+ channelno = self.session.channelno.next()
+ def handle_reply(cmd, msgno, message):
+ if handle_ok is not None and cmd == 'RPY':
+ root = parse_xml(message.get_payload())
+ selected = None
+ for profile in profiles:
+ if profile.URI == root.uri:
+ selected = profile
+ break
+ self.session.channels[channelno] = Channel(self.session,
+ channelno, profile())
+ handle_ok(channelno, root.uri)
+ if handle_error is not None and cmd == 'ERR':
+ root = parse_xml(message.get_payload())
+ handle_error(int(root.code), root.gettext())
+ xml = Element('start', number=channelno)[
+ [Element('profile', uri=profile.URI) for profile in profiles]
]
- self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE))
+ return self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE),
+ handle_reply)
def cycle_through(start, stop=None, step=1):
@@ -357,8 +460,17 @@
del self['MIME-Version']
+# Simple echo profile implementation for testing
+class EchoProfile(Profile):
+ URI = 'http://www.eigenmagic.com/beep/ECHO'
+
+ def handle_msg(self, msgno, message):
+ self.channel.send_rpy(msgno, message)
+
+
if __name__ == '__main__':
listener = Listener('127.0.0.1', 8000)
+ listener.profiles[EchoProfile.URI] = EchoProfile()
try:
asyncore.loop()
except KeyboardInterrupt:
diff --git a/bitten/util/tests/beep.py b/bitten/util/tests/beep.py
--- a/bitten/util/tests/beep.py
+++ b/bitten/util/tests/beep.py
@@ -9,8 +9,8 @@
def __init__(self):
self.sent_messages = []
- def send_data(self, cmd, channel, msgno, more, seqno, ansno=None,
- payload=''):
+ def send_frame(self, cmd, channel, msgno, more, seqno, ansno=None,
+ payload=''):
self.sent_messages.append((cmd, channel, msgno, more, seqno, ansno,
payload.strip()))
@@ -23,8 +23,13 @@
def handle_connect(self):
pass
- def handle_message(self, cmd, msgno, message):
- self.handled_messages.append((cmd, msgno, message.as_string().strip()))
+ def handle_msg(self, msgno, message):
+ text = message.as_string().strip()
+ self.handled_messages.append(('MSG', msgno, text))
+
+ def handle_rpy(self, msgno, message):
+ text = message.as_string().strip()
+ self.handled_messages.append(('RPY', msgno, text))
class ChannelTestCase(unittest.TestCase):
@@ -61,10 +66,10 @@
corresponding message number (0) is reserved.
"""
channel = beep.Channel(self.session, 0, self.profile)
- channel.send_msg(beep.MIMEMessage('foo bar'))
- self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'),
+ msgno = channel.send_msg(beep.MIMEMessage('foo bar'))
+ self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'),
self.session.sent_messages[0])
- assert 0 in channel.msgnos.keys()
+ assert msgno in channel.msgnos.keys()
def test_send_message_msgno_inc(self):
"""
@@ -72,14 +77,25 @@
messages.
"""
channel = beep.Channel(self.session, 0, self.profile)
- channel.send_msg(beep.MIMEMessage('foo bar'))
- self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'),
+ msgno = channel.send_msg(beep.MIMEMessage('foo bar'))
+ assert msgno == 0
+ self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'),
self.session.sent_messages[0])
- assert 0 in channel.msgnos.keys()
- channel.send_msg(beep.MIMEMessage('foo baz'))
- self.assertEqual(('MSG', 0, 1, False, 8L, None, 'foo baz'),
+ assert msgno in channel.msgnos.keys()
+ msgno = channel.send_msg(beep.MIMEMessage('foo baz'))
+ assert msgno == 1
+ self.assertEqual(('MSG', 0, msgno, False, 8L, None, 'foo baz'),
self.session.sent_messages[1])
- assert 1 in channel.msgnos.keys()
+ assert msgno in channel.msgnos.keys()
+
+ def test_send_reply(self):
+ """
+ Verify that sending an ANS message is processed correctly.
+ """
+ channel = beep.Channel(self.session, 0, self.profile)
+ channel.send_rpy(0, beep.MIMEMessage('foo bar'))
+ self.assertEqual(('RPY', 0, 0, False, 0L, None, 'foo bar'),
+ self.session.sent_messages[0])
def test_message_and_reply(self):
"""
@@ -87,25 +103,42 @@
received.
"""
channel = beep.Channel(self.session, 0, self.profile)
- channel.send_msg(beep.MIMEMessage('foo bar'))
- self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'),
+ msgno = channel.send_msg(beep.MIMEMessage('foo bar'))
+ self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'),
self.session.sent_messages[0])
- assert 0 in channel.msgnos.keys()
- channel.handle_frame('RPY', 0, False, 0, None, '42')
- self.assertEqual(('RPY', 0, '42'), self.profile.handled_messages[0])
- assert 0 not in channel.msgnos.keys()
+ assert msgno in channel.msgnos.keys()
+ channel.handle_frame('RPY', msgno, False, 0, None, '42')
+ self.assertEqual(('RPY', msgno, '42'), self.profile.handled_messages[0])
+ assert msgno not in channel.msgnos.keys()
- def test_send_answer(self):
+ def test_send_error(self):
+ """
+ Verify that sending an ERR message is processed correctly.
+ """
+ channel = beep.Channel(self.session, 0, self.profile)
+ channel.send_err(0, beep.MIMEMessage('oops'))
+ self.assertEqual(('ERR', 0, 0, False, 0L, None, 'oops'),
+ self.session.sent_messages[0])
+
+ def test_send_answers(self):
"""
Verify that sending an ANS message is processed correctly.
"""
channel = beep.Channel(self.session, 0, self.profile)
- channel.send_ans(0, beep.MIMEMessage('foo bar'))
- self.assertEqual(('ANS', 0, 0, False, 0L, 0, 'foo bar'),
+ ansno = channel.send_ans(0, beep.MIMEMessage('foo bar'))
+ assert ansno == 0
+ self.assertEqual(('ANS', 0, 0, False, 0L, ansno, 'foo bar'),
self.session.sent_messages[0])
- channel.send_ans(0, beep.MIMEMessage('foo baz'))
- self.assertEqual(('ANS', 0, 0, False, 8L, 1, 'foo baz'),
+ assert 0 in channel.ansnos.keys()
+ ansno = channel.send_ans(0, beep.MIMEMessage('foo baz'))
+ assert ansno == 1
+ self.assertEqual(('ANS', 0, 0, False, 8L, ansno, 'foo baz'),
self.session.sent_messages[1])
+ assert 0 in channel.ansnos.keys()
+ channel.send_nul(0)
+ self.assertEqual(('NUL', 0, 0, False, 16L, None, ''),
+ self.session.sent_messages[2])
+ assert 0 not in channel.ansnos.keys()
def suite():
diff --git a/scripts/beepclient.py b/scripts/beepclient.py
new file mode 100644
--- /dev/null
+++ b/scripts/beepclient.py
@@ -0,0 +1,59 @@
+import asyncore
+import sys
+import time
+
+from bitten.util.beep import Initiator, MIMEMessage, Profile
+from bitten.util.xmlio import Element, parse_xml
+
+
+if __name__ == '__main__':
+ host = 'localhost'
+ port = 8000
+ if len(sys.argv) > 1:
+ host = sys.argv[1]
+ if len(sys.argv) > 2:
+ port = int(sys.argv[2])
+
+
+ class EchoProfile(Profile):
+ URI = 'http://www.eigenmagic.com/beep/ECHO'
+
+ def handle_connect(self):
+ print 'Here we are!'
+ msgno = self.channel.send_msg(MIMEMessage('Hello peer!'))
+
+ def handle_rpy(self, msgno, message):
+ print message.get_payload()
+
+
+ class EchoClient(Initiator):
+
+ def handle_start_error(self, code, message):
+ print>>sys.stderr, 'Error %d: %s' % (code, message)
+
+ def handle_start_ok(self, channelno, uri):
+ print 'Channel %d started for profile %s...' % (channelno, uri)
+ #self.channels[channelno].send_msg(MIMEMessage('Hello'))
+
+ def greeting_received(self, profiles):
+ if EchoProfile.URI in profiles:
+ self.channels[0].profile.start([EchoProfile],
+ handle_ok=self.handle_start_ok,
+ handle_error=self.handle_start_error)
+
+ client = EchoClient(host, port)
+ try:
+ while client:
+ try:
+ asyncore.loop()
+ except KeyboardInterrupt:
+ def handle_ok():
+ raise asyncore.ExitNow, 'Session terminated'
+ def handle_error(code, message):
+ print>>sys.stderr, 'Peer refused to terminate session (%d)' \
+ % code
+ client.channels[0].profile.close(handle_ok=handle_ok,
+ handle_error=handle_error)
+ time.sleep(.2)
+ except asyncore.ExitNow, e:
+ print e
diff --git a/scripts/proxy.py b/scripts/proxy.py
new file mode 100644
--- /dev/null
+++ b/scripts/proxy.py
@@ -0,0 +1,93 @@
+# Based on the proxy module from the Medusa project
+# Used for inspecting the communication between two BEEP peers
+
+import asynchat
+import asyncore
+import socket
+import sys
+
+
+class proxy_server(asyncore.dispatcher):
+
+ def __init__(self, host, port):
+ asyncore.dispatcher.__init__ (self)
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.set_reuse_addr()
+ self.there = (host, port)
+ here = ('', port + 1)
+ self.bind(here)
+ self.listen(5)
+
+ def handle_accept(self):
+ proxy_receiver(self, self.accept())
+
+
+class proxy_sender(asynchat.async_chat):
+
+ def __init__(self, receiver, address):
+ asynchat.async_chat.__init__(self)
+ self.receiver = receiver
+ self.set_terminator(None)
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.buffer = ''
+ self.set_terminator('\r\n')
+ self.connect(address)
+ print 'L:', ''
+
+ def handle_connect(self):
+ print 'L:', ''
+
+ def collect_incoming_data(self, data):
+ self.buffer = self.buffer + data
+
+ def found_terminator(self):
+ data = self.buffer
+ self.buffer = ''
+ for line in data.splitlines():
+ print 'L:', '\x1b[35m' + line + '\x1b[0m'
+ self.receiver.push(data + '\r\n')
+
+ def handle_close(self):
+ self.receiver.close()
+ self.close()
+
+
+class proxy_receiver(asynchat.async_chat):
+
+ channel_counter = 0
+
+ def __init__(self, server, (conn, addr)):
+ asynchat.async_chat.__init__(self, conn)
+ self.set_terminator('\r\n')
+ self.server = server
+ self.id = self.channel_counter
+ self.channel_counter = self.channel_counter + 1
+ self.sender = proxy_sender (self, server.there)
+ self.sender.id = self.id
+ self.buffer = ''
+
+ def collect_incoming_data (self, data):
+ self.buffer = self.buffer + data
+
+ def found_terminator(self):
+ data = self.buffer
+ self.buffer = ''
+ for line in data.splitlines():
+ print 'I:', '\x1b[34m' + line + '\x1b[0m'
+ self.sender.push (data + '\r\n')
+
+ def handle_connect(self):
+ print 'I:', ''
+
+ def handle_close(self):
+ print 'I:', ''
+ self.sender.close()
+ self.close()
+
+
+if __name__ == '__main__':
+ if len(sys.argv) < 3:
+ print 'Usage: %s ' % sys.argv[0]
+ else:
+ ps = proxy_server(sys.argv[1], int(sys.argv[2]))
+ asyncore.loop()