saga

package
v0.3.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package saga implements the Saga pattern for distributed operations with crash recovery. Each saga is a sequence of steps where each step has a corresponding undo operation. The framework guarantees that either all steps complete successfully or all completed steps are rolled back.

See RFD-35 for detailed design documentation.

Example (MakeSandwich)

Example_makeSandwich demonstrates a successful sandwich-making saga.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"

	"miren.dev/runtime/pkg/saga"
)

// Kitchen is an in-memory journal of what happened while a sandwich was
// being made. Entries are kept in the order they were recorded.
type Kitchen struct {
	log []string
}

// record appends a single entry to the end of the kitchen journal.
func (k *Kitchen) record(action string) {
	entries := append(k.log, action)
	k.log = entries
}

// Pantry holds bread and dry goods as a per-item unit count.
type Pantry struct {
	stock map[string]int
}

// NewPantry returns a Pantry backed by the given stock map. The map is used
// as-is (it is not copied); a nil map is treated as an empty pantry.
func NewPantry(stock map[string]int) *Pantry {
	return &Pantry{stock: stock}
}

// Take removes one unit of item from the pantry. It returns an
// "out of <item>" error when no unit is available, which includes items
// that were never stocked.
func (p *Pantry) Take(item string) error {
	if p.stock[item] <= 0 {
		return fmt.Errorf("out of %s", item)
	}
	p.stock[item]--
	return nil
}

// Return puts one unit of item back into the pantry.
func (p *Pantry) Return(item string) {
	// Writing to a nil map panics, so allocate lazily; this keeps the zero
	// value and NewPantry(nil) usable.
	if p.stock == nil {
		p.stock = make(map[string]int)
	}
	p.stock[item]++
}

// Fridge holds proteins, condiments, and cold items as a per-item unit count.
type Fridge struct {
	stock map[string]int
}

// NewFridge returns a Fridge backed by the given stock map. The map is used
// as-is (it is not copied); a nil map is treated as an empty fridge.
func NewFridge(stock map[string]int) *Fridge {
	return &Fridge{stock: stock}
}

// Take removes one unit of item from the fridge. It returns an
// "out of <item>" error when no unit is available, which includes items
// that were never stocked.
func (f *Fridge) Take(item string) error {
	if f.stock[item] <= 0 {
		return fmt.Errorf("out of %s", item)
	}
	f.stock[item]--
	return nil
}

// Return puts one unit of item back into the fridge.
func (f *Fridge) Return(item string) {
	// Writing to a nil map panics, so allocate lazily; this keeps the zero
	// value and NewFridge(nil) usable.
	if f.stock == nil {
		f.stock = make(map[string]int)
	}
	f.stock[item]++
}

// GetBreadIn is the input of the GetBread step.
type GetBreadIn struct {
	BreadType string
}

// GetBreadOut is the output of the GetBread step.
type GetBreadOut struct {
	Bread string
}

// GetBread takes one unit of in.BreadType from the Pantry found in ctx and
// journals the outcome in the Kitchen found in ctx. On success the output
// names the bread as "<BreadType> slice"; if the pantry has none, the
// pantry's error is returned unchanged.
func GetBread(ctx context.Context, in GetBreadIn) (GetBreadOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	shelf := saga.Get[*Pantry](ctx)

	err := shelf.Take(in.BreadType)
	if err != nil {
		journal.record(fmt.Sprintf("Checked pantry - %v", err))
		return GetBreadOut{}, err
	}

	journal.record(fmt.Sprintf("Got %s from pantry", in.BreadType))

	var out GetBreadOut
	out.Bread = in.BreadType + " slice"
	return out, nil
}

// UndoGetBread compensates GetBread by putting the bread back in the pantry
// and journaling that it did so. Each call restocks one unit, so the
// function on its own is not idempotent.
func UndoGetBread(ctx context.Context, in GetBreadIn, out GetBreadOut) error {
	journal := saga.Get[*Kitchen](ctx)
	shelf := saga.Get[*Pantry](ctx)

	shelf.Return(in.BreadType)

	entry := fmt.Sprintf("Returned %s to pantry", in.BreadType)
	journal.record(entry)
	return nil
}

// AddCondimentIn is the input of the AddCondiment step.
type AddCondimentIn struct {
	Bread     string
	Condiment string
}

// AddCondimentOut is the output of the AddCondiment step.
type AddCondimentOut struct {
	PreparedBread string
}

// AddCondiment takes one unit of in.Condiment from the Fridge found in ctx
// and journals the outcome in the Kitchen found in ctx. On success the
// output is "<Bread> with <Condiment>"; if the fridge has none, the fridge's
// error is returned unchanged.
func AddCondiment(ctx context.Context, in AddCondimentIn) (AddCondimentOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	err := cold.Take(in.Condiment)
	if err != nil {
		journal.record(fmt.Sprintf("Checked fridge - %v", err))
		return AddCondimentOut{}, err
	}

	journal.record(fmt.Sprintf("Spread %s on %s", in.Condiment, in.Bread))

	var out AddCondimentOut
	out.PreparedBread = in.Bread + " with " + in.Condiment
	return out, nil
}

// UndoAddCondiment compensates AddCondiment by returning the condiment to
// the fridge and journaling that it did so. Each call restocks one unit, so
// the function on its own is not idempotent.
func UndoAddCondiment(ctx context.Context, in AddCondimentIn, out AddCondimentOut) error {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	cold.Return(in.Condiment)

	entry := fmt.Sprintf("Scraped %s back into jar", in.Condiment)
	journal.record(entry)
	return nil
}

// AddProteinIn is the input of the AddProtein step.
type AddProteinIn struct {
	PreparedBread string
	Protein       string
}

// AddProteinOut is the output of the AddProtein step.
type AddProteinOut struct {
	Stack string
}

// AddProtein takes one unit of in.Protein from the Fridge found in ctx and
// journals the outcome in the Kitchen found in ctx. On success the output is
// "<PreparedBread> + <Protein>"; if the fridge has none, the fridge's error
// is returned unchanged.
func AddProtein(ctx context.Context, in AddProteinIn) (AddProteinOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	err := cold.Take(in.Protein)
	if err != nil {
		journal.record(fmt.Sprintf("Checked fridge - %v", err))
		return AddProteinOut{}, err
	}

	journal.record(fmt.Sprintf("Layered %s on %s", in.Protein, in.PreparedBread))

	var out AddProteinOut
	out.Stack = in.PreparedBread + " + " + in.Protein
	return out, nil
}

// UndoAddProtein compensates AddProtein by returning the protein to the
// fridge and journaling that it did so. Each call restocks one unit, so the
// function on its own is not idempotent.
func UndoAddProtein(ctx context.Context, in AddProteinIn, out AddProteinOut) error {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	cold.Return(in.Protein)

	entry := fmt.Sprintf("Put %s back in fridge", in.Protein)
	journal.record(entry)
	return nil
}

// AddToppingsIn is the input of the AddToppings step. Toppings is marked
// optional through its saga struct tag.
type AddToppingsIn struct {
	Stack    string
	Toppings []string `saga:",optional"`
}

// AddToppingsOut is the output of the AddToppings step.
type AddToppingsOut struct {
	OpenSandwich string
}

// AddToppings appends the comma-separated toppings to in.Stack and journals
// what was added. With no toppings the stack is passed through unchanged and
// that fact is journaled instead. It never returns an error.
func AddToppings(ctx context.Context, in AddToppingsIn) (AddToppingsOut, error) {
	journal := saga.Get[*Kitchen](ctx)

	if len(in.Toppings) == 0 {
		journal.record("No toppings requested")
		return AddToppingsOut{OpenSandwich: in.Stack}, nil
	}

	joined := strings.Join(in.Toppings, ", ")
	journal.record(fmt.Sprintf("Added %s", joined))
	return AddToppingsOut{OpenSandwich: in.Stack + " + " + joined}, nil
}

// UndoAddToppings compensates AddToppings by journaling the removal of the
// toppings. It records nothing when there were no toppings to remove.
func UndoAddToppings(ctx context.Context, in AddToppingsIn, out AddToppingsOut) error {
	journal := saga.Get[*Kitchen](ctx)
	if len(in.Toppings) == 0 {
		return nil
	}

	journal.record(fmt.Sprintf("Removed %s", strings.Join(in.Toppings, ", ")))
	return nil
}

// CloseSandwichIn is the input of the CloseSandwich step.
type CloseSandwichIn struct {
	OpenSandwich string
}

// CloseSandwichOut is the output of the CloseSandwich step.
type CloseSandwichOut struct {
	Sandwich string
}

// CloseSandwich finishes the sandwich by wrapping in.OpenSandwich in square
// brackets and journals the step. It never returns an error.
func CloseSandwich(ctx context.Context, in CloseSandwichIn) (CloseSandwichOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	journal.record("Closed sandwich with top slice")

	closed := "[" + in.OpenSandwich + "]"
	return CloseSandwichOut{Sandwich: closed}, nil
}

// UndoCloseSandwich compensates CloseSandwich; it only journals that the
// sandwich was opened again.
func UndoCloseSandwich(ctx context.Context, in CloseSandwichIn, out CloseSandwichOut) error {
	journal := saga.Get[*Kitchen](ctx)
	journal.record("Opened sandwich back up")
	return nil
}

// main defines the "make-sandwich" saga, runs it once as execution
// "order-1" against a stocked pantry and fridge, then prints the finished
// sandwich followed by the kitchen log. Any failure along the way is
// reported as "Failed: ..." and ends the program early.
func main() {
	kitchen := &Kitchen{}
	pantry := NewPantry(map[string]int{
		"sourdough": 2,
		"wheat":     1,
		"rye":       1,
	})
	fridge := NewFridge(map[string]int{
		"mayo":     3,
		"mustard":  2,
		"ham":      4,
		"turkey":   2,
		"pastrami": 1,
	})

	registry := saga.NewRegistry()
	// RegisterTo validates the definition; a rejected definition would
	// otherwise only surface later as a confusing execution failure.
	err := saga.Define("make-sandwich").
		Using(kitchen).
		Using(pantry).
		Using(fridge).
		Action(GetBread).Undo(UndoGetBread).
		Action(AddCondiment).Undo(UndoAddCondiment).
		Action(AddProtein).Undo(UndoAddProtein).
		Action(AddToppings).Undo(UndoAddToppings).
		Action(CloseSandwich).Undo(UndoCloseSandwich).
		RegisterTo(registry)
	if err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}

	storage := saga.NewMemoryStorage()
	executor := saga.NewExecutor(storage, saga.WithRegistry(registry))

	ctx := context.Background()
	err = executor.Start("make-sandwich").
		Input("breadtype", "sourdough").
		Input("condiment", "mayo").
		Input("protein", "ham").
		Input("toppings", []string{"lettuce", "tomato"}).
		WithID("order-1").
		Execute(ctx)
	if err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}

	// Get the final result
	exec, err := storage.Get(ctx, "order-1")
	if err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}
	if exec == nil {
		fmt.Println("Failed: execution order-1 not found")
		return
	}

	// Indexing a missing key yields a nil *ActionResult; check before
	// dereferencing it.
	closed := exec.ExecutedActions["close-sandwich"]
	if closed == nil {
		fmt.Println("Failed: no result recorded for close-sandwich")
		return
	}

	var result CloseSandwichOut
	if err := json.Unmarshal(closed.Output, &result); err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}

	fmt.Printf("Result: %s\n", result.Sandwich)

	fmt.Println("\nKitchen log:")
	for _, entry := range kitchen.log {
		fmt.Println("  -", entry)
	}
}
Output:
Result: [sourdough slice with mayo + ham + lettuce, tomato]

