Documentation
¶
Overview ¶
Package redis is fabriq's Redis adapter: event fan-out over Streams (publisher for the relay, tailer for the hub, consumer groups for projections), versioned-prefix caching, and ephemeral presence pub/sub.
It deliberately uses go-redis directly (not grove's kv driver): the stream paths need MAXLEN~ trimming, XAUTOCLAIM and blocking group reads that the kv abstraction does not expose — see docs/decisions/0003-redis-client.md. The import is fenced to adapters/ by depguard.
Index ¶
- type Adapter
- func (a *Adapter) Cache(modelVersion int) *Cache
- func (a *Adapter) Close() error
- func (a *Adapter) Cluster(ttl time.Duration) *ClusterTransport
- func (a *Adapter) Consume(ctx context.Context, group, consumer string, ...) error
- func (a *Adapter) EnsureGroup(ctx context.Context, group string) error
- func (a *Adapter) GroupLag(ctx context.Context, group string) (int64, error)
- func (a *Adapter) Publish(ctx context.Context, env event.Envelope, channels []string) (string, error)
- func (a *Adapter) PublishPresence(ctx context.Context, room string, payload []byte) error
- func (a *Adapter) PublishToChannel(ctx context.Context, channel string, env event.Envelope) (string, error)
- func (a *Adapter) ReadRange(ctx context.Context, channel, afterID string, limit int) ([]query.Delta, error)
- func (a *Adapter) SubscribePresence(ctx context.Context, room string, fn func([]byte), ready chan<- struct{}) error
- func (a *Adapter) Tail(ctx context.Context, channel, fromID string, deliver func(query.Delta)) error
- func (a *Adapter) TailEvents(ctx context.Context, handle func(event.Envelope) error) error
- type Cache
- type ClusterTransport
- func (c *ClusterTransport) Control(ctx context.Context, shardID string) (events <-chan cluster.Control, stop func(), retErr error)
- func (c *ClusterTransport) Deltas(ctx context.Context, gatewayID string) (events <-chan cluster.GatewayDelta, stop func(), retErr error)
- func (c *ClusterTransport) Heartbeat(ctx context.Context, shardID string) error
- func (c *ClusterTransport) Leave(ctx context.Context, shardID string) error
- func (c *ClusterTransport) LiveShards(ctx context.Context) ([]string, error)
- func (c *ClusterTransport) SendControl(ctx context.Context, shardID string, ctrl cluster.Control) error
- func (c *ClusterTransport) SendDelta(ctx context.Context, gatewayID string, d cluster.GatewayDelta) error
- type Config
- type Option
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Adapter ¶
type Adapter struct {
// contains filtered or unexported fields
}
Adapter wraps one Redis client with fabriq's stream/cache/pubsub surfaces.
func (*Adapter) Cluster ¶
func (a *Adapter) Cluster(ttl time.Duration) *ClusterTransport
Cluster returns the Redis-backed cluster transport. ttl is the heartbeat liveness window (default 6s); a shard must refresh well within it.
func (*Adapter) Consume ¶
func (a *Adapter) Consume(ctx context.Context, group, consumer string, handle func(streamID string, env event.Envelope) error) error
Consume runs a consumer-group loop on the event stream: claimed-pending entries first (crash recovery), then new entries. Handler success acks; handler failure leaves the entry pending for redelivery (at-least-once). Returns when ctx ends.
func (*Adapter) EnsureGroup ¶
EnsureGroup creates a projection consumer group on the event stream (idempotent), starting from the beginning so a new projection replays what the stream still holds.
func (*Adapter) GroupLag ¶
GroupLag reports how many event-stream entries a consumer group has not yet processed (Redis 7 XINFO GROUPS lag + pending).
func (*Adapter) Publish ¶
func (a *Adapter) Publish(ctx context.Context, env event.Envelope, channels []string) (string, error)
Publish implements event.Publisher: one XADD to the main event stream (consumed by projection groups) plus one per derived change channel (short MAXLEN~; reconnecting clients catch up from Last-Event-ID or fall back to refetch).
func (*Adapter) PublishPresence ¶
PublishPresence sends one awareness payload to a room.
func (*Adapter) PublishToChannel ¶
func (a *Adapter) PublishToChannel(ctx context.Context, channel string, env event.Envelope) (string, error)
PublishToChannel XADDs one envelope to a SINGLE channel stream — the document-sync fan-out: sync frames never touch the main event stream (projections must not see them).
func (*Adapter) ReadRange ¶
func (a *Adapter) ReadRange(ctx context.Context, channel, afterID string, limit int) ([]query.Delta, error)
ReadRange implements subscribe.Tailer's catch-up read: entries strictly after afterID ("0" reads from the beginning).
func (*Adapter) SubscribePresence ¶
func (a *Adapter) SubscribePresence(ctx context.Context, room string, fn func([]byte), ready chan<- struct{}) error
SubscribePresence delivers room payloads to fn until ctx ends. ready is closed once the subscription is active (pass nil if not needed).
func (*Adapter) Tail ¶
func (a *Adapter) Tail(ctx context.Context, channel, fromID string, deliver func(query.Delta)) error
Tail implements subscribe.Tailer: blocking XREAD on one change channel, delivering every entry after fromID until ctx ends.
func (*Adapter) TailEvents ¶
TailEvents broadcasts the main event stream to THIS caller from "now": every node calling TailEvents sees every committed envelope (fan-out, not a consumer group). Used for per-node L1 cache eviction. Blocks until ctx ends.
type Cache ¶
type Cache struct {
// contains filtered or unexported fields
}
Cache is a tenant-scoped, version-prefixed byte cache. Keys look like fabriq:v3:{tenant}:{entity}:{id}; bumping the model version makes every stale entry invisible at once (no mass deletes), and the tenant segment comes exclusively from the context.
type ClusterTransport ¶
type ClusterTransport struct {
// contains filtered or unexported fields
}
ClusterTransport backs the sharded live query tier on Redis: shard liveness via TTL heartbeat keys (cluster.Membership), gateway→shard control via per-shard request streams (cluster.ControlBus), and shard→gateway deltas via per-gateway streams (cluster.DeltaBus).
func (*ClusterTransport) Deltas ¶
func (c *ClusterTransport) Deltas(ctx context.Context, gatewayID string) (events <-chan cluster.GatewayDelta, stop func(), retErr error)
func (*ClusterTransport) Heartbeat ¶
func (c *ClusterTransport) Heartbeat(ctx context.Context, shardID string) error
func (*ClusterTransport) Leave ¶
func (c *ClusterTransport) Leave(ctx context.Context, shardID string) error
func (*ClusterTransport) LiveShards ¶
func (c *ClusterTransport) LiveShards(ctx context.Context) ([]string, error)
func (*ClusterTransport) SendControl ¶
func (*ClusterTransport) SendDelta ¶
func (c *ClusterTransport) SendDelta(ctx context.Context, gatewayID string, d cluster.GatewayDelta) error
type Option ¶
type Option func(*Adapter)
Option tunes the adapter.
func WithChannelMaxLen ¶
WithChannelMaxLen sets the approximate per-channel stream cap (default 500): the catch-up depth before clients must fall back to refetch.
func WithEventsMaxLen ¶
WithEventsMaxLen caps the main event stream (default 1,000,000 — projections that fall further behind rebuild from Postgres, which is always possible by design).