scheduler

package
v0.8.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package anticipation provides storage and matching for agent anticipations. Anticipations are "things I'm expecting to happen" that bridge scheduled/event wakes to purpose — the agent knows *why* it woke up.

Package scheduler handles future task scheduling and execution.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FormatMatchedContext added in v0.8.0

func FormatMatchedContext(matched []*Anticipation) string

FormatMatchedContext builds the context injection text for matched anticipations.

func IsAnticipationTool added in v0.8.0

func IsAnticipationTool(name string) bool

IsAnticipationTool reports whether the named tool is an anticipation tool.

func MarshalToolCall added in v0.8.0

func MarshalToolCall(name string, args map[string]any) string

MarshalToolCall converts a tool call to JSON for logging.

func NewID

func NewID() string

NewID generates a new UUIDv7.

Types

type Anticipation added in v0.8.0

type Anticipation struct {
	ID              string            `json:"id"`
	Description     string            `json:"description"`                // Human-readable: "Dan's flight arriving"
	Context         string            `json:"context"`                    // Injected on match: instructions/reasoning
	ContextEntities []string          `json:"context_entities,omitempty"` // Entity IDs to snapshot on wake
	Recurring       bool              `json:"recurring,omitempty"`        // true = keep firing; false = auto-resolve after wake
	CooldownSeconds int               `json:"cooldown_seconds,omitempty"` // 0 = use global default
	Model           string            `json:"model,omitempty"`            // Soft model preference for wake execution
	LocalOnly       *bool             `json:"local_only,omitempty"`       // nil = use default (true); explicit false allows cloud
	QualityFloor    int               `json:"quality_floor,omitempty"`    // 0 = use wake default (6)
	Trigger         Trigger           `json:"trigger"`                    // When this anticipation activates
	CreatedAt       time.Time         `json:"created_at"`
	ExpiresAt       *time.Time        `json:"expires_at,omitempty"` // nil = no expiration
	ResolvedAt      *time.Time        `json:"resolved_at,omitempty"`
	LastFiredAt     *time.Time        `json:"last_fired_at,omitempty"`
	Metadata        map[string]string `json:"metadata,omitempty"` // Arbitrary k/v for matching
}

Anticipation represents something the agent is expecting/waiting for.

type AnticipationProvider added in v0.8.0

type AnticipationProvider struct {
	// contains filtered or unexported fields
}

Provider implements agent.ContextProvider for anticipation context injection. It checks for matching anticipations on each agent wake and injects relevant context into the system prompt. All methods are safe for concurrent use.

func NewAnticipationProvider added in v0.8.0

func NewAnticipationProvider(store *AnticipationStore) *AnticipationProvider

NewProvider creates an anticipation context provider.

func (*AnticipationProvider) ClearWakeContext added in v0.8.0

func (p *AnticipationProvider) ClearWakeContext()

ClearWakeContext resets the wake context after processing.

func (*AnticipationProvider) GetContext added in v0.8.0

func (p *AnticipationProvider) GetContext(ctx context.Context, userMessage string) (string, error)

GetContext implements agent.ContextProvider. Returns formatted anticipation context for any matching active anticipations.

func (*AnticipationProvider) SetWakeContext added in v0.8.0

func (p *AnticipationProvider) SetWakeContext(ctx WakeContext)

SetWakeContext updates the current wake context for matching. Call this before processing a message when you have event context.

type AnticipationStore added in v0.8.0

type AnticipationStore struct {
	// contains filtered or unexported fields
}

Store manages anticipation persistence.

func NewAnticipationStore added in v0.8.0

func NewAnticipationStore(db *sql.DB) (*AnticipationStore, error)

NewStore creates a new anticipation store.

func (*AnticipationStore) Active added in v0.8.0

func (s *AnticipationStore) Active() ([]*Anticipation, error)

Active returns all non-resolved, non-expired, non-deleted anticipations.

func (*AnticipationStore) All added in v0.8.0

func (s *AnticipationStore) All() ([]*Anticipation, error)

All returns all non-deleted anticipations, including resolved and expired. This is used by the dashboard to show a complete view of anticipation history.

func (*AnticipationStore) Create added in v0.8.0

func (s *AnticipationStore) Create(a *Anticipation) error

Create adds a new anticipation.

func (*AnticipationStore) Delete added in v0.8.0

func (s *AnticipationStore) Delete(id string) error

Delete soft-deletes an anticipation.

func (*AnticipationStore) Get added in v0.8.0

func (s *AnticipationStore) Get(id string) (*Anticipation, error)

Get retrieves a single anticipation by ID.

func (*AnticipationStore) MarkFired added in v0.8.0

func (s *AnticipationStore) MarkFired(id string) error

MarkFired records the current time as the last fire time for the anticipation. This persists across restarts unlike the previous in-memory tracking.

func (*AnticipationStore) Match added in v0.8.0

func (s *AnticipationStore) Match(ctx WakeContext) ([]*Anticipation, error)

Match checks which active anticipations match the current wake context. Returns matched anticipations with their injected context.

func (*AnticipationStore) OnCooldown added in v0.8.0

func (s *AnticipationStore) OnCooldown(id string, globalDefault time.Duration) (bool, error)

OnCooldown reports whether the anticipation has fired too recently. If the anticipation has a per-row cooldown_seconds > 0, that value is used; otherwise globalDefault applies. Returns false, nil if the anticipation has never fired or does not exist. Database errors other than sql.ErrNoRows are returned so the caller can decide how to handle them.

func (*AnticipationStore) Resolve added in v0.8.0

func (s *AnticipationStore) Resolve(id string) error

Resolve marks an anticipation as resolved (it happened).

type AnticipationTools added in v0.8.0

type AnticipationTools struct {
	// contains filtered or unexported fields
}

Tools provides anticipation management tools for the agent.

func NewAnticipationTools added in v0.8.0

func NewAnticipationTools(store *AnticipationStore) *AnticipationTools

NewTools creates anticipation tools backed by the given store.

func (*AnticipationTools) Execute added in v0.8.0

func (t *AnticipationTools) Execute(name string, args map[string]any) (string, error)

Execute runs an anticipation tool and returns the result.

func (*AnticipationTools) ToolDefinitions added in v0.8.0

func (t *AnticipationTools) ToolDefinitions() []map[string]any

ToolDefinitions returns the tool schemas for LLM function calling.

type Duration

type Duration struct {
	time.Duration
}

Duration wraps time.Duration for JSON serialization.

func (Duration) MarshalJSON

func (d Duration) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler.

func (*Duration) UnmarshalJSON

func (d *Duration) UnmarshalJSON(b []byte) error

UnmarshalJSON implements json.Unmarshaler.

type ExecuteFunc

type ExecuteFunc func(ctx context.Context, task *Task, execution *Execution) error

ExecuteFunc is called when a task fires.

type Execution

type Execution struct {
	ID          string          `json:"id"`           // UUIDv7
	TaskID      string          `json:"task_id"`      // FK to Task
	ScheduledAt time.Time       `json:"scheduled_at"` // When it was supposed to run
	StartedAt   *time.Time      `json:"started_at,omitempty"`
	CompletedAt *time.Time      `json:"completed_at,omitempty"`
	Status      ExecutionStatus `json:"status"`
	Result      string          `json:"result,omitempty"` // Output or error
}

Execution represents a single run of a task.

type ExecutionStatus

type ExecutionStatus string

ExecutionStatus indicates the state of an execution.

const (
	StatusPending   ExecutionStatus = "pending"
	StatusRunning   ExecutionStatus = "running"
	StatusCompleted ExecutionStatus = "completed"
	StatusFailed    ExecutionStatus = "failed"
	StatusSkipped   ExecutionStatus = "skipped" // Missed window, chose not to catch up
)

type Payload

type Payload struct {
	Kind   PayloadKind    `json:"kind"`
	Target string         `json:"target,omitempty"` // Session ID, entity ID, etc.
	Data   map[string]any `json:"data,omitempty"`   // Kind-specific data
}

Payload defines what action to take when a task fires.

type PayloadKind

type PayloadKind string

PayloadKind identifies the payload type.

const (
	PayloadWake       PayloadKind = "wake"       // Wake the agent with a message
	PayloadService    PayloadKind = "service"    // Call an HA service
	PayloadAutomation PayloadKind = "automation" // Trigger an HA automation
	PayloadWebhook    PayloadKind = "webhook"    // Call external webhook
)

type Schedule

type Schedule struct {
	Kind     ScheduleKind `json:"kind"`
	At       *time.Time   `json:"at,omitempty"`       // For "at" kind
	Every    *Duration    `json:"every,omitempty"`    // For "every" kind
	Cron     string       `json:"cron,omitempty"`     // For "cron" kind
	Timezone string       `json:"timezone,omitempty"` // IANA timezone
}

Schedule defines when a task should run.

type ScheduleKind

type ScheduleKind string

ScheduleKind identifies the schedule type.

const (
	ScheduleAt    ScheduleKind = "at"    // One-shot at specific time
	ScheduleEvery ScheduleKind = "every" // Recurring interval
	ScheduleCron  ScheduleKind = "cron"  // Cron expression
)

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler manages task scheduling and execution.

func New

func New(logger *slog.Logger, store *Store, execute ExecuteFunc) *Scheduler

New creates a new scheduler.

func (*Scheduler) CreateTask

func (s *Scheduler) CreateTask(task *Task) error

CreateTask adds a new task and schedules it.

func (*Scheduler) DeleteTask

func (s *Scheduler) DeleteTask(id string) error

DeleteTask removes a task.

func (*Scheduler) GetAllTasks

func (s *Scheduler) GetAllTasks() ([]*Task, error)

GetAllTasks returns all tasks for checkpointing.

func (*Scheduler) GetTask

func (s *Scheduler) GetTask(id string) (*Task, error)

GetTask retrieves a task by ID.

func (*Scheduler) GetTaskExecutions

func (s *Scheduler) GetTaskExecutions(taskID string, limit int) ([]*Execution, error)

GetTaskExecutions returns execution history for a task.

func (*Scheduler) ListTasks

func (s *Scheduler) ListTasks(enabledOnly bool) ([]*Task, error)

ListTasks returns all tasks.

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

Start begins the scheduler, loading tasks and setting up timers.

func (*Scheduler) Stats

func (s *Scheduler) Stats() map[string]any

Stats returns scheduler statistics.

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop halts the scheduler.

func (*Scheduler) TriggerTask

func (s *Scheduler) TriggerTask(ctx context.Context, taskID string) (*Execution, error)

TriggerTask immediately executes a task (bypassing schedule).

func (*Scheduler) UpdateTask

func (s *Scheduler) UpdateTask(task *Task) error

UpdateTask modifies a task and reschedules it.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store handles task and execution persistence.

func NewStore

func NewStore(dbPath string) (*Store, error)

NewStore creates a scheduler store with SQLite backend.

func (*Store) Close

func (s *Store) Close() error

Close closes the database connection.

func (*Store) CreateExecution

func (s *Store) CreateExecution(e *Execution) error

CreateExecution records a new execution.

func (*Store) CreateTask

func (s *Store) CreateTask(t *Task) error

CreateTask persists a new task.

func (*Store) DeleteTask

func (s *Store) DeleteTask(id string) error

DeleteTask removes a task and its executions.

func (*Store) GetExecution

func (s *Store) GetExecution(id string) (*Execution, error)

GetExecution retrieves an execution by ID.

func (*Store) GetPendingExecutions

func (s *Store) GetPendingExecutions() ([]*Execution, error)

GetPendingExecutions returns executions that need to run.

func (*Store) GetTask

func (s *Store) GetTask(id string) (*Task, error)

GetTask retrieves a task by ID.

func (*Store) GetTaskByName added in v0.7.0

func (s *Store) GetTaskByName(name string) (*Task, error)

GetTaskByName retrieves a task by its human-readable name. Returns nil, nil when no task with the given name exists. If multiple tasks share the same name (which should not happen), returns an error to surface the data integrity problem.

func (*Store) ListExecutions

func (s *Store) ListExecutions(taskID string, limit int) ([]*Execution, error)

ListExecutions returns executions for a task.

func (*Store) ListTasks

func (s *Store) ListTasks(enabledOnly bool) ([]*Task, error)

ListTasks returns all tasks, optionally filtered by enabled status.

func (*Store) UpdateExecution

func (s *Store) UpdateExecution(e *Execution) error

UpdateExecution updates an execution record.

func (*Store) UpdateTask

func (s *Store) UpdateTask(t *Task) error

UpdateTask updates an existing task.

type Task

type Task struct {
	ID        string    `json:"id"`       // UUIDv7
	Name      string    `json:"name"`     // Human-readable label
	Schedule  Schedule  `json:"schedule"` // When to run
	Payload   Payload   `json:"payload"`  // What to do
	Enabled   bool      `json:"enabled"`
	CreatedAt time.Time `json:"created_at"`
	CreatedBy string    `json:"created_by"` // Session or user ID
	UpdatedAt time.Time `json:"updated_at"`
}

Task is the definition of a scheduled action.

func (*Task) NextRun

func (t *Task) NextRun(after time.Time) (time.Time, bool)

NextRun calculates the next execution time for a task.

type Trigger added in v0.8.0

type Trigger struct {
	// Time-based: activates after this time
	AfterTime *time.Time `json:"after_time,omitempty"`

	// State-based: activates when entity matches state
	EntityID    string `json:"entity_id,omitempty"`    // e.g., "person.dan"
	EntityState string `json:"entity_state,omitempty"` // e.g., "home" or state to match

	// Zone-based: activates on zone transition
	Zone       string `json:"zone,omitempty"`        // e.g., "airport"
	ZoneAction string `json:"zone_action,omitempty"` // "enter" or "leave"

	// Event-based: activates on specific event type
	EventType string `json:"event_type,omitempty"` // e.g., "presence_change"

	// Custom expression (future: CEL or simple DSL)
	Expression string `json:"expression,omitempty"`
}

Trigger defines conditions for when an anticipation activates.

type WakeContext added in v0.8.0

type WakeContext struct {
	Time        time.Time
	EventType   string            // What triggered the wake: "cron", "presence", "state_change", etc.
	EntityID    string            // Relevant entity if any
	EntityState string            // Current state of that entity
	Zone        string            // Zone involved if presence event
	ZoneAction  string            // "enter" or "leave"
	Metadata    map[string]string // Additional context
}

WakeContext represents the current state when the agent wakes.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL