cmlenz@1: # -*- coding: utf-8 -*-
cmlenz@1: #
cmlenz@1: # Copyright (C) 2006 Edgewall Software
cmlenz@1: # All rights reserved.
cmlenz@1: #
cmlenz@1: # This software is licensed as described in the file COPYING, which
cmlenz@1: # you should have received as part of this distribution. The terms
cmlenz@1: # are also available at http://trac.edgewall.com/license.html.
cmlenz@1: #
cmlenz@1: # This software consists of voluntary contributions made by many
cmlenz@1: # individuals. For the exact contribution history, see the revision
cmlenz@1: # history and logs, available at http://projects.edgewall.com/trac/.
cmlenz@1:
cmlenz@1: """Basic support for evaluating XPath expressions against streams."""
cmlenz@1:
cmlenz@1: import re
cmlenz@1:
cmlenz@1: from markup.core import QName, Stream
cmlenz@1:
__all__ = ['Path']

# (opening, closing) quote pairs that may delimit a string literal inside a
# predicate.
_QUOTES = (("'", "'"), ('"', '"'))


class Path(object):
    """Basic XPath support on markup event streams.

    A path is compiled once from its textual form.  It can then be applied to
    a whole stream with `select()`, or used for event-by-event matching via
    the callable returned by `test()`.

    Only a small subset of XPath is understood.  Expressions of the following
    shapes are supported:

    * element names and the ``*`` wildcard: ``root``, ``*``
    * the ``//`` separator: ``//root``, ``//*``, ``root//bar``
    * child steps: ``root/bar``, ``root/*``, ``root/foo/*``, ``root/*/bar``
    * attribute values: ``@foo``, ``@*``
    * text nodes: ``item/text()``
    * simple attribute predicates: ``root/item[@important]``,
      ``root/item[@important="very"]``, ``root/item[@important!='very']``

    Whitespace between tokens is ignored.  Any function other than
    ``text()``, and any predicate operand that is not a quoted string, an
    attribute test or ``*``, raises `NotImplementedError` when the path is
    compiled.
    """

    # Group 1 captures operators, group 2 captures quoted literals and names.
    # Whitespace matches the last alternative, which has no group, so
    # `findall()` reports it as a pair of empty strings.
    _TOKEN_RE = re.compile(
        r'(::|\.\.|\(\)|[/.:\[\]\(\)@=!])|'   # operators
        r'("[^"]*"|'                          # "..." literal
        r"'[^']*'|"                           # '...' literal
        r'[^/:\[\]\(\)@=!\s]+)|'              # bare name
        r'\s+')                               # whitespace

    def __init__(self, text):
        """Compile the path expression `text`.

        The result is stored in `self.steps`, one entry per location step.
        Each entry is a ``(closure, node_test, predicates)`` sequence:
        `closure` is true when the step was introduced by ``//`` or by no
        separator at all, `node_test` is a callable taking
        ``(kind, data, pos)``, and `predicates` is a list of further such
        callables that must all accept the event.

        Raises `NotImplementedError` for functions other than ``text()`` and
        for predicate operands that are not supported.
        """
        self.source = text

        steps = []
        cur_op = ''    # operator characters collected since the last name
        cur_tag = ''   # most recent name; needed to recognise `name()`
        in_predicate = False
        for op, tag in self._TOKEN_RE.findall(text):
            if not (op or tag):
                # Whitespace: carries no meaning and must not disturb the
                # pending operator or name.
                continue

            if op:
                if op == '[':
                    in_predicate = True
                elif op == ']':
                    in_predicate = False
                elif op.startswith('('):
                    if cur_tag == 'text':
                        # The name `text` was already added as an element
                        # step; replace it by a text node test.
                        steps[-1] = (False, self.fn_text(), [])
                    else:
                        raise NotImplementedError('XPath function "%s" not '
                                                  'supported' % cur_tag)
                else:
                    cur_op += op
                cur_tag = ''
                continue

            closure = cur_op in ('', '//')
            if cur_op == '@':
                if tag == '*':
                    node_test = self.any_attribute()
                else:
                    node_test = self.attribute_by_name(tag)
            elif tag == '*':
                node_test = self.any_element()
            elif in_predicate:
                if not (len(tag) > 1 and (tag[0], tag[-1]) in _QUOTES):
                    # Only quoted string literals are understood here; fail
                    # loudly instead of reusing a test from an earlier token.
                    raise NotImplementedError('XPath predicate operand "%s" '
                                              'not supported' % tag)
                node_test = self.literal_string(tag[1:-1])
                # A comparison consumes the operand parsed just before it.
                if cur_op == '=':
                    node_test = self.op_eq(steps[-1][2].pop(), node_test)
                elif cur_op == '!=':
                    node_test = self.op_neq(steps[-1][2].pop(), node_test)
            else:
                node_test = self.element_by_name(tag)

            if in_predicate:
                steps[-1][2].append(node_test)
            else:
                steps.append([closure, node_test, []])
            cur_op = ''
            cur_tag = tag
        self.steps = steps

    def __repr__(self):
        return '<%s "%s">' % (self.__class__.__name__, self.source)

    def select(self, stream):
        """Return a new `Stream` with the parts of `stream` matching the path.

        When the path matches an element (the test returns ``True``), the
        START event and every following event up to and including the
        balancing END event are emitted.  When the test returns some other
        true value (as the attribute and text tests do), that value itself
        is emitted as the event.
        """
        stream = iter(stream)

        def _generate():
            test = self.test()
            for kind, data, pos in stream:
                result = test(kind, data, pos)
                if result is True:
                    yield kind, data, pos
                    # Copy the rest of the element, tracking nesting depth to
                    # find the END that balances the matched START.
                    depth = 1
                    for event in stream:
                        if event[0] is Stream.START:
                            depth += 1
                        elif event[0] is Stream.END:
                            depth -= 1
                        yield event
                        # Keep the tester's cursor stack in step with the
                        # events consumed here; the result is not needed.
                        test(*event)
                        if depth <= 0:
                            break
                elif result:
                    yield result
        return Stream(_generate())

    def test(self):
        """Return a stateful function for matching events one at a time.

        The returned callable takes ``(kind, data, pos)``.  It returns the
        result of the final step's node test when the event completes a match
        of the whole path, ``False`` once its cursor stack is exhausted, and
        ``None`` otherwise.  It must be fed every event of the stream in
        order, because START and END events push and pop its cursor stack.
        """
        stack = [0]  # stack of cursors into the location path

        def _test(kind, data, pos):
            if not stack:
                return False

            if kind is Stream.END:
                stack.pop()
                return None

            if kind is Stream.START:
                # A nested element starts from its parent's cursor.
                stack.append(stack[-1])

            closure, node_test, predicates = self.steps[stack[-1]]

            matched = node_test(kind, data, pos)
            if matched and predicates:
                for predicate in predicates:
                    if not predicate(kind, data, pos):
                        matched = None
                        break

            if matched:
                if stack[-1] == len(self.steps) - 1:
                    # Last step matched: the whole path matches.
                    return matched

                # Intermediate step matched: proceed to the next step.
                stack[-1] += 1

            elif kind is Stream.START and not closure:
                # FIXME: If this step is not a closure, it cannot be matched
                #        until the current element is closed... so we need to
                #        move the cursor back to the last closure and retest
                #        that against the current element
                closures = [step for step in self.steps[:stack[-1]] if step[0]]
                closures.reverse()
                for closure, node_test, predicates in closures:
                    stack[-1] -= 1
                    if closure:
                        matched = node_test(kind, data, pos)
                        if matched:
                            stack[-1] += 1
                        break

            return None

        return _test

    class any_element(object):
        """Node test for ``*``: accepts every START event."""

        def __call__(self, kind, data, pos):
            if kind is Stream.START:
                return True
            return None

        def __repr__(self):
            return '<%s>' % self.__class__.__name__

    class element_by_name(object):
        """Node test accepting START events whose local name equals `name`."""

        def __init__(self, name):
            self.name = QName(name)

        def __call__(self, kind, data, pos):
            if kind is Stream.START:
                return data[0].localname == self.name
            return None

        def __repr__(self):
            return '<%s "%s">' % (self.__class__.__name__, self.name)

    class any_attribute(object):
        """Node test for ``@*``.

        For a START event, yields a TEXT event holding the concatenation of
        all attribute values, or ``None`` when that text is empty.
        """

        def __call__(self, kind, data, pos):
            if kind is Stream.START:
                text = ''.join([val for name, val in data[1]])
                if text:
                    return Stream.TEXT, text, pos
                return None
            return None

        def __repr__(self):
            return '<%s>' % (self.__class__.__name__)

    class attribute_by_name(object):
        """Node test for ``@name``.

        For a START event carrying the attribute, yields a TEXT event holding
        its value; otherwise ``None``.
        """

        def __init__(self, name):
            self.name = QName(name)

        def __call__(self, kind, data, pos):
            if kind is Stream.START:
                if self.name in data[1]:
                    return Stream.TEXT, data[1].get(self.name), pos
                return None
            return None

        def __repr__(self):
            return '<%s "%s">' % (self.__class__.__name__, self.name)

    class fn_text(object):
        """Node test for ``text()``: passes TEXT events through unchanged."""

        def __call__(self, kind, data, pos):
            if kind is Stream.TEXT:
                return kind, data, pos
            return None

        def __repr__(self):
            return '<%s>' % (self.__class__.__name__)

    class literal_string(object):
        """Predicate operand that always yields a TEXT event with `value`."""

        def __init__(self, value):
            self.value = value

        def __call__(self, kind, data, pos):
            return Stream.TEXT, self.value, (-1, -1)

        def __repr__(self):
            return '<%s>' % (self.__class__.__name__)

    class op_eq(object):
        """Predicate comparing the text of two operands for equality.

        An operand that yields no event contributes its own false result to
        the comparison instead of a text value.
        """

        def __init__(self, lval, rval):
            self.lval = lval
            self.rval = rval

        def __call__(self, kind, data, pos):
            lval = self.lval(kind, data, pos)
            rval = self.rval(kind, data, pos)
            return (lval and lval[1]) == (rval and rval[1])

        def __repr__(self):
            return '<%s %r = %r>' % (self.__class__.__name__, self.lval,
                                     self.rval)

    class op_neq(object):
        """Predicate comparing the text of two operands for inequality.

        An operand that yields no event contributes its own false result to
        the comparison instead of a text value.
        """

        def __init__(self, lval, rval):
            self.lval = lval
            self.rval = rval

        def __call__(self, kind, data, pos):
            lval = self.lval(kind, data, pos)
            rval = self.rval(kind, data, pos)
            return (lval and lval[1]) != (rval and rval[1])

        def __repr__(self):
            return '<%s %r != %r>' % (self.__class__.__name__, self.lval,
                                      self.rval)