Event Detection Preprocessing Pipeline
Overview
TranCIT provides an integrated preprocessing pipeline for event detection, data alignment, and artifact rejection. This pipeline is specifically designed for analyzing transient, event-related neural dynamics where causal interactions occur during brief, intense bursts of activity (e.g., sharp wave-ripples, beta bursts, or other transient events).
The preprocessing pipeline transforms raw continuous time-series data into aligned event trials, preparing the data for subsequent causality analysis using methods such as Dynamic Causal Strength (DCS) and relative Dynamic Causal Strength (rDCS).
Preprocessing Stages
The event detection preprocessing consists of five sequential stages:
1. Event Detection
Purpose: Identifies transient events in the detection signal using a threshold-based approach.
Algorithm:
Computes a detection threshold:
threshold = mean(signal) + thres_ratio × std(signal)
Identifies all time points where the detection signal reaches or exceeds this threshold
Applies one of two alignment methods to refine event locations:
Peak alignment: Refines detected locations to local peaks within a specified window, ensuring events are aligned to the maximum amplitude
Pooled alignment: Uses detected locations directly, with optional location shrinking to reduce redundancy when events are detected in close temporal proximity
Configuration Parameters:
thres_ratio (float): Multiplier for the standard deviation in the threshold calculation (higher values = fewer events detected)
align_type (str): Either 'peak' or 'pooled' alignment method
l_extract (int): Length of event windows to extract (also used as the peak alignment window)
shrink_flag (bool): Whether to apply location shrinking for pooled alignment
locs (Optional[np.ndarray]): Pre-provided event locations (used when detection is disabled)
Output: Array of event location indices in the original signal.
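The thresholding step can be illustrated with a minimal NumPy sketch. The function name detect_threshold_crossings and the toy signal are hypothetical and only mirror the formula above; this is not the TranCIT implementation.

```python
import numpy as np


def detect_threshold_crossings(detection_signal, thres_ratio):
    """Return (threshold, indices at or above it); NaN samples are ignored.

    Hypothetical helper that mirrors the documented formula only.
    """
    signal = np.asarray(detection_signal, dtype=float)
    threshold = np.nanmean(signal) + thres_ratio * np.nanstd(signal)
    locs = np.where(signal >= threshold)[0]
    return threshold, locs


# Toy signal: flat baseline with a short burst at samples 5 and 6.
toy_signal = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 4.0, 5.0, 0.0, 0.0, 0.0])

threshold, locs = detect_threshold_crossings(toy_signal, thres_ratio=1.0)
print(locs)  # -> [5 6]

# A larger thres_ratio raises the threshold, so fewer samples qualify.
_, strict_locs = detect_threshold_crossings(toy_signal, thres_ratio=2.0)
print(strict_locs)  # -> [6]
```

Note that every supra-threshold sample is returned, so a single burst yields several adjacent indices; this is why the alignment step is needed afterwards.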
2. Border Removal
Purpose: Filters out events that are too close to signal boundaries to ensure complete event windows can be extracted.
Algorithm:
Removes event locations where location < l_extract or location > signal_length - l_extract
Ensures that each event has sufficient data before and after its center point for complete window extraction
Configuration Parameters:
l_extract(int): Minimum required window length (inherited from detection stage)
Output: Filtered array of event locations with border events removed.
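As a sketch of the border rule, the filter can be written as a boolean mask. The helper name remove_border_events is an assumption made for illustration; only the two inequalities come from the description above.

```python
import numpy as np


def remove_border_events(locs, signal_length, l_extract):
    """Keep locations with at least l_extract samples on both sides.

    Hypothetical helper: it drops locations where
    loc < l_extract or loc > signal_length - l_extract.
    """
    locs = np.asarray(locs)
    keep = (locs >= l_extract) & (locs <= signal_length - l_extract)
    return locs[keep]


candidate_locs = np.array([3, 50, 120, 198])
kept = remove_border_events(candidate_locs, signal_length=200, l_extract=20)
print(kept)  # -> [ 50 120]
```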
3. Snapshot Extraction
Purpose: Extracts fixed-length time windows around each aligned event location, creating a 3D array of event trials.
Algorithm:
For each event location, extracts a window of length l_extract starting at offset l_start from the event center
Creates a 3D array of shape (n_variables × (model_order + 1), n_time_points, n_trials)
Includes lagged variables up to model_order for VAR model estimation
Handles out-of-bounds windows by filling with NaN values
Configuration Parameters:
l_extract (int): Length of each extracted event window
l_start (int): Offset from event center to start extraction (can be negative)
morder (int): Model order for VAR estimation (determines number of lagged variables)
Output: 3D numpy array (n_variables × (model_order + 1), n_time_points, n_trials) containing aligned event snapshots.
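The following sketch shows one way to build an array with the documented shape. It assumes that lag blocks are stacked along the first axis in order of increasing lag (lag 0 first) and that the input signal has shape (n_variables, n_samples); the actual row ordering used by TranCIT is not stated here, and extract_snapshots is a hypothetical name.

```python
import numpy as np


def extract_snapshots(signal, locs, morder, l_start, l_extract):
    """Stack lagged windows around each event into a 3D array.

    Returns an array of shape (n_vars * (morder + 1), l_extract, n_trials).
    Rows [lag * n_vars, (lag + 1) * n_vars) hold the signal delayed by `lag`
    samples (assumed layout). Out-of-bounds samples remain NaN.
    """
    signal = np.asarray(signal, dtype=float)
    n_vars, n_samples = signal.shape
    snapshots = np.full((n_vars * (morder + 1), l_extract, len(locs)), np.nan)
    for trial, loc in enumerate(locs):
        window = loc + l_start + np.arange(l_extract)
        for lag in range(morder + 1):
            idx = window - lag
            valid = (idx >= 0) & (idx < n_samples)
            # Basic indexing returns a view, so the assignment writes in place.
            block = snapshots[lag * n_vars:(lag + 1) * n_vars, :, trial]
            block[:, valid] = signal[:, idx[valid]]
    return snapshots


# Two variables, ten samples; the second event sits at the left border.
demo_signal = np.arange(20, dtype=float).reshape(2, 10)
snaps = extract_snapshots(demo_signal, [5, 0], morder=1, l_start=-1, l_extract=3)
print(snaps.shape)     # -> (4, 3, 2)
print(snaps[:, :, 0])  # lag-0 rows, then lag-1 rows, for the event at sample 5
```

In this toy run the second trial starts one sample before the signal begins, so its leading entries stay NaN rather than wrapping around to the end of the signal.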
4. Artifact Rejection
Purpose: Optionally removes trials contaminated by artifacts or signal corruption.
Algorithm:
Identifies trials where any value in the first two variables falls below a specified threshold
Removes contaminated trials from the event data array
Updates corresponding location indices to maintain consistency
Configuration Parameters:
remove_artif (bool): Whether to enable artifact removal
remove_artif_threshold (float): Threshold below which trials are considered artifacts (default: -15000)
Output: Cleaned event data array and updated location indices.
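A minimal sketch of the rejection rule, assuming the "first two variables" correspond to the first two rows of the snapshot array. The helper name remove_artifact_trials is hypothetical.

```python
import numpy as np


def remove_artifact_trials(event_snapshots, locs, threshold=-15000.0):
    """Drop trials where any sample in the first two rows is below threshold.

    Hypothetical helper; returns the cleaned array and the matching locations.
    """
    snapshots = np.asarray(event_snapshots)
    locs = np.asarray(locs)
    # Reduce over rows and time, leaving one flag per trial.
    contaminated = np.any(snapshots[:2] < threshold, axis=(0, 1))
    keep = ~contaminated
    return snapshots[:, :, keep], locs[keep]


demo = np.zeros((4, 3, 3))
demo[1, 2, 1] = -20000.0  # artifact in the second variable of trial 1
demo[3, 0, 2] = -20000.0  # outside the first two rows, so trial 2 is kept
clean, clean_locs = remove_artifact_trials(demo, [10, 20, 30])
print(clean.shape, clean_locs)  # -> (4, 3, 2) [10 30]
```

Because comparisons with NaN evaluate to False, NaN-padded samples do not trigger rejection in this sketch.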
5. Statistics Computation
Purpose: Computes VAR model statistics (coefficients, covariances) from the aligned event data for subsequent causality analysis.
Algorithm:
Estimates VAR model coefficients using Ordinary Least Squares (OLS) or other estimation methods
Computes residual covariances and other statistical measures
Prepares statistics dictionary for causality calculators
Output: Dictionary containing VAR model statistics required for DCS/rDCS computation.
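To make the OLS step concrete, the sketch below fits a separate VAR regression at each time point across trials. It assumes a snapshot layout in which the first n_vars rows hold the current samples and the remaining rows hold the lagged samples, that the data are NaN-free, and that no intercept is fitted. The function name and the dictionary keys "coeff" and "sigma" are illustrative and may differ from the statistics dictionary TranCIT actually produces.

```python
import numpy as np


def estimate_var_ols(event_snapshots, n_vars):
    """Per-time-point OLS fit of current samples on lagged samples.

    Returns coefficients of shape (n_time, n_vars, n_lagged) and residual
    covariances of shape (n_time, n_vars, n_vars). Illustrative only.
    """
    n_rows, n_time, n_trials = event_snapshots.shape
    n_lagged = n_rows - n_vars
    coeff = np.empty((n_time, n_vars, n_lagged))
    sigma = np.empty((n_time, n_vars, n_vars))
    for t in range(n_time):
        y = event_snapshots[:n_vars, t, :]  # (n_vars, n_trials)
        z = event_snapshots[n_vars:, t, :]  # (n_lagged, n_trials)
        # Least-squares solution of z.T @ a_t = y.T, one column per variable.
        a_t, *_ = np.linalg.lstsq(z.T, y.T, rcond=None)
        resid = y - a_t.T @ z
        coeff[t] = a_t.T
        sigma[t] = resid @ resid.T / n_trials
    return {"coeff": coeff, "sigma": sigma}


# Noise-free toy data generated from a known order-1 coefficient matrix.
rng = np.random.default_rng(0)
true_a = np.array([[0.5, 0.1], [0.0, 0.3]])
lagged = rng.standard_normal((2, 3, 50))
current = np.einsum("ij,jtk->itk", true_a, lagged)
stats = estimate_var_ols(np.concatenate([current, lagged], axis=0), n_vars=2)
print(np.allclose(stats["coeff"][0], true_a))  # -> True
```

Fitting across trials at each time point is what allows the coefficients to vary over the event window, which is the property transient-event analyses rely on.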
Software Architecture
Pipeline Design Pattern
The preprocessing pipeline is implemented using a modular stage-based architecture that follows the Pipeline Pattern and Strategy Pattern design principles:
Core Components
``PipelineOrchestrator`` (Main Coordinator)
Coordinates all preprocessing stages sequentially
Manages pipeline state (dictionary passed between stages)
Handles error propagation and logging
Implements the BaseAnalyzer interface for consistency with other TranCIT components
``PipelineStage`` (Abstract Base Class)
Defines the interface for all preprocessing stages
Provides common functionality (logging, configuration access)
Each stage implements execute(**kwargs) -> Dict[str, Any]
Stages are stateless and receive their configuration through the constructor
Individual Stage Classes
InputValidationStage: Validates input data and parameters
EventDetectionStage: Detects and aligns events
BorderRemovalStage: Removes border events
BICSelectionStage: Optional model order selection
SnapshotExtractionStage: Extracts event windows
ArtifactRemovalStage: Removes artifact-contaminated trials
StatisticsComputationStage: Computes VAR model statistics
CausalityAnalysisStage: Performs causality analysis (post-preprocessing)
Additional stages for bootstrap analysis and output preparation
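The stage interface can be pictured with a small stand-in built on Python's abc module. SketchStage and ClipStage are hypothetical classes written for this illustration; the real PipelineStage constructor and helper methods may differ, and only the execute(**kwargs) -> Dict[str, Any] contract is taken from the description above.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class SketchStage(ABC):
    """Hypothetical stand-in for PipelineStage: config in, state dict out."""

    def __init__(self, config: Dict[str, Any]) -> None:
        self.config = config

    @abstractmethod
    def execute(self, **kwargs: Any) -> Dict[str, Any]:
        """Read inputs from kwargs and return the updated state."""


class ClipStage(SketchStage):
    """Example custom stage: clips the detection signal to +/- clip_limit."""

    def execute(self, **kwargs: Any) -> Dict[str, Any]:
        limit = self.config["clip_limit"]
        signal = kwargs["detection_signal"]
        clipped = [max(-limit, min(limit, value)) for value in signal]
        # Return a new dict so the stage keeps no state of its own.
        return {**kwargs, "detection_signal": clipped}


stage = ClipStage({"clip_limit": 2.0})
state = stage.execute(detection_signal=[-5.0, 0.0, 3.0])
print(state["detection_signal"])  # -> [-2.0, 0.0, 2.0]
```

Keeping all per-run data in the returned dictionary, rather than on the instance, is what makes a stage of this kind stateless and straightforward to test in isolation.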
Architecture Benefits
Modularity: Each preprocessing step is a separate, testable component
Flexibility: Users can customize each stage through configuration parameters
Extensibility: New preprocessing stages can be added by implementing the PipelineStage interface
Reproducibility: All preprocessing steps are logged and can be traced through the pipeline state
Maintainability: Clear separation of concerns makes the codebase easier to understand and modify
State Management
The pipeline uses a state dictionary that is passed sequentially between stages:
pipeline_state = {
    "original_signal": original_signal,
    "detection_signal": detection_signal,
    "locs": event_locations,          # Added by EventDetectionStage
    "event_snapshots": event_data,    # Added by SnapshotExtractionStage
    "morder": model_order,            # Added by BICSelectionStage
    "stats": statistics_dict,         # Added by StatisticsComputationStage
    # ... additional state as needed
}
Each stage:
Reads required data from the state dictionary
Performs its processing
Updates the state dictionary with its outputs
Returns the updated state
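This design keeps stages loosely coupled: a stage depends only on the state keys it reads, so it can be modified or replaced without touching other stages, and stages can be reordered as long as the keys each one needs are produced earlier in the sequence.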
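The read, process, update, return cycle can be sketched with plain functions standing in for stages. The names run_stages, detect, and drop_borders, the fixed threshold of 1.0, and the l_extract state key are assumptions made for this illustration and are not part of TranCIT.

```python
from typing import Any, Callable, Dict, List

Stage = Callable[..., Dict[str, Any]]


def run_stages(stages: List[Stage], state: Dict[str, Any]) -> Dict[str, Any]:
    """Pass the state dictionary through each stage in order."""
    for stage in stages:
        state = stage(**state)
    return state


def detect(**state: Any) -> Dict[str, Any]:
    # Reads "detection_signal" and adds "locs" (toy fixed threshold).
    locs = [i for i, v in enumerate(state["detection_signal"]) if v >= 1.0]
    return {**state, "locs": locs}


def drop_borders(**state: Any) -> Dict[str, Any]:
    # Reads "locs" and "l_extract" and overwrites "locs".
    n = len(state["detection_signal"])
    margin = state["l_extract"]
    locs = [i for i in state["locs"] if margin <= i <= n - margin]
    return {**state, "locs": locs}


initial = {"detection_signal": [2, 0, 0, 3, 0, 0, 4], "l_extract": 2}
final = run_stages([detect, drop_borders], initial)
print(final["locs"])  # -> [3]
```

Running drop_borders before detect raises a KeyError for "locs" in this sketch, which shows that the only coupling between stages is the set of state keys each one expects.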
API Design
Configuration-Driven Architecture
All preprocessing parameters are specified through dataclass-based configuration objects. The type annotations document each parameter and let static type checkers catch mismatched arguments:
PipelineConfig
Main configuration container that holds all pipeline parameters:
@dataclass
class PipelineConfig:
    options: PipelineOptions      # Enable/disable pipeline features
    detection: DetectionParams    # Event detection parameters
    bic: BicParams                # Model selection parameters
    causal: CausalParams          # Causality analysis parameters
    # ... additional parameter groups
DetectionParams
Event detection-specific parameters:
from dataclasses import dataclass
from typing import Optional

import numpy as np


@dataclass
class DetectionParams:
    thres_ratio: float                        # Threshold multiplier
    align_type: str                           # 'peak' or 'pooled'
    l_extract: int                            # Window length
    l_start: int                              # Window start offset
    shrink_flag: bool = False                 # Enable location shrinking
    locs: Optional[np.ndarray] = None         # Pre-provided locations
    remove_artif: bool = False                # Enable artifact removal
    remove_artif_threshold: float = -15000    # Artifact threshold
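Dataclass annotations are not enforced at runtime, so a configuration object can additionally validate its values in __post_init__. The class below is a hypothetical variant named SketchDetectionParams; whether TranCIT's DetectionParams performs such checks is not stated here, and the specific rules are assumptions drawn from the parameter descriptions.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class SketchDetectionParams:
    """Hypothetical config with runtime validation (not the TranCIT class)."""

    thres_ratio: float
    align_type: str
    l_extract: int
    l_start: int
    shrink_flag: bool = False
    locs: Optional[Sequence[int]] = None
    remove_artif: bool = False
    remove_artif_threshold: float = -15000.0

    def __post_init__(self) -> None:
        # Reject values that the documented parameters cannot take.
        if self.align_type not in ("peak", "pooled"):
            raise ValueError(
                f"align_type must be 'peak' or 'pooled', got {self.align_type!r}"
            )
        if self.l_extract <= 0:
            raise ValueError("l_extract must be a positive window length")


params = SketchDetectionParams(
    thres_ratio=2.0, align_type="peak", l_extract=100, l_start=-50
)
print(params.shrink_flag, params.remove_artif_threshold)  # -> False -15000.0
```

Failing at construction time keeps an invalid setting from surfacing later as a harder-to-trace error inside a stage.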
User Interface
High-Level API (Recommended)
The simplest way to use the preprocessing pipeline is through the PipelineOrchestrator:
from trancit import PipelineOrchestrator, PipelineConfig, DetectionParams, PipelineOptions

# Create configuration
config = PipelineConfig(
    options=PipelineOptions(detection=True, bic=True),
    detection=DetectionParams(
        thres_ratio=2.0,
        align_type='peak',
        l_extract=100,
        l_start=-50
    ),
    # ... additional configuration
)

# Initialize orchestrator
orchestrator = PipelineOrchestrator(config)

# Run complete pipeline
result = orchestrator.run(original_signal, detection_signal)

# Access results
event_locations = result.results['locs']
event_snapshots = result.event_snapshots
causality_results = result.results.get('dcs_results')
Low-Level API (Advanced Users)
Advanced users can access individual stages directly for custom workflows:
from trancit.pipeline.stages import EventDetectionStage, SnapshotExtractionStage

# Create individual stages
detection_stage = EventDetectionStage(config)
extraction_stage = SnapshotExtractionStage(config)

# Execute stages manually; each call returns the updated state dictionary
state = {
    "original_signal": original_signal,  # assumed to be needed for snapshot extraction
    "detection_signal": detection_signal,
}
state = detection_stage.execute(**state)
state = extraction_stage.execute(**state)

# Access intermediate results
event_locations = state['locs']
event_snapshots = state['event_snapshots']
Configuration Flexibility
The pipeline supports multiple usage modes:
Automatic Event Detection: Set config.options.detection = True to automatically detect events
Pre-Provided Locations: Set config.options.detection = False and provide config.detection.locs with known event times
Custom Stage Execution: Execute stages individually for fine-grained control
Optional Stages: Enable/disable stages (BIC selection, artifact removal, bootstrap analysis) based on needs
Implementation Details
Event Detection Algorithm
The event detection uses a threshold-based approach that ignores NaN samples when computing the threshold:
Threshold Calculation:
threshold = np.nanmean(detection_signal) + thres_ratio * np.nanstd(detection_signal)
Initial Detection:
temp_locs = np.where(detection_signal >= threshold)[0]
Peak Alignment (if selected):
For each detected location, finds the local peak within a window of size l_extract
Uses the find_peak_locations() utility function
Ensures events are aligned to maximum amplitude
Pooled Alignment (if selected):
Uses detected locations directly
Optional shrinking: reduces redundant detections when events are temporally close
Uses the shrink_locations_resample_uniform() and find_best_shrinked_locations() utilities
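Peak alignment can be sketched as a local argmax around each supra-threshold sample. The version below assumes a search window centered on the detected location with half-width l_extract // 2 and merges duplicates with np.unique; the behavior of find_peak_locations() itself is not documented here and may differ. The name align_to_local_peaks is hypothetical.

```python
import numpy as np


def align_to_local_peaks(detection_signal, locs, l_extract):
    """Move each location to the maximum within a centered window.

    Hypothetical sketch, not find_peak_locations(). Adjacent detections
    from the same burst collapse onto a single peak index.
    """
    signal = np.asarray(detection_signal, dtype=float)
    half = l_extract // 2
    peaks = []
    for loc in locs:
        start = max(loc - half, 0)
        stop = min(loc + half + 1, signal.size)
        # nanargmax ignores NaN samples but raises on an all-NaN window.
        peaks.append(start + int(np.nanargmax(signal[start:stop])))
    return np.unique(peaks)


burst_signal = [0, 1, 3, 7, 3, 1, 0, 0, 2, 6, 2, 0]
aligned = align_to_local_peaks(burst_signal, locs=[2, 3, 4, 9], l_extract=4)
print(aligned)  # -> [3 9]
```

Here three detections belonging to the first burst collapse onto its peak at sample 3, so each burst contributes exactly one event.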
Snapshot Extraction Details
The snapshot extraction creates a 3D array suitable for VAR model estimation:
Shape: (n_variables × (model_order + 1), n_time_points, n_trials)
Lagged Variables: Includes model_order lags of each variable for VAR modeling
Time Alignment: All events are aligned to the same temporal reference point
Boundary Handling: Out-of-bounds windows are filled with NaN and logged
Error Handling
The pipeline includes comprehensive error handling:
Input Validation: Each stage validates its inputs before processing
Graceful Degradation: Missing optional parameters use sensible defaults
Detailed Logging: All stages log their progress and any issues encountered
Exception Propagation: Errors are caught, logged, and re-raised with context
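The catch, log, and re-raise behavior can be illustrated with standard Python exception chaining. StageError and run_stage_with_context are hypothetical names for this sketch; the exception types TranCIT actually raises are not specified here.

```python
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger("pipeline_sketch")


class StageError(RuntimeError):
    """Hypothetical error type that records which stage failed."""


def run_stage_with_context(
    name: str, stage: Callable[..., Dict[str, Any]], state: Dict[str, Any]
) -> Dict[str, Any]:
    """Run one stage; on failure, log it and re-raise with the stage name."""
    try:
        return stage(**state)
    except Exception as exc:
        logger.error("Stage %s failed: %r", name, exc)
        # "from exc" keeps the original traceback attached as __cause__.
        raise StageError(f"stage {name!r} failed") from exc


def needs_locs(**state: Any) -> Dict[str, Any]:
    return {**state, "n_events": len(state["locs"])}


ok = run_stage_with_context("count", needs_locs, {"locs": [3, 9]})
print(ok["n_events"])  # -> 2
```

Calling the same stage without "locs" in the state raises StageError, with the underlying KeyError still reachable through the exception's __cause__ attribute.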
Note: General performance considerations for the TranCIT package are documented in Software Architecture.
Integration with Causality Analysis
The preprocessing pipeline is tightly integrated with TranCIT’s causality analysis methods:
DCS/rDCS: The extracted event snapshots and computed statistics are directly used by DCSCalculator and RelativeDCSCalculator
Transfer Entropy: Event-aligned data enables time-varying TE computation
Granger Causality: VAR model statistics from preprocessing are used for GC computation
The pipeline output (PipelineResult) contains all necessary data structures for immediate causality analysis without additional preprocessing.
References
For detailed API documentation, see the API Reference (specifically the pipeline-system section).
For a usage example, see: