123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806 |
- import os
- import sys
- import time
- import pty
- import tty
- import errno
- import signal
- from contextlib import contextmanager
- import ptyprocess
- from ptyprocess.ptyprocess import use_native_pty_fork
- from .exceptions import ExceptionPexpect, EOF, TIMEOUT
- from .spawnbase import SpawnBase
- from .utils import which, split_command_line, select_ignore_interrupts
- @contextmanager
- def _wrap_ptyprocess_err():
- """Turn ptyprocess errors into our own ExceptionPexpect errors"""
- try:
- yield
- except ptyprocess.PtyProcessError as e:
- raise ExceptionPexpect(*e.args)
- PY3 = (sys.version_info[0] >= 3)
- class spawn(SpawnBase):
- '''This is the main class interface for Pexpect. Use this class to start
- and control child applications. '''
- # This is purely informational now - changing it has no effect
- use_native_pty_fork = use_native_pty_fork
- def __init__(self, command, args=[], timeout=30, maxread=2000,
- searchwindowsize=None, logfile=None, cwd=None, env=None,
- ignore_sighup=False, echo=True, preexec_fn=None,
- encoding=None, codec_errors='strict', dimensions=None):
- '''This is the constructor. The command parameter may be a string that
- includes a command and any arguments to the command. For example::
- child = pexpect.spawn('/usr/bin/ftp')
- child = pexpect.spawn('/usr/bin/ssh user@example.com')
- child = pexpect.spawn('ls -latr /tmp')
- You may also construct it with a list of arguments like so::
- child = pexpect.spawn('/usr/bin/ftp', [])
- child = pexpect.spawn('/usr/bin/ssh', ['user@example.com'])
- child = pexpect.spawn('ls', ['-latr', '/tmp'])
- After this the child application will be created and will be ready to
- talk to. For normal use, see expect() and send() and sendline().
- Remember that Pexpect does NOT interpret shell meta characters such as
- redirect, pipe, or wild cards (``>``, ``|``, or ``*``). This is a
- common mistake. If you want to run a command and pipe it through
- another command then you must also start a shell. For example::
- child = pexpect.spawn('/bin/bash -c "ls -l | grep LOG > logs.txt"')
- child.expect(pexpect.EOF)
- The second form of spawn (where you pass a list of arguments) is useful
- in situations where you wish to spawn a command and pass it its own
- argument list. This can make syntax more clear. For example, the
- following is equivalent to the previous example::
- shell_cmd = 'ls -l | grep LOG > logs.txt'
- child = pexpect.spawn('/bin/bash', ['-c', shell_cmd])
- child.expect(pexpect.EOF)
- The maxread attribute sets the read buffer size. This is maximum number
- of bytes that Pexpect will try to read from a TTY at one time. Setting
- the maxread size to 1 will turn off buffering. Setting the maxread
- value higher may help performance in cases where large amounts of
- output are read back from the child. This feature is useful in
- conjunction with searchwindowsize.
- When the keyword argument *searchwindowsize* is None (default), the
- full buffer is searched at each iteration of receiving incoming data.
- The default number of bytes scanned at each iteration is very large
- and may be reduced to collaterally reduce search cost. After
- :meth:`~.expect` returns, the full buffer attribute remains up to
- size *maxread* irrespective of *searchwindowsize* value.
- When the keyword argument ``timeout`` is specified as a number,
- (default: *30*), then :class:`TIMEOUT` will be raised after the value
- specified has elapsed, in seconds, for any of the :meth:`~.expect`
- family of method calls. When None, TIMEOUT will not be raised, and
- :meth:`~.expect` may block indefinitely until match.
- The logfile member turns on or off logging. All input and output will
- be copied to the given file object. Set logfile to None to stop
- logging. This is the default. Set logfile to sys.stdout to echo
- everything to standard output. The logfile is flushed after each write.
- Example log input and output to a file::
- child = pexpect.spawn('some_command')
- fout = open('mylog.txt','wb')
- child.logfile = fout
- Example log to stdout::
- # In Python 2:
- child = pexpect.spawn('some_command')
- child.logfile = sys.stdout
- # In Python 3, spawnu should be used to give str to stdout:
- child = pexpect.spawnu('some_command')
- child.logfile = sys.stdout
- The logfile_read and logfile_send members can be used to separately log
- the input from the child and output sent to the child. Sometimes you
- don't want to see everything you write to the child. You only want to
- log what the child sends back. For example::
- child = pexpect.spawn('some_command')
- child.logfile_read = sys.stdout
- You will need to pass an encoding to spawn in the above code if you are
- using Python 3.
- To separately log output sent to the child use logfile_send::
- child.logfile_send = fout
- If ``ignore_sighup`` is True, the child process will ignore SIGHUP
- signals. The default is False from Pexpect 4.0, meaning that SIGHUP
- will be handled normally by the child.
- The delaybeforesend helps overcome a weird behavior that many users
- were experiencing. The typical problem was that a user would expect() a
- "Password:" prompt and then immediately call sendline() to send the
- password. The user would then see that their password was echoed back
- to them. Passwords don't normally echo. The problem is caused by the
- fact that most applications print out the "Password" prompt and then
- turn off stdin echo, but if you send your password before the
- application turned off echo, then you get your password echoed.
- Normally this wouldn't be a problem when interacting with a human at a
- real keyboard. If you introduce a slight delay just before writing then
- this seems to clear up the problem. This was such a common problem for
- many users that I decided that the default pexpect behavior should be
- to sleep just before writing to the child application. 1/20th of a
- second (50 ms) seems to be enough to clear up the problem. You can set
- delaybeforesend to None to return to the old behavior.
- Note that spawn is clever about finding commands on your path.
- It uses the same logic that "which" uses to find executables.
- If you wish to get the exit status of the child you must call the
- close() method. The exit or signal status of the child will be stored
- in self.exitstatus or self.signalstatus. If the child exited normally
- then exitstatus will store the exit return code and signalstatus will
- be None. If the child was terminated abnormally with a signal then
- signalstatus will store the signal value and exitstatus will be None::
- child = pexpect.spawn('some_command')
- child.close()
- print(child.exitstatus, child.signalstatus)
- If you need more detail you can also read the self.status member which
- stores the status returned by os.waitpid. You can interpret this using
- os.WIFEXITED/os.WEXITSTATUS or os.WIFSIGNALED/os.TERMSIG.
- The echo attribute may be set to False to disable echoing of input.
- As a pseudo-terminal, all input echoed by the "keyboard" (send()
- or sendline()) will be repeated to output. For many cases, it is
- not desirable to have echo enabled, and it may be later disabled
- using setecho(False) followed by waitnoecho(). However, for some
- platforms such as Solaris, this is not possible, and should be
- disabled immediately on spawn.
-
- If preexec_fn is given, it will be called in the child process before
- launching the given command. This is useful to e.g. reset inherited
- signal handlers.
- The dimensions attribute specifies the size of the pseudo-terminal as
- seen by the subprocess, and is specified as a two-entry tuple (rows,
- columns). If this is unspecified, the defaults in ptyprocess will apply.
- '''
- super(spawn, self).__init__(timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize,
- logfile=logfile, encoding=encoding, codec_errors=codec_errors)
- self.STDIN_FILENO = pty.STDIN_FILENO
- self.STDOUT_FILENO = pty.STDOUT_FILENO
- self.STDERR_FILENO = pty.STDERR_FILENO
- self.cwd = cwd
- self.env = env
- self.echo = echo
- self.ignore_sighup = ignore_sighup
- self.__irix_hack = sys.platform.lower().startswith('irix')
- if command is None:
- self.command = None
- self.args = None
- self.name = '<pexpect factory incomplete>'
- else:
- self._spawn(command, args, preexec_fn, dimensions)
- def __str__(self):
- '''This returns a human-readable string that represents the state of
- the object. '''
- s = []
- s.append(repr(self))
- s.append('command: ' + str(self.command))
- s.append('args: %r' % (self.args,))
- s.append('buffer (last 100 chars): %r' % (
- self.buffer[-100:] if self.buffer else self.buffer,))
- s.append('before (last 100 chars): %r' % (
- self.before[-100:] if self.before else self.before,))
- s.append('after: %r' % (self.after,))
- s.append('match: %r' % (self.match,))
- s.append('match_index: ' + str(self.match_index))
- s.append('exitstatus: ' + str(self.exitstatus))
- if hasattr(self, 'ptyproc'):
- s.append('flag_eof: ' + str(self.flag_eof))
- s.append('pid: ' + str(self.pid))
- s.append('child_fd: ' + str(self.child_fd))
- s.append('closed: ' + str(self.closed))
- s.append('timeout: ' + str(self.timeout))
- s.append('delimiter: ' + str(self.delimiter))
- s.append('logfile: ' + str(self.logfile))
- s.append('logfile_read: ' + str(self.logfile_read))
- s.append('logfile_send: ' + str(self.logfile_send))
- s.append('maxread: ' + str(self.maxread))
- s.append('ignorecase: ' + str(self.ignorecase))
- s.append('searchwindowsize: ' + str(self.searchwindowsize))
- s.append('delaybeforesend: ' + str(self.delaybeforesend))
- s.append('delayafterclose: ' + str(self.delayafterclose))
- s.append('delayafterterminate: ' + str(self.delayafterterminate))
- return '\n'.join(s)
- def _spawn(self, command, args=[], preexec_fn=None, dimensions=None):
- '''This starts the given command in a child process. This does all the
- fork/exec type of stuff for a pty. This is called by __init__. If args
- is empty then command will be parsed (split on spaces) and args will be
- set to parsed arguments. '''
- # The pid and child_fd of this object get set by this method.
- # Note that it is difficult for this method to fail.
- # You cannot detect if the child process cannot start.
- # So the only way you can tell if the child process started
- # or not is to try to read from the file descriptor. If you get
- # EOF immediately then it means that the child is already dead.
- # That may not necessarily be bad because you may have spawned a child
- # that performs some task; creates no stdout output; and then dies.
- # If command is an int type then it may represent a file descriptor.
- if isinstance(command, type(0)):
- raise ExceptionPexpect('Command is an int type. ' +
- 'If this is a file descriptor then maybe you want to ' +
- 'use fdpexpect.fdspawn which takes an existing ' +
- 'file descriptor instead of a command string.')
- if not isinstance(args, type([])):
- raise TypeError('The argument, args, must be a list.')
- if args == []:
- self.args = split_command_line(command)
- self.command = self.args[0]
- else:
- # Make a shallow copy of the args list.
- self.args = args[:]
- self.args.insert(0, command)
- self.command = command
- command_with_path = which(self.command, env=self.env)
- if command_with_path is None:
- raise ExceptionPexpect('The command was not found or was not ' +
- 'executable: %s.' % self.command)
- self.command = command_with_path
- self.args[0] = self.command
- self.name = '<' + ' '.join(self.args) + '>'
- assert self.pid is None, 'The pid member must be None.'
- assert self.command is not None, 'The command member must not be None.'
- kwargs = {'echo': self.echo, 'preexec_fn': preexec_fn}
- if self.ignore_sighup:
- def preexec_wrapper():
- "Set SIGHUP to be ignored, then call the real preexec_fn"
- signal.signal(signal.SIGHUP, signal.SIG_IGN)
- if preexec_fn is not None:
- preexec_fn()
- kwargs['preexec_fn'] = preexec_wrapper
- if dimensions is not None:
- kwargs['dimensions'] = dimensions
- if self.encoding is not None:
- # Encode command line using the specified encoding
- self.args = [a if isinstance(a, bytes) else a.encode(self.encoding)
- for a in self.args]
- self.ptyproc = self._spawnpty(self.args, env=self.env,
- cwd=self.cwd, **kwargs)
- self.pid = self.ptyproc.pid
- self.child_fd = self.ptyproc.fd
- self.terminated = False
- self.closed = False
- def _spawnpty(self, args, **kwargs):
- '''Spawn a pty and return an instance of PtyProcess.'''
- return ptyprocess.PtyProcess.spawn(args, **kwargs)
- def close(self, force=True):
- '''This closes the connection with the child application. Note that
- calling close() more than once is valid. This emulates standard Python
- behavior with files. Set force to True if you want to make sure that
- the child is terminated (SIGKILL is sent if the child ignores SIGHUP
- and SIGINT). '''
- self.flush()
- self.ptyproc.close(force=force)
- self.isalive() # Update exit status from ptyproc
- self.child_fd = -1
- def isatty(self):
- '''This returns True if the file descriptor is open and connected to a
- tty(-like) device, else False.
- On SVR4-style platforms implementing streams, such as SunOS and HP-UX,
- the child pty may not appear as a terminal device. This means
- methods such as setecho(), setwinsize(), getwinsize() may raise an
- IOError. '''
- return os.isatty(self.child_fd)
- def waitnoecho(self, timeout=-1):
- '''This waits until the terminal ECHO flag is set False. This returns
- True if the echo mode is off. This returns False if the ECHO flag was
- not set False before the timeout. This can be used to detect when the
- child is waiting for a password. Usually a child application will turn
- off echo mode when it is waiting for the user to enter a password. For
- example, instead of expecting the "password:" prompt you can wait for
- the child to set ECHO off::
- p = pexpect.spawn('ssh user@example.com')
- p.waitnoecho()
- p.sendline(mypassword)
- If timeout==-1 then this method will use the value in self.timeout.
- If timeout==None then this method to block until ECHO flag is False.
- '''
- if timeout == -1:
- timeout = self.timeout
- if timeout is not None:
- end_time = time.time() + timeout
- while True:
- if not self.getecho():
- return True
- if timeout < 0 and timeout is not None:
- return False
- if timeout is not None:
- timeout = end_time - time.time()
- time.sleep(0.1)
- def getecho(self):
- '''This returns the terminal echo mode. This returns True if echo is
- on or False if echo is off. Child applications that are expecting you
- to enter a password often set ECHO False. See waitnoecho().
- Not supported on platforms where ``isatty()`` returns False. '''
- return self.ptyproc.getecho()
- def setecho(self, state):
- '''This sets the terminal echo mode on or off. Note that anything the
- child sent before the echo will be lost, so you should be sure that
- your input buffer is empty before you call setecho(). For example, the
- following will work as expected::
- p = pexpect.spawn('cat') # Echo is on by default.
- p.sendline('1234') # We expect see this twice from the child...
- p.expect(['1234']) # ... once from the tty echo...
- p.expect(['1234']) # ... and again from cat itself.
- p.setecho(False) # Turn off tty echo
- p.sendline('abcd') # We will set this only once (echoed by cat).
- p.sendline('wxyz') # We will set this only once (echoed by cat)
- p.expect(['abcd'])
- p.expect(['wxyz'])
- The following WILL NOT WORK because the lines sent before the setecho
- will be lost::
- p = pexpect.spawn('cat')
- p.sendline('1234')
- p.setecho(False) # Turn off tty echo
- p.sendline('abcd') # We will set this only once (echoed by cat).
- p.sendline('wxyz') # We will set this only once (echoed by cat)
- p.expect(['1234'])
- p.expect(['1234'])
- p.expect(['abcd'])
- p.expect(['wxyz'])
- Not supported on platforms where ``isatty()`` returns False.
- '''
- return self.ptyproc.setecho(state)
- def read_nonblocking(self, size=1, timeout=-1):
- '''This reads at most size characters from the child application. It
- includes a timeout. If the read does not complete within the timeout
- period then a TIMEOUT exception is raised. If the end of file is read
- then an EOF exception will be raised. If a logfile is specified, a
- copy is written to that log.
- If timeout is None then the read may block indefinitely.
- If timeout is -1 then the self.timeout value is used. If timeout is 0
- then the child is polled and if there is no data immediately ready
- then this will raise a TIMEOUT exception.
- The timeout refers only to the amount of time to read at least one
- character. This is not affected by the 'size' parameter, so if you call
- read_nonblocking(size=100, timeout=30) and only one character is
- available right away then one character will be returned immediately.
- It will not wait for 30 seconds for another 99 characters to come in.
- This is a wrapper around os.read(). It uses select.select() to
- implement the timeout. '''
- if self.closed:
- raise ValueError('I/O operation on closed file.')
- if timeout == -1:
- timeout = self.timeout
- # Note that some systems such as Solaris do not give an EOF when
- # the child dies. In fact, you can still try to read
- # from the child_fd -- it will block forever or until TIMEOUT.
- # For this case, I test isalive() before doing any reading.
- # If isalive() is false, then I pretend that this is the same as EOF.
- if not self.isalive():
- # timeout of 0 means "poll"
- r, w, e = select_ignore_interrupts([self.child_fd], [], [], 0)
- if not r:
- self.flag_eof = True
- raise EOF('End Of File (EOF). Braindead platform.')
- elif self.__irix_hack:
- # Irix takes a long time before it realizes a child was terminated.
- # FIXME So does this mean Irix systems are forced to always have
- # FIXME a 2 second delay when calling read_nonblocking? That sucks.
- r, w, e = select_ignore_interrupts([self.child_fd], [], [], 2)
- if not r and not self.isalive():
- self.flag_eof = True
- raise EOF('End Of File (EOF). Slow platform.')
- r, w, e = select_ignore_interrupts([self.child_fd], [], [], timeout)
- if not r:
- if not self.isalive():
- # Some platforms, such as Irix, will claim that their
- # processes are alive; timeout on the select; and
- # then finally admit that they are not alive.
- self.flag_eof = True
- raise EOF('End of File (EOF). Very slow platform.')
- else:
- raise TIMEOUT('Timeout exceeded.')
- if self.child_fd in r:
- return super(spawn, self).read_nonblocking(size)
- raise ExceptionPexpect('Reached an unexpected state.') # pragma: no cover
- def write(self, s):
- '''This is similar to send() except that there is no return value.
- '''
- self.send(s)
- def writelines(self, sequence):
- '''This calls write() for each element in the sequence. The sequence
- can be any iterable object producing strings, typically a list of
- strings. This does not add line separators. There is no return value.
- '''
- for s in sequence:
- self.write(s)
- def send(self, s):
- '''Sends string ``s`` to the child process, returning the number of
- bytes written. If a logfile is specified, a copy is written to that
- log.
- The default terminal input mode is canonical processing unless set
- otherwise by the child process. This allows backspace and other line
- processing to be performed prior to transmitting to the receiving
- program. As this is buffered, there is a limited size of such buffer.
- On Linux systems, this is 4096 (defined by N_TTY_BUF_SIZE). All
- other systems honor the POSIX.1 definition PC_MAX_CANON -- 1024
- on OSX, 256 on OpenSolaris, and 1920 on FreeBSD.
- This value may be discovered using fpathconf(3)::
- >>> from os import fpathconf
- >>> print(fpathconf(0, 'PC_MAX_CANON'))
- 256
- On such a system, only 256 bytes may be received per line. Any
- subsequent bytes received will be discarded. BEL (``'\a'``) is then
- sent to output if IMAXBEL (termios.h) is set by the tty driver.
- This is usually enabled by default. Linux does not honor this as
- an option -- it behaves as though it is always set on.
- Canonical input processing may be disabled altogether by executing
- a shell, then stty(1), before executing the final program::
- >>> bash = pexpect.spawn('/bin/bash', echo=False)
- >>> bash.sendline('stty -icanon')
- >>> bash.sendline('base64')
- >>> bash.sendline('x' * 5000)
- '''
- if self.delaybeforesend is not None:
- time.sleep(self.delaybeforesend)
- s = self._coerce_send_string(s)
- self._log(s, 'send')
- b = self._encoder.encode(s, final=False)
- return os.write(self.child_fd, b)
- def sendline(self, s=''):
- '''Wraps send(), sending string ``s`` to child process, with
- ``os.linesep`` automatically appended. Returns number of bytes
- written. Only a limited number of bytes may be sent for each
- line in the default terminal mode, see docstring of :meth:`send`.
- '''
- s = self._coerce_send_string(s)
- return self.send(s + self.linesep)
- def _log_control(self, s):
- """Write control characters to the appropriate log files"""
- if self.encoding is not None:
- s = s.decode(self.encoding, 'replace')
- self._log(s, 'send')
- def sendcontrol(self, char):
- '''Helper method that wraps send() with mnemonic access for sending control
- character to the child (such as Ctrl-C or Ctrl-D). For example, to send
- Ctrl-G (ASCII 7, bell, '\a')::
- child.sendcontrol('g')
- See also, sendintr() and sendeof().
- '''
- n, byte = self.ptyproc.sendcontrol(char)
- self._log_control(byte)
- return n
- def sendeof(self):
- '''This sends an EOF to the child. This sends a character which causes
- the pending parent output buffer to be sent to the waiting child
- program without waiting for end-of-line. If it is the first character
- of the line, the read() in the user program returns 0, which signifies
- end-of-file. This means to work as expected a sendeof() has to be
- called at the beginning of a line. This method does not send a newline.
- It is the responsibility of the caller to ensure the eof is sent at the
- beginning of a line. '''
- n, byte = self.ptyproc.sendeof()
- self._log_control(byte)
- def sendintr(self):
- '''This sends a SIGINT to the child. It does not require
- the SIGINT to be the first character on a line. '''
- n, byte = self.ptyproc.sendintr()
- self._log_control(byte)
- @property
- def flag_eof(self):
- return self.ptyproc.flag_eof
- @flag_eof.setter
- def flag_eof(self, value):
- self.ptyproc.flag_eof = value
- def eof(self):
- '''This returns True if the EOF exception was ever raised.
- '''
- return self.flag_eof
- def terminate(self, force=False):
- '''This forces a child process to terminate. It starts nicely with
- SIGHUP and SIGINT. If "force" is True then moves onto SIGKILL. This
- returns True if the child was terminated. This returns False if the
- child could not be terminated. '''
- if not self.isalive():
- return True
- try:
- self.kill(signal.SIGHUP)
- time.sleep(self.delayafterterminate)
- if not self.isalive():
- return True
- self.kill(signal.SIGCONT)
- time.sleep(self.delayafterterminate)
- if not self.isalive():
- return True
- self.kill(signal.SIGINT)
- time.sleep(self.delayafterterminate)
- if not self.isalive():
- return True
- if force:
- self.kill(signal.SIGKILL)
- time.sleep(self.delayafterterminate)
- if not self.isalive():
- return True
- else:
- return False
- return False
- except OSError:
- # I think there are kernel timing issues that sometimes cause
- # this to happen. I think isalive() reports True, but the
- # process is dead to the kernel.
- # Make one last attempt to see if the kernel is up to date.
- time.sleep(self.delayafterterminate)
- if not self.isalive():
- return True
- else:
- return False
- def wait(self):
- '''This waits until the child exits. This is a blocking call. This will
- not read any data from the child, so this will block forever if the
- child has unread output and has terminated. In other words, the child
- may have printed output then called exit(), but, the child is
- technically still alive until its output is read by the parent.
- This method is non-blocking if :meth:`wait` has already been called
- previously or :meth:`isalive` method returns False. It simply returns
- the previously determined exit status.
- '''
- ptyproc = self.ptyproc
- with _wrap_ptyprocess_err():
- # exception may occur if "Is some other process attempting
- # "job control with our child pid?"
- exitstatus = ptyproc.wait()
- self.status = ptyproc.status
- self.exitstatus = ptyproc.exitstatus
- self.signalstatus = ptyproc.signalstatus
- self.terminated = True
- return exitstatus
- def isalive(self):
- '''This tests if the child process is running or not. This is
- non-blocking. If the child was terminated then this will read the
- exitstatus or signalstatus of the child. This returns True if the child
- process appears to be running or False if not. It can take literally
- SECONDS for Solaris to return the right status. '''
- ptyproc = self.ptyproc
- with _wrap_ptyprocess_err():
- alive = ptyproc.isalive()
- if not alive:
- self.status = ptyproc.status
- self.exitstatus = ptyproc.exitstatus
- self.signalstatus = ptyproc.signalstatus
- self.terminated = True
- return alive
- def kill(self, sig):
- '''This sends the given signal to the child application. In keeping
- with UNIX tradition it has a misleading name. It does not necessarily
- kill the child unless you send the right signal. '''
- # Same as os.kill, but the pid is given for you.
- if self.isalive():
- os.kill(self.pid, sig)
- def getwinsize(self):
- '''This returns the terminal window size of the child tty. The return
- value is a tuple of (rows, cols). '''
- return self.ptyproc.getwinsize()
- def setwinsize(self, rows, cols):
- '''This sets the terminal window size of the child tty. This will cause
- a SIGWINCH signal to be sent to the child. This does not change the
- physical window size. It changes the size reported to TTY-aware
- applications like vi or curses -- applications that respond to the
- SIGWINCH signal. '''
- return self.ptyproc.setwinsize(rows, cols)
- def interact(self, escape_character=chr(29),
- input_filter=None, output_filter=None):
- '''This gives control of the child process to the interactive user (the
- human at the keyboard). Keystrokes are sent to the child process, and
- the stdout and stderr output of the child process is printed. This
- simply echos the child stdout and child stderr to the real stdout and
- it echos the real stdin to the child stdin. When the user types the
- escape_character this method will return None. The escape_character
- will not be transmitted. The default for escape_character is
- entered as ``Ctrl - ]``, the very same as BSD telnet. To prevent
- escaping, escape_character may be set to None.
- If a logfile is specified, then the data sent and received from the
- child process in interact mode is duplicated to the given log.
- You may pass in optional input and output filter functions. These
- functions should take a string and return a string. The output_filter
- will be passed all the output from the child process. The input_filter
- will be passed all the keyboard input from the user. The input_filter
- is run BEFORE the check for the escape_character.
- Note that if you change the window size of the parent the SIGWINCH
- signal will not be passed through to the child. If you want the child
- window size to change when the parent's window size changes then do
- something like the following example::
- import pexpect, struct, fcntl, termios, signal, sys
- def sigwinch_passthrough (sig, data):
- s = struct.pack("HHHH", 0, 0, 0, 0)
- a = struct.unpack('hhhh', fcntl.ioctl(sys.stdout.fileno(),
- termios.TIOCGWINSZ , s))
- global p
- p.setwinsize(a[0],a[1])
- # Note this 'p' global and used in sigwinch_passthrough.
- p = pexpect.spawn('/bin/bash')
- signal.signal(signal.SIGWINCH, sigwinch_passthrough)
- p.interact()
- '''
- # Flush the buffer.
- self.write_to_stdout(self.buffer)
- self.stdout.flush()
- self.buffer = self.string_type()
- mode = tty.tcgetattr(self.STDIN_FILENO)
- tty.setraw(self.STDIN_FILENO)
- if escape_character is not None and PY3:
- escape_character = escape_character.encode('latin-1')
- try:
- self.__interact_copy(escape_character, input_filter, output_filter)
- finally:
- tty.tcsetattr(self.STDIN_FILENO, tty.TCSAFLUSH, mode)
- def __interact_writen(self, fd, data):
- '''This is used by the interact() method.
- '''
- while data != b'' and self.isalive():
- n = os.write(fd, data)
- data = data[n:]
- def __interact_read(self, fd):
- '''This is used by the interact() method.
- '''
- return os.read(fd, 1000)
- def __interact_copy(self, escape_character=None,
- input_filter=None, output_filter=None):
- '''This is used by the interact() method.
- '''
- while self.isalive():
- r, w, e = select_ignore_interrupts([self.child_fd, self.STDIN_FILENO], [], [])
- if self.child_fd in r:
- try:
- data = self.__interact_read(self.child_fd)
- except OSError as err:
- if err.args[0] == errno.EIO:
- # Linux-style EOF
- break
- raise
- if data == b'':
- # BSD-style EOF
- break
- if output_filter:
- data = output_filter(data)
- self._log(data, 'read')
- os.write(self.STDOUT_FILENO, data)
- if self.STDIN_FILENO in r:
- data = self.__interact_read(self.STDIN_FILENO)
- if input_filter:
- data = input_filter(data)
- i = -1
- if escape_character is not None:
- i = data.rfind(escape_character)
- if i != -1:
- data = data[:i]
- if data:
- self._log(data, 'send')
- self.__interact_writen(self.child_fd, data)
- break
- self._log(data, 'send')
- self.__interact_writen(self.child_fd, data)
- def spawnu(*args, **kwargs):
- """Deprecated: pass encoding to spawn() instead."""
- kwargs.setdefault('encoding', 'utf-8')
- return spawn(*args, **kwargs)
|