Documentation
¶
Overview ¶
Package events owns Harbor's typed event bus surface — the single pub/sub channel every subsystem (telemetry, audit, governance, runtime, planner, tools) Publishes to and Subscribes from. There is no parallel observability channel; the unification of telemetry + chunked output on one bus is a load-bearing decision that closes the predecessor's split-channel sharp edge (brief 06 §1).
Phase 05 ships:
- The exhaustive EventType registry (V1 starter set + the IsValidEventType / EventTypes API; future phases add types by declaring an exported constant + an init() registration in this file).
- The sealed EventPayload interface; concrete payload types live in their owning subsystems and embed events.Sealed to satisfy the seal.
- The Event record, Filter, Subscription and EventBus interfaces.
- Sentinel errors callers compare via errors.Is.
- The §4.4 driver-registry seam (registry.go) so future drivers (replay-equipped Phase 06, durable-log Phase 57) plug in without changing callers.
- Ctx helpers (WithBus / MustFrom / From) mirroring the audit / identity ctx-helper pattern.
What is OUT of scope for Phase 05:
- Replay-from-cursor / ring-buffered driver — Phase 06.
- Durable event-log driver against StateStore — Phase 57.
- Cryptographic Admin scope verification — Phase 61 (Protocol auth).
- Protocol wire encoding / remote consumers — Phase 60.
- Metric label derivation — Phase 56.
Index ¶
- Constants
- Variables
- func IsValidEventType(t EventType) bool
- func MatchWire(ev Event, wire prototypes.EventFilter) bool
- func Register(name string, factory Factory)
- func RegisterEventType(t EventType)
- func RegisterForTest(t interface{ ... }, name string, factory Factory) string
- func RegisteredDrivers() []string
- func ValidateEvent(ev Event) error
- func WithBus(ctx context.Context, bus EventBus) context.Context
- type AdminScopeUsedPayload
- type Aggregator
- type AggregatorClock
- type AggregatorOption
- type AuditRedactionFailedPayload
- type BusDroppedPayload
- type Cursor
- type Event
- type EventBus
- type EventID
- type EventPayload
- type EventType
- type Factory
- type Filter
- type MCPRawHTMLTrustToggledPayload
- type RedactedMap
- type Replayer
- type RunCancelledPayload
- type RunOverridesSetPayload
- type RuntimeErrorPayload
- type SafePayload
- type SafeSealed
- type Sealed
- type Subscription
- type SubscriptionIdleClosedPayload
- type TopologyChangedPayload
- type WireConversion
Constants ¶
const DefaultDriver = "inmem"
DefaultDriver is the Phase 05 production driver name. Phase 06 replay-equipped drivers and Phase 57 durable-log drivers will register additional names; Open switches on cfg.Driver once EventsConfig.Driver is populated by Phase 02.
Variables ¶
var ( // ErrUnknownEventType — Publish was called with an EventType not // in the canonical registry. ErrUnknownEventType = errors.New("events: unknown EventType") // ErrIdentityScopeRequired — Subscribe filter elides the identity // triple AND Admin is false. ErrIdentityScopeRequired = errors.New("events: filter must specify (tenant, user, session) unless Admin") // ErrAdminScopeRequired — reserved; Phase 61 wiring will return // this when a caller claims Admin without a verified scope claim. // Phase 05 trusts the caller; the sentinel is exposed now so the // API surface is stable across the auth wiring. ErrAdminScopeRequired = errors.New("events: admin scope required for cross-session/cross-tenant subscription") // ErrSubscriberLimitReached — per-session subscriber cap hit. ErrSubscriberLimitReached = errors.New("events: per-session subscriber limit reached") // ErrBusClosed — Publish or Subscribe called after Close. ErrBusClosed = errors.New("events: bus is closed") // ErrSequenceProvided — caller pre-filled Event.Sequence; Publish // owns sequence numbering. ErrSequenceProvided = errors.New("events: caller pre-filled Sequence; bus owns sequencing") // ErrInvalidEvent — Event failed structural validation (empty // identity triple, missing Payload, etc.). ErrInvalidEvent = errors.New("events: invalid event") // ErrIdentityRequired — Publish event identity is missing the // triple. Wraps identity.ErrIdentityIncomplete in spirit; bus-side // rejection happens before any redaction or queueing. ErrIdentityRequired = errors.New("events: event identity missing one or more components") // ErrCursorTooOld — Replay was called with a Cursor whose Sequence // is older than the ring's oldest retained entry. Wraps a // "(oldest, requested)" detail in the formatted message so callers // that fall through to a durable log (Phase 57) can interpret the // gap. errors.Is(err, ErrCursorTooOld) is the comparison. ErrCursorTooOld = errors.New("events: cursor older than ring tail") // (EventsConfig.ReplayBufferSize=0) or the driver does not // implement Replayer at all. The type assertion // bus.(events.Replayer) succeeds even when the configured ring // size is zero — callers learn at call time, not at assertion // time, so the same call sites work whether replay is enabled or // not. ErrReplayUnavailable = errors.New("events: replay not available on this driver") )
Sentinel errors. Callers compare via errors.Is.
var ErrAggregateBadWindow = errors.New("events: aggregate Window/Bucket is invalid")
ErrAggregateBadWindow — the request's Window / Bucket pair was structurally invalid: zero or negative Window, zero or negative Bucket, or Bucket does not evenly divide Window. The aggregator fails loudly (CLAUDE.md §5) rather than silently rounding so a rendering client never sees a fractional trailing bucket.
var ErrAggregateIdentityRequired = errors.New("events: aggregate filter must specify (tenant, user, session) unless the caller's identity tuple was supplied")
ErrAggregateIdentityRequired — the request's filter elided the identity triple AND the caller's identity tuple was not supplied. Mirrors ErrIdentityRequired's shape but distinguished so callers can branch on the cause when they want to.
var ErrUnknownDriver = errors.New("events: unknown driver")
ErrUnknownDriver — the requested driver name is not in the registry.
Functions ¶
func IsValidEventType ¶
IsValidEventType reports whether t is in the canonical registry.
func MatchWire ¶
func MatchWire(ev Event, wire prototypes.EventFilter) bool
MatchWire reports whether ev satisfies the wire EventFilter (header fields only — payload bytes are explicitly out of scope per the EventFilter godoc + Brief 11 §CC-4). The matcher is identity + type + window — exactly the surface the aggregator and any post- filtering consumer iterate.
Identity semantics:
- Empty TenantIDs / UserIDs / SessionIDs / RunIDs means "any" on that axis (the aggregator/consumer has already gated on the scope claim; FilterFromWire returned RequiresAdminScope=true if the request needed it).
- A non-empty set means "the event's component must be in the set".
Time semantics:
- Since.IsZero() means "no lower bound." Otherwise ev.OccurredAt must be >= Since (inclusive).
- Until.IsZero() means "no upper bound." Otherwise ev.OccurredAt must be < Until (exclusive).
MatchWire is a pure function — no package-level state, safe for concurrent use (D-025).
func Register ¶
Register installs a driver factory under name. Drivers self-register from their package init(); cmd/harbor blank-imports the production driver to trigger registration. Per AGENTS.md §4.4.
Re-registering the same name panics — the registration model is write-once-at-init and a duplicate signals a build mis-configuration.
func RegisterEventType ¶
func RegisterEventType(t EventType)
RegisterEventType installs a new canonical EventType into the registry. Call from a subsystem-side init() so the type is in the registry before any Publish runs (Publish rejects unregistered types with ErrUnknownEventType).
Re-registering the same value is a no-op; registering an empty EventType panics — silent acceptance would defeat the exhaustive-enum invariant the registry exists to enforce.
func RegisterForTest ¶
func RegisterForTest(t interface {
Helper()
Name() string
Cleanup(func())
}, name string, factory Factory) string
RegisterForTest installs a driver factory under a per-test unique name and registers a t.Cleanup that removes it when the test ends. Use this in tests that need to register a sentinel driver at runtime — production code path is still Register from init().
The function returns the actual registered name (suffixed with the test's name and a counter so two `-count=N` iterations don't collide). The returned name is what callers should set on EventsConfig.Driver to route Open through the sentinel factory.
Without this helper, runtime Register calls leak entries into the process-wide map and panic on the second invocation under `go test -count=N`. The cleanup hook closes that gap.
func RegisteredDrivers ¶
func RegisteredDrivers() []string
RegisteredDrivers returns a sorted list of driver names. Useful for boot-log output and for surfacing in error messages.
func ValidateEvent ¶
ValidateEvent does structural validation: the EventType is in the registry; the identity quadruple has at least the triple; Sequence is zero (assigned by Publish); Payload is non-nil. Returns wrapped sentinels. Callers can call this directly to validate before Publish if they want compile-shaped check; Publish calls it internally.
Types ¶
type AdminScopeUsedPayload ¶
type AdminScopeUsedPayload struct {
SafeSealed
Tenant string
User string
Session string
SubscriberID uint64
}
AdminScopeUsedPayload is emitted whenever Subscribe is called with Filter.Admin=true, regardless of whether the triple is empty or partially specified. Surfaces admin-scope use for after-the-fact auditability — Phase 61 will additionally enforce a cryptographic scope claim, but the audit emit itself is Phase 05's contribution.
type Aggregator ¶
type Aggregator struct {
// contains filtered or unexported fields
}
Aggregator is the compiled artifact that produces time-bucketed event-type counts over a window. It is a reusable artifact in the D-025 sense: bus and clock are set once at construction and never mutated; Aggregate() holds no per-request state on the Aggregator (each request creates its own buckets slice). One Aggregator serves N concurrent requests safely (concurrent_test.go pins it under -race).
The aggregator consumes the bus's Replayer surface for the historical snapshot — runtime-side aggregation per brief 11 §CC-4 ("events are high-cardinality runtime-side; the runtime owns the index and exposes a search method"). When the bus does not implement Replayer (a forward-only driver), Aggregate returns ErrReplayUnavailable — fail loudly, never an empty series that looks like "no events."
func NewAggregator ¶
func NewAggregator(bus EventBus, opts ...AggregatorOption) (*Aggregator, error)
NewAggregator builds the aggregator over a bus. bus is mandatory — a nil fails loud rather than producing an aggregator that nil-panics on the first request. The returned *Aggregator is immutable after construction (D-025) and safe for concurrent use.
func (*Aggregator) Aggregate ¶
func (a *Aggregator) Aggregate(ctx context.Context, req prototypes.EventAggregateRequest) (prototypes.EventAggregateResponse, error)
Aggregate executes one aggregation request. The request's Window + Bucket define a contiguous time series of buckets; the aggregator snapshots the bus's ring (via Replayer) once, filters in Go using MatchWire over the wire filter, and counts each matching event into its bucket.
Window MUST be > 0 AND Bucket MUST be > 0 AND Window % Bucket == 0, else ErrAggregateBadWindow.
The buckets in the response are in chronological order (oldest first); empty buckets are present with an empty Counts map so the rendering client sees a contiguous time axis.
The aggregator respects ctx — ctx.Err() is checked before any expensive work and before each bucket fill. A long aggregate against a high-cardinality bus that is cancelled by the caller returns ctx.Err() promptly.
Aggregate does NOT enforce the cross-tenant scope claim — that is the wire transport's job (the transport calls FilterFromWire, reads RequiresAdminScope, gates on auth.HasScope). By the time Aggregate runs, the request is authorised.
type AggregatorClock ¶
AggregatorClock abstracts the runtime's notion of "now" so the aggregator can be tested with a fake clock. Production callers pass nil and the aggregator falls back to time.Now (UTC). The interface is intentionally narrow: only Now() — the aggregator never needs to tick. Mirrors the inmem-driver Clock seam (no second clock vocabulary added).
type AggregatorOption ¶
type AggregatorOption func(*Aggregator)
AggregatorOption configures NewAggregator at construction.
func WithAggregatorClock ¶
func WithAggregatorClock(c AggregatorClock) AggregatorOption
WithAggregatorClock injects an AggregatorClock for tests. Production callers do not use this; the default realAggregatorClock is correct.
type AuditRedactionFailedPayload ¶
type AuditRedactionFailedPayload struct {
SafeSealed
OriginalType EventType
Reason string
}
AuditRedactionFailedPayload reports a Publish call whose payload the audit redactor refused. Carries the failing event's type and identity but NO original payload bytes — the failure is observable without leaking the bytes the redactor refused.
type BusDroppedPayload ¶
type BusDroppedPayload struct {
SafeSealed
FromSeq uint64
ToSeq uint64
DroppedCount uint64
SubscriberID uint64
}
BusDroppedPayload reports a bounded burst of dropped events that were silently lost before the bus emitted this notification. The dropped sequence range is closed/closed: [FromSeq, ToSeq] inclusive.
The bus emits at most one BusDroppedPayload per DropWindow per subscriber; the range covers every event lost since the last emit.
type Cursor ¶
Cursor identifies the last event a subscriber has consumed for a session. Sequence is the per-bus monotonic value assigned by Publish; SessionID scopes the cursor so two subscribers on different sessions can use the same numeric Sequence without collision. The bus that issued the Sequence is the only bus the Cursor is meaningful against; cross-bus replay is not supported.
Cursor{Sequence: 0} has the special meaning "from the beginning" for Replayer.Replay — equivalent to "the caller has seen nothing yet" — and bypasses the ErrCursorTooOld check so a fresh client can read whatever the ring still retains without coordinating on its tail.
type Event ¶
type Event struct {
Type EventType
Identity identity.Quadruple
OccurredAt time.Time
Sequence uint64
Payload EventPayload
Extra map[string]string
}
Event is the canonical bus record.
Sequence is per-bus monotonic and gap-free; assigned by Publish. Callers MUST NOT pre-fill Sequence (Publish rejects with ErrSequenceProvided). OccurredAt defaults to time.Now() when zero.
Extra is reserved for Phase 56's bounded low-cardinality metric labels. Phase 05 does not derive metrics; the slot exists so later phases can populate it without changing the Event shape.
type EventBus ¶
type EventBus interface {
Publish(ctx context.Context, ev Event) error
Subscribe(ctx context.Context, f Filter) (Subscription, error)
Close(ctx context.Context) error
}
EventBus is the canonical pub/sub surface. Implementations MUST be safe for concurrent use by N goroutines against a single shared instance (D-025).
func MustFrom ¶
MustFrom returns the EventBus in ctx; panics with ErrBusClosed (used as the sentinel for "no bus configured") when none is present. Use in handler/runtime paths where a bus is mandatory.
func Open ¶
Open returns an EventBus built by the factory whose name matches cfg.Driver (defaults to DefaultDriver when cfg.Driver is empty). The audit.Redactor is mandatory: every Publish runs payloads through it before enqueueing.
func OpenDriver ¶
OpenDriver opens a specific driver by name; useful for tests that want to exercise the registry against a non-default driver.
type EventID ¶
type EventID string
EventID is a per-event idempotency key. ULID-shaped at construction time; the convention is to generate via `state.NewEventID` (or any caller-side ULID source) at the publish site. Used by `internal/distributed.BusEnvelope.EventID` to dedupe at-least-once deliveries on `(TaskID, Edge, EventID)`.
The type lives here (and not in `internal/state`) so distributed callers can reference it without importing state's persistence surface. The two namespaces are intentionally parallel: state's `EventID` is the persistence-layer idempotency key; events' `EventID` is the bus-layer correlation key. Both ULID-shaped, both caller-supplied; consumers MAY pass the same value across both layers when convenient.
type EventPayload ¶
type EventPayload interface {
// contains filtered or unexported methods
}
EventPayload is the sealed payload interface. Concrete payload types live alongside their owning subsystems and embed Sealed to satisfy the seal. The unexported method on Sealed is the seal.
type EventType ¶
type EventType string
EventType is a string-typed exhaustive enum. Each canonical type is declared as an exported constant plus registered in init() so the registry stays the single source of truth.
Adding a new type in a later phase: declare an exported constant, extend canonicalTypes, and update the master plan / glossary if the new type is reader-facing.
const ( // EventTypeRuntimeError — emitted by the Phase 04 logger on Error. EventTypeRuntimeError EventType = "runtime.error" // EventTypeRuntimeWarning — reserved for future runtime-warn emits. EventTypeRuntimeWarning EventType = "runtime.warning" // EventTypeBusDropped — emitted by the bus into a subscriber's // stream when its buffer overflowed; carries the dropped sequence // range. At most one per DropWindow per subscriber. EventTypeBusDropped EventType = "bus.dropped" // EventTypeBusSubscriptionIdleClosed — emitted by the reaper when // a subscription is cancelled for not draining within IdleTimeout. EventTypeBusSubscriptionIdleClosed EventType = "bus.subscription_idle_closed" // EventTypeAuditRedactionFailed — emitted when audit.Redactor.Redact // returns an error during Publish. Carries the failing event's // type + identity but NO payload bytes. EventTypeAuditRedactionFailed EventType = "audit.redaction_failed" // EventTypeAdminScopeUsed — emitted when a Subscribe call passes // Admin: true (cross-session/cross-tenant) so admin-scope use is // retroactively detectable. EventTypeAdminScopeUsed EventType = "audit.admin_scope_used" // EventTypeGovernanceBudgetExceeded — reserved for Phase 36b emit. EventTypeGovernanceBudgetExceeded EventType = "governance.budget_exceeded" // EventTypeGovernanceRateLimited — reserved for Phase 36b emit. EventTypeGovernanceRateLimited EventType = "governance.rate_limited" // EventTypeRuntimeRunCancelled — emitted by Engine.Cancel(runID) // when the cancellation was observed for an active run. Payload is // RunCancelledPayload (SafePayload). Phase 13. EventTypeRuntimeRunCancelled EventType = "runtime.run_cancelled" // EventTypeTopologyChanged — emitted by the engine on construction // and on every adjacency-set change, carrying the canonical // TopologyProjection (Phase 74 / D-114). Payload is // TopologyChangedPayload (SafePayload — the projection has no // secret-shaped fields, so the audit redactor bypasses it and // subscribers keep typed access). The paired request-side surface // is the `topology.snapshot` Protocol method. EventTypeTopologyChanged EventType = "topology.changed" // EventTypeMCPRawHTMLTrustToggled — emitted by the runtime (the // Phase 73k MCPSurface) when a Console admin flips the per-server // raw-HTML opt-in flag via `mcp.servers.set_raw_html_trust`. Payload // is MCPRawHTMLTrustToggledPayload (SafePayload by construction — // server name + boolean + actor identity quadruple; no upstream MCP // content). The default-deny posture for raw-HTML / SVG rendering // (brief 11 §8) makes this audit event the load-bearing record of // the operator's explicit opt-in. Phase 73k / D-119. EventTypeMCPRawHTMLTrustToggled EventType = "mcp.raw_html_trust_toggled" // EventTypeRunOverridesSet — emitted by the runtime (the Phase 73n // Runs surface) when the Console Playground page records a // next-message override via `runs.set_overrides`. Payload is // RunOverridesSetPayload (SafePayload by construction — the actor's // identity quadruple, the session id, and a bounded set of boolean // "which field was set" flags; the override VALUES themselves are // NOT in the payload, since a system-prompt override is // caller-supplied text that must not reach an audit subscriber // unredacted — CLAUDE.md §7). Phase 73n / D-130. EventTypeRunOverridesSet EventType = "runs.overrides_set" )
V1 starter set. Phase 03 / 04 / 36b will populate the matching emit paths; Phase 05 emits the bus-internal types itself.
func EventTypes ¶
func EventTypes() []EventType
EventTypes returns a deterministic snapshot of every registered type, lexicographically sorted.
type Factory ¶
Factory builds an EventBus from EventsConfig + an audit.Redactor. Drivers expose one Factory each via init() → Register.
type Filter ¶
type Filter struct {
Tenant string
User string
Session string
// Run, when non-empty, narrows the subscription to a single run
// inside the (tenant, user, session) scope. An empty Run means
// "every run in the session" (session-scoped subscription) — the
// Phase 60 default. The wire transport carries this via the
// optional `X-Harbor-Run` (stream/HeaderRun) carrier header.
// PR #91 / D-082 (Wave 10 audit WARN-5).
Run string
Types []EventType
Admin bool
}
Filter is the server-enforced subscription predicate. Subscribe rejects filters that elide the identity triple (Tenant + User + Session) unless Admin is set. When Admin is set with a partial triple, the bus emits an audit.admin_scope_used event before returning the subscription.
The Admin claim is trust-based in Phase 05. Cryptographic verification arrives with Protocol auth in Phase 61; until then the audit emit on every Admin-true Subscribe makes any abuse retroactively detectable.
func (Filter) HasFullTriple ¶
HasFullTriple reports whether the filter specifies all three identity components.
type MCPRawHTMLTrustToggledPayload ¶
type MCPRawHTMLTrustToggledPayload struct {
SafeSealed
// Actor is the identity quadruple of the admin who toggled the flag.
Actor identity.Quadruple
// ServerName is the MCP server the flag was toggled on.
ServerName string
// Trusted is the new raw-HTML trust value.
Trusted bool
// OccurredAt is the wall-clock instant the toggle was applied.
OccurredAt time.Time
}
MCPRawHTMLTrustToggledPayload is the SafePayload carried by an EventTypeMCPRawHTMLTrustToggled event (Phase 73k / D-119). The runtime publishes one on every successful `mcp.servers.set_raw_html_trust` Protocol call.
It is SafePayload by construction (D-028): the payload carries only the MCP server name, the new boolean trust value, the actor's identity quadruple, and the toggle instant — no upstream MCP content, no secret-shaped fields. The bus therefore skips the audit.Redactor for this payload and a subscriber keeps typed access.
type RedactedMap ¶
RedactedMap is the post-redaction payload form for payloads that did NOT implement SafePayload and whose Redact() output became a generic map[string]any (the audit redactor's normalised shape for reflective struct walks). Subscribers can extract redacted fields via Data lookup.
type Replayer ¶
type Replayer interface {
// Replay returns events whose Sequence > from.Sequence and that
// match f, in Sequence order (strictly increasing). The returned
// slice is owned by the caller. (nil, nil) is the "nothing newer
// to replay" case (cursor at or past the bus head). See
// ErrCursorTooOld and ErrReplayUnavailable for failure modes.
//
// The same filter rules as Subscribe apply: empty-triple
// non-admin filters are rejected with ErrIdentityScopeRequired,
// and Admin-scope replay emits an audit.admin_scope_used event
// before returning the snapshot so admin-scope use is observable.
Replay(ctx context.Context, from Cursor, f Filter) ([]Event, error)
}
Replayer is the optional capability interface drivers may implement to support replay-from-cursor. EventBus.Subscribe + Replayer.Replay together give the caller a "resume cleanly after disconnect" pattern:
- Open a fresh Subscribe with the desired filter — let the live stream begin queuing into the subscriber's buffer.
- Call Replay(lastSeenCursor, filter) — drain the historical snapshot strictly newer than the cursor.
- Live-tail the Subscribe channel; dedupe against the snapshot's last sequence so a Publish landing between Subscribe and Replay is not double-counted.
A driver that does not implement Replayer (or whose EventsConfig.ReplayBufferSize is 0) returns ErrReplayUnavailable. The type assertion bus.(events.Replayer) still succeeds in the configured-off case — the assertion is a compile-shaped contract; the runtime decision lives in the call.
type RunCancelledPayload ¶
type RunCancelledPayload struct {
SafeSealed
RunID string
CancelledAt int64
DroppedEnvelopeCount int64
}
RunCancelledPayload is emitted by Engine.Cancel(runID) when the cancellation was observed for an active run. Carries the RunID, the wall-clock CancelledAt timestamp (unix nanoseconds), and the number of envelopes the cancellation drained from the engine's channels (a coarse "how loaded was the run" metric).
SafePayload by construction — every field is internal bookkeeping (no caller-controlled bytes). Subscribers consume the typed shape directly without an audit-redactor walk. Phase 13.
type RunOverridesSetPayload ¶
type RunOverridesSetPayload struct {
SafeSealed
// Actor is the identity quadruple of the operator who recorded the
// override.
Actor identity.Quadruple
// SessionID is the session the override applies to.
SessionID string
// SetReasoningEffort reports whether the reasoning-effort field was
// set in this override.
SetReasoningEffort bool
// SetTemperature reports whether the temperature field was set.
SetTemperature bool
// SetMaxTokens reports whether the max-tokens field was set.
SetMaxTokens bool
// SetSystemPrompt reports whether the system-prompt-override field
// was set.
SetSystemPrompt bool
// OccurredAt is the wall-clock instant the override was recorded.
OccurredAt time.Time
}
RunOverridesSetPayload is the SafePayload carried by an EventTypeRunOverridesSet event (Phase 73n / D-130). The runtime publishes one on every successful `runs.set_overrides` Protocol call.
It is SafePayload by construction (D-028): the payload carries only the actor's identity quadruple, the target session id, the recording instant, and a bounded set of boolean "which field was set" flags. The override VALUES are deliberately NOT in the payload — a system-prompt override is caller-supplied free text, and a reasoning-effort / temperature / max-tokens value could leak intent; the bus must never carry caller-supplied bytes unredacted (CLAUDE.md §7). A subscriber that needs the values reads them from the runtime's pending-override slot under the same identity scope. The bus therefore skips the audit.Redactor for this payload and a subscriber keeps typed access.
type RuntimeErrorPayload ¶
RuntimeErrorPayload is the bus-side projection of a Logger.Error call. The telemetry/eventbus adapter constructs one of these from the redacted (msg, attrs) the Logger handed to its BusEmitter seam, so RuntimeErrorPayload arrives at the bus pre-redacted.
Even so, it is NOT marked SafePayload: a defensive contributor might later construct a RuntimeErrorPayload outside the Logger path (e.g. emitting a runtime error directly from a handler that bypassed the redactor). Running this payload through the bus redactor on every Publish is an extra walk per error event, but it preserves the audit-redactor-as-bus-boundary contract (D-020).
Fields is the slog.Attr key/value map after Logger redaction, in `map[string]any` shape so the audit redactor's reflective walk is deterministic.
type SafePayload ¶
type SafePayload interface {
EventPayload
// contains filtered or unexported methods
}
SafePayload marks payload types whose contents are known not to carry secrets. Bus implementations skip the audit.Redactor for these — typed access is preserved on the subscriber side.
Bus-internal payloads (BusDropped, IdleClosed, AuditRedactionFailed, AdminScopeUsed) are SafePayload by construction. External payloads MAY implement SafePayload when their declarer is confident the type carries no secret-shaped data; declarers in doubt should NOT implement SafePayload — the bus will run the value through the redactor and the subscriber-side payload becomes a RedactedMap.
type SafeSealed ¶
type SafeSealed struct{ Sealed }
SafeSealed is embedded in payload structs that are both EventPayload AND SafePayload. Composes Sealed.
type Sealed ¶
type Sealed struct{}
Sealed is embedded in concrete payload structs to satisfy EventPayload from any package. The interface stays sealed in spirit — to declare a payload, you have to import this package.
Standard Go pattern (mirrors net/netip.Addr's seal, encoding/gob stdlib types, etc.). Compile-time enforcement.
type Subscription ¶
type Subscription interface {
Events() <-chan Event
Cancel()
}
Subscription delivers events to one consumer.
Events() returns a receive-only channel. The channel is closed by the bus on Cancel or Close — consumers can use the close as the termination signal.
Cancel is idempotent and safe to call from any goroutine.
type SubscriptionIdleClosedPayload ¶
type SubscriptionIdleClosedPayload struct {
SafeSealed
SubscriberID uint64
IdleSeconds float64
}
SubscriptionIdleClosedPayload reports that the reaper cancelled a subscription that had not drained its channel within IdleTimeout.
type TopologyChangedPayload ¶
type TopologyChangedPayload struct {
SafeSealed
// Projection is the canonical engine topology at the moment the
// event was emitted.
Projection types.TopologyProjection
}
TopologyChangedPayload is the SafePayload carried by an EventTypeTopologyChanged event (Phase 74 / D-114). The engine publishes one on construction and on every adjacency-set change.
It is SafePayload by construction (D-028): a TopologyProjection carries only node names, edge endpoints, kind tags, and bounded integer queue depths — no secret-shaped fields. The bus therefore skips the audit.Redactor for this payload and a subscriber keeps typed access to Projection (no RedactedMap downgrade).
type WireConversion ¶
type WireConversion struct {
// Filter is the bus-facing predicate the caller passes to Subscribe.
// Triple components are backfilled from the caller's identity.
Filter Filter
// RequiresAdminScope is true when the wire EventFilter requested a
// cross-tenant fan-in (a TenantID set other than the caller's own,
// or len(TenantIDs) > 1). The wire edge gates on auth.HasScope
// before calling Subscribe; without the scope, the request is
// rejected with CodeIdentityScopeRequired (HTTP 403).
RequiresAdminScope bool
// Since / Until carry the optional time-window bounds from the wire
// filter. They are NOT enforced by Filter.Matches (the bus's
// per-event match is identity + type only); the aggregator + any
// post-filtering replay consult them directly.
Since time.Time
Until time.Time
}
WireConversion is the result of converting a wire EventFilter into a runtime Filter (see FilterFromWire). The wire shape (prototypes.EventFilter) is operator-facing and lives in internal/protocol/types; the runtime shape (events.Filter) is bus-facing and lives here. This conversion is the one place the two namespaces meet — the wire transport produces a WireConversion once per request.
Semantics of the conversion:
- Empty TenantIDs / UserIDs / SessionIDs / RunIDs on the wire are interpreted as "the caller's own component" — the caller's identity quadruple supplies the value. This preserves backward compatibility with the pre-72a wire shape (no filter struct → full triple-scoped subscription).
- A single non-caller TenantID OR len(TenantIDs) > 1 signals a cross-tenant request — the result has RequiresAdminScope == true so the wire edge can gate on auth.HasScope before calling Subscribe.
- EventTypes are converted 1:1.
- Since / Until are passed through unchanged.
The conversion NEVER silently drops a missing identity component: it returns the (best-effort) Filter alongside RequiresAdminScope; the caller is responsible for the actual rejection if RequiresAdminScope is true but the scope claim is absent. (CLAUDE.md §5 "fail loudly.")
WireConversion is a pure value type: zero package-level state, safe for concurrent use (D-025).
func FilterFromWire ¶
func FilterFromWire( wire prototypes.EventFilter, callerTenant, callerUser, callerSession string, ) WireConversion
FilterFromWire converts a wire EventFilter into a runtime Filter, resolving missing identity components from callerTenant / callerUser / callerSession (the (tenant, user, session) triple verified at the auth middleware edge). callerTenant must be non-empty — a missing caller triple is a rejection upstream (CodeIdentityRequired) and this helper is not the place to mask it.
The returned WireConversion is a value; the caller passes WireConversion.Filter to bus.Subscribe and reads RequiresAdminScope to decide whether to gate on auth.HasScope.
Directories
¶
| Path | Synopsis |
|---|---|
|
drivers
|
|
|
durable
Package durable is Harbor's StateStore-backed durable event log driver (Phase 57, RFC §6.13).
|
Package durable is Harbor's StateStore-backed durable event log driver (Phase 57, RFC §6.13). |
|
inmem
Package inmem is Harbor's V1 in-memory EventBus driver.
|
Package inmem is Harbor's V1 in-memory EventBus driver. |