From 373a1158c250b5020b77a1e379f9c0b7c60fe445 Mon Sep 17 00:00:00 2001 From: HappyZ Date: Fri, 16 Nov 2018 04:55:05 -0600 Subject: [PATCH] add support of diagnosis/serial interaction without unix shell added file transferring (very slow method); shall enable ssh --- dpt-tools.py | 43 ++++-- python_api/libDPT.py | 262 +++++++++++++++++----------------- python_api/libInteractive.py | 265 +++++++++++++++++++++++++++++++++++ 3 files changed, 430 insertions(+), 140 deletions(-) create mode 100644 python_api/libInteractive.py diff --git a/dpt-tools.py b/dpt-tools.py index c806162..a2df522 100644 --- a/dpt-tools.py +++ b/dpt-tools.py @@ -6,25 +6,34 @@ import argparse # lib from python_api.libDPT import DPT -from python_api.libDPT import update_firmware -from python_api.libDPT import obtain_diagnosis_access +from python_api.libInteractive import diagnosis_mode +from python_api.libInteractive import update_firmware +from python_api.libInteractive import obtain_diagnosis_access def print_info(): - print("""Thanks for using DPT Tools. -Type `help` to show this message. - + print("""=========== + DPT Tools +=========== +Thanks for using DPT Tools. Type `help` to show this message. Supported commands: - fw -- update firmware - root (thanks to shankerzhiwu and his/her anoymous friend) -- obtain root access + fw -- update firmware + root -- obtain root access (thanks to shankerzhiwu and his/her anoymous friend) + diagnosis -- enter diagnosis mode (after you get root access) + exit/quit -- leave the tool """) -def interactive(dpt): +def interactive(dpt, diagnosis=False): ''' interactive shell to run commands + @param dpt: DPT object + @param diagnosis: if set True, will directly enter diagnosis mode ''' firstTime = True + if diagnosis: + diagnosis_mode(dpt) + return while(1): if firstTime: print_info() @@ -48,6 +57,8 @@ def interactive(dpt): update_firmware(dpt) elif cmd == 'help' or cmd == 'h': print_info() + elif cmd == 'diagnosis': + diagnosis_mode(dpt) def main(): @@ -71,6 +82,10 @@ def main(): dest="dpt_addr", default=None, help="Hostname or IP address of the device") + p.add_argument( + '--diagnosis', + action='store_true', + help="Run diagnosis mode directly") p.add_argument( '--debug', '-d', action='store_true', @@ -83,11 +98,17 @@ def main(): sys.exit() dpt = DPT(args.get('apt_addr', None), args.get('debug', False)) - if not dpt.authenticate(args.get('dpt_id', ""), args.get('dpt_key', "")): - dpt.err_print("Cannot authenticate. Make sure your id, key, and ip addresses are correct.") + if ( + not args.get('diagnosis', False) and + not dpt.authenticate(args.get('dpt_id', ""), args.get('dpt_key', "")) + ): + dpt.err_print( + "Cannot authenticate. " + + "Make sure your id, key, and ip addresses are correct." + ) exit(1) - interactive(dpt) + interactive(dpt, diagnosis=args.get('diagnosis', False)) if __name__ == '__main__': diff --git a/python_api/libDPT.py b/python_api/libDPT.py index 1c89fe9..6f87c5f 100644 --- a/python_api/libDPT.py +++ b/python_api/libDPT.py @@ -1,7 +1,9 @@ #!/usr/bin/python3 -# builtins +# built-ins import os +import time +import serial import base64 import httpsig import urllib3 @@ -13,116 +15,6 @@ from urllib.parse import quote_plus urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -def validateRequiredFiles(dpt): - requiredFiles = [ - 'python_api/shankerzhiwu_disableidcheck.pkg', - 'python_api/shankerzhiwu_changepwd.pkg' - ] - dpt.dbg_print('Checking required files...') - for file in requiredFiles: - if not os.path.isfile(file): - dpt.err_print('File {0} does not exist!'.format(file)) - return False - return True - - -def disable_id_check(dpt): - ''' - disable the id check (thanks to shankerzhiwu and his/her friend) - ''' - fp = 'python_api/shankerzhiwu_disableidcheck.pkg' - try: - resp = input('>>> Have you disabled the id check already? [yes/no]: ') - if resp == 'no': - if not dpt.update_firmware(open(fp, 'rb')): - dpt.err_print('Failed to upload shankerzhiwu_disableidcheck pkg') - return False - try: - input( - '>>> Press `Enter` key to continue after your DPT reboot, ' + - 'shows `update failure` message, and connects back to WiFi: ') - except BaseException as e: - dpt.err_print(str(e)) - return False - return True - elif resp == 'yes': - return True - else: - dpt.err_print('Unrecognized response: {}'.format(resp)) - return False - except BaseException as e: - dpt.err_print(str(e)) - return False - - -def reset_root_password(dpt): - ''' - reset the root password (thanks to shankerzhiwu and his/her friend) - ''' - fp = 'python_api/shankerzhiwu_changepwd.pkg' - try: - if not dpt.update_firmware(open(fp, 'rb')): - dpt.err_print('Failed to upload shankerzhiwu_changepwd pkg') - return False - return True - except BaseException as e: - dpt.err_print(str(e)) - return False - - -def obtain_diagnosis_access(dpt): - ''' - root thanks to shankerzhiwu - ''' - dpt.info_print( - 'Please make sure you have charged your battery before this action.') - dpt.info_print( - 'Thank shankerzhiwu (and his/her anonymous friend) a lot on this hack!!!' + - 'All credits go to him (and his/her anonymous friend)!') - if not validateRequiredFiles(dpt): - return False - # step 1: disable the id check - if not disable_id_check(dpt): - return False - dpt.info_print('Congrats! You are half-way through! You have disabled the OTG ID check') - # step 2: reset root password - if not reset_root_password(dpt): - return False - dpt.info_print( - 'You are all set! Wait till your DPT reboots and ' + - 'shows `update failure` message! More edits will be added to this tool.') - return True - - -def update_firmware(dpt): - ''' - update firmware interface - ''' - dpt.info_print( - 'Please make sure you have charged your battery before this action.') - try: - resp = input('>>> Please enter the pkg file path: ') - if not os.path.isfile(resp): - dpt.err_print('File `{}` does not exist!'.format(resp)) - return False - resp2 = input('>>> Pleae confirm {} is the pkg file to use [yes/no]: ') - if resp2 == 'yes': - if not dpt.update_firmware(open(resp, 'rb')): - dpt.err_print('Failed to upload pkg {}'.format(resp)) - return False - dpt.info_print('Success!') - return True - elif resp == 'no': - dpt.info_print('Okay!') - return False - else: - dpt.err_print('Unrecognized response: {}'.format(resp)) - return False - except BaseException as e: - dpt.err_print(str(e)) - return False - - class DPT(): def __init__(self, addr=None, debug=False): ''' @@ -141,8 +33,117 @@ class DPT(): self.cookies = {} # setup base url self.base_url = "https://{0}:8443".format(self.addr) + # holder of diagnosis serial + self.serial = None + self.serialReadTimeout = 1 # default read timeout is 1sec - def runCmd(self): + ''' + diagnosis mode related + ''' + + def connect_to_diagnosis(self, ttyName): + ''' + connect to diagnosis + ''' + try: + ser = serial.Serial(ttyName, 115200, timeout=self.serialReadTimeout) + # ser.open() + if not ser.is_open: + raise BaseException + self.serial = ser + except BaseException as e: + self.err_print( + "Cannot open serial port {0} due to {1}" + .format(ttyName, str(e))) + return False + return True + + def diagnosis_login(self, username, password): + ''' + login onto DPT diagnosis mode + ''' + if self.serial is None: + return False + try: + self.serial.write(b'\n') # poke + resp = self.serial.read(50) + self.dbg_print(resp) + if b'login' in resp: + self.dbg_print('Entering username {}'.format(username)) + self.serial.write(username.encode() + b'\n') + resp = self.serial.read(50) + self.dbg_print(resp) + if b'Password' in resp: + self.dbg_print('Entering password {}'.format(password)) + self.serial.write(password.encode() + b'\n') + resp = self.serial.read(80) + self.dbg_print(resp) + except serial.SerialTimeoutException as e: + self.err_print('Timeout: {}'.format(e)) + except BaseException as e: + self.err_print(str(e)) + return False + if b'# ' in resp: + return True + return False + + def diagnosis_isfile(self, fp): + ''' + check if file exists given file path + ''' + cmd = "[ -f {} ] && echo 'YESS' || echo 'NONO'".format(fp) + return 'YESS' in self.diagnosis_write(cmd) + + def diagnosis_isfolder(self, folderp): + ''' + check if file exists given file path + ''' + cmd = "[ -d {} ] && echo 'YESS' || echo 'NONO'".format(folderp) + return 'YESS' in self.diagnosis_write(cmd) + + def diagnosis_write(self, cmd, echo=False): + ''' + write cmd and read feedbacks + ''' + if self.serial is None: + return "" + if 'less ' in cmd: + self.err_print('do not support less/more') + try: + self.serial.write(cmd.encode() + b'\n') + # change timeout to (nearly) blocking first to read + self.serial.timeout = 99 + resp = self.serial.read_until(b'# ') + # change back the original timeout + self.serial.timeout = self.serialReadTimeout + self.dbg_print(resp) + except serial.SerialTimeoutException as e: + self.err_print('Timeout: {}'.format(e)) + except BaseException as e: + self.err_print(str(e)) + return "" + if echo: + return resp.decode("utf-8").replace('\r\r\n', '') + return resp.decode("utf-8").replace('\r\r\n', '').replace(cmd, '') + + + def shut_down_diagnosis(self): + ''' + close serial connection + ''' + if self.serial is not None: + try: + self.serial.close() + except BaseException as e: + self.err_print('Cannot close serial port') + return False + return True + + ''' + Web interface related + ''' + + def run_cmd(self): # self._put_api_with_cookies( # "/notify/login_result", data={"value": "this is a test"} # ) @@ -150,8 +151,8 @@ class DPT(): # if folder_id: # self.delete_folder(folder_id, force=False) # self._get_api_with_cookies("/folders2/c18c51bb-4323-4e9c-b874-40288e20227b") - self._get_testmode_auth_nonce("testmode") - self._get_api_with_cookies("/testmode/auth/nonce") + # self._get_testmode_auth_nonce("testmode") + # self._get_api_with_cookies("/testmode/auth/nonce") pass def commands_need_testmode_authentication(self): @@ -451,21 +452,24 @@ class DPT(): ok_code=204, cookies=None, data={}, files={}, headers={} ): apiurl = "{0}{1}".format(self.base_url, api) - r = requests.put( - apiurl, - cookies=cookies, verify=False, - json=data, files=files, headers=headers) - if r.status_code is ok_code: - self.dbg_print(apiurl) - if r.text: - self.dbg_print(r.json()) - return r.json() - return {"status": "ok"} - self.err_print(apiurl) try: - self.err_request_print(r.status_code, r.json()) - except BaseException: - self.err_print("{}, {}".format(r.status_code, r.text)) + r = requests.put( + apiurl, + cookies=cookies, verify=False, + json=data, files=files, headers=headers) + if r.status_code is ok_code: + self.dbg_print(apiurl) + if r.text: + self.dbg_print(r.json()) + return r.json() + return {"status": "ok"} + self.err_print(apiurl) + try: + self.err_request_print(r.status_code, r.json()) + except BaseException: + self.err_print("{}, {}".format(r.status_code, r.text)) + except BaseException as e: + self.err_print(str(e)) return {} def _put_api_with_cookies(self, api, ok_code=204, data={}, files={}): @@ -568,4 +572,4 @@ if __name__ == '__main__': # f.write(dpt.get_past_logs()) if args['dpt_fwfh'] and os.path.isfile(args['dpt_fwfh']): dpt.update_firmware(open(args['dpt_fwfh'], 'rb')) - # dpt.runCmd() \ No newline at end of file + # dpt.run_cmd() \ No newline at end of file diff --git a/python_api/libInteractive.py b/python_api/libInteractive.py new file mode 100644 index 0000000..82ca35e --- /dev/null +++ b/python_api/libInteractive.py @@ -0,0 +1,265 @@ +#!/usr/bin/python3 + +# built-ins +import os +import time +# import traceback + + +def validate_required_files(dpt): + requiredFiles = [ + 'python_api/shankerzhiwu_disableidcheck.pkg', + 'python_api/shankerzhiwu_changepwd.pkg' + ] + dpt.dbg_print('Checking required files...') + for file in requiredFiles: + if not os.path.isfile(file): + dpt.err_print('File {0} does not exist!'.format(file)) + return False + return True + + +def disable_id_check(dpt): + ''' + disable the id check (thanks to shankerzhiwu and his/her friend) + ''' + fp = 'python_api/shankerzhiwu_disableidcheck.pkg' + try: + resp = input('>>> Have you disabled the id check already? [yes/no]: ') + if resp == 'no': + if not dpt.update_firmware(open(fp, 'rb')): + dpt.err_print('Failed to upload shankerzhiwu_disableidcheck pkg') + return False + try: + input( + '>>> Press `Enter` key to continue after your DPT reboot, ' + + 'shows `update failure` message, and connects back to WiFi: ') + except BaseException as e: + dpt.err_print(str(e)) + return False + return True + elif resp == 'yes': + return True + else: + dpt.err_print('Unrecognized response: {}'.format(resp)) + except BaseException as e: + dpt.err_print(str(e)) + return False + + +def reset_root_password(dpt): + ''' + reset the root password (thanks to shankerzhiwu and his/her friend) + ''' + fp = 'python_api/shankerzhiwu_changepwd.pkg' + try: + if not dpt.update_firmware(open(fp, 'rb')): + dpt.err_print('Failed to upload shankerzhiwu_changepwd pkg') + return False + return True + except BaseException as e: + dpt.err_print(str(e)) + return False + + +def obtain_diagnosis_access(dpt): + ''' + root thanks to shankerzhiwu + ''' + dpt.info_print( + 'Please make sure you have charged your battery before this action.') + dpt.info_print( + 'Thank shankerzhiwu (and his/her anonymous friend) a lot on this hack!!!' + + 'All credits go to him (and his/her anonymous friend)!') + if not validate_required_files(dpt): + return False + # step 1: disable the id check + if not disable_id_check(dpt): + return False + dpt.info_print('Congrats! You are half-way through! You have disabled the OTG ID check') + # step 2: reset root password + if not reset_root_password(dpt): + return False + dpt.info_print( + 'You are all set! Wait till your DPT reboots and ' + + 'shows `update failure` message! More edits will be added to this tool.') + return True + + +def update_firmware(dpt): + ''' + update firmware interface + ''' + dpt.info_print( + 'Please make sure you have charged your battery before this action.') + try: + resp = input('>>> Please enter the pkg file path: ') + if not os.path.isfile(resp): + dpt.err_print('File `{}` does not exist!'.format(resp)) + return False + resp2 = input('>>> Pleae confirm {} is the pkg file to use [yes/no]: ') + if resp2 == 'yes': + if not dpt.update_firmware(open(resp, 'rb')): + dpt.err_print('Failed to upload pkg {}'.format(resp)) + return False + dpt.info_print('Success!') + return True + elif resp == 'no': + dpt.info_print('Okay!') + else: + dpt.err_print('Unrecognized response: {}'.format(resp)) + except BaseException as e: + dpt.err_print(str(e)) + return False + + +def print_diagnosis_info(): + print("""============================ + DPT Tools - Diagnosis Mode +============================ +This is diagnosis mode. Type `help` to show this message. +It behaves similarly to regular serial session with less flexibility (cannot use tab, scroll up, quick reverse search, etc.). +This mode intends to automate some complicated procedures. + +Supported commands: + `transfer-file` -- transfer file to DPT at 512bps + `exit`/`quit` -- leave the tool + and many unix cmds (do not support less/head) +""") + + +def diagnosis_transfer_file(dpt, chunkSize=128): + ''' + transfer file through echo in diagnosis (serial) mode + using echo is dumb and slow but very reliable + limited to 128 bytes per sec since we send raw bytes in + string (each byte sent = 4 bytes), and terminal at best + allows 1024 bytes to send + do NOT transfer large file using this, it will take + forever to finish.. + ''' + try: + # get local file path + localfp = input('> Local file path: ') + while localfp[-1] == ' ': # remove extra spaces + localfp = localfp[:-1] + if not os.path.isfile(localfp): + dpt.err_print('File {} does not exist!'.format(localfp)) + return False + # get remote folder and validate it + folder = input('> DPT folder path: ') + # folder does not exit, create one? + if not dpt.diagnosis_isfolder(folder): + resp = input('> {} not exist, create? [yes/no]: '.format(folder)) + if resp == 'no': + return False + elif resp == 'yes': + dpt.diagnosis_write('mkdir -p {}'.format(folder)) + else: + dpt.err_print('Unrecognized input {}'.format(resp)) + return False + # remote file exists, overwrite it? + remotefp = "{0}/{1}".format(folder, os.path.basename(localfp)) + if dpt.diagnosis_isfile(remotefp): + resp = input('> {} exist, overwrite? [yes/no]: '.format(remotefp)) + if resp == 'no': + return False + elif not resp == 'yes': + dpt.err_print('Unrecognized input {}'.format(resp)) + return False + firstRun = True + symbol = '>' + startTime = int(time.time() * 1000) + with open(localfp, 'rb') as f: + while 1: + chunk = f.read(chunkSize) + if chunk: + cmd = "echo -e -n '\\x{0}' {1} {2}".format( + '\\x'.join('{:02x}'.format(x) for x in chunk), + symbol, + remotefp + ) + dpt.diagnosis_write(cmd) + else: + break + if firstRun: + symbol = '>>' + firstRun = False + duration = int(time.time() * 1000) - startTime + dpt.info_print('Finished in {0:.2f}sec'.format(duration / 1000.0)) + except BaseException as e: + dpt.err_print(str(e)) + return False + + +def diagnosis_cmd(dpt): + ''' + run commands in diagnosis mode + ''' + # login + if not dpt.diagnosis_login(username='root', password='12345'): + dpt.err_print('failed to login..') + return + # interactive mode + firstTime = True + frontLine = 'root #: ' + while 1: + if firstTime: + print_diagnosis_info() + firstTime = False + try: + cmd = input(frontLine) + if cmd == 'exit' or cmd == 'quit': + break + elif cmd == 'help': + print_diagnosis_info() + continue + elif cmd == 'transfer-file': + diagnosis_transfer_file(dpt) + continue + rawresp = dpt.diagnosis_write(cmd) + # ignore first and last echos + tmp = rawresp.splitlines() + frontLine = tmp[-1] + resp = tmp[1:-1] + for line in resp: + print(line) + except KeyboardInterrupt: + break + except EOFError: + break + except BaseException as e: + dpt.err_print(str(e)) + + +def diagnosis_mode(dpt): + ''' + enter diagnosis mode + ''' + dpt.info_print('Steps to enter diagnosis mode:') + dpt.info_print('1. Turn of DPT') + dpt.info_print('2. Hold HOME button') + dpt.info_print('3. Press POWER button once. Then light blinks yellow') + dpt.info_print('4. Wait till a black square appear on the screen') + dpt.info_print('5. Connect to computer') + try: + resp = input('>>> Black square on the screen? [yes/no]: ') + if resp == 'no': + return False + elif not resp == 'yes': + dpt.err_print('Unrecognized response: {}'.format(resp)) + return False + ttyName = input('>>> Enter the serial port [/dev/tty.usbmodem01]: ') + if ttyName == "": + ttyName = "/dev/tty.usbmodem01" + if not os.path.exists(ttyName): + dpt.err_print('serial `{}` not exists!'.format(ttyName)) + return False + except BaseException as e: + dpt.err_print(str(e)) + return False + if not dpt.connect_to_diagnosis(ttyName): + return False + diagnosis_cmd(dpt) + dpt.shut_down_diagnosis() + return True