Documentation
¶
Overview ¶
Package bus provides a small message-bus abstraction for FastConf.
Many infrastructure teams already standardise on a publish/subscribe system (NATS, Kafka, Redis pub/sub, MQTT, GCP Pub/Sub, ...). When a configuration change is broadcast over such a bus, every subscribing instance can refresh in milliseconds without polling. This package exposes a Broker interface that hides the chosen transport and a BusProvider that adapts any Broker into a contracts.Provider so it can participate in a Manager's normal merge pipeline.
The MemoryBroker reference implementation is goroutine-safe and production-suitable for single-process tests / examples; real transports (NATS, Kafka, ...) live in sibling sub-modules so users only pay for the dependency they actually use.
Index ¶
- func YAMLDecoder(b []byte) (map[string]any, error)
- type Broker
- type BusProvider
- func (p *BusProvider) Load(_ context.Context) (map[string]any, error)
- func (p *BusProvider) LoadSnapshot(ctx context.Context) (contracts.Snapshot, error)
- func (p *BusProvider) Name() string
- func (p *BusProvider) Priority() int
- func (p *BusProvider) SetOnDecodeError(fn func(subject string, payload []byte, err error))
- func (p *BusProvider) Watch(ctx context.Context) (<-chan contracts.Event, error)
- type Decoder
- type MemoryBroker
- type Message
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Broker ¶
type Broker interface {
Publish(ctx context.Context, msg Message) error
Subscribe(ctx context.Context, subject string) (<-chan Message, error)
Close() error
}
Broker is a minimal pub/sub abstraction. Implementations MUST be safe for concurrent use. Subscribe returns a channel that is closed when ctx is cancelled or the broker is closed.
type BusProvider ¶
type BusProvider struct {
// contains filtered or unexported fields
}
BusProvider adapts a Broker into a contracts.SnapshotProvider.
On Watch, it subscribes to Subject and forwards every message as a contracts.Event whose Revision matches Message.Revision; the latest Message is cached so Load() / LoadSnapshot() return the most recent payload (or an empty map until the first message arrives).
func New ¶
func New(name, subject string, prio int, broker Broker, dec Decoder) *BusProvider
New returns a BusProvider over broker for the given subject. Use nil decoder to default to YAML.
func (*BusProvider) Load ¶
Load returns the most recently observed payload (empty map until a message arrives).
func (*BusProvider) LoadSnapshot ¶
LoadSnapshot implements contracts.SnapshotProvider, exposing the revision and a Stale flag (Stale=true until the first message arrives).
func (*BusProvider) Priority ¶
func (p *BusProvider) Priority() int
Priority implements contracts.Provider.
func (*BusProvider) SetOnDecodeError ¶
func (p *BusProvider) SetOnDecodeError(fn func(subject string, payload []byte, err error))
SetOnDecodeError installs a callback fired on each Decoder failure. Pass nil to detach. Safe to call before or after Watch.
type Decoder ¶
Decoder converts raw payload bytes into a generic map[string]any. The default decoder uses YAML which is also a JSON superset.
type MemoryBroker ¶
type MemoryBroker struct {
// contains filtered or unexported fields
}
MemoryBroker is an in-process Broker reference. It keeps a fan-out list of subscriber channels per subject and never blocks the publisher (slow subscribers drop the oldest message — back-pressure is the caller's responsibility).
func NewMemoryBroker ¶
func NewMemoryBroker(bufSize int) *MemoryBroker
NewMemoryBroker returns a MemoryBroker whose per-subscriber buffer holds bufSize messages (default 16).
func (*MemoryBroker) Close ¶
func (b *MemoryBroker) Close() error
Close shuts the broker down; subsequent Publish/Subscribe calls error.
type Message ¶
type Message struct {
// Subject is the topic / channel name the message was published on.
Subject string
// Payload carries the raw bytes (typically YAML or JSON).
Payload []byte
// Revision is an optional opaque version identifier; when non-empty
// it is forwarded as Event.Revision and Snapshot.Revision so the
// reload pipeline can short-circuit duplicate broadcasts.
Revision string
// At is the publish timestamp (set by the broker).
At time.Time
}
Message is a single payload broadcast through the bus.