Documentation ¶
Overview ¶
Package loop implements persistent goroutine-based delegate loops — lightweight autonomous observers that run continuously alongside the main agent, reporting via output targets without blocking conversation flow.
Loops are the universal primitive replacing three currently separate systems (metacognitive loop, anticipations, observers) with a single Registry.SpawnLoop abstraction. Each loop is a background goroutine that iterates on a randomized, bounded sleep schedule, running LLM iterations via the agent runner and optionally writing observations to output targets.
A Registry tracks all active loops and provides visibility into what is running, their health, and resource usage. It enforces concurrency limits and coordinates graceful shutdown.
See issue #509 for the full design.
Index ¶
- Constants
- Variables
- func Float64Ptr(v float64) *float64
- func IterationSummary(ctx context.Context) map[string]any
- func ProgressFunc(ctx context.Context) func(string, map[string]any)
- func SpawnDemoLoops(ctx context.Context, registry *Registry, eventBus *events.Bus, ...) error
- type Config
- type Deps
- type IterationResult
- type IterationSnapshot
- type Loop
- type RandSource
- type Registry
- func (r *Registry) ActiveCount() int
- func (r *Registry) Deregister(id string)
- func (r *Registry) Get(id string) *Loop
- func (r *Registry) GetByName(name string) *Loop
- func (r *Registry) List() []*Loop
- func (r *Registry) Register(l *Loop) error
- func (r *Registry) ShutdownAll(ctx context.Context) int
- func (r *Registry) SpawnLoop(ctx context.Context, cfg Config, deps Deps) (string, error)
- func (r *Registry) Statuses() []Status
- func (r *Registry) StopLoop(id string) error
- type RegistryOption
- type RetriggerMode
- type RunMessage
- type RunRequest
- type RunResponse
- type Runner
- type State
- type Status
- type StreamCallback
Constants ¶
const (
	DefaultSleepMin       = 30 * time.Second
	DefaultSleepMax       = 5 * time.Minute
	DefaultSleepDefault   = 1 * time.Minute
	DefaultJitter         = 0.2
	DefaultSupervisorProb = 0.1
)
Default configuration values. Exported so callers can reference them when building Config values without memorizing magic numbers.
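As a rough illustration of how these defaults might combine, the sketch below scales a base sleep by a symmetric jitter factor and clamps the result to the configured bounds. The formula and the helper name jitteredSleep are assumptions made for this example; the package's actual sleep computation is not shown in this documentation. The constants are redeclared locally so the program compiles on its own.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented defaults so the sketch is self-contained.
const (
	defaultSleepMin     = 30 * time.Second
	defaultSleepMax     = 5 * time.Minute
	defaultSleepDefault = 1 * time.Minute
	defaultJitter       = 0.2
)

// jitteredSleep is a hypothetical helper. It scales base by a factor in
// [1-jitter, 1+jitter], selected by r (expected in [0, 1)), and clamps the
// result to [lo, hi].
func jitteredSleep(base, lo, hi time.Duration, jitter, r float64) time.Duration {
	factor := 1 + jitter*(2*r-1)
	d := time.Duration(float64(base) * factor)
	if d < lo {
		return lo
	}
	if d > hi {
		return hi
	}
	return d
}

func main() {
	for _, r := range []float64{0, 0.5, 0.999} {
		d := jitteredSleep(defaultSleepDefault, defaultSleepMin, defaultSleepMax, defaultJitter, r)
		fmt.Printf("r=%.3f sleep=%s\n", r, d)
	}
}
```

Passing the random draw in as a parameter keeps the helper deterministic, which is the same idea behind the RandSource interface.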
Variables ¶
var ErrLoopStopped = errors.New("loop: cannot start a stopped loop")
ErrLoopStopped is returned by Loop.Start when the loop has already been stopped. A stopped loop cannot be restarted.
var ErrNilRunner = errors.New("loop: Runner is required")
ErrNilRunner is returned by New when Deps.Runner is nil.
Functions ¶
func Float64Ptr ¶
Float64Ptr returns a pointer to v. Use it to set optional *float64 config fields like [Config.Jitter]:
Config{Jitter: loop.Float64Ptr(0.3)} // custom jitter
Config{Jitter: loop.Float64Ptr(0)} // explicitly no jitter
Config{} // nil → DefaultJitter
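The pointer exists so that "not set" and "explicitly zero" can be told apart. The following self-contained sketch shows that distinction; float64Ptr mirrors the documented helper, while resolveJitter is a hypothetical stand-in for the defaulting step and is not part of the package API.

```go
package main

import "fmt"

// defaultJitter mirrors the documented DefaultJitter constant.
const defaultJitter = 0.2

// float64Ptr mirrors the documented Float64Ptr helper.
func float64Ptr(v float64) *float64 { return &v }

// resolveJitter is a hypothetical stand-in for the defaulting step:
// nil means "not set", while a pointer to 0 means "explicitly disabled".
func resolveJitter(j *float64) float64 {
	if j == nil {
		return defaultJitter
	}
	return *j
}

func main() {
	fmt.Println(resolveJitter(nil))             // falls back to the default
	fmt.Println(resolveJitter(float64Ptr(0)))   // jitter explicitly disabled
	fmt.Println(resolveJitter(float64Ptr(0.3))) // custom jitter
}
```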
func IterationSummary ¶
IterationSummary returns the summary map for the current iteration from the handler context, or nil if not available. Handler implementations call this to report per-iteration metrics to the dashboard timeline. Values should be small scalars (int, string, bool) to keep SSE payloads compact.
func ProgressFunc ¶
ProgressFunc returns the loop's progress callback from the handler context, or nil if not available. Handler implementations that dispatch LLM calls can use this to forward in-flight events (tool calls, LLM responses) to the event bus for dashboard visibility.
The returned function has signature func(kind string, data map[string]any) where kind is an events.KindLoop* constant and data carries the event payload.
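Because both IterationSummary and ProgressFunc may return nil, a handler should check before using them. The sketch below shows that pattern with local stand-ins: the context keys, the lowercase helper functions, and the "demo_progress" kind are all assumptions for illustration (the real package keeps its keys unexported and uses events.KindLoop* constants).

```go
package main

import (
	"context"
	"fmt"
)

// Hypothetical context keys for this sketch only.
type summaryKey struct{}
type progressKey struct{}

// iterationSummary stands in for loop.IterationSummary.
func iterationSummary(ctx context.Context) map[string]any {
	m, _ := ctx.Value(summaryKey{}).(map[string]any)
	return m
}

// progressFunc stands in for loop.ProgressFunc.
func progressFunc(ctx context.Context) func(string, map[string]any) {
	f, _ := ctx.Value(progressKey{}).(func(string, map[string]any))
	return f
}

// newHandlerContext builds the context a loop might pass to a handler.
func newHandlerContext(summary map[string]any, report func(string, map[string]any)) context.Context {
	ctx := context.WithValue(context.Background(), summaryKey{}, summary)
	return context.WithValue(ctx, progressKey{}, report)
}

// handle has the shape of Config.Handler. It nil-checks both helpers and
// records only small scalar values in the summary.
func handle(ctx context.Context, event any) error {
	if report := progressFunc(ctx); report != nil {
		// "demo_progress" is a placeholder kind, not a real events constant.
		report("demo_progress", map[string]any{"stage": "start"})
	}
	if s := iterationSummary(ctx); s != nil {
		s["events_seen"] = 1
		s["had_event"] = event != nil
	}
	return nil
}

func main() {
	summary := map[string]any{}
	ctx := newHandlerContext(summary, func(kind string, data map[string]any) {
		fmt.Println("progress:", kind, data)
	})
	if err := handle(ctx, "tick"); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println("summary:", summary)
}
```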
func SpawnDemoLoops ¶
func SpawnDemoLoops(ctx context.Context, registry *Registry, eventBus *events.Bus, logger *slog.Logger) error
SpawnDemoLoops creates simulated loops covering all visual categories, parent/child hierarchies, error states, and node churn. Intended for dashboard layout iteration without real service dependencies.
Types ¶
type Config ¶
type Config struct {
// Name is the unique identifier for this loop. Required.
Name string
// Task is the LLM prompt for each iteration. It describes what
// the loop should observe, check, or do on each wake.
Task string
// Tags are capability tags for tool scoping. When non-empty,
// the loop's tool registry is filtered to tools matching these
// tags (plus always-active tags).
Tags []string
// ExcludeTools lists tool names to exclude from the loop's
// available tools.
ExcludeTools []string
// SleepMin is the minimum sleep duration between iterations.
// Default: 30s.
SleepMin time.Duration
// SleepMax is the maximum sleep duration between iterations.
// Default: 5m.
SleepMax time.Duration
// SleepDefault is the initial sleep duration before the loop
// self-adjusts. Default: 1m.
SleepDefault time.Duration
// Jitter is the randomization factor applied to sleep durations
// to break periodicity. Range [0.0, 1.0]. Nil is defaulted to
// DefaultJitter (0.2) by applyDefaults. Use Float64Ptr(0) to
// explicitly disable jitter.
Jitter *float64
// MaxDuration is the maximum wall-clock time the loop may run.
// Zero means unlimited.
MaxDuration time.Duration
// MaxIter is the maximum number of iteration attempts the loop
// may make (including failures). Zero means unlimited.
MaxIter int
// Supervisor enables frontier model dice rolls. When true, a
// fraction of iterations (controlled by SupervisorProb) use a
// more capable model for oversight.
Supervisor bool
// SupervisorProb is the probability [0.0, 1.0] that a given
// iteration uses the supervisor model. Only meaningful when
// Supervisor is true. Zero means never (use DefaultSupervisorProb
// for the recommended default).
SupervisorProb float64
// QualityFloor is the minimum model quality rating for normal
// iterations. Zero uses the router default.
QualityFloor int
// SupervisorContext is an optional prompt prepended to the Task
// during supervisor iterations. Use it to give the frontier model
// review instructions, recent iteration summaries, or oversight
// criteria. Empty means supervisor runs the same Task as normal.
SupervisorContext string
// SupervisorQualityFloor is the minimum model quality rating
// for supervisor iterations. Zero uses the router default.
SupervisorQualityFloor int
// OnRetrigger determines behavior when the loop's start
// condition fires again while running. Default: RetriggerSingle.
OnRetrigger RetriggerMode
// TaskBuilder is called per-iteration to generate a dynamic prompt.
// When set, the static Task field is ignored. The isSupervisor
// argument indicates whether this is a supervisor iteration.
TaskBuilder func(ctx context.Context, isSupervisor bool) (string, error) `json:"-"`
// PostIterate is called after each successful iteration. Use it
// for side effects like appending iteration logs. Errors are
// logged but do not count as iteration failures.
PostIterate func(ctx context.Context, result IterationResult) error `json:"-"`
// WaitFunc blocks until an external event arrives. When set, the
// loop enters [StateWaiting] and calls WaitFunc instead of
// sleeping between iterations. The returned value is passed to
// Handler (if set) or discarded for LLM-based loops. If WaitFunc
// returns a non-context error, the loop treats it as an iteration
// error (backoff + retry).
WaitFunc func(ctx context.Context) (any, error) `json:"-"`
// Handler processes each iteration directly without an LLM call.
// When set, [Deps].Runner is not required. Receives the event
// from WaitFunc (nil for timer-triggered loops). Handler-only
// loops still track iterations, errors, and health.
Handler func(ctx context.Context, event any) error `json:"-"`
// Hints are merged into RunRequest hints for each iteration.
// Config hints override loop-generated defaults (e.g., setting
// "source" to "metacognitive" instead of "loop").
Hints map[string]string
// Setup is called by [Registry.SpawnLoop] after [New] but before
// [Loop.Start]. Use it to register tools or perform other setup
// that requires a *Loop reference before the goroutine launches.
Setup func(l *Loop) `json:"-"`
// Metadata holds arbitrary key/value pairs for the loop.
Metadata map[string]string
// ParentID is the loop ID of the parent that spawned this loop,
// if any. Empty for top-level loops.
ParentID string
}
Config holds the configuration for a loop. Name is required; optional fields left at their zero value fall back to the defaults documented on each field.
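To make the WaitFunc and Handler fields more concrete, here is a minimal sketch of an event-driven, handler-only loop. The config struct mirrors only those two fields, and drive is a simplified, hypothetical stand-in for the loop goroutine; unlike the real loop, which is documented to back off and retry on non-context WaitFunc errors, this sketch simply stops on any error.

```go
package main

import (
	"context"
	"fmt"
)

// config mirrors only the two Config fields used in this sketch.
type config struct {
	WaitFunc func(ctx context.Context) (any, error)
	Handler  func(ctx context.Context, event any) error
}

// channelWait returns a WaitFunc that blocks until an event arrives on ch
// or ctx is cancelled.
func channelWait(ch <-chan string) func(context.Context) (any, error) {
	return func(ctx context.Context) (any, error) {
		select {
		case ev := <-ch:
			return ev, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

// drive waits for an event, hands it to Handler, and repeats. It returns
// the number of successfully handled events once WaitFunc reports an error.
func drive(ctx context.Context, cfg config) int {
	handled := 0
	for {
		ev, err := cfg.WaitFunc(ctx)
		if err != nil {
			return handled
		}
		if err := cfg.Handler(ctx, ev); err != nil {
			continue // a failed iteration does not count as handled
		}
		handled++
	}
}

// runDemo feeds events through the loop and cancels after the last one.
func runDemo(events []string) []string {
	ch := make(chan string, len(events))
	for _, e := range events {
		ch <- e
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if len(events) == 0 {
		cancel() // nothing to wait for
	}
	var seen []string
	cfg := config{
		WaitFunc: channelWait(ch),
		Handler: func(_ context.Context, event any) error {
			seen = append(seen, event.(string))
			if len(seen) == len(events) {
				cancel()
			}
			return nil
		},
	}
	drive(ctx, cfg)
	return seen
}

func main() {
	fmt.Println(runDemo([]string{"door_opened", "door_closed"}))
}
```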
type Deps ¶
type Deps struct {
// Runner executes LLM iterations. Required unless [Config].Handler
// is set (Handler-only loops do not call the Runner).
Runner Runner
// Logger for loop operations. Defaults to slog.Default().
Logger *slog.Logger
// EventBus publishes loop lifecycle events. Nil disables events.
EventBus *events.Bus
// Rand provides randomness for sleep jitter and supervisor dice.
// Nil uses math/rand/v2 default.
Rand RandSource
}
Deps holds injected dependencies for a loop. Using a struct avoids a growing parameter list as loops evolve.
type IterationResult ¶
type IterationResult struct {
// ConvID is the conversation ID for this iteration.
ConvID string
// Model is the LLM model used for this iteration.
Model string
// InputTokens is the number of input tokens consumed.
InputTokens int
// OutputTokens is the number of output tokens produced.
OutputTokens int
// ContextWindow is the maximum context size (in tokens) of the model used.
ContextWindow int
// ToolsUsed maps tool names to invocation counts.
ToolsUsed map[string]int
// Elapsed is the wall-clock duration of the iteration.
Elapsed time.Duration
// Supervisor indicates whether this was a supervisor iteration.
Supervisor bool
// Sleep is the computed sleep duration before the next iteration.
Sleep time.Duration
}
IterationResult holds data from a completed loop iteration, passed to [Config.PostIterate] callbacks.
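One possible use of a PostIterate callback is to accumulate usage totals across iterations. The sketch below assumes the callback is invoked only from the loop's own goroutine, so it takes no lock; iterationResult mirrors a subset of the documented fields, and the usage type and record helper are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// iterationResult mirrors a subset of the documented IterationResult.
type iterationResult struct {
	InputTokens  int
	OutputTokens int
	ToolsUsed    map[string]int
	Elapsed      time.Duration
	Supervisor   bool
}

// usage is a hypothetical accumulator. It is not safe for concurrent use.
type usage struct {
	Iterations     int
	InputTokens    int
	OutputTokens   int
	SupervisorRuns int
	Elapsed        time.Duration
	Tools          map[string]int
}

// postIterate returns a callback with the PostIterate shape that folds
// each result into u.
func (u *usage) postIterate() func(context.Context, iterationResult) error {
	return func(_ context.Context, r iterationResult) error {
		u.Iterations++
		u.InputTokens += r.InputTokens
		u.OutputTokens += r.OutputTokens
		u.Elapsed += r.Elapsed
		if r.Supervisor {
			u.SupervisorRuns++
		}
		if u.Tools == nil {
			u.Tools = make(map[string]int)
		}
		for name, n := range r.ToolsUsed {
			u.Tools[name] += n
		}
		return nil
	}
}

// record replays results through the callback, as a loop would after each
// successful iteration.
func record(u *usage, results ...iterationResult) {
	cb := u.postIterate()
	for _, r := range results {
		_ = cb(context.Background(), r)
	}
}

func main() {
	u := &usage{}
	record(u,
		iterationResult{InputTokens: 120, OutputTokens: 40, ToolsUsed: map[string]int{"search": 2}, Elapsed: time.Second},
		iterationResult{InputTokens: 90, OutputTokens: 30, Supervisor: true, Elapsed: 2 * time.Second},
	)
	fmt.Printf("%+v\n", *u)
}
```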
type IterationSnapshot ¶
type IterationSnapshot struct {
// Number is the 1-based iteration number (matches the loop's
// cumulative iteration counter at the time of completion).
Number int `json:"number"`
// ConvID is the conversation ID used for this iteration.
ConvID string `json:"conv_id,omitempty"`
// Model is the LLM model used.
Model string `json:"model,omitempty"`
// InputTokens consumed by this iteration.
InputTokens int `json:"input_tokens,omitempty"`
// OutputTokens produced by this iteration.
OutputTokens int `json:"output_tokens,omitempty"`
// ContextWindow is the model's maximum context size in tokens.
ContextWindow int `json:"context_window,omitempty"`
// ToolsUsed maps tool names to invocation counts.
ToolsUsed map[string]int `json:"tools_used,omitempty"`
// ElapsedMs is the wall-clock duration of the iteration in
// milliseconds. Stored as int64 (not time.Duration) so the JSON
// value is directly usable by the client without nanosecond
// conversion.
ElapsedMs int64 `json:"elapsed_ms"`
// Supervisor indicates whether this was a supervisor iteration.
Supervisor bool `json:"supervisor,omitempty"`
// Error holds the error message if the iteration failed.
Error string `json:"error,omitempty"`
// StartedAt is when the iteration began.
StartedAt time.Time `json:"started_at"`
// CompletedAt is when the iteration finished.
CompletedAt time.Time `json:"completed_at"`
// SleepAfterMs is the computed sleep duration (in milliseconds)
// following this iteration. Zero for WaitFunc-based loops.
SleepAfterMs int64 `json:"sleep_after_ms,omitempty"`
// WaitAfter is true when the loop entered WaitFunc after this
// iteration instead of sleeping.
WaitAfter bool `json:"wait_after,omitempty"`
// Summary holds handler-reported metrics for this iteration,
// written by handlers via [IterationSummary] during execution.
// Values should be small scalars (int, string, bool).
Summary map[string]any `json:"summary,omitempty"`
}
IterationSnapshot is a serializable summary of a completed loop iteration, retained in a ring buffer for the dashboard timeline.
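The ring buffer itself is unexported, so the following is only a sketch of how a fixed-size, newest-first history could work. The recent type, its methods, and toSnapshot are hypothetical; the capacity of 10 matches the limit documented on Status.RecentIterations, and the millisecond conversion reflects the ElapsedMs field.

```go
package main

import (
	"fmt"
	"time"
)

// maxRecent matches the "up to 10" limit documented on Status.
const maxRecent = 10

// snapshot mirrors two fields of IterationSnapshot.
type snapshot struct {
	Number    int
	ElapsedMs int64
}

// toSnapshot converts a time.Duration to whole milliseconds, as the
// ElapsedMs field stores it.
func toSnapshot(number int, elapsed time.Duration) snapshot {
	return snapshot{Number: number, ElapsedMs: elapsed.Milliseconds()}
}

// recent is a hypothetical fixed-size ring of snapshots.
type recent struct {
	buf  [maxRecent]snapshot
	next int // index the next add will write to
	n    int // number of valid entries, at most maxRecent
}

// add stores s, overwriting the oldest entry once the ring is full.
func (r *recent) add(s snapshot) {
	r.buf[r.next] = s
	r.next = (r.next + 1) % maxRecent
	if r.n < maxRecent {
		r.n++
	}
}

// newestFirst returns the stored snapshots ordered newest to oldest.
func (r *recent) newestFirst() []snapshot {
	out := make([]snapshot, 0, r.n)
	for i := 1; i <= r.n; i++ {
		idx := (r.next - i + maxRecent) % maxRecent
		out = append(out, r.buf[idx])
	}
	return out
}

func main() {
	var r recent
	for i := 1; i <= 12; i++ {
		r.add(toSnapshot(i, time.Duration(i)*150*time.Millisecond))
	}
	for _, s := range r.newestFirst() {
		fmt.Printf("#%d %dms\n", s.Number, s.ElapsedMs)
	}
}
```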
type Loop ¶
type Loop struct {
// contains filtered or unexported fields
}
Loop is a persistent background goroutine that iterates on a timer or in response to external events. Each iteration runs an LLM call via the agent runner, or a direct [Config.Handler] function for infrastructure loops that don't need an LLM. Create one with New, start it with Loop.Start, and stop it with Loop.Stop.
func New ¶
New creates a loop with the given configuration and dependencies. Returns an error if required fields are missing or invalid. Call Loop.Start to launch the background goroutine.
func (*Loop) CurrentConvID ¶
CurrentConvID returns the conversation ID of the in-flight iteration, or empty string if no iteration is running. Tool handlers use this to tag their outputs with the current conversation.
func (*Loop) Done ¶
func (l *Loop) Done() <-chan struct{}
Done returns a channel that is closed when the loop's goroutine exits. Returns nil if the loop has not been started.
func (*Loop) SetNextSleep ¶
SetNextSleep sets the sleep duration for the next cycle. This is intended for tool handlers (e.g., set_next_sleep) to communicate the LLM's chosen sleep duration back to the loop.
func (*Loop) Start ¶
Start launches the background goroutine. Calling Start on an already running loop is a no-op (returns nil). Returns ErrLoopStopped if Loop.Stop was called before Start. The goroutine runs until ctx is cancelled or Loop.Stop is called.
func (*Loop) Status ¶
Status returns a snapshot of the loop's current state and metrics. The returned Config is a deep copy; callers cannot mutate loop state via the snapshot.
func (*Loop) Stop ¶
func (l *Loop) Stop()
Stop cancels the loop and waits for the goroutine to exit. Safe to call multiple times or before Start. After Stop, Loop.Start will return ErrLoopStopped. Blocks until the goroutine exits or 10 seconds elapse.
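The Start and Stop contract described above (idempotent Start, no restart after Stop, bounded wait on shutdown) can be sketched with a cancel function and a done channel. The worker type below is a hypothetical, pared-down stand-in for Loop, not the package's implementation; the timeout is a parameter here, whereas Loop.Stop is documented to wait up to 10 seconds.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

var errStopped = errors.New("worker: cannot start a stopped worker")

// worker is a hypothetical stand-in for Loop.
type worker struct {
	mu      sync.Mutex
	cancel  context.CancelFunc
	done    chan struct{}
	stopped bool
}

// start launches the goroutine. It is a no-op if already running and
// returns errStopped if stop was called earlier.
func (w *worker) start(ctx context.Context, tick time.Duration) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.stopped {
		return errStopped
	}
	if w.done != nil {
		return nil
	}
	ctx, w.cancel = context.WithCancel(ctx)
	w.done = make(chan struct{})
	go func(done chan struct{}) {
		defer close(done)
		t := time.NewTicker(tick)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				// One iteration would run here.
			}
		}
	}(w.done)
	return nil
}

// stop cancels the goroutine and waits up to timeout for it to exit.
// It reports whether the goroutine exited (or was never started).
func (w *worker) stop(timeout time.Duration) bool {
	w.mu.Lock()
	w.stopped = true
	cancel, done := w.cancel, w.done
	w.mu.Unlock()
	if cancel == nil {
		return true
	}
	cancel()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

// runOnce starts a worker, stops it, and then attempts a restart.
func runOnce() (exited bool, restartErr error) {
	w := &worker{}
	if err := w.start(context.Background(), 10*time.Millisecond); err != nil {
		return false, err
	}
	exited = w.stop(time.Second)
	restartErr = w.start(context.Background(), 10*time.Millisecond)
	return exited, restartErr
}

func main() {
	exited, err := runOnce()
	fmt.Println("exited:", exited, "restart error:", err)
}
```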
type RandSource ¶
type RandSource interface {
Float64() float64
}
RandSource abstracts randomness for deterministic testing.
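For example, a test might inject a fixed source to force or suppress a supervisor iteration. In the sketch below, fixedRand satisfies a local copy of the interface, and useSupervisor is a hypothetical version of the dice roll; the comparison against SupervisorProb is an assumption about how the roll is evaluated.

```go
package main

import "fmt"

// randSource is a local copy of the documented RandSource interface.
type randSource interface {
	Float64() float64
}

// fixedRand always returns the same value, which makes tests deterministic.
type fixedRand float64

func (f fixedRand) Float64() float64 { return float64(f) }

// useSupervisor is a hypothetical dice roll: it reports true when the
// supervisor is enabled and the draw falls below prob.
func useSupervisor(enabled bool, prob float64, r randSource) bool {
	if !enabled || prob <= 0 {
		return false
	}
	return r.Float64() < prob
}

func main() {
	fmt.Println(useSupervisor(true, 0.1, fixedRand(0.05)))  // draw below the probability
	fmt.Println(useSupervisor(true, 0.1, fixedRand(0.50)))  // draw above the probability
	fmt.Println(useSupervisor(false, 0.1, fixedRand(0.05))) // supervisor disabled
}
```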
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry tracks all active loops and provides visibility into what is running. It enforces concurrency limits and coordinates graceful shutdown.
func NewRegistry ¶
func NewRegistry(opts ...RegistryOption) *Registry
NewRegistry creates a new loop registry.
func (*Registry) ActiveCount ¶
ActiveCount returns the number of registered loops.
func (*Registry) Deregister ¶
Deregister removes a loop from the registry. Safe to call for a loop that is not registered (no-op).
func (*Registry) GetByName ¶
GetByName returns a loop with the given name, or nil if none is found. If multiple loops share a name, which one is returned is undefined.
func (*Registry) Register ¶
Register adds a loop to the registry. Returns an error if the loop's ID is already registered or the concurrency limit would be exceeded. The loop is not started — call Loop.Start after registering.
func (*Registry) ShutdownAll ¶
ShutdownAll cancels all registered loops and waits for them to drain. The provided context controls the maximum time to wait; if it expires, remaining loops are abandoned. Returns the number of loops that were stopped.
func (*Registry) SpawnLoop ¶
SpawnLoop creates a new loop with the given config, registers it, and starts it. This is the primary entry point for creating loops. Returns the loop ID on success.
func (*Registry) Statuses ¶
Statuses returns a snapshot of all registered loop statuses sorted by name.
type RegistryOption ¶
type RegistryOption func(*Registry)
RegistryOption configures a Registry.
func WithMaxLoops ¶
func WithMaxLoops(n int) RegistryOption
WithMaxLoops sets the maximum number of concurrent loops the registry will allow. Zero means unlimited.
func WithRegistryLogger ¶
func WithRegistryLogger(l *slog.Logger) RegistryOption
WithRegistryLogger sets the logger for registry operations. Nil is ignored (keeps slog.Default()).
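These options follow Go's functional options pattern. The sketch below shows how such an option might feed the limit check described for Register; the lowercase types, error values, and the register method are hypothetical, and the example omits the locking a concurrent registry would need.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errDuplicate = errors.New("registry: id already registered")
	errLimit     = errors.New("registry: concurrency limit reached")
)

// registry is a hypothetical, single-goroutine stand-in for Registry.
type registry struct {
	maxLoops int // zero means unlimited
	loops    map[string]struct{}
}

// registryOption mirrors the shape of RegistryOption.
type registryOption func(*registry)

// withMaxLoops mirrors WithMaxLoops.
func withMaxLoops(n int) registryOption {
	return func(r *registry) { r.maxLoops = n }
}

// newRegistry applies each option in order to a default registry.
func newRegistry(opts ...registryOption) *registry {
	r := &registry{loops: make(map[string]struct{})}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

// register rejects duplicate IDs and enforces the optional limit.
func (r *registry) register(id string) error {
	if _, ok := r.loops[id]; ok {
		return errDuplicate
	}
	if r.maxLoops > 0 && len(r.loops) >= r.maxLoops {
		return errLimit
	}
	r.loops[id] = struct{}{}
	return nil
}

func main() {
	r := newRegistry(withMaxLoops(1))
	fmt.Println(r.register("a")) // accepted
	fmt.Println(r.register("a")) // duplicate ID
	fmt.Println(r.register("b")) // over the limit
}
```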
type RetriggerMode ¶
type RetriggerMode int
RetriggerMode determines what happens when a loop's start condition fires again while the loop is already running.
const (
	// RetriggerSingle ignores re-triggers while the loop is running.
	RetriggerSingle RetriggerMode = iota
	// RetriggerRestart cancels the current loop and starts fresh.
	RetriggerRestart
	// RetriggerQueue queues the trigger and runs after current completes.
	RetriggerQueue
	// RetriggerSpawn spawns another instance of the loop.
	RetriggerSpawn
)
type RunMessage ¶
RunMessage is a chat message for the runner.
type RunRequest ¶
type RunRequest struct {
ConversationID string
Messages []RunMessage
ExcludeTools []string
SkipTagFilter bool
Hints map[string]string
// OnProgress is called by the Runner during execution to report
// in-flight activity (tool calls, LLM responses). The kind
// parameter maps to an [events.Kind] constant; data holds
// event-specific fields. The loop automatically injects loop_id
// and loop_name into data before publishing. Nil means no
// progress reporting.
OnProgress func(kind string, data map[string]any) `json:"-"`
}
RunRequest mirrors the fields of agent.Request that loops need. The loop package defines its own type to avoid importing agent.
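The Hints field is where Config.Hints end up: per the Config documentation, config hints override loop-generated defaults. A minimal sketch of that merge follows; mergeHints is a hypothetical helper, and the "loop_name" key is an invented example of a loop-generated default.

```go
package main

import "fmt"

// mergeHints is a hypothetical helper. It copies defaults into a new map
// and then applies overrides, so override values win on key conflicts and
// neither input map is modified.
func mergeHints(defaults, overrides map[string]string) map[string]string {
	out := make(map[string]string, len(defaults)+len(overrides))
	for k, v := range defaults {
		out[k] = v
	}
	for k, v := range overrides {
		out[k] = v
	}
	return out
}

func main() {
	// "loop_name" is an illustrative key, not one confirmed by the package.
	generated := map[string]string{"source": "loop", "loop_name": "watcher"}
	fromConfig := map[string]string{"source": "metacognitive"}
	fmt.Println(mergeHints(generated, fromConfig))
}
```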
type RunResponse ¶
type RunResponse struct {
Content string
Model string
InputTokens int
OutputTokens int
ContextWindow int
ToolsUsed map[string]int
}
RunResponse holds the result of an LLM call executed by a Runner. It mirrors the agent.Response fields that loops consume.
type Runner ¶
type Runner interface {
Run(ctx context.Context, req RunRequest, stream StreamCallback) (*RunResponse, error)
}
Runner abstracts the agent loop for LLM calls. Satisfied by *agent.Loop. Defined here to avoid a circular import.
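Because Runner is a single-method interface, a test double is easy to write. The sketch below redeclares trimmed copies of the documented types so it compiles on its own; the Role and Content fields of runMessage are assumptions (the fields of RunMessage are not listed here), and echoRunner and runTask are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// runMessage is a guess at the shape of RunMessage; its fields are assumed.
type runMessage struct {
	Role    string
	Content string
}

// Trimmed local copies of the documented request and response types.
type runRequest struct {
	ConversationID string
	Messages       []runMessage
}

type runResponse struct {
	Content     string
	Model       string
	InputTokens int
}

type streamCallback func(event any)

// runner is a local copy of the documented Runner interface.
type runner interface {
	Run(ctx context.Context, req runRequest, stream streamCallback) (*runResponse, error)
}

// echoRunner is a hypothetical test double that echoes the last message.
type echoRunner struct {
	calls int
}

// Compile-time check that echoRunner satisfies runner.
var _ runner = (*echoRunner)(nil)

func (e *echoRunner) Run(ctx context.Context, req runRequest, stream streamCallback) (*runResponse, error) {
	if err := ctx.Err(); err != nil {
		return nil, err // honor cancellation before doing any work
	}
	e.calls++
	last := ""
	if n := len(req.Messages); n > 0 {
		last = req.Messages[n-1].Content
	}
	if stream != nil { // a nil callback disables streaming
		stream(last)
	}
	return &runResponse{Content: "echo: " + last, Model: "fake-model", InputTokens: len(last)}, nil
}

// runTask sends a single user message through r without streaming.
func runTask(r runner, task string) (*runResponse, error) {
	req := runRequest{
		ConversationID: "demo",
		Messages:       []runMessage{{Role: "user", Content: task}},
	}
	return r.Run(context.Background(), req, nil)
}

func main() {
	resp, err := runTask(&echoRunner{}, "check the thermostat")
	fmt.Println(resp.Content, err)
}
```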
type State ¶
type State string
State represents the lifecycle state of a running loop.
const (
	// StatePending means the loop is registered but not yet started
	// (e.g., waiting for a StartWhen condition).
	StatePending State = "pending"
	// StateSleeping means the loop is between iterations, waiting for
	// the next wake.
	StateSleeping State = "sleeping"
	// StateWaiting means the loop is blocked on a WaitFunc, waiting
	// for an external event to trigger the next iteration.
	StateWaiting State = "waiting"
	// StateProcessing means the loop is actively running an iteration
	// (LLM call or Handler execution).
	StateProcessing State = "processing"
	// StateError means the loop's last iteration failed. It will
	// retry on the next sleep cycle.
	StateError State = "error"
	// StateStopped means the loop has been cancelled and is no
	// longer running.
	StateStopped State = "stopped"
)
Loop states.
type Status ¶
type Status struct {
// ID is the unique loop identifier.
ID string `json:"id"`
// Name is the human-readable loop name.
Name string `json:"name"`
// State is the current lifecycle state.
State State `json:"state"`
// ParentID is the ID of the parent loop, if any.
ParentID string `json:"parent_id,omitempty"`
// StartedAt is when the loop was started.
StartedAt time.Time `json:"started_at"`
// LastWakeAt is when the loop last began an iteration.
LastWakeAt time.Time `json:"last_wake_at,omitempty"`
// Iterations is the total number of completed (successful) iterations.
Iterations int `json:"iterations"`
// Attempts is the total number of iteration attempts (including failures).
Attempts int `json:"attempts"`
// TotalInputTokens is the cumulative input tokens across all iterations.
TotalInputTokens int `json:"total_input_tokens"`
// TotalOutputTokens is the cumulative output tokens across all iterations.
TotalOutputTokens int `json:"total_output_tokens"`
// LastInputTokens is the input token count from the most recent iteration.
LastInputTokens int `json:"last_input_tokens,omitempty"`
// LastOutputTokens is the output token count from the most recent iteration.
LastOutputTokens int `json:"last_output_tokens,omitempty"`
// ContextWindow is the maximum context size (in tokens) of the model used.
ContextWindow int `json:"context_window,omitempty"`
// LastError is the error message from the most recent failed iteration.
LastError string `json:"last_error,omitempty"`
// ConsecutiveErrors is the number of consecutive failed iterations.
ConsecutiveErrors int `json:"consecutive_errors"`
// RecentConvIDs holds conversation IDs from the most recent iterations
// (up to 10), newest first. Used by the visualizer to query log entries
// scoped to this loop.
RecentConvIDs []string `json:"recent_conv_ids,omitempty"`
// HandlerOnly is true when the loop uses a Handler instead of LLM
// iterations. Handler-only loops have no token metrics.
HandlerOnly bool `json:"handler_only,omitempty"`
// EventDriven is true when the loop uses a WaitFunc instead of
// timer-based sleeping.
EventDriven bool `json:"event_driven,omitempty"`
// RecentIterations holds up to 10 completed iteration snapshots
// (newest first), used by the dashboard timeline.
RecentIterations []IterationSnapshot `json:"recent_iterations,omitempty"`
// LastSupervisorIter is the iteration number of the most recent
// successful supervisor iteration. Zero means no supervisor
// iteration has completed yet.
LastSupervisorIter int `json:"last_supervisor_iter,omitempty"`
// LLMContext holds enrichment data from the most recent
// loop_llm_start event (model, est_tokens, messages, tools,
// complexity, intent, reasoning). Only populated while the loop
// is in processing state, so late-connecting dashboard clients
// can display it immediately.
LLMContext map[string]any `json:"llm_context,omitempty"`
// Config is a copy of the loop's configuration.
Config Config `json:"config"`
}
Status is a snapshot of a loop's current state and metrics, suitable for external inspection via the registry.
type StreamCallback ¶
type StreamCallback func(event any)
StreamCallback receives streaming events. Nil disables streaming.