Mercurial > genshi > mirror
diff examples/trac/trac/versioncontrol/tests/cache.py @ 39:93b4dcbafd7b trunk
Copy Trac to main branch.
author | cmlenz |
---|---|
date | Mon, 03 Jul 2006 18:53:27 +0000 |
parents | |
children |
line wrap: on
line diff
new file mode 100644 --- /dev/null +++ b/examples/trac/trac/versioncontrol/tests/cache.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2005 Edgewall Software +# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de> +# All rights reserved. +# +# This software is licensed as described in the file COPYING, which +# you should have received as part of this distribution. The terms +# are also available at http://trac.edgewall.com/license.html. +# +# This software consists of voluntary contributions made by many +# individuals. For the exact contribution history, see the revision +# history and logs, available at http://projects.edgewall.com/trac/. +# +# Author: Christopher Lenz <cmlenz@gmx.de> + +from trac.log import logger_factory +from trac.test import Mock, InMemoryDatabase +from trac.versioncontrol import Repository, Changeset, Node +from trac.versioncontrol.cache import CachedRepository + +import time +import unittest + + +class CacheTestCase(unittest.TestCase): + + def setUp(self): + self.db = InMemoryDatabase() + self.log = logger_factory('test') + + def test_initial_sync_with_empty_repos(self): + changeset = Mock(Changeset, 0, '', '', 42000, + get_changes=lambda: []) + repos = Mock(Repository, 'test-repos', None, self.log, + get_changeset=lambda x: changeset, + get_oldest_rev=lambda: 0, + get_youngest_rev=lambda: 0, + normalize_rev=lambda x: x, + next_rev=lambda x: None) + cache = CachedRepository(self.db, repos, None, self.log) + cache.sync() + + cursor = self.db.cursor() + cursor.execute("SELECT rev,time,author,message FROM revision") + self.assertEquals(('0', 42000, '', ''), cursor.fetchone()) + cursor.execute("SELECT COUNT(*) FROM node_change") + self.assertEquals(0, cursor.fetchone()[0]) + + def test_initial_sync(self): + changes = [('trunk', Node.DIRECTORY, Changeset.ADD, None, None), + ('trunk/README', Node.FILE, Changeset.ADD, None, None)] + changesets = [Mock(Changeset, 0, '', '', 41000, + get_changes=lambda: []), + Mock(Changeset, 1, 'Import', 'joe', 42000, + get_changes=lambda: iter(changes))] + repos = Mock(Repository, 'test-repos', None, self.log, + get_changeset=lambda x: changesets[int(x)], + get_oldest_rev=lambda: 0, + get_youngest_rev=lambda: 1, + normalize_rev=lambda x: x, + next_rev=lambda x: int(x) == 0 and 1 or None) + cache = CachedRepository(self.db, repos, None, self.log) + cache.sync() + + cursor = self.db.cursor() + cursor.execute("SELECT rev,time,author,message FROM revision") + self.assertEquals(('0', 41000, '', ''), cursor.fetchone()) + self.assertEquals(('1', 42000, 'joe', 'Import'), cursor.fetchone()) + self.assertEquals(None, cursor.fetchone()) + cursor.execute("SELECT rev,path,node_type,change_type,base_path," + "base_rev FROM node_change") + self.assertEquals(('1', 'trunk', 'D', 'A', None, None), + cursor.fetchone()) + self.assertEquals(('1', 'trunk/README', 'F', 'A', None, None), + cursor.fetchone()) + self.assertEquals(None, cursor.fetchone()) + + def test_update_sync(self): + cursor = self.db.cursor() + cursor.execute("INSERT INTO revision (rev,time,author,message) " + "VALUES (0,41000,'','')") + cursor.execute("INSERT INTO revision (rev,time,author,message) " + "VALUES (1,42000,'joe','Import')") + cursor.executemany("INSERT INTO node_change (rev,path,node_type," + "change_type,base_path,base_rev) " + "VALUES ('1',%s,%s,%s,%s,%s)", + [('trunk', 'D', 'A', None, None), + ('trunk/README', 'F', 'A', None, None)]) + + changes = [('trunk/README', Node.FILE, Changeset.EDIT, 'trunk/README', 1)] + changeset = Mock(Changeset, 2, 'Update', 'joe', 42042, + get_changes=lambda: iter(changes)) + repos = Mock(Repository, 'test-repos', None, self.log, + get_changeset=lambda x: changeset, + get_youngest_rev=lambda: 2, + next_rev=lambda x: int(x) == 1 and 2 or None) + cache = CachedRepository(self.db, repos, None, self.log) + cache.sync() + + cursor = self.db.cursor() + cursor.execute("SELECT time,author,message FROM revision WHERE rev='2'") + self.assertEquals((42042, 'joe', 'Update'), cursor.fetchone()) + self.assertEquals(None, cursor.fetchone()) + cursor.execute("SELECT path,node_type,change_type,base_path,base_rev " + "FROM node_change WHERE rev='2'") + self.assertEquals(('trunk/README', 'F', 'E', 'trunk/README', '1'), + cursor.fetchone()) + self.assertEquals(None, cursor.fetchone()) + + def test_get_changes(self): + cursor = self.db.cursor() + cursor.execute("INSERT INTO revision (rev,time,author,message) " + "VALUES (0,41000,'','')") + cursor.execute("INSERT INTO revision (rev,time,author,message) " + "VALUES (1,42000,'joe','Import')") + cursor.executemany("INSERT INTO node_change (rev,path,node_type," + "change_type,base_path,base_rev) " + "VALUES ('1',%s,%s,%s,%s,%s)", + [('trunk', 'D', 'A', None, None), + ('trunk/README', 'F', 'A', None, None)]) + + repos = Mock(Repository, 'test-repos', None, self.log, + get_changeset=lambda x: None, + get_youngest_rev=lambda: 1, + next_rev=lambda x: None, normalize_rev=lambda rev: rev) + cache = CachedRepository(self.db, repos, None, self.log) + self.assertEqual(1, cache.youngest_rev) + changeset = cache.get_changeset(1) + self.assertEqual('joe', changeset.author) + self.assertEqual('Import', changeset.message) + self.assertEqual(42000, changeset.date) + changes = changeset.get_changes() + self.assertEqual(('trunk', Node.DIRECTORY, Changeset.ADD, None, None), + changes.next()) + self.assertEqual(('trunk/README', Node.FILE, Changeset.ADD, None, None), + changes.next()) + self.assertRaises(StopIteration, changes.next) + + +def suite(): + return unittest.makeSuite(CacheTestCase, 'test') + +if __name__ == '__main__': + unittest.main()