Documentation
¶
Overview ¶
Package event provides a subject-routed publish/subscribe bus for Envelopes, plus an in-memory implementation suitable for single-process fan-out.
File layout:
bus.go — Bus / Subscription / SubOption interface contract
plus NoopBus. This is the stable surface; everything
else in the package implements or supports it.
memory.go — MemoryBus, the in-process Bus implementation.
envelope.go — Envelope value type and well-known headers.
subject.go — Subject / Pattern routing keys.
observer.go — Observer instrumentation hook + BackpressurePolicy.
Index ¶
- Constants
- Variables
- func ProducerKind(subj Subject) string
- type AckSubscription
- type BackpressurePolicy
- type Bus
- type DropReason
- type Envelope
- func (e Envelope) ActorID() stringdeprecated
- func (e Envelope) AgentID() string
- func (e Envelope) Decode(out any) error
- func (e Envelope) GraphID() string
- func (e Envelope) Header(key string) string
- func (e Envelope) KanbanScopeID() string
- func (e Envelope) NodeID() string
- func (e Envelope) RunID() string
- func (e *Envelope) SetActorID(id string)deprecated
- func (e *Envelope) SetAgentID(id string)
- func (e *Envelope) SetGraphID(id string)
- func (e *Envelope) SetHeader(key, value string)
- func (e *Envelope) SetKanbanScopeID(id string)
- func (e *Envelope) SetNodeID(id string)
- func (e *Envelope) SetRunID(id string)
- func (e *Envelope) SetTenant(id string)
- func (e Envelope) SpanContext() trace.SpanContext
- func (e Envelope) Tenant() string
- func (e Envelope) WithRemoteContext(ctx context.Context) context.Context
- type MemoryBus
- type MemoryBusOption
- type NoopBus
- type Observer
- type Pattern
- type SubOption
- type Subject
- type Subscription
- type SubscriptionID
Constants ¶
const ( HeaderRunID = "run_id" HeaderNodeID = "node_id" // HeaderAgentID identifies the agent (sdk/agent.Agent.ID) that // produced this envelope — i.e. the executor identity in // engine-neutral terms. The producer end is sdk/agent.Run, which // promotes Agent.ID into engine.Run.Attributes under // telemetry.AttrAgentID; engines forward that into the envelope // header via SetAgentID. Consumers that need to fan-in / filter // by "which agent did this" subscribe on this dimension. HeaderAgentID = "agent_id" // HeaderActorID is the legacy spelling of HeaderAgentID. It // exists for back-compat with envelopes produced by pre-v0.4 // SDK consumers; SetAgentID dual-writes both header keys until // v0.5.0 so existing observers keep working without a coordinated // flag day. // // Deprecated: use HeaderAgentID. This constant and the matching // SetActorID / ActorID accessors are removed in v0.5.0. HeaderActorID = "actor_id" HeaderGraphID = "graph_id" HeaderTenant = "tenant" // HeaderKanbanScopeID identifies the sdk/kanban Board scope // (Board.ScopeID) that produced an envelope. Distinct from // HeaderRunID — kanban events do not happen inside an engine run // — so consumers that want to fan-in by board need a dedicated // dimension. Stored as a typed header (mirroring HeaderRunID / // HeaderGraphID) rather than crammed into Envelope.Source so it // composes with the existing well-known-header convention. HeaderKanbanScopeID = "kanban_scope_id" )
Well-known header keys. Consumers may add arbitrary headers; these constants exist to prevent key drift across producers.
Variables ¶
var ErrBusClosed = errors.New("event: bus closed")
ErrBusClosed is returned by Publish / Subscribe after a Bus has been closed. Implementations must return this exact value (not a wrapped variant) so callers can compare with errors.Is.
var ErrInvalidPattern = errors.New("event: invalid pattern")
ErrInvalidPattern indicates a malformed Pattern literal.
var ErrInvalidSubject = errors.New("event: invalid subject")
ErrInvalidSubject indicates a malformed Subject literal.
Functions ¶
func ProducerKind ¶ added in v0.2.5
ProducerKind returns the leading segment of subj — by SDK convention this identifies the producer ("engine", "kanban", and future "pod" / "agent" / "history"). Returns "" when subj is empty or has no separator.
This is the canonical way to discriminate producer kind in a fan-out subscription that matches multiple producers (e.g. ">"). It avoids inventing a parallel naming system on Envelope.Source: the Subject already encodes the producer, and pattern matching (`engine.>` / `kanban.>`) already does the corresponding subscribe- time filter without any helper.
Types ¶
type AckSubscription ¶ added in v0.2.5
type AckSubscription interface {
Subscription
// Ack acknowledges that envelopeID has been successfully processed.
// See the AckSubscription doc comment for the per-call contract.
Ack(envelopeID string) error
}
AckSubscription is the optional capability a Subscription may expose when its parent Bus implements at-least-once delivery (typically cross-process buses backed by NATS / Redis Streams / Kafka). In-process implementations such as MemoryBus do not implement this interface; consumers should type-assert and treat the absence as "auto-ack".
Semantics (v1, intentionally minimal):
- Ack confirms successful processing of the envelope identified by envelopeID. Implementations MUST tolerate duplicate Ack calls and return nil on the second invocation.
- Ack on an unknown envelopeID returns nil (defensive — replays and restarts can legitimately surface envelopes outside the current subscription's tracked window).
- Negative-acknowledge / explicit redelivery is intentionally NOT part of v1. Implementations that need it should expose an additional, named interface (e.g. NackSubscription) so callers can opt in without breaking the minimal contract.
Bus implementations without persistent delivery (MemoryBus, NoopBus) MUST NOT implement this interface; their consumers therefore take the auto-ack branch automatically.
type BackpressurePolicy ¶ added in v0.1.12
type BackpressurePolicy int
BackpressurePolicy controls what happens when a subscription's buffer is already full at Publish time.
const ( // DropNewest drops the incoming envelope and keeps existing buffered // items. Default policy: prioritises older, presumably already-observed // events. DropNewest BackpressurePolicy = iota // DropOldest drops the oldest buffered envelope to make room for the // new one. Use when the latest state is more valuable than history. DropOldest // Block makes Publish wait until either the buffer has room or the // publishing context is cancelled. Use sparingly: a slow subscriber // will back-pressure every publisher. Block // Sample probabilistically forwards each envelope according to the // per-subscription rate set with WithSampleRate (default 1.0 = // pass-through). Envelopes that pass the sampling gate are then // enqueued non-blockingly; if the buffer is full at that point the // envelope is dropped with reason DropReasonBufferFull. Envelopes // rejected by the sampling gate itself are dropped with reason // DropReasonSampled. // // Use this for high-volume streams (token deltas, voice frames, // progress beacons) where exact delivery is unnecessary and the // alternatives have undesirable side effects: DropNewest / // DropOldest only kick in once the buffer fills (so a fast // subscriber sees every event and a slow one loses bursts at // random), while Block back-pressures the producer. Sample yields // a bounded, predictable delivery ratio independent of subscriber // speed. Sample )
func (BackpressurePolicy) String ¶ added in v0.1.12
func (p BackpressurePolicy) String() string
String returns a stable label for diagnostics.
type Bus ¶ added in v0.1.12
type Bus interface {
// Publish delivers env to every matching subscription. Implementations
// must:
// - fill env.ID and env.Time when zero;
// - return ErrBusClosed once Close has been invoked;
// - guarantee that, on return, the envelope has been enqueued for
// every matching subscription, or dropped according to that
// subscription's BackpressurePolicy.
//
// Cross-process implementations (added in later commits) may relax
// the on-return delivery guarantee to "fire-and-forget"; that
// relaxation must be documented at the implementation level.
Publish(ctx context.Context, env Envelope) error
// Subscribe creates a new subscription matching pattern. The returned
// Subscription is closed automatically when ctx is cancelled.
// pattern is validated and a malformed value returns an error.
Subscribe(ctx context.Context, pattern Pattern, opts ...SubOption) (Subscription, error)
// Close shuts down the bus. After Close returns, every subscription
// channel will be closed and subsequent Publish / Subscribe calls
// return ErrBusClosed. Close is idempotent.
Close() error
}
Bus is a publish-subscribe channel for Envelopes routed by Subject / Pattern.
type DropReason ¶ added in v0.1.12
type DropReason int
DropReason explains why an envelope was discarded for a particular subscription.
const ( // DropReasonBufferFull indicates the subscription buffer was full and // the active BackpressurePolicy chose to drop. DropReasonBufferFull DropReason = iota // DropReasonClosed indicates Publish raced with subscription close; // the envelope was not delivered. DropReasonClosed // DropReasonSampled indicates the subscription's Sample // backpressure policy probabilistically rejected this envelope. // Distinct from DropReasonBufferFull so dashboards can // differentiate "we voluntarily threw it away" from "the consumer // could not keep up". DropReasonSampled )
func (DropReason) String ¶ added in v0.1.12
func (r DropReason) String() string
String returns a stable label for diagnostics.
type Envelope ¶ added in v0.1.12
type Envelope struct {
ID string `json:"id"`
Subject Subject `json:"subject"`
Time time.Time `json:"time"`
Source string `json:"source,omitempty"`
TraceID string `json:"trace_id,omitempty"`
SpanID string `json:"span_id,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
Payload json.RawMessage `json:"payload,omitempty"`
}
Envelope is the cross-process-friendly carrier for a single event.
Every field serialises cleanly to JSON. Payload is stored as json.RawMessage so:
- in-memory and remote bus implementations behave identically (bytes in, bytes out — no any-typed surprises);
- server-side persistence and SSE forwarding can avoid an extra round of marshal/unmarshal;
- consumers decide when to decode (and into what concrete type).
Use NewEnvelope / MustEnvelope to construct envelopes; use Decode to extract the payload into a typed Go value.
func MustEnvelope ¶ added in v0.1.12
MustEnvelope is the panic variant of NewEnvelope, intended for static initialisation paths where a marshalling failure indicates a programming bug rather than a runtime condition.
func NewEnvelope ¶ added in v0.1.12
NewEnvelope constructs an Envelope with ID and Time populated.
Behaviour:
- if subject is empty, returns ErrInvalidSubject (Subject.Validate is called for early rejection of malformed routing keys);
- if payload is nil, Payload stays nil (no "null" bytes);
- if payload is already a json.RawMessage, it is reused verbatim — no re-encoding;
- otherwise, payload is JSON-encoded;
- if ctx carries an OTel span, TraceID / SpanID are filled from it so downstream subscribers can correlate envelopes back to the producing trace without separate plumbing.
func (Envelope) ActorID
deprecated
added in
v0.1.12
ActorID is the legacy spelling of Envelope.AgentID.
Deprecated: use Envelope.AgentID. Removed in v0.5.0.
func (Envelope) AgentID ¶ added in v0.3.4
AgentID returns the producer's agent identifier. It prefers HeaderAgentID (the post-v0.4 canonical key) and falls back to the legacy HeaderActorID so envelopes produced by older SDK versions still resolve correctly. After v0.5.0 only HeaderAgentID is read.
func (Envelope) Decode ¶ added in v0.1.12
Decode unmarshals Payload into out. Returns nil (no-op) when Payload is empty so that callers can ignore decode for header-only events.
func (Envelope) GraphID ¶ added in v0.1.12
GraphID returns the value of the well-known graph_id header.
func (Envelope) KanbanScopeID ¶ added in v0.2.5
KanbanScopeID returns the value of the well-known kanban_scope_id header.
func (Envelope) NodeID ¶ added in v0.1.12
NodeID returns the value of the well-known node_id header.
func (*Envelope) SetActorID
deprecated
added in
v0.1.12
SetActorID is the legacy spelling of SetAgentID.
Deprecated: use Envelope.SetAgentID. The "actor" terminology pre-dates the agent / step-actor distinction settled in v0.4 (envelope header "actor_id" is the producer agent identity; the per-step "actor" segment in engine.SubjectStep* subjects is a separate dimension, see sdk/engine/subjects.go). Removed in v0.5.0.
func (*Envelope) SetAgentID ¶ added in v0.3.4
SetAgentID stamps the agent identifier (sdk/agent.Agent.ID) of the producer onto the envelope.
The call is dual-write: it sets both HeaderAgentID (the canonical post-v0.4 key) AND HeaderActorID (the legacy spelling) so observers that have not yet migrated keep working unchanged. The legacy header is removed in v0.5.0; producers should call SetAgentID and stop relying on SetActorID.
func (*Envelope) SetGraphID ¶ added in v0.1.12
SetGraphID is a typed shorthand for SetHeader(HeaderGraphID, id).
func (*Envelope) SetHeader ¶ added in v0.1.12
SetHeader sets a header value, allocating Headers lazily.
func (*Envelope) SetKanbanScopeID ¶ added in v0.2.5
SetKanbanScopeID is a typed shorthand for SetHeader(HeaderKanbanScopeID, id).
func (*Envelope) SetNodeID ¶ added in v0.1.12
SetNodeID is a typed shorthand for SetHeader(HeaderNodeID, id).
func (*Envelope) SetRunID ¶ added in v0.1.12
SetRunID is a typed shorthand for SetHeader(HeaderRunID, id).
func (*Envelope) SetTenant ¶ added in v0.1.12
SetTenant is a typed shorthand for SetHeader(HeaderTenant, id).
func (Envelope) SpanContext ¶ added in v0.2.9
func (e Envelope) SpanContext() trace.SpanContext
SpanContext returns the OTel SpanContext encoded in this envelope's TraceID / SpanID fields, or an invalid SpanContext if the envelope was produced outside of a span (or the IDs are malformed).
The returned SpanContext has its Remote flag set so downstream code that uses trace.ContextWithRemoteSpanContext / [WithRemoteContext] produces the correct "follows-from-remote-process" semantics in the trace UI.
Use Envelope.WithRemoteContext when you want to attach the envelope's parent to a context for child span creation.
func (Envelope) WithRemoteContext ¶ added in v0.2.9
WithRemoteContext returns a child context whose parent span is the remote span carried by this envelope. Use it on the SUBSCRIBER side when starting a span that should appear as a child of the envelope-emitting span:
ctx := env.WithRemoteContext(ctx) ctx, span := tracer.Start(ctx, "consumer.handle") defer span.End()
If the envelope carries no usable parent (no TraceID, malformed IDs), the original context is returned unchanged so callers do not need a separate code path for "no parent" envelopes.
type MemoryBus ¶
type MemoryBus struct {
// contains filtered or unexported fields
}
MemoryBus is the in-process Bus implementation.
Concurrency model:
- subscribers map is guarded by mu;
- Publish takes RLock for the route-and-enqueue scan;
- Subscribe / Close / removeSub take the write lock;
- in-flight Publish calls are tracked by inflight (sync.WaitGroup) so Close can wait for them to drain before any subscriber channel is closed — this avoids the "send on closed channel" race that bit earlier per-subscription bus implementations.
Hot-path lock-freedom:
- Observer callbacks are deferred to a local slice and executed after the RLock has been released.
Route cache:
- routeCache memoises subject → []*memSub so a hot subject avoids the O(N) match loop. Guarded by routeCacheMu (a separate mutex from b.mu so cache writes don't compete with the publish RLock fan-out).
- Lock order when both are held: ALWAYS b.mu first, then routeCacheMu. Subscribe / Close / removeSub clear the cache while holding b.mu.Lock(); Publish reads/writes the cache while holding b.mu.RLock(). This ordering prevents a writer's clear from racing a publisher's lookup of stale entries.
func NewMemoryBus ¶
func NewMemoryBus(opts ...MemoryBusOption) *MemoryBus
NewMemoryBus constructs an in-process Bus.
func (*MemoryBus) Close ¶
Close performs a multi-phase shutdown:
- Take the write lock, mark closed, snapshot subscribers, release lock. New Publish / Subscribe calls now fail with ErrBusClosed.
- Wake every Block-backpressure publisher by closing each sub.done (signalClose). They will exit with a DropReasonClosed drop.
- Wait for every in-flight Publish to return — both publishers parked in Block sends and publishers in the middle of a non-blocking enqueue.
- Wait for every sub's per-sub senders wg to drain (defence in depth: any still-parked Block send must call senders.Done before we close the channel).
- Close every subscription channel.
Without step 2, step 3 (inflight.Wait) would deadlock against any Block publisher whose target channel is full and whose subscription has no remaining consumer.
func (*MemoryBus) Dropped ¶
Dropped returns the cumulative count of envelopes dropped due to DropNewest / DropOldest policies. Diagnostic only.
type MemoryBusOption ¶
type MemoryBusOption func(*MemoryBus)
MemoryBusOption configures a MemoryBus at construction time.
func WithObserver ¶ added in v0.1.12
func WithObserver(o Observer) MemoryBusOption
WithObserver attaches an Observer for lifecycle instrumentation.
Replaces the legacy WithDropCallback option. See Observer for the concurrency contract.
func WithRouteCacheSize ¶ added in v0.1.12
func WithRouteCacheSize(n int) MemoryBusOption
WithRouteCacheSize bounds the subject→subscribers route cache.
The route cache memoises the result of scanning every subscription for a given Subject so that hot subjects skip the per-publish O(N) match loop. Cache entries are invalidated whenever the subscription set changes (Subscribe / removeSub / Close).
Sizing:
- n > 0 : cap at n distinct subjects; on overflow the cache is wholesale cleared (poor-man's LRU — adequate when the working set is small or churns slowly, which matches every current SDK producer).
- n == 0 : disable the cache entirely; every Publish scans subscribers. Use when subjects are unique-per-publish (e.g. ID embedded in subject and never repeated).
- n < 0 : ignored; default applies.
Default is defaultRouteCacheSize.
type NoopBus ¶
type NoopBus struct{}
NoopBus is a Bus that discards every Publish and yields a closed channel for every Subscribe. Useful for tests and as a default value to keep nil-checks out of producer code.
func NewNoopBus ¶ added in v0.1.12
func NewNoopBus() NoopBus
NewNoopBus returns a NoopBus value (kept for symmetry with NewMemoryBus).
type Observer ¶ added in v0.1.12
type Observer interface {
// OnPublish fires once per Publish call, before any delivery decision.
OnPublish(env Envelope)
// OnDeliver fires after a successful enqueue into a subscription's
// channel.
OnDeliver(subID SubscriptionID, env Envelope)
// OnDrop fires when an envelope is discarded for a specific
// subscription, identifying the reason.
OnDrop(subID SubscriptionID, env Envelope, reason DropReason)
}
Observer is the bus-level instrumentation hook. It carries the OnPublish / OnDeliver / OnDrop lifecycle for every envelope, giving callers a single place to wire metrics / tracing / drop accounting.
Concurrency contract — every Bus implementation must guarantee:
- Observer methods are invoked outside any bus-internal lock.
- Implementations must return promptly. They MUST NOT call back into Bus methods (Publish / Subscribe / Close); doing so risks deadlock and the bus is allowed to detect such calls and panic in debug builds.
- Observer methods may be invoked concurrently from multiple goroutines; implementations must be safe for concurrent use.
Ordering:
- For a single Publish call, OnPublish is invoked exactly once, followed by zero or more OnDeliver / OnDrop calls (one per matching subscription).
- The relative order of OnDeliver / OnDrop calls within one Publish reflects the bus's internal subscription scan and is therefore implementation-defined. MemoryBus, for example, memoises the match result per Subject so a hot subject sees a stable callback order across publishes — observers MUST NOT rely on that order being either stable or random.
Observer is intentionally an interface (not a func) so a single hook can correlate the three lifecycle moments without ad-hoc closures.
type Pattern ¶ added in v0.1.12
type Pattern string
Pattern is a Subject matcher using NATS-style wildcards:
- matches exactly one segment > matches one or more trailing segments (must be the last segment)
Examples:
graph.run.r1.> every event for run r1 graph.run.*.node.*.start every node start across runs kanban.> every kanban event
Pattern matching is case-sensitive.
func (Pattern) Matches ¶ added in v0.1.12
Matches reports whether s satisfies pattern p.
Matching is segment-wise:
- literal segments must compare byte-for-byte equal;
- '*' matches any single segment;
- '>' matches one or more trailing segments (must be the last pattern segment; Validate enforces this).
An empty pattern matches nothing. An empty subject matches nothing. Matches does not validate p; callers that accept untrusted input should call p.Validate() first (Bus implementations are required to).
Matches splits both p and s on every call. Hot paths inside the package (MemoryBus.Publish) use matchSegs directly with pre-split inputs to avoid the per-call allocations.
func (Pattern) Validate ¶ added in v0.1.12
Validate reports whether p is a well-formed pattern literal.
A pattern must:
- be non-empty;
- not exceed subjectMaxBytes;
- have no leading, trailing or consecutive '.' separators;
- have each '*' / '>' segment occupy a whole segment;
- have at most one '>' segment, and only as the last segment.
type SubOption ¶ added in v0.1.12
type SubOption func(*subOptions)
SubOption configures a Subscription at creation time.
func WithBackpressure ¶ added in v0.1.12
func WithBackpressure(p BackpressurePolicy) SubOption
WithBackpressure overrides the default DropNewest policy.
func WithBufferSize ¶
WithBufferSize sets the buffered channel capacity. Values <= 0 fall back to the default (64).
func WithPredicate ¶ added in v0.1.12
WithPredicate adds a secondary filter applied after pattern matching. Useful when subject routing alone cannot express the desired filter (e.g. multi-tenant header checks).
func WithSampleRate ¶ added in v0.2.5
WithSampleRate configures the per-envelope keep probability when the subscription uses BackpressurePolicy Sample. Values are clamped to [0.0, 1.0]; values <= 0 reject every envelope (DropReasonSampled), values >= 1 pass every envelope through to the buffer-full check.
Has no effect on subscriptions whose policy is not Sample, so it is safe to set unconditionally on a shared option set.
type Subject ¶ added in v0.1.12
type Subject string
Subject is a dot-delimited routing key, e.g.:
graph.run.r1.start graph.run.r1.node.n1.complete kanban.board.b1.update
Segments are separated by '.'. A segment must not contain '.', '*' or '>' and must not be empty.
type Subscription ¶
type Subscription interface {
// ID is unique within the originating bus instance.
ID() SubscriptionID
// C returns the envelope channel. The channel is closed exactly once,
// either when Close is invoked or when the parent bus is closed.
C() <-chan Envelope
// Close cancels the subscription. Idempotent.
Close() error
}
Subscription is an active receive-side handle.
type SubscriptionID ¶ added in v0.1.12
type SubscriptionID string
SubscriptionID identifies a subscription within a single Bus instance. It is opaque to consumers; bus implementations choose the format.