iw_intel8260_localization/libMeasurement.py

333 lines
10 KiB
Python
Executable File

#!/usr/bin/python
# -*- coding: utf-8 -*-
import argparse
import json
import os
import re
import subprocess
import sys
import time

from numpy import median, sqrt

from libLocalization import deriveLocation
def which(program):
    '''
    check if a certain program exists

    `program` is either a bare command name, which is looked up in every
    directory listed in the PATH environment variable, or a name with a
    directory component, which is checked as given without searching PATH.

    Returns the path of the first match that is a regular file with the
    execute permission bit accessible, or None when nothing matches.
    '''
    def is_executable(fp):
        return os.path.isfile(fp) and os.access(fp, os.X_OK)

    # an explicit directory part means "this exact file", never a PATH search
    if os.path.dirname(program):
        return program if is_executable(program) else None

    # PATH may be unset (stripped-down environments); fall back to the
    # platform default search path instead of raising KeyError
    search_path = os.environ.get("PATH", os.defpath)
    for directory in search_path.split(os.pathsep):
        # tolerate PATH entries wrapped in double quotes
        candidate = os.path.join(directory.strip('"'), program)
        if is_executable(candidate):
            return candidate
    return None
class Measurement(object):
    '''
    Run `iw <interface> measurement ftm_request` and parse its results.

    Intended to be used as a context manager so the optional output file is
    terminated with a marker row and closed on exit.

    Attributes:
        interface: wireless interface name passed to `iw`
        config_fp: path of the target list handed to `iw ... ftm_request`
        cali: (slope, offset) applied to the raw distance, i.e.
            calibrated = cali[0] * raw + cali[1]
        outf: open output file, or None when nothing is recorded
        regex: pattern matching one `Target: ...` result line of `iw`
    '''

    # one result line of `iw ... ftm_request`; groups are
    # 1: MAC, 3: status, 4: rtt, 5: rtt variance field,
    # 6: distance, 7: distance variance field, 8: rssi
    regex = (
        r"Target: (([0-9a-f]{2}:*){6}), " +
        r"status: ([0-9]), rtt: ([0-9\-]+) \(±([0-9\-]+)\) psec, " +
        r"distance: ([0-9\-]+) \(±([0-9\-]+)\) cm, rssi: ([0-9\-]+) dBm"
    )

    def __init__(self, interface, ofp=None, cali=(1.0, 0.0)):
        '''
        interface: wireless interface to measure with
        ofp: optional path of a file receiving one CSV row per accepted
            measurement; failing to open it is reported but not fatal
        cali: (slope, offset) calibration pair for the raw distance

        Raises SystemExit(127) when `iw` is missing or lacks FTM support.
        '''
        self.outf = None
        self.interface = interface
        # default file path for config for iw ftm_request
        self.config_fp = '/tmp/config_entry'
        self.cali = cali
        if ofp:
            try:
                self.outf = open(ofp, 'w')
                self.outf.write(
                    'MAC,caliDist(cm),rawRTT(psec),rawRTTVar,rawDist(cm),' +
                    'rawDistVar,rssi(dBm),time(sec)\n'
                )
                self._write_marker_row()
            except (IOError, OSError) as e:
                # recording is best effort: report and keep measuring
                print(str(e))
        if not self.check_iw_validity():
            raise SystemExit(127)  # command not found

    @staticmethod
    def _to_text(data):
        '''
        Return subprocess output as text: Python 3 bytes are decoded as
        UTF-8, anything that already is a `str` is returned unchanged.
        '''
        if isinstance(data, bytes) and not isinstance(data, str):
            return data.decode('utf-8', 'replace')
        return data

    def _write_marker_row(self):
        '''
        Write the all-`ff` MAC / all-`nan` row carrying the current time;
        it is emitted when the file is opened and again when it is closed.
        '''
        self.outf.write(
            "ff:ff:ff:ff:ff:ff,nan,nan,nan,nan,nan,nan,{0:.6f}\n"
            .format(time.time())
        )

    def check_iw_validity(self):
        '''
        check if iw exists and support FTM commands

        Returns True only when `iw` is found on PATH, `iw --help` writes
        nothing to stderr and its output mentions FTM.
        '''
        iw_path = which('iw')
        if iw_path is None:
            print('Err: iw command not found!')
            return False
        try:
            # argv list, no shell: nothing to quote and no grep needed
            p = subprocess.Popen(
                [iw_path, '--help'],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            out, err = p.communicate()
        except OSError as e:
            print('Err: {0}'.format(e))
            return False
        if err:
            print('Err: {0}'.format(self._to_text(err)))
            return False
        if 'FTM' not in self._to_text(out):
            print('Err: iw command does not support FTM')
            return False
        return True

    def prepare_config_file(self, targets):
        '''
        Write one line per target to `self.config_fp`.

        targets: dict mapping a BSSID to a dict with the keys
            'bw', 'cf', 'retries' and 'spb'

        Returns False (writing nothing) when `targets` is not a dict,
        True once the file has been written.
        '''
        if not isinstance(targets, dict):
            return False
        with open(self.config_fp, 'w') as of:
            for bssid, params in targets.items():
                of.write(
                    "{0} bw={1} cf={2} retries={3} asap spb={4}\n"
                    .format(
                        bssid,
                        params['bw'],
                        params['cf'],
                        params['retries'],
                        params['spb'],
                    )
                )
        return True

    def get_distance_once(self, verbose=False):
        '''
        Run one FTM request on `self.interface` and parse the answer.

        Returns a list of tuples
        (mac, calibrated_distance, rtt, rtt_var,
         raw_distance, raw_distance_var, rssi),
        one per accepted result line; may be empty.

        Raises SystemExit(13) when `iw` cannot be started or writes
        anything to stderr.
        '''
        try:
            # argv list instead of a shell string: the interface name comes
            # from the command line and must not be interpreted by a shell
            p = subprocess.Popen(
                ['iw', self.interface, 'measurement', 'ftm_request',
                 self.config_fp],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            out, err = p.communicate()
        except OSError as e:
            print(e)
            raise SystemExit(13)
        if err:
            print(self._to_text(err))
            raise SystemExit(13)
        return self._parse_ftm_output(
            self._to_text(out), time.time(), verbose=verbose
        )

    def _parse_ftm_output(self, text, timestamp, verbose=False):
        '''
        Extract accepted measurements from the text printed by `iw`.

        A result line is dropped when its status is non-zero or its raw
        distance lies outside [-1000, 10000]. Every accepted line is
        appended to the output file (if any) stamped with `timestamp`.
        '''
        result = []
        for match in re.finditer(self.regex, text):
            mac = match.group(1)
            (status, rtt, rtt_var,
             raw_distance, raw_distance_var, rssi) = (
                int(match.group(idx)) for idx in range(3, 9)
            )
            if status != 0 or not -1000 <= raw_distance <= 10000:
                continue
            distance = self.cali[0] * raw_distance + self.cali[1]
            result.append(
                (mac, distance, rtt, rtt_var,
                 raw_distance, raw_distance_var, rssi)
            )
            if verbose:
                print(
                    '*** {0} - {1}dBm - {2} (±{3:.2f}cm)'
                    .format(mac, rssi, raw_distance, sqrt(raw_distance_var))
                )
            if self.outf is not None:
                self.outf.write(
                    "{0},{1:.2f},{2},{3},{4},{5},{6},{7:.6f}\n"
                    .format(
                        mac, distance, rtt, rtt_var,
                        raw_distance, raw_distance_var,
                        rssi, timestamp
                    )
                )
        return result

    def get_distance_median(self, rounds=1, verbose=False):
        '''
        use median instead of mean for less bias with small number of rounds

        Runs `get_distance_once` `rounds` times (at least once) and returns
        a dict mapping each MAC seen to a pair
        (median calibrated distance,
         median of sqrt(raw_distance_var) * cali[0]).
        '''
        samples = {}
        for _ in range(max(rounds, 1)):
            # no guarantee that all rounds are successful
            for each in self.get_distance_once(verbose=verbose):
                samples.setdefault(each[0], []).append(each[1:])
        median_result = {}
        for mac, rows in samples.items():
            # rows hold (distance, rtt, rtt_var, raw_dist, raw_dist_var, rssi)
            median_result[mac] = (
                median([row[0] for row in rows]),
                median([sqrt(row[4]) * self.cali[0] for row in rows])
            )
        return median_result

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # properly close the file when destroying the object
        if self.outf is not None:
            self._write_marker_row()
            self.outf.close()
def wrapper(args):
    '''
    Measure in an endless loop until interrupted or an error occurs.

    args: dict of options as built by `main`; the keys read here are
        'json', 'plot', 'interface', 'outfp', 'cali', 'rounds', 'verbose'
        and 'locs'. `args['config_entry']` is filled in with the targets
        loaded from the JSON file, or with a built-in default target when
        that file does not exist, and `args['plot']` is switched off when
        the plotting setup fails.

    Each round prints the median distance per target and, with 'locs' set,
    the derived location, which is additionally plotted when 'plot' is set.
    '''
    if os.path.isfile(args['json']):
        # context manager so the handle is closed right after parsing
        with open(args['json'], 'r') as config_file:
            args['config_entry'] = json.load(config_file)
        print('Successfully loaded {0}!'.format(args['json']))
    else:  # default config
        args['config_entry'] = {
            '34:f6:4b:5e:69:1f': {
                'bw': 20,
                'cf': 2462,
                'spb': 255,
                'retries': 3
            }
        }
    counter = 1
    handler = None
    if args['plot']:
        try:
            import matplotlib.pyplot as plt
            from libLocalization import plotLocation
            fig = plt.figure()
            plt.ion()
            plt.xlim([-200, 500])
            plt.ylim([-10, 1000])
        except Exception:
            # plotting is optional: carry on with text output only
            args['plot'] = False
            print('Cannot plot because lacking matplotlib!')
    with Measurement(
        args['interface'],
        ofp=args['outfp'], cali=args['cali']
    ) as m:
        while True:
            print('Round {0}'.format(counter))
            try:
                m.prepare_config_file(args['config_entry'])
                # only print out results
                results = m.get_distance_median(
                    rounds=args['rounds'], verbose=args['verbose']
                )
                for mac in results:
                    print(
                        '* {0} is {1:.4f}cm (±{2:.2f}) away.'
                        .format(mac, results[mac][0], results[mac][1])
                    )
                # calculate location info
                if args['locs']:
                    loc = deriveLocation(args, results)
                    print(
                        '* Derived location: ({0:.3f}, {1:.3f})'
                        .format(loc[0], loc[1])
                    )
                    # plotting needs `loc`, so it only happens in this branch
                    if args['plot']:
                        try:
                            if handler is not None:
                                handler.remove()
                            handler = plotLocation(loc)
                            if handler is None:
                                plt.close(fig)
                        except KeyboardInterrupt:
                            plt.close(fig)
                            break
            except KeyboardInterrupt:
                break
            except Exception as e:
                # any failure ends the loop; the context manager still
                # finalizes the output file
                print(str(e))
                break
            counter += 1
def main():
    '''
    Parse the command line, finalize the option dict and start `wrapper`.

    Besides the parsed options, the dict passed on carries
    'time_of_exec' (integer start time) and 'loc_bounds'; when an output
    path is given, the start time is inserted before its extension.
    '''
    # pre-defined calibration pairs, named once so the default and the
    # `--indoor` check cannot drift apart
    outdoor_cali = (0.8927, 553.3157)
    indoor_cali = (0.9376, 558.0551)

    p = argparse.ArgumentParser(description='iw measurement tool')
    p.add_argument(
        '--cali',
        nargs=2,
        default=outdoor_cali,
        type=float,
        help="calibrate calibration params (pre-defined outdoor by default)"
    )
    p.add_argument(
        '--outfp', '-f',
        default=None,
        help="if set, will write raw fetched data to file"
    )
    p.add_argument(
        '--rounds',
        default=1,
        type=int,
        help="how many rounds to run one command; default is 1"
    )
    p.add_argument(
        '--interface', '-i',
        default='wlp58s0',
        help="set the wireless interface"
    )
    p.add_argument(
        '--json', '-j',
        default='config_entry.default',
        help="load a config json file"
    )
    p.add_argument(
        '--verbose', '-v',
        default=False,
        action="store_true",
        help="if set, show detailed messages"
    )
    p.add_argument(
        '--indoor',
        default=False,
        action="store_true",
        help=(
            "if set, use default indoor calibration params " +
            "(will be ignored if `cali` is being used)"
        )
    )
    p.add_argument(
        '--locs',
        default=False,
        action="store_true",
        help="if set, derive location and store it to file"
    )
    p.add_argument(
        '--plot',
        default=False,
        action="store_true",
        help="if set, will plot the derived location in realtime"
    )
    try:
        args = vars(p.parse_args())
    except Exception as e:
        # argparse itself exits via SystemExit on bad input; this only
        # guards against unexpected failures and reports them as an error
        print(str(e))
        sys.exit(1)
    # a user-supplied `--cali` arrives as a list and never equals the
    # default tuple, so `--indoor` only replaces the untouched default
    if args['indoor'] and args['cali'] == outdoor_cali:
        args['cali'] = indoor_cali
    args['time_of_exec'] = int(time.time())
    # TODO: add option to change loc bounds, currently force y_min = 0
    args['loc_bounds'] = {'y_min': 0}
    # rename file path by adding time of exec
    if args['outfp']:
        fp, ext = os.path.splitext(args['outfp'])
        args['outfp'] = "{0}_{1}{2}".format(fp, args['time_of_exec'], ext)
    wrapper(args)
# run the command-line tool only when executed as a script, not on import
if __name__ == '__main__':
    main()