automation

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2026 License: MIT Imports: 28 Imported by: 0

Documentation

Overview

Package automation hosts the runtime automation components and re-exports the canonical automation model from `internal/automation/model`.

Index

Constants

View Source
const (
	// JobResourceKind is the canonical desired-state kind for scheduled automation jobs.
	JobResourceKind resources.ResourceKind = "automation.job"
	// TriggerResourceKind is the canonical desired-state kind for event-driven automation triggers.
	TriggerResourceKind resources.ResourceKind = "automation.trigger"
)
View Source
const (
	// AutomationScopeGlobal targets daemon-wide automation without a workspace binding.
	AutomationScopeGlobal = modelpkg.AutomationScopeGlobal
	// AutomationScopeWorkspace targets automation bound to a specific workspace.
	AutomationScopeWorkspace = modelpkg.AutomationScopeWorkspace
)
View Source
const (
	// JobSourceConfig identifies a TOML-backed automation definition.
	JobSourceConfig = modelpkg.JobSourceConfig
	// JobSourcePackage identifies a daemon-managed extension bundle definition.
	JobSourcePackage = modelpkg.JobSourcePackage
	// JobSourceDynamic identifies a runtime-created automation definition.
	JobSourceDynamic = modelpkg.JobSourceDynamic
)
View Source
const (
	// ScheduleModeCron evaluates a cron expression.
	ScheduleModeCron = modelpkg.ScheduleModeCron
	// ScheduleModeEvery evaluates a Go duration interval.
	ScheduleModeEvery = modelpkg.ScheduleModeEvery
	// ScheduleModeAt evaluates a one-shot RFC3339 timestamp.
	ScheduleModeAt = modelpkg.ScheduleModeAt
)
View Source
const (
	// RetryStrategyNone disables retries after a failed run.
	RetryStrategyNone = modelpkg.RetryStrategyNone
	// RetryStrategyBackoff retries failed runs with exponential backoff.
	RetryStrategyBackoff = modelpkg.RetryStrategyBackoff
)
View Source
const (
	// RunScheduled reports a run that has been accepted but not yet started.
	RunScheduled = modelpkg.RunScheduled
	// RunRunning reports a run that is actively dispatching or executing.
	RunRunning = modelpkg.RunRunning
	// RunDelegated reports a run that delegated execution into the task domain.
	RunDelegated = modelpkg.RunDelegated
	// RunCompleted reports a run that finished successfully.
	RunCompleted = modelpkg.RunCompleted
	// RunFailed reports a run that finished with an error.
	RunFailed = modelpkg.RunFailed
	// RunCancelled reports a run that was canceled before completion.
	RunCancelled = modelpkg.RunCancelled
)
View Source
const (
	// ActivationSourceObserver identifies observer-backed trigger ingress.
	ActivationSourceObserver = modelpkg.ActivationSourceObserver
	// ActivationSourceHook identifies hook-backed trigger ingress.
	ActivationSourceHook = modelpkg.ActivationSourceHook
	// ActivationSourceWebhook identifies external webhook ingress.
	ActivationSourceWebhook = modelpkg.ActivationSourceWebhook
	// ActivationSourceExtension identifies extension-provided ingress.
	ActivationSourceExtension = modelpkg.ActivationSourceExtension
)
View Source
const DefaultMaxConcurrentJobs = modelpkg.DefaultMaxConcurrentJobs

DefaultMaxConcurrentJobs is the default global automation concurrency limit.

View Source
const DefaultTimezone = modelpkg.DefaultTimezone

DefaultTimezone is the default schedule timezone used by automation config.

View Source
const DefaultWebhookFreshnessWindow = 5 * time.Minute

DefaultWebhookFreshnessWindow is the default accepted clock skew for webhook requests.

View Source
const (
	// SchedulerCatchUpPolicySkipMissed advances missed cursors without dispatching stale fires.
	SchedulerCatchUpPolicySkipMissed = modelpkg.SchedulerCatchUpPolicySkipMissed
)

Variables

View Source
var (
	// ErrConcurrencyLimitReached reports that the shared automation gate rejected a new run.
	ErrConcurrencyLimitReached = errors.New("automation: global concurrency limit reached")
	// ErrFireLimitReached reports that a definition exceeded its rolling fire-limit window.
	ErrFireLimitReached = errors.New("automation: fire limit reached")
)
View Source
var (
	// ErrManagerNotRunning reports that a runtime-only manager action was called
	// before Start or after Shutdown.
	ErrManagerNotRunning = errors.New("automation: manager not running")
	// ErrDefinitionReadOnly reports that a managed definition cannot be
	// mutated through the runtime CRUD surface.
	ErrDefinitionReadOnly = errors.New("automation: definition is managed and read-only")
	// ErrSessionTaskActorNotFound reports that no automation-linked task actor
	// context is recorded for the supplied session.
	ErrSessionTaskActorNotFound = errors.New("automation: session task actor context not found")
)
View Source
var (
	// ErrJobNotFound reports that the requested automation job does not exist.
	ErrJobNotFound = modelpkg.ErrJobNotFound
	// ErrTriggerNotFound reports that the requested automation trigger does not exist.
	ErrTriggerNotFound = modelpkg.ErrTriggerNotFound
	// ErrRunNotFound reports that the requested automation run does not exist.
	ErrRunNotFound = modelpkg.ErrRunNotFound
	// ErrRunAlreadyExists reports that an automation run identity has already been claimed.
	ErrRunAlreadyExists = modelpkg.ErrRunAlreadyExists
	// ErrSchedulerStateNotFound reports that no durable scheduler cursor exists for a job.
	ErrSchedulerStateNotFound = modelpkg.ErrSchedulerStateNotFound
	// ErrScheduledFireAlreadyClaimed reports that a scheduled fire identity was already claimed.
	ErrScheduledFireAlreadyClaimed = modelpkg.ErrScheduledFireAlreadyClaimed
	// ErrJobNameTaken reports a duplicate job name within the same automation scope.
	ErrJobNameTaken = modelpkg.ErrJobNameTaken
	// ErrTriggerNameTaken reports a duplicate trigger name within the same automation scope.
	ErrTriggerNameTaken = modelpkg.ErrTriggerNameTaken
	// ErrTriggerWebhookIDTaken reports a duplicate stable webhook identifier.
	ErrTriggerWebhookIDTaken = modelpkg.ErrTriggerWebhookIDTaken
	// ErrOverlayRequiresConfigSource reports that enabled overlays only apply to TOML-backed definitions.
	ErrOverlayRequiresConfigSource = modelpkg.ErrOverlayRequiresConfigSource
	// ErrJobOverlayNotFound reports that a job enabled overlay row does not exist.
	ErrJobOverlayNotFound = modelpkg.ErrJobOverlayNotFound
	// ErrTriggerOverlayNotFound reports that a trigger enabled overlay row does not exist.
	ErrTriggerOverlayNotFound = modelpkg.ErrTriggerOverlayNotFound
)
View Source
var (
	// ErrScheduledJobNotFound reports that a scheduler registration does not exist.
	ErrScheduledJobNotFound = errors.New("automation: scheduled job not found")
	// ErrScheduledJobAlreadyRegistered reports that the job is already registered with the scheduler.
	ErrScheduledJobAlreadyRegistered = errors.New("automation: scheduled job already registered")
	// ErrSchedulerStopped reports that the scheduler has already been stopped and cannot accept new work.
	ErrSchedulerStopped = errors.New("automation: scheduler stopped")
)
View Source
var (
	// ErrTriggerAlreadyRegistered reports that the trigger id already exists in the runtime.
	ErrTriggerAlreadyRegistered = errors.New("automation: trigger already registered")
	// ErrTriggerEngineStopped reports that the trigger engine has already been stopped.
	ErrTriggerEngineStopped = errors.New("automation: trigger engine stopped")
	// ErrWebhookEndpointInvalid reports that a webhook endpoint value cannot be normalized.
	ErrWebhookEndpointInvalid = errors.New("automation: invalid webhook endpoint")
	// ErrWebhookTriggerNotRegistered reports that no runtime webhook registration matches the endpoint id.
	ErrWebhookTriggerNotRegistered = errors.New("automation: webhook trigger not registered")
	// ErrWebhookTimestampInvalid reports that a webhook timestamp is outside the accepted freshness window.
	ErrWebhookTimestampInvalid = errors.New("automation: webhook timestamp outside freshness window")
	// ErrWebhookSignatureInvalid reports that a webhook signature does not match the expected HMAC.
	ErrWebhookSignatureInvalid = errors.New("automation: webhook signature invalid")
	// ErrWebhookSecretRequired reports that a webhook registration did not provide auth material.
	ErrWebhookSecretRequired = errors.New("automation: webhook secret is required")
	// ErrWebhookReplayDetected reports that the same authenticated delivery id
	// was already processed within the replay window.
	ErrWebhookReplayDetected = errors.New("automation: webhook delivery already processed")
)

Functions

func FormatWebhookEndpoint

func FormatWebhookEndpoint(endpointSlug string, webhookID string) (string, error)

FormatWebhookEndpoint returns the stable public endpoint segment for one webhook registration.

func NewJobResourceCodec

func NewJobResourceCodec() (resources.KindCodec[Job], error)

NewJobResourceCodec builds the typed codec for automation.job records.

func NewTriggerResourceCodec

func NewTriggerResourceCodec() (resources.KindCodec[Trigger], error)

NewTriggerResourceCodec builds the typed codec for automation.trigger records.

func ParseTriggerPromptTemplate

func ParseTriggerPromptTemplate(prompt string) (*template.Template, error)

ParseTriggerPromptTemplate parses a trigger prompt template with strict activation-envelope validation.

func ResourceScopeForAutomation

func ResourceScopeForAutomation(scope Scope, workspaceID string) resources.ResourceScope

ResourceScopeForAutomation converts automation scope fields into the shared resource scope.

func SignWebhookPayload

func SignWebhookPayload(secret string, timestamp time.Time, payload []byte) (string, error)

SignWebhookPayload calculates the expected HMAC signature for a webhook request payload.

func ValidateScopeBinding

func ValidateScopeBinding(scope Scope, workspaceBinding string, path string, workspaceField string) error

ValidateScopeBinding enforces the global/workspace binding invariants shared by jobs, triggers, and envelopes.

func ValidateTriggerFilter

func ValidateTriggerFilter(filter map[string]string, path string) error

ValidateTriggerFilter ensures trigger filters only reference supported activation-envelope field paths.

func ValidateTriggerPromptTemplate

func ValidateTriggerPromptTemplate(prompt string) error

ValidateTriggerPromptTemplate validates a trigger prompt template against the normalized activation-envelope model.

func ValidateWebhookSignature

func ValidateWebhookSignature(secret string, timestamp time.Time, payload []byte, signature string) error

ValidateWebhookSignature verifies the provided signature before any trigger dispatch occurs.

func ValidateWebhookTimestamp

func ValidateWebhookTimestamp(timestamp time.Time, now time.Time, window time.Duration) error

ValidateWebhookTimestamp rejects stale or far-future webhook timestamps.

Types

type ActivationEnvelope

type ActivationEnvelope = modelpkg.ActivationEnvelope

ActivationEnvelope is the normalized trigger input regardless of source.

type ActivationSource

type ActivationSource = modelpkg.ActivationSource

ActivationSource identifies which ingress path produced an activation envelope.

type DispatchKind

type DispatchKind string

DispatchKind identifies which activation path produced a dispatch request.

const (
	// DispatchKindSchedule identifies time-based schedule execution.
	DispatchKindSchedule DispatchKind = "schedule"
	// DispatchKindTrigger identifies event-driven trigger execution.
	DispatchKindTrigger DispatchKind = "trigger"
	// DispatchKindManual identifies explicit user-initiated job execution.
	DispatchKindManual DispatchKind = "manual"
	// DispatchKindExtension identifies extension-fired automation execution.
	DispatchKindExtension DispatchKind = "extension"
)

func (DispatchKind) Validate

func (k DispatchKind) Validate(path string) error

Validate ensures the dispatch kind is one of the supported activation paths.

type DispatchRequest

type DispatchRequest struct {
	Kind        DispatchKind        `json:"kind"`
	Job         *Job                `json:"job,omitempty"`
	Trigger     *Trigger            `json:"trigger,omitempty"`
	Envelope    *ActivationEnvelope `json:"envelope,omitempty"`
	Payload     map[string]any      `json:"payload,omitempty"`
	Prompt      string              `json:"prompt,omitempty"`
	ReservedRun *Run                `json:"-"`
}

DispatchRequest describes one normalized automation execution attempt.

Exactly one of Job or Trigger must be provided. Triggers also require an activation envelope so prompt templates can render against the normalized trigger payload. Manual job dispatch can carry caller-supplied payload for job lifecycle hooks. Prompt allows later callers to inject a pre-render override after pre-fire hooks patch the outbound prompt.

func (DispatchRequest) Validate

func (r DispatchRequest) Validate(path string) error

Validate ensures the request can be executed by the shared dispatcher.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher routes every automation activation through one execution path.

func NewDispatcher

func NewDispatcher(sessions SessionCreator, runs RunStore, opts ...DispatcherOption) (*Dispatcher, error)

NewDispatcher constructs a shared automation dispatcher.

func (*Dispatcher) Dispatch

func (d *Dispatcher) Dispatch(ctx context.Context, req DispatchRequest) (*Run, error)

Dispatch executes one automation request through the shared governance path.

type DispatcherOption

type DispatcherOption func(*Dispatcher)

DispatcherOption customizes shared automation dispatch behavior.

func WithDispatcherGlobalWorkspacePath

func WithDispatcherGlobalWorkspacePath(path string) DispatcherOption

WithDispatcherGlobalWorkspacePath overrides the fallback path used for global automations.

func WithDispatcherHooks

func WithDispatcherHooks(hooks HookDispatcher) DispatcherOption

WithDispatcherHooks injects the automation lifecycle hook dispatcher.

func WithDispatcherLogger

func WithDispatcherLogger(logger *slog.Logger) DispatcherOption

WithDispatcherLogger overrides the dispatcher logger.

func WithDispatcherMaxConcurrent

func WithDispatcherMaxConcurrent(limit int) DispatcherOption

WithDispatcherMaxConcurrent overrides the shared automation concurrency gate.

func WithDispatcherNow

func WithDispatcherNow(now func() time.Time) DispatcherOption

WithDispatcherNow overrides the dispatcher clock.

func WithDispatcherSessionStopTimeout

func WithDispatcherSessionStopTimeout(timeout time.Duration) DispatcherOption

WithDispatcherSessionStopTimeout overrides the automation session stop budget.

func WithDispatcherSleep

func WithDispatcherSleep(sleep SleepFunc) DispatcherOption

WithDispatcherSleep overrides retry waiting, mainly for tests.

func WithDispatcherTaskActorRecorder

func WithDispatcherTaskActorRecorder(recorder SessionTaskActorRecorder) DispatcherOption

WithDispatcherTaskActorRecorder injects the session provenance recorder used to support automation-linked agent task creation.

func WithDispatcherTasks

func WithDispatcherTasks(tasks TaskService) DispatcherOption

WithDispatcherTasks injects the task-domain service used for direct task-backed automation jobs.

type ExtensionTriggerRequest

type ExtensionTriggerRequest struct {
	Event       string         `json:"event"`
	Scope       Scope          `json:"scope"`
	WorkspaceID string         `json:"workspace_id,omitempty"`
	Payload     map[string]any `json:"payload,omitempty"`
}

ExtensionTriggerRequest describes one extension-originated trigger fire.

func (ExtensionTriggerRequest) Validate

func (r ExtensionTriggerRequest) Validate(path string) error

Validate ensures the extension trigger request matches the ext.* ingress contract.

type FireLimitConfig

type FireLimitConfig = modelpkg.FireLimitConfig

FireLimitConfig caps how often a job or trigger may fire within a rolling window.

func DefaultFireLimitConfig

func DefaultFireLimitConfig() FireLimitConfig

DefaultFireLimitConfig returns the default rolling fire-limit policy.

type FireLimitError

type FireLimitError struct {
	Count   int64
	Limit   int
	Window  time.Duration
	RetryAt time.Time
}

FireLimitError carries the next eligible retry instant for fire-limit backoff.

func (*FireLimitError) Error

func (e *FireLimitError) Error() string

func (*FireLimitError) Unwrap

func (e *FireLimitError) Unwrap() error

type HookDispatcher

type HookDispatcher interface {
	DispatchAutomationJobPreFire(
		ctx context.Context,
		payload hookspkg.AutomationJobPreFirePayload,
	) (hookspkg.AutomationJobPreFirePayload, error)
	DispatchAutomationJobPostFire(
		ctx context.Context,
		payload hookspkg.AutomationJobPostFirePayload,
	) (hookspkg.AutomationJobPostFirePayload, error)
	DispatchAutomationTriggerPreFire(
		ctx context.Context,
		payload hookspkg.AutomationTriggerPreFirePayload,
	) (hookspkg.AutomationTriggerPreFirePayload, error)
	DispatchAutomationTriggerPostFire(
		ctx context.Context,
		payload hookspkg.AutomationTriggerPostFirePayload,
	) (hookspkg.AutomationTriggerPostFirePayload, error)
	DispatchAutomationRunCompleted(
		ctx context.Context,
		payload hookspkg.AutomationRunCompletedPayload,
	) (hookspkg.AutomationRunCompletedPayload, error)
	DispatchAutomationRunFailed(
		ctx context.Context,
		payload hookspkg.AutomationRunFailedPayload,
	) (hookspkg.AutomationRunFailedPayload, error)
}

HookDispatcher emits automation lifecycle hooks around shared dispatch.

type HookSessionResolver

type HookSessionResolver interface {
	Status(ctx context.Context, id string) (*session.Info, error)
}

HookSessionResolver resolves session metadata for hook-completion ingress.

type Job

type Job = modelpkg.Job

Job is the canonical scheduled automation definition used by runtime and storage layers.

type JobEnabledOverlay

type JobEnabledOverlay = modelpkg.JobEnabledOverlay

JobEnabledOverlay stores the runtime enabled override for a config-backed job.

type JobListQuery

type JobListQuery = modelpkg.JobListQuery

JobListQuery filters persisted automation job listings.

type JobSource

type JobSource = modelpkg.JobSource

JobSource identifies where a job or trigger definition originated.

type JobTaskConfig

type JobTaskConfig = modelpkg.JobTaskConfig

JobTaskConfig configures direct automation-to-task materialization for one job.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager composes persistence, dispatch, schedules, triggers, and runtime status into one daemon-owned automation subsystem.

func New

func New(opts ...Option) (*Manager, error)

New constructs the composed automation manager.

func (*Manager) ApplyJobResourceState

func (m *Manager) ApplyJobResourceState(ctx context.Context, plan resources.ProjectionPlan) error

ApplyJobResourceState atomically swaps the scheduler and desired job catalog.

func (*Manager) ApplyTriggerResourceState

func (m *Manager) ApplyTriggerResourceState(ctx context.Context, plan resources.ProjectionPlan) error

ApplyTriggerResourceState atomically swaps the trigger engine and desired trigger catalog.

func (*Manager) BuildJobResourceState

func (m *Manager) BuildJobResourceState(
	ctx context.Context,
	records []resources.Record[Job],
) (resources.ProjectionPlan, error)

BuildJobResourceState builds the next scheduler plan from canonical automation.job records.

func (*Manager) BuildTriggerResourceState

func (m *Manager) BuildTriggerResourceState(
	ctx context.Context,
	records []resources.Record[Trigger],
) (resources.ProjectionPlan, error)

BuildTriggerResourceState builds the next trigger-engine plan from canonical automation.trigger records.

func (*Manager) CreateJob

func (m *Manager) CreateJob(ctx context.Context, job Job) (Job, error)

CreateJob stores a new dynamic automation job and registers it into the runtime when the scheduler is active.

func (*Manager) CreateTrigger

func (m *Manager) CreateTrigger(
	ctx context.Context,
	trigger Trigger,
	webhookSecret WebhookSecretWrite,
) (Trigger, error)

CreateTrigger stores a new dynamic trigger definition plus its write-only webhook secret value when applicable, then registers it into the runtime engine.

func (*Manager) DeleteAutomationSessionTaskActor

func (m *Manager) DeleteAutomationSessionTaskActor(sessionID string)

DeleteAutomationSessionTaskActor removes any recorded task actor context for the supplied automation-launched session.

func (*Manager) DeleteJob

func (m *Manager) DeleteJob(ctx context.Context, id string) error

DeleteJob removes one dynamic automation job definition and unregisters it from the runtime scheduler when needed.

func (*Manager) DeleteTrigger

func (m *Manager) DeleteTrigger(ctx context.Context, id string) error

DeleteTrigger removes one dynamic trigger definition and clears any persisted webhook secret.

func (*Manager) FireExtensionTrigger

func (m *Manager) FireExtensionTrigger(ctx context.Context, request ExtensionTriggerRequest) (TriggerResult, error)

FireExtensionTrigger routes one extension-originated ext.* event through the shared trigger engine.

func (*Manager) GetJob

func (m *Manager) GetJob(ctx context.Context, id string) (Job, error)

GetJob returns one overlay-aware job definition by id.

func (*Manager) GetRun

func (m *Manager) GetRun(ctx context.Context, id string) (Run, error)

GetRun returns one persisted automation run by id.

func (*Manager) GetTrigger

func (m *Manager) GetTrigger(ctx context.Context, id string) (Trigger, error)

GetTrigger returns one overlay-aware trigger definition by id.

func (*Manager) HandleWebhook

func (m *Manager) HandleWebhook(ctx context.Context, request WebhookRequest) (TriggerResult, error)

HandleWebhook validates, normalizes, and dispatches a webhook delivery through the running trigger engine.

func (*Manager) HookTelemetrySink

func (m *Manager) HookTelemetrySink() hookspkg.TelemetrySink

HookTelemetrySink exposes the existing hook telemetry sink seam for hook-completion trigger ingress.

func (*Manager) Jobs

func (m *Manager) Jobs(ctx context.Context) ([]Job, error)

Jobs returns overlay-aware job definitions from persistence.

func (*Manager) ListJobs

func (m *Manager) ListJobs(ctx context.Context, query JobListQuery) ([]Job, error)

ListJobs returns overlay-aware job definitions using the supplied filters.

func (*Manager) ListRuns

func (m *Manager) ListRuns(ctx context.Context, query RunQuery) ([]Run, error)

ListRuns returns persisted automation run history using the supplied filters.

func (*Manager) ListTriggers

func (m *Manager) ListTriggers(ctx context.Context, query TriggerListQuery) ([]Trigger, error)

ListTriggers returns overlay-aware trigger definitions using the supplied filters.

func (*Manager) MemoryObserver

func (m *Manager) MemoryObserver() MemoryConsolidationObserver

MemoryObserver exposes the automation memory-consolidation observer seam for callers that can publish completion events.

func (*Manager) RecordAutomationSessionTaskActor

func (m *Manager) RecordAutomationSessionTaskActor(sessionID string, actor taskpkg.ActorContext) error

RecordAutomationSessionTaskActor stores the trusted task-domain actor context for one automation-launched session.

func (*Manager) Runs

func (m *Manager) Runs(ctx context.Context, query RunQuery) ([]Run, error)

Runs returns persisted automation run history.

func (*Manager) SessionObserver

func (m *Manager) SessionObserver() session.Notifier

SessionObserver exposes the existing session notifier seam for automation trigger ingress.

func (*Manager) SetJobEnabled

func (m *Manager) SetJobEnabled(ctx context.Context, id string, enabled bool) (Job, error)

SetJobEnabled updates the effective enabled state for one job. Config-backed jobs use overlay rows while dynamic jobs mutate their persisted definition.

func (*Manager) SetTriggerEnabled

func (m *Manager) SetTriggerEnabled(ctx context.Context, id string, enabled bool) (Trigger, error)

SetTriggerEnabled updates the effective enabled state for one trigger. Config-backed triggers use overlay rows while dynamic triggers mutate their persisted definition.

func (*Manager) Shutdown

func (m *Manager) Shutdown(ctx context.Context) error

Shutdown stops trigger ingestion, cancels in-flight work, and shuts down the runtime scheduler.

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start synchronizes TOML definitions into persistence, loads effective automation state, and starts the runtime scheduler and trigger engine.

func (*Manager) Status

func (m *Manager) Status(ctx context.Context) (ManagerStatus, error)

Status returns aggregate automation lifecycle and next-fire metadata.

func (*Manager) SyncManagedDefinitions

func (m *Manager) SyncManagedDefinitions(
	ctx context.Context,
	source JobSource,
	desiredJobs []Job,
	desiredTriggers []Trigger,
) (SyncStats, error)

SyncManagedDefinitions reconciles one daemon-managed automation source against the persisted store and runtime registries.

func (*Manager) TaskActorContextForSession

func (m *Manager) TaskActorContextForSession(sessionID string) (taskpkg.ActorContext, error)

TaskActorContextForSession returns the automation-linked task actor context previously recorded for one automation-launched session.

func (*Manager) TriggerJob

func (m *Manager) TriggerJob(ctx context.Context, id string) (Run, error)

TriggerJob forces one immediate manual execution through the shared dispatcher path.

func (*Manager) TriggerJobWithPayload

func (m *Manager) TriggerJobWithPayload(ctx context.Context, id string, payload map[string]any) (Run, error)

TriggerJobWithPayload forces one immediate manual execution and exposes the caller-supplied payload to job lifecycle hooks.

func (*Manager) Triggers

func (m *Manager) Triggers(ctx context.Context) ([]Trigger, error)

Triggers returns overlay-aware trigger definitions from persistence.

func (*Manager) UpdateJob

func (m *Manager) UpdateJob(ctx context.Context, job Job) (Job, error)

UpdateJob replaces one existing dynamic automation job definition.

func (*Manager) UpdateTrigger

func (m *Manager) UpdateTrigger(
	ctx context.Context,
	trigger Trigger,
	webhookSecret *WebhookSecretWrite,
) (Trigger, error)

UpdateTrigger replaces one existing dynamic trigger definition.

type ManagerStatus

type ManagerStatus struct {
	Running          bool                `json:"running"`
	SchedulerRunning bool                `json:"scheduler_running"`
	Jobs             ResourceStatus      `json:"jobs"`
	Triggers         ResourceStatus      `json:"triggers"`
	ScheduledJobs    []ScheduledJobState `json:"scheduled_jobs,omitempty"`
	NextFire         *time.Time          `json:"next_fire,omitempty"`
	LastSync         SyncStats           `json:"last_sync"`
}

ManagerStatus exposes automation lifecycle, count, and next-fire metadata without transport-specific wrappers.

type MemoryConsolidatedEvent

type MemoryConsolidatedEvent struct {
	WorkspaceID string         `json:"workspace_id,omitempty"`
	Timestamp   time.Time      `json:"timestamp"`
	Data        map[string]any `json:"data,omitempty"`
}

MemoryConsolidatedEvent is the observer-facing completion payload used for normalized memory ingress.

type MemoryConsolidationObserver

type MemoryConsolidationObserver interface {
	OnMemoryConsolidated(ctx context.Context, event MemoryConsolidatedEvent) error
}

MemoryConsolidationObserver receives dream consolidation completions at the trigger-engine boundary.

type Option

type Option func(*managerOptions)

Option customizes automation manager construction.

func WithConfig

func WithConfig(cfg aghconfig.AutomationConfig) Option

WithConfig injects the loaded automation config.

func WithDispatcherOptions

func WithDispatcherOptions(options ...DispatcherOption) Option

WithDispatcherOptions appends dispatcher options used when constructing the shared dispatcher.

func WithGlobalWorkspacePath

func WithGlobalWorkspacePath(path string) Option

WithGlobalWorkspacePath injects the fallback workspace path used for global automation sessions.

func WithHooks

func WithHooks(hooks HookDispatcher) Option

WithHooks injects the automation lifecycle hook dispatcher used by the shared dispatcher path.

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger injects the subsystem logger.

func WithManagerNow

func WithManagerNow(now func() time.Time) Option

WithManagerNow overrides the manager clock used for sync bookkeeping.

func WithResourceDefinitions

func WithResourceDefinitions(
	jobStore resources.Store[Job],
	triggerStore resources.Store[Trigger],
	actor resources.MutationActor,
	trigger func(context.Context, resources.ResourceKind, resources.ReconcileReason) error,
) Option

WithResourceDefinitions switches desired-state automation definitions to the shared resource runtime while keeping operational run state on Store.

func WithSchedulerOptions

func WithSchedulerOptions(options ...SchedulerOption) Option

WithSchedulerOptions appends scheduler options used when constructing the runtime scheduler.

func WithSessions

func WithSessions(sessions SessionManager) Option

WithSessions injects the runtime session manager used by the dispatcher and hook-derived trigger ingress.

func WithStore

func WithStore(store Store) Option

WithStore injects the automation persistence store.

func WithTasks

func WithTasks(tasks TaskService) Option

WithTasks injects the task-domain service used for task-backed automation jobs.

func WithTriggerEngineOptions

func WithTriggerEngineOptions(options ...TriggerEngineOption) Option

WithTriggerEngineOptions appends trigger-engine options used when constructing the runtime engine.

func WithWebhookSecretStore

func WithWebhookSecretStore(store WebhookSecretStore) Option

WithWebhookSecretStore injects the vault-backed store used for webhook trigger secrets.

func WithWorkspaceResolver

func WithWorkspaceResolver(resolver workspacepkg.RuntimeResolver) Option

WithWorkspaceResolver injects the canonical workspace resolver used to turn TOML workspace references into registered workspace IDs.

type ParsedWebhookEndpoint

type ParsedWebhookEndpoint struct {
	EndpointSlug string `json:"endpoint_slug"`
	WebhookID    string `json:"webhook_id"`
}

ParsedWebhookEndpoint is the normalized webhook endpoint split into slug and stable webhook id.

func ParseWebhookEndpoint

func ParseWebhookEndpoint(endpoint string) (ParsedWebhookEndpoint, error)

ParseWebhookEndpoint resolves the human slug and stable webhook id from an endpoint path segment.

type ResourceStatus

type ResourceStatus struct {
	Total   int `json:"total"`
	Enabled int `json:"enabled"`
}

ResourceStatus reports total and enabled counts for one automation resource family.

type RetryConfig

type RetryConfig = modelpkg.RetryConfig

RetryConfig defines retry behavior for a failed automation run.

func DefaultBackoffRetryConfig

func DefaultBackoffRetryConfig() RetryConfig

DefaultBackoffRetryConfig returns the default exponential backoff retry policy.

func DefaultRetryConfig

func DefaultRetryConfig() RetryConfig

DefaultRetryConfig returns the default retry policy for automation definitions.

type RetryStrategy

type RetryStrategy = modelpkg.RetryStrategy

RetryStrategy identifies how failed runs should be retried.

type Run

type Run = modelpkg.Run

Run records the execution state of a single automation fire.

type RunQuery

type RunQuery = modelpkg.RunQuery

RunQuery filters automation run history and fire-limit window lookups.

type RunStatus

type RunStatus = modelpkg.RunStatus

RunStatus identifies the current lifecycle state of an automation run.

type RunStore

type RunStore interface {
	CreateRun(ctx context.Context, run Run) (Run, error)
	UpdateRun(ctx context.Context, run Run) (Run, error)
	CountRuns(ctx context.Context, query RunQuery) (int64, error)
	ListRuns(ctx context.Context, query RunQuery) ([]Run, error)
}

RunStore persists automation run state and restart-safe fire-limit inputs.

type ScheduleDispatcher

type ScheduleDispatcher interface {
	Dispatch(ctx context.Context, req DispatchRequest) (*Run, error)
}

ScheduleDispatcher is the execution surface used by scheduled jobs.

type ScheduleMode

type ScheduleMode = modelpkg.ScheduleMode

ScheduleMode identifies how a scheduled job determines its next fire time.

type ScheduleSpec

type ScheduleSpec = modelpkg.ScheduleSpec

ScheduleSpec describes how a job should be scheduled.

type ScheduledJobState

type ScheduledJobState struct {
	JobID               string                 `json:"job_id"`
	Registered          bool                   `json:"registered"`
	NextRun             *time.Time             `json:"next_run,omitempty"`
	LastRun             *time.Time             `json:"last_run,omitempty"`
	LastScheduledAt     *time.Time             `json:"last_scheduled_at,omitempty"`
	LastFireID          string                 `json:"last_fire_id,omitempty"`
	CatchUpPolicy       SchedulerCatchUpPolicy `json:"catch_up_policy,omitempty"`
	MisfireGraceSeconds int                    `json:"misfire_grace_seconds,omitempty"`
	LastMisfireAt       *time.Time             `json:"last_misfire_at,omitempty"`
	MisfireCount        int                    `json:"misfire_count,omitempty"`
	Durable             *SchedulerState        `json:"durable,omitempty"`
}

ScheduledJobState exposes runtime schedule metadata for one registered job.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler owns durable cursor-driven scheduled-job dispatch.

func NewScheduler

func NewScheduler(dispatcher ScheduleDispatcher, opts ...SchedulerOption) (*Scheduler, error)

NewScheduler constructs a scheduled-job runtime over gocron.

func (*Scheduler) Register

func (s *Scheduler) Register(ctx context.Context, job Job) (ScheduledJobState, error)

Register adds a new scheduled job registration.

func (*Scheduler) Shutdown

func (s *Scheduler) Shutdown(ctx context.Context) error

Shutdown is an alias for Stop to match daemon runtime conventions.

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

Start begins scheduled-job execution.

func (*Scheduler) State

func (s *Scheduler) State(jobID string) (ScheduledJobState, error)

State returns runtime metadata for one scheduled job.

func (*Scheduler) States

func (s *Scheduler) States() []ScheduledJobState

States returns runtime metadata for every registered scheduled job.

func (*Scheduler) Stop

func (s *Scheduler) Stop(ctx context.Context) error

Stop shuts the scheduler down and cancels in-flight dispatches.

func (*Scheduler) Unregister

func (s *Scheduler) Unregister(ctx context.Context, jobID string) error

Unregister removes a scheduled job registration.

func (*Scheduler) Update

func (s *Scheduler) Update(ctx context.Context, job Job) (ScheduledJobState, error)

Update replaces an existing scheduled job registration.

type SchedulerCatchUpPolicy

type SchedulerCatchUpPolicy = modelpkg.SchedulerCatchUpPolicy

SchedulerCatchUpPolicy identifies how missed scheduled fires are reconciled.

type SchedulerClaim

type SchedulerClaim = modelpkg.SchedulerClaim

SchedulerClaim reserves one scheduled fire after cursor advancement.

type SchedulerClaimResult

type SchedulerClaimResult = modelpkg.SchedulerClaimResult

SchedulerClaimResult reports the durable state and run reservation for one scheduled fire.

type SchedulerOption

type SchedulerOption func(*Scheduler)

SchedulerOption customizes scheduled-job runtime behavior.

func WithSchedulerClock

func WithSchedulerClock(clock clockwork.Clock) SchedulerOption

WithSchedulerClock overrides the scheduler clock, mainly for tests.

func WithSchedulerLocation

func WithSchedulerLocation(location *time.Location) SchedulerOption

WithSchedulerLocation overrides the timezone used for schedule evaluation.

func WithSchedulerLogger

func WithSchedulerLogger(logger *slog.Logger) SchedulerOption

WithSchedulerLogger overrides the scheduler logger.

func WithSchedulerStopTimeout

func WithSchedulerStopTimeout(timeout time.Duration) SchedulerOption

WithSchedulerStopTimeout overrides the graceful shutdown timeout used by gocron.

func WithSchedulerStore

func WithSchedulerStore(store SchedulerStore) SchedulerOption

WithSchedulerStore injects durable scheduler cursor persistence.

type SchedulerState

type SchedulerState = modelpkg.SchedulerState

SchedulerState stores the durable scheduling cursor for one automation job.

type SchedulerStore

type SchedulerStore interface {
	GetSchedulerState(ctx context.Context, jobID string) (SchedulerState, error)
	SaveSchedulerState(ctx context.Context, state SchedulerState) (SchedulerState, error)
	DeleteSchedulerState(ctx context.Context, jobID string) error
	ClaimScheduledRun(ctx context.Context, claim SchedulerClaim) (SchedulerClaimResult, error)
	RecordRunDeliveryError(ctx context.Context, runID string, runErr error) (Run, error)
}

SchedulerStore persists durable scheduler cursor state and run reservations before dispatch.

type Scope

type Scope = modelpkg.Scope

Scope identifies the visibility boundary of an automation resource.

type SessionCreator

type SessionCreator interface {
	Create(ctx context.Context, opts session.CreateOpts) (*session.Session, error)
	Prompt(ctx context.Context, id string, msg string) (<-chan acp.AgentEvent, error)
	StopWithCause(ctx context.Context, id string, cause session.StopCause, detail string) error
}

SessionCreator is the subset of session.Manager needed by the dispatcher.

type SessionManager

type SessionManager interface {
	SessionCreator
	Status(ctx context.Context, id string) (*session.Info, error)
}

SessionManager is the runtime session surface required by the automation manager. It extends the dispatcher path with lookup support for hook-derived trigger ingress.

type SessionTaskActorRecorder

type SessionTaskActorRecorder interface {
	RecordAutomationSessionTaskActor(sessionID string, actor taskpkg.ActorContext) error
	DeleteAutomationSessionTaskActor(sessionID string)
}

AutomationSessionTaskActorRecorder stores trusted task-domain provenance for automation-launched sessions that may later create tasks explicitly.

type SleepFunc

type SleepFunc func(ctx context.Context, delay time.Duration) error

SleepFunc waits for retry backoff with context cancellation support.

type Store

type Store interface {
	RunStore
	GetRun(ctx context.Context, id string) (Run, error)
	CreateJob(ctx context.Context, job Job) (Job, error)
	UpdateJob(ctx context.Context, job Job) (Job, error)
	DeleteJob(ctx context.Context, id string) error
	GetJob(ctx context.Context, id string) (Job, error)
	ListJobs(ctx context.Context, query JobListQuery) ([]Job, error)
	CreateTrigger(ctx context.Context, trigger Trigger) (Trigger, error)
	UpdateTrigger(ctx context.Context, trigger Trigger) (Trigger, error)
	DeleteTrigger(ctx context.Context, id string) error
	GetTrigger(ctx context.Context, id string) (Trigger, error)
	ListTriggers(ctx context.Context, query TriggerListQuery) ([]Trigger, error)
	ListRuns(ctx context.Context, query RunQuery) ([]Run, error)
	GetSchedulerState(ctx context.Context, jobID string) (SchedulerState, error)
	ListSchedulerStates(ctx context.Context) ([]SchedulerState, error)
	SaveSchedulerState(ctx context.Context, state SchedulerState) (SchedulerState, error)
	DeleteSchedulerState(ctx context.Context, jobID string) error
	ClaimScheduledRun(ctx context.Context, claim SchedulerClaim) (SchedulerClaimResult, error)
	RecordRunDeliveryError(ctx context.Context, runID string, runErr error) (Run, error)
	SetJobEnabledOverlay(ctx context.Context, overlay JobEnabledOverlay) (JobEnabledOverlay, error)
	GetJobEnabledOverlay(ctx context.Context, jobID string) (JobEnabledOverlay, error)
	ListJobEnabledOverlays(ctx context.Context) ([]JobEnabledOverlay, error)
	DeleteJobEnabledOverlay(ctx context.Context, jobID string) error
	SetTriggerEnabledOverlay(ctx context.Context, overlay TriggerEnabledOverlay) (TriggerEnabledOverlay, error)
	GetTriggerEnabledOverlay(ctx context.Context, triggerID string) (TriggerEnabledOverlay, error)
	ListTriggerEnabledOverlays(ctx context.Context) ([]TriggerEnabledOverlay, error)
	DeleteTriggerEnabledOverlay(ctx context.Context, triggerID string) error
}

Store is the automation persistence surface consumed by the composed automation manager.

type SyncStats

type SyncStats struct {
	JobsSynced      int       `json:"jobs_synced"`
	TriggersSynced  int       `json:"triggers_synced"`
	JobsRemoved     int       `json:"jobs_removed"`
	TriggersRemoved int       `json:"triggers_removed"`
	SyncedAt        time.Time `json:"synced_at"`
}

SyncStats summarizes one TOML synchronization pass.

type TaskService

type TaskService interface {
	CreateTask(ctx context.Context, spec taskpkg.CreateTask, actor taskpkg.ActorContext) (*taskpkg.Task, error)
	EnqueueRun(ctx context.Context, spec taskpkg.EnqueueRun, actor taskpkg.ActorContext) (*taskpkg.Run, error)
}

TaskService exposes the minimal task-domain surface used by task-backed automation jobs.

type Trigger

type Trigger = modelpkg.Trigger

Trigger is the canonical event-driven automation definition used by runtime and storage layers.

type TriggerDispatcher

type TriggerDispatcher interface {
	Dispatch(ctx context.Context, req DispatchRequest) (*Run, error)
}

TriggerDispatcher is the shared execution surface used by matched triggers.

type TriggerEnabledOverlay

type TriggerEnabledOverlay = modelpkg.TriggerEnabledOverlay

TriggerEnabledOverlay stores the runtime enabled override for a config-backed trigger.

type TriggerEngine

type TriggerEngine struct {
	// contains filtered or unexported fields
}

TriggerEngine matches normalized activations against registered triggers and dispatches runs.

func NewTriggerEngine

func NewTriggerEngine(dispatcher TriggerDispatcher, opts ...TriggerEngineOption) (*TriggerEngine, error)

NewTriggerEngine constructs a trigger runtime over the shared dispatcher path.

func (*TriggerEngine) Fire

Fire matches one normalized activation envelope against all registered triggers.

func (*TriggerEngine) FireHookCompletion

func (e *TriggerEngine) FireHookCompletion(
	ctx context.Context,
	sessionID string,
	record hookspkg.HookRunRecord,
) (TriggerResult, error)

FireHookCompletion normalizes one hook-completion telemetry record into the shared matching path.

func (*TriggerEngine) FireMemoryConsolidated

func (e *TriggerEngine) FireMemoryConsolidated(
	ctx context.Context,
	event MemoryConsolidatedEvent,
) (TriggerResult, error)

FireMemoryConsolidated normalizes a dream-consolidation completion into the shared matching path.

func (*TriggerEngine) FireSessionCreated

func (e *TriggerEngine) FireSessionCreated(ctx context.Context, sess *session.Session) (TriggerResult, error)

FireSessionCreated normalizes a session-created lifecycle event and routes it through the shared matching path.

func (*TriggerEngine) FireSessionStopped

func (e *TriggerEngine) FireSessionStopped(ctx context.Context, sess *session.Session) (TriggerResult, error)

FireSessionStopped normalizes a session-stopped lifecycle event and routes it through the shared matching path.

func (*TriggerEngine) HandleWebhook

func (e *TriggerEngine) HandleWebhook(ctx context.Context, request WebhookRequest) (TriggerResult, error)

HandleWebhook authenticates, normalizes, and dispatches one webhook delivery.

func (*TriggerEngine) HookTelemetrySink

func (e *TriggerEngine) HookTelemetrySink() hookspkg.TelemetrySink

HookTelemetrySink exposes the existing hook telemetry sink shape for hook-completion ingress.

func (*TriggerEngine) MemoryObserver

func (e *TriggerEngine) MemoryObserver() MemoryConsolidationObserver

MemoryObserver exposes the observer-facing dream-consolidation completion adapter.

func (*TriggerEngine) Register

func (e *TriggerEngine) Register(registration TriggerRegistration) error

Register adds a new trigger definition to the runtime.

func (*TriggerEngine) SessionObserver

func (e *TriggerEngine) SessionObserver() session.Notifier

SessionObserver exposes the existing session notifier shape for internal lifecycle ingress.

func (*TriggerEngine) Shutdown

func (e *TriggerEngine) Shutdown(ctx context.Context) error

Shutdown marks the runtime as stopped and clears registered triggers.

func (*TriggerEngine) Start

func (e *TriggerEngine) Start(ctx context.Context) error

Start validates the runtime start contract. Trigger matching is synchronous so no background work begins here.

func (*TriggerEngine) Unregister

func (e *TriggerEngine) Unregister(id string) error

Unregister removes one trigger registration by id.

func (*TriggerEngine) Update

func (e *TriggerEngine) Update(registration TriggerRegistration) error

Update replaces an existing trigger registration.

type TriggerEngineOption

type TriggerEngineOption func(*TriggerEngine)

TriggerEngineOption customizes trigger runtime behavior.

func WithTriggerEngineHookSessionResolver

func WithTriggerEngineHookSessionResolver(resolver HookSessionResolver) TriggerEngineOption

WithTriggerEngineHookSessionResolver injects session lookup support for hook-completion ingress.

func WithTriggerEngineLogger

func WithTriggerEngineLogger(logger *slog.Logger) TriggerEngineOption

WithTriggerEngineLogger overrides the trigger-engine logger.

func WithTriggerEngineNow

func WithTriggerEngineNow(now func() time.Time) TriggerEngineOption

WithTriggerEngineNow overrides the trigger-engine clock.

func WithTriggerEngineWebhookDeliveryStore

func WithTriggerEngineWebhookDeliveryStore(store WebhookDeliveryStore) TriggerEngineOption

WithTriggerEngineWebhookDeliveryStore injects durable replay protection for webhook delivery IDs.

func WithTriggerEngineWebhookFreshnessWindow

func WithTriggerEngineWebhookFreshnessWindow(window time.Duration) TriggerEngineOption

WithTriggerEngineWebhookFreshnessWindow overrides the accepted webhook clock skew.

func WithTriggerEngineWebhookSecretResolver

func WithTriggerEngineWebhookSecretResolver(resolver WebhookSecretResolver) TriggerEngineOption

WithTriggerEngineWebhookSecretResolver injects the vault-backed resolver for webhook auth refs.

type TriggerListQuery

type TriggerListQuery = modelpkg.TriggerListQuery

TriggerListQuery filters persisted automation trigger listings.

type TriggerRegistration

type TriggerRegistration struct {
	Trigger Trigger `json:"trigger"`
	// contains filtered or unexported fields
}

TriggerRegistration stores one runtime trigger definition plus write-only webhook auth material.

func (TriggerRegistration) Validate

func (r TriggerRegistration) Validate(path string) error

Validate ensures the runtime registration is internally consistent.

type TriggerResult

type TriggerResult struct {
	Matched int   `json:"matched"`
	Runs    []Run `json:"runs,omitempty"`
}

TriggerResult reports how many triggers matched one activation and which runs were created.

type WebhookDeliveryStore

type WebhookDeliveryStore interface {
	CreateRun(ctx context.Context, run Run) (Run, error)
	GetRun(ctx context.Context, id string) (Run, error)
	DeleteRun(ctx context.Context, id string) error
}

WebhookDeliveryStore persists authenticated delivery claims across trigger-engine restarts.

type WebhookRequest

type WebhookRequest struct {
	Scope       Scope          `json:"scope"`
	WorkspaceID string         `json:"workspace_id,omitempty"`
	Endpoint    string         `json:"endpoint"`
	DeliveryID  string         `json:"delivery_id"`
	Timestamp   time.Time      `json:"timestamp"`
	Signature   string         `json:"signature"`
	Payload     []byte         `json:"payload,omitempty"`
	Data        map[string]any `json:"data,omitempty"`
}

WebhookRequest is the transport-neutral webhook delivery input consumed by the trigger engine.

func (WebhookRequest) Validate

func (r WebhookRequest) Validate(path string) error

Validate ensures the request can be normalized before any dispatch occurs.

type WebhookSecretResolver

type WebhookSecretResolver interface {
	ResolveRef(ctx context.Context, ref string) (string, error)
}

WebhookSecretResolver resolves a persisted webhook secret reference.

type WebhookSecretStore

type WebhookSecretStore interface {
	WebhookSecretResolver
	PutSecret(ctx context.Context, ref string, kind string, value string) (vault.Metadata, error)
	DeleteSecret(ctx context.Context, ref string) error
}

WebhookSecretStore persists daemon-managed webhook secret values.

type WebhookSecretWrite

type WebhookSecretWrite struct {
	Ref   string
	Value *string
}

WebhookSecretWrite carries the optional write-only webhook secret mutation.

Directories

Path Synopsis
Package model defines the transport-agnostic automation domain model shared by configuration, persistence, runtime, and API layers.
Package model defines the transport-agnostic automation domain model shared by configuration, persistence, runtime, and API layers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL