Documentation
¶
Index ¶
- func CreateWorkflowInstance(wm *WorkflowManager, name string) (core.Workflow, error)
- func RunWorkflow(ctx context.Context, w core.Workflow, r WorkflowRepo, wa WorkflowActionRepo, ...)
- func Worker(ctx context.Context, id int, executorID int64, workflowRepository WorkflowRepo, ...)
- type DefinitionRepo
- type ExecutorRepo
- type UserRepo
- type WorkflowActionRepo
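- type WorkflowManager
- func NewWorkflowManager(workflowRepo WorkflowRepo, workflowActionRepo WorkflowActionRepo, ...) *WorkflowManager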
- func (wm *WorkflowManager) DefinitionOverview(workflowType string) ([]repository.DefinitionStateRow, error)
- func (wm *WorkflowManager) GetWorkflowDefinitionByName(name string) (*domain.WorkflowDefinition, error)
- func (wm *WorkflowManager) ListExecutors(limit int) ([]*domain.Executor, error)
- func (wm *WorkflowManager) ListWorkflowDefinitions() (*[]domain.WorkflowDefinition, error)
- func (wm *WorkflowManager) NextToExecute(limit int) (*[]domain.Workflow, error)
- func (wm *WorkflowManager) Overview() ([]repository.WorkflowOverviewRow, error)
- func (wm *WorkflowManager) SearchWorkflows(req models.SearchWorkflowRequest) (*[]domain.Workflow, error)
- func (wm *WorkflowManager) StartEngine(ctx context.Context, pollInterval time.Duration)
- func (wm *WorkflowManager) TopExecuting(limit int) (*[]domain.Workflow, error)
- func (wm *WorkflowManager) Wakeup()
- type WorkflowRepo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateWorkflowInstance ¶
func CreateWorkflowInstance(wm *WorkflowManager, name string) (core.Workflow, error)
func RunWorkflow ¶
func RunWorkflow(ctx context.Context, w core.Workflow, r WorkflowRepo, wa WorkflowActionRepo, executorID int64, workerID string)
RunWorkflow runs a workflow.
func Worker ¶
func Worker(ctx context.Context, id int, executorID int64, workflowRepository WorkflowRepo, workflowActionRepository WorkflowActionRepo, workflowQueue <-chan core.Workflow)
Worker processes workflows received from workflowQueue.
Types ¶
type DefinitionRepo ¶ added in v1.7.0
type DefinitionRepo interface {
FindAll() (*[]domain.WorkflowDefinition, error)
FindByName(name string) (*domain.WorkflowDefinition, error)
Save(def *domain.WorkflowDefinition) error
}
DefinitionRepo defines the interface for workflow definition persistence.
type ExecutorRepo ¶ added in v1.7.0
type ExecutorRepo interface {
Save(e *domain.Executor) (int64, error)
UpdateLastActive(id int64, ts time.Time) error
GetExecutorsByLastActive(limit int) ([]*domain.Executor, error)
}
ExecutorRepo defines the interface for executor persistence.
type UserRepo ¶ added in v1.7.0
type UserRepo interface {
FindBySessionID(sessionID string, now time.Time) (*domain.User, error)
FindByApiKey(apiKey string) (*domain.User, error)
FindAll() (*[]domain.User, error)
Save(user *domain.User) (int64, error)
FindById(id int64) (*domain.User, error)
DeleteById(id int64) error
FindByUsername(username string) (*domain.User, error)
UpdateSession(userID int64, sessionID string, expiry time.Time) error
ClearSessionBySessionID(sessionID string) error
UpdateUser(id int64, username string, apiKey sql.NullString, enabled sql.NullBool) error
}
UserRepo defines the interface for user persistence.
type WorkflowActionRepo ¶ added in v1.7.0
type WorkflowActionRepo interface {
Save(a *domain.WorkflowAction) (int64, error)
FindAllByWorkflowID(workflowID int64) (*[]domain.WorkflowAction, error)
}
WorkflowActionRepo defines the interface for workflow action persistence.
type WorkflowManager ¶
type WorkflowManager struct {
WorkflowRegistry *map[string]func() core.Workflow
WorkflowRepo WorkflowRepo
WorkflowActionRepo WorkflowActionRepo
DefinitionRepo DefinitionRepo
// contains filtered or unexported fields
}
func NewWorkflowManager ¶
func NewWorkflowManager(workflowRepo WorkflowRepo, workflowActionRepo WorkflowActionRepo, executorRepo ExecutorRepo, definitionRepo DefinitionRepo, WorkflowRegistry *map[string]func() core.Workflow, clock core.Clock) *WorkflowManager
func (*WorkflowManager) DefinitionOverview ¶
func (wm *WorkflowManager) DefinitionOverview(workflowType string) ([]repository.DefinitionStateRow, error)
DefinitionOverview exposes counts by state for a workflow type.
func (*WorkflowManager) GetWorkflowDefinitionByName ¶
func (wm *WorkflowManager) GetWorkflowDefinitionByName(name string) (*domain.WorkflowDefinition, error)
GetWorkflowDefinitionByName exposes repository get by name for web/API layers.
func (*WorkflowManager) ListExecutors ¶
func (wm *WorkflowManager) ListExecutors(limit int) ([]*domain.Executor, error)
ListExecutors returns recent executors ordered by last_active desc.
func (*WorkflowManager) ListWorkflowDefinitions ¶
func (wm *WorkflowManager) ListWorkflowDefinitions() (*[]domain.WorkflowDefinition, error)
ListWorkflowDefinitions exposes repository list for web/API layers.
func (*WorkflowManager) NextToExecute ¶
func (wm *WorkflowManager) NextToExecute(limit int) (*[]domain.Workflow, error)
NextToExecute exposes the repository method for the dashboard.
func (*WorkflowManager) Overview ¶
func (wm *WorkflowManager) Overview() ([]repository.WorkflowOverviewRow, error)
Overview exposes grouped counts for the home dashboard.
func (*WorkflowManager) SearchWorkflows ¶
func (wm *WorkflowManager) SearchWorkflows(req models.SearchWorkflowRequest) (*[]domain.Workflow, error)
SearchWorkflows delegates to the repository to search based on request filters.
func (*WorkflowManager) StartEngine ¶
func (wm *WorkflowManager) StartEngine(ctx context.Context, pollInterval time.Duration)
StartEngine starts polling for new workflows at the given interval
func (*WorkflowManager) TopExecuting ¶
func (wm *WorkflowManager) TopExecuting(limit int) (*[]domain.Workflow, error)
TopExecuting exposes the repository method for the dashboard.
func (*WorkflowManager) Wakeup ¶
func (wm *WorkflowManager) Wakeup()
type WorkflowRepo ¶ added in v1.7.0
type WorkflowRepo interface {
GetChildrenByParentID(parentID int64, onlyActive bool) (*[]domain.Workflow, error)
UpdateWorkflowStatus(id int64, status string) error
UpdateWorkflowStartingTime(id int64) error
UpdateState(id int64, state string) error
SaveWorkflowVariables(id int64, vars string) error
WakeParentWorkflow(parentID int64) error
Save(wf *domain.Workflow) (int64, error)
FindByID(id int64) (*domain.Workflow, error)
UpdateNextActivationSpecific(id int64, next time.Time) error
UpdateNextActivationOffset(id int64, offset string) error
ClearExecutorId(id int64) error
IncrementRetryCounterAndSetNextActivation(id int64, activation time.Time) error
FindPendingWorkflows(size int, executorGroup string) (*[]domain.Workflow, error)
MarkWorkflowAsScheduledForExecution(id int64, executorId int64, modified time.Time) bool
FindStuckWorkflows(minutesRepair string, executorGroup string, limit int) (*[]domain.Workflow, error)
LockWorkflowByModified(id int64, modified time.Time) bool
SearchWorkflows(req models.SearchWorkflowRequest) (*[]domain.Workflow, error)
GetTopExecuting(limit int) (*[]domain.Workflow, error)
GetNextToExecute(limit int) (*[]domain.Workflow, error)
GetWorkflowOverview() ([]repository.WorkflowOverviewRow, error)
GetDefinitionStateOverview(workflowType string) ([]repository.DefinitionStateRow, error)
FindByExternalId(id string) (*domain.Workflow, error)
SaveWorkflowVariablesAndTouch(id int64, vars string) error
}
WorkflowRepo defines the interface for workflow persistence, matching repository.WorkflowRepository.