Documentation
¶
Index ¶
- type Simulator
- type Stage
- type StageConfig
- type StageMetrics
- func (m *StageMetrics) GetStats() map[string]any
- func (m *StageMetrics) RecordDropped()
- func (m *StageMetrics) RecordDroppedBurst(items int)
- func (m *StageMetrics) RecordGenerated()
- func (m *StageMetrics) RecordGeneratedBurst(items int)
- func (m *StageMetrics) RecordOutput()
- func (m *StageMetrics) RecordProcessing()
- func (m *StageMetrics) Stop()
- type StageStats
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Simulator ¶
type Simulator struct {
// Duration of the simulation.
Duration time.Duration
// MaxGeneratedItems is the maximum number of items to generate.
// If set, the simulation runs until this many items have been generated
// instead of running for Duration.
MaxGeneratedItems int
// Stages holds the stages that make up the pipeline.
// NOTE(review): presumably kept in pipeline order — confirm.
Stages []*Stage
// Mu is a read/write mutex.
// NOTE(review): the state it guards is not visible here — confirm and document.
Mu sync.RWMutex
// Ctx and Cancel are the simulator's context and its cancel function;
// NewSimulator is documented as creating the instance with both.
Ctx context.Context
Cancel context.CancelFunc
// Quit is a signal-only channel (its element type carries no data).
// NOTE(review): presumably used to request shutdown — confirm.
Quit chan struct{}
// Wg is a wait group.
// NOTE(review): presumably tracks the simulator's goroutines — confirm.
Wg sync.WaitGroup
}
Simulator represents a concurrent pipeline simulator
func NewSimulator ¶
func NewSimulator() *Simulator
NewSimulator creates a new simulator instance with a context and a cancel function
func (*Simulator) PrintStats ¶
func (s *Simulator) PrintStats()
func (*Simulator) WaitForStats ¶
func (s *Simulator) WaitForStats()
type Stage ¶
type Stage struct {
// Name is the stage's name.
Name string
// Input and Output are the stage's untyped item channels.
// NOTE(review): presumably Output feeds the next stage's Input — confirm.
Input chan any
Output chan any
// Sem is a signal-only channel.
// NOTE(review): the name suggests a semaphore bounding concurrency — confirm.
Sem chan struct{}
// Config is the stage's configuration.
Config *StageConfig
// Metrics is the stage's metrics collector.
Metrics *StageMetrics
// NOTE(review): IdleSpy's role is not visible here; document what the
// tracker.GoroutineManager monitors.
IdleSpy *tracker.GoroutineManager
// NOTE(review): IsFinal presumably marks the last stage of the pipeline — confirm.
IsFinal bool
// NOTE(review): MaxGeneratedItems presumably mirrors Simulator.MaxGeneratedItems — confirm.
MaxGeneratedItems int
// Stop is a callback with no arguments and no result.
// NOTE(review): presumably stops the stage — confirm.
Stop func()
// contains filtered or unexported fields
}
Stage represents a processing stage in the pipeline
func NewStage ¶
func NewStage(name string, config *StageConfig) *Stage
NewStage creates a new stage with the given configuration
func (*Stage) GetMetrics ¶
func (s *Stage) GetMetrics() *StageMetrics
type StageConfig ¶
type StageConfig struct {
// InputRate is the rate at which items are generated (generator only).
// NOTE(review): typed as a time.Duration, so presumably the interval between
// generated items rather than a count per unit time — confirm.
InputRate time.Duration
// ItemGenerator is a custom item generator function.
ItemGenerator func() any
// InputBurst generates input bursts at intervals, to model load spikes and
// burst patterns. It returns a slice of items.
InputBurst func() []any
// BurstCountTotal is the total number of bursts to inject.
BurstCountTotal int
// BurstInterval is the interval between bursts.
BurstInterval time.Duration
// RoutineNum is the number of goroutines per stage.
RoutineNum int
// BufferSize is the channel buffer size per stage.
BufferSize int
// WorkerDelay is the simulated delay per item.
WorkerDelay time.Duration
// RetryCount is the number of times to retry on error.
RetryCount int
// DropOnBackpressure drops input if the channel is full.
DropOnBackpressure bool
// IsGenerator reports whether the stage is a generator.
IsGenerator bool
// WorkerFunc is the core processing function: it is the worker function
// that processes each item, returning a result value and an error.
WorkerFunc func(item any) (any, error)
// Ctx is the context for cancellation and deadlines.
Ctx context.Context
}
StageConfig holds the configuration for a pipeline stage
func DefaultConfig ¶
func DefaultConfig() *StageConfig
DefaultConfig returns a new StageConfig with sensible defaults
type StageMetrics ¶
type StageMetrics struct {
// Counters.
// NOTE(review): presumably advanced by RecordProcessing,
// RecordDropped/RecordDroppedBurst and RecordOutput respectively — confirm.
ProcessedItems uint64
DroppedItems uint64
OutputItems uint64
// State: start and end timestamps.
// NOTE(review): presumably EndTime is set by Stop — confirm.
StartTime time.Time
EndTime time.Time
// Generator stats.
// NOTE(review): presumably advanced by RecordGenerated and
// RecordGeneratedBurst — confirm.
GeneratedItems uint64
// contains filtered or unexported fields
}
StageMetrics tracks performance metrics for a stage
func NewStageMetrics ¶
func NewStageMetrics() *StageMetrics
NewStageMetrics creates a new metrics collector
func (*StageMetrics) GetStats ¶
func (m *StageMetrics) GetStats() map[string]any
GetStats returns a map of current metrics
func (*StageMetrics) RecordDropped ¶
func (m *StageMetrics) RecordDropped()
RecordDropped records a dropped item
func (*StageMetrics) RecordDroppedBurst ¶
func (m *StageMetrics) RecordDroppedBurst(items int)
RecordDroppedBurst records a dropped burst
func (*StageMetrics) RecordGenerated ¶
func (m *StageMetrics) RecordGenerated()
RecordGenerated records a generated item
func (*StageMetrics) RecordGeneratedBurst ¶
func (m *StageMetrics) RecordGeneratedBurst(items int)
RecordGeneratedBurst records a generated burst
func (*StageMetrics) RecordOutput ¶
func (m *StageMetrics) RecordOutput()
RecordOutput records a successful output
func (*StageMetrics) RecordProcessing ¶
func (m *StageMetrics) RecordProcessing()
RecordProcessing records the processing of an item
type StageStats ¶
type StageStats struct {
// StageName is the name of the stage these statistics describe.
StageName string `json:"stage_name"`
// ProcessedItems, OutputItems and DroppedItems are item counts.
ProcessedItems uint64 `json:"processed_items"`
OutputItems uint64 `json:"output_items"`
// NOTE(review): Throughput's unit (presumably items per second) is not
// visible here — confirm.
Throughput float64 `json:"throughput"`
DroppedItems uint64 `json:"dropped_items"`
// NOTE(review): DropRate's scale (fraction vs. percentage) is not visible
// here — confirm.
DropRate float64 `json:"drop_rate"`
// GeneratedItems is omitted from the JSON output when it is zero.
GeneratedItems uint64 `json:"generated_items,omitempty"`
// ThruDiffPct and ProcDiffPct are excluded from JSON encoding entirely.
// NOTE(review): presumably percentage differences in throughput and
// processed items relative to another stage — confirm the baseline.
ThruDiffPct float64 `json:"-"`
ProcDiffPct float64 `json:"-"`
}
StageStats represents the statistics for a single stage