changeset 278:a4aed338b3c3

Extract event loop logic into a separate class, and allow a BEEP initiator to have an event loop, too.
author cmlenz
date Fri, 14 Oct 2005 12:51:11 +0000
parents 1141027071b3
children 5e7b6337d77c
files bitten/util/beep.py
diffstat 1 files changed, 55 insertions(+), 49 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/util/beep.py
+++ b/bitten/util/beep.py
@@ -19,9 +19,9 @@
 
 import asynchat
 import asyncore
-import bisect
 from datetime import datetime, timedelta
 import email
+from heapq import heappop, heappush
 import logging
 import socket
 try:
@@ -88,7 +88,48 @@
     """Signal termination of a session."""
 
 
-class Listener(asyncore.dispatcher):
+class EventLoop(object):
+    """An `asyncore` event loop.
+    
+    This will interrupt the normal asyncore loop at configurable intervals, and
+    execute scheduled callbacks.
+    """
+
+    def __init__(self):
+        """Create the event loop."""
+        self.eventqueue = []
+
+    def run(self, timeout=15.0, granularity=5):
+        """Start listening to incoming connections."""
+        granularity = timedelta(seconds=granularity)
+        socket_map = asyncore.socket_map
+        last_event_check = datetime.min
+        while socket_map:
+            now = datetime.now()
+            if now - last_event_check >= granularity:
+                last_event_check = now
+                while self.eventqueue:
+                    when, callback = heappop(self.eventqueue)
+                    if now < when:
+                        heappush(self.eventqueue, (when, callback))
+                        break
+                    callback()
+            asyncore.poll(timeout)
+
+    def schedule(self, delta, callback):
+        """Schedule a function to be called.
+        
+        @param delta: The number of seconds after which the callback should be
+                      invoked
+        @param callback: The function to call
+        """
+        when = datetime.now() + timedelta(seconds=delta)
+        log.debug('Scheduling event %s to run at %s', callback.__name__, when)
+
+        heappush(self.eventqueue, (when, callback))
+
+
+class Listener(EventLoop, asyncore.dispatcher):
     """BEEP peer in the listener role.
     
     This peer opens a socket for listening to incoming connections. For each
@@ -96,6 +137,7 @@
     communication with the connected peer.
     """
     def __init__(self, ip, port):
+        EventLoop.__init__(self)
         asyncore.dispatcher.__init__(self)
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
         self.set_reuse_addr()
@@ -103,7 +145,6 @@
         self.bind((ip, port))
         self.sessions = []
         self.profiles = {} # Mapping from URIs to ProfileHandler sub-classes
-        self.eventqueue = []
         log.debug('Listening to connections on %s:%d', ip, port)
         self.listen(5)
 
@@ -125,48 +166,11 @@
         self.sessions.append(Session(self, conn, (ip, port), self.profiles,
                                      first_channelno=2))
 
-    def run(self, timeout=15.0, granularity=5):
-        """Start listening to incoming connections."""
-        granularity = timedelta(seconds=granularity)
-        socket_map = asyncore.socket_map
-        last_event_check = datetime.min
-        while socket_map:
-            now = datetime.now()
-            if now - last_event_check >= granularity:
-                last_event_check = now
-                fired = []
-                i = j = 0
-                while i < len(self.eventqueue):
-                    when, callback = self.eventqueue[i]
-                    if now >= when:
-                        fired.append(callback)
-                        j = i + 1
-                    else:
-                        break
-                    i = i + 1
-                if fired:
-                    self.eventqueue = self.eventqueue[j:]
-                    for callback in fired:
-                        callback(now)
-            asyncore.poll(timeout)
-
-    def schedule(self, delta, callback):
-        """Schedule a function to be called.
-        
-        @param delta: The number of seconds after which the callback should be
-                      invoked
-        @param callback: The function to call
-        """
-        when = datetime.now() + timedelta(seconds=delta)
-        log.debug('Scheduling event %s to run at %s', callback.__name__, when)
-
-        bisect.insort(self.eventqueue, (when, callback))
-
     def quit(self):
         if not self.sessions:
             self.close()
             return
-        def terminate_next_session(when=None):
+        def terminate_next_session():
             session = self.sessions[-1]
             def handle_ok():
                 if self.sessions:
@@ -233,7 +237,7 @@
         """Called by asyncore when an exception is raised."""
         cls, value = sys.exc_info()[:2]
         if cls is TerminateSession:
-            raise cls, value
+            raise
         log.exception(value)
 
     def collect_incoming_data(self, data):
@@ -314,10 +318,10 @@
                                                              seqno, ansno,
                                                              payload)
                 except ProtocolError, e:
-                    log.exception(e)
-                    if e.local and channel == 0 and msgno is not None:
-                        xml = xmlio.Element('error', code=550)[e]
-                        self.channels[channel].send_err(msgno, Payload(xml))
+                    log.error(e)
+                    if e.local and msgno is not None:
+                        self.channels[channel].send_err(msgno,
+                                                        Payload(e.to_xml()))
 
         except (AssertionError, IndexError, TypeError, ValueError), e:
             log.error('Malformed frame', exc_info=True)
@@ -345,7 +349,7 @@
         close_next_channel()
 
 
-class Initiator(Session):
+class Initiator(EventLoop, Session):
     """Root class for BEEP peers in the initiating role."""
 
     def __init__(self, ip, port, profiles=None):
@@ -358,11 +362,13 @@
                          `ProfileHandler` sub-class that will be instantiated to
                          handle the communication for that profile
         """
+        EventLoop.__init__(self)
         Session.__init__(self, profiles=profiles or {})
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
         log.debug('Connecting to %s:%s', ip, port)
-        self.addr = (ip, port)
-        self.connect(self.addr)
+        if ip and port:
+            self.addr = (ip, port)
+            self.connect(self.addr)
 
     def handle_connect(self):
         """Called by asyncore when the connection is established."""
Copyright (C) 2012-2017 Edgewall Software