propagation_gan/syngen_floorplan_multipath.py

265 lines
8.0 KiB
Python

#!/usr/bin/python
import os
import sys
import time
import pickle
import argparse
import numpy as np
from multiprocessing import Process
from multiprocessing import Manager
from libs.util import convert_vector_to_mat
from libs.consts import RANDOM_SEED_TEST
from libs.consts import RANDOM_SEED_TRAIN
from libs.models import log_gamma_floorplan
from libs.spacemap import SpaceMap
from libs.spacemap import SpaceBlock
from libs.plotting import plotSpace
def getRandomTXLocs(
num: int,
width: float,
length: float,
offset_w: float = 0.0,
offset_l: float = 0.0
):
'''
'''
xys = np.random.rand(num, 2)
xys[:, 0] = xys[:, 0] * width + offset_w
xys[:, 1] = xys[:, 1] * length + offset_l
return xys
def getLocs(loc_x_range, loc_y_range, step_size: float = 0.3):
loc_xs = np.arange(loc_x_range[0], loc_x_range[1] + step_size, step_size)
loc_ys = np.arange(loc_y_range[0], loc_y_range[1] + step_size, step_size)
return np.stack(np.meshgrid(loc_xs, loc_ys), -1).reshape(-1, 2)
def getPowers(power_range, step_size: float = 2):
return np.arange(power_range[0], power_range[1] + step_size, step_size)
def log_gamma_floorplan_multi(floormap, rx_loc, tx_loc, power, gaussian_noise, results, kk):
rss, __ = log_gamma_floorplan(
floormap,
rx_loc,
tx_loc,
power,
gaussian_noise=gaussian_noise
)
# print(kk, rss)
results[kk] = rss
def floormap_noise_injection(floormap, args):
if not args.floormap_rand:
return
for i in range(floormap.map.shape[0]):
for j in range(floormap.map.shape[1]):
if np.random.random() < (1-args.floormap_rand_prob):
continue
loss_p, loss_r = floormap.getLoss(i, j)
loss_r = np.random.randint(-3, 3) - 10
floormap.setLoss(
i, j,
min(loss_p + np.random.randint(-10, 5), 0.0),
loss_r
)
floormap.setOrientation(
i, j,
np.random.random() * 2 * np.pi - np.pi
)
def generateData(floormap, tx_locs, args):
'''
'''
if args.train:
tag = "input_synthetic_train"
elif args.train_real:
tag = "input_real_emu_train"
elif args.test:
tag = "testing_real_emu"
tag = "{0}".format(tag)
if args.withnoise:
tag = "{}_noise_10dBvar".format(tag)
# create folder
folderp = "{}/{}".format(args.outfolder, tag)
if not os.path.isdir(folderp):
try:
os.makedirs(folderp)
except BaseException as e:
print("err: {}".format(e))
sys.exit(-1)
# generate
rx_locs = getLocs([0, 6.3], [0, 6.3], step_size=0.1)
powers = getPowers([-40, 30])
for i in range(tx_locs.shape[0]):
fp_base = "{0}/img_{1:.2f}_{2:.2f}_2.0".format(folderp, tx_locs[i, 0] - 3.2, tx_locs[i, 1] - 3.2)
for power in powers:
rss_vec = []
starttime = int(time.time())
# tx_loc = SpaceBlock(3.2, 3.2)
tx_loc = SpaceBlock(tx_locs[i, 0], tx_locs[i, 1])
if args.floormap_rand:
floormap = getFloormap(args)
floormap_noise_injection(floormap, args)
floormap.traceRays(power, tx_loc)
for j in range(0, rx_locs.shape[0], args.procnum):
procs = []
results = Manager().dict()
actual_num = min(args.procnum, rx_locs.shape[0] - j)
for kk in range(actual_num):
rx_loc = SpaceBlock(rx_locs[j + kk, 0], rx_locs[j + kk, 1])
proc = Process(
target=log_gamma_floorplan_multi,
args=(floormap, rx_loc, tx_loc, power, args.withnoise, results, kk)
)
proc.start()
procs.append(proc)
for proc in procs:
proc.join()
rss_vec.extend([results[kk] for kk in range(actual_num)])
# rx_loc = SpaceBlock(rx_locs[j, 0], rx_locs[j, 1])
# rss, multipaths = log_gamma_floorplan(
# floormap,
# rx_loc,
# tx_loc,
# power,
# gaussian_noise=args.withnoise
# )
# if args.visualize:
# paths = []
# for path in multipaths:
# paths.extend(list(path))
# print(rss, rx_loc, tx_loc)
# plotSpace(floormap, space_rays=paths, cminmax=(-50, 0.0))
# rss_vec.append(rss)
# print("results: {}".format(rss_vec))
rss_map = convert_vector_to_mat(rx_locs, np.array(rss_vec), (64, 64))
duration = (int(time.time()) - starttime)
print("duration: {}s ({:.2f}%) - len {}".format(duration, j/rx_locs.shape[0]*100.0, len(rss_vec)))
with open("{}_{}.pickle".format(fp_base, int(power)), 'wb') as f:
pickle.dump(rss_map, f, pickle.HIGHEST_PROTOCOL)
loss_p, loss_r = floormap.getLosses()
orient = floormap.getOrientations()
with open("{}_{}_floormap.pickle".format(fp_base, int(power)), 'wb') as f:
pickle.dump([loss_p, loss_r, orient], f, pickle.HIGHEST_PROTOCOL)
def getFloormap(args):
'''
'''
floormap = SpaceMap(width=6.4, length=6.4, block_size=0.1)
penetrations, reflections, orientations = pickle.load(open(args.floormap, "rb"))
floormap.setLosses(penetrations, reflections)
floormap.setOrientations(orientations)
return floormap
def main(args):
tx_locs = None
if args.train:
np.random.seed(RANDOM_SEED_TRAIN)
tx_locs = getLocs([0.2, 6.2], [0.2, 6.2], step_size=0.3)
elif args.train_real:
np.random.seed(RANDOM_SEED_TRAIN)
tx_locs = getRandomTXLocs(400, 6.4, 6.4, offset_w=0.0, offset_l=0.0)
elif args.test:
np.random.seed(RANDOM_SEED_TEST)
tx_locs = getRandomTXLocs(400, 6.4, 6.4, offset_w=0.0, offset_l=0.0)
else:
print("nothing specified")
return
generateData(getFloormap(args), tx_locs, args)
if __name__ == '__main__':
p = argparse.ArgumentParser(description='Data Generator with Log Gamma Distance Model')
p.add_argument(
'outfolder',
help='output generated data to folder'
)
p.add_argument(
'--training',
dest='train',
action='store_true',
default=False,
help='generate training data (synthetic part)'
)
p.add_argument(
'--training-real',
dest='train_real',
action='store_true',
default=False,
help='emulating training data (real data part)'
)
p.add_argument(
'--testing',
dest='test',
action='store_true',
default=False,
help='emulating testing data (real data part)'
)
p.add_argument(
'--visualize', '-v',
dest='visualize',
action='store_true',
default=False,
help='visualize'
)
p.add_argument(
'--floormap', '-f',
dest='floormap',
default=None,
help='the base of floormap data'
)
p.add_argument(
'--floormap-rand', '-frnd',
dest='floormap_rand',
action='store_true',
default=False,
help='add some random penetration/reflection losses in floormap'
)
p.add_argument(
'--floormap-rand-prob', '-frndprob',
dest='floormap_rand_prob',
type=float,
default=0.2,
help='set the probability of adding the randomness'
)
p.add_argument(
'--with-noise',
dest='withnoise',
action='store_true',
default=False,
help='add 10dB noise to generated data'
)
p.add_argument(
'--proc', '-p',
dest='procnum',
type=int,
default=1,
help='number of parallel process'
)
try:
args = p.parse_args()
except BaseException as e:
print(e)
sys.exit()
main(args)