Mercurial > bitten > bitten-test
changeset 212:62b668fc713d
Removed the use of the `fcntl` module to make the file IO non-blocking in the `CommandLine` class. Instead, use the functions `os.read()` and `os.write()`, which should only block when no data is available -- and that cannot happen because of the preceding `select()`.
Also, add a unit test to verify that the sub-process is interrupted when a timeout occurs. This currently fails on Windows.
author | cmlenz |
---|---|
date | Tue, 20 Sep 2005 17:19:31 +0000 |
parents | d1ead1bfcc65 |
children | 25f84dd9f159 |
files | bitten/build/api.py bitten/build/tests/api.py |
diffstat | 2 files changed, 38 insertions(+), 37 deletions(-) [+] |
line wrap: on
line diff
--- a/bitten/build/api.py +++ b/bitten/build/api.py @@ -11,6 +11,7 @@ import fnmatch import os import shlex +import time log = logging.getLogger('bitten.build.api') @@ -98,18 +99,17 @@ os.unlink(out_name) if err_name: os.unlink(err_name) + if self.cwd: + os.chdir(old_cwd) for out_line, err_line in self._combine(out_lines, err_lines): yield out_line and out_line.rstrip(), \ err_line and err_line.rstrip() - if self.cwd: - os.chdir(old_cwd) - else: # posix def execute(self, timeout=None): - import fcntl, popen2, select + import popen2, select if self.cwd: old_cwd = os.getcwd() os.chdir(self.cwd) @@ -118,48 +118,39 @@ pipe = popen2.Popen3([self.executable] + self.arguments, capturestderr=True) if self.input: - if not isinstance(self.input, basestring): - self.input = self.input.read() + if isinstance(self.input, basestring): + in_data = self.input + else: + in_data = self.input.read() else: pipe.tochild.close() - - def make_non_blocking(filedesc): - fileno = filedesc.fileno() - flock = fcntl.fcntl(fileno, fcntl.F_GETFL) - try: - fcntl.fcntl(fileno, fcntl.F_SETFL, flock | os.O_NDELAY) - except AttributeError: - fcntl.fcntl(fileno, fcntl.F_SETFL, flock | os.FNDELAY) - return filedesc + in_data = '' - out_file, err_file = [make_non_blocking(fd) for fd - in (pipe.fromchild, pipe.childerr)] - in_file = None - if self.input: - in_file = make_non_blocking(pipe.tochild) out_data, err_data = [], [] - out_eof = err_eof = False + in_eof = out_eof = err_eof = False + if not in_data: + in_eof = True while not out_eof or not err_eof: - to_check = [out_file] * (not out_eof) + \ - [err_file] * (not err_eof) - ready = select.select(to_check, in_file and [in_file] or [], - [], timeout) + readable = [pipe.fromchild] * (not out_eof) + \ + [pipe.childerr] * (not err_eof) + writable = [pipe.tochild] * (not in_eof) + ready = select.select(readable, writable, [], timeout) if not (ready[0] or ready[1]): raise TimeoutError, 'Command %s timed out' % self.executable - if in_file in ready[1]: - sent = os.write(in_file.fileno(), self.input) - self.input = self.input[sent:] - if not self.input: - in_file.close() - in_file = None - if out_file in ready[0]: - data = out_file.read() + if pipe.tochild in ready[1]: + sent = os.write(pipe.tochild.fileno(), in_data) + in_data = in_data[sent:] + if not in_data: + pipe.tochild.close() + in_eof = True + if pipe.fromchild in ready[0]: + data = os.read(pipe.fromchild.fileno(), 1024) if data: out_data.append(data) else: out_eof = True - if err_file in ready[0]: - data = err_file.read() + if pipe.childerr in ready[0]: + data = os.read(pipe.childerr.fileno(), 1024) if data: err_data.append(data) else: @@ -168,7 +159,7 @@ err_lines = self._extract_lines(err_data) for out_line, err_line in self._combine(out_lines, err_lines): yield out_line, err_line - select.select([], [], [], .1) + time.sleep(.1) self.returncode = pipe.wait() log.debug('%s exited with code %s', self.executable, self.returncode)
--- a/bitten/build/tests/api.py +++ b/bitten/build/tests/api.py @@ -13,7 +13,7 @@ import tempfile import unittest -from bitten.build import CommandLine, FileSet +from bitten.build import CommandLine, FileSet, TimeoutError class CommandLineTestCase(unittest.TestCase): @@ -130,6 +130,16 @@ self.assertEqual(['Thanks'], stdout) self.assertEqual(0, cmdline.returncode) + def test_timeout(self): + script_file = self._create_file('test.py', content=""" +import time +time.sleep(2.0) +print 'Done' +""") + cmdline = CommandLine('python', [script_file]) + iterable = iter(cmdline.execute(timeout=.5)) + self.assertRaises(TimeoutError, iterable.next) + class FileSetTestCase(unittest.TestCase):