From d61c1995367ec113c3b7c50d571affc26aebfda4 Mon Sep 17 00:00:00 2001 From: "oceanpdoshi@g.ucla.edu" <oceanpdoshi@.ucla.edu> Date: Tue, 16 Apr 2019 17:30:18 -0700 Subject: [PATCH] Added the simulated_dataset_manager.py 1st iteration, next step is to add dynamic generation. --- .../simulated_dataset_manager.py | 301 ++++++++++++++++++ 1 file changed, 301 insertions(+) create mode 100644 CoLo-AT/dataset_manager/simulated_dataset_manager.py diff --git a/CoLo-AT/dataset_manager/simulated_dataset_manager.py b/CoLo-AT/dataset_manager/simulated_dataset_manager.py new file mode 100644 index 0000000..b8ddf90 --- /dev/null +++ b/CoLo-AT/dataset_manager/simulated_dataset_manager.py @@ -0,0 +1,301 @@ +from math import pi, sqrt +import numpy as np + +import IPython + +# If need to import modules from other folders +''' +import sys +import os +import inspect +currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) +parentdir = os.path.dirname(currentdir) +sys.path.insert(0,parentdir) +from requests.request_response import Request_response +''' + +# TODO - incorporate communcation (LQI, RSS) as a threshold/cutoff for communication +class SimulatedDataSetManager: + + def __init__(self, dataset_name, boundary, landmarks, duration, robot_labels, + odometry_noise, communication_noise, measurement_noise, robot_vision = pi/3): + + self.name = dataset_name + + # Used for request scheduling + self.next_request = 'odometry' + + self.duration = duration + self.robot_labels = robot_labels + self.num_robots = len(robot_labels) + + # angular width of robots's vision with respect to the x_axis (radians) + self.robot_vision = robot_vision + + # User-specified noises + # TODO - may need to add more choice in the future (Ex: specify angular velocity noise) + self.odometry_noise = odometry_noise + self.communication_noise = communication_noise + self.measurement_noise = measurement_noise + self.boundary = boundary + + # All robots begin moving at t = 0s + self.start_time = 0 + self.end_time = self.start_time + self.duration + self.start_moving_times = [0 for label in robot_labels] + + # landmarks = {landmark ID: [x,y]} + self.landmark_map = landmarks + + # starting_states = {robot label: [time, x_pos, y_pos, orientation], ... } + # TODO - Update this to be user-specified on how to initialize robot locations. + self.starting_states = {label : [0, i, i, 0] for i,label in enumerate(robot_labels)} + + # in the simulation all the times for all the measurements are the same. + self.time_arr = {'odometry': [[] for i in range(self.num_robots)], 'measurement': [[] for i in range(self.num_robots)], + 'groundtruth': [[] for i in range(self.num_robots)]} + + for data_type in self.time_arr: + for i in range(self.num_robots): + self.time_arr[data_type][i] = np.arange(duration) + + + # Measurement Data Format + # subject ID = Landmark ID + # measurement_range = distance between robot and landmark + # bearing = angle betweeen robot and landmark, defined with respect to ROBOT'S FRAME OF REFERENCE + ''' + { + 'time': 1536012482.625, + 'subject_ID': 120, + 'measurment_range': 2.8757099026371695, + 'bearing': 0.29890824572449504 + } + ''' + self.measurement_data = [[] for i in range(self.num_robots)] + + # Odometry Data Format + ''' + { + 'time': 1536012462.22, + 'velocity': 0.0, + 'angular velocity': -0.2, + 'orientation': -0.4324032103332002, + 'delta_t': 0.09999990463256836 + } + ''' + self.odometry_data = [[] for i in range(self.num_robots)] + + # ground_truth formatting + # ORIENTATION DEFINED WITH RESPECT TO GLOBAL FRAME OF REFERENCE + ''' + { + 'time': 1536012433.141, + 'x_pos': -0.690963, + 'y_pos': -1.685781, + 'orientation': 0.03251611228318508 + } + ''' + self.groundtruth_data = [ [] for i in range(self.num_robots)] + + + # TODO - convert to dynamic generation, just generate ground truth data + def simulate_dataset(self): + + # Generate ground truth data + # NOTE - FOR NOW THIS WILL JUST MOVE ROBOTS IN A STRAIGHT LINE UNTIL THEY REACH THE BOUNDARY - THEN THEY WILL BE STATIONARY + for i, label in enumerate(self.robot_labels, 0): + # append starting state + self.groundtruth_data[i].append({'time' : self.starting_states[label][0], 'x_pos': self.starting_states[label][1], 'y_pos': self.starting_states[label][2], 'orientation' : self.starting_states[label][3]}) + for time in range(1, len(self.time_arr['groundtruth'][i])): + curr_x = self.groundtruth_data[i][time-1]['x_pos'] + curr_y = self.groundtruth_data[i][time-1]['y_pos'] + curr_orientation = self.groundtruth_data[i][time-1]['orientation'] + + next_x = curr_x + next_y = curr_y + next_orientation = curr_orientation + next_time = time + 1 + + if (self.within_map([curr_x + 1, curr_y + 1])): + next_x = curr_x + 1 + next_y = curr_y + + self.groundtruth_data[i].append({'time' : next_time, 'x_pos': next_x, 'y_pos': next_y, 'orientation': next_orientation}) + + # NOTE - modify incorporation of noise + # Generate Odometery Data + for i, label in enumerate(self.robot_labels, 0): + # Initial robot state + self.odometry_data[i].append({'time': 0, 'velocity' : 0, 'angular velocity' : 0, 'orientation' : self.groundtruth_data[i][0]['orientation'], + 'delta_t' : 0}) + for time in range(1, len(self.time_arr['odometry'][i])): + # NOTE - delta_t is currently hard coded to 1 as we only have ground truth data on each individual tick, this can be changed later on + delta_t = 1 + loc1 = [self.groundtruth_data[i][time-1]['x_pos'], self.groundtruth_data[i][time-1]['y_pos']] + loc2 = [self.groundtruth_data[i][time]['x_pos'], self.groundtruth_data[i][time]['y_pos']] + velocity = self.calc_distance(loc1, loc2)/delta_t + np.random.normal(loc=0.0,scale=self.odometry_noise) + + theta_1 = self.groundtruth_data[i][time-1]['orientation'] + theta_2 = self.groundtruth_data[i][time]['orientation'] + angular_velocity = (theta_2-theta_1)/delta_t + np.random.normal(loc=0.0,scale=0.1) + + self.odometry_data[i].append({'time': time, 'velocity' : velocity, 'angular velocity' : angular_velocity, + 'orientation' : self.groundtruth_data[i][time]['orientation'], 'delta_t' : delta_t}) + + # Generate Measurment Data + for i, label in enumerate(self.robot_labels, 0): + for time in range(0, len(self.time_arr['measurement'][i])): + for landmarkID, landmark_loc in self.landmark_map.items(): + robot_loc_x = self.groundtruth_data[i][time]['x_pos'] + robot_loc_y = self.groundtruth_data[i][time]['y_pos'] + robot_loc = [robot_loc_x, robot_loc_y] + if (self.within_vision(2*pi, robot_loc, landmark_loc)): + + measurment_range = self.calc_distance(robot_loc, landmark_loc) + np.random.normal(loc=0.0,scale=self.measurement_noise) + # NOTE - arctan returns between [-pi/2, pi/2] + bearing = 0 + # Same point + if (landmark_loc[1] == robot_loc[1] and landmark_loc[0] == robot_loc[0]): + # TODO - robots are NOT supposed to collide with landmarks + bearing = 0 + # either + or - pi/2 (delta_x = 0) + elif (landmark_loc[0] == robot_loc[0]): + if (landmark_loc[1] < robot_loc[1]): + bearing = -1*pi/2 + else: + bearing = pi/2 + else: + bearing = np.arctan((landmark_loc[1]-robot_loc[1])/(landmark_loc[0]-robot_loc[0])) + np.random.normal(loc=0.0,scale=0.1) + # We only want bearings between [0, 2pi] + if (bearing < 0): + bearing += 2*pi + + self.measurement_data[i].append({'time':time, 'subject_ID': landmarkID, 'measurment_range':measurment_range, 'bearing':bearing}) + + self.dataset_data = {'odometry': self.odometry_data, 'measurement': self.measurement_data, 'groundtruth': self.groundtruth_data} + return self.start_time, self.starting_states, self.dataset_data, self.time_arr + + def get_dataset_name(self): + return self.name + + def get_start_time(self): + return self.start_time + + def get_duration(self): + return self.duration + + def get_starting_states(self): + return self.starting_states + + def get_start_moving_times(self): + return self.start_moving_times + + def get_landmark_map(self): + return self.landmark_map + + def get_time_arr(self, data_category): + return self.time_arr[data_category] + + def get_robot_groundtruth(self, gt_time, robot_index): + gt = self.groundtruth_data[robot_index][gt_time] + return gt + + + def respond(self, req, current_time, need_specific_time = False): + + valid_respond, message, req_type, robot_idx, time_idx = self.get_dataline(req, current_time) + + current_time += 1 + + if (valid_respond): + req.set_time(time_idx) + req.set_message(message) + req.set_robot_idx(robot_idx) + req.set_type(req_type) + + print(req.get_type()) + return valid_respond, current_time, req + + # Alternates odometry and measurement updates + # TODO - dynamic generation + def get_dataline(self, req, current_time): + message = req.get_message() + + if message['robot_index'] == None: + if req.get_type() == None: + # Alternate between measurement and odometry requests + # TODO - change method of selecting robot indices - Suggestion (use a counter maintained within simulation) + req_type = self.next_request + req.set_type(req_type) + if (self.next_request == 'odometry'): + robot_idx = np.argmin(self.get_time_arr('odometry')) + self.next_request = 'measurement' + else: + robot_idx = np.argmin(self.get_time_arr('measurement')) + self.next_request = 'odometry' + else: + req_type = req.get_type() + time_arr = self.get_time_arr(req_type) + robot_idx = np.argmin(time_arr) + + message['robot_index'] = robot_idx + + time_idx = current_time + if self.dataset_data[req_type][robot_idx][time_idx]['time'] > self.end_time or time_idx == -1: + valid_dataline = False + else: + valid_dataline= True + message['data'] = self.dataset_data[req_type][robot_idx][time_idx] + message['groundtruth'] = self.groundtruth_data[robot_idx][time_idx] + return valid_dataline, message, req_type, robot_idx, time_idx + + # Helper Functions + + def generate_groundtruth(): + return True + + def generate_odometry(): + return True + + def generate_measurement(): + return True + + # Check if a given loc [x,y] is within map + def within_map(self, loc): + center_point = [0,0] + if sqrt(pow(loc[0]-center_point[0], 2)+pow(loc[1]-center_point[1], 2)) > self.boundary: + return False + else: + return True + + # Compute the distance betwen loc1 = [x1,y1] and loc2 = [x2,y2] + def calc_distance(self, loc1, loc2): + + x1 = loc1[0] + y1 = loc1[1] + + x2 = loc2[0] + y2 = loc2[1] + + distance = sqrt((x1-x2)**2 + (y1-y2)**2) + return distance + + # robot_vision = angular width of robot's view in reference to x_axis + # robot_loc & landmark_loc = [x,y] + def within_vision(self, robot_vision, robot_loc, landmark_loc): + + # arctan() returns between [-pi/2, pi/2] => Correct for our F.O.V application with respect to the x-axis as realistically F.O.V will not exceed pi/3 + bearing = 0 + # Same point + if (landmark_loc[1] == robot_loc[1] and landmark_loc[0] == robot_loc[0]): + bearing = 0 + # either + or - pi/2 (delta_x = 0) + elif (landmark_loc[0] == robot_loc[0]): + if (landmark_loc[1] < robot_loc[1]): + bearing = -1*pi/2 + else: + bearing = pi/2 + else: + bearing = np.arctan((landmark_loc[1]-robot_loc[1])/(landmark_loc[0]-robot_loc[0])) + np.random.normal(loc=0.0,scale=0.1) + return abs(bearing) < robot_vision/2 -- GitLab