import ivp import solveMatrix import os import cModel_posodabljanje as cModel import sys import json from contextlib import contextmanager import numpy as np import time import scipy.interpolate import shutil import importlib from scipy.optimize import least_squares importlib.reload(solveMatrix) import runSolver importlib.reload(runSolver) def updateSetup(job): setupFileSrc = job['setupFile'] setupFile = os.path.join(job['jobDir'], 'setupInput.json') with open(setupFileSrc, 'r') as f: setup = json.load(f) try: setup.update(job['setupUpdates']) except KeyError: pass with open(setupFile, 'w+') as f: f.write(json.dumps(setup)) return setupFile def generate_noisy_data(full_sol, noise_std=20.5): full_sol = np.asarray(full_sol) # Ensure input is a NumPy array random_state = np.random.RandomState(seed=None) # Create highly volatile noise noise = random_state.normal(loc=0, scale=noise_std, size=full_sol.shape) ** 2 # Squared to amplify large deviations noise2 = np.random.laplace(loc=0, scale=noise_std * 10, size=full_sol.shape) # Increased scale for even more extreme spikes # Introduce highly chaotic scaling factors random_scaling = random_state.uniform(-200, 400, size=full_sol.shape) # Further extended range for more variation random_exponent = random_state.uniform(4, 12, size=full_sol.shape) # Increased exponentiation for more instability # Apply extreme, chaotic transformations noisy_data = np.zeros_like(full_sol) for i in range(len(full_sol)): perturbation = random_scaling[i] * (abs(noise[i]) ** random_exponent[i]) - noise2[i] * np.sin(full_sol[i]) noisy_data[i] = (1 + np.tanh(perturbation)) * full_sol[i] * np.cosh(noise[i]) # Introduce frequent large perturbations to increase instability if random_state.rand() < 0.75: # 30% chance of a drastic shift noisy_data[i] *= random_state.uniform(2, 5) return noisy_data def calculate_residuals(full_sol, noisy_data): """ Calculates residuals based on the solution and noisy data. """ residuals = np.ravel((noisy_data - full_sol)) #print(residuals) return residuals # Updated cost function def cost_function(params,parnames,noisy_data,model,startPoint,setup,jobDir): """ Cost function that calculates residuals based on the model. """ #print(params) model.setValues(parnames,params) clean_dir(jobDir) t,sol,se,qt,sOut,full_t,full_sol=runSolver.solve(model,setup,startPoint,jobDir) residuals = calculate_residuals(np.array(full_sol), noisy_data) return residuals def create_name_value_dict(original_dict): return { val['name']: val['value'] for key, val in original_dict.items() if isinstance(val, dict) and 'name' in val and 'value' in val } # Create parameter dictionary def clean_dir(jobDir): shutil.rmtree(jobDir) os.mkdir(jobDir) #PRENOS ZAČETNIH PODATKOV ############################################################################################################################################## fh = os.path.expanduser("~") jobDir = os.path.join(fh, 'Documents', 'Sola', 'IJS', 'PBPK_ociscen',"rezultati","curve_fit") srcDir = "NONE" setup_file = os.path.join(fh, 'Documents', 'Sola', 'IJS', 'PBPK_ociscen', 'setup', 'setupMinute.json') model_file = os.path.join(fh, 'Documents', 'Sola', 'IJS', 'PBPK_ociscen', 'models', 'cDiazepam.json') parameter_file1 = os.path.join(fh, 'Documents', 'Sola', 'IJS', 'PBPK_ociscen', 'models', 'cDiazepam_parameters.json') # Prvi model s prvim parameter file model1 = cModel.model() model1.parse(model_file, parameter_file1) setup1 = runSolver.parseSetup(setup_file) name_value_dict1 = create_name_value_dict(model1.parSetup['parameters']) # Lahko zdaj uporabiš name_value_dict1 in name_value_dict2 ločeno # Excluded parameters vse_parametri = { 'commentsInput', 'venousInput', '_venousInput', 'inputA', 'inputTau', 'commentPC', 'adiposePC', 'brainPC', 'heartPC', 'kidneyPC', 'liverPC', 'lungPC', 'musclePC', 'remainderPC', 'skinPC', 'splanchnicPC', 'stomachPC', 'testesPC', 'commentFlows', 'flowNotes', 'adiposeFlow', 'brainFlow', 'heartFlow', 'kidneyFlow', 'liverInFlow', 'liverOutFlow', 'gutOutFlow', 'totalFlow', 'muscleFlow', 'skinFlow', 'splanchnicFlow', 'stomachFlow', 'testesFlow', 'remainderFlow', 'adiposeVolume', 'brainVolume', 'heartVolume', 'kidneyVolume', 'liverVolume', 'lungVolume', 'muscleVolume', 'skinVolume', 'splanchnicVolume', 'stomachVolume', 'testesVolume', 'remainderVolume', 'venousVolume', 'arterialVolume', 'liverToExcrementNotes', 'k1', 'one', 'zero', 'two' } excluded_params1 = {'venousInput',"heartPC","kidneyPC","remainderPC","skinPC","splanchnicPC","stomachPC","testesPC","one","venousVolume","heartVolume","brainVolume","adiposeVolume"} #excluded_params1={} # Create initial guesses without excluded parameters initial_guesses = np.array( [val for par, val in name_value_dict1.items() if par not in excluded_params1], dtype=np.float64 ) # Create list of parameter names without excluded param_names = [par for par in name_value_dict1.keys() if par not in excluded_params1] initial_pars = np.array([x for x in initial_guesses]) #initial_pars[3]*=1.1 #print("kolicnik") #print(initial_guesses/initial_pars) #print(param_names) # TO NAREDI REŠITVE Z ZAČETNIMI PARAMETRI start_point= runSolver.findStartPoint("NONE",setup1) #model1.setValues(param_names,initial_pars) clean_dir(jobDir) t,sol,se,qt,sOut,full_t,full_sol = runSolver.solve(model1,setup1,start_point,jobDir) clean_dir(jobDir) # Step 4: Create a new dictionary for the optimized parameters # Map the optimized parameters back to the correct order in the original parameter dictionary optimized_params_dict1 = {par: initial_guesses[i] for i, par in enumerate(param_names)} # Step 5: Include the excluded parameters with their original values for par in excluded_params1: optimized_params_dict1[par] = name_value_dict1[par] # Step 6: Ensure that everything stays in order in the final dictionary # This will maintain the order of parameters as they appear in the original `name_value_dict2` final_dict1 = {par: optimized_params_dict1.get(par, name_value_dict1[par]) for par in name_value_dict1} #################################################################################################################333 # Z DRUGIMI PARAMETRI ########################################################################################################################## fh = os.path.expanduser("~") # Drugi model z drugim parameter file parameter_file2 = os.path.join(fh, 'Documents', 'Sola', 'IJS', 'PBPK_ociscen', 'models', 'cDiazepam_parameters1.json') model2 = cModel.model() model2.parse(model_file, parameter_file2) setup2 = runSolver.parseSetup(setup_file) name_value_dict2 = create_name_value_dict(model2.parSetup['parameters']) # Excluded parameters excluded_params2 = {'venousInput',"heartPC","kidneyPC","remainderPC","skinPC","splanchnicPC","stomachPC","testesPC","one","venousVolume","heartVolume","brainVolume","adiposeVolume"} #excluded_params2={} # Step 1: Filter out excluded parameters for initial guesses initial_guesses2 = np.array( [val for par, val in name_value_dict2.items() if par not in excluded_params2], dtype=np.float64 ) param_names2 = [par for par in name_value_dict2.keys() if par not in excluded_params2] initial_pars2 = np.array([x for x in initial_guesses2]) # Step 4: Create a new dictionary for the optimized parameters # Map the optimized parameters back to the correct order in the original parameter dictionary optimized_params_dict2 = {par: initial_guesses2[i] for i, par in enumerate(param_names2)} # Step 5: Include the excluded parameters with their original values for par in excluded_params2: optimized_params_dict2[par] = name_value_dict2[par] # Step 6: Ensure that everything stays in order in the final dictionary # This will maintain the order of parameters as they appear in the original `name_value_dict2` final_dict2 = {par: optimized_params_dict2.get(par, name_value_dict2[par]) for par in name_value_dict2} # Step 2: Run optimization clean_dir(jobDir) lower_bound_value = 0 # Example: all parameters must be ≥ 0 upper_bound_value = 1000 # Example: all parameters must be ≤ 10 # Create bounds arrays of the same length as initial_guesses2 lower_bounds = np.full_like(initial_guesses2, lower_bound_value) upper_bounds = np.full_like(initial_guesses2, upper_bound_value) start_point2= runSolver.findStartPoint("NONE",setup2) #t,sol,se,qt,sOut,full_t,full_sol2 = runSolver.solve(model2,setup2,start_point2,jobDir) from scipy.optimize import curve_fit import numpy as np # Define the wrapper function for curve_fit def wrapped_cost_function(xdata, *param_values_and_args): """ Curve fitting wrapper function that adapts the cost function to work with curve_fit. """ # Separate parameter values from extra arguments param_values = param_values_and_args[:len(param_names2)] # Extract parameter values model, startPoint, setup, jobDir = param_values_and_args[len(param_names2):] # Extract extra arguments # Set parameter values in the model model.setValues(param_names2, param_values) clean_dir(jobDir) # Clean job directory before solving # Solve the model t, sol, se, qt, sOut, full_t, full_sol = runSolver.solve(model, setup, startPoint, jobDir) return np.ravel(full_sol) # Flatten the output for curve_fit # Prepare extra arguments (pack model and related arguments in a tuple) extra_args = (model2, start_point2, setup2, jobDir) # Use `curve_fit`, passing extra arguments through a lambda function optimized_params3, covariance, infodict, errmsg, success = curve_fit( lambda xdata, *param_values: wrapped_cost_function(xdata, *param_values, *extra_args), xdata=np.arange(len(full_sol)), # Explicit x-data (index of time points or data points) ydata=np.ravel(full_sol), # Explicit y-data (flattened model output) p0=initial_guesses2, # Initial parameter guesses bounds=(lower_bounds, upper_bounds), # Parameter bounds full_output=True # Get additional output ) # Extract the number of iterations from the info dictionary num_iterations = infodict.get('nfev', None) # 'nfev' is the number of function evaluations (which correlates to iterations) print(f"Number of iterations: {num_iterations}") # Step 3: Retrieve optimized parameters #optimized_params3 = result.x #print(result.x) # Step 4: Create a new dictionary for the optimized parameters # Map the optimized parameters back to the correct order in the original parameter dictionary optimized_params_dict3 = {par: optimized_params3[i] for i, par in enumerate(param_names2)} # Step 5: Include the excluded parameters with their original values for par in excluded_params2: optimized_params_dict3[par] = name_value_dict2[par] # Step 6: Ensure that everything stays in order in the final dictionary # This will maintain the order of parameters as they appear in the original `name_value_dict2` final_optimized_dict3 = {par: optimized_params_dict3.get(par, name_value_dict2[par]) for par in name_value_dict2} # Now, final_optimized_dict contains the optimized values in the same order as in `name_value_dict2` print(name_value_dict2) print(final_optimized_dict3) vrednosti3 = np.array( [val for par, val in final_optimized_dict3.items()], dtype=np.float64 ) param_names3 = [par for par in final_optimized_dict3.keys()] #model2.setValues(param_names3,vrednosti3) ################################################################################################################################# # PREVRBA ČE SO 3 različne rešitve #clean_dir(jobDir) #t,sol,se,qt,sOut,full_t,full_sol1 = runSolver.solve(model1,setup1,start_point,jobDir) #clean_dir(jobDir) #t,sol,se,qt,sOut,full_t,full_sol2 = runSolver.solve(model2,setup2,start_point2,jobDir) #model2.setValues(param_names3,vrednosti3) #clean_dir(jobDir) #t,sol,se,qt,sOut,full_t,full_sol3 = runSolver.solve(model2,setup2,start_point,jobDir) #print(np.array(full_sol2)/np.array(full_sol1)) #print(np.array(full_sol3)/np.array(full_sol1)) #print(np.array(full_sol3)/np.array(full_sol2)) #print(initial_guesses2/initial_guesses) #print(optimized_params3/initial_guesses2) #print(optimized_params3/initial_guesses) ################################################################################################################################## # SHRANIMO REŠITVE ZA PARAMETRE def create_or_clean_dir(directory): """Create the directory if it doesn't exist, otherwise clean it.""" if os.path.exists(directory): shutil.rmtree(directory) # Remove the existing directory and its contents os.makedirs(directory) # Create a fresh directory # Example structure to match your requested output format def save_parameters(file_path, param_dict): """ Overwrites the original file with the updated parameters. Parameters: file_path (str): The path where the updated parameters will be saved. param_dict (dict): The updated parameters to save. """ # Ensure the directory exists os.makedirs(os.path.dirname(file_path), exist_ok=True) # Open the original file and load its content to preserve its structure with open(file_path, 'r') as f: data = json.load(f) # Update the 'parameters' field with the new parameters data['parameters'] = param_dict # Save the updated structure back to the same file with open(file_path, 'w') as f: json.dump(data, f, indent=4) print(f"Updated parameters have been written to {file_path}") return file_path # Make sure the file path is returned # Define unique subdirectories jobDir1 = os.path.join(jobDir, "run1") jobDir2 = os.path.join(jobDir, "run2") jobDir3 = os.path.join(jobDir, "run3") # Ensure directories exist and are clean create_or_clean_dir(jobDir1) create_or_clean_dir(jobDir2) create_or_clean_dir(jobDir3) #ZA ORIGNAL # Solve and save results in respective directories t, sol, se, qt, sOut, full_t, full_sol1 = runSolver.solve(model1, setup1, start_point, jobDir1) # Define file paths for the new parameter file new_param_file1 = os.path.join(jobDir1, "parameters.json") # Update parameters and save them to the new file model1.updateParameters(final_dict1, parameter_file1, new_param_file1) # Save updated parameters to the respective directory save_parameters(os.path.join(jobDir1, "parameters.json"), model1.parSetup['parameters']) #ZA DRUGI ORIGINAL new_param_file2 = os.path.join(jobDir2, "parameters.json") t, sol, se, qt, sOut, full_t, full_sol2 = runSolver.solve(model2, setup2, start_point, jobDir2) model2.updateParameters(final_dict2, parameter_file2, new_param_file2) save_parameters(os.path.join(jobDir2, "parameters.json"), model2.parSetup['parameters']) # Posodobljeni parametri in rešitev new_param_file3 = os.path.join(jobDir3, "updated_parameters.json") model2.setValues(param_names3, vrednosti3) t, sol, se, qt, sOut, full_t, full_sol3 = runSolver.solve(model2, setup2, start_point, jobDir3) model2.updateParameters(final_optimized_dict3, parameter_file2, new_param_file3) # Save parameters and immediately store the path for further use param_file3 = save_parameters(os.path.join(jobDir3, "updated_parameters.json"), model2.parSetup['parameters']) # Now you can directly use param_file3 in further processing print(f"Parameters for run3 saved at: {param_file3}") ######################################################################################################################## #ZGRADIMO 3 MODEL (da bo lažje pol risat) ############################################################################################################################################## # param_file3 already contains the full path to "parameters.json" import os import glob directory = os.path.dirname(param_file3) jobDir3 = os.path.join(fh, 'Documents', 'Sola', 'IJS', 'PBPK_ociscen',"rezultati","curve_fit","run3") # Make sure param_file3 contains the correct file path, including file name. # If param_file3 only has the directory, append the file name parameter_file3 = os.path.join(directory, "updated_parameters.json") # Specify the file name # Check if it's a valid file if os.path.isfile(parameter_file3): with open(parameter_file3, 'r') as f: # Your code to process the file pass else: print(f"Error: {parameter_file3} is not a valid file.") import os import glob def clean_files(directory): """Deletes all .txt and .csv files in the given directory.""" # Use glob to find all .txt and .csv files in the directory txt_files = glob.glob(os.path.join(directory, "*.txt")) csv_files = glob.glob(os.path.join(directory, "*.csv")) # Combine both lists of files all_files = txt_files + csv_files # Loop through the list of files and delete each one for file in all_files: try: os.remove(file) print(f"Deleted: {file}") except Exception as e: print(f"Error deleting {file}: {e}") clean_files(jobDir3) clean_files(jobDir) clean_files(jobDir1) clean_files(jobDir2) #Saves alll the necessary information for drawing in the right files runSolver.main([setup_file,model_file,parameter_file2,srcDir],jobDir2) runSolver.main([setup_file,model_file,parameter_file1,srcDir],jobDir1) runSolver.main([setup_file,model_file,parameter_file3,srcDir],jobDir3) ########################################################################################################3333 #RISANJE ######################################################################################################################## import matplotlib.pyplot as plt job_configs = { 'Osnovni': { 'jobDir': os.path.join(fh, 'Documents', 'Sola', 'IJS', 'PBPK_ociscen', 'rezultati','curve_fit',"run1") }, 'Osnovni2': { 'jobDir': os.path.join(fh, 'Documents', 'Sola', 'IJS', 'PBPK_ociscen', 'rezultati','curve_fit','run2') }, 'Posodobljeni_parametri': { 'jobDir': os.path.join(fh, 'Documents', 'Sola', 'IJS', 'PBPK_ociscen', 'rezultati','curve_fit','run3') } } def getModel(solution, modelName): """ Load and parse the model from the given solution dictionary using the modelName. """ Q = solution[modelName] # Retrieve the solution dictionary for the specified model model = cModel.model() # Create an instance of the cModel class setupFile = Q['setup'] # Get the setup file path from the solution modelFile = Q['model'] # Get the model file path from the solution parameterFile = Q['parameters'] # Get the parameter file path from the solution model.parse(modelFile, parameterFile) # Parse the model file with the given parameters return model # Return the loaded model def mergeSolutions(seq): """ Merge multiple solution dictionaries into a single dictionary. """ out = {} for v in ['t', 'sol', 'se', 'qt', 'sOut']: # Merge arrays from each solution out[v] = np.concatenate([x[v] for x in seq]) for v in ['lut', 'lutSE', 'setup', 'model', 'parameters', 'qt', 'sOut']: # Use data from the last solution for these keys out[v] = seq[-1][v] return out # Return the merged dictionary # Load and merge solutions solution = {} for modelName, job in job_configs.items(): seq = [runSolver.loadSolutionFromDir(job['jobDir'], True)] solution[modelName] = mergeSolutions(seq) def analyze_model(model, setup): """ Perform matrix operations and print results. """ tscale = runSolver.getScale(setup) # Get the time scale from the setup model.inspect() # Inspect the model nt = setup['nt'] # Number of time points qtmax = 1 # Maximum time (minute) qt = np.linspace(0, qtmax, nt) # Generate time points par = 'venousInput' # Parameter to plot # Plot parameters try: hw = model.get(par) # Get the parameter from the model ht = [10 * hw['value'](x) for x in qt] # Calculate parameter values over time plt.plot(qt / tscale, ht) # Plot the parameter values except (KeyError, TypeError): # Handle errors if the parameter is not found pass # Measure performance of matrix operations start_time = time.time() # Record start time for i in range(100000): # Perform matrix operations 100,000 times model.M(1e7) # Call the matrix operation end_time = time.time() # Record end time fM = model.M(0) # Get the matrix from the model np.set_printoptions(suppress=True, precision=2, linewidth=150) # Set NumPy print options np.set_printoptions(suppress=False) # Reset NumPy print options v, Q = np.linalg.eig(fM) # Perform eigenvalue decomposition Q1 = np.linalg.inv(Q) # Calculate the inverse of the eigenvector matrix D = np.diag(v) # Create a diagonal matrix of eigenvalues t = np.linspace(0, 100, 101) # Generate time points for plotting fy = [model.u(x)[14] for x in t] # Calculate model output over time plt.figure() # Create a new figure plt.plot(fy) # Plot the model output fu = model.u(0) # Get the model output at time 0 plt.show() # Display the plot # Restore stdout def plot_results(solution, modelName): """ Plot results for the specified model from the solution. """ model = getModel(solution, modelName) # Load the model setup = solution[modelName]['setup'] # Get the setup from the solution tscale = runSolver.getScale(setup) # Get the time scale name = ['venous', 'adipose', 'brain', 'heart', 'kidney', 'liver', 'lung', 'muscle', 'skin', 'stomach', 'splanchnic', "arterial","remainder","excrement"] # Compartment names tmax = solution[modelName]['t'][-1] # Get the maximum time from the solution # Define colors and shading for models models = { 'Osnovni': {'color': 'orange', 'shadeColor': 'red'}, 'Osnovni2': {'color': 'blue', 'shadeColor': 'green'}, 'Posodobljeni_parametri': {'color': 'black', 'shadeColor': 'yellow'} } fig, axs = plt.subplots(5, 3, figsize=(15, 20)) # Create a grid of subplots for i in range(len(name)): # Loop over each compartment row = i // 3 # Determine row index col = i % 3 # Determine column index ax = axs[row, col] # Get the subplot for m in models: # Loop over each model fM = models[m] # Get model configuration Q = solution[m] # Get solution for the model v = name[i] # Get the compartment name try: j = Q['lut'][v] # Get the index for the compartment except KeyError: try: v1 = alias[v] # Handle alias if needed j = Q['lut'][v1] # Get the index for the alias except KeyError: continue fy = Q['sol'][:, j] # Get the solution for the compartment fe = Q['se'][:, j] # Get the standard error for the compartment t = Q['t'] # Get the time points # Plot the mean values and confidence intervals ax.plot(t / tscale, fy, color=fM['color'], label=f'{m} sol') ax.fill_between(t / tscale, fy - fe, fy + fe, color=fM['shadeColor'], alpha=0.1) # Shaded area for confidence intervals ax.plot(t / tscale, fy - fe, color=fM['shadeColor'], linewidth=1, alpha=0.2) # Lower bound ax.plot(t / tscale, fy + fe, color=fM['shadeColor'], linewidth=1, alpha=0.2) # Upper bound # Find the maximum y-value including confidence intervals max_y_value = max(np.max(fy + fe), np.max(fy - fe)) ax.set_ylim([0, min(max_y_value, 10000)]) # Set y-axis limit to max value or 10000, whichever is smaller ax.set_xlim([0, 1.1 * tmax / tscale]) # Set x-axis limits ax.set_xlabel(setup['tUnit']) # Set x-axis label ax.set_title(name[i]) # Set plot title ax.legend() # Add legend # output_file = os.path.join(fh, 'Documents', 'Sola', 'IJS', 'PBPK_ociscen', f'{modelName}_results.pdf') plt.tight_layout() # plt.savefig(output_file) # Save the figure plt.show() # Display the plot # Extract the number of iterations from the info dictionary num_iterations = infodict.get('nfev', None) # 'nfev' is the number of function evaluations (which correlates to iterations) print(f"Number of iterations: {num_iterations}") # Analyze and plot results for the specified models for modelName in job_configs.keys(): if modelName == 'Osnovni': # Analyze only the specified model analyze_model(getModel(solution, modelName), solution[modelName]['setup']) plot_results(solution, modelName)