Tutorials
This section provides comprehensive tutorials for using TranCIT: Transient Causal Interaction Toolbox in different scenarios. Each tutorial builds on the previous ones and includes complete working examples.
Tutorial 1: Basic DCS Analysis
Learn the fundamentals of causality analysis with Dynamic Causal Strength (DCS).
Understanding the Data Format
DCS works with multivariate time series data in a specific format:
import numpy as np
from trancit import generate_signals
# Generate sample data
data, _, _ = generate_signals(
T=500, # Time points
Ntrial=10, # Number of trials/repetitions
h=0.1, # Time step
gamma1=0.5, # Damping parameter 1
gamma2=0.5, # Damping parameter 2
Omega1=1.0, # Natural frequency 1
Omega2=1.2 # Natural frequency 2
)
print(f"Data shape: {data.shape}")
# Output: (2, 500, 10) = (variables, time_points, trials)
# Visualize the data structure
print(f"Variable 1 shape: {data[0].shape}") # (500, 10)
print(f"Variable 2 shape: {data[1].shape}") # (500, 10)
# Each trial is a realization of the same underlying system
print(f"Trial 0 of variable 1: {data[0, :5, 0]}") # First 5 time points
print(f"Trial 1 of variable 1: {data[0, :5, 1]}") # Same time points, different trial
Key Points:
Shape:
(n_variables, n_timepoints, n_trials)Variables: DCS analyzes bivariate relationships (n_variables = 2)
Trials: Multiple realizations improve statistical robustness
Time: Each time point represents one measurement
Step-by-Step DCS Analysis
from trancit import DCSCalculator
import matplotlib.pyplot as plt
# Step 1: Create calculator with appropriate model order
calculator = DCSCalculator(
model_order=4, # Number of time lags to consider
time_mode="inhomo", # Time-inhomogeneous analysis (recommended)
use_diagonal_covariance=False # Use full covariance (more accurate)
)
# Step 2: Perform the analysis
result = calculator.analyze(data)
# Step 3: Examine the results
print(f"Analysis completed!")
print(f"Causal strength shape: {result.causal_strength.shape}")
print(f"Transfer entropy shape: {result.transfer_entropy.shape}")
print(f"Granger causality shape: {result.granger_causality.shape}")
# Step 4: Interpret the results
# Column 0: Y → X (variable 2 influences variable 1)
# Column 1: X → Y (variable 1 influences variable 2)
mean_dcs_x_to_y = result.causal_strength[:, 1].mean()
mean_dcs_y_to_x = result.causal_strength[:, 0].mean()
print(f"Mean causal strength X→Y: {mean_dcs_x_to_y:.4f}")
print(f"Mean causal strength Y→X: {mean_dcs_y_to_x:.4f}")
if mean_dcs_x_to_y > mean_dcs_y_to_x:
print("Stronger causality: X → Y")
else:
print("Stronger causality: Y → X")
Visualizing Results
# Create comprehensive visualization
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# Plot 1: Original signals (first trial)
axes[0, 0].plot(data[0, :100, 0], label='Variable X', alpha=0.8)
axes[0, 0].plot(data[1, :100, 0], label='Variable Y', alpha=0.8)
axes[0, 0].set_xlabel('Time')
axes[0, 0].set_ylabel('Amplitude')
axes[0, 0].set_title('Original Time Series (First 100 points)')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# Plot 2: Dynamic Causal Strength over time
axes[0, 1].plot(result.causal_strength[:, 1], label='X → Y', linewidth=2)
axes[0, 1].plot(result.causal_strength[:, 0], label='Y → X', linewidth=2)
axes[0, 1].set_xlabel('Time')
axes[0, 1].set_ylabel('Causal Strength')
axes[0, 1].set_title('Dynamic Causal Strength')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# Plot 3: Transfer Entropy
axes[1, 0].plot(result.transfer_entropy[:, 1], label='X → Y', linewidth=2)
axes[1, 0].plot(result.transfer_entropy[:, 0], label='Y → X', linewidth=2)
axes[1, 0].set_xlabel('Time')
axes[1, 0].set_ylabel('Transfer Entropy')
axes[1, 0].set_title('Transfer Entropy')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# Plot 4: Comparison of measures
time_points = range(len(result.causal_strength))
width = 0.35
mean_measures = {
'DCS': [result.causal_strength[:, 0].mean(), result.causal_strength[:, 1].mean()],
'TE': [result.transfer_entropy[:, 0].mean(), result.transfer_entropy[:, 1].mean()],
'GC': [result.granger_causality[:, 0].mean(), result.granger_causality[:, 1].mean()]
}
x = np.arange(2) # Y→X, X→Y
axes[1, 1].bar(x - width, [mean_measures['DCS'][0], mean_measures['DCS'][1]],
width, label='DCS', alpha=0.8)
axes[1, 1].bar(x, [mean_measures['TE'][0], mean_measures['TE'][1]],
width, label='TE', alpha=0.8)
axes[1, 1].bar(x + width, [mean_measures['GC'][0], mean_measures['GC'][1]],
width, label='GC', alpha=0.8)
axes[1, 1].set_xlabel('Direction')
axes[1, 1].set_ylabel('Mean Value')
axes[1, 1].set_title('Comparison of Causality Measures')
axes[1, 1].set_xticks(x)
axes[1, 1].set_xticklabels(['Y → X', 'X → Y'])
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
Tutorial 2: Event-Based Analysis with Pipeline
Learn how to detect and analyze specific events in time series data.
Understanding Event Detection
Event-based analysis focuses on detecting specific time windows of interest and analyzing causality within those windows.
import numpy as np
from trancit import generate_signals, PipelineOrchestrator
from trancit.config import (
PipelineConfig, PipelineOptions, DetectionParams,
CausalParams, BicParams, OutputParams
)
# Generate data with clearer event structure
np.random.seed(42)
data, _, _ = generate_signals(
T=800, Ntrial=15, h=0.1,
gamma1=0.3, gamma2=0.7, # Asymmetric damping
Omega1=0.8, Omega2=1.4 # Different frequencies
)
# Prepare signals for event detection
original_signal = np.mean(data, axis=2) # Average over trials
# Create detection signal with enhanced events
detection_signal = original_signal.copy()
# Add some artificial "events" for demonstration
event_times = [200, 400, 600]
for t in event_times:
# Enhance signal at event times
detection_signal[:, t-10:t+10] *= 2.0
detection_signal[:, t-5:t+5] += np.random.randn(2, 10) * 0.5
print(f"Original signal shape: {original_signal.shape}")
print(f"Detection signal shape: {detection_signal.shape}")
Configuring the Pipeline
The pipeline configuration controls every aspect of the analysis:
# Create comprehensive pipeline configuration
config = PipelineConfig(
# Main analysis options
options=PipelineOptions(
detection=True, # Enable event detection
bic=True, # Enable BIC model selection
causal_analysis=True, # Enable causality analysis
bootstrap=False, # Skip bootstrap for speed
save_flag=False, # Don't save intermediate files
debiased_stats=False # Skip debiased analysis for speed
),
# Event detection parameters
detection=DetectionParams(
thres_ratio=1.8, # Threshold for event detection (lower = more events)
align_type="peak", # Align events to their peaks
l_extract=100, # Length of extracted event windows
l_start=50, # Starting point within extracted windows
shrink_flag=False, # Don't apply shrinkage
remove_artif=True, # Remove artifact-contaminated trials
locs=None # Automatically detect event locations
),
# BIC model selection parameters
bic=BicParams(
morder=4, # Default model order
momax=8, # Maximum model order to test
mode="OLS", # Ordinary Least Squares
tau=1 # Smoothing parameter
),
# Causality analysis parameters
causal=CausalParams(
ref_time=50, # Reference time for rDCS (should match l_start)
estim_mode="OLS", # Estimation method
diag_flag=False, # Use full covariance matrix
old_version=False # Use new rDCS calculation method
),
# Output parameters
output=OutputParams(
file_keyword="tutorial_events"
)
)
print("Configuration created successfully")
Running the Event-Based Analysis
# Create and run the pipeline orchestrator
orchestrator = PipelineOrchestrator(config)
try:
print("Starting pipeline analysis...")
# Run the complete pipeline
result = orchestrator.run(original_signal, detection_signal)
print("Pipeline completed successfully!")
# Examine the pipeline results
print(f"Event snapshots shape: {result.event_snapshots.shape}")
if result.results.get('locs') is not None:
detected_events = result.results['locs']
print(f"Number of events detected: {len(detected_events)}")
print(f"Event locations: {detected_events}")
# Access causality results if available
if result.results.get("CausalOutput"):
causal_output = result.results["CausalOutput"]["OLS"]
if "DCS" in causal_output:
dcs_results = causal_output["DCS"]
print(f"DCS results shape: {dcs_results.shape}")
print(f"Mean DCS X→Y: {dcs_results[:, 1].mean():.4f}")
print(f"Mean DCS Y→X: {dcs_results[:, 0].mean():.4f}")
# Analyze individual events
print("\nPer-event analysis:")
for i, (dcs_xy, dcs_yx) in enumerate(dcs_results):
print(f"Event {i+1}: X→Y={dcs_xy:.4f}, Y→X={dcs_yx:.4f}")
if "rDCS" in causal_output:
rdcs_results = causal_output["rDCS"]
print(f"Relative DCS shape: {rdcs_results.shape}")
print(f"Mean rDCS X→Y: {rdcs_results[:, 1].mean():.4f}")
print(f"Mean rDCS Y→X: {rdcs_results[:, 0].mean():.4f}")
else:
print("No causal output generated")
print("This might happen if no events were detected or analysis failed")
except Exception as e:
print(f"Pipeline analysis failed: {e}")
print("Common issues:")
print("1. No events detected - try lowering thres_ratio")
print("2. Insufficient data - try shorter l_extract or more data")
print("3. Numerical issues - try different model parameters")
Visualizing Event-Based Results
# Create visualization of event-based analysis
if 'result' in locals() and result.results.get("CausalOutput"):
fig, axes = plt.subplots(3, 1, figsize=(15, 12))
# Plot 1: Original signals with detected events
time_axis = np.arange(original_signal.shape[1])
axes[0].plot(time_axis, original_signal[0], label='Variable X', alpha=0.7)
axes[0].plot(time_axis, original_signal[1], label='Variable Y', alpha=0.7)
# Mark detected events
if result.results.get('locs') is not None:
for loc in result.results['locs']:
axes[0].axvline(x=loc, color='red', linestyle='--', alpha=0.7)
axes[0].text(loc, axes[0].get_ylim()[1]*0.9, 'Event',
rotation=90, fontsize=8)
axes[0].set_xlabel('Time')
axes[0].set_ylabel('Amplitude')
axes[0].set_title('Original Signals with Detected Events')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# Plot 2: Event-wise causality measures
if "DCS" in result.results["CausalOutput"]["OLS"]:
dcs_data = result.results["CausalOutput"]["OLS"]["DCS"]
event_nums = range(1, len(dcs_data) + 1)
width = 0.35
x = np.arange(len(event_nums))
axes[1].bar(x - width/2, dcs_data[:, 0], width,
label='Y → X', alpha=0.8, color='blue')
axes[1].bar(x + width/2, dcs_data[:, 1], width,
label='X → Y', alpha=0.8, color='red')
axes[1].set_xlabel('Event Number')
axes[1].set_ylabel('Dynamic Causal Strength')
axes[1].set_title('Per-Event Causality Analysis')
axes[1].set_xticks(x)
axes[1].set_xticklabels(event_nums)
axes[1].legend()
axes[1].grid(True, alpha=0.3)
# Plot 3: Comparison of measures across events
if ("DCS" in result.results["CausalOutput"]["OLS"] and
"TE" in result.results["CausalOutput"]["OLS"]):
dcs_data = result.results["CausalOutput"]["OLS"]["DCS"]
te_data = result.results["CausalOutput"]["OLS"]["TE"]
# Plot X→Y direction
axes[2].plot(dcs_data[:, 1], 'o-', label='DCS (X→Y)', linewidth=2)
axes[2].plot(te_data[:, 1], 's-', label='TE (X→Y)', linewidth=2)
axes[2].set_xlabel('Event Number')
axes[2].set_ylabel('Causality Measure')
axes[2].set_title('Causality Measures Across Events (X→Y Direction)')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
Tutorial 3: Advanced Configuration and Optimization
Learn how to optimize DCS analysis for different types of data and research questions.
Model Order Selection
Choosing the right model order is crucial for accurate causality analysis:
from trancit import DCSCalculator
from trancit.models import BICSelector
import matplotlib.pyplot as plt
# Generate test data
np.random.seed(123)
data, _, _ = generate_signals(T=600, Ntrial=20, h=0.1,
gamma1=0.4, gamma2=0.6,
Omega1=1.1, Omega2=0.9)
# Method 1: Manual comparison of different model orders
model_orders = [1, 2, 3, 4, 5, 6, 7, 8]
causality_results = {}
print("Testing different model orders...")
for order in model_orders:
try:
calculator = DCSCalculator(model_order=order, time_mode="inhomo")
result = calculator.analyze(data)
causality_results[order] = {
'dcs_mean': result.causal_strength.mean(axis=0),
'te_mean': result.transfer_entropy.mean(axis=0),
'gc_mean': result.granger_causality.mean(axis=0)
}
print(f"Order {order}: DCS X→Y = {result.causal_strength[:, 1].mean():.4f}")
except Exception as e:
print(f"Order {order} failed: {e}")
causality_results[order] = None
# Method 2: Automatic BIC-based selection
print("\nUsing BIC for automatic model selection...")
try:
bic_selector = BICSelector(max_order=10, mode="OLS")
# Prepare data for BIC analysis (requires specific format)
bic_data = data.copy() # Shape: (2, T, N)
# BIC analysis parameters
bic_params = {
"Params": {
"BIC": {
"momax": 8,
"mode": "OLS"
}
},
"EstimMode": "OLS"
}
# Run BIC analysis
bic_results = bic_selector.compute_multi_trial_BIC(bic_data, bic_params)
if bic_results and 'morder' in bic_results:
optimal_order = bic_results['morder']
print(f"BIC-selected optimal model order: {optimal_order}")
# Use optimal order for final analysis
calculator = DCSCalculator(model_order=optimal_order, time_mode="inhomo")
final_result = calculator.analyze(data)
print(f"Final analysis with order {optimal_order}:")
print(f"DCS X→Y: {final_result.causal_strength[:, 1].mean():.4f}")
print(f"DCS Y→X: {final_result.causal_strength[:, 0].mean():.4f}")
except Exception as e:
print(f"BIC selection failed: {e}")
print("Using default model order 4")
optimal_order = 4
Handling Different Data Types
from trancit.utils.preprocess import normalize_data, check_data_quality
# Simulate different types of real-world data issues
np.random.seed(456)
# 1. Noisy data
print("1. Handling noisy data:")
noisy_data, _, _ = generate_signals(T=400, Ntrial=15, h=0.1,
gamma1=0.5, gamma2=0.5,
Omega1=1.0, Omega2=1.2)
# Add significant noise
noisy_data += np.random.randn(*noisy_data.shape) * 0.5
# Check data quality
quality_issues = check_data_quality(noisy_data)
print(f"Data quality issues: {quality_issues}")
# Normalize to improve analysis
clean_data = normalize_data(noisy_data, method="zscore", axis=1)
# Compare results
calc = DCSCalculator(model_order=4)
try:
result_noisy = calc.analyze(noisy_data)
result_clean = calc.analyze(clean_data)
print(f"Noisy data DCS X→Y: {result_noisy.causal_strength[:, 1].mean():.4f}")
print(f"Clean data DCS X→Y: {result_clean.causal_strength[:, 1].mean():.4f}")
except Exception as e:
print(f"Analysis failed: {e}")
# 2. Short time series
print("\n2. Handling short time series:")
short_data, _, _ = generate_signals(T=100, Ntrial=25, h=0.1, # More trials, less time
gamma1=0.5, gamma2=0.5,
Omega1=1.0, Omega2=1.2)
# Use smaller model order for short series
calc_short = DCSCalculator(model_order=2, time_mode="homo") # Homogeneous for short data
try:
result_short = calc_short.analyze(short_data)
print(f"Short series DCS X→Y: {result_short.causal_strength[:, 1].mean():.4f}")
except Exception as e:
print(f"Short series analysis failed: {e}")
# 3. Highly correlated data
print("\n3. Handling highly correlated data:")
T, N = 300, 20
# Generate highly correlated signals
base_signal = np.random.randn(T, N)
corr_data = np.zeros((2, T, N))
corr_data[0] = base_signal
corr_data[1] = 0.95 * base_signal + 0.05 * np.random.randn(T, N) # Very high correlation
# Add regularization for numerical stability
try:
calc_reg = DCSCalculator(model_order=3, use_diagonal_covariance=True)
result_corr = calc_reg.analyze(corr_data)
print(f"Highly correlated data DCS X→Y: {result_corr.causal_strength[:, 1].mean():.4f}")
except Exception as e:
print(f"Correlated data analysis failed: {e}")
Performance Optimization
import time
from trancit import PipelineOrchestrator
from trancit.config import PipelineConfig, PipelineOptions, DetectionParams, CausalParams
# Generate larger dataset for performance testing
large_data, _, _ = generate_signals(T=2000, Ntrial=30, h=0.1,
gamma1=0.5, gamma2=0.5,
Omega1=1.0, Omega2=1.2)
original_signal = np.mean(large_data, axis=2)
detection_signal = original_signal * 1.2
print("Performance optimization comparison:")
# Configuration 1: Full analysis (slower but comprehensive)
full_config = PipelineConfig(
options=PipelineOptions(
detection=True,
bic=True, # BIC is computationally expensive
causal_analysis=True,
bootstrap=True, # Bootstrap is time-consuming
debiased_stats=True # Additional computational overhead
),
detection=DetectionParams(thres_ratio=2.0, l_extract=200, l_start=100),
causal=CausalParams(ref_time=100, estim_mode="OLS"),
# ... other params
)
# Configuration 2: Fast analysis (faster but less comprehensive)
fast_config = PipelineConfig(
options=PipelineOptions(
detection=True,
bic=False, # Skip BIC for speed
causal_analysis=True,
bootstrap=False, # Skip bootstrap for speed
debiased_stats=False # Skip debiased analysis for speed
),
detection=DetectionParams(thres_ratio=2.0, l_extract=100, l_start=50), # Shorter windows
causal=CausalParams(ref_time=50, estim_mode="OLS"),
# ... other params
)
# Time both approaches
configurations = [
("Fast Configuration", fast_config),
# ("Full Configuration", full_config) # Uncomment for comparison
]
for name, config in configurations:
try:
start_time = time.time()
orchestrator = PipelineOrchestrator(config)
result = orchestrator.run(original_signal, detection_signal)
end_time = time.time()
print(f"{name}: {end_time - start_time:.2f} seconds")
if result.results.get("CausalOutput"):
causal_data = result.results["CausalOutput"]["OLS"]
if "DCS" in causal_data:
n_events = len(causal_data["DCS"])
print(f" - Events detected: {n_events}")
print(f" - Mean DCS X→Y: {causal_data['DCS'][:, 1].mean():.4f}")
except Exception as e:
print(f"{name} failed: {e}")
Statistical Validation
from scipy import stats
import numpy as np
# Generate data with known causal structure for validation
np.random.seed(789)
# Create signals where X clearly influences Y
T, N = 500, 25
signal_x = np.random.randn(T, N)
signal_y = np.zeros((T, N))
# Y depends on past values of X (true causality X→Y)
for t in range(2, T):
signal_y[t] = (0.7 * signal_y[t-1] -
0.2 * signal_y[t-2] +
0.4 * signal_x[t-1] + # Clear X→Y influence
0.1 * signal_x[t-2] +
np.random.randn(N) * 0.3)
validation_data = np.array([signal_x.T, signal_y.T])
# Multiple analyses for statistical validation
n_bootstrap = 20
dcs_xy_values = []
dcs_yx_values = []
print("Statistical validation with bootstrap sampling:")
for i in range(n_bootstrap):
# Random resampling of trials
trial_indices = np.random.choice(N, size=N, replace=True)
bootstrap_data = validation_data[:, :, trial_indices]
calc = DCSCalculator(model_order=3, time_mode="inhomo")
result = calc.analyze(bootstrap_data)
dcs_xy_values.append(result.causal_strength[:, 1].mean())
dcs_yx_values.append(result.causal_strength[:, 0].mean())
# Statistical analysis
dcs_xy_mean = np.mean(dcs_xy_values)
dcs_yx_mean = np.mean(dcs_yx_values)
dcs_xy_std = np.std(dcs_xy_values)
dcs_yx_std = np.std(dcs_yx_values)
print(f"DCS X→Y: {dcs_xy_mean:.4f} ± {dcs_xy_std:.4f}")
print(f"DCS Y→X: {dcs_yx_mean:.4f} ± {dcs_yx_std:.4f}")
# Statistical test for significant difference
t_stat, p_value = stats.ttest_rel(dcs_xy_values, dcs_yx_values)
print(f"Statistical test (paired t-test):")
print(f"t-statistic: {t_stat:.4f}, p-value: {p_value:.4f}")
if p_value < 0.05:
if dcs_xy_mean > dcs_yx_mean:
print("Significant causality detected: X → Y")
else:
print("Significant causality detected: Y → X")
else:
print("No significant causal asymmetry detected")
Tutorial 4: Real-World Applications
Apply DCS to realistic neuroscience and time series analysis scenarios.
Neural Data Analysis
# Simulate Local Field Potential (LFP) data
def simulate_lfp_data(duration=10.0, sampling_rate=1000, n_trials=30):
"""Simulate realistic LFP data with event-related responses."""
n_samples = int(duration * sampling_rate)
n_channels = 2
# Base oscillatory activity
t = np.linspace(0, duration, n_samples)
data = np.zeros((n_channels, n_samples, n_trials))
for trial in range(n_trials):
# Base activity with multiple frequency components
alpha_freq = 10 + np.random.randn() * 1 # 8-12 Hz alpha
beta_freq = 20 + np.random.randn() * 3 # 15-25 Hz beta
gamma_freq = 40 + np.random.randn() * 5 # 35-45 Hz gamma
# Channel 1: Mix of frequencies
data[0, :, trial] = (
0.5 * np.sin(2 * np.pi * alpha_freq * t) +
0.3 * np.sin(2 * np.pi * beta_freq * t) +
0.2 * np.sin(2 * np.pi * gamma_freq * t) +
np.random.randn(n_samples) * 0.1
)
# Channel 2: Influenced by Channel 1 with delay
delay_samples = 5 # 5ms delay at 1kHz sampling
data[1, delay_samples:, trial] = (
0.6 * data[1, :-delay_samples, trial] + # AR component
0.4 * data[0, :-delay_samples, trial] + # Channel 1 influence
0.2 * np.sin(2 * np.pi * beta_freq * t[:-delay_samples]) +
np.random.randn(n_samples - delay_samples) * 0.15
)
# Add event-related responses at random times
n_events = np.random.poisson(3) # Average 3 events per trial
event_times = np.random.randint(1000, n_samples-1000, n_events)
for event_time in event_times:
# Event response in both channels
event_duration = 200 # 200ms event
event_window = slice(event_time, event_time + event_duration)
# Enhanced coupling during events
data[0, event_window, trial] *= 1.5
data[1, event_window, trial] += 0.3 * data[0, event_window, trial]
return data
# Generate and analyze neural data
print("Analyzing simulated neural data...")
neural_data = simulate_lfp_data(duration=20.0, sampling_rate=500, n_trials=25)
print(f"Neural data shape: {neural_data.shape}")
print(f"Sampling rate: 500 Hz, Duration: 20s, Channels: 2, Trials: 25")
# Preprocess neural data
from trancit.utils.preprocess import normalize_data
neural_data_norm = normalize_data(neural_data, method="zscore", axis=1)
# DCS analysis with parameters suitable for neural data
neural_calculator = DCSCalculator(
model_order=6, # Higher order for complex neural dynamics
time_mode="inhomo", # Non-stationary neural activity
use_diagonal_covariance=False
)
try:
neural_result = neural_calculator.analyze(neural_data_norm)
print("Neural DCS Analysis Results:")
print(f"Mean causality Ch1→Ch2: {neural_result.causal_strength[:, 1].mean():.4f}")
print(f"Mean causality Ch2→Ch1: {neural_result.causal_strength[:, 0].mean():.4f}")
print(f"Mean transfer entropy Ch1→Ch2: {neural_result.transfer_entropy[:, 1].mean():.4f}")
# Identify periods of high causality
high_causality_threshold = np.percentile(neural_result.causal_strength[:, 1], 90)
high_causality_times = np.where(neural_result.causal_strength[:, 1] > high_causality_threshold)[0]
print(f"High causality periods (top 10%): {len(high_causality_times)} time points")
print(f"High causality times (first 10): {high_causality_times[:10]}")
except Exception as e:
print(f"Neural analysis failed: {e}")
Economic Time Series Analysis
# Simulate economic time series data
def simulate_economic_data(n_days=1000, n_series=2):
"""Simulate economic time series (e.g., stock prices, economic indicators)."""
# Generate correlated economic indicators
np.random.seed(999)
# Base economic trends
trend1 = np.cumsum(np.random.randn(n_days) * 0.01) # Random walk trend
trend2 = np.cumsum(np.random.randn(n_days) * 0.01)
# Economic cycles (business cycles, seasonal effects)
t = np.arange(n_days)
cycle1 = 0.1 * np.sin(2 * np.pi * t / 252) + 0.05 * np.sin(2 * np.pi * t / 365) # Annual cycle
cycle2 = 0.08 * np.sin(2 * np.pi * t / 180) + 0.06 * np.sin(2 * np.pi * t / 30) # Quarterly cycle
# Market volatility (GARCH-like)
volatility = np.zeros(n_days)
volatility[0] = 0.02
for i in range(1, n_days):
volatility[i] = 0.01 + 0.1 * (np.random.randn(1)[0]**2) + 0.8 * volatility[i-1]
# Generate multiple trials (different market conditions)
n_trials = 15 # Different economic scenarios
data = np.zeros((n_series, n_days, n_trials))
for trial in range(n_trials):
# Economic shocks and events
shock_times = np.random.choice(n_days, size=np.random.poisson(5), replace=False)
shock_magnitudes = np.random.randn(len(shock_times)) * 0.05
# Series 1: Leading economic indicator
noise1 = np.random.randn(n_days) * volatility * (0.8 + 0.4 * np.random.randn())
data[0, :, trial] = trend1 + cycle1 + noise1
# Add economic shocks
for shock_time, shock_mag in zip(shock_times, shock_magnitudes):
data[0, shock_time:shock_time+10, trial] += shock_mag
# Series 2: Lagging indicator (influenced by Series 1)
noise2 = np.random.randn(n_days) * volatility * (0.9 + 0.2 * np.random.randn())
for i in range(5, n_days): # 5-day lag
data[1, i, trial] = (
trend2[i] + cycle2[i] + noise2[i] +
0.3 * data[0, i-5, trial] + # 5-day lagged influence
0.2 * data[0, i-3, trial] + # 3-day lagged influence
0.1 * data[0, i-1, trial] # 1-day lagged influence
)
return data
# Analyze economic data
print("\nAnalyzing simulated economic time series...")
economic_data = simulate_economic_data(n_days=500, n_series=2)
print(f"Economic data shape: {economic_data.shape}")
# Difference the data to remove trends (common in economic analysis)
diff_data = np.diff(economic_data, axis=1) # First difference
print(f"Differenced data shape: {diff_data.shape}")
# Economic DCS analysis
econ_calculator = DCSCalculator(
model_order=8, # Higher order for economic lags
time_mode="inhomo", # Non-stationary economic conditions
use_diagonal_covariance=False
)
try:
econ_result = econ_calculator.analyze(diff_data)
print("Economic DCS Analysis Results:")
print(f"Leading→Lagging causality: {econ_result.causal_strength[:, 1].mean():.4f}")
print(f"Lagging→Leading causality: {econ_result.causal_strength[:, 0].mean():.4f}")
# Expected: Leading should have stronger influence on Lagging
if econ_result.causal_strength[:, 1].mean() > econ_result.causal_strength[:, 0].mean():
print("✓ Expected pattern detected: Leading indicator influences lagging indicator")
else:
print("⚠ Unexpected pattern: Check data generation or model parameters")
# Time-varying causality analysis
causality_strength = econ_result.causal_strength[:, 1] # Leading→Lagging
# Identify periods of strong/weak causality
strong_periods = causality_strength > np.percentile(causality_strength, 75)
weak_periods = causality_strength < np.percentile(causality_strength, 25)
print(f"Strong causality periods: {np.sum(strong_periods)} time points")
print(f"Weak causality periods: {np.sum(weak_periods)} time points")
except Exception as e:
print(f"Economic analysis failed: {e}")
Multi-Scale Analysis
# Multi-scale temporal analysis
def multiscale_analysis(data, scales=[1, 2, 4, 8]):
"""Perform DCS analysis at multiple temporal scales."""
results = {}
for scale in scales:
print(f"Analyzing at scale {scale}...")
# Downsample data
if scale == 1:
scaled_data = data
else:
# Average over non-overlapping windows
n_vars, n_time, n_trials = data.shape
n_time_scaled = n_time // scale
scaled_data = np.zeros((n_vars, n_time_scaled, n_trials))
for i in range(n_time_scaled):
start_idx = i * scale
end_idx = start_idx + scale
scaled_data[:, i, :] = data[:, start_idx:end_idx, :].mean(axis=1)
# DCS analysis at this scale
try:
# Adjust model order for scale
model_order = max(2, 6 // scale) # Fewer lags for coarser scales
calc = DCSCalculator(
model_order=model_order,
time_mode="inhomo"
)
result = calc.analyze(scaled_data)
results[scale] = {
'causal_strength': result.causal_strength,
'transfer_entropy': result.transfer_entropy,
'mean_dcs_xy': result.causal_strength[:, 1].mean(),
'mean_dcs_yx': result.causal_strength[:, 0].mean(),
'model_order': model_order
}
print(f" Scale {scale}: DCS X→Y = {results[scale]['mean_dcs_xy']:.4f}")
except Exception as e:
print(f" Scale {scale} failed: {e}")
results[scale] = None
return results
# Perform multi-scale analysis on neural data
print("\nMulti-scale causality analysis:")
if 'neural_data_norm' in locals():
multiscale_results = multiscale_analysis(neural_data_norm, scales=[1, 2, 4, 8])
# Visualize scale-dependent causality
scales = []
causality_values = []
for scale, result in multiscale_results.items():
if result is not None:
scales.append(scale)
causality_values.append(result['mean_dcs_xy'])
if len(scales) > 0:
plt.figure(figsize=(10, 6))
plt.semilogx(scales, causality_values, 'o-', linewidth=2, markersize=8)
plt.xlabel('Temporal Scale')
plt.ylabel('Mean Causal Strength (X→Y)')
plt.title('Multi-Scale Causality Analysis')
plt.grid(True, alpha=0.3)
plt.show()
print(f"Scale dependency analysis:")
print(f"Fine scale (1): {causality_values[0]:.4f}")
if len(causality_values) > 1:
print(f"Coarse scale ({scales[-1]}): {causality_values[-1]:.4f}")
if causality_values[0] > causality_values[-1]:
print("→ Causality stronger at fine temporal scales")
else:
print("→ Causality stronger at coarse temporal scales")
Tutorial 5: Best Practices and Troubleshooting
Learn best practices for robust DCS analysis and how to troubleshoot common issues.
Data Quality Assessment
from trancit.utils.preprocess import check_data_quality, normalize_data
import warnings
def comprehensive_data_check(data, description=""):
"""Perform comprehensive data quality assessment."""
print(f"\n=== Data Quality Assessment: {description} ===")
# Basic properties
print(f"Shape: {data.shape}")
print(f"Data type: {data.dtype}")
print(f"Memory usage: {data.nbytes / 1024**2:.2f} MB")
# Statistical properties
print(f"Mean: {data.mean():.4f}")
print(f"Std: {data.std():.4f}")
print(f"Min: {data.min():.4f}")
print(f"Max: {data.max():.4f}")
# Check for problematic values
n_nan = np.isnan(data).sum()
n_inf = np.isinf(data).sum()
n_zero = (data == 0).sum()
print(f"NaN values: {n_nan}")
print(f"Inf values: {n_inf}")
print(f"Zero values: {n_zero}")
if n_nan > 0:
warnings.warn(f"Found {n_nan} NaN values - may cause analysis failure")
if n_inf > 0:
warnings.warn(f"Found {n_inf} infinite values - may cause numerical issues")
# Check variance across trials
if data.ndim == 3:
trial_vars = np.var(data, axis=(0, 1)) # Variance of each trial
low_var_trials = np.sum(trial_vars < 0.01 * np.median(trial_vars))
print(f"Low variance trials: {low_var_trials}/{data.shape[2]}")
if low_var_trials > data.shape[2] * 0.2:
warnings.warn("Many trials have very low variance - check data quality")
# Check stationarity (simplified)
if data.ndim >= 2:
first_half_mean = data[:, :data.shape[1]//2].mean()
second_half_mean = data[:, data.shape[1]//2:].mean()
mean_diff = abs(first_half_mean - second_half_mean)
print(f"First/second half mean difference: {mean_diff:.4f}")
if mean_diff > data.std():
print("⚠ Large mean differences between halves - data may be non-stationary")
else:
print("✓ Mean appears relatively stable")
# Recommended actions
recommendations = []
if n_nan > 0 or n_inf > 0:
recommendations.append("Remove or interpolate NaN/Inf values")
if data.std() < 1e-6:
recommendations.append("Data has very low variance - check scaling")
elif data.std() > 1e6:
recommendations.append("Data has very high variance - consider normalization")
if data.shape[1] < 50:
recommendations.append("Short time series - consider lower model order")
if data.ndim == 3 and data.shape[2] < 5:
recommendations.append("Few trials - results may be less robust")
if recommendations:
print("Recommendations:")
for rec in recommendations:
print(f" • {rec}")
else:
print("✓ Data quality looks good")
return {
'n_nan': n_nan,
'n_inf': n_inf,
'mean_diff': mean_diff if data.ndim >= 2 else None,
'recommendations': recommendations
}
# Test with various data quality scenarios
print("Testing data quality assessment...")
# 1. Good quality data
good_data, _, _ = generate_signals(T=400, Ntrial=20, h=0.1,
gamma1=0.5, gamma2=0.5,
Omega1=1.0, Omega2=1.2)
comprehensive_data_check(good_data, "Good Quality Data")
# 2. Problematic data
bad_data = good_data.copy()
bad_data[0, 100:110, :] = np.nan # Introduce NaN values
bad_data[1, 200, 0] = np.inf # Introduce Inf value
bad_data *= 1e8 # Make values very large
comprehensive_data_check(bad_data, "Problematic Data")
Robust Analysis Pipeline
def robust_dcs_analysis(data, description="", max_model_order=8):
"""Perform robust DCS analysis with automatic parameter adjustment."""
print(f"\n=== Robust DCS Analysis: {description} ===")
# Step 1: Data quality check
quality_results = comprehensive_data_check(data, description)
# Step 2: Data preprocessing based on quality assessment
processed_data = data.copy()
if quality_results['n_nan'] > 0 or quality_results['n_inf'] > 0:
print("Cleaning data...")
# Replace NaN and Inf with interpolated values
from scipy.interpolate import interp1d
for var in range(processed_data.shape[0]):
for trial in range(processed_data.shape[2]):
signal = processed_data[var, :, trial]
# Find valid (non-NaN, non-Inf) indices
valid_mask = np.isfinite(signal)
if np.sum(valid_mask) > 10: # Need some valid points
valid_indices = np.where(valid_mask)[0]
invalid_indices = np.where(~valid_mask)[0]
if len(invalid_indices) > 0:
# Linear interpolation
f = interp1d(valid_indices, signal[valid_indices],
bounds_error=False, fill_value='extrapolate')
processed_data[var, invalid_indices, trial] = f(invalid_indices)
# Normalize data
if processed_data.std() > 1000 or processed_data.std() < 0.001:
print("Normalizing data...")
processed_data = normalize_data(processed_data, method="zscore", axis=1)
# Step 3: Adaptive model order selection
n_time = processed_data.shape[1]
max_reasonable_order = min(max_model_order, n_time // 10) # Rule of thumb
print(f"Testing model orders from 1 to {max_reasonable_order}...")
best_order = None
best_result = None
order_scores = {}
for order in range(1, max_reasonable_order + 1):
try:
calc = DCSCalculator(model_order=order, time_mode="inhomo")
result = calc.analyze(processed_data)
# Score based on finite values and reasonable magnitudes
dcs_values = result.causal_strength
if np.all(np.isfinite(dcs_values)) and np.all(dcs_values >= 0):
# Simple scoring: prefer moderate values, penalize extreme values
mean_dcs = dcs_values.mean()
std_dcs = dcs_values.std()
score = mean_dcs - 2 * (std_dcs > mean_dcs * 2) # Penalize high variability
order_scores[order] = score
if best_order is None or score > order_scores[best_order]:
best_order = order
best_result = result
print(f" Order {order}: Score = {score:.4f}")
else:
print(f" Order {order}: Failed (non-finite or negative values)")
except Exception as e:
print(f" Order {order}: Failed ({str(e)[:50]}...)")
# Step 4: Final analysis with best parameters
if best_result is not None:
print(f"\nBest model order: {best_order}")
print(f"Final results:")
print(f" DCS X→Y: {best_result.causal_strength[:, 1].mean():.4f}")
print(f" DCS Y→X: {best_result.causal_strength[:, 0].mean():.4f}")
print(f" TE X→Y: {best_result.transfer_entropy[:, 1].mean():.4f}")
print(f" TE Y→X: {best_result.transfer_entropy[:, 0].mean():.4f}")
# Confidence assessment
dcs_xy_std = best_result.causal_strength[:, 1].std()
dcs_yx_std = best_result.causal_strength[:, 0].std()
print(f" DCS X→Y variability: {dcs_xy_std:.4f}")
print(f" DCS Y→X variability: {dcs_yx_std:.4f}")
if dcs_xy_std < 0.1 and dcs_yx_std < 0.1:
print(" ✓ Low variability - results appear stable")
else:
print(" ⚠ High variability - results may be less reliable")
return best_result, best_order, processed_data
else:
print("❌ No successful analysis found")
print("Recommendations:")
print(" • Check data format (should be 3D: variables × time × trials)")
print(" • Ensure sufficient data length (>100 time points recommended)")
print(" • Verify data contains meaningful signal (not just noise)")
return None, None, processed_data
# Test robust analysis
test_data, _, _ = generate_signals(T=300, Ntrial=15, h=0.1,
gamma1=0.4, gamma2=0.6,
Omega1=0.9, Omega2=1.3)
robust_result, best_order, clean_data = robust_dcs_analysis(test_data, "Test Analysis")
Common Issues and Solutions
# Demonstrate common issues and their solutions
print("\n=== Common Issues and Solutions ===")
# Issue 1: Insufficient data length
print("\n1. Issue: Insufficient data length")
short_data, _, _ = generate_signals(T=50, Ntrial=10, h=0.1, # Very short
gamma1=0.5, gamma2=0.5,
Omega1=1.0, Omega2=1.2)
print("Attempting analysis with very short data...")
try:
calc = DCSCalculator(model_order=10, time_mode="inhomo") # Too high order
result = calc.analyze(short_data)
print("Analysis succeeded (unexpected)")
except Exception as e:
print(f"Analysis failed as expected: {type(e).__name__}")
print("Solution: Use lower model order or collect more data")
# Solution
try:
calc_fixed = DCSCalculator(model_order=2, time_mode="homo") # Lower order
result_fixed = calc_fixed.analyze(short_data)
print(f"✓ Fixed analysis succeeded: DCS X→Y = {result_fixed.causal_strength[:, 1].mean():.4f}")
except Exception as e2:
print(f"Still failed: {e2}")
# Issue 2: Highly correlated/singular data
print("\n2. Issue: Highly correlated data")
T, N = 200, 15
# Create perfectly correlated data
base_signal = np.random.randn(T, N)
singular_data = np.zeros((2, T, N))
singular_data[0] = base_signal
singular_data[1] = base_signal + 1e-10 * np.random.randn(T, N) # Nearly identical
print("Attempting analysis with highly correlated data...")
try:
calc = DCSCalculator(model_order=4, time_mode="inhomo")
result = calc.analyze(singular_data)
print("Analysis succeeded (unexpected)")
except Exception as e:
print(f"Analysis failed as expected: {type(e).__name__}")
print("Solution: Use diagonal covariance approximation or add regularization")
# Solution
try:
calc_fixed = DCSCalculator(model_order=3,
time_mode="inhomo",
use_diagonal_covariance=True) # Diagonal approximation
result_fixed = calc_fixed.analyze(singular_data)
print(f"✓ Fixed analysis succeeded: DCS X→Y = {result_fixed.causal_strength[:, 1].mean():.4f}")
except Exception as e2:
print(f"Still failed: {e2}")
# Issue 3: No events detected in pipeline
print("\n3. Issue: No events detected in pipeline")
# Create very smooth signals (no clear events)
smooth_data, _, _ = generate_signals(T=500, Ntrial=10, h=0.1,
gamma1=0.1, gamma2=0.1, # Low damping = smooth
Omega1=1.0, Omega2=1.0)
smooth_signal = np.mean(smooth_data, axis=2)
# Try with high threshold (likely to fail)
from trancit.config import PipelineConfig, PipelineOptions, DetectionParams, CausalParams, BicParams, OutputParams
high_threshold_config = PipelineConfig(
options=PipelineOptions(detection=True, causal_analysis=True),
detection=DetectionParams(thres_ratio=5.0, l_extract=50, l_start=25), # Very high threshold
causal=CausalParams(ref_time=25, estim_mode="OLS"),
bic=BicParams(),
output=OutputParams()
)
print("Attempting pipeline with high detection threshold...")
try:
orchestrator = PipelineOrchestrator(high_threshold_config)
result = orchestrator.run(smooth_signal, smooth_signal * 1.1)
if result.results.get('locs') is not None:
n_events = len(result.results['locs'])
print(f"Found {n_events} events")
else:
print("No events detected")
except Exception as e:
print(f"Pipeline failed: {type(e).__name__}")
print("Solution: Lower detection threshold")
# Solution: Lower threshold
low_threshold_config = PipelineConfig(
options=PipelineOptions(detection=True, causal_analysis=True),
detection=DetectionParams(thres_ratio=1.5, l_extract=50, l_start=25), # Lower threshold
causal=CausalParams(ref_time=25, estim_mode="OLS"),
bic=BicParams(),
output=OutputParams()
)
try:
orchestrator_fixed = PipelineOrchestrator(low_threshold_config)
result_fixed = orchestrator_fixed.run(smooth_signal, smooth_signal * 1.5) # More amplification
if result_fixed.results.get('locs') is not None:
n_events = len(result_fixed.results['locs'])
print(f"✓ Fixed pipeline found {n_events} events")
else:
print("Still no events detected - signal may be too smooth")
except Exception as e:
print(f"Fixed pipeline still failed: {e}")
Performance Monitoring
import time
import psutil
import os
def monitor_analysis_performance(data, description="", verbose=True):
"""Monitor memory and time performance of DCS analysis."""
if verbose:
print(f"\n=== Performance Monitoring: {description} ===")
# Initial system state
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss / 1024**2 # MB
start_time = time.time()
if verbose:
print(f"Initial memory usage: {initial_memory:.1f} MB")
print(f"Data size: {data.nbytes / 1024**2:.2f} MB")
try:
# Perform analysis
calc = DCSCalculator(model_order=4, time_mode="inhomo")
result = calc.analyze(data)
# Final system state
end_time = time.time()
final_memory = process.memory_info().rss / 1024**2
analysis_time = end_time - start_time
memory_increase = final_memory - initial_memory
if verbose:
print(f"Analysis time: {analysis_time:.2f} seconds")
print(f"Memory increase: {memory_increase:.1f} MB")
print(f"Peak memory usage: {final_memory:.1f} MB")
# Performance metrics
data_throughput = data.size / analysis_time # elements per second
print(f"Data throughput: {data_throughput/1000:.1f}K elements/second")
# Efficiency assessment
if analysis_time < 1.0:
print("✓ Fast analysis")
elif analysis_time < 10.0:
print("○ Moderate analysis time")
else:
print("⚠ Slow analysis - consider optimization")
if memory_increase < 100:
print("✓ Low memory overhead")
elif memory_increase < 500:
print("○ Moderate memory usage")
else:
print("⚠ High memory usage - consider processing in chunks")
return {
'analysis_time': analysis_time,
'memory_increase': memory_increase,
'data_throughput': data_throughput,
'success': True
}
except Exception as e:
end_time = time.time()
analysis_time = end_time - start_time
if verbose:
print(f"Analysis failed after {analysis_time:.2f} seconds: {e}")
return {
'analysis_time': analysis_time,
'memory_increase': 0,
'data_throughput': 0,
'success': False,
'error': str(e)
}
# Test performance with different data sizes
data_sizes = [
(100, 10, "Small"),
(500, 20, "Medium"),
(1000, 30, "Large")
]
performance_results = {}
for T, N, size_name in data_sizes:
test_data, _, _ = generate_signals(T=T, Ntrial=N, h=0.1,
gamma1=0.5, gamma2=0.5,
Omega1=1.0, Omega2=1.2)
perf_result = monitor_analysis_performance(test_data, f"{size_name} Dataset ({T}×{N})")
performance_results[size_name] = perf_result
# Performance summary
print("\n=== Performance Summary ===")
for size_name, result in performance_results.items():
if result['success']:
print(f"{size_name}: {result['analysis_time']:.2f}s, {result['memory_increase']:.1f}MB")
else:
print(f"{size_name}: Failed - {result['error']}")