Tutorials

This section provides comprehensive tutorials for using TranCIT: Transient Causal Interaction Toolbox in different scenarios. Each tutorial builds on the previous ones and includes complete working examples.

Tutorial 1: Basic DCS Analysis

Learn the fundamentals of causality analysis with Dynamic Causal Strength (DCS).

Understanding the Data Format

DCS works with multivariate time series data in a specific format:

import numpy as np
from trancit import generate_signals

# Generate sample data
data, _, _ = generate_signals(
    T=500,        # Time points
    Ntrial=10,    # Number of trials/repetitions
    h=0.1,        # Time step
    gamma1=0.5,   # Damping parameter 1
    gamma2=0.5,   # Damping parameter 2
    Omega1=1.0,   # Natural frequency 1
    Omega2=1.2    # Natural frequency 2
)

print(f"Data shape: {data.shape}")
# Output: (2, 500, 10) = (variables, time_points, trials)

# Visualize the data structure
print(f"Variable 1 shape: {data[0].shape}")  # (500, 10)
print(f"Variable 2 shape: {data[1].shape}")  # (500, 10)

# Each trial is a realization of the same underlying system
print(f"Trial 0 of variable 1: {data[0, :5, 0]}")  # First 5 time points
print(f"Trial 1 of variable 1: {data[0, :5, 1]}")  # Same time points, different trial

Key Points:

  • Shape: (n_variables, n_timepoints, n_trials)

  • Variables: DCS analyzes bivariate relationships (n_variables = 2)

  • Trials: Multiple realizations improve statistical robustness

  • Time: Each time point represents one measurement

Step-by-Step DCS Analysis

from trancit import DCSCalculator
import matplotlib.pyplot as plt

# Step 1: Create calculator with appropriate model order
calculator = DCSCalculator(
    model_order=4,          # Number of time lags to consider
    time_mode="inhomo",     # Time-inhomogeneous analysis (recommended)
    use_diagonal_covariance=False  # Use full covariance (more accurate)
)

# Step 2: Perform the analysis
result = calculator.analyze(data)

# Step 3: Examine the results
print(f"Analysis completed!")
print(f"Causal strength shape: {result.causal_strength.shape}")
print(f"Transfer entropy shape: {result.transfer_entropy.shape}")
print(f"Granger causality shape: {result.granger_causality.shape}")

# Step 4: Interpret the results
# Column 0: Y → X (variable 2 influences variable 1)
# Column 1: X → Y (variable 1 influences variable 2)

mean_dcs_x_to_y = result.causal_strength[:, 1].mean()
mean_dcs_y_to_x = result.causal_strength[:, 0].mean()

print(f"Mean causal strength X→Y: {mean_dcs_x_to_y:.4f}")
print(f"Mean causal strength Y→X: {mean_dcs_y_to_x:.4f}")

if mean_dcs_x_to_y > mean_dcs_y_to_x:
    print("Stronger causality: X → Y")
else:
    print("Stronger causality: Y → X")

Visualizing Results

# Create comprehensive visualization
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Plot 1: Original signals (first trial)
axes[0, 0].plot(data[0, :100, 0], label='Variable X', alpha=0.8)
axes[0, 0].plot(data[1, :100, 0], label='Variable Y', alpha=0.8)
axes[0, 0].set_xlabel('Time')
axes[0, 0].set_ylabel('Amplitude')
axes[0, 0].set_title('Original Time Series (First 100 points)')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# Plot 2: Dynamic Causal Strength over time
axes[0, 1].plot(result.causal_strength[:, 1], label='X → Y', linewidth=2)
axes[0, 1].plot(result.causal_strength[:, 0], label='Y → X', linewidth=2)
axes[0, 1].set_xlabel('Time')
axes[0, 1].set_ylabel('Causal Strength')
axes[0, 1].set_title('Dynamic Causal Strength')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# Plot 3: Transfer Entropy
axes[1, 0].plot(result.transfer_entropy[:, 1], label='X → Y', linewidth=2)
axes[1, 0].plot(result.transfer_entropy[:, 0], label='Y → X', linewidth=2)
axes[1, 0].set_xlabel('Time')
axes[1, 0].set_ylabel('Transfer Entropy')
axes[1, 0].set_title('Transfer Entropy')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# Plot 4: Comparison of measures
time_points = range(len(result.causal_strength))
width = 0.35

mean_measures = {
    'DCS': [result.causal_strength[:, 0].mean(), result.causal_strength[:, 1].mean()],
    'TE': [result.transfer_entropy[:, 0].mean(), result.transfer_entropy[:, 1].mean()],
    'GC': [result.granger_causality[:, 0].mean(), result.granger_causality[:, 1].mean()]
}

x = np.arange(2)  # Y→X, X→Y
axes[1, 1].bar(x - width, [mean_measures['DCS'][0], mean_measures['DCS'][1]],
               width, label='DCS', alpha=0.8)
axes[1, 1].bar(x, [mean_measures['TE'][0], mean_measures['TE'][1]],
               width, label='TE', alpha=0.8)
axes[1, 1].bar(x + width, [mean_measures['GC'][0], mean_measures['GC'][1]],
               width, label='GC', alpha=0.8)

axes[1, 1].set_xlabel('Direction')
axes[1, 1].set_ylabel('Mean Value')
axes[1, 1].set_title('Comparison of Causality Measures')
axes[1, 1].set_xticks(x)
axes[1, 1].set_xticklabels(['Y → X', 'X → Y'])
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

Tutorial 2: Event-Based Analysis with Pipeline

Learn how to detect and analyze specific events in time series data.

Understanding Event Detection

Event-based analysis focuses on detecting specific time windows of interest and analyzing causality within those windows.

import numpy as np
from trancit import generate_signals, PipelineOrchestrator
from trancit.config import (
    PipelineConfig, PipelineOptions, DetectionParams,
    CausalParams, BicParams, OutputParams
)

# Generate data with clearer event structure
np.random.seed(42)
data, _, _ = generate_signals(
    T=800, Ntrial=15, h=0.1,
    gamma1=0.3, gamma2=0.7,  # Asymmetric damping
    Omega1=0.8, Omega2=1.4   # Different frequencies
)

# Prepare signals for event detection
original_signal = np.mean(data, axis=2)  # Average over trials

# Create detection signal with enhanced events
detection_signal = original_signal.copy()

# Add some artificial "events" for demonstration
event_times = [200, 400, 600]
for t in event_times:
    # Enhance signal at event times
    detection_signal[:, t-10:t+10] *= 2.0
    detection_signal[:, t-5:t+5] += np.random.randn(2, 10) * 0.5

print(f"Original signal shape: {original_signal.shape}")
print(f"Detection signal shape: {detection_signal.shape}")

Configuring the Pipeline

The pipeline configuration controls every aspect of the analysis:

# Create comprehensive pipeline configuration
config = PipelineConfig(
    # Main analysis options
    options=PipelineOptions(
        detection=True,          # Enable event detection
        bic=True,               # Enable BIC model selection
        causal_analysis=True,    # Enable causality analysis
        bootstrap=False,         # Skip bootstrap for speed
        save_flag=False,         # Don't save intermediate files
        debiased_stats=False     # Skip debiased analysis for speed
    ),

    # Event detection parameters
    detection=DetectionParams(
        thres_ratio=1.8,         # Threshold for event detection (lower = more events)
        align_type="peak",       # Align events to their peaks
        l_extract=100,           # Length of extracted event windows
        l_start=50,              # Starting point within extracted windows
        shrink_flag=False,       # Don't apply shrinkage
        remove_artif=True,       # Remove artifact-contaminated trials
        locs=None                # Automatically detect event locations
    ),

    # BIC model selection parameters
    bic=BicParams(
        morder=4,                # Default model order
        momax=8,                 # Maximum model order to test
        mode="OLS",              # Ordinary Least Squares
        tau=1                    # Smoothing parameter
    ),

    # Causality analysis parameters
    causal=CausalParams(
        ref_time=50,             # Reference time for rDCS (should match l_start)
        estim_mode="OLS",        # Estimation method
        diag_flag=False,         # Use full covariance matrix
        old_version=False        # Use new rDCS calculation method
    ),

    # Output parameters
    output=OutputParams(
        file_keyword="tutorial_events"
    )
)

print("Configuration created successfully")

Running the Event-Based Analysis

# Create and run the pipeline orchestrator
orchestrator = PipelineOrchestrator(config)

try:
    print("Starting pipeline analysis...")

    # Run the complete pipeline
    result = orchestrator.run(original_signal, detection_signal)

    print("Pipeline completed successfully!")

    # Examine the pipeline results
    print(f"Event snapshots shape: {result.event_snapshots.shape}")

    if result.results.get('locs') is not None:
        detected_events = result.results['locs']
        print(f"Number of events detected: {len(detected_events)}")
        print(f"Event locations: {detected_events}")

    # Access causality results if available
    if result.results.get("CausalOutput"):
        causal_output = result.results["CausalOutput"]["OLS"]

        if "DCS" in causal_output:
            dcs_results = causal_output["DCS"]
            print(f"DCS results shape: {dcs_results.shape}")
            print(f"Mean DCS X→Y: {dcs_results[:, 1].mean():.4f}")
            print(f"Mean DCS Y→X: {dcs_results[:, 0].mean():.4f}")

            # Analyze individual events
            print("\nPer-event analysis:")
            for i, (dcs_xy, dcs_yx) in enumerate(dcs_results):
                print(f"Event {i+1}: X→Y={dcs_xy:.4f}, Y→X={dcs_yx:.4f}")

        if "rDCS" in causal_output:
            rdcs_results = causal_output["rDCS"]
            print(f"Relative DCS shape: {rdcs_results.shape}")
            print(f"Mean rDCS X→Y: {rdcs_results[:, 1].mean():.4f}")
            print(f"Mean rDCS Y→X: {rdcs_results[:, 0].mean():.4f}")

    else:
        print("No causal output generated")
        print("This might happen if no events were detected or analysis failed")

except Exception as e:
    print(f"Pipeline analysis failed: {e}")
    print("Common issues:")
    print("1. No events detected - try lowering thres_ratio")
    print("2. Insufficient data - try shorter l_extract or more data")
    print("3. Numerical issues - try different model parameters")

Visualizing Event-Based Results

# Create visualization of event-based analysis
if 'result' in locals() and result.results.get("CausalOutput"):
    fig, axes = plt.subplots(3, 1, figsize=(15, 12))

    # Plot 1: Original signals with detected events
    time_axis = np.arange(original_signal.shape[1])
    axes[0].plot(time_axis, original_signal[0], label='Variable X', alpha=0.7)
    axes[0].plot(time_axis, original_signal[1], label='Variable Y', alpha=0.7)

    # Mark detected events
    if result.results.get('locs') is not None:
        for loc in result.results['locs']:
            axes[0].axvline(x=loc, color='red', linestyle='--', alpha=0.7)
            axes[0].text(loc, axes[0].get_ylim()[1]*0.9, 'Event',
                        rotation=90, fontsize=8)

    axes[0].set_xlabel('Time')
    axes[0].set_ylabel('Amplitude')
    axes[0].set_title('Original Signals with Detected Events')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)

    # Plot 2: Event-wise causality measures
    if "DCS" in result.results["CausalOutput"]["OLS"]:
        dcs_data = result.results["CausalOutput"]["OLS"]["DCS"]
        event_nums = range(1, len(dcs_data) + 1)

        width = 0.35
        x = np.arange(len(event_nums))

        axes[1].bar(x - width/2, dcs_data[:, 0], width,
                   label='Y → X', alpha=0.8, color='blue')
        axes[1].bar(x + width/2, dcs_data[:, 1], width,
                   label='X → Y', alpha=0.8, color='red')

        axes[1].set_xlabel('Event Number')
        axes[1].set_ylabel('Dynamic Causal Strength')
        axes[1].set_title('Per-Event Causality Analysis')
        axes[1].set_xticks(x)
        axes[1].set_xticklabels(event_nums)
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)

    # Plot 3: Comparison of measures across events
    if ("DCS" in result.results["CausalOutput"]["OLS"] and
        "TE" in result.results["CausalOutput"]["OLS"]):

        dcs_data = result.results["CausalOutput"]["OLS"]["DCS"]
        te_data = result.results["CausalOutput"]["OLS"]["TE"]

        # Plot X→Y direction
        axes[2].plot(dcs_data[:, 1], 'o-', label='DCS (X→Y)', linewidth=2)
        axes[2].plot(te_data[:, 1], 's-', label='TE (X→Y)', linewidth=2)

        axes[2].set_xlabel('Event Number')
        axes[2].set_ylabel('Causality Measure')
        axes[2].set_title('Causality Measures Across Events (X→Y Direction)')
        axes[2].legend()
        axes[2].grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

