import numpy as np from numpy.random import Generator from numpy.typing import NDArray from scipy import stats from scipy.optimize import minimize # region 3 Key Parameters def sensitivity_function(t, b0, b1, t_bar): """ Calculate the sensitivity function β(t). Parameters ---------- t : float Time variable. b0 : float Coefficient b0. b1 : float Coefficient b1. t_bar : float Average age at entry in the study group. average_life_expectancy : float Average life expectancy in years. Returns ------- float Sensitivity at time t. """ return 1 / (1 + np.exp(-b0 - b1 * (t - t_bar))) def transition_probability_function(t, w_max, mu, sigma): """ Calculate the transition probability function w(t). Parameters ---------- t : float Time variable, must be greater than 0. w_max : float Maximum weight. mu : float Mean of the log-normal distribution. sigma : float Standard deviation of the log-normal distribution. Returns ------- float Transition probability at time t. """ if t <= 0: raise ValueError("t must be greater than 0") return w_max * (1 / (np.sqrt(2 * np.pi) * sigma * t)) * np.exp(-((np.log(t) - mu) ** 2) / (2 * sigma ** 2)) def sojourn_time_function(x, kappa, rho): """ Calculate the sojourn time function q(x). Parameters ---------- x : float Time variable. kappa : float Shape parameter. rho : float Scale parameter. Returns ------- float Sojourn time at x. """ return (kappa * x ** (kappa - 1) * rho ** kappa) / ((1 + (x * rho) ** kappa) ** 2) # endregion # region Random Variates def transition_survival_rvs(rng: Generator, w: float, size: int = 1) -> np.ndarray[np.float64]: """ Generate random variates of transition survival. Parameters ---------- rng : np.random.Generator Random number generator. w : float Scale parameter that represents the probability of transition from healthy state to preclinical state. size : int, optional Number of random variates to generate. Returns ------- np.ndarray[np.float64] Times when a population transitions from healthy state to preclinical state. """ return rng.uniform(low=0, high=1, size=size) / w def transition_survival_rvs_alt(rng: Generator, t: float, a: float, size: int = 1) -> np.ndarray[np.float64]: """ Generate random variates of transition survival. Alternative implementation of transition_survival_rvs that returns np.nan for times that are greater than T. Parameters ---------- rng : np.random.Generator Random number generator. t : float Life expectancy. a : float Lifetime risk. size : int, optional Number of random variates to generate. Returns ------- np.ndarray[np.float64] Times when a population transitions from healthy state to preclinical state. """ times = rng.uniform(low=0, high=1, size=size) / a times[times < 1] = np.nan return times * t def sojourn_survival_rvs(rng: Generator, mu: float, t_w: np.ndarray[np.float64]) -> np.ndarray[np.float64]: """ Generate random variates of sojourn survival. Parameters ---------- rng : np.random.Generator Random number generator. mu : float Scale parameter that represents the probability of transition from preclinical state to detectable state. t_w : float Scale parameter that indicates the time when the person transitioned from healthy state to preclinical state. Returns ------- float Time when a person transitions from preclinical state to cancer state. """ return -mu * np.log(1-rng.uniform(low=0, high=1, size=len(t_w))) # region Likelihood def likelihood_function(D, I, n, s, r, alpha, beta): """ Calculate the likelihood function. Parameters ---------- D : list List of probabilities of preclinical diagnosis. I : list List of probabilities of clinical incidence. n : list List of total number of cases for each interval. s : list List of number of preclinical diagnoses for each interval. r : list List of number of clinical incidences for each interval. alpha : list List of alpha values for the product term. beta : float The probability of detection. Returns ------- float The likelihood value. """ L = [] for i in range(len(D)): qs=np.sum(s[i]) term1 = D[i] ** qs term2 = I[i] ** r[i] term3 = (1 - D[i] - I[i]) ** (n[i] - qs - r[i]) product_term = np.prod((alpha / beta) ** s[i] ) L_i = term1 * term2 * term3 * product_term L.append( L_i ) return L def mylog(x): if x==0: return - np.inf return np.log(x) def loglikelihood_function(D, I, n, s, r, alpha, beta): # Same as likelihood above L = [] for i in range(len(D)): qs=np.sum(s[i]) term1 = qs*mylog(D[i]) term2=r[i]*mylog(I[i]) term3 = (n[i]-qs-r[i])*mylog(1 - D[i] - I[i]) product_term=0 if beta!=0: product_term = np.sum(s[i]*mylog(alpha / beta)) L_i = term1 + term2 + term3 + product_term L.append( L_i ) return L def likelihood_function1(pars,n,s,r): w=pars[0] mu=pars[1] beta=pars[2] def Q(x): return np.exp(-x/mu) t=25+np.linspace(0,len(n),len(n)+1) I=probability_of_clinical_incidence(beta,w,mu,t,Q) D=probability_of_preclinical_diagnosis(beta,w,mu,t,Q) qs=np.expand_dims(s,1) alpha=np.array([beta]) L=loglikelihood_function(D,I,n,s,r,alpha,beta) return np.sum(L) def fI(pars,t): w=pars[0] mu=pars[1] beta=pars[2] def Q(x): return np.exp(-x/mu) return probability_of_clinical_incidence(beta,w,mu,t,Q) def fD(pars,t): w=pars[0] mu=pars[1] beta=pars[2] def Q(x): return np.exp(-x/mu) return probability_of_preclinical_diagnosis(beta,w,mu,t,Q) def probability_of_clinical_incidence(beta, w, mu, t, Q): #Calculate the probability of clinical incidence. # Parameters # ---------- # w : float # Weighting factor. # mu : float # Mean of the distribution. # t : list # List of screening times [0,..,tN] of length N+1 (where N is number intervals) # Q : function # Function Q(t) representing the probability distribution. # beta : float # The probability of detection. # # Returns # ------- # list # A list of probabilities I_i for each time interval (of length N, 0 corresponds to interval [t0,t1)) # """ I = [] for i in range(1,len(t)): sum_terms=[(1 - beta) ** (i - j - 1) * (Q(t[i - 1] - t[j]) - Q(t[i] - t[j])) for j in range(i)] sum_term = np.sum(sum_terms) dt=(t[i]-t[i-1]) I_i = w * (dt - beta * mu * sum_term) I.append(I_i) return np.array(I) def probability_of_preclinical_diagnosis(beta, w, mu, t, Q): """ Calculate the probability of preclinical diagnosis. Parameters ---------- beta : float The probability of detection. w : float Weighting factor. mu : float Mean of the distribution. t : list List of time intervals. Q : function Function Q(t) representing the probability distribution. Returns ------- list A list of probabilities D_i for each time interval. """ D = [] for i in range(1, len(t) ): if i == 1: D_i = beta * w * mu else: sum_terms=[(1 - beta) ** (i - j - 1) * Q(t[i - 1] - t[j - 1]) for j in range(1, i)] sum_term = np.sum(sum_terms) D_i = beta * w * mu * (1 - beta * sum_term) D.append(D_i) return np.array(D) # endregion # region Optimization def loss_function(params, real_data, population_size): sensitivity, specificity, risk, screening_interval, onset_interval = params simulated_data = analytic_simulation(sensitivity, specificity, risk, screening_interval, onset_interval, population_size) return np.sum((simulated_data - real_data) ** 2) def fit_to_real_data(real_data, initial_guess, population_size): result = minimize(loss_function, initial_guess, args=(real_data, population_size), method='BFGS') return result.x # endregion # region Deprecated def analytic_simulation(sensitivity, specificity, risk, screening_interval, onset_interval, population_size, average_life_expectancy) -> np.array: """ Analytic simulation of screening programme. Parameters ---------- sensitivity : float Sensitivity of the screening test. specificity : float Specificity of the screening test. risk : float Risk of cancer in the population. screening_interval : float Interval between screenings in years. onset_interval : float Interval between onset of cancer and detection in years. population_size : int Size of the population. Returns ------- array A numpy array with the following elements: - The number of people with cancer. - The number of people with cancer detected through screening. - The number of people with cancer that was not detected through screening and was detected through symptoms. - The number of people without cancer. - The number of false positives. """ population = stats.bernoulli.rvs(risk, size=population_size) detected_probability = stats.geom.cdf(onset_interval * screening_interval, sensitivity) # cancer cancer_quantity = sum(population) screening_population = stats.bernoulli.rvs(p=detected_probability, size=cancer_quantity) screening_detected_q = sum(screening_population) interval_cancer_q = len(screening_population) - screening_detected_q # recall recall_q = len(population) - cancer_quantity recall_total = 0 recall_total = np.sum(stats.binom.rvs(n=(int)(average_life_expectancy * screening_interval), p=(1-specificity), size=recall_q)) return np.array([cancer_quantity, screening_detected_q, interval_cancer_q, recall_q, recall_total]) # endregion