changeset 14:1733c601d2f8

Refactored the asyncore loop and shutdown procedure into {{{beep.Initiator}}}.
author cmlenz
date Thu, 16 Jun 2005 20:37:38 +0000
parents 21aa17f97522
children 06207499c58c
files bitten/master.py bitten/slave.py bitten/util/beep.py
diffstat 3 files changed, 69 insertions(+), 58 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/master.py
+++ b/bitten/master.py
@@ -27,7 +27,7 @@
 from bitten.util.xmlio import Element, parse_xml
 
 
-class BittenProfile(beep.Profile):
+class BittenProfileHandler(beep.Profile):
     URI = 'http://bitten.cmlenz.net/beep-profile/'
 
     def __init__(self):
@@ -46,6 +46,7 @@
                     os = child.gettext()
                     os_family = child.family
                     os_version = child.version
+
             rpy = beep.MIMEMessage(Element('ok'), beep.BEEP_XML)
             self.channel.send_rpy(msgno, rpy)
             print 'Connected to %s (%s running %s %s [%s])' \
@@ -67,7 +68,7 @@
         port = 7633
 
     listener = beep.Listener('localhost', port)
-    listener.profiles[BittenProfile.URI] = BittenProfile()
+    listener.profiles[BittenProfileHandler.URI] = BittenProfileHandler()
     try:
         asyncore.loop()
     except KeyboardInterrupt:
--- a/bitten/slave.py
+++ b/bitten/slave.py
@@ -31,6 +31,7 @@
 class Slave(beep.Initiator):
 
     channelno = None # The channel number used by the bitten profile
+    terminated = False
 
     def channel_started(self, channelno, profile_uri):
         if profile_uri == BittenProfileHandler.URI:
@@ -41,6 +42,7 @@
             self.channels[0].profile.send_start([BittenProfileHandler],
                                                 handle_ok=self.channel_started)
 
+
 class BittenProfileHandler(beep.Profile):
     """Handles communication on the Bitten profile from the client perspective.
     """
@@ -98,18 +100,6 @@
 
     slave = Slave(host, port)
     try:
-        try:
-            asyncore.loop()
-        except KeyboardInterrupt, beep.TerminateSession:
-            def handle_ok():
-                raise asyncore.ExitNow, 'Session terminated'
-            def handle_error(code, message):
-                print>>sys.stderr, \
-                    'Build master refused to terminate session (%d): %s' \
-                    % (code, message)
-            slave.channels[0].profile.send_close(slave.channelno)
-            slave.channels[0].profile.send_close(handle_ok=handle_ok,
-                                                 handle_error=handle_error)
-            time.sleep(.25)
+        slave.run()
     except beep.TerminateSession, e:
-        print e
+        print 'Session terminated:', e
--- a/bitten/util/beep.py
+++ b/bitten/util/beep.py
@@ -32,6 +32,7 @@
 from email.Parser import Parser
 import socket
 import sys
+import time
 
 from bitten.util.xmlio import Element, parse_xml
 
@@ -151,10 +152,12 @@
             self.set_terminator('END\r\n')
         else:
             # Frame trailer received
-            self._handle_frame(self.header, self.payload)
-            self.header = self.payload = None
-            self.inbuf = []
-            self.set_terminator('\r\n')
+            try:
+                self._handle_frame(self.header, self.payload)
+            finally:
+                self.header = self.payload = None
+                self.inbuf = []
+                self.set_terminator('\r\n')
 
     def _handle_frame(self, header, payload):
         """Handle an incoming frame.
@@ -205,9 +208,13 @@
                          for that profile
         """
         Session.__init__(self, None, None, profiles or {})
+        self.terminated = False
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
         self.connect((ip, port))
 
+    def handle_close(self):
+        self.terminated = True
+
     def greeting_received(self, profiles):
         """Sub-classes should override this to start the channels they need.
         
@@ -215,6 +222,31 @@
                          support.
         """
 
+    def run(self):
+        """Start this peer, which will try to connect to the server and send a
+        greeting.
+        """
+        while not self.terminated:
+            try:
+                asyncore.loop()
+                print 'Normal exit'
+                self.terminated = True
+            except (KeyboardInterrupt, TerminateSession):
+                self._quit()
+                time.sleep(.25)
+
+    def _quit(self):
+        channelno = max(self.channels.keys())
+        def handle_ok():
+            if channelno == 0:
+                self.terminated = True
+            else:
+                self._quit()
+        def handle_error(code, message):
+            raise ProtocolError, '%s (%d)' % (message, code)
+        self.channels[0].profile.send_close(channelno, handle_ok=handle_ok,
+                                            handle_error=handle_error)
+
 
 class Channel(object):
     """A specific channel of a BEEP session."""
@@ -397,6 +429,9 @@
 
         elif elem.tagname == 'close':
             channelno = int(elem.number)
+            if not channelno in self.session.channels:
+                self.send_error(msgno, 550, 'Channel not open')
+                return
             if channelno == 0:
                 if len(self.session.channels) > 1:
                     self.send_error(msgno, 550, 'Other channels still open')
@@ -404,11 +439,10 @@
             if self.session.channels[channelno].msgnos:
                 self.send_error(msgno, 550, 'Channel waiting for replies')
                 return
-            print 'Close channel %s' % (channelno)
             del self.session.channels[channelno]
             message = MIMEMessage(Element('ok'), BEEP_XML)
             self.channel.send_rpy(msgno, message)
-            if channelno == 0:
+            if not self.session.channels:
                 self.session.close()
 
     def handle_rpy(self, msgno, message):
@@ -430,19 +464,22 @@
         assert message.get_content_type() == BEEP_XML
         elem = parse_xml(message.get_payload())
         assert elem.tagname == 'error'
-        print elem.code
+        print 'Received error in response to message #%d: %s (%s)' \
+              % (msgno, elem.gettext(), elem.code)
 
     def send_close(self, channelno=0, code=200, handle_ok=None,
                    handle_error=None):
-
         def handle_reply(cmd, msgno, message):
-            if handle_ok is not None and cmd == 'RPY':
+            if cmd == 'RPY':
                 del self.session.channels[channelno]
-                handle_ok()
-            if handle_error is not None and cmd == 'ERR':
+                if handle_ok is not None:
+                    handle_ok()
+                if not self.session.channels:
+                    self.session.close()
+            elif cmd == 'ERR':
                 elem = parse_xml(message.get_payload())
-                handle_error(int(elem.code), elem.gettext())
-
+                if handle_error is not None:
+                    handle_error(int(elem.code), elem.gettext())
         xml = Element('close', number=channelno, code=code)
         return self.channel.send_msg(MIMEMessage(xml, BEEP_XML), handle_reply)
 
@@ -452,7 +489,6 @@
 
     def send_start(self, profiles, handle_ok=None, handle_error=None):
         channelno = self.session.channelno.next()
-
         def handle_reply(cmd, msgno, message):
             if handle_ok is not None and cmd == 'RPY':
                 elem = parse_xml(message.get_payload())
@@ -472,6 +508,18 @@
         return self.channel.send_msg(MIMEMessage(xml, BEEP_XML), handle_reply)
 
 
+class MIMEMessage(Message):
+    """Simplified construction of generic MIME messages for transmission as
+    payload with BEEP."""
+
+    def __init__(self, payload, content_type=None):
+        Message.__init__(self)
+        if content_type:
+            self.set_type(content_type)
+        self.set_payload(str(payload))
+        del self['MIME-Version']
+
+
 def cycle_through(start, stop=None, step=1):
     """Utility generator that cycles through a defined range of numbers."""
     if stop is None:
@@ -503,31 +551,3 @@
         if self.value > self.limit:
             self.value -= self.limit
         return self
-
-
-class MIMEMessage(Message):
-    """Simplified construction of generic MIME messages for transmission as
-    payload with BEEP."""
-
-    def __init__(self, payload, content_type=None):
-        Message.__init__(self)
-        if content_type:
-            self.set_type(content_type)
-        self.set_payload(str(payload))
-        del self['MIME-Version']
-
-
-if __name__ == '__main__':
-    # Simple echo profile implementation for testing
-    class EchoProfile(Profile):
-        URI = 'http://beepcore.org/beep/ECHO'
-
-        def handle_msg(self, msgno, message):
-            self.channel.send_rpy(msgno, message)
-
-    listener = Listener('127.0.0.1', 8000)
-    listener.profiles[EchoProfile.URI] = EchoProfile()
-    try:
-        asyncore.loop()
-    except KeyboardInterrupt:
-        pass
Copyright (C) 2012-2017 Edgewall Software