# -*- coding: utf-8 -*-
#
# Copyright (C) 2006 Edgewall Software
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://trac.edgewall.com/license.html.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at http://projects.edgewall.com/trac/.

"""Basic support for evaluating XPath expressions against streams."""

import re

from markup.core import QName, Stream

__all__ = ['Path']

# Pairs of (opening, closing) characters that delimit a string literal
# inside a predicate.
_QUOTES = (("'", "'"), ('"', '"'))


class Path(object):
    """Basic XPath support on markup event streams.

    A path is compiled once in the constructor into a list of steps, each
    of the form ``(closure, node_test, predicates)``, and can then be
    applied to a stream of ``(kind, data, pos)`` events with `select()`.

    Examples of expressions the parser understands:

    Selecting specific tags::

        root
        //root

    Using wildcards for tag names::

        *
        //*

    Selecting attribute values (``@*`` concatenates all attribute values)::

        @foo
        @*

    Selecting descendants::

        root/*
        root/bar
        root/foo/*
        root/*/bar
        root//bar

    Selecting text nodes::

        item/text()

    Using simple attribute predicates (the right-hand side of ``=`` and
    ``!=`` must be a quoted string without whitespace)::

        root/item[@important]
        root/item[@important="very"]
        root/item[@important='very']
        root/item[@important!='very']

    The only XPath function supported is ``text()``; any other function
    call raises `NotImplementedError`.
    """

    # Group 1 captures operators, group 2 captures names and literals; the
    # third alternative consumes whitespace without capturing anything.
    _TOKEN_RE = re.compile(r'(::|\.\.|\(\)|[/.:\[\]\(\)@=!])|'
                           r'([^/:\[\]\(\)@=!\s]+)|'
                           r'\s+')

    def __init__(self, text):
        """Compile the path expression `text` into `self.steps`.

        :param text: the path expression source
        :raises NotImplementedError: if the expression calls a function
            other than ``text()``, or uses a predicate operand that is
            neither an attribute test, ``*``, nor a quoted string literal
        """
        self.source = text

        steps = []
        cur_op = ''
        cur_tag = ''
        in_predicate = False
        for op, tag in self._TOKEN_RE.findall(text):
            if op:
                if op == '[':
                    in_predicate = True
                elif op == ']':
                    in_predicate = False
                elif op.startswith('('):
                    if cur_tag == 'text':
                        # The name "text" was already added as an element
                        # step; replace it with the text node test.
                        steps[-1] = (False, self.fn_text(), [])
                    else:
                        raise NotImplementedError('XPath function "%s" not '
                                                  'supported' % cur_tag)
                else:
                    # Multi-character operators such as "//" and "!=" arrive
                    # one character at a time and are accumulated here.
                    cur_op += op
                cur_tag = ''
                continue

            if not tag:
                # Whitespace matches the regex without filling either group;
                # it separates tokens but must not produce a step.
                continue

            closure = cur_op in ('', '//')
            if cur_op == '@':
                if tag == '*':
                    node_test = self.any_attribute()
                else:
                    node_test = self.attribute_by_name(tag)
            elif tag == '*':
                node_test = self.any_element()
            elif in_predicate:
                if not (len(tag) > 1 and (tag[0], tag[-1]) in _QUOTES):
                    # Without this check the node test of the previous
                    # token would be silently reused for this operand.
                    raise NotImplementedError('XPath predicate operand "%s" '
                                              'not supported' % tag)
                node_test = self.literal_string(tag[1:-1])
                if cur_op == '=':
                    # The left operand is the predicate added last; the
                    # comparison takes its place.
                    node_test = self.op_eq(steps[-1][2].pop(), node_test)
                elif cur_op == '!=':
                    node_test = self.op_neq(steps[-1][2].pop(), node_test)
            else:
                node_test = self.element_by_name(tag)

            if in_predicate:
                steps[-1][2].append(node_test)
            else:
                steps.append([closure, node_test, []])
            cur_op = ''
            cur_tag = tag
        self.steps = steps

    def __repr__(self):
        return '<%s "%s">' % (self.__class__.__name__, self.source)

    def select(self, stream):
        """Return a `Stream` of the events in `stream` matched by this path.

        When the path matches with a result of ``True`` (an element match),
        the matching event and all following events up to and including the
        END event that closes it are emitted. Any other truthy result (the
        attribute and text node tests return an event tuple) is emitted
        as-is.

        :param stream: an iterable of ``(kind, data, pos)`` events
        """
        stream = iter(stream)

        def _generate():
            test = self.test()
            for kind, data, pos in stream:
                result = test(kind, data, pos)
                if result is True:
                    yield kind, data, pos
                    # Emit the remainder of the matched element. Iterating
                    # the shared iterator (rather than calling its next
                    # method) ends cleanly if the stream is truncated.
                    depth = 1
                    for event in stream:
                        if event[0] is Stream.START:
                            depth += 1
                        elif event[0] is Stream.END:
                            depth -= 1
                        yield event
                        # Keep the matcher's cursor stack in sync with the
                        # events consumed here; the result is not needed.
                        test(*event)
                        if depth <= 0:
                            break
                elif result:
                    yield result

        return Stream(_generate())

    def test(self):
        """Return a stateful function for matching events against the path.

        The returned function takes ``(kind, data, pos)`` and must be called
        for every event of the stream, in order, because it tracks element
        nesting internally. It returns the result of the last step's node
        test when the whole path matches, ``False`` once its cursor stack
        has been emptied, and ``None`` otherwise.
        """
        stack = [0] # stack of cursors into the location path

        def _test(kind, data, pos):
            if not stack:
                return False

            if kind is Stream.END:
                stack.pop()
                return None

            if kind is Stream.START:
                # Children start out at the step their parent has reached.
                stack.append(stack[-1])

            closure, node_test, predicates = self.steps[stack[-1]]

            matched = node_test(kind, data, pos)
            if matched and predicates:
                for predicate in predicates:
                    if not predicate(kind, data, pos):
                        matched = None
                        break

            if matched:
                if stack[-1] == len(self.steps) - 1:
                    # Last step matched: the whole path matches.
                    return matched

                # Intermediate step matched: proceed to the next step.
                stack[-1] += 1

            elif kind is Stream.START and not closure:
                # FIXME: If this step is not a closure, it cannot be matched
                #        until the current element is closed... so we need to
                #        move the cursor back to the last closure and retest
                #        that against the current element
                closures = [step for step in self.steps[:stack[-1]] if step[0]]
                closures.reverse()
                for closure, node_test, predicates in closures:
                    stack[-1] -= 1
                    if closure:
                        matched = node_test(kind, data, pos)
                        if matched:
                            stack[-1] += 1
                            break

            return None

        return _test

    class any_element(object):
        """Node test that matches every START event."""
        def __call__(self, kind, data, pos):
            if kind is Stream.START:
                return True
            return None
        def __repr__(self):
            return '<%s>' % self.__class__.__name__

    class element_by_name(object):
        """Node test that matches START events by local tag name."""
        def __init__(self, name):
            self.name = QName(name)
        def __call__(self, kind, data, pos):
            if kind is Stream.START:
                return data[0].localname == self.name
            return None
        def __repr__(self):
            return '<%s "%s">' % (self.__class__.__name__, self.name)

    class any_attribute(object):
        """Node test that yields all attribute values of a START event,
        concatenated into one TEXT event, or `None` if that is empty."""
        def __call__(self, kind, data, pos):
            if kind is Stream.START:
                text = ''.join([val for name, val in data[1]])
                if text:
                    return Stream.TEXT, text, pos
                return None
            return None
        def __repr__(self):
            return '<%s>' % (self.__class__.__name__)

    class attribute_by_name(object):
        """Node test that yields the value of the named attribute of a
        START event as a TEXT event, or `None` if it is not present."""
        def __init__(self, name):
            self.name = QName(name)
        def __call__(self, kind, data, pos):
            if kind is Stream.START:
                if self.name in data[1]:
                    return Stream.TEXT, data[1].get(self.name), pos
                return None
            return None
        def __repr__(self):
            return '<%s "%s">' % (self.__class__.__name__, self.name)

    class fn_text(object):
        """Node test for ``text()``: passes TEXT events through unchanged."""
        def __call__(self, kind, data, pos):
            if kind is Stream.TEXT:
                return kind, data, pos
            return None
        def __repr__(self):
            return '<%s>' % (self.__class__.__name__)

    class literal_string(object):
        """Predicate operand that always yields a fixed string as a TEXT
        event, regardless of the event being tested."""
        def __init__(self, value):
            self.value = value
        def __call__(self, kind, data, pos):
            return Stream.TEXT, self.value, (-1, -1)
        def __repr__(self):
            return '<%s>' % (self.__class__.__name__)

    class op_eq(object):
        """Predicate that is true when both operands yield equal text."""
        def __init__(self, lval, rval):
            self.lval = lval
            self.rval = rval
        def __call__(self, kind, data, pos):
            lval = self.lval(kind, data, pos)
            rval = self.rval(kind, data, pos)
            # An operand that did not match is compared by its falsy result
            # itself rather than by event text.
            return (lval and lval[1]) == (rval and rval[1])
        def __repr__(self):
            return '<%s %r = %r>' % (self.__class__.__name__, self.lval,
                                     self.rval)

    class op_neq(object):
        """Predicate that is true when the operands yield different text."""
        def __init__(self, lval, rval):
            self.lval = lval
            self.rval = rval
        def __call__(self, kind, data, pos):
            lval = self.lval(kind, data, pos)
            rval = self.rval(kind, data, pos)
            # An operand that did not match is compared by its falsy result
            # itself rather than by event text.
            return (lval and lval[1]) != (rval and rval[1])
        def __repr__(self):
            return '<%s %r != %r>' % (self.__class__.__name__, self.lval,
                                      self.rval)