| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599 |
- # pyright: basic
- from __future__ import annotations
- from pathlib import Path
- from typing import Any
- import numpy as np
- import pandas as pd
- from .data_access import BackendEvaluation, physician_column
- from .defaults import DEFAULT_DECISION_THRESHOLD, uncertainty_cutoff_percentiles
- from .metrics import calibration_stats, performance_at_threshold, threshold_sweep
- from .plotting import (
- plots_dir,
- save_boxplot,
- save_calibration_plot,
- save_performance_threshold_plot,
- save_uncertainty_cutoff_plot,
- )
- from .runtime import write_json
- from .uncertainty import confidence_uncertainty
- def _save_table(rows: list[dict[str, Any]], out_path: Path) -> pd.DataFrame:
- df = pd.DataFrame(rows)
- out_path.parent.mkdir(parents=True, exist_ok=True)
- df.to_csv(out_path, index=False)
- return df
- def _uncertainty_percentiles(values: np.ndarray) -> np.ndarray:
- vals = np.asarray(values, dtype=float)
- n = len(vals)
- if n == 0:
- return np.asarray([], dtype=float)
- if n == 1:
- return np.asarray([0.0], dtype=float)
- order = np.argsort(vals)
- ranks = np.empty(n, dtype=float)
- ranks[order] = np.arange(n, dtype=float)
- return (ranks / float(n - 1)) * 100.0
- def _save_ensemble_prediction_debug(
- evaluation: BackendEvaluation,
- output_dir: Path,
- ) -> str:
- uncertainty_vals = confidence_uncertainty(evaluation.y_prob)
- uncertainty_pct = _uncertainty_percentiles(uncertainty_vals)
- pred_label = (evaluation.y_prob >= DEFAULT_DECISION_THRESHOLD).astype(int)
- true_label = np.asarray(evaluation.y_true, dtype=int)
- is_correct = (pred_label == true_label).astype(int)
- debug_df = pd.DataFrame(
- {
- "image_id": evaluation.image_ids.astype(int),
- "predicted_probability": np.asarray(evaluation.y_prob, dtype=float),
- "predicted_label": pred_label,
- "actual_label": true_label,
- "is_correct": is_correct,
- "uncertainty": uncertainty_vals,
- "uncertainty_percentile": uncertainty_pct,
- }
- ).sort_values("uncertainty", ascending=True)
- path = output_dir / "ensemble_prediction_debug.csv"
- debug_df.to_csv(path, index=False)
- return str(path)
- def _uncertainty_cutoff_analysis(
- evaluation: BackendEvaluation,
- output_dir: Path,
- uncertainty_types: list[tuple[str, np.ndarray]],
- cutoff_percentiles: np.ndarray,
- keep_higher: bool,
- table_filename: str,
- plot_filename: str,
- title_prefix: str,
- x_label: str,
- selection_label: str,
- ) -> dict[str, Any]:
- cutoff_rows: list[dict[str, Any]] = []
- for uncertainty_name, values in uncertainty_types:
- finite_mask = np.isfinite(values)
- if not finite_mask.any():
- continue
- values_valid = values[finite_mask]
- y_true_valid = evaluation.y_true[finite_mask]
- y_prob_valid = evaluation.y_prob[finite_mask]
- for cutoff_percentile in cutoff_percentiles:
- cutoff_value = float(np.percentile(values_valid, cutoff_percentile))
- if keep_higher:
- keep_mask = values_valid >= cutoff_value
- else:
- keep_mask = values_valid <= cutoff_value
- retained = int(keep_mask.sum())
- if retained == 0:
- continue
- perf = performance_at_threshold(
- y_true=y_true_valid[keep_mask],
- y_prob=y_prob_valid[keep_mask],
- threshold=DEFAULT_DECISION_THRESHOLD,
- )
- cutoff_rows.append(
- {
- "uncertainty_type": uncertainty_name,
- "cutoff_percentile": float(cutoff_percentile),
- "cutoff_value": cutoff_value,
- "n_retained": retained,
- "coverage": float(retained / len(values_valid)),
- "selection_rule": selection_label,
- "accuracy": float(perf["accuracy"]),
- "f1": float(perf["f1"]),
- }
- )
- table_path = output_dir / table_filename
- plot_path = plots_dir(output_dir) / plot_filename
- if not cutoff_rows:
- return {"table": str(table_path), "plot": str(plot_path), "rows": 0}
- cutoff_df = pd.DataFrame(cutoff_rows)
- cutoff_df.to_csv(table_path, index=False)
- # Restriction level increases from left to right so right-most points are most restricted.
- if keep_higher:
- cutoff_df["restriction_level"] = cutoff_df["cutoff_percentile"]
- else:
- cutoff_df["restriction_level"] = 100.0 - cutoff_df["cutoff_percentile"]
- save_uncertainty_cutoff_plot(
- cutoff_df=cutoff_df,
- title_prefix=title_prefix,
- x_label=x_label,
- output_path=plot_path,
- )
- return {
- "table": str(table_path),
- "plot": str(plot_path),
- "rows": int(len(cutoff_df)),
- }
- def run_performance(
- evaluation: BackendEvaluation,
- output_dir: Path,
- thresholds: np.ndarray,
- ) -> dict[str, Any]:
- rows = threshold_sweep(evaluation.y_true, evaluation.y_prob, thresholds)
- table_path = output_dir / "performance_threshold_sweep.csv"
- df = _save_table(rows, table_path)
- plot_path = plots_dir(output_dir) / "performance_threshold_sweep.png"
- save_performance_threshold_plot(
- df=df,
- backend=evaluation.backend,
- output_path=plot_path,
- )
- best_idx = int(df["f1"].idxmax())
- best = df.iloc[best_idx].to_dict()
- cutoff_percentiles = uncertainty_cutoff_percentiles()
- confidence_uncertainty_vals = confidence_uncertainty(evaluation.y_prob)
- secondary_uncertainty = np.asarray(evaluation.uncertainty_std, dtype=float)
- uncertainty_types = [
- ("confidence_uncertainty", confidence_uncertainty_vals),
- (evaluation.uncertainty_metric, secondary_uncertainty),
- ]
- uncertainty_cutoff = _uncertainty_cutoff_analysis(
- evaluation=evaluation,
- output_dir=output_dir,
- uncertainty_types=uncertainty_types,
- cutoff_percentiles=cutoff_percentiles,
- keep_higher=False,
- table_filename="performance_uncertainty_cutoff.csv",
- plot_filename="performance_uncertainty_cutoff.png",
- title_prefix="Uncertainty Cutoff Percentile",
- x_label="Restriction Level (0 = all results, 100 = lowest-uncertainty subset)",
- selection_label="uncertainty <= percentile cutoff",
- )
- percentile_cutoffs = uncertainty_cutoff_percentiles()
- uncertainty_percentile_cutoff = _uncertainty_cutoff_analysis(
- evaluation=evaluation,
- output_dir=output_dir,
- uncertainty_types=uncertainty_types,
- cutoff_percentiles=percentile_cutoffs,
- keep_higher=True,
- table_filename="performance_uncertainty_percentile_cutoff.csv",
- plot_filename="performance_uncertainty_percentile_cutoff.png",
- title_prefix="Uncertainty Percentile Floor",
- x_label="Uncertainty Percentile Floor (0 = all results, 100 = highest-uncertainty subset)",
- selection_label="uncertainty >= percentile floor",
- )
- cutoff_table_path = Path(uncertainty_cutoff["table"])
- cutoff_plot_path = Path(uncertainty_cutoff["plot"])
- percentile_cutoff_table_path = Path(uncertainty_percentile_cutoff["table"])
- percentile_cutoff_plot_path = Path(uncertainty_percentile_cutoff["plot"])
- summary = {
- "best_by_f1": {
- k: float(v) for k, v in best.items() if isinstance(v, (int, float))
- },
- "table": str(table_path),
- "plot": str(plot_path),
- "uncertainty_cutoff": {
- "table": str(cutoff_table_path),
- "plot": str(cutoff_plot_path),
- "decision_threshold": DEFAULT_DECISION_THRESHOLD,
- },
- "uncertainty_percentile_cutoff": {
- "table": str(percentile_cutoff_table_path),
- "plot": str(percentile_cutoff_plot_path),
- "decision_threshold": DEFAULT_DECISION_THRESHOLD,
- },
- }
- if evaluation.backend == "ensemble":
- summary["ensemble_prediction_debug"] = _save_ensemble_prediction_debug(
- evaluation=evaluation,
- output_dir=output_dir,
- )
- write_json(output_dir / "performance_summary.json", summary)
- return summary
- def run_calibration(
- evaluation: BackendEvaluation,
- output_dir: Path,
- bins: int,
- ) -> dict[str, Any]:
- summary, per_bin = calibration_stats(
- evaluation.y_true, evaluation.y_prob, bins=bins
- )
- bin_df = pd.DataFrame(
- per_bin,
- columns=["mean_confidence", "fraction_positive", "count"],
- )
- table_path = output_dir / "calibration_bins.csv"
- bin_df.to_csv(table_path, index=False)
- plot_path = plots_dir(output_dir) / "calibration_reliability.png"
- save_calibration_plot(
- per_bin=per_bin,
- backend=evaluation.backend,
- output_path=plot_path,
- )
- out = {
- **summary,
- "table": str(table_path),
- "plot": str(plot_path),
- }
- write_json(output_dir / "calibration_summary.json", out)
- return out
- def run_physician(
- evaluation: BackendEvaluation,
- clinical_df: pd.DataFrame,
- output_dir: Path,
- ) -> dict[str, Any]:
- secondary_key = (
- "predictive_entropy"
- if evaluation.uncertainty_metric == "predictive_entropy"
- else "std"
- )
- secondary_label = (
- "Model Predictive Entropy"
- if secondary_key == "predictive_entropy"
- else "Model Uncertainty Std"
- )
- col = physician_column(clinical_df)
- subset = clinical_df[["Image Data ID", col]].copy()
- subset[col] = pd.to_numeric(subset[col], errors="coerce")
- subset = subset.dropna(subset=["Image Data ID", col])
- subset["Image Data ID"] = subset["Image Data ID"].astype(int)
- subset[col] = subset[col].astype(int)
- eval_df = pd.DataFrame(
- {
- "Image Data ID": evaluation.image_ids.astype(int),
- "model_confidence": evaluation.uncertainty_confidence,
- "model_std": evaluation.uncertainty_std,
- "model_prob": evaluation.y_prob,
- }
- )
- merged = eval_df.merge(subset, on="Image Data ID", how="inner")
- if merged.empty:
- raise ValueError("No overlapping Image Data ID rows for physician analysis")
- grouped_rows: list[dict[str, Any]] = []
- uncertainty_specs = [
- ("confidence", "model_confidence", "Model Confidence (2*|p-0.5|)"),
- (secondary_key, "model_std", secondary_label),
- ]
- ratings = [int(r) for r in sorted(pd.unique(merged[col]))]
- plot_paths: dict[str, str] = {}
- correlations: dict[str, float] = {}
- for metric_name, metric_col, metric_label in uncertainty_specs:
- grouped_metric = (
- merged.groupby(col)
- .agg(
- n=("Image Data ID", "count"),
- mean_value=(metric_col, "mean"),
- std_value=(metric_col, "std"),
- mean_prob=("model_prob", "mean"),
- )
- .reset_index()
- .rename(columns={col: "physician_rating"})
- )
- grouped_metric["uncertainty_type"] = metric_name
- grouped_rows.extend(
- [
- {str(k): v for k, v in rec.items()}
- for rec in grouped_metric.to_dict(orient="records")
- ]
- )
- data = [
- np.asarray(merged.loc[merged[col] == r, metric_col], dtype=float)
- for r in ratings
- ]
- plot_path = plots_dir(output_dir) / f"physician_{metric_name}_boxplot.png"
- save_boxplot(
- data=data,
- tick_labels=[str(r) for r in ratings],
- x_label="Physician Confidence Rating (DXCONFID)",
- y_label=metric_label,
- title=f"{metric_label} vs Physician Confidence ({evaluation.backend})",
- output_path=plot_path,
- )
- corr = float(
- pd.to_numeric(
- merged[[metric_col, col]].corr(method="spearman").iloc[0, 1],
- errors="coerce",
- )
- )
- correlations[metric_name] = corr
- plot_paths[metric_name] = str(plot_path)
- grouped = pd.DataFrame(grouped_rows)
- table_path = output_dir / "physician_grouped_metrics.csv"
- grouped.to_csv(table_path, index=False)
- confidence_table = output_dir / "physician_confidence_grouped_metrics.csv"
- std_table = output_dir / "physician_std_grouped_metrics.csv"
- secondary_table = output_dir / f"physician_{secondary_key}_grouped_metrics.csv"
- grouped[grouped["uncertainty_type"] == "confidence"].to_csv(
- confidence_table, index=False
- )
- grouped[grouped["uncertainty_type"] == secondary_key].to_csv(
- secondary_table, index=False
- )
- grouped[grouped["uncertainty_type"] == secondary_key].to_csv(std_table, index=False)
- out = {
- "n_overlap": int(len(merged)),
- "spearman_vs_dxconfid": correlations,
- "table": str(table_path),
- "tables": {
- "confidence": str(confidence_table),
- secondary_key: str(secondary_table),
- "std": str(std_table),
- },
- "plots": plot_paths,
- }
- write_json(output_dir / "physician_summary.json", out)
- return out
- def _normalize_dx(value: Any) -> str:
- if value is None or (isinstance(value, float) and np.isnan(value)):
- return ""
- v = str(value).strip().upper()
- if v in {"NL", "NORMAL"}:
- return "CN"
- return v
- def run_longitudinal(
- evaluation: BackendEvaluation,
- clinical_df: pd.DataFrame,
- output_dir: Path,
- ) -> dict[str, Any]:
- secondary_key = (
- "predictive_entropy"
- if evaluation.uncertainty_metric == "predictive_entropy"
- else "std"
- )
- secondary_label = (
- "Mean Model Predictive Entropy"
- if secondary_key == "predictive_entropy"
- else "Mean Model Uncertainty Std"
- )
- required = ["Image Data ID", "PTID"]
- missing = [c for c in required if c not in clinical_df.columns]
- if missing:
- raise KeyError(f"Missing columns for longitudinal analysis: {missing}")
- diagnosis_col = None
- for candidate in ["Class", "DX", "Diagnosis"]:
- if candidate in clinical_df.columns:
- diagnosis_col = candidate
- break
- if diagnosis_col is None:
- raise KeyError(
- "No diagnosis column found. Expected one of: Class, DX, Diagnosis"
- )
- work = clinical_df[
- ["Image Data ID", "PTID", diagnosis_col]
- + [c for c in ["EXAMDATE"] if c in clinical_df.columns]
- ].copy()
- work["Image Data ID"] = pd.to_numeric(work["Image Data ID"], errors="coerce")
- work = work.dropna(subset=["Image Data ID", "PTID"])
- work["Image Data ID"] = work["Image Data ID"].astype(int)
- work["PTID"] = work["PTID"].astype(str).str.strip()
- work["diagnosis"] = work[diagnosis_col].map(_normalize_dx)
- if "EXAMDATE" in work.columns:
- work["EXAMDATE"] = pd.to_datetime(work["EXAMDATE"], errors="coerce")
- work = work.sort_values(["PTID", "EXAMDATE"], na_position="last")
- else:
- work = work.sort_values(["PTID", "Image Data ID"])
- eval_df = pd.DataFrame(
- {
- "Image Data ID": evaluation.image_ids.astype(int),
- "model_confidence": evaluation.uncertainty_confidence,
- "model_std": evaluation.uncertainty_std,
- "model_prob": evaluation.y_prob,
- }
- )
- merged = work.merge(eval_df, on="Image Data ID", how="inner")
- if merged.empty:
- raise ValueError("No overlapping Image Data ID rows for longitudinal analysis")
- patient_rows: list[dict[str, Any]] = []
- for ptid, group in merged.groupby("PTID"):
- diagnoses = [d for d in group["diagnosis"].tolist() if d]
- if len(diagnoses) < 2:
- continue
- first_dx = diagnoses[0]
- last_dx = diagnoses[-1]
- unique_dx = set(diagnoses)
- cohort = "other"
- if unique_dx.issubset({"CN"}):
- cohort = "stable_cn"
- elif unique_dx.issubset({"AD"}):
- cohort = "stable_ad"
- elif first_dx == "CN" and "AD" in unique_dx and last_dx == "AD":
- cohort = "cn_to_ad"
- patient_rows.append(
- {
- "PTID": ptid,
- "n_visits": int(len(group)),
- "first_dx": first_dx,
- "last_dx": last_dx,
- "cohort": cohort,
- "mean_confidence": float(group["model_confidence"].mean()),
- "mean_std": float(group["model_std"].mean()),
- "mean_prob": float(group["model_prob"].mean()),
- }
- )
- patient_df = pd.DataFrame(patient_rows)
- table_path = output_dir / "longitudinal_patient_summary.csv"
- patient_df.to_csv(table_path, index=False)
- cohort_df = (
- patient_df.groupby("cohort")
- .agg(
- n_patients=("PTID", "count"),
- mean_confidence=("mean_confidence", "mean"),
- mean_std=("mean_std", "mean"),
- mean_prob=("mean_prob", "mean"),
- )
- .reset_index()
- )
- cohort_table = output_dir / "longitudinal_cohort_summary.csv"
- cohort_df.to_csv(cohort_table, index=False)
- cohorts = ["stable_cn", "stable_ad", "cn_to_ad"]
- uncertainty_specs = [
- ("confidence", "mean_confidence", "Mean Model Confidence"),
- (secondary_key, "mean_std", secondary_label),
- ]
- plot_paths: dict[str, str] = {}
- for metric_name, metric_col, metric_label in uncertainty_specs:
- values = [
- np.asarray(
- patient_df.loc[patient_df["cohort"] == c, metric_col], dtype=float
- )
- for c in cohorts
- ]
- plot_path = plots_dir(output_dir) / f"longitudinal_cohort_{metric_name}.png"
- save_boxplot(
- data=values,
- tick_labels=cohorts,
- x_label="Cohort",
- y_label=metric_label,
- title=f"Longitudinal Cohort {metric_label} ({evaluation.backend})",
- output_path=plot_path,
- )
- plot_paths[metric_name] = str(plot_path)
- uncertainty_by_cohort = cohort_df.melt(
- id_vars=["cohort", "n_patients"],
- value_vars=["mean_confidence", "mean_std"],
- var_name="uncertainty_type",
- value_name="mean_value",
- ).replace(
- {
- "uncertainty_type": {
- "mean_confidence": "confidence",
- "mean_std": secondary_key,
- }
- }
- )
- uncertainty_table = output_dir / "longitudinal_uncertainty_by_cohort.csv"
- uncertainty_by_cohort.to_csv(uncertainty_table, index=False)
- confidence_patient_table = (
- output_dir / "longitudinal_confidence_patient_summary.csv"
- )
- std_patient_table = output_dir / "longitudinal_std_patient_summary.csv"
- confidence_cohort_table = output_dir / "longitudinal_confidence_cohort_summary.csv"
- std_cohort_table = output_dir / "longitudinal_std_cohort_summary.csv"
- secondary_patient_table = (
- output_dir / f"longitudinal_{secondary_key}_patient_summary.csv"
- )
- secondary_cohort_table = (
- output_dir / f"longitudinal_{secondary_key}_cohort_summary.csv"
- )
- patient_df[
- ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_confidence"]
- ].to_csv(confidence_patient_table, index=False)
- patient_df[
- ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_std"]
- ].to_csv(std_patient_table, index=False)
- patient_df[
- ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_std"]
- ].to_csv(secondary_patient_table, index=False)
- cohort_df[["cohort", "n_patients", "mean_confidence"]].to_csv(
- confidence_cohort_table, index=False
- )
- cohort_df[["cohort", "n_patients", "mean_std"]].to_csv(
- std_cohort_table, index=False
- )
- cohort_df[["cohort", "n_patients", "mean_std"]].to_csv(
- secondary_cohort_table, index=False
- )
- out = {
- "n_patients_analyzed": int(len(patient_df)),
- "table_patient": str(table_path),
- "table_cohort": str(cohort_table),
- "table_uncertainty": str(uncertainty_table),
- "tables": {
- "confidence": {
- "patient": str(confidence_patient_table),
- "cohort": str(confidence_cohort_table),
- },
- secondary_key: {
- "patient": str(secondary_patient_table),
- "cohort": str(secondary_cohort_table),
- },
- "std": {
- "patient": str(std_patient_table),
- "cohort": str(std_cohort_table),
- },
- },
- "plots": plot_paths,
- }
- write_json(output_dir / "longitudinal_summary.json", out)
- return out
|