# pyright: basic from __future__ import annotations from pathlib import Path import re from typing import Any import numpy as np import pandas as pd from .data_access import BackendEvaluation, physician_column from .defaults import DEFAULT_DECISION_THRESHOLD, uncertainty_cutoff_percentiles from .metrics import calibration_stats, performance_at_threshold, threshold_sweep from .plotting import ( plots_dir, save_boxplot, save_calibration_plot, save_performance_threshold_plot, save_performance_threshold_pair_plot, save_uncertainty_cutoff_plot, save_uncertainty_cutoff_pair_plot, ) from .runtime import write_json def _save_table(rows: list[dict[str, Any]], out_path: Path) -> pd.DataFrame: df = pd.DataFrame(rows) out_path.parent.mkdir(parents=True, exist_ok=True) df.to_csv(out_path, index=False) return df def _uncertainty_percentiles(values: np.ndarray) -> np.ndarray: vals = np.asarray(values, dtype=float) n = len(vals) if n == 0: return np.asarray([], dtype=float) if n == 1: return np.asarray([0.0], dtype=float) order = np.argsort(vals) ranks = np.empty(n, dtype=float) ranks[order] = np.arange(0, n, dtype=float) return (ranks / float(n - 1)) * 100.0 def _save_ensemble_prediction_debug( evaluation: BackendEvaluation, output_dir: Path, ) -> str: uncertainty_vals = np.asarray(evaluation.uncertainty_confidence, dtype=float) uncertainty_pct = _uncertainty_percentiles(uncertainty_vals) pred_label = (evaluation.y_prob >= DEFAULT_DECISION_THRESHOLD).astype(int) true_label = np.asarray(evaluation.y_true, dtype=int) is_correct = (pred_label == true_label).astype(int) debug_df = pd.DataFrame( { "image_id": evaluation.image_ids.astype(int), "predicted_probability": np.asarray(evaluation.y_prob, dtype=float), "predicted_label": pred_label, "actual_label": true_label, "is_correct": is_correct, "confidence": uncertainty_vals, "confidence_percentile": uncertainty_pct, } ).sort_values("confidence", ascending=False) path = output_dir / "ensemble_prediction_debug.csv" debug_df.to_csv(path, index=False) return str(path) def _uncertainty_cutoff_analysis( evaluation: BackendEvaluation, output_dir: Path, uncertainty_types: list[tuple[str, np.ndarray]], cutoff_percentiles: np.ndarray, table_filename: str, plot_filename_prefix: str, title_prefix: str, x_label: str, ) -> dict[str, Any]: def _slug(name: str) -> str: s = re.sub(r"[^a-z0-9]+", "_", name.strip().lower()) return s.strip("_") or "metric" def _confidence_like(name: str) -> bool: return "confidence" in name.strip().lower() cutoff_rows: list[dict[str, Any]] = [] for uncertainty_name, values in uncertainty_types: finite_mask = np.isfinite(values) if not finite_mask.any(): continue values_valid = values[finite_mask] y_true_valid = evaluation.y_true[finite_mask] y_prob_valid = evaluation.y_prob[finite_mask] keep_higher = _confidence_like(uncertainty_name) selection_label = ( "metric >= percentile cutoff (higher is lower uncertainty)" if keep_higher else "metric <= percentile cutoff (lower is lower uncertainty)" ) for cutoff_percentile in cutoff_percentiles: cutoff_value = float(np.percentile(values_valid, cutoff_percentile)) if keep_higher: keep_mask = values_valid >= cutoff_value else: keep_mask = values_valid <= cutoff_value retained = int(keep_mask.sum()) if retained == 0: continue perf = performance_at_threshold( y_true=y_true_valid[keep_mask], y_prob=y_prob_valid[keep_mask], threshold=DEFAULT_DECISION_THRESHOLD, ) cutoff_rows.append( { "uncertainty_type": uncertainty_name, "cutoff_percentile": float(cutoff_percentile), "cutoff_value": cutoff_value, "n_retained": retained, "coverage": float(retained / len(values_valid)), "selection_rule": selection_label, "keep_higher": bool(keep_higher), "accuracy": float(perf["accuracy"]), "f1": float(perf["f1"]), "n_correct": int(perf["tp"] + perf["tn"]), "n_incorrect": int(perf["fp"] + perf["fn"]), } ) table_path = output_dir / table_filename accuracy_plot_path = plots_dir(output_dir) / f"{plot_filename_prefix}_accuracy.png" f1_plot_path = plots_dir(output_dir) / f"{plot_filename_prefix}_f1.png" pair_plot_path = plots_dir(output_dir) / f"{plot_filename_prefix}_accuracy_f1.png" if not cutoff_rows: return { "table": str(table_path), "accuracy_plot": str(accuracy_plot_path), "f1_plot": str(f1_plot_path), "pair_plot": str(pair_plot_path), "rows": 0, } cutoff_df = pd.DataFrame(cutoff_rows) cutoff_df.to_csv(table_path, index=False) # Restriction level increases from left to right so right-most points are most restricted. cutoff_df["restriction_level"] = np.where( cutoff_df["keep_higher"].astype(bool), cutoff_df["cutoff_percentile"], 100.0 - cutoff_df["cutoff_percentile"], ) plots_by_uncertainty: dict[str, dict[str, str]] = {} for uncertainty_name in sorted(pd.unique(cutoff_df["uncertainty_type"])): sub_df = cutoff_df[cutoff_df["uncertainty_type"] == uncertainty_name].copy() slug = _slug(str(uncertainty_name)) sub_accuracy_plot_path = ( plots_dir(output_dir) / f"{plot_filename_prefix}_{slug}_accuracy.png" ) sub_f1_plot_path = ( plots_dir(output_dir) / f"{plot_filename_prefix}_{slug}_f1.png" ) sub_pair_plot_path = ( plots_dir(output_dir) / f"{plot_filename_prefix}_{slug}_accuracy_f1.png" ) save_uncertainty_cutoff_plot( cutoff_df=sub_df, title_prefix=f"{title_prefix} ({uncertainty_name})", x_label=x_label, output_path=sub_accuracy_plot_path, metric_column="accuracy", metric_label="Accuracy", plot_key=f"{plot_filename_prefix}_{slug}_accuracy", ) save_uncertainty_cutoff_plot( cutoff_df=sub_df, title_prefix=f"{title_prefix} ({uncertainty_name})", x_label=x_label, output_path=sub_f1_plot_path, metric_column="f1", metric_label="F1", plot_key=f"{plot_filename_prefix}_{slug}_f1", ) save_uncertainty_cutoff_pair_plot( cutoff_df=sub_df, title_prefix=f"{title_prefix} ({uncertainty_name})", x_label=x_label, output_path=sub_pair_plot_path, plot_key=f"{plot_filename_prefix}_{slug}_accuracy_f1", ) plots_by_uncertainty[str(uncertainty_name)] = { "accuracy": str(sub_accuracy_plot_path), "f1": str(sub_f1_plot_path), "accuracy_f1": str(sub_pair_plot_path), } return { "table": str(table_path), "plots_by_uncertainty": plots_by_uncertainty, "rows": int(len(cutoff_df)), } def run_performance( evaluation: BackendEvaluation, output_dir: Path, thresholds: np.ndarray, ) -> dict[str, Any]: rows = threshold_sweep(evaluation.y_true, evaluation.y_prob, thresholds) table_path = output_dir / "performance_threshold_sweep.csv" df = _save_table(rows, table_path) accuracy_plot_path = plots_dir(output_dir) / "performance_threshold_accuracy.png" f1_plot_path = plots_dir(output_dir) / "performance_threshold_f1.png" pair_plot_path = plots_dir(output_dir) / "performance_threshold_accuracy_f1.png" save_performance_threshold_plot( df=df, backend=evaluation.backend, output_path=accuracy_plot_path, metric_column="accuracy", metric_label="Accuracy", plot_key="performance_threshold_accuracy", ) save_performance_threshold_plot( df=df, backend=evaluation.backend, output_path=f1_plot_path, metric_column="f1", metric_label="F1", plot_key="performance_threshold_f1", ) save_performance_threshold_pair_plot( df=df, backend=evaluation.backend, output_path=pair_plot_path, plot_key="performance_threshold_accuracy_f1", ) best_idx = int(df["f1"].idxmax()) best = df.iloc[best_idx].to_dict() cutoff_percentiles = uncertainty_cutoff_percentiles() model_output_vals = np.asarray(evaluation.uncertainty_confidence, dtype=float) secondary_uncertainty = np.asarray(evaluation.uncertainty_std, dtype=float) secondary_uncertainty_name = ( "predictive uncertainty" if evaluation.uncertainty_metric == "predictive_entropy" else "standard deviation" ) uncertainty_types = [ ("confidence", model_output_vals), (secondary_uncertainty_name, secondary_uncertainty), ] uncertainty_cutoff = _uncertainty_cutoff_analysis( evaluation=evaluation, output_dir=output_dir, uncertainty_types=uncertainty_types, cutoff_percentiles=cutoff_percentiles, table_filename="performance_uncertainty_cutoff.csv", plot_filename_prefix="performance_uncertainty_cutoff", title_prefix="Model Output / Uncertainty Cutoff Percentile", x_label="Restriction Level (0 = all samples, 100 = most restricted subset)", ) percentile_cutoffs = uncertainty_cutoff_percentiles() uncertainty_percentile_cutoff = _uncertainty_cutoff_analysis( evaluation=evaluation, output_dir=output_dir, uncertainty_types=uncertainty_types, cutoff_percentiles=percentile_cutoffs, table_filename="performance_uncertainty_percentile_cutoff.csv", plot_filename_prefix="performance_uncertainty_percentile_cutoff", title_prefix="Model Output / Uncertainty Percentile Floor", x_label="Percentile Floor (0 = all samples, 100 = top percentile subset)", ) cutoff_table_path = Path(uncertainty_cutoff["table"]) percentile_cutoff_table_path = Path(uncertainty_percentile_cutoff["table"]) summary = { "best_by_f1": { k: float(v) for k, v in best.items() if isinstance(v, (int, float)) }, "table": str(table_path), "plots": { "accuracy": str(accuracy_plot_path), "f1": str(f1_plot_path), "accuracy_f1": str(pair_plot_path), }, "uncertainty_cutoff": { "table": str(cutoff_table_path), "plots_by_uncertainty": uncertainty_cutoff["plots_by_uncertainty"], "decision_threshold": DEFAULT_DECISION_THRESHOLD, }, "uncertainty_percentile_cutoff": { "table": str(percentile_cutoff_table_path), "plots_by_uncertainty": uncertainty_percentile_cutoff[ "plots_by_uncertainty" ], "decision_threshold": DEFAULT_DECISION_THRESHOLD, }, } if evaluation.backend == "ensemble": summary["ensemble_prediction_debug"] = _save_ensemble_prediction_debug( evaluation=evaluation, output_dir=output_dir, ) write_json(output_dir / "performance_summary.json", summary) return summary def run_calibration( evaluation: BackendEvaluation, output_dir: Path, bins: int, ) -> dict[str, Any]: summary, per_bin = calibration_stats( evaluation.y_true, evaluation.y_prob, bins=bins ) bin_df = pd.DataFrame( per_bin, columns=["mean_confidence", "fraction_positive", "count"], ) table_path = output_dir / "calibration_bins.csv" bin_df.to_csv(table_path, index=False) plot_path = plots_dir(output_dir) / "calibration_reliability.png" save_calibration_plot( per_bin=per_bin, backend=evaluation.backend, output_path=plot_path, ) out = { **summary, "table": str(table_path), "plot": str(plot_path), } write_json(output_dir / "calibration_summary.json", out) return out def run_physician( evaluation: BackendEvaluation, clinical_df: pd.DataFrame, output_dir: Path, ) -> dict[str, Any]: secondary_key = ( "predictive_entropy" if evaluation.uncertainty_metric == "predictive_entropy" else "std" ) secondary_label = ( "Predictive Uncertainty" if secondary_key == "predictive_entropy" else "Standard Deviation" ) col = physician_column(clinical_df) subset = clinical_df[["Image Data ID", col]].copy() subset[col] = pd.to_numeric(subset[col], errors="coerce") subset = subset.dropna(subset=["Image Data ID", col]) subset["Image Data ID"] = subset["Image Data ID"].astype(int) subset[col] = subset[col].astype(int) eval_df = pd.DataFrame( { "Image Data ID": evaluation.image_ids.astype(int), "model_probability": evaluation.uncertainty_confidence, "model_std": evaluation.uncertainty_std, "model_prob": evaluation.y_prob, } ) merged = eval_df.merge(subset, on="Image Data ID", how="inner") if merged.empty: raise ValueError("No overlapping Image Data ID rows for physician analysis") grouped_rows: list[dict[str, Any]] = [] uncertainty_specs = [ ("confidence", "model_probability", "Confidence"), (secondary_key, "model_std", secondary_label), ] ratings = [int(r) for r in sorted(pd.unique(merged[col]))] plot_paths: dict[str, str] = {} correlations: dict[str, float] = {} for metric_name, metric_col, metric_label in uncertainty_specs: grouped_metric = ( merged.groupby(col) .agg( n=("Image Data ID", "count"), mean_value=(metric_col, "mean"), std_value=(metric_col, "std"), mean_prob=("model_prob", "mean"), ) .reset_index() .rename(columns={col: "physician_rating"}) ) grouped_metric["uncertainty_type"] = metric_name grouped_rows.extend( [ {str(k): v for k, v in rec.items()} for rec in grouped_metric.to_dict(orient="records") ] ) data = [ np.asarray(merged.loc[merged[col] == r, metric_col], dtype=float) for r in ratings ] plot_path = plots_dir(output_dir) / f"physician_{metric_name}_boxplot.png" save_boxplot( data=data, tick_labels=[str(r) for r in ratings], x_label="Physician Confidence Rating (DXCONFID)", y_label=metric_label, title=f"{metric_label} by Physician Confidence Rating ({evaluation.backend})", output_path=plot_path, ) corr = float( pd.to_numeric( merged[[metric_col, col]].corr(method="spearman").iloc[0, 1], errors="coerce", ) ) correlations[metric_name] = corr plot_paths[metric_name] = str(plot_path) grouped = pd.DataFrame(grouped_rows) table_path = output_dir / "physician_grouped_metrics.csv" grouped.to_csv(table_path, index=False) confidence_table = output_dir / "physician_confidence_grouped_metrics.csv" std_table = output_dir / "physician_std_grouped_metrics.csv" secondary_table = output_dir / f"physician_{secondary_key}_grouped_metrics.csv" grouped[grouped["uncertainty_type"] == "confidence"].to_csv( confidence_table, index=False ) grouped[grouped["uncertainty_type"] == secondary_key].to_csv( secondary_table, index=False ) grouped[grouped["uncertainty_type"] == secondary_key].to_csv(std_table, index=False) out = { "n_overlap": int(len(merged)), "spearman_vs_dxconfid": correlations, "table": str(table_path), "tables": { "confidence": str(confidence_table), secondary_key: str(secondary_table), "std": str(std_table), }, "plots": plot_paths, } write_json(output_dir / "physician_summary.json", out) return out def _normalize_dx(value: Any) -> str: if value is None or (isinstance(value, float) and np.isnan(value)): return "" v = str(value).strip().upper() if v in {"NL", "NORMAL"}: return "CN" return v def run_longitudinal( evaluation: BackendEvaluation, clinical_df: pd.DataFrame, output_dir: Path, ) -> dict[str, Any]: secondary_key = ( "predictive_entropy" if evaluation.uncertainty_metric == "predictive_entropy" else "std" ) secondary_label = ( "Mean Predictive Uncertainty" if secondary_key == "predictive_entropy" else "Mean Standard Deviation" ) required = ["Image Data ID", "PTID"] missing = [c for c in required if c not in clinical_df.columns] if missing: raise KeyError(f"Missing columns for longitudinal analysis: {missing}") diagnosis_col = None for candidate in ["Class", "DX", "Diagnosis"]: if candidate in clinical_df.columns: diagnosis_col = candidate break if diagnosis_col is None: raise KeyError( "No diagnosis column found. Expected one of: Class, DX, Diagnosis" ) work = clinical_df[ ["Image Data ID", "PTID", diagnosis_col] + [c for c in ["EXAMDATE"] if c in clinical_df.columns] ].copy() work["Image Data ID"] = pd.to_numeric(work["Image Data ID"], errors="coerce") work = work.dropna(subset=["Image Data ID", "PTID"]) work["Image Data ID"] = work["Image Data ID"].astype(int) work["PTID"] = work["PTID"].astype(str).str.strip() work["diagnosis"] = work[diagnosis_col].map(_normalize_dx) if "EXAMDATE" in work.columns: work["EXAMDATE"] = pd.to_datetime(work["EXAMDATE"], errors="coerce") work = work.sort_values(["PTID", "EXAMDATE"], na_position="last") else: work = work.sort_values(["PTID", "Image Data ID"]) eval_df = pd.DataFrame( { "Image Data ID": evaluation.image_ids.astype(int), "model_confidence": evaluation.uncertainty_confidence, "model_std": evaluation.uncertainty_std, "model_prob": evaluation.y_prob, } ) merged = work.merge(eval_df, on="Image Data ID", how="inner") if merged.empty: raise ValueError("No overlapping Image Data ID rows for longitudinal analysis") patient_rows: list[dict[str, Any]] = [] for ptid, group in merged.groupby("PTID"): diagnoses = [d for d in group["diagnosis"].tolist() if d] if len(diagnoses) < 2: continue first_dx = diagnoses[0] last_dx = diagnoses[-1] unique_dx = set(diagnoses) cohort = "other" if unique_dx.issubset({"CN"}): cohort = "stable_cn" elif unique_dx.issubset({"AD"}): cohort = "stable_ad" elif first_dx == "CN" and last_dx == "AD" and unique_dx.issubset({"CN", "AD"}): cohort = "cn_to_ad" patient_rows.append( { "PTID": ptid, "n_visits": int(len(group)), "first_dx": first_dx, "last_dx": last_dx, "cohort": cohort, "mean_confidence": float(group["model_confidence"].mean()), "mean_std": float(group["model_std"].mean()), "mean_prob": float(group["model_prob"].mean()), } ) patient_df = pd.DataFrame(patient_rows) patient_df = patient_df[ patient_df["cohort"].isin(["stable_cn", "stable_ad", "cn_to_ad"]) ].copy() table_path = output_dir / "longitudinal_patient_summary.csv" patient_df.to_csv(table_path, index=False) cohort_df = ( patient_df.groupby("cohort") .agg( n_patients=("PTID", "count"), mean_confidence=("mean_confidence", "mean"), mean_std=("mean_std", "mean"), mean_prob=("mean_prob", "mean"), ) .reset_index() ) cohort_table = output_dir / "longitudinal_cohort_summary.csv" cohort_df.to_csv(cohort_table, index=False) cohorts = ["stable_cn", "stable_ad", "cn_to_ad"] uncertainty_specs = [ ("confidence", "mean_confidence", "Mean Confidence"), (secondary_key, "mean_std", secondary_label), ] plot_paths: dict[str, str] = {} for metric_name, metric_col, metric_label in uncertainty_specs: values = [ np.asarray( patient_df.loc[patient_df["cohort"] == c, metric_col], dtype=float ) for c in cohorts ] plot_path = plots_dir(output_dir) / f"longitudinal_cohort_{metric_name}.png" save_boxplot( data=values, tick_labels=cohorts, x_label="Longitudinal Cohort", y_label=metric_label, title=f"Longitudinal Cohort Comparison: {metric_label} ({evaluation.backend})", output_path=plot_path, ) plot_paths[metric_name] = str(plot_path) uncertainty_by_cohort = cohort_df.melt( id_vars=["cohort", "n_patients"], value_vars=["mean_confidence", "mean_std"], var_name="uncertainty_type", value_name="mean_value", ).replace( { "uncertainty_type": { "mean_confidence": "confidence", "mean_std": secondary_key, } } ) uncertainty_table = output_dir / "longitudinal_uncertainty_by_cohort.csv" uncertainty_by_cohort.to_csv(uncertainty_table, index=False) confidence_patient_table = ( output_dir / "longitudinal_confidence_patient_summary.csv" ) std_patient_table = output_dir / "longitudinal_std_patient_summary.csv" confidence_cohort_table = output_dir / "longitudinal_confidence_cohort_summary.csv" std_cohort_table = output_dir / "longitudinal_std_cohort_summary.csv" secondary_patient_table = ( output_dir / f"longitudinal_{secondary_key}_patient_summary.csv" ) secondary_cohort_table = ( output_dir / f"longitudinal_{secondary_key}_cohort_summary.csv" ) patient_df[ ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_confidence"] ].to_csv(confidence_patient_table, index=False) patient_df[ ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_std"] ].to_csv(std_patient_table, index=False) patient_df[ ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_std"] ].to_csv(secondary_patient_table, index=False) cohort_df[["cohort", "n_patients", "mean_confidence"]].to_csv( confidence_cohort_table, index=False ) cohort_df[["cohort", "n_patients", "mean_std"]].to_csv( std_cohort_table, index=False ) cohort_df[["cohort", "n_patients", "mean_std"]].to_csv( secondary_cohort_table, index=False ) out = { "n_patients_analyzed": int(len(patient_df)), "table_patient": str(table_path), "table_cohort": str(cohort_table), "table_uncertainty": str(uncertainty_table), "tables": { "confidence": { "patient": str(confidence_patient_table), "cohort": str(confidence_cohort_table), }, secondary_key: { "patient": str(secondary_patient_table), "cohort": str(secondary_cohort_table), }, "std": { "patient": str(std_patient_table), "cohort": str(std_cohort_table), }, }, "plots": plot_paths, } write_json(output_dir / "longitudinal_summary.json", out) return out