repository

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const ALL_COLUMNS = `` /* 195-byte string literal not displayed */

Variables

This section is empty.

Functions

func ParsePostgresInterval

func ParsePostgresInterval(interval string) (time.Duration, error)

ParsePostgresInterval converts a PostgreSQL interval string to time.Duration

Types

type DefinitionStateRow

type DefinitionStateRow struct {
	State           string
	NewCount        int
	ScheduledCount  int
	ExecutingCount  int
	InProgressCount int
	FinishedCount   int
}

DefinitionStateRow holds counts by state for a workflow type

type ExecutorRepository

type ExecutorRepository struct {
	// contains filtered or unexported fields
}

ExecutorRepository provides persistence for executors table.

func NewExecutorRepository

func NewExecutorRepository(db *sql.DB) *ExecutorRepository

func (*ExecutorRepository) GetExecutorsByLastActive

func (r *ExecutorRepository) GetExecutorsByLastActive(limit int) ([]*domain.Executor, error)

func (*ExecutorRepository) Save

func (r *ExecutorRepository) Save(e *domain.Executor) (int64, error)

func (*ExecutorRepository) UpdateLastActive

func (r *ExecutorRepository) UpdateLastActive(id int64, ts time.Time) error

UpdateLastActive sets last_active for the executor id to the provided timestamp.

type UserRepository

type UserRepository struct {
	// contains filtered or unexported fields
}

UserRepository provides persistence methods for the users table.

func NewUserRepository

func NewUserRepository(db *sql.DB) *UserRepository

func (*UserRepository) ClearSessionBySessionID

func (r *UserRepository) ClearSessionBySessionID(sessionID string) error

ClearSessionBySessionID nulls session_id and sessionExpiry for the user with the given current session_id.

func (*UserRepository) FindAll

func (r *UserRepository) FindAll() (*[]domain.User, error)

FindAll returns all users ordered by id ascending.

func (*UserRepository) FindByApiKey

func (r *UserRepository) FindByApiKey(apiKey string) (*domain.User, error)

FindByApiKey fetches a user by api_key (exact match). Returns (nil, nil) if not found.

func (*UserRepository) FindBySessionID

func (r *UserRepository) FindBySessionID(sessionID string, now time.Time) (*domain.User, error)

FindBySessionID fetches a user by session_id and ensures sessionExpiry is in the future.

func (*UserRepository) FindByUsername

func (r *UserRepository) FindByUsername(username string) (*domain.User, error)

FindByUsername fetches a user by exact username. Returns (nil, nil) if not found.

func (*UserRepository) Save

func (r *UserRepository) Save(u *domain.User) (int64, error)

Save inserts a new user and returns its generated id. It will set Created to now if it's not provided (null or zero).

func (*UserRepository) UpdateSession

func (r *UserRepository) UpdateSession(userID int64, sessionID string, expiry time.Time) error

UpdateSession sets session_id and sessionExpiry for a user by id.

type WorkflowActionRepository

type WorkflowActionRepository struct {
	// contains filtered or unexported fields
}

WorkflowActionRepository provides methods to persist and query workflow action records.

func NewWorkflowActionRepository

func NewWorkflowActionRepository(db *sql.DB) *WorkflowActionRepository

func (*WorkflowActionRepository) FindAllByWorkflowID

func (r *WorkflowActionRepository) FindAllByWorkflowID(workflowID int64) (*[]domain.WorkflowAction, error)

FindAllByWorkflowID returns all actions for a specific workflow ordered by date_time ascending.

func (*WorkflowActionRepository) FindByID

FindByID fetches a single workflow action by its ID.

func (*WorkflowActionRepository) Save

Save inserts a new workflow action and returns its ID. It expects the following table schema (PostgreSQL):

workflow_actions(id BIGSERIAL PK, workflow_id BIGINT, executor_id BIGINT, execution_count INT,
                retry_count INT, type TEXT, name TEXT, text TEXT, date_time TIMESTAMP)

type WorkflowDefinitionRepository

type WorkflowDefinitionRepository struct {
	// contains filtered or unexported fields
}

func NewWorkflowDefinitionRepository

func NewWorkflowDefinitionRepository(db *sql.DB) *WorkflowDefinitionRepository

func (*WorkflowDefinitionRepository) FindAll

FindAll returns all workflow definitions.

func (*WorkflowDefinitionRepository) FindByName

FindByName fetches a workflow definition by its unique name.

func (*WorkflowDefinitionRepository) Save

