event

package
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package event provides a subject-routed publish/subscribe bus for Envelopes, plus an in-memory implementation suitable for single-process fan-out.

File layout:

bus.go         — Bus / Subscription / SubOption interface contract
                 plus NoopBus. This is the stable surface; everything
                 else in the package implements or supports it.
memory.go      — MemoryBus, the in-process Bus implementation.
envelope.go    — Envelope value type and well-known headers.
subject.go     — Subject / Pattern routing keys.
observer.go    — Observer instrumentation hook + BackpressurePolicy.

Index

Constants

View Source
const (
	HeaderRunID   = "run_id"
	HeaderNodeID  = "node_id"
	HeaderActorID = "actor_id"
	HeaderGraphID = "graph_id"
	HeaderTenant  = "tenant"
)

Well-known header keys. Consumers may add arbitrary headers; these constants exist to prevent key drift across producers.

Variables

View Source
var ErrBusClosed = errors.New("event: bus closed")

ErrBusClosed is returned by Publish / Subscribe after a Bus has been closed. Implementations must return this exact value (not a wrapped variant) so callers can compare with errors.Is.

View Source
var ErrInvalidPattern = errors.New("event: invalid pattern")

ErrInvalidPattern indicates a malformed Pattern literal.

View Source
var ErrInvalidSubject = errors.New("event: invalid subject")

ErrInvalidSubject indicates a malformed Subject literal.

Functions

This section is empty.

Types

type BackpressurePolicy added in v0.1.12

type BackpressurePolicy int

BackpressurePolicy controls what happens when a subscription's buffer is already full at Publish time.

const (
	// DropNewest drops the incoming envelope and keeps existing buffered
	// items. Default policy: prioritises older, presumably already-observed
	// events.
	DropNewest BackpressurePolicy = iota

	// DropOldest drops the oldest buffered envelope to make room for the
	// new one. Use when the latest state is more valuable than history.
	DropOldest

	// Block makes Publish wait until either the buffer has room or the
	// publishing context is cancelled. Use sparingly: a slow subscriber
	// will back-pressure every publisher.
	Block
)

func (BackpressurePolicy) String added in v0.1.12

func (p BackpressurePolicy) String() string

String returns a stable label for diagnostics.

type Bus added in v0.1.12

type Bus interface {
	// Publish delivers env to every matching subscription. Implementations
	// must:
	//   - fill env.ID and env.Time when zero;
	//   - return ErrBusClosed once Close has been invoked;
	//   - guarantee that, on return, the envelope has been enqueued for
	//     every matching subscription, or dropped according to that
	//     subscription's BackpressurePolicy.
	//
	// Cross-process implementations (added in later commits) may relax
	// the on-return delivery guarantee to "fire-and-forget"; that
	// relaxation must be documented at the implementation level.
	Publish(ctx context.Context, env Envelope) error

	// Subscribe creates a new subscription matching pattern. The returned
	// Subscription is closed automatically when ctx is cancelled.
	// pattern is validated and a malformed value returns an error.
	Subscribe(ctx context.Context, pattern Pattern, opts ...SubOption) (Subscription, error)

	// Close shuts down the bus. After Close returns, every subscription
	// channel will be closed and subsequent Publish / Subscribe calls
	// return ErrBusClosed. Close is idempotent.
	Close() error
}

Bus is a publish-subscribe channel for Envelopes routed by Subject / Pattern.

type DropReason added in v0.1.12

type DropReason int

DropReason explains why an envelope was discarded for a particular subscription.

const (
	// DropReasonBufferFull indicates the subscription buffer was full and
	// the active BackpressurePolicy chose to drop.
	DropReasonBufferFull DropReason = iota

	// DropReasonClosed indicates Publish raced with subscription close;
	// the envelope was not delivered.
	DropReasonClosed
)

func (DropReason) String added in v0.1.12

func (r DropReason) String() string

String returns a stable label for diagnostics.

type Envelope added in v0.1.12

type Envelope struct {
	ID      string            `json:"id"`
	Subject Subject           `json:"subject"`
	Time    time.Time         `json:"time"`
	Source  string            `json:"source,omitempty"`
	TraceID string            `json:"trace_id,omitempty"`
	SpanID  string            `json:"span_id,omitempty"`
	Headers map[string]string `json:"headers,omitempty"`
	Payload json.RawMessage   `json:"payload,omitempty"`
}

