cron

package
v0.35.1
Published: Mar 22, 2026 License: MIT Imports: 16 Imported by: 0

README

Cron Scheduler (internal/cron)

The cron package provides a robust scheduling system for periodic AI agent tasks. It allows users to automate long-running or repetitive workflows using natural language prompts.

Overview

The scheduler uses robfig/cron/v3 for precise timing and integrates with the internal/engine to execute tasks in isolated sessions.

Core Components

  • CronScheduler: The main orchestrator that manages job registration, timing, and execution.
  • CronStore: Persistent storage for job definitions (JSON-based).
  • RunsStore: Persistent storage for execution history and logs.
  • Executor: Handles the actual execution of jobs via the SessionManager.

Features

  • Standard Cron Syntax: Support for standard five-field cron expressions (e.g., 0 0 * * * runs daily at midnight).
  • Isolation: Each job run creates a dedicated session in the engine.
  • Retries: Automatic exponential backoff retries on failure (1s → 2s → 4s).
  • Concurrency Control: Global limit on concurrent job executions (default: 4).
  • Webhooks: Optional OnComplete and OnFail callback URLs.

Usage

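// Create a new job that runs every 30 minutes.
// The ID is left empty; the store assigns a UUID v4 when the job is added.
job := &cron.CronJob{
    CronExpr: "*/30 * * * *",
    Prompt:   "Audit the logs for security anomalies",
    WorkDir:  "/app/logs",
    Enabled:  true,
}

// Add to scheduler and report a registration failure.
if err := scheduler.AddJob(job); err != nil {
    log.Printf("failed to add cron job: %v", err)
}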

CLI Interface

Manage cron jobs via hotplexd cron:

  • hotplexd cron add_cron: Create a new scheduled task.
  • hotplexd cron list_crons: View all registered jobs.
  • hotplexd cron list_runs: Check execution history and error logs.

Documentation

Overview

Package cron provides the cron scheduling subsystem for HotPlex.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CronCallback

type CronCallback interface {
	OnComplete(run *CronRun) error
	OnFail(run *CronRun) error
}

CronCallback handles job lifecycle events.
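As an illustration of how this interface can be satisfied, here is a minimal sketch with trimmed-down local copies of CronRun and CronCallback so that it compiles on its own. countingCallback and dispatch are hypothetical, and routing every status other than "completed" to OnFail is an assumption, not documented behavior.

```go
package main

import "fmt"

// CronRun is a reduced local copy of the package type, for illustration only.
type CronRun struct {
	ID     string
	JobID  string
	Status string
	Error  string
}

// CronCallback mirrors the interface documented above.
type CronCallback interface {
	OnComplete(run *CronRun) error
	OnFail(run *CronRun) error
}

// countingCallback is a hypothetical implementation that tallies outcomes.
// It is not safe for concurrent use; a real one would need a mutex.
type countingCallback struct {
	completed int
	failed    int
	lastError string
}

func (c *countingCallback) OnComplete(run *CronRun) error {
	c.completed++
	return nil
}

func (c *countingCallback) OnFail(run *CronRun) error {
	c.failed++
	c.lastError = run.Error
	return nil
}

// dispatch sketches how a finished run might be routed to a callback.
// Assumption: any status other than "completed" counts as a failure.
func dispatch(cb CronCallback, run *CronRun) error {
	if run.Status == "completed" {
		return cb.OnComplete(run)
	}
	return cb.OnFail(run)
}

func main() {
	cb := &countingCallback{}
	_ = dispatch(cb, &CronRun{ID: "run-1", JobID: "job-1", Status: "completed"})
	_ = dispatch(cb, &CronRun{ID: "run-2", JobID: "job-1", Status: "failed", Error: "timeout"})
	fmt.Println(cb.completed, cb.failed, cb.lastError)
}
```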

type CronJob

type CronJob struct {
	ID         string `json:"id"`
	CronExpr   string `json:"cron_expr"`
	Prompt     string `json:"prompt"`
	SessionKey string `json:"session_key,omitempty"`
	WorkDir    string `json:"work_dir,omitempty"`

	Type        JobType       `json:"type"`
	TimeoutMins int           `json:"timeout_mins"`
	Retries     int           `json:"retries"`
	RetryDelay  time.Duration `json:"retry_delay"`

	OutputFormat OutputFormat `json:"output_format"`
	OutputSchema string       `json:"output_schema,omitempty"`

	Enabled  bool    `json:"enabled"`
	Silent   bool    `json:"silent"`
	NotifyOn []Event `json:"notify_on"`

	CreatedBy string    `json:"created_by"`
	CreatedAt time.Time `json:"created_at"`
	LastRun   time.Time `json:"last_run"`
	LastError string    `json:"last_error,omitempty"`
	NextRun   time.Time `json:"next_run"`
	RunCount  int       `json:"run_count"`

	OnComplete string `json:"on_complete,omitempty"`
	OnFail     string `json:"on_fail,omitempty"`
}

CronJob represents a scheduled task.
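The struct tags above determine the stored JSON shape. The sketch below assumes the store uses the standard encoding/json package with default marshalling (the page does not confirm this) and uses a reduced local copy of CronJob. One detail worth knowing under that assumption: a time.Duration field such as RetryDelay is written as an integer number of nanoseconds.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// CronJob is a reduced local copy of the documented struct, keeping only a
// few fields and their JSON tags for illustration.
type CronJob struct {
	CronExpr   string        `json:"cron_expr"`
	Prompt     string        `json:"prompt"`
	WorkDir    string        `json:"work_dir,omitempty"`
	Retries    int           `json:"retries"`
	RetryDelay time.Duration `json:"retry_delay"`
	Enabled    bool          `json:"enabled"`
}

// encode marshals a job with the default encoding/json rules.
func encode(job CronJob) (string, error) {
	b, err := json.Marshal(job)
	return string(b), err
}

func main() {
	out, err := encode(CronJob{
		CronExpr:   "*/30 * * * *",
		Prompt:     "Audit the logs for security anomalies",
		Retries:    3,
		RetryDelay: time.Second, // encoded as 1000000000 (nanoseconds)
		Enabled:    true,
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// WorkDir is empty, so omitempty drops it from the output.
	fmt.Println(out)
}
```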

func (*CronJob) Clone

func (j *CronJob) Clone() *CronJob

Clone returns a deep copy of the CronJob.
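A deep copy matters here because CronJob contains a slice (NotifyOn); a plain struct assignment would leave the copy sharing that slice's backing array with the original. The following sketch shows one way such a Clone could be written, using a reduced local copy of the types. It is not necessarily how the package implements it.

```go
package main

import "fmt"

// Event and CronJob are reduced local copies of the documented types.
type Event string

const (
	EventCompleted Event = "completed"
	EventFailed    Event = "failed"
)

type CronJob struct {
	ID       string
	Prompt   string
	NotifyOn []Event
}

// Clone returns a copy whose NotifyOn slice does not alias the original.
func (j *CronJob) Clone() *CronJob {
	if j == nil {
		return nil
	}
	c := *j // copies every value field
	if j.NotifyOn != nil {
		c.NotifyOn = make([]Event, len(j.NotifyOn))
		copy(c.NotifyOn, j.NotifyOn)
	}
	return &c
}

func main() {
	orig := &CronJob{ID: "job-1", NotifyOn: []Event{EventFailed}}
	dup := orig.Clone()
	dup.NotifyOn[0] = EventCompleted // must not affect orig
	fmt.Println(orig.NotifyOn[0], dup.NotifyOn[0])
}
```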

type CronRun

type CronRun struct {
	ID         string        `json:"id"`
	JobID      string        `json:"job_id"`
	StartedAt  time.Time     `json:"started_at"`
	FinishedAt time.Time     `json:"finished_at"`
	Duration   time.Duration `json:"duration"`
	Status     string        `json:"status"`
	Error      string        `json:"error,omitempty"`
	RetryCount int           `json:"retry_count"`
	Response   string        `json:"response,omitempty"`
}

CronRun represents a single execution of a cron job.

type CronScheduler

type CronScheduler struct {
	// contains filtered or unexported fields
}

CronScheduler manages scheduled jobs using robfig/cron/v3.

func NewCronScheduler

func NewCronScheduler(store *CronStore, executor *Executor, logger *slog.Logger, maxConcurrent int) *CronScheduler

NewCronScheduler creates a scheduler backed by a store and executor. maxConcurrent limits how many jobs run simultaneously (default 4).
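A common way to enforce such a limit in Go is a buffered channel used as a semaphore. The sketch below illustrates that technique only; whether CronScheduler does it this way is not stated on this page. runLimited is a hypothetical helper, and falling back to 4 for non-positive values is an assumption.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited runs every task in its own goroutine but lets at most
// maxConcurrent of them execute at once. It returns the highest number of
// tasks that were observed running simultaneously.
func runLimited(tasks []func(), maxConcurrent int) int {
	if maxConcurrent <= 0 {
		maxConcurrent = 4 // assumption: fall back to the documented default
	}
	sem := make(chan struct{}, maxConcurrent)

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot (blocks when full)
			defer func() { <-sem }() // release the slot

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			task()

			mu.Lock()
			running--
			mu.Unlock()
		}(task)
	}
	wg.Wait()
	return peak
}

func main() {
	tasks := make([]func(), 10)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(10 * time.Millisecond) }
	}
	fmt.Println("peak concurrency:", runLimited(tasks, 4))
}
```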

func (*CronScheduler) AddJob

func (cs *CronScheduler) AddJob(job *CronJob) error

AddJob registers a new job and starts scheduling it.

func (*CronScheduler) GetJob

func (cs *CronScheduler) GetJob(id string) *CronJob

GetJob returns a job by ID, or nil if not found.

func (*CronScheduler) ListJobs

func (cs *CronScheduler) ListJobs() []*CronJob

ListJobs returns all jobs from the store.

func (*CronScheduler) ListRuns

func (cs *CronScheduler) ListRuns(jobID string) []*CronRun

ListRuns returns all runs for a job from the RunsStore.

func (*CronScheduler) PauseJob

func (cs *CronScheduler) PauseJob(id string) error

PauseJob disables a job without removing it.

func (*CronScheduler) RemoveJob

func (cs *CronScheduler) RemoveJob(id string) error

RemoveJob stops and deletes a job.

func (*CronScheduler) ResumeJob

func (cs *CronScheduler) ResumeJob(id string) error

ResumeJob re-enables a paused job.

func (*CronScheduler) SetRunsStore

func (cs *CronScheduler) SetRunsStore(rs *RunsStore)

SetRunsStore injects the RunsStore after construction.

func (*CronScheduler) Start

func (cs *CronScheduler) Start() error

Start registers all enabled jobs and begins scheduling.

func (*CronScheduler) Stop

func (cs *CronScheduler) Stop() context.Context

Stop gracefully stops the scheduler.

type CronStore

type CronStore struct {
	// contains filtered or unexported fields
}

CronStore persists CronJobs to disk using atomic writes (Mutex + os.Rename).
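The "Mutex + os.Rename" pattern usually means: serialize writers with a mutex, write the new contents to a temporary file in the same directory, then rename it over the target so readers never see a half-written file. The sketch below shows that general technique; fileStore, save, and roundTrip are hypothetical names, and the real CronStore may differ in details such as file naming or fsync behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// fileStore is a hypothetical stand-in for a JSON-backed store.
type fileStore struct {
	mu   sync.Mutex
	path string
}

// save writes v as JSON to a temp file next to the target, then renames it
// into place. Rename within one directory replaces the file in a single step.
func (s *fileStore) save(v any) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	data, err := json.MarshalIndent(v, "", "  ")
	if err != nil {
		return err
	}
	tmp, err := os.CreateTemp(filepath.Dir(s.path), ".tmp-*")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // cleans up on failure; harmless after rename

	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil { // flush to disk before renaming
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), s.path)
}

// roundTrip saves v into a fresh temp directory and returns the file contents.
func roundTrip(v any) (string, error) {
	dir, err := os.MkdirTemp("", "cronstore-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	store := &fileStore{path: filepath.Join(dir, "jobs.json")}
	if err := store.save(v); err != nil {
		return "", err
	}
	b, err := os.ReadFile(store.path)
	return string(b), err
}

func main() {
	out, err := roundTrip(map[string]string{"id": "job-1"})
	fmt.Println(out, err)
}
```

The temp file is created in the target's own directory because a rename across filesystems is not atomic and can fail outright.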

func NewCronStore

func NewCronStore(dataDir string) (*CronStore, error)

NewCronStore loads or creates a CronStore in the given data directory.

func (*CronStore) Add

func (cs *CronStore) Add(job *CronJob) error

Add creates and persists a new CronJob with a UUID v4 ID.

func (*CronStore) Delete

func (cs *CronStore) Delete(id string) error

Delete removes a job by ID.

func (*CronStore) Get

func (cs *CronStore) Get(id string) *CronJob

Get returns a job by ID, or nil if not found.

func (*CronStore) List

func (cs *CronStore) List() []*CronJob

List returns a snapshot of all jobs.

func (*CronStore) Update

func (cs *CronStore) Update(job *CronJob) error

Update persists changes to an existing job.

type Event

type Event string

Event represents a lifecycle event of a cron job.

const (
	EventCompleted Event = "completed"
	EventFailed    Event = "failed"
	EventCanceled  Event = "canceled"
)

type ExecuteRequest

type ExecuteRequest struct {
	Job       *CronJob
	SessionID string
	WorkDir   string
	Prompt    string
}

ExecuteRequest is the input for a cron job execution.

type ExecuteResult

type ExecuteResult struct {
	Response string
	Error    error
	Duration time.Duration
}

ExecuteResult is the output of a cron job execution.

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor runs cron jobs via the SessionManager.

func NewExecutor

func NewExecutor(manager intengine.SessionManager) *Executor

NewExecutor creates an Executor backed by a SessionManager.

func (*Executor) Execute

func (e *Executor) Execute(ctx context.Context, req *ExecuteRequest) *ExecuteResult

Execute runs the job prompt via the SessionManager.

type JobType

type JobType string

JobType classifies a cron job by resource intensity.

const (
	JobTypeLight             JobType = "light"
	JobTypeMedium            JobType = "medium"
	JobTypeResourceIntensive JobType = "resource-intensive"
)

type OutputFormat

type OutputFormat string

OutputFormat specifies how the job output should be formatted.

const (
	OutputFormatText       OutputFormat = "text"
	OutputFormatJSON       OutputFormat = "json"
	OutputFormatStructured OutputFormat = "structured"
)

type RunsStore

type RunsStore struct {
	// contains filtered or unexported fields
}

RunsStore persists CronRun records to runs.json with a per-job retention limit.

func NewRunsStore

func NewRunsStore(dataDir string) (*RunsStore, error)

NewRunsStore loads or creates a RunsStore at dataDir/runs.json.

func (*RunsStore) AddRun

func (rs *RunsStore) AddRun(run *CronRun) error

AddRun appends a run to the job's history, trims to the per-job limit, and persists.

func (*RunsStore) GetRuns

func (rs *RunsStore) GetRuns(jobID string) []*CronRun

GetRuns returns all runs for a job, newest first.
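The append-then-trim behavior of AddRun and the newest-first order of GetRuns can be sketched as follows. This is an illustration with an in-memory map and a reduced CronRun; addRun, newestFirst, and the limit value are hypothetical, and the actual per-job limit is not stated on this page.

```go
package main

import "fmt"

// CronRun is a reduced local copy of the documented type.
type CronRun struct {
	ID    string
	JobID string
}

// addRun appends run to its job's history and keeps only the newest
// limit entries (oldest entries are dropped first).
func addRun(history map[string][]*CronRun, run *CronRun, limit int) {
	runs := append(history[run.JobID], run)
	if limit > 0 && len(runs) > limit {
		runs = runs[len(runs)-limit:]
	}
	history[run.JobID] = runs
}

// newestFirst returns a reversed copy, leaving the stored order untouched.
func newestFirst(runs []*CronRun) []*CronRun {
	out := make([]*CronRun, len(runs))
	for i, r := range runs {
		out[len(runs)-1-i] = r
	}
	return out
}

func main() {
	history := map[string][]*CronRun{}
	for _, id := range []string{"r1", "r2", "r3", "r4", "r5"} {
		addRun(history, &CronRun{ID: id, JobID: "job-1"}, 3)
	}
	for _, r := range newestFirst(history["job-1"]) {
		fmt.Println(r.ID)
	}
}
```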

type WebhookCallback

type WebhookCallback struct {
	URL     string
	Token   string
	Timeout time.Duration
	Retry   int
}

WebhookCallback calls a remote URL on job completion or failure.

func (*WebhookCallback) OnComplete

func (wc *WebhookCallback) OnComplete(run *CronRun) error

func (*WebhookCallback) OnFail

func (wc *WebhookCallback) OnFail(run *CronRun) error
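The page does not describe the request that WebhookCallback sends. As a rough sketch only, the code below shows one plausible shape: a JSON POST of the run, with Token sent as a Bearer Authorization header, Timeout applied per request, and Retry extra attempts on failure. All of these behaviors are assumptions, as are the post, postOnce, and deliverToTestServer helpers and the 10-second fallback timeout; the types are reduced local copies.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
	"time"
)

// CronRun and WebhookCallback are reduced local copies of the documented types.
type CronRun struct {
	ID     string `json:"id"`
	JobID  string `json:"job_id"`
	Status string `json:"status"`
}

type WebhookCallback struct {
	URL     string
	Token   string
	Timeout time.Duration
	Retry   int
}

// post sends the run as JSON, making up to Retry extra attempts.
// This sketch retries immediately; a real client would likely back off.
func (wc *WebhookCallback) post(run *CronRun) error {
	body, err := json.Marshal(run)
	if err != nil {
		return err
	}
	var lastErr error
	for attempt := 0; attempt <= wc.Retry; attempt++ {
		if lastErr = wc.postOnce(body); lastErr == nil {
			return nil
		}
	}
	return lastErr
}

// postOnce performs a single request bounded by Timeout.
func (wc *WebhookCallback) postOnce(body []byte) error {
	timeout := wc.Timeout
	if timeout <= 0 {
		timeout = 10 * time.Second // assumption: fallback when unset
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, wc.URL, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	if wc.Token != "" {
		req.Header.Set("Authorization", "Bearer "+wc.Token) // assumed auth scheme
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("webhook returned %s", resp.Status)
	}
	return nil
}

// deliverToTestServer starts a local server that rejects the first failFirst
// requests with HTTP 500, posts one run to it, and returns how many requests
// the server saw together with the final delivery error.
func deliverToTestServer(failFirst, retry int) (int, error) {
	var calls int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		n := atomic.AddInt32(&calls, 1)
		if int(n) <= failFirst || r.Header.Get("Authorization") != "Bearer secret" {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	wc := &WebhookCallback{URL: srv.URL, Token: "secret", Timeout: 2 * time.Second, Retry: retry}
	err := wc.post(&CronRun{ID: "run-1", JobID: "job-1", Status: "completed"})
	return int(atomic.LoadInt32(&calls)), err
}

func main() {
	calls, err := deliverToTestServer(2, 2)
	fmt.Println("requests:", calls, "err:", err)
}
```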
