channels

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 8, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package channels owns the channel naming contract and the channel manager (public / private namespaces, TTL bookkeeping, expiry).

The grammar is:

<visibility>:<app>.<domain>[.<id>][.<topic>]

where visibility is "public" or "private". Anything else is rejected by ParseName so the CLI and Twirp surfaces share one validator.
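The grammar above can be sketched as a small standalone parser. This is not the package's ParseName: the `name` type, the `parseName` function, and the plain-string errors are hypothetical stand-ins, segments are assigned purely by position (third is id, fourth is topic), and no character-set rules are enforced because the source does not state them.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// name is a local stand-in mirroring the documented Name fields.
type name struct {
	Visibility, App, Domain, ID, Topic string
}

// parseName is a simplified, hypothetical reading of
// <visibility>:<app>.<domain>[.<id>][.<topic>].
func parseName(s string) (name, error) {
	vis, rest, ok := strings.Cut(s, ":")
	if !ok || (vis != "public" && vis != "private") {
		return name{}, errors.New("visibility must be public or private")
	}
	segs := strings.Split(rest, ".")
	if len(segs) < 2 || len(segs) > 4 {
		return name{}, errors.New("expected 2 to 4 dot-separated segments")
	}
	for _, seg := range segs {
		if seg == "" {
			return name{}, errors.New("empty segment")
		}
	}
	n := name{Visibility: vis, App: segs[0], Domain: segs[1]}
	if len(segs) > 2 {
		n.ID = segs[2] // assumption: the third segment is always the id
	}
	if len(segs) > 3 {
		n.Topic = segs[3]
	}
	if vis == "private" && n.ID == "" {
		return name{}, errors.New("private channels require an id")
	}
	return n, nil
}

// String renders the wire form so that parseName(n.String()) yields n again.
func (n name) String() string {
	out := n.Visibility + ":" + n.App + "." + n.Domain
	if n.ID != "" {
		out += "." + n.ID
	}
	if n.Topic != "" {
		out += "." + n.Topic
	}
	return out
}

func main() {
	for _, s := range []string{"public:chat.rooms", "private:chat.rooms.42.typing", "secret:chat.rooms"} {
		n, err := parseName(s)
		if err != nil {
			fmt.Println("rejected:", s, "->", err)
			continue
		}
		fmt.Println("ok:", n)
	}
}
```

Keeping one validator behind every entry point is what makes the round trip between the parsed and wire forms dependable.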

Constants

const DefaultPublicTTL = 30 * time.Minute

DefaultPublicTTL is applied when the caller does not specify a TTL on channel creation for the public namespace.

const MaxPatternDepth = 8

MaxPatternDepth caps the segment count of a pattern. Patterns that try to address more than this many dot-separated segments are rejected at parse time so a pathological token like `**.**.**...` cannot become a denial-of-service vector. The cap mirrors the channel grammar's practical depth (app.domain.id.topic) plus headroom.

const MaxPrivateTTL = 1 * time.Hour

MaxPrivateTTL is the hard cap on private channel inactivity TTL. Anything larger is rejected at creation time.
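A minimal sketch of how the two TTL constants might be applied at creation time. The helper names `resolvePublicTTL` and `checkPrivateTTL` and the error value are hypothetical; the constants are redeclared locally so the example stands alone. How the package treats a zero or negative private TTL is not stated in the source, so the sketch only checks the upper bound.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local copies of the documented constants.
const (
	defaultPublicTTL = 30 * time.Minute
	maxPrivateTTL    = 1 * time.Hour
)

// errTTLTooLarge is a hypothetical error value for this sketch.
var errTTLTooLarge = errors.New("private ttl exceeds cap")

// resolvePublicTTL substitutes the default when the caller passes 0.
func resolvePublicTTL(ttl time.Duration) time.Duration {
	if ttl == 0 {
		return defaultPublicTTL
	}
	return ttl
}

// checkPrivateTTL rejects anything above the hard cap instead of clamping it.
func checkPrivateTTL(ttl time.Duration) error {
	if ttl > maxPrivateTTL {
		return errTTLTooLarge
	}
	return nil
}

func main() {
	fmt.Println(resolvePublicTTL(0))
	fmt.Println(checkPrivateTTL(2 * time.Hour))
}
```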

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel

type Channel struct {
	Name       Name
	State      State
	TTL        time.Duration
	CreatedAt  time.Time
	LastActive time.Time
	// DeltaEnabled toggles centrifuge fossil-delta encoding for this
	// channel. Off by default; enable for high-frequency channels with
	// small mutations (scoreboards, tickers, partial state snapshots).
	DeltaEnabled bool
}

Channel is the persisted view of a managed channel. Token material is never stored here — the auth package owns secrets; the manager owns channel lifecycle.

type Event

type Event struct {
	Kind EventKind
	Name Name
	At   time.Time
}

Event is the payload emitted on every lifecycle transition.

type EventBus

type EventBus interface {
	// Publish broadcasts ev to remote nodes. The caller will already have
	// fanned it out locally; the bus must NOT re-emit on the originating
	// node.
	Publish(ctx context.Context, ev Event) error

	// Run subscribes to the bus. onRemote fires for every remote event
	// (own publishes are filtered by NodeID). Blocks until ctx is done or
	// the underlying transport errors fatally.
	Run(ctx context.Context, onRemote func(Event)) error
}

EventBus is the cross-node bridge for channel lifecycle events. A publish on node A is delivered to every other node's local Manager so subscribers everywhere see the same lifecycle transitions.

Implementations are responsible for tagging publications with the originating NodeID and skipping local re-delivery — the Manager fans out local events itself; the bus carries only the cross-node fan-in.
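The NodeID tagging and echo suppression described above could look roughly like the following. The `envelope` type, its JSON field names, and the `encode`/`deliver` helpers are assumptions made for illustration; the source does not document the wire format used by any implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical wire format: the event plus the originating node.
type envelope struct {
	NodeID string `json:"node_id"`
	Kind   string `json:"kind"`
	Name   string `json:"name"`
}

// encode tags an event with the publishing node's ID.
func encode(nodeID, kind, name string) ([]byte, error) {
	return json.Marshal(envelope{NodeID: nodeID, Kind: kind, Name: name})
}

// deliver decodes payload and calls onRemote unless the payload originated
// on selfID. It reports whether onRemote was invoked.
func deliver(selfID string, payload []byte, onRemote func(kind, name string)) (bool, error) {
	var env envelope
	if err := json.Unmarshal(payload, &env); err != nil {
		return false, err
	}
	if env.NodeID == selfID {
		return false, nil // own publish: the local Manager already fanned it out
	}
	onRemote(env.Kind, env.Name)
	return true, nil
}

func main() {
	payload, err := encode("node-a", "opened", "public:chat.rooms")
	if err != nil {
		panic(err)
	}
	for _, self := range []string{"node-a", "node-b"} {
		delivered, err := deliver(self, payload, func(kind, name string) {
			fmt.Println(self, "received", kind, name)
		})
		fmt.Println(self, "delivered:", delivered, "err:", err)
	}
}
```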

type EventKind

type EventKind string

EventKind is the lifecycle transition a Channel just made.

const (
	// EventOpened fires on first open or re-open of a public channel.
	EventOpened EventKind = "opened"
	// EventClosed fires when a public channel transitions to StateClosed via
	// Sweep (TTL exceeded). Public-close semantics: no new subscribers, but
	// existing subscribers are not kicked.
	EventClosed EventKind = "closed"
	// EventDeleted fires when a channel is removed — either by an explicit
	// Delete call or because a private channel expired and was reaped from
	// the manager. Subscribers MUST be kicked.
	EventDeleted EventKind = "deleted"
)
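The expiry rules in these comments reduce to a small decision function. The sketch below uses local stand-in types and a hypothetical `expire` helper; treating "TTL exceeded" as idle time strictly greater than the TTL is an assumption, since the source does not say how the boundary is handled.

```go
package main

import (
	"fmt"
	"time"
)

type state string

const (
	stateOpen    state = "open"
	stateClosed  state = "closed"
	stateDeleted state = "deleted"
)

// expire returns the state a channel should move to after being idle for
// the given duration, plus the event kind to emit ("" means no transition).
func expire(private bool, cur state, idle, ttl time.Duration) (state, string) {
	if cur != stateOpen || idle <= ttl {
		return cur, "" // not open, or still within its TTL
	}
	if private {
		return stateDeleted, "deleted" // private channels are reaped; subscribers get kicked
	}
	return stateClosed, "closed" // public channels close but can be re-opened later
}

func main() {
	fmt.Println(expire(false, stateOpen, 45*time.Minute, 30*time.Minute))
	fmt.Println(expire(true, stateOpen, 2*time.Hour, time.Hour))
	fmt.Println(expire(false, stateOpen, time.Minute, 30*time.Minute))
}
```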

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager owns channel lifecycle, TTL enforcement, and the inactivity tick. State persistence is delegated to a Store; event fanout to subscribers stays in process. Safe for concurrent use.

func NewManager

func NewManager() *Manager

NewManager constructs a Manager backed by an in-memory store. Equivalent to NewManagerWithStore(NewMemoryStore()).

func NewManagerWithStore

func NewManagerWithStore(s Store) *Manager

NewManagerWithStore constructs a Manager backed by the supplied Store.

func (*Manager) CreatePrivate

func (m *Manager) CreatePrivate(name Name, ttl time.Duration) (*Channel, error)

CreatePrivate registers a new private channel. A ttl larger than MaxPrivateTTL is rejected.

func (*Manager) Delete

func (m *Manager) Delete(name Name) error

Delete removes the channel. Idempotent on already-deleted channels.

func (*Manager) EventBus

func (m *Manager) EventBus() EventBus

EventBus returns the configured cross-node bus, or nil.

func (*Manager) Get

func (m *Manager) Get(name Name) (*Channel, error)

Get returns a snapshot of the named channel.

func (*Manager) IsDelta

func (m *Manager) IsDelta(name Name) bool

IsDelta reports whether the channel has delta encoding enabled.

func (*Manager) IsOpen

func (m *Manager) IsOpen(name Name) bool

IsOpen reports whether the channel exists and is in StateOpen.

func (*Manager) List

func (m *Manager) List() []Channel

List returns a snapshot of all managed channels.

func (*Manager) OpenPublic

func (m *Manager) OpenPublic(name Name, ttl time.Duration) (*Channel, error)

OpenPublic creates or re-opens a public channel. Pass a ttl of 0 to use DefaultPublicTTL.

func (*Manager) RunEventBus

func (m *Manager) RunEventBus(ctx context.Context) error

RunEventBus subscribes to the cross-node bus and re-emits remote events to local subscribers. Returns when ctx is canceled or the bus exits. No-op when no bus is configured.

func (*Manager) RunSweeper

func (m *Manager) RunSweeper(ctx context.Context, every time.Duration)

RunSweeper drives Sweep on every tick until ctx is done.
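A ticker-driven loop of this kind is commonly written as below. This is a generic sketch, not the package's implementation: `runSweeper` takes the sweep pass as a callback, and `countSweeps` is a hypothetical helper that only exists to demonstrate cancellation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runSweeper calls sweep on every tick and returns once ctx is done.
func runSweeper(ctx context.Context, every time.Duration, sweep func(now time.Time)) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			sweep(now)
		}
	}
}

// countSweeps runs the loop until n passes have happened, then cancels.
// The guard on count ignores any tick that races with the cancellation.
func countSweeps(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	count := 0
	runSweeper(ctx, time.Millisecond, func(time.Time) {
		if count < n {
			count++
		}
		if count == n {
			cancel()
		}
	})
	return count
}

func main() {
	fmt.Println("sweeps before cancel:", countSweeps(3))
}
```

Because the loop blocks until the context is done, a caller would typically start it in its own goroutine.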

func (*Manager) SetClock

func (m *Manager) SetClock(c func() time.Time)

SetClock overrides the time source. Used in tests.

func (*Manager) SetDelta

func (m *Manager) SetDelta(name Name, enabled bool) error

SetDelta toggles fossil-delta encoding for the channel. Off by default. Enable for high-frequency channels where small mutations dominate the publish stream.

func (*Manager) SetEventBus

func (m *Manager) SetEventBus(b EventBus)

SetEventBus wires a cross-node event bus. When non-nil, every locally emitted event is also published to the bus, and remote events are fanned out to local subscribers. Used to make manager state coherent across multi-node deployments.

func (*Manager) SetOnSweep

func (m *Manager) SetOnSweep(cb func([]Channel))

SetOnSweep registers a callback invoked after every Sweep tick with the current channel list. Used by the metrics layer to publish gauges without leaking channel names. Pass nil to remove.
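One way a metrics callback could avoid leaking channel names is to aggregate by visibility and state before exporting anything. The `channel` stand-in type, the `countByState` helper, and the label format are hypothetical; the real callback receives a []Channel.

```go
package main

import "fmt"

// channel is a local stand-in carrying only the fields the sketch needs.
type channel struct {
	Visibility string
	State      string
}

// countByState folds the channel list into per-label counts, so no
// channel name ever reaches the metrics backend.
func countByState(chs []channel) map[string]int {
	out := make(map[string]int)
	for _, c := range chs {
		out[c.Visibility+"/"+c.State]++
	}
	return out
}

func main() {
	snapshot := []channel{
		{"public", "open"},
		{"public", "open"},
		{"public", "closed"},
		{"private", "open"},
	}
	fmt.Println(countByState(snapshot))
}
```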

func (*Manager) Subscribe

func (m *Manager) Subscribe(ch chan<- Event)

Subscribe registers a consumer for lifecycle events. The caller owns the channel and must drain it; the manager does a non-blocking send and drops events on full buffers. Use a buffered channel (cap >= 16) for production.
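The non-blocking send described here is the standard select-with-default idiom. The sketch below uses a local `event` type and a hypothetical `trySend` helper to show why an undersized buffer loses events.

```go
package main

import "fmt"

// event is a local stand-in for the package's Event.
type event struct {
	Kind string
	Name string
}

// trySend attempts a non-blocking send and reports whether ev was accepted.
// A false result means the subscriber's buffer was full and ev was dropped.
func trySend(ch chan<- event, ev event) bool {
	select {
	case ch <- ev:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan event, 1) // deliberately tiny buffer to force a drop
	fmt.Println(trySend(ch, event{"opened", "public:chat.rooms"})) // true
	fmt.Println(trySend(ch, event{"closed", "public:chat.rooms"})) // false: buffer full
	fmt.Println((<-ch).Kind)                                       // opened
}
```

A slow consumer therefore never stalls the sender, at the cost of missing events once its buffer fills.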

func (*Manager) Sweep

func (m *Manager) Sweep(now time.Time) int

Sweep runs one expiry pass and returns the count of transitions.

func (*Manager) Touch

func (m *Manager) Touch(name Name) error

Touch refreshes the channel's LastActive.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is the in-process Store. Single-node Parsec deployments use it by default; multi-node deployments swap it for a RedisStore.

func NewMemoryStore

func NewMemoryStore() *MemoryStore

NewMemoryStore returns an empty in-memory store.

func (*MemoryStore) CreatePrivate

func (s *MemoryStore) CreatePrivate(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)

CreatePrivate registers a new private channel.

func (*MemoryStore) Delete

func (s *MemoryStore) Delete(name Name, now time.Time) (Event, error)

Delete marks the channel deleted.

func (*MemoryStore) Get

func (s *MemoryStore) Get(name Name) (*Channel, error)

Get returns a snapshot or ChannelNotFound.

func (*MemoryStore) IsOpen

func (s *MemoryStore) IsOpen(name Name) bool

IsOpen reports whether the channel is in StateOpen.

func (*MemoryStore) List

func (s *MemoryStore) List() ([]Channel, error)

List returns a snapshot.

func (*MemoryStore) OpenPublic

func (s *MemoryStore) OpenPublic(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)

OpenPublic creates or re-opens a public channel.

func (*MemoryStore) SetDelta

func (s *MemoryStore) SetDelta(name Name, enabled bool) error

SetDelta toggles fossil-delta encoding on the channel.

func (*MemoryStore) Sweep

func (s *MemoryStore) Sweep(now time.Time) ([]Event, error)

Sweep runs one expiry pass.

func (*MemoryStore) Touch

func (s *MemoryStore) Touch(name Name, now time.Time) error

Touch refreshes LastActive.

type Name

type Name struct {
	Visibility Visibility
	App        string
	Domain     string
	ID         string // optional for public, required for private
	Topic      string // optional
}

Name is a parsed, validated channel name. The zero value is invalid; build one with ParseName or BuildName.

func BuildName

func BuildName(vis Visibility, app, domain, id, topic string) (Name, error)

BuildName composes and validates a Name from its components. The same rules as ParseName apply.

func ParseName

func ParseName(s string) (Name, error)

ParseName validates s and returns the structured Name. Errors carry PARSEC_CHANNEL_* codes so callers can map them to the right HTTP/Twirp status without string matching.

func (Name) IsPrivate

func (n Name) IsPrivate() bool

IsPrivate reports whether the channel is in the private namespace.

func (Name) String

func (n Name) String() string

String renders the canonical wire form. The output round-trips through ParseName.

type Pattern

type Pattern struct {
	// contains filtered or unexported fields
}

Pattern is a parsed channel-name pattern. It carries a fixed visibility (cross-visibility patterns are rejected at parse time) and a slice of segments with optional `*` and trailing `**` wildcards.

The zero value is invalid; build one with ParsePattern.

func ParsePattern

func ParsePattern(s string) (Pattern, error)

ParsePattern validates s and returns a structured Pattern. The grammar matches channels.ParseName with two additions:

  • `*` (single segment wildcard) is allowed in any position
  • `**` (multi-segment wildcard) is allowed only as the last segment

The visibility prefix is mandatory and concrete: `*:foo.*` or any other cross-visibility form is rejected.
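The parse-time rules above (concrete visibility, `**` only in last position, depth capped at MaxPatternDepth) can be sketched as follows. `validatePattern` is a hypothetical helper, not ParsePattern itself; it does not check segment character sets or partial wildcards such as `a*b`, because the source does not describe how those are handled.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Local copy of the documented cap.
const maxPatternDepth = 8

// validatePattern applies the documented structural rules and returns the
// pattern's segments.
func validatePattern(s string) ([]string, error) {
	vis, rest, ok := strings.Cut(s, ":")
	if !ok || (vis != "public" && vis != "private") {
		return nil, errors.New("pattern needs a concrete visibility prefix")
	}
	// Count separators before splitting so an oversized input is rejected
	// without allocating a slice for it.
	if strings.Count(rest, ".")+1 > maxPatternDepth {
		return nil, errors.New("pattern exceeds maximum depth")
	}
	segs := strings.Split(rest, ".")
	for i, seg := range segs {
		switch {
		case seg == "":
			return nil, errors.New("empty segment")
		case seg == "**" && i != len(segs)-1:
			return nil, errors.New("** is only allowed as the last segment")
		}
	}
	return segs, nil
}

func main() {
	for _, p := range []string{"public:chat.*.**", "*:foo.*", "public:chat.**.typing"} {
		segs, err := validatePattern(p)
		fmt.Println(p, segs, err)
	}
}
```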

func (Pattern) IsValid

func (p Pattern) IsValid() bool

IsValid reports whether the pattern has been parsed and is usable. A zero Pattern returns false.

func (Pattern) Matches

func (p Pattern) Matches(n Name) bool

Matches reports whether n satisfies p. Visibility must match exactly; each segment is then walked left-to-right with the wildcard semantics described on ParsePattern. The matcher is hand-rolled (no regex) and performs no allocations in the hot path.
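A left-to-right segment walk of this kind might look like the sketch below. `matchSegments` is hypothetical and works on already-split segments; it assumes the pattern was validated (so `**` can only be last) and that `**` matches zero or more trailing segments, which the source does not specify. The visibility comparison is left out for brevity.

```go
package main

import "fmt"

// matchSegments walks pattern and name in lockstep.
//   - "*" matches exactly one segment
//   - "**" (last position only) matches whatever remains
//
// It indexes into the existing slices and allocates nothing.
func matchSegments(pattern, name []string) bool {
	for i, p := range pattern {
		if p == "**" {
			return true // assumption: absorbs zero or more remaining segments
		}
		if i >= len(name) {
			return false // pattern is longer than the name
		}
		if p != "*" && p != name[i] {
			return false // literal segment mismatch
		}
	}
	// Without a trailing "**" the lengths must agree exactly.
	return len(pattern) == len(name)
}

func main() {
	name := []string{"chat", "rooms", "42"}
	fmt.Println(matchSegments([]string{"chat", "*", "42"}, name))
	fmt.Println(matchSegments([]string{"chat", "**"}, name))
	fmt.Println(matchSegments([]string{"chat", "*"}, name))
}
```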

func (Pattern) String

func (p Pattern) String() string

String returns the raw pattern text. Round-trips through ParsePattern for any valid Pattern.

func (Pattern) Visibility

func (p Pattern) Visibility() Visibility

Visibility returns the visibility the pattern is bound to. Patterns always carry a concrete visibility (no cross-visibility wildcard).

type RedisEventBus

type RedisEventBus struct {
	// contains filtered or unexported fields
}

RedisEventBus fans channel lifecycle events across Parsec nodes via Redis pub/sub. Every locally emitted event is published with the originating NodeID; remote subscribers re-emit it locally, skipping echoes of their own publishes.

func NewRedisEventBus

func NewRedisEventBus(client redis.UniversalClient, nodeID string) *RedisEventBus

NewRedisEventBus constructs a bus. nodeID identifies this Parsec instance; pass an empty string to auto-generate one.

func (*RedisEventBus) NodeID

func (b *RedisEventBus) NodeID() string

NodeID returns this bus's identifier.

func (*RedisEventBus) Publish

func (b *RedisEventBus) Publish(ctx context.Context, ev Event) error

Publish broadcasts ev to other nodes. Local fanout is the Manager's responsibility — the bus only carries cross-node traffic.

func (*RedisEventBus) Run

func (b *RedisEventBus) Run(ctx context.Context, onRemote func(Event)) error

Run subscribes to the bus. onRemote fires for every event whose NodeID differs from this bus's NodeID. Blocks until ctx is canceled.

func (*RedisEventBus) WithKeyPrefix

func (b *RedisEventBus) WithKeyPrefix(p string) *RedisEventBus

WithKeyPrefix sets the key namespace. The pub/sub channel becomes "<prefix>:events".

type RedisStore

type RedisStore struct {
	// contains filtered or unexported fields
}

RedisStore persists channel records in a Redis hash so that every Parsec node sharing the same Redis sees the same channel registry. Sweep transitions run inside a Lua script for atomicity.

Storage layout:

parsec:channels        — HASH, field = channel name, value = JSON record

The store is safe for concurrent use across processes; Redis is the single source of truth.

func NewRedisStore

func NewRedisStore(client redis.UniversalClient) *RedisStore

NewRedisStore constructs a RedisStore against the given client.

func (*RedisStore) CreatePrivate

func (s *RedisStore) CreatePrivate(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)

CreatePrivate registers a new private channel.

func (*RedisStore) Delete

func (s *RedisStore) Delete(name Name, now time.Time) (Event, error)

Delete marks the channel deleted.

func (*RedisStore) Get

func (s *RedisStore) Get(name Name) (*Channel, error)

Get returns a snapshot or ChannelNotFound.

func (*RedisStore) IsOpen

func (s *RedisStore) IsOpen(name Name) bool

IsOpen reports whether the channel exists and is StateOpen.

func (*RedisStore) List

func (s *RedisStore) List() ([]Channel, error)

List returns every non-deleted channel.

func (*RedisStore) OpenPublic

func (s *RedisStore) OpenPublic(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)

OpenPublic creates or re-opens a public channel using a Lua script for atomicity.

func (*RedisStore) SetDelta

func (s *RedisStore) SetDelta(name Name, enabled bool) error

SetDelta toggles fossil-delta encoding on the channel.

func (*RedisStore) Sweep

func (s *RedisStore) Sweep(now time.Time) ([]Event, error)

Sweep runs one expiry pass via Lua.

func (*RedisStore) Touch

func (s *RedisStore) Touch(name Name, now time.Time) error

Touch refreshes LastActive via a Lua script.

func (*RedisStore) WithKeyPrefix

func (s *RedisStore) WithKeyPrefix(p string) *RedisStore

WithKeyPrefix sets a custom Redis key namespace. Useful when multiple Parsec deployments share a Redis instance.

type State

type State string

State describes the lifecycle position of a managed channel.

const (
	StateOpen    State = "open"
	StateClosed  State = "closed" // public only — inactive past TTL, can reopen
	StateDeleted State = "deleted"
)

type Store

type Store interface {
	// Get returns a snapshot. Deleted channels are reported as not-found
	// so callers do not need to redo the state check.
	Get(name Name) (*Channel, error)

	// List returns a snapshot of all non-deleted channel records.
	List() ([]Channel, error)

	// IsOpen reports whether the channel exists and is in StateOpen.
	IsOpen(name Name) bool

	// OpenPublic creates a new public channel, or re-opens a closed one.
	// Returns the channel snapshot and the event the caller should emit
	// (zero-value Event{} means no transition happened — e.g. already-open
	// channel had its TTL refreshed but no state change).
	OpenPublic(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)

	// CreatePrivate registers a new private channel. Fails if one already
	// exists with the same name.
	CreatePrivate(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)

	// Touch refreshes LastActive on the channel.
	Touch(name Name, now time.Time) error

	// Delete marks the channel deleted. The second return value is the
	// event the caller should emit (zero-value Event{} on a redundant
	// delete of an already-deleted channel).
	Delete(name Name, now time.Time) (Event, error)

	// Sweep runs one expiry pass and returns transition events. The Store
	// performs the writes; the Manager fans the events out.
	Sweep(now time.Time) ([]Event, error)

	// SetDelta toggles fossil-delta encoding for the channel. Used by
	// the Manager after open/create to enable the per-channel delta
	// flag without changing the OpenPublic/CreatePrivate signatures.
	SetDelta(name Name, enabled bool) error
}

Store is the persistence interface for channel records. Two implementations ship: an in-memory store (single-node) and a Redis-backed store that lets multiple Parsec nodes share the same channel registry.

Implementations MUST be safe for concurrent use. All mutating methods return the event(s) that the Manager should fan out to subscribers; the Store does NOT do its own emit — that's the Manager's job.
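The division of labour (the Store writes and returns an event, the Manager emits it) can be illustrated with a toy in-memory store. Everything here is a hypothetical stand-in: `memStore`, `record`, `openPublic`, and `emit` are simplified versions of the documented shapes, and error returns are omitted to keep the sketch short.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type event struct {
	Kind string
	Name string
	At   time.Time
}

type record struct {
	State      string
	TTL        time.Duration
	LastActive time.Time
}

type memStore struct {
	mu   sync.Mutex
	recs map[string]*record
}

func newMemStore() *memStore { return &memStore{recs: make(map[string]*record)} }

// t0 is a fixed timestamp used by the demonstration.
var t0 = time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)

// openPublic creates or re-opens a channel. It returns the zero event when
// the channel was already open and only its TTL was refreshed.
func (s *memStore) openPublic(name string, ttl time.Duration, now time.Time) event {
	s.mu.Lock()
	defer s.mu.Unlock()
	if r, ok := s.recs[name]; ok && r.State == "open" {
		r.TTL, r.LastActive = ttl, now
		return event{}
	}
	s.recs[name] = &record{State: "open", TTL: ttl, LastActive: now}
	return event{Kind: "opened", Name: name, At: now}
}

// emit is the manager-side fan-out: zero events are skipped, and sends are
// non-blocking so a full subscriber buffer drops the event.
func emit(ev event, subs []chan<- event) int {
	if ev.Kind == "" {
		return 0
	}
	sent := 0
	for _, ch := range subs {
		select {
		case ch <- ev:
			sent++
		default:
		}
	}
	return sent
}

func main() {
	s := newMemStore()
	sub := make(chan event, 16)
	subs := []chan<- event{sub}
	fmt.Println(emit(s.openPublic("public:chat.rooms", 30*time.Minute, t0), subs))
	fmt.Println(emit(s.openPublic("public:chat.rooms", 30*time.Minute, t0.Add(time.Minute)), subs))
}
```

Returning the event instead of emitting it keeps the store free of subscriber state, which is what lets the same Manager logic sit on top of either backend.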

type Visibility

type Visibility string

Visibility is the public/private discriminator that prefixes every channel name. It maps 1:1 to a Centrifuge namespace at broker boot.

const (
	VisibilityPublic  Visibility = "public"
	VisibilityPrivate Visibility = "private"
)

func (Visibility) String

func (v Visibility) String() string

String returns the canonical lowercase form used on the wire.