Tutorial 3: Advanced Configuration and Optimization

Learn how to optimize DCS analysis for different types of data and research questions.

Model Order Selection

Choosing the right model order is crucial for accurate causality analysis:

from trancit import DCSCalculator
from trancit.models import BICSelector
import matplotlib.pyplot as plt

# Generate test data
np.random.seed(123)
data, _, _ = generate_signals(T=600, Ntrial=20, h=0.1,
                             gamma1=0.4, gamma2=0.6,
                             Omega1=1.1, Omega2=0.9)

# Method 1: Manual comparison of different model orders
model_orders = [1, 2, 3, 4, 5, 6, 7, 8]
causality_results = {}

print("Testing different model orders...")
for order in model_orders:
    try:
        calculator = DCSCalculator(model_order=order, time_mode="inhomo")
        result = calculator.analyze(data)

        causality_results[order] = {
            'dcs_mean': result.causal_strength.mean(axis=0),
            'te_mean': result.transfer_entropy.mean(axis=0),
            'gc_mean': result.granger_causality.mean(axis=0)
        }
        print(f"Order {order}: DCS X→Y = {result.causal_strength[:, 1].mean():.4f}")

    except Exception as e:
        print(f"Order {order} failed: {e}")
        causality_results[order] = None

# Method 2: Automatic BIC-based selection
print("\nUsing BIC for automatic model selection...")
try:
    bic_selector = BICSelector(max_order=10, mode="OLS")

    # Prepare data for BIC analysis (requires specific format)
    bic_data = data.copy()  # Shape: (2, T, N)

    # BIC analysis parameters
    bic_params = {
        "Params": {
            "BIC": {
                "momax": 8,
                "mode": "OLS"
            }
        },
        "EstimMode": "OLS"
    }

    # Run BIC analysis
    bic_results = bic_selector.compute_multi_trial_BIC(bic_data, bic_params)

    if bic_results and 'morder' in bic_results:
        optimal_order = bic_results['morder']
        print(f"BIC-selected optimal model order: {optimal_order}")

        # Use optimal order for final analysis
        calculator = DCSCalculator(model_order=optimal_order, time_mode="inhomo")
        final_result = calculator.analyze(data)

        print(f"Final analysis with order {optimal_order}:")
        print(f"DCS X→Y: {final_result.causal_strength[:, 1].mean():.4f}")
        print(f"DCS Y→X: {final_result.causal_strength[:, 0].mean():.4f}")

except Exception as e:
    print(f"BIC selection failed: {e}")
    print("Using default model order 4")
    optimal_order = 4

Handling Different Data Types

from trancit.utils.preprocess import normalize_data, check_data_quality

# Simulate different types of real-world data issues
np.random.seed(456)

# 1. Noisy data
print("1. Handling noisy data:")
noisy_data, _, _ = generate_signals(T=400, Ntrial=15, h=0.1,
                                   gamma1=0.5, gamma2=0.5,
                                   Omega1=1.0, Omega2=1.2)
# Add significant noise
noisy_data += np.random.randn(*noisy_data.shape) * 0.5

# Check data quality
quality_issues = check_data_quality(noisy_data)
print(f"Data quality issues: {quality_issues}")

# Normalize to improve analysis
clean_data = normalize_data(noisy_data, method="zscore", axis=1)

# Compare results
calc = DCSCalculator(model_order=4)

try:
    result_noisy = calc.analyze(noisy_data)
    result_clean = calc.analyze(clean_data)

    print(f"Noisy data DCS X→Y: {result_noisy.causal_strength[:, 1].mean():.4f}")
    print(f"Clean data DCS X→Y: {result_clean.causal_strength[:, 1].mean():.4f}")
except Exception as e:
    print(f"Analysis failed: {e}")

# 2. Short time series
print("\n2. Handling short time series:")
short_data, _, _ = generate_signals(T=100, Ntrial=25, h=0.1,  # More trials, less time
                                   gamma1=0.5, gamma2=0.5,
                                   Omega1=1.0, Omega2=1.2)

# Use smaller model order for short series
calc_short = DCSCalculator(model_order=2, time_mode="homo")  # Homogeneous for short data

try:
    result_short = calc_short.analyze(short_data)
    print(f"Short series DCS X→Y: {result_short.causal_strength[:, 1].mean():.4f}")
except Exception as e:
    print(f"Short series analysis failed: {e}")

# 3. Highly correlated data
print("\n3. Handling highly correlated data:")
T, N = 300, 20

# Generate highly correlated signals
base_signal = np.random.randn(T, N)
corr_data = np.zeros((2, T, N))
corr_data[0] = base_signal
corr_data[1] = 0.95 * base_signal + 0.05 * np.random.randn(T, N)  # Very high correlation

# Add regularization for numerical stability
try:
    calc_reg = DCSCalculator(model_order=3, use_diagonal_covariance=True)
    result_corr = calc_reg.analyze(corr_data)
    print(f"Highly correlated data DCS X→Y: {result_corr.causal_strength[:, 1].mean():.4f}")
except Exception as e:
    print(f"Correlated data analysis failed: {e}")

Performance Optimization

import time
from trancit import PipelineOrchestrator
from trancit.config import PipelineConfig, PipelineOptions, DetectionParams, CausalParams

# Generate larger dataset for performance testing
large_data, _, _ = generate_signals(T=2000, Ntrial=30, h=0.1,
                                   gamma1=0.5, gamma2=0.5,
                                   Omega1=1.0, Omega2=1.2)

original_signal = np.mean(large_data, axis=2)
detection_signal = original_signal * 1.2

print("Performance optimization comparison:")

# Configuration 1: Full analysis (slower but comprehensive)
full_config = PipelineConfig(
    options=PipelineOptions(
        detection=True,
        bic=True,              # BIC is computationally expensive
        causal_analysis=True,
        bootstrap=True,        # Bootstrap is time-consuming
        debiased_stats=True    # Additional computational overhead
    ),
    detection=DetectionParams(thres_ratio=2.0, l_extract=200, l_start=100),
    causal=CausalParams(ref_time=100, estim_mode="OLS"),
    # ... other params
)

# Configuration 2: Fast analysis (faster but less comprehensive)
fast_config = PipelineConfig(
    options=PipelineOptions(
        detection=True,
        bic=False,             # Skip BIC for speed
        causal_analysis=True,
        bootstrap=False,       # Skip bootstrap for speed
        debiased_stats=False   # Skip debiased analysis for speed
    ),
    detection=DetectionParams(thres_ratio=2.0, l_extract=100, l_start=50),  # Shorter windows
    causal=CausalParams(ref_time=50, estim_mode="OLS"),
    # ... other params
)

# Time both approaches
configurations = [
    ("Fast Configuration", fast_config),
    # ("Full Configuration", full_config)  # Uncomment for comparison
]

for name, config in configurations:
    try:
        start_time = time.time()
        orchestrator = PipelineOrchestrator(config)
        result = orchestrator.run(original_signal, detection_signal)
        end_time = time.time()

        print(f"{name}: {end_time - start_time:.2f} seconds")

        if result.results.get("CausalOutput"):
            causal_data = result.results["CausalOutput"]["OLS"]
            if "DCS" in causal_data:
                n_events = len(causal_data["DCS"])
                print(f"  - Events detected: {n_events}")
                print(f"  - Mean DCS X→Y: {causal_data['DCS'][:, 1].mean():.4f}")

    except Exception as e:
        print(f"{name} failed: {e}")

Statistical Validation

from scipy import stats
import numpy as np

# Generate data with known causal structure for validation
np.random.seed(789)

# Create signals where X clearly influences Y
T, N = 500, 25

signal_x = np.random.randn(T, N)
signal_y = np.zeros((T, N))

# Y depends on past values of X (true causality X→Y)
for t in range(2, T):
    signal_y[t] = (0.7 * signal_y[t-1] -
                  0.2 * signal_y[t-2] +
                  0.4 * signal_x[t-1] +     # Clear X→Y influence
                  0.1 * signal_x[t-2] +
                  np.random.randn(N) * 0.3)

validation_data = np.array([signal_x.T, signal_y.T])

# Multiple analyses for statistical validation
n_bootstrap = 20
dcs_xy_values = []
dcs_yx_values = []

print("Statistical validation with bootstrap sampling:")

for i in range(n_bootstrap):
    # Random resampling of trials
    trial_indices = np.random.choice(N, size=N, replace=True)
    bootstrap_data = validation_data[:, :, trial_indices]

    calc = DCSCalculator(model_order=3, time_mode="inhomo")
    result = calc.analyze(bootstrap_data)

    dcs_xy_values.append(result.causal_strength[:, 1].mean())
    dcs_yx_values.append(result.causal_strength[:, 0].mean())

# Statistical analysis
dcs_xy_mean = np.mean(dcs_xy_values)
dcs_yx_mean = np.mean(dcs_yx_values)
dcs_xy_std = np.std(dcs_xy_values)
dcs_yx_std = np.std(dcs_yx_values)

print(f"DCS X→Y: {dcs_xy_mean:.4f} ± {dcs_xy_std:.4f}")
print(f"DCS Y→X: {dcs_yx_mean:.4f} ± {dcs_yx_std:.4f}")

# Statistical test for significant difference
t_stat, p_value = stats.ttest_rel(dcs_xy_values, dcs_yx_values)

print(f"Statistical test (paired t-test):")
print(f"t-statistic: {t_stat:.4f}, p-value: {p_value:.4f}")

if p_value < 0.05:
    if dcs_xy_mean > dcs_yx_mean:
        print("Significant causality detected: X → Y")
    else:
        print("Significant causality detected: Y → X")
else:
    print("No significant causal asymmetry detected")

Tutorial 4: Real-World Applications

Apply DCS to realistic neuroscience and time series analysis scenarios.

Neural Data Analysis

# Simulate Local Field Potential (LFP) data
def simulate_lfp_data(duration=10.0, sampling_rate=1000, n_trials=30):
    """Simulate realistic LFP data with event-related responses."""

    n_samples = int(duration * sampling_rate)
    n_channels = 2

    # Base oscillatory activity
    t = np.linspace(0, duration, n_samples)

    data = np.zeros((n_channels, n_samples, n_trials))

    for trial in range(n_trials):
        # Base activity with multiple frequency components
        alpha_freq = 10 + np.random.randn() * 1  # 8-12 Hz alpha
        beta_freq = 20 + np.random.randn() * 3   # 15-25 Hz beta
        gamma_freq = 40 + np.random.randn() * 5  # 35-45 Hz gamma

        # Channel 1: Mix of frequencies
        data[0, :, trial] = (
            0.5 * np.sin(2 * np.pi * alpha_freq * t) +
            0.3 * np.sin(2 * np.pi * beta_freq * t) +
            0.2 * np.sin(2 * np.pi * gamma_freq * t) +
            np.random.randn(n_samples) * 0.1
        )

        # Channel 2: Influenced by Channel 1 with delay
        delay_samples = 5  # 5ms delay at 1kHz sampling
        data[1, delay_samples:, trial] = (
            0.6 * data[1, :-delay_samples, trial] +  # AR component
            0.4 * data[0, :-delay_samples, trial] +  # Channel 1 influence
            0.2 * np.sin(2 * np.pi * beta_freq * t[:-delay_samples]) +
            np.random.randn(n_samples - delay_samples) * 0.15
        )

        # Add event-related responses at random times
        n_events = np.random.poisson(3)  # Average 3 events per trial
        event_times = np.random.randint(1000, n_samples-1000, n_events)

        for event_time in event_times:
            # Event response in both channels
            event_duration = 200  # 200ms event
            event_window = slice(event_time, event_time + event_duration)

            # Enhanced coupling during events
            data[0, event_window, trial] *= 1.5
            data[1, event_window, trial] += 0.3 * data[0, event_window, trial]

    return data

