Documentation ¶
Overview ¶
Package notifications provides provider-agnostic notification delivery with routing, callback tracking, and timeout management.
Index ¶
- Constants
- func ActionPrefix(deviceName string) string
- type Action
- type ActionableRequest
- type CallbackDispatcher
- type ContactResolver
- type DelegateSpawner
- type EscalationSender
- type HAClient
- type HAPushProvider
- type Notification
- type NotificationProvider
- type NotificationRequest
- type NotificationRouter
- func (r *NotificationRouter) RegisterProvider(p NotificationProvider)
- func (r *NotificationRouter) Route(recipient string) (NotificationProvider, error)
- func (r *NotificationRouter) Send(ctx context.Context, n Notification) error
- func (r *NotificationRouter) SendActionable(ctx context.Context, req ActionableRequest, sessionID, conversationID string) (string, error)
- func (r *NotificationRouter) SendNotification(ctx context.Context, req NotificationRequest) error
- type OpstateStore
- type Record
- type RecordStore
- func (s *RecordStore) Close() error
- func (s *RecordStore) Create(r *Record) error
- func (s *RecordStore) Expire(requestID string) (bool, error)
- func (s *RecordStore) Get(requestID string) (*Record, error)
- func (s *RecordStore) PendingExpired() ([]*Record, error)
- func (s *RecordStore) Respond(requestID, actionID string) (bool, error)
- func (s *RecordStore) SetResponseAction(requestID, actionID string) error
- type Sender
- type SessionInjector
- type TimeoutWatcher
Constants ¶
const (
StatusPending = "pending"
StatusResponded = "responded"
StatusExpired = "expired"
)
Status constants for notification records.
Variables ¶
This section is empty.
Functions ¶
func ActionPrefix ¶
func ActionPrefix(deviceName string) string
ActionPrefix returns the action string prefix derived from a device name. Hyphens are replaced with underscores and the result is uppercased, e.g. "aimee-thane" becomes "AIMEE_THANE". An empty deviceName falls back to "THANE".
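The transformation described above can be sketched in a few lines. The lowercase actionPrefix below is a hypothetical re-implementation written only from this description; the package's real function may handle other characters differently.

```go
package main

import (
	"fmt"
	"strings"
)

// actionPrefix is a hypothetical re-implementation of the documented
// behavior: hyphens become underscores, the result is uppercased, and an
// empty device name falls back to "THANE".
func actionPrefix(deviceName string) string {
	if deviceName == "" {
		return "THANE"
	}
	return strings.ToUpper(strings.ReplaceAll(deviceName, "-", "_"))
}

func main() {
	fmt.Println(actionPrefix("aimee-thane")) // AIMEE_THANE
	fmt.Println(actionPrefix(""))            // THANE
}
```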
Types ¶
type ActionableRequest ¶
type ActionableRequest struct {
NotificationRequest
Actions []Action // required, at least one
RequestID string // UUIDv7, set by router
Timeout time.Duration // how long to wait for response
TimeoutAction string // action ID to auto-execute, "escalate", or "cancel"
Context string // model-provided context for callback handling
}
ActionableRequest extends NotificationRequest with callback tracking fields for human-in-the-loop notifications.
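As a rough illustration of how the embedded NotificationRequest and the callback fields fit together, the sketch below builds a request and checks the constraints noted in the field comments. The types are local stand-ins. The Action fields (ID, Label) and the validate helper are assumptions, since this page does not show the Action type or any validation function. Treating an empty TimeoutAction as valid is also an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Action is a stand-in; the ID and Label fields are assumptions.
type Action struct {
	ID    string
	Label string
}

type NotificationRequest struct {
	Recipient string
	Title     string
	Message   string
	Priority  string
}

type ActionableRequest struct {
	NotificationRequest
	Actions       []Action
	RequestID     string
	Timeout       time.Duration
	TimeoutAction string
	Context       string
}

// validate is a hypothetical helper, not part of the package.
func validate(req ActionableRequest) error {
	if req.Message == "" { // promoted from the embedded NotificationRequest
		return errors.New("message is required")
	}
	if len(req.Actions) == 0 {
		return errors.New("at least one action is required")
	}
	switch req.TimeoutAction {
	case "", "escalate", "cancel":
		return nil
	}
	for _, a := range req.Actions {
		if a.ID == req.TimeoutAction {
			return nil
		}
	}
	return fmt.Errorf("timeout action %q matches no action ID", req.TimeoutAction)
}

// sampleRequest builds an illustrative request; RequestID is left empty
// because the router is documented to set it.
func sampleRequest() ActionableRequest {
	return ActionableRequest{
		NotificationRequest: NotificationRequest{
			Recipient: "nugget",
			Message:   "Garage door has been open for 20 minutes. Close it?",
			Priority:  "normal",
		},
		Actions:       []Action{{ID: "close", Label: "Close"}, {ID: "ignore", Label: "Ignore"}},
		Timeout:       30 * time.Minute,
		TimeoutAction: "escalate",
	}
}

func main() {
	req := sampleRequest()
	fmt.Println(validate(req)) // <nil>
	req.TimeoutAction = "snooze"
	fmt.Println(validate(req)) // reports that "snooze" matches no action ID
}
```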
type CallbackDispatcher ¶
type CallbackDispatcher struct {
// contains filtered or unexported fields
}
CallbackDispatcher handles MQTT messages on the instance-specific callbacks topic. It parses action strings with the configured prefix, looks up the notification record, marks it as responded, and routes the response to the appropriate handler (session injection or delegate).
func NewCallbackDispatcher ¶
func NewCallbackDispatcher(records *RecordStore, injector SessionInjector, delegate DelegateSpawner, deviceName string, logger *slog.Logger) *CallbackDispatcher
NewCallbackDispatcher creates a callback dispatcher. The deviceName parameter is used to derive the action prefix via ActionPrefix.
func (*CallbackDispatcher) DispatchAction ¶
func (d *CallbackDispatcher) DispatchAction(record *Record, actionID string)
DispatchAction routes a notification response to the originating session (via system message injection) or spawns a delegate if the session is no longer alive. Delegate spawning is offloaded to a goroutine so it does not block the MQTT handler. This method is also used by the timeout watcher to dispatch auto-fired timeout actions.
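The routing decision described above might look roughly like the following. This is a simplified sketch with in-memory fakes, not the package's implementation. SessionInjector is copied from this page, but DelegateSpawner's method set is not shown here, so the Spawn method is hypothetical, as are the dispatch helper, its string return value, and the WaitGroup used to make the example deterministic.

```go
package main

import (
	"fmt"
	"sync"
)

type SessionInjector interface {
	InjectSystemMessage(conversationID, message string) error
	IsSessionAlive(conversationID string) bool
}

// DelegateSpawner is a stand-in; Spawn is a hypothetical method.
type DelegateSpawner interface {
	Spawn(task string)
}

type fakeSessions struct {
	alive    map[string]bool
	injected []string
}

func (f *fakeSessions) InjectSystemMessage(conversationID, message string) error {
	f.injected = append(f.injected, conversationID+": "+message)
	return nil
}

func (f *fakeSessions) IsSessionAlive(conversationID string) bool {
	return f.alive[conversationID]
}

type fakeDelegate struct {
	mu    sync.Mutex
	tasks []string
}

func (d *fakeDelegate) Spawn(task string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.tasks = append(d.tasks, task)
}

// dispatch injects into a live session, or hands the response to a
// delegate in a goroutine so the caller is not blocked.
func dispatch(inj SessionInjector, del DelegateSpawner, wg *sync.WaitGroup, conversationID, actionID string) string {
	msg := "user chose action " + actionID
	if inj.IsSessionAlive(conversationID) {
		if err := inj.InjectSystemMessage(conversationID, msg); err != nil {
			return "inject failed: " + err.Error()
		}
		return "injected"
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		del.Spawn(msg)
	}()
	return "delegated"
}

func run() (first, second string, injected, delegated int) {
	sessions := &fakeSessions{alive: map[string]bool{"conv-live": true}}
	delegate := &fakeDelegate{}
	var wg sync.WaitGroup
	first = dispatch(sessions, delegate, &wg, "conv-live", "approve")
	second = dispatch(sessions, delegate, &wg, "conv-gone", "deny")
	wg.Wait() // wait for the delegate goroutine before reading results
	return first, second, len(sessions.injected), len(delegate.tasks)
}

func main() {
	fmt.Println(run()) // injected delegated 1 1
}
```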
func (*CallbackDispatcher) Handle ¶
func (d *CallbackDispatcher) Handle(_ string, payload []byte)
Handle is an mqtt.MessageHandler that processes callback messages published by the HA automation when a user taps a notification action button.
type ContactResolver ¶
type ContactResolver interface {
ResolveContact(name string) (*contacts.Contact, error)
GetPropertiesMap(contactID uuid.UUID) (map[string][]string, error)
}
ContactResolver resolves a contact name to its record and properties. ResolveContact uses cascading resolution (exact name → nickname → search) for flexible name matching.
type DelegateSpawner ¶
DelegateSpawner executes a task in a lightweight delegate loop when the originating session is no longer alive.
type EscalationSender ¶
type EscalationSender interface {
Send(ctx context.Context, n Notification) error
}
EscalationSender sends fire-and-forget notifications for timeout escalation. Both *Sender and *NotificationRouter implement this interface, allowing TimeoutWatcher to use either.
type HAClient ¶
type HAClient interface {
CallService(ctx context.Context, domain, service string, data map[string]any) error
}
HAClient is the subset of homeassistant.Client needed for notifications.
type HAPushProvider ¶
type HAPushProvider struct {
// contains filtered or unexported fields
}
HAPushProvider delivers notifications via Home Assistant companion app push by wrapping the existing Sender.
func NewHAPushProvider ¶
func NewHAPushProvider(sender *Sender) *HAPushProvider
NewHAPushProvider creates a provider that delegates to an existing HA notification Sender.
func (*HAPushProvider) Name ¶
func (p *HAPushProvider) Name() string
Name returns the provider identifier.
func (*HAPushProvider) Send ¶
func (p *HAPushProvider) Send(ctx context.Context, req NotificationRequest) error
Send delivers a fire-and-forget notification via HA push.
func (*HAPushProvider) SendActionable ¶
func (p *HAPushProvider) SendActionable(ctx context.Context, req ActionableRequest) error
SendActionable delivers an actionable notification via HA push. The provider only handles delivery; record creation is the router's responsibility.
type Notification ¶
type Notification struct {
Recipient string // contact name (e.g., "nugget")
Title string // notification title (optional)
Message string // notification body (required)
Priority string // "low", "normal" (default), "urgent"
Actions []Action // optional: action buttons (creates tracked notification)
RequestID string // UUIDv7, set by caller when Actions is non-empty
Timeout time.Duration // how long to wait for response (default: 30m)
TimeoutAction string // action ID to auto-execute on timeout, or "escalate"/"cancel"
Context string // model-provided context for callback handling
}
Notification is a push notification. Without Actions it is fire-and-forget (Phase 1). With Actions it creates an actionable notification with callback tracking (Phase 2).
type NotificationProvider ¶
type NotificationProvider interface {
// Send delivers a fire-and-forget notification.
Send(ctx context.Context, req NotificationRequest) error
// SendActionable delivers a notification with action buttons.
// The RequestID is pre-set by the router before calling the
// provider; the provider only handles delivery.
SendActionable(ctx context.Context, req ActionableRequest) error
// Name returns the provider identifier (e.g., "ha_push").
Name() string
}
NotificationProvider delivers notifications through a specific transport (e.g., HA companion app push, Signal, email).
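To show what satisfying this interface involves, here is a minimal sketch of a custom provider that records deliveries in memory. The request types are local copies of the ones on this page. The Action fields and the logProvider type are hypothetical and exist only for illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Action is a stand-in; its fields are assumptions.
type Action struct{ ID, Label string }

type NotificationRequest struct {
	Recipient, Title, Message, Priority string
}

type ActionableRequest struct {
	NotificationRequest
	Actions       []Action
	RequestID     string
	Timeout       time.Duration
	TimeoutAction string
	Context       string
}

type NotificationProvider interface {
	Send(ctx context.Context, req NotificationRequest) error
	SendActionable(ctx context.Context, req ActionableRequest) error
	Name() string
}

// logProvider is a hypothetical provider that only records what it was
// asked to deliver.
type logProvider struct{ lines []string }

func (p *logProvider) Name() string { return "log" }

func (p *logProvider) Send(ctx context.Context, req NotificationRequest) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	p.lines = append(p.lines, fmt.Sprintf("[%s] %s: %s", req.Priority, req.Recipient, req.Message))
	return nil
}

func (p *logProvider) SendActionable(ctx context.Context, req ActionableRequest) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	// The provider only delivers; it uses the RequestID it was given.
	p.lines = append(p.lines, fmt.Sprintf("[%s] %s: %s (request %s, %d actions)",
		req.Priority, req.Recipient, req.Message, req.RequestID, len(req.Actions)))
	return nil
}

// Compile-time check that logProvider satisfies the interface.
var _ NotificationProvider = (*logProvider)(nil)

func run() ([]string, error) {
	p := &logProvider{}
	ctx := context.Background()
	if err := p.Send(ctx, NotificationRequest{Recipient: "nugget", Message: "Backup finished", Priority: "low"}); err != nil {
		return nil, err
	}
	err := p.SendActionable(ctx, ActionableRequest{
		NotificationRequest: NotificationRequest{Recipient: "nugget", Message: "Restart service?", Priority: "urgent"},
		Actions:             []Action{{ID: "yes", Label: "Yes"}, {ID: "no", Label: "No"}},
		RequestID:           "req-1",
		Timeout:             5 * time.Minute,
	})
	return p.lines, err
}

func main() {
	lines, err := run()
	fmt.Println(lines, err)
}
```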
type NotificationRequest ¶
type NotificationRequest struct {
Recipient string // contact name (resolved by router)
Title string // optional
Message string // required
Priority string // "low", "normal", "urgent"
}
NotificationRequest is a fire-and-forget notification delivery request. It contains only the fields common to all providers.
type NotificationRouter ¶
type NotificationRouter struct {
// contains filtered or unexported fields
}
NotificationRouter selects a notification provider based on contact facts and orchestrates delivery for both fire-and-forget and actionable notifications. It is the single entry point for the provider-agnostic send_notification and request_human_decision tools.
func NewNotificationRouter ¶
func NewNotificationRouter(contacts ContactResolver, records *RecordStore, logger *slog.Logger) *NotificationRouter
NewNotificationRouter creates a router with contact resolution and optional record tracking for actionable notifications. A nil logger defaults to slog.Default.
func (*NotificationRouter) RegisterProvider ¶
func (r *NotificationRouter) RegisterProvider(p NotificationProvider)
RegisterProvider adds a notification provider to the router. Nil providers and providers with empty names are rejected. Registering a provider whose name already exists overwrites the previous one and logs a warning.
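The registration rules above can be sketched with a small in-memory registry. Everything here is a stand-in: the provider interface is trimmed to Name, the warnings slice replaces the router's logger, and the bool return value exists only so the example can show which calls were rejected (the real method returns nothing).

```go
package main

import "fmt"

// provider is a trimmed stand-in for NotificationProvider; only Name
// matters for registration.
type provider interface{ Name() string }

type named string

func (n named) Name() string { return string(n) }

type registry struct {
	providers map[string]provider
	warnings  []string // stands in for the router's logger
}

func newRegistry() *registry {
	return &registry{providers: map[string]provider{}}
}

// register reports whether the provider was accepted.
func (r *registry) register(p provider) bool {
	if p == nil || p.Name() == "" {
		return false // nil providers and empty names are rejected
	}
	if _, exists := r.providers[p.Name()]; exists {
		r.warnings = append(r.warnings, "overwriting provider "+p.Name())
	}
	r.providers[p.Name()] = p
	return true
}

func main() {
	r := newRegistry()
	fmt.Println(r.register(nil))              // false
	fmt.Println(r.register(named("")))        // false
	fmt.Println(r.register(named("ha_push"))) // true
	fmt.Println(r.register(named("ha_push"))) // true, with a warning recorded
	fmt.Println(len(r.providers), len(r.warnings))
}
```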
func (*NotificationRouter) Route ¶
func (r *NotificationRouter) Route(recipient string) (NotificationProvider, error)
Route resolves a recipient to the appropriate provider based on contact properties. It checks for an explicit notification_preference property first, then falls back to checking for known delivery channels.
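A minimal sketch of this two-step lookup, operating on the map[string][]string shape that GetPropertiesMap returns. Only the notification_preference key comes from this page. The channel property keys, the provider names other than "ha_push", and the choice to fall through when a preference names an unregistered provider are illustrative assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// channelProviders maps a contact property to the provider that could use
// it. Both columns are illustrative assumptions, not the package's list.
var channelProviders = []struct{ property, provider string }{
	{"ha_companion_app", "ha_push"},
	{"signal_number", "signal"},
}

// route picks a provider name: explicit preference first, then the first
// known delivery channel the contact has.
func route(props map[string][]string, registered map[string]bool) (string, error) {
	for _, pref := range props["notification_preference"] {
		if registered[pref] {
			return pref, nil
		}
	}
	for _, c := range channelProviders {
		if len(props[c.property]) > 0 && registered[c.provider] {
			return c.provider, nil
		}
	}
	return "", errors.New("no notification provider available for contact")
}

func main() {
	registered := map[string]bool{"ha_push": true, "signal": true}

	withPref := map[string][]string{
		"notification_preference": {"signal"},
		"ha_companion_app":        {"phone"},
	}
	fmt.Println(route(withPref, registered)) // signal <nil>

	fallback := map[string][]string{"ha_companion_app": {"phone"}}
	fmt.Println(route(fallback, registered)) // ha_push <nil>

	_, err := route(map[string][]string{}, registered)
	fmt.Println(err)
}
```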
func (*NotificationRouter) Send ¶
func (r *NotificationRouter) Send(ctx context.Context, n Notification) error
Send satisfies EscalationSender so the router can be used by TimeoutWatcher for escalation notifications. It adapts the legacy Notification struct to a NotificationRequest and routes it.
func (*NotificationRouter) SendActionable ¶
func (r *NotificationRouter) SendActionable(ctx context.Context, req ActionableRequest, sessionID, conversationID string) (string, error)
SendActionable delivers an actionable notification and creates a tracking record. Returns the request ID for the created record. Delivery is attempted first; records are only created on success to avoid dangling records that trigger timeout actions for undelivered notifications.
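The deliver-first ordering can be illustrated with a small sketch. The recordStore type, the deliver callback, and the fixed request IDs are stand-ins. The real router generates a UUIDv7 and writes to a RecordStore, neither of which is reproduced here.

```go
package main

import (
	"errors"
	"fmt"
)

var errOffline = errors.New("device offline")

// recordStore is an in-memory stand-in mapping request ID to status.
type recordStore struct{ status map[string]string }

// sendActionable attempts delivery first and only creates the tracking
// record when delivery succeeded, so a failed send leaves nothing behind
// for a timeout watcher to act on.
func sendActionable(deliver func(requestID string) error, records *recordStore, requestID string) (string, error) {
	if err := deliver(requestID); err != nil {
		return "", fmt.Errorf("deliver: %w", err)
	}
	records.status[requestID] = "pending"
	return requestID, nil
}

func main() {
	records := &recordStore{status: map[string]string{}}

	_, err := sendActionable(func(string) error { return errOffline }, records, "req-1")
	fmt.Println(err, len(records.status)) // deliver: device offline 0

	id, err := sendActionable(func(string) error { return nil }, records, "req-2")
	fmt.Println(id, err, records.status[id]) // req-2 <nil> pending
}
```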
func (*NotificationRouter) SendNotification ¶
func (r *NotificationRouter) SendNotification(ctx context.Context, req NotificationRequest) error
SendNotification delivers a fire-and-forget notification via the appropriate provider for the recipient.
type OpstateStore ¶
OpstateStore is the subset of opstate.Store needed for recording notifications.
type Record ¶
type Record struct {
RequestID string
Recipient string
OriginSession string
OriginConversation string
Context string
Actions []Action
TimeoutSeconds int
TimeoutAction string
Status string
ResponseAction string
RespondedAt time.Time
CreatedAt time.Time
ExpiresAt time.Time
}
Record tracks an actionable notification from creation through response or expiry. It is the central data type for the HITL callback routing system.
type RecordStore ¶
type RecordStore struct {
// contains filtered or unexported fields
}
RecordStore provides SQLite-backed CRUD for notification records.
func NewRecordStore ¶
func NewRecordStore(dbPath string, logger *slog.Logger) (*RecordStore, error)
NewRecordStore opens (or creates) the notifications database at dbPath and runs schema migrations.
func (*RecordStore) Close ¶
func (s *RecordStore) Close() error
Close closes the underlying database connection.
func (*RecordStore) Create ¶
func (s *RecordStore) Create(r *Record) error
Create inserts a new notification record. The record's Status is set to StatusPending regardless of the caller's value.
func (*RecordStore) Expire ¶
func (s *RecordStore) Expire(requestID string) (bool, error)
Expire marks a pending record as expired. It returns true if the record was updated (it was still pending) and false if it had already been responded to or had already expired. Callers should check the bool to avoid executing timeout actions on records that were concurrently responded to.
func (*RecordStore) Get ¶
func (s *RecordStore) Get(requestID string) (*Record, error)
Get retrieves a notification record by request ID. Returns sql.ErrNoRows if no record is found.
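Callers will usually want to tell "no such record" apart from other failures. The sketch below shows one way to do that with errors.Is, using a hypothetical in-memory fakeStore and a trimmed Record in place of the real types.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// Record is trimmed to the fields this example needs.
type Record struct {
	RequestID string
	Status    string
}

// getter captures the one method used here.
type getter interface {
	Get(requestID string) (*Record, error)
}

// fakeStore is a hypothetical in-memory stand-in for RecordStore.
type fakeStore map[string]*Record

func (f fakeStore) Get(requestID string) (*Record, error) {
	if r, ok := f[requestID]; ok {
		return r, nil
	}
	return nil, sql.ErrNoRows
}

// describe distinguishes a missing record from other lookup failures.
// errors.Is also matches if an implementation wraps sql.ErrNoRows.
func describe(s getter, requestID string) string {
	rec, err := s.Get(requestID)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return "unknown request " + requestID
	case err != nil:
		return "lookup failed: " + err.Error()
	}
	return rec.RequestID + " is " + rec.Status
}

func main() {
	store := fakeStore{"req-1": {RequestID: "req-1", Status: "pending"}}
	fmt.Println(describe(store, "req-1")) // req-1 is pending
	fmt.Println(describe(store, "req-9")) // unknown request req-9
}
```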
func (*RecordStore) PendingExpired ¶
func (s *RecordStore) PendingExpired() ([]*Record, error)
PendingExpired returns all records that are still pending but whose expiry time has passed.
func (*RecordStore) Respond ¶
func (s *RecordStore) Respond(requestID, actionID string) (bool, error)
Respond marks a pending record as responded with the given action ID. It returns true if the record was updated (it was still pending) and false if it had already been responded to or had already expired. Callers should check the bool to avoid double-processing in race scenarios.
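The bool returned by Respond and Expire acts as a compare-and-set result: only the caller that moves the record out of the pending state should act on it. The sketch below shows the idea with a mutex-guarded in-memory store. The real store is SQLite-backed and also returns an error, both of which are omitted here, and memStore is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

const (
	StatusPending   = "pending"
	StatusResponded = "responded"
	StatusExpired   = "expired"
)

// memStore is an in-memory stand-in for RecordStore.
type memStore struct {
	mu     sync.Mutex
	status map[string]string
	action map[string]string
}

func newMemStore(ids ...string) *memStore {
	s := &memStore{status: map[string]string{}, action: map[string]string{}}
	for _, id := range ids {
		s.status[id] = StatusPending
	}
	return s
}

// Respond succeeds only if the record is still pending.
func (s *memStore) Respond(id, actionID string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.status[id] != StatusPending {
		return false
	}
	s.status[id] = StatusResponded
	s.action[id] = actionID
	return true
}

// Expire succeeds only if the record is still pending.
func (s *memStore) Expire(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.status[id] != StatusPending {
		return false
	}
	s.status[id] = StatusExpired
	return true
}

// race lets a user response and a timeout compete for the same record and
// reports how many of them won, plus the final status.
func race() (wins int, final string) {
	s := newMemStore("req-1")
	results := make([]bool, 2)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); results[0] = s.Respond("req-1", "approve") }()
	go func() { defer wg.Done(); results[1] = s.Expire("req-1") }()
	wg.Wait()
	for _, ok := range results {
		if ok {
			wins++
		}
	}
	return wins, s.status["req-1"]
}

func main() {
	wins, final := race()
	fmt.Println(wins, final) // exactly one winner; final status depends on scheduling
}
```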
func (*RecordStore) SetResponseAction ¶
func (s *RecordStore) SetResponseAction(requestID, actionID string) error
SetResponseAction records the action that was executed for an expired record. This is used by the timeout watcher to persist which action was auto-executed on timeout, since the record has already transitioned from pending to expired and Respond cannot update it.
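Pieced together from the descriptions on this page, the timeout path for a single record might look roughly like this. It is a sketch under assumptions, not the watcher's actual code: the record type is trimmed, expire mimics RecordStore.Expire in memory, the assignment to ResponseAction stands in for SetResponseAction, and treating an empty TimeoutAction like "cancel" is a guess.

```go
package main

import "fmt"

// record is trimmed to the fields the timeout path touches.
type record struct {
	ID             string
	TimeoutAction  string
	Status         string
	ResponseAction string
}

// expire mimics RecordStore.Expire: it succeeds only for pending records.
func expire(r *record) bool {
	if r.Status != "pending" {
		return false
	}
	r.Status = "expired"
	return true
}

// onTimeout returns a short description of what was done for the record.
func onTimeout(r *record) string {
	if !expire(r) {
		return "skipped" // someone responded (or expired it) first
	}
	switch r.TimeoutAction {
	case "", "cancel":
		return "cancelled"
	case "escalate":
		// The real watcher would re-send at urgent priority here.
		return "escalated"
	default:
		// The record is already expired, so Respond could not store the
		// action; this assignment stands in for SetResponseAction.
		r.ResponseAction = r.TimeoutAction
		return "dispatched " + r.TimeoutAction
	}
}

func main() {
	auto := &record{ID: "req-1", TimeoutAction: "approve", Status: "pending"}
	fmt.Println(onTimeout(auto), auto.Status, auto.ResponseAction) // dispatched approve expired approve

	done := &record{ID: "req-2", TimeoutAction: "approve", Status: "responded"}
	fmt.Println(onTimeout(done)) // skipped
}
```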
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender delivers notifications via Home Assistant companion app push.
func NewSender ¶
func NewSender(ha HAClient, contacts ContactResolver, opstate OpstateStore, deviceName string, logger *slog.Logger) *Sender
NewSender creates a notification sender. The deviceName parameter is used to derive the action callback prefix via ActionPrefix.
type SessionInjector ¶
type SessionInjector interface {
// InjectSystemMessage adds a system message to the conversation's
// memory. The message will be visible on the next agent turn.
InjectSystemMessage(conversationID, message string) error
// IsSessionAlive reports whether a conversation has an active
// archive session.
IsSessionAlive(conversationID string) bool
}
SessionInjector injects a system message into a live conversation. Implementations are wired in main.go to avoid import cycles.
type TimeoutWatcher ¶
type TimeoutWatcher struct {
// contains filtered or unexported fields
}
TimeoutWatcher periodically checks for pending notification records whose expiry time has passed and executes their configured timeout action.
func NewTimeoutWatcher ¶
func NewTimeoutWatcher(records *RecordStore, dispatcher *CallbackDispatcher, sender EscalationSender, interval time.Duration, logger *slog.Logger) *TimeoutWatcher
NewTimeoutWatcher creates a timeout watcher. The sender parameter is optional and only needed for "escalate" timeout actions that re-send a notification at urgent priority. It accepts any EscalationSender (e.g., *Sender or *NotificationRouter).
func (*TimeoutWatcher) Start ¶
func (w *TimeoutWatcher) Start(ctx context.Context)
Start runs the timeout watcher loop until ctx is cancelled. It checks for expired records at the configured interval.
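A loop of this shape is commonly written with time.Ticker and a select on ctx.Done. The sketch below is a generic version of that pattern, not the watcher's actual code. The watch function and its check callback are hypothetical names, and the callback stands in for the expired-record scan.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watch calls check once per interval until ctx is cancelled.
func watch(ctx context.Context, interval time.Duration, check func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			check()
		}
	}
}

// run cancels the context from inside the third check so the example
// terminates on its own, and returns how many checks ran.
func run() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	checks := 0
	watch(ctx, 10*time.Millisecond, func() {
		checks++
		if checks == 3 {
			cancel()
		}
	})
	return checks
}

func main() {
	fmt.Println("checks before shutdown:", run())
}
```

Because watch blocks until cancellation, a caller would typically start it in its own goroutine and cancel the context during shutdown.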