Envelope is the cross-process-friendly carrier for a single event.

Every field serialises cleanly to JSON. Payload is stored as json.RawMessage so:

  1. in-memory and remote bus implementations behave identically (bytes in, bytes out — no any-typed surprises);
  2. server-side persistence and SSE forwarding can avoid an extra round of marshal/unmarshal;
  3. consumers decide when to decode (and into what concrete type).

Use NewEnvelope / MustEnvelope to construct envelopes; use Decode to extract the payload into a typed Go value.

func MustEnvelope added in v0.1.12

func MustEnvelope(ctx context.Context, subject Subject, payload any) Envelope

MustEnvelope is the panic variant of NewEnvelope, intended for static initialisation paths where a marshalling failure indicates a programming bug rather than a runtime condition.

func NewEnvelope added in v0.1.12

func NewEnvelope(ctx context.Context, subject Subject, payload any) (Envelope, error)

NewEnvelope constructs an Envelope with ID and Time populated.

Behaviour:

  • if subject is empty, returns ErrInvalidSubject (Subject.Validate is called for early rejection of malformed routing keys);
  • if payload is nil, Payload stays nil (no "null" bytes);
  • if payload is already a json.RawMessage, it is reused verbatim — no re-encoding;
  • otherwise, payload is JSON-encoded;
  • if ctx carries an OTel span, TraceID / SpanID are filled from it so downstream subscribers can correlate envelopes back to the producing trace without separate plumbing.

func (Envelope) ActorID added in v0.1.12

func (e Envelope) ActorID() string

ActorID returns the value of the well-known actor_id header.

func (Envelope) Decode added in v0.1.12

func (e Envelope) Decode(out any) error

Decode unmarshals Payload into out. Returns nil (no-op) when Payload is empty so that callers can ignore decode for header-only events.

func (Envelope) GraphID added in v0.1.12

func (e Envelope) GraphID() string

GraphID returns the value of the well-known graph_id header.

func (Envelope) Header added in v0.1.12

func (e Envelope) Header(key string) string

Header returns the header value or "" when absent.

func (Envelope) NodeID added in v0.1.12

func (e Envelope) NodeID() string

NodeID returns the value of the well-known node_id header.

func (Envelope) RunID added in v0.1.12

func (e Envelope) RunID() string

RunID returns the value of the well-known run_id header.

func (*Envelope) SetActorID added in v0.1.12

func (e *Envelope) SetActorID(id string)

SetActorID is a typed shorthand for SetHeader(HeaderActorID, id).

func (*Envelope) SetGraphID added in v0.1.12

func (e *Envelope) SetGraphID(id string)

SetGraphID is a typed shorthand for SetHeader(HeaderGraphID, id).

func (*Envelope) SetHeader added in v0.1.12

func (e *Envelope) SetHeader(key, value string)

SetHeader sets a header value, allocating Headers lazily.

func (*Envelope) SetNodeID added in v0.1.12

func (e *Envelope) SetNodeID(id string)

SetNodeID is a typed shorthand for SetHeader(HeaderNodeID, id).

func (*Envelope) SetRunID added in v0.1.12

func (e *Envelope) SetRunID(id string)

SetRunID is a typed shorthand for SetHeader(HeaderRunID, id).

func (*Envelope) SetTenant added in v0.1.12

func (e *Envelope) SetTenant(id string)

SetTenant is a typed shorthand for SetHeader(HeaderTenant, id).

func (Envelope) Tenant added in v0.1.12

func (e Envelope) Tenant() string

Tenant returns the value of the well-known tenant header.

type MemoryBus

type MemoryBus struct {
	// contains filtered or unexported fields
}

MemoryBus is the in-process Bus implementation.

Concurrency model:

  • subscribers map is guarded by mu;
  • Publish takes RLock for the route-and-enqueue scan;
  • Subscribe / Close / removeSub take the write lock;
  • in-flight Publish calls are tracked by inflight (sync.WaitGroup) so Close can wait for them to drain before any subscriber channel is closed — this avoids the "send on closed channel" race that bit earlier per-subscription bus implementations.

Hot-path lock-freedom:

  • Observer callbacks are deferred to a local slice and executed after the RLock has been released.

Route cache:

  • routeCache memoises subject → []*memSub so a hot subject avoids the O(N) match loop. Guarded by routeCacheMu (a separate mutex from b.mu so cache writes don't compete with the publish RLock fan-out).
  • Lock order when both are held: ALWAYS b.mu first, then routeCacheMu. Subscribe / Close / removeSub clear the cache while holding b.mu.Lock(); Publish reads/writes the cache while holding b.mu.RLock(). This ordering prevents a writer's clear from racing a publisher's lookup of stale entries.

func NewMemoryBus

func NewMemoryBus(opts ...MemoryBusOption) *MemoryBus

NewMemoryBus constructs an in-process Bus.

func (*MemoryBus) Close

func (b *MemoryBus) Close() error

Close performs a multi-phase shutdown:

  1. Take the write lock, mark closed, snapshot subscribers, release lock. New Publish / Subscribe calls now fail with ErrBusClosed.
  2. Wake every Block-backpressure publisher by closing each sub.done (signalClose). They will exit with a DropReasonClosed drop.
  3. Wait for every in-flight Publish to return — both publishers parked in Block sends and publishers in the middle of a non-blocking enqueue.
  4. Wait for every sub's per-sub senders wg to drain (defence in depth: any still-parked Block send must call senders.Done before we close the channel).
  5. Close every subscription channel.

Without step 2, step 3 (inflight.Wait) would deadlock against any Block publisher whose target channel is full and whose subscription has no remaining consumer.

func (*MemoryBus) Dropped

func (b *MemoryBus) Dropped() int64

Dropped returns the cumulative count of envelopes dropped due to DropNewest / DropOldest policies. Diagnostic only.

func (*MemoryBus) Publish

func (b *MemoryBus) Publish(ctx context.Context, env Envelope) error

Publish routes env to every matching subscription according to each subscription's BackpressurePolicy.

func (*MemoryBus) Subscribe

func (b *MemoryBus) Subscribe(ctx context.Context, pattern Pattern, opts ...SubOption) (Subscription, error)

Subscribe creates a subscription. pattern is validated; ctx cancellation triggers Close.

type MemoryBusOption

type MemoryBusOption func(*MemoryBus)

MemoryBusOption configures a MemoryBus at construction time.

func WithObserver added in v0.1.12

func WithObserver(o Observer) MemoryBusOption

WithObserver attaches an Observer for lifecycle instrumentation.

Replaces the legacy WithDropCallback option. See Observer for the concurrency contract.

func WithRouteCacheSize added in v0.1.12

func WithRouteCacheSize(n int) MemoryBusOption

WithRouteCacheSize bounds the subject→subscribers route cache.

The route cache memoises the result of scanning every subscription for a given Subject so that hot subjects skip the per-publish O(N) match loop. Cache entries are invalidated whenever the subscription set changes (Subscribe / removeSub / Close).

Sizing:

  • n > 0 : cap at n distinct subjects; on overflow the cache is wholesale cleared (poor-man's LRU — adequate when the working set is small or churns slowly, which matches every current SDK producer).
  • n == 0 : disable the cache entirely; every Publish scans subscribers. Use when subjects are unique-per-publish (e.g. ID embedded in subject and never repeated).
  • n < 0 : ignored; default applies.

Default is defaultRouteCacheSize.

type NoopBus

type NoopBus struct{}

NoopBus is a Bus that discards every Publish and yields a closed channel for every Subscribe. Useful for tests and as a default value to keep nil-checks out of producer code.

func NewNoopBus added in v0.1.12

func NewNoopBus() NoopBus

NewNoopBus returns a NoopBus value (kept for symmetry with NewMemoryBus).

func (NoopBus) Close

func (NoopBus) Close() error

func (NoopBus) Publish

func (NoopBus) Publish(context.Context, Envelope) error

func (NoopBus) Subscribe

type Observer added in v0.1.12

type Observer interface {
	// OnPublish fires once per Publish call, before any delivery decision.
	OnPublish(env Envelope)
	// OnDeliver fires after a successful enqueue into a subscription's
	// channel.
	OnDeliver(subID SubscriptionID, env Envelope)
	// OnDrop fires when an envelope is discarded for a specific
	// subscription, identifying the reason.
	OnDrop(subID SubscriptionID, env Envelope, reason DropReason)
}

Observer is the bus-level instrumentation hook. It carries the OnPublish / OnDeliver / OnDrop lifecycle for every envelope, giving callers a single place to wire metrics / tracing / drop accounting.

Concurrency contract — every Bus implementation must guarantee:

  • Observer methods are invoked outside any bus-internal lock.
  • Implementations must return promptly. They MUST NOT call back into Bus methods (Publish / Subscribe / Close); doing so risks deadlock and the bus is allowed to detect such calls and panic in debug builds.
  • Observer methods may be invoked concurrently from multiple goroutines; implementations must be safe for concurrent use.

Ordering:

  • For a single Publish call, OnPublish is invoked exactly once, followed by zero or more OnDeliver / OnDrop calls (one per matching subscription).
  • The relative order of OnDeliver / OnDrop calls within one Publish reflects the bus's internal subscription scan and is therefore implementation-defined. MemoryBus, for example, memoises the match result per Subject so a hot subject sees a stable callback order across publishes — observers MUST NOT rely on that order being either stable or random.

Observer is intentionally an interface (not a func) so a single hook can correlate the three lifecycle moments without ad-hoc closures.

type Pattern added in v0.1.12

type Pattern string

Pattern is a Subject matcher using NATS-style wildcards:

  • matches exactly one segment > matches one or more trailing segments (must be the last segment)

Examples:

graph.run.r1.>             every event for run r1
graph.run.*.node.*.start   every node start across runs
kanban.>                   every kanban event

Pattern matching is case-sensitive.

func (Pattern) Matches added in v0.1.12

func (p Pattern) Matches(s Subject) bool

Matches reports whether s satisfies pattern p.

Matching is segment-wise:

  • literal segments must compare byte-for-byte equal;
  • '*' matches any single segment;
  • '>' matches one or more trailing segments (must be the last pattern segment; Validate enforces this).

An empty pattern matches nothing. An empty subject matches nothing. Matches does not validate p; callers that accept untrusted input should call p.Validate() first (Bus implementations are required to).

Matches splits both p and s on every call. Hot paths inside the package (MemoryBus.Publish) use matchSegs directly with pre-split inputs to avoid the per-call allocations.

func (Pattern) Validate added in v0.1.12

func (p Pattern) Validate() error

Validate reports whether p is a well-formed pattern literal.

A pattern must:

  • be non-empty;
  • not exceed subjectMaxBytes;
  • have no leading, trailing or consecutive '.' separators;
  • have each '*' / '>' segment occupy a whole segment;
  • have at most one '>' segment, and only as the last segment.

type SubOption added in v0.1.12

type SubOption func(*subOptions)

SubOption configures a Subscription at creation time.

func WithBackpressure added in v0.1.12

func WithBackpressure(p BackpressurePolicy) SubOption

WithBackpressure overrides the default DropNewest policy.

func WithBufferSize

func WithBufferSize(n int) SubOption

WithBufferSize sets the buffered channel capacity. Values <= 0 fall back to the default (64).

func WithPredicate added in v0.1.12

func WithPredicate(fn func(Envelope) bool) SubOption

WithPredicate adds a secondary filter applied after pattern matching. Useful when subject routing alone cannot express the desired filter (e.g. multi-tenant header checks).

type Subject added in v0.1.12

type Subject string

Subject is a dot-delimited routing key, e.g.:

graph.run.r1.start
graph.run.r1.node.n1.complete
kanban.board.b1.update

Segments are separated by '.'. A segment must not contain '.', '*' or '>' and must not be empty.

func (Subject) Validate added in v0.1.12

func (s Subject) Validate() error

Validate reports whether s is a well-formed subject literal.

A subject must:

  • be non-empty;
  • not exceed subjectMaxBytes;
  • have no leading, trailing or consecutive '.' separators;
  • have no segment containing '*' or '>' (those are pattern-only).

type Subscription

type Subscription interface {
	// ID is unique within the originating bus instance.
	ID() SubscriptionID
	// C returns the envelope channel. The channel is closed exactly once,
	// either when Close is invoked or when the parent bus is closed.
	C() <-chan Envelope
	// Close cancels the subscription. Idempotent.
	Close() error
}

Subscription is an active receive-side handle.

type SubscriptionID added in v0.1.12

type SubscriptionID string

SubscriptionID identifies a subscription within a single Bus instance. It is opaque to consumers; bus implementations choose the format.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL