Documentation ¶
Overview ¶
Package notifications provides provider-agnostic notification delivery with routing, callback tracking, and timeout management.
Index ¶
- Constants
- func ActionPrefix(deviceName string) string
- type Action
- type ActionableRequest
- type CallbackDispatcher
- type ChannelActivity
- type ChannelActivitySource
- type ContactResolver
- type DelegateSpawner
- type EscalationResponse
- type EscalationSender
- type HAClient
- type HAPushProvider
- type HistoryProvider
- type HistoryProviderConfig
- type MessageRecorder
- type Notification
- type NotificationProvider
- type NotificationRequest
- type NotificationRouter
- func (r *NotificationRouter) RegisterProvider(p NotificationProvider)
- func (r *NotificationRouter) Route(recipient string) (NotificationProvider, error)
- func (r *NotificationRouter) Send(ctx context.Context, n Notification) error
- func (r *NotificationRouter) SendActionable(ctx context.Context, req ActionableRequest, sessionID, conversationID string) (string, error)
- func (r *NotificationRouter) SendNotification(ctx context.Context, req NotificationRequest) error
- func (r *NotificationRouter) SetActivitySource(src ChannelActivitySource)
- func (r *NotificationRouter) SetSourceFunc(fn SourceFunc)
- type OpstateStore
- type Record
- type RecordStore
- func (s *RecordStore) Create(r *Record) error
- func (s *RecordStore) Expire(requestID string) (bool, error)
- func (s *RecordStore) Get(requestID string) (*Record, error)
- func (s *RecordStore) Log(r *Record) error
- func (s *RecordStore) PendingExpired() ([]*Record, error)
- func (s *RecordStore) Recent(since time.Time, limit int) ([]*Record, error)
- func (s *RecordStore) Respond(requestID, actionID string) (bool, error)
- func (s *RecordStore) SetResponseAction(requestID, actionID string) error
- type ResponseWaiter
- func (w *ResponseWaiter) Register(requestID string) <-chan EscalationResponse
- func (w *ResponseWaiter) Signal(requestID, actionID string) bool
- func (w *ResponseWaiter) SignalTimeout(requestID string) bool
- func (w *ResponseWaiter) Wait(ctx context.Context, requestID string, ch <-chan EscalationResponse) (EscalationResponse, error)
- func (w *ResponseWaiter) WaitWithTimeout(ctx context.Context, requestID string, ch <-chan EscalationResponse, ...) (EscalationResponse, error)
- type Sender
- type SessionInjector
- type SignalProvider
- type SignalSender
- type SourceFunc
- type TimeoutWatcher
Constants ¶
const (
StatusPending = "pending"
StatusResponded = "responded"
StatusExpired = "expired"
StatusSent = "sent" // fire-and-forget, no response expected
)
Status constants for notification records.
const (
KindFireAndForget = "fire_and_forget"
KindActionable = "actionable"
)
Kind constants for notification records.
Variables ¶
This section is empty.
Functions ¶
func ActionPrefix ¶
func ActionPrefix(deviceName string) string
ActionPrefix returns the action string prefix derived from a device name. Hyphens are replaced with underscores and the result is uppercased, e.g. "aimee-thane" becomes "AIMEE_THANE". An empty deviceName falls back to "THANE".
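The transformation described above can be sketched in a few lines. This is a minimal illustration of the documented behavior, not the package's source; the name actionPrefixSketch is hypothetical.

```go
package main

import "strings"

// actionPrefixSketch mirrors the documented behavior of ActionPrefix.
// It is an illustration only, not the package's implementation.
func actionPrefixSketch(deviceName string) string {
	if deviceName == "" {
		return "THANE" // documented fallback for an empty device name
	}
	// Hyphens become underscores, then the result is uppercased.
	return strings.ToUpper(strings.ReplaceAll(deviceName, "-", "_"))
}
```

Calling actionPrefixSketch("aimee-thane") yields "AIMEE_THANE", matching the example in the description.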
Types ¶
type ActionableRequest ¶
type ActionableRequest struct {
NotificationRequest
Actions []Action // required, at least one
RequestID string // UUIDv7, set by router
Timeout time.Duration // how long to wait for response
TimeoutAction string // action ID to auto-execute, "escalate", or "cancel"
Context string // model-provided context for callback handling
}
ActionableRequest extends NotificationRequest with callback tracking fields for human-in-the-loop notifications.
type CallbackDispatcher ¶
type CallbackDispatcher struct {
// contains filtered or unexported fields
}
CallbackDispatcher handles MQTT messages on the instance-specific callbacks topic. It parses action strings with the configured prefix, looks up the notification record, marks it as responded, and routes the response to the appropriate handler (session injection or delegate).
func NewCallbackDispatcher ¶
func NewCallbackDispatcher(records *RecordStore, injector SessionInjector, delegate DelegateSpawner, deviceName string, logger *slog.Logger) *CallbackDispatcher
NewCallbackDispatcher creates a callback dispatcher. The deviceName parameter is used to derive the action prefix via ActionPrefix.
func (*CallbackDispatcher) DispatchAction ¶
func (d *CallbackDispatcher) DispatchAction(record *Record, actionID string)
DispatchAction routes a notification response to the originating session (via system message injection) or spawns a delegate if the session is no longer alive. Delegate spawning is offloaded to a goroutine so it does not block the MQTT handler. This method is also used by the timeout watcher to dispatch auto-fired timeout actions.
func (*CallbackDispatcher) Handle ¶
func (d *CallbackDispatcher) Handle(_ string, payload []byte)
Handle is an mqtt.MessageHandler that processes callback messages published by the HA automation when a user taps a notification action button.
func (*CallbackDispatcher) ResponseWaiter ¶ added in v0.9.1
func (d *CallbackDispatcher) ResponseWaiter() *ResponseWaiter
ResponseWaiter returns the configured waiter, or nil.
func (*CallbackDispatcher) SetResponseWaiter ¶ added in v0.9.1
func (d *CallbackDispatcher) SetResponseWaiter(w *ResponseWaiter)
SetResponseWaiter configures synchronous escalation support. When set, DispatchAction signals any waiting escalation tool in addition to injecting session messages or spawning delegates.
type ChannelActivity ¶ added in v0.9.1
type ChannelActivity struct {
Channel string // provider name: "signal", "owu"
Contact string // resolved contact name
LastActive time.Time // when the channel last had activity
}
ChannelActivity describes a channel with recent interaction for a specific contact.
type ChannelActivitySource ¶ added in v0.9.1
type ChannelActivitySource interface {
ActiveChannels() []ChannelActivity
}
ChannelActivitySource provides information about which channels have recent activity for which contacts. Used by the notification router to prefer channels the user is actively engaged on.
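As a rough sketch of how a source might be implemented and consulted, the example below copies the two declarations above and adds a hypothetical in-memory source. The rule "pick the most recently active channel for the contact" is an assumption made for illustration; the router's actual preference logic is not documented here.

```go
package main

import "time"

// ChannelActivity and ChannelActivitySource are copied from the
// declarations above so the sketch compiles on its own.
type ChannelActivity struct {
	Channel    string
	Contact    string
	LastActive time.Time
}

type ChannelActivitySource interface {
	ActiveChannels() []ChannelActivity
}

// staticSource is a hypothetical in-memory implementation.
type staticSource []ChannelActivity

func (s staticSource) ActiveChannels() []ChannelActivity { return s }

// mostRecentChannel returns the channel with the latest activity for
// contact, or "" when the contact has no recorded activity. This is an
// assumed selection rule, not the router's confirmed logic.
func mostRecentChannel(src ChannelActivitySource, contact string) string {
	var best ChannelActivity
	for _, a := range src.ActiveChannels() {
		if a.Contact == contact && a.LastActive.After(best.LastActive) {
			best = a
		}
	}
	return best.Channel
}

// exampleMostRecent builds a small fixture and queries it.
func exampleMostRecent() string {
	now := time.Now()
	src := staticSource{
		{Channel: "owu", Contact: "nugget", LastActive: now.Add(-2 * time.Hour)},
		{Channel: "signal", Contact: "nugget", LastActive: now.Add(-5 * time.Minute)},
	}
	return mostRecentChannel(src, "nugget")
}
```

A source like this would be handed to the router through SetActivitySource.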
type ContactResolver ¶
type ContactResolver interface {
ResolveContact(name string) (*contacts.Contact, error)
GetPropertiesMap(contactID uuid.UUID) (map[string][]string, error)
}
ContactResolver resolves a contact name to its record and properties. ResolveContact uses cascading resolution (exact name → nickname → search) for flexible name matching.
type DelegateSpawner ¶
DelegateSpawner executes a task in a lightweight delegate loop when the originating session is no longer alive.
type EscalationResponse ¶ added in v0.9.1
type EscalationResponse struct {
ActionID string // the action chosen by the human
TimedOut bool // true if the timeout expired without a response
}
EscalationResponse holds the result of a synchronous escalation.
type EscalationSender ¶
type EscalationSender interface {
Send(ctx context.Context, n Notification) error
}
EscalationSender sends fire-and-forget notifications for timeout escalation. Both *Sender and *NotificationRouter implement this interface, allowing TimeoutWatcher to use either.
type HAClient ¶
type HAClient interface {
CallService(ctx context.Context, domain, service string, data map[string]any) error
}
HAClient is the subset of homeassistant.Client needed for notifications.
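Because the interface has a single method, a test double is easy to write. The sketch below copies the interface and adds a hypothetical recording fake; the "notify" domain and "mobile_app_phone" service are placeholder values, not names confirmed by this package.

```go
package main

import "context"

// HAClient is copied from the declaration above.
type HAClient interface {
	CallService(ctx context.Context, domain, service string, data map[string]any) error
}

// recordingHA is a hypothetical test double that remembers each call.
type recordingHA struct {
	calls []string
}

func (r *recordingHA) CallService(_ context.Context, domain, service string, _ map[string]any) error {
	r.calls = append(r.calls, domain+"."+service)
	return nil
}

// Compile-time check that the fake satisfies the interface.
var _ HAClient = (*recordingHA)(nil)

// exampleCall drives the fake once with placeholder arguments.
func exampleCall() []string {
	ha := &recordingHA{}
	_ = ha.CallService(context.Background(), "notify", "mobile_app_phone",
		map[string]any{"message": "hello"})
	return ha.calls
}
```

A fake of this shape could be passed to NewSender in tests in place of a real Home Assistant client.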
type HAPushProvider ¶
type HAPushProvider struct {
// contains filtered or unexported fields
}
HAPushProvider delivers notifications via Home Assistant companion app push by wrapping the existing Sender.
func NewHAPushProvider ¶
func NewHAPushProvider(sender *Sender) *HAPushProvider
NewHAPushProvider creates a provider that delegates to an existing HA notification Sender.
func (*HAPushProvider) Name ¶
func (p *HAPushProvider) Name() string
Name returns the provider identifier.
func (*HAPushProvider) Send ¶
func (p *HAPushProvider) Send(ctx context.Context, req NotificationRequest) error
Send delivers a fire-and-forget notification via HA push.
func (*HAPushProvider) SendActionable ¶
func (p *HAPushProvider) SendActionable(ctx context.Context, req ActionableRequest) error
SendActionable delivers an actionable notification via HA push. The provider only handles delivery; record creation is the router's responsibility.
type HistoryProvider ¶ added in v0.9.1
type HistoryProvider struct {
// contains filtered or unexported fields
}
HistoryProvider injects a compact JSON summary of recent notifications into the system prompt. This gives the agent (and autonomous loops such as the metacognitive loop) awareness of what has already been sent, preventing duplicate notifications.
Implements agent.ContextProvider.
func NewHistoryProvider ¶ added in v0.9.1
func NewHistoryProvider(cfg HistoryProviderConfig) *HistoryProvider
NewHistoryProvider creates a provider that queries the notification record store and formats recent sends for system prompt injection.
func (*HistoryProvider) GetContext ¶ added in v0.9.1
GetContext returns a compact JSON summary of recent notifications for injection into the system prompt. Returns empty string when no recent notifications exist.
type HistoryProviderConfig ¶ added in v0.9.1
type HistoryProviderConfig struct {
Records *RecordStore // optional; nil disables history output
Window time.Duration // lookback window; default 6h
Limit int // max entries; default 30
Logger *slog.Logger // optional; defaults to slog.Default
}
HistoryProviderConfig holds dependencies for NewHistoryProvider.
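The documented defaults (6h window, 30 entries) could be applied along the following lines. The struct here mirrors only Window and Limit, and treating non-positive values as "unset" is an assumption; the package may handle them differently.

```go
package main

import "time"

// historyConfig mirrors only the Window and Limit fields of
// HistoryProviderConfig; Records and Logger are omitted for brevity.
type historyConfig struct {
	Window time.Duration
	Limit  int
}

// withDefaults fills unset values with the documented defaults.
// Treating non-positive values as "unset" is an assumption.
func withDefaults(cfg historyConfig) historyConfig {
	if cfg.Window <= 0 {
		cfg.Window = 6 * time.Hour
	}
	if cfg.Limit <= 0 {
		cfg.Limit = 30
	}
	return cfg
}
```

Explicit values pass through unchanged, so withDefaults(historyConfig{Limit: 5}) keeps a limit of 5 while still receiving the 6h window.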
type MessageRecorder ¶ added in v0.9.1
MessageRecorder records outbound messages in conversation memory so they appear in history when the recipient replies. Callers provide the conversation ID derivation appropriate for their channel.
type Notification ¶
type Notification struct {
Recipient string // contact name (e.g., "nugget")
Title string // notification title (optional)
Message string // notification body (required)
Priority string // "low", "normal" (default), "urgent"
Actions []Action // optional: action buttons (creates tracked notification)
RequestID string // UUIDv7, set by caller when Actions is non-empty
Timeout time.Duration // how long to wait for response (default: 30m)
TimeoutAction string // action ID to auto-execute on timeout, or "escalate"/"cancel"
Context string // model-provided context for callback handling
}
Notification is a push notification. Without Actions it is fire-and-forget (Phase 1). With Actions it creates an actionable notification with callback tracking (Phase 2).
type NotificationProvider ¶
type NotificationProvider interface {
// Send delivers a fire-and-forget notification.
Send(ctx context.Context, req NotificationRequest) error
// SendActionable delivers a notification with action buttons.
// The RequestID is pre-set by the router before calling the
// provider; the provider only handles delivery.
SendActionable(ctx context.Context, req ActionableRequest) error
// Name returns the provider identifier (e.g., "ha_push").
Name() string
}
NotificationProvider delivers notifications through a specific transport (e.g., HA companion app push, Signal, email).
type NotificationRequest ¶
type NotificationRequest struct {
Recipient string // contact name (resolved by router)
Title string // optional
Message string // required
Priority string // "low", "normal", "urgent"
}
NotificationRequest is a fire-and-forget notification delivery request. It contains only the fields common to all providers.
type NotificationRouter ¶
type NotificationRouter struct {
// contains filtered or unexported fields
}
NotificationRouter selects a notification provider based on contact facts and channel activity, then orchestrates delivery for both fire-and-forget and actionable notifications. It is the single entry point for the provider-agnostic send_notification and request_human_decision tools.
func NewNotificationRouter ¶
func NewNotificationRouter(contacts ContactResolver, records *RecordStore, logger *slog.Logger) *NotificationRouter
NewNotificationRouter creates a router with contact resolution and optional record tracking for actionable notifications. A nil logger defaults to slog.Default.
func (*NotificationRouter) RegisterProvider ¶
func (r *NotificationRouter) RegisterProvider(p NotificationProvider)
RegisterProvider adds a notification provider to the router. Nil providers and providers with empty names are rejected. Registering a provider whose name already exists overwrites the previous one and logs a warning.
func (*NotificationRouter) Route ¶
func (r *NotificationRouter) Route(recipient string) (NotificationProvider, error)
Route resolves a recipient to the appropriate provider based on contact properties. It checks for an explicit notification_preference property first, then falls back to checking for known delivery channels.
func (*NotificationRouter) Send ¶
func (r *NotificationRouter) Send(ctx context.Context, n Notification) error
Send satisfies EscalationSender so the router can be used by TimeoutWatcher for escalation notifications. It adapts the legacy Notification struct to a NotificationRequest and routes it.
func (*NotificationRouter) SendActionable ¶
func (r *NotificationRouter) SendActionable(ctx context.Context, req ActionableRequest, sessionID, conversationID string) (string, error)
SendActionable delivers an actionable notification and creates a tracking record. Returns the request ID for the created record. Delivery is attempted first; records are only created on success to avoid dangling records that trigger timeout actions for undelivered notifications.
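The ordering described above (deliver first, record only on success) can be sketched as follows. Both callbacks are hypothetical stand-ins for the provider call and the record store; the real method also resolves the provider and assigns the request ID.

```go
package main

import "errors"

// sendThenRecord sketches the documented ordering: attempt delivery and
// create the tracking record only if delivery succeeded. Both callbacks
// are hypothetical stand-ins for the provider and the record store.
func sendThenRecord(deliver func() error, createRecord func() error) error {
	if err := deliver(); err != nil {
		return err // no record, so no timeout action can fire later
	}
	return createRecord()
}

// exampleFailedDelivery reports whether a record was created after a
// failed delivery.
func exampleFailedDelivery() (recordCreated bool, err error) {
	err = sendThenRecord(
		func() error { return errors.New("provider unavailable") },
		func() error { recordCreated = true; return nil },
	)
	return recordCreated, err
}
```

With this ordering a failed delivery leaves nothing behind for the timeout watcher to pick up.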
func (*NotificationRouter) SendNotification ¶
func (r *NotificationRouter) SendNotification(ctx context.Context, req NotificationRequest) error
SendNotification delivers a fire-and-forget notification via the appropriate provider for the recipient. On success, a record is logged for notification history awareness.
func (*NotificationRouter) SetActivitySource ¶ added in v0.9.1
func (r *NotificationRouter) SetActivitySource(src ChannelActivitySource)
SetActivitySource configures channel activity awareness. When set, Route prefers providers with recent activity for the recipient.
func (*NotificationRouter) SetSourceFunc ¶ added in v0.9.1
func (r *NotificationRouter) SetSourceFunc(fn SourceFunc)
SetSourceFunc configures the function that extracts a source identifier from the request context for notification history logging.
type OpstateStore ¶
OpstateStore is the subset of opstate.Store needed for recording notifications.
type Record ¶
type Record struct {
RequestID string
Recipient string
OriginSession string
OriginConversation string
Context string
Actions []Action
TimeoutSeconds int
TimeoutAction string
Status string
ResponseAction string
RespondedAt time.Time
CreatedAt time.Time
ExpiresAt time.Time
// Fields added for notification history awareness (issue #614).
Channel string // provider name: "ha_push", "signal"
Source string // originating loop/conversation: "metacognitive", "signal/+1234"
Kind string // "fire_and_forget" or "actionable"
Title string // notification title
Message string // notification body (may be truncated for display)
}
Record tracks a notification from creation through delivery, and optionally through response or expiry for actionable notifications. Fire-and-forget records have empty Actions, zero Timeout fields, and Status set to StatusSent.
type RecordStore ¶
type RecordStore struct {
// contains filtered or unexported fields
}
RecordStore provides SQLite-backed CRUD for notification records.
func NewRecordStore ¶
NewRecordStore creates a notification record store using the given database connection. The caller owns the connection — RecordStore does not close it. The schema is created automatically on first use.
func (*RecordStore) Create ¶
func (s *RecordStore) Create(r *Record) error
Create inserts a new notification record. The record's Status is set to StatusPending regardless of the caller's value. Kind defaults to KindActionable if not set.
func (*RecordStore) Expire ¶
func (s *RecordStore) Expire(requestID string) (bool, error)
Expire marks a pending record as expired. Returns true if the record was updated (was still pending), false if it was already responded or expired. Callers should check the bool to avoid executing timeout actions on records that were concurrently responded to.
func (*RecordStore) Get ¶
func (s *RecordStore) Get(requestID string) (*Record, error)
Get retrieves a notification record by request ID. Returns sql.ErrNoRows if no record is found.
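Callers typically want to tell "no such record" apart from a real failure. The sketch below shows one way to do that with errors.Is; getFunc is a hypothetical stand-in for Get, reduced to returning a status string so the example needs no database.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// getFunc stands in for (*RecordStore).Get, reduced to returning a
// status string so the sketch needs no database.
type getFunc func(requestID string) (string, error)

// describe distinguishes "no such record" from other failures.
func describe(get getFunc, requestID string) string {
	status, err := get(requestID)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return "unknown request"
	case err != nil:
		return fmt.Sprintf("lookup failed: %v", err)
	default:
		return status
	}
}

// exampleMissing simulates a lookup for a request ID that does not exist.
func exampleMissing() string {
	missing := func(string) (string, error) { return "", sql.ErrNoRows }
	return describe(missing, "req-1")
}
```

Using errors.Is rather than a direct comparison keeps the check working whether or not the error is wrapped on the way up.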
func (*RecordStore) Log ¶ added in v0.9.1
func (s *RecordStore) Log(r *Record) error
Log inserts a fire-and-forget notification record. Unlike Create, this is for notifications that need no callback tracking; the record exists solely for history awareness. Status is set to StatusSent and ExpiresAt to the zero value.
func (*RecordStore) PendingExpired ¶
func (s *RecordStore) PendingExpired() ([]*Record, error)
PendingExpired returns all records that are still pending but whose expiry time has passed.
func (*RecordStore) Recent ¶ added in v0.9.1
func (s *RecordStore) Recent(since time.Time, limit int) ([]*Record, error)
Recent returns the most recent notification records created since the given time, ordered newest-first. It returns both fire-and-forget and actionable records for history awareness.
func (*RecordStore) Respond ¶
func (s *RecordStore) Respond(requestID, actionID string) (bool, error)
Respond marks a pending record as responded with the given action ID. Returns true if the record was updated (was still pending), false if it was already responded or expired. Callers should check the bool to avoid double-processing in race scenarios.
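The bool returned by Respond and Expire acts as a "did I win the transition" flag. The sketch below reproduces that contract with a hypothetical in-memory store that tracks only status; the real store is SQLite-backed and its internals are not shown in this documentation.

```go
package main

import "sync"

// memStore is a hypothetical in-memory stand-in for RecordStore that
// keeps only the status of each request.
type memStore struct {
	mu     sync.Mutex
	status map[string]string
}

// transition moves a record out of "pending" exactly once and reports
// whether this call performed the move.
func (m *memStore) transition(requestID, to string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.status[requestID] != "pending" {
		return false
	}
	m.status[requestID] = to
	return true
}

func (m *memStore) Respond(requestID string) bool { return m.transition(requestID, "responded") }
func (m *memStore) Expire(requestID string) bool  { return m.transition(requestID, "expired") }

// exampleRace shows a response landing just before the timeout watcher
// tries to expire the same record.
func exampleRace() (responded, expired bool) {
	s := &memStore{status: map[string]string{"req-1": "pending"}}
	responded = s.Respond("req-1")
	expired = s.Expire("req-1") // false: the timeout action must be skipped
	return responded, expired
}
```

Whichever caller receives true owns the follow-up work; the other should do nothing.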
func (*RecordStore) SetResponseAction ¶
func (s *RecordStore) SetResponseAction(requestID, actionID string) error
SetResponseAction records the action that was executed for an expired record. This is used by the timeout watcher to persist which action was auto-executed on timeout, since the record has already transitioned from pending to expired and Respond cannot update it.
type ResponseWaiter ¶ added in v0.9.1
type ResponseWaiter struct {
// contains filtered or unexported fields
}
ResponseWaiter provides synchronous blocking for escalation tools. When a tool sends an actionable notification, it registers a waiter. When the callback fires (MQTT or resolve_actionable), it signals the waiter with the chosen action.
func NewResponseWaiter ¶ added in v0.9.1
func NewResponseWaiter() *ResponseWaiter
NewResponseWaiter creates a waiter registry.
func (*ResponseWaiter) Register ¶ added in v0.9.1
func (w *ResponseWaiter) Register(requestID string) <-chan EscalationResponse
Register creates a waiter channel for the given request ID. The returned channel receives the response when Signal resolves the callback or the timeout expires.
func (*ResponseWaiter) Signal ¶ added in v0.9.1
func (w *ResponseWaiter) Signal(requestID, actionID string) bool
Signal delivers a response to a waiting escalation tool. Returns true if a waiter was found, false if the request has no waiter (e.g., the escalation timed out and was cleaned up, or it was a non-synchronous notification).
func (*ResponseWaiter) SignalTimeout ¶ added in v0.9.1
func (w *ResponseWaiter) SignalTimeout(requestID string) bool
SignalTimeout delivers a timeout response to a waiting escalation.
func (*ResponseWaiter) Wait ¶ added in v0.9.1
func (w *ResponseWaiter) Wait(ctx context.Context, requestID string, ch <-chan EscalationResponse) (EscalationResponse, error)
Wait blocks until the response arrives or the context is cancelled. Returns an error only when the parent context is cancelled (e.g., the run is shutting down). Cleans up the waiter on exit.
func (*ResponseWaiter) WaitWithTimeout ¶ added in v0.9.1
func (w *ResponseWaiter) WaitWithTimeout(ctx context.Context, requestID string, ch <-chan EscalationResponse, timeout time.Duration) (EscalationResponse, error)
WaitWithTimeout blocks until the response arrives, the timeout expires, or the parent context is cancelled. A timeout expiry returns an EscalationResponse with TimedOut=true (not an error), since timeouts are an expected outcome. Only parent context cancellation (e.g., run shutdown) returns an error.
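To make the Register / Signal / WaitWithTimeout contract concrete, here is a self-contained sketch of a waiter registry built on buffered channels. The type waiterSketch and its internals are assumptions for illustration; only the method shapes and the "timeout is a result, not an error" rule come from the documentation above.

```go
package main

import (
	"context"
	"sync"
	"time"
)

// EscalationResponse is copied from the declaration above.
type EscalationResponse struct {
	ActionID string
	TimedOut bool
}

// waiterSketch is a hypothetical registry keyed by request ID.
type waiterSketch struct {
	mu      sync.Mutex
	waiters map[string]chan EscalationResponse
}

func newWaiterSketch() *waiterSketch {
	return &waiterSketch{waiters: make(map[string]chan EscalationResponse)}
}

// Register creates a buffered channel so Signal never blocks.
func (w *waiterSketch) Register(requestID string) <-chan EscalationResponse {
	ch := make(chan EscalationResponse, 1)
	w.mu.Lock()
	w.waiters[requestID] = ch
	w.mu.Unlock()
	return ch
}

// Signal delivers the chosen action and reports whether a waiter existed.
func (w *waiterSketch) Signal(requestID, actionID string) bool {
	w.mu.Lock()
	ch, ok := w.waiters[requestID]
	delete(w.waiters, requestID)
	w.mu.Unlock()
	if ok {
		ch <- EscalationResponse{ActionID: actionID}
	}
	return ok
}

// WaitWithTimeout treats a timeout as a normal result and only returns
// an error when ctx is cancelled.
func (w *waiterSketch) WaitWithTimeout(ctx context.Context, requestID string, ch <-chan EscalationResponse, timeout time.Duration) (EscalationResponse, error) {
	defer func() { // clean up the waiter on every exit path
		w.mu.Lock()
		delete(w.waiters, requestID)
		w.mu.Unlock()
	}()
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case resp := <-ch:
		return resp, nil
	case <-timer.C:
		return EscalationResponse{TimedOut: true}, nil
	case <-ctx.Done():
		return EscalationResponse{}, ctx.Err()
	}
}

// exampleWait signals one request before waiting and lets another time out.
func exampleWait() (answered, timedOut EscalationResponse) {
	w := newWaiterSketch()
	ctx := context.Background()

	ch := w.Register("req-1")
	w.Signal("req-1", "approve")
	answered, _ = w.WaitWithTimeout(ctx, "req-1", ch, time.Second)

	ch = w.Register("req-2")
	timedOut, _ = w.WaitWithTimeout(ctx, "req-2", ch, 10*time.Millisecond)
	return answered, timedOut
}
```

The one-slot buffer is the key design choice in this sketch: a callback that arrives before the tool starts waiting is not lost, and the signalling side never blocks on a slow waiter.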
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender delivers notifications via Home Assistant companion app push.
func NewSender ¶
func NewSender(ha HAClient, contacts ContactResolver, opstate OpstateStore, deviceName string, logger *slog.Logger) *Sender
NewSender creates a notification sender. The deviceName parameter is used to derive the action callback prefix via ActionPrefix.
type SessionInjector ¶
type SessionInjector interface {
// InjectSystemMessage adds a system message to the conversation's
// memory. The message will be visible on the next agent turn.
InjectSystemMessage(conversationID, message string) error
// IsSessionAlive reports whether a conversation has an active
// archive session.
IsSessionAlive(conversationID string) bool
}
SessionInjector injects a system message into a live conversation. Implementations are wired in main.go to avoid import cycles.
type SignalProvider ¶ added in v0.9.1
type SignalProvider struct {
// contains filtered or unexported fields
}
SignalProvider delivers fire-and-forget notifications via Signal by resolving the recipient's phone number from the contact store. When a MessageRecorder is set, outbound notifications are recorded in conversation memory with provenance metadata.
func NewSignalProvider ¶ added in v0.9.1
func NewSignalProvider(sender SignalSender, contacts ContactResolver, logger *slog.Logger) *SignalProvider
NewSignalProvider creates a Signal notification provider.
func (*SignalProvider) Name ¶ added in v0.9.1
func (p *SignalProvider) Name() string
Name returns the provider identifier.
func (*SignalProvider) Send ¶ added in v0.9.1
func (p *SignalProvider) Send(ctx context.Context, req NotificationRequest) error
Send delivers a fire-and-forget notification via Signal. The recipient is resolved to a phone number via contact properties (IMPP with signal: prefix, or TEL).
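The lookup order described above might look like the sketch below, operating on the map shape returned by GetPropertiesMap. The exact key spelling ("IMPP", "TEL") and the choice of the first non-empty value are assumptions; the provider's real resolution code is not shown in this documentation.

```go
package main

import "strings"

// signalNumber sketches the documented lookup order over a contact's
// property map: an IMPP value with a "signal:" prefix wins, otherwise
// the first non-empty TEL value is used. The key spelling ("IMPP",
// "TEL") is an assumption.
func signalNumber(props map[string][]string) (string, bool) {
	for _, v := range props["IMPP"] {
		if strings.HasPrefix(v, "signal:") {
			if rest := strings.TrimPrefix(v, "signal:"); rest != "" {
				return rest, true
			}
		}
	}
	for _, v := range props["TEL"] {
		if v != "" {
			return v, true
		}
	}
	return "", false
}
```

When neither property yields a number the function reports false, which a caller would surface as a delivery error.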
func (*SignalProvider) SendActionable ¶ added in v0.9.1
func (p *SignalProvider) SendActionable(ctx context.Context, req ActionableRequest) error
SendActionable delivers an actionable notification via Signal as a text message with numbered options. The model interprets the user's natural-language reply and can resolve the callback via a future resolve_actionable tool. The message is recorded in conversation memory with request_id and action metadata for that tool to use.
func (*SignalProvider) SetRecorder ¶ added in v0.9.1
func (p *SignalProvider) SetRecorder(r MessageRecorder)
SetRecorder configures memory recording for outbound notifications. When set, sent messages are recorded in conversation history with provenance metadata so the agent has context when the user replies.
type SignalSender ¶ added in v0.9.1
SignalSender abstracts Signal message delivery so the notifications package does not import the signal package. Implemented by signal.Client.
type SourceFunc ¶ added in v0.9.1
SourceFunc extracts a human-readable source identifier from the request context. The returned string identifies the originating loop or conversation (e.g., "metacognitive", "signal/+15125551234"). Used by the router to populate the Source field on notification records for history awareness.
type TimeoutWatcher ¶
type TimeoutWatcher struct {
// contains filtered or unexported fields
}
TimeoutWatcher periodically checks for pending notification records whose expiry time has passed and executes their configured timeout action.
func NewTimeoutWatcher ¶
func NewTimeoutWatcher(records *RecordStore, dispatcher *CallbackDispatcher, sender EscalationSender, interval time.Duration, logger *slog.Logger) *TimeoutWatcher
NewTimeoutWatcher creates a timeout watcher. The sender parameter is optional and only needed for "escalate" timeout actions that re-send a notification at urgent priority. It accepts any EscalationSender (e.g., *Sender or *NotificationRouter).
func (*TimeoutWatcher) Start ¶
func (w *TimeoutWatcher) Start(ctx context.Context)
Start runs the timeout watcher loop until ctx is cancelled. It checks for expired records at the configured interval.
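A loop of this kind is commonly written with a time.Ticker and a select on ctx.Done. The sketch below shows that pattern; the check callback is a hypothetical stand-in for "load PendingExpired records and act on them", and the watcher's actual loop body is not documented here.

```go
package main

import (
	"context"
	"time"
)

// runWatcher sketches a Start-style loop: call check on every tick and
// return once ctx is cancelled. The check callback is a hypothetical
// stand-in for the watcher's real work.
func runWatcher(ctx context.Context, interval time.Duration, check func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// A tick and a cancellation can be ready at the same time;
			// re-check ctx so no work runs after shutdown was requested.
			if ctx.Err() != nil {
				return
			}
			check()
		}
	}
}

// exampleWatcher cancels the loop from inside the third check and
// returns how many checks ran.
func exampleWatcher() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	n := 0
	runWatcher(ctx, time.Millisecond, func() {
		n++
		if n == 3 {
			cancel()
		}
	})
	return n
}
```

In a real program the loop would normally run in its own goroutine, with the caller cancelling ctx during shutdown.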