# HG changeset patch
# User hodgestar
# Date 1356786140 0
# Node ID 2bfd8f8d241c0c2d9f2a1e4c63173c140668c0e8
# Parent 99d4c481e4ebe11777019bfc4316b07f5d99ef76
Fix parsing of multi-byte characters that occur on 4K boundaries of HTML files (fixes #538).

diff --git a/genshi/input.py b/genshi/input.py
--- a/genshi/input.py
+++ b/genshi/input.py
@@ -16,6 +16,7 @@
 """
 
 from itertools import chain
+import codecs
 import htmlentitydefs as entities
 import HTMLParser as html
 from xml.parsers import expat
@@ -317,22 +318,23 @@
         :raises ParseError: if the HTML text is not well formed
         """
         def _generate():
+            if self.encoding:
+                reader = codecs.getreader(self.encoding)
+                source = reader(self.source)
+            else:
+                source = self.source
             try:
                 bufsize = 4 * 1024 # 4K
                 done = False
                 while 1:
                     while not done and len(self._queue) == 0:
-                        data = self.source.read(bufsize)
+                        data = source.read(bufsize)
                         if not data: # end of data
                             self.close()
                             done = True
                         else:
                             if not isinstance(data, unicode):
-                                # bytes
-                                if self.encoding:
-                                    data = data.decode(self.encoding)
-                                else:
-                                    raise UnicodeError("source returned bytes, but no encoding specified")
+                                raise UnicodeError("source returned bytes, but no encoding specified")
                             self.feed(data)
                     for kind, data, pos in self._queue:
                         yield kind, data, pos
@@ -432,7 +434,10 @@
                         fails
     """
     if isinstance(text, unicode):
-        return Stream(list(HTMLParser(StringIO(text), encoding=encoding)))
+        # If it's unicode text the encoding should be set to None.
+        # The option to pass in an incorrect encoding is for ease
+        # of writing doctests that work in both Python 2.x and 3.x.
+        return Stream(list(HTMLParser(StringIO(text), encoding=None)))
     return Stream(list(HTMLParser(BytesIO(text), encoding=encoding)))
 
 
diff --git a/genshi/tests/input.py b/genshi/tests/input.py
--- a/genshi/tests/input.py
+++ b/genshi/tests/input.py
@@ -253,6 +253,13 @@
         self.assertEqual((Stream.TEXT, "'"), events[1][:2])
         self.assertEqual((Stream.END, 'span'), events[2][:2])
 
+    def test_multibyte_character_on_chunk_boundary(self):
+        text = u'a' * ((4 * 1024) - 1) + u'\xe6'
+        events = list(HTMLParser(BytesIO(text.encode('utf-8')),
+                                 encoding='utf-8'))
+        self.assertEqual(1, len(events))
+        self.assertEqual((Stream.TEXT, text), events[0][:2])
+
 
 def suite():
     suite = unittest.TestSuite()