scheduler

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2025 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TaskTicker     = 2 * time.Second
	PeriodicTicker = 1 * time.Minute
)
View Source
const (
	TaskStatusPending   = "pending"
	TaskStatusRunning   = "running"
	TaskStatusCompleted = "completed"
	TaskStatusFailed    = "failed"
	TaskStatusCancelled = "cancelled"
)
View Source
const NodeConnectionTimeout = 5 * time.Second

Variables

View Source
var (
	ErrPendingApproval    = errors.New("pending approval")
	ErrExecutionCancelled = errors.New("execution cancelled")
)

Functions

This section is empty.

Types

type Action

type Action struct {
	ID        string         `yaml:"id" validate:"required,alphanum_underscore"`
	Name      string         `yaml:"name" validate:"required"`
	Executor  string         `yaml:"executor" validate:"required,oneof=script docker"`
	With      map[string]any `yaml:"with" validate:"required"`
	Approval  bool           `yaml:"approval"`
	Variables []Variable     `yaml:"variables"`
	On        []Node         `yaml:"on"`
}

type AuthMethod

type AuthMethod string
const (
	AuthMethodPrivateKey AuthMethod = "private_key"
	AuthMethodPassword   AuthMethod = "password"
)

type ExecResults

type ExecResults struct {
	// contains filtered or unexported fields
}

type Flow

type Flow struct {
	Meta      Metadata     `yaml:"metadata" validate:"required"`
	Inputs    []Input      `yaml:"inputs" validate:"required"`
	Actions   []Action     `yaml:"actions" validate:"required"`
	Outputs   []Output     `yaml:"outputs"`
	Schedules []Scheduling `yaml:"scheduling"`
	Notify    []Notify     `yaml:"notify"`
}

type FlowExecutionHandler added in v0.2.0

type FlowExecutionHandler struct {
	// contains filtered or unexported fields
}

FlowExecutionHandler handles flow execution jobs

func NewFlowExecutionHandler added in v0.2.0

func NewFlowExecutionHandler(cfg FlowHandlerConfig) *FlowExecutionHandler

NewFlowExecutionHandler creates a new flow execution handler

func (*FlowExecutionHandler) Handle added in v0.2.0

func (h *FlowExecutionHandler) Handle(ctx context.Context, job Job) error

Handle processes a flow execution job

func (*FlowExecutionHandler) SetSecretsProvider added in v0.2.0

func (h *FlowExecutionHandler) SetSecretsProvider(sp SecretsProviderFn)

SetSecretsProvider allows updating secrets provider after creation

func (*FlowExecutionHandler) SetTaskQueuer added in v0.2.0

func (h *FlowExecutionHandler) SetTaskQueuer(tq TaskQueuer)

SetTaskQueuer allows setting the task queuer after creation

func (*FlowExecutionHandler) Type added in v0.2.0

Type returns the payload type this handler processes

type FlowExecutionPayload

type FlowExecutionPayload struct {
	Workflow          Flow
	Input             map[string]any
	StartingActionIdx int
	NamespaceID       string
	TriggerType       TriggerType
	UserUUID          string
	FlowDirectory     string
}

type FlowHandlerConfig added in v0.2.0

type FlowHandlerConfig struct {
	Store                repo.Store
	SecretsProvider      SecretsProviderFn
	LogManager           streamlogger.LogManager
	Logger               *slog.Logger
	Metrics              *metrics.Manager
	FlowExecutionTimeout time.Duration
}

FlowHandlerConfig holds configuration for FlowExecutionHandler

type FlowLoaderFn

type FlowLoaderFn func(ctx context.Context, flowSlug string, namespaceUUID string) (Flow, error)

type Handler added in v0.2.0

type Handler interface {
	// Type returns the payload type this handler processes
	Type() PayloadType
	// Handle processes a job
	Handle(ctx context.Context, job Job) error
}

Handler processes jobs of a specific payload type

type HookFn

type HookFn func(ctx context.Context, execID string, action Action, namespaceID string) error

Hook function types for flow execution

type Input

type Input struct {
	Name        string    `yaml:"name" json:"name" validate:"required,alphanum_underscore"`
	Type        InputType `yaml:"type" json:"type" validate:"required,oneof=string int float bool slice_string slice_int slice_uint slice_float"`
	Label       string    `yaml:"label" json:"label"`
	Description string    `yaml:"description" json:"description"`
	Validation  string    `yaml:"validation" json:"validation"`
	Required    bool      `yaml:"required" json:"required"`
	Default     string    `yaml:"default" json:"default"`
}

type InputType

type InputType string
const (
	INPUT_TYPE_STRING       InputType = "string"
	INPUT_TYPE_INT          InputType = "int"
	INPUT_TYPE_FLOAT        InputType = "float"
	INPUT_TYPE_BOOL         InputType = "bool"
	INPUT_TYPE_SLICE_STRING InputType = "slice_string"
	INPUT_TYPE_SLICE_INT    InputType = "slice_int"
	INPUT_TYPE_SLICE_UINT   InputType = "slice_uint"
	INPUT_TYPE_SLICE_FLOAT  InputType = "slice_float"
)

type Job added in v0.2.0

type Job struct {
	ID          int64
	ExecID      string
	PayloadType PayloadType
	Payload     []byte
	CreatedAt   time.Time
}

Job represents a job passed to handlers

type JobSyncerFn added in v0.2.0

type JobSyncerFn func(ctx context.Context) ([]ScheduledJob, error)

JobSyncerFn syncs scheduled jobs from a data source

type Metadata

type Metadata struct {
	ID          string `yaml:"id" validate:"required,alphanum_underscore"`
	DBID        int32  `yaml:"-"`
	Name        string `yaml:"name" validate:"required"`
	Description string `yaml:"description"`
	SrcDir      string `yaml:"-"`
	Namespace   string `yaml:"namespace"`
}

type Node

type Node struct {
	ID             string
	Name           string
	Hostname       string
	Port           int
	Username       string
	OSFamily       string
	ConnectionType string
	Tags           []string
	Auth           NodeAuth
}

func (*Node) CheckConnectivity

func (n *Node) CheckConnectivity() error

CheckConnectivity can be used to check if a remote node is accessible at the given IP:Port The default connection timeout is 5 seconds Non-nil error is returned if the node is not accessible

type NodeAuth

type NodeAuth struct {
	CredentialID string
	Method       AuthMethod
	Key          string
}

type NotificationHandler added in v0.2.0

type NotificationHandler struct {
	// contains filtered or unexported fields
}

NotificationHandler processes notification jobs

func NewNotificationHandler added in v0.2.0

func NewNotificationHandler(messengers map[string]messengers.Messenger, store repo.Store, logger *slog.Logger, rootURL string) (*NotificationHandler, error)

func (*NotificationHandler) Handle added in v0.2.0

func (h *NotificationHandler) Handle(ctx context.Context, job Job) error

func (*NotificationHandler) Type added in v0.2.0

func (h *NotificationHandler) Type() PayloadType

type NotificationPayload added in v0.2.0

type NotificationPayload struct {
	FlowID      string   `json:"flow_id"`
	FlowName    string   `json:"flow_name"`
	ExecID      string   `json:"exec_id"`
	Status      string   `json:"status"`
	Error       string   `json:"error,omitempty"`
	Receivers   []string `json:"receivers"`
	NamespaceID string   `json:"namespace_id"`
	Channel     string   `json:"channel"`
}

type Notify added in v0.2.0

type Notify struct {
	Channel   string        `yaml:"channel" json:"channel"`
	Receivers []string      `yaml:"receivers" json:"receivers"`
	Events    []NotifyEvent `yaml:"events" json:"events"`
}

type NotifyEvent added in v0.2.0

type NotifyEvent string
const (
	NotifyEventOnSuccess   NotifyEvent = "on_success"
	NotifyEventOnFailure   NotifyEvent = "on_failure"
	NotifyEventOnWaiting   NotifyEvent = "on_waiting"
	NotifyEventOnCancelled NotifyEvent = "on_cancelled"
)

type Output

type Output map[string]any

type PayloadType added in v0.2.0

type PayloadType string

PayloadType identifies different types of jobs in the queue

const PayloadTypeFlowExecution PayloadType = "flow_execution"
const PayloadTypeNotification PayloadType = "notification"

type QueueConfig added in v0.2.0

type QueueConfig struct {
	Queues []QueueWeight
}

QueueConfig holds the weighted queue configuration

func (QueueConfig) GetWorkerCount added in v0.2.0

func (c QueueConfig) GetWorkerCount(pt PayloadType, totalWorkers int) int

GetWorkerCount calculates the number of goroutines for a payload type

func (QueueConfig) Validate added in v0.2.0

func (c QueueConfig) Validate() error

Validate ensures queue weights sum to 100

type QueueWeight added in v0.2.0

type QueueWeight struct {
	PayloadType PayloadType
	Weight      int // 1-100, all weights must sum to 100
}

QueueWeight defines weight for a payload type

type ScheduledJob added in v0.2.0

type ScheduledJob struct {
	ID          string
	Name        string
	Cron        string
	Timezone    string
	PayloadType PayloadType
	Payload     any
}

ScheduledJob represents a job that can be scheduled via cron

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler implements TaskScheduler

func (*Scheduler) CancelTask

func (s *Scheduler) CancelTask(ctx context.Context, execID string) error

CancelTask cancels a running or pending execution

func (*Scheduler) QueueTask

func (s *Scheduler) QueueTask(ctx context.Context, payloadType PayloadType, execID string, payload any) (string, error)

QueueTask queues a task for execution with specified payload type

func (*Scheduler) SetHandler added in v0.2.0

func (s *Scheduler) SetHandler(h Handler) error

SetHandler registers a handler for a payload type

func (*Scheduler) SetJobSyncer added in v0.2.0

func (s *Scheduler) SetJobSyncer(syncer JobSyncerFn)

SetJobSyncer sets the job syncer for cron-based scheduling

func (*Scheduler) SetQueueConfig added in v0.2.0

func (s *Scheduler) SetQueueConfig(cfg QueueConfig) error

SetQueueConfig sets the queue configuration

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

Start begins the scheduler's task processing loops

func (*Scheduler) Stop

func (s *Scheduler) Stop(ctx context.Context) error

Stop shuts down the scheduler

type SchedulerBuilder

type SchedulerBuilder struct {
	// contains filtered or unexported fields
}

SchedulerBuilder provides an interface for building schedulers

func NewSchedulerBuilder

func NewSchedulerBuilder(logger *slog.Logger) *SchedulerBuilder

NewSchedulerBuilder creates a new scheduler builder

func (*SchedulerBuilder) Build

func (b *SchedulerBuilder) Build() (*Scheduler, error)

Build creates the scheduler instance

func (*SchedulerBuilder) WithCronSyncInterval

func (b *SchedulerBuilder) WithCronSyncInterval(s time.Duration) *SchedulerBuilder

WithCronSyncInterval sets the cron sync interval

func (*SchedulerBuilder) WithHandler added in v0.2.0

func (b *SchedulerBuilder) WithHandler(h Handler) *SchedulerBuilder

WithHandler adds a task handler

func (*SchedulerBuilder) WithJobStore

func (b *SchedulerBuilder) WithJobStore(jobStore storage.Storage) *SchedulerBuilder

WithJobStore sets the job store

func (*SchedulerBuilder) WithQueueConfig added in v0.2.0

func (b *SchedulerBuilder) WithQueueConfig(cfg QueueConfig) *SchedulerBuilder

WithQueueConfig sets the queue configuration

func (*SchedulerBuilder) WithWorkerCount

func (b *SchedulerBuilder) WithWorkerCount(c int) *SchedulerBuilder

WithWorkerCount sets the worker count

type Scheduling

type Scheduling struct {
	Cron     string `yaml:"cron" json:"cron"`
	Timezone string `yaml:"timezone" json:"timezone"`
}

type SecretsProviderFn

type SecretsProviderFn func(ctx context.Context, flowID string, namespaceID string) (map[string]string, error)

type Task

type Task struct {
	UUID      string
	ExecID    string
	Payload   []byte
	Status    string
	CreatedAt time.Time
}

type TaskQueuer added in v0.2.0

type TaskQueuer interface {
	QueueTask(ctx context.Context, payloadType PayloadType, execID string, payload any) (string, error)
}

TaskQueuer allows handlers to enqueue new tasks

type TaskScheduler

type TaskScheduler interface {
	QueueTask(ctx context.Context, payloadType PayloadType, execID string, payload any) (string, error)
	CancelTask(ctx context.Context, execID string) error
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

type TriggerType

type TriggerType string
const (
	TriggerTypeManual    TriggerType = "manual"
	TriggerTypeScheduled TriggerType = "scheduled"
)

type Variable

type Variable map[string]any

func (Variable) Name

func (v Variable) Name() string

func (Variable) Valid

func (v Variable) Valid() bool

func (Variable) Value

func (v Variable) Value() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL