workflow

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: AGPL-3.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	JobTypeExecuteWorkflow   = "execute_workflow"
	JobTypeScheduleWorkflows = "schedule_workflows"
)

Job types for workflow processing

Variables

View Source
var (
	ErrReassignmentNotAllowed = errors.New("step status does not allow reassignment")
	ErrInvalidAssignee        = errors.New("invalid assignee")
)
View Source
var ErrInvalidStepTransition = errors.New("invalid step transition")
View Source
var ErrWorkflowExecutionAlreadyExists = errors.New("workflow execution already exists for instance and period")

Functions

func CalculateDueDate

func CalculateDueDate(scheduledTime time.Time, gracePeriodDays int) time.Time

CalculateDueDate calculates the due date based on schedule time and grace period

func GeneratePeriodLabel

func GeneratePeriodLabel(cadence string, t time.Time) string

GeneratePeriodLabel generates a label for the period based on cadence and time

func JobInsertOptionsForScheduler

func JobInsertOptionsForScheduler() *river.InsertOpts

JobInsertOptionsForScheduler returns insert options for the scheduler job

func JobInsertOptionsForWorkflow

func JobInsertOptionsForWorkflow() *river.InsertOpts

JobInsertOptionsForWorkflow returns insert options for workflow execution jobs

func ResolveGraceDays

func ResolveGraceDays(instance *workflows.WorkflowInstance, defaultDays int) int

ResolveGraceDays returns the effective grace period days for a workflow instance, falling back through: instance → definition → provided default.

Types

type Assignee

type Assignee struct {
	Type string // user, group, email
	ID   string // UUID or email address
}

Assignee represents a user or group assigned to a step

type AssignmentService

type AssignmentService struct {
	// contains filtered or unexported fields
}

AssignmentService handles logic for resolving step assignments

func NewAssignmentService

func NewAssignmentService(
	roleAssignmentService RoleAssignmentServiceInterface,
	stepExecutionService StepExecutionAssignmentService,
	db *gorm.DB,
	logger *zap.SugaredLogger,
	notificationEnqueuer NotificationEnqueuer,
) *AssignmentService

NewAssignmentService creates a new assignment service

func (*AssignmentService) BulkReassignByRole

func (s *AssignmentService) BulkReassignByRole(
	ctx context.Context,
	workflowExecutionID uuid.UUID,
	roleName string,
	newAssignee Assignee,
	reason string,
	reassignedByUserID *uuid.UUID,
	reassignedByEmail string,
) (*BulkReassignResult, error)

func (*AssignmentService) ReassignStep

func (s *AssignmentService) ReassignStep(
	ctx context.Context,
	stepExecutionID uuid.UUID,
	newAssignee Assignee,
	reason string,
	reassignedByUserID *uuid.UUID,
	reassignedByEmail string,
) error

func (*AssignmentService) ResolveStepAssignees

func (s *AssignmentService) ResolveStepAssignees(ctx context.Context, instance *workflows.WorkflowInstance, stepDefinitions []workflows.WorkflowStepDefinition) (map[uuid.UUID]Assignee, error)

ResolveStepAssignees resolves assignees for a list of step definitions based on role assignments

type AssignmentServiceInterface

type AssignmentServiceInterface interface {
	ResolveStepAssignees(ctx context.Context, instance *workflows.WorkflowInstance, stepDefinitions []workflows.WorkflowStepDefinition) (map[uuid.UUID]Assignee, error)
}

type BulkReassignResult

type BulkReassignResult struct {
	ExecutionID           uuid.UUID
	RoleName              string
	ReassignedCount       int
	ReassignedStepExecIDs []uuid.UUID
}

type DAGExecutor

type DAGExecutor struct {
	// contains filtered or unexported fields
}

DAGExecutor handles the execution of workflow DAGs with dependency resolution and parallel step execution capabilities

func NewDAGExecutor

func NewDAGExecutor(
	stepExecutionService StepExecutionServiceInterface,
	workflowExecutionService WorkflowExecutionServiceInterface,
	stepDefinitionService WorkflowStepDefinitionServiceInterface,
	assignmentService AssignmentServiceInterface,
	logger *log.Logger,
	notificationEnqueuer NotificationEnqueuer,
) *DAGExecutor

NewDAGExecutor creates a new DAG executor instance

func (*DAGExecutor) CheckAutomaticTriggers

func (e *DAGExecutor) CheckAutomaticTriggers(ctx context.Context, stepExecutionID *uuid.UUID) error

CheckAutomaticTriggers checks if a step has automatic triggers configured and evaluates them (for future Phase 5 implementation)

func (*DAGExecutor) InitializeWorkflow

func (e *DAGExecutor) InitializeWorkflow(ctx context.Context, workflowExecutionID *uuid.UUID) error

InitializeWorkflow initializes a workflow execution by creating step execution records and setting up the initial DAG state (blocked/pending based on dependencies)

func (*DAGExecutor) ProcessStepCompletion

func (e *DAGExecutor) ProcessStepCompletion(ctx context.Context, stepExecutionID *uuid.UUID) error

ProcessStepCompletion processes a step completion and unblocks dependent steps This is called after a user manually completes a step

func (*DAGExecutor) ProcessStepFailure

func (e *DAGExecutor) ProcessStepFailure(ctx context.Context, stepExecutionID *uuid.UUID) error

ProcessStepFailure propagates a failed step through dependent steps and reevaluates workflow completion.

func (*DAGExecutor) SetEvidenceIntegration

func (e *DAGExecutor) SetEvidenceIntegration(evidenceIntegration *EvidenceIntegration)

SetEvidenceIntegration sets the evidence integration service (optional)

type EvidenceIntegration

type EvidenceIntegration struct {
	// contains filtered or unexported fields
}

EvidenceIntegration handles evidence stream creation and management for workflow executions Design: - 1 evidence stream per workflow execution (accumulates step completion evidence) - 1 evidence stream per workflow instance (accumulates execution completion evidence) - Streams use label-seeded UUIDs for deterministic identification

func NewEvidenceIntegration

func NewEvidenceIntegration(
	db *gorm.DB,
	logger *zap.SugaredLogger,
) *EvidenceIntegration

NewEvidenceIntegration creates a new evidence integration service

func (*EvidenceIntegration) AddExecutionCompletionEvidence

func (e *EvidenceIntegration) AddExecutionCompletionEvidence(ctx context.Context, workflowExecutionID *uuid.UUID) error

AddExecutionCompletionEvidence adds an execution completion evidence record to the instance stream

func (*EvidenceIntegration) AddExecutionFailureEvidence

func (e *EvidenceIntegration) AddExecutionFailureEvidence(ctx context.Context, workflowExecutionID *uuid.UUID) error

AddExecutionFailureEvidence adds workflow execution failure evidence to both execution and instance streams.

func (*EvidenceIntegration) AddStepStartedEvidence

func (e *EvidenceIntegration) AddStepStartedEvidence(ctx context.Context, stepExecutionID *uuid.UUID) error

AddStepStartedEvidence adds a step started evidence record to the execution stream

func (*EvidenceIntegration) AddWorkflowExecutionEvidence

func (e *EvidenceIntegration) AddWorkflowExecutionEvidence(ctx context.Context, workflowExecutionID *uuid.UUID, status string) error

AddWorkflowExecutionEvidence adds a workflow execution evidence record to the instance stream

func (*EvidenceIntegration) GetOrCreateExecutionStream

func (e *EvidenceIntegration) GetOrCreateExecutionStream(ctx context.Context, workflowExecutionID *uuid.UUID) (*relational.Evidence, error)

GetOrCreateExecutionStream gets or creates the evidence stream for a workflow execution This stream accumulates all step completion evidence for this execution

func (*EvidenceIntegration) GetOrCreateInstanceStream

func (e *EvidenceIntegration) GetOrCreateInstanceStream(ctx context.Context, workflowInstanceID *uuid.UUID) (*relational.Evidence, error)

GetOrCreateInstanceStream gets or creates the evidence stream for a workflow instance This stream accumulates all execution completion evidence for this instance

type EvidenceRequirement

type EvidenceRequirement struct {
	Type        string `json:"type"`
	Description string `json:"description"`
	Required    bool   `json:"required"`
}

EvidenceRequirement represents a required evidence type for a step

type EvidenceSubmission

type EvidenceSubmission struct {
	EvidenceID   *uuid.UUID `json:"evidence-id"`
	EvidenceType string     `json:"evidence-type"`
	Name         string     `json:"name"`
	Description  string     `json:"description"`
	FilePath     string     `json:"file-path,omitempty"`
	FileSize     int64      `json:"file-size,omitempty"`
	FileHash     string     `json:"file-hash,omitempty"`
	FileContent  string     `json:"file-content,omitempty"` // Base64 encoded file content
	MediaType    string     `json:"media-type,omitempty"`   // MIME type (e.g., "application/pdf", "image/png")
	Metadata     string     `json:"metadata,omitempty"`
}

EvidenceSubmission represents evidence being submitted with a step transition

type ExecuteWorkflowArgs

type ExecuteWorkflowArgs struct {
	WorkflowExecutionID uuid.UUID `json:"workflow_execution_id"`
	TriggeredBy         string    `json:"triggered_by"`
	TriggeredByID       string    `json:"triggered_by_id"`
}

ExecuteWorkflowArgs represents the arguments for executing a workflow

func (ExecuteWorkflowArgs) Kind

func (ExecuteWorkflowArgs) Kind() string

Kind returns the job kind for River

func (ExecuteWorkflowArgs) Timeout

func (ExecuteWorkflowArgs) Timeout() time.Duration

Timeout returns the timeout for workflow execution jobs

type ExecutionMetrics

type ExecutionMetrics struct {
	ExecutionID         uuid.UUID
	TotalSteps          int
	Duration            time.Duration
	AverageStepDuration time.Duration
	LongestStepDuration time.Duration
}

ExecutionMetrics represents metrics for a workflow execution

type ExecutionStatus

type ExecutionStatus struct {
	ExecutionID     uuid.UUID
	Status          string
	TotalSteps      int
	PendingSteps    int
	BlockedSteps    int
	InProgressSteps int
	OverdueSteps    int
	CompletedSteps  int
	FailedSteps     int
	CancelledSteps  int
	StartedAt       *time.Time
	CompletedAt     *time.Time
	FailedAt        *time.Time
	FailureReason   string
}

ExecutionStatus represents the current status of a workflow execution

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager orchestrates workflow execution lifecycle using River for async operations

func NewManager

func NewManager(
	riverClient RiverClient,
	workflowExecutionService WorkflowExecutionServiceInterface,
	workflowInstanceService WorkflowInstanceServiceInterface,
	stepExecutionService StepExecutionServiceInterface,
	logger *zap.SugaredLogger,
	notificationEnqueuer NotificationEnqueuer,
) *Manager

NewManager creates a new workflow manager

func (*Manager) CancelExecution

func (m *Manager) CancelExecution(ctx context.Context, executionID *uuid.UUID, reason string) (*workflows.WorkflowExecution, error)

CancelExecution cancels a running workflow execution

func (*Manager) GetExecutionMetrics

func (m *Manager) GetExecutionMetrics(ctx context.Context, executionID *uuid.UUID) (*ExecutionMetrics, error)

GetExecutionMetrics returns metrics for a workflow execution

func (*Manager) GetExecutionStatus

func (m *Manager) GetExecutionStatus(ctx context.Context, executionID *uuid.UUID) (*ExecutionStatus, error)

GetExecutionStatus returns the current status of a workflow execution

func (*Manager) ListExecutions

func (m *Manager) ListExecutions(ctx context.Context, workflowInstanceID *uuid.UUID, limit, offset int) ([]*workflows.WorkflowExecution, error)

ListExecutions returns workflow executions for a workflow instance

func (*Manager) RetryExecution

func (m *Manager) RetryExecution(ctx context.Context, executionID *uuid.UUID) (*workflows.WorkflowExecution, error)

RetryExecution creates a new execution for a failed workflow

func (*Manager) StartWorkflowExecution

func (m *Manager) StartWorkflowExecution(ctx context.Context, workflowInstanceID *uuid.UUID, opts StartWorkflowOptions) (*workflows.WorkflowExecution, error)

StartWorkflowExecution creates and starts a workflow execution via River

type NotificationEnqueuer

type NotificationEnqueuer interface {
	EnqueueWorkflowTaskAssigned(ctx context.Context, stepExecution *workflows.StepExecution) error
	EnqueueWorkflowExecutionFailed(ctx context.Context, execution *workflows.WorkflowExecution) error
}

NotificationEnqueuer is the minimal interface for enqueuing workflow notification jobs. Implemented by the worker service to avoid a direct River dependency in this package.

type OverdueService

type OverdueService struct {
	// contains filtered or unexported fields
}

OverdueService automates overdue and failed status transitions for workflow and step executions.

func NewOverdueService

func NewOverdueService(
	db *gorm.DB,
	workflowExecutionService *workflows.WorkflowExecutionService,
	stepExecutionService *workflows.StepExecutionService,
	evidenceIntegration *EvidenceIntegration,
	logger *zap.SugaredLogger,
	defaultGracePeriodDays int,
	notificationEnqueuer NotificationEnqueuer,
) *OverdueService

func (*OverdueService) CheckFailedExecutions

func (s *OverdueService) CheckFailedExecutions(ctx context.Context) (int, error)

CheckFailedExecutions marks overdue executions as failed after overdue grace period expires.

func (*OverdueService) CheckOverdueExecutions

func (s *OverdueService) CheckOverdueExecutions(ctx context.Context) (int, error)

CheckOverdueExecutions marks workflow executions as overdue once due date passes.

func (*OverdueService) CheckOverdueSteps

func (s *OverdueService) CheckOverdueSteps(ctx context.Context) (int, error)

CheckOverdueSteps marks incomplete steps as overdue once step due date passes.

type RiverClient

type RiverClient interface {
	InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)
}

RiverClient interface for job enqueueing (enables testing)

type RoleAssignmentServiceInterface

type RoleAssignmentServiceInterface interface {
	FindAssigneeForRole(instanceID *uuid.UUID, roleName string) (*workflows.RoleAssignment, error)
	GetByWorkflowInstanceID(instanceID *uuid.UUID) ([]workflows.RoleAssignment, error)
}

RoleAssignmentServiceInterface defines the interface for role assignment operations

type ScheduleWorkflowsArgs

type ScheduleWorkflowsArgs struct {
}

ScheduleWorkflowsArgs represents the arguments for the periodic scheduler job

func (ScheduleWorkflowsArgs) Kind

Kind returns the job kind for River

type StartWorkflowOptions

type StartWorkflowOptions struct {
	TriggeredBy   string
	TriggeredByID string
	PeriodLabel   string
	DueDate       *time.Time
}

StartWorkflowOptions contains options for starting a workflow execution

type StepExecutionAssignmentService

type StepExecutionAssignmentService interface {
	ReassignWithTx(tx *gorm.DB, id *uuid.UUID, assignedToType, assignedToID string, assignedAt time.Time) error
}

type StepExecutionServiceInterface

type StepExecutionServiceInterface interface {
	Create(stepExecution *workflows.StepExecution) error
	GetByID(id *uuid.UUID) (*workflows.StepExecution, error)
	GetByWorkflowExecutionID(executionID *uuid.UUID) ([]workflows.StepExecution, error)
	UpdateStatus(ctx context.Context, id *uuid.UUID, status string) error
	Fail(id *uuid.UUID, reason string) error
	CanUnblock(id *uuid.UUID) (bool, error)
	Unblock(id *uuid.UUID) error
}

Define interfaces for dependency injection

type StepStatus

type StepStatus string

StepStatus represents the status of a workflow step execution Note: This type mirrors workflows.StepExecutionStatus for use in the workflow orchestration layer. Both types must be kept in sync. The string values are identical to ensure compatibility.

const (
	StatusPending    StepStatus = "pending"
	StatusBlocked    StepStatus = "blocked"
	StatusInProgress StepStatus = "in_progress"
	StatusOverdue    StepStatus = "overdue"
	StatusCompleted  StepStatus = "completed"
	StatusFailed     StepStatus = "failed"
	StatusSkipped    StepStatus = "skipped"
	StatusCancelled  StepStatus = "cancelled"
)

Status constants for workflow step executions These values match workflows.StepExecutionStatus constants

func (StepStatus) String

func (s StepStatus) String() string

String returns the string representation of the status

type StepStatusCounts

type StepStatusCounts struct {
	Pending    int
	Blocked    int
	InProgress int
	Overdue    int
	Completed  int
	Failed     int
	Skipped    int
	Cancelled  int
}

StepStatusCounts holds a count of step executions per status.

func CountStepStatuses

func CountStepStatuses(steps []workflows.StepExecution) StepStatusCounts

CountStepStatuses counts step executions by status.

func (StepStatusCounts) AllTerminal

func (c StepStatusCounts) AllTerminal() bool

AllTerminal returns true when no step can make further progress, matching the semantics of checkWorkflowCompletion: all steps must be in {completed, failed, skipped} — cancelled steps are intentionally not treated as terminal here so that a partially-cancelled execution is not incorrectly marked complete.

type StepTransitionRequest

type StepTransitionRequest struct {
	Status   string               `json:"status"` // "in_progress" or "completed"
	Evidence []EvidenceSubmission `json:"evidence,omitempty"`
	Notes    string               `json:"notes,omitempty"`
	UserID   string               `json:"user_id"`   // ID of the user making the transition
	UserType string               `json:"user_type"` // "user", "group", "email"
}

StepTransitionRequest represents a request to transition a step

type StepTransitionService

type StepTransitionService struct {
	// contains filtered or unexported fields
}

StepTransitionService handles user-driven step transitions with role verification

func NewStepTransitionService

func NewStepTransitionService(
	stepExecutionService StepExecutionServiceInterface,
	stepDefinitionService WorkflowStepDefinitionServiceInterface,
	workflowExecutionService WorkflowExecutionServiceInterface,
	roleAssignmentService RoleAssignmentServiceInterface,
	workflowInstanceService WorkflowInstanceServiceInterface,
	workflowDefinitionService WorkflowDefinitionServiceInterface,
	executor *DAGExecutor,
	db *gorm.DB,
	evidenceIntegration *EvidenceIntegration,
) *StepTransitionService

NewStepTransitionService creates a new StepTransitionService

func (*StepTransitionService) CanUserTransitionStep

func (s *StepTransitionService) CanUserTransitionStep(stepExecutionID *uuid.UUID, userID, userType string) (bool, error)

CanUserTransitionStep checks if a user can transition a specific step

func (*StepTransitionService) FailStep

func (s *StepTransitionService) FailStep(ctx context.Context, stepExecutionID *uuid.UUID, reason string) error

FailStep marks a step as failed and propagates failure through dependent steps.

func (*StepTransitionService) GetEvidenceRequirements

func (s *StepTransitionService) GetEvidenceRequirements(stepExecutionID *uuid.UUID) ([]workflows.EvidenceRequirement, error)

GetEvidenceRequirements returns the evidence requirements for a step

func (*StepTransitionService) GetStepExecutionService

func (s *StepTransitionService) GetStepExecutionService() *workflows.StepExecutionService

GetStepExecutionService returns the underlying step execution service

func (*StepTransitionService) TransitionStepStatus

func (s *StepTransitionService) TransitionStepStatus(ctx context.Context, stepExecutionID *uuid.UUID, request *StepTransitionRequest) error

TransitionStepStatus handles user-driven step status transitions with role verification

type WorkflowDefinitionServiceInterface

type WorkflowDefinitionServiceInterface interface {
	GetByID(id *uuid.UUID) (*workflows.WorkflowDefinition, error)
}

WorkflowDefinitionServiceInterface defines the interface for workflow definition operations

type WorkflowExecutionServiceInterface

type WorkflowExecutionServiceInterface interface {
	Create(execution *workflows.WorkflowExecution) error
	GetByID(id *uuid.UUID) (*workflows.WorkflowExecution, error)
	GetByWorkflowInstanceID(instanceID *uuid.UUID) ([]workflows.WorkflowExecution, error)
	UpdateStatus(ctx context.Context, id *uuid.UUID, status string) error
	Cancel(id *uuid.UUID) error
	Fail(id *uuid.UUID, reason string) error
	FailIfNotTerminal(ctx context.Context, id *uuid.UUID, reason string) (bool, error)
}

type WorkflowExecutionWorker

type WorkflowExecutionWorker struct {
	// contains filtered or unexported fields
}

WorkflowExecutionWorker handles workflow execution jobs

func NewWorkflowExecutionWorker

func NewWorkflowExecutionWorker(executor *DAGExecutor, evidenceIntegration *EvidenceIntegration, logger *zap.SugaredLogger) *WorkflowExecutionWorker

NewWorkflowExecutionWorker creates a new WorkflowExecutionWorker

func (*WorkflowExecutionWorker) Work

Work is the River work function for initializing workflows

type WorkflowInstanceServiceInterface

type WorkflowInstanceServiceInterface interface {
	GetByID(id *uuid.UUID) (*workflows.WorkflowInstance, error)
	GetDueInstances(ctx context.Context) ([]workflows.WorkflowInstance, error)
	AdvanceSchedule(ctx context.Context, id *uuid.UUID) error
	UpdateSchedule(ctx context.Context, id *uuid.UUID, nextSchedule time.Time) error
}

type WorkflowSchedulerWorker

type WorkflowSchedulerWorker struct {
	// contains filtered or unexported fields
}

WorkflowSchedulerWorker handles the periodic scheduling of workflows

func NewWorkflowSchedulerWorker

func NewWorkflowSchedulerWorker(
	manager *Manager,
	workflowInstanceService WorkflowInstanceServiceInterface,
	overdueService *OverdueService,
	overdueCheckEnabled bool,
	logger *zap.SugaredLogger,
	defaultGracePeriod int,
) *WorkflowSchedulerWorker

NewWorkflowSchedulerWorker creates a new WorkflowSchedulerWorker

func (*WorkflowSchedulerWorker) Work

Work is the River work function for scheduling workflows

type WorkflowStepDefinitionServiceInterface

type WorkflowStepDefinitionServiceInterface interface {
	GetByID(id *uuid.UUID) (*workflows.WorkflowStepDefinition, error)
	GetByWorkflowDefinitionID(workflowDefID *uuid.UUID) ([]workflows.WorkflowStepDefinition, error)
	GetDependencies(stepID *uuid.UUID) ([]workflows.WorkflowStepDefinition, error)
	GetDependentSteps(stepID *uuid.UUID) ([]workflows.WorkflowStepDefinition, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL