""" Nonlinear model-predictive control (MPC) of a simulated air heater https://techteach.no/lab/air_heater/ Object-oriented programming (OOP) is used. Process model: dT_dt = (Kh*u(t-td) + (T_amb - T) + d)/tc where: - T [C ]is outlet air temperature - T_amb [C] is ambient (room) temp - u [V] is control signal to the heater - d [K] is a disturbance Features: - Constraint on state (T) - Constraint on control signal (u) - Constraint on rate of change of control signal (du_dt) - Control signal blocking (or grouping) (u) - An Extended Kalman Filter which estimates an input disturbance. Stop time is set with t_stop. Default: 300 s. Toggle between continuous (online) plot and batch (offline) plot with variable cont_plot: cont_plot = 1 = continuous (online) plot. cont_plot = 0 = batch (offline) plot. If continuous plot, execute the so-called magic command %matplotlib auto executed on the IDE command line (the console in Spyder) will show plots in an external window. Author: Finn Aakre Haugen, USN finn.haugen@usn.no Updated 2024 12 19. """ #%% Imports import matplotlib.pyplot as plt import numpy as np from scipy.optimize import minimize import time #%% Air_heater class class Air_heater: def __init__(self, Kh, tc, td, u_delayed_init, T_init, T_min, T_max, ts): self.u_delayed_init = u_delayed_init self.T = T_init N_delay = int(round(td/ts)) + 1 self.u_delay_array = np.zeros(N_delay) + self.u_delayed_init def sim_temp(self, u, T_amb, d): self.T = np.clip(self.T, T_min, T_max) T_return = self.T u_delayed = self.u_delay_array[-1] # Taking out delayed value self.u_delay_array[1:] = self.u_delay_array[0:-1] # Shifting array elements self.u_delay_array[0] = u # Inserting non-delayed value dT_dt = (Kh*u_delayed + (T_amb - self.T) + d)/tc T_next = self.T + ts * dT_dt self.T = T_next return T_return # %% Kalman filter class Kalman_filter: # Class constructor: def __init__(self, n_states, T_pred, d_pred, P_pred, G, Q, R, Kh, tc, td, u_delayed_init, T_init, ts): self.T_pred = T_pred self.d_pred = d_pred self.P_pred = P_pred self.T = T_init N_delay = int(round(td/ts)) + 1 self.u_delay_array = np.zeros(N_delay) + u_delayed_init # Class method to estimate states: def state_estim(self, T_meas, u, T_amb): # The A matrix in the linearized model: A_cont = np.array([[-1/tc, 1/tc], [0, 0]]) A_disc = np.eye(n_states) + ts*A_cont A = A_disc # The C matrix in the linearized model: C = np.array([[1, 0]]) # Kalman gain: K = (self.P_pred@C.T) @ (np.linalg.inv(C@self.P_pred@(C.T)+R)) # Innovation process: e_innov = T_meas - self.T_pred # Meas-corrected estimates, used as applied estim: self.T_corr = self.T_pred + K[0,0] * e_innov self.d_corr = self.d_pred + K[1,0] * e_innov # Model-predicted estimates: u_delayed = self.u_delay_array[-1] # Taking out delayed value self.u_delay_array[1:] = self.u_delay_array[0:-1] # Shifting array elements self.u_delay_array[0] = u # Inserting non-delayed value dT_corr_dt = (Kh*u_delayed + (T_amb - self.T_corr) + self.d_corr)/tc dd_corr_dt = 0 self.T_pred = self.T_corr + ts * dT_corr_dt self.d_pred = self.d_corr + ts * dd_corr_dt # Auto-covariance of error of meas-corrected estimate: self.P_corr = (np.eye(n_states) - K @ C) @ self.P_pred # Auto-covariance of error of model-predicted estimate: self.P_pred = A @ self.P_corr @ (A.T) + G @ Q @ G.T return (self.T_corr, self.d_corr) #%% Definition of functions def fun_mpc_objective_air_heater(u): u_array = np.array([]) for ku in range(0, N_blocks_u): u_array = np.append(u_array, np.zeros(N_samples_in_blocks)*0 + u[ku]) u_delay_init = u[0] T_init = T_est Air_heater_fun_obj = Air_heater(Kh, tc, td, u_delay_init, T_init, T_min, T_max, ts) # Simulation loop: u_km1 = u[0] J = 0 for k in range(0, N_pred): u = u_array[k] r = r_to_mpc_array[k] T = Air_heater_fun_obj.sim_temp(u, T_amb, d_est) # Updating objective function: e = r - T du_dt = (u - u_km1)/ts J += (C_e*e**2 + C_du*du_dt**2) * ts # Time shift: u_km1 = u return J def fun_mpc_constraints_air_heater(u): # The return values are required >= 0 (geq 0) u_array = np.array([]) for ku in range(0, N_blocks_u): u_array = np.append(u_array, np.zeros(N_samples_in_blocks)*0 + u[ku]) u_delay_init = u[0] T_init = T_est Air_heater_fun_constraints = Air_heater(Kh, tc, td, u_delay_init, T_init, T_min, T_max, ts) # Simulation loop: for k in range(0, N_pred): u = u_array[k] T = Air_heater_fun_constraints.sim_temp(u, T_amb, d_est) # Rate of change of u: du_dt = (u - u_opt_km1)/ts # Storing in array T_fun_constraint_array[k] = T du_dt_fun_constraint_array[k] = du_dt geq_zero_equiv_to_T_upperbound_constr = \ (T_upperbound - np.max(T_fun_constraint_array)) geq_zero_equiv_to_T_lowerbound_constr = \ (np.min(T_fun_constraint_array) - T_lowerbound) geq_zero_equiv_to_du_dt_upperbound_constr = \ (du_dt_upperbound - np.max(du_dt_fun_constraint_array)) geq_zero_equiv_to_du_dt_lowerbound_constr = \ (np.min(du_dt_fun_constraint_array) - du_dt_lowerbound) return np.array([geq_zero_equiv_to_T_upperbound_constr, geq_zero_equiv_to_T_lowerbound_constr, geq_zero_equiv_to_du_dt_upperbound_constr, geq_zero_equiv_to_du_dt_lowerbound_constr]) #%% Process params Kh = 3.5 # [deg C/V] tc = 23.0 # [s] td = 3.0 # [s] T_amb = 20.0 # [deg C] T_min = 0.0 # [deg C] T_max = 100.0 # [deg C] #%% Measurement noise setting An = 0.1 # [K] Amplitude of uniformly distributed measurement noise #%% Time settings ts = 0.5 # Time-step [s] t_pred_horizon = 8.0 # [s] N_pred = int(t_pred_horizon/ts) t_start = 0.0 # [s] t_stop = 400.0 # [s] N_sim = int((t_stop-t_start)/ts) + 1 #%% MPC costs C_e = 1.0 C_du = 20.0 # [(L/s)/min] #%% MPC control blocking (or grouping) # Number of blocks of control signal: N_blocks_u = 3 # Number of samples in each control block: N_samples_in_blocks = int(np.ceil(N_pred/N_blocks_u)) #%% Defining sequence of temperature reference r_const = 30.0 # [oC] step_r = 3.0 # [K] slope_r = -0.04 # [K/s] ampl_sine_r = 1.0 # [K] tp_sine_r = 50.0 # [s] t_const_start = t_start t_const_stop = 100 t_step_start = t_const_stop t_step_stop = 150 t_ramp_start = t_step_stop t_ramp_stop = 200 t_sine_start = t_ramp_stop t_sine_stop = 250 t_const2_start = t_sine_stop t_const2_stop = t_stop t = np.linspace(t_start, t_stop, N_sim) r_array = np.linspace(t_start, t_stop, N_sim)*0 for k in range(0, N_sim): if t[k] >= t_const_start and t[k] < t_const_stop: r_array[k] = r_const if t[k] >= t_step_start and t[k] < t_step_stop: r_array[k] = r_const + step_r if t[k] >= t_ramp_start and t[k] < t_ramp_stop: r_array[k] = (r_const + step_r + slope_r*(t[k]-t_ramp_start)) if t[k] >= t_sine_start and t[k] < t_sine_stop: r_array[k] = (r_const + 1 + ampl_sine_r*np.sin(2*np.pi*(1/tp_sine_r) *(t[k]-t_sine_start))) if t[k] >= t_const2_start: r_array[k] = r_const + 1 #%% Change of disturbance t_disturbance = 300 # [s] #%% Initialization of time delay u_init = 0.0 # [V] N_delay = int(round(td/ts)) + 1 delay_array = np.zeros(N_delay) + u_init #%% Initial state of process T = T_init = 27.0 # [C] #%% Initialization of Kalman Filter # Initial state: T_pred = 27.0 # [C] d_pred = 0.0 # [K] # Initial covariance of pred estim error: P_pred = np.diag([1.0e-3, 1.0e-3]) #%% Tuning of Kalman Filter n_states = 2 # Number of states # Disturbance matrix: G = np.eye(n_states) stdev_w_T = 0.02 cov_w_T = stdev_w_T**2 stdev_w_d = 0.1 cov_w_d = stdev_w_d**2 Q = np.diag([1*cov_w_T, 1*cov_w_d]) stdev_v_T = 0.1 cov_v_T = stdev_v_T**2 R = np.diag([1*cov_v_T]) #%% Initial value of optimal control signal u_opt_km1 = u_opt = u_init #%% Initial value of time delayed control signal u_delayed = u_delayed_init = 2 #%% Defining arrays for plotting t_plot_array = np.linspace(t_start, t_stop, N_sim-N_pred) r_plot_array = np.zeros(N_sim-N_pred) + r_const T_plot_array = np.zeros(N_sim-N_pred) + r_const T_est_plot_array = np.zeros(N_sim-N_pred) + r_const T_amb_plot_array = np.zeros(N_sim-N_pred) + T_amb u_plot_array = np.zeros(N_sim-N_pred) du_opt_dt_plot_array = np.zeros(N_sim-N_pred) d_est_plot_array = np.zeros(N_sim-N_pred) d_plot_array = np.zeros(N_sim-N_pred) #%% Settings of optimizer # Initial guessed optimal control sequence: u_guess = np.zeros(N_blocks_u) + u_init # Lower and upper limits of optim variable for use in SLSQP: u_upperbound = 5 u_lowerbound = 0 bounds_u = np.zeros([N_blocks_u, 2]) bounds_u[:, 0] = u_lowerbound bounds_u[:, 1] = u_upperbound # Constraints: T_upperbound = 32.0 # [C] T_lowerbound = 28.0 # [C] T_fun_constraint_array = np.zeros(N_pred) du_dt_upperbound = 0.25 #1 # 0.25 # [V/s] du_dt_lowerbound = -0.25 #-1 # -0.25 # [V/s] du_dt_fun_constraint_array = np.zeros(N_pred) #%% Selecting between continuous plot and batch plotting # 0 = off = batch (offline) plot. # 1 = on = continuous (online) plot. May use external # figure window by executing %matplotlib auto in console. cont_plot = 0 real_time_step = 0.01 #%% Preparing for plotting plt.close("all") fig1 = plt.figure(num=1, figsize=(12, 9)) legend_shadow = True location = 'upper right' font_size = 8 handle_length = 1 # %% Class instantiation: Air_heater_sim = Air_heater(Kh, tc, td, u_delayed_init, T_init, T_min, T_max, ts) my_Kalman_filter = Kalman_filter(n_states, T_pred, d_pred, P_pred, G, Q, R, Kh, tc, td, u_delayed_init, T_init, ts) #%% For-loop for MPC of simulated process incl Kalman Filter tic = time.time() # Starting timer of simulation execution for k in range(0, (N_sim-N_pred)): t_k = t[k] # Storage for plotting t_plot_array[k] = t_k print('t = ', t_k) # Meas noise: num_samples = 1 meas_noise = np.random.uniform(-An, An, num_samples)[0] T_meas = T + meas_noise # Storage for plotting T_plot_array[k] = T_meas # Kalman filter estimates: (T_est, d_est) = my_Kalman_filter.state_estim(T_meas, u_opt, T_amb) # Storage for plotting: T_est_plot_array[k] = T_est d_est_plot_array[k] = d_est # Future reference of MPC: r_to_mpc_array = r_array[k:k+N_pred] r_plot_array[k] = r_array[k] # Calculating optimal control sequence: ineq_cons = {'type':'ineq', 'fun':fun_mpc_constraints_air_heater} # >= 0 constr res = minimize(fun_mpc_objective_air_heater, u_guess, method='SLSQP', constraints=[ineq_cons], options={'ftol': 1e-9, 'disp': False}, bounds=bounds_u) toc = time.time() u_opt = res.x # Optim solution used as guessed sol. in next iteration: u_guess = u_opt # Optimal control signal (sample) to be applied: u_opt = u_opt[0] u_plot_array[k] = u_opt # Storage for plotting # Rate of change of u: du_opt_dt = (u_opt - u_opt_km1)/ts du_opt_dt_plot_array[k] = du_opt_dt # Time index shift: u_opt_km1 = u_opt # Applying optimal control signal to simulated process: # Process disturbance: if t[k] <= t_disturbance: d = -1 else: d = -10 d_plot_array[k] = d T = Air_heater_sim.sim_temp(u_opt, T_amb, d) #%% Continuous plotting if cont_plot == 1: plt.subplot(2, 2, 1) plt.grid(which='both', color='grey') plt.ylim(25, 35) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[deg C]') plt.legend(('r', 'T_meas', 'T_amb', 'T_bounds'), loc=location) plt.subplot(2, 2, 2) plt.grid(which='both', color='grey') plt.ylim(-1, 6) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[V]') plt.legend(('u', 'u bounds'), loc=location) plt.subplot(2, 2, 3) plt.grid(which='both', color='grey') plt.ylim(-12, 1) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[K]') plt.legend(('d_est', 'd'), loc=location) plt.subplot(2, 2, 4) plt.grid(which='both', color='grey') plt.ylim(-0.5, 0.5) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[K/s]') plt.legend(('du_dt', 'du_dt_bounds'), loc=location) if k > 0 and k < N_sim: plt.subplot(2, 2, 1) plt.plot([t_plot_array[k],t_plot_array[k-1]], [r_plot_array[k],r_plot_array[k-1]], color='red') plt.plot([t_plot_array[k],t_plot_array[k-1]], [T_plot_array[k],T_plot_array[k-1]], color='blue') plt.plot([t_plot_array[k],t_plot_array[k-1]], [T_amb_plot_array[k],T_amb_plot_array[k-1]], color='cyan') plt.plot([t_plot_array[k],t_plot_array[k-1]], [T_upperbound,T_upperbound], color='magenta') plt.plot([t_plot_array[k],t_plot_array[k-1]], [T_lowerbound,T_lowerbound], color='magenta') plt.subplot(2, 2, 2) plt.plot([t_plot_array[k],t_plot_array[k-1]], [u_plot_array[k],u_plot_array[k-1]], color='blue') plt.plot([t_plot_array[k],t_plot_array[k-1]], [u_upperbound,u_upperbound], color='magenta') plt.plot([t_plot_array[k],t_plot_array[k-1]], [u_lowerbound,u_lowerbound], color='magenta') plt.subplot(2, 2, 3) plt.plot([t_plot_array[k],t_plot_array[k-1]], [d_est_plot_array[k],d_est_plot_array[k-1]], color='blue') plt.plot([t_plot_array[k],t_plot_array[k-1]], [d_plot_array[k],d_plot_array[k-1]], color='green') plt.subplot(2, 2, 4) plt.plot([t_plot_array[k],t_plot_array[k-1]], [du_opt_dt_plot_array[k],du_opt_dt_plot_array[k-1]], color='blue') plt.plot([t_plot_array[k],t_plot_array[k-1]], [du_dt_upperbound,du_dt_upperbound], color='magenta') plt.plot([t_plot_array[k],t_plot_array[k-1]], [du_dt_lowerbound,du_dt_lowerbound], color='magenta') fig1.canvas.draw() fig1.canvas.flush_events() time.sleep(real_time_step) #%% Simulation execution time: toc = time.time() # Stopping timer for simulation execution t_elapsed = toc - tic print('Elapsed time total [s] = {:.2e}'.format(t_elapsed)) print('Elapsed time per time step [s] = {:.2e}'.format(t_elapsed/N_sim)) print('------------------------------------------------') #%% Batch plotting: if cont_plot == 0: plt.subplot(2, 2, 1) plt.grid(which='both', color='grey') plt.ylim(25, 35) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[deg C]') plt.plot(t_plot_array, r_plot_array, color='red', label='r') plt.plot(t_plot_array, T_plot_array, color='blue', label='T_meas') plt.plot(t_plot_array, T_amb_plot_array, 'c', label='T_amb') plt.plot(t_plot_array, T_plot_array*0 + T_upperbound, color='magenta', label='T_bounds') plt.plot(t_plot_array, T_plot_array*0 + T_lowerbound, color='magenta') plt.legend(loc=location) plt.subplot(2, 2, 2) plt.grid(which='both', color='grey') plt.ylim(-1, 6) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[V]') plt.plot(t_plot_array, u_plot_array, color='blue', label='u') plt.plot(t_plot_array, u_plot_array*0 + u_upperbound, color='magenta', label='u_bounds') plt.plot(t_plot_array, u_plot_array*0 + u_lowerbound, color='magenta') plt.legend(loc=location) plt.subplot(2, 2, 3) plt.grid(which='both', color='grey') plt.ylim(-12, 1) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[K]') plt.plot(t_plot_array, d_est_plot_array, color='blue',label='d_est') plt.plot(t_plot_array, d_plot_array, color='green') plt.legend(loc=location) plt.subplot(2, 2, 4) plt.grid(which='both', color='grey') plt.ylim(-0.5, 0.5) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[K/s]') plt.plot(t_plot_array, du_opt_dt_plot_array, color='blue', label='du_dt') plt.plot(t_plot_array, du_opt_dt_plot_array*0 + du_dt_upperbound, color='magenta', label='du_dt_bounds') plt.plot(t_plot_array, du_opt_dt_plot_array*0 + du_dt_lowerbound, color='magenta') plt.legend(loc=location) # plt.savefig('mpc_air_heater_no_noise.pdf') # plt.savefig('mpc_air_heater.pdf') plt.show()