Mercurial > bitten > bitten-test
view bitten/util/tests/beep.py @ 163:634be6cbb808
Flip the switch: Bitten is now BSD-licensed.
author | cmlenz |
---|---|
date | Sat, 27 Aug 2005 07:58:12 +0000 |
parents | 8b613e24cc34 |
children | 3ab0004a6fd9 |
line wrap: on
line source
# -*- coding: iso8859-1 -*- # # Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de> # All rights reserved. # # This software is licensed as described in the file COPYING, which # you should have received as part of this distribution. The terms # are also available at http://bitten.cmlenz.net/wiki/License. import logging import unittest from bitten.util import beep, xmlio class MockSession(beep.Initiator): def __init__(self): self.closed = False self.profiles = {} self.sent_messages = [] self.channelno = beep.cycle_through(1, 2147483647, step=2) self.channels = {0: beep.Channel(self, 0, beep.ManagementProfileHandler)} del self.sent_messages[0] # Clear out the management greeting self.channels[0].seqno = [beep.SerialNumber(), beep.SerialNumber()] def close(self): self.closed = True def push_with_producer(self, producer): assert not self.closed while True: frame = producer.more() if not frame: break header, rest = frame.split('\r\n', 1) header = header.split(' ') cmd = header[0].upper() channel = int(header[1]) msgno = int(header[2]) more = header[3] == '*' seqno = int(header[4]) size = int(header[5]) ansno = None if cmd == 'ANS': ansno = int(header[6]) if size == 0: payload = '' else: payload = rest[:size] rest = rest[size:] self.sent_messages.append((cmd, channel, msgno, more, seqno, ansno, payload.strip())) assert rest == '\r\nEND\r\n' class MockProfileHandler(object): URI = 'http://example.com/mock' def __init__(self, channel): self.handled_messages = [] self.init_elem = None def handle_connect(self, init_elem=None): self.init_elem = init_elem def handle_disconnect(self): pass def handle_msg(self, msgno, message): text = message.read().strip() self.handled_messages.append(('MSG', msgno, text, None)) def handle_rpy(self, msgno, message): text = message.read().strip() self.handled_messages.append(('RPY', msgno, text, None)) def handle_err(self, msgno, message): text = message.read().strip() self.handled_messages.append(('ERR', msgno, text, None)) def handle_ans(self, msgno, ansno, message): text = message.read().strip() self.handled_messages.append(('ANS', msgno, text, ansno)) def handle_nul(self, msgno): self.handled_messages.append(('NUL', msgno, '', None)) class SessionTestCase(unittest.TestCase): """Unit tests for the `beep.Session` class.""" def setUp(self): self.session = beep.Session() def test_malformed_frame_invalid_keyword(self): self.session.collect_incoming_data('XYZ 0 0 . 0 0') self.session.found_terminator() self.assertRaises(beep.TerminateSession, self.session.found_terminator) def test_malformed_frame_invalid_parameter1(self): self.session.collect_incoming_data('MSG x y . z 0') self.session.found_terminator() self.assertRaises(beep.TerminateSession, self.session.found_terminator) def test_malformed_frame_invalid_parameter2(self): self.session.collect_incoming_data('MSG 0 0 + 0 0') self.session.found_terminator() self.assertRaises(beep.TerminateSession, self.session.found_terminator) def test_malformed_frame_missing_size_param(self): self.session.collect_incoming_data('MSG 0 0 . 0') self.session.found_terminator() self.assertRaises(beep.TerminateSession, self.session.found_terminator) def test_malformed_frame_missing_ansno_param(self): self.session.collect_incoming_data('ANS 0 0 . 0 0') self.session.found_terminator() self.assertRaises(beep.TerminateSession, self.session.found_terminator) class ChannelTestCase(unittest.TestCase): def setUp(self): self.session = MockSession() def test_handle_single_msg_frame(self): """ Verify that the channel correctly passes a single frame MSG to the profile. """ channel = beep.Channel(self.session, 0, MockProfileHandler) channel.handle_data_frame('MSG', 0, False, 0, None, 'foo bar') self.assertEqual(('MSG', 0, 'foo bar', None), channel.profile.handled_messages[0]) def test_handle_segmented_msg_frames(self): """ Verify that the channel combines two segmented messages and passed the recombined message to the profile. """ channel = beep.Channel(self.session, 0, MockProfileHandler) channel.handle_data_frame('MSG', 0, True, 0, None, 'foo ') channel.handle_data_frame('MSG', 0, False, 4, None, 'bar') self.assertEqual(('MSG', 0, 'foo bar', None), channel.profile.handled_messages[0]) def test_handle_out_of_sync_frame(self): """ Verify that the channel detects out-of-sync frames and bails. """ channel = beep.Channel(self.session, 0, MockProfileHandler) channel.handle_data_frame('MSG', 0, False, 0L, None, 'foo bar') # The next sequence number should be 8; send 12 instead self.assertRaises(beep.TerminateSession, channel.handle_data_frame, 'MSG', 0, False, 12L, None, 'foo baz') def test_send_single_frame_message(self): """ Verify that the channel passes a sent message up to the session for transmission with the correct parameters. Also assert that the corresponding message number (0) is reserved. """ channel = beep.Channel(self.session, 0, MockProfileHandler) msgno = channel.send_msg(beep.Payload('foo bar', None)) self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), self.session.sent_messages[0]) assert msgno in channel.msgnos def test_send_frames_seqno_incrementing(self): """ Verify that the sequence numbers of outgoing frames are incremented as expected. """ channel = beep.Channel(self.session, 0, MockProfileHandler) channel.send_msg(beep.Payload('foo bar', None)) channel.send_rpy(0, beep.Payload('nil', None)) self.assertEqual(('MSG', 0, 0, False, 0L, None, 'foo bar'), self.session.sent_messages[0]) self.assertEqual(('RPY', 0, 0, False, 8L, None, 'nil'), self.session.sent_messages[1]) def test_send_message_msgno_incrementing(self): """ Verify that the message number is incremented for subsequent outgoing messages. """ channel = beep.Channel(self.session, 0, MockProfileHandler) msgno = channel.send_msg(beep.Payload('foo bar', None)) assert msgno == 0 self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), self.session.sent_messages[0]) assert msgno in channel.msgnos msgno = channel.send_msg(beep.Payload('foo baz', None)) assert msgno == 1 self.assertEqual(('MSG', 0, msgno, False, 8L, None, 'foo baz'), self.session.sent_messages[1]) assert msgno in channel.msgnos def test_send_reply(self): """ Verify that sending an ANS message is processed correctly. """ channel = beep.Channel(self.session, 0, MockProfileHandler) channel.send_rpy(0, beep.Payload('foo bar', None)) self.assertEqual(('RPY', 0, 0, False, 0L, None, 'foo bar'), self.session.sent_messages[0]) def test_message_and_reply(self): """ Verify that a message number is deallocated after a final "RPY" reply has been received. """ channel = beep.Channel(self.session, 0, MockProfileHandler) msgno = channel.send_msg(beep.Payload('foo bar', None)) self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), self.session.sent_messages[0]) assert msgno in channel.msgnos channel.handle_data_frame('RPY', msgno, False, 0, None, '42') self.assertEqual(('RPY', msgno, '42', None), channel.profile.handled_messages[0]) assert msgno not in channel.msgnos def test_message_and_error(self): """ Verify that a message number is deallocated after a final "ERR" reply has been received. """ channel = beep.Channel(self.session, 0, MockProfileHandler) msgno = channel.send_msg(beep.Payload('foo bar', None)) self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), self.session.sent_messages[0]) assert msgno in channel.msgnos channel.handle_data_frame('ERR', msgno, False, 0, None, '42') self.assertEqual(('ERR', msgno, '42', None), channel.profile.handled_messages[0]) assert msgno not in channel.msgnos def test_message_and_ans_nul(self): """ Verify that a message number is deallocated after a final "NUL" reply has been received. """ channel = beep.Channel(self.session, 0, MockProfileHandler) msgno = channel.send_msg(beep.Payload('foo bar', None)) self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), self.session.sent_messages[0]) assert msgno in channel.msgnos channel.handle_data_frame('ANS', msgno, False, 0, 0, '42') self.assertEqual(('ANS', msgno, '42', 0), channel.profile.handled_messages[0]) assert msgno in channel.msgnos channel.handle_data_frame('NUL', msgno, False, 2, None, '42') self.assertEqual(('NUL', msgno, '', None), channel.profile.handled_messages[1]) assert msgno not in channel.msgnos def test_send_error(self): """ Verify that sending an ERR message is processed correctly. """ channel = beep.Channel(self.session, 0, MockProfileHandler) channel.send_err(0, beep.Payload('oops', None)) self.assertEqual(('ERR', 0, 0, False, 0L, None, 'oops'), self.session.sent_messages[0]) def test_send_answers(self): """ Verify that sending an ANS message is processed correctly. """ channel = beep.Channel(self.session, 0, MockProfileHandler) ansno = channel.send_ans(0, beep.Payload('foo bar', None)) assert ansno == 0 self.assertEqual(('ANS', 0, 0, False, 0L, ansno, 'foo bar'), self.session.sent_messages[0]) assert 0 in channel.ansnos ansno = channel.send_ans(0, beep.Payload('foo baz', None)) assert ansno == 1 self.assertEqual(('ANS', 0, 0, False, 8L, ansno, 'foo baz'), self.session.sent_messages[1]) assert 0 in channel.ansnos channel.send_nul(0) self.assertEqual(('NUL', 0, 0, False, 16L, None, ''), self.session.sent_messages[2]) assert 0 not in channel.ansnos class ManagementProfileHandlerTestCase(unittest.TestCase): def setUp(self): self.session = MockSession() self.channel = self.session.channels[0] self.profile = self.channel.profile def test_send_greeting(self): """ Verify that the management profile sends a greeting reply when initialized. """ self.profile.handle_connect() self.assertEqual(1, len(self.session.sent_messages)) xml = xmlio.Element('greeting') message = beep.Payload(xml).read() self.assertEqual(('RPY', 0, 0, False, 0, None, message), self.session.sent_messages[0]) def test_send_greeting_with_profile(self): """ Verify that the management profile sends a greeting with a list of supported profiles reply when initialized. """ self.session.profiles[MockProfileHandler.URI] = MockProfileHandler self.profile.handle_connect() self.assertEqual(1, len(self.session.sent_messages)) xml = xmlio.Element('greeting')[ xmlio.Element('profile', uri=MockProfileHandler.URI) ] message = beep.Payload(xml).read() self.assertEqual(('RPY', 0, 0, False, 0, None, message), self.session.sent_messages[0]) def test_handle_greeting(self): """ Verify that the management profile calls the greeting_received() method of the initiator session. """ def greeting_received(profiles): greeting_received.called = True self.assertEqual(['test'], profiles) greeting_received.called = False self.session.greeting_received = greeting_received xml = xmlio.Element('greeting')[xmlio.Element('profile', uri='test')] message = beep.Payload(xml).read() self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) assert greeting_received.called def test_handle_start(self): self.session.profiles[MockProfileHandler.URI] = MockProfileHandler xml = xmlio.Element('start', number=2)[ xmlio.Element('profile', uri=MockProfileHandler.URI), xmlio.Element('profile', uri='http://example.com/bogus') ] self.profile.handle_msg(0, beep.Payload(xml)) assert 2 in self.session.channels xml = xmlio.Element('profile', uri=MockProfileHandler.URI) message = beep.Payload(xml).read() self.assertEqual(('RPY', 0, 0, False, 0, None, message), self.session.sent_messages[0]) def test_handle_start_unsupported_profile(self): self.session.profiles[MockProfileHandler.URI] = MockProfileHandler xml = xmlio.Element('start', number=2)[ xmlio.Element('profile', uri='http://example.com/foo'), xmlio.Element('profile', uri='http://example.com/bar') ] self.assertRaises(beep.ProtocolError, self.profile.handle_msg, 0, beep.Payload(xml)) assert 2 not in self.session.channels def test_handle_start_channel_in_use(self): self.session.channels[2] = beep.Channel(self.session, 2, MockProfileHandler) orig_profile = self.session.channels[2].profile self.session.profiles[MockProfileHandler.URI] = MockProfileHandler xml = xmlio.Element('start', number=2)[ xmlio.Element('profile', uri=MockProfileHandler.URI) ] self.assertRaises(beep.ProtocolError, self.profile.handle_msg, 0, beep.Payload(xml)) assert self.session.channels[2].profile is orig_profile def test_handle_close(self): self.session.channels[1] = beep.Channel(self.session, 1, MockProfileHandler) xml = xmlio.Element('close', number=1, code=200) self.profile.handle_msg(0, beep.Payload(xml)) assert 1 not in self.session.channels xml = xmlio.Element('ok') message = beep.Payload(xml).read() self.assertEqual(('RPY', 0, 0, False, 0, None, message), self.session.sent_messages[0]) def test_handle_close_session(self): xml = xmlio.Element('close', number=0, code=200) self.profile.handle_msg(0, beep.Payload(xml)) assert 1 not in self.session.channels xml = xmlio.Element('ok') message = beep.Payload(xml).read() self.assertEqual(('RPY', 0, 0, False, 0, None, message), self.session.sent_messages[0]) assert self.session.closed def test_handle_close_channel_not_open(self): xml = xmlio.Element('close', number=1, code=200) self.assertRaises(beep.ProtocolError, self.profile.handle_msg, 0, beep.Payload(xml)) def test_handle_close_channel_busy(self): self.session.channels[1] = beep.Channel(self.session, 1, MockProfileHandler) self.session.channels[1].send_msg(beep.Payload('test')) assert self.session.channels[1].msgnos xml = xmlio.Element('close', number=1, code=200) self.assertRaises(beep.ProtocolError, self.profile.handle_msg, 0, beep.Payload(xml)) def test_handle_close_session_busy(self): self.session.channels[1] = beep.Channel(self.session, 1, MockProfileHandler) xml = xmlio.Element('close', number=0, code=200) self.assertRaises(beep.ProtocolError, self.profile.handle_msg, 0, beep.Payload(xml)) def test_send_start(self): """ Verify that a <start> request is sent correctly. """ self.profile.send_start([MockProfileHandler]) xml = xmlio.Element('start', number="1")[ xmlio.Element('profile', uri=MockProfileHandler.URI) ] message = beep.Payload(xml).read() self.assertEqual(('MSG', 0, 0, False, 0, None, message), self.session.sent_messages[0]) def test_send_start_ok(self): """ Verify that a positive reply to a <start> request is handled correctly, and the channel is created. """ self.profile.send_start([MockProfileHandler]) xml = xmlio.Element('profile', uri=MockProfileHandler.URI) message = beep.Payload(xml).read() self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) assert isinstance(self.session.channels[1].profile, MockProfileHandler) def test_send_start_error(self): """ Verify that a negative reply to a <close> request is handled correctly, and no channel gets created. """ self.profile.send_start([MockProfileHandler]) xml = xmlio.Element('error', code=500)['ouch'] message = beep.Payload(xml).read() self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) assert 1 not in self.session.channels def test_send_start_ok_with_callback(self): """ Verify that user-supplied callback for positive replies is invoked when a <profile> reply is received in response to a <start> request. """ def handle_ok(channelno, profile_uri): self.assertEqual(1, channelno) self.assertEqual(MockProfileHandler.URI, profile_uri) handle_ok.called = True handle_ok.called = False def handle_error(code, text): handle_error.called = True handle_error.called = False self.profile.send_start([MockProfileHandler], handle_ok=handle_ok, handle_error=handle_error) xml = xmlio.Element('profile', uri=MockProfileHandler.URI) message = beep.Payload(xml).read() self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) assert isinstance(self.session.channels[1].profile, MockProfileHandler) assert handle_ok.called assert not handle_error.called def test_send_start_error_with_callback(self): """ Verify that user-supplied callback for negative replies is invoked when an error is received in response to a <start> request. """ def handle_ok(channelno, profile_uri): handle_ok.called = True handle_ok.called = False def handle_error(code, text): self.assertEqual(500, code) self.assertEqual('ouch', text) handle_error.called = True handle_error.called = False self.profile.send_start([MockProfileHandler], handle_ok=handle_ok, handle_error=handle_error) xml = xmlio.Element('error', code=500)['ouch'] message = beep.Payload(xml).read() self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) assert 1 not in self.session.channels assert not handle_ok.called assert handle_error.called def test_send_close(self): """ Verify that a <close> request is sent correctly. """ self.profile.send_close(1, code=200) xml = xmlio.Element('close', number=1, code=200) message = beep.Payload(xml).read() self.assertEqual(('MSG', 0, 0, False, 0, None, message), self.session.sent_messages[0]) def test_send_close_ok(self): """ Verify that a positive reply to a <close> request is handled correctly, and the channel is closed. """ self.session.channels[1] = beep.Channel(self.session, 1, MockProfileHandler) self.profile.send_close(1, code=200) xml = xmlio.Element('ok') message = beep.Payload(xml).read() self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) assert 1 not in self.session.channels def test_send_close_session_ok(self): """ Verify that a positive reply to a <close> request is handled correctly, and the channel is closed. """ self.profile.send_close(0, code=200) xml = xmlio.Element('ok') message = beep.Payload(xml).read() self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) assert 0 not in self.session.channels assert self.session.closed def test_send_close_error(self): """ Verify that a negative reply to a <close> request is handled correctly, and the channel stays open. """ self.session.channels[1] = beep.Channel(self.session, 1, MockProfileHandler) self.profile.send_close(1, code=200) xml = xmlio.Element('error', code=500)['ouch'] message = beep.Payload(xml).read() self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) assert 1 in self.session.channels def test_send_close_ok_with_callback(self): """ Verify that user-supplied callback for positive replies is invoked when an <ok> reply is received in response to a <close> request. """ self.session.channels[1] = beep.Channel(self.session, 1, MockProfileHandler) def handle_ok(): handle_ok.called = True handle_ok.called = False def handle_error(code, text): handle_error.called = True handle_error.called = False self.profile.send_close(1, code=200, handle_ok=handle_ok, handle_error=handle_error) xml = xmlio.Element('profile', uri=MockProfileHandler.URI) message = beep.Payload(xml).read() self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) assert 1 not in self.session.channels assert handle_ok.called assert not handle_error.called def test_send_close_error_with_callback(self): """ Verify that user-supplied callback for negative replies is invoked when an error is received in response to a <close> request. """ self.session.channels[1] = beep.Channel(self.session, 1, MockProfileHandler) def handle_ok(channelno, profile_uri): handle_ok.called = True handle_ok.called = False def handle_error(code, text): self.assertEqual(500, code) self.assertEqual('ouch', text) handle_error.called = True handle_error.called = False self.profile.send_close(1, code=200, handle_ok=handle_ok, handle_error=handle_error) xml = xmlio.Element('error', code=500)['ouch'] message = beep.Payload(xml).read() self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) assert 1 in self.session.channels assert not handle_ok.called assert handle_error.called def suite(): suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(SessionTestCase, 'test')) suite.addTest(unittest.makeSuite(ChannelTestCase, 'test')) suite.addTest(unittest.makeSuite(ManagementProfileHandlerTestCase, 'test')) return suite if __name__ == '__main__': unittest.main(defaultTest='suite')