Documentation ¶
Overview ¶
Package redis is celeris's native Redis driver. It speaks RESP2 and RESP3 directly on top of the celeris event loop using the same async-bridge architecture as the celeris PostgreSQL driver: one in-flight request enqueued per conn, responses demuxed from the loop's recv callback, and per-worker idle pools so handlers stay on the same CPU that dispatched them.
Usage ¶
client, err := redis.NewClient("localhost:6379",
	redis.WithEngine(srv), // optional; omit for a standalone loop
	redis.WithPassword("secret"),
	redis.WithDB(0),
)
if err != nil {
	return err
}
defer client.Close()
v, err := client.Get(ctx, "key")
WithEngine routes the driver's FDs through the same epoll/io_uring instance as the HTTP workers. When it is omitted, a package-level standalone loop is resolved and reference-counted (driver/internal/eventloop).
Connections are lazily dialed on first command. The first dial does the RESP3 handshake (HELLO 3 + AUTH + SETNAME), falls back to RESP2 + AUTH + SELECT if HELLO is rejected, or speaks RESP2 unconditionally when WithForceRESP2 is set (required for ElastiCache classic clusters and for older servers that do not implement HELLO).
Commands ¶
The typed API on Client covers:
- Strings: Get, GetBytes, Set, SetNX, Del, Exists, Incr, Decr, MGet, Expire, PExpire, ExpireAt, PExpireAt, Persist, TTL.
- Hashes: HGet, HSet, HDel, HGetAll, HExists, HKeys, HVals.
- Lists: LPush, RPush, LPop, RPop, LRange, LLen.
- Sets: SAdd, SRem, SMembers, SIsMember, SCard.
- Sorted sets: ZAdd, ZRange, ZRangeByScore, ZRem, ZScore, ZCard.
- Keys: Type, Rename, RandomKey, Scan (cursor iteration).
- Pub/Sub: Publish, Subscribe, PSubscribe (see below).
- Scripting: Eval, EvalSHA, ScriptLoad.
- Watch for optimistic locking (pins a conn for the callback).
Any command the typed surface does not expose can be sent via Client.Do, which accepts ...any args, converts them through [argify], and returns a *protocol.Value plus a server-side error as *RedisError.
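To make the argument conversion concrete, here is a minimal sketch of how a variadic argument list could be stringified and framed as a RESP array of bulk strings. The names stringify and encodeCommand are hypothetical and are not the driver's internal argify; the set of supported Go types is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// stringify converts one argument to its wire form.
// Hypothetical stand-in for the driver's argify helper; the type set is assumed.
func stringify(arg any) string {
	switch v := arg.(type) {
	case string:
		return v
	case []byte:
		return string(v)
	case int:
		return strconv.Itoa(v)
	case int64:
		return strconv.FormatInt(v, 10)
	case float64:
		return strconv.FormatFloat(v, 'f', -1, 64)
	case bool:
		if v {
			return "1"
		}
		return "0"
	default:
		return fmt.Sprint(v)
	}
}

// encodeCommand renders args as a RESP array of bulk strings:
// "*<count>\r\n" followed by "$<len>\r\n<payload>\r\n" per argument.
func encodeCommand(args ...any) string {
	var b strings.Builder
	b.WriteString("*" + strconv.Itoa(len(args)) + "\r\n")
	for _, a := range args {
		s := stringify(a)
		b.WriteString("$" + strconv.Itoa(len(s)) + "\r\n" + s + "\r\n")
	}
	return b.String()
}

func main() {
	// %q makes the CRLF framing visible.
	fmt.Printf("%q\n", encodeCommand("SET", "k", 42))
}
```

Every argument travels as a bulk string regardless of its Go type, which is why the server sees the integer 42 as the two bytes "42".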
Pipeline ¶
Client.Pipeline batches commands into a single network write. Each queued call returns a typed handle (*StringCmd, *IntCmd, ...); the caller then invokes Pipeline.Exec to flush and harvest the replies:
p := client.Pipeline()
a := p.Set("k", "v", 0)
b := p.Incr("counter")
if err := p.Exec(ctx); err != nil { ... }
_, _ = a.Result()
n, _ := b.Result()
All commands in one Exec ride the same connection, so replies are returned in the same order as commands were enqueued. Replies are detached from the reader's scratch buffer before the conn is released, so each Result() call is safe to retain.
Transactions ¶
Typed MULTI/EXEC transactions are built via Client.TxPipeline. Queued commands return deferred *Cmd handles that populate after Tx.Exec:
tx, _ := client.TxPipeline(ctx)
defer tx.Discard() // no-op after a successful Exec
a := tx.Incr("visits")
b := tx.Incr("uniques")
if err := tx.Exec(ctx); err != nil { ... }
va, _ := a.Result()
vb, _ := b.Result()
Exec sends MULTI + the buffered commands + EXEC in a single write. If a WATCHed key changed (EXEC returns a null array), every queued *Cmd.Result() returns ErrTxAborted. WATCH / UNWATCH are available on the Tx itself and must be called before any command is queued.
Raw MULTI/EXEC via Client.Do also works; the pool tracks the MULTI/EXEC/DISCARD state on the pinned conn so a premature release issues a DISCARD before the conn returns to the idle list.
Pub/Sub ¶
Client.Subscribe and Client.PSubscribe open a dedicated subscriber connection. Messages arrive on PubSub.Channel:
ps, _ := client.Subscribe(ctx, "events")
defer ps.Close()
for m := range ps.Channel() { handle(m) }
On transport error the driver automatically reconnects with exponential backoff and replays the tracked SUBSCRIBE/PSUBSCRIBE list. Messages that arrive while the connection is down are lost — delivery is at-most-once. The channel buffer defaults to 256; when it fills, [PubSub.deliver] drops the oldest message to make room and bumps PubSub.Drops.
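The drop-oldest policy described above can be sketched with a plain buffered channel. The mailbox type and its deliver method are hypothetical illustrations, not the driver's PubSub internals, and the sketch assumes a single producer goroutine.

```go
package main

import "fmt"

// mailbox is a hypothetical stand-in for a subscriber's buffered channel
// plus its drop counter.
type mailbox struct {
	ch    chan string
	drops uint64
}

func newMailbox(size int) *mailbox {
	return &mailbox{ch: make(chan string, size)}
}

// deliver enqueues msg. When the buffer is full it discards the oldest
// queued message, counts the drop, and retries the send.
// Assumes exactly one goroutine calls deliver.
func (m *mailbox) deliver(msg string) {
	for {
		select {
		case m.ch <- msg:
			return
		default:
		}
		select {
		case <-m.ch: // evict the oldest message to make room
			m.drops++
		default: // a consumer drained the buffer in the meantime; retry
		}
	}
}

func main() {
	m := newMailbox(2)
	for _, s := range []string{"a", "b", "c"} {
		m.deliver(s)
	}
	fmt.Println(<-m.ch, <-m.ch, m.drops) // prints "b c 1"
}
```

Dropping the oldest message keeps the producer from blocking on a slow consumer, at the cost of losing history; a consumer can watch the drop counter to detect that it is falling behind.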
RESP2 vs RESP3 ¶
RESP3 is negotiated with HELLO 3 during the handshake and brings richer reply types (Map, Set, Double, Bool, Verbatim, BigNumber, Push) that the driver surfaces via the protocol.Value type-tag enum. When the server rejects HELLO the driver transparently downgrades to RESP2 and speaks AUTH + SELECT, so almost all callers can leave the default alone. WithForceRESP2 pins the connection to RESP2 for deployments that must avoid HELLO entirely. Client.Proto reports the negotiated version.
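As a rough illustration of what a type tag distinguishes, the sketch below maps the first byte of a RESP frame to a reply kind. The prefix bytes come from the RESP2 and RESP3 wire formats; the kind names and the functions kindOf and isRESP3Only are illustrative and are not the driver's protocol.Value constants.

```go
package main

import (
	"fmt"
	"strings"
)

// kindOf maps the first byte of a RESP frame to a reply kind name.
// The names are illustrative, not the driver's tag constants.
func kindOf(prefix byte) string {
	switch prefix {
	case '+':
		return "SimpleString"
	case '-':
		return "Error"
	case ':':
		return "Integer"
	case '$':
		return "BulkString"
	case '*':
		return "Array"
	case '_':
		return "Null"
	case '#':
		return "Bool"
	case ',':
		return "Double"
	case '(':
		return "BigNumber"
	case '=':
		return "Verbatim"
	case '%':
		return "Map"
	case '~':
		return "Set"
	case '>':
		return "Push"
	default:
		return "Unknown"
	}
}

// isRESP3Only reports whether the prefix was introduced by RESP3.
func isRESP3Only(prefix byte) bool {
	return strings.IndexByte("_#,(=%~>", prefix) >= 0
}

func main() {
	for _, frame := range []string{"+OK\r\n", "%2\r\n", ">3\r\n"} {
		fmt.Println(kindOf(frame[0]), isRESP3Only(frame[0]))
	}
}
```

On a RESP2 connection only the first five kinds can appear, so a map-shaped reply such as HGETALL arrives as a flat array instead.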
Zero-copy semantics ¶
The RESP reader returns protocol.Value structs whose Str / Array / Map fields alias the reader's internal buffer. To keep hot-path reads allocation-free, the driver does NOT copy on every reply. Instead:
- Typed accessors on Client (Get, HGet, MGet, ...) return freshly allocated Go strings / slices / maps — the copy happens inside the decode helper, so the caller never sees aliased bytes.
- Pipeline results are detached (deep-copied) before the conn returns to the idle pool, so each *Cmd.Result() is independent.
- Client.Do returns a *protocol.Value that has already been detached from the reader buffer; callers can retain it freely.
The only place raw aliasing is observable is inside custom dispatch routines that reach into the internal protocol APIs directly — typed callers are always safe.
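The hazard that detaching avoids can be reproduced with a toy reader that reuses one scratch buffer. The reader type and the detach helper below are hypothetical; they only mimic the aliasing behaviour described above and say nothing about how the driver's reader is actually built.

```go
package main

import "fmt"

// reader is a toy stand-in for a RESP reader that reuses one scratch buffer.
type reader struct{ scratch []byte }

// next overwrites the scratch buffer with payload and returns a slice
// that aliases it (no copy).
func (r *reader) next(payload string) []byte {
	r.scratch = append(r.scratch[:0], payload...)
	return r.scratch
}

// detach returns a copy that no longer shares memory with the reader.
func detach(b []byte) []byte {
	return append([]byte(nil), b...)
}

// demo reads one reply, keeps both an aliased and a detached view of it,
// then reads a second reply into the same buffer.
func demo() (aliased, detached string) {
	r := &reader{scratch: make([]byte, 0, 16)}
	first := r.next("hello")
	safe := detach(first)
	r.next("WORLD") // reuses the same backing array
	return string(first), string(safe)
}

func main() {
	aliased, detached := demo()
	fmt.Println(aliased, detached) // prints "WORLD hello"
}
```

The aliased slice silently changes once the next reply is read, while the detached copy keeps its original contents; that is the guarantee the typed accessors give by copying inside the decode helper.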
Errors ¶
Server-side error replies surface as *RedisError with Prefix (e.g. "WRONGTYPE", "ERR", "MOVED") and full Msg. Sentinels are wired through RedisError.Is:
- ErrNil null bulk reply (GET on missing key, LPOP on empty list).
- ErrClosed Client or PubSub after Close.
- ErrProtocol reply did not parse.
- ErrMoved cluster redirect (not followed by the single-node Client; ClusterClient handles it transparently).
- ErrWrongType operation on a key of the wrong kind.
- ErrPoolExhausted all idle conns are stale (rare; pool blocks when full).
- ErrTxAborted WATCH / EXEC aborted the transaction.
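One way an error type can route errors.Is checks to sentinels by prefix is sketched below. The serverError type and the lower-case sentinels are hypothetical stand-ins that mirror the shape described above; the driver's actual RedisError.Is logic may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package sentinels.
var (
	errWrongType = errors.New("wrong type")
	errMoved     = errors.New("moved")
)

// serverError mirrors the described shape: a prefix plus the full message.
type serverError struct {
	Prefix string
	Msg    string
}

func (e *serverError) Error() string { return e.Prefix + " " + e.Msg }

// Is lets errors.Is match a sentinel by comparing the reply prefix.
func (e *serverError) Is(target error) bool {
	switch target {
	case errWrongType:
		return e.Prefix == "WRONGTYPE"
	case errMoved:
		return e.Prefix == "MOVED"
	}
	return false
}

// classify shows caller-side handling; it also works through wrapped errors.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errWrongType):
		return "wrong-type"
	case errors.Is(err, errMoved):
		return "moved"
	default:
		return "other"
	}
}

func main() {
	err := fmt.Errorf("get: %w", &serverError{Prefix: "WRONGTYPE", Msg: "Operation against a key holding the wrong kind of value"})
	fmt.Println(classify(err)) // prints "wrong-type"
}
```

Matching through errors.Is rather than string comparison keeps call sites stable even when the error is wrapped with extra context.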
WithEngine and standalone operation ¶
WithEngine is optional. When omitted, NewClient resolves a standalone event loop backed by the platform's best mechanism (epoll on Linux, goroutine-per-conn on Darwin). The standalone loop is reference-counted inside driver/internal/eventloop and shared across all drivers that omit WithEngine. Correctness is identical with or without WithEngine; the difference is performance: sharing the HTTP server's event loop saves one epoll/uring syscall per I/O because driver FDs land on the same worker goroutine as HTTP handlers, improving data locality. Expect ~5-20% lower latency for serial queries when WithEngine is used.
Pipeline.Release lifetime ¶
Pipeline.Release returns the Pipeline — and all of its internal slabs — to a sync.Pool for reuse. After Release, every typed Cmd handle (*StringCmd, *IntCmd, *StatusCmd, *FloatCmd, *BoolCmd) previously returned from the Pipeline is invalid. Calling Result() on an invalidated handle returns ErrClosed. Release is optional; un-Released Pipelines are GC'd normally. For tight hot paths, pooling via Release eliminates per-request slab allocs.
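The lifetime rule can be illustrated with a generation counter: handles remember the generation they were issued under, and releasing the owner bumps it. This is only one possible mechanism, shown under that assumption; the batch and handle types are hypothetical and the sketch is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed is a local stand-in for the package's ErrClosed.
var errClosed = errors.New("closed")

// batch is a hypothetical pooled object that owns its result storage.
type batch struct {
	gen     uint64
	results []int64
}

// handle refers back to a slot in a batch and remembers the generation
// it was issued under.
type handle struct {
	b   *batch
	gen uint64
	idx int
}

var batchPool = sync.Pool{New: func() any { return new(batch) }}

func acquire() *batch { return batchPool.Get().(*batch) }

// queue stores a value and returns a handle to it.
func (b *batch) queue(v int64) handle {
	b.results = append(b.results, v)
	return handle{b: b, gen: b.gen, idx: len(b.results) - 1}
}

// release invalidates all outstanding handles and returns the batch,
// including its backing storage, to the pool.
func (b *batch) release() {
	b.gen++
	b.results = b.results[:0]
	batchPool.Put(b)
}

// result fails once the owning batch has been released.
func (h handle) result() (int64, error) {
	if h.gen != h.b.gen {
		return 0, errClosed
	}
	return h.b.results[h.idx], nil
}

func main() {
	b := acquire()
	h := b.queue(7)
	fmt.Println(h.result())
	b.release()
	_, err := h.result()
	fmt.Println(err)
}
```

The practical consequence for callers is the same as described above: read every Result() you need before calling Release.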
SCAN usage ¶
Cursor-based iteration is exposed via Client.Scan:
it := client.Scan(ctx, "user:*", 100)
for {
	key, ok := it.Next(ctx)
	if !ok {
		break
	}
	// process key
}
if err := it.Err(); err != nil {
	log.Fatal(err)
}
The ScanIterator handles cursor paging internally. The match pattern is passed as MATCH and the count hint as COUNT. Both are optional — pass "" and 0 to iterate all keys. The iterator is NOT safe for concurrent use.
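Cursor paging of this kind can be sketched independently of the driver. In the example below, pageFunc stands in for one SCAN round trip and iterator is a hypothetical type; neither is the driver's ScanIterator. The sketch relies on two documented SCAN properties: a returned cursor of 0 ends the iteration, and a page may be empty while the cursor is still non-zero.

```go
package main

import "fmt"

// pageFunc stands in for one SCAN round trip: it takes a cursor and returns
// a batch of keys plus the next cursor (0 means the iteration is complete).
type pageFunc func(cursor uint64) (keys []string, next uint64, err error)

// iterator is a hypothetical cursor-paging iterator.
type iterator struct {
	fetch   pageFunc
	cursor  uint64
	batch   []string
	started bool
	err     error
}

// Next returns the next key, fetching further pages as needed.
// It returns false when the iteration is finished or a fetch failed.
func (it *iterator) Next() (string, bool) {
	for len(it.batch) == 0 {
		if it.err != nil || (it.started && it.cursor == 0) {
			return "", false
		}
		it.started = true
		it.batch, it.cursor, it.err = it.fetch(it.cursor)
		if it.err != nil {
			return "", false
		}
	}
	key := it.batch[0]
	it.batch = it.batch[1:]
	return key, true
}

// Err reports the first fetch error, if any.
func (it *iterator) Err() error { return it.err }

// fakePages simulates a server: cursor 0 -> 7 -> 3 -> 0, with one empty page.
func fakePages(cursor uint64) ([]string, uint64, error) {
	switch cursor {
	case 0:
		return []string{"a", "b"}, 7, nil
	case 7:
		return nil, 3, nil
	default:
		return []string{"c"}, 0, nil
	}
}

// collect drains an iterator built on fetch into a slice.
func collect(fetch pageFunc) []string {
	it := &iterator{fetch: fetch}
	var keys []string
	for {
		k, ok := it.Next()
		if !ok {
			return keys
		}
		keys = append(keys, k)
	}
}

func main() {
	fmt.Println(collect(fakePages)) // prints "[a b c]"
}
```

The iterator keeps mutable state (cursor and current batch) without locking, which is the usual reason such iterators are not safe for concurrent use.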
Watch (optimistic locking) ¶
Client.Watch pins a connection, WATCHes the given keys, and invokes fn with a Tx that queues commands under MULTI/EXEC. If a WATCHed key is modified before EXEC, the transaction aborts with ErrTxAborted and the caller can retry:
err := client.Watch(ctx, func(tx *redis.Tx) error {
	val, err := client.Get(ctx, "counter")
	if err != nil {
		return err
	}
	n, err := strconv.Atoi(val)
	if err != nil {
		return err
	}
	tx.Set("counter", strconv.Itoa(n+1), 0)
	return tx.Exec(ctx)
}, "counter")
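The retry that the caller is expected to perform can be sketched as a small bounded loop. retryOnAbort and errTxAborted are hypothetical stand-ins (the latter for ErrTxAborted); in real code the attempt function would wrap the Client.Watch call.

```go
package main

import (
	"errors"
	"fmt"
)

// errTxAborted is a local stand-in for the package's ErrTxAborted.
var errTxAborted = errors.New("transaction aborted")

// retryOnAbort runs attempt until it succeeds, fails with an error other
// than an abort, or maxAttempts is reached. It returns the number of
// attempts made and the final error.
func retryOnAbort(maxAttempts int, attempt func() error) (int, error) {
	var err error
	for i := 1; i <= maxAttempts; i++ {
		err = attempt()
		if !errors.Is(err, errTxAborted) {
			return i, err
		}
	}
	return maxAttempts, err
}

func main() {
	calls := 0
	n, err := retryOnAbort(5, func() error {
		calls++
		if calls < 3 {
			return errTxAborted // simulate two lost races
		}
		return nil
	})
	fmt.Println(n, err) // prints "3 <nil>"
}
```

Bounding the number of attempts matters under heavy contention, where an unbounded loop could spin indefinitely; a short backoff between attempts is a common addition.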
Push callbacks (client tracking) ¶
RESP3 push frames on command connections (e.g. CLIENT TRACKING invalidation messages) can be intercepted via WithOnPush or Client.OnPush:
client, _ := redis.NewClient("localhost:6379",
	redis.WithOnPush(func(channel string, data []protocol.Value) {
		log.Printf("push: %s %v", channel, data)
	}),
)
When no callback is registered, push frames on command connections are silently dropped. Push frames on pub/sub connections are always routed through the PubSub.Channel mechanism.
Cluster Transactions ¶
Redis Cluster requires all keys in a MULTI/EXEC transaction to reside in the same hash slot. Cross-slot transactions are impossible by design — the server returns -CROSSSLOT. The celeris driver validates slot affinity client-side and returns ErrCrossSlot before contacting the server when keys span multiple slots.
Use hash tags to colocate related keys on the same slot:
user:{123}:name → hashes only "123" → slot X
user:{123}:email → hashes only "123" → slot X
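The slot computation behind hash tags follows the Redis Cluster specification: CRC-16/XMODEM over the key, or over the substring between the first "{" and the next "}" when that substring is non-empty, modulo 16384. The sketch below is an independent illustration with hypothetical names crc16 and hashSlot; it is not the driver's Slot function.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 computes CRC-16/XMODEM (polynomial 0x1021, initial value 0),
// the checksum Redis Cluster uses for key hashing.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashSlot returns the cluster slot (0..16383) for key, honoring hash tags:
// if the key contains "{...}" with at least one character inside, only that
// inner substring is hashed.
func hashSlot(key string) uint16 {
	if open := strings.IndexByte(key, '{'); open >= 0 {
		if end := strings.IndexByte(key[open+1:], '}'); end > 0 {
			key = key[open+1 : open+1+end]
		}
	}
	return crc16([]byte(key)) % 16384
}

func main() {
	fmt.Println(hashSlot("user:{123}:name") == hashSlot("user:{123}:email")) // prints "true"
	fmt.Println(hashSlot("foo"))
}
```

An empty tag such as "{}" does not count, so the whole key is hashed in that case.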
ClusterClient.TxPipeline returns a ClusterTx that queues commands and verifies all keys target the same slot:
tx := cluster.TxPipeline()
a := tx.Set("{user:123}:name", "Alice", 0)
b := tx.Set("{user:123}:email", "alice@example.com", 0)
if err := tx.Exec(ctx); err != nil { ... }
ClusterClient.Watch validates slot affinity for the watched keys and delegates to the target node's Client.Watch:
err := cluster.Watch(ctx, func(tx *redis.Tx) error {
	tx.Incr("{user:123}:visits")
	return tx.Exec(ctx)
}, "{user:123}:visits")
Known limitations (v1.4.0) ¶
- TLS (rediss://) is not supported in v1.4.0; the scheme is rejected in NewClient with a clear error. Deploy over VPC, loopback, or a sidecar TLS terminator. TLS support is planned for v1.4.x.
- Cluster (ClusterClient) and Sentinel (SentinelClient) are supported. MOVED/ASK redirects are handled transparently by ClusterClient; Sentinel auto-discovers the master and handles failovers via +switch-master. Cluster pipeline splits commands by slot and executes per-node sub-pipelines in parallel. SSUBSCRIBE (shard channels, Redis 7+) is not supported; use regular SUBSCRIBE which is cluster-wide.
- No wrappers for RedisJSON, RediSearch, RedisGraph, or RedisTimeSeries — use Client.Do with the raw command strings.
- Pub/Sub auto-reconnects but messages delivered while the conn is down are lost. Callers needing at-least-once should add server-side durability (Streams + consumer groups).
- database/sql is not implemented — Redis is not a SQL database.
- Read/write timeouts are advisory today; cancellation is via ctx.
Index ¶
- Variables
- func Slot(key string) uint16
- func WithWorker(ctx context.Context, workerID int) context.Context
- type BoolCmd
- type Client
- func (c *Client) Append(ctx context.Context, key string, value string) (int64, error)
- func (c *Client) Close() error
- func (c *Client) DBSize(ctx context.Context) (int64, error)
- func (c *Client) Decr(ctx context.Context, key string) (int64, error)
- func (c *Client) DecrBy(ctx context.Context, key string, val int64) (int64, error)
- func (c *Client) Del(ctx context.Context, keys ...string) (int64, error)
- func (c *Client) Do(ctx context.Context, args ...any) (*protocol.Value, error)
- func (c *Client) DoBool(ctx context.Context, args ...any) (bool, error)
- func (c *Client) DoInt(ctx context.Context, args ...any) (int64, error)
- func (c *Client) DoSlice(ctx context.Context, args ...any) ([]string, error)
- func (c *Client) DoString(ctx context.Context, args ...any) (string, error)
- func (c *Client) Eval(ctx context.Context, script string, keys []string, args ...any) (*protocol.Value, error)
- func (c *Client) EvalSHA(ctx context.Context, sha string, keys []string, args ...any) (*protocol.Value, error)
- func (c *Client) Exists(ctx context.Context, keys ...string) (int64, error)
- func (c *Client) Expire(ctx context.Context, key string, expiration time.Duration) (bool, error)
- func (c *Client) ExpireAt(ctx context.Context, key string, at time.Time) (bool, error)
- func (c *Client) FlushDB(ctx context.Context) error
- func (c *Client) Get(ctx context.Context, key string) (string, error)
- func (c *Client) GetBytes(ctx context.Context, key string) ([]byte, error)
- func (c *Client) GetDel(ctx context.Context, key string) (string, error)
- func (c *Client) HDel(ctx context.Context, key string, fields ...string) (int64, error)
- func (c *Client) HExists(ctx context.Context, key, field string) (bool, error)
- func (c *Client) HGet(ctx context.Context, key, field string) (string, error)
- func (c *Client) HGetAll(ctx context.Context, key string) (map[string]string, error)
- func (c *Client) HIncrBy(ctx context.Context, key, field string, incr int64) (int64, error)
- func (c *Client) HIncrByFloat(ctx context.Context, key, field string, incr float64) (float64, error)
- func (c *Client) HKeys(ctx context.Context, key string) ([]string, error)
- func (c *Client) HLen(ctx context.Context, key string) (int64, error)
- func (c *Client) HMGet(ctx context.Context, key string, fields ...string) ([]string, error)
- func (c *Client) HSet(ctx context.Context, key string, values ...any) (int64, error)
- func (c *Client) HSetNX(ctx context.Context, key, field string, value any) (bool, error)
- func (c *Client) HVals(ctx context.Context, key string) ([]string, error)
- func (c *Client) IdleConnWorkers() []int
- func (c *Client) Incr(ctx context.Context, key string) (int64, error)
- func (c *Client) IncrBy(ctx context.Context, key string, val int64) (int64, error)
- func (c *Client) IncrByFloat(ctx context.Context, key string, val float64) (float64, error)
- func (c *Client) LIndex(ctx context.Context, key string, index int64) (string, error)
- func (c *Client) LLen(ctx context.Context, key string) (int64, error)
- func (c *Client) LPop(ctx context.Context, key string) (string, error)
- func (c *Client) LPush(ctx context.Context, key string, values ...any) (int64, error)
- func (c *Client) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (c *Client) LRem(ctx context.Context, key string, count int64, element any) (int64, error)
- func (c *Client) MGet(ctx context.Context, keys ...string) ([]string, error)
- func (c *Client) MSet(ctx context.Context, pairs ...any) error
- func (c *Client) OnPush(fn func(channel string, data []protocol.Value))
- func (c *Client) PExpire(ctx context.Context, key string, ms time.Duration) (bool, error)
- func (c *Client) PExpireAt(ctx context.Context, key string, at time.Time) (bool, error)
- func (c *Client) PSubscribe(ctx context.Context, patterns ...string) (*PubSub, error)
- func (c *Client) PTTL(ctx context.Context, key string) (time.Duration, error)
- func (c *Client) Persist(ctx context.Context, key string) (bool, error)
- func (c *Client) Ping(ctx context.Context) error
- func (c *Client) Pipeline() *Pipeline
- func (c *Client) Proto() int
- func (c *Client) Publish(ctx context.Context, channel, message string) (int64, error)
- func (c *Client) RPop(ctx context.Context, key string) (string, error)
- func (c *Client) RPush(ctx context.Context, key string, values ...any) (int64, error)
- func (c *Client) RandomKey(ctx context.Context) (string, error)
- func (c *Client) Rename(ctx context.Context, key, newKey string) error
- func (c *Client) SAdd(ctx context.Context, key string, members ...any) (int64, error)
- func (c *Client) SCard(ctx context.Context, key string) (int64, error)
- func (c *Client) SDiff(ctx context.Context, keys ...string) ([]string, error)
- func (c *Client) SInter(ctx context.Context, keys ...string) ([]string, error)
- func (c *Client) SIsMember(ctx context.Context, key string, member any) (bool, error)
- func (c *Client) SMembers(ctx context.Context, key string) ([]string, error)
- func (c *Client) SRem(ctx context.Context, key string, members ...any) (int64, error)
- func (c *Client) SUnion(ctx context.Context, keys ...string) ([]string, error)
- func (c *Client) Scan(_ context.Context, match string, count int64) *ScanIterator
- func (c *Client) ScriptLoad(ctx context.Context, script string) (string, error)
- func (c *Client) Set(ctx context.Context, key string, value any, expiration time.Duration) error
- func (c *Client) SetEX(ctx context.Context, key string, value any, ttl time.Duration) error
- func (c *Client) SetNX(ctx context.Context, key string, value any, expiration time.Duration) (bool, error)
- func (c *Client) Stats() async.PoolStats
- func (c *Client) Subscribe(ctx context.Context, channels ...string) (*PubSub, error)
- func (c *Client) TTL(ctx context.Context, key string) (time.Duration, error)
- func (c *Client) TxPipeline(ctx context.Context) (*Tx, error)
- func (c *Client) Type(ctx context.Context, key string) (string, error)
- func (c *Client) Watch(ctx context.Context, fn func(tx *Tx) error, keys ...string) error
- func (c *Client) ZAdd(ctx context.Context, key string, members ...Z) (int64, error)
- func (c *Client) ZCard(ctx context.Context, key string) (int64, error)
- func (c *Client) ZCount(ctx context.Context, key, minScore, maxScore string) (int64, error)
- func (c *Client) ZIncrBy(ctx context.Context, key string, incr float64, member string) (float64, error)
- func (c *Client) ZRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (c *Client) ZRangeByScore(ctx context.Context, key string, opt *ZRangeBy) ([]string, error)
- func (c *Client) ZRank(ctx context.Context, key, member string) (int64, error)
- func (c *Client) ZRem(ctx context.Context, key string, members ...any) (int64, error)
- func (c *Client) ZRevRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (c *Client) ZScore(ctx context.Context, key, member string) (float64, error)
- type ClusterClient
- func (c *ClusterClient) Close() error
- func (c *ClusterClient) Decr(ctx context.Context, key string) (int64, error)
- func (c *ClusterClient) Del(ctx context.Context, keys ...string) (int64, error)
- func (c *ClusterClient) Do(ctx context.Context, args ...any) (*protocol.Value, error)
- func (c *ClusterClient) Exists(ctx context.Context, keys ...string) (int64, error)
- func (c *ClusterClient) Expire(ctx context.Context, key string, expiration time.Duration) (bool, error)
- func (c *ClusterClient) ForEachNode(_ context.Context, fn func(*Client) error) error
- func (c *ClusterClient) Get(ctx context.Context, key string) (string, error)
- func (c *ClusterClient) GetBytes(ctx context.Context, key string) ([]byte, error)
- func (c *ClusterClient) HDel(ctx context.Context, key string, fields ...string) (int64, error)
- func (c *ClusterClient) HGet(ctx context.Context, key, field string) (string, error)
- func (c *ClusterClient) HGetAll(ctx context.Context, key string) (map[string]string, error)
- func (c *ClusterClient) HSet(ctx context.Context, key string, values ...any) (int64, error)
- func (c *ClusterClient) Incr(ctx context.Context, key string) (int64, error)
- func (c *ClusterClient) LPop(ctx context.Context, key string) (string, error)
- func (c *ClusterClient) LPush(ctx context.Context, key string, values ...any) (int64, error)
- func (c *ClusterClient) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (c *ClusterClient) Ping(ctx context.Context) error
- func (c *ClusterClient) Pipeline() *ClusterPipeline
- func (c *ClusterClient) Publish(ctx context.Context, channel, message string) (int64, error)
- func (c *ClusterClient) RPop(ctx context.Context, key string) (string, error)
- func (c *ClusterClient) RPush(ctx context.Context, key string, values ...any) (int64, error)
- func (c *ClusterClient) SAdd(ctx context.Context, key string, members ...any) (int64, error)
- func (c *ClusterClient) SMembers(ctx context.Context, key string) ([]string, error)
- func (c *ClusterClient) SPublish(ctx context.Context, channel, message string) (int64, error)
- func (c *ClusterClient) SSubscribe(ctx context.Context, channels ...string) (*PubSub, error)
- func (c *ClusterClient) Set(ctx context.Context, key string, value any, ttl time.Duration) error
- func (c *ClusterClient) SetNX(ctx context.Context, key string, value any, ttl time.Duration) (bool, error)
- func (c *ClusterClient) Subscribe(ctx context.Context, channels ...string) (*PubSub, error)
- func (c *ClusterClient) TTL(ctx context.Context, key string) (time.Duration, error)
- func (c *ClusterClient) TxPipeline() *ClusterTx
- func (c *ClusterClient) Watch(ctx context.Context, fn func(tx *Tx) error, keys ...string) error
- func (c *ClusterClient) ZAdd(ctx context.Context, key string, members ...Z) (int64, error)
- func (c *ClusterClient) ZRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- type ClusterConfig
- type ClusterPipeline
- type ClusterTx
- func (t *ClusterTx) Decr(key string) *IntCmd
- func (t *ClusterTx) Del(keys ...string) *IntCmd
- func (t *ClusterTx) Discard()
- func (t *ClusterTx) Exec(ctx context.Context) error
- func (t *ClusterTx) Exists(keys ...string) *IntCmd
- func (t *ClusterTx) Expire(key string, d time.Duration) *BoolCmd
- func (t *ClusterTx) Get(key string) *StringCmd
- func (t *ClusterTx) HGet(key, field string) *StringCmd
- func (t *ClusterTx) HSet(key string, values ...any) *IntCmd
- func (t *ClusterTx) Incr(key string) *IntCmd
- func (t *ClusterTx) LPush(key string, values ...any) *IntCmd
- func (t *ClusterTx) RPush(key string, values ...any) *IntCmd
- func (t *ClusterTx) SAdd(key string, members ...any) *IntCmd
- func (t *ClusterTx) Set(key string, value any, expiration time.Duration) *StatusCmd
- func (t *ClusterTx) ZAdd(key string, members ...Z) *IntCmd
- type Config
- type FloatCmd
- type IntCmd
- type Message
- type Option
- func WithDB(db int) Option
- func WithDialTimeout(d time.Duration) Option
- func WithEngine(sp eventloop.ServerProvider) Option
- func WithForceRESP2() Option
- func WithHealthCheckInterval(d time.Duration) Option
- func WithMaxIdlePerWorker(n int) Option
- func WithMaxIdleTime(d time.Duration) Option
- func WithMaxLifetime(d time.Duration) Option
- func WithOnPush(fn func(channel string, data []protocol.Value)) Option
- func WithPassword(s string) Option
- func WithPoolSize(n int) Option
- func WithProto(p int) Option
- func WithReadTimeout(d time.Duration) Option
- func WithUsername(s string) Option
- func WithWriteTimeout(d time.Duration) Option
- type Pipeline
- func (p *Pipeline) Decr(key string) *IntCmd
- func (p *Pipeline) Del(keys ...string) *IntCmd
- func (p *Pipeline) Discard()
- func (p *Pipeline) Exec(ctx context.Context) error
- func (p *Pipeline) Exists(keys ...string) *IntCmd
- func (p *Pipeline) Expire(key string, d time.Duration) *BoolCmd
- func (p *Pipeline) Get(key string) *StringCmd
- func (p *Pipeline) HGet(key, field string) *StringCmd
- func (p *Pipeline) HSet(key string, values ...any) *IntCmd
- func (p *Pipeline) Incr(key string) *IntCmd
- func (p *Pipeline) LPush(key string, values ...any) *IntCmd
- func (p *Pipeline) RPush(key string, values ...any) *IntCmd
- func (p *Pipeline) Release()
- func (p *Pipeline) SAdd(key string, members ...any) *IntCmd
- func (p *Pipeline) Set(key string, value any, expiration time.Duration) *StatusCmd
- func (p *Pipeline) ZAdd(key string, members ...Z) *IntCmd
- type PubSub
- func (ps *PubSub) Channel() <-chan *Message
- func (ps *PubSub) Close() error
- func (ps *PubSub) Drops() uint64
- func (ps *PubSub) PSubscribe(_ context.Context, patterns ...string) error
- func (ps *PubSub) PUnsubscribe(_ context.Context, patterns ...string) error
- func (ps *PubSub) SSubscribe(_ context.Context, channels ...string) error
- func (ps *PubSub) SUnsubscribe(_ context.Context, channels ...string) error
- func (ps *PubSub) Subscribe(_ context.Context, channels ...string) error
- func (ps *PubSub) Unsubscribe(_ context.Context, channels ...string) error
- type RedisError
- type ScanIterator
- type SentinelClient
- func (s *SentinelClient) Close() error
- func (s *SentinelClient) Decr(ctx context.Context, key string) (int64, error)
- func (s *SentinelClient) Del(ctx context.Context, keys ...string) (int64, error)
- func (s *SentinelClient) Do(ctx context.Context, args ...any) (*protocol.Value, error)
- func (s *SentinelClient) Exists(ctx context.Context, keys ...string) (int64, error)
- func (s *SentinelClient) Expire(ctx context.Context, key string, expiration time.Duration) (bool, error)
- func (s *SentinelClient) Get(ctx context.Context, key string) (string, error)
- func (s *SentinelClient) GetBytes(ctx context.Context, key string) ([]byte, error)
- func (s *SentinelClient) HDel(ctx context.Context, key string, fields ...string) (int64, error)
- func (s *SentinelClient) HGet(ctx context.Context, key, field string) (string, error)
- func (s *SentinelClient) HGetAll(ctx context.Context, key string) (map[string]string, error)
- func (s *SentinelClient) HSet(ctx context.Context, key string, values ...any) (int64, error)
- func (s *SentinelClient) Incr(ctx context.Context, key string) (int64, error)
- func (s *SentinelClient) LPop(ctx context.Context, key string) (string, error)
- func (s *SentinelClient) LPush(ctx context.Context, key string, values ...any) (int64, error)
- func (s *SentinelClient) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (s *SentinelClient) MGet(ctx context.Context, keys ...string) ([]string, error)
- func (s *SentinelClient) MSet(ctx context.Context, pairs ...any) error
- func (s *SentinelClient) Ping(ctx context.Context) error
- func (s *SentinelClient) Pipeline() *Pipeline
- func (s *SentinelClient) Publish(ctx context.Context, channel, message string) (int64, error)
- func (s *SentinelClient) RPop(ctx context.Context, key string) (string, error)
- func (s *SentinelClient) RPush(ctx context.Context, key string, values ...any) (int64, error)
- func (s *SentinelClient) SAdd(ctx context.Context, key string, members ...any) (int64, error)
- func (s *SentinelClient) SMembers(ctx context.Context, key string) ([]string, error)
- func (s *SentinelClient) Set(ctx context.Context, key string, value any, expiration time.Duration) error
- func (s *SentinelClient) SetNX(ctx context.Context, key string, value any, expiration time.Duration) (bool, error)
- func (s *SentinelClient) Stats() async.PoolStats
- func (s *SentinelClient) Subscribe(ctx context.Context, channels ...string) (*PubSub, error)
- func (s *SentinelClient) TTL(ctx context.Context, key string) (time.Duration, error)
- func (s *SentinelClient) ZAdd(ctx context.Context, key string, members ...Z) (int64, error)
- func (s *SentinelClient) ZRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- type SentinelConfig
- type StatusCmd
- type StringCmd
- type Tx
- func (tx *Tx) Decr(key string) *IntCmd
- func (tx *Tx) Del(keys ...string) *IntCmd
- func (tx *Tx) Discard() error
- func (tx *Tx) Exec(ctx context.Context) error
- func (tx *Tx) Exists(keys ...string) *IntCmd
- func (tx *Tx) Expire(key string, d time.Duration) *BoolCmd
- func (tx *Tx) Get(key string) *StringCmd
- func (tx *Tx) HGet(key, field string) *StringCmd
- func (tx *Tx) HSet(key string, values ...any) *IntCmd
- func (tx *Tx) Incr(key string) *IntCmd
- func (tx *Tx) LPush(key string, values ...any) *IntCmd
- func (tx *Tx) RPush(key string, values ...any) *IntCmd
- func (tx *Tx) SAdd(key string, members ...any) *IntCmd
- func (tx *Tx) Set(key string, value any, expiration time.Duration) *StatusCmd
- func (tx *Tx) Unwatch(ctx context.Context) error
- func (tx *Tx) Watch(ctx context.Context, keys ...string) error
- func (tx *Tx) ZAdd(key string, members ...Z) *IntCmd
- type Z
- type ZRangeBy
Constants ¶
This section is empty.
Variables ¶
var ErrAsk = errors.New("celeris-redis: ASK redirect")
ErrAsk is returned when the server responds with an -ASK redirect during a cluster slot migration.
var ErrClosed = errors.New("celeris-redis: client closed")
ErrClosed is returned when a command is issued against a closed Client or PubSub.
var ErrClusterMaxRedirects = errors.New("celeris-redis: cluster max redirects exceeded")
ErrClusterMaxRedirects is returned when the maximum number of MOVED/ASK redirects has been exhausted.
var ErrCrossSlot = errors.New("celeris-redis: CROSSSLOT keys in request don't hash to the same slot; use hash tags {}")
ErrCrossSlot is returned by ClusterTx and ClusterClient.Watch when the queued commands (or watched keys) span multiple hash slots. Redis Cluster requires all keys in a MULTI/EXEC transaction to reside in the same slot. Use hash tags (e.g. "{tag}") to colocate related keys.
var ErrMoved = errors.New("celeris-redis: MOVED redirect")
ErrMoved is returned when the server responds with a -MOVED redirect. When using ClusterClient, MOVED redirects are handled transparently; this sentinel is still useful for errors.Is checks on single-node Client.
var ErrNil = errors.New("celeris-redis: nil")
ErrNil is returned by commands whose reply is a null bulk — e.g. GET on a missing key, LPOP on an empty list, or HGET on a missing field.
var ErrPoolExhausted = errors.New("celeris-redis: pool exhausted")
ErrPoolExhausted is returned when no connection is available and none can be dialed within MaxOpen.
var ErrProtocol = errors.New("celeris-redis: protocol error")
ErrProtocol is returned when the server reply cannot be decoded.
var ErrSentinelUnhealthy = errors.New("celeris-redis: sentinel client unhealthy, failover dial failed")
ErrSentinelUnhealthy is returned by commands when the sentinel client failed to connect to the new master after a failover and is in an unhealthy state.
var ErrTxAborted = errors.New("celeris-redis: transaction aborted")
ErrTxAborted is returned from each typed *Cmd.Result() after a MULTI/EXEC transaction aborts — e.g. because a WATCHed key was modified between the WATCH call and EXEC, or the server rejected the transaction with -EXECABORT.
var ErrWrongType = errors.New("celeris-redis: WRONGTYPE operation against a key holding the wrong kind of value")
ErrWrongType matches the server's WRONGTYPE error reply for operations against a key of the wrong data type.
Functions ¶
Types ¶
type BoolCmd ¶
type BoolCmd struct {
// contains filtered or unexported fields
}
BoolCmd is a deferred bool reply.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the high-level handle users interact with. It owns a pool of connections and hands out per-command round trips.
func NewClient ¶
NewClient dials a pool at addr using the given options. Connections are established lazily on first command.
Example ¶
ExampleNewClient shows the basic Get/Set loop. The client is lazy: the first command triggers the TCP dial and the RESP3 (or RESP2) handshake.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/goceleris/celeris/driver/redis"
)

func main() {
	client, err := redis.NewClient("localhost:6379",
		redis.WithPassword("secret"),
		redis.WithDB(0),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = client.Close() }()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	if err := client.Set(ctx, "greeting", "hello, world", time.Minute); err != nil {
		log.Fatal(err)
	}
	v, err := client.Get(ctx, "greeting")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(v)
}
func (*Client) Do ¶
Do is an escape hatch for commands the typed API does not cover (OBJECT ENCODING, CLUSTER INFO, SCRIPT LOAD, XADD, FUNCTION, ...). Args are converted to strings via [argify]; the returned protocol.Value is detached from the Reader buffer and safe to retain. Server error replies surface as *RedisError via the returned error.
Example ¶
ExampleClient_Do is the escape hatch for commands the typed API does not cover. Args are stringified through the driver's internal argify helper; the reply is a detached protocol.Value safe to retain.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/goceleris/celeris/driver/redis"
)

func main() {
	client, err := redis.NewClient("localhost:6379")
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = client.Close() }()

	ctx := context.Background()

	// XADD stream * field value: returns the new entry ID as a bulk string.
	id, err := client.DoString(ctx, "XADD", "events", "*", "field", "value")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(id)
}
func (*Client) DoBool ¶
DoBool is a convenience wrapper around Client.Do that decodes the reply as a bool.
func (*Client) DoInt ¶
DoInt is a convenience wrapper around Client.Do that decodes the reply as an int64.
func (*Client) DoSlice ¶
DoSlice is a convenience wrapper around Client.Do that decodes the reply as a string slice.
func (*Client) DoString ¶
DoString is a convenience wrapper around Client.Do that decodes the reply as a string.
func (*Client) Eval ¶
func (c *Client) Eval(ctx context.Context, script string, keys []string, args ...any) (*protocol.Value, error)
Eval runs a Lua script server-side. keys and args may be empty; numkeys is inferred from len(keys). The returned *protocol.Value is detached and safe to retain.
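The EVAL wire layout is script, numkeys, the keys, then the extra arguments. The sketch below shows how numkeys can be derived from len(keys) when building that list; evalArgs is a hypothetical helper written for this illustration and is not part of the package.

```go
package main

import "fmt"

// evalArgs lays out the arguments of an EVAL command: the script, the
// number of keys, the keys themselves, then any extra arguments.
// It is an illustrative helper, not the driver's implementation.
func evalArgs(script string, keys []string, args ...any) []any {
	out := make([]any, 0, 3+len(keys)+len(args))
	out = append(out, "EVAL", script, len(keys)) // numkeys is inferred
	for _, k := range keys {
		out = append(out, k)
	}
	return append(out, args...)
}

func main() {
	fmt.Println(evalArgs("return redis.call('GET', KEYS[1])", []string{"greeting"}))
}
```

Inside the script, the keys are visible as KEYS[1..n] and the remaining values as ARGV[1..m].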
func (*Client) EvalSHA ¶
func (c *Client) EvalSHA(ctx context.Context, sha string, keys []string, args ...any) (*protocol.Value, error)
EvalSHA runs a previously-loaded script by sha1. Returns NOSCRIPT as *RedisError when the server does not have the script cached.
func (*Client) Expire ¶
Expire sets a TTL on a key. If expiration is zero or negative, Expire calls PERSIST instead (removes the TTL).
func (*Client) GetBytes ¶
GetBytes is the []byte variant of Get. The returned slice is a fresh copy.
func (*Client) HIncrByFloat ¶
func (c *Client) HIncrByFloat(ctx context.Context, key, field string, incr float64) (float64, error)
HIncrByFloat increments a hash field by a float delta.
func (*Client) HMGet ¶
HMGet returns the values of the specified fields in the hash at key. Missing fields yield empty strings.
func (*Client) HSet ¶
HSet sets one or more field/value pairs. values is a flat list [f1, v1, f2, v2, ...].
func (*Client) IdleConnWorkers ¶
IdleConnWorkers returns the Worker() IDs of every currently-idle command connection. Intended for tests and introspection asserting that per-CPU affinity is actually honored by the dial path.
func (*Client) IncrByFloat ¶
IncrByFloat increments key by val and returns the new value.
func (*Client) MSet ¶
MSet sets multiple key-value pairs atomically. pairs is a flat list [k1, v1, k2, v2, ...].
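Callers that hold their data in a map need to flatten it into the [k1, v1, k2, v2, ...] layout first. A minimal sketch, where flattenPairs is a hypothetical helper and the key sort exists only to make the output deterministic:

```go
package main

import (
	"fmt"
	"sort"
)

// flattenPairs turns a map into the flat [k1, v1, k2, v2, ...] layout.
// Keys are sorted only so that the result is deterministic.
func flattenPairs(m map[string]any) []any {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	pairs := make([]any, 0, 2*len(m))
	for _, k := range keys {
		pairs = append(pairs, k, m[k])
	}
	return pairs
}

func main() {
	pairs := flattenPairs(map[string]any{"b": 2, "a": "one"})
	fmt.Println(pairs) // prints "[a one b 2]"
}
```

Assuming Client.MSet takes the same variadic pairs ...any as SentinelClient.MSet, the result would be passed as pairs... at the call site.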
func (*Client) OnPush ¶
OnPush registers (or replaces) the push callback for RESP3 push frames arriving on command connections. It is safe to call concurrently with in-flight commands; delivery is best-effort. Pass nil to clear.
func (*Client) PExpire ¶
PExpire sets a TTL in milliseconds. If ms is zero or negative, PExpire calls PERSIST instead (removes the TTL).
func (*Client) PExpireAt ¶
PExpireAt sets an absolute expiration timestamp (millisecond granularity).
func (*Client) PSubscribe ¶
PSubscribe opens a pub/sub conn and subscribes to patterns.
func (*Client) PTTL ¶
PTTL returns the remaining TTL in milliseconds; -1 if no TTL, -2 if key is missing.
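The two negative values are sentinels rather than durations, so callers usually branch on them before converting. The sketch below assumes the raw millisecond count is available as an int64; describePTTL is a hypothetical helper, and the actual return type of Client.PTTL is not shown in this section.

```go
package main

import (
	"fmt"
	"time"
)

// describePTTL interprets a raw PTTL reply in milliseconds.
// -2 means the key is missing; -1 means it exists without a TTL.
func describePTTL(ms int64) (ttl time.Duration, exists, hasTTL bool) {
	switch ms {
	case -2:
		return 0, false, false
	case -1:
		return 0, true, false
	default:
		return time.Duration(ms) * time.Millisecond, true, true
	}
}

func main() {
	for _, ms := range []int64{-2, -1, 1500} {
		ttl, exists, hasTTL := describePTTL(ms)
		fmt.Println(ms, ttl, exists, hasTTL)
	}
}
```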
func (*Client) Pipeline ¶
Pipeline returns a Pipeline bound to c. Call Pipeline.Release after reading results to return it to the pool for reuse. Release invalidates every typed Cmd handle the Pipeline produced, so read all results before calling it.
Example ¶
ExampleClient_Pipeline batches several commands into one network write. Results are harvested from typed *Cmd handles after Exec returns.
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/goceleris/celeris/driver/redis"
)
func main() {
client, err := redis.NewClient("localhost:6379")
if err != nil {
log.Fatal(err)
}
defer func() { _ = client.Close() }()
ctx := context.Background()
p := client.Pipeline()
setRes := p.Set("counter", 0, 0)
incrRes := p.Incr("counter")
ttlRes := p.Expire("counter", time.Hour)
if err := p.Exec(ctx); err != nil {
log.Fatal(err)
}
_, _ = setRes.Result()
n, _ := incrRes.Result()
ok, _ := ttlRes.Result()
fmt.Println(n, ok)
}
func (*Client) Proto ¶
Proto returns the negotiated RESP protocol version on the last dialed connection. Returns 0 if no conn has been dialed yet (call Ping first).
func (*Client) Publish ¶
Publish sends message to channel via PUBLISH. Returns the number of subscribers that received the message.
func (*Client) Scan ¶
Scan returns a new iterator. match is the pattern passed as MATCH (empty means no MATCH arg). count is the COUNT hint (0 means no COUNT arg). The ctx argument is passed through on each Next call.
func (*Client) ScriptLoad ¶
ScriptLoad caches a script on the server and returns its SHA1 digest.
func (*Client) Set ¶
Set stores value at key. If expiration > 0, the TTL is appended as an EX argument for whole-second values, or as a PX argument when sub-second precision is needed.
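One plausible reading of the EX/PX rule is sketched below: whole-second TTLs are sent as EX, anything else as PX in milliseconds. This is an assumption made for illustration; expiryArgs is a hypothetical helper and the driver's exact rounding rule may differ.

```go
package main

import (
	"fmt"
	"time"
)

// expiryArgs picks the SET expiry option for a TTL. This sketch assumes
// whole-second TTLs map to EX and everything else to PX (milliseconds).
func expiryArgs(ttl time.Duration) []any {
	switch {
	case ttl <= 0:
		return nil // no expiry argument is appended
	case ttl%time.Second == 0:
		return []any{"EX", int64(ttl / time.Second)}
	default:
		ms := ttl.Milliseconds()
		if ms == 0 {
			ms = 1 // Redis rejects a zero expiry, so round tiny TTLs up
		}
		return []any{"PX", ms}
	}
}

func main() {
	fmt.Println(expiryArgs(time.Minute))
	fmt.Println(expiryArgs(1500 * time.Millisecond))
	fmt.Println(expiryArgs(0))
}
```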
func (*Client) SetEX ¶
SetEX sets key to value with the given TTL (second granularity). This is the atomic equivalent of SET + EXPIRE.
func (*Client) SetNX ¶
func (c *Client) SetNX(ctx context.Context, key string, value any, expiration time.Duration) (bool, error)
SetNX sets key only if it does not exist.
func (*Client) Subscribe ¶
Subscribe opens a pub/sub conn and subscribes to channels.
Example ¶
ExampleClient_Subscribe opens a dedicated pub/sub connection and consumes messages from PubSub.Channel. A goroutine publishes one message via the cmd-pool client to drive the subscriber.
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/goceleris/celeris/driver/redis"
)
func main() {
client, err := redis.NewClient("localhost:6379")
if err != nil {
log.Fatal(err)
}
defer func() { _ = client.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
ps, err := client.Subscribe(ctx, "events")
if err != nil {
log.Fatal(err)
}
defer func() { _ = ps.Close() }()
go func() {
time.Sleep(50 * time.Millisecond)
_, _ = client.Publish(ctx, "events", "hello")
}()
select {
case m := <-ps.Channel():
fmt.Println(m.Channel, m.Payload)
case <-ctx.Done():
log.Fatal(ctx.Err())
}
}
func (*Client) TxPipeline ¶
TxPipeline opens a MULTI/EXEC transaction. The returned Tx pins a connection from the cmd pool until Exec or Discard. Typed command methods (Get, Set, Incr, ...) queue commands onto the Tx; Exec flushes them all under a single MULTI/EXEC.
Example ¶
ExampleClient_TxPipeline queues a pair of INCRs under a single MULTI/EXEC transaction. Per-command results are deferred until Exec returns.
package main
import (
"context"
"fmt"
"log"
"github.com/goceleris/celeris/driver/redis"
)
func main() {
client, err := redis.NewClient("localhost:6379")
if err != nil {
log.Fatal(err)
}
defer func() { _ = client.Close() }()
ctx := context.Background()
tx, err := client.TxPipeline(ctx)
if err != nil {
log.Fatal(err)
}
defer func() { _ = tx.Discard() }() // no-op after a successful Exec.
visits := tx.Incr("visits")
uniques := tx.Incr("uniques")
if err := tx.Exec(ctx); err != nil {
log.Fatal(err)
}
v, _ := visits.Result()
u, _ := uniques.Result()
fmt.Println(v, u)
}
func (*Client) Watch ¶
Watch acquires a dedicated connection, issues WATCH on keys, and invokes fn with a Tx pinned to that connection. The connection is released when fn returns (after Exec or Discard). This is the only correct way to use WATCH on a pooled client because the WATCH state is per-connection.
Usage:
err := client.Watch(ctx, func(tx *Tx) error {
// Read current value (on the same pinned conn via the Tx's client).
// Queue commands:
tx.Set("k", "new", 0)
return tx.Exec(ctx)
}, "k")
If the WATCHed keys change before EXEC, Exec returns ErrTxAborted.
func (*Client) ZCount ¶
ZCount returns the count of members with scores between minScore and maxScore. Boundaries are passed as strings and are inclusive by default; prefix a value with "(" to make it exclusive (e.g. "-inf", "+inf", "1", "(5").
func (*Client) ZIncrBy ¶
func (c *Client) ZIncrBy(ctx context.Context, key string, incr float64, member string) (float64, error)
ZIncrBy increments the score of member in the sorted set at key by incr.
func (*Client) ZRangeByScore ¶
ZRangeByScore returns members with scores in [min,max] with optional LIMIT.
type ClusterClient ¶
type ClusterClient struct {
// contains filtered or unexported fields
}
ClusterClient is a Redis Cluster client. It maintains a slot-to-node mapping, routes commands by key hash slot, and handles MOVED/ASK redirects transparently. Each cluster node gets its own Client with its own pool.
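Routing by hash slot follows the Redis Cluster specification: the slot is CRC16 (XMODEM variant) of the key modulo 16384, and if the key contains a non-empty {tag}, only the tag is hashed. The sketch below is a standalone illustration of that rule; crc16, hashTag and keySlot are names chosen here and are not claimed to match the driver's internals.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 is the CRC16-CCITT (XMODEM) checksum used by Redis Cluster:
// polynomial 0x1021, initial value 0, no reflection.
func crc16(data string) uint16 {
	var crc uint16
	for i := 0; i < len(data); i++ {
		crc ^= uint16(data[i]) << 8
		for bit := 0; bit < 8; bit++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashTag returns the substring that is hashed for key: the text between
// the first '{' and the first '}' after it, if that text is non-empty.
func hashTag(key string) string {
	start := strings.IndexByte(key, '{')
	if start < 0 {
		return key
	}
	end := strings.IndexByte(key[start+1:], '}')
	if end <= 0 { // no closing brace, or an empty "{}" tag
		return key
	}
	return key[start+1 : start+1+end]
}

// keySlot maps a key to one of the 16384 cluster slots.
func keySlot(key string) int {
	return int(crc16(hashTag(key)) % 16384)
}

func main() {
	fmt.Println(keySlot("{user1000}.following") == keySlot("{user1000}.followers"))
	fmt.Println(keySlot("123456789"))
}
```

Keys that share a hash tag land in the same slot, which is what makes multi-key operations, transactions and shard channels usable in cluster mode.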
func NewClusterClient ¶
func NewClusterClient(cfg ClusterConfig) (*ClusterClient, error)
NewClusterClient creates a Redis Cluster client. It connects to the seed nodes, fetches the cluster topology via CLUSTER SLOTS, and populates the slot map. A background goroutine refreshes the topology periodically.
func (*ClusterClient) Close ¶
func (c *ClusterClient) Close() error
Close tears down all node clients and stops the background refresh.
func (*ClusterClient) ForEachNode ¶
ForEachNode calls fn on every node's Client. Used for cluster-wide operations like FLUSHDB or DBSIZE. The ctx argument is kept for API symmetry; cancellation is not honored.
func (*ClusterClient) Pipeline ¶
func (c *ClusterClient) Pipeline() *ClusterPipeline
Pipeline returns a ClusterPipeline that groups commands by slot.
func (*ClusterClient) SPublish ¶
SPublish publishes a message to a shard channel (Redis 7+ SPUBLISH). The command is routed to the node owning the channel's slot.
func (*ClusterClient) SSubscribe ¶
SSubscribe opens a pub/sub connection and subscribes to shard channels (Redis 7+ SSUBSCRIBE). Shard channels are slot-scoped: the connection is established to the node owning the slot of the first channel. All channels SHOULD hash to the same slot (use hash tags); mixing slots in one PubSub is undefined behavior in cluster mode.
func (*ClusterClient) TxPipeline ¶
func (c *ClusterClient) TxPipeline() *ClusterTx
TxPipeline returns a ClusterTx that validates slot affinity for every queued command. If commands target different slots, ClusterTx.Exec returns ErrCrossSlot.
func (*ClusterClient) Watch ¶
Watch validates that all keys hash to the same slot, routes to the correct cluster node, and delegates to the node's Client.Watch. The fn callback receives a single-node Tx that can queue commands normally.
type ClusterConfig ¶
type ClusterConfig struct {
// Addrs is the list of seed node addresses in "host:port" form. At least
// one is required; the client discovers the rest via CLUSTER SLOTS.
Addrs []string
// Password is the AUTH credential for cluster nodes.
Password string
// Username is the ACL user (Redis 6+).
Username string
PoolSize int
MaxIdlePerWorker int
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
// MaxRedirects is the maximum number of MOVED/ASK redirects before an
// error is returned. Default: 3.
MaxRedirects int
// RouteByLatency, when true, sends reads to the lowest-latency node.
// Not implemented in v1.4.0; reserved for future use.
RouteByLatency bool
// ReadOnly, when true, allows reads from replica nodes.
// Not implemented in v1.4.0; reserved for future use.
ReadOnly bool
Engine eventloop.ServerProvider
}
ClusterConfig configures a Redis Cluster client.
type ClusterPipeline ¶
type ClusterPipeline struct {
// contains filtered or unexported fields
}
ClusterPipeline groups commands by slot and executes them on the correct nodes in parallel. It uses per-node Pipelines internally.
func (*ClusterPipeline) Discard ¶
func (cp *ClusterPipeline) Discard()
Discard drops all queued commands.
func (*ClusterPipeline) Exec ¶
Exec sends all queued commands, grouped by slot, executing per-node sub-pipelines in parallel. Returns per-command results and errors indexed by the original enqueue order. MOVED/ASK redirects within pipeline responses are retried once: MOVED triggers a topology refresh and re-route; ASK sends ASKING on the target node before re-issuing the command.
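Redirect replies have the form "MOVED <slot> <host:port>" or "ASK <slot> <host:port>". The sketch below shows one way such an error text could be parsed before re-routing; the redirect type and parseRedirect function are hypothetical and only illustrate the reply format.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// redirect describes a cluster redirection parsed from an error reply.
type redirect struct {
	Kind string // "MOVED" or "ASK"
	Slot int    // hash slot, 0..16383
	Addr string // target node in host:port form
}

// parseRedirect extracts a redirect from an error message such as
// "MOVED 3999 127.0.0.1:6381". ok is false for any other error text.
func parseRedirect(msg string) (r redirect, ok bool) {
	parts := strings.Fields(msg)
	if len(parts) != 3 || (parts[0] != "MOVED" && parts[0] != "ASK") {
		return redirect{}, false
	}
	slot, err := strconv.Atoi(parts[1])
	if err != nil || slot < 0 || slot >= 16384 {
		return redirect{}, false
	}
	return redirect{Kind: parts[0], Slot: slot, Addr: parts[2]}, true
}

func main() {
	r, ok := parseRedirect("MOVED 3999 127.0.0.1:6381")
	fmt.Println(r.Kind, r.Slot, r.Addr, ok)
}
```

MOVED signals a permanent slot migration, hence the topology refresh; ASK applies to a single command, so the slot map is left unchanged.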
type ClusterTx ¶
type ClusterTx struct {
// contains filtered or unexported fields
}
ClusterTx buffers commands for a MULTI/EXEC transaction on a ClusterClient. All keys must hash to the same slot; if a command targets a different slot, the ClusterTx records ErrCrossSlot and ClusterTx.Exec returns it without contacting the server.
Obtain a ClusterTx via ClusterClient.TxPipeline. After queuing commands, call ClusterTx.Exec to execute atomically on the owning node. Call ClusterTx.Discard to drop all queued commands without executing.
func (*ClusterTx) Exec ¶
Exec sends all queued commands inside a MULTI/EXEC transaction to the cluster node that owns the pinned slot. Returns ErrCrossSlot if any command targeted a different slot than the first.
type Config ¶
type Config struct {
// Addr is the TCP endpoint in "host:port" form.
Addr string
// Password is the AUTH credential; empty for no auth.
Password string
// Username is the ACL user (Redis 6+); empty for legacy auth.
Username string
// DB is the numeric database index applied via SELECT post-HELLO.
DB int
// DialTimeout is the TCP dial timeout; default 5s.
DialTimeout time.Duration
// ReadTimeout is unused currently — responses are driven by the event
// loop and cancellation is via the request context.
ReadTimeout time.Duration
// WriteTimeout is unused currently — writes are non-blocking via the
// event loop's per-FD queue.
WriteTimeout time.Duration
// PoolSize is the total connection cap across all workers; defaults to
// NumWorkers*4.
PoolSize int
// MaxIdlePerWorker bounds each worker's idle list; default 2.
MaxIdlePerWorker int
// MaxLifetime is the max age of a pooled conn; default 30m.
MaxLifetime time.Duration
// MaxIdleTime is the max idle duration before eviction; default 5m.
MaxIdleTime time.Duration
// HealthCheckInterval is the interval at which the pool's background
// health checker pings idle connections. Default 30s; 0 disables.
HealthCheckInterval time.Duration
// Proto selects the negotiation target (2 or 3); default 3 with
// RESP2 fallback.
Proto int
// ForceRESP2 disables the HELLO handshake and speaks RESP2 plus AUTH+
// SELECT. Useful for ElastiCache compatibility.
ForceRESP2 bool
// Engine hooks the driver into a running celeris.Server's event loop.
// If nil, a standalone loop is resolved on NewClient.
Engine eventloop.ServerProvider
// OnPush is invoked when a RESP3 push frame arrives on a command
// connection. The channel is the first element of the push array
// (e.g. "invalidate" for client-tracking invalidations) and data
// carries the remaining elements. If nil, push frames on command
// connections are silently dropped.
OnPush func(channel string, data []protocol.Value)
}
Config controls a Client. Use Option functions to set fields; the zero value is not directly usable.
type FloatCmd ¶
type FloatCmd struct {
// contains filtered or unexported fields
}
FloatCmd is a deferred float reply.
type IntCmd ¶
type IntCmd struct {
// contains filtered or unexported fields
}
IntCmd is a deferred int64 reply.
type Message ¶
Message is one pub/sub notification. Pattern is set only for PSUBSCRIBE deliveries ("pmessage" push type). Shard is true for shard channel deliveries ("smessage" push type, Redis 7+).
type Option ¶
type Option func(*Config)
Option mutates a Config during NewClient.
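The functional-options pattern behind Option can be sketched with a reduced stand-in struct. The lowercase config, option, withDialTimeout, withForceRESP2 and newConfig names below are hypothetical and exist only for this illustration; the defaults (5s dial timeout, 2 idle conns per worker) are the ones documented on Config.

```go
package main

import (
	"fmt"
	"time"
)

// config is a reduced stand-in for Config with three of its fields.
type config struct {
	DialTimeout      time.Duration
	MaxIdlePerWorker int
	ForceRESP2       bool
}

// option mirrors the shape of Option: a function that mutates the config.
type option func(*config)

func withDialTimeout(d time.Duration) option {
	return func(c *config) { c.DialTimeout = d }
}

func withForceRESP2() option {
	return func(c *config) { c.ForceRESP2 = true }
}

// newConfig starts from the documented defaults and applies the options
// in order, so later options override earlier ones.
func newConfig(opts ...option) config {
	c := config{DialTimeout: 5 * time.Second, MaxIdlePerWorker: 2}
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	c := newConfig(withDialTimeout(time.Second), withForceRESP2())
	fmt.Printf("%+v\n", c)
}
```

Seeding the defaults before applying options is one way to handle a config whose zero value is not directly usable.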
func WithDialTimeout ¶
WithDialTimeout sets the TCP dial timeout.
func WithEngine ¶
func WithEngine(sp eventloop.ServerProvider) Option
WithEngine hooks the driver into a celeris.Server's event loop.
func WithForceRESP2 ¶
func WithForceRESP2() Option
WithForceRESP2 forces the RESP2 dialect and skips the HELLO handshake.
func WithHealthCheckInterval ¶
WithHealthCheckInterval sets the background health-check sweep interval. Default is 30s; pass 0 to disable.
func WithMaxIdlePerWorker ¶
WithMaxIdlePerWorker bounds each worker's idle list.
func WithMaxIdleTime ¶
WithMaxIdleTime sets how long a pooled conn may stay idle before it is evicted.
func WithMaxLifetime ¶
WithMaxLifetime sets the maximum age of a pooled conn.
func WithOnPush ¶
WithOnPush registers a callback for RESP3 push frames that arrive on command connections (e.g. client-tracking invalidation messages). The channel is the push kind (first array element) and data carries the remaining elements. If unset, push frames on command connections are silently dropped.
func WithReadTimeout ¶
WithReadTimeout sets the read timeout. The value is currently advisory: responses are driven by the event loop and cancellation is via the request context.
func WithUsername ¶
WithUsername sets the AUTH username (ACL user, Redis 6+).
func WithWriteTimeout ¶
WithWriteTimeout sets the write timeout. The value is currently advisory: writes are non-blocking via the event loop's per-FD queue.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline batches commands into one wire write. Results are reachable via the *Cmd types returned from each method. A Pipeline is re-usable and pooled on its parent Client: allocate via Client.Pipeline, read results after Exec, then return it to the pool via Pipeline.Release. Release is optional but recommended for tight hot paths — without it a fresh Pipeline (and its internal slabs) is allocated on every call.
Pipeline is NOT safe for concurrent use. A caller must not invoke methods on the same Pipeline from multiple goroutines at once.
func (*Pipeline) Discard ¶
func (p *Pipeline) Discard()
Discard drops all queued commands without sending anything. The Pipeline remains usable for subsequent enqueues.
func (*Pipeline) Exec ¶
Exec sends every queued command in one write and blocks until the last reply is received. Returns the first parse/transport error; per-command errors are reachable via each *Cmd.Result().
func (*Pipeline) Release ¶
func (p *Pipeline) Release()
Release returns p to its Client's pool. After Release, every *StringCmd, *IntCmd, ... previously returned from p is invalid; reading from them is undefined behaviour. Only the first call has an effect; subsequent calls are no-ops. Calling Release is optional; an un-Released Pipeline is simply GC'd like any other heap object.
type PubSub ¶
type PubSub struct {
// contains filtered or unexported fields
}
PubSub is a dedicated pub/sub connection. Callers read from PubSub.Channel and use PubSub.Subscribe / PubSub.Unsubscribe / PubSub.PSubscribe / PubSub.PUnsubscribe to alter the subscription set.
The connection is re-established automatically on transport errors: a hook registered with the underlying redisConn fires when the FD closes and a background goroutine dials a replacement, re-sends SUBSCRIBE/PSUBSCRIBE for every tracked channel/pattern, and resumes message delivery. Messages arriving while the conn is down are lost (at-most-once delivery).
Callers should treat subscribe/unsubscribe as "at most once" — the server ack is not tracked synchronously. The channel returned by PubSub.Channel is closed once PubSub.Close is called or the reconnect loop gives up.
func (*PubSub) Channel ¶
Channel returns the read-only message stream. It remains open until Close is called or the reconnect loop exhausts its retries.
func (*PubSub) Drops ¶
Drops returns the number of messages dropped because the channel buffer was full.
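A drop counter of this kind is commonly built from a non-blocking channel send. The sketch below shows the general technique with a hypothetical mailbox type; it is not taken from the driver, whose buffer size and bookkeeping are not documented here.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// mailbox is a hypothetical buffered message stream with a drop counter.
type mailbox struct {
	ch    chan string
	drops atomic.Uint64
}

func newMailbox(size int) *mailbox {
	return &mailbox{ch: make(chan string, size)}
}

// deliver attempts a non-blocking send. When the buffer is full the
// message is discarded and the drop counter is incremented, so a slow
// consumer never stalls the producer.
func (m *mailbox) deliver(msg string) bool {
	select {
	case m.ch <- msg:
		return true
	default:
		m.drops.Add(1)
		return false
	}
}

func main() {
	m := newMailbox(2)
	for _, s := range []string{"a", "b", "c"} {
		m.deliver(s)
	}
	fmt.Println(len(m.ch), m.drops.Load()) // prints "2 1"
}
```

A rising drop count indicates that the consumer is not draining the channel fast enough.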
func (*PubSub) PSubscribe ¶
PSubscribe adds patterns to the subscription set. The ctx argument is kept for API symmetry; cancellation is not honored.
func (*PubSub) PUnsubscribe ¶
PUnsubscribe removes patterns. Empty list unsubscribes all patterns. The ctx argument is kept for API symmetry; cancellation is not honored.
func (*PubSub) SSubscribe ¶
SSubscribe adds shard channels (Redis 7+ SSUBSCRIBE) to the subscription set. Shard channels are scoped to the cluster slot of the channel name. The ctx argument is kept for API symmetry; cancellation is not honored.
func (*PubSub) SUnsubscribe ¶
SUnsubscribe removes shard channels. Empty list unsubscribes all shard channels. The ctx argument is kept for API symmetry; cancellation is not honored.
func (*PubSub) Subscribe ¶
Subscribe adds channels to the subscription set. The ctx argument is kept for API symmetry with Client.Subscribe; cancellation is not honored.
type RedisError ¶
RedisError wraps a server-side error reply. The Prefix (e.g. "WRONGTYPE", "ERR", "MOVED") distinguishes categories; the Msg holds the full text.
func (*RedisError) Error ¶
func (e *RedisError) Error() string
Error implements the error interface.
func (*RedisError) Is ¶
func (e *RedisError) Is(target error) bool
Is supports errors.Is for well-known prefixes.
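Prefix-based matching with errors.Is can be sketched as follows. The serverError type, the errWrongType sentinel and parseServerError are hypothetical stand-ins written for this example; how RedisError and its sentinels are actually defined is not shown in this section.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// serverError is a stand-in for an error type carrying a reply prefix.
type serverError struct {
	Prefix string // first word of the reply, e.g. "WRONGTYPE"
	Msg    string // full error text
}

func (e *serverError) Error() string { return e.Msg }

// Is reports a match when target is a *serverError with the same prefix,
// which lets errors.Is compare by category instead of by pointer.
func (e *serverError) Is(target error) bool {
	t, ok := target.(*serverError)
	return ok && t.Prefix == e.Prefix
}

// errWrongType is a hypothetical sentinel for the WRONGTYPE category.
var errWrongType = &serverError{Prefix: "WRONGTYPE"}

// parseServerError splits an error reply into its prefix and full text.
func parseServerError(line string) *serverError {
	prefix, _, _ := strings.Cut(line, " ")
	return &serverError{Prefix: prefix, Msg: line}
}

// isWrongType reports whether err, or anything it wraps, is WRONGTYPE.
func isWrongType(err error) bool { return errors.Is(err, errWrongType) }

func main() {
	err := parseServerError("WRONGTYPE Operation against a key holding the wrong kind of value")
	wrapped := fmt.Errorf("HGET failed: %w", err)
	fmt.Println(isWrongType(wrapped)) // prints "true"
}
```

Because errors.Is walks the wrap chain, the match still succeeds after the error has been annotated with fmt.Errorf and %w.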
type ScanIterator ¶
type ScanIterator struct {
// contains filtered or unexported fields
}
ScanIterator walks the keyspace via SCAN with automatic cursor paging. It is NOT safe for concurrent use; start one iterator per goroutine.
Typical use:
it := client.Scan(ctx, "user:*", 100)
for {
key, ok := it.Next(ctx)
if !ok {
break
}
// process key
}
if err := it.Err(); err != nil {
// transport or server error
}
func (*ScanIterator) Err ¶
func (it *ScanIterator) Err() error
Err returns any error encountered during iteration.
func (*ScanIterator) Next ¶
func (it *ScanIterator) Next(ctx context.Context) (string, bool)
Next returns the next key and true, or "" and false when the iteration ends or an error occurs. Call ScanIterator.Err to distinguish the two.
type SentinelClient ¶
type SentinelClient struct {
// contains filtered or unexported fields
}
SentinelClient is a Redis client managed by Redis Sentinel. It discovers the current primary via Sentinel, connects to it, and automatically reconnects to the new primary when a failover is detected.
func NewSentinelClient ¶
func NewSentinelClient(cfg SentinelConfig) (*SentinelClient, error)
NewSentinelClient creates a Sentinel-managed client. It connects to the first responsive sentinel, discovers the master address, and dials the master. A background goroutine subscribes to +switch-master on sentinel connections for automatic failover handling.
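Sentinel publishes +switch-master events with the payload "<master-name> <old-ip> <old-port> <new-ip> <new-port>". A minimal sketch of parsing that payload follows; parseSwitchMaster is a hypothetical helper and does not describe how the driver handles the event internally.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// parseSwitchMaster splits a +switch-master payload of the form
// "<master-name> <old-ip> <old-port> <new-ip> <new-port>" into the
// master group name and the old and new addresses in host:port form.
func parseSwitchMaster(payload string) (name, oldAddr, newAddr string, ok bool) {
	f := strings.Fields(payload)
	if len(f) != 5 {
		return "", "", "", false
	}
	return f[0], net.JoinHostPort(f[1], f[2]), net.JoinHostPort(f[3], f[4]), true
}

func main() {
	name, oldAddr, newAddr, ok := parseSwitchMaster("mymaster 10.0.0.1 6379 10.0.0.2 6379")
	fmt.Println(name, oldAddr, newAddr, ok)
}
```

A client would typically compare name against its configured MasterName before acting, since one sentinel can monitor several master groups.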
func (*SentinelClient) Close ¶
func (s *SentinelClient) Close() error
Close tears down the sentinel client and all sentinel connections.
func (*SentinelClient) MSet ¶
func (s *SentinelClient) MSet(ctx context.Context, pairs ...any) error
func (*SentinelClient) Pipeline ¶
func (s *SentinelClient) Pipeline() *Pipeline
func (*SentinelClient) Stats ¶
func (s *SentinelClient) Stats() async.PoolStats
type SentinelConfig ¶
type SentinelConfig struct {
// MasterName is the Sentinel master group name (required).
MasterName string
// SentinelAddrs is the list of sentinel addresses in "host:port" form.
// At least one is required.
SentinelAddrs []string
// Password is the AUTH credential for the Redis master (not sentinel).
Password string
// SentinelPassword is the AUTH credential for sentinel connections (Redis 6+).
SentinelPassword string
// Username is the ACL user for the Redis master (Redis 6+).
Username string
// DB is the database index applied via SELECT on the master.
DB int
PoolSize int
MaxIdlePerWorker int
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
Engine eventloop.ServerProvider
}
SentinelConfig configures a Sentinel-managed Redis client.
type StatusCmd ¶
type StatusCmd struct {
// contains filtered or unexported fields
}
StatusCmd is a deferred simple-string reply.
type StringCmd ¶
type StringCmd struct {
// contains filtered or unexported fields
}
StringCmd is a deferred string reply.
type Tx ¶
type Tx struct {
// contains filtered or unexported fields
}
Tx is a MULTI/EXEC transaction. Commands are buffered into a single wire write (MULTI + cmds + EXEC) and dispatched atomically. Per-command typed results are reachable via the *Cmd handles returned by each method — they populate only after Exec returns.
A Tx pins one connection for its entire lifetime. Exec releases the conn; Discard sends DISCARD and releases. Forgetting to call either leaks the conn until the pool's idle reaper closes it, so deferring tx.Close() immediately after obtaining the Tx is recommended. Close is an alias for Discard and is a no-op after Exec.
func (*Tx) Discard ¶
Discard aborts the transaction. If Exec was not called, DISCARD is sent to the server to cancel MULTI state, and the conn is released. After Discard, the Tx is done.
func (*Tx) Exec ¶
Exec sends MULTI, every queued command, and EXEC in a single write, then dispatches the EXEC array's elements to the typed result slots.
Behaviour:
- The MULTI reply is +OK; each queued command's immediate reply is +QUEUED; the final EXEC reply is an array of N results.
- If EXEC returns a null array (RESP2 *-1 / RESP3 _), the transaction was aborted (e.g. a WATCHed key changed) — every typed result will return ErrTxAborted.
- If a single command errors server-side (WRONGTYPE, ...), that command's result returns the error but the others complete normally.
- If the server returns -EXECABORT, every typed result returns that error.
- After Exec, the tx is done: further Watch/Exec/command calls return an error. The conn is returned to the pool.