Mercurial > genshi > mirror
diff examples/trac/trac/db/api.py @ 39:93b4dcbafd7b trunk
Copy Trac to main branch.
author | cmlenz |
---|---|
date | Mon, 03 Jul 2006 18:53:27 +0000 |
parents | |
children |
line wrap: on
line diff
new file mode 100644 --- /dev/null +++ b/examples/trac/trac/db/api.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2005 Edgewall Software +# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de> +# All rights reserved. +# +# This software is licensed as described in the file COPYING, which +# you should have received as part of this distribution. The terms +# are also available at http://trac.edgewall.com/license.html. +# +# This software consists of voluntary contributions made by many +# individuals. For the exact contribution history, see the revision +# history and logs, available at http://projects.edgewall.com/trac/. +# +# Author: Christopher Lenz <cmlenz@gmx.de> + +import os +import urllib + +from trac.config import Option +from trac.core import * +from trac.db.pool import ConnectionPool + + +def get_column_names(cursor): + return cursor.description and \ + [(isinstance(d[0], str) and [unicode(d[0], 'utf-8')] or [d[0]])[0] + for d in cursor.description] or [] + + +class IDatabaseConnector(Interface): + """Extension point interface for components that support the connection to + relational databases.""" + + def get_supported_schemes(): + """Return the connection URL schemes supported by the connector, and + their relative priorities as an iterable of `(scheme, priority)` tuples. + """ + + def get_connection(**kwargs): + """Create a new connection to the database.""" + + def init_db(**kwargs): + """Initialize the database.""" + + def to_sql(table): + """Return the DDL statements necessary to create the specified table, + including indices.""" + + +class DatabaseManager(Component): + + connectors = ExtensionPoint(IDatabaseConnector) + + connection_uri = Option('trac', 'database', 'sqlite:db/trac.db', + """Database connection + [wiki:TracEnvironment#DatabaseConnectionStrings string] for this + project""") + + def __init__(self): + self._cnx_pool = None + + def init_db(self): + connector, args = self._get_connector() + connector.init_db(**args) + + def get_connection(self): + if not self._cnx_pool: + connector, args = self._get_connector() + self._cnx_pool = ConnectionPool(5, connector, **args) + return self._cnx_pool.get_cnx() + + def shutdown(self): + if self._cnx_pool: + self._cnx_pool.shutdown() + self._cnx_pool = None + + def _get_connector(self): ### FIXME: Make it public? + scheme, args = _parse_db_str(self.connection_uri) + candidates = {} + connector = None + for connector in self.connectors: + for scheme_, priority in connector.get_supported_schemes(): + if scheme_ != scheme: + continue + highest = candidates.get(scheme_, (None, 0))[1] + if priority > highest: + candidates[scheme] = (connector, priority) + connector = candidates.get(scheme, [None])[0] + if not connector: + raise TracError, 'Unsupported database type "%s"' % scheme + + if scheme == 'sqlite': + # Special case for SQLite to support a path relative to the + # environment directory + if args['path'] != ':memory:' and \ + not args['path'].startswith('/'): + args['path'] = os.path.join(self.env.path, + args['path'].lstrip('/')) + + return connector, args + + +def _parse_db_str(db_str): + scheme, rest = db_str.split(':', 1) + + if not rest.startswith('/'): + if scheme == 'sqlite': + # Support for relative and in-memory SQLite connection strings + host = None + path = rest + else: + raise TracError, 'Database connection string %s must start with ' \ + 'scheme:/' % db_str + else: + if rest.startswith('/') and not rest.startswith('//'): + host = None + rest = rest[1:] + elif rest.startswith('///'): + host = None + rest = rest[3:] + else: + rest = rest[2:] + if rest.find('/') == -1: + host = rest + rest = '' + else: + host, rest = rest.split('/', 1) + path = None + + if host and host.find('@') != -1: + user, host = host.split('@', 1) + if user.find(':') != -1: + user, password = user.split(':', 1) + else: + password = None + else: + user = password = None + if host and host.find(':') != -1: + host, port = host.split(':') + port = int(port) + else: + port = None + + if not path: + path = '/' + rest + if os.name == 'nt': + # Support local paths containing drive letters on Win32 + if len(rest) > 1 and rest[1] == '|': + path = "%s:%s" % (rest[0], rest[2:]) + + params = {} + if path.find('?') != -1: + path, qs = path.split('?', 1) + qs = qs.split('&') + for param in qs: + name, value = param.split('=', 1) + value = urllib.unquote(value) + params[name] = value + + args = zip(('user', 'password', 'host', 'port', 'path', 'params'), + (user, password, host, port, path, params)) + return scheme, dict([(key, value) for key, value in args if value])