Skip to content
Snippets Groups Projects
Commit b16f3a57 authored by Shengkang (William) Chen's avatar Shengkang (William) Chen
Browse files

Merge branch 'master' of git.uclalemur.com:billyskc/CoLo

parents f53cabd6 abd9f69c
Branches
No related merge requests found
......@@ -228,7 +228,9 @@ def animate_plot(dataset_labels, data_recorder, analyzer, lm = None):
#ax.legend(prop=fontP, bbox_to_anchor=(1.0, 0.8), loc=9, ncol=1)
ax.legend(prop=fontP, loc =1)
ax_var.legend(prop=fontP, bbox_to_anchor=(1.1, 0.8), loc=9, ncol=1)
# Show graph
plt.show()
# TODO - fix gif formatting
# (optional) save as GIF, comment the following line if you don't want this
ani.save('colo_demo.gif', writer="imagemagick", fps=60)
# https://imagemagick.org/ used for install
# ani.save('animated_plot.gif', writer="imagemagick", fps=60) # sort by date modified in Colo folder to quickly fine, don't forget to delete colo_demo frames
# print("******Saved gif in Colo folder!******")
plt.show()
......@@ -2,6 +2,7 @@ import numpy as np
from math import pi, sqrt, atan2, hypot, sin, cos
import matplotlib.pyplot as plt
# TODO - add collision/boundary checking for circle/random data
class DataGenerator():
def __init__(self, boundary, landmarks, duration, robot_labels, starting_states, start_time, delta_t,
velocity_noise = 0, angular_velocity_noise = 0, measurement_range_noise = 0, bearing_noise = 0, communication_noise = 0,
......@@ -86,8 +87,6 @@ class DataGenerator():
# Generates simulated of robots moving in a circle
def generate_circular_data(self, test):
# TODO - add boundary, collision checking
# NOTE - this method of generating a circular path goes from polygons (low duration) to rounder as duration increases
path_radius = self.boundary/(self.num_robots + 3)
path_circumference = 2*pi*path_radius
......@@ -112,7 +111,7 @@ class DataGenerator():
# Converts all orientations to be between pi and pi
if (next_orientation > pi or next_orientation < -pi):
next_orientation = self.converge_to_bearing_range(next_orientation)
next_orientation = self.converge_to_angle_range(next_orientation)
self.groundtruth_data[i].append({'time' : self.time_arr['groundtruth'][i][time_idx], 'x_pos': next_x, 'y_pos': next_y, 'orientation': next_orientation})
......@@ -127,6 +126,43 @@ class DataGenerator():
return self.time_arr, self.groundtruth_data, self.odometry_data, self.measurement_data, self.dataset_data
# Inputs: None
# Outputs: time_arr, groundtruth_data, self.odometry_data, measurement_data, self.dataset_data
# Generates simulated of robots moving randomly
def generate_random_data(self, test):
# Generate ground truth data
for i, label in enumerate(self.robot_labels, 0):
# append starting state
self.groundtruth_data[i].append({'time' : self.starting_states[label][0], 'x_pos': self.starting_states[label][1], 'y_pos': self.starting_states[label][2], 'orientation' : self.starting_states[label][3]})
for time_idx in range(1, len(self.time_arr['groundtruth'][i])):
curr_x = self.groundtruth_data[i][time_idx-1]['x_pos']
curr_y = self.groundtruth_data[i][time_idx-1]['y_pos']
curr_orientation = self.groundtruth_data[i][time_idx-1]['orientation']
# rotate some random number of radians between pi/4 and -pi/4
next_orientation = curr_orientation + np.random.uniform(-pi/3, pi/3)
next_orientation = self.converge_to_angle_range(next_orientation)
rand_dist = np.random.uniform(0, 0.5) # units of distance change are in
next_x = curr_x + rand_dist*cos(next_orientation)
next_y = curr_y + rand_dist*sin(next_orientation)
self.groundtruth_data[i].append({'time' : self.time_arr['groundtruth'][i][time_idx], 'x_pos': next_x, 'y_pos': next_y, 'orientation': next_orientation})
self.generate_odometry_data()
self.generate_measurement_data()
self.dataset_data = {'odometry': self.odometry_data, 'measurement': self.measurement_data, 'groundtruth': self.groundtruth_data}
if (test):
self.verify_generated_data()
return self.time_arr, self.groundtruth_data, self.odometry_data, self.measurement_data, self.dataset_data
# Inputs: None
# Outputs: time_arr, groundtruth_data, self.odometry_data, measurement_data, self.dataset_data
# Generates simulated data for robots moving completely randomly (within realism)
def generate_odometry_data(self):
for i, label in enumerate(self.robot_labels, 0):
# Initial robot state
......@@ -144,7 +180,7 @@ class DataGenerator():
self.odometry_data[i].append({'time': self.time_arr['odometry'][i][time_idx], 'velocity' : velocity, 'angular velocity' : angular_velocity,
'orientation' : self.groundtruth_data[i][time_idx]['orientation'], 'delta_t' : self.delta_t})
# TODO - add relative robot measurements
def generate_measurement_data(self):
# Generate Measurement Data
for robot_idx, label in enumerate(self.robot_labels, 0):
......@@ -167,7 +203,7 @@ class DataGenerator():
measurement_range = self.calc_distance(robot_loc, other_robot_loc) + np.random.normal(loc=0.0,scale=self.measurement_range_noise)
bearing = atan2((other_robot_loc[1]-robot_loc[1]), (other_robot_loc[0]-robot_loc[0])) + np.random.normal(loc=0.0,scale=self.bearing_noise) # global coordinates
bearing = bearing - robot_orientation # NOTE - local = global - orientation, all between 0 and 2*pi
bearing = self.converge_to_bearing_range(bearing)
bearing = self.converge_to_angle_range(bearing)
self.measurement_data[robot_idx].append({'time': time, 'subject_ID': robotID, 'measurment_range':measurement_range, 'bearing':bearing})
time_arr.append(time) # we expect repeats/skipped times in time array
......@@ -178,7 +214,7 @@ class DataGenerator():
measurement_range = self.calc_distance(robot_loc, landmark_loc) + np.random.normal(loc=0.0,scale=self.measurement_range_noise)
bearing = atan2((landmark_loc[1]-robot_loc[1]), (landmark_loc[0]-robot_loc[0])) + np.random.normal(loc=0.0,scale=self.bearing_noise) # global coordinates
bearing = bearing - robot_orientation # NOTE - local = global - orientation, all between 0 and 2*pi
bearing = self.converge_to_bearing_range(bearing)
bearing = self.converge_to_angle_range(bearing)
self.measurement_data[robot_idx].append({'time': time, 'subject_ID': landmarkID, 'measurment_range':measurement_range, 'bearing':bearing})
time_arr.append(time) # we expect repeats/skipped times in time array
......@@ -237,7 +273,7 @@ class DataGenerator():
actual_meas = self.calc_distance(curr_loc, landmark_loc)
generated_measurement = measurement['measurment_range']
actual_bearing = self.converge_to_bearing_range(atan2(landmark_loc[1]- y2, landmark_loc[0]-x2) - actual_o) # global - orientation = local, -pi to pi
actual_bearing = self.converge_to_angle_range(atan2(landmark_loc[1]- y2, landmark_loc[0]-x2) - actual_o) # global - orientation = local, -pi to pi
generated_bearing = measurement['bearing']
meas_diff = actual_meas - generated_measurement
......@@ -253,7 +289,7 @@ class DataGenerator():
actual_meas = self.calc_distance(curr_loc, other_robot_loc)
generated_measurement = measurement['measurment_range']
actual_bearing = self.converge_to_bearing_range(atan2(other_robot_loc[1]- y2, other_robot_loc[0]-x2) - actual_o) # global - orientation = local, -pi to pi
actual_bearing = self.converge_to_angle_range(atan2(other_robot_loc[1]- y2, other_robot_loc[0]-x2) - actual_o) # global - orientation = local, -pi to pi
generated_bearing = measurement['bearing']
meas_diff = actual_meas - generated_measurement
......@@ -331,7 +367,7 @@ class DataGenerator():
return abs(bearing) <= robot_fov/2
# get angle between -pi and pi
def converge_to_bearing_range(self, theta):
def converge_to_angle_range(self, theta):
while (theta < -pi):
theta += 2*pi
while (theta > pi):
......
from math import pi
import numpy as np
import IPython
# Import from other directories in repository
import sys
import os
......@@ -44,7 +46,7 @@ class SimulatedDataSetManager:
raise Exception("Invalid landmark ID: landmark IDs must be bigger than 5.")
# starting_states = {robot label: [time, x_pos, y_pos, orientation], ... }
# TODO - create a generate starting states function
# TODO - create a generate starting states function (potentially randomized to fit boundary)
self.starting_states = {label : [self.start_time, -boundary + float(i+1)*boundary/self.num_robots, 0, 0.0] for i,label in enumerate(self.robot_labels)}
# groundtruth & odometry are of fixed length=duration
......@@ -64,10 +66,13 @@ class SimulatedDataSetManager:
if (path == 'line'):
self.time_arr, self.groundtruth_data, self.odometry_data, self.measurement_data, self.dataset_data = self.generator.generate_straight_line_data(test)
return self.start_time, self.starting_states, self.dataset_data, self.time_arr
# TODO
elif(path=='circle'):
self.time_arr, self.groundtruth_data, self.odometry_data, self.measurement_data, self.dataset_data = self.generator.generate_circular_data(test)
return self.start_time, self.starting_states, self.dataset_data, self.time_arr
elif(path == 'random'):
self.time_arr, self.groundtruth_data, self.odometry_data, self.measurement_data, self.dataset_data = self.generator.generate_random_data(test)
self.measurement_data[0] = [] # TODO - to test simulated communication, trace of state variance should increase unless a communication update
return self.start_time, self.starting_states, self.dataset_data, self.time_arr
else:
raise Exception("Unsupported robot path specified.")
......@@ -92,10 +97,16 @@ class SimulatedDataSetManager:
def get_time_arr(self, data_category):
return self.time_arr[data_category]
# TODO - issues with simulation process communication module:
# issue appears to be time index is out of boutds
def get_robot_groundtruth(self, gt_time, robot_index):
gt = self.groundtruth_data[robot_index][gt_time]
while (gt_time >= self.end_time):
gt_time -= self.delta_t
gt_index = np.where(gt_time == self.time_arr['groundtruth'][robot_index])[0][0]
gt = self.groundtruth_data[robot_index][gt_index]
return gt
# Used to cycle through all robots in native simulation
def reset_robot_ctr(self):
if (self.curr_robot_idx > self.num_robots-1):
self.curr_robot_idx = 0
......
......@@ -26,6 +26,9 @@ from data_analysis.realtime_plot import animate_plot
from pprint import pprint
# load algorithms
# NOTE
# GS = global state -> distributed system
# LS = localized state -> centralized system.
sys.path.append(os.path.join(os.path.dirname(__file__), "localization_algos"))
from centralized_ekf import Centralized_EKF # works
from simple_ekf import Simple_EKF
......@@ -38,30 +41,31 @@ compname = getpass.getuser()
#dataset_path = "/home/"+ compname +"/CoLo/CoLo-D/UTIAS-Datasets/MRCLAM_Dataset3/"
# landmark IDs must be bigger than 5
landmarks = {10: [0,5], 11: [3,3], 12: [-3,0], 13: [-3, -2] }
landmarks = {10: [0,5] , 11: [3,3], 12: [-3,0], 13: [-3, -2] }
robot_labels = [1]
robot_labels = [1,2]
boundary = 5 # radius of circular map centered at 0,0
duration = 150 # (seconds)
# testing_dataset = SimulatedDataSetManager('testing', boundary, landmarks, duration, robot_labels,
# velocity_noise=0.01, angular_velocity_noise=0.01, measurement_range_noise=0.01, bearing_noise=0.01, robot_fov=2*pi)
# start_time, starting_states, dataset_data, time_arr = testing_dataset.simulate_dataset('circle', test=True)
duration = 45 # (seconds)
# NOTE - adjust noise if need to see drastic convergence/divergence
testing_dataset = SimulatedDataSetManager('testing', boundary, landmarks, duration, robot_labels,
velocity_noise=0.01, angular_velocity_noise=0.01, measurement_range_noise=0.01, bearing_noise=0.01, robot_fov=2*pi)
start_time, starting_states, dataset_data, time_arr = testing_dataset.simulate_dataset('random', test=False)
# Real-World DataSet
dataset_path = "D:/LEMUR/CoLo/CoLo-D/CoLo-Datasets/official_dataset1/"
testing_dataset = RW_Dataset_Manager('testing')
start_time, starting_states, dataset_data, time_arr = testing_dataset.load_datasets(dataset_path, robot_labels, duration,delay_start=20)
# dataset_path = "D:/LEMUR/CoLo/CoLo-D/CoLo-Datasets/official_dataset1/"
# testing_dataset = RW_Dataset_Manager('testing')
# start_time, starting_states, dataset_data, time_arr = testing_dataset.load_datasets(dataset_path, robot_labels, duration,delay_start=20)
analyzer = Analyzer('analyzer', robot_labels)
loc_algo = Simple_EKF('Simple_EKF')
robot = RobotSystem('robot', robot_labels, loc_algo, distr_sys = False)
loc_algo = EKF_GS_BOUND('EKF_GS_BOUND')
robot = RobotSystem('robot', robot_labels, loc_algo, distr_sys = True)
sim = SimulationManager('sim')
state_recorder = StatesRecorder('Simple_EKF', robot_labels)
state_recorder = StatesRecorder('EKF_GS_BOUND', robot_labels)
sim.sim_process_native(robot_labels, testing_dataset, robot, state_recorder, simple_plot = False, comm=False, simulated_comm = False)
# TODO - 3 robots, but one CANNOT take measurements and must rely on communication, see if convergent.
sim.sim_process_native(robot_labels, testing_dataset, robot, state_recorder, simple_plot = False, comm=True, simulated_comm = True)
loc_err_per_run, trace_per_run, t_arr = analyzer.calculate_loc_err_and_trace_state_variance_per_run(state_recorder, plot_graphs = False)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment