interrupt

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Copyright 2026 Teradata

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. Package interrupt provides a dedicated communication channel for agent interrupts.

This is the 4th channel in Loom's quad-modal communication system: 1. MESSAGE QUEUE - ordered, persistent task delivery 2. SHARED MEMORY - KV store for agent state 3. BROADCAST BUS - pub/sub for events 4. INTERRUPT CHANNEL - targeted, guaranteed signal delivery (THIS PACKAGE)

Interrupts are semantically different from broadcasts: - Targeted (not anonymous pub/sub) - Guaranteed delivery for CRITICAL priority - Type-safe enums (not string topics) - Fast path (<1ms) + Slow path (persistent SQLite queue)

Signal Priority Ranges: - 0-9: CRITICAL (guaranteed delivery, persistent queue, <1s) - 10-19: HIGH (best-effort, <5s) - 20-29: NORMAL (best-effort, <30s) - 30-39: LOW (background) - 40-49: LEARNING (autonomous learning triggers) - 1000+: CUSTOM (user-defined signals)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Handler

type Handler func(ctx context.Context, signal InterruptSignal, payload []byte) error

Handler is a function that processes an interrupt signal. Handlers must be idempotent and fast (<100ms for non-CRITICAL, <10ms for CRITICAL). Long-running work should be dispatched asynchronously.

type HandlerRegistration

type HandlerRegistration struct {
	AgentID      string          // Agent that registered this handler
	Signal       InterruptSignal // Signal to handle
	Handler      Handler         // Handler function
	WakeOnSignal bool            // If true, wake DORMANT agent when signal received
}

HandlerRegistration tracks a registered interrupt handler.

type Interrupt

type Interrupt struct {
	ID        string          // Unique interrupt ID for tracing
	TraceID   string          // Trace ID for correlation across system
	Signal    InterruptSignal // The interrupt signal
	TargetID  string          // Target agent ID (empty for broadcast)
	Payload   []byte          // Optional payload (JSON recommended)
	Timestamp time.Time       // When the interrupt was created
	SenderID  string          // ID of the sender (for tracing)
}

Interrupt represents a single interrupt message.

type InterruptChannel

type InterruptChannel struct {
	// contains filtered or unexported fields
}

InterruptChannel is the 4th communication channel in Loom's quad-modal system. It provides targeted, guaranteed interrupt delivery with type-safe enums.

Architecture: - Fast path: Go channels with large buffers (<1ms delivery) - Slow path: Persistent SQLite queue for CRITICAL signals (guaranteed delivery) - Router: Multiplexes signals to registered handlers - Type-safe: Compile-time signal validation via enums

Usage:

ic := interrupt.NewInterruptChannel(ctx, router, queue)
defer ic.Close()

// Register handler
ic.RegisterHandler("my-agent", SignalEmergencyStop, myHandler, true)

// Send to specific agent
ic.Send(ctx, SignalEmergencyStop, "my-agent", payload)

// Broadcast to all handlers for signal
ic.Broadcast(ctx, SignalSystemShutdown, payload)

func NewInterruptChannel

func NewInterruptChannel(ctx context.Context, router *Router, queue *PersistentQueue) *InterruptChannel

NewInterruptChannel creates a new interrupt channel. The router handles fast-path delivery, and queue handles slow-path (CRITICAL only).

func (*InterruptChannel) Broadcast

func (ic *InterruptChannel) Broadcast(
	ctx context.Context,
	signal InterruptSignal,
	payload []byte,
) error

Broadcast sends an interrupt to all registered handlers for the signal. For CRITICAL signals, falls back to persistent queue for any failed deliveries.

func (*InterruptChannel) BroadcastFrom

func (ic *InterruptChannel) BroadcastFrom(
	ctx context.Context,
	signal InterruptSignal,
	payload []byte,
	senderID string,
) error

BroadcastFrom broadcasts an interrupt with explicit sender ID (for tracing).

func (*InterruptChannel) Close

func (ic *InterruptChannel) Close() error

Close shuts down the interrupt channel gracefully.

func (*InterruptChannel) GetStats

func (ic *InterruptChannel) GetStats() (sent, dropped, retried int64)

GetStats returns current interrupt channel statistics.

func (*InterruptChannel) ListAgents

func (ic *InterruptChannel) ListAgents() []string

ListAgents returns all agent IDs with registered handlers.

func (*InterruptChannel) ListHandlers

func (ic *InterruptChannel) ListHandlers(agentID string) []InterruptSignal

ListHandlers returns all registered handlers for an agent.

func (*InterruptChannel) RegisterHandler

func (ic *InterruptChannel) RegisterHandler(
	agentID string,
	signal InterruptSignal,
	handler Handler,
	wakeOnSignal bool,
) error

RegisterHandler registers a handler for a specific signal on an agent. If wakeOnSignal is true, the agent will be awakened from DORMANT state when this signal is received. Returns an error if the agent/signal combination is already registered.

func (*InterruptChannel) Send

func (ic *InterruptChannel) Send(
	ctx context.Context,
	signal InterruptSignal,
	targetAgentID string,
	payload []byte,
) error

Send sends an interrupt to a specific agent. For CRITICAL signals, falls back to persistent queue if fast path fails.

func (*InterruptChannel) SendFrom

func (ic *InterruptChannel) SendFrom(
	ctx context.Context,
	signal InterruptSignal,
	targetAgentID string,
	payload []byte,
	senderID string,
) error

SendFrom sends an interrupt with explicit sender ID (for tracing).

func (*InterruptChannel) SetHooks

func (ic *InterruptChannel) SetHooks(
	onSend func(i *Interrupt),
	onDelivered func(i *Interrupt, agentID string),
	onDropped func(i *Interrupt, reason string),
)

SetHooks sets lifecycle hooks for testing and observability.

func (*InterruptChannel) UnregisterHandler

func (ic *InterruptChannel) UnregisterHandler(agentID string, signal InterruptSignal) error

UnregisterHandler removes a handler for a specific signal on an agent.

func (*InterruptChannel) WithTracer

func (ic *InterruptChannel) WithTracer(tracer observability.Tracer) *InterruptChannel

WithTracer sets an optional tracer for observability. This enables Hawk integration for spans, metrics, and tracing.

type InterruptSignal

type InterruptSignal int

InterruptSignal represents a predefined interrupt type. Signals are type-safe enums that prevent typos and enable compile-time validation.

const (

	// SignalEmergencyStop immediately halts all agent operations.
	// Used for: Critical system failures, data corruption, security breaches
	SignalEmergencyStop InterruptSignal = 0

	// SignalSystemShutdown initiates graceful system-wide shutdown sequence.
	// Used for: Maintenance windows, cluster rebalancing, emergency drains
	SignalSystemShutdown InterruptSignal = 1

	// SignalThresholdCritical alerts on critical threshold breach (SLA violations, resource exhaustion).
	// Used for: Database connection pool exhausted, memory >95%, disk full
	SignalThresholdCritical InterruptSignal = 2

	// SignalDatabaseDown signals complete database unavailability.
	// Used for: Connection failures, cluster down, network partition
	SignalDatabaseDown InterruptSignal = 3

	// SignalSecurityBreach signals confirmed security incident.
	// Used for: Unauthorized access, injection attempts, credential leaks
	SignalSecurityBreach InterruptSignal = 4

	// SignalThresholdHigh alerts on high threshold breach (early warning).
	// Used for: Memory >80%, CPU sustained >70%, query latency P99 >5s
	SignalThresholdHigh InterruptSignal = 10

	// SignalAlertSecurity signals suspicious activity requiring investigation.
	// Used for: Failed login attempts, unusual access patterns, rate limit hits
	SignalAlertSecurity InterruptSignal = 11

	// SignalAlertError signals non-critical errors requiring attention.
	// Used for: Query failures, timeout errors, retry exhaustion (non-critical)
	SignalAlertError InterruptSignal = 12

	// SignalResourceExhausted signals resource pressure (not yet critical).
	// Used for: Connection pool pressure, cache eviction storms, queue backlog
	SignalResourceExhausted InterruptSignal = 13

	// SignalWakeup awakens a DORMANT agent to ACTIVE state.
	// Used for: Scheduled workflows, cron triggers, on-demand activation
	SignalWakeup InterruptSignal = 20

	// SignalGracefulShutdown initiates graceful agent shutdown (not system-wide).
	// Used for: Agent TTL expiry, idle timeout, resource cleanup
	SignalGracefulShutdown InterruptSignal = 21

	// SignalHealthCheck requests health status from agent.
	// Used for: Kubernetes liveness probes, orchestrator health checks
	SignalHealthCheck InterruptSignal = 22

	// SignalConfigReload triggers hot-reload of agent configuration.
	// Used for: Pattern updates, prompt changes, settings refresh
	SignalConfigReload InterruptSignal = 23

	// SignalMetricsCollection triggers metrics aggregation and reporting.
	// Used for: Periodic Hawk exports, cost tracking, performance profiling
	SignalMetricsCollection InterruptSignal = 30

	// SignalLogRotation triggers log file rotation and archival.
	// Used for: Periodic log cleanup, compression, upload to storage
	SignalLogRotation InterruptSignal = 31

	// SignalLearningAnalyze triggers pattern analysis on recent executions.
	// Used for: Post-execution pattern mining, performance profiling
	SignalLearningAnalyze InterruptSignal = 40

	// SignalLearningOptimize triggers DSPy optimization on current prompts.
	// Used for: Scheduled prompt tuning, A/B test winner selection
	SignalLearningOptimize InterruptSignal = 41

	// SignalLearningABTest starts A/B test for new pattern vs baseline.
	// Used for: Validating learned patterns, comparing prompt variations
	SignalLearningABTest InterruptSignal = 42

	// SignalLearningProposal notifies that a new pattern proposal is ready for review.
	// Used for: Async learning cycles, human-in-the-loop approval
	SignalLearningProposal InterruptSignal = 43

	// SignalLearningValidate triggers validation of learned patterns against held-out data.
	// Used for: Post-optimization validation, regression testing
	SignalLearningValidate InterruptSignal = 44

	// SignalLearningExport triggers export of learned knowledge to Promptio.
	// Used for: Persisting approved patterns, cross-agent knowledge sharing
	SignalLearningExport InterruptSignal = 45

	// SignalLearningSync synchronizes learned patterns with other agents.
	// Used for: Multi-agent learning, cluster-wide pattern distribution
	SignalLearningSync InterruptSignal = 46

	// SignalCustomBase is the starting point for user-defined custom signals.
	// Example: const SignalMyCustomAlert = interrupt.SignalCustomBase + 1
	SignalCustomBase InterruptSignal = 1000
)

func (InterruptSignal) IsCritical

func (s InterruptSignal) IsCritical() bool

IsCritical returns true if this signal requires guaranteed delivery.

func (InterruptSignal) Priority

func (s InterruptSignal) Priority() Priority

Priority returns the delivery priority for this signal based on its range.

func (InterruptSignal) String

func (s InterruptSignal) String() string

String returns human-readable signal name.

type PersistentQueue

type PersistentQueue struct {
	// contains filtered or unexported fields
}

PersistentQueue provides guaranteed delivery for CRITICAL interrupts. Uses SQLite for persistence and implements retry logic with exponential backoff.

Design: - SQLite database for persistence (survives process restarts) - Retry loop with exponential backoff (100ms, 200ms, 400ms, ..., up to 30s) - Maximum 50 retry attempts per interrupt - ACK protocol for delivery confirmation (delivered -> acknowledged) - Background goroutine processes queue every 100ms

func NewPersistentQueue

func NewPersistentQueue(ctx context.Context, dbPath string, router *Router) (*PersistentQueue, error)

NewPersistentQueue creates a new persistent interrupt queue. dbPath is the path to the SQLite database file.

func (*PersistentQueue) Acknowledge

func (pq *PersistentQueue) Acknowledge(ctx context.Context, id int64) error

Acknowledge marks an interrupt as acknowledged (delivery confirmed by handler). This provides confirmation that the handler successfully processed the interrupt. Use this after the handler completes its work to transition from 'delivered' to 'acknowledged'.

func (*PersistentQueue) ClearOld

func (pq *PersistentQueue) ClearOld(ctx context.Context, olderThan time.Duration) (int, error)

ClearOld removes old acknowledged interrupts from the queue (cleanup).

func (*PersistentQueue) Close

func (pq *PersistentQueue) Close() error

Close shuts down the persistent queue gracefully.

func (*PersistentQueue) Enqueue

func (pq *PersistentQueue) Enqueue(ctx context.Context, interrupt *Interrupt) error

Enqueue adds a CRITICAL interrupt to the persistent queue.

func (*PersistentQueue) GetPendingCount

func (pq *PersistentQueue) GetPendingCount(ctx context.Context) (int, error)

GetPendingCount returns the number of pending interrupts in the queue.

func (*PersistentQueue) GetStats

func (pq *PersistentQueue) GetStats(ctx context.Context) (map[string]int, error)

GetStats returns queue statistics.

func (*PersistentQueue) ListPending

func (pq *PersistentQueue) ListPending(ctx context.Context, limit int) ([]*QueueEntry, error)

ListPending returns pending interrupts ordered by age.

func (*PersistentQueue) WithTracer

func (pq *PersistentQueue) WithTracer(tracer observability.Tracer) *PersistentQueue

WithTracer sets an optional tracer for observability.

type Priority

type Priority int

Priority defines interrupt delivery priority level.

const (
	// PriorityCritical: <1s delivery, guaranteed via persistent queue, never dropped
	PriorityCritical Priority = 0

	// PriorityHigh: <5s delivery, best-effort via large buffers (10k)
	PriorityHigh Priority = 1

	// PriorityNormal: <30s delivery, best-effort via medium buffers (1k)
	PriorityNormal Priority = 2

	// PriorityLow: background, best-effort via small buffers (100)
	PriorityLow Priority = 3
)

func (Priority) BufferSize

func (p Priority) BufferSize() int

BufferSize returns the recommended channel buffer size for this priority.

func (Priority) String

func (p Priority) String() string

String returns human-readable priority name.

type QueueEntry

type QueueEntry struct {
	ID           int64      `json:"id"`
	Signal       int        `json:"signal"`        // InterruptSignal as int
	TargetID     string     `json:"target_id"`     // Target agent ID
	Payload      string     `json:"payload"`       // JSON payload
	SenderID     string     `json:"sender_id"`     // Sender ID for tracing
	CreatedAt    time.Time  `json:"created_at"`    // When interrupt was created
	EnqueuedAt   time.Time  `json:"enqueued_at"`   // When interrupt was queued
	DeliveredAt  *time.Time `json:"delivered_at"`  // When delivered (null if pending)
	AckAt        *time.Time `json:"ack_at"`        // When acknowledged (null if pending)
	RetryCount   int        `json:"retry_count"`   // Number of retry attempts
	State        string     `json:"state"`         // pending, delivered, acknowledged, failed
	ErrorMessage string     `json:"error_message"` // Last error message (if any)
}

QueueEntry represents a persisted interrupt in the queue.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router handles fast-path interrupt delivery via Go channels. Each registered handler gets a dedicated channel with priority-appropriate buffer size.

Design: - Dedicated channel per handler (no multiplexing contention) - Non-blocking sends (returns false if buffer full) - Background goroutine processes each handler's queue - Graceful shutdown waits for in-flight handlers

func NewRouter

func NewRouter(ctx context.Context) *Router

NewRouter creates a new fast-path router.

func (*Router) Close

func (r *Router) Close() error

Close shuts down the router gracefully. Waits for all in-flight handlers to complete (with timeout).

func (*Router) GetStats

func (r *Router) GetStats() map[string]int

GetStats returns router statistics.

func (*Router) RegisterHandler

func (r *Router) RegisterHandler(agentID string, signal InterruptSignal, handler Handler) error

RegisterHandler registers a handler for fast-path delivery. Creates a dedicated channel and background goroutine for this handler.

func (*Router) Send

func (r *Router) Send(ctx context.Context, signal InterruptSignal, targetAgentID string, payload []byte) (bool, error)

Send attempts non-blocking delivery to a specific agent's handler. Returns (true, nil) if delivered successfully. Returns (false, nil) if buffer is full (caller should fall back to persistent queue). Returns (false, err) if handler not found.

func (*Router) UnregisterHandler

func (r *Router) UnregisterHandler(agentID string, signal InterruptSignal) error

UnregisterHandler removes a handler and stops its background goroutine.

func (*Router) WithTracer

func (r *Router) WithTracer(tracer observability.Tracer) *Router

WithTracer sets an optional tracer for observability.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL