# HG changeset patch # User cmlenz # Date 1118355593 0 # Node ID 8442bcb47a03fe7330c717ec1e87b45eb25344c3 # Parent 9b57159428b06032dbb8f16efadddc59aac9ea8e Initial draft of a minimal [http://www.beepcore.org/ BEEP] protocol implementation for communication between the build master and build slaves. diff --git a/bitten/tests/__init__.py b/bitten/tests/__init__.py --- a/bitten/tests/__init__.py +++ b/bitten/tests/__init__.py @@ -21,8 +21,13 @@ import unittest from bitten.recipe import tests as recipe +from bitten.util import tests as util def suite(): suite = unittest.TestSuite() suite.addTest(recipe.suite()) + suite.addTest(util.suite()) return suite + +if __name__ == '__main__': + unittest.main(defaultTest='suite') diff --git a/bitten/util/__init__.py b/bitten/util/__init__.py new file mode 100644 diff --git a/bitten/util/beep.py b/bitten/util/beep.py new file mode 100644 --- /dev/null +++ b/bitten/util/beep.py @@ -0,0 +1,365 @@ +# -*- coding: iso8859-1 -*- +# +# Copyright (C) 2005 Christopher Lenz +# +# Bitten is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# Trac is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +# +# Author: Christopher Lenz + +import asynchat +import asyncore +from email.Message import Message +from email.Parser import Parser +import mimetools +import socket +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO +import traceback + +from bitten.util.xmlio import Element, parse_xml + +__all__ = ['Listener', 'Initiator', 'Profile'] + +class Listener(asyncore.dispatcher): + + def __init__(self, ip, port): + asyncore.dispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind((ip, port)) + + self.profiles = {} + + self.listen(5) + + host, port = self.socket.getsockname() + if not ip: + ip = socket.gethostbyname (socket.gethostname()) + try: + self.server_name = socket.gethostbyaddr(ip)[0] + except socket.error: + self.server_name = ip + print 'Listening on %s:%s' % (self.server_name, port) + + def writable(self): + return False + + def handle_read(self): + pass + + def readable(self): + return True + + def handle_connect(self): + pass + + def handle_accept(self): + conn, addr = self.accept() + print 'Connected to %s:%s' % addr + Session(conn, addr, self.profiles, first_channelno=2) + + +class Session(asynchat.async_chat): + + def __init__(self, conn, addr, profiles, first_channelno=1): + asynchat.async_chat.__init__(self, conn) + self.addr = addr + self.set_terminator('\r\n') + + self.profiles = profiles + self.channels = {0: Channel(self, 0, ManagementProfile())} + self.channelno = cycle_through(first_channelno, 2147483647, step=2) + self.inbuf = [] + self.header = self.payload = None + + def handle_connect(self): + pass + + def collect_incoming_data(self, data): + self.inbuf.append(data) + + def found_terminator(self): + if self.header is None: + self.header = ''.join(self.inbuf).split(' ') + self.inbuf = [] + if self.header[0] == 'SEQ': + # TCP mapping frame + raise NotImplementedError + else: + try: + size = int(self.header[int(self.header[0] != 'ANS') - 2]) + except ValueError: + self.header = None + return + if size == 0: + self.payload = '' + self.set_terminator('END\r\n') + else: + self.set_terminator(size) + elif self.payload is None: + self.payload = ''.join(self.inbuf) + self.inbuf = [] + self.set_terminator('END\r\n') + else: + self._handle_frame(self.header, self.payload) + self.header = self.payload = None + self.inbuf = [] + self.set_terminator('\r\n') + + def _handle_frame(self, header, payload): + try: + # Parse frame header + cmd = header[0] + if cmd == 'SEQ': + # RFC 3081 - need to digest and implement + raise NotImplementedError + channel = int(header[1]) + msgno = int(header[2]) + more = header[3] == '*' + seqno = int(header[4]) + size = int(header[5]) + if cmd == 'ANS': + ansno = int(header[6]) + else: + ansno = None + self.channels[channel].handle_frame(cmd, msgno, more, seqno, + ansno, payload) + except Exception, e: + traceback.print_exc() + + def send_data(self, cmd, channel, msgno, more, seqno, ansno=None, + payload=''): + headerbits = [cmd, channel, msgno, more and '*' or '.', seqno, + len(payload)] + if cmd == 'ANS': + assert ansno is not None + headerbits.append(ansno) + header = ' '.join([str(hb) for hb in headerbits]) + self.push('\r\n'.join((header, payload, 'END', ''))) + + +class Initiator(Session): + + def __init__(self, ip, port, profiles=None): + Session.__init__(self, None, None, profiles or {}) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect((ip, port)) + + +class Channel(object): + """A specific channel of a BEEP session.""" + + def __init__(self, session, channelno, profile): + self.session = session + self.channelno = channelno + self.inqueue = {} + self.outqueue = {} + + self.msgno = cycle_through(0, 2147483647) + self.msgnos = {} # message numbers currently in use + self.ansnos = {} # answer numbers keyed by msgno, each 0-2147483647 + self.seqno = [serial(), serial()] + self.mime_parser = Parser() + + self.profile = profile + self.profile.session = self.session + self.profile.channel = self + self.profile.handle_connect() + + def handle_frame(self, cmd, msgno, more, seqno, ansno, payload): + if seqno != self.seqno[0]: # Validate and update sequence number + raise Exception, 'Out of sync with peer' # TODO: Be nice + self.seqno[0] += len(payload) + if more: # More of this message pending, so push it on the queue + self.inqueue.setdefault(msgno, []).append(payload) + else: # Complete message received, so handle it + if msgno in self.inqueue.keys(): # Recombine queued messages + payload = ''.join(self.inqueue[msgno]) + payload + del self.inqueue[msgno] + if cmd == 'RPY' and msgno in self.msgnos.keys(): + # Final reply using this message number, so dealloc + del self.msgnos[msgno] + message = None + if payload: + message = self.mime_parser.parsestr(payload) + self.profile.handle_message(cmd, msgno, message) + + def _send(self, cmd, msgno, ansno=None, message=None): + # TODO: Fragment and queue the message if necessary; + # First need TCP mapping (RFC 3081) for that to make real sense + payload = '' + if message is not None: + payload = message.as_string() + self.session.send_data(cmd, self.channelno, msgno, False, + self.seqno[1].value, payload=payload, + ansno=ansno) + self.seqno[1] += len(payload) + + def send_msg(self, message): + while True: # Find a unique message number + msgno = self.msgno.next() + if msgno not in self.msgnos.keys(): + break + self.msgnos[msgno] = True # Flag the chosen message number as in use + self._send('MSG', msgno, None, message) + + def send_rpy(self, msgno, message): + self._send('RPY', msgno, None, message) + + def send_err(self, msgno, message): + self._send('ERR', msgno, None, message) + + def send_ans(self, msgno, message): + if not msgno in self.ansnos.keys(): + ansno = cycle_through(0, 2147483647) + self.ansnos[msgno] = ansno + else: + ansno = self.ansnos[msgno] + self._send('ANS', msgno, ansno.next(), message) + + def send_nul(self, msgno): + self._send('NUL', msgno) + + +class Profile(object): + """Abstract base class for handlers of specific BEEP profiles.""" + # TODO: This is pretty thin... would a meta-class for declarative definition + # of profiles work here? + + def __init__(self): + self.session = None + self.channel = None + + def handle_connect(self): + pass + + def handle_message(self, cmd, message): + raise NotImplementedError + + +class ManagementProfile(Profile): + CONTENT_TYPE = 'application/beep+xml' + + def __init__(self): + Profile.__init__(self) + self.state = 'init' + + def handle_connect(self): + greeting = Element('greeting')[ + [Element('profile', uri=k) for k in self.session.profiles.keys()] + ] + self.channel.send_rpy(0, MIMEMessage(greeting, self.CONTENT_TYPE)) + + def handle_message(self, cmd, msgno, message): + assert message.get_content_type() == self.CONTENT_TYPE + root = parse_xml(message.get_payload()) + if cmd == 'MSG': + if root.name == 'start': + print 'Start channel %s' % root.number + for profile in root['profile']: + if uri in self.session.profiles.keys(): + message = MIMEMessage(Element('profile', + uri=profile.uri), + self.CONTENT_TYPE) + self.channel.send_rpy(msgno, message) + # TODO: create the channel + return + # TODO: send error (unsupported profile) + elif root.name == 'close': + print 'Close channel %s' % root.number + message = MIMEMessage(Element('ok'), self.CONTENT_TYPE) + self.channel.send_rpy(msgno, message) + # TODO: close the channel, or if channelno is 0, terminate the + # session... actually, that's done implicitly when the + # peer disconnects after receiving the + elif cmd == 'RPY': + if root.name == 'greeting': + print 'Greeting...' + for profile in root['profile']: + print ' profile %s' % profile.uri + # TODO: invoke handler handle_greeting(profiles) or something + elif root.name == 'profile': + # This means that the channel has been established with this + # profile... basically a channel_start_ok message + print 'Profile %s' % root.uri + elif root.name == 'ok': + # A close request for a channel has been accepted, so we can + # close it now + print 'OK' + + def close(self, channel=0, code=200): + xml = Element('close', number=channel, code=code) + self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE)) + + def start(self, profiles): + xml = Element('start', number=self.session.channelno.next())[ + [Element('profile', uri=uri) for uri in profiles] + ] + self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE)) + + +def cycle_through(start, stop=None, step=1): + """Utility generator that cycles through a defined range of numbers.""" + if stop is None: + stop = start + start = 0 + cur = start + while True: + yield cur + cur += 1 + if cur > stop: + cur = start + + +class serial(object): + """Serial number (RFC 1982).""" + + def __init__(self, limit=4294967295L): + self.value = 0L + self.limit = limit + + def __ne__(self, num): + return self.value != num + + def __eq__(self, num): + return self.value == num + + def __iadd__(self, num): + self.value += num + if self.value > self.limit: + self.value -= self.limit + return self + + +class MIMEMessage(Message): + """Simplified construction of generic MIME messages for transmission as + payload with BEEP.""" + + def __init__(self, payload, content_type=None): + Message.__init__(self) + if content_type: + self.set_type(content_type) + self.set_payload(str(payload)) + del self['MIME-Version'] + + +if __name__ == '__main__': + listener = Listener('127.0.0.1', 8000) + try: + asyncore.loop() + except KeyboardInterrupt: + pass diff --git a/bitten/util/tests/__init__.py b/bitten/util/tests/__init__.py new file mode 100644 --- /dev/null +++ b/bitten/util/tests/__init__.py @@ -0,0 +1,34 @@ +# -*- coding: iso8859-1 -*- +# +# Copyright (C) 2005 Christopher Lenz +# +# Bitten is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# Trac is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +# +# Author: Christopher Lenz + +import doctest +import unittest + +from bitten.util import xmlio +from bitten.util.tests import beep + +def suite(): + suite = unittest.TestSuite() + suite.addTest(beep.suite()) + suite.addTest(doctest.DocTestSuite(xmlio)) + return suite + +if __name__ == '__main__': + unittest.main(defaultTest='suite') diff --git a/bitten/util/tests/beep.py b/bitten/util/tests/beep.py new file mode 100644 --- /dev/null +++ b/bitten/util/tests/beep.py @@ -0,0 +1,115 @@ +from email.Message import Message +import unittest + +from bitten.util import beep + + +class MockSession(object): + + def __init__(self): + self.sent_messages = [] + + def send_data(self, cmd, channel, msgno, more, seqno, ansno=None, + payload=''): + self.sent_messages.append((cmd, channel, msgno, more, seqno, ansno, + payload.strip())) + + +class MockProfile(object): + + def __init__(self): + self.handled_messages = [] + + def handle_connect(self): + pass + + def handle_message(self, cmd, msgno, message): + self.handled_messages.append((cmd, msgno, message.as_string().strip())) + + +class ChannelTestCase(unittest.TestCase): + + def setUp(self): + self.session = MockSession() + self.profile = MockProfile() + + def test_handle_single_msg_frame(self): + """ + Verify that the channel correctly passes a single frame MSG to the + profile. + """ + channel = beep.Channel(self.session, 0, self.profile) + channel.handle_frame('MSG', 0, False, 0, None, 'foo bar') + self.assertEqual(('MSG', 0, 'foo bar'), + self.profile.handled_messages[0]) + + def test_handle_segmented_msg_frames(self): + """ + Verify that the channel combines two segmented messages and passed the + recombined message to the profile. + """ + channel = beep.Channel(self.session, 0, self.profile) + channel.handle_frame('MSG', 0, True, 0, None, 'foo ') + channel.handle_frame('MSG', 0, False, 4, None, 'bar') + self.assertEqual(('MSG', 0, 'foo bar'), + self.profile.handled_messages[0]) + + def test_send_single_frame_message(self): + """ + Verify that the channel passes a sent message up to the session for + transmission with the correct parameters. Also assert that the + corresponding message number (0) is reserved. + """ + channel = beep.Channel(self.session, 0, self.profile) + channel.send_msg(beep.MIMEMessage('foo bar')) + self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'), + self.session.sent_messages[0]) + assert 0 in channel.msgnos.keys() + + def test_send_message_msgno_inc(self): + """ + Verify that the message number is incremented for subsequent outgoing + messages. + """ + channel = beep.Channel(self.session, 0, self.profile) + channel.send_msg(beep.MIMEMessage('foo bar')) + self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'), + self.session.sent_messages[0]) + assert 0 in channel.msgnos.keys() + channel.send_msg(beep.MIMEMessage('foo baz')) + self.assertEqual(('MSG', 0, 1, False, 8L, None, 'foo baz'), + self.session.sent_messages[1]) + assert 1 in channel.msgnos.keys() + + def test_message_and_reply(self): + """ + Verify that a message number is deallocated after a final reply has been + received. + """ + channel = beep.Channel(self.session, 0, self.profile) + channel.send_msg(beep.MIMEMessage('foo bar')) + self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'), + self.session.sent_messages[0]) + assert 0 in channel.msgnos.keys() + channel.handle_frame('RPY', 0, False, 0, None, '42') + self.assertEqual(('RPY', 0, '42'), self.profile.handled_messages[0]) + assert 0 not in channel.msgnos.keys() + + def test_send_answer(self): + """ + Verify that sending an ANS message is processed correctly. + """ + channel = beep.Channel(self.session, 0, self.profile) + channel.send_ans(0, beep.MIMEMessage('foo bar')) + self.assertEqual(('ANS', 0, 0, False, 0L, 0, 'foo bar'), + self.session.sent_messages[0]) + channel.send_ans(0, beep.MIMEMessage('foo baz')) + self.assertEqual(('ANS', 0, 0, False, 8L, 1, 'foo baz'), + self.session.sent_messages[1]) + + +def suite(): + return unittest.makeSuite(ChannelTestCase, 'test') + +if __name__ == '__main__': + unittest.main() diff --git a/bitten/util/xmlio.py b/bitten/util/xmlio.py new file mode 100644 --- /dev/null +++ b/bitten/util/xmlio.py @@ -0,0 +1,152 @@ +# -*- coding: iso8859-1 -*- +# +# Copyright (C) 2005 Christopher Lenz +# +# Bitten is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# Trac is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +# +# Author: Christopher Lenz + +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO + + +__all__ = ['Element', 'parse'] + + +class Element(object): + """Simple XML output generator based on the builder pattern. + + Construct XML elements by passing the tag name to the constructor: + + >>> print Element('foo') + + + Attributes can be specified using keyword arguments. The values of the + arguments will be converted to strings and any special XML characters + escaped: + + >>> print Element('foo', bar=42) + + >>> print Element('foo', bar='1 < 2') + + >>> print Element('foo', bar='"baz"') + + + The order in which attributes are rendered is undefined. + + Elements can be using item access notation: + + >>> print Element('foo')[Element('bar'), Element('baz')] + + + Text nodes can be nested in an element by using strings instead of elements + in item access. Any special characters in the strings are escaped + automatically: + + >>> print Element('foo')['Hello world'] + Hello world + >>> print Element('foo')['1 < 2'] + 1 < 2 + + This technique also allows mixed content: + + >>> print Element('foo')['Hello ', Element('b')['world']] + Hello world + """ + __slots__ = ['name', 'attrs', 'children'] + + def __init__(self, name, **attrs): + """Create an XML element using the specified tag name. + + All keyword arguments are handled as attributes of the element. + """ + self.name = name + self.attrs = attrs + self.children = [] + + def __getitem__(self, children): + """Add child nodes to an element.""" + if not isinstance(children, (list, tuple)): + children = [children] + self.children = children + return self + + def __str__(self): + """Return a string representation of the XML element.""" + buf = StringIO() + buf.write('<') + buf.write(self.name) + for name, value in self.attrs.items(): + buf.write(' ') + buf.write(name) + buf.write('="') + buf.write(self._escape_attr(value)) + buf.write('"') + if self.children: + buf.write('>') + for child in self.children: + if isinstance(child, Element): + buf.write(str(child)) + else: + buf.write(self._escape_text(child)) + buf.write('') + else: + buf.write('/>') + return buf.getvalue() + + def _escape_text(self, text): + return str(text).replace('&', '&').replace('<', '<') \ + .replace('>', '>') + + def _escape_attr(self, attr): + return self._escape_text(attr).replace('"', '"') + + +def parse_xml(text): + from xml.dom import minidom + if isinstance(text, (str, unicode)): + dom = minidom.parseString(text) + else: + dom = minidom.parse(text) + return ParsedElement(dom.documentElement) + + +class ParsedElement(object): + __slots__ = ['node'] + + def __init__(self, node): + self.node = node + + name = property(fget=lambda self: self.node.tagName) + + def __getattr__(self, name): + return self.node.getAttribute(name) + + def __getitem__(self, name): + for child in [c for c in self.node.childNodes if c.nodeType == 1]: + if name in ('*', child.tagName): + yield ParsedElement(child) + + def __iter__(self): + return self['*'] + + +if __name__ == '__main__': + import doctest + doctest.testmod() diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -24,7 +24,6 @@ from bitten.distutils.testrunner import unittest setup(name='bitten', version='1.0', - packages=['bitten', 'bitten.distutils', 'bitten.recipe'], + packages=['bitten', 'bitten.distutils', 'bitten.recipe', 'bitten.util'], author="Christopher Lenz", author_email="cmlenz@gmx.de", - url="http://projects.edgewall.com/bitten/", - cmdclass={'unittest': unittest}) + url="http://bitten.cmlenz.net/", cmdclass={'unittest': unittest})