bus

package
v0.15.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 16, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package bus provides a small message-bus abstraction for FastConf.

Many infrastructure teams already standardise on a publish/subscribe system (NATS, Kafka, Redis pub/sub, MQTT, GCP Pub/Sub, ...). When a configuration change is broadcast over such a bus, every subscribing instance can refresh in milliseconds without polling. This package exposes a Broker interface that hides the chosen transport and a BusProvider that adapts any Broker into a contracts.Provider so it can participate in a Manager's normal merge pipeline.

The MemoryBroker reference implementation is goroutine-safe and intended for single-process use such as tests and examples; real transports (NATS, Kafka, ...) live in sibling sub-modules so users only pay for the dependencies they actually use.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func YAMLDecoder

func YAMLDecoder(b []byte) (map[string]any, error)

YAMLDecoder is the default Decoder.

Types

type Broker

type Broker interface {
	Publish(ctx context.Context, msg Message) error
	Subscribe(ctx context.Context, subject string) (<-chan Message, error)
	Close() error
}

Broker is a minimal pub/sub abstraction. Implementations MUST be safe for concurrent use. Subscribe returns a channel that is closed when ctx is cancelled or the broker is closed.

type BusProvider

type BusProvider struct {
	// contains filtered or unexported fields
}

BusProvider adapts a Broker into a contracts.SnapshotProvider.

On Watch, it subscribes to Subject and forwards every message as a contracts.Event whose Revision matches Message.Revision; the latest Message is cached so Load() / LoadSnapshot() return the most recent payload (or an empty map until the first message arrives).

func New

func New(name, subject string, prio int, broker Broker, dec Decoder) *BusProvider

New returns a BusProvider over broker for the given subject. Pass a nil dec to use the default YAMLDecoder.

func (*BusProvider) Load

func (p *BusProvider) Load(_ context.Context) (map[string]any, error)

Load returns the most recently observed payload (empty map until a message arrives).

func (*BusProvider) LoadSnapshot

func (p *BusProvider) LoadSnapshot(ctx context.Context) (contracts.Snapshot, error)

LoadSnapshot implements contracts.SnapshotProvider, exposing the revision and a Stale flag (Stale=true until the first message arrives).
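One way to picture the "empty map and Stale=true until the first message" behaviour is a small mutex-guarded cache. Snapshot here is a hypothetical stand-in for contracts.Snapshot, whose real fields are not shown on this page, and latest is an illustrative type, not the provider's actual internals.

```go
package main

import (
	"fmt"
	"sync"
)

// Snapshot is a hypothetical stand-in for contracts.Snapshot.
type Snapshot struct {
	Data     map[string]any
	Revision string
	Stale    bool
}

// latest caches the most recent decoded payload and its revision.
type latest struct {
	mu   sync.RWMutex
	data map[string]any
	rev  string
	seen bool // false until the first message is stored
}

// store records a newly decoded payload.
func (l *latest) store(data map[string]any, rev string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.data, l.rev, l.seen = data, rev, true
}

// snapshot returns a shallow copy so callers cannot mutate the cached map.
// Before any message arrives it yields an empty map with Stale set.
func (l *latest) snapshot() Snapshot {
	l.mu.RLock()
	defer l.mu.RUnlock()
	out := make(map[string]any, len(l.data))
	for k, v := range l.data {
		out[k] = v
	}
	return Snapshot{Data: out, Revision: l.rev, Stale: !l.seen}
}

func main() {
	var c latest
	s := c.snapshot()
	fmt.Println(len(s.Data), s.Stale)

	c.store(map[string]any{"port": 8080}, "r1")
	s = c.snapshot()
	fmt.Println(s.Data["port"], s.Revision, s.Stale)
}
```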

func (*BusProvider) Name

func (p *BusProvider) Name() string

Name implements contracts.Provider.

func (*BusProvider) Priority

func (p *BusProvider) Priority() int

Priority implements contracts.Provider.

func (*BusProvider) SetOnDecodeError

func (p *BusProvider) SetOnDecodeError(fn func(subject string, payload []byte, err error))

SetOnDecodeError installs a callback fired on each Decoder failure. Pass nil to detach. Safe to call before or after Watch.

func (*BusProvider) Watch

func (p *BusProvider) Watch(ctx context.Context) (<-chan contracts.Event, error)

Watch subscribes to the bus subject and forwards events.

type Decoder

type Decoder func([]byte) (map[string]any, error)

Decoder converts raw payload bytes into a generic map[string]any. The default decoder uses YAML, which is a superset of JSON, so JSON payloads decode as well.

type MemoryBroker

type MemoryBroker struct {
	// contains filtered or unexported fields
}

MemoryBroker is an in-process Broker reference. It keeps a fan-out list of subscriber channels per subject and never blocks the publisher: when a slow subscriber's buffer is full, the oldest queued message is dropped to make room. Back-pressure is the caller's responsibility.
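A non-blocking, drop-oldest send on a buffered channel can be sketched as follows. sendDropOldest is a hypothetical helper, not part of this package, and the sketch assumes a single sender per channel (for example, a publisher holding a lock); how MemoryBroker implements this internally is not shown on this page.

```go
package main

import "fmt"

// sendDropOldest enqueues v without blocking. If the buffer is full it
// discards the oldest queued value and retries. Assumes one sender at a time.
func sendDropOldest[T any](ch chan T, v T) {
	if cap(ch) == 0 {
		// Unbuffered channel: deliver only if a receiver is ready right now.
		select {
		case ch <- v:
		default:
		}
		return
	}
	for {
		select {
		case ch <- v:
			return
		default:
		}
		// Buffer full: make room by removing the oldest value, if still present.
		select {
		case <-ch:
		default:
		}
	}
}

func main() {
	ch := make(chan int, 2)
	for i := 1; i <= 3; i++ {
		sendDropOldest(ch, i)
	}
	close(ch)
	for v := range ch {
		fmt.Println(v)
	}
}
```

With a buffer of two, sending 1, 2, 3 leaves 2 and 3 queued: the subscriber sees the most recent state at the cost of losing older messages.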

func NewMemoryBroker

func NewMemoryBroker(bufSize int) *MemoryBroker

NewMemoryBroker returns a MemoryBroker whose per-subscriber buffer holds bufSize messages (default 16).

func (*MemoryBroker) Close

func (b *MemoryBroker) Close() error

Close shuts the broker down; subsequent Publish/Subscribe calls error.

func (*MemoryBroker) Publish

func (b *MemoryBroker) Publish(_ context.Context, msg Message) error

Publish delivers msg to every active subscriber of msg.Subject.

func (*MemoryBroker) Subscribe

func (b *MemoryBroker) Subscribe(ctx context.Context, subject string) (<-chan Message, error)

Subscribe returns a channel for new messages on subject; closed when ctx is cancelled or Close() is called.

type Message

type Message struct {
	// Subject is the topic / channel name the message was published on.
	Subject string
	// Payload carries the raw bytes (typically YAML or JSON).
	Payload []byte
	// Revision is an optional opaque version identifier; when non-empty
	// it is forwarded as Event.Revision and Snapshot.Revision so the
	// reload pipeline can short-circuit duplicate broadcasts.
	Revision string
	// At is the publish timestamp (set by the broker).
	At time.Time
}

Message is a single payload broadcast through the bus.
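The Revision comment mentions short-circuiting duplicate broadcasts. A consumer-side filter along those lines might look like the sketch below; revisionGate and accept are hypothetical names, and how FastConf's reload pipeline actually deduplicates is not described on this page.

```go
package main

import "fmt"

// revisionGate remembers the last revision it let through.
type revisionGate struct {
	last string
}

// accept reports whether a message carrying rev should be processed.
// An empty revision carries no identity, so it is always accepted.
func (g *revisionGate) accept(rev string) bool {
	if rev != "" && rev == g.last {
		return false // same revision as the previous message: skip the reload
	}
	g.last = rev
	return true
}

func main() {
	var g revisionGate
	for _, rev := range []string{"r1", "r1", "r2", "", ""} {
		fmt.Printf("%q accepted=%v\n", rev, g.accept(rev))
	}
}
```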
