Source code for trancit.causality.dcs

"""
Dynamic Causal Strength (DCS) implementation.

This module provides the implementation of Dynamic Causal Strength
calculation using the new modular architecture.
"""

import logging
from typing import Dict, Tuple

import numpy as np

from trancit.core.base import BaseAnalyzer, BaseResult
from trancit.core.exceptions import ComputationError, ValidationError
from trancit.utils.helpers import compute_covariances, estimate_coefficients
from trancit.utils.preprocess import regularize_if_singular

logger = logging.getLogger(__name__)


[docs] class DCSResult(BaseResult): """Result container for DCS analysis."""
[docs] def __init__( self, causal_strength: np.ndarray, transfer_entropy: np.ndarray, granger_causality: np.ndarray, coefficients: np.ndarray, te_residual_cov: np.ndarray, ): """ Initialize DCS result. Parameters ---------- causal_strength : np.ndarray Dynamic Causal Strength values transfer_entropy : np.ndarray Transfer Entropy values granger_causality : np.ndarray Granger Causality values coefficients : np.ndarray VAR coefficients te_residual_cov : np.ndarray Transfer entropy residual covariance """ super().__init__( causal_strength=causal_strength, transfer_entropy=transfer_entropy, granger_causality=granger_causality, coefficients=coefficients, te_residual_cov=te_residual_cov, )
[docs] class DCSCalculator(BaseAnalyzer): """ Dynamic Causal Strength (DCS) calculator. This class implements the DCS algorithm for quantifying causal relationships in time series data. """
[docs] def __init__( self, model_order: int, time_mode: str = "inhomo", use_diagonal_covariance: bool = False, **kwargs, ): """ Initialize DCS calculator. Parameters ---------- model_order : int Model order for VAR analysis time_mode : str Time mode: 'inhomo' (time-inhomogeneous) or 'homo' (homogeneous) use_diagonal_covariance : bool Whether to use diagonal covariance approximation **kwargs Additional configuration parameters """ super().__init__( model_order=model_order, time_mode=time_mode, use_diagonal_covariance=use_diagonal_covariance, **kwargs, )
[docs] def analyze(self, data: np.ndarray, **kwargs) -> DCSResult: """ Perform DCS analysis on the given data. Parameters ---------- data : np.ndarray Time series data with shape (n_vars, n_observations, n_trials) **kwargs Additional parameters Returns ------- DCSResult DCS analysis results """ self._validate_input_data(data) self._log_analysis_start(data.shape) try: current_data, lagged_data, n_time_steps = self._prepare_data_structures( data ) cov_xp, cov_yp, c_xyp, c_yxp = compute_covariances( lagged_data, n_time_steps, self.config["model_order"] ) result_arrays = self._initialize_result_arrays( n_time_steps, data.shape[1], data.shape[0] ) if self.config["time_mode"] == "inhomo": self._compute_inhomogeneous_causality( current_data, lagged_data, cov_xp, cov_yp, c_xyp, c_yxp, result_arrays, data.shape[2], data.shape[0], ) self._adjust_outputs_for_inhomo(result_arrays) else: self._compute_homogeneous_causality( current_data, lagged_data, cov_xp, cov_yp, c_xyp, c_yxp, result_arrays, data.shape[2], data.shape[0], ) self._log_analysis_complete() return DCSResult( causal_strength=result_arrays["causal_strength"], transfer_entropy=result_arrays["transfer_entropy"], granger_causality=result_arrays["granger_causality"], coefficients=result_arrays["coefficients"], te_residual_cov=result_arrays["te_residual_cov"], ) except Exception as e: logger.error(f"DCS analysis failed: {e}") raise ComputationError( f"DCS analysis failed: {e}", "dcs_computation", data.shape )
def _validate_config(self) -> None: """Validate configuration parameters.""" if self.config["model_order"] <= 0: raise ValidationError( "model_order must be positive", "model_order", self.config["model_order"], ) if self.config["time_mode"] not in ["inhomo", "homo"]: raise ValidationError( "time_mode must be 'inhomo' or 'homo'", "time_mode", self.config["time_mode"], ) def _validate_input_data(self, data: np.ndarray) -> None: """Validate input data format and dimensions.""" super()._validate_input_data(data) if data.ndim != 3: raise ValidationError( "Input data must be 3D (n_vars, n_observations, n_trials)", "data_ndim", data.ndim, ) if data.shape[0] != 2: raise ValidationError( "Input data must be bivariate (n_vars=2)", "n_vars", data.shape[0] ) def _prepare_data_structures( self, time_series_data: np.ndarray ) -> Tuple[np.ndarray, np.ndarray, int]: """ Prepare data structures for analysis exactly like the previous implementation. Parameters ---------- time_series_data : np.ndarray Input time series data Returns ------- Tuple[np.ndarray, np.ndarray, int] current_data, lagged_data, n_time_steps """ nvar, nobs, ntrials = time_series_data.shape r1 = self.config["model_order"] + 1 extended_data = np.zeros((nvar, r1, nobs + r1 - 1, ntrials)) for k in range(r1): extended_data[:, k, k : k + nobs, :] = time_series_data current_data = extended_data[:, 0, r1 - 1 : nobs, :] lagged_data = extended_data[ :, 1 : self.config["model_order"] + 1, r1 - 1 : nobs - 1, : ] n_time_steps = current_data.shape[1] - 1 return current_data, lagged_data, n_time_steps def _initialize_result_arrays( self, n_time_steps: int, nobs: int, nvar: int ) -> Dict[str, np.ndarray]: """ Initialize arrays for storing results exactly like the previous implementation. Parameters ---------- n_time_steps : int Number of time steps nobs : int Number of observations nvar : int Number of variables Returns ------- Dict[str, np.ndarray] Dictionary of initialized arrays """ return { "te_residual_cov": np.zeros((n_time_steps, 2)), "transfer_entropy": np.zeros((n_time_steps, 2)), "causal_strength": np.zeros((n_time_steps, 2)), "granger_causality": np.zeros((n_time_steps, 2)), "coefficients": np.full( (nobs, nvar, nvar * self.config["model_order"] + 1), np.nan ), } def _compute_inhomogeneous_causality( self, current_data: np.ndarray, lagged_data: np.ndarray, cov_xp: np.ndarray, cov_yp: np.ndarray, c_xyp: np.ndarray, c_yxp: np.ndarray, result_arrays: Dict[str, np.ndarray], ntrials: int, nvar: int, ) -> None: """Compute inhomogeneous causality exactly like the previous implementation.""" n_time_steps = current_data.shape[1] - 1 for t in range(n_time_steps): try: logging.debug(f"Processing time step {t}/{n_time_steps - 1}") coeff, residual_cov = estimate_coefficients( current_data[:, t, :].T, lagged_data[:, : self.config["model_order"], t, :] .reshape(nvar * self.config["model_order"], ntrials) .T, ntrials, ) result_arrays["coefficients"][t, :, :] = coeff a_square = coeff[:, :-1].reshape(nvar, nvar, self.config["model_order"]) b = a_square[0, 1, :] # X -> Y c = a_square[1, 0, :] # Y -> X sigy = residual_cov[0, 0] or np.finfo(float).eps sigx = residual_cov[1, 1] or np.finfo(float).eps cov_yp_reg = regularize_if_singular(cov_yp[t]) cov_xp_reg = regularize_if_singular(cov_xp[t]) result_arrays["te_residual_cov"][t, 1] = ( sigy + b.T @ cov_xp_reg @ b - b.T @ c_xyp[t] @ np.linalg.inv(cov_yp_reg) @ c_xyp[t].T @ b ) result_arrays["te_residual_cov"][t, 0] = ( sigx + c.T @ cov_yp_reg @ c - c.T @ c_yxp[t] @ np.linalg.inv(cov_xp_reg) @ c_yxp[t].T @ c ) result_arrays["transfer_entropy"][t, 1] = 0.5 * np.log( result_arrays["te_residual_cov"][t, 1] / sigy ) result_arrays["transfer_entropy"][t, 0] = 0.5 * np.log( result_arrays["te_residual_cov"][t, 0] / sigx ) if not self.config["use_diagonal_covariance"]: result_arrays["causal_strength"][t, 1] = 0.5 * np.log( (sigy + b.T @ cov_xp_reg @ b) / sigy ) result_arrays["causal_strength"][t, 0] = 0.5 * np.log( (sigx + c.T @ cov_yp_reg @ c) / sigx ) else: result_arrays["causal_strength"][t, 1] = 0.5 * np.log( (sigy + b.T @ np.diag(np.diag(cov_xp_reg)) @ b) / sigy ) result_arrays["causal_strength"][t, 0] = 0.5 * np.log( (sigx + c.T @ np.diag(np.diag(cov_yp_reg)) @ c) / sigx ) lagged_x_p = lagged_data[1, :, t, :].T lagged_y_p = lagged_data[0, :, t, :].T current_x_t = current_data[1, t, :] current_y_t = current_data[0, t, :] _, sigx_r = estimate_coefficients(current_x_t, lagged_x_p, ntrials) _, sigy_r = estimate_coefficients(current_y_t, lagged_y_p, ntrials) result_arrays["granger_causality"][t, 1] = np.log(sigy_r / sigy) result_arrays["granger_causality"][t, 0] = np.log(sigx_r / sigx) except Exception as e: logger.error(f"Computation failed at time step {t}: {e}") result_arrays["causal_strength"][t] = np.zeros(2) result_arrays["transfer_entropy"][t] = np.zeros(2) result_arrays["granger_causality"][t] = np.zeros(2) def _compute_homogeneous_causality( self, current_data: np.ndarray, lagged_data: np.ndarray, cov_xp: np.ndarray, cov_yp: np.ndarray, c_xyp: np.ndarray, c_yxp: np.ndarray, result_arrays: Dict[str, np.ndarray], ntrials: int, nvar: int, ) -> None: """Compute homogeneous causality exactly like the previous implementation.""" current_data.shape[1] - 1 coeff, residual_cov = estimate_coefficients(current_data, lagged_data, ntrials) for t in range(result_arrays["coefficients"].shape[0]): result_arrays["coefficients"][t] = coeff.T a_square = coeff[:, :-1].reshape(nvar, nvar, self.config["model_order"]) b = a_square[0, 1, :] # X -> Y c = a_square[1, 0, :] # Y -> X sigy = residual_cov[0, 0] or np.finfo(float).eps sigx = residual_cov[1, 1] or np.finfo(float).eps cov_xp_avg = np.mean(cov_xp, axis=0) cov_yp_avg = np.mean(cov_yp, axis=0) cov_xy_p_avg = np.mean(c_xyp, axis=0) cov_yx_p_avg = np.mean(c_yxp, axis=0) cov_yp_reg = regularize_if_singular(cov_yp_avg) cov_xp_reg = regularize_if_singular(cov_xp_avg) result_arrays["transfer_entropy"][0, 1] = 0.5 * np.log( ( sigy + b.T @ cov_xp_avg @ b - b.T @ cov_xy_p_avg @ np.linalg.inv(cov_yp_avg) @ cov_xy_p_avg.T @ b ) / sigy ) result_arrays["transfer_entropy"][0, 0] = 0.5 * np.log( ( sigx + c.T @ cov_yp_avg @ c - c.T @ cov_yx_p_avg @ np.linalg.inv(cov_xp_avg) @ cov_yx_p_avg.T @ c ) / sigx ) if not self.config["use_diagonal_covariance"]: result_arrays["causal_strength"][0, 1] = 0.5 * np.log( (sigy + b.T @ cov_xp_reg @ b) / sigy ) result_arrays["causal_strength"][0, 0] = 0.5 * np.log( (sigx + c.T @ cov_yp_reg @ c) / sigx ) else: result_arrays["causal_strength"][0, 1] = 0.5 * np.log( (sigy + b.T @ np.diag(np.diag(cov_xp_reg)) @ b) / sigy ) result_arrays["causal_strength"][0, 0] = 0.5 * np.log( (sigx + c.T @ np.diag(np.diag(cov_yp_reg)) @ c) / sigx ) lagged_x_p = lagged_data[1, :, :, :].reshape(self.config["model_order"], -1).T lagged_y_p = lagged_data[0, :, :, :].reshape(self.config["model_order"], -1).T current_x_t = current_data[1, :, :].flatten() current_y_t = current_data[0, :, :].flatten() _, sigx_r = estimate_coefficients(current_x_t, lagged_x_p, ntrials) _, sigy_r = estimate_coefficients(current_y_t, lagged_y_p, ntrials) result_arrays["granger_causality"][0, 1] = np.log(sigy_r / sigy) result_arrays["granger_causality"][0, 0] = np.log(sigx_r / sigx) def _adjust_outputs_for_inhomo(self, result_arrays: Dict[str, np.ndarray]) -> None: """Adjust outputs for inhomogeneous mode by adding NaN blocks exactly like the previous implementation.""" if self.config["time_mode"] == "inhomo": nan_block = np.full((self.config["model_order"], 2), np.nan) result_arrays["granger_causality"] = np.vstack( [nan_block, result_arrays["granger_causality"]] ) result_arrays["transfer_entropy"] = np.vstack( [nan_block, result_arrays["transfer_entropy"]] ) result_arrays["causal_strength"] = np.vstack( [nan_block, result_arrays["causal_strength"]] )