Versions in this module Expand all Collapse all v0 v0.12.0 Feb 23, 2026 Changes in this version + const JobTypeExecuteWorkflow + const JobTypeScheduleWorkflows + var ErrInvalidAssignee = errors.New("invalid assignee") + var ErrInvalidStepTransition = errors.New("invalid step transition") + var ErrReassignmentNotAllowed = errors.New("step status does not allow reassignment") + var ErrWorkflowExecutionAlreadyExists = errors.New("workflow execution already exists for instance and period") + func CalculateDueDate(scheduledTime time.Time, gracePeriodDays int) time.Time + func GeneratePeriodLabel(cadence string, t time.Time) string + func JobInsertOptionsForScheduler() *river.InsertOpts + func JobInsertOptionsForWorkflow() *river.InsertOpts + func ResolveGraceDays(instance *workflows.WorkflowInstance, defaultDays int) int + type Assignee struct + ID string + Type string + type AssignmentService struct + func NewAssignmentService(roleAssignmentService RoleAssignmentServiceInterface, ...) *AssignmentService + func (s *AssignmentService) BulkReassignByRole(ctx context.Context, workflowExecutionID uuid.UUID, roleName string, ...) (*BulkReassignResult, error) + func (s *AssignmentService) ReassignStep(ctx context.Context, stepExecutionID uuid.UUID, newAssignee Assignee, ...) error + func (s *AssignmentService) ResolveStepAssignees(ctx context.Context, instance *workflows.WorkflowInstance, ...) (map[uuid.UUID]Assignee, error) + type AssignmentServiceInterface interface + ResolveStepAssignees func(ctx context.Context, instance *workflows.WorkflowInstance, ...) (map[uuid.UUID]Assignee, error) + type BulkReassignResult struct + ExecutionID uuid.UUID + ReassignedCount int + ReassignedStepExecIDs []uuid.UUID + RoleName string + type DAGExecutor struct + func NewDAGExecutor(stepExecutionService StepExecutionServiceInterface, ...) *DAGExecutor + func (e *DAGExecutor) CheckAutomaticTriggers(ctx context.Context, stepExecutionID *uuid.UUID) error + func (e *DAGExecutor) InitializeWorkflow(ctx context.Context, workflowExecutionID *uuid.UUID) error + func (e *DAGExecutor) ProcessStepCompletion(ctx context.Context, stepExecutionID *uuid.UUID) error + func (e *DAGExecutor) ProcessStepFailure(ctx context.Context, stepExecutionID *uuid.UUID) error + func (e *DAGExecutor) SetEvidenceIntegration(evidenceIntegration *EvidenceIntegration) + type EvidenceIntegration struct + func NewEvidenceIntegration(db *gorm.DB, logger *zap.SugaredLogger) *EvidenceIntegration + func (e *EvidenceIntegration) AddExecutionCompletionEvidence(ctx context.Context, workflowExecutionID *uuid.UUID) error + func (e *EvidenceIntegration) AddExecutionFailureEvidence(ctx context.Context, workflowExecutionID *uuid.UUID) error + func (e *EvidenceIntegration) AddStepStartedEvidence(ctx context.Context, stepExecutionID *uuid.UUID) error + func (e *EvidenceIntegration) AddWorkflowExecutionEvidence(ctx context.Context, workflowExecutionID *uuid.UUID, status string) error + func (e *EvidenceIntegration) GetOrCreateExecutionStream(ctx context.Context, workflowExecutionID *uuid.UUID) (*relational.Evidence, error) + func (e *EvidenceIntegration) GetOrCreateInstanceStream(ctx context.Context, workflowInstanceID *uuid.UUID) (*relational.Evidence, error) + type EvidenceRequirement struct + Description string + Required bool + Type string + type EvidenceSubmission struct + Description string + EvidenceID *uuid.UUID + EvidenceType string + FileContent string + FileHash string + FilePath string + FileSize int64 + MediaType string + Metadata string + Name string + type ExecuteWorkflowArgs struct + TriggeredBy string + TriggeredByID string + WorkflowExecutionID uuid.UUID + func (ExecuteWorkflowArgs) Kind() string + func (ExecuteWorkflowArgs) Timeout() time.Duration + type ExecutionMetrics struct + AverageStepDuration time.Duration + Duration time.Duration + ExecutionID uuid.UUID + LongestStepDuration time.Duration + TotalSteps int + type ExecutionStatus struct + BlockedSteps int + CancelledSteps int + CompletedAt *time.Time + CompletedSteps int + ExecutionID uuid.UUID + FailedAt *time.Time + FailedSteps int + FailureReason string + InProgressSteps int + OverdueSteps int + PendingSteps int + StartedAt *time.Time + Status string + TotalSteps int + type Manager struct + func NewManager(riverClient RiverClient, ...) *Manager + func (m *Manager) CancelExecution(ctx context.Context, executionID *uuid.UUID, reason string) (*workflows.WorkflowExecution, error) + func (m *Manager) GetExecutionMetrics(ctx context.Context, executionID *uuid.UUID) (*ExecutionMetrics, error) + func (m *Manager) GetExecutionStatus(ctx context.Context, executionID *uuid.UUID) (*ExecutionStatus, error) + func (m *Manager) ListExecutions(ctx context.Context, workflowInstanceID *uuid.UUID, limit, offset int) ([]*workflows.WorkflowExecution, error) + func (m *Manager) RetryExecution(ctx context.Context, executionID *uuid.UUID) (*workflows.WorkflowExecution, error) + func (m *Manager) StartWorkflowExecution(ctx context.Context, workflowInstanceID *uuid.UUID, opts StartWorkflowOptions) (*workflows.WorkflowExecution, error) + type NotificationEnqueuer interface + EnqueueWorkflowExecutionFailed func(ctx context.Context, execution *workflows.WorkflowExecution) error + EnqueueWorkflowTaskAssigned func(ctx context.Context, stepExecution *workflows.StepExecution) error + type OverdueService struct + func NewOverdueService(db *gorm.DB, workflowExecutionService *workflows.WorkflowExecutionService, ...) *OverdueService + func (s *OverdueService) CheckFailedExecutions(ctx context.Context) (int, error) + func (s *OverdueService) CheckOverdueExecutions(ctx context.Context) (int, error) + func (s *OverdueService) CheckOverdueSteps(ctx context.Context) (int, error) + type RiverClient interface + InsertMany func(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error) + type RoleAssignmentServiceInterface interface + FindAssigneeForRole func(instanceID *uuid.UUID, roleName string) (*workflows.RoleAssignment, error) + GetByWorkflowInstanceID func(instanceID *uuid.UUID) ([]workflows.RoleAssignment, error) + type ScheduleWorkflowsArgs struct + func (ScheduleWorkflowsArgs) Kind() string + type StartWorkflowOptions struct + DueDate *time.Time + PeriodLabel string + TriggeredBy string + TriggeredByID string + type StepExecutionAssignmentService interface + ReassignWithTx func(tx *gorm.DB, id *uuid.UUID, assignedToType, assignedToID string, ...) error + type StepExecutionServiceInterface interface + CanUnblock func(id *uuid.UUID) (bool, error) + Create func(stepExecution *workflows.StepExecution) error + Fail func(id *uuid.UUID, reason string) error + GetByID func(id *uuid.UUID) (*workflows.StepExecution, error) + GetByWorkflowExecutionID func(executionID *uuid.UUID) ([]workflows.StepExecution, error) + Unblock func(id *uuid.UUID) error + UpdateStatus func(ctx context.Context, id *uuid.UUID, status string) error + type StepStatus string + const StatusBlocked + const StatusCancelled + const StatusCompleted + const StatusFailed + const StatusInProgress + const StatusOverdue + const StatusPending + const StatusSkipped + func (s StepStatus) String() string + type StepStatusCounts struct + Blocked int + Cancelled int + Completed int + Failed int + InProgress int + Overdue int + Pending int + Skipped int + func CountStepStatuses(steps []workflows.StepExecution) StepStatusCounts + func (c StepStatusCounts) AllTerminal() bool + type StepTransitionRequest struct + Evidence []EvidenceSubmission + Notes string + Status string + UserID string + UserType string + type StepTransitionService struct + func NewStepTransitionService(stepExecutionService StepExecutionServiceInterface, ...) *StepTransitionService + func (s *StepTransitionService) CanUserTransitionStep(stepExecutionID *uuid.UUID, userID, userType string) (bool, error) + func (s *StepTransitionService) FailStep(ctx context.Context, stepExecutionID *uuid.UUID, reason string) error + func (s *StepTransitionService) GetEvidenceRequirements(stepExecutionID *uuid.UUID) ([]workflows.EvidenceRequirement, error) + func (s *StepTransitionService) GetStepExecutionService() *workflows.StepExecutionService + func (s *StepTransitionService) TransitionStepStatus(ctx context.Context, stepExecutionID *uuid.UUID, ...) error + type WorkflowDefinitionServiceInterface interface + GetByID func(id *uuid.UUID) (*workflows.WorkflowDefinition, error) + type WorkflowExecutionServiceInterface interface + Cancel func(id *uuid.UUID) error + Create func(execution *workflows.WorkflowExecution) error + Fail func(id *uuid.UUID, reason string) error + FailIfNotTerminal func(ctx context.Context, id *uuid.UUID, reason string) (bool, error) + GetByID func(id *uuid.UUID) (*workflows.WorkflowExecution, error) + GetByWorkflowInstanceID func(instanceID *uuid.UUID) ([]workflows.WorkflowExecution, error) + UpdateStatus func(ctx context.Context, id *uuid.UUID, status string) error + type WorkflowExecutionWorker struct + func NewWorkflowExecutionWorker(executor *DAGExecutor, evidenceIntegration *EvidenceIntegration, ...) *WorkflowExecutionWorker + func (w *WorkflowExecutionWorker) Work(ctx context.Context, job *river.Job[ExecuteWorkflowArgs]) error + type WorkflowInstanceServiceInterface interface + AdvanceSchedule func(ctx context.Context, id *uuid.UUID) error + GetByID func(id *uuid.UUID) (*workflows.WorkflowInstance, error) + GetDueInstances func(ctx context.Context) ([]workflows.WorkflowInstance, error) + UpdateSchedule func(ctx context.Context, id *uuid.UUID, nextSchedule time.Time) error + type WorkflowSchedulerWorker struct + func NewWorkflowSchedulerWorker(manager *Manager, workflowInstanceService WorkflowInstanceServiceInterface, ...) *WorkflowSchedulerWorker + func (w *WorkflowSchedulerWorker) Work(ctx context.Context, job *river.Job[ScheduleWorkflowsArgs]) error + type WorkflowStepDefinitionServiceInterface interface + GetByID func(id *uuid.UUID) (*workflows.WorkflowStepDefinition, error) + GetByWorkflowDefinitionID func(workflowDefID *uuid.UUID) ([]workflows.WorkflowStepDefinition, error) + GetDependencies func(stepID *uuid.UUID) ([]workflows.WorkflowStepDefinition, error) + GetDependentSteps func(stepID *uuid.UUID) ([]workflows.WorkflowStepDefinition, error)