| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- import numpy as np
- import pandas as pd
- from scipy.io import loadmat
- import statsmodels.api as sm
def load_prentice_mat(flags_path, suv_path):
    """Load the Prentice-analysis inputs from two MATLAB .mat files.

    Parameters
    ----------
    flags_path : path to a .mat file containing "patients", "flags", "days".
    suv_path : path to a .mat file containing "lung_SUVperc_COMBINED".

    Returns
    -------
    dict with keys "patient_ids" (list of str), "flags", "days" and
    "lung" (arrays exactly as returned by scipy.io.loadmat).
    """
    flags_contents = loadmat(flags_path)
    suv_contents = loadmat(suv_path)

    # "patients" loads as a 2-D cell-array-like object array; unwrap each
    # cell (squeeze away the singleton dims) into a plain Python string.
    raw_patients = flags_contents["patients"]
    ids = []
    for row in range(raw_patients.shape[0]):
        ids.append(str(raw_patients[row, 0].squeeze()))

    return {
        "patient_ids": ids,
        "flags": flags_contents["flags"],
        "days": flags_contents["days"],
        "lung": suv_contents["lung_SUVperc_COMBINED"],
    }
def extract_lung_percentile_max(lung_array, percentile):
    """Per-row max of one percentile slice of a 3-D lung-SUV array.

    `percentile` is 1-based: percentile p selects index p - 1 along the
    last axis. NaNs are ignored when taking the max along axis 1; a row
    whose values are all NaN comes back as NaN.
    """
    plane = lung_array[:, :, int(percentile) - 1]
    # Substitute -inf for NaN so the plain max skips missing entries.
    masked = np.where(np.isnan(plane), -np.inf, plane)
    row_max = masked.max(axis=1)
    # An all-NaN row maxes out at -inf; restore it to NaN.
    row_max[np.isneginf(row_max)] = np.nan
    return row_max
def build_analysis_df_from_mat(data, treatment_col, ae_col, percentiles):
    """Assemble the per-patient analysis table from load_prentice_mat output.

    Parameters
    ----------
    data : dict with "patient_ids", "flags" (2-D) and "lung" (3-D) entries.
    treatment_col, ae_col : column indices into the flags array.
    percentiles : iterable of 1-based percentile indices; each adds a
        "Lung_SUV_{p}_max" column via extract_lung_percentile_max.
    """
    flags = data["flags"]
    frame = pd.DataFrame(
        {
            "Patient_ID": data["patient_ids"],
            "Treatment_raw": flags[:, treatment_col],
            "AE_raw": flags[:, ae_col],
        }
    )
    lung = data["lung"]
    for pct in percentiles:
        frame[f"Lung_SUV_{pct}_max"] = extract_lung_percentile_max(lung, pct)
    return frame
def collapse_treatment_to_two_groups(series, mapping):
    """Collapse raw treatment codes into two groups via `mapping`.

    Uses pandas Series.map, so any value absent from `mapping`
    becomes NaN rather than raising.
    """
    collapsed = series.map(mapping)
    return collapsed
def add_constant_safe(x):
    """Return `x` with an intercept column prepended.

    has_constant="add" forces the constant in even when a constant-like
    column already exists, keeping the design-matrix layout predictable.
    """
    with_const = sm.add_constant(x, has_constant="add")
    return with_const
def fit_logit(y, x):
    """Fit a logistic regression of y on [const, x] with silent output."""
    design = add_constant_safe(x)
    model = sm.Logit(y, design)
    return model.fit(disp=0)
def fit_ols(y, x):
    """Fit an ordinary-least-squares regression of y on [const, x]."""
    design = add_constant_safe(x)
    model = sm.OLS(y, design)
    return model.fit()
def null_logit_llf(y):
    """Log-likelihood of the intercept-only logit model for outcome y."""
    intercept_only = np.ones((len(y), 1))
    fitted = sm.Logit(y, intercept_only).fit(disp=0)
    return fitted.llf
def run_prentice_single(df, biomarker_col, treatment_col="Treatment", ae_col="AE", alpha=0.05):
    """Evaluate the four Prentice surrogacy criteria for one biomarker.

    Fits:
      m1: logit(AE ~ treatment)             -- criterion 1
      m2: OLS(biomarker ~ treatment)        -- criterion 2
      m3: logit(AE ~ biomarker)             -- criterion 3
      m4: logit(AE ~ treatment + biomarker) -- criterion 4
    and computes the proportion of the treatment effect explained by the
    biomarker (PE) on both the log-likelihood and the Brier-score scale.

    Rows with a missing value in any of the three columns are dropped.

    Returns a dict of slope p-values, PE estimates and boolean criterion
    flags ("C1".."C4").
    """
    dfx = df[[treatment_col, ae_col, biomarker_col]].dropna().copy()
    Z = dfx[treatment_col].astype(float).values   # treatment indicator
    T = dfx[ae_col].astype(float).values          # adverse-event outcome
    S = dfx[biomarker_col].astype(float).values   # candidate surrogate

    m1 = fit_logit(T, Z)
    m2 = fit_ols(S, Z)
    m3 = fit_logit(T, S)
    m4 = fit_logit(T, pd.DataFrame({"Z": Z, "S": S}))

    # BUG FIX: m4's exog is a DataFrame, so m4.pvalues is a pandas Series
    # labelled ['const', 'Z', 'S'].  Positional integer indexing
    # (m4.pvalues[1] / [2]) on a labelled Series is deprecated and removed
    # in modern pandas -- access the slope p-values by label instead.
    # m1/m2/m3 were fit on plain ndarrays, so their pvalues are arrays;
    # np.asarray makes the positional access explicit and uniform.
    p1 = float(np.asarray(m1.pvalues)[1])
    p2 = float(np.asarray(m2.pvalues)[1])
    p3 = float(np.asarray(m3.pvalues)[1])
    p4_z = float(m4.pvalues["Z"])
    p4_s = float(m4.pvalues["S"])

    # Likelihood-based proportion explained: how much of the treatment-only
    # model's improvement over the null model is absorbed by adding S.
    ll_aa = m1.llf
    ll_ab = m4.llf
    ll_bb = null_logit_llf(T)
    ll_denom = ll_aa - ll_bb
    pe = (ll_aa - ll_ab) / ll_denom if ll_denom != 0 else np.nan

    # Brier-score (mean squared prediction error) version of the same ratio.
    p_aa = m1.predict(add_constant_safe(Z))
    p_ab = m4.predict(add_constant_safe(pd.DataFrame({"Z": Z, "S": S})))
    p_bb = np.repeat(np.mean(T), len(T))  # null model: overall event rate
    aa = np.mean((T - p_aa) ** 2)
    ab = np.mean((T - p_ab) ** 2)
    bb = np.mean((T - p_bb) ** 2)
    risk_denom = aa - bb
    pe_risk = (aa - ab) / risk_denom if risk_denom != 0 else np.nan

    return {
        "biomarker": biomarker_col,
        "C1_p": p1,
        "C2_p": p2,
        "C3_p": p3,
        "C4_Z_p": p4_z,
        "C4_S_p": p4_s,
        "PE": pe,
        "PE_risk": pe_risk,
        "C1": p1 < alpha,
        "C2": p2 < alpha,
        "C3": p3 < alpha,
        # Criterion 4: biomarker remains significant while the treatment
        # effect vanishes once the biomarker is in the model.
        "C4": (p4_s < alpha) and (p4_z > alpha),
    }
def run_prentice_multiple(df, biomarker_cols, treatment_col="Treatment", ae_col="AE", alpha=0.05):
    """Run run_prentice_single over several biomarkers, one result row each.

    A biomarker whose fit raises is recorded as
    {"biomarker": ..., "error": str(exc)} instead of aborting the sweep.
    """
    results = []
    for biomarker in biomarker_cols:
        try:
            row = run_prentice_single(
                df,
                biomarker,
                treatment_col=treatment_col,
                ae_col=ae_col,
                alpha=alpha,
            )
        except Exception as exc:  # best-effort sweep: record and continue
            row = {"biomarker": biomarker, "error": str(exc)}
        results.append(row)
    return pd.DataFrame(results)
|