simulator

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2025 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Simulator

// Simulator represents a concurrent pipeline simulator.
//
// NOTE(review): Mu (sync.RWMutex) and Wg (sync.WaitGroup) are held by value,
// and the sync package forbids copying either after first use, so a Simulator
// should only be shared by pointer (NewSimulator returns *Simulator).
type Simulator struct {
	// Duration of the simulation
	Duration time.Duration
	// MaxGeneratedItems is the maximum number of items to generate.
	// If set, the simulation will run until the number of generated items is reached instead of the duration.
	MaxGeneratedItems int

	// Stages holds the pipeline stages (AddStage adds one). The remaining
	// fields are runtime plumbing: Ctx and Cancel are presumably the context
	// and cancel function that NewSimulator creates; Mu, Quit and Wg
	// presumably guard shared state, signal shutdown and track running
	// goroutines respectively — TODO confirm against the implementation.
	Stages []*Stage
	Mu     sync.RWMutex
	Ctx    context.Context
	Cancel context.CancelFunc
	Quit   chan struct{}
	Wg     sync.WaitGroup
}

Simulator represents a concurrent pipeline simulator

func NewSimulator

func NewSimulator() *Simulator

NewSimulator creates a new simulator instance with a context and a cancel function

func (*Simulator) AddStage

func (s *Simulator) AddStage(stage *Stage) error

AddStage adds a new stage to the pipeline

func (*Simulator) Done

func (s *Simulator) Done() <-chan struct{}

func (*Simulator) GetStages

func (s *Simulator) GetStages() []*Stage

func (*Simulator) PrintStats

func (s *Simulator) PrintStats()

func (*Simulator) Start

func (s *Simulator) Start() error

Start begins the simulation

func (*Simulator) Stop

func (s *Simulator) Stop()

func (*Simulator) WaitForStats

func (s *Simulator) WaitForStats()

type Stage

// Stage represents a processing stage in the pipeline.
//
// Only the exported fields are listed here; the type also has unexported
// fields that are not shown.
type Stage struct {
	// Name labels the stage; presumably the name argument given to NewStage —
	// TODO confirm.
	Name string

	// Input and Output are channels of untyped items (chan any); presumably
	// the stage reads from Input and writes to Output. Sem is a signal-only
	// channel (chan struct{}), presumably used as a semaphore — TODO confirm.
	Input  chan any
	Output chan any
	Sem    chan struct{}

	// Config is the stage configuration (NewStage takes a *StageConfig).
	// Metrics is presumably the collector returned by GetMetrics. IdleSpy
	// comes from the external tracker package; its role is not visible here.
	Config  *StageConfig
	Metrics *StageMetrics
	IdleSpy *tracker.GoroutineManager

	// NOTE(review): IsFinal presumably marks the last stage of the pipeline,
	// MaxGeneratedItems presumably mirrors Simulator.MaxGeneratedItems, and
	// Stop is a caller-supplied callback (see also StopOnce) — confirm all
	// three against the implementation.
	IsFinal           bool
	MaxGeneratedItems int
	Stop              func()
	// contains filtered or unexported fields
}

Stage represents a processing stage in the pipeline

func NewStage

func NewStage(name string, config *StageConfig) *Stage

NewStage creates a new stage with the given configuration

func (*Stage) GetMetrics

func (s *Stage) GetMetrics() *StageMetrics

func (*Stage) Start

func (s *Stage) Start(ctx context.Context, wg *sync.WaitGroup) error

Start initializes the workers and generators for this stage

func (*Stage) StopOnce

func (s *Stage) StopOnce()

type StageConfig

// StageConfig holds the configuration for a pipeline stage.
type StageConfig struct {
	// Rate at which items are generated (generator only)
	// NOTE(review): the type is time.Duration, so this is presumably the
	// interval between generated items rather than items per unit of time —
	// confirm.
	InputRate time.Duration
	// Custom item generator function
	ItemGenerator func() any

	// Handles load spikes and burst patterns
	// Generates input bursts at intervals
	InputBurst func() []any
	// Total number of bursts to inject
	BurstCountTotal int
	// Interval between bursts
	BurstInterval time.Duration

	// Number of goroutines per stage
	RoutineNum int
	// Channel buffer size per stage
	BufferSize int
	// Simulated delay per item
	WorkerDelay time.Duration
	// Number of times to retry on error
	RetryCount int

	// Drop input if channel is full
	DropOnBackpressure bool
	// Whether the stage is a generator
	IsGenerator bool

	// Core processing function
	// Worker function that processes each item
	WorkerFunc func(item any) (any, error)
	// Context for cancellation and deadlines
	// NOTE(review): Stage.Start also receives a ctx argument; how that one
	// relates to this stored context is not visible here — confirm.
	Ctx context.Context
}

StageConfig holds the configuration for a pipeline stage

func DefaultConfig

func DefaultConfig() *StageConfig

DefaultConfig returns a new StageConfig with sensible defaults

type StageMetrics

// StageMetrics tracks performance metrics for a stage.
//
// Only the exported fields are listed here; the type also has unexported
// fields that are not shown.
type StageMetrics struct {

	// Counters
	// Presumably advanced by RecordProcessing, RecordDropped /
	// RecordDroppedBurst and RecordOutput respectively — TODO confirm.
	ProcessedItems uint64
	DroppedItems   uint64
	OutputItems    uint64

	// State
	// Start and end of the collection window; EndTime is presumably set by
	// Stop, which marks the end of metrics collection — TODO confirm.
	StartTime time.Time
	EndTime   time.Time

	// Generator stats
	// Presumably advanced by RecordGenerated / RecordGeneratedBurst — TODO
	// confirm.
	GeneratedItems uint64
	// contains filtered or unexported fields
}

StageMetrics tracks performance metrics for a stage

func NewStageMetrics

func NewStageMetrics() *StageMetrics

NewStageMetrics creates a new metrics collector

func (*StageMetrics) GetStats

func (m *StageMetrics) GetStats() map[string]any

GetStats returns a map of current metrics

func (*StageMetrics) RecordDropped

func (m *StageMetrics) RecordDropped()

RecordDropped records a dropped item

func (*StageMetrics) RecordDroppedBurst

func (m *StageMetrics) RecordDroppedBurst(items int)

RecordDroppedBurst records a dropped burst

func (*StageMetrics) RecordGenerated

func (m *StageMetrics) RecordGenerated()

RecordGenerated records a generated item

func (*StageMetrics) RecordGeneratedBurst

func (m *StageMetrics) RecordGeneratedBurst(items int)

RecordGeneratedBurst records a generated burst

func (*StageMetrics) RecordOutput

func (m *StageMetrics) RecordOutput()

RecordOutput records a successful output

func (*StageMetrics) RecordProcessing

func (m *StageMetrics) RecordProcessing()

RecordProcessing records the processing of an item

func (*StageMetrics) Stop

func (m *StageMetrics) Stop()

Stop marks the end of metrics collection

type StageStats

// StageStats represents the statistics for a single stage.
//
// When marshalled with encoding/json, GeneratedItems is omitted if it is zero
// (omitempty), and ThruDiffPct and ProcDiffPct are never emitted (tag "-").
type StageStats struct {
	StageName      string  `json:"stage_name"`
	ProcessedItems uint64  `json:"processed_items"`
	OutputItems    uint64  `json:"output_items"`
	Throughput     float64 `json:"throughput"`
	DroppedItems   uint64  `json:"dropped_items"`
	DropRate       float64 `json:"drop_rate"`
	GeneratedItems uint64  `json:"generated_items,omitempty"`
	ThruDiffPct    float64 `json:"-"`
	ProcDiffPct    float64 `json:"-"`
}

StageStats represents the statistics for a single stage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL