saga

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2025 License: MIT Imports: 7 Imported by: 0

README

Saga Pattern Package

Home  /  Saga Pattern Package

 

The saga package provides distributed transaction support using the Saga pattern for RabbitMQ applications. The Saga pattern is a sequence of local transactions where each transaction updates data within a single service. If a local transaction fails, the saga executes compensating transactions to undo the impact of preceding transactions.

 

Features

  • Orchestration Engine - Fully automated saga orchestration with message-driven step execution
  • Atomic State Updates - Concurrency-safe state management with atomic step and saga updates
  • Step Execution - Execute saga steps with automatic progression and error handling
  • Compensation Logic - Automatic rollback through compensating actions
  • Pluggable Storage - Interface for different persistence implementations with atomic operations
  • In-Memory Store - Built-in concurrent-safe store for testing and development
  • Message-Driven - Uses RabbitMQ for reliable step coordination and orchestration
  • Error Handling - Comprehensive error tracking, recovery, and compensation triggers
  • Production Ready - Designed for high-concurrency production workloads

🔝 back to top

 

Quick Start

Setting Up the Orchestration Engine
package main

import (
    "context"
    "log"
    "time"

    "github.com/cloudresty/go-rabbitmq"
    "github.com/cloudresty/go-rabbitmq/saga"
)

func main() {
    // Create RabbitMQ client
    client, err := rabbitmq.NewClient(
        rabbitmq.WithHosts("localhost:5672"),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Create saga store
    store := saga.NewInMemoryStore()

    // Define step and compensation handlers
    stepHandlers := map[string]saga.StepHandler{
        "orders.create": createOrderHandler,
        "inventory.reserve": reserveInventoryHandler,
        "payment.charge": chargePaymentHandler,
    }

    compensationHandlers := map[string]saga.CompensationHandler{
        "orders.delete": deleteOrderHandler,
        "inventory.release": releaseInventoryHandler,
        "payment.refund": refundPaymentHandler,
    }

    // Create saga manager with handlers
    manager, err := saga.NewManager(client, store, saga.Config{
        SagaExchange:         "sagas",
        StepQueue:           "saga.steps",
        CompensateQueue:     "saga.compensate",
        StepHandlers:        stepHandlers,
        CompensationHandlers: compensationHandlers,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer manager.Close()

    // Start the orchestration engine (this is the heart of the saga system)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Run the orchestration engine in the background
    go func() {
        if err := manager.Run(ctx); err != nil {
            log.Printf("Orchestration engine error: %v", err)
        }
    }()

    // Define saga steps
    steps := []saga.Step{
        {
            Name:         "create_order",
            Action:       "orders.create",
            Compensation: "orders.delete",
            Input: map[string]any{
                "product_id": "prod-123",
                "quantity":   2,
            },
        },
        {
            Name:         "reserve_inventory",
            Action:       "inventory.reserve",
            Compensation: "inventory.release",
            Input: map[string]any{
                "product_id": "prod-123",
                "quantity":   2,
            },
        },
        {
            Name:         "charge_payment",
            Action:       "payment.charge",
            Compensation: "payment.refund",
            Input: map[string]any{
                "amount":         99.99,
                "payment_method": "card-456",
            },
        },
    }

    // Start saga (orchestration engine will automatically execute steps)
    orderContext := map[string]any{
        "customer_id": "cust-789",
        "order_total": 99.99,
    }

    s, err := manager.Start(context.Background(), "order_processing", steps, orderContext)
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Started saga: %s", s.ID)

    // Monitor saga progress
    for {
        time.Sleep(1 * time.Second)

        currentSaga, err := manager.Get(context.Background(), s.ID)
        if err != nil {
            log.Printf("Error getting saga: %v", err)
            continue
        }

        log.Printf("Saga %s is %s", currentSaga.ID, currentSaga.State)

        if currentSaga.IsCompleted() {
            log.Println("Saga completed successfully!")
            break
        } else if currentSaga.IsFailed() || currentSaga.IsCompensated() {
            log.Printf("Saga failed or compensated: %s", currentSaga.State)
            break
        }
    }
}

🔝 back to top

 

Implementing Step and Compensation Handlers
// Step handler for creating orders
func createOrderHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
    // Extract input data
    productID := step.Input["product_id"].(string)
    quantity := step.Input["quantity"].(int)
    customerID := s.Context["customer_id"].(string)

    log.Printf("Creating order for customer %s: %d x %s", customerID, quantity, productID)

    // Simulate order creation logic
    orderID := generateOrderID()

    // Perform actual order creation
    if err := createOrderInDatabase(orderID, customerID, productID, quantity); err != nil {
        return fmt.Errorf("failed to create order: %w", err)
    }

    // Update step output for use in subsequent steps or compensation
    step.Output = map[string]any{
        "order_id": orderID,
        "status":   "created",
        "created_at": time.Now(),
    }

    log.Printf("Order created successfully: %s", orderID)
    return nil
}

// Step handler for inventory reservation
func reserveInventoryHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
    productID := step.Input["product_id"].(string)
    quantity := step.Input["quantity"].(int)

    log.Printf("Reserving %d units of %s", quantity, productID)

    // Check inventory availability
    available, err := checkInventoryAvailability(productID, quantity)
    if err != nil {
        return fmt.Errorf("failed to check inventory: %w", err)
    }

    if !available {
        return fmt.Errorf("insufficient inventory for product %s (need %d)", productID, quantity)
    }

    // Reserve inventory
    reservationID, err := reserveInventory(productID, quantity)
    if err != nil {
        return fmt.Errorf("failed to reserve inventory: %w", err)
    }

    step.Output = map[string]any{
        "reservation_id": reservationID,
        "reserved_qty":   quantity,
        "reserved_at":    time.Now(),
    }

    log.Printf("Inventory reserved: %s", reservationID)
    return nil
}

// Step handler for payment processing
func chargePaymentHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
    amount := step.Input["amount"].(float64)
    paymentMethod := step.Input["payment_method"].(string)
    customerID := s.Context["customer_id"].(string)

    log.Printf("Charging $%.2f to %s for customer %s", amount, paymentMethod, customerID)

    // Process payment with timeout
    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()

    transactionID, err := processPayment(ctx, paymentMethod, amount, customerID)
    if err != nil {
        return fmt.Errorf("payment failed: %w", err)
    }

    step.Output = map[string]any{
        "transaction_id":  transactionID,
        "amount_charged":  amount,
        "charged_at":      time.Now(),
        "payment_method":  paymentMethod,
    }

    log.Printf("Payment processed successfully: %s", transactionID)
    return nil
}

// Compensation handler for order deletion
func deleteOrderHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
    // Extract order ID from step output
    orderID, exists := step.Output["order_id"]
    if !exists {
        log.Printf("No order ID found in step output, skipping deletion")
        return nil // Idempotent - if no order was created, nothing to delete
    }

    log.Printf("Deleting order %s", orderID)

    if err := deleteOrderFromDatabase(orderID.(string)); err != nil {
        return fmt.Errorf("failed to delete order: %w", err)
    }

    log.Printf("Order %s deleted successfully", orderID)
    return nil
}

// Compensation handler for inventory release
func releaseInventoryHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
    reservationID, exists := step.Output["reservation_id"]
    if !exists {
        log.Printf("No reservation ID found, skipping inventory release")
        return nil // Idempotent
    }

    log.Printf("Releasing inventory reservation %s", reservationID)

    if err := releaseInventoryReservation(reservationID.(string)); err != nil {
        return fmt.Errorf("failed to release inventory: %w", err)
    }

    log.Printf("Inventory reservation %s released successfully", reservationID)
    return nil
}

// Compensation handler for payment refund
func refundPaymentHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
    transactionID, exists := step.Output["transaction_id"]
    if !exists {
        log.Printf("No transaction ID found, skipping refund")
        return nil // Idempotent
    }

    amount := step.Output["amount_charged"].(float64)
    log.Printf("Refunding $%.2f for transaction %s", amount, transactionID)

    refundID, err := processRefund(transactionID.(string), amount)
    if err != nil {
        return fmt.Errorf("failed to process refund: %w", err)
    }

    log.Printf("Refund processed successfully: %s", refundID)
    return nil
}

🔝 back to top

 

Saga States and Lifecycle

Orchestration Engine

The saga package includes a complete orchestration engine that automatically manages saga execution:

// Start the orchestration engine
go func() {
    if err := manager.Run(ctx); err != nil {
        log.Printf("Orchestration engine error: %v", err)
    }
}()

🔝 back to top

 

What the orchestration engine does:

  1. Message Processing: Consumes step execution and compensation messages from RabbitMQ queues
  2. Handler Execution: Looks up and executes registered step/compensation handlers
  3. State Management: Atomically updates saga and step states in the store
  4. Flow Control: Automatically publishes the next step message when a step completes
  5. Error Handling: Triggers compensation when steps fail
  6. Concurrency Safety: Uses atomic operations to prevent race conditions

🔝 back to top

 

Atomic State Updates

The package provides concurrency-safe state updates through atomic operations:

// Store interface includes atomic update methods
type Store interface {
    SaveSaga(ctx context.Context, saga *Saga) error
    LoadSaga(ctx context.Context, sagaID string) (*Saga, error)

    // Atomic operations for concurrency safety
    UpdateSagaStep(ctx context.Context, sagaID, stepID string, status State, output map[string]any, errorMsg string) (*Saga, error)
    UpdateSagaState(ctx context.Context, sagaID string, state State) (*Saga, error)
}

🔝 back to top

 

Benefits of atomic updates:

  • Race Condition Prevention: Multiple concurrent processes can safely update the same saga
  • Consistency: Guarantees that state changes are atomic and consistent
  • Performance: Eliminates the need for load-modify-save patterns
  • Reliability: Ensures saga state is always accurate, even under high concurrency

🔝 back to top

 

Saga States
saga.StateStarted      // Saga has been initiated
saga.StateCompleted    // All steps completed successfully
saga.StateFailed       // One or more steps failed
saga.StateCompensating // Compensation is in progress
saga.StateCompensated  // All compensations completed

🔝 back to top

 

Step States
saga.StateStarted   // Step is ready to execute
saga.StateCompleted // Step executed successfully
saga.StateFailed    // Step execution failed

🔝 back to top

 

Monitoring Saga Progress
// Get saga status
s, err := manager.Get(context.Background(), sagaID)
if err != nil {
    log.Fatal(err)
}

fmt.Printf("Saga %s is %s\n", s.ID, s.State)

// Check individual steps
for _, step := range s.Steps {
    fmt.Printf("Step %s (%s): %s\n", step.Name, step.Action, step.Status)
    if step.Error != "" {
        fmt.Printf("  Error: %s\n", step.Error)
    }
}

// Use helper methods
if s.IsCompleted() {
    fmt.Println("Saga completed successfully!")
}

if s.IsFailed() {
    fmt.Println("Saga failed, starting compensation...")
    err := manager.Compensate(context.Background(), s.ID)
    if err != nil {
        log.Printf("Compensation failed: %v", err)
    }
}

🔝 back to top

 

Error Handling and Compensation

Automatic Compensation

The orchestration engine automatically triggers compensation when steps fail:

// No manual intervention needed - the engine handles this automatically
// When a step handler returns an error:
func problematicStepHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
    // If this returns an error, the orchestration engine will:
    // 1. Mark the step as failed
    // 2. Mark the saga as failed
    // 3. Automatically start compensation
    return fmt.Errorf("step failed for some reason")
}

🔝 back to top

 

Manual Compensation Trigger

You can also manually trigger compensation:

// Manually trigger compensation for a saga
err := manager.Compensate(context.Background(), sagaID)
if err != nil {
    log.Printf("Failed to start compensation: %v", err)
}

🔝 back to top

 

Monitoring Saga State
// Monitor saga progress
saga, err := manager.Get(context.Background(), sagaID)
if err != nil {
    log.Printf("Error getting saga: %v", err)
    return
}

// Check saga state
switch saga.State {
case saga.StateCompleted:
    log.Println("Saga completed successfully!")
case saga.StateFailed:
    log.Println("Saga failed, compensation should start automatically")
case saga.StateCompensating:
    log.Println("Saga is currently being compensated")
case saga.StateCompensated:
    log.Println("Saga has been fully compensated")
default:
    log.Printf("Saga is in progress: %s", saga.State)
}

// Check individual steps
for _, step := range saga.Steps {
    if step.Status == saga.StateFailed {
        log.Printf("Step %s failed: %s", step.Name, step.Error)
    }
}

🔝 back to top

 

Custom Persistence Store

Implement the Store interface for production use with atomic operations:

type PostgresSagaStore struct {
    db *sql.DB
}

func (p *PostgresSagaStore) SaveSaga(ctx context.Context, saga *saga.Saga) error {
    query := `
        INSERT INTO sagas (id, name, state, steps, context, created_at, updated_at, completed_at)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        ON CONFLICT (id) DO UPDATE SET
            state = $3, steps = $4, context = $5, updated_at = $7, completed_at = $8
    `

    stepsJSON, _ := json.Marshal(saga.Steps)
    contextJSON, _ := json.Marshal(saga.Context)

    _, err := p.db.ExecContext(ctx, query,
        saga.ID, saga.Name, saga.State, stepsJSON, contextJSON,
        saga.CreatedAt, saga.UpdatedAt, saga.CompletedAt)

    return err
}

func (p *PostgresSagaStore) LoadSaga(ctx context.Context, sagaID string) (*saga.Saga, error) {
    query := `
        SELECT id, name, state, steps, context, created_at, updated_at, completed_at
        FROM sagas WHERE id = $1
    `

    var s saga.Saga
    var stepsJSON, contextJSON []byte

    err := p.db.QueryRowContext(ctx, query, sagaID).Scan(
        &s.ID, &s.Name, &s.State, &stepsJSON, &contextJSON,
        &s.CreatedAt, &s.UpdatedAt, &s.CompletedAt)

    if err != nil {
        return nil, err
    }

    json.Unmarshal(stepsJSON, &s.Steps)
    json.Unmarshal(contextJSON, &s.Context)

    return &s, nil
}

// Atomic step update - critical for concurrency safety
func (p *PostgresSagaStore) UpdateSagaStep(ctx context.Context, sagaID, stepID string, status saga.State, output map[string]any, errorMsg string) (*saga.Saga, error) {
    tx, err := p.db.BeginTx(ctx, nil)
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()

    // Lock the saga row for update
    var s saga.Saga
    var stepsJSON, contextJSON []byte

    query := `
        SELECT id, name, state, steps, context, created_at, updated_at, completed_at
        FROM sagas WHERE id = $1 FOR UPDATE
    `

    err = tx.QueryRowContext(ctx, query, sagaID).Scan(
        &s.ID, &s.Name, &s.State, &stepsJSON, &contextJSON,
        &s.CreatedAt, &s.UpdatedAt, &s.CompletedAt)

    if err != nil {
        return nil, err
    }

    json.Unmarshal(stepsJSON, &s.Steps)
    json.Unmarshal(contextJSON, &s.Context)

    // Find and update the step
    stepFound := false
    for i := range s.Steps {
        if s.Steps[i].ID == stepID {
            s.Steps[i].Status = status
            s.Steps[i].Output = output
            s.Steps[i].Error = errorMsg
            s.Steps[i].ExecutedAt = time.Now()
            stepFound = true
            break
        }
    }

    if !stepFound {
        return nil, fmt.Errorf("step not found: %s", stepID)
    }

    // Update saga state based on step states
    s.UpdatedAt = time.Now()
    allCompleted := true
    anyFailed := false

    for _, step := range s.Steps {
        if step.Status == saga.StateFailed {
            anyFailed = true
            break
        }
        if step.Status != saga.StateCompleted {
            allCompleted = false
        }
    }

    if anyFailed {
        s.State = saga.StateFailed
    } else if allCompleted {
        s.State = saga.StateCompleted
        now := time.Now()
        s.CompletedAt = &now
    }

    // Save updated saga
    updatedStepsJSON, _ := json.Marshal(s.Steps)
    updatedContextJSON, _ := json.Marshal(s.Context)

    updateQuery := `
        UPDATE sagas SET
            state = $2, steps = $3, context = $4, updated_at = $5, completed_at = $6
        WHERE id = $1
    `

    _, err = tx.ExecContext(ctx, updateQuery,
        s.ID, s.State, updatedStepsJSON, updatedContextJSON, s.UpdatedAt, s.CompletedAt)

    if err != nil {
        return nil, err
    }

    return &s, tx.Commit()
}

// Atomic saga state update
func (p *PostgresSagaStore) UpdateSagaState(ctx context.Context, sagaID string, state saga.State) (*saga.Saga, error) {
    tx, err := p.db.BeginTx(ctx, nil)
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()

    // Lock and load saga
    var s saga.Saga
    var stepsJSON, contextJSON []byte

    query := `
        SELECT id, name, state, steps, context, created_at, updated_at, completed_at
        FROM sagas WHERE id = $1 FOR UPDATE
    `

    err = tx.QueryRowContext(ctx, query, sagaID).Scan(
        &s.ID, &s.Name, &s.State, &stepsJSON, &contextJSON,
        &s.CreatedAt, &s.UpdatedAt, &s.CompletedAt)

    if err != nil {
        return nil, err
    }

    json.Unmarshal(stepsJSON, &s.Steps)
    json.Unmarshal(contextJSON, &s.Context)

    // Update state
    s.State = state
    s.UpdatedAt = time.Now()

    if state == saga.StateCompleted {
        now := time.Now()
        s.CompletedAt = &now
    }

    // Save updated saga
    updateQuery := `
        UPDATE sagas SET state = $2, updated_at = $3, completed_at = $4
        WHERE id = $1
    `

    _, err = tx.ExecContext(ctx, updateQuery, s.ID, s.State, s.UpdatedAt, s.CompletedAt)
    if err != nil {
        return nil, err
    }

    return &s, tx.Commit()
}

// Implement remaining Store methods...
func (p *PostgresSagaStore) DeleteSaga(ctx context.Context, sagaID string) error {
    _, err := p.db.ExecContext(ctx, "DELETE FROM sagas WHERE id = $1", sagaID)
    return err
}

func (p *PostgresSagaStore) ListActiveSagas(ctx context.Context) ([]*saga.Saga, error) {
    query := `
        SELECT id, name, state, steps, context, created_at, updated_at, completed_at
        FROM sagas WHERE state IN ('started', 'compensating')
    `

    rows, err := p.db.QueryContext(ctx, query)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var sagas []*saga.Saga
    for rows.Next() {
        var s saga.Saga
        var stepsJSON, contextJSON []byte

        err := rows.Scan(&s.ID, &s.Name, &s.State, &stepsJSON, &contextJSON,
            &s.CreatedAt, &s.UpdatedAt, &s.CompletedAt)
        if err != nil {
            return nil, err
        }

        json.Unmarshal(stepsJSON, &s.Steps)
        json.Unmarshal(contextJSON, &s.Context)

        sagas = append(sagas, &s)
    }

    return sagas, rows.Err()
}

🔝 back to top

 

Advanced Patterns

Parallel Steps
// Define parallel steps with dependencies
steps := []saga.Step{
    {Name: "create_order", Action: "orders.create"},

    // These can run in parallel
    {Name: "reserve_inventory", Action: "inventory.reserve"},
    {Name: "validate_address", Action: "shipping.validate"},

    // This waits for parallel steps
    {Name: "charge_payment", Action: "payment.charge"},
}

🔝 back to top

 

Conditional Steps
stepHandler := func(ctx context.Context, s *saga.Saga, step *saga.Step) error {
    // Check saga context for conditions
    if s.Context["requires_approval"].(bool) {
        // Execute approval step
        return requestApproval(s.Context)
    }

    // Skip this step
    return nil
}

🔝 back to top

 

Saga Composition
// Compose larger sagas from smaller ones
func createOrderSaga(orderData map[string]any) []saga.Step {
    steps := []saga.Step{
        {Name: "validate_order", Action: "orders.validate"},
    }

    // Add payment steps if required
    if orderData["payment_required"].(bool) {
        steps = append(steps, createPaymentSteps(orderData)...)
    }

    // Add shipping steps if physical goods
    if orderData["requires_shipping"].(bool) {
        steps = append(steps, createShippingSteps(orderData)...)
    }

    return steps
}

🔝 back to top

 

Integration Examples

Production Service Setup
func main() {
    // Create RabbitMQ client
    client, err := rabbitmq.NewClient(
        rabbitmq.WithHosts("localhost:5672"),
        rabbitmq.WithConnectionName("saga-service"),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Create production store (PostgreSQL example)
    store, err := NewPostgresSagaStore(databaseURL)
    if err != nil {
        log.Fatal(err)
    }

    // Create saga manager with all handlers
    manager, err := saga.NewManager(client, store, saga.Config{
        SagaExchange:         "sagas",
        StepQueue:           "saga.steps",
        CompensateQueue:     "saga.compensate",
        StepHandlers:        buildStepHandlers(),
        CompensationHandlers: buildCompensationHandlers(),
    })
    if err != nil {
        log.Fatal(err)
    }
    defer manager.Close()

    // Start orchestration engine
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Handle graceful shutdown
    go func() {
        sigChan := make(chan os.Signal, 1)
        signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
        <-sigChan
        log.Println("Shutting down saga service...")
        cancel()
    }()

    // Start the orchestration engine
    log.Println("Starting saga orchestration engine...")
    if err := manager.Run(ctx); err != nil && err != context.Canceled {
        log.Printf("Orchestration engine error: %v", err)
    }
}

func buildStepHandlers() map[string]saga.StepHandler {
    return map[string]saga.StepHandler{
        "orders.create":       orderService.CreateOrder,
        "inventory.reserve":   inventoryService.ReserveInventory,
        "payment.charge":      paymentService.ChargePayment,
        "shipping.schedule":   shippingService.ScheduleShipment,
        "notifications.send":  notificationService.SendNotification,
    }
}

func buildCompensationHandlers() map[string]saga.CompensationHandler {
    return map[string]saga.CompensationHandler{
        "orders.delete":        orderService.DeleteOrder,
        "inventory.release":    inventoryService.ReleaseInventory,
        "payment.refund":       paymentService.RefundPayment,
        "shipping.cancel":      shippingService.CancelShipment,
        "notifications.cancel": notificationService.CancelNotification,
    }
}

🔝 back to top

 

With HTTP API
func startOrderSaga(w http.ResponseWriter, r *http.Request) {
    var orderRequest OrderRequest
    if err := json.NewDecoder(r.Body).Decode(&orderRequest); err != nil {
        http.Error(w, "Invalid request", http.StatusBadRequest)
        return
    }

    steps := createOrderSteps(orderRequest)
    context := map[string]any{
        "customer_id": orderRequest.CustomerID,
        "order_data":  orderRequest,
        "request_id":  r.Header.Get("X-Request-ID"),
    }

    saga, err := sagaManager.Start(r.Context(), "order_processing", steps, context)
    if err != nil {
        log.Printf("Failed to start saga: %v", err)
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    response := map[string]any{
        "saga_id": saga.ID,
        "status":  saga.State,
        "message": "Order processing started",
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}

func getSagaStatus(w http.ResponseWriter, r *http.Request) {
    sagaID := mux.Vars(r)["sagaId"]

    saga, err := sagaManager.Get(r.Context(), sagaID)
    if err != nil {
        http.Error(w, "Saga not found", http.StatusNotFound)
        return
    }

    // Create detailed response
    response := map[string]any{
        "saga_id":      saga.ID,
        "name":         saga.Name,
        "state":        saga.State,
        "created_at":   saga.CreatedAt,
        "updated_at":   saga.UpdatedAt,
        "completed_at": saga.CompletedAt,
        "steps": func() []map[string]any {
            steps := make([]map[string]any, len(saga.Steps))
            for i, step := range saga.Steps {
                steps[i] = map[string]any{
                    "id":          step.ID,
                    "name":        step.Name,
                    "action":      step.Action,
                    "status":      step.Status,
                    "executed_at": step.ExecutedAt,
                    "error":       step.Error,
                }
            }
            return steps
        }(),
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}

// Webhook endpoint for external service notifications
func handleStepCompletion(w http.ResponseWriter, r *http.Request) {
    var notification struct {
        SagaID   string                 `json:"saga_id"`
        StepID   string                 `json:"step_id"`
        Status   string                 `json:"status"`
        Output   map[string]any `json:"output"`
        Error    string                 `json:"error"`
    }

    if err := json.NewDecoder(r.Body).Decode(&notification); err != nil {
        http.Error(w, "Invalid notification", http.StatusBadRequest)
        return
    }

    // External services can notify saga completion via webhooks
    // The orchestration engine will handle the next steps automatically
    log.Printf("Received step completion notification: saga=%s, step=%s, status=%s",
        notification.SagaID, notification.StepID, notification.Status)

    w.WriteHeader(http.StatusOK)
}

🔝 back to top

 

With Worker Pattern
func startSagaWorkers(manager *saga.Manager, config saga.Config) {
    // Start step execution workers
    go func() {
        for {
            // Consume step execution messages
            err := consumer.Consume(context.Background(), config.StepQueue,
                func(ctx context.Context, delivery rabbitmq.Delivery) error {
                    return handleStepExecution(ctx, manager, delivery)
                })
            if err != nil {
                log.Printf("Step consumer error: %v", err)
            }
        }
    }()

    // Start compensation workers
    go func() {
        for {
            // Consume compensation messages
            err := consumer.Consume(context.Background(), config.CompensateQueue,
                func(ctx context.Context, delivery rabbitmq.Delivery) error {
                    return handleCompensation(ctx, manager, delivery)
                })
            if err != nil {
                log.Printf("Compensation consumer error: %v", err)
            }
        }
    }()
}

🔝 back to top

 

Best Practices

1. Orchestration Engine Management
// Always run the orchestration engine in production
func startSagaService() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Graceful shutdown
    go func() {
        sigChan := make(chan os.Signal, 1)
        signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
        <-sigChan
        log.Println("Shutting down...")
        cancel()
    }()

    // Start orchestration engine
    if err := manager.Run(ctx); err != nil && err != context.Canceled {
        log.Printf("Engine error: %v", err)
    }
}

🔝 back to top

 

2. Idempotent Operations
// Make steps idempotent to handle retries safely
stepHandler := func(ctx context.Context, s *saga.Saga, step *saga.Step) error {
    // Check if already processed
    if step.Output["processed"] == true {
        return nil
    }

    // Perform operation
    result, err := performOperation(step.Input)
    if err != nil {
        return err
    }

    // Mark as processed
    step.Output = map[string]any{
        "processed": true,
        "result":    result,
        "timestamp": time.Now(),
    }

    return nil
}

🔝 back to top

 

3. Timeout Handling
stepHandler := func(ctx context.Context, s *saga.Saga, step *saga.Step) error {
    // Create timeout context for step execution
    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()

    // Perform operation with timeout
    return performOperationWithTimeout(ctx, step.Input)
}

🔝 back to top

 

4. Retry Logic with Exponential Backoff
stepHandler := func(ctx context.Context, s *saga.Saga, step *saga.Step) error {
    maxRetries := 3
    baseDelay := time.Second

    for attempt := 0; attempt < maxRetries; attempt++ {
        err := performOperation(step.Input)
        if err == nil {
            return nil
        }

        // Check if this is the last attempt
        if attempt == maxRetries-1 {
            return fmt.Errorf("operation failed after %d attempts: %w", maxRetries, err)
        }

        // Exponential backoff
        delay := baseDelay * time.Duration(1<<attempt)
        log.Printf("Step %s failed (attempt %d), retrying in %v: %v",
            step.Name, attempt+1, delay, err)

        select {
        case <-time.After(delay):
            continue
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    return nil
}

🔝 back to top

 

5. Error Classification and Handling
// Define error types for better compensation decisions
type StepError struct {
    Code    string
    Message string
    Retryable bool
}

func (e StepError) Error() string {
    return e.Message
}

stepHandler := func(ctx context.Context, s *saga.Saga, step *saga.Step) error {
    result, err := performBusinessOperation(step.Input)
    if err != nil {
        // Classify the error
        switch {
        case isTemporaryError(err):
            return StepError{
                Code: "TEMPORARY_ERROR",
                Message: err.Error(),
                Retryable: true,
            }
        case isBusinessRuleViolation(err):
            return StepError{
                Code: "BUSINESS_RULE_VIOLATION",
                Message: err.Error(),
                Retryable: false,
            }
        default:
            return err
        }
    }

    step.Output = map[string]any{
        "result": result,
        "success": true,
    }

    return nil
}

🔝 back to top

 

6. Monitoring and Observability
func monitorSagas(manager *saga.Manager) {
    ticker := time.NewTicker(time.Minute)
    go func() {
        defer ticker.Stop()
        for range ticker.C {
            active, err := manager.ListActive(context.Background())
            if err != nil {
                log.Printf("Failed to list active sagas: %v", err)
                continue
            }

            log.Printf("Active sagas: %d", len(active))

            // Check for stale sagas
            staleThreshold := time.Hour
            for _, saga := range active {
                if time.Since(saga.UpdatedAt) > staleThreshold {
                    log.Printf("Stale saga detected: %s (last updated: %v)",
                        saga.ID, saga.UpdatedAt)

                    // Emit metrics for alerting
                    emitStaleSagaMetric(saga.ID, saga.Name, time.Since(saga.UpdatedAt))
                }
            }
        }
    }()
}

func emitStaleSagaMetric(sagaID, sagaName string, staleDuration time.Duration) {
    // Integrate with your monitoring system (Prometheus, etc.)
    staleSagaCounter.WithLabelValues(sagaName).Inc()
    staleDurationGauge.WithLabelValues(sagaName).Set(staleDuration.Seconds())
}

🔝 back to top

 

7. Resource Management
// Properly manage resources in handlers
func createOrderHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
    // Use connection pools for database operations
    db := getDBConnection()
    defer db.Close()

    // Use transactions for consistency
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Perform operations
    orderID, err := createOrderInTx(tx, step.Input)
    if err != nil {
        return err
    }

    // Commit transaction
    if err := tx.Commit(); err != nil {
        return err
    }

    step.Output = map[string]any{
        "order_id": orderID,
        "created_at": time.Now(),
    }

    return nil
}

🔝 back to top

 

8. Compensation Best Practices
// Make compensations idempotent and safe
func deleteOrderCompensation(ctx context.Context, s *saga.Saga, step *saga.Step) error {
    orderID, exists := step.Output["order_id"]
    if !exists {
        log.Printf("No order ID found for compensation, skipping")
        return nil // Safe to skip if no order was created
    }

    // Check if order still exists
    exists, err := orderExists(ctx, orderID.(string))
    if err != nil {
        return fmt.Errorf("failed to check order existence: %w", err)
    }

    if !exists {
        log.Printf("Order %s already deleted, compensation idempotent", orderID)
        return nil // Idempotent - already deleted
    }

    // Delete the order
    if err := deleteOrder(ctx, orderID.(string)); err != nil {
        return fmt.Errorf("failed to delete order %s: %w", orderID, err)
    }

    log.Printf("Order %s deleted successfully", orderID)
    return nil
}

🔝 back to top

 

Configuration Reference

Config Structure
Field Type Description
SagaExchange string Exchange for saga coordination messages
StepQueue string Queue for step execution messages
CompensateQueue string Queue for compensation messages
StepHandlers map[string]StepHandler Step execution handlers (required for orchestration)
CompensationHandlers map[string]CompensationHandler Compensation handlers (required for orchestration)

🔝 back to top

 

Store Interface

The Store interface now includes atomic operations for concurrency safety:

Method Description
SaveSaga Save a complete saga (used for initial creation)
LoadSaga Load a saga by ID
DeleteSaga Delete a saga
ListActiveSagas List all active sagas (started or compensating)
UpdateSagaStep Atomically update a specific step within a saga
UpdateSagaState Atomically update the overall saga state

🔝 back to top

 

Saga Structure
Field Type Description
ID string Unique saga identifier (ULID)
Name string Saga name/type
State State Current saga state
Steps []Step Saga steps with execution details
Context map[string]any Saga-wide context data
CreatedAt time.Time Creation timestamp
UpdatedAt time.Time Last update timestamp
CompletedAt *time.Time Completion timestamp (nil if not completed)

🔝 back to top

 

Step Structure
Field Type Description
ID string Unique step identifier (ULID)
Name string Human-readable step name
Action string Action identifier for handler lookup
Compensation string Compensation action identifier
Input map[string]any Step input data
Output map[string]any Step output data
Status State Current step status
Error string Error message if step failed
ExecutedAt time.Time Step execution timestamp

🔝 back to top

 

Manager Methods
Method Description
NewManager Create a new saga manager with handlers
Run Start the orchestration engine (blocks until context cancelled)
Start Start a new saga (returns immediately)
Get Retrieve a saga by ID
ListActive List all active sagas
Compensate Manually trigger compensation for a saga
Stop Stop the orchestration engine gracefully
Close Close all resources (publisher, consumer)

🔝 back to top

 

Testing

# Run saga package tests
go test ./saga

# Run with race detection
go test -race ./saga

🔝 back to top

 

Orchestration Pattern

The saga package provides a complete orchestration engine with automatic step execution:

// Create manager with handlers
manager, err := saga.NewManager(client, store, saga.Config{
    SagaExchange:         "sagas",
    StepQueue:            "saga.steps",
    CompensateQueue:      "saga.compensate",
    StepHandlers:         stepHandlers,     // Required
    CompensationHandlers: compHandlers,    // Required
})

// Start orchestration engine (critical!)
go func() {
    if err := manager.Run(ctx); err != nil {
        log.Printf("Engine error: %v", err)
    }
}()

// Start sagas (they execute automatically)
s, err := manager.Start(ctx, name, steps, context)

The orchestration engine automatically handles step execution, state updates, and compensation flow.

🔝 back to top

 


 

An open source project brought to you by the Cloudresty team.

Website  |  LinkedIn  |  BlueSky  |  GitHub  |  Docker Hub

 

Documentation

Overview

Package saga provides distributed transaction support using the Saga pattern.

The Saga pattern is a sequence of local transactions where each transaction updates data within a single service. If a local transaction fails because it violates a business rule, the saga executes a compensating transaction to undo the impact of the preceding transactions.

This implementation provides:

  • Saga orchestration and coordination
  • Step execution with compensating actions
  • Pluggable persistence stores
  • Built-in in-memory store for testing
  • Message-driven step execution

Example usage:

// Create saga manager
store := saga.NewInMemoryStore()
manager, err := saga.NewManager(client, store, saga.Config{
	SagaExchange:    "sagas",
	StepQueue:       "saga.steps",
	CompensateQueue: "saga.compensate",
})

// Define saga steps
steps := []saga.Step{
	{Name: "create_order", Action: "orders.create", Compensation: "orders.delete"},
	{Name: "reserve_inventory", Action: "inventory.reserve", Compensation: "inventory.release"},
	{Name: "charge_payment", Action: "payment.charge", Compensation: "payment.refund"},
}

// Start saga
s, err := manager.Start(ctx, "order_processing", steps, orderContext)

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CompensationHandler

type CompensationHandler func(ctx context.Context, saga *Saga, step *Step) error

CompensationHandler defines the function signature for compensation handlers

type Config

type Config struct {
	SagaExchange         string // Exchange for saga coordination messages
	StepQueue            string // Queue for saga step execution
	CompensateQueue      string // Queue for saga compensation
	StepHandlers         map[string]StepHandler
	CompensationHandlers map[string]CompensationHandler
	Logger               rabbitmq.Logger
}

Config holds configuration for saga management

type InMemoryStore

type InMemoryStore struct {
	// contains filtered or unexported fields
}

InMemoryStore provides a simple in-memory saga store for testing

func NewInMemoryStore

func NewInMemoryStore() *InMemoryStore

NewInMemoryStore creates a new in-memory saga store

func (*InMemoryStore) DeleteSaga

func (s *InMemoryStore) DeleteSaga(ctx context.Context, sagaID string) error

DeleteSaga deletes a saga from the in-memory store

func (*InMemoryStore) ListActiveSagas

func (s *InMemoryStore) ListActiveSagas(ctx context.Context) ([]*Saga, error)

ListActiveSagas lists all active sagas

func (*InMemoryStore) LoadSaga

func (s *InMemoryStore) LoadSaga(ctx context.Context, sagaID string) (*Saga, error)

LoadSaga loads a saga from the in-memory store

func (*InMemoryStore) SaveSaga

func (s *InMemoryStore) SaveSaga(ctx context.Context, saga *Saga) error

SaveSaga saves a saga to the in-memory store

func (*InMemoryStore) UpdateSagaState

func (s *InMemoryStore) UpdateSagaState(ctx context.Context, sagaID string, state State) (*Saga, error)

UpdateSagaState atomically updates the state of a saga

func (*InMemoryStore) UpdateSagaStep

func (s *InMemoryStore) UpdateSagaStep(ctx context.Context, sagaID, stepID string, status State, output map[string]any, errorMsg string) (*Saga, error)

UpdateSagaStep atomically updates a specific step within a saga

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages saga execution and coordination

func NewManager

func NewManager(client *rabbitmq.Client, store Store, config Config) (*Manager, error)

NewManager creates a new saga manager

Example

Example functions showing usage patterns

client, _ := rabbitmq.NewClient(rabbitmq.WithHosts("localhost:5672"))
defer func() {
	if err := client.Close(); err != nil {
		// Handle error in real code
	}
}()

store := NewInMemoryStore()
config := Config{
	SagaExchange:    "sagas",
	StepQueue:       "saga.steps",
	CompensateQueue: "saga.compensate",
}

manager, _ := NewManager(client, store, config)
defer func() {
	_ = manager.Close() // Ignore close error in defer
}()

_ = manager

func (*Manager) Close

func (sm *Manager) Close() error

Close closes the saga manager

func (*Manager) Compensate

func (sm *Manager) Compensate(ctx context.Context, sagaID string) error

Compensate starts compensation for a failed saga

func (*Manager) CompensateSaga

func (sm *Manager) CompensateSaga(ctx context.Context, sagaID string) error

CompensateSaga implements the simplified SagaOrchestrator interface

func (*Manager) Get

func (sm *Manager) Get(ctx context.Context, sagaID string) (*Saga, error)

Get retrieves a saga by ID

func (*Manager) GetSagaStatus

func (sm *Manager) GetSagaStatus(ctx context.Context, sagaID string) (rabbitmq.SagaStatus, error)

GetSagaStatus implements the simplified SagaOrchestrator interface

func (*Manager) ListActive

func (sm *Manager) ListActive(ctx context.Context) ([]*Saga, error)

ListActive lists all active sagas

func (*Manager) Run

func (sm *Manager) Run(ctx context.Context) error

Run starts the saga orchestration engine - this is the heart of the orchestrator

func (*Manager) Start

func (sm *Manager) Start(ctx context.Context, name string, steps []Step, sagaContext map[string]any) (*Saga, error)

Start starts a new saga

Example
client, _ := rabbitmq.NewClient(rabbitmq.WithHosts("localhost:5672"))
defer func() {
	if err := client.Close(); err != nil {
		// Handle error in real code
	}
}()

store := NewInMemoryStore()
config := Config{
	SagaExchange:    "sagas",
	StepQueue:       "saga.steps",
	CompensateQueue: "saga.compensate",
}

manager, _ := NewManager(client, store, config)
defer func() {
	_ = manager.Close() // Ignore close error in defer
}()

steps := []Step{
	{Name: "create_order", Action: "orders.create", Compensation: "orders.delete"},
	{Name: "reserve_inventory", Action: "inventory.reserve", Compensation: "inventory.release"},
	{Name: "charge_payment", Action: "payment.charge", Compensation: "payment.refund"},
}

ctx := context.Background()
saga, _ := manager.Start(ctx, "order_processing", steps, map[string]any{
	"customer_id": "cust-123",
})

_ = saga

func (*Manager) StartSaga

func (sm *Manager) StartSaga(ctx context.Context, sagaName string, steps []rabbitmq.SagaStep, context map[string]any) (string, error)

StartSaga implements the simplified SagaOrchestrator interface

func (*Manager) Stop

func (sm *Manager) Stop()

Stop stops the saga orchestration engine

type Saga

type Saga struct {
	ID          string         `json:"id"`
	Name        string         `json:"name"`
	State       State          `json:"state"`
	Steps       []Step         `json:"steps"`
	Context     map[string]any `json:"context"`
	CreatedAt   time.Time      `json:"created_at"`
	UpdatedAt   time.Time      `json:"updated_at"`
	CompletedAt *time.Time     `json:"completed_at,omitempty"`
}

Saga represents a distributed transaction saga

func (*Saga) GetCompletedSteps

func (s *Saga) GetCompletedSteps() []Step

GetCompletedSteps returns all steps that have completed successfully

func (*Saga) GetFailedSteps

func (s *Saga) GetFailedSteps() []Step

GetFailedSteps returns all steps that have failed

func (*Saga) GetNextStep

func (s *Saga) GetNextStep() *Step

GetNextStep returns the next step to execute, or nil if no more steps

func (*Saga) IsCompensated

func (s *Saga) IsCompensated() bool

IsCompensated returns true if the saga has been fully compensated

func (*Saga) IsCompensating

func (s *Saga) IsCompensating() bool

IsCompensating returns true if the saga is currently compensating

func (*Saga) IsCompleted

func (s *Saga) IsCompleted() bool

IsCompleted returns true if the saga has completed successfully

func (*Saga) IsFailed

func (s *Saga) IsFailed() bool

IsFailed returns true if the saga has failed

type State

type State string

State represents the current state of a saga or step

const (
	StateStarted      State = "started"
	StateCompleted    State = "completed"
	StateFailed       State = "failed"
	StateCompensating State = "compensating"
	StateCompensated  State = "compensated"
)

type Step

type Step struct {
	ID           string         `json:"id"`
	Name         string         `json:"name"`
	Action       string         `json:"action"`       // The action to perform
	Compensation string         `json:"compensation"` // The compensation action
	Input        map[string]any `json:"input"`
	Output       map[string]any `json:"output"`
	Status       State          `json:"status"`
	Error        string         `json:"error,omitempty"`
	ExecutedAt   time.Time      `json:"executed_at"`
}

Step represents a single step in a saga

type StepHandler

type StepHandler func(ctx context.Context, saga *Saga, step *Step) error

StepHandler defines the function signature for saga step handlers

type Store

type Store interface {
	SaveSaga(ctx context.Context, saga *Saga) error
	LoadSaga(ctx context.Context, sagaID string) (*Saga, error)
	DeleteSaga(ctx context.Context, sagaID string) error
	ListActiveSagas(ctx context.Context) ([]*Saga, error)
	// Atomic operations for concurrency safety
	UpdateSagaStep(ctx context.Context, sagaID, stepID string, status State, output map[string]any, errorMsg string) (*Saga, error)
	UpdateSagaState(ctx context.Context, sagaID string, state State) (*Saga, error)
}

Store defines the interface for saga persistence with atomic operations

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL