# -*- coding: utf-8 -*-
#
# Copyright (C) 2006 Christopher Lenz
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://markup.cmlenz.net/wiki/License.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at http://markup.cmlenz.net/log/.

"""Basic support for evaluating XPath expressions against streams."""

import re

from markup.core import QName, Stream

__all__ = ['Path']


class Path(object):
    """Implements basic XPath support on streams.

    Instances of this class represent a "compiled" XPath expression, and provide
    methods for testing the path against a stream, as well as extracting a
    substream matching that path.

    After construction, `source` holds the original expression text and `steps`
    holds a list of `(closure, node_test, predicates)` tuples, one per location
    step.
    """

    # Group 1 captures operators, group 2 captures names and literals;
    # whitespace is matched by the third alternative and yields two empty
    # groups. Raw strings keep the backslashes for the regex engine instead of
    # relying on Python passing unknown string escapes through unchanged.
    _TOKEN_RE = re.compile(r'(::|\.\.|\(\)|[/.:\[\]\(\)@=!])|'
                           r'([^/:\[\]\(\)@=!\s]+)|'
                           r'\s+')
    # Pairs of (opening, closing) characters that delimit a string literal.
    _QUOTES = (("'", "'"), ('"', '"'))

    def __init__(self, text):
        """Create the path object from a string.

        @param text: the path expression
        @raise NotImplementedError: if the expression calls a function other
            than `text()`, or uses a predicate operand that is neither an
            attribute test, `*`, nor a quoted string literal
        """
        self.source = text

        steps = []
        cur_op = ''   # operator characters seen since the last name token
        cur_tag = ''  # most recent name token; used to recognize `text()`
        in_predicate = False
        for op, tag in self._TOKEN_RE.findall(text):
            if op:
                if op == '[':
                    in_predicate = True
                elif op == ']':
                    in_predicate = False
                elif op.startswith('('):
                    if cur_tag == 'text':
                        # The function name was already added as an element
                        # name test; replace that step with the function.
                        steps[-1] = [False, self._FunctionText(), []]
                    else:
                        raise NotImplementedError('XPath function "%s" not '
                                                  'supported' % cur_tag)
                elif op == '.':
                    steps.append([False, self._CurrentElement(), []])
                else:
                    # Accumulate multi-character operators ("//", "!=").
                    cur_op += op
                cur_tag = ''
            else:
                closure = cur_op in ('', '//')
                if cur_op == '@':
                    if tag == '*':
                        node_test = self._AnyAttribute()
                    else:
                        node_test = self._AttributeByName(tag)
                else:
                    if tag == '*':
                        node_test = self._AnyChildElement()
                    elif in_predicate:
                        if len(tag) > 1 and (tag[0], tag[-1]) in self._QUOTES:
                            node_test = self._LiteralString(tag[1:-1])
                        else:
                            # Without this branch `node_test` would either be
                            # unbound or still hold the test built for the
                            # previous token, silently corrupting the
                            # predicate.
                            raise NotImplementedError('XPath predicate '
                                                      'expression "%s" not '
                                                      'supported' % tag)
                        if cur_op == '=':
                            # The left operand is the predicate collected
                            # last; fold both operands into one comparison.
                            node_test = self._OperatorEq(steps[-1][2][-1],
                                                         node_test)
                            steps[-1][2].pop()
                        elif cur_op == '!=':
                            node_test = self._OperatorNeq(steps[-1][2][-1],
                                                          node_test)
                            steps[-1][2].pop()
                    else:
                        node_test = self._ChildElementByName(tag)
                if in_predicate:
                    steps[-1][2].append(node_test)
                else:
                    steps.append([closure, node_test, []])
                cur_op = ''
                cur_tag = tag

        # Freeze every step so that it cannot be modified after compilation.
        self.steps = [tuple(step) for step in steps]

    def __repr__(self):
        """Return a representation that includes the source expression."""
        return '<%s "%s">' % (self.__class__.__name__, self.source)

    def select(self, stream):
        """Returns a substream of the given stream that matches the path.

        If there are no matches, this method returns an empty stream.

        >>> from markup.input import XML
        >>> xml = XML('<root><child>Text</child></root>')

        >>> print Path('child').select(xml)
        <child>Text</child>

        >>> print Path('child/text()').select(xml)
        Text

        @param stream: the stream to select from
        @return: the substream matching the path, or an empty stream
        """
        stream = iter(stream)
        # Change in nesting depth caused by each kind of event.
        depth_delta = {Stream.START: 1, Stream.END: -1}

        def _generate():
            test = self.test()
            for kind, data, pos in stream:
                result = test(kind, data, pos)
                if result is True:
                    # An element matched: emit it together with everything up
                    # to and including its matching END event.
                    yield kind, data, pos
                    depth = 1
                    # Iterating (rather than calling `next` by hand) ends
                    # cleanly if the stream stops before the element is closed.
                    for ev in stream:
                        depth += depth_delta.get(ev[0], 0)
                        yield ev
                        # Keep the tester's element stack in sync with the
                        # events consumed here.
                        test(*ev)
                        if depth <= 0:
                            break
                elif result:
                    # The test produced an event of its own (for example the
                    # `TEXT` event of an attribute value).
                    yield result
        return Stream(_generate())

    def test(self, ignore_context=False):
        """Returns a function that can be used to track whether the path matches
        a specific stream event.

        The function returned expects the positional arguments `kind`, `data`,
        and `pos`, i.e. basically an unpacked stream event. If the path matches
        the event, the function returns the match (for example, a `START` or
        `TEXT` event.) Otherwise, it returns `None` or `False`.

        >>> from markup.input import XML
        >>> xml = XML('<root><child id="1"/><child id="2"/></root>')
        >>> test = Path('child').test()
        >>> for kind, data, pos in xml:
        ...     if test(kind, data, pos):
        ...         print kind, data
        START (u'child', [(u'id', u'1')])
        START (u'child', [(u'id', u'2')])

        @param ignore_context: if true, a match on the last step is reported
            at any nesting depth; otherwise a last step whose node test is on
            the `child` axis is only reported below the outermost element
        @return: the stateful test function
        """
        stack = [0] # stack of cursors into the location path

        def _test(kind, data, pos):
            if not stack:
                # More END than START events were seen; nothing can match.
                return False

            if kind is Stream.END:
                stack.pop()
                return None

            if kind is Stream.START:
                # A new element inherits the cursor of its parent.
                stack.append(stack[-1])

            closure, node_test, predicates = self.steps[stack[-1]]

            matched = node_test(kind, data, pos)
            if matched and predicates:
                for predicate in predicates:
                    if not predicate(kind, data, pos):
                        matched = None
                        break

            if matched:
                if stack[-1] == len(self.steps) - 1:
                    # Last step: `len(stack) > 2` holds once at least two
                    # elements are open.
                    if ignore_context or len(stack) > 2 \
                                      or node_test.axis != 'child':
                        return matched
                else:
                    stack[-1] += 1

            elif kind is Stream.START and not closure:
                # If this step is not a closure, it cannot be matched until the
                # current element is closed... so we need to move the cursor
                # back to the last closure and retest that against the current
                # element
                closures = [step for step in self.steps[:stack[-1]] if step[0]]
                closures.reverse()
                for closure, node_test, predicates in closures:
                    stack[-1] -= 1
                    if closure:
                        matched = node_test(kind, data, pos)
                        if matched:
                            stack[-1] += 1
                        break

            return None

        return _test

    class _NodeTest(object):
        """Abstract node test."""
        # Name of the axis the test applies to; `None` when not applicable.
        axis = None
        def __repr__(self):
            return '<%s>' % self.__class__.__name__

    class _CurrentElement(_NodeTest):
        """Node test that matches the context node."""
        axis = 'self'
        def __call__(self, kind, *_):
            if kind is Stream.START:
                return True
            return None

    class _AnyChildElement(_NodeTest):
        """Node test that matches any child element."""
        axis = 'child'
        def __call__(self, kind, *_):
            if kind is Stream.START:
                return True
            return None

    class _ChildElementByName(_NodeTest):
        """Node test that matches a child element with a specific tag name."""
        axis = 'child'
        def __init__(self, name):
            self.name = QName(name)
        def __call__(self, kind, data, _):
            if kind is Stream.START:
                # Only the local part of the tag name is compared.
                return data[0].localname == self.name
            return None
        def __repr__(self):
            return '<%s "%s">' % (self.__class__.__name__, self.name)

    class _AnyAttribute(_NodeTest):
        """Node test that matches any attribute."""
        axis = 'attribute'
        def __call__(self, kind, data, pos):
            if kind is Stream.START:
                # All attribute values are concatenated into one `TEXT` event.
                text = ''.join([val for _, val in data[1]])
                if text:
                    return Stream.TEXT, text, pos
                return None
            return None

    class _AttributeByName(_NodeTest):
        """Node test that matches an attribute with a specific name."""
        axis = 'attribute'
        def __init__(self, name):
            self.name = QName(name)
        def __call__(self, kind, data, pos):
            if kind is Stream.START:
                if self.name in data[1]:
                    # The attribute value is reported as a `TEXT` event.
                    return Stream.TEXT, data[1].get(self.name), pos
                return None
            return None
        def __repr__(self):
            return '<%s "%s">' % (self.__class__.__name__, self.name)

    class _Function(_NodeTest):
        """Abstract node test representing a function."""

    class _FunctionText(_Function):
        """Function that returns text content."""
        def __call__(self, kind, data, pos):
            if kind is Stream.TEXT:
                return kind, data, pos
            return None

    class _LiteralString(_NodeTest):
        """Always returns a literal string."""
        def __init__(self, value):
            self.value = value
        def __call__(self, *_):
            # A literal has no source location, hence the (-1, -1) position.
            return Stream.TEXT, self.value, (-1, -1)

    class _OperatorEq(_NodeTest):
        """Equality comparison operator."""
        def __init__(self, lval, rval):
            self.lval = lval
            self.rval = rval
        def __call__(self, kind, data, pos):
            lval = self.lval(kind, data, pos)
            rval = self.rval(kind, data, pos)
            # Compare the text of both operands; an operand that did not match
            # is compared by its falsy result instead.
            return (lval and lval[1]) == (rval and rval[1])
        def __repr__(self):
            return '<%s %r = %r>' % (self.__class__.__name__, self.lval,
                                     self.rval)

    class _OperatorNeq(_NodeTest):
        """Inequality comparison operator."""
        def __init__(self, lval, rval):
            self.lval = lval
            self.rval = rval
        def __call__(self, kind, data, pos):
            lval = self.lval(kind, data, pos)
            rval = self.rval(kind, data, pos)
            # Compare the text of both operands; an operand that did not match
            # is compared by its falsy result instead.
            return (lval and lval[1]) != (rval and rval[1])
        def __repr__(self):
            return '<%s %r != %r>' % (self.__class__.__name__, self.lval,
                                      self.rval)