diff examples/trac/trac/versioncontrol/tests/cache.py @ 39:93b4dcbafd7b trunk

Copy Trac to main branch.
author cmlenz
date Mon, 03 Jul 2006 18:53:27 +0000
parents
children
line wrap: on
line diff
new file mode 100644
--- /dev/null
+++ b/examples/trac/trac/versioncontrol/tests/cache.py
@@ -0,0 +1,145 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2005 Edgewall Software
+# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
+# All rights reserved.
+#
+# This software is licensed as described in the file COPYING, which
+# you should have received as part of this distribution. The terms
+# are also available at http://trac.edgewall.com/license.html.
+#
+# This software consists of voluntary contributions made by many
+# individuals. For the exact contribution history, see the revision
+# history and logs, available at http://projects.edgewall.com/trac/.
+#
+# Author: Christopher Lenz <cmlenz@gmx.de>
+
+from trac.log import logger_factory
+from trac.test import Mock, InMemoryDatabase
+from trac.versioncontrol import Repository, Changeset, Node
+from trac.versioncontrol.cache import CachedRepository
+
+import time
+import unittest
+
+
+class CacheTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.db = InMemoryDatabase()
+        self.log = logger_factory('test')
+
+    def test_initial_sync_with_empty_repos(self):
+        changeset = Mock(Changeset, 0, '', '', 42000,
+                         get_changes=lambda: [])
+        repos = Mock(Repository, 'test-repos', None, self.log,
+                     get_changeset=lambda x: changeset,
+                     get_oldest_rev=lambda: 0,
+                     get_youngest_rev=lambda: 0,
+                     normalize_rev=lambda x: x,
+                     next_rev=lambda x: None)
+        cache = CachedRepository(self.db, repos, None, self.log)
+        cache.sync()
+
+        cursor = self.db.cursor()
+        cursor.execute("SELECT rev,time,author,message FROM revision")
+        self.assertEquals(('0', 42000, '', ''), cursor.fetchone())
+        cursor.execute("SELECT COUNT(*) FROM node_change")
+        self.assertEquals(0, cursor.fetchone()[0])
+
+    def test_initial_sync(self):
+        changes = [('trunk', Node.DIRECTORY, Changeset.ADD, None, None),
+                   ('trunk/README', Node.FILE, Changeset.ADD, None, None)]
+        changesets = [Mock(Changeset, 0, '', '', 41000,
+                           get_changes=lambda: []),
+                      Mock(Changeset, 1, 'Import', 'joe', 42000,
+                           get_changes=lambda: iter(changes))]
+        repos = Mock(Repository, 'test-repos', None, self.log,
+                     get_changeset=lambda x: changesets[int(x)],
+                     get_oldest_rev=lambda: 0,
+                     get_youngest_rev=lambda: 1,
+                     normalize_rev=lambda x: x,
+                     next_rev=lambda x: int(x) == 0 and 1 or None)
+        cache = CachedRepository(self.db, repos, None, self.log)
+        cache.sync()
+
+        cursor = self.db.cursor()
+        cursor.execute("SELECT rev,time,author,message FROM revision")
+        self.assertEquals(('0', 41000, '', ''), cursor.fetchone())
+        self.assertEquals(('1', 42000, 'joe', 'Import'), cursor.fetchone())
+        self.assertEquals(None, cursor.fetchone())
+        cursor.execute("SELECT rev,path,node_type,change_type,base_path,"
+                       "base_rev FROM node_change")
+        self.assertEquals(('1', 'trunk', 'D', 'A', None, None),
+                          cursor.fetchone())
+        self.assertEquals(('1', 'trunk/README', 'F', 'A', None, None),
+                          cursor.fetchone())
+        self.assertEquals(None, cursor.fetchone())
+
+    def test_update_sync(self):
+        cursor = self.db.cursor()
+        cursor.execute("INSERT INTO revision (rev,time,author,message) "
+                       "VALUES (0,41000,'','')")
+        cursor.execute("INSERT INTO revision (rev,time,author,message) "
+                       "VALUES (1,42000,'joe','Import')")
+        cursor.executemany("INSERT INTO node_change (rev,path,node_type,"
+                           "change_type,base_path,base_rev) "
+                           "VALUES ('1',%s,%s,%s,%s,%s)",
+                           [('trunk', 'D', 'A', None, None),
+                            ('trunk/README', 'F', 'A', None, None)])
+
+        changes = [('trunk/README', Node.FILE, Changeset.EDIT, 'trunk/README', 1)]
+        changeset = Mock(Changeset, 2, 'Update', 'joe', 42042,
+                         get_changes=lambda: iter(changes))
+        repos = Mock(Repository, 'test-repos', None, self.log,
+                     get_changeset=lambda x: changeset,
+                     get_youngest_rev=lambda: 2,
+                     next_rev=lambda x: int(x) == 1 and 2 or None)
+        cache = CachedRepository(self.db, repos, None, self.log)
+        cache.sync()
+
+        cursor = self.db.cursor()
+        cursor.execute("SELECT time,author,message FROM revision WHERE rev='2'")
+        self.assertEquals((42042, 'joe', 'Update'), cursor.fetchone())
+        self.assertEquals(None, cursor.fetchone())
+        cursor.execute("SELECT path,node_type,change_type,base_path,base_rev "
+                       "FROM node_change WHERE rev='2'")
+        self.assertEquals(('trunk/README', 'F', 'E', 'trunk/README', '1'),
+                          cursor.fetchone())
+        self.assertEquals(None, cursor.fetchone())
+
+    def test_get_changes(self):
+        cursor = self.db.cursor()
+        cursor.execute("INSERT INTO revision (rev,time,author,message) "
+                       "VALUES (0,41000,'','')")
+        cursor.execute("INSERT INTO revision (rev,time,author,message) "
+                       "VALUES (1,42000,'joe','Import')")
+        cursor.executemany("INSERT INTO node_change (rev,path,node_type,"
+                           "change_type,base_path,base_rev) "
+                           "VALUES ('1',%s,%s,%s,%s,%s)",
+                           [('trunk', 'D', 'A', None, None),
+                            ('trunk/README', 'F', 'A', None, None)])
+
+        repos = Mock(Repository, 'test-repos', None, self.log,
+                     get_changeset=lambda x: None,
+                     get_youngest_rev=lambda: 1,
+                     next_rev=lambda x: None, normalize_rev=lambda rev: rev)
+        cache = CachedRepository(self.db, repos, None, self.log)
+        self.assertEqual(1, cache.youngest_rev)
+        changeset = cache.get_changeset(1)
+        self.assertEqual('joe', changeset.author)
+        self.assertEqual('Import', changeset.message)
+        self.assertEqual(42000, changeset.date)
+        changes = changeset.get_changes()
+        self.assertEqual(('trunk', Node.DIRECTORY, Changeset.ADD, None, None),
+                         changes.next())
+        self.assertEqual(('trunk/README', Node.FILE, Changeset.ADD, None, None),
+                         changes.next())
+        self.assertRaises(StopIteration, changes.next)
+
+
+def suite():
+    return unittest.makeSuite(CacheTestCase, 'test')
+
+if __name__ == '__main__':
+    unittest.main()
Copyright (C) 2012-2017 Edgewall Software