Source code for trancit.utils.signal

import logging
from typing import Tuple

import numpy as np
from scipy.spatial.distance import euclidean

logger = logging.getLogger(__name__)


def find_best_shrinked_locations(
    signal: np.ndarray,
    shrinked_locations: np.ndarray,
    all_locations: np.ndarray,
    num_bins: int = 100,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Find the best subset of shrinked locations by minimizing histogram distance.

    Computes the histogram of the full set of locations (`all_locations`) and compares
    it to histograms of subsets of `shrinked_locations` using Euclidean distance.
    Returns the subset that minimizes this distance.

    Args:
        signal (np.ndarray): 1D array of signal or data values.
        shrinked_locations (np.ndarray): 1D array of subsampled location indices.
        all_locations (np.ndarray): 1D array of all original location indices.
        num_bins (int, optional): Number of histogram bins. Defaults to 100.

    Returns:
        Tuple[np.ndarray, np.ndarray]:
            - best_locations: Subset of `shrinked_locations` with minimal histogram
              distance.
            - distances: Array of distances for each subset size, with NaN for sizes
              < 100.

    Raises:
        ValueError: If input arrays are empty or incompatible with `num_bins`.
    """
    if not signal.size or not shrinked_locations.size or not all_locations.size:
        logger.error("Empty input arrays provided to find_best_shrinked_locs.")
        raise ValueError("Signal and location arrays must not be empty.")

    hist_full, _ = np.histogram(signal[all_locations], bins=num_bins, density=True)
    logger.debug(f"Computed full histogram with {num_bins} bins.")

    distances = np.full(len(shrinked_locations), np.nan)

    for size in range(100, len(shrinked_locations)):
        hist_temp, _ = np.histogram(
            signal[shrinked_locations[:size]], bins=num_bins, density=True
        )
        # distances[size] = cdist(
        #     hist_full.reshape(1, -1), hist_temp.reshape(1, -1), metric="euclidean"
        # )[0, 0]
        distances[size] = euclidean(hist_full, hist_temp)
        logger.debug(f"Computed distance {distances[size]:.4f} for subset size {size}.")

    best_size = np.nanargmin(distances)
    best_locations = shrinked_locations[:best_size]
    logger.info(
        f"Best subset size: {best_size} with distance {distances[best_size]:.4f}."
    )

    return best_locations, distances


def shrink_locations_resample_uniform(
    locations: np.ndarray, min_distance: int
) -> np.ndarray:
    """
    Uniformly resample locations with a minimum distance between points.

    Iteratively selects locations randomly, ensuring each new location is at least
    `min_distance`
    away from previously selected ones, until no more can be added.

    Args:
        locations (np.ndarray): 1D array of location indices to resample from.
        min_distance (int): Minimum distance between selected locations.

    Returns:
        np.ndarray: 1D array of selected locations.

    Raises:
        ValueError: If `min_distance` is negative or `locations` is empty.
    """
    if len(locations) == 0:
        logger.warning("Empty locations array provided; returning empty array.")
        return np.array([])
    if min_distance < 0:
        logger.error("Negative min_distance provided.")
        raise ValueError("min_distance must be non-negative.")

    selected_locations: list[int] = []
    available_locations = locations.copy()
    max_number_generation = len(available_locations)
    logger.debug(
        f"Starting resampling with {len(locations)} locations and "
        f"min_distance {min_distance}."
    )

    while len(selected_locations) < max_number_generation:
        rand_idx = np.random.randint(0, len(available_locations))
        selected_loc = available_locations[rand_idx]
        selected_locations.append(selected_loc)
        mask = np.abs(selected_loc - available_locations) >= min_distance
        available_locations = available_locations[mask]
        logger.debug(
            f"Selected location {selected_loc}; {len(available_locations)} remain."
        )

    result = np.array(selected_locations)
    logger.info(f"Resampling complete; selected {len(result)} locations.")
    return result


[docs] def find_peak_locations( signal: np.ndarray, candidate_locations: np.ndarray, window_size: int ) -> np.ndarray: """ Refine peak locations by grouping candidates and local maximization. Groups candidate locations within `window_size`, finds the maximum in each group, and refines the peak within a local window around each maximum. Args: signal (np.ndarray): 1D array of signal data. candidate_locations (np.ndarray): 1D array of candidate peak indices. window_size (int): Size of the window for grouping and refinement. Returns: np.ndarray: 1D array of refined peak locations. Raises: ValueError: If `window_size` is negative or inputs are empty/invalid. """ if not signal.size or not candidate_locations.size: logger.error("Empty signal or candidate_locations provided.") raise ValueError("Signal and candidate_locations must not be empty.") if window_size < 0: logger.error("Negative window_size provided.") raise ValueError("window_size must be non-negative.") valid_candidates = np.sort( candidate_locations[ (candidate_locations >= window_size) & (candidate_locations <= len(signal) - window_size) ].astype(np.intp) ) logger.debug(f"Filtered to {len(valid_candidates)} valid candidates.") preliminary_peaks = [] idx_start = 0 idx_end = 0 while idx_end < len(valid_candidates) - 1: while ( idx_end + 1 < len(valid_candidates) and valid_candidates[idx_end + 1] - valid_candidates[idx_start] < window_size ): idx_end += 1 if idx_end == len(valid_candidates) - 1: break group = valid_candidates[idx_start : idx_end + 1] group_signal = signal[group] max_idx = np.argmax(group_signal) preliminary_peaks.append(group[max_idx]) idx_start = idx_end + 1 idx_end = idx_start logger.debug(f"Found {len(preliminary_peaks)} preliminary peaks.") # Refine peaks within local windows half_window = int(np.ceil(window_size / 2)) refined_peaks = [] for peak in preliminary_peaks: start = peak - half_window + 1 end = peak + half_window if start < 0 or end > len(signal): logger.warning(f"Skipping peak at {peak} due to boundary violation.") continue local_signal = signal[start:end] local_max_idx = np.argmax(local_signal) refined_peaks.append(start + local_max_idx) logger.info(f"Refined to {len(refined_peaks)} peaks.") return np.unique(refined_peaks)