Documentation
¶
Overview ¶
Copyright 2026 Teradata
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. Package interrupt provides a dedicated communication channel for agent interrupts.
This is the 4th channel in Loom's quad-modal communication system: 1. MESSAGE QUEUE - ordered, persistent task delivery 2. SHARED MEMORY - KV store for agent state 3. BROADCAST BUS - pub/sub for events 4. INTERRUPT CHANNEL - targeted, guaranteed signal delivery (THIS PACKAGE)
Interrupts are semantically different from broadcasts: - Targeted (not anonymous pub/sub) - Guaranteed delivery for CRITICAL priority - Type-safe enums (not string topics) - Fast path (<1ms) + Slow path (persistent SQLite queue)
Signal Priority Ranges: - 0-9: CRITICAL (guaranteed delivery, persistent queue, <1s) - 10-19: HIGH (best-effort, <5s) - 20-29: NORMAL (best-effort, <30s) - 30-39: LOW (background) - 40-49: LEARNING (autonomous learning triggers) - 1000+: CUSTOM (user-defined signals)
Index ¶
- type Handler
- type HandlerRegistration
- type Interrupt
- type InterruptChannel
- func (ic *InterruptChannel) Broadcast(ctx context.Context, signal InterruptSignal, payload []byte) error
- func (ic *InterruptChannel) BroadcastFrom(ctx context.Context, signal InterruptSignal, payload []byte, senderID string) error
- func (ic *InterruptChannel) Close() error
- func (ic *InterruptChannel) GetStats() (sent, dropped, retried int64)
- func (ic *InterruptChannel) ListAgents() []string
- func (ic *InterruptChannel) ListHandlers(agentID string) []InterruptSignal
- func (ic *InterruptChannel) RegisterHandler(agentID string, signal InterruptSignal, handler Handler, wakeOnSignal bool) error
- func (ic *InterruptChannel) Send(ctx context.Context, signal InterruptSignal, targetAgentID string, ...) error
- func (ic *InterruptChannel) SendFrom(ctx context.Context, signal InterruptSignal, targetAgentID string, ...) error
- func (ic *InterruptChannel) SetHooks(onSend func(i *Interrupt), onDelivered func(i *Interrupt, agentID string), ...)
- func (ic *InterruptChannel) UnregisterHandler(agentID string, signal InterruptSignal) error
- func (ic *InterruptChannel) WithTracer(tracer observability.Tracer) *InterruptChannel
- type InterruptSignal
- type PersistentQueue
- func (pq *PersistentQueue) Acknowledge(ctx context.Context, id int64) error
- func (pq *PersistentQueue) ClearOld(ctx context.Context, olderThan time.Duration) (int, error)
- func (pq *PersistentQueue) Close() error
- func (pq *PersistentQueue) Enqueue(ctx context.Context, interrupt *Interrupt) error
- func (pq *PersistentQueue) GetPendingCount(ctx context.Context) (int, error)
- func (pq *PersistentQueue) GetStats(ctx context.Context) (map[string]int, error)
- func (pq *PersistentQueue) ListPending(ctx context.Context, limit int) ([]*QueueEntry, error)
- func (pq *PersistentQueue) WithTracer(tracer observability.Tracer) *PersistentQueue
- type Priority
- type QueueEntry
- type Router
- func (r *Router) Close() error
- func (r *Router) GetStats() map[string]int
- func (r *Router) RegisterHandler(agentID string, signal InterruptSignal, handler Handler) error
- func (r *Router) Send(ctx context.Context, signal InterruptSignal, targetAgentID string, ...) (bool, error)
- func (r *Router) UnregisterHandler(agentID string, signal InterruptSignal) error
- func (r *Router) WithTracer(tracer observability.Tracer) *Router
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Handler ¶
type Handler func(ctx context.Context, signal InterruptSignal, payload []byte) error
Handler is a function that processes an interrupt signal. Handlers must be idempotent and fast (<100ms for non-CRITICAL, <10ms for CRITICAL). Long-running work should be dispatched asynchronously.
type HandlerRegistration ¶
type HandlerRegistration struct {
AgentID string // Agent that registered this handler
Signal InterruptSignal // Signal to handle
Handler Handler // Handler function
WakeOnSignal bool // If true, wake DORMANT agent when signal received
}
HandlerRegistration tracks a registered interrupt handler.
type Interrupt ¶
type Interrupt struct {
ID string // Unique interrupt ID for tracing
TraceID string // Trace ID for correlation across system
Signal InterruptSignal // The interrupt signal
TargetID string // Target agent ID (empty for broadcast)
Payload []byte // Optional payload (JSON recommended)
Timestamp time.Time // When the interrupt was created
SenderID string // ID of the sender (for tracing)
}
Interrupt represents a single interrupt message.
type InterruptChannel ¶
type InterruptChannel struct {
// contains filtered or unexported fields
}
InterruptChannel is the 4th communication channel in Loom's quad-modal system. It provides targeted, guaranteed interrupt delivery with type-safe enums.
Architecture: - Fast path: Go channels with large buffers (<1ms delivery) - Slow path: Persistent SQLite queue for CRITICAL signals (guaranteed delivery) - Router: Multiplexes signals to registered handlers - Type-safe: Compile-time signal validation via enums
Usage:
ic := interrupt.NewInterruptChannel(ctx, router, queue)
defer ic.Close()
// Register handler
ic.RegisterHandler("my-agent", SignalEmergencyStop, myHandler, true)
// Send to specific agent
ic.Send(ctx, SignalEmergencyStop, "my-agent", payload)
// Broadcast to all handlers for signal
ic.Broadcast(ctx, SignalSystemShutdown, payload)
func NewInterruptChannel ¶
func NewInterruptChannel(ctx context.Context, router *Router, queue *PersistentQueue) *InterruptChannel
NewInterruptChannel creates a new interrupt channel. The router handles fast-path delivery, and queue handles slow-path (CRITICAL only).
func (*InterruptChannel) Broadcast ¶
func (ic *InterruptChannel) Broadcast( ctx context.Context, signal InterruptSignal, payload []byte, ) error
Broadcast sends an interrupt to all registered handlers for the signal. For CRITICAL signals, falls back to persistent queue for any failed deliveries.
func (*InterruptChannel) BroadcastFrom ¶
func (ic *InterruptChannel) BroadcastFrom( ctx context.Context, signal InterruptSignal, payload []byte, senderID string, ) error
BroadcastFrom broadcasts an interrupt with explicit sender ID (for tracing).
func (*InterruptChannel) Close ¶
func (ic *InterruptChannel) Close() error
Close shuts down the interrupt channel gracefully.
func (*InterruptChannel) GetStats ¶
func (ic *InterruptChannel) GetStats() (sent, dropped, retried int64)
GetStats returns current interrupt channel statistics.
func (*InterruptChannel) ListAgents ¶
func (ic *InterruptChannel) ListAgents() []string
ListAgents returns all agent IDs with registered handlers.
func (*InterruptChannel) ListHandlers ¶
func (ic *InterruptChannel) ListHandlers(agentID string) []InterruptSignal
ListHandlers returns all registered handlers for an agent.
func (*InterruptChannel) RegisterHandler ¶
func (ic *InterruptChannel) RegisterHandler( agentID string, signal InterruptSignal, handler Handler, wakeOnSignal bool, ) error
RegisterHandler registers a handler for a specific signal on an agent. If wakeOnSignal is true, the agent will be awakened from DORMANT state when this signal is received. Returns an error if the agent/signal combination is already registered.
func (*InterruptChannel) Send ¶
func (ic *InterruptChannel) Send( ctx context.Context, signal InterruptSignal, targetAgentID string, payload []byte, ) error
Send sends an interrupt to a specific agent. For CRITICAL signals, falls back to persistent queue if fast path fails.
func (*InterruptChannel) SendFrom ¶
func (ic *InterruptChannel) SendFrom( ctx context.Context, signal InterruptSignal, targetAgentID string, payload []byte, senderID string, ) error
SendFrom sends an interrupt with explicit sender ID (for tracing).
func (*InterruptChannel) SetHooks ¶
func (ic *InterruptChannel) SetHooks( onSend func(i *Interrupt), onDelivered func(i *Interrupt, agentID string), onDropped func(i *Interrupt, reason string), )
SetHooks sets lifecycle hooks for testing and observability.
func (*InterruptChannel) UnregisterHandler ¶
func (ic *InterruptChannel) UnregisterHandler(agentID string, signal InterruptSignal) error
UnregisterHandler removes a handler for a specific signal on an agent.
func (*InterruptChannel) WithTracer ¶
func (ic *InterruptChannel) WithTracer(tracer observability.Tracer) *InterruptChannel
WithTracer sets an optional tracer for observability. This enables Hawk integration for spans, metrics, and tracing.
type InterruptSignal ¶
type InterruptSignal int
InterruptSignal represents a predefined interrupt type. Signals are type-safe enums that prevent typos and enable compile-time validation.
const ( // SignalEmergencyStop immediately halts all agent operations. // Used for: Critical system failures, data corruption, security breaches SignalEmergencyStop InterruptSignal = 0 // SignalSystemShutdown initiates graceful system-wide shutdown sequence. // Used for: Maintenance windows, cluster rebalancing, emergency drains SignalSystemShutdown InterruptSignal = 1 // SignalThresholdCritical alerts on critical threshold breach (SLA violations, resource exhaustion). // Used for: Database connection pool exhausted, memory >95%, disk full SignalThresholdCritical InterruptSignal = 2 // SignalDatabaseDown signals complete database unavailability. // Used for: Connection failures, cluster down, network partition SignalDatabaseDown InterruptSignal = 3 // SignalSecurityBreach signals confirmed security incident. // Used for: Unauthorized access, injection attempts, credential leaks SignalSecurityBreach InterruptSignal = 4 // SignalThresholdHigh alerts on high threshold breach (early warning). // Used for: Memory >80%, CPU sustained >70%, query latency P99 >5s SignalThresholdHigh InterruptSignal = 10 // SignalAlertSecurity signals suspicious activity requiring investigation. // Used for: Failed login attempts, unusual access patterns, rate limit hits SignalAlertSecurity InterruptSignal = 11 // SignalAlertError signals non-critical errors requiring attention. // Used for: Query failures, timeout errors, retry exhaustion (non-critical) SignalAlertError InterruptSignal = 12 // SignalResourceExhausted signals resource pressure (not yet critical). // Used for: Connection pool pressure, cache eviction storms, queue backlog SignalResourceExhausted InterruptSignal = 13 // SignalWakeup awakens a DORMANT agent to ACTIVE state. // Used for: Scheduled workflows, cron triggers, on-demand activation SignalWakeup InterruptSignal = 20 // SignalGracefulShutdown initiates graceful agent shutdown (not system-wide). // Used for: Agent TTL expiry, idle timeout, resource cleanup SignalGracefulShutdown InterruptSignal = 21 // SignalHealthCheck requests health status from agent. // Used for: Kubernetes liveness probes, orchestrator health checks SignalHealthCheck InterruptSignal = 22 // SignalConfigReload triggers hot-reload of agent configuration. // Used for: Pattern updates, prompt changes, settings refresh SignalConfigReload InterruptSignal = 23 // SignalMetricsCollection triggers metrics aggregation and reporting. // Used for: Periodic Hawk exports, cost tracking, performance profiling SignalMetricsCollection InterruptSignal = 30 // SignalLogRotation triggers log file rotation and archival. // Used for: Periodic log cleanup, compression, upload to storage SignalLogRotation InterruptSignal = 31 // SignalLearningAnalyze triggers pattern analysis on recent executions. // Used for: Post-execution pattern mining, performance profiling SignalLearningAnalyze InterruptSignal = 40 // SignalLearningOptimize triggers DSPy optimization on current prompts. // Used for: Scheduled prompt tuning, A/B test winner selection SignalLearningOptimize InterruptSignal = 41 // SignalLearningABTest starts A/B test for new pattern vs baseline. // Used for: Validating learned patterns, comparing prompt variations SignalLearningABTest InterruptSignal = 42 // SignalLearningProposal notifies that a new pattern proposal is ready for review. // Used for: Async learning cycles, human-in-the-loop approval SignalLearningProposal InterruptSignal = 43 // SignalLearningValidate triggers validation of learned patterns against held-out data. // Used for: Post-optimization validation, regression testing SignalLearningValidate InterruptSignal = 44 // SignalLearningExport triggers export of learned knowledge to Promptio. // Used for: Persisting approved patterns, cross-agent knowledge sharing SignalLearningExport InterruptSignal = 45 // SignalLearningSync synchronizes learned patterns with other agents. // Used for: Multi-agent learning, cluster-wide pattern distribution SignalLearningSync InterruptSignal = 46 // SignalCustomBase is the starting point for user-defined custom signals. // Example: const SignalMyCustomAlert = interrupt.SignalCustomBase + 1 SignalCustomBase InterruptSignal = 1000 )
func (InterruptSignal) IsCritical ¶
func (s InterruptSignal) IsCritical() bool
IsCritical returns true if this signal requires guaranteed delivery.
func (InterruptSignal) Priority ¶
func (s InterruptSignal) Priority() Priority
Priority returns the delivery priority for this signal based on its range.
func (InterruptSignal) String ¶
func (s InterruptSignal) String() string
String returns human-readable signal name.
type PersistentQueue ¶
type PersistentQueue struct {
// contains filtered or unexported fields
}
PersistentQueue provides guaranteed delivery for CRITICAL interrupts. Uses SQLite for persistence and implements retry logic with exponential backoff.
Design: - SQLite database for persistence (survives process restarts) - Retry loop with exponential backoff (100ms, 200ms, 400ms, ..., up to 30s) - Maximum 50 retry attempts per interrupt - ACK protocol for delivery confirmation (delivered -> acknowledged) - Background goroutine processes queue every 100ms
func NewPersistentQueue ¶
func NewPersistentQueue(ctx context.Context, dbPath string, router *Router) (*PersistentQueue, error)
NewPersistentQueue creates a new persistent interrupt queue. dbPath is the path to the SQLite database file.
func (*PersistentQueue) Acknowledge ¶
func (pq *PersistentQueue) Acknowledge(ctx context.Context, id int64) error
Acknowledge marks an interrupt as acknowledged (delivery confirmed by handler). This provides confirmation that the handler successfully processed the interrupt. Use this after the handler completes its work to transition from 'delivered' to 'acknowledged'.
func (*PersistentQueue) ClearOld ¶
ClearOld removes old acknowledged interrupts from the queue (cleanup).
func (*PersistentQueue) Close ¶
func (pq *PersistentQueue) Close() error
Close shuts down the persistent queue gracefully.
func (*PersistentQueue) Enqueue ¶
func (pq *PersistentQueue) Enqueue(ctx context.Context, interrupt *Interrupt) error
Enqueue adds a CRITICAL interrupt to the persistent queue.
func (*PersistentQueue) GetPendingCount ¶
func (pq *PersistentQueue) GetPendingCount(ctx context.Context) (int, error)
GetPendingCount returns the number of pending interrupts in the queue.
func (*PersistentQueue) ListPending ¶
func (pq *PersistentQueue) ListPending(ctx context.Context, limit int) ([]*QueueEntry, error)
ListPending returns pending interrupts ordered by age.
func (*PersistentQueue) WithTracer ¶
func (pq *PersistentQueue) WithTracer(tracer observability.Tracer) *PersistentQueue
WithTracer sets an optional tracer for observability.
type Priority ¶
type Priority int
Priority defines interrupt delivery priority level.
const ( // PriorityCritical: <1s delivery, guaranteed via persistent queue, never dropped PriorityCritical Priority = 0 // PriorityHigh: <5s delivery, best-effort via large buffers (10k) PriorityHigh Priority = 1 // PriorityNormal: <30s delivery, best-effort via medium buffers (1k) PriorityNormal Priority = 2 // PriorityLow: background, best-effort via small buffers (100) PriorityLow Priority = 3 )
func (Priority) BufferSize ¶
BufferSize returns the recommended channel buffer size for this priority.
type QueueEntry ¶
type QueueEntry struct {
ID int64 `json:"id"`
Signal int `json:"signal"` // InterruptSignal as int
TargetID string `json:"target_id"` // Target agent ID
Payload string `json:"payload"` // JSON payload
SenderID string `json:"sender_id"` // Sender ID for tracing
CreatedAt time.Time `json:"created_at"` // When interrupt was created
EnqueuedAt time.Time `json:"enqueued_at"` // When interrupt was queued
DeliveredAt *time.Time `json:"delivered_at"` // When delivered (null if pending)
AckAt *time.Time `json:"ack_at"` // When acknowledged (null if pending)
RetryCount int `json:"retry_count"` // Number of retry attempts
State string `json:"state"` // pending, delivered, acknowledged, failed
ErrorMessage string `json:"error_message"` // Last error message (if any)
}
QueueEntry represents a persisted interrupt in the queue.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router handles fast-path interrupt delivery via Go channels. Each registered handler gets a dedicated channel with priority-appropriate buffer size.
Design: - Dedicated channel per handler (no multiplexing contention) - Non-blocking sends (returns false if buffer full) - Background goroutine processes each handler's queue - Graceful shutdown waits for in-flight handlers
func (*Router) Close ¶
Close shuts down the router gracefully. Waits for all in-flight handlers to complete (with timeout).
func (*Router) RegisterHandler ¶
func (r *Router) RegisterHandler(agentID string, signal InterruptSignal, handler Handler) error
RegisterHandler registers a handler for fast-path delivery. Creates a dedicated channel and background goroutine for this handler.
func (*Router) Send ¶
func (r *Router) Send(ctx context.Context, signal InterruptSignal, targetAgentID string, payload []byte) (bool, error)
Send attempts non-blocking delivery to a specific agent's handler. Returns (true, nil) if delivered successfully. Returns (false, nil) if buffer is full (caller should fall back to persistent queue). Returns (false, err) if handler not found.
func (*Router) UnregisterHandler ¶
func (r *Router) UnregisterHandler(agentID string, signal InterruptSignal) error
UnregisterHandler removes a handler and stops its background goroutine.
func (*Router) WithTracer ¶
func (r *Router) WithTracer(tracer observability.Tracer) *Router
WithTracer sets an optional tracer for observability.