#!/usr/bin/env python3

import io
import os
import sys
import pickle
from time import sleep

from PIL import Image
from PIL import ImageDraw
from PIL import ImageChops

import numpy as np
import matplotlib.pyplot as plt

from libs.tshark import Tshark


RED = (255, 0, 0, 255)
PICKLE_MAP_SIZE = 64   # map side length, in cells
PICKLE_MAP_STEP = 0.1  # cell resolution, in meters
WALL_PENETRATION = -30.0  # dB
WALL_REFLECTION = -15.0  # dB
np.set_printoptions(threshold=sys.maxsize)


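# with these defaults a pickled rss map spans
# PICKLE_MAP_SIZE * PICKLE_MAP_STEP = 64 * 0.1 m = 6.4 m per side

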
def get_groundtruth_dict(f_gt):
    '''
    load groundtruth locations as {addr: np.array([x, y])}
    '''
    gt = {}
    if f_gt is None:
        return gt
    with open(f_gt, 'r') as f:
        for line in f:
            if line.startswith('#'):
                continue
            tmp = line.rstrip('\n').split(',')
            addr = tmp[0]
            loc_x = float(tmp[1])
            loc_y = float(tmp[2])
            gt[addr] = np.array([loc_x, loc_y])
    return gt


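# Illustrative usage (hypothetical file): each non-comment line of the
# groundtruth csv is "<addr>,<x>,<y>", e.g. "aabbccddeeff,1.25,-0.50":
#   gt = get_groundtruth_dict('groundtruth.csv')
#   gt['aabbccddeeff']  # -> array([ 1.25, -0.5 ])

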
def load_rss_data_with_pkt_types(fp: str, orientation: int) -> dict:
    '''
    split rss data into one list per packet type
    '''
    result = {}
    with open(fp, 'r') as f:
        for line in f:
            if line.startswith('#'):
                continue
            data = line.rstrip().split(',')
            pkt_type = int(data[9])
            if pkt_type not in result:
                result[pkt_type] = []
            # rotate at (0, 0), the dock location
            if (orientation % 4) == 0:
                loc_x = float(data[0])
                loc_y = float(data[1])
                orient = float(data[2]) % (2 * np.pi)
            elif (orientation % 4) == 1:
                loc_x = -float(data[1])
                loc_y = float(data[0])
                orient = (float(data[2]) - 0.5 * np.pi) % (2 * np.pi)
            elif (orientation % 4) == 2:
                loc_x = -float(data[0])
                loc_y = -float(data[1])
                orient = (float(data[2]) - np.pi) % (2 * np.pi)
            else:  # (orientation % 4) == 3
                loc_x = float(data[1])
                loc_y = -float(data[0])
                orient = (float(data[2]) + 0.5 * np.pi) % (2 * np.pi)
            # keep x, y, RSS, and orientation for now
            result[pkt_type].append([loc_x, loc_y, float(data[5]), orient])
    return result


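# Illustrative check of the rotation above (assuming each `orientation`
# step is a 90-degree counter-clockwise turn about the dock at (0, 0)):
# with orientation=1, a sample recorded at (2.0, 0.0) with heading 0 rad
# is remapped to (0.0, 2.0) with heading 1.5*pi rad.

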
def blocking_display_rss_map(
    rss_map: np.ndarray,
    visualize: bool = False,
    output_map: bool = False,
    vmin: float = -80.0,
    vmax: float = -40.0,
    fp: str = None
):
    '''
    render `rss_map` as a heatmap; optionally save it to `<fp>.png`
    and/or block until the user presses Enter (q quits)
    '''
    plt.imshow(
        np.transpose(rss_map),
        cmap='hot',
        origin='lower',
        interpolation='nearest',
        vmin=vmin,
        vmax=vmax
    )
    plt.colorbar()
    # plt.show()
    plt.draw()
    if output_map:
        plt.savefig("{}.png".format(fp), dpi=50)
    if visualize:
        plt.pause(0.1)
        q = input("press Enter to continue... type q to quit: ")
        if q == 'q':
            sys.exit()
    plt.close()


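# Illustrative usage (hypothetical output prefix): render and save a map as
# 'run1_map.png', then block until the user presses Enter:
#   blocking_display_rss_map(rss_map, visualize=True, output_map=True, fp='run1_map')

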
def convert_to_pickle_rss(
    fp: str,
    orientation: int,
    labels: list = None,  # the right groundtruth in rss map pixels
    visualize: bool = False,
    output_map: bool = False,
    filters: int = None,
    sampling: bool = False,
    map_dim: tuple = None,
    map_res: float = None
):
    '''
    convert rss measurements into a gridded rss map and pickle it
    (modified from Zhuolin)
    '''

    def find_index(array, lower, upper):
        return np.where((array >= lower) & (array <= upper))[0]

    # load data and split into different types
    results = load_rss_data_with_pkt_types(fp, orientation)

    # define map dimension
    if map_dim is None:
        map_dim = (PICKLE_MAP_SIZE, PICKLE_MAP_SIZE)

    if map_res is None:
        map_res = PICKLE_MAP_STEP

    # pick the most frequent type
    pkt_types = [(key, len(results[key])) for key in results.keys()]
    pkt_types = sorted(pkt_types, key=lambda x: x[1], reverse=True)
    print("most frequent data type is {} with {} pkts".format(pkt_types[0][0], pkt_types[0][1]))
    data = results[pkt_types[0][0]]

    # sort it and transpose it
    data = np.transpose(np.array(sorted(data, key=lambda x: (x[0], x[1]))))
    loc_x_min = min(data[0, :])
    loc_x_max = max(data[0, :])
    loc_y_min = min(data[1, :])
    loc_y_max = max(data[1, :])
    loc_x_center = (loc_x_min + loc_x_max) / 2.0
    loc_y_center = (loc_y_min + loc_y_max) / 2.0

    # convert it to a map
    rss_map = np.ones(map_dim) * -85.0
    factor = 0.75

    for i in range(map_dim[0]):
        # search for x_idx
        upper_bound_x = loc_x_center + map_res * (i - (map_dim[0] / 2) + 0.5)
        lower_bound_x = upper_bound_x - map_res
        data_x_idxs = find_index(
            data[0, :],
            lower_bound_x - factor * map_res,
            upper_bound_x + factor * map_res
        )
        data_part = data[:, data_x_idxs]
        if data_part.size == 0:
            continue
        for j in range(map_dim[1]):
            # search for y_idx
            upper_bound_y = loc_y_center + map_res * (j - (map_dim[1] / 2) + 0.5)
            lower_bound_y = upper_bound_y - map_res
            data_y_idxs = find_index(
                data_part[1, :],
                lower_bound_y - factor * map_res,
                upper_bound_y + factor * map_res
            )
            data_fulfilled = data_part[2, data_y_idxs]
            orientation_fulfilled = data_part[3, data_y_idxs]
            # keep only samples whose facing falls in the requested sector
            if filters == 0:
                data_fulfilled = data_fulfilled[(orientation_fulfilled > 1.75 * np.pi) | (orientation_fulfilled < 0.25 * np.pi)]
            elif filters == 1:
                data_fulfilled = data_fulfilled[(orientation_fulfilled > 1.25 * np.pi) & (orientation_fulfilled < 1.75 * np.pi)]
            elif filters == 2:
                data_fulfilled = data_fulfilled[(orientation_fulfilled > 0.75 * np.pi) & (orientation_fulfilled < 1.25 * np.pi)]
            elif filters == 3:
                data_fulfilled = data_fulfilled[(orientation_fulfilled > 0.25 * np.pi) & (orientation_fulfilled < 0.75 * np.pi)]
            elif filters == 4:
                data_fulfilled = data_fulfilled[(orientation_fulfilled > 1.5 * np.pi) | (orientation_fulfilled < 0.5 * np.pi)]
            elif filters == 5:
                data_fulfilled = data_fulfilled[(orientation_fulfilled > 0.5 * np.pi) & (orientation_fulfilled < 1.5 * np.pi)]
            if data_fulfilled.size:
                if sampling:
                    rss_map[i, j] = max(np.random.choice(data_fulfilled, 1)[0], -85.0)
                else:
                    rss_map[i, j] = max(np.median(data_fulfilled), -85.0)

    filepath = fp.replace(
        ".csv", "{}_pkttype_{}_map{}"
        .format(
            "_s{}".format(np.random.randint(0, 999999)) if sampling else "",
            pkt_types[0][0],
            "" if filters is None else "_{}".format(filters)
        )
    )

    if visualize or output_map:
        blocking_display_rss_map(rss_map, visualize=visualize, output_map=output_map, fp=filepath)

    with open("{}.pickle".format(filepath), "wb") as f:
        pickle.dump([rss_map, labels], f)


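# Illustrative usage (hypothetical path): grid the measurements of the most
# frequent packet type and pickle [rss_map, labels]:
#   convert_to_pickle_rss('data/aabbccddeeff.csv', orientation=0)
#   # -> writes 'data/aabbccddeeff_pkttype_<type>_map.pickle'

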
def extract_dev_from_combined(fp, minimalCounts=100, cleanup=True):
    '''
    extract each device's data from combined file `fp`
    '''
    files = {}
    folderpath, ext = os.path.splitext(fp)

    try:
        os.mkdir(folderpath)
    except FileExistsError:
        pass

    with open(fp) as f:
        lines = f.readlines()

    for line in lines[1:]:
        tmp = line.rstrip().split(",")
        addr = tmp[3].replace(":", "")
        if addr not in files:
            files[addr] = []
        files[addr].append(",".join(tmp[:3] + tmp[4:]))

    # drop devices with too few packets
    for addr in list(files.keys()):
        if len(files[addr]) < minimalCounts:
            del files[addr]

    title = lines[0].rstrip().split(",")
    headline = ",".join(title[:3] + title[4:]) + "\n"
    filepaths = []

    for addr in files.keys():
        filepath = "{}/{}.csv".format(folderpath, addr)
        filepaths.append(filepath)
        with open(filepath, "w") as f:
            f.write(headline)
            for line in files[addr]:
                f.write("{}\n".format(line))

    if len(files) > 0 and cleanup:
        os.remove(fp)

    return filepaths


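# Illustrative usage (hypothetical file): the 4th csv column is assumed to be
# the device address, e.g. "aa:bb:cc:dd:ee:ff":
#   paths = extract_dev_from_combined('capture.csv', minimalCounts=100)
#   # -> ['capture/aabbccddeeff.csv', ...]; 'capture.csv' itself is removed

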
def combine_sig_loc(sig_fp, loc_fp):
    '''
    append location to signal data
    '''
    filename, ext = os.path.splitext(loc_fp)
    with open(sig_fp) as f:
        sig_data = f.readlines()
    with open(loc_fp) as f:
        loc_data = f.readlines()
    i_s = 1  # skip the header line
    i_l = 1  # skip the header line
    len_sig = len(sig_data)
    len_loc = len(loc_data)
    min_x = float('inf')
    max_x = float('-inf')
    min_y = float('inf')
    max_y = float('-inf')
    # note: str.rstrip strips a character set, not a suffix,
    # so remove the trailing "_loc" explicitly
    basename = filename[:-len("_loc")] if filename.endswith("_loc") else filename
    outfile = "{0}_sig.csv".format(basename)
    with open(outfile, 'w') as f:
        f.write("#x,y,orient," + sig_data[0][1:])
        prev_i_s = 0
        prev_i_l = 0
        while i_s < len_sig:
            if prev_i_s != i_s:
                sig_tmp = sig_data[i_s].rstrip().split(',')
                prev_i_s = i_s
            if prev_i_l != i_l:
                loc_tmp = loc_data[i_l].rstrip().split(',')
                prev_i_l = i_l
            epoch_sig = float(sig_tmp[1])
            epoch_loc = float(loc_tmp[2]) / 1000.0
            x = float(loc_tmp[3])
            y = float(loc_tmp[4])
            if x > max_x:
                max_x = x
            if x < min_x:
                min_x = x
            if y > max_y:
                max_y = y
            if y < min_y:
                min_y = y
            orientation = float(loc_tmp[5])
            # advance the location stream until it catches up with the packet
            if epoch_sig > epoch_loc and i_l < len_loc - 1:
                i_l += 1
                continue
            f.write("{},{},{},{}\n".format(x, y, orientation, ",".join(sig_tmp)))
            i_s += 1
    return outfile, ((min_x, min_y), (max_x, max_y))


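# Illustrative usage (hypothetical paths): merge a signal log with a '*_loc'
# location log by timestamp:
#   outfile, ((min_x, min_y), (max_x, max_y)) = combine_sig_loc(
#       'run1_pcap.csv', 'run1_loc.csv')
#   # -> 'run1_sig.csv', each row "x,y,orient,<original signal columns>"

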
def translate_pcap(pcap_fp, is_csi):
    '''
    translate a pcap into csv via tshark; reuse the csv if it already exists
    '''
    tshark = Tshark()
    filepath, ext = os.path.splitext(pcap_fp)
    outputfp = "{}.csv".format(filepath)
    if os.path.isfile(outputfp):
        return outputfp
    if is_csi:
        tshark.translateCSI(pcap_fp, outputfp)
    else:
        tshark.translatePcap(pcap_fp, outputfp)
    return outputfp


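# Illustrative usage (hypothetical capture): reuse 'capture.csv' if present,
# otherwise have tshark translate the pcap:
#   csv_fp = translate_pcap('capture.pcap', is_csi=False)  # -> 'capture.csv'

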
def normalize_rss(rss):
    # clamp to [-85, -20] dBm, then scale linearly to [0, 1]
    rss = max(min(rss, -20), -85)
    return (rss + 85) / 65.0


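# e.g. normalize_rss(-85) -> 0.0, normalize_rss(-52.5) -> 0.5,
# normalize_rss(-20) -> 1.0; values outside [-85, -20] are clamped first

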
def get_locs_from_parsed_sig_data(sig_data, is_csi=False):
    # build one (x, y, color) tuple per sample; alpha encodes rss strength
    locs_data = []

    if is_csi:
        print("Not implemented yet")
        return locs_data

    for line in sig_data:
        tmp = line.rstrip().split(",")
        x = float(tmp[0])
        y = float(tmp[1])
        rss = float(tmp[4])
        # calculate color of rss
        color = (RED[0], RED[1], RED[2], int(255 * normalize_rss(rss)))
        pos = (x, y, color)
        locs_data.append(pos)
    return locs_data


def get_locs_from_slam_data(slam_data):
    # parse each slam csv row; only x and y are used for plotting
    locs_data = []
    for line in slam_data:
        tmp = line.rstrip().split(",")
        robotime = float(tmp[1])
        epoch = int(tmp[2])
        x = float(tmp[3])
        y = float(tmp[4])
        yaw = float(tmp[5])
        pos = (x, y, RED)
        locs_data.append(pos)
    return locs_data


def estimate_orientation(map_dim, reflections):
    '''
    estimate the local wall orientation (degrees) at each reflecting cell
    by fitting a line through nearby reflecting cells
    '''
    orientations = np.full(map_dim, np.nan)
    for i in range(map_dim[0]):
        for j in range(map_dim[1]):
            if reflections[i, j] < -80.0:
                continue
            xs = []
            ys = []
            # collect reflecting neighbors within a 9x9 window (stride 2)
            for off_i in range(-4, 5, 2):
                end_i = min(max(i + off_i, 0), map_dim[0] - 1)
                for off_j in range(-4, 5, 2):
                    end_j = min(max(j + off_j, 0), map_dim[1] - 1)
                    if reflections[end_i, end_j] > -100.0:
                        if end_i not in xs or end_j not in ys:
                            xs.append(end_i)
                            ys.append(end_j)
            if len(xs) == 0:
                continue
            if len(set(xs)) == 1:
                # vertical wall: polyfit would be ill-conditioned
                coefs = [None, 999.0]
            else:
                coefs = np.polynomial.polynomial.polyfit(xs, ys, 1)
            orientations[i, j] = round(np.arctan2(coefs[1], 1.0) * 180 / np.pi, 1)
    return orientations


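# e.g. for a vertical wall the slope placeholder is 999.0, so the cell gets
# round(arctan2(999.0, 1.0) * 180 / pi, 1) ~= 89.9 degrees

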
def build_map(
    f_map,
    orientation,
    minmax_xys,
    markers=None,
    visualize=False,
    output_map=False,
    map_dim=None,
    map_res=None
):
    '''
    rasterize the floorplan image into a wall occupancy grid, then derive
    per-cell penetration/reflection/orientation maps and pickle them
    modded from https://github.com/dgiese/dustcloud/blob/71f7af3e2b9607548bcd845aca251326128f742c/dustcloud/build_map.py
    '''
    grey = (125, 125, 125, 255)
    white = (255, 255, 255, 255)
    blue = (0, 0, 255, 255)
    pink = (255, 0, 255, 255)
    red = (255, 0, 0, 255)
    black = (0, 0, 0, 255)

    # define map dimension
    if map_dim is None:
        map_dim = (PICKLE_MAP_SIZE, PICKLE_MAP_SIZE)
    if map_res is None:
        map_res = PICKLE_MAP_STEP
    floor_map = np.zeros(map_dim)

    # define map center x,y
    center_x_val = (minmax_xys[1][0] + minmax_xys[0][0]) / 2
    center_y_val = (minmax_xys[1][1] + minmax_xys[0][1]) / 2

    # load map
    with open(f_map, 'rb') as f:
        map_image_data = f.read()

    map_image = Image.open(io.BytesIO(map_image_data))
    map_image = map_image.convert('RGBA')
    map_pix = map_image.load()

    # calculate center of the measurement (map image assumed 20 px per meter)
    center_x = int(map_image.size[0] / 2 + center_x_val * 20)
    center_y = int(map_image.size[1] / 2 - center_y_val * 20)

    for i in range(map_dim[0]):
        idx_x = int(center_x + map_res * 20 * (i - (map_dim[0] / 2)))
        for j in range(map_dim[1]):
            idx_y = int(center_y - map_res * 20 * (j - (map_dim[1] / 2)))
            # sample a 2x2 pixel block; any wall-colored pixel marks the cell
            is_blocked = False
            for kk in range(2):
                for ll in range(2):
                    color = map_pix[idx_x + kk, idx_y + ll]
                    is_blocked = is_blocked or (color in [black, blue, pink, red])
            if is_blocked:
                # write the cell into the grid rotated by `orientation`
                if orientation % 4 == 0:
                    floor_map[i, j] = 1
                elif orientation % 4 == 1:
                    floor_map[map_dim[1] - j - 1, i] = 1
                elif orientation % 4 == 2:
                    floor_map[map_dim[0] - i - 1, map_dim[1] - j - 1] = 1
                elif orientation % 4 == 3:
                    floor_map[j, map_dim[1] - i - 1] = 1
            elif color not in [grey, white]:
                print("unknown color: {}".format(color))

    filepath, ext = os.path.splitext(f_map)

    if visualize or output_map:
        blocking_display_rss_map(
            floor_map,
            visualize=visualize,
            output_map=output_map,
            fp=filepath,
            vmin=0.0,
            vmax=1.0
        )

    penetrations = floor_map * WALL_PENETRATION
    reflections = floor_map * WALL_REFLECTION
    reflections[reflections == 0] = -100.0
    orientations = estimate_orientation(map_dim, reflections)
    # blocking_display_rss_map(
    #     abs(orientations),
    #     visualize=visualize,
    #     output_map=output_map,
    #     fp=filepath+'ori',
    #     vmin=-10,
    #     vmax=100
    # )

    with open(filepath.replace("_map", "_floormap.pickle"), "wb") as f:
        pickle.dump([penetrations, reflections, orientations], f)


def test(args):
    if args.loc and args.map:
        with open(args.loc) as f:
            # skip the first line, which is column names
            slam_data = f.readlines()[1:]
        locs_data = get_locs_from_slam_data(slam_data)
        with open(args.map, 'rb') as f:
            map_image_data = f.read()
        # NOTE: this call predates the current build_map() signature
        # (f_map, orientation, minmax_xys, ...), which pickles its output
        # instead of returning a BytesIO
        augmented_map = build_map(locs_data, map_image_data)
        filepath, ext = os.path.splitext(args.map)
        with open("{}.png".format(filepath), 'wb') as f:
            f.write(augmented_map.getvalue())


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description='post processing test'
    )

    parser.add_argument(
        '-l', '--location',
        dest='loc',
        default=None,
        help='Specify location file path'
    )

    parser.add_argument(
        '-m', '--map',
        dest='map',
        default=None,
        help='Specify map file path'
    )

    args, __ = parser.parse_known_args()

    test(args)