changeset 18:591a5a836ecc

* {{{beep.Listener}}} now has an event loop (based on code mostly from medusa) * The Bitten master now opens a Trac environment and checks for changes to the repository every 15 seconds. * {{{beep.Profile}}} renamed to {{{beep.ProfileHandler}}}
author cmlenz
date Fri, 17 Jun 2005 09:09:07 +0000
parents 97219c9c70bf
children 9db5f8eddb0d
files bitten/master.py bitten/slave.py bitten/util/beep.py bitten/util/tests/beep.py
diffstat 4 files changed, 88 insertions(+), 20 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/master.py
+++ b/bitten/master.py
@@ -18,21 +18,50 @@
 #
 # Author: Christopher Lenz <cmlenz@gmx.de>
 
-import asyncore
 import getopt
 import logging
 import os.path
 import sys
+import time
+
+from trac.env import Environment
 
 from bitten.util import beep
 from bitten.util.xmlio import Element, parse_xml
 
 
-class BittenProfileHandler(beep.Profile):
+class Master(beep.Listener):
+
+    TRIGGER_INTERVAL = 10
+
+    def __init__(self, env_path, ip, port):
+        beep.Listener.__init__(self, ip, port)
+        self.profiles[BittenProfileHandler.URI] = BittenProfileHandler()
+
+        self.env = Environment(env_path)
+        self.youngest_rev = None
+        self.slaves = {}
+        self.schedule(self.TRIGGER_INTERVAL, self.check_trigger)
+
+    def check_trigger(self, master, when):
+        logging.debug('Checking for build triggers... (%s)'
+                      % time.strftime('%x %X', time.localtime(when)))
+        repos = self.env.get_repository()
+        repos.sync()
+        if repos.youngest_rev != self.youngest_rev:
+            logging.debug('New changesets detected: %s'
+                          % repos.youngest_rev)
+            self.youngest_rev = repos.youngest_rev
+        repos.close()
+        self.schedule(self.TRIGGER_INTERVAL, self.check_trigger)
+
+
+class BittenProfileHandler(beep.ProfileHandler):
     URI = 'http://bitten.cmlenz.net/beep-profile/'
 
-    def __init__(self):
-        beep.Profile.__init__(self)
+    def handle_connect(self):
+        self.master = self.session.listener
+        assert self.master
 
     def handle_msg(self, msgno, msg):
         assert msg.get_content_type() == beep.BEEP_XML
@@ -48,9 +77,11 @@
                     os_family = child.family
                     os_version = child.version
 
+            self.master.slaves[elem.name] = self
+
             rpy = beep.MIMEMessage(Element('ok'), beep.BEEP_XML)
             self.channel.send_rpy(msgno, rpy)
-            logging.info('Registered  slave %s (%s running %s %s [%s])',
+            logging.info('Registered slave %s (%s running %s %s [%s])',
                          elem.name, platform, os, os_version, os_family)
 
 
@@ -58,13 +89,15 @@
     options, args = getopt.getopt(sys.argv[1:], 'p:dvq',
                                   ['port=', 'debug', 'verbose', 'quiet'])
     if len(args) < 1:
-        print>>sys.stderr, 'usage: %s [options] ENV_PATH' % os.path.basename(sys.argv[0])
+        print>>sys.stderr, 'usage: %s [options] ENV_PATH' \
+                           % os.path.basename(sys.argv[0])
         print>>sys.stderr
         print>>sys.stderr, 'Valid options:'
         print>>sys.stderr, '  -p [--port] arg\tport number to use (default: 7633)'
         print>>sys.stderr, '  -q [--quiet]\tprint as little as possible'
         print>>sys.stderr, '  -v [--verbose]\tprint as much as possible'
         sys.exit(2)
+    env_path = args[0]
 
     port = 7633
     loglevel = logging.WARNING
@@ -74,6 +107,7 @@
                 port = int(arg)
             except ValueError:
                 print>>sys.stderr, 'Port must be an integer'
+                sys.exit(2)
         elif opt in ('-d', '--debug'):
             loglevel = logging.DEBUG
         elif opt in ('-v', '--verbose'):
@@ -82,9 +116,8 @@
             loglevel = logging.ERROR
     logging.getLogger().setLevel(loglevel)
 
-    listener = beep.Listener('localhost', port)
-    listener.profiles[BittenProfileHandler.URI] = BittenProfileHandler()
+    master = Master(env_path, 'localhost', port)
     try:
-        asyncore.loop()
+        master.run()
     except KeyboardInterrupt:
         pass
--- a/bitten/slave.py
+++ b/bitten/slave.py
@@ -18,7 +18,6 @@
 #
 # Author: Christopher Lenz <cmlenz@gmx.de>
 
-import asyncore
 import getopt
 import logging
 import os
@@ -46,7 +45,7 @@
                                             handle_ok=self.channel_started)
 
 
-class BittenProfileHandler(beep.Profile):
+class BittenProfileHandler(beep.ProfileHandler):
     """Handles communication on the Bitten profile from the client perspective.
     """
     URI = 'http://bitten.cmlenz.net/beep-profile/'
@@ -90,7 +89,11 @@
 
     host = args[0]
     if len(args) > 1:
-        port = int(args[1])
+        try:
+            port = int(args[1])
+        except ValueError:
+            print>>sys.stderr, 'Port must be an integer'
+            sys.exit(2)
     else:
         port = 7633
 
--- a/bitten/util/beep.py
+++ b/bitten/util/beep.py
@@ -28,6 +28,7 @@
 
 import asynchat
 import asyncore
+import bisect
 from email.Message import Message
 from email.Parser import Parser
 import logging
@@ -63,6 +64,7 @@
         self.set_reuse_addr()
         self.bind((ip, port))
         self.profiles = {}
+        self.eventqueue = []
         logging.debug('Listening to connections on %s:%d', ip, port)
         self.listen(5)
 
@@ -82,14 +84,44 @@
         """Start a new BEEP session."""
         conn, (ip, port) = self.accept()
         logging.debug('Connected to %s:%d', ip, port)
-        Session(conn, (ip, port), self.profiles, first_channelno=2)
+        Session(self, conn, (ip, port), self.profiles, first_channelno=2)
+
+    def run(self, timeout=15.0, granularity=5):
+        socket_map = asyncore.socket_map
+        last_event_check = 0
+        while socket_map:
+            now = int(time.time())
+            if (now - last_event_check) >= granularity:
+                last_event_check = now
+                fired = []
+                # yuck. i want my lisp.
+                i = j = 0
+                while i < len(self.eventqueue):
+                    when, what = self.eventqueue[i]
+                    if now >= when:
+                        fired.append(what)
+                        j = i + 1
+                    else:
+                        break
+                    i = i + 1
+                if fired:
+                    self.eventqueue = self.eventqueue[j:]
+                    for what in fired:
+                        what (self, now)
+            asyncore.poll(timeout)
+
+    def schedule (self, delta, callback):
+        now = int(time.time())
+        bisect.insort(self.eventqueue, (now + delta, callback))
 
 
 class Session(asynchat.async_chat):
     """A BEEP session between two peers."""
 
-    def __init__(self, conn, addr, profiles, first_channelno=1):
+    def __init__(self, listener=None, conn=None, addr=None, profiles=None,
+                 first_channelno=1):
         asynchat.async_chat.__init__(self, conn)
+        self.listener = listener
         self.addr = addr
         self.set_terminator('\r\n')
 
@@ -98,7 +130,7 @@
         self.header = self.payload = None
         
         self.channelno = cycle_through(first_channelno, 2147483647, step=2)
-        self.channels = {0: Channel(self, 0, ManagementProfile())}
+        self.channels = {0: Channel(self, 0, ManagementProfileHandler())}
 
     def handle_connect(self):
         pass
@@ -214,7 +246,7 @@
                          `Profile` instance that will handle the communication
                          for that profile
         """
-        Session.__init__(self, None, None, profiles or {})
+        Session.__init__(self, profiles=profiles or {})
         self.terminated = False
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
         logging.debug('Connecting to %s:%s', ip, port)
@@ -378,7 +410,7 @@
         del self.ansnos[msgno] # dealloc answer numbers for the message
 
 
-class Profile(object):
+class ProfileHandler(object):
     """Abstract base class for handlers of specific BEEP profiles.
     
     Concrete subclasses need to at least implement the `handle_msg()` method,
@@ -411,7 +443,7 @@
         pass
 
 
-class ManagementProfile(Profile):
+class ManagementProfileHandler(ProfileHandler):
     """Implementation of the BEEP management profile."""
 
     def handle_connect(self):
--- a/bitten/util/tests/beep.py
+++ b/bitten/util/tests/beep.py
@@ -15,7 +15,7 @@
                                    payload.strip()))
 
 
-class MockProfile(object):
+class MockProfileHandler(object):
 
     def __init__(self):
         self.handled_messages = []
@@ -36,7 +36,7 @@
 
     def setUp(self):
         self.session = MockSession()
-        self.profile = MockProfile()
+        self.profile = MockProfileHandler()
 
     def test_handle_single_msg_frame(self):
         """
Copyright (C) 2012-2017 Edgewall Software