Skip to content
Snippets Groups Projects
Commit 29c6b781 authored by Tilboon Elberier's avatar Tilboon Elberier
Browse files

multivariable estimator preliminary implementation and tests

parent 18d6313b
Branches
No related merge requests found
......@@ -10,9 +10,9 @@ from System.system_simulator import SystemSimulator
from .estimator_update import EstimatorUpdate
class Estimator:
def __init__(self, λ, k, b, dt, H, Q, R, x0, noisy):
def __init__(self, λ, dt, H, Q, R, x0, noisy):
# Dynamic System Simulator initialization for state estimator
self.SystemSimulator = SystemSimulator(λ, k, b, dt, H, Q, R, x0, noisy)
self.SystemSimulator = SystemSimulator(λ, dt, H, Q, R, x0, noisy)
# state Estimator update initialization
self.EstimatorUpdate = EstimatorUpdate(self.SystemSimulator.model, eye(self.SystemSimulator.model.Φ.shape[1]), x0)
......
......@@ -10,9 +10,9 @@ from .PDV.pdv import PDV
from System.system_simulator import SystemSimulator
class EstimatorLikelihood:
def __init__(self, λ, k, b, dt, H, Q, R, x0, noisy):
def __init__(self, λ, dt, H, Q, R, x0, noisy):
# State estimator initialization
self.Estimator = Estimator(λ, k, b, dt, H, Q, R, x0, noisy)
self.Estimator = Estimator(λ, dt, H, Q, R, x0, noisy)
# Compute scalar likelihood initialization
self.PDV = PDV()
......
......@@ -6,7 +6,7 @@ from System.system_simulator import SystemSimulator
from ..Estimator_Likelihood.estimator_likelihood import EstimatorLikelihood
class ConditionalProbabilityUpdate:
def __init__(self, λs, threshold=1e-4):
def __init__(self, λs, threshold=0):
# Estimator likelihood simulators initialization
self.λs = λs
......
......@@ -7,22 +7,28 @@ from ..Estimator_Likelihood.estimator_likelihood import EstimatorLikelihood
from .conditional_probability_update import ConditionalProbabilityUpdate
class WeightedEstimate:
def __init__(self, λs):
self.λs = λs
def __init__(self, λs: ndarray):
"""
:param λs: An array where each element is a vector [m, k, b] representing model parameters.
"""
self.λs = λs # List of parameter vectors for each model (each λ = [m, k, b])
def update(self, cumulative_posteriors: ndarray) -> float:
def update(self, cumulative_posteriors: ndarray) -> ndarray:
"""
Calculate the most likely model and the weighted mass estimate.
Calculate the most likely model and the weighted estimates for each parameter (m, k, b).
:param cumulative_posteriors: A 1D array of posterior probabilities for each model.
:return: A vector of weighted parameter estimates [weighted_m, weighted_k, weighted_b].
"""
# Determine the most likely model after updating probabilities
most_likely_model_index = np.argmax(cumulative_posteriors)
most_likely_parameter = self.λs[most_likely_model_index]
# Calculate the weighted parameter estimate
weighted_mass_estimate = np.sum([p * λ for p, λ in zip(cumulative_posteriors, self.λs)])
# Initialize a vector for the weighted estimate [m, k, b]
weighted_estimates = [0.0, 0.0, 0.0]
# Calculate the weighted estimates for each parameter in the vector [m, k, b]
for i in range(len(weighted_estimates)):
weighted_estimates[i] = np.sum([p * λ[i] for p, λ in zip(cumulative_posteriors, self.λs)])
return weighted_mass_estimate
return weighted_estimates
########### Testbench ###########
......
......@@ -7,9 +7,9 @@ from MMAE.Joint_Probability.joint_probability import JointProbability
from System.system_simulator import SystemSimulator
class MMAE:
def __init__(self, λs, k, b, dt, H, Q, R, x0, noisy):
def __init__(self, λs, dt, H, Q, R, x0, noisy):
# Estimator likelihood simulator initialization
self.EstimatorLikelihoods = [EstimatorLikelihood(λ, k, b, dt, H, Q, R, x0, noisy) for λ in λs]
self.EstimatorLikelihoods = [EstimatorLikelihood(λ, dt, H, Q, R, x0, noisy) for λ in λs]
# Joint probability simulator initialization
self.JointProbability = JointProbability(λs)
......
......@@ -25,7 +25,30 @@ class SimpleHarmonicOscillator(System):
Q=Q,
R=R
)
# Subclass SHM (Unkonwn M, unknown B, etc)
class MultivariableSimpleHarmonicOscillator(System):
def __init__(self, λ: ndarray, dt: float, H: ndarray, Q: ndarray, R: ndarray):
self.λ = array(λ)
self.λ = λ # Parameter vector
m = λ[0] # Mass
k = λ[1] # Spring constant
b = λ[2] # Damping coefficient
self.dt = dt # Time step
super().__init__(
# State transition matrix (A)
Φ = array([[1, dt],
[- (k * dt) / m, 1 - ((b / m) * dt)]]),
# Input transition matrix (B)
B = array([[0],
[dt/m]]),
H=H,
Q=Q,
R=R
)
def ensure_positive_semidefinite(matrix: ndarray) -> ndarray:
......
......@@ -3,13 +3,13 @@ from numpy import ndarray
import matplotlib.pyplot as plt
import os
from .Model.models import SimpleHarmonicOscillator
from .Model.models import MultivariableSimpleHarmonicOscillator
from .plant import Plant
class SystemSimulator:
def __init__(self, λ, k, b, dt, H, Q, R, x0, noisy):
def __init__(self, λ, dt, H, Q, R, x0, noisy):
# Model initialization
self.model = SimpleHarmonicOscillator(λ, k, b, dt, H, Q, R)
self.model = MultivariableSimpleHarmonicOscillator(λ, dt, H, Q, R)
# Plant initialization
self.plant = Plant(self.model, x0, noisy)
......
......@@ -3,21 +3,31 @@
"screen_height": 800,
"background_color": [255, 255, 255],
"mass_color": [255, 0, 0],
"max_time": 1000.0,
"max_time": 500.0,
"dt": 0.1,
"H": [[1, 0]],
"Q": 0.001,
"R": 0.000195,
"amplitude": 5000,
"k": 17000.0,
"b": 1900.0,
"initial_state": [[0], [0]],
"measurements_output": "output/measurements.txt",
"input_signal_output": "output/input_signal.txt",
"true_mass": 2500,
"model_variants_start": 2300,
"model_variants_end": 2900,
"model_variants_step": 300,
"true_m": 2500.0,
"m_variants_start": 2400.0,
"m_variants_end": 2600.0,
"m_variants_step": 100.0,
"true_k": 16000.0,
"k_variants_start": 15000.0,
"k_variants_end": 17000.0,
"k_variants_step": 1000.0,
"true_b": 1800.0,
"b_variants_start": 1700.0,
"b_variants_end": 1900.0,
"b_variants_step": 100.0,
"weighted_estimation": true,
"plot": false,
"model": "SimpleHarmonicOscillator"
......
......@@ -7,15 +7,15 @@ from testbench_tools import simulation_configuration_setup
from testbench_tools import mmae_simulator_simulation_and_plot
class MMAESimulator:
def __init__(self, λ, λs, k, b, dt, H, Q, R, x0, true_system_noisy, estimator_noisy, max_time, max_steps, amplitude):
def __init__(self, λs, true_λ, dt, H, Q, R, x0, true_system_noisy, estimator_noisy, max_time, max_steps, amplitude):
# Synthetic system simulator initialization
self.TrueSystem = SystemSimulator(λ, k, b, dt, H, Q, R, x0, true_system_noisy)
self.TrueSystem = SystemSimulator(true_λ, dt, H, Q, R, x0, true_system_noisy)
# Input initialization
self.input_signal = Input(self.TrueSystem.model, max_time).step_function(max_steps, amplitude)
# MMAE initialization
self.MMAE = MMAE(λs, k, b, dt, H, Q, R, x0, estimator_noisy)
self.MMAE = MMAE(λs, dt, H, Q, R, x0, estimator_noisy)
def update(self, t: int) -> float:
......@@ -28,9 +28,11 @@ class MMAESimulator:
if __name__ == "__main__":
# Load configuration from JSON file
λs, λ, k, b, dt, H, Q, R, x0, max_time, max_steps, amplitude = simulation_configuration_setup()
λs, m, k, b, dt, H, Q, R, x0, max_time, max_steps, amplitude = simulation_configuration_setup()
true_λ = [m, k, b]
# MMAE simulator initialization
MMAESimulator = MMAESimulator(λ, λs, k, b, dt, H, Q, R, x0, True, False, max_time, max_steps, amplitude)
MMAESimulator = MMAESimulator(λs, true_λ, dt, H, Q, R, x0, True, False, max_time, max_steps, amplitude)
mmae_simulator_simulation_and_plot(MMAESimulator, λ, λs, max_steps, dt)
mmae_simulator_simulation_and_plot(MMAESimulator, λs, true_λ, max_steps, dt)
import numpy as np
import json
from itertools import product
import matplotlib.pyplot as plt
# Load configuration from JSON file
......@@ -11,16 +12,29 @@ def simulation_configuration_setup():
config_path = "config.json"
config = load_config(config_path)
start = config["model_variants_start"]
end = config["model_variants_end"]
step = config["model_variants_step"]
m_start = config["m_variants_start"]
m_end = config["m_variants_end"]
m_step = config["m_variants_step"]
k_start = config["k_variants_start"]
k_end = config["k_variants_end"]
k_step = config["k_variants_step"]
b_start = config["b_variants_start"]
b_end = config["b_variants_end"]
b_step = config["b_variants_step"]
# Generate model variants
λs = np.arange(start, end + step, step).tolist()
ms = np.arange(m_start, m_end + m_step, m_step).tolist()
ks = np.arange(k_start, k_end + k_step, k_step).tolist()
bs = np.arange(b_start, b_end + b_step, b_step).tolist()
λ = config['true_mass']
k = config['k']
b = config['b']
# Generate all possible combinations of m, k, and b
λs = [np.array(λ) for λ in product(ms, ks, bs)]
m = config['true_m']
k = config['true_k']
b = config['true_b']
dt = config["dt"]
H = np.array(config["H"])
Q = np.eye(H.shape[1]) * config["Q"]
......@@ -30,25 +44,49 @@ def simulation_configuration_setup():
max_steps = int(config['max_time'] / dt)
amplitude = config['amplitude']
return λs, λ, k, b, dt, H, Q, R, x0, max_time, max_steps, amplitude
def plot_λ_hat(times, lambda_hats, λ):
# Plot λ_hat vs time
plt.figure()
plt.plot(times, lambda_hats, label='λ_hat', color='blue')
# Plot true mass as a red dotted line and label it "True mass"
plt.axhline(y=λ, color='red', linestyle='--', label='True mass')
# Add labels and title
plt.xlabel('Time (s)')
plt.ylabel('λ_hat')
plt.title('Parameter Estimate (λ_hat) vs Time')
plt.legend()
return λs, m, k, b, dt, H, Q, R, x0, max_time, max_steps, amplitude
def plot_λ_hat(times, lambda_hats, true_λ):
# Convert the list of lambda_hats (which are vectors) to a numpy array for easy indexing
lambda_hats = np.array(lambda_hats)
fig, ax1 = plt.subplots()
# Plot estimated mass (m) on the primary y-axis
ax1.set_xlabel('Time (s)')
ax1.set_ylabel('Mass (m)', color='blue')
ax1.plot(times, lambda_hats[:, 0], label='Estimated mass (m)', color='blue', linestyle='-')
ax1.axhline(y=true_λ[0], color='blue', linestyle='--', label='True mass (m)')
ax1.tick_params(axis='y', labelcolor='blue')
# Create a secondary y-axis for spring constant (k)
ax2 = ax1.twinx()
ax2.set_ylabel('Spring constant (k)', color='green')
ax2.plot(times, lambda_hats[:, 1], label='Estimated spring constant (k)', color='green', linestyle='-')
ax2.axhline(y=true_λ[1], color='green', linestyle='--', label='True spring constant (k)')
ax2.tick_params(axis='y', labelcolor='green')
# Create another secondary y-axis for damping coefficient (b) (offset from the right)
ax3 = ax1.twinx()
ax3.spines['right'].set_position(('outward', 60)) # Offset third axis to the right
ax3.set_ylabel('Damping coefficient (b)', color='red')
ax3.plot(times, lambda_hats[:, 2], label='Estimated damping coefficient (b)', color='red', linestyle='-')
ax3.axhline(y=true_λ[2], color='red', linestyle='--', label='True damping coefficient (b)')
ax3.tick_params(axis='y', labelcolor='red')
# Add legends for each axis
ax1.legend(loc='upper left')
ax2.legend(loc='upper right')
ax3.legend(loc='lower right')
plt.title('Parameter Estimates (m, k, b) vs Time')
fig.tight_layout() # To avoid overlap of labels
plt.grid(True)
plt.show()
def plot_heatmap(models_summary, times, λs, title):
# Convert to string for label on plot
λs = [f"m: {round(λ[0], 2)}, k: {round(λ[1], 2)}, b: {round(λ[2], 2)}" for λ in λs]
plt.figure(figsize=(10, 6))
plt.imshow(models_summary.T, aspect='auto', cmap='hot', origin='lower',
extent=[times[0], times[-1], λs[0], λs[-1]])
......@@ -59,7 +97,7 @@ def plot_heatmap(models_summary, times, λs, title):
plt.title(title)
plt.show()
def mmae_simulator_simulation_and_plot(simulator, λ, λs, max_steps, dt):
def mmae_simulator_simulation_and_plot(simulator, λs, true_λ, max_steps, dt):
# Lists to store time and λ_hat values
times = []
lambda_hats = []
......@@ -75,6 +113,9 @@ def mmae_simulator_simulation_and_plot(simulator, λ, λs, max_steps, dt):
pdvs_summary.append(pdvs)
print(f"Step {step_counter}: λ_hat = {λ_hat}")
# Convert the list of lambda_hat values (vectors) to a numpy array
lambda_hats = np.array(lambda_hats)
# Convert the list of model probabilities to a numpy array
cumulative_posteriors_summary = np.array(cumulative_posteriors_summary)
......@@ -85,7 +126,7 @@ def mmae_simulator_simulation_and_plot(simulator, λ, λs, max_steps, dt):
# np.savetxt("cumulative_posteriors_summary.txt", cumulative_posteriors_summary)
# Plot the estimated mass over time
plot_λ_hat(times, lambda_hats, λ)
plot_λ_hat(times, lambda_hats, true_λ)
# Plot the heatmap for likelihoods (PDVs) over time
plot_heatmap(pdvs_summary, times, λs, title="Heatmap of Model Likelihood Over Time")
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment