Mercurial > bitten > bitten-test
changeset 650:b1a50f2d92eb
0.6dev: Database upgrade to ensure no duplicate builds are created due to thread race condition when populating builds. Threaded test included.
Thanks to Hodgestar for patch! Closes #214.
author | osimons |
---|---|
date | Mon, 24 Aug 2009 23:03:38 +0000 |
parents | eed0149c302a |
children | 44a862c1e559 |
files | bitten/model.py bitten/queue.py bitten/tests/queue.py bitten/upgrades.py |
diffstat | 4 files changed, 78 insertions(+), 5 deletions(-) [+] |
line wrap: on
line diff
--- a/bitten/model.py +++ b/bitten/model.py @@ -341,7 +341,7 @@ Column('rev_time', type='int'), Column('platform', type='int'), Column('slave'), Column('started', type='int'), Column('stopped', type='int'), Column('status', size=1), - Index(['config', 'rev', 'slave']) + Index(['config', 'rev', 'platform'], unique=True) ], Table('bitten_slave', key=('build', 'propname'))[ Column('build', type='int'), Column('propname'), Column('propvalue') @@ -998,4 +998,4 @@ schema = BuildConfig._schema + TargetPlatform._schema + Build._schema + \ BuildStep._schema + BuildLog._schema + Report._schema -schema_version = 9 +schema_version = 10
--- a/bitten/queue.py +++ b/bitten/queue.py @@ -246,9 +246,17 @@ builds.append(build) for build in builds: - build.insert(db=db) - - db.commit() + try: + build.insert(db=db) + db.commit() + except Exception, e: + # really only want to catch IntegrityErrors raised when + # a second slave attempts to add builds with the same + # (config, platform, rev) as an existing build. + self.log.info('Failed to insert build of configuration "%s" ' + 'at revision [%s] on platform [%s]: %s', + build.config, build.rev, build.platform, e) + db.rollback() def reset_orphaned_builds(self): """Reset all in-progress builds to ``PENDING`` state if they've been
--- a/bitten/tests/queue.py +++ b/bitten/tests/queue.py @@ -11,6 +11,7 @@ import os import shutil import tempfile +import threading import time import unittest @@ -261,6 +262,63 @@ self.assertEqual(platform2.id, builds[5].platform) self.assertEqual('120', builds[5].rev) + def test_populate_thread_race_condition(self): + messages = [] + self.env.log = Mock(info=lambda msg, *args: messages.append(msg)) + def get_history(): + yield ('somepath', 123, 'edit') + yield ('somepath', 121, 'edit') + yield ('somepath', 120, 'edit') + time.sleep(1) # sleep to make sure both threads collect + self.env.get_repository = lambda authname=None: Mock( + get_changeset=lambda rev: Mock(date=to_datetime(rev * 1000, utc)), + get_node=lambda path, rev=None: Mock( + get_entries=lambda: [Mock(), Mock()], + get_history=get_history + ), + normalize_path=lambda path: path, + rev_older_than=lambda rev1, rev2: rev1 < rev2 + ) + BuildConfig(self.env, 'test', path='somepath', active=True).insert() + platform1 = TargetPlatform(self.env, config='test', name='P1') + platform1.insert() + platform2 = TargetPlatform(self.env, config='test', name='P2') + platform2.insert() + + class BuildPopulator(threading.Thread): + def __init__(self, env): + self.env = env + threading.Thread.__init__(self) + def run(self): + queue = BuildQueue(self.env, build_all=True) + queue.populate() + + thread1 = BuildPopulator(self.env) + thread2 = BuildPopulator(self.env) + thread1.start(); thread2.start() + thread1.join(); thread2.join() + + # check builds got added + builds = list(Build.select(self.env, config='test')) + builds.sort(lambda a, b: cmp(a.platform, b.platform)) + self.assertEqual(6, len(builds)) + self.assertEqual(platform1.id, builds[0].platform) + self.assertEqual('123', builds[0].rev) + self.assertEqual(platform1.id, builds[1].platform) + self.assertEqual('121', builds[1].rev) + self.assertEqual(platform1.id, builds[2].platform) + self.assertEqual('120', builds[2].rev) + self.assertEqual(platform2.id, builds[3].platform) + self.assertEqual('123', builds[3].rev) + self.assertEqual(platform2.id, builds[4].platform) + self.assertEqual('121', builds[4].rev) + self.assertEqual(platform2.id, builds[5].platform) + self.assertEqual('120', builds[5].rev) + + # check attempts at duplicate inserts were logged. + failure_messages = [x for x in messages if x.startswith('Failed to insert build')] + self.assertEqual(6, len(failure_messages)) + def test_should_delete_build_platform_dont_exist(self): messages = [] self.env.log = Mock(info=lambda msg, *args: messages.append(msg))
--- a/bitten/upgrades.py +++ b/bitten/upgrades.py @@ -358,6 +358,12 @@ cursor.execute(stmt) cursor.execute("INSERT INTO bitten_rule (id,propname,pattern,orderno) SELECT %s,propname,pattern,orderno FROM old_rule" % db.cast('id', 'int')) +def add_config_platform_rev_index_to_build(env, db): + """Adds a unique index on (config, platform, rev) to the bitten_build table. + Also drops the old index on bitten_build that serves no real purpose anymore.""" + cursor = db.cursor() + cursor.execute("CREATE UNIQUE INDEX bitten_build_config_rev_platform_idx ON bitten_build (config,rev,platform)") + cursor.execute("DROP INDEX bitten_build_config_rev_slave_idx") map = { 2: [add_log_table], @@ -368,4 +374,5 @@ 7: [add_error_table], 8: [add_filename_to_logs,migrate_logs_to_files], 9: [recreate_rule_with_int_id], + 10: [add_config_platform_rev_index_to_build], }