Kitchen log:
  - Got sourdough from pantry
  - Spread mayo on sourdough slice
  - Layered ham on sourdough slice with mayo
  - Added lettuce, tomato
  - Closed sandwich with top slice
Example (OutOfStock)

Example_outOfStock demonstrates saga compensation when the fridge is empty.

package main

import (
	"context"
	"fmt"
	"strings"

	"miren.dev/runtime/pkg/saga"
)

// Kitchen is an in-memory journal of what happened while a sandwich was
// being made. Entries are kept in the order they were recorded.
type Kitchen struct {
	log []string
}

// record appends a single entry to the end of the kitchen journal.
func (k *Kitchen) record(action string) {
	entries := append(k.log, action)
	k.log = entries
}

// Pantry holds bread and dry goods as a per-item unit count.
type Pantry struct {
	stock map[string]int
}

// NewPantry returns a Pantry backed by the given stock map. The map is used
// as-is (it is not copied); a nil map is treated as an empty pantry.
func NewPantry(stock map[string]int) *Pantry {
	return &Pantry{stock: stock}
}

// Take removes one unit of item from the pantry. It returns an
// "out of <item>" error when no unit is available, which includes items
// that were never stocked.
func (p *Pantry) Take(item string) error {
	if p.stock[item] <= 0 {
		return fmt.Errorf("out of %s", item)
	}
	p.stock[item]--
	return nil
}

// Return puts one unit of item back into the pantry.
func (p *Pantry) Return(item string) {
	// Writing to a nil map panics, so allocate lazily; this keeps the zero
	// value and NewPantry(nil) usable.
	if p.stock == nil {
		p.stock = make(map[string]int)
	}
	p.stock[item]++
}

// Fridge holds proteins, condiments, and cold items as a per-item unit count.
type Fridge struct {
	stock map[string]int
}

// NewFridge returns a Fridge backed by the given stock map. The map is used
// as-is (it is not copied); a nil map is treated as an empty fridge.
func NewFridge(stock map[string]int) *Fridge {
	return &Fridge{stock: stock}
}

// Take removes one unit of item from the fridge. It returns an
// "out of <item>" error when no unit is available, which includes items
// that were never stocked.
func (f *Fridge) Take(item string) error {
	if f.stock[item] <= 0 {
		return fmt.Errorf("out of %s", item)
	}
	f.stock[item]--
	return nil
}

// Return puts one unit of item back into the fridge.
func (f *Fridge) Return(item string) {
	// Writing to a nil map panics, so allocate lazily; this keeps the zero
	// value and NewFridge(nil) usable.
	if f.stock == nil {
		f.stock = make(map[string]int)
	}
	f.stock[item]++
}

// GetBreadIn is the input of the GetBread step.
type GetBreadIn struct {
	BreadType string
}

// GetBreadOut is the output of the GetBread step.
type GetBreadOut struct {
	Bread string
}

// GetBread takes one unit of in.BreadType from the Pantry found in ctx and
// journals the outcome in the Kitchen found in ctx. On success the output
// names the bread as "<BreadType> slice"; if the pantry has none, the
// pantry's error is returned unchanged.
func GetBread(ctx context.Context, in GetBreadIn) (GetBreadOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	shelf := saga.Get[*Pantry](ctx)

	err := shelf.Take(in.BreadType)
	if err != nil {
		journal.record(fmt.Sprintf("Checked pantry - %v", err))
		return GetBreadOut{}, err
	}

	journal.record(fmt.Sprintf("Got %s from pantry", in.BreadType))

	var out GetBreadOut
	out.Bread = in.BreadType + " slice"
	return out, nil
}

// UndoGetBread compensates GetBread by putting the bread back in the pantry
// and journaling that it did so. Each call restocks one unit, so the
// function on its own is not idempotent.
func UndoGetBread(ctx context.Context, in GetBreadIn, out GetBreadOut) error {
	journal := saga.Get[*Kitchen](ctx)
	shelf := saga.Get[*Pantry](ctx)

	shelf.Return(in.BreadType)

	entry := fmt.Sprintf("Returned %s to pantry", in.BreadType)
	journal.record(entry)
	return nil
}

// AddCondimentIn is the input of the AddCondiment step.
type AddCondimentIn struct {
	Bread     string
	Condiment string
}

// AddCondimentOut is the output of the AddCondiment step.
type AddCondimentOut struct {
	PreparedBread string
}

// AddCondiment takes one unit of in.Condiment from the Fridge found in ctx
// and journals the outcome in the Kitchen found in ctx. On success the
// output is "<Bread> with <Condiment>"; if the fridge has none, the fridge's
// error is returned unchanged.
func AddCondiment(ctx context.Context, in AddCondimentIn) (AddCondimentOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	err := cold.Take(in.Condiment)
	if err != nil {
		journal.record(fmt.Sprintf("Checked fridge - %v", err))
		return AddCondimentOut{}, err
	}

	journal.record(fmt.Sprintf("Spread %s on %s", in.Condiment, in.Bread))

	var out AddCondimentOut
	out.PreparedBread = in.Bread + " with " + in.Condiment
	return out, nil
}

// UndoAddCondiment compensates AddCondiment by returning the condiment to
// the fridge and journaling that it did so. Each call restocks one unit, so
// the function on its own is not idempotent.
func UndoAddCondiment(ctx context.Context, in AddCondimentIn, out AddCondimentOut) error {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	cold.Return(in.Condiment)

	entry := fmt.Sprintf("Scraped %s back into jar", in.Condiment)
	journal.record(entry)
	return nil
}

// AddProteinIn is the input of the AddProtein step.
type AddProteinIn struct {
	PreparedBread string
	Protein       string
}

// AddProteinOut is the output of the AddProtein step.
type AddProteinOut struct {
	Stack string
}

// AddProtein takes one unit of in.Protein from the Fridge found in ctx and
// journals the outcome in the Kitchen found in ctx. On success the output is
// "<PreparedBread> + <Protein>"; if the fridge has none, the fridge's error
// is returned unchanged.
func AddProtein(ctx context.Context, in AddProteinIn) (AddProteinOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	err := cold.Take(in.Protein)
	if err != nil {
		journal.record(fmt.Sprintf("Checked fridge - %v", err))
		return AddProteinOut{}, err
	}

	journal.record(fmt.Sprintf("Layered %s on %s", in.Protein, in.PreparedBread))

	var out AddProteinOut
	out.Stack = in.PreparedBread + " + " + in.Protein
	return out, nil
}

// UndoAddProtein compensates AddProtein by returning the protein to the
// fridge and journaling that it did so. Each call restocks one unit, so the
// function on its own is not idempotent.
func UndoAddProtein(ctx context.Context, in AddProteinIn, out AddProteinOut) error {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	cold.Return(in.Protein)

	entry := fmt.Sprintf("Put %s back in fridge", in.Protein)
	journal.record(entry)
	return nil
}

// AddToppingsIn is the input of the AddToppings step. Toppings is marked
// optional through its saga struct tag.
type AddToppingsIn struct {
	Stack    string
	Toppings []string `saga:",optional"`
}

// AddToppingsOut is the output of the AddToppings step.
type AddToppingsOut struct {
	OpenSandwich string
}

// AddToppings appends the comma-separated toppings to in.Stack and journals
// what was added. With no toppings the stack is passed through unchanged and
// that fact is journaled instead. It never returns an error.
func AddToppings(ctx context.Context, in AddToppingsIn) (AddToppingsOut, error) {
	journal := saga.Get[*Kitchen](ctx)

	if len(in.Toppings) == 0 {
		journal.record("No toppings requested")
		return AddToppingsOut{OpenSandwich: in.Stack}, nil
	}

	joined := strings.Join(in.Toppings, ", ")
	journal.record(fmt.Sprintf("Added %s", joined))
	return AddToppingsOut{OpenSandwich: in.Stack + " + " + joined}, nil
}

// UndoAddToppings compensates AddToppings by journaling the removal of the
// toppings. It records nothing when there were no toppings to remove.
func UndoAddToppings(ctx context.Context, in AddToppingsIn, out AddToppingsOut) error {
	journal := saga.Get[*Kitchen](ctx)
	if len(in.Toppings) == 0 {
		return nil
	}

	journal.record(fmt.Sprintf("Removed %s", strings.Join(in.Toppings, ", ")))
	return nil
}

// CloseSandwichIn is the input of the CloseSandwich step.
type CloseSandwichIn struct {
	OpenSandwich string
}

// CloseSandwichOut is the output of the CloseSandwich step.
type CloseSandwichOut struct {
	Sandwich string
}

// CloseSandwich finishes the sandwich by wrapping in.OpenSandwich in square
// brackets and journals the step. It never returns an error.
func CloseSandwich(ctx context.Context, in CloseSandwichIn) (CloseSandwichOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	journal.record("Closed sandwich with top slice")

	closed := "[" + in.OpenSandwich + "]"
	return CloseSandwichOut{Sandwich: closed}, nil
}

// UndoCloseSandwich compensates CloseSandwich; it only journals that the
// sandwich was opened again.
func UndoCloseSandwich(ctx context.Context, in CloseSandwichIn, out CloseSandwichOut) error {
	journal := saga.Get[*Kitchen](ctx)
	journal.record("Opened sandwich back up")
	return nil
}

// main defines the "make-sandwich-fail" saga and runs it with a fridge that
// has no turkey, so the protein step fails and the earlier steps are undone.
// It then prints the kitchen log and the wheat and mustard stock levels.
func main() {
	kitchen := &Kitchen{}
	pantry := NewPantry(map[string]int{
		"wheat": 1,
	})
	fridge := NewFridge(map[string]int{
		"mustard": 1,
		"turkey":  0, // Out of turkey!
	})

	registry := saga.NewRegistry()
	// RegisterTo validates the definition. Without this check a rejected
	// definition would make Execute fail for the wrong reason and the
	// example would still claim a successful rollback.
	err := saga.Define("make-sandwich-fail").
		Using(kitchen).
		Using(pantry).
		Using(fridge).
		Action(GetBread).Undo(UndoGetBread).
		Action(AddCondiment).Undo(UndoAddCondiment).
		Action(AddProtein).Undo(UndoAddProtein).
		Action(AddToppings).Undo(UndoAddToppings).
		Action(CloseSandwich).Undo(UndoCloseSandwich).
		RegisterTo(registry)
	if err != nil {
		fmt.Printf("Failed to register saga: %v\n", err)
		return
	}

	storage := saga.NewMemoryStorage()
	executor := saga.NewExecutor(storage, saga.WithRegistry(registry))

	ctx := context.Background()
	err = executor.Start("make-sandwich-fail").
		Input("breadtype", "wheat").
		Input("condiment", "mustard").
		Input("protein", "turkey").
		Input("toppings", []string{"pickles"}).
		Execute(ctx)

	if err != nil {
		fmt.Println("Sandwich failed - Loss Prevented")
	}

	fmt.Println("\nKitchen log:")
	for _, entry := range kitchen.log {
		fmt.Println("  -", entry)
	}

	fmt.Printf("\nInventory restored: wheat=%d, mustard=%d\n",
		pantry.stock["wheat"], fridge.stock["mustard"])
}
Output:
Sandwich failed - Loss Prevented

