Skip to content
Snippets Groups Projects
Commit c403b6c9 authored by Shengkang (William) Chen's avatar Shengkang (William) Chen
Browse files

Merge branch 'master' of https://git.uclalemur.com/billyskc/CoLo

parents e5d6bff2 e7bed138
Branches
No related merge requests found
...@@ -2,7 +2,6 @@ import numpy as np ...@@ -2,7 +2,6 @@ import numpy as np
from math import pi, sqrt, atan2, hypot, sin, cos from math import pi, sqrt, atan2, hypot, sin, cos
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
class DataGenerator(): class DataGenerator():
def __init__(self, boundary, landmarks, duration, robot_labels, starting_states, start_time, delta_t, def __init__(self, boundary, landmarks, duration, robot_labels, starting_states, start_time, delta_t,
velocity_noise = 0, angular_velocity_noise = 0, measurement_range_noise = 0, bearing_noise = 0, communication_noise = 0, velocity_noise = 0, angular_velocity_noise = 0, measurement_range_noise = 0, bearing_noise = 0, communication_noise = 0,
...@@ -145,22 +144,36 @@ class DataGenerator(): ...@@ -145,22 +144,36 @@ class DataGenerator():
self.odometry_data[i].append({'time': self.time_arr['odometry'][i][time_idx], 'velocity' : velocity, 'angular velocity' : angular_velocity, self.odometry_data[i].append({'time': self.time_arr['odometry'][i][time_idx], 'velocity' : velocity, 'angular velocity' : angular_velocity,
'orientation' : self.groundtruth_data[i][time_idx]['orientation'], 'delta_t' : self.delta_t}) 'orientation' : self.groundtruth_data[i][time_idx]['orientation'], 'delta_t' : self.delta_t})
# TODO - add relative robot measurements
def generate_measurement_data(self): def generate_measurement_data(self):
# Generate Measurement Data # Generate Measurement Data
for robot_idx, label in enumerate(self.robot_labels, 0): for robot_idx, label in enumerate(self.robot_labels, 0):
# NOTE - time_arr is used to emplace a one-to-one index-to-time correspondence # NOTE - time_arr is used to emplace a one-to-one index-to-time correspondence
# between measurement_data and time_arr['measurement'] to account for varing number of landmark observations # between measurement_data and time_arr['measurement'] to account for varing number of landmark observations
time_arr = [] time_arr = []
# iterate over all possible times and (potentially) observable landmarks
for time_idx in range(0, len(self.time_arr['measurement'][robot_idx])): for time_idx in range(0, len(self.time_arr['measurement'][robot_idx])):
for landmarkID, landmark_loc in self.landmark_map.items():
time = self.time_arr['measurement'][robot_idx][time_idx]
time = self.time_arr['measurement'][robot_idx][time_idx] robot_loc_x = self.groundtruth_data[robot_idx][time_idx]['x_pos']
robot_loc_x = self.groundtruth_data[robot_idx][time_idx]['x_pos'] robot_loc_y = self.groundtruth_data[robot_idx][time_idx]['y_pos']
robot_loc_y = self.groundtruth_data[robot_idx][time_idx]['y_pos'] robot_orientation = self.groundtruth_data[robot_idx][time_idx]['orientation']
robot_orientation = self.groundtruth_data[robot_idx][time_idx]['orientation'] robot_loc = [robot_loc_x, robot_loc_y]
robot_loc = [robot_loc_x, robot_loc_y]
# Relative (other robots) observations
other_robot_locs = {label : [self.groundtruth_data[i][time_idx]['x_pos'], self.groundtruth_data[i][time_idx]['y_pos']] for i, label in enumerate(self.robot_labels, 0) if i != robot_idx }
for robotID, other_robot_loc in other_robot_locs.items():
if (self.within_vision(self.robot_fov, robot_loc, other_robot_loc, robot_orientation)):
measurement_range = self.calc_distance(robot_loc, other_robot_loc) + np.random.normal(loc=0.0,scale=self.measurement_range_noise)
bearing = atan2((other_robot_loc[1]-robot_loc[1]), (other_robot_loc[0]-robot_loc[0])) + np.random.normal(loc=0.0,scale=self.bearing_noise) # global coordinates
bearing = bearing - robot_orientation # NOTE - local = global - orientation, all between 0 and 2*pi
bearing = self.converge_to_bearing_range(bearing)
self.measurement_data[robot_idx].append({'time': time, 'subject_ID': robotID, 'measurment_range':measurement_range, 'bearing':bearing})
time_arr.append(time) # we expect repeats/skipped times in time array
# Landmark (absolute) observations
for landmarkID, landmark_loc in self.landmark_map.items():
if (self.within_vision(self.robot_fov, robot_loc, landmark_loc, robot_orientation)): if (self.within_vision(self.robot_fov, robot_loc, landmark_loc, robot_orientation)):
measurement_range = self.calc_distance(robot_loc, landmark_loc) + np.random.normal(loc=0.0,scale=self.measurement_range_noise) measurement_range = self.calc_distance(robot_loc, landmark_loc) + np.random.normal(loc=0.0,scale=self.measurement_range_noise)
bearing = atan2((landmark_loc[1]-robot_loc[1]), (landmark_loc[0]-robot_loc[0])) + np.random.normal(loc=0.0,scale=self.bearing_noise) # global coordinates bearing = atan2((landmark_loc[1]-robot_loc[1]), (landmark_loc[0]-robot_loc[0])) + np.random.normal(loc=0.0,scale=self.bearing_noise) # global coordinates
...@@ -217,18 +230,36 @@ class DataGenerator(): ...@@ -217,18 +230,36 @@ class DataGenerator():
measurements = [measurement for measurement in measurement_list if measurement['time']==curr_time] measurements = [measurement for measurement in measurement_list if measurement['time']==curr_time]
curr_loc = [x2,y2] curr_loc = [x2,y2]
for measurement in measurements: for measurement in measurements:
landmark_loc = self.landmark_map[measurement['subject_ID']] # Verify a landmark measurement
if (measurement['subject_ID'] > 5):
actual_meas = self.calc_distance(curr_loc, landmark_loc) landmark_loc = self.landmark_map[measurement['subject_ID']]
generated_measurement = measurement['measurment_range']
actual_meas = self.calc_distance(curr_loc, landmark_loc)
generated_measurement = measurement['measurment_range']
actual_bearing = self.converge_to_bearing_range(atan2(landmark_loc[1]- y2, landmark_loc[0]-x2) - actual_o) # global - orientation = local, -pi to pi
generated_bearing = measurement['bearing']
meas_diff = actual_meas - generated_measurement
bearing_diff = actual_bearing - generated_bearing
meas_test_arr[i].append({'meas_diff': meas_diff, 'bearing_diff': bearing_diff, 'time': curr_time})
else:
other_robot_loc_x = self.groundtruth_data[self.robot_labels.index(measurement['subject_ID'])][g_idx]['x_pos']
other_robot_loc_y = self.groundtruth_data[self.robot_labels.index(measurement['subject_ID'])][g_idx]['y_pos']
other_robot_loc = [other_robot_loc_x, other_robot_loc_y]
actual_meas = self.calc_distance(curr_loc, other_robot_loc)
generated_measurement = measurement['measurment_range']
actual_bearing = self.converge_to_bearing_range(atan2(landmark_loc[1]- y2, landmark_loc[0]-x2) - actual_o) # global - orientation = local, -pi to pi actual_bearing = self.converge_to_bearing_range(atan2(other_robot_loc[1]- y2, other_robot_loc[0]-x2) - actual_o) # global - orientation = local, -pi to pi
generated_bearing = measurement['bearing'] generated_bearing = measurement['bearing']
meas_diff = actual_meas - generated_measurement meas_diff = actual_meas - generated_measurement
bearing_diff = actual_bearing - generated_bearing bearing_diff = actual_bearing - generated_bearing
meas_test_arr[i].append({'meas_diff': meas_diff, 'bearing_diff': bearing_diff, 'time': curr_time}) meas_test_arr[i].append({'meas_diff': meas_diff, 'bearing_diff': bearing_diff, 'time': curr_time})
fig, ax_arr = plt.subplots(6) fig, ax_arr = plt.subplots(6)
plt.xlabel('t [s]') plt.xlabel('t [s]')
...@@ -292,15 +323,12 @@ class DataGenerator(): ...@@ -292,15 +323,12 @@ class DataGenerator():
return distance return distance
# NOTE - add distance limitations to robot_vision? # NOTE - add distance limitations to robot_vision?
# TODO - FIX THIS
# Input: robot_loc, landmark_loc, [x,y] # Input: robot_loc, landmark_loc, [x,y]
# Output: boolean, a given point is with the field of view (sector of a circle) of a robot # Output: boolean, a given point is with the field of view (sector of a circle) of a robot
def within_vision(self, robot_fov, robot_loc, landmark_loc, robot_orientation=0): def within_vision(self, robot_fov, robot_loc, landmark_loc, robot_orientation=0):
# between pi and -pi with respect to the x axis # between pi and -pi with respect to LOCAL FRAME
bearing = atan2((landmark_loc[1]-robot_loc[1]),(landmark_loc[0]-robot_loc[0])) bearing = atan2((landmark_loc[1]-robot_loc[1]),(landmark_loc[0]-robot_loc[0])) - robot_orientation
if bearing < 0: return abs(bearing) <= robot_fov/2
bearing += 2*pi
return True # bearing <= robot_fov/2 + robot_orientation and bearing >= robot_orientation - robot_fov/2
# get angle between -pi and pi # get angle between -pi and pi
def converge_to_bearing_range(self, theta): def converge_to_bearing_range(self, theta):
......
...@@ -13,9 +13,8 @@ from dataset_manager.data_generator import DataGenerator ...@@ -13,9 +13,8 @@ from dataset_manager.data_generator import DataGenerator
# TODO # TODO
# - Circular path data in DataGenerator class # communication
# - communication # scheduling
# - relative observations
class SimulatedDataSetManager: class SimulatedDataSetManager:
def __init__(self, dataset_name, boundary, landmarks, duration, robot_labels, def __init__(self, dataset_name, boundary, landmarks, duration, robot_labels,
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Feb 13 16:11:36 2018
@author: william
"""
class DataGenerator():
def init_(self):
pass
def within_map(self, loc):
'''
For runtime self_generated data,
Parameters:
loc: array, shape = (2,1)
robot's location for [x, y]
Return: Boolean
True if within the map, False if not
'''
center_point = [0,0]
if sqrt(pow(loc[0]-center_point[0], 2)+pow(loc[1]-center_point[1], 2)) > d_max:
return False
else:
return True
def generate_propgation_data(self, current_loc, current_orientation):
'''
To generate propagtion data randomly so that it will run within predefined map *assuming orientation is perfect
Parameters:
current_loc: array, shape = (2,1)
robot's current location for [x, y]
current_orientation: float
robot's current orientation in radian
Return array, shape = (2,1)
'''
[meas_v, meas_a_v] = [random.uniform(-max_v,max_v), random.uniform(-max_a_v, max_a_v)]
actual_v = meas_v + random.normal(0, sqrt(var_u_v))
pre_update_position = [current_loc[0] + cos(current_orientation)*actual_v*delta_t, current_loc[1] + sin(current_orientation)*actual_v*delta_t]
while(not within_map(pre_update_position)):
[meas_v, meas_a_v] = [random.uniform(-max_v,max_v), random.uniform(-max_a_v, max_a_v)]
actual_v = meas_v + random.normal(0, sqrt(var_u_v))
pre_update_position = [current_loc[0] + cos(current_orientation)*actual_v*delta_t, current_loc[1] + sin(current_orientation)*actual_v*delta_t]
orientation = current_orientation + meas_a_v*delta_t
actual_loc = pre_update_position
return [actual_v, meas_v, orientation, actual_loc]
def generate_measurement_data(self, observer_loc, observee_loc, observer_orientation):
delta_x = observer_loc[0] - observee_loc[0]
delta_y = observer_loc[1] - observee_loc[1]
dis = sqrt(delta_y*delta_y + delta_x*delta_x) + random.normal(0, sqrt(var_dis))
bearing = atan2(delta_y, delta_x) + random.normal(0, sqrt(var_angle)) - observer_orientation
return [dis, bearing]
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment