changeset 90:2c4e104afef8

The build master now transmits snapshot archives without blocking while reading the file and sending the BEEP frames. Closes #17.
author cmlenz
date Thu, 14 Jul 2005 17:10:44 +0000
parents 7b10a8b2d0c4
children 91db738c6a74
files bitten/master.py bitten/slave.py bitten/util/beep.py
diffstat 3 files changed, 69 insertions(+), 33 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/master.py
+++ b/bitten/master.py
@@ -308,9 +308,9 @@
         #       asyncore push_with_producer()
         snapshot_path = self.master.get_snapshot(build, type, encoding)
         snapshot_name = os.path.basename(snapshot_path)
-        message = beep.Payload(file(snapshot_path).read(),
+        message = beep.Payload(file(snapshot_path), content_type=type,
                                content_disposition=snapshot_name,
-                               content_type=type, content_encoding=encoding)
+                               content_encoding=encoding)
         self.channel.send_msg(message, handle_reply=handle_reply)
 
 
--- a/bitten/slave.py
+++ b/bitten/slave.py
@@ -22,6 +22,7 @@
 import logging
 import os
 import platform
+import shutil
 import sys
 import tempfile
 
@@ -109,7 +110,13 @@
                 else:
                     archive_name = 'snapshot.zip'
             archive_path = os.path.join(workdir, archive_name)
-            file(archive_path, 'wb').write(payload.body)
+
+            archive_file = file(archive_path, 'wb')
+            try:
+                shutil.copyfileobj(payload.body, archive_file)
+            finally:
+                archive_file.close()
+
             logging.debug('Received snapshot archive: %s', archive_path)
 
             # Unpack the archive
--- a/bitten/util/beep.py
+++ b/bitten/util/beep.py
@@ -37,6 +37,10 @@
     set
 except NameError:
     from sets import Set as set
+try:
+    from cStringIO import StringIO
+except ImportError:
+    from StringIO import StringIO
 import sys
 import time
 
@@ -518,9 +522,7 @@
         self.msgno = msgno
         self.ansno = ansno
 
-        self.data = ''
-        if payload is not None:
-            self.data = payload.as_string()
+        self.payload = payload
         self.done = False
 
     def more(self):
@@ -529,30 +531,26 @@
         if self.done:
             return ''
 
-        if len(self.data) > self.channel.windowsize:
-            # If the size of the payload exceeds the current negotiated window
-            # size, fragment the message and send in smaller chunks
-            frame = self._make_frame(self.data[:self.channel.windowsize], True)
-            self.channel.seqno[1] += self.channel.windowsize
-            self.data = self.data[self.channel.windowsize:]
-            return frame
+        if self.payload:
+            data = self.payload.read(self.channel.windowsize)
+            if len(data) < self.channel.windowsize:
+                self.done = True
         else:
-            # Send the final frame
-            frame = self._make_frame(self.data, False)
-            self.channel.seqno[1] += len(self.data)
+            data = ''
             self.done = True
-            return frame
 
-    def _make_frame(self, payload, more=False):
         headerbits = [self.cmd, self.channel.channelno, self.msgno,
-                      more and '*' or '.', self.channel.seqno[1].value,
-                      len(payload)]
+                      self.done and '.' or '*', self.channel.seqno[1].value,
+                      len(data)]
         if self.cmd == 'ANS':
             assert self.ansno is not None
             headerbits.append(self.ansno)
         header = ' '.join([str(bit) for bit in headerbits])
         logging.debug('Sending frame [%s]', header)
-        return '\r\n'.join((header, payload, 'END', ''))
+        frame = '\r\n'.join((header, data, 'END', ''))
+        self.channel.seqno[1] += len(data)
+
+        return frame
 
 
 class ProfileHandler(object):
@@ -744,25 +742,56 @@
 class Payload(object):
     """MIME message for transmission as payload with BEEP."""
 
-    def __init__(self, data, content_type=BEEP_XML, content_disposition=None,
-                 content_encoding=None):
+    def __init__(self, data=None, content_type=BEEP_XML,
+                 content_disposition=None, content_encoding=None):
         """Initialize the payload."""
+        self._hdr_buf = None
         self.content_type = content_type
         self.content_disposition = content_disposition
         self.content_encoding = content_encoding
-        self.body = str(data)
+
+        if data is None:
+            data = ''
+        if isinstance(data, xmlio.Element):
+            self.body = StringIO(str(data))
+        elif isinstance(data, (str, unicode)):
+            self.body = StringIO(data)
+        else:
+            assert hasattr(data, 'read'), \
+                   'Payload data %s must provide a `read` method' % data
+            self.body = data
 
     def as_string(self):
-        hdrs = []
-        if self.content_type:
-            hdrs.append('Content-Type: ' + self.content_type)
-        if self.content_disposition:
-            hdrs.append('Content-Disposition: ' + self.content_disposition)
-        if self.content_encoding:
-            hdrs.append('Content-Transfer-Encoding: ' + self.content_encoding)
-        hdrs.append('')
+        return self.read()
 
-        return '\n'.join(hdrs) + '\n' + self.body
+    def read(self, size=None):
+        if self._hdr_buf is None:
+            hdrs = []
+            if self.content_type:
+                hdrs.append('Content-Type: ' + self.content_type)
+            if self.content_disposition:
+                hdrs.append('Content-Disposition: ' + self.content_disposition)
+            if self.content_encoding:
+                hdrs.append('Content-Transfer-Encoding: ' +
+                            self.content_encoding)
+            hdrs.append('')
+            self._hdr_buf = '\n'.join(hdrs) + '\n'
+
+        ret_buf = ''
+        if len(self._hdr_buf):
+            if size is not None and len(self._hdr_buf) > size:
+                ret_buf = self._hdr_buf[:size]
+                self._hdr_buf = self._hdr_buf[size:]
+                return ret_buf
+            ret_buf = self._hdr_buf
+            self._hdr_buf = ''
+
+        if not self.body.closed:
+            ret_buf = ret_buf + self.body.read((size or -1) - len(ret_buf))
+            if size is None or len(ret_buf) < size:
+                self.body.close()
+
+        return ret_buf
 
     def parse(cls, string):
         message = email.message_from_string(string)
Copyright (C) 2012-2017 Edgewall Software