| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- # -*- coding: utf-8 -*-
- from __future__ import unicode_literals, print_function
- import collections
- import errno
- from optparse import OptionParser
- import os
- import platform
- import select
- import sys
- import threading
- import time
- import colorama
- from colorama import Fore, Style
- import frida
- if platform.system() == 'Windows':
- import msvcrt
def input_with_timeout(timeout):
    """Wait up to *timeout* seconds for a line of console input.

    Returns the entered text, or None if the timeout expired first.

    On Windows the console is polled through msvcrt; the returned text holds
    the characters typed before Enter (without a line terminator), and
    characters typed before a timeout are discarded. On other platforms the
    wait is done with select() on sys.stdin and the result of readline() is
    returned unchanged.
    """
    if platform.system() == 'Windows':
        start_time = time.time()
        s = ''
        while True:
            while msvcrt.kbhit():
                # getwche() returns a text character on both Python 2 and 3,
                # so it can be appended to the text buffer directly.
                c = msvcrt.getwche()
                if ord(c) == 13:  # Enter
                    # Return right away; merely leaving the inner loop would
                    # keep polling until the timeout and report no input.
                    return s
                elif ord(c) >= 32:  # space and above
                    s += c
            if time.time() - start_time > timeout:
                return None
            # Yield the CPU between polls instead of spinning.
            time.sleep(0.01)
    else:
        while True:
            try:
                rlist, _, _ = select.select([sys.stdin], [], [], timeout)
                break
            except (OSError, select.error) as e:
                # A signal interrupting the wait is not an error: retry.
                if e.args[0] != errno.EINTR:
                    raise
        if rlist:
            return sys.stdin.readline()
        return None
def await_enter(reactor):
    """Block until a line is entered on the console or *reactor* stops.

    Input is polled in half-second slices via input_with_timeout() so that a
    reactor that is no longer running is noticed between polls. A
    KeyboardInterrupt ends the wait and prints an empty line.
    """
    try:
        # Identity comparison: None is the "timed out, nothing entered" marker.
        while input_with_timeout(0.5) is None:
            if not reactor.is_running():
                break
    except KeyboardInterrupt:
        print('')
class ConsoleState:
    """Tags describing what was written to the console most recently."""

    # EMPTY: nothing written yet; STATUS: a status line; TEXT: ordinary output.
    EMPTY, STATUS, TEXT = 1, 2, 3
