Переглянути джерело

Finishing prentice criteria analysis

Martin Horvat 1 тиждень тому
батько
коміт
877e854228
2 змінених файлів з 346 додано та 2 видалено
  1. 179 0
      prentice_criteria.ipynb
  2. 167 2
      src/utils.py

Різницю між файлами не показано, бо вона завелика
+ 179 - 0
prentice_criteria.ipynb


+ 167 - 2
src/utils.py

@@ -13,7 +13,7 @@ def read_data(filename, cols_in, cols_out, logscale = False):
     
     return df_data, df_work
 
-def tz_analysis(
+def binary_independence_analysis(
     df: pd.DataFrame,
     t_col: str = "T",
     z_col: str = "Z",
@@ -91,4 +91,169 @@ def tz_analysis(
                 "reject_alpha": bool(p_fisher < alpha),
             },
         },
-    }
+    }
+
+import pandas as pd
+import numpy as np
+import statsmodels.formula.api as smf
+from typing import Literal, Dict, Any, Tuple, Union, Optional
+
+from scipy.stats import (
+    fisher_exact,
+    chi2_contingency, 
+    ttest_ind,
+    mannwhitneyu,
+    ks_2samp,
+    permutation_test
+)
+
# Allowed test names for criterion 1 (treatment -> true endpoint) and
# criterion 2 (treatment -> surrogate); used to annotate the
# corresponding `prentice_criteria` arguments.
Criterion1Test = Literal["logit", "fisher", "chi2"]
Criterion2Test = Literal["ols", "ols_robust", "welch", "mannwhitney", "ks", "permutation_mean"]
+
def prentice_criteria(
    data: pd.DataFrame,
    endpoint: str = "T",
    surrogate: str = "S",
    treatment: str = "Z",
    alpha: float = 0.05,
    criterion1_test: Criterion1Test = "logit",
    criterion2_test: Criterion2Test = "ols_robust",
    chi2_yates: bool = False,
    n_resamples: int = 10000,
    random_state: int = 123,
    return_models: bool = False
) -> Union[pd.DataFrame, Tuple[pd.DataFrame, Dict[str, Any]]]:
    """Evaluate the four Prentice criteria for surrogate-endpoint validation.

    The criteria are assessed on a binary true endpoint:
      1. Treatment has a significant effect on the true endpoint.
      2. Treatment has a significant effect on the surrogate.
      3. The surrogate is associated with the true endpoint.
      4. The treatment effect on the true endpoint vanishes after
         adjusting for the surrogate (full mediation).
    A final "Proportion Explained" (PE) row compares the treatment
    coefficient of the unadjusted vs. surrogate-adjusted logistic models.

    Parameters
    ----------
    data : pd.DataFrame
        Input data containing the three analysis columns.
    endpoint, surrogate, treatment : str
        Column names. `endpoint` is modelled with logistic regression, so
        it must be binary 0/1.
    alpha : float
        Significance level used for every pass/fail decision.
    criterion1_test : {"logit", "fisher", "chi2"}
        Test for criterion 1; "fisher"/"chi2" require a 2x2
        treatment-by-endpoint table.
    criterion2_test : {"ols", "ols_robust", "welch", "mannwhitney", "ks", "permutation_mean"}
        Test for criterion 2; the two-sample tests require a binary 0/1
        treatment coding.
    chi2_yates : bool
        Apply Yates' continuity correction in the chi-squared test.
    n_resamples, random_state : int
        Settings for the "permutation_mean" test.
    return_models : bool
        If True, also return a dict with fitted models and extra stats.

    Returns
    -------
    pd.DataFrame, or (pd.DataFrame, dict) when `return_models` is True.

    Raises
    ------
    ValueError
        On an unknown test name, or when a chosen test requires binary
        data that the supplied columns do not provide.
    """
    # Validate test choices up front so an unknown name fails fast with a
    # clear ValueError.  Previously an invalid `criterion2_test` fell
    # through the if/elif chain and crashed later with a NameError on
    # undefined `p2`/`method2`/`model2_label`.
    if criterion1_test not in ("logit", "fisher", "chi2"):
        raise ValueError(f"Invalid criterion1_test: {criterion1_test!r}")
    if criterion2_test not in (
        "ols", "ols_robust", "welch", "mannwhitney", "ks", "permutation_mean"
    ):
        raise ValueError(f"Invalid criterion2_test: {criterion2_test!r}")

    # Complete-case analysis restricted to the three relevant columns.
    df = data[[endpoint, surrogate, treatment]].dropna().copy()

    f1 = f"{endpoint} ~ {treatment}"
    f2 = f"{surrogate} ~ {treatment}"
    f3 = f"{endpoint} ~ {surrogate}"
    f4 = f"{endpoint} ~ {surrogate} + {treatment}"

    # =========================================================
    # Criterion 1: Treatment effect on True Endpoint
    # =========================================================
    # The unadjusted logit model is always fitted, even for the
    # table-based tests: its treatment coefficient is needed later for
    # the PE calculation.
    model1 = smf.logit(f1, data=df).fit(disp=0)
    aux: Dict[str, Any] = {"criterion1_logit": model1}

    if criterion1_test in ("fisher", "chi2"):
        table1 = pd.crosstab(df[treatment], df[endpoint])
        if table1.shape != (2, 2):
            raise ValueError(f"'{criterion1_test}' requires binary {treatment}/{endpoint}.")

        if criterion1_test == "fisher":
            _, p1 = fisher_exact(table1)
            method1 = "Fisher's exact test"
        else:  # chi2
            chi2_stat, p1, dof, expected = chi2_contingency(table1, correction=chi2_yates)
            method1 = "Pearson's Chi-squared test"
            aux["criterion1_chi2_stats"] = {"stat": chi2_stat, "dof": dof, "expected": expected}

        model1_label = f"{treatment} x {endpoint} (2x2 table)"
        aux["criterion1_contingency_table"] = table1
    else:  # "logit" (already validated above)
        p1 = model1.pvalues[treatment]
        method1 = "Logistic regression"
        model1_label = f1

    pass1 = bool(p1 < alpha)

    # =========================================================
    # Criterion 2: Treatment effect on Surrogate
    # =========================================================
    # Per-arm surrogate samples, used by the two-sample tests below.
    s0 = df.loc[df[treatment] == 0, surrogate].dropna()
    s1 = df.loc[df[treatment] == 1, surrogate].dropna()

    unique_treat = sorted(df[treatment].dropna().unique())
    if criterion2_test in {"welch", "mannwhitney", "ks", "permutation_mean"}:
        if len(unique_treat) != 2 or set(unique_treat) != {0, 1}:
            raise ValueError(f"'{criterion2_test}' requires binary 0/1 treatment.")

    model2 = None
    effect2 = np.nan

    if criterion2_test == "ols":
        model2 = smf.ols(f2, data=df).fit()
        p2, effect2, method2 = model2.pvalues[treatment], model2.params[treatment], "OLS"
        model2_label = f2
    elif criterion2_test == "ols_robust":
        # HC3 heteroskedasticity-robust standard errors.
        model2 = smf.ols(f2, data=df).fit(cov_type="HC3")
        p2, effect2, method2 = model2.pvalues[treatment], model2.params[treatment], "OLS (HC3 robust SE)"
        model2_label = f2
    elif criterion2_test == "welch":
        stat2, p2 = ttest_ind(s1, s0, equal_var=False, nan_policy="omit")
        effect2, method2 = s1.mean() - s0.mean(), "Welch two-sample t-test"
        model2_label = f"{surrogate} by {treatment} groups"
        aux["criterion2_statistic"] = stat2
    elif criterion2_test == "mannwhitney":
        stat2, p2 = mannwhitneyu(s1, s0, alternative="two-sided")
        effect2, method2 = s1.median() - s0.median(), "Mann–Whitney U test"
        model2_label = f"{surrogate} by {treatment} groups"
        aux["criterion2_statistic"] = stat2
    elif criterion2_test == "ks":
        stat2, p2 = ks_2samp(s1, s0, alternative="two-sided")
        # The KS statistic itself is reported as the "effect" size.
        effect2, method2 = stat2, "Kolmogorov–Smirnov test"
        model2_label = f"{surrogate} by {treatment} groups"
        aux["criterion2_statistic"] = stat2
    else:  # "permutation_mean" (already validated above)
        def mean_diff(x, y, axis=0):
            return np.mean(x, axis=axis) - np.mean(y, axis=axis)

        perm_res = permutation_test(
            (s1.to_numpy(), s0.to_numpy()), statistic=mean_diff,
            permutation_type="independent", alternative="two-sided",
            n_resamples=n_resamples, random_state=random_state
        )
        p2, effect2, method2 = perm_res.pvalue, s1.mean() - s0.mean(), "Permutation test"
        model2_label = f"{surrogate} by {treatment} groups"
        aux["criterion2_permutation_result"] = perm_res

    if model2 is not None:
        aux["criterion2_model"] = model2
    pass2 = bool(p2 < alpha)

    # =========================================================
    # Criterion 3: Surrogate -> True; Criterion 4: full mediation
    # =========================================================
    model3 = smf.logit(f3, data=df).fit(disp=0)
    p3 = model3.pvalues[surrogate]
    pass3 = bool(p3 < alpha)
    aux["criterion3"] = model3

    model4 = smf.logit(f4, data=df).fit(disp=0)
    p4_z, p4_s = model4.pvalues[treatment], model4.pvalues[surrogate]
    # Criterion 4 passes when the adjusted treatment effect is NOT
    # significant while the surrogate remains significant.
    pass4 = bool((p4_z > alpha) and (p4_s < alpha))
    aux["criterion4"] = model4

    # Proportion Explained: relative drop of the treatment log-odds
    # coefficient after adjusting for the surrogate.
    # NOTE(review): comparing logistic coefficients across nested models
    # is affected by non-collapsibility of the odds ratio — treat PE as a
    # heuristic, not a formal estimate.
    beta_unadj = model1.params[treatment]
    beta_adj = model4.params[treatment]
    pe = (beta_unadj - beta_adj) / beta_unadj if beta_unadj != 0 else np.nan

    # --- Construct Results ---
    results_list = [
        {"Criterion": "1. Treatment -> True", "Method": method1, "Model": model1_label, "Estimate": model1.params[treatment] if treatment in model1.params else np.nan, "P-value": p1, "Pass": pass1},
        {"Criterion": "2. Treatment -> Surrogate", "Method": method2, "Model": model2_label, "Estimate": effect2, "P-value": p2, "Pass": pass2},
        {"Criterion": "3. Surrogate -> True", "Method": "Logistic", "Model": f3, "Estimate": model3.params[surrogate], "P-value": p3, "Pass": pass3},
        {"Criterion": "4. Full Mediation", "Method": "Logistic", "Model": f4, "Estimate": model4.params[treatment], "P-value": p4_z, "Pass": pass4},

        # --- Proportion Explained (still uses logistic coefficients) ---
        {
        "Criterion": "Proportion Explained (PE)",
        "Method": "Logistic coefficients",
        "Model": f"Compare {treatment} in ({f1}) vs ({f4}).",
        "Estimate": pe,
        "P-value": pd.NA,
        # NaN PE (zero unadjusted effect) compares False, i.e. "fail".
        "Pass": (0.5 <= pe <= 0.75)
        }
    ]

    results_df = pd.DataFrame(results_list)
    # Nullable boolean dtype so a future pd.NA "Pass" would round-trip.
    results_df["Pass"] = results_df["Pass"].astype("boolean")

    if return_models:
        return results_df, aux
    return results_df

Деякі файли не було показано, через те що забагато файлів було змінено