Documentation
¶
Overview ¶
Package event provides a subject-routed publish/subscribe bus for Envelopes, plus an in-memory implementation suitable for single-process fan-out.
File layout:
bus.go — Bus / Subscription / SubOption interface contract
plus NoopBus. This is the stable surface; everything
else in the package implements or supports it.
memory.go — MemoryBus, the in-process Bus implementation.
envelope.go — Envelope value type and well-known headers.
subject.go — Subject / Pattern routing keys.
observer.go — Observer instrumentation hook + BackpressurePolicy.
Index ¶
- Constants
- Variables
- type BackpressurePolicy
- type Bus
- type DropReason
- type Envelope
- func (e Envelope) ActorID() string
- func (e Envelope) Decode(out any) error
- func (e Envelope) GraphID() string
- func (e Envelope) Header(key string) string
- func (e Envelope) NodeID() string
- func (e Envelope) RunID() string
- func (e *Envelope) SetActorID(id string)
- func (e *Envelope) SetGraphID(id string)
- func (e *Envelope) SetHeader(key, value string)
- func (e *Envelope) SetNodeID(id string)
- func (e *Envelope) SetRunID(id string)
- func (e *Envelope) SetTenant(id string)
- func (e Envelope) Tenant() string
- type MemoryBus
- type MemoryBusOption
- type NoopBus
- type Observer
- type Pattern
- type SubOption
- type Subject
- type Subscription
- type SubscriptionID
Constants ¶
const ( HeaderRunID = "run_id" HeaderNodeID = "node_id" HeaderActorID = "actor_id" HeaderGraphID = "graph_id" HeaderTenant = "tenant" )
Well-known header keys. Consumers may add arbitrary headers; these constants exist to prevent key drift across producers.
Variables ¶
var ErrBusClosed = errors.New("event: bus closed")
ErrBusClosed is returned by Publish / Subscribe after a Bus has been closed. Implementations must return this exact value (not a wrapped variant) so callers can compare with errors.Is.
var ErrInvalidPattern = errors.New("event: invalid pattern")
ErrInvalidPattern indicates a malformed Pattern literal.
var ErrInvalidSubject = errors.New("event: invalid subject")
ErrInvalidSubject indicates a malformed Subject literal.
Functions ¶
This section is empty.
Types ¶
type BackpressurePolicy ¶ added in v0.1.12
type BackpressurePolicy int
BackpressurePolicy controls what happens when a subscription's buffer is already full at Publish time.
const ( // DropNewest drops the incoming envelope and keeps existing buffered // items. Default policy: prioritises older, presumably already-observed // events. DropNewest BackpressurePolicy = iota // DropOldest drops the oldest buffered envelope to make room for the // new one. Use when the latest state is more valuable than history. DropOldest // Block makes Publish wait until either the buffer has room or the // publishing context is cancelled. Use sparingly: a slow subscriber // will back-pressure every publisher. Block )
func (BackpressurePolicy) String ¶ added in v0.1.12
func (p BackpressurePolicy) String() string
String returns a stable label for diagnostics.
type Bus ¶ added in v0.1.12
type Bus interface {
// Publish delivers env to every matching subscription. Implementations
// must:
// - fill env.ID and env.Time when zero;
// - return ErrBusClosed once Close has been invoked;
// - guarantee that, on return, the envelope has been enqueued for
// every matching subscription, or dropped according to that
// subscription's BackpressurePolicy.
//
// Cross-process implementations (added in later commits) may relax
// the on-return delivery guarantee to "fire-and-forget"; that
// relaxation must be documented at the implementation level.
Publish(ctx context.Context, env Envelope) error
// Subscribe creates a new subscription matching pattern. The returned
// Subscription is closed automatically when ctx is cancelled.
// pattern is validated and a malformed value returns an error.
Subscribe(ctx context.Context, pattern Pattern, opts ...SubOption) (Subscription, error)
// Close shuts down the bus. After Close returns, every subscription
// channel will be closed and subsequent Publish / Subscribe calls
// return ErrBusClosed. Close is idempotent.
Close() error
}
Bus is a publish-subscribe channel for Envelopes routed by Subject / Pattern.
type DropReason ¶ added in v0.1.12
type DropReason int
DropReason explains why an envelope was discarded for a particular subscription.
const ( // DropReasonBufferFull indicates the subscription buffer was full and // the active BackpressurePolicy chose to drop. DropReasonBufferFull DropReason = iota // DropReasonClosed indicates Publish raced with subscription close; // the envelope was not delivered. DropReasonClosed )
func (DropReason) String ¶ added in v0.1.12
func (r DropReason) String() string
String returns a stable label for diagnostics.
type Envelope ¶ added in v0.1.12
type Envelope struct {
ID string `json:"id"`
Subject Subject `json:"subject"`
Time time.Time `json:"time"`
Source string `json:"source,omitempty"`
TraceID string `json:"trace_id,omitempty"`
SpanID string `json:"span_id,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
Payload json.RawMessage `json:"payload,omitempty"`
}
Envelope is the cross-process-friendly carrier for a single event.
Every field serialises cleanly to JSON. Payload is stored as json.RawMessage so:
- in-memory and remote bus implementations behave identically (bytes in, bytes out — no any-typed surprises);
- server-side persistence and SSE forwarding can avoid an extra round of marshal/unmarshal;
- consumers decide when to decode (and into what concrete type).
Use NewEnvelope / MustEnvelope to construct envelopes; use Decode to extract the payload into a typed Go value.
func MustEnvelope ¶ added in v0.1.12
MustEnvelope is the panic variant of NewEnvelope, intended for static initialisation paths where a marshalling failure indicates a programming bug rather than a runtime condition.
func NewEnvelope ¶ added in v0.1.12
NewEnvelope constructs an Envelope with ID and Time populated.
Behaviour:
- if subject is empty, returns ErrInvalidSubject (Subject.Validate is called for early rejection of malformed routing keys);
- if payload is nil, Payload stays nil (no "null" bytes);
- if payload is already a json.RawMessage, it is reused verbatim — no re-encoding;
- otherwise, payload is JSON-encoded;
- if ctx carries an OTel span, TraceID / SpanID are filled from it so downstream subscribers can correlate envelopes back to the producing trace without separate plumbing.
func (Envelope) ActorID ¶ added in v0.1.12
ActorID returns the value of the well-known actor_id header.
func (Envelope) Decode ¶ added in v0.1.12
Decode unmarshals Payload into out. Returns nil (no-op) when Payload is empty so that callers can ignore decode for header-only events.
func (Envelope) GraphID ¶ added in v0.1.12
GraphID returns the value of the well-known graph_id header.
func (Envelope) NodeID ¶ added in v0.1.12
NodeID returns the value of the well-known node_id header.
func (*Envelope) SetActorID ¶ added in v0.1.12
SetActorID is a typed shorthand for SetHeader(HeaderActorID, id).
func (*Envelope) SetGraphID ¶ added in v0.1.12
SetGraphID is a typed shorthand for SetHeader(HeaderGraphID, id).
func (*Envelope) SetHeader ¶ added in v0.1.12
SetHeader sets a header value, allocating Headers lazily.
func (*Envelope) SetNodeID ¶ added in v0.1.12
SetNodeID is a typed shorthand for SetHeader(HeaderNodeID, id).
func (*Envelope) SetRunID ¶ added in v0.1.12
SetRunID is a typed shorthand for SetHeader(HeaderRunID, id).
type MemoryBus ¶
type MemoryBus struct {
// contains filtered or unexported fields
}
MemoryBus is the in-process Bus implementation.
Concurrency model:
- subscribers map is guarded by mu;
- Publish takes RLock for the route-and-enqueue scan;
- Subscribe / Close / removeSub take the write lock;
- in-flight Publish calls are tracked by inflight (sync.WaitGroup) so Close can wait for them to drain before any subscriber channel is closed — this avoids the "send on closed channel" race that bit earlier per-subscription bus implementations.
Hot-path lock-freedom:
- Observer callbacks are deferred to a local slice and executed after the RLock has been released.
Route cache:
- routeCache memoises subject → []*memSub so a hot subject avoids the O(N) match loop. Guarded by routeCacheMu (a separate mutex from b.mu so cache writes don't compete with the publish RLock fan-out).
- Lock order when both are held: ALWAYS b.mu first, then routeCacheMu. Subscribe / Close / removeSub clear the cache while holding b.mu.Lock(); Publish reads/writes the cache while holding b.mu.RLock(). This ordering prevents a writer's clear from racing a publisher's lookup of stale entries.
func NewMemoryBus ¶
func NewMemoryBus(opts ...MemoryBusOption) *MemoryBus
NewMemoryBus constructs an in-process Bus.
func (*MemoryBus) Close ¶
Close performs a multi-phase shutdown:
- Take the write lock, mark closed, snapshot subscribers, release lock. New Publish / Subscribe calls now fail with ErrBusClosed.
- Wake every Block-backpressure publisher by closing each sub.done (signalClose). They will exit with a DropReasonClosed drop.
- Wait for every in-flight Publish to return — both publishers parked in Block sends and publishers in the middle of a non-blocking enqueue.
- Wait for every sub's per-sub senders wg to drain (defence in depth: any still-parked Block send must call senders.Done before we close the channel).
- Close every subscription channel.
Without step 2, step 3 (inflight.Wait) would deadlock against any Block publisher whose target channel is full and whose subscription has no remaining consumer.
func (*MemoryBus) Dropped ¶
Dropped returns the cumulative count of envelopes dropped due to DropNewest / DropOldest policies. Diagnostic only.
type MemoryBusOption ¶
type MemoryBusOption func(*MemoryBus)
MemoryBusOption configures a MemoryBus at construction time.
func WithObserver ¶ added in v0.1.12
func WithObserver(o Observer) MemoryBusOption
WithObserver attaches an Observer for lifecycle instrumentation.
Replaces the legacy WithDropCallback option. See Observer for the concurrency contract.
func WithRouteCacheSize ¶ added in v0.1.12
func WithRouteCacheSize(n int) MemoryBusOption
WithRouteCacheSize bounds the subject→subscribers route cache.
The route cache memoises the result of scanning every subscription for a given Subject so that hot subjects skip the per-publish O(N) match loop. Cache entries are invalidated whenever the subscription set changes (Subscribe / removeSub / Close).
Sizing:
- n > 0 : cap at n distinct subjects; on overflow the cache is wholesale cleared (poor-man's LRU — adequate when the working set is small or churns slowly, which matches every current SDK producer).
- n == 0 : disable the cache entirely; every Publish scans subscribers. Use when subjects are unique-per-publish (e.g. ID embedded in subject and never repeated).
- n < 0 : ignored; default applies.
Default is defaultRouteCacheSize.
type NoopBus ¶
type NoopBus struct{}
NoopBus is a Bus that discards every Publish and yields a closed channel for every Subscribe. Useful for tests and as a default value to keep nil-checks out of producer code.
func NewNoopBus ¶ added in v0.1.12
func NewNoopBus() NoopBus
NewNoopBus returns a NoopBus value (kept for symmetry with NewMemoryBus).
type Observer ¶ added in v0.1.12
type Observer interface {
// OnPublish fires once per Publish call, before any delivery decision.
OnPublish(env Envelope)
// OnDeliver fires after a successful enqueue into a subscription's
// channel.
OnDeliver(subID SubscriptionID, env Envelope)
// OnDrop fires when an envelope is discarded for a specific
// subscription, identifying the reason.
OnDrop(subID SubscriptionID, env Envelope, reason DropReason)
}
Observer is the bus-level instrumentation hook. It carries the OnPublish / OnDeliver / OnDrop lifecycle for every envelope, giving callers a single place to wire metrics / tracing / drop accounting.
Concurrency contract — every Bus implementation must guarantee:
- Observer methods are invoked outside any bus-internal lock.
- Implementations must return promptly. They MUST NOT call back into Bus methods (Publish / Subscribe / Close); doing so risks deadlock and the bus is allowed to detect such calls and panic in debug builds.
- Observer methods may be invoked concurrently from multiple goroutines; implementations must be safe for concurrent use.
Ordering:
- For a single Publish call, OnPublish is invoked exactly once, followed by zero or more OnDeliver / OnDrop calls (one per matching subscription).
- The relative order of OnDeliver / OnDrop calls within one Publish reflects the bus's internal subscription scan and is therefore implementation-defined. MemoryBus, for example, memoises the match result per Subject so a hot subject sees a stable callback order across publishes — observers MUST NOT rely on that order being either stable or random.
Observer is intentionally an interface (not a func) so a single hook can correlate the three lifecycle moments without ad-hoc closures.
type Pattern ¶ added in v0.1.12
type Pattern string
Pattern is a Subject matcher using NATS-style wildcards:
- matches exactly one segment > matches one or more trailing segments (must be the last segment)
Examples:
graph.run.r1.> every event for run r1 graph.run.*.node.*.start every node start across runs kanban.> every kanban event
Pattern matching is case-sensitive.
func (Pattern) Matches ¶ added in v0.1.12
Matches reports whether s satisfies pattern p.
Matching is segment-wise:
- literal segments must compare byte-for-byte equal;
- '*' matches any single segment;
- '>' matches one or more trailing segments (must be the last pattern segment; Validate enforces this).
An empty pattern matches nothing. An empty subject matches nothing. Matches does not validate p; callers that accept untrusted input should call p.Validate() first (Bus implementations are required to).
Matches splits both p and s on every call. Hot paths inside the package (MemoryBus.Publish) use matchSegs directly with pre-split inputs to avoid the per-call allocations.
func (Pattern) Validate ¶ added in v0.1.12
Validate reports whether p is a well-formed pattern literal.
A pattern must:
- be non-empty;
- not exceed subjectMaxBytes;
- have no leading, trailing or consecutive '.' separators;
- have each '*' / '>' segment occupy a whole segment;
- have at most one '>' segment, and only as the last segment.
type SubOption ¶ added in v0.1.12
type SubOption func(*subOptions)
SubOption configures a Subscription at creation time.
func WithBackpressure ¶ added in v0.1.12
func WithBackpressure(p BackpressurePolicy) SubOption
WithBackpressure overrides the default DropNewest policy.
func WithBufferSize ¶
WithBufferSize sets the buffered channel capacity. Values <= 0 fall back to the default (64).
func WithPredicate ¶ added in v0.1.12
WithPredicate adds a secondary filter applied after pattern matching. Useful when subject routing alone cannot express the desired filter (e.g. multi-tenant header checks).
type Subject ¶ added in v0.1.12
type Subject string
Subject is a dot-delimited routing key, e.g.:
graph.run.r1.start graph.run.r1.node.n1.complete kanban.board.b1.update
Segments are separated by '.'. A segment must not contain '.', '*' or '>' and must not be empty.
type Subscription ¶
type Subscription interface {
// ID is unique within the originating bus instance.
ID() SubscriptionID
// C returns the envelope channel. The channel is closed exactly once,
// either when Close is invoked or when the parent bus is closed.
C() <-chan Envelope
// Close cancels the subscription. Idempotent.
Close() error
}
Subscription is an active receive-side handle.
type SubscriptionID ¶ added in v0.1.12
type SubscriptionID string
SubscriptionID identifies a subscription within a single Bus instance. It is opaque to consumers; bus implementations choose the format.