123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535 |
- # -*- coding: utf-8 -*-
- from __future__ import unicode_literals, print_function
- def main():
- import codecs
- from colorama import Fore, Style
- import frida
- from frida.application import ConsoleApplication
- import json
- import os
- import platform
- import re
- from prompt_toolkit.shortcuts import create_prompt_application, create_output, create_eventloop
- from prompt_toolkit.history import FileHistory
- from prompt_toolkit.completion import Completion, Completer
- from prompt_toolkit.interface import CommandLineInterface
- from pygments.lexers import JavascriptLexer
- from pygments.token import Token
- import sys
- import threading
- class REPLApplication(ConsoleApplication):
- def __init__(self):
- self._script = None
- self._seqno = 0
- self._ready = threading.Event()
- self._history = FileHistory(os.path.join(os.path.expanduser('~'), '.frida_history'))
- self._completer = FridaCompleter(self)
- self._cli = None
- self._last_change_id = 0
- self._script_monitor = None
- self._monitored_file = None
- super(REPLApplication, self).__init__(self._process_input, self._on_stop)
- def _add_options(self, parser):
- parser.add_option("-l", "--load", help="load SCRIPT", metavar="SCRIPT",
- type='string', action='store', dest="user_script", default=None)
- parser.add_option("--no-pause", help="automatically start main thread after startup",
- action='store_true', dest="no_pause", default=False)
- def _initialize(self, parser, options, args):
- self._user_script = options.user_script
- self._no_pause = options.no_pause
- def _usage(self):
- return "usage: %prog [options] target"
- def _needs_target(self):
- return True
- def _start(self):
- self._prompt_string = self._create_prompt()
- try:
- self._load_script()
- except Exception as e:
- self._update_status("Failed to load script: {error}".format(error=e))
- self._exit(1)
- return
- if self._spawned_argv is not None:
- if self._no_pause:
- self._update_status("Spawned `{command}`. Resuming main thread!".format(command=" ".join(self._spawned_argv)))
- self._do_magic("resume")
- else:
- self._update_status("Spawned `{command}`. Use %resume to let the main thread start executing!".format(command=" ".join(self._spawned_argv)))
- else:
- self._clear_status()
- self._ready.set()
- def _on_stop(self):
- def set_return():
- raise EOFError()
- try:
- self._cli.eventloop.call_from_executor(set_return)
- except Exception:
- pass
- def _stop(self):
- self._unload_script()
- self._unmonitor_script()
- def _load_script(self):
- self._monitor_script()
- self._seqno += 1
- script = self._session.create_script(name="repl%d" % self._seqno, source=self._create_repl_script())
- script.set_log_handler(self._log)
- self._unload_script()
- self._script = script
- def on_message(message, data):
- self._reactor.schedule(lambda: self._process_message(message, data))
- script.on('message', on_message)
- script.load()
- def _unload_script(self):
- if self._script is None:
- return
- try:
- self._script.unload()
- except:
- pass
- self._script = None
- def _monitor_script(self):
- if self._monitored_file == self._user_script:
- return
- self._unmonitor_script()
- if self._user_script is not None:
- monitor = frida.FileMonitor(self._user_script)
- monitor.on('change', self._on_change)
- monitor.enable()
- self._script_monitor = monitor
- self._monitored_file = self._user_script
- def _unmonitor_script(self):
- if self._script_monitor is None:
- return
- self._script_monitor.disable()
- self._script_monitor = None
- def _process_input(self, reactor):
- self._print_startup_message()
- while self._ready.wait(0.5) != True:
- if not reactor.is_running():
- return
- while True:
- expression = ""
- line = ""
- while len(expression) == 0 or line.endswith("\\"):
- if not reactor.is_running():
- return
- try:
- prompt = "[%s]" % self._prompt_string + "-> " if len(expression) == 0 else "... "
- # We create the prompt manually instead of using get_input,
- # so we can use the cli in the _on_stop method
- eventloop = create_eventloop()
- self._cli = CommandLineInterface(
- application=create_prompt_application(prompt, history=self._history, completer=self._completer, lexer=JavascriptLexer),
- eventloop=eventloop,
- output=create_output())
- try:
- line = None
- document = self._cli.run()
- if document:
- line = document.text
- finally:
- eventloop.close()
- except EOFError:
- # An extra newline after EOF to exit the REPL cleanly
- self._print("\nThank you for using Frida!")
- return
- except KeyboardInterrupt:
- line = ""
- continue
- if len(line.strip()) > 0:
- if len(expression) > 0:
- expression += "\n"
- expression += line.rstrip("\\")
- if expression.endswith("?"):
- try:
- self._print_help(expression)
- except JavaScriptError as e:
- error = e.error
- self._print(Fore.RED + Style.BRIGHT + error['name'] + Style.RESET_ALL + ": " + error['message'])
- except frida.InvalidOperationError:
- return
- elif expression.startswith("%"):
- self._do_magic(expression[1:].rstrip())
- elif expression in ("exit", "quit", "q"):
- self._print("Thank you for using Frida!")
- return
- elif expression == "help":
- self._print("Help: #TODO :)")
- else:
- self._eval_and_print(expression)
- def _eval_and_print(self, expression):
- try:
- (t, value) = self._evaluate(expression)
- if t in ('function', 'undefined', 'null'):
- output = t
- elif t == 'binary':
- output = hexdump(value).rstrip("\n")
- else:
- output = json.dumps(value, sort_keys=True, indent=4, separators=(",", ": "))
- except JavaScriptError as e:
- error = e.error
- output = Fore.RED + Style.BRIGHT + error['name'] + Style.RESET_ALL + ": " + error['message']
- except frida.InvalidOperationError:
- return
- self._print(output)
- def _print_startup_message(self):
- self._print("""\
- ____
- / _ | Frida {version} - A world-class dynamic instrumentation framework
- | (_| |
- > _ | Commands:
- /_/ |_| help -> Displays the help system
- . . . . object? -> Display information about 'object'
- . . . . exit/quit -> Exit
- . . . .
- . . . . More info at http://www.frida.re/docs/home/""".format(version=frida.__version__))
- def _print_help(self, expression):
- # TODO: Figure out docstrings and implement here. This is real jankaty right now.
- help_text = ""
- if expression.endswith(".?"):
- expression = expression[:-2] + "?"
- obj_to_identify = [x for x in expression.split(' ') if x.endswith("?")][0][:-1]
- (obj_type, obj_value) = self._evaluate(obj_to_identify)
- if obj_type == "function":
- signature = self._evaluate("%s.toString()" % obj_to_identify)[1]
- clean_signature = signature.split("{")[0][:-1].split('function ')[-1]
- if "[native code]" in signature:
- help_text += "Type: Function (native)\n"
- else:
- help_text += "Type: Function\n"
- help_text += "Signature: %s\n" % clean_signature
- help_text += "Docstring: #TODO :)"
- elif obj_type == "object":
- help_text += "Type: Object\n"
- help_text += "Docstring: #TODO :)"
- elif obj_type == "boolean":
- help_text += "Type: Boolean\n"
- help_text += "Docstring: #TODO :)"
- elif obj_type == "string":
- help_text += "Type: Boolean\n"
- help_text += "Text: %s\n" % self._evaluate("%s.toString()" % obj_to_identify)[1]
- help_text += "Docstring: #TODO :)"
- self._print(help_text)
- # Negative means at least abs(val) - 1
- _magic_command_args = {
- 'resume': 0,
- 'load': 1,
- 'reload': 0,
- 'unload': 0,
- 'time': -2 # At least 1 arg
- }
- def _do_magic(self, statement):
- tokens = statement.split(" ")
- command = tokens[0]
- args = tokens[1:]
- required_args = self._magic_command_args.get(command)
- if required_args == None:
- self._print("Unknown command: {}".format(command))
- self._print("Valid commands: {}".format(", ".join(self._magic_command_args.keys())))
- return
- atleast_args = False
- if required_args < 0:
- atleast_args = True
- required_args = abs(required_args) - 1
- if (not atleast_args and len(args) != required_args) or \
- (atleast_args and len(args) < required_args):
- self._print("{cmd} command expects {atleast}{n} argument{s}".format(
- cmd=command, atleast='atleast ' if atleast_args else '', n=required_args, s='' if required_args == 1 else ' '))
- return
- if command == 'resume':
- self._reactor.schedule(lambda: self._resume())
- elif command == 'load':
- old_user_script = self._user_script
- self._user_script = os.path.abspath(args[0])
- if not self._reload():
- self._user_script = old_user_script
- elif command == 'reload':
- self._reload()
- elif command == 'unload':
- self._user_script = None
- self._reload()
- elif command == 'time':
- self._eval_and_print('''
- (function () {
- var _startTime = Date.now();
- var _result = eval({expression});
- var _endTime = Date.now();
- console.log('Time: ' + (_endTime - _startTime).toLocaleString() + ' ms.');
- return _result;
- })();'''.format(expression=json.dumps(' '.join(args))))
- def _reload(self):
- completed = threading.Event()
- result = [None]
- def do_reload():
- try:
- self._load_script()
- except Exception as e:
- result[0] = e
- completed.set()
- self._reactor.schedule(do_reload)
- completed.wait()
- if result[0] is None:
- return True
- else:
- self._print("Failed to load script: {error}".format(error=result[0]))
- return False
- def _create_prompt(self):
- device_type = self._device.type
- type_name = self._target[0]
- if self._target[0] == 'pid' and self._target[1] == 0:
- target = 'System'
- else:
- target = self._target[1]
- if device_type in ('local', 'remote'):
- if self._target[0] == 'name':
- type_name = "ProcName"
- elif self._target[0] == 'pid':
- type_name = "PID"
- prompt_string = "%s::%s::%s" % (device_type.title(), type_name, target)
- else:
- prompt_string = "%s::%s::%s" % ("USB", self._device.name, target)
- return prompt_string
- def _evaluate(self, text):
- result = self._script.exports.evaluate(text)
- if is_byte_array(result):
- return ('binary', result)
- elif isinstance(result, dict):
- return ('binary', bytes())
- elif result[0] == 'error':
- raise JavaScriptError(result[1])
- else:
- return result
- def _process_message(self, message, data):
- message_type = message['type']
- if message_type == 'error':
- text = message.get('stack', message['description'])
- self._log('error', text)
- else:
- self._print("message:", message, "data:", data)
- def _on_change(self, changed_file, other_file, event_type):
- if event_type == 'changes-done-hint':
- return
- self._last_change_id += 1
- change_id = self._last_change_id
- self._reactor.schedule(lambda: self._process_change(change_id), delay=0.05)
- def _process_change(self, change_id):
- if change_id != self._last_change_id:
- return
- try:
- self._load_script()
- except Exception as e:
- self._print("Failed to load script: {error}".format(error=e))
- def _create_repl_script(self):
- user_script = ""
- if self._user_script is not None:
- with codecs.open(self._user_script, 'rb', 'utf-8') as f:
- user_script = f.read().rstrip("\r\n") + "\n\n// Frida REPL script:\n"
- return user_script + """\
- rpc.exports.evaluate = function (expression) {
- try {
- var result = (1, eval)(expression);
- if (result instanceof ArrayBuffer) {
- return result;
- } else {
- var type = (result === null) ? 'null' : typeof result;
- return [type, result];
- }
- } catch (e) {
- return ['error', {
- name: e.name,
- message: e.message,
- stack: e.stack
- }];
- }
- };
- """
- class FridaCompleter(Completer):
- def __init__(self, repl):
- self._repl = repl
- self._lexer = JavascriptLexer()
- def get_completions(self, document, complete_event):
- prefix = document.text_before_cursor
- magic = len(prefix) > 0 and prefix[0] == '%' and not any(map(lambda c: c.isspace(), prefix))
- tokens = list(self._lexer.get_tokens(prefix))[:-1]
- # 0.toString() is invalid syntax,
- # but pygments doesn't seem to know that
- for i in range(len(tokens) - 1):
- if tokens[i][0] == Token.Literal.Number.Integer \
- and tokens[i + 1][0] == Token.Punctuation and tokens[i + 1][1] == '.':
- tokens[i] = (Token.Literal.Number.Float, tokens[i][1] + tokens[i + 1][1])
- del tokens[i + 1]
- before_dot = ''
- after_dot = ''
- encountered_dot = False
- for t in tokens[::-1]:
- if t[0] in Token.Name.subtypes:
- before_dot = t[1] + before_dot
- elif t[0] == Token.Punctuation and t[1] == '.':
- before_dot = '.' + before_dot
- if not encountered_dot:
- encountered_dot = True
- after_dot = before_dot[1:]
- before_dot = ''
- else:
- if encountered_dot:
- # The value/contents of the string, number or array doesn't matter,
- # so we just use the simplest value with that type
- if t[0] in Token.Literal.String.subtypes:
- before_dot = '""' + before_dot
- elif t[0] in Token.Literal.Number.subtypes:
- before_dot = '0.0' + before_dot
- elif t[0] == Token.Punctuation and t[1] == ']':
- before_dot = '[]' + before_dot
- break
- try:
- if encountered_dot:
- for key in self._get_keys("""try {
- (function (o) {
- "use strict";
- var k = Object.getOwnPropertyNames(o);
- if (o !== null && o !== undefined) {
- var p;
- if (typeof o !== 'object')
- p = o.__proto__;
- else
- p = Object.getPrototypeOf(o);
- if (p !== null && p !== undefined)
- k = k.concat(Object.getOwnPropertyNames(p));
- }
- return k;
- })(""" + before_dot + """);
- } catch (e) {
- [];
- }"""):
- if self._pattern_matches(after_dot, key):
- yield Completion(key, -len(after_dot))
- else:
- if magic:
- keys = self._repl._magic_command_args.keys()
- else:
- keys = self._get_keys("Object.getOwnPropertyNames(this)")
- for key in keys:
- if not self._pattern_matches(before_dot, key) or (key.startswith('_') and before_dot == ''):
- continue
- yield Completion(key, -len(before_dot))
- except frida.InvalidOperationError:
- pass
- except Exception as e:
- self._repl._print(e)
- def _get_keys(self, code):
- return sorted(
- filter(self._is_valid_name,
- set(self._repl._evaluate(code)[1])))
- def _is_valid_name(self, name):
- tokens = list(self._lexer.get_tokens(name))
- return len(tokens) == 2 and tokens[0][0] in Token.Name.subtypes
- def _pattern_matches(self, pattern, text):
- return re.search(re.escape(pattern), text, re.IGNORECASE) != None
- def hexdump(src, length=16):
- try:
- xrange
- except NameError:
- xrange = range
- FILTER = "".join([(len(repr(chr(x))) == 3) and chr(x) or "." for x in range(256)])
- lines = []
- for c in xrange(0, len(src), length):
- chars = src[c:c + length]
- hex = " ".join(["%02x" % x for x in iterbytes(chars)])
- printable = ''.join(["%s" % ((x <= 127 and FILTER[x]) or ".") for x in iterbytes(chars)])
- lines.append("%04x %-*s %s\n" % (c, length * 3, hex, printable))
- return "".join(lines)
- def is_byte_array(value):
- if sys.version_info[0] >= 3:
- return isinstance(value, bytes)
- else:
- return isinstance(value, str)
- if sys.version_info[0] >= 3:
- iterbytes = lambda x: iter(x)
- else:
- def iterbytes(data):
- return (ord(char) for char in data)
- app = REPLApplication()
- app.run()
- class JavaScriptError(Exception):
- def __init__(self, error):
- super(JavaScriptError, self).__init__(error['message'])
- self.error = error
- if __name__ == '__main__':
- main()
|