Documentation
¶
Overview ¶
Package saga implements the Saga pattern for distributed operations with crash recovery. Each saga is a sequence of steps where each step has a corresponding undo operation. The framework guarantees that either all steps complete successfully or all completed steps are rolled back.
See RFD-35 for detailed design documentation.
Example (MakeSandwich) ¶
Example_makeSandwich demonstrates a successful sandwich-making saga.
package main
import (
"context"
"encoding/json"
"fmt"
"strings"
"miren.dev/runtime/pkg/saga"
)
// Kitchen tracks what happened during sandwich making.
type Kitchen struct {
log []string
}
func (k *Kitchen) record(action string) {
k.log = append(k.log, action)
}
// Pantry holds bread and dry goods.
type Pantry struct {
stock map[string]int
}
func NewPantry(stock map[string]int) *Pantry {
return &Pantry{stock: stock}
}
func (p *Pantry) Take(item string) error {
if p.stock[item] <= 0 {
return fmt.Errorf("out of %s", item)
}
p.stock[item]--
return nil
}
func (p *Pantry) Return(item string) {
p.stock[item]++
}
// Fridge holds proteins, condiments, and cold items.
type Fridge struct {
stock map[string]int
}
func NewFridge(stock map[string]int) *Fridge {
return &Fridge{stock: stock}
}
func (f *Fridge) Take(item string) error {
if f.stock[item] <= 0 {
return fmt.Errorf("out of %s", item)
}
f.stock[item]--
return nil
}
func (f *Fridge) Return(item string) {
f.stock[item]++
}
type GetBreadIn struct {
BreadType string
}
type GetBreadOut struct {
Bread string
}
func GetBread(ctx context.Context, in GetBreadIn) (GetBreadOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
pantry := saga.Get[*Pantry](ctx)
if err := pantry.Take(in.BreadType); err != nil {
kitchen.record(fmt.Sprintf("Checked pantry - %v", err))
return GetBreadOut{}, err
}
kitchen.record(fmt.Sprintf("Got %s from pantry", in.BreadType))
return GetBreadOut{Bread: in.BreadType + " slice"}, nil
}
func UndoGetBread(ctx context.Context, in GetBreadIn, out GetBreadOut) error {
kitchen := saga.Get[*Kitchen](ctx)
pantry := saga.Get[*Pantry](ctx)
pantry.Return(in.BreadType)
kitchen.record(fmt.Sprintf("Returned %s to pantry", in.BreadType))
return nil
}
type AddCondimentIn struct {
Bread string
Condiment string
}
type AddCondimentOut struct {
PreparedBread string
}
func AddCondiment(ctx context.Context, in AddCondimentIn) (AddCondimentOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
if err := fridge.Take(in.Condiment); err != nil {
kitchen.record(fmt.Sprintf("Checked fridge - %v", err))
return AddCondimentOut{}, err
}
kitchen.record(fmt.Sprintf("Spread %s on %s", in.Condiment, in.Bread))
return AddCondimentOut{
PreparedBread: in.Bread + " with " + in.Condiment,
}, nil
}
func UndoAddCondiment(ctx context.Context, in AddCondimentIn, out AddCondimentOut) error {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
fridge.Return(in.Condiment)
kitchen.record(fmt.Sprintf("Scraped %s back into jar", in.Condiment))
return nil
}
type AddProteinIn struct {
PreparedBread string
Protein string
}
type AddProteinOut struct {
Stack string
}
func AddProtein(ctx context.Context, in AddProteinIn) (AddProteinOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
if err := fridge.Take(in.Protein); err != nil {
kitchen.record(fmt.Sprintf("Checked fridge - %v", err))
return AddProteinOut{}, err
}
kitchen.record(fmt.Sprintf("Layered %s on %s", in.Protein, in.PreparedBread))
return AddProteinOut{
Stack: in.PreparedBread + " + " + in.Protein,
}, nil
}
func UndoAddProtein(ctx context.Context, in AddProteinIn, out AddProteinOut) error {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
fridge.Return(in.Protein)
kitchen.record(fmt.Sprintf("Put %s back in fridge", in.Protein))
return nil
}
type AddToppingsIn struct {
Stack string
Toppings []string `saga:",optional"`
}
type AddToppingsOut struct {
OpenSandwich string
}
func AddToppings(ctx context.Context, in AddToppingsIn) (AddToppingsOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
result := in.Stack
if len(in.Toppings) > 0 {
toppingList := strings.Join(in.Toppings, ", ")
kitchen.record(fmt.Sprintf("Added %s", toppingList))
result += " + " + toppingList
} else {
kitchen.record("No toppings requested")
}
return AddToppingsOut{OpenSandwich: result}, nil
}
func UndoAddToppings(ctx context.Context, in AddToppingsIn, out AddToppingsOut) error {
kitchen := saga.Get[*Kitchen](ctx)
if len(in.Toppings) > 0 {
kitchen.record(fmt.Sprintf("Removed %s", strings.Join(in.Toppings, ", ")))
}
return nil
}
type CloseSandwichIn struct {
OpenSandwich string
}
type CloseSandwichOut struct {
Sandwich string
}
func CloseSandwich(ctx context.Context, in CloseSandwichIn) (CloseSandwichOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
kitchen.record("Closed sandwich with top slice")
return CloseSandwichOut{
Sandwich: "[" + in.OpenSandwich + "]",
}, nil
}
func UndoCloseSandwich(ctx context.Context, in CloseSandwichIn, out CloseSandwichOut) error {
kitchen := saga.Get[*Kitchen](ctx)
kitchen.record("Opened sandwich back up")
return nil
}
func main() {
kitchen := &Kitchen{}
pantry := NewPantry(map[string]int{
"sourdough": 2,
"wheat": 1,
"rye": 1,
})
fridge := NewFridge(map[string]int{
"mayo": 3,
"mustard": 2,
"ham": 4,
"turkey": 2,
"pastrami": 1,
})
registry := saga.NewRegistry()
saga.Define("make-sandwich").
Using(kitchen).
Using(pantry).
Using(fridge).
Action(GetBread).Undo(UndoGetBread).
Action(AddCondiment).Undo(UndoAddCondiment).
Action(AddProtein).Undo(UndoAddProtein).
Action(AddToppings).Undo(UndoAddToppings).
Action(CloseSandwich).Undo(UndoCloseSandwich).
RegisterTo(registry)
storage := saga.NewMemoryStorage()
executor := saga.NewExecutor(storage, saga.WithRegistry(registry))
ctx := context.Background()
err := executor.Start("make-sandwich").
Input("breadtype", "sourdough").
Input("condiment", "mayo").
Input("protein", "ham").
Input("toppings", []string{"lettuce", "tomato"}).
WithID("order-1").
Execute(ctx)
if err != nil {
fmt.Printf("Failed: %v\n", err)
return
}
// Get the final result
exec, _ := storage.Get(ctx, "order-1")
var result CloseSandwichOut
json.Unmarshal(exec.ExecutedActions["close-sandwich"].Output, &result)
fmt.Printf("Result: %s\n", result.Sandwich)
fmt.Println("\nKitchen log:")
for _, entry := range kitchen.log {
fmt.Println(" -", entry)
}
}
Output: Result: [sourdough slice with mayo + ham + lettuce, tomato] Kitchen log: - Got sourdough from pantry - Spread mayo on sourdough slice - Layered ham on sourdough slice with mayo - Added lettuce, tomato - Closed sandwich with top slice
Example (OutOfStock) ¶
Example_outOfStock demonstrates saga compensation when the fridge is empty.
package main
import (
"context"
"fmt"
"strings"
"miren.dev/runtime/pkg/saga"
)
// Kitchen tracks what happened during sandwich making.
type Kitchen struct {
log []string
}
func (k *Kitchen) record(action string) {
k.log = append(k.log, action)
}
// Pantry holds bread and dry goods.
type Pantry struct {
stock map[string]int
}
func NewPantry(stock map[string]int) *Pantry {
return &Pantry{stock: stock}
}
func (p *Pantry) Take(item string) error {
if p.stock[item] <= 0 {
return fmt.Errorf("out of %s", item)
}
p.stock[item]--
return nil
}
func (p *Pantry) Return(item string) {
p.stock[item]++
}
// Fridge holds proteins, condiments, and cold items.
type Fridge struct {
stock map[string]int
}
func NewFridge(stock map[string]int) *Fridge {
return &Fridge{stock: stock}
}
func (f *Fridge) Take(item string) error {
if f.stock[item] <= 0 {
return fmt.Errorf("out of %s", item)
}
f.stock[item]--
return nil
}
func (f *Fridge) Return(item string) {
f.stock[item]++
}
type GetBreadIn struct {
BreadType string
}
type GetBreadOut struct {
Bread string
}
func GetBread(ctx context.Context, in GetBreadIn) (GetBreadOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
pantry := saga.Get[*Pantry](ctx)
if err := pantry.Take(in.BreadType); err != nil {
kitchen.record(fmt.Sprintf("Checked pantry - %v", err))
return GetBreadOut{}, err
}
kitchen.record(fmt.Sprintf("Got %s from pantry", in.BreadType))
return GetBreadOut{Bread: in.BreadType + " slice"}, nil
}
func UndoGetBread(ctx context.Context, in GetBreadIn, out GetBreadOut) error {
kitchen := saga.Get[*Kitchen](ctx)
pantry := saga.Get[*Pantry](ctx)
pantry.Return(in.BreadType)
kitchen.record(fmt.Sprintf("Returned %s to pantry", in.BreadType))
return nil
}
type AddCondimentIn struct {
Bread string
Condiment string
}
type AddCondimentOut struct {
PreparedBread string
}
func AddCondiment(ctx context.Context, in AddCondimentIn) (AddCondimentOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
if err := fridge.Take(in.Condiment); err != nil {
kitchen.record(fmt.Sprintf("Checked fridge - %v", err))
return AddCondimentOut{}, err
}
kitchen.record(fmt.Sprintf("Spread %s on %s", in.Condiment, in.Bread))
return AddCondimentOut{
PreparedBread: in.Bread + " with " + in.Condiment,
}, nil
}
func UndoAddCondiment(ctx context.Context, in AddCondimentIn, out AddCondimentOut) error {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
fridge.Return(in.Condiment)
kitchen.record(fmt.Sprintf("Scraped %s back into jar", in.Condiment))
return nil
}
type AddProteinIn struct {
PreparedBread string
Protein string
}
type AddProteinOut struct {
Stack string
}
func AddProtein(ctx context.Context, in AddProteinIn) (AddProteinOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
if err := fridge.Take(in.Protein); err != nil {
kitchen.record(fmt.Sprintf("Checked fridge - %v", err))
return AddProteinOut{}, err
}
kitchen.record(fmt.Sprintf("Layered %s on %s", in.Protein, in.PreparedBread))
return AddProteinOut{
Stack: in.PreparedBread + " + " + in.Protein,
}, nil
}
func UndoAddProtein(ctx context.Context, in AddProteinIn, out AddProteinOut) error {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
fridge.Return(in.Protein)
kitchen.record(fmt.Sprintf("Put %s back in fridge", in.Protein))
return nil
}
type AddToppingsIn struct {
Stack string
Toppings []string `saga:",optional"`
}
type AddToppingsOut struct {
OpenSandwich string
}
func AddToppings(ctx context.Context, in AddToppingsIn) (AddToppingsOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
result := in.Stack
if len(in.Toppings) > 0 {
toppingList := strings.Join(in.Toppings, ", ")
kitchen.record(fmt.Sprintf("Added %s", toppingList))
result += " + " + toppingList
} else {
kitchen.record("No toppings requested")
}
return AddToppingsOut{OpenSandwich: result}, nil
}
func UndoAddToppings(ctx context.Context, in AddToppingsIn, out AddToppingsOut) error {
kitchen := saga.Get[*Kitchen](ctx)
if len(in.Toppings) > 0 {
kitchen.record(fmt.Sprintf("Removed %s", strings.Join(in.Toppings, ", ")))
}
return nil
}
type CloseSandwichIn struct {
OpenSandwich string
}
type CloseSandwichOut struct {
Sandwich string
}
func CloseSandwich(ctx context.Context, in CloseSandwichIn) (CloseSandwichOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
kitchen.record("Closed sandwich with top slice")
return CloseSandwichOut{
Sandwich: "[" + in.OpenSandwich + "]",
}, nil
}
func UndoCloseSandwich(ctx context.Context, in CloseSandwichIn, out CloseSandwichOut) error {
kitchen := saga.Get[*Kitchen](ctx)
kitchen.record("Opened sandwich back up")
return nil
}
func main() {
kitchen := &Kitchen{}
pantry := NewPantry(map[string]int{
"wheat": 1,
})
fridge := NewFridge(map[string]int{
"mustard": 1,
"turkey": 0, // Out of turkey!
})
registry := saga.NewRegistry()
saga.Define("make-sandwich-fail").
Using(kitchen).
Using(pantry).
Using(fridge).
Action(GetBread).Undo(UndoGetBread).
Action(AddCondiment).Undo(UndoAddCondiment).
Action(AddProtein).Undo(UndoAddProtein).
Action(AddToppings).Undo(UndoAddToppings).
Action(CloseSandwich).Undo(UndoCloseSandwich).
RegisterTo(registry)
storage := saga.NewMemoryStorage()
executor := saga.NewExecutor(storage, saga.WithRegistry(registry))
ctx := context.Background()
err := executor.Start("make-sandwich-fail").
Input("breadtype", "wheat").
Input("condiment", "mustard").
Input("protein", "turkey").
Input("toppings", []string{"pickles"}).
Execute(ctx)
if err != nil {
fmt.Println("Sandwich failed - Loss Prevented")
}
fmt.Println("\nKitchen log:")
for _, entry := range kitchen.log {
fmt.Println(" -", entry)
}
fmt.Printf("\nInventory restored: wheat=%d, mustard=%d\n",
pantry.stock["wheat"], fridge.stock["mustard"])
}
Output: Sandwich failed - Loss Prevented Kitchen log: - Got wheat from pantry - Spread mustard on wheat slice - Checked fridge - out of turkey - Scraped mustard back into jar - Returned wheat to pantry Inventory restored: wheat=1, mustard=1
Example (SimpleSandwich) ¶
Example_simpleSandwich demonstrates optional toppings.
package main
import (
"context"
"encoding/json"
"fmt"
"strings"
"miren.dev/runtime/pkg/saga"
)
// Kitchen tracks what happened during sandwich making.
type Kitchen struct {
log []string
}
func (k *Kitchen) record(action string) {
k.log = append(k.log, action)
}
// Pantry holds bread and dry goods.
type Pantry struct {
stock map[string]int
}
func NewPantry(stock map[string]int) *Pantry {
return &Pantry{stock: stock}
}
func (p *Pantry) Take(item string) error {
if p.stock[item] <= 0 {
return fmt.Errorf("out of %s", item)
}
p.stock[item]--
return nil
}
func (p *Pantry) Return(item string) {
p.stock[item]++
}
// Fridge holds proteins, condiments, and cold items.
type Fridge struct {
stock map[string]int
}
func NewFridge(stock map[string]int) *Fridge {
return &Fridge{stock: stock}
}
func (f *Fridge) Take(item string) error {
if f.stock[item] <= 0 {
return fmt.Errorf("out of %s", item)
}
f.stock[item]--
return nil
}
func (f *Fridge) Return(item string) {
f.stock[item]++
}
type GetBreadIn struct {
BreadType string
}
type GetBreadOut struct {
Bread string
}
func GetBread(ctx context.Context, in GetBreadIn) (GetBreadOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
pantry := saga.Get[*Pantry](ctx)
if err := pantry.Take(in.BreadType); err != nil {
kitchen.record(fmt.Sprintf("Checked pantry - %v", err))
return GetBreadOut{}, err
}
kitchen.record(fmt.Sprintf("Got %s from pantry", in.BreadType))
return GetBreadOut{Bread: in.BreadType + " slice"}, nil
}
func UndoGetBread(ctx context.Context, in GetBreadIn, out GetBreadOut) error {
kitchen := saga.Get[*Kitchen](ctx)
pantry := saga.Get[*Pantry](ctx)
pantry.Return(in.BreadType)
kitchen.record(fmt.Sprintf("Returned %s to pantry", in.BreadType))
return nil
}
type AddCondimentIn struct {
Bread string
Condiment string
}
type AddCondimentOut struct {
PreparedBread string
}
func AddCondiment(ctx context.Context, in AddCondimentIn) (AddCondimentOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
if err := fridge.Take(in.Condiment); err != nil {
kitchen.record(fmt.Sprintf("Checked fridge - %v", err))
return AddCondimentOut{}, err
}
kitchen.record(fmt.Sprintf("Spread %s on %s", in.Condiment, in.Bread))
return AddCondimentOut{
PreparedBread: in.Bread + " with " + in.Condiment,
}, nil
}
func UndoAddCondiment(ctx context.Context, in AddCondimentIn, out AddCondimentOut) error {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
fridge.Return(in.Condiment)
kitchen.record(fmt.Sprintf("Scraped %s back into jar", in.Condiment))
return nil
}
type AddProteinIn struct {
PreparedBread string
Protein string
}
type AddProteinOut struct {
Stack string
}
func AddProtein(ctx context.Context, in AddProteinIn) (AddProteinOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
if err := fridge.Take(in.Protein); err != nil {
kitchen.record(fmt.Sprintf("Checked fridge - %v", err))
return AddProteinOut{}, err
}
kitchen.record(fmt.Sprintf("Layered %s on %s", in.Protein, in.PreparedBread))
return AddProteinOut{
Stack: in.PreparedBread + " + " + in.Protein,
}, nil
}
func UndoAddProtein(ctx context.Context, in AddProteinIn, out AddProteinOut) error {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
fridge.Return(in.Protein)
kitchen.record(fmt.Sprintf("Put %s back in fridge", in.Protein))
return nil
}
type AddToppingsIn struct {
Stack string
Toppings []string `saga:",optional"`
}
type AddToppingsOut struct {
OpenSandwich string
}
func AddToppings(ctx context.Context, in AddToppingsIn) (AddToppingsOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
result := in.Stack
if len(in.Toppings) > 0 {
toppingList := strings.Join(in.Toppings, ", ")
kitchen.record(fmt.Sprintf("Added %s", toppingList))
result += " + " + toppingList
} else {
kitchen.record("No toppings requested")
}
return AddToppingsOut{OpenSandwich: result}, nil
}
func UndoAddToppings(ctx context.Context, in AddToppingsIn, out AddToppingsOut) error {
kitchen := saga.Get[*Kitchen](ctx)
if len(in.Toppings) > 0 {
kitchen.record(fmt.Sprintf("Removed %s", strings.Join(in.Toppings, ", ")))
}
return nil
}
type CloseSandwichIn struct {
OpenSandwich string
}
type CloseSandwichOut struct {
Sandwich string
}
func CloseSandwich(ctx context.Context, in CloseSandwichIn) (CloseSandwichOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
kitchen.record("Closed sandwich with top slice")
return CloseSandwichOut{
Sandwich: "[" + in.OpenSandwich + "]",
}, nil
}
func UndoCloseSandwich(ctx context.Context, in CloseSandwichIn, out CloseSandwichOut) error {
kitchen := saga.Get[*Kitchen](ctx)
kitchen.record("Opened sandwich back up")
return nil
}
func main() {
kitchen := &Kitchen{}
pantry := NewPantry(map[string]int{"rye": 1})
fridge := NewFridge(map[string]int{"butter": 1, "pastrami": 1})
registry := saga.NewRegistry()
saga.Define("simple-sandwich").
Using(kitchen).
Using(pantry).
Using(fridge).
Action(GetBread).Undo(UndoGetBread).
Action(AddCondiment).Undo(UndoAddCondiment).
Action(AddProtein).Undo(UndoAddProtein).
Action(AddToppings).Undo(UndoAddToppings).
Action(CloseSandwich).Undo(UndoCloseSandwich).
RegisterTo(registry)
storage := saga.NewMemoryStorage()
executor := saga.NewExecutor(storage, saga.WithRegistry(registry))
ctx := context.Background()
err := executor.Start("simple-sandwich").
Input("breadtype", "rye").
Input("condiment", "butter").
Input("protein", "pastrami").
// Note: no toppings - that's ok, it's optional!
WithID("order-3").
Execute(ctx)
if err != nil {
fmt.Printf("Failed: %v\n", err)
return
}
// Get the final result
exec, _ := storage.Get(ctx, "order-3")
var result CloseSandwichOut
json.Unmarshal(exec.ExecutedActions["close-sandwich"].Output, &result)
fmt.Printf("Result: %s\n", result.Sandwich)
fmt.Println("\nKitchen log:")
for _, entry := range kitchen.log {
fmt.Println(" -", entry)
}
}
Output: Result: [rye slice with butter + pastrami] Kitchen log: - Got rye from pantry - Spread butter on rye slice - Layered pastrami on rye slice with butter - No toppings requested - Closed sandwich with top slice
Index ¶
- Variables
- func Get[T any](ctx context.Context) T
- func TryGet[T any](ctx context.Context) (T, bool)
- func UndoNested(ctx context.Context, executionID string) error
- type Action
- type ActionBuilder
- type ActionInputs
- type ActionNode
- type ActionResult
- type Builder
- type Definition
- type EntityStorage
- type Execution
- type Executor
- type ExecutorOption
- type MemoryStorage
- type NestedOption
- type NestedResult
- type Registry
- type StartBuilder
- type Status
- type Storage
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrExecutionNotFound = errors.New("execution not found")
ErrExecutionNotFound is returned by Storage.Get when no execution exists for the given ID.
Functions ¶
func Get ¶
Get retrieves a dependency of type T from the context. Panics if the dependency is not found.
Types ¶
type Action ¶
type Action interface {
// Execute performs the action and returns an output that can be used
// by subsequent actions. The output must be JSON-serializable.
Execute(ctx context.Context, inputs ActionInputs) (output any, err error)
// Undo reverses the action. It receives the same inputs and the output
// that was produced by Execute. Undo should be idempotent.
Undo(ctx context.Context, inputs ActionInputs, output any) error
}
Action represents a single step in a saga. Actions are stateless and created by factories that are registered at application startup. All runtime data flows through ActionInputs.
type ActionBuilder ¶
type ActionBuilder struct {
// contains filtered or unexported fields
}
ActionBuilder provides a fluent API for defining a single action.
func (*ActionBuilder) Undo ¶
func (ab *ActionBuilder) Undo(undo any) *Builder
Undo sets the undo function for the action. The function signature must be: func(ctx context.Context, in InType, out OutType) error
type ActionInputs ¶
type ActionInputs interface {
// Get retrieves an input by key, deserializing it into target.
// Returns an error if the key doesn't exist or deserialization fails.
Get(key string, target any) error
// Has checks if an input exists (for optional inputs).
Has(key string) bool
// Keys returns all available input keys.
Keys() []string
}
ActionInputs provides access to initial saga inputs and outputs from prior actions. All outputs live in a flat namespace.
type ActionNode ¶
type ActionNode struct {
// Name is the unique name of this action within the saga.
Name string
// Action is the stateless action implementation.
Action Action
// InputKeys are the saga keys this action reads from.
InputKeys []string
// OutputKeys are the saga keys this action writes to.
OutputKeys []string
// Dependencies are action names that must complete before this action.
// Computed from InputKeys and other actions' OutputKeys.
Dependencies []string
}
ActionNode describes a single action within a saga definition.
type ActionResult ¶
type ActionResult struct {
// Output is the JSON-serialized output from the action.
Output []byte `json:"output,omitempty"`
// ExecutedAt is when the action was executed.
ExecutedAt time.Time `json:"executed_at"`
// UndoneAt is when the action was undone (nil if not undone).
UndoneAt *time.Time `json:"undone_at,omitempty"`
// Error is set if the action failed during execution.
Error string `json:"error,omitempty"`
}
ActionResult stores the outcome of a single action execution.
type Builder ¶
type Builder struct {
// contains filtered or unexported fields
}
Builder provides a fluent API for defining sagas.
func UsingAs ¶
UsingAs adds a dependency keyed by type T, allowing retrieval via Get[T](ctx). This is useful for injecting implementations that should be retrieved by interface type. For example: UsingAs[MyInterface](b, impl) allows Get[MyInterface](ctx).
func (*Builder) Action ¶
func (b *Builder) Action(args ...any) *ActionBuilder
Action adds an action to the saga using a typed execute function. The function signature must be: func(ctx context.Context, in InType) (OutType, error)
Can be called two ways:
- Action(GetBread) - name derived from function name ("getbread")
- Action("custom-name", GetBread) - explicit name
func (*Builder) Build ¶
func (b *Builder) Build() (*Definition, error)
Build constructs and validates the Definition without registering it. Useful for testing.
func (*Builder) Register ¶
Register validates and registers the saga definition with the global registry. Returns an error if validation fails (cycles, duplicate outputs, type mismatches).
func (*Builder) RegisterTo ¶
RegisterTo validates and registers the saga definition with the given registry. Useful for testing to avoid global state.
type Definition ¶
type Definition struct {
// Name uniquely identifies this saga definition.
Name string
// Version is incremented for breaking changes. Defaults to 1.
Version int
// Actions in this saga, keyed by action name.
Actions map[string]*ActionNode
// contains filtered or unexported fields
}
Definition describes a saga's structure - its actions and their dependencies. Definitions are stateless and registered at application startup.
func GetDefinition ¶
func GetDefinition(name string) (*Definition, bool)
GetDefinition retrieves a saga definition from the global registry.
func (*Definition) ExecutionOrder ¶ added in v0.4.0
func (d *Definition) ExecutionOrder() []string
ExecutionOrder returns the computed execution order for the saga's actions.
type EntityStorage ¶
type EntityStorage struct {
// contains filtered or unexported fields
}
EntityStorage implements Storage using the entity store.
func NewEntityStorage ¶
func NewEntityStorage(store entity.Store, log *slog.Logger) *EntityStorage
NewEntityStorage creates a storage backed by an entity store.
func (*EntityStorage) ListIncomplete ¶
func (s *EntityStorage) ListIncomplete(ctx context.Context) ([]*Execution, error)
ListIncomplete returns all executions that need recovery.
type Execution ¶
type Execution struct {
// ID is the unique identifier for this execution.
ID string `json:"id"`
// DefinitionName references the registered saga definition.
DefinitionName string `json:"definition_name"`
// DefinitionVersion is the version of the definition when started.
DefinitionVersion int `json:"definition_version"`
// InitialInputs contains the bootstrap data for the saga.
// All values must be JSON-serializable.
InitialInputs map[string]any `json:"initial_inputs"`
// Status is the current state of the execution.
Status Status `json:"status"`
// ExecutedActions maps action names to their results.
ExecutedActions map[string]*ActionResult `json:"executed_actions"`
// ExecutionOrder records the order actions were executed for reverse undo.
ExecutionOrder []string `json:"execution_order"`
// ParentExecutionID links this execution to a parent saga when run as a nested child.
ParentExecutionID string `json:"parent_execution_id,omitempty"`
// Error is set if the saga failed.
Error string `json:"error,omitempty"`
// CreatedAt is when the execution was created.
CreatedAt time.Time `json:"created_at"`
// UpdatedAt is when the execution was last updated.
UpdatedAt time.Time `json:"updated_at"`
}
Execution tracks the runtime state of a saga, persisted after each step.
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor orchestrates saga execution with durable logging.
func NewExecutor ¶
func NewExecutor(storage Storage, opts ...ExecutorOption) *Executor
NewExecutor creates an executor with the given storage and options.
func (*Executor) ExecutionOutputs ¶ added in v0.4.0
ExecutionOutputs loads a completed execution from storage and collects its outputs into a NestedResult. Useful for reading saga results without a capture struct.
func (*Executor) Start ¶
func (e *Executor) Start(definitionName string) *StartBuilder
Start begins building a saga execution.
type ExecutorOption ¶
type ExecutorOption func(*Executor)
ExecutorOption configures an Executor.
func WithLogger ¶
func WithLogger(log *slog.Logger) ExecutorOption
WithLogger sets a custom logger for the executor.
func WithRegistry ¶
func WithRegistry(r *Registry) ExecutorOption
WithRegistry sets a custom registry for the executor. Useful for testing to avoid global state.
type MemoryStorage ¶
type MemoryStorage struct {
// contains filtered or unexported fields
}
MemoryStorage is a simple in-memory storage implementation for testing and examples.
func NewMemoryStorage ¶
func NewMemoryStorage() *MemoryStorage
NewMemoryStorage creates a new in-memory storage.
func (*MemoryStorage) ListIncomplete ¶
func (m *MemoryStorage) ListIncomplete(ctx context.Context) ([]*Execution, error)
ListIncomplete returns all executions that need recovery. This includes pending (crashed before starting), running, and undoing sagas.
type NestedOption ¶ added in v0.4.0
type NestedOption func(*nestedConfig)
NestedOption configures a RunNested call.
func WithNestedID ¶ added in v0.4.0
func WithNestedID(id string) NestedOption
WithNestedID sets a specific execution ID for the child saga.
func WithNestedInput ¶ added in v0.4.0
func WithNestedInput(key string, value any) NestedOption
WithNestedInput adds an initial input to the child saga.
type NestedResult ¶ added in v0.4.0
type NestedResult struct {
ExecutionID string
// contains filtered or unexported fields
}
NestedResult wraps the outputs from a completed child saga execution.
func RunNested ¶ added in v0.4.0
func RunNested(ctx context.Context, sagaName string, opts ...NestedOption) (*NestedResult, error)
RunNested executes a child saga from within a parent saga action. It reuses the parent executor's registry and storage for durability and observability. The child execution's ParentExecutionID is set to the current execution.
func (*NestedResult) Get ¶ added in v0.4.0
func (nr *NestedResult) Get(key string, target any) error
Get deserializes a named output from the child saga into target.
func (*NestedResult) Has ¶ added in v0.4.0
func (nr *NestedResult) Has(key string) bool
Has returns true if the child saga produced an output with the given key.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry holds registered saga definitions.
func NewRegistry ¶
func NewRegistry() *Registry
NewRegistry creates a new empty registry. Useful for testing to avoid global state.
func (*Registry) Get ¶
func (r *Registry) Get(name string) (*Definition, bool)
Get retrieves a definition by name.
func (*Registry) Register ¶
func (r *Registry) Register(def *Definition) error
Register adds a definition to the registry.
type StartBuilder ¶
type StartBuilder struct {
// contains filtered or unexported fields
}
StartBuilder provides a fluent API for starting saga executions.
func (*StartBuilder) Execute ¶
func (sb *StartBuilder) Execute(ctx context.Context) error
Execute runs the saga to completion or failure.
func (*StartBuilder) Input ¶
func (sb *StartBuilder) Input(key string, value any) *StartBuilder
Input adds an initial input value to the saga execution.
func (*StartBuilder) WithID ¶
func (sb *StartBuilder) WithID(id string) *StartBuilder
WithID sets a specific execution ID (otherwise one is generated).
type Status ¶
type Status string
Status represents the current state of a saga execution.
const ( // StatusPending indicates the saga has been created but not started. StatusPending Status = "pending" // StatusRunning indicates the saga is actively executing actions. StatusRunning Status = "running" // StatusUndoing indicates the saga is rolling back due to a failure. StatusUndoing Status = "undoing" // StatusCompleted indicates all actions completed successfully. StatusCompleted Status = "completed" // StatusFailed indicates the saga failed and all undos have been attempted. StatusFailed Status = "failed" )
type Storage ¶
type Storage interface {
// Save persists the execution state.
Save(ctx context.Context, exec *Execution) error
// Get retrieves an execution by ID.
Get(ctx context.Context, id string) (*Execution, error)
// ListIncomplete returns all executions that need recovery (Pending, Running, or Undoing).
ListIncomplete(ctx context.Context) ([]*Execution, error)
}
Storage persists saga execution state.