Skip to content
Snippets Groups Projects
Commit 3c7379a2 authored by Tilboon Elberier's avatar Tilboon Elberier
Browse files

code cleanup and integrated scripts for simulated data and real data testing....

code cleanup and integrated scripts for simulated data and real data testing. Also fixed dt value update to be able to input different dt values at each time step in simulation (mainly for real-data test). Additionally fixed input simulator issue and updated config file.
parent 29c6b781
Branches
No related merge requests found
......@@ -21,9 +21,9 @@ class Estimator:
self.x̂_ = None
def update(self, u: ndarray, z: ndarray):
def update(self, u: ndarray, z: ndarray, dt: float):
# State estimate before the measurement update phase
_x̂, = self.SystemSimulator.update(u, self.x̂_)
_x̂, = self.SystemSimulator.update(u, dt, self.x̂_)
# State estimate after the measurement update phase
x̂, P, r, A = self.EstimatorUpdate.update(z, _x̂, )
......
......@@ -18,8 +18,8 @@ class EstimatorLikelihood:
self.PDV = PDV()
def update(self, u: ndarray, z: ndarray):
_, _, r, A = self.Estimator.update(u, z)
def update(self, u: ndarray, z: ndarray, dt: float):
_, _, r, A = self.Estimator.update(u, z, dt)
pdv = self.PDV.update(r, A)
return pdv
......
......@@ -15,8 +15,8 @@ class MMAE:
self.JointProbability = JointProbability(λs)
def update(self, u: ndarray, z: ndarray) -> float:
pdvs = [EstimatorLikelihood.update(u, z) for EstimatorLikelihood in self.EstimatorLikelihoods]
def update(self, u: ndarray, z: ndarray, dt: float) -> float:
pdvs = [EstimatorLikelihood.update(u, z, dt) for EstimatorLikelihood in self.EstimatorLikelihoods]
λ_hat, cumulative_posteriors = self.JointProbability.update(pdvs)
return λ_hat, cumulative_posteriors, pdvs
......
......@@ -8,8 +8,8 @@ from .system import System
class SimpleHarmonicOscillator(System):
def __init__(self, λ: float, k: float, b: float, dt: float, H: ndarray, Q: ndarray, R: ndarray):
self.λ = λ # Mass
k = k # Spring constant
b = b # Damping coefficient
self.k = k # Spring constant
self.b = b # Damping coefficient
self.dt = dt # Time step
super().__init__(
......@@ -26,29 +26,52 @@ class SimpleHarmonicOscillator(System):
R=R
)
def update_dt(self, dt: float):
self.dt = dt
self.update_matrices()
def update_matrices(self):
# Recalculate the state transition matrix Φ and the input transition matrix B
self.Φ = array([[1, self.dt],
[- (self.k * self.dt) / self.λ, 1 - ((self.b / self.λ) * self.dt)]])
self.B = array([[0],
[self.dt / self.λ]])
class MultivariableSimpleHarmonicOscillator(System):
def __init__(self, λ: ndarray, dt: float, H: ndarray, Q: ndarray, R: ndarray):
self.λ = array(λ)
self.λ = λ # Parameter vector
m = λ[0] # Mass
k = λ[1] # Spring constant
b = λ[2] # Damping coefficient
self.m = λ[0] # Mass
self.k = λ[1] # Spring constant
self.b = λ[2] # Damping coefficient
self.dt = dt # Time step
super().__init__(
# State transition matrix (A)
Φ = array([[1, dt],
[- (k * dt) / m, 1 - ((b / m) * dt)]]),
Φ = array([[1, self.dt],
[- (self.k * self.dt) / self.m, 1 - ((self.b / self.m) * self.dt)]]),
# Input transition matrix (B)
B = array([[0],
[dt/m]]),
[self.dt/self.m]]),
H=H,
Q=Q,
R=R
)
def update_dt(self, dt: float):
self.dt = dt
self.update_matrices()
def update_matrices(self):
# Recalculate the state transition matrix Φ and the input transition matrix B
self.Φ = array([[1, self.dt],
[- (self.k * self.dt) / self.m, 1 - ((self.b / self.m) * self.dt)]])
self.B = array([[0],
[self.dt/self.m]])
def ensure_positive_semidefinite(matrix: ndarray) -> ndarray:
......
......@@ -15,7 +15,8 @@ class SystemSimulator:
self.plant = Plant(self.model, x0, noisy)
def update(self, u: ndarray, x̂_: ndarray = None) -> ndarray:
def update(self, u: ndarray, dt: float, x̂_: ndarray = None) -> ndarray:
self.model.update_dt(dt)
return self.plant.update(u, x̂_)
......
{
"screen_width": 600,
"screen_height": 800,
"background_color": [255, 255, 255],
"mass_color": [255, 0, 0],
"max_time": 500.0,
"max_time": 100.0,
"dt": 0.1,
"H": [[1, 0]],
"Q": 0.001,
"R": 0.000195,
"Q_mmae": 1e-18,
"R_mmae": 1e-18,
"Q_true_system": 1e-18,
"R_true_system": 1e-18,
"amplitude": 5000,
"initial_state": [[0], [0]],
"measurements_output": "output/measurements.txt",
......@@ -28,7 +29,7 @@
"b_variants_end": 1900.0,
"b_variants_step": 100.0,
"weighted_estimation": true,
"plot": false,
"model": "SimpleHarmonicOscillator"
"model": "SimpleHarmonicOscillator",
"random_seed": 42,
"true_system_noisy": true
}
......@@ -20,7 +20,7 @@ class Input:
self.T = T
self.u0 = zeros((self.s.B.shape[-1], 1)) if u0 is None else u0
def step_function(self, steps, change=100, amplitude=50):
def step_function(self, steps, amplitude, change=100):
"""
Generate a step function input signal.
......
......@@ -4,9 +4,9 @@ from inputs import Input
from MMAE.mmae import MMAE
from System.system_simulator import SystemSimulator
from testbench_tools import simulation_configuration_setup
from testbench_tools import mmae_simulator_simulation_and_plot
from testbench_tools import mmae_simulator_plots
class MMAESimulator:
class MMAESimulatorRealData:
def __init__(self, λs, true_λ, dt, H, Q, R, x0, true_system_noisy, estimator_noisy, max_time, max_steps, amplitude):
# Synthetic system simulator initialization
self.TrueSystem = SystemSimulator(true_λ, dt, H, Q, R, x0, true_system_noisy)
......@@ -16,15 +16,30 @@ class MMAESimulator:
# MMAE initialization
self.MMAE = MMAE(λs, dt, H, Q, R, x0, estimator_noisy)
def update(self, t: int) -> float:
def update(self, t: int, z: np.ndarray, dt: float) -> float:
u = self.input_signal[t, :].reshape(-1, 1)
_, z = self.TrueSystem.update(u)
λ_hat, cumulative_posteriors, pdvs = self.MMAE.update(u, z)
λ_hat, cumulative_posteriors, pdvs = self.MMAE.update(u, z, dt)
return λ_hat, cumulative_posteriors, pdvs
def real_data(dataset):
data = np.loadtxt(dataset, delimiter=",", usecols=5, skiprows=1) # skiprows=1 if there is a header
dts = np.loadtxt(dataset, delimiter=",", usecols=2, skiprows=1) # skiprows=1 if there is a header
# Init Initial state
time_track = 0.0
times.append(time_track)
zs.append(np.array([[data[0]]], dtype='float64'))
for step_counter in range(1, max_time):
curr_dt = dts[step_counter]
time_track += curr_dt
times.append(time_track)
z = np.array([[data[step_counter]]], dtype='float64')
zs.append(z)
pass
if __name__ == "__main__":
# Load configuration from JSON file
......@@ -33,6 +48,35 @@ if __name__ == "__main__":
true_λ = [m, k, b]
# MMAE simulator initialization
MMAESimulator = MMAESimulator(λs, true_λ, dt, H, Q, R, x0, True, False, max_time, max_steps, amplitude)
MMAESimulator = MMAESimulatorRealData(λs, true_λ, dt, H, Q, R, x0, True, False, max_time, max_steps, amplitude)
### Run simulation ###
times = []
zs = []
lambda_hats = []
cumulative_posteriors_summary = []
pdvs_summary = []
data = np.loadtxt("./Data/Dataset_3.csv", delimiter=",", usecols=5, skiprows=1) # skiprows=1 if there is a header
dts = np.loadtxt("./Data/Dataset_3.csv", delimiter=",", usecols=2, skiprows=1) # skiprows=1 if there is a header
# Init Initial state
time_track = 0.0
times.append(time_track)
z = np.array([[data[0]]], dtype='float64')
zs.append(z)
MMAESimulator.update(0.0, z, dt)
# Main simulation loop
for step_counter in range(1, max_time):
dt = dts[step_counter]
time_track += dt
times.append(time_track)
z = np.array([[data[step_counter]]], dtype='float64')
zs.append(z)
λ_hat, cumulative_posteriors, pdvs = MMAESimulator.update(step_counter, z, dt)
lambda_hats.append(λ_hat)
cumulative_posteriors_summary.append(cumulative_posteriors)
pdvs_summary.append(pdvs)
mmae_simulator_simulation_and_plot(MMAESimulator, λs, true_λ, max_steps, dt)
mmae_simulator_plots(times, true_λ, λs, lambda_hats, cumulative_posteriors_summary, pdvs_summary)
import numpy as np
from inputs import Input
from MMAE.mmae import MMAE
from System.system_simulator import SystemSimulator
from testbench_tools import simulation_configuration_setup
from testbench_tools import mmae_simulator_plots
class MMAESimulatorSyntheticData:
def __init__(self, λs, true_λ, dt, H, Q_mmae, R_mmae, Q_true_system, R_true_system, x0, true_system_noisy, estimator_noisy, max_time, max_steps, amplitude):
# Synthetic system simulator initialization
self.TrueSystem = SystemSimulator(true_λ, dt, H, Q_true_system, R_true_system, x0, true_system_noisy)
# Input initialization
self.input_signal = Input(self.TrueSystem.model, max_time).step_function(max_steps, amplitude)
# MMAE initialization
self.MMAE = MMAE(λs, dt, H, Q_mmae, R_mmae, x0, estimator_noisy)
def update(self, t: int, dt: float) -> float:
u = self.input_signal[t, :].reshape(-1, 1)
_, z = self.TrueSystem.update(u, dt)
zs.append(z.flatten()) # Appends a 1D array instead of 2D
λ_hat, cumulative_posteriors, pdvs = self.MMAE.update(u, z, dt)
return λ_hat, cumulative_posteriors, pdvs
if __name__ == "__main__":
# Load configuration from JSON file
λs, m, k, b, dt, H, Q_mmae, R_mmae, Q_true_system, R_true_system, x0, max_time, max_steps, amplitude, random_seed, true_system_noisy = simulation_configuration_setup()
# Set random seed
np.random.seed(random_seed)
true_λ = [m, k, b]
# MMAE simulator initialization
MMAESimulator = MMAESimulatorSyntheticData(λs, true_λ, dt, H, Q_mmae, R_mmae, Q_true_system, R_true_system, x0, true_system_noisy, False, max_time, max_steps, amplitude)
# Lists to store time and λ_hat values
times = []
lambda_hats = []
zs = []
cumulative_posteriors_summary = []
pdvs_summary = []
# Main simulation loop
for step_counter in range(0, max_steps):
λ_hat, cumulative_posteriors, pdvs = MMAESimulator.update(step_counter, dt)
times.append(step_counter * dt)
lambda_hats.append(λ_hat)
cumulative_posteriors_summary.append(cumulative_posteriors)
pdvs_summary.append(pdvs)
mmae_simulator_plots(times, true_λ, λs, zs, lambda_hats, cumulative_posteriors_summary, pdvs_summary)
......@@ -2,6 +2,7 @@ import numpy as np
import json
from itertools import product
import matplotlib.pyplot as plt
import pandas as pd
# Load configuration from JSON file
def load_config(config_path):
......@@ -37,14 +38,60 @@ def simulation_configuration_setup():
b = config['true_b']
dt = config["dt"]
H = np.array(config["H"])
Q = np.eye(H.shape[1]) * config["Q"]
R = np.eye(H.shape[0]) * config["R"]
# Q and R for the MMAE estimator
Q_mmae = np.eye(H.shape[1]) * config["Q_mmae"]
R_mmae = np.eye(H.shape[0]) * config["R_mmae"]
# Q and R for the true system
Q_true_system = np.eye(H.shape[1]) * config["Q_true_system"]
R_true_system = np.eye(H.shape[0]) * config["R_true_system"]
x0 = np.array(config["initial_state"])
max_time = config['max_time']
max_steps = int(config['max_time'] / dt)
amplitude = config['amplitude']
random_seed = config['random_seed']
true_system_noisy = config['true_system_noisy']
return λs, m, k, b, dt, H, Q_mmae, R_mmae, Q_true_system, R_true_system, x0, max_time, max_steps, amplitude, random_seed, true_system_noisy
def return_csv(times, data, title):
df = pd.DataFrame(data)
df.insert(0, "Time", times) # Insert times as the first column
df.columns = ["Time"] + [f"Model_{i+1}" for i in range(len(df.columns) - 1)]
df.to_csv(title, index=False)
def plot_csv_data(csv_file: str):
"""
Reads a CSV file and plots each column as a separate line.
Assumes the first column is 'times'.
Parameters:
csv_file (str): Path to the CSV file.
"""
# Load the CSV file
data = pd.read_csv(csv_file)
# Assume the first column is 'times'
times = data.iloc[:, 0]
values = data.iloc[:, 1:] # All other columns are data to plot
# Plot each column as a separate line
plt.figure(figsize=(10, 6))
for col in values.columns:
plt.plot(times, values[col], label=col)
# Add labels, legend, and grid
plt.xlabel('Time')
plt.ylabel('Values')
plt.title('Measurements Over Time')
plt.legend(title='Columns')
plt.grid(True)
plt.tight_layout()
return λs, m, k, b, dt, H, Q, R, x0, max_time, max_steps, amplitude
# Show the plot
plt.show()
def plot_λ_hat(times, lambda_hats, true_λ):
# Convert the list of lambda_hats (which are vectors) to a numpy array for easy indexing
......@@ -97,33 +144,27 @@ def plot_heatmap(models_summary, times, λs, title):
plt.title(title)
plt.show()
def mmae_simulator_simulation_and_plot(simulator, λs, true_λ, max_steps, dt):
# Lists to store time and λ_hat values
times = []
lambda_hats = []
cumulative_posteriors_summary = []
pdvs_summary = []
# Main simulation loop
for step_counter in range(1, max_steps):
λ_hat, cumulative_posteriors, pdvs = simulator.update(step_counter)
times.append(step_counter * dt)
lambda_hats.append(λ_hat)
cumulative_posteriors_summary.append(cumulative_posteriors)
pdvs_summary.append(pdvs)
print(f"Step {step_counter}: λ_hat = {λ_hat}")
def mmae_simulator_plots(times, true_λ, λs, zs, lambda_hats, cumulative_posteriors_summary, pdvs_summary):
# Convert the list of lambda_hat values (vectors) to a numpy array
zs = np.array(zs)
# Convert the list of lambda_hat values (vectors) to a numpy array
lambda_hats = np.array(lambda_hats)
# Convert the list of model probabilities to a numpy array
cumulative_posteriors_summary = np.array(cumulative_posteriors_summary)
pdvs_summary = np.array(pdvs_summary)
# Convert the list of model probabilities to a numpy array
pdvs_summary = np.array(pdvs_summary)
cumulative_posteriors_summary = np.array(cumulative_posteriors_summary)
# Make csv ouputs
return_csv(times, zs, title="./output/measurements.csv")
return_csv(times, lambda_hats, title="./output/lambda_hats.csv")
return_csv(times, cumulative_posteriors_summary, title="./output/cumulative_posteriors_summary.csv")
return_csv(times, pdvs_summary, title="./output/pdvs_summary.csv")
# np.savetxt("pdvs_summary.txt", pdvs_summary)
# np.savetxt("cumulative_posteriors_summary.txt", cumulative_posteriors_summary)
# Pl;ot csv data
# plot_csv_data("./output/measurements.csv")
# Plot the estimated mass over time
plot_λ_hat(times, lambda_hats, true_λ)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment