""" Interactive Shiny app for inspecting processed PET SUV NIfTI images. Run from project root: shiny run --reload app.py Required packages: pip install shiny shinywidgets shinyswatch plotly pandas nibabel scipy scikit-image Expected project structure: project_root/ ├── app.py ├── data/ │ ├── gen/ │ └── raw/ └── spatial_suv_charact/ ├── image_io.py ├── metadata.py ├── plotting.py ├── spatial_features.py └── suv_stats.py """ from __future__ import annotations from pathlib import Path import sys import traceback import pandas as pd import plotly.graph_objects as go from shiny import App, Inputs, Outputs, Session, reactive, render, ui from shinywidgets import output_widget, render_widget # ----------------------------------------------------------------------------- # Project paths # ----------------------------------------------------------------------------- APP_DIR = "." # str(Path(__file__).resolve().parent) PROJECT_ROOT = APP_DIR if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) import os DATA_RAW = os.path.join(PROJECT_ROOT, "data", "raw") DATA_GEN = os.path.join(PROJECT_ROOT, "data", "gen") DEFAULT_METADATA_PATH = os.path.join(DATA_GEN, "metadata.pkl") # ----------------------------------------------------------------------------- # Local package imports # ----------------------------------------------------------------------------- from spatial_suv_charact.metadata import ( # noqa: E402 get_meta_data, flag_corrupted_files, flag_AE_patients, ) from spatial_suv_charact.image_io import get_processed_image # noqa: E402 from spatial_suv_charact.plotting import ( # noqa: E402 plot_suv_pdf_plotly, plot_hot_voxels_plotly, ) from spatial_suv_charact.spatial_features import compute_tail_spatial_features # noqa: E402 # ----------------------------------------------------------------------------- # Helper functions # ----------------------------------------------------------------------------- def _empty_figure(message: str = "No plot available") -> go.Figure: """Return an empty Plotly figure with a centered annotation.""" fig = go.Figure() fig.add_annotation( text=message, xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, ) fig.update_layout( template="plotly_white", xaxis={"visible": False}, yaxis={"visible": False}, height=500, ) return fig def _load_metadata() -> pd.DataFrame: """Load metadata table from disk or build it from DATA_RAW.""" if DEFAULT_METADATA_PATH.exists(): df_meta = pd.read_pickle(DEFAULT_METADATA_PATH) else: df_meta = get_meta_data(str(DATA_RAW)) df_meta = flag_corrupted_files(df_meta) df_meta = flag_AE_patients(df_meta) DATA_GEN.mkdir(parents=True, exist_ok=True) df_meta.to_pickle(DEFAULT_METADATA_PATH) if not isinstance(df_meta.index, pd.MultiIndex): required_cols = {"patient_id", "organ", "visit"} if required_cols.issubset(df_meta.columns): df_meta = df_meta.set_index(["patient_id", "organ", "visit"]) else: raise ValueError( "Metadata table must have MultiIndex (patient_id, organ, visit) " "or columns patient_id, organ, visit." ) required_columns = {"PET_path", "SEG_path"} missing = required_columns.difference(df_meta.columns) if missing: raise ValueError(f"Metadata table is missing required columns: {missing}") return df_meta.sort_index() DF_META = _load_metadata() def _metadata_for_display(df_meta: pd.DataFrame) -> pd.DataFrame: """Return metadata with index columns exposed and an internal row_id.""" df = df_meta.reset_index().copy() df.insert(0, "row_id", range(len(df))) preferred = [ "row_id", "patient_id", "organ", "visit", "is_AE_patient", "is_corrupted", "PET_filename", "SEG_filename", "PET_path", "SEG_path", ] cols = [c for c in preferred if c in df.columns] + [ c for c in df.columns if c not in preferred ] return df[cols] DF_DISPLAY = _metadata_for_display(DF_META) def _format_image_id(row: pd.Series) -> str: """Create a readable image identifier from a selected metadata row.""" return f"{row['patient_id']}_{row['organ']}_VISIT_{row['visit']}" def _parse_probs(prob_string: str) -> tuple[float, ...]: """Parse comma-separated percentile values from UI input.""" values: list[float] = [] for part in prob_string.split(","): part = part.strip() if not part: continue values.append(float(part)) if not values: raise ValueError("At least one percentile must be supplied.") for value in values: if not (0 < value < 100): raise ValueError("Percentiles must be between 0 and 100.") return tuple(values) def _safe_error_message(exc: BaseException) -> str: """Return a readable error message for the diagnostics tab.""" return "\n".join( [ f"{type(exc).__name__}: {exc}", "", traceback.format_exc(limit=8), ] ) # ----------------------------------------------------------------------------- # UI # ----------------------------------------------------------------------------- app_ui = ui.page_fluid( ui.h2("Spatial SUV tail-feature explorer"), ui.layout_sidebar( ui.sidebar( ui.input_text( "probs", "Percentiles", value="80, 90, 95", placeholder="80, 90, 95", ), ui.input_numeric("bins", "Histogram bins", value=100, min=10, max=500), ui.input_numeric("min_suv", "Minimum SUV for PDF", value=0.1, min=0), ui.input_checkbox("log_x", "Log x-axis for SUV PDF", value=True), ui.hr(), ui.input_numeric( "min_component_voxels", "Minimum component voxels", value=3, min=1, step=1, ), ui.input_select( "component_connectivity", "Component connectivity", choices={"6": "6", "18": "18", "26": "26"}, selected="26", ), ui.input_select( "contrast_connectivity", "Contrast connectivity", choices={"6": "6", "18": "18", "26": "26"}, selected="26", ), ui.input_checkbox("compute_spread", "Compute spread", value=True), ui.input_checkbox("compute_local_contrast", "Compute local contrast", value=True), ui.input_checkbox("compute_sphericity", "Compute sphericity", value=True), ui.input_checkbox("crop_to_roi", "Crop to ROI", value=True), ui.hr(), ui.input_action_button("run", "Compute selected row", class_="btn-primary"), width=330, ), ui.navset_tab( ui.nav_panel( "Metadata table", ui.p("Select one row from the table, then click 'Compute selected row'."), ui.output_data_frame("metadata_table"), ), ui.nav_panel( "SUV PDF", ui.output_ui("selected_summary_pdf"), output_widget("suv_pdf_plot", height="560px"), ui.h4("SUV percentiles"), ui.output_data_frame("suv_percentiles_table"), ), ui.nav_panel( "Hot voxels", ui.output_ui("selected_summary_hot"), ui.output_ui("hot_voxel_tabs"), ), ui.nav_panel( "Spatial features", ui.output_ui("selected_summary_features"), ui.output_data_frame("features_table"), ), ui.nav_panel( "Errors / diagnostics", ui.output_text_verbatim("diagnostics"), ), ), ), ) # ----------------------------------------------------------------------------- # Server # ----------------------------------------------------------------------------- def server(input: Inputs, output: Outputs, session: Session): @render.data_frame def metadata_table(): return render.DataGrid( DF_DISPLAY, selection_mode="row", filters=True, height="650px", ) @reactive.calc def selected_row_display() -> pd.Series | None: selected = metadata_table.cell_selection()["rows"] if not selected: return None return DF_DISPLAY.iloc[int(selected[0])] @reactive.calc @reactive.event(input.run) def analysis_result(): """Load selected image and compute all requested outputs once per click.""" row_display = selected_row_display() if row_display is None: return { "ok": False, "error": "No row selected. Select a row in the metadata table first.", } try: probs = _parse_probs(input.probs()) patient_id = row_display["patient_id"] organ = row_display["organ"] visit = int(row_display["visit"]) index_key = (patient_id, organ, visit) row_meta, processed_image = get_processed_image( DF_META, patient_id=patient_id, organ=organ, visit=visit, ) image_id = _format_image_id(row_display) pdf_fig, suv_percentiles = plot_suv_pdf_plotly( processed_image, percentiles=probs, bins=int(input.bins()), log_x=bool(input.log_x()), min_suv=float(input.min_suv()), title=f"SUV distribution: {image_id}", ) hot_figs: dict[float, go.Figure] = {} for p in probs: threshold = float( suv_percentiles.loc[ suv_percentiles["percentile"] == p, "suv_threshold", ].iloc[0] ) hot_fig = plot_hot_voxels_plotly( processed_image, c=threshold, ) hot_fig.update_layout( title=f"Hot voxels: {image_id}, p{p:g}, SUV ≥ {threshold:.4g}" ) hot_figs[p] = hot_fig features = compute_tail_spatial_features( image=processed_image, percentiles=probs, component_connectivity=int(input.component_connectivity()), contrast_connectivity=int(input.contrast_connectivity()), min_component_voxels=int(input.min_component_voxels()), compute_spread=bool(input.compute_spread()), compute_local_contrast=bool(input.compute_local_contrast()), compute_sphericity=bool(input.compute_sphericity()), crop_to_roi=bool(input.crop_to_roi()), image_id=index_key, ) return { "ok": True, "row_display": row_display, "row_meta": row_meta, "image_id": image_id, "probs": probs, "pdf_fig": pdf_fig, "suv_percentiles": suv_percentiles, "hot_figs": hot_figs, "features": features, "diagnostics": f"Computed successfully for {image_id}", } except Exception as exc: return { "ok": False, "row_display": row_display, "error": _safe_error_message(exc), } def _selected_summary(result: dict) -> ui.TagList: if not result.get("ok"): return ui.TagList(ui.div(ui.strong("No successful computation yet."))) row = result["row_display"] return ui.TagList( ui.div( ui.strong("Selected image: "), f"{row['patient_id']} | {row['organ']} | VISIT_{row['visit']}", ) ) @render.ui def selected_summary_pdf(): return _selected_summary(analysis_result()) @render.ui def selected_summary_hot(): return _selected_summary(analysis_result()) @render.ui def selected_summary_features(): return _selected_summary(analysis_result()) @render_widget def suv_pdf_plot(): result = analysis_result() if not result.get("ok"): return _empty_figure("Select a row and click 'Compute selected row' to show the SUV PDF.") return result["pdf_fig"] @render.data_frame def suv_percentiles_table(): result = analysis_result() if not result.get("ok"): return pd.DataFrame() return render.DataGrid(result["suv_percentiles"], height="250px") @render.ui def hot_voxel_tabs(): result = analysis_result() if not result.get("ok"): return ui.div(ui.em("Select a row and compute to show hot-voxel plots.")) probs = list(result["probs"]) max_plots = 5 if len(probs) > max_plots: probs = probs[:max_plots] tabs = [] for i, p in enumerate(probs): tabs.append( ui.nav_panel( f"p{p:g}", output_widget(f"hot_voxel_plot_{i}", height="760px"), ) ) return ui.navset_tab(*tabs) def _hot_voxel_figure_by_index(index: int): result = analysis_result() if not result.get("ok"): return _empty_figure("No hot-voxel plot available.") probs = list(result["probs"]) if index >= len(probs): return _empty_figure("No percentile assigned to this plot.") p = probs[index] fig = result["hot_figs"].get(p) if fig is None: return _empty_figure(f"Percentile p{p:g} was not requested.") return fig @render_widget def hot_voxel_plot_0(): return _hot_voxel_figure_by_index(0) @render_widget def hot_voxel_plot_1(): return _hot_voxel_figure_by_index(1) @render_widget def hot_voxel_plot_2(): return _hot_voxel_figure_by_index(2) @render_widget def hot_voxel_plot_3(): return _hot_voxel_figure_by_index(3) @render_widget def hot_voxel_plot_4(): return _hot_voxel_figure_by_index(4) @render.data_frame def features_table(): result = analysis_result() if not result.get("ok"): return pd.DataFrame() return render.DataGrid(result["features"], height="420px") @render.text def diagnostics(): result = analysis_result() if result.get("ok"): return result.get("diagnostics", "OK") return result.get("error", "Unknown error") app = App(app_ui, server) if __name__ == "__main__": from shiny import run_app run_app("app:app", host="127.0.0.1", port=8000, reload=True)