Documentation
¶
Index ¶
- Constants
- func ParsePostgresInterval(interval string) (time.Duration, error)
- type DefinitionStateRow
- type ExecutorRepository
- type UserRepository
- func (r *UserRepository) ClearSessionBySessionID(sessionID string) error
- func (r *UserRepository) FindAll() (*[]domain.User, error)
- func (r *UserRepository) FindByApiKey(apiKey string) (*domain.User, error)
- func (r *UserRepository) FindBySessionID(sessionID string, now time.Time) (*domain.User, error)
- func (r *UserRepository) FindByUsername(username string) (*domain.User, error)
- func (r *UserRepository) Save(u *domain.User) (int64, error)
- func (r *UserRepository) UpdateSession(userID int64, sessionID string, expiry time.Time) error
- type WorkflowActionRepository
- type WorkflowDefinitionRepository
- type WorkflowOverviewRow
- type WorkflowRepository
- func (r *WorkflowRepository) ClearExecutorId(id int64) error
- func (r *WorkflowRepository) FindByExternalId(id string) (*domain.Workflow, error)
- func (r *WorkflowRepository) FindByID(id int64) (*domain.Workflow, error)
- func (r *WorkflowRepository) FindPendingWorkflows(size int, executorGroup string) (*[]domain.Workflow, error)
- func (r *WorkflowRepository) FindStuckWorkflows(minutesRepair string, executorGroup string, limit int) (*[]domain.Workflow, error)
- func (r *WorkflowRepository) GetDefinitionStateOverview(workflowType string) ([]DefinitionStateRow, error)
- func (r *WorkflowRepository) GetNextToExecute(limit int) (*[]domain.Workflow, error)
- func (r *WorkflowRepository) GetTopExecuting(limit int) (*[]domain.Workflow, error)
- func (r *WorkflowRepository) GetWorkflowOverview() ([]WorkflowOverviewRow, error)
- func (r *WorkflowRepository) IncrementRetryCounterAndSetNextActivation(id int64, activation time.Time) error
- func (r *WorkflowRepository) LockWorkflowByModified(id int64, modified time.Time) bool
- func (r *WorkflowRepository) MarkWorkflowAsScheduledForExecution(id int64, executorId int64, modified time.Time) bool
- func (r *WorkflowRepository) Save(wf *domain.Workflow) (int64, error)
- func (r *WorkflowRepository) SaveWorkflowVariables(id int64, vars string) error
- func (r *WorkflowRepository) SaveWorkflowVariablesAndTouch(id int64, vars string) error
- func (r *WorkflowRepository) SearchWorkflows(req models.SearchWorkflowRequest) (*[]domain.Workflow, error)
- func (r *WorkflowRepository) UpdateNextActivationOffset(id int64, offset string) error
- func (r *WorkflowRepository) UpdateNextActivationSpecific(id int64, next time.Time) error
- func (r *WorkflowRepository) UpdateState(id int64, state string) error
- func (r *WorkflowRepository) UpdateWorkflowStartingTime(id int64) error
- func (r *WorkflowRepository) UpdateWorkflowStatus(id int64, status string) error
Constants ¶
const ALL_COLUMNS = `` /* 195-byte string literal not displayed */
Variables ¶
This section is empty.
Functions ¶
func ParsePostgresInterval ¶
func ParsePostgresInterval(interval string) (time.Duration, error)
Types ¶
type DefinitionStateRow ¶
type DefinitionStateRow struct {
	State           string
	NewCount        int
	ScheduledCount  int
	ExecutingCount  int
	InProgressCount int
	FinishedCount   int
}
DefinitionStateRow holds counts by state for a workflow type
type ExecutorRepository ¶
type ExecutorRepository struct {
// contains filtered or unexported fields
}
ExecutorRepository provides persistence for executors table.
func NewExecutorRepository ¶
func NewExecutorRepository(db *sql.DB) *ExecutorRepository
func (*ExecutorRepository) GetExecutorsByLastActive ¶
func (r *ExecutorRepository) GetExecutorsByLastActive(limit int) ([]*domain.Executor, error)
func (*ExecutorRepository) Save ¶
func (r *ExecutorRepository) Save(e *domain.Executor) (int64, error)
func (*ExecutorRepository) UpdateLastActive ¶
func (r *ExecutorRepository) UpdateLastActive(id int64, ts time.Time) error
UpdateLastActive sets last_active for the executor id to the provided timestamp.
type UserRepository ¶
type UserRepository struct {
// contains filtered or unexported fields
}
UserRepository provides persistence methods for the users table.
func NewUserRepository ¶
func NewUserRepository(db *sql.DB) *UserRepository
func (*UserRepository) ClearSessionBySessionID ¶
func (r *UserRepository) ClearSessionBySessionID(sessionID string) error
ClearSessionBySessionID sets session_id and sessionExpiry to NULL for the user whose current session_id matches the given sessionID.
func (*UserRepository) FindAll ¶
func (r *UserRepository) FindAll() (*[]domain.User, error)
FindAll returns all users ordered by id ascending.
func (*UserRepository) FindByApiKey ¶
func (r *UserRepository) FindByApiKey(apiKey string) (*domain.User, error)
FindByApiKey fetches a user by api_key (exact match). Returns (nil, nil) if not found.
func (*UserRepository) FindBySessionID ¶
func (r *UserRepository) FindBySessionID(sessionID string, now time.Time) (*domain.User, error)
FindBySessionID fetches a user by session_id and ensures sessionExpiry is in the future.
func (*UserRepository) FindByUsername ¶
func (r *UserRepository) FindByUsername(username string) (*domain.User, error)
FindByUsername fetches a user by exact username. Returns (nil, nil) if not found.
func (*UserRepository) Save ¶
func (r *UserRepository) Save(u *domain.User) (int64, error)
Save inserts a new user and returns its generated id. It will set Created to now if it's not provided (null or zero).
func (*UserRepository) UpdateSession ¶
func (r *UserRepository) UpdateSession(userID int64, sessionID string, expiry time.Time) error
UpdateSession sets session_id and sessionExpiry for a user by id.
type WorkflowActionRepository ¶
type WorkflowActionRepository struct {
// contains filtered or unexported fields
}
WorkflowActionRepository provides methods to persist and query workflow action records.
func NewWorkflowActionRepository ¶
func NewWorkflowActionRepository(db *sql.DB) *WorkflowActionRepository
func (*WorkflowActionRepository) FindAllByWorkflowID ¶
func (r *WorkflowActionRepository) FindAllByWorkflowID(workflowID int64) (*[]domain.WorkflowAction, error)
FindAllByWorkflowID returns all actions for a specific workflow ordered by date_time ascending.
func (*WorkflowActionRepository) FindByID ¶
func (r *WorkflowActionRepository) FindByID(id int64) (*domain.WorkflowAction, error)
FindByID fetches a single workflow action by its ID.
func (*WorkflowActionRepository) Save ¶
func (r *WorkflowActionRepository) Save(a *domain.WorkflowAction) (int64, error)
Save inserts a new workflow action and returns its ID. It expects the following table schema (PostgreSQL):
workflow_actions(id BIGSERIAL PK, workflow_id BIGINT, executor_id BIGINT, execution_count INT, retry_count INT, type TEXT, name TEXT, text TEXT, date_time TIMESTAMP)
type WorkflowDefinitionRepository ¶
type WorkflowDefinitionRepository struct {
// contains filtered or unexported fields
}
func NewWorkflowDefinitionRepository ¶
func NewWorkflowDefinitionRepository(db *sql.DB) *WorkflowDefinitionRepository
func (*WorkflowDefinitionRepository) FindAll ¶
func (r *WorkflowDefinitionRepository) FindAll() (*[]domain.WorkflowDefinition, error)
FindAll returns all workflow definitions.
func (*WorkflowDefinitionRepository) FindByName ¶
func (r *WorkflowDefinitionRepository) FindByName(name string) (*domain.WorkflowDefinition, error)
FindByName fetches a workflow definition by its unique name.
func (*WorkflowDefinitionRepository) Save ¶
func (r *WorkflowDefinitionRepository) Save(def *domain.WorkflowDefinition) error
Save inserts a new workflow definition or updates an existing one by name. Returns nil on success or an error.
type WorkflowOverviewRow ¶
type WorkflowOverviewRow struct {
	ExecutorGroup   string
	WorkflowType    string
	NewCount        int
	ScheduledCount  int
	ExecutingCount  int
	FinishedCount   int
	InProgressCount int
}
WorkflowOverviewRow holds grouped counts by executor_group and workflow_type
type WorkflowRepository ¶
type WorkflowRepository struct {
// contains filtered or unexported fields
}
func NewWorkflowRepository ¶
func NewWorkflowRepository(db *sql.DB) *WorkflowRepository
func (*WorkflowRepository) ClearExecutorId ¶
func (r *WorkflowRepository) ClearExecutorId(id int64) error
func (*WorkflowRepository) FindByExternalId ¶
func (r *WorkflowRepository) FindByExternalId(id string) (*domain.Workflow, error)
func (*WorkflowRepository) FindByID ¶
func (r *WorkflowRepository) FindByID(id int64) (*domain.Workflow, error)
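func (*WorkflowRepository) FindPendingWorkflows ¶
func (r *WorkflowRepository) FindPendingWorkflows(size int, executorGroup string) (*[]domain.Workflow, error)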
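func (*WorkflowRepository) FindStuckWorkflows ¶
func (r *WorkflowRepository) FindStuckWorkflows(minutesRepair string, executorGroup string, limit int) (*[]domain.Workflow, error)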
func (*WorkflowRepository) GetDefinitionStateOverview ¶
func (r *WorkflowRepository) GetDefinitionStateOverview(workflowType string) ([]DefinitionStateRow, error)
GetDefinitionStateOverview returns counts by state for a given workflow type
func (*WorkflowRepository) GetNextToExecute ¶
func (r *WorkflowRepository) GetNextToExecute(limit int) (*[]domain.Workflow, error)
GetNextToExecute returns upcoming workflows with status NEW or IN_PROGRESS ordered by next_activation asc
func (*WorkflowRepository) GetTopExecuting ¶
func (r *WorkflowRepository) GetTopExecuting(limit int) (*[]domain.Workflow, error)
GetTopExecuting returns workflows currently executing ordered by modified desc
func (*WorkflowRepository) GetWorkflowOverview ¶
func (r *WorkflowRepository) GetWorkflowOverview() ([]WorkflowOverviewRow, error)
GetWorkflowOverview returns aggregated counts grouped by executor_group and workflow_type
func (*WorkflowRepository) IncrementRetryCounterAndSetNextActivation ¶
func (r *WorkflowRepository) IncrementRetryCounterAndSetNextActivation(id int64, activation time.Time) error
func (*WorkflowRepository) LockWorkflowByModified ¶ added in v1.0.13
func (r *WorkflowRepository) LockWorkflowByModified(id int64, modified time.Time) bool
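func (*WorkflowRepository) MarkWorkflowAsScheduledForExecution ¶
func (r *WorkflowRepository) MarkWorkflowAsScheduledForExecution(id int64, executorId int64, modified time.Time) bool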
func (*WorkflowRepository) Save ¶
func (r *WorkflowRepository) Save(wf *domain.Workflow) (int64, error)
func (*WorkflowRepository) SaveWorkflowVariables ¶
func (r *WorkflowRepository) SaveWorkflowVariables(id int64, vars string) error
func (*WorkflowRepository) SaveWorkflowVariablesAndTouch ¶
func (r *WorkflowRepository) SaveWorkflowVariablesAndTouch(id int64, vars string) error
SaveWorkflowVariablesAndTouch updates state_vars and touches modified timestamp.
func (*WorkflowRepository) SearchWorkflows ¶
func (r *WorkflowRepository) SearchWorkflows(req models.SearchWorkflowRequest) (*[]domain.Workflow, error)
func (*WorkflowRepository) UpdateNextActivationOffset ¶
func (r *WorkflowRepository) UpdateNextActivationOffset(id int64, offset string) error
func (*WorkflowRepository) UpdateNextActivationSpecific ¶
func (r *WorkflowRepository) UpdateNextActivationSpecific(id int64, next time.Time) error
func (*WorkflowRepository) UpdateState ¶
func (r *WorkflowRepository) UpdateState(id int64, state string) error
func (*WorkflowRepository) UpdateWorkflowStartingTime ¶
func (r *WorkflowRepository) UpdateWorkflowStartingTime(id int64) error
func (*WorkflowRepository) UpdateWorkflowStatus ¶
func (r *WorkflowRepository) UpdateWorkflowStatus(id int64, status string) error