# HG changeset patch # User cmlenz # Date 1129319408 0 # Node ID 33625fa61d6c981a8addff1ea1c8ff63ed3b970e # Parent 24ba49dee33ceea0a37b48445d92c7f7f09d304c * If a slave disconnects after the master has started to create a snapshot archive for it, just remain calm and keep the archive in place. * Fix a potential race condition where two slaves could trigger the same snapshot archive to be created. diff --git a/bitten/master.py b/bitten/master.py --- a/bitten/master.py +++ b/bitten/master.py @@ -177,6 +177,10 @@ if worker.isAlive(): self.master.schedule(2, _check_snapshot) else: + if self.name not in self.master.handlers: + # The slave disconnected while we were building + # the archive + return snapshot = snapshots.get(build.rev) if snapshot is None: log.error('Failed to create snapshot archive for ' diff --git a/bitten/snapshot.py b/bitten/snapshot.py --- a/bitten/snapshot.py +++ b/bitten/snapshot.py @@ -99,6 +99,8 @@ self._lock = threading.RLock() self._cleanup() + self._workers = {} + def _scan(self): """Find all existing snapshots in the directory.""" for filename in [f for f in os.listdir(self.directory) @@ -150,23 +152,31 @@ function is the thread object. The caller may use this object to check for completion of the operation. """ - prefix = self.prefix + '_r' + str(rev) - filename = prefix + '.zip' - filepath = os.path.join(self.directory, filename) - if os.path.exists(filepath): - raise IOError, 'Snapshot file already exists at %s' % filepath + self._lock.acquire() + try: + repos = self.env.get_repository() + root = repos.get_node(self.config.path or '/', rev) + assert root.isdir, '"%s" is not a directory' % self.config.path - repos = self.env.get_repository() - root = repos.get_node(self.config.path or '/', rev) - assert root.isdir, '"%s" is not a directory' % self.config.path + if root.rev in self._workers: + return self._workers[root.rev] - self._cleanup(self.limit - 1) + prefix = self.prefix + '_r' + str(rev) + filename = prefix + '.zip' + filepath = os.path.join(self.directory, filename) + if os.path.exists(filepath): + raise IOError, 'Snapshot file already exists at %s' % filepath - worker = threading.Thread(target=self._create, - args=(prefix, root, filepath), - name='Create snapshot %s' % filename) - worker.start() - return worker + self._cleanup(self.limit - 1) + + worker = threading.Thread(target=self._create, + args=(prefix, root, filepath), + name='Create snapshot %s' % filename) + worker.start() + self._workers[root.rev] = worker + return worker + finally: + self._lock.release() def _create(self, prefix, root, filepath): """Actually create a snapshot archive. @@ -185,6 +195,7 @@ path = os.path.join(prefix, name).rstrip('/\\') + '/' info = zipfile.ZipInfo(path) zip.writestr(info, '') + log.debug('Adding directory %s to archive' % name) for entry in node.get_entries(): _add_entry(entry) time.sleep(.5) # be nice @@ -211,6 +222,7 @@ self._lock.acquire() try: self._index.append((os.path.getmtime(filepath), root.rev, filepath)) + del self._workers[root.rev] finally: self._lock.release() log.info('Prepared snapshot archive at %s', filepath)