sagas

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2021 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package sagas implements the orchestration-based saga pattern. See https://microservices.io/patterns/data/saga.html

Introduction

A saga is a sequence of local transactions. Each local transaction updates the database and publishes a message or event to trigger the next local transaction in the saga. If a local transaction fails because it violates a business rule then the saga executes a series of compensating transactions that undo the changes that were made by the preceding local transactions.

Usage

The saga is managed by sagas.Registry. Each saga step has a forward operation and a rollback counterpart. They must be registered beforehand by calling Registry.AddStep, which returns a new endpoint to the caller. Use the returned endpoint to perform the transactional operation.

// The store persists the transaction logs; the in-process implementation is used here.
store := sagas.NewInProcessStore()
registry := sagas.NewRegistry(store)
// AddStep registers the step and returns the endpoint to call inside a transaction.
addOrder := registry.AddStep(&sagas.Step{
	Name: "Add Order",
	// Do is the forward operation of the step.
	Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
		resp, err := orderEndpoint(ctx, request.(OrderRequest))
		if err != nil {
			return nil, err
		}
		return resp, nil
	},
	// Undo is the rollback counterpart of Do.
	Undo: func(ctx context.Context, req interface{}) (response interface{}, err error) {
		return orderCancelEndpoint(ctx, req)
	},
})
makePayment := registry.AddStep(&sagas.Step{
	Name: "Make Payment",
	// Do is the forward operation of the step.
	Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
		resp, err := paymentEndpoint(ctx, request.(PaymentRequest))
		if err != nil {
			return nil, err
		}
		return resp, nil
	},
	// Undo is the rollback counterpart of Do.
	Undo: func(ctx context.Context, req interface{}) (response interface{}, err error) {
		return paymentCancelEndpoint(ctx)
	},
})

Initiate the transaction by calling registry.StartTX. Pass the returned context to the transaction branches. You can roll back or commit at will. If TX.Rollback is called, the previously registered rollback operations are applied automatically, provided that the corresponding forward operation was actually executed within the transaction.

tx, ctx := registry.StartTX(context.Background())
resp, err := addOrder(ctx, OrderRequest{Sku: "1"})
if err != nil {
	// Undo the steps executed so far and stop: no further step may run and
	// the transaction must not be committed once it has been rolled back.
	tx.Rollback(ctx)
	return
}
resp, err = makePayment(ctx, PaymentRequest{})
if err != nil {
	tx.Rollback(ctx)
	return
}
// Every step succeeded; commit the transaction.
tx.Commit(ctx)

Integration

The package sagas exports configuration in this format:

saga:
	sagaTimeoutSecond: 600
	recoverIntervalSecond: 60

To use package sagas with package core:

var c *core.C = core.Default()
c.Provide(sagas.Providers)
c.Invoke(func(registry *sagas.Registry) {
	tx, ctx := registry.StartTX(context.Background())
	resp, err := addOrder(ctx, OrderRequest{Sku: "1"})
	if err != nil {
		// Undo the steps executed so far and stop: no further step may run
		// and the transaction must not be committed after a rollback.
		tx.Rollback(ctx)
		return
	}
	resp, err = makePayment(ctx, PaymentRequest{})
	if err != nil {
		tx.Rollback(ctx)
		return
	}
	// Every step succeeded; commit the transaction.
	tx.Commit(ctx)
})
Example
package main

import (
	"context"
	"fmt"

	"github.com/DoNewsCode/core/dtx"
	"github.com/DoNewsCode/core/dtx/sagas"
)

// In-memory tables used by the example endpoints. Entries are keyed by the
// correlation ID read from the request context.
var (
	orderTable   = map[string]interface{}{}
	paymentTable = map[string]interface{}{}
)

// OrderRequest is the request passed to the "Add Order" step.
type OrderRequest struct {
	Sku string
}

// OrderResponse is the value returned by orderEndpoint. In main its fields are
// copied into the PaymentRequest of the next step.
type OrderResponse struct {
	OrderID string
	Sku     string
	Cost    float64
}

// PaymentRequest is the request passed to the "Make Payment" step.
type PaymentRequest struct {
	OrderID string
	Sku     string
	// Cost is compared against the limit in paymentEndpoint to decide success.
	Cost    float64
}

// PaymentResponse is the value returned by paymentEndpoint.
type PaymentResponse struct {
	// Success is true when the requested cost was below 20.
	Success bool
}

