| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521 |
- # pyright: basic
- from __future__ import annotations
- from pathlib import Path
- from typing import Any
- import matplotlib.pyplot as plt
- import numpy as np
- import pandas as pd
- from .data_access import BackendEvaluation, physician_column
- from .metrics import calibration_stats, performance_at_threshold, threshold_sweep
- from .runtime import write_json
- def _save_table(rows: list[dict[str, Any]], out_path: Path) -> pd.DataFrame:
- df = pd.DataFrame(rows)
- out_path.parent.mkdir(parents=True, exist_ok=True)
- df.to_csv(out_path, index=False)
- return df
- def run_performance(
- evaluation: BackendEvaluation,
- output_dir: Path,
- thresholds: np.ndarray,
- ) -> dict[str, Any]:
- rows = threshold_sweep(evaluation.y_true, evaluation.y_prob, thresholds)
- table_path = output_dir / "performance_threshold_sweep.csv"
- df = _save_table(rows, table_path)
- fig, ax = plt.subplots(figsize=(10, 5))
- ax.plot(df["threshold"], df["accuracy"], label="accuracy", marker="o")
- ax.plot(df["threshold"], df["f1"], label="f1", marker="s")
- ax.set_xlabel("Threshold")
- ax.set_ylabel("Score")
- ax.set_title(f"Performance vs Threshold ({evaluation.backend})")
- ax.grid(True, alpha=0.3)
- ax.legend()
- fig.tight_layout()
- plot_path = output_dir / "performance_threshold_sweep.png"
- fig.savefig(plot_path)
- plt.close(fig)
- best_idx = int(df["f1"].idxmax())
- best = df.iloc[best_idx].to_dict()
- cutoff_percentiles = np.array(
- [100, 95, 90, 85, 80, 75, 70, 60, 50, 40, 30, 20, 10, 5, 2, 1],
- dtype=float,
- )
- confidence_uncertainty = 1.0 - np.asarray(
- evaluation.uncertainty_confidence, dtype=float
- )
- secondary_uncertainty = np.asarray(evaluation.uncertainty_std, dtype=float)
- uncertainty_types = [
- ("confidence_uncertainty", confidence_uncertainty),
- (evaluation.uncertainty_metric, secondary_uncertainty),
- ]
- cutoff_rows: list[dict[str, Any]] = []
- for uncertainty_name, values in uncertainty_types:
- finite_mask = np.isfinite(values)
- if not finite_mask.any():
- continue
- values_valid = values[finite_mask]
- y_true_valid = evaluation.y_true[finite_mask]
- y_prob_valid = evaluation.y_prob[finite_mask]
- for cutoff_percentile in cutoff_percentiles:
- # Keep predictions whose uncertainty is <= percentile cutoff.
- cutoff_value = float(np.percentile(values_valid, cutoff_percentile))
- keep_mask = values_valid <= cutoff_value
- retained = int(keep_mask.sum())
- if retained == 0:
- continue
- perf = performance_at_threshold(
- y_true=y_true_valid[keep_mask],
- y_prob=y_prob_valid[keep_mask],
- threshold=0.5,
- )
- cutoff_rows.append(
- {
- "uncertainty_type": uncertainty_name,
- "cutoff_percentile": float(cutoff_percentile),
- "cutoff_value": cutoff_value,
- "n_retained": retained,
- "coverage": float(retained / len(values_valid)),
- "accuracy": float(perf["accuracy"]),
- "f1": float(perf["f1"]),
- }
- )
- cutoff_table_path = output_dir / "performance_uncertainty_cutoff.csv"
- cutoff_plot_path = output_dir / "performance_uncertainty_cutoff.png"
- if cutoff_rows:
- cutoff_df = pd.DataFrame(cutoff_rows)
- cutoff_df.to_csv(cutoff_table_path, index=False)
- fig_u, axes_u = plt.subplots(1, 2, figsize=(14, 5), sharex=True)
- for uncertainty_name, group in cutoff_df.groupby("uncertainty_type"):
- g = group.sort_values("cutoff_percentile", ascending=False)
- axes_u[0].plot(
- g["cutoff_percentile"],
- g["accuracy"],
- marker="o",
- label=uncertainty_name,
- )
- axes_u[1].plot(
- g["cutoff_percentile"],
- g["f1"],
- marker="s",
- label=uncertainty_name,
- )
- axes_u[0].set_title("Accuracy vs Uncertainty Cutoff Percentile")
- axes_u[1].set_title("F1 vs Uncertainty Cutoff Percentile")
- for ax in axes_u:
- ax.set_xlabel("Uncertainty Cutoff Percentile (100 = no cutoff)")
- ax.grid(True, alpha=0.3)
- ax.legend()
- axes_u[0].set_ylabel("Accuracy")
- axes_u[1].set_ylabel("F1")
- fig_u.tight_layout()
- fig_u.savefig(cutoff_plot_path)
- plt.close(fig_u)
- summary = {
- "best_by_f1": {
- k: float(v) for k, v in best.items() if isinstance(v, (int, float))
- },
- "table": str(table_path),
- "plot": str(plot_path),
- "uncertainty_cutoff": {
- "table": str(cutoff_table_path),
- "plot": str(cutoff_plot_path),
- "decision_threshold": 0.5,
- },
- }
- write_json(output_dir / "performance_summary.json", summary)
- return summary
- def run_calibration(
- evaluation: BackendEvaluation,
- output_dir: Path,
- bins: int,
- ) -> dict[str, Any]:
- summary, per_bin = calibration_stats(
- evaluation.y_true, evaluation.y_prob, bins=bins
- )
- bin_df = pd.DataFrame(
- per_bin,
- columns=["mean_confidence", "fraction_positive", "count"],
- )
- table_path = output_dir / "calibration_bins.csv"
- bin_df.to_csv(table_path, index=False)
- fig, ax = plt.subplots(figsize=(6, 6))
- valid = ~np.isnan(per_bin[:, 1])
- ax.plot([0, 1], [0, 1], linestyle="--", color="gray", label="ideal")
- ax.plot(
- per_bin[valid, 0],
- per_bin[valid, 1],
- marker="o",
- label=f"{evaluation.backend}",
- )
- ax.set_xlabel("Mean Predicted Probability")
- ax.set_ylabel("Empirical Fraction Positive")
- ax.set_title(f"Reliability Diagram ({evaluation.backend})")
- ax.legend()
- ax.grid(True, alpha=0.3)
- fig.tight_layout()
- plot_path = output_dir / "calibration_reliability.png"
- fig.savefig(plot_path)
- plt.close(fig)
- out = {
- **summary,
- "table": str(table_path),
- "plot": str(plot_path),
- }
- write_json(output_dir / "calibration_summary.json", out)
- return out
- def run_physician(
- evaluation: BackendEvaluation,
- clinical_df: pd.DataFrame,
- output_dir: Path,
- ) -> dict[str, Any]:
- secondary_key = (
- "predictive_entropy"
- if evaluation.uncertainty_metric == "predictive_entropy"
- else "std"
- )
- secondary_label = (
- "Model Predictive Entropy"
- if secondary_key == "predictive_entropy"
- else "Model Uncertainty Std"
- )
- col = physician_column(clinical_df)
- subset = clinical_df[["Image Data ID", col]].copy()
- subset[col] = pd.to_numeric(subset[col], errors="coerce")
- subset = subset.dropna(subset=["Image Data ID", col])
- subset["Image Data ID"] = subset["Image Data ID"].astype(int)
- subset[col] = subset[col].astype(int)
- eval_df = pd.DataFrame(
- {
- "Image Data ID": evaluation.image_ids.astype(int),
- "model_confidence": evaluation.uncertainty_confidence,
- "model_std": evaluation.uncertainty_std,
- "model_prob": evaluation.y_prob,
- }
- )
- merged = eval_df.merge(subset, on="Image Data ID", how="inner")
- if merged.empty:
- raise ValueError("No overlapping Image Data ID rows for physician analysis")
- grouped_rows: list[dict[str, Any]] = []
- uncertainty_specs = [
- ("confidence", "model_confidence", "Model Confidence (2*|p-0.5|)"),
- (secondary_key, "model_std", secondary_label),
- ]
- ratings = [int(r) for r in sorted(pd.unique(merged[col]))]
- plot_paths: dict[str, str] = {}
- correlations: dict[str, float] = {}
- for metric_name, metric_col, metric_label in uncertainty_specs:
- grouped_metric = (
- merged.groupby(col)
- .agg(
- n=("Image Data ID", "count"),
- mean_value=(metric_col, "mean"),
- std_value=(metric_col, "std"),
- mean_prob=("model_prob", "mean"),
- )
- .reset_index()
- .rename(columns={col: "physician_rating"})
- )
- grouped_metric["uncertainty_type"] = metric_name
- grouped_rows.extend(
- [
- {str(k): v for k, v in rec.items()}
- for rec in grouped_metric.to_dict(orient="records")
- ]
- )
- fig, ax = plt.subplots(figsize=(9, 5))
- data = [
- np.asarray(merged.loc[merged[col] == r, metric_col], dtype=float)
- for r in ratings
- ]
- ax.boxplot(data, tick_labels=[str(r) for r in ratings])
- ax.set_xlabel("Physician Confidence Rating (DXCONFID)")
- ax.set_ylabel(metric_label)
- ax.set_title(f"{metric_label} vs Physician Confidence ({evaluation.backend})")
- ax.grid(True, axis="y", alpha=0.3)
- fig.tight_layout()
- plot_path = output_dir / f"physician_{metric_name}_boxplot.png"
- fig.savefig(plot_path)
- plt.close(fig)
- corr = float(
- pd.to_numeric(
- merged[[metric_col, col]].corr(method="spearman").iloc[0, 1],
- errors="coerce",
- )
- )
- correlations[metric_name] = corr
- plot_paths[metric_name] = str(plot_path)
- grouped = pd.DataFrame(grouped_rows)
- table_path = output_dir / "physician_grouped_metrics.csv"
- grouped.to_csv(table_path, index=False)
- confidence_table = output_dir / "physician_confidence_grouped_metrics.csv"
- std_table = output_dir / "physician_std_grouped_metrics.csv"
- secondary_table = output_dir / f"physician_{secondary_key}_grouped_metrics.csv"
- grouped[grouped["uncertainty_type"] == "confidence"].to_csv(
- confidence_table, index=False
- )
- grouped[grouped["uncertainty_type"] == secondary_key].to_csv(
- secondary_table, index=False
- )
- grouped[grouped["uncertainty_type"] == secondary_key].to_csv(std_table, index=False)
- out = {
- "n_overlap": int(len(merged)),
- "spearman_vs_dxconfid": correlations,
- "table": str(table_path),
- "tables": {
- "confidence": str(confidence_table),
- secondary_key: str(secondary_table),
- "std": str(std_table),
- },
- "plots": plot_paths,
- }
- write_json(output_dir / "physician_summary.json", out)
- return out
- def _normalize_dx(value: Any) -> str:
- if value is None or (isinstance(value, float) and np.isnan(value)):
- return ""
- v = str(value).strip().upper()
- if v in {"NL", "NORMAL"}:
- return "CN"
- return v
- def run_longitudinal(
- evaluation: BackendEvaluation,
- clinical_df: pd.DataFrame,
- output_dir: Path,
- ) -> dict[str, Any]:
- secondary_key = (
- "predictive_entropy"
- if evaluation.uncertainty_metric == "predictive_entropy"
- else "std"
- )
- secondary_label = (
- "Mean Model Predictive Entropy"
- if secondary_key == "predictive_entropy"
- else "Mean Model Uncertainty Std"
- )
- required = ["Image Data ID", "PTID"]
- missing = [c for c in required if c not in clinical_df.columns]
- if missing:
- raise KeyError(f"Missing columns for longitudinal analysis: {missing}")
- diagnosis_col = None
- for candidate in ["Class", "DX", "Diagnosis"]:
- if candidate in clinical_df.columns:
- diagnosis_col = candidate
- break
- if diagnosis_col is None:
- raise KeyError(
- "No diagnosis column found. Expected one of: Class, DX, Diagnosis"
- )
- work = clinical_df[
- ["Image Data ID", "PTID", diagnosis_col]
- + [c for c in ["EXAMDATE"] if c in clinical_df.columns]
- ].copy()
- work["Image Data ID"] = pd.to_numeric(work["Image Data ID"], errors="coerce")
- work = work.dropna(subset=["Image Data ID", "PTID"])
- work["Image Data ID"] = work["Image Data ID"].astype(int)
- work["PTID"] = work["PTID"].astype(str).str.strip()
- work["diagnosis"] = work[diagnosis_col].map(_normalize_dx)
- if "EXAMDATE" in work.columns:
- work["EXAMDATE"] = pd.to_datetime(work["EXAMDATE"], errors="coerce")
- work = work.sort_values(["PTID", "EXAMDATE"], na_position="last")
- else:
- work = work.sort_values(["PTID", "Image Data ID"])
- eval_df = pd.DataFrame(
- {
- "Image Data ID": evaluation.image_ids.astype(int),
- "model_confidence": evaluation.uncertainty_confidence,
- "model_std": evaluation.uncertainty_std,
- "model_prob": evaluation.y_prob,
- }
- )
- merged = work.merge(eval_df, on="Image Data ID", how="inner")
- if merged.empty:
- raise ValueError("No overlapping Image Data ID rows for longitudinal analysis")
- patient_rows: list[dict[str, Any]] = []
- for ptid, group in merged.groupby("PTID"):
- diagnoses = [d for d in group["diagnosis"].tolist() if d]
- if len(diagnoses) < 2:
- continue
- first_dx = diagnoses[0]
- last_dx = diagnoses[-1]
- unique_dx = set(diagnoses)
- cohort = "other"
- if unique_dx.issubset({"CN"}):
- cohort = "stable_cn"
- elif unique_dx.issubset({"AD"}):
- cohort = "stable_ad"
- elif first_dx == "CN" and "AD" in unique_dx and last_dx == "AD":
- cohort = "cn_to_ad"
- patient_rows.append(
- {
- "PTID": ptid,
- "n_visits": int(len(group)),
- "first_dx": first_dx,
- "last_dx": last_dx,
- "cohort": cohort,
- "mean_confidence": float(group["model_confidence"].mean()),
- "mean_std": float(group["model_std"].mean()),
- "mean_prob": float(group["model_prob"].mean()),
- }
- )
- patient_df = pd.DataFrame(patient_rows)
- table_path = output_dir / "longitudinal_patient_summary.csv"
- patient_df.to_csv(table_path, index=False)
- cohort_df = (
- patient_df.groupby("cohort")
- .agg(
- n_patients=("PTID", "count"),
- mean_confidence=("mean_confidence", "mean"),
- mean_std=("mean_std", "mean"),
- mean_prob=("mean_prob", "mean"),
- )
- .reset_index()
- )
- cohort_table = output_dir / "longitudinal_cohort_summary.csv"
- cohort_df.to_csv(cohort_table, index=False)
- cohorts = ["stable_cn", "stable_ad", "cn_to_ad"]
- uncertainty_specs = [
- ("confidence", "mean_confidence", "Mean Model Confidence"),
- (secondary_key, "mean_std", secondary_label),
- ]
- plot_paths: dict[str, str] = {}
- for metric_name, metric_col, metric_label in uncertainty_specs:
- fig, ax = plt.subplots(figsize=(9, 5))
- values = [
- np.asarray(
- patient_df.loc[patient_df["cohort"] == c, metric_col], dtype=float
- )
- for c in cohorts
- ]
- ax.boxplot(values, tick_labels=cohorts)
- ax.set_ylabel(metric_label)
- ax.set_title(f"Longitudinal Cohort {metric_label} ({evaluation.backend})")
- ax.grid(True, axis="y", alpha=0.3)
- fig.tight_layout()
- plot_path = output_dir / f"longitudinal_cohort_{metric_name}.png"
- fig.savefig(plot_path)
- plt.close(fig)
- plot_paths[metric_name] = str(plot_path)
- uncertainty_by_cohort = cohort_df.melt(
- id_vars=["cohort", "n_patients"],
- value_vars=["mean_confidence", "mean_std"],
- var_name="uncertainty_type",
- value_name="mean_value",
- ).replace(
- {
- "uncertainty_type": {
- "mean_confidence": "confidence",
- "mean_std": secondary_key,
- }
- }
- )
- uncertainty_table = output_dir / "longitudinal_uncertainty_by_cohort.csv"
- uncertainty_by_cohort.to_csv(uncertainty_table, index=False)
- confidence_patient_table = (
- output_dir / "longitudinal_confidence_patient_summary.csv"
- )
- std_patient_table = output_dir / "longitudinal_std_patient_summary.csv"
- confidence_cohort_table = output_dir / "longitudinal_confidence_cohort_summary.csv"
- std_cohort_table = output_dir / "longitudinal_std_cohort_summary.csv"
- secondary_patient_table = (
- output_dir / f"longitudinal_{secondary_key}_patient_summary.csv"
- )
- secondary_cohort_table = (
- output_dir / f"longitudinal_{secondary_key}_cohort_summary.csv"
- )
- patient_df[
- ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_confidence"]
- ].to_csv(confidence_patient_table, index=False)
- patient_df[
- ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_std"]
- ].to_csv(std_patient_table, index=False)
- patient_df[
- ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_std"]
- ].to_csv(secondary_patient_table, index=False)
- cohort_df[["cohort", "n_patients", "mean_confidence"]].to_csv(
- confidence_cohort_table, index=False
- )
- cohort_df[["cohort", "n_patients", "mean_std"]].to_csv(
- std_cohort_table, index=False
- )
- cohort_df[["cohort", "n_patients", "mean_std"]].to_csv(
- secondary_cohort_table, index=False
- )
- out = {
- "n_patients_analyzed": int(len(patient_df)),
- "table_patient": str(table_path),
- "table_cohort": str(cohort_table),
- "table_uncertainty": str(uncertainty_table),
- "tables": {
- "confidence": {
- "patient": str(confidence_patient_table),
- "cohort": str(confidence_cohort_table),
- },
- secondary_key: {
- "patient": str(secondary_patient_table),
- "cohort": str(secondary_cohort_table),
- },
- "std": {
- "patient": str(std_patient_table),
- "cohort": str(std_cohort_table),
- },
- },
- "plots": plot_paths,
- }
- write_json(output_dir / "longitudinal_summary.json", out)
- return out
|