# pyright: basic from __future__ import annotations from pathlib import Path from typing import Any import matplotlib.pyplot as plt import numpy as np import pandas as pd from .data_access import BackendEvaluation, physician_column from .metrics import calibration_stats, performance_at_threshold, threshold_sweep from .runtime import write_json def _save_table(rows: list[dict[str, Any]], out_path: Path) -> pd.DataFrame: df = pd.DataFrame(rows) out_path.parent.mkdir(parents=True, exist_ok=True) df.to_csv(out_path, index=False) return df def run_performance( evaluation: BackendEvaluation, output_dir: Path, thresholds: np.ndarray, ) -> dict[str, Any]: rows = threshold_sweep(evaluation.y_true, evaluation.y_prob, thresholds) table_path = output_dir / "performance_threshold_sweep.csv" df = _save_table(rows, table_path) fig, ax = plt.subplots(figsize=(10, 5)) ax.plot(df["threshold"], df["accuracy"], label="accuracy", marker="o") ax.plot(df["threshold"], df["f1"], label="f1", marker="s") ax.set_xlabel("Threshold") ax.set_ylabel("Score") ax.set_title(f"Performance vs Threshold ({evaluation.backend})") ax.grid(True, alpha=0.3) ax.legend() fig.tight_layout() plot_path = output_dir / "performance_threshold_sweep.png" fig.savefig(plot_path) plt.close(fig) best_idx = int(df["f1"].idxmax()) best = df.iloc[best_idx].to_dict() cutoff_percentiles = np.array( [100, 95, 90, 85, 80, 75, 70, 60, 50, 40, 30, 20, 10, 5, 2, 1], dtype=float, ) confidence_uncertainty = 1.0 - np.asarray( evaluation.uncertainty_confidence, dtype=float ) secondary_uncertainty = np.asarray(evaluation.uncertainty_std, dtype=float) uncertainty_types = [ ("confidence_uncertainty", confidence_uncertainty), (evaluation.uncertainty_metric, secondary_uncertainty), ] cutoff_rows: list[dict[str, Any]] = [] for uncertainty_name, values in uncertainty_types: finite_mask = np.isfinite(values) if not finite_mask.any(): continue values_valid = values[finite_mask] y_true_valid = evaluation.y_true[finite_mask] y_prob_valid = evaluation.y_prob[finite_mask] for cutoff_percentile in cutoff_percentiles: # Keep predictions whose uncertainty is <= percentile cutoff. cutoff_value = float(np.percentile(values_valid, cutoff_percentile)) keep_mask = values_valid <= cutoff_value retained = int(keep_mask.sum()) if retained == 0: continue perf = performance_at_threshold( y_true=y_true_valid[keep_mask], y_prob=y_prob_valid[keep_mask], threshold=0.5, ) cutoff_rows.append( { "uncertainty_type": uncertainty_name, "cutoff_percentile": float(cutoff_percentile), "cutoff_value": cutoff_value, "n_retained": retained, "coverage": float(retained / len(values_valid)), "accuracy": float(perf["accuracy"]), "f1": float(perf["f1"]), } ) cutoff_table_path = output_dir / "performance_uncertainty_cutoff.csv" cutoff_plot_path = output_dir / "performance_uncertainty_cutoff.png" if cutoff_rows: cutoff_df = pd.DataFrame(cutoff_rows) cutoff_df.to_csv(cutoff_table_path, index=False) fig_u, axes_u = plt.subplots(1, 2, figsize=(14, 5), sharex=True) for uncertainty_name, group in cutoff_df.groupby("uncertainty_type"): g = group.sort_values("cutoff_percentile", ascending=False) axes_u[0].plot( g["cutoff_percentile"], g["accuracy"], marker="o", label=uncertainty_name, ) axes_u[1].plot( g["cutoff_percentile"], g["f1"], marker="s", label=uncertainty_name, ) axes_u[0].set_title("Accuracy vs Uncertainty Cutoff Percentile") axes_u[1].set_title("F1 vs Uncertainty Cutoff Percentile") for ax in axes_u: ax.set_xlabel("Uncertainty Cutoff Percentile (100 = no cutoff)") ax.grid(True, alpha=0.3) ax.legend() axes_u[0].set_ylabel("Accuracy") axes_u[1].set_ylabel("F1") fig_u.tight_layout() fig_u.savefig(cutoff_plot_path) plt.close(fig_u) summary = { "best_by_f1": { k: float(v) for k, v in best.items() if isinstance(v, (int, float)) }, "table": str(table_path), "plot": str(plot_path), "uncertainty_cutoff": { "table": str(cutoff_table_path), "plot": str(cutoff_plot_path), "decision_threshold": 0.5, }, } write_json(output_dir / "performance_summary.json", summary) return summary def run_calibration( evaluation: BackendEvaluation, output_dir: Path, bins: int, ) -> dict[str, Any]: summary, per_bin = calibration_stats( evaluation.y_true, evaluation.y_prob, bins=bins ) bin_df = pd.DataFrame( per_bin, columns=["mean_confidence", "fraction_positive", "count"], ) table_path = output_dir / "calibration_bins.csv" bin_df.to_csv(table_path, index=False) fig, ax = plt.subplots(figsize=(6, 6)) valid = ~np.isnan(per_bin[:, 1]) ax.plot([0, 1], [0, 1], linestyle="--", color="gray", label="ideal") ax.plot( per_bin[valid, 0], per_bin[valid, 1], marker="o", label=f"{evaluation.backend}", ) ax.set_xlabel("Mean Predicted Probability") ax.set_ylabel("Empirical Fraction Positive") ax.set_title(f"Reliability Diagram ({evaluation.backend})") ax.legend() ax.grid(True, alpha=0.3) fig.tight_layout() plot_path = output_dir / "calibration_reliability.png" fig.savefig(plot_path) plt.close(fig) out = { **summary, "table": str(table_path), "plot": str(plot_path), } write_json(output_dir / "calibration_summary.json", out) return out def run_physician( evaluation: BackendEvaluation, clinical_df: pd.DataFrame, output_dir: Path, ) -> dict[str, Any]: secondary_key = ( "predictive_entropy" if evaluation.uncertainty_metric == "predictive_entropy" else "std" ) secondary_label = ( "Model Predictive Entropy" if secondary_key == "predictive_entropy" else "Model Uncertainty Std" ) col = physician_column(clinical_df) subset = clinical_df[["Image Data ID", col]].copy() subset[col] = pd.to_numeric(subset[col], errors="coerce") subset = subset.dropna(subset=["Image Data ID", col]) subset["Image Data ID"] = subset["Image Data ID"].astype(int) subset[col] = subset[col].astype(int) eval_df = pd.DataFrame( { "Image Data ID": evaluation.image_ids.astype(int), "model_confidence": evaluation.uncertainty_confidence, "model_std": evaluation.uncertainty_std, "model_prob": evaluation.y_prob, } ) merged = eval_df.merge(subset, on="Image Data ID", how="inner") if merged.empty: raise ValueError("No overlapping Image Data ID rows for physician analysis") grouped_rows: list[dict[str, Any]] = [] uncertainty_specs = [ ("confidence", "model_confidence", "Model Confidence (2*|p-0.5|)"), (secondary_key, "model_std", secondary_label), ] ratings = [int(r) for r in sorted(pd.unique(merged[col]))] plot_paths: dict[str, str] = {} correlations: dict[str, float] = {} for metric_name, metric_col, metric_label in uncertainty_specs: grouped_metric = ( merged.groupby(col) .agg( n=("Image Data ID", "count"), mean_value=(metric_col, "mean"), std_value=(metric_col, "std"), mean_prob=("model_prob", "mean"), ) .reset_index() .rename(columns={col: "physician_rating"}) ) grouped_metric["uncertainty_type"] = metric_name grouped_rows.extend( [ {str(k): v for k, v in rec.items()} for rec in grouped_metric.to_dict(orient="records") ] ) fig, ax = plt.subplots(figsize=(9, 5)) data = [ np.asarray(merged.loc[merged[col] == r, metric_col], dtype=float) for r in ratings ] ax.boxplot(data, tick_labels=[str(r) for r in ratings]) ax.set_xlabel("Physician Confidence Rating (DXCONFID)") ax.set_ylabel(metric_label) ax.set_title(f"{metric_label} vs Physician Confidence ({evaluation.backend})") ax.grid(True, axis="y", alpha=0.3) fig.tight_layout() plot_path = output_dir / f"physician_{metric_name}_boxplot.png" fig.savefig(plot_path) plt.close(fig) corr = float( pd.to_numeric( merged[[metric_col, col]].corr(method="spearman").iloc[0, 1], errors="coerce", ) ) correlations[metric_name] = corr plot_paths[metric_name] = str(plot_path) grouped = pd.DataFrame(grouped_rows) table_path = output_dir / "physician_grouped_metrics.csv" grouped.to_csv(table_path, index=False) confidence_table = output_dir / "physician_confidence_grouped_metrics.csv" std_table = output_dir / "physician_std_grouped_metrics.csv" secondary_table = output_dir / f"physician_{secondary_key}_grouped_metrics.csv" grouped[grouped["uncertainty_type"] == "confidence"].to_csv( confidence_table, index=False ) grouped[grouped["uncertainty_type"] == secondary_key].to_csv( secondary_table, index=False ) grouped[grouped["uncertainty_type"] == secondary_key].to_csv(std_table, index=False) out = { "n_overlap": int(len(merged)), "spearman_vs_dxconfid": correlations, "table": str(table_path), "tables": { "confidence": str(confidence_table), secondary_key: str(secondary_table), "std": str(std_table), }, "plots": plot_paths, } write_json(output_dir / "physician_summary.json", out) return out def _normalize_dx(value: Any) -> str: if value is None or (isinstance(value, float) and np.isnan(value)): return "" v = str(value).strip().upper() if v in {"NL", "NORMAL"}: return "CN" return v def run_longitudinal( evaluation: BackendEvaluation, clinical_df: pd.DataFrame, output_dir: Path, ) -> dict[str, Any]: secondary_key = ( "predictive_entropy" if evaluation.uncertainty_metric == "predictive_entropy" else "std" ) secondary_label = ( "Mean Model Predictive Entropy" if secondary_key == "predictive_entropy" else "Mean Model Uncertainty Std" ) required = ["Image Data ID", "PTID"] missing = [c for c in required if c not in clinical_df.columns] if missing: raise KeyError(f"Missing columns for longitudinal analysis: {missing}") diagnosis_col = None for candidate in ["Class", "DX", "Diagnosis"]: if candidate in clinical_df.columns: diagnosis_col = candidate break if diagnosis_col is None: raise KeyError( "No diagnosis column found. Expected one of: Class, DX, Diagnosis" ) work = clinical_df[ ["Image Data ID", "PTID", diagnosis_col] + [c for c in ["EXAMDATE"] if c in clinical_df.columns] ].copy() work["Image Data ID"] = pd.to_numeric(work["Image Data ID"], errors="coerce") work = work.dropna(subset=["Image Data ID", "PTID"]) work["Image Data ID"] = work["Image Data ID"].astype(int) work["PTID"] = work["PTID"].astype(str).str.strip() work["diagnosis"] = work[diagnosis_col].map(_normalize_dx) if "EXAMDATE" in work.columns: work["EXAMDATE"] = pd.to_datetime(work["EXAMDATE"], errors="coerce") work = work.sort_values(["PTID", "EXAMDATE"], na_position="last") else: work = work.sort_values(["PTID", "Image Data ID"]) eval_df = pd.DataFrame( { "Image Data ID": evaluation.image_ids.astype(int), "model_confidence": evaluation.uncertainty_confidence, "model_std": evaluation.uncertainty_std, "model_prob": evaluation.y_prob, } ) merged = work.merge(eval_df, on="Image Data ID", how="inner") if merged.empty: raise ValueError("No overlapping Image Data ID rows for longitudinal analysis") patient_rows: list[dict[str, Any]] = [] for ptid, group in merged.groupby("PTID"): diagnoses = [d for d in group["diagnosis"].tolist() if d] if len(diagnoses) < 2: continue first_dx = diagnoses[0] last_dx = diagnoses[-1] unique_dx = set(diagnoses) cohort = "other" if unique_dx.issubset({"CN"}): cohort = "stable_cn" elif unique_dx.issubset({"AD"}): cohort = "stable_ad" elif first_dx == "CN" and "AD" in unique_dx and last_dx == "AD": cohort = "cn_to_ad" patient_rows.append( { "PTID": ptid, "n_visits": int(len(group)), "first_dx": first_dx, "last_dx": last_dx, "cohort": cohort, "mean_confidence": float(group["model_confidence"].mean()), "mean_std": float(group["model_std"].mean()), "mean_prob": float(group["model_prob"].mean()), } ) patient_df = pd.DataFrame(patient_rows) table_path = output_dir / "longitudinal_patient_summary.csv" patient_df.to_csv(table_path, index=False) cohort_df = ( patient_df.groupby("cohort") .agg( n_patients=("PTID", "count"), mean_confidence=("mean_confidence", "mean"), mean_std=("mean_std", "mean"), mean_prob=("mean_prob", "mean"), ) .reset_index() ) cohort_table = output_dir / "longitudinal_cohort_summary.csv" cohort_df.to_csv(cohort_table, index=False) cohorts = ["stable_cn", "stable_ad", "cn_to_ad"] uncertainty_specs = [ ("confidence", "mean_confidence", "Mean Model Confidence"), (secondary_key, "mean_std", secondary_label), ] plot_paths: dict[str, str] = {} for metric_name, metric_col, metric_label in uncertainty_specs: fig, ax = plt.subplots(figsize=(9, 5)) values = [ np.asarray( patient_df.loc[patient_df["cohort"] == c, metric_col], dtype=float ) for c in cohorts ] ax.boxplot(values, tick_labels=cohorts) ax.set_ylabel(metric_label) ax.set_title(f"Longitudinal Cohort {metric_label} ({evaluation.backend})") ax.grid(True, axis="y", alpha=0.3) fig.tight_layout() plot_path = output_dir / f"longitudinal_cohort_{metric_name}.png" fig.savefig(plot_path) plt.close(fig) plot_paths[metric_name] = str(plot_path) uncertainty_by_cohort = cohort_df.melt( id_vars=["cohort", "n_patients"], value_vars=["mean_confidence", "mean_std"], var_name="uncertainty_type", value_name="mean_value", ).replace( { "uncertainty_type": { "mean_confidence": "confidence", "mean_std": secondary_key, } } ) uncertainty_table = output_dir / "longitudinal_uncertainty_by_cohort.csv" uncertainty_by_cohort.to_csv(uncertainty_table, index=False) confidence_patient_table = ( output_dir / "longitudinal_confidence_patient_summary.csv" ) std_patient_table = output_dir / "longitudinal_std_patient_summary.csv" confidence_cohort_table = output_dir / "longitudinal_confidence_cohort_summary.csv" std_cohort_table = output_dir / "longitudinal_std_cohort_summary.csv" secondary_patient_table = ( output_dir / f"longitudinal_{secondary_key}_patient_summary.csv" ) secondary_cohort_table = ( output_dir / f"longitudinal_{secondary_key}_cohort_summary.csv" ) patient_df[ ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_confidence"] ].to_csv(confidence_patient_table, index=False) patient_df[ ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_std"] ].to_csv(std_patient_table, index=False) patient_df[ ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_std"] ].to_csv(secondary_patient_table, index=False) cohort_df[["cohort", "n_patients", "mean_confidence"]].to_csv( confidence_cohort_table, index=False ) cohort_df[["cohort", "n_patients", "mean_std"]].to_csv( std_cohort_table, index=False ) cohort_df[["cohort", "n_patients", "mean_std"]].to_csv( secondary_cohort_table, index=False ) out = { "n_patients_analyzed": int(len(patient_df)), "table_patient": str(table_path), "table_cohort": str(cohort_table), "table_uncertainty": str(uncertainty_table), "tables": { "confidence": { "patient": str(confidence_patient_table), "cohort": str(confidence_cohort_table), }, secondary_key: { "patient": str(secondary_patient_table), "cohort": str(secondary_cohort_table), }, "std": { "patient": str(std_patient_table), "cohort": str(std_cohort_table), }, }, "plots": plot_paths, } write_json(output_dir / "longitudinal_summary.json", out) return out