Saga Pattern Package
The saga package provides distributed transaction support using the Saga pattern for RabbitMQ applications. The Saga pattern is a sequence of local transactions where each transaction updates data within a single service. If a local transaction fails, the saga executes compensating transactions to undo the impact of preceding transactions.
Features
- Orchestration Engine - Fully automated saga orchestration with message-driven step execution
- Atomic State Updates - Concurrency-safe state management with atomic step and saga updates
- Step Execution - Execute saga steps with automatic progression and error handling
- Compensation Logic - Automatic rollback through compensating actions
- Pluggable Storage - Interface for different persistence implementations with atomic operations
- In-Memory Store - Built-in concurrent-safe store for testing and development
- Message-Driven - Uses RabbitMQ for reliable step coordination and orchestration
- Error Handling - Comprehensive error tracking, recovery, and compensation triggers
- Production Ready - Designed for high-concurrency production workloads
Quick Start
Setting Up the Orchestration Engine
package main
import (
"context"
"log"
"time"
"github.com/cloudresty/go-rabbitmq"
"github.com/cloudresty/go-rabbitmq/saga"
)
func main() {
// Create RabbitMQ client
client, err := rabbitmq.NewClient(
rabbitmq.WithHosts("localhost:5672"),
)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// Create saga store
store := saga.NewInMemoryStore()
// Define step and compensation handlers
stepHandlers := map[string]saga.StepHandler{
"orders.create": createOrderHandler,
"inventory.reserve": reserveInventoryHandler,
"payment.charge": chargePaymentHandler,
}
compensationHandlers := map[string]saga.CompensationHandler{
"orders.delete": deleteOrderHandler,
"inventory.release": releaseInventoryHandler,
"payment.refund": refundPaymentHandler,
}
// Create saga manager with handlers
manager, err := saga.NewManager(client, store, saga.Config{
SagaExchange: "sagas",
StepQueue: "saga.steps",
CompensateQueue: "saga.compensate",
StepHandlers: stepHandlers,
CompensationHandlers: compensationHandlers,
})
if err != nil {
log.Fatal(err)
}
defer manager.Close()
// Start the orchestration engine (this is the heart of the saga system)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Run the orchestration engine in the background
go func() {
if err := manager.Run(ctx); err != nil {
log.Printf("Orchestration engine error: %v", err)
}
}()
// Define saga steps
steps := []saga.Step{
{
Name: "create_order",
Action: "orders.create",
Compensation: "orders.delete",
Input: map[string]any{
"product_id": "prod-123",
"quantity": 2,
},
},
{
Name: "reserve_inventory",
Action: "inventory.reserve",
Compensation: "inventory.release",
Input: map[string]any{
"product_id": "prod-123",
"quantity": 2,
},
},
{
Name: "charge_payment",
Action: "payment.charge",
Compensation: "payment.refund",
Input: map[string]any{
"amount": 99.99,
"payment_method": "card-456",
},
},
}
// Start saga (orchestration engine will automatically execute steps)
orderContext := map[string]any{
"customer_id": "cust-789",
"order_total": 99.99,
}
s, err := manager.Start(context.Background(), "order_processing", steps, orderContext)
if err != nil {
log.Fatal(err)
}
log.Printf("Started saga: %s", s.ID)
// Monitor saga progress
for {
time.Sleep(1 * time.Second)
currentSaga, err := manager.Get(context.Background(), s.ID)
if err != nil {
log.Printf("Error getting saga: %v", err)
continue
}
log.Printf("Saga %s is %s", currentSaga.ID, currentSaga.State)
if currentSaga.IsCompleted() {
log.Println("Saga completed successfully!")
break
} else if currentSaga.IsFailed() || currentSaga.IsCompensated() {
log.Printf("Saga failed or compensated: %s", currentSaga.State)
break
}
}
}
Implementing Step and Compensation Handlers
// Step handler for creating orders
func createOrderHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
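// Extract input data (these unchecked assertions panic on a missing key or a different type; a JSON-decoded number, for example, arrives as float64)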
productID := step.Input["product_id"].(string)
quantity := step.Input["quantity"].(int)
customerID := s.Context["customer_id"].(string)
log.Printf("Creating order for customer %s: %d x %s", customerID, quantity, productID)
// Generate an ID for the new order
orderID := generateOrderID()
// Perform actual order creation
if err := createOrderInDatabase(orderID, customerID, productID, quantity); err != nil {
return fmt.Errorf("failed to create order: %w", err)
}
// Update step output for use in subsequent steps or compensation
step.Output = map[string]any{
"order_id": orderID,
"status": "created",
"created_at": time.Now(),
}
log.Printf("Order created successfully: %s", orderID)
return nil
}
// Step handler for inventory reservation
func reserveInventoryHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
productID := step.Input["product_id"].(string)
quantity := step.Input["quantity"].(int)
log.Printf("Reserving %d units of %s", quantity, productID)
// Check inventory availability
available, err := checkInventoryAvailability(productID, quantity)
if err != nil {
return fmt.Errorf("failed to check inventory: %w", err)
}
if !available {
return fmt.Errorf("insufficient inventory for product %s (need %d)", productID, quantity)
}
// Reserve inventory
reservationID, err := reserveInventory(productID, quantity)
if err != nil {
return fmt.Errorf("failed to reserve inventory: %w", err)
}
step.Output = map[string]any{
"reservation_id": reservationID,
"reserved_qty": quantity,
"reserved_at": time.Now(),
}
log.Printf("Inventory reserved: %s", reservationID)
return nil
}
// Step handler for payment processing
func chargePaymentHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
amount := step.Input["amount"].(float64)
paymentMethod := step.Input["payment_method"].(string)
customerID := s.Context["customer_id"].(string)
log.Printf("Charging $%.2f to %s for customer %s", amount, paymentMethod, customerID)
// Process payment with timeout
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
transactionID, err := processPayment(ctx, paymentMethod, amount, customerID)
if err != nil {
return fmt.Errorf("payment failed: %w", err)
}
step.Output = map[string]any{
"transaction_id": transactionID,
"amount_charged": amount,
"charged_at": time.Now(),
"payment_method": paymentMethod,
}
log.Printf("Payment processed successfully: %s", transactionID)
return nil
}
// Compensation handler for order deletion
func deleteOrderHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
// Extract order ID from step output
orderID, exists := step.Output["order_id"]
if !exists {
log.Printf("No order ID found in step output, skipping deletion")
return nil // Idempotent - if no order was created, nothing to delete
}
log.Printf("Deleting order %s", orderID)
if err := deleteOrderFromDatabase(orderID.(string)); err != nil {
return fmt.Errorf("failed to delete order: %w", err)
}
log.Printf("Order %s deleted successfully", orderID)
return nil
}
// Compensation handler for inventory release
func releaseInventoryHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
reservationID, exists := step.Output["reservation_id"]
if !exists {
log.Printf("No reservation ID found, skipping inventory release")
return nil // Idempotent
}
log.Printf("Releasing inventory reservation %s", reservationID)
if err := releaseInventoryReservation(reservationID.(string)); err != nil {
return fmt.Errorf("failed to release inventory: %w", err)
}
log.Printf("Inventory reservation %s released successfully", reservationID)
return nil
}
// Compensation handler for payment refund
func refundPaymentHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
transactionID, exists := step.Output["transaction_id"]
if !exists {
log.Printf("No transaction ID found, skipping refund")
return nil // Idempotent
}
amount := step.Output["amount_charged"].(float64)
log.Printf("Refunding $%.2f for transaction %s", amount, transactionID)
refundID, err := processRefund(transactionID.(string), amount)
if err != nil {
return fmt.Errorf("failed to process refund: %w", err)
}
log.Printf("Refund processed successfully: %s", refundID)
return nil
}
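The handlers above read values with unchecked type assertions such as step.Input["quantity"].(int). If step data is ever serialized as JSON (the PostgreSQL store example in this document does so; whether the message transport does is an assumption), numbers come back as float64 and that assertion would panic. The helper below is a hedged sketch of a safer read; intFrom is a hypothetical name, not a function of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// intFrom reads an integer from a map[string]any. It accepts int (set
// directly in Go code) and float64 (what encoding/json produces for numbers).
func intFrom(m map[string]any, key string) (int, error) {
	switch v := m[key].(type) {
	case int:
		return v, nil
	case float64:
		if v != float64(int(v)) {
			return 0, fmt.Errorf("%s: %v is not a whole number", key, v)
		}
		return int(v), nil
	case nil:
		return 0, fmt.Errorf("%s: missing", key)
	default:
		return 0, fmt.Errorf("%s: unexpected type %T", key, v)
	}
}

func main() {
	input := map[string]any{"product_id": "prod-123", "quantity": 2}

	// Round-trip through JSON, as a persistent store might.
	raw, err := json.Marshal(input)
	if err != nil {
		panic(err)
	}
	var decoded map[string]any
	if err := json.Unmarshal(raw, &decoded); err != nil {
		panic(err)
	}

	_, stillInt := decoded["quantity"].(int)
	fmt.Println("quantity is still an int after the round trip:", stillInt)

	qty, err := intFrom(decoded, "quantity")
	fmt.Println(qty, err)
}
```

Returning an error instead of panicking lets the handler fail the step cleanly, which in turn lets compensation run.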
Saga States and Lifecycle
Orchestration Engine
The saga package includes a complete orchestration engine that automatically manages saga execution:
// Start the orchestration engine
go func() {
if err := manager.Run(ctx); err != nil {
log.Printf("Orchestration engine error: %v", err)
}
}()
What the orchestration engine does:
- Message Processing: Consumes step execution and compensation messages from RabbitMQ queues
- Handler Execution: Looks up and executes registered step/compensation handlers
- State Management: Atomically updates saga and step states in the store
- Flow Control: Automatically publishes the next step message when a step completes
- Error Handling: Triggers compensation when steps fail
- Concurrency Safety: Uses atomic operations to prevent race conditions
Atomic State Updates
The package provides concurrency-safe state updates through atomic operations:
// Store interface includes atomic update methods
type Store interface {
SaveSaga(ctx context.Context, saga *Saga) error
LoadSaga(ctx context.Context, sagaID string) (*Saga, error)
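// DeleteSaga and ListActiveSagas, listed in the Store Interface reference, are omitted from this excerpt
// Atomic operations for concurrency safety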
UpdateSagaStep(ctx context.Context, sagaID, stepID string, status State, output map[string]any, errorMsg string) (*Saga, error)
UpdateSagaState(ctx context.Context, sagaID string, state State) (*Saga, error)
}
Benefits of atomic updates:
- Race Condition Prevention: Multiple concurrent processes can safely update the same saga
- Consistency: Guarantees that state changes are atomic and consistent
- Performance: Eliminates the need for load-modify-save patterns
- Reliability: Ensures saga state is always accurate, even under high concurrency
Saga States
saga.StateStarted // Saga has been initiated
saga.StateCompleted // All steps completed successfully
saga.StateFailed // One or more steps failed
saga.StateCompensating // Compensation is in progress
saga.StateCompensated // All compensations completed
Step States
saga.StateStarted // Step is ready to execute
saga.StateCompleted // Step executed successfully
saga.StateFailed // Step execution failed
Monitoring Saga Progress
// Get saga status
s, err := manager.Get(context.Background(), sagaID)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Saga %s is %s\n", s.ID, s.State)
// Check individual steps
for _, step := range s.Steps {
fmt.Printf("Step %s (%s): %s\n", step.Name, step.Action, step.Status)
if step.Error != "" {
fmt.Printf(" Error: %s\n", step.Error)
}
}
// Use helper methods
if s.IsCompleted() {
fmt.Println("Saga completed successfully!")
}
if s.IsFailed() {
fmt.Println("Saga failed, starting compensation...")
err := manager.Compensate(context.Background(), s.ID)
if err != nil {
log.Printf("Compensation failed: %v", err)
}
}
Error Handling and Compensation
Automatic Compensation
The orchestration engine automatically triggers compensation when steps fail:
// No manual intervention needed - the engine handles this automatically
// When a step handler returns an error:
func problematicStepHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
// If this returns an error, the orchestration engine will:
// 1. Mark the step as failed
// 2. Mark the saga as failed
// 3. Automatically start compensation
return fmt.Errorf("step failed for some reason")
}
Manual Compensation Trigger
You can also manually trigger compensation:
// Manually trigger compensation for a saga
err := manager.Compensate(context.Background(), sagaID)
if err != nil {
log.Printf("Failed to start compensation: %v", err)
}
Monitoring Saga State
// Monitor saga progress
// The result is named s so it does not shadow the saga package used below
s, err := manager.Get(context.Background(), sagaID)
if err != nil {
	log.Printf("Error getting saga: %v", err)
	return
}
// Check saga state
switch s.State {
case saga.StateCompleted:
	log.Println("Saga completed successfully!")
case saga.StateFailed:
	log.Println("Saga failed, compensation should start automatically")
case saga.StateCompensating:
	log.Println("Saga is currently being compensated")
case saga.StateCompensated:
	log.Println("Saga has been fully compensated")
default:
	log.Printf("Saga is in progress: %s", s.State)
}
// Check individual steps
for _, step := range s.Steps {
	if step.Status == saga.StateFailed {
		log.Printf("Step %s failed: %s", step.Name, step.Error)
	}
}
Custom Persistence Store
Implement the Store interface for production use with atomic operations:
type PostgresSagaStore struct {
db *sql.DB
}
func (p *PostgresSagaStore) SaveSaga(ctx context.Context, s *saga.Saga) error {
	query := `
		INSERT INTO sagas (id, name, state, steps, context, created_at, updated_at, completed_at)
		VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
		ON CONFLICT (id) DO UPDATE SET
			state = $3, steps = $4, context = $5, updated_at = $7, completed_at = $8
	`
	// Serialize steps and context; report encoding errors instead of dropping them
	stepsJSON, err := json.Marshal(s.Steps)
	if err != nil {
		return fmt.Errorf("encode steps: %w", err)
	}
	contextJSON, err := json.Marshal(s.Context)
	if err != nil {
		return fmt.Errorf("encode context: %w", err)
	}
	_, err = p.db.ExecContext(ctx, query,
		s.ID, s.Name, s.State, stepsJSON, contextJSON,
		s.CreatedAt, s.UpdatedAt, s.CompletedAt)
	return err
}
func (p *PostgresSagaStore) LoadSaga(ctx context.Context, sagaID string) (*saga.Saga, error) {
query := `
SELECT id, name, state, steps, context, created_at, updated_at, completed_at
FROM sagas WHERE id = $1
`
var s saga.Saga
var stepsJSON, contextJSON []byte
err := p.db.QueryRowContext(ctx, query, sagaID).Scan(
&s.ID, &s.Name, &s.State, &stepsJSON, &contextJSON,
&s.CreatedAt, &s.UpdatedAt, &s.CompletedAt)
if err != nil {
return nil, err
}
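if err := json.Unmarshal(stepsJSON, &s.Steps); err != nil {
return nil, fmt.Errorf("decode steps: %w", err)
}
if err := json.Unmarshal(contextJSON, &s.Context); err != nil {
return nil, fmt.Errorf("decode context: %w", err)
}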
return &s, nil
}
// Atomic step update - critical for concurrency safety
func (p *PostgresSagaStore) UpdateSagaStep(ctx context.Context, sagaID, stepID string, status saga.State, output map[string]any, errorMsg string) (*saga.Saga, error) {
tx, err := p.db.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
defer tx.Rollback()
// Lock the saga row for update
var s saga.Saga
var stepsJSON, contextJSON []byte
query := `
SELECT id, name, state, steps, context, created_at, updated_at, completed_at
FROM sagas WHERE id = $1 FOR UPDATE
`
err = tx.QueryRowContext(ctx, query, sagaID).Scan(
&s.ID, &s.Name, &s.State, &stepsJSON, &contextJSON,
&s.CreatedAt, &s.UpdatedAt, &s.CompletedAt)
if err != nil {
return nil, err
}
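if err := json.Unmarshal(stepsJSON, &s.Steps); err != nil {
return nil, fmt.Errorf("decode steps: %w", err)
}
if err := json.Unmarshal(contextJSON, &s.Context); err != nil {
return nil, fmt.Errorf("decode context: %w", err)
}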
// Find and update the step
stepFound := false
for i := range s.Steps {
if s.Steps[i].ID == stepID {
s.Steps[i].Status = status
s.Steps[i].Output = output
s.Steps[i].Error = errorMsg
s.Steps[i].ExecutedAt = time.Now()
stepFound = true
break
}
}
if !stepFound {
return nil, fmt.Errorf("step not found: %s", stepID)
}
// Update saga state based on step states
s.UpdatedAt = time.Now()
allCompleted := true
anyFailed := false
for _, step := range s.Steps {
if step.Status == saga.StateFailed {
anyFailed = true
break
}
if step.Status != saga.StateCompleted {
allCompleted = false
}
}
if anyFailed {
s.State = saga.StateFailed
} else if allCompleted {
s.State = saga.StateCompleted
now := time.Now()
s.CompletedAt = &now
}
// Save updated saga
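updatedStepsJSON, err := json.Marshal(s.Steps)
if err != nil {
return nil, fmt.Errorf("encode steps: %w", err)
}
updatedContextJSON, err := json.Marshal(s.Context)
if err != nil {
return nil, fmt.Errorf("encode context: %w", err)
}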
updateQuery := `
UPDATE sagas SET
state = $2, steps = $3, context = $4, updated_at = $5, completed_at = $6
WHERE id = $1
`
_, err = tx.ExecContext(ctx, updateQuery,
s.ID, s.State, updatedStepsJSON, updatedContextJSON, s.UpdatedAt, s.CompletedAt)
if err != nil {
return nil, err
}
return &s, tx.Commit()
}
// Atomic saga state update
func (p *PostgresSagaStore) UpdateSagaState(ctx context.Context, sagaID string, state saga.State) (*saga.Saga, error) {
tx, err := p.db.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
defer tx.Rollback()
// Lock and load saga
var s saga.Saga
var stepsJSON, contextJSON []byte
query := `
SELECT id, name, state, steps, context, created_at, updated_at, completed_at
FROM sagas WHERE id = $1 FOR UPDATE
`
err = tx.QueryRowContext(ctx, query, sagaID).Scan(
&s.ID, &s.Name, &s.State, &stepsJSON, &contextJSON,
&s.CreatedAt, &s.UpdatedAt, &s.CompletedAt)
if err != nil {
return nil, err
}
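if err := json.Unmarshal(stepsJSON, &s.Steps); err != nil {
return nil, fmt.Errorf("decode steps: %w", err)
}
if err := json.Unmarshal(contextJSON, &s.Context); err != nil {
return nil, fmt.Errorf("decode context: %w", err)
}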
// Update state
s.State = state
s.UpdatedAt = time.Now()
if state == saga.StateCompleted {
now := time.Now()
s.CompletedAt = &now
}
// Save updated saga
updateQuery := `
UPDATE sagas SET state = $2, updated_at = $3, completed_at = $4
WHERE id = $1
`
_, err = tx.ExecContext(ctx, updateQuery, s.ID, s.State, s.UpdatedAt, s.CompletedAt)
if err != nil {
return nil, err
}
return &s, tx.Commit()
}
// Implement remaining Store methods...
func (p *PostgresSagaStore) DeleteSaga(ctx context.Context, sagaID string) error {
_, err := p.db.ExecContext(ctx, "DELETE FROM sagas WHERE id = $1", sagaID)
return err
}
func (p *PostgresSagaStore) ListActiveSagas(ctx context.Context) ([]*saga.Saga, error) {
query := `
SELECT id, name, state, steps, context, created_at, updated_at, completed_at
FROM sagas WHERE state IN ('started', 'compensating')
`
rows, err := p.db.QueryContext(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()
var sagas []*saga.Saga
for rows.Next() {
var s saga.Saga
var stepsJSON, contextJSON []byte
err := rows.Scan(&s.ID, &s.Name, &s.State, &stepsJSON, &contextJSON,
&s.CreatedAt, &s.UpdatedAt, &s.CompletedAt)
if err != nil {
return nil, err
}
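if err := json.Unmarshal(stepsJSON, &s.Steps); err != nil {
return nil, fmt.Errorf("decode steps: %w", err)
}
if err := json.Unmarshal(contextJSON, &s.Context); err != nil {
return nil, fmt.Errorf("decode context: %w", err)
}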
sagas = append(sagas, &s)
}
return sagas, rows.Err()
}
Advanced Patterns
Parallel Steps
// Define parallel steps with dependencies
steps := []saga.Step{
{Name: "create_order", Action: "orders.create"},
// These can run in parallel
{Name: "reserve_inventory", Action: "inventory.reserve"},
{Name: "validate_address", Action: "shipping.validate"},
// This waits for parallel steps
{Name: "charge_payment", Action: "payment.charge"},
}
Conditional Steps
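stepHandler := func(ctx context.Context, s *saga.Saga, step *saga.Step) error {
	// Check saga context for conditions. The comma-ok assertion treats a
	// missing or non-bool value as false instead of panicking.
	if needsApproval, _ := s.Context["requires_approval"].(bool); needsApproval {
		// Execute approval step
		return requestApproval(s.Context)
	}
	// Skip this step
	return nil
}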
Saga Composition
// Compose larger sagas from smaller ones
func createOrderSaga(orderData map[string]any) []saga.Step {
steps := []saga.Step{
{Name: "validate_order", Action: "orders.validate"},
}
// Add payment steps if required (a missing or non-bool value counts as false)
if required, _ := orderData["payment_required"].(bool); required {
steps = append(steps, createPaymentSteps(orderData)...)
}
// Add shipping steps if physical goods
if shipping, _ := orderData["requires_shipping"].(bool); shipping {
steps = append(steps, createShippingSteps(orderData)...)
}
return steps
}
Integration Examples
Production Service Setup
func main() {
// Create RabbitMQ client
client, err := rabbitmq.NewClient(
rabbitmq.WithHosts("localhost:5672"),
rabbitmq.WithConnectionName("saga-service"),
)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// Create production store (PostgreSQL example)
store, err := NewPostgresSagaStore(databaseURL)
if err != nil {
log.Fatal(err)
}
// Create saga manager with all handlers
manager, err := saga.NewManager(client, store, saga.Config{
SagaExchange: "sagas",
StepQueue: "saga.steps",
CompensateQueue: "saga.compensate",
StepHandlers: buildStepHandlers(),
CompensationHandlers: buildCompensationHandlers(),
})
if err != nil {
log.Fatal(err)
}
defer manager.Close()
// Start orchestration engine
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle graceful shutdown
go func() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
log.Println("Shutting down saga service...")
cancel()
}()
// Start the orchestration engine
log.Println("Starting saga orchestration engine...")
if err := manager.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
log.Printf("Orchestration engine error: %v", err)
}
}
func buildStepHandlers() map[string]saga.StepHandler {
return map[string]saga.StepHandler{
"orders.create": orderService.CreateOrder,
"inventory.reserve": inventoryService.ReserveInventory,
"payment.charge": paymentService.ChargePayment,
"shipping.schedule": shippingService.ScheduleShipment,
"notifications.send": notificationService.SendNotification,
}
}
func buildCompensationHandlers() map[string]saga.CompensationHandler {
return map[string]saga.CompensationHandler{
"orders.delete": orderService.DeleteOrder,
"inventory.release": inventoryService.ReleaseInventory,
"payment.refund": paymentService.RefundPayment,
"shipping.cancel": shippingService.CancelShipment,
"notifications.cancel": notificationService.CancelNotification,
}
}
With HTTP API
func startOrderSaga(w http.ResponseWriter, r *http.Request) {
var orderRequest OrderRequest
if err := json.NewDecoder(r.Body).Decode(&orderRequest); err != nil {
http.Error(w, "Invalid request", http.StatusBadRequest)
return
}
steps := createOrderSteps(orderRequest)
// Named sagaContext and s so the context and saga packages are not shadowed
sagaContext := map[string]any{
"customer_id": orderRequest.CustomerID,
"order_data": orderRequest,
"request_id": r.Header.Get("X-Request-ID"),
}
s, err := sagaManager.Start(r.Context(), "order_processing", steps, sagaContext)
if err != nil {
log.Printf("Failed to start saga: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
response := map[string]any{
"saga_id": s.ID,
"status": s.State,
"message": "Order processing started",
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
func getSagaStatus(w http.ResponseWriter, r *http.Request) {
sagaID := mux.Vars(r)["sagaId"]
saga, err := sagaManager.Get(r.Context(), sagaID)
if err != nil {
http.Error(w, "Saga not found", http.StatusNotFound)
return
}
// Create detailed response
response := map[string]any{
"saga_id": saga.ID,
"name": saga.Name,
"state": saga.State,
"created_at": saga.CreatedAt,
"updated_at": saga.UpdatedAt,
"completed_at": saga.CompletedAt,
"steps": func() []map[string]any {
steps := make([]map[string]any, len(saga.Steps))
for i, step := range saga.Steps {
steps[i] = map[string]any{
"id": step.ID,
"name": step.Name,
"action": step.Action,
"status": step.Status,
"executed_at": step.ExecutedAt,
"error": step.Error,
}
}
return steps
}(),
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// Webhook endpoint for external service notifications
func handleStepCompletion(w http.ResponseWriter, r *http.Request) {
var notification struct {
SagaID string `json:"saga_id"`
StepID string `json:"step_id"`
Status string `json:"status"`
Output map[string]any `json:"output"`
Error string `json:"error"`
}
if err := json.NewDecoder(r.Body).Decode(&notification); err != nil {
http.Error(w, "Invalid notification", http.StatusBadRequest)
return
}
// External services can notify saga completion via webhooks
// The orchestration engine will handle the next steps automatically
log.Printf("Received step completion notification: saga=%s, step=%s, status=%s",
notification.SagaID, notification.StepID, notification.Status)
w.WriteHeader(http.StatusOK)
}
With Worker Pattern
func startSagaWorkers(manager *saga.Manager, config saga.Config) {
// Start step execution workers
go func() {
for {
// Consume step execution messages
err := consumer.Consume(context.Background(), config.StepQueue,
func(ctx context.Context, delivery rabbitmq.Delivery) error {
return handleStepExecution(ctx, manager, delivery)
})
if err != nil {
log.Printf("Step consumer error: %v", err)
}
}
}()
// Start compensation workers
go func() {
for {
// Consume compensation messages
err := consumer.Consume(context.Background(), config.CompensateQueue,
func(ctx context.Context, delivery rabbitmq.Delivery) error {
return handleCompensation(ctx, manager, delivery)
})
if err != nil {
log.Printf("Compensation consumer error: %v", err)
}
}
}()
}
Best Practices
1. Orchestration Engine Management
// Always run the orchestration engine in production
func startSagaService() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Graceful shutdown
go func() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
log.Println("Shutting down...")
cancel()
}()
// Start orchestration engine
if err := manager.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
log.Printf("Engine error: %v", err)
}
}
2. Idempotent Operations
// Make steps idempotent to handle retries safely
stepHandler := func(ctx context.Context, s *saga.Saga, step *saga.Step) error {
// Check if already processed
if step.Output["processed"] == true {
return nil
}
// Perform operation
result, err := performOperation(step.Input)
if err != nil {
return err
}
// Mark as processed
step.Output = map[string]any{
"processed": true,
"result": result,
"timestamp": time.Now(),
}
return nil
}
3. Timeout Handling
stepHandler := func(ctx context.Context, s *saga.Saga, step *saga.Step) error {
// Create timeout context for step execution
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
// Perform operation with timeout
return performOperationWithTimeout(ctx, step.Input)
}
4. Retry Logic with Exponential Backoff
stepHandler := func(ctx context.Context, s *saga.Saga, step *saga.Step) error {
maxRetries := 3
baseDelay := time.Second
for attempt := 0; attempt < maxRetries; attempt++ {
err := performOperation(step.Input)
if err == nil {
return nil
}
// Check if this is the last attempt
if attempt == maxRetries-1 {
return fmt.Errorf("operation failed after %d attempts: %w", maxRetries, err)
}
// Exponential backoff
delay := baseDelay * time.Duration(1<<attempt)
log.Printf("Step %s failed (attempt %d), retrying in %v: %v",
step.Name, attempt+1, delay, err)
select {
case <-time.After(delay):
continue
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
5. Error Classification and Handling
// Define error types for better compensation decisions
type StepError struct {
Code string
Message string
Retryable bool
}
func (e StepError) Error() string {
return e.Message
}
stepHandler := func(ctx context.Context, s *saga.Saga, step *saga.Step) error {
result, err := performBusinessOperation(step.Input)
if err != nil {
// Classify the error
switch {
case isTemporaryError(err):
return StepError{
Code: "TEMPORARY_ERROR",
Message: err.Error(),
Retryable: true,
}
case isBusinessRuleViolation(err):
return StepError{
Code: "BUSINESS_RULE_VIOLATION",
Message: err.Error(),
Retryable: false,
}
default:
return err
}
}
step.Output = map[string]any{
"result": result,
"success": true,
}
return nil
}
6. Monitoring and Observability
func monitorSagas(manager *saga.Manager) {
ticker := time.NewTicker(time.Minute)
go func() {
defer ticker.Stop()
for range ticker.C {
active, err := manager.ListActive(context.Background())
if err != nil {
log.Printf("Failed to list active sagas: %v", err)
continue
}
log.Printf("Active sagas: %d", len(active))
// Check for stale sagas
staleThreshold := time.Hour
for _, saga := range active {
if time.Since(saga.UpdatedAt) > staleThreshold {
log.Printf("Stale saga detected: %s (last updated: %v)",
saga.ID, saga.UpdatedAt)
// Emit metrics for alerting
emitStaleSagaMetric(saga.ID, saga.Name, time.Since(saga.UpdatedAt))
}
}
}
}()
}
func emitStaleSagaMetric(sagaID, sagaName string, staleDuration time.Duration) {
// Integrate with your monitoring system (Prometheus, etc.)
staleSagaCounter.WithLabelValues(sagaName).Inc()
staleDurationGauge.WithLabelValues(sagaName).Set(staleDuration.Seconds())
}
7. Resource Management
// Properly manage resources in handlers
func createOrderHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
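// Use a shared connection pool for database operations. If getDBConnection
// returns the pooled *sql.DB, do not close it here: the pool is long-lived,
// and closing it in a handler would break every other user of the pool.
db := getDBConnection()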
// Use transactions for consistency
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Perform operations
orderID, err := createOrderInTx(tx, step.Input)
if err != nil {
return err
}
// Commit transaction
if err := tx.Commit(); err != nil {
return err
}
step.Output = map[string]any{
"order_id": orderID,
"created_at": time.Now(),
}
return nil
}
8. Compensation Best Practices
// Make compensations idempotent and safe
func deleteOrderCompensation(ctx context.Context, s *saga.Saga, step *saga.Step) error {
orderID, exists := step.Output["order_id"]
if !exists {
log.Printf("No order ID found for compensation, skipping")
return nil // Safe to skip if no order was created
}
// Check if order still exists
exists, err := orderExists(ctx, orderID.(string))
if err != nil {
return fmt.Errorf("failed to check order existence: %w", err)
}
if !exists {
log.Printf("Order %s already deleted, compensation idempotent", orderID)
return nil // Idempotent - already deleted
}
// Delete the order
if err := deleteOrder(ctx, orderID.(string)); err != nil {
return fmt.Errorf("failed to delete order %s: %w", orderID, err)
}
log.Printf("Order %s deleted successfully", orderID)
return nil
}
Configuration Reference
Config Structure
| Field | Type | Description |
|-------|------|-------------|
| SagaExchange | string | Exchange for saga coordination messages |
| StepQueue | string | Queue for step execution messages |
| CompensateQueue | string | Queue for compensation messages |
| StepHandlers | map[string]StepHandler | Step execution handlers (required for orchestration) |
| CompensationHandlers | map[string]CompensationHandler | Compensation handlers (required for orchestration) |
Store Interface
The Store interface now includes atomic operations for concurrency safety:
| Method | Description |
|--------|-------------|
| SaveSaga | Save a complete saga (used for initial creation) |
| LoadSaga | Load a saga by ID |
| DeleteSaga | Delete a saga |
| ListActiveSagas | List all active sagas (started or compensating) |
| UpdateSagaStep | Atomically update a specific step within a saga |
| UpdateSagaState | Atomically update the overall saga state |
Saga Structure
| Field | Type | Description |
|-------|------|-------------|
| ID | string | Unique saga identifier (ULID) |
| Name | string | Saga name/type |
| State | State | Current saga state |
| Steps | []Step | Saga steps with execution details |
| Context | map[string]any | Saga-wide context data |
| CreatedAt | time.Time | Creation timestamp |
| UpdatedAt | time.Time | Last update timestamp |
| CompletedAt | *time.Time | Completion timestamp (nil if not completed) |
Step Structure
| Field | Type | Description |
|-------|------|-------------|
| ID | string | Unique step identifier (ULID) |
| Name | string | Human-readable step name |
| Action | string | Action identifier for handler lookup |
| Compensation | string | Compensation action identifier |
| Input | map[string]any | Step input data |
| Output | map[string]any | Step output data |
| Status | State | Current step status |
| Error | string | Error message if step failed |
| ExecutedAt | time.Time | Step execution timestamp |
Manager Methods
| Method | Description |
|--------|-------------|
| NewManager | Create a new saga manager with handlers |
| Run | Start the orchestration engine (blocks until context cancelled) |
| Start | Start a new saga (returns immediately) |
| Get | Retrieve a saga by ID |
| ListActive | List all active sagas |
| Compensate | Manually trigger compensation for a saga |
| Stop | Stop the orchestration engine gracefully |
| Close | Close all resources (publisher, consumer) |
Testing
# Run saga package tests
go test ./saga
# Run with race detection
go test -race ./saga
Orchestration Pattern
The saga package provides a complete orchestration engine with automatic step execution:
// Create manager with handlers
manager, err := saga.NewManager(client, store, saga.Config{
SagaExchange: "sagas",
StepQueue: "saga.steps",
CompensateQueue: "saga.compensate",
StepHandlers: stepHandlers, // Required
CompensationHandlers: compHandlers, // Required
})
// Start orchestration engine (critical!)
go func() {
if err := manager.Run(ctx); err != nil {
log.Printf("Engine error: %v", err)
}
}()
// Start sagas (they execute automatically)
s, err := manager.Start(ctx, name, steps, sagaContext)
The orchestration engine automatically handles step execution, state updates, and compensation flow.
An open source project brought to you by the Cloudresty team.