changeset 28:1e562dd56ec0

More management profile unit tests. Closes #2.
author cmlenz
date Sun, 19 Jun 2005 15:36:51 +0000
parents a8e509f4d33b
children 2adc3480e4aa
files bitten/util/beep.py bitten/util/tests/beep.py
diffstat 2 files changed, 135 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/util/beep.py
+++ b/bitten/util/beep.py
@@ -507,19 +507,23 @@
         elem = parse_xml(message.get_payload())
 
         if elem.tagname == 'start':
+            channelno = int(elem.number)
+            if channelno in self.session.channels:
+                self.send_error(msgno, 550, 'Channel already in use')
+                return
             for profile in elem['profile']:
                 if profile.uri in self.session.profiles:
                     logging.debug('Start channel %s for profile <%s>',
                                   elem.number, profile.uri)
-                    channel = Channel(self.session, int(elem.number),
+                    channel = Channel(self.session, channelno,
                                       self.session.profiles[profile.uri])
-                    self.session.channels[int(elem.number)] = channel
+                    self.session.channels[channelno] = channel
                     message = MIMEMessage(Element('profile', uri=profile.uri),
                                           BEEP_XML)
                     self.channel.send_rpy(msgno, message)
                     return
             self.send_error(msgno, 550,
-                            'All requested profiles are unsupported')
+                            'None of the requested profiles is supported')
 
         elif elem.tagname == 'close':
             channelno = int(elem.number)
--- a/bitten/util/tests/beep.py
+++ b/bitten/util/tests/beep.py
@@ -8,6 +8,7 @@
 class MockSession(beep.Initiator):
 
     def __init__(self):
+        self.closed = False
         self.profiles = {}
         self.sent_messages = []
         self.channelno = beep.cycle_through(1, 2147483647, step=2)
@@ -15,8 +16,12 @@
         del self.sent_messages[0] # Clear out the management greeting
         self.channels[0].seqno = [beep.serial(), beep.serial()]
 
+    def close(self):
+        self.closed = True
+
     def send_data_frame(self, cmd, channel, msgno, more, seqno, ansno=None,
                         payload=''):
+        assert not self.closed
         self.sent_messages.append((cmd, channel, msgno, more, seqno, ansno,
                                    payload.strip()))
 
@@ -198,10 +203,12 @@
         Verify that the management profile sends a greeting with a list of
         supported profiles reply when initialized.
         """
-        self.session.profiles['test'] = MockProfileHandler
+        self.session.profiles[MockProfileHandler.URI] = MockProfileHandler
         self.profile.handle_connect()
         self.assertEqual(1, len(self.session.sent_messages))
-        xml = Element('greeting')[Element('profile', uri='test')]
+        xml = Element('greeting')[
+            Element('profile', uri=MockProfileHandler.URI)
+        ]
         message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
         self.assertEqual(('RPY', 0, 0, False, 0, None, message),
                          self.session.sent_messages[0])
@@ -221,6 +228,110 @@
         self.channel.handle_data_frame('RPY', 0, False, 0L, None, message)
         assert greeting_received.called
 
+    def test_handle_start(self):
+        self.session.profiles[MockProfileHandler.URI] = MockProfileHandler
+        xml = Element('start', number=2)[
+            Element('profile', uri=MockProfileHandler.URI),
+            Element('profile', uri='http://example.com/bogus')
+        ]
+        self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML))
+
+        assert 2 in self.session.channels
+        xml = Element('profile', uri=MockProfileHandler.URI)
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.assertEqual(('RPY', 0, 0, False, 0, None, message),
+                         self.session.sent_messages[0])
+
+    def test_handle_start_unsupported_profile(self):
+        self.session.profiles[MockProfileHandler.URI] = MockProfileHandler
+        xml = Element('start', number=2)[
+            Element('profile', uri='http://example.com/foo'),
+            Element('profile', uri='http://example.com/bar')
+        ]
+        self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML))
+
+        assert 2 not in self.session.channels
+        xml = Element('error', code=550)[
+            'None of the requested profiles is supported'
+        ]
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.assertEqual(('ERR', 0, 0, False, 0, None, message),
+                         self.session.sent_messages[0])
+
+    def test_handle_start_channel_in_use(self):
+        self.session.channels[2] = beep.Channel(self.session, 2,
+                                                MockProfileHandler)
+        orig_profile = self.session.channels[2].profile
+        self.session.profiles[MockProfileHandler.URI] = MockProfileHandler
+        xml = Element('start', number=2)[
+            Element('profile', uri=MockProfileHandler.URI)
+        ]
+        self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML))
+
+        assert self.session.channels[2].profile is orig_profile
+        xml = Element('error', code=550)['Channel already in use']
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.assertEqual(('ERR', 0, 0, False, 0, None, message),
+                         self.session.sent_messages[0])
+
+    def test_handle_close(self):
+        self.session.channels[1] = beep.Channel(self.session, 1,
+                                                MockProfileHandler)
+        xml = Element('close', number=1, code=200)
+        self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML))
+
+        assert 1 not in self.session.channels
+        xml = Element('ok')
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.assertEqual(('RPY', 0, 0, False, 0, None, message),
+                         self.session.sent_messages[0])
+
+    def test_handle_close_session(self):
+        xml = Element('close', number=0, code=200)
+        self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML))
+
+        assert 1 not in self.session.channels
+        xml = Element('ok')
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.assertEqual(('RPY', 0, 0, False, 0, None, message),
+                         self.session.sent_messages[0])
+        assert self.session.closed
+
+    def test_handle_close_channel_not_open(self):
+        xml = Element('close', number=1, code=200)
+        self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML))
+
+        xml = Element('error', code=550)['Channel not open']
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.assertEqual(('ERR', 0, 0, False, 0, None, message),
+                         self.session.sent_messages[0])
+
+    def test_handle_close_channel_busy(self):
+        self.session.channels[1] = beep.Channel(self.session, 1,
+                                                MockProfileHandler)
+        self.session.channels[1].send_msg(beep.MIMEMessage('test'))
+        assert self.session.channels[1].msgnos
+
+        xml = Element('close', number=1, code=200)
+        self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML))
+
+        xml = Element('error', code=550)['Channel waiting for replies']
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.assertEqual(('ERR', 0, 0, False, 0, None, message),
+                         self.session.sent_messages[1])
+
+    def test_handle_close_session_busy(self):
+        self.session.channels[1] = beep.Channel(self.session, 1,
+                                                MockProfileHandler)
+
+        xml = Element('close', number=0, code=200)
+        self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML))
+
+        xml = Element('error', code=550)['Other channels still open']
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.assertEqual(('ERR', 0, 0, False, 0, None, message),
+                         self.session.sent_messages[0])
+
     def test_send_error(self):
         """
         Verify that a negative reply is sent as expected.
@@ -263,7 +374,7 @@
         xml = Element('error', code=500)['ouch']
         message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
         self.channel.handle_data_frame('ERR', 0, False, 0L, None, message)
-        assert not 1 in self.session.channels
+        assert 1 not in self.session.channels
 
     def test_send_start_ok_with_callback(self):
         """
@@ -307,7 +418,7 @@
         xml = Element('error', code=500)['ouch']
         message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
         self.channel.handle_data_frame('ERR', 0, False, 0L, None, message)
-        assert not 1 in self.session.channels
+        assert 1 not in self.session.channels
         assert not handle_ok.called
         assert handle_error.called
 
@@ -335,6 +446,19 @@
         self.channel.handle_data_frame('RPY', 0, False, 0L, None, message)
         assert 1 not in self.session.channels
 
+    def test_send_close_session_ok(self):
+        """
+        Verify that a positive reply to a <close> request is handled correctly,
+        and the channel is closed.
+        """
+        self.profile.send_close(0, code=200)
+
+        xml = Element('ok')
+        message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string()
+        self.channel.handle_data_frame('RPY', 0, False, 0L, None, message)
+        assert 0 not in self.session.channels
+        assert self.session.closed
+
     def test_send_close_error(self):
         """
         Verify that a negative reply to a <close> request is handled correctly,
@@ -405,5 +529,4 @@
     return suite
 
 if __name__ == '__main__':
-    logging.getLogger().setLevel(logging.CRITICAL)
     unittest.main(defaultTest='suite')
Copyright (C) 2012-2017 Edgewall Software