# HG changeset patch # User cmlenz # Date 1121361044 0 # Node ID 2c4e104afef804324529e2d920b110b396216175 # Parent 7b10a8b2d0c49893712ce2a93c9a6a6a3e9055a0 The build master now transmits snapshot archives without blocking while reading the file and sending the BEEP frames. Closes #17. diff --git a/bitten/master.py b/bitten/master.py --- a/bitten/master.py +++ b/bitten/master.py @@ -308,9 +308,9 @@ # asyncore push_with_producer() snapshot_path = self.master.get_snapshot(build, type, encoding) snapshot_name = os.path.basename(snapshot_path) - message = beep.Payload(file(snapshot_path).read(), + message = beep.Payload(file(snapshot_path), content_type=type, content_disposition=snapshot_name, - content_type=type, content_encoding=encoding) + content_encoding=encoding) self.channel.send_msg(message, handle_reply=handle_reply) diff --git a/bitten/slave.py b/bitten/slave.py --- a/bitten/slave.py +++ b/bitten/slave.py @@ -22,6 +22,7 @@ import logging import os import platform +import shutil import sys import tempfile @@ -109,7 +110,13 @@ else: archive_name = 'snapshot.zip' archive_path = os.path.join(workdir, archive_name) - file(archive_path, 'wb').write(payload.body) + + archive_file = file(archive_path, 'wb') + try: + shutil.copyfileobj(payload.body, archive_file) + finally: + archive_file.close() + logging.debug('Received snapshot archive: %s', archive_path) # Unpack the archive diff --git a/bitten/util/beep.py b/bitten/util/beep.py --- a/bitten/util/beep.py +++ b/bitten/util/beep.py @@ -37,6 +37,10 @@ set except NameError: from sets import Set as set +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO import sys import time @@ -518,9 +522,7 @@ self.msgno = msgno self.ansno = ansno - self.data = '' - if payload is not None: - self.data = payload.as_string() + self.payload = payload self.done = False def more(self): @@ -529,30 +531,26 @@ if self.done: return '' - if len(self.data) > self.channel.windowsize: - # If the size of the payload exceeds the current negotiated window - # size, fragment the message and send in smaller chunks - frame = self._make_frame(self.data[:self.channel.windowsize], True) - self.channel.seqno[1] += self.channel.windowsize - self.data = self.data[self.channel.windowsize:] - return frame + if self.payload: + data = self.payload.read(self.channel.windowsize) + if len(data) < self.channel.windowsize: + self.done = True else: - # Send the final frame - frame = self._make_frame(self.data, False) - self.channel.seqno[1] += len(self.data) + data = '' self.done = True - return frame - def _make_frame(self, payload, more=False): headerbits = [self.cmd, self.channel.channelno, self.msgno, - more and '*' or '.', self.channel.seqno[1].value, - len(payload)] + self.done and '.' or '*', self.channel.seqno[1].value, + len(data)] if self.cmd == 'ANS': assert self.ansno is not None headerbits.append(self.ansno) header = ' '.join([str(bit) for bit in headerbits]) logging.debug('Sending frame [%s]', header) - return '\r\n'.join((header, payload, 'END', '')) + frame = '\r\n'.join((header, data, 'END', '')) + self.channel.seqno[1] += len(data) + + return frame class ProfileHandler(object): @@ -744,25 +742,56 @@ class Payload(object): """MIME message for transmission as payload with BEEP.""" - def __init__(self, data, content_type=BEEP_XML, content_disposition=None, - content_encoding=None): + def __init__(self, data=None, content_type=BEEP_XML, + content_disposition=None, content_encoding=None): """Initialize the payload.""" + self._hdr_buf = None self.content_type = content_type self.content_disposition = content_disposition self.content_encoding = content_encoding - self.body = str(data) + + if data is None: + data = '' + if isinstance(data, xmlio.Element): + self.body = StringIO(str(data)) + elif isinstance(data, (str, unicode)): + self.body = StringIO(data) + else: + assert hasattr(data, 'read'), \ + 'Payload data %s must provide a `read` method' % data + self.body = data def as_string(self): - hdrs = [] - if self.content_type: - hdrs.append('Content-Type: ' + self.content_type) - if self.content_disposition: - hdrs.append('Content-Disposition: ' + self.content_disposition) - if self.content_encoding: - hdrs.append('Content-Transfer-Encoding: ' + self.content_encoding) - hdrs.append('') + return self.read() - return '\n'.join(hdrs) + '\n' + self.body + def read(self, size=None): + if self._hdr_buf is None: + hdrs = [] + if self.content_type: + hdrs.append('Content-Type: ' + self.content_type) + if self.content_disposition: + hdrs.append('Content-Disposition: ' + self.content_disposition) + if self.content_encoding: + hdrs.append('Content-Transfer-Encoding: ' + + self.content_encoding) + hdrs.append('') + self._hdr_buf = '\n'.join(hdrs) + '\n' + + ret_buf = '' + if len(self._hdr_buf): + if size is not None and len(self._hdr_buf) > size: + ret_buf = self._hdr_buf[:size] + self._hdr_buf = self._hdr_buf[size:] + return ret_buf + ret_buf = self._hdr_buf + self._hdr_buf = '' + + if not self.body.closed: + ret_buf = ret_buf + self.body.read((size or -1) - len(ret_buf)) + if size is None or len(ret_buf) < size: + self.body.close() + + return ret_buf def parse(cls, string): message = email.message_from_string(string)