Skip to content
Snippets Groups Projects
Commit cf048d39 authored by oceanpdoshi@g.ucla.edu's avatar oceanpdoshi@g.ucla.edu
Browse files

Added circular path data generator

(only had to modify generation of groundtruths)
changed initial starting states of robots in dm
added some basic path plotting code to test_simulation
parent 042bb0f4
Branches
No related merge requests found
import numpy as np
from math import pi, sqrt, atan2, hypot
from math import pi, sqrt, atan2, hypot, sin, cos
class DataGenerator():
def __init__(self, boundary, landmarks, duration, robot_labels, starting_states, start_time, delta_t,
......@@ -44,7 +44,7 @@ class DataGenerator():
# Inputs: None
# Outputs: time_arr, groundtruth_data, self.odometry_data, measurement_data, self.dataset_data
# Generates simulated data based upon simulation parameters passed from user to SimulatedDataSetManager
# Generates simulated data based of robots moving in a straight line until simulation ends/robots hit the boundary and stop moving
def generate_straight_line_data(self):
# Generate ground truth data
......@@ -56,8 +56,6 @@ class DataGenerator():
curr_y = self.groundtruth_data[i][time_idx-1]['y_pos']
curr_orientation = self.groundtruth_data[i][time_idx-1]['orientation']
next_x = curr_x
next_y = curr_y
next_orientation = curr_orientation
if (self.within_map([curr_x + 1, curr_y + 1])):
......@@ -107,6 +105,80 @@ class DataGenerator():
return self.time_arr, self.groundtruth_data, self.odometry_data, self.measurement_data, self.dataset_data
# Inputs: None
# Outputs: time_arr, groundtruth_data, self.odometry_data, measurement_data, self.dataset_data
# Generates simulated of robots moving in a circle
def generate_circular_data(self):
# TODO - add boundary, collision checking
# NOTE - this method of generating a circular path goes from polygons (low duration) to rounder as duration increases
path_radius = self.boundary/self.num_robots
path_circumference = 2*pi*path_radius
radian_step = 2*pi/self.duration
velocity_step = path_circumference/self.duration
distance_step = velocity_step*self.delta_t
# Generate ground truth data
for i, label in enumerate(self.robot_labels, 0):
# append starting state
self.groundtruth_data[i].append({'time' : self.starting_states[label][0], 'x_pos': self.starting_states[label][1], 'y_pos': self.starting_states[label][2], 'orientation' : self.starting_states[label][3]})
for time_idx in range(1, len(self.time_arr['groundtruth'][i])):
curr_x = self.groundtruth_data[i][time_idx-1]['x_pos']
curr_y = self.groundtruth_data[i][time_idx-1]['y_pos']
curr_orientation = self.groundtruth_data[i][time_idx-1]['orientation']
next_x = curr_x + distance_step*cos(radian_step + curr_orientation)
next_y = curr_y + distance_step*sin(radian_step + curr_orientation)
next_orientation = curr_orientation + radian_step # TODO - Check if orientaion is between 0 and 2pi
if (next_orientation > 2*pi): # later on we may have robots go in multiple loops
next_orientation -= 2*pi
self.groundtruth_data[i].append({'time' : self.time_arr['groundtruth'][i][time_idx], 'x_pos': next_x, 'y_pos': next_y, 'orientation': next_orientation})
# Generate Odometery Data
for i, label in enumerate(self.robot_labels, 0):
# Initial robot state
self.odometry_data[i].append({'time': self.start_time, 'velocity' : 0, 'angular velocity' : 0, 'orientation' : self.groundtruth_data[i][0]['orientation'],
'delta_t' : 0})
for time_idx in range(1, len(self.time_arr['odometry'][i])):
loc1 = [self.groundtruth_data[i][time_idx-1]['x_pos'], self.groundtruth_data[i][time_idx-1]['y_pos']]
loc2 = [self.groundtruth_data[i][time_idx]['x_pos'], self.groundtruth_data[i][time_idx]['y_pos']]
velocity = self.calc_distance(loc1, loc2)/self.delta_t + np.random.normal(loc=0.0,scale=self.velocity_noise)
theta_1 = self.groundtruth_data[i][time_idx-1]['orientation']
theta_2 = self.groundtruth_data[i][time_idx]['orientation']
angular_velocity = (theta_2-theta_1)/self.delta_t + np.random.normal(loc=0.0,scale=self.angular_velocity_noise)
self.odometry_data[i].append({'time': self.time_arr['odometry'][i][time_idx], 'velocity' : velocity, 'angular velocity' : angular_velocity,
'orientation' : self.groundtruth_data[i][time_idx]['orientation'], 'delta_t' : self.delta_t})
# Generate Measurement Data
for robot_idx, label in enumerate(self.robot_labels, 0):
# NOTE - time_arr is used to emplace a one-to-one index-to-time correspondence
# between measurement_data and time_arr['measurement'] to account for varing number of landmark observations
time_arr = []
# iterate over all possible times and (potentially) observable landmarks
for time_idx in range(0, len(self.time_arr['measurement'][i])):
for landmarkID, landmark_loc in self.landmark_map.items():
time = self.time_arr['measurement'][robot_idx][time_idx]
robot_loc_x = self.groundtruth_data[robot_idx][time_idx]['x_pos']
robot_loc_y = self.groundtruth_data[robot_idx][time_idx]['y_pos']
robot_loc = [robot_loc_x, robot_loc_y]
if (self.within_vision(self.robot_fov, robot_loc, landmark_loc)):
measurement_range = self.calc_distance(robot_loc, landmark_loc) + np.random.normal(loc=0.0,scale=self.measurement_range_noise)
bearing = atan2((landmark_loc[1]-robot_loc[1]), (landmark_loc[0]-robot_loc[0])) + np.random.normal(loc=0.0,scale=self.bearing_noise)
self.measurement_data[robot_idx].append({'time': time, 'subject_ID': landmarkID, 'measurment_range':measurement_range, 'bearing':bearing})
time_arr.append(time) # we expect repeats/skipped times in time array
self.time_arr['measurement'][robot_idx] = time_arr
self.dataset_data = {'odometry': self.odometry_data, 'measurement': self.measurement_data, 'groundtruth': self.groundtruth_data}
return self.time_arr, self.groundtruth_data, self.odometry_data, self.measurement_data, self.dataset_data
# Input: loc=[x,y]
# Output: boolean if the point [x,y] is within the map
def within_map(self, loc):
......
......@@ -15,6 +15,7 @@ from dataset_manager.data_generator import DataGenerator
# TODO
# - Circular path data in DataGenerator class
# - communication
# - relative observations
# - animated plots
class SimulatedDataSetManager:
......@@ -23,6 +24,7 @@ class SimulatedDataSetManager:
robot_fov = 2*pi/3):
self.name = dataset_name
self.boundary = float(boundary)
self.duration = duration
self.robot_labels = robot_labels
self.num_robots = len(robot_labels)
......@@ -44,7 +46,8 @@ class SimulatedDataSetManager:
raise Exception("Invalid landmark ID: landmark IDs must be bigger than 5.")
# starting_states = {robot label: [time, x_pos, y_pos, orientation], ... }
self.starting_states = {label : [self.start_time, float(i), float(i), 0.0] for i,label in enumerate(self.robot_labels)}
# TODO - create a generate starting states function
self.starting_states = {label : [self.start_time, -boundary + float(i+2)*boundary/self.num_robots, 0, 0.0] for i,label in enumerate(self.robot_labels)}
# groundtruth & odometry are of fixed length=duration
# measurement_data is of variable length
......@@ -65,7 +68,8 @@ class SimulatedDataSetManager:
return self.start_time, self.starting_states, self.dataset_data, self.time_arr
# TODO
elif(path=='circle'):
pass
self.time_arr, self.groundtruth_data, self.odometry_data, self.measurement_data, self.dataset_data = self.generator.generate_circular_data()
return self.start_time, self.starting_states, self.dataset_data, self.time_arr
else:
raise Exception("Unsupported robot path specified.")
......
......@@ -9,6 +9,8 @@ Created on Apr 17 2019
import os, sys
import getpass
import IPython
import numpy as np
from math import pi
from matplotlib import pyplot as plt
from matplotlib import animation
......@@ -36,17 +38,46 @@ compname = getpass.getuser()
#dataset_path = "/home/"+ compname +"/CoLo/CoLo-D/UTIAS-Datasets/MRCLAM_Dataset3/"
# landmark IDs must be bigger than 5
landmarks = {10: [2,5]}
landmarks = {10: [0, 2], 11: [25,0], 12: [0,25], 13: [-25,0], 14: [0, -25] }
robot_labels = [1]
boundary = 100 # size of circular map
duration = 120 # duration for the simulation in sec
robot_labels = [1,2,3]
boundary = 25 # radius of circular map centered at 0,0
duration = 120 # (seconds)
# NOTE - noise significantly affects the results
testing_dataset = SimulatedDataSetManager('testing', boundary, landmarks, duration, robot_labels, velocity_noise=0.01, angular_velocity_noise=0.01,
measurement_range_noise=0.01, bearing_noise=0.01 )
start_time, starting_states, dataset_data, time_arr = testing_dataset.simulate_dataset()
testing_dataset = SimulatedDataSetManager('testing', boundary, landmarks, duration, robot_labels,
velocity_noise=0.01, angular_velocity_noise=0.01, measurement_range_noise=0.01, bearing_noise=0.01, robot_fov=2*pi)
start_time, starting_states, dataset_data, time_arr = testing_dataset.simulate_dataset('circle')
robot_1_groundtruths = dataset_data['groundtruth'][0]
robot_2_groundtruths = dataset_data['groundtruth'][1]
robot_3_groundtruths = dataset_data['groundtruth'][2]
landmark_x = [val[0] for key,val in landmarks.items()]
landmark_y = [val[1] for key,val in landmarks.items()]
x1_array = np.array([groundtruth['x_pos'] for groundtruth in robot_1_groundtruths])
y1_array = np.array([groundtruth['y_pos'] for groundtruth in robot_1_groundtruths])
x2_array = np.array([groundtruth['x_pos'] for groundtruth in robot_2_groundtruths])
y2_array = np.array([groundtruth['y_pos'] for groundtruth in robot_2_groundtruths])
x3_array = np.array([groundtruth['x_pos'] for groundtruth in robot_3_groundtruths])
y3_array = np.array([groundtruth['y_pos'] for groundtruth in robot_3_groundtruths])
boundary_circle = plt.Circle((0,0), boundary, color='#B9F1BA', label='boundary')
plt.title('Robot Paths')
plt.xlabel('x [m]')
plt.ylabel('y [m]')
plt.gcf().gca().add_artist(boundary_circle)
plt.gcf().gca().set_xlim(-2*boundary, 2*boundary)
plt.gcf().gca().set_ylim(-2*boundary, 2*boundary)
plt.plot(x1_array, y1_array, label='Robot 1')
plt.plot(x2_array, y2_array, label='Robot 2')
plt.plot(x3_array, y3_array, label='Robot 3')
plt.plot(landmark_x, landmark_y, 'k.', label='Landmarks' )
plt.legend(loc='best')
# Real-World DataSet
# dataset_path = "D:/LEMUR/CoLo/CoLo-D/CoLo-Datasets/official_dataset1/"
......@@ -64,5 +95,5 @@ state_recorder_bda = StatesRecorder('CentralizedEKF', robot_labels)
sim.sim_process_native(robot_labels, testing_dataset, robot, state_recorder_bda, simple_plot = True, comm=False, simulated_comm = False)
loc_err_per_run, trace_per_run, t_arr = analyzer.calculate_loc_err_and_trace_state_variance_per_run(state_recorder_bda, plot_graphs = True)
# loc_err_per_run, trace_per_run, t_arr = analyzer.calculate_loc_err_and_trace_state_variance_per_run(state_recorder_bda, plot_graphs = True)
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment