view bitten/util/cmdline.py @ 103:f1baf05a49dd

* Strip extra line separators from recipe command output on windows. Closes #25. * Fix logging in the `build.py` script, and provide command-line options to configure the logging
author cmlenz
date Wed, 20 Jul 2005 13:23:05 +0000
parents b289e572bc7e
children de5b6e69fc7e
line wrap: on
line source
# -*- coding: iso8859-1 -*-
#
# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
#
# Bitten is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Bitten is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#
# Author: Christopher Lenz <cmlenz@gmx.de>

import logging
import os
import os.path
import time

log = logging.getLogger('bitten.cmdline')


class TimeoutError(Exception):
    """Signals that a command did not complete within the allowed time.

    The exception message names the command that timed out.
    """


class Commandline(object):
    """Simple helper for executing subprocesses.

    An instance is created with the program and its arguments. Calling
    `execute()` runs the program and yields its output line by line as
    `(stdout_line, stderr_line)` tuples; an element is `None` when the
    corresponding stream has no line to pair up. Once the generator has been
    fully consumed, `returncode` holds the status reported for the process.
    """
    # TODO: Use 'subprocess' module if available (Python >= 2.4)

    def __init__(self, executable, args, cwd=None):
        """Initialize the Commandline object.

        @param executable The name of the program to execute
        @param args A list of arguments to pass to the executable; every item
                    is converted to a string
        @param cwd The working directory to change to before executing the
                   command (must be an existing directory)
        """
        self.executable = executable
        self.arguments = [str(arg) for arg in args]
        self.cwd = cwd
        if self.cwd:
            assert os.path.isdir(self.cwd), \
                   'Not a directory: %s' % self.cwd
        self.returncode = None

    if os.name == 'nt': # windows

        def execute(self, timeout=None):
            """Run the command through the system shell and yield its output.

            The output of the command is redirected to temporary files, which
            are read and removed once the command has finished. The working
            directory of the calling process is restored afterwards.

            @param timeout Accepted for compatibility with the posix
                           implementation, but not used on this platform
            @return A generator yielding `(stdout_line, stderr_line)` tuples
                    with trailing whitespace stripped; an element is `None`
                    when that stream has no more lines
            """
            import tempfile

            args = [self.executable] + self.arguments
            for idx, arg in enumerate(args):
                if ' ' in arg:
                    args[idx] = '"%s"' % arg
            log.debug('Executing %s', args)

            def read_lines(name):
                # Always close the file, even if reading fails
                fileobj = open(name, 'r')
                try:
                    return fileobj.readlines()
                finally:
                    fileobj.close()

            # Note: no `yield` inside the try/finally block, so that the code
            # also compiles on Python versions that do not permit that.
            old_cwd = None
            temp_names = []
            try:
                # mkstemp() creates the files atomically, avoiding the race
                # condition inherent in mktemp(). The descriptors are closed
                # right away so that the shell can write to the files.
                for _ in range(2):
                    fd, name = tempfile.mkstemp()
                    os.close(fd)
                    temp_names.append(name)
                out_name, err_name = temp_names

                if self.cwd:
                    old_cwd = os.getcwd()
                    os.chdir(self.cwd)

                # The file names are quoted because the directory for
                # temporary files may contain spaces.
                cmd = '( %s ) > "%s" 2> "%s"' % (' '.join(args), out_name,
                                                 err_name)
                # On windows, os.system() returns the exit code of the
                # command as-is; it must not be shifted like a posix wait
                # status, or failures would be reported as success.
                self.returncode = os.system(cmd)

                out_lines = read_lines(out_name)
                err_lines = read_lines(err_name)
            finally:
                if old_cwd is not None:
                    os.chdir(old_cwd)
                for name in temp_names:
                    try:
                        os.remove(name)
                    except OSError:
                        pass # best effort, nothing sensible left to do

            for out_line, err_line in self._combine(out_lines, err_lines):
                yield (out_line and out_line.rstrip(),
                       err_line and err_line.rstrip())

    else: # posix

        def execute(self, timeout=None):
            """Run the command as a child process and yield its output.

            The output is read incrementally from the pipes connected to the
            child, so lines are yielded while the command is still running.
            When the generator is exhausted, `returncode` is set to the value
            returned by `popen2.Popen3.wait()`.

            @param timeout The maximum number of seconds to wait for new
                           output from the command, or `None` to wait
                           indefinitely
            @return A generator yielding `(stdout_line, stderr_line)` tuples
                    with trailing whitespace stripped; an element is `None`
                    when that stream has no line to pair up
            @raise TimeoutError If the command produces no output within
                                `timeout` seconds. The child process is not
                                terminated in that case.
            """
            import fcntl
            import popen2
            import select

            command = [self.executable] + self.arguments
            log.debug('Executing %s', command)

            # The child inherits the working directory when it is spawned, so
            # the directory of this process can be restored right afterwards.
            old_cwd = None
            if self.cwd:
                old_cwd = os.getcwd()
                os.chdir(self.cwd)
            try:
                pipe = popen2.Popen3(command, capturestderr=True)
            finally:
                if old_cwd is not None:
                    os.chdir(old_cwd)
            pipe.tochild.close()

            def make_non_blocking(fd):
                # Prefer os.O_NDELAY; where it is missing, the equivalent
                # constant is provided by the fcntl module (not by os).
                flag = getattr(os, 'O_NDELAY', None)
                if flag is None:
                    flag = fcntl.FNDELAY
                fl = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, fl | flag)

            out_file, err_file = pipe.fromchild, pipe.childerr
            for stream in (out_file, err_file):
                make_non_blocking(stream.fileno())

            out_data, err_data = [], []
            out_eof = err_eof = False
            while not (out_eof and err_eof):
                to_check = []
                if not out_eof:
                    to_check.append(out_file)
                if not err_eof:
                    to_check.append(err_file)
                ready = select.select(to_check, [], [], timeout)[0]
                if not ready:
                    raise TimeoutError('Command %s timed out'
                                       % self.executable)
                if out_file in ready:
                    data = out_file.read()
                    if data:
                        out_data.append(data)
                    else:
                        out_eof = True
                if err_file in ready:
                    data = err_file.read()
                    if data:
                        err_data.append(data)
                    else:
                        err_eof = True

                # At end-of-file the buffer is flushed, so that a final line
                # lacking a trailing line separator is not lost.
                out_lines = self._extract_lines(out_data, flush=out_eof)
                err_lines = self._extract_lines(err_data, flush=err_eof)
                for out_line, err_line in self._combine(out_lines, err_lines):
                    yield out_line, err_line
                if not (out_eof and err_eof):
                    time.sleep(.1)

            out_file.close()
            err_file.close()
            self.returncode = pipe.wait()

    def _combine(self, *iterables):
        """Iterate over the given iterables in parallel.

        Yields tuples holding the next item of every iterable. Iterables that
        are exhausted before the others contribute `None`. Iteration stops as
        soon as all iterables are exhausted; no tuple is produced for a round
        in which no item could be fetched.

        @param iterables The iterables to combine
        @return A generator yielding tuples with one element per iterable
        """
        iterators = [iter(iterable) for iterable in iterables]
        size = len(iterators)
        while True:
            row = [None] * size
            fetched = False
            for idx, iterator in enumerate(iterators):
                if iterator is None:
                    continue
                # The for/break/else construct fetches exactly one item (or
                # detects exhaustion) without depending on the spelling of
                # the iterator's "next" method.
                for item in iterator:
                    row[idx] = item
                    fetched = True
                    break
                else:
                    iterators[idx] = None
            if not fetched:
                break
            yield tuple(row)

    def _extract_lines(self, data, flush=False):
        """Extract the complete lines from a list of buffered data chunks.

        The chunks in `data` are joined and split into lines. A trailing
        fragment that is not terminated by a line separator is kept back:
        `data` is modified in place so that it contains only that fragment
        (or the empty string if there is none).

        @param data A list of strings that have been read but not yet
                    processed
        @param flush Whether an unterminated trailing fragment should be
                     returned as a line instead of being kept in `data`
        @return The list of extracted lines, with trailing whitespace
                (including the line separator) removed
        """
        lines = ''.join(data).splitlines(True)
        remainder = ''
        if lines and not flush:
            last = lines[-1]
            # '\r\n' is covered by the check for '\n'
            if not (last.endswith('\n') or last.endswith('\r')):
                remainder = lines.pop()
        data[:] = [remainder]

        return [line.rstrip() for line in lines]
Copyright (C) 2012-2017 Edgewall Software