diff --git a/controller.py b/controller.py new file mode 100644 index 0000000..5a2736f --- /dev/null +++ b/controller.py @@ -0,0 +1,31 @@ +from libs.env import get_env_var +from libs.env import set_env_var + +from libs.vacuum_controller import VacuumController + + + +def export_ip_token(ip, token): + print("Exporting to environment variables") + set_env_var("MIROBO_IP", ip) + set_env_var("MIROBO_TOKEN", token) + + print("Exporting to `.setup.sh` for later references") + with open(".setup.sh", "w") as f: + f.write("export MIROBO_IP={}\n".format(ip)) + f.write("export MIROBO_TOKEN={}\n".format(token)) + + +if __name__ == '__main__': + ip = get_env_var("MIROBO_IP") + token = get_env_var("MIROBO_TOKEN") + + c = VacuumController(ip, token) + if not c.test_connection(): + if ip is None or token is None: + c = VacuumController() + assert(c.test_connection()) + + export_ip_token(c.ip, c.token) + + diff --git a/get_loc_est.py b/get_loc_est.py new file mode 100644 index 0000000..c500ee5 --- /dev/null +++ b/get_loc_est.py @@ -0,0 +1,18 @@ +import argparse +import subprocess + +from libs.parser import get_slam_log + + +# args +parser = argparse.ArgumentParser( + description='Get Location Estimation from SLAM log on the vacuum' +) +parser.add_argument( + dest='filepath', + help='Specify output file path' +) +args, __ = parser.parse_known_args() + +# fetch slam +get_slam_log(outputfile=args.filepath) diff --git a/init_vacuum.sh b/init_vacuum.sh new file mode 100755 index 0000000..c8ac3df --- /dev/null +++ b/init_vacuum.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +REMOTE="root@${MIROBO_IP}" +EXP_FP="/mnt/data/exp" +REMOTE_EXP_FP="${REMOTE}:${EXP_FP}" +REMOTE_CMD="ssh -t ${REMOTE}" + + +echo "IP: ${MIROBO_IP}" +echo "EXP_FP: ${EXP_FP}" + +echo "Create folder on vacuum.." +${REMOTE_CMD} mkdir -p ${EXP_FP}/libs + +echo "Push files to vacuum to run.." +scp ./libs/__init__.py ${REMOTE_EXP_FP} +scp ./libs/parser.py ${REMOTE_EXP_FP} +scp ./get_loc_est.py ${REMOTE_EXP_FP} + +echo "Install necessary packages" +${REMOTE_CMD} apt --yes install python3-minimal + +echo "Done!" diff --git a/libs/env.py b/libs/env.py new file mode 100644 index 0000000..abe090e --- /dev/null +++ b/libs/env.py @@ -0,0 +1,13 @@ +import os + + +def get_env_var(name): + return os.environ[name] if name in os.environ else None + + +def set_env_var(name, value): + os.environ[name] = value + + +def clear_env_var(name): + del os.environ[name] diff --git a/libs/exceptions.py b/libs/exceptions.py new file mode 100644 index 0000000..e69de29 diff --git a/libs/vacuum_controller.py b/libs/vacuum_controller.py new file mode 100644 index 0000000..8c3f29d --- /dev/null +++ b/libs/vacuum_controller.py @@ -0,0 +1,94 @@ +import miio +import codecs +import socket + + +class VacuumController(): + ''' + controlling xiaomi vacuum + ''' + + def __init__(self, ip=None, token=None): + self.ip = ip + self.token = token + + if self.ip is None: + self.finding_ip() + + self.vacuum = miio.Vacuum(ip=self.ip, token=self.token) + + if self.token is None: + self.get_token() + + def _discover_devices(self, timeout=5): + ips = [] + # broadcast magic handshake to find devices + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + s.settimeout(timeout) + s.sendto( + bytes.fromhex('21310020ffffffffffffffffffffffffffffffffffffffffffffffffffffffff'), + ('', 54321) + ) + while 1: + try: + __, addr = s.recvfrom(1024) + if addr[0] not in ips: + ips.append(addr[0]) + except socket.timeout: + break # ignore timeouts on discover + except Exception as e: + print('Error while reading discover results: {}'.format(e)) + break + return ips + + def finding_ip(self): + ''' + getting ip of vacuum + ''' + ips = self._discover_devices() + if not ips: + print('Err: cannot find any vacuum IP') + exit(-1) + if len(ips) == 1: + self.ip = ips[0] + return + print('Found multiple IPs:') + for i, ip in enumerate(ips): + print(' {0}. {1}'.format(i+1, ip)) + try: + selected = input('Please select one by typing number (1-{}): '.format(len(ips))) + self.ip = ips[int(selected)-1] + except KeyboardInterrupt: + print('User requested to exit') + exit(0) + except ValueError: + print('Err: Please enter only one number') + exit(-1) + except IndexError: + print('Err: Please enter one number between 1-{}'.format(len(ips))) + exit(-1) + except BaseException as e: + print('Err: {}'.format(e)) + exit(-1) + + def get_token(self): + ''' + getting token by handshaking with vacuum + ''' + print('Sending handshake to get token') + m = self.vacuum.do_discover() + self.vacuum.token = m.checksum + self.token = codecs.encode(m.checksum, 'hex') + + def test_connection(self): + ''' + test connection + ''' + try: + s = self.vacuum.status() + print(s) + return True + except Exception as e: + print('Err: {}'.format(e)) + return False diff --git a/manual_control.py b/manual_control.py deleted file mode 100644 index 129eddc..0000000 --- a/manual_control.py +++ /dev/null @@ -1,84 +0,0 @@ -import miio -import codecs -import socket - - -def discover_devices(): - timeout = 5 - seen_addrs = [] # type: List[str] - addr = '' - # magic, length 32 - helobytes = bytes.fromhex('21310020ffffffffffffffffffffffffffffffffffffffffffffffffffffffff') - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - s.settimeout(timeout) - s.sendto(helobytes, (addr, 54321)) - while True: - try: - data, addr = s.recvfrom(1024) - if addr[0] not in seen_addrs: - seen_addrs.append(addr[0]) - except socket.timeout: - break # ignore timeouts on discover - except Exception as ex: - print('Error while reading discover results:', ex) - break - return seen_addrs - - -def select_item(welcome_text, items): - print(welcome_text) - for i, item in enumerate(items): - print('{}. {}'.format(i+1, item)) - try: - selected = input('Please select option by typing number (1-{}): '.format(len(items))) - result = items[int(selected)-1] - return result - except KeyboardInterrupt: - print('User requested to exit') - exit() - except ValueError: - print('Error! Please enter only one number') - exit() - except IndexError: - print('Error! Please enter one number between 1-{}'.format(len(items))) - exit() - - -ip_address = None -known_token = None - -if not ip_address: - print('Address is not set. Trying to discover.') - seen_addrs = discover_devices() - - if len(seen_addrs) == 0: - print('No devices discovered.') - exit() - elif len(seen_addrs) == 1: - ip_address = seen_addrs[0] - else: - ip_address = select_item('Choose device for connection:', seen_addrs) - -vacuum = miio.Vacuum(ip=ip_address, token=known_token) - -if not known_token: - print('Sending handshake to get token') - m = vacuum.do_discover() - vacuum.token = m.checksum - known_token = codecs.encode(m.checksum, 'hex') -else: - if len(known_token) == 16: - known_token = str(binascii.hexlify(bytes(known_token, encoding="utf8"))) - -print("ip_address: {}".format(ip_address)) -print("known_token: {}".format(known_token)) - -try: - s = vacuum.status() - print(s) -except Exception as ex: - print('error while checking device:', ex) - exit() - -vacuum.home()