class ConsoleApplication(object):
    """Base class for console tools that reach a target through frida.

    The constructor parses the command line (device selection and, when
    ``_needs_target()`` is true, the program to spawn or process to attach
    to). ``run()`` then drives the application from a Reactor until the
    reactor is stopped, tears everything down and exits the process.

    Hooks for subclasses:

    * ``_usage()`` -- usage string handed to OptionParser. The constructor
      calls it, but this class does not define it, so subclasses supply it.
    * ``_add_options(parser)`` -- register additional command-line options.
    * ``_initialize(parser, options, args)`` -- consume the parsed options.
    * ``_needs_target()`` -- return True if a target must be specified.
    * ``_start()`` -- called once the device (and session, if any) is ready.
    * ``_stop()`` -- called during teardown if ``_start()`` was reached.
    """

    def __init__(self, run_until_return=await_enter, on_stop=None):
        """Parse sys.argv and prepare (but do not start) the application.

        run_until_return -- callable receiving the Reactor; ``run()`` blocks
            in it and shuts down once it returns.
        on_stop -- optional callable handed to the Reactor, which invokes it
            when its worker loop has finished.
        """
        colorama.init()

        parser = OptionParser(usage=self._usage(), version=frida.__version__)
        parser.add_option("-D", "--device", help="connect to device with the given ID",
                          metavar="ID", type='string', action='store', dest="device_id", default=None)
        parser.add_option("-U", "--usb", help="connect to USB device",
                          action='store_const', const='tether', dest="device_type", default=None)
        parser.add_option("-R", "--remote", help="connect to remote frida-server",
                          action='store_const', const='remote', dest="device_type", default=None)
        parser.add_option("-H", "--host", help="connect to remote frida-server on HOST",
                          metavar="HOST", type='string', action='store', dest="host", default=None)
        if self._needs_target():
            def store_target(option, opt_str, target_value, parser, target_type, *args, **kwargs):
                # All three target options store a (type, value) pair under
                # the same attribute; a file target is kept as an argv list.
                if target_type == 'file':
                    target_value = [target_value]
                setattr(parser.values, 'target', (target_type, target_value))
            parser.add_option("-f", "--file", help="spawn FILE", metavar="FILE",
                              type='string', action='callback', callback=store_target, callback_args=('file',))
            parser.add_option("-n", "--attach-name", help="attach to NAME", metavar="NAME",
                              type='string', action='callback', callback=store_target, callback_args=('name',))
            parser.add_option("-p", "--attach-pid", help="attach to PID", metavar="PID",
                              type='int', action='callback', callback=store_target, callback_args=('pid',))
            parser.add_option("--debug", help="enable the Node.js compatible script debugger",
                              action='store_true', dest="enable_debugger", default=False)
            parser.add_option("--disable-jit", help="disable JIT",
                              action='store_true', dest="disable_jit", default=False)

        self._add_options(parser)

        (options, args) = parser.parse_args()

        if sys.version_info[0] < 3:
            # Python 2 hands over byte strings; turn them into text.
            input_encoding = sys.stdin.encoding or 'UTF-8'
            args = [arg.decode(input_encoding) for arg in args]

        self._device_id = options.device_id
        self._device_type = options.device_type
        self._host = options.host
        self._device = None
        # Event handlers only re-post onto the reactor, so the real handling
        # happens on the reactor's worker thread.
        self._schedule_on_output = lambda pid, fd, data: self._reactor.schedule(lambda: self._on_output(pid, fd, data))
        self._schedule_on_device_lost = lambda: self._reactor.schedule(self._on_device_lost)
        self._spawned_pid = None
        self._spawned_argv = None
        self._session = None
        if self._needs_target():
            self._enable_debugger = options.enable_debugger
            self._disable_jit = options.disable_jit
        else:
            # --debug / --disable-jit are only registered for target-based tools.
            self._enable_debugger = False
            self._disable_jit = False
        self._schedule_on_session_detached = lambda: self._reactor.schedule(self._on_session_detached)
        self._started = False
        self._resumed = False
        self._reactor = Reactor(run_until_return, on_stop)
        self._exit_status = None
        self._console_state = ConsoleState.EMPTY

        selectors = (self._device_id, self._device_type, self._host)
        if sum(1 for value in selectors if value is not None) > 1:
            parser.error("Only one of -D, -U, -R, and -H may be specified")

        if self._needs_target():
            target = getattr(options, 'target', None)
            if target is None:
                # No -f/-n/-p given: the first positional argument is the target.
                if len(args) < 1:
                    parser.error("target file, process name or pid must be specified")
                target = infer_target(args[0])
                args.pop(0)
            target = expand_target(target)
            if target[0] == 'file':
                # Remaining positional arguments become the spawned program's argv.
                argv = target[1]
                argv.extend(args)
                args = []
            self._target = target
        else:
            self._target = None

        self._initialize(parser, options, args)

    def run(self):
        """Run until the reactor returns, tear down, then exit the process.

        Always ends by calling ``sys.exit()`` with the status recorded via
        ``_exit()`` (None if it was never called).
        """
        mgr = frida.get_device_manager()

        # Retry starting whenever the set of devices changes.
        on_devices_changed = lambda: self._reactor.schedule(self._try_start)
        mgr.on('changed', on_devices_changed)

        self._reactor.schedule(self._try_start)
        self._reactor.schedule(self._show_message_if_no_device, delay=0.1)

        self._reactor.run()

        if self._started:
            self._stop()

        if self._session is not None:
            self._session.off('detached', self._schedule_on_session_detached)
            self._session.detach()
            self._session = None

        if self._spawned_pid is not None:
            try:
                self._device.kill(self._spawned_pid)
            except Exception:
                # Best effort only: a failure to kill must not block shutdown.
                pass

        if self._device is not None:
            self._device.off('output', self._schedule_on_output)
            self._device.off('lost', self._schedule_on_device_lost)

        mgr.off('changed', on_devices_changed)

        frida.shutdown()
        sys.exit(self._exit_status)

    def _add_options(self, parser):
        """Hook: register additional options on *parser*. Does nothing here."""
        pass

    def _initialize(self, parser, options, args):
        """Hook: receive the parsed *options* and leftover *args*. Does nothing here."""
        pass

    def _needs_target(self):
        """Hook: return True if the tool requires a target. False here."""
        return False

    def _start(self):
        """Hook: called by ``_try_start()`` once the device/session is set up."""
        pass

    def _stop(self):
        """Hook: called by ``run()`` during teardown if ``_start()`` was reached."""
        pass

    def _resume(self):
        """Resume the spawned process, if there is one; later calls do nothing."""
        if self._resumed:
            return
        if self._spawned_pid is not None:
            self._device.resume(self._spawned_pid)
        self._resumed = True

    def _exit(self, exit_status):
        """Record *exit_status* for ``sys.exit()`` and stop the reactor."""
        self._exit_status = exit_status
        self._reactor.stop()

    def _try_start(self):
        """Acquire the device, spawn/attach to the target and call ``_start()``.

        Does nothing if a device has already been acquired. When a device
        type was requested and no such device is present yet, returns quietly;
        ``run()`` schedules this method again when the device list changes.
        """
        if self._device is not None:
            return
        if self._device_id is not None:
            try:
                self._device = frida.get_device(self._device_id)
            except Exception:
                self._update_status("Device '%s' not found" % self._device_id)
                self._exit(1)
                return
        elif self._device_type is not None:
            self._device = find_device(self._device_type)
            if self._device is None:
                return
        elif self._host is not None:
            self._device = frida.get_device_manager().add_remote_device(self._host)
        else:
            self._device = frida.get_local_device()
        self._device.on('output', self._schedule_on_output)
        self._device.on('lost', self._schedule_on_device_lost)
        if self._target is not None:
            # Tracks which phase failed so the right message is reported.
            spawning = True
            try:
                target_type, target_value = self._target
                if target_type == 'file':
                    argv = target_value
                    self._update_status("Spawning `%s`..." % " ".join(argv))
                    self._spawned_pid = self._device.spawn(argv)
                    self._spawned_argv = argv
                    attach_target = self._spawned_pid
                else:
                    attach_target = target_value
                    self._update_status("Attaching...")
                spawning = False
                self._session = self._device.attach(attach_target)
                if self._disable_jit:
                    self._session.disable_jit()
                if self._enable_debugger:
                    self._session.enable_debugger()
                    self._print("Debugger listening on port 5858\n")
                self._session.on('detached', self._schedule_on_session_detached)
            except Exception as e:
                if spawning:
                    self._update_status("Failed to spawn: %s" % e)
                else:
                    self._update_status("Failed to attach: %s" % e)
                self._exit(1)
                return
        self._start()
        self._started = True

    def _show_message_if_no_device(self):
        """Print a waiting notice if no device has been acquired yet."""
        if self._device is None:
            self._print("Waiting for USB device to appear...")

    def _on_output(self, pid, fd, data):
        """Print output received from the device, one prefixed line at a time.

        fd 1 is labelled ``stdout> ``; every other fd is labelled
        ``stderr> ``. A *data* of None is ignored.
        """
        if data is None:
            return
        if fd == 1:
            prefix = "stdout> "
            stream = sys.stdout
        else:
            prefix = "stderr> "
            stream = sys.stderr
        encoding = stream.encoding or 'UTF-8'
        text = data.decode(encoding, errors='replace')
        # Drop one trailing newline so no empty prefixed line is printed.
        if text.endswith("\n"):
            text = text[:-1]
        lines = text.split("\n")
        self._print(prefix + ("\n" + prefix).join(lines))

    def _on_device_lost(self):
        """Report the lost device and exit with status 1, unless already exiting."""
        if self._exit_status is not None:
            return
        self._print("Device disconnected.")
        self._exit(1)

    def _on_session_detached(self):
        """Report that the session detached and exit with status 1."""
        self._print("Target process terminated.")
        self._exit(1)

    def _clear_status(self):
        """Blank out the status line if that was the last thing printed."""
        if self._console_state == ConsoleState.STATUS:
            # ESC[A moves the cursor up one line, onto the status line.
            print("\033[A" + (80 * " "))

    def _update_status(self, message):
        """Print *message* in bright style, overwriting a previous status line."""
        if self._console_state == ConsoleState.STATUS:
            cursor_position = "\033[A"
        else:
            cursor_position = ""
        print("%-80s" % (cursor_position + Style.BRIGHT + message + Style.RESET_ALL,))
        self._console_state = ConsoleState.STATUS

    def _print(self, *args, **kwargs):
        """print() wrapper for text the console encoding may not represent.

        Text arguments are encoded with stdout's encoding (UTF-8 if unknown),
        replacing unencodable characters. On Python 3 the bytes are decoded
        back with that same encoding; on Python 2 the encoded byte string is
        printed. Non-text arguments are passed through untouched.
        """
        encoded_args = []
        is_py3 = sys.version_info[0] >= 3
        if is_py3:
            string_type = str
        else:
            string_type = unicode
        encoding = sys.stdout.encoding or 'UTF-8'
        for arg in args:
            if isinstance(arg, string_type):
                data = arg.encode(encoding, errors='replace')
                # Decode with the codec used for encoding. An escape codec
                # here would misread multi-byte characters and turn literal
                # backslashes in the text into control characters.
                encoded_args.append(data.decode(encoding) if is_py3 else data)
            else:
                encoded_args.append(arg)
        print(*encoded_args, **kwargs)
        self._console_state = ConsoleState.TEXT

    def _log(self, level, text):
        """Print *text*: plain for 'info', bright red for 'error', else bright yellow."""
        if level == 'info':
            self._print(text)
        else:
            color = Fore.RED if level == 'error' else Fore.YELLOW
            self._print(color + Style.BRIGHT + text + Style.RESET_ALL)
