"""Spatial and heterogeneity features for high-SUV tail regions. This module provides :func:`compute_tail_spatial_features`. Design principle ---------------- The default call is intentionally backward-compatible with the original implementation: compute_tail_spatial_features(image, connectivity=26) uses 26-connectivity for both connected components and local contrast, computes spatial spread, computes sphericity, and does not crop the image. Performance options are available as explicit opt-ins: compute_sphericity=False crop_to_roi=True contrast_connectivity=6 This avoids silently changing the scientific meaning of already-computed features. """ from __future__ import annotations from functools import lru_cache import numpy as np import pandas as pd import nibabel as nib from scipy import ndimage from skimage.measure import marching_cubes, mesh_surface_area Array3D = np.ndarray Spacing3D = tuple[float, float, float] def _validate_connectivity(connectivity: int) -> None: """Validate a 3D connectivity code. Parameters ---------- connectivity: Must be one of 6, 18, or 26. """ if connectivity not in (6, 18, 26): raise ValueError("connectivity must be one of: 6, 18, 26.") @lru_cache(maxsize=None) def _connectivity_structure(connectivity: int) -> Array3D: """Return a ``scipy.ndimage`` binary structure for 3D connectivity.""" _validate_connectivity(connectivity) if connectivity == 6: return ndimage.generate_binary_structure(rank=3, connectivity=1) if connectivity == 18: return ndimage.generate_binary_structure(rank=3, connectivity=2) return ndimage.generate_binary_structure(rank=3, connectivity=3) @lru_cache(maxsize=None) def _neighbor_offsets(connectivity: int) -> tuple[tuple[int, int, int], ...]: """Return unique 3D neighbor offsets for the selected connectivity. Only one offset from each symmetric pair is returned. Therefore each neighboring voxel pair is counted once, matching the original implementation. """ _validate_connectivity(connectivity) offsets: list[tuple[int, int, int]] = [] for di in (-1, 0, 1): for dj in (-1, 0, 1): for dk in (-1, 0, 1): if di == 0 and dj == 0 and dk == 0: continue dist1 = abs(di) + abs(dj) + abs(dk) if connectivity == 6 and dist1 != 1: continue if connectivity == 18 and dist1 > 2: continue # Keep one direction only to avoid double-counting. # This is the same rule as in the original implementation. if (di, dj, dk) > (0, 0, 0): offsets.append((di, dj, dk)) return tuple(offsets) def _component_entropy(component_sizes: np.ndarray) -> float: """Entropy of the connected-component size distribution. If the component sizes are ``s_j``, this returns - sum_j p_j log(p_j), where p_j = s_j / sum_k s_k. """ component_sizes = np.asarray(component_sizes, dtype=float) if component_sizes.size == 0 or component_sizes.sum() <= 0: return np.nan p = component_sizes / component_sizes.sum() p = p[p > 0] return float(-np.sum(p * np.log(p))) def _mask_bbox(mask: Array3D, margin: int = 0) -> tuple[slice, slice, slice] | None: """Return bounding-box slices for a 3D mask. This implementation avoids ``np.argwhere(mask)`` and instead uses 1D projections, which is usually faster and less memory-intensive. """ if mask.ndim != 3: raise ValueError("mask must be 3D.") if not np.any(mask): return None if margin < 0: raise ValueError("margin must be >= 0.") i_nonzero = np.flatnonzero(mask.any(axis=(1, 2))) j_nonzero = np.flatnonzero(mask.any(axis=(0, 2))) k_nonzero = np.flatnonzero(mask.any(axis=(0, 1))) lo = np.array([i_nonzero[0], j_nonzero[0], k_nonzero[0]], dtype=int) hi = np.array( [i_nonzero[-1] + 1, j_nonzero[-1] + 1, k_nonzero[-1] + 1], dtype=int, ) if margin > 0: lo = np.maximum(lo - margin, 0) hi = np.minimum(hi + margin, mask.shape) return tuple(slice(int(lo[d]), int(hi[d])) for d in range(3)) def _crop_to_mask_bbox( suv: Array3D, valid_mask: Array3D, margin: int = 1, ) -> tuple[Array3D, Array3D]: """Crop SUV image and valid-mask to the valid-mask bounding box.""" bbox = _mask_bbox(valid_mask, margin=margin) if bbox is None: return suv, valid_mask return suv[bbox], valid_mask[bbox] def _spatial_spread_from_mask( region_mask: Array3D, values: np.ndarray, voxel_spacing: Spacing3D, ) -> tuple[float, float]: """Compute unweighted and SUV-weighted spatial spread in mm^2. This is equivalent to the original implementation based on ``np.argwhere(region_mask)`` followed by multiplication with voxel spacing, but it avoids constructing a single large ``(n, 3)`` coordinate matrix. """ n_voxels = int(np.count_nonzero(region_mask)) if n_voxels == 0: return np.nan, np.nan ii, jj, kk = np.nonzero(region_mask) dx, dy, dz = voxel_spacing x = ii.astype(np.float64, copy=False) * dx y = jj.astype(np.float64, copy=False) * dy z = kk.astype(np.float64, copy=False) * dz cx = float(np.mean(x)) cy = float(np.mean(y)) cz = float(np.mean(z)) spread = float(np.mean((x - cx) ** 2 + (y - cy) ** 2 + (z - cz) ** 2)) weights = np.asarray(values, dtype=np.float64) weight_sum = float(np.sum(weights)) if weight_sum <= 0: weighted_spread = np.nan else: wcx = float(np.sum(weights * x) / weight_sum) wcy = float(np.sum(weights * y) / weight_sum) wcz = float(np.sum(weights * z) / weight_sum) weighted_spread = float( np.sum(weights * ((x - wcx) ** 2 + (y - wcy) ** 2 + (z - wcz) ** 2)) / weight_sum ) return spread, weighted_spread def _local_contrast( suv: Array3D, region_mask: Array3D, connectivity: int = 26, ) -> float: """Continuous local contrast inside a binary region. Computes the mean squared SUV difference between neighboring voxel pairs where both voxels are inside ``region_mask``. This is numerically equivalent to the original concatenate-based version, but it accumulates the sum of squared differences and the number of pairs directly. This avoids large temporary arrays. """ if suv.ndim != 3: raise ValueError("suv must be a 3D array.") _validate_connectivity(connectivity) region_mask = np.asarray(region_mask, dtype=bool) if region_mask.shape != suv.shape: raise ValueError("region_mask must have the same shape as suv.") if np.count_nonzero(region_mask) < 2: return np.nan nx, ny, nz = suv.shape total_sqdiff = 0.0 total_pairs = 0 for di, dj, dk in _neighbor_offsets(connectivity): src_slices = ( slice(max(0, -di), min(nx, nx - di)), slice(max(0, -dj), min(ny, ny - dj)), slice(max(0, -dk), min(nz, nz - dk)), ) dst_slices = ( slice(max(0, di), min(nx, nx + di)), slice(max(0, dj), min(ny, ny + dj)), slice(max(0, dk), min(nz, nz + dk)), ) pair_mask = region_mask[src_slices] & region_mask[dst_slices] n_pairs = int(np.count_nonzero(pair_mask)) if n_pairs == 0: continue diff = suv[src_slices][pair_mask] - suv[dst_slices][pair_mask] total_sqdiff += float(np.sum(diff * diff)) total_pairs += n_pairs if total_pairs == 0: return np.nan return float(total_sqdiff / total_pairs) def _largest_component_sphericity( largest_component_mask: Array3D, voxel_spacing: Spacing3D, *, crop_component: bool = False, ) -> float: """Approximate sphericity of the largest connected component. Sphericity is defined as psi = pi^(1/3) * (6V)^(2/3) / A, where ``V`` is physical volume and ``A`` is triangulated surface area. Parameters ---------- crop_component: If False, exactly follows the original full-volume marching-cubes calculation. If True, crop the largest component to its bounding box before marching cubes. Cropping is faster but may very slightly change the surface mesh if the component touches the crop boundary. """ n_voxels = int(np.count_nonzero(largest_component_mask)) if n_voxels < 4: return np.nan voxel_volume = float(np.prod(voxel_spacing)) volume = float(n_voxels * voxel_volume) component_for_surface = largest_component_mask if crop_component: bbox = _mask_bbox(largest_component_mask, margin=1) if bbox is None: return np.nan component_for_surface = largest_component_mask[bbox] try: verts, faces, _, _ = marching_cubes( component_for_surface.astype(float, copy=False), level=0.5, spacing=voxel_spacing, ) area = float(mesh_surface_area(verts, faces)) except Exception: return np.nan if area <= 0: return np.nan return float((np.pi ** (1 / 3)) * ((6 * volume) ** (2 / 3)) / area) def compute_tail_spatial_features( image: nib.Nifti1Image, voxel_spacing: Spacing3D | None = None, percentiles: tuple[float, ...] = (90, 95, 97.5, 99), connectivity: int = 26, min_component_voxels: int = 1, image_id: str | None = None, *, component_connectivity: int | None = None, contrast_connectivity: int | None = None, compute_spread: bool = True, compute_local_contrast: bool = True, compute_sphericity: bool = True, crop_to_roi: bool = False, crop_margin: int = 1, crop_component_for_sphericity: bool = False, ) -> pd.DataFrame: """Compute spatial and heterogeneity features of high-SUV tail regions. The input image is assumed to be already processed, for example PET SUV multiplied by an organ segmentation mask. Finite positive voxels are treated as the region of interest. For each percentile ``q``, the high-SUV tail is defined as R_q = {voxel : SUV(voxel) >= percentile_q(SUV within ROI)}. Backward compatibility ---------------------- By default, this function preserves the semantics of the original version: - ``connectivity`` is used for both components and local contrast; - spread is computed; - sphericity is computed; - the volume is not cropped before feature calculation. Faster screening runs should explicitly disable or alter expensive features. Parameters ---------- image: 3D NIfTI image containing SUV values per voxel. voxel_spacing: Physical voxel size in mm. If None, it is read from the NIfTI header. percentiles: Percentile thresholds used to define high-SUV regions. connectivity: Backward-compatible connectivity parameter. Used whenever ``component_connectivity`` or ``contrast_connectivity`` is not supplied. min_component_voxels: Components smaller than this number of voxels are ignored in component-level summaries. image_id: Optional image or patient identifier added to the output table. component_connectivity: Connectivity used for connected-component labeling. If None, uses ``connectivity``. contrast_connectivity: Connectivity used for local contrast. If None, uses ``connectivity``. Set this to 6 explicitly for a faster face-neighbor contrast. compute_spread: If False, skip spatial spread features. compute_local_contrast: If False, skip local contrast features. compute_sphericity: If False, skip marching-cubes sphericity computation. crop_to_roi: If True, crop the image to the nonzero ROI bounding box before feature calculation. This should preserve component counts and component entropy, but the default is False to match the original implementation exactly. crop_margin: Margin in voxels added when ``crop_to_roi=True``. crop_component_for_sphericity: If True, crop the largest component before marching cubes. This is faster but not the exact original sphericity calculation, so the default is False. Returns ------- pandas.DataFrame One row per percentile threshold. """ if not isinstance(image, nib.Nifti1Image): raise TypeError("image must be a nib.Nifti1Image.") if component_connectivity is None: component_connectivity = connectivity if contrast_connectivity is None: contrast_connectivity = connectivity _validate_connectivity(component_connectivity) _validate_connectivity(contrast_connectivity) if min_component_voxels < 1: raise ValueError("min_component_voxels must be >= 1.") if crop_margin < 0: raise ValueError("crop_margin must be >= 0.") suv = image.get_fdata(dtype=np.float64) if suv.ndim != 3: raise ValueError("Input NIfTI image must be 3D.") if voxel_spacing is None: voxel_spacing = image.header.get_zooms()[:3] if len(voxel_spacing) != 3: raise ValueError("voxel_spacing must have length 3.") voxel_spacing = tuple(float(x) for x in voxel_spacing) voxel_volume = float(np.prod(voxel_spacing)) valid_mask = np.isfinite(suv) & (suv > 0) if crop_to_roi: suv, valid_mask = _crop_to_mask_bbox( suv=suv, valid_mask=valid_mask, margin=crop_margin, ) values = suv[valid_mask] if values.size == 0: raise ValueError("No finite positive SUV values found in the image.") percentile_values = np.percentile(values, percentiles) structure = _connectivity_structure(component_connectivity) n_roi_voxels = int(np.count_nonzero(valid_mask)) roi_volume_mm3 = float(n_roi_voxels * voxel_volume) rows: list[dict[str, object]] = [] for q, threshold_raw in zip(percentiles, percentile_values): threshold = float(threshold_raw) tail_mask = valid_mask & (suv >= threshold) n_tail_voxels = int(np.count_nonzero(tail_mask)) base_row: dict[str, object] = { "image_id": image_id, "percentile": q, "threshold": threshold, "n_roi_voxels": n_roi_voxels, "roi_volume_mm3": roi_volume_mm3, "n_tail_voxels": n_tail_voxels, "tail_volume_mm3": float(n_tail_voxels * voxel_volume), "tail_fraction": float(n_tail_voxels / n_roi_voxels), } if n_tail_voxels == 0: rows.append( base_row | { "tail_mean": np.nan, "tail_median": np.nan, "tail_max": np.nan, "tail_std": np.nan, "tail_cv": np.nan, "tail_sum": np.nan, "tail_excess_mean": np.nan, "tail_excess_sum": np.nan, "tail_local_contrast": np.nan, "tail_local_contrast_norm": np.nan, "tail_spread_mm2": np.nan, "tail_weighted_spread_mm2": np.nan, "n_components": 0, "largest_component_voxels": 0, "largest_component_volume_mm3": np.nan, "largest_component_fraction": np.nan, "component_entropy": np.nan, "largest_component_sphericity": np.nan, } ) continue tail_values = suv[tail_mask] tail_mean = float(np.mean(tail_values)) tail_median = float(np.median(tail_values)) tail_max = float(np.max(tail_values)) tail_std = float(np.std(tail_values, ddof=1)) if n_tail_voxels > 1 else 0.0 tail_cv = float(tail_std / tail_mean) if tail_mean > 0 else np.nan tail_sum = float(np.sum(tail_values)) tail_excess = tail_values - threshold tail_excess_mean = float(np.mean(tail_excess)) tail_excess_sum = float(np.sum(tail_excess)) if compute_local_contrast: tail_local_contrast = _local_contrast( suv=suv, region_mask=tail_mask, connectivity=contrast_connectivity, ) else: tail_local_contrast = np.nan tail_local_contrast_norm = ( float(tail_local_contrast / tail_mean**2) if np.isfinite(tail_local_contrast) and tail_mean > 0 else np.nan ) if compute_spread: tail_spread_mm2, tail_weighted_spread_mm2 = _spatial_spread_from_mask( region_mask=tail_mask, values=tail_values, voxel_spacing=voxel_spacing, ) else: tail_spread_mm2 = np.nan tail_weighted_spread_mm2 = np.nan labeled, _ = ndimage.label(tail_mask, structure=structure) component_sizes_all = np.bincount(labeled.ravel())[1:] component_sizes = component_sizes_all[component_sizes_all >= min_component_voxels] n_components = int(component_sizes.size) if n_components > 0: largest_component_voxels = int(np.max(component_sizes)) largest_component_volume_mm3 = float(largest_component_voxels * voxel_volume) largest_component_fraction = float(largest_component_voxels / n_tail_voxels) component_entropy = _component_entropy(component_sizes) largest_label = int( np.flatnonzero(component_sizes_all == largest_component_voxels)[0] + 1 ) if compute_sphericity: largest_component_mask = labeled == largest_label largest_component_sphericity = _largest_component_sphericity( largest_component_mask=largest_component_mask, voxel_spacing=voxel_spacing, crop_component=crop_component_for_sphericity, ) else: largest_component_sphericity = np.nan else: largest_component_voxels = 0 largest_component_volume_mm3 = np.nan largest_component_fraction = np.nan component_entropy = np.nan largest_component_sphericity = np.nan rows.append( base_row | { "tail_mean": tail_mean, "tail_median": tail_median, "tail_max": tail_max, "tail_std": tail_std, "tail_cv": tail_cv, "tail_sum": tail_sum, "tail_excess_mean": tail_excess_mean, "tail_excess_sum": tail_excess_sum, "tail_local_contrast": tail_local_contrast, "tail_local_contrast_norm": tail_local_contrast_norm, "tail_spread_mm2": tail_spread_mm2, "tail_weighted_spread_mm2": tail_weighted_spread_mm2, "n_components": n_components, "largest_component_voxels": largest_component_voxels, "largest_component_volume_mm3": largest_component_volume_mm3, "largest_component_fraction": largest_component_fraction, "component_entropy": component_entropy, "largest_component_sphericity": largest_component_sphericity, } ) return pd.DataFrame(rows)