Documentation
¶
Index ¶
- Constants
- Variables
- func CalculateDueDate(scheduledTime time.Time, gracePeriodDays int) time.Time
- func GeneratePeriodLabel(cadence string, t time.Time) string
- func JobInsertOptionsForScheduler() *river.InsertOpts
- func JobInsertOptionsForWorkflow() *river.InsertOpts
- func ResolveGraceDays(instance *workflows.WorkflowInstance, defaultDays int) int
- type Assignee
- type AssignmentService
- func (s *AssignmentService) BulkReassignByRole(ctx context.Context, workflowExecutionID uuid.UUID, roleName string, ...) (*BulkReassignResult, error)
- func (s *AssignmentService) ReassignStep(ctx context.Context, stepExecutionID uuid.UUID, newAssignee Assignee, ...) error
- func (s *AssignmentService) ResolveStepAssignees(ctx context.Context, instance *workflows.WorkflowInstance, ...) (map[uuid.UUID]Assignee, error)
- type AssignmentServiceInterface
- type BulkReassignResult
- type DAGExecutor
- func (e *DAGExecutor) CheckAutomaticTriggers(ctx context.Context, stepExecutionID *uuid.UUID) error
- func (e *DAGExecutor) InitializeWorkflow(ctx context.Context, workflowExecutionID *uuid.UUID) error
- func (e *DAGExecutor) ProcessStepCompletion(ctx context.Context, stepExecutionID *uuid.UUID) error
- func (e *DAGExecutor) ProcessStepFailure(ctx context.Context, stepExecutionID *uuid.UUID) error
- func (e *DAGExecutor) SetEvidenceIntegration(evidenceIntegration *EvidenceIntegration)
- type EvidenceIntegration
- func (e *EvidenceIntegration) AddExecutionCompletionEvidence(ctx context.Context, workflowExecutionID *uuid.UUID) error
- func (e *EvidenceIntegration) AddExecutionFailureEvidence(ctx context.Context, workflowExecutionID *uuid.UUID) error
- func (e *EvidenceIntegration) AddStepStartedEvidence(ctx context.Context, stepExecutionID *uuid.UUID) error
- func (e *EvidenceIntegration) AddWorkflowExecutionEvidence(ctx context.Context, workflowExecutionID *uuid.UUID, status string) error
- func (e *EvidenceIntegration) GetOrCreateExecutionStream(ctx context.Context, workflowExecutionID *uuid.UUID) (*relational.Evidence, error)
- func (e *EvidenceIntegration) GetOrCreateInstanceStream(ctx context.Context, workflowInstanceID *uuid.UUID) (*relational.Evidence, error)
- type EvidenceRequirement
- type EvidenceSubmission
- type ExecuteWorkflowArgs
- type ExecutionMetrics
- type ExecutionStatus
- type Manager
- func (m *Manager) CancelExecution(ctx context.Context, executionID *uuid.UUID, reason string) (*workflows.WorkflowExecution, error)
- func (m *Manager) GetExecutionMetrics(ctx context.Context, executionID *uuid.UUID) (*ExecutionMetrics, error)
- func (m *Manager) GetExecutionStatus(ctx context.Context, executionID *uuid.UUID) (*ExecutionStatus, error)
- func (m *Manager) ListExecutions(ctx context.Context, workflowInstanceID *uuid.UUID, limit, offset int) ([]*workflows.WorkflowExecution, error)
- func (m *Manager) RetryExecution(ctx context.Context, executionID *uuid.UUID) (*workflows.WorkflowExecution, error)
- func (m *Manager) StartWorkflowExecution(ctx context.Context, workflowInstanceID *uuid.UUID, opts StartWorkflowOptions) (*workflows.WorkflowExecution, error)
- type NotificationEnqueuer
- type OverdueService
- type RiverClient
- type RoleAssignmentServiceInterface
- type ScheduleWorkflowsArgs
- type StartWorkflowOptions
- type StepExecutionAssignmentService
- type StepExecutionServiceInterface
- type StepStatus
- type StepStatusCounts
- type StepTransitionRequest
- type StepTransitionService
- func (s *StepTransitionService) CanUserTransitionStep(stepExecutionID *uuid.UUID, userID, userType string) (bool, error)
- func (s *StepTransitionService) FailStep(ctx context.Context, stepExecutionID *uuid.UUID, reason string) error
- func (s *StepTransitionService) GetEvidenceRequirements(stepExecutionID *uuid.UUID) ([]workflows.EvidenceRequirement, error)
- func (s *StepTransitionService) GetStepExecutionService() *workflows.StepExecutionService
- func (s *StepTransitionService) TransitionStepStatus(ctx context.Context, stepExecutionID *uuid.UUID, ...) error
- type WorkflowDefinitionServiceInterface
- type WorkflowExecutionServiceInterface
- type WorkflowExecutionWorker
- type WorkflowInstanceServiceInterface
- type WorkflowSchedulerWorker
- type WorkflowStepDefinitionServiceInterface
Constants ¶
const ( JobTypeExecuteWorkflow = "execute_workflow" JobTypeScheduleWorkflows = "schedule_workflows" )
Job types for workflow processing
Variables ¶
var ( ErrReassignmentNotAllowed = errors.New("step status does not allow reassignment") ErrInvalidAssignee = errors.New("invalid assignee") )
var ErrInvalidStepTransition = errors.New("invalid step transition")
var ErrWorkflowExecutionAlreadyExists = errors.New("workflow execution already exists for instance and period")
Functions ¶
func CalculateDueDate ¶
CalculateDueDate calculates the due date based on schedule time and grace period
func GeneratePeriodLabel ¶
GeneratePeriodLabel generates a label for the period based on cadence and time
func JobInsertOptionsForScheduler ¶
func JobInsertOptionsForScheduler() *river.InsertOpts
JobInsertOptionsForScheduler returns insert options for the scheduler job
func JobInsertOptionsForWorkflow ¶
func JobInsertOptionsForWorkflow() *river.InsertOpts
JobInsertOptionsForWorkflow returns insert options for workflow execution jobs
func ResolveGraceDays ¶
func ResolveGraceDays(instance *workflows.WorkflowInstance, defaultDays int) int
ResolveGraceDays returns the effective grace period days for a workflow instance, falling back through: instance → definition → provided default.
Types ¶
type AssignmentService ¶
type AssignmentService struct {
// contains filtered or unexported fields
}
AssignmentService handles logic for resolving step assignments
func NewAssignmentService ¶
func NewAssignmentService( roleAssignmentService RoleAssignmentServiceInterface, stepExecutionService StepExecutionAssignmentService, db *gorm.DB, logger *zap.SugaredLogger, notificationEnqueuer NotificationEnqueuer, ) *AssignmentService
NewAssignmentService creates a new assignment service
func (*AssignmentService) BulkReassignByRole ¶
func (*AssignmentService) ReassignStep ¶
func (*AssignmentService) ResolveStepAssignees ¶
func (s *AssignmentService) ResolveStepAssignees(ctx context.Context, instance *workflows.WorkflowInstance, stepDefinitions []workflows.WorkflowStepDefinition) (map[uuid.UUID]Assignee, error)
ResolveStepAssignees resolves assignees for a list of step definitions based on role assignments
type AssignmentServiceInterface ¶
type AssignmentServiceInterface interface {
ResolveStepAssignees(ctx context.Context, instance *workflows.WorkflowInstance, stepDefinitions []workflows.WorkflowStepDefinition) (map[uuid.UUID]Assignee, error)
}
type BulkReassignResult ¶
type DAGExecutor ¶
type DAGExecutor struct {
// contains filtered or unexported fields
}
DAGExecutor handles the execution of workflow DAGs with dependency resolution and parallel step execution capabilities
func NewDAGExecutor ¶
func NewDAGExecutor( stepExecutionService StepExecutionServiceInterface, workflowExecutionService WorkflowExecutionServiceInterface, stepDefinitionService WorkflowStepDefinitionServiceInterface, assignmentService AssignmentServiceInterface, logger *log.Logger, notificationEnqueuer NotificationEnqueuer, ) *DAGExecutor
NewDAGExecutor creates a new DAG executor instance
func (*DAGExecutor) CheckAutomaticTriggers ¶
CheckAutomaticTriggers checks if a step has automatic triggers configured and evaluates them (for future Phase 5 implementation)
func (*DAGExecutor) InitializeWorkflow ¶
InitializeWorkflow initializes a workflow execution by creating step execution records and setting up the initial DAG state (blocked/pending based on dependencies)
func (*DAGExecutor) ProcessStepCompletion ¶
ProcessStepCompletion processes a step completion and unblocks dependent steps. This is called after a user manually completes a step.
func (*DAGExecutor) ProcessStepFailure ¶
ProcessStepFailure propagates a failed step through dependent steps and reevaluates workflow completion.
func (*DAGExecutor) SetEvidenceIntegration ¶
func (e *DAGExecutor) SetEvidenceIntegration(evidenceIntegration *EvidenceIntegration)
SetEvidenceIntegration sets the evidence integration service (optional)
type EvidenceIntegration ¶
type EvidenceIntegration struct {
// contains filtered or unexported fields
}
EvidenceIntegration handles evidence stream creation and management for workflow executions.
Design:
- 1 evidence stream per workflow execution (accumulates step completion evidence)
- 1 evidence stream per workflow instance (accumulates execution completion evidence)
- Streams use label-seeded UUIDs for deterministic identification
func NewEvidenceIntegration ¶
func NewEvidenceIntegration( db *gorm.DB, logger *zap.SugaredLogger, ) *EvidenceIntegration
NewEvidenceIntegration creates a new evidence integration service
func (*EvidenceIntegration) AddExecutionCompletionEvidence ¶
func (e *EvidenceIntegration) AddExecutionCompletionEvidence(ctx context.Context, workflowExecutionID *uuid.UUID) error
AddExecutionCompletionEvidence adds an execution completion evidence record to the instance stream
func (*EvidenceIntegration) AddExecutionFailureEvidence ¶
func (e *EvidenceIntegration) AddExecutionFailureEvidence(ctx context.Context, workflowExecutionID *uuid.UUID) error
AddExecutionFailureEvidence adds workflow execution failure evidence to both execution and instance streams.
func (*EvidenceIntegration) AddStepStartedEvidence ¶
func (e *EvidenceIntegration) AddStepStartedEvidence(ctx context.Context, stepExecutionID *uuid.UUID) error
AddStepStartedEvidence adds a step started evidence record to the execution stream
func (*EvidenceIntegration) AddWorkflowExecutionEvidence ¶
func (e *EvidenceIntegration) AddWorkflowExecutionEvidence(ctx context.Context, workflowExecutionID *uuid.UUID, status string) error
AddWorkflowExecutionEvidence adds a workflow execution evidence record to the instance stream
func (*EvidenceIntegration) GetOrCreateExecutionStream ¶
func (e *EvidenceIntegration) GetOrCreateExecutionStream(ctx context.Context, workflowExecutionID *uuid.UUID) (*relational.Evidence, error)
GetOrCreateExecutionStream gets or creates the evidence stream for a workflow execution. This stream accumulates all step completion evidence for this execution.
func (*EvidenceIntegration) GetOrCreateInstanceStream ¶
func (e *EvidenceIntegration) GetOrCreateInstanceStream(ctx context.Context, workflowInstanceID *uuid.UUID) (*relational.Evidence, error)
GetOrCreateInstanceStream gets or creates the evidence stream for a workflow instance. This stream accumulates all execution completion evidence for this instance.
type EvidenceRequirement ¶
type EvidenceRequirement struct {
Type string `json:"type"`
Description string `json:"description"`
Required bool `json:"required"`
}
EvidenceRequirement represents a required evidence type for a step
type EvidenceSubmission ¶
type EvidenceSubmission struct {
EvidenceID *uuid.UUID `json:"evidence-id"`
EvidenceType string `json:"evidence-type"`
Name string `json:"name"`
Description string `json:"description"`
FilePath string `json:"file-path,omitempty"`
FileSize int64 `json:"file-size,omitempty"`
FileHash string `json:"file-hash,omitempty"`
FileContent string `json:"file-content,omitempty"` // Base64 encoded file content
MediaType string `json:"media-type,omitempty"` // MIME type (e.g., "application/pdf", "image/png")
Metadata string `json:"metadata,omitempty"`
}
EvidenceSubmission represents evidence being submitted with a step transition
type ExecuteWorkflowArgs ¶
type ExecuteWorkflowArgs struct {
WorkflowExecutionID uuid.UUID `json:"workflow_execution_id"`
TriggeredBy string `json:"triggered_by"`
TriggeredByID string `json:"triggered_by_id"`
}
ExecuteWorkflowArgs represents the arguments for executing a workflow
func (ExecuteWorkflowArgs) Kind ¶
func (ExecuteWorkflowArgs) Kind() string
Kind returns the job kind for River
func (ExecuteWorkflowArgs) Timeout ¶
func (ExecuteWorkflowArgs) Timeout() time.Duration
Timeout returns the timeout for workflow execution jobs
type ExecutionMetrics ¶
type ExecutionMetrics struct {
ExecutionID uuid.UUID
TotalSteps int
Duration time.Duration
AverageStepDuration time.Duration
LongestStepDuration time.Duration
}
ExecutionMetrics represents metrics for a workflow execution
type ExecutionStatus ¶
type ExecutionStatus struct {
ExecutionID uuid.UUID
Status string
TotalSteps int
PendingSteps int
BlockedSteps int
InProgressSteps int
OverdueSteps int
CompletedSteps int
FailedSteps int
CancelledSteps int
StartedAt *time.Time
CompletedAt *time.Time
FailedAt *time.Time
FailureReason string
}
ExecutionStatus represents the current status of a workflow execution
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager orchestrates workflow execution lifecycle using River for async operations
func NewManager ¶
func NewManager( riverClient RiverClient, workflowExecutionService WorkflowExecutionServiceInterface, workflowInstanceService WorkflowInstanceServiceInterface, stepExecutionService StepExecutionServiceInterface, logger *zap.SugaredLogger, notificationEnqueuer NotificationEnqueuer, ) *Manager
NewManager creates a new workflow manager
func (*Manager) CancelExecution ¶
func (m *Manager) CancelExecution(ctx context.Context, executionID *uuid.UUID, reason string) (*workflows.WorkflowExecution, error)
CancelExecution cancels a running workflow execution
func (*Manager) GetExecutionMetrics ¶
func (m *Manager) GetExecutionMetrics(ctx context.Context, executionID *uuid.UUID) (*ExecutionMetrics, error)
GetExecutionMetrics returns metrics for a workflow execution
func (*Manager) GetExecutionStatus ¶
func (m *Manager) GetExecutionStatus(ctx context.Context, executionID *uuid.UUID) (*ExecutionStatus, error)
GetExecutionStatus returns the current status of a workflow execution
func (*Manager) ListExecutions ¶
func (m *Manager) ListExecutions(ctx context.Context, workflowInstanceID *uuid.UUID, limit, offset int) ([]*workflows.WorkflowExecution, error)
ListExecutions returns workflow executions for a workflow instance
func (*Manager) RetryExecution ¶
func (m *Manager) RetryExecution(ctx context.Context, executionID *uuid.UUID) (*workflows.WorkflowExecution, error)
RetryExecution creates a new execution for a failed workflow
func (*Manager) StartWorkflowExecution ¶
func (m *Manager) StartWorkflowExecution(ctx context.Context, workflowInstanceID *uuid.UUID, opts StartWorkflowOptions) (*workflows.WorkflowExecution, error)
StartWorkflowExecution creates and starts a workflow execution via River
type NotificationEnqueuer ¶
type NotificationEnqueuer interface {
EnqueueWorkflowTaskAssigned(ctx context.Context, stepExecution *workflows.StepExecution) error
EnqueueWorkflowExecutionFailed(ctx context.Context, execution *workflows.WorkflowExecution) error
}
NotificationEnqueuer is the minimal interface for enqueuing workflow notification jobs. Implemented by the worker service to avoid a direct River dependency in this package.
type OverdueService ¶
type OverdueService struct {
// contains filtered or unexported fields
}
OverdueService automates overdue and failed status transitions for workflow and step executions.
func NewOverdueService ¶
func NewOverdueService( db *gorm.DB, workflowExecutionService *workflows.WorkflowExecutionService, stepExecutionService *workflows.StepExecutionService, evidenceIntegration *EvidenceIntegration, logger *zap.SugaredLogger, defaultGracePeriodDays int, notificationEnqueuer NotificationEnqueuer, ) *OverdueService
func (*OverdueService) CheckFailedExecutions ¶
func (s *OverdueService) CheckFailedExecutions(ctx context.Context) (int, error)
CheckFailedExecutions marks overdue executions as failed after overdue grace period expires.
func (*OverdueService) CheckOverdueExecutions ¶
func (s *OverdueService) CheckOverdueExecutions(ctx context.Context) (int, error)
CheckOverdueExecutions marks workflow executions as overdue once due date passes.
func (*OverdueService) CheckOverdueSteps ¶
func (s *OverdueService) CheckOverdueSteps(ctx context.Context) (int, error)
CheckOverdueSteps marks incomplete steps as overdue once step due date passes.
type RiverClient ¶
type RiverClient interface {
InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)
}
RiverClient interface for job enqueueing (enables testing)
type RoleAssignmentServiceInterface ¶
type RoleAssignmentServiceInterface interface {
FindAssigneeForRole(instanceID *uuid.UUID, roleName string) (*workflows.RoleAssignment, error)
GetByWorkflowInstanceID(instanceID *uuid.UUID) ([]workflows.RoleAssignment, error)
}
RoleAssignmentServiceInterface defines the interface for role assignment operations
type ScheduleWorkflowsArgs ¶
type ScheduleWorkflowsArgs struct {
}
ScheduleWorkflowsArgs represents the arguments for the periodic scheduler job
func (ScheduleWorkflowsArgs) Kind ¶
func (ScheduleWorkflowsArgs) Kind() string
Kind returns the job kind for River
type StartWorkflowOptions ¶
type StartWorkflowOptions struct {
TriggeredBy string
TriggeredByID string
PeriodLabel string
DueDate *time.Time
}
StartWorkflowOptions contains options for starting a workflow execution
type StepExecutionServiceInterface ¶
type StepExecutionServiceInterface interface {
Create(stepExecution *workflows.StepExecution) error
GetByID(id *uuid.UUID) (*workflows.StepExecution, error)
GetByWorkflowExecutionID(executionID *uuid.UUID) ([]workflows.StepExecution, error)
UpdateStatus(ctx context.Context, id *uuid.UUID, status string) error
Fail(id *uuid.UUID, reason string) error
CanUnblock(id *uuid.UUID) (bool, error)
Unblock(id *uuid.UUID) error
}
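StepExecutionServiceInterface defines the step execution operations this package depends on; it is declared as an interface for dependency injection.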
type StepStatus ¶
type StepStatus string
StepStatus represents the status of a workflow step execution. Note: This type mirrors workflows.StepExecutionStatus for use in the workflow orchestration layer. Both types must be kept in sync. The string values are identical to ensure compatibility.
const ( StatusPending StepStatus = "pending" StatusBlocked StepStatus = "blocked" StatusInProgress StepStatus = "in_progress" StatusOverdue StepStatus = "overdue" StatusCompleted StepStatus = "completed" StatusFailed StepStatus = "failed" StatusSkipped StepStatus = "skipped" StatusCancelled StepStatus = "cancelled" )
Status constants for workflow step executions. These values match the workflows.StepExecutionStatus constants.
func (StepStatus) String ¶
func (s StepStatus) String() string
String returns the string representation of the status
type StepStatusCounts ¶
type StepStatusCounts struct {
Pending int
Blocked int
InProgress int
Overdue int
Completed int
Failed int
Skipped int
Cancelled int
}
StepStatusCounts holds a count of step executions per status.
func CountStepStatuses ¶
func CountStepStatuses(steps []workflows.StepExecution) StepStatusCounts
CountStepStatuses counts step executions by status.
func (StepStatusCounts) AllTerminal ¶
func (c StepStatusCounts) AllTerminal() bool
AllTerminal returns true when no step can make further progress, matching the semantics of checkWorkflowCompletion: all steps must be in {completed, failed, skipped} — cancelled steps are intentionally not treated as terminal here so that a partially-cancelled execution is not incorrectly marked complete.
type StepTransitionRequest ¶
type StepTransitionRequest struct {
Status string `json:"status"` // "in_progress" or "completed"
Evidence []EvidenceSubmission `json:"evidence,omitempty"`
Notes string `json:"notes,omitempty"`
UserID string `json:"user_id"` // ID of the user making the transition
UserType string `json:"user_type"` // "user", "group", "email"
}
StepTransitionRequest represents a request to transition a step
type StepTransitionService ¶
type StepTransitionService struct {
// contains filtered or unexported fields
}
StepTransitionService handles user-driven step transitions with role verification
func NewStepTransitionService ¶
func NewStepTransitionService( stepExecutionService StepExecutionServiceInterface, stepDefinitionService WorkflowStepDefinitionServiceInterface, workflowExecutionService WorkflowExecutionServiceInterface, roleAssignmentService RoleAssignmentServiceInterface, workflowInstanceService WorkflowInstanceServiceInterface, workflowDefinitionService WorkflowDefinitionServiceInterface, executor *DAGExecutor, db *gorm.DB, evidenceIntegration *EvidenceIntegration, ) *StepTransitionService
NewStepTransitionService creates a new StepTransitionService
func (*StepTransitionService) CanUserTransitionStep ¶
func (s *StepTransitionService) CanUserTransitionStep(stepExecutionID *uuid.UUID, userID, userType string) (bool, error)
CanUserTransitionStep checks if a user can transition a specific step
func (*StepTransitionService) FailStep ¶
func (s *StepTransitionService) FailStep(ctx context.Context, stepExecutionID *uuid.UUID, reason string) error
FailStep marks a step as failed and propagates failure through dependent steps.
func (*StepTransitionService) GetEvidenceRequirements ¶
func (s *StepTransitionService) GetEvidenceRequirements(stepExecutionID *uuid.UUID) ([]workflows.EvidenceRequirement, error)
GetEvidenceRequirements returns the evidence requirements for a step
func (*StepTransitionService) GetStepExecutionService ¶
func (s *StepTransitionService) GetStepExecutionService() *workflows.StepExecutionService
GetStepExecutionService returns the underlying step execution service
func (*StepTransitionService) TransitionStepStatus ¶
func (s *StepTransitionService) TransitionStepStatus(ctx context.Context, stepExecutionID *uuid.UUID, request *StepTransitionRequest) error
TransitionStepStatus handles user-driven step status transitions with role verification
type WorkflowDefinitionServiceInterface ¶
type WorkflowDefinitionServiceInterface interface {
GetByID(id *uuid.UUID) (*workflows.WorkflowDefinition, error)
}
WorkflowDefinitionServiceInterface defines the interface for workflow definition operations
type WorkflowExecutionServiceInterface ¶
type WorkflowExecutionServiceInterface interface {
Create(execution *workflows.WorkflowExecution) error
GetByID(id *uuid.UUID) (*workflows.WorkflowExecution, error)
GetByWorkflowInstanceID(instanceID *uuid.UUID) ([]workflows.WorkflowExecution, error)
UpdateStatus(ctx context.Context, id *uuid.UUID, status string) error
Cancel(id *uuid.UUID) error
Fail(id *uuid.UUID, reason string) error
FailIfNotTerminal(ctx context.Context, id *uuid.UUID, reason string) (bool, error)
}
type WorkflowExecutionWorker ¶
type WorkflowExecutionWorker struct {
// contains filtered or unexported fields
}
WorkflowExecutionWorker handles workflow execution jobs
func NewWorkflowExecutionWorker ¶
func NewWorkflowExecutionWorker(executor *DAGExecutor, evidenceIntegration *EvidenceIntegration, logger *zap.SugaredLogger) *WorkflowExecutionWorker
NewWorkflowExecutionWorker creates a new WorkflowExecutionWorker
func (*WorkflowExecutionWorker) Work ¶
func (w *WorkflowExecutionWorker) Work(ctx context.Context, job *river.Job[ExecuteWorkflowArgs]) error
Work is the River work function for initializing workflows
type WorkflowInstanceServiceInterface ¶
type WorkflowInstanceServiceInterface interface {
GetByID(id *uuid.UUID) (*workflows.WorkflowInstance, error)
GetDueInstances(ctx context.Context) ([]workflows.WorkflowInstance, error)
AdvanceSchedule(ctx context.Context, id *uuid.UUID) error
UpdateSchedule(ctx context.Context, id *uuid.UUID, nextSchedule time.Time) error
}
type WorkflowSchedulerWorker ¶
type WorkflowSchedulerWorker struct {
// contains filtered or unexported fields
}
WorkflowSchedulerWorker handles the periodic scheduling of workflows
func NewWorkflowSchedulerWorker ¶
func NewWorkflowSchedulerWorker( manager *Manager, workflowInstanceService WorkflowInstanceServiceInterface, overdueService *OverdueService, overdueCheckEnabled bool, logger *zap.SugaredLogger, defaultGracePeriod int, ) *WorkflowSchedulerWorker
NewWorkflowSchedulerWorker creates a new WorkflowSchedulerWorker
func (*WorkflowSchedulerWorker) Work ¶
func (w *WorkflowSchedulerWorker) Work(ctx context.Context, job *river.Job[ScheduleWorkflowsArgs]) error
Work is the River work function for scheduling workflows
type WorkflowStepDefinitionServiceInterface ¶
type WorkflowStepDefinitionServiceInterface interface {
GetByID(id *uuid.UUID) (*workflows.WorkflowStepDefinition, error)
GetByWorkflowDefinitionID(workflowDefID *uuid.UUID) ([]workflows.WorkflowStepDefinition, error)
GetDependencies(stepID *uuid.UUID) ([]workflows.WorkflowStepDefinition, error)
GetDependentSteps(stepID *uuid.UUID) ([]workflows.WorkflowStepDefinition, error)
}