parent
0f61bfe432
commit
789299b46e
|
|
@ -106,10 +106,20 @@ class DPT():
|
|||
back up boot partition to /tmp/ folder
|
||||
'''
|
||||
cmd = 'dd if=/dev/mmcblk0p8 of=/tmp/boot.img.bak bs=4M'
|
||||
self.diagnosis_write(cmd, timeout=60)
|
||||
self.diagnosis_write(cmd, timeout=999)
|
||||
if not self.diagnosis_isfile('/tmp/boot.img.bak'):
|
||||
self.err_print('Failed to backup!')
|
||||
self.err_print('Failed to dump boot.img.bak!')
|
||||
return None
|
||||
return "/tmp/boot.img.bak"
|
||||
|
||||
def diagnosis_restore_boot(self, fp="/tmp/boot.img.bak"):
|
||||
if not self.diagnosis_isfile(fp):
|
||||
self.dbg_print("{} does not exist".format(fp))
|
||||
return False
|
||||
cmd = "dd if='{}' of=/dev/mmcblk0p8 bs=4M".format(fp)
|
||||
self.info_print("Fingercrossing.. Do NOT power off device!")
|
||||
# need to be extra careful here
|
||||
self.diagnosis_write(cmd, timeout=99999)
|
||||
return True
|
||||
|
||||
def diagnosis_write(self, cmd, echo=False, timeout=99):
|
||||
|
|
@ -130,6 +140,19 @@ class DPT():
|
|||
self.dbg_print(resp)
|
||||
except serial.SerialTimeoutException as e:
|
||||
self.err_print('Timeout: {}'.format(e))
|
||||
self.err_print("Do NOT panic. Command may be still running.")
|
||||
self.err_print("Do NOT power off the device")
|
||||
self.err_print("Quit the tool. You need manual troubleshooting:")
|
||||
self.err_print("1. See if you can get back into tty in terminal")
|
||||
self.err_print(" If not, unplug the cable and plug it back in,")
|
||||
self.err_print(" try tty again in terminal")
|
||||
self.err_print("2. Once you get in, try press `Enter` to see")
|
||||
self.err_print(" the response. It could be still running so")
|
||||
self.err_print(" nothing responded. Suggest not to kill it")
|
||||
self.err_print("3. Be patient. If it stucks for hours, then kill")
|
||||
self.err_print(" the process by pressing Ctrl + C. Depending on")
|
||||
self.err_print(" what you ran, you may or may not have troubles")
|
||||
self.err_print("4. Worst case is to flash the stock pkg, I think.")
|
||||
except BaseException as e:
|
||||
self.err_print(str(e))
|
||||
return ""
|
||||
|
|
|
|||
|
|
@ -123,14 +123,16 @@ It behaves similarly to regular serial session with less flexibility (cannot use
|
|||
This mode intends to automate some complicated procedures.
|
||||
|
||||
Supported commands:
|
||||
`push-file` -- transfer file to DPT at 512bps
|
||||
`pull-file` -- transfer file from DPT
|
||||
`exit`/`quit` -- leave the tool
|
||||
`push-file` -- transfer file to DPT at 512bps
|
||||
`pull-file` -- transfer file from DPT
|
||||
`backup-bootimg` -- backup the boot img and download it to local device
|
||||
`restore-bootimg` -- restore the boot img
|
||||
`exit`/`quit` -- leave the tool
|
||||
and many unix cmds (do not support less/head)
|
||||
""")
|
||||
|
||||
|
||||
def diagnosis_pull_file(dpt):
|
||||
def diagnosis_pull_file(dpt, remotefp=None, folder=None, overwrite=None):
|
||||
'''
|
||||
pull file from device to local via xxd and parsing in
|
||||
python
|
||||
|
|
@ -139,51 +141,54 @@ def diagnosis_pull_file(dpt):
|
|||
'''
|
||||
try:
|
||||
# get and validate remote file path
|
||||
remotefp = input('> DPT file path: ')
|
||||
if remotefp is None:
|
||||
remotefp = input('> DPT file path: ')
|
||||
if not dpt.diagnosis_isfile(remotefp):
|
||||
dpt.err_print('File {} does not exist!'.format(remotefp))
|
||||
return False
|
||||
return None
|
||||
# get local folder path
|
||||
folder = input('> Local folder path: ')
|
||||
if not os.path.isdir(folder):
|
||||
resp = input('> {} not exist, create? [yes/no]: '.format(folder))
|
||||
if resp == 'no':
|
||||
return False
|
||||
elif resp == 'yes':
|
||||
os.makedirs(folder)
|
||||
else:
|
||||
dpt.err_print('Unrecognized input {}'.format(resp))
|
||||
return False
|
||||
if folder is None:
|
||||
folder = input('> Local folder path: ')
|
||||
if not os.path.isdir(folder):
|
||||
resp = input(
|
||||
'> {} not exist, create? [yes/no]: '.format(folder))
|
||||
if resp == 'no':
|
||||
return None
|
||||
elif resp == 'yes':
|
||||
os.makedirs(folder)
|
||||
else:
|
||||
dpt.err_print('Unrecognized input {}'.format(resp))
|
||||
return None
|
||||
# check if local fp exists
|
||||
localfp = "{0}/{1}".format(folder, os.path.basename(remotefp))
|
||||
if os.path.isfile(localfp):
|
||||
if os.path.isfile(localfp) and overwrite is None::
|
||||
resp = input('> {} exist, overwrite? [yes/no]: '.format(localfp))
|
||||
if resp == 'no':
|
||||
return False
|
||||
elif not resp == 'yes':
|
||||
dpt.err_print('Unrecognized input {}'.format(resp))
|
||||
return False
|
||||
# read from xxd, parse, and write to local file
|
||||
startTime = int(time.time() * 1000)
|
||||
cmd = "xxd -p {}".format(remotefp)
|
||||
with open("{}.tmp".format(localfp), 'w') as f:
|
||||
for each in dpt.diagnosis_write(cmd, timeout=999).splitlines():
|
||||
f.write(each)
|
||||
subprocess.call(
|
||||
['xxd', '-r', '-p', "{}.tmp".format(localfp), '>', localfp]
|
||||
)
|
||||
duration = int(time.time() * 1000) - startTime
|
||||
dpt.info_print('Finished in {0:.2f}sec'.format(duration / 1000.0))
|
||||
if os.path.isfile(localfp):
|
||||
dpt.info_print("Success")
|
||||
os.remove("{}.tmp".format(localfp))
|
||||
return True
|
||||
overwrite = True resp == 'yes' else False
|
||||
if overwrite:
|
||||
# read from xxd, parse, and write to local file
|
||||
startTime = int(time.time() * 1000)
|
||||
s = "xxd -p {}".format(remotefp)
|
||||
with open("{}.tmp".format(localfp), 'w') as f:
|
||||
for each in dpt.diagnosis_write(s, timeout=999).splitlines():
|
||||
f.write(each)
|
||||
subprocess.call(
|
||||
['xxd', '-r', '-p', "{}.tmp".format(localfp), '>', localfp]
|
||||
)
|
||||
duration = int(time.time() * 1000) - startTime
|
||||
dpt.info_print('Finished in {0:.2f}sec'.format(duration / 1000.0))
|
||||
if os.path.isfile(localfp):
|
||||
# TODO: add md5 validation
|
||||
dpt.info_print("File pulled to: {}".format(localfp))
|
||||
os.remove("{}.tmp".format(localfp))
|
||||
return localfp
|
||||
except BaseException as e:
|
||||
dpt.err_print(str(e))
|
||||
return False
|
||||
return None
|
||||
|
||||
|
||||
def diagnosis_push_file(dpt, chunkSize=128):
|
||||
def diagnosis_push_file(
|
||||
dpt, chunkSize=128, localfp=None, folder=None, overwrite=None
|
||||
):
|
||||
'''
|
||||
push file from local to device through echo in diagnosis
|
||||
(serial) mode
|
||||
|
|
@ -196,59 +201,98 @@ def diagnosis_push_file(dpt, chunkSize=128):
|
|||
'''
|
||||
try:
|
||||
# get local file path
|
||||
localfp = input('> Local file path: ')
|
||||
while localfp[-1] == ' ': # remove extra spaces
|
||||
localfp = localfp[:-1]
|
||||
if localfp is None:
|
||||
localfp = input('> Local file path: ')
|
||||
while localfp[-1] == ' ': # remove extra spaces
|
||||
localfp = localfp[:-1]
|
||||
if not os.path.isfile(localfp):
|
||||
dpt.err_print('File {} does not exist!'.format(localfp))
|
||||
return False
|
||||
return None
|
||||
# get remote folder and validate it
|
||||
folder = input('> DPT folder path: ')
|
||||
# folder does not exit, create one?
|
||||
if not dpt.diagnosis_isfolder(folder):
|
||||
resp = input('> {} not exist, create? [yes/no]: '.format(folder))
|
||||
if resp == 'no':
|
||||
return False
|
||||
elif resp == 'yes':
|
||||
dpt.diagnosis_write('mkdir -p {}'.format(folder))
|
||||
else:
|
||||
dpt.err_print('Unrecognized input {}'.format(resp))
|
||||
return False
|
||||
if folder is None:
|
||||
folder = input('> DPT folder path: ')
|
||||
# folder does not exit, create one?
|
||||
if not dpt.diagnosis_isfolder(folder):
|
||||
resp = input('> {} not exist, create? [yes/no]: '.format(folder))
|
||||
if resp == 'no':
|
||||
return None
|
||||
elif resp == 'yes':
|
||||
dpt.diagnosis_write('mkdir -p {}'.format(folder))
|
||||
else:
|
||||
dpt.err_print('Unrecognized input {}'.format(resp))
|
||||
return None
|
||||
# remote file exists, overwrite it?
|
||||
remotefp = "{0}/{1}".format(folder, os.path.basename(localfp))
|
||||
if dpt.diagnosis_isfile(remotefp):
|
||||
if dpt.diagnosis_isfile(remotefp) and overwrite is None:
|
||||
resp = input('> {} exist, overwrite? [yes/no]: '.format(remotefp))
|
||||
if resp == 'no':
|
||||
return False
|
||||
elif not resp == 'yes':
|
||||
dpt.err_print('Unrecognized input {}'.format(resp))
|
||||
return False
|
||||
# write through echo
|
||||
firstRun = True
|
||||
symbol = '>'
|
||||
startTime = int(time.time() * 1000)
|
||||
with open(localfp, 'rb') as f:
|
||||
while 1:
|
||||
chunk = f.read(chunkSize)
|
||||
if chunk:
|
||||
cmd = "echo -e -n '\\x{0}' {1} {2}".format(
|
||||
'\\x'.join('{:02x}'.format(x) for x in chunk),
|
||||
symbol,
|
||||
remotefp
|
||||
)
|
||||
dpt.diagnosis_write(cmd)
|
||||
else:
|
||||
break
|
||||
if firstRun:
|
||||
symbol = '>>'
|
||||
firstRun = False
|
||||
duration = int(time.time() * 1000) - startTime
|
||||
dpt.info_print('Finished in {0:.2f}sec'.format(duration / 1000.0))
|
||||
if dpt.diagnosis_isfile(remotefp):
|
||||
dpt.info_print("Success")
|
||||
return True
|
||||
overwrite = True resp == 'yes' else False
|
||||
if overwrite:
|
||||
# write through echo
|
||||
firstRun = True
|
||||
symbol = '>'
|
||||
startTime = int(time.time() * 1000)
|
||||
with open(localfp, 'rb') as f:
|
||||
while 1:
|
||||
chunk = f.read(chunkSize)
|
||||
if chunk:
|
||||
cmd = "echo -e -n '\\x{0}' {1} {2}".format(
|
||||
'\\x'.join('{:02x}'.format(x) for x in chunk),
|
||||
symbol,
|
||||
remotefp
|
||||
)
|
||||
dpt.diagnosis_write(cmd)
|
||||
else:
|
||||
break
|
||||
if firstRun:
|
||||
symbol = '>>'
|
||||
firstRun = False
|
||||
duration = int(time.time() * 1000) - startTime
|
||||
dpt.info_print('Finished in {0:.2f}sec'.format(duration / 1000.0))
|
||||
if dpt.diagnosis_isfile(remotefp):
|
||||
# TODO: add md5 validation
|
||||
dpt.info_print("File pushed to: {}".format(remotefp))
|
||||
return remotefp
|
||||
except BaseException as e:
|
||||
dpt.err_print(str(e))
|
||||
return None
|
||||
|
||||
|
||||
def diagnosis_backup_bootimg(dpt):
|
||||
'''
|
||||
backup boot img and then pull img from DPT to local disk
|
||||
'''
|
||||
remotefp = dpt.diagnosis_backup_boot()
|
||||
# pull this backup file to current folder
|
||||
if remotefp is not None:
|
||||
fp = diagnosis_pull_file(
|
||||
dpt, remotefp=remotefp, folder=".", overwrite=True
|
||||
)
|
||||
if fp is not None:
|
||||
dpt.info_print("Success!")
|
||||
return True
|
||||
dpt.info_print("Nothing happened..")
|
||||
return False
|
||||
|
||||
|
||||
def diagnosis_restore_bootimg(dpt, usetmpfp=None, bootimgfp=None):
|
||||
'''
|
||||
restore boot img
|
||||
'''
|
||||
if usetmpfp is None:
|
||||
resp = input('> Use local boot img? [yes/no]: ')
|
||||
usetmpfp = False resp == 'yes' else True
|
||||
# directly use the original backup, if exists
|
||||
if usetmpfp:
|
||||
return dpt.diagnosis_restore_boot(self, fp="/tmp/boot.img.bak")
|
||||
# otherwise we need to first upload our own boot img
|
||||
remotefp = diagnosis_push_file(dpt, folder="/tmp", overwrite=True)
|
||||
if remotefp is not None:
|
||||
if dpt.diagnosis_restore_boot(self, fp=remotefp):
|
||||
dpt.info_print("Success!")
|
||||
return True
|
||||
dpt.err_print("Failed..")
|
||||
return False
|
||||
dpt.err_print("Nothing happened..")
|
||||
return False
|
||||
|
||||
|
||||
|
|
@ -280,6 +324,12 @@ def diagnosis_cmd(dpt):
|
|||
elif cmd == 'pull-file':
|
||||
diagnosis_pull_file(dpt)
|
||||
continue
|
||||
elif cmd == 'backup-bootimg':
|
||||
diagnosis_backup_bootimg(dpt)
|
||||
continue
|
||||
elif cmd == 'restore-bootimg':
|
||||
diagnosis_restore_bootimg(dpt)
|
||||
continue
|
||||
rawresp = dpt.diagnosis_write(cmd)
|
||||
# ignore first and last echos
|
||||
tmp = rawresp.splitlines()
|
||||
|
|
|
|||
Loading…
Reference in New Issue