diff --git a/libs/consts.py b/libs/consts.py new file mode 100644 index 0000000..245e70b --- /dev/null +++ b/libs/consts.py @@ -0,0 +1,11 @@ +#!/usr/bin/python + + +RANDOM_SEED = 0 + +GAUSSIAN_NOISE_MEAN = 0.0 # dB +GAUSSIAN_NOISE_STD = 3.16 # 10 dB variance + +NOISE_FLOOR = -85 #dB + +FLOAT_TOLERANCE = 0.001 diff --git a/libs/models.py b/libs/models.py new file mode 100644 index 0000000..2990f8c --- /dev/null +++ b/libs/models.py @@ -0,0 +1,48 @@ +#!/usr/bin/python + + +import numpy as np + +from libs.consts import NOISE_FLOOR +from libs.consts import RANDOM_SEED +from libs.consts import GAUSSIAN_NOISE_STD +from libs.consts import GAUSSIAN_NOISE_MEAN + +np.random.seed(RANDOM_SEED) + + +def log_gamma_loc( + rx_loc: np.ndarray, + tx_loc: np.ndarray, + pwr: float, + gamma: float, + loss: float = 0.0, + gaussian_noise: bool = False +): + ''' + ''' + dist_squared = np.nansum((rx_loc - tx_loc) * (rx_loc - tx_loc), axis=1) + dist_squared[dist_squared < 0.02] = 0.02 + noise = normal(GAUSSIAN_NOISE_MEAN, GAUSSIAN_NOISE_STD) if gaussian_noise else 0.0 + rss = pwr - 10.0 * gamma / 2 * np.log10(dist_squared) + noise + loss + rss[rss < NOISE_FLOOR] = NOISE_FLOOR + rss[rss > pwr] = pwr + return rss + + +def log_gamma_dist( + dist: np.ndarray, + pwr: float, + gamma: float, + loss: float = 0.0, + gaussian_noise: bool = False, + is_squared: bool = False +): + ''' + ''' + factor = 0.5 if is_squared else 1.0 + noise = normal(GAUSSIAN_NOISE_MEAN, GAUSSIAN_NOISE_STD) if gaussian_noise else 0.0 + rss = pwr - 10.0 * gamma * factor * np.log10(dist) + noise + loss + rss[rss < NOISE_FLOOR] = NOISE_FLOOR + rss[rss > pwr] = pwr + return rss diff --git a/libs/spacemap.py b/libs/spacemap.py new file mode 100644 index 0000000..b3a2cbe --- /dev/null +++ b/libs/spacemap.py @@ -0,0 +1,281 @@ +#!/usr/bin/python + + +import numpy as np + +from libs.consts import FLOAT_TOLERANCE +from libs.models import log_gamma_dist + + +class SpaceBlock(): + ''' + ''' + def __init__(self, x, y, z=None, material='default'): + self.x = float(x) + self.y = float(y) + self.z = float('nan') if z is None else float(z) + self.mat = material + self.has_transmitter = False + self.loss_penetration = 0.0 + self.loss_reflection = 0.0 + self.related_rays = [] + + def __mul__(self, val): + return SpaceBlock(self.x * val, self.y * val, self.z * val) + + def __div__(self, val): + return self.__truediv__(val) + + def __truediv__(self, val): + return SpaceBlock(self.x / val, self.y / val, self.z / val) + + def __floordiv__(self, val): + return SpaceBlock(self.x // val, self.y // val, self.z // val) + + def __add__(self, blk): + if isinstance(blk, SpaceBlock): + return SpaceBlock(self.x + blk.x, self.y + blk.y, self.z + blk.z) + return SpaceBlock(self.x + blk, self.y + blk, self.z + blk) + + def __sub__(self, blk): + if isinstance(blk, SpaceBlock): + return SpaceBlock(self.x - blk.x, self.y - blk.y, self.z - blk.z) + return SpaceBlock(self.x - blk, self.y - blk, self.z - blk) + + def __abs__(self): + return self.dot(self) + + def __eq__(self, blk): + return self.distance(blk) < FLOAT_TOLERANCE + + def includes(self, x, y, z=None, block_size=0.1): + flag = x >= self.x and x < (self.x + block_size) + flag = flag and y >= self.y and y < (self.y + block_size) + if z is None: + return flag + return flag and z >= self.z and z < (self.z + block_size) + + def dot(self, blk): + ''' + dot product + ''' + if np.isnan(self.z) or np.isnan(blk.z): + return (self.x * blk.x) + (self.y * blk.y) + return (self.x * blk.x) + (self.y * blk.y) + (self.z * blk.z) + + def round(self, dg=0): + return SpaceBlock(round(self.x, dg), round(self.y, dg), round(self.z, dg)) + + def distanceSquared(self, blk): + return (self - blk).__abs__() + + def distance(self, blk): + return np.sqrt(self.distanceSquared(blk)) + + def __iter__(self): + self.__i = 0 + return self + + def __next__(self): + self.__i += 1 + if self.__i == 1: + return self.x + elif self.__i == 2: + return self.y + elif self.__i == 3 and not np.isnan(self.z): + return self.z + raise StopIteration + + def __str__(self): + return "SpaceBlock(x = {:.3f}, y = {:.3f}, z = {:.3f})".format(self.x, self.y, self.z) + + def setTransmitter(self, flag): + self.has_transmitter = flag + + def hasTransmitter(self): + return self.has_transmitter + + def setLoss(self, penetration=0.0, reflection=0.0): + self.loss_penetration = penetration + self.loss_reflection = reflection + + def getLoss(self): + return [self.loss_penetration, self.loss_reflection] + + +class SpaceRay(): + ''' + TODO: extend to 3D + ''' + def __init__(self, point1, point2): + self.start = point1 + self.end = point2 + # property + self.distance = None + self.angle_theta = None + self.angle_theta_deg = None + self.angle_theta_sin = None + self.angle_theta_cos = None + self.angle_theta_tan = self.slope = None + # power and propagation + self.begininng_pwr = 0.0 + self.begininng_phase = 0.0 + self.this_starting_pwr = 0.0 + self.this_starting_phase = 0.0 + self.this_ending_pwr = 0.0 + self.this_ending_phase = 0.0 + self.this_pass_through_loss = None + self.this_pass_through_blks = None + self.this_gamma = None + self.__prev_factor = None + self.__prev_bounds = None + self.loss_total = None + self.distance_traveled = None + # id + self.ray_id = '' + + def getAngle(self, degree=False): + if self.angle_theta is None: + self.angle_theta = np.arctan2( + self.end.y - self.start.y, self.end.x - self.start.x + ) + if degree: + if self.angle_theta_deg is None: + self.angle_theta_deg = self.angle_theta * 180.0 / np.pi + return self.angle_theta_deg + return self.angle_theta + + def getSlope(self): + if self.slope is None: + self.slope = np.tan(self.getAngle()) + self.angle_theta_tan = self.slope + return self.slope + + def getAngleThetaTan(self): + return self.getSlope() + + def getAngleThetaSin(self): + if self.angle_theta_sin is None: + self.angle_theta_sin = np.sin(self.getAngle()) + return self.angle_theta_sin + + def getAngleThetaCos(self): + if self.angle_theta_cos is None: + self.angle_theta_cos = np.cos(self.getAngle()) + return self.angle_theta_cos + + def getDistance(self): + if self.distance is None: + self.distance = self.start.distance(self.end) + return self.distance + + def setTravelDistance(self, prior_distance): + self.distance_traveled = prior_distance + self.getDistance() + + def computeLinePassThroughLoss(self, space_map): + factor = space_map.bs + bounds = space_map.map.shape + + if ( + self.__prev_bounds == bounds and + self.__prev_factor == factor and + self.this_pass_through_blks is not None + ): + return np.sum([ + each.loss_penetration + for each in self.this_pass_through_blks + ]) + + self.__prev_bounds = bounds + self.__prev_factor = factor + self.this_pass_through_blks = [] + step_blk = SpaceBlock(self.getAngleThetaCos(), self.getAngleThetaSin()) * factor + for i in range(1, int(self.getDistance() / factor)): + next_blk = self.start + step_blk * i + # assume 2D + x_idx, y_idx = [int(x) for x in (next_blk / factor).round()] + if x_idx < bounds[0] and x_idx > -1 and y_idx < bounds[1] and y_idx > -1: + self.this_pass_through_blks.append(space_map.map[x_idx, y_idx]) + space_map.map[x_idx, y_idx].related_rays.append(self) + self.this_pass_through_loss = np.sum([ + each.loss_penetration + for each in self.this_pass_through_blks + ]) + + def setTotalLoss(self, prior_loss): + ''' + excluding the end penetration/reflection loss + ''' + if self.this_pass_through_loss is None: + print("need to run `computeLinePassThroughLoss` first") + return + self.loss_total = prior_loss + self.this_pass_through_loss + + def setInitPower(self, power, gamma=2.0): + self.begininng_pwr = power + self.this_gamma = gamma + + def computeResultingPwr(self): + if self.loss_total is None: + print("need to run `setTotalLoss` first") + return + if self.begininng_pwr is None: + print("need to run `setInitPower` first") + return + if self.distance_traveled is None: + print("need to run `setTravelDistance` first") + return + self.this_ending_pwr = log_gamma_dist( + np.array([self.distance_traveled]), + self.begininng_pwr, + self.this_gamma, + loss = self.loss_total, + gaussian_noise = False, + is_squared = False + )[0] + return self.this_ending_pwr + + +class SpaceMap(): + ''' + TODO: extend to 3D + ''' + def __init__( + self, + width: float = 6.4, + length: float = 6.4, + block_size: float = 0.1 + ): + self.width = width + self.length = length + self.bs = block_size + self.map = np.empty( + ( + int(self.width / self.bs), + int(self.length / self.bs) + ), dtype=SpaceBlock + ) + + # initialize the map + self.__loss_p = np.zeros(self.map.shape) + self.__loss_r = np.zeros(self.map.shape) + for j in range(self.map.shape[1]): + y = self.bs * (j + 0.5) + for i in range(self.map.shape[0]): + self.map[i, j] = SpaceBlock(self.bs * (i + 0.5), y) + + def getLosses(self): + return np.array([self.__loss_p, self.__loss_r]) + + def getLoss(self, i, j): + return np.array([self.__loss_p[i, j], self.__loss_r[i, j]]) + + def setLosses(self, penetrations, reflections): + for j in range(self.map.shape[1]): + for i in range(self.map.shape[0]): + self.setLoss(i, j, penetrations[i ,j], reflections[i, j]) + + def setLoss(self, i, j, penetration, reflection): + self.map[i, j].setLoss(penetration, reflection) + self.__loss_p[i, j] = penetration + self.__loss_r[i, j] = reflection diff --git a/tests/spacemap.py b/tests/spacemap.py new file mode 100644 index 0000000..f21b6a4 --- /dev/null +++ b/tests/spacemap.py @@ -0,0 +1,32 @@ +#!/usr/bin/python + +import sys +sys.path.append(".") +sys.path.append("..") +sys.path.append("../..") # Adds higher directory to python modules path + +import os +import time +import numpy as np +from libs.spacemap import SpaceBlock +from libs.spacemap import SpaceRay +from libs.spacemap import SpaceMap + + +def test(): + spacemap = SpaceMap(width=6.4, length=6.4, block_size=0.1) + print(spacemap.getLosses()) + print(spacemap.getLoss(np.array([1, 2, 3]), 1)) + ray = SpaceRay(SpaceBlock(0.0, 0.0), SpaceBlock(2, 6.38)) + ray.computeLinePassThroughLoss(spacemap) + ray.setTotalLoss(0.0) + ray.setInitPower(0, gamma=2.0) + ray.setTravelDistance(0.0) + pwr = ray.computeResultingPwr() + print("pwr = {}".format(pwr)) + print(ray.this_pass_through_blks[0].related_rays) + print(ray.this_pass_through_blks[0]) + + +if __name__ == "__main__": + test()