redis

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package redis is fabriq's Redis adapter: event fan-out over Streams (publisher for the relay, tailer for the hub, consumer groups for projections), versioned-prefix caching, and ephemeral presence pub/sub.

It deliberately uses go-redis directly (not grove's kv driver): the stream paths need MAXLEN~ trimming, XAUTOCLAIM and blocking group reads that the kv abstraction does not expose — see docs/decisions/0003-redis-client.md. The import is fenced to adapters/ by depguard.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Adapter

type Adapter struct {
	// contains filtered or unexported fields
}

Adapter wraps one Redis client with fabriq's stream/cache/pubsub surfaces.

func Open

func Open(ctx context.Context, cfg Config, opts ...Option) (*Adapter, error)

Open dials Redis and pings it.

func (*Adapter) Cache

func (a *Adapter) Cache(modelVersion int) *Cache

Cache returns a cache view for the given projection model version.

func (*Adapter) Close

func (a *Adapter) Close() error

Close releases the client.

func (*Adapter) Cluster

func (a *Adapter) Cluster(ttl time.Duration) *ClusterTransport

Cluster returns the Redis-backed cluster transport. ttl is the heartbeat liveness window (default 6s); a shard must refresh well within it.

func (*Adapter) Consume

func (a *Adapter) Consume(ctx context.Context, group, consumer string, handle func(streamID string, env event.Envelope) error) error

Consume runs a consumer-group loop on the event stream: claimed-pending entries first (crash recovery), then new entries. Handler success acks; handler failure leaves the entry pending for redelivery (at-least-once). Returns when ctx ends.

func (*Adapter) EnsureGroup

func (a *Adapter) EnsureGroup(ctx context.Context, group string) error

EnsureGroup creates a projection consumer group on the event stream (idempotent), starting from the beginning so a new projection replays what the stream still holds.

func (*Adapter) GroupLag

func (a *Adapter) GroupLag(ctx context.Context, group string) (int64, error)

GroupLag reports how many event-stream entries a consumer group has not yet processed (Redis 7 XINFO GROUPS lag + pending).

func (*Adapter) Publish

func (a *Adapter) Publish(ctx context.Context, env event.Envelope, channels []string) (string, error)

Publish implements event.Publisher: one XADD to the main event stream (consumed by projection groups) plus one per derived change channel (short MAXLEN~; reconnecting clients catch up from Last-Event-ID or fall back to refetch).

func (*Adapter) PublishPresence

func (a *Adapter) PublishPresence(ctx context.Context, room string, payload []byte) error

PublishPresence sends one awareness payload to a room.

func (*Adapter) PublishToChannel

func (a *Adapter) PublishToChannel(ctx context.Context, channel string, env event.Envelope) (string, error)

PublishToChannel XADDs one envelope to a SINGLE channel stream — the document-sync fan-out: sync frames never touch the main event stream (projections must not see them).

func (*Adapter) ReadRange

func (a *Adapter) ReadRange(ctx context.Context, channel, afterID string, limit int) ([]query.Delta, error)

ReadRange implements subscribe.Tailer's catch-up read: entries strictly after afterID ("0" reads from the beginning).

func (*Adapter) SubscribePresence

func (a *Adapter) SubscribePresence(ctx context.Context, room string, fn func([]byte), ready chan<- struct{}) error

SubscribePresence delivers room payloads to fn until ctx ends. ready is closed once the subscription is active (pass nil if not needed).

func (*Adapter) Tail

func (a *Adapter) Tail(ctx context.Context, channel, fromID string, deliver func(query.Delta)) error

Tail implements subscribe.Tailer: blocking XREAD on one change channel, delivering every entry after fromID until ctx ends.

func (*Adapter) TailEvents

func (a *Adapter) TailEvents(ctx context.Context, handle func(event.Envelope) error) error

TailEvents broadcasts the main event stream to THIS caller from "now": every node calling TailEvents sees every committed envelope (fan-out, not a consumer group). Used for per-node L1 cache eviction. Blocks until ctx ends.

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

Cache is a tenant-scoped, version-prefixed byte cache. Keys look like fabriq:v3:{tenant}:{entity}:{id}; bumping the model version makes every stale entry invisible at once (no mass deletes), and the tenant segment comes exclusively from the context.

func (*Cache) Delete

func (c *Cache) Delete(ctx context.Context, entity, id string) error

Delete removes a value.

func (*Cache) Get

func (c *Cache) Get(ctx context.Context, entity, id string) (val []byte, ok bool, err error)

Get reads a cached value; ok=false on miss.

func (*Cache) Set

func (c *Cache) Set(ctx context.Context, entity, id string, val []byte, ttl time.Duration) error

Set stores a value with a TTL.

type ClusterTransport

type ClusterTransport struct {
	// contains filtered or unexported fields
}

ClusterTransport backs the sharded live query tier on Redis: shard liveness via TTL heartbeat keys (cluster.Membership), gateway→shard control via per-shard request streams (cluster.ControlBus), and shard→gateway deltas via per-gateway streams (cluster.DeltaBus).

func (*ClusterTransport) Control

func (c *ClusterTransport) Control(ctx context.Context, shardID string) (events <-chan cluster.Control, stop func(), retErr error)

func (*ClusterTransport) Deltas

func (c *ClusterTransport) Deltas(ctx context.Context, gatewayID string) (events <-chan cluster.GatewayDelta, stop func(), retErr error)

func (*ClusterTransport) Heartbeat

func (c *ClusterTransport) Heartbeat(ctx context.Context, shardID string) error

func (*ClusterTransport) Leave

func (c *ClusterTransport) Leave(ctx context.Context, shardID string) error

func (*ClusterTransport) LiveShards

func (c *ClusterTransport) LiveShards(ctx context.Context) ([]string, error)

func (*ClusterTransport) SendControl

func (c *ClusterTransport) SendControl(ctx context.Context, shardID string, ctrl cluster.Control) error

func (*ClusterTransport) SendDelta

func (c *ClusterTransport) SendDelta(ctx context.Context, gatewayID string, d cluster.GatewayDelta) error

type Config

type Config struct {
	Addr     string
	DB       int
	Username string
	Password string
}

Config locates the Redis instance.

type Option

type Option func(*Adapter)

Option tunes the adapter.

func WithChannelMaxLen

func WithChannelMaxLen(n int64) Option

WithChannelMaxLen sets the approximate per-channel stream cap (default 500): the catch-up depth before clients must fall back to refetch.

func WithEventsMaxLen

func WithEventsMaxLen(n int64) Option

WithEventsMaxLen caps the main event stream (default 1,000,000 — projections that fall further behind rebuild from Postgres, which is always possible by design).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL