""" Nonlinear model-predictive control (MPC) of a simulated air heater https://techteach.no/lab/air_heater/ Process model: dT_dt = (Kh*u(t-td) + (T_amb - T) + d)/tc where: - T [C ]is outlet air temperature - T_amb [C] is ambient (room) temp - u [V] is control signal to the heater - d [K] is a disturbance Features: - Constraint on state (T) - Constraint on control signal (u) - Constraint on rate of change of control signal (du_dt) - Control signal blocking (or grouping) (u) - An Extended Kalman Filter which estimates an input disturbance. Stop time is set with t_stop. Default: 300 s. Toggle between continuous (online) plot and batch (offline) plot with variable cont_plot: cont_plot = 1 = continuous (online) plot. cont_plot = 0 = batch (offline) plot. If continuous plot, execute the so-called magic command %matplotlib auto executed on the IDE command line (the console in Spyder) will show plots in an external window. Author: Finn Aakre Haugen, USN finn.haugen@usn.no Updated 2024 04 28. """ #%% Imports import matplotlib.pyplot as plt import numpy as np from scipy.optimize import minimize import time #%% Definition of functions def fun_mpc_objective_air_heater(u): u_array = np.array([]) T_k = T_est_k d_k = d_est_k for ku in range(0, N_blocks_u): u_array = np.append(u_array, np.zeros(N_samples_in_blocks)*0 + u[ku]) u_km1 = u[0] J_km1 = 0 delay_array = np.zeros(N_delay) + u[0] # Simulation loop: for k in range(0, N_pred): u_k = u_array[k] r_k = r_to_mpc_array[k] # Time delay: u_delayed_k = delay_array[-1] delay_array[1:] = delay_array[0:-1] delay_array[0] = u_k # Solving diff eq: dT_dt_k = (Kh*u_delayed_k + (T_amb - T_k) + d_k)/tc T_kp1 = T_k + ts*dT_dt_k # Updating objective function: e_k = r_k - T_k du_dt_k = (u_k - u_km1)/ts J_k = J_km1 + ts*(C_e*e_k**2 + C_du*du_dt_k**2) # Time shift: T_k = T_kp1 u_km1 = u_k J_km1 = J_k return J_k def fun_mpc_constraints_air_heater(u): # The return values are required >= 0 (geq 0) u_array = np.array([]) T_k = T_est_k d_k = d_est_k for ku in range(0, N_blocks_u): u_array = np.append(u_array, np.zeros(N_samples_in_blocks)*0 + u[ku]) delay_array = np.zeros(N_delay) + u[0] # Simulation loop: for k in range(0, N_pred): u_k = u_array[k] # Time delay: u_delayed_k = delay_array[-1] delay_array[1:] = delay_array[0:-1] delay_array[0] = u_k # Solving diff eq: dT_dt_k = (Kh*u_delayed_k + (T_amb - T_k) + d_k)/tc T_kp1 = T_k + ts*dT_dt_k # Rate of change of u: du_dt_k = (u_k - u_opt_km1)/ts # Storing in array T_fun_constraint_array[k] = T_k du_dt_fun_constraint_array[k] = du_dt_k # Time shift: T_k = T_kp1 geq_zero_equiv_to_T_upperbound_constr = \ (T_upperbound - np.max(T_fun_constraint_array)) geq_zero_equiv_to_T_lowerbound_constr = \ (np.min(T_fun_constraint_array) - T_lowerbound) geq_zero_equiv_to_du_dt_upperbound_constr = \ (du_dt_upperbound - np.max(du_dt_fun_constraint_array)) geq_zero_equiv_to_du_dt_lowerbound_constr = \ (np.min(du_dt_fun_constraint_array) - du_dt_lowerbound) return np.array([geq_zero_equiv_to_T_upperbound_constr, geq_zero_equiv_to_T_lowerbound_constr, geq_zero_equiv_to_du_dt_upperbound_constr, geq_zero_equiv_to_du_dt_lowerbound_constr]) def fun_kalman_filter_air_heater(kalman_filter_inputs): # Kalman Filter for estimating T and d using meas of T. # These estimates are used by the MPC. # Note: The time-delayed u is used as control signal here. (T_meas_k, u_delayed_k, T_est_pred_k, d_est_pred_k, P_pred_k) \ = kalman_filter_inputs # Matrices in linearized model: n_states = 2 A_cont = np.array([[-1/tc, 1/tc], [0, 0]]) C_cont = np.array([[1, 0]]) A_disc = np.eye(n_states) + ts*A_cont C_disc = C_cont # Kalman Kh: K_k = ((P_pred_k @ (C_disc.T)) @ (np.linalg.inv(C_disc @ P_pred_k @ (C_disc.T) + R))) e_est_k = T_meas_k - T_est_pred_k # Measurement-based correction of estimate = the applied estimate: T_est_corr_k = T_est_pred_k + K_k[0, 0]*e_est_k d_est_corr_k = d_est_pred_k + K_k[1, 0]*e_est_k # Prediction of state estimate for next time-step: # Diff eqs: dT_est_pred_dt_k = (Kh*u_delayed_k + (T_amb - T_est_corr_k) + d_est_corr_k)/tc dd_est_pred_dt_k = 0 # Euler forward prediction (integration): T_est_pred_kp1 = T_est_corr_k + ts*dT_est_pred_dt_k d_est_pred_kp1 = d_est_corr_k + ts*dd_est_pred_dt_k # Auto-covariance of error of corrected estimate: P_corr_k = (np.eye(n_states) - K_k @ C_disc) @ P_pred_k # Auto-covariance of error of predicted estimate of next time step: P_pred_kp1 = A_disc @ P_corr_k @ (A_disc.T) + Q T_est_k = T_est_corr_k d_est_k = d_est_corr_k kalman_filter_outputs = (T_est_k, d_est_k, T_est_pred_kp1, d_est_pred_kp1, P_pred_kp1) return kalman_filter_outputs #%% Process params Kh = 3.5 # [deg C/V] tc = 23.0 # [s] td = 3.0 # [s] T_amb = 20.0 # [deg C] #%% Measurement noise setting An = 0.1 # [K] Amplitude of uniformly distributed measurement noise #%% Time settings ts = 0.5 # Time-step [s] t_pred_horizon = 8.0 # [s] N_pred = int(t_pred_horizon/ts) t_start = 0.0 # [s] t_stop = 400.0 # [s] N_sim = int((t_stop-t_start)/ts) + 1 #%% MPC costs C_e = 1.0 C_du = 20.0 # [(L/s)/min] mpc_costs = np.array([C_e, C_du]) #%% MPC control blocking (or grouping) # Number of blocks of control signal: N_blocks_u = 3 # Number of samples in each control block: N_samples_in_blocks = int(np.ceil(N_pred/N_blocks_u)) #%% Defining sequence of temperature reference r_const = 30.0 # [oC] step_r = 3.0 # [K] slope_r = -0.04 # [K/s] ampl_sine_r = 1.0 # [K] tp_sine_r = 50.0 # [s] t_const_start = t_start t_const_stop = 100 t_step_start = t_const_stop t_step_stop = 150 t_ramp_start = t_step_stop t_ramp_stop = 200 t_sine_start = t_ramp_stop t_sine_stop = 250 t_const2_start = t_sine_stop t_const2_stop = t_stop t = np.linspace(t_start, t_stop, N_sim) r_array = np.linspace(t_start, t_stop, N_sim)*0 for k in range(0, N_sim): if t[k] >= t_const_start and t[k] < t_const_stop: r_array[k] = r_const if t[k] >= t_step_start and t[k] < t_step_stop: r_array[k] = r_const + step_r if t[k] >= t_ramp_start and t[k] < t_ramp_stop: r_array[k] = (r_const + step_r + slope_r*(t[k]-t_ramp_start)) if t[k] >= t_sine_start and t[k] < t_sine_stop: r_array[k] = (r_const + 1 + ampl_sine_r*np.sin(2*np.pi*(1/tp_sine_r) *(t[k]-t_sine_start))) if t[k] >= t_const2_start: r_array[k] = r_const + 1 #%% Change of disturbance t_disturbance = 300 # [s] #%% Initialization of time delay u_init = 0.0 # [V] N_delay = int(round(td/ts)) + 1 delay_array = np.zeros(N_delay) + u_init #%% Initial state of process T_k = 27.0 # [C] #%% Initialization of Kalman Filter # Initial state: T_est_pred_k = 27.0 # [C] d_est_pred_k = 0.0 # [K] # Initial covariance of pred estim error: P_pred_k = np.diag([1.0e-3, 1.0e-3]) #%% Tuning of Kalman Filter stdev_w_T = 0.02 cov_w_T = stdev_w_T**2 stdev_w_d = 0.1 cov_w_d = stdev_w_d**2 Q = np.diag([1*cov_w_T, 1*cov_w_d]) stdev_v_T = 0.1 cov_v_T = stdev_v_T**2 R = np.diag([1*cov_v_T]) #%% Initial value of previous optimal value u_opt_km1 = u_init #%% Initial value of time delayed control signal u_delayed_k = 2 #%% Defining arrays for plotting t_plot_array = np.linspace(t_start, t_stop, N_sim-N_pred) r_plot_array = np.zeros(N_sim-N_pred) + r_const T_plot_array = np.zeros(N_sim-N_pred) + r_const T_est_plot_array = np.zeros(N_sim-N_pred) + r_const T_amb_plot_array = np.zeros(N_sim-N_pred) + T_amb u_plot_array = np.zeros(N_sim-N_pred) du_opt_dt_plot_array = np.zeros(N_sim-N_pred) d_est_plot_array = np.zeros(N_sim-N_pred) d_plot_array = np.zeros(N_sim-N_pred) #%% Settings of optimizer # Initial guessed optimal control sequence: u_guess = np.zeros(N_blocks_u) + u_init # Lower and upper limits of optim variable for use in SLSQP: u_upperbound = 5 u_lowerbound = 0 bounds_u = np.zeros([N_blocks_u, 2]) bounds_u[:, 0] = u_lowerbound bounds_u[:, 1] = u_upperbound # Constraints: T_upperbound = 32.0 # [C] T_lowerbound = 28.0 # [C] T_fun_constraint_array = np.zeros(N_pred) du_dt_upperbound = 0.25 #1 # 0.25 # [V/s] du_dt_lowerbound = -0.25 #-1 # -0.25 # [V/s] du_dt_fun_constraint_array = np.zeros(N_pred) #%% Selecting between continuous plot and batch plotting # 0 = off = batch (offline) plot. # 1 = on = continuous (online) plot. May use external # figure window by executing %matplotlib auto in console. cont_plot = 0 real_time_step = 0.01 #%% Preparing for plotting plt.close("all") fig1 = plt.figure(num=1, figsize=(12, 9)) legend_shadow = True location = 'upper right' font_size = 8 handle_length = 1 #%% For-loop for MPC of simulated process incl Kalman Filter tic = time.time() # Starting timer of simulation execution for k in range(0, (N_sim-N_pred)): t_k = t[k] # Storage for plotting t_plot_array[k] = t_k print('t = ', t_k) # Innovation process: num_samples = 1 meas_noise_k = np.random.uniform(-An, An, num_samples)[0] T_meas_k = T_k + meas_noise_k # Storage for plotting T_plot_array[k] = T_meas_k kalman_filter_inputs = (T_meas_k, u_delayed_k, T_est_pred_k, d_est_pred_k, P_pred_k) kalman_filter_outputs = fun_kalman_filter_air_heater(kalman_filter_inputs) (T_est_k, d_est_k, T_est_pred_kp1, d_est_pred_kp1, P_pred_kp1) = kalman_filter_outputs # Time index shift: P_pred_k = P_pred_kp1 T_est_pred_k = T_est_pred_kp1 d_est_pred_k = d_est_pred_kp1 # Storage for plotting: T_est_plot_array[k] = T_est_k d_est_plot_array[k] = d_est_k # Future reference of MPC: r_to_mpc_array = r_array[k:k+N_pred] r_plot_array[k] = r_array[k] # Calculating optimal control sequence: ineq_cons = {'type':'ineq', 'fun':fun_mpc_constraints_air_heater} # >= 0 constr res = minimize(fun_mpc_objective_air_heater, u_guess, method='SLSQP', constraints=[ineq_cons], options={'ftol': 1e-9, 'disp': False}, bounds=bounds_u) toc = time.time() u_opt = res.x # Optim solution used as guessed sol. in next iteration: u_guess = u_opt # Optimal control signal (sample) to be applied: u_opt_k = u_opt[0] u_plot_array[k] = u_opt_k # Storage for plotting # Rate of change of u: du_opt_dt_k = (u_opt_k - u_opt_km1)/ts du_opt_dt_plot_array[k] = du_opt_dt_k # Time shift: u_opt_km1 = u_opt_k # Applying optimal control signal to simulated process: # Time delay: u_delayed_k = delay_array[-1] delay_array[1:] = delay_array[0:-1] delay_array[0] = u_opt_k # Process disturbance: if t[k] <= t_disturbance: d_k = -1 else: d_k = -10 d_plot_array[k] = d_k # Solving diff eq: dT_dt_k = (Kh*u_delayed_k + (T_amb - T_k) + d_k)/tc T_kp1 = T_k + ts*dT_dt_k # Time index shift: T_k = T_kp1 #%% Continuous plotting if cont_plot == 1: plt.subplot(2, 2, 1) plt.grid(which='both', color='grey') plt.ylim(25, 35) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[deg C]') plt.legend(('r', 'T_meas', 'T_amb', 'T_bounds'), loc=location) plt.subplot(2, 2, 2) plt.grid(which='both', color='grey') plt.ylim(-1, 6) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[V]') plt.legend(('u', 'u bounds'), loc=location) plt.subplot(2, 2, 3) plt.grid(which='both', color='grey') plt.ylim(-12, 1) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[K]') plt.legend(('d_est', 'd'), loc=location) plt.subplot(2, 2, 4) plt.grid(which='both', color='grey') plt.ylim(-0.5, 0.5) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[K/s]') plt.legend(('du_dt', 'du_dt_bounds'), loc=location) if k > 0 and k < N_sim: plt.subplot(2, 2, 1) plt.plot([t_plot_array[k],t_plot_array[k-1]], [r_plot_array[k],r_plot_array[k-1]], color='red') plt.plot([t_plot_array[k],t_plot_array[k-1]], [T_plot_array[k],T_plot_array[k-1]], color='blue') plt.plot([t_plot_array[k],t_plot_array[k-1]], [T_amb_plot_array[k],T_amb_plot_array[k-1]], color='cyan') plt.plot([t_plot_array[k],t_plot_array[k-1]], [T_upperbound,T_upperbound], color='magenta') plt.plot([t_plot_array[k],t_plot_array[k-1]], [T_lowerbound,T_lowerbound], color='magenta') plt.subplot(2, 2, 2) plt.plot([t_plot_array[k],t_plot_array[k-1]], [u_plot_array[k],u_plot_array[k-1]], color='blue') plt.plot([t_plot_array[k],t_plot_array[k-1]], [u_upperbound,u_upperbound], color='magenta') plt.plot([t_plot_array[k],t_plot_array[k-1]], [u_lowerbound,u_lowerbound], color='magenta') plt.subplot(2, 2, 3) plt.plot([t_plot_array[k],t_plot_array[k-1]], [d_est_plot_array[k],d_est_plot_array[k-1]], color='blue') plt.plot([t_plot_array[k],t_plot_array[k-1]], [d_plot_array[k],d_plot_array[k-1]], color='green') plt.subplot(2, 2, 4) plt.plot([t_plot_array[k],t_plot_array[k-1]], [du_opt_dt_plot_array[k],du_opt_dt_plot_array[k-1]], color='blue') plt.plot([t_plot_array[k],t_plot_array[k-1]], [du_dt_upperbound,du_dt_upperbound], color='magenta') plt.plot([t_plot_array[k],t_plot_array[k-1]], [du_dt_lowerbound,du_dt_lowerbound], color='magenta') fig1.canvas.draw() fig1.canvas.flush_events() time.sleep(real_time_step) #%% Simulation execution time: toc = time.time() # Stopping timer for simulation execution t_elapsed = toc - tic print('Elapsed time total [s] = {:.2e}'.format(t_elapsed)) print('Elapsed time per time step [s] = {:.2e}'.format(t_elapsed/N_sim)) print('------------------------------------------------') #%% Batch plotting: if cont_plot == 0: plt.subplot(2, 2, 1) plt.grid(which='both', color='grey') plt.ylim(25, 35) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[deg C]') plt.plot(t_plot_array, r_plot_array, color='red', label='r') plt.plot(t_plot_array, T_plot_array, color='blue', label='T_meas') plt.plot(t_plot_array, T_amb_plot_array, 'c', label='T_amb') plt.plot(t_plot_array, T_plot_array*0 + T_upperbound, color='magenta', label='T_bounds') plt.plot(t_plot_array, T_plot_array*0 + T_lowerbound, color='magenta') plt.legend(loc=location) plt.subplot(2, 2, 2) plt.grid(which='both', color='grey') plt.ylim(-1, 6) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[V]') plt.plot(t_plot_array, u_plot_array, color='blue', label='u') plt.plot(t_plot_array, u_plot_array*0 + u_upperbound, color='magenta', label='u_bounds') plt.plot(t_plot_array, u_plot_array*0 + u_lowerbound, color='magenta') plt.legend(loc=location) plt.subplot(2, 2, 3) plt.grid(which='both', color='grey') plt.ylim(-12, 1) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[K]') plt.plot(t_plot_array, d_est_plot_array, color='blue',label='d_est') plt.plot(t_plot_array, d_plot_array, color='green') plt.legend(loc=location) plt.subplot(2, 2, 4) plt.grid(which='both', color='grey') plt.ylim(-0.5, 0.5) plt.xlim(t_start, t_stop) plt.xlabel('t [s]') plt.ylabel('[K/s]') plt.plot(t_plot_array, du_opt_dt_plot_array, color='blue', label='du_dt') plt.plot(t_plot_array, du_opt_dt_plot_array*0 + du_dt_upperbound, color='magenta', label='du_dt_bounds') plt.plot(t_plot_array, du_opt_dt_plot_array*0 + du_dt_lowerbound, color='magenta') plt.legend(loc=location) # plt.savefig('mpc_air_heater_no_noise.pdf') # plt.savefig('mpc_air_heater.pdf') plt.show()