Source code for trancit.models.var_estimation

"""
VAR (Vector Autoregressive) model estimation.

This module provides robust VAR model estimation with proper
error handling and validation.
"""

import logging
from typing import Tuple

import numpy as np

from ..core.exceptions import DataError, ValidationError
from ..utils.preprocess import regularize_if_singular

logger = logging.getLogger(__name__)


[docs] class VAREstimator: """ Vector Autoregressive (VAR) model estimator. This class provides methods for estimating VAR model parameters including coefficients, residuals, and covariance matrices. """
[docs] def __init__(self, model_order: int, time_mode: str = "inhomo"): """ Initialize VAR estimator. Parameters ---------- model_order : int Model order for VAR analysis time_mode : str Time mode: 'inhomo' (time-inhomogeneous) or 'homo' (homogeneous) """ self.model_order = model_order self.time_mode = time_mode self._validate_parameters()
def _validate_parameters(self) -> None: """Validate initialization parameters.""" if self.model_order <= 0: raise ValidationError( "model_order must be positive", "model_order", self.model_order ) if self.time_mode not in ["inhomo", "homo"]: raise ValidationError( "time_mode must be 'inhomo' or 'homo'", "time_mode", self.time_mode )
[docs] def estimate_var_coefficients( self, time_series_data: np.ndarray, model_order: int, max_model_order: int, time_mode: str, lag_mode: str, epsilon: float = 1e-8, ) -> Tuple[np.ndarray, np.ndarray, float, float]: """ Estimate VAR coefficients and compute residual covariance with regularization. Supports both inhomogeneous ('inhomo') and homogeneous ('homo') time modes. Args: time_series_data (np.ndarray): Data array, shape (n_vars, n_observations, n_trials). model_order (int): Model order for the VAR process. max_model_order (int): Maximum model order for lag_mode 'infocrit'. time_mode (str): 'inhomo' or 'homo'. lag_mode (str): 'infocrit' or 'var' to determine lag structure. epsilon (float, optional): Regularization term for singular matrices. Defaults to 1e-8. Returns: Tuple[np.ndarray, np.ndarray, float, float]: - coefficients: VAR coefficients, shape varies by time_mode. - residual_covariance: Residual covariance, shape varies by time_mode. - log_likelihood: Log-likelihood of the model. - sum_log_det_hessian: Sum of log determinants of the Hessian. Raises: ValueError: If n_vars == 1, or time_mode/lag_mode is invalid. """ self._validate_input_data(time_series_data, model_order, time_mode, lag_mode) n_vars, n_observations, n_trials = time_series_data.shape extended_data = self._prepare_extended_data( time_series_data, model_order, max_model_order, lag_mode ) current_data, lagged_data, n_time_steps = self._extract_data_components( extended_data, model_order, max_model_order, lag_mode, n_observations ) coefficients, residual_covariance, residual_determinants, log_det_hessian = ( self._initialize_output_arrays(n_time_steps, n_vars, model_order) ) for t in range(n_time_steps): coeff, cov, det, hessian = self._estimate_single_time_step( current_data, lagged_data, t, n_vars, model_order, n_trials, epsilon ) coefficients[t] = coeff residual_covariance[t] = cov residual_determinants[t] = det log_det_hessian[t] = hessian if time_mode == "inhomo": log_likelihood, sum_log_det_hessian = ( self._compute_inhomogeneous_statistics( residual_determinants, n_time_steps, n_vars, epsilon ) ) else: # time_mode == "homo" coefficients, residual_covariance, log_likelihood, sum_log_det_hessian = ( self._compute_homogeneous_statistics( current_data, lagged_data, n_vars, model_order, n_time_steps, n_trials, epsilon, ) ) return coefficients, residual_covariance, log_likelihood, sum_log_det_hessian
def _validate_input_data( self, time_series_data: np.ndarray, model_order: int, time_mode: str, lag_mode: str, ) -> None: """Validate input data and parameters.""" if not isinstance(time_series_data, np.ndarray): raise ValidationError( "time_series_data must be a NumPy array", "time_series_data_type", type(time_series_data), ) if time_series_data.ndim != 3: raise ValidationError( "time_series_data must be 3-dimensional (n_vars, " "n_observations, n_trials)", "time_series_data_ndim", time_series_data.ndim, ) n_vars, n_observations, n_trials = time_series_data.shape if n_vars <= 1: raise ValidationError( "Input must be multivariate (n_vars > 1)", "n_vars", n_vars ) if np.isnan(time_series_data).any() or np.isinf(time_series_data).any(): raise DataError( "time_series_data contains NaN or Inf values", time_series_data.shape, "float64", ) if not isinstance(model_order, int) or model_order <= 0: raise ValidationError( "model_order must be a positive integer", "model_order", model_order ) if time_mode not in ["inhomo", "homo"]: raise ValidationError( "time_mode must be 'inhomo' or 'homo'", "time_mode", time_mode ) if lag_mode not in ["infocrit", "var"]: raise ValidationError( "lag_mode must be 'infocrit' or 'var'", "lag_mode", lag_mode ) if n_observations <= model_order: raise ValidationError( f"Number of observations ({n_observations}) must be greater " f"than model_order ({model_order})", "n_observations_vs_model_order", (n_observations, model_order), ) def _prepare_extended_data( self, time_series_data: np.ndarray, model_order: int, max_model_order: int, lag_mode: str, ) -> np.ndarray: """Prepare extended data structure for VAR estimation.""" n_vars, n_observations, n_trials = time_series_data.shape lag_depth = max_model_order + 1 if lag_mode == "infocrit" else model_order + 1 extended_data = np.zeros( (n_vars, lag_depth, n_observations + lag_depth - 1, n_trials) ) for k in range(lag_depth): extended_data[:, k, k : k + n_observations, :] = time_series_data return extended_data def _extract_data_components( self, extended_data: np.ndarray, model_order: int, max_model_order: int, lag_mode: str, n_observations: int, ) -> Tuple[np.ndarray, np.ndarray, int]: """Extract current and lagged data components.""" lag_depth = max_model_order + 1 if lag_mode == "infocrit" else model_order + 1 current_data = extended_data[:, 0, lag_depth - 1 : n_observations, :] lagged_data = extended_data[ :, 1 : model_order + 1, lag_depth - 1 : n_observations, : ] n_time_steps = current_data.shape[1] return current_data, lagged_data, n_time_steps def _initialize_output_arrays( self, n_time_steps: int, n_vars: int, model_order: int ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Initialize output arrays for VAR estimation.""" coefficients = np.zeros((n_time_steps, n_vars, n_vars * model_order)) residual_covariance = np.zeros((n_time_steps, n_vars, n_vars)) residual_determinants = np.zeros(n_time_steps) log_det_hessian = np.zeros(n_time_steps) return coefficients, residual_covariance, residual_determinants, log_det_hessian def _estimate_single_time_step( self, current_data: np.ndarray, lagged_data: np.ndarray, t: int, n_vars: int, model_order: int, n_trials: int, epsilon: float, ) -> Tuple[np.ndarray, np.ndarray, float, float]: """Estimate VAR coefficients for a single time step.""" cov_current = np.dot(current_data[:, t, :], current_data[:, t, :].T) / n_trials lagged_matrix = np.vstack( [ lagged_data[:, :model_order, t, :].reshape( n_vars * model_order, n_trials ), np.ones((1, n_trials)), ] ) cov_current_lagged = np.dot(current_data[:, t, :], lagged_matrix.T) / n_trials cov_lagged = np.dot(lagged_matrix, lagged_matrix.T) / n_trials cov_lagged_reg = regularize_if_singular(cov_lagged) coeff = np.linalg.solve(cov_lagged_reg, cov_current_lagged.T).T residual_cov = cov_current - np.dot(coeff, np.dot(cov_lagged, coeff.T)) residual_det = np.prod(np.diag(residual_cov)) lagged_cov_subset = cov_lagged[:-1, :-1] * n_trials log_det_hessian = ( model_order * n_vars**2 * np.log(n_trials) + n_vars * np.log(np.linalg.det(lagged_cov_subset)) - n_vars * model_order * np.log(residual_det or epsilon) ) return coeff[:, :-1], residual_cov, residual_det, log_det_hessian def _compute_inhomogeneous_statistics( self, residual_determinants: np.ndarray, n_time_steps: int, n_vars: int, epsilon: float, ) -> Tuple[float, float]: """Compute statistics for inhomogeneous time mode.""" determinants_clamped = np.where( residual_determinants < epsilon, epsilon, residual_determinants ) log_likelihood = ( -0.5 * n_time_steps * n_vars * np.log(2 * np.pi) - 0.5 * np.sum(np.log(determinants_clamped)) - 0.5 * n_time_steps * n_vars ) return log_likelihood, np.sum(residual_determinants) def _compute_homogeneous_statistics( self, current_data: np.ndarray, lagged_data: np.ndarray, n_vars: int, model_order: int, n_time_steps: int, n_trials: int, epsilon: float, ) -> Tuple[np.ndarray, np.ndarray, float, float]: """Compute statistics for homogeneous time mode.""" cov_current_mean = np.mean(current_data, axis=0) cov_current_lagged_mean = np.mean(lagged_data, axis=0) cov_lagged_mean = np.mean(lagged_data, axis=0) coefficients = np.dot( cov_current_lagged_mean.reshape(n_vars, n_vars * model_order), np.linalg.inv(cov_lagged_mean), ) residual_covariance = cov_current_mean - np.dot( coefficients, np.dot(cov_lagged_mean, coefficients.T) ) determinant = np.prod(np.diag(residual_covariance)) log_likelihood = ( -0.5 * n_time_steps * n_vars * np.log(2 * np.pi) - 0.5 * n_time_steps * np.log(determinant or epsilon) - 0.5 * n_time_steps * n_vars ) sum_log_det_hessian = n_vars * np.log( np.linalg.det(cov_lagged_mean * n_time_steps * n_trials) ) + n_vars * model_order * np.log(1 / (determinant or epsilon)) return coefficients, residual_covariance, log_likelihood, sum_log_det_hessian