add traditional fitting test
This commit is contained in:
parent
9c5533ceaf
commit
fa31fb46fd
|
|
@ -1,11 +1,16 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
|
||||
RANDOM_SEED = 0
|
||||
RANDOM_SEED = 666
|
||||
|
||||
GAUSSIAN_NOISE_MEAN = 0.0 # dB
|
||||
GAUSSIAN_NOISE_STD = 3.16 # 10 dB variance
|
||||
|
||||
NOISE_FLOOR = -85 #dB
|
||||
NOISE_FLOOR = -85.0 #dB
|
||||
|
||||
FLOAT_TOLERANCE = 0.001
|
||||
|
||||
MAX_RANGE = 10.0 # meter
|
||||
|
||||
COLORMAP_MIN = -60.0
|
||||
COLORMAP_MAX = -30.0
|
||||
|
|
|
|||
|
|
@ -0,0 +1,91 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
|
||||
import numpy as np
|
||||
from scipy.optimize import curve_fit
|
||||
from libs.models import log_gamma_loc
|
||||
|
||||
|
||||
def modelfit_log_gamma_func(input_data, *target_data):
|
||||
# unpack input_data
|
||||
rx_locs = input_data
|
||||
|
||||
# unpack target_data
|
||||
if len(target_data) < 4:
|
||||
print("missing arguments, should have loc_x, loc_y, pwr, gamma at least")
|
||||
return
|
||||
tx_loc_x = target_data[0]
|
||||
tx_loc_y = target_data[1]
|
||||
tx_power = target_data[2]
|
||||
env_gamma = target_data[3]
|
||||
tx_loc_z = None if len(target_data) == 4 else target_data[4]
|
||||
tx_loc = np.array([tx_loc_x, tx_loc_y])
|
||||
if tx_loc_z is not None:
|
||||
tx_loc = np.array([tx_loc_x, tx_loc_y, tx_loc_z])
|
||||
return log_gamma_loc(rx_locs, tx_loc, tx_power, env_gamma)
|
||||
|
||||
|
||||
def modelfit_log_gamma(
|
||||
rx_locs,
|
||||
rx_rsses,
|
||||
bounds_pwr=(-60, 0),
|
||||
bounds_gamma=(2, 6),
|
||||
bounds_loc_x=(0, 6.2),
|
||||
bounds_loc_y=(0, 6.2),
|
||||
bounds_loc_z=None,
|
||||
monte_carlo_sampling=False,
|
||||
monte_carlo_sampling_rate=0.8
|
||||
):
|
||||
# initial seeds
|
||||
seed_pwr = np.random.uniform(bounds_pwr[0], bounds_pwr[1])
|
||||
seed_gamma = np.random.uniform(bounds_gamma[0], bounds_gamma[1])
|
||||
seed_loc_x = np.random.uniform(bounds_loc_x[0], bounds_loc_x[1])
|
||||
seed_loc_y = np.random.uniform(bounds_loc_y[0], bounds_loc_y[1])
|
||||
if bounds_loc_z is not None:
|
||||
seed_loc_z = np.random.uniform(bounds_loc_z[0], bounds_loc_z[1])
|
||||
seeds = [
|
||||
seed_loc_x, seed_loc_y,
|
||||
seed_pwr, seed_gamma,
|
||||
seed_loc_z
|
||||
]
|
||||
bounds = list(zip(*[
|
||||
bounds_loc_x, bounds_loc_y,
|
||||
bounds_pwr, bounds_gamma,
|
||||
bounds_loc_z
|
||||
]))
|
||||
else:
|
||||
seeds = [
|
||||
seed_loc_x, seed_loc_y,
|
||||
seed_pwr, seed_gamma
|
||||
]
|
||||
bounds = list(zip(*[
|
||||
bounds_loc_x, bounds_loc_y,
|
||||
bounds_pwr, bounds_gamma
|
||||
]))
|
||||
if monte_carlo_sampling and rx_rsses.shape[0] > 10:
|
||||
logistics = np.random.choice(
|
||||
np.arange(rx_rsses.shape[0]),
|
||||
size=int(monte_carlo_sampling_rate * rx_rsses.shape[0]),
|
||||
replace=False
|
||||
)
|
||||
rx_locs = rx_locs[logistics, :]
|
||||
rx_rsses = rx_rsses[logistics, :]
|
||||
|
||||
# fit
|
||||
popt, pcov = curve_fit(
|
||||
modelfit_log_gamma_func, rx_locs, rx_rsses,
|
||||
p0=seeds, bounds=bounds
|
||||
)
|
||||
|
||||
# unpack popt
|
||||
if bounds_loc_z is None:
|
||||
est_loc_x, est_loc_y, est_tx_pwr, est_env_gamma = popt
|
||||
est_tx_loc = np.array([est_loc_x, est_loc_y])
|
||||
else:
|
||||
est_loc_x, est_loc_y, est_tx_pwr, est_env_gamma, est_loc_z = popt
|
||||
est_tx_loc = np.array([est_loc_x, est_loc_y, est_loc_z])
|
||||
est_rsses = log_gamma_loc(rx_locs, est_tx_loc, est_tx_pwr, est_env_gamma)
|
||||
est_errors = est_rsses - rx_rsses
|
||||
pmse = 1.0 * np.nansum(est_errors * est_errors) / est_errors.shape[0]
|
||||
|
||||
return pmse, est_tx_loc, est_tx_pwr, est_env_gamma, est_rsses
|
||||
|
|
@ -12,8 +12,8 @@ np.random.seed(RANDOM_SEED)
|
|||
|
||||
|
||||
def log_gamma_loc(
|
||||
rx_loc: np.ndarray,
|
||||
tx_loc: np.ndarray,
|
||||
rx_loc: np.ndarray, # dtype=float, avoid int
|
||||
tx_loc: np.ndarray, # dtype=float, avoid int
|
||||
pwr: float,
|
||||
gamma: float,
|
||||
loss: float = 0.0,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,98 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
from libs.consts import COLORMAP_MIN
|
||||
from libs.consts import COLORMAP_MAX
|
||||
from libs.spacemap import SpaceMap
|
||||
|
||||
|
||||
def plotSpace(space_map: SpaceMap):
|
||||
'''
|
||||
'''
|
||||
tx_locs = space_map.getTransmitterLocs()
|
||||
penetration_loss, reflection_loss = space_map.getLosses()
|
||||
|
||||
fig = plt.figure(figsize=(8, 3))
|
||||
|
||||
ax1 = fig.add_subplot(1, 2, 1)
|
||||
ax1.set_title("penetration loss")
|
||||
plt.imshow(
|
||||
np.transpose(penetration_loss),
|
||||
cmap='hot',
|
||||
origin='lower',
|
||||
interpolation='nearest',
|
||||
vmin=COLORMAP_MIN,
|
||||
vmax=COLORMAP_MAX
|
||||
)
|
||||
plt.colorbar()
|
||||
|
||||
ax2 = fig.add_subplot(1, 2, 2)
|
||||
ax2.set_title("reflection loss")
|
||||
plt.imshow(
|
||||
np.transpose(reflection_loss),
|
||||
cmap='hot',
|
||||
origin='lower',
|
||||
interpolation='nearest',
|
||||
vmin=COLORMAP_MIN,
|
||||
vmax=COLORMAP_MAX
|
||||
)
|
||||
plt.colorbar()
|
||||
|
||||
# plt.show()
|
||||
plt.draw()
|
||||
plt.pause(0.1)
|
||||
q = input("press enter..")
|
||||
plt.close()
|
||||
|
||||
|
||||
def plotRSS(
|
||||
ori_rss: np.ndarray,
|
||||
est_rss: np.ndarray = None,
|
||||
est_tx_loc: np.ndarray = None,
|
||||
true_tx_loc: np.ndarray = None
|
||||
):
|
||||
'''
|
||||
'''
|
||||
fig = plt.figure(figsize=(8, 3))
|
||||
ax1 = fig.add_subplot(1, 2, 1)
|
||||
ax1.set_title("original rss map")
|
||||
plt.imshow(
|
||||
np.transpose(ori_rss),
|
||||
cmap='hot',
|
||||
origin='lower',
|
||||
interpolation='nearest',
|
||||
vmin=COLORMAP_MIN,
|
||||
vmax=COLORMAP_MAX
|
||||
)
|
||||
plt.colorbar()
|
||||
|
||||
ax2 = fig.add_subplot(1, 2, 2)
|
||||
ax2.set_title("est rss map")
|
||||
if est_rss is not None:
|
||||
plt.imshow(
|
||||
np.transpose(est_rss),
|
||||
cmap='hot',
|
||||
origin='lower',
|
||||
interpolation='nearest',
|
||||
vmin=COLORMAP_MIN,
|
||||
vmax=COLORMAP_MAX
|
||||
)
|
||||
plt.colorbar()
|
||||
if est_tx_loc is not None:
|
||||
ax2.plot(est_tx_loc[0], est_tx_loc[1], 'bo')
|
||||
ax2.text(est_tx_loc[0] - 5, est_tx_loc[1] + 2, 'est tx', color='b')
|
||||
if true_tx_loc is not None:
|
||||
ax2.plot(true_tx_loc[0], true_tx_loc[1], 'bs')
|
||||
ax2.text(true_tx_loc[0], true_tx_loc[1], 'true tx', color='b')
|
||||
|
||||
# plt.show()
|
||||
plt.draw()
|
||||
plt.pause(0.1)
|
||||
q = input("press enter..")
|
||||
plt.close()
|
||||
|
||||
|
||||
|
||||
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
import numpy as np
|
||||
|
||||
from libs.consts import MAX_RANGE
|
||||
from libs.consts import FLOAT_TOLERANCE
|
||||
from libs.models import log_gamma_dist
|
||||
|
||||
|
|
@ -17,7 +18,7 @@ class SpaceBlock():
|
|||
self.mat = material
|
||||
self.has_transmitter = False
|
||||
self.loss_penetration = 0.0
|
||||
self.loss_reflection = 0.0
|
||||
self.loss_reflection = -100.0
|
||||
self.related_rays = []
|
||||
|
||||
def __mul__(self, val):
|
||||
|
|
@ -95,7 +96,7 @@ class SpaceBlock():
|
|||
def hasTransmitter(self):
|
||||
return self.has_transmitter
|
||||
|
||||
def setLoss(self, penetration=0.0, reflection=0.0):
|
||||
def setLoss(self, penetration=0.0, reflection=-100.0):
|
||||
self.loss_penetration = penetration
|
||||
self.loss_reflection = reflection
|
||||
|
||||
|
|
@ -109,7 +110,10 @@ class SpaceRay():
|
|||
'''
|
||||
def __init__(self, point1, point2):
|
||||
self.start = point1
|
||||
self.end = point2
|
||||
if not isinstance(point2, SpaceBlock):
|
||||
self.end = point1 + SpaceBlock(np.cos(point2), np.sin(point2)) * MAX_RANGE
|
||||
else:
|
||||
self.end = point2
|
||||
# property
|
||||
self.distance = None
|
||||
self.angle_theta = None
|
||||
|
|
@ -127,12 +131,13 @@ class SpaceRay():
|
|||
self.this_pass_through_loss = None
|
||||
self.this_pass_through_blks = None
|
||||
self.this_gamma = None
|
||||
self.__prev_factor = None
|
||||
self.__prev_bounds = None
|
||||
self.loss_total = None
|
||||
self.distance_traveled = None
|
||||
self.reflection_count = 0
|
||||
# id
|
||||
self.ray_id = ''
|
||||
self.prev_ray = None
|
||||
self.next_rays = []
|
||||
|
||||
def getAngle(self, degree=False):
|
||||
if self.angle_theta is None:
|
||||
|
|
@ -172,31 +177,25 @@ class SpaceRay():
|
|||
def setTravelDistance(self, prior_distance):
|
||||
self.distance_traveled = prior_distance + self.getDistance()
|
||||
|
||||
def computeLinePassThroughLoss(self, space_map):
|
||||
factor = space_map.bs
|
||||
bounds = space_map.map.shape
|
||||
|
||||
if (
|
||||
self.__prev_bounds == bounds and
|
||||
self.__prev_factor == factor and
|
||||
self.this_pass_through_blks is not None
|
||||
):
|
||||
return np.sum([
|
||||
each.loss_penetration
|
||||
for each in self.this_pass_through_blks
|
||||
])
|
||||
|
||||
self.__prev_bounds = bounds
|
||||
self.__prev_factor = factor
|
||||
def derivePassAndReflectBlocks(self, space_map):
|
||||
self.this_pass_through_blks = []
|
||||
self.this_reflection_blks = []
|
||||
step_blk = SpaceBlock(self.getAngleThetaCos(), self.getAngleThetaSin()) * factor
|
||||
for i in range(1, int(self.getDistance() / factor)):
|
||||
next_blk = self.start + step_blk * i
|
||||
# assume 2D
|
||||
x_idx, y_idx = [int(x) for x in (next_blk / factor).round()]
|
||||
if x_idx < bounds[0] and x_idx > -1 and y_idx < bounds[1] and y_idx > -1:
|
||||
self.this_pass_through_blks.append(space_map.map[x_idx, y_idx])
|
||||
space_map.map[x_idx, y_idx].related_rays.append(self)
|
||||
if space_map.map[x_idx, y_idx].loss_penetration < 0:
|
||||
self.this_pass_through_blks.append(space_map.map[x_idx, y_idx])
|
||||
if space_map.map[x_idx, y_idx].loss_reflection > -100:
|
||||
self.this_reflection_blks.append(space_map.map[x_idx, y_idx])
|
||||
# space_map.map[x_idx, y_idx].related_rays.append(self)
|
||||
|
||||
def getPassThroughLoss(self):
|
||||
if self.this_pass_through_blks is None:
|
||||
print("need to run `derivePassAndReflectBlocks` first")
|
||||
return
|
||||
self.this_pass_through_loss = np.sum([
|
||||
each.loss_penetration
|
||||
for each in self.this_pass_through_blks
|
||||
|
|
@ -207,7 +206,7 @@ class SpaceRay():
|
|||
excluding the end penetration/reflection loss
|
||||
'''
|
||||
if self.this_pass_through_loss is None:
|
||||
print("need to run `computeLinePassThroughLoss` first")
|
||||
print("need to run `getPassThroughLoss` first")
|
||||
return
|
||||
self.loss_total = prior_loss + self.this_pass_through_loss
|
||||
|
||||
|
|
@ -245,7 +244,9 @@ class SpaceMap():
|
|||
width: float = 6.4,
|
||||
length: float = 6.4,
|
||||
block_size: float = 0.1
|
||||
):
|
||||
):
|
||||
if MAX_RANGE < width or MAX_RANGE < length:
|
||||
print("WARNNING! MAX_RANGE {} is less than input".format(MAX_RANGE))
|
||||
self.width = width
|
||||
self.length = length
|
||||
self.bs = block_size
|
||||
|
|
@ -258,12 +259,21 @@ class SpaceMap():
|
|||
|
||||
# initialize the map
|
||||
self.__loss_p = np.zeros(self.map.shape)
|
||||
self.__loss_r = np.zeros(self.map.shape)
|
||||
self.__loss_r = np.ones(self.map.shape) * -100
|
||||
self.__tx_locs = np.zeros(self.map.shape)
|
||||
for j in range(self.map.shape[1]):
|
||||
y = self.bs * (j + 0.5)
|
||||
for i in range(self.map.shape[0]):
|
||||
self.map[i, j] = SpaceBlock(self.bs * (i + 0.5), y)
|
||||
|
||||
# propagation
|
||||
self.env_gamma = 2.0
|
||||
self.ray_trace_deg_step = 1.0
|
||||
# according to VTC paper
|
||||
# "A RAY TRACING METHOD FOR PREDICTING PATH LOSS AND DELAY SPREAD
|
||||
# IN MICROCELLULAR ENVIRONMENTS"
|
||||
self.ray_trace_deg_tol = self.ray_trace_deg_step / 180.0 * np.pi / np.sqrt(3)
|
||||
|
||||
def getLosses(self):
|
||||
return np.array([self.__loss_p, self.__loss_r])
|
||||
|
||||
|
|
@ -279,3 +289,25 @@ class SpaceMap():
|
|||
self.map[i, j].setLoss(penetration, reflection)
|
||||
self.__loss_p[i, j] = penetration
|
||||
self.__loss_r[i, j] = reflection
|
||||
|
||||
def setHasTransmitter(self, i, j):
|
||||
self.map[i, j].setTransmitter(flag)
|
||||
self.__tx_locs[i, j] = 1 if flag else 0
|
||||
|
||||
def getTransmitterLocs(self):
|
||||
return self.__tx_locs
|
||||
|
||||
def setEnvGamma(self, val: float):
|
||||
self.env_gamma = val
|
||||
|
||||
def traceRay(self, tx_power: float, tx_loc: SpaceBlock, rx_loc: SpaceBlock):
|
||||
rays = []
|
||||
for direction in np.arange(0, 359.99, self.ray_trace_deg_step):
|
||||
ray = SpaceRay(tx_loc, direction / 180.0 * np.pi)
|
||||
ray.setPower(tx_power, self.env_gamma)
|
||||
ray.ray_id = "ray_{0:.3f}".format(direction)
|
||||
rays.append(ray)
|
||||
while len(rays) > 0:
|
||||
ray = rays.pop(0)
|
||||
|
||||
|
||||
|
|
|
|||
Binary file not shown.
|
|
@ -0,0 +1,73 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
import sys
|
||||
sys.path.append(".")
|
||||
sys.path.append("..")
|
||||
sys.path.append("../..") # Adds higher directory to python modules path
|
||||
|
||||
import os
|
||||
import time
|
||||
import pickle
|
||||
import numpy as np
|
||||
from libs.consts import NOISE_FLOOR
|
||||
from libs.fitting import modelfit_log_gamma
|
||||
from libs.plotting import plotRSS
|
||||
|
||||
|
||||
def convert_mat_to_vector(mat, block_size=0.1):
|
||||
'''
|
||||
'''
|
||||
rx_locs = []
|
||||
rx_rsses = []
|
||||
width, length = mat.shape
|
||||
for i in range(width):
|
||||
for j in range(length):
|
||||
if mat[i, j] <= NOISE_FLOOR:
|
||||
continue
|
||||
rx_locs.append([(i + 0.5) * block_size, (j + 0.5) * block_size])
|
||||
rx_rsses.append(mat[i, j])
|
||||
return np.array(rx_locs), np.array(rx_rsses)
|
||||
|
||||
|
||||
def convert_vector_to_mat(rx_locs, rx_rsses, shape, block_size=0.1):
|
||||
'''
|
||||
'''
|
||||
result = np.ones(shape) * -85.0
|
||||
for i in range(rx_locs.shape[0]):
|
||||
x, y = rx_locs[i, :] / block_size
|
||||
result[int(x), int(y)] = rx_rsses[i]
|
||||
return result
|
||||
|
||||
|
||||
def test():
|
||||
data_ori = pickle.load(open(os.path.join(os.path.dirname(__file__), "dev_map.pickle"), "rb"))
|
||||
rx_locs, rx_rsses = convert_mat_to_vector(data_ori)
|
||||
|
||||
min_pmse = float('inf')
|
||||
best_tx_loc = None
|
||||
best_tx_pwr = None
|
||||
best_env_gamma = None
|
||||
best_rsses = None
|
||||
for i in range(10):
|
||||
pmse, est_tx_loc, est_tx_pwr, est_env_gamma, est_rsses = modelfit_log_gamma(rx_locs, rx_rsses)
|
||||
if pmse < min_pmse:
|
||||
print("found better:", pmse, est_tx_loc, est_tx_pwr, est_env_gamma)
|
||||
data_est = convert_vector_to_mat(rx_locs, est_rsses, data_ori.shape)
|
||||
min_pmse = pmse
|
||||
best_tx_loc = est_tx_loc
|
||||
best_tx_pwr = est_tx_pwr
|
||||
best_env_gamma = est_env_gamma
|
||||
best_rsses = est_rsses
|
||||
print("final:", min_pmse, best_tx_loc, best_tx_pwr, best_env_gamma)
|
||||
data_best_est = convert_vector_to_mat(rx_locs, best_rsses, data_ori.shape)
|
||||
plotRSS(
|
||||
data_ori,
|
||||
data_best_est,
|
||||
est_tx_loc=best_tx_loc / 0.1
|
||||
)
|
||||
# print(rx_locs[:,1])
|
||||
# print(rx_locs[1,:])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test()
|
||||
|
|
@ -11,6 +11,7 @@ import numpy as np
|
|||
from libs.spacemap import SpaceBlock
|
||||
from libs.spacemap import SpaceRay
|
||||
from libs.spacemap import SpaceMap
|
||||
from libs.plotting import plotSpace
|
||||
|
||||
|
||||
def test():
|
||||
|
|
@ -18,7 +19,8 @@ def test():
|
|||
print(spacemap.getLosses())
|
||||
print(spacemap.getLoss(np.array([1, 2, 3]), 1))
|
||||
ray = SpaceRay(SpaceBlock(0.0, 0.0), SpaceBlock(2, 6.38))
|
||||
ray.computeLinePassThroughLoss(spacemap)
|
||||
ray.derivePassAndReflectBlocks(spacemap)
|
||||
ray.getPassThroughLoss()
|
||||
ray.setTotalLoss(0.0)
|
||||
ray.setInitPower(0, gamma=2.0)
|
||||
ray.setTravelDistance(0.0)
|
||||
|
|
@ -26,6 +28,7 @@ def test():
|
|||
print("pwr = {}".format(pwr))
|
||||
print(ray.this_pass_through_blks[0].related_rays)
|
||||
print(ray.this_pass_through_blks[0])
|
||||
plotSpace(spacemap)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
Loading…
Reference in New Issue