#!/usr/bin/python

from collections import deque

import numpy as np
from libs.consts import MAX_RANGE
from libs.consts import NOISE_FLOOR
from libs.consts import FLOAT_TOLERANCE
from libs.models import log_gamma_dist
from libs.plotting import plotSpace


class SpaceBlock():
    '''
    A point (or grid cell) in 2D/3D space that also behaves like a small
    vector type.

    Besides its coordinates, a block carries the propagation properties used
    by the ray tracer: a penetration loss, a reflection loss, an optional
    surface orientation and a "has transmitter" flag.  A block is 2D when
    `z` is NaN (the default when no `z` is given).
    '''

    def __init__(self, x, y, z=None, material='default'):
        '''
        Args:
            x, y: coordinates, converted to float.
            z: optional third coordinate; stored as NaN when omitted.
            material: free-form material tag stored in `self.mat`.
        '''
        self.x = float(x)
        self.y = float(y)
        self.z = float('nan') if z is None else float(z)
        self.mat = material
        self.has_transmitter = False
        self.loss_penetration = 0.0
        # -100.0 is the "no reflection" default; `derivePassAndReflectBlocks`
        # only treats blocks with a value above -100 as reflective
        self.loss_reflection = -100.0
        self.orientation = None
        # backup linked list
        self.left = None    # <-
        self.right = None   # ->
        self.up = None      # ^
        self.down = None    # v
        self.top = None     # . (3d on top)
        self.bottom = None  # o (3d underneath)

    def __mul__(self, val):
        ''' Scale every coordinate by the scalar `val`. '''
        return SpaceBlock(self.x * val, self.y * val, self.z * val)

    def __div__(self, val):
        ''' Python 2 division hook; delegates to `__truediv__`. '''
        return self.__truediv__(val)

    def __truediv__(self, val):
        ''' Divide every coordinate by the scalar `val`. '''
        return SpaceBlock(self.x / val, self.y / val, self.z / val)

    def __floordiv__(self, val):
        ''' Floor-divide every coordinate by the scalar `val`. '''
        return SpaceBlock(self.x // val, self.y // val, self.z // val)

    def __add__(self, blk):
        ''' Add another block component-wise, or add a scalar to every axis. '''
        if isinstance(blk, SpaceBlock):
            return SpaceBlock(self.x + blk.x, self.y + blk.y, self.z + blk.z)
        return SpaceBlock(self.x + blk, self.y + blk, self.z + blk)

    def __sub__(self, blk):
        ''' Subtract another block component-wise, or a scalar from every axis. '''
        if isinstance(blk, SpaceBlock):
            return SpaceBlock(self.x - blk.x, self.y - blk.y, self.z - blk.z)
        return SpaceBlock(self.x - blk, self.y - blk, self.z - blk)

    def __abs__(self):
        '''
        Return the dot product of the block with itself, i.e. the SQUARED
        norm (not the norm).  `distanceSquared` relies on this.
        '''
        return self.dot(self)

    def __eq__(self, blk):
        ''' Two blocks are equal when they are closer than FLOAT_TOLERANCE. '''
        return self.distance(blk) < FLOAT_TOLERANCE

    def __contains__(self, lst):
        '''
        Return True when any element of `lst` equals this block.

        NOTE: Python calls this hook for `lst in block`, so the operands are
        reversed with respect to the usual container semantics.
        '''
        for each in lst:
            if self == each:
                return True
        return False

    def includes(self, x, y, z=None, block_size=0.1):
        '''
        Tell whether the point (x, y[, z]) lies in the half-open box that
        starts at this block's coordinates and extends `block_size` along
        every axis.  The z axis is only checked when `z` is given.
        '''
        flag = x >= self.x and x < (self.x + block_size)
        flag = flag and y >= self.y and y < (self.y + block_size)
        if z is None:
            return flag
        return flag and z >= self.z and z < (self.z + block_size)

    def dot(self, blk):
        '''
        dot product

        The z term is skipped when either operand is 2D (z is NaN).
        '''
        if np.isnan(self.z) or np.isnan(blk.z):
            return (self.x * blk.x) + (self.y * blk.y)
        return (self.x * blk.x) + (self.y * blk.y) + (self.z * blk.z)

    def round(self, dg=0):
        ''' Return a new block with every coordinate rounded to `dg` digits. '''
        return SpaceBlock(round(self.x, dg), round(self.y, dg), round(self.z, dg))

    def distanceSquared(self, blk):
        ''' Squared Euclidean distance to `blk`. '''
        return (self - blk).__abs__()

    def distance(self, blk):
        ''' Euclidean distance to `blk`. '''
        return np.sqrt(self.distanceSquared(blk))

    def __iter__(self):
        ''' Start iterating over the coordinates (x, y and, if 3D, z). '''
        self.__i = 0
        return self

    def __next__(self):
        ''' Yield x, then y, then z (only when z is not NaN). '''
        self.__i += 1
        if self.__i == 1:
            return self.x
        elif self.__i == 2:
            return self.y
        elif self.__i == 3 and not np.isnan(self.z):
            return self.z
        raise StopIteration

    def __str__(self):
        return "SpaceBlock(x = {:.3f}, y = {:.3f}, z = {:.3f})".format(self.x, self.y, self.z)

    def setTransmitter(self, flag):
        ''' Mark (or unmark) this block as holding a transmitter. '''
        self.has_transmitter = flag

    def hasTransmitter(self):
        ''' Return the transmitter flag. '''
        return self.has_transmitter

    def setLoss(self, penetration=0.0, reflection=-100.0):
        ''' Store the penetration and reflection losses of this block. '''
        self.loss_penetration = penetration
        self.loss_reflection = reflection

    def setOrientation(self, orientation=0.0):
        ''' Store the surface orientation (used as an angle by `traceRays`). '''
        self.orientation = orientation

    def getOrientation(self):
        ''' Return the stored orientation, or None when unknown. '''
        return self.orientation

    def getLoss(self):
        ''' Return `[penetration_loss, reflection_loss]`. '''
        return [self.loss_penetration, self.loss_reflection]


class SpaceRay():
    '''
    A straight 2D segment from `start` to `end` with lazily cached geometry
    (angle, slope, length) and propagation bookkeeping (initial power,
    accumulated loss and distance).  Rays are chained backwards through
    `prev_ray`; iterating a ray walks that chain starting from the ray itself.

    TODO: extend to 3D
    '''

    def __init__(self, point1, point2):
        '''
        Args:
            point1: starting SpaceBlock.
            point2: either the ending SpaceBlock, or a real number taken as a
                direction in radians; in that case the ray extends MAX_RANGE
                from `point1` in that direction.
        '''
        self.start = point1
        # accept any real scalar as a direction (the original only accepted
        # `float`, so an int or a non-float64 NumPy scalar became the end point)
        if isinstance(point2, (int, float, np.integer, np.floating)):
            self.end = point1 + SpaceBlock(np.cos(point2), np.sin(point2)) * MAX_RANGE
        else:
            self.end = point2
        # property (lazily computed caches)
        self.distance = None
        self.angle_theta = None
        self.angle_theta_deg = None
        self.angle_theta_sin = None
        self.angle_theta_cos = None
        self.angle_theta_tan = self.slope = None
        # power and propagation
        self.init_pwr = None
        self.init_phase = None
        self.starting_loss = None
        self.starting_distance = None
        self.ending_pwr = None
        self.ending_phase = None
        self.ending_loss = None
        self.ending_distance = None
        self.passthrough_loss = None
        self.passthrough_blks = None
        self.reflection_blks = None
        # exponent handed to `log_gamma_dist`
        self.gamma = 2.0
        # distance traveled prior to this distance
        self.distance_traveled = None
        self.reflection_count = 0
        # id
        self.ray_id = ''
        self.prev_ray = None

    def __eq__(self, spaceray):
        '''
        Rays compare equal when their `start - end` vectors are equal (within
        FLOAT_TOLERANCE); the absolute position is not compared.
        '''
        return (self.start - self.end) == (spaceray.start - spaceray.end)

    def __contains__(self, lst):
        '''
        Return True when any element of `lst` equals this ray.

        NOTE: Python calls this hook for `lst in ray` (operands reversed).
        '''
        for each in lst:
            if self == each:
                return True
        return False

    def __str__(self):
        return (
            "SpaceRay(start = {0}, end = {1}): ".format(self.start, self.end) +
            "pwr_init = {0}, pwr_end = {1}, ".format(self.init_pwr, self.ending_pwr) +
            "loss_start = {0}, loss_end = {1}\n".format(self.starting_loss, self.ending_loss) +
            "  <- prev_ray = {}".format(self.prev_ray)
        )

    def __iter__(self):
        ''' Start walking the `prev_ray` chain, beginning with this ray. '''
        self.__prev_ray = self
        return self

    def __next__(self):
        ''' Return the current ray of the chain and step to its predecessor. '''
        if self.__prev_ray is None:
            raise StopIteration
        tmp = self.__prev_ray
        self.__prev_ray = self.__prev_ray.prev_ray
        return tmp

    def getAngle(self, degree=False):
        '''
        Return the direction of the ray (arctan2 of the end-start delta),
        in radians, or in degrees when `degree` is true.  Cached.
        '''
        if self.angle_theta is None:
            self.angle_theta = np.arctan2(
                self.end.y - self.start.y,
                self.end.x - self.start.x
            )
        if degree:
            if self.angle_theta_deg is None:
                self.angle_theta_deg = self.angle_theta * 180.0 / np.pi
            return self.angle_theta_deg
        return self.angle_theta

    def getSlope(self):
        ''' Return tan(angle); cached in `slope` and `angle_theta_tan`. '''
        if self.slope is None:
            self.slope = np.tan(self.getAngle())
            self.angle_theta_tan = self.slope
        return self.slope

    def getAngleThetaTan(self):
        ''' Alias of `getSlope`. '''
        return self.getSlope()

    def getAngleThetaSin(self):
        ''' Return sin(angle); cached. '''
        if self.angle_theta_sin is None:
            self.angle_theta_sin = np.sin(self.getAngle())
        return self.angle_theta_sin

    def getAngleThetaCos(self):
        ''' Return cos(angle); cached. '''
        if self.angle_theta_cos is None:
            self.angle_theta_cos = np.cos(self.getAngle())
        return self.angle_theta_cos

    def getDistance(self):
        ''' Return the length of this segment (start to end); cached. '''
        if self.distance is None:
            self.distance = self.start.distance(self.end)
        return self.distance

    def setStartingDistance(self, val):
        ''' Set the distance already traveled before this segment starts. '''
        self.starting_distance = val

    def getTraveledDistance(self):
        '''
        Return the starting distance plus the length of this segment; cached
        in `ending_distance`.  Prints a hint and returns None when the
        starting distance has not been set.
        '''
        if self.starting_distance is None:
            # the original message pointed at `setStartingLoss`, which does
            # not set the value that is missing here
            print("need to run `setStartingDistance` first")
            return None
        if self.ending_distance is None:
            self.ending_distance = self.starting_distance + self.getDistance()
        return self.ending_distance

    def derivePassAndReflectBlocks(self, space_map):
        '''
        Walk along the ray in steps of one block size and collect the map
        blocks it crosses into `passthrough_blks` (penetration loss below
        0.1) and `reflection_blks` (reflection loss above -100).

        Sampling starts at the 3rd step and stops before the end of the
        segment; once the ray has been inside the map and steps outside of
        it, the walk stops.

        Args:
            space_map: object exposing `bs` (block size) and `map` (2D array
                of SpaceBlock), e.g. a SpaceMap.
        '''
        self.passthrough_blks = []
        self.reflection_blks = []
        step_blk = SpaceBlock(self.getAngleThetaCos(), self.getAngleThetaSin()) * space_map.bs
        n_rows, n_cols = space_map.map.shape[0], space_map.map.shape[1]
        early_stop_flag = False
        for i in range(2, int(self.getDistance() / space_map.bs) - 1):
            next_blk = self.start + step_blk * i
            # assume 2D
            x_idx, y_idx = [int(x) for x in (next_blk / space_map.bs).round()]
            if -1 < x_idx < n_rows and -1 < y_idx < n_cols:
                early_stop_flag = True
                blk = space_map.map[x_idx, y_idx]
                if blk.loss_penetration < 0.1:
                    self.passthrough_blks.append(blk)
                if blk.loss_reflection > -100:
                    self.reflection_blks.append(blk)
                continue
            # outside of the map: stop once the ray has already been inside
            if early_stop_flag:
                break

    def getPassThroughLoss(self):
        '''
        Return the sum of the penetration losses of `passthrough_blks`;
        cached.  Prints a hint and returns None when the blocks have not
        been derived yet.
        '''
        if self.passthrough_loss is None:
            if self.passthrough_blks is None:
                print("need to run `derivePassAndReflectBlocks` first")
                return None
            self.passthrough_loss = np.sum([
                each.loss_penetration for each in self.passthrough_blks
            ])
        return self.passthrough_loss

    def getEndingLoss(self):
        '''
        excluding the end penetration/reflection loss

        Return starting loss + pass-through loss, or None (after printing a
        hint) when a prerequisite is missing.
        '''
        if self.starting_loss is None:
            print("need to run `setStartingLoss` first")
            return None
        if self.passthrough_loss is None and self.getPassThroughLoss() is None:
            return None
        self.ending_loss = self.starting_loss + self.passthrough_loss
        return self.ending_loss

    def setStartingLoss(self, val):
        ''' Set the loss accumulated before this segment starts. '''
        self.starting_loss = val

    def setInitPower(self, amplitude, phase=0.0):
        ''' Set the initial power and phase of the signal. '''
        self.init_pwr = amplitude
        self.init_phase = phase

    def getEndingPower(self):
        '''
        Return the power at the end of this segment as computed by
        `log_gamma_dist` from the traveled distance, the initial power,
        `gamma` and the ending loss.  Returns None when a prerequisite
        (loss, initial power or starting distance) is missing.
        '''
        if self.ending_loss is None and self.getEndingLoss() is None:
            return None
        if self.init_pwr is None:
            print("need to run `setInitPower` first")
            return None
        # `ending_distance` is only filled by `getTraveledDistance`; derive it
        # here so that `log_gamma_dist` never receives None as a distance
        if self.ending_distance is None and self.getTraveledDistance() is None:
            return None
        self.ending_pwr = log_gamma_dist(
            np.array([self.ending_distance]),
            self.init_pwr,
            self.gamma,
            loss=self.ending_loss,
            gaussian_noise=False
        )[0]
        return self.ending_pwr


def getPathFromRay(ray, stop_blk, spacemap):
    '''
    Build the sub-segment of `ray` that runs from its start to `stop_blk`.

    The path inherits the initial power/phase, gamma, starting distance,
    starting loss and `prev_ray` of `ray`, and has its pass-through and
    ending losses computed against `spacemap`.

    Returns:
        the new SpaceRay, or None when the segment has (near) zero length.
    '''
    path = SpaceRay(ray.start, stop_blk)
    # prevent 0 distance
    if path.getDistance() < FLOAT_TOLERANCE:
        return None
    # inherit ray properties
    path.derivePassAndReflectBlocks(spacemap)
    # the original passed `ray.gamma` as the *phase* argument, so the
    # exponent itself was never inherited
    path.setInitPower(ray.init_pwr, ray.init_phase)
    path.gamma = ray.gamma
    path.setStartingDistance(ray.starting_distance)
    path.setStartingLoss(ray.starting_loss)
    path.prev_ray = ray.prev_ray
    path.getPassThroughLoss()
    path.getEndingLoss()
    return path


def getRayFromPath(path, direction, t='penetrate'):
    '''
    Spawn the ray that continues from the end of `path` towards `direction`.

    Args:
        path: the SpaceRay that just ended on a block.
        direction: angle in radians (or an end SpaceBlock) of the new ray.
        t: 'penetrate' adds the penetration loss of the end block to the
            starting loss, 'reflect' adds its reflection loss, anything else
            adds nothing.

    Returns:
        the new SpaceRay chained to `path` through `prev_ray`, or None when
        the ending power of `path` or the starting loss of the new ray is at
        or below NOISE_FLOOR (or cannot be computed).
    '''
    ending_pwr = path.getEndingPower()
    if ending_pwr is None or ending_pwr <= NOISE_FLOOR:
        return None
    ray = SpaceRay(path.end, direction)
    # propagate phase and gamma separately (gamma used to land in the phase)
    ray.setInitPower(path.init_pwr, path.init_phase)
    ray.gamma = path.gamma
    ray.setStartingDistance(path.getTraveledDistance())
    if t == 'penetrate':
        ray.setStartingLoss(path.getEndingLoss() + path.end.getLoss()[0])
    elif t == 'reflect':
        ray.setStartingLoss(path.getEndingLoss() + path.end.getLoss()[1])
    else:
        ray.setStartingLoss(path.getEndingLoss())
    if ray.starting_loss is None or ray.starting_loss <= NOISE_FLOOR:
        return None
    ray.prev_ray = path
    return ray


class SpaceMap():
    '''
    A 2D grid of SpaceBlock cells plus a simple ray tracer.

    `traceRays` shoots rays from a transmitter and records the resulting
    end paths; `traceRay` then estimates the power received at a location
    from those paths.

    TODO: extend to 3D
    '''

    def __init__(
        self,
        width: float = 6.4,
        length: float = 6.4,
        block_size: float = 0.1
    ):
        '''
        Args:
            width: extent of the map along the first (x) axis.
            length: extent of the map along the second (y) axis.
            block_size: edge length of one grid cell.
        '''
        if MAX_RANGE < width or MAX_RANGE < length:
            print("WARNNING! MAX_RANGE {} is less than input".format(MAX_RANGE))
        self.width = width
        self.length = length
        self.bs = block_size
        self.map = np.empty(
            (
                int(self.width / self.bs),
                int(self.length / self.bs)
            ),
            dtype=SpaceBlock
        )
        # initialize the map
        self.__loss_p = np.zeros(self.map.shape)
        self.__loss_r = np.ones(self.map.shape) * -100
        self.__tx_locs = np.zeros(self.map.shape)
        n_rows, n_cols = self.map.shape[0], self.map.shape[1]
        # every cell is represented by the coordinates of its center
        for j in range(n_cols):
            y = self.bs * (j + 0.5)
            for i in range(n_rows):
                self.map[i, j] = SpaceBlock(self.bs * (i + 0.5), y)
        # link every cell to its four neighbours (None on the borders)
        for j in range(n_cols):
            for i in range(n_rows):
                blk = self.map[i, j]
                blk.up = self.map[i, j + 1] if j < n_cols - 1 else None
                blk.down = self.map[i, j - 1] if j > 0 else None
                # the original tested `i < 0`, which is never true, so `left`
                # was never linked
                blk.left = self.map[i - 1, j] if i > 0 else None
                blk.right = self.map[i + 1, j] if i < n_rows - 1 else None
        # propagation
        self.endpaths = []
        self.tx_loc = None
        self.env_gamma = 2.0
        self.ray_trace_deg_step = 1.0
        # according to VTC paper
        # "A RAY TRACING METHOD FOR PREDICTING PATH LOSS AND DELAY SPREAD
        #  IN MICROCELLULAR ENVIRONMENTS"
        self.ray_trace_deg_tol = self.ray_trace_deg_step / 180.0 * np.pi / np.sqrt(3)

    def getLosses(self):
        ''' Return an array stacking the penetration and reflection loss maps. '''
        return np.array([self.__loss_p, self.__loss_r])

    def getLoss(self, i, j):
        ''' Return `[penetration, reflection]` loss of cell (i, j) as an array. '''
        return np.array([self.__loss_p[i, j], self.__loss_r[i, j]])

    def setLosses(self, penetrations, reflections):
        ''' Set the losses of every cell from two arrays indexed by [i, j]. '''
        for j in range(self.map.shape[1]):
            for i in range(self.map.shape[0]):
                self.setLoss(i, j, penetrations[i, j], reflections[i, j])

    def setOrientations(self, orientations):
        ''' Set the orientation of every cell from an array indexed by [i, j]. '''
        for j in range(self.map.shape[1]):
            for i in range(self.map.shape[0]):
                self.setOrientation(i, j, orientations[i, j])

    def setOrientation(self, i, j, orientation):
        '''
        Set the orientation of cell (i, j); NaN (or None) means "unknown"
        and is stored as None.
        '''
        # `np.isnan(None)` raises TypeError, so test for None first
        if orientation is not None and np.isnan(orientation):
            orientation = None
        self.map[i, j].setOrientation(orientation)

    def setLoss(self, i, j, penetration, reflection):
        ''' Set the losses of cell (i, j) on the block and in the loss maps. '''
        self.map[i, j].setLoss(penetration, reflection)
        self.__loss_p[i, j] = penetration
        self.__loss_r[i, j] = reflection

    def setHasTransmitter(self, i, j, flag=True):
        '''
        Mark (or, with `flag=False`, unmark) cell (i, j) as holding a
        transmitter.  `flag` was referenced but never defined by the original
        signature, which raised NameError on every call.
        '''
        self.map[i, j].setTransmitter(flag)
        self.__tx_locs[i, j] = 1 if flag else 0

    def getTransmitterLocs(self):
        ''' Return the 0/1 array of transmitter locations. '''
        return self.__tx_locs

    def setEnvGamma(self, val: float):
        ''' Set the gamma given to the rays created by `traceRays`. '''
        self.env_gamma = val

    def traceRays(self, tx_power: float, tx_loc: SpaceBlock):
        '''
        Shoot rays from `tx_loc` in every direction (one per
        `ray_trace_deg_step` degrees) and follow them breadth-first through
        penetrations and reflections; the terminal rays/paths are stored in
        `self.endpaths`.

        Nothing is recomputed when `tx_loc` equals the previously traced
        location (note that `tx_power` is not part of that check).

        Args:
            tx_power: initial power given to every ray.
            tx_loc: location of the transmitter.
        '''
        if self.tx_loc is not None and tx_loc == self.tx_loc:
            return
        self.endpaths = []
        self.tx_loc = tx_loc
        # deque gives O(1) pops from the left for the BFS below
        rays = deque()
        for direction in np.arange(0, 359.99, self.ray_trace_deg_step):
            ray = SpaceRay(tx_loc, direction / 180.0 * np.pi)
            # the original passed `env_gamma` as the *phase* argument, so
            # `setEnvGamma` never influenced the propagation
            ray.setInitPower(tx_power)
            ray.gamma = self.env_gamma
            ray.setStartingLoss(0.0)
            ray.setStartingDistance(0.0)
            rays.append(ray)
        # BFS
        while rays:
            ray = rays.popleft()
            ray.derivePassAndReflectBlocks(self)
            # if no reflections exist, then only passing through, simple
            if not ray.reflection_blks:
                # if passing through the receiver block, we only have a LoS here
                # safe to save this last ray (instead of path)
                self.endpaths.append(ray)
                continue
            ref_blk = ray.reflection_blks[0]
            path = getPathFromRay(ray, ref_blk, self)
            # zero length
            if path is None:
                continue
            # too weak
            if path.getEndingPower() < NOISE_FLOOR:
                self.endpaths.append(path)
                continue
            # penetrated ray
            ray_p = getRayFromPath(path, ray.getAngle(), t='penetrate')
            if ray_p is not None:
                rays.append(ray_p)
            # reflected ray
            if ref_blk.getOrientation() is None:
                # if no orientation info provided, cannot calculate reflected ray
                continue
            ray_r = getRayFromPath(
                path,
                2.0 * ref_blk.getOrientation() - path.getAngle(),
                t='reflect'
            )
            if ray_r is not None:
                rays.append(ray_r)
        print("size of all found rays: {}".format(len(self.endpaths)))

    def traceRay(self, rx_loc: SpaceBlock):
        '''
        Estimate the power received at `rx_loc` from the paths recorded by
        `traceRays`.

        For every end path, each ray of its `prev_ray` chain is tested: the
        segment from the ray's start to `rx_loc` is accepted when its
        direction matches the ray's direction within a distance-scaled
        tolerance and its ending power is not below NOISE_FLOOR.  The
        accepted segment found last in the chain is kept.

        Returns:
            `(power, paths)`: the aggregated power (NOISE_FLOOR when no path
            reaches the location) and the list of contributing paths.
        '''
        def aggreatePower(paths):
            ''' Sum the powers in the linear domain; result is 10*log10(sum). '''
            if not paths:
                return NOISE_FLOOR
            sig_pwr = 0.0
            for path in paths:
                sig_pwr += np.power(10, path.getEndingPower() / 10.0)
            return 10.0 * np.log10(sig_pwr)

        def removeRedundantPaths(paths):
            ''' Drop, in place, paths whose starts are too close to each other. '''
            paths_len = len(paths)
            # index -> [number of close neighbours, list of their indices]
            duplicated_ones = {}
            # calculate pair-wise distance
            for i in range(paths_len):
                for j in range(i + 1, paths_len):
                    # NOTE(review): abs() of a SpaceBlock is the SQUARED norm,
                    # so a squared distance is compared with 2 * bs here --
                    # confirm that this is intended
                    if abs(paths[i].start - paths[j].start) < 2.0 * self.bs:
                        if i not in duplicated_ones:
                            duplicated_ones[i] = [0, []]
                        duplicated_ones[i][0] += 1
                        duplicated_ones[i][1].append(j)
                        if j not in duplicated_ones:
                            duplicated_ones[j] = [0, []]
                        duplicated_ones[j][0] += 1
                        duplicated_ones[j][1].append(i)
            # removal: repeatedly keep the entry with the most neighbours and
            # schedule all of its neighbours for removal
            to_be_removed = []
            while duplicated_ones:
                # first key with the highest count (same pick as a stable
                # descending sort followed by [0])
                idx = max(duplicated_ones, key=lambda x: duplicated_ones[x][0])
                for target_idx in duplicated_ones[idx][1]:
                    if target_idx not in to_be_removed:
                        to_be_removed.append(target_idx)
                    duplicated_ones[idx][0] -= 1
                    duplicated_ones[target_idx][0] -= 1
                    if duplicated_ones[target_idx][0] == 0:
                        del duplicated_ones[target_idx]
                        continue
                    del duplicated_ones[target_idx][1][duplicated_ones[target_idx][1].index(idx)]
                del duplicated_ones[idx]
            # delete from the back so that the remaining indices stay valid
            for each in sorted(to_be_removed, reverse=True):
                del paths[each]

        if not self.endpaths:
            return NOISE_FLOOR, []
        rx_loc_paths = []
        for path in self.endpaths:
            current_p = path
            target_p_found = None
            while current_p is not None:
                target_p = getPathFromRay(current_p, rx_loc, self)
                if target_p is None:
                    break
                angle_diff = abs(
                    target_p.getAngle() % (2 * np.pi) - current_p.getAngle() % (2 * np.pi)
                )
                aligned = angle_diff < self.ray_trace_deg_tol * max(target_p.getTraveledDistance(), 2)
                # the original used `continue` for an aligned but too weak
                # path, which skipped the step to `prev_ray` below and spun
                # forever on the same ray
                if aligned and not target_p.getEndingPower() < NOISE_FLOOR:
                    target_p_found = target_p
                current_p = current_p.prev_ray
            if target_p_found is not None and target_p_found not in rx_loc_paths:
                rx_loc_paths.append(target_p_found)
        # clean up for too nearby paths
        removeRedundantPaths(rx_loc_paths)
        return aggreatePower(rx_loc_paths), rx_loc_paths