scheduler

package
v1.0.2 Latest
Warning: This package is not in the latest version of its module.
Published: Jan 15, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	WorkflowDir  string
	DBPath       string
	Orchestrator *orchestration.Orchestrator
	Registry     *agent.Registry
	Tracer       observability.Tracer
	Logger       *zap.Logger
	HotReload    bool
}

Config contains scheduler configuration.

type Loader

type Loader struct {
	// contains filtered or unexported fields
}

Loader loads workflow YAML files with schedule sections and manages hot-reload.

func (*Loader) ScanDirectory

func (l *Loader) ScanDirectory(ctx context.Context) error

ScanDirectory scans the workflow directory for YAML files with schedule sections. It loads new schedules and reloads changed ones.
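The scan-and-reload behavior described above can be illustrated with a stdlib-only sketch. It is not the Loader's implementation: the helper names (hasScheduleSection, scan, demo), the top-level "schedule:" line check, the "cron" key in the sample file, and the use of content hashes to detect changes are all assumptions made for illustration.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
)

// hasScheduleSection reports whether data has a top-level "schedule:" key.
// A real loader would use a YAML parser; a line scan keeps this stdlib-only.
func hasScheduleSection(data []byte) bool {
	for _, line := range strings.Split(string(data), "\n") {
		if strings.HasPrefix(line, "schedule:") {
			return true
		}
	}
	return false
}

// scan walks dir and reports which scheduled workflow files are new or changed
// relative to the content hashes in seen, updating seen as it goes.
func scan(dir string, seen map[string][sha256.Size]byte) (added, changed []string, err error) {
	err = filepath.WalkDir(dir, func(path string, d fs.DirEntry, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		ext := strings.ToLower(filepath.Ext(path))
		if d.IsDir() || (ext != ".yaml" && ext != ".yml") {
			return nil
		}
		data, readErr := os.ReadFile(path)
		if readErr != nil {
			return readErr
		}
		if !hasScheduleSection(data) {
			return nil // workflow without a schedule section: ignored
		}
		sum := sha256.Sum256(data)
		prev, known := seen[path]
		switch {
		case !known:
			added = append(added, path)
		case prev != sum:
			changed = append(changed, path)
		}
		seen[path] = sum
		return nil
	})
	return added, changed, err
}

// demo writes two files to a temp dir, scans, edits one file, and scans again.
func demo() (firstAdded, secondAdded, secondChanged int, err error) {
	dir, err := os.MkdirTemp("", "workflows")
	if err != nil {
		return 0, 0, 0, err
	}
	defer os.RemoveAll(dir)

	scheduled := filepath.Join(dir, "nightly.yaml")
	plain := filepath.Join(dir, "adhoc.yaml")
	// The "cron" key is a hypothetical field name used only for this sample.
	if err = os.WriteFile(scheduled, []byte("name: nightly\nschedule:\n  cron: \"0 2 * * *\"\n"), 0o600); err != nil {
		return 0, 0, 0, err
	}
	if err = os.WriteFile(plain, []byte("name: adhoc\n"), 0o600); err != nil {
		return 0, 0, 0, err
	}

	seen := map[string][sha256.Size]byte{}
	added, _, err := scan(dir, seen)
	if err != nil {
		return 0, 0, 0, err
	}
	if err = os.WriteFile(scheduled, []byte("name: nightly\nschedule:\n  cron: \"0 3 * * *\"\n"), 0o600); err != nil {
		return 0, 0, 0, err
	}
	added2, changed2, err := scan(dir, seen)
	return len(added), len(added2), len(changed2), err
}

func main() {
	fmt.Println(demo()) // prints: 1 0 1 <nil>
}
```

Hashing file contents rather than comparing modification times avoids spurious reloads when a file is rewritten with identical content. Whether the Loader does the same is not stated in this documentation.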

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler manages cron-based workflow execution.

func NewScheduler

func NewScheduler(ctx context.Context, config Config) (*Scheduler, error)

NewScheduler creates a new workflow scheduler.

func (*Scheduler) AddSchedule

func (s *Scheduler) AddSchedule(ctx context.Context, schedule *loomv1.ScheduledWorkflow) error

AddSchedule adds a new scheduled workflow.

func (*Scheduler) GetHistory

func (s *Scheduler) GetHistory(ctx context.Context, scheduleID string, limit int) ([]*loomv1.ScheduleExecution, error)

GetHistory retrieves execution history for a schedule.

func (*Scheduler) GetSchedule

func (s *Scheduler) GetSchedule(ctx context.Context, scheduleID string) (*loomv1.ScheduledWorkflow, error)

GetSchedule retrieves a schedule by ID.

func (*Scheduler) ListSchedules

func (s *Scheduler) ListSchedules(ctx context.Context) ([]*loomv1.ScheduledWorkflow, error)

ListSchedules returns all schedules.

func (*Scheduler) PauseSchedule

func (s *Scheduler) PauseSchedule(ctx context.Context, scheduleID string) error

PauseSchedule disables a schedule without removing it.

func (*Scheduler) RemoveSchedule

func (s *Scheduler) RemoveSchedule(ctx context.Context, scheduleID string) error

RemoveSchedule removes a schedule from the scheduler.

func (*Scheduler) ResumeSchedule

func (s *Scheduler) ResumeSchedule(ctx context.Context, scheduleID string) error

ResumeSchedule re-enables a paused schedule.

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

Start initializes the scheduler and begins executing workflows.

func (*Scheduler) Stop

func (s *Scheduler) Stop(ctx context.Context) error

Stop gracefully shuts down the scheduler.
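A minimal sketch of pairing Start and Stop follows. It uses a locally defined lifecycle interface that matches the two documented method signatures, plus a fake implementation so the example runs without the real package. The names lifecycle, fakeScheduler, runUntilDone, and demo are hypothetical, and the sketch assumes Start returns once background work is running, which this documentation does not confirm.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// lifecycle matches the documented Start and Stop signatures, so a
// *Scheduler would satisfy it; the interface itself is hypothetical.
type lifecycle interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

// fakeScheduler stands in for the real scheduler in this sketch.
type fakeScheduler struct{ started, stopped bool }

func (f *fakeScheduler) Start(ctx context.Context) error { f.started = true; return nil }
func (f *fakeScheduler) Stop(ctx context.Context) error  { f.stopped = true; return nil }

// runUntilDone starts svc, waits for ctx to end, then stops svc using a
// fresh context so shutdown still has time after ctx is cancelled.
func runUntilDone(ctx context.Context, svc lifecycle, stopTimeout time.Duration) error {
	if err := svc.Start(ctx); err != nil {
		return fmt.Errorf("start: %w", err)
	}
	<-ctx.Done()

	stopCtx, cancel := context.WithTimeout(context.Background(), stopTimeout)
	defer cancel()
	if err := svc.Stop(stopCtx); err != nil {
		return fmt.Errorf("stop: %w", err)
	}
	return nil
}

// demo runs the fake for a few milliseconds and reports what was called.
func demo() (started, stopped bool, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	f := &fakeScheduler{}
	err = runUntilDone(ctx, f, time.Second)
	return f.started, f.stopped, err
}

func main() {
	fmt.Println(demo()) // prints: true true <nil>
}
```

Stop receives a new context derived from context.Background() because the run context is already cancelled by the time shutdown begins; reusing it would give a graceful shutdown no time to complete.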

func (*Scheduler) TriggerNow

func (s *Scheduler) TriggerNow(ctx context.Context, scheduleID string, skipIfRunning bool, variables map[string]string) (string, error)

TriggerNow manually triggers a scheduled workflow immediately.
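The skipIfRunning parameter suggests an overlap guard: a trigger is refused while an earlier run of the same schedule is still in flight. The sketch below shows one common way to build such a guard in memory. It is an assumption about the intent, not the package's behavior; the tracker type, its methods, errAlreadyRunning, and the execution ID format are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errAlreadyRunning is a hypothetical sentinel for a skipped trigger.
var errAlreadyRunning = errors.New("schedule is already running")

// tracker records which schedules have a run in flight and how many
// triggers were skipped. It is safe for concurrent use.
type tracker struct {
	mu      sync.Mutex
	current map[string]string // scheduleID -> running execution ID
	skipped map[string]int    // scheduleID -> skipped trigger count
	nextID  int
}

func newTracker() *tracker {
	return &tracker{current: map[string]string{}, skipped: map[string]int{}}
}

// begin claims scheduleID for a new execution and returns its ID. If a run
// is in flight and skipIfRunning is true, it counts a skip and refuses.
// With skipIfRunning false, this simplified model just replaces the entry.
func (t *tracker) begin(scheduleID string, skipIfRunning bool) (string, error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, running := t.current[scheduleID]; running && skipIfRunning {
		t.skipped[scheduleID]++
		return "", errAlreadyRunning
	}
	t.nextID++
	id := fmt.Sprintf("%s-exec-%d", scheduleID, t.nextID)
	t.current[scheduleID] = id
	return id, nil
}

// finish clears the in-flight marker for scheduleID.
func (t *tracker) finish(scheduleID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.current, scheduleID)
}

// demo triggers the same schedule twice without finishing the first run.
func demo() (first string, secondErr error, skipped int) {
	t := newTracker()
	first, _ = t.begin("nightly", true)
	_, secondErr = t.begin("nightly", true)
	return first, secondErr, t.skipped["nightly"]
}

func main() {
	first, secondErr, skipped := demo()
	fmt.Println(first, secondErr, skipped)
	// prints: nightly-exec-1 schedule is already running 1
}
```

The check and the claim happen under one lock, so two concurrent triggers cannot both observe "not running" and start overlapping runs.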

func (*Scheduler) UpdateSchedule

func (s *Scheduler) UpdateSchedule(ctx context.Context, schedule *loomv1.ScheduledWorkflow) error

UpdateSchedule updates an existing schedule.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store persists scheduled workflows and execution history to SQLite. It uses WAL (write-ahead logging) mode for concurrent read/write access.

func NewStore

func NewStore(ctx context.Context, dbPath string, logger *zap.Logger) (*Store, error)

NewStore creates a new scheduler store with SQLite backend. The dbPath should point to ~/.loom/scheduler.db.

func (*Store) Close

func (s *Store) Close() error

Close closes the database connection.

func (*Store) Create

func (s *Store) Create(ctx context.Context, schedule *loomv1.ScheduledWorkflow) error

Create persists a new scheduled workflow.

func (*Store) Delete

func (s *Store) Delete(ctx context.Context, id string) error

Delete removes a scheduled workflow from the store.

func (*Store) Get

Get retrieves a scheduled workflow by ID.

func (*Store) GetDueSchedules

func (s *Store) GetDueSchedules(ctx context.Context, currentTime int64) ([]*loomv1.ScheduledWorkflow, error)

GetDueSchedules returns all schedules that should execute now. A schedule is due if it is enabled and its next_execution_at is less than or equal to currentTime.
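The due rule can be written as a small predicate. This sketch uses a hypothetical schedule struct in place of loomv1.ScheduledWorkflow and assumes the int64 timestamps are Unix seconds; the documentation does not state the unit, and the real method presumably applies the rule in a database query rather than in memory.

```go
package main

import "fmt"

// schedule is a hypothetical stand-in for loomv1.ScheduledWorkflow; only
// the two fields named in the rule are modeled.
type schedule struct {
	ID              string
	Enabled         bool
	NextExecutionAt int64 // assumed to be a Unix timestamp in seconds
}

// isDue applies the documented rule: enabled, and next_execution_at is
// less than or equal to the current time.
func isDue(s schedule, currentTime int64) bool {
	return s.Enabled && s.NextExecutionAt <= currentTime
}

// dueSchedules filters all in memory, keeping the original order.
func dueSchedules(all []schedule, currentTime int64) []schedule {
	var due []schedule
	for _, s := range all {
		if isDue(s, currentTime) {
			due = append(due, s)
		}
	}
	return due
}

func main() {
	now := int64(1_700_000_000)
	all := []schedule{
		{ID: "past-enabled", Enabled: true, NextExecutionAt: now - 60},
		{ID: "exactly-now", Enabled: true, NextExecutionAt: now},
		{ID: "future", Enabled: true, NextExecutionAt: now + 60},
		{ID: "past-disabled", Enabled: false, NextExecutionAt: now - 60},
	}
	for _, s := range dueSchedules(all, now) {
		fmt.Println(s.ID)
	}
	// prints:
	// past-enabled
	// exactly-now
}
```

The comparison is inclusive, so a schedule whose next execution time equals the current time counts as due.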

func (*Store) GetExecutionHistory

func (s *Store) GetExecutionHistory(ctx context.Context, scheduleID string, limit int) ([]*loomv1.ScheduleExecution, error)

GetExecutionHistory retrieves execution history for a schedule.

func (*Store) IncrementSkipped

func (s *Store) IncrementSkipped(ctx context.Context, scheduleID string) error

IncrementSkipped increments the skipped execution counter.

func (*Store) List

func (s *Store) List(ctx context.Context) ([]*loomv1.ScheduledWorkflow, error)

List returns all scheduled workflows.

func (*Store) RecordExecution

func (s *Store) RecordExecution(ctx context.Context, exec *loomv1.ScheduleExecution, scheduleID string) error

RecordExecution stores an execution record for audit trail.

func (*Store) RecordFailure

func (s *Store) RecordFailure(ctx context.Context, scheduleID, errorMsg string) error

RecordFailure increments the failed execution count and stores the error message.

func (*Store) RecordSuccess

func (s *Store) RecordSuccess(ctx context.Context, scheduleID string) error

RecordSuccess increments the successful execution count and updates stats.

func (*Store) Update

func (s *Store) Update(ctx context.Context, schedule *loomv1.ScheduledWorkflow) error

Update modifies an existing scheduled workflow.

func (*Store) UpdateCurrentExecution

func (s *Store) UpdateCurrentExecution(ctx context.Context, scheduleID, executionID string) error

UpdateCurrentExecution sets or clears the current execution ID for a schedule.

func (*Store) UpdateNextExecution

func (s *Store) UpdateNextExecution(ctx context.Context, scheduleID string, nextExecution int64) error

UpdateNextExecution sets the next scheduled execution time.
