# HG changeset patch # User cmlenz # Date 1119191013 0 # Node ID a8e509f4d33bbd266afaee4d8a1a707064a93262 # Parent be89881ee0a103ca9cad2373f10c843cc9b33a48 Added a couple of unit tests for the BEEP management profile. See #2. More to come. diff --git a/bitten/util/beep.py b/bitten/util/beep.py --- a/bitten/util/beep.py +++ b/bitten/util/beep.py @@ -338,10 +338,7 @@ self.seqno = [serial(), serial()] # incoming, outgoing sequence numbers self.mime_parser = Parser() - self.profile = profile_cls() - self.profile.session = self.session - self.profile.channel = self - + self.profile = profile_cls(self) self.profile.handle_connect() def close(self): @@ -379,11 +376,11 @@ return # Complete message received, so handle it - if msgno in self.inqueue.keys(): + if msgno in self.inqueue: # Recombine queued messages payload = ''.join(self.inqueue[msgno]) + payload del self.inqueue[msgno] - if cmd == 'RPY' and msgno in self.msgnos.keys(): + if cmd == 'RPY' and msgno in self.msgnos: # Final reply using this message number, so dealloc del self.msgnos[msgno] message = None @@ -393,7 +390,7 @@ if cmd == 'MSG': self.profile.handle_msg(msgno, message) else: - if msgno in self.reply_handlers.keys(): + if msgno in self.reply_handlers: self.reply_handlers[msgno](cmd, msgno, message) del self.reply_handlers[msgno] elif cmd == 'RPY': @@ -429,7 +426,7 @@ def send_msg(self, message, handle_reply=None): while True: # Find a unique message number msgno = self.msgno.next() - if msgno not in self.msgnos.keys(): + if msgno not in self.msgnos: break self.msgnos[msgno] = True # Flag the chosen message number as in use if handle_reply is not None: @@ -444,7 +441,7 @@ self._send('ERR', msgno, None, message) def send_ans(self, msgno, message): - if not msgno in self.ansnos.keys(): + if not msgno in self.ansnos: ansno = cycle_through(0, 2147483647) self.ansnos[msgno] = ansno else: @@ -465,10 +462,10 @@ and may override any of the others. """ - def __init__(self): + def __init__(self, channel): """Create the profile.""" - self.session = None - self.channel = None + self.session = channel.session + self.channel = channel def handle_connect(self): """Called when the channel this profile is associated with is @@ -511,7 +508,7 @@ if elem.tagname == 'start': for profile in elem['profile']: - if profile.uri in self.session.profiles.keys(): + if profile.uri in self.session.profiles: logging.debug('Start channel %s for profile <%s>', elem.number, profile.uri) channel = Channel(self.session, int(elem.number), diff --git a/bitten/util/tests/beep.py b/bitten/util/tests/beep.py --- a/bitten/util/tests/beep.py +++ b/bitten/util/tests/beep.py @@ -1,13 +1,19 @@ -from email.Message import Message +import logging import unittest from bitten.util import beep +from bitten.util.xmlio import Element -class MockSession(object): +class MockSession(beep.Initiator): def __init__(self): + self.profiles = {} self.sent_messages = [] + self.channelno = beep.cycle_through(1, 2147483647, step=2) + self.channels = {0: beep.Channel(self, 0, beep.ManagementProfileHandler)} + del self.sent_messages[0] # Clear out the management greeting + self.channels[0].seqno = [beep.serial(), beep.serial()] def send_data_frame(self, cmd, channel, msgno, more, seqno, ansno=None, payload=''): @@ -16,13 +22,17 @@ class MockProfileHandler(object): + URI = 'http://example.com/mock' - def __init__(self): + def __init__(self, channel): self.handled_messages = [] def handle_connect(self): pass + def handle_disconnect(self): + pass + def handle_msg(self, msgno, message): text = message.as_string().strip() self.handled_messages.append(('MSG', msgno, text)) @@ -78,7 +88,7 @@ msgno = channel.send_msg(beep.MIMEMessage('foo bar')) self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), self.session.sent_messages[0]) - assert msgno in channel.msgnos.keys() + assert msgno in channel.msgnos def test_send_frames_seqno_incrementing(self): """ @@ -103,12 +113,12 @@ assert msgno == 0 self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), self.session.sent_messages[0]) - assert msgno in channel.msgnos.keys() + assert msgno in channel.msgnos msgno = channel.send_msg(beep.MIMEMessage('foo baz')) assert msgno == 1 self.assertEqual(('MSG', 0, msgno, False, 8L, None, 'foo baz'), self.session.sent_messages[1]) - assert msgno in channel.msgnos.keys() + assert msgno in channel.msgnos def test_send_reply(self): """ @@ -128,11 +138,11 @@ msgno = channel.send_msg(beep.MIMEMessage('foo bar')) self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), self.session.sent_messages[0]) - assert msgno in channel.msgnos.keys() + assert msgno in channel.msgnos channel.handle_data_frame('RPY', msgno, False, 0, None, '42') self.assertEqual(('RPY', msgno, '42'), channel.profile.handled_messages[0]) - assert msgno not in channel.msgnos.keys() + assert msgno not in channel.msgnos def test_send_error(self): """ @@ -152,20 +162,248 @@ assert ansno == 0 self.assertEqual(('ANS', 0, 0, False, 0L, ansno, 'foo bar'), self.session.sent_messages[0]) - assert 0 in channel.ansnos.keys() + assert 0 in channel.ansnos ansno = channel.send_ans(0, beep.MIMEMessage('foo baz')) assert ansno == 1 self.assertEqual(('ANS', 0, 0, False, 8L, ansno, 'foo baz'), self.session.sent_messages[1]) - assert 0 in channel.ansnos.keys() + assert 0 in channel.ansnos channel.send_nul(0) self.assertEqual(('NUL', 0, 0, False, 16L, None, ''), self.session.sent_messages[2]) - assert 0 not in channel.ansnos.keys() + assert 0 not in channel.ansnos + + +class ManagementProfileHandlerTestCase(unittest.TestCase): + + def setUp(self): + self.session = MockSession() + self.channel = self.session.channels[0] + self.profile = self.channel.profile + + def test_send_greeting(self): + """ + Verify that the management profile sends a greeting reply when + initialized. + """ + self.profile.handle_connect() + self.assertEqual(1, len(self.session.sent_messages)) + xml = Element('greeting') + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.assertEqual(('RPY', 0, 0, False, 0, None, message), + self.session.sent_messages[0]) + + def test_send_greeting_with_profile(self): + """ + Verify that the management profile sends a greeting with a list of + supported profiles reply when initialized. + """ + self.session.profiles['test'] = MockProfileHandler + self.profile.handle_connect() + self.assertEqual(1, len(self.session.sent_messages)) + xml = Element('greeting')[Element('profile', uri='test')] + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.assertEqual(('RPY', 0, 0, False, 0, None, message), + self.session.sent_messages[0]) + + def test_handle_greeting(self): + """ + Verify that the management profile calls the greeting_received() method + of the initiator session. + """ + def greeting_received(profiles): + greeting_received.called = True + self.assertEqual(['test'], profiles) + greeting_received.called = False + self.session.greeting_received = greeting_received + xml = Element('greeting')[Element('profile', uri='test')] + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) + assert greeting_received.called + + def test_send_error(self): + """ + Verify that a negative reply is sent as expected. + """ + self.profile.send_error(0, 521, 'ouch') + xml = Element('error', code=521)['ouch'] + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.assertEqual(('ERR', 0, 0, False, 0, None, message), + self.session.sent_messages[0]) + + def test_send_start(self): + """ + Verify that a request is sent correctly. + """ + self.profile.send_start([MockProfileHandler]) + xml = Element('start', number="1")[ + Element('profile', uri=MockProfileHandler.URI) + ] + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.assertEqual(('MSG', 0, 0, False, 0, None, message), + self.session.sent_messages[0]) + + def test_send_start_ok(self): + """ + Verify that a positive reply to a request is handled correctly, + and the channel is created. + """ + self.profile.send_start([MockProfileHandler]) + xml = Element('profile', uri=MockProfileHandler.URI) + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) + assert isinstance(self.session.channels[1].profile, MockProfileHandler) + + def test_send_start_error(self): + """ + Verify that a negative reply to a request is handled correctly, + and no channel gets created. + """ + self.profile.send_start([MockProfileHandler]) + xml = Element('error', code=500)['ouch'] + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) + assert not 1 in self.session.channels + + def test_send_start_ok_with_callback(self): + """ + Verify that user-supplied callback for positive replies is invoked + when a reply is received in response to a request. + """ + def handle_ok(channelno, profile_uri): + self.assertEqual(1, channelno) + self.assertEqual(MockProfileHandler.URI, profile_uri) + handle_ok.called = True + handle_ok.called = False + def handle_error(code, text): + handle_error.called = True + handle_error.called = False + self.profile.send_start([MockProfileHandler], handle_ok=handle_ok, + handle_error=handle_error) + + xml = Element('profile', uri=MockProfileHandler.URI) + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) + assert isinstance(self.session.channels[1].profile, MockProfileHandler) + assert handle_ok.called + assert not handle_error.called + + def test_send_start_error_with_callback(self): + """ + Verify that user-supplied callback for negative replies is invoked + when an error is received in response to a request. + """ + def handle_ok(channelno, profile_uri): + handle_ok.called = True + handle_ok.called = False + def handle_error(code, text): + self.assertEqual(500, code) + self.assertEqual('ouch', text) + handle_error.called = True + handle_error.called = False + self.profile.send_start([MockProfileHandler], handle_ok=handle_ok, + handle_error=handle_error) + + xml = Element('error', code=500)['ouch'] + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) + assert not 1 in self.session.channels + assert not handle_ok.called + assert handle_error.called + + def test_send_close(self): + """ + Verify that a request is sent correctly. + """ + self.profile.send_close(1, code=200) + xml = Element('close', number=1, code=200) + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.assertEqual(('MSG', 0, 0, False, 0, None, message), + self.session.sent_messages[0]) + + def test_send_close_ok(self): + """ + Verify that a positive reply to a request is handled correctly, + and the channel is closed. + """ + self.session.channels[1] = beep.Channel(self.session, 1, + MockProfileHandler) + self.profile.send_close(1, code=200) + + xml = Element('ok') + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) + assert 1 not in self.session.channels + + def test_send_close_error(self): + """ + Verify that a negative reply to a request is handled correctly, + and the channel stays open. + """ + self.session.channels[1] = beep.Channel(self.session, 1, + MockProfileHandler) + self.profile.send_close(1, code=200) + + xml = Element('error', code=500)['ouch'] + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) + assert 1 in self.session.channels + + def test_send_close_ok_with_callback(self): + """ + Verify that user-supplied callback for positive replies is invoked + when an reply is received in response to a request. + """ + self.session.channels[1] = beep.Channel(self.session, 1, + MockProfileHandler) + def handle_ok(): + handle_ok.called = True + handle_ok.called = False + def handle_error(code, text): + handle_error.called = True + handle_error.called = False + self.profile.send_close(1, code=200, handle_ok=handle_ok, + handle_error=handle_error) + + xml = Element('profile', uri=MockProfileHandler.URI) + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) + assert 1 not in self.session.channels + assert handle_ok.called + assert not handle_error.called + + def test_send_close_error_with_callback(self): + """ + Verify that user-supplied callback for negative replies is invoked + when an error is received in response to a request. + """ + self.session.channels[1] = beep.Channel(self.session, 1, + MockProfileHandler) + def handle_ok(channelno, profile_uri): + handle_ok.called = True + handle_ok.called = False + def handle_error(code, text): + self.assertEqual(500, code) + self.assertEqual('ouch', text) + handle_error.called = True + handle_error.called = False + self.profile.send_close(1, code=200, handle_ok=handle_ok, + handle_error=handle_error) + + xml = Element('error', code=500)['ouch'] + message = beep.MIMEMessage(xml, beep.BEEP_XML).as_string() + self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) + assert 1 in self.session.channels + assert not handle_ok.called + assert handle_error.called def suite(): - return unittest.makeSuite(ChannelTestCase, 'test') + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(ChannelTestCase, 'test')) + suite.addTest(unittest.makeSuite(ManagementProfileHandlerTestCase, 'test')) + return suite if __name__ == '__main__': - unittest.main() + logging.getLogger().setLevel(logging.CRITICAL) + unittest.main(defaultTest='suite')