// orderEndpoint is the forward operation of the "Add Order" step. It records
// the request in orderTable under the correlation ID found in ctx and returns
// a fixed OrderResponse.
//
// It panics if ctx holds no string value under dtx.CorrelationID.
func orderEndpoint(ctx context.Context, request interface{}) (response interface{}, err error) {
	id := ctx.Value(dtx.CorrelationID).(string)
	orderTable[id] = request

	placed := OrderResponse{OrderID: "1", Sku: "1", Cost: 10.0}
	return placed, nil
}

func orderCancelEndpoint(ctx context.Context, request interface{}) (response interface{}, err error) {
	correlationID := ctx.Value(dtx.CorrelationID).(string)
	delete(orderTable, correlationID)
	return nil, nil
}

// paymentEndpoint is the forward operation of the "Make Payment" step. It
// records the request in paymentTable under the correlation ID found in ctx
// and reports success when the requested cost is below 20.
//
// It panics if ctx holds no string value under dtx.CorrelationID, or if
// request is not a PaymentRequest.
func paymentEndpoint(ctx context.Context, request interface{}) (response interface{}, err error) {
	id := ctx.Value(dtx.CorrelationID).(string)
	// The request is stored before its type is checked, as in the table write order above.
	paymentTable[id] = request

	approved := request.(PaymentRequest).Cost < 20
	return PaymentResponse{Success: approved}, nil
}

func paymentCancelEndpoint(ctx context.Context) (response interface{}, err error) {
	correlationID := ctx.Value(dtx.CorrelationID).(string)
	delete(paymentTable, correlationID)
	return nil, nil
}

// main registers the "Add Order" and "Make Payment" steps, runs them inside a
// single saga transaction and prints whether the payment succeeded.
//
// If a step fails, the transaction is rolled back and main returns without
// running later steps or committing.
func main() {
	store := sagas.NewInProcessStore()
	registry := sagas.NewRegistry(store)
	addOrder := registry.AddStep(&sagas.Step{
		Name: "Add Order",
		Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
			resp, err := orderEndpoint(ctx, request.(OrderRequest))
			if err != nil {
				return nil, err
			}
			// Convert the response to next request
			return resp, nil
		},
		Undo: func(ctx context.Context, req interface{}) (response interface{}, err error) {
			return orderCancelEndpoint(ctx, req)
		},
	})
	makePayment := registry.AddStep(&sagas.Step{
		Name: "Make Payment",
		Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
			resp, err := paymentEndpoint(ctx, request.(PaymentRequest))
			if err != nil {
				return nil, err
			}
			return resp, nil
		},
		Undo: func(ctx context.Context, req interface{}) (response interface{}, err error) {
			return paymentCancelEndpoint(ctx)
		},
	})

	tx, ctx := registry.StartTX(context.Background())

	// abort rolls the transaction back and reports a failed rollback instead
	// of silently dropping the error.
	abort := func() {
		if rbErr := tx.Rollback(ctx); rbErr != nil {
			fmt.Println("rollback failed:", rbErr)
		}
	}

	resp, err := addOrder(ctx, OrderRequest{Sku: "1"})
	if err != nil {
		// resp is unusable here; continuing would panic on the type assertion
		// below and run the payment step inside a rolled-back transaction.
		abort()
		return
	}

	order := resp.(OrderResponse)
	resp, err = makePayment(ctx, PaymentRequest{
		OrderID: order.OrderID,
		Sku:     order.Sku,
		Cost:    order.Cost,
	})
	if err != nil {
		abort()
		return
	}

	if err := tx.Commit(ctx); err != nil {
		fmt.Println("commit failed:", err)
		return
	}
	fmt.Println(resp.(PaymentResponse).Success)
}
Output:

true

Index

Examples

Constants

View Source
const TxContextKey contextKey = "coordinator"

TxContextKey is the context key for TX.

Variables

This section is empty.

Functions

func Providers

func Providers() di.Deps

Providers returns a set of dependency providers.

Depends On:
	contract.ConfigAccessor
	log.Logger
	Store   `optional:"true"`
	[]*Step `group:"saga"`
Provide:
	*Registry
	SagaEndpoints

Types

type InProcessStore

type InProcessStore struct {
	// contains filtered or unexported fields
}

InProcessStore is an in-process storage that implements Store.

func NewInProcessStore

func NewInProcessStore() *InProcessStore

NewInProcessStore creates an InProcessStore.

func (*InProcessStore) Ack

func (i *InProcessStore) Ack(ctx context.Context, logID string, err error) error

Ack marks the log entry as acknowledged, either with an error or without one. It is safe to call Ack on the same log entry more than once.

func (*InProcessStore) Log

func (i *InProcessStore) Log(ctx context.Context, log Log) error

Log appends a new unacknowledged log entry to the store.

func (*InProcessStore) UnacknowledgedSteps

func (i *InProcessStore) UnacknowledgedSteps(ctx context.Context, correlationID string) ([]Log, error)

UnacknowledgedSteps searches the InProcessStore for unacknowledged steps under the given correlationID.

func (*InProcessStore) UncommittedSagas

func (i *InProcessStore) UncommittedSagas(ctx context.Context) ([]Log, error)

UncommittedSagas searches the store for all uncommitted sagas and returns the log entries under the matching sagas.

type Log

type Log struct {
	ID string

	StartedAt  time.Time
	FinishedAt time.Time
	LogType    LogType
	StepNumber int
	StepParam  interface{}
	StepName   string
	StepError  error
	// contains filtered or unexported fields
}

Log is the structural Log type of the distributed saga.

type LogType

type LogType uint

LogType is a type enum that describes the types of Log.

const (
	// Session type logs the occurrence of a new distributed transaction.
	Session LogType = iota
	// Do type logs an incremental action in the distributed saga step.
	Do
	// Undo type logs a compensation action in the distributed saga step.
	Undo
)

type Option

type Option func(registry *Registry)

Option is the functional option for NewRegistry.

func WithLogger

func WithLogger(logger log.Logger) Option

WithLogger is an option that adds a logger to the registry.

func WithTimeout

func WithTimeout(duration time.Duration) Option

WithTimeout is an option that configures when the unacknowledged steps should be marked as stale and become candidates for rollback.

type Registry

type Registry struct {
	Store Store
	// contains filtered or unexported fields
}

Registry holds all transaction sagas in this process. It should be populated during the initialization of the application.

func NewRegistry

func NewRegistry(store Store, opts ...Option) *Registry

NewRegistry creates a new Registry.

func (*Registry) AddStep

func (r *Registry) AddStep(step *Step) endpoint.Endpoint

AddStep registers a saga step in the registry. The registration should be done during the bootstrapping of the application.

func (*Registry) Recover

func (r *Registry) Recover(ctx context.Context)

Recover rolls back all uncommitted sagas by retrieving them from the store.

func (*Registry) StartTX

func (r *Registry) StartTX(ctx context.Context) (*TX, context.Context)

StartTX starts a transaction using saga pattern.

type SagaEndpoints

type SagaEndpoints map[string]endpoint.Endpoint

SagaEndpoints is a collection of all registered endpoints in the saga registry.

type Step

type Step struct {
	// Name identifies the step, e.g. "Add Order".
	Name string
	// Do is the forward operation of the step.
	Do   endpoint.Endpoint
	// Undo is the rollback counterpart of Do.
	Undo endpoint.Endpoint
}

Step is a step in the Saga.

type Store

type Store interface {
	// Log records a new log entry in the store.
	Log(ctx context.Context, log Log) error
	// Ack marks the log entry with the given id as acknowledged; err carries
	// the error of the step, if there was one.
	Ack(ctx context.Context, id string, err error) error
	// UnacknowledgedSteps returns the unacknowledged steps under the given
	// correlationID.
	UnacknowledgedSteps(ctx context.Context, correlationID string) ([]Log, error)
	// UncommittedSagas returns the log entries of all uncommitted sagas.
	UncommittedSagas(ctx context.Context) ([]Log, error)
}

Store is the interface to persist logs of transactions.

type TX

type TX struct {
	// contains filtered or unexported fields
}

TX is a distributed transaction coordinator. It has no exported fields to assign directly; obtain one by calling Registry.StartTX.

func TxFromContext

func TxFromContext(ctx context.Context) *TX

TxFromContext returns the tx instance from context.

func (*TX) Commit

func (tx *TX) Commit(ctx context.Context) error

Commit commits the current transaction.

func (*TX) Rollback

func (tx *TX) Rollback(ctx context.Context) error

Rollback rolls back the current transaction.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL