changeset 27:a8e509f4d33b

Added a couple of unit tests for the BEEP management profile. See #2. More to come.
author cmlenz
date Sun, 19 Jun 2005 14:23:33 +0000
parents be89881ee0a1
children 1e562dd56ec0
files bitten/util/beep.py bitten/util/tests/beep.py
diffstat 2 files changed, 261 insertions(+), 26 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/util/beep.py
+++ b/bitten/util/beep.py
@@ -338,10 +338,7 @@
         self.seqno = [serial(), serial()] # incoming, outgoing sequence numbers
         self.mime_parser = Parser()
 
-        self.profile = profile_cls()
-        self.profile.session = self.session
-        self.profile.channel = self
-
+        self.profile = profile_cls(self)
         self.profile.handle_connect()
 
     def close(self):
@@ -379,11 +376,11 @@
             return
 
         # Complete message received, so handle it
-        if msgno in self.inqueue.keys():
+        if msgno in self.inqueue:
             # Recombine queued messages
             payload = ''.join(self.inqueue[msgno]) + payload
             del self.inqueue[msgno]
-        if cmd == 'RPY' and msgno in self.msgnos.keys():
+        if cmd == 'RPY' and msgno in self.msgnos:
             # Final reply using this message number, so dealloc
             del self.msgnos[msgno]
         message = None
@@ -393,7 +390,7 @@
         if cmd == 'MSG':
             self.profile.handle_msg(msgno, message)
         else:
-            if msgno in self.reply_handlers.keys():
+            if msgno in self.reply_handlers:
                 self.reply_handlers[msgno](cmd, msgno, message)
                 del self.reply_handlers[msgno]
             elif cmd == 'RPY':
@@ -429,7 +426,7 @@
     def send_msg(self, message, handle_reply=None):
         while True: # Find a unique message number
             msgno = self.msgno.next()
-            if msgno not in self.msgnos.keys():
+            if msgno not in self.msgnos:
                 break
         self.msgnos[msgno] = True # Flag the chosen message number as in use
         if handle_reply is not None:
@@ -444,7 +441,7 @@
         self._send('ERR', msgno, None, message)
 
     def send_ans(self, msgno, message):
-        if not msgno in self.ansnos.keys():
+        if not msgno in self.ansnos:
             ansno = cycle_through(0, 2147483647)
             self.ansnos[msgno] = ansno
         else:
@@ -465,10 +462,10 @@
     and may override any of the others.
     """
 
-    def __init__(self):
+    def __init__(self, channel):
         """Create the profile."""
-        self.session = None
-        self.channel = None
+        self.session = channel.session
+        self.channel = channel
 
     def handle_connect(self):
         """Called when the channel this profile is associated with is
@@ -511,7 +508,7 @@
 
         if elem.tagname == 'start':
             for profile in elem['profile']:
-                if profile.uri in self.session.profiles.keys():
+                if profile.uri in self.session.profiles:
                     logging.debug('Start channel %s for profile <%s>',
                                   elem.number, profile.uri)
                     channel = Channel(self.session, int(elem.number),
--- a/bitten/util/tests/beep.py
+++ b/bitten/util/tests/beep.py
@@ -1,13 +1,19 @@
-from email.Message import Message
+import logging
 import unittest
 
 from bitten.util import beep
+from bitten.util.xmlio import Element
 
 
-class MockSession(object):
+class MockSession(beep.Initiator):
 
     def __init__(self):
+        self.profiles = {}
         self.sent_messages = []
+        self.channelno = beep.cycle_through(1, 2147483647, step=2)
+        self.channels = {0: beep.Channel(self, 0, beep.ManagementProfileHandler)}
+        del self.sent_messages[0] # Clear out the management greeting
+        self.channels[0].seqno = [beep.serial(), beep.serial()]
 
     def send_data_frame(self, cmd, channel, msgno, more, seqno, ansno=None,
                         payload=''):
@@ -16,13 +22,17 @@
 
 
 class MockProfileHandler(object):
+    URI = 'http://example.com/mock'
 
-    def __init__(self):
+    def __init__(self, channel):
         self.handled_messages = []
 
     def handle_connect(self):
         pass
 
+    def handle_disconnect(self):
+        pass
+
     def handle_msg(self, msgno, message):
         text = message.as_string().strip()
         self.handled_messages.append(('MSG', msgno, text))
@@ -78,7 +88,7 @@
         msgno = channel.send_msg(beep.MIMEMessage('foo bar'))
         self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'),
                          self.session.sent_messages[0])
-        assert msgno in channel.msgnos.keys()
+        assert msgno in channel.msgnos
 
     def test_send_frames_seqno_incrementing(self):
         """
@@ -103,12 +113,12 @@
         assert msgno == 0
         self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'),
                          self.session.sent_messages[0])
-        assert msgno in channel.msgnos.keys()
+        assert msgno in channel.msgnos
         msgno = channel.send_msg(beep.MIMEMessage('foo baz'))
         assert msgno == 1
         self.assertEqual(('MSG', 0, msgno, False, 8L, None, 'foo baz'),
                          self.session.sent_messages[1])
-        assert msgno in channel.msgnos.keys()
+        assert msgno in channel.msgnos
 
     def test_send_reply(self):
         """
@@ -128,11 +138,11 @@
         msgno = channel.send_msg(beep.MIMEMessage('foo bar'))
         self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'),
                          self.session.sent_messages[0])
-        assert msgno in channel.msgnos.keys()
+        assert msgno in channel.msgnos
         channel.handle_data_frame('RPY', msgno, False, 0, None, '42')
         self.assertEqual(('RPY', msgno, '42'),
                          channel.profile.handled_messages[0])
-        assert msgno not in channel.msgnos.keys()
+        assert msgno not in channel.msgnos
 
     def test_send_error(self):
         """
@@ -152,20 +162,248 @@
         assert ansno == 0
         self.assertEqual(('ANS', 0, 0, False, 0L, ansno, 'foo bar'),
                          self.session.sent_messages[0])
-        assert 0 in channel.ansnos.keys()
+        assert 0 in channel.ansnos
         ansno = channel.send_ans(0, beep.MIMEMessage('foo baz'))
         assert ansno == 1
         self.assertEqual(('ANS', 0, 0, False, 8L, ansno, 'foo baz'),
                          self.session.sent_messages[1])
-        assert 0 in channel.ansnos.keys()
+        assert 0 in channel.ansnos
         channel.send_nul(0)
         self.assertEqual(('NUL', 0, 0, False, 16L, None, ''),
                          self.session.sent_messages[2])
-        assert 0 not in channel.ansnos.keys()
+        assert 0 not in channel.ansnos
+
+
+class ManagementProfileHandlerTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.session = MockSession()
+        self.channel = self.session.channels[0]
+        self.profile = self.channel.profile
+
+    def test_send_greeting(self):
+        """
+        Verify that the management profile sends a greeting reply when
+        initialized.
+        """
+        self.profile.handle_connect()
+        self.assertEqual(1, len(self.session.sent_messages))
+        xml = Element('greeting')
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.assertEqual(('RPY', 0, 0, False, 0, None, message),
+                         self.session.sent_messages[0])
+
+    def test_send_greeting_with_profile(self):
+        """
+        Verify that the management profile sends a greeting with a list of
+        supported profiles reply when initialized.
+        """
+        self.session.profiles['test'] = MockProfileHandler
+        self.profile.handle_connect()
+        self.assertEqual(1, len(self.session.sent_messages))
+        xml = Element('greeting')[Element('profile', uri='test')]
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.assertEqual(('RPY', 0, 0, False, 0, None, message),
+                         self.session.sent_messages[0])
+
+    def test_handle_greeting(self):
+        """
+        Verify that the management profile calls the greeting_received() method
+        of the initiator session.
+        """
+        def greeting_received(profiles):
+            greeting_received.called = True
+            self.assertEqual(['test'], profiles)
+        greeting_received.called = False
+        self.session.greeting_received = greeting_received
+        xml = Element('greeting')[Element('profile', uri='test')]
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.channel.handle_data_frame('RPY', 0, False, 0L, None, message)
+        assert greeting_received.called
+
+    def test_send_error(self):
+        """
+        Verify that a negative reply is sent as expected.
+        """
+        self.profile.send_error(0, 521, 'ouch')
+        xml = Element('error', code=521)['ouch']
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.assertEqual(('ERR', 0, 0, False, 0, None, message),
+                         self.session.sent_messages[0])
+
+    def test_send_start(self):
+        """
+        Verify that a <start> request is sent correctly.
+        """
+        self.profile.send_start([MockProfileHandler])
+        xml = Element('start', number="1")[
+            Element('profile', uri=MockProfileHandler.URI)
+        ]
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.assertEqual(('MSG', 0, 0, False, 0, None, message),
+                         self.session.sent_messages[0])
+
+    def test_send_start_ok(self):
+        """
+        Verify that a positive reply to a <start> request is handled correctly,
+        and the channel is created.
+        """
+        self.profile.send_start([MockProfileHandler])
+        xml = Element('profile', uri=MockProfileHandler.URI)
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.channel.handle_data_frame('RPY', 0, False, 0L, None, message)
+        assert isinstance(self.session.channels[1].profile, MockProfileHandler)
+
+    def test_send_start_error(self):
+        """
+        Verify that a negative reply to a <close> request is handled correctly,
+        and no channel gets created.
+        """
+        self.profile.send_start([MockProfileHandler])
+        xml = Element('error', code=500)['ouch']
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.channel.handle_data_frame('ERR', 0, False, 0L, None, message)
+        assert not 1 in self.session.channels
+
+    def test_send_start_ok_with_callback(self):
+        """
+        Verify that user-supplied callback for positive replies is invoked
+        when a <profile> reply is received in response to a <start> request.
+        """
+        def handle_ok(channelno, profile_uri):
+            self.assertEqual(1, channelno)
+            self.assertEqual(MockProfileHandler.URI, profile_uri)
+            handle_ok.called = True
+        handle_ok.called = False
+        def handle_error(code, text):
+            handle_error.called = True
+        handle_error.called = False
+        self.profile.send_start([MockProfileHandler], handle_ok=handle_ok,
+                                handle_error=handle_error)
+
+        xml = Element('profile', uri=MockProfileHandler.URI)
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.channel.handle_data_frame('RPY', 0, False, 0L, None, message)
+        assert isinstance(self.session.channels[1].profile, MockProfileHandler)
+        assert handle_ok.called
+        assert not handle_error.called
+
+    def test_send_start_error_with_callback(self):
+        """
+        Verify that user-supplied callback for negative replies is invoked
+        when an error is received in response to a <start> request.
+        """
+        def handle_ok(channelno, profile_uri):
+            handle_ok.called = True
+        handle_ok.called = False
+        def handle_error(code, text):
+            self.assertEqual(500, code)
+            self.assertEqual('ouch', text)
+            handle_error.called = True
+        handle_error.called = False
+        self.profile.send_start([MockProfileHandler], handle_ok=handle_ok,
+                                handle_error=handle_error)
+
+        xml = Element('error', code=500)['ouch']
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.channel.handle_data_frame('ERR', 0, False, 0L, None, message)
+        assert not 1 in self.session.channels
+        assert not handle_ok.called
+        assert handle_error.called
+
+    def test_send_close(self):
+        """
+        Verify that a <close> request is sent correctly.
+        """
+        self.profile.send_close(1, code=200)
+        xml = Element('close', number=1, code=200)
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.assertEqual(('MSG', 0, 0, False, 0, None, message),
+                         self.session.sent_messages[0])
+
+    def test_send_close_ok(self):
+        """
+        Verify that a positive reply to a <close> request is handled correctly,
+        and the channel is closed.
+        """
+        self.session.channels[1] = beep.Channel(self.session, 1,
+                                                MockProfileHandler)
+        self.profile.send_close(1, code=200)
+
+        xml = Element('ok')
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.channel.handle_data_frame('RPY', 0, False, 0L, None, message)
+        assert 1 not in self.session.channels
+
+    def test_send_close_error(self):
+        """
+        Verify that a negative reply to a <close> request is handled correctly,
+        and the channel stays open.
+        """
+        self.session.channels[1] = beep.Channel(self.session, 1,
+                                                MockProfileHandler)
+        self.profile.send_close(1, code=200)
+
+        xml = Element('error', code=500)['ouch']
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.channel.handle_data_frame('ERR', 0, False, 0L, None, message)
+        assert 1 in self.session.channels
+
+    def test_send_close_ok_with_callback(self):
+        """
+        Verify that user-supplied callback for positive replies is invoked
+        when an <ok> reply is received in response to a <close> request.
+        """
+        self.session.channels[1] = beep.Channel(self.session, 1,
+                                                MockProfileHandler)
+        def handle_ok():
+            handle_ok.called = True
+        handle_ok.called = False
+        def handle_error(code, text):
+            handle_error.called = True
+        handle_error.called = False
+        self.profile.send_close(1, code=200, handle_ok=handle_ok,
+                                handle_error=handle_error)
+
+        xml = Element('profile', uri=MockProfileHandler.URI)
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.channel.handle_data_frame('RPY', 0, False, 0L, None, message)
+        assert 1 not in self.session.channels
+        assert handle_ok.called
+        assert not handle_error.called
+
+    def test_send_close_error_with_callback(self):
+        """
+        Verify that user-supplied callback for negative replies is invoked
+        when an error is received in response to a <close> request.
+        """
+        self.session.channels[1] = beep.Channel(self.session, 1,
+                                                MockProfileHandler)
+        def handle_ok(channelno, profile_uri):
+            handle_ok.called = True
+        handle_ok.called = False
+        def handle_error(code, text):
+            self.assertEqual(500, code)
+            self.assertEqual('ouch', text)
+            handle_error.called = True
+        handle_error.called = False
+        self.profile.send_close(1, code=200, handle_ok=handle_ok,
+                                handle_error=handle_error)
+
+        xml = Element('error', code=500)['ouch']
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.channel.handle_data_frame('ERR', 0, False, 0L, None, message)
+        assert 1 in self.session.channels
+        assert not handle_ok.called
+        assert handle_error.called
 
 
 def suite():
-    return unittest.makeSuite(ChannelTestCase, 'test')
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(ChannelTestCase, 'test'))
+    suite.addTest(unittest.makeSuite(ManagementProfileHandlerTestCase, 'test'))
+    return suite
 
 if __name__ == '__main__':
-    unittest.main()
+    logging.getLogger().setLevel(logging.CRITICAL)
+    unittest.main(defaultTest='suite')
Copyright (C) 2012-2017 Edgewall Software