# HG changeset patch # User cmlenz # Date 1119016513 0 # Node ID 1a7b9044b0a56101cd96c3c1d8a9f6b3d8cc43e2 # Parent e67713b7936f5be19cc8c6555ff77b125960d988 Basic TCP mapping and message fragmenting support. See #1. diff --git a/bitten/util/beep.py b/bitten/util/beep.py --- a/bitten/util/beep.py +++ b/bitten/util/beep.py @@ -163,8 +163,7 @@ self.header = ''.join(self.inbuf).split(' ') self.inbuf = [] if self.header[0] == 'SEQ': - # TCP mapping frame - raise NotImplementedError + size = 0 else: # Extract payload size to use as next terminator try: @@ -176,11 +175,11 @@ ' '.join(self.header)) self.header = None return - if size == 0: - self.payload = '' - self.set_terminator('END\r\n') - else: - self.set_terminator(size) + if size == 0: + self.payload = '' + self.set_terminator('END\r\n') + else: + self.set_terminator(size) elif self.payload is None: # Frame payload received self.payload = ''.join(self.inbuf) @@ -206,21 +205,26 @@ try: cmd = header[0].upper() channel = int(header[1]) - msgno = int(header[2]) - more = header[3] == '*' - seqno = int(header[4]) - size = int(header[5]) - ansno = None - if cmd == 'ANS': - ansno = int(header[6]) - self.channels[channel].handle_frame(cmd, msgno, more, seqno, - ansno, payload) + if cmd == 'SEQ': + ackno = int(header[2]) + window = int(header[3]) + self.channels[channel].handle_seq_frame(ackno, window) + else: + msgno = int(header[2]) + more = header[3] == '*' + seqno = int(header[4]) + size = int(header[5]) + ansno = None + if cmd == 'ANS': + ansno = int(header[6]) + self.channels[channel].handle_data_frame(cmd, msgno, more, + seqno, ansno, payload) except (ValueError, TypeError, ProtocolError), e: logging.exception(e) if channel == 0 and msgno is not None: self.channels[0].profile.send_error(msgno, 550, e) - def send_frame(self, cmd, channel, msgno, more, seqno, ansno=None, + def send_data_frame(self, cmd, channel, msgno, more, seqno, ansno=None, payload=''): """Send the specified data frame to the peer.""" headerbits = [cmd, channel, msgno, more and '*' or '.', seqno, @@ -232,6 +236,12 @@ logging.debug('Sending frame [%s]', header) self.push('\r\n'.join((header, payload, 'END', ''))) + def send_seq_frame(self, channel, ackno, window): + headerbits = ['SEQ', channel, ackno, window] + header = ' '.join([str(hb) for hb in headerbits]) + logging.debug('Sending frame [%s]', header) + self.push('\r\n'.join((header, 'END', ''))) + class Initiator(Session): """Root class for BEEP peers in the initiating role.""" @@ -305,6 +315,7 @@ """ self.session = session self.channelno = channelno + self.windowsize = 4096 self.inqueue = {} self.outqueue = [] self.reply_handlers = {} @@ -318,9 +329,20 @@ self.profile = profile self.profile.session = self.session self.profile.channel = self + self.profile.handle_connect() - def handle_frame(self, cmd, msgno, more, seqno, ansno, payload): + def handle_seq_frame(self, ackno, window): + """Process a TCP mapping frame (SEQ). + + @param ackno: the value of the next sequence number that the sender is + expecting to receive on this channel + @param window: window size, the number of payload octets per frame that + the sender is expecting to receive on this channel + """ + self.windowsize = window + + def handle_data_frame(self, cmd, msgno, more, seqno, ansno, payload): """Process a single data frame. @param cmd: The frame keyword (MSG, RPY, ERR, ANS or NUL) @@ -368,14 +390,24 @@ self.profile.handle_nul(msgno) def _send(self, cmd, msgno, ansno=None, message=None): - # TODO: Fragment and queue the message if necessary; - # First need TCP mapping (RFC 3081) for that to make real sense payload = '' if message is not None: payload = message.as_string() - self.session.send_frame(cmd, self.channelno, msgno, False, - self.seqno[1].value, payload=payload, - ansno=ansno) + + # If the size of the payload exceeds the current negotiated window size, + # fragment the message and send in smaller chunks + while len(payload) > self.windowsize: + window = payload[:self.windowsize] + self.session.send_data_frame(cmd, self.channelno, msgno, True, + self.seqno[1].value, payload=window, + ansno=ansno) + self.seqno[1] += len(window) + payload = payload[self.windowsize:] + + # Send the final frame + self.session.send_data_frame(cmd, self.channelno, msgno, False, + self.seqno[1].value, payload=payload, + ansno=ansno) self.seqno[1] += len(payload) def send_msg(self, message, handle_reply=None): diff --git a/bitten/util/tests/beep.py b/bitten/util/tests/beep.py --- a/bitten/util/tests/beep.py +++ b/bitten/util/tests/beep.py @@ -9,8 +9,8 @@ def __init__(self): self.sent_messages = [] - def send_frame(self, cmd, channel, msgno, more, seqno, ansno=None, - payload=''): + def send_data_frame(self, cmd, channel, msgno, more, seqno, ansno=None, + payload=''): self.sent_messages.append((cmd, channel, msgno, more, seqno, ansno, payload.strip())) @@ -44,7 +44,7 @@ profile. """ channel = beep.Channel(self.session, 0, self.profile) - channel.handle_frame('MSG', 0, False, 0, None, 'foo bar') + channel.handle_data_frame('MSG', 0, False, 0, None, 'foo bar') self.assertEqual(('MSG', 0, 'foo bar'), self.profile.handled_messages[0]) @@ -54,8 +54,8 @@ recombined message to the profile. """ channel = beep.Channel(self.session, 0, self.profile) - channel.handle_frame('MSG', 0, True, 0, None, 'foo ') - channel.handle_frame('MSG', 0, False, 4, None, 'bar') + channel.handle_data_frame('MSG', 0, True, 0, None, 'foo ') + channel.handle_data_frame('MSG', 0, False, 4, None, 'bar') self.assertEqual(('MSG', 0, 'foo bar'), self.profile.handled_messages[0]) @@ -64,10 +64,10 @@ Verify that the channel detects out-of-sync frames and bails. """ channel = beep.Channel(self.session, 0, self.profile) - channel.handle_frame('MSG', 0, False, 0L, None, 'foo bar') + channel.handle_data_frame('MSG', 0, False, 0L, None, 'foo bar') # The next sequence number should be 8; send 12 instead - self.assertRaises(beep.ProtocolError, channel.handle_frame, 'MSG', 0, - False, 12L, None, 'foo baz') + self.assertRaises(beep.ProtocolError, channel.handle_data_frame, 'MSG', + 0, False, 12L, None, 'foo baz') def test_send_single_frame_message(self): """ @@ -130,7 +130,7 @@ self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'), self.session.sent_messages[0]) assert msgno in channel.msgnos.keys() - channel.handle_frame('RPY', msgno, False, 0, None, '42') + channel.handle_data_frame('RPY', msgno, False, 0, None, '42') self.assertEqual(('RPY', msgno, '42'), self.profile.handled_messages[0]) assert msgno not in channel.msgnos.keys()