pipeline

package
v1.5.1
Published: Feb 15, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package pipeline provides multi-stage data processing with error handling and monitoring.

Process data through sequential stages with support for timeouts, worker pools, and callbacks.

Quick Start

p := pipeline.New()

p.AddStageFunc("validate", func(ctx context.Context, input interface{}) (interface{}, error) {
	return input, nil  // Validation logic
})

p.AddStageFunc("transform", func(ctx context.Context, input interface{}) (interface{}, error) {
	return processedInput, nil  // Transform logic
})

result, err := p.Execute(context.Background(), inputData)
fmt.Printf("Output: %v\n", result.Output)

Configuration

pool, _ := workerpool.NewSafe(8, 100) // 8 workers, queue size 100

config := pipeline.Config{
	WorkerPool:     pool,
	Timeout:        30 * time.Second,
	StopOnError:    false,              // Continue on errors
	MaxConcurrency: 4,                  // Parallel stages
}

p := pipeline.NewWithConfig(config)

Adding Stages

Function-based stages:

p.AddStageFunc("process", func(ctx context.Context, input interface{}) (interface{}, error) {
	// Processing logic
	return output, nil
})

Custom stage types:

type MyStage struct{}

func (s *MyStage) Name() string { return "my-stage" }

func (s *MyStage) Execute(ctx context.Context, input interface{}) (interface{}, error) {
	// Custom logic
	return output, nil
}

p.AddStage(&MyStage{})

Execution

Synchronous:

result, err := p.Execute(ctx, data)

Asynchronous:

resultCh := p.ExecuteAsync(ctx, data)
result := <-resultCh

With timeout:

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
result, err := p.Execute(ctx, data)

Results

result, _ := p.Execute(ctx, input)

fmt.Printf("Output: %v\n", result.Output)
fmt.Printf("Duration: %v\n", result.Duration)
fmt.Printf("Stages completed: %d\n", len(result.StageResults))

// Check stage-specific results
for _, stageResult := range result.StageResults {
	fmt.Printf("Stage %s: %v (took %v)\n",
		stageResult.StageName, stageResult.Output, stageResult.Duration)
}

Error Handling

Stop on first error (default):

config := pipeline.Config{
	StopOnError: true,
}

Continue on errors:

config := pipeline.Config{
	StopOnError: false,
	OnError: func(stageName string, err error) {
		log.Printf("Stage %s failed: %v", stageName, err)
	},
}

Monitoring

Track pipeline execution with callbacks:

config := pipeline.Config{
	OnStageStart: func(stageName string, input interface{}) {
		log.Printf("Starting: %s", stageName)
	},
	OnStageComplete: func(result pipeline.StageResult) {
		log.Printf("%s completed in %v", result.StageName, result.Duration)
	},
	OnPipelineComplete: func(result pipeline.Result) {
		log.Printf("Pipeline completed: %d stages in %v",
			len(result.StageResults), result.Duration)
	},
}

Statistics

stats := p.Stats()
fmt.Printf("Total executions: %d\n", stats.TotalExecutions)
fmt.Printf("Successful: %d, Failed: %d\n", stats.SuccessfulRuns, stats.FailedRuns)
fmt.Printf("Average duration: %v\n", stats.AverageDuration)

Worker Pool Integration

Process stages in a worker pool for better resource management:

pool := workerpool.NewWithConfig(workerpool.Config{
	WorkerCount: 10,
	QueueSize:   100,
})

config := pipeline.Config{
	WorkerPool: pool,
}

p := pipeline.NewWithConfig(config)

Use Cases

Data Validation and Transformation:

p := pipeline.New()
p.AddStageFunc("validate", validateData)
p.AddStageFunc("sanitize", sanitizeData)
p.AddStageFunc("transform", transformData)
p.AddStageFunc("persist", persistData)

result, _ := p.Execute(ctx, userInput)

ETL Pipeline:

p := pipeline.New()
p.AddStageFunc("extract", extractFromSource)
p.AddStageFunc("transform", applyBusinessLogic)
p.AddStageFunc("load", loadToDestination)

p.Execute(ctx, dataSource)

Request Processing:

p := pipeline.New()
p.AddStageFunc("auth", authenticateRequest)
p.AddStageFunc("validate", validateRequest)
p.AddStageFunc("process", processRequest)
p.AddStageFunc("respond", formatResponse)

result, _ := p.Execute(ctx, httpRequest)

Thread Safety

Pipelines can be safely executed concurrently from multiple goroutines.
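
For example, a single pipeline instance can be shared across goroutines (a minimal sketch; p is any pipeline built as in Quick Start):

var wg sync.WaitGroup
for i := 0; i < 4; i++ {
	wg.Add(1)
	go func(n int) {
		defer wg.Done()
		result, err := p.Execute(context.Background(), fmt.Sprintf("input-%d", n))
		if err != nil {
			log.Printf("run %d failed: %v", n, err)
			return
		}
		log.Printf("run %d output: %v", n, result.Output)
	}(i)
}
wg.Wait()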

Performance Notes

  - StopOnError=true fails faster but may waste work
  - StopOnError=false processes all stages but may do unnecessary work
  - MaxConcurrency limits resource usage but may slow execution
  - Worker pools reuse goroutines, reducing overhead for many executions
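
A sketch contrasting two of these trade-offs (the field values are illustrative, not recommendations):

// Fail fast: the first stage error aborts the run.
fast := pipeline.Config{
	StopOnError: true,
	Timeout:     5 * time.Second,
}

// Collect errors: all stages run; parallelism capped at 4.
thorough := pipeline.Config{
	StopOnError:    false,
	MaxConcurrency: 4,
	OnError: func(stageName string, err error) {
		log.Printf("stage %s: %v", stageName, err)
	},
}

_ = pipeline.NewWithConfig(fast)
_ = pipeline.NewWithConfig(thorough)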

See example tests for more patterns.

Example

Example demonstrates basic pipeline usage.

// Create a simple processing pipeline
p := New()

// Add stages to the pipeline
p.AddStageFunc("uppercase", func(_ context.Context, input interface{}) (interface{}, error) {
	if str, ok := input.(string); ok {
		return strings.ToUpper(str), nil
	}
	return input, nil
})

p.AddStageFunc("prefix", func(_ context.Context, input interface{}) (interface{}, error) {
	if str, ok := input.(string); ok {
		return "PROCESSED: " + str, nil
	}
	return input, nil
})

// Execute the pipeline
result, err := p.Execute(context.Background(), "hello world")
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Input: %s\n", result.Input)
fmt.Printf("Output: %s\n", result.Output)
fmt.Printf("Stages: %d\n", len(result.StageResults))

Output:

Input: hello world
Output: PROCESSED: HELLO WORLD
Stages: 2
Example (AsyncExecution)

Example_asyncExecution demonstrates asynchronous pipeline execution.

p := New()

p.AddStageFunc("slow", func(ctx context.Context, input interface{}) (interface{}, error) {
	// Simulate slow processing
	select {
	case <-time.After(10 * time.Millisecond):
		return input.(string) + "-processed", nil
	case <-ctx.Done():
		return input, ctx.Err()
	}
})

// Execute asynchronously
fmt.Println("Starting async execution...")

resultCh := p.ExecuteAsync(context.Background(), "async-input")

// Do other work while pipeline executes
fmt.Println("Doing other work...")

// Get result when ready
result := <-resultCh

fmt.Printf("Async result: %s\n", result.Output)
fmt.Printf("Duration: %v\n", "10ms") // Fixed for deterministic output
Output:

Starting async execution...
Doing other work...
Async result: async-input-processed
Duration: 10ms
Example (Callbacks)

Example_callbacks demonstrates pipeline callbacks.

config := Config{
	OnPipelineStart: func(input interface{}) {
		fmt.Printf("Pipeline started with: %v\n", input)
	},
	OnStageStart: func(stageName string, _ interface{}) {
		fmt.Printf("Stage '%s' started\n", stageName)
	},
	OnStageComplete: func(result StageResult) {
		fmt.Printf("Stage '%s' completed in %v\n", result.StageName, "1ms") // Fixed for deterministic output
	},
	OnPipelineComplete: func(result Result) {
		fmt.Printf("Pipeline completed with %d stages\n", len(result.StageResults))
	},
}

p := NewWithConfig(config)

p.AddStageFunc("callback-stage", func(_ context.Context, input interface{}) (interface{}, error) {
	// Removed timing dependency for deterministic output
	return input.(string) + "-done", nil
})

result, _ := p.Execute(context.Background(), "callback-test")
fmt.Printf("Final: %s\n", result.Output)
Output:

Pipeline started with: callback-test
Stage 'callback-stage' started
Stage 'callback-stage' completed in 1ms
Pipeline completed with 1 stages
Final: callback-test-done
Example (ComplexWorkflow)

Example_complexWorkflow demonstrates a complex multi-stage workflow.

p := New()

// Stage 1: Parse input
p.AddStageFunc("parse", func(_ context.Context, input interface{}) (interface{}, error) {
	// Simulate parsing structured data
	data := map[string]interface{}{
		"raw":    input,
		"parsed": true,
	}
	return data, nil
})

// Stage 2: Validate
p.AddStageFunc("validate", func(_ context.Context, input interface{}) (interface{}, error) {
	data := input.(map[string]interface{})
	data["validated"] = true
	return data, nil
})

// Stage 3: Transform
p.AddStageFunc("transform", func(_ context.Context, input interface{}) (interface{}, error) {
	data := input.(map[string]interface{})
	data["transformed"] = true
	// Simulate data transformation
	if raw, ok := data["raw"].(string); ok {
		data["processed"] = strings.ToUpper(raw)
	}
	return data, nil
})

// Stage 4: Finalize
p.AddStageFunc("finalize", func(_ context.Context, input interface{}) (interface{}, error) {
	data := input.(map[string]interface{})
	return fmt.Sprintf("Final result: %s", data["processed"]), nil
})

result, err := p.Execute(context.Background(), "complex workflow")
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Stages executed: %d\n", len(result.StageResults))
fmt.Printf("Total duration: %v\n", "1ms") // Fixed for deterministic output
fmt.Printf("Output: %s\n", result.Output)

// Show stage details with fixed durations
stageTimings := map[string]string{
	"parse":     "100μs",
	"validate":  "50μs",
	"transform": "75μs",
	"finalize":  "25μs",
}
for _, sr := range result.StageResults {
	fmt.Printf("- %s: %s\n", sr.StageName, stageTimings[sr.StageName])
}

Output:

Stages executed: 4
Total duration: 1ms
Output: Final result: COMPLEX WORKFLOW
- parse: 100μs
- validate: 50μs
- transform: 75μs
- finalize: 25μs
Example (DataProcessing)

Example_dataProcessing demonstrates a data processing pipeline.

p := New()

// Stage 1: Validate input
p.AddStageFunc("validate", func(_ context.Context, input interface{}) (interface{}, error) {
	data, ok := input.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("invalid input type")
	}

	if _, exists := data["id"]; !exists {
		return nil, fmt.Errorf("missing required field: id")
	}

	fmt.Println("Validation passed")
	return data, nil
})

// Stage 2: Enrich data
p.AddStageFunc("enrich", func(_ context.Context, input interface{}) (interface{}, error) {
	data := input.(map[string]interface{})
	data["enriched"] = true
	data["timestamp"] = time.Now().Unix()

	fmt.Println("Data enriched")
	return data, nil
})

// Stage 3: Format output
p.AddStageFunc("format", func(_ context.Context, input interface{}) (interface{}, error) {
	data := input.(map[string]interface{})
	formatted := fmt.Sprintf("ID: %v, Enriched: %v", data["id"], data["enriched"])

	fmt.Println("Data formatted")
	return formatted, nil
})

// Execute pipeline
input := map[string]interface{}{
	"id":   "12345",
	"name": "test item",
}

result, err := p.Execute(context.Background(), input)
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Final result: %s\n", result.Output)
Output:

Validation passed
Data enriched
Data formatted
Final result: ID: 12345, Enriched: true
Example (ErrorHandling)

Example_errorHandling demonstrates error handling in pipelines.

// Configure pipeline to continue on errors
config := Config{
	StopOnError: false,
	OnError: func(stageName string, err error) {
		fmt.Printf("Error in stage '%s': %v\n", stageName, err)
	},
}

p := NewWithConfig(config)

// Stage 1: Always succeeds
p.AddStageFunc("success", func(_ context.Context, input interface{}) (interface{}, error) {
	fmt.Println("Stage 1: Success")
	return input.(string) + "-processed", nil
})

// Stage 2: Always fails
p.AddStageFunc("failure", func(_ context.Context, input interface{}) (interface{}, error) {
	fmt.Println("Stage 2: About to fail")
	return input, fmt.Errorf("deliberate failure")
})

// Stage 3: Processes previous successful output
p.AddStageFunc("recovery", func(_ context.Context, input interface{}) (interface{}, error) {
	fmt.Println("Stage 3: Processing despite error")
	return input.(string) + "-recovered", nil
})

result, err := p.Execute(context.Background(), "input")

fmt.Printf("Pipeline error: %v\n", err)
fmt.Printf("Final output: %s\n", result.Output)
fmt.Printf("Stage errors: %d\n", countStageErrors(result))
Output:

Stage 1: Success
Stage 2: About to fail
Error in stage 'failure': deliberate failure
Stage 3: Processing despite error
Pipeline error: <nil>
Final output: input-processed-recovered
Stage errors: 1
Example (Statistics)

Example_statistics demonstrates pipeline statistics collection.

p := New()

p.AddStageFunc("stats-stage", func(_ context.Context, input interface{}) (interface{}, error) {
	return input, nil
})

// Execute multiple times
for i := 0; i < 3; i++ {
	_, _ = p.Execute(context.Background(), fmt.Sprintf("input-%d", i))
}

// Get statistics
stats := p.Stats()

fmt.Printf("Total executions: %d\n", stats.TotalExecutions)
fmt.Printf("Successful runs: %d\n", stats.SuccessfulRuns)
fmt.Printf("Failed runs: %d\n", stats.FailedRuns)

// Stage statistics
if stageStats, exists := stats.StageStats["stats-stage"]; exists {
	fmt.Printf("Stage executions: %d\n", stageStats.ExecutionCount)
	fmt.Printf("Stage success count: %d\n", stageStats.SuccessCount)
}
Output:

Total executions: 3
Successful runs: 3
Failed runs: 0
Stage executions: 3
Stage success count: 3
Example (Timeout)

Example_timeout demonstrates pipeline timeout handling.

// Set pipeline timeout
p := New().SetTimeout(50 * time.Millisecond)

p.AddStageFunc("slow-stage", func(ctx context.Context, input interface{}) (interface{}, error) {
	// This will exceed the timeout
	select {
	case <-time.After(100 * time.Millisecond):
		return input, nil
	case <-ctx.Done():
		return input, ctx.Err()
	}
})

result, err := p.Execute(context.Background(), "timeout-test")

fmt.Printf("Error: %v\n", err)
fmt.Printf("Is timeout error: %t\n", err == context.DeadlineExceeded)
fmt.Printf("Input preserved: %s\n", result.Input)
Output:

Error: context deadline exceeded
Is timeout error: true
Input preserved: timeout-test
Example (WorkerPool)

Example_workerPool demonstrates pipeline execution with a worker pool.

// Create worker pool
pool := workerpool.New(3, 10) //nolint:staticcheck // OK in tests
defer func() { <-pool.Shutdown() }()

// Drain worker pool results so the pool does not block
go func() {
	for range pool.Results() {
	}
}()

// Create pipeline with worker pool
p := New().SetWorkerPool(pool)

p.AddStageFunc("worker-stage", func(_ context.Context, input interface{}) (interface{}, error) {
	// This will run in the worker pool
	return fmt.Sprintf("worker-processed: %s", input), nil
})

result, err := p.Execute(context.Background(), "pool-input")
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Worker pool result: %s\n", result.Output)
Output:

Worker pool result: worker-processed: pool-input


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// WorkerPool is the worker pool for parallel execution.
	// If nil, stages run sequentially.
	WorkerPool workerpool.Pool

	// Timeout is the default timeout for pipeline execution.
	Timeout time.Duration

	// OnStageStart is called when a stage starts execution.
	OnStageStart func(stageName string, input interface{})

	// OnStageComplete is called when a stage completes.
	OnStageComplete func(result StageResult)

	// OnPipelineStart is called when pipeline execution starts.
	OnPipelineStart func(input interface{})

	// OnPipelineComplete is called when pipeline execution completes.
	OnPipelineComplete func(result Result)

	// OnError is called when an error occurs.
	OnError func(stageName string, err error)

	// StopOnError determines if pipeline should stop on first error.
	// If false, errors are collected and pipeline continues.
	StopOnError bool

	// MaxConcurrency limits concurrent stage execution when using worker pool.
	// 0 means no limit.
	MaxConcurrency int
}

Config holds pipeline configuration options.

type Pipeline

type Pipeline interface {
	// Execute runs the pipeline with the given input data.
	Execute(ctx context.Context, input interface{}) (*Result, error)

	// ExecuteAsync runs the pipeline asynchronously and returns a channel for the result.
	ExecuteAsync(ctx context.Context, input interface{}) <-chan *Result

	// AddStage adds a stage to the pipeline.
	AddStage(stage Stage) Pipeline

	// AddStageFunc adds a stage function to the pipeline.
	AddStageFunc(name string, fn func(ctx context.Context, input interface{}) (interface{}, error)) Pipeline

	// SetWorkerPool sets the worker pool for parallel execution.
	SetWorkerPool(pool workerpool.Pool) Pipeline

	// SetTimeout sets the default timeout for pipeline execution.
	SetTimeout(timeout time.Duration) Pipeline

	// GetStages returns all stages in the pipeline.
	GetStages() []Stage

	// Stats returns pipeline execution statistics.
	Stats() Stats
}

Pipeline represents a series of stages that process data sequentially or in parallel.

func New

func New() Pipeline

New creates a new pipeline with default configuration.

func NewWithConfig

func NewWithConfig(config Config) Pipeline

NewWithConfig creates a new pipeline with the specified configuration.

type Result

type Result struct {
	// Input is the original input data
	Input interface{}

	// Output is the final output data
	Output interface{}

	// Error is any error that occurred during execution
	Error error

	// Duration is the total execution time
	Duration time.Duration

	// StageResults contains results from each stage
	StageResults []StageResult

	// StartTime is when the pipeline execution started
	StartTime time.Time

	// EndTime is when the pipeline execution finished
	EndTime time.Time
}

Result represents the outcome of a pipeline execution.

type Stage

type Stage interface {
	// Execute processes the input data and returns the result.
	Execute(ctx context.Context, input interface{}) (interface{}, error)

	// Name returns a unique identifier for this stage.
	Name() string
}

Stage represents a single processing stage in a pipeline.

func NewStageFunc

func NewStageFunc(name string, fn func(ctx context.Context, input interface{}) (interface{}, error)) Stage

NewStageFunc creates a new stage from a function.
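
A minimal sketch of wrapping a function as a reusable, shareable stage (the "double" logic is purely illustrative):

double := pipeline.NewStageFunc("double", func(_ context.Context, input interface{}) (interface{}, error) {
	n, ok := input.(int)
	if !ok {
		return nil, fmt.Errorf("expected int, got %T", input)
	}
	return n * 2, nil
})

p := pipeline.New().AddStage(double)
result, _ := p.Execute(context.Background(), 21)
fmt.Println(result.Output) // 42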

type StageFunc

type StageFunc struct {
	// contains filtered or unexported fields
}

StageFunc is a function type that implements the Stage interface.

func (*StageFunc) Execute

func (sf *StageFunc) Execute(ctx context.Context, input interface{}) (interface{}, error)

Execute implements the Stage interface for StageFunc.

func (*StageFunc) Name

func (sf *StageFunc) Name() string

Name returns the stage name.

type StageResult

type StageResult struct {
	// StageName is the name of the stage
	StageName string

	// Input is the input to this stage
	Input interface{}

	// Output is the output from this stage
	Output interface{}

	// Error is any error from this stage
	Error error

	// Duration is how long this stage took
	Duration time.Duration

	// StartTime is when the stage started
	StartTime time.Time

	// EndTime is when the stage finished
	EndTime time.Time
}

StageResult represents the result of a single stage execution.

type StageStats

type StageStats struct {
	Name            string
	ExecutionCount  int64
	SuccessCount    int64
	ErrorCount      int64
	TotalDuration   time.Duration
	AverageDuration time.Duration
}

StageStats holds statistics for individual stages.

type Stats

type Stats struct {
	TotalExecutions int64
	SuccessfulRuns  int64
	FailedRuns      int64
	TotalDuration   time.Duration
	AverageDuration time.Duration
	StageStats      map[string]StageStats
	LastExecutionAt time.Time
}

Stats holds pipeline execution statistics.
