123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- import os
- import subprocess
- import shlex
- import signal
- from pexpect.popen_spawn import PopenSpawn
# Python 2 has a separate `unicode` text type; on Python 3 the name does not
# exist, so probing it raises NameError and only `str` is accepted.
try:
    unicode
except NameError:
    STR_TYPES = (str, )
else:
    STR_TYPES = (str, unicode)
# Default timeout handed to pexpect for non-blocking commands.
TIMEOUT = 30


class Command(object):
    """A single command line that can be run blocking or non-blocking.

    Blocking runs are backed by ``subprocess.Popen``; non-blocking runs are
    backed by pexpect's ``PopenSpawn`` so that ``expect`` and ``send`` can be
    used to interact with the process.
    """

    def __init__(self, cmd, timeout=TIMEOUT):
        """Store the command; nothing is executed until ``run`` is called.

        :param cmd: the command handed unchanged to Popen / PopenSpawn.
        :param timeout: timeout passed to pexpect for non-blocking runs.
        """
        super(Command, self).__init__()
        self.cmd = cmd
        self.timeout = timeout
        # Popen or PopenSpawn instance once run() has been called.
        self.subprocess = None
        # Set by run(): True -> subprocess backend, False -> pexpect backend.
        self.blocking = None
        self.was_run = False
        # Caches backing the `out` / `err` properties.
        self.__out = None
        self.__err = None

    def __repr__(self):
        return '<Command {!r}>'.format(self.cmd)

    @property
    def _popen_args(self):
        """Positional argument given to Popen / PopenSpawn (the raw command)."""
        return self.cmd

    @property
    def _default_popen_kwargs(self):
        """Fresh keyword arguments for ``subprocess.Popen`` (blocking runs)."""
        return {
            'env': os.environ.copy(),
            'stdin': subprocess.PIPE,
            'stdout': subprocess.PIPE,
            'stderr': subprocess.PIPE,
            'shell': True,
            'universal_newlines': True,
            'bufsize': 0,
        }

    @property
    def _default_pexpect_kwargs(self):
        """Fresh keyword arguments for ``PopenSpawn`` (non-blocking runs)."""
        return {
            'env': os.environ.copy(),
            'encoding': 'utf-8',
            'timeout': self.timeout
        }

    @property
    def _uses_subprocess(self):
        """True when the command was started through ``subprocess.Popen``."""
        return isinstance(self.subprocess, subprocess.Popen)

    @property
    def _uses_pexpect(self):
        """True when the command was started through pexpect's ``PopenSpawn``."""
        return isinstance(self.subprocess, PopenSpawn)

    @property
    def std_out(self):
        """The underlying process object's ``stdout`` attribute."""
        return self.subprocess.stdout

    @property
    def _pexpect_out(self):
        """Everything pexpect has buffered so far plus whatever is still unread.

        The result is text when the spawn has an encoding, bytes otherwise.
        """
        result = '' if self.subprocess.encoding else b''

        # `before` / `after` only hold output around a real pattern match.
        # pexpect documents that `after` is set to the EOF / TIMEOUT exception
        # *type* when one of those ends an expect call, which is what happens
        # once the stream has been drained; such class sentinels carry no
        # output and cannot be concatenated, so they are skipped.
        for chunk in (self.subprocess.before, self.subprocess.after):
            if chunk and not isinstance(chunk, type):
                result += chunk

        result += self.subprocess.read()
        return result

    @property
    def out(self):
        """Std/out output (cached)"""
        if self.__out is None:
            if self._uses_subprocess:
                self.__out = self.std_out.read()
            else:
                self.__out = self._pexpect_out
        return self.__out

    @property
    def std_err(self):
        """The underlying process object's ``stderr`` attribute."""
        return self.subprocess.stderr

    @property
    def err(self):
        """Std/err output (cached)"""
        if self.__err is not None:
            return self.__err

        if self._uses_subprocess:
            self.__err = self.std_err.read()
            return self.__err

        # The pexpect backend has always reported the same collected output
        # for `err` as for `out`. Going through the cached `out` keeps that
        # contract while avoiding a second pass over an already-consumed
        # stream on every access.
        return self.out

    @property
    def pid(self):
        """The process' PID."""
        # Support for pexpect's functionality.
        if hasattr(self.subprocess, 'proc'):
            return self.subprocess.proc.pid
        # Standard subprocess method.
        return self.subprocess.pid

    @property
    def return_code(self):
        """Exit status of the process as reported by the active backend."""
        # Support for pexpect's functionality.
        if self._uses_pexpect:
            return self.subprocess.exitstatus
        # Standard subprocess method.
        return self.subprocess.returncode

    @property
    def std_in(self):
        """The underlying process object's ``stdin`` attribute."""
        return self.subprocess.stdin

    def run(self, block=True, binary=False):
        """Runs the given command, with or without pexpect functionality enabled.

        :param block: True starts the command with ``subprocess.Popen``;
            False starts it with pexpect's ``PopenSpawn``.
        :param binary: when True, output is left undecoded (bytes).
        """
        self.blocking = block

        # Use subprocess.
        if self.blocking:
            popen_kwargs = self._default_popen_kwargs.copy()
            popen_kwargs['universal_newlines'] = not binary
            s = subprocess.Popen(self._popen_args, **popen_kwargs)
        # Otherwise, use pexpect.
        else:
            pexpect_kwargs = self._default_pexpect_kwargs.copy()
            if binary:
                pexpect_kwargs['encoding'] = None
            # Enable Python subprocesses to work with expect functionality.
            pexpect_kwargs['env']['PYTHONUNBUFFERED'] = '1'
            s = PopenSpawn(self._popen_args, **pexpect_kwargs)

        self.subprocess = s
        self.was_run = True

    def expect(self, pattern, timeout=-1):
        """Waits on the given pattern to appear in std_out

        :raises RuntimeError: if the command was run in blocking mode.
        """
        if self.blocking:
            raise RuntimeError('expect can only be used on non-blocking commands.')

        self.subprocess.expect(pattern=pattern, timeout=timeout)

    def send(self, s, end=os.linesep, signal=False):
        """Sends the given string or signal to std_in.

        :param s: the text to send, or the signal number when ``signal`` is True.
        :param end: suffix appended to ``s`` when sending text.
        :param signal: treat ``s`` as a signal instead of text.
        :raises RuntimeError: if the command was run in blocking mode.
        """
        if self.blocking:
            raise RuntimeError('send can only be used on non-blocking commands.')

        if signal:
            if self._uses_subprocess:
                self.subprocess.send_signal(s)
            else:
                # PopenSpawn delivers signals through kill(sig); it does not
                # offer Popen's send_signal().
                self.subprocess.kill(s)
            return None

        if self._uses_subprocess:
            return self.subprocess.communicate(s + end)
        return self.subprocess.send(s + end)

    def terminate(self):
        """Ask the process to terminate."""
        if self._uses_subprocess:
            self.subprocess.terminate()
        else:
            # PopenSpawn wraps the real Popen object as `.proc` (also used by
            # `pid`); terminate() lives there rather than on the spawn.
            self.subprocess.proc.terminate()

    def kill(self):
        """Kill the process."""
        if self._uses_subprocess:
            # Popen.kill() accepts no signal argument; passing one raises
            # TypeError.
            self.subprocess.kill()
        else:
            self.subprocess.kill(signal.SIGINT)

    def block(self):
        """Blocks until process is complete."""
        if self._uses_subprocess:
            # consume stdout and stderr
            stdout, stderr = self.subprocess.communicate()
            self.__out = stdout
            self.__err = stderr
        else:
            self.subprocess.wait()

    def pipe(self, command, timeout=None):
        """Runs the current command and passes its output to the next
        given process.

        :param command: the command that receives this command's output.
        :param timeout: timeout for the new command; a falsy value falls back
            to this command's own timeout.
        :returns: the new ``Command``, after it has completed.
        """
        if not timeout:
            timeout = self.timeout

        if not self.was_run:
            self.run(block=False)

        data = self.out

        # A falsy inherited timeout leaves the new Command on its own default.
        c = Command(command, timeout) if timeout else Command(command)

        c.run(block=False)
        if data:
            c.send(data)
            c.subprocess.sendeof()
        c.block()
        return c
def _expand_args(command):
    """Parses command strings and returns a Popen-ready list.

    A string is split into pipeline stages on ``|`` and each stage is then
    split into an argument list with ``shlex.split``. Anything that is not a
    string is returned unchanged.

    :param command: a command line string, or an already expanded value.
    :returns: a list of argument lists (one per stage) for string input.
    """
    if not isinstance(command, STR_TYPES):
        return command

    # shlex wants the interpreter's native `str`. Python 3 text already is
    # one; handing it encoded bytes makes shlex treat the value as a stream
    # and fail on `.read()`. Only non-native text (Python 2 `unicode`) needs
    # to be encoded.
    source = command if isinstance(command, str) else command.encode('utf-8')

    splitter = shlex.shlex(source)
    splitter.whitespace = '|'
    splitter.whitespace_split = True

    # Iterating the lexer yields tokens until its end-of-input marker.
    return [shlex.split(token) for token in splitter]
def chain(command, timeout=TIMEOUT):
    """Run a ``|``-separated command line as a sequence of non-blocking stages.

    Each stage is started with ``run(..., block=False)``; when the previous
    stage produced output it is sent to the new stage followed by EOF.

    :param command: the command line, expanded with ``_expand_args``.
    :param timeout: timeout handed to every stage.
    :returns: the ``Command`` of the last stage.
    """
    previous_output = None

    for stage in _expand_args(command):
        proc = run(stage, block=False, timeout=timeout)

        if previous_output:
            proc.send(previous_output)
            proc.subprocess.sendeof()

        # Reading `out` collects this stage's output for the next one.
        previous_output = proc.out

    return proc
def run(command, block=True, binary=False, timeout=TIMEOUT):
    """Create a ``Command``, start it and return it.

    :param command: the command to execute.
    :param block: when True, wait for completion before returning.
    :param binary: forwarded to ``Command.run``.
    :param timeout: forwarded to the ``Command`` constructor.
    :returns: the started (and, if blocking, completed) ``Command``.
    """
    cmd = Command(command, timeout=timeout)
    cmd.run(block=block, binary=binary)

    if not block:
        return cmd

    cmd.block()
    return cmd
|