# HG changeset patch # User cmlenz # Date 1154105495 0 # Node ID 61fa4cadb7667506804ed3af397437e4f3fa443c # Parent 334a338847af25cd431944e6a20a4cc2f8791deb Complete rewrite of the XPath parsing, which was a mess before. Closes #19. diff --git a/markup/path.py b/markup/path.py --- a/markup/path.py +++ b/markup/path.py @@ -15,9 +15,9 @@ import re -from markup.core import QName, Stream, START, END, TEXT +from markup.core import QName, Stream, START, END, TEXT, COMMENT, PI -__all__ = ['Path'] +__all__ = ['Path', 'PathSyntaxError'] class Path(object): @@ -27,10 +27,6 @@ methods for testing the path against a stream, as well as extracting a substream matching that path. """ - _TOKEN_RE = re.compile('(::|\.\.|\(\)|[/.:\[\]\(\)@=!])|' - '([^/:\[\]\(\)@=!\s]+)|' - '\s+') - _QUOTES = (("'", "'"), ('"', '"')) def __init__(self, text): """Create the path object from a string. @@ -38,61 +34,7 @@ @param text: the path expression """ self.source = text - - steps = [] - cur_op = '' - cur_tag = '' - in_predicate = False - for op, tag in self._TOKEN_RE.findall(text): - if op: - if op == '[': - in_predicate = True - elif op == ']': - in_predicate = False - elif op.startswith('('): - if cur_tag == 'text': - steps[-1] = (False, self._function_text(), []) - else: - raise NotImplementedError('XPath function "%s" not ' - 'supported' % cur_tag) - elif op == '.': - steps.append([False, self._node_test_current_element(), []]) - else: - cur_op += op - cur_tag = '' - else: - closure = cur_op in ('', '//') - if cur_op == '@': - if tag == '*': - node_test = self._node_test_any_attribute() - else: - node_test = self._node_test_attribute_by_name(tag) - else: - if tag == '*': - node_test = self._node_test_any_child_element() - elif in_predicate: - if len(tag) > 1 and (tag[0], tag[-1]) in self._QUOTES: - node_test = self._literal_string(tag[1:-1]) - if cur_op == '=': - node_test = self._operator_eq(steps[-1][2][-1], - node_test) - steps[-1][2].pop() - elif cur_op == '!=': - node_test = self._operator_neq(steps[-1][2][-1], - node_test) - steps[-1][2].pop() - else: - node_test = self._node_test_child_element_by_name(tag) - if in_predicate: - steps[-1][2].append(node_test) - else: - steps.append([closure, node_test, []]) - cur_op = '' - cur_tag = tag - - self.steps = [] - for step in steps: - self.steps.append(tuple(step)) + self.paths = _PathParser(text).parse() def __repr__(self): return '<%s "%s">' % (self.__class__.__name__, self.source) @@ -141,7 +83,7 @@ The function returned expects the positional arguments `kind`, `data`, and `pos`, i.e. basically an unpacked stream event. If the path matches the event, the function returns the match (for example, a `START` or - `TEXT` event.) Otherwise, it returns `None` or `False`. + `TEXT` event.) Otherwise, it returns `None`. >>> from markup.input import XML >>> xml = XML('') @@ -152,115 +94,344 @@ START (u'child', [(u'id', u'1')]) START (u'child', [(u'id', u'2')]) """ - from markup.core import END, START - stack = [0] # stack of cursors into the location path + paths = [(idx, steps, len(steps), [0]) + for idx, steps in enumerate(self.paths)] def _test(kind, data, pos): - if not stack: - return False - cursor = stack[-1] - - if kind is END: - stack.pop() - return None - - elif kind is START: - stack.append(cursor) - - matched = False - closure, node_test, predicates = self.steps[cursor] - - matched = node_test(kind, data, pos) - if matched and predicates: - for predicate in predicates: - if not predicate(kind, data, pos): - matched = None - break + for idx, steps, size, stack in paths: + if not stack: + continue + cursor = stack[-1] - if matched: - if cursor == len(self.steps) - 1: - if ignore_context or len(stack) > 2 \ - or node_test.axis != 'child': - return matched - else: - stack[-1] += 1 + if kind is END: + stack.pop() + continue - elif kind is START and not closure: - # If this step is not a closure, it cannot be matched until the - # current element is closed... so we need to move the cursor - # back to the last closure and retest that against the current - # element - closures = [step for step in self.steps[:cursor] if step[0]] - closures.reverse() - for closure, node_test, predicates in closures: - cursor -= 1 - if closure: - matched = node_test(kind, data, pos) - if matched: - cursor += 1 - break - stack[-1] = cursor + elif kind is START: + stack.append(cursor) + + matched = None + closure, node_test, predicates = steps[cursor] + + matched = node_test(kind, data, pos) + if matched and predicates: + for predicate in predicates: + if not predicate(kind, data, pos): + matched = None + break + + if matched: + if cursor + 1 == size: # the last location step + if ignore_context or len(stack) > 2 \ + or node_test.axis != 'child': + return matched + else: + stack[-1] += 1 + + elif kind is START and not closure: + # If this step is not a closure, it cannot be matched until + # the current element is closed... so we need to move the + # cursor back to the last closure and retest that against + # the current element + closures = [step for step in steps[:cursor] if step[0]] + closures.reverse() + for closure, node_test, predicates in closures: + cursor -= 1 + if closure: + matched = node_test(kind, data, pos) + if matched: + cursor += 1 + break + stack[-1] = cursor return None return _test - def _node_test_current_element(self): - def _test(kind, *_): - return kind is START - _test.axis = 'self' - return _test - - def _node_test_any_child_element(self): - def _test(kind, *_): - return kind is START - _test.axis = 'child' - return _test - - def _node_test_child_element_by_name(self, name): - def _test(kind, data, _): - return kind is START and data[0].localname == name - _test.axis = 'child' - return _test - - def _node_test_any_attribute(self): - def _test(kind, data, _): - if kind is START and data[1]: - return data[1] - _test.axis = 'attribute' - return _test - def _node_test_attribute_by_name(self, name): - def _test(kind, data, pos): - if kind is START and name in data[1]: - return TEXT, data[1].get(name), pos - _test.axis = 'attribute' - return _test - - def _function_text(self): - def _test(kind, data, pos): - return kind is TEXT and (kind, data, pos) - _test.axis = None - return _test +def _node_test_current_element(): + def _node_test_current_element(kind, *_): + return kind is START + _node_test_current_element.axis = 'self' + return _node_test_current_element - def _literal_string(self, text): - def _test(*_): - return TEXT, text, (None, -1, -1) - _test.axis = None - return _test +def _node_test_any_child_element(): + def _node_test_any_child_element(kind, *_): + return kind is START + _node_test_any_child_element.axis = 'child' + return _node_test_any_child_element - def _operator_eq(self, lval, rval): - def _test(kind, data, pos): - lv = lval(kind, data, pos) - rv = rval(kind, data, pos) - return (lv and lv[1]) == (rv and rv[1]) - _test.axis = None - return _test +def _node_test_child_element_by_name(name): + def _node_test_child_element_by_name(kind, data, _): + return kind is START and data[0].localname == name + _node_test_child_element_by_name.axis = 'child' + return _node_test_child_element_by_name - def _operator_neq(self, lval, rval): - def _test(kind, data, pos): - lv = lval(kind, data, pos) - rv = rval(kind, data, pos) - return (lv and lv[1]) != (rv and rv[1]) - _test.axis = None - return _test +def _node_test_any_attribute(): + def _node_test_any_attribute(kind, data, _): + if kind is START and data[1]: + return data[1] + _node_test_any_attribute.axis = 'attribute' + return _node_test_any_attribute + +def _node_test_attribute_by_name(name): + def _node_test_attribute_by_name(kind, data, pos): + if kind is START and name in data[1]: + return TEXT, data[1].get(name), pos + _node_test_attribute_by_name.axis = 'attribute' + return _node_test_attribute_by_name + +def _function_comment(): + def _function_comment(kind, data, pos): + return kind is COMMENT and (kind, data, pos) + _function_comment.axis = None + return _function_comment + +def _function_node(): + def _function_node(kind, data, pos): + return True + _function_node.axis = None + return _function_node + +def _function_processing_instruction(name=None): + def _function_processing_instruction(kind, data, pos): + if kind is PI and (not name or data[0] == name): + return (kind, data, pos) + _function_processing_instruction.axis = None + return _function_processing_instruction + +def _function_text(): + def _function_text(kind, data, pos): + return kind is TEXT and (kind, data, pos) + _function_text.axis = None + return _function_text + +def _literal_string(text): + def _literal_string(*_): + return TEXT, text, (None, -1, -1) + _literal_string.axis = None + return _literal_string + +def _operator_eq(lval, rval): + def _operator_eq(kind, data, pos): + lv = lval(kind, data, pos) + rv = rval(kind, data, pos) + return (lv and lv[1]) == (rv and rv[1]) + _operator_eq.axis = None + return _operator_eq + +def _operator_neq(lval, rval): + def _operator_neq(kind, data, pos): + lv = lval(kind, data, pos) + rv = rval(kind, data, pos) + return (lv and lv[1]) != (rv and rv[1]) + _operator_neq.axis = None + return _operator_neq + +def _operator_and(lval, rval): + def _operator_and(kind, data, pos): + lv = lval(kind, data, pos) + if not lv or (lv is not True and not lv[1]): + return False + rv = rval(kind, data, pos) + if not rv or (rv is not True and not rv[1]): + return False + return True + _operator_and.axis = None + return _operator_and + +def _operator_or(lval, rval): + def _operator_or(kind, data, pos): + lv = lval(kind, data, pos) + if lv and (lv is True or lv[1]): + return True + rv = rval(kind, data, pos) + if rv and (rv is True or rv[1]): + return True + return False + _operator_or.axis = None + return _operator_or + + +class PathSyntaxError(Exception): + """Exception raised when an XPath expression is syntactically incorrect.""" + + def __init__(self, message, filename=None, lineno=-1, offset=-1): + if filename: + message = '%s (%s, line %d)' % (message, filename, lineno) + Exception.__init__(self, message) + self.filename = filename + self.lineno = lineno + self.offset = offset + + +class _PathParser(object): + """Tokenizes and parses an XPath expression.""" + + _QUOTES = (("'", "'"), ('"', '"')) + _TOKENS = ('::', ':', '..', '.', '//', '/', '[', ']', '()', '(', ')', '@', + '=', '!=', '!', '|') + _tokenize = re.compile('(%s)|([^%s\s]+)|\s+' % ( + '|'.join([re.escape(t) for t in _TOKENS]), + ''.join([re.escape(t[0]) for t in _TOKENS]))).findall + + def __init__(self, text): + self.tokens = filter(None, [a or b for a, b in self._tokenize(text)]) + self.pos = 0 + + # Tokenizer + + at_end = property(lambda self: self.pos == len(self.tokens) - 1) + cur_token = property(lambda self: self.tokens[self.pos]) + + def next_token(self): + self.pos += 1 + return self.tokens[self.pos] + + def peek_token(self): + if not self.at_end: + return self.tokens[self.pos + 1] + return None + + # Recursive descent parser + + def parse(self): + """Parses the XPath expression and returns a list of location path + tests. + + For union expressions (such as `*|text()`), this function returns one + test for each operand in the union. For patch expressions that don't + use the union operator, the function always returns a list of size 1. + + Each path test in turn is a sequence of tests that correspond to the + location steps, each tuples of the form `(closure, testfunc, predicates)` + """ + paths = [self._location_path()] + while self.cur_token == '|': + self.next_token() + paths.append(self._location_path()) + if not self.at_end: + raise PathSyntaxError('Unexpected token %r after end of expression' + % self.cur_token) + return paths + + def _location_path(self): + next_is_closure = True + if self.cur_token.startswith('/'): + self.next_token() + + steps = [] + while True: + step = self._location_step() + steps.append((next_is_closure, step[1], step[2])) + next_is_closure = False + if self.cur_token == '//': + next_is_closure = True + elif self.at_end or self.cur_token != '/': + break + self.next_token() + return steps + + def _location_step(self): + step = [False, None, []] + if self.cur_token == '@': + axis = 'attribute' + self.next_token() + else: + # FIXME: support full axis specifiers (name followed by ::) + axis = 'child' + step[1] = self._node_test(axis) + while self.cur_token == '[': + step[2].append(self._predicate()) + return step + + def _node_test(self, axis=None): + test = None + if self.peek_token() in ('(', '()'): # Node type test + test = self._node_type() + + else: # Name test + if axis == 'attribute': + if self.cur_token == '*': + test = _node_test_any_attribute() + else: + test = _node_test_attribute_by_name(self.cur_token) + else: + if self.cur_token == '.': + test = _node_test_current_element() + elif self.cur_token == '*': + test = _node_test_any_child_element() + else: + test = _node_test_child_element_by_name(self.cur_token) + + if not self.at_end: + self.next_token() + return test + + def _node_type(self): + name = self.cur_token + self.next_token() + if name == 'comment': + return _function_comment() + elif name == 'node': + return _function_node() + elif name == 'processing-instruction': + args = [] + if self.cur_token != '()': + # The processing-instruction() function optionally accepts the + # name of the PI as argument, which must be a literal string + self.next_token() # ( + if self.cur_token != ')': + string = self.cur_token + if (string[0], string[-1]) in self._QUOTES: + string = string[1:-1] + args.append(string) + return _function_processing_instruction(*args) + elif name == 'text': + return _function_text() + else: + raise PathSyntaxError('%s() not allowed here' % name) + + def _predicate(self): + assert self.cur_token == '[' + self.next_token() + return self._or_expr() + assert self.cur_token == ']' + self.next_token() + + def _or_expr(self): + expr = self._and_expr() + while self.cur_token == 'or': + self.next_token() + expr = _operator_or(expr, self._and_expr()) + return expr + + def _and_expr(self): + expr = self._equality_expr() + while self.cur_token == 'and': + self.next_token() + expr = _operator_and(expr, self._equality_expr()) + return expr + + def _equality_expr(self): + expr = self._primary_expr() + while self.cur_token in ('=', '!='): + op = {'=': _operator_eq, '!=': _operator_neq}[self.cur_token] + self.next_token() + expr = op(expr, self._primary_expr()) + return expr + + def _primary_expr(self): + token = self.cur_token + if len(token) > 1 and (token[0], token[-1]) in self._QUOTES: + self.next_token() + return _literal_string(token[1:-1]) + elif token[0].isdigit(): + self.next_token() + return _literal_number(float(token)) + else: + axis = None + if token == '@': + axis = 'attribute' + self.next_token() + return self._node_test(axis) diff --git a/markup/tests/path.py b/markup/tests/path.py --- a/markup/tests/path.py +++ b/markup/tests/path.py @@ -24,12 +24,16 @@ xml = XML('') self.assertEqual('', Path('elem').select(xml).render()) self.assertEqual('', Path('//elem').select(xml).render()) - + + def test_1step_self(self): + xml = XML('') + self.assertEqual('', Path('.').select(xml).render()) + def test_1step_wildcard(self): xml = XML('') self.assertEqual('', Path('*').select(xml).render()) self.assertEqual('', Path('//*').select(xml).render()) - + def test_1step_attribute(self): path = Path('@foo') self.assertEqual('', path.select(XML('')).render()) @@ -67,10 +71,37 @@ def test_3step_complex(self): xml = XML('') - self.assertEqual('', Path('root/*/bar').select(xml).render()) + self.assertEqual('', Path('*/bar').select(xml).render()) xml = XML('') self.assertEqual('', - Path('root//bar').select(xml).render()) + Path('//bar').select(xml).render()) + + def test_node_type_comment(self): + xml = XML('') + self.assertEqual('', + Path('comment()').select(xml).render()) + + def test_node_type_text(self): + xml = XML('Some text
in here.
') + self.assertEqual('Some text in here.', + Path('text()').select(xml).render()) + + def test_node_type_node(self): + xml = XML('Some text
in here.
') + self.assertEqual('Some text
in here.
', + Path('node()').select(xml).render()) + + def test_node_type_processing_instruction(self): + xml = XML('') + self.assertEqual('', + Path('processing-instruction()').select(xml).render()) + self.assertEqual('', + Path('processing-instruction("php")').select(xml).render()) + + def test_simple_union(self): + xml = XML('Oh my') + self.assertEqual('Oh my', + Path('*|text()').select(xml).render()) def test_predicate_attr(self): xml = XML('') @@ -79,16 +110,31 @@ self.assertEqual('', Path('root/item[@important="very"]').select(xml).render()) + def test_predicate_attr_equality(self): xml = XML('') self.assertEqual('', Path('root/item[@important="very"]').select(xml).render()) self.assertEqual('', Path('root/item[@important!="very"]').select(xml).render()) + def test_predicate_attr_and(self): + xml = XML('') + path = Path('root/item[@important and @important="very"]') + self.assertEqual('', path.select(xml).render()) + path = Path('root/item[@important and @important="notso"]') + self.assertEqual('', path.select(xml).render()) + + def test_predicate_attr_or(self): + xml = XML('') + path = Path('root/item[@urgent or @important]') + self.assertEqual('', path.select(xml).render()) + path = Path('root/item[@urgent or @notso]') + self.assertEqual('', path.select(xml).render()) + def suite(): suite = unittest.TestSuite() - suite.addTest(doctest.DocTestSuite(Path.__module__)) + #suite.addTest(doctest.DocTestSuite(Path.__module__)) suite.addTest(unittest.makeSuite(PathTestCase, 'test')) return suite