Kitchen log:
  - Got wheat from pantry
  - Spread mustard on wheat slice
  - Checked fridge - out of turkey
  - Scraped mustard back into jar
  - Returned wheat to pantry

Inventory restored: wheat=1, mustard=1
Example (SimpleSandwich)

Example_simpleSandwich demonstrates optional toppings.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"

	"miren.dev/runtime/pkg/saga"
)

// Kitchen is an in-memory journal of what happened while a sandwich was
// being made. Entries are kept in the order they were recorded.
type Kitchen struct {
	log []string
}

// record appends a single entry to the end of the kitchen journal.
func (k *Kitchen) record(action string) {
	entries := append(k.log, action)
	k.log = entries
}

// Pantry holds bread and dry goods as a per-item unit count.
type Pantry struct {
	stock map[string]int
}

// NewPantry returns a Pantry backed by the given stock map. The map is used
// as-is (it is not copied); a nil map is treated as an empty pantry.
func NewPantry(stock map[string]int) *Pantry {
	return &Pantry{stock: stock}
}

// Take removes one unit of item from the pantry. It returns an
// "out of <item>" error when no unit is available, which includes items
// that were never stocked.
func (p *Pantry) Take(item string) error {
	if p.stock[item] <= 0 {
		return fmt.Errorf("out of %s", item)
	}
	p.stock[item]--
	return nil
}

// Return puts one unit of item back into the pantry.
func (p *Pantry) Return(item string) {
	// Writing to a nil map panics, so allocate lazily; this keeps the zero
	// value and NewPantry(nil) usable.
	if p.stock == nil {
		p.stock = make(map[string]int)
	}
	p.stock[item]++
}

// Fridge holds proteins, condiments, and cold items as a per-item unit count.
type Fridge struct {
	stock map[string]int
}

// NewFridge returns a Fridge backed by the given stock map. The map is used
// as-is (it is not copied); a nil map is treated as an empty fridge.
func NewFridge(stock map[string]int) *Fridge {
	return &Fridge{stock: stock}
}

// Take removes one unit of item from the fridge. It returns an
// "out of <item>" error when no unit is available, which includes items
// that were never stocked.
func (f *Fridge) Take(item string) error {
	if f.stock[item] <= 0 {
		return fmt.Errorf("out of %s", item)
	}
	f.stock[item]--
	return nil
}

// Return puts one unit of item back into the fridge.
func (f *Fridge) Return(item string) {
	// Writing to a nil map panics, so allocate lazily; this keeps the zero
	// value and NewFridge(nil) usable.
	if f.stock == nil {
		f.stock = make(map[string]int)
	}
	f.stock[item]++
}

// GetBreadIn is the input of the GetBread step.
type GetBreadIn struct {
	BreadType string
}

// GetBreadOut is the output of the GetBread step.
type GetBreadOut struct {
	Bread string
}

// GetBread takes one unit of in.BreadType from the Pantry found in ctx and
// journals the outcome in the Kitchen found in ctx. On success the output
// names the bread as "<BreadType> slice"; if the pantry has none, the
// pantry's error is returned unchanged.
func GetBread(ctx context.Context, in GetBreadIn) (GetBreadOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	shelf := saga.Get[*Pantry](ctx)

	err := shelf.Take(in.BreadType)
	if err != nil {
		journal.record(fmt.Sprintf("Checked pantry - %v", err))
		return GetBreadOut{}, err
	}

	journal.record(fmt.Sprintf("Got %s from pantry", in.BreadType))

	var out GetBreadOut
	out.Bread = in.BreadType + " slice"
	return out, nil
}

// UndoGetBread compensates GetBread by putting the bread back in the pantry
// and journaling that it did so. Each call restocks one unit, so the
// function on its own is not idempotent.
func UndoGetBread(ctx context.Context, in GetBreadIn, out GetBreadOut) error {
	journal := saga.Get[*Kitchen](ctx)
	shelf := saga.Get[*Pantry](ctx)

	shelf.Return(in.BreadType)

	entry := fmt.Sprintf("Returned %s to pantry", in.BreadType)
	journal.record(entry)
	return nil
}

// AddCondimentIn is the input of the AddCondiment step.
type AddCondimentIn struct {
	Bread     string
	Condiment string
}

// AddCondimentOut is the output of the AddCondiment step.
type AddCondimentOut struct {
	PreparedBread string
}

// AddCondiment takes one unit of in.Condiment from the Fridge found in ctx
// and journals the outcome in the Kitchen found in ctx. On success the
// output is "<Bread> with <Condiment>"; if the fridge has none, the fridge's
// error is returned unchanged.
func AddCondiment(ctx context.Context, in AddCondimentIn) (AddCondimentOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	err := cold.Take(in.Condiment)
	if err != nil {
		journal.record(fmt.Sprintf("Checked fridge - %v", err))
		return AddCondimentOut{}, err
	}

	journal.record(fmt.Sprintf("Spread %s on %s", in.Condiment, in.Bread))

	var out AddCondimentOut
	out.PreparedBread = in.Bread + " with " + in.Condiment
	return out, nil
}

// UndoAddCondiment compensates AddCondiment by returning the condiment to
// the fridge and journaling that it did so. Each call restocks one unit, so
// the function on its own is not idempotent.
func UndoAddCondiment(ctx context.Context, in AddCondimentIn, out AddCondimentOut) error {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	cold.Return(in.Condiment)

	entry := fmt.Sprintf("Scraped %s back into jar", in.Condiment)
	journal.record(entry)
	return nil
}

// AddProteinIn is the input of the AddProtein step.
type AddProteinIn struct {
	PreparedBread string
	Protein       string
}

// AddProteinOut is the output of the AddProtein step.
type AddProteinOut struct {
	Stack string
}

// AddProtein takes one unit of in.Protein from the Fridge found in ctx and
// journals the outcome in the Kitchen found in ctx. On success the output is
// "<PreparedBread> + <Protein>"; if the fridge has none, the fridge's error
// is returned unchanged.
func AddProtein(ctx context.Context, in AddProteinIn) (AddProteinOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	err := cold.Take(in.Protein)
	if err != nil {
		journal.record(fmt.Sprintf("Checked fridge - %v", err))
		return AddProteinOut{}, err
	}

	journal.record(fmt.Sprintf("Layered %s on %s", in.Protein, in.PreparedBread))

	var out AddProteinOut
	out.Stack = in.PreparedBread + " + " + in.Protein
	return out, nil
}

// UndoAddProtein compensates AddProtein by returning the protein to the
// fridge and journaling that it did so. Each call restocks one unit, so the
// function on its own is not idempotent.
func UndoAddProtein(ctx context.Context, in AddProteinIn, out AddProteinOut) error {
	journal := saga.Get[*Kitchen](ctx)
	cold := saga.Get[*Fridge](ctx)

	cold.Return(in.Protein)

	entry := fmt.Sprintf("Put %s back in fridge", in.Protein)
	journal.record(entry)
	return nil
}

// AddToppingsIn is the input of the AddToppings step. Toppings is marked
// optional through its saga struct tag.
type AddToppingsIn struct {
	Stack    string
	Toppings []string `saga:",optional"`
}

// AddToppingsOut is the output of the AddToppings step.
type AddToppingsOut struct {
	OpenSandwich string
}

// AddToppings appends the comma-separated toppings to in.Stack and journals
// what was added. With no toppings the stack is passed through unchanged and
// that fact is journaled instead. It never returns an error.
func AddToppings(ctx context.Context, in AddToppingsIn) (AddToppingsOut, error) {
	journal := saga.Get[*Kitchen](ctx)

	if len(in.Toppings) == 0 {
		journal.record("No toppings requested")
		return AddToppingsOut{OpenSandwich: in.Stack}, nil
	}

	joined := strings.Join(in.Toppings, ", ")
	journal.record(fmt.Sprintf("Added %s", joined))
	return AddToppingsOut{OpenSandwich: in.Stack + " + " + joined}, nil
}

// UndoAddToppings compensates AddToppings by journaling the removal of the
// toppings. It records nothing when there were no toppings to remove.
func UndoAddToppings(ctx context.Context, in AddToppingsIn, out AddToppingsOut) error {
	journal := saga.Get[*Kitchen](ctx)
	if len(in.Toppings) == 0 {
		return nil
	}

	journal.record(fmt.Sprintf("Removed %s", strings.Join(in.Toppings, ", ")))
	return nil
}

// CloseSandwichIn is the input of the CloseSandwich step.
type CloseSandwichIn struct {
	OpenSandwich string
}

// CloseSandwichOut is the output of the CloseSandwich step.
type CloseSandwichOut struct {
	Sandwich string
}

// CloseSandwich finishes the sandwich by wrapping in.OpenSandwich in square
// brackets and journals the step. It never returns an error.
func CloseSandwich(ctx context.Context, in CloseSandwichIn) (CloseSandwichOut, error) {
	journal := saga.Get[*Kitchen](ctx)
	journal.record("Closed sandwich with top slice")

	closed := "[" + in.OpenSandwich + "]"
	return CloseSandwichOut{Sandwich: closed}, nil
}

// UndoCloseSandwich compensates CloseSandwich; it only journals that the
// sandwich was opened again.
func UndoCloseSandwich(ctx context.Context, in CloseSandwichIn, out CloseSandwichOut) error {
	journal := saga.Get[*Kitchen](ctx)
	journal.record("Opened sandwich back up")
	return nil
}

// main defines the "simple-sandwich" saga and runs it once as execution
// "order-3" without supplying the optional toppings input, then prints the
// finished sandwich followed by the kitchen log. Any failure along the way
// is reported as "Failed: ..." and ends the program early.
func main() {
	kitchen := &Kitchen{}
	pantry := NewPantry(map[string]int{"rye": 1})
	fridge := NewFridge(map[string]int{"butter": 1, "pastrami": 1})

	registry := saga.NewRegistry()
	// RegisterTo validates the definition; a rejected definition would
	// otherwise only surface later as a confusing execution failure.
	err := saga.Define("simple-sandwich").
		Using(kitchen).
		Using(pantry).
		Using(fridge).
		Action(GetBread).Undo(UndoGetBread).
		Action(AddCondiment).Undo(UndoAddCondiment).
		Action(AddProtein).Undo(UndoAddProtein).
		Action(AddToppings).Undo(UndoAddToppings).
		Action(CloseSandwich).Undo(UndoCloseSandwich).
		RegisterTo(registry)
	if err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}

	storage := saga.NewMemoryStorage()
	executor := saga.NewExecutor(storage, saga.WithRegistry(registry))

	ctx := context.Background()
	err = executor.Start("simple-sandwich").
		Input("breadtype", "rye").
		Input("condiment", "butter").
		Input("protein", "pastrami").
		// Note: no toppings - that's ok, it's optional!
		WithID("order-3").
		Execute(ctx)
	if err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}

	// Get the final result
	exec, err := storage.Get(ctx, "order-3")
	if err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}
	if exec == nil {
		fmt.Println("Failed: execution order-3 not found")
		return
	}

	// Indexing a missing key yields a nil *ActionResult; check before
	// dereferencing it.
	closed := exec.ExecutedActions["close-sandwich"]
	if closed == nil {
		fmt.Println("Failed: no result recorded for close-sandwich")
		return
	}

	var result CloseSandwichOut
	if err := json.Unmarshal(closed.Output, &result); err != nil {
		fmt.Printf("Failed: %v\n", err)
		return
	}

	fmt.Printf("Result: %s\n", result.Sandwich)

	fmt.Println("\nKitchen log:")
	for _, entry := range kitchen.log {
		fmt.Println("  -", entry)
	}
}
Output:
Result: [rye slice with butter + pastrami]

Kitchen log:
  - Got rye from pantry
  - Spread butter on rye slice
  - Layered pastrami on rye slice with butter
  - No toppings requested
  - Closed sandwich with top slice

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Get

func Get[T any](ctx context.Context) T

Get retrieves a dependency of type T from the context. Panics if the dependency is not found.

func TryGet

func TryGet[T any](ctx context.Context) (T, bool)

TryGet retrieves a dependency of type T from the context. Returns the zero value and false if not found.

Types

type Action

// Action represents a single step in a saga. Actions are stateless and are
// created by factories registered at application startup; all runtime data
// flows through ActionInputs.
type Action interface {
	// Execute performs the action and returns an output that can be used
	// by subsequent actions. The output must be JSON-serializable.
	Execute(ctx context.Context, inputs ActionInputs) (output any, err error)

	// Undo reverses the action. It receives the same inputs and the output
	// that was produced by Execute. Undo should be idempotent.
	Undo(ctx context.Context, inputs ActionInputs, output any) error
}

Action represents a single step in a saga. Actions are stateless and created by factories that are registered at application startup. All runtime data flows through ActionInputs.

type ActionBuilder

type ActionBuilder struct {
	// contains filtered or unexported fields
}

ActionBuilder provides a fluent API for defining a single action.

func (*ActionBuilder) Undo

func (ab *ActionBuilder) Undo(undo any) *Builder

Undo sets the undo function for the action. The function signature must be: func(ctx context.Context, in InType, out OutType) error

type ActionInputs

// ActionInputs provides access to the initial saga inputs and to the outputs
// of prior actions. All outputs live in a single flat namespace.
type ActionInputs interface {
	// Get retrieves an input by key, deserializing it into target.
	// Returns an error if the key doesn't exist or deserialization fails.
	Get(key string, target any) error

	// Has checks if an input exists (for optional inputs).
	Has(key string) bool

	// Keys returns all available input keys.
	Keys() []string
}

ActionInputs provides access to initial saga inputs and outputs from prior actions. All outputs live in a flat namespace.

type ActionNode

// ActionNode describes a single action within a saga definition: its name,
// its implementation, and the saga keys that link it to other actions.
type ActionNode struct {
	// Name is the unique name of this action within the saga.
	Name string

	// Action is the stateless action implementation.
	Action Action

	// InputKeys are the saga keys this action reads from.
	InputKeys []string

	// OutputKeys are the saga keys this action writes to.
	OutputKeys []string

	// Dependencies are action names that must complete before this action.
	// Computed from InputKeys and other actions' OutputKeys.
	Dependencies []string
}

ActionNode describes a single action within a saga definition.

type ActionResult

// ActionResult stores the outcome of a single action execution.
type ActionResult struct {
	// Output is the JSON-serialized output from the action.
	// It can be decoded with json.Unmarshal into the action's output type.
	Output []byte `json:"output,omitempty"`

	// ExecutedAt is when the action was executed.
	ExecutedAt time.Time `json:"executed_at"`

	// UndoneAt is when the action was undone (nil if not undone).
	UndoneAt *time.Time `json:"undone_at,omitempty"`

	// Error is set if the action failed during execution.
	Error string `json:"error,omitempty"`
}

ActionResult stores the outcome of a single action execution.

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder provides a fluent API for defining sagas.

func Define

func Define(name string) *Builder

Define starts building a new saga definition with the given name.

func UsingAs

func UsingAs[T any](b *Builder, dep T) *Builder

UsingAs adds a dependency keyed by type T, allowing retrieval via Get[T](ctx). This is useful for injecting implementations that should be retrieved by interface type. For example: UsingAs[MyInterface](b, impl) allows Get[MyInterface](ctx).

func (*Builder) Action

func (b *Builder) Action(args ...any) *ActionBuilder

Action adds an action to the saga using a typed execute function. The function signature must be: func(ctx context.Context, in InType) (OutType, error)

Can be called two ways:

  • Action(GetBread) - name derived from function name ("get-bread"; the examples above look up CloseSandwich under "close-sandwich")
  • Action("custom-name", GetBread) - explicit name

func (*Builder) Build

func (b *Builder) Build() (*Definition, error)

Build constructs and validates the Definition without registering it. Useful for testing.

func (*Builder) Register

func (b *Builder) Register() error

Register validates and registers the saga definition with the global registry. Returns an error if validation fails (cycles, duplicate outputs, type mismatches).

func (*Builder) RegisterTo

func (b *Builder) RegisterTo(r *Registry) error

RegisterTo validates and registers the saga definition with the given registry. Useful for testing to avoid global state.

func (*Builder) Using

func (b *Builder) Using(deps ...any) *Builder

Using adds dependencies that will be injected into the context during execution. These can be retrieved using Get[T](ctx) within action functions. Note: Dependencies are keyed by their concrete type. To key by an interface type, use UsingAs[T] instead.

func (*Builder) Version

func (b *Builder) Version(v int) *Builder

Version sets the definition version (defaults to 1).

type Definition

type Definition struct {
	// Name uniquely identifies this saga definition.
	Name string

	// Version is incremented for breaking changes. Defaults to 1.
	Version int

	// Actions in this saga, keyed by action name.
	Actions map[string]*ActionNode
	// contains filtered or unexported fields
}

Definition describes a saga's structure - its actions and their dependencies. Definitions are stateless and registered at application startup.

func GetDefinition

func GetDefinition(name string) (*Definition, bool)

GetDefinition retrieves a saga definition from the global registry.

type EntityStorage

type EntityStorage struct {
	// contains filtered or unexported fields
}

EntityStorage implements Storage using the entity store.

func NewEntityStorage

func NewEntityStorage(store entity.Store, log *slog.Logger) *EntityStorage

NewEntityStorage creates a storage backed by an entity store.

func (*EntityStorage) Get

func (s *EntityStorage) Get(ctx context.Context, id string) (*Execution, error)

Get retrieves an execution by ID.

func (*EntityStorage) ListIncomplete

func (s *EntityStorage) ListIncomplete(ctx context.Context) ([]*Execution, error)

ListIncomplete returns all executions that need recovery.

func (*EntityStorage) Save

func (s *EntityStorage) Save(ctx context.Context, exec *Execution) error

Save persists the execution state as an entity.

type Execution

// Execution tracks the runtime state of a single saga run. It is persisted
// after each step.
type Execution struct {
	// ID is the unique identifier for this execution.
	ID string `json:"id"`

	// DefinitionName references the registered saga definition.
	DefinitionName string `json:"definition_name"`

	// DefinitionVersion is the version of the definition when started.
	DefinitionVersion int `json:"definition_version"`

	// InitialInputs contains the bootstrap data for the saga.
	// All values must be JSON-serializable.
	InitialInputs map[string]any `json:"initial_inputs"`

	// Status is the current state of the execution.
	Status Status `json:"status"`

	// ExecutedActions maps action names to their results.
	ExecutedActions map[string]*ActionResult `json:"executed_actions"`

	// ExecutionOrder records the order actions were executed for reverse undo.
	ExecutionOrder []string `json:"execution_order"`

	// Error is set if the saga failed.
	Error string `json:"error,omitempty"`

	// CreatedAt is when the execution was created.
	CreatedAt time.Time `json:"created_at"`

	// UpdatedAt is when the execution was last updated.
	UpdatedAt time.Time `json:"updated_at"`
}

Execution tracks the runtime state of a saga, persisted after each step.

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor orchestrates saga execution with durable logging.

func NewExecutor

func NewExecutor(storage Storage, opts ...ExecutorOption) *Executor

NewExecutor creates an executor with the given storage and options.

func (*Executor) Recover

func (e *Executor) Recover(ctx context.Context) error

Recover finds and resumes incomplete sagas after a restart.

func (*Executor) Start

func (e *Executor) Start(definitionName string) *StartBuilder

Start begins building a saga execution.

type ExecutorOption

type ExecutorOption func(*Executor)

ExecutorOption configures an Executor.

func WithLogger

func WithLogger(log *slog.Logger) ExecutorOption

WithLogger sets a custom logger for the executor.

func WithRegistry

func WithRegistry(r *Registry) ExecutorOption

WithRegistry sets a custom registry for the executor. Useful for testing to avoid global state.

type MemoryStorage

type MemoryStorage struct {
	// contains filtered or unexported fields
}

MemoryStorage is a simple in-memory storage implementation for testing and examples.

func NewMemoryStorage

func NewMemoryStorage() *MemoryStorage

NewMemoryStorage creates a new in-memory storage.

func (*MemoryStorage) Get

func (m *MemoryStorage) Get(ctx context.Context, id string) (*Execution, error)

Get retrieves an execution by ID.

func (*MemoryStorage) ListIncomplete

func (m *MemoryStorage) ListIncomplete(ctx context.Context) ([]*Execution, error)

ListIncomplete returns all executions that need recovery. This includes pending (crashed before starting), running, and undoing sagas.

func (*MemoryStorage) Save

func (m *MemoryStorage) Save(ctx context.Context, exec *Execution) error

Save persists an execution to memory.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry holds registered saga definitions.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new empty registry. Useful for testing to avoid global state.

func (*Registry) Get

func (r *Registry) Get(name string) (*Definition, bool)

Get retrieves a definition by name.

func (*Registry) Register

func (r *Registry) Register(def *Definition) error

Register adds a definition to the registry.

type StartBuilder

type StartBuilder struct {
	// contains filtered or unexported fields
}

StartBuilder provides a fluent API for starting saga executions.

func (*StartBuilder) Execute

func (sb *StartBuilder) Execute(ctx context.Context) error

Execute runs the saga to completion or failure.

func (*StartBuilder) Input

func (sb *StartBuilder) Input(key string, value any) *StartBuilder

Input adds an initial input value to the saga execution.

func (*StartBuilder) WithID

func (sb *StartBuilder) WithID(id string) *StartBuilder

WithID sets a specific execution ID (otherwise one is generated).

type Status

type Status string

Status represents the current state of a saga execution.

// Lifecycle states of a saga execution. Pending, Running, and Undoing are
// the states that Storage.ListIncomplete reports as needing recovery.
const (
	// StatusPending indicates the saga has been created but not started.
	StatusPending Status = "pending"

	// StatusRunning indicates the saga is actively executing actions.
	StatusRunning Status = "running"

	// StatusUndoing indicates the saga is rolling back due to a failure.
	StatusUndoing Status = "undoing"

	// StatusCompleted indicates all actions completed successfully.
	StatusCompleted Status = "completed"

	// StatusFailed indicates the saga failed and all undos have been attempted.
	StatusFailed Status = "failed"
)

type Storage

// Storage persists saga execution state so that executions can be looked up
// by ID and incomplete ones can be found for recovery.
type Storage interface {
	// Save persists the execution state.
	Save(ctx context.Context, exec *Execution) error

	// Get retrieves an execution by ID.
	Get(ctx context.Context, id string) (*Execution, error)

	// ListIncomplete returns all executions that need recovery (Pending, Running, or Undoing).
	ListIncomplete(ctx context.Context) ([]*Execution, error)
}

Storage persists saga execution state.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL