Mercurial > genshi > genshi-test
view markup/path.py @ 29:4b6cee37ce62
* Minor simplification of template directives: they no longer get passed the template instance and the position, as no directive was actually using
that information.
* More docstrings.
author | cmlenz |
---|---|
date | Wed, 28 Jun 2006 10:40:39 +0000 |
parents | b8456279c444 |
children | 0e1fc0211416 |
line wrap: on
line source
# -*- coding: utf-8 -*-
#
# Copyright (C) 2006 Christopher Lenz
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://markup.cmlenz.net/wiki/License.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at http://markup.cmlenz.net/log/.

"""Basic support for evaluating XPath expressions against streams."""

import re

from markup.core import QName, Stream

__all__ = ['Path']


class Path(object):
    """Implements basic XPath support on streams.

    Instances of this class represent a "compiled" XPath expression, and
    provide methods for testing the path against a stream, as well as
    extracting a substream matching that path.

    After construction, `source` holds the original expression text and
    `steps` holds the compiled location steps. Each step is a sequence of
    `(closure, node_test, predicates)`: `closure` is true when the step was
    introduced by no operator or by `//`, `node_test` is a callable taking
    `(kind, data, pos)`, and `predicates` is a list of further callables that
    must all return a true value for the step to match.
    """

    # Group 1 captures an operator token, group 2 a name or literal token.
    # Whitespace is matched by the third alternative without a capture group,
    # so `findall` reports it as a pair of empty strings.
    _TOKEN_RE = re.compile(r'(::|\.\.|\(\)|[/.:\[\]\(\)@=!])|'
                           r'([^/:\[\]\(\)@=!\s]+)|'
                           r'\s+')
    # (opening, closing) character pairs that delimit a string literal.
    _QUOTES = (("'", "'"), ('"', '"'))

    def __init__(self, text):
        """Create the path object from a string.

        @param text: the path expression
        @raise NotImplementedError: if the expression calls a function other
            than `text()`
        """
        self.source = text

        steps = []
        cur_op = ''    # operator characters seen since the last name token
        cur_tag = ''   # most recent name token; cleared by any operator
        in_predicate = False
        for op, tag in self._TOKEN_RE.findall(text):
            if not op and not tag:
                # Whitespace fills neither group. Skip it so that it neither
                # produces a step/predicate of its own nor discards the
                # pending operator.
                continue
            if op:
                if op == '[':
                    in_predicate = True
                elif op == ']':
                    in_predicate = False
                elif op.startswith('('):
                    if cur_tag == 'text':
                        # The preceding name token was compiled as an element
                        # test; replace that step with the text() function.
                        steps[-1] = (False, self._FunctionText(), [])
                    else:
                        raise NotImplementedError('XPath function "%s" not '
                                                  'supported' % cur_tag)
                else:
                    # Accumulate multi-character operators ('//', '!=').
                    cur_op += op
                cur_tag = ''
            else:
                closure = cur_op in ('', '//')
                if cur_op == '@':
                    if tag == '*':
                        node_test = self._AnyAttribute()
                    else:
                        node_test = self._AttributeByName(tag)
                else:
                    if tag == '*':
                        node_test = self._AnyElement()
                    elif in_predicate:
                        # NOTE(review): a predicate token that is not a quoted
                        # literal leaves `node_test` holding whatever the
                        # previous token assigned (or unbound if there was
                        # none). Behaviour kept as-is; confirm what is
                        # intended for such expressions.
                        if len(tag) > 1 and (tag[0], tag[-1]) in self._QUOTES:
                            node_test = self._LiteralString(tag[1:-1])
                        if cur_op == '=':
                            # Fold the previously appended predicate and this
                            # operand into a single comparison.
                            node_test = self._OperatorEq(steps[-1][2][-1],
                                                         node_test)
                            steps[-1][2].pop()
                        elif cur_op == '!=':
                            node_test = self._OperatorNeq(steps[-1][2][-1],
                                                          node_test)
                            steps[-1][2].pop()
                    else:
                        node_test = self._ElementByName(tag)
                if in_predicate:
                    steps[-1][2].append(node_test)
                else:
                    steps.append([closure, node_test, []])
                cur_op = ''
                cur_tag = tag
        self.steps = steps

    def __repr__(self):
        return '<%s "%s">' % (self.__class__.__name__, self.source)

    def select(self, stream):
        """Returns a substream of the given stream that matches the path.

        If there are no matches, this method returns an empty stream.

        @param stream: the stream to select from
        @return: the substream matching the path, or an empty stream
        """
        stream = iter(stream)

        def _generate():
            test = self.test()
            for kind, data, pos in stream:
                result = test(kind, data, pos)
                if result is True:
                    # An element matched: emit it together with every event
                    # up to and including the END that balances it.
                    yield kind, data, pos
                    depth = 1
                    while depth > 0:
                        # Pull from the same iterator the outer loop uses, so
                        # the events consumed here are not visited again.
                        ev = stream.next()
                        if ev[0] is Stream.START:
                            depth += 1
                        elif ev[0] is Stream.END:
                            depth -= 1
                        yield ev
                        # Still feed the event to the tester so that its
                        # cursor stack follows the element nesting.
                        test(*ev)
                elif result:
                    # The test produced an event of its own (for example a
                    # TEXT event for an attribute value); emit that instead.
                    yield result

        return Stream(_generate())

    def test(self):
        """Returns a function that can be used to track whether the path
        matches a specific stream event.

        The function returned expects the positional arguments `kind`, `data`,
        and `pos`, i.e. basically an unpacked stream event. If the path matches
        the event, the function returns the match (for example, a `START` or
        `TEXT` event.) Otherwise, it returns `None` or `False`.

        The returned function keeps state between calls, so it has to be fed
        the events of a stream in order.
        """
        stack = [0]  # stack of cursors into the location path

        def _test(kind, data, pos):
            """Test a single event, updating the cursor stack."""
            if not stack:
                # More END than START events have been seen.
                return False

            if kind is Stream.END:
                stack.pop()
                return None

            if kind is Stream.START:
                # A nested element starts out at its parent's cursor.
                stack.append(stack[-1])

            closure, node_test, predicates = self.steps[stack[-1]]

            matched = node_test(kind, data, pos)
            if matched and predicates:
                for predicate in predicates:
                    if not predicate(kind, data, pos):
                        matched = None
                        break

            if matched:
                if stack[-1] == len(self.steps) - 1:
                    # The last step matched, so the whole path matched.
                    return matched

                stack[-1] += 1

            elif kind is Stream.START and not closure:
                # If this step is not a closure, it cannot be matched until the
                # current element is closed... so we need to move the cursor
                # back to the last closure and retest that against the current
                # element
                closures = [step for step in self.steps[:stack[-1]] if step[0]]
                closures.reverse()
                # The list only holds steps whose closure flag is set, so the
                # `if closure` below always holds and the loop body runs at
                # most once.
                # NOTE(review): the cursor moves back by exactly one step,
                # which is the retested step's index only when that step
                # directly precedes the current one -- confirm intended.
                for closure, node_test, predicates in closures:
                    stack[-1] -= 1
                    if closure:
                        matched = node_test(kind, data, pos)
                        if matched:
                            stack[-1] += 1
                        break

            return None

        return _test

    class _AnyElement(object):
        """Node test that matches any element."""

        def __call__(self, kind, *_):
            """Return `True` for a `START` event, `None` otherwise."""
            if kind is Stream.START:
                return True
            return None

        def __repr__(self):
            return '<%s>' % self.__class__.__name__

    class _ElementByName(object):
        """Node test that matches an element with a specific tag name."""

        def __init__(self, name):
            self.name = QName(name)

        def __call__(self, kind, data, _):
            """For a `START` event, return whether the local name of
            `data[0]` equals the wanted name; return `None` for any other
            kind of event.
            """
            if kind is Stream.START:
                return data[0].localname == self.name
            return None

        def __repr__(self):
            return '<%s "%s">' % (self.__class__.__name__, self.name)

    class _AnyAttribute(object):
        """Node test that matches any attribute."""

        def __call__(self, kind, data, pos):
            """For a `START` event, return a `TEXT` event carrying the
            concatenated values of the pairs in `data[1]`, or `None` if that
            text is empty; return `None` for any other kind of event.
            """
            if kind is Stream.START:
                text = ''.join([val for _, val in data[1]])
                if text:
                    return Stream.TEXT, text, pos
                return None
            return None

        def __repr__(self):
            return '<%s>' % (self.__class__.__name__)

    class _AttributeByName(object):
        """Node test that matches an attribute with a specific name."""

        def __init__(self, name):
            self.name = QName(name)

        def __call__(self, kind, data, pos):
            """For a `START` event whose `data[1]` contains the wanted name,
            return a `TEXT` event carrying the associated value; return
            `None` in every other case.
            """
            if kind is Stream.START:
                if self.name in data[1]:
                    return Stream.TEXT, data[1].get(self.name), pos
                return None
            return None

        def __repr__(self):
            return '<%s "%s">' % (self.__class__.__name__, self.name)

    class _FunctionText(object):
        """Function that returns text content."""

        def __call__(self, kind, data, pos):
            """Return a `TEXT` event unchanged, `None` for anything else."""
            if kind is Stream.TEXT:
                return kind, data, pos
            return None

        def __repr__(self):
            return '<%s>' % (self.__class__.__name__)

    class _LiteralString(object):
        """Always returns a literal string."""

        def __init__(self, value):
            self.value = value

        def __call__(self, *_):
            """Return a `TEXT` event carrying the literal, whatever the
            event being tested; the position is reported as `(-1, -1)`.
            """
            return Stream.TEXT, self.value, (-1, -1)

        def __repr__(self):
            return '<%s>' % (self.__class__.__name__)

    class _OperatorEq(object):
        """Equality comparison operator."""

        def __init__(self, lval, rval):
            self.lval = lval
            self.rval = rval

        def __call__(self, kind, data, pos):
            """Evaluate both operands against the event and compare them.

            For an operand that returned a true value, its item at index 1
            (the data of the event it produced) is compared; otherwise the
            false result itself is compared.
            """
            lval = self.lval(kind, data, pos)
            rval = self.rval(kind, data, pos)
            return (lval and lval[1]) == (rval and rval[1])

        def __repr__(self):
            return '<%s %r = %r>' % (self.__class__.__name__, self.lval,
                                     self.rval)

    class _OperatorNeq(object):
        """Inequality comparison operator."""

        def __init__(self, lval, rval):
            self.lval = lval
            self.rval = rval

        def __call__(self, kind, data, pos):
            """Evaluate both operands against the event and compare them.

            For an operand that returned a true value, its item at index 1
            (the data of the event it produced) is compared; otherwise the
            false result itself is compared.
            """
            lval = self.lval(kind, data, pos)
            rval = self.rval(kind, data, pos)
            return (lval and lval[1]) != (rval and rval[1])

        def __repr__(self):
            return '<%s %r != %r>' % (self.__class__.__name__, self.lval,
                                      self.rval)