Mercurial > bitten > bitten-test
changeset 373:b81c67778df7
Flush pipes to the master before after each step so it shows up in the UI immediately, not just at the end of the build. Closes #18, #96
author | wbell |
---|---|
date | Mon, 23 Jul 2007 21:53:16 +0000 |
parents | 67372eba421c |
children | 446092a2d2fe |
files | bitten/master.py bitten/slave.py bitten/util/beep.py |
diffstat | 3 files changed, 28 insertions(+), 10 deletions(-) [+] |
line wrap: on
line diff
--- a/bitten/master.py +++ b/bitten/master.py @@ -219,7 +219,8 @@ xml = xmlio.parse(config.recipe) xml.attr['project'] = os.path.basename(queue.env.path) - self.channel.send_msg(beep.Payload(xml), handle_reply=handle_reply) + self.channel.send_msg(beep.Payload(xml), + handle_reply=handle_reply, force_flush=True) def send_snapshot(self, queue, build, snapshot): timestamp_delta = 0
--- a/bitten/slave.py +++ b/bitten/slave.py @@ -123,7 +123,7 @@ for package, properties in self.config.packages.items(): xml.append(xmlio.Element('package', name=package, **properties)) - self.channel.send_msg(beep.Payload(xml), handle_reply) + self.channel.send_msg(beep.Payload(xml), handle_reply, True) def handle_msg(self, msgno, payload): """Handle either a build initiation or the transmission of a snapshot
--- a/bitten/util/beep.py +++ b/bitten/util/beep.py @@ -507,12 +507,13 @@ elif cmd == 'NUL': self.profile.handle_nul(msgno) - def send_msg(self, payload, handle_reply=None): + def send_msg(self, payload, handle_reply=None, force_flush=True): """Send a MSG frame to the peer. @param payload: The message payload (a `Payload` instance) @param handle_reply: A function that is called when a reply to this message is received + @param force_flush: Flush the beep channel after sending the message @return: the message number assigned to the message """ while True: # Find a unique message number @@ -523,8 +524,12 @@ if handle_reply is not None: assert callable(handle_reply), 'Reply handler must be callable' self.reply_handlers[msgno] = handle_reply - self.session.push_with_producer(FrameProducer(self, 'MSG', msgno, None, + if force_flush == True: + self.send_with_producer(FrameProducer(self, 'MSG', msgno, None, payload)) + else: + self.session.push_with_producer(FrameProducer(self, 'MSG', msgno, None, payload)) + return msgno def send_rpy(self, msgno, payload): @@ -533,7 +538,7 @@ @param msgno: The number of the message this reply is in reference to @param payload: The message payload (a `Payload` instance) """ - self.session.push_with_producer(FrameProducer(self, 'RPY', msgno, None, + self.send_with_producer(FrameProducer(self, 'RPY', msgno, None, payload)) def send_err(self, msgno, payload): @@ -542,7 +547,7 @@ @param msgno: The number of the message this reply is in reference to @param payload: The message payload (a `Payload` instance) """ - self.session.push_with_producer(FrameProducer(self, 'ERR', msgno, None, + self.send_with_producer(FrameProducer(self, 'ERR', msgno, None, payload)) def send_ans(self, msgno, payload): @@ -554,7 +559,7 @@ """ ansnos = self.ansnos.setdefault(msgno, cycle_through(0, 2147483647)) next_ansno = ansnos.next() - self.session.push_with_producer(FrameProducer(self, 'ANS', msgno, + self.send_with_producer(FrameProducer(self, 'ANS', msgno, next_ansno, payload)) return next_ansno @@ -563,9 +568,21 @@ @param msgno: The number of the message this reply is in reference to """ - self.session.push_with_producer(FrameProducer(self, 'NUL', msgno)) + self.send_with_producer(FrameProducer(self, 'NUL', msgno)) del self.ansnos[msgno] # dealloc answer numbers for the message + def send_with_producer(self, fp): + """Sends a message contained in the given FrameProducer to the peer, + ensuring the message is flushed before continuing. + """ + # push with producer seems to send the first frame out the door + self.session.push_with_producer(fp) + # if there are any more, make sure they get out as well. + if not fp.done: + while not fp.done: + asyncore.loop(count=1) + # make sure to flush the last bit. + asyncore.loop(count=1) class ProfileHandler(object): """Abstract base class for handlers of specific BEEP profiles. @@ -712,7 +729,7 @@ log.debug('Requesting closure of channel %d', channelno) xml = xmlio.Element('close', number=channelno, code=code) - return self.channel.send_msg(Payload(xml), handle_reply) + return self.channel.send_msg(Payload(xml), handle_reply, True) def send_start(self, profiles, handle_ok=None, handle_error=None): """Send a request to start a new channel to the peer. @@ -750,7 +767,7 @@ xml = xmlio.Element('start', number=channelno)[ [xmlio.Element('profile', uri=profile.URI) for profile in profiles] ] - return self.channel.send_msg(Payload(xml), handle_reply) + return self.channel.send_msg(Payload(xml), handle_reply, True) class Payload(object):