cmlenz@1: # -*- coding: utf-8 -*-
cmlenz@1: #
cmlenz@66: # Copyright (C) 2006 Edgewall Software
cmlenz@1: # All rights reserved.
cmlenz@1: #
cmlenz@1: # This software is licensed as described in the file COPYING, which
cmlenz@1: # you should have received as part of this distribution. The terms
cmlenz@66: # are also available at http://markup.edgewall.org/wiki/License.
cmlenz@1: #
cmlenz@1: # This software consists of voluntary contributions made by many
cmlenz@1: # individuals. For the exact contribution history, see the revision
cmlenz@66: # history and logs, available at http://markup.edgewall.org/log/.
cmlenz@1:
cmlenz@111: """Basic support for evaluating XPath expressions against streams.
cmlenz@111:
cmlenz@111: >>> from markup.input import XML
>>> doc = XML('''<doc>
...  <items>
...   <item status="new">
...     <summary>Foo</summary>
...   </item>
...   <item status="closed">
...     <summary>Bar</summary>
...   </item>
...  </items>
... </doc>''')
cmlenz@111: >>> print doc.select('items/item[@status="closed"]/summary/text()')
cmlenz@111: Bar
cmlenz@111:
cmlenz@111: Because the XPath engine operates on markup streams (as opposed to tree
cmlenz@111: structures), it only implements a subset of the full XPath 1.0 language.
cmlenz@111: """
cmlenz@1:
cmlenz@1: import re
cmlenz@1:
cmlenz@106: from markup.core import QName, Stream, START, END, TEXT, COMMENT, PI
cmlenz@1:
cmlenz@106: __all__ = ['Path', 'PathSyntaxError']
cmlenz@1:
cmlenz@1:
class Axis(object):
    """Namespace holding the names of the XPath axes this module knows about.

    Each constant is the axis name exactly as it is spelled in an XPath
    expression (for example ``descendant-or-self``).
    """

    ATTRIBUTE = 'attribute'
    CHILD = 'child'
    DESCENDANT = 'descendant'
    DESCENDANT_OR_SELF = 'descendant-or-self'
    NAMESPACE = 'namespace'
    SELF = 'self'

    def forname(cls, name):
        """Look up an axis constant by its XPath spelling.

        The name is upper-cased and its dashes are turned into underscores
        before the class attribute of that name is fetched.

        @param name: the axis name, e.g. ``'descendant-or-self'``
        @return: the matching constant, or `None` if there is no attribute of
            that name
        """
        attr_name = name.upper().replace('-', '_')
        return getattr(cls, attr_name, None)
    forname = classmethod(forname)
cmlenz@114:
cmlenz@114:
# Module-level aliases for the axis constants, so the matching and parsing
# code in this module can refer to them without the `Axis.` prefix.
ATTRIBUTE, CHILD, DESCENDANT = Axis.ATTRIBUTE, Axis.CHILD, Axis.DESCENDANT
DESCENDANT_OR_SELF, NAMESPACE, SELF = (Axis.DESCENDANT_OR_SELF,
                                       Axis.NAMESPACE, Axis.SELF)
cmlenz@114:
cmlenz@114:
class Path(object):
    """Implements basic XPath support on streams.

    Instances of this class represent a "compiled" XPath expression, and provide
    methods for testing the path against a stream, as well as extracting a
    substream matching that path.
    """

    def __init__(self, text):
        """Create the path object from a string.

        @param text: the path expression
        """
        self.source = text
        # One list of location steps per operand of a union (`|`) expression.
        self.paths = _PathParser(text).parse()

    def __repr__(self):
        """Return a debug representation that includes the path source."""
        return '<%s "%s">' % (self.__class__.__name__, self.source)

    def select(self, stream):
        """Returns a substream of the given stream that matches the path.

        If there are no matches, this method returns an empty stream.

        >>> from markup.input import XML
        >>> xml = XML('<root><child>Text</child></root>')

        >>> print Path('child').select(xml)
        <child>Text</child>

        >>> print Path('child/text()').select(xml)
        Text

        @param stream: the stream to select from
        @return: the substream matching the path, or an empty stream
        """
        # Both loops below must pull from the *same* iterator, so that events
        # consumed while copying a matched element are not seen again by the
        # outer loop.
        stream = iter(stream)

        def _generate():
            test = self.test()
            for kind, data, pos in stream:
                result = test(kind, data, pos)
                if result is True:
                    # The event itself matched: emit it and everything up to
                    # and including the END event that balances it.
                    yield kind, data, pos
                    depth = 1
                    # A `for` loop (rather than calling `stream.next()`) ends
                    # quietly if the stream is exhausted before the element is
                    # closed, and works with any iterator implementation.
                    for subkind, subdata, subpos in stream:
                        if subkind is START:
                            depth += 1
                        elif subkind is END:
                            depth -= 1
                        yield subkind, subdata, subpos
                        # Keep the matcher's internal state in sync with the
                        # events that were consumed here.
                        test(subkind, subdata, subpos)
                        if depth <= 0:
                            break
                elif result:
                    # The test produced a replacement event (for example the
                    # TEXT event built for an attribute value).
                    yield result
        return Stream(_generate())

    def test(self, ignore_context=False):
        """Returns a function that can be used to track whether the path matches
        a specific stream event.

        The function returned expects the positional arguments `kind`, `data`,
        and `pos`, i.e. basically an unpacked stream event. If the path matches
        the event, the function returns the match (for example, a `START` or
        `TEXT` event.) Otherwise, it returns `None`.

        >>> from markup.input import XML
        >>> xml = XML('<root><child id="1"/><child id="2"/></root>')
        >>> test = Path('child').test()
        >>> for kind, data, pos in xml:
        ...     if test(kind, data, pos):
        ...         print kind, data
        START (u'child', [(u'id', u'1')])
        START (u'child', [(u'id', u'2')])

        @param ignore_context: if true, a match on the last location step is
            always reported, bypassing the kind/axis/depth conditions that
            otherwise apply to `START` events
        @return: the stateful test function
        """
        # One entry per operand of a union expression.  The trailing list is a
        # stack of step cursors: a copy of the current cursor is pushed on
        # every START event and popped again on every END event.
        paths = [(idx, steps, len(steps), [0])
                 for idx, steps in enumerate(self.paths)]

        def _test(kind, data, pos):
            for idx, steps, size, stack in paths:
                if not stack:
                    continue
                cursor = stack[-1]

                if kind is END:
                    stack.pop()
                    continue

                elif kind is START:
                    stack.append(cursor)

                matched = None
                while 1:
                    axis, node_test, predicates = steps[cursor]

                    matched = node_test(kind, data, pos)
                    if matched and predicates:
                        # Every predicate has to hold for the step to match
                        for predicate in predicates:
                            if not predicate(kind, data, pos):
                                matched = None
                                break

                    if matched:
                        if cursor + 1 == size: # the last location step
                            # NOTE(review): `len(stack) > 2` presumably keeps
                            # the outermost element of the stream from being
                            # reported as a match -- confirm.
                            if ignore_context or \
                                    kind is not START or \
                                    axis in (ATTRIBUTE, SELF) or \
                                    len(stack) > 2:
                                return matched
                        else:
                            cursor += 1
                            stack[-1] = cursor

                    # Only a `self` step is followed by testing the next step
                    # against the very same event
                    if axis is not SELF:
                        break

                if not matched and kind is START \
                        and axis not in (DESCENDANT, DESCENDANT_OR_SELF):
                    # If this step is not a closure, it cannot be matched until
                    # the current element is closed... so we need to move the
                    # cursor back to the previous closure and retest that
                    # against the current element
                    backsteps = [step for step in steps[:cursor]
                                 if step[0] in (DESCENDANT, DESCENDANT_OR_SELF)]
                    backsteps.reverse()
                    # Only the nearest preceding closure is retested
                    for axis, node_test, predicates in backsteps:
                        matched = node_test(kind, data, pos)
                        if not matched:
                            cursor -= 1
                        break
                    stack[-1] = cursor

            return None

        return _test
cmlenz@1:
cmlenz@1:
def _node_test_current_element():
    """Create the node test the parser uses for name tests on the `self` axis.

    @return: a callable taking an unpacked stream event; it is true exactly
        when the event kind is `START`
    """
    def _node_test_current_element(kind, *_):
        is_element_start = kind is START
        return is_element_start
    return _node_test_current_element
cmlenz@77:
def _node_test_any_child_element():
    """Create the node test for the `*` name test on element axes.

    @return: a callable taking an unpacked stream event; it is true exactly
        when the event kind is `START`, whatever the element's name
    """
    def _node_test_any_child_element(kind, *_):
        is_element_start = kind is START
        return is_element_start
    return _node_test_any_child_element
cmlenz@77:
def _node_test_child_element_by_name(name):
    """Create a node test that matches elements with a given local name.

    @param name: the local name to compare `data[0].localname` against
    @return: a callable taking `(kind, data, pos)`; false for anything but a
        `START` event, otherwise the result of the name comparison
    """
    def _node_test_child_element_by_name(kind, data, _):
        if kind is not START:
            return False
        return data[0].localname == name
    return _node_test_child_element_by_name
cmlenz@77:
def _node_test_any_attribute():
    """Create the node test for `@*`.

    @return: a callable taking `(kind, data, pos)`; for a `START` event whose
        second data item (the attributes) is non-empty it returns that item,
        otherwise `None`
    """
    def _node_test_any_attribute(kind, data, _):
        if kind is START:
            attrs = data[1]
            if attrs:
                return attrs
        return None
    return _node_test_any_attribute
cmlenz@106:
def _node_test_attribute_by_name(name):
    """Create the node test for `@name`.

    @param name: the attribute name to look for
    @return: a callable taking `(kind, data, pos)`; for a `START` event that
        carries the attribute it returns a `TEXT` event holding the attribute
        value, otherwise `None`
    """
    def _node_test_attribute_by_name(kind, data, pos):
        if kind is not START:
            return None
        attrs = data[1]
        if name in attrs:
            return TEXT, attrs.get(name), pos
        return None
    return _node_test_attribute_by_name
cmlenz@106:
def _function_comment():
    """Create the node type test for `comment()`.

    @return: a callable taking `(kind, data, pos)`; it hands back the event
        as a tuple when its kind is `COMMENT`, and `False` otherwise
    """
    def _function_comment(kind, data, pos):
        if kind is COMMENT:
            return kind, data, pos
        return False
    return _function_comment
cmlenz@106:
def _function_local_name():
    """Create the evaluator for the `local-name()` function.

    @return: a callable taking `(kind, data, pos)`; for a `START` event it
        returns a `TEXT` event carrying `data[0].localname`, otherwise `None`
    """
    def _function_local_name(kind, data, pos):
        if kind is not START:
            return None
        tag = data[0]
        return TEXT, tag.localname, pos
    return _function_local_name
cmlenz@121:
def _function_name():
    """Create the evaluator for the `name()` function.

    @return: a callable taking `(kind, data, pos)`; for a `START` event it
        returns a `TEXT` event carrying the first data item (the tag name),
        otherwise `None`
    """
    def _function_name(kind, data, pos):
        if kind is not START:
            return None
        tag = data[0]
        return TEXT, tag, pos
    return _function_name
cmlenz@121:
def _function_namespace_uri():
    """Create the evaluator for the `namespace-uri()` function.

    @return: a callable taking `(kind, data, pos)`; for a `START` event it
        returns a `TEXT` event carrying `data[0].namespace`, otherwise `None`
    """
    def _function_namespace_uri(kind, data, pos):
        if kind is not START:
            return None
        tag = data[0]
        return TEXT, tag.namespace, pos
    return _function_namespace_uri
cmlenz@121:
def _function_node():
    """Create the node type test for `node()`.

    @return: a callable taking `(kind, data, pos)`; it returns `True` for a
        `START` event and the event itself, as a tuple, for any other kind
    """
    def _function_node(kind, data, pos):
        return kind is START or (kind, data, pos)
    return _function_node
cmlenz@106:
cmlenz@121: def _function_not(expr):
cmlenz@121: def _function_not(kind, data, pos):
cmlenz@121: return not expr(kind, data, pos)
cmlenz@121: return _function_not
cmlenz@121:
def _function_processing_instruction(name=None):
    """Create the node type test for `processing-instruction()`.

    @param name: optional target name; when given (and non-empty), only
        events whose first data item equals it are accepted
    @return: a callable taking `(kind, data, pos)`; it returns the event as a
        tuple when it is an accepted `PI` event, otherwise `None`
    """
    def _function_processing_instruction(kind, data, pos):
        if kind is not PI:
            return None
        if not name or data[0] == name:
            return (kind, data, pos)
        return None
    return _function_processing_instruction
cmlenz@106:
def _function_text():
    """Create the node type test for `text()`.

    @return: a callable taking `(kind, data, pos)`; it hands back the event
        as a tuple when its kind is `TEXT`, and `False` otherwise
    """
    def _function_text(kind, data, pos):
        if kind is TEXT:
            return kind, data, pos
        return False
    return _function_text
