view examples/trac/trac/ticket/tests/notification.py @ 39:93b4dcbafd7b trunk

Copy Trac to main branch.
author cmlenz
date Mon, 03 Jul 2006 18:53:27 +0000
parents
children
line wrap: on
line source
# -*- coding: utf-8 -*-
#
# Copyright (C) 2005-2006 Edgewall Software
# Copyright (C) 2005-2006 Emmanuel Blot <emmanuel.blot@free.fr>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://trac.edgewall.com/license.html.
#
# This software consists of voluntary contributions made by many
# individuals. For exact contribution history, see the revision
# history and logs, available at http://projects.edgewall.com/trac/.
#
# Include a basic SMTP server, based on L. Smithson 
# (lsmithson@open-networks.co.uk) extensible Python SMTP Server
#

from trac.config import Configuration
from trac.core import TracError
from trac.ticket.model import Ticket
from trac.ticket.notification import TicketNotifyEmail
from trac.test import EnvironmentStub, Mock
from trac.tests.notification import SMTPThreadedServer, parse_smtp_message, \
                                    smtp_address

import unittest
import re
import base64
import quopri
import time


SMTP_TEST_PORT = 8225
MAXBODYWIDTH = 76
notifysuite = None


class NotificationTestCase(unittest.TestCase):
    """Notification test cases that send email over SMTP"""
    
    def setUp(self):
        self.env = EnvironmentStub(default_data=True)
        self.env.config.set('project',      'name', 'TracTest')
        self.env.config.set('notification', 'smtp_enabled', 'true')
        self.env.config.set('notification', 'always_notify_owner', 'true')
        self.env.config.set('notification', 'always_notify_reporter', 'true')
        self.env.config.set('notification', 'smtp_always_cc', 
                            'joe.user@example.net, joe.bar@example.net')
        self.env.config.set('notification', 'use_public_cc', 'true')
        self.env.config.set('notification', 'smtp_port', "%d" % SMTP_TEST_PORT)
        self.env.config.set('notification', 'smtp_server','localhost')
        self.req = Mock(href=self.env.href, abs_href=self.env.abs_href)

    def tearDown(self):
        """Signal the notification test suite that a test is over"""
        notifysuite.tear_down()

    def test_recipients(self):
        """Validate To/Cc recipients"""
        ticket = Ticket(self.env)
        ticket['reporter'] = '"Joe User" <joe.user@example.org>'
        ticket['owner']    = 'joe.user@example.net'
        ticket['cc']       = 'joe.user@example.com, joe.bar@example.org, ' \
                             'joe.bar@example.net'
        ticket['summary'] = 'Foo'
        ticket.insert()
        tn = TicketNotifyEmail(self.env)
        tn.notify(ticket, self.req, newticket=True)
        recipients = notifysuite.smtpd.get_recipients()
        # checks there is no duplicate in the recipient list
        rcpts = []
        for r in recipients:
            self.failIf(r in rcpts)
            rcpts.append(r)
        # checks that all cc recipients have been notified
        cc_list = self.env.config.get('notification', 'smtp_always_cc')
        cc_list = "%s, %s" % (cc_list, ticket['cc'])
        for r in cc_list.replace(',', ' ').split():
            self.failIf(r not in recipients)
        # checks that owner has been notified
        self.failIf(smtp_address(ticket['owner']) not in recipients)
        # checks that reporter has been notified
        self.failIf(smtp_address(ticket['reporter']) not in recipients)

    def test_no_recipient(self):
        """Validate no recipient case"""
        self.env.config.set('notification', 'smtp_always_cc', '')
        ticket = Ticket(self.env)
        ticket['reporter'] = 'anonymous'
        ticket['summary'] = 'Foo'
        ticket.insert()
        tn = TicketNotifyEmail(self.env)
        tn.notify(ticket, self.req, newticket=True)
        sender = notifysuite.smtpd.get_sender()
        recipients = notifysuite.smtpd.get_recipients()
        message = notifysuite.smtpd.get_message()
        # checks that no message has been sent
        self.failIf(recipients)
        self.failIf(sender)
        self.failIf(message)

    def test_cc_only(self):
        """Validate notifications w/o explicit recipients but Cc: 
           are actually sent. Non-regression test for #3101"""
        ticket = Ticket(self.env)
        ticket['summary'] = 'Foo'
        ticket.insert()
        tn = TicketNotifyEmail(self.env)
        tn.notify(ticket, self.req, newticket=True)
        recipients = notifysuite.smtpd.get_recipients()
        # checks that all cc recipients have been notified
        cc_list = self.env.config.get('notification', 'smtp_always_cc')
        for r in cc_list.replace(',', ' ').split():
            self.failIf(r not in recipients)

    def test_structure(self):
        """Validate basic SMTP message structure (headers, body)"""
        ticket = Ticket(self.env)
        ticket['reporter'] = '"Joe User" <joe.user@example.org>'
        ticket['owner']    = 'joe.user@example.net'
        ticket['cc']       = 'joe.user@example.com, joe.bar@example.org, ' \
                             'joe.bar@example.net'
        ticket['summary'] = 'This is a summary'
        ticket.insert()
        tn = TicketNotifyEmail(self.env)
        tn.notify(ticket, self.req, newticket=True)
        message = notifysuite.smtpd.get_message()
        (headers, body) = parse_smtp_message(message)
        # checks for header existence
        self.failIf(not headers)
        # checks for body existance
        self.failIf(not body)
        # checks for expected headers
        self.failIf('Date' not in headers)
        self.failIf('Subject' not in headers)
        self.failIf('Message-ID' not in headers)
        self.failIf('From' not in headers)
        self.failIf('Sender' not in headers)

    def test_date(self):
        """Validate date format 
           Date format hould be compliant with RFC822,
           we do not support 'military' format""" 
        date_str = r"^((?P<day>\w{3}),\s*)*(?P<dm>\d{2})\s+" \
                   r"(?P<month>\w{3})\s+(?P<year>200\d)\s+" \
                   r"(?P<hour>\d{2}):(?P<min>[0-5][0-9])" \
                   r"(:(?P<sec>[0-5][0-9]))*\s" \
                   r"((?P<tz>\w{2,3})|(?P<offset>[+\-]\d{4}))$"
        date_re = re.compile(date_str)
        # python time module does not detect incorrect time values
        days = ['Mon','Tue','Wed','Thu','Fri','Sat','Sun']
        months = ['Jan','Feb','Mar','Apr','May','Jun', \
                  'Jul','Aug','Sep','Oct','Nov','Dec']
        tz = ['UT','GMT','EST','EDT','CST','CDT','MST','MDT''PST','PDT']
        ticket = Ticket(self.env)
        ticket['reporter'] = '"Joe User" <joe.user@example.org>'
        ticket['summary'] = 'This is a summary'
        ticket.insert()
        tn = TicketNotifyEmail(self.env)
        tn.notify(ticket, self.req, newticket=True)
        message = notifysuite.smtpd.get_message()
        (headers, body) = parse_smtp_message(message)
        self.failIf('Date' not in headers)
        mo = date_re.match(headers['Date'])
        self.failIf(not mo)
        if mo.group('day'):
            self.failIf(mo.group('day') not in days)
        self.failIf(int(mo.group('dm')) not in range(1,32))
        self.failIf(mo.group('month') not in months)
        self.failIf(int(mo.group('hour')) not in range(0,24))
        if mo.group('tz'):
            self.failIf(mo.group('tz') not in tz)

    def test_bcc_privacy(self):
        """Validate visibility of recipients"""
        def run_bcc_feature(public):
            # CC list should be private
            self.env.config.set('notification', 'use_public_cc',
                                public and 'true' or 'false')
            self.env.config.set('notification', 'smtp_always_bcc', 
                                'joe.foobar@example.net')
            ticket = Ticket(self.env)
            ticket['reporter'] = '"Joe User" <joe.user@example.org>'
            ticket['summary'] = 'This is a summary'
            ticket.insert()
            tn = TicketNotifyEmail(self.env)
            tn.notify(ticket, self.req, newticket=True)
            message = notifysuite.smtpd.get_message()
            (headers, body) = parse_smtp_message(message)
            if public:
                # Msg should have a To list
                self.failIf('To' not in headers)
                # Extract the list of 'To' recipients from the message
                to = [rcpt.strip() for rcpt in headers['To'].split(',')]
            else:
                # Msg should not have a To list
                self.failIf('To' in headers)
                # Extract the list of 'To' recipients from the message
                to = []            
            # Extract the list of 'Cc' recipients from the message
            cc = [rcpt.strip() for rcpt in headers['Cc'].split(',')]
            # Extract the list of the actual SMTP recipients
            rcptlist = notifysuite.smtpd.get_recipients()
            # Build the list of the expected 'Cc' recipients 
            ccrcpt = self.env.config.get('notification', 'smtp_always_cc')
            cclist = [ccr.strip() for ccr in ccrcpt.split(',')]
            for rcpt in cclist:
                # Each recipient of the 'Cc' list should appear in the 'Cc' header
                self.failIf(rcpt not in cc)
                # Check the message has actually been sent to the recipients
                self.failIf(rcpt not in rcptlist)
            # Build the list of the expected 'Bcc' recipients 
            bccrcpt = self.env.config.get('notification', 'smtp_always_bcc')
            bcclist = [bccr.strip() for bccr in bccrcpt.split(',')]
            for rcpt in bcclist:
                # Check none of the 'Bcc' recipients appears in the 'To' header
                self.failIf(rcpt in to)
                # Check the message has actually been sent to the recipients
                self.failIf(rcpt not in rcptlist)
        run_bcc_feature(True)
        run_bcc_feature(False)

    def test_short_login(self):
        """Validate email addresses without a FQDN"""
        def _test_short_login(enabled):
            ticket = Ticket(self.env)
            ticket['reporter'] = 'joeuser'
            ticket['summary'] = 'This is a summary'
            ticket.insert()
            # Be sure that at least one email address is valid, so that we 
            # send a notification even if other addresses are not valid
            self.env.config.set('notification', 'smtp_always_cc', \
                                'joe.bar@example.net')
            if enabled:
                self.env.config.set('notification', 'use_short_addr', 'true')
            tn = TicketNotifyEmail(self.env)
            tn.notify(ticket, self.req, newticket=True)
            message = notifysuite.smtpd.get_message()
            (headers, body) = parse_smtp_message(message)
            # Msg should not have a 'To' header
            if not enabled:
                self.failIf('To' in headers)
            else:
                tolist = [addr.strip() for addr in headers['To'].split(',')]
            # Msg should have a 'Cc' field
            self.failIf('Cc' not in headers)
            cclist = [addr.strip() for addr in headers['Cc'].split(',')]
            if enabled:
                # Msg should be delivered to the reporter
                self.failIf(ticket['reporter'] not in tolist)
            else:
                # Msg should not be delivered to joeuser
                self.failIf(ticket['reporter'] in cclist)
            # Msg should still be delivered to the always_cc list
            self.failIf(self.env.config.get('notification', 'smtp_always_cc') \
                        not in cclist)
        # Validate with and without the short addr option enabled
        for enable in [False, True]:
            _test_short_login(enable)

    def test_default_domain(self):
        """Validate support for default domain name"""
        def _test_default_domain(enabled):
            self.env.config.set('notification', 'always_notify_owner', 'false')
            self.env.config.set('notification', 'always_notify_reporter', 'false')
            self.env.config.set('notification', 'smtp_always_cc', '')
            ticket = Ticket(self.env)
            ticket['cc'] = 'joenodom, joewithdom@example.com'
            ticket['summary'] = 'This is a summary'
            ticket.insert()
            # Be sure that at least one email address is valid, so that we 
            # send a notification even if other addresses are not valid
            self.env.config.set('notification', 'smtp_always_cc', \
                                'joe.bar@example.net')
            if enabled:
                self.env.config.set('notification', 'smtp_default_domain', 'example.org')
            tn = TicketNotifyEmail(self.env)
            tn.notify(ticket, self.req, newticket=True)
            message = notifysuite.smtpd.get_message()
            (headers, body) = parse_smtp_message(message)
            # Msg should always have a 'Cc' field
            self.failIf('Cc' not in headers)
            cclist = [addr.strip() for addr in headers['Cc'].split(',')]
            self.failIf('joewithdom@example.com' not in cclist)
            self.failIf('joe.bar@example.net' not in cclist)
            if not enabled:
                self.failIf(len(cclist) != 2)
                self.failIf('joenodom' in cclist)
            else:
                self.failIf(len(cclist) != 3)
                self.failIf('joenodom@example.org' not in cclist)

        # Validate with and without a default domain
        for enable in [False, True]:
            _test_default_domain(enable)

    def test_email_map(self):
        """Validate login-to-email map"""
        self.env.config.set('notification', 'always_notify_owner', 'false')
        self.env.config.set('notification', 'always_notify_reporter', 'true')
        self.env.config.set('notification', 'smtp_always_cc', 'joe@example.com')
        self.env.known_users = [('joeuser', 'Joe User', 'user-joe@example.com')]
        ticket = Ticket(self.env)
        ticket['reporter'] = 'joeuser'
        ticket['summary'] = 'This is a summary'
        ticket.insert()
        tn = TicketNotifyEmail(self.env)
        tn.notify(ticket, self.req, newticket=True)
        message = notifysuite.smtpd.get_message()
        (headers, body) = parse_smtp_message(message)
        # Msg should always have a 'To' field
        self.failIf('To' not in headers)
        tolist = [addr.strip() for addr in headers['To'].split(',')]
        # 'To' list should have been resolved to the real email address
        self.failIf('user-joe@example.com' not in tolist)
        self.failIf('joeuser' in tolist)

    def test_multiline_header(self):
        """Validate encoded headers split into multiple lines"""
        self.env.config.set('notification','mime_encoding', 'qp')
        ticket = Ticket(self.env)
        ticket['reporter'] = 'joe.user@example.org'
        # Forces non-ascii characters
        ticket['summary'] = u'A_very %s súmmäry' % u' '.join(['long'] * 20)
        ticket.insert()
        tn = TicketNotifyEmail(self.env)
        tn.notify(ticket, self.req, newticket=True)
        message = notifysuite.smtpd.get_message()
        (headers, body) = parse_smtp_message(message)
        # Discards the project name & ticket number
        subject = headers['Subject']
        summary = subject[subject.find(':')+2:]
        self.failIf(ticket['summary'] != summary)

    def test_mimebody_b64(self):
        """Validate MIME Base64/utf-8 encoding"""
        self.env.config.set('notification','mime_encoding', 'base64')
        ticket = Ticket(self.env)
        ticket['reporter'] = 'joe.user@example.org'
        ticket['summary'] = u'This is a long enough summary to cause Trac to' \
                            u'generate a multi-line súmmäry'
        ticket.insert()
        self._validate_mimebody((base64, 'base64', 'utf-8'), \
                                ticket, True)

    def test_mimebody_qp(self):
        """Validate MIME QP/utf-8 encoding"""
        self.env.config.set('notification','mime_encoding', 'qp')
        ticket = Ticket(self.env)
        ticket['reporter'] = 'joe.user@example.org'
        ticket['summary'] = u'This is a long enough summary to cause Trac to' \
                            u'generate a multi-line súmmäry'
        ticket.insert()
        self._validate_mimebody((quopri, 'quoted-printable', 'utf-8'), \
                                ticket, True)

    def test_mimebody_none(self):
        """Validate MIME None/ascii encoding"""
        self.env.config.set('notification','mime_encoding', 'none')
        ticket = Ticket(self.env)
        ticket['reporter'] = 'joe.user@example.org'
        ticket['summary'] = u'This is a summary'
        ticket.insert()
        self._validate_mimebody((None, '7bit', 'ascii'), \
                                ticket, True)

    def test_updater(self):
        """Validate no-self-notification option"""
        def _test_updater(disable):
            if disable:
                self.env.config.set('notification','always_notify_updater', 'false')
            ticket = Ticket(self.env)
            ticket['reporter'] = 'joe.user@example.org'
            ticket['summary'] = u'This is a súmmäry'
            ticket['cc'] = 'joe.bar@example.com'
            ticket.insert()
            ticket['component'] = 'dummy'
            now = time.time()
            ticket.save_changes('joe.bar2@example.com', 'This is a change', when=now)
            tn = TicketNotifyEmail(self.env)
            tn.notify(ticket, self.req, newticket=False, modtime=now)
            message = notifysuite.smtpd.get_message()
            (headers, body) = parse_smtp_message(message)
            # checks for header existence
            self.failIf(not headers)
            # checks for updater in the 'To' recipient list
            self.failIf('To' not in headers)
            tolist = [addr.strip() for addr in headers['To'].split(',')]
            if disable:
                self.failIf('joe.bar2@example.com' in tolist)
            else:
                self.failIf('joe.bar2@example.com' not in tolist)

        # Validate with and without a default domain
        for disable in [False, True]:
            _test_updater(disable)

    def _validate_mimebody(self, mime, ticket, newtk):
        """Validate the body of a ticket notification message"""
        (mime_decoder, mime_name, mime_charset) = mime
        tn = TicketNotifyEmail(self.env)
        tn.notify(ticket, self.req, newticket=newtk)
        message = notifysuite.smtpd.get_message()
        (headers, body) = parse_smtp_message(message)
        self.failIf('MIME-Version' not in headers)
        self.failIf('Content-Type' not in headers)
        self.failIf('Content-Transfer-Encoding' not in headers)
        self.failIf(not re.compile(r"1.\d").match(headers['MIME-Version']))
        type_re = re.compile(r'^text/plain;\scharset="([\w\-\d]+)"$')
        charset = type_re.match(headers['Content-Type'])
        self.failIf(not charset)
        charset = charset.group(1)
        self.assertEqual(charset, mime_charset)
        self.assertEqual(headers['Content-Transfer-Encoding'], mime_name)
        # checks the width of each body line
        for line in body.splitlines():
            self.failIf(len(line) > MAXBODYWIDTH)
        # attempts to decode the body, following the specified MIME endoding 
        # and charset
        try:
            if mime_decoder:
                body = mime_decoder.decodestring(body)
            body = unicode(body, charset)
        except Exception, e:
            raise AssertionError, e
        # now processes each line of the body
        bodylines = body.splitlines()
        # body starts with a summary line, prefixed with the ticket number
        # #<n>: summary
        (tknum, summary) = bodylines[0].split(' ', 1)
        self.assertEqual(tknum[0], '#')
        try:
            tkid = int(tknum[1:-1])
            self.assertEqual(tkid, 1)
        except ValueError:
            raise AssertionError, "invalid ticket number"
        self.assertEqual(tknum[-1], ':')
        self.assertEqual(summary, ticket['summary'])
        # next step: checks the banner appears right after the summary
        banner_delim_re = re.compile(r'^\-+\+\-+$')
        self.failIf(not banner_delim_re.match(bodylines[1]))
        banner = True
        footer = None
        props = {}
        for line in bodylines[2:]:
            # detect end of banner
            if banner_delim_re.match(line):
                banner = False
                continue
            if banner:
                # parse banner and fill in a property dict
                properties = line.split('|')
                self.assertEqual(len(properties), 2)
                for prop in properties:
                    if prop.strip() == '':
                        continue
                    (k, v) = prop.split(':')
                    props[k.strip().lower()] = v.strip()
            # detect footer marker (weak detection)
            if not footer:
                if line.strip() == '--':
                    footer = 0
                    continue
            # check footer
            if footer != None:
                footer += 1
                # invalid footer detection
                self.failIf(footer > 3)
                # check ticket link
                if line[:11] == 'Ticket URL:':
                    self.assertEqual(line[12:].strip(), \
                                     "<%s>" % ticket['link'].strip())
                # note project title / URL are not validated yet

        # ticket properties which are not expected in the banner
        xlist = ['summary', 'description', 'link', 'comment']
        # check banner content (field exists, msg value matches ticket value)
        for p in [prop for prop in ticket.values.keys() if prop not in xlist]:
            self.failIf(not props.has_key(p))
            self.failIf(props[p] != ticket[p])


class NotificationTestSuite(unittest.TestSuite):
    """Thin test suite wrapper to start and stop the SMTP test server"""

    def __init__(self):
        """Start the local SMTP test server"""
        unittest.TestSuite.__init__(self)
        self.smtpd = SMTPThreadedServer(SMTP_TEST_PORT)
        self.smtpd.start()
        self.addTest(unittest.makeSuite(NotificationTestCase, 'test'))
        self.remaining = self.countTestCases()

    def tear_down(self):
        """Reset the local SMTP test server"""
        self.smtpd.cleanup()
        self.remaining = self.remaining-1
        if self.remaining > 0:
            return
        # stop the SMTP test server when all tests have been completed
        self.smtpd.stop()

def suite():
    global notifysuite
    if not notifysuite:
        notifysuite = NotificationTestSuite()
    return notifysuite

if __name__ == '__main__':
    unittest.TextTestRunner(verbosity=2).run(suite())
Copyright (C) 2012-2017 Edgewall Software