changeset 23:1a7b9044b0a5

Basic TCP mapping and message fragmenting support. See #1.
author cmlenz
date Fri, 17 Jun 2005 13:55:13 +0000
parents e67713b7936f
children cad6d28b8975
files bitten/util/beep.py bitten/util/tests/beep.py
diffstat 2 files changed, 64 insertions(+), 32 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/util/beep.py
+++ b/bitten/util/beep.py
@@ -163,8 +163,7 @@
             self.header = ''.join(self.inbuf).split(' ')
             self.inbuf = []
             if self.header[0] == 'SEQ':
-                # TCP mapping frame
-                raise NotImplementedError                
+                size = 0
             else:
                 # Extract payload size to use as next terminator
                 try:
@@ -176,11 +175,11 @@
                                   ' '.join(self.header))
                     self.header = None
                     return
-                if size == 0:
-                    self.payload = ''
-                    self.set_terminator('END\r\n')
-                else:
-                    self.set_terminator(size)
+            if size == 0:
+                self.payload = ''
+                self.set_terminator('END\r\n')
+            else:
+                self.set_terminator(size)
         elif self.payload is None:
             # Frame payload received
             self.payload = ''.join(self.inbuf)
@@ -206,21 +205,26 @@
         try:
             cmd = header[0].upper()
             channel = int(header[1])
-            msgno = int(header[2])
-            more = header[3] == '*'
-            seqno = int(header[4])
-            size = int(header[5])
-            ansno = None
-            if cmd == 'ANS':
-                ansno = int(header[6])
-            self.channels[channel].handle_frame(cmd, msgno, more, seqno,
-                                                ansno, payload)
+            if cmd == 'SEQ':
+                ackno = int(header[2])
+                window = int(header[3])
+                self.channels[channel].handle_seq_frame(ackno, window)
+            else:
+                msgno = int(header[2])
+                more = header[3] == '*'
+                seqno = int(header[4])
+                size = int(header[5])
+                ansno = None
+                if cmd == 'ANS':
+                    ansno = int(header[6])
+                self.channels[channel].handle_data_frame(cmd, msgno, more,
+                                                         seqno, ansno, payload)
         except (ValueError, TypeError, ProtocolError), e:
             logging.exception(e)
             if channel == 0 and msgno is not None:
                 self.channels[0].profile.send_error(msgno, 550, e)
 
-    def send_frame(self, cmd, channel, msgno, more, seqno, ansno=None,
+    def send_data_frame(self, cmd, channel, msgno, more, seqno, ansno=None,
                   payload=''):
         """Send the specified data frame to the peer."""
         headerbits = [cmd, channel, msgno, more and '*' or '.', seqno,
@@ -232,6 +236,12 @@
         logging.debug('Sending frame [%s]', header)
         self.push('\r\n'.join((header, payload, 'END', '')))
 
+    def send_seq_frame(self, channel, ackno, window):
+        headerbits = ['SEQ', channel, ackno, window]
+        header = ' '.join([str(hb) for hb in headerbits])
+        logging.debug('Sending frame [%s]', header)
+        self.push('\r\n'.join((header, 'END', '')))
+
 
 class Initiator(Session):
     """Root class for BEEP peers in the initiating role."""
@@ -305,6 +315,7 @@
         """
         self.session = session
         self.channelno = channelno
+        self.windowsize = 4096
         self.inqueue = {}
         self.outqueue = []
         self.reply_handlers = {}
@@ -318,9 +329,20 @@
         self.profile = profile
         self.profile.session = self.session
         self.profile.channel = self
+
         self.profile.handle_connect()
 
-    def handle_frame(self, cmd, msgno, more, seqno, ansno, payload):
+    def handle_seq_frame(self, ackno, window):
+        """Process a TCP mapping frame (SEQ).
+        
+        @param ackno: the value of the next sequence number that the sender is
+                      expecting to receive on this channel
+        @param window: window size, the number of payload octets per frame that
+                       the sender is expecting to receive on this channel
+        """
+        self.windowsize = window
+
+    def handle_data_frame(self, cmd, msgno, more, seqno, ansno, payload):
         """Process a single data frame.
 
         @param cmd: The frame keyword (MSG, RPY, ERR, ANS or NUL)
@@ -368,14 +390,24 @@
                 self.profile.handle_nul(msgno)
 
     def _send(self, cmd, msgno, ansno=None, message=None):
-        # TODO: Fragment and queue the message if necessary;
-        #       First need TCP mapping (RFC 3081) for that to make real sense
         payload = ''
         if message is not None:
             payload = message.as_string()
-        self.session.send_frame(cmd, self.channelno, msgno, False,
-                                self.seqno[1].value, payload=payload,
-                                ansno=ansno)
+
+        # If the size of the payload exceeds the current negotiated window size,
+        # fragment the message and send in smaller chunks
+        while len(payload) > self.windowsize:
+            window = payload[:self.windowsize]
+            self.session.send_data_frame(cmd, self.channelno, msgno, True,
+                                         self.seqno[1].value, payload=window,
+                                         ansno=ansno)
+            self.seqno[1] += len(window)
+            payload = payload[self.windowsize:]
+
+        # Send the final frame
+        self.session.send_data_frame(cmd, self.channelno, msgno, False,
+                                     self.seqno[1].value, payload=payload,
+                                     ansno=ansno)
         self.seqno[1] += len(payload)
 
     def send_msg(self, message, handle_reply=None):
--- a/bitten/util/tests/beep.py
+++ b/bitten/util/tests/beep.py
@@ -9,8 +9,8 @@
     def __init__(self):
         self.sent_messages = []
 
-    def send_frame(self, cmd, channel, msgno, more, seqno, ansno=None,
-                   payload=''):
+    def send_data_frame(self, cmd, channel, msgno, more, seqno, ansno=None,
+                        payload=''):
         self.sent_messages.append((cmd, channel, msgno, more, seqno, ansno,
                                    payload.strip()))
 
@@ -44,7 +44,7 @@
         profile.
         """
         channel = beep.Channel(self.session, 0, self.profile)
-        channel.handle_frame('MSG', 0, False, 0, None, 'foo bar')
+        channel.handle_data_frame('MSG', 0, False, 0, None, 'foo bar')
         self.assertEqual(('MSG', 0, 'foo bar'),
                          self.profile.handled_messages[0])
 
@@ -54,8 +54,8 @@
         recombined message to the profile.
         """
         channel = beep.Channel(self.session, 0, self.profile)
-        channel.handle_frame('MSG', 0, True, 0, None, 'foo ')
-        channel.handle_frame('MSG', 0, False, 4, None, 'bar')
+        channel.handle_data_frame('MSG', 0, True, 0, None, 'foo ')
+        channel.handle_data_frame('MSG', 0, False, 4, None, 'bar')
         self.assertEqual(('MSG', 0, 'foo bar'),
                          self.profile.handled_messages[0])
 
@@ -64,10 +64,10 @@
         Verify that the channel detects out-of-sync frames and bails.
         """
         channel = beep.Channel(self.session, 0, self.profile)
-        channel.handle_frame('MSG', 0, False, 0L, None, 'foo bar')
+        channel.handle_data_frame('MSG', 0, False, 0L, None, 'foo bar')
         # The next sequence number should be 8; send 12 instead
-        self.assertRaises(beep.ProtocolError, channel.handle_frame, 'MSG', 0,
-                          False, 12L, None, 'foo baz')
+        self.assertRaises(beep.ProtocolError, channel.handle_data_frame, 'MSG',
+                          0, False, 12L, None, 'foo baz')
 
     def test_send_single_frame_message(self):
         """
@@ -130,7 +130,7 @@
         self.assertEqual(('MSG', 0, msgno, False, 0L, None, 'foo bar'),
                          self.session.sent_messages[0])
         assert msgno in channel.msgnos.keys()
-        channel.handle_frame('RPY', msgno, False, 0, None, '42')
+        channel.handle_data_frame('RPY', msgno, False, 0, None, '42')
         self.assertEqual(('RPY', msgno, '42'), self.profile.handled_messages[0])
         assert msgno not in channel.msgnos.keys()
 
Copyright (C) 2012-2017 Edgewall Software