cmlenz@1: # -*- coding: utf-8 -*-
cmlenz@1: #
cmlenz@27: # Copyright (C) 2006 Christopher Lenz
cmlenz@1: # All rights reserved.
cmlenz@1: #
cmlenz@1: # This software is licensed as described in the file COPYING, which
cmlenz@1: # you should have received as part of this distribution. The terms
cmlenz@27: # are also available at http://markup.cmlenz.net/wiki/License.
cmlenz@1: #
cmlenz@1: # This software consists of voluntary contributions made by many
cmlenz@1: # individuals. For the exact contribution history, see the revision
cmlenz@27: # history and logs, available at http://markup.cmlenz.net/log/.
cmlenz@1:
cmlenz@1: """Basic support for evaluating XPath expressions against streams."""
cmlenz@1:
cmlenz@1: import re
cmlenz@1:
cmlenz@1: from markup.core import QName, Stream
cmlenz@1:
cmlenz@1: __all__ = ['Path']
cmlenz@1:
cmlenz@1:
class Path(object):
    """Implements basic XPath support on streams.

    Instances of this class represent a "compiled" XPath expression, and provide
    methods for testing the path against a stream, as well as extracting a
    substream matching that path.

    The compiled expression is stored in the `steps` attribute as a list of
    `(closure, node_test, predicates)` tuples, where `closure` is a boolean
    that is set when the step was introduced by no operator or by `//`,
    `node_test` is a callable taking `(kind, data, pos)`, and `predicates` is
    a list of further callables with the same signature.
    """
    # Group 1 captures operators, group 2 captures names and quoted literals;
    # the last alternative consumes whitespace without capturing anything.
    _TOKEN_RE = re.compile(r'(::|\.\.|\(\)|[/.:\[\]\(\)@=!])|'
                           r'([^/:\[\]\(\)@=!\s]+)|'
                           r'\s+')
    _QUOTES = (("'", "'"), ('"', '"'))

    def __init__(self, text):
        """Create the path object from a string.

        @param text: the path expression
        @raise NotImplementedError: if the expression uses a function other
            than `text()`, or a predicate operand that is neither `*`, an
            attribute reference, nor a quoted string literal
        """
        self.source = text

        steps = []
        cur_op = ''
        cur_tag = ''
        in_predicate = False
        for op, tag in self._TOKEN_RE.findall(text):
            if not op and not tag:
                # Whitespace matches the non-capturing alternative of the
                # token pattern; it must neither produce a step nor reset
                # the pending operator.
                continue

            if op:
                if op == '[':
                    in_predicate = True
                elif op == ']':
                    in_predicate = False
                elif op.startswith('('):
                    if cur_tag == 'text':
                        # The name preceding the parentheses was already
                        # added as an element step; replace it.
                        steps[-1] = [False, self._FunctionText(), []]
                    else:
                        raise NotImplementedError('XPath function "%s" not '
                                                  'supported' % cur_tag)
                elif op == '.':
                    steps.append([False, self._CurrentElement(), []])
                else:
                    # Multi-character operators such as `//` and `!=` arrive
                    # as separate tokens and are accumulated here.
                    cur_op += op
                cur_tag = ''
                continue

            closure = cur_op in ('', '//')
            if cur_op == '@':
                if tag == '*':
                    node_test = self._AnyAttribute()
                else:
                    node_test = self._AttributeByName(tag)
            elif tag == '*':
                node_test = self._AnyChildElement()
            elif in_predicate:
                if len(tag) > 1 and (tag[0], tag[-1]) in self._QUOTES:
                    node_test = self._LiteralString(tag[1:-1])
                else:
                    # Without this, the node test left over from the
                    # previous token would silently be reused.
                    raise NotImplementedError('XPath predicate operand "%s" '
                                              'not supported' % tag)
                if cur_op == '=':
                    # The left operand is the predicate added last; fold it
                    # into the comparison.
                    node_test = self._OperatorEq(steps[-1][2][-1], node_test)
                    steps[-1][2].pop()
                elif cur_op == '!=':
                    node_test = self._OperatorNeq(steps[-1][2][-1], node_test)
                    steps[-1][2].pop()
            else:
                node_test = self._ChildElementByName(tag)

            if in_predicate:
                steps[-1][2].append(node_test)
            else:
                steps.append([closure, node_test, []])
            cur_op = ''
            cur_tag = tag

        self.steps = [tuple(step) for step in steps]

    def __repr__(self):
        return '<%s "%s">' % (self.__class__.__name__, self.source)

    def select(self, stream):
        """Returns a substream of the given stream that matches the path.

        If there are no matches, this method returns an empty stream.

        >>> from markup.input import XML
        >>> xml = XML('<root><child>Text</child></root>')

        >>> print Path('child').select(xml)
        <child>Text</child>

        >>> print Path('child/text()').select(xml)
        Text

        @param stream: the stream to select from
        @return: the substream matching the path, or an empty stream
        """
        stream = iter(stream)
        def _generate():
            test = self.test()
            depth_change = {Stream.START: 1, Stream.END: -1}
            for kind, data, pos in stream:
                result = test(kind, data, pos)
                if result is True:
                    # An element matched: emit it together with everything
                    # up to and including its matching END event.
                    yield kind, data, pos
                    depth = 1
                    while depth > 0:
                        ev = stream.next()
                        depth += depth_change.get(ev[0], 0)
                        yield ev
                        # Keep the test function's cursor stack in sync with
                        # the events consumed here.
                        test(*ev)
                elif result:
                    # The node test produced an event of its own (for
                    # example a TEXT event); emit that instead.
                    yield result
        return Stream(_generate())

    def test(self, ignore_context=False):
        """Returns a function that can be used to track whether the path matches
        a specific stream event.

        The function returned expects the positional arguments `kind`, `data`,
        and `pos`, i.e. basically an unpacked stream event. If the path matches
        the event, the function returns the match (for example, a `START` or
        `TEXT` event.) Otherwise, it returns `None` or `False`.

        >>> from markup.input import XML
        >>> xml = XML('<root><child id="1"/><child id="2"/></root>')
        >>> test = Path('child').test()
        >>> for kind, data, pos in xml:
        ...     if test(kind, data, pos):
        ...         print kind, data
        START (u'child', [(u'id', u'1')])
        START (u'child', [(u'id', u'2')])

        @param ignore_context: if false (the default), a match of the last
            step is only reported when the cursor stack holds more than two
            entries or the node test's axis is not `child`
        @return: the stateful test function
        """
        stack = [0] # stack of cursors into the location path

        def _test(kind, data, pos):
            if not stack:
                return False

            if kind is Stream.END:
                stack.pop()
                return None

            if kind is Stream.START:
                # A nested element starts out at its parent's cursor.
                stack.append(stack[-1])

            closure, node_test, predicates = self.steps[stack[-1]]

            matched = node_test(kind, data, pos)
            if matched and predicates:
                for predicate in predicates:
                    if not predicate(kind, data, pos):
                        matched = None
                        break

            if matched:
                if stack[-1] == len(self.steps) - 1:
                    if ignore_context or len(stack) > 2 \
                                      or node_test.axis != 'child':
                        return matched
                else:
                    stack[-1] += 1

            elif kind is Stream.START and not closure:
                # If this step is not a closure, it cannot be matched until the
                # current element is closed... so we need to move the cursor
                # back to the last closure and retest that against the current
                # element
                closures = [step for step in self.steps[:stack[-1]] if step[0]]
                closures.reverse()
                for closure, node_test, predicates in closures:
                    stack[-1] -= 1
                    if closure:
                        matched = node_test(kind, data, pos)
                        if matched:
                            stack[-1] += 1
                            break

            return None

        return _test

    class _NodeTest(object):
        """Abstract node test."""
        axis = None
        def __repr__(self):
            return '<%s>' % self.__class__.__name__

    class _CurrentElement(_NodeTest):
        """Node test that matches the context node."""
        axis = 'self'
        def __call__(self, kind, *_):
            if kind is Stream.START:
                return True
            return None

    class _AnyChildElement(_NodeTest):
        """Node test that matches any child element."""
        axis = 'child'
        def __call__(self, kind, *_):
            if kind is Stream.START:
                return True
            return None

    class _ChildElementByName(_NodeTest):
        """Node test that matches a child element with a specific tag name."""
        axis = 'child'
        def __init__(self, name):
            self.name = QName(name)
        def __call__(self, kind, data, _):
            if kind is Stream.START:
                return data[0].localname == self.name
            return None
        def __repr__(self):
            return '<%s "%s">' % (self.__class__.__name__, self.name)

    class _AnyAttribute(_NodeTest):
        """Node test that matches any attribute.

        On a match, the values of all attributes are joined into a single
        `TEXT` event.
        """
        axis = 'attribute'
        def __call__(self, kind, data, pos):
            if kind is Stream.START:
                text = ''.join([val for _, val in data[1]])
                if text:
                    return Stream.TEXT, text, pos
                return None
            return None

    class _AttributeByName(_NodeTest):
        """Node test that matches an attribute with a specific name.

        On a match, the attribute value is returned as a `TEXT` event.
        """
        axis = 'attribute'
        def __init__(self, name):
            self.name = QName(name)
        def __call__(self, kind, data, pos):
            if kind is Stream.START:
                if self.name in data[1]:
                    return Stream.TEXT, data[1].get(self.name), pos
                return None
            return None
        def __repr__(self):
            return '<%s "%s">' % (self.__class__.__name__, self.name)

    class _Function(_NodeTest):
        """Abstract node test representing a function."""

    class _FunctionText(_Function):
        """Function that returns text content."""
        def __call__(self, kind, data, pos):
            if kind is Stream.TEXT:
                return kind, data, pos
            return None

    class _LiteralString(_NodeTest):
        """Always returns a literal string."""
        def __init__(self, value):
            self.value = value
        def __call__(self, *_):
            return Stream.TEXT, self.value, (-1, -1)

    class _OperatorEq(_NodeTest):
        """Equality comparison operator.

        Both operands are evaluated against the same event; the second item
        of each result is compared, and a false result is compared as is.
        """
        def __init__(self, lval, rval):
            self.lval = lval
            self.rval = rval
        def __call__(self, kind, data, pos):
            lval = self.lval(kind, data, pos)
            rval = self.rval(kind, data, pos)
            return (lval and lval[1]) == (rval and rval[1])
        def __repr__(self):
            return '<%s %r = %r>' % (self.__class__.__name__, self.lval,
                                     self.rval)

    class _OperatorNeq(_NodeTest):
        """Inequality comparison operator.

        Both operands are evaluated against the same event; the second item
        of each result is compared, and a false result is compared as is.
        """
        def __init__(self, lval, rval):
            self.lval = lval
            self.rval = rval
        def __call__(self, kind, data, pos):
            lval = self.lval(kind, data, pos)
            rval = self.rval(kind, data, pos)
            return (lval and lval[1]) != (rval and rval[1])
        def __repr__(self):
            return '<%s %r != %r>' % (self.__class__.__name__, self.lval,
                                      self.rval)