changeset 373:b81c67778df7

Flush pipes to the master before after each step so it shows up in the UI immediately, not just at the end of the build. Closes #18, #96
author wbell
date Mon, 23 Jul 2007 21:53:16 +0000
parents 67372eba421c
children 446092a2d2fe
files bitten/master.py bitten/slave.py bitten/util/beep.py
diffstat 3 files changed, 28 insertions(+), 10 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/master.py
+++ b/bitten/master.py
@@ -219,7 +219,8 @@
 
         xml = xmlio.parse(config.recipe)
         xml.attr['project'] = os.path.basename(queue.env.path)
-        self.channel.send_msg(beep.Payload(xml), handle_reply=handle_reply)
+        self.channel.send_msg(beep.Payload(xml),
+                              handle_reply=handle_reply, force_flush=True)
 
     def send_snapshot(self, queue, build, snapshot):
         timestamp_delta = 0
--- a/bitten/slave.py
+++ b/bitten/slave.py
@@ -123,7 +123,7 @@
         for package, properties in self.config.packages.items():
             xml.append(xmlio.Element('package', name=package, **properties))
 
-        self.channel.send_msg(beep.Payload(xml), handle_reply)
+        self.channel.send_msg(beep.Payload(xml), handle_reply, True)
 
     def handle_msg(self, msgno, payload):
         """Handle either a build initiation or the transmission of a snapshot
--- a/bitten/util/beep.py
+++ b/bitten/util/beep.py
@@ -507,12 +507,13 @@
             elif cmd == 'NUL':
                 self.profile.handle_nul(msgno)
 
-    def send_msg(self, payload, handle_reply=None):
+    def send_msg(self, payload, handle_reply=None, force_flush=True):
         """Send a MSG frame to the peer.
         
         @param payload: The message payload (a `Payload` instance)
         @param handle_reply: A function that is called when a reply to this
                              message is received
+	@param force_flush: Flush the beep channel after sending the message
         @return: the message number assigned to the message
         """
         while True: # Find a unique message number
@@ -523,8 +524,12 @@
         if handle_reply is not None:
             assert callable(handle_reply), 'Reply handler must be callable'
             self.reply_handlers[msgno] = handle_reply
-        self.session.push_with_producer(FrameProducer(self, 'MSG', msgno, None,
+	if force_flush == True:
+            self.send_with_producer(FrameProducer(self, 'MSG', msgno, None,
                                                       payload))
+	else: 
+ 	    self.session.push_with_producer(FrameProducer(self, 'MSG', msgno, None, payload))
+
         return msgno
 
     def send_rpy(self, msgno, payload):
@@ -533,7 +538,7 @@
         @param msgno: The number of the message this reply is in reference to
         @param payload: The message payload (a `Payload` instance)
         """
-        self.session.push_with_producer(FrameProducer(self, 'RPY', msgno, None,
+        self.send_with_producer(FrameProducer(self, 'RPY', msgno, None,
                                                       payload))
 
     def send_err(self, msgno, payload):
@@ -542,7 +547,7 @@
         @param msgno: The number of the message this reply is in reference to
         @param payload: The message payload (a `Payload` instance)
         """
-        self.session.push_with_producer(FrameProducer(self, 'ERR', msgno, None,
+        self.send_with_producer(FrameProducer(self, 'ERR', msgno, None,
                                                       payload))
 
     def send_ans(self, msgno, payload):
@@ -554,7 +559,7 @@
         """
         ansnos = self.ansnos.setdefault(msgno, cycle_through(0, 2147483647))
         next_ansno = ansnos.next()
-        self.session.push_with_producer(FrameProducer(self, 'ANS', msgno,
+        self.send_with_producer(FrameProducer(self, 'ANS', msgno,
                                                       next_ansno, payload))
         return next_ansno
 
@@ -563,9 +568,21 @@
         
         @param msgno: The number of the message this reply is in reference to
         """
-        self.session.push_with_producer(FrameProducer(self, 'NUL', msgno))
+        self.send_with_producer(FrameProducer(self, 'NUL', msgno))
         del self.ansnos[msgno] # dealloc answer numbers for the message
 
+    def send_with_producer(self, fp): 
+        """Sends a message contained in the given FrameProducer to the peer, 
+        ensuring the message is flushed before continuing.      
+	""" 
+	# push with producer seems to send the first frame out the door 
+	self.session.push_with_producer(fp) 
+	# if there are any more, make sure they get out as well. 
+	if not fp.done:  
+	    while not fp.done: 
+	       asyncore.loop(count=1) 
+	    # make sure to flush the last bit. 
+	    asyncore.loop(count=1) 
 
 class ProfileHandler(object):
     """Abstract base class for handlers of specific BEEP profiles.
@@ -712,7 +729,7 @@
 
         log.debug('Requesting closure of channel %d', channelno)
         xml = xmlio.Element('close', number=channelno, code=code)
-        return self.channel.send_msg(Payload(xml), handle_reply)
+        return self.channel.send_msg(Payload(xml), handle_reply, True)
 
     def send_start(self, profiles, handle_ok=None, handle_error=None):
         """Send a request to start a new channel to the peer.
@@ -750,7 +767,7 @@
         xml = xmlio.Element('start', number=channelno)[
             [xmlio.Element('profile', uri=profile.URI) for profile in profiles]
         ]
-        return self.channel.send_msg(Payload(xml), handle_reply)
+        return self.channel.send_msg(Payload(xml), handle_reply, True)
 
 
 class Payload(object):
Copyright (C) 2012-2017 Edgewall Software