Mercurial > bitten > bitten-test
changeset 210:c550e31c06d2
Implement providing input data to processes executed via the `CommandLine` class. The `<sh:pipe>` recipe command should now be functional. Closes #34.
Also, add unit tests for the `CommandLine` class.
author | cmlenz |
---|---|
date | Tue, 20 Sep 2005 12:02:38 +0000 |
parents | d2b9c72e9643 |
children | d1ead1bfcc65 |
files | bitten/build/api.py bitten/build/shtools.py bitten/build/tests/api.py |
diffstat | 3 files changed, 148 insertions(+), 46 deletions(-) [+] |
line wrap: on
line diff
--- a/bitten/build/api.py +++ b/bitten/build/api.py @@ -7,13 +7,10 @@ # you should have received as part of this distribution. The terms # are also available at http://bitten.cmlenz.net/wiki/License. -from itertools import ifilterfalse import logging import fnmatch import os import shlex -import shutil -import time log = logging.getLogger('bitten.build.api') @@ -30,42 +27,24 @@ """Simple helper for executing subprocesses.""" # TODO: Use 'subprocess' module if available (Python >= 2.4) - def __init__(self, executable, args, stdin=None, cwd=None): + def __init__(self, executable, args, input=None, cwd=None): """Initialize the CommandLine object. @param executable The name of the program to execute @param args A list of arguments to pass to the executable + @param input String or file-like object containing any input data for + the program @param cwd The working directory to change to before executing the command """ self.executable = executable self.arguments = [str(arg) for arg in args] - self.stdin = stdin + self.input = input self.cwd = cwd if self.cwd: assert os.path.isdir(self.cwd) self.returncode = None - # TODO: On windows, map file name extension to application - if os.name == 'nt': - pass - - # Shebang support for Posix systems - if os.path.isfile(self.executable): - executable_file = file(self.executable, 'r') - try: - for line in executable_file: - if line.startswith('#!'): - parts = shlex.split(line[2:]) - if len(parts) > 1: - self.arguments[:0] = parts[1:] + [self.executable] - else: - self.arguments[:0] = [self.executable] - self.executable = parts[0] - break - finally: - executable_file.close() - if os.name == 'nt': # windows def execute(self, timeout=None): @@ -110,32 +89,41 @@ log.debug('Executing %s', [self.executable] + self.arguments) pipe = popen2.Popen3([self.executable] + self.arguments, capturestderr=True) - if self.stdin: - if isinstance(self.stdin, basestring): - pipe.tochild.write(self.stdin) - else: - shutil.copyfileobj(self.stdin, pipe.tochild) - pipe.tochild.close() + if self.input: + if not isinstance(self.input, basestring): + self.input = self.input.read() + else: + pipe.tochild.close() - def make_non_blocking(fd): - fn = fd.fileno() - fl = fcntl.fcntl(fn, fcntl.F_GETFL) + def make_non_blocking(filedesc): + fileno = filedesc.fileno() + flock = fcntl.fcntl(fileno, fcntl.F_GETFL) try: - fcntl.fcntl(fn, fcntl.F_SETFL, fl | os.O_NDELAY) + fcntl.fcntl(fileno, fcntl.F_SETFL, flock | os.O_NDELAY) except AttributeError: - fcntl.fcntl(fn, fcntl.F_SETFL, fl | os.FNDELAY) - return fd + fcntl.fcntl(fileno, fcntl.F_SETFL, flock | os.FNDELAY) + return filedesc out_file, err_file = [make_non_blocking(fd) for fd in (pipe.fromchild, pipe.childerr)] + in_file = None + if self.input: + in_file = make_non_blocking(pipe.tochild) out_data, err_data = [], [] out_eof = err_eof = False while not out_eof or not err_eof: to_check = [out_file] * (not out_eof) + \ [err_file] * (not err_eof) - ready = select.select(to_check, [], [], timeout) - if not ready[0]: + ready = select.select(to_check, in_file and [in_file] or [], + [], timeout) + if not (ready[0] or ready[1]): raise TimeoutError, 'Command %s timed out' % self.executable + if in_file in ready[1]: + sent = os.write(in_file.fileno(), self.input) + self.input = self.input[sent:] + if not self.input: + in_file.close() + in_file = None if out_file in ready[0]: data = out_file.read() if data: @@ -148,12 +136,11 @@ err_data.append(data) else: err_eof = True - out_lines = self._extract_lines(out_data) err_lines = self._extract_lines(err_data) for out_line, err_line in self._combine(out_lines, err_lines): yield out_line, err_line - time.sleep(.1) + select.select([], [], [], .1) self.returncode = pipe.wait() log.debug('%s exited with code %s', self.executable, self.returncode) @@ -218,9 +205,7 @@ self.exclude += shlex.split(exclude) for dirpath, dirnames, filenames in os.walk(self.basedir): - dirpath = ndirpath = dirpath[len(self.basedir) + 1:] - if os.sep != '/': - ndirpath = dirpath.replace(os.sep, '/') + dirpath = dirpath[len(self.basedir) + 1:] for filename in filenames: filepath = nfilepath = os.path.join(dirpath, filename)
--- a/bitten/build/shtools.py +++ b/bitten/build/shtools.py @@ -86,7 +86,7 @@ output_file = file(output, 'w') try: - cmdline = CommandLine(executable, args, stdin=input_file, + cmdline = CommandLine(executable, args, input=input_file, cwd=ctxt.basedir) log_elem = xmlio.Fragment() for out, err in cmdline.execute():
--- a/bitten/build/tests/api.py +++ b/bitten/build/tests/api.py @@ -9,10 +9,126 @@ import os import shutil +import sys import tempfile import unittest -from bitten.build import FileSet +from bitten.build import CommandLine, FileSet + + +class CommandLineTestCase(unittest.TestCase): + + def setUp(self): + self.basedir = os.path.realpath(tempfile.mkdtemp(suffix='bitten_test')) + + def tearDown(self): + shutil.rmtree(self.basedir) + + def _create_file(self, name, content=None): + filename = os.path.join(self.basedir, name) + fd = file(filename, 'w') + if content: + fd.write(content) + fd.close() + return filename + + def test_single_argument(self): + cmdline = CommandLine('python', ['-V']) + stdout = [] + stderr = [] + for out, err in cmdline.execute(timeout=5.0): + if out is not None: + stdout.append(out) + if err is not None: + stderr.append(err) + py_version = '.'.join([str(v) for (v) in sys.version_info[:3]]) + self.assertEqual(['Python %s' % py_version], stderr) + self.assertEqual([], stdout) + self.assertEqual(0, cmdline.returncode) + + def test_multiple_arguments(self): + script_file = self._create_file('test.py', content=""" +import sys +for arg in sys.argv[1:]: + print arg +""") + cmdline = CommandLine('python', [script_file, 'foo', 'bar', 'baz']) + stdout = [] + stderr = [] + for out, err in cmdline.execute(timeout=5.0): + if out is not None: + stdout.append(out) + if err is not None: + stderr.append(err) + py_version = '.'.join([str(v) for (v) in sys.version_info[:3]]) + self.assertEqual([], stderr) + self.assertEqual(['foo', 'bar', 'baz'], stdout) + self.assertEqual(0, cmdline.returncode) + + def test_output_error_streams(self): + script_file = self._create_file('test.py', content=""" +import sys +print>>sys.stdout, 'Hello' +print>>sys.stdout, 'world!' +print>>sys.stderr, 'Oops' +""") + cmdline = CommandLine('python', [script_file]) + stdout = [] + stderr = [] + for out, err in cmdline.execute(timeout=5.0): + if out is not None: + stdout.append(out) + if err is not None: + stderr.append(err) + py_version = '.'.join([str(v) for (v) in sys.version_info[:3]]) + self.assertEqual(['Oops'], stderr) + self.assertEqual(['Hello', 'world!'], stdout) + self.assertEqual(0, cmdline.returncode) + + def test_input_stream_as_fileobj(self): + script_file = self._create_file('test.py', content=""" +import sys +data = sys.stdin.read() +if data == 'abcd': + print>>sys.stdout, 'Thanks' +""") + input_file = self._create_file('input.txt', content='abcd') + input_fileobj = file(input_file, 'r') + try: + cmdline = CommandLine('python', [script_file], input=input_fileobj) + stdout = [] + stderr = [] + for out, err in cmdline.execute(timeout=5.0): + if out is not None: + stdout.append(out) + if err is not None: + stderr.append(err) + py_version = '.'.join([str(v) for (v) in sys.version_info[:3]]) + self.assertEqual([], stderr) + self.assertEqual(['Thanks'], stdout) + self.assertEqual(0, cmdline.returncode) + finally: + input_fileobj.close() + + def test_input_stream_as_string(self): + script_file = self._create_file('test.py', content=""" +import sys +data = sys.stdin.read() +if data == 'abcd': + print>>sys.stdout, 'Thanks' +""") + cmdline = CommandLine('python', [script_file], input='abcd') + stdout = [] + stderr = [] + for out, err in cmdline.execute(timeout=5.0): + if out is not None: + stdout.append(out) + if err is not None: + stderr.append(err) + py_version = '.'.join([str(v) for (v) in sys.version_info[:3]]) + self.assertEqual([], stderr) + self.assertEqual(['Thanks'], stdout) + self.assertEqual(0, cmdline.returncode) class FileSetTestCase(unittest.TestCase): @@ -74,6 +190,7 @@ def suite(): suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(CommandLineTestCase, 'test')) suite.addTest(unittest.makeSuite(FileSetTestCase, 'test')) return suite