scheduler

package
v0.23.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// SessionReuse reuses the same session across job executions (default).
	SessionReuse = "reuse"
	// SessionNew creates a fresh session for each execution.
	SessionNew = "new"
)
View Source
const (
	// ExecScopeSystem runs once with no user context (system workspace).
	ExecScopeSystem = "system"
	// ExecScopeUser runs once with a specific user's context (UserID must be set).
	ExecScopeUser = "user"
	// ExecScopeAllUsers fans out: runs once per active user, each with that user's context.
	ExecScopeAllUsers = "all_users"
)

ExecScope constants define how a job runs at execution time.

View Source
const (
	JobOwnerUser   = "user"
	JobOwnerPlugin = "plugin"
	JobOwnerSystem = "system"
)

Job is the persisted job definition.

View Source
const (
	RunStatusRunning = "running"
	RunStatusSuccess = "success"
	RunStatusError   = "error"
)

Variables

This section is empty.

Functions

func DecodePluginJob added in v0.9.0

func DecodePluginJob(job Job) (pluginID, key, runtimeName, description string, payload map[string]any, ok bool)

DecodePluginJob decodes the legacy reserved-message plugin job envelope. It remains only for migration of pre-schema-hardening scheduler rows.

func IsPluginJob added in v0.9.0

func IsPluginJob(job Job) bool

func RegisterBuiltin added in v0.22.0

func RegisterBuiltin(job BuiltinJob)

RegisterBuiltin registers a builtin job spec. Call from init().

func RunSessionIDFromContext added in v0.22.0

func RunSessionIDFromContext(ctx context.Context) string

func WithRunSessionID added in v0.22.0

func WithRunSessionID(ctx context.Context, sid string) context.Context

Types

type BuiltinJob added in v0.22.0

type BuiltinJob struct {
	Name        string
	Message     string
	Schedule    Schedule
	SessionMode string
	AgentID     string
	// ExecScope controls how the job runs: ExecScopeSystem (once, no user context),
	// ExecScopeUser (once for a specific user), or ExecScopeAllUsers (fan-out per active user).
	// Defaults to ExecScopeSystem when empty.
	ExecScope string
}

BuiltinJob defines a job that is automatically seeded on scheduler startup.

type ChatFunc

type ChatFunc func(ctx context.Context, sessionID, message, model string) <-chan agent.Event

ChatFunc streams runner events for heartbeat decision/execution prompts.

type Decision

type Decision struct {
	Action string `json:"action"`
	Reason string `json:"reason,omitempty"`
}

Decision is the gate-keeper response from the LLM.

type HeartbeatConfig

type HeartbeatConfig struct {
	File      string
	FastModel string
}

HeartbeatConfig holds heartbeat-specific settings.

type Job

type Job struct {
	ID          string         `json:"id"`
	OwnerKind   string         `json:"owner_kind,omitempty"`
	ExecScope   string         `json:"exec_scope,omitempty"`
	PluginID    string         `json:"plugin_id,omitempty"`
	JobKey      string         `json:"job_key,omitempty"`
	RuntimeName string         `json:"runtime_name,omitempty"`
	Name        string         `json:"name"`
	Description string         `json:"description,omitempty"`
	Schedule    Schedule       `json:"schedule"`
	Message     string         `json:"message,omitempty"`
	Payload     map[string]any `json:"payload,omitempty"`
	SessionMode string         `json:"session_mode"` // "reuse" (default) or "new"
	Enabled     bool           `json:"enabled"`
	AgentID     string         `json:"agent_id,omitempty"` // agent to route to (empty = default)
	UserID      int64          `json:"user_id,omitempty"`  // user context (0 = none)
	CreatedAt   time.Time      `json:"created_at"`
	UpdatedAt   time.Time      `json:"updated_at"`
	LastRunAt   *time.Time     `json:"last_run_at,omitempty"`
	LastError   string         `json:"last_error,omitempty"`
}

func (Job) SessionID

func (j Job) SessionID() string

SessionID returns the stable session identifier for system or single-user job executions. For all_users fan-out runs, use UserSessionID instead.

func (Job) UserSessionID added in v0.22.0

func (j Job) UserSessionID(userID int64) string

UserSessionID returns a user-scoped session ID for all_users fan-out sub-runs. In reuse mode the ID is stable per user; in new mode a timestamp suffix ensures freshness.

type JobRun added in v0.22.0

type JobRun struct {
	ID         string     `json:"id"`
	JobID      string     `json:"job_id"`
	SessionID  string     `json:"session_id"`
	UserID     int64      `json:"user_id,omitempty"`
	Status     string     `json:"status"`
	StartedAt  time.Time  `json:"started_at"`
	FinishedAt *time.Time `json:"finished_at,omitempty"`
	Error      string     `json:"error,omitempty"`
}

type ListActiveUsersFunc added in v0.22.0

type ListActiveUsersFunc func(ctx context.Context) ([]int64, error)

ListActiveUsersFunc returns the IDs of all currently active users. Used by the scheduler to fan out ExecScopeAllUsers jobs.

type OnJobFunc

type OnJobFunc func(ctx context.Context, job Job) error

OnJobFunc is called when a scheduled job fires.

type Schedule

type Schedule struct {
	Cron  string `json:"cron,omitempty"`  // "0 9 * * 1-5"
	Every string `json:"every,omitempty"` // "30m", "2h"
	At    string `json:"at,omitempty"`    // RFC3339: "2024-01-15T14:30:00+08:00"
}

