# HG changeset patch # User cmlenz # Date 1122805724 0 # Node ID 8b613e24cc34ab30e6532859c97608b270181ab0 # Parent 88869d08ea2a5fbf0456d876153c4cb6a4451684 * Improved error reporting by using exceptions. * Added unit tests for detection of malformed BEEP frames. diff --git a/bitten/util/beep.py b/bitten/util/beep.py --- a/bitten/util/beep.py +++ b/bitten/util/beep.py @@ -56,7 +56,43 @@ class ProtocolError(Exception): - """Generic root class for BEEP exceptions.""" + """Generic root class for BEEP errors.""" + + _default_messages = { + 421: 'Service Not Available', + 450: 'Requested Action Not Taken', + 451: 'Requested Action Aborted', + 454: 'Temporary Authentication Failure', + 500: 'General Syntax Error', + 501: 'Syntax Error In Parameters', + 504: 'Parameter Not Implemented', + 530: 'Authentication Required', + 534: 'Authentication Mechanism Insufficient', + 535: 'Authentication Failure', + 537: 'Action Not Authorised For User', + 538: 'Authentication Mechanism Requires Encryption', + 550: 'Requested Action Not Taken', + 553: 'Parameter Invalid', + 554: 'Transaction Failed' + } + + def __init__(self, code, message=None): + if message is None: + message = ProtocolError._default_messages.get(code) + Exception.__init__(self, 'BEEP error %d (%s)' % (code, message)) + self.code = code + self.message = message + self.local = True + + def from_xml(cls, xml): + elem = xmlio.parse(xml) + obj = cls(int(elem.attr['code']), elem.gettext()) + obj.local = False + return obj + from_xml = classmethod(from_xml) + + def to_xml(self): + return xmlio.Element('error', code=self.code)[self.message] class TerminateSession(Exception): @@ -71,7 +107,7 @@ communication with the connected peer. """ def __init__(self, ip, port): - asyncore.dispatcher.__init__(self) + super(Listener, self).__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind((ip, port)) @@ -87,7 +123,6 @@ def handle_read(self): """Called by asyncore to signal data available for reading.""" - pass def readable(self): """Called by asyncore to determine whether the channel is readable.""" @@ -207,7 +242,6 @@ if cls is TerminateSession: raise cls, value log.exception(value) - self.close() def collect_incoming_data(self, data): """Called by async_chat when data is received. @@ -235,12 +269,10 @@ try: size = int(self.header[int(self.header[0] != 'ANS') - 2]) except ValueError: - # TODO: Malformed frame... should we terminate the session - # here? log.error('Malformed frame header: [%s]', - ' '.join(self.header)) + ' '.join(self.header), exc_info=True) self.header = None - return + raise TerminateSession, 'Malformed frame header' if size == 0: self.payload = '' self.set_terminator('END\r\n') @@ -276,18 +308,27 @@ window = int(header[3]) self.channels[channel].handle_seq_frame(ackno, window) else: + assert cmd in ('MSG', 'RPY', 'ERR', 'ANS', 'NUL') msgno = int(header[2]) + assert header[3] in ('*', '.') more = header[3] == '*' seqno = int(header[4]) ansno = None if cmd == 'ANS': ansno = int(header[6]) - self.channels[channel].handle_data_frame(cmd, msgno, more, - seqno, ansno, payload) - except (ValueError, TypeError, ProtocolError), e: - log.exception(e) - if channel == 0 and msgno is not None: - self.channels[0].profile.send_error(msgno, 550, e) + try: + self.channels[channel].handle_data_frame(cmd, msgno, more, + seqno, ansno, + payload) + except ProtocolError, e: + log.exception(e) + if e.local and channel == 0 and msgno is not None: + xml = xmlio.Element('error', code=550)[e] + self.channels[channel].send_err(msgno, Payload(xml)) + + except (AssertionError, IndexError, TypeError, ValueError), e: + log.error('Malformed frame', exc_info=True) + raise TerminateSession, 'Malformed frame header' def terminate(self, handle_ok=None, handle_error=None): """Terminate the session by closing all channels.""" @@ -305,7 +346,7 @@ if handle_error is not None: handle_error(channelno, code, message) else: - raise ProtocolError, '%s (%d)' % (message, code) + raise ProtocolError(code, message) self.channels[0].profile.send_close(channelno, handle_ok=_handle_ok, handle_error=_handle_error) close_next_channel() @@ -410,7 +451,7 @@ """ # Validate and update sequence number if seqno != self.seqno[0]: - raise ProtocolError, 'Out of sync with peer' # TODO: Be nice + raise TerminateSession, 'Out of sync with peer' self.seqno[0] += len(payload) if more: @@ -463,6 +504,7 @@ break self.msgnos.add(msgno) # Flag the chosen message number as in use if handle_reply is not None: + assert callable(handle_reply), 'Reply handler must be callable' self.reply_handlers[msgno] = handle_reply self.session.push_with_producer(FrameProducer(self, 'MSG', msgno, None, payload)) @@ -508,57 +550,6 @@ del self.ansnos[msgno] # dealloc answer numbers for the message -class FrameProducer(object): - """Internal class that emits the frames of a BEEP message, based on the - `asynchat` `push_with_producer()` protocol. - """ - - def __init__(self, channel, cmd, msgno, ansno=None, payload=None): - """Initialize the frame producer. - - @param channel the channel the message is to be sent on - @param cmd the BEEP command/keyword (MSG, RPY, ERR, ANS or NUL) - @param msgno the message number - @param ansno the answer number (only for ANS messages) - @param payload the message payload (an instance of `Payload`) - """ - self.session = channel.session - self.channel = channel - self.cmd = cmd - self.msgno = msgno - self.ansno = ansno - - self.payload = payload - self.done = False - - def more(self): - """Called by `async_chat` when the producer has been pushed on the - producer FIFO and the channel is about to write.""" - if self.done: - return '' - - if self.payload: - data = self.payload.read(self.channel.windowsize) - if len(data) < self.channel.windowsize: - self.done = True - else: - data = '' - self.done = True - - headerbits = [self.cmd, self.channel.channelno, self.msgno, - self.done and '.' or '*', self.channel.seqno[1].value, - len(data)] - if self.cmd == 'ANS': - assert self.ansno is not None - headerbits.append(self.ansno) - header = ' '.join([str(bit) for bit in headerbits]) - log.debug('Sending frame [%s]', header) - frame = '\r\n'.join((header, data, 'END', '')) - self.channel.seqno[1] += len(data) - - return frame - - class ProfileHandler(object): """Abstract base class for handlers of specific BEEP profiles. @@ -613,14 +604,13 @@ def handle_msg(self, msgno, message): """Handle an incoming message.""" - assert message.content_type == BEEP_XML + assert message and message.content_type == BEEP_XML elem = xmlio.parse(message.body) if elem.name == 'start': channelno = int(elem.attr['number']) if channelno in self.session.channels: - self.send_error(msgno, 550, 'Channel already in use') - return + raise ProtocolError(550, 'Channel already in use') for profile in elem.children('profile'): if profile.attr['uri'] in self.session.profiles: log.debug('Start channel %s for profile <%s>', @@ -631,21 +621,18 @@ xml = xmlio.Element('profile', uri=profile.attr['uri']) self.channel.send_rpy(msgno, Payload(xml)) return - self.send_error(msgno, 550, - 'None of the requested profiles is supported') + raise ProtocolError(550, + 'None of the requested profiles is supported') elif elem.name == 'close': channelno = int(elem.attr['number']) if not channelno in self.session.channels: - self.send_error(msgno, 550, 'Channel not open') - return + raise ProtocolError(550, 'Channel not open') if channelno == 0: if len(self.session.channels) > 1: - self.send_error(msgno, 550, 'Other channels still open') - return + raise ProtocolError(550, 'Other channels still open') if self.session.channels[channelno].msgnos: - self.send_error(msgno, 550, 'Channel waiting for replies') - return + raise ProtocolError(550, 'Channel waiting for replies') self.session.channels[channelno].close() self.channel.send_rpy(msgno, Payload(xmlio.Element('ok'))) if not self.session.channels: @@ -653,27 +640,20 @@ def handle_rpy(self, msgno, message): """Handle a positive reply.""" - assert message.content_type == BEEP_XML - elem = xmlio.parse(message.body) - - if elem.name == 'greeting': - if isinstance(self.session, Initiator): - profiles = [p.attr['uri'] for p in elem.children('profile')] - self.session.greeting_received(profiles) - - else: # and are handled by callbacks - self.send_error(msgno, 501, 'What are you replying to, son?') + if message.content_type == BEEP_XML: + elem = xmlio.parse(message.body) + if elem.name == 'greeting': + if isinstance(self.session, Initiator): + profiles = [p.attr['uri'] for p in elem.children('profile')] + self.session.greeting_received(profiles) def handle_err(self, msgno, message): """Handle a negative reply.""" # Probably an error on connect, because other errors should get handled # by the corresponding callbacks # TODO: Terminate the session, I guess - assert message.content_type == BEEP_XML - elem = xmlio.parse(message.body) - assert elem.name == 'error' - log.warning('Received error in response to message #%d: %s (%d)', - msgno, elem.gettext(), int(elem.attr['code'])) + if message.content_type == BEEP_XML: + raise ProtocolError.from_xml(message.body) def send_close(self, channelno=0, code=200, handle_ok=None, handle_error=None): @@ -687,24 +667,16 @@ if handle_ok is not None: handle_ok() elif cmd == 'ERR': - elem = xmlio.parse(message.body) - text = elem.gettext() - code = int(elem.attr['code']) + error = ProtocolError.from_xml(message.body) log.debug('Peer refused to start channel %d: %s (%d)', - channelno, text, code) + channelno, error.message, error.code) if handle_error is not None: - handle_error(code, text) + handle_error(error.code, error.message) log.debug('Requesting closure of channel %d', channelno) xml = xmlio.Element('close', number=channelno, code=code) return self.channel.send_msg(Payload(xml), handle_reply) - def send_error(self, msgno, code, message=''): - """Send an error reply to the peer.""" - log.warning('%s (%d)', message, code) - xml = xmlio.Element('error', code=code)[message] - self.channel.send_err(msgno, Payload(xml)) - def send_start(self, profiles, handle_ok=None, handle_error=None): """Send a request to start a new channel to the peer. @@ -767,9 +739,6 @@ 'Payload data %s must provide a `read` method' % data self.body = data - def as_string(self): - return self.read() - def read(self, size=None): if self._hdr_buf is None: hdrs = [] @@ -809,6 +778,56 @@ parse = classmethod(parse) +class FrameProducer(object): + """Internal class that emits the frames of a BEEP message, based on the + `asynchat` `push_with_producer()` protocol. + """ + def __init__(self, channel, cmd, msgno, ansno=None, payload=None): + """Initialize the frame producer. + + @param channel the channel the message is to be sent on + @param cmd the BEEP command/keyword (MSG, RPY, ERR, ANS or NUL) + @param msgno the message number + @param ansno the answer number (only for ANS messages) + @param payload the message payload (an instance of `Payload`) + """ + self.session = channel.session + self.channel = channel + self.cmd = cmd + self.msgno = msgno + self.ansno = ansno + + self.payload = payload + self.done = False + + def more(self): + """Called by `async_chat` when the producer has been pushed on the + producer FIFO and the channel is about to write.""" + if self.done: + return '' + + if self.payload: + data = self.payload.read(self.channel.windowsize) + if len(data) < self.channel.windowsize: + self.done = True + else: + data = '' + self.done = True + + headerbits = [self.cmd, self.channel.channelno, self.msgno, + self.done and '.' or '*', self.channel.seqno[1].value, + len(data)] + if self.cmd == 'ANS': + assert self.ansno is not None + headerbits.append(self.ansno) + header = ' '.join([str(bit) for bit in headerbits]) + log.debug('Sending frame [%s]', header) + frame = '\r\n'.join((header, data, 'END', '')) + self.channel.seqno[1] += len(data) + + return frame + + def cycle_through(start, stop=None, step=1): """Utility generator that cycles through a defined range of numbers.""" if stop is None: @@ -818,13 +837,12 @@ while True: yield cur cur += step - if cur > stop: + if cur > stop: cur = start class SerialNumber(object): """Serial number (RFC 1982).""" - def __init__(self, limit=4294967295L): self.value = 0L self.limit = limit diff --git a/bitten/util/tests/beep.py b/bitten/util/tests/beep.py --- a/bitten/util/tests/beep.py +++ b/bitten/util/tests/beep.py @@ -60,25 +60,57 @@ pass def handle_msg(self, msgno, message): - text = message.as_string().strip() + text = message.read().strip() self.handled_messages.append(('MSG', msgno, text, None)) def handle_rpy(self, msgno, message): - text = message.as_string().strip() + text = message.read().strip() self.handled_messages.append(('RPY', msgno, text, None)) def handle_err(self, msgno, message): - text = message.as_string().strip() + text = message.read().strip() self.handled_messages.append(('ERR', msgno, text, None)) def handle_ans(self, msgno, ansno, message): - text = message.as_string().strip() + text = message.read().strip() self.handled_messages.append(('ANS', msgno, text, ansno)) def handle_nul(self, msgno): self.handled_messages.append(('NUL', msgno, '', None)) +class SessionTestCase(unittest.TestCase): + """Unit tests for the `beep.Session` class.""" + + def setUp(self): + self.session = beep.Session() + + def test_malformed_frame_invalid_keyword(self): + self.session.collect_incoming_data('XYZ 0 0 . 0 0') + self.session.found_terminator() + self.assertRaises(beep.TerminateSession, self.session.found_terminator) + + def test_malformed_frame_invalid_parameter1(self): + self.session.collect_incoming_data('MSG x y . z 0') + self.session.found_terminator() + self.assertRaises(beep.TerminateSession, self.session.found_terminator) + + def test_malformed_frame_invalid_parameter2(self): + self.session.collect_incoming_data('MSG 0 0 + 0 0') + self.session.found_terminator() + self.assertRaises(beep.TerminateSession, self.session.found_terminator) + + def test_malformed_frame_missing_size_param(self): + self.session.collect_incoming_data('MSG 0 0 . 0') + self.session.found_terminator() + self.assertRaises(beep.TerminateSession, self.session.found_terminator) + + def test_malformed_frame_missing_ansno_param(self): + self.session.collect_incoming_data('ANS 0 0 . 0 0') + self.session.found_terminator() + self.assertRaises(beep.TerminateSession, self.session.found_terminator) + + class ChannelTestCase(unittest.TestCase): def setUp(self): @@ -112,8 +144,8 @@ channel = beep.Channel(self.session, 0, MockProfileHandler) channel.handle_data_frame('MSG', 0, False, 0L, None, 'foo bar') # The next sequence number should be 8; send 12 instead - self.assertRaises(beep.ProtocolError, channel.handle_data_frame, 'MSG', - 0, False, 12L, None, 'foo baz') + self.assertRaises(beep.TerminateSession, channel.handle_data_frame, + 'MSG', 0, False, 12L, None, 'foo baz') def test_send_single_frame_message(self): """ @@ -260,7 +292,7 @@ self.profile.handle_connect() self.assertEqual(1, len(self.session.sent_messages)) xml = xmlio.Element('greeting') - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.assertEqual(('RPY', 0, 0, False, 0, None, message), self.session.sent_messages[0]) @@ -275,7 +307,7 @@ xml = xmlio.Element('greeting')[ xmlio.Element('profile', uri=MockProfileHandler.URI) ] - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.assertEqual(('RPY', 0, 0, False, 0, None, message), self.session.sent_messages[0]) @@ -290,7 +322,7 @@ greeting_received.called = False self.session.greeting_received = greeting_received xml = xmlio.Element('greeting')[xmlio.Element('profile', uri='test')] - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) assert greeting_received.called @@ -304,7 +336,7 @@ assert 2 in self.session.channels xml = xmlio.Element('profile', uri=MockProfileHandler.URI) - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.assertEqual(('RPY', 0, 0, False, 0, None, message), self.session.sent_messages[0]) @@ -314,15 +346,9 @@ xmlio.Element('profile', uri='http://example.com/foo'), xmlio.Element('profile', uri='http://example.com/bar') ] - self.profile.handle_msg(0, beep.Payload(xml)) - + self.assertRaises(beep.ProtocolError, self.profile.handle_msg, 0, + beep.Payload(xml)) assert 2 not in self.session.channels - xml = xmlio.Element('error', code=550)[ - 'None of the requested profiles is supported' - ] - message = beep.Payload(xml).as_string() - self.assertEqual(('ERR', 0, 0, False, 0, None, message), - self.session.sent_messages[0]) def test_handle_start_channel_in_use(self): self.session.channels[2] = beep.Channel(self.session, 2, @@ -332,13 +358,9 @@ xml = xmlio.Element('start', number=2)[ xmlio.Element('profile', uri=MockProfileHandler.URI) ] - self.profile.handle_msg(0, beep.Payload(xml)) - + self.assertRaises(beep.ProtocolError, self.profile.handle_msg, 0, + beep.Payload(xml)) assert self.session.channels[2].profile is orig_profile - xml = xmlio.Element('error', code=550)['Channel already in use'] - message = beep.Payload(xml).as_string() - self.assertEqual(('ERR', 0, 0, False, 0, None, message), - self.session.sent_messages[0]) def test_handle_close(self): self.session.channels[1] = beep.Channel(self.session, 1, @@ -348,7 +370,7 @@ assert 1 not in self.session.channels xml = xmlio.Element('ok') - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.assertEqual(('RPY', 0, 0, False, 0, None, message), self.session.sent_messages[0]) @@ -358,19 +380,15 @@ assert 1 not in self.session.channels xml = xmlio.Element('ok') - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.assertEqual(('RPY', 0, 0, False, 0, None, message), self.session.sent_messages[0]) assert self.session.closed def test_handle_close_channel_not_open(self): xml = xmlio.Element('close', number=1, code=200) - self.profile.handle_msg(0, beep.Payload(xml)) - - xml = xmlio.Element('error', code=550)['Channel not open'] - message = beep.Payload(xml).as_string() - self.assertEqual(('ERR', 0, 0, False, 0, None, message), - self.session.sent_messages[0]) + self.assertRaises(beep.ProtocolError, self.profile.handle_msg, 0, + beep.Payload(xml)) def test_handle_close_channel_busy(self): self.session.channels[1] = beep.Channel(self.session, 1, @@ -379,34 +397,16 @@ assert self.session.channels[1].msgnos xml = xmlio.Element('close', number=1, code=200) - self.profile.handle_msg(0, beep.Payload(xml)) - - xml = xmlio.Element('error', code=550)['Channel waiting for replies'] - message = beep.Payload(xml).as_string() - self.assertEqual(('ERR', 0, 0, False, 0, None, message), - self.session.sent_messages[1]) + self.assertRaises(beep.ProtocolError, self.profile.handle_msg, 0, + beep.Payload(xml)) def test_handle_close_session_busy(self): self.session.channels[1] = beep.Channel(self.session, 1, MockProfileHandler) xml = xmlio.Element('close', number=0, code=200) - self.profile.handle_msg(0, beep.Payload(xml)) - - xml = xmlio.Element('error', code=550)['Other channels still open'] - message = beep.Payload(xml).as_string() - self.assertEqual(('ERR', 0, 0, False, 0, None, message), - self.session.sent_messages[0]) - - def test_send_error(self): - """ - Verify that a negative reply is sent as expected. - """ - self.profile.send_error(0, 521, 'ouch') - xml = xmlio.Element('error', code=521)['ouch'] - message = beep.Payload(xml).as_string() - self.assertEqual(('ERR', 0, 0, False, 0, None, message), - self.session.sent_messages[0]) + self.assertRaises(beep.ProtocolError, self.profile.handle_msg, 0, + beep.Payload(xml)) def test_send_start(self): """ @@ -416,7 +416,7 @@ xml = xmlio.Element('start', number="1")[ xmlio.Element('profile', uri=MockProfileHandler.URI) ] - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.assertEqual(('MSG', 0, 0, False, 0, None, message), self.session.sent_messages[0]) @@ -427,7 +427,7 @@ """ self.profile.send_start([MockProfileHandler]) xml = xmlio.Element('profile', uri=MockProfileHandler.URI) - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) assert isinstance(self.session.channels[1].profile, MockProfileHandler) @@ -438,7 +438,7 @@ """ self.profile.send_start([MockProfileHandler]) xml = xmlio.Element('error', code=500)['ouch'] - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) assert 1 not in self.session.channels @@ -459,7 +459,7 @@ handle_error=handle_error) xml = xmlio.Element('profile', uri=MockProfileHandler.URI) - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) assert isinstance(self.session.channels[1].profile, MockProfileHandler) assert handle_ok.called @@ -482,7 +482,7 @@ handle_error=handle_error) xml = xmlio.Element('error', code=500)['ouch'] - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) assert 1 not in self.session.channels assert not handle_ok.called @@ -494,7 +494,7 @@ """ self.profile.send_close(1, code=200) xml = xmlio.Element('close', number=1, code=200) - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.assertEqual(('MSG', 0, 0, False, 0, None, message), self.session.sent_messages[0]) @@ -508,7 +508,7 @@ self.profile.send_close(1, code=200) xml = xmlio.Element('ok') - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) assert 1 not in self.session.channels @@ -520,7 +520,7 @@ self.profile.send_close(0, code=200) xml = xmlio.Element('ok') - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) assert 0 not in self.session.channels assert self.session.closed @@ -535,7 +535,7 @@ self.profile.send_close(1, code=200) xml = xmlio.Element('error', code=500)['ouch'] - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) assert 1 in self.session.channels @@ -556,7 +556,7 @@ handle_error=handle_error) xml = xmlio.Element('profile', uri=MockProfileHandler.URI) - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.channel.handle_data_frame('RPY', 0, False, 0L, None, message) assert 1 not in self.session.channels assert handle_ok.called @@ -581,7 +581,7 @@ handle_error=handle_error) xml = xmlio.Element('error', code=500)['ouch'] - message = beep.Payload(xml).as_string() + message = beep.Payload(xml).read() self.channel.handle_data_frame('ERR', 0, False, 0L, None, message) assert 1 in self.session.channels assert not handle_ok.called @@ -590,6 +590,7 @@ def suite(): suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(SessionTestCase, 'test')) suite.addTest(unittest.makeSuite(ChannelTestCase, 'test')) suite.addTest(unittest.makeSuite(ManagementProfileHandlerTestCase, 'test')) return suite