repl.py 21 KB


  1. # -*- coding: utf-8 -*-
  2. from __future__ import unicode_literals, print_function
  3. def main():
  4. import codecs
  5. from colorama import Fore, Style
  6. import frida
  7. from frida.application import ConsoleApplication
  8. import json
  9. import os
  10. import platform
  11. import re
  12. from prompt_toolkit.shortcuts import create_prompt_application, create_output, create_eventloop
  13. from prompt_toolkit.history import FileHistory
  14. from prompt_toolkit.completion import Completion, Completer
  15. from prompt_toolkit.interface import CommandLineInterface
  16. from pygments.lexers import JavascriptLexer
  17. from pygments.token import Token
  18. import sys
  19. import threading
  20. class REPLApplication(ConsoleApplication):
  21. def __init__(self):
  22. self._script = None
  23. self._seqno = 0
  24. self._ready = threading.Event()
  25. self._history = FileHistory(os.path.join(os.path.expanduser('~'), '.frida_history'))
  26. self._completer = FridaCompleter(self)
  27. self._cli = None
  28. self._last_change_id = 0
  29. self._script_monitor = None
  30. self._monitored_file = None
  31. super(REPLApplication, self).__init__(self._process_input, self._on_stop)
  32. def _add_options(self, parser):
  33. parser.add_option("-l", "--load", help="load SCRIPT", metavar="SCRIPT",
  34. type='string', action='store', dest="user_script", default=None)
  35. parser.add_option("--no-pause", help="automatically start main thread after startup",
  36. action='store_true', dest="no_pause", default=False)
  37. def _initialize(self, parser, options, args):
  38. self._user_script = options.user_script
  39. self._no_pause = options.no_pause
  40. def _usage(self):
  41. return "usage: %prog [options] target"
  42. def _needs_target(self):
  43. return True
  44. def _start(self):
  45. self._prompt_string = self._create_prompt()
  46. try:
  47. self._load_script()
  48. except Exception as e:
  49. self._update_status("Failed to load script: {error}".format(error=e))
  50. self._exit(1)
  51. return
  52. if self._spawned_argv is not None:
  53. if self._no_pause:
  54. self._update_status("Spawned `{command}`. Resuming main thread!".format(command=" ".join(self._spawned_argv)))
  55. self._do_magic("resume")
  56. else:
  57. self._update_status("Spawned `{command}`. Use %resume to let the main thread start executing!".format(command=" ".join(self._spawned_argv)))
  58. else:
  59. self._clear_status()
  60. self._ready.set()
  61. def _on_stop(self):
  62. def set_return():
  63. raise EOFError()
  64. try:
  65. self._cli.eventloop.call_from_executor(set_return)
  66. except Exception:
  67. pass
  68. def _stop(self):
  69. self._unload_script()
  70. self._unmonitor_script()
  71. def _load_script(self):
  72. self._monitor_script()
  73. self._seqno += 1
  74. script = self._session.create_script(name="repl%d" % self._seqno, source=self._create_repl_script())
  75. script.set_log_handler(self._log)
  76. self._unload_script()
  77. self._script = script
  78. def on_message(message, data):
  79. self._reactor.schedule(lambda: self._process_message(message, data))
  80. script.on('message', on_message)
  81. script.load()
  82. def _unload_script(self):
  83. if self._script is None:
  84. return
  85. try:
  86. self._script.unload()
  87. except:
  88. pass
  89. self._script = None
  90. def _monitor_script(self):
  91. if self._monitored_file == self._user_script:
  92. return
  93. self._unmonitor_script()
  94. if self._user_script is not None:
  95. monitor = frida.FileMonitor(self._user_script)
  96. monitor.on('change', self._on_change)
  97. monitor.enable()
  98. self._script_monitor = monitor
  99. self._monitored_file = self._user_script
  100. def _unmonitor_script(self):
  101. if self._script_monitor is None:
  102. return
  103. self._script_monitor.disable()
  104. self._script_monitor = None
  105. def _process_input(self, reactor):
  106. self._print_startup_message()
  107. while self._ready.wait(0.5) != True:
  108. if not reactor.is_running():
  109. return
  110. while True:
  111. expression = ""
  112. line = ""
  113. while len(expression) == 0 or line.endswith("\\"):
  114. if not reactor.is_running():
  115. return
  116. try:
  117. prompt = "[%s]" % self._prompt_string + "-> " if len(expression) == 0 else "... "
  118. # We create the prompt manually instead of using get_input,
  119. # so we can use the cli in the _on_stop method
  120. eventloop = create_eventloop()
  121. self._cli = CommandLineInterface(
  122. application=create_prompt_application(prompt, history=self._history, completer=self._completer, lexer=JavascriptLexer),
  123. eventloop=eventloop,
  124. output=create_output())
  125. try:
  126. line = None
  127. document = self._cli.run()
  128. if document:
  129. line = document.text
  130. finally:
  131. eventloop.close()
  132. except EOFError:
  133. # An extra newline after EOF to exit the REPL cleanly
  134. self._print("\nThank you for using Frida!")
  135. return
  136. except KeyboardInterrupt:
  137. line = ""
  138. continue
  139. if len(line.strip()) > 0:
  140. if len(expression) > 0:
  141. expression += "\n"
  142. expression += line.rstrip("\\")
  143. if expression.endswith("?"):
  144. try:
  145. self._print_help(expression)
  146. except JavaScriptError as e:
  147. error = e.error
  148. self._print(Fore.RED + Style.BRIGHT + error['name'] + Style.RESET_ALL + ": " + error['message'])
  149. except frida.InvalidOperationError:
  150. return
  151. elif expression.startswith("%"):
  152. self._do_magic(expression[1:].rstrip())
  153. elif expression in ("exit", "quit", "q"):
  154. self._print("Thank you for using Frida!")
  155. return
  156. elif expression == "help":
  157. self._print("Help: #TODO :)")
  158. else:
  159. self._eval_and_print(expression)
  160. def _eval_and_print(self, expression):
  161. try:
  162. (t, value) = self._evaluate(expression)
  163. if t in ('function', 'undefined', 'null'):
  164. output = t
  165. elif t == 'binary':
  166. output = hexdump(value).rstrip("\n")
  167. else:
  168. output = json.dumps(value, sort_keys=True, indent=4, separators=(",", ": "))
  169. except JavaScriptError as e:
  170. error = e.error
  171. output = Fore.RED + Style.BRIGHT + error['name'] + Style.RESET_ALL + ": " + error['message']
  172. except frida.InvalidOperationError:
  173. return
  174. self._print(output)
  175. def _print_startup_message(self):
  176. self._print("""\
  177. ____
  178. / _ | Frida {version} - A world-class dynamic instrumentation framework
  179. | (_| |
  180. > _ | Commands:
  181. /_/ |_| help -> Displays the help system
  182. . . . . object? -> Display information about 'object'
  183. . . . . exit/quit -> Exit
  184. . . . .
  185. . . . . More info at http://www.frida.re/docs/home/""".format(version=frida.__version__))
  186. def _print_help(self, expression):
  187. # TODO: Figure out docstrings and implement here. This is real jankaty right now.
  188. help_text = ""
  189. if expression.endswith(".?"):
  190. expression = expression[:-2] + "?"
  191. obj_to_identify = [x for x in expression.split(' ') if x.endswith("?")][0][:-1]
  192. (obj_type, obj_value) = self._evaluate(obj_to_identify)
  193. if obj_type == "function":
  194. signature = self._evaluate("%s.toString()" % obj_to_identify)[1]
  195. clean_signature = signature.split("{")[0][:-1].split('function ')[-1]
  196. if "[native code]" in signature:
  197. help_text += "Type: Function (native)\n"
  198. else:
  199. help_text += "Type: Function\n"
  200. help_text += "Signature: %s\n" % clean_signature
  201. help_text += "Docstring: #TODO :)"
  202. elif obj_type == "object":
  203. help_text += "Type: Object\n"
  204. help_text += "Docstring: #TODO :)"
  205. elif obj_type == "boolean":
  206. help_text += "Type: Boolean\n"
  207. help_text += "Docstring: #TODO :)"
  208. elif obj_type == "string":
  209. help_text += "Type: Boolean\n"
  210. help_text += "Text: %s\n" % self._evaluate("%s.toString()" % obj_to_identify)[1]
  211. help_text += "Docstring: #TODO :)"
  212. self._print(help_text)
  213. # Negative means at least abs(val) - 1
  214. _magic_command_args = {
  215. 'resume': 0,
  216. 'load': 1,
  217. 'reload': 0,
  218. 'unload': 0,
  219. 'time': -2 # At least 1 arg
  220. }
  221. def _do_magic(self, statement):
  222. tokens = statement.split(" ")
  223. command = tokens[0]
  224. args = tokens[1:]
  225. required_args = self._magic_command_args.get(command)
  226. if required_args == None:
  227. self._print("Unknown command: {}".format(command))
  228. self._print("Valid commands: {}".format(", ".join(self._magic_command_args.keys())))
  229. return
  230. atleast_args = False
  231. if required_args < 0:
  232. atleast_args = True
  233. required_args = abs(required_args) - 1
  234. if (not atleast_args and len(args) != required_args) or \
  235. (atleast_args and len(args) < required_args):
  236. self._print("{cmd} command expects {atleast}{n} argument{s}".format(
  237. cmd=command, atleast='atleast ' if atleast_args else '', n=required_args, s='' if required_args == 1 else ' '))
  238. return
  239. if command == 'resume':
  240. self._reactor.schedule(lambda: self._resume())
  241. elif command == 'load':
  242. old_user_script = self._user_script
  243. self._user_script = os.path.abspath(args[0])
  244. if not self._reload():
  245. self._user_script = old_user_script
  246. elif command == 'reload':
  247. self._reload()
  248. elif command == 'unload':
  249. self._user_script = None
  250. self._reload()
  251. elif command == 'time':
  252. self._eval_and_print('''
  253. (function () {
  254. var _startTime = Date.now();
  255. var _result = eval({expression});
  256. var _endTime = Date.now();
  257. console.log('Time: ' + (_endTime - _startTime).toLocaleString() + ' ms.');
  258. return _result;
  259. })();'''.format(expression=json.dumps(' '.join(args))))
  260. def _reload(self):
  261. completed = threading.Event()
  262. result = [None]
  263. def do_reload():
  264. try:
  265. self._load_script()
  266. except Exception as e:
  267. result[0] = e
  268. completed.set()
  269. self._reactor.schedule(do_reload)
  270. completed.wait()
  271. if result[0] is None:
  272. return True
  273. else:
  274. self._print("Failed to load script: {error}".format(error=result[0]))
  275. return False
  276. def _create_prompt(self):
  277. device_type = self._device.type
  278. type_name = self._target[0]
  279. if self._target[0] == 'pid' and self._target[1] == 0:
  280. target = 'System'
  281. else:
  282. target = self._target[1]
  283. if device_type in ('local', 'remote'):
  284. if self._target[0] == 'name':
  285. type_name = "ProcName"
  286. elif self._target[0] == 'pid':
  287. type_name = "PID"
  288. prompt_string = "%s::%s::%s" % (device_type.title(), type_name, target)
  289. else:
  290. prompt_string = "%s::%s::%s" % ("USB", self._device.name, target)
  291. return prompt_string
  292. def _evaluate(self, text):
  293. result = self._script.exports.evaluate(text)
  294. if is_byte_array(result):
  295. return ('binary', result)
  296. elif isinstance(result, dict):
  297. return ('binary', bytes())
  298. elif result[0] == 'error':
  299. raise JavaScriptError(result[1])
  300. else:
  301. return result
  302. def _process_message(self, message, data):
  303. message_type = message['type']
  304. if message_type == 'error':
  305. text = message.get('stack', message['description'])
  306. self._log('error', text)
  307. else:
  308. self._print("message:", message, "data:", data)
  309. def _on_change(self, changed_file, other_file, event_type):
  310. if event_type == 'changes-done-hint':
  311. return
  312. self._last_change_id += 1
  313. change_id = self._last_change_id
  314. self._reactor.schedule(lambda: self._process_change(change_id), delay=0.05)
  315. def _process_change(self, change_id):
  316. if change_id != self._last_change_id:
  317. return
  318. try:
  319. self._load_script()
  320. except Exception as e:
  321. self._print("Failed to load script: {error}".format(error=e))
  322. def _create_repl_script(self):
  323. user_script = ""
  324. if self._user_script is not None:
  325. with codecs.open(self._user_script, 'rb', 'utf-8') as f:
  326. user_script = f.read().rstrip("\r\n") + "\n\n// Frida REPL script:\n"
  327. return user_script + """\
  328. rpc.exports.evaluate = function (expression) {
  329. try {
  330. var result = (1, eval)(expression);
  331. if (result instanceof ArrayBuffer) {
  332. return result;
  333. } else {
  334. var type = (result === null) ? 'null' : typeof result;
  335. return [type, result];
  336. }
  337. } catch (e) {
  338. return ['error', {
  339. name: e.name,
  340. message: e.message,
  341. stack: e.stack
  342. }];
  343. }
  344. };
  345. """
  346. class FridaCompleter(Completer):
  347. def __init__(self, repl):
  348. self._repl = repl
  349. self._lexer = JavascriptLexer()
  350. def get_completions(self, document, complete_event):
  351. prefix = document.text_before_cursor
  352. magic = len(prefix) > 0 and prefix[0] == '%' and not any(map(lambda c: c.isspace(), prefix))
  353. tokens = list(self._lexer.get_tokens(prefix))[:-1]
  354. # 0.toString() is invalid syntax,
  355. # but pygments doesn't seem to know that
  356. for i in range(len(tokens) - 1):
  357. if tokens[i][0] == Token.Literal.Number.Integer \
  358. and tokens[i + 1][0] == Token.Punctuation and tokens[i + 1][1] == '.':
  359. tokens[i] = (Token.Literal.Number.Float, tokens[i][1] + tokens[i + 1][1])
  360. del tokens[i + 1]
  361. before_dot = ''
  362. after_dot = ''
  363. encountered_dot = False
  364. for t in tokens[::-1]:
  365. if t[0] in Token.Name.subtypes:
  366. before_dot = t[1] + before_dot
  367. elif t[0] == Token.Punctuation and t[1] == '.':
  368. before_dot = '.' + before_dot
  369. if not encountered_dot:
  370. encountered_dot = True
  371. after_dot = before_dot[1:]
  372. before_dot = ''
  373. else:
  374. if encountered_dot:
  375. # The value/contents of the string, number or array doesn't matter,
  376. # so we just use the simplest value with that type
  377. if t[0] in Token.Literal.String.subtypes:
  378. before_dot = '""' + before_dot
  379. elif t[0] in Token.Literal.Number.subtypes:
  380. before_dot = '0.0' + before_dot
  381. elif t[0] == Token.Punctuation and t[1] == ']':
  382. before_dot = '[]' + before_dot
  383. break
  384. try:
  385. if encountered_dot:
  386. for key in self._get_keys("""try {
  387. (function (o) {
  388. "use strict";
  389. var k = Object.getOwnPropertyNames(o);
  390. if (o !== null && o !== undefined) {
  391. var p;
  392. if (typeof o !== 'object')
  393. p = o.__proto__;
  394. else
  395. p = Object.getPrototypeOf(o);
  396. if (p !== null && p !== undefined)
  397. k = k.concat(Object.getOwnPropertyNames(p));
  398. }
  399. return k;
  400. })(""" + before_dot + """);
  401. } catch (e) {
  402. [];
  403. }"""):
  404. if self._pattern_matches(after_dot, key):
  405. yield Completion(key, -len(after_dot))
  406. else:
  407. if magic:
  408. keys = self._repl._magic_command_args.keys()
  409. else:
  410. keys = self._get_keys("Object.getOwnPropertyNames(this)")
  411. for key in keys:
  412. if not self._pattern_matches(before_dot, key) or (key.startswith('_') and before_dot == ''):
  413. continue
  414. yield Completion(key, -len(before_dot))
  415. except frida.InvalidOperationError:
  416. pass
  417. except Exception as e:
  418. self._repl._print(e)
  419. def _get_keys(self, code):
  420. return sorted(
  421. filter(self._is_valid_name,
  422. set(self._repl._evaluate(code)[1])))
  423. def _is_valid_name(self, name):
  424. tokens = list(self._lexer.get_tokens(name))
  425. return len(tokens) == 2 and tokens[0][0] in Token.Name.subtypes
  426. def _pattern_matches(self, pattern, text):
  427. return re.search(re.escape(pattern), text, re.IGNORECASE) != None
  428. def hexdump(src, length=16):
  429. try:
  430. xrange
  431. except NameError:
  432. xrange = range
  433. FILTER = "".join([(len(repr(chr(x))) == 3) and chr(x) or "." for x in range(256)])
  434. lines = []
  435. for c in xrange(0, len(src), length):
  436. chars = src[c:c + length]
  437. hex = " ".join(["%02x" % x for x in iterbytes(chars)])
  438. printable = ''.join(["%s" % ((x <= 127 and FILTER[x]) or ".") for x in iterbytes(chars)])
  439. lines.append("%04x %-*s %s\n" % (c, length * 3, hex, printable))
  440. return "".join(lines)
  441. def is_byte_array(value):
  442. if sys.version_info[0] >= 3:
  443. return isinstance(value, bytes)
  444. else:
  445. return isinstance(value, str)
  446. if sys.version_info[0] >= 3:
  447. iterbytes = lambda x: iter(x)
  448. else:
  449. def iterbytes(data):
  450. return (ord(char) for char in data)
  451. app = REPLApplication()
  452. app.run()
  453. class JavaScriptError(Exception):
  454. def __init__(self, error):
  455. super(JavaScriptError, self).__init__(error['message'])
  456. self.error = error
  457. if __name__ == '__main__':
  458. main()