engine

package
v1.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateWorkflowInstance

func CreateWorkflowInstance(wm *WorkflowManager, name string) (core.Workflow, error)

func RunWorkflow

func RunWorkflow(ctx context.Context, w core.Workflow, r WorkflowRepo, wa WorkflowActionRepo, executorID int64, workerID string)

Engine runs a workflow

func Worker

func Worker(ctx context.Context, id int, executorID int64, workflowRepository WorkflowRepo, workflowActionRepository WorkflowActionRepo, workflowQueue <-chan core.Workflow)

Worker function that processes workflows from the queue

Types

type DefinitionRepo added in v1.7.0

type DefinitionRepo interface {
	FindAll() (*[]domain.WorkflowDefinition, error)
	FindByName(name string) (*domain.WorkflowDefinition, error)
	Save(def *domain.WorkflowDefinition) error
}

DefinitionRepo defines the interface for workflow definition persistence.

type ExecutorRepo added in v1.7.0

type ExecutorRepo interface {
	Save(e *domain.Executor) (int64, error)
	UpdateLastActive(id int64, ts time.Time) error
	GetExecutorsByLastActive(limit int) ([]*domain.Executor, error)
}

ExecutorRepo defines the interface for executor persistence.

type UserRepo added in v1.7.0

type UserRepo interface {
	FindBySessionID(sessionID string, now time.Time) (*domain.User, error)
	FindByApiKey(apiKey string) (*domain.User, error)
	FindAll() (*[]domain.User, error)
	Save(user *domain.User) (int64, error)
	FindById(id int64) (*domain.User, error)
	DeleteById(id int64) error
	FindByUsername(username string) (*domain.User, error)
	UpdateSession(userID int64, sessionID string, expiry time.Time) error
	ClearSessionBySessionID(sessionID string) error
	UpdateUser(id int64, username string, apiKey sql.NullString, enabled sql.NullBool) error
}

UserRepo defines the interface for user persistence.

type WorkflowActionRepo added in v1.7.0

type WorkflowActionRepo interface {
	Save(a *domain.WorkflowAction) (int64, error)
	FindAllByWorkflowID(workflowID int64) (*[]domain.WorkflowAction, error)
}

WorkflowActionRepo defines the interface for workflow action persistence.

type WorkflowManager

type WorkflowManager struct {
	WorkflowRegistry   *map[string]func() core.Workflow
	WorkflowRepo       WorkflowRepo
	WorkflowActionRepo WorkflowActionRepo

	DefinitionRepo DefinitionRepo
	// contains filtered or unexported fields
}

func NewWorkflowManager

func NewWorkflowManager(workflowRepo WorkflowRepo, workflowActionRepo WorkflowActionRepo, executorRepo ExecutorRepo,
	definitionRepo DefinitionRepo, WorkflowRegistry *map[string]func() core.Workflow, clock core.Clock) *WorkflowManager

func (*WorkflowManager) DefinitionOverview

func (wm *WorkflowManager) DefinitionOverview(workflowType string) ([]repository.DefinitionStateRow, error)

DefinitionOverview exposes counts by state for a workflow type

func (*WorkflowManager) GetWorkflowDefinitionByName

func (wm *WorkflowManager) GetWorkflowDefinitionByName(name string) (*domain.WorkflowDefinition, error)

GetWorkflowDefinitionByName exposes repository get by name for web/API layers.

func (*WorkflowManager) ListExecutors

func (wm *WorkflowManager) ListExecutors(limit int) ([]*domain.Executor, error)

ListExecutors returns recent executors ordered by last_active desc.

func (*WorkflowManager) ListWorkflowDefinitions

func (wm *WorkflowManager) ListWorkflowDefinitions() (*[]domain.WorkflowDefinition, error)

ListWorkflowDefinitions exposes repository list for web/API layers.

func (*WorkflowManager) NextToExecute

func (wm *WorkflowManager) NextToExecute(limit int) (*[]domain.Workflow, error)

NextToExecute exposes repository method for dashboard

func (*WorkflowManager) Overview

Overview exposes grouped counts for home dashboard

func (*WorkflowManager) SearchWorkflows

func (wm *WorkflowManager) SearchWorkflows(req models.SearchWorkflowRequest) (*[]domain.Workflow, error)

SearchWorkflows delegates to the repository to search based on request filters.

func (*WorkflowManager) StartEngine

func (wm *WorkflowManager) StartEngine(ctx context.Context, pollInterval time.Duration)

StartEngine starts polling for new workflows at the given interval

func (*WorkflowManager) TopExecuting

func (wm *WorkflowManager) TopExecuting(limit int) (*[]domain.Workflow, error)

TopExecuting exposes repository method for dashboard

func (*WorkflowManager) Wakeup

func (wm *WorkflowManager) Wakeup()

type WorkflowRepo added in v1.7.0

type WorkflowRepo interface {
	GetChildrenByParentID(parentID int64, onlyActive bool) (*[]domain.Workflow, error)
	UpdateWorkflowStatus(id int64, status string) error
	UpdateWorkflowStartingTime(id int64) error
	UpdateState(id int64, state string) error
	SaveWorkflowVariables(id int64, vars string) error
	WakeParentWorkflow(parentID int64) error
	Save(wf *domain.Workflow) (int64, error)
	FindByID(id int64) (*domain.Workflow, error)
	UpdateNextActivationSpecific(id int64, next time.Time) error
	UpdateNextActivationOffset(id int64, offset string) error
	ClearExecutorId(id int64) error
	IncrementRetryCounterAndSetNextActivation(id int64, activation time.Time) error
	FindPendingWorkflows(size int, executorGroup string) (*[]domain.Workflow, error)
	MarkWorkflowAsScheduledForExecution(id int64, executorId int64, modified time.Time) bool
	FindStuckWorkflows(minutesRepair string, executorGroup string, limit int) (*[]domain.Workflow, error)
	LockWorkflowByModified(id int64, modified time.Time) bool
	SearchWorkflows(req models.SearchWorkflowRequest) (*[]domain.Workflow, error)
	GetTopExecuting(limit int) (*[]domain.Workflow, error)
	GetNextToExecute(limit int) (*[]domain.Workflow, error)
	GetWorkflowOverview() ([]repository.WorkflowOverviewRow, error)
	GetDefinitionStateOverview(workflowType string) ([]repository.DefinitionStateRow, error)
	FindByExternalId(id string) (*domain.Workflow, error)
	SaveWorkflowVariablesAndTouch(id int64, vars string) error
}

WorkflowRepo defines the interface for workflow persistence, matching repository.WorkflowRepository.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL