view examples/trac/trac/attachment.py @ 39:93b4dcbafd7b trunk

Copy Trac to main branch.
author cmlenz
date Mon, 03 Jul 2006 18:53:27 +0000
parents
children 2cb5b54d87ff
line wrap: on
line source
# -*- coding: utf-8 -*-
#
# Copyright (C) 2003-2005 Edgewall Software
# Copyright (C) 2003-2005 Jonas Borgström <jonas@edgewall.com>
# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://trac.edgewall.com/license.html.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at http://projects.edgewall.com/trac/.
#
# Author: Jonas Borgström <jonas@edgewall.com>
#         Christopher Lenz <cmlenz@gmx.de>

import os
import re
import shutil
import time
import unicodedata

from trac import perm, util
from trac.config import BoolOption, IntOption
from trac.core import *
from trac.env import IEnvironmentSetupParticipant
from trac.mimeview import *
from trac.util import get_reporter_id, create_unique_file
from trac.util.datefmt import format_datetime, pretty_timedelta
from trac.util.markup import Markup, html
from trac.util.text import unicode_quote, unicode_unquote, pretty_size
from trac.web import HTTPBadRequest, IRequestHandler
from trac.web.chrome import add_link, add_stylesheet, INavigationContributor
from trac.wiki.api import IWikiSyntaxProvider
from trac.wiki.formatter import wiki_to_html, wiki_to_oneliner


class InvalidAttachment(TracError):
    """Exception raised when attachment validation fails.

    `AttachmentModule._do_save` raises it as soon as one of the registered
    `IAttachmentManipulator` components reports a problem with an uploaded
    attachment.
    """


class IAttachmentChangeListener(Interface):
    """Extension point interface for components that require notification when
    attachments are created or deleted.

    The callbacks are invoked by `Attachment.insert()` and
    `Attachment.delete()` once the change has been made.
    """
    def attachment_added(attachment):
        """Called when an attachment is added.

        `attachment` is the `Attachment` instance that was just stored.
        """

    def attachment_deleted(attachment):
        """Called when an attachment is deleted.

        `attachment` is the `Attachment` instance that was just removed.
        """


class IAttachmentManipulator(Interface):
    """Extension point interface for components that need to manipulate
    attachments.

    Unlike change listeners, a manipulator can reject changes being committed
    to the database."""
    def prepare_attachment(req, attachment, fields):
        """Not currently called, but should be provided for future
        compatibility."""

    def validate_attachment(req, attachment):
        """Validate an attachment after upload but before being stored in Trac
        environment.

        `req` is the request performing the upload and `attachment` the
        `Attachment` instance about to be inserted.

        Must return a list of `(field, message)` tuples, one for each problem
        detected. `field` can be any of `description`, `username`, `filename`,
        `content`, or `None` to indicate an overall problem with the
        attachment. Therefore, a return value of `[]` means everything is
        OK."""


class Attachment(object):
    """Model of a file attached to a parent object (e.g. a ticket or a wiki
    page).

    The metadata is kept in the `attachment` database table, the content in
    a file below the `attachments` directory of the environment.
    """

    def __init__(self, env, parent_type, parent_id, filename=None, db=None):
        """Create an attachment object for the given parent.

        If `filename` is given, the metadata of that existing attachment is
        loaded from the database, and a `TracError` is raised if there is no
        such attachment. Otherwise a blank attachment is created, which can
        be stored later using `insert()`.
        """
        self.env = env
        self.parent_type = parent_type
        self.parent_id = unicode(parent_id)
        if filename:
            self._fetch(filename, db)
        else:
            self.filename = None
            self.description = None
            self.size = None
            self.time = None
            self.author = None
            self.ipnr = None

    def _fetch(self, filename, db=None):
        """Load the metadata of the attachment named `filename`.

        Raises a `TracError` if the parent has no attachment of that name.
        """
        if not db:
            db = self.env.get_db_cnx()
        cursor = db.cursor()
        cursor.execute("SELECT filename,description,size,time,author,ipnr "
                       "FROM attachment WHERE type=%s AND id=%s "
                       "AND filename=%s ORDER BY time",
                       (self.parent_type, unicode(self.parent_id), filename))
        row = cursor.fetchone()
        cursor.close()
        if not row:
            # `title` reads `self.filename`, so it has to be set before the
            # error message is built
            self.filename = filename
            raise TracError('Attachment %s does not exist.' % (self.title),
                            'Invalid Attachment')
        self.filename = row[0]
        self.description = row[1]
        # Missing (NULL/empty) size and time values are mapped to 0
        self.size = row[2] and int(row[2]) or 0
        self.time = row[3] and int(row[3]) or 0
        self.author = row[4]
        self.ipnr = row[5]

    def _get_path(self):
        """Return the normalized file system path of the attachment.

        As long as no filename is set, this is the directory holding all
        attachments of the parent.
        """
        path = os.path.join(self.env.path, 'attachments', self.parent_type,
                            unicode_quote(self.parent_id))
        if self.filename:
            path = os.path.join(path, unicode_quote(self.filename))
        return os.path.normpath(path)
    path = property(_get_path)

    def href(self, req, *args, **kwargs):
        """Return the URL of the attachment; extra arguments are passed on to
        `req.href.attachment`."""
        return req.href.attachment(self.parent_type, self.parent_id,
                                   self.filename, *args, **kwargs)

    def parent_href(self, req):
        """Return the URL of the object the attachment belongs to."""
        return req.href(self.parent_type, self.parent_id)

    def _get_title(self):
        """Return a display title of the form `<parent id>: <filename>`, the
        parent id being prefixed with `#` for tickets."""
        return '%s%s: %s' % (self.parent_type == 'ticket' and '#' or '',
                             self.parent_id, self.filename)
    title = property(_get_title)

    def delete(self, db=None):
        """Remove the attachment from the database and from the file system.

        If no `db` connection is given, one is obtained from the environment
        and the change is committed (or rolled back on failure) here;
        otherwise transaction handling is left to the caller.

        Raises a `TracError` if the file exists but cannot be removed. The
        registered `IAttachmentChangeListener` components are notified
        afterwards.
        """
        assert self.filename, 'Cannot delete non-existent attachment'
        if not db:
            db = self.env.get_db_cnx()
            handle_ta = True
        else:
            handle_ta = False

        cursor = db.cursor()
        cursor.execute("DELETE FROM attachment WHERE type=%s AND id=%s "
                       "AND filename=%s", (self.parent_type, self.parent_id,
                       self.filename))
        if os.path.isfile(self.path):
            try:
                os.unlink(self.path)
            except OSError:
                self.env.log.error('Failed to delete attachment file %s',
                                   self.path, exc_info=True)
                if handle_ta:
                    db.rollback()
                raise TracError('Could not delete attachment')

        self.env.log.info('Attachment removed: %s', self.title)
        if handle_ta:
            db.commit()

        for listener in AttachmentModule(self.env).change_listeners:
            listener.attachment_deleted(self)


    def insert(self, filename, fileobj, size, t=None, db=None):
        """Store a new attachment for the parent.

        The content is copied from `fileobj` to a newly created file in the
        attachment directory of the parent. The name actually used is chosen
        by `create_unique_file`, so it may differ from `filename`;
        `self.filename` holds the final name afterwards. `size` is stored as
        the attachment size and `t` as its time stamp, defaulting to the
        current time.

        If no `db` connection is given, one is obtained from the environment
        and the change is committed here; otherwise committing is left to the
        caller.

        Raises a `TracError` if the target directory is not located inside
        the `attachments` directory of the environment. The registered
        `IAttachmentChangeListener` components are notified afterwards.
        """
        if not db:
            db = self.env.get_db_cnx()
            handle_ta = True
        else:
            handle_ta = False

        self.size = size
        self.time = t or time.time()

        # Make sure the path to the attachment is inside the environment
        # attachments directory. The comparison includes the path separator,
        # as a plain string prefix check would also accept sibling
        # directories such as `attachments-other`, and it is an explicit
        # check rather than an `assert`, which is skipped when running with
        # `python -O`.
        attachments_dir = os.path.join(os.path.normpath(self.env.path),
                                       'attachments')
        target_dir = self.path
        if target_dir != attachments_dir and \
                not target_dir.startswith(attachments_dir + os.sep):
            raise TracError('Cannot store attachment outside of the '
                            'attachments directory', 'Invalid Attachment')

        if not os.access(target_dir, os.F_OK):
            os.makedirs(target_dir)
        filename = unicode_quote(filename)
        path, targetfile = create_unique_file(os.path.join(target_dir,
                                                           filename))
        try:
            # Note: `path` is an unicode string because `self.path` was one.
            # As it contains only quoted chars and numbers, we can use `ascii`
            basename = os.path.basename(path).encode('ascii')
            filename = unicode_unquote(basename)

            cursor = db.cursor()
            cursor.execute("INSERT INTO attachment "
                           "VALUES (%s,%s,%s,%s,%s,%s,%s,%s)",
                           (self.parent_type, self.parent_id, filename,
                            self.size, self.time, self.description, self.author,
                            self.ipnr))
            shutil.copyfileobj(fileobj, targetfile)
            self.filename = filename

            self.env.log.info('New attachment: %s by %s', self.title,
                              self.author)

            if handle_ta:
                db.commit()

            for listener in AttachmentModule(self.env).change_listeners:
                listener.attachment_added(self)

        finally:
            targetfile.close()

    def select(cls, env, parent_type, parent_id, db=None):
        """Iterate over the attachments of the given parent, ordered by time.

        Yields `Attachment` objects. Like in `_fetch()`, size and time are
        converted to integers, with missing values mapped to 0.
        """
        if not db:
            db = env.get_db_cnx()
        cursor = db.cursor()
        cursor.execute("SELECT filename,description,size,time,author,ipnr "
                       "FROM attachment WHERE type=%s AND id=%s ORDER BY time",
                       (parent_type, unicode(parent_id)))
        # `when` rather than `time`, to not shadow the `time` module
        for filename, description, size, when, author, ipnr in cursor:
            attachment = Attachment(env, parent_type, parent_id)
            attachment.filename = filename
            attachment.description = description
            attachment.size = size and int(size) or 0
            attachment.time = when and int(when) or 0
            attachment.author = author
            attachment.ipnr = ipnr
            yield attachment

    select = classmethod(select)

    def open(self):
        """Return the attachment file opened for reading in binary mode.

        Raises a `TracError` if the file cannot be opened.
        """
        self.env.log.debug('Trying to open attachment at %s', self.path)
        try:
            fd = open(self.path, 'rb')
        except IOError:
            raise TracError('Attachment %s not found' % self.filename)
        return fd