Save inserts a new workflow definition or updates an existing one by name. Returns nil on success or an error.

type WorkflowOverviewRow

type WorkflowOverviewRow struct {
	ExecutorGroup   string
	WorkflowType    string
	NewCount        int
	ScheduledCount  int
	ExecutingCount  int
	FinishedCount   int
	InProgressCount int
}

WorkflowOverviewRow holds grouped counts by executor_group and workflow_type

type WorkflowRepository

type WorkflowRepository struct {
	// contains filtered or unexported fields
}

func NewWorkflowRepository

func NewWorkflowRepository(db *sql.DB) *WorkflowRepository

func (*WorkflowRepository) ClearExecutorId

func (r *WorkflowRepository) ClearExecutorId(id int64) error

func (*WorkflowRepository) FindByExternalId

func (r *WorkflowRepository) FindByExternalId(id string) (*domain.Workflow, error)

func (*WorkflowRepository) FindByID

func (r *WorkflowRepository) FindByID(id int64) (*domain.Workflow, error)

func (*WorkflowRepository) FindPendingWorkflows

func (r *WorkflowRepository) FindPendingWorkflows(size int, executorGroup string) (*[]domain.Workflow, error)

func (*WorkflowRepository) FindStuckWorkflows

func (r *WorkflowRepository) FindStuckWorkflows(minutesRepair string, executorGroup string, limit int) (*[]domain.Workflow, error)

func (*WorkflowRepository) GetDefinitionStateOverview

func (r *WorkflowRepository) GetDefinitionStateOverview(workflowType string) ([]DefinitionStateRow, error)

GetDefinitionStateOverview returns counts by state for a given workflow type

func (*WorkflowRepository) GetNextToExecute

func (r *WorkflowRepository) GetNextToExecute(limit int) (*[]domain.Workflow, error)

GetNextToExecute returns upcoming workflows with status NEW or IN_PROGRESS ordered by next_activation asc

func (*WorkflowRepository) GetTopExecuting

func (r *WorkflowRepository) GetTopExecuting(limit int) (*[]domain.Workflow, error)

GetTopExecuting returns workflows currently executing ordered by modified desc

func (*WorkflowRepository) GetWorkflowOverview

func (r *WorkflowRepository) GetWorkflowOverview() ([]WorkflowOverviewRow, error)

GetWorkflowOverview returns aggregated counts grouped by executor_group and workflow_type

func (*WorkflowRepository) IncrementRetryCounterAndSetNextActivation

func (r *WorkflowRepository) IncrementRetryCounterAndSetNextActivation(id int64, activation time.Time) error

func (*WorkflowRepository) LockWorkflowByModified added in v1.0.13

func (r *WorkflowRepository) LockWorkflowByModified(id int64, modified time.Time) bool

func (*WorkflowRepository) MarkWorkflowAsScheduledForExecution

func (r *WorkflowRepository) MarkWorkflowAsScheduledForExecution(id int64, executorId int64, modified time.Time) bool

func (*WorkflowRepository) Save

func (r *WorkflowRepository) Save(wf *domain.Workflow) (int64, error)

func (*WorkflowRepository) SaveWorkflowVariables

func (r *WorkflowRepository) SaveWorkflowVariables(id int64, vars string) error

func (*WorkflowRepository) SaveWorkflowVariablesAndTouch

func (r *WorkflowRepository) SaveWorkflowVariablesAndTouch(id int64, vars string) error

SaveWorkflowVariablesAndTouch updates state_vars and touches modified timestamp.

func (*WorkflowRepository) SearchWorkflows

func (r *WorkflowRepository) SearchWorkflows(req models.SearchWorkflowRequest) (*[]domain.Workflow, error)

func (*WorkflowRepository) UpdateNextActivationOffset

func (r *WorkflowRepository) UpdateNextActivationOffset(id int64, offset string) error

func (*WorkflowRepository) UpdateNextActivationSpecific

func (r *WorkflowRepository) UpdateNextActivationSpecific(id int64, next time.Time) error

func (*WorkflowRepository) UpdateState

func (r *WorkflowRepository) UpdateState(id int64, state string) error

func (*WorkflowRepository) UpdateWorkflowStartingTime

func (r *WorkflowRepository) UpdateWorkflowStartingTime(id int64) error

func (*WorkflowRepository) UpdateWorkflowStatus

func (r *WorkflowRepository) UpdateWorkflowStatus(id int64, status string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL