xcron

package
v1.8.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2026 License: MIT Imports: 11 Imported by: 0

README

xcron

xcron is a high-reliability scheduled task system for Agents, built on top of robfig/cron/v3. It supports single delayed triggers, periodic triggers, and standard cron expressions, with persistence, concurrency safety, and retry mechanisms.

Features

  • Trigger Modes:
    • OneShot: Execute once after a specified delay (ms precision).
    • Periodic: Execute at fixed intervals (e.g., @every 10s).
    • Cron: Standard Cron syntax (e.g., 0 0 * * * *).
  • Persistence: Auto-resume tasks after restart using JobStore (GORM supported).
  • Reliability:
    • Automatic retries with configurable count.
    • Distributed lock support via Locker interface.
    • Failure callbacks for monitoring.
  • Concurrency: Thread-safe. Optional cap: SetMaxConcurrent(n)n=1 for serial execution, n=0 or unset for parallel.

Installation

go get github.com/xichan96/cortex/xcron

Usage

1. Initialize Scheduler
import (
    "github.com/xichan96/cortex/xcron"
    "gorm.io/driver/sqlite"
    "gorm.io/gorm"
)

// Setup DB
db, _ := gorm.Open(sqlite.Open("cron.db"), &gorm.Config{})
db.AutoMigrate(&xcron.Job{})

// Create Store and Scheduler
store := xcron.NewGormJobStore(db)
scheduler := xcron.NewScheduler(store)

// Optional: serial execution (one job at a time)
scheduler.SetMaxConcurrent(1)

// Optional: Set Failure Handler
scheduler.SetFailureHandler(func(job *xcron.Job, err error) {
    fmt.Printf("Job %s failed: %v\n", job.ID, err)
})

// Start
scheduler.Start()
defer scheduler.Stop()
2. Register Task Handlers

Define business logic for different task types.

scheduler.RegisterHandler(xcron.TaskTypeGreet, func(ctx context.Context, payload string) error {
    fmt.Println("Hello user:", payload)
    return nil
})
3. Add Jobs

One-Shot (Delayed):

// Run once after 500ms
id, err := scheduler.AddJob(ctx, "greet-once", xcron.JobTypeOneShot, "500ms", xcron.TaskTypeGreet, "UserA", 3)

Periodic:

// Run every 10 seconds
id, err := scheduler.AddJob(ctx, "poll-status", xcron.JobTypePeriodic, "10s", xcron.TaskTypeFunction, nil, 0)

Cron:

// Run at top of every hour
id, err := scheduler.AddJob(ctx, "hourly-report", xcron.JobTypeCron, "0 0 * * * *", xcron.TaskTypeWorkflow, nil, 1)
4. Manage Jobs
// List jobs
jobs, count, err := scheduler.ListJobs(ctx, 0, 10)

// Stop a job
err := scheduler.StopJob(ctx, jobID)

// Remove a job (delete from DB)
err := scheduler.RemoveJob(ctx, jobID)

Metrics

Optional: implement xcron.MetricsRecorder (JobStarted, JobCompleted, JobFailed) and call scheduler.SetMetricsRecorder(rec) to wire Prometheus/OTel.

Distributed Locking

To enable distributed locking, implement the xcron.Locker interface (e.g., using Redis) and set it on the scheduler.

scheduler.SetLocker(myRedisLocker)

Testing

Run unit tests:

go test -v ./xcron/...

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentPayload

type AgentPayload struct {
	Message        string   `json:"message"`
	Model          string   `json:"model,omitempty"`
	Thinking       string   `json:"thinking,omitempty"`
	TimeoutSeconds int      `json:"timeout_seconds,omitempty"`
	MaxIterations  int      `json:"max_iterations,omitempty"`
	Tools          []string `json:"tools,omitempty"`
	Skills         []string `json:"skills,omitempty"`
}

AgentPayload defines the structure for agent-driven tasks

type GormJobStore

type GormJobStore struct {
	// contains filtered or unexported fields
}

func NewGormJobStore

func NewGormJobStore(db *gorm.DB) *GormJobStore

func (*GormJobStore) Delete

func (s *GormJobStore) Delete(ctx context.Context, id string) error

func (*GormJobStore) Get

func (s *GormJobStore) Get(ctx context.Context, id string) (*Job, error)

func (*GormJobStore) GetPendingJobs

func (s *GormJobStore) GetPendingJobs(ctx context.Context) ([]*Job, error)

func (*GormJobStore) List

func (s *GormJobStore) List(ctx context.Context, offset, limit int) ([]*Job, int64, error)

func (*GormJobStore) ListWithOptions added in v1.7.0

func (s *GormJobStore) ListWithOptions(ctx context.Context, opts ListOptions) ([]*Job, int64, error)

func (*GormJobStore) ResetStuckJobs

func (s *GormJobStore) ResetStuckJobs(ctx context.Context, timeout time.Duration) error

func (*GormJobStore) Save

func (s *GormJobStore) Save(ctx context.Context, job *Job) error

func (*GormJobStore) UpdateStatus

func (s *GormJobStore) UpdateStatus(ctx context.Context, id string, status JobStatus, lastRun *time.Time, nextRun time.Time, lastDuration time.Duration, lastError string) error

type Job

type Job struct {
	ID         string     `gorm:"primaryKey" json:"id"`
	Name       string     `json:"name"`
	Type       JobType    `json:"type"`
	SessionID  string     `json:"session_id" gorm:"index"`
	Schedule   string     `json:"schedule"` // Cron expr, duration string, or timestamp
	Status     JobStatus  `json:"status"`
	Payload    string     `json:"payload"` // JSON encoded data
	TaskType   TaskType   `json:"task_type"`
	Retries    int        `json:"retries"`
	MaxRetries int        `json:"max_retries"`
	LastRunAt  *time.Time `json:"last_run_at"`
	NextRunAt  time.Time  `json:"next_run_at" gorm:"index"`
	// Execution stats
	RunningAt    *time.Time    `json:"running_at,omitempty"`
	LastDuration time.Duration `json:"last_duration,omitempty"`
	LastError    string        `json:"last_error,omitempty"`
	Enabled      bool          `json:"enabled" gorm:"default:true"`

	CreatedAt     time.Time      `json:"created_at"`
	UpdatedAt     time.Time      `json:"updated_at"`
	DeletedAt     gorm.DeletedAt `gorm:"index" json:"-"`
	CronEntryID   int            `gorm:"-" json:"-"` // Internal cron entry ID
	ExecutionMode string         `json:"execution_mode" gorm:"column:execution_mode;default:''"`
}

Job represents a scheduled task

type JobPayload

type JobPayload struct {
	TargetID string            `json:"target_id"` // UserID or AgentID
	Data     map[string]string `json:"data"`
}

JobPayload is the generic structure for job data

type JobStatus

type JobStatus string

JobStatus defines the current state of the job

const (
	JobStatusPending   JobStatus = "pending"
	JobStatusRunning   JobStatus = "running"
	JobStatusCompleted JobStatus = "completed"
	JobStatusFailed    JobStatus = "failed"
	JobStatusStopped   JobStatus = "stopped"
)

type JobStore

type JobStore interface {
	Save(ctx context.Context, job *Job) error
	Get(ctx context.Context, id string) (*Job, error)
	Delete(ctx context.Context, id string) error
	List(ctx context.Context, offset, limit int) ([]*Job, int64, error)
	ListWithOptions(ctx context.Context, opts ListOptions) ([]*Job, int64, error)
	GetPendingJobs(ctx context.Context) ([]*Job, error)
	UpdateStatus(ctx context.Context, id string, status JobStatus, lastRun *time.Time, nextRun time.Time, lastDuration time.Duration, lastError string) error
	ResetStuckJobs(ctx context.Context, timeout time.Duration) error
}

type JobType

type JobType string

JobType defines the trigger mode of the job

const (
	JobTypeOneShot  JobType = "one_shot"
	JobTypePeriodic JobType = "periodic"
	JobTypeCron     JobType = "cron"
)

type ListOptions added in v1.7.0

type ListOptions struct {
	Status    []JobStatus
	Type      []JobType
	SessionID string
	OrderBy   string
	Limit     int
	Offset    int
}

type Locker

type Locker interface {
	Lock(ctx context.Context, key string, ttl time.Duration) (bool, error)
	Unlock(ctx context.Context, key string) error
}

type MemoryJobStore added in v1.6.13

type MemoryJobStore struct {
	// contains filtered or unexported fields
}

MemoryJobStore implements JobStore using in-memory map

func NewMemoryJobStore added in v1.6.13

func NewMemoryJobStore() *MemoryJobStore

NewMemoryJobStore creates a new in-memory job store

func (*MemoryJobStore) Delete added in v1.6.13

func (s *MemoryJobStore) Delete(ctx context.Context, id string) error

func (*MemoryJobStore) Get added in v1.6.13

func (s *MemoryJobStore) Get(ctx context.Context, id string) (*Job, error)

func (*MemoryJobStore) GetPendingJobs added in v1.6.13

func (s *MemoryJobStore) GetPendingJobs(ctx context.Context) ([]*Job, error)

func (*MemoryJobStore) List added in v1.6.13

func (s *MemoryJobStore) List(ctx context.Context, offset, limit int) ([]*Job, int64, error)

func (*MemoryJobStore) ListWithOptions added in v1.7.0

func (s *MemoryJobStore) ListWithOptions(ctx context.Context, opts ListOptions) ([]*Job, int64, error)

func (*MemoryJobStore) ResetStuckJobs added in v1.6.13

func (s *MemoryJobStore) ResetStuckJobs(ctx context.Context, timeout time.Duration) error

func (*MemoryJobStore) Save added in v1.6.13

func (s *MemoryJobStore) Save(ctx context.Context, job *Job) error

func (*MemoryJobStore) UpdateStatus added in v1.6.13

func (s *MemoryJobStore) UpdateStatus(ctx context.Context, id string, status JobStatus, lastRun *time.Time, nextRun time.Time, lastDuration time.Duration, lastError string) error

type MetricsRecorder added in v1.7.0

type MetricsRecorder interface {
	JobStarted(jobID, taskType string)
	JobCompleted(jobID, taskType string, duration time.Duration)
	JobFailed(jobID, taskType string)
}

MetricsRecorder is an optional observer for job execution (e.g. Prometheus/OTel).

type OneShotSchedule

type OneShotSchedule struct {
	TargetTime time.Time
}

Custom Schedule for OneShot

func (*OneShotSchedule) Next

func (s *OneShotSchedule) Next(t time.Time) time.Time

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(store JobStore) *Scheduler

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(ctx context.Context, name string, jobType JobType, schedule string, taskType TaskType, payload interface{}, sessionID string, maxRetries int) (string, error)

func (*Scheduler) AddJobWithOptions added in v1.7.0

func (s *Scheduler) AddJobWithOptions(ctx context.Context, name string, jobType JobType, schedule string, taskType TaskType, payload interface{}, sessionID string, maxRetries int, executionMode string) (string, error)

func (*Scheduler) ListJobs

func (s *Scheduler) ListJobs(ctx context.Context, offset, limit int) ([]*Job, int64, error)

func (*Scheduler) ListJobsWithOptions added in v1.7.0

func (s *Scheduler) ListJobsWithOptions(ctx context.Context, opts ListOptions) ([]*Job, int64, error)

func (*Scheduler) RegisterHandler

func (s *Scheduler) RegisterHandler(taskType TaskType, handler TaskHandler)

func (*Scheduler) RemoveJob

func (s *Scheduler) RemoveJob(ctx context.Context, id string) error

func (*Scheduler) SetFailureHandler

func (s *Scheduler) SetFailureHandler(h func(job *Job, err error))

func (*Scheduler) SetLocker

func (s *Scheduler) SetLocker(l Locker)

func (*Scheduler) SetMaxConcurrent added in v1.7.0

func (s *Scheduler) SetMaxConcurrent(n int)

func (*Scheduler) SetMetricsRecorder added in v1.7.0

func (s *Scheduler) SetMetricsRecorder(rec MetricsRecorder)

func (*Scheduler) SetStuckCheck added in v1.7.0

func (s *Scheduler) SetStuckCheck(interval, timeout time.Duration)

func (*Scheduler) Start

func (s *Scheduler) Start()

func (*Scheduler) Stop

func (s *Scheduler) Stop()

func (*Scheduler) StopJob

func (s *Scheduler) StopJob(ctx context.Context, id string) error

type SystemPayload

type SystemPayload struct {
	Event string `json:"event"`
	Text  string `json:"text"`
}

SystemPayload defines the structure for system events

type TaskHandler

type TaskHandler func(ctx context.Context, job *Job) error

type TaskType

type TaskType string

TaskType defines the business logic to execute

const (
	TaskTypeGreet    TaskType = "greet"
	TaskTypeReply    TaskType = "reply"
	TaskTypeWorkflow TaskType = "workflow"
	TaskTypeFunction TaskType = "function"
	TaskTypeAgent    TaskType = "agent_task"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL