input_processor.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. # *** encoding: utf-8 ***
  2. """
  3. An :class:`~.InputProcessor` receives callbacks for the keystrokes parsed from
  4. the input in the :class:`~prompt_toolkit.inputstream.InputStream` instance.
  5. Based on the registered key bindings, the `InputProcessor` calls the
  6. matching callbacks when new key presses are fed through `feed`.
  7. """
  8. from __future__ import unicode_literals
  9. from prompt_toolkit.buffer import EditReadOnlyBuffer
  10. from prompt_toolkit.filters.cli import ViNavigationMode
  11. from prompt_toolkit.keys import Keys, Key
  12. from prompt_toolkit.utils import Event
  13. from .registry import BaseRegistry
  14. from collections import deque
  15. from six.moves import range
  16. import weakref
  17. import six
  18. __all__ = (
  19. 'InputProcessor',
  20. 'KeyPress',
  21. )
  22. class KeyPress(object):
  23. """
  24. :param key: A `Keys` instance or text (one character).
  25. :param data: The received string on stdin. (Often vt100 escape codes.)
  26. """
  27. def __init__(self, key, data=None):
  28. assert isinstance(key, (six.text_type, Key))
  29. assert data is None or isinstance(data, six.text_type)
  30. if data is None:
  31. data = key.name if isinstance(key, Key) else key
  32. self.key = key
  33. self.data = data
  34. def __repr__(self):
  35. return '%s(key=%r, data=%r)' % (
  36. self.__class__.__name__, self.key, self.data)
  37. def __eq__(self, other):
  38. return self.key == other.key and self.data == other.data
  39. class InputProcessor(object):
  40. """
  41. Statemachine that receives :class:`KeyPress` instances and according to the
  42. key bindings in the given :class:`Registry`, calls the matching handlers.
  43. ::
  44. p = InputProcessor(registry)
  45. # Send keys into the processor.
  46. p.feed(KeyPress(Keys.ControlX, '\x18'))
  47. p.feed(KeyPress(Keys.ControlC, '\x03')
  48. # Process all the keys in the queue.
  49. p.process_keys()
  50. # Now the ControlX-ControlC callback will be called if this sequence is
  51. # registered in the registry.
  52. :param registry: `BaseRegistry` instance.
  53. :param cli_ref: weakref to `CommandLineInterface`.
  54. """
  55. def __init__(self, registry, cli_ref):
  56. assert isinstance(registry, BaseRegistry)
  57. self._registry = registry
  58. self._cli_ref = cli_ref
  59. self.beforeKeyPress = Event(self)
  60. self.afterKeyPress = Event(self)
  61. # The queue of keys not yet send to our _process generator/state machine.
  62. self.input_queue = deque()
  63. # The key buffer that is matched in the generator state machine.
  64. # (This is at at most the amount of keys that make up for one key binding.)
  65. self.key_buffer = []
  66. # Simple macro recording. (Like readline does.)
  67. self.record_macro = False
  68. self.macro = []
  69. self.reset()
  70. def reset(self):
  71. self._previous_key_sequence = []
  72. self._previous_handler = None
  73. self._process_coroutine = self._process()
  74. self._process_coroutine.send(None)
  75. #: Readline argument (for repetition of commands.)
  76. #: https://www.gnu.org/software/bash/manual/html_node/Readline-Arguments.html
  77. self.arg = None
  78. def start_macro(self):
  79. " Start recording macro. "
  80. self.record_macro = True
  81. self.macro = []
  82. def end_macro(self):
  83. " End recording macro. "
  84. self.record_macro = False
  85. def call_macro(self):
  86. for k in self.macro:
  87. self.feed(k)
  88. def _get_matches(self, key_presses):
  89. """
  90. For a list of :class:`KeyPress` instances. Give the matching handlers
  91. that would handle this.
  92. """
  93. keys = tuple(k.key for k in key_presses)
  94. cli = self._cli_ref()
  95. # Try match, with mode flag
  96. return [b for b in self._registry.get_bindings_for_keys(keys) if b.filter(cli)]
  97. def _is_prefix_of_longer_match(self, key_presses):
  98. """
  99. For a list of :class:`KeyPress` instances. Return True if there is any
  100. handler that is bound to a suffix of this keys.
  101. """
  102. keys = tuple(k.key for k in key_presses)
  103. cli = self._cli_ref()
  104. # Get the filters for all the key bindings that have a longer match.
  105. # Note that we transform it into a `set`, because we don't care about
  106. # the actual bindings and executing it more than once doesn't make
  107. # sense. (Many key bindings share the same filter.)
  108. filters = set(b.filter for b in self._registry.get_bindings_starting_with_keys(keys))
  109. # When any key binding is active, return True.
  110. return any(f(cli) for f in filters)
  111. def _process(self):
  112. """
  113. Coroutine implementing the key match algorithm. Key strokes are sent
  114. into this generator, and it calls the appropriate handlers.
  115. """
  116. buffer = self.key_buffer
  117. retry = False
  118. while True:
  119. if retry:
  120. retry = False
  121. else:
  122. buffer.append((yield))
  123. # If we have some key presses, check for matches.
  124. if buffer:
  125. is_prefix_of_longer_match = self._is_prefix_of_longer_match(buffer)
  126. matches = self._get_matches(buffer)
  127. # When eager matches were found, give priority to them and also
  128. # ignore all the longer matches.
  129. eager_matches = [m for m in matches if m.eager(self._cli_ref())]
  130. if eager_matches:
  131. matches = eager_matches
  132. is_prefix_of_longer_match = False
  133. # Exact matches found, call handler.
  134. if not is_prefix_of_longer_match and matches:
  135. self._call_handler(matches[-1], key_sequence=buffer[:])
  136. del buffer[:] # Keep reference.
  137. # No match found.
  138. elif not is_prefix_of_longer_match and not matches:
  139. retry = True
  140. found = False
  141. # Loop over the input, try longest match first and shift.
  142. for i in range(len(buffer), 0, -1):
  143. matches = self._get_matches(buffer[:i])
  144. if matches:
  145. self._call_handler(matches[-1], key_sequence=buffer[:i])
  146. del buffer[:i]
  147. found = True
  148. break
  149. if not found:
  150. del buffer[:1]
  151. def feed(self, key_press):
  152. """
  153. Add a new :class:`KeyPress` to the input queue.
  154. (Don't forget to call `process_keys` in order to process the queue.)
  155. """
  156. assert isinstance(key_press, KeyPress)
  157. self.input_queue.append(key_press)
  158. def process_keys(self):
  159. """
  160. Process all the keys in the `input_queue`.
  161. (To be called after `feed`.)
  162. Note: because of the `feed`/`process_keys` separation, it is
  163. possible to call `feed` from inside a key binding.
  164. This function keeps looping until the queue is empty.
  165. """
  166. while self.input_queue:
  167. key_press = self.input_queue.popleft()
  168. if key_press.key != Keys.CPRResponse:
  169. self.beforeKeyPress.fire()
  170. self._process_coroutine.send(key_press)
  171. if key_press.key != Keys.CPRResponse:
  172. self.afterKeyPress.fire()
  173. # Invalidate user interface.
  174. cli = self._cli_ref()
  175. if cli:
  176. cli.invalidate()
  177. def _call_handler(self, handler, key_sequence=None):
  178. was_recording = self.record_macro
  179. arg = self.arg
  180. self.arg = None
  181. event = KeyPressEvent(
  182. weakref.ref(self), arg=arg, key_sequence=key_sequence,
  183. previous_key_sequence=self._previous_key_sequence,
  184. is_repeat=(handler == self._previous_handler))
  185. # Save the state of the current buffer.
  186. cli = event.cli # Can be `None` (In unit-tests only.)
  187. if handler.save_before(event) and cli:
  188. cli.current_buffer.save_to_undo_stack()
  189. # Call handler.
  190. try:
  191. handler.call(event)
  192. self._fix_vi_cursor_position(event)
  193. except EditReadOnlyBuffer:
  194. # When a key binding does an attempt to change a buffer which is
  195. # read-only, we can just silently ignore that.
  196. pass
  197. self._previous_key_sequence = key_sequence
  198. self._previous_handler = handler
  199. # Record the key sequence in our macro. (Only if we're in macro mode
  200. # before and after executing the key.)
  201. if self.record_macro and was_recording:
  202. self.macro.extend(key_sequence)
  203. def _fix_vi_cursor_position(self, event):
  204. """
  205. After every command, make sure that if we are in Vi navigation mode, we
  206. never put the cursor after the last character of a line. (Unless it's
  207. an empty line.)
  208. """
  209. cli = self._cli_ref()
  210. if cli:
  211. buff = cli.current_buffer
  212. preferred_column = buff.preferred_column
  213. if (ViNavigationMode()(event.cli) and
  214. buff.document.is_cursor_at_the_end_of_line and
  215. len(buff.document.current_line) > 0):
  216. buff.cursor_position -= 1
  217. # Set the preferred_column for arrow up/down again.
  218. # (This was cleared after changing the cursor position.)
  219. buff.preferred_column = preferred_column
  220. class KeyPressEvent(object):
  221. """
  222. Key press event, delivered to key bindings.
  223. :param input_processor_ref: Weak reference to the `InputProcessor`.
  224. :param arg: Repetition argument.
  225. :param key_sequence: List of `KeyPress` instances.
  226. :param previouskey_sequence: Previous list of `KeyPress` instances.
  227. :param is_repeat: True when the previous event was delivered to the same handler.
  228. """
  229. def __init__(self, input_processor_ref, arg=None, key_sequence=None,
  230. previous_key_sequence=None, is_repeat=False):
  231. self._input_processor_ref = input_processor_ref
  232. self.key_sequence = key_sequence
  233. self.previous_key_sequence = previous_key_sequence
  234. #: True when the previous key sequence was handled by the same handler.
  235. self.is_repeat = is_repeat
  236. self._arg = arg
  237. def __repr__(self):
  238. return 'KeyPressEvent(arg=%r, key_sequence=%r, is_repeat=%r)' % (
  239. self.arg, self.key_sequence, self.is_repeat)
  240. @property
  241. def data(self):
  242. return self.key_sequence[-1].data
  243. @property
  244. def input_processor(self):
  245. return self._input_processor_ref()
  246. @property
  247. def cli(self):
  248. """
  249. Command line interface.
  250. """
  251. return self.input_processor._cli_ref()
  252. @property
  253. def current_buffer(self):
  254. """
  255. The current buffer.
  256. """
  257. return self.cli.current_buffer
  258. @property
  259. def arg(self):
  260. """
  261. Repetition argument.
  262. """
  263. if self._arg == '-':
  264. return -1
  265. result = int(self._arg or 1)
  266. # Don't exceed a million.
  267. if int(result) >= 1000000:
  268. result = 1
  269. return result
  270. @property
  271. def arg_present(self):
  272. """
  273. True if repetition argument was explicitly provided.
  274. """
  275. return self._arg is not None
  276. def append_to_arg_count(self, data):
  277. """
  278. Add digit to the input argument.
  279. :param data: the typed digit as string
  280. """
  281. assert data in '-0123456789'
  282. current = self._arg
  283. if data == '-':
  284. assert current is None or current == '-'
  285. result = data
  286. elif current is None:
  287. result = data
  288. else:
  289. result = "%s%s" % (current, data)
  290. self.input_processor.arg = result