saga

package
Version: v1.0.2
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2025 | License: MIT | Imports: 9 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CancelBookingCompensationHandler

type CancelBookingCompensationHandler struct {
	// contains filtered or unexported fields
}

CancelBookingCompensationHandler handles booking cancellation compensation

func NewCancelBookingCompensationHandler

func NewCancelBookingCompensationHandler(logger *logrus.Logger) *CancelBookingCompensationHandler

NewCancelBookingCompensationHandler creates a new cancel booking compensation handler

func (*CancelBookingCompensationHandler) Compensate

func (cbch *CancelBookingCompensationHandler) Compensate(ctx context.Context, data map[string]interface{}) error

Compensate executes booking cancellation compensation

func (*CancelBookingCompensationHandler) GetName

func (cbch *CancelBookingCompensationHandler) GetName() string

GetName returns the compensation handler name

type CompensationHandler

type CompensationHandler interface {
	Compensate(ctx context.Context, data map[string]interface{}) error
	GetName() string
}

CompensationHandler defines the interface for compensation execution

type Compensator

type Compensator interface {
	CompensateStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)
	GetCompensationHandler(compensation string) (CompensationHandler, error)
}

Compensator defines the interface for saga compensation

type CreateBookingStepHandler

type CreateBookingStepHandler struct {
	// contains filtered or unexported fields
}

CreateBookingStepHandler handles booking creation

func NewCreateBookingStepHandler

func NewCreateBookingStepHandler(logger *logrus.Logger) *CreateBookingStepHandler

NewCreateBookingStepHandler creates a new create booking step handler

func (*CreateBookingStepHandler) Execute

func (cbsh *CreateBookingStepHandler) Execute(ctx context.Context, data map[string]interface{}) (map[string]interface{}, error)

Execute executes the create booking step

func (*CreateBookingStepHandler) GetName

func (cbsh *CreateBookingStepHandler) GetName() string

GetName returns the handler name

type CreateUserStepHandler

type CreateUserStepHandler struct {
	// contains filtered or unexported fields
}

CreateUserStepHandler handles user creation

func NewCreateUserStepHandler

func NewCreateUserStepHandler(logger *logrus.Logger) *CreateUserStepHandler

NewCreateUserStepHandler creates a new create user step handler

func (*CreateUserStepHandler) Execute

func (cush *CreateUserStepHandler) Execute(ctx context.Context, data map[string]interface{}) (map[string]interface{}, error)

Execute executes the create user step

func (*CreateUserStepHandler) GetName

func (cush *CreateUserStepHandler) GetName() string

GetName returns the handler name

type DefaultCompensator

type DefaultCompensator struct {
	// contains filtered or unexported fields
}

DefaultCompensator provides the default implementation of saga compensation

func NewDefaultCompensator

func NewDefaultCompensator(logger *logrus.Logger) *DefaultCompensator

NewDefaultCompensator creates a new default compensator

func (*DefaultCompensator) CompensateStep

func (dc *DefaultCompensator) CompensateStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)

CompensateStep executes compensation for a saga step

func (*DefaultCompensator) GetCompensationHandler

func (dc *DefaultCompensator) GetCompensationHandler(compensation string) (CompensationHandler, error)

GetCompensationHandler retrieves a compensation handler by name

func (*DefaultCompensator) GetRegisteredHandlers

func (dc *DefaultCompensator) GetRegisteredHandlers() []string

GetRegisteredHandlers returns all registered compensation handlers as a slice of strings (presumably the handler names).

func (*DefaultCompensator) RegisterCompensationHandler

func (dc *DefaultCompensator) RegisterCompensationHandler(handler CompensationHandler) error

RegisterCompensationHandler registers a compensation handler

type DefaultOrchestrator

type DefaultOrchestrator struct {
	// contains filtered or unexported fields
}

DefaultOrchestrator provides the default implementation of saga orchestration

func NewDefaultOrchestrator

func NewDefaultOrchestrator(logger *logrus.Logger) *DefaultOrchestrator

NewDefaultOrchestrator creates a new default orchestrator

func (*DefaultOrchestrator) ExecuteStep

func (do *DefaultOrchestrator) ExecuteStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)

ExecuteStep executes a saga step

func (*DefaultOrchestrator) GetRegisteredHandlers

func (do *DefaultOrchestrator) GetRegisteredHandlers() []string

GetRegisteredHandlers returns all registered step handlers as a slice of strings (presumably the handler names).

func (*DefaultOrchestrator) GetStepHandler

func (do *DefaultOrchestrator) GetStepHandler(action string) (StepHandler, error)

GetStepHandler retrieves a step handler by name

func (*DefaultOrchestrator) RegisterStepHandler

func (do *DefaultOrchestrator) RegisterStepHandler(handler StepHandler) error

RegisterStepHandler registers a step handler

type DeleteUserCompensationHandler

type DeleteUserCompensationHandler struct {
	// contains filtered or unexported fields
}

DeleteUserCompensationHandler handles user deletion compensation

func NewDeleteUserCompensationHandler

func NewDeleteUserCompensationHandler(logger *logrus.Logger) *DeleteUserCompensationHandler

NewDeleteUserCompensationHandler creates a new delete user compensation handler

func (*DeleteUserCompensationHandler) Compensate

func (duch *DeleteUserCompensationHandler) Compensate(ctx context.Context, data map[string]interface{}) error

Compensate executes user deletion compensation

func (*DeleteUserCompensationHandler) GetName

func (duch *DeleteUserCompensationHandler) GetName() string

GetName returns the compensation handler name

type Orchestrator

type Orchestrator interface {
	ExecuteStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)
	GetStepHandler(action string) (StepHandler, error)
}

Orchestrator defines the interface for saga orchestration

type ProcessPaymentStepHandler

type ProcessPaymentStepHandler struct {
	// contains filtered or unexported fields
}

ProcessPaymentStepHandler handles payment processing

func NewProcessPaymentStepHandler

func NewProcessPaymentStepHandler(logger *logrus.Logger) *ProcessPaymentStepHandler

NewProcessPaymentStepHandler creates a new process payment step handler

func (*ProcessPaymentStepHandler) Execute

func (ppsh *ProcessPaymentStepHandler) Execute(ctx context.Context, data map[string]interface{}) (map[string]interface{}, error)

Execute executes the process payment step

func (*ProcessPaymentStepHandler) GetName

func (ppsh *ProcessPaymentStepHandler) GetName() string

GetName returns the handler name

type RefundPaymentCompensationHandler

type RefundPaymentCompensationHandler struct {
	// contains filtered or unexported fields
}

RefundPaymentCompensationHandler handles payment refund compensation

func NewRefundPaymentCompensationHandler

func NewRefundPaymentCompensationHandler(logger *logrus.Logger) *RefundPaymentCompensationHandler

NewRefundPaymentCompensationHandler creates a new refund payment compensation handler

func (*RefundPaymentCompensationHandler) Compensate

func (rpch *RefundPaymentCompensationHandler) Compensate(ctx context.Context, data map[string]interface{}) error

Compensate executes payment refund compensation

func (*RefundPaymentCompensationHandler) GetName

func (rpch *RefundPaymentCompensationHandler) GetName() string

GetName returns the compensation handler name

type RetryPolicy

type RetryPolicy struct {
	MaxAttempts int           `json:"max_attempts"`
	Delay       time.Duration `json:"delay"`
	Backoff     string        `json:"backoff"` // linear, exponential
}

RetryPolicy defines retry behavior for saga steps

type SagaConfig

type SagaConfig struct {
	TimeoutDuration     time.Duration `json:"timeout_duration"`
	RetryAttempts       int           `json:"retry_attempts"`
	RetryDelay          time.Duration `json:"retry_delay"`
	CompensationTimeout time.Duration `json:"compensation_timeout"`
	EnableMetrics       bool          `json:"enable_metrics"`
	BatchSize           int           `json:"batch_size"`
}

SagaConfig holds saga manager configuration

type SagaDefinition

type SagaDefinition struct {
	Name  string     `json:"name"`
	Steps []SagaStep `json:"steps"`
}

SagaDefinition defines the structure of a saga

type SagaDefinitionRegistry

type SagaDefinitionRegistry struct {
	// contains filtered or unexported fields
}

SagaDefinitionRegistry manages saga definitions

func NewSagaDefinitionRegistry

func NewSagaDefinitionRegistry() *SagaDefinitionRegistry

NewSagaDefinitionRegistry creates a new saga definition registry

func (*SagaDefinitionRegistry) Get

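Get retrieves a saga definition by name. (The method signature is missing from this listing; consult the package source for the exact parameters and return values.)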

func (*SagaDefinitionRegistry) GetAll

func (sdr *SagaDefinitionRegistry) GetAll() map[string]*SagaDefinition

GetAll returns all registered saga definitions

func (*SagaDefinitionRegistry) Register

func (sdr *SagaDefinitionRegistry) Register(definition *SagaDefinition) error

Register registers a saga definition

type SagaFilter

type SagaFilter struct {
	SagaType      string     `json:"saga_type,omitempty"`
	Status        SagaStatus `json:"status,omitempty"`
	TenantID      string     `json:"tenant_id,omitempty"`
	CorrelationID string     `json:"correlation_id,omitempty"`
	CreatedBy     string     `json:"created_by,omitempty"`
	StartedAfter  *time.Time `json:"started_after,omitempty"`
	StartedBefore *time.Time `json:"started_before,omitempty"`
}

SagaFilter defines filtering options for saga queries

type SagaInstance

type SagaInstance struct {
	ID               string     `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	SagaType         string     `gorm:"type:varchar(255);not null;index" json:"saga_type"`
	Status           SagaStatus `gorm:"type:varchar(50);not null" json:"status"`
	CurrentStep      int        `gorm:"not null;default:0" json:"current_step"`
	Data             string     `gorm:"type:jsonb" json:"data"`
	CompletedSteps   string     `gorm:"type:jsonb" json:"completed_steps"`
	FailedSteps      string     `gorm:"type:jsonb" json:"failed_steps"`
	CompensatedSteps string     `gorm:"type:jsonb" json:"compensated_steps"`
	StartedAt        time.Time  `gorm:"not null" json:"started_at"`
	CompletedAt      *time.Time `json:"completed_at,omitempty"`
	LastError        string     `json:"last_error,omitempty"`
	TenantID         string     `gorm:"type:uuid;index" json:"tenant_id"`
	CorrelationID    string     `gorm:"type:uuid;index" json:"correlation_id"`
	CreatedBy        string     `gorm:"type:uuid" json:"created_by"`
	UpdatedAt        time.Time  `gorm:"autoUpdateTime" json:"updated_at"`
}

SagaInstance represents an instance of a running saga

type SagaManager

type SagaManager struct {
	// contains filtered or unexported fields
}

SagaManager manages distributed transactions using the Saga pattern

func NewSagaManager

func NewSagaManager(
	repository *SagaRepository,
	orchestrator Orchestrator,
	compensator Compensator,
	config SagaConfig,
	logger *logrus.Logger,
) *SagaManager

NewSagaManager creates a new saga manager

func (*SagaManager) CancelSaga

func (sm *SagaManager) CancelSaga(ctx context.Context, sagaID string, reason string) error

CancelSaga cancels a running saga and triggers compensation

func (*SagaManager) Close

func (sm *SagaManager) Close() error

Close closes the saga manager

func (*SagaManager) ContinueSaga

func (sm *SagaManager) ContinueSaga(ctx context.Context, sagaID string) error

ContinueSaga continues an existing saga instance

func (*SagaManager) GetSagaStatus

func (sm *SagaManager) GetSagaStatus(ctx context.Context, sagaID string) (*SagaInstance, error)

GetSagaStatus returns the status of a saga instance

func (*SagaManager) HealthCheck

func (sm *SagaManager) HealthCheck(ctx context.Context) error

HealthCheck performs a health check on the saga manager

func (*SagaManager) ListSagas

func (sm *SagaManager) ListSagas(ctx context.Context, filter SagaFilter, limit, offset int) ([]*SagaInstance, int64, error)

ListSagas lists saga instances with optional filtering

func (*SagaManager) RegisterDefinition

func (sm *SagaManager) RegisterDefinition(definition *SagaDefinition) error

RegisterDefinition registers a saga definition

func (*SagaManager) StartSaga

func (sm *SagaManager) StartSaga(ctx context.Context, sagaType string, data map[string]interface{}, tenantID, userID string) (*SagaInstance, error)

StartSaga starts a new saga instance

type SagaRepository

type SagaRepository struct {
	// contains filtered or unexported fields
}

SagaRepository handles persistence of saga instances

func NewSagaRepository

func NewSagaRepository(db *gorm.DB, config SagaRepositoryConfig, logger *logrus.Logger) (*SagaRepository, error)

NewSagaRepository creates a new saga repository

func (*SagaRepository) CleanupOld

func (sr *SagaRepository) CleanupOld(ctx context.Context) (int64, error)

CleanupOld removes old saga instances based on the retention period

func (*SagaRepository) Close

func (sr *SagaRepository) Close() error

Close closes the repository

func (*SagaRepository) Delete

func (sr *SagaRepository) Delete(ctx context.Context, sagaID string) error

Delete deletes a saga instance

func (*SagaRepository) GetByCorrelationID

func (sr *SagaRepository) GetByCorrelationID(ctx context.Context, correlationID string) ([]*SagaInstance, error)

GetByCorrelationID retrieves saga instances by correlation ID

func (*SagaRepository) GetByID

func (sr *SagaRepository) GetByID(ctx context.Context, sagaID string) (*SagaInstance, error)

GetByID retrieves a saga instance by ID

func (*SagaRepository) GetRunning

func (sr *SagaRepository) GetRunning(ctx context.Context) ([]*SagaInstance, error)

GetRunning retrieves all running saga instances

func (*SagaRepository) GetStatistics

func (sr *SagaRepository) GetStatistics(ctx context.Context, from, to time.Time) (*SagaStatistics, error)

GetStatistics returns saga statistics

func (*SagaRepository) GetStuck

func (sr *SagaRepository) GetStuck(ctx context.Context, timeout time.Duration) ([]*SagaInstance, error)

GetStuck retrieves saga instances that appear to be stuck

func (*SagaRepository) HealthCheck

func (sr *SagaRepository) HealthCheck(ctx context.Context) error

HealthCheck performs a health check on the repository

func (*SagaRepository) List

func (sr *SagaRepository) List(ctx context.Context, filter SagaFilter, limit, offset int) ([]*SagaInstance, int64, error)

List retrieves saga instances with filtering and pagination

func (*SagaRepository) Save

func (sr *SagaRepository) Save(ctx context.Context, instance *SagaInstance) error

Save saves a saga instance

func (*SagaRepository) Update

func (sr *SagaRepository) Update(ctx context.Context, instance *SagaInstance) error

Update updates a saga instance

type SagaRepositoryConfig

type SagaRepositoryConfig struct {
	TableName       string        `json:"table_name"`
	RetentionPeriod time.Duration `json:"retention_period"`
	EnableMetrics   bool          `json:"enable_metrics"`
}

SagaRepositoryConfig holds repository configuration

type SagaStatistics

type SagaStatistics struct {
	FromDate               time.Time            `json:"from_date"`
	ToDate                 time.Time            `json:"to_date"`
	TotalCount             int64                `json:"total_count"`
	StatusCounts           map[SagaStatus]int64 `json:"status_counts"`
	TypeCounts             map[string]int64     `json:"type_counts"`
	AverageDurationSeconds float64              `json:"average_duration_seconds"`
}

SagaStatistics represents saga statistics

type SagaStatus

type SagaStatus string

SagaStatus represents the status of a saga instance. The defined status values are listed in the const block below.

const (
	SagaStatusPending      SagaStatus = "pending"
	SagaStatusRunning      SagaStatus = "running"
	SagaStatusCompleted    SagaStatus = "completed"
	SagaStatusFailed       SagaStatus = "failed"
	SagaStatusCompensating SagaStatus = "compensating"
	SagaStatusCompensated  SagaStatus = "compensated"
	SagaStatusTimeout      SagaStatus = "timeout"
	SagaStatusCancelled    SagaStatus = "cancelled"
)

type SagaStep

type SagaStep struct {
	Name         string                 `json:"name"`
	Action       string                 `json:"action"`
	Compensation string                 `json:"compensation"`
	Parameters   map[string]interface{} `json:"parameters"`
	Timeout      time.Duration          `json:"timeout"`
	RetryPolicy  RetryPolicy            `json:"retry_policy"`
	DependsOn    []string               `json:"depends_on"`
	Critical     bool                   `json:"critical"`
}

SagaStep represents a single step in a saga

type SagaStepResult

type SagaStepResult struct {
	StepName    string                 `json:"step_name"`
	Success     bool                   `json:"success"`
	Error       error                  `json:"error,omitempty"`
	Data        map[string]interface{} `json:"data,omitempty"`
	Compensated bool                   `json:"compensated"`
	Duration    time.Duration          `json:"duration"`
	Attempt     int                    `json:"attempt"`
}

SagaStepResult represents the result of a saga step execution

type SendWelcomeEmailStepHandler

type SendWelcomeEmailStepHandler struct {
	// contains filtered or unexported fields
}

SendWelcomeEmailStepHandler handles sending welcome emails

func NewSendWelcomeEmailStepHandler

func NewSendWelcomeEmailStepHandler(logger *logrus.Logger) *SendWelcomeEmailStepHandler

NewSendWelcomeEmailStepHandler creates a new send welcome email step handler

func (*SendWelcomeEmailStepHandler) Execute

func (swesh *SendWelcomeEmailStepHandler) Execute(ctx context.Context, data map[string]interface{}) (map[string]interface{}, error)

Execute executes the send welcome email step

func (*SendWelcomeEmailStepHandler) GetName

func (swesh *SendWelcomeEmailStepHandler) GetName() string

GetName returns the handler name

type StepHandler

type StepHandler interface {
	Execute(ctx context.Context, data map[string]interface{}) (map[string]interface{}, error)
	GetName() string
}

StepHandler defines the interface for step execution

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL