Skip to content
Snippets Groups Projects
Commit d61c1995 authored by oceanpdoshi@g.ucla.edu's avatar oceanpdoshi@g.ucla.edu
Browse files

Added the simulated_dataset_manager.py

1st iteration, next step is to add dynamic generation.
parent 0e27e6cd
Branches
No related merge requests found
from math import pi, sqrt
import numpy as np
import IPython
# If need to import modules from other folders
'''
import sys
import os
import inspect
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.insert(0,parentdir)
from requests.request_response import Request_response
'''
# TODO - incorporate communcation (LQI, RSS) as a threshold/cutoff for communication
class SimulatedDataSetManager:
def __init__(self, dataset_name, boundary, landmarks, duration, robot_labels,
odometry_noise, communication_noise, measurement_noise, robot_vision = pi/3):
self.name = dataset_name
# Used for request scheduling
self.next_request = 'odometry'
self.duration = duration
self.robot_labels = robot_labels
self.num_robots = len(robot_labels)
# angular width of robots's vision with respect to the x_axis (radians)
self.robot_vision = robot_vision
# User-specified noises
# TODO - may need to add more choice in the future (Ex: specify angular velocity noise)
self.odometry_noise = odometry_noise
self.communication_noise = communication_noise
self.measurement_noise = measurement_noise
self.boundary = boundary
# All robots begin moving at t = 0s
self.start_time = 0
self.end_time = self.start_time + self.duration
self.start_moving_times = [0 for label in robot_labels]
# landmarks = {landmark ID: [x,y]}
self.landmark_map = landmarks
# starting_states = {robot label: [time, x_pos, y_pos, orientation], ... }
# TODO - Update this to be user-specified on how to initialize robot locations.
self.starting_states = {label : [0, i, i, 0] for i,label in enumerate(robot_labels)}
# in the simulation all the times for all the measurements are the same.
self.time_arr = {'odometry': [[] for i in range(self.num_robots)], 'measurement': [[] for i in range(self.num_robots)],
'groundtruth': [[] for i in range(self.num_robots)]}
for data_type in self.time_arr:
for i in range(self.num_robots):
self.time_arr[data_type][i] = np.arange(duration)
# Measurement Data Format
# subject ID = Landmark ID
# measurement_range = distance between robot and landmark
# bearing = angle betweeen robot and landmark, defined with respect to ROBOT'S FRAME OF REFERENCE
'''
{
'time': 1536012482.625,
'subject_ID': 120,
'measurment_range': 2.8757099026371695,
'bearing': 0.29890824572449504
}
'''
self.measurement_data = [[] for i in range(self.num_robots)]
# Odometry Data Format
'''
{
'time': 1536012462.22,
'velocity': 0.0,
'angular velocity': -0.2,
'orientation': -0.4324032103332002,
'delta_t': 0.09999990463256836
}
'''
self.odometry_data = [[] for i in range(self.num_robots)]
# ground_truth formatting
# ORIENTATION DEFINED WITH RESPECT TO GLOBAL FRAME OF REFERENCE
'''
{
'time': 1536012433.141,
'x_pos': -0.690963,
'y_pos': -1.685781,
'orientation': 0.03251611228318508
}
'''
self.groundtruth_data = [ [] for i in range(self.num_robots)]
# TODO - convert to dynamic generation, just generate ground truth data
def simulate_dataset(self):
# Generate ground truth data
# NOTE - FOR NOW THIS WILL JUST MOVE ROBOTS IN A STRAIGHT LINE UNTIL THEY REACH THE BOUNDARY - THEN THEY WILL BE STATIONARY
for i, label in enumerate(self.robot_labels, 0):
# append starting state
self.groundtruth_data[i].append({'time' : self.starting_states[label][0], 'x_pos': self.starting_states[label][1], 'y_pos': self.starting_states[label][2], 'orientation' : self.starting_states[label][3]})
for time in range(1, len(self.time_arr['groundtruth'][i])):
curr_x = self.groundtruth_data[i][time-1]['x_pos']
curr_y = self.groundtruth_data[i][time-1]['y_pos']
curr_orientation = self.groundtruth_data[i][time-1]['orientation']
next_x = curr_x
next_y = curr_y
next_orientation = curr_orientation
next_time = time + 1
if (self.within_map([curr_x + 1, curr_y + 1])):
next_x = curr_x + 1
next_y = curr_y
self.groundtruth_data[i].append({'time' : next_time, 'x_pos': next_x, 'y_pos': next_y, 'orientation': next_orientation})
# NOTE - modify incorporation of noise
# Generate Odometery Data
for i, label in enumerate(self.robot_labels, 0):
# Initial robot state
self.odometry_data[i].append({'time': 0, 'velocity' : 0, 'angular velocity' : 0, 'orientation' : self.groundtruth_data[i][0]['orientation'],
'delta_t' : 0})
for time in range(1, len(self.time_arr['odometry'][i])):
# NOTE - delta_t is currently hard coded to 1 as we only have ground truth data on each individual tick, this can be changed later on
delta_t = 1
loc1 = [self.groundtruth_data[i][time-1]['x_pos'], self.groundtruth_data[i][time-1]['y_pos']]
loc2 = [self.groundtruth_data[i][time]['x_pos'], self.groundtruth_data[i][time]['y_pos']]
velocity = self.calc_distance(loc1, loc2)/delta_t + np.random.normal(loc=0.0,scale=self.odometry_noise)
theta_1 = self.groundtruth_data[i][time-1]['orientation']
theta_2 = self.groundtruth_data[i][time]['orientation']
angular_velocity = (theta_2-theta_1)/delta_t + np.random.normal(loc=0.0,scale=0.1)
self.odometry_data[i].append({'time': time, 'velocity' : velocity, 'angular velocity' : angular_velocity,
'orientation' : self.groundtruth_data[i][time]['orientation'], 'delta_t' : delta_t})
# Generate Measurment Data
for i, label in enumerate(self.robot_labels, 0):
for time in range(0, len(self.time_arr['measurement'][i])):
for landmarkID, landmark_loc in self.landmark_map.items():
robot_loc_x = self.groundtruth_data[i][time]['x_pos']
robot_loc_y = self.groundtruth_data[i][time]['y_pos']
robot_loc = [robot_loc_x, robot_loc_y]
if (self.within_vision(2*pi, robot_loc, landmark_loc)):
measurment_range = self.calc_distance(robot_loc, landmark_loc) + np.random.normal(loc=0.0,scale=self.measurement_noise)
# NOTE - arctan returns between [-pi/2, pi/2]
bearing = 0
# Same point
if (landmark_loc[1] == robot_loc[1] and landmark_loc[0] == robot_loc[0]):
# TODO - robots are NOT supposed to collide with landmarks
bearing = 0
# either + or - pi/2 (delta_x = 0)
elif (landmark_loc[0] == robot_loc[0]):
if (landmark_loc[1] < robot_loc[1]):
bearing = -1*pi/2
else:
bearing = pi/2
else:
bearing = np.arctan((landmark_loc[1]-robot_loc[1])/(landmark_loc[0]-robot_loc[0])) + np.random.normal(loc=0.0,scale=0.1)
# We only want bearings between [0, 2pi]
if (bearing < 0):
bearing += 2*pi
self.measurement_data[i].append({'time':time, 'subject_ID': landmarkID, 'measurment_range':measurment_range, 'bearing':bearing})
self.dataset_data = {'odometry': self.odometry_data, 'measurement': self.measurement_data, 'groundtruth': self.groundtruth_data}
return self.start_time, self.starting_states, self.dataset_data, self.time_arr
def get_dataset_name(self):
return self.name
def get_start_time(self):
return self.start_time
def get_duration(self):
return self.duration
def get_starting_states(self):
return self.starting_states
def get_start_moving_times(self):
return self.start_moving_times
def get_landmark_map(self):
return self.landmark_map
def get_time_arr(self, data_category):
return self.time_arr[data_category]
def get_robot_groundtruth(self, gt_time, robot_index):
gt = self.groundtruth_data[robot_index][gt_time]
return gt
def respond(self, req, current_time, need_specific_time = False):
valid_respond, message, req_type, robot_idx, time_idx = self.get_dataline(req, current_time)
current_time += 1
if (valid_respond):
req.set_time(time_idx)
req.set_message(message)
req.set_robot_idx(robot_idx)
req.set_type(req_type)
print(req.get_type())
return valid_respond, current_time, req
# Alternates odometry and measurement updates
# TODO - dynamic generation
def get_dataline(self, req, current_time):
message = req.get_message()
if message['robot_index'] == None:
if req.get_type() == None:
# Alternate between measurement and odometry requests
# TODO - change method of selecting robot indices - Suggestion (use a counter maintained within simulation)
req_type = self.next_request
req.set_type(req_type)
if (self.next_request == 'odometry'):
robot_idx = np.argmin(self.get_time_arr('odometry'))
self.next_request = 'measurement'
else:
robot_idx = np.argmin(self.get_time_arr('measurement'))
self.next_request = 'odometry'
else:
req_type = req.get_type()
time_arr = self.get_time_arr(req_type)
robot_idx = np.argmin(time_arr)
message['robot_index'] = robot_idx
time_idx = current_time
if self.dataset_data[req_type][robot_idx][time_idx]['time'] > self.end_time or time_idx == -1:
valid_dataline = False
else:
valid_dataline= True
message['data'] = self.dataset_data[req_type][robot_idx][time_idx]
message['groundtruth'] = self.groundtruth_data[robot_idx][time_idx]
return valid_dataline, message, req_type, robot_idx, time_idx
# Helper Functions
def generate_groundtruth():
return True
def generate_odometry():
return True
def generate_measurement():
return True
# Check if a given loc [x,y] is within map
def within_map(self, loc):
center_point = [0,0]
if sqrt(pow(loc[0]-center_point[0], 2)+pow(loc[1]-center_point[1], 2)) > self.boundary:
return False
else:
return True
# Compute the distance betwen loc1 = [x1,y1] and loc2 = [x2,y2]
def calc_distance(self, loc1, loc2):
x1 = loc1[0]
y1 = loc1[1]
x2 = loc2[0]
y2 = loc2[1]
distance = sqrt((x1-x2)**2 + (y1-y2)**2)
return distance
# robot_vision = angular width of robot's view in reference to x_axis
# robot_loc & landmark_loc = [x,y]
def within_vision(self, robot_vision, robot_loc, landmark_loc):
# arctan() returns between [-pi/2, pi/2] => Correct for our F.O.V application with respect to the x-axis as realistically F.O.V will not exceed pi/3
bearing = 0
# Same point
if (landmark_loc[1] == robot_loc[1] and landmark_loc[0] == robot_loc[0]):
bearing = 0
# either + or - pi/2 (delta_x = 0)
elif (landmark_loc[0] == robot_loc[0]):
if (landmark_loc[1] < robot_loc[1]):
bearing = -1*pi/2
else:
bearing = pi/2
else:
bearing = np.arctan((landmark_loc[1]-robot_loc[1])/(landmark_loc[0]-robot_loc[0])) + np.random.normal(loc=0.0,scale=0.1)
return abs(bearing) < robot_vision/2
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment