xiaomi_vacuum_as_data_colle.../libs/parser_post.py

526 lines
16 KiB
Python

#!/usr/bin/python
import io
import os
import sys
import pickle
from time import sleep
from PIL import Image
from PIL import ImageDraw
from PIL import ImageChops
import numpy as np
import matplotlib.pyplot as plt
from libs.tshark import Tshark
RED = (255, 0, 0, 255)
PICKLE_MAP_SIZE = 64 # num
PICKLE_MAP_STEP = 0.1 # meter
WALL_PENETRATION = -30.0 # dB
WALL_REFLECTION = -15.0 # dB
np.set_printoptions(threshold=sys.maxsize)
def get_groundtruth_dict(f_gt):
gt = {}
if f_gt is None:
return gt
with open(f_gt, 'r') as f:
lines = f.readlines()
for line in lines:
if '#' in line:
continue
tmp = line.rstrip('\n').split(',')
addr = tmp[0]
loc_x = float(tmp[1])
loc_y = float(tmp[2])
gt[addr] = np.array([loc_x, loc_y])
return gt
def load_rss_data_with_pkt_types(fp: str, orientation: int) -> dict:
'''
split rss data into multiple types
'''
result = {}
with open(fp, 'r') as f:
lines = f.readlines()
for line in lines:
if '#' in line:
continue
data = line.rstrip().split(',')
pkt_type = int(data[9])
if pkt_type not in result:
result[pkt_type] = []
# rotate at (0, 0), the dock location
if (orientation % 4) is 0:
loc_x = float(data[0])
loc_y = float(data[1])
orient = float(data[2]) % (2 * np.pi)
elif (orientation % 4) is 1:
loc_x = -float(data[1])
loc_y = float(data[0])
orient = (float(data[2]) - 0.5 * np.pi) % (2 * np.pi)
elif (orientation % 4) is 2:
loc_x = -float(data[0])
loc_y = -float(data[1])
orient = (float(data[2]) - np.pi) % (2 * np.pi)
elif (orientation % 4) is 3:
loc_x = float(data[1])
loc_y = -float(data[0])
orient = (float(data[2]) + 0.5 * np.pi) % (2 * np.pi)
# only need to take x, y, RSS for now
result[pkt_type].append([loc_x, loc_y, float(data[5]), orient])
return result
def blocking_display_rss_map(
rss_map: np.ndarray,
visualize: bool = False,
output_map: bool = False,
vmin: float = -80.0,
vmax: float = -40.0,
fp: str = None
):
'''
'''
plt.imshow(
np.transpose(rss_map),
cmap='hot',
origin='lower',
interpolation='nearest',
vmin=vmin,
vmax=vmax
)
plt.colorbar()
# plt.show()
plt.draw()
if output_map:
plt.savefig("{}.png".format(fp), dpi=50)
if visualize:
plt.pause(0.1)
q = input("press Enter to continue... type q to quit: ")
if q == 'q':
sys.exit()
plt.close()
def convert_to_pickle_rss(
fp: str,
orientation: int,
labels: list = None, # the right groundtruth in rss map pixels
visualize: bool = False,
output_map: bool = False,
filters: int = None,
sampling: bool = False,
map_dim: tuple = None,
map_res: float = None
):
'''
modified from Zhuolin
'''
def find_index(array, lower, upper):
return np.where((array >= lower) & (array <= upper))[0]
# load data and split into different types
results = load_rss_data_with_pkt_types(fp, orientation)
# define map dimension
if map_dim is None:
map_dim = (PICKLE_MAP_SIZE, PICKLE_MAP_SIZE)
if map_res is None:
map_res = PICKLE_MAP_STEP
# pick the most frequent type
pkt_types = [(key, len(results[key])) for key in results.keys()]
pkt_types = sorted(pkt_types, key=lambda x: x[1], reverse=True)
print("most frequent data type is {} with {} pkts".format(pkt_types[0][0], pkt_types[0][1]))
data = results[pkt_types[0][0]]
# sort it and transpose it
data = np.transpose(np.array(sorted(data, key = lambda x: (x[0], x[1]))))
loc_x_min = min(data[0, :])
loc_x_max = max(data[0, :])
loc_y_min = min(data[1, :])
loc_y_max = max(data[1, :])
loc_x_center = (loc_x_min + loc_x_max) / 2.0
loc_y_center = (loc_y_min + loc_y_max) / 2.0
# convert it to a map
rss_map = np.ones(map_dim) * -85.0
factor = 0.75
for i in range(map_dim[0]):
# if i not in rss_map_dict:
# rss_map_dict[i] = {}
# search for x_idx
upper_bound_x = loc_x_center + map_res * (i - (map_dim[0] / 2) + 0.5)
lower_bound_x = upper_bound_x - map_res
data_x_idxs = find_index(
data[0, :],
lower_bound_x - factor * map_res,
upper_bound_x + factor * map_res
)
data_part = data[:, data_x_idxs]
if data_part.size is 0:
continue
data_y_idx = 0
for j in range(map_dim[1]):
# search for y_idx
upper_bound_y = loc_y_center + map_res * (j - (map_dim[1] / 2) + 0.5)
lower_bound_y = upper_bound_y - map_res
data_y_idxs = find_index(
data_part[1, :],
lower_bound_y - factor * map_res,
upper_bound_y + factor * map_res
)
data_fullfilled = data_part[2, data_y_idxs]
orientation_fullfilled = data_part[3, data_y_idxs]
if filters is 0:
data_fullfilled = data_fullfilled[(orientation_fullfilled > 1.75 * np.pi) | (orientation_fullfilled < 0.25 * np.pi)]
elif filters is 1:
data_fullfilled = data_fullfilled[(orientation_fullfilled > 1.25 * np.pi) & (orientation_fullfilled < 1.75 * np.pi)]
elif filters is 2:
data_fullfilled = data_fullfilled[(orientation_fullfilled > 0.75 * np.pi) & (orientation_fullfilled < 1.25 * np.pi)]
elif filters is 3:
data_fullfilled = data_fullfilled[(orientation_fullfilled > 0.25 * np.pi) & (orientation_fullfilled < 0.75 * np.pi)]
elif filters is 4:
data_fullfilled = data_fullfilled[(orientation_fullfilled > 1.5 * np.pi) | (orientation_fullfilled < 0.5 * np.pi)]
elif filters is 5:
data_fullfilled = data_fullfilled[(orientation_fullfilled > 0.5 * np.pi) & (orientation_fullfilled < 1.5 * np.pi)]
if data_fullfilled.size:
if sampling:
rss_map[i, j] = max(np.random.choice(data_fullfilled, 1)[0], -85.0)
else:
rss_map[i, j] = max(np.median(data_fullfilled), -85.0)
filepath = fp.replace(
".csv", "{}_pkttype_{}_map{}"
.format(
"_s{}".format(np.random.randint(0, 999999)) if sampling else "",
pkt_types[0][0],
"" if filters is None else "_{}".format(filters)
)
)
if visualize or output_map:
blocking_display_rss_map(rss_map, visualize=visualize, output_map=output_map, fp=filepath)
with open("{}.pickle".format(filepath), "wb") as f:
pickle.dump([rss_map, labels], f)
def extract_dev_from_combined(fp, minimalCounts=100, cleanup=True):
'''
extract each device data from combined file `fp`
'''
files = {}
folderpath, ext = os.path.splitext(fp)
try:
os.mkdir(folderpath)
except FileExistsError:
pass
except BaseException:
raise
with open(fp) as f:
lines = f.readlines()
for line in lines[1:]:
tmp = line.rstrip().split(",")
addr = tmp[3].replace(":", "")
if addr not in files:
files[addr] = []
files[addr].append(",".join(tmp[:3] + tmp[4:]))
for addr in list(files.keys()):
if len(files[addr]) < minimalCounts:
del files[addr]
title = lines[0].rstrip().split(",")
headline = ",".join(title[:3] + title[4:]) + "\n"
filepaths = []
for addr in files.keys():
filepath = "{}/{}.csv".format(folderpath, addr)
filepaths.append(filepath)
with open(filepath, "w") as f:
f.write(headline)
for line in files[addr]:
f.write("{}\n".format(line))
if len(files) > 0 and cleanup:
os.remove(fp)
return filepaths
def combine_sig_loc(sig_fp, loc_fp):
'''
append location to signal data
'''
filename, ext = os.path.splitext(loc_fp)
with open(sig_fp) as f:
sig_data = f.readlines()
with open(loc_fp) as f:
loc_data = f.readlines()
i_s = 1 # remove first line on info
i_l = 1 # remove first line on info
len_sig = len(sig_data)
len_loc = len(loc_data)
min_x = float('inf')
max_x = float('-inf')
min_y = float('inf')
max_y = float('-inf')
outfile = "{0}_sig.csv".format(filename.rstrip("_loc"))
with open(outfile, 'w') as f:
f.write("#x,y,orient," + sig_data[0][1:])
prev_i_s = 0
prev_i_l = 0
while i_s < len_sig:
if prev_i_s != i_s:
sig_tmp = sig_data[i_s].rstrip().split(',')
prev_i_s = i_s
if prev_i_l != i_l:
loc_tmp = loc_data[i_l].rstrip().split(',')
prev_i_l = i_l
epoch_sig = float(sig_tmp[1])
epoch_loc = float(loc_tmp[2]) / 1000.0
x = float(loc_tmp[3])
y = float(loc_tmp[4])
if x > max_x:
max_x = x
if x < min_x:
min_x = x
if y > max_y:
max_y = y
if y < min_y:
min_y = y
orientation = float(loc_tmp[5])
if epoch_sig > epoch_loc and i_l < len_loc - 1:
i_l += 1
continue
f.write("{},{},{},{}\n".format(x, y, orientation, ",".join(sig_tmp)))
i_s += 1
return outfile, ((min_x, min_y), (max_x, max_y))
def translate_pcap(pcap_fp, is_csi):
tshark = Tshark()
filepath, ext = os.path.splitext(pcap_fp)
outputfp = "{}.csv".format(filepath)
if os.path.isfile(outputfp):
return outputfp
if is_csi:
tshark.translateCSI(pcap_fp, outputfp)
else:
tshark.translatePcap(pcap_fp, outputfp)
return outputfp
def normalize_rss(rss):
rss = max(min(rss, -20), -85)
return (rss + 85) / 65.0
def get_locs_from_parsed_sig_data(sig_data, is_csi=False):
# loop each loc
locs_data = []
if is_csi:
print("Not implemented yet")
return locs_data
for line in sig_data:
tmp = line.rstrip().split(",")
x = float(tmp[0])
y = float(tmp[1])
rss = float(tmp[4])
# calculate color of rss
color = (RED[0], RED[1], RED[2], int(255 * normalize_rss(rss)))
pos = (x, y, color)
locs_data.append(pos)
return locs_data
def get_locs_from_slam_data(slam_data):
# loop each loc
locs_data = []
for line in slam_data:
tmp = line.rstrip().split(",")
robotime = float(tmp[1])
epoch = int(tmp[2])
x = float(tmp[3])
y = float(tmp[4])
yaw = float(tmp[5])
pos = (x, y, RED)
locs_data.append(pos)
return locs_data
def estimate_orientation(map_dim, reflections):
orientations = np.empty(map_dim, dtype=float) * float('nan')
for i in range(map_dim[0]):
for j in range(map_dim[1]):
if reflections[i, j] < -80.0:
continue
xs = []
ys = []
for off_i in range(-4, 5, 2):
end_i = min(max(i + off_i, 0), map_dim[0]-1)
for off_j in range(-4, 5, 2):
end_j = min(max(j + off_j, 0), map_dim[1]-1)
if reflections[end_i, end_j] > -100.0:
if end_i not in xs or end_j not in ys:
xs.append(end_i)
ys.append(end_j)
if len(xs) is 0:
continue
if len(set(xs)) == 1:
coefs = [None, 999.0]
else:
coefs = np.polynomial.polynomial.polyfit(xs, ys, 1)
orientations[i, j] = round(np.arctan2(coefs[1], 1.0) * 180 / np.pi, 1)
return orientations
def build_map(
f_map,
orientation,
minmax_xys,
markers=None,
visualize=False,
output_map=False,
map_dim=None,
map_res=None
):
'''
draws the path into the map. Returns the new map as a BytesIO
modded from https://github.com/dgiese/dustcloud/blob/71f7af3e2b9607548bcd845aca251326128f742c/dustcloud/build_map.py
'''
grey = (125, 125, 125, 255)
white = (255, 255, 255, 255)
blue = (0, 0, 255, 255)
pink = (255, 0, 255, 255)
red = (255, 0, 0, 255)
black = (0, 0, 0, 255)
# define map dimension
if map_dim is None:
map_dim = (PICKLE_MAP_SIZE, PICKLE_MAP_SIZE)
if map_res is None:
map_res = PICKLE_MAP_STEP
floor_map = np.zeros(map_dim)
# define map center x,y
center_x_val = (minmax_xys[1][0] + minmax_xys[0][0]) / 2
center_y_val = (minmax_xys[1][1] + minmax_xys[0][1]) / 2
# load map
with open(f_map, 'rb') as f:
map_image_data = f.read()
map_image = Image.open(io.BytesIO(map_image_data))
map_image = map_image.convert('RGBA')
map_pix = map_image.load()
# calculate center of the measurement
center_x = int(map_image.size[0] / 2 + center_x_val * 20)
center_y = int(map_image.size[1] / 2 - center_y_val * 20)
for i in range(map_dim[0]):
idx_x = center_x + map_res * 20 * (i - (map_dim[0] / 2))
for j in range(map_dim[1]):
idx_y = center_y - map_res * 20 * (j - (map_dim[1] / 2))
is_blocked = False
for kk in range(2):
for ll in range(2):
color = map_pix[idx_x + kk, idx_y + ll]
is_blocked = is_blocked or (color in [black, blue, pink, red])
if is_blocked:
if orientation % 4 == 0:
floor_map[i, j] = 1
elif orientation % 4 == 1:
floor_map[map_dim[1] - j - 1, i] = 1
elif orientation % 4 == 2:
floor_map[map_dim[0] - i - 1, map_dim[1] - j - 1] = 1
elif orientation % 4 == 3:
floor_map[j, map_dim[1] - i - 1] = 1
elif color not in [grey, white]:
print("unknown color: {}".format(color))
filepath, ext = os.path.splitext(f_map)
if visualize or output_map:
blocking_display_rss_map(
floor_map,
visualize=visualize,
output_map=output_map,
fp=filepath,
vmin=0.0,
vmax=1.0
)
penetrations = floor_map * WALL_PENETRATION
reflections = floor_map * WALL_REFLECTION
reflections[reflections == 0] = -100.0
orientations = estimate_orientation(map_dim, reflections)
# blocking_display_rss_map(
# abs(orientations),
# visualize=visualize,
# output_map=output_map,
# fp=filepath+'ori',
# vmin=-10,
# vmax=100
# )
with open(filepath.replace("_map", "_floormap.pickle"), "wb") as f:
pickle.dump([penetrations, reflections, orientations], f)
def test(args):
if args.loc and args.map:
with open(args.loc) as f:
# skip the first line which is coumn names
slam_data = f.readlines()[1:]
locs_data = get_locs_from_slam_data(slam_data)
with open(args.map, 'rb') as f:
map_image_data = f.read()
augmented_map = build_map(locs_data, map_image_data)
filepath, ext = os.path.splitext(args.map)
with open("{}.png".format(filepath), 'wb') as f:
f.write(augmented_map.getvalue())
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description='post processing test'
)
parser.add_argument(
'-l', '--location',
dest='loc',
default=None,
help='Specify location file path'
)
parser.add_argument(
'-m', '--map',
dest='map',
default=None,
help='Specify map file path'
)
args, __ = parser.parse_known_args()
test(args)