"""
Transfer Entropy (TE) implementation.
This module provides the implementation of Transfer Entropy
calculation using the new modular architecture.
"""
import logging
from typing import Dict, Tuple
import numpy as np
from trancit.core.base import BaseAnalyzer, BaseResult
from trancit.core.exceptions import ComputationError, ValidationError
from trancit.utils.helpers import compute_covariances, estimate_coefficients
from trancit.utils.preprocess import regularize_if_singular
logger = logging.getLogger(__name__)
[docs]
class TransferEntropyResult(BaseResult):
"""Result container for Transfer Entropy analysis."""
[docs]
def __init__(
self,
transfer_entropy: np.ndarray,
te_residual_cov: np.ndarray,
coefficients: np.ndarray,
):
"""
Initialize Transfer Entropy result.
Parameters
----------
transfer_entropy : np.ndarray
Transfer Entropy values
te_residual_cov : np.ndarray
Transfer entropy residual covariance
coefficients : np.ndarray
VAR coefficients
"""
super().__init__(
transfer_entropy=transfer_entropy,
te_residual_cov=te_residual_cov,
coefficients=coefficients,
)
[docs]
class TransferEntropyCalculator(BaseAnalyzer):
"""
Transfer Entropy (TE) calculator.
This class implements the Transfer Entropy algorithm for quantifying
information flow between time series variables.
"""
[docs]
def __init__(
self,
model_order: int,
time_mode: str = "inhomo",
use_diagonal_covariance: bool = False,
**kwargs,
):
"""
Initialize Transfer Entropy calculator.
Parameters
----------
model_order : int
Model order for VAR analysis
time_mode : str
Time mode: 'inhomo' (time-inhomogeneous) or 'homo' (homogeneous)
use_diagonal_covariance : bool
Whether to use diagonal covariance approximation
**kwargs
Additional configuration parameters
"""
super().__init__(
model_order=model_order,
time_mode=time_mode,
use_diagonal_covariance=use_diagonal_covariance,
**kwargs,
)
# VAREstimator is not used in this implementation - using direct computation
[docs]
def analyze(self, data: np.ndarray, **kwargs) -> TransferEntropyResult:
"""
Perform Transfer Entropy analysis on the given data.
Parameters
----------
data : np.ndarray
Time series data with shape (n_vars, n_observations, n_trials)
**kwargs
Additional parameters
Returns
-------
TransferEntropyResult
Transfer Entropy analysis results
"""
self._validate_input_data(data)
self._log_analysis_start(data.shape)
try:
current_data, lagged_data, n_time_steps = self._prepare_data_structures(
data
)
cov_xp, cov_yp, c_xyp, c_yxp = compute_covariances(
lagged_data, n_time_steps, self.config["model_order"]
)
result_arrays = self._initialize_result_arrays(
n_time_steps, data.shape[1], data.shape[0]
)
if self.config["time_mode"] == "inhomo":
self._compute_inhomogeneous_te(
current_data,
lagged_data,
cov_xp,
cov_yp,
c_xyp,
c_yxp,
result_arrays,
data.shape[2],
data.shape[0],
)
self._adjust_outputs_for_inhomo(result_arrays)
else:
self._compute_homogeneous_te(
current_data,
lagged_data,
cov_xp,
cov_yp,
c_xyp,
c_yxp,
result_arrays,
data.shape[2],
data.shape[0],
)
self._log_analysis_complete()
return TransferEntropyResult(
transfer_entropy=result_arrays["transfer_entropy"],
te_residual_cov=result_arrays["te_residual_cov"],
coefficients=result_arrays["coefficients"],
)
except Exception as e:
logger.error(f"Transfer Entropy analysis failed: {e}")
raise ComputationError(
f"Transfer Entropy analysis failed: {e}", "te_computation", data.shape
)
def _validate_config(self) -> None:
"""Validate configuration parameters."""
if self.config["model_order"] <= 0:
raise ValidationError(
"model_order must be positive",
"model_order",
self.config["model_order"],
)
if self.config["time_mode"] not in ["inhomo", "homo"]:
raise ValidationError(
"time_mode must be 'inhomo' or 'homo'",
"time_mode",
self.config["time_mode"],
)
def _validate_input_data(self, data: np.ndarray) -> None:
"""Validate input data format and dimensions."""
super()._validate_input_data(data)
if data.ndim != 3:
raise ValidationError(
"Input data must be 3D (n_vars, n_observations, n_trials)",
"data_ndim",
data.ndim,
)
if data.shape[0] != 2:
raise ValidationError(
"Input data must be bivariate (n_vars=2)", "n_vars", data.shape[0]
)
def _prepare_data_structures(
self, time_series_data: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, int]:
"""
Prepare data structures for analysis.
Parameters
----------
time_series_data : np.ndarray
Input time series data
Returns
-------
Tuple[np.ndarray, np.ndarray, int]
current_data, lagged_data, n_time_steps
"""
nvar, nobs, ntrials = time_series_data.shape
r1 = self.config["model_order"] + 1
extended_data = np.zeros((nvar, r1, nobs + r1 - 1, ntrials))
for k in range(r1):
extended_data[:, k, k : k + nobs, :] = time_series_data
current_data = extended_data[:, 0, r1 - 1 : nobs, :]
lagged_data = extended_data[
:, 1 : self.config["model_order"] + 1, r1 - 1 : nobs - 1, :
]
n_time_steps = current_data.shape[1] - 1
return current_data, lagged_data, n_time_steps
def _initialize_result_arrays(
self, n_time_steps: int, nobs: int, nvar: int
) -> Dict[str, np.ndarray]:
"""
Initialize arrays for storing results.
Parameters
----------
n_time_steps : int
Number of time steps
nobs : int
Number of observations
nvar : int
Number of variables
Returns
-------
Dict[str, np.ndarray]
Dictionary of initialized arrays
"""
return {
"te_residual_cov": np.zeros((n_time_steps, 2)),
"transfer_entropy": np.zeros((n_time_steps, 2)),
"coefficients": np.full(
(nobs, nvar, nvar * self.config["model_order"] + 1), np.nan
),
}
def _compute_inhomogeneous_te(
self,
current_data: np.ndarray,
lagged_data: np.ndarray,
cov_xp: np.ndarray,
cov_yp: np.ndarray,
c_xyp: np.ndarray,
c_yxp: np.ndarray,
result_arrays: Dict[str, np.ndarray],
ntrials: int,
nvar: int,
) -> None:
"""Compute inhomogeneous Transfer Entropy."""
n_time_steps = current_data.shape[1] - 1
for t in range(n_time_steps):
try:
coeff, residual_cov = estimate_coefficients(
current_data[:, t, :].T,
lagged_data[:, : self.config["model_order"], t, :]
.reshape(nvar * self.config["model_order"], ntrials)
.T,
ntrials,
)
result_arrays["coefficients"][t, :, :] = coeff
a_square = coeff[:, :-1].reshape(nvar, nvar, self.config["model_order"])
b = a_square[0, 1, :] # X -> Y
c = a_square[1, 0, :] # Y -> X
sigy = residual_cov[0, 0] + np.finfo(float).eps
sigx = residual_cov[1, 1] + np.finfo(float).eps
cov_yp_reg = regularize_if_singular(cov_yp[t])
cov_xp_reg = regularize_if_singular(cov_xp[t])
result_arrays["te_residual_cov"][t, 1] = (
sigy
+ b.T @ cov_xp_reg @ b
- b.T @ c_xyp[t] @ np.linalg.inv(cov_yp_reg) @ c_xyp[t].T @ b
)
result_arrays["te_residual_cov"][t, 0] = (
sigx
+ c.T @ cov_yp_reg @ c
- c.T @ c_yxp[t] @ np.linalg.inv(cov_xp_reg) @ c_yxp[t].T @ c
)
result_arrays["transfer_entropy"][t, 1] = 0.5 * np.log(
result_arrays["te_residual_cov"][t, 1] / sigy
)
result_arrays["transfer_entropy"][t, 0] = 0.5 * np.log(
result_arrays["te_residual_cov"][t, 0] / sigx
)
except Exception as e:
logger.error(
f"Transfer Entropy computation failed at time step {t}: {e}"
)
result_arrays["transfer_entropy"][t] = np.zeros(2)
result_arrays["te_residual_cov"][t] = np.zeros(2)
def _compute_homogeneous_te(
self,
current_data: np.ndarray,
lagged_data: np.ndarray,
cov_xp: np.ndarray,
cov_yp: np.ndarray,
c_xyp: np.ndarray,
c_yxp: np.ndarray,
result_arrays: Dict[str, np.ndarray],
ntrials: int,
nvar: int,
) -> None:
"""Compute homogeneous Transfer Entropy."""
coeff, residual_cov = estimate_coefficients(current_data, lagged_data, ntrials)
for t in range(result_arrays["coefficients"].shape[0]):
result_arrays["coefficients"][t] = coeff.T
a_square = coeff[:, :-1].reshape(nvar, nvar, self.config["model_order"])
b = a_square[0, 1, :] # X -> Y
c = a_square[1, 0, :] # Y -> X
sigy = residual_cov[0, 0] + np.finfo(float).eps
sigx = residual_cov[1, 1] + np.finfo(float).eps
cov_xp_avg = np.mean(cov_xp, axis=0)
cov_yp_avg = np.mean(cov_yp, axis=0)
cov_xy_p_avg = np.mean(c_xyp, axis=0)
cov_yx_p_avg = np.mean(c_yxp, axis=0)
result_arrays["transfer_entropy"][0, 1] = 0.5 * np.log(
(
sigy
+ b.T @ cov_xp_avg @ b
- b.T @ cov_xy_p_avg @ np.linalg.inv(cov_yp_avg) @ cov_xy_p_avg.T @ b
)
/ sigy
)
result_arrays["transfer_entropy"][0, 0] = 0.5 * np.log(
(
sigx
+ c.T @ cov_yp_avg @ c
- c.T @ cov_yx_p_avg @ np.linalg.inv(cov_xp_avg) @ cov_yx_p_avg.T @ c
)
/ sigx
)
def _adjust_outputs_for_inhomo(self, result_arrays: Dict[str, np.ndarray]) -> None:
"""Adjust outputs for inhomogeneous mode by adding NaN blocks."""
if self.config["time_mode"] == "inhomo":
nan_block = np.full((self.config["model_order"], 2), np.nan)
result_arrays["transfer_entropy"] = np.vstack(
[nan_block, result_arrays["transfer_entropy"]]
)
result_arrays["te_residual_cov"] = np.vstack(
[nan_block, result_arrays["te_residual_cov"]]
)