Documentation
¶
Index ¶
- Constants
- Variables
- type Action
- type AuthMethod
- type ExecResults
- type Flow
- type FlowExecutionHandler
- type FlowExecutionPayload
- type FlowHandlerConfig
- type FlowLoaderFn
- type Handler
- type HookFn
- type Input
- type InputType
- type Job
- type JobSyncerFn
- type Metadata
- type Node
- type NodeAuth
- type NotificationHandler
- type NotificationPayload
- type Notify
- type NotifyEvent
- type Output
- type PayloadType
- type QueueConfig
- type QueueWeight
- type RetryOptions
- type ScheduledJob
- type Scheduler
- func (s *Scheduler) CancelTask(ctx context.Context, execID string) error
- func (s *Scheduler) QueueScheduledTask(ctx context.Context, payloadType PayloadType, execID string, payload any, ...) (string, error)
- func (s *Scheduler) QueueScheduledTaskWithRetries(ctx context.Context, payloadType PayloadType, execID string, payload any, ...) (string, error)
- func (s *Scheduler) QueueTask(ctx context.Context, payloadType PayloadType, execID string, payload any) (string, error)
- func (s *Scheduler) QueueTaskWithRetries(ctx context.Context, payloadType PayloadType, execID string, payload any, ...) (string, error)
- func (s *Scheduler) SetHandler(h Handler) error
- func (s *Scheduler) SetJobSyncer(syncer JobSyncerFn)
- func (s *Scheduler) SetQueueConfig(cfg QueueConfig) error
- func (s *Scheduler) Start(ctx context.Context) error
- func (s *Scheduler) Stop(ctx context.Context) error
- type SchedulerBuilder
- func (b *SchedulerBuilder) Build() (*Scheduler, error)
- func (b *SchedulerBuilder) WithCronSyncInterval(s time.Duration) *SchedulerBuilder
- func (b *SchedulerBuilder) WithHandler(h Handler) *SchedulerBuilder
- func (b *SchedulerBuilder) WithJobStore(jobStore storage.Storage) *SchedulerBuilder
- func (b *SchedulerBuilder) WithQueueConfig(cfg QueueConfig) *SchedulerBuilder
- func (b *SchedulerBuilder) WithRetryOptions(opts RetryOptions) *SchedulerBuilder
- func (b *SchedulerBuilder) WithWorkerCount(c int) *SchedulerBuilder
- type Scheduling
- type SecretsProviderFn
- type Task
- type TaskQueuer
- type TaskScheduler
- type TriggerType
- type Variable
Constants ¶
const (
TaskTicker = 2 * time.Second
PeriodicTicker = 1 * time.Minute
)
const (
TaskStatusPending = "pending"
TaskStatusRunning = "running"
TaskStatusCompleted = "completed"
TaskStatusFailed = "failed"
TaskStatusCancelled = "cancelled"
)
const NodeConnectionTimeout = 5 * time.Second
Variables ¶
var (
ErrPendingApproval = errors.New("pending approval")
ErrExecutionCancelled = errors.New("execution cancelled")
)
Functions ¶
This section is empty.
Types ¶
type Action ¶
type Action struct {
ID string `yaml:"id" validate:"required,alphanum_underscore"`
Name string `yaml:"name" validate:"required"`
Executor string `yaml:"executor"`
With map[string]any `yaml:"with" validate:"required"`
Approval bool `yaml:"approval"`
Variables []Variable `yaml:"variables"`
On []Node `yaml:"on"`
}
type AuthMethod ¶
type AuthMethod string
const (
AuthMethodPrivateKey AuthMethod = "private_key"
AuthMethodPassword AuthMethod = "password"
)
type ExecResults ¶
type ExecResults struct {
// contains filtered or unexported fields
}
type FlowExecutionHandler ¶ added in v0.2.0
type FlowExecutionHandler struct {
// contains filtered or unexported fields
}
FlowExecutionHandler handles flow execution jobs
func NewFlowExecutionHandler ¶ added in v0.2.0
func NewFlowExecutionHandler(cfg FlowHandlerConfig) *FlowExecutionHandler
NewFlowExecutionHandler creates a new flow execution handler
func (*FlowExecutionHandler) Handle ¶ added in v0.2.0
func (h *FlowExecutionHandler) Handle(ctx context.Context, job Job) error
Handle processes a flow execution job
func (*FlowExecutionHandler) SetSecretsProvider ¶ added in v0.2.0
func (h *FlowExecutionHandler) SetSecretsProvider(sp SecretsProviderFn)
SetSecretsProvider allows updating secrets provider after creation
func (*FlowExecutionHandler) SetTaskQueuer ¶ added in v0.2.0
func (h *FlowExecutionHandler) SetTaskQueuer(tq TaskQueuer)
SetTaskQueuer allows setting the task queuer after creation
func (*FlowExecutionHandler) Type ¶ added in v0.2.0
func (h *FlowExecutionHandler) Type() PayloadType
Type returns the payload type this handler processes
type FlowExecutionPayload ¶
type FlowHandlerConfig ¶ added in v0.2.0
type FlowHandlerConfig struct {
Store repo.Store
SecretsProvider SecretsProviderFn
LogManager streamlogger.LogManager
Logger *slog.Logger
Metrics *metrics.Manager
FlowExecutionTimeout time.Duration
ExecutorKeys map[string]string // executor_name → API token
APIBaseURL string
}
FlowHandlerConfig holds configuration for FlowExecutionHandler
type FlowLoaderFn ¶
type Handler ¶ added in v0.2.0
type Handler interface {
// Type returns the payload type this handler processes
Type() PayloadType
// Handle processes a job
Handle(ctx context.Context, job Job) error
}
Handler processes jobs of a specific payload type
type Input ¶
type Input struct {
Name string `yaml:"name" json:"name" validate:"required,alphanum_underscore"`
Type InputType `yaml:"type" json:"type" validate:"required,oneof=string int float bool slice_string slice_int slice_uint slice_float"`
Label string `yaml:"label" json:"label"`
Description string `yaml:"description" json:"description"`
Validation string `yaml:"validation" json:"validation"`
Required bool `yaml:"required" json:"required"`
Default string `yaml:"default" json:"default"`
MaxFileSize int64 `yaml:"max_file_size" json:"max_file_size"`
}
type InputType ¶
type InputType string
const (
INPUT_TYPE_STRING InputType = "string"
INPUT_TYPE_INT InputType = "int"
INPUT_TYPE_FLOAT InputType = "float"
INPUT_TYPE_BOOL InputType = "bool"
INPUT_TYPE_SLICE_STRING InputType = "slice_string"
INPUT_TYPE_SLICE_INT InputType = "slice_int"
INPUT_TYPE_SLICE_UINT InputType = "slice_uint"
INPUT_TYPE_SLICE_FLOAT InputType = "slice_float"
)
type Job ¶ added in v0.2.0
type Job struct {
ID int64
ExecID string
PayloadType PayloadType
Payload []byte
CreatedAt time.Time
ScheduledAt time.Time
MaxRetries int
Attempt int
}
Job represents a job passed to handlers
func (Job) ShouldRetry ¶ added in v0.6.0
type JobSyncerFn ¶ added in v0.2.0
type JobSyncerFn func(ctx context.Context) ([]ScheduledJob, error)
JobSyncerFn syncs scheduled jobs from a data source
type Node ¶
type Node struct {
ID string
Name string
Hostname string
Port int
Username string
OSFamily string
ConnectionType string
Tags []string
Auth NodeAuth
}
func (*Node) CheckConnectivity ¶
CheckConnectivity can be used to check if a remote node is accessible at the given IP:Port. The default connection timeout is 5 seconds. A non-nil error is returned if the node is not accessible.
type NodeAuth ¶
type NodeAuth struct {
CredentialID string
Method AuthMethod
Key string
}
type NotificationHandler ¶ added in v0.2.0
type NotificationHandler struct {
// contains filtered or unexported fields
}
NotificationHandler processes notification jobs
func NewNotificationHandler ¶ added in v0.2.0
func NewNotificationHandler(m map[string]messengers.Messenger, store repo.Store, logger *slog.Logger) *NotificationHandler
func (*NotificationHandler) Handle ¶ added in v0.2.0
func (h *NotificationHandler) Handle(ctx context.Context, job Job) error
func (*NotificationHandler) Type ¶ added in v0.2.0
func (h *NotificationHandler) Type() PayloadType
type NotificationPayload ¶ added in v0.2.0
type NotificationPayload struct {
FlowID string `json:"flow_id"`
FlowName string `json:"flow_name"`
ExecID string `json:"exec_id"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
Config map[string]any `json:"config"`
NamespaceID string `json:"namespace_id"`
Channel string `json:"channel"`
}
type Notify ¶ added in v0.2.0
type Notify struct {
Channel string `yaml:"channel" json:"channel"`
Config map[string]any `yaml:"config" json:"config"`
Events []NotifyEvent `yaml:"events" json:"events"`
}
type NotifyEvent ¶ added in v0.2.0
type NotifyEvent string
const (
NotifyEventOnSuccess NotifyEvent = "on_success"
NotifyEventOnFailure NotifyEvent = "on_failure"
NotifyEventOnWaiting NotifyEvent = "on_waiting"
NotifyEventOnCancelled NotifyEvent = "on_cancelled"
)
type PayloadType ¶ added in v0.2.0
type PayloadType string
PayloadType identifies different types of jobs in the queue
const PayloadTypeFlowExecution PayloadType = "flow_execution"
const PayloadTypeNotification PayloadType = "notification"
type QueueConfig ¶ added in v0.2.0
type QueueConfig struct {
Queues []QueueWeight
}
QueueConfig holds the weighted queue configuration
func (QueueConfig) GetWorkerCount ¶ added in v0.2.0
func (c QueueConfig) GetWorkerCount(pt PayloadType, totalWorkers int) int
GetWorkerCount calculates the number of goroutines for a payload type
func (QueueConfig) Validate ¶ added in v0.2.0
func (c QueueConfig) Validate() error
Validate ensures queue weights sum to 100
type QueueWeight ¶ added in v0.2.0
type QueueWeight struct {
PayloadType PayloadType
Weight int // 1-100, all weights must sum to 100
}
QueueWeight defines weight for a payload type
type RetryOptions ¶ added in v0.6.0
type RetryOptions struct {
InitialDelay time.Duration
MaxDelay time.Duration
BackoffFactor float64
}
RetryOptions configures exponential backoff for job retries
func DefaultRetryOptions ¶ added in v0.6.0
func DefaultRetryOptions() RetryOptions
func (RetryOptions) CalculateDelay ¶ added in v0.6.0
func (r RetryOptions) CalculateDelay(attempt int) time.Duration
CalculateDelay returns the delay for the given attempt using exponential backoff
type ScheduledJob ¶ added in v0.2.0
type ScheduledJob struct {
ID string
Name string
Cron string
Timezone string
PayloadType PayloadType
Payload any
}
ScheduledJob represents a job that can be scheduled via cron
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler implements TaskScheduler
func (*Scheduler) CancelTask ¶
func (s *Scheduler) CancelTask(ctx context.Context, execID string) error
CancelTask cancels a running or pending execution
func (*Scheduler) QueueScheduledTask ¶ added in v0.4.0
func (s *Scheduler) QueueScheduledTask(ctx context.Context, payloadType PayloadType, execID string, payload any, scheduledAt time.Time) (string, error)
QueueScheduledTask queues a task for delayed execution at the specified time
func (*Scheduler) QueueScheduledTaskWithRetries ¶ added in v0.6.0
func (s *Scheduler) QueueScheduledTaskWithRetries(ctx context.Context, payloadType PayloadType, execID string, payload any, scheduledAt time.Time, maxRetries int) (string, error)
QueueScheduledTaskWithRetries queues a scheduled task with retry configuration
func (*Scheduler) QueueTask ¶
func (s *Scheduler) QueueTask(ctx context.Context, payloadType PayloadType, execID string, payload any) (string, error)
QueueTask queues a task for execution with specified payload type
func (*Scheduler) QueueTaskWithRetries ¶ added in v0.6.0
func (s *Scheduler) QueueTaskWithRetries(ctx context.Context, payloadType PayloadType, execID string, payload any, maxRetries int) (string, error)
QueueTaskWithRetries queues a task with retry configuration
func (*Scheduler) SetHandler ¶ added in v0.2.0
func (s *Scheduler) SetHandler(h Handler) error
SetHandler registers a handler for a payload type
func (*Scheduler) SetJobSyncer ¶ added in v0.2.0
func (s *Scheduler) SetJobSyncer(syncer JobSyncerFn)
SetJobSyncer sets the job syncer for cron-based scheduling
func (*Scheduler) SetQueueConfig ¶ added in v0.2.0
func (s *Scheduler) SetQueueConfig(cfg QueueConfig) error
SetQueueConfig sets the queue configuration
type SchedulerBuilder ¶
type SchedulerBuilder struct {
// contains filtered or unexported fields
}
SchedulerBuilder provides an interface for building schedulers
func NewSchedulerBuilder ¶
func NewSchedulerBuilder(logger *slog.Logger) *SchedulerBuilder
NewSchedulerBuilder creates a new scheduler builder
func (*SchedulerBuilder) Build ¶
func (b *SchedulerBuilder) Build() (*Scheduler, error)
Build creates the scheduler instance
func (*SchedulerBuilder) WithCronSyncInterval ¶
func (b *SchedulerBuilder) WithCronSyncInterval(s time.Duration) *SchedulerBuilder
WithCronSyncInterval sets the cron sync interval
func (*SchedulerBuilder) WithHandler ¶ added in v0.2.0
func (b *SchedulerBuilder) WithHandler(h Handler) *SchedulerBuilder
WithHandler adds a task handler
func (*SchedulerBuilder) WithJobStore ¶
func (b *SchedulerBuilder) WithJobStore(jobStore storage.Storage) *SchedulerBuilder
WithJobStore sets the job store
func (*SchedulerBuilder) WithQueueConfig ¶ added in v0.2.0
func (b *SchedulerBuilder) WithQueueConfig(cfg QueueConfig) *SchedulerBuilder
WithQueueConfig sets the queue configuration
func (*SchedulerBuilder) WithRetryOptions ¶ added in v0.6.0
func (b *SchedulerBuilder) WithRetryOptions(opts RetryOptions) *SchedulerBuilder
WithRetryOptions sets the retry options for failed jobs
func (*SchedulerBuilder) WithWorkerCount ¶
func (b *SchedulerBuilder) WithWorkerCount(c int) *SchedulerBuilder
WithWorkerCount sets the worker count
type Scheduling ¶
type SecretsProviderFn ¶
type TaskQueuer ¶ added in v0.2.0
type TaskQueuer interface {
QueueTask(ctx context.Context, payloadType PayloadType, execID string, payload any) (string, error)
QueueTaskWithRetries(ctx context.Context, payloadType PayloadType, execID string, payload any, maxRetries int) (string, error)
}
TaskQueuer allows handlers to enqueue new tasks
type TaskScheduler ¶
type TaskScheduler interface {
QueueTask(ctx context.Context, payloadType PayloadType, execID string, payload any) (string, error)
QueueTaskWithRetries(ctx context.Context, payloadType PayloadType, execID string, payload any, maxRetries int) (string, error)
QueueScheduledTask(ctx context.Context, payloadType PayloadType, execID string, payload any, scheduledAt time.Time) (string, error)
QueueScheduledTaskWithRetries(ctx context.Context, payloadType PayloadType, execID string, payload any, scheduledAt time.Time, maxRetries int) (string, error)
CancelTask(ctx context.Context, execID string) error
Start(ctx context.Context) error
Stop(ctx context.Context) error
}
type TriggerType ¶
type TriggerType string
const (
TriggerTypeManual TriggerType = "manual"
TriggerTypeScheduled TriggerType = "scheduled"
)