event

package
v0.3.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package event provides a subject-routed publish/subscribe bus for Envelopes, plus an in-memory implementation suitable for single-process fan-out.

File layout:

bus.go         — Bus / Subscription / SubOption interface contract
                 plus NoopBus. This is the stable surface; everything
                 else in the package implements or supports it.
memory.go      — MemoryBus, the in-process Bus implementation.
envelope.go    — Envelope value type and well-known headers.
subject.go     — Subject / Pattern routing keys.
observer.go    — Observer instrumentation hook + BackpressurePolicy.

Index

Constants

View Source
const (
	HeaderRunID  = "run_id"
	HeaderNodeID = "node_id"

	// HeaderAgentID identifies the agent (sdk/agent.Agent.ID) that
	// produced this envelope — i.e. the executor identity in
	// engine-neutral terms. The producer end is sdk/agent.Run, which
	// promotes Agent.ID into engine.Run.Attributes under
	// telemetry.AttrAgentID; engines forward that into the envelope
	// header via SetAgentID. Consumers that need to fan-in / filter
	// by "which agent did this" subscribe on this dimension.
	HeaderAgentID = "agent_id"

	// HeaderActorID is the legacy spelling of HeaderAgentID. It
	// exists for back-compat with envelopes produced by pre-v0.4
	// SDK consumers; SetAgentID dual-writes both header keys until
	// v0.5.0 so existing observers keep working without a coordinated
	// flag day.
	//
	// Deprecated: use HeaderAgentID. This constant and the matching
	// SetActorID / ActorID accessors are removed in v0.5.0.
	HeaderActorID = "actor_id"

	HeaderGraphID = "graph_id"
	HeaderTenant  = "tenant"

	// HeaderKanbanScopeID identifies the sdk/kanban Board scope
	// (Board.ScopeID) that produced an envelope. Distinct from
	// HeaderRunID — kanban events do not happen inside an engine run
	// — so consumers that want to fan-in by board need a dedicated
	// dimension. Stored as a typed header (mirroring HeaderRunID /
	// HeaderGraphID) rather than crammed into Envelope.Source so it
	// composes with the existing well-known-header convention.
	HeaderKanbanScopeID = "kanban_scope_id"
)

Well-known header keys. Consumers may add arbitrary headers; these constants exist to prevent key drift across producers.

Variables

View Source
var ErrBusClosed = errors.New("event: bus closed")

ErrBusClosed is returned by Publish / Subscribe after a Bus has been closed. Implementations must return this exact value (not a wrapped variant) so callers can compare with errors.Is.

View Source
var ErrInvalidPattern = errors.New("event: invalid pattern")

ErrInvalidPattern indicates a malformed Pattern literal.

View Source
var ErrInvalidSubject = errors.New("event: invalid subject")

ErrInvalidSubject indicates a malformed Subject literal.

Functions

func ProducerKind added in v0.2.5

func ProducerKind(subj Subject) string

ProducerKind returns the leading segment of subj — by SDK convention this identifies the producer ("engine", "kanban", and future "pod" / "agent" / "history"). Returns "" when subj is empty or has no separator.

This is the canonical way to discriminate producer kind in a fan-out subscription that matches multiple producers (e.g. ">"). It avoids inventing a parallel naming system on Envelope.Source: the Subject already encodes the producer, and pattern matching (`engine.>` / `kanban.>`) already does the corresponding subscribe- time filter without any helper.

Types

type AckSubscription added in v0.2.5

type AckSubscription interface {
	Subscription

	// Ack acknowledges that envelopeID has been successfully processed.
	// See the AckSubscription doc comment for the per-call contract.
	Ack(envelopeID string) error
}

AckSubscription is the optional capability a Subscription may expose when its parent Bus implements at-least-once delivery (typically cross-process buses backed by NATS / Redis Streams / Kafka). In-process implementations such as MemoryBus do not implement this interface; consumers should type-assert and treat the absence as "auto-ack".

