Documentation
¶
Index ¶
- type CompensatedStep
- type CompensationAction
- type CompensationPlan
- type CompensationStep
- type CompletedStep
- type Coordinator
- func (c *Coordinator) CompleteSaga(sagaID string) error
- func (c *Coordinator) FinishCompensation(sagaID string) error
- func (c *Coordinator) GetState(sagaID string) (*SagaState, error)
- func (c *Coordinator) IsTimedOut(sagaID string) (bool, error)
- func (c *Coordinator) ListSagas() []*SagaState
- func (c *Coordinator) RecordCompensation(sagaID string, stepName string, compErr error) error
- func (c *Coordinator) RecordStepCompleted(sagaID string, step CompletedStep, compensation *CompensationStep) error
- func (c *Coordinator) StartSaga(id, pipelineName string, config SagaConfig) *SagaState
- func (c *Coordinator) TimeoutSaga(ctx context.Context, sagaID string) (*CompensationPlan, error)
- func (c *Coordinator) TriggerCompensation(ctx context.Context, sagaID string, failedStep string, err error) (*CompensationPlan, error)
- type SagaConfig
- type SagaState
- type SagaStatus
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CompensatedStep ¶
type CompensatedStep struct {
Name string `json:"name"`
CompensatedAt time.Time `json:"compensated_at"`
Error string `json:"error,omitempty"`
}
CompensatedStep records a compensation that was executed.
type CompensationAction ¶
type CompensationAction struct {
StepName string
Compensation CompensationStep
StepOutput map[string]any // original step's output, available to compensation
}
CompensationAction pairs a compensation step definition with the original step's output so the compensation logic can reference what was produced.
type CompensationPlan ¶
type CompensationPlan struct {
SagaID string
Steps []CompensationAction
}
CompensationPlan contains the ordered list of compensations to execute.
type CompensationStep ¶
type CompensationStep struct {
StepName string `json:"step_name"`
Type string `json:"type"` // step type to run for compensation
Config map[string]any `json:"config"`
}
CompensationStep defines how to undo a step's effects.
type CompletedStep ¶
type CompletedStep struct {
Name string `json:"name"`
Output map[string]any `json:"output"`
CompletedAt time.Time `json:"completed_at"`
}
CompletedStep records a successfully executed step.
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
Coordinator manages saga execution and compensation.
func NewCoordinator ¶
func NewCoordinator(logger *slog.Logger) *Coordinator
NewCoordinator creates a new Coordinator with the given logger.
func (*Coordinator) CompleteSaga ¶
func (c *Coordinator) CompleteSaga(sagaID string) error
CompleteSaga marks a saga as successfully completed.
func (*Coordinator) FinishCompensation ¶
func (c *Coordinator) FinishCompensation(sagaID string) error
FinishCompensation marks the saga as fully compensated or failed depending on whether any compensation step errors occurred.
func (*Coordinator) GetState ¶
func (c *Coordinator) GetState(sagaID string) (*SagaState, error)
GetState returns the current state of a saga.
func (*Coordinator) IsTimedOut ¶
func (c *Coordinator) IsTimedOut(sagaID string) (bool, error)
IsTimedOut checks whether a saga has exceeded its configured timeout.
func (*Coordinator) ListSagas ¶
func (c *Coordinator) ListSagas() []*SagaState
ListSagas returns all active/recent sagas.
func (*Coordinator) RecordCompensation ¶
func (c *Coordinator) RecordCompensation(sagaID string, stepName string, compErr error) error
RecordCompensation records the result of executing a single compensation step.
func (*Coordinator) RecordStepCompleted ¶
func (c *Coordinator) RecordStepCompleted(sagaID string, step CompletedStep, compensation *CompensationStep) error
RecordStepCompleted records a step completion with its output and optional compensation config.
func (*Coordinator) StartSaga ¶
func (c *Coordinator) StartSaga(id, pipelineName string, config SagaConfig) *SagaState
StartSaga begins tracking a new saga execution.
func (*Coordinator) TimeoutSaga ¶
func (c *Coordinator) TimeoutSaga(ctx context.Context, sagaID string) (*CompensationPlan, error)
TimeoutSaga marks a running saga as failed due to timeout and returns a CompensationPlan so the caller can execute compensations.
func (*Coordinator) TriggerCompensation ¶
func (c *Coordinator) TriggerCompensation(ctx context.Context, sagaID string, failedStep string, err error) (*CompensationPlan, error)
TriggerCompensation initiates compensation for a failed saga. It returns a CompensationPlan containing the ordered list of compensation steps to execute. The order is determined by the saga's CompensationOrder config ("reverse" by default, or "forward").
type SagaConfig ¶
type SagaConfig struct {
Enabled bool `yaml:"enabled" json:"enabled"`
Timeout time.Duration `yaml:"timeout" json:"timeout"`
CompensationOrder string `yaml:"compensation_order" json:"compensation_order"` // "reverse" or "forward"
TrackCompensation bool `yaml:"track_compensation" json:"track_compensation"`
}
SagaConfig defines saga behavior for a pipeline.
type SagaState ¶
type SagaState struct {
ID string `json:"id"`
PipelineName string `json:"pipeline_name"`
Status SagaStatus `json:"status"`
Config SagaConfig `json:"config"`
CompletedSteps []CompletedStep `json:"completed_steps"`
CompensatedSteps []CompensatedStep `json:"compensated_steps,omitempty"`
FailedStep string `json:"failed_step,omitempty"`
FailureError string `json:"failure_error,omitempty"`
StartedAt time.Time `json:"started_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
// contains filtered or unexported fields
}
SagaState tracks the state of a saga execution.
type SagaStatus ¶
type SagaStatus string
SagaStatus represents the current status of a saga.
const ( SagaRunning SagaStatus = "running" SagaCompensating SagaStatus = "compensating" SagaCompensated SagaStatus = "compensated" SagaCompleted SagaStatus = "completed" SagaFailed SagaStatus = "failed" )