 Documentation
      ¶
      Documentation
      ¶
    
    
  
    
  
    Index ¶
- type CancelBookingCompensationHandler
- type CompensationHandler
- type Compensator
- type CreateBookingStepHandler
- type CreateUserStepHandler
- type DefaultCompensator
- func (dc *DefaultCompensator) CompensateStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)
- func (dc *DefaultCompensator) GetCompensationHandler(compensation string) (CompensationHandler, error)
- func (dc *DefaultCompensator) GetRegisteredHandlers() []string
- func (dc *DefaultCompensator) RegisterCompensationHandler(handler CompensationHandler) error
 
- type DefaultOrchestrator
- func (do *DefaultOrchestrator) ExecuteStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)
- func (do *DefaultOrchestrator) GetRegisteredHandlers() []string
- func (do *DefaultOrchestrator) GetStepHandler(action string) (StepHandler, error)
- func (do *DefaultOrchestrator) RegisterStepHandler(handler StepHandler) error
 
- type DeleteUserCompensationHandler
- type Orchestrator
- type ProcessPaymentStepHandler
- type RefundPaymentCompensationHandler
- type RetryPolicy
- type SagaConfig
- type SagaDefinition
- type SagaDefinitionRegistry
- type SagaFilter
- type SagaInstance
- type SagaManager
- func (sm *SagaManager) CancelSaga(ctx context.Context, sagaID string, reason string) error
- func (sm *SagaManager) Close() error
- func (sm *SagaManager) ContinueSaga(ctx context.Context, sagaID string) error
- func (sm *SagaManager) GetSagaStatus(ctx context.Context, sagaID string) (*SagaInstance, error)
- func (sm *SagaManager) HealthCheck(ctx context.Context) error
- func (sm *SagaManager) ListSagas(ctx context.Context, filter SagaFilter, limit, offset int) ([]*SagaInstance, int64, error)
- func (sm *SagaManager) RegisterDefinition(definition *SagaDefinition) error
- func (sm *SagaManager) StartSaga(ctx context.Context, sagaType string, data map[string]interface{}, ...) (*SagaInstance, error)
 
- type SagaRepository
- func (sr *SagaRepository) CleanupOld(ctx context.Context) (int64, error)
- func (sr *SagaRepository) Close() error
- func (sr *SagaRepository) Delete(ctx context.Context, sagaID string) error
- func (sr *SagaRepository) GetByCorrelationID(ctx context.Context, correlationID string) ([]*SagaInstance, error)
- func (sr *SagaRepository) GetByID(ctx context.Context, sagaID string) (*SagaInstance, error)
- func (sr *SagaRepository) GetRunning(ctx context.Context) ([]*SagaInstance, error)
- func (sr *SagaRepository) GetStatistics(ctx context.Context, from, to time.Time) (*SagaStatistics, error)
- func (sr *SagaRepository) GetStuck(ctx context.Context, timeout time.Duration) ([]*SagaInstance, error)
- func (sr *SagaRepository) HealthCheck(ctx context.Context) error
- func (sr *SagaRepository) List(ctx context.Context, filter SagaFilter, limit, offset int) ([]*SagaInstance, int64, error)
- func (sr *SagaRepository) Save(ctx context.Context, instance *SagaInstance) error
- func (sr *SagaRepository) Update(ctx context.Context, instance *SagaInstance) error
 
- type SagaRepositoryConfig
- type SagaStatistics
- type SagaStatus
- type SagaStep
- type SagaStepResult
- type SendWelcomeEmailStepHandler
- type StepHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CancelBookingCompensationHandler ¶
type CancelBookingCompensationHandler struct {
	// contains filtered or unexported fields
}
    CancelBookingCompensationHandler handles booking cancellation compensation
func NewCancelBookingCompensationHandler ¶
func NewCancelBookingCompensationHandler(logger *logrus.Logger) *CancelBookingCompensationHandler
NewCancelBookingCompensationHandler creates a new cancel booking compensation handler
func (*CancelBookingCompensationHandler) Compensate ¶
func (cbch *CancelBookingCompensationHandler) Compensate(ctx context.Context, data map[string]interface{}) error
Compensate executes booking cancellation compensation
func (*CancelBookingCompensationHandler) GetName ¶
func (cbch *CancelBookingCompensationHandler) GetName() string
GetName returns the compensation handler name
type CompensationHandler ¶
type CompensationHandler interface {
	Compensate(ctx context.Context, data map[string]interface{}) error
	GetName() string
}
    CompensationHandler defines the interface for compensation execution
type Compensator ¶
type Compensator interface {
	CompensateStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)
	GetCompensationHandler(compensation string) (CompensationHandler, error)
}
    Compensator defines the interface for saga compensation
type CreateBookingStepHandler ¶
type CreateBookingStepHandler struct {
	// contains filtered or unexported fields
}
    CreateBookingStepHandler handles booking creation
func NewCreateBookingStepHandler ¶
func NewCreateBookingStepHandler(logger *logrus.Logger) *CreateBookingStepHandler
NewCreateBookingStepHandler creates a new create booking step handler
func (*CreateBookingStepHandler) Execute ¶
func (cbsh *CreateBookingStepHandler) Execute(ctx context.Context, data map[string]interface{}) (map[string]interface{}, error)
Execute executes the create booking step
func (*CreateBookingStepHandler) GetName ¶
func (cbsh *CreateBookingStepHandler) GetName() string
GetName returns the handler name
type CreateUserStepHandler ¶
type CreateUserStepHandler struct {
	// contains filtered or unexported fields
}
    CreateUserStepHandler handles user creation
func NewCreateUserStepHandler ¶
func NewCreateUserStepHandler(logger *logrus.Logger) *CreateUserStepHandler
NewCreateUserStepHandler creates a new create user step handler
func (*CreateUserStepHandler) Execute ¶
func (cush *CreateUserStepHandler) Execute(ctx context.Context, data map[string]interface{}) (map[string]interface{}, error)
Execute executes the create user step
func (*CreateUserStepHandler) GetName ¶
func (cush *CreateUserStepHandler) GetName() string
GetName returns the handler name
type DefaultCompensator ¶
type DefaultCompensator struct {
	// contains filtered or unexported fields
}
    DefaultCompensator provides the default implementation of saga compensation
func NewDefaultCompensator ¶
func NewDefaultCompensator(logger *logrus.Logger) *DefaultCompensator
NewDefaultCompensator creates a new default compensator
func (*DefaultCompensator) CompensateStep ¶
func (dc *DefaultCompensator) CompensateStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)
CompensateStep executes compensation for a saga step
func (*DefaultCompensator) GetCompensationHandler ¶
func (dc *DefaultCompensator) GetCompensationHandler(compensation string) (CompensationHandler, error)
GetCompensationHandler retrieves a compensation handler by name
func (*DefaultCompensator) GetRegisteredHandlers ¶
func (dc *DefaultCompensator) GetRegisteredHandlers() []string
GetRegisteredHandlers returns all registered compensation handlers
func (*DefaultCompensator) RegisterCompensationHandler ¶
func (dc *DefaultCompensator) RegisterCompensationHandler(handler CompensationHandler) error
RegisterCompensationHandler registers a compensation handler
type DefaultOrchestrator ¶
type DefaultOrchestrator struct {
	// contains filtered or unexported fields
}
    DefaultOrchestrator provides the default implementation of saga orchestration
func NewDefaultOrchestrator ¶
func NewDefaultOrchestrator(logger *logrus.Logger) *DefaultOrchestrator
NewDefaultOrchestrator creates a new default orchestrator
func (*DefaultOrchestrator) ExecuteStep ¶
func (do *DefaultOrchestrator) ExecuteStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)
ExecuteStep executes a saga step
func (*DefaultOrchestrator) GetRegisteredHandlers ¶
func (do *DefaultOrchestrator) GetRegisteredHandlers() []string
GetRegisteredHandlers returns all registered step handlers
func (*DefaultOrchestrator) GetStepHandler ¶
func (do *DefaultOrchestrator) GetStepHandler(action string) (StepHandler, error)
GetStepHandler retrieves a step handler by name
func (*DefaultOrchestrator) RegisterStepHandler ¶
func (do *DefaultOrchestrator) RegisterStepHandler(handler StepHandler) error
RegisterStepHandler registers a step handler
type DeleteUserCompensationHandler ¶
type DeleteUserCompensationHandler struct {
	// contains filtered or unexported fields
}
    DeleteUserCompensationHandler handles user deletion compensation
func NewDeleteUserCompensationHandler ¶
func NewDeleteUserCompensationHandler(logger *logrus.Logger) *DeleteUserCompensationHandler
NewDeleteUserCompensationHandler creates a new delete user compensation handler
func (*DeleteUserCompensationHandler) Compensate ¶
func (duch *DeleteUserCompensationHandler) Compensate(ctx context.Context, data map[string]interface{}) error
Compensate executes user deletion compensation
func (*DeleteUserCompensationHandler) GetName ¶
func (duch *DeleteUserCompensationHandler) GetName() string
GetName returns the compensation handler name
type Orchestrator ¶
type Orchestrator interface {
	ExecuteStep(ctx context.Context, instance *SagaInstance, step *SagaStep) (*SagaStepResult, error)
	GetStepHandler(action string) (StepHandler, error)
}
    Orchestrator defines the interface for saga orchestration
type ProcessPaymentStepHandler ¶
type ProcessPaymentStepHandler struct {
	// contains filtered or unexported fields
}
    ProcessPaymentStepHandler handles payment processing
func NewProcessPaymentStepHandler ¶
func NewProcessPaymentStepHandler(logger *logrus.Logger) *ProcessPaymentStepHandler
NewProcessPaymentStepHandler creates a new process payment step handler
func (*ProcessPaymentStepHandler) Execute ¶
func (ppsh *ProcessPaymentStepHandler) Execute(ctx context.Context, data map[string]interface{}) (map[string]interface{}, error)
Execute executes the process payment step
func (*ProcessPaymentStepHandler) GetName ¶
func (ppsh *ProcessPaymentStepHandler) GetName() string
GetName returns the handler name
type RefundPaymentCompensationHandler ¶
type RefundPaymentCompensationHandler struct {
	// contains filtered or unexported fields
}
    RefundPaymentCompensationHandler handles payment refund compensation
func NewRefundPaymentCompensationHandler ¶
func NewRefundPaymentCompensationHandler(logger *logrus.Logger) *RefundPaymentCompensationHandler
NewRefundPaymentCompensationHandler creates a new refund payment compensation handler
func (*RefundPaymentCompensationHandler) Compensate ¶
func (rpch *RefundPaymentCompensationHandler) Compensate(ctx context.Context, data map[string]interface{}) error
Compensate executes payment refund compensation
func (*RefundPaymentCompensationHandler) GetName ¶
func (rpch *RefundPaymentCompensationHandler) GetName() string
GetName returns the compensation handler name
type RetryPolicy ¶
type RetryPolicy struct {
	MaxAttempts int           `json:"max_attempts"`
	Delay       time.Duration `json:"delay"`
	Backoff     string        `json:"backoff"` // linear, exponential
}
    RetryPolicy defines retry behavior for saga steps
type SagaConfig ¶
type SagaConfig struct {
	TimeoutDuration     time.Duration `json:"timeout_duration"`
	RetryAttempts       int           `json:"retry_attempts"`
	RetryDelay          time.Duration `json:"retry_delay"`
	CompensationTimeout time.Duration `json:"compensation_timeout"`
	EnableMetrics       bool          `json:"enable_metrics"`
	BatchSize           int           `json:"batch_size"`
}
    SagaConfig holds saga manager configuration
type SagaDefinition ¶
SagaDefinition defines the structure of a saga
type SagaDefinitionRegistry ¶
type SagaDefinitionRegistry struct {
	// contains filtered or unexported fields
}
    SagaDefinitionRegistry manages saga definitions
func NewSagaDefinitionRegistry ¶
func NewSagaDefinitionRegistry() *SagaDefinitionRegistry
NewSagaDefinitionRegistry creates a new saga definition registry
func (*SagaDefinitionRegistry) Get ¶
func (sdr *SagaDefinitionRegistry) Get(name string) (*SagaDefinition, error)
Get retrieves a saga definition by name
func (*SagaDefinitionRegistry) GetAll ¶
func (sdr *SagaDefinitionRegistry) GetAll() map[string]*SagaDefinition
GetAll returns all registered saga definitions
func (*SagaDefinitionRegistry) Register ¶
func (sdr *SagaDefinitionRegistry) Register(definition *SagaDefinition) error
Register registers a saga definition
type SagaFilter ¶
type SagaFilter struct {
	SagaType      string     `json:"saga_type,omitempty"`
	Status        SagaStatus `json:"status,omitempty"`
	TenantID      string     `json:"tenant_id,omitempty"`
	CorrelationID string     `json:"correlation_id,omitempty"`
	CreatedBy     string     `json:"created_by,omitempty"`
	StartedAfter  *time.Time `json:"started_after,omitempty"`
	StartedBefore *time.Time `json:"started_before,omitempty"`
}
    SagaFilter defines filtering options for saga queries
