# -*- coding: utf-8 -*-
#
# Copyright (C) 2006 Edgewall Software
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://markup.edgewall.org/wiki/License.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at http://markup.edgewall.org/log/.

"""Basic support for evaluating XPath expressions against streams.

>>> from markup.input import XML
>>> doc = XML('''<doc>
...  <items count="2">
...       <item status="new">
...         <summary>Foo</summary>
...       </item>
...       <item status="closed">
...         <summary>Bar</summary>
...       </item>
...   </items>
... </doc>''')
>>> print doc.select('items/item[@status="closed"]/summary/text()')
Bar

Because the XPath engine operates on markup streams (as opposed to tree
structures), it only implements a subset of the full XPath 1.0 language.
"""

import re

from markup.core import QName, Stream, START, END, TEXT, COMMENT, PI

__all__ = ['Path', 'PathSyntaxError']


class Path(object):
    """Implements basic XPath support on streams.

    Instances of this class represent a "compiled" XPath expression, and
    provide methods for testing the path against a stream, as well as
    extracting a substream matching that path.
    """

    def __init__(self, text):
        """Create the path object from a string.

        @param text: the path expression
        @raise PathSyntaxError: if the parser rejects the expression
        """
        self.source = text
        # One list of location steps per operand of a union (`|`) expression
        self.paths = _PathParser(text).parse()

    def __repr__(self):
        return '<%s "%s">' % (self.__class__.__name__, self.source)

    def select(self, stream):
        """Returns a substream of the given stream that matches the path.

        If there are no matches, this method returns an empty stream.

        >>> from markup.input import XML
        >>> xml = XML('<root><child>Text</child></root>')

        >>> print Path('child').select(xml)
        <child>Text</child>

        >>> print Path('child/text()').select(xml)
        Text

        @param stream: the stream to select from
        @return: the substream matching the path, or an empty stream
        """
        stream = iter(stream)
        def _generate():
            test = self.test()
            for kind, data, pos in stream:
                result = test(kind, data, pos)
                if result is True:
                    # An element matched: emit it together with everything up
                    # to and including its matching END event
                    yield kind, data, pos
                    depth = 1
                    while depth > 0:
                        subkind, subdata, subpos = stream.next()
                        if subkind is START:
                            depth += 1
                        elif subkind is END:
                            depth -= 1
                        yield subkind, subdata, subpos
                        # Keep feeding the events to the test function so that
                        # its element stack stays in sync with the stream
                        test(subkind, subdata, subpos)
                elif result:
                    # A non-element match; the test returned the value to emit
                    yield result
        return Stream(_generate())

    def test(self, ignore_context=False):
        """Returns a function that can be used to track whether the path matches
        a specific stream event.

        The function returned expects the positional arguments `kind`, `data`,
        and `pos`, i.e. basically an unpacked stream event. If the path matches
        the event, the function returns the match (for example, a `START` or
        `TEXT` event.) Otherwise, it returns `None`.

        >>> from markup.input import XML
        >>> xml = XML('<root><child id="1"/><child id="2"/></root>')
        >>> test = Path('child').test()
        >>> for kind, data, pos in xml:
        ...     if test(kind, data, pos):
        ...         print kind, data
        START (u'child', [(u'id', u'1')])
        START (u'child', [(u'id', u'2')])

        @param ignore_context: if true, a match of the last location step on
            a `START` event is reported regardless of the element depth
        @return: the stateful test function
        """
        # Per path: (index, steps, number of steps, cursor stack). The stack
        # holds one step cursor per open element, plus the initial entry.
        paths = [(idx, steps, len(steps), [0])
                 for idx, steps in enumerate(self.paths)]

        def _test(kind, data, pos):
            for idx, steps, size, stack in paths:
                if not stack:
                    continue
                cursor = stack[-1]

                if kind is END:
                    stack.pop()
                    continue

                elif kind is START:
                    stack.append(cursor)

                matched = None
                while 1:
                    axis, node_test, predicates = steps[cursor]

                    matched = node_test(kind, data, pos)
                    if matched and predicates:
                        for predicate in predicates:
                            if not predicate(kind, data, pos):
                                matched = None
                                break

                    if matched:
                        if cursor + 1 == size: # the last location step
                            if ignore_context or \
                                    kind is not START or \
                                    axis in ('attribute', 'self') or \
                                    len(stack) > 2:
                                return matched
                        else:
                            cursor += 1
                            stack[-1] = cursor

                    # Only a *matched* `self` step allows the next step to be
                    # tested against the same event. Without the `not matched`
                    # check, a `self` step that fails to match (for example on
                    # a TEXT event) would be retested forever, because the
                    # cursor does not advance.
                    if not matched or axis != 'self':
                        break

                if not matched and kind is START \
                        and not axis.startswith('descendant'):
                    # If this step is not a closure, it cannot be matched until
                    # the current element is closed... so we need to move the
                    # cursor back to the last closure and retest that against
                    # the current element
                    backsteps = [step for step in steps[:cursor]
                                 if step[0].startswith('descendant')]
                    backsteps.reverse()
                    for axis, node_test, predicates in backsteps:
                        matched = node_test(kind, data, pos)
                        if not matched:
                            cursor -= 1
                            break
                    stack[-1] = cursor

            return None

        return _test


