Documentation
¶
Index ¶
- Constants
- Variables
- type Action
- type AuthMethod
- type ExecResults
- type Flow
- type FlowExecutionHandler
- type FlowExecutionPayload
- type FlowHandlerConfig
- type FlowLoaderFn
- type Handler
- type HookFn
- type Input
- type InputType
- type Job
- type JobSyncerFn
- type Metadata
- type Node
- type NodeAuth
- type NotificationHandler
- type NotificationPayload
- type Notify
- type NotifyEvent
- type Output
- type PayloadType
- type QueueConfig
- type QueueWeight
- type ScheduledJob
- type Scheduler
- func (s *Scheduler) CancelTask(ctx context.Context, execID string) error
- func (s *Scheduler) QueueTask(ctx context.Context, payloadType PayloadType, execID string, payload any) (string, error)
- func (s *Scheduler) SetHandler(h Handler) error
- func (s *Scheduler) SetJobSyncer(syncer JobSyncerFn)
- func (s *Scheduler) SetQueueConfig(cfg QueueConfig) error
- func (s *Scheduler) Start(ctx context.Context) error
- func (s *Scheduler) Stop(ctx context.Context) error
- type SchedulerBuilder
- func (b *SchedulerBuilder) Build() (*Scheduler, error)
- func (b *SchedulerBuilder) WithCronSyncInterval(s time.Duration) *SchedulerBuilder
- func (b *SchedulerBuilder) WithHandler(h Handler) *SchedulerBuilder
- func (b *SchedulerBuilder) WithJobStore(jobStore storage.Storage) *SchedulerBuilder
- func (b *SchedulerBuilder) WithQueueConfig(cfg QueueConfig) *SchedulerBuilder
- func (b *SchedulerBuilder) WithWorkerCount(c int) *SchedulerBuilder
- type Scheduling
- type SecretsProviderFn
- type Task
- type TaskQueuer
- type TaskScheduler
- type TriggerType
- type Variable
Constants ¶
const (
TaskTicker = 2 * time.Second
PeriodicTicker = 1 * time.Minute
)
const (
TaskStatusPending = "pending"
TaskStatusRunning = "running"
TaskStatusCompleted = "completed"
TaskStatusFailed = "failed"
TaskStatusCancelled = "cancelled"
)
const NodeConnectionTimeout = 5 * time.Second
Variables ¶
var (
ErrPendingApproval = errors.New("pending approval")
ErrExecutionCancelled = errors.New("execution cancelled")
)
Functions ¶
This section is empty.
Types ¶
type Action ¶
type Action struct {
ID string `yaml:"id" validate:"required,alphanum_underscore"`
Name string `yaml:"name" validate:"required"`
Executor string `yaml:"executor" validate:"required,oneof=script docker"`
With map[string]any `yaml:"with" validate:"required"`
Approval bool `yaml:"approval"`
Variables []Variable `yaml:"variables"`
On []Node `yaml:"on"`
}
type AuthMethod ¶
type AuthMethod string
const (
AuthMethodPrivateKey AuthMethod = "private_key"
AuthMethodPassword AuthMethod = "password"
)
type ExecResults ¶
type ExecResults struct {
// contains filtered or unexported fields
}
type FlowExecutionHandler ¶ added in v0.2.0
type FlowExecutionHandler struct {
// contains filtered or unexported fields
}
FlowExecutionHandler handles flow execution jobs
func NewFlowExecutionHandler ¶ added in v0.2.0
func NewFlowExecutionHandler(cfg FlowHandlerConfig) *FlowExecutionHandler
NewFlowExecutionHandler creates a new flow execution handler
func (*FlowExecutionHandler) Handle ¶ added in v0.2.0
func (h *FlowExecutionHandler) Handle(ctx context.Context, job Job) error
Handle processes a flow execution job
func (*FlowExecutionHandler) SetSecretsProvider ¶ added in v0.2.0
func (h *FlowExecutionHandler) SetSecretsProvider(sp SecretsProviderFn)
SetSecretsProvider allows updating secrets provider after creation
func (*FlowExecutionHandler) SetTaskQueuer ¶ added in v0.2.0
func (h *FlowExecutionHandler) SetTaskQueuer(tq TaskQueuer)
SetTaskQueuer allows setting the task queuer after creation
func (*FlowExecutionHandler) Type ¶ added in v0.2.0
func (h *FlowExecutionHandler) Type() PayloadType
Type returns the payload type this handler processes
type FlowExecutionPayload ¶
type FlowHandlerConfig ¶ added in v0.2.0
type FlowHandlerConfig struct {
Store repo.Store
SecretsProvider SecretsProviderFn
LogManager streamlogger.LogManager
Logger *slog.Logger
Metrics *metrics.Manager
FlowExecutionTimeout time.Duration
}
FlowHandlerConfig holds configuration for FlowExecutionHandler
type FlowLoaderFn ¶
type Handler ¶ added in v0.2.0
type Handler interface {
// Type returns the payload type this handler processes
Type() PayloadType
// Handle processes a job
Handle(ctx context.Context, job Job) error
}
Handler processes jobs of a specific payload type
type Input ¶
type Input struct {
Name string `yaml:"name" json:"name" validate:"required,alphanum_underscore"`
Type InputType `yaml:"type" json:"type" validate:"required,oneof=string int float bool slice_string slice_int slice_uint slice_float"`
Label string `yaml:"label" json:"label"`
Description string `yaml:"description" json:"description"`
Validation string `yaml:"validation" json:"validation"`
Required bool `yaml:"required" json:"required"`
Default string `yaml:"default" json:"default"`
}
type InputType ¶
type InputType string
const (
INPUT_TYPE_STRING InputType = "string"
INPUT_TYPE_INT InputType = "int"
INPUT_TYPE_FLOAT InputType = "float"
INPUT_TYPE_BOOL InputType = "bool"
INPUT_TYPE_SLICE_STRING InputType = "slice_string"
INPUT_TYPE_SLICE_INT InputType = "slice_int"
INPUT_TYPE_SLICE_UINT InputType = "slice_uint"
INPUT_TYPE_SLICE_FLOAT InputType = "slice_float"
)
type Job ¶ added in v0.2.0
type Job struct {
ID int64
ExecID string
PayloadType PayloadType
Payload []byte
CreatedAt time.Time
}
Job represents a job passed to handlers
type JobSyncerFn ¶ added in v0.2.0
type JobSyncerFn func(ctx context.Context) ([]ScheduledJob, error)
JobSyncerFn syncs scheduled jobs from a data source
type Node ¶
type Node struct {
ID string
Name string
Hostname string
Port int
Username string
OSFamily string
ConnectionType string
Tags []string
Auth NodeAuth
}
func (*Node) CheckConnectivity ¶
CheckConnectivity can be used to check whether a remote node is accessible at the given IP:Port. The default connection timeout is 5 seconds. A non-nil error is returned if the node is not accessible.
type NodeAuth ¶
type NodeAuth struct {
CredentialID string
Method AuthMethod
Key string
}
type NotificationHandler ¶ added in v0.2.0
type NotificationHandler struct {
// contains filtered or unexported fields
}
NotificationHandler processes notification jobs
func NewNotificationHandler ¶ added in v0.2.0
func NewNotificationHandler(messengers map[string]messengers.Messenger, store repo.Store, logger *slog.Logger, rootURL string) (*NotificationHandler, error)
func (*NotificationHandler) Handle ¶ added in v0.2.0
func (h *NotificationHandler) Handle(ctx context.Context, job Job) error
func (*NotificationHandler) Type ¶ added in v0.2.0
func (h *NotificationHandler) Type() PayloadType
type NotificationPayload ¶ added in v0.2.0
type NotificationPayload struct {
FlowID string `json:"flow_id"`
FlowName string `json:"flow_name"`
ExecID string `json:"exec_id"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
Receivers []string `json:"receivers"`
NamespaceID string `json:"namespace_id"`
Channel string `json:"channel"`
}
type Notify ¶ added in v0.2.0
type Notify struct {
Channel string `yaml:"channel" json:"channel"`
Receivers []string `yaml:"receivers" json:"receivers"`
Events []NotifyEvent `yaml:"events" json:"events"`
}
type NotifyEvent ¶ added in v0.2.0
type NotifyEvent string
const (
NotifyEventOnSuccess NotifyEvent = "on_success"
NotifyEventOnFailure NotifyEvent = "on_failure"
NotifyEventOnWaiting NotifyEvent = "on_waiting"
NotifyEventOnCancelled NotifyEvent = "on_cancelled"
)
type PayloadType ¶ added in v0.2.0
type PayloadType string
PayloadType identifies different types of jobs in the queue
const PayloadTypeFlowExecution PayloadType = "flow_execution"
const PayloadTypeNotification PayloadType = "notification"
type QueueConfig ¶ added in v0.2.0
type QueueConfig struct {
Queues []QueueWeight
}
QueueConfig holds the weighted queue configuration
func (QueueConfig) GetWorkerCount ¶ added in v0.2.0
func (c QueueConfig) GetWorkerCount(pt PayloadType, totalWorkers int) int
GetWorkerCount calculates the number of goroutines for a payload type
func (QueueConfig) Validate ¶ added in v0.2.0
func (c QueueConfig) Validate() error
Validate ensures queue weights sum to 100
type QueueWeight ¶ added in v0.2.0
type QueueWeight struct {
PayloadType PayloadType
Weight int // 1-100, all weights must sum to 100
}
QueueWeight defines weight for a payload type
type ScheduledJob ¶ added in v0.2.0
type ScheduledJob struct {
ID string
Name string
Cron string
Timezone string
PayloadType PayloadType
Payload any
}
ScheduledJob represents a job that can be scheduled via cron
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler implements TaskScheduler
func (*Scheduler) CancelTask ¶
func (s *Scheduler) CancelTask(ctx context.Context, execID string) error
CancelTask cancels a running or pending execution
func (*Scheduler) QueueTask ¶
func (s *Scheduler) QueueTask(ctx context.Context, payloadType PayloadType, execID string, payload any) (string, error)
QueueTask queues a task for execution with specified payload type
func (*Scheduler) SetHandler ¶ added in v0.2.0
func (s *Scheduler) SetHandler(h Handler) error
SetHandler registers a handler for a payload type
func (*Scheduler) SetJobSyncer ¶ added in v0.2.0
func (s *Scheduler) SetJobSyncer(syncer JobSyncerFn)
SetJobSyncer sets the job syncer for cron-based scheduling
func (*Scheduler) SetQueueConfig ¶ added in v0.2.0
func (s *Scheduler) SetQueueConfig(cfg QueueConfig) error
SetQueueConfig sets the queue configuration
type SchedulerBuilder ¶
type SchedulerBuilder struct {
// contains filtered or unexported fields
}
SchedulerBuilder provides an interface for building schedulers
func NewSchedulerBuilder ¶
func NewSchedulerBuilder(logger *slog.Logger) *SchedulerBuilder
NewSchedulerBuilder creates a new scheduler builder
func (*SchedulerBuilder) Build ¶
func (b *SchedulerBuilder) Build() (*Scheduler, error)
Build creates the scheduler instance
func (*SchedulerBuilder) WithCronSyncInterval ¶
func (b *SchedulerBuilder) WithCronSyncInterval(s time.Duration) *SchedulerBuilder
WithCronSyncInterval sets the cron sync interval
func (*SchedulerBuilder) WithHandler ¶ added in v0.2.0
func (b *SchedulerBuilder) WithHandler(h Handler) *SchedulerBuilder
WithHandler adds a task handler
func (*SchedulerBuilder) WithJobStore ¶
func (b *SchedulerBuilder) WithJobStore(jobStore storage.Storage) *SchedulerBuilder
WithJobStore sets the job store
func (*SchedulerBuilder) WithQueueConfig ¶ added in v0.2.0
func (b *SchedulerBuilder) WithQueueConfig(cfg QueueConfig) *SchedulerBuilder
WithQueueConfig sets the queue configuration
func (*SchedulerBuilder) WithWorkerCount ¶
func (b *SchedulerBuilder) WithWorkerCount(c int) *SchedulerBuilder
WithWorkerCount sets the worker count
type Scheduling ¶
type SecretsProviderFn ¶
type TaskQueuer ¶ added in v0.2.0
type TaskQueuer interface {
QueueTask(ctx context.Context, payloadType PayloadType, execID string, payload any) (string, error)
}
TaskQueuer allows handlers to enqueue new tasks
type TaskScheduler ¶
type TriggerType ¶
type TriggerType string
const (
TriggerTypeManual TriggerType = "manual"
TriggerTypeScheduled TriggerType = "scheduled"
)