changeset 210:c550e31c06d2

Implement providing input data to processes executed via the `CommandLine` class. The `<sh:pipe>` recipe command should now be functional. Closes #34. Also, add unit tests for the `CommandLine` class.
author cmlenz
date Tue, 20 Sep 2005 12:02:38 +0000
parents d2b9c72e9643
children d1ead1bfcc65
files bitten/build/api.py bitten/build/shtools.py bitten/build/tests/api.py
diffstat 3 files changed, 148 insertions(+), 46 deletions(-) [+]
line wrap: on
line diff
--- a/bitten/build/api.py
+++ b/bitten/build/api.py
@@ -7,13 +7,10 @@
 # you should have received as part of this distribution. The terms
 # are also available at http://bitten.cmlenz.net/wiki/License.
 
-from itertools import ifilterfalse
 import logging
 import fnmatch
 import os
 import shlex
-import shutil
-import time
 
 log = logging.getLogger('bitten.build.api')
 
@@ -30,42 +27,24 @@
     """Simple helper for executing subprocesses."""
     # TODO: Use 'subprocess' module if available (Python >= 2.4)
 
-    def __init__(self, executable, args, stdin=None, cwd=None):
+    def __init__(self, executable, args, input=None, cwd=None):
         """Initialize the CommandLine object.
         
         @param executable The name of the program to execute
         @param args A list of arguments to pass to the executable
+        @param input String or file-like object containing any input data for
+                     the program
         @param cwd The working directory to change to before executing the
                    command
         """
         self.executable = executable
         self.arguments = [str(arg) for arg in args]
-        self.stdin = stdin
+        self.input = input
         self.cwd = cwd
         if self.cwd:
             assert os.path.isdir(self.cwd)
         self.returncode = None
 
-        # TODO: On windows, map file name extension to application
-        if os.name == 'nt':
-            pass
-
-        # Shebang support for Posix systems
-        if os.path.isfile(self.executable):
-            executable_file = file(self.executable, 'r')
-            try:
-                for line in executable_file:
-                    if line.startswith('#!'):
-                        parts = shlex.split(line[2:])
-                        if len(parts) > 1:
-                            self.arguments[:0] = parts[1:] + [self.executable]
-                        else:
-                            self.arguments[:0] = [self.executable]
-                        self.executable = parts[0]
-                    break
-            finally:
-                executable_file.close()
-
     if os.name == 'nt': # windows
 
         def execute(self, timeout=None):
@@ -110,32 +89,41 @@
             log.debug('Executing %s', [self.executable] + self.arguments)
             pipe = popen2.Popen3([self.executable] + self.arguments,
                                  capturestderr=True)
-            if self.stdin:
-                if isinstance(self.stdin, basestring):
-                    pipe.tochild.write(self.stdin)
-                else:
-                    shutil.copyfileobj(self.stdin, pipe.tochild)
-            pipe.tochild.close()
+            if self.input:
+                if not isinstance(self.input, basestring):
+                    self.input = self.input.read()
+            else:
+                pipe.tochild.close()
 
-            def make_non_blocking(fd):
-                fn = fd.fileno()
-                fl = fcntl.fcntl(fn, fcntl.F_GETFL)
+            def make_non_blocking(filedesc):
+                fileno = filedesc.fileno()
+                flock = fcntl.fcntl(fileno, fcntl.F_GETFL)
                 try:
-                    fcntl.fcntl(fn, fcntl.F_SETFL, fl | os.O_NDELAY)
+                    fcntl.fcntl(fileno, fcntl.F_SETFL, flock | os.O_NDELAY)
                 except AttributeError:
-                    fcntl.fcntl(fn, fcntl.F_SETFL, fl | os.FNDELAY)
-                return fd
+                    fcntl.fcntl(fileno, fcntl.F_SETFL, flock | os.FNDELAY)
+                return filedesc
 
             out_file, err_file = [make_non_blocking(fd) for fd
                                   in (pipe.fromchild, pipe.childerr)]
+            in_file = None
+            if self.input:
+                in_file = make_non_blocking(pipe.tochild)
             out_data, err_data = [], []
             out_eof = err_eof = False
             while not out_eof or not err_eof:
                 to_check = [out_file] * (not out_eof) + \
                            [err_file] * (not err_eof)
-                ready = select.select(to_check, [], [], timeout)
-                if not ready[0]:
+                ready = select.select(to_check, in_file and [in_file] or [],
+                                      [], timeout)
+                if not (ready[0] or ready[1]):
                     raise TimeoutError, 'Command %s timed out' % self.executable
+                if in_file in ready[1]:
+                    sent = os.write(in_file.fileno(), self.input)
+                    self.input = self.input[sent:]
+                    if not self.input:
+                        in_file.close()
+                        in_file = None
                 if out_file in ready[0]:
                     data = out_file.read()
                     if data:
@@ -148,12 +136,11 @@
                         err_data.append(data)
                     else:
                         err_eof = True
-
                 out_lines = self._extract_lines(out_data)
                 err_lines = self._extract_lines(err_data)
                 for out_line, err_line in self._combine(out_lines, err_lines):
                     yield out_line, err_line
-                time.sleep(.1)
+                select.select([], [], [], .1)
             self.returncode = pipe.wait()
             log.debug('%s exited with code %s', self.executable,
                       self.returncode)
@@ -218,9 +205,7 @@
             self.exclude += shlex.split(exclude)
 
         for dirpath, dirnames, filenames in os.walk(self.basedir):
-            dirpath = ndirpath = dirpath[len(self.basedir) + 1:]
-            if os.sep != '/':
-                ndirpath = dirpath.replace(os.sep, '/')
+            dirpath = dirpath[len(self.basedir) + 1:]
 
             for filename in filenames:
                 filepath = nfilepath = os.path.join(dirpath, filename)
--- a/bitten/build/shtools.py
+++ b/bitten/build/shtools.py
@@ -86,7 +86,7 @@
         output_file = file(output, 'w')
 
     try:
-        cmdline = CommandLine(executable, args, stdin=input_file,
+        cmdline = CommandLine(executable, args, input=input_file,
                               cwd=ctxt.basedir)
         log_elem = xmlio.Fragment()
         for out, err in cmdline.execute():
--- a/bitten/build/tests/api.py
+++ b/bitten/build/tests/api.py
@@ -9,10 +9,126 @@
 
 import os
 import shutil
+import sys
 import tempfile
 import unittest
 
-from bitten.build import FileSet
+from bitten.build import CommandLine, FileSet
+
+
+class CommandLineTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.basedir = os.path.realpath(tempfile.mkdtemp(suffix='bitten_test'))
+
+    def tearDown(self):
+        shutil.rmtree(self.basedir)
+
+    def _create_file(self, name, content=None):
+        filename = os.path.join(self.basedir, name)
+        fd = file(filename, 'w')
+        if content:
+            fd.write(content)
+        fd.close()
+        return filename
+
+    def test_single_argument(self):
+        cmdline = CommandLine('python', ['-V'])
+        stdout = []
+        stderr = []
+        for out, err in cmdline.execute(timeout=5.0):
+            if out is not None:
+                stdout.append(out)
+            if err is not None:
+                stderr.append(err)
+        py_version = '.'.join([str(v) for (v) in sys.version_info[:3]])
+        self.assertEqual(['Python %s' % py_version], stderr)
+        self.assertEqual([], stdout)
+        self.assertEqual(0, cmdline.returncode)
+
+    def test_multiple_arguments(self):
+        script_file = self._create_file('test.py', content="""
+import sys
+for arg in sys.argv[1:]:
+    print arg
+""")
+        cmdline = CommandLine('python', [script_file, 'foo', 'bar', 'baz'])
+        stdout = []
+        stderr = []
+        for out, err in cmdline.execute(timeout=5.0):
+            if out is not None:
+                stdout.append(out)
+            if err is not None:
+                stderr.append(err)
+        py_version = '.'.join([str(v) for (v) in sys.version_info[:3]])
+        self.assertEqual([], stderr)
+        self.assertEqual(['foo', 'bar', 'baz'], stdout)
+        self.assertEqual(0, cmdline.returncode)
+
+    def test_output_error_streams(self):
+        script_file = self._create_file('test.py', content="""
+import sys
+print>>sys.stdout, 'Hello'
+print>>sys.stdout, 'world!'
+print>>sys.stderr, 'Oops'
+""")
+        cmdline = CommandLine('python', [script_file])
+        stdout = []
+        stderr = []
+        for out, err in cmdline.execute(timeout=5.0):
+            if out is not None:
+                stdout.append(out)
+            if err is not None:
+                stderr.append(err)
+        py_version = '.'.join([str(v) for (v) in sys.version_info[:3]])
+        self.assertEqual(['Oops'], stderr)
+        self.assertEqual(['Hello', 'world!'], stdout)
+        self.assertEqual(0, cmdline.returncode)
+
+    def test_input_stream_as_fileobj(self):
+        script_file = self._create_file('test.py', content="""
+import sys
+data = sys.stdin.read()
+if data == 'abcd':
+    print>>sys.stdout, 'Thanks'
+""")
+        input_file = self._create_file('input.txt', content='abcd')
+        input_fileobj = file(input_file, 'r')
+        try:
+            cmdline = CommandLine('python', [script_file], input=input_fileobj)
+            stdout = []
+            stderr = []
+            for out, err in cmdline.execute(timeout=5.0):
+                if out is not None:
+                    stdout.append(out)
+                if err is not None:
+                    stderr.append(err)
+            py_version = '.'.join([str(v) for (v) in sys.version_info[:3]])
+            self.assertEqual([], stderr)
+            self.assertEqual(['Thanks'], stdout)
+            self.assertEqual(0, cmdline.returncode)
+        finally:
+            input_fileobj.close()
+
+    def test_input_stream_as_string(self):
+        script_file = self._create_file('test.py', content="""
+import sys
+data = sys.stdin.read()
+if data == 'abcd':
+    print>>sys.stdout, 'Thanks'
+""")
+        cmdline = CommandLine('python', [script_file], input='abcd')
+        stdout = []
+        stderr = []
+        for out, err in cmdline.execute(timeout=5.0):
+            if out is not None:
+                stdout.append(out)
+            if err is not None:
+                stderr.append(err)
+        py_version = '.'.join([str(v) for (v) in sys.version_info[:3]])
+        self.assertEqual([], stderr)
+        self.assertEqual(['Thanks'], stdout)
+        self.assertEqual(0, cmdline.returncode)
 
 
 class FileSetTestCase(unittest.TestCase):
@@ -74,6 +190,7 @@
 
 def suite():
     suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(CommandLineTestCase, 'test'))
     suite.addTest(unittest.makeSuite(FileSetTestCase, 'test'))
     return suite
 
Copyright (C) 2012-2017 Edgewall Software