cmlenz@106:
def _literal_string(text):
    """Create the evaluator for a string literal.

    @param text: the literal's content
    @return: a callable that ignores its arguments and returns a `TEXT` event
        carrying `text`, with the placeholder position `(None, -1, -1)`
    """
    def _literal_string(*_):
        no_position = (None, -1, -1)
        return TEXT, text, no_position
    return _literal_string
cmlenz@106:
cmlenz@106: def _operator_eq(lval, rval):
cmlenz@106: def _operator_eq(kind, data, pos):
cmlenz@106: lv = lval(kind, data, pos)
cmlenz@121: if type(lv) is tuple:
cmlenz@121: lv = lv[1]
cmlenz@106: rv = rval(kind, data, pos)
cmlenz@121: if type(rv) is tuple:
cmlenz@121: rv = rv[1]
cmlenz@121: return lv == rv
cmlenz@106: return _operator_eq
cmlenz@106:
cmlenz@106: def _operator_neq(lval, rval):
cmlenz@106: def _operator_neq(kind, data, pos):
cmlenz@106: lv = lval(kind, data, pos)
cmlenz@121: if type(lv) is tuple:
cmlenz@121: lv = lv[1]
cmlenz@106: rv = rval(kind, data, pos)
cmlenz@121: if type(rv) is tuple:
cmlenz@121: rv = rv[1]
cmlenz@121: return lv != rv
cmlenz@106: return _operator_neq
cmlenz@106:
cmlenz@106: def _operator_and(lval, rval):
cmlenz@106: def _operator_and(kind, data, pos):
cmlenz@106: lv = lval(kind, data, pos)
cmlenz@121: if type(lv) is tuple:
cmlenz@121: lv = lv[1]
cmlenz@121: if not lv:
cmlenz@106: return False
cmlenz@106: rv = rval(kind, data, pos)
cmlenz@121: if type(rv) is tuple:
cmlenz@121: rv = rv[1]
cmlenz@121: return bool(rv)
cmlenz@106: return _operator_and
cmlenz@106:
cmlenz@106: def _operator_or(lval, rval):
cmlenz@106: def _operator_or(kind, data, pos):
cmlenz@106: lv = lval(kind, data, pos)
cmlenz@121: if type(lv) is tuple:
cmlenz@121: lv = lv[1]
cmlenz@121: if lv:
cmlenz@106: return True
cmlenz@106: rv = rval(kind, data, pos)
cmlenz@121: if type(rv) is tuple:
cmlenz@121: rv = rv[1]
cmlenz@121: return bool(rv)
cmlenz@106: return _operator_or
cmlenz@106:
cmlenz@106:
class PathSyntaxError(Exception):
    """Exception raised when an XPath expression is syntactically incorrect."""

    def __init__(self, message, filename=None, lineno=-1, offset=-1):
        """Create the exception.

        @param message: the error description
        @param filename: name of the file containing the expression; when
            given, it is appended to the message together with `lineno`
        @param lineno: line number of the expression
        @param offset: offset of the expression
        """
        self.filename = filename
        self.lineno = lineno
        self.offset = offset
        text = message
        if filename:
            text = '%s (%s, line %d)' % (message, filename, lineno)
        Exception.__init__(self, text)
cmlenz@106:
cmlenz@106:
cmlenz@106: class _PathParser(object):
cmlenz@106: """Tokenizes and parses an XPath expression."""
cmlenz@106:
cmlenz@106: _QUOTES = (("'", "'"), ('"', '"'))
cmlenz@106: _TOKENS = ('::', ':', '..', '.', '//', '/', '[', ']', '()', '(', ')', '@',
cmlenz@121: '=', '!=', '!', '|', ',')
cmlenz@106: _tokenize = re.compile('(%s)|([^%s\s]+)|\s+' % (
cmlenz@106: '|'.join([re.escape(t) for t in _TOKENS]),
cmlenz@106: ''.join([re.escape(t[0]) for t in _TOKENS]))).findall
cmlenz@106:
cmlenz@106: def __init__(self, text):
cmlenz@106: self.tokens = filter(None, [a or b for a, b in self._tokenize(text)])
cmlenz@106: self.pos = 0
cmlenz@106:
cmlenz@106: # Tokenizer
cmlenz@106:
cmlenz@106: at_end = property(lambda self: self.pos == len(self.tokens) - 1)
cmlenz@106: cur_token = property(lambda self: self.tokens[self.pos])
cmlenz@106:
cmlenz@106: def next_token(self):
cmlenz@106: self.pos += 1
cmlenz@106: return self.tokens[self.pos]
cmlenz@106:
cmlenz@106: def peek_token(self):
cmlenz@106: if not self.at_end:
cmlenz@106: return self.tokens[self.pos + 1]
cmlenz@106: return None
cmlenz@106:
cmlenz@106: # Recursive descent parser
cmlenz@106:
cmlenz@106: def parse(self):
cmlenz@106: """Parses the XPath expression and returns a list of location path
cmlenz@106: tests.
cmlenz@106:
cmlenz@106: For union expressions (such as `*|text()`), this function returns one
cmlenz@106: test for each operand in the union. For patch expressions that don't
cmlenz@106: use the union operator, the function always returns a list of size 1.
cmlenz@106:
cmlenz@106: Each path test in turn is a sequence of tests that correspond to the
cmlenz@111: location steps, each tuples of the form `(axis, testfunc, predicates)`
cmlenz@106: """
cmlenz@106: paths = [self._location_path()]
cmlenz@106: while self.cur_token == '|':
cmlenz@106: self.next_token()
cmlenz@106: paths.append(self._location_path())
cmlenz@106: if not self.at_end:
cmlenz@106: raise PathSyntaxError('Unexpected token %r after end of expression'
cmlenz@106: % self.cur_token)
cmlenz@106: return paths
cmlenz@106:
cmlenz@106: def _location_path(self):
cmlenz@106: next_is_closure = True
cmlenz@106: steps = []
cmlenz@106: while True:
cmlenz@106: if self.cur_token == '//':
cmlenz@106: next_is_closure = True
cmlenz@111: self.next_token()
cmlenz@111: elif self.cur_token == '/' and not steps:
cmlenz@111: raise PathSyntaxError('Absolute location paths not supported')
cmlenz@111:
cmlenz@111: axis, node_test, predicates = self._location_step()
cmlenz@114: if axis is CHILD and next_is_closure:
cmlenz@114: axis = DESCENDANT_OR_SELF
cmlenz@111: steps.append((axis, node_test, predicates))
cmlenz@111: next_is_closure = False
cmlenz@111:
cmlenz@111: if self.at_end or not self.cur_token.startswith('/'):
cmlenz@106: break
cmlenz@106: self.next_token()
cmlenz@111:
cmlenz@106: return steps
cmlenz@106:
cmlenz@106: def _location_step(self):
cmlenz@106: if self.cur_token == '@':
cmlenz@114: axis = ATTRIBUTE
cmlenz@106: self.next_token()
cmlenz@111: elif self.cur_token == '.':
cmlenz@114: axis = SELF
cmlenz@111: elif self.peek_token() == '::':
cmlenz@114: axis = Axis.forname(self.cur_token)
cmlenz@114: if axis is None:
cmlenz@111: raise PathSyntaxError('Unsupport axis "%s"' % axis)
cmlenz@111: self.next_token()
cmlenz@111: self.next_token()
cmlenz@106: else:
cmlenz@114: axis = CHILD
cmlenz@111: node_test = self._node_test(axis)
cmlenz@111: predicates = []
cmlenz@106: while self.cur_token == '[':
cmlenz@111: predicates.append(self._predicate())
cmlenz@111: return axis, node_test, predicates
cmlenz@106:
cmlenz@106: def _node_test(self, axis=None):
cmlenz@106: test = None
cmlenz@106: if self.peek_token() in ('(', '()'): # Node type test
cmlenz@106: test = self._node_type()
cmlenz@106:
cmlenz@106: else: # Name test
cmlenz@114: if axis is ATTRIBUTE:
cmlenz@106: if self.cur_token == '*':
cmlenz@106: test = _node_test_any_attribute()
cmlenz@106: else:
cmlenz@106: test = _node_test_attribute_by_name(self.cur_token)
cmlenz@114: elif axis is SELF:
cmlenz@111: test = _node_test_current_element()
cmlenz@106: else:
cmlenz@111: if self.cur_token == '*':
cmlenz@106: test = _node_test_any_child_element()
cmlenz@106: else:
cmlenz@106: test = _node_test_child_element_by_name(self.cur_token)
cmlenz@106:
cmlenz@106: if not self.at_end:
cmlenz@106: self.next_token()
cmlenz@106: return test
cmlenz@106:
cmlenz@106: def _node_type(self):
cmlenz@106: name = self.cur_token
cmlenz@106: self.next_token()
cmlenz@106: if name == 'comment':
cmlenz@106: return _function_comment()
cmlenz@106: elif name == 'node':
cmlenz@106: return _function_node()
cmlenz@106: elif name == 'processing-instruction':
cmlenz@106: args = []
cmlenz@106: if self.cur_token != '()':
cmlenz@106: # The processing-instruction() function optionally accepts the
cmlenz@106: # name of the PI as argument, which must be a literal string
cmlenz@106: self.next_token() # (
cmlenz@106: if self.cur_token != ')':
cmlenz@106: string = self.cur_token
cmlenz@106: if (string[0], string[-1]) in self._QUOTES:
cmlenz@106: string = string[1:-1]
cmlenz@106: args.append(string)
cmlenz@106: return _function_processing_instruction(*args)
cmlenz@106: elif name == 'text':
cmlenz@106: return _function_text()
cmlenz@106: else:
cmlenz@106: raise PathSyntaxError('%s() not allowed here' % name)
cmlenz@106:
cmlenz@106: def _predicate(self):
cmlenz@106: assert self.cur_token == '['
cmlenz@106: self.next_token()
cmlenz@111: expr = self._or_expr()
cmlenz@121: if self.cur_token != ']':
cmlenz@121: raise PathSyntaxError('Expected "]" to close predicate, '
cmlenz@121: 'but found "%s"' % self.cur_token)
cmlenz@111: if not self.at_end:
cmlenz@111: self.next_token()
cmlenz@111: return expr
cmlenz@106:
cmlenz@106: def _or_expr(self):
cmlenz@106: expr = self._and_expr()
cmlenz@106: while self.cur_token == 'or':
cmlenz@106: self.next_token()
cmlenz@106: expr = _operator_or(expr, self._and_expr())
cmlenz@106: return expr
cmlenz@106:
cmlenz@106: def _and_expr(self):
cmlenz@106: expr = self._equality_expr()
cmlenz@106: while self.cur_token == 'and':
cmlenz@106: self.next_token()
cmlenz@106: expr = _operator_and(expr, self._equality_expr())
cmlenz@106: return expr
cmlenz@106:
cmlenz@106: def _equality_expr(self):
cmlenz@106: expr = self._primary_expr()
cmlenz@106: while self.cur_token in ('=', '!='):
cmlenz@106: op = {'=': _operator_eq, '!=': _operator_neq}[self.cur_token]
cmlenz@106: self.next_token()
cmlenz@106: expr = op(expr, self._primary_expr())
cmlenz@106: return expr
cmlenz@106:
cmlenz@106: def _primary_expr(self):
cmlenz@106: token = self.cur_token
cmlenz@106: if len(token) > 1 and (token[0], token[-1]) in self._QUOTES:
cmlenz@106: self.next_token()
cmlenz@106: return _literal_string(token[1:-1])
cmlenz@106: elif token[0].isdigit():
cmlenz@106: self.next_token()
cmlenz@106: return _literal_number(float(token))
cmlenz@121: elif not self.at_end and self.peek_token().startswith('('):
cmlenz@121: if self.next_token() == '()':
cmlenz@121: args = []
cmlenz@121: else:
cmlenz@121: self.next_token()
cmlenz@121: args = [self._or_expr()]
cmlenz@121: while self.cur_token not in (',', ')'):
cmlenz@121: args.append(self._or_expr())
cmlenz@121: self.next_token()
cmlenz@121: if token == 'local-name':
cmlenz@121: return _function_local_name(*args)
cmlenz@121: elif token == 'name':
cmlenz@121: return _function_name(*args)
cmlenz@121: elif token == 'namespace-uri':
cmlenz@121: return _function_namespace_uri(*args)
cmlenz@121: elif token == 'not':
cmlenz@121: return _function_not(*args)
cmlenz@121: else:
cmlenz@121: raise PathSyntaxError('Unsupported function "%s"' % token)
cmlenz@106: else:
cmlenz@106: axis = None
cmlenz@106: if token == '@':
cmlenz@114: axis = ATTRIBUTE
cmlenz@106: self.next_token()
cmlenz@106: return self._node_test(axis)