diff --git a/python_api/libDPT.py b/python_api/libDPT.py index 5f77666..365a3c5 100644 --- a/python_api/libDPT.py +++ b/python_api/libDPT.py @@ -106,10 +106,20 @@ class DPT(): back up boot partition to /tmp/ folder ''' cmd = 'dd if=/dev/mmcblk0p8 of=/tmp/boot.img.bak bs=4M' - self.diagnosis_write(cmd, timeout=60) + self.diagnosis_write(cmd, timeout=999) if not self.diagnosis_isfile('/tmp/boot.img.bak'): - self.err_print('Failed to backup!') + self.err_print('Failed to dump boot.img.bak!') + return None + return "/tmp/boot.img.bak" + + def diagnosis_restore_boot(self, fp="/tmp/boot.img.bak"): + if not self.diagnosis_isfile(fp): + self.dbg_print("{} does not exist".format(fp)) return False + cmd = "dd if='{}' of=/dev/mmcblk0p8 bs=4M".format(fp) + self.info_print("Fingercrossing.. Do NOT power off device!") + # need to be extra careful here + self.diagnosis_write(cmd, timeout=99999) return True def diagnosis_write(self, cmd, echo=False, timeout=99): @@ -130,6 +140,19 @@ class DPT(): self.dbg_print(resp) except serial.SerialTimeoutException as e: self.err_print('Timeout: {}'.format(e)) + self.err_print("Do NOT panic. Command may be still running.") + self.err_print("Do NOT power off the device") + self.err_print("Quit the tool. You need manual troubleshooting:") + self.err_print("1. See if you can get back into tty in terminal") + self.err_print(" If not, unplug the cable and plug it back in,") + self.err_print(" try tty again in terminal") + self.err_print("2. Once you get in, try press `Enter` to see") + self.err_print(" the response. It could be still running so") + self.err_print(" nothing responded. Suggest not to kill it") + self.err_print("3. Be patient. If it stucks for hours, then kill") + self.err_print(" the process by pressing Ctrl + C. Depending on") + self.err_print(" what you ran, you may or may not have troubles") + self.err_print("4. Worst case is to flash the stock pkg, I think.") except BaseException as e: self.err_print(str(e)) return "" diff --git a/python_api/libInteractive.py b/python_api/libInteractive.py index 5e37c97..1d44d9a 100644 --- a/python_api/libInteractive.py +++ b/python_api/libInteractive.py @@ -123,14 +123,16 @@ It behaves similarly to regular serial session with less flexibility (cannot use This mode intends to automate some complicated procedures. Supported commands: - `push-file` -- transfer file to DPT at 512bps - `pull-file` -- transfer file from DPT - `exit`/`quit` -- leave the tool + `push-file` -- transfer file to DPT at 512bps + `pull-file` -- transfer file from DPT + `backup-bootimg` -- backup the boot img and download it to local device + `restore-bootimg` -- restore the boot img + `exit`/`quit` -- leave the tool and many unix cmds (do not support less/head) """) -def diagnosis_pull_file(dpt): +def diagnosis_pull_file(dpt, remotefp=None, folder=None, overwrite=None): ''' pull file from device to local via xxd and parsing in python @@ -139,51 +141,54 @@ def diagnosis_pull_file(dpt): ''' try: # get and validate remote file path - remotefp = input('> DPT file path: ') + if remotefp is None: + remotefp = input('> DPT file path: ') if not dpt.diagnosis_isfile(remotefp): dpt.err_print('File {} does not exist!'.format(remotefp)) - return False + return None # get local folder path - folder = input('> Local folder path: ') - if not os.path.isdir(folder): - resp = input('> {} not exist, create? [yes/no]: '.format(folder)) - if resp == 'no': - return False - elif resp == 'yes': - os.makedirs(folder) - else: - dpt.err_print('Unrecognized input {}'.format(resp)) - return False + if folder is None: + folder = input('> Local folder path: ') + if not os.path.isdir(folder): + resp = input( + '> {} not exist, create? [yes/no]: '.format(folder)) + if resp == 'no': + return None + elif resp == 'yes': + os.makedirs(folder) + else: + dpt.err_print('Unrecognized input {}'.format(resp)) + return None # check if local fp exists localfp = "{0}/{1}".format(folder, os.path.basename(remotefp)) - if os.path.isfile(localfp): + if os.path.isfile(localfp) and overwrite is None:: resp = input('> {} exist, overwrite? [yes/no]: '.format(localfp)) - if resp == 'no': - return False - elif not resp == 'yes': - dpt.err_print('Unrecognized input {}'.format(resp)) - return False - # read from xxd, parse, and write to local file - startTime = int(time.time() * 1000) - cmd = "xxd -p {}".format(remotefp) - with open("{}.tmp".format(localfp), 'w') as f: - for each in dpt.diagnosis_write(cmd, timeout=999).splitlines(): - f.write(each) - subprocess.call( - ['xxd', '-r', '-p', "{}.tmp".format(localfp), '>', localfp] - ) - duration = int(time.time() * 1000) - startTime - dpt.info_print('Finished in {0:.2f}sec'.format(duration / 1000.0)) - if os.path.isfile(localfp): - dpt.info_print("Success") - os.remove("{}.tmp".format(localfp)) - return True + overwrite = True resp == 'yes' else False + if overwrite: + # read from xxd, parse, and write to local file + startTime = int(time.time() * 1000) + s = "xxd -p {}".format(remotefp) + with open("{}.tmp".format(localfp), 'w') as f: + for each in dpt.diagnosis_write(s, timeout=999).splitlines(): + f.write(each) + subprocess.call( + ['xxd', '-r', '-p', "{}.tmp".format(localfp), '>', localfp] + ) + duration = int(time.time() * 1000) - startTime + dpt.info_print('Finished in {0:.2f}sec'.format(duration / 1000.0)) + if os.path.isfile(localfp): + # TODO: add md5 validation + dpt.info_print("File pulled to: {}".format(localfp)) + os.remove("{}.tmp".format(localfp)) + return localfp except BaseException as e: dpt.err_print(str(e)) - return False + return None -def diagnosis_push_file(dpt, chunkSize=128): +def diagnosis_push_file( + dpt, chunkSize=128, localfp=None, folder=None, overwrite=None +): ''' push file from local to device through echo in diagnosis (serial) mode @@ -196,59 +201,98 @@ def diagnosis_push_file(dpt, chunkSize=128): ''' try: # get local file path - localfp = input('> Local file path: ') - while localfp[-1] == ' ': # remove extra spaces - localfp = localfp[:-1] + if localfp is None: + localfp = input('> Local file path: ') + while localfp[-1] == ' ': # remove extra spaces + localfp = localfp[:-1] if not os.path.isfile(localfp): dpt.err_print('File {} does not exist!'.format(localfp)) - return False + return None # get remote folder and validate it - folder = input('> DPT folder path: ') - # folder does not exit, create one? - if not dpt.diagnosis_isfolder(folder): - resp = input('> {} not exist, create? [yes/no]: '.format(folder)) - if resp == 'no': - return False - elif resp == 'yes': - dpt.diagnosis_write('mkdir -p {}'.format(folder)) - else: - dpt.err_print('Unrecognized input {}'.format(resp)) - return False + if folder is None: + folder = input('> DPT folder path: ') + # folder does not exit, create one? + if not dpt.diagnosis_isfolder(folder): + resp = input('> {} not exist, create? [yes/no]: '.format(folder)) + if resp == 'no': + return None + elif resp == 'yes': + dpt.diagnosis_write('mkdir -p {}'.format(folder)) + else: + dpt.err_print('Unrecognized input {}'.format(resp)) + return None # remote file exists, overwrite it? remotefp = "{0}/{1}".format(folder, os.path.basename(localfp)) - if dpt.diagnosis_isfile(remotefp): + if dpt.diagnosis_isfile(remotefp) and overwrite is None: resp = input('> {} exist, overwrite? [yes/no]: '.format(remotefp)) - if resp == 'no': - return False - elif not resp == 'yes': - dpt.err_print('Unrecognized input {}'.format(resp)) - return False - # write through echo - firstRun = True - symbol = '>' - startTime = int(time.time() * 1000) - with open(localfp, 'rb') as f: - while 1: - chunk = f.read(chunkSize) - if chunk: - cmd = "echo -e -n '\\x{0}' {1} {2}".format( - '\\x'.join('{:02x}'.format(x) for x in chunk), - symbol, - remotefp - ) - dpt.diagnosis_write(cmd) - else: - break - if firstRun: - symbol = '>>' - firstRun = False - duration = int(time.time() * 1000) - startTime - dpt.info_print('Finished in {0:.2f}sec'.format(duration / 1000.0)) - if dpt.diagnosis_isfile(remotefp): - dpt.info_print("Success") - return True + overwrite = True resp == 'yes' else False + if overwrite: + # write through echo + firstRun = True + symbol = '>' + startTime = int(time.time() * 1000) + with open(localfp, 'rb') as f: + while 1: + chunk = f.read(chunkSize) + if chunk: + cmd = "echo -e -n '\\x{0}' {1} {2}".format( + '\\x'.join('{:02x}'.format(x) for x in chunk), + symbol, + remotefp + ) + dpt.diagnosis_write(cmd) + else: + break + if firstRun: + symbol = '>>' + firstRun = False + duration = int(time.time() * 1000) - startTime + dpt.info_print('Finished in {0:.2f}sec'.format(duration / 1000.0)) + if dpt.diagnosis_isfile(remotefp): + # TODO: add md5 validation + dpt.info_print("File pushed to: {}".format(remotefp)) + return remotefp except BaseException as e: dpt.err_print(str(e)) + return None + + +def diagnosis_backup_bootimg(dpt): + ''' + backup boot img and then pull img from DPT to local disk + ''' + remotefp = dpt.diagnosis_backup_boot() + # pull this backup file to current folder + if remotefp is not None: + fp = diagnosis_pull_file( + dpt, remotefp=remotefp, folder=".", overwrite=True + ) + if fp is not None: + dpt.info_print("Success!") + return True + dpt.info_print("Nothing happened..") + return False + + +def diagnosis_restore_bootimg(dpt, usetmpfp=None, bootimgfp=None): + ''' + restore boot img + ''' + if usetmpfp is None: + resp = input('> Use local boot img? [yes/no]: ') + usetmpfp = False resp == 'yes' else True + # directly use the original backup, if exists + if usetmpfp: + return dpt.diagnosis_restore_boot(self, fp="/tmp/boot.img.bak") + # otherwise we need to first upload our own boot img + remotefp = diagnosis_push_file(dpt, folder="/tmp", overwrite=True) + if remotefp is not None: + if dpt.diagnosis_restore_boot(self, fp=remotefp): + dpt.info_print("Success!") + return True + dpt.err_print("Failed..") + return False + dpt.err_print("Nothing happened..") return False @@ -280,6 +324,12 @@ def diagnosis_cmd(dpt): elif cmd == 'pull-file': diagnosis_pull_file(dpt) continue + elif cmd == 'backup-bootimg': + diagnosis_backup_bootimg(dpt) + continue + elif cmd == 'restore-bootimg': + diagnosis_restore_bootimg(dpt) + continue rawresp = dpt.diagnosis_write(cmd) # ignore first and last echos tmp = rawresp.splitlines()