changeset 7:8442bcb47a03

Initial draft of a minimal [http://www.beepcore.org/ BEEP] protocol implementation for communication between the build master and build slaves.
author cmlenz
date Thu, 09 Jun 2005 22:19:53 +0000
parents 9b57159428b0
children 45d7bfe64d00
files bitten/tests/__init__.py bitten/util/__init__.py bitten/util/beep.py bitten/util/tests/__init__.py bitten/util/tests/beep.py bitten/util/xmlio.py setup.py
diffstat 7 files changed, 673 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/tests/__init__.py
+++ b/bitten/tests/__init__.py
@@ -21,8 +21,13 @@
 import unittest
 
 from bitten.recipe import tests as recipe
+from bitten.util import tests as util
 
 def suite():
     suite = unittest.TestSuite()
     suite.addTest(recipe.suite())
+    suite.addTest(util.suite())
     return suite
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='suite')
new file mode 100644
new file mode 100644
--- /dev/null
+++ b/bitten/util/beep.py
@@ -0,0 +1,365 @@
+# -*- coding: iso8859-1 -*-
+#
+# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
+#
+# Bitten is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# Trac is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+#
+# Author: Christopher Lenz <cmlenz@gmx.de>
+
+import asynchat
+import asyncore
+from email.Message import Message
+from email.Parser import Parser
+import mimetools
+import socket
+try:
+    from cStringIO import StringIO
+except ImportError:
+    from StringIO import StringIO
+import traceback
+
+from bitten.util.xmlio import Element, parse_xml
+
+__all__ = ['Listener', 'Initiator', 'Profile']
+
+class Listener(asyncore.dispatcher):
+
+    def __init__(self, ip, port):
+        asyncore.dispatcher.__init__(self)
+        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.set_reuse_addr()
+        self.bind((ip, port))
+
+        self.profiles = {}
+
+        self.listen(5)
+
+        host, port = self.socket.getsockname()
+        if not ip:
+            ip = socket.gethostbyname (socket.gethostname())
+        try:
+            self.server_name = socket.gethostbyaddr(ip)[0]
+        except socket.error:
+            self.server_name = ip
+        print 'Listening on %s:%s' % (self.server_name, port)
+
+    def writable(self):
+        return False
+
+    def handle_read(self):
+        pass
+
+    def readable(self):
+        return True
+
+    def handle_connect(self):
+        pass
+
+    def handle_accept(self):
+        conn, addr = self.accept()
+        print 'Connected to %s:%s' % addr
+        Session(conn, addr, self.profiles, first_channelno=2)
+
+
+class Session(asynchat.async_chat):
+
+    def __init__(self, conn, addr, profiles, first_channelno=1):
+        asynchat.async_chat.__init__(self, conn)
+        self.addr = addr
+        self.set_terminator('\r\n')
+
+        self.profiles = profiles
+        self.channels = {0: Channel(self, 0, ManagementProfile())}
+        self.channelno = cycle_through(first_channelno, 2147483647, step=2)
+        self.inbuf = []
+        self.header = self.payload = None
+
+    def handle_connect(self):
+        pass
+
+    def collect_incoming_data(self, data):
+        self.inbuf.append(data)
+
+    def found_terminator(self):
+        if self.header is None:
+            self.header = ''.join(self.inbuf).split(' ')
+            self.inbuf = []
+            if self.header[0] == 'SEQ':
+                # TCP mapping frame
+                raise NotImplementedError                
+            else:
+                try:
+                    size = int(self.header[int(self.header[0] != 'ANS') - 2])
+                except ValueError:
+                    self.header = None
+                    return
+                if size == 0:
+                    self.payload = ''
+                    self.set_terminator('END\r\n')
+                else:
+                    self.set_terminator(size)
+        elif self.payload is None:
+            self.payload = ''.join(self.inbuf)
+            self.inbuf = []
+            self.set_terminator('END\r\n')
+        else:
+            self._handle_frame(self.header, self.payload)
+            self.header = self.payload = None
+            self.inbuf = []
+            self.set_terminator('\r\n')
+
+    def _handle_frame(self, header, payload):
+        try:
+            # Parse frame header
+            cmd = header[0]
+            if cmd == 'SEQ':
+                # RFC 3081 - need to digest and implement
+                raise NotImplementedError
+            channel = int(header[1])
+            msgno = int(header[2])
+            more = header[3] == '*'
+            seqno = int(header[4])
+            size = int(header[5])
+            if cmd == 'ANS':
+                ansno = int(header[6])
+            else:
+                ansno = None
+            self.channels[channel].handle_frame(cmd, msgno, more, seqno,
+                                                ansno, payload)
+        except Exception, e:
+            traceback.print_exc()
+
+    def send_data(self, cmd, channel, msgno, more, seqno, ansno=None,
+                  payload=''):
+        headerbits = [cmd, channel, msgno, more and '*' or '.', seqno,
+                      len(payload)]
+        if cmd == 'ANS':
+            assert ansno is not None
+            headerbits.append(ansno)
+        header = ' '.join([str(hb) for hb in headerbits])
+        self.push('\r\n'.join((header, payload, 'END', '')))
+
+
+class Initiator(Session):
+
+    def __init__(self, ip, port, profiles=None):
+        Session.__init__(self, None, None, profiles or {})
+        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.connect((ip, port))
+
+
+class Channel(object):
+    """A specific channel of a BEEP session."""
+
+    def __init__(self, session, channelno, profile):
+        self.session = session
+        self.channelno = channelno
+        self.inqueue = {}
+        self.outqueue = {}
+
+        self.msgno = cycle_through(0, 2147483647)
+        self.msgnos = {} # message numbers currently in use
+        self.ansnos = {} # answer numbers keyed by msgno, each 0-2147483647
+        self.seqno = [serial(), serial()]
+        self.mime_parser = Parser()
+
+        self.profile = profile
+        self.profile.session = self.session
+        self.profile.channel = self
+        self.profile.handle_connect()
+
+    def handle_frame(self, cmd, msgno, more, seqno, ansno, payload):
+        if seqno != self.seqno[0]: # Validate and update sequence number
+            raise Exception, 'Out of sync with peer' # TODO: Be nice
+        self.seqno[0] += len(payload)
+        if more: # More of this message pending, so push it on the queue
+            self.inqueue.setdefault(msgno, []).append(payload)
+        else: # Complete message received, so handle it
+            if msgno in self.inqueue.keys(): # Recombine queued messages
+                payload = ''.join(self.inqueue[msgno]) + payload
+                del self.inqueue[msgno]
+            if cmd == 'RPY' and msgno in self.msgnos.keys():
+                # Final reply using this message number, so dealloc
+                del self.msgnos[msgno]
+            message = None
+            if payload:
+                message = self.mime_parser.parsestr(payload)
+            self.profile.handle_message(cmd, msgno, message)
+
+    def _send(self, cmd, msgno, ansno=None, message=None):
+        # TODO: Fragment and queue the message if necessary;
+        #       First need TCP mapping (RFC 3081) for that to make real sense
+        payload = ''
+        if message is not None:
+            payload = message.as_string()
+        self.session.send_data(cmd, self.channelno, msgno, False,
+                               self.seqno[1].value, payload=payload,
+                               ansno=ansno)
+        self.seqno[1] += len(payload)
+
+    def send_msg(self, message):
+        while True: # Find a unique message number
+            msgno = self.msgno.next()
+            if msgno not in self.msgnos.keys():
+                break
+        self.msgnos[msgno] = True # Flag the chosen message number as in use
+        self._send('MSG', msgno, None, message)
+
+    def send_rpy(self, msgno, message):
+        self._send('RPY', msgno, None, message)
+
+    def send_err(self, msgno, message):
+        self._send('ERR', msgno, None, message)
+
+    def send_ans(self, msgno, message):
+        if not msgno in self.ansnos.keys():
+            ansno = cycle_through(0, 2147483647)
+            self.ansnos[msgno] = ansno
+        else:
+            ansno = self.ansnos[msgno]
+        self._send('ANS', msgno, ansno.next(), message)
+
+    def send_nul(self, msgno):
+        self._send('NUL', msgno)
+
+
+class Profile(object):
+    """Abstract base class for handlers of specific BEEP profiles."""
+    # TODO: This is pretty thin... would a meta-class for declarative definition
+    #       of profiles work here? 
+
+    def __init__(self):
+        self.session = None
+        self.channel = None
+
+    def handle_connect(self):
+        pass
+
+    def handle_message(self, cmd, message):
+        raise NotImplementedError
+
+
+class ManagementProfile(Profile):
+    CONTENT_TYPE = 'application/beep+xml'
+
+    def __init__(self):
+        Profile.__init__(self)
+        self.state = 'init'
+
+    def handle_connect(self):
+        greeting = Element('greeting')[
+            [Element('profile', uri=k) for k in self.session.profiles.keys()]
+        ]
+        self.channel.send_rpy(0, MIMEMessage(greeting, self.CONTENT_TYPE))
+
+    def handle_message(self, cmd, msgno, message):
+        assert message.get_content_type() == self.CONTENT_TYPE
+        root = parse_xml(message.get_payload())
+        if cmd == 'MSG':
+            if root.name == 'start':
+                print 'Start channel %s' % root.number
+                for profile in root['profile']:
+                    if uri in self.session.profiles.keys():
+                        message = MIMEMessage(Element('profile',
+                                                      uri=profile.uri),
+                                              self.CONTENT_TYPE)
+                        self.channel.send_rpy(msgno, message)
+                        # TODO: create the channel
+                        return
+                # TODO: send error (unsupported profile)
+            elif root.name == 'close':
+                print 'Close channel %s' % root.number
+                message = MIMEMessage(Element('ok'), self.CONTENT_TYPE)
+                self.channel.send_rpy(msgno, message)
+                # TODO: close the channel, or if channelno is 0, terminate the
+                #       session... actually, that's done implicitly when the
+                #       peer disconnects after receiving the <ok/>
+        elif cmd == 'RPY':
+            if root.name == 'greeting':
+                print 'Greeting...'
+                for profile in root['profile']:
+                    print '  profile %s' % profile.uri
+                # TODO: invoke handler handle_greeting(profiles) or something
+            elif root.name == 'profile':
+                # This means that the channel has been established with this
+                # profile... basically a channel_start_ok message
+                print 'Profile %s' % root.uri
+            elif root.name == 'ok':
+                # A close request for a channel has been accepted, so we can
+                # close it now
+                print 'OK'
+
+    def close(self, channel=0, code=200):
+        xml = Element('close', number=channel, code=code)
+        self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE))
+
+    def start(self, profiles):
+        xml = Element('start', number=self.session.channelno.next())[
+            [Element('profile', uri=uri) for uri in profiles]
+        ]
+        self.channel.send_msg(MIMEMessage(xml, self.CONTENT_TYPE))
+
+
+def cycle_through(start, stop=None, step=1):
+    """Utility generator that cycles through a defined range of numbers."""
+    if stop is None:
+        stop = start
+        start = 0
+    cur = start
+    while True:
+        yield cur
+        cur += 1
+        if cur  > stop:
+            cur = start
+
+
+class serial(object):
+    """Serial number (RFC 1982)."""
+
+    def __init__(self, limit=4294967295L):
+        self.value = 0L
+        self.limit = limit
+
+    def __ne__(self, num):
+        return self.value != num
+
+    def __eq__(self, num):
+        return self.value == num
+
+    def __iadd__(self, num):
+        self.value += num
+        if self.value > self.limit:
+            self.value -= self.limit
+        return self
+
+
+class MIMEMessage(Message):
+    """Simplified construction of generic MIME messages for transmission as
+    payload with BEEP."""
+
+    def __init__(self, payload, content_type=None):
+        Message.__init__(self)
+        if content_type:
+            self.set_type(content_type)
+        self.set_payload(str(payload))
+        del self['MIME-Version']
+
+
+if __name__ == '__main__':
+    listener = Listener('127.0.0.1', 8000)
+    try:
+        asyncore.loop()
+    except KeyboardInterrupt:
+        pass
new file mode 100644
--- /dev/null
+++ b/bitten/util/tests/__init__.py
@@ -0,0 +1,34 @@
+# -*- coding: iso8859-1 -*-
+#
+# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
+#
+# Bitten is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# Trac is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+#
+# Author: Christopher Lenz <cmlenz@gmx.de>
+
+import doctest
+import unittest
+
+from bitten.util import xmlio
+from bitten.util.tests import beep
+
+def suite():
+    suite = unittest.TestSuite()
+    suite.addTest(beep.suite())
+    suite.addTest(doctest.DocTestSuite(xmlio))
+    return suite
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='suite')
new file mode 100644
--- /dev/null
+++ b/bitten/util/tests/beep.py
@@ -0,0 +1,115 @@
+from email.Message import Message
+import unittest
+
+from bitten.util import beep
+
+
+class MockSession(object):
+
+    def __init__(self):
+        self.sent_messages = []
+
+    def send_data(self, cmd, channel, msgno, more, seqno, ansno=None,
+                  payload=''):
+        self.sent_messages.append((cmd, channel, msgno, more, seqno, ansno,
+                                   payload.strip()))
+
+
+class MockProfile(object):
+
+    def __init__(self):
+        self.handled_messages = []
+
+    def handle_connect(self):
+        pass
+
+    def handle_message(self, cmd, msgno, message):
+        self.handled_messages.append((cmd, msgno, message.as_string().strip()))
+
+
+class ChannelTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.session = MockSession()
+        self.profile = MockProfile()
+
+    def test_handle_single_msg_frame(self):
+        """
+        Verify that the channel correctly passes a single frame MSG to the
+        profile.
+        """
+        channel = beep.Channel(self.session, 0, self.profile)
+        channel.handle_frame('MSG', 0, False, 0, None, 'foo bar')
+        self.assertEqual(('MSG', 0, 'foo bar'),
+                         self.profile.handled_messages[0])
+
+    def test_handle_segmented_msg_frames(self):
+        """
+        Verify that the channel combines two segmented messages and passed the
+        recombined message to the profile.
+        """
+        channel = beep.Channel(self.session, 0, self.profile)
+        channel.handle_frame('MSG', 0, True, 0, None, 'foo ')
+        channel.handle_frame('MSG', 0, False, 4, None, 'bar')
+        self.assertEqual(('MSG', 0, 'foo bar'),
+                         self.profile.handled_messages[0])
+
+    def test_send_single_frame_message(self):
+        """
+        Verify that the channel passes a sent message up to the session for
+        transmission with the correct parameters. Also assert that the
+        corresponding message number (0) is reserved.
+        """
+        channel = beep.Channel(self.session, 0, self.profile)
+        channel.send_msg(beep.MIMEMessage('foo bar'))
+        self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'),
+                         self.session.sent_messages[0])
+        assert 0 in channel.msgnos.keys()
+
+    def test_send_message_msgno_inc(self):
+        """
+        Verify that the message number is incremented for subsequent outgoing
+        messages.
+        """
+        channel = beep.Channel(self.session, 0, self.profile)
+        channel.send_msg(beep.MIMEMessage('foo bar'))
+        self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'),
+                         self.session.sent_messages[0])
+        assert 0 in channel.msgnos.keys()
+        channel.send_msg(beep.MIMEMessage('foo baz'))
+        self.assertEqual(('MSG', 0, 1, False, 8L, None, 'foo baz'),
+                         self.session.sent_messages[1])
+        assert 1 in channel.msgnos.keys()
+
+    def test_message_and_reply(self):
+        """
+        Verify that a message number is deallocated after a final reply has been
+        received.
+        """
+        channel = beep.Channel(self.session, 0, self.profile)
+        channel.send_msg(beep.MIMEMessage('foo bar'))
+        self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'),
+                         self.session.sent_messages[0])
+        assert 0 in channel.msgnos.keys()
+        channel.handle_frame('RPY', 0, False, 0, None, '42')
+        self.assertEqual(('RPY', 0, '42'), self.profile.handled_messages[0])
+        assert 0 not in channel.msgnos.keys()
+
+    def test_send_answer(self):
+        """
+        Verify that sending an ANS message is processed correctly.
+        """
+        channel = beep.Channel(self.session, 0, self.profile)
+        channel.send_ans(0, beep.MIMEMessage('foo bar'))
+        self.assertEqual(('ANS', 0, 0, False, 0L, 0, 'foo bar'),
+                         self.session.sent_messages[0])
+        channel.send_ans(0, beep.MIMEMessage('foo baz'))
+        self.assertEqual(('ANS', 0, 0, False, 8L, 1, 'foo baz'),
+                         self.session.sent_messages[1])
+
+
+def suite():
+    return unittest.makeSuite(ChannelTestCase, 'test')
+
+if __name__ == '__main__':
+	unittest.main()
new file mode 100644
--- /dev/null
+++ b/bitten/util/xmlio.py
@@ -0,0 +1,152 @@
+# -*- coding: iso8859-1 -*-
+#
+# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
+#
+# Bitten is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# Trac is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+#
+# Author: Christopher Lenz <cmlenz@gmx.de>
+
+try:
+    from cStringIO import StringIO
+except ImportError:
+    from StringIO import StringIO
+
+
+__all__ = ['Element', 'parse']
+
+
+class Element(object):
+    """Simple XML output generator based on the builder pattern.
+
+    Construct XML elements by passing the tag name to the constructor:
+
+    >>> print Element('foo')
+    <foo/>
+
+    Attributes can be specified using keyword arguments. The values of the
+    arguments will be converted to strings and any special XML characters
+    escaped:
+
+    >>> print Element('foo', bar=42)
+    <foo bar="42"/>
+    >>> print Element('foo', bar='1 < 2')
+    <foo bar="1 &lt; 2"/>
+    >>> print Element('foo', bar='"baz"')
+    <foo bar="&#34;baz&#34;"/>
+
+    The order in which attributes are rendered is undefined.
+
+    Elements can be using item access notation:
+
+    >>> print Element('foo')[Element('bar'), Element('baz')]
+    <foo><bar/><baz/></foo>
+
+    Text nodes can be nested in an element by using strings instead of elements
+    in item access. Any special characters in the strings are escaped
+    automatically:
+
+    >>> print Element('foo')['Hello world']
+    <foo>Hello world</foo>
+    >>> print Element('foo')['1 < 2']
+    <foo>1 &lt; 2</foo>
+
+    This technique also allows mixed content:
+
+    >>> print Element('foo')['Hello ', Element('b')['world']]
+    <foo>Hello <b>world</b></foo>
+    """
+    __slots__ = ['name', 'attrs', 'children']
+
+    def __init__(self, name, **attrs):
+        """Create an XML element using the specified tag name.
+        
+        All keyword arguments are handled as attributes of the element.
+        """
+        self.name = name
+        self.attrs = attrs
+        self.children = []
+
+    def __getitem__(self, children):
+        """Add child nodes to an element."""
+        if not isinstance(children, (list, tuple)):
+            children = [children]
+        self.children = children
+        return self
+
+    def __str__(self):
+        """Return a string representation of the XML element."""
+        buf = StringIO()
+        buf.write('<')
+        buf.write(self.name)
+        for name, value in self.attrs.items():
+            buf.write(' ')
+            buf.write(name)
+            buf.write('="')
+            buf.write(self._escape_attr(value))
+            buf.write('"')
+        if self.children:
+            buf.write('>')
+            for child in self.children:
+                if isinstance(child, Element):
+                    buf.write(str(child))
+                else:
+                    buf.write(self._escape_text(child))
+            buf.write('</')
+            buf.write(self.name)
+            buf.write('>')
+        else:
+            buf.write('/>')
+        return buf.getvalue()
+
+    def _escape_text(self, text):
+        return str(text).replace('&', '&amp;').replace('<', '&lt;') \
+                        .replace('>', '&gt;')
+
+    def _escape_attr(self, attr):
+        return self._escape_text(attr).replace('"', '&#34;')
+
+
+def parse_xml(text):
+    from xml.dom import minidom
+    if isinstance(text, (str, unicode)):
+        dom = minidom.parseString(text)
+    else:
+        dom = minidom.parse(text)
+    return ParsedElement(dom.documentElement)
+
+
+class ParsedElement(object):
+    __slots__ = ['node']
+
+    def __init__(self, node):
+        self.node = node
+
+    name = property(fget=lambda self: self.node.tagName)
+
+    def __getattr__(self, name):
+        return self.node.getAttribute(name)
+
+    def __getitem__(self, name):
+        for child in [c for c in self.node.childNodes if c.nodeType == 1]:
+            if name in ('*', child.tagName):
+                yield ParsedElement(child)
+
+    def __iter__(self):
+        return self['*']
+
+
+if __name__ == '__main__':
+    import doctest
+    doctest.testmod()
--- a/setup.py
+++ b/setup.py
@@ -24,7 +24,6 @@
 from bitten.distutils.testrunner import unittest
 
 setup(name='bitten', version='1.0',
-      packages=['bitten', 'bitten.distutils', 'bitten.recipe'],
+      packages=['bitten', 'bitten.distutils', 'bitten.recipe', 'bitten.util'],
       author="Christopher Lenz", author_email="cmlenz@gmx.de",
-      url="http://projects.edgewall.com/bitten/",
-      cmdclass={'unittest': unittest})
+      url="http://bitten.cmlenz.net/", cmdclass={'unittest': unittest})
Copyright (C) 2012-2017 Edgewall Software