Mercurial > genshi > genshi-test
changeset 144:28b56f09a7e1
* Coalesce adjacent text events that the parsers would produce when text crossed the buffer boundaries. Fixes #26.
* Fix handling of character and entity references in the HTML parser.
author | cmlenz |
---|---|
date | Fri, 11 Aug 2006 16:34:35 +0000 |
parents | ef761afcedff |
children | 56d534eb53f9 |
files | markup/input.py markup/tests/filters.py |
diffstat | 2 files changed, 138 insertions(+), 98 deletions(-) [+] |
line wrap: on
line diff
--- a/markup/input.py +++ b/markup/input.py @@ -11,6 +11,7 @@ # individuals. For the exact contribution history, see the revision # history and logs, available at http://markup.edgewall.org/log/. +from itertools import chain from xml.parsers import expat try: frozenset @@ -21,6 +22,10 @@ from StringIO import StringIO from markup.core import Attributes, Markup, QName, Stream +from markup.core import DOCTYPE, START, END, START_NS, END_NS, TEXT, \ + START_CDATA, END_CDATA, PI, COMMENT + +__all__ = ['ParseError', 'XMLParser', 'XML', 'HTMLParser', 'HTML'] class ParseError(Exception): @@ -82,35 +87,40 @@ self.expat = parser self._queue = [] + def parse(self): + def _generate(): + try: + bufsize = 4 * 1024 # 4K + done = False + while 1: + while not done and len(self._queue) == 0: + data = self.source.read(bufsize) + if data == '': # end of data + if hasattr(self, 'expat'): + self.expat.Parse('', True) + del self.expat # get rid of circular references + done = True + else: + self.expat.Parse(data, False) + for event in self._queue: + yield event + self._queue = [] + if done: + break + except expat.ExpatError, e: + msg = str(e) + if self.filename: + msg += ', in ' + self.filename + raise ParseError(msg, self.filename, e.lineno, e.offset) + return Stream(_generate()).filter(CoalesceFilter()) + def __iter__(self): - try: - bufsize = 4 * 1024 # 4K - done = False - while 1: - while not done and len(self._queue) == 0: - data = self.source.read(bufsize) - if data == '': # end of data - if hasattr(self, 'expat'): - self.expat.Parse('', True) - del self.expat # get rid of circular references - done = True - else: - self.expat.Parse(data, False) - for event in self._queue: - yield event - self._queue = [] - if done: - break - except expat.ExpatError, e: - msg = str(e) - if self.filename: - msg += ', in ' + self.filename - raise ParseError(msg, self.filename, e.lineno, e.offset) + return iter(self.parse()) def _enqueue(self, kind, data=None, pos=None): if pos is None: pos = 
self._getpos() - if kind is Stream.TEXT: + if kind is TEXT: # Expat reports the *end* of the text event as current position. We # try to fix that up here as much as possible. Unfortunately, the # offset is only valid for single-line text. For multi-line text, @@ -134,41 +144,41 @@ self.expat.CurrentColumnNumber) def _handle_start(self, tag, attrib): - self._enqueue(Stream.START, (QName(tag), Attributes(attrib.items()))) + self._enqueue(START, (QName(tag), Attributes(attrib.items()))) def _handle_end(self, tag): - self._enqueue(Stream.END, QName(tag)) + self._enqueue(END, QName(tag)) def _handle_data(self, text): - self._enqueue(Stream.TEXT, text) + self._enqueue(TEXT, text) def _handle_doctype(self, name, sysid, pubid, has_internal_subset): - self._enqueue(Stream.DOCTYPE, (name, pubid, sysid)) + self._enqueue(DOCTYPE, (name, pubid, sysid)) def _handle_start_ns(self, prefix, uri): - self._enqueue(Stream.START_NS, (prefix or '', uri)) + self._enqueue(START_NS, (prefix or '', uri)) def _handle_end_ns(self, prefix): - self._enqueue(Stream.END_NS, prefix or '') + self._enqueue(END_NS, prefix or '') def _handle_start_cdata(self): - self._enqueue(Stream.START_CDATA) + self._enqueue(START_CDATA) def _handle_end_cdata(self): - self._enqueue(Stream.END_CDATA) + self._enqueue(END_CDATA) def _handle_pi(self, target, data): - self._enqueue(Stream.PI, (target, data)) + self._enqueue(PI, (target, data)) def _handle_comment(self, text): - self._enqueue(Stream.COMMENT, text) + self._enqueue(COMMENT, text) def _handle_other(self, text): if text.startswith('&'): # deal with undefined entities try: text = unichr(htmlentitydefs.name2codepoint[text[1:-1]]) - self._enqueue(Stream.TEXT, text) + self._enqueue(TEXT, text) except KeyError: lineno, offset = self._getpos() raise expat.error("undefined entity %s: line %d, column %d" % @@ -208,32 +218,37 @@ self._queue = [] self._open_tags = [] + def parse(self): + def _generate(): + try: + bufsize = 4 * 1024 # 4K + done = False + while 1: + 
while not done and len(self._queue) == 0: + data = self.source.read(bufsize) + if data == '': # end of data + self.close() + done = True + else: + self.feed(data) + for kind, data, pos in self._queue: + yield kind, data, pos + self._queue = [] + if done: + open_tags = self._open_tags + open_tags.reverse() + for tag in open_tags: + yield END, QName(tag), pos + break + except html.HTMLParseError, e: + msg = '%s: line %d, column %d' % (e.msg, e.lineno, e.offset) + if self.filename: + msg += ', in %s' % self.filename + raise ParseError(msg, self.filename, e.lineno, e.offset) + return Stream(_generate()).filter(CoalesceFilter()) + def __iter__(self): - try: - bufsize = 4 * 1024 # 4K - done = False - while 1: - while not done and len(self._queue) == 0: - data = self.source.read(bufsize) - if data == '': # end of data - self.close() - done = True - else: - self.feed(data) - for kind, data, pos in self._queue: - yield kind, data, pos - self._queue = [] - if done: - open_tags = self._open_tags - open_tags.reverse() - for tag in open_tags: - yield Stream.END, QName(tag), pos - break - except html.HTMLParseError, e: - msg = '%s: line %d, column %d' % (e.msg, e.lineno, e.offset) - if self.filename: - msg += ', in %s' % self.filename - raise ParseError(msg, self.filename, e.lineno, e.offset) + return iter(self.parse()) def _enqueue(self, kind, data, pos=None): if pos is None: @@ -251,9 +266,9 @@ value = name fixed_attrib.append((name, unicode(value))) - self._enqueue(Stream.START, (QName(tag), Attributes(fixed_attrib))) + self._enqueue(START, (QName(tag), Attributes(fixed_attrib))) if tag in self._EMPTY_ELEMS: - self._enqueue(Stream.END, QName(tag)) + self._enqueue(END, QName(tag)) else: self._open_tags.append(tag) @@ -263,26 +278,51 @@ open_tag = self._open_tags.pop() if open_tag.lower() == tag.lower(): break - self._enqueue(Stream.END, QName(open_tag)) - self._enqueue(Stream.END, QName(tag)) + self._enqueue(END, QName(open_tag)) + self._enqueue(END, QName(tag)) def 
handle_data(self, text): - self._enqueue(Stream.TEXT, text) + self._enqueue(TEXT, text) def handle_charref(self, name): - self._enqueue(Stream.TEXT, Markup('&#%s;' % name)) + text = unichr(int(name)) + self._enqueue(TEXT, text) def handle_entityref(self, name): - self._enqueue(Stream.TEXT, Markup('&%s;' % name)) + try: + text = unichr(htmlentitydefs.name2codepoint[name]) + except KeyError: + text = '&%s;' % name + self._enqueue(TEXT, text) def handle_pi(self, data): target, data = data.split(maxsplit=1) data = data.rstrip('?') - self._enqueue(Stream.PI, (target.strip(), data.strip())) + self._enqueue(PI, (target.strip(), data.strip())) def handle_comment(self, text): - self._enqueue(Stream.COMMENT, text) + self._enqueue(COMMENT, text) def HTML(text): return Stream(list(HTMLParser(StringIO(text)))) + + +class CoalesceFilter(object): + """Coalesces adjacent TEXT events into a single event.""" + + def __call__(self, stream, ctxt=None): + textbuf = [] + textpos = None + for kind, data, pos in chain(stream, [(None, None, None)]): + if kind is TEXT: + textbuf.append(data) + if textpos is None: + textpos = pos + else: + if textbuf: + yield TEXT, u''.join(textbuf), textpos + del textbuf[:] + textpos = None + if kind: + yield kind, data, pos
--- a/markup/tests/filters.py +++ b/markup/tests/filters.py @@ -23,97 +23,97 @@ def test_sanitize_unchanged(self): html = HTML('<a href="#">fo<br />o</a>') - self.assertEquals('<a href="#">fo<br/>o</a>', - str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<a href="#">fo<br/>o</a>', + unicode(html.filter(HTMLSanitizer()))) def test_sanitize_escape_text(self): html = HTML('<a href="#">fo&</a>') - self.assertEquals('<a href="#">fo&</a>', - str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<a href="#">fo&</a>', + unicode(html.filter(HTMLSanitizer()))) html = HTML('<a href="#"><foo></a>') - self.assertEquals('<a href="#"><foo></a>', - str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<a href="#"><foo></a>', + unicode(html.filter(HTMLSanitizer()))) def test_sanitize_entityref_text(self): html = HTML('<a href="#">foö</a>') - self.assertEquals(u'<a href="#">foö</a>', - str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<a href="#">foƶ</a>', + unicode(html.filter(HTMLSanitizer()))) def test_sanitize_escape_attr(self): html = HTML('<div title="<foo>"></div>') - self.assertEquals('<div title="<foo>"/>', - str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<div title="<foo>"/>', + unicode(html.filter(HTMLSanitizer()))) def test_sanitize_close_empty_tag(self): html = HTML('<a href="#">fo<br>o</a>') - self.assertEquals('<a href="#">fo<br/>o</a>', - str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<a href="#">fo<br/>o</a>', + unicode(html.filter(HTMLSanitizer()))) def test_sanitize_invalid_entity(self): html = HTML('&junk;') - self.assertEquals('&junk;', str(html.filter(HTMLSanitizer()))) + self.assertEquals('&junk;', unicode(html.filter(HTMLSanitizer()))) def test_sanitize_remove_script_elem(self): html = HTML('<script>alert("Foo")</script>') - self.assertEquals('', str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'', unicode(html.filter(HTMLSanitizer()))) html = HTML('<SCRIPT SRC="http://example.com/"></SCRIPT>') - 
self.assertEquals('', str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'', unicode(html.filter(HTMLSanitizer()))) self.assertRaises(ParseError, HTML, '<SCR\0IPT>alert("foo")</SCR\0IPT>') self.assertRaises(ParseError, HTML, '<SCRIPT&XYZ SRC="http://example.com/"></SCRIPT>') def test_sanitize_remove_onclick_attr(self): html = HTML('<div onclick=\'alert("foo")\' />') - self.assertEquals('<div/>', str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<div/>', unicode(html.filter(HTMLSanitizer()))) def test_sanitize_remove_style_scripts(self): # Inline style with url() using javascript: scheme html = HTML('<DIV STYLE=\'background: url(javascript:alert("foo"))\'>') - self.assertEquals('<div/>', str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<div/>', unicode(html.filter(HTMLSanitizer()))) # Inline style with url() using javascript: scheme, using control char html = HTML('<DIV STYLE=\'background: url(javascript:alert("foo"))\'>') - self.assertEquals('<div/>', str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<div/>', unicode(html.filter(HTMLSanitizer()))) # Inline style with url() using javascript: scheme, in quotes html = HTML('<DIV STYLE=\'background: url("javascript:alert(foo)")\'>') - self.assertEquals('<div/>', str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<div/>', unicode(html.filter(HTMLSanitizer()))) # IE expressions in CSS not allowed html = HTML('<DIV STYLE=\'width: expression(alert("foo"));\'>') - self.assertEquals('<div/>', str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<div/>', unicode(html.filter(HTMLSanitizer()))) html = HTML('<DIV STYLE=\'background: url(javascript:alert("foo"));' 'color: #fff\'>') - self.assertEquals('<div style="color: #fff"/>', - str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<div style="color: #fff"/>', + unicode(html.filter(HTMLSanitizer()))) def test_sanitize_remove_src_javascript(self): html = HTML('<img src=\'javascript:alert("foo")\'>') - self.assertEquals('<img/>', 
str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<img/>', unicode(html.filter(HTMLSanitizer()))) # Case-insensitive protocol matching html = HTML('<IMG SRC=\'JaVaScRiPt:alert("foo")\'>') - self.assertEquals('<img/>', str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<img/>', unicode(html.filter(HTMLSanitizer()))) # Grave accents (not parsed) self.assertRaises(ParseError, HTML, '<IMG SRC=`javascript:alert("RSnake says, \'foo\'")`>') # Protocol encoded using UTF-8 numeric entities html = HTML('<IMG SRC=\'javascri' 'pt:alert("foo")\'>') - self.assertEquals('<img/>', str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<img/>', unicode(html.filter(HTMLSanitizer()))) # Protocol encoded using UTF-8 numeric entities without a semicolon # (which is allowed because the max number of digits is used) html = HTML('<IMG SRC=\'java' 'script' ':alert("foo")\'>') - self.assertEquals('<img/>', str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<img/>', unicode(html.filter(HTMLSanitizer()))) # Protocol encoded using UTF-8 numeric hex entities without a semicolon # (which is allowed because the max number of digits is used) html = HTML('<IMG SRC=\'javascri' 'pt:alert("foo")\'>') - self.assertEquals('<img/>', str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<img/>', unicode(html.filter(HTMLSanitizer()))) # Embedded tab character in protocol html = HTML('<IMG SRC=\'jav\tascript:alert("foo");\'>') - self.assertEquals('<img/>', str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<img/>', unicode(html.filter(HTMLSanitizer()))) # Embedded tab character in protocol, but encoded this time html = HTML('<IMG SRC=\'jav	ascript:alert("foo");\'>') - self.assertEquals('<img/>', str(html.filter(HTMLSanitizer()))) + self.assertEquals(u'<img/>', unicode(html.filter(HTMLSanitizer()))) def suite():