Documentation
¶
Index ¶
- type CancelBookingCompensationHandler
- type CompensationHandler
- type Compensator
- type CreateBookingStepHandler
- type CreateUserStepHandler
- type DefaultCompensator
- func (dc *DefaultCompensator) CompensateStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)
- func (dc *DefaultCompensator) GetCompensationHandler(compensation string) (CompensationHandler, error)
- func (dc *DefaultCompensator) GetRegisteredHandlers() []string
- func (dc *DefaultCompensator) RegisterCompensationHandler(handler CompensationHandler) error
- type DefaultOrchestrator
- func (do *DefaultOrchestrator) ExecuteStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)
- func (do *DefaultOrchestrator) GetRegisteredHandlers() []string
- func (do *DefaultOrchestrator) GetStepHandler(action string) (StepHandler, error)
- func (do *DefaultOrchestrator) RegisterStepHandler(handler StepHandler) error
- type DeleteUserCompensationHandler
- type Orchestrator
- type ProcessPaymentStepHandler
- type RefundPaymentCompensationHandler
- type RetryPolicy
- type SagaConfig
- type SagaDefinition
- type SagaDefinitionRegistry
- type SagaFilter
- type SagaInstance
- type SagaManager
- func (sm *SagaManager) CancelSaga(ctx context.Context, sagaID string, reason string) error
- func (sm *SagaManager) Close() error
- func (sm *SagaManager) ContinueSaga(ctx context.Context, sagaID string) error
- func (sm *SagaManager) GetSagaStatus(ctx context.Context, sagaID string) (*SagaInstance, error)
- func (sm *SagaManager) HealthCheck(ctx context.Context) error
- func (sm *SagaManager) ListSagas(ctx context.Context, filter SagaFilter, limit, offset int) ([]*SagaInstance, int64, error)
- func (sm *SagaManager) RegisterDefinition(definition *SagaDefinition) error
- func (sm *SagaManager) StartSaga(ctx context.Context, sagaType string, data map[string]interface{}, tenantID, userID string) (*SagaInstance, error)
- type SagaRepository
- func (sr *SagaRepository) CleanupOld(ctx context.Context) (int64, error)
- func (sr *SagaRepository) Close() error
- func (sr *SagaRepository) Delete(ctx context.Context, sagaID string) error
- func (sr *SagaRepository) GetByCorrelationID(ctx context.Context, correlationID string) ([]*SagaInstance, error)
- func (sr *SagaRepository) GetByID(ctx context.Context, sagaID string) (*SagaInstance, error)
- func (sr *SagaRepository) GetRunning(ctx context.Context) ([]*SagaInstance, error)
- func (sr *SagaRepository) GetStatistics(ctx context.Context, from, to time.Time) (*SagaStatistics, error)
- func (sr *SagaRepository) GetStuck(ctx context.Context, timeout time.Duration) ([]*SagaInstance, error)
- func (sr *SagaRepository) HealthCheck(ctx context.Context) error
- func (sr *SagaRepository) List(ctx context.Context, filter SagaFilter, limit, offset int) ([]*SagaInstance, int64, error)
- func (sr *SagaRepository) Save(ctx context.Context, instance *SagaInstance) error
- func (sr *SagaRepository) Update(ctx context.Context, instance *SagaInstance) error
- type SagaRepositoryConfig
- type SagaStatistics
- type SagaStatus
- type SagaStep
- type SagaStepResult
- type SendWelcomeEmailStepHandler
- type StepHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CancelBookingCompensationHandler ¶
type CancelBookingCompensationHandler struct {
// contains filtered or unexported fields
}
CancelBookingCompensationHandler handles booking cancellation compensation
func NewCancelBookingCompensationHandler ¶
func NewCancelBookingCompensationHandler(logger *logrus.Logger) *CancelBookingCompensationHandler
NewCancelBookingCompensationHandler creates a new cancel booking compensation handler
func (*CancelBookingCompensationHandler) Compensate ¶
func (cbch *CancelBookingCompensationHandler) Compensate(ctx context.Context, data map[string]interface{}) error
Compensate executes booking cancellation compensation
func (*CancelBookingCompensationHandler) GetName ¶
func (cbch *CancelBookingCompensationHandler) GetName() string
GetName returns the compensation handler name
type CompensationHandler ¶
type CompensationHandler interface {
Compensate(ctx context.Context, data map[string]interface{}) error
GetName() string
}
CompensationHandler defines the interface for compensation execution
type Compensator ¶
type Compensator interface {
CompensateStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)
GetCompensationHandler(compensation string) (CompensationHandler, error)
}
Compensator defines the interface for saga compensation
type CreateBookingStepHandler ¶
type CreateBookingStepHandler struct {
// contains filtered or unexported fields
}
CreateBookingStepHandler handles booking creation
func NewCreateBookingStepHandler ¶
func NewCreateBookingStepHandler(logger *logrus.Logger) *CreateBookingStepHandler
NewCreateBookingStepHandler creates a new create booking step handler
func (*CreateBookingStepHandler) Execute ¶
func (cbsh *CreateBookingStepHandler) Execute(ctx context.Context, data map[string]interface{}) (map[string]interface{}, error)
Execute executes the create booking step
func (*CreateBookingStepHandler) GetName ¶
func (cbsh *CreateBookingStepHandler) GetName() string
GetName returns the handler name
type CreateUserStepHandler ¶
type CreateUserStepHandler struct {
// contains filtered or unexported fields
}
CreateUserStepHandler handles user creation
func NewCreateUserStepHandler ¶
func NewCreateUserStepHandler(logger *logrus.Logger) *CreateUserStepHandler
NewCreateUserStepHandler creates a new create user step handler
func (*CreateUserStepHandler) Execute ¶
func (cush *CreateUserStepHandler) Execute(ctx context.Context, data map[string]interface{}) (map[string]interface{}, error)
Execute executes the create user step
func (*CreateUserStepHandler) GetName ¶
func (cush *CreateUserStepHandler) GetName() string
GetName returns the handler name
type DefaultCompensator ¶
type DefaultCompensator struct {
// contains filtered or unexported fields
}
DefaultCompensator provides the default implementation of saga compensation
func NewDefaultCompensator ¶
func NewDefaultCompensator(logger *logrus.Logger) *DefaultCompensator
NewDefaultCompensator creates a new default compensator
func (*DefaultCompensator) CompensateStep ¶
func (dc *DefaultCompensator) CompensateStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)
CompensateStep executes compensation for a saga step
func (*DefaultCompensator) GetCompensationHandler ¶
func (dc *DefaultCompensator) GetCompensationHandler(compensation string) (CompensationHandler, error)
GetCompensationHandler retrieves a compensation handler by name
func (*DefaultCompensator) GetRegisteredHandlers ¶
func (dc *DefaultCompensator) GetRegisteredHandlers() []string
GetRegisteredHandlers returns all registered compensation handlers
func (*DefaultCompensator) RegisterCompensationHandler ¶
func (dc *DefaultCompensator) RegisterCompensationHandler(handler CompensationHandler) error
RegisterCompensationHandler registers a compensation handler
type DefaultOrchestrator ¶
type DefaultOrchestrator struct {
// contains filtered or unexported fields
}
DefaultOrchestrator provides the default implementation of saga orchestration
func NewDefaultOrchestrator ¶
func NewDefaultOrchestrator(logger *logrus.Logger) *DefaultOrchestrator
NewDefaultOrchestrator creates a new default orchestrator
func (*DefaultOrchestrator) ExecuteStep ¶
func (do *DefaultOrchestrator) ExecuteStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)
ExecuteStep executes a saga step
func (*DefaultOrchestrator) GetRegisteredHandlers ¶
func (do *DefaultOrchestrator) GetRegisteredHandlers() []string
GetRegisteredHandlers returns all registered step handlers
func (*DefaultOrchestrator) GetStepHandler ¶
func (do *DefaultOrchestrator) GetStepHandler(action string) (StepHandler, error)
GetStepHandler retrieves a step handler by name
func (*DefaultOrchestrator) RegisterStepHandler ¶
func (do *DefaultOrchestrator) RegisterStepHandler(handler StepHandler) error
RegisterStepHandler registers a step handler
type DeleteUserCompensationHandler ¶
type DeleteUserCompensationHandler struct {
// contains filtered or unexported fields
}
DeleteUserCompensationHandler handles user deletion compensation
func NewDeleteUserCompensationHandler ¶
func NewDeleteUserCompensationHandler(logger *logrus.Logger) *DeleteUserCompensationHandler
NewDeleteUserCompensationHandler creates a new delete user compensation handler
func (*DeleteUserCompensationHandler) Compensate ¶
func (duch *DeleteUserCompensationHandler) Compensate(ctx context.Context, data map[string]interface{}) error
Compensate executes user deletion compensation
func (*DeleteUserCompensationHandler) GetName ¶
func (duch *DeleteUserCompensationHandler) GetName() string
GetName returns the compensation handler name
type Orchestrator ¶
type Orchestrator interface {
ExecuteStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)
GetStepHandler(action string) (StepHandler, error)
}
Orchestrator defines the interface for saga orchestration
type ProcessPaymentStepHandler ¶
type ProcessPaymentStepHandler struct {
// contains filtered or unexported fields
}
ProcessPaymentStepHandler handles payment processing
func NewProcessPaymentStepHandler ¶
func NewProcessPaymentStepHandler(logger *logrus.Logger) *ProcessPaymentStepHandler
NewProcessPaymentStepHandler creates a new process payment step handler
func (*ProcessPaymentStepHandler) Execute ¶
func (ppsh *ProcessPaymentStepHandler) Execute(ctx context.Context, data map[string]interface{}) (map[string]interface{}, error)
Execute executes the process payment step
func (*ProcessPaymentStepHandler) GetName ¶
func (ppsh *ProcessPaymentStepHandler) GetName() string
GetName returns the handler name
type RefundPaymentCompensationHandler ¶
type RefundPaymentCompensationHandler struct {
// contains filtered or unexported fields
}
RefundPaymentCompensationHandler handles payment refund compensation
func NewRefundPaymentCompensationHandler ¶
func NewRefundPaymentCompensationHandler(logger *logrus.Logger) *RefundPaymentCompensationHandler
NewRefundPaymentCompensationHandler creates a new refund payment compensation handler
func (*RefundPaymentCompensationHandler) Compensate ¶
func (rpch *RefundPaymentCompensationHandler) Compensate(ctx context.Context, data map[string]interface{}) error
Compensate executes payment refund compensation
func (*RefundPaymentCompensationHandler) GetName ¶
func (rpch *RefundPaymentCompensationHandler) GetName() string
GetName returns the compensation handler name
type RetryPolicy ¶
type RetryPolicy struct {
MaxAttempts int `json:"max_attempts"`
Delay time.Duration `json:"delay"`
Backoff string `json:"backoff"` // linear, exponential
}
RetryPolicy defines retry behavior for saga steps
type SagaConfig ¶
type SagaConfig struct {
TimeoutDuration time.Duration `json:"timeout_duration"`
RetryAttempts int `json:"retry_attempts"`
RetryDelay time.Duration `json:"retry_delay"`
CompensationTimeout time.Duration `json:"compensation_timeout"`
EnableMetrics bool `json:"enable_metrics"`
BatchSize int `json:"batch_size"`
}
SagaConfig holds saga manager configuration
type SagaDefinition ¶
SagaDefinition defines the structure of a saga
type SagaDefinitionRegistry ¶
type SagaDefinitionRegistry struct {
// contains filtered or unexported fields
}
SagaDefinitionRegistry manages saga definitions
func NewSagaDefinitionRegistry ¶
func NewSagaDefinitionRegistry() *SagaDefinitionRegistry
NewSagaDefinitionRegistry creates a new saga definition registry
func (*SagaDefinitionRegistry) Get ¶
func (sdr *SagaDefinitionRegistry) Get(name string) (*SagaDefinition, error)
Get retrieves a saga definition by name
func (*SagaDefinitionRegistry) GetAll ¶
func (sdr *SagaDefinitionRegistry) GetAll() map[string]*SagaDefinition
GetAll returns all registered saga definitions
func (*SagaDefinitionRegistry) Register ¶
func (sdr *SagaDefinitionRegistry) Register(definition *SagaDefinition) error
Register registers a saga definition
type SagaFilter ¶
type SagaFilter struct {
SagaType string `json:"saga_type,omitempty"`
Status SagaStatus `json:"status,omitempty"`
TenantID string `json:"tenant_id,omitempty"`
CorrelationID string `json:"correlation_id,omitempty"`
CreatedBy string `json:"created_by,omitempty"`
StartedAfter *time.Time `json:"started_after,omitempty"`
StartedBefore *time.Time `json:"started_before,omitempty"`
}
SagaFilter defines filtering options for saga queries
type SagaInstance ¶
type SagaInstance struct {
ID string `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
SagaType string `gorm:"type:varchar(255);not null;index" json:"saga_type"`
Status SagaStatus `gorm:"type:varchar(50);not null" json:"status"`
CurrentStep int `gorm:"not null;default:0" json:"current_step"`
Data string `gorm:"type:jsonb" json:"data"`
CompletedSteps string `gorm:"type:jsonb" json:"completed_steps"`
FailedSteps string `gorm:"type:jsonb" json:"failed_steps"`
CompensatedSteps string `gorm:"type:jsonb" json:"compensated_steps"`
StartedAt time.Time `gorm:"not null" json:"started_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
LastError string `json:"last_error,omitempty"`
TenantID string `gorm:"type:uuid;index" json:"tenant_id"`
CorrelationID string `gorm:"type:uuid;index" json:"correlation_id"`
CreatedBy string `gorm:"type:uuid" json:"created_by"`
UpdatedAt time.Time `gorm:"autoUpdateTime" json:"updated_at"`
}
SagaInstance represents an instance of a running saga
type SagaManager ¶
type SagaManager struct {
// contains filtered or unexported fields
}
SagaManager manages distributed transactions using the Saga pattern
func NewSagaManager ¶
func NewSagaManager(repository *SagaRepository, orchestrator Orchestrator, compensator Compensator, config SagaConfig, logger *logrus.Logger) *SagaManager
NewSagaManager creates a new saga manager
func (*SagaManager) CancelSaga ¶
func (sm *SagaManager) CancelSaga(ctx context.Context, sagaID string, reason string) error
CancelSaga cancels a running saga and triggers compensation
func (*SagaManager) Close ¶
func (sm *SagaManager) Close() error
func (*SagaManager) ContinueSaga ¶
func (sm *SagaManager) ContinueSaga(ctx context.Context, sagaID string) error
ContinueSaga continues an existing saga instance
func (*SagaManager) GetSagaStatus ¶
func (sm *SagaManager) GetSagaStatus(ctx context.Context, sagaID string) (*SagaInstance, error)
GetSagaStatus returns the status of a saga instance
func (*SagaManager) HealthCheck ¶
func (sm *SagaManager) HealthCheck(ctx context.Context) error
HealthCheck performs a health check on the saga manager
func (*SagaManager) ListSagas ¶
func (sm *SagaManager) ListSagas(ctx context.Context, filter SagaFilter, limit, offset int) ([]*SagaInstance, int64, error)
ListSagas lists saga instances with optional filtering
func (*SagaManager) RegisterDefinition ¶
func (sm *SagaManager) RegisterDefinition(definition *SagaDefinition) error
RegisterDefinition registers a saga definition
func (*SagaManager) StartSaga ¶
func (sm *SagaManager) StartSaga(ctx context.Context, sagaType string, data map[string]interface{}, tenantID, userID string) (*SagaInstance, error)
StartSaga starts a new saga instance
type SagaRepository ¶
type SagaRepository struct {
// contains filtered or unexported fields
}
SagaRepository handles persistence of saga instances
func NewSagaRepository ¶
func NewSagaRepository(db *gorm.DB, config SagaRepositoryConfig, logger *logrus.Logger) (*SagaRepository, error)
NewSagaRepository creates a new saga repository
func (*SagaRepository) CleanupOld ¶
func (sr *SagaRepository) CleanupOld(ctx context.Context) (int64, error)
CleanupOld removes old saga instances based on retention period
func (*SagaRepository) Close ¶
func (sr *SagaRepository) Close() error
func (*SagaRepository) Delete ¶
func (sr *SagaRepository) Delete(ctx context.Context, sagaID string) error
Delete deletes a saga instance
func (*SagaRepository) GetByCorrelationID ¶
func (sr *SagaRepository) GetByCorrelationID(ctx context.Context, correlationID string) ([]*SagaInstance, error)
GetByCorrelationID retrieves saga instances by correlation ID
func (*SagaRepository) GetByID ¶
func (sr *SagaRepository) GetByID(ctx context.Context, sagaID string) (*SagaInstance, error)
GetByID retrieves a saga instance by ID
func (*SagaRepository) GetRunning ¶
func (sr *SagaRepository) GetRunning(ctx context.Context) ([]*SagaInstance, error)
GetRunning retrieves all running saga instances
func (*SagaRepository) GetStatistics ¶
func (sr *SagaRepository) GetStatistics(ctx context.Context, from, to time.Time) (*SagaStatistics, error)
GetStatistics returns saga statistics
func (*SagaRepository) GetStuck ¶
func (sr *SagaRepository) GetStuck(ctx context.Context, timeout time.Duration) ([]*SagaInstance, error)
GetStuck retrieves saga instances that appear to be stuck
func (*SagaRepository) HealthCheck ¶
func (sr *SagaRepository) HealthCheck(ctx context.Context) error
HealthCheck performs a health check on the repository
func (*SagaRepository) List ¶
func (sr *SagaRepository) List(ctx context.Context, filter SagaFilter, limit, offset int) ([]*SagaInstance, int64, error)
List retrieves saga instances with filtering and pagination
func (*SagaRepository) Save ¶
func (sr *SagaRepository) Save(ctx context.Context, instance *SagaInstance) error
Save saves a saga instance
func (*SagaRepository) Update ¶
func (sr *SagaRepository) Update(ctx context.Context, instance *SagaInstance) error
Update updates a saga instance
type SagaRepositoryConfig ¶
type SagaRepositoryConfig struct {
TableName string `json:"table_name"`
RetentionPeriod time.Duration `json:"retention_period"`
EnableMetrics bool `json:"enable_metrics"`
}
SagaRepositoryConfig holds repository configuration
type SagaStatistics ¶
type SagaStatistics struct {
FromDate time.Time `json:"from_date"`
ToDate time.Time `json:"to_date"`
TotalCount int64 `json:"total_count"`
StatusCounts map[SagaStatus]int64 `json:"status_counts"`
TypeCounts map[string]int64 `json:"type_counts"`
AverageDurationSeconds float64 `json:"average_duration_seconds"`
}
SagaStatistics represents saga statistics
type SagaStatus ¶
type SagaStatus string
SagaStatus represents the status of a saga instance
const (
SagaStatusPending SagaStatus = "pending"
SagaStatusRunning SagaStatus = "running"
SagaStatusCompleted SagaStatus = "completed"
SagaStatusFailed SagaStatus = "failed"
SagaStatusCompensating SagaStatus = "compensating"
SagaStatusCompensated SagaStatus = "compensated"
SagaStatusTimeout SagaStatus = "timeout"
SagaStatusCancelled SagaStatus = "cancelled"
)
type SagaStep ¶
type SagaStep struct {
Name string `json:"name"`
Action string `json:"action"`
Compensation string `json:"compensation"`
Parameters map[string]interface{} `json:"parameters"`
Timeout time.Duration `json:"timeout"`
RetryPolicy RetryPolicy `json:"retry_policy"`
DependsOn []string `json:"depends_on"`
Critical bool `json:"critical"`
}
SagaStep represents a single step in a saga
type SagaStepResult ¶
type SagaStepResult struct {
StepName string `json:"step_name"`
Success bool `json:"success"`
Error error `json:"error,omitempty"`
Data map[string]interface{} `json:"data,omitempty"`
Compensated bool `json:"compensated"`
Duration time.Duration `json:"duration"`
Attempt int `json:"attempt"`
}
SagaStepResult represents the result of a saga step execution
type SendWelcomeEmailStepHandler ¶
type SendWelcomeEmailStepHandler struct {
// contains filtered or unexported fields
}
SendWelcomeEmailStepHandler handles sending welcome emails
func NewSendWelcomeEmailStepHandler ¶
func NewSendWelcomeEmailStepHandler(logger *logrus.Logger) *SendWelcomeEmailStepHandler
NewSendWelcomeEmailStepHandler creates a new send welcome email step handler
func (*SendWelcomeEmailStepHandler) Execute ¶
func (swesh *SendWelcomeEmailStepHandler) Execute(ctx context.Context, data map[string]interface{}) (map[string]interface{}, error)
Execute executes the send welcome email step
func (*SendWelcomeEmailStepHandler) GetName ¶
func (swesh *SendWelcomeEmailStepHandler) GetName() string
GetName returns the handler name