# Generate and analyze neural data
print("Analyzing simulated neural data...")
neural_data = simulate_lfp_data(duration=20.0, sampling_rate=500, n_trials=25)

print(f"Neural data shape: {neural_data.shape}")
print(f"Sampling rate: 500 Hz, Duration: 20s, Channels: 2, Trials: 25")

# Preprocess neural data
from trancit.utils.preprocess import normalize_data
neural_data_norm = normalize_data(neural_data, method="zscore", axis=1)

# DCS analysis with parameters suitable for neural data
neural_calculator = DCSCalculator(
    model_order=6,             # Higher order for complex neural dynamics
    time_mode="inhomo",        # Non-stationary neural activity
    use_diagonal_covariance=False
)

try:
    neural_result = neural_calculator.analyze(neural_data_norm)

    print("Neural DCS Analysis Results:")
    print(f"Mean causality Ch1→Ch2: {neural_result.causal_strength[:, 1].mean():.4f}")
    print(f"Mean causality Ch2→Ch1: {neural_result.causal_strength[:, 0].mean():.4f}")
    print(f"Mean transfer entropy Ch1→Ch2: {neural_result.transfer_entropy[:, 1].mean():.4f}")

    # Identify periods of high causality
    high_causality_threshold = np.percentile(neural_result.causal_strength[:, 1], 90)
    high_causality_times = np.where(neural_result.causal_strength[:, 1] > high_causality_threshold)[0]

    print(f"High causality periods (top 10%): {len(high_causality_times)} time points")
    print(f"High causality times (first 10): {high_causality_times[:10]}")

except Exception as e:
    print(f"Neural analysis failed: {e}")

Economic Time Series Analysis

# Simulate economic time series data
def simulate_economic_data(n_days=1000, n_series=2):
    """Simulate economic time series (e.g., stock prices, economic indicators)."""

    # Generate correlated economic indicators
    np.random.seed(999)

    # Base economic trends
    trend1 = np.cumsum(np.random.randn(n_days) * 0.01)  # Random walk trend
    trend2 = np.cumsum(np.random.randn(n_days) * 0.01)

    # Economic cycles (business cycles, seasonal effects)
    t = np.arange(n_days)
    cycle1 = 0.1 * np.sin(2 * np.pi * t / 252) + 0.05 * np.sin(2 * np.pi * t / 365)  # Annual cycle
    cycle2 = 0.08 * np.sin(2 * np.pi * t / 180) + 0.06 * np.sin(2 * np.pi * t / 30)   # Quarterly cycle

    # Market volatility (GARCH-like)
    volatility = np.zeros(n_days)
    volatility[0] = 0.02

    for i in range(1, n_days):
        volatility[i] = 0.01 + 0.1 * (np.random.randn(1)[0]**2) + 0.8 * volatility[i-1]

    # Generate multiple trials (different market conditions)
    n_trials = 15  # Different economic scenarios
    data = np.zeros((n_series, n_days, n_trials))

    for trial in range(n_trials):
        # Economic shocks and events
        shock_times = np.random.choice(n_days, size=np.random.poisson(5), replace=False)
        shock_magnitudes = np.random.randn(len(shock_times)) * 0.05

        # Series 1: Leading economic indicator
        noise1 = np.random.randn(n_days) * volatility * (0.8 + 0.4 * np.random.randn())
        data[0, :, trial] = trend1 + cycle1 + noise1

        # Add economic shocks
        for shock_time, shock_mag in zip(shock_times, shock_magnitudes):
            data[0, shock_time:shock_time+10, trial] += shock_mag

        # Series 2: Lagging indicator (influenced by Series 1)
        noise2 = np.random.randn(n_days) * volatility * (0.9 + 0.2 * np.random.randn())

        for i in range(5, n_days):  # 5-day lag
            data[1, i, trial] = (
                trend2[i] + cycle2[i] + noise2[i] +
                0.3 * data[0, i-5, trial] +    # 5-day lagged influence
                0.2 * data[0, i-3, trial] +    # 3-day lagged influence
                0.1 * data[0, i-1, trial]      # 1-day lagged influence
            )

    return data

# Analyze economic data
print("\nAnalyzing simulated economic time series...")

economic_data = simulate_economic_data(n_days=500, n_series=2)
print(f"Economic data shape: {economic_data.shape}")

# Difference the data to remove trends (common in economic analysis)
diff_data = np.diff(economic_data, axis=1)  # First difference
print(f"Differenced data shape: {diff_data.shape}")

# Economic DCS analysis
econ_calculator = DCSCalculator(
    model_order=8,             # Higher order for economic lags
    time_mode="inhomo",        # Non-stationary economic conditions
    use_diagonal_covariance=False
)

try:
    econ_result = econ_calculator.analyze(diff_data)

    print("Economic DCS Analysis Results:")
    print(f"Leading→Lagging causality: {econ_result.causal_strength[:, 1].mean():.4f}")
    print(f"Lagging→Leading causality: {econ_result.causal_strength[:, 0].mean():.4f}")

    # Expected: Leading should have stronger influence on Lagging
    if econ_result.causal_strength[:, 1].mean() > econ_result.causal_strength[:, 0].mean():
        print("✓ Expected pattern detected: Leading indicator influences lagging indicator")
    else:
        print("⚠ Unexpected pattern: Check data generation or model parameters")

    # Time-varying causality analysis
    causality_strength = econ_result.causal_strength[:, 1]  # Leading→Lagging

    # Identify periods of strong/weak causality
    strong_periods = causality_strength > np.percentile(causality_strength, 75)
    weak_periods = causality_strength < np.percentile(causality_strength, 25)

    print(f"Strong causality periods: {np.sum(strong_periods)} time points")
    print(f"Weak causality periods: {np.sum(weak_periods)} time points")

except Exception as e:
    print(f"Economic analysis failed: {e}")

Multi-Scale Analysis

# Multi-scale temporal analysis
def multiscale_analysis(data, scales=[1, 2, 4, 8]):
    """Perform DCS analysis at multiple temporal scales."""

    results = {}

    for scale in scales:
        print(f"Analyzing at scale {scale}...")

        # Downsample data
        if scale == 1:
            scaled_data = data
        else:
            # Average over non-overlapping windows
            n_vars, n_time, n_trials = data.shape
            n_time_scaled = n_time // scale

            scaled_data = np.zeros((n_vars, n_time_scaled, n_trials))
            for i in range(n_time_scaled):
                start_idx = i * scale
                end_idx = start_idx + scale
                scaled_data[:, i, :] = data[:, start_idx:end_idx, :].mean(axis=1)

        # DCS analysis at this scale
        try:
            # Adjust model order for scale
            model_order = max(2, 6 // scale)  # Fewer lags for coarser scales

            calc = DCSCalculator(
                model_order=model_order,
                time_mode="inhomo"
            )

            result = calc.analyze(scaled_data)

            results[scale] = {
                'causal_strength': result.causal_strength,
                'transfer_entropy': result.transfer_entropy,
                'mean_dcs_xy': result.causal_strength[:, 1].mean(),
                'mean_dcs_yx': result.causal_strength[:, 0].mean(),
                'model_order': model_order
            }

            print(f"  Scale {scale}: DCS X→Y = {results[scale]['mean_dcs_xy']:.4f}")

        except Exception as e:
            print(f"  Scale {scale} failed: {e}")
            results[scale] = None

    return results

# Perform multi-scale analysis on neural data
print("\nMulti-scale causality analysis:")

if 'neural_data_norm' in locals():
    multiscale_results = multiscale_analysis(neural_data_norm, scales=[1, 2, 4, 8])

    # Visualize scale-dependent causality
    scales = []
    causality_values = []

    for scale, result in multiscale_results.items():
        if result is not None:
            scales.append(scale)
            causality_values.append(result['mean_dcs_xy'])

    if len(scales) > 0:
        plt.figure(figsize=(10, 6))
        plt.semilogx(scales, causality_values, 'o-', linewidth=2, markersize=8)
        plt.xlabel('Temporal Scale')
        plt.ylabel('Mean Causal Strength (X→Y)')
        plt.title('Multi-Scale Causality Analysis')
        plt.grid(True, alpha=0.3)
        plt.show()

        print(f"Scale dependency analysis:")
        print(f"Fine scale (1): {causality_values[0]:.4f}")
        if len(causality_values) > 1:
            print(f"Coarse scale ({scales[-1]}): {causality_values[-1]:.4f}")

            if causality_values[0] > causality_values[-1]:
                print("→ Causality stronger at fine temporal scales")
            else:
                print("→ Causality stronger at coarse temporal scales")

Tutorial 5: Best Practices and Troubleshooting

Learn best practices for robust DCS analysis and how to troubleshoot common issues.

Data Quality Assessment

from trancit.utils.preprocess import check_data_quality, normalize_data
import warnings

def comprehensive_data_check(data, description=""):
    """Perform comprehensive data quality assessment."""

    print(f"\n=== Data Quality Assessment: {description} ===")

    # Basic properties
    print(f"Shape: {data.shape}")
    print(f"Data type: {data.dtype}")
    print(f"Memory usage: {data.nbytes / 1024**2:.2f} MB")

    # Statistical properties
    print(f"Mean: {data.mean():.4f}")
    print(f"Std: {data.std():.4f}")
    print(f"Min: {data.min():.4f}")
    print(f"Max: {data.max():.4f}")

    # Check for problematic values
    n_nan = np.isnan(data).sum()
    n_inf = np.isinf(data).sum()
    n_zero = (data == 0).sum()

    print(f"NaN values: {n_nan}")
    print(f"Inf values: {n_inf}")
    print(f"Zero values: {n_zero}")

    if n_nan > 0:
        warnings.warn(f"Found {n_nan} NaN values - may cause analysis failure")
    if n_inf > 0:
        warnings.warn(f"Found {n_inf} infinite values - may cause numerical issues")

    # Check variance across trials
    if data.ndim == 3:
        trial_vars = np.var(data, axis=(0, 1))  # Variance of each trial
        low_var_trials = np.sum(trial_vars < 0.01 * np.median(trial_vars))

        print(f"Low variance trials: {low_var_trials}/{data.shape[2]}")
        if low_var_trials > data.shape[2] * 0.2:
            warnings.warn("Many trials have very low variance - check data quality")

    # Check stationarity (simplified)
    if data.ndim >= 2:
        first_half_mean = data[:, :data.shape[1]//2].mean()
        second_half_mean = data[:, data.shape[1]//2:].mean()
        mean_diff = abs(first_half_mean - second_half_mean)

        print(f"First/second half mean difference: {mean_diff:.4f}")
        if mean_diff > data.std():
            print("⚠ Large mean differences between halves - data may be non-stationary")
        else:
            print("✓ Mean appears relatively stable")

    # Recommended actions
    recommendations = []

    if n_nan > 0 or n_inf > 0:
        recommendations.append("Remove or interpolate NaN/Inf values")

    if data.std() < 1e-6:
        recommendations.append("Data has very low variance - check scaling")
    elif data.std() > 1e6:
        recommendations.append("Data has very high variance - consider normalization")

    if data.shape[1] < 50:
        recommendations.append("Short time series - consider lower model order")

    if data.ndim == 3 and data.shape[2] < 5:
        recommendations.append("Few trials - results may be less robust")

    if recommendations:
        print("Recommendations:")
        for rec in recommendations:
            print(f"  • {rec}")
    else:
        print("✓ Data quality looks good")

    return {
        'n_nan': n_nan,
        'n_inf': n_inf,
        'mean_diff': mean_diff if data.ndim >= 2 else None,
        'recommendations': recommendations
    }

# Test with various data quality scenarios
print("Testing data quality assessment...")

# 1. Good quality data
good_data, _, _ = generate_signals(T=400, Ntrial=20, h=0.1,
                                  gamma1=0.5, gamma2=0.5,
                                  Omega1=1.0, Omega2=1.2)
comprehensive_data_check(good_data, "Good Quality Data")

# 2. Problematic data
bad_data = good_data.copy()
bad_data[0, 100:110, :] = np.nan  # Introduce NaN values
bad_data[1, 200, 0] = np.inf      # Introduce Inf value
bad_data *= 1e8                   # Make values very large

comprehensive_data_check(bad_data, "Problematic Data")

Robust Analysis Pipeline

def robust_dcs_analysis(data, description="", max_model_order=8):
    """Perform robust DCS analysis with automatic parameter adjustment."""

    print(f"\n=== Robust DCS Analysis: {description} ===")

    # Step 1: Data quality check
    quality_results = comprehensive_data_check(data, description)

    # Step 2: Data preprocessing based on quality assessment
    processed_data = data.copy()

    if quality_results['n_nan'] > 0 or quality_results['n_inf'] > 0:
        print("Cleaning data...")
        # Replace NaN and Inf with interpolated values
        from scipy.interpolate import interp1d

        for var in range(processed_data.shape[0]):
            for trial in range(processed_data.shape[2]):
                signal = processed_data[var, :, trial]

                # Find valid (non-NaN, non-Inf) indices
                valid_mask = np.isfinite(signal)

                if np.sum(valid_mask) > 10:  # Need some valid points
                    valid_indices = np.where(valid_mask)[0]
                    invalid_indices = np.where(~valid_mask)[0]

                    if len(invalid_indices) > 0:
                        # Linear interpolation
                        f = interp1d(valid_indices, signal[valid_indices],
                                    bounds_error=False, fill_value='extrapolate')
                        processed_data[var, invalid_indices, trial] = f(invalid_indices)

    # Normalize data
    if processed_data.std() > 1000 or processed_data.std() < 0.001:
        print("Normalizing data...")
        processed_data = normalize_data(processed_data, method="zscore", axis=1)

    # Step 3: Adaptive model order selection
    n_time = processed_data.shape[1]
    max_reasonable_order = min(max_model_order, n_time // 10)  # Rule of thumb

    print(f"Testing model orders from 1 to {max_reasonable_order}...")

    best_order = None
    best_result = None
    order_scores = {}

    for order in range(1, max_reasonable_order + 1):
        try:
            calc = DCSCalculator(model_order=order, time_mode="inhomo")
            result = calc.analyze(processed_data)

            # Score based on finite values and reasonable magnitudes
            dcs_values = result.causal_strength

            if np.all(np.isfinite(dcs_values)) and np.all(dcs_values >= 0):
                # Simple scoring: prefer moderate values, penalize extreme values
                mean_dcs = dcs_values.mean()
                std_dcs = dcs_values.std()

                score = mean_dcs - 2 * (std_dcs > mean_dcs * 2)  # Penalize high variability
                order_scores[order] = score

                if best_order is None or score > order_scores[best_order]:
                    best_order = order
                    best_result = result

                print(f"  Order {order}: Score = {score:.4f}")
            else:
                print(f"  Order {order}: Failed (non-finite or negative values)")

        except Exception as e:
            print(f"  Order {order}: Failed ({str(e)[:50]}...)")

    # Step 4: Final analysis with best parameters
    if best_result is not None:
        print(f"\nBest model order: {best_order}")
        print(f"Final results:")
        print(f"  DCS X→Y: {best_result.causal_strength[:, 1].mean():.4f}")
        print(f"  DCS Y→X: {best_result.causal_strength[:, 0].mean():.4f}")
        print(f"  TE X→Y: {best_result.transfer_entropy[:, 1].mean():.4f}")
        print(f"  TE Y→X: {best_result.transfer_entropy[:, 0].mean():.4f}")

        # Confidence assessment
        dcs_xy_std = best_result.causal_strength[:, 1].std()
        dcs_yx_std = best_result.causal_strength[:, 0].std()

        print(f"  DCS X→Y variability: {dcs_xy_std:.4f}")
        print(f"  DCS Y→X variability: {dcs_yx_std:.4f}")

        if dcs_xy_std < 0.1 and dcs_yx_std < 0.1:
            print("  ✓ Low variability - results appear stable")
        else:
            print("  ⚠ High variability - results may be less reliable")

        return best_result, best_order, processed_data

    else:
        print("❌ No successful analysis found")
        print("Recommendations:")
        print("  • Check data format (should be 3D: variables × time × trials)")
        print("  • Ensure sufficient data length (>100 time points recommended)")
        print("  • Verify data contains meaningful signal (not just noise)")
        return None, None, processed_data

# Test robust analysis
test_data, _, _ = generate_signals(T=300, Ntrial=15, h=0.1,
                                  gamma1=0.4, gamma2=0.6,
                                  Omega1=0.9, Omega2=1.3)

robust_result, best_order, clean_data = robust_dcs_analysis(test_data, "Test Analysis")

Common Issues and Solutions

# Demonstrate common issues and their solutions

print("\n=== Common Issues and Solutions ===")

# Issue 1: Insufficient data length
print("\n1. Issue: Insufficient data length")
short_data, _, _ = generate_signals(T=50, Ntrial=10, h=0.1,  # Very short
                                   gamma1=0.5, gamma2=0.5,
                                   Omega1=1.0, Omega2=1.2)

print("Attempting analysis with very short data...")
try:
    calc = DCSCalculator(model_order=10, time_mode="inhomo")  # Too high order
    result = calc.analyze(short_data)
    print("Analysis succeeded (unexpected)")
except Exception as e:
    print(f"Analysis failed as expected: {type(e).__name__}")
    print("Solution: Use lower model order or collect more data")

    # Solution
    try:
        calc_fixed = DCSCalculator(model_order=2, time_mode="homo")  # Lower order
        result_fixed = calc_fixed.analyze(short_data)
        print(f"✓ Fixed analysis succeeded: DCS X→Y = {result_fixed.causal_strength[:, 1].mean():.4f}")
    except Exception as e2:
        print(f"Still failed: {e2}")

# Issue 2: Highly correlated/singular data
print("\n2. Issue: Highly correlated data")
T, N = 200, 15

# Create perfectly correlated data
base_signal = np.random.randn(T, N)
singular_data = np.zeros((2, T, N))
singular_data[0] = base_signal
singular_data[1] = base_signal + 1e-10 * np.random.randn(T, N)  # Nearly identical

print("Attempting analysis with highly correlated data...")
try:
    calc = DCSCalculator(model_order=4, time_mode="inhomo")
    result = calc.analyze(singular_data)
    print("Analysis succeeded (unexpected)")
except Exception as e:
    print(f"Analysis failed as expected: {type(e).__name__}")
    print("Solution: Use diagonal covariance approximation or add regularization")

    # Solution
    try:
        calc_fixed = DCSCalculator(model_order=3,
                                  time_mode="inhomo",
                                  use_diagonal_covariance=True)  # Diagonal approximation
        result_fixed = calc_fixed.analyze(singular_data)
        print(f"✓ Fixed analysis succeeded: DCS X→Y = {result_fixed.causal_strength[:, 1].mean():.4f}")
    except Exception as e2:
        print(f"Still failed: {e2}")

# Issue 3: No events detected in pipeline
print("\n3. Issue: No events detected in pipeline")

# Create very smooth signals (no clear events)
smooth_data, _, _ = generate_signals(T=500, Ntrial=10, h=0.1,
                                    gamma1=0.1, gamma2=0.1,  # Low damping = smooth
                                    Omega1=1.0, Omega2=1.0)

smooth_signal = np.mean(smooth_data, axis=2)

# Try with high threshold (likely to fail)
from trancit.config import PipelineConfig, PipelineOptions, DetectionParams, CausalParams, BicParams, OutputParams

high_threshold_config = PipelineConfig(
    options=PipelineOptions(detection=True, causal_analysis=True),
    detection=DetectionParams(thres_ratio=5.0, l_extract=50, l_start=25),  # Very high threshold
    causal=CausalParams(ref_time=25, estim_mode="OLS"),
    bic=BicParams(),
    output=OutputParams()
)

print("Attempting pipeline with high detection threshold...")
try:
    orchestrator = PipelineOrchestrator(high_threshold_config)
    result = orchestrator.run(smooth_signal, smooth_signal * 1.1)

    if result.results.get('locs') is not None:
        n_events = len(result.results['locs'])
        print(f"Found {n_events} events")
    else:
        print("No events detected")

except Exception as e:
    print(f"Pipeline failed: {type(e).__name__}")

print("Solution: Lower detection threshold")

# Solution: Lower threshold
low_threshold_config = PipelineConfig(
    options=PipelineOptions(detection=True, causal_analysis=True),
    detection=DetectionParams(thres_ratio=1.5, l_extract=50, l_start=25),  # Lower threshold
    causal=CausalParams(ref_time=25, estim_mode="OLS"),
    bic=BicParams(),
    output=OutputParams()
)

try:
    orchestrator_fixed = PipelineOrchestrator(low_threshold_config)
    result_fixed = orchestrator_fixed.run(smooth_signal, smooth_signal * 1.5)  # More amplification

    if result_fixed.results.get('locs') is not None:
        n_events = len(result_fixed.results['locs'])
        print(f"✓ Fixed pipeline found {n_events} events")
    else:
        print("Still no events detected - signal may be too smooth")

except Exception as e:
    print(f"Fixed pipeline still failed: {e}")

Performance Monitoring

import time
import psutil
import os

def monitor_analysis_performance(data, description="", verbose=True):
    """Monitor memory and time performance of DCS analysis."""

    if verbose:
        print(f"\n=== Performance Monitoring: {description} ===")

    # Initial system state
    process = psutil.Process(os.getpid())
    initial_memory = process.memory_info().rss / 1024**2  # MB
    start_time = time.time()

    if verbose:
        print(f"Initial memory usage: {initial_memory:.1f} MB")
        print(f"Data size: {data.nbytes / 1024**2:.2f} MB")

    try:
        # Perform analysis
        calc = DCSCalculator(model_order=4, time_mode="inhomo")
        result = calc.analyze(data)

        # Final system state
        end_time = time.time()
        final_memory = process.memory_info().rss / 1024**2

        analysis_time = end_time - start_time
        memory_increase = final_memory - initial_memory

        if verbose:
            print(f"Analysis time: {analysis_time:.2f} seconds")
            print(f"Memory increase: {memory_increase:.1f} MB")
            print(f"Peak memory usage: {final_memory:.1f} MB")

            # Performance metrics
            data_throughput = data.size / analysis_time  # elements per second
            print(f"Data throughput: {data_throughput/1000:.1f}K elements/second")

            # Efficiency assessment
            if analysis_time < 1.0:
                print("✓ Fast analysis")
            elif analysis_time < 10.0:
                print("○ Moderate analysis time")
            else:
                print("⚠ Slow analysis - consider optimization")

            if memory_increase < 100:
                print("✓ Low memory overhead")
            elif memory_increase < 500:
                print("○ Moderate memory usage")
            else:
                print("⚠ High memory usage - consider processing in chunks")

        return {
            'analysis_time': analysis_time,
            'memory_increase': memory_increase,
            'data_throughput': data_throughput,
            'success': True
        }

    except Exception as e:
        end_time = time.time()
        analysis_time = end_time - start_time

        if verbose:
            print(f"Analysis failed after {analysis_time:.2f} seconds: {e}")

        return {
            'analysis_time': analysis_time,
            'memory_increase': 0,
            'data_throughput': 0,
            'success': False,
            'error': str(e)
        }

# Test performance with different data sizes
data_sizes = [
    (100, 10, "Small"),
    (500, 20, "Medium"),
    (1000, 30, "Large")
]

performance_results = {}

for T, N, size_name in data_sizes:
    test_data, _, _ = generate_signals(T=T, Ntrial=N, h=0.1,
                                      gamma1=0.5, gamma2=0.5,
                                      Omega1=1.0, Omega2=1.2)

    perf_result = monitor_analysis_performance(test_data, f"{size_name} Dataset ({T}×{N})")
    performance_results[size_name] = perf_result

# Performance summary
print("\n=== Performance Summary ===")
for size_name, result in performance_results.items():
    if result['success']:
        print(f"{size_name}: {result['analysis_time']:.2f}s, {result['memory_increase']:.1f}MB")
    else:
        print(f"{size_name}: Failed - {result['error']}")