add support of diagnosis/serial interaction without unix shell

added file transferring (very slow method); shall enable ssh
This commit is contained in:
HappyZ 2018-11-16 04:55:05 -06:00
parent ef37294dfc
commit 373a1158c2
3 changed files with 430 additions and 140 deletions

View File

@ -6,25 +6,34 @@ import argparse
# lib
from python_api.libDPT import DPT
from python_api.libDPT import update_firmware
from python_api.libDPT import obtain_diagnosis_access
from python_api.libInteractive import diagnosis_mode
from python_api.libInteractive import update_firmware
from python_api.libInteractive import obtain_diagnosis_access
def print_info():
print("""Thanks for using DPT Tools.
Type `help` to show this message.
print("""===========
DPT Tools
===========
Thanks for using DPT Tools. Type `help` to show this message.
Supported commands:
fw -- update firmware
root (thanks to shankerzhiwu and his/her anoymous friend) -- obtain root access
fw -- update firmware
root -- obtain root access (thanks to shankerzhiwu and his/her anoymous friend)
diagnosis -- enter diagnosis mode (after you get root access)
exit/quit -- leave the tool
""")
def interactive(dpt):
def interactive(dpt, diagnosis=False):
'''
interactive shell to run commands
@param dpt: DPT object
@param diagnosis: if set True, will directly enter diagnosis mode
'''
firstTime = True
if diagnosis:
diagnosis_mode(dpt)
return
while(1):
if firstTime:
print_info()
@ -48,6 +57,8 @@ def interactive(dpt):
update_firmware(dpt)
elif cmd == 'help' or cmd == 'h':
print_info()
elif cmd == 'diagnosis':
diagnosis_mode(dpt)
def main():
@ -71,6 +82,10 @@ def main():
dest="dpt_addr",
default=None,
help="Hostname or IP address of the device")
p.add_argument(
'--diagnosis',
action='store_true',
help="Run diagnosis mode directly")
p.add_argument(
'--debug', '-d',
action='store_true',
@ -83,11 +98,17 @@ def main():
sys.exit()
dpt = DPT(args.get('apt_addr', None), args.get('debug', False))
if not dpt.authenticate(args.get('dpt_id', ""), args.get('dpt_key', "")):
dpt.err_print("Cannot authenticate. Make sure your id, key, and ip addresses are correct.")
if (
not args.get('diagnosis', False) and
not dpt.authenticate(args.get('dpt_id', ""), args.get('dpt_key', ""))
):
dpt.err_print(
"Cannot authenticate. " +
"Make sure your id, key, and ip addresses are correct."
)
exit(1)
interactive(dpt)
interactive(dpt, diagnosis=args.get('diagnosis', False))
if __name__ == '__main__':

View File

@ -1,7 +1,9 @@
#!/usr/bin/python3
# builtins
# built-ins
import os
import time
import serial
import base64
import httpsig
import urllib3
@ -13,116 +15,6 @@ from urllib.parse import quote_plus
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def validateRequiredFiles(dpt):
requiredFiles = [
'python_api/shankerzhiwu_disableidcheck.pkg',
'python_api/shankerzhiwu_changepwd.pkg'
]
dpt.dbg_print('Checking required files...')
for file in requiredFiles:
if not os.path.isfile(file):
dpt.err_print('File {0} does not exist!'.format(file))
return False
return True
def disable_id_check(dpt):
'''
disable the id check (thanks to shankerzhiwu and his/her friend)
'''
fp = 'python_api/shankerzhiwu_disableidcheck.pkg'
try:
resp = input('>>> Have you disabled the id check already? [yes/no]: ')
if resp == 'no':
if not dpt.update_firmware(open(fp, 'rb')):
dpt.err_print('Failed to upload shankerzhiwu_disableidcheck pkg')
return False
try:
input(
'>>> Press `Enter` key to continue after your DPT reboot, ' +
'shows `update failure` message, and connects back to WiFi: ')
except BaseException as e:
dpt.err_print(str(e))
return False
return True
elif resp == 'yes':
return True
else:
dpt.err_print('Unrecognized response: {}'.format(resp))
return False
except BaseException as e:
dpt.err_print(str(e))
return False
def reset_root_password(dpt):
'''
reset the root password (thanks to shankerzhiwu and his/her friend)
'''
fp = 'python_api/shankerzhiwu_changepwd.pkg'
try:
if not dpt.update_firmware(open(fp, 'rb')):
dpt.err_print('Failed to upload shankerzhiwu_changepwd pkg')
return False
return True
except BaseException as e:
dpt.err_print(str(e))
return False
def obtain_diagnosis_access(dpt):
'''
root thanks to shankerzhiwu
'''
dpt.info_print(
'Please make sure you have charged your battery before this action.')
dpt.info_print(
'Thank shankerzhiwu (and his/her anonymous friend) a lot on this hack!!!' +
'All credits go to him (and his/her anonymous friend)!')
if not validateRequiredFiles(dpt):
return False
# step 1: disable the id check
if not disable_id_check(dpt):
return False
dpt.info_print('Congrats! You are half-way through! You have disabled the OTG ID check')
# step 2: reset root password
if not reset_root_password(dpt):
return False
dpt.info_print(
'You are all set! Wait till your DPT reboots and ' +
'shows `update failure` message! More edits will be added to this tool.')
return True
def update_firmware(dpt):
'''
update firmware interface
'''
dpt.info_print(
'Please make sure you have charged your battery before this action.')
try:
resp = input('>>> Please enter the pkg file path: ')
if not os.path.isfile(resp):
dpt.err_print('File `{}` does not exist!'.format(resp))
return False
resp2 = input('>>> Pleae confirm {} is the pkg file to use [yes/no]: ')
if resp2 == 'yes':
if not dpt.update_firmware(open(resp, 'rb')):
dpt.err_print('Failed to upload pkg {}'.format(resp))
return False
dpt.info_print('Success!')
return True
elif resp == 'no':
dpt.info_print('Okay!')
return False
else:
dpt.err_print('Unrecognized response: {}'.format(resp))
return False
except BaseException as e:
dpt.err_print(str(e))
return False
class DPT():
def __init__(self, addr=None, debug=False):
'''
@ -141,8 +33,117 @@ class DPT():
self.cookies = {}
# setup base url
self.base_url = "https://{0}:8443".format(self.addr)
# holder of diagnosis serial
self.serial = None
self.serialReadTimeout = 1 # default read timeout is 1sec
def runCmd(self):
'''
diagnosis mode related
'''
def connect_to_diagnosis(self, ttyName):
'''
connect to diagnosis
'''
try:
ser = serial.Serial(ttyName, 115200, timeout=self.serialReadTimeout)
# ser.open()
if not ser.is_open:
raise BaseException
self.serial = ser
except BaseException as e:
self.err_print(
"Cannot open serial port {0} due to {1}"
.format(ttyName, str(e)))
return False
return True
def diagnosis_login(self, username, password):
'''
login onto DPT diagnosis mode
'''
if self.serial is None:
return False
try:
self.serial.write(b'\n') # poke
resp = self.serial.read(50)
self.dbg_print(resp)
if b'login' in resp:
self.dbg_print('Entering username {}'.format(username))
self.serial.write(username.encode() + b'\n')
resp = self.serial.read(50)
self.dbg_print(resp)
if b'Password' in resp:
self.dbg_print('Entering password {}'.format(password))
self.serial.write(password.encode() + b'\n')
resp = self.serial.read(80)
self.dbg_print(resp)
except serial.SerialTimeoutException as e:
self.err_print('Timeout: {}'.format(e))
except BaseException as e:
self.err_print(str(e))
return False
if b'# ' in resp:
return True
return False
def diagnosis_isfile(self, fp):
'''
check if file exists given file path
'''
cmd = "[ -f {} ] && echo 'YESS' || echo 'NONO'".format(fp)
return 'YESS' in self.diagnosis_write(cmd)
def diagnosis_isfolder(self, folderp):
'''
check if file exists given file path
'''
cmd = "[ -d {} ] && echo 'YESS' || echo 'NONO'".format(folderp)
return 'YESS' in self.diagnosis_write(cmd)
def diagnosis_write(self, cmd, echo=False):
'''
write cmd and read feedbacks
'''
if self.serial is None:
return ""
if 'less ' in cmd:
self.err_print('do not support less/more')
try:
self.serial.write(cmd.encode() + b'\n')
# change timeout to (nearly) blocking first to read
self.serial.timeout = 99
resp = self.serial.read_until(b'# ')
# change back the original timeout
self.serial.timeout = self.serialReadTimeout
self.dbg_print(resp)
except serial.SerialTimeoutException as e:
self.err_print('Timeout: {}'.format(e))
except BaseException as e:
self.err_print(str(e))
return ""
if echo:
return resp.decode("utf-8").replace('\r\r\n', '')
return resp.decode("utf-8").replace('\r\r\n', '').replace(cmd, '')
def shut_down_diagnosis(self):
'''
close serial connection
'''
if self.serial is not None:
try:
self.serial.close()
except BaseException as e:
self.err_print('Cannot close serial port')
return False
return True
'''
Web interface related
'''
def run_cmd(self):
# self._put_api_with_cookies(
# "/notify/login_result", data={"value": "this is a test"}
# )
@ -150,8 +151,8 @@ class DPT():
# if folder_id:
# self.delete_folder(folder_id, force=False)
# self._get_api_with_cookies("/folders2/c18c51bb-4323-4e9c-b874-40288e20227b")
self._get_testmode_auth_nonce("testmode")
self._get_api_with_cookies("/testmode/auth/nonce")
# self._get_testmode_auth_nonce("testmode")
# self._get_api_with_cookies("/testmode/auth/nonce")
pass
def commands_need_testmode_authentication(self):
@ -451,21 +452,24 @@ class DPT():
ok_code=204, cookies=None, data={}, files={}, headers={}
):
apiurl = "{0}{1}".format(self.base_url, api)
r = requests.put(
apiurl,
cookies=cookies, verify=False,
json=data, files=files, headers=headers)
if r.status_code is ok_code:
self.dbg_print(apiurl)
if r.text:
self.dbg_print(r.json())
return r.json()
return {"status": "ok"}
self.err_print(apiurl)
try:
self.err_request_print(r.status_code, r.json())
except BaseException:
self.err_print("{}, {}".format(r.status_code, r.text))
r = requests.put(
apiurl,
cookies=cookies, verify=False,
json=data, files=files, headers=headers)
if r.status_code is ok_code:
self.dbg_print(apiurl)
if r.text:
self.dbg_print(r.json())
return r.json()
return {"status": "ok"}
self.err_print(apiurl)
try:
self.err_request_print(r.status_code, r.json())
except BaseException:
self.err_print("{}, {}".format(r.status_code, r.text))
except BaseException as e:
self.err_print(str(e))
return {}
def _put_api_with_cookies(self, api, ok_code=204, data={}, files={}):
@ -568,4 +572,4 @@ if __name__ == '__main__':
# f.write(dpt.get_past_logs())
if args['dpt_fwfh'] and os.path.isfile(args['dpt_fwfh']):
dpt.update_firmware(open(args['dpt_fwfh'], 'rb'))
# dpt.runCmd()
# dpt.run_cmd()

View File

@ -0,0 +1,265 @@
#!/usr/bin/python3
# built-ins
import os
import time
# import traceback
def validate_required_files(dpt):
requiredFiles = [
'python_api/shankerzhiwu_disableidcheck.pkg',
'python_api/shankerzhiwu_changepwd.pkg'
]
dpt.dbg_print('Checking required files...')
for file in requiredFiles:
if not os.path.isfile(file):
dpt.err_print('File {0} does not exist!'.format(file))
return False
return True
def disable_id_check(dpt):
'''
disable the id check (thanks to shankerzhiwu and his/her friend)
'''
fp = 'python_api/shankerzhiwu_disableidcheck.pkg'
try:
resp = input('>>> Have you disabled the id check already? [yes/no]: ')
if resp == 'no':
if not dpt.update_firmware(open(fp, 'rb')):
dpt.err_print('Failed to upload shankerzhiwu_disableidcheck pkg')
return False
try:
input(
'>>> Press `Enter` key to continue after your DPT reboot, ' +
'shows `update failure` message, and connects back to WiFi: ')
except BaseException as e:
dpt.err_print(str(e))
return False
return True
elif resp == 'yes':
return True
else:
dpt.err_print('Unrecognized response: {}'.format(resp))
except BaseException as e:
dpt.err_print(str(e))
return False
def reset_root_password(dpt):
'''
reset the root password (thanks to shankerzhiwu and his/her friend)
'''
fp = 'python_api/shankerzhiwu_changepwd.pkg'
try:
if not dpt.update_firmware(open(fp, 'rb')):
dpt.err_print('Failed to upload shankerzhiwu_changepwd pkg')
return False
return True
except BaseException as e:
dpt.err_print(str(e))
return False
def obtain_diagnosis_access(dpt):
'''
root thanks to shankerzhiwu
'''
dpt.info_print(
'Please make sure you have charged your battery before this action.')
dpt.info_print(
'Thank shankerzhiwu (and his/her anonymous friend) a lot on this hack!!!' +
'All credits go to him (and his/her anonymous friend)!')
if not validate_required_files(dpt):
return False
# step 1: disable the id check
if not disable_id_check(dpt):
return False
dpt.info_print('Congrats! You are half-way through! You have disabled the OTG ID check')
# step 2: reset root password
if not reset_root_password(dpt):
return False
dpt.info_print(
'You are all set! Wait till your DPT reboots and ' +
'shows `update failure` message! More edits will be added to this tool.')
return True
def update_firmware(dpt):
'''
update firmware interface
'''
dpt.info_print(
'Please make sure you have charged your battery before this action.')
try:
resp = input('>>> Please enter the pkg file path: ')
if not os.path.isfile(resp):
dpt.err_print('File `{}` does not exist!'.format(resp))
return False
resp2 = input('>>> Pleae confirm {} is the pkg file to use [yes/no]: ')
if resp2 == 'yes':
if not dpt.update_firmware(open(resp, 'rb')):
dpt.err_print('Failed to upload pkg {}'.format(resp))
return False
dpt.info_print('Success!')
return True
elif resp == 'no':
dpt.info_print('Okay!')
else:
dpt.err_print('Unrecognized response: {}'.format(resp))
except BaseException as e:
dpt.err_print(str(e))
return False
def print_diagnosis_info():
print("""============================
DPT Tools - Diagnosis Mode
============================
This is diagnosis mode. Type `help` to show this message.
It behaves similarly to regular serial session with less flexibility (cannot use tab, scroll up, quick reverse search, etc.).
This mode intends to automate some complicated procedures.
Supported commands:
`transfer-file` -- transfer file to DPT at 512bps
`exit`/`quit` -- leave the tool
and many unix cmds (do not support less/head)
""")
def diagnosis_transfer_file(dpt, chunkSize=128):
'''
transfer file through echo in diagnosis (serial) mode
using echo is dumb and slow but very reliable
limited to 128 bytes per sec since we send raw bytes in
string (each byte sent = 4 bytes), and terminal at best
allows 1024 bytes to send
do NOT transfer large file using this, it will take
forever to finish..
'''
try:
# get local file path
localfp = input('> Local file path: ')
while localfp[-1] == ' ': # remove extra spaces
localfp = localfp[:-1]
if not os.path.isfile(localfp):
dpt.err_print('File {} does not exist!'.format(localfp))
return False
# get remote folder and validate it
folder = input('> DPT folder path: ')
# folder does not exit, create one?
if not dpt.diagnosis_isfolder(folder):
resp = input('> {} not exist, create? [yes/no]: '.format(folder))
if resp == 'no':
return False
elif resp == 'yes':
dpt.diagnosis_write('mkdir -p {}'.format(folder))
else:
dpt.err_print('Unrecognized input {}'.format(resp))
return False
# remote file exists, overwrite it?
remotefp = "{0}/{1}".format(folder, os.path.basename(localfp))
if dpt.diagnosis_isfile(remotefp):
resp = input('> {} exist, overwrite? [yes/no]: '.format(remotefp))
if resp == 'no':
return False
elif not resp == 'yes':
dpt.err_print('Unrecognized input {}'.format(resp))
return False
firstRun = True
symbol = '>'
startTime = int(time.time() * 1000)
with open(localfp, 'rb') as f:
while 1:
chunk = f.read(chunkSize)
if chunk:
cmd = "echo -e -n '\\x{0}' {1} {2}".format(
'\\x'.join('{:02x}'.format(x) for x in chunk),
symbol,
remotefp
)
dpt.diagnosis_write(cmd)
else:
break
if firstRun:
symbol = '>>'
firstRun = False
duration = int(time.time() * 1000) - startTime
dpt.info_print('Finished in {0:.2f}sec'.format(duration / 1000.0))
except BaseException as e:
dpt.err_print(str(e))
return False
def diagnosis_cmd(dpt):
'''
run commands in diagnosis mode
'''
# login
if not dpt.diagnosis_login(username='root', password='12345'):
dpt.err_print('failed to login..')
return
# interactive mode
firstTime = True
frontLine = 'root #: '
while 1:
if firstTime:
print_diagnosis_info()
firstTime = False
try:
cmd = input(frontLine)
if cmd == 'exit' or cmd == 'quit':
break
elif cmd == 'help':
print_diagnosis_info()
continue
elif cmd == 'transfer-file':
diagnosis_transfer_file(dpt)
continue
rawresp = dpt.diagnosis_write(cmd)
# ignore first and last echos
tmp = rawresp.splitlines()
frontLine = tmp[-1]
resp = tmp[1:-1]
for line in resp:
print(line)
except KeyboardInterrupt:
break
except EOFError:
break
except BaseException as e:
dpt.err_print(str(e))
def diagnosis_mode(dpt):
'''
enter diagnosis mode
'''
dpt.info_print('Steps to enter diagnosis mode:')
dpt.info_print('1. Turn of DPT')
dpt.info_print('2. Hold HOME button')
dpt.info_print('3. Press POWER button once. Then light blinks yellow')
dpt.info_print('4. Wait till a black square appear on the screen')
dpt.info_print('5. Connect to computer')
try:
resp = input('>>> Black square on the screen? [yes/no]: ')
if resp == 'no':
return False
elif not resp == 'yes':
dpt.err_print('Unrecognized response: {}'.format(resp))
return False
ttyName = input('>>> Enter the serial port [/dev/tty.usbmodem01]: ')
if ttyName == "":
ttyName = "/dev/tty.usbmodem01"
if not os.path.exists(ttyName):
dpt.err_print('serial `{}` not exists!'.format(ttyName))
return False
except BaseException as e:
dpt.err_print(str(e))
return False
if not dpt.connect_to_diagnosis(ttyName):
return False
diagnosis_cmd(dpt)
dpt.shut_down_diagnosis()
return True