simulator

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2025 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Simulator

type Simulator struct {
	// Duration specifies how long the simulation should run.
	// If set to a positive value, the simulation will automatically stop
	// after this duration. Mutually exclusive with MaxGeneratedItems.
	Duration time.Duration

	// MaxGeneratedItems is the maximum number of items to generate before stopping.
	// If set to a positive value, the simulation will stop once this many items
	// have been generated by the first stage. Mutually exclusive with Duration.
	MaxGeneratedItems int

	// Stages contains all the processing stages in the pipeline, ordered
	// from first (generator) to last (final stage).
	Stages []*Stage

	// Mu protects access to the Stages slice and other shared state
	Mu sync.RWMutex

	// Ctx provides cancellation context for all stages
	Ctx context.Context

	// Cancel function to stop all stages gracefully
	Cancel context.CancelFunc

	// Quit channel is closed when the simulation completes
	Quit chan struct{}

	// Wg tracks all running goroutines for proper cleanup
	Wg sync.WaitGroup
}

Simulator represents a concurrent pipeline simulator that orchestrates multiple processing stages in a data flow pipeline.

The simulator manages the lifecycle of all stages, coordinates data flow between them, and collects comprehensive performance metrics. It supports both time-based and item-count-based termination conditions.

func NewSimulator

func NewSimulator() *Simulator

NewSimulator creates a new simulator instance with proper initialization.

Returns a simulator with:

  • A cancellable context for graceful shutdown
  • A quit channel for completion signaling
  • Proper synchronization primitives

The simulator is ready to have stages added and configured.

func (*Simulator) AddStage

func (s *Simulator) AddStage(stage *Stage) error

AddStage adds a new stage to the pipeline with validation.

The stage is added to the end of the pipeline. The first stage added should be a generator (IsGenerator: true), while subsequent stages should be processors (IsGenerator: false).

Args:

  • stage: The stage to add to the pipeline

Returns:

  • error: nil if successful, or an error describing the validation failure

Validation rules:

  • Stage cannot be nil
  • Stage name cannot be empty
  • Stage name must be unique within the pipeline

func (*Simulator) Done

func (s *Simulator) Done() <-chan struct{}

Done returns a channel that is closed when the simulation completes.

This channel can be used to wait for simulation completion without blocking the calling goroutine. Useful for implementing custom completion handling or integration with other systems.

Returns:

  • <-chan struct{}: A channel that closes when simulation is done

func (*Simulator) GetStages

func (s *Simulator) GetStages() []*Stage

GetStages returns a copy of all stages in the pipeline.

This method provides thread-safe access to the stages slice. The returned slice is a copy, so modifications won't affect the running simulation.

Returns:

  • []*Stage: A copy of all stages in the pipeline

func (*Simulator) PrintStats

func (s *Simulator) PrintStats()

func (*Simulator) Start

func (s *Simulator) Start() error

Start begins the simulation and blocks until completion.

This method initializes all stages, starts their goroutines, and waits for the simulation to complete based on the configured termination condition (Duration or MaxGeneratedItems).

The simulation will automatically stop when:

  • The configured Duration has elapsed (if Duration > 0)
  • The configured MaxGeneratedItems have been generated (if MaxGeneratedItems > 0)
  • Stop() is called explicitly

Returns:

  • error: nil if successful, or an error describing the failure

Panics:

  • If both Duration and MaxGeneratedItems are set to positive values

func (*Simulator) Stop

func (s *Simulator) Stop()

Stop gracefully terminates the simulation by canceling the context.

This method signals all stages to stop processing and initiates a graceful shutdown. All goroutines will receive the cancellation signal and clean up their resources.

func (*Simulator) WaitForStats

func (s *Simulator) WaitForStats()

WaitForStats blocks until the simulation completes and then prints statistics.

This is a convenience method that combines waiting for completion with automatic statistics reporting. Equivalent to calling <-s.Done() followed by s.PrintStats().

type Stage

type Stage struct {
	Name string

	Input  chan any
	Output chan any
	Sem    chan struct{}

	Config  *StageConfig
	Metrics *StageMetrics

	IsFinal           bool
	MaxGeneratedItems int
	Stop              func()
	// contains filtered or unexported fields
}

Stage represents a processing stage in the pipeline

func NewStage

func NewStage(name string, config *StageConfig) *Stage

NewStage creates a new stage with the given configuration

func (*Stage) GetMetrics

func (s *Stage) GetMetrics() *StageMetrics

func (*Stage) Start

func (s *Stage) Start(ctx context.Context, wg *sync.WaitGroup) error

Start initializes the workers and generators for all stages

func (*Stage) StopOnce

func (s *Stage) StopOnce()

type StageConfig

type StageConfig struct {
	// Rate at which items are generated (generator only)
	InputRate time.Duration
	// Custom item generator function
	ItemGenerator func() any

	// Handles load spikes and burst patterns
	// Generates input bursts at intervals
	InputBurst func() []any
	// Total number of bursts to inject
	BurstCountTotal int
	// Interval between bursts
	BurstInterval time.Duration

	// Number of goroutines per stage
	RoutineNum int
	// Channel buffer size per stage
	BufferSize int
	// Simulated delay per item
	WorkerDelay time.Duration
	// Number of times to retry on error
	RetryCount int

	// Drop input if channel is full
	DropOnBackpressure bool
	// Whether the stage is a generator
	IsGenerator bool

	// Core processing function
	// Worker function that processes each item
	WorkerFunc func(item any) (any, error)
	// Context for cancellation and deadlines
	Ctx context.Context
}

StageConfig holds the configuration for a pipeline stage

func DefaultConfig

func DefaultConfig() *StageConfig

DefaultConfig returns a new SimulationConfig with sensible defaults

type StageMetrics

type StageMetrics struct {

	// Counters
	ProcessedItems uint64
	DroppedItems   uint64
	OutputItems    uint64

	// State
	StartTime time.Time
	EndTime   time.Time

	// Generator stats
	GeneratedItems uint64
	// contains filtered or unexported fields
}

StageMetrics tracks performance metrics for a stage

func NewStageMetrics

func NewStageMetrics() *StageMetrics

NewStageMetrics creates a new metrics collector

func (*StageMetrics) GetStats

func (m *StageMetrics) GetStats() map[string]any

GetStats returns a map of current metrics

func (*StageMetrics) RecordDropped

func (m *StageMetrics) RecordDropped()

RecordDropped records a dropped item

func (*StageMetrics) RecordDroppedBurst

func (m *StageMetrics) RecordDroppedBurst(items int)

RecordDroppedBurst records a dropped burst

func (*StageMetrics) RecordGenerated

func (m *StageMetrics) RecordGenerated()

RecordGenerated records a generated item

func (*StageMetrics) RecordGeneratedBurst

func (m *StageMetrics) RecordGeneratedBurst(items int)

RecordGeneratedBurst records a generated burst

func (*StageMetrics) RecordOutput

func (m *StageMetrics) RecordOutput()

RecordOutput records a successful output

func (*StageMetrics) RecordProcessing

func (m *StageMetrics) RecordProcessing()

RecordProcessing records the processing of an item

func (*StageMetrics) Stop

func (m *StageMetrics) Stop()

Stop marks the end of metrics collection

type StageStats

type StageStats struct {
	StageName      string
	ProcessedItems uint64
	OutputItems    uint64
	Throughput     float64
	DroppedItems   uint64
	DropRate       float64
	GeneratedItems uint64
	ThruDiffPct    float64
	ProcDiffPct    float64
	IsGenerator    bool
	IsFinal        bool
}

StageStats represents the statistics for a single stage

type StateEntry added in v1.0.2

type StateEntry struct {
	Stats map[tracker.GoroutineId]*tracker.GoroutineStats
	Label string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL