diff markup/path.py @ 1:821114ec4f69

Initial import.
author cmlenz
date Sat, 03 Jun 2006 07:16:01 +0000
parents
children 00835401c8cc
line wrap: on
line diff
new file mode 100644
--- /dev/null
+++ b/markup/path.py
@@ -0,0 +1,308 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2006 Edgewall Software
+# All rights reserved.
+#
+# This software is licensed as described in the file COPYING, which
+# you should have received as part of this distribution. The terms
+# are also available at http://trac.edgewall.com/license.html.
+#
+# This software consists of voluntary contributions made by many
+# individuals. For the exact contribution history, see the revision
+# history and logs, available at http://projects.edgewall.com/trac/.
+
+"""Basic support for evaluating XPath expressions against streams."""
+
+import re
+
+from markup.core import QName, Stream
+
+__all__ = ['Path']
+
+_QUOTES = (("'", "'"), ('"', '"'))
+
+class Path(object):
+    """Basic XPath support on markup event streams.
+    
+    >>> from markup.input import XML
+    
+    Selecting specific tags:
+    
+    >>> Path('root').select(XML('<root/>')).render()
+    '<root/>'
+    >>> Path('//root').select(XML('<root/>')).render()
+    '<root/>'
+    
+    Using wildcards for tag names:
+    
+    >>> Path('*').select(XML('<root/>')).render()
+    '<root/>'
+    >>> Path('//*').select(XML('<root/>')).render()
+    '<root/>'
+    
+    Selecting attribute values:
+    
+    >>> Path('@foo').select(XML('<root/>')).render()
+    ''
+    >>> Path('@foo').select(XML('<root foo="bar"/>')).render()
+    'bar'
+    
+    Selecting descendants:
+    
+    >>> Path("root/*").select(XML('<root><foo/><bar/></root>')).render()
+    '<foo/><bar/>'
+    >>> Path("root/bar").select(XML('<root><foo/><bar/></root>')).render()
+    '<bar/>'
+    >>> Path("root/baz").select(XML('<root><foo/><bar/></root>')).render()
+    ''
+    >>> Path("root/foo/*").select(XML('<root><foo><bar/></foo></root>')).render()
+    '<bar/>'
+    
+    Selecting text nodes:
+    >>> Path("item/text()").select(XML('<root><item>Foo</item></root>')).render()
+    'Foo'
+    >>> Path("item/text()").select(XML('<root><item>Foo</item><item>Bar</item></root>')).render()
+    'FooBar'
+    
+    Skipping ancestors:
+    
+    >>> Path("foo/bar").select(XML('<root><foo><bar/></foo></root>')).render()
+    '<bar/>'
+    >>> Path("foo/*").select(XML('<root><foo><bar/></foo></root>')).render()
+    '<bar/>'
+    >>> Path("root/bar").select(XML('<root><foo><bar/></foo></root>')).render()
+    ''
+    >>> Path("root/bar").select(XML('<root><foo><bar id="1"/></foo><bar id="2"/></root>')).render()
+    '<bar id="2"/>'
+    >>> Path("root/*/bar").select(XML('<root><foo><bar/></foo></root>')).render()
+    '<bar/>'
+    >>> Path("root//bar").select(XML('<root><foo><bar id="1"/></foo><bar id="2"/></root>')).render()
+    '<bar id="1"/><bar id="2"/>'
+    >>> Path("root//bar").select(XML('<root><foo><bar id="1"/></foo><bar id="2"/></root>')).render()
+    '<bar id="1"/><bar id="2"/>'
+    
+    Using simple attribute predicates:
+    >>> Path("root/item[@important]").select(XML('<root><item/><item important="very"/></root>')).render()
+    '<item important="very"/>'
+    >>> Path('root/item[@important="very"]').select(XML('<root><item/><item important="very"/></root>')).render()
+    '<item important="very"/>'
+    >>> Path("root/item[@important='very']").select(XML('<root><item/><item important="notso"/></root>')).render()
+    ''
+    >>> Path("root/item[@important!='very']").select(
+    ...     XML('<root><item/><item important="notso"/></root>')).render()
+    '<item/><item important="notso"/>'
+    """
+
+    _TOKEN_RE = re.compile('(::|\.\.|\(\)|[/.:\[\]\(\)@=!])|'
+                           '([^/:\[\]\(\)@=!\s]+)|'
+                           '\s+')
+
+    def __init__(self, text):
+        self.source = text
+
+        steps = []
+        cur_op = ''
+        cur_tag = ''
+        in_predicate = False
+        for op, tag in self._TOKEN_RE.findall(text):
+            if op:
+                if op == '[':
+                    in_predicate = True
+                elif op == ']':
+                    in_predicate = False
+                elif op.startswith('('):
+                    if cur_tag == 'text':
+                        steps[-1] = (False, self.fn_text(), [])
+                    else:
+                        raise NotImplementedError('XPath function "%s" not '
+                                                  'supported' % cur_tag)
+                else:
+                    cur_op += op
+                cur_tag = ''
+            else:
+                closure = cur_op in ('', '//')
+                if cur_op == '@':
+                    if tag == '*':
+                        node_test = self.any_attribute()
+                    else:
+                        node_test = self.attribute_by_name(tag)
+                else:
+                    if tag == '*':
+                        node_test = self.any_element()
+                    elif in_predicate:
+                        if len(tag) > 1 and (tag[0], tag[-1]) in _QUOTES:
+                            node_test = self.literal_string(tag[1:-1])
+                        if cur_op == '=':
+                            node_test = self.op_eq(steps[-1][2][-1], node_test)
+                            steps[-1][2].pop()
+                        elif cur_op == '!=':
+                            node_test = self.op_neq(steps[-1][2][-1], node_test)
+                            steps[-1][2].pop()
+                    else:
+                        node_test = self.element_by_name(tag)
+                if in_predicate:
+                    steps[-1][2].append(node_test)
+                else:
+                    steps.append([closure, node_test, []])
+                cur_op = ''
+                cur_tag = tag
+        self.steps = steps
+
+    def __repr__(self):
+        return '<%s "%s">' % (self.__class__.__name__, self.source)
+
+    def select(self, stream):
+        stream = iter(stream)
+        def _generate(tests):
+            test = self.test()
+            for kind, data, pos in stream:
+                result = test(kind, data, pos)
+                if result is True:
+                    yield kind, data, pos
+                    depth = 1
+                    while depth > 0:
+                        ev = stream.next()
+                        if ev[0] is Stream.START:
+                            depth += 1
+                        elif ev[0] is Stream.END:
+                            depth -= 1
+                        yield ev
+                        test(*ev)
+                elif result:
+                    yield result
+        return Stream(_generate(self.steps))
+
+    def test(self):
+        stack = [0] # stack of cursors into the location path
+
+        def _test(kind, data, pos):
+            #print '\nTracker %r test [%s] %r' % (self, kind, data)
+
+            if not stack:
+                return False
+
+            if kind is Stream.END:
+                stack.pop()
+                return None
+
+            if kind is Stream.START:
+                stack.append(stack[-1])
+
+            matched = False
+            closure, node_test, predicates = self.steps[stack[-1]]
+
+            #print '  Testing against %r' % node_test
+            matched = node_test(kind, data, pos)
+            if matched and predicates:
+                for predicate in predicates:
+                    if not predicate(kind, data, pos):
+                        matched = None
+                        break
+
+            if matched:
+                if stack[-1] == len(self.steps) - 1:
+                    #print '  Last step %r... returned %r' % (node_test, matched)
+                    return matched
+
+                #print '  Matched intermediate step %r... proceed to next step %r' % (node_test, self.steps[stack[-1] + 1])
+                stack[-1] += 1
+
+            elif kind is Stream.START and not closure:
+                # FIXME: If this step is not a closure, it cannot be matched
+                #        until the current element is closed... so we need to
+                #        move the cursor back to the last closure and retest
+                #        that against the current element
+                closures = [step for step in self.steps[:stack[-1]] if step[0]]
+                closures.reverse()
+                for closure, node_test, predicates in closures:
+                    stack[-1] -= 1
+                    if closure:
+                        matched = node_test(kind, data, pos)
+                        if matched:
+                            stack[-1] += 1
+                        break
+
+            return None
+
+        return _test
+
+    class any_element(object):
+        def __call__(self, kind, data, pos):
+            if kind is Stream.START:
+                return True
+            return None
+        def __repr__(self):
+            return '<%s>' % self.__class__.__name__
+
+    class element_by_name(object):
+        def __init__(self, name):
+            self.name = QName(name)
+        def __call__(self, kind, data, pos):
+            if kind is Stream.START:
+                return data[0].localname == self.name
+            return None
+        def __repr__(self):
+            return '<%s "%s">' % (self.__class__.__name__, self.name)
+
+    class any_attribute(object):
+        def __call__(self, kind, data, pos):
+            if kind is Stream.START:
+                text = ''.join([val for name, val in data[1]])
+                if text:
+                    return Stream.TEXT, text, pos
+                return None
+            return None
+        def __repr__(self):
+            return '<%s>' % (self.__class__.__name__)
+
+    class attribute_by_name(object):
+        def __init__(self, name):
+            self.name = QName(name)
+        def __call__(self, kind, data, pos):
+            if kind is Stream.START:
+                if self.name in data[1]:
+                    return Stream.TEXT, data[1].get(self.name), pos
+                return None
+            return None
+        def __repr__(self):
+            return '<%s "%s">' % (self.__class__.__name__, self.name)
+
+    class fn_text(object):
+        def __call__(self, kind, data, pos):
+            if kind is Stream.TEXT:
+                return kind, data, pos
+            return None
+        def __repr__(self):
+            return '<%s>' % (self.__class__.__name__)
+
+    class literal_string(object):
+        def __init__(self, value):
+            self.value = value
+        def __call__(self, kind, data, pos):
+            return Stream.TEXT, self.value, (-1, -1)
+        def __repr__(self):
+            return '<%s>' % (self.__class__.__name__)
+
+    class op_eq(object):
+        def __init__(self, lval, rval):
+            self.lval = lval
+            self.rval = rval
+        def __call__(self, kind, data, pos):
+            lval = self.lval(kind, data, pos)
+            rval = self.rval(kind, data, pos)
+            return (lval and lval[1]) == (rval and rval[1])
+        def __repr__(self):
+            return '<%s %r = %r>' % (self.__class__.__name__, self.lval,
+                                     self.rval)
+
+    class op_neq(object):
+        def __init__(self, lval, rval):
+            self.lval = lval
+            self.rval = rval
+        def __call__(self, kind, data, pos):
+            lval = self.lval(kind, data, pos)
+            rval = self.rval(kind, data, pos)
+            return (lval and lval[1]) != (rval and rval[1])
+        def __repr__(self):
+            return '<%s %r != %r>' % (self.__class__.__name__, self.lval,
+                                      self.rval)
Copyright (C) 2012-2017 Edgewall Software