# HG changeset patch # User cmlenz # Date 1129294271 0 # Node ID a4aed338b3c336ad74640bc31893495520c446c9 # Parent 1141027071b315f457b9f5aee70202784f0d968f Extract event loop logic into a separate class, and allow a BEEP initiator to have an event loop, too. diff --git a/bitten/util/beep.py b/bitten/util/beep.py --- a/bitten/util/beep.py +++ b/bitten/util/beep.py @@ -19,9 +19,9 @@ import asynchat import asyncore -import bisect from datetime import datetime, timedelta import email +from heapq import heappop, heappush import logging import socket try: @@ -88,7 +88,48 @@ """Signal termination of a session.""" -class Listener(asyncore.dispatcher): +class EventLoop(object): + """An `asyncore` event loop. + + This will interrupt the normal asyncore loop at configurable intervals, and + execute scheduled callbacks. + """ + + def __init__(self): + """Create the event loop.""" + self.eventqueue = [] + + def run(self, timeout=15.0, granularity=5): + """Start listening to incoming connections.""" + granularity = timedelta(seconds=granularity) + socket_map = asyncore.socket_map + last_event_check = datetime.min + while socket_map: + now = datetime.now() + if now - last_event_check >= granularity: + last_event_check = now + while self.eventqueue: + when, callback = heappop(self.eventqueue) + if now < when: + heappush(self.eventqueue, (when, callback)) + break + callback() + asyncore.poll(timeout) + + def schedule(self, delta, callback): + """Schedule a function to be called. + + @param delta: The number of seconds after which the callback should be + invoked + @param callback: The function to call + """ + when = datetime.now() + timedelta(seconds=delta) + log.debug('Scheduling event %s to run at %s', callback.__name__, when) + + heappush(self.eventqueue, (when, callback)) + + +class Listener(EventLoop, asyncore.dispatcher): """BEEP peer in the listener role. This peer opens a socket for listening to incoming connections. For each @@ -96,6 +137,7 @@ communication with the connected peer. """ def __init__(self, ip, port): + EventLoop.__init__(self) asyncore.dispatcher.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() @@ -103,7 +145,6 @@ self.bind((ip, port)) self.sessions = [] self.profiles = {} # Mapping from URIs to ProfileHandler sub-classes - self.eventqueue = [] log.debug('Listening to connections on %s:%d', ip, port) self.listen(5) @@ -125,48 +166,11 @@ self.sessions.append(Session(self, conn, (ip, port), self.profiles, first_channelno=2)) - def run(self, timeout=15.0, granularity=5): - """Start listening to incoming connections.""" - granularity = timedelta(seconds=granularity) - socket_map = asyncore.socket_map - last_event_check = datetime.min - while socket_map: - now = datetime.now() - if now - last_event_check >= granularity: - last_event_check = now - fired = [] - i = j = 0 - while i < len(self.eventqueue): - when, callback = self.eventqueue[i] - if now >= when: - fired.append(callback) - j = i + 1 - else: - break - i = i + 1 - if fired: - self.eventqueue = self.eventqueue[j:] - for callback in fired: - callback(now) - asyncore.poll(timeout) - - def schedule(self, delta, callback): - """Schedule a function to be called. - - @param delta: The number of seconds after which the callback should be - invoked - @param callback: The function to call - """ - when = datetime.now() + timedelta(seconds=delta) - log.debug('Scheduling event %s to run at %s', callback.__name__, when) - - bisect.insort(self.eventqueue, (when, callback)) - def quit(self): if not self.sessions: self.close() return - def terminate_next_session(when=None): + def terminate_next_session(): session = self.sessions[-1] def handle_ok(): if self.sessions: @@ -233,7 +237,7 @@ """Called by asyncore when an exception is raised.""" cls, value = sys.exc_info()[:2] if cls is TerminateSession: - raise cls, value + raise log.exception(value) def collect_incoming_data(self, data): @@ -314,10 +318,10 @@ seqno, ansno, payload) except ProtocolError, e: - log.exception(e) - if e.local and channel == 0 and msgno is not None: - xml = xmlio.Element('error', code=550)[e] - self.channels[channel].send_err(msgno, Payload(xml)) + log.error(e) + if e.local and msgno is not None: + self.channels[channel].send_err(msgno, + Payload(e.to_xml())) except (AssertionError, IndexError, TypeError, ValueError), e: log.error('Malformed frame', exc_info=True) @@ -345,7 +349,7 @@ close_next_channel() -class Initiator(Session): +class Initiator(EventLoop, Session): """Root class for BEEP peers in the initiating role.""" def __init__(self, ip, port, profiles=None): @@ -358,11 +362,13 @@ `ProfileHandler` sub-class that will be instantiated to handle the communication for that profile """ + EventLoop.__init__(self) Session.__init__(self, profiles=profiles or {}) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) log.debug('Connecting to %s:%s', ip, port) - self.addr = (ip, port) - self.connect(self.addr) + if ip and port: + self.addr = (ip, port) + self.connect(self.addr) def handle_connect(self): """Called by asyncore when the connection is established."""