Mercurial > bitten > bitten-test
view bitten/util/cmdline.py @ 103:f1baf05a49dd
* Strip extra line separators from recipe command output on windows. Closes #25.
* Fix logging in the `build.py` script, and provide command-line options to configure the logging
author | cmlenz |
---|---|
date | Wed, 20 Jul 2005 13:23:05 +0000 |
parents | b289e572bc7e |
children | de5b6e69fc7e |
line wrap: on
line source
# -*- coding: iso8859-1 -*- # # Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de> # # Bitten is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # Trac is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. # # Author: Christopher Lenz <cmlenz@gmx.de> import logging import os import os.path import time log = logging.getLogger('bitten.cmdline') class TimeoutError(Exception): """Exception raised when the execution of a command times out.""" class Commandline(object): """Simple helper for executing subprocesses.""" # TODO: Use 'subprocess' module if available (Python >= 2.4) def __init__(self, executable, args, cwd=None): """Initialize the Commandline object. @param executable The name of the program to execute @param args A list of arguments to pass to the executable @param cwd The working directory to change to before executing the command """ self.executable = executable self.arguments = [str(arg) for arg in args] self.cwd = cwd if self.cwd: assert os.path.isdir(self.cwd) self.returncode = None if os.name == 'nt': # windows def execute(self, timeout=None): args = [self.executable] + self.arguments for idx, arg in enumerate(args): if arg.find(' ') >= 0: args[idx] = '"%s"' % arg log.debug('Executing %s', args) if self.cwd: os.chdir(self.cwd) import tempfile out_name = tempfile.mktemp() err_name = tempfile.mktemp() cmd = "( %s ) > %s 2> %s" % (' '.join(args), out_name, err_name) self.returncode = os.system(cmd) >> 8 out_file = file(out_name, 'r') err_file = file(err_name, 'r') out_lines = out_file.readlines() err_lines = err_file.readlines() out_file.close() err_file.close() for out_line, err_line in self._combine(out_lines, err_lines): yield out_line and out_line.rstrip(), \ err_line and err_line.rstrip() else: # posix def execute(self, timeout=None): import fcntl, popen2, select if self.cwd: os.chdir(self.cwd) log.debug('Executing %s', [self.executable] + self.arguments) pipe = popen2.Popen3([self.executable] + self.arguments, capturestderr=True) pipe.tochild.close() def make_non_blocking(fd): fl = fcntl.fcntl(fd, fcntl.F_GETFL) try: fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NDELAY) except AttributeError: fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.FNDELAY) out_file, err_file = pipe.fromchild, pipe.childerr map(make_non_blocking, [out_file.fileno(), err_file.fileno()]) out_data, err_data = [], [] out_eof = err_eof = False while not out_eof or not err_eof: to_check = [out_file] * (not out_eof) + [err_file] * (not err_eof) ready = select.select(to_check, [], [], timeout) if not ready[0]: raise TimeoutError, 'Command %s timed out' % self.executable if out_file in ready[0]: data = out_file.read() if data: out_data.append(data) else: out_eof = True if err_file in ready[0]: data = err_file.read() if data: err_data.append(data) else: err_eof = True out_lines = self._extract_lines(out_data) err_lines = self._extract_lines(err_data) for out_line, err_line in self._combine(out_lines, err_lines): yield out_line, err_line time.sleep(.1) self.returncode = pipe.wait() def _combine(self, *iterables): iterables = [iter(iterable) for iterable in iterables] size = len(iterables) while [iterable for iterable in iterables if iterable is not None]: to_yield = [None] * size for idx, iterable in enumerate(iterables): if iterable is None: continue try: to_yield[idx] = iterable.next() except StopIteration: iterables[idx] = None yield tuple(to_yield) def _extract_lines(self, data): extracted = [] def _endswith_linesep(string): for linesep in ('\n', '\r\n', '\r'): if string.endswith(linesep): return True buf = ''.join(data) lines = buf.splitlines(True) if len(lines) > 1: extracted += lines[:-1] if _endswith_linesep(lines[-1]): extracted.append(lines[-1]) buf = '' else: buf = lines[-1] elif _endswith_linesep(buf): extracted.append(buf) buf = '' data[:] = [buf] return [line.rstrip() for line in extracted]