|
@@ -0,0 +1,691 @@
|
|
|
|
|
+# pyright: basic
|
|
|
|
|
+
|
|
|
|
|
+from __future__ import annotations
|
|
|
|
|
+
|
|
|
|
|
+from pathlib import Path
|
|
|
|
|
+import re
|
|
|
|
|
+from typing import Any
|
|
|
|
|
+
|
|
|
|
|
+import numpy as np
|
|
|
|
|
+import pandas as pd
|
|
|
|
|
+
|
|
|
|
|
+from .data_access import BackendEvaluation, physician_column
|
|
|
|
|
+from .defaults import DEFAULT_DECISION_THRESHOLD, uncertainty_cutoff_percentiles
|
|
|
|
|
+from .metrics import calibration_stats, performance_at_threshold, threshold_sweep
|
|
|
|
|
+from .plotting import (
|
|
|
|
|
+ plots_dir,
|
|
|
|
|
+ save_boxplot,
|
|
|
|
|
+ save_calibration_plot,
|
|
|
|
|
+ save_performance_threshold_plot,
|
|
|
|
|
+ save_performance_threshold_pair_plot,
|
|
|
|
|
+ save_uncertainty_cutoff_plot,
|
|
|
|
|
+ save_uncertainty_cutoff_pair_plot,
|
|
|
|
|
+)
|
|
|
|
|
+from .runtime import write_json
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _save_table(rows: list[dict[str, Any]], out_path: Path) -> pd.DataFrame:
|
|
|
|
|
+ df = pd.DataFrame(rows)
|
|
|
|
|
+ out_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
+ df.to_csv(out_path, index=False)
|
|
|
|
|
+ return df
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _uncertainty_percentiles(values: np.ndarray) -> np.ndarray:
|
|
|
|
|
+ vals = np.asarray(values, dtype=float)
|
|
|
|
|
+ n = len(vals)
|
|
|
|
|
+ if n == 0:
|
|
|
|
|
+ return np.asarray([], dtype=float)
|
|
|
|
|
+ if n == 1:
|
|
|
|
|
+ return np.asarray([0.0], dtype=float)
|
|
|
|
|
+
|
|
|
|
|
+ order = np.argsort(vals)
|
|
|
|
|
+ ranks = np.empty(n, dtype=float)
|
|
|
|
|
+ ranks[order] = np.arange(0, n, dtype=float)
|
|
|
|
|
+ return (ranks / float(n - 1)) * 100.0
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _save_ensemble_prediction_debug(
|
|
|
|
|
+ evaluation: BackendEvaluation,
|
|
|
|
|
+ output_dir: Path,
|
|
|
|
|
+) -> str:
|
|
|
|
|
+ uncertainty_vals = np.asarray(evaluation.uncertainty_confidence, dtype=float)
|
|
|
|
|
+ uncertainty_pct = _uncertainty_percentiles(uncertainty_vals)
|
|
|
|
|
+ pred_label = (evaluation.y_prob >= DEFAULT_DECISION_THRESHOLD).astype(int)
|
|
|
|
|
+ true_label = np.asarray(evaluation.y_true, dtype=int)
|
|
|
|
|
+ is_correct = (pred_label == true_label).astype(int)
|
|
|
|
|
+
|
|
|
|
|
+ debug_df = pd.DataFrame(
|
|
|
|
|
+ {
|
|
|
|
|
+ "image_id": evaluation.image_ids.astype(int),
|
|
|
|
|
+ "predicted_probability": np.asarray(evaluation.y_prob, dtype=float),
|
|
|
|
|
+ "predicted_label": pred_label,
|
|
|
|
|
+ "actual_label": true_label,
|
|
|
|
|
+ "is_correct": is_correct,
|
|
|
|
|
+ "confidence": uncertainty_vals,
|
|
|
|
|
+ "confidence_percentile": uncertainty_pct,
|
|
|
|
|
+ }
|
|
|
|
|
+ ).sort_values("confidence", ascending=False)
|
|
|
|
|
+
|
|
|
|
|
+ path = output_dir / "ensemble_prediction_debug.csv"
|
|
|
|
|
+ debug_df.to_csv(path, index=False)
|
|
|
|
|
+ return str(path)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _uncertainty_cutoff_analysis(
|
|
|
|
|
+ evaluation: BackendEvaluation,
|
|
|
|
|
+ output_dir: Path,
|
|
|
|
|
+ uncertainty_types: list[tuple[str, np.ndarray]],
|
|
|
|
|
+ cutoff_percentiles: np.ndarray,
|
|
|
|
|
+ table_filename: str,
|
|
|
|
|
+ plot_filename_prefix: str,
|
|
|
|
|
+ title_prefix: str,
|
|
|
|
|
+ x_label: str,
|
|
|
|
|
+) -> dict[str, Any]:
|
|
|
|
|
+ def _slug(name: str) -> str:
|
|
|
|
|
+ s = re.sub(r"[^a-z0-9]+", "_", name.strip().lower())
|
|
|
|
|
+ return s.strip("_") or "metric"
|
|
|
|
|
+
|
|
|
|
|
+ def _confidence_like(name: str) -> bool:
|
|
|
|
|
+ return "confidence" in name.strip().lower()
|
|
|
|
|
+
|
|
|
|
|
+ cutoff_rows: list[dict[str, Any]] = []
|
|
|
|
|
+
|
|
|
|
|
+ for uncertainty_name, values in uncertainty_types:
|
|
|
|
|
+ finite_mask = np.isfinite(values)
|
|
|
|
|
+ if not finite_mask.any():
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ values_valid = values[finite_mask]
|
|
|
|
|
+ y_true_valid = evaluation.y_true[finite_mask]
|
|
|
|
|
+ y_prob_valid = evaluation.y_prob[finite_mask]
|
|
|
|
|
+ keep_higher = _confidence_like(uncertainty_name)
|
|
|
|
|
+ selection_label = (
|
|
|
|
|
+ "metric >= percentile cutoff (higher is lower uncertainty)"
|
|
|
|
|
+ if keep_higher
|
|
|
|
|
+ else "metric <= percentile cutoff (lower is lower uncertainty)"
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ for cutoff_percentile in cutoff_percentiles:
|
|
|
|
|
+ cutoff_value = float(np.percentile(values_valid, cutoff_percentile))
|
|
|
|
|
+ if keep_higher:
|
|
|
|
|
+ keep_mask = values_valid >= cutoff_value
|
|
|
|
|
+ else:
|
|
|
|
|
+ keep_mask = values_valid <= cutoff_value
|
|
|
|
|
+
|
|
|
|
|
+ retained = int(keep_mask.sum())
|
|
|
|
|
+ if retained == 0:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ perf = performance_at_threshold(
|
|
|
|
|
+ y_true=y_true_valid[keep_mask],
|
|
|
|
|
+ y_prob=y_prob_valid[keep_mask],
|
|
|
|
|
+ threshold=DEFAULT_DECISION_THRESHOLD,
|
|
|
|
|
+ )
|
|
|
|
|
+ cutoff_rows.append(
|
|
|
|
|
+ {
|
|
|
|
|
+ "uncertainty_type": uncertainty_name,
|
|
|
|
|
+ "cutoff_percentile": float(cutoff_percentile),
|
|
|
|
|
+ "cutoff_value": cutoff_value,
|
|
|
|
|
+ "n_retained": retained,
|
|
|
|
|
+ "coverage": float(retained / len(values_valid)),
|
|
|
|
|
+ "selection_rule": selection_label,
|
|
|
|
|
+ "keep_higher": bool(keep_higher),
|
|
|
|
|
+ "accuracy": float(perf["accuracy"]),
|
|
|
|
|
+ "f1": float(perf["f1"]),
|
|
|
|
|
+ "n_correct": int(perf["tp"] + perf["tn"]),
|
|
|
|
|
+ "n_incorrect": int(perf["fp"] + perf["fn"]),
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ table_path = output_dir / table_filename
|
|
|
|
|
+ accuracy_plot_path = plots_dir(output_dir) / f"{plot_filename_prefix}_accuracy.png"
|
|
|
|
|
+ f1_plot_path = plots_dir(output_dir) / f"{plot_filename_prefix}_f1.png"
|
|
|
|
|
+ pair_plot_path = plots_dir(output_dir) / f"{plot_filename_prefix}_accuracy_f1.png"
|
|
|
|
|
+ if not cutoff_rows:
|
|
|
|
|
+ return {
|
|
|
|
|
+ "table": str(table_path),
|
|
|
|
|
+ "accuracy_plot": str(accuracy_plot_path),
|
|
|
|
|
+ "f1_plot": str(f1_plot_path),
|
|
|
|
|
+ "pair_plot": str(pair_plot_path),
|
|
|
|
|
+ "rows": 0,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ cutoff_df = pd.DataFrame(cutoff_rows)
|
|
|
|
|
+ cutoff_df.to_csv(table_path, index=False)
|
|
|
|
|
+
|
|
|
|
|
+ # Restriction level increases from left to right so right-most points are most restricted.
|
|
|
|
|
+ cutoff_df["restriction_level"] = np.where(
|
|
|
|
|
+ cutoff_df["keep_higher"].astype(bool),
|
|
|
|
|
+ cutoff_df["cutoff_percentile"],
|
|
|
|
|
+ 100.0 - cutoff_df["cutoff_percentile"],
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ plots_by_uncertainty: dict[str, dict[str, str]] = {}
|
|
|
|
|
+ for uncertainty_name in sorted(pd.unique(cutoff_df["uncertainty_type"])):
|
|
|
|
|
+ sub_df = cutoff_df[cutoff_df["uncertainty_type"] == uncertainty_name].copy()
|
|
|
|
|
+ slug = _slug(str(uncertainty_name))
|
|
|
|
|
+ sub_accuracy_plot_path = (
|
|
|
|
|
+ plots_dir(output_dir) / f"{plot_filename_prefix}_{slug}_accuracy.png"
|
|
|
|
|
+ )
|
|
|
|
|
+ sub_f1_plot_path = (
|
|
|
|
|
+ plots_dir(output_dir) / f"{plot_filename_prefix}_{slug}_f1.png"
|
|
|
|
|
+ )
|
|
|
|
|
+ sub_pair_plot_path = (
|
|
|
|
|
+ plots_dir(output_dir) / f"{plot_filename_prefix}_{slug}_accuracy_f1.png"
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ save_uncertainty_cutoff_plot(
|
|
|
|
|
+ cutoff_df=sub_df,
|
|
|
|
|
+ title_prefix=f"{title_prefix} ({uncertainty_name})",
|
|
|
|
|
+ x_label=x_label,
|
|
|
|
|
+ output_path=sub_accuracy_plot_path,
|
|
|
|
|
+ metric_column="accuracy",
|
|
|
|
|
+ metric_label="Accuracy",
|
|
|
|
|
+ plot_key=f"{plot_filename_prefix}_{slug}_accuracy",
|
|
|
|
|
+ )
|
|
|
|
|
+ save_uncertainty_cutoff_plot(
|
|
|
|
|
+ cutoff_df=sub_df,
|
|
|
|
|
+ title_prefix=f"{title_prefix} ({uncertainty_name})",
|
|
|
|
|
+ x_label=x_label,
|
|
|
|
|
+ output_path=sub_f1_plot_path,
|
|
|
|
|
+ metric_column="f1",
|
|
|
|
|
+ metric_label="F1",
|
|
|
|
|
+ plot_key=f"{plot_filename_prefix}_{slug}_f1",
|
|
|
|
|
+ )
|
|
|
|
|
+ save_uncertainty_cutoff_pair_plot(
|
|
|
|
|
+ cutoff_df=sub_df,
|
|
|
|
|
+ title_prefix=f"{title_prefix} ({uncertainty_name})",
|
|
|
|
|
+ x_label=x_label,
|
|
|
|
|
+ output_path=sub_pair_plot_path,
|
|
|
|
|
+ plot_key=f"{plot_filename_prefix}_{slug}_accuracy_f1",
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ plots_by_uncertainty[str(uncertainty_name)] = {
|
|
|
|
|
+ "accuracy": str(sub_accuracy_plot_path),
|
|
|
|
|
+ "f1": str(sub_f1_plot_path),
|
|
|
|
|
+ "accuracy_f1": str(sub_pair_plot_path),
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ "table": str(table_path),
|
|
|
|
|
+ "plots_by_uncertainty": plots_by_uncertainty,
|
|
|
|
|
+ "rows": int(len(cutoff_df)),
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def run_performance(
|
|
|
|
|
+ evaluation: BackendEvaluation,
|
|
|
|
|
+ output_dir: Path,
|
|
|
|
|
+ thresholds: np.ndarray,
|
|
|
|
|
+) -> dict[str, Any]:
|
|
|
|
|
+ rows = threshold_sweep(evaluation.y_true, evaluation.y_prob, thresholds)
|
|
|
|
|
+ table_path = output_dir / "performance_threshold_sweep.csv"
|
|
|
|
|
+ df = _save_table(rows, table_path)
|
|
|
|
|
+
|
|
|
|
|
+ accuracy_plot_path = plots_dir(output_dir) / "performance_threshold_accuracy.png"
|
|
|
|
|
+ f1_plot_path = plots_dir(output_dir) / "performance_threshold_f1.png"
|
|
|
|
|
+ pair_plot_path = plots_dir(output_dir) / "performance_threshold_accuracy_f1.png"
|
|
|
|
|
+ save_performance_threshold_plot(
|
|
|
|
|
+ df=df,
|
|
|
|
|
+ backend=evaluation.backend,
|
|
|
|
|
+ output_path=accuracy_plot_path,
|
|
|
|
|
+ metric_column="accuracy",
|
|
|
|
|
+ metric_label="Accuracy",
|
|
|
|
|
+ plot_key="performance_threshold_accuracy",
|
|
|
|
|
+ )
|
|
|
|
|
+ save_performance_threshold_plot(
|
|
|
|
|
+ df=df,
|
|
|
|
|
+ backend=evaluation.backend,
|
|
|
|
|
+ output_path=f1_plot_path,
|
|
|
|
|
+ metric_column="f1",
|
|
|
|
|
+ metric_label="F1",
|
|
|
|
|
+ plot_key="performance_threshold_f1",
|
|
|
|
|
+ )
|
|
|
|
|
+ save_performance_threshold_pair_plot(
|
|
|
|
|
+ df=df,
|
|
|
|
|
+ backend=evaluation.backend,
|
|
|
|
|
+ output_path=pair_plot_path,
|
|
|
|
|
+ plot_key="performance_threshold_accuracy_f1",
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ best_idx = int(df["f1"].idxmax())
|
|
|
|
|
+ best = df.iloc[best_idx].to_dict()
|
|
|
|
|
+
|
|
|
|
|
+ cutoff_percentiles = uncertainty_cutoff_percentiles()
|
|
|
|
|
+ model_output_vals = np.asarray(evaluation.uncertainty_confidence, dtype=float)
|
|
|
|
|
+ secondary_uncertainty = np.asarray(evaluation.uncertainty_std, dtype=float)
|
|
|
|
|
+ secondary_uncertainty_name = (
|
|
|
|
|
+ "predictive uncertainty"
|
|
|
|
|
+ if evaluation.uncertainty_metric == "predictive_entropy"
|
|
|
|
|
+ else "standard deviation"
|
|
|
|
|
+ )
|
|
|
|
|
+ uncertainty_types = [
|
|
|
|
|
+ ("confidence", model_output_vals),
|
|
|
|
|
+ (secondary_uncertainty_name, secondary_uncertainty),
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+ uncertainty_cutoff = _uncertainty_cutoff_analysis(
|
|
|
|
|
+ evaluation=evaluation,
|
|
|
|
|
+ output_dir=output_dir,
|
|
|
|
|
+ uncertainty_types=uncertainty_types,
|
|
|
|
|
+ cutoff_percentiles=cutoff_percentiles,
|
|
|
|
|
+ table_filename="performance_uncertainty_cutoff.csv",
|
|
|
|
|
+ plot_filename_prefix="performance_uncertainty_cutoff",
|
|
|
|
|
+ title_prefix="Model Output / Uncertainty Cutoff Percentile",
|
|
|
|
|
+ x_label="Restriction Level (0 = all samples, 100 = most restricted subset)",
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ percentile_cutoffs = uncertainty_cutoff_percentiles()
|
|
|
|
|
+ uncertainty_percentile_cutoff = _uncertainty_cutoff_analysis(
|
|
|
|
|
+ evaluation=evaluation,
|
|
|
|
|
+ output_dir=output_dir,
|
|
|
|
|
+ uncertainty_types=uncertainty_types,
|
|
|
|
|
+ cutoff_percentiles=percentile_cutoffs,
|
|
|
|
|
+ table_filename="performance_uncertainty_percentile_cutoff.csv",
|
|
|
|
|
+ plot_filename_prefix="performance_uncertainty_percentile_cutoff",
|
|
|
|
|
+ title_prefix="Model Output / Uncertainty Percentile Floor",
|
|
|
|
|
+ x_label="Percentile Floor (0 = all samples, 100 = top percentile subset)",
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ cutoff_table_path = Path(uncertainty_cutoff["table"])
|
|
|
|
|
+ percentile_cutoff_table_path = Path(uncertainty_percentile_cutoff["table"])
|
|
|
|
|
+ summary = {
|
|
|
|
|
+ "best_by_f1": {
|
|
|
|
|
+ k: float(v) for k, v in best.items() if isinstance(v, (int, float))
|
|
|
|
|
+ },
|
|
|
|
|
+ "table": str(table_path),
|
|
|
|
|
+ "plots": {
|
|
|
|
|
+ "accuracy": str(accuracy_plot_path),
|
|
|
|
|
+ "f1": str(f1_plot_path),
|
|
|
|
|
+ "accuracy_f1": str(pair_plot_path),
|
|
|
|
|
+ },
|
|
|
|
|
+ "uncertainty_cutoff": {
|
|
|
|
|
+ "table": str(cutoff_table_path),
|
|
|
|
|
+ "plots_by_uncertainty": uncertainty_cutoff["plots_by_uncertainty"],
|
|
|
|
|
+ "decision_threshold": DEFAULT_DECISION_THRESHOLD,
|
|
|
|
|
+ },
|
|
|
|
|
+ "uncertainty_percentile_cutoff": {
|
|
|
|
|
+ "table": str(percentile_cutoff_table_path),
|
|
|
|
|
+ "plots_by_uncertainty": uncertainty_percentile_cutoff[
|
|
|
|
|
+ "plots_by_uncertainty"
|
|
|
|
|
+ ],
|
|
|
|
|
+ "decision_threshold": DEFAULT_DECISION_THRESHOLD,
|
|
|
|
|
+ },
|
|
|
|
|
+ }
|
|
|
|
|
+ if evaluation.backend == "ensemble":
|
|
|
|
|
+ summary["ensemble_prediction_debug"] = _save_ensemble_prediction_debug(
|
|
|
|
|
+ evaluation=evaluation,
|
|
|
|
|
+ output_dir=output_dir,
|
|
|
|
|
+ )
|
|
|
|
|
+ write_json(output_dir / "performance_summary.json", summary)
|
|
|
|
|
+ return summary
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def run_calibration(
|
|
|
|
|
+ evaluation: BackendEvaluation,
|
|
|
|
|
+ output_dir: Path,
|
|
|
|
|
+ bins: int,
|
|
|
|
|
+) -> dict[str, Any]:
|
|
|
|
|
+ summary, per_bin = calibration_stats(
|
|
|
|
|
+ evaluation.y_true, evaluation.y_prob, bins=bins
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ bin_df = pd.DataFrame(
|
|
|
|
|
+ per_bin,
|
|
|
|
|
+ columns=["mean_confidence", "fraction_positive", "count"],
|
|
|
|
|
+ )
|
|
|
|
|
+ table_path = output_dir / "calibration_bins.csv"
|
|
|
|
|
+ bin_df.to_csv(table_path, index=False)
|
|
|
|
|
+
|
|
|
|
|
+ plot_path = plots_dir(output_dir) / "calibration_reliability.png"
|
|
|
|
|
+ save_calibration_plot(
|
|
|
|
|
+ per_bin=per_bin,
|
|
|
|
|
+ backend=evaluation.backend,
|
|
|
|
|
+ output_path=plot_path,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ out = {
|
|
|
|
|
+ **summary,
|
|
|
|
|
+ "table": str(table_path),
|
|
|
|
|
+ "plot": str(plot_path),
|
|
|
|
|
+ }
|
|
|
|
|
+ write_json(output_dir / "calibration_summary.json", out)
|
|
|
|
|
+ return out
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def run_physician(
|
|
|
|
|
+ evaluation: BackendEvaluation,
|
|
|
|
|
+ clinical_df: pd.DataFrame,
|
|
|
|
|
+ output_dir: Path,
|
|
|
|
|
+) -> dict[str, Any]:
|
|
|
|
|
+ secondary_key = (
|
|
|
|
|
+ "predictive_entropy"
|
|
|
|
|
+ if evaluation.uncertainty_metric == "predictive_entropy"
|
|
|
|
|
+ else "std"
|
|
|
|
|
+ )
|
|
|
|
|
+ secondary_label = (
|
|
|
|
|
+ "Predictive Uncertainty"
|
|
|
|
|
+ if secondary_key == "predictive_entropy"
|
|
|
|
|
+ else "Standard Deviation"
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ col = physician_column(clinical_df)
|
|
|
|
|
+ subset = clinical_df[["Image Data ID", col]].copy()
|
|
|
|
|
+ subset[col] = pd.to_numeric(subset[col], errors="coerce")
|
|
|
|
|
+ subset = subset.dropna(subset=["Image Data ID", col])
|
|
|
|
|
+ subset["Image Data ID"] = subset["Image Data ID"].astype(int)
|
|
|
|
|
+ subset[col] = subset[col].astype(int)
|
|
|
|
|
+
|
|
|
|
|
+ eval_df = pd.DataFrame(
|
|
|
|
|
+ {
|
|
|
|
|
+ "Image Data ID": evaluation.image_ids.astype(int),
|
|
|
|
|
+ "model_probability": evaluation.uncertainty_confidence,
|
|
|
|
|
+ "model_std": evaluation.uncertainty_std,
|
|
|
|
|
+ "model_prob": evaluation.y_prob,
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ merged = eval_df.merge(subset, on="Image Data ID", how="inner")
|
|
|
|
|
+
|
|
|
|
|
+ if merged.empty:
|
|
|
|
|
+ raise ValueError("No overlapping Image Data ID rows for physician analysis")
|
|
|
|
|
+
|
|
|
|
|
+ grouped_rows: list[dict[str, Any]] = []
|
|
|
|
|
+ uncertainty_specs = [
|
|
|
|
|
+ ("confidence", "model_probability", "Confidence"),
|
|
|
|
|
+ (secondary_key, "model_std", secondary_label),
|
|
|
|
|
+ ]
|
|
|
|
|
+ ratings = [int(r) for r in sorted(pd.unique(merged[col]))]
|
|
|
|
|
+ plot_paths: dict[str, str] = {}
|
|
|
|
|
+ correlations: dict[str, float] = {}
|
|
|
|
|
+
|
|
|
|
|
+ for metric_name, metric_col, metric_label in uncertainty_specs:
|
|
|
|
|
+ grouped_metric = (
|
|
|
|
|
+ merged.groupby(col)
|
|
|
|
|
+ .agg(
|
|
|
|
|
+ n=("Image Data ID", "count"),
|
|
|
|
|
+ mean_value=(metric_col, "mean"),
|
|
|
|
|
+ std_value=(metric_col, "std"),
|
|
|
|
|
+ mean_prob=("model_prob", "mean"),
|
|
|
|
|
+ )
|
|
|
|
|
+ .reset_index()
|
|
|
|
|
+ .rename(columns={col: "physician_rating"})
|
|
|
|
|
+ )
|
|
|
|
|
+ grouped_metric["uncertainty_type"] = metric_name
|
|
|
|
|
+ grouped_rows.extend(
|
|
|
|
|
+ [
|
|
|
|
|
+ {str(k): v for k, v in rec.items()}
|
|
|
|
|
+ for rec in grouped_metric.to_dict(orient="records")
|
|
|
|
|
+ ]
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ data = [
|
|
|
|
|
+ np.asarray(merged.loc[merged[col] == r, metric_col], dtype=float)
|
|
|
|
|
+ for r in ratings
|
|
|
|
|
+ ]
|
|
|
|
|
+ plot_path = plots_dir(output_dir) / f"physician_{metric_name}_boxplot.png"
|
|
|
|
|
+ save_boxplot(
|
|
|
|
|
+ data=data,
|
|
|
|
|
+ tick_labels=[str(r) for r in ratings],
|
|
|
|
|
+ x_label="Physician Confidence Rating (DXCONFID)",
|
|
|
|
|
+ y_label=metric_label,
|
|
|
|
|
+ title=f"{metric_label} by Physician Confidence Rating ({evaluation.backend})",
|
|
|
|
|
+ output_path=plot_path,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ corr = float(
|
|
|
|
|
+ pd.to_numeric(
|
|
|
|
|
+ merged[[metric_col, col]].corr(method="spearman").iloc[0, 1],
|
|
|
|
|
+ errors="coerce",
|
|
|
|
|
+ )
|
|
|
|
|
+ )
|
|
|
|
|
+ correlations[metric_name] = corr
|
|
|
|
|
+ plot_paths[metric_name] = str(plot_path)
|
|
|
|
|
+
|
|
|
|
|
+ grouped = pd.DataFrame(grouped_rows)
|
|
|
|
|
+ table_path = output_dir / "physician_grouped_metrics.csv"
|
|
|
|
|
+ grouped.to_csv(table_path, index=False)
|
|
|
|
|
+
|
|
|
|
|
+ confidence_table = output_dir / "physician_confidence_grouped_metrics.csv"
|
|
|
|
|
+ std_table = output_dir / "physician_std_grouped_metrics.csv"
|
|
|
|
|
+ secondary_table = output_dir / f"physician_{secondary_key}_grouped_metrics.csv"
|
|
|
|
|
+ grouped[grouped["uncertainty_type"] == "confidence"].to_csv(
|
|
|
|
|
+ confidence_table, index=False
|
|
|
|
|
+ )
|
|
|
|
|
+ grouped[grouped["uncertainty_type"] == secondary_key].to_csv(
|
|
|
|
|
+ secondary_table, index=False
|
|
|
|
|
+ )
|
|
|
|
|
+ grouped[grouped["uncertainty_type"] == secondary_key].to_csv(std_table, index=False)
|
|
|
|
|
+
|
|
|
|
|
+ out = {
|
|
|
|
|
+ "n_overlap": int(len(merged)),
|
|
|
|
|
+ "spearman_vs_dxconfid": correlations,
|
|
|
|
|
+ "table": str(table_path),
|
|
|
|
|
+ "tables": {
|
|
|
|
|
+ "confidence": str(confidence_table),
|
|
|
|
|
+ secondary_key: str(secondary_table),
|
|
|
|
|
+ "std": str(std_table),
|
|
|
|
|
+ },
|
|
|
|
|
+ "plots": plot_paths,
|
|
|
|
|
+ }
|
|
|
|
|
+ write_json(output_dir / "physician_summary.json", out)
|
|
|
|
|
+ return out
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _normalize_dx(value: Any) -> str:
|
|
|
|
|
+ if value is None or (isinstance(value, float) and np.isnan(value)):
|
|
|
|
|
+ return ""
|
|
|
|
|
+
|
|
|
|
|
+ v = str(value).strip().upper()
|
|
|
|
|
+ if v in {"NL", "NORMAL"}:
|
|
|
|
|
+ return "CN"
|
|
|
|
|
+ return v
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def run_longitudinal(
|
|
|
|
|
+ evaluation: BackendEvaluation,
|
|
|
|
|
+ clinical_df: pd.DataFrame,
|
|
|
|
|
+ output_dir: Path,
|
|
|
|
|
+) -> dict[str, Any]:
|
|
|
|
|
+ secondary_key = (
|
|
|
|
|
+ "predictive_entropy"
|
|
|
|
|
+ if evaluation.uncertainty_metric == "predictive_entropy"
|
|
|
|
|
+ else "std"
|
|
|
|
|
+ )
|
|
|
|
|
+ secondary_label = (
|
|
|
|
|
+ "Mean Predictive Uncertainty"
|
|
|
|
|
+ if secondary_key == "predictive_entropy"
|
|
|
|
|
+ else "Mean Standard Deviation"
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ required = ["Image Data ID", "PTID"]
|
|
|
|
|
+ missing = [c for c in required if c not in clinical_df.columns]
|
|
|
|
|
+ if missing:
|
|
|
|
|
+ raise KeyError(f"Missing columns for longitudinal analysis: {missing}")
|
|
|
|
|
+
|
|
|
|
|
+ diagnosis_col = None
|
|
|
|
|
+ for candidate in ["Class", "DX", "Diagnosis"]:
|
|
|
|
|
+ if candidate in clinical_df.columns:
|
|
|
|
|
+ diagnosis_col = candidate
|
|
|
|
|
+ break
|
|
|
|
|
+ if diagnosis_col is None:
|
|
|
|
|
+ raise KeyError(
|
|
|
|
|
+ "No diagnosis column found. Expected one of: Class, DX, Diagnosis"
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ work = clinical_df[
|
|
|
|
|
+ ["Image Data ID", "PTID", diagnosis_col]
|
|
|
|
|
+ + [c for c in ["EXAMDATE"] if c in clinical_df.columns]
|
|
|
|
|
+ ].copy()
|
|
|
|
|
+ work["Image Data ID"] = pd.to_numeric(work["Image Data ID"], errors="coerce")
|
|
|
|
|
+ work = work.dropna(subset=["Image Data ID", "PTID"])
|
|
|
|
|
+ work["Image Data ID"] = work["Image Data ID"].astype(int)
|
|
|
|
|
+ work["PTID"] = work["PTID"].astype(str).str.strip()
|
|
|
|
|
+ work["diagnosis"] = work[diagnosis_col].map(_normalize_dx)
|
|
|
|
|
+
|
|
|
|
|
+ if "EXAMDATE" in work.columns:
|
|
|
|
|
+ work["EXAMDATE"] = pd.to_datetime(work["EXAMDATE"], errors="coerce")
|
|
|
|
|
+ work = work.sort_values(["PTID", "EXAMDATE"], na_position="last")
|
|
|
|
|
+ else:
|
|
|
|
|
+ work = work.sort_values(["PTID", "Image Data ID"])
|
|
|
|
|
+
|
|
|
|
|
+ eval_df = pd.DataFrame(
|
|
|
|
|
+ {
|
|
|
|
|
+ "Image Data ID": evaluation.image_ids.astype(int),
|
|
|
|
|
+ "model_confidence": evaluation.uncertainty_confidence,
|
|
|
|
|
+ "model_std": evaluation.uncertainty_std,
|
|
|
|
|
+ "model_prob": evaluation.y_prob,
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ merged = work.merge(eval_df, on="Image Data ID", how="inner")
|
|
|
|
|
+
|
|
|
|
|
+ if merged.empty:
|
|
|
|
|
+ raise ValueError("No overlapping Image Data ID rows for longitudinal analysis")
|
|
|
|
|
+
|
|
|
|
|
+ patient_rows: list[dict[str, Any]] = []
|
|
|
|
|
+ for ptid, group in merged.groupby("PTID"):
|
|
|
|
|
+ diagnoses = [d for d in group["diagnosis"].tolist() if d]
|
|
|
|
|
+ if len(diagnoses) < 2:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ first_dx = diagnoses[0]
|
|
|
|
|
+ last_dx = diagnoses[-1]
|
|
|
|
|
+ unique_dx = set(diagnoses)
|
|
|
|
|
+
|
|
|
|
|
+ cohort = "other"
|
|
|
|
|
+ if unique_dx.issubset({"CN"}):
|
|
|
|
|
+ cohort = "stable_cn"
|
|
|
|
|
+ elif unique_dx.issubset({"AD"}):
|
|
|
|
|
+ cohort = "stable_ad"
|
|
|
|
|
+ elif first_dx == "CN" and last_dx == "AD" and unique_dx.issubset({"CN", "AD"}):
|
|
|
|
|
+ cohort = "cn_to_ad"
|
|
|
|
|
+
|
|
|
|
|
+ patient_rows.append(
|
|
|
|
|
+ {
|
|
|
|
|
+ "PTID": ptid,
|
|
|
|
|
+ "n_visits": int(len(group)),
|
|
|
|
|
+ "first_dx": first_dx,
|
|
|
|
|
+ "last_dx": last_dx,
|
|
|
|
|
+ "cohort": cohort,
|
|
|
|
|
+ "mean_confidence": float(group["model_confidence"].mean()),
|
|
|
|
|
+ "mean_std": float(group["model_std"].mean()),
|
|
|
|
|
+ "mean_prob": float(group["model_prob"].mean()),
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ patient_df = pd.DataFrame(patient_rows)
|
|
|
|
|
+ patient_df = patient_df[
|
|
|
|
|
+ patient_df["cohort"].isin(["stable_cn", "stable_ad", "cn_to_ad"])
|
|
|
|
|
+ ].copy()
|
|
|
|
|
+
|
|
|
|
|
+ table_path = output_dir / "longitudinal_patient_summary.csv"
|
|
|
|
|
+ patient_df.to_csv(table_path, index=False)
|
|
|
|
|
+
|
|
|
|
|
+ cohort_df = (
|
|
|
|
|
+ patient_df.groupby("cohort")
|
|
|
|
|
+ .agg(
|
|
|
|
|
+ n_patients=("PTID", "count"),
|
|
|
|
|
+ mean_confidence=("mean_confidence", "mean"),
|
|
|
|
|
+ mean_std=("mean_std", "mean"),
|
|
|
|
|
+ mean_prob=("mean_prob", "mean"),
|
|
|
|
|
+ )
|
|
|
|
|
+ .reset_index()
|
|
|
|
|
+ )
|
|
|
|
|
+ cohort_table = output_dir / "longitudinal_cohort_summary.csv"
|
|
|
|
|
+ cohort_df.to_csv(cohort_table, index=False)
|
|
|
|
|
+
|
|
|
|
|
+ cohorts = ["stable_cn", "stable_ad", "cn_to_ad"]
|
|
|
|
|
+ uncertainty_specs = [
|
|
|
|
|
+ ("confidence", "mean_confidence", "Mean Confidence"),
|
|
|
|
|
+ (secondary_key, "mean_std", secondary_label),
|
|
|
|
|
+ ]
|
|
|
|
|
+ plot_paths: dict[str, str] = {}
|
|
|
|
|
+ for metric_name, metric_col, metric_label in uncertainty_specs:
|
|
|
|
|
+ values = [
|
|
|
|
|
+ np.asarray(
|
|
|
|
|
+ patient_df.loc[patient_df["cohort"] == c, metric_col], dtype=float
|
|
|
|
|
+ )
|
|
|
|
|
+ for c in cohorts
|
|
|
|
|
+ ]
|
|
|
|
|
+ plot_path = plots_dir(output_dir) / f"longitudinal_cohort_{metric_name}.png"
|
|
|
|
|
+ save_boxplot(
|
|
|
|
|
+ data=values,
|
|
|
|
|
+ tick_labels=cohorts,
|
|
|
|
|
+ x_label="Longitudinal Cohort",
|
|
|
|
|
+ y_label=metric_label,
|
|
|
|
|
+ title=f"Longitudinal Cohort Comparison: {metric_label} ({evaluation.backend})",
|
|
|
|
|
+ output_path=plot_path,
|
|
|
|
|
+ )
|
|
|
|
|
+ plot_paths[metric_name] = str(plot_path)
|
|
|
|
|
+
|
|
|
|
|
+ uncertainty_by_cohort = cohort_df.melt(
|
|
|
|
|
+ id_vars=["cohort", "n_patients"],
|
|
|
|
|
+ value_vars=["mean_confidence", "mean_std"],
|
|
|
|
|
+ var_name="uncertainty_type",
|
|
|
|
|
+ value_name="mean_value",
|
|
|
|
|
+ ).replace(
|
|
|
|
|
+ {
|
|
|
|
|
+ "uncertainty_type": {
|
|
|
|
|
+ "mean_confidence": "confidence",
|
|
|
|
|
+ "mean_std": secondary_key,
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ uncertainty_table = output_dir / "longitudinal_uncertainty_by_cohort.csv"
|
|
|
|
|
+ uncertainty_by_cohort.to_csv(uncertainty_table, index=False)
|
|
|
|
|
+
|
|
|
|
|
+ confidence_patient_table = (
|
|
|
|
|
+ output_dir / "longitudinal_confidence_patient_summary.csv"
|
|
|
|
|
+ )
|
|
|
|
|
+ std_patient_table = output_dir / "longitudinal_std_patient_summary.csv"
|
|
|
|
|
+ confidence_cohort_table = output_dir / "longitudinal_confidence_cohort_summary.csv"
|
|
|
|
|
+ std_cohort_table = output_dir / "longitudinal_std_cohort_summary.csv"
|
|
|
|
|
+ secondary_patient_table = (
|
|
|
|
|
+ output_dir / f"longitudinal_{secondary_key}_patient_summary.csv"
|
|
|
|
|
+ )
|
|
|
|
|
+ secondary_cohort_table = (
|
|
|
|
|
+ output_dir / f"longitudinal_{secondary_key}_cohort_summary.csv"
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ patient_df[
|
|
|
|
|
+ ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_confidence"]
|
|
|
|
|
+ ].to_csv(confidence_patient_table, index=False)
|
|
|
|
|
+ patient_df[
|
|
|
|
|
+ ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_std"]
|
|
|
|
|
+ ].to_csv(std_patient_table, index=False)
|
|
|
|
|
+ patient_df[
|
|
|
|
|
+ ["PTID", "n_visits", "first_dx", "last_dx", "cohort", "mean_std"]
|
|
|
|
|
+ ].to_csv(secondary_patient_table, index=False)
|
|
|
|
|
+ cohort_df[["cohort", "n_patients", "mean_confidence"]].to_csv(
|
|
|
|
|
+ confidence_cohort_table, index=False
|
|
|
|
|
+ )
|
|
|
|
|
+ cohort_df[["cohort", "n_patients", "mean_std"]].to_csv(
|
|
|
|
|
+ std_cohort_table, index=False
|
|
|
|
|
+ )
|
|
|
|
|
+ cohort_df[["cohort", "n_patients", "mean_std"]].to_csv(
|
|
|
|
|
+ secondary_cohort_table, index=False
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ out = {
|
|
|
|
|
+ "n_patients_analyzed": int(len(patient_df)),
|
|
|
|
|
+ "table_patient": str(table_path),
|
|
|
|
|
+ "table_cohort": str(cohort_table),
|
|
|
|
|
+ "table_uncertainty": str(uncertainty_table),
|
|
|
|
|
+ "tables": {
|
|
|
|
|
+ "confidence": {
|
|
|
|
|
+ "patient": str(confidence_patient_table),
|
|
|
|
|
+ "cohort": str(confidence_cohort_table),
|
|
|
|
|
+ },
|
|
|
|
|
+ secondary_key: {
|
|
|
|
|
+ "patient": str(secondary_patient_table),
|
|
|
|
|
+ "cohort": str(secondary_cohort_table),
|
|
|
|
|
+ },
|
|
|
|
|
+ "std": {
|
|
|
|
|
+ "patient": str(std_patient_table),
|
|
|
|
|
+ "cohort": str(std_cohort_table),
|
|
|
|
|
+ },
|
|
|
|
|
+ },
|
|
|
|
|
+ "plots": plot_paths,
|
|
|
|
|
+ }
|
|
|
|
|
+ write_json(output_dir / "longitudinal_summary.json", out)
|
|
|
|
|
+ return out
|