import numpy as np from numpy.random import Generator from numpy.typing import NDArray from scipy import stats from scipy.optimize import minimize # region 3 Key Parameters def sensitivity_function(t, b0, b1, t_bar): """ Calculate the sensitivity function β(t). Parameters ---------- t : float Time variable. b0 : float Coefficient b0. b1 : float Coefficient b1. t_bar : float Average age at entry in the study group. average_life_expectancy : float Average life expectancy in years. Returns ------- float Sensitivity at time t. """ return 1 / (1 + np.exp(-b0 - b1 * (t - t_bar))) def transition_probability_function(t, w_max, mu, sigma): """ Calculate the transition probability function w(t). Parameters ---------- t : float Time variable, must be greater than 0. w_max : float Maximum weight. mu : float Mean of the log-normal distribution. sigma : float Standard deviation of the log-normal distribution. Returns ------- float Transition probability at time t. """ if t <= 0: raise ValueError("t must be greater than 0") return w_max * (1 / (np.sqrt(2 * np.pi) * sigma * t)) * np.exp(-((np.log(t) - mu) ** 2) / (2 * sigma ** 2)) def sojourn_time_function(x, kappa, rho): """ Calculate the sojourn time function q(x). Parameters ---------- x : float Time variable. kappa : float Shape parameter. rho : float Scale parameter. Returns ------- float Sojourn time at x. """ return (kappa * x ** (kappa - 1) * rho ** kappa) / ((1 + (x * rho) ** kappa) ** 2) # endregion # region Random Variates def transition_survival_rvs(rng: Generator, w: float, size: int = 1) -> np.ndarray[np.float64]: """ Generate random variates of transition survival. Parameters ---------- rng : np.random.Generator Random number generator. w : float Scale parameter that represents the probability of transition from healthy state to preclinical state. size : int, optional Number of random variates to generate. Returns ------- np.ndarray[np.float64] Times when a population transitions from healthy state to preclinical state. """ return rng.uniform(low=0, high=1, size=size) / w def transition_survival_rvs_alt(rng: Generator, t: float, a: float, size: int = 1) -> np.ndarray[np.float64]: """ Generate random variates of transition survival. Alternative implementation of transition_survival_rvs that returns np.nan for times that are greater than T. Parameters ---------- rng : np.random.Generator Random number generator. t : float Life expectancy. a : float Lifetime risk. size : int, optional Number of random variates to generate. Returns ------- np.ndarray[np.float64] Times when a population transitions from healthy state to preclinical state. """ times = rng.uniform(low=0, high=1, size=size) / a times[times < 1] = np.nan return times * t def sojourn_survival_rvs(rng: Generator, mu: float, t_w: np.ndarray[np.float64]) -> np.ndarray[np.float64]: """ Generate random variates of sojourn survival. Parameters ---------- rng : np.random.Generator Random number generator. mu : float Scale parameter that represents the probability of transition from preclinical state to detectable state. t_w : float Scale parameter that indicates the time when the person transitioned from healthy state to preclinical state. Returns ------- float Time when a person transitions from preclinical state to cancer state. """ return -mu * np.log(1-rng.uniform(low=0, high=1, size=len(t_w))) # region Likelihood def likelihood_function(D, I, n, s, r, alpha, beta): """ Calculate the likelihood function. Parameters ---------- D : list List of probabilities of preclinical diagnosis. I : list List of probabilities of clinical incidence. n : list List of total number of cases for each interval. s : list List of number of preclinical diagnoses for each interval. r : list List of number of clinical incidences for each interval. alpha : list List of alpha values for the product term. beta : float The probability of detection. Returns ------- float The likelihood value. """ L = 1.0 for i in range(len(D)): term1 = D[i] ** s[i] term2 = I[i] ** r[i] term3 = (1 - D[i] - I[i]) ** (n[i] - s[i] - r[i]) product_term = np.prod([(alpha[j] / beta) ** s[i][j] for j in range(3)]) L_i = term1 * term2 * term3 * product_term L *= L_i return L def probability_of_clinical_incidence(beta, w, mu, t, Q): """ Calculate the probability of clinical incidence. Parameters ---------- w : float Weighting factor. mu : float Mean of the distribution. t : list List of time intervals. Q : function Function Q(t) representing the probability distribution. beta : float The probability of detection. Returns ------- list A list of probabilities I_i for each time interval. """ I = [] for i in range(1, len(t) + 1): sum_term = sum((1 - beta) ** (i - j - 1) * (Q(t[i - 1] - t[j]) - Q(t[i] - t[j])) for j in range(i)) I_i = w * mu * ((t[i] - t[i - 1]) / mu - beta * sum_term) I.append(I_i) return I def probability_of_preclinical_diagnosis(beta, w, mu, t, Q): """ Calculate the probability of preclinical diagnosis. Parameters ---------- beta : float The probability of detection. w : float Weighting factor. mu : float Mean of the distribution. t : list List of time intervals. Q : function Function Q(t) representing the probability distribution. Returns ------- list A list of probabilities D_i for each time interval. """ D = [] for i in range(1, len(t) + 1): if i == 1: D_i = beta * w * mu else: sum_term = sum((1 - beta) ** (i - j - 1) * Q(t[i - 1] - t[j - 1]) for j in range(1, i)) D_i = beta * w * mu * (1 - beta * sum_term) D.append(D_i) return D # endregion # region Optimization def loss_function(params, real_data, population_size): sensitivity, specificity, risk, screening_interval, onset_interval = params simulated_data = analytic_simulation(sensitivity, specificity, risk, screening_interval, onset_interval, population_size) return np.sum((simulated_data - real_data) ** 2) def fit_to_real_data(real_data, initial_guess, population_size): result = minimize(loss_function, initial_guess, args=(real_data, population_size), method='BFGS') return result.x # endregion # region Deprecated def analytic_simulation(sensitivity, specificity, risk, screening_interval, onset_interval, population_size, average_life_expectancy) -> np.array: """ Analytic simulation of screening programme. Parameters ---------- sensitivity : float Sensitivity of the screening test. specificity : float Specificity of the screening test. risk : float Risk of cancer in the population. screening_interval : float Interval between screenings in years. onset_interval : float Interval between onset of cancer and detection in years. population_size : int Size of the population. Returns ------- array A numpy array with the following elements: - The number of people with cancer. - The number of people with cancer detected through screening. - The number of people with cancer that was not detected through screening and was detected through symptoms. - The number of people without cancer. - The number of false positives. """ population = stats.bernoulli.rvs(risk, size=population_size) detected_probability = stats.geom.cdf(onset_interval * screening_interval, sensitivity) # cancer cancer_quantity = sum(population) screening_population = stats.bernoulli.rvs(p=detected_probability, size=cancer_quantity) screening_detected_q = sum(screening_population) interval_cancer_q = len(screening_population) - screening_detected_q # recall recall_q = len(population) - cancer_quantity recall_total = 0 recall_total = np.sum(stats.binom.rvs(n=(int)(average_life_expectancy * screening_interval), p=(1-specificity), size=recall_q)) return np.array([cancer_quantity, screening_detected_q, interval_cancer_q, recall_q, recall_total]) # endregion