39
|
1 # -*- coding: utf-8 -*-
|
|
2 #
|
|
3 # Copyright (C) 2005 Edgewall Software
|
|
4 # Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
|
|
5 # All rights reserved.
|
|
6 #
|
|
7 # This software is licensed as described in the file COPYING, which
|
|
8 # you should have received as part of this distribution. The terms
|
|
9 # are also available at http://trac.edgewall.com/license.html.
|
|
10 #
|
|
11 # This software consists of voluntary contributions made by many
|
|
12 # individuals. For the exact contribution history, see the revision
|
|
13 # history and logs, available at http://projects.edgewall.com/trac/.
|
|
14 #
|
|
15 # Author: Christopher Lenz <cmlenz@gmx.de>
|
|
16
|
|
17 from trac.log import logger_factory
|
|
18 from trac.test import Mock, InMemoryDatabase
|
|
19 from trac.versioncontrol import Repository, Changeset, Node
|
|
20 from trac.versioncontrol.cache import CachedRepository
|
|
21
|
|
22 import time
|
|
23 import unittest
|
|
24
|
|
25
|
|
class CacheTestCase(unittest.TestCase):
    """Tests for CachedRepository driven by mocked repositories.

    Each test wires a ``Mock`` repository to a ``CachedRepository`` and
    then inspects the ``revision`` and ``node_change`` tables (or the
    objects returned by the cache) to verify what was stored or read.
    """

    def setUp(self):
        # A new database object and logger are created for every test, so
        # rows written by one test are not visible to the next one.
        self.db = InMemoryDatabase()
        self.log = logger_factory('test')

    def test_initial_sync_with_empty_repos(self):
        # The repository only knows revision 0, which has no changes:
        # sync() must record that revision and no node changes at all.
        changeset = Mock(Changeset, 0, '', '', 42000,
                         get_changes=lambda: [])
        repos = Mock(Repository, 'test-repos', None, self.log,
                     get_changeset=lambda x: changeset,
                     get_oldest_rev=lambda: 0,
                     get_youngest_rev=lambda: 0,
                     normalize_rev=lambda x: x,
                     next_rev=lambda x: None)
        cache = CachedRepository(self.db, repos, None, self.log)
        cache.sync()

        cursor = self.db.cursor()
        cursor.execute("SELECT rev,time,author,message FROM revision")
        self.assertEqual(('0', 42000, '', ''), cursor.fetchone())
        cursor.execute("SELECT COUNT(*) FROM node_change")
        self.assertEqual(0, cursor.fetchone()[0])

    def test_initial_sync(self):
        # Two revisions (0: empty, 1: adds a directory and a file) are
        # synced into an empty cache; both tables are then checked row by
        # row.
        changes = [('trunk', Node.DIRECTORY, Changeset.ADD, None, None),
                   ('trunk/README', Node.FILE, Changeset.ADD, None, None)]
        changesets = [Mock(Changeset, 0, '', '', 41000,
                           get_changes=lambda: []),
                      Mock(Changeset, 1, 'Import', 'joe', 42000,
                           get_changes=lambda: iter(changes))]
        repos = Mock(Repository, 'test-repos', None, self.log,
                     get_changeset=lambda x: changesets[int(x)],
                     get_oldest_rev=lambda: 0,
                     get_youngest_rev=lambda: 1,
                     normalize_rev=lambda x: x,
                     # and/or idiom: 1 follows revision 0, nothing follows 1.
                     next_rev=lambda x: int(x) == 0 and 1 or None)
        cache = CachedRepository(self.db, repos, None, self.log)
        cache.sync()

        cursor = self.db.cursor()
        # Explicit ORDER BY: the assertions below depend on row order, which
        # SQL leaves unspecified for an unordered SELECT.
        cursor.execute("SELECT rev,time,author,message FROM revision "
                       "ORDER BY rev")
        self.assertEqual(('0', 41000, '', ''), cursor.fetchone())
        self.assertEqual(('1', 42000, 'joe', 'Import'), cursor.fetchone())
        self.assertEqual(None, cursor.fetchone())
        cursor.execute("SELECT rev,path,node_type,change_type,base_path,"
                       "base_rev FROM node_change ORDER BY path")
        self.assertEqual(('1', 'trunk', 'D', 'A', None, None),
                         cursor.fetchone())
        self.assertEqual(('1', 'trunk/README', 'F', 'A', None, None),
                         cursor.fetchone())
        self.assertEqual(None, cursor.fetchone())

    def test_update_sync(self):
        # The cache already holds revisions 0 and 1; the repository now
        # reports revision 2 as youngest. After sync() revision 2 and its
        # single edit must be present in the tables.
        cursor = self.db.cursor()
        cursor.execute("INSERT INTO revision (rev,time,author,message) "
                       "VALUES (0,41000,'','')")
        cursor.execute("INSERT INTO revision (rev,time,author,message) "
                       "VALUES (1,42000,'joe','Import')")
        cursor.executemany("INSERT INTO node_change (rev,path,node_type,"
                           "change_type,base_path,base_rev) "
                           "VALUES ('1',%s,%s,%s,%s,%s)",
                           [('trunk', 'D', 'A', None, None),
                            ('trunk/README', 'F', 'A', None, None)])

        changes = [('trunk/README', Node.FILE, Changeset.EDIT,
                    'trunk/README', 1)]
        changeset = Mock(Changeset, 2, 'Update', 'joe', 42042,
                         get_changes=lambda: iter(changes))
        repos = Mock(Repository, 'test-repos', None, self.log,
                     get_changeset=lambda x: changeset,
                     get_youngest_rev=lambda: 2,
                     # and/or idiom: 2 follows revision 1, else no successor.
                     next_rev=lambda x: int(x) == 1 and 2 or None)
        cache = CachedRepository(self.db, repos, None, self.log)
        cache.sync()

        cursor = self.db.cursor()
        cursor.execute("SELECT time,author,message FROM revision "
                       "WHERE rev='2'")
        self.assertEqual((42042, 'joe', 'Update'), cursor.fetchone())
        self.assertEqual(None, cursor.fetchone())
        cursor.execute("SELECT path,node_type,change_type,base_path,base_rev "
                       "FROM node_change WHERE rev='2'")
        # The base revision was supplied as the integer 1 but is expected
        # back as the string '1'.
        self.assertEqual(('trunk/README', 'F', 'E', 'trunk/README', '1'),
                         cursor.fetchone())
        self.assertEqual(None, cursor.fetchone())

    def test_get_changes(self):
        # The tables are pre-filled by hand and the mocked repository
        # returns None for every changeset, so the values asserted below
        # can only come from the cached rows.
        cursor = self.db.cursor()
        cursor.execute("INSERT INTO revision (rev,time,author,message) "
                       "VALUES (0,41000,'','')")
        cursor.execute("INSERT INTO revision (rev,time,author,message) "
                       "VALUES (1,42000,'joe','Import')")
        cursor.executemany("INSERT INTO node_change (rev,path,node_type,"
                           "change_type,base_path,base_rev) "
                           "VALUES ('1',%s,%s,%s,%s,%s)",
                           [('trunk', 'D', 'A', None, None),
                            ('trunk/README', 'F', 'A', None, None)])

        repos = Mock(Repository, 'test-repos', None, self.log,
                     get_changeset=lambda x: None,
                     get_youngest_rev=lambda: 1,
                     next_rev=lambda x: None, normalize_rev=lambda rev: rev)
        cache = CachedRepository(self.db, repos, None, self.log)
        self.assertEqual(1, cache.youngest_rev)
        changeset = cache.get_changeset(1)
        self.assertEqual('joe', changeset.author)
        self.assertEqual('Import', changeset.message)
        self.assertEqual(42000, changeset.date)
        # get_changes() is consumed as an iterator: two entries, then
        # StopIteration.
        changes = changeset.get_changes()
        self.assertEqual(('trunk', Node.DIRECTORY, Changeset.ADD, None, None),
                         changes.next())
        self.assertEqual(('trunk/README', Node.FILE, Changeset.ADD, None, None),
                         changes.next())
        self.assertRaises(StopIteration, changes.next)
|
|
139
|
|
140
|
|
def suite():
    """Collect every ``test``-prefixed method of CacheTestCase into a suite."""
    return unittest.makeSuite(CacheTestCase, prefix='test')
|
|
143
|
|
# Script entry point: when this file is executed directly (rather than
# imported), hand control to unittest's command-line test runner.
if __name__ == '__main__':
    unittest.main()
|