# pyright: basic from __future__ import annotations from pathlib import Path from typing import Any import numpy as np import pandas as pd from .data_access import BackendEvaluation, physician_column from .defaults import DEFAULT_DECISION_THRESHOLD, uncertainty_cutoff_percentiles from .metrics import calibration_stats, performance_at_threshold, threshold_sweep from .plotting import ( plots_dir, save_boxplot, save_calibration_plot, save_performance_threshold_plot, save_uncertainty_cutoff_plot, ) from .runtime import write_json from .uncertainty import confidence_uncertainty def _save_table(rows: list[dict[str, Any]], out_path: Path) -> pd.DataFrame: df = pd.DataFrame(rows) out_path.parent.mkdir(parents=True, exist_ok=True) df.to_csv(out_path, index=False) return df def _uncertainty_percentiles(values: np.ndarray) -> np.ndarray: vals = np.asarray(values, dtype=float) n = len(vals) if n == 0: return np.asarray([], dtype=float) if n == 1: return np.asarray([0.0], dtype=float) order = np.argsort(vals) ranks = np.empty(n, dtype=float) ranks[order] = np.arange(n, dtype=float) return (ranks / float(n - 1)) * 100.0 def _save_ensemble_prediction_debug( evaluation: BackendEvaluation, output_dir: Path, ) -> str: uncertainty_vals = confidence_uncertainty(evaluation.y_prob) uncertainty_pct = _uncertainty_percentiles(uncertainty_vals) pred_label = (evaluation.y_prob >= DEFAULT_DECISION_THRESHOLD).astype(int) true_label = np.asarray(evaluation.y_true, dtype=int) is_correct = (pred_label == true_label).astype(int) debug_df = pd.DataFrame( { "image_id": evaluation.image_ids.astype(int), "predicted_probability": np.asarray(evaluation.y_prob, dtype=float), "predicted_label": pred_label, "actual_label": true_label, "is_correct": is_correct, "uncertainty": uncertainty_vals, "uncertainty_percentile": uncertainty_pct, } ).sort_values("uncertainty", ascending=True) path = output_dir / "ensemble_prediction_debug.csv" debug_df.to_csv(path, index=False) return str(path) def _uncertainty_cutoff_analysis( evaluation: BackendEvaluation, output_dir: Path, uncertainty_types: list[tuple[str, np.ndarray]], cutoff_percentiles: np.ndarray, keep_higher: bool, table_filename: str, plot_filename: str, title_prefix: str, x_label: str, selection_label: str, ) -> dict[str, Any]: cutoff_rows: list[dict[str, Any]] = [] for uncertainty_name, values in uncertainty_types: finite_mask = np.isfinite(values) if not finite_mask.any(): continue values_valid = values[finite_mask] y_true_valid = evaluation.y_true[finite_mask] y_prob_valid = evaluation.y_prob[finite_mask] for cutoff_percentile in cutoff_percentiles: cutoff_value = float(np.percentile(values_valid, cutoff_percentile)) if keep_higher: keep_mask = values_valid >= cutoff_value else: keep_mask = values_valid <= cutoff_value retained = int(keep_mask.sum()) if retained == 0: continue perf = performance_at_threshold( y_true=y_true_valid[keep_mask], y_prob=y_prob_valid[keep_mask], threshold=DEFAULT_DECISION_THRESHOLD, ) cutoff_rows.append( { "uncertainty_type": uncertainty_name, "cutoff_percentile": float(cutoff_percentile), "cutoff_value": cutoff_value, "n_retained": retained, "coverage": float(retained / len(values_valid)), "selection_rule": selection_label, "accuracy": float(perf["accuracy"]), "f1": float(perf["f1"]), } ) table_path = output_dir / table_filename plot_path = plots_dir(output_dir) / plot_filename if not cutoff_rows: return {"table": str(table_path), "plot": str(plot_path), "rows": 0} cutoff_df = pd.DataFrame(cutoff_rows) cutoff_df.to_csv(table_path, index=False) # Restriction level increases from left to right so right-most points are most restricted. if keep_higher: cutoff_df["restriction_level"] = cutoff_df["cutoff_percentile"] else: cutoff_df["restriction_level"] = 100.0 - cutoff_df["cutoff_percentile"] save_uncertainty_cutoff_plot( cutoff_df=cutoff_df, title_prefix=title_prefix, x_label=x_label, output_path=plot_path, ) return { "table": str(table_path), "plot": str(plot_path), "rows": int(len(cutoff_df)), } def run_performance( evaluation: BackendEvaluation, output_dir: Path, thresholds: np.ndarray, ) -> dict[str, Any]: rows = threshold_sweep(evaluation.y_true, evaluation.y_prob, thresholds) table_path = output_dir / "performance_threshold_sweep.csv" df = _save_table(rows, table_path) plot_path = plots_dir(output_dir) / "performance_threshold_sweep.png" save_performance_threshold_plot( df=df, backend=evaluation.backend, output_path=plot_path, ) best_idx = int(df["f1"].idxmax()) best = df.iloc[best_idx].to_dict() cutoff_percentiles = uncertainty_cutoff_percentiles() confidence_uncertainty_vals = confidence_uncertainty(evaluation.y_prob) secondary_uncertainty = np.asarray(evaluation.uncertainty_std, dtype=float) uncertainty_types = [ ("confidence_uncertainty", confidence_uncertainty_vals), (evaluation.uncertainty_metric, secondary_uncertainty), ] uncertainty_cutoff = _uncertainty_cutoff_analysis( evaluation=evaluation, output_dir=output_dir, uncertainty_types=uncertainty_types, cutoff_percentiles=cutoff_percentiles, keep_higher=False, table_filename="performance_uncertainty_cutoff.csv", plot_filename="performance_uncertainty_cutoff.png", title_prefix="Uncertainty Cutoff Percentile", x_label="Restriction Level (0 = all results, 100 = lowest-uncertainty subset)", selection_label="uncertainty <= percentile cutoff", ) percentile_cutoffs = uncertainty_cutoff_percentiles() uncertainty_percentile_cutoff = _uncertainty_cutoff_analysis( evaluation=evaluation, output_dir=output_dir, uncertainty_types=uncertainty_types, cutoff_percentiles=percentile_cutoffs, keep_higher=True, table_filename="performance_uncertainty_percentile_cutoff.csv", plot_filename="performance_uncertainty_percentile_cutoff.png", title_prefix="Uncertainty Percentile Floor", x_label="Uncertainty Percentile Floor (0 = all results, 100 = highest-uncertainty subset)", selection_label="uncertainty >= percentile floor", ) cutoff_table_path = Path(uncertainty_cutoff["table"]) cutoff_plot_path = Path(uncertainty_cutoff["plot"]) percentile_cutoff_table_path = Path(uncertainty_percentile_cutoff["table"]) percentile_cutoff_plot_path = Path(uncertainty_percentile_cutoff["plot"]) summary = { "best_by_f1": { k: float(v) for k, v in best.items() if isinstance(v, (int, float)) }, "table": str(table_path), "plot": str(plot_path), "uncertainty_cutoff": { "table": str(cutoff_table_path), "plot": str(cutoff_plot_path), "decision_threshold": DEFAULT_DECISION_THRESHOLD, }, "uncertainty_percentile_cutoff": { "table": str(percentile_cutoff_table_path), "plot": str(percentile_cutoff_plot_path), "decision_threshold": DEFAULT_DECISION_THRESHOLD, }, } if evaluation.backend == "ensemble": summary["ensemble_prediction_debug"] = _save_ensemble_prediction_debug( evaluation=evaluation, output_dir=output_dir, ) write_json(output_dir / "performance_summary.json", summary) return summary def run_calibration( evaluation: BackendEvaluation, output_dir: Path, bins: int, ) -> dict[str, Any]: summary, per_bin = calibration_stats( evaluation.y_true, evaluation.y_prob, bins=bins ) bin_df = pd.DataFrame( per_bin, columns=["mean_confidence", "fraction_positive", "count"], ) table_path = output_dir / "calibration_bins.csv" bin_df.to_csv(table_path, index=False) plot_path = plots_dir(output_dir) / "calibration_reliability.png" save_calibration_plot( per_bin=per_bin, backend=evaluation.backend, output_path=plot_path, ) out = { **summary, "table": str(table_path), "plot": str(plot_path), } write_json(output_dir / "calibration_summary.json", out) return out def run_physician( evaluation: BackendEvaluation, clinical_df: pd.DataFrame, output_dir: Path, ) -> dict[str, Any]: secondary_key = ( "predictive_entropy" if evaluation.uncertainty_metric == "predictive_entropy" else "std" ) secondary_label = ( "Model Predictive Entropy" if secondary_key == "predictive_entropy" else "Model Uncertainty Std" ) col = physician_column(clinical_df) subset = clinical_df[["Image Data ID", col]].copy() subset[col] = pd.to_numeric(subset[col], errors="coerce") subset = subset.dropna(subset=["Image Data ID", col]) subset["Image Data ID"] = subset["Image Data ID"].astype(int) subset[col] = subset[col].astype(int) eval_df = pd.DataFrame( { "Image Data ID": evaluation.image_ids.astype(int), "model_confidence": evaluation.uncertainty_confidence, "model_std": evaluation.uncertainty_std, "model_prob": evaluation.y_prob, } ) merged = eval_df.merge(subset, on="Image Data ID", how="inner") if merged.empty: raise ValueError("No overlapping Image Data ID rows for physician analysis") grouped_rows: list[dict[str, Any]] = [] uncertainty_specs = [ ("confidence", "model_confidence", "Model Confidence (2*|p-0.5|)"), (secondary_key, "model_std", secondary_label), ] ratings = [int(r) for r in sorted(pd.unique(merged[col]))] plot_paths: dict[str, str] = {} correlations: dict[str, float] = {} for metric_name, metric_col, metric_label in uncertainty_specs: grouped_metric = ( merged.groupby(col) .agg( n=("Image Data ID", "count"), mean_value=(metric_col, "mean"), std_value=(metric_col, "std"), mean_prob=("model_prob", "mean"), ) .reset_index() .rename(columns={col: "physician_rating"}) ) grouped_metric["uncertainty_type"] = metric_name grouped_rows.extend( [ {str(k): v for k, v in rec.items()} for rec in grouped_metric.to_dict(orient="records") ] ) data = [ np.asarray(merged.loc[merged[col] == r, metric_col], dtype=float) for r in ratings ] plot_path = plots_dir(output_dir) / f"physician_{metric_name}_boxplot.png" save_boxplot( data=data, tick_labels=[str(r) for r in ratings], x_label="Physician Confidence Rating (DXCONFID)", y_label=metric_label, title=f"{metric_label} vs Physician Confidence ({evaluation.backend})", output_path=plot_path, ) corr = float( pd.to_numeric( merged[[metric_col, col]].corr(method="spearman").iloc[0, 1], errors="coerce", ) ) correlations[metric_name] = corr plot_paths[metric_name] = str(plot_path) grouped = pd.DataFrame(grouped_rows) table_path = output_dir / "physician_grouped_metrics.csv" grouped.to_csv(table_path, index=False) confidence_table = output_dir / "physician_confidence_grouped_metrics.csv" std_table = output_dir / "physician_std_grouped_metrics.csv" secondary_table = output_dir / f"physician_{secondary_key}_grouped_metrics.csv" grouped[grouped["uncertainty_type"] == "confidence"].to_csv( confidence_table, index=False ) grouped[grouped["uncertainty_type"] == secondary_key].to_csv( secondary_table, index=False ) grouped[grouped["uncertainty_type"] == secondary_key].to_csv(std_table, index=False) out = { "n_overlap": int(len(merged)), "spearman_vs_dxconfid": correlations, "table": str(table_path), "tables": { "confidence": str(confidence_table), secondary_key: str(secondary_table), "std": str(std_table), }, "plots": plot_paths, } write_json(output_dir / "physician_summary.json", out) return out def _normalize_dx(value: Any) -> str: if value is None or (isinstance(value, float) and np.isnan(value)): return "" v = str(value).strip().upper() if v in {"NL", "NORMAL"}: return "CN" return v def run_longitudinal( evaluation: BackendEvaluation, clinical_df: pd.DataFrame, output_dir: Path, ) -> dict[str, Any]: secondary_key = ( "predictive_entropy" if evaluation.uncertainty_metric == "predictive_entropy" else "std" ) secondary_label = ( "Mean Model Predictive Entropy" if secondary_key == "predictive_entropy" else "Mean Model Uncertainty Std" ) required = ["Image Data ID", "PTID"] missing = [c for c in required if c not in clinical_df.columns] if missing: raise KeyError(f"Missing columns for longitudinal analysis: {missing}") diagnosis_col = None for candidate in ["Class", "DX", "Diagnosis"]: if candidate in clinical_df.columns: diagnosis_col = candidate break if diagnosis_col is None: raise KeyError( "No diagnosis column found. Expected one of: Class, DX, Diagnosis" ) work = clinical_df[ ["Image Data ID", "PTID", diagnosis_col] + [c for c in ["EXAMDATE"] if c in clinical_df.columns] ].copy() work["Image Data ID"] = pd.to_numeric(work["Image Data ID"], errors="coerce") work = work.dropna(subset=["Image Data ID", "PTID"]) work["Image Data ID"] = work["Image Data ID"].astype(int) work["PTID"] = work["PTID"].astype(str).str.strip() work["diagnosis"] = work[diagnosis_col].map(_normalize_dx) if "EXAMDATE" in work.columns: work["EXAMDATE"] = pd.to_datetime(work["EXAMDATE"], errors="coerce") work = work.sort_values(["PTID", "EXAMDATE"], na_position="last") else: work = work.sort_values(["PTID", "Image Data ID"]) eval_df = pd.DataFrame( { "Image Data ID": evaluation.image_ids.astype(int), "model_confidence": evaluation.uncertainty_confidence, "model_std": evaluation.uncertainty_std, "model_prob": evaluation.y_prob, } ) merged = work.merge(eval_df, on="Image Data ID", how="inner") if merged.empty: raise ValueError("No overlapping Image Data ID rows for longitudinal analysis") patient_rows: list[dict[str, Any]] = [] for ptid, group in merged.groupby("PTID"): diagnoses = [d for d in group["diagnosis"].tolist() if d] if len(diagnoses) < 2: continue first_dx = diagnoses[0] last_dx = diagnoses[-1] unique_dx = set(diagnoses) cohort = "other" if unique_dx.issubset({"CN"}): cohort = "stable_cn" elif unique_dx.issubset({"AD"}): cohort = "stable_ad" elif first_dx == "CN" and "AD" in unique_dx and last_dx == "AD": cohort = "cn_to_ad" patient_rows.append( { "PTID": ptid, "n_visits": int(len(group)), "first_dx": first_dx, "last_dx": last_dx, "cohort": cohort, "mean_confidence": float(group["model_confidence"].mean()), "mean_std": float(group["model_std"].mean()), "mean_prob": float(group["model_prob"].mean()), } ) patient_df = pd.DataFrame(patient_rows) table_path = output_dir / "longitudinal_patient_summary.csv" patient_df.to_csv(table_path, index=False) cohort_df = ( patient_df.groupby("cohort") .agg( n_patients=("PTID", "count"), mean_confidence=("mean_confidence", "mean"), mean_std=("mean_std", "mean"), mean_prob=("mean_prob", "mean"), ) .reset_index() ) cohort_table = output_dir / "longitudinal_cohort_summary.csv" cohort_df.to_csv(cohort_table, index=False) cohorts = ["stable_cn", "stable_ad", "cn_to_ad"] uncertainty_specs = [ ("confidence", "mean_confidence", "Mean Model Confidence"), (secondary_key, "mean_std", secondary_label), ] plot_paths: dict[str, str] = {} for metric_name, metric_col, metric_label in uncertainty_specs: values = [ np.asarray( patient_df.loc[patient_df["cohort"] == c, metric_col], dtype=float ) for c in cohorts ] plot_path = plots_dir(output_dir) / f"longitudinal_cohort_{metric_name}.png" save_boxplot( data=values, tick_labels=cohorts, x_label="Cohort", y_label=metric_label, title=f"Longitudinal Cohort {metric_label} ({evaluation.backend})", output_path=plot_path, ) plot_paths[metric_name] = str(plot_path) uncertainty_by_cohort = cohort_df.melt( id_vars=["cohort", "n_patients"], value_vars=["mean_confidence", "mean_std"], var_name="uncertainty_type", value_name="mean_value", ).replace( { "uncertainty_type": { "mean_confidence": "confidence", "mean_std": secondary_key, } } ) uncertainty_table = output_dir / "longitudinal_uncertainty_by_cohort.csv" uncertainty_by_cohort.to_csv(uncertainty_table, index=False) confidence_patient_table = ( output_dir / "longitudinal_confidence_patient_summary.csv" ) std_patient_table = output_dir / "longitudinal_std_patient_summary.csv" confidence_cohort_table = output_dir / "longitudinal_confidence_cohort_summary.csv" std_cohort_table = output_dir / "longitudinal_std_cohort_summary.csv" secondary_patient_table = ( output_dir / f"longitudinal_{secondary_key}_patient_summary.csv" ) secondary_cohort_table = ( output_dir / f"longitudinal_{secondary_key}_cohort_summary.csv" ) patient_df[ ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_confidence"] ].to_csv(confidence_patient_table, index=False) patient_df[ ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_std"] ].to_csv(std_patient_table, index=False) patient_df[ ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_std"] ].to_csv(secondary_patient_table, index=False) cohort_df[["cohort", "n_patients", "mean_confidence"]].to_csv( confidence_cohort_table, index=False ) cohort_df[["cohort", "n_patients", "mean_std"]].to_csv( std_cohort_table, index=False ) cohort_df[["cohort", "n_patients", "mean_std"]].to_csv( secondary_cohort_table, index=False ) out = { "n_patients_analyzed": int(len(patient_df)), "table_patient": str(table_path), "table_cohort": str(cohort_table), "table_uncertainty": str(uncertainty_table), "tables": { "confidence": { "patient": str(confidence_patient_table), "cohort": str(confidence_cohort_table), }, secondary_key: { "patient": str(secondary_patient_table), "cohort": str(secondary_cohort_table), }, "std": { "patient": str(std_patient_table), "cohort": str(std_cohort_table), }, }, "plots": plot_paths, } write_json(output_dir / "longitudinal_summary.json", out) return out