Schedule defines when a job runs. Exactly one field must be set.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages scheduled jobs backed by gocron/v2 with database persistence.

func New

func New(db *sql.DB) (*Service, error)

New creates a scheduler service backed by the given database. Call Start to load persisted jobs and begin scheduling.

func NewFromPath

func NewFromPath(dbPath string) (*Service, error)

NewFromPath creates a scheduler service that opens its own SQLite database at the given path. The database is closed when Stop is called.

func (*Service) AddJob

func (s *Service) AddJob(name, message string, sched Schedule, sessionMode string) (Job, error)

AddJob creates, persists, and schedules a new system-scoped job (no user context). sessionMode controls session reuse: "reuse" (default) or "new".

func (*Service) AddJobForContext added in v0.12.0

func (s *Service) AddJobForContext(ctx context.Context, name, message string, sched Schedule, sessionMode string) (Job, error)

AddJobForContext creates a user-owned job bound to the current execution context. When the caller context carries agent/user scope, scheduled executions inherit it.

func (*Service) AddJobWithOwner added in v0.22.0

func (s *Service) AddJobWithOwner(name, message string, sched Schedule, sessionMode, agentID string, userID int64) (Job, error)

AddJobWithOwner creates a user-owned job with explicit owner parameters. Use AddJobForContext when a Go context carries agent/user scope.

func (*Service) AddOnJobListener added in v0.9.0

func (s *Service) AddOnJobListener(fn OnJobFunc)

AddOnJobListener appends an additional callback invoked when a job fires.

func (*Service) AddPluginJob added in v0.9.0

func (s *Service) AddPluginJob(pluginID, key, runtimeName, name, description string, sched Schedule, payload map[string]any) (Job, error)

AddPluginJob creates, persists, and schedules a plugin-owned job.

func (*Service) EnsureBuiltinJobs added in v0.22.0

func (s *Service) EnsureBuiltinJobs()

EnsureBuiltinJobs creates or updates all builtin jobs (one DB row per job regardless of ExecScope). For ExecScopeAllUsers jobs the scheduler fans out to all active users at execution time.

func (*Service) EnsureJob added in v0.22.0

func (s *Service) EnsureJob(name, message string, sched Schedule, sessionMode, agentID, execScope string) (Job, error)

EnsureJob creates a job if no job with the same name exists, or updates the existing job when any field has changed. It is intended for builtin jobs that should be seeded on startup. Jobs created by EnsureJob are owned by the system (JobOwnerSystem).

func (*Service) ListJobRuns added in v0.22.0

func (s *Service) ListJobRuns(ctx context.Context, jobID string, limit int) ([]JobRun, error)

ListJobRuns returns recent runs for a job.

func (*Service) ListJobs

func (s *Service) ListJobs() []Job

ListJobs returns all jobs.

func (*Service) RemoveJob

func (s *Service) RemoveJob(id string) error

RemoveJob unschedules and removes a job.

func (*Service) RunJobNow added in v0.22.0

func (s *Service) RunJobNow(ctx context.Context, jobID string) (string, error)

RunJobNow triggers an immediate execution of the given job asynchronously. For ExecScopeAllUsers jobs all user sub-runs are launched; no run ID is returned. For other scopes, returns the run ID of the newly created run record. Returns errJobAlreadyRunning (wrapped) if a run is already active for the job.

func (*Service) ScheduleEvery

func (s *Service) ScheduleEvery(ctx context.Context, every string, fn TaskFunc) error

ScheduleEvery registers a non-persisted recurring task on the existing scheduler.

func (*Service) SetHeartbeat

func (s *Service) SetHeartbeat(cfg HeartbeatConfig, chat ChatFunc, notifier notify.Notifier)

SetHeartbeat configures the heartbeat on the scheduler service.

func (*Service) SetLegacyDataPath

func (s *Service) SetLegacyDataPath(path string)

SetLegacyDataPath sets the directory where the legacy jobs.json file may exist. If set, Start will attempt a one-time migration from file to DB.

func (*Service) SetListActiveUsersFunc added in v0.22.0

func (s *Service) SetListActiveUsersFunc(fn ListActiveUsersFunc)

SetListActiveUsersFunc registers the function used to enumerate active users when fanning out ExecScopeAllUsers jobs.

func (*Service) SetOnJob

func (s *Service) SetOnJob(fn OnJobFunc)

SetOnJob sets the primary callback invoked when a job fires.

func (*Service) SetUserJobsEnabled added in v0.9.0

func (s *Service) SetUserJobsEnabled(enabled bool)

SetUserJobsEnabled controls whether persisted user-owned and all_users scheduler jobs are loaded.

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

Start loads persisted jobs and starts the scheduler.

func (*Service) StartEphemeral

func (s *Service) StartEphemeral(ctx context.Context) error

StartEphemeral starts the shared scheduler without loading persisted jobs. Use this when the scheduler is only needed for internal tasks such as heartbeat.

func (*Service) StartHeartbeat

func (s *Service) StartHeartbeat(ctx context.Context, every string) error

StartHeartbeat schedules the heartbeat poll on the shared scheduler.

func (*Service) Stop

func (s *Service) Stop() error

Stop shuts down the scheduler and closes the database if owned.

type TaskFunc

type TaskFunc func(ctx context.Context)

TaskFunc is a lightweight scheduled callback that is not persisted as a scheduled job.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL