"""
Model validation utilities for Dynamic Causal Strength (DCS).
This module provides tools for validating VAR models and assessing
their quality and reliability.
"""
import logging
from typing import Dict
import numpy as np
from scipy import stats
from trancit.core.base import BaseAnalyzer, BaseResult
from trancit.core.exceptions import ComputationError, ValidationError
logger = logging.getLogger(__name__)
class ModelValidationResult(BaseResult):
"""Result container for model validation analysis."""
def __init__(
self,
residual_autocorrelation: np.ndarray,
residual_normality: Dict[str, float],
model_stability: bool,
condition_number: float,
log_likelihood: float,
aic_score: float,
bic_score: float,
):
"""
Initialize model validation result.
Parameters
----------
residual_autocorrelation : np.ndarray
Autocorrelation of residuals
residual_normality : Dict[str, float]
Normality test results (p-values)
model_stability : bool
Whether the model is stable
condition_number : float
Condition number of the design matrix
log_likelihood : float
Log-likelihood of the model
aic_score : float
Akaike Information Criterion score
bic_score : float
Bayesian Information Criterion score
"""
super().__init__(
residual_autocorrelation=residual_autocorrelation,
residual_normality=residual_normality,
model_stability=model_stability,
condition_number=condition_number,
log_likelihood=log_likelihood,
aic_score=aic_score,
bic_score=bic_score,
)
[docs]
class ModelValidator(BaseAnalyzer):
"""
Model validator for VAR models.
This class provides comprehensive validation tools for assessing
the quality and reliability of fitted VAR models.
"""
[docs]
def __init__(
self,
max_lag: int = 10,
significance_level: float = 0.05,
stability_threshold: float = 1.0,
**kwargs,
):
"""
Initialize model validator.
Parameters
----------
max_lag : int
Maximum lag for autocorrelation testing
significance_level : float
Significance level for hypothesis tests
stability_threshold : float
Threshold for model stability (eigenvalues)
**kwargs
Additional configuration parameters
"""
super().__init__(
max_lag=max_lag,
significance_level=significance_level,
stability_threshold=stability_threshold,
**kwargs,
)
[docs]
def validate(
self,
coefficients: np.ndarray,
residuals: np.ndarray,
design_matrix: np.ndarray,
n_observations: int,
n_parameters: int,
**kwargs,
) -> ModelValidationResult:
"""
Perform comprehensive model validation.
Parameters
----------
coefficients : np.ndarray
VAR coefficients
residuals : np.ndarray
Model residuals
design_matrix : np.ndarray
Design matrix used for estimation
n_observations : int
Number of observations
n_parameters : int
Number of parameters in the model
**kwargs
Additional parameters
Returns
-------
ModelValidationResult
Model validation results
"""
self._validate_input_data(coefficients, residuals, design_matrix)
self._log_analysis_start(coefficients.shape)
try:
residual_autocorr = self._check_residual_autocorrelation(residuals)
residual_normality = self._check_residual_normality(residuals)
model_stability = self._check_model_stability(coefficients)
condition_number = self._compute_condition_number(design_matrix)
log_likelihood = self._compute_log_likelihood(residuals)
aic_score = self._compute_aic(log_likelihood, n_parameters, n_observations)
bic_score = self._compute_bic(log_likelihood, n_parameters, n_observations)
self._log_analysis_complete()
return ModelValidationResult(
residual_autocorrelation=residual_autocorr,
residual_normality=residual_normality,
model_stability=model_stability,
condition_number=condition_number,
log_likelihood=log_likelihood,
aic_score=aic_score,
bic_score=bic_score,
)
except Exception as e:
logger.error(f"Model validation failed: {e}")
raise ComputationError(
f"Model validation failed: {e}",
"validation_computation",
coefficients.shape,
)
def _validate_config(self) -> None:
"""Validate configuration parameters."""
if self.config["max_lag"] <= 0:
raise ValidationError(
"max_lag must be positive", "max_lag", self.config["max_lag"]
)
if not 0 < self.config["significance_level"] < 1:
raise ValidationError(
"significance_level must be between 0 and 1",
"significance_level",
self.config["significance_level"],
)
if self.config["stability_threshold"] <= 0:
raise ValidationError(
"stability_threshold must be positive",
"stability_threshold",
self.config["stability_threshold"],
)
def _validate_input_data( # type: ignore[override]
self, coefficients: np.ndarray, residuals: np.ndarray, design_matrix: np.ndarray
) -> None:
"""Validate input data format and dimensions."""
super()._validate_input_data(coefficients)
if not isinstance(residuals, np.ndarray):
raise ValidationError(
"residuals must be a NumPy array", "residuals_type", type(residuals)
)
if not isinstance(design_matrix, np.ndarray):
raise ValidationError(
"design_matrix must be a NumPy array",
"design_matrix_type",
type(design_matrix),
)
if np.any(np.isnan(coefficients)) or np.any(np.isinf(coefficients)):
raise ValidationError("coefficients contains NaN or Inf values")
if np.any(np.isnan(residuals)) or np.any(np.isinf(residuals)):
raise ValidationError("residuals contains NaN or Inf values")
def _check_residual_autocorrelation(self, residuals: np.ndarray) -> np.ndarray:
"""
Check for residual autocorrelation.
Parameters
----------
residuals : np.ndarray
Model residuals
Returns
-------
np.ndarray
Autocorrelation coefficients for different lags
"""
max_lag = min(self.config["max_lag"], residuals.shape[0] // 4)
autocorr = np.zeros(max_lag)
for lag in range(1, max_lag + 1):
var_autocorr = []
for var in range(residuals.shape[1]):
series = residuals[:, var]
if len(series) > lag:
corr = np.corrcoef(series[:-lag], series[lag:])[0, 1]
if not np.isnan(corr):
var_autocorr.append(corr)
if var_autocorr:
autocorr[lag - 1] = np.mean(var_autocorr)
return autocorr
def _check_residual_normality(self, residuals: np.ndarray) -> Dict[str, float]:
"""
Check residual normality using various tests.
Parameters
----------
residuals : np.ndarray
Model residuals
Returns
-------
Dict[str, float]
Dictionary with test names and p-values
"""
normality_tests = {}
for var in range(residuals.shape[1]):
series = residuals[:, var]
series = series[~np.isnan(series)]
if len(series) > 3:
try:
_, p_value = stats.shapiro(series)
normality_tests[f"shapiro_var_{var}"] = p_value
except (ValueError, RuntimeWarning):
normality_tests[f"shapiro_var_{var}"] = np.nan
try:
result = stats.anderson(series)
normality_tests[f"anderson_var_{var}"] = result.significance_level[
2
] # 5% level
except (ValueError, IndexError):
normality_tests[f"anderson_var_{var}"] = np.nan
return normality_tests
def _check_model_stability(self, coefficients: np.ndarray) -> bool:
"""
Check VAR model stability by examining eigenvalues.
Parameters
----------
coefficients : np.ndarray
VAR coefficients
Returns
-------
bool
True if model is stable, False otherwise
"""
try:
n_vars = coefficients.shape[0]
model_order = (coefficients.shape[1] - 1) // n_vars
companion_size = n_vars * model_order
companion_matrix = np.zeros((companion_size, companion_size))
for i in range(model_order - 1):
companion_matrix[
i * n_vars : (i + 1) * n_vars, (i + 1) * n_vars : (i + 2) * n_vars
] = np.eye(n_vars)
for i in range(model_order):
start_idx = i * n_vars
end_idx = (i + 1) * n_vars
companion_matrix[(model_order - 1) * n_vars :, start_idx:end_idx] = (
coefficients[:, i * n_vars : (i + 1) * n_vars]
)
eigenvalues = np.linalg.eigvals(companion_matrix)
max_eigenvalue = np.max(np.abs(eigenvalues))
return max_eigenvalue < self.config["stability_threshold"]
except Exception as e:
logger.warning(f"Model stability check failed: {e}")
return False
def _compute_condition_number(self, design_matrix: np.ndarray) -> float:
"""
Compute condition number of the design matrix.
Parameters
----------
design_matrix : np.ndarray
Design matrix
Returns
-------
float
Condition number
"""
try:
singular_values = np.linalg.svd(design_matrix, compute_uv=False)
condition_number = singular_values[0] / singular_values[-1]
return condition_number
except Exception as e:
logger.warning(f"Condition number computation failed: {e}")
return np.inf
def _compute_log_likelihood(self, residuals: np.ndarray) -> float:
"""
Compute log-likelihood of the model.
Parameters
----------
residuals : np.ndarray
Model residuals
Returns
-------
float
Log-likelihood value
"""
try:
residual_cov = np.cov(residuals.T)
n_obs = residuals.shape[0]
n_vars = residuals.shape[1]
log_det = np.log(np.linalg.det(residual_cov))
inv_cov = np.linalg.inv(residual_cov)
mahal_dist = np.sum(residuals @ inv_cov * residuals, axis=1)
log_likelihood = (
-0.5
* n_obs
* (n_vars * np.log(2 * np.pi) + log_det + np.mean(mahal_dist))
)
return log_likelihood
except Exception as e:
logger.warning(f"Log-likelihood computation failed: {e}")
return np.nan
def _compute_aic(
self, log_likelihood: float, n_parameters: int, n_observations: int
) -> float:
"""
Compute Akaike Information Criterion.
Parameters
----------
log_likelihood : float
Log-likelihood of the model
n_parameters : int
Number of parameters
n_observations : int
Number of observations
Returns
-------
float
AIC score
"""
if np.isnan(log_likelihood):
return np.nan
return 2 * n_parameters - 2 * log_likelihood
def _compute_bic(
self, log_likelihood: float, n_parameters: int, n_observations: int
) -> float:
"""
Compute Bayesian Information Criterion.
Parameters
----------
log_likelihood : float
Log-likelihood of the model
n_parameters : int
Number of parameters
n_observations : int
Number of observations
Returns
-------
float
BIC score
"""
if np.isnan(log_likelihood):
return np.nan
return np.log(n_observations) * n_parameters - 2 * log_likelihood