# winpty_wrapper.py (3.0 KB)

  1. # -*- coding: utf-8 -*-
  2. """Wrap process I/O pipe communication using pywin32."""
  3. # yapf: disable
  4. # Standard library imports
  5. from ctypes import windll
  6. from ctypes.wintypes import DWORD, LPVOID, HANDLE, BOOL, LPCVOID
  7. import ctypes
  8. # Local imports
  9. from .cywinpty import Agent
  10. import sys
  11. PY2 = sys.version_info[0] == 2
  12. # yapf: enable
  13. OPEN_EXISTING = 3
  14. GENERIC_WRITE = 0x40000000
  15. GENERIC_READ = 0x80000000
  16. LARGE_INTEGER = ctypes.c_ulong
  17. PLARGE_INTEGER = ctypes.POINTER(LARGE_INTEGER)
  18. LPOVERLAPPED = LPVOID
  19. # LPDWORD is not in ctypes.wintypes on Python 2
  20. LPDWORD = ctypes.POINTER(DWORD)
  21. ReadFile = windll.kernel32.ReadFile
  22. ReadFile.restype = BOOL
  23. ReadFile.argtypes = [HANDLE, LPVOID, DWORD, LPDWORD, LPOVERLAPPED]
  24. WriteFile = windll.kernel32.WriteFile
  25. WriteFile.restype = BOOL
  26. WriteFile.argtypes = [HANDLE, LPCVOID, DWORD, LPDWORD, LPOVERLAPPED]
  27. class PTY(Agent):
  28. """
  29. This class provides a pywin32 communication wrapper around winpty process
  30. communication pipes.
  31. Inherits all Cython winpty agent functionality and properties.
  32. """
  33. def __init__(self, cols, rows):
  34. """Initialize a new Pseudo Terminal of size ``(cols, rows)``."""
  35. Agent.__init__(self, cols, rows, True)
  36. self.conin_pipe = windll.kernel32.CreateFileW(
  37. self.conin_pipe_name, GENERIC_WRITE, 0, None, OPEN_EXISTING, 0,
  38. None
  39. )
  40. self.conout_pipe = windll.kernel32.CreateFileW(
  41. self.conout_pipe_name, GENERIC_READ, 0, None, OPEN_EXISTING, 0,
  42. None
  43. )
  44. def read(self, length=1000, blocking=False):
  45. """
  46. Read ``length`` bytes from current process output stream.
  47. Note: This method is not fully non-blocking, however it
  48. behaves like one.
  49. """
  50. size_p = PLARGE_INTEGER(LARGE_INTEGER(0))
  51. if not blocking:
  52. windll.kernel32.GetFileSizeEx(self.conout_pipe, size_p)
  53. size = size_p[0]
  54. length = min(size, length)
  55. data = ctypes.create_string_buffer(length)
  56. if length > 0:
  57. num_bytes = PLARGE_INTEGER(LARGE_INTEGER(0))
  58. ReadFile(self.conout_pipe, data, length, num_bytes, None)
  59. return data.value
  60. def write(self, data):
  61. """Write string data to current process input stream."""
  62. data = data.encode('utf-8')
  63. data_p = ctypes.create_string_buffer(data)
  64. num_bytes = PLARGE_INTEGER(LARGE_INTEGER(0))
  65. bytes_to_write = len(data)
  66. success = WriteFile(self.conin_pipe, data_p,
  67. bytes_to_write, num_bytes, None)
  68. return success, num_bytes[0]
  69. def close(self):
  70. """Close all communication process streams."""
  71. windll.kernel32.CloseHandle(self.conout_pipe)
  72. windll.kernel32.CloseHandle(self.conin_pipe)
  73. def iseof(self):
  74. """Check if current process streams are still open."""
  75. succ = windll.kernel32.PeekNamedPipe(
  76. self.conout_pipe, None, None, None, None, None
  77. )
  78. return not bool(succ)