type SagaInstance ¶
type SagaInstance struct {
	ID               string     `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	SagaType         string     `gorm:"type:varchar(255);not null;index" json:"saga_type"`
	Status           SagaStatus `gorm:"type:varchar(50);not null" json:"status"`
	CurrentStep      int        `gorm:"not null;default:0" json:"current_step"`
	Data             string     `gorm:"type:jsonb" json:"data"`
	CompletedSteps   string     `gorm:"type:jsonb" json:"completed_steps"`
	FailedSteps      string     `gorm:"type:jsonb" json:"failed_steps"`
	CompensatedSteps string     `gorm:"type:jsonb" json:"compensated_steps"`
	StartedAt        time.Time  `gorm:"not null" json:"started_at"`
	CompletedAt      *time.Time `json:"completed_at,omitempty"`
	LastError        string     `json:"last_error,omitempty"`
	TenantID         string     `gorm:"type:uuid;index" json:"tenant_id"`
	CorrelationID    string     `gorm:"type:uuid;index" json:"correlation_id"`
	CreatedBy        string     `gorm:"type:uuid" json:"created_by"`
	UpdatedAt        time.Time  `gorm:"autoUpdateTime" json:"updated_at"`
}
    SagaInstance represents an instance of a running saga
type SagaManager ¶
type SagaManager struct {
	// contains filtered or unexported fields
}
    SagaManager manages distributed transactions using the Saga pattern
func NewSagaManager ¶
func NewSagaManager( repository *SagaRepository, orchestrator Orchestrator, compensator Compensator, config SagaConfig, logger *logrus.Logger, ) *SagaManager
NewSagaManager creates a new saga manager
func (*SagaManager) CancelSaga ¶
CancelSaga cancels a running saga and triggers compensation
func (*SagaManager) ContinueSaga ¶
func (sm *SagaManager) ContinueSaga(ctx context.Context, sagaID string) error
ContinueSaga continues an existing saga instance
func (*SagaManager) GetSagaStatus ¶
func (sm *SagaManager) GetSagaStatus(ctx context.Context, sagaID string) (*SagaInstance, error)
GetSagaStatus returns the status of a saga instance
func (*SagaManager) HealthCheck ¶
func (sm *SagaManager) HealthCheck(ctx context.Context) error
HealthCheck performs a health check on the saga manager
func (*SagaManager) ListSagas ¶
func (sm *SagaManager) ListSagas(ctx context.Context, filter SagaFilter, limit, offset int) ([]*SagaInstance, int64, error)
ListSagas lists saga instances with optional filtering
func (*SagaManager) RegisterDefinition ¶
func (sm *SagaManager) RegisterDefinition(definition *SagaDefinition) error
RegisterDefinition registers a saga definition
func (*SagaManager) StartSaga ¶
func (sm *SagaManager) StartSaga(ctx context.Context, sagaType string, data map[string]interface{}, tenantID, userID string) (*SagaInstance, error)
StartSaga starts a new saga instance
type SagaRepository ¶
type SagaRepository struct {
	// contains filtered or unexported fields
}
    SagaRepository handles persistence of saga instances
func NewSagaRepository ¶
func NewSagaRepository(db *gorm.DB, config SagaRepositoryConfig, logger *logrus.Logger) (*SagaRepository, error)
NewSagaRepository creates a new saga repository
func (*SagaRepository) CleanupOld ¶
func (sr *SagaRepository) CleanupOld(ctx context.Context) (int64, error)
CleanupOld removes old saga instances based on retention period
func (*SagaRepository) Delete ¶
func (sr *SagaRepository) Delete(ctx context.Context, sagaID string) error
Delete deletes a saga instance
func (*SagaRepository) GetByCorrelationID ¶
func (sr *SagaRepository) GetByCorrelationID(ctx context.Context, correlationID string) ([]*SagaInstance, error)
GetByCorrelationID retrieves saga instances by correlation ID
func (*SagaRepository) GetByID ¶
func (sr *SagaRepository) GetByID(ctx context.Context, sagaID string) (*SagaInstance, error)
GetByID retrieves a saga instance by ID
func (*SagaRepository) GetRunning ¶
func (sr *SagaRepository) GetRunning(ctx context.Context) ([]*SagaInstance, error)
GetRunning retrieves all running saga instances
func (*SagaRepository) GetStatistics ¶
func (sr *SagaRepository) GetStatistics(ctx context.Context, from, to time.Time) (*SagaStatistics, error)
GetStatistics returns saga statistics
func (*SagaRepository) GetStuck ¶
func (sr *SagaRepository) GetStuck(ctx context.Context, timeout time.Duration) ([]*SagaInstance, error)
GetStuck retrieves saga instances that appear to be stuck
func (*SagaRepository) HealthCheck ¶
func (sr *SagaRepository) HealthCheck(ctx context.Context) error
HealthCheck performs a health check on the repository
func (*SagaRepository) List ¶
func (sr *SagaRepository) List(ctx context.Context, filter SagaFilter, limit, offset int) ([]*SagaInstance, int64, error)
List retrieves saga instances with filtering and pagination
func (*SagaRepository) Save ¶
func (sr *SagaRepository) Save(ctx context.Context, instance *SagaInstance) error
Save saves a saga instance
func (*SagaRepository) Update ¶
func (sr *SagaRepository) Update(ctx context.Context, instance *SagaInstance) error
Update updates a saga instance
type SagaRepositoryConfig ¶
type SagaRepositoryConfig struct {
	TableName       string        `json:"table_name"`
	RetentionPeriod time.Duration `json:"retention_period"`
	EnableMetrics   bool          `json:"enable_metrics"`
}
    SagaRepositoryConfig holds repository configuration
type SagaStatistics ¶
type SagaStatistics struct {
	FromDate               time.Time            `json:"from_date"`
	ToDate                 time.Time            `json:"to_date"`
	TotalCount             int64                `json:"total_count"`
	StatusCounts           map[SagaStatus]int64 `json:"status_counts"`
	TypeCounts             map[string]int64     `json:"type_counts"`
	AverageDurationSeconds float64              `json:"average_duration_seconds"`
}
    SagaStatistics represents saga statistics
type SagaStatus ¶
type SagaStatus string
SagaStatus represents the status of a saga instance
const ( SagaStatusPending SagaStatus = "pending" SagaStatusRunning SagaStatus = "running" SagaStatusCompleted SagaStatus = "completed" SagaStatusFailed SagaStatus = "failed" SagaStatusCompensating SagaStatus = "compensating" SagaStatusCompensated SagaStatus = "compensated" SagaStatusTimeout SagaStatus = "timeout" SagaStatusCancelled SagaStatus = "cancelled" )
type SagaStep ¶
type SagaStep struct {
	Name         string                 `json:"name"`
	Action       string                 `json:"action"`
	Compensation string                 `json:"compensation"`
	Parameters   map[string]interface{} `json:"parameters"`
	Timeout      time.Duration          `json:"timeout"`
	RetryPolicy  RetryPolicy            `json:"retry_policy"`
	DependsOn    []string               `json:"depends_on"`
	Critical     bool                   `json:"critical"`
}
    SagaStep represents a single step in a saga
type SagaStepResult ¶
type SagaStepResult struct {
	StepName    string                 `json:"step_name"`
	Success     bool                   `json:"success"`
	Error       error                  `json:"error,omitempty"`
	Data        map[string]interface{} `json:"data,omitempty"`
	Compensated bool                   `json:"compensated"`
	Duration    time.Duration          `json:"duration"`
	Attempt     int                    `json:"attempt"`
}
    SagaStepResult represents the result of a saga step execution
type SendWelcomeEmailStepHandler ¶
type SendWelcomeEmailStepHandler struct {
	// contains filtered or unexported fields
}
    SendWelcomeEmailStepHandler handles sending welcome emails
func NewSendWelcomeEmailStepHandler ¶
func NewSendWelcomeEmailStepHandler(logger *logrus.Logger) *SendWelcomeEmailStepHandler
NewSendWelcomeEmailStepHandler creates a new send welcome email step handler
func (*SendWelcomeEmailStepHandler) Execute ¶
func (swesh *SendWelcomeEmailStepHandler) Execute(ctx context.Context, data map[string]interface{}) (map[string]interface{}, error)
Execute executes the send welcome email step
func (*SendWelcomeEmailStepHandler) GetName ¶
func (swesh *SendWelcomeEmailStepHandler) GetName() string
GetName returns the handler name