scheduler

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 8, 2025 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
// Task status values, declared as untyped string constants.
// NOTE(review): presumably these are the values stored in Task.Status — confirm.
const (
	TaskStatusPending   = "pending"
	TaskStatusRunning   = "running"
	TaskStatusCompleted = "completed"
	TaskStatusFailed    = "failed"
	TaskStatusCancelled = "cancelled"
)

Variables

View Source
var (
	// ErrExecutionCancelled is a sentinel error value; compare against it
	// with errors.Is.
	// NOTE(review): presumably reported when an execution is cancelled — confirm.
	ErrExecutionCancelled = errors.New("execution cancelled")
)
View Source
var (
	// ErrPendingApproval is a sentinel error value; compare against it with
	// errors.Is.
	// NOTE(review): presumably reported while an Action with Approval set is
	// waiting to be approved — confirm.
	ErrPendingApproval = errors.New("pending approval")
)

Functions

This section is empty.

Types

type Action

// Action is one entry of a Flow's "actions" list as decoded from YAML.
type Action struct {
	// ID is required and must pass the alphanum_underscore validation rule.
	ID        string         `yaml:"id" validate:"required,alphanum_underscore"`
	// Name is required.
	Name      string         `yaml:"name" validate:"required"`
	// Executor is required and must be either "script" or "docker".
	Executor  string         `yaml:"executor" validate:"required,oneof=script docker"`
	// With is required.
	// NOTE(review): presumably executor-specific parameters — confirm.
	With      map[string]any `yaml:"with" validate:"required"`
	// Approval is optional.
	// NOTE(review): presumably marks the action as needing approval before it runs — confirm.
	Approval  bool           `yaml:"approval"`
	// Variables is an optional list of Variable maps.
	Variables []Variable     `yaml:"variables"`
	// On is an optional list of Node values.
	// NOTE(review): presumably the nodes the action targets — confirm.
	On        []Node         `yaml:"on"`
}

type AuthMethod

// AuthMethod is the string type of NodeAuth.Method.
type AuthMethod string
// Defined AuthMethod values.
const (
	AuthMethodPrivateKey AuthMethod = "private_key"
	AuthMethodPassword   AuthMethod = "password"
)

type ExecResults

type ExecResults struct {
	// contains filtered or unexported fields
}

type Flow

// Flow is the YAML-decoded description of a workflow: its metadata, inputs,
// actions, outputs and schedules.
type Flow struct {
	// Meta is read from the "metadata" key and is required.
	Meta      Metadata     `yaml:"metadata" validate:"required"`
	// Inputs is required by its validate tag.
	Inputs    []Input      `yaml:"inputs" validate:"required"`
	// Actions is required by its validate tag.
	Actions   []Action     `yaml:"actions" validate:"required"`
	// Outputs carries no validate tag.
	Outputs   []Output     `yaml:"outputs"`
	// Schedules is read from the "scheduling" key; note that the YAML key
	// differs from the field name.
	Schedules []Scheduling `yaml:"scheduling"`
}

type FlowExecutionPayload

// FlowExecutionPayload is the argument type accepted by QueueTask.
type FlowExecutionPayload struct {
	// Workflow is the Flow carried by this payload.
	Workflow          Flow
	// Input is a free-form map.
	// NOTE(review): presumably keyed by Input.Name — confirm.
	Input             map[string]any
	// NOTE(review): presumably an index into Workflow.Actions at which
	// execution starts — confirm.
	StartingActionIdx int
	ExecID            string
	NamespaceID       string
	// TriggerType has the defined values TriggerTypeManual and
	// TriggerTypeScheduled.
	TriggerType       TriggerType
	UserUUID          string
	FlowDirectory     string
}

type FlowLoaderFn

// FlowLoaderFn is the callback type accepted by WithFlowLoader and
// SetFlowLoader. It receives a flow slug and a namespace UUID and returns a
// Flow or an error.
type FlowLoaderFn func(ctx context.Context, flowSlug string, namespaceUUID string) (Flow, error)

type HookFn

// HookFn is the callback type of SchedulerDependencies.OnBeforeAction and
// SchedulerDependencies.OnAfterAction. It receives the execution ID, the
// Action and the namespace ID, and returns an error.
type HookFn func(ctx context.Context, execID string, action Action, namespaceID string) error

Hook function types for flow execution

type Input

// Input is one entry of a Flow's "inputs" list; it carries both YAML and
// JSON tags.
type Input struct {
	// Name is required and must pass the alphanum_underscore validation rule.
	Name        string    `yaml:"name" json:"name" validate:"required,alphanum_underscore"`
	// Type is required and must be one of the values listed in the oneof
	// rule, which match the InputType constants.
	Type        InputType `yaml:"type" json:"type" validate:"required,oneof=string int float bool slice_string slice_int slice_uint slice_float"`
	Label       string    `yaml:"label" json:"label"`
	Description string    `yaml:"description" json:"description"`
	// NOTE(review): presumably a validation expression applied to the
	// supplied value — confirm its syntax against the implementation.
	Validation  string    `yaml:"validation" json:"validation"`
	Required    bool      `yaml:"required" json:"required"`
	// Default is held as a string regardless of Type.
	Default     string    `yaml:"default" json:"default"`
}

type InputType

// InputType is the string type of Input.Type.
type InputType string
// Defined InputType values; they match the oneof list in Input.Type's
// validate tag.
// NOTE(review): these names use ALL_CAPS instead of Go's MixedCaps
// convention; they are exported, so renaming would break callers.
const (
	INPUT_TYPE_STRING       InputType = "string"
	INPUT_TYPE_INT          InputType = "int"
	INPUT_TYPE_FLOAT        InputType = "float"
	INPUT_TYPE_BOOL         InputType = "bool"
	INPUT_TYPE_SLICE_STRING InputType = "slice_string"
	INPUT_TYPE_SLICE_INT    InputType = "slice_int"
	INPUT_TYPE_SLICE_UINT   InputType = "slice_uint"
	INPUT_TYPE_SLICE_FLOAT  InputType = "slice_float"
)

type Metadata

// Metadata is the "metadata" section of a Flow.
type Metadata struct {
	// ID is required and must pass the alphanum_underscore validation rule.
	ID          string `yaml:"id" validate:"required,alphanum_underscore"`
	// DBID is excluded from YAML (tag "-").
	DBID        int32  `yaml:"-"`
	// Name is required.
	Name        string `yaml:"name" validate:"required"`
	Description string `yaml:"description"`
	// SrcDir is excluded from YAML (tag "-").
	SrcDir      string `yaml:"-"`
	Namespace   string `yaml:"namespace"`
}

type Node

// Node describes a remote node; its CheckConnectivity method reports whether
// the node is accessible.
type Node struct {
	ID             string
	Name           string
	// NOTE(review): presumably Hostname and Port form the address probed by
	// CheckConnectivity — confirm.
	Hostname       string
	Port           int
	Username       string
	OSFamily       string
	ConnectionType string
	Tags           []string
	// Auth holds the node's authentication settings.
	Auth           NodeAuth
}

func (*Node) CheckConnectivity

func (n *Node) CheckConnectivity() error

CheckConnectivity can be used to check if a remote node is accessible at the given IP:Port. The default connection timeout is 5 seconds. A non-nil error is returned if the node is not accessible.

type NodeAuth

// NodeAuth holds the authentication settings of a Node.
type NodeAuth struct {
	CredentialID string
	// Method has the defined values AuthMethodPrivateKey and
	// AuthMethodPassword.
	Method       AuthMethod
	// NOTE(review): presumably the private key or password, depending on
	// Method — confirm.
	Key          string
}

type Output

// Output is a free-form map; Flow.Outputs holds a list of them under the
// "outputs" YAML key.
type Output map[string]any

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler implements TaskScheduler

func (*Scheduler) CancelTask

func (s *Scheduler) CancelTask(ctx context.Context, execID string) error

CancelTask cancels a running or pending execution

func (*Scheduler) QueueTask

func (s *Scheduler) QueueTask(ctx context.Context, payload FlowExecutionPayload) (string, error)

QueueTask queues an immediate task for execution

func (*Scheduler) SetFlowLoader

func (s *Scheduler) SetFlowLoader(fl FlowLoaderFn)

SetFlowLoader allows updating flow loader after build

func (*Scheduler) SetSecretsProvider

func (s *Scheduler) SetSecretsProvider(sp SecretsProviderFn)

SetSecretsProvider allows updating secrets provider after build

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

Start begins the scheduler's task processing loops

func (*Scheduler) Stop

func (s *Scheduler) Stop(ctx context.Context) error

Stop shuts down the scheduler

type SchedulerBuilder

type SchedulerBuilder struct {
	// contains filtered or unexported fields
}

SchedulerBuilder provides a fluent interface for building schedulers

func NewSchedulerBuilder

func NewSchedulerBuilder(logger *slog.Logger) *SchedulerBuilder

NewSchedulerBuilder creates a new scheduler builder

func (*SchedulerBuilder) Build

func (b *SchedulerBuilder) Build() (*Scheduler, error)

Build creates the scheduler instance

func (*SchedulerBuilder) WithCronSyncInterval

func (b *SchedulerBuilder) WithCronSyncInterval(s time.Duration) *SchedulerBuilder

func (*SchedulerBuilder) WithFlowLoader

func (b *SchedulerBuilder) WithFlowLoader(fl FlowLoaderFn) *SchedulerBuilder

WithFlowLoader sets the flow loader

func (*SchedulerBuilder) WithJobStore

func (b *SchedulerBuilder) WithJobStore(jobStore storage.Storage) *SchedulerBuilder

WithJobStore sets the job store

func (*SchedulerBuilder) WithLogManager

WithLogManager sets the log manager

func (*SchedulerBuilder) WithMetrics

func (b *SchedulerBuilder) WithMetrics(m *metrics.Manager) *SchedulerBuilder

func (*SchedulerBuilder) WithSecretsProvider

func (b *SchedulerBuilder) WithSecretsProvider(sp SecretsProviderFn) *SchedulerBuilder

WithSecretsProvider sets the secrets provider

func (*SchedulerBuilder) WithStore

func (b *SchedulerBuilder) WithStore(store repo.Store) *SchedulerBuilder

WithStore sets the store

func (*SchedulerBuilder) WithWorkerCount

func (b *SchedulerBuilder) WithWorkerCount(c int) *SchedulerBuilder

type SchedulerDependencies

// SchedulerDependencies contains dependencies needed by the scheduler.
type SchedulerDependencies struct {
	// OnBeforeAction and OnAfterAction are HookFn callbacks.
	// NOTE(review): presumably invoked around each action — confirm.
	OnBeforeAction  HookFn
	OnAfterAction   HookFn
	// SecretsProvider is the SecretsProviderFn callback.
	SecretsProvider SecretsProviderFn
	// FlowLoader is the FlowLoaderFn callback.
	FlowLoader      FlowLoaderFn
	// LogManager is deliberately untyped; per the trailing annotation a
	// streamlogger.LogManager is expected. `any` is the same type as
	// `interface{}` and matches the `map[string]any` spelling used elsewhere.
	LogManager      any // streamlogger.LogManager
}

SchedulerDependencies contains dependencies needed by the scheduler

type Scheduling

// Scheduling is one entry of a Flow's "scheduling" list; it carries both
// YAML and JSON tags.
type Scheduling struct {
	// NOTE(review): presumably a cron expression — confirm the accepted syntax.
	Cron     string `yaml:"cron" json:"cron"`
	// NOTE(review): presumably a timezone name used to interpret Cron — confirm.
	Timezone string `yaml:"timezone" json:"timezone"`
}

type SecretsProviderFn

// SecretsProviderFn is the callback type accepted by WithSecretsProvider and
// SetSecretsProvider. It receives a flow ID and a namespace ID and returns a
// string-to-string map or an error.
type SecretsProviderFn func(ctx context.Context, flowID string, namespaceID string) (map[string]string, error)

type Task

// Task is a record with an identifier, an execution ID, a raw payload, a
// status and a creation time.
type Task struct {
	UUID      string
	ExecID    string
	// NOTE(review): presumably a serialized FlowExecutionPayload — confirm.
	Payload   []byte
	// NOTE(review): presumably one of the TaskStatus* constants — confirm.
	Status    string
	CreatedAt time.Time
}

type TaskScheduler

// TaskScheduler is the scheduling interface; Scheduler implements it.
type TaskScheduler interface {
	// QueueTask queues an immediate task for execution.
	// NOTE(review): the returned string is presumably the execution ID — confirm.
	QueueTask(ctx context.Context, payload FlowExecutionPayload) (string, error)
	// CancelTask cancels a running or pending execution.
	CancelTask(ctx context.Context, execID string) error
	// Start begins the scheduler's task processing loops.
	Start(ctx context.Context) error
	// Stop shuts down the scheduler.
	Stop(ctx context.Context) error
}

type TriggerType

// TriggerType is the string type of FlowExecutionPayload.TriggerType.
type TriggerType string
// Defined TriggerType values.
const (
	TriggerTypeManual    TriggerType = "manual"
	TriggerTypeScheduled TriggerType = "scheduled"
)

type Variable

// Variable is a free-form map exposing Name, Value and Valid accessor
// methods; Action.Variables holds a list of them.
type Variable map[string]any

func (Variable) Name

func (v Variable) Name() string

func (Variable) Valid

func (v Variable) Valid() bool

func (Variable) Value

func (v Variable) Value() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL