saga

package
v0.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2026 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package saga implements the Saga pattern for distributed operations with crash recovery. Each saga is a sequence of steps where each step has a corresponding undo operation. The framework guarantees that either all steps complete successfully or all completed steps are rolled back.

See RFD-35 for detailed design documentation.

Example (MakeSandwich)

Example_makeSandwich demonstrates a successful sandwich-making saga.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"

	"miren.dev/runtime/pkg/saga"
)

// Kitchen is an append-only journal of what happened while a sandwich was
// being made.
type Kitchen struct {
	log []string // entries, oldest first
}

// record appends action to the end of the kitchen log.
func (k *Kitchen) record(action string) {
	entries := append(k.log, action)
	k.log = entries
}

// Pantry holds bread and dry goods.
type Pantry struct {
	stock map[string]int // item name -> units on hand
}

// NewPantry returns a Pantry backed by stock. The map is stored as-is (not
// copied). A nil map is accepted and behaves like an empty pantry.
func NewPantry(stock map[string]int) *Pantry {
	return &Pantry{stock: stock}
}

// Take removes one unit of item from the pantry. When no unit is available
// (including items never stocked) it leaves the stock untouched and returns
// an "out of <item>" error.
func (p *Pantry) Take(item string) error {
	if p.stock[item] <= 0 {
		return fmt.Errorf("out of %s", item)
	}
	p.stock[item]--
	return nil
}

// Return puts one unit of item back into the pantry.
func (p *Pantry) Return(item string) {
	if p.stock == nil {
		// Writing to a nil map panics; allocate lazily so a Pantry created
		// with NewPantry(nil) can still accept returned items.
		p.stock = make(map[string]int)
	}
	p.stock[item]++
}

// Fridge holds proteins, condiments, and cold items.
type Fridge struct {
	stock map[string]int // item name -> units on hand
}

// NewFridge returns a Fridge backed by stock. The map is stored as-is (not
// copied). A nil map is accepted and behaves like an empty fridge.
func NewFridge(stock map[string]int) *Fridge {
	return &Fridge{stock: stock}
}

// Take removes one unit of item from the fridge. When no unit is available
// (including items never stocked) it leaves the stock untouched and returns
// an "out of <item>" error.
func (f *Fridge) Take(item string) error {
	if f.stock[item] <= 0 {
		return fmt.Errorf("out of %s", item)
	}
	f.stock[item]--
	return nil
}

// Return puts one unit of item back into the fridge.
func (f *Fridge) Return(item string) {
	if f.stock == nil {
		// Writing to a nil map panics; allocate lazily so a Fridge created
		// with NewFridge(nil) can still accept returned items.
		f.stock = make(map[string]int)
	}
	f.stock[item]++
}

// GetBreadIn is the input of the GetBread action.
type GetBreadIn struct {
	BreadType string // kind of bread to take from the pantry
}

// GetBreadOut is the output of the GetBread action.
type GetBreadOut struct {
	Bread string // the bread type followed by " slice"
}

// GetBread takes one unit of in.BreadType from the *Pantry stored in ctx and
// notes the outcome in the *Kitchen log. If the pantry cannot supply the
// bread, the pantry's error is logged and returned with a zero output.
func GetBread(ctx context.Context, in GetBreadIn) (GetBreadOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	shelf := saga.Get[*Pantry](ctx)

	takeErr := shelf.Take(in.BreadType)
	if takeErr != nil {
		journal.record(fmt.Sprintf("Checked pantry - %v", takeErr))
		return GetBreadOut{}, takeErr
	}

	journal.record(fmt.Sprintf("Got %s from pantry", in.BreadType))
	var out GetBreadOut
	out.Bread = in.BreadType + " slice"
	return out, nil
}

// UndoGetBread compensates GetBread: it puts in.BreadType back into the
// pantry and logs the return. The out argument is not used.
func UndoGetBread(ctx context.Context, in GetBreadIn, out GetBreadOut) error {
	journal := saga.Get[*Kitchen](ctx)
	shelf := saga.Get[*Pantry](ctx)

	shelf.Return(in.BreadType)
	msg := fmt.Sprintf("Returned %s to pantry", in.BreadType)
	journal.record(msg)
	return nil
}

// AddCondimentIn is the input of the AddCondiment action.
type AddCondimentIn struct {
	Bread     string // bread to spread the condiment on
	Condiment string // condiment to take from the fridge
}

// AddCondimentOut is the output of the AddCondiment action.
type AddCondimentOut struct {
	PreparedBread string // "<bread> with <condiment>"
}

// AddCondiment takes one unit of in.Condiment from the *Fridge stored in ctx
// and logs the outcome in the *Kitchen log. If the fridge cannot supply the
// condiment, the fridge's error is logged and returned with a zero output.
func AddCondiment(ctx context.Context, in AddCondimentIn) (AddCondimentOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	takeErr := cold.Take(in.Condiment)
	if takeErr != nil {
		journal.record(fmt.Sprintf("Checked fridge - %v", takeErr))
		return AddCondimentOut{}, takeErr
	}

	journal.record(fmt.Sprintf("Spread %s on %s", in.Condiment, in.Bread))
	prepared := in.Bread + " with " + in.Condiment
	return AddCondimentOut{PreparedBread: prepared}, nil
}

// UndoAddCondiment compensates AddCondiment: it returns in.Condiment to the
// fridge and logs the action. The out argument is not used.
func UndoAddCondiment(ctx context.Context, in AddCondimentIn, out AddCondimentOut) error {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	cold.Return(in.Condiment)
	msg := fmt.Sprintf("Scraped %s back into jar", in.Condiment)
	journal.record(msg)
	return nil
}

// AddProteinIn is the input of the AddProtein action.
type AddProteinIn struct {
	PreparedBread string // bread that already carries a condiment
	Protein       string // protein to take from the fridge
}

// AddProteinOut is the output of the AddProtein action.
type AddProteinOut struct {
	Stack string // "<prepared bread> + <protein>"
}

// AddProtein takes one unit of in.Protein from the *Fridge stored in ctx and
// logs the outcome in the *Kitchen log. If the fridge cannot supply the
// protein, the fridge's error is logged and returned with a zero output.
func AddProtein(ctx context.Context, in AddProteinIn) (AddProteinOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	takeErr := cold.Take(in.Protein)
	if takeErr != nil {
		journal.record(fmt.Sprintf("Checked fridge - %v", takeErr))
		return AddProteinOut{}, takeErr
	}

	journal.record(fmt.Sprintf("Layered %s on %s", in.Protein, in.PreparedBread))
	stacked := in.PreparedBread + " + " + in.Protein
	return AddProteinOut{Stack: stacked}, nil
}

// UndoAddProtein compensates AddProtein: it returns in.Protein to the fridge
// and logs the action. The out argument is not used.
func UndoAddProtein(ctx context.Context, in AddProteinIn, out AddProteinOut) error {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	cold.Return(in.Protein)
	msg := fmt.Sprintf("Put %s back in fridge", in.Protein)
	journal.record(msg)
	return nil
}

// AddToppingsIn is the input of the AddToppings action.
type AddToppingsIn struct {
	Stack    string   // sandwich contents built so far
	Toppings []string `saga:",optional"` // extra toppings; tagged optional
}

// AddToppingsOut is the output of the AddToppings action.
type AddToppingsOut struct {
	OpenSandwich string // the stack, plus " + <toppings>" when any were given
}

// AddToppings appends the comma-separated toppings to in.Stack and logs what
// was added. With no toppings it logs that none were requested and passes
// the stack through unchanged. It never fails.
func AddToppings(ctx context.Context, in AddToppingsIn) (AddToppingsOut, error) {
	journal := saga.Get[*Kitchen](ctx)

	if len(in.Toppings) == 0 {
		journal.record("No toppings requested")
		return AddToppingsOut{OpenSandwich: in.Stack}, nil
	}

	joined := strings.Join(in.Toppings, ", ")
	journal.record(fmt.Sprintf("Added %s", joined))
	return AddToppingsOut{OpenSandwich: in.Stack + " + " + joined}, nil
}

// UndoAddToppings compensates AddToppings by logging the removal of the
// toppings. Nothing is logged when no toppings were given.
func UndoAddToppings(ctx context.Context, in AddToppingsIn, out AddToppingsOut) error {
	journal := saga.Get[*Kitchen](ctx)
	if len(in.Toppings) == 0 {
		return nil
	}
	journal.record(fmt.Sprintf("Removed %s", strings.Join(in.Toppings, ", ")))
	return nil
}

// CloseSandwichIn is the input of the CloseSandwich action.
type CloseSandwichIn struct {
	OpenSandwich string // sandwich contents awaiting the top slice
}

// CloseSandwichOut is the output of the CloseSandwich action.
type CloseSandwichOut struct {
	Sandwich string // the open sandwich wrapped in square brackets
}

// CloseSandwich logs the closing step and returns in.OpenSandwich wrapped in
// "[" and "]". It never fails.
func CloseSandwich(ctx context.Context, in CloseSandwichIn) (CloseSandwichOut, error) {
	saga.Get[*Kitchen](ctx).record("Closed sandwich with top slice")

	closed := "[" + in.OpenSandwich + "]"
	return CloseSandwichOut{Sandwich: closed}, nil
}

// UndoCloseSandwich compensates CloseSandwich by logging that the sandwich
// was opened again.
func UndoCloseSandwich(ctx context.Context, in CloseSandwichIn, out CloseSandwichOut) error {
	saga.Get[*Kitchen](ctx).record("Opened sandwich back up")
	return nil
}

// main stocks a pantry and fridge, registers the five-step "make-sandwich"
// saga in a private registry, runs it as execution "order-1" against
// in-memory storage, and prints the final sandwich plus the kitchen log.
// Every failure is reported as "Failed: ..." and ends the program early.
func main() {
	kitchen := &Kitchen{}
	pantry := NewPantry(map[string]int{
		"sourdough": 2,
		"wheat":     1,
		"rye":       1,
	})
	fridge := NewFridge(map[string]int{
		"mayo":     3,
		"mustard":  2,
		"ham":      4,
		"turkey":   2,
		"pastrami": 1,
	})

	registry := saga.NewRegistry()
	// RegisterTo validates the definition; surface a rejection instead of
	// continuing with an unregistered saga.
	if err := saga.Define("make-sandwich").
		Using(kitchen).
		Using(pantry).
		Using(fridge).
		Action(GetBread).Undo(UndoGetBread).
		Action(AddCondiment).Undo(UndoAddCondiment).
		Action(AddProtein).Undo(UndoAddProtein).
		Action(AddToppings).Undo(UndoAddToppings).
		Action(CloseSandwich).Undo(UndoCloseSandwich).
		RegisterTo(registry); err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}

	storage := saga.NewMemoryStorage()
	executor := saga.NewExecutor(storage, saga.WithRegistry(registry))

	ctx := context.Background()
	err := executor.Start("make-sandwich").
		Input("breadtype", "sourdough").
		Input("condiment", "mayo").
		Input("protein", "ham").
		Input("toppings", []string{"lettuce", "tomato"}).
		WithID("order-1").
		Execute(ctx)

	if err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}

	// Get the final result
	exec, err := storage.Get(ctx, "order-1")
	if err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}
	// Guard the lookup: indexing a missing key yields a nil *ActionResult,
	// and reading its Output would panic.
	closed := exec.ExecutedActions["close-sandwich"]
	if closed == nil {
		fmt.Println("Failed: no result recorded for close-sandwich")
		return
	}
	var result CloseSandwichOut
	if err := json.Unmarshal(closed.Output, &result); err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}

	fmt.Printf("Result: %s\n", result.Sandwich)

	fmt.Println("\nKitchen log:")
	for _, entry := range kitchen.log {
		fmt.Println("  -", entry)
	}
}
Output:
Result: [sourdough slice with mayo + ham + lettuce, tomato]

Kitchen log:
  - Got sourdough from pantry
  - Spread mayo on sourdough slice
  - Layered ham on sourdough slice with mayo
  - Added lettuce, tomato
  - Closed sandwich with top slice
Example (OutOfStock)

Example_outOfStock demonstrates saga compensation when the fridge is out of a required ingredient (turkey): the steps that already completed are undone in reverse order.

package main

import (
	"context"
	"fmt"
	"strings"

	"miren.dev/runtime/pkg/saga"
)

// Kitchen keeps a chronological log of the actions performed while making
// a sandwich.
type Kitchen struct {
	log []string // entries, oldest first
}

// record adds action as the newest entry of the kitchen log.
func (k *Kitchen) record(action string) {
	updated := append(k.log, action)
	k.log = updated
}

// Pantry holds bread and dry goods.
type Pantry struct {
	stock map[string]int // item name -> units on hand
}

// NewPantry returns a Pantry backed by stock. The map is stored as-is (not
// copied). A nil map is accepted and behaves like an empty pantry.
func NewPantry(stock map[string]int) *Pantry {
	return &Pantry{stock: stock}
}

// Take removes one unit of item from the pantry. When no unit is available
// (including items never stocked) it leaves the stock untouched and returns
// an "out of <item>" error.
func (p *Pantry) Take(item string) error {
	if p.stock[item] <= 0 {
		return fmt.Errorf("out of %s", item)
	}
	p.stock[item]--
	return nil
}

// Return puts one unit of item back into the pantry.
func (p *Pantry) Return(item string) {
	if p.stock == nil {
		// Writing to a nil map panics; allocate lazily so a Pantry created
		// with NewPantry(nil) can still accept returned items.
		p.stock = make(map[string]int)
	}
	p.stock[item]++
}

// Fridge holds proteins, condiments, and cold items.
type Fridge struct {
	stock map[string]int // item name -> units on hand
}

// NewFridge returns a Fridge backed by stock. The map is stored as-is (not
// copied). A nil map is accepted and behaves like an empty fridge.
func NewFridge(stock map[string]int) *Fridge {
	return &Fridge{stock: stock}
}

// Take removes one unit of item from the fridge. When no unit is available
// (including items never stocked) it leaves the stock untouched and returns
// an "out of <item>" error.
func (f *Fridge) Take(item string) error {
	if f.stock[item] <= 0 {
		return fmt.Errorf("out of %s", item)
	}
	f.stock[item]--
	return nil
}

// Return puts one unit of item back into the fridge.
func (f *Fridge) Return(item string) {
	if f.stock == nil {
		// Writing to a nil map panics; allocate lazily so a Fridge created
		// with NewFridge(nil) can still accept returned items.
		f.stock = make(map[string]int)
	}
	f.stock[item]++
}

// GetBreadIn is the input of the GetBread action.
type GetBreadIn struct {
	BreadType string // kind of bread to take from the pantry
}

// GetBreadOut is the output of the GetBread action.
type GetBreadOut struct {
	Bread string // the bread type followed by " slice"
}

// GetBread takes one unit of in.BreadType from the *Pantry stored in ctx and
// notes the outcome in the *Kitchen log. If the pantry cannot supply the
// bread, the pantry's error is logged and returned with a zero output.
func GetBread(ctx context.Context, in GetBreadIn) (GetBreadOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	shelf := saga.Get[*Pantry](ctx)

	takeErr := shelf.Take(in.BreadType)
	if takeErr != nil {
		journal.record(fmt.Sprintf("Checked pantry - %v", takeErr))
		return GetBreadOut{}, takeErr
	}

	journal.record(fmt.Sprintf("Got %s from pantry", in.BreadType))
	var out GetBreadOut
	out.Bread = in.BreadType + " slice"
	return out, nil
}

// UndoGetBread compensates GetBread: it puts in.BreadType back into the
// pantry and logs the return. The out argument is not used.
func UndoGetBread(ctx context.Context, in GetBreadIn, out GetBreadOut) error {
	journal := saga.Get[*Kitchen](ctx)
	shelf := saga.Get[*Pantry](ctx)

	shelf.Return(in.BreadType)
	msg := fmt.Sprintf("Returned %s to pantry", in.BreadType)
	journal.record(msg)
	return nil
}

// AddCondimentIn is the input of the AddCondiment action.
type AddCondimentIn struct {
	Bread     string // bread to spread the condiment on
	Condiment string // condiment to take from the fridge
}

// AddCondimentOut is the output of the AddCondiment action.
type AddCondimentOut struct {
	PreparedBread string // "<bread> with <condiment>"
}

// AddCondiment takes one unit of in.Condiment from the *Fridge stored in ctx
// and logs the outcome in the *Kitchen log. If the fridge cannot supply the
// condiment, the fridge's error is logged and returned with a zero output.
func AddCondiment(ctx context.Context, in AddCondimentIn) (AddCondimentOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	takeErr := cold.Take(in.Condiment)
	if takeErr != nil {
		journal.record(fmt.Sprintf("Checked fridge - %v", takeErr))
		return AddCondimentOut{}, takeErr
	}

	journal.record(fmt.Sprintf("Spread %s on %s", in.Condiment, in.Bread))
	prepared := in.Bread + " with " + in.Condiment
	return AddCondimentOut{PreparedBread: prepared}, nil
}

// UndoAddCondiment compensates AddCondiment: it returns in.Condiment to the
// fridge and logs the action. The out argument is not used.
func UndoAddCondiment(ctx context.Context, in AddCondimentIn, out AddCondimentOut) error {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	cold.Return(in.Condiment)
	msg := fmt.Sprintf("Scraped %s back into jar", in.Condiment)
	journal.record(msg)
	return nil
}

// AddProteinIn is the input of the AddProtein action.
type AddProteinIn struct {
	PreparedBread string // bread that already carries a condiment
	Protein       string // protein to take from the fridge
}

// AddProteinOut is the output of the AddProtein action.
type AddProteinOut struct {
	Stack string // "<prepared bread> + <protein>"
}

// AddProtein takes one unit of in.Protein from the *Fridge stored in ctx and
// logs the outcome in the *Kitchen log. If the fridge cannot supply the
// protein, the fridge's error is logged and returned with a zero output.
func AddProtein(ctx context.Context, in AddProteinIn) (AddProteinOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	takeErr := cold.Take(in.Protein)
	if takeErr != nil {
		journal.record(fmt.Sprintf("Checked fridge - %v", takeErr))
		return AddProteinOut{}, takeErr
	}

	journal.record(fmt.Sprintf("Layered %s on %s", in.Protein, in.PreparedBread))
	stacked := in.PreparedBread + " + " + in.Protein
	return AddProteinOut{Stack: stacked}, nil
}

// UndoAddProtein compensates AddProtein: it returns in.Protein to the fridge
// and logs the action. The out argument is not used.
func UndoAddProtein(ctx context.Context, in AddProteinIn, out AddProteinOut) error {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	cold.Return(in.Protein)
	msg := fmt.Sprintf("Put %s back in fridge", in.Protein)
	journal.record(msg)
	return nil
}

// AddToppingsIn is the input of the AddToppings action.
type AddToppingsIn struct {
	Stack    string   // sandwich contents built so far
	Toppings []string `saga:",optional"` // extra toppings; tagged optional
}

// AddToppingsOut is the output of the AddToppings action.
type AddToppingsOut struct {
	OpenSandwich string // the stack, plus " + <toppings>" when any were given
}

// AddToppings appends the comma-separated toppings to in.Stack and logs what
// was added. With no toppings it logs that none were requested and passes
// the stack through unchanged. It never fails.
func AddToppings(ctx context.Context, in AddToppingsIn) (AddToppingsOut, error) {
	journal := saga.Get[*Kitchen](ctx)

	if len(in.Toppings) == 0 {
		journal.record("No toppings requested")
		return AddToppingsOut{OpenSandwich: in.Stack}, nil
	}

	joined := strings.Join(in.Toppings, ", ")
	journal.record(fmt.Sprintf("Added %s", joined))
	return AddToppingsOut{OpenSandwich: in.Stack + " + " + joined}, nil
}

// UndoAddToppings compensates AddToppings by logging the removal of the
// toppings. Nothing is logged when no toppings were given.
func UndoAddToppings(ctx context.Context, in AddToppingsIn, out AddToppingsOut) error {
	journal := saga.Get[*Kitchen](ctx)
	if len(in.Toppings) == 0 {
		return nil
	}
	journal.record(fmt.Sprintf("Removed %s", strings.Join(in.Toppings, ", ")))
	return nil
}

// CloseSandwichIn is the input of the CloseSandwich action.
type CloseSandwichIn struct {
	OpenSandwich string // sandwich contents awaiting the top slice
}

// CloseSandwichOut is the output of the CloseSandwich action.
type CloseSandwichOut struct {
	Sandwich string // the open sandwich wrapped in square brackets
}

// CloseSandwich logs the closing step and returns in.OpenSandwich wrapped in
// "[" and "]". It never fails.
func CloseSandwich(ctx context.Context, in CloseSandwichIn) (CloseSandwichOut, error) {
	saga.Get[*Kitchen](ctx).record("Closed sandwich with top slice")

	closed := "[" + in.OpenSandwich + "]"
	return CloseSandwichOut{Sandwich: closed}, nil
}

// UndoCloseSandwich compensates CloseSandwich by logging that the sandwich
// was opened again.
func UndoCloseSandwich(ctx context.Context, in CloseSandwichIn, out CloseSandwichOut) error {
	saga.Get[*Kitchen](ctx).record("Opened sandwich back up")
	return nil
}

// main runs the "make-sandwich-fail" saga with a fridge that has no turkey,
// so the AddProtein step fails and the earlier steps are compensated. It
// then prints the kitchen log and the restored inventory counts.
func main() {
	kitchen := &Kitchen{}
	pantry := NewPantry(map[string]int{
		"wheat": 1,
	})
	fridge := NewFridge(map[string]int{
		"mustard": 1,
		"turkey":  0, // Out of turkey!
	})

	registry := saga.NewRegistry()
	// RegisterTo validates the definition; surface a rejection instead of
	// continuing with an unregistered saga.
	if err := saga.Define("make-sandwich-fail").
		Using(kitchen).
		Using(pantry).
		Using(fridge).
		Action(GetBread).Undo(UndoGetBread).
		Action(AddCondiment).Undo(UndoAddCondiment).
		Action(AddProtein).Undo(UndoAddProtein).
		Action(AddToppings).Undo(UndoAddToppings).
		Action(CloseSandwich).Undo(UndoCloseSandwich).
		RegisterTo(registry); err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}

	storage := saga.NewMemoryStorage()
	executor := saga.NewExecutor(storage, saga.WithRegistry(registry))

	ctx := context.Background()
	err := executor.Start("make-sandwich-fail").
		Input("breadtype", "wheat").
		Input("condiment", "mustard").
		Input("protein", "turkey").
		Input("toppings", []string{"pickles"}).
		Execute(ctx)

	// A failure is the expected outcome here: the saga should have rolled
	// back, so execution continues to show the log and inventory.
	if err != nil {
		fmt.Println("Sandwich failed - Loss Prevented")
	}

	fmt.Println("\nKitchen log:")
	for _, entry := range kitchen.log {
		fmt.Println("  -", entry)
	}

	fmt.Printf("\nInventory restored: wheat=%d, mustard=%d\n",
		pantry.stock["wheat"], fridge.stock["mustard"])
}
Output:
Sandwich failed - Loss Prevented

Kitchen log:
  - Got wheat from pantry
  - Spread mustard on wheat slice
  - Checked fridge - out of turkey
  - Scraped mustard back into jar
  - Returned wheat to pantry

Inventory restored: wheat=1, mustard=1
Example (SimpleSandwich)

Example_simpleSandwich demonstrates that an optional input (the toppings) can be omitted when starting a saga.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"

	"miren.dev/runtime/pkg/saga"
)

// Kitchen collects, in order, a description of each step taken while making
// a sandwich.
type Kitchen struct {
	log []string // entries, oldest first
}

// record stores action as the latest entry in the kitchen log.
func (k *Kitchen) record(action string) {
	grown := append(k.log, action)
	k.log = grown
}

// Pantry holds bread and dry goods.
type Pantry struct {
	stock map[string]int // item name -> units on hand
}

// NewPantry returns a Pantry backed by stock. The map is stored as-is (not
// copied). A nil map is accepted and behaves like an empty pantry.
func NewPantry(stock map[string]int) *Pantry {
	return &Pantry{stock: stock}
}

// Take removes one unit of item from the pantry. When no unit is available
// (including items never stocked) it leaves the stock untouched and returns
// an "out of <item>" error.
func (p *Pantry) Take(item string) error {
	if p.stock[item] <= 0 {
		return fmt.Errorf("out of %s", item)
	}
	p.stock[item]--
	return nil
}

// Return puts one unit of item back into the pantry.
func (p *Pantry) Return(item string) {
	if p.stock == nil {
		// Writing to a nil map panics; allocate lazily so a Pantry created
		// with NewPantry(nil) can still accept returned items.
		p.stock = make(map[string]int)
	}
	p.stock[item]++
}

// Fridge holds proteins, condiments, and cold items.
type Fridge struct {
	stock map[string]int // item name -> units on hand
}

// NewFridge returns a Fridge backed by stock. The map is stored as-is (not
// copied). A nil map is accepted and behaves like an empty fridge.
func NewFridge(stock map[string]int) *Fridge {
	return &Fridge{stock: stock}
}

// Take removes one unit of item from the fridge. When no unit is available
// (including items never stocked) it leaves the stock untouched and returns
// an "out of <item>" error.
func (f *Fridge) Take(item string) error {
	if f.stock[item] <= 0 {
		return fmt.Errorf("out of %s", item)
	}
	f.stock[item]--
	return nil
}

// Return puts one unit of item back into the fridge.
func (f *Fridge) Return(item string) {
	if f.stock == nil {
		// Writing to a nil map panics; allocate lazily so a Fridge created
		// with NewFridge(nil) can still accept returned items.
		f.stock = make(map[string]int)
	}
	f.stock[item]++
}

// GetBreadIn is the input of the GetBread action.
type GetBreadIn struct {
	BreadType string // kind of bread to take from the pantry
}

// GetBreadOut is the output of the GetBread action.
type GetBreadOut struct {
	Bread string // the bread type followed by " slice"
}

// GetBread takes one unit of in.BreadType from the *Pantry stored in ctx and
// notes the outcome in the *Kitchen log. If the pantry cannot supply the
// bread, the pantry's error is logged and returned with a zero output.
func GetBread(ctx context.Context, in GetBreadIn) (GetBreadOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	shelf := saga.Get[*Pantry](ctx)

	takeErr := shelf.Take(in.BreadType)
	if takeErr != nil {
		journal.record(fmt.Sprintf("Checked pantry - %v", takeErr))
		return GetBreadOut{}, takeErr
	}

	journal.record(fmt.Sprintf("Got %s from pantry", in.BreadType))
	var out GetBreadOut
	out.Bread = in.BreadType + " slice"
	return out, nil
}

// UndoGetBread compensates GetBread: it puts in.BreadType back into the
// pantry and logs the return. The out argument is not used.
func UndoGetBread(ctx context.Context, in GetBreadIn, out GetBreadOut) error {
	journal := saga.Get[*Kitchen](ctx)
	shelf := saga.Get[*Pantry](ctx)

	shelf.Return(in.BreadType)
	msg := fmt.Sprintf("Returned %s to pantry", in.BreadType)
	journal.record(msg)
	return nil
}

// AddCondimentIn is the input of the AddCondiment action.
type AddCondimentIn struct {
	Bread     string // bread to spread the condiment on
	Condiment string // condiment to take from the fridge
}

// AddCondimentOut is the output of the AddCondiment action.
type AddCondimentOut struct {
	PreparedBread string // "<bread> with <condiment>"
}

// AddCondiment takes one unit of in.Condiment from the *Fridge stored in ctx
// and logs the outcome in the *Kitchen log. If the fridge cannot supply the
// condiment, the fridge's error is logged and returned with a zero output.
func AddCondiment(ctx context.Context, in AddCondimentIn) (AddCondimentOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	takeErr := cold.Take(in.Condiment)
	if takeErr != nil {
		journal.record(fmt.Sprintf("Checked fridge - %v", takeErr))
		return AddCondimentOut{}, takeErr
	}

	journal.record(fmt.Sprintf("Spread %s on %s", in.Condiment, in.Bread))
	prepared := in.Bread + " with " + in.Condiment
	return AddCondimentOut{PreparedBread: prepared}, nil
}

// UndoAddCondiment compensates AddCondiment: it returns in.Condiment to the
// fridge and logs the action. The out argument is not used.
func UndoAddCondiment(ctx context.Context, in AddCondimentIn, out AddCondimentOut) error {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	cold.Return(in.Condiment)
	msg := fmt.Sprintf("Scraped %s back into jar", in.Condiment)
	journal.record(msg)
	return nil
}

// AddProteinIn is the input of the AddProtein action.
type AddProteinIn struct {
	PreparedBread string // bread that already carries a condiment
	Protein       string // protein to take from the fridge
}

// AddProteinOut is the output of the AddProtein action.
type AddProteinOut struct {
	Stack string // "<prepared bread> + <protein>"
}

// AddProtein takes one unit of in.Protein from the *Fridge stored in ctx and
// logs the outcome in the *Kitchen log. If the fridge cannot supply the
// protein, the fridge's error is logged and returned with a zero output.
func AddProtein(ctx context.Context, in AddProteinIn) (AddProteinOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	takeErr := cold.Take(in.Protein)
	if takeErr != nil {
		journal.record(fmt.Sprintf("Checked fridge - %v", takeErr))
		return AddProteinOut{}, takeErr
	}

	journal.record(fmt.Sprintf("Layered %s on %s", in.Protein, in.PreparedBread))
	stacked := in.PreparedBread + " + " + in.Protein
	return AddProteinOut{Stack: stacked}, nil
}

// UndoAddProtein compensates AddProtein: it returns in.Protein to the fridge
// and logs the action. The out argument is not used.
func UndoAddProtein(ctx context.Context, in AddProteinIn, out AddProteinOut) error {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	cold.Return(in.Protein)
	msg := fmt.Sprintf("Put %s back in fridge", in.Protein)
	journal.record(msg)
	return nil
}

// AddToppingsIn is the input of the AddToppings action.
type AddToppingsIn struct {
	Stack    string   // sandwich contents built so far
	Toppings []string `saga:",optional"` // extra toppings; tagged optional
}

// AddToppingsOut is the output of the AddToppings action.
type AddToppingsOut struct {
	OpenSandwich string // the stack, plus " + <toppings>" when any were given
}

// AddToppings appends the comma-separated toppings to in.Stack and logs what
// was added. With no toppings it logs that none were requested and passes
// the stack through unchanged. It never fails.
func AddToppings(ctx context.Context, in AddToppingsIn) (AddToppingsOut, error) {
	journal := saga.Get[*Kitchen](ctx)

	if len(in.Toppings) == 0 {
		journal.record("No toppings requested")
		return AddToppingsOut{OpenSandwich: in.Stack}, nil
	}

	joined := strings.Join(in.Toppings, ", ")
	journal.record(fmt.Sprintf("Added %s", joined))
	return AddToppingsOut{OpenSandwich: in.Stack + " + " + joined}, nil
}

// UndoAddToppings compensates AddToppings by logging the removal of the
// toppings. Nothing is logged when no toppings were given.
func UndoAddToppings(ctx context.Context, in AddToppingsIn, out AddToppingsOut) error {
	journal := saga.Get[*Kitchen](ctx)
	if len(in.Toppings) == 0 {
		return nil
	}
	journal.record(fmt.Sprintf("Removed %s", strings.Join(in.Toppings, ", ")))
	return nil
}

// CloseSandwichIn is the input of the CloseSandwich action.
type CloseSandwichIn struct {
	OpenSandwich string // sandwich contents awaiting the top slice
}

// CloseSandwichOut is the output of the CloseSandwich action.
type CloseSandwichOut struct {
	Sandwich string // the open sandwich wrapped in square brackets
}

// CloseSandwich logs the closing step and returns in.OpenSandwich wrapped in
// "[" and "]". It never fails.
func CloseSandwich(ctx context.Context, in CloseSandwichIn) (CloseSandwichOut, error) {
	saga.Get[*Kitchen](ctx).record("Closed sandwich with top slice")

	closed := "[" + in.OpenSandwich + "]"
	return CloseSandwichOut{Sandwich: closed}, nil
}

// UndoCloseSandwich compensates CloseSandwich by logging that the sandwich
// was opened again.
func UndoCloseSandwich(ctx context.Context, in CloseSandwichIn, out CloseSandwichOut) error {
	saga.Get[*Kitchen](ctx).record("Opened sandwich back up")
	return nil
}

// main registers the "simple-sandwich" saga in a private registry, runs it
// as execution "order-3" without supplying the optional toppings input, and
// prints the final sandwich plus the kitchen log. Every failure is reported
// as "Failed: ..." and ends the program early.
func main() {
	kitchen := &Kitchen{}
	pantry := NewPantry(map[string]int{"rye": 1})
	fridge := NewFridge(map[string]int{"butter": 1, "pastrami": 1})

	registry := saga.NewRegistry()
	// RegisterTo validates the definition; surface a rejection instead of
	// continuing with an unregistered saga.
	if err := saga.Define("simple-sandwich").
		Using(kitchen).
		Using(pantry).
		Using(fridge).
		Action(GetBread).Undo(UndoGetBread).
		Action(AddCondiment).Undo(UndoAddCondiment).
		Action(AddProtein).Undo(UndoAddProtein).
		Action(AddToppings).Undo(UndoAddToppings).
		Action(CloseSandwich).Undo(UndoCloseSandwich).
		RegisterTo(registry); err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}

	storage := saga.NewMemoryStorage()
	executor := saga.NewExecutor(storage, saga.WithRegistry(registry))

	ctx := context.Background()
	err := executor.Start("simple-sandwich").
		Input("breadtype", "rye").
		Input("condiment", "butter").
		Input("protein", "pastrami").
		// Note: no toppings - that's ok, it's optional!
		WithID("order-3").
		Execute(ctx)

	if err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}

	// Get the final result
	exec, err := storage.Get(ctx, "order-3")
	if err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}
	// Guard the lookup: indexing a missing key yields a nil *ActionResult,
	// and reading its Output would panic.
	closed := exec.ExecutedActions["close-sandwich"]
	if closed == nil {
		fmt.Println("Failed: no result recorded for close-sandwich")
		return
	}
	var result CloseSandwichOut
	if err := json.Unmarshal(closed.Output, &result); err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}

	fmt.Printf("Result: %s\n", result.Sandwich)

	fmt.Println("\nKitchen log:")
	for _, entry := range kitchen.log {
		fmt.Println("  -", entry)
	}
}
Output:
Result: [rye slice with butter + pastrami]

Kitchen log:
  - Got rye from pantry
  - Spread butter on rye slice
  - Layered pastrami on rye slice with butter
  - No toppings requested
  - Closed sandwich with top slice

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrExecutionNotFound = errors.New("execution not found")

ErrExecutionNotFound is returned by Storage.Get when no execution exists for the given ID.

Functions

func Get

func Get[T any](ctx context.Context) T

Get retrieves a dependency of type T from the context. Panics if the dependency is not found.

func TryGet

func TryGet[T any](ctx context.Context) (T, bool)

TryGet retrieves a dependency of type T from the context. Returns the zero value and false if not found.

func UndoNested added in v0.4.0

func UndoNested(ctx context.Context, executionID string) error

UndoNested compensates a previously completed nested saga. Call this from an undo handler to roll back the child saga's actions.

Types

type Action

type Action interface {
	// Execute performs the action and returns an output that can be used
	// by subsequent actions. The output must be JSON-serializable.
	Execute(ctx context.Context, inputs ActionInputs) (output any, err error)

	// Undo reverses the action. It receives the same inputs and the output
	// that was produced by Execute. Undo should be idempotent.
	Undo(ctx context.Context, inputs ActionInputs, output any) error
}

Action represents a single step in a saga. Actions are stateless and created by factories that are registered at application startup. All runtime data flows through ActionInputs.

type ActionBuilder

type ActionBuilder struct {
	// contains filtered or unexported fields
}

ActionBuilder provides a fluent API for defining a single action.

func (*ActionBuilder) Undo

func (ab *ActionBuilder) Undo(undo any) *Builder

Undo sets the undo function for the action. The function signature must be: func(ctx context.Context, in InType, out OutType) error

type ActionInputs

type ActionInputs interface {
	// Get retrieves an input by key, deserializing it into target.
	// Returns an error if the key doesn't exist or deserialization fails.
	Get(key string, target any) error

	// Has checks if an input exists (for optional inputs).
	Has(key string) bool

	// Keys returns all available input keys.
	Keys() []string
}

ActionInputs provides access to initial saga inputs and outputs from prior actions. All outputs live in a flat namespace.

type ActionNode

type ActionNode struct {
	// Name is the unique name of this action within the saga.
	Name string

	// Action is the stateless action implementation.
	Action Action

	// InputKeys are the saga keys this action reads from.
	InputKeys []string

	// OutputKeys are the saga keys this action writes to.
	OutputKeys []string

	// Dependencies are action names that must complete before this action.
	// Computed from InputKeys and other actions' OutputKeys.
	Dependencies []string
}

ActionNode describes a single action within a saga definition.

type ActionResult

type ActionResult struct {
	// Output is the JSON-serialized output from the action.
	Output []byte `json:"output,omitempty"`

	// ExecutedAt is when the action was executed.
	ExecutedAt time.Time `json:"executed_at"`

	// UndoneAt is when the action was undone (nil if not undone).
	UndoneAt *time.Time `json:"undone_at,omitempty"`

	// Error is set if the action failed during execution.
	Error string `json:"error,omitempty"`
}

ActionResult stores the outcome of a single action execution.

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder provides a fluent API for defining sagas.

func Define

func Define(name string) *Builder

Define starts building a new saga definition with the given name.

func UsingAs

func UsingAs[T any](b *Builder, dep T) *Builder

UsingAs adds a dependency keyed by type T, allowing retrieval via Get[T](ctx). This is useful for injecting implementations that should be retrieved by interface type. For example: UsingAs[MyInterface](b, impl) allows Get[MyInterface](ctx).

func (*Builder) Action

func (b *Builder) Action(args ...any) *ActionBuilder

Action adds an action to the saga using a typed execute function. The function signature must be: func(ctx context.Context, in InType) (OutType, error)

Can be called two ways:

  • Action(GetBread) - name derived from function name ("get-bread")
  • Action("custom-name", GetBread) - explicit name

func (*Builder) Build

func (b *Builder) Build() (*Definition, error)

Build constructs and validates the Definition without registering it. Useful for testing.

func (*Builder) Register

func (b *Builder) Register() error

Register validates and registers the saga definition with the global registry. Returns an error if validation fails (cycles, duplicate outputs, type mismatches).

func (*Builder) RegisterTo

func (b *Builder) RegisterTo(r *Registry) error

RegisterTo validates and registers the saga definition with the given registry. Useful for testing to avoid global state.

func (*Builder) Using

func (b *Builder) Using(deps ...any) *Builder

Using adds dependencies that will be injected into the context during execution. These can be retrieved using Get[T](ctx) within action functions. Note: Dependencies are keyed by their concrete type. To key by an interface type, use UsingAs[T] instead.

func (*Builder) Version

func (b *Builder) Version(v int) *Builder

Version sets the definition version (defaults to 1).

type Definition

type Definition struct {
	// Name uniquely identifies this saga definition.
	Name string

	// Version is incremented for breaking changes. Defaults to 1.
	Version int

	// Actions in this saga, keyed by action name.
	Actions map[string]*ActionNode
	// contains filtered or unexported fields
}

Definition describes a saga's structure - its actions and their dependencies. Definitions are stateless and registered at application startup.

func GetDefinition

func GetDefinition(name string) (*Definition, bool)

GetDefinition retrieves a saga definition from the global registry.

func (*Definition) ExecutionOrder added in v0.4.0

func (d *Definition) ExecutionOrder() []string

ExecutionOrder returns the computed execution order for the saga's actions.

type EACStorage added in v0.6.0

type EACStorage struct {
	// contains filtered or unexported fields
}

EACStorage implements Storage using an EntityAccessClient RPC connection. This is used by runners which don't have direct entity.Store access.

func NewEACStorage added in v0.6.0

func NewEACStorage(eac *es.EntityAccessClient, log *slog.Logger) *EACStorage

NewEACStorage creates a storage backed by an EntityAccessClient.

func (*EACStorage) Get added in v0.6.0

func (s *EACStorage) Get(ctx context.Context, id string) (*Execution, error)

Get retrieves an execution by ID via EAC.

func (*EACStorage) ListIncomplete added in v0.6.0

func (s *EACStorage) ListIncomplete(ctx context.Context) ([]*Execution, error)

ListIncomplete returns all executions that need recovery via EAC.

func (*EACStorage) Save added in v0.6.0

func (s *EACStorage) Save(ctx context.Context, exec *Execution) error

Save persists the execution state as an entity via EAC.

type Edge added in v0.6.0

type Edge struct{}

Edge is a zero-size type used to declare ordering dependencies between saga actions without carrying data. Edge fields participate in the dependency graph (via saga struct tags) but are skipped during serialization and deserialization at runtime.

type EntityStorage

type EntityStorage struct {
	// contains filtered or unexported fields
}

EntityStorage implements Storage using the entity store.

func NewEntityStorage

func NewEntityStorage(store entity.Store, log *slog.Logger) *EntityStorage

NewEntityStorage creates a storage backed by an entity store.

func (*EntityStorage) Get

func (s *EntityStorage) Get(ctx context.Context, id string) (*Execution, error)

Get retrieves an execution by ID.

func (*EntityStorage) ListIncomplete

func (s *EntityStorage) ListIncomplete(ctx context.Context) ([]*Execution, error)

ListIncomplete returns all executions that need recovery.

func (*EntityStorage) Save

func (s *EntityStorage) Save(ctx context.Context, exec *Execution) error

Save persists the execution state as an entity.

type Execution

type Execution struct {
	// ID is the unique identifier for this execution.
	ID string `json:"id"`

	// DefinitionName references the registered saga definition.
	DefinitionName string `json:"definition_name"`

	// DefinitionVersion is the version of the definition when started.
	DefinitionVersion int `json:"definition_version"`

	// InitialInputs contains the bootstrap data for the saga.
	// All values must be JSON-serializable.
	InitialInputs map[string]any `json:"initial_inputs"`

	// Status is the current state of the execution.
	Status Status `json:"status"`

	// ExecutedActions maps action names to their results.
	ExecutedActions map[string]*ActionResult `json:"executed_actions"`

	// ExecutionOrder records the order actions were executed for reverse undo.
	ExecutionOrder []string `json:"execution_order"`

	// ParentExecutionID links this execution to a parent saga when run as a nested child.
	ParentExecutionID string `json:"parent_execution_id,omitempty"`

	// Error is set if the saga failed.
	Error string `json:"error,omitempty"`

	// CreatedAt is when the execution was created.
	CreatedAt time.Time `json:"created_at"`

	// UpdatedAt is when the execution was last updated.
	UpdatedAt time.Time `json:"updated_at"`
}

Execution tracks the runtime state of a saga, persisted after each step.

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor orchestrates saga execution with durable logging.

func NewExecutor

func NewExecutor(storage Storage, opts ...ExecutorOption) *Executor

NewExecutor creates an executor with the given storage and options.

func (*Executor) ExecutionOutputs added in v0.4.0

func (e *Executor) ExecutionOutputs(ctx context.Context, executionID string) (*NestedResult, error)

ExecutionOutputs loads a completed execution from storage and collects its outputs into a NestedResult. Useful for reading saga results without a capture struct.

func (*Executor) Recover

func (e *Executor) Recover(ctx context.Context) error

Recover finds and resumes incomplete sagas after a restart.

func (*Executor) Start

func (e *Executor) Start(definitionName string) *StartBuilder

Start begins building a saga execution.

type ExecutorOption

type ExecutorOption func(*Executor)

ExecutorOption configures an Executor.

func WithLogger

func WithLogger(log *slog.Logger) ExecutorOption

WithLogger sets a custom logger for the executor.

func WithRegistry

func WithRegistry(r *Registry) ExecutorOption

WithRegistry sets a custom registry for the executor. Useful for testing to avoid global state.

type MemoryStorage

type MemoryStorage struct {
	// contains filtered or unexported fields
}

MemoryStorage is a simple in-memory storage implementation for testing and examples.

func NewMemoryStorage

func NewMemoryStorage() *MemoryStorage

NewMemoryStorage creates a new in-memory storage.

func (*MemoryStorage) Get

func (m *MemoryStorage) Get(ctx context.Context, id string) (*Execution, error)

Get retrieves an execution by ID.

func (*MemoryStorage) ListIncomplete

func (m *MemoryStorage) ListIncomplete(ctx context.Context) ([]*Execution, error)

ListIncomplete returns all executions that need recovery. This includes pending (crashed before starting), running, and undoing sagas.

func (*MemoryStorage) Save

func (m *MemoryStorage) Save(ctx context.Context, exec *Execution) error

Save persists an execution to memory.

type NestedOption added in v0.4.0

type NestedOption func(*nestedConfig)

NestedOption configures a RunNested call.

func WithNestedID added in v0.4.0

func WithNestedID(id string) NestedOption

WithNestedID sets a specific execution ID for the child saga.

func WithNestedInput added in v0.4.0

func WithNestedInput(key string, value any) NestedOption

WithNestedInput adds an initial input to the child saga.

type NestedResult added in v0.4.0

type NestedResult struct {
	ExecutionID string
	// contains filtered or unexported fields
}

NestedResult wraps the outputs from a completed child saga execution.

func RunNested added in v0.4.0

func RunNested(ctx context.Context, sagaName string, opts ...NestedOption) (*NestedResult, error)

RunNested executes a child saga from within a parent saga action. It reuses the parent executor's registry and storage for durability and observability. The child execution's ParentExecutionID is set to the current execution.

func (*NestedResult) Get added in v0.4.0

func (nr *NestedResult) Get(key string, target any) error

Get deserializes a named output from the child saga into target.

func (*NestedResult) Has added in v0.4.0

func (nr *NestedResult) Has(key string) bool

Has returns true if the child saga produced an output with the given key.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry holds registered saga definitions.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new empty registry. Useful for testing to avoid global state.

func (*Registry) Get

func (r *Registry) Get(name string) (*Definition, bool)

Get retrieves a definition by name.

func (*Registry) Register

func (r *Registry) Register(def *Definition) error

Register adds a definition to the registry.

type StartBuilder

type StartBuilder struct {
	// contains filtered or unexported fields
}

StartBuilder provides a fluent API for starting saga executions.

func (*StartBuilder) Execute

func (sb *StartBuilder) Execute(ctx context.Context) error

Execute runs the saga to completion or failure.

func (*StartBuilder) Input

func (sb *StartBuilder) Input(key string, value any) *StartBuilder

Input adds an initial input value to the saga execution.

func (*StartBuilder) WithID

func (sb *StartBuilder) WithID(id string) *StartBuilder

WithID sets a specific execution ID (otherwise one is generated).

type Status

type Status string

Status represents the current state of a saga execution.

const (
	// StatusPending indicates the saga has been created but not started.
	StatusPending Status = "pending"

	// StatusRunning indicates the saga is actively executing actions.
	StatusRunning Status = "running"

	// StatusUndoing indicates the saga is rolling back due to a failure.
	StatusUndoing Status = "undoing"

	// StatusCompleted indicates all actions completed successfully.
	StatusCompleted Status = "completed"

	// StatusFailed indicates the saga failed and all undos have been attempted.
	StatusFailed Status = "failed"
)

type Storage

type Storage interface {
	// Save persists the execution state.
	Save(ctx context.Context, exec *Execution) error

	// Get retrieves an execution by ID.
	Get(ctx context.Context, id string) (*Execution, error)

	// ListIncomplete returns all executions that need recovery (Pending, Running, or Undoing).
	ListIncomplete(ctx context.Context) ([]*Execution, error)
}

Storage persists saga execution state.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL