Documentation
¶
Overview ¶
Package state provides a continuation-passing style framework for building composable state machines and processing pipelines.
Core Concepts ¶
Step - The Building Block ¶
A Step represents a single operation that processes a context and returns the next step to execute (or nil to terminate):
type Step interface {
Run(context.Context) Step
}
Example - Writing a custom Step:
// A simple step that prints and continues
type PrintStep struct {
message string
next Step
}
func (p *PrintStep) Run(ctx context.Context) Step {
fmt.Println(p.message)
if p.next != nil {
return p.next.Run(ctx)
}
return nil
}
NewStep - The Composition Unit ¶
A NewStep is a factory function that creates a Step when given the next step to execute. This enables continuation-passing style composition:
type NewStep func(next Step) Step
ContextFunc - The Work Unit ¶
A ContextFunc is a function that does work with context. It takes a context, performs operations, and returns the context (potentially modified) so it can be threaded through the pipeline:
type ContextFunc func(context.Context) context.Context
Use with Do to add work to the pipeline.
Example - Creating reusable NewSteps:
type ctxKey string
// A NewStep that adds a value to context
func AddValue(key ctxKey, value string) NewStep {
return NewStepFunc(func(ctx context.Context, next Step) Step {
fmt.Printf("adding %s=%s to context\n", key, value)
newCtx := context.WithValue(ctx, key, value)
return Continue(newCtx, next)
})
}
// A NewStep that checks a condition
func CheckError() NewStep {
return NewStepFunc(func(ctx context.Context, next Step) Step {
if ctx.Err() != nil {
fmt.Println("Error detected, stopping")
return nil
}
return Continue(ctx, next)
})
}
// A NewStep that reads from context
func PrintUser() NewStep {
return NewStepFunc(func(ctx context.Context, next Step) Step {
user := ctx.Value(ctxKey("user")).(string)
fmt.Printf("current user: %s\n", user)
return Continue(ctx, next)
})
}
// Compose them - context values flow through the pipeline:
pipeline := Sequence(
AddValue(ctxKey("user"), "alice"),
CheckError(),
PrintUser(), // Can access "user" value set earlier
AddValue(ctxKey("processed"), "true"),
)
Run(context.Background(), pipeline)
// Output:
// adding user=alice to context
// current user: alice
// adding processed=true to context
Running a Pipeline ¶
Use Run to execute a NewStep pipeline:
func Run(ctx context.Context, newStep NewStep)
The pipeline executes to completion using continuation-passing style.
Error Handling ¶
By default, Continue stops the pipeline when context is cancelled. You can customize this behavior:
// Default: stops on cancellation
ctx := context.Background()
pipeline := Sequence(step1, step2, step3)
Run(ctx, pipeline)
// With error handler: cleanup on cancellation
ctx = WithErrorHandler(ctx, func(err error) Step {
log.Printf("pipeline cancelled: %v", err)
return cleanupStep.Step()
})
Run(ctx, pipeline)
Helper Functions ¶
The package provides helper functions for common patterns. These are all built on top of Step and NewStep:
Basic Helpers:
- NewStepFunc(fn) - Create a NewStep from func(context.Context, next Step) Step
- Do(fn ContextFunc) - Lift a work function into a pipeline step
- Continue(ctx, next) - Run the next step if it exists and context is not cancelled
- WithErrorHandler(ctx, handler) - Set an error handler for cancelled contexts
- Terminal - Stop pipeline execution
- Noop - Continue to next step without doing anything
Composition Operators:
- Sequence(...steps) - Execute steps sequentially
- Parallel(...steps) - Execute steps concurrently
- Decision(predicate, ifTrue, ifFalse) - Binary branching (if/else)
- Enum(selector, cases, default) - Multi-way branching on comparable types
- Switch(selector, cases, default) - Multi-way branching on strings
- Recover(step) - Wrap a step to suppress panics (opt-in; panics propagate by default)
Choosing a Pattern ¶
What do you need to do? ├─ Run steps in order? │ └─ Use: Sequence(a, b, c) │ ├─ Run steps concurrently? │ └─ Use: Parallel(a, b, c) │ ├─ Make a binary choice (if/else)? │ └─ Use: Decision(predicate, ifTrue, ifFalse) │ ├─ Choose from multiple options (switch/case)? │ ├─ String-based? → Use: Switch(selector, cases, default) │ └─ Generic type? → Use: Enum(selector, cases, default) │ └─ Do work in a step? └─ Use: Do(contextFunc)
Basic Usage ¶
Simple Sequential Pipeline:
type ctxKey string
pipeline := state.Sequence(
state.Do(func(ctx context.Context) context.Context {
fmt.Println("step 1: setting user")
return context.WithValue(ctx, ctxKey("user"), "alice")
}),
state.Do(func(ctx context.Context) context.Context {
user := ctx.Value(ctxKey("user")).(string)
fmt.Printf("step 2: user is %s, adding request ID\n", user)
return context.WithValue(ctx, ctxKey("requestID"), "req-123")
}),
state.Do(func(ctx context.Context) context.Context {
user := ctx.Value(ctxKey("user")).(string)
reqID := ctx.Value(ctxKey("requestID")).(string)
fmt.Printf("step 3: processing request %s for user %s\n", reqID, user)
return ctx
}),
)
state.Run(context.Background(), pipeline)
// Output:
// step 1: setting user
// step 2: user is alice, adding request ID
// step 3: processing request req-123 for user alice
Conditional Branching:
type ctxKey string
pipeline := state.Sequence(
state.Do(func(ctx context.Context) context.Context {
fmt.Println("checking authorization")
return context.WithValue(ctx, ctxKey("authorized"), true)
}),
state.Decision(
func(ctx context.Context) bool {
return ctx.Value(ctxKey("authorized")).(bool)
},
// True branch - user is authorized
state.Do(func(ctx context.Context) context.Context {
fmt.Println("access granted, setting permissions")
return context.WithValue(ctx, ctxKey("permissions"), "read,write")
}),
// False branch - user is not authorized
state.Do(func(ctx context.Context) context.Context {
fmt.Println("access denied")
return context.WithValue(ctx, ctxKey("permissions"), "none")
}),
),
state.Do(func(ctx context.Context) context.Context {
perms := ctx.Value(ctxKey("permissions")).(string)
fmt.Printf("final permissions: %s\n", perms)
return ctx
}),
)
Parallel Execution:
type ctxKey string
pipeline := state.Sequence(
state.Do(func(ctx context.Context) context.Context {
fmt.Println("initializing request context")
ctx = context.WithValue(ctx, ctxKey("requestID"), "req-456")
return context.WithValue(ctx, ctxKey("userID"), "user-789")
}),
state.Parallel(
// Each parallel branch receives the same context
state.Do(func(ctx context.Context) context.Context {
reqID := ctx.Value(ctxKey("requestID")).(string)
fmt.Printf("validating request %s\n", reqID)
return ctx
}),
state.Do(func(ctx context.Context) context.Context {
userID := ctx.Value(ctxKey("userID")).(string)
fmt.Printf("loading user profile %s\n", userID)
return ctx
}),
state.Do(func(ctx context.Context) context.Context {
reqID := ctx.Value(ctxKey("requestID")).(string)
fmt.Printf("logging request %s\n", reqID)
return ctx
}),
),
state.Do(func(ctx context.Context) context.Context {
reqID := ctx.Value(ctxKey("requestID")).(string)
fmt.Printf("all parallel work complete for %s\n", reqID)
return ctx
}),
)
To collect results from parallel branches, use typedctx.Box:
import "github.com/authzed/controller-idioms/typedctx"
type ValidationResult struct { Valid bool }
type PermissionsResult struct { Allowed bool }
pipeline := state.Sequence(
// Set up boxes before parallel execution
state.Do(func(ctx context.Context) context.Context {
ctx = typedctx.WithBox[ValidationResult](ctx)
ctx = typedctx.WithBox[PermissionsResult](ctx)
return ctx
}),
// Parallel branches write to their boxes
state.Parallel(
state.Do(func(ctx context.Context) context.Context {
result := validate(ctx)
typedctx.MustStore(ctx, ValidationResult{Valid: result})
return ctx
}),
state.Do(func(ctx context.Context) context.Context {
allowed := checkPermissions(ctx)
typedctx.MustStore(ctx, PermissionsResult{Allowed: allowed})
return ctx
}),
),
// After parallel completes, read the results
state.Do(func(ctx context.Context) context.Context {
validation := typedctx.MustValue[ValidationResult](ctx)
permissions := typedctx.MustValue[PermissionsResult](ctx)
fmt.Printf("Valid: %v, Allowed: %v\n", validation.Valid, permissions.Allowed)
return ctx
}),
)
Multi-way Branching with Enum:
type ctxKey string
pipeline := state.Sequence(
state.Do(func(ctx context.Context) context.Context {
// Resource type set by earlier processing
return context.WithValue(ctx, ctxKey("resourceType"), "deployment")
}),
state.Enum(
func(ctx context.Context) string {
return ctx.Value(ctxKey("resourceType")).(string)
},
map[string]state.NewStep{
"deployment": state.Do(func(ctx context.Context) context.Context {
fmt.Println("handling deployment")
return context.WithValue(ctx, ctxKey("replicas"), 3)
}),
"service": state.Do(func(ctx context.Context) context.Context {
fmt.Println("handling service")
return context.WithValue(ctx, ctxKey("port"), 8080)
}),
"configmap": state.Do(func(ctx context.Context) context.Context {
fmt.Println("handling configmap")
return context.WithValue(ctx, ctxKey("dataKeys"), []string{"config.yaml"})
}),
},
// Default case
state.Do(func(ctx context.Context) context.Context {
fmt.Println("unknown resource type")
return ctx
}),
),
state.Do(func(ctx context.Context) context.Context {
// Values set by the enum branch are available here
if replicas, ok := ctx.Value(ctxKey("replicas")).(int); ok {
fmt.Printf("deployment configured with %d replicas\n", replicas)
}
return ctx
}),
)
String-based Branching with Switch:
pipeline := state.Switch(
func(ctx context.Context) string {
return getOperationPhase(ctx)
},
map[string]state.NewStep{
"pending": initializationStep,
"running": monitoringStep,
"completed": cleanupStep,
"failed": recoveryStep,
},
unknownPhaseStep, // Default case
)
Controller Example ¶
A typical controller pipeline:
// Main controller pipeline
mainPipeline := state.Sequence(
state.Do(func(ctx context.Context) context.Context {
// Set finalizer
return ctx
}),
state.Do(func(ctx context.Context) context.Context {
// Check for safe deletion
return ctx
}),
state.Do(func(ctx context.Context) context.Context {
// Check pause condition
return ctx
}),
state.Decision(
func(ctx context.Context) bool {
return hasDatabaseInstance(ctx)
},
// New style: has database instance
state.Sequence(
ensureMetadata,
createScopedCredentials,
),
// Old style: direct secret reference
state.Sequence(
adoptExistingSecret,
ensureMetadata,
),
),
)
// Execute the pipeline
state.Run(ctx, mainPipeline)
Key Benefits ¶
- No ID Management: Direct references eliminate the need for step IDs and lookup
- Simpler Composition: Steps compose naturally without complex builder patterns
- Type Safety: Factory functions provide compile-time safety
- Multi-way Branching: Enum and Switch operators handle complex branching logic elegantly
- Clear Control Flow: Continuation-passing makes execution flow explicit
- Reusability: Steps can be easily reused across different pipelines
- Testability: Individual steps and compositions are easy to test
Integration with Queue Operations ¶
The state package works seamlessly with the existing queue operations:
import "github.com/authzed/controller-idioms/queue"
validationStep := state.Do(func(ctx context.Context) context.Context {
if err := validateResource(ctx); err != nil {
queue.NewQueueOperationsCtx().RequeueErr(ctx, err)
return ctx
}
// Continue processing...
return ctx
})
The continuation-passing style provides a clean, functional approach to building complex state machines.
Package state contains Step units used to compose reconciliation loops for controllers.
You write functions or types that implement the Step interface, and then compose them via NewStep functions and composition operators.
NewStep functions are used for creating Step instances, and Handlers are the things that actually process requests - each step returns the next step to execute, or nil to terminate.
Writing controllers in this style permits composition patterns including Sequence, Parallel, Decision, and other operations like Map and Bind.
Why Context? ¶
This package uses context.Context as the state container instead of a custom type or generic map for several reasons:
**Integration**: Controllers already use context.Context for cancellation, deadlines, and request-scoped values. Using it here means zero friction.
**Standardization**: Context is Go's standard way to pass request-scoped data. Using it makes the pattern immediately familiar to Go developers.
**Immutability**: context.WithValue() returns a new context, encouraging immutable transformations (though this isn't enforced - see ContextFunc docs).
**Ecosystem**: Existing middleware, logging, tracing, and client libraries already understand context.Context.
**Cancellation**: Built-in support for cancellation and deadlines, crucial for long-running controller operations.
Think of Context as a "sufficiently capable state container" rather than as purely a cancellation mechanism. For type-safe operations, use typedctx.
Example (BranchingComparison) ¶
ctx := context.Background()
// This is equivalent to the complex handler branching example:
// hasSecretHandler := chain(c.ensureMetadata, c.ensureScopedDatabaseCreds(...))
// directSecretHandler := chain(c.adoptDBRootSecret, c.removeMissingSecretCondition, hasSecretHandler)
// mainHandler := chain(c.setFinalizer, c.safeDelete, c.checkPause, c.validateInstance(hasSecretHandler, directSecretHandler))
// Define reusable stage builders
ensureMetadata := Do(func(ctx context.Context) context.Context {
fmt.Println("ensuring metadata")
return ctx
})
ensureScopedDatabaseCreds := Do(func(ctx context.Context) context.Context {
fmt.Println("ensuring scoped database credentials")
return ctx
})
createLogicalDatabase := Do(func(ctx context.Context) context.Context {
fmt.Println("creating logical database")
return ctx
})
adoptDBRootSecret := Do(func(ctx context.Context) context.Context {
fmt.Println("adopting DB root secret")
return ctx
})
removeMissingSecretCondition := Do(func(ctx context.Context) context.Context {
fmt.Println("removing missing secret condition")
return ctx
})
// hasSecretChain - runs when secret is found via database instance
hasSecretChain := Sequence(
ensureMetadata,
ensureScopedDatabaseCreds,
createLogicalDatabase,
)
// directSecretChain - runs when database instance not found (old style)
directSecretChain := Sequence(
adoptDBRootSecret,
removeMissingSecretCondition,
hasSecretChain, // Reuse the hasSecret chain
)
// validateInstance - decides between the two branches
validateInstance := Decision(
func(_ context.Context) bool {
// In real code, this would check if database instance exists
fmt.Println("validating instance")
return true // Has database instance
},
hasSecretChain, // True branch
directSecretChain, // False branch
)
// Main controller pipeline
mainHandler := Sequence(
Do(func(ctx context.Context) context.Context {
fmt.Println("setting finalizer")
return ctx
}),
Do(func(ctx context.Context) context.Context {
fmt.Println("safe delete check")
return ctx
}),
Do(func(ctx context.Context) context.Context {
fmt.Println("checking pause")
return ctx
}),
validateInstance,
)
Run(ctx, mainHandler)
Output: setting finalizer safe delete check checking pause validating instance ensuring metadata ensuring scoped database credentials creating logical database
Example (BuilderReplacement) ¶
ctx := context.Background()
// In the old handler system, you needed:
// 1. Builder functions
// 2. Chain() to compose builders
// 3. .Handler(id) to instantiate
// 4. Complex ID management for branching
// In the state system, it's much simpler:
// Just compose NewStep functions directly
// Old way (conceptually):
// validationBuilder := func(next Handler) Handler { ... }
// processingBuilder := func(next Handler) Handler { ... }
// pipeline := Chain(validationBuilder, processingBuilder).Handler("myPipeline")
// New way:
validation := Do(func(ctx context.Context) context.Context {
fmt.Println("validation")
return ctx
})
processing := Do(func(ctx context.Context) context.Context {
fmt.Println("processing")
return ctx
})
pipeline := Sequence(validation, processing)
Run(ctx, pipeline)
Output: validation processing
Example (ComplexConditionals) ¶
userRoleKey := ctxkey.New[string]()
ctx := userRoleKey.Set(context.Background(), "admin")
// Multi-way branching using Enum - much cleaner than nested decisions
pipeline := Sequence(
Do(func(ctx context.Context) context.Context {
fmt.Println("authentication")
return ctx
}),
Enum(
func(ctx context.Context) string {
return userRoleKey.MustValue(ctx)
},
map[string]NewStep{
"admin": Do(func(ctx context.Context) context.Context {
fmt.Println("admin workflow")
return ctx
}),
"user": Do(func(ctx context.Context) context.Context {
fmt.Println("user workflow")
return ctx
}),
"moderator": Do(func(ctx context.Context) context.Context {
fmt.Println("moderator workflow")
return ctx
}),
},
Do(func(ctx context.Context) context.Context {
fmt.Println("guest workflow")
return ctx
}),
),
Do(func(ctx context.Context) context.Context {
fmt.Println("logging user action")
return ctx
}),
)
Run(ctx, pipeline)
Output: authentication admin workflow logging user action
Example (ComplexPipeline) ¶
ctx := context.Background()
// A more complex example showing composition
pipeline := Sequence(
Do(func(ctx context.Context) context.Context {
fmt.Println("initialization")
return ctx
}),
Decision(
func(_ context.Context) bool {
return true // some validation logic
},
// Validation passed - processing
Do(func(ctx context.Context) context.Context {
fmt.Println("validation passed")
return ctx
}),
// Validation failed
Do(func(ctx context.Context) context.Context {
fmt.Println("validation failed")
return ctx
}),
),
Do(func(ctx context.Context) context.Context {
fmt.Println("finalization")
return ctx
}),
)
Run(ctx, pipeline)
Output: initialization validation passed finalization
Example (ConditionalExecution) ¶
shouldProcessKey := ctxkey.New[bool]()
ctx := shouldProcessKey.Set(context.Background(), false)
pipeline := Sequence(
Do(func(ctx context.Context) context.Context {
fmt.Println("starting process")
return ctx
}),
Decision(
func(ctx context.Context) bool {
return shouldProcessKey.MustValue(ctx)
},
Do(func(ctx context.Context) context.Context {
fmt.Println("processing enabled")
return ctx
}),
Do(func(ctx context.Context) context.Context {
fmt.Println("processing disabled")
return ctx
}),
),
Do(func(ctx context.Context) context.Context {
fmt.Println("cleanup")
return ctx
}),
)
Run(ctx, pipeline)
Output: starting process processing disabled cleanup
Example (ControllerWithEnum) ¶
operationKey := ctxkey.New[string]()
ctx := operationKey.Set(context.Background(), "reconcile")
// Define reusable stages
setFinalizer := Do(func(ctx context.Context) context.Context {
fmt.Println("setting finalizer")
return ctx
})
validateSpec := Do(func(ctx context.Context) context.Context {
fmt.Println("validating spec")
return ctx
})
createResources := Do(func(ctx context.Context) context.Context {
fmt.Println("creating resources")
return ctx
})
updateResources := Do(func(ctx context.Context) context.Context {
fmt.Println("updating resources")
return ctx
})
deleteResources := Do(func(ctx context.Context) context.Context {
fmt.Println("deleting resources")
return ctx
})
removeFinalizer := Do(func(ctx context.Context) context.Context {
fmt.Println("removing finalizer")
return ctx
})
// Main controller pipeline using Enum for operation dispatch
pipeline := Sequence(
Do(func(ctx context.Context) context.Context {
fmt.Println("starting controller operation")
return ctx
}),
Enum(
func(ctx context.Context) string {
return operationKey.MustValue(ctx)
},
map[string]NewStep{
"reconcile": Sequence(
setFinalizer,
validateSpec,
Decision(
func(_ context.Context) bool {
// Check if resources exist
return false // Assume they don't exist
},
updateResources,
createResources,
),
),
"delete": Sequence(
deleteResources,
removeFinalizer,
),
"validate": validateSpec,
},
Do(func(ctx context.Context) context.Context {
fmt.Println("unsupported operation")
return ctx
}),
),
Do(func(ctx context.Context) context.Context {
fmt.Println("operation completed")
return ctx
}),
)
Run(ctx, pipeline)
Output: starting controller operation setting finalizer validating spec creating resources operation completed
Example (EnumBranching) ¶
resourceTypeKey := ctxkey.New[string]()
ctx := resourceTypeKey.Set(context.Background(), "deployment")
pipeline := Sequence(
Do(func(ctx context.Context) context.Context {
fmt.Println("processing resource")
return ctx
}),
Enum(
func(ctx context.Context) string {
return resourceTypeKey.MustValue(ctx)
},
map[string]NewStep{
"deployment": Sequence(
Do(func(ctx context.Context) context.Context {
fmt.Println("validating deployment spec")
return ctx
}),
Do(func(ctx context.Context) context.Context {
fmt.Println("creating deployment")
return ctx
}),
),
"service": Sequence(
Do(func(ctx context.Context) context.Context {
fmt.Println("validating service spec")
return ctx
}),
Do(func(ctx context.Context) context.Context {
fmt.Println("creating service")
return ctx
}),
),
"configmap": Do(func(ctx context.Context) context.Context {
fmt.Println("creating configmap")
return ctx
}),
},
Do(func(ctx context.Context) context.Context {
fmt.Println("unsupported resource type")
return ctx
}),
),
Do(func(ctx context.Context) context.Context {
fmt.Println("resource processing complete")
return ctx
}),
)
Run(ctx, pipeline)
Output: processing resource validating deployment spec creating deployment resource processing complete
Example (MonadicPatterns) ¶
configKey := ctxkey.New[string]()
validatedConfigKey := ctxkey.New[string]()
ctx := configKey.Set(context.Background(), "production")
// Sequential composition with context transformation
pipeline := Sequence(
Do(func(ctx context.Context) context.Context {
config := configKey.MustValue(ctx)
return validatedConfigKey.Set(ctx, config+"-validated")
}),
Do(func(ctx context.Context) context.Context {
config := validatedConfigKey.MustValue(ctx)
fmt.Printf("using config: %s\n", config)
return ctx
}),
Do(func(ctx context.Context) context.Context {
fmt.Println("first operation")
return ctx
}),
Do(func(ctx context.Context) context.Context {
fmt.Println("dependent operation")
return ctx
}),
)
Run(ctx, pipeline)
Output: using config: production-validated first operation dependent operation
Example (ReusableStages) ¶
ctx := context.Background()
// Define reusable stages
logStart := func(name string) NewStep {
return Do(func(ctx context.Context) context.Context {
fmt.Printf("starting %s\n", name)
return ctx
})
}
logEnd := func(name string) NewStep {
return Do(func(ctx context.Context) context.Context {
fmt.Printf("completed %s\n", name)
return ctx
})
}
// Wrap a stage with logging
withLogging := func(name string, stage NewStep) NewStep {
return Sequence(
logStart(name),
stage,
logEnd(name),
)
}
// Use the reusable pattern
pipeline := Sequence(
withLogging("validation", Do(func(ctx context.Context) context.Context {
fmt.Println("validating input")
return ctx
})),
withLogging("processing", Do(func(ctx context.Context) context.Context {
fmt.Println("processing work")
return ctx
})),
withLogging("cleanup", Do(func(ctx context.Context) context.Context {
fmt.Println("cleaning up")
return ctx
})),
)
Run(ctx, pipeline)
Output: starting validation validating input completed validation starting processing processing work completed processing starting cleanup cleaning up completed cleanup
Example (SwitchWorkflow) ¶
phaseKey := ctxkey.New[string]()
ctx := phaseKey.Set(context.Background(), "pending")
pipeline := Sequence(
Do(func(ctx context.Context) context.Context {
fmt.Println("checking resource phase")
return ctx
}),
Switch(
func(ctx context.Context) string {
return phaseKey.MustValue(ctx)
},
map[string]NewStep{
"pending": Sequence(
Do(func(ctx context.Context) context.Context {
fmt.Println("initializing resources")
return ctx
}),
Do(func(ctx context.Context) context.Context {
fmt.Println("setting up dependencies")
return ctx
}),
),
"running": Do(func(ctx context.Context) context.Context {
fmt.Println("monitoring running state")
return ctx
}),
"failed": Sequence(
Do(func(ctx context.Context) context.Context {
fmt.Println("analyzing failure")
return ctx
}),
Do(func(ctx context.Context) context.Context {
fmt.Println("attempting recovery")
return ctx
}),
),
"completed": Do(func(ctx context.Context) context.Context {
fmt.Println("cleaning up completed resources")
return ctx
}),
},
Do(func(ctx context.Context) context.Context {
fmt.Println("unknown phase - logging for investigation")
return ctx
}),
),
)
Run(ctx, pipeline)
Output: checking resource phase initializing resources setting up dependencies
Index ¶
- Variables
- func Run(ctx context.Context, newStep NewStep)
- func WithErrorHandler(ctx context.Context, handler func(error) Step) context.Context
- type ContextFunc
- type DecisionStep
- type EnumStep
- type NewStep
- func Decision(predicate func(context.Context) bool, trueHandler, falseHandler NewStep) NewStep
- func Do(fn ContextFunc) NewStep
- func Enum[T comparable](selector func(context.Context) T, cases map[T]NewStep, defaultHandler NewStep) NewStep
- func NewStepFunc(fn func(ctx context.Context, next Step) Step) NewStep
- func NewTerminalStepFunc(fn func(ctx context.Context)) NewStep
- func Parallel(steps ...NewStep) NewStep
- func ParallelWith(wrapper func(NewStep) NewStep, steps ...NewStep) NewStep
- func Recover(step NewStep) NewStep
- func Sequence(steps ...NewStep) NewStep
- func Switch(selector func(context.Context) string, cases map[string]NewStep, ...) NewStep
- func When(predicate func(context.Context) bool, trueHandler NewStep) NewStep
- type PanicError
- type ParallelStep
- type Step
- type StepFunc
Examples ¶
- Package (BranchingComparison)
- Package (BuilderReplacement)
- Package (ComplexConditionals)
- Package (ComplexPipeline)
- Package (ConditionalExecution)
- Package (ControllerWithEnum)
- Package (EnumBranching)
- Package (MonadicPatterns)
- Package (ReusableStages)
- Package (SwitchWorkflow)
- Decision
- Enum
- Parallel
- Sequence
- Switch
- WithErrorHandler
Constants ¶
This section is empty.
Variables ¶
var Noop = NewStepFunc(Continue)
Noop creates a step that does nothing and continues to the next step.
var Terminal = NewTerminalStepFunc(func(context.Context) {})
Terminal creates a step that terminates the pipeline.
Functions ¶
func Run ¶
Run executes a NewStep pipeline until completion. With continuation-passing style, the pipeline executes completely in one call.
func WithErrorHandler ¶
WithErrorHandler adds an error handler to the context. When Continue encounters a cancelled context, it will call this handler instead of stopping the pipeline.
Example ¶
ctx, cancel := context.WithCancel(context.Background()) //nolint:gosec // cancel called inside pipeline step
// Set up error handler
ctx = WithErrorHandler(ctx, func(_ error) Step {
fmt.Println("handling cancellation")
return nil
})
pipeline := Sequence(
Do(func(ctx context.Context) context.Context {
fmt.Println("step 1")
cancel() // Simulate cancellation
return ctx
}),
Do(func(ctx context.Context) context.Context {
fmt.Println("step 2 (should not run)")
return ctx
}),
)
Run(ctx, pipeline)
Output: step 1 handling cancellation
Types ¶
type ContextFunc ¶
ContextFunc is a function that does work with context. It takes a context, performs operations, and returns the (potentially modified) context so it can be threaded through the pipeline.
type DecisionStep ¶
type DecisionStep struct {
// contains filtered or unexported fields
}
DecisionStep implements conditional execution.
type EnumStep ¶
type EnumStep[T comparable] struct { // contains filtered or unexported fields }
EnumStep implements multi-way branching based on enum values.
type NewStep ¶
NewStep is a function that creates a Step when given the next step to execute. This is the basic building block for composing step pipelines using continuation-passing style.
func Decision ¶
Decision creates a conditional step that chooses between two paths.
Example ¶
ctx := context.Background()
// Decision based on a simple condition
condition := true
pipeline := Decision(
func(_ context.Context) bool {
return condition
},
Do(func(ctx context.Context) context.Context {
fmt.Println("true branch")
return ctx
}),
Do(func(ctx context.Context) context.Context {
fmt.Println("false branch")
return ctx
}),
)
Run(ctx, pipeline)
Output: true branch
func Do ¶
func Do(fn ContextFunc) NewStep
Do lifts a ContextFunc into a pipeline step. This is the primary way to add work to a pipeline.
func Enum ¶
func Enum[T comparable]( selector func(context.Context) T, cases map[T]NewStep, defaultHandler NewStep, ) NewStep
Enum creates a multi-way branching step based on a selector function. The selector function returns a value that is matched against the cases map. If no match is found, the defaultHandler is executed.
Example ¶
operationKey := ctxkey.New[string]()
ctx := operationKey.Set(context.Background(), "create")
pipeline := Enum(
func(ctx context.Context) string {
return operationKey.MustValue(ctx)
},
map[string]NewStep{
"create": Do(func(ctx context.Context) context.Context {
fmt.Println("creating resource")
return ctx
}),
"update": Do(func(ctx context.Context) context.Context {
fmt.Println("updating resource")
return ctx
}),
"delete": Do(func(ctx context.Context) context.Context {
fmt.Println("deleting resource")
return ctx
}),
},
Do(func(ctx context.Context) context.Context {
fmt.Println("unknown operation")
return ctx
}),
)
Run(ctx, pipeline)
Output: creating resource
func NewStepFunc ¶
NewStepFunc creates a NewStep from a function that takes both context and next step. This is a convenience wrapper that eliminates boilerplate:
func MyStep() NewStep {
return NewStepFunc(func(ctx context.Context, next Step) Step {
// your logic here
return Continue(ctx, next)
})
}
Instead of the more verbose:
func MyStep() NewStep {
return func(next Step) Step {
return StepFunc(func(ctx context.Context) Step {
// your logic here
return Continue(ctx, next)
})
}
}
func NewTerminalStepFunc ¶
NewTerminalStepFunc creates a NewStep that executes a side-effect function and terminates the pipeline. This is a convenience wrapper for steps that don't need to continue to the next step.
Example:
markComplete := state.NewTerminalStepFunc(func(ctx context.Context) {
log.Println("Processing complete")
queue.NewQueueOperationsCtx().Done(ctx)
})
func Parallel ¶
Parallel composes multiple NewStep functions to run in parallel, then continues to the next step after all complete.
Example ¶
ctx := context.Background()
// Use atomic counter to demonstrate parallel execution
var counter int32
pipeline := Parallel(
Do(func(ctx context.Context) context.Context {
atomic.AddInt32(&counter, 1)
return ctx
}),
Do(func(ctx context.Context) context.Context {
atomic.AddInt32(&counter, 1)
return ctx
}),
Do(func(ctx context.Context) context.Context {
atomic.AddInt32(&counter, 1)
return ctx
}),
)
Run(ctx, pipeline)
fmt.Printf("counter: %d", atomic.LoadInt32(&counter))
Output: counter: 3
func ParallelWith ¶
ParallelWith composes multiple NewStep functions to run in parallel, applying wrapper to each branch before execution. This is useful for applying a uniform policy to all branches, such as panic recovery:
ParallelWith(Recover, step1, step2, step3)
The wrapper is called once per branch at instantiation time (when the returned NewStep is called), not at execution time.
If using Recover as the wrapper and multiple branches panic concurrently, the error handler may be called multiple times — once per panicking branch. The context cause will reflect whichever panic cancelled the context first.
func Recover ¶
Recover wraps a step with panic recovery. On panic, it cancels a child context with a *PanicError cause and routes through Continue, so any WithErrorHandler registered on the context will be called with the *PanicError. The pipeline does not continue past the recovered panic. In most cases, panics should propagate naturally — only use Recover when you explicitly need to handle a panic from a specific step.
Note: Recover cannot catch panics from goroutines spawned by the wrapped step (e.g. a Parallel step). Go's runtime does not allow cross-goroutine panic recovery; those panics will still crash the program.
func Sequence ¶
Sequence composes multiple NewStep functions into a sequential pipeline. Each step in the sequence executes in order with proper context threading.
Example ¶
ctx := context.Background()
pipeline := Sequence(
Do(func(ctx context.Context) context.Context {
fmt.Println("first stage")
return ctx
}),
Do(func(ctx context.Context) context.Context {
fmt.Println("second stage")
return ctx
}),
Do(func(ctx context.Context) context.Context {
fmt.Println("third stage")
return ctx
}),
)
Run(ctx, pipeline)
Output: first stage second stage third stage
func Switch ¶
func Switch( selector func(context.Context) string, cases map[string]NewStep, defaultHandler NewStep, ) NewStep
Switch creates a multi-way branching step based on string values. This is a convenience function for the common case of string-based branching.
Example ¶
statusKey := ctxkey.New[string]()
ctx := statusKey.Set(context.Background(), "pending")
pipeline := Sequence(
Do(func(ctx context.Context) context.Context {
fmt.Println("processing status")
return ctx
}),
Switch(
func(ctx context.Context) string {
return statusKey.MustValue(ctx)
},
map[string]NewStep{
"pending": Do(func(ctx context.Context) context.Context {
fmt.Println("handling pending status")
return ctx
}),
"complete": Do(func(ctx context.Context) context.Context {
fmt.Println("handling complete status")
return ctx
}),
"failed": Do(func(ctx context.Context) context.Context {
fmt.Println("handling failed status")
return ctx
}),
},
Do(func(ctx context.Context) context.Context {
fmt.Println("handling unknown status")
return ctx
}),
),
)
Run(ctx, pipeline)
Output: processing status handling pending status
func When ¶
When creates a conditional step with only a true branch. If the predicate returns true, the trueHandler is executed and the pipeline terminates. If the predicate returns false, execution continues to the next step.
This is equivalent to Decision(predicate, trueHandler, Noop) but more readable when there is no meaningful false branch:
state.When(
func(ctx context.Context) bool { return !resourceReady(ctx) },
queue.RequeueAfter(30 * time.Second),
)
type PanicError ¶
type PanicError struct {
Value any
}
PanicError wraps a recovered panic value as an error. It is set as the cause on the context passed to WithErrorHandler when Recover catches a panic, so callers can distinguish panics from normal cancellations and recover the original panic value via errors.As.
func (*PanicError) Error ¶
func (p *PanicError) Error() string
type ParallelStep ¶
type ParallelStep struct {
// contains filtered or unexported fields
}
ParallelStep implements parallel execution of steps.
type Step ¶
Step represents a step in a processing pipeline. Each step processes a context and returns the next step to execute, or nil to terminate.
func Continue ¶
Continue runs the next step if it exists and context is not cancelled. If context is cancelled:
- If an error handler is set via WithErrorHandler, calls the handler
- Otherwise, stops the pipeline (returns nil)
This is a helper to avoid the common pattern:
if ctx.Err() != nil {
return nil
}
if next != nil {
return next.Run(ctx)
}
return nil