Documentation ¶
Overview ¶
Package broker wraps the github.com/centrifugal/centrifuge OSS library in the small surface Parsec actually needs: boot a Node, publish to a named channel, authorize subscribes against the Parsec channel grammar, and expose presence + history for the management surface.
Centrifuge primitives that leak through:
- *centrifuge.Node — the running broker
- centrifuge.Publication — the on-wire envelope (data + offset + epoch)
Everything else is fronted by Parsec types so the CLI and Twirp surfaces never type-assert on centrifuge internals.
Index ¶
- type Broker
- func (b *Broker) Node() *centrifuge.Node
- func (b *Broker) Presence(ctx context.Context, ch channels.Name) (int, error)
- func (b *Broker) Publish(ctx context.Context, ch channels.Name, data []byte, delta bool) (PublishResult, error)
- func (b *Broker) Run(ctx context.Context) error
- func (b *Broker) Started() bool
- func (b *Broker) SubscribeAuthorizer() SubscribeAuthorizer
- func (b *Broker) UnsubscribeAll(ch channels.Name)
- type DeltaProvider
- type Options
- type PublishResult
- type SubscribeAuthorizer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker is the Parsec-flavored handle over a centrifuge Node. Construct with New, start with Run.
func (*Broker) Node ¶
func (b *Broker) Node() *centrifuge.Node
Node returns the underlying *centrifuge.Node. The HTTP transport mount in internal/server needs it; library callers should not need to call it.
func (*Broker) Publish ¶
func (b *Broker) Publish(ctx context.Context, ch channels.Name, data []byte, delta bool) (PublishResult, error)
Publish ships data to ch with history persistence. When delta is true, centrifuge fans out fossil-delta patches to subscribers that negotiated delta encoding; non-delta subscribers still receive the full payload.
func (*Broker) Started ¶
func (b *Broker) Started() bool
Started reports whether Run has finished booting the centrifuge node. Useful for test harnesses that launch Run in a goroutine and need to gate the first Publish on broker readiness.
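The readiness gate described for Started can be sketched roughly as follows. This is a minimal illustration, not Parsec code: fakeBroker is a hypothetical stand-in for *Broker so the example runs without the package, and waitStarted and runGate are helper names invented here. Only the Started() bool and Run(ctx) error shapes come from the index above.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// starter is the single documented method this sketch relies on.
type starter interface {
	Started() bool
}

// waitStarted (hypothetical helper) polls b.Started every interval until it
// reports true or ctx is done, whichever comes first.
func waitStarted(ctx context.Context, b starter, interval time.Duration) error {
	if b.Started() {
		return nil
	}
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
			if b.Started() {
				return nil
			}
		}
	}
}

// fakeBroker is a stand-in for *Broker (assumption: the real type is not
// imported here). It becomes ready after the boot delay.
type fakeBroker struct {
	boot    time.Duration
	started atomic.Bool
}

func (f *fakeBroker) Started() bool { return f.started.Load() }

// Run mimics the documented Run(ctx) shape: boot, flag readiness, then block
// until ctx is cancelled.
func (f *fakeBroker) Run(ctx context.Context) error {
	select {
	case <-time.After(f.boot):
		f.started.Store(true)
	case <-ctx.Done():
		return ctx.Err()
	}
	<-ctx.Done()
	return nil
}

// runGate launches Run in a goroutine and gates on readiness, as a test
// harness might before its first Publish.
func runGate(bootMs, timeoutMs int) error {
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	b := &fakeBroker{boot: time.Duration(bootMs) * time.Millisecond}
	go b.Run(ctx)
	return waitStarted(ctx, b, time.Millisecond)
}

func main() {
	fmt.Println("ready:", runGate(5, 2000) == nil)
	fmt.Println("timed out:", runGate(2000, 20) != nil)
}
```

Polling with a context deadline keeps the harness from hanging if the broker never boots; a real test would swap fakeBroker for the Broker under test.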
func (*Broker) SubscribeAuthorizer ¶ added in v0.3.0
func (b *Broker) SubscribeAuthorizer() SubscribeAuthorizer
SubscribeAuthorizer returns the configured authorizer. Surface code uses this to drive integration tests of the revocation / token / rate-limit chain without spinning up a real client connection.
func (*Broker) UnsubscribeAll ¶
func (b *Broker) UnsubscribeAll(ch channels.Name)
UnsubscribeAll kicks every connection subscribed to ch. Used by the manager-event bridge when a channel is deleted or expires.
type DeltaProvider ¶
DeltaProvider reports whether ch has fossil-delta encoding enabled. Used at subscribe time to negotiate the AllowedDeltaTypes set; the broker has no opinion of its own.
type Options ¶
type Options struct {
// HistorySize bounds how many publications are retained per channel for
// late-subscriber replay. Default 100.
HistorySize int
// PublicHistoryTTL is how long public history is kept. Default 5m.
PublicHistoryTTL time.Duration
// PrivateHistoryTTL is the same for private channels. Default 5m.
PrivateHistoryTTL time.Duration
// LogHandler receives broker-level log events. Optional.
LogHandler centrifuge.LogHandler
// SubscribeAuthorizer is consulted on every subscribe attempt. Default
// authorizer allows any well-formed public channel and rejects all
// private channels (callers must provide their own authorizer for
// private access).
SubscribeAuthorizer SubscribeAuthorizer
// DeltaProvider, when non-nil, lets the broker negotiate fossil-delta
// encoding per channel at subscribe time. Returns true to enable
// delta on that subscription.
DeltaProvider DeltaProvider
// RedisShards turn the broker into a Redis-backed broker shared with
// any other Parsec node configured against the same shards. Empty
// means in-memory single-node mode. Pre-built via centrifuge.NewRedisShard.
RedisShards []*centrifuge.RedisShard
// RedisBrokerPrefix overrides the default centrifuge key prefix
// ("parsec"). Only used when RedisShards is non-empty.
RedisBrokerPrefix string
// OnSubscriberChange, when non-nil, is invoked once per subscribe
// and once per unsubscribe with the channel's parsed name and the
// signed delta (+1 / -1). Used by the metrics layer to keep a live
// subscriber gauge by visibility without leaking channel names.
OnSubscriberChange func(ch channels.Name, delta int)
}
Options configures the broker at boot. Zero values give safe defaults suitable for a single-node, in-memory deployment.
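As an illustration of how the documented zero-value defaults might resolve, the sketch below mirrors three of the Options fields in a local struct. The withDefaults helper and the choice to treat any non-positive value as unset are assumptions made for this example; the package's actual defaulting logic is not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// options mirrors three fields of the documented Options struct so the
// sketch compiles on its own (the real type is not imported here).
type options struct {
	HistorySize       int
	PublicHistoryTTL  time.Duration
	PrivateHistoryTTL time.Duration
}

// withDefaults (hypothetical helper) fills unset fields with the defaults
// stated in the field comments: 100 publications and 5 minutes.
// Assumption: non-positive values count as unset.
func withDefaults(o options) options {
	if o.HistorySize <= 0 {
		o.HistorySize = 100
	}
	if o.PublicHistoryTTL <= 0 {
		o.PublicHistoryTTL = 5 * time.Minute
	}
	if o.PrivateHistoryTTL <= 0 {
		o.PrivateHistoryTTL = 5 * time.Minute
	}
	return o
}

func main() {
	// Zero value: every field takes its documented default.
	fmt.Printf("%+v\n", withDefaults(options{}))
	// Explicit values are preserved; only the unset fields are filled in.
	fmt.Printf("%+v\n", withDefaults(options{HistorySize: 10}))
}
```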
type PublishResult ¶
PublishResult is the Parsec-shaped publish ack.
type SubscribeAuthorizer ¶
type SubscribeAuthorizer func(ctx context.Context, userID string, ch channels.Name, event centrifuge.SubscribeEvent) error
SubscribeAuthorizer decides whether a connection may subscribe to ch. Implementations typically check a JWT in event.Token or a per-channel ACL.
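A per-channel ACL authorizer could look something like the sketch below. To stay self-contained it declares local stand-ins: Name for channels.Name and SubscribeEvent (with only a Token field) for centrifuge.SubscribeEvent. The aclAuthorizer and allowed helpers, the errDenied value, and the channel and user names are all hypothetical; only the function signature follows the documented type.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Stand-ins for the real types (assumptions, so the sketch builds alone).
type Name string                           // stands in for channels.Name
type SubscribeEvent struct{ Token string } // stands in for centrifuge.SubscribeEvent

// SubscribeAuthorizer mirrors the documented signature using the stand-ins.
type SubscribeAuthorizer func(ctx context.Context, userID string, ch Name, event SubscribeEvent) error

// errDenied is a hypothetical sentinel error for rejected subscriptions.
var errDenied = errors.New("subscribe denied")

// aclAuthorizer returns an authorizer that admits a user only when the ACL
// lists that user for the channel. It ignores event.Token; a JWT-based
// variant would verify the token instead.
func aclAuthorizer(acl map[Name]map[string]bool) SubscribeAuthorizer {
	return func(ctx context.Context, userID string, ch Name, _ SubscribeEvent) error {
		if err := ctx.Err(); err != nil {
			return err // caller gave up before the check ran
		}
		if acl[ch][userID] { // a missing channel or user reads as false
			return nil
		}
		return fmt.Errorf("%w: user %q on channel %q", errDenied, userID, ch)
	}
}

// allowed runs the authorizer against a small demo ACL.
func allowed(userID, ch string) bool {
	auth := aclAuthorizer(map[Name]map[string]bool{
		"private.orders": {"alice": true},
	})
	return auth(context.Background(), userID, Name(ch), SubscribeEvent{}) == nil
}

func main() {
	fmt.Println("alice:", allowed("alice", "private.orders"))
	fmt.Println("bob:", allowed("bob", "private.orders"))
}
```

Returning a wrapped sentinel lets callers distinguish a policy rejection from a cancelled context with errors.Is.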