notifications

package
v0.9.1
Published: Apr 23, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package notifications provides provider-agnostic notification delivery with routing, callback tracking, and timeout management.

Constants

const (
	StatusPending   = "pending"
	StatusResponded = "responded"
	StatusExpired   = "expired"
	StatusSent      = "sent" // fire-and-forget, no response expected
)

Status constants for notification records.

const (
	KindFireAndForget = "fire_and_forget"
	KindActionable    = "actionable"
)

Kind constants for notification records.

Variables

This section is empty.

Functions

func ActionPrefix

func ActionPrefix(deviceName string) string

ActionPrefix returns the action string prefix derived from a device name. Hyphens are replaced with underscores and the result is uppercased, e.g. "aimee-thane" becomes "AIMEE_THANE". An empty deviceName falls back to "THANE".
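
The transformation described above can be sketched in a few lines. This is an illustrative re-implementation, not the package's source: the lowercase name `actionPrefix` is a stand-in, and the behaviour for inputs the description does not mention (spaces, other punctuation) is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// actionPrefix is a hypothetical stand-in that mirrors the documented
// behaviour of ActionPrefix: hyphens become underscores, the result is
// uppercased, and an empty name falls back to "THANE".
func actionPrefix(deviceName string) string {
	if deviceName == "" {
		return "THANE"
	}
	return strings.ToUpper(strings.ReplaceAll(deviceName, "-", "_"))
}

func main() {
	fmt.Println(actionPrefix("aimee-thane")) // AIMEE_THANE
	fmt.Println(actionPrefix(""))            // THANE
}
```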

Types

type Action

type Action struct {
	ID    string `json:"id"`
	Label string `json:"label"`
}

Action represents a single action button on a notification.
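
As a small illustration of the struct tags, the sketch below marshals two actions to JSON. The `Action` type is copied locally so the example compiles on its own; the helper `encodeActions` and the action IDs are made up for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Action is copied from the declaration above so the example compiles alone.
type Action struct {
	ID    string `json:"id"`
	Label string `json:"label"`
}

// encodeActions is a hypothetical helper that marshals actions using the
// struct tags, producing lowercase "id" and "label" keys.
func encodeActions(actions []Action) (string, error) {
	b, err := json.Marshal(actions)
	return string(b), err
}

func main() {
	out, err := encodeActions([]Action{
		{ID: "approve", Label: "Approve"},
		{ID: "deny", Label: "Deny"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
	// [{"id":"approve","label":"Approve"},{"id":"deny","label":"Deny"}]
}
```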

type ActionableRequest

type ActionableRequest struct {
	NotificationRequest
	Actions       []Action      // required, at least one
	RequestID     string        // UUIDv7, set by router
	Timeout       time.Duration // how long to wait for response
	TimeoutAction string        // action ID to auto-execute, "escalate", or "cancel"
	Context       string        // model-provided context for callback handling
}

ActionableRequest extends NotificationRequest with callback tracking fields for human-in-the-loop notifications.

type CallbackDispatcher

type CallbackDispatcher struct {
	// contains filtered or unexported fields
}

CallbackDispatcher handles MQTT messages on the instance-specific callbacks topic. It parses action strings with the configured prefix, looks up the notification record, marks it as responded, and routes the response to the appropriate handler (session injection or delegate).

func NewCallbackDispatcher

func NewCallbackDispatcher(records *RecordStore, injector SessionInjector, delegate DelegateSpawner, deviceName string, logger *slog.Logger) *CallbackDispatcher

NewCallbackDispatcher creates a callback dispatcher. The deviceName parameter is used to derive the action prefix via ActionPrefix.

func (*CallbackDispatcher) DispatchAction

func (d *CallbackDispatcher) DispatchAction(record *Record, actionID string)

DispatchAction routes a notification response to the originating session (via system message injection) or spawns a delegate if the session is no longer alive. Delegate spawning is offloaded to a goroutine so it does not block the MQTT handler. This method is also used by the timeout watcher to dispatch auto-fired timeout actions.

func (*CallbackDispatcher) Handle

func (d *CallbackDispatcher) Handle(_ string, payload []byte)

Handle is an mqtt.MessageHandler that processes callback messages published by the HA automation when a user taps a notification action button.

func (*CallbackDispatcher) ResponseWaiter added in v0.9.1

func (d *CallbackDispatcher) ResponseWaiter() *ResponseWaiter

ResponseWaiter returns the configured waiter, or nil.

func (*CallbackDispatcher) SetResponseWaiter added in v0.9.1

func (d *CallbackDispatcher) SetResponseWaiter(w *ResponseWaiter)

SetResponseWaiter configures synchronous escalation support. When set, DispatchAction signals any waiting escalation tool in addition to injecting session messages or spawning delegates.

type ChannelActivity added in v0.9.1

type ChannelActivity struct {
	Channel    string    // provider name: "signal", "owu"
	Contact    string    // resolved contact name
	LastActive time.Time // when the channel last had activity
}

ChannelActivity describes a channel with recent interaction for a specific contact.

type ChannelActivitySource added in v0.9.1

type ChannelActivitySource interface {
	ActiveChannels() []ChannelActivity
}

ChannelActivitySource provides information about which channels have recent activity for which contacts. Used by the notification router to prefer channels the user is actively engaged on.
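
For testing, the interface is easy to satisfy with a fixed list. The sketch below also shows one way a consumer might pick the most recently active channel for a contact. The types are local copies, and `staticSource`, `mostRecentChannel`, and `sampleSource` are hypothetical names; the selection rule is an assumption, not the router's documented algorithm.

```go
package main

import (
	"fmt"
	"time"
)

// ChannelActivity mirrors the documented struct so the example compiles alone.
type ChannelActivity struct {
	Channel    string
	Contact    string
	LastActive time.Time
}

// ChannelActivitySource mirrors the documented interface.
type ChannelActivitySource interface {
	ActiveChannels() []ChannelActivity
}

// staticSource is a hypothetical fixed-list implementation, handy in tests.
type staticSource []ChannelActivity

func (s staticSource) ActiveChannels() []ChannelActivity { return s }

// mostRecentChannel returns the channel with the newest activity for the
// contact, or "" when the contact has no recorded activity.
func mostRecentChannel(src ChannelActivitySource, contact string) string {
	var best ChannelActivity
	found := false
	for _, a := range src.ActiveChannels() {
		if a.Contact != contact {
			continue
		}
		if !found || a.LastActive.After(best.LastActive) {
			best, found = a, true
		}
	}
	if !found {
		return ""
	}
	return best.Channel
}

// sampleSource builds illustrative data relative to the current time.
func sampleSource() ChannelActivitySource {
	now := time.Now()
	return staticSource{
		{Channel: "owu", Contact: "nugget", LastActive: now.Add(-2 * time.Hour)},
		{Channel: "signal", Contact: "nugget", LastActive: now.Add(-5 * time.Minute)},
	}
}

func main() {
	fmt.Println(mostRecentChannel(sampleSource(), "nugget")) // signal
	fmt.Println(mostRecentChannel(sampleSource(), "nobody") == "")
}
```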

type ContactResolver

type ContactResolver interface {
	ResolveContact(name string) (*contacts.Contact, error)
	GetPropertiesMap(contactID uuid.UUID) (map[string][]string, error)
}

ContactResolver resolves a contact name to its record and properties. ResolveContact uses cascading resolution (exact name → nickname → search) for flexible name matching.

type DelegateSpawner

type DelegateSpawner interface {
	Spawn(ctx context.Context, task, guidance string) error
}

DelegateSpawner executes a task in a lightweight delegate loop when the originating session is no longer alive.

type EscalationResponse added in v0.9.1

type EscalationResponse struct {
	ActionID string // the action chosen by the human
	TimedOut bool   // true if the timeout expired without a response
}

EscalationResponse holds the result of a synchronous escalation.

type EscalationSender

type EscalationSender interface {
	Send(ctx context.Context, n Notification) error
}

EscalationSender sends fire-and-forget notifications for timeout escalation. Both *Sender and *NotificationRouter implement this interface, allowing TimeoutWatcher to use either.

type HAClient

type HAClient interface {
	CallService(ctx context.Context, domain, service string, data map[string]any) error
}

HAClient is the subset of homeassistant.Client needed for notifications.

type HAPushProvider

type HAPushProvider struct {
	// contains filtered or unexported fields
}

HAPushProvider delivers notifications via Home Assistant companion app push by wrapping the existing Sender.

func NewHAPushProvider

func NewHAPushProvider(sender *Sender) *HAPushProvider

NewHAPushProvider creates a provider that delegates to an existing HA notification Sender.

func (*HAPushProvider) Name

func (p *HAPushProvider) Name() string

Name returns the provider identifier.

func (*HAPushProvider) Send

func (p *HAPushProvider) Send(ctx context.Context, req NotificationRequest) error

Send delivers a fire-and-forget notification via HA push.

func (*HAPushProvider) SendActionable

func (p *HAPushProvider) SendActionable(ctx context.Context, req ActionableRequest) error

SendActionable delivers an actionable notification via HA push. The provider only handles delivery; record creation is the router's responsibility.

type HistoryProvider added in v0.9.1

type HistoryProvider struct {
	// contains filtered or unexported fields
}

HistoryProvider injects a compact JSON summary of recent notifications into the system prompt. This gives the agent (and autonomous loops such as the metacognitive loop) awareness of what has already been sent, which helps prevent duplicate notifications.

Implements agent.ContextProvider.

func NewHistoryProvider added in v0.9.1

func NewHistoryProvider(cfg HistoryProviderConfig) *HistoryProvider

NewHistoryProvider creates a provider that queries the notification record store and formats recent sends for system prompt injection.

func (*HistoryProvider) GetContext added in v0.9.1

func (p *HistoryProvider) GetContext(_ context.Context, _ string) (string, error)

GetContext returns a compact JSON summary of recent notifications for injection into the system prompt. It returns an empty string when no recent notifications exist.

type HistoryProviderConfig added in v0.9.1

type HistoryProviderConfig struct {
	Records *RecordStore  // optional; nil disables history output
	Window  time.Duration // lookback window; default 6h
	Limit   int           // max entries; default 30
	Logger  *slog.Logger  // optional; defaults to slog.Default
}

HistoryProviderConfig holds dependencies for NewHistoryProvider.
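
The field comments document a 6h default window and a default limit of 30. A minimal sketch of how such defaults might be applied follows. The type `historyConfig` is a trimmed, hypothetical copy of the struct, and treating every non-positive value as "unset" is an assumption rather than documented behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// historyConfig is a trimmed, hypothetical copy of HistoryProviderConfig
// holding only the two fields that have documented defaults.
type historyConfig struct {
	Window time.Duration
	Limit  int
}

// withDefaults fills unset fields with the documented defaults (6h, 30).
// Treating any non-positive value as unset is an assumption.
func withDefaults(cfg historyConfig) historyConfig {
	if cfg.Window <= 0 {
		cfg.Window = 6 * time.Hour
	}
	if cfg.Limit <= 0 {
		cfg.Limit = 30
	}
	return cfg
}

func main() {
	d := withDefaults(historyConfig{})
	fmt.Println(d.Window, d.Limit)

	c := withDefaults(historyConfig{Window: time.Hour, Limit: 5})
	fmt.Println(c.Window, c.Limit)
}
```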

type MessageRecorder added in v0.9.1

type MessageRecorder interface {
	RecordOutbound(phone, message string) error
}

MessageRecorder records outbound messages in conversation memory so they appear in history when the recipient replies. Callers provide the conversation ID derivation appropriate for their channel.

type Notification

type Notification struct {
	Recipient     string        // contact name (e.g., "nugget")
	Title         string        // notification title (optional)
	Message       string        // notification body (required)
	Priority      string        // "low", "normal" (default), "urgent"
	Actions       []Action      // optional: action buttons (creates tracked notification)
	RequestID     string        // UUIDv7, set by caller when Actions is non-empty
	Timeout       time.Duration // how long to wait for response (default: 30m)
	TimeoutAction string        // action ID to auto-execute on timeout, or "escalate"/"cancel"
	Context       string        // model-provided context for callback handling
}

Notification is a push notification. Without Actions it is fire-and-forget (Phase 1). With Actions it creates an actionable notification with callback tracking (Phase 2).

type NotificationProvider

type NotificationProvider interface {
	// Send delivers a fire-and-forget notification.
	Send(ctx context.Context, req NotificationRequest) error
	// SendActionable delivers a notification with action buttons.
	// The RequestID is pre-set by the router before calling the
	// provider; the provider only handles delivery.
	SendActionable(ctx context.Context, req ActionableRequest) error
	// Name returns the provider identifier (e.g., "ha_push").
	Name() string
}

NotificationProvider delivers notifications through a specific transport (e.g., HA companion app push, Signal, email).
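
A custom transport only needs these three methods. The sketch below is a hypothetical in-memory provider of the kind one might use in tests. The request types are local, trimmed copies so the file compiles alone, and the validation rules (non-empty message, at least one action) are taken from the field comments but their enforcement point is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Local, trimmed copies of the documented types.
type Action struct{ ID, Label string }

type NotificationRequest struct{ Recipient, Title, Message, Priority string }

type ActionableRequest struct {
	NotificationRequest
	Actions       []Action
	RequestID     string
	Timeout       time.Duration
	TimeoutAction string
	Context       string
}

type NotificationProvider interface {
	Send(ctx context.Context, req NotificationRequest) error
	SendActionable(ctx context.Context, req ActionableRequest) error
	Name() string
}

// memoryProvider is a hypothetical test double that records deliveries.
type memoryProvider struct{ sent []string }

// Compile-time check that memoryProvider satisfies the interface.
var _ NotificationProvider = (*memoryProvider)(nil)

func (p *memoryProvider) Name() string { return "memory" }

func (p *memoryProvider) Send(_ context.Context, req NotificationRequest) error {
	if req.Message == "" {
		return errors.New("message is required")
	}
	p.sent = append(p.sent, req.Recipient+": "+req.Message)
	return nil
}

func (p *memoryProvider) SendActionable(ctx context.Context, req ActionableRequest) error {
	if len(req.Actions) == 0 {
		return errors.New("at least one action is required")
	}
	return p.Send(ctx, req.NotificationRequest)
}

func main() {
	p := &memoryProvider{}
	ctx := context.Background()
	err := p.Send(ctx, NotificationRequest{Recipient: "nugget", Message: "Garage door left open"})
	fmt.Println(err, p.sent)
	err = p.SendActionable(ctx, ActionableRequest{
		NotificationRequest: NotificationRequest{Recipient: "nugget", Message: "Close it?"},
	})
	fmt.Println(err) // rejected: no actions supplied
}
```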

type NotificationRequest

type NotificationRequest struct {
	Recipient string // contact name (resolved by router)
	Title     string // optional
	Message   string // required
	Priority  string // "low", "normal", "urgent"
}

NotificationRequest is a fire-and-forget notification delivery request. It contains only the fields common to all providers.

type NotificationRouter

type NotificationRouter struct {
	// contains filtered or unexported fields
}

NotificationRouter selects a notification provider based on contact facts and channel activity, then orchestrates delivery for both fire-and-forget and actionable notifications. It is the single entry point for the provider-agnostic send_notification and request_human_decision tools.

func NewNotificationRouter

func NewNotificationRouter(contacts ContactResolver, records *RecordStore, logger *slog.Logger) *NotificationRouter

NewNotificationRouter creates a router with contact resolution and optional record tracking for actionable notifications. A nil logger defaults to slog.Default.

func (*NotificationRouter) RegisterProvider

func (r *NotificationRouter) RegisterProvider(p NotificationProvider)

RegisterProvider adds a notification provider to the router. Nil providers and providers with empty names are rejected. Registering a provider whose name already exists overwrites the previous one and logs a warning.
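
The registration rules above (reject nil, reject empty names, overwrite duplicates with a warning) can be sketched with a plain map. Everything here is hypothetical: `registry`, `namedProvider`, and `stub` are illustration-only names, and unlike the real method this version returns a bool so the outcome is observable.

```go
package main

import "fmt"

// namedProvider is a hypothetical reduction of NotificationProvider to the
// one method the registration rules depend on.
type namedProvider interface{ Name() string }

type stub struct{ name, tag string }

func (s stub) Name() string { return s.name }

type registry struct{ providers map[string]namedProvider }

func newRegistry() *registry {
	return &registry{providers: make(map[string]namedProvider)}
}

// register reports whether the provider was accepted. A nil interface value
// or an empty name is rejected; a duplicate name replaces the old entry.
// Note that a typed nil pointer would not be caught by the p == nil check.
func (r *registry) register(p namedProvider) bool {
	if p == nil || p.Name() == "" {
		return false
	}
	if _, exists := r.providers[p.Name()]; exists {
		fmt.Printf("warning: provider %q already registered, replacing\n", p.Name())
	}
	r.providers[p.Name()] = p
	return true
}

func main() {
	r := newRegistry()
	fmt.Println(r.register(nil))                              // false
	fmt.Println(r.register(stub{name: ""}))                   // false
	fmt.Println(r.register(stub{name: "ha_push", tag: "v1"})) // true
	fmt.Println(r.register(stub{name: "ha_push", tag: "v2"})) // true, with warning
	fmt.Println(len(r.providers))                             // 1
}
```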

func (*NotificationRouter) Route

func (r *NotificationRouter) Route(recipient string) (NotificationProvider, error)

Route resolves a recipient to the appropriate provider based on contact properties. It checks for an explicit notification_preference property first, then falls back to checking for known delivery channels.

func (*NotificationRouter) Send

func (r *NotificationRouter) Send(ctx context.Context, n Notification) error

Send satisfies EscalationSender so the router can be used by TimeoutWatcher for escalation notifications. It adapts the legacy Notification struct to a NotificationRequest and routes it.

func (*NotificationRouter) SendActionable

func (r *NotificationRouter) SendActionable(ctx context.Context, req ActionableRequest, sessionID, conversationID string) (string, error)

SendActionable delivers an actionable notification and creates a tracking record. Returns the request ID for the created record. Delivery is attempted first; records are only created on success to avoid dangling records that trigger timeout actions for undelivered notifications.
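
The deliver-first ordering can be illustrated in isolation. In this sketch `recordLog`, `sendThenRecord`, and `errOffline` are hypothetical stand-ins; the real router also resolves the recipient and fills in session fields, which are omitted here.

```go
package main

import (
	"errors"
	"fmt"
)

// errOffline simulates a provider failure.
var errOffline = errors.New("provider offline")

// recordLog is a hypothetical stand-in for the record store.
type recordLog struct{ ids []string }

// sendThenRecord attempts delivery first and writes the tracking record only
// when delivery succeeded, so a failed send leaves nothing to time out.
func sendThenRecord(deliver func() error, log *recordLog, requestID string) (string, error) {
	if err := deliver(); err != nil {
		return "", fmt.Errorf("deliver %s: %w", requestID, err)
	}
	log.ids = append(log.ids, requestID)
	return requestID, nil
}

func main() {
	log := &recordLog{}

	_, err := sendThenRecord(func() error { return errOffline }, log, "req-1")
	fmt.Println(err, len(log.ids)) // error reported, no record written

	id, err := sendThenRecord(func() error { return nil }, log, "req-2")
	fmt.Println(id, err, len(log.ids)) // record written after success
}
```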

func (*NotificationRouter) SendNotification

func (r *NotificationRouter) SendNotification(ctx context.Context, req NotificationRequest) error

SendNotification delivers a fire-and-forget notification via the appropriate provider for the recipient. On success, a record is logged for notification history awareness.

func (*NotificationRouter) SetActivitySource added in v0.9.1

func (r *NotificationRouter) SetActivitySource(src ChannelActivitySource)

SetActivitySource configures channel activity awareness. When set, Route prefers providers with recent activity for the recipient.

func (*NotificationRouter) SetSourceFunc added in v0.9.1

func (r *NotificationRouter) SetSourceFunc(fn SourceFunc)

SetSourceFunc configures the function that extracts a source identifier from the request context for notification history logging.

type OpstateStore

type OpstateStore interface {
	SetWithTTL(namespace, key, value string, ttl time.Duration) error
}

OpstateStore is the subset of opstate.Store needed for recording notifications.

type Record

type Record struct {
	RequestID          string
	Recipient          string
	OriginSession      string
	OriginConversation string
	Context            string
	Actions            []Action
	TimeoutSeconds     int
	TimeoutAction      string
	Status             string
	ResponseAction     string
	RespondedAt        time.Time
	CreatedAt          time.Time
	ExpiresAt          time.Time

	// Fields added for notification history awareness (issue #614).
	Channel string // provider name: "ha_push", "signal"
	Source  string // originating loop/conversation: "metacognitive", "signal/+1234"
	Kind    string // "fire_and_forget" or "actionable"
	Title   string // notification title
	Message string // notification body (may be truncated for display)
}

Record tracks a notification from creation through delivery, and optionally through response or expiry for actionable notifications. Fire-and-forget records have empty Actions, zero Timeout fields, and Status set to StatusSent.

type RecordStore

type RecordStore struct {
	// contains filtered or unexported fields
}

RecordStore provides SQLite-backed CRUD for notification records.

func NewRecordStore

func NewRecordStore(db *sql.DB, logger *slog.Logger) (*RecordStore, error)

NewRecordStore creates a notification record store using the given database connection. The caller owns the connection; RecordStore does not close it. The schema is created automatically on first use.

func (*RecordStore) Create

func (s *RecordStore) Create(r *Record) error

Create inserts a new notification record. The record's Status is set to StatusPending regardless of the caller's value. Kind defaults to KindActionable if not set.

func (*RecordStore) Expire

func (s *RecordStore) Expire(requestID string) (bool, error)

Expire marks a pending record as expired. Returns true if the record was updated (was still pending), false if it was already responded or expired. Callers should check the bool to avoid executing timeout actions on records that were concurrently responded to.

func (*RecordStore) Get

func (s *RecordStore) Get(requestID string) (*Record, error)

Get retrieves a notification record by request ID. Returns sql.ErrNoRows if no record is found.

func (*RecordStore) Log added in v0.9.1

func (s *RecordStore) Log(r *Record) error

Log inserts a fire-and-forget notification record. Unlike Create, this is for notifications that need no callback tracking; the record exists solely for history awareness. Status is set to StatusSent and ExpiresAt to the zero value.

func (*RecordStore) PendingExpired

func (s *RecordStore) PendingExpired() ([]*Record, error)

PendingExpired returns all records that are still pending but whose expiry time has passed.

func (*RecordStore) Recent added in v0.9.1

func (s *RecordStore) Recent(since time.Time, limit int) ([]*Record, error)

Recent returns the most recent notification records created since the given time, ordered newest-first. It returns both fire-and-forget and actionable records for history awareness.

func (*RecordStore) Respond

func (s *RecordStore) Respond(requestID, actionID string) (bool, error)

Respond marks a pending record as responded with the given action ID. Returns true if the record was updated (was still pending), false if it was already responded or expired. Callers should check the bool to avoid double-processing in race scenarios.
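
Both Respond and Expire return a bool because a user tap and a timeout can race for the same record. The sketch below models that with an in-memory map guarded by a mutex, where a transition succeeds only from the pending state. It is a simplified stand-in: `memStore` is hypothetical, and the real store is SQLite-backed, so its mechanism may differ.

```go
package main

import (
	"fmt"
	"sync"
)

const (
	statusPending   = "pending"
	statusResponded = "responded"
	statusExpired   = "expired"
)

// memStore is a hypothetical in-memory stand-in for RecordStore.
type memStore struct {
	mu     sync.Mutex
	status map[string]string
}

// transition moves a record out of pending and reports whether it did so.
// A record that already left pending is never modified again.
func (s *memStore) transition(id, to string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.status[id] != statusPending {
		return false
	}
	s.status[id] = to
	return true
}

func (s *memStore) Respond(id string) bool { return s.transition(id, statusResponded) }
func (s *memStore) Expire(id string) bool  { return s.transition(id, statusExpired) }

func main() {
	s := &memStore{status: map[string]string{"req-1": statusPending}}

	var wg sync.WaitGroup
	results := make([]bool, 2)
	wg.Add(2)
	go func() { defer wg.Done(); results[0] = s.Respond("req-1") }()
	go func() { defer wg.Done(); results[1] = s.Expire("req-1") }()
	wg.Wait()

	// Exactly one of the two calls wins; the loser must skip its side effects.
	fmt.Println("responded:", results[0], "expired:", results[1])
}
```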

func (*RecordStore) SetResponseAction

func (s *RecordStore) SetResponseAction(requestID, actionID string) error

SetResponseAction records the action that was executed for an expired record. This is used by the timeout watcher to persist which action was auto-executed on timeout, since the record has already transitioned from pending to expired and Respond cannot update it.

type ResponseWaiter added in v0.9.1

type ResponseWaiter struct {
	// contains filtered or unexported fields
}

ResponseWaiter provides synchronous blocking for escalation tools. When a tool sends an actionable notification, it registers a waiter. When the callback fires (MQTT or resolve_actionable), it signals the waiter with the chosen action.

func NewResponseWaiter added in v0.9.1

func NewResponseWaiter() *ResponseWaiter

NewResponseWaiter creates a waiter registry.

func (*ResponseWaiter) Register added in v0.9.1

func (w *ResponseWaiter) Register(requestID string) <-chan EscalationResponse

Register creates a waiter channel for the given request ID. The returned channel receives the response when Signal resolves the callback or the timeout expires.

func (*ResponseWaiter) Signal added in v0.9.1

func (w *ResponseWaiter) Signal(requestID, actionID string) bool

Signal delivers a response to a waiting escalation tool. Returns true if a waiter was found, false if the request has no waiter (e.g., the escalation timed out and was cleaned up, or it was a non-synchronous notification).
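
One plausible shape for such a registry is a mutex-guarded map of one-slot buffered channels, sketched below. The names `waiterRegistry`, `register`, and `signal` are hypothetical, and the internals of the real ResponseWaiter are not shown in this documentation, so treat this as an assumption about the general technique only.

```go
package main

import (
	"fmt"
	"sync"
)

// escalationResponse is a local copy of the documented struct.
type escalationResponse struct {
	ActionID string
	TimedOut bool
}

// waiterRegistry is a hypothetical registry keyed by request ID.
type waiterRegistry struct {
	mu      sync.Mutex
	waiters map[string]chan escalationResponse
}

func newWaiterRegistry() *waiterRegistry {
	return &waiterRegistry{waiters: make(map[string]chan escalationResponse)}
}

// register stores a one-slot buffered channel so signal never blocks.
func (w *waiterRegistry) register(requestID string) <-chan escalationResponse {
	ch := make(chan escalationResponse, 1)
	w.mu.Lock()
	w.waiters[requestID] = ch
	w.mu.Unlock()
	return ch
}

// signal removes the waiter and delivers the action. It reports false when
// no waiter is registered for the request ID.
func (w *waiterRegistry) signal(requestID, actionID string) bool {
	w.mu.Lock()
	ch, ok := w.waiters[requestID]
	delete(w.waiters, requestID)
	w.mu.Unlock()
	if !ok {
		return false
	}
	ch <- escalationResponse{ActionID: actionID}
	return true
}

func main() {
	w := newWaiterRegistry()
	ch := w.register("req-1")
	fmt.Println(w.signal("req-1", "approve")) // true
	fmt.Println((<-ch).ActionID)              // approve
	fmt.Println(w.signal("req-1", "approve")) // false: waiter already removed
}
```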

func (*ResponseWaiter) SignalTimeout added in v0.9.1

func (w *ResponseWaiter) SignalTimeout(requestID string) bool

SignalTimeout delivers a timeout response to a waiting escalation.

func (*ResponseWaiter) Wait added in v0.9.1

func (w *ResponseWaiter) Wait(ctx context.Context, requestID string, ch <-chan EscalationResponse) (EscalationResponse, error)

Wait blocks until the response arrives or the context is cancelled. Returns an error only when the parent context is cancelled (e.g., the run is shutting down). Cleans up the waiter on exit.

func (*ResponseWaiter) WaitWithTimeout added in v0.9.1

func (w *ResponseWaiter) WaitWithTimeout(ctx context.Context, requestID string, ch <-chan EscalationResponse, timeout time.Duration) (EscalationResponse, error)

WaitWithTimeout blocks until the response arrives, the timeout expires, or the parent context is cancelled. A timeout expiry returns an EscalationResponse with TimedOut=true (not an error), since timeouts are an expected outcome. Only parent context cancellation (e.g., run shutdown) returns an error.
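
The three outcomes map naturally onto a `select` with three cases. The sketch below is a simplified, hypothetical `waitWithTimeout` that omits the requestID parameter and the waiter cleanup; it only demonstrates that a timeout yields TimedOut=true with a nil error, while cancellation yields an error.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// escalationResponse is a local copy of the documented struct.
type escalationResponse struct {
	ActionID string
	TimedOut bool
}

// waitWithTimeout is a hypothetical reduction of the documented method.
func waitWithTimeout(ctx context.Context, ch <-chan escalationResponse, timeout time.Duration) (escalationResponse, error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case resp := <-ch:
		return resp, nil // the human answered
	case <-timer.C:
		return escalationResponse{TimedOut: true}, nil // expected outcome, not an error
	case <-ctx.Done():
		return escalationResponse{}, ctx.Err() // parent shut down
	}
}

// demo runs one scenario per outcome.
func demo() (answered, timedOut escalationResponse, cancelErr error) {
	ready := make(chan escalationResponse, 1)
	ready <- escalationResponse{ActionID: "approve"}
	answered, _ = waitWithTimeout(context.Background(), ready, time.Hour)

	silent := make(chan escalationResponse)
	timedOut, _ = waitWithTimeout(context.Background(), silent, 10*time.Millisecond)

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, cancelErr = waitWithTimeout(ctx, silent, time.Hour)
	return
}

func main() {
	answered, timedOut, err := demo()
	fmt.Println(answered.ActionID, timedOut.TimedOut, err)
}
```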

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender delivers notifications via Home Assistant companion app push.

func NewSender

func NewSender(ha HAClient, contacts ContactResolver, opstate OpstateStore, deviceName string, logger *slog.Logger) *Sender

NewSender creates a notification sender. The deviceName parameter is used to derive the action callback prefix via ActionPrefix.

func (*Sender) Send

func (s *Sender) Send(ctx context.Context, n Notification) error

Send resolves the recipient to a HA companion app entity and sends a push notification via HA's notify service.

type SessionInjector

type SessionInjector interface {
	// InjectSystemMessage adds a system message to the conversation's
	// memory. The message will be visible on the next agent turn.
	InjectSystemMessage(conversationID, message string) error
	// IsSessionAlive reports whether a conversation has an active
	// archive session.
	IsSessionAlive(conversationID string) bool
}

SessionInjector injects a system message into a live conversation. Implementations are wired in main.go to avoid import cycles.

type SignalProvider added in v0.9.1

type SignalProvider struct {
	// contains filtered or unexported fields
}

SignalProvider delivers fire-and-forget notifications via Signal by resolving the recipient's phone number from the contact store. When a MessageRecorder is set, outbound notifications are recorded in conversation memory with provenance metadata.

func NewSignalProvider added in v0.9.1

func NewSignalProvider(sender SignalSender, contacts ContactResolver, logger *slog.Logger) *SignalProvider

NewSignalProvider creates a Signal notification provider.

func (*SignalProvider) Name added in v0.9.1

func (p *SignalProvider) Name() string

Name returns the provider identifier.

func (*SignalProvider) Send added in v0.9.1

func (p *SignalProvider) Send(ctx context.Context, req NotificationRequest) error

Send delivers a fire-and-forget notification via Signal. The recipient is resolved to a phone number via contact properties (IMPP with signal: prefix, or TEL).

func (*SignalProvider) SendActionable added in v0.9.1

func (p *SignalProvider) SendActionable(ctx context.Context, req ActionableRequest) error

SendActionable delivers an actionable notification via Signal as a text message with numbered options. The model interprets the user's natural-language reply and can resolve the callback via a future resolve_actionable tool. The message is recorded in conversation memory with request_id and action metadata for that tool to use.

func (*SignalProvider) SetRecorder added in v0.9.1

func (p *SignalProvider) SetRecorder(r MessageRecorder)

SetRecorder configures memory recording for outbound notifications. When set, sent messages are recorded in conversation history with provenance metadata so the agent has context when the user replies.

type SignalSender added in v0.9.1

type SignalSender interface {
	Send(ctx context.Context, recipient, message string) (int64, error)
}

SignalSender abstracts Signal message delivery so the notifications package does not import the signal package. Implemented by signal.Client.

type SourceFunc added in v0.9.1

type SourceFunc func(ctx context.Context) string

SourceFunc extracts a human-readable source identifier from the request context. The returned string identifies the originating loop or conversation (e.g., "metacognitive", "signal/+15125551234"). Used by the router to populate the Source field on notification records for history awareness.
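
A SourceFunc is typically backed by a context value. The sketch below shows one way to write one. The key type `sourceKey`, the helpers `withSource` and `sourceFromContext`, and the "unknown" fallback are all assumptions made for illustration; the package does not document how callers attach the source.

```go
package main

import (
	"context"
	"fmt"
)

// SourceFunc mirrors the documented type.
type SourceFunc func(ctx context.Context) string

// sourceKey is a hypothetical unexported context key.
type sourceKey struct{}

// withSource attaches a source identifier to the context.
func withSource(ctx context.Context, source string) context.Context {
	return context.WithValue(ctx, sourceKey{}, source)
}

// sourceFromContext has the SourceFunc signature. The "unknown" fallback
// is an assumption, not documented behaviour.
func sourceFromContext(ctx context.Context) string {
	if s, ok := ctx.Value(sourceKey{}).(string); ok && s != "" {
		return s
	}
	return "unknown"
}

// demoSources evaluates the function with and without an attached source.
func demoSources() (tagged, untagged string) {
	var fn SourceFunc = sourceFromContext
	tagged = fn(withSource(context.Background(), "metacognitive"))
	untagged = fn(context.Background())
	return
}

func main() {
	tagged, untagged := demoSources()
	fmt.Println(tagged, untagged) // metacognitive unknown
}
```

A function like this would be passed to SetSourceFunc on the router.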

type TimeoutWatcher

type TimeoutWatcher struct {
	// contains filtered or unexported fields
}

TimeoutWatcher periodically checks for pending notification records whose expiry time has passed and executes their configured timeout action.

func NewTimeoutWatcher

func NewTimeoutWatcher(records *RecordStore, dispatcher *CallbackDispatcher, sender EscalationSender, interval time.Duration, logger *slog.Logger) *TimeoutWatcher

NewTimeoutWatcher creates a timeout watcher. The sender parameter is optional and only needed for "escalate" timeout actions that re-send a notification at urgent priority. It accepts any EscalationSender (e.g., *Sender or *NotificationRouter).

func (*TimeoutWatcher) Start

func (w *TimeoutWatcher) Start(ctx context.Context)

Start runs the timeout watcher loop until ctx is cancelled. It checks for expired records at the configured interval.
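
A loop of this kind is usually built on time.Ticker. The sketch below shows the general pattern with a hypothetical `runWatcher` that calls a check function on every tick until the context is cancelled. Whether the real watcher also checks once immediately on start is not stated, so this version does not.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runWatcher is a hypothetical reduction of the Start loop: it invokes
// check at each interval and returns once ctx is cancelled.
func runWatcher(ctx context.Context, interval time.Duration, check func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			check()
		}
	}
}

// countTicks runs the watcher until check has fired at least n times, then
// cancels the context from inside the callback and returns the final count.
func countTicks(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	count := 0
	runWatcher(ctx, time.Millisecond, func() {
		count++
		if count >= n {
			cancel()
		}
	})
	return count
}

func main() {
	fmt.Println(countTicks(3) >= 3) // true
}
```

In a real program Start would normally run in its own goroutine, with ctx tied to process shutdown.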
