broker

package
v0.3.1 Latest
Published: Jun 16, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package broker wraps the github.com/centrifugal/centrifuge OSS library in the small surface Parsec actually needs: boot a Node, publish to a named channel, authorize subscribes against the Parsec channel grammar, and expose presence + history for the management surface.

Centrifuge primitives that leak through:

  • *centrifuge.Node — the running broker
  • centrifuge.Publication — the on-wire envelope (data + offset + epoch)

Everything else is fronted by Parsec types so the CLI and Twirp surfaces never type-assert on centrifuge internals.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker is the Parsec-flavored handle over a centrifuge Node. Construct with New, start with Run.

func New

func New(opts Options) (*Broker, error)

New constructs a broker. The Node is created but not started.

func (*Broker) Node

func (b *Broker) Node() *centrifuge.Node

Node returns the underlying *centrifuge.Node. The HTTP transport mount in internal/server needs it; library callers should not need to call it.

func (*Broker) Presence

func (b *Broker) Presence(ctx context.Context, ch channels.Name) (int, error)

Presence returns the active subscriber count on ch.

func (*Broker) Publish

func (b *Broker) Publish(ctx context.Context, ch channels.Name, data []byte, delta bool) (PublishResult, error)

Publish ships data to ch with history persistence. When delta is true, centrifuge fans out fossil-delta patches to subscribers that negotiated delta encoding; non-delta subscribers still receive the full payload.

func (*Broker) Run

func (b *Broker) Run(ctx context.Context) error

Run starts the broker and blocks until ctx is canceled.

func (*Broker) Started

func (b *Broker) Started() bool

Started reports whether Run has finished booting the centrifuge node. Useful for test harnesses that launch Run in a goroutine and need to gate the first Publish on broker readiness.
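The readiness gate described above can be sketched as a small polling helper. This is a minimal sketch, not Parsec's own test harness: fakeBroker, waitStarted, and startAndWait are hypothetical names, and the fake only imitates the documented behavior of Run (block until ctx is canceled) and Started. What the real Run returns on cancellation is not documented here, so the fake's return value is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// lifecycle is the part of the documented *Broker method set used here.
type lifecycle interface {
	Run(ctx context.Context) error
	Started() bool
}

// fakeBroker is a hypothetical stand-in so the sketch runs without Parsec.
type fakeBroker struct {
	bootDelay time.Duration
	started   atomic.Bool
}

// Run simulates booting, then blocks until ctx is canceled.
// Returning ctx.Err() is an assumption, not documented behavior.
func (f *fakeBroker) Run(ctx context.Context) error {
	select {
	case <-time.After(f.bootDelay):
		f.started.Store(true)
	case <-ctx.Done():
		return ctx.Err()
	}
	<-ctx.Done()
	return ctx.Err()
}

func (f *fakeBroker) Started() bool { return f.started.Load() }

// waitStarted polls Started until it reports true or ctx expires.
func waitStarted(ctx context.Context, b lifecycle, every time.Duration) error {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		if b.Started() {
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("broker not ready: %w", ctx.Err())
		case <-ticker.C:
		}
	}
}

// startAndWait launches Run in a goroutine and waits for readiness.
func startAndWait(bootDelay, timeout time.Duration) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops the Run goroutine when we return

	b := &fakeBroker{bootDelay: bootDelay}
	go func() { _ = b.Run(ctx) }()

	waitCtx, waitCancel := context.WithTimeout(ctx, timeout)
	defer waitCancel()
	return waitStarted(waitCtx, b, 2*time.Millisecond)
}

func main() {
	if err := startAndWait(20*time.Millisecond, time.Second); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("broker ready; safe to publish")
}
```

In a real test the lifecycle value would be the *Broker returned by New, and the first Publish would follow a successful waitStarted.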

func (*Broker) SubscribeAuthorizer added in v0.3.0

func (b *Broker) SubscribeAuthorizer() SubscribeAuthorizer

SubscribeAuthorizer returns the configured authorizer. Surface code uses this to drive integration tests of the revocation / token / rate-limit chain without spinning up a real client connection.

func (*Broker) UnsubscribeAll

func (b *Broker) UnsubscribeAll(ch channels.Name)

UnsubscribeAll kicks every connection subscribed to ch. Used by the manager-event bridge when a channel is deleted or expires.

type DeltaProvider

type DeltaProvider func(ch channels.Name) bool

DeltaProvider reports whether ch has fossil-delta encoding enabled. Used at subscribe time to negotiate the AllowedDeltaTypes set; the broker has no opinion of its own.
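One simple way to supply a DeltaProvider is a fixed allowlist of channels. The sketch below is illustrative only: Name is a hypothetical string stand-in for channels.Name (whose real definition is not shown here), deltaAllowlist is an invented helper, and the channel names are made up.

```go
package main

import "fmt"

// Name is a hypothetical stand-in for channels.Name.
type Name string

// DeltaProvider mirrors the documented signature, using the stand-in Name.
type DeltaProvider func(ch Name) bool

// deltaAllowlist returns a DeltaProvider that reports true only for the
// channels passed in.
func deltaAllowlist(enabled ...Name) DeltaProvider {
	set := make(map[Name]struct{}, len(enabled))
	for _, ch := range enabled {
		set[ch] = struct{}{}
	}
	return func(ch Name) bool {
		_, ok := set[ch]
		return ok
	}
}

func main() {
	provider := deltaAllowlist("public:ticks", "public:orderbook")
	for _, ch := range []Name{"public:ticks", "public:chat"} {
		fmt.Printf("%s delta=%v\n", ch, provider(ch))
	}
}
```

A provider built this way would be assigned to Options.DeltaProvider; a production version would more likely consult per-channel configuration than a hard-coded list.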

type Options

type Options struct {
	// HistorySize bounds how many publications are retained per channel for
	// late-subscriber replay. Default 100.
	HistorySize int
	// PublicHistoryTTL is how long public history is kept. Default 5m.
	PublicHistoryTTL time.Duration
	// PrivateHistoryTTL is the same for private channels. Default 5m.
	PrivateHistoryTTL time.Duration
	// LogHandler receives broker-level log events. Optional.
	LogHandler centrifuge.LogHandler
	// SubscribeAuthorizer is consulted on every subscribe attempt. The
	// default authorizer allows any well-formed public channel and rejects
	// all private channels (callers must provide their own authorizer for
	// private access).
	SubscribeAuthorizer SubscribeAuthorizer
	// DeltaProvider, when non-nil, lets the broker negotiate fossil-delta
	// encoding per channel at subscribe time. Returns true to enable
	// delta on that subscription.
	DeltaProvider DeltaProvider
	// RedisShards turns the broker into a Redis-backed broker shared with
	// any other Parsec node configured against the same shards. Empty
	// means in-memory single-node mode. Build each shard with
	// centrifuge.NewRedisShard.
	RedisShards []*centrifuge.RedisShard
	// RedisBrokerPrefix overrides the default centrifuge key prefix
	// ("parsec"). Only used when RedisShards is non-empty.
	RedisBrokerPrefix string
	// OnSubscriberChange, when non-nil, is invoked once per subscribe
	// and once per unsubscribe with the channel's parsed name and the
	// signed delta (+1 / -1). Used by the metrics layer to keep a live
	// subscriber gauge by visibility without leaking channel names.
	OnSubscriberChange func(ch channels.Name, delta int)
}

Options configures the broker at boot. Zero values give safe defaults suitable for a single-node, in-memory deployment.
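The zero-value behavior can be pictured as a defaulting pass over the fields that document a default. This is a sketch under assumptions: how New applies defaults internally is not shown in this documentation, the local options struct mirrors only three of the documented fields, and withDefaults is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// options mirrors only the documented fields that state a default.
type options struct {
	HistorySize       int
	PublicHistoryTTL  time.Duration
	PrivateHistoryTTL time.Duration
}

// withDefaults fills zero-valued fields with the documented defaults
// (100 publications, 5m TTL for both public and private history).
func withDefaults(o options) options {
	if o.HistorySize == 0 {
		o.HistorySize = 100
	}
	if o.PublicHistoryTTL == 0 {
		o.PublicHistoryTTL = 5 * time.Minute
	}
	if o.PrivateHistoryTTL == 0 {
		o.PrivateHistoryTTL = 5 * time.Minute
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", withDefaults(options{}))
	fmt.Printf("%+v\n", withDefaults(options{HistorySize: 10, PrivateHistoryTTL: time.Minute}))
}
```

Explicitly set fields pass through unchanged, so a caller only needs to name the settings that differ from the single-node, in-memory defaults.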

type PublishResult

type PublishResult struct {
	Offset uint64
	Epoch  string
}

PublishResult is the Parsec-shaped publish ack.
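A caller that keeps the last ack per channel can compare consecutive results to see where it stands in the stream. The sketch below assumes centrifuge's usual history-stream semantics: the offset grows by one per publication within an epoch, and a new epoch means the stream was reset. The continuity helper and its labels are hypothetical, and with several publishers on one channel a "gap" between your own acks is expected rather than an error.

```go
package main

import "fmt"

// PublishResult mirrors the documented struct.
type PublishResult struct {
	Offset uint64
	Epoch  string
}

// continuity classifies how next relates to prev on the same channel.
func continuity(prev, next PublishResult) string {
	switch {
	case prev.Epoch != next.Epoch:
		return "epoch-changed" // stream was reset; offsets are not comparable
	case next.Offset == prev.Offset+1:
		return "contiguous"
	case next.Offset > prev.Offset+1:
		return "gap" // other publications landed in between
	default:
		return "stale" // next is not newer than prev
	}
}

func main() {
	prev := PublishResult{Offset: 41, Epoch: "e1"}
	for _, next := range []PublishResult{
		{Offset: 42, Epoch: "e1"},
		{Offset: 45, Epoch: "e1"},
		{Offset: 1, Epoch: "e2"},
	} {
		fmt.Printf("%+v -> %s\n", next, continuity(prev, next))
	}
}
```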

type SubscribeAuthorizer

type SubscribeAuthorizer func(ctx context.Context, userID string, ch channels.Name, event centrifuge.SubscribeEvent) error

SubscribeAuthorizer decides whether a connection may subscribe to ch. Implementations typically check a JWT in event.Token or a per-channel ACL.
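A per-channel ACL authorizer might look like the following. This is a minimal sketch, not the package's default authorizer: Name and SubscribeEvent are local stand-ins for channels.Name and centrifuge.SubscribeEvent (only the Token field is mirrored), and aclAuthorizer, errDenied, and the channel and user names are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Name is a hypothetical stand-in for channels.Name.
type Name string

// SubscribeEvent is a stand-in that mirrors only the Token field of
// centrifuge.SubscribeEvent.
type SubscribeEvent struct {
	Token string
}

// SubscribeAuthorizer mirrors the documented signature with the stand-ins.
type SubscribeAuthorizer func(ctx context.Context, userID string, ch Name, event SubscribeEvent) error

var errDenied = errors.New("subscribe denied")

// aclAuthorizer allows a subscribe only when userID is listed for ch.
// Channels missing from the ACL are denied for everyone.
func aclAuthorizer(acl map[Name][]string) SubscribeAuthorizer {
	return func(_ context.Context, userID string, ch Name, _ SubscribeEvent) error {
		for _, allowed := range acl[ch] {
			if allowed == userID {
				return nil
			}
		}
		return fmt.Errorf("%w: user %q on channel %q", errDenied, userID, ch)
	}
}

func main() {
	authorize := aclAuthorizer(map[Name][]string{"private:ops": {"alice"}})
	ctx := context.Background()
	fmt.Println(authorize(ctx, "alice", "private:ops", SubscribeEvent{}))
	fmt.Println(authorize(ctx, "bob", "private:ops", SubscribeEvent{}))
}
```

Because an authorizer is a plain function, tests can call it directly with a synthetic event, without opening a client connection. A token-based variant would inspect event.Token instead of, or in addition to, the ACL.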
