Documentation
¶
Overview ¶
Package repository provides data persistence for workflow executions.
Index ¶
- Variables
- type CompensationRecord
- type Filter
- type SQLWorkflowRepository
- func NewSQLWorkflowRepository(db *sql.DB) *SQLWorkflowRepository
- func (r *SQLWorkflowRepository) CountByStatus(ctx context.Context, status Status) (int64, error)
- func (r *SQLWorkflowRepository) CreateTable(ctx context.Context) error
- func (r *SQLWorkflowRepository) DeleteExecution(ctx context.Context, id string) error
- func (r *SQLWorkflowRepository) GetExecution(ctx context.Context, id string) (*WorkflowExecution, error)
- func (r *SQLWorkflowRepository) GetExecutionByWorkflowID(ctx context.Context, workflowID string) (*WorkflowExecution, error)
- func (r *SQLWorkflowRepository) ListExecutions(ctx context.Context, filter Filter) ([]WorkflowExecution, error)
- func (r *SQLWorkflowRepository) SaveExecution(ctx context.Context, exec *WorkflowExecution) error
- func (r *SQLWorkflowRepository) UpdateCompensations(ctx context.Context, id string, compensations []CompensationRecord) error
- func (r *SQLWorkflowRepository) UpdateOutput(ctx context.Context, id string, output json.RawMessage) error
- func (r *SQLWorkflowRepository) UpdateStatus(ctx context.Context, id string, status Status, errorMsg string) error
- type Status
- type WorkflowExecution
- type WorkflowRepository
Constants ¶
This section is empty.
Variables ¶
var ErrNotFound = errors.New("workflow execution not found")
ErrNotFound is returned when a workflow execution is not found.
Functions ¶
This section is empty.
Types ¶
type CompensationRecord ¶
type CompensationRecord struct {
Name string `json:"name"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
ExecutedAt time.Time `json:"executedAt,omitempty"`
}
CompensationRecord tracks a compensation action.
type Filter ¶
type Filter struct {
WorkflowType string
Status Status
StartedAfter *time.Time
StartedBefore *time.Time
Limit int
Offset int
}
Filter defines filtering options for listing workflow executions.
type SQLWorkflowRepository ¶
type SQLWorkflowRepository struct {
// contains filtered or unexported fields
}
SQLWorkflowRepository implements WorkflowRepository using SQL.
func NewSQLWorkflowRepository ¶
func NewSQLWorkflowRepository(db *sql.DB) *SQLWorkflowRepository
NewSQLWorkflowRepository creates a new SQL-based workflow repository.
func (*SQLWorkflowRepository) CountByStatus ¶
func (r *SQLWorkflowRepository) CountByStatus(ctx context.Context, status Status) (int64, error)
CountByStatus counts workflow executions by status.
func (*SQLWorkflowRepository) CreateTable ¶
func (r *SQLWorkflowRepository) CreateTable(ctx context.Context) error
CreateTable creates the workflow_executions table if it doesn't exist.
func (*SQLWorkflowRepository) DeleteExecution ¶
func (r *SQLWorkflowRepository) DeleteExecution(ctx context.Context, id string) error
DeleteExecution deletes a workflow execution by its ID.
func (*SQLWorkflowRepository) GetExecution ¶
func (r *SQLWorkflowRepository) GetExecution(ctx context.Context, id string) (*WorkflowExecution, error)
GetExecution retrieves a workflow execution by its ID.
func (*SQLWorkflowRepository) GetExecutionByWorkflowID ¶
func (r *SQLWorkflowRepository) GetExecutionByWorkflowID(ctx context.Context, workflowID string) (*WorkflowExecution, error)
GetExecutionByWorkflowID retrieves a workflow execution by workflow ID.
func (*SQLWorkflowRepository) ListExecutions ¶
func (r *SQLWorkflowRepository) ListExecutions(ctx context.Context, filter Filter) ([]WorkflowExecution, error)
ListExecutions lists workflow executions with optional filtering.
func (*SQLWorkflowRepository) SaveExecution ¶
func (r *SQLWorkflowRepository) SaveExecution(ctx context.Context, exec *WorkflowExecution) error
SaveExecution saves a new workflow execution record.
func (*SQLWorkflowRepository) UpdateCompensations ¶
func (r *SQLWorkflowRepository) UpdateCompensations(ctx context.Context, id string, compensations []CompensationRecord) error
UpdateCompensations updates the compensation records of a workflow execution.
func (*SQLWorkflowRepository) UpdateOutput ¶
func (r *SQLWorkflowRepository) UpdateOutput(ctx context.Context, id string, output json.RawMessage) error
UpdateOutput updates the output of a workflow execution.
func (*SQLWorkflowRepository) UpdateStatus ¶
func (r *SQLWorkflowRepository) UpdateStatus(ctx context.Context, id string, status Status, errorMsg string) error
UpdateStatus updates the status of a workflow execution.
type WorkflowExecution ¶
type WorkflowExecution struct {
ID string `json:"id"`
WorkflowID string `json:"workflowId"`
WorkflowType string `json:"workflowType"`
RunID string `json:"runId,omitempty"`
Status Status `json:"status"`
Input json.RawMessage `json:"input,omitempty"`
Output json.RawMessage `json:"output,omitempty"`
Error string `json:"error,omitempty"`
StartedAt time.Time `json:"startedAt"`
CompletedAt *time.Time `json:"completedAt,omitempty"`
Compensations []CompensationRecord `json:"compensations,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
WorkflowExecution represents a workflow execution record.
type WorkflowRepository ¶
type WorkflowRepository interface {
// SaveExecution saves a new workflow execution record.
SaveExecution(ctx context.Context, exec *WorkflowExecution) error
// GetExecution retrieves a workflow execution by its ID.
GetExecution(ctx context.Context, id string) (*WorkflowExecution, error)
// GetExecutionByWorkflowID retrieves a workflow execution by workflow ID.
GetExecutionByWorkflowID(ctx context.Context, workflowID string) (*WorkflowExecution, error)
// ListExecutions lists workflow executions with optional filtering.
ListExecutions(ctx context.Context, filter Filter) ([]WorkflowExecution, error)
// UpdateStatus updates the status of a workflow execution.
UpdateStatus(ctx context.Context, id string, status Status, errorMsg string) error
// UpdateOutput updates the output of a workflow execution.
UpdateOutput(ctx context.Context, id string, output json.RawMessage) error
// UpdateCompensations updates the compensation records of a workflow execution.
UpdateCompensations(ctx context.Context, id string, compensations []CompensationRecord) error
// DeleteExecution deletes a workflow execution by its ID.
DeleteExecution(ctx context.Context, id string) error
// CountByStatus counts workflow executions by status.
CountByStatus(ctx context.Context, status Status) (int64, error)
}
WorkflowRepository defines the interface for workflow execution persistence.