Mercurial > bitten > bitten-test
view bitten/util/cmdline.py @ 163:634be6cbb808
Flip the switch: Bitten is now BSD-licensed.
author | cmlenz |
---|---|
date | Sat, 27 Aug 2005 07:58:12 +0000 |
parents | ac9318a6e936 |
children |
line wrap: on
line source
# -*- coding: iso8859-1 -*- # # Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de> # All rights reserved. # # This software is licensed as described in the file COPYING, which # you should have received as part of this distribution. The terms # are also available at http://bitten.cmlenz.net/wiki/License. import logging import os import shlex import shutil import time log = logging.getLogger('bitten.cmdline') class TimeoutError(Exception): """Exception raised when the execution of a command times out.""" class Commandline(object): """Simple helper for executing subprocesses.""" # TODO: Use 'subprocess' module if available (Python >= 2.4) def __init__(self, executable, args, stdin=None, cwd=None): """Initialize the Commandline object. @param executable The name of the program to execute @param args A list of arguments to pass to the executable @param cwd The working directory to change to before executing the command """ self.executable = executable self.arguments = [str(arg) for arg in args] self.stdin = stdin self.cwd = cwd if self.cwd: assert os.path.isdir(self.cwd) self.returncode = None # TODO: On windows, map file name extension to application if os.name == 'nt': pass # Shebang support for Posix systems if os.path.isfile(self.executable): executable_file = file(self.executable, 'r') try: for line in executable_file: if line.startswith('#!'): parts = shlex.split(line[2:]) if len(parts) > 1: self.arguments[:0] = parts[1:] + [self.executable] else: self.arguments[:0] = [self.executable] self.executable = parts[0] break finally: executable_file.close() if os.name == 'nt': # windows def execute(self, timeout=None): args = [self.executable] + self.arguments for idx, arg in enumerate(args): if arg.find(' ') >= 0: args[idx] = '"%s"' % arg log.debug('Executing %s', args) if self.cwd: old_cwd = os.getcwd() os.chdir(self.cwd) import tempfile out_name = tempfile.mktemp() err_name = tempfile.mktemp() cmd = "( %s ) > %s 2> %s" % (' '.join(args), out_name, err_name) self.returncode = os.system(cmd) log.debug('Exited with code %s', self.returncode) out_file = file(out_name, 'r') err_file = file(err_name, 'r') out_lines = out_file.readlines() err_lines = err_file.readlines() out_file.close() err_file.close() for out_line, err_line in self._combine(out_lines, err_lines): yield out_line and out_line.rstrip(), \ err_line and err_line.rstrip() if self.cwd: os.chdir(old_cwd) else: # posix def execute(self, timeout=None): import fcntl, popen2, select if self.cwd: old_cwd = os.getcwd() os.chdir(self.cwd) log.debug('Executing %s', [self.executable] + self.arguments) pipe = popen2.Popen3([self.executable] + self.arguments, capturestderr=True) if self.stdin: if isinstance(self.stdin, basestring): pipe.tochild.write(self.stdin) else: shutil.copyfileobj(self.stdin, pipe.tochild) pipe.tochild.close() def make_non_blocking(fd): fn = fd.fileno() fl = fcntl.fcntl(fn, fcntl.F_GETFL) try: fcntl.fcntl(fn, fcntl.F_SETFL, fl | os.O_NDELAY) except AttributeError: fcntl.fcntl(fn, fcntl.F_SETFL, fl | os.FNDELAY) return fd out_file, err_file = [make_non_blocking(fd) for fd in (pipe.fromchild, pipe.childerr)] out_data, err_data = [], [] out_eof = err_eof = False while not out_eof or not err_eof: to_check = [out_file] * (not out_eof) + \ [err_file] * (not err_eof) ready = select.select(to_check, [], [], timeout) if not ready[0]: raise TimeoutError, 'Command %s timed out' % self.executable if out_file in ready[0]: data = out_file.read() if data: out_data.append(data) else: out_eof = True if err_file in ready[0]: data = err_file.read() if data: err_data.append(data) else: err_eof = True out_lines = self._extract_lines(out_data) err_lines = self._extract_lines(err_data) for out_line, err_line in self._combine(out_lines, err_lines): yield out_line, err_line time.sleep(.1) self.returncode = pipe.wait() log.debug('%s exited with code %s', self.executable, self.returncode) if self.cwd: os.chdir(old_cwd) def _combine(self, *iterables): iterables = [iter(iterable) for iterable in iterables] size = len(iterables) while [iterable for iterable in iterables if iterable is not None]: to_yield = [None] * size for idx, iterable in enumerate(iterables): if iterable is None: continue try: to_yield[idx] = iterable.next() except StopIteration: iterables[idx] = None yield tuple(to_yield) def _extract_lines(self, data): extracted = [] def _endswith_linesep(string): for linesep in ('\n', '\r\n', '\r'): if string.endswith(linesep): return True buf = ''.join(data) lines = buf.splitlines(True) if len(lines) > 1: extracted += lines[:-1] if _endswith_linesep(lines[-1]): extracted.append(lines[-1]) buf = '' else: buf = lines[-1] elif _endswith_linesep(buf): extracted.append(buf) buf = '' data[:] = [buf] return [line.rstrip() for line in extracted]