orchestration

package
v0.0.0-...-dac86b4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2026 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CompensatedStep

type CompensatedStep struct {
	Name          string    `json:"name"`
	CompensatedAt time.Time `json:"compensated_at"`
	Error         string    `json:"error,omitempty"`
}

CompensatedStep records a compensation that was executed.

type CompensationAction

type CompensationAction struct {
	StepName     string
	Compensation CompensationStep
	StepOutput   map[string]any // original step's output, available to compensation
}

CompensationAction pairs a compensation step definition with the original step's output so the compensation logic can reference what was produced.

type CompensationPlan

type CompensationPlan struct {
	SagaID string
	Steps  []CompensationAction
}

CompensationPlan contains the ordered list of compensations to execute.

type CompensationStep

type CompensationStep struct {
	StepName string         `json:"step_name"`
	Type     string         `json:"type"` // step type to run for compensation
	Config   map[string]any `json:"config"`
}

CompensationStep defines how to undo a step's effects.

type CompletedStep

type CompletedStep struct {
	Name        string         `json:"name"`
	Output      map[string]any `json:"output"`
	CompletedAt time.Time      `json:"completed_at"`
}

CompletedStep records a successfully executed step.

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator manages saga execution and compensation.

func NewCoordinator

func NewCoordinator(logger *slog.Logger) *Coordinator

NewCoordinator creates a new Coordinator with the given logger.

func (*Coordinator) CompleteSaga

func (c *Coordinator) CompleteSaga(sagaID string) error

CompleteSaga marks a saga as successfully completed.

func (*Coordinator) FinishCompensation

func (c *Coordinator) FinishCompensation(sagaID string) error

FinishCompensation marks the saga as fully compensated or failed depending on whether any compensation step errors occurred.

func (*Coordinator) GetState

func (c *Coordinator) GetState(sagaID string) (*SagaState, error)

GetState returns the current state of a saga.

func (*Coordinator) IsTimedOut

func (c *Coordinator) IsTimedOut(sagaID string) (bool, error)

IsTimedOut checks whether a saga has exceeded its configured timeout.

func (*Coordinator) ListSagas

func (c *Coordinator) ListSagas() []*SagaState

ListSagas returns all active/recent sagas.

func (*Coordinator) RecordCompensation

func (c *Coordinator) RecordCompensation(sagaID string, stepName string, compErr error) error

RecordCompensation records the result of executing a single compensation step.

func (*Coordinator) RecordStepCompleted

func (c *Coordinator) RecordStepCompleted(sagaID string, step CompletedStep, compensation *CompensationStep) error

RecordStepCompleted records a step completion with its output and optional compensation config.

func (*Coordinator) StartSaga

func (c *Coordinator) StartSaga(id, pipelineName string, config SagaConfig) *SagaState

StartSaga begins tracking a new saga execution.

func (*Coordinator) TimeoutSaga

func (c *Coordinator) TimeoutSaga(ctx context.Context, sagaID string) (*CompensationPlan, error)

TimeoutSaga marks a running saga as failed due to timeout and returns a CompensationPlan so the caller can execute compensations.

func (*Coordinator) TriggerCompensation

func (c *Coordinator) TriggerCompensation(ctx context.Context, sagaID string, failedStep string, err error) (*CompensationPlan, error)

TriggerCompensation initiates compensation for a failed saga. It returns a CompensationPlan containing the ordered list of compensation steps to execute. The order is determined by the saga's CompensationOrder config ("reverse" by default, or "forward").

type SagaConfig

type SagaConfig struct {
	Enabled           bool          `yaml:"enabled" json:"enabled"`
	Timeout           time.Duration `yaml:"timeout" json:"timeout"`
	CompensationOrder string        `yaml:"compensation_order" json:"compensation_order"` // "reverse" or "forward"
	TrackCompensation bool          `yaml:"track_compensation" json:"track_compensation"`
}

SagaConfig defines saga behavior for a pipeline.

type SagaState

type SagaState struct {
	ID               string            `json:"id"`
	PipelineName     string            `json:"pipeline_name"`
	Status           SagaStatus        `json:"status"`
	Config           SagaConfig        `json:"config"`
	CompletedSteps   []CompletedStep   `json:"completed_steps"`
	CompensatedSteps []CompensatedStep `json:"compensated_steps,omitempty"`
	FailedStep       string            `json:"failed_step,omitempty"`
	FailureError     string            `json:"failure_error,omitempty"`
	StartedAt        time.Time         `json:"started_at"`
	CompletedAt      *time.Time        `json:"completed_at,omitempty"`
	// contains filtered or unexported fields
}

SagaState tracks the state of a saga execution.

type SagaStatus

type SagaStatus string

SagaStatus represents the current status of a saga.

const (
	SagaRunning      SagaStatus = "running"
	SagaCompensating SagaStatus = "compensating"
	SagaCompensated  SagaStatus = "compensated"
	SagaCompleted    SagaStatus = "completed"
	SagaFailed       SagaStatus = "failed"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL