changeset 89:7b10a8b2d0c4

Changed BEEP frame sending mechanism to use the `push_with_producer()` function provided by `asynchat`. In preparation for #17.
author cmlenz
date Thu, 14 Jul 2005 15:10:30 +0000
parents 8ae753a22494
children 2c4e104afef8
files bitten/util/beep.py bitten/util/tests/beep.py
diffstat 2 files changed, 92 insertions(+), 64 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/util/beep.py
+++ b/bitten/util/beep.py
@@ -279,39 +279,6 @@
             if channel == 0 and msgno is not None:
                 self.channels[0].profile.send_error(msgno, 550, e)
 
-    def send_data_frame(self, cmd, channel, msgno, more, seqno, ansno=None,
-                  payload=''):
-        """Send the specified data frame to the peer.
-        
-        @param cmd: The frame keyword (one of MSG, RPY, ERR, ANS, or NUL)
-        @param channel: The number of the sending channel
-        @param msgno: The message number
-        @param more: Whether this is the last frame belonging to the message
-        @param seqno: The frame-specific sequence number
-        @param ansno: For ANS frames, the answer number, otherwise `None`
-        @param payload: The payload of the frame
-        """
-        headerbits = [cmd, channel, msgno, more and '*' or '.', seqno,
-                      len(payload)]
-        if cmd == 'ANS':
-            assert ansno is not None
-            headerbits.append(ansno)
-        header = ' '.join([str(bit) for bit in headerbits])
-        logging.debug('Sending frame [%s]', header)
-        self.push('\r\n'.join((header, payload, 'END', '')))
-
-    def send_seq_frame(self, channel, ackno, window):
-        """Send a SEQ frame to the peer.
-        
-        @param channel: The number of the sending channel
-        @param ackno: The acknowledgement number.
-        @param window: The requested window size
-        """
-        headerbits = ['SEQ', channel, ackno, window]
-        header = ' '.join([str(bit) for bit in headerbits])
-        logging.debug('Sending frame [%s]', header)
-        self.push('\r\n'.join((header, 'END', '')))
-
     def terminate(self, handle_ok=None, handle_error=None):
         """Terminate the session by closing all channels."""
         def close_next_channel():
@@ -472,34 +439,13 @@
             elif cmd == 'NUL':
                 self.profile.handle_nul(msgno)
 
-    def _send(self, cmd, msgno, ansno=None, payload=None):
-        """Send a frame to the peer."""
-        data = ''
-        if payload is not None:
-            data = payload.as_string()
-
-        # If the size of the payload exceeds the current negotiated window size,
-        # fragment the message and send in smaller chunks
-        while len(data) > self.windowsize:
-            window = data[:self.windowsize]
-            self.session.send_data_frame(cmd, self.channelno, msgno, True,
-                                         self.seqno[1].value, payload=window,
-                                         ansno=ansno)
-            self.seqno[1] += self.windowsize
-            data = data[self.windowsize:]
-
-        # Send the final frame
-        self.session.send_data_frame(cmd, self.channelno, msgno, False,
-                                     self.seqno[1].value, payload=data,
-                                     ansno=ansno)
-        self.seqno[1] += len(data)
-
     def send_msg(self, payload, handle_reply=None):
         """Send a MSG frame to the peer.
         
         @param payload: The message payload (a `Payload` instance)
         @param handle_reply: A function that is called when a reply to this
                              message is received
+        @return: the message number assigned to the message
         """
         while True: # Find a unique message number
             msgno = self.msgno.next()
@@ -508,7 +454,8 @@
         self.msgnos.add(msgno) # Flag the chosen message number as in use
         if handle_reply is not None:
             self.reply_handlers[msgno] = handle_reply
-        self._send('MSG', msgno, None, payload)
+        self.session.push_with_producer(FrameProducer(self, 'MSG', msgno, None,
+                                                      payload))
         return msgno
 
     def send_rpy(self, msgno, payload):
@@ -517,7 +464,8 @@
         @param msgno: The number of the message this reply is in reference to
         @param payload: The message payload (a `Payload` instance)
         """
-        self._send('RPY', msgno, None, payload)
+        self.session.push_with_producer(FrameProducer(self, 'RPY', msgno, None,
+                                                      payload))
 
     def send_err(self, msgno, payload):
         """Send an ERR frame to the peer.
