"""Prentice-criterion surrogate-endpoint analysis on MATLAB-exported data.

Loads patient flags and lung SUV percentile arrays from ``.mat`` files,
builds an analysis DataFrame, and evaluates the four Prentice criteria
(treatment->outcome, treatment->surrogate, surrogate->outcome, and
surrogate capturing the treatment effect) plus proportion-explained (PE)
measures via logistic / OLS regressions.
"""

import numpy as np
import pandas as pd
from scipy.io import loadmat
import statsmodels.api as sm


def load_prentice_mat(flags_path, suv_path):
    """Load patient IDs, flags, days, and lung SUV data from two .mat files.

    Parameters
    ----------
    flags_path : str or path-like
        .mat file containing ``patients`` (object array of ID strings),
        ``flags`` (patients x flag-columns matrix), and ``days``.
    suv_path : str or path-like
        .mat file containing ``lung_SUVperc_COMBINED``
        (patients x timepoints x percentiles array — assumed; confirm
        against the exporting MATLAB script).

    Returns
    -------
    dict
        Keys ``patient_ids`` (list of str), ``flags``, ``days``, ``lung``.
    """
    flags_mat = loadmat(flags_path)
    suv_mat = loadmat(suv_path)

    # loadmat returns cell arrays as 2-D object arrays; squeeze each cell
    # down to its scalar string.
    patients_raw = flags_mat["patients"]
    patient_ids = [
        str(patients_raw[i, 0].squeeze()) for i in range(patients_raw.shape[0])
    ]

    return {
        "patient_ids": patient_ids,
        "flags": flags_mat["flags"],
        "days": flags_mat["days"],
        "lung": suv_mat["lung_SUVperc_COMBINED"],
    }


def extract_lung_percentile_max(lung_array, percentile):
    """Per-patient max over timepoints of one SUV percentile slice.

    Parameters
    ----------
    lung_array : ndarray
        3-D array (patients x timepoints x percentiles).
    percentile : int
        1-based percentile index into the third axis (``percentile=1``
        selects column 0).

    Returns
    -------
    ndarray
        1-D array of per-patient maxima; NaN where a patient has no
        finite value at any timepoint.
    """
    idx = int(percentile) - 1
    x = lung_array[:, :, idx]
    # Mask NaNs with -inf so np.max ignores them; rows that were all-NaN
    # come out as -inf and are restored to NaN afterwards.
    x = np.where(np.isnan(x), -np.inf, x)
    x_max = np.max(x, axis=1)
    x_max[np.isneginf(x_max)] = np.nan
    return x_max


def build_analysis_df_from_mat(data, treatment_col, ae_col, percentiles):
    """Assemble the analysis DataFrame from the loaded .mat dictionary.

    Parameters
    ----------
    data : dict
        Output of :func:`load_prentice_mat`.
    treatment_col, ae_col : int
        Column indices into ``data["flags"]`` for the raw treatment and
        adverse-event indicators.
    percentiles : iterable of int
        1-based percentile indices; each adds a ``Lung_SUV_{p}_max`` column.

    Returns
    -------
    pandas.DataFrame
    """
    df = pd.DataFrame(
        {
            "Patient_ID": data["patient_ids"],
            "Treatment_raw": data["flags"][:, treatment_col],
            "AE_raw": data["flags"][:, ae_col],
        }
    )
    for p in percentiles:
        df[f"Lung_SUV_{p}_max"] = extract_lung_percentile_max(data["lung"], p)
    return df


def collapse_treatment_to_two_groups(series, mapping):
    """Map raw treatment codes to a two-group coding (unmapped -> NaN)."""
    return series.map(mapping)


def add_constant_safe(x):
    """Prepend an intercept column even if one already exists."""
    return sm.add_constant(x, has_constant="add")


def fit_logit(y, x):
    """Fit a logistic regression of y on [const, x] (silent fit)."""
    return sm.Logit(y, add_constant_safe(x)).fit(disp=0)


def fit_ols(y, x):
    """Fit an OLS regression of y on [const, x]."""
    return sm.OLS(y, add_constant_safe(x)).fit()


def null_logit_llf(y):
    """Log-likelihood of the intercept-only logistic model for y."""
    return sm.Logit(y, np.ones((len(y), 1))).fit(disp=0).llf


def run_prentice_single(df, biomarker_col, treatment_col="Treatment",
                        ae_col="AE", alpha=0.05):
    """Evaluate the four Prentice criteria for one biomarker.

    Fits:
      m1: AE ~ treatment            (criterion 1)
      m2: biomarker ~ treatment     (criterion 2, OLS)
      m3: AE ~ biomarker            (criterion 3)
      m4: AE ~ treatment + biomarker (criterion 4: biomarker significant,
          treatment not, i.e. the surrogate captures the treatment effect)

    Also computes two proportion-explained measures: a likelihood-based
    PE and a Brier-style ``PE_risk`` from squared prediction errors.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``treatment_col``, ``ae_col``, and ``biomarker_col``;
        rows with any NaN in those columns are dropped.
    biomarker_col : str
        Candidate surrogate column name.
    treatment_col, ae_col : str
        Binary treatment and adverse-event column names.
    alpha : float
        Significance level for the criterion flags.

    Returns
    -------
    dict
        P-values, PE measures, and boolean criterion flags C1-C4.
    """
    dfx = df[[treatment_col, ae_col, biomarker_col]].dropna().copy()
    Z = dfx[treatment_col].astype(float).values
    T = dfx[ae_col].astype(float).values
    S = dfx[biomarker_col].astype(float).values

    m1 = fit_logit(T, Z)
    m2 = fit_ols(S, Z)
    m3 = fit_logit(T, S)
    # DataFrame exog so the joint model's parameters are labeled "Z"/"S".
    m4 = fit_logit(T, pd.DataFrame({"Z": Z, "S": S}))

    # Likelihood-based proportion explained:
    # (llf of AE~Z minus llf of AE~Z+S) over (llf of AE~Z minus null llf).
    ll_aa = m1.llf
    ll_ab = m4.llf
    ll_bb = null_logit_llf(T)
    pe = (ll_aa - ll_ab) / (ll_aa - ll_bb) if (ll_aa - ll_bb) != 0 else np.nan

    # Brier-style PE from mean squared prediction error of each model.
    p_aa = m1.predict(add_constant_safe(Z))
    p_ab = m4.predict(add_constant_safe(pd.DataFrame({"Z": Z, "S": S})))
    p_bb = np.repeat(np.mean(T), len(T))
    aa = np.mean((T - p_aa) ** 2)
    ab = np.mean((T - p_ab) ** 2)
    bb = np.mean((T - p_bb) ** 2)
    pe_risk = (aa - ab) / (aa - bb) if (aa - bb) != 0 else np.nan

    # m1/m2/m3 were fit with ndarray exog, so .pvalues is a plain array and
    # positional [1] (the slope) is correct.  m4 was fit with a DataFrame,
    # so .pvalues is a label-indexed Series: access by label — positional
    # Series[int] indexing is deprecated (removed in pandas 3.0).
    p4_z = m4.pvalues["Z"]
    p4_s = m4.pvalues["S"]
    return {
        "biomarker": biomarker_col,
        "C1_p": m1.pvalues[1],
        "C2_p": m2.pvalues[1],
        "C3_p": m3.pvalues[1],
        "C4_Z_p": p4_z,
        "C4_S_p": p4_s,
        "PE": pe,
        "PE_risk": pe_risk,
        "C1": m1.pvalues[1] < alpha,
        "C2": m2.pvalues[1] < alpha,
        "C3": m3.pvalues[1] < alpha,
        # Criterion 4: surrogate significant AND treatment not significant
        # once the surrogate is in the model.
        "C4": (p4_s < alpha) and (p4_z > alpha),
    }


def run_prentice_multiple(df, biomarker_cols, treatment_col="Treatment",
                          ae_col="AE", alpha=0.05):
    """Run :func:`run_prentice_single` over several biomarkers.

    A failure for one biomarker (e.g. a perfect-separation Logit error)
    is recorded as an ``error`` row instead of aborting the whole sweep.

    Returns
    -------
    pandas.DataFrame
        One row per biomarker with results or an ``error`` message.
    """
    rows = []
    for col in biomarker_cols:
        try:
            out = run_prentice_single(
                df, col, treatment_col=treatment_col, ae_col=ae_col, alpha=alpha
            )
        except Exception as e:
            # Best-effort sweep: keep the failure visible in the output.
            out = {"biomarker": col, "error": str(e)}
        rows.append(out)
    return pd.DataFrame(rows)