def find_device(type):
    """Return the first device from frida.enumerate_devices() whose ``type``
    attribute equals *type*, or None when there is no such device."""
    matching = (candidate for candidate in frida.enumerate_devices()
                if candidate.type == type)
    return next(matching, None)
def infer_target(target_value):
    """Guess what kind of target a bare command-line argument denotes.

    Returns a ``(target_type, target_value)`` tuple:

    * ``('file', [path])`` if the argument starts with '.', with the OS path
      separator, or (on Windows) with a drive letter, a colon and a backslash;
    * ``('pid', number)`` if the argument parses as an integer;
    * ``('name', text)`` otherwise.
    """
    looks_like_path = target_value.startswith('.') or target_value.startswith(os.path.sep)
    if not looks_like_path and platform.system() == 'Windows':
        # Slices rather than indexing, so that empty or very short arguments
        # simply fail the check instead of raising IndexError.
        looks_like_path = target_value[:1].isalpha() and target_value[1:3] == ":\\"

    if looks_like_path:
        return ('file', [target_value])

    try:
        return ('pid', int(target_value))
    except ValueError:
        # Not a number, so treat it as a process name.
        return ('name', target_value)
def expand_target(target):
    """Return *target* with a 'file' value reduced to a fresh one-element list.

    For a ``('file', argv)`` target the result holds a new list containing
    only ``argv[0]``; any other target comes back with its value unchanged.
    """
    kind, value = target
    if kind != 'file':
        return (kind, value)
    program = value[0]
    return (kind, [program])
