notifications

package
v1.3.1 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 11, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package notifications ships Harbor's `notification.*` event family — a NEW event topic on the typed event bus carrying operator-facing notifications (alerts surfaced by the Console's notification centre, the Overview page alert ribbon, and the Settings notification-routing matrix per Brief 11 §CC-3).

The family uses **per-class topic naming**:

  • notification.task_failed (severity Error)
  • notification.tool_approval_requested (severity Warning)
  • notification.governance_budget_exceeded (severity Error)
  • notification.auth_required (severity Warning)
  • notification.pause_requested (severity Info)

Per-class topics compose naturally with the rest of Harbor's event taxonomy (`tool.failed`, `task.completed`, `governance.budget_exceeded` all use per-class topics) and with the `events.subscribe` topic-filter shape (Phase 72a). The alternative — a single `notification.emit` with a payload class field — was rejected per `docs/plans/wave-13-decomposition.md` §12 ("Wire-shape decision left to me").
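As a rough illustration of why per-class topics compose with a topic filter, the sketch below matches the five V1 topic names against either an exact topic or a family wildcard. The `matchTopic` helper and its `".*"` wildcard syntax are hypothetical; they are not the `events.subscribe` filter shape, which this page does not spell out.

```go
package main

import (
	"fmt"
	"strings"
)

// topics mirrors the five V1 per-class topic names listed above.
var topics = []string{
	"notification.task_failed",
	"notification.tool_approval_requested",
	"notification.governance_budget_exceeded",
	"notification.auth_required",
	"notification.pause_requested",
}

// matchTopic is a hypothetical helper, not Harbor's filter implementation.
// A pattern ending in ".*" matches every topic in that family; any other
// pattern must match the topic exactly.
func matchTopic(pattern, topic string) bool {
	if strings.HasSuffix(pattern, ".*") {
		return strings.HasPrefix(topic, strings.TrimSuffix(pattern, "*"))
	}
	return pattern == topic
}

func main() {
	for _, t := range topics {
		fmt.Printf("%-42s family=%v exact=%v\n",
			t,
			matchTopic("notification.*", t),
			matchTopic("notification.task_failed", t))
	}
}
```

With a single `notification.emit` topic, the same selection would need the consumer to inspect a class field inside each payload instead.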

Architecture:

  • Map(ctx, ev) → ([]events.Event, error) is a pure function: no I/O, no global state, and no time.Now() dependency (OccurredAt is filled by the bus's Publish path, matching the existing bus convention). Concurrent calls against a single shared mapper are safe by construction (no state to share). D-025 is trivially satisfied; the test still runs N=100 concurrent invocations under -race to validate end-to-end.

  • Subscriber is a long-lived bus consumer that wires Map onto Publish. NewSubscriber(bus, log) constructs the wiring; (*Subscriber).Run(ctx) opens an Admin-scope subscription on the V1 trigger event types, runs Map on every delivered event, and republishes each synthesised notification.* event onto the same bus. The originating event's identity.Quadruple is preserved on the synthesised event so identity-scope filtering on the downstream subscriber works without elevation.

  • The NotificationPayload embeds events.Sealed (NOT SafeSealed). The Summary field is caller-controlled (a human-readable one-liner derived from the originating event's typed payload), so the bus's redactor walks the payload on Publish (per D-020 audit-redactor-as-bus-boundary). The synthesised event still rides the same bus.Publish path every other event ships through.

What is OUT of scope for Phase 72d:

  • Notification routing fan-out (email / Slack / web-push). Per-user routing lives in 73m Settings + the Console DB's notifications_routing table (Phase 72h).
  • Severity escalation policy ("notify only when severity >= warning"). V1 emits one notification per matching trigger; downstream filtering uses the existing events.subscribe filter shape.
  • Snooze / dismiss / mute-this-trigger user actions (Console DB only, not runtime entities).
  • Anomaly detection (post-V1).
  • Persistence of notification history (rides the same events bus replay surface — Phase 06 ring + Phase 57 durable log).
  • New Protocol methods (notification.* is an event topic, consumed via the existing events.subscribe surface).

Phase 72d implements the §13 primitive-with-consumer rule via a Stage-1 test consumer (`subscriber_test.go::TestSubscriber_TaskFailedSynthesisesNotificationTaskFailed`) that fires a deliberate `task.failed` and asserts the synthesised `notification.task_failed` arrives at a separately-scoped subscriber via the bus. The UI consumers (73a Overview alert ribbon, 73m Settings notification-routing matrix) land in Stage 2 and cannot substitute for the Stage-1 test consumer per `docs/plans/wave-13-decomposition.md` §12 item 5. See also docs/decisions.md D-109.

Constants

const (
	// EventTypeNotificationTaskFailed — synthesised from task.failed
	// (Phase 20). Severity Error. The deep-link points at the failed
	// task in the Console.
	EventTypeNotificationTaskFailed events.EventType = "notification.task_failed"

	// EventTypeNotificationToolApprovalRequested — synthesised from
	// tool.approval_requested (Phase 31). Severity Warning. The deep-link
	// points at the pending approval in the Console's Tools page.
	EventTypeNotificationToolApprovalRequested events.EventType = "notification.tool_approval_requested"

	// EventTypeNotificationGovernanceBudgetExceeded — synthesised from
	// governance.budget_exceeded (Phase 36a). Severity Error. The
	// deep-link points at the affected tenant/session's governance
	// posture in the Console.
	EventTypeNotificationGovernanceBudgetExceeded events.EventType = "notification.governance_budget_exceeded"

	// EventTypeNotificationAuthRequired — synthesised from
	// tool.auth_required (Phase 30). Severity Warning. The deep-link
	// points at the OAuth-binding flow in the Console's MCP Connections
	// page.
	EventTypeNotificationAuthRequired events.EventType = "notification.auth_required"

	// EventTypeNotificationPauseRequested — synthesised from
	// pause.requested (Phase 50). Severity Info. The deep-link points
	// at the paused task in the Console's Interventions queue.
	EventTypeNotificationPauseRequested events.EventType = "notification.pause_requested"

	// EventTypeNotificationIdentityRejected — emitted by the Subscriber
	// when a trigger event arrives with the D-033 `<missing>` sentinel
	// substituted into one or more identity components. Mirrors the
	// `memory.identity_rejected` / `skill.identity_rejected` shape so
	// audit observers consume one identity-rejection vocabulary across
	// the runtime. Fail-loudly per CLAUDE.md §13 — the Subscriber
	// emits this rejection event instead of silently dropping the input
	// or silently publishing a notification with a malformed identity.
	EventTypeNotificationIdentityRejected events.EventType = "notification.identity_rejected"
)

V1 notification event-type constants. Each is registered with the canonical events.EventType registry from this package's init() so a Publish never trips events.ErrUnknownEventType.

Per-class topic naming locked per docs/plans/wave-13-decomposition.md §12 + D-109.

Variables

var ErrUnmappable = errors.New("notifications: triggering event structurally invalid for mapping")

ErrUnmappable is returned by Map when a triggering event is structurally invalid for mapping — its payload type does not match the expected typed payload for its declared event type. Callers compare via errors.Is.

Fail-loudly per CLAUDE.md §13: a structurally invalid trigger event MUST NOT silently degrade to "no notifications." The Subscriber catches ErrUnmappable, logs at Error, and emits a runtime.error observability event; it does NOT republish a malformed notification.

Functions

func Map

func Map(_ context.Context, ev events.Event) ([]events.Event, error)

Map translates a triggering bus event into zero or more synthesised notification.* events.

Map is a PURE function. No I/O. No global state. No time.Now() dependency (the bus's Publish path fills OccurredAt on the synthesised event). Concurrent calls against a single shared mapper instance are trivially safe — there is nothing to share. D-025 concurrent-reuse is satisfied by construction.

Return contract:

  • (nil, nil): the event's Type is not in V1TriggerEventTypes. Most bus traffic falls into this category, but the Subscriber's bus filter already narrows the input set, so Map rarely sees such events in practice. The contract is defined here so the function is safe to call against any event.

  • ([]events.Event with one element, nil): the event was a known trigger type with a well-formed typed payload; the slice carries the synthesised notification.* event.

  • (nil, wrapped ErrUnmappable): the event was a known trigger type BUT the typed payload assertion failed (wrong payload type for the declared event type, or a RedactedMap arrived where the bus normally delivers the typed shape). Fail-loudly per CLAUDE.md §13: never silently degrade to "no notifications."

The synthesised event carries the trigger's identity.Quadruple, the V1 class's per-class severity (Brief 11 §CC-3 heuristic; see the per-case comments in the Map source for rationale), and a per-class deep-link shape. The bus's Publish path fills Sequence and OccurredAt.

Note: trigger event payloads in V1 are all SafePayload by construction (TaskFailedPayload, ToolApprovalRequestedPayload, BudgetExceededPayload, ToolAuthRequiredPayload, PauseRequestedPayload all embed events.SafeSealed). The bus therefore delivers them with their typed shape preserved. A RedactedMap on a known trigger type is a contract violation upstream — Map fails loudly via ErrUnmappable so the violation does not silently downgrade.

func V1NotificationClasses

func V1NotificationClasses() []events.EventType

V1NotificationClasses returns a deterministic snapshot of every notification event-type the V1 mapper synthesises. Useful for boot-log output, for tests asserting exhaustiveness, and for the Subscriber to scope its bus subscription. The identity-rejection class is NOT included: it is emitted only by the Subscriber's own error path and is not a class consumers filter on positively.

func V1TriggerEventTypes

func V1TriggerEventTypes() []events.EventType

V1TriggerEventTypes returns the set of bus event types the V1 mapper listens for. Anything outside this set is unmapped (Map returns (nil, nil)); anything inside maps to exactly one notification.* event.

`agent.credentials_expired` and `runtime.health_degraded` are named in Brief 11 §CC-3's "starter list" but are not shipped V1 event types. The mapper accepts them via the input-type registry as future inputs but, like any other unmapped V1 input, emits nothing for them. Adding the mappings is a one-line change in a future phase.

Types

type IdentityRejectedPayload

type IdentityRejectedPayload struct {
	events.SafeSealed

	// Operation is the rejected operation name (always
	// "Subscriber.Run" for V1; future operations may extend the
	// vocabulary).
	Operation string

	// Reason names the missing identity components ("tenant_id empty",
	// "user_id and session_id empty", etc.). Deterministic ordering.
	Reason string

	// OriginEventType is the bus event-type the Subscriber was trying
	// to map when the identity check failed.
	OriginEventType events.EventType
}

IdentityRejectedPayload reports a Subscriber-side identity rejection: the trigger event arrived with the D-033 `<missing>` sentinel substituted into one or more identity components, so the Subscriber emits this rejection event instead of silently dropping the input or synthesising a malformed notification (CLAUDE.md §13 fail-loudly + mirrors the `memory.identity_rejected` shape).

SafePayload by construction — `Operation` is a bounded constant ("Subscriber.Run"), `Reason` is a short static string naming the missing component(s), `OriginEventType` is an EventType enum.
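One way to produce a Reason string with deterministic ordering is to check the identity components in a fixed sequence. The sketch below is hypothetical: the `identity` struct covers only the three components named on this page (the fourth member of the quadruple is not named here, so it is left out), and the comma-joined wording for three missing components is an assumption extrapolated from the two examples in the field comment.

```go
package main

import (
	"fmt"
	"strings"
)

// missing is the D-033 sentinel value as written in the text above.
const missing = "<missing>"

// identity is a simplified stand-in for identity.Quadruple.
type identity struct{ TenantID, UserID, SessionID string }

// rejectionReason names the components holding the sentinel, always in the
// same order, and returns "" when the identity is complete.
func rejectionReason(id identity) string {
	var names []string
	for _, c := range []struct{ name, value string }{
		{"tenant_id", id.TenantID},
		{"user_id", id.UserID},
		{"session_id", id.SessionID},
	} {
		if c.value == missing {
			names = append(names, c.name)
		}
	}
	switch len(names) {
	case 0:
		return ""
	case 1:
		return names[0] + " empty"
	default:
		last := len(names) - 1
		return strings.Join(names[:last], ", ") + " and " + names[last] + " empty"
	}
}

func main() {
	fmt.Println(rejectionReason(identity{missing, "u-1", "s-1"}))
	fmt.Println(rejectionReason(identity{"t-1", missing, missing}))
}
```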

type NotificationPayload

type NotificationPayload struct {
	events.Sealed

	// Class is the notification.* class this payload belongs to. One
	// of EventTypeNotificationTaskFailed / ToolApprovalRequested /
	// GovernanceBudgetExceeded / AuthRequired / PauseRequested.
	Class events.EventType

	// Severity is the operator-visible urgency.
	Severity Severity

	// Summary is a human-readable one-liner the Console renders.
	// Caller-controlled (the mapper derives it from the originating
	// event's typed payload); the audit redactor walks it on Publish.
	Summary string

	// DeepLink is a Protocol-relative path (e.g. "/console/tasks/<id>").
	// The Console renders this via its router.
	DeepLink string

	// OriginEventType is the originating event's Type
	// (e.g. "task.failed").
	OriginEventType events.EventType

	// OriginEventSequence is the originating event's bus Sequence
	// (correlation key).
	OriginEventSequence uint64
}

NotificationPayload is the typed payload for every notification.* event. Embeds events.Sealed (NOT events.SafeSealed): the Summary field is human-readable and derived from caller-controlled bytes on the originating event's typed payload, so the bus's redactor walks the payload on Publish per D-020.

Field semantics:

  • Class is the notification.* class — one of the V1 constants. Echoed here so subscribers parsing only the payload (e.g. when reading from a durable event log without the bus envelope) still have the class.
  • Severity is the operator-visible urgency.
  • Summary is a one-liner the Console renders in its notification centre. Derived from the originating event's typed payload (e.g. `task.failed` → "Task <id> failed with code <code>"). The mapper builds the summary; the audit redactor walks it.
  • DeepLink is a Protocol-relative path the Console's router deep-links into. V1 hard-codes the shape per class; if the Console's route shape changes post-V1 the mapper updates without a Protocol break (the runtime stays the source of truth).
  • OriginEventType is the bus event-type the mapper consumed. Lets subscribers join the notification with its source event without additional bus traffic.
  • OriginEventSequence is the bus Sequence of the originating event. Stable correlation key across the runtime's lifetime.
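To make the field semantics concrete, here is a sketch of a payload for a `task.failed` trigger, using the summary and deep-link shapes quoted above. The struct is a local copy without the events.Sealed embedding, and `taskFailedNotification` with its parameters is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// notificationPayload copies the documented fields with plain string types
// so the example compiles without the events package.
type notificationPayload struct {
	Class               string
	Severity            string
	Summary             string
	DeepLink            string
	OriginEventType     string
	OriginEventSequence uint64
}

// taskFailedNotification builds the payload for one failed task. taskID,
// code and seq stand in for values read from the originating event.
func taskFailedNotification(taskID, code string, seq uint64) notificationPayload {
	return notificationPayload{
		Class:               "notification.task_failed",
		Severity:            "error",
		Summary:             fmt.Sprintf("Task %s failed with code %s", taskID, code),
		DeepLink:            "/console/tasks/" + taskID,
		OriginEventType:     "task.failed",
		OriginEventSequence: seq,
	}
}

func main() {
	p := taskFailedNotification("t-17", "timeout", 1042)
	fmt.Printf("%+v\n", p)
}
```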

type Severity

type Severity string

Severity classifies a notification's operator-visible urgency. Per Brief 11 §CC-3 the Console renders notifications in alert ribbons / notification centres with a severity-derived colour and ordering.

V1 is a fixed three-value enum. A richer model (payload-derived severity, dynamic thresholds) is post-V1.

const (
	// SeverityInfo — operator-visible but non-actionable. Example:
	// notification.pause_requested (a planner asked for human input;
	// the runtime is healthy).
	SeverityInfo Severity = "info"

	// SeverityWarning — operator-actionable but not blocking. Example:
	// notification.tool_approval_requested (a tool call is parked on
	// an approval gate); notification.auth_required (a tool needs
	// OAuth binding).
	SeverityWarning Severity = "warning"

	// SeverityError — operator-actionable and blocking. Example:
	// notification.task_failed (a task entered a terminal failure
	// state); notification.governance_budget_exceeded (a budget cap
	// triggered).
	SeverityError Severity = "error"
)
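Because Severity is a string enum, comparing two values lexically does not give the urgency order. A consumer that wants "warning or above" needs an explicit ranking. The `rank` and `atLeast` helpers below are a hypothetical consumer-side sketch; the package itself does not document such helpers.

```go
package main

import "fmt"

// Severity and its constants are redeclared locally so the example is
// self-contained; the values match the documented constants.
type Severity string

const (
	SeverityInfo    Severity = "info"
	SeverityWarning Severity = "warning"
	SeverityError   Severity = "error"
)

// rank maps each severity to an ordinal; unknown values get -1.
func rank(s Severity) int {
	switch s {
	case SeverityInfo:
		return 0
	case SeverityWarning:
		return 1
	case SeverityError:
		return 2
	default:
		return -1
	}
}

// atLeast reports whether s is at or above floor. Unknown values on either
// side never pass, so a typo cannot silently widen the filter.
func atLeast(s, floor Severity) bool {
	rs, rf := rank(s), rank(floor)
	return rs >= 0 && rf >= 0 && rs >= rf
}

func main() {
	for _, s := range []Severity{SeverityInfo, SeverityWarning, SeverityError} {
		fmt.Println(s, atLeast(s, SeverityWarning))
	}
}
```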

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber wires the rules-engine-lite mapper onto a long-lived bus subscription. NewSubscriber + Run together implement the §13 primitive-with-consumer rule for Phase 72d: the Subscriber consumes the trigger events Phase 20 / 30 / 31 / 36a / 50 already emit and republishes a `notification.*` event for each match through the same bus.

Concurrent-reuse contract (D-025): the Subscriber is a long-lived component constructed once and shared across the bus's lifetime. It owns no per-run state — every call into the mapper is pure. Multiple Subscribers may be wired onto the same bus (rare in production — the default boot wires one — but the type does not block it).

Goroutine-leak contract (CLAUDE.md §11): Run blocks until ctx is cancelled OR the bus closes its delivery channel. On return, the subscription's Cancel() has been called and the bus's reaper has joined the underlying goroutine. The mandatory leak test (`subscriber_test.go::TestSubscriber_Run_GoroutineLeak`) asserts runtime.NumGoroutine() returns to baseline after Run returns.

func NewSubscriber

func NewSubscriber(bus events.EventBus, log *slog.Logger) *Subscriber

NewSubscriber constructs a Subscriber. The bus is mandatory (nil panics — the constructor fails loudly per CLAUDE.md §13 because a nil bus would silently degrade the entire subscriber to a no-op). The logger is mandatory; pass slog.Default() if no contextual logger is available.

func (*Subscriber) Run

func (s *Subscriber) Run(ctx context.Context) error

Run opens an Admin-scope subscription on the V1 trigger event types and republishes each synthesised notification.* event onto the same bus. Blocks until ctx is cancelled OR the bus closes the subscription's delivery channel.

The subscription is Admin-scope (Filter.Admin=true) because the Subscriber is a runtime-internal infrastructure consumer that must fan in across the full identity space — every tenant, user, session generates trigger events the notification topic should cover. The Admin scope use is audit-emitted by the bus on subscription open (events.EventTypeAdminScopeUsed); the Subscriber is therefore observable as a privileged consumer the same way every other Admin-scope subscriber is.

Identity-rejection fail-loudly path: if a delivered trigger event arrives with the D-033 `<missing>` identity sentinel in any component, Run emits a `notification.identity_rejected` event (SafePayload — no caller bytes) AND logs at Error, then continues. The malformed trigger does NOT silently produce a malformed notification.

Mapper-error fail-loudly path: if Map returns a non-nil error (always wrapped ErrUnmappable), Run logs at Error and emits a `runtime.error` event via the bus; no notification.* event is emitted for that trigger.

Publish errors are logged at Error and counted as observability failures; Run continues so a transient bus issue does not collapse the subscriber.
