changeset 289:9d93e622f963

Moved MD5 checksum functionality into new module `bitten.util.md5sum`.
author cmlenz
date Sat, 15 Oct 2005 18:56:16 +0000
parents a5ed3341d9a9
children 86281f8eef44
files bitten/snapshot.py bitten/tests/slave.py bitten/tests/snapshot.py bitten/util/md5sum.py bitten/util/tests/__init__.py bitten/util/tests/md5sum.py
diffstat 6 files changed, 204 insertions(+), 66 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/snapshot.py
+++ b/bitten/snapshot.py
@@ -35,7 +35,6 @@
 """
 
 import logging
-import md5
 import os
 try:
     import threading
@@ -44,25 +43,13 @@
 import time
 import zipfile
 
+from bitten.util import md5sum
+
 log = logging.getLogger('bitten.snapshot')
 
 MAX_SNAPSHOTS = 10
 SNAPSHOTS_DIR = 'snapshots'
 
-def _make_md5sum(filename):
-    """Generate an MD5 checksum for the specified file."""
-    md5sum = md5.new()
-    fileobj = file(filename, 'rb')
-    try:
-        while True:
-            chunk = fileobj.read(4096)
-            if not chunk:
-                break
-            md5sum.update(chunk)
-    finally:
-        fileobj.close()
-    return md5sum.hexdigest() + '  ' + filename
-
 
 class SnapshotManager(object):
     """Manages snapshot archives for a specific build configuration."""
@@ -113,19 +100,11 @@
             rev = rest[2:]
 
             filepath = os.path.join(self.directory, filename)
-            expected_md5sum = _make_md5sum(filepath)
-            md5sum_path = os.path.join(self.directory,
-                                       filename[:-4] + '.md5')
-            if not os.path.isfile(md5sum_path):
+            try:
+                md5sum.validate(filepath)
+            except md5sum.IntegrityError, e:
+                log.warning('Integrity error checking %s (e)', filepath, e)
                 continue
-            md5sum_file = file(md5sum_path)
-            try:
-                existing_md5sum = md5sum_file.read()
-                if existing_md5sum != expected_md5sum:
-                    continue
-            finally:
-                md5sum_file.close()
-
             mtime = os.path.getmtime(filepath)
 
             yield mtime, rev, filepath
@@ -140,7 +119,13 @@
                 for mtime, rev, path in self._index[limit:]:
                     log.debug('Removing snapshot %s', path)
                     os.remove(path)
-                    os.remove(path[:-4] + '.md5')
+                    md5file = path + '.md5'
+                    if os.path.isfile(md5file):
+                        os.remove(md5file)
+                    else:
+                        md5file = os.path.splitext(path)[0] + '.md5'
+                        if os.path.isfile(md5file):
+                            os.remove(md5file)
                 self._index = self._index[:limit]
         finally:
             self._lock.release()
@@ -198,7 +183,7 @@
                 log.debug('Adding directory %s to archive' % name)
                 for entry in node.get_entries():
                     _add_entry(entry)
-                time.sleep(.5) # be nice
+                time.sleep(.1) # be nice
             else:
                 path = os.path.join(prefix, name)
                 info = zipfile.ZipInfo(path)
@@ -211,13 +196,8 @@
         finally:
             zip.close()
 
-        # Create MD5 checksum
-        md5sum = _make_md5sum(filepath)
-        md5sum_file = file(filepath[:-4] + '.md5', 'w')
-        try:
-            md5sum_file.write(md5sum)
-        finally:
-            md5sum_file.close()
+        # Create MD5 checksum file
+        md5sum.write(filepath)
 
         self._lock.acquire()
         try:
--- a/bitten/tests/slave.py
+++ b/bitten/tests/slave.py
@@ -11,18 +11,11 @@
 import shutil
 import tempfile
 import unittest
-
-
-import md5
-import os
-import shutil
-import tempfile
-import unittest
 import zipfile
 
 from trac.test import Mock
 from bitten.slave import Slave, OrchestrationProfileHandler
-from bitten.util.beep import ProtocolError
+from bitten.util import beep
 
 
 class OrchestrationProfileHandlerTestCase(unittest.TestCase):
@@ -54,7 +47,7 @@
         zip = file(path, 'w')
         zip.write('INVALID')
         zip.close()
-        self.assertRaises(ProtocolError, self.handler.unpack_snapshot, 0,
+        self.assertRaises(beep.ProtocolError, self.handler.unpack_snapshot, 0,
                           os.path.dirname(path), 'invalid.zip')
 
     def test_unpack_invalid_zip_2(self):
@@ -66,7 +59,7 @@
         zip = file(path, 'w')
         zip.write('INVALIDINVALIDINVALIDINVALIDINVALIDINVALID')
         zip.close()
-        self.assertRaises(ProtocolError, self.handler.unpack_snapshot, 0,
+        self.assertRaises(beep.ProtocolError, self.handler.unpack_snapshot, 0,
                           os.path.dirname(path), 'invalid.zip')
 
 def suite():
--- a/bitten/tests/snapshot.py
+++ b/bitten/tests/snapshot.py
@@ -7,12 +7,6 @@
 # you should have received as part of this distribution. The terms
 # are also available at http://bitten.cmlenz.net/wiki/License.
 
-import os
-import shutil
-import tempfile
-import unittest
-
-
 import md5
 import os
 import shutil
@@ -22,7 +16,8 @@
 
 from trac.test import EnvironmentStub, Mock
 from bitten.model import BuildConfig
-from bitten.snapshot import SnapshotManager, _make_md5sum
+from bitten.snapshot import SnapshotManager
+from bitten.util import md5sum
 
 
 class SnapshotManagerTestCase(unittest.TestCase):
@@ -48,12 +43,7 @@
         fileobj = file(filename, 'w')
         fileobj.close()
         if create_md5sum:
-            md5sum = _make_md5sum(filename)
-            md5sum_file = file(filename[:-4] + '.md5', 'w')
-            try:
-                md5sum_file.write(md5sum)
-            finally:
-                md5sum_file.close()
+            md5sum.write(filename)
         return filename
 
     def test_empty(self):
@@ -101,12 +91,8 @@
         path1 = self._create_file(os.path.join('snapshots', 'foo_r123.zip'))
         path2 = self._create_file(os.path.join('snapshots', 'foo_r124.zip'),
                                  create_md5sum=False)
-        md5sum = md5.new('Foo bar')
-        md5sum_file = file(path2[:-4] + '.md5', 'w')
-        try:
-            md5sum_file.write(md5sum.hexdigest() + '  ' + path2)
-        finally:
-            md5sum_file.close()
+        
+        md5sum.write(path1, path2 + '.md5')
         snapshots = SnapshotManager(self.config)
         self.assertEqual(path1, snapshots.get(123))
         self.assertEqual(None, snapshots.get(124))
new file mode 100644
--- /dev/null
+++ b/bitten/util/md5sum.py
@@ -0,0 +1,85 @@
+# -*- coding: iso8859-1 -*-
+#
+# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
+# All rights reserved.
+#
+# This software is licensed as described in the file COPYING, which
+# you should have received as part of this distribution. The terms
+# are also available at http://bitten.cmlenz.net/wiki/License.
+
+"""Convenience functions for creating and validating MD5 checksums for files."""
+
+import md5
+import os
+
+
+class IntegrityError(Exception):
+    """Exception raised when checksum validation fails."""
+
+
+def generate(filename):
+    """Generate an MD5 checksum for the specified file.
+    
+    @param filename: the absolute path to the file
+    @return: string containing the checksum
+    """
+    md5sum = md5.new()
+    fileobj = file(filename, 'rb')
+    try:
+        while True:
+            chunk = fileobj.read(4096)
+            if not chunk:
+                break
+            md5sum.update(chunk)
+    finally:
+        fileobj.close()
+    return md5sum.hexdigest() + '  ' + filename
+
+def write(filename, md5file=None):
+    """Write an MD5 checksum file for the specified file.
+    
+    @param filename: absolute path to the file
+    @param md5file: absolute path to the MD5 checksum file to create (optional)
+    @return: the absolute path to the created checksum file
+    
+    If the `md5file` parameter is omitted, this function will write the checksum
+    to a file alongside the orignal file, with an added `.md5` extension.
+    """
+    if md5file is None:
+        md5file = filename + '.md5'
+
+    fileobj = file(md5file, 'w')
+    try:
+        fileobj.write(generate(filename))
+    finally:
+        fileobj.close()
+    return md5file
+
+def validate(filename, checksum=None):
+    """Check the integrity of a specified file against an MD5 checksum.
+    
+    @param filename: the absolute path to the file
+    @param checksum: string containing the checksum (optional)
+
+    If the second parameter is omitted, this function will look for a file with
+    an `.md5` extension alongside the original file, and try to read the
+    checksum from that file. If no such file is found, an `IntegrityError` is
+    raised.
+
+    If the file does not match the checksum, an `IntegrityError` is raised.
+    """
+    if checksum is None:
+        md5file = filename + '.md5'
+        if not os.path.isfile(md5file):
+            md5file = os.path.splitext(filename)[0] + '.md5'
+            if not os.path.isfile(md5file):
+                raise IntegrityError, 'Checksum file not found'
+        fileobj = file(md5file, 'r')
+        try:
+            checksum = fileobj.read()
+        finally:
+            fileobj.close()
+
+    expected = generate(filename)
+    if expected != checksum:
+        raise IntegrityError, 'Checksum does not match'
--- a/bitten/util/tests/__init__.py
+++ b/bitten/util/tests/__init__.py
@@ -11,11 +11,12 @@
 import unittest
 
 from bitten.util import xmlio
-from bitten.util.tests import beep
+from bitten.util.tests import beep, md5sum
 
 def suite():
     suite = unittest.TestSuite()
     suite.addTest(beep.suite())
+    suite.addTest(md5sum.suite())
     suite.addTest(doctest.DocTestSuite(xmlio))
     return suite
 
new file mode 100644
--- /dev/null
+++ b/bitten/util/tests/md5sum.py
@@ -0,0 +1,93 @@
+# -*- coding: iso8859-1 -*-
+#
+# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
+# All rights reserved.
+#
+# This software is licensed as described in the file COPYING, which
+# you should have received as part of this distribution. The terms
+# are also available at http://bitten.cmlenz.net/wiki/License.
+
+import md5
+import os
+import shutil
+import tempfile
+import unittest
+
+from bitten.util import md5sum
+
+
+class Md5sumTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.tempdir = os.path.realpath(tempfile.mkdtemp(suffix='bitten_test'))
+
+    def tearDown(self):
+        shutil.rmtree(self.tempdir)
+
+    def _create_file(self, name, content=None):
+        filename = os.path.join(self.tempdir, name)
+        fd = file(filename, 'w')
+        if content:
+            fd.write(content)
+        fd.close()
+        return filename
+
+    def test_generate(self):
+        filename = self._create_file('test.xyz', 'Foo bar')
+        checksum = md5sum.generate(filename).split('  ')
+        self.assertEqual(md5.new('Foo bar').hexdigest(), checksum[0])
+        self.assertEqual(filename, checksum[1])
+
+    def test_write(self):
+        filename = self._create_file('test.xyz', 'Foo bar')
+        md5file = md5sum.write(filename)
+        self.assertEqual(filename + '.md5', md5file)
+        fileobj = file(md5file, 'r')
+        try:
+            checksum = fileobj.read().split('  ')
+        finally:
+            fileobj.close()
+        self.assertEqual(md5.new('Foo bar').hexdigest(), checksum[0])
+        self.assertEqual(filename, checksum[1])
+
+    def test_write_with_md5file(self):
+        filename = self._create_file('test.xyz', 'Foo bar')
+        md5file = os.path.join(self.tempdir, 'test.md5')
+        self.assertEqual(md5file, md5sum.write(filename, md5file=md5file))
+        fileobj = file(md5file, 'r')
+        try:
+            checksum = fileobj.read().split('  ')
+        finally:
+            fileobj.close()
+        self.assertEqual(md5.new('Foo bar').hexdigest(), checksum[0])
+        self.assertEqual(filename, checksum[1])
+
+    def test_validate_missing(self):
+        filename = self._create_file('test.xyz', 'Foo bar')
+        self.assertRaises(md5sum.IntegrityError, md5sum.validate, filename)
+
+    def test_validate_incorrect_digest(self):
+        filename = self._create_file('test.xyz', 'Foo bar')
+        checksum = md5.new('Foo baz').hexdigest() + '  ' + filename
+        md5file = self._create_file('test.xyz.md5', checksum)
+        self.assertRaises(md5sum.IntegrityError, md5sum.validate, filename)
+
+    def test_validate_incorrect_path(self):
+        filename = self._create_file('test.xyz', 'Foo bar')
+        checksum = md5.new('Foo bar').hexdigest() + '  ' + '/etc/test'
+        md5file = self._create_file('test.xyz.md5', checksum)
+        self.assertRaises(md5sum.IntegrityError, md5sum.validate, filename)
+
+    def test_validate_with_checksum(self):
+        filename = self._create_file('test.xyz', 'Foo bar')
+        checksum = md5.new('Foo bar').hexdigest() + '  ' + filename
+        md5sum.validate(filename, checksum)
+
+
+def suite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(Md5sumTestCase, 'test'))
+    return suite
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='suite')
Copyright (C) 2012-2017 Edgewall Software