@@ -525,17 +473,20 @@
         @param msgno: The number of the message this reply is in reference to
         @param payload: The message payload (a `Payload` instance)
         """
-        self._send('ERR', msgno, None, payload)
+        self.session.push_with_producer(FrameProducer(self, 'ERR', msgno, None,
+                                                      payload))
 
     def send_ans(self, msgno, payload):
         """Send an ANS frame to the peer.
         
         @param msgno: The number of the message this reply is in reference to
         @param payload: The message payload (a `Payload` instance)
+        @return: the answer number assigned to the answer
         """
         ansnos = self.ansnos.setdefault(msgno, cycle_through(0, 2147483647))
         next_ansno = ansnos.next()
-        self._send('ANS', msgno, next_ansno, payload)
+        self.session.push_with_producer(FrameProducer(self, 'ANS', msgno,
+                                                      next_ansno, payload))
         return next_ansno
 
     def send_nul(self, msgno):
@@ -543,10 +494,67 @@
         
         @param msgno: The number of the message this reply is in reference to
         """
-        self._send('NUL', msgno)
+        self.session.push_with_producer(FrameProducer(self, 'NUL', msgno))
         del self.ansnos[msgno] # dealloc answer numbers for the message
 
 
+class FrameProducer(object):
+    """Internal class that emits the frames of a BEEP message, based on the
+    `asynchat` `push_with_producer()` protocol.
+    """
+
+    def __init__(self, channel, cmd, msgno, ansno=None, payload=None):
+        """Initialize the frame producer.
+        
+        @param channel the channel the message is to be sent on
+        @param cmd the BEEP command/keyword (MSG, RPY, ERR, ANS or NUL)
+        @param msgno the message number
+        @param ansno the answer number (only for ANS messages)
+        @param payload the message payload (an instance of `Payload`)
+        """
+        self.session = channel.session
+        self.channel = channel
+        self.cmd = cmd
+        self.msgno = msgno
+        self.ansno = ansno
+
+        self.data = ''
+        if payload is not None:
+            self.data = payload.as_string()
+        self.done = False
+
+    def more(self):
+        """Called by `async_chat` when the producer has been pushed on the
+        producer FIFO and the channel is about to write."""
+        if self.done:
+            return ''
+
+        if len(self.data) > self.channel.windowsize:
+            # If the size of the payload exceeds the current negotiated window
+            # size, fragment the message and send in smaller chunks
+            frame = self._make_frame(self.data[:self.channel.windowsize], True)
+            self.channel.seqno[1] += self.channel.windowsize
+            self.data = self.data[self.channel.windowsize:]
+            return frame
+        else:
+            # Send the final frame
+            frame = self._make_frame(self.data, False)
+            self.channel.seqno[1] += len(self.data)
+            self.done = True
+            return frame
+
+    def _make_frame(self, payload, more=False):
+        headerbits = [self.cmd, self.channel.channelno, self.msgno,
+                      more and '*' or '.', self.channel.seqno[1].value,
+                      len(payload)]
+        if self.cmd == 'ANS':
+            assert self.ansno is not None
+            headerbits.append(self.ansno)
+        header = ' '.join([str(bit) for bit in headerbits])
+        logging.debug('Sending frame [%s]', header)
+        return '\r\n'.join((header, payload, 'END', ''))
+
+
 class ProfileHandler(object):
     """Abstract base class for handlers of specific BEEP profiles.
     
--- a/bitten/util/tests/beep.py
+++ b/bitten/util/tests/beep.py
@@ -19,11 +19,31 @@
     def close(self):
         self.closed = True
 
-    def send_data_frame(self, cmd, channel, msgno, more, seqno, ansno=None,
-                        payload=''):
+    def push_with_producer(self, producer):
         assert not self.closed
-        self.sent_messages.append((cmd, channel, msgno, more, seqno, ansno,
-                                   payload.strip()))
+        while True:
+            frame = producer.more()
+            if not frame:
+                break
+            header, rest = frame.split('\r\n', 1)
+            header = header.split(' ')
+            cmd = header[0].upper()
+            channel = int(header[1])
+            msgno = int(header[2])
+            more = header[3] == '*'
+            seqno = int(header[4])
+            size = int(header[5])
+            ansno = None
+            if cmd == 'ANS':
+                ansno = int(header[6])
+            if size == 0:
+                payload = ''
+            else:
+                payload = rest[:size]
+                rest = rest[size:]
+            self.sent_messages.append((cmd, channel, msgno, more, seqno, ansno,
+                                       payload.strip()))
+            assert rest == '\r\nEND\r\n'
 
 
 class MockProfileHandler(object):
Copyright (C) 2012-2017 Edgewall Software