Semantics (v1, intentionally minimal):

  • Ack confirms successful processing of the envelope identified by envelopeID. Implementations MUST tolerate duplicate Ack calls and return nil on the second invocation.
  • Ack on an unknown envelopeID returns nil (defensive — replays and restarts can legitimately surface envelopes outside the current subscription's tracked window).
  • Negative-acknowledge / explicit redelivery is intentionally NOT part of v1. Implementations that need it should expose an additional, named interface (e.g. NackSubscription) so callers can opt in without breaking the minimal contract.

Bus implementations without persistent delivery (MemoryBus, NoopBus) MUST NOT implement this interface; their consumers therefore take the auto-ack branch automatically.

type BackpressurePolicy added in v0.1.12

type BackpressurePolicy int

BackpressurePolicy controls what happens when a subscription's buffer is already full at Publish time.

const (
	// DropNewest drops the incoming envelope and keeps existing buffered
	// items. Default policy: prioritises older, presumably already-observed
	// events.
	DropNewest BackpressurePolicy = iota

	// DropOldest drops the oldest buffered envelope to make room for the
	// new one. Use when the latest state is more valuable than history.
	DropOldest

	// Block makes Publish wait until either the buffer has room or the
	// publishing context is cancelled. Use sparingly: a slow subscriber
	// will back-pressure every publisher.
	Block

	// Sample probabilistically forwards each envelope according to the
	// per-subscription rate set with WithSampleRate (default 1.0 =
	// pass-through). Envelopes that pass the sampling gate are then
	// enqueued non-blockingly; if the buffer is full at that point the
	// envelope is dropped with reason DropReasonBufferFull. Envelopes
	// rejected by the sampling gate itself are dropped with reason
	// DropReasonSampled.
	//
	// Use this for high-volume streams (token deltas, voice frames,
	// progress beacons) where exact delivery is unnecessary and the
	// alternatives have undesirable side effects: DropNewest /
	// DropOldest only kick in once the buffer fills (so a fast
	// subscriber sees every event and a slow one loses bursts at
	// random), while Block back-pressures the producer. Sample yields
	// a bounded, predictable delivery ratio independent of subscriber
	// speed.
	Sample
)

func (BackpressurePolicy) String added in v0.1.12

func (p BackpressurePolicy) String() string

String returns a stable label for diagnostics.

type Bus added in v0.1.12

type Bus interface {
	// Publish delivers env to every matching subscription. Implementations
	// must:
	//   - fill env.ID and env.Time when zero;
	//   - return ErrBusClosed once Close has been invoked;
	//   - guarantee that, on return, the envelope has been enqueued for
	//     every matching subscription, or dropped according to that
	//     subscription's BackpressurePolicy.
	//
	// Cross-process implementations (added in later commits) may relax
	// the on-return delivery guarantee to "fire-and-forget"; that
	// relaxation must be documented at the implementation level.
	Publish(ctx context.Context, env Envelope) error

	// Subscribe creates a new subscription matching pattern. The returned
	// Subscription is closed automatically when ctx is cancelled.
	// pattern is validated and a malformed value returns an error.
	Subscribe(ctx context.Context, pattern Pattern, opts ...SubOption) (Subscription, error)

	// Close shuts down the bus. After Close returns, every subscription
	// channel will be closed and subsequent Publish / Subscribe calls
	// return ErrBusClosed. Close is idempotent.
	Close() error
}

Bus is a publish-subscribe channel for Envelopes routed by Subject / Pattern.

type DropReason added in v0.1.12

type DropReason int

DropReason explains why an envelope was discarded for a particular subscription.

const (
	// DropReasonBufferFull indicates the subscription buffer was full and
	// the active BackpressurePolicy chose to drop.
	DropReasonBufferFull DropReason = iota

	// DropReasonClosed indicates Publish raced with subscription close;
	// the envelope was not delivered.
	DropReasonClosed

	// DropReasonSampled indicates the subscription's Sample
	// backpressure policy probabilistically rejected this envelope.
	// Distinct from DropReasonBufferFull so dashboards can
	// differentiate "we voluntarily threw it away" from "the consumer
	// could not keep up".
	DropReasonSampled
)

func (DropReason) String added in v0.1.12

func (r DropReason) String() string

String returns a stable label for diagnostics.

type Envelope added in v0.1.12

type Envelope struct {
	ID      string            `json:"id"`
	Subject Subject           `json:"subject"`
	Time    time.Time         `json:"time"`
	Source  string            `json:"source,omitempty"`
	TraceID string            `json:"trace_id,omitempty"`
	SpanID  string            `json:"span_id,omitempty"`
	Headers map[string]string `json:"headers,omitempty"`
	Payload json.RawMessage   `json:"payload,omitempty"`
}

Envelope is the cross-process-friendly carrier for a single event.

Every field serialises cleanly to JSON. Payload is stored as json.RawMessage so:

  1. in-memory and remote bus implementations behave identically (bytes in, bytes out — no any-typed surprises);
  2. server-side persistence and SSE forwarding can avoid an extra round of marshal/unmarshal;
  3. consumers decide when to decode (and into what concrete type).

Use NewEnvelope / MustEnvelope to construct envelopes; use Decode to extract the payload into a typed Go value.

func MustEnvelope added in v0.1.12

func MustEnvelope(ctx context.Context, subject Subject, payload any) Envelope

MustEnvelope is the panic variant of NewEnvelope, intended for static initialisation paths where a marshalling failure indicates a programming bug rather than a runtime condition.

func NewEnvelope added in v0.1.12

func NewEnvelope(ctx context.Context, subject Subject, payload any) (Envelope, error)

NewEnvelope constructs an Envelope with ID and Time populated.

Behaviour:

  • if subject is empty, returns ErrInvalidSubject (Subject.Validate is called for early rejection of malformed routing keys);
  • if payload is nil, Payload stays nil (no "null" bytes);
  • if payload is already a json.RawMessage, it is reused verbatim — no re-encoding;
  • otherwise, payload is JSON-encoded;
  • if ctx carries an OTel span, TraceID / SpanID are filled from it so downstream subscribers can correlate envelopes back to the producing trace without separate plumbing.

func (Envelope) ActorID deprecated added in v0.1.12

func (e Envelope) ActorID() string

ActorID is the legacy spelling of Envelope.AgentID.

Deprecated: use Envelope.AgentID. Removed in v0.5.0.

func (Envelope) AgentID added in v0.3.4

func (e Envelope) AgentID() string

AgentID returns the producer's agent identifier. It prefers HeaderAgentID (the post-v0.4 canonical key) and falls back to the legacy HeaderActorID so envelopes produced by older SDK versions still resolve correctly. After v0.5.0 only HeaderAgentID is read.

func (Envelope) Decode added in v0.1.12

func (e Envelope) Decode(out any) error

Decode unmarshals Payload into out. Returns nil (no-op) when Payload is empty so that callers can ignore decode for header-only events.

func (Envelope) GraphID added in v0.1.12

func (e Envelope) GraphID() string

GraphID returns the value of the well-known graph_id header.

func (Envelope) Header added in v0.1.12

func (e Envelope) Header(key string) string

Header returns the header value or "" when absent.

func (Envelope) KanbanScopeID added in v0.2.5

func (e Envelope) KanbanScopeID() string

KanbanScopeID returns the value of the well-known kanban_scope_id header.

func (Envelope) NodeID added in v0.1.12

func (e Envelope) NodeID() string

NodeID returns the value of the well-known node_id header.

func (Envelope) RunID added in v0.1.12

func (e Envelope) RunID() string

RunID returns the value of the well-known run_id header.

func (*Envelope) SetActorID deprecated added in v0.1.12

func (e *Envelope) SetActorID(id string)

SetActorID is the legacy spelling of SetAgentID.

Deprecated: use Envelope.SetAgentID. The "actor" terminology pre-dates the agent / step-actor distinction settled in v0.4 (envelope header "actor_id" is the producer agent identity; the per-step "actor" segment in engine.SubjectStep* subjects is a separate dimension, see sdk/engine/subjects.go). Removed in v0.5.0.

func (*Envelope) SetAgentID added in v0.3.4

func (e *Envelope) SetAgentID(id string)

SetAgentID stamps the agent identifier (sdk/agent.Agent.ID) of the producer onto the envelope.

The call is dual-write: it sets both HeaderAgentID (the canonical post-v0.4 key) AND HeaderActorID (the legacy spelling) so observers that have not yet migrated keep working unchanged. The legacy header is removed in v0.5.0; producers should call SetAgentID and stop relying on SetActorID.

func (*Envelope) SetGraphID added in v0.1.12

func (e *Envelope) SetGraphID(id string)

SetGraphID is a typed shorthand for SetHeader(HeaderGraphID, id).

func (*Envelope) SetHeader added in v0.1.12

func (e *Envelope) SetHeader(key, value string)

SetHeader sets a header value, allocating Headers lazily.

func (*Envelope) SetKanbanScopeID added in v0.2.5

func (e *Envelope) SetKanbanScopeID(id string)

SetKanbanScopeID is a typed shorthand for SetHeader(HeaderKanbanScopeID, id).

func (*Envelope) SetNodeID added in v0.1.12

func (e *Envelope) SetNodeID(id string)

SetNodeID is a typed shorthand for SetHeader(HeaderNodeID, id).

func (*Envelope) SetRunID added in v0.1.12

func (e *Envelope) SetRunID(id string)

SetRunID is a typed shorthand for SetHeader(HeaderRunID, id).

func (*Envelope) SetTenant added in v0.1.12

func (e *Envelope) SetTenant(id string)

SetTenant is a typed shorthand for SetHeader(HeaderTenant, id).

func (Envelope) SpanContext added in v0.2.9

func (e Envelope) SpanContext() trace.SpanContext

SpanContext returns the OTel SpanContext encoded in this envelope's TraceID / SpanID fields, or an invalid SpanContext if the envelope was produced outside of a span (or the IDs are malformed).

The returned SpanContext has its Remote flag set so downstream code that uses trace.ContextWithRemoteSpanContext / [WithRemoteContext] produces the correct "follows-from-remote-process" semantics in the trace UI.

Use Envelope.WithRemoteContext when you want to attach the envelope's parent to a context for child span creation.

func (Envelope) Tenant added in v0.1.12

func (e Envelope) Tenant() string

Tenant returns the value of the well-known tenant header.

func (Envelope) WithRemoteContext added in v0.2.9

func (e Envelope) WithRemoteContext(ctx context.Context) context.Context

WithRemoteContext returns a child context whose parent span is the remote span carried by this envelope. Use it on the SUBSCRIBER side when starting a span that should appear as a child of the envelope-emitting span:

ctx := env.WithRemoteContext(ctx)
ctx, span := tracer.Start(ctx, "consumer.handle")
defer span.End()

If the envelope carries no usable parent (no TraceID, malformed IDs), the original context is returned unchanged so callers do not need a separate code path for "no parent" envelopes.

type MemoryBus

type MemoryBus struct {
	// contains filtered or unexported fields
}

MemoryBus is the in-process Bus implementation.

Concurrency model:

  • subscribers map is guarded by mu;
  • Publish takes RLock for the route-and-enqueue scan;
  • Subscribe / Close / removeSub take the write lock;
  • in-flight Publish calls are tracked by inflight (sync.WaitGroup) so Close can wait for them to drain before any subscriber channel is closed — this avoids the "send on closed channel" race that bit earlier per-subscription bus implementations.

Hot-path lock-freedom:

  • Observer callbacks are deferred to a local slice and executed after the RLock has been released.

Route cache:

  • routeCache memoises subject → []*memSub so a hot subject avoids the O(N) match loop. Guarded by routeCacheMu (a separate mutex from b.mu so cache writes don't compete with the publish RLock fan-out).
  • Lock order when both are held: ALWAYS b.mu first, then routeCacheMu. Subscribe / Close / removeSub clear the cache while holding b.mu.Lock(); Publish reads/writes the cache while holding b.mu.RLock(). This ordering prevents a writer's clear from racing a publisher's lookup of stale entries.

func NewMemoryBus

func NewMemoryBus(opts ...MemoryBusOption) *MemoryBus

NewMemoryBus constructs an in-process Bus.

func (*MemoryBus) Close

func (b *MemoryBus) Close() error

Close performs a multi-phase shutdown:

  1. Take the write lock, mark closed, snapshot subscribers, release lock. New Publish / Subscribe calls now fail with ErrBusClosed.
  2. Wake every Block-backpressure publisher by closing each sub.done (signalClose). They will exit with a DropReasonClosed drop.
  3. Wait for every in-flight Publish to return — both publishers parked in Block sends and publishers in the middle of a non-blocking enqueue.
  4. Wait for every sub's per-sub senders wg to drain (defence in depth: any still-parked Block send must call senders.Done before we close the channel).
  5. Close every subscription channel.

Without step 2, step 3 (inflight.Wait) would deadlock against any Block publisher whose target channel is full and whose subscription has no remaining consumer.

func (*MemoryBus) Dropped

func (b *MemoryBus) Dropped() int64

Dropped returns the cumulative count of envelopes dropped due to DropNewest / DropOldest policies. Diagnostic only.

func (*MemoryBus) Publish

func (b *MemoryBus) Publish(ctx context.Context, env Envelope) error

Publish routes env to every matching subscription according to each subscription's BackpressurePolicy.

func (*MemoryBus) Subscribe

func (b *MemoryBus) Subscribe(ctx context.Context, pattern Pattern, opts ...SubOption) (Subscription, error)

Subscribe creates a subscription. pattern is validated; ctx cancellation triggers Close.

type MemoryBusOption

type MemoryBusOption func(*MemoryBus)

MemoryBusOption configures a MemoryBus at construction time.

func WithObserver added in v0.1.12

func WithObserver(o Observer) MemoryBusOption

WithObserver attaches an Observer for lifecycle instrumentation.

Replaces the legacy WithDropCallback option. See Observer for the concurrency contract.

func WithRouteCacheSize added in v0.1.12

func WithRouteCacheSize(n int) MemoryBusOption

WithRouteCacheSize bounds the subject→subscribers route cache.

The route cache memoises the result of scanning every subscription for a given Subject so that hot subjects skip the per-publish O(N) match loop. Cache entries are invalidated whenever the subscription set changes (Subscribe / removeSub / Close).

Sizing:

  • n > 0 : cap at n distinct subjects; on overflow the cache is wholesale cleared (poor-man's LRU — adequate when the working set is small or churns slowly, which matches every current SDK producer).
  • n == 0 : disable the cache entirely; every Publish scans subscribers. Use when subjects are unique-per-publish (e.g. ID embedded in subject and never repeated).
  • n < 0 : ignored; default applies.

Default is defaultRouteCacheSize.

type NoopBus

type NoopBus struct{}

NoopBus is a Bus that discards every Publish and yields a closed channel for every Subscribe. Useful for tests and as a default value to keep nil-checks out of producer code.

func NewNoopBus added in v0.1.12

func NewNoopBus() NoopBus

NewNoopBus returns a NoopBus value (kept for symmetry with NewMemoryBus).

func (NoopBus) Close

func (NoopBus) Close() error

func (NoopBus) Publish

func (NoopBus) Publish(context.Context, Envelope) error

func (NoopBus) Subscribe

type Observer added in v0.1.12

type Observer interface {
	// OnPublish fires once per Publish call, before any delivery decision.
	OnPublish(env Envelope)
	// OnDeliver fires after a successful enqueue into a subscription's
	// channel.
	OnDeliver(subID SubscriptionID, env Envelope)
	// OnDrop fires when an envelope is discarded for a specific
	// subscription, identifying the reason.
	OnDrop(subID SubscriptionID, env Envelope, reason DropReason)
}

Observer is the bus-level instrumentation hook. It carries the OnPublish / OnDeliver / OnDrop lifecycle for every envelope, giving callers a single place to wire metrics / tracing / drop accounting.

Concurrency contract — every Bus implementation must guarantee:

  • Observer methods are invoked outside any bus-internal lock.
  • Implementations must return promptly. They MUST NOT call back into Bus methods (Publish / Subscribe / Close); doing so risks deadlock and the bus is allowed to detect such calls and panic in debug builds.
  • Observer methods may be invoked concurrently from multiple goroutines; implementations must be safe for concurrent use.

Ordering:

  • For a single Publish call, OnPublish is invoked exactly once, followed by zero or more OnDeliver / OnDrop calls (one per matching subscription).
  • The relative order of OnDeliver / OnDrop calls within one Publish reflects the bus's internal subscription scan and is therefore implementation-defined. MemoryBus, for example, memoises the match result per Subject so a hot subject sees a stable callback order across publishes — observers MUST NOT rely on that order being either stable or random.

Observer is intentionally an interface (not a func) so a single hook can correlate the three lifecycle moments without ad-hoc closures.

type Pattern added in v0.1.12

type Pattern string

Pattern is a Subject matcher using NATS-style wildcards:

  • matches exactly one segment > matches one or more trailing segments (must be the last segment)

Examples:

graph.run.r1.>             every event for run r1
graph.run.*.node.*.start   every node start across runs
kanban.>                   every kanban event

Pattern matching is case-sensitive.

func (Pattern) Matches added in v0.1.12

func (p Pattern) Matches(s Subject) bool

Matches reports whether s satisfies pattern p.

Matching is segment-wise:

  • literal segments must compare byte-for-byte equal;
  • '*' matches any single segment;
  • '>' matches one or more trailing segments (must be the last pattern segment; Validate enforces this).

An empty pattern matches nothing. An empty subject matches nothing. Matches does not validate p; callers that accept untrusted input should call p.Validate() first (Bus implementations are required to).

Matches splits both p and s on every call. Hot paths inside the package (MemoryBus.Publish) use matchSegs directly with pre-split inputs to avoid the per-call allocations.

func (Pattern) Validate added in v0.1.12

func (p Pattern) Validate() error

Validate reports whether p is a well-formed pattern literal.

A pattern must:

  • be non-empty;
  • not exceed subjectMaxBytes;
  • have no leading, trailing or consecutive '.' separators;
  • have each '*' / '>' segment occupy a whole segment;
  • have at most one '>' segment, and only as the last segment.

type SubOption added in v0.1.12

type SubOption func(*subOptions)

SubOption configures a Subscription at creation time.

func WithBackpressure added in v0.1.12

func WithBackpressure(p BackpressurePolicy) SubOption

WithBackpressure overrides the default DropNewest policy.

func WithBufferSize

func WithBufferSize(n int) SubOption

WithBufferSize sets the buffered channel capacity. Values <= 0 fall back to the default (64).

func WithPredicate added in v0.1.12

func WithPredicate(fn func(Envelope) bool) SubOption

WithPredicate adds a secondary filter applied after pattern matching. Useful when subject routing alone cannot express the desired filter (e.g. multi-tenant header checks).

func WithSampleRate added in v0.2.5

func WithSampleRate(rate float64) SubOption

WithSampleRate configures the per-envelope keep probability when the subscription uses BackpressurePolicy Sample. Values are clamped to [0.0, 1.0]; values <= 0 reject every envelope (DropReasonSampled), values >= 1 pass every envelope through to the buffer-full check.

Has no effect on subscriptions whose policy is not Sample, so it is safe to set unconditionally on a shared option set.

type Subject added in v0.1.12

type Subject string

Subject is a dot-delimited routing key, e.g.:

graph.run.r1.start
graph.run.r1.node.n1.complete
kanban.board.b1.update

Segments are separated by '.'. A segment must not contain '.', '*' or '>' and must not be empty.

func (Subject) Validate added in v0.1.12

func (s Subject) Validate() error

Validate reports whether s is a well-formed subject literal.

A subject must:

  • be non-empty;
  • not exceed subjectMaxBytes;
  • have no leading, trailing or consecutive '.' separators;
  • have no segment containing '*' or '>' (those are pattern-only).

type Subscription

type Subscription interface {
	// ID is unique within the originating bus instance.
	ID() SubscriptionID
	// C returns the envelope channel. The channel is closed exactly once,
	// either when Close is invoked or when the parent bus is closed.
	C() <-chan Envelope
	// Close cancels the subscription. Idempotent.
	Close() error
}

Subscription is an active receive-side handle.

type SubscriptionID added in v0.1.12

type SubscriptionID string

SubscriptionID identifies a subscription within a single Bus instance. It is opaque to consumers; bus implementations choose the format.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL