Documentation
¶
Overview ¶
Package saga implements the Saga pattern for distributed operations with crash recovery. Each saga is a sequence of steps where each step has a corresponding undo operation. The framework guarantees that either all steps complete successfully or all completed steps are rolled back.
See RFD-35 for detailed design documentation.
Example (MakeSandwich) ¶
Example_makeSandwich demonstrates a successful sandwich-making saga.
package main
import (
"context"
"encoding/json"
"fmt"
"strings"
"miren.dev/runtime/pkg/saga"
)
// Kitchen tracks what happened during sandwich making.
type Kitchen struct {
log []string
}
func (k *Kitchen) record(action string) {
k.log = append(k.log, action)
}
// Pantry holds bread and dry goods.
type Pantry struct {
stock map[string]int
}
func NewPantry(stock map[string]int) *Pantry {
return &Pantry{stock: stock}
}
func (p *Pantry) Take(item string) error {
if p.stock[item] <= 0 {
return fmt.Errorf("out of %s", item)
}
p.stock[item]--
return nil
}
func (p *Pantry) Return(item string) {
p.stock[item]++
}
// Fridge holds proteins, condiments, and cold items.
type Fridge struct {
stock map[string]int
}
func NewFridge(stock map[string]int) *Fridge {
return &Fridge{stock: stock}
}
func (f *Fridge) Take(item string) error {
if f.stock[item] <= 0 {
return fmt.Errorf("out of %s", item)
}
f.stock[item]--
return nil
}
func (f *Fridge) Return(item string) {
f.stock[item]++
}
type GetBreadIn struct {
BreadType string
}
type GetBreadOut struct {
Bread string
}
func GetBread(ctx context.Context, in GetBreadIn) (GetBreadOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
pantry := saga.Get[*Pantry](ctx)
if err := pantry.Take(in.BreadType); err != nil {
kitchen.record(fmt.Sprintf("Checked pantry - %v", err))
return GetBreadOut{}, err
}
kitchen.record(fmt.Sprintf("Got %s from pantry", in.BreadType))
return GetBreadOut{Bread: in.BreadType + " slice"}, nil
}
func UndoGetBread(ctx context.Context, in GetBreadIn, out GetBreadOut) error {
kitchen := saga.Get[*Kitchen](ctx)
pantry := saga.Get[*Pantry](ctx)
pantry.Return(in.BreadType)
kitchen.record(fmt.Sprintf("Returned %s to pantry", in.BreadType))
return nil
}
type AddCondimentIn struct {
Bread string
Condiment string
}
type AddCondimentOut struct {
PreparedBread string
}
func AddCondiment(ctx context.Context, in AddCondimentIn) (AddCondimentOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
if err := fridge.Take(in.Condiment); err != nil {
kitchen.record(fmt.Sprintf("Checked fridge - %v", err))
return AddCondimentOut{}, err
}
kitchen.record(fmt.Sprintf("Spread %s on %s", in.Condiment, in.Bread))
return AddCondimentOut{
PreparedBread: in.Bread + " with " + in.Condiment,
}, nil
}
func UndoAddCondiment(ctx context.Context, in AddCondimentIn, out AddCondimentOut) error {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
fridge.Return(in.Condiment)
kitchen.record(fmt.Sprintf("Scraped %s back into jar", in.Condiment))
return nil
}
type AddProteinIn struct {
PreparedBread string
Protein string
}
type AddProteinOut struct {
Stack string
}
func AddProtein(ctx context.Context, in AddProteinIn) (AddProteinOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
if err := fridge.Take(in.Protein); err != nil {
kitchen.record(fmt.Sprintf("Checked fridge - %v", err))
return AddProteinOut{}, err
}
kitchen.record(fmt.Sprintf("Layered %s on %s", in.Protein, in.PreparedBread))
return AddProteinOut{
Stack: in.PreparedBread + " + " + in.Protein,
}, nil
}
func UndoAddProtein(ctx context.Context, in AddProteinIn, out AddProteinOut) error {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
fridge.Return(in.Protein)
kitchen.record(fmt.Sprintf("Put %s back in fridge", in.Protein))
return nil
}
type AddToppingsIn struct {
Stack string
Toppings []string `saga:",optional"`
}
type AddToppingsOut struct {
OpenSandwich string
}
func AddToppings(ctx context.Context, in AddToppingsIn) (AddToppingsOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
result := in.Stack
if len(in.Toppings) > 0 {
toppingList := strings.Join(in.Toppings, ", ")
kitchen.record(fmt.Sprintf("Added %s", toppingList))
result += " + " + toppingList
} else {
kitchen.record("No toppings requested")
}
return AddToppingsOut{OpenSandwich: result}, nil
}
func UndoAddToppings(ctx context.Context, in AddToppingsIn, out AddToppingsOut) error {
kitchen := saga.Get[*Kitchen](ctx)
if len(in.Toppings) > 0 {
kitchen.record(fmt.Sprintf("Removed %s", strings.Join(in.Toppings, ", ")))
}
return nil
}
type CloseSandwichIn struct {
OpenSandwich string
}
type CloseSandwichOut struct {
Sandwich string
}
func CloseSandwich(ctx context.Context, in CloseSandwichIn) (CloseSandwichOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
kitchen.record("Closed sandwich with top slice")
return CloseSandwichOut{
Sandwich: "[" + in.OpenSandwich + "]",
}, nil
}
func UndoCloseSandwich(ctx context.Context, in CloseSandwichIn, out CloseSandwichOut) error {
kitchen := saga.Get[*Kitchen](ctx)
kitchen.record("Opened sandwich back up")
return nil
}
func main() {
kitchen := &Kitchen{}
pantry := NewPantry(map[string]int{
"sourdough": 2,
"wheat": 1,
"rye": 1,
})
fridge := NewFridge(map[string]int{
"mayo": 3,
"mustard": 2,
"ham": 4,
"turkey": 2,
"pastrami": 1,
})
registry := saga.NewRegistry()
saga.Define("make-sandwich").
Using(kitchen).
Using(pantry).
Using(fridge).
Action(GetBread).Undo(UndoGetBread).
Action(AddCondiment).Undo(UndoAddCondiment).
Action(AddProtein).Undo(UndoAddProtein).
Action(AddToppings).Undo(UndoAddToppings).
Action(CloseSandwich).Undo(UndoCloseSandwich).
RegisterTo(registry)
storage := saga.NewMemoryStorage()
executor := saga.NewExecutor(storage, saga.WithRegistry(registry))
ctx := context.Background()
err := executor.Start("make-sandwich").
Input("breadtype", "sourdough").
Input("condiment", "mayo").
Input("protein", "ham").
Input("toppings", []string{"lettuce", "tomato"}).
WithID("order-1").
Execute(ctx)
if err != nil {
fmt.Printf("Failed: %v\n", err)
return
}
// Get the final result
exec, _ := storage.Get(ctx, "order-1")
var result CloseSandwichOut
json.Unmarshal(exec.ExecutedActions["close-sandwich"].Output, &result)
fmt.Printf("Result: %s\n", result.Sandwich)
fmt.Println("\nKitchen log:")
for _, entry := range kitchen.log {
fmt.Println(" -", entry)
}
}
Output: Result: [sourdough slice with mayo + ham + lettuce, tomato] Kitchen log: - Got sourdough from pantry - Spread mayo on sourdough slice - Layered ham on sourdough slice with mayo - Added lettuce, tomato - Closed sandwich with top slice
Example (OutOfStock) ¶
Example_outOfStock demonstrates saga compensation when the fridge is empty.
package main
import (
"context"
"fmt"
"strings"
"miren.dev/runtime/pkg/saga"
)
// Kitchen tracks what happened during sandwich making.
type Kitchen struct {
log []string
}
func (k *Kitchen) record(action string) {
k.log = append(k.log, action)
}
// Pantry holds bread and dry goods.
type Pantry struct {
stock map[string]int
}
func NewPantry(stock map[string]int) *Pantry {
return &Pantry{stock: stock}
}
func (p *Pantry) Take(item string) error {
if p.stock[item] <= 0 {
return fmt.Errorf("out of %s", item)
}
p.stock[item]--
return nil
}
func (p *Pantry) Return(item string) {
p.stock[item]++
}
// Fridge holds proteins, condiments, and cold items.
type Fridge struct {
stock map[string]int
}
func NewFridge(stock map[string]int) *Fridge {
return &Fridge{stock: stock}
}
func (f *Fridge) Take(item string) error {
if f.stock[item] <= 0 {
return fmt.Errorf("out of %s", item)
}
f.stock[item]--
return nil
}
func (f *Fridge) Return(item string) {
f.stock[item]++
}
type GetBreadIn struct {
BreadType string
}
type GetBreadOut struct {
Bread string
}
func GetBread(ctx context.Context, in GetBreadIn) (GetBreadOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
pantry := saga.Get[*Pantry](ctx)
if err := pantry.Take(in.BreadType); err != nil {
kitchen.record(fmt.Sprintf("Checked pantry - %v", err))
return GetBreadOut{}, err
}
kitchen.record(fmt.Sprintf("Got %s from pantry", in.BreadType))
return GetBreadOut{Bread: in.BreadType + " slice"}, nil
}
func UndoGetBread(ctx context.Context, in GetBreadIn, out GetBreadOut) error {
kitchen := saga.Get[*Kitchen](ctx)
pantry := saga.Get[*Pantry](ctx)
pantry.Return(in.BreadType)
kitchen.record(fmt.Sprintf("Returned %s to pantry", in.BreadType))
return nil
}
type AddCondimentIn struct {
Bread string
Condiment string
}
type AddCondimentOut struct {
PreparedBread string
}
func AddCondiment(ctx context.Context, in AddCondimentIn) (AddCondimentOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
if err := fridge.Take(in.Condiment); err != nil {
kitchen.record(fmt.Sprintf("Checked fridge - %v", err))
return AddCondimentOut{}, err
}
kitchen.record(fmt.Sprintf("Spread %s on %s", in.Condiment, in.Bread))
return AddCondimentOut{
PreparedBread: in.Bread + " with " + in.Condiment,
}, nil
}
func UndoAddCondiment(ctx context.Context, in AddCondimentIn, out AddCondimentOut) error {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
fridge.Return(in.Condiment)
kitchen.record(fmt.Sprintf("Scraped %s back into jar", in.Condiment))
return nil
}
type AddProteinIn struct {
PreparedBread string
Protein string
}
type AddProteinOut struct {
Stack string
}
func AddProtein(ctx context.Context, in AddProteinIn) (AddProteinOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
if err := fridge.Take(in.Protein); err != nil {
kitchen.record(fmt.Sprintf("Checked fridge - %v", err))
return AddProteinOut{}, err
}
kitchen.record(fmt.Sprintf("Layered %s on %s", in.Protein, in.PreparedBread))
return AddProteinOut{
Stack: in.PreparedBread + " + " + in.Protein,
}, nil
}
func UndoAddProtein(ctx context.Context, in AddProteinIn, out AddProteinOut) error {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
fridge.Return(in.Protein)
kitchen.record(fmt.Sprintf("Put %s back in fridge", in.Protein))
return nil
}
type AddToppingsIn struct {
Stack string
Toppings []string `saga:",optional"`
}
type AddToppingsOut struct {
OpenSandwich string
}
func AddToppings(ctx context.Context, in AddToppingsIn) (AddToppingsOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
result := in.Stack
if len(in.Toppings) > 0 {
toppingList := strings.Join(in.Toppings, ", ")
kitchen.record(fmt.Sprintf("Added %s", toppingList))
result += " + " + toppingList
} else {
kitchen.record("No toppings requested")
}
return AddToppingsOut{OpenSandwich: result}, nil
}
func UndoAddToppings(ctx context.Context, in AddToppingsIn, out AddToppingsOut) error {
kitchen := saga.Get[*Kitchen](ctx)
if len(in.Toppings) > 0 {
kitchen.record(fmt.Sprintf("Removed %s", strings.Join(in.Toppings, ", ")))
}
return nil
}
type CloseSandwichIn struct {
OpenSandwich string
}
type CloseSandwichOut struct {
Sandwich string
}
func CloseSandwich(ctx context.Context, in CloseSandwichIn) (CloseSandwichOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
kitchen.record("Closed sandwich with top slice")
return CloseSandwichOut{
Sandwich: "[" + in.OpenSandwich + "]",
}, nil
}
func UndoCloseSandwich(ctx context.Context, in CloseSandwichIn, out CloseSandwichOut) error {
kitchen := saga.Get[*Kitchen](ctx)
kitchen.record("Opened sandwich back up")
return nil
}
func main() {
kitchen := &Kitchen{}
pantry := NewPantry(map[string]int{
"wheat": 1,
})
fridge := NewFridge(map[string]int{
"mustard": 1,
"turkey": 0, // Out of turkey!
})
registry := saga.NewRegistry()
saga.Define("make-sandwich-fail").
Using(kitchen).
Using(pantry).
Using(fridge).
Action(GetBread).Undo(UndoGetBread).
Action(AddCondiment).Undo(UndoAddCondiment).
Action(AddProtein).Undo(UndoAddProtein).
Action(AddToppings).Undo(UndoAddToppings).
Action(CloseSandwich).Undo(UndoCloseSandwich).
RegisterTo(registry)
storage := saga.NewMemoryStorage()
executor := saga.NewExecutor(storage, saga.WithRegistry(registry))
ctx := context.Background()
err := executor.Start("make-sandwich-fail").
Input("breadtype", "wheat").
Input("condiment", "mustard").
Input("protein", "turkey").
Input("toppings", []string{"pickles"}).
Execute(ctx)
if err != nil {
fmt.Println("Sandwich failed - Loss Prevented")
}
fmt.Println("\nKitchen log:")
for _, entry := range kitchen.log {
fmt.Println(" -", entry)
}
fmt.Printf("\nInventory restored: wheat=%d, mustard=%d\n",
pantry.stock["wheat"], fridge.stock["mustard"])
}
Output: Sandwich failed - Loss Prevented Kitchen log: - Got wheat from pantry - Spread mustard on wheat slice - Checked fridge - out of turkey - Scraped mustard back into jar - Returned wheat to pantry Inventory restored: wheat=1, mustard=1
Example (SimpleSandwich) ¶
Example_simpleSandwich demonstrates optional toppings.
package main
import (
"context"
"encoding/json"
"fmt"
"strings"
"miren.dev/runtime/pkg/saga"
)
// Kitchen tracks what happened during sandwich making.
type Kitchen struct {
log []string
}
func (k *Kitchen) record(action string) {
k.log = append(k.log, action)
}
// Pantry holds bread and dry goods.
type Pantry struct {
stock map[string]int
}
func NewPantry(stock map[string]int) *Pantry {
return &Pantry{stock: stock}
}
func (p *Pantry) Take(item string) error {
if p.stock[item] <= 0 {
return fmt.Errorf("out of %s", item)
}
p.stock[item]--
return nil
}
func (p *Pantry) Return(item string) {
p.stock[item]++
}
// Fridge holds proteins, condiments, and cold items.
type Fridge struct {
stock map[string]int
}
func NewFridge(stock map[string]int) *Fridge {
return &Fridge{stock: stock}
}
func (f *Fridge) Take(item string) error {
if f.stock[item] <= 0 {
return fmt.Errorf("out of %s", item)
}
f.stock[item]--
return nil
}
func (f *Fridge) Return(item string) {
f.stock[item]++
}
type GetBreadIn struct {
BreadType string
}
type GetBreadOut struct {
Bread string
}
func GetBread(ctx context.Context, in GetBreadIn) (GetBreadOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
pantry := saga.Get[*Pantry](ctx)
if err := pantry.Take(in.BreadType); err != nil {
kitchen.record(fmt.Sprintf("Checked pantry - %v", err))
return GetBreadOut{}, err
}
kitchen.record(fmt.Sprintf("Got %s from pantry", in.BreadType))
return GetBreadOut{Bread: in.BreadType + " slice"}, nil
}
func UndoGetBread(ctx context.Context, in GetBreadIn, out GetBreadOut) error {
kitchen := saga.Get[*Kitchen](ctx)
pantry := saga.Get[*Pantry](ctx)
pantry.Return(in.BreadType)
kitchen.record(fmt.Sprintf("Returned %s to pantry", in.BreadType))
return nil
}
type AddCondimentIn struct {
Bread string
Condiment string
}
type AddCondimentOut struct {
PreparedBread string
}
func AddCondiment(ctx context.Context, in AddCondimentIn) (AddCondimentOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
if err := fridge.Take(in.Condiment); err != nil {
kitchen.record(fmt.Sprintf("Checked fridge - %v", err))
return AddCondimentOut{}, err
}
kitchen.record(fmt.Sprintf("Spread %s on %s", in.Condiment, in.Bread))
return AddCondimentOut{
PreparedBread: in.Bread + " with " + in.Condiment,
}, nil
}
func UndoAddCondiment(ctx context.Context, in AddCondimentIn, out AddCondimentOut) error {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
fridge.Return(in.Condiment)
kitchen.record(fmt.Sprintf("Scraped %s back into jar", in.Condiment))
return nil
}
type AddProteinIn struct {
PreparedBread string
Protein string
}
type AddProteinOut struct {
Stack string
}
func AddProtein(ctx context.Context, in AddProteinIn) (AddProteinOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
if err := fridge.Take(in.Protein); err != nil {
kitchen.record(fmt.Sprintf("Checked fridge - %v", err))
return AddProteinOut{}, err
}
kitchen.record(fmt.Sprintf("Layered %s on %s", in.Protein, in.PreparedBread))
return AddProteinOut{
Stack: in.PreparedBread + " + " + in.Protein,
}, nil
}
func UndoAddProtein(ctx context.Context, in AddProteinIn, out AddProteinOut) error {
kitchen := saga.Get[*Kitchen](ctx)
fridge := saga.Get[*Fridge](ctx)
fridge.Return(in.Protein)
kitchen.record(fmt.Sprintf("Put %s back in fridge", in.Protein))
return nil
}
type AddToppingsIn struct {
Stack string
Toppings []string `saga:",optional"`
}
type AddToppingsOut struct {
OpenSandwich string
}
func AddToppings(ctx context.Context, in AddToppingsIn) (AddToppingsOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
result := in.Stack
if len(in.Toppings) > 0 {
toppingList := strings.Join(in.Toppings, ", ")
kitchen.record(fmt.Sprintf("Added %s", toppingList))
result += " + " + toppingList
} else {
kitchen.record("No toppings requested")
}
return AddToppingsOut{OpenSandwich: result}, nil
}
func UndoAddToppings(ctx context.Context, in AddToppingsIn, out AddToppingsOut) error {
kitchen := saga.Get[*Kitchen](ctx)
if len(in.Toppings) > 0 {
kitchen.record(fmt.Sprintf("Removed %s", strings.Join(in.Toppings, ", ")))
}
return nil
}
type CloseSandwichIn struct {
OpenSandwich string
}
type CloseSandwichOut struct {
Sandwich string
}
func CloseSandwich(ctx context.Context, in CloseSandwichIn) (CloseSandwichOut, error) {
kitchen := saga.Get[*Kitchen](ctx)
kitchen.record("Closed sandwich with top slice")
return CloseSandwichOut{
Sandwich: "[" + in.OpenSandwich + "]",
}, nil
}
func UndoCloseSandwich(ctx context.Context, in CloseSandwichIn, out CloseSandwichOut) error {
kitchen := saga.Get[*Kitchen](ctx)
kitchen.record("Opened sandwich back up")
return nil
}
func main() {
kitchen := &Kitchen{}
pantry := NewPantry(map[string]int{"rye": 1})
fridge := NewFridge(map[string]int{"butter": 1, "pastrami": 1})
registry := saga.NewRegistry()
saga.Define("simple-sandwich").
Using(kitchen).
Using(pantry).
Using(fridge).
Action(GetBread).Undo(UndoGetBread).
Action(AddCondiment).Undo(UndoAddCondiment).
Action(AddProtein).Undo(UndoAddProtein).
Action(AddToppings).Undo(UndoAddToppings).
Action(CloseSandwich).Undo(UndoCloseSandwich).
RegisterTo(registry)
storage := saga.NewMemoryStorage()
executor := saga.NewExecutor(storage, saga.WithRegistry(registry))
ctx := context.Background()
err := executor.Start("simple-sandwich").
Input("breadtype", "rye").
Input("condiment", "butter").
Input("protein", "pastrami").
// Note: no toppings - that's ok, it's optional!
WithID("order-3").
Execute(ctx)
if err != nil {
fmt.Printf("Failed: %v\n", err)
return
}
// Get the final result
exec, _ := storage.Get(ctx, "order-3")
var result CloseSandwichOut
json.Unmarshal(exec.ExecutedActions["close-sandwich"].Output, &result)
fmt.Printf("Result: %s\n", result.Sandwich)
fmt.Println("\nKitchen log:")
for _, entry := range kitchen.log {
fmt.Println(" -", entry)
}
}
Output: Result: [rye slice with butter + pastrami] Kitchen log: - Got rye from pantry - Spread butter on rye slice - Layered pastrami on rye slice with butter - No toppings requested - Closed sandwich with top slice
Index ¶
- Variables
- func Get[T any](ctx context.Context) T
- func TryGet[T any](ctx context.Context) (T, bool)
- func UndoNested(ctx context.Context, executionID string) error
- type Action
- type ActionBuilder
- type ActionInputs
- type ActionNode
- type ActionResult
- type Builder
- type Definition
- type EACStorage
- type Edge
- type EntityStorage
- type Execution
- type Executor
- type ExecutorOption
- type MemoryStorage
- type NestedOption
- type NestedResult
- type Registry
- type StartBuilder
- type Status
- type Storage
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrExecutionNotFound = errors.New("execution not found")
ErrExecutionNotFound is returned by Storage.Get when no execution exists for the given ID.
Functions ¶
func Get ¶
Get retrieves a dependency of type T from the context. Panics if the dependency is not found.
Types ¶
type Action ¶
type Action interface {
// Execute performs the action and returns an output that can be used
// by subsequent actions. The output must be JSON-serializable.
Execute(ctx context.Context, inputs ActionInputs) (output any, err error)
// Undo reverses the action. It receives the same inputs and the output
// that was produced by Execute. Undo should be idempotent.
Undo(ctx context.Context, inputs ActionInputs, output any) error
}
Action represents a single step in a saga. Actions are stateless and created by factories that are registered at application startup. All runtime data flows through ActionInputs.
type ActionBuilder ¶
type ActionBuilder struct {
// contains filtered or unexported fields
}
ActionBuilder provides a fluent API for defining a single action.
func (*ActionBuilder) Undo ¶
func (ab *ActionBuilder) Undo(undo any) *Builder
Undo sets the undo function for the action. The function signature must be: func(ctx context.Context, in InType, out OutType) error
type ActionInputs ¶
type ActionInputs interface {
// Get retrieves an input by key, deserializing it into target.
// Returns an error if the key doesn't exist or deserialization fails.
Get(key string, target any) error
// Has checks if an input exists (for optional inputs).
Has(key string) bool
// Keys returns all available input keys.
Keys() []string
}
ActionInputs provides access to initial saga inputs and outputs from prior actions. All outputs live in a flat namespace.
type ActionNode ¶
type ActionNode struct {
// Name is the unique name of this action within the saga.
Name string
// Action is the stateless action implementation.
Action Action
// InputKeys are the saga keys this action reads from.
InputKeys []string
// OutputKeys are the saga keys this action writes to.
OutputKeys []string
// Dependencies are action names that must complete before this action.
// Computed from InputKeys and other actions' OutputKeys.
Dependencies []string
}
ActionNode describes a single action within a saga definition.
type ActionResult ¶
type ActionResult struct {
// Output is the JSON-serialized output from the action.
Output []byte `json:"output,omitempty"`
// ExecutedAt is when the action was executed.
ExecutedAt time.Time `json:"executed_at"`
// UndoneAt is when the action was undone (nil if not undone).
UndoneAt *time.Time `json:"undone_at,omitempty"`
// Error is set if the action failed during execution.
Error string `json:"error,omitempty"`
}
ActionResult stores the outcome of a single action execution.
type Builder ¶
type Builder struct {
// contains filtered or unexported fields
}
Builder provides a fluent API for defining sagas.
func UsingAs ¶
UsingAs adds a dependency keyed by type T, allowing retrieval via Get[T](ctx). This is useful for injecting implementations that should be retrieved by interface type. For example: UsingAs[MyInterface](b, impl) allows Get[MyInterface](ctx).
func (*Builder) Action ¶
func (b *Builder) Action(args ...any) *ActionBuilder
Action adds an action to the saga using a typed execute function. The function signature must be: func(ctx context.Context, in InType) (OutType, error)
Can be called two ways:
- Action(GetBread) - name derived from function name ("getbread")
- Action("custom-name", GetBread) - explicit name
func (*Builder) Build ¶
func (b *Builder) Build() (*Definition, error)
Build constructs and validates the Definition without registering it. Useful for testing.
func (*Builder) Register ¶
Register validates and registers the saga definition with the global registry. Returns an error if validation fails (cycles, duplicate outputs, type mismatches).
func (*Builder) RegisterTo ¶
RegisterTo validates and registers the saga definition with the given registry. Useful for testing to avoid global state.
type Definition ¶
type Definition struct {
// Name uniquely identifies this saga definition.
Name string
// Version is incremented for breaking changes. Defaults to 1.
Version int
// Actions in this saga, keyed by action name.
Actions map[string]*ActionNode
// contains filtered or unexported fields
}
Definition describes a saga's structure - its actions and their dependencies. Definitions are stateless and registered at application startup.
func GetDefinition ¶
func GetDefinition(name string) (*Definition, bool)
GetDefinition retrieves a saga definition from the global registry.
func (*Definition) ExecutionOrder ¶ added in v0.4.0
func (d *Definition) ExecutionOrder() []string
ExecutionOrder returns the computed execution order for the saga's actions.
type EACStorage ¶ added in v0.6.0
type EACStorage struct {
// contains filtered or unexported fields
}
EACStorage implements Storage using an EntityAccessClient RPC connection. This is used by runners which don't have direct entity.Store access.
func NewEACStorage ¶ added in v0.6.0
func NewEACStorage(eac *es.EntityAccessClient, log *slog.Logger) *EACStorage
NewEACStorage creates a storage backed by an EntityAccessClient.
func (*EACStorage) ListIncomplete ¶ added in v0.6.0
func (s *EACStorage) ListIncomplete(ctx context.Context) ([]*Execution, error)
ListIncomplete returns all executions that need recovery via EAC.
type Edge ¶ added in v0.6.0
type Edge struct{}
Edge is a zero-size type used to declare ordering dependencies between saga actions without carrying data. Edge fields participate in the dependency graph (via saga struct tags) but are skipped during serialization and deserialization at runtime.
type EntityStorage ¶
type EntityStorage struct {
// contains filtered or unexported fields
}
EntityStorage implements Storage using the entity store.
func NewEntityStorage ¶
func NewEntityStorage(store entity.Store, log *slog.Logger) *EntityStorage
NewEntityStorage creates a storage backed by an entity store.
func (*EntityStorage) ListIncomplete ¶
func (s *EntityStorage) ListIncomplete(ctx context.Context) ([]*Execution, error)
ListIncomplete returns all executions that need recovery.
type Execution ¶
type Execution struct {
// ID is the unique identifier for this execution.
ID string `json:"id"`
// DefinitionName references the registered saga definition.
DefinitionName string `json:"definition_name"`
// DefinitionVersion is the version of the definition when started.
DefinitionVersion int `json:"definition_version"`
// InitialInputs contains the bootstrap data for the saga.
// All values must be JSON-serializable.
InitialInputs map[string]any `json:"initial_inputs"`
// Status is the current state of the execution.
Status Status `json:"status"`
// ExecutedActions maps action names to their results.
ExecutedActions map[string]*ActionResult `json:"executed_actions"`
// ExecutionOrder records the order actions were executed for reverse undo.
ExecutionOrder []string `json:"execution_order"`
// ParentExecutionID links this execution to a parent saga when run as a nested child.
ParentExecutionID string `json:"parent_execution_id,omitempty"`
// Error is set if the saga failed.
Error string `json:"error,omitempty"`
// CreatedAt is when the execution was created.
CreatedAt time.Time `json:"created_at"`
// UpdatedAt is when the execution was last updated.
UpdatedAt time.Time `json:"updated_at"`
}
Execution tracks the runtime state of a saga, persisted after each step.
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor orchestrates saga execution with durable logging.
func NewExecutor ¶
func NewExecutor(storage Storage, opts ...ExecutorOption) *Executor
NewExecutor creates an executor with the given storage and options.
func (*Executor) ExecutionOutputs ¶ added in v0.4.0
ExecutionOutputs loads a completed execution from storage and collects its outputs into a NestedResult. Useful for reading saga results without a capture struct.
func (*Executor) Start ¶
func (e *Executor) Start(definitionName string) *StartBuilder
Start begins building a saga execution.
type ExecutorOption ¶
type ExecutorOption func(*Executor)
ExecutorOption configures an Executor.
func WithLogger ¶
func WithLogger(log *slog.Logger) ExecutorOption
WithLogger sets a custom logger for the executor.
func WithRegistry ¶
func WithRegistry(r *Registry) ExecutorOption
WithRegistry sets a custom registry for the executor. Useful for testing to avoid global state.
type MemoryStorage ¶
type MemoryStorage struct {
// contains filtered or unexported fields
}
MemoryStorage is a simple in-memory storage implementation for testing and examples.
func NewMemoryStorage ¶
func NewMemoryStorage() *MemoryStorage
NewMemoryStorage creates a new in-memory storage.
func (*MemoryStorage) ListIncomplete ¶
func (m *MemoryStorage) ListIncomplete(ctx context.Context) ([]*Execution, error)
ListIncomplete returns all executions that need recovery. This includes pending (crashed before starting), running, and undoing sagas.
type NestedOption ¶ added in v0.4.0
type NestedOption func(*nestedConfig)
NestedOption configures a RunNested call.
func WithNestedID ¶ added in v0.4.0
func WithNestedID(id string) NestedOption
WithNestedID sets a specific execution ID for the child saga.
func WithNestedInput ¶ added in v0.4.0
func WithNestedInput(key string, value any) NestedOption
WithNestedInput adds an initial input to the child saga.
type NestedResult ¶ added in v0.4.0
type NestedResult struct {
ExecutionID string
// contains filtered or unexported fields
}
NestedResult wraps the outputs from a completed child saga execution.
func RunNested ¶ added in v0.4.0
func RunNested(ctx context.Context, sagaName string, opts ...NestedOption) (*NestedResult, error)
RunNested executes a child saga from within a parent saga action. It reuses the parent executor's registry and storage for durability and observability. The child execution's ParentExecutionID is set to the current execution.
func (*NestedResult) Get ¶ added in v0.4.0
func (nr *NestedResult) Get(key string, target any) error
Get deserializes a named output from the child saga into target.
func (*NestedResult) Has ¶ added in v0.4.0
func (nr *NestedResult) Has(key string) bool
Has returns true if the child saga produced an output with the given key.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry holds registered saga definitions.
func NewRegistry ¶
func NewRegistry() *Registry
NewRegistry creates a new empty registry. Useful for testing to avoid global state.
func (*Registry) Get ¶
func (r *Registry) Get(name string) (*Definition, bool)
Get retrieves a definition by name.
func (*Registry) Register ¶
func (r *Registry) Register(def *Definition) error
Register adds a definition to the registry.
type StartBuilder ¶
type StartBuilder struct {
// contains filtered or unexported fields
}
StartBuilder provides a fluent API for starting saga executions.
func (*StartBuilder) Execute ¶
func (sb *StartBuilder) Execute(ctx context.Context) error
Execute runs the saga to completion or failure.
func (*StartBuilder) Input ¶
func (sb *StartBuilder) Input(key string, value any) *StartBuilder
Input adds an initial input value to the saga execution.
func (*StartBuilder) WithID ¶
func (sb *StartBuilder) WithID(id string) *StartBuilder
WithID sets a specific execution ID (otherwise one is generated).
type Status ¶
type Status string
Status represents the current state of a saga execution.
const ( // StatusPending indicates the saga has been created but not started. StatusPending Status = "pending" // StatusRunning indicates the saga is actively executing actions. StatusRunning Status = "running" // StatusUndoing indicates the saga is rolling back due to a failure. StatusUndoing Status = "undoing" // StatusCompleted indicates all actions completed successfully. StatusCompleted Status = "completed" // StatusFailed indicates the saga failed and all undos have been attempted. StatusFailed Status = "failed" )
type Storage ¶
type Storage interface {
// Save persists the execution state.
Save(ctx context.Context, exec *Execution) error
// Get retrieves an execution by ID.
Get(ctx context.Context, id string) (*Execution, error)
// ListIncomplete returns all executions that need recovery (Pending, Running, or Undoing).
ListIncomplete(ctx context.Context) ([]*Execution, error)
}
Storage persists saga execution state.