# HG changeset patch # User cmlenz # Date 1119195411 0 # Node ID 1e562dd56ec06a2b02eaaa160bb0cd171404b9cb # Parent a8e509f4d33bbd266afaee4d8a1a707064a93262 More management profile unit tests. Closes #2. diff --git a/bitten/util/beep.py b/bitten/util/beep.py --- a/bitten/util/beep.py +++ b/bitten/util/beep.py @@ -507,19 +507,23 @@ elem = parse_xml(message.get_payload()) if elem.tagname == 'start': + channelno = int(elem.number) + if channelno in self.session.channels: + self.send_error(msgno, 550, 'Channel already in use') + return for profile in elem['profile']: if profile.uri in self.session.profiles: logging.debug('Start channel %s for profile <%s>', elem.number, profile.uri) - channel = Channel(self.session, int(elem.number), + channel = Channel(self.session, channelno, self.session.profiles[profile.uri]) - self.session.channels[int(elem.number)] = channel + self.session.channels[channelno] = channel message = MIMEMessage(Element('profile', uri=profile.uri), BEEP_XML) self.channel.send_rpy(msgno, message) return self.send_error(msgno, 550, - 'All requested profiles are unsupported') + 'None of the requested profiles is supported') elif elem.tagname == 'close': channelno = int(elem.number) diff --git a/bitten/util/tests/beep.py b/bitten/util/tests/beep.py --- a/bitten/util/tests/beep.py +++ b/bitten/util/tests/beep.py @@ -8,6 +8,7 @@ class MockSession(beep.Initiator): def __init__(self): + self.closed = False self.profiles = {} self.sent_messages = [] self.channelno = beep.cycle_through(1, 2147483647, step=2) @@ -15,8 +16,12 @@ del self.sent_messages[0] # Clear out the management greeting self.channels[0].seqno = [beep.serial(), beep.serial()] + def close(self): + self.closed = True + def send_data_frame(self, cmd, channel, msgno, more, seqno, ansno=None, payload=''): + assert not self.closed self.sent_messages.append((cmd, channel, msgno, more, seqno, ansno, payload.strip())) @@ -198,10 +203,12 @@ Verify that the management profile sends a greeting with a list of supported profiles reply when initialized. """ - self.session.profiles['test'] = MockProfileHandler + self.session.profiles[MockProfileHandler.URI] = MockProfileHandler self.profile.handle_connect() self.assertEqual(1, len(self.session.sent_messages)) - xml = Element('greeting')[Element('profile', uri='test')] + xml = Element('greeting')[ + Element('profile', uri=MockProfileHandler.URI) + ] message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() self.assertEqual(('RPY', 0, 0, False, 0, None, message), self.session.sent_messages[0]) @@ -221,6 +228,110 @@ self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) assert greeting_received.called + def test_handle_start(self): + self.session.profiles[MockProfileHandler.URI] = MockProfileHandler + xml = Element('start', number=2)[ + Element('profile', uri=MockProfileHandler.URI), + Element('profile', uri='http://example.com/bogus') + ] + self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML)) + + assert 2 in self.session.channels + xml = Element('profile', uri=MockProfileHandler.URI) + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.assertEqual(('RPY', 0, 0, False, 0, None, message), + self.session.sent_messages[0]) + + def test_handle_start_unsupported_profile(self): + self.session.profiles[MockProfileHandler.URI] = MockProfileHandler + xml = Element('start', number=2)[ + Element('profile', uri='http://example.com/foo'), + Element('profile', uri='http://example.com/bar') + ] + self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML)) + + assert 2 not in self.session.channels + xml = Element('error', code=550)[ + 'None of the requested profiles is supported' + ] + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.assertEqual(('ERR', 0, 0, False, 0, None, message), + self.session.sent_messages[0]) + + def test_handle_start_channel_in_use(self): + self.session.channels[2] = beep.Channel(self.session, 2, + MockProfileHandler) + orig_profile = self.session.channels[2].profile + self.session.profiles[MockProfileHandler.URI] = MockProfileHandler + xml = Element('start', number=2)[ + Element('profile', uri=MockProfileHandler.URI) + ] + self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML)) + + assert self.session.channels[2].profile is orig_profile + xml = Element('error', code=550)['Channel already in use'] + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.assertEqual(('ERR', 0, 0, False, 0, None, message), + self.session.sent_messages[0]) + + def test_handle_close(self): + self.session.channels[1] = beep.Channel(self.session, 1, + MockProfileHandler) + xml = Element('close', number=1, code=200) + self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML)) + + assert 1 not in self.session.channels + xml = Element('ok') + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.assertEqual(('RPY', 0, 0, False, 0, None, message), + self.session.sent_messages[0]) + + def test_handle_close_session(self): + xml = Element('close', number=0, code=200) + self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML)) + + assert 1 not in self.session.channels + xml = Element('ok') + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.assertEqual(('RPY', 0, 0, False, 0, None, message), + self.session.sent_messages[0]) + assert self.session.closed + + def test_handle_close_channel_not_open(self): + xml = Element('close', number=1, code=200) + self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML)) + + xml = Element('error', code=550)['Channel not open'] + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.assertEqual(('ERR', 0, 0, False, 0, None, message), + self.session.sent_messages[0]) + + def test_handle_close_channel_busy(self): + self.session.channels[1] = beep.Channel(self.session, 1, + MockProfileHandler) + self.session.channels[1].send_msg(beep.MIMEMessage('test')) + assert self.session.channels[1].msgnos + + xml = Element('close', number=1, code=200) + self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML)) + + xml = Element('error', code=550)['Channel waiting for replies'] + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.assertEqual(('ERR', 0, 0, False, 0, None, message), + self.session.sent_messages[1]) + + def test_handle_close_session_busy(self): + self.session.channels[1] = beep.Channel(self.session, 1, + MockProfileHandler) + + xml = Element('close', number=0, code=200) + self.profile.handle_msg(0, beep.MIMEMessage(xml, beep.BEEP_XML)) + + xml = Element('error', code=550)['Other channels still open'] + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.assertEqual(('ERR', 0, 0, False, 0, None, message), + self.session.sent_messages[0]) + def test_send_error(self): """ Verify that a negative reply is sent as expected. @@ -263,7 +374,7 @@ xml = Element('error', code=500)['ouch'] message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) - assert not 1 in self.session.channels + assert 1 not in self.session.channels def test_send_start_ok_with_callback(self): """ @@ -307,7 +418,7 @@ xml = Element('error', code=500)['ouch'] message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) - assert not 1 in self.session.channels + assert 1 not in self.session.channels assert not handle_ok.called assert handle_error.called @@ -335,6 +446,19 @@ self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) assert 1 not in self.session.channels + def test_send_close_session_ok(self): + """ + Verify that a positive reply to a request is handled correctly, + and the channel is closed. + """ + self.profile.send_close(0, code=200) + + xml = Element('ok') + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) + assert 0 not in self.session.channels + assert self.session.closed + def test_send_close_error(self): """ Verify that a negative reply to a request is handled correctly, @@ -405,5 +529,4 @@ return suite if __name__ == '__main__': - logging.getLogger().setLevel(logging.CRITICAL) unittest.main(defaultTest='suite')