Documentation ¶
Overview ¶
Package channels owns the channel naming contract and the channel manager (public / private namespaces, TTL bookkeeping, expiry).
The grammar is:
<visibility>:<app>.<domain>[.<id>][.<topic>]
where visibility is "public" or "private". Anything else is rejected by ParseName so the CLI and Twirp surfaces share one validator.
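The grammar above can be sketched as a small validator. This is a simplified illustration, not the package's ParseName: the function name parseName, the name struct, and the error messages are hypothetical, and the sketch assumes that a three-segment name carries an id rather than a topic. The real validator may also enforce character-set or length rules that are not shown here.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// name is a stand-in for channels.Name (hypothetical, for illustration only).
type name struct {
	Visibility, App, Domain, ID, Topic string
}

// parseName is a simplified sketch of the documented grammar:
// <visibility>:<app>.<domain>[.<id>][.<topic>]
func parseName(s string) (name, error) {
	vis, rest, ok := strings.Cut(s, ":")
	if !ok {
		return name{}, errors.New("missing visibility prefix")
	}
	if vis != "public" && vis != "private" {
		return name{}, fmt.Errorf("unknown visibility %q", vis)
	}
	segs := strings.Split(rest, ".")
	if len(segs) < 2 || len(segs) > 4 {
		return name{}, fmt.Errorf("want 2 to 4 segments, got %d", len(segs))
	}
	for _, seg := range segs {
		if seg == "" {
			return name{}, errors.New("empty segment")
		}
	}
	n := name{Visibility: vis, App: segs[0], Domain: segs[1]}
	if len(segs) > 2 {
		n.ID = segs[2] // assumption: the third segment is the id
	}
	if len(segs) > 3 {
		n.Topic = segs[3]
	}
	// The Name type documents ID as required for private channels.
	if vis == "private" && n.ID == "" {
		return name{}, errors.New("private channel requires an id")
	}
	return n, nil
}

func main() {
	for _, s := range []string{"public:chat.room.42.typing", "private:chat.dm", "secret:chat.room"} {
		n, err := parseName(s)
		fmt.Printf("%q -> %+v, err=%v\n", s, n, err)
	}
}
```

Keeping the check in one function is what lets several surfaces (CLI, Twirp) reject the same inputs in the same way.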
Index ¶
- Constants
- type Channel
- type Event
- type EventBus
- type EventKind
- type Manager
- func (m *Manager) CreatePrivate(name Name, ttl time.Duration) (*Channel, error)
- func (m *Manager) Delete(name Name) error
- func (m *Manager) EventBus() EventBus
- func (m *Manager) Get(name Name) (*Channel, error)
- func (m *Manager) IsDelta(name Name) bool
- func (m *Manager) IsOpen(name Name) bool
- func (m *Manager) List() []Channel
- func (m *Manager) OpenPublic(name Name, ttl time.Duration) (*Channel, error)
- func (m *Manager) RunEventBus(ctx context.Context) error
- func (m *Manager) RunSweeper(ctx context.Context, every time.Duration)
- func (m *Manager) SetClock(c func() time.Time)
- func (m *Manager) SetDelta(name Name, enabled bool) error
- func (m *Manager) SetEventBus(b EventBus)
- func (m *Manager) SetOnSweep(cb func([]Channel))
- func (m *Manager) Subscribe(ch chan<- Event)
- func (m *Manager) Sweep(now time.Time) int
- func (m *Manager) Touch(name Name) error
- type MemoryStore
- func (s *MemoryStore) CreatePrivate(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)
- func (s *MemoryStore) Delete(name Name, now time.Time) (Event, error)
- func (s *MemoryStore) Get(name Name) (*Channel, error)
- func (s *MemoryStore) IsOpen(name Name) bool
- func (s *MemoryStore) List() ([]Channel, error)
- func (s *MemoryStore) OpenPublic(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)
- func (s *MemoryStore) SetDelta(name Name, enabled bool) error
- func (s *MemoryStore) Sweep(now time.Time) ([]Event, error)
- func (s *MemoryStore) Touch(name Name, now time.Time) error
- type Name
- type Pattern
- type RedisEventBus
- type RedisStore
- func (s *RedisStore) CreatePrivate(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)
- func (s *RedisStore) Delete(name Name, now time.Time) (Event, error)
- func (s *RedisStore) Get(name Name) (*Channel, error)
- func (s *RedisStore) IsOpen(name Name) bool
- func (s *RedisStore) List() ([]Channel, error)
- func (s *RedisStore) OpenPublic(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)
- func (s *RedisStore) SetDelta(name Name, enabled bool) error
- func (s *RedisStore) Sweep(now time.Time) ([]Event, error)
- func (s *RedisStore) Touch(name Name, now time.Time) error
- func (s *RedisStore) WithKeyPrefix(p string) *RedisStore
- type State
- type Store
- type Visibility
Constants ¶
const DefaultPublicTTL = 30 * time.Minute
DefaultPublicTTL is applied when a public channel is created without an explicit TTL.
const MaxPatternDepth = 8
MaxPatternDepth caps the segment count of a pattern. Patterns that try to address more than this many dot-separated segments are rejected at parse time so a pathological token like `**.**.**...` cannot become a denial-of-service vector. The cap mirrors the channel grammar's practical depth (app.domain.id.topic) plus headroom.
const MaxPrivateTTL = 1 * time.Hour
MaxPrivateTTL is the hard cap on private channel inactivity TTL. Anything larger is rejected at creation time.
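The two TTL constants imply slightly different handling: a missing public TTL is replaced by a default, while an oversized private TTL is refused. A minimal sketch of that distinction follows; the helper names publicTTL and privateTTL and the error value are hypothetical, and how the package treats a zero or negative private TTL is not covered here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Values copied from the documented constants.
const (
	defaultPublicTTL = 30 * time.Minute
	maxPrivateTTL    = 1 * time.Hour
)

// errTTLTooLarge is a hypothetical error; the real package uses its own codes.
var errTTLTooLarge = errors.New("private TTL exceeds MaxPrivateTTL")

// publicTTL substitutes the default when the caller passes 0.
func publicTTL(ttl time.Duration) time.Duration {
	if ttl == 0 {
		return defaultPublicTTL
	}
	return ttl
}

// privateTTL rejects anything above the cap instead of silently clamping it.
func privateTTL(ttl time.Duration) (time.Duration, error) {
	if ttl > maxPrivateTTL {
		return 0, errTTLTooLarge
	}
	return ttl, nil
}

func main() {
	fmt.Println(publicTTL(0))
	_, err := privateTTL(2 * time.Hour)
	fmt.Println(err)
}
```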
Types ¶
type Channel ¶
type Channel struct {
Name Name
State State
TTL time.Duration
CreatedAt time.Time
LastActive time.Time
// DeltaEnabled toggles centrifuge fossil-delta encoding for this
// channel. Off by default; enable for high-frequency channels with
// small mutations (scoreboards, tickers, partial state snapshots).
DeltaEnabled bool
}
Channel is the persisted view of a managed channel. Token material is never stored here — the auth package owns secrets; the manager owns channel lifecycle.
type EventBus ¶
type EventBus interface {
// Publish broadcasts ev to remote nodes. The caller will already have
// fanned it out locally; the bus must NOT re-emit on the originating
// node.
Publish(ctx context.Context, ev Event) error
// Run subscribes to the bus. onRemote fires for every remote event
// (own publishes are filtered by NodeID). Blocks until ctx is done or
// the underlying transport errors fatally.
Run(ctx context.Context, onRemote func(Event)) error
}
EventBus is the cross-node bridge for channel lifecycle events. A publish on node A is delivered to every other node's local Manager so subscribers everywhere see the same lifecycle transitions.
Implementations are responsible for tagging publications with the originating NodeID and skipping local re-delivery — the Manager fans out local events itself; the bus carries only the cross-node fan-in.
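The NodeID filtering contract can be illustrated with an in-process bus. This is a sketch under stated assumptions, not the package's implementation: the event struct, its NodeID field, the hub type, and memBus are all hypothetical stand-ins, and a real bus would sit on a network transport instead of Go channels.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// event is a stand-in for channels.Event; the NodeID field is an assumption.
type event struct {
	Kind, Name, NodeID string
}

// hub plays the role of the shared transport (Redis pub/sub in production).
type hub struct {
	mu   sync.Mutex
	subs []chan event
}

func (h *hub) join() chan event {
	h.mu.Lock()
	defer h.mu.Unlock()
	ch := make(chan event, 16)
	h.subs = append(h.subs, ch)
	return ch
}

func (h *hub) broadcast(ev event) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, ch := range h.subs {
		select {
		case ch <- ev:
		default: // slow node: drop rather than block the publisher
		}
	}
}

// memBus is a hypothetical bus with the documented Publish/Run shapes.
type memBus struct {
	nodeID string
	hub    *hub
	in     chan event
}

func newMemBus(h *hub, nodeID string) *memBus {
	return &memBus{nodeID: nodeID, hub: h, in: h.join()}
}

// Publish tags ev with the originating node before broadcasting.
func (b *memBus) Publish(ctx context.Context, ev event) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	ev.NodeID = b.nodeID
	b.hub.broadcast(ev)
	return nil
}

// Run delivers remote events to onRemote and skips echoes of own publishes.
func (b *memBus) Run(ctx context.Context, onRemote func(event)) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev := <-b.in:
			if ev.NodeID != b.nodeID {
				onRemote(ev)
			}
		}
	}
}

// runDemo publishes once from each of two nodes and returns what node A sees.
func runDemo() []event {
	h := &hub{}
	a, b := newMemBus(h, "node-a"), newMemBus(h, "node-b")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	_ = a.Publish(ctx, event{Kind: "opened", Name: "public:chat.room"})
	_ = b.Publish(ctx, event{Kind: "closed", Name: "public:chat.room"})
	var seen []event
	_ = a.Run(ctx, func(ev event) {
		seen = append(seen, ev)
		cancel() // stop after the first remote event
	})
	return seen
}

func main() {
	fmt.Println(runDemo())
}
```

Node A receives both publications from the transport but only hands node B's event to the callback, which is the behavior the interface comment asks of every implementation.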
type EventKind ¶
type EventKind string
EventKind is the lifecycle transition a Channel just made.
const (
// EventOpened fires on first open or re-open of a public channel.
EventOpened EventKind = "opened"
// EventClosed fires when a public channel transitions to StateClosed via
// Sweep (TTL exceeded). Public-close semantics: no new subscribers, but
// existing subscribers are not kicked.
EventClosed EventKind = "closed"
// EventDeleted fires when a channel is removed, either by an explicit
// Delete call or because a private channel expired and was reaped from
// the manager. Subscribers MUST be kicked.
EventDeleted EventKind = "deleted"
)
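The difference between the closed and deleted transitions is easiest to see in an expiry pass. The sketch below is hypothetical: the channel struct is a trimmed stand-in for Channel, the sweep function is not the package's Sweep, and it assumes a channel expires once the time since LastActive is strictly greater than its TTL.

```go
package main

import (
	"fmt"
	"time"
)

type event struct{ Kind, Name string }

// channel is a trimmed stand-in for the documented Channel struct.
type channel struct {
	Private    bool
	Closed     bool
	TTL        time.Duration
	LastActive time.Time
}

// sweep is a hypothetical expiry pass over an in-memory registry.
func sweep(chs map[string]*channel, now time.Time) []event {
	var evs []event
	for name, c := range chs {
		if now.Sub(c.LastActive) <= c.TTL {
			continue // still within its inactivity window
		}
		switch {
		case c.Private:
			delete(chs, name) // reaped: subscribers must be kicked
			evs = append(evs, event{Kind: "deleted", Name: name})
		case !c.Closed:
			c.Closed = true // closed: existing subscribers stay connected
			evs = append(evs, event{Kind: "closed", Name: name})
		}
	}
	return evs
}

// demoSweep expires one public and one private channel and counts the events.
func demoSweep() (closed, deleted int) {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	chs := map[string]*channel{
		"public:chat.room":   {TTL: 30 * time.Minute, LastActive: start},
		"private:chat.dm.42": {Private: true, TTL: time.Hour, LastActive: start},
		"public:chat.lobby":  {TTL: 3 * time.Hour, LastActive: start},
	}
	for _, ev := range sweep(chs, start.Add(2*time.Hour)) {
		switch ev.Kind {
		case "closed":
			closed++
		case "deleted":
			deleted++
		}
	}
	return closed, deleted
}

func main() {
	closed, deleted := demoSweep()
	fmt.Println("closed:", closed, "deleted:", deleted)
}
```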
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager owns channel lifecycle, TTL enforcement, and the inactivity tick. State persistence is delegated to a Store; event fanout to subscribers stays in process. Safe for concurrent use.
func NewManager ¶
func NewManager() *Manager
NewManager constructs a Manager backed by an in-memory store. Equivalent to NewManagerWithStore(NewMemoryStore()).
func NewManagerWithStore ¶
NewManagerWithStore constructs a Manager backed by the supplied Store.
func (*Manager) CreatePrivate ¶
CreatePrivate registers a new private channel. A TTL larger than MaxPrivateTTL is rejected.
func (*Manager) OpenPublic ¶
OpenPublic creates or re-opens a public channel. Pass a ttl of 0 to use DefaultPublicTTL.
func (*Manager) RunEventBus ¶
RunEventBus subscribes to the cross-node bus and re-emits remote events to local subscribers. Returns when ctx is canceled or the bus exits. No-op when no bus is configured.
func (*Manager) RunSweeper ¶
RunSweeper drives Sweep on every tick until ctx is done.
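A ticker-driven loop of this kind is commonly written with time.Ticker and a select on ctx.Done. The following is a minimal sketch of that pattern, not the package's code: runSweeper here takes the sweep step as a callback (a hypothetical simplification), and countSweeps exists only to make the example observable.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runSweeper calls sweep on every tick until ctx is done.
func runSweeper(ctx context.Context, every time.Duration, sweep func(now time.Time)) {
	t := time.NewTicker(every)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case now := <-t.C:
			// select picks randomly when both cases are ready, so re-check
			// the context to avoid one extra sweep after cancellation.
			if ctx.Err() != nil {
				return
			}
			sweep(now)
		}
	}
}

// countSweeps runs the loop until n sweeps have happened, then cancels it.
func countSweeps(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	count := 0
	runSweeper(ctx, time.Millisecond, func(time.Time) {
		count++
		if count == n {
			cancel()
		}
	})
	return count
}

func main() {
	fmt.Println("sweeps:", countSweeps(3))
}
```

Because the loop blocks, callers would typically start it in its own goroutine and stop it by canceling the context.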
func (*Manager) SetDelta ¶
SetDelta toggles fossil-delta encoding for the channel. Off by default. Enable for high-frequency channels where small mutations dominate the publish stream.
func (*Manager) SetEventBus ¶
SetEventBus wires a cross-node event bus. When non-nil, every locally emitted event is also published to the bus, and remote events are fanned out to local subscribers. Used to make manager state coherent across multi-node deployments.
func (*Manager) SetOnSweep ¶
SetOnSweep registers a callback invoked after every Sweep tick with the current channel list. Used by the metrics layer to publish gauges without leaking channel names. Pass nil to remove.
func (*Manager) Subscribe ¶
Subscribe registers a consumer for lifecycle events. The caller owns the channel and must drain it; the manager does a non-blocking send and drops events on full buffers. Use a buffered channel (cap >= 16) for production.
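The non-blocking send described here is the standard select-with-default idiom. The sketch below shows why an undersized buffer loses events; the fanout type, its emit method, and the dropped counter are hypothetical and only model the documented behavior.

```go
package main

import (
	"fmt"
	"sync"
)

type event struct{ Kind, Name string }

// fanout is a hypothetical stand-in for the Manager's subscriber list.
type fanout struct {
	mu   sync.Mutex
	subs []chan<- event
}

// Subscribe registers a consumer channel owned by the caller.
func (f *fanout) Subscribe(ch chan<- event) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.subs = append(f.subs, ch)
}

// emit tries each subscriber once and never blocks; it returns how many
// subscribers missed the event because their buffer was full.
func (f *fanout) emit(ev event) (dropped int) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, ch := range f.subs {
		select {
		case ch <- ev:
		default:
			dropped++
		}
	}
	return dropped
}

func main() {
	f := &fanout{}
	ch := make(chan event, 1) // deliberately too small
	f.Subscribe(ch)
	fmt.Println("dropped:", f.emit(event{Kind: "opened", Name: "public:chat.room"}))
	fmt.Println("dropped:", f.emit(event{Kind: "closed", Name: "public:chat.room"}))
}
```

A slow consumer therefore cannot stall lifecycle processing, at the cost of missing events; this is the reason for the buffer-size recommendation.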
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore is the in-process Store. Single-node Parsec deployments use it by default; multi-node deployments swap it for a RedisStore.
func NewMemoryStore ¶
func NewMemoryStore() *MemoryStore
NewMemoryStore returns an empty in-memory store.
func (*MemoryStore) CreatePrivate ¶
func (s *MemoryStore) CreatePrivate(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)
CreatePrivate registers a new private channel.
func (*MemoryStore) Get ¶
func (s *MemoryStore) Get(name Name) (*Channel, error)
Get returns a snapshot or ChannelNotFound.
func (*MemoryStore) IsOpen ¶
func (s *MemoryStore) IsOpen(name Name) bool
IsOpen reports whether the channel is in StateOpen.
func (*MemoryStore) OpenPublic ¶
func (s *MemoryStore) OpenPublic(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)
OpenPublic creates or re-opens a public channel.
func (*MemoryStore) SetDelta ¶
func (s *MemoryStore) SetDelta(name Name, enabled bool) error
SetDelta toggles fossil-delta encoding on the channel.
type Name ¶
type Name struct {
Visibility Visibility
App string
Domain string
ID string // optional for public, required for private
Topic string // optional
}
Name is a parsed, validated channel name. The zero value is invalid; build one with ParseName or BuildName.
func BuildName ¶
func BuildName(vis Visibility, app, domain, id, topic string) (Name, error)
BuildName composes and validates a Name from its components. The same rules as ParseName apply.
func ParseName ¶
ParseName validates s and returns the structured Name. Errors carry PARSEC_CHANNEL_* codes so callers can map them to the right HTTP/Twirp status without string matching.
type Pattern ¶
type Pattern struct {
// contains filtered or unexported fields
}
Pattern is a parsed channel-name pattern. It carries a fixed visibility (cross-visibility patterns are rejected at parse time) and a slice of segments with optional `*` and trailing `**` wildcards.
The zero value is invalid; build one with ParsePattern.
func ParsePattern ¶
ParsePattern validates s and returns a structured Pattern. The grammar matches channels.ParseName with two additions:
- `*` (single segment wildcard) is allowed in any position
- `**` (multi-segment wildcard) is allowed only as the last segment
The visibility prefix is mandatory and concrete: `*:foo.*` or any other cross-visibility form is rejected.
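The parse-time rules listed above (concrete visibility, `**` only in last position, and the MaxPatternDepth cap) can be sketched as follows. The pattern struct, the parsePattern function, and the error messages are hypothetical; the sketch does not validate the characters inside a segment, which the real parser may do.

```go
package main

import (
	"fmt"
	"strings"
)

// maxPatternDepth mirrors the documented MaxPatternDepth constant.
const maxPatternDepth = 8

// pattern is a hypothetical stand-in for channels.Pattern.
type pattern struct {
	vis  string
	segs []string
}

func parsePattern(s string) (pattern, error) {
	vis, rest, ok := strings.Cut(s, ":")
	if !ok || (vis != "public" && vis != "private") {
		return pattern{}, fmt.Errorf("pattern %q: visibility must be public or private", s)
	}
	// Count separators before splitting so an oversized input is rejected
	// without allocating a slice for all of its segments.
	if strings.Count(rest, ".")+1 > maxPatternDepth {
		return pattern{}, fmt.Errorf("pattern %q: more than %d segments", s, maxPatternDepth)
	}
	segs := strings.Split(rest, ".")
	for i, seg := range segs {
		if seg == "" {
			return pattern{}, fmt.Errorf("pattern %q: empty segment", s)
		}
		if seg == "**" && i != len(segs)-1 {
			return pattern{}, fmt.Errorf("pattern %q: ** is only allowed as the last segment", s)
		}
	}
	return pattern{vis: vis, segs: segs}, nil
}

func main() {
	for _, s := range []string{"public:chat.*.**", "*:foo.*", "public:chat.**.room"} {
		p, err := parsePattern(s)
		fmt.Printf("%q -> %+v, err=%v\n", s, p, err)
	}
}
```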
func (Pattern) IsValid ¶
IsValid reports whether the pattern has been parsed and is usable. A zero Pattern returns false.
func (Pattern) Matches ¶
Matches reports whether n satisfies p. Visibility must match exactly; each segment is then walked left-to-right with the wildcard semantics described on ParsePattern. The matcher is hand-rolled (no regex) and performs no allocations in the hot path.
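A left-to-right segment walk of this kind needs no regular expressions and no allocation once both sides are already split. The sketch below is a hypothetical matcher, not the package's: the matches function and its parameters are invented for illustration, and it assumes that a trailing `**` must consume at least one segment (changing `len(name) > i` to `len(name) >= i` gives the zero-or-more reading).

```go
package main

import "fmt"

// matches reports whether the name segments satisfy the pattern segments.
// Both sides are assumed to be pre-split and already validated.
func matches(patVis string, pat []string, nameVis string, name []string) bool {
	if patVis != nameVis {
		return false // visibility must match exactly
	}
	for i, p := range pat {
		if p == "**" {
			// Trailing multi-segment wildcard: everything from here on
			// matches. Assumption: at least one segment must remain.
			return len(name) > i
		}
		if i >= len(name) {
			return false // pattern is longer than the name
		}
		if p != "*" && p != name[i] {
			return false // literal segment mismatch
		}
	}
	return len(pat) == len(name) // no unmatched trailing name segments
}

func main() {
	name := []string{"chat", "room", "42", "typing"}
	fmt.Println(matches("public", []string{"chat", "*", "42", "typing"}, "public", name))
	fmt.Println(matches("public", []string{"chat", "**"}, "public", name))
	fmt.Println(matches("private", []string{"chat", "**"}, "public", name))
}
```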
func (Pattern) String ¶
String returns the raw pattern text. Round-trips through ParsePattern for any valid Pattern.
func (Pattern) Visibility ¶
func (p Pattern) Visibility() Visibility
Visibility returns the visibility the pattern is bound to. Patterns always carry a concrete visibility (no cross-visibility wildcard).
type RedisEventBus ¶
type RedisEventBus struct {
// contains filtered or unexported fields
}
RedisEventBus fans channel lifecycle events across Parsec nodes via Redis pub/sub. Every locally emitted event is published with the originating NodeID; remote subscribers re-emit it locally, skipping echoes of their own publishes.
func NewRedisEventBus ¶
func NewRedisEventBus(client redis.UniversalClient, nodeID string) *RedisEventBus
NewRedisEventBus constructs a bus. nodeID identifies this Parsec instance; pass an empty string to auto-generate one.
func (*RedisEventBus) NodeID ¶
func (b *RedisEventBus) NodeID() string
NodeID returns this bus's identifier.
func (*RedisEventBus) Publish ¶
func (b *RedisEventBus) Publish(ctx context.Context, ev Event) error
Publish broadcasts ev to other nodes. Local fanout is the Manager's responsibility — the bus only carries cross-node traffic.
func (*RedisEventBus) Run ¶
func (b *RedisEventBus) Run(ctx context.Context, onRemote func(Event)) error
Run subscribes to the bus. onRemote fires for every event whose NodeID differs from this bus's NodeID. Blocks until ctx is canceled.
func (*RedisEventBus) WithKeyPrefix ¶
func (b *RedisEventBus) WithKeyPrefix(p string) *RedisEventBus
WithKeyPrefix sets the key namespace. The pub/sub channel becomes "<prefix>:events".
type RedisStore ¶
type RedisStore struct {
// contains filtered or unexported fields
}
RedisStore persists channel records in a Redis hash so that every Parsec node sharing the same Redis sees the same channel registry. Sweep transitions run inside a Lua script for atomicity.
Storage layout:
parsec:channels — HASH, field = channel name, value = JSON record
The store is safe for concurrent use across processes; Redis is the single source of truth.
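To make the storage layout concrete, the sketch below encodes one channel as a hash field and JSON value and decodes it again. Only the key name parsec:channels and the "field = channel name, value = JSON record" shape come from the documentation; the record struct, every JSON field name, and the encode and decode helpers are assumptions made for illustration, and no Redis client is involved.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hashKey is the documented default Redis key for the channel registry.
const hashKey = "parsec:channels"

// record is a hypothetical JSON shape; the real field names are not documented.
type record struct {
	Name           string `json:"name"`
	State          string `json:"state"`
	TTLSeconds     int64  `json:"ttl_seconds"`
	LastActiveUnix int64  `json:"last_active_unix"`
	DeltaEnabled   bool   `json:"delta_enabled"`
}

// encode returns the hash field and value that would be written for r.
func encode(r record) (field, value string, err error) {
	b, err := json.Marshal(r)
	if err != nil {
		return "", "", err
	}
	return r.Name, string(b), nil
}

// decode parses a hash value back into a record.
func decode(value string) (record, error) {
	var r record
	err := json.Unmarshal([]byte(value), &r)
	return r, err
}

func main() {
	r := record{Name: "public:chat.room", State: "open", TTLSeconds: 1800, LastActiveUnix: 1700000000}
	field, value, err := encode(r)
	if err != nil {
		panic(err)
	}
	// Conceptually: HSET parsec:channels <field> <value>
	fmt.Println(hashKey, field, value)
}
```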
func NewRedisStore ¶
func NewRedisStore(client redis.UniversalClient) *RedisStore
NewRedisStore constructs a RedisStore against the given client.
func (*RedisStore) CreatePrivate ¶
func (s *RedisStore) CreatePrivate(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)
CreatePrivate registers a new private channel.
func (*RedisStore) Get ¶
func (s *RedisStore) Get(name Name) (*Channel, error)
Get returns a snapshot or ChannelNotFound.
func (*RedisStore) IsOpen ¶
func (s *RedisStore) IsOpen(name Name) bool
IsOpen reports whether the channel exists and is StateOpen.
func (*RedisStore) List ¶
func (s *RedisStore) List() ([]Channel, error)
List returns every non-deleted channel.
func (*RedisStore) OpenPublic ¶
func (s *RedisStore) OpenPublic(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)
OpenPublic creates or re-opens a public channel using a Lua script for atomicity.
func (*RedisStore) SetDelta ¶
func (s *RedisStore) SetDelta(name Name, enabled bool) error
SetDelta toggles fossil-delta encoding on the channel.
func (*RedisStore) Sweep ¶
func (s *RedisStore) Sweep(now time.Time) ([]Event, error)
Sweep runs one expiry pass via Lua.
func (*RedisStore) Touch ¶
func (s *RedisStore) Touch(name Name, now time.Time) error
Touch refreshes LastActive via a Lua script.
func (*RedisStore) WithKeyPrefix ¶
func (s *RedisStore) WithKeyPrefix(p string) *RedisStore
WithKeyPrefix sets a custom Redis key namespace. Useful when multiple Parsec deployments share a Redis instance.
type Store ¶
type Store interface {
// Get returns a snapshot. Deleted channels are reported as not-found
// so callers do not need to redo the state check.
Get(name Name) (*Channel, error)
// List returns a snapshot of all non-deleted channel records.
List() ([]Channel, error)
// IsOpen reports whether the channel exists and is in StateOpen.
IsOpen(name Name) bool
// OpenPublic creates a new public channel, or re-opens a closed one.
// Returns the channel snapshot and the event the caller should emit
// (zero-value Event{} means no transition happened — e.g. already-open
// channel had its TTL refreshed but no state change).
OpenPublic(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)
// CreatePrivate registers a new private channel. Fails if one already
// exists with the same name.
CreatePrivate(name Name, ttl time.Duration, now time.Time) (*Channel, Event, error)
// Touch refreshes LastActive on the channel.
Touch(name Name, now time.Time) error
// Delete marks the channel deleted. The second return value is the
// event the caller should emit (zero-value Event{} on a redundant
// delete of an already-deleted channel).
Delete(name Name, now time.Time) (Event, error)
// Sweep runs one expiry pass and returns transition events. The Store
// performs the writes; the Manager fans the events out.
Sweep(now time.Time) ([]Event, error)
// SetDelta toggles fossil-delta encoding for the channel. Used by
// the Manager after open/create to enable the per-channel delta
// flag without changing the OpenPublic/CreatePrivate signatures.
SetDelta(name Name, enabled bool) error
}
Store is the persistence interface for channel records. Two implementations ship: an in-memory store (single-node) and a Redis-backed store that lets multiple Parsec nodes share the same channel registry.
Implementations MUST be safe for concurrent use. All mutating methods return the event(s) that the Manager should fan out to subscribers; the Store does NOT do its own emit — that's the Manager's job.
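The division of labor between Store and Manager, including the zero-value Event convention, might look like the following. This is a sketch with hypothetical types: memStore and manager are heavily trimmed stand-ins, channel names are plain strings, and "fan-out" is modeled as appending to a slice.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type event struct{ Kind, Name string }

// memStore is a hypothetical, trimmed-down Store: it performs the write and
// reports the transition, but never notifies anyone itself.
type memStore struct {
	mu   sync.Mutex
	open map[string]time.Time // channel name -> LastActive
}

func (s *memStore) OpenPublic(name string, now time.Time) event {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, alreadyOpen := s.open[name]
	s.open[name] = now
	if alreadyOpen {
		return event{} // activity refreshed, no state change
	}
	return event{Kind: "opened", Name: name}
}

// manager owns fan-out; here that just means recording the event.
type manager struct {
	store   *memStore
	emitted []event
}

func (m *manager) OpenPublic(name string) {
	// A zero-value event means nothing happened that subscribers care about.
	if ev := m.store.OpenPublic(name, time.Now()); ev != (event{}) {
		m.emitted = append(m.emitted, ev)
	}
}

// demoOpenTwice opens the same channel twice and returns the emitted events.
func demoOpenTwice() []event {
	m := &manager{store: &memStore{open: map[string]time.Time{}}}
	m.OpenPublic("public:chat.room")
	m.OpenPublic("public:chat.room")
	return m.emitted
}

func main() {
	fmt.Println(demoOpenTwice())
}
```

Returning the event instead of emitting it keeps the store free of subscriber state, so the same emit path works whether the store is in memory or in Redis.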
type Visibility ¶
type Visibility string
Visibility is the public/private discriminator that prefixes every channel name. It maps 1:1 to a Centrifuge namespace at broker boot.
const (
VisibilityPublic Visibility = "public"
VisibilityPrivate Visibility = "private"
)
func (Visibility) String ¶
func (v Visibility) String() string
String returns the canonical lowercase form used on the wire.