# HG changeset patch # User cmlenz # Date 1118963052 0 # Node ID 06207499c58c6ced40839fd3f7b35ed35937467f # Parent 1733c601d2f8a58976a8b8b21f37d77225982155 * Use logging in the BEEP core as well as in the master and slave scripts. Closes #4. * The command line options "verbose", "debug" and "quiet" for master and slave now map to log levels. * The slave now handles a connection refused error more gracefully. diff --git a/bitten/master.py b/bitten/master.py --- a/bitten/master.py +++ b/bitten/master.py @@ -20,6 +20,7 @@ import asyncore import getopt +import logging import os.path import sys @@ -49,23 +50,37 @@ rpy = beep.MIMEMessage(Element('ok'), beep.BEEP_XML) self.channel.send_rpy(msgno, rpy) - print 'Connected to %s (%s running %s %s [%s])' \ - % (elem.name, platform, os, os_version, os_family) + logging.info('Registered slave %s (%s running %s %s [%s])', + elem.name, platform, os, os_version, os_family) if __name__ == '__main__': - options, args = getopt.getopt(sys.argv[1:], 'p:', ['port=']) + options, args = getopt.getopt(sys.argv[1:], 'p:dvq', + ['port=', 'debug', 'verbose', 'quiet']) if len(args) < 1: print>>sys.stderr, 'usage: %s [options] ENV_PATH' % os.path.basename(sys.argv[0]) print>>sys.stderr print>>sys.stderr, 'Valid options:' print>>sys.stderr, ' -p [--port] arg\tport number to use (default: 7633)' + print>>sys.stderr, ' -q [--quiet]\tprint as little as possible' + print>>sys.stderr, ' -v [--verbose]\tprint as much as possible' sys.exit(2) - if len(args) > 1: - port = int(args[1]) - else: - port = 7633 + port = 7633 + loglevel = logging.WARNING + for opt, arg in options: + if opt in ('-p', '--port'): + try: + port = int(arg) + except ValueError: + print>>sys.stderr, 'Port must be an integer' + elif opt in ('-d', '--debug'): + loglevel = logging.DEBUG + elif opt in ('-v', '--verbose'): + loglevel = logging.INFO + elif opt in ('-q', '--quiet'): + loglevel = logging.ERROR + logging.getLogger().setLevel(loglevel) listener = beep.Listener('localhost', port) listener.profiles[BittenProfileHandler.URI] = BittenProfileHandler() diff --git a/bitten/slave.py b/bitten/slave.py --- a/bitten/slave.py +++ b/bitten/slave.py @@ -20,6 +20,7 @@ import asyncore import getopt +import logging import os import sys import time @@ -38,9 +39,11 @@ self.channelno = channelno def greeting_received(self, profiles): - if BittenProfileHandler.URI in profiles: - self.channels[0].profile.send_start([BittenProfileHandler], - handle_ok=self.channel_started) + if BittenProfileHandler.URI not in profiles: + logging.error('Peer does not support Bitten profile') + raise beep.TerminateSession, 'Peer does not support Bitten profile' + self.channels[0].profile.send_start([BittenProfileHandler], + handle_ok=self.channel_started) class BittenProfileHandler(beep.Profile): @@ -51,21 +54,20 @@ def handle_connect(self): """Register with the build master.""" sysname, nodename, release, version, machine = os.uname() - print 'Registering with build master as %s' % nodename + logging.info('Registering with build master as %s', nodename) register = Element('register', name=nodename)[ Element('platform')[machine], Element('os', family=os.name, version=release)[sysname] ] def handle_reply(cmd, msgno, msg): - if cmd == 'RPY': - print 'Registration successful' - else: + if cmd == 'ERR': if msg.get_content_type() == beep.BEEP_XML: elem = parse_xml(msg.get_payload()) if elem.tagname == 'error': raise beep.TerminateSession, \ '%s (%s)' % (elem.gettext(), elem.code) raise beep.TerminateSession, 'Registration failed!' + logging.info('Registration successful') self.channel.send_msg(beep.MIMEMessage(register, beep.BEEP_XML), handle_reply) @@ -75,11 +77,13 @@ if __name__ == '__main__': - options, args = getopt.getopt(sys.argv[1:], 'vq', ['verbose', 'qiuet']) + options, args = getopt.getopt(sys.argv[1:], 'dvq', + ['debug', 'verbose', 'quiet']) if len(args) < 1: print>>sys.stderr, 'Usage: %s [options] host [port]' % sys.argv[0] print>>sys.stderr print>>sys.stderr, 'Valid options:' + print>>sys.stderr, ' -d [--debug]\tenable debugging output' print>>sys.stderr, ' -q [--quiet]\tprint as little as possible' print>>sys.stderr, ' -v [--verbose]\tprint as much as possible' sys.exit(2) @@ -90,16 +94,19 @@ else: port = 7633 - verbose = False - quiet = False + loglevel = logging.WARNING for opt, arg in options: - if opt in ('-v', '--verbose'): - verbose = True + if opt in ('-d', '--debug'): + loglevel = logging.DEBUG + elif opt in ('-v', '--verbose'): + loglevel = logging.INFO elif opt in ('-q', '--quiet'): - quiet = True + loglevel = logging.ERROR + logger = logging.getLogger() + logger.setLevel(loglevel) slave = Slave(host, port) try: slave.run() except beep.TerminateSession, e: - print 'Session terminated:', e + print>>sys.stderr, 'Session terminated:', e diff --git a/bitten/util/beep.py b/bitten/util/beep.py --- a/bitten/util/beep.py +++ b/bitten/util/beep.py @@ -30,6 +30,7 @@ import asyncore from email.Message import Message from email.Parser import Parser +import logging import socket import sys import time @@ -38,7 +39,6 @@ __all__ = ['Listener', 'Initiator', 'Profile'] - BEEP_XML = 'application/beep+xml' @@ -63,6 +63,7 @@ self.set_reuse_addr() self.bind((ip, port)) self.profiles = {} + logging.debug('Listening to connections on %s:%d', ip, port) self.listen(5) def writable(self): @@ -79,9 +80,9 @@ def handle_accept(self): """Start a new BEEP session.""" - conn, addr = self.accept() - print 'Connected to %s:%s' % addr - Session(conn, addr, self.profiles, first_channelno=2) + conn, (ip, port) = self.accept() + logging.debug('Connected to %s:%d', ip, port) + Session(conn, (ip, port), self.profiles, first_channelno=2) class Session(asynchat.async_chat): @@ -107,7 +108,8 @@ t, v = sys.exc_info()[:2] if t is TerminateSession: raise t, v - asynchat.async_chat.handle_error(self) + logging.exception(v) + self.close() def collect_incoming_data(self, data): """Called by async_chat when data is received. @@ -138,6 +140,8 @@ except ValueError: # TODO: Malformed frame... should we terminate the session # here? + logging.error('Malformed frame header: [%s]', + ' '.join(self.header)) self.header = None return if size == 0: @@ -164,6 +168,7 @@ This parses the frame header and decides which channel to pass it to. """ + logging.debug('Handling frame [%s]', ' '.join(header)) msgno = None channel = None try: @@ -179,6 +184,7 @@ self.channels[channel].handle_frame(cmd, msgno, more, seqno, ansno, payload) except (ValueError, TypeError, ProtocolError), e: + logging.exception(e) if channel == 0 and msgno is not None: self.channels[0].profile.send_error(msgno, 550, e) @@ -191,6 +197,7 @@ assert ansno is not None headerbits.append(ansno) header = ' '.join([str(hb) for hb in headerbits]) + logging.debug('Sending frame [%s]', header) self.push('\r\n'.join((header, payload, 'END', ''))) @@ -210,7 +217,11 @@ Session.__init__(self, None, None, profiles or {}) self.terminated = False self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect((ip, port)) + logging.debug('Connecting to %s:%s', ip, port) + try: + self.connect((ip, port)) + except socket.error, e: + raise TerminateSession, 'Connection to %s:%d failed' % ip, port def handle_close(self): self.terminated = True @@ -229,9 +240,9 @@ while not self.terminated: try: asyncore.loop() - print 'Normal exit' self.terminated = True - except (KeyboardInterrupt, TerminateSession): + except (KeyboardInterrupt, TerminateSession), e: + logging.info('Terminating session') self._quit() time.sleep(.25) @@ -243,6 +254,8 @@ else: self._quit() def handle_error(code, message): + logging.error('Peer refused to close channel %d: %s (%d)', + channelno, message, code) raise ProtocolError, '%s (%d)' % (message, code) self.channels[0].profile.send_close(channelno, handle_ok=handle_ok, handle_error=handle_error) @@ -403,8 +416,10 @@ def handle_connect(self): """Send a greeting reply directly after connecting to the peer.""" + profile_uris = self.session.profiles.keys() + logging.debug('Send greeting with profiles %s', profile_uris) greeting = Element('greeting')[ - [Element('profile', uri=k) for k in self.session.profiles.keys()] + [Element('profile', uri=k) for k in profile_uris] ] self.channel.send_rpy(0, MIMEMessage(greeting, BEEP_XML)) @@ -415,8 +430,8 @@ if elem.tagname == 'start': for profile in elem['profile']: if profile.uri in self.session.profiles.keys(): - print 'Start channel %s for profile <%s>' % (elem.number, - profile.uri) + logging.debug('Start channel %s for profile <%s>', + elem.number, profile.uri) channel = Channel(self.session, int(elem.number), self.session.profiles[profile.uri]) self.session.channels[int(elem.number)] = channel @@ -464,44 +479,63 @@ assert message.get_content_type() == BEEP_XML elem = parse_xml(message.get_payload()) assert elem.tagname == 'error' - print 'Received error in response to message #%d: %s (%s)' \ - % (msgno, elem.gettext(), elem.code) + logging.warning('Received error in response to message #%d: %s (%s)', + msgno, elem.gettext(), elem.code) def send_close(self, channelno=0, code=200, handle_ok=None, handle_error=None): def handle_reply(cmd, msgno, message): if cmd == 'RPY': + logging.debug('Channel %d closed', channelno) del self.session.channels[channelno] if handle_ok is not None: handle_ok() if not self.session.channels: + logging.debug('Session terminated') self.session.close() elif cmd == 'ERR': elem = parse_xml(message.get_payload()) + text = elem.gettext() + code = int(elem.code) + logging.debug('Peer refused to start channel %d: %s (%d)', + channelno, text, code) if handle_error is not None: - handle_error(int(elem.code), elem.gettext()) + handle_error(code, text) + + logging.debug('Requesting closure of channel %d', channelno) xml = Element('close', number=channelno, code=code) return self.channel.send_msg(MIMEMessage(xml, BEEP_XML), handle_reply) def send_error(self, msgno, code, message=''): + logging.warning('%s (%d)', message, code) xml = Element('error', code=code)[message] self.channel.send_err(msgno, MIMEMessage(xml, BEEP_XML)) def send_start(self, profiles, handle_ok=None, handle_error=None): channelno = self.session.channelno.next() def handle_reply(cmd, msgno, message): - if handle_ok is not None and cmd == 'RPY': + if cmd == 'RPY': elem = parse_xml(message.get_payload()) for profile in [p for p in profiles if p.URI == elem.uri]: + logging.debug('Channel %d started with profile %s', + channelno, elem.uri) self.session.channels[channelno] = Channel(self.session, channelno, profile()) break - handle_ok(channelno, elem.uri) - if handle_error is not None and cmd == 'ERR': + if handle_ok is not None: + handle_ok(channelno, elem.uri) + elif cmd == 'ERR': elem = parse_xml(message.get_payload()) - handle_error(int(elem.code), elem.gettext()) + text = elem.gettext() + code = int(elem.code) + logging.debug('Peer refused to start channel %d: %s (%d)', + channelno, text, code) + if handle_error is not None: + handle_error(code, text) + logging.debug('Requesting start of channel %d with profiles %s', + channelno, [profile.URI for profile in profiles]) xml = Element('start', number=channelno)[ [Element('profile', uri=profile.URI) for profile in profiles] ]