123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
- import numpy as np
- from numpy.random import Generator
- from numpy.typing import NDArray
- from scipy import stats
- from scipy.optimize import minimize
- # region 3 Key Parameters
- def sensitivity_function(t, b0, b1, t_bar):
- """
- Calculate the sensitivity function β(t).
- Parameters
- ----------
- t : float
- Time variable.
- b0 : float
- Coefficient b0.
- b1 : float
- Coefficient b1.
- t_bar : float
- Average age at entry in the study group.
- average_life_expectancy : float
- Average life expectancy in years.
- Returns
- -------
- float
- Sensitivity at time t.
- """
- return 1 / (1 + np.exp(-b0 - b1 * (t - t_bar)))
- def transition_probability_function(t, w_max, mu, sigma):
- """
- Calculate the transition probability function w(t).
- Parameters
- ----------
- t : float
- Time variable, must be greater than 0.
- w_max : float
- Maximum weight.
- mu : float
- Mean of the log-normal distribution.
- sigma : float
- Standard deviation of the log-normal distribution.
- Returns
- -------
- float
- Transition probability at time t.
- """
- if t <= 0:
- raise ValueError("t must be greater than 0")
- return w_max * (1 / (np.sqrt(2 * np.pi) * sigma * t)) * np.exp(-((np.log(t) - mu) ** 2) / (2 * sigma ** 2))
- def sojourn_time_function(x, kappa, rho):
- """
- Calculate the sojourn time function q(x).
- Parameters
- ----------
- x : float
- Time variable.
- kappa : float
- Shape parameter.
- rho : float
- Scale parameter.
- Returns
- -------
- float
- Sojourn time at x.
- """
- return (kappa * x ** (kappa - 1) * rho ** kappa) / ((1 + (x * rho) ** kappa) ** 2)
- # endregion
- # region Random Variates
- def transition_survival_rvs(rng: Generator, w: float, size: int = 1) -> np.ndarray[np.float64]:
- """
- Generate random variates of transition survival.
- Parameters
- ----------
- rng : np.random.Generator
- Random number generator.
- w : float
- Scale parameter that represents the probability of transition
- from healthy state to preclinical state.
- size : int, optional
- Number of random variates to generate.
- Returns
- -------
- np.ndarray[np.float64]
- Times when a population transitions from healthy state to preclinical state.
- """
- return rng.uniform(low=0, high=1, size=size) / w
- def transition_survival_rvs_alt(rng: Generator, t: float, a: float, size: int = 1) -> np.ndarray[np.float64]:
- """
- Generate random variates of transition survival.
- Alternative implementation of transition_survival_rvs that returns np.nan for times that are greater than T.
- Parameters
- ----------
- rng : np.random.Generator
- Random number generator.
- t : float
- Life expectancy.
- a : float
- Lifetime risk.
- size : int, optional
- Number of random variates to generate.
- Returns
- -------
- np.ndarray[np.float64]
- Times when a population transitions from healthy state to preclinical state.
- """
- times = rng.uniform(low=0, high=1, size=size) / a
- times[times < 1] = np.nan
- return times * t
- def sojourn_survival_rvs(rng: Generator, mu: float, t_w: np.ndarray[np.float64]) -> np.ndarray[np.float64]:
- """
- Generate random variates of sojourn survival.
- Parameters
- ----------
- rng : np.random.Generator
- Random number generator.
- mu : float
- Scale parameter that represents the probability of transition
- from preclinical state to detectable state.
- t_w : float
- Scale parameter that indicates the time when the person transitioned
- from healthy state to preclinical state.
- Returns
- -------
- float
- Time when a person transitions from preclinical state to cancer state.
- """
- return -mu * np.log(1-rng.uniform(low=0, high=1, size=len(t_w)))
- # region Likelihood
- def likelihood_function(D, I, n, s, r, alpha, beta):
- """
- Calculate the likelihood function.
- Parameters
- ----------
- D : list
- List of probabilities of preclinical diagnosis.
- I : list
- List of probabilities of clinical incidence.
- n : list
- List of total number of cases for each interval.
- s : list
- List of number of preclinical diagnoses for each interval.
- r : list
- List of number of clinical incidences for each interval.
- alpha : list
- List of alpha values for the product term.
- beta : float
- The probability of detection.
- Returns
- -------
- float
- The likelihood value.
- """
- L = 1.0
- for i in range(len(D)):
- term1 = D[i] ** s[i]
- term2 = I[i] ** r[i]
- term3 = (1 - D[i] - I[i]) ** (n[i] - s[i] - r[i])
- product_term = np.prod([(alpha[j] / beta) ** s[i][j] for j in range(3)])
- L_i = term1 * term2 * term3 * product_term
- L *= L_i
- return L
- def probability_of_clinical_incidence(beta, w, mu, t, Q):
- """
- Calculate the probability of clinical incidence.
- Parameters
- ----------
- w : float
- Weighting factor.
- mu : float
- Mean of the distribution.
- t : list
- List of time intervals.
- Q : function
- Function Q(t) representing the probability distribution.
- beta : float
- The probability of detection.
- Returns
- -------
- list
- A list of probabilities I_i for each time interval.
- """
- I = []
- for i in range(1, len(t) + 1):
- sum_term = sum((1 - beta) ** (i - j - 1) * (Q(t[i - 1] - t[j]) - Q(t[i] - t[j])) for j in range(i))
- I_i = w * mu * ((t[i] - t[i - 1]) / mu - beta * sum_term)
- I.append(I_i)
- return I
- def probability_of_preclinical_diagnosis(beta, w, mu, t, Q):
- """
- Calculate the probability of preclinical diagnosis.
- Parameters
- ----------
- beta : float
- The probability of detection.
- w : float
- Weighting factor.
- mu : float
- Mean of the distribution.
- t : list
- List of time intervals.
- Q : function
- Function Q(t) representing the probability distribution.
- Returns
- -------
- list
- A list of probabilities D_i for each time interval.
- """
- D = []
- for i in range(1, len(t) + 1):
- if i == 1:
- D_i = beta * w * mu
- else:
- sum_term = sum((1 - beta) ** (i - j - 1) * Q(t[i - 1] - t[j - 1]) for j in range(1, i))
- D_i = beta * w * mu * (1 - beta * sum_term)
- D.append(D_i)
- return D
- # endregion
- # region Optimization
- def loss_function(params, real_data, population_size):
- sensitivity, specificity, risk, screening_interval, onset_interval = params
- simulated_data = analytic_simulation(sensitivity, specificity, risk, screening_interval, onset_interval, population_size)
- return np.sum((simulated_data - real_data) ** 2)
- def fit_to_real_data(real_data, initial_guess, population_size):
- result = minimize(loss_function, initial_guess, args=(real_data, population_size), method='BFGS')
- return result.x
- # endregion
- # region Deprecated
- def analytic_simulation(sensitivity, specificity, risk, screening_interval, onset_interval, population_size, average_life_expectancy) -> np.array:
- """
- Analytic simulation of screening programme.
- Parameters
- ----------
- sensitivity : float
- Sensitivity of the screening test.
- specificity : float
- Specificity of the screening test.
- risk : float
- Risk of cancer in the population.
- screening_interval : float
- Interval between screenings in years.
- onset_interval : float
- Interval between onset of cancer and detection in years.
- population_size : int
- Size of the population.
- Returns
- -------
- array
- A numpy array with the following elements:
- - The number of people with cancer.
- - The number of people with cancer detected through screening.
- - The number of people with cancer that was not detected through screening and was detected through symptoms.
- - The number of people without cancer.
- - The number of false positives.
- """
- population = stats.bernoulli.rvs(risk, size=population_size)
- detected_probability = stats.geom.cdf(onset_interval * screening_interval, sensitivity)
- # cancer
- cancer_quantity = sum(population)
- screening_population = stats.bernoulli.rvs(p=detected_probability, size=cancer_quantity)
- screening_detected_q = sum(screening_population)
- interval_cancer_q = len(screening_population) - screening_detected_q
- # recall
- recall_q = len(population) - cancer_quantity
- recall_total = 0
- recall_total = np.sum(stats.binom.rvs(n=(int)(average_life_expectancy * screening_interval), p=(1-specificity), size=recall_q))
-
- return np.array([cancer_quantity, screening_detected_q, interval_cancer_q, recall_q, recall_total])
- # endregion
|