|
@@ -0,0 +1,307 @@
|
|
|
+import numpy as np
|
|
|
+from numpy.random import Generator
|
|
|
+from numpy.typing import NDArray
|
|
|
+from scipy import stats
|
|
|
+from scipy.optimize import minimize
|
|
|
+
|
|
|
+# region 3 Key Parameters
|
|
|
+def sensitivity_function(t, b0, b1, t_bar):
|
|
|
+ """
|
|
|
+ Calculate the sensitivity function β(t).
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ t : float
|
|
|
+ Time variable.
|
|
|
+ b0 : float
|
|
|
+ Coefficient b0.
|
|
|
+ b1 : float
|
|
|
+ Coefficient b1.
|
|
|
+ t_bar : float
|
|
|
+ Average age at entry in the study group.
|
|
|
+
|
|
|
+ average_life_expectancy : float
|
|
|
+ Average life expectancy in years.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ float
|
|
|
+ Sensitivity at time t.
|
|
|
+ """
|
|
|
+ return 1 / (1 + np.exp(-b0 - b1 * (t - t_bar)))
|
|
|
+
|
|
|
+def transition_probability_function(t, w_max, mu, sigma):
|
|
|
+ """
|
|
|
+ Calculate the transition probability function w(t).
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ t : float
|
|
|
+ Time variable, must be greater than 0.
|
|
|
+ w_max : float
|
|
|
+ Maximum weight.
|
|
|
+ mu : float
|
|
|
+ Mean of the log-normal distribution.
|
|
|
+ sigma : float
|
|
|
+ Standard deviation of the log-normal distribution.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ float
|
|
|
+ Transition probability at time t.
|
|
|
+ """
|
|
|
+ if t <= 0:
|
|
|
+ raise ValueError("t must be greater than 0")
|
|
|
+ return w_max * (1 / (np.sqrt(2 * np.pi) * sigma * t)) * np.exp(-((np.log(t) - mu) ** 2) / (2 * sigma ** 2))
|
|
|
+
|
|
|
+def sojourn_time_function(x, kappa, rho):
|
|
|
+ """
|
|
|
+ Calculate the sojourn time function q(x).
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ x : float
|
|
|
+ Time variable.
|
|
|
+ kappa : float
|
|
|
+ Shape parameter.
|
|
|
+ rho : float
|
|
|
+ Scale parameter.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ float
|
|
|
+ Sojourn time at x.
|
|
|
+ """
|
|
|
+ return (kappa * x ** (kappa - 1) * rho ** kappa) / ((1 + (x * rho) ** kappa) ** 2)
|
|
|
+
|
|
|
+# endregion
|
|
|
+# region Random Variates
|
|
|
+
|
|
|
+def transition_survival_rvs(rng: Generator, w: float, size: int = 1) -> np.ndarray[np.float64]:
|
|
|
+ """
|
|
|
+ Generate random variates of transition survival.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ rng : np.random.Generator
|
|
|
+ Random number generator.
|
|
|
+ w : float
|
|
|
+ Scale parameter that represents the probability of transition
|
|
|
+ from healthy state to preclinical state.
|
|
|
+ size : int, optional
|
|
|
+ Number of random variates to generate.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ np.ndarray[np.float64]
|
|
|
+ Times when a population transitions from healthy state to preclinical state.
|
|
|
+ """
|
|
|
+ return rng.uniform(low=0, high=1, size=size) / w
|
|
|
+
|
|
|
+def transition_survival_rvs_alt(rng: Generator, t: float, a: float, size: int = 1) -> np.ndarray[np.float64]:
|
|
|
+ """
|
|
|
+ Generate random variates of transition survival.
|
|
|
+ Alternative implementation of transition_survival_rvs that returns np.nan for times that are greater than T.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ rng : np.random.Generator
|
|
|
+ Random number generator.
|
|
|
+ t : float
|
|
|
+ Life expectancy.
|
|
|
+ a : float
|
|
|
+ Lifetime risk.
|
|
|
+ size : int, optional
|
|
|
+ Number of random variates to generate.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ np.ndarray[np.float64]
|
|
|
+ Times when a population transitions from healthy state to preclinical state.
|
|
|
+ """
|
|
|
+ times = rng.uniform(low=0, high=1, size=size) / a
|
|
|
+ times[times < 1] = np.nan
|
|
|
+ return times * t
|
|
|
+
|
|
|
+def sojourn_survival_rvs(rng: Generator, mu: float, t_w: np.ndarray[np.float64]) -> np.ndarray[np.float64]:
|
|
|
+ """
|
|
|
+ Generate random variates of sojourn survival.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ rng : np.random.Generator
|
|
|
+ Random number generator.
|
|
|
+ mu : float
|
|
|
+ Scale parameter that represents the probability of transition
|
|
|
+ from preclinical state to detectable state.
|
|
|
+ t_w : float
|
|
|
+ Scale parameter that indicates the time when the person transitioned
|
|
|
+ from healthy state to preclinical state.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ float
|
|
|
+ Time when a person transitions from preclinical state to cancer state.
|
|
|
+ """
|
|
|
+ return -mu * np.log(1-rng.uniform(low=0, high=1, size=len(t_w)))
|
|
|
+
|
|
|
+# region Likelihood
|
|
|
+
|
|
|
+def likelihood_function(D, I, n, s, r, alpha, beta):
|
|
|
+ """
|
|
|
+ Calculate the likelihood function.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ D : list
|
|
|
+ List of probabilities of preclinical diagnosis.
|
|
|
+ I : list
|
|
|
+ List of probabilities of clinical incidence.
|
|
|
+ n : list
|
|
|
+ List of total number of cases for each interval.
|
|
|
+ s : list
|
|
|
+ List of number of preclinical diagnoses for each interval.
|
|
|
+ r : list
|
|
|
+ List of number of clinical incidences for each interval.
|
|
|
+ alpha : list
|
|
|
+ List of alpha values for the product term.
|
|
|
+ beta : float
|
|
|
+ The probability of detection.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ float
|
|
|
+ The likelihood value.
|
|
|
+ """
|
|
|
+ L = 1.0
|
|
|
+ for i in range(len(D)):
|
|
|
+ term1 = D[i] ** s[i]
|
|
|
+ term2 = I[i] ** r[i]
|
|
|
+ term3 = (1 - D[i] - I[i]) ** (n[i] - s[i] - r[i])
|
|
|
+ product_term = np.prod([(alpha[j] / beta) ** s[i][j] for j in range(3)])
|
|
|
+ L_i = term1 * term2 * term3 * product_term
|
|
|
+ L *= L_i
|
|
|
+ return L
|
|
|
+
|
|
|
+def probability_of_clinical_incidence(beta, w, mu, t, Q):
|
|
|
+ """
|
|
|
+ Calculate the probability of clinical incidence.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ w : float
|
|
|
+ Weighting factor.
|
|
|
+ mu : float
|
|
|
+ Mean of the distribution.
|
|
|
+ t : list
|
|
|
+ List of time intervals.
|
|
|
+ Q : function
|
|
|
+ Function Q(t) representing the probability distribution.
|
|
|
+ beta : float
|
|
|
+ The probability of detection.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ list
|
|
|
+ A list of probabilities I_i for each time interval.
|
|
|
+ """
|
|
|
+ I = []
|
|
|
+ for i in range(1, len(t) + 1):
|
|
|
+ sum_term = sum((1 - beta) ** (i - j - 1) * (Q(t[i - 1] - t[j]) - Q(t[i] - t[j])) for j in range(i))
|
|
|
+ I_i = w * mu * ((t[i] - t[i - 1]) / mu - beta * sum_term)
|
|
|
+ I.append(I_i)
|
|
|
+ return I
|
|
|
+
|
|
|
+def probability_of_preclinical_diagnosis(beta, w, mu, t, Q):
|
|
|
+ """
|
|
|
+ Calculate the probability of preclinical diagnosis.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ beta : float
|
|
|
+ The probability of detection.
|
|
|
+ w : float
|
|
|
+ Weighting factor.
|
|
|
+ mu : float
|
|
|
+ Mean of the distribution.
|
|
|
+ t : list
|
|
|
+ List of time intervals.
|
|
|
+ Q : function
|
|
|
+ Function Q(t) representing the probability distribution.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ list
|
|
|
+ A list of probabilities D_i for each time interval.
|
|
|
+ """
|
|
|
+ D = []
|
|
|
+ for i in range(1, len(t) + 1):
|
|
|
+ if i == 1:
|
|
|
+ D_i = beta * w * mu
|
|
|
+ else:
|
|
|
+ sum_term = sum((1 - beta) ** (i - j - 1) * Q(t[i - 1] - t[j - 1]) for j in range(1, i))
|
|
|
+ D_i = beta * w * mu * (1 - beta * sum_term)
|
|
|
+ D.append(D_i)
|
|
|
+ return D
|
|
|
+
|
|
|
+# endregion
|
|
|
+# region Optimization
|
|
|
+
|
|
|
+def loss_function(params, real_data, population_size):
|
|
|
+ sensitivity, specificity, risk, screening_interval, onset_interval = params
|
|
|
+ simulated_data = analytic_simulation(sensitivity, specificity, risk, screening_interval, onset_interval, population_size)
|
|
|
+ return np.sum((simulated_data - real_data) ** 2)
|
|
|
+
|
|
|
+def fit_to_real_data(real_data, initial_guess, population_size):
|
|
|
+ result = minimize(loss_function, initial_guess, args=(real_data, population_size), method='BFGS')
|
|
|
+ return result.x
|
|
|
+
|
|
|
+# endregion
|
|
|
+# region Deprecated
|
|
|
+
|
|
|
+def analytic_simulation(sensitivity, specificity, risk, screening_interval, onset_interval, population_size, average_life_expectancy) -> np.array:
|
|
|
+ """
|
|
|
+ Analytic simulation of screening programme.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ sensitivity : float
|
|
|
+ Sensitivity of the screening test.
|
|
|
+ specificity : float
|
|
|
+ Specificity of the screening test.
|
|
|
+ risk : float
|
|
|
+ Risk of cancer in the population.
|
|
|
+ screening_interval : float
|
|
|
+ Interval between screenings in years.
|
|
|
+ onset_interval : float
|
|
|
+ Interval between onset of cancer and detection in years.
|
|
|
+ population_size : int
|
|
|
+ Size of the population.
|
|
|
+
|
|
|
+ Returns
|
|
|
+ -------
|
|
|
+ array
|
|
|
+ A numpy array with the following elements:
|
|
|
+ - The number of people with cancer.
|
|
|
+ - The number of people with cancer detected through screening.
|
|
|
+ - The number of people with cancer that was not detected through screening and was detected through symptoms.
|
|
|
+ - The number of people without cancer.
|
|
|
+ - The number of false positives.
|
|
|
+ """
|
|
|
+ population = stats.bernoulli.rvs(risk, size=population_size)
|
|
|
+
|
|
|
+ detected_probability = stats.geom.cdf(onset_interval * screening_interval, sensitivity)
|
|
|
+ # cancer
|
|
|
+ cancer_quantity = sum(population)
|
|
|
+ screening_population = stats.bernoulli.rvs(p=detected_probability, size=cancer_quantity)
|
|
|
+ screening_detected_q = sum(screening_population)
|
|
|
+ interval_cancer_q = len(screening_population) - screening_detected_q
|
|
|
+
|
|
|
+ # recall
|
|
|
+ recall_q = len(population) - cancer_quantity
|
|
|
+ recall_total = 0
|
|
|
+ recall_total = np.sum(stats.binom.rvs(n=(int)(average_life_expectancy * screening_interval), p=(1-specificity), size=recall_q))
|
|
|
+
|
|
|
+ return np.array([cancer_quantity, screening_detected_q, interval_cancer_q, recall_q, recall_total])
|
|
|
+
|
|
|
+# endregion
|