Mercurial > genshi > mirror
diff examples/trac/trac/tests/notification.py @ 39:93b4dcbafd7b trunk
Copy Trac to main branch.
author | cmlenz |
---|---|
date | Mon, 03 Jul 2006 18:53:27 +0000 |
parents | |
children |
line wrap: on
line diff
new file mode 100644 --- /dev/null +++ b/examples/trac/trac/tests/notification.py @@ -0,0 +1,430 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2005-2006 Edgewall Software +# Copyright (C) 2005-2006 Emmanuel Blot <emmanuel.blot@free.fr> +# All rights reserved. +# +# This software is licensed as described in the file COPYING, which +# you should have received as part of this distribution. The terms +# are also available at http://trac.edgewall.com/license.html. +# +# This software consists of voluntary contributions made by many +# individuals. For exact contribution history, see the revision +# history and logs, available at http://projects.edgewall.com/trac/. +# +# Include a basic SMTP server, based on L. Smithson +# (lsmithson@open-networks.co.uk) extensible Python SMTP Server +# +# This file does not contain unit tests, but provides a set of +# classes to run SMTP notification tests +# + +import socket +import string +import threading +import re +import base64 +import quopri + + +LF = '\n' +CR = '\r' +email_re = re.compile(r"([\w\d_\.\-])+\@(([\w\d\-])+\.)+([\w\d]{2,4})+") +header_re = re.compile(r'^=\?(?P<charset>[\w\d\-]+)\?(?P<code>[qb])\?(?P<value>.*)\?=$') + + +class SMTPServerInterface: + """ + A base class for the imlementation of an application specific SMTP + Server. Applications should subclass this and overide these + methods, which by default do nothing. + + A method is defined for each RFC821 command. For each of these + methods, 'args' is the complete command received from the + client. The 'data' method is called after all of the client DATA + is received. + + If a method returns 'None', then a '250 OK'message is + automatically sent to the client. If a subclass returns a non-null + string then it is returned instead. + """ + + def helo(self, args): + return None + + def mail_from(self, args): + return None + + def rcpt_to(self, args): + return None + + def data(self, args): + return None + + def quit(self, args): + return None + + def reset(self, args): + return None + +# +# Some helper functions for manipulating from & to addresses etc. +# + +def strip_address(address): + """ + Strip the leading & trailing <> from an address. Handy for + getting FROM: addresses. + """ + start = string.index(address, '<') + 1 + end = string.index(address, '>') + return address[start:end] + +def split_to(address): + """ + Return 'address' as undressed (host, fulladdress) tuple. + Handy for use with TO: addresses. + """ + start = string.index(address, '<') + 1 + sep = string.index(address, '@') + 1 + end = string.index(address, '>') + return (address[sep:end], address[start:end],) + + +# +# This drives the state for a single RFC821 message. +# +class SMTPServerEngine: + """ + Server engine that calls methods on the SMTPServerInterface object + passed at construction time. It is constructed with a bound socket + connection to a client. The 'chug' method drives the state, + returning when the client RFC821 transaction is complete. + """ + + ST_INIT = 0 + ST_HELO = 1 + ST_MAIL = 2 + ST_RCPT = 3 + ST_DATA = 4 + ST_QUIT = 5 + + def __init__(self, socket, impl): + self.impl = impl; + self.socket = socket; + self.state = SMTPServerEngine.ST_INIT + + def chug(self): + """ + Chug the engine, till QUIT is received from the client. As + each RFC821 message is received, calls are made on the + SMTPServerInterface methods on the object passed at + construction time. + """ + self.socket.send("220 Welcome to Trac notification test server\r\n") + while 1: + data = '' + completeLine = 0 + # Make sure an entire line is received before handing off + # to the state engine. Thanks to John Hall for pointing + # this out. + while not completeLine: + try: + lump = self.socket.recv(1024); + if len(lump): + data += lump + if (len(data) >= 2) and data[-2:] == '\r\n': + completeLine = 1 + if self.state != SMTPServerEngine.ST_DATA: + rsp, keep = self.do_command(data) + else: + rsp = self.do_data(data) + if rsp == None: + continue + self.socket.send(rsp + "\r\n") + if keep == 0: + self.socket.close() + return + else: + # EOF + return + except socket.error: + return + return + + def do_command(self, data): + """Process a single SMTP Command""" + cmd = data[0:4] + cmd = string.upper(cmd) + keep = 1 + rv = None + if cmd == "HELO": + self.state = SMTPServerEngine.ST_HELO + rv = self.impl.helo(data[5:]) + elif cmd == "RSET": + rv = self.impl.reset(data[5:]) + self.data_accum = "" + self.state = SMTPServerEngine.ST_INIT + elif cmd == "NOOP": + pass + elif cmd == "QUIT": + rv = self.impl.quit(data[5:]) + keep = 0 + elif cmd == "MAIL": + if self.state != SMTPServerEngine.ST_HELO: + return ("503 Bad command sequence", 1) + self.state = SMTPServerEngine.ST_MAIL + rv = self.impl.mail_from(data[5:]) + elif cmd == "RCPT": + if (self.state != SMTPServerEngine.ST_MAIL) and \ + (self.state != SMTPServerEngine.ST_RCPT): + return ("503 Bad command sequence", 1) + self.state = SMTPServerEngine.ST_RCPT + rv = self.impl.rcpt_to(data[5:]) + elif cmd == "DATA": + if self.state != SMTPServerEngine.ST_RCPT: + return ("503 Bad command sequence", 1) + self.state = SMTPServerEngine.ST_DATA + self.data_accum = "" + return ("354 OK, Enter data, terminated with a \\r\\n.\\r\\n", 1) + else: + return ("505 Eh? WTF was that?", 1) + + if rv: + return (rv, keep) + else: + return("250 OK", keep) + + def do_data(self, data): + """ + Process SMTP Data. Accumulates client DATA until the + terminator is found. + """ + self.data_accum = self.data_accum + data + if len(self.data_accum) > 4 and self.data_accum[-5:] == '\r\n.\r\n': + self.data_accum = self.data_accum[:-5] + rv = self.impl.data(self.data_accum) + self.state = SMTPServerEngine.ST_HELO + if rv: + return rv + else: + return "250 OK - Data and terminator. found" + else: + return None + + +class SMTPServer: + """ + A single threaded SMTP Server connection manager. Listens for + incoming SMTP connections on a given port. For each connection, + the SMTPServerEngine is chugged, passing the given instance of + SMTPServerInterface. + """ + + def __init__(self, port): + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._socket.bind(("", port)) + self._socket_service = None + + def serve(self, impl): + while ( self._resume ): + try: + nsd = self._socket.accept() + except socket.error: + return + self._socket_service = nsd[0] + engine = SMTPServerEngine(self._socket_service, impl) + engine.chug() + self._socket_service = None + + def start(self): + self._socket.listen(1) + self._resume = True + + def stop(self): + self._resume = False + + def terminate(self): + if self._socket_service: + # force the blocking socket to stop waiting for data + try: + #self._socket_service.shutdown(2) + self._socket_service.close() + except AttributeError: + # the SMTP server may also discard the socket + pass + self._socket_service = None + if self._socket: + #self._socket.shutdown(2) + self._socket.close() + self._socket = None + +class SMTPServerStore(SMTPServerInterface): + """ + Simple store for SMTP data + """ + + def __init__(self): + self.reset(None) + + def helo(self, args): + self.reset(None) + + def mail_from(self, args): + if args.lower().startswith('from:'): + self.sender = strip_address(args[5:].replace('\r\n','').strip()) + + def rcpt_to(self, args): + if args.lower().startswith('to:'): + rcpt = args[3:].replace('\r\n','').strip() + self.recipients.append(strip_address(rcpt)) + + def data(self, args): + self.message = args + + def quit(self, args): + pass + + def reset(self, args): + self.sender = None + self.recipients = [] + self.message = None + + +class SMTPThreadedServer(threading.Thread): + """ + Run a SMTP server for a single connection, within a dedicated thread + """ + + def __init__(self, port): + self.port = port + self.server = SMTPServer(port) + self.store = SMTPServerStore() + threading.Thread.__init__(self) + + def run(self): + # run from within the SMTP server thread + self.server.serve(impl = self.store) + + def start(self): + # run from the main thread + self.server.start() + threading.Thread.start(self) + + def stop(self): + # run from the main thread + self.server.stop() + # send a message to make the SMTP server quit gracefully + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.connect(('localhost', self.port)) + r = s.send("QUIT\r\n"); + except socket.error: + pass + s.close() + # wait for the SMTP server to complete (for up to 2 secs) + self.join(2.0) + # clean up the SMTP server (and force quit if needed) + self.server.terminate() + + def get_sender(self): + return self.store.sender + + def get_recipients(self): + return self.store.recipients + + def get_message(self): + return self.store.message + + def cleanup(self): + self.store.reset(None) + +def smtp_address(fulladdr): + mo = email_re.search(fulladdr) + if mo: + return mo.group(0) + if start >= 0: + return fulladdr[start+1:-1] + return fulladdr + +def decode_header(header): + """ Decode a MIME-encoded header value """ + mo = header_re.match(header) + # header does not seem to be MIME-encoded + if not mo: + return header + # attempts to decode the hedear, + # following the specified MIME endoding and charset + try: + encoding = mo.group('code').lower() + if encoding == 'q': + val = quopri.decodestring(mo.group('value'), header=True) + elif encoding == 'b': + val = base64.decodestring(mo.group('value')) + else: + raise AssertionError, "unsupported encoding: %s" % encoding + header = unicode(val, mo.group('charset')) + except Exception, e: + raise AssertionError, e + return header + +def parse_smtp_message(msg): + """ Split a SMTP message into its headers and body. + Returns a (headers, body) tuple + We do not use the email/MIME Python facilities here + as they may accept invalid RFC822 data, or data we do not + want to support nor generate """ + headers = {} + lh = None + body = None + # last line does not contain the final line ending + msg += '\r\n' + for line in msg.splitlines(True): + if body != None: + # append current line to the body + if line[-2] == CR: + body += line[0:-2] + body += '\n' + else: + raise AssertionError, "body misses CRLF: %s (0x%x)" \ + % (line, ord(line[-1])) + else: + if line[-2] != CR: + # RFC822 requires CRLF at end of field line + raise AssertionError, "header field misses CRLF: %s (0x%x)" \ + % (line, ord(line[-1])) + # discards CR + line = line[0:-2] + if line.strip() == '': + # end of headers, body starts + body = '' + else: + val = None + if line[0] in ' \t': + # continution of the previous line + if not lh: + # unexpected multiline + raise AssertionError, \ + "unexpected folded line: %s" % line + val = decode_header(line.strip(' \t')) + # appends the current line to the previous one + if not isinstance(headers[lh], tuple): + headers[lh] += val + else: + headers[lh][-1] = headers[lh][-1] + val + else: + # splits header name from value + (h,v) = line.split(':',1) + val = decode_header(v.strip()) + if headers.has_key(h): + if isinstance(headers[h], tuple): + headers[h] += val + else: + headers[h] = (headers[h], val) + else: + headers[h] = val + # stores the last header (for multilines headers) + lh = h + # returns the headers and the message body + return (headers, body)