class Reactor(object):
    """Runs scheduled callables one at a time on a dedicated worker thread.

    ``run()`` starts the worker and blocks the calling thread inside the
    *run_until_return* callable; when that returns the reactor is stopped and
    the worker joined. ``schedule()`` and ``stop()`` may be called from any
    thread. If given, *on_stop* is invoked on the worker thread once its loop
    has ended.
    """

    def __init__(self, run_until_return, on_stop=None):
        self._running = False
        self._run_until_return = run_until_return
        self._on_stop = on_stop
        # Entries are (callable, due_time) tuples, in scheduling order.
        self._pending = collections.deque([])
        self._lock = threading.Lock()
        self._cond = threading.Condition(self._lock)

    def is_running(self):
        """Return True between the start of ``run()`` and a call to ``stop()``."""
        with self._lock:
            return self._running

    def run(self):
        """Start the worker, block in *run_until_return*, then stop and join.

        The worker is stopped and joined even if *run_until_return* raises;
        the exception then propagates to the caller.
        """
        with self._lock:
            self._running = True

        worker = threading.Thread(target=self._run)
        worker.start()

        try:
            self._run_until_return(self)
        finally:
            self.stop()
            worker.join()

    def _run(self):
        """Worker loop: run at most one due item per pass, then wait."""
        running = True
        while running:
            now = time.time()
            work = None
            with self._lock:
                for item in self._pending:
                    (f, when) = item
                    if now >= when:
                        work = f
                        self._pending.remove(item)
                        break

            # Run the item without holding the lock so that it, and other
            # threads, can schedule more work in the meantime.
            if work is not None:
                work()

            with self._lock:
                if self._running:
                    # Compute the timeout only now, under the lock: anything
                    # scheduled while the item ran was notified when nobody
                    # was waiting, so it must be picked up from the queue.
                    self._cond.wait(self._time_until_next_item())
                running = self._running

        if self._on_stop is not None:
            self._on_stop()

    def _time_until_next_item(self):
        """Seconds until the earliest pending item is due (0 if already due),
        or None when nothing is pending. The caller must hold the lock."""
        if not self._pending:
            return None
        earliest = min(when for (_, when) in self._pending)
        return max(earliest - time.time(), 0)

    def stop(self):
        """Ask the worker loop to finish and wake it if it is waiting."""
        with self._lock:
            self._running = False
            self._cond.notify()

    def schedule(self, f, delay=None):
        """Queue callable *f* to run on the worker thread.

        With *delay* (in seconds) the call becomes due that long from now;
        without it, it is due immediately.
        """
        now = time.time()
        if delay is not None:
            when = now + delay
        else:
            when = now
        with self._lock:
            self._pending.append((f, when))
            self._cond.notify()
|