# Factories for node tests, functions, literals and operators.
#
# Each factory returns a function that accepts an unpacked stream event
# `(kind, data, pos)` and returns a true value if the event matches.


def _node_test_current_element():
    """Create a test for the `self` axis that matches any `START` event."""
    def _node_test_current_element(kind, *_):
        return kind is START
    _node_test_current_element.axis = 'self'
    return _node_test_current_element


def _node_test_any_child_element():
    """Create the `*` name test, which matches any `START` event."""
    def _node_test_any_child_element(kind, *_):
        return kind is START
    _node_test_any_child_element.axis = 'child'
    return _node_test_any_child_element


def _node_test_child_element_by_name(name):
    """Create a test matching `START` events by local element name."""
    def _node_test_child_element_by_name(kind, data, _):
        return kind is START and data[0].localname == name
    _node_test_child_element_by_name.axis = 'child'
    return _node_test_child_element_by_name


def _node_test_any_attribute():
    """Create the `@*` test.

    For a `START` event with a non-empty `data[1]`, the test returns
    `data[1]` itself; otherwise it returns `None`.
    """
    def _node_test_any_attribute(kind, data, _):
        if kind is START and data[1]:
            return data[1]
    _node_test_any_attribute.axis = 'attribute'
    return _node_test_any_attribute


def _node_test_attribute_by_name(name):
    """Create the `@name` test.

    On a match, the test returns the attribute value wrapped in a `TEXT`
    event; otherwise it returns `None`.
    """
    def _node_test_attribute_by_name(kind, data, pos):
        if kind is START and name in data[1]:
            return TEXT, data[1].get(name), pos
    _node_test_attribute_by_name.axis = 'attribute'
    return _node_test_attribute_by_name


def _function_comment():
    """Create the `comment()` node type test."""
    def _function_comment(kind, data, pos):
        return kind is COMMENT and (kind, data, pos)
    _function_comment.axis = None
    return _function_comment


def _function_node():
    """Create the `node()` node type test.

    The test returns `True` for `START` events, and the event itself for
    every other kind of event.
    """
    def _function_node(kind, data, pos):
        if kind is START:
            return True
        return kind, data, pos
    _function_node.axis = None
    return _function_node


def _function_processing_instruction(name=None):
    """Create the `processing-instruction()` node type test.

    @param name: if given, only `PI` events whose `data[0]` equals this name
        are matched
    """
    def _function_processing_instruction(kind, data, pos):
        if kind is PI and (not name or data[0] == name):
            return (kind, data, pos)
    _function_processing_instruction.axis = None
    return _function_processing_instruction


def _function_text():
    """Create the `text()` node type test."""
    def _function_text(kind, data, pos):
        return kind is TEXT and (kind, data, pos)
    _function_text.axis = None
    return _function_text


def _literal_string(text):
    """Create a function that evaluates to the given string as `TEXT` event."""
    def _literal_string(*_):
        return TEXT, text, (None, -1, -1)
    _literal_string.axis = None
    return _literal_string


def _literal_number(number):
    """Create a function that evaluates to the given number as `TEXT` event.

    The equality operators compare the event data as-is, so no conversion
    between numbers and strings takes place.
    """
    def _literal_number(*_):
        return TEXT, number, (None, -1, -1)
    _literal_number.axis = None
    return _literal_number


def _operator_eq(lval, rval):
    """Create the `=` operator, which compares the data of both operands."""
    def _operator_eq(kind, data, pos):
        lv = lval(kind, data, pos)
        rv = rval(kind, data, pos)
        return (lv and lv[1]) == (rv and rv[1])
    _operator_eq.axis = None
    return _operator_eq


def _operator_neq(lval, rval):
    """Create the `!=` operator, which compares the data of both operands."""
    def _operator_neq(kind, data, pos):
        lv = lval(kind, data, pos)
        rv = rval(kind, data, pos)
        return (lv and lv[1]) != (rv and rv[1])
    _operator_neq.axis = None
    return _operator_neq


def _operator_and(lval, rval):
    """Create the `and` operator; the right operand is evaluated lazily."""
    def _operator_and(kind, data, pos):
        lv = lval(kind, data, pos)
        if not lv or (lv is not True and not lv[1]):
            return False
        rv = rval(kind, data, pos)
        if not rv or (rv is not True and not rv[1]):
            return False
        return True
    _operator_and.axis = None
    return _operator_and


def _operator_or(lval, rval):
    """Create the `or` operator; the right operand is evaluated lazily."""
    def _operator_or(kind, data, pos):
        lv = lval(kind, data, pos)
        if lv and (lv is True or lv[1]):
            return True
        rv = rval(kind, data, pos)
        if rv and (rv is True or rv[1]):
            return True
        return False
    _operator_or.axis = None
    return _operator_or


class PathSyntaxError(Exception):
    """Exception raised when an XPath expression is syntactically incorrect."""

    def __init__(self, message, filename=None, lineno=-1, offset=-1):
        if filename:
            message = '%s (%s, line %d)' % (message, filename, lineno)
        Exception.__init__(self, message)
        self.filename = filename
        self.lineno = lineno
        self.offset = offset


class _PathParser(object):
    """Tokenizes and parses an XPath expression."""

    _QUOTES = (("'", "'"), ('"', '"'))
    _TOKENS = ('::', ':', '..', '.', '//', '/', '[', ']', '()', '(', ')', '@',
               '=', '!=', '!', '|')
    # Raw string so that `\s` reaches the regex engine untouched instead of
    # relying on Python passing unknown string escapes through
    _tokenize = re.compile(r'(%s)|([^%s\s]+)|\s+' % (
                           '|'.join([re.escape(t) for t in _TOKENS]),
                           ''.join([re.escape(t[0]) for t in _TOKENS]))).findall

    def __init__(self, text):
        # Whitespace matches produce two empty groups and are dropped here
        self.tokens = [a or b for a, b in self._tokenize(text) if a or b]
        self.pos = 0

    # Tokenizer

    # Note: `at_end` is true when the cursor is *on* the last token, not
    # when it has moved past it
    at_end = property(lambda self: self.pos == len(self.tokens) - 1)
    cur_token = property(lambda self: self.tokens[self.pos])

    def next_token(self):
        """Advance to the next token and return it."""
        self.pos += 1
        return self.tokens[self.pos]

    def peek_token(self):
        """Return the token after the current one, or `None` at the end."""
        if not self.at_end:
            return self.tokens[self.pos + 1]
        return None

    # Recursive descent parser

    def parse(self):
        """Parses the XPath expression and returns a list of location path
        tests.

        For union expressions (such as `*|text()`), this function returns one
        test for each operand in the union. For path expressions that don't
        use the union operator, the function always returns a list of size 1.

        Each path test in turn is a sequence of tests that correspond to the
        location steps, each a tuple of the form `(axis, testfunc, predicates)`
        """
        paths = [self._location_path()]
        while self.cur_token == '|':
            self.next_token()
            paths.append(self._location_path())
        if not self.at_end:
            raise PathSyntaxError('Unexpected token %r after end of expression'
                                  % self.cur_token)
        return paths

    def _location_path(self):
        """Parse a sequence of location steps separated by `/` or `//`.

        A `child` step that comes first in the path, or that follows a `//`
        separator, is turned into a `descendant-or-self` step.
        """
        next_is_closure = True
        steps = []
        while True:
            if self.cur_token == '//':
                next_is_closure = True
                self.next_token()
            elif self.cur_token == '/' and not steps:
                raise PathSyntaxError('Absolute location paths not supported')

            axis, node_test, predicates = self._location_step()
            if axis == 'child' and next_is_closure:
                axis = 'descendant-or-self'
            steps.append((axis, node_test, predicates))

            if self.at_end or not self.cur_token.startswith('/'):
                break
            # Remember whether the separator was `//` *before* consuming it;
            # otherwise a `//` in the middle of a path would be treated the
            # same as a plain `/`
            next_is_closure = self.cur_token == '//'
            self.next_token()

        return steps

    def _location_step(self):
        """Parse a single location step.

        @return: an `(axis, node_test, predicates)` tuple
        """
        if self.cur_token == '@':
            axis = 'attribute'
            self.next_token()
        elif self.cur_token == '.':
            axis = 'self'
        elif self.peek_token() == '::':
            axis = self.cur_token
            if axis not in ('attribute', 'child', 'descendant',
                            'descendant-or-self', 'namespace', 'self'):
                raise PathSyntaxError('Unsupported axis "%s"' % axis)
            self.next_token() # the `::` separator
            self.next_token() # the first token of the node test
        else:
            axis = 'child'
        node_test = self._node_test(axis)
        predicates = []
        while self.cur_token == '[':
            predicates.append(self._predicate())
        return axis, node_test, predicates

    def _node_test(self, axis=None):
        """Parse a node type test (such as `text()`) or a name test."""
        test = None
        if self.peek_token() in ('(', '()'): # Node type test
            test = self._node_type()

        else: # Name test
            if axis == 'attribute':
                if self.cur_token == '*':
                    test = _node_test_any_attribute()
                else:
                    test = _node_test_attribute_by_name(self.cur_token)
            elif axis == 'self':
                test = _node_test_current_element()
            else:
                if self.cur_token == '*':
                    test = _node_test_any_child_element()
                else:
                    test = _node_test_child_element_by_name(self.cur_token)

        if not self.at_end:
            self.next_token()
        return test

    def _node_type(self):
        """Parse a node type test and return the corresponding function."""
        name = self.cur_token
        self.next_token()
        if name == 'comment':
            return _function_comment()
        elif name == 'node':
            return _function_node()
        elif name == 'processing-instruction':
            args = []
            if self.cur_token != '()':
                # The processing-instruction() function optionally accepts the
                # name of the PI as argument, which must be a literal string
                self.next_token() # (
                if self.cur_token != ')':
                    string = self.cur_token
                    if (string[0], string[-1]) in self._QUOTES:
                        string = string[1:-1]
                    args.append(string)
            return _function_processing_instruction(*args)
        elif name == 'text':
            return _function_text()
        else:
            raise PathSyntaxError('%s() not allowed here' % name)

    def _predicate(self):
        """Parse a predicate (`[...]`) and return its test function."""
        assert self.cur_token == '['
        self.next_token()
        expr = self._or_expr()
        # Malformed user input must be reported even when assertions are
        # disabled (`python -O`), so this is not an `assert`
        if self.cur_token != ']':
            raise PathSyntaxError('Expected "]" to close predicate, but found '
                                  '"%s"' % self.cur_token)
        if not self.at_end:
            self.next_token()
        return expr

    def _or_expr(self):
        expr = self._and_expr()
        while self.cur_token == 'or':
            self.next_token()
            expr = _operator_or(expr, self._and_expr())
        return expr

    def _and_expr(self):
        expr = self._equality_expr()
        while self.cur_token == 'and':
            self.next_token()
            expr = _operator_and(expr, self._equality_expr())
        return expr

    def _equality_expr(self):
        expr = self._primary_expr()
        while self.cur_token in ('=', '!='):
            op = {'=': _operator_eq, '!=': _operator_neq}[self.cur_token]
            self.next_token()
            expr = op(expr, self._primary_expr())
        return expr

    def _primary_expr(self):
        """Parse a string literal, a number literal, or a node test."""
        token = self.cur_token
        if len(token) > 1 and (token[0], token[-1]) in self._QUOTES:
            self.next_token()
            return _literal_string(token[1:-1])
        elif token[0].isdigit():
            try:
                number = float(token)
            except ValueError:
                raise PathSyntaxError('Invalid number literal "%s"' % token)
            self.next_token()
            return _literal_number(number)
        else:
            axis = None
            if token == '@':
                axis = 'attribute'
                self.next_token()
            return self._node_test(axis)