# Templating utilities

def attachments_to_hdf(env, req, db, parent_type, parent_id):
    """Return a list with the template data (see `attachment_to_hdf`) of
    every attachment of the given parent, in the order produced by
    `Attachment.select`."""
    entries = []
    for item in Attachment.select(env, parent_type, parent_id, db):
        entries.append(attachment_to_hdf(env, req, db, item))
    return entries
    
def attachment_to_hdf(env, req, db, attachment):
    """Return the template data of an attachment as a dictionary.

    The description is rendered as one-line wiki markup, and size and time
    are converted to their display representations. If `db` is not given, a
    connection is obtained from the environment.
    """
    db = db or env.get_db_cnx()
    data = {}
    data['filename'] = attachment.filename
    data['description'] = wiki_to_oneliner(attachment.description, env, db)
    data['author'] = attachment.author
    data['ipnr'] = attachment.ipnr
    data['size'] = pretty_size(attachment.size)
    data['time'] = format_datetime(attachment.time)
    data['age'] = pretty_timedelta(attachment.time)
    data['href'] = attachment.href(req)
    return data


class AttachmentModule(Component):
    """Component handling the `/attachment/...` URLs (listing, viewing,
    uploading and deleting attachments of tickets and wiki pages) and the
    `attachment:` wiki link syntax."""

    implements(IEnvironmentSetupParticipant, IRequestHandler,
               INavigationContributor, IWikiSyntaxProvider)

    change_listeners = ExtensionPoint(IAttachmentChangeListener)
    manipulators = ExtensionPoint(IAttachmentManipulator)

    # Not referenced by any method of this class
    CHUNK_SIZE = 4096

    max_size = IntOption('attachment', 'max_size', 262144,
        """Maximum allowed file size for ticket and wiki attachments.""")

    render_unsafe_content = BoolOption('attachment', 'render_unsafe_content',
                                       'false',
        """Whether non-binary attachments should be rendered in the browser, or
        only made downloadable.

        Pretty much any text file may be interpreted as HTML by the browser,
        which allows a malicious user to attach a file containing cross-site
        scripting attacks.

        For public sites where anonymous users can create attachments, it is
        recommended to leave this option disabled (which is the default).""")

    # IEnvironmentSetupParticipant methods

    def environment_created(self):
        """Create the attachments directory."""
        if self.env.path:
            os.mkdir(os.path.join(self.env.path, 'attachments'))

    def environment_needs_upgrade(self, db):
        """Attachments never require an environment upgrade."""
        return False

    def upgrade_environment(self, db):
        """Nothing to do, see `environment_needs_upgrade`."""
        pass

    # INavigationContributor methods

    def get_active_navigation_item(self, req):
        """Return the parent type (`ticket` or `wiki`) of the request as the
        active navigation item."""
        return req.args.get('type')

    def get_navigation_items(self, req):
        """The module contributes no navigation items of its own."""
        return []

    # IRequestHandler methods

    def match_request(self, req):
        """Return `True` for `/attachment/(ticket|wiki)[/...]` URLs.

        On a match, the parent type and the remaining path are stored in
        `req.args` as `type` and `path`, with colons in the path replaced by
        slashes.
        """
        match = re.match(r'^/attachment/(ticket|wiki)(?:[/:](.*))?$',
                         req.path_info)
        if match:
            req.args['type'] = match.group(1)
            # The path group is optional in the pattern, hence `None` for a
            # bare `/attachment/ticket` URL. An empty path is rejected as a
            # bad request by `process_request`.
            req.args['path'] = (match.group(2) or '').replace(':', '/')
            return True

    def process_request(self, req):
        """Dispatch the request according to the `action` argument (`view`
        by default) and the request method.

        A path made of a single segment and no `new` action lists the
        attachments of that parent. Otherwise the last path segment is the
        filename of the attachment and the preceding segments form the
        parent id.

        Returns the `(template, content_type)` pair; raises `HTTPBadRequest`
        for an incomplete or unknown request.
        """
        parent_type = req.args.get('type')
        path = req.args.get('path')
        if not parent_type or not path:
            raise HTTPBadRequest('Bad request')
        if parent_type not in ['ticket', 'wiki']:
            raise HTTPBadRequest('Unknown attachment type')

        action = req.args.get('action', 'view')
        if action == 'new':
            # The whole path identifies the parent of the new attachment
            attachment = Attachment(self.env, parent_type, path)
        else:
            segments = path.split('/')
            parent_id = '/'.join(segments[:-1])
            last_segment = segments[-1]
            if len(segments) == 1:
                self._render_list(req, parent_type, last_segment)
                return 'attachment.cs', None
            if not last_segment:
                raise HTTPBadRequest('Bad request')
            attachment = Attachment(self.env, parent_type, parent_id,
                                    last_segment)
        parent_link, parent_text = self._parent_to_hdf(
            req, attachment.parent_type, attachment.parent_id)
        if req.method == 'POST':
            if action == 'new':
                self._do_save(req, attachment)
            elif action == 'delete':
                self._do_delete(req, attachment)
        elif action == 'delete':
            self._render_confirm(req, attachment)
        elif action == 'new':
            self._render_form(req, attachment)
        else:
            add_link(req, 'up', parent_link, parent_text)
            self._render_view(req, attachment)

        add_stylesheet(req, 'common/css/code.css')
        return 'attachment.cs', None

    def _parent_to_hdf(self, req, parent_type, parent_id):
        """Store the description of the parent object in
        `req.hdf['attachment.parent']` and return its `(link, text)` pair."""
        # Populate attachment.parent:
        parent_link = req.href(parent_type, parent_id)
        if parent_type == 'ticket':
            parent_text = 'Ticket #' + parent_id
        else: # 'wiki'
            parent_text = parent_id
        req.hdf['attachment.parent'] = {
            'type': parent_type, 'id': parent_id,
            'name': parent_text, 'href': parent_link
        }
        return parent_link, parent_text

    # IWikiSyntaxProvider methods

    def get_wiki_syntax(self):
        """The module defines no additional wiki syntax."""
        return []

    def get_link_resolvers(self):
        """Register the resolver for `attachment:` links."""
        yield ('attachment', self._format_link)

    # Public methods

    def get_history(self, start, stop, type):
        """Return an iterable of tuples describing changes to attachments on
        a particular object type.

        Only attachments with a time stamp strictly between `start` and
        `stop` are considered.

        The tuples are in the form (change, type, id, filename, time,
        description, author). `change` can currently only be `created`."""
        # The history is read from the `attachment` table
        db = self.env.get_db_cnx()
        cursor = db.cursor()
        cursor.execute("SELECT type, id, filename, time, description, author "
                       "  FROM attachment "
                       "  WHERE time > %s AND time < %s "
                       "        AND type = %s", (start, stop, type))
        for parent_type, parent_id, filename, when, description, author \
                in cursor:
            yield ('created', parent_type, parent_id, filename, when,
                   description, author)

    def get_timeline_events(self, req, db, type, format, start, stop, display):
        """Return an iterable of events suitable for ITimelineEventProvider.

        `display` is a callback for formatting the attachment's parent

        For the `rss` format, the description is rendered as full wiki
        markup and the links are absolute. Otherwise the description is
        rendered as a shortened one-liner and the author is appended to the
        title.
        """
        for change, parent_type, parent_id, filename, when, descr, author in \
                self.get_history(start, stop, type):
            title = html.EM(os.path.basename(filename)) + \
                    ' attached to ' + display(parent_id)
            if format == 'rss':
                descr = wiki_to_html(descr or '--', self.env, req, db,
                                     absurls=True)
                href = req.abs_href
            else:
                descr = wiki_to_oneliner(descr, self.env, db, shorten=True)
                title += Markup(' by %s', author)
                href = req.href
            yield('attachment',
                  href.attachment(parent_type, parent_id, filename), title,
                  when, author, descr)

    # Internal methods

    def _do_save(self, req, attachment):
        """Store the uploaded file as a new attachment and redirect to it.

        Requires `TICKET_APPEND` or `WIKI_MODIFY`, depending on the parent
        type. Raises a `TracError` if no file was uploaded, if the file is
        empty or if it exceeds `max_size`, and `InvalidAttachment` if a
        manipulator rejects the attachment.

        If the `replace` argument is set and an attachment of the same name
        exists, that attachment is deleted first. This requires
        `TICKET_ADMIN` or `WIKI_DELETE`, unless the authenticated user is
        the author of the existing attachment.
        """
        perm_map = {'ticket': 'TICKET_APPEND', 'wiki': 'WIKI_MODIFY'}
        req.perm.assert_permission(perm_map[attachment.parent_type])

        if req.args.has_key('cancel'):
            req.redirect(attachment.parent_href(req))

        upload = req.args['attachment']
        if not hasattr(upload, 'filename') or not upload.filename:
            raise TracError('No file uploaded')
        if hasattr(upload.file, 'fileno'):
            # Index 6 of the stat result is the file size (`st_size`)
            size = os.fstat(upload.file.fileno())[6]
        else:
            size = upload.file.len
        if size == 0:
            raise TracError("Can't upload empty file")

        # Maximum attachment size (in bytes); a negative value disables the
        # check
        max_size = self.max_size
        if max_size >= 0 and size > max_size:
            raise TracError('Maximum attachment size: %d bytes' % max_size,
                            'Upload failed')

        # We try to normalize the filename to unicode NFC if we can.
        # Files uploaded from OS X might be in NFD.
        filename = unicodedata.normalize('NFC', unicode(upload.filename,
                                                        'utf-8'))
        # Only keep the last component of the name sent by the client,
        # whatever path separator it uses
        filename = filename.replace('\\', '/').replace(':', '/')
        filename = os.path.basename(filename)
        if not filename:
            raise TracError('No file uploaded')

        attachment.description = req.args.get('description', '')
        attachment.author = get_reporter_id(req, 'author')
        attachment.ipnr = req.remote_addr

        # Validate attachment; the first reported problem aborts the upload
        for manipulator in self.manipulators:
            for field, message in manipulator.validate_attachment(req, attachment):
                if field:
                    raise InvalidAttachment('Attachment field %s is invalid: %s'
                                            % (field, message))
                else:
                    raise InvalidAttachment('Invalid attachment: %s' % message)

        if req.args.get('replace'):
            # Only the lookup is guarded, so that a failure to delete the
            # existing attachment is reported instead of being swallowed
            try:
                old_attachment = Attachment(self.env, attachment.parent_type,
                                            attachment.parent_id, filename)
            except TracError:
                old_attachment = None # nothing to replace, don't worry
            if old_attachment is not None:
                if not (old_attachment.author and req.authname \
                        and old_attachment.author == req.authname):
                    perm_map = {'ticket': 'TICKET_ADMIN', 'wiki': 'WIKI_DELETE'}
                    req.perm.assert_permission(perm_map[old_attachment.parent_type])
                old_attachment.delete()
            attachment.filename = None
        attachment.insert(filename, upload.file, size)

        # Redirect the user to the newly created attachment
        req.redirect(attachment.href(req))

    def _do_delete(self, req, attachment):
        """Delete the attachment and redirect to its parent.

        Requires `TICKET_ADMIN` or `WIKI_DELETE`, depending on the parent
        type.
        """
        perm_map = {'ticket': 'TICKET_ADMIN', 'wiki': 'WIKI_DELETE'}
        req.perm.assert_permission(perm_map[attachment.parent_type])

        if req.args.has_key('cancel'):
            req.redirect(attachment.href(req))

        attachment.delete()

        # Redirect the user to the attachment parent page
        req.redirect(attachment.parent_href(req))

    def _render_confirm(self, req, attachment):
        """Prepare the template data of the delete confirmation page.

        Requires `TICKET_ADMIN` or `WIKI_DELETE`, depending on the parent
        type.
        """
        perm_map = {'ticket': 'TICKET_ADMIN', 'wiki': 'WIKI_DELETE'}
        req.perm.assert_permission(perm_map[attachment.parent_type])

        req.hdf['title'] = '%s (delete)' % attachment.title
        req.hdf['attachment'] = {'filename': attachment.filename,
                                 'mode': 'delete'}

    def _render_form(self, req, attachment):
        """Prepare the template data of the upload form.

        Requires `TICKET_APPEND` or `WIKI_MODIFY`, depending on the parent
        type.
        """
        perm_map = {'ticket': 'TICKET_APPEND', 'wiki': 'WIKI_MODIFY'}
        req.perm.assert_permission(perm_map[attachment.parent_type])

        req.hdf['attachment'] = {'mode': 'new',
                                 'author': get_reporter_id(req)}

    def _render_view(self, req, attachment):
        """Send the attachment itself (`format` argument being `raw` or
        `txt`) or prepare the template data of its preview page.

        Requires `TICKET_VIEW` or `WIKI_VIEW`, depending on the parent type.
        """
        perm_map = {'ticket': 'TICKET_VIEW', 'wiki': 'WIKI_VIEW'}
        req.perm.assert_permission(perm_map[attachment.parent_type])

        req.check_modified(attachment.time)

        # Render HTML view
        req.hdf['title'] = attachment.title
        req.hdf['attachment'] = attachment_to_hdf(self.env, req, None,
                                                  attachment)
        # Override the 'oneliner'
        req.hdf['attachment.description'] = wiki_to_html(attachment.description,
                                                         self.env, req)

        perm_map = {'ticket': 'TICKET_ADMIN', 'wiki': 'WIKI_DELETE'}
        if req.perm.has_permission(perm_map[attachment.parent_type]):
            req.hdf['attachment.can_delete'] = 1

        fd = attachment.open()
        try:
            mimeview = Mimeview(self.env)

            # MIME type detection, based on the filename and on the first
            # 1000 bytes of the content
            str_data = fd.read(1000)
            fd.seek(0)

            binary = is_binary(str_data)
            mime_type = mimeview.get_mimetype(attachment.filename, str_data)

            # Eventually send the file directly
            fmt = req.args.get('format')
            if fmt in ('raw', 'txt'):
                if not self.render_unsafe_content and not binary:
                    # Force browser to download HTML/SVG/etc pages that may
                    # contain malicious code enabling XSS attacks
                    req.send_header('Content-Disposition', 'attachment;' +
                                    'filename=' + attachment.filename)
                if not mime_type or (self.render_unsafe_content and \
                                     not binary and fmt == 'txt'):
                    mime_type = 'text/plain'
                if 'charset=' not in mime_type:
                    charset = mimeview.get_charset(str_data, mime_type)
                    mime_type = mime_type + '; charset=' + charset
                # NOTE(review): `send_file` presumably terminates the request
                # instead of returning -- confirm against `trac.web`
                req.send_file(attachment.path, mime_type)

            # add ''Plain Text'' alternate link if needed
            if self.render_unsafe_content and not binary and \
               mime_type and not mime_type.startswith('text/plain'):
                plaintext_href = attachment.href(req, format='txt')
                add_link(req, 'alternate', plaintext_href, 'Plain Text',
                         mime_type)

            # add ''Original Format'' alternate link (always)
            raw_href = attachment.href(req, format='raw')
            add_link(req, 'alternate', raw_href, 'Original Format', mime_type)

            self.log.debug("Rendering preview of file %s with mime-type %s",
                           attachment.filename, mime_type)

            req.hdf['attachment'] = mimeview.preview_to_hdf(
                req, fd, os.fstat(fd.fileno()).st_size, mime_type,
                attachment.filename, raw_href, annotations=['lineno'])
        finally:
            fd.close()

    def _render_list(self, req, p_type, p_id):
        """Prepare the template data listing all attachments of a parent."""
        self._parent_to_hdf(req, p_type, p_id)
        req.hdf['attachment'] = {
            'mode': 'list',
            'list': attachments_to_hdf(self.env, req, None, p_type, p_id),
            'attach_href': req.href.attachment(p_type, p_id)
            }

    def _format_link(self, formatter, ns, target, label):
        """Render an `attachment:` wiki link.

        The target is either `type:id:filename` or a plain filename. In the
        latter case the parent is guessed from the path of the current
        request, falling back to the `WikiStart` wiki page. Links to missing
        attachments are rendered with the `missing attachment` class.
        """
        link, params, fragment = formatter.split_link(target)
        ids = link.split(':', 2)
        if len(ids) == 3:
            parent_type, parent_id, filename = ids
        else:
            # FIXME: the formatter should know which object the text being
            #        formatter belongs to
            parent_type, parent_id = 'wiki', 'WikiStart'
            if formatter.req:
                path_info = formatter.req.path_info.split('/', 2)
                if len(path_info) > 1:
                    parent_type = path_info[1]
                if len(path_info) > 2:
                    parent_id = path_info[2]
            filename = link
        href = formatter.href()
        try:
            attachment = Attachment(self.env, parent_type, parent_id, filename)
            if formatter.req:
                href = attachment.href(formatter.req) + params
            return html.A(label, class_='attachment', href=href,
                          title='Attachment %s' % attachment.title)
        except TracError:
            return html.A(label, class_='missing attachment', rel='nofollow',
                          href=formatter.href())
Copyright (C) 2012-2017 Edgewall Software