Documentation
¶
Overview ¶
Package redis is celeris's native RESP2/RESP3 Redis driver, designed to run its socket I/O on the celeris event loop alongside your HTTP handlers.
NewClient returns a Client backed by a per-worker connection pool; connections are dialed lazily on first command and negotiate RESP3 (HELLO 3) with automatic RESP2 fallback. Pass WithEngine to colocate the driver's file descriptors on the same workers as a running celeris server (lower latency via better data locality); omit it to use a standalone, internally reference-counted event loop. Other options include WithPassword, WithUsername, WithDB, WithForceRESP2, WithProto, and WithOnPush; see Config for the full set.
client, err := redis.NewClient("localhost:6379", redis.WithEngine(srv))
defer client.Close()
v, err := client.Get(ctx, "key")
Key exported symbols and when to reach for them:
- Client — typed commands for strings, hashes, lists, sets, sorted sets, keys, scripting (Eval, EvalSHA, ScriptLoad) and pub/sub. Client.Do (plus DoString/DoInt/DoBool/DoSlice) is the escape hatch for any command the typed surface omits.
- Client.Pipeline returns a Pipeline that batches commands into one write; queued calls return typed handles (*StringCmd, *IntCmd, etc.) resolved after Pipeline.Exec.
- Client.TxPipeline and Client.Watch provide MULTI/EXEC transactions and optimistic locking via Tx; aborts surface as ErrTxAborted.
- Client.Subscribe / Client.PSubscribe return a PubSub whose PubSub.Channel delivers [Message]s, with automatic reconnect.
- Client.Scan returns a ScanIterator for cursor-based key iteration.
- NewClusterClient (ClusterClient) and NewSentinelClient (SentinelClient) cover Redis Cluster and Sentinel deployments.
Typed accessors and pipeline/Do results are detached from the reader buffer, so returned values are safe to retain. Server error replies surface as *RedisError (with sentinels such as ErrNil, ErrWrongType, ErrCrossSlot); see errors.Is. TLS (rediss://) is not yet supported.
Documentation ¶
Full guides and examples: https://goceleris.dev/docs/data-stores
Index ¶
- Variables
- func Slot(key string) uint16
- func WithWorker(ctx context.Context, workerID int) context.Context
- type BoolCmd
- type Client
- func (c *Client) Append(ctx context.Context, key string, value string) (int64, error)
- func (c *Client) Close() error
- func (c *Client) DBSize(ctx context.Context) (int64, error)
- func (c *Client) Decr(ctx context.Context, key string) (int64, error)
- func (c *Client) DecrBy(ctx context.Context, key string, val int64) (int64, error)
- func (c *Client) Del(ctx context.Context, keys ...string) (int64, error)
- func (c *Client) Do(ctx context.Context, args ...any) (*protocol.Value, error)
- func (c *Client) DoBool(ctx context.Context, args ...any) (bool, error)
- func (c *Client) DoInt(ctx context.Context, args ...any) (int64, error)
- func (c *Client) DoSlice(ctx context.Context, args ...any) ([]string, error)
- func (c *Client) DoString(ctx context.Context, args ...any) (string, error)
- func (c *Client) Eval(ctx context.Context, script string, keys []string, args ...any) (*protocol.Value, error)
- func (c *Client) EvalSHA(ctx context.Context, sha string, keys []string, args ...any) (*protocol.Value, error)
- func (c *Client) Exists(ctx context.Context, keys ...string) (int64, error)
- func (c *Client) Expire(ctx context.Context, key string, expiration time.Duration) (bool, error)
- func (c *Client) ExpireAt(ctx context.Context, key string, at time.Time) (bool, error)
- func (c *Client) FlushDB(ctx context.Context) error
- func (c *Client) Get(ctx context.Context, key string) (string, error)
- func (c *Client) GetBytes(ctx context.Context, key string) ([]byte, error)
- func (c *Client) GetDel(ctx context.Context, key string) (string, error)
- func (c *Client) GetDelBytes(ctx context.Context, key string) ([]byte, error)
- func (c *Client) HDel(ctx context.Context, key string, fields ...string) (int64, error)
- func (c *Client) HExists(ctx context.Context, key, field string) (bool, error)
- func (c *Client) HGet(ctx context.Context, key, field string) (string, error)
- func (c *Client) HGetAll(ctx context.Context, key string) (map[string]string, error)
- func (c *Client) HIncrBy(ctx context.Context, key, field string, incr int64) (int64, error)
- func (c *Client) HIncrByFloat(ctx context.Context, key, field string, incr float64) (float64, error)
- func (c *Client) HKeys(ctx context.Context, key string) ([]string, error)
- func (c *Client) HLen(ctx context.Context, key string) (int64, error)
- func (c *Client) HMGet(ctx context.Context, key string, fields ...string) ([]string, error)
- func (c *Client) HSet(ctx context.Context, key string, values ...any) (int64, error)
- func (c *Client) HSetBytes(ctx context.Context, key, field string, value []byte) (int64, error)
- func (c *Client) HSetNX(ctx context.Context, key, field string, value any) (bool, error)
- func (c *Client) HSetNXBytes(ctx context.Context, key, field string, value []byte) (bool, error)
- func (c *Client) HVals(ctx context.Context, key string) ([]string, error)
- func (c *Client) IdleConnWorkers() []int
- func (c *Client) Incr(ctx context.Context, key string) (int64, error)
- func (c *Client) IncrBy(ctx context.Context, key string, val int64) (int64, error)
- func (c *Client) IncrByFloat(ctx context.Context, key string, val float64) (float64, error)
- func (c *Client) LIndex(ctx context.Context, key string, index int64) (string, error)
- func (c *Client) LLen(ctx context.Context, key string) (int64, error)
- func (c *Client) LPop(ctx context.Context, key string) (string, error)
- func (c *Client) LPush(ctx context.Context, key string, values ...any) (int64, error)
- func (c *Client) LPushBytes(ctx context.Context, key string, values ...[]byte) (int64, error)
- func (c *Client) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (c *Client) LRem(ctx context.Context, key string, count int64, element any) (int64, error)
- func (c *Client) MGet(ctx context.Context, keys ...string) ([]string, error)
- func (c *Client) MSet(ctx context.Context, pairs ...any) error
- func (c *Client) OnPush(fn func(channel string, data []protocol.Value))
- func (c *Client) PExpire(ctx context.Context, key string, ms time.Duration) (bool, error)
- func (c *Client) PExpireAt(ctx context.Context, key string, at time.Time) (bool, error)
- func (c *Client) PSubscribe(ctx context.Context, patterns ...string) (*PubSub, error)
- func (c *Client) PTTL(ctx context.Context, key string) (time.Duration, error)
- func (c *Client) Persist(ctx context.Context, key string) (bool, error)
- func (c *Client) Ping(ctx context.Context) error
- func (c *Client) Pipeline() *Pipeline
- func (c *Client) Proto() int
- func (c *Client) Publish(ctx context.Context, channel, message string) (int64, error)
- func (c *Client) RPop(ctx context.Context, key string) (string, error)
- func (c *Client) RPush(ctx context.Context, key string, values ...any) (int64, error)
- func (c *Client) RPushBytes(ctx context.Context, key string, values ...[]byte) (int64, error)
- func (c *Client) RandomKey(ctx context.Context) (string, error)
- func (c *Client) Rename(ctx context.Context, key, newKey string) error
- func (c *Client) SAdd(ctx context.Context, key string, members ...any) (int64, error)
- func (c *Client) SCard(ctx context.Context, key string) (int64, error)
- func (c *Client) SDiff(ctx context.Context, keys ...string) ([]string, error)
- func (c *Client) SInter(ctx context.Context, keys ...string) ([]string, error)
- func (c *Client) SIsMember(ctx context.Context, key string, member any) (bool, error)
- func (c *Client) SMembers(ctx context.Context, key string) ([]string, error)
- func (c *Client) SRem(ctx context.Context, key string, members ...any) (int64, error)
- func (c *Client) SUnion(ctx context.Context, keys ...string) ([]string, error)
- func (c *Client) Scan(_ context.Context, match string, count int64) *ScanIterator
- func (c *Client) ScriptLoad(ctx context.Context, script string) (string, error)
- func (c *Client) Set(ctx context.Context, key string, value any, expiration time.Duration) error
- func (c *Client) SetBytes(ctx context.Context, key string, value []byte, expiration time.Duration) error
- func (c *Client) SetEX(ctx context.Context, key string, value any, ttl time.Duration) error
- func (c *Client) SetEXBytes(ctx context.Context, key string, value []byte, ttl time.Duration) error
- func (c *Client) SetNX(ctx context.Context, key string, value any, expiration time.Duration) (bool, error)
- func (c *Client) SetNXBytes(ctx context.Context, key string, value []byte, expiration time.Duration) (bool, error)
- func (c *Client) Stats() async.PoolStats
- func (c *Client) Subscribe(ctx context.Context, channels ...string) (*PubSub, error)
- func (c *Client) TTL(ctx context.Context, key string) (time.Duration, error)
- func (c *Client) TxPipeline(ctx context.Context) (*Tx, error)
- func (c *Client) Type(ctx context.Context, key string) (string, error)
- func (c *Client) Watch(ctx context.Context, fn func(tx *Tx) error, keys ...string) error
- func (c *Client) ZAdd(ctx context.Context, key string, members ...Z) (int64, error)
- func (c *Client) ZCard(ctx context.Context, key string) (int64, error)
- func (c *Client) ZCount(ctx context.Context, key, minScore, maxScore string) (int64, error)
- func (c *Client) ZIncrBy(ctx context.Context, key string, incr float64, member string) (float64, error)
- func (c *Client) ZRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (c *Client) ZRangeByScore(ctx context.Context, key string, opt *ZRangeBy) ([]string, error)
- func (c *Client) ZRank(ctx context.Context, key, member string) (int64, error)
- func (c *Client) ZRem(ctx context.Context, key string, members ...any) (int64, error)
- func (c *Client) ZRevRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (c *Client) ZScore(ctx context.Context, key, member string) (float64, error)
- type ClusterClient
- func (c *ClusterClient) Close() error
- func (c *ClusterClient) Decr(ctx context.Context, key string) (int64, error)
- func (c *ClusterClient) Del(ctx context.Context, keys ...string) (int64, error)
- func (c *ClusterClient) Do(ctx context.Context, args ...any) (*protocol.Value, error)
- func (c *ClusterClient) Exists(ctx context.Context, keys ...string) (int64, error)
- func (c *ClusterClient) Expire(ctx context.Context, key string, expiration time.Duration) (bool, error)
- func (c *ClusterClient) ForEachNode(_ context.Context, fn func(*Client) error) error
- func (c *ClusterClient) Get(ctx context.Context, key string) (string, error)
- func (c *ClusterClient) GetBytes(ctx context.Context, key string) ([]byte, error)
- func (c *ClusterClient) HDel(ctx context.Context, key string, fields ...string) (int64, error)
- func (c *ClusterClient) HGet(ctx context.Context, key, field string) (string, error)
- func (c *ClusterClient) HGetAll(ctx context.Context, key string) (map[string]string, error)
- func (c *ClusterClient) HSet(ctx context.Context, key string, values ...any) (int64, error)
- func (c *ClusterClient) Incr(ctx context.Context, key string) (int64, error)
- func (c *ClusterClient) LPop(ctx context.Context, key string) (string, error)
- func (c *ClusterClient) LPush(ctx context.Context, key string, values ...any) (int64, error)
- func (c *ClusterClient) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (c *ClusterClient) Ping(ctx context.Context) error
- func (c *ClusterClient) Pipeline() *ClusterPipeline
- func (c *ClusterClient) Publish(ctx context.Context, channel, message string) (int64, error)
- func (c *ClusterClient) RPop(ctx context.Context, key string) (string, error)
- func (c *ClusterClient) RPush(ctx context.Context, key string, values ...any) (int64, error)
- func (c *ClusterClient) SAdd(ctx context.Context, key string, members ...any) (int64, error)
- func (c *ClusterClient) SMembers(ctx context.Context, key string) ([]string, error)
- func (c *ClusterClient) SPublish(ctx context.Context, channel, message string) (int64, error)
- func (c *ClusterClient) SSubscribe(ctx context.Context, channels ...string) (*PubSub, error)
- func (c *ClusterClient) Set(ctx context.Context, key string, value any, ttl time.Duration) error
- func (c *ClusterClient) SetNX(ctx context.Context, key string, value any, ttl time.Duration) (bool, error)
- func (c *ClusterClient) Subscribe(ctx context.Context, channels ...string) (*PubSub, error)
- func (c *ClusterClient) TTL(ctx context.Context, key string) (time.Duration, error)
- func (c *ClusterClient) TxPipeline() *ClusterTx
- func (c *ClusterClient) Watch(ctx context.Context, fn func(tx *Tx) error, keys ...string) error
- func (c *ClusterClient) ZAdd(ctx context.Context, key string, members ...Z) (int64, error)
- func (c *ClusterClient) ZRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- type ClusterConfig
- type ClusterPipeline
- type ClusterTx
- func (t *ClusterTx) Decr(key string) *IntCmd
- func (t *ClusterTx) Del(keys ...string) *IntCmd
- func (t *ClusterTx) Discard()
- func (t *ClusterTx) Exec(ctx context.Context) error
- func (t *ClusterTx) Exists(keys ...string) *IntCmd
- func (t *ClusterTx) Expire(key string, d time.Duration) *BoolCmd
- func (t *ClusterTx) Get(key string) *StringCmd
- func (t *ClusterTx) HGet(key, field string) *StringCmd
- func (t *ClusterTx) HSet(key string, values ...any) *IntCmd
- func (t *ClusterTx) HSetBytes(key, field string, value []byte) *IntCmd
- func (t *ClusterTx) Incr(key string) *IntCmd
- func (t *ClusterTx) LPush(key string, values ...any) *IntCmd
- func (t *ClusterTx) LPushBytes(key string, values ...[]byte) *IntCmd
- func (t *ClusterTx) RPush(key string, values ...any) *IntCmd
- func (t *ClusterTx) RPushBytes(key string, values ...[]byte) *IntCmd
- func (t *ClusterTx) SAdd(key string, members ...any) *IntCmd
- func (t *ClusterTx) Set(key string, value any, expiration time.Duration) *StatusCmd
- func (t *ClusterTx) SetBytes(key string, value []byte, expiration time.Duration) *StatusCmd
- func (t *ClusterTx) ZAdd(key string, members ...Z) *IntCmd
- type Config
- type FloatCmd
- type IntCmd
- type Message
- type Option
- func WithDB(db int) Option
- func WithDialTimeout(d time.Duration) Option
- func WithEngine(sp eventloop.ServerProvider) Option
- func WithForceRESP2() Option
- func WithHealthCheckInterval(d time.Duration) Option
- func WithMaxIdlePerWorker(n int) Option
- func WithMaxIdleTime(d time.Duration) Option
- func WithMaxLifetime(d time.Duration) Option
- func WithOnPush(fn func(channel string, data []protocol.Value)) Option
- func WithPassword(s string) Option
- func WithPoolSize(n int) Option
- func WithProto(p int) Option
- func WithReadTimeout(d time.Duration) Option
- func WithUsername(s string) Option
- func WithWriteTimeout(d time.Duration) Option
- type Pipeline
- func (p *Pipeline) Decr(key string) *IntCmd
- func (p *Pipeline) Del(keys ...string) *IntCmd
- func (p *Pipeline) Discard()
- func (p *Pipeline) Exec(ctx context.Context) error
- func (p *Pipeline) Exists(keys ...string) *IntCmd
- func (p *Pipeline) Expire(key string, d time.Duration) *BoolCmd
- func (p *Pipeline) Get(key string) *StringCmd
- func (p *Pipeline) HGet(key, field string) *StringCmd
- func (p *Pipeline) HSet(key string, values ...any) *IntCmd
- func (p *Pipeline) Incr(key string) *IntCmd
- func (p *Pipeline) LPush(key string, values ...any) *IntCmd
- func (p *Pipeline) RPush(key string, values ...any) *IntCmd
- func (p *Pipeline) Release()
- func (p *Pipeline) SAdd(key string, members ...any) *IntCmd
- func (p *Pipeline) Set(key string, value any, expiration time.Duration) *StatusCmd
- func (p *Pipeline) SetBytes(key string, value []byte, expiration time.Duration) *StatusCmd
- func (p *Pipeline) ZAdd(key string, members ...Z) *IntCmd
- type PubSub
- func (ps *PubSub) Channel() <-chan *Message
- func (ps *PubSub) Close() error
- func (ps *PubSub) Drops() uint64
- func (ps *PubSub) PSubscribe(_ context.Context, patterns ...string) error
- func (ps *PubSub) PUnsubscribe(_ context.Context, patterns ...string) error
- func (ps *PubSub) SSubscribe(_ context.Context, channels ...string) error
- func (ps *PubSub) SUnsubscribe(_ context.Context, channels ...string) error
- func (ps *PubSub) Subscribe(_ context.Context, channels ...string) error
- func (ps *PubSub) Unsubscribe(_ context.Context, channels ...string) error
- type RedisError
- type ScanIterator
- type SentinelClient
- func (s *SentinelClient) Close() error
- func (s *SentinelClient) Decr(ctx context.Context, key string) (int64, error)
- func (s *SentinelClient) Del(ctx context.Context, keys ...string) (int64, error)
- func (s *SentinelClient) Do(ctx context.Context, args ...any) (*protocol.Value, error)
- func (s *SentinelClient) Exists(ctx context.Context, keys ...string) (int64, error)
- func (s *SentinelClient) Expire(ctx context.Context, key string, expiration time.Duration) (bool, error)
- func (s *SentinelClient) Get(ctx context.Context, key string) (string, error)
- func (s *SentinelClient) GetBytes(ctx context.Context, key string) ([]byte, error)
- func (s *SentinelClient) HDel(ctx context.Context, key string, fields ...string) (int64, error)
- func (s *SentinelClient) HGet(ctx context.Context, key, field string) (string, error)
- func (s *SentinelClient) HGetAll(ctx context.Context, key string) (map[string]string, error)
- func (s *SentinelClient) HSet(ctx context.Context, key string, values ...any) (int64, error)
- func (s *SentinelClient) Incr(ctx context.Context, key string) (int64, error)
- func (s *SentinelClient) LPop(ctx context.Context, key string) (string, error)
- func (s *SentinelClient) LPush(ctx context.Context, key string, values ...any) (int64, error)
- func (s *SentinelClient) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (s *SentinelClient) MGet(ctx context.Context, keys ...string) ([]string, error)
- func (s *SentinelClient) MSet(ctx context.Context, pairs ...any) error
- func (s *SentinelClient) Ping(ctx context.Context) error
- func (s *SentinelClient) Pipeline() *Pipeline
- func (s *SentinelClient) Publish(ctx context.Context, channel, message string) (int64, error)
- func (s *SentinelClient) RPop(ctx context.Context, key string) (string, error)
- func (s *SentinelClient) RPush(ctx context.Context, key string, values ...any) (int64, error)
- func (s *SentinelClient) SAdd(ctx context.Context, key string, members ...any) (int64, error)
- func (s *SentinelClient) SMembers(ctx context.Context, key string) ([]string, error)
- func (s *SentinelClient) Set(ctx context.Context, key string, value any, expiration time.Duration) error
- func (s *SentinelClient) SetNX(ctx context.Context, key string, value any, expiration time.Duration) (bool, error)
- func (s *SentinelClient) Stats() async.PoolStats
- func (s *SentinelClient) Subscribe(ctx context.Context, channels ...string) (*PubSub, error)
- func (s *SentinelClient) TTL(ctx context.Context, key string) (time.Duration, error)
- func (s *SentinelClient) ZAdd(ctx context.Context, key string, members ...Z) (int64, error)
- func (s *SentinelClient) ZRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- type SentinelConfig
- type StatusCmd
- type StringCmd
- type Tx
- func (tx *Tx) Decr(key string) *IntCmd
- func (tx *Tx) Del(keys ...string) *IntCmd
- func (tx *Tx) Discard() error
- func (tx *Tx) Exec(ctx context.Context) error
- func (tx *Tx) Exists(keys ...string) *IntCmd
- func (tx *Tx) Expire(key string, d time.Duration) *BoolCmd
- func (tx *Tx) Get(key string) *StringCmd
- func (tx *Tx) HGet(key, field string) *StringCmd
- func (tx *Tx) HSet(key string, values ...any) *IntCmd
- func (tx *Tx) Incr(key string) *IntCmd
- func (tx *Tx) LPush(key string, values ...any) *IntCmd
- func (tx *Tx) RPush(key string, values ...any) *IntCmd
- func (tx *Tx) SAdd(key string, members ...any) *IntCmd
- func (tx *Tx) Set(key string, value any, expiration time.Duration) *StatusCmd
- func (tx *Tx) Unwatch(ctx context.Context) error
- func (tx *Tx) Watch(ctx context.Context, keys ...string) error
- func (tx *Tx) ZAdd(key string, members ...Z) *IntCmd
- type Z
- type ZRangeBy
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrAsk = errors.New("celeris-redis: ASK redirect")
ErrAsk is returned when the server responds with an -ASK redirect during a cluster slot migration.
var ErrClosed = errors.New("celeris-redis: client closed")
ErrClosed is returned when a command is issued against a closed Client or PubSub.
var ErrClusterMaxRedirects = errors.New("celeris-redis: cluster max redirects exceeded")
ErrClusterMaxRedirects is returned when the maximum number of MOVED/ASK redirects has been exhausted.
var ErrCrossSlot = errors.New("celeris-redis: CROSSSLOT keys in request don't hash to the same slot; use hash tags {}")
ErrCrossSlot is returned by ClusterTx and ClusterClient.Watch when the queued commands (or watched keys) span multiple hash slots. Redis Cluster requires all keys in a MULTI/EXEC transaction to reside in the same slot. Use hash tags (e.g. "{tag}") to colocate related keys.
var ErrMoved = errors.New("celeris-redis: MOVED redirect")
ErrMoved is returned when the server responds with a -MOVED redirect. When using ClusterClient, MOVED redirects are handled transparently; this sentinel is still useful for errors.Is checks on single-node Client.
var ErrNil = errors.New("celeris-redis: nil")
ErrNil is returned by commands whose reply is a null bulk — e.g. GET on a missing key, LPOP on an empty list, or HGET on a missing field.
var ErrPoolExhausted = errors.New("celeris-redis: pool exhausted")
ErrPoolExhausted is returned when no connection is available and none can be dialed within MaxOpen.
var ErrProtocol = errors.New("celeris-redis: protocol error")
ErrProtocol is returned when the server reply cannot be decoded.
var ErrSentinelUnhealthy = errors.New("celeris-redis: sentinel client unhealthy, failover dial failed")
ErrSentinelUnhealthy is returned by commands when the sentinel client failed to connect to the new master after a failover and is in an unhealthy state.
var ErrTxAborted = errors.New("celeris-redis: transaction aborted")
ErrTxAborted is returned from each typed *Cmd.Result() after a MULTI/EXEC transaction aborts — e.g. because a WATCHed key was modified between the WATCH call and EXEC, or the server rejected the transaction with -EXECABORT.
var ErrWrongType = errors.New("celeris-redis: WRONGTYPE operation against a key holding the wrong kind of value")
ErrWrongType matches the server's WRONGTYPE error reply for operations against a key of the wrong data type.
Functions ¶
Types ¶
type BoolCmd ¶
type BoolCmd struct {
// contains filtered or unexported fields
}
BoolCmd is a deferred bool reply.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the high-level handle users interact with. It owns a pool of connections and hands out per-command round trips.
func NewClient ¶
NewClient dials a pool at addr using the given options. Connections are established lazily on first command.
Example ¶
ExampleNewClient shows the basic Get/Set loop. The client is lazy: the first command triggers the TCP dial and the RESP3 (or RESP2) handshake.
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/goceleris/celeris/driver/redis"
)
func main() {
client, err := redis.NewClient("localhost:6379",
redis.WithPassword("secret"),
redis.WithDB(0),
)
if err != nil {
log.Fatal(err)
}
defer func() { _ = client.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := client.Set(ctx, "greeting", "hello, world", time.Minute); err != nil {
log.Fatal(err)
}
v, err := client.Get(ctx, "greeting")
if err != nil {
log.Fatal(err)
}
fmt.Println(v)
}
Output:
func (*Client) Do ¶
Do is an escape hatch for commands the typed API does not cover (OBJECT ENCODING, CLUSTER INFO, SCRIPT LOAD, XADD, FUNCTION, ...). Args are converted to strings via [argify]; the returned protocol.Value is detached from the Reader buffer and safe to retain. Server error replies surface as *RedisError via the returned error.
Example ¶
ExampleClient_Do is the escape hatch for commands the typed API does not cover. Args are stringified through the driver's internal argify helper; the reply is a detached protocol.Value safe to retain.
package main
import (
"context"
"fmt"
"log"
"github.com/goceleris/celeris/driver/redis"
)
func main() {
client, err := redis.NewClient("localhost:6379")
if err != nil {
log.Fatal(err)
}
defer func() { _ = client.Close() }()
ctx := context.Background()
// XADD stream * field value — returns the new entry ID as a bulk string.
id, err := client.DoString(ctx, "XADD", "events", "*", "field", "value")
if err != nil {
log.Fatal(err)
}
fmt.Println(id)
}
Output:
func (*Client) DoBool ¶
DoBool is a convenience wrapper around Client.Do that decodes the reply as a bool.
func (*Client) DoInt ¶
DoInt is a convenience wrapper around Client.Do that decodes the reply as an int64.
func (*Client) DoSlice ¶
DoSlice is a convenience wrapper around Client.Do that decodes the reply as a string slice.
func (*Client) DoString ¶
DoString is a convenience wrapper around Client.Do that decodes the reply as a string.
func (*Client) Eval ¶
func (c *Client) Eval(ctx context.Context, script string, keys []string, args ...any) (*protocol.Value, error)
Eval runs a Lua script server-side. keys and args may be empty; numkeys is inferred from len(keys). The returned *protocol.Value is detached and safe to retain.
func (*Client) EvalSHA ¶
func (c *Client) EvalSHA(ctx context.Context, sha string, keys []string, args ...any) (*protocol.Value, error)
EvalSHA runs a previously-loaded script by sha1. Returns NOSCRIPT as *RedisError when the server does not have the script cached.
func (*Client) Expire ¶
Expire sets a TTL on a key. If expiration is zero or negative, Expire calls PERSIST instead (removes the TTL).
func (*Client) GetBytes ¶
GetBytes is the []byte variant of Get. The returned slice is a fresh copy.
func (*Client) GetDelBytes ¶ added in v1.4.1
GetDelBytes is the []byte-returning variant of Client.GetDel, skipping the string→[]byte conversion that Client.GetDel callers otherwise incur when passing the result through middleware/store.KV.GetAndDelete.
func (*Client) HIncrByFloat ¶
func (c *Client) HIncrByFloat(ctx context.Context, key, field string, incr float64) (float64, error)
HIncrByFloat increments a hash field by a float delta.
func (*Client) HMGet ¶
HMGet returns the values of the specified fields in the hash at key. Missing fields yield empty strings.
func (*Client) HSet ¶
HSet sets one or more field/value pairs. values is a flat list [f1, v1, f2, v2, ...].
func (*Client) HSetBytes ¶ added in v1.4.1
HSetBytes is the allocation-lean variant of Client.HSet for a single field/value pair where value is already a []byte. The field name is still a string (hash fields are typed string).
func (*Client) HSetNXBytes ¶ added in v1.4.1
HSetNXBytes is the allocation-lean variant of Client.HSetNX.
func (*Client) IdleConnWorkers ¶
IdleConnWorkers returns the Worker() IDs of every currently-idle command connection. Intended for tests and introspection asserting that per-CPU affinity is actually honored by the dial path.
func (*Client) IncrByFloat ¶
IncrByFloat increments key by val and returns the new value.
func (*Client) LPushBytes ¶ added in v1.4.1
LPushBytes is the allocation-lean variant of Client.LPush for callers that already hold the values as [][]byte. Skips argify's string(x) copy per value.
func (*Client) MSet ¶
MSet sets multiple key-value pairs atomically. pairs is a flat list [k1, v1, k2, v2, ...].
func (*Client) OnPush ¶
OnPush registers (or replaces) the push callback for RESP3 push frames arriving on command connections. It is safe to call concurrently with in-flight commands; delivery is best-effort. Pass nil to clear.
func (*Client) PExpire ¶
PExpire sets a TTL in milliseconds. If ms is zero or negative, PExpire calls PERSIST instead (removes the TTL).
func (*Client) PExpireAt ¶
PExpireAt sets an absolute expiration timestamp (millisecond granularity).
func (*Client) PSubscribe ¶
PSubscribe opens a pub/sub conn and subscribes to patterns.
func (*Client) PTTL ¶
PTTL returns the remaining TTL in milliseconds; -1 if no TTL, -2 if key is missing.
func (*Client) Pipeline ¶
Pipeline returns a Pipeline bound to c. Call Pipeline.Release after reading results to return it to the pool for reuse — holding the Pipeline past Release invalidates every typed Cmd handle it produced.
Example ¶
ExampleClient_Pipeline batches several commands into one network write. Results are harvested from typed *Cmd handles after Exec returns.
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/goceleris/celeris/driver/redis"
)
func main() {
client, err := redis.NewClient("localhost:6379")
if err != nil {
log.Fatal(err)
}
defer func() { _ = client.Close() }()
ctx := context.Background()
p := client.Pipeline()
setRes := p.Set("counter", 0, 0)
incrRes := p.Incr("counter")
ttlRes := p.Expire("counter", time.Hour)
if err := p.Exec(ctx); err != nil {
log.Fatal(err)
}
_, _ = setRes.Result()
n, _ := incrRes.Result()
ok, _ := ttlRes.Result()
fmt.Println(n, ok)
}
Output:
func (*Client) Proto ¶
Proto returns the negotiated RESP protocol version on the last dialed connection. Returns 0 if no conn has been dialed yet (call Ping first).
func (*Client) Publish ¶
Publish sends message to channel via PUBLISH. Returns the number of subscribers that received the message.
func (*Client) RPushBytes ¶ added in v1.4.1
RPushBytes is the allocation-lean variant of Client.RPush. Skips argify's per-value string(x) copy.
func (*Client) Scan ¶
Scan returns a new iterator. match is the pattern passed as MATCH (empty means no MATCH arg). count is the COUNT hint (0 means no COUNT arg). The ctx argument is passed through on each Next call.
func (*Client) ScriptLoad ¶
ScriptLoad caches a script on the server and returns its SHA1 digest.
func (*Client) Set ¶
Set stores value at key. If expiration > 0, an EX argument is appended with whole-second granularity (or PX for sub-second).
func (*Client) SetBytes ¶ added in v1.4.1
func (c *Client) SetBytes(ctx context.Context, key string, value []byte, expiration time.Duration) error
SetBytes is the allocation-lean variant of Client.Set. It converts the caller-owned []byte into a string without copying via unsafe.String — safe because the RESP writer only reads the bytes to stream to the wire and does not retain the string past the round trip. Skips argify's interface type switch + allocating string(x) for the []byte case.
func (*Client) SetEX ¶
SetEX sets key to value with the given TTL (second granularity). This is the atomic equivalent of SET + EXPIRE.
func (*Client) SetEXBytes ¶ added in v1.4.1
SetEXBytes is the allocation-lean variant of Client.SetEX.
func (*Client) SetNX ¶
func (c *Client) SetNX(ctx context.Context, key string, value any, expiration time.Duration) (bool, error)
SetNX sets key only if it does not exist.
func (*Client) SetNXBytes ¶ added in v1.4.1
func (c *Client) SetNXBytes(ctx context.Context, key string, value []byte, expiration time.Duration) (bool, error)
SetNXBytes is the allocation-lean variant of Client.SetNX. Skips argify's string(x) copy via unsafe.String — same safety rationale as Client.SetBytes.
func (*Client) Subscribe ¶
Subscribe opens a pub/sub conn and subscribes to channels.
Example ¶
ExampleClient_Subscribe opens a dedicated pub/sub connection and consumes messages from PubSub.Channel. A goroutine publishes one message via the cmd-pool client to drive the subscriber.
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/goceleris/celeris/driver/redis"
)
func main() {
client, err := redis.NewClient("localhost:6379")
if err != nil {
log.Fatal(err)
}
defer func() { _ = client.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
ps, err := client.Subscribe(ctx, "events")
if err != nil {
log.Fatal(err)
}
defer func() { _ = ps.Close() }()
go func() {
time.Sleep(50 * time.Millisecond)
_, _ = client.Publish(ctx, "events", "hello")
}()
select {
case m := <-ps.Channel():
fmt.Println(m.Channel, m.Payload)
case <-ctx.Done():
log.Fatal(ctx.Err())
}
}
Output:
func (*Client) TxPipeline ¶
TxPipeline opens a MULTI/EXEC transaction. The returned Tx pins a connection from the cmd pool until Exec or Discard. Typed command methods (Get, Set, Incr, ...) queue commands onto the Tx; Exec flushes them all under a single MULTI/EXEC.
Example ¶
ExampleClient_TxPipeline queues a pair of INCRs under a single MULTI/EXEC transaction. Per-command results are deferred until Exec returns.
package main
import (
"context"
"fmt"
"log"
"github.com/goceleris/celeris/driver/redis"
)
func main() {
client, err := redis.NewClient("localhost:6379")
if err != nil {
log.Fatal(err)
}
defer func() { _ = client.Close() }()
ctx := context.Background()
tx, err := client.TxPipeline(ctx)
if err != nil {
log.Fatal(err)
}
defer func() { _ = tx.Discard() }() // no-op after a successful Exec.
visits := tx.Incr("visits")
uniques := tx.Incr("uniques")
if err := tx.Exec(ctx); err != nil {
log.Fatal(err)
}
v, _ := visits.Result()
u, _ := uniques.Result()
fmt.Println(v, u)
}
Output:
func (*Client) Watch ¶
Watch acquires a dedicated connection, issues WATCH on keys, and invokes fn with a Tx pinned to that connection. The connection is released when fn returns (after Exec or Discard). This is the only correct way to use WATCH on a pooled client because the WATCH state is per-connection.
Usage:
err := client.Watch(ctx, func(tx *Tx) error {
// Read current value (on the same pinned conn via the Tx's client).
// Queue commands:
tx.Set("k", "new", 0)
return tx.Exec(ctx)
}, "k")
If the WATCHed keys change before EXEC, Exec returns ErrTxAborted.
func (*Client) ZCount ¶
ZCount returns the count of members with scores between minScore and maxScore (inclusive string boundaries, e.g. "-inf", "+inf", "1", "(5").
func (*Client) ZIncrBy ¶
func (c *Client) ZIncrBy(ctx context.Context, key string, incr float64, member string) (float64, error)
ZIncrBy increments the score of member in the sorted set at key by incr.
func (*Client) ZRangeByScore ¶
ZRangeByScore returns members with scores in [min,max] with optional LIMIT.
type ClusterClient ¶
type ClusterClient struct {
// contains filtered or unexported fields
}
ClusterClient is a Redis Cluster client. It maintains a slot-to-node mapping, routes commands by key hash slot, and handles MOVED/ASK redirects transparently. Each cluster node gets its own Client with its own pool.
func NewClusterClient ¶
func NewClusterClient(cfg ClusterConfig) (*ClusterClient, error)
NewClusterClient creates a Redis Cluster client. It connects to the seed nodes, fetches the cluster topology via CLUSTER SLOTS, and populates the slot map. A background goroutine refreshes the topology periodically.
func (*ClusterClient) Close ¶
func (c *ClusterClient) Close() error
Close tears down all node clients and stops the background refresh.
func (*ClusterClient) ForEachNode ¶
ForEachNode calls fn on every node's Client. Used for cluster-wide operations like FLUSHDB or DBSIZE. The ctx argument is kept for API symmetry; cancellation is not honored.
func (*ClusterClient) Pipeline ¶
func (c *ClusterClient) Pipeline() *ClusterPipeline
Pipeline returns a ClusterPipeline that groups commands by slot.
func (*ClusterClient) SPublish ¶
SPublish publishes a message to a shard channel (Redis 7+ SPUBLISH). The command is routed to the node owning the channel's slot.
func (*ClusterClient) SSubscribe ¶
SSubscribe opens a pub/sub connection and subscribes to shard channels (Redis 7+ SSUBSCRIBE). Shard channels are slot-scoped: the connection is established to the node owning the slot of the first channel. All channels SHOULD hash to the same slot (use hash tags); mixing slots in one PubSub is undefined behavior in cluster mode.
func (*ClusterClient) TxPipeline ¶
func (c *ClusterClient) TxPipeline() *ClusterTx
TxPipeline returns a ClusterTx that validates slot affinity for every queued command. If commands target different slots, ClusterTx.Exec returns ErrCrossSlot.
func (*ClusterClient) Watch ¶
Watch validates that all keys hash to the same slot, routes to the correct cluster node, and delegates to the node's Client.Watch. The fn callback receives a single-node Tx that can queue commands normally.
type ClusterConfig ¶
type ClusterConfig struct {
// Addrs is the list of seed node addresses in "host:port" form. At least
// one is required; the client discovers the rest via CLUSTER SLOTS.
Addrs []string
// Password is the AUTH credential for cluster nodes.
Password string
// Username is the ACL user (Redis 6+).
Username string
PoolSize int
MaxIdlePerWorker int
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
// MaxRedirects is the maximum number of MOVED/ASK redirects before an
// error is returned. Default: 3.
MaxRedirects int
// RouteByLatency, when true, sends reads to the lowest-latency node.
// Not implemented in v1.4.0; reserved for future use.
RouteByLatency bool
// ReadOnly, when true, allows reads from replica nodes.
// Not implemented in v1.4.0; reserved for future use.
ReadOnly bool
Engine eventloop.ServerProvider
}
ClusterConfig configures a Redis Cluster client.
type ClusterPipeline ¶
type ClusterPipeline struct {
// contains filtered or unexported fields
}
ClusterPipeline groups commands by slot and executes them on the correct nodes in parallel. It uses per-node Pipelines internally.
func (*ClusterPipeline) Discard ¶
func (cp *ClusterPipeline) Discard()
Discard drops all queued commands.
func (*ClusterPipeline) Exec ¶
Exec sends all queued commands, grouped by slot, executing per-node sub-pipelines in parallel. Returns per-command results and errors indexed by the original enqueue order. MOVED/ASK redirects within pipeline responses are retried once: MOVED triggers a topology refresh and re-route; ASK sends ASKING on the target node before re-issuing the command.
type ClusterTx ¶
type ClusterTx struct {
// contains filtered or unexported fields
}
ClusterTx buffers commands for a MULTI/EXEC transaction on a ClusterClient. All keys must hash to the same slot; if a command targets a different slot, the ClusterTx records ErrCrossSlot and [Exec] returns it without contacting the server.
Obtain a ClusterTx via ClusterClient.TxPipeline. After queuing commands, call ClusterTx.Exec to execute atomically on the owning node. Call ClusterTx.Discard to drop all queued commands without executing.
func (*ClusterTx) Exec ¶
Exec sends all queued commands inside a MULTI/EXEC transaction to the cluster node that owns the pinned slot. Returns ErrCrossSlot if any command targeted a different slot than the first.
func (*ClusterTx) HSetBytes ¶ added in v1.4.1
HSetBytes enqueues single-field HSET using a zero-copy []byte value.
func (*ClusterTx) LPushBytes ¶ added in v1.4.1
LPushBytes enqueues LPUSH with zero-copy []byte values.
func (*ClusterTx) RPushBytes ¶ added in v1.4.1
RPushBytes enqueues RPUSH with zero-copy []byte values.
func (*ClusterTx) SetBytes ¶ added in v1.4.1
SetBytes is the allocation-lean variant of ClusterTx.Set — uses unsafe.String to avoid the argify string(x) copy.
type Config ¶
type Config struct {
// Addr is the TCP endpoint in "host:port" form.
Addr string
// Password is the AUTH credential; empty for no auth.
Password string
// Username is the ACL user (Redis 6+); empty for legacy auth.
Username string
// DB is the numeric database index applied via SELECT post-HELLO.
DB int
// DialTimeout is the TCP dial timeout; default 5s.
DialTimeout time.Duration
// ReadTimeout is unused currently — responses are driven by the event
// loop and cancellation is via the request context.
ReadTimeout time.Duration
// WriteTimeout is unused currently — writes are non-blocking via the
// event loop's per-FD queue.
WriteTimeout time.Duration
// PoolSize is the total connection cap across all workers; defaults to
// NumWorkers*4.
PoolSize int
// MaxIdlePerWorker bounds each worker's idle list; default 2.
MaxIdlePerWorker int
// MaxLifetime is the max age of a pooled conn; default 30m.
MaxLifetime time.Duration
// MaxIdleTime is the max idle duration before eviction; default 5m.
MaxIdleTime time.Duration
// HealthCheckInterval is the interval at which the pool's background
// health checker pings idle connections. Default 30s; 0 disables.
HealthCheckInterval time.Duration
// Proto selects the negotiation target (2 or 3); default 3 with
// RESP2 fallback.
Proto int
// ForceRESP2 disables the HELLO handshake and speaks RESP2 plus AUTH+
// SELECT. Useful for ElastiCache compatibility.
ForceRESP2 bool
// Engine hooks the driver into a running celeris.Server's event loop.
// If nil, a standalone loop is resolved on NewClient.
Engine eventloop.ServerProvider
// OnPush is invoked when a RESP3 push frame arrives on a command
// connection. The channel is the first element of the push array
// (e.g. "invalidate" for client-tracking invalidations) and data
// carries the remaining elements. If nil, push frames on command
// connections are silently dropped.
OnPush func(channel string, data []protocol.Value)
}
Config controls a Client. Use Option functions to set fields; the zero value is not directly usable.
type FloatCmd ¶
type FloatCmd struct {
// contains filtered or unexported fields
}
FloatCmd is a deferred float reply.
type IntCmd ¶
type IntCmd struct {
// contains filtered or unexported fields
}
IntCmd is a deferred int64 reply.
type Message ¶
Message is one pub/sub notification. Pattern is set only for PSUBSCRIBE deliveries ("pmessage" push type). Shard is true for shard channel deliveries ("smessage" push type, Redis 7+).
type Option ¶
type Option func(*Config)
Option mutates a Config during NewClient.
func WithDialTimeout ¶
WithDialTimeout sets the TCP dial timeout.
func WithEngine ¶
func WithEngine(sp eventloop.ServerProvider) Option
WithEngine hooks the driver into a celeris.Server's event loop.
func WithForceRESP2 ¶
func WithForceRESP2() Option
WithForceRESP2 forces the RESP2 dialect; skips HELLO.
func WithHealthCheckInterval ¶
WithHealthCheckInterval sets the background health-check sweep interval. Default is 30s; pass 0 to disable.
func WithMaxIdlePerWorker ¶
WithMaxIdlePerWorker bounds each worker's idle list.
func WithMaxIdleTime ¶
WithMaxIdleTime sets the pooled conn idle eviction.
func WithMaxLifetime ¶
WithMaxLifetime sets the pooled conn lifetime.
func WithOnPush ¶
WithOnPush registers a callback for RESP3 push frames that arrive on command connections (e.g. client-tracking invalidation messages). The channel is the push kind (first array element) and data carries the remaining elements. If unset, push frames on command connections are silently dropped.
func WithReadTimeout ¶
WithReadTimeout sets the read timeout (currently advisory).
func WithUsername ¶
WithUsername sets the AUTH username (Redis 6 ACL).
func WithWriteTimeout ¶
WithWriteTimeout sets the write timeout (currently advisory).
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline batches commands into one wire write. Results are reachable via the *Cmd types returned from each method. A Pipeline is re-usable and pooled on its parent Client: allocate via Client.Pipeline, read results after Exec, then return it to the pool via Pipeline.Release. Release is optional but recommended for tight hot paths — without it a fresh Pipeline (and its internal slabs) is allocated on every call.
Pipeline is NOT safe for concurrent use. A caller must not invoke methods on the same Pipeline from multiple goroutines at once.
func (*Pipeline) Discard ¶
func (p *Pipeline) Discard()
Discard drops all queued commands without sending anything. The Pipeline remains usable for subsequent enqueues.
func (*Pipeline) Exec ¶
Exec sends every queued command in one write and blocks until the last reply is received. Returns the first parse/transport error; per-command errors are reachable via each *Cmd.Result().
func (*Pipeline) Release ¶
func (p *Pipeline) Release()
Release returns p to its Client's pool. After Release, every *StringCmd, *IntCmd, ... previously returned from p is invalid — reading from them is undefined behaviour. Safe to call at most once; subsequent calls are no-ops. Calling Release is optional; an un-Released Pipeline is simply GC'd like any other heap object.
func (*Pipeline) SetBytes ¶ added in v1.4.1
SetBytes is the allocation-lean variant of Pipeline.Set — uses unsafe.String to avoid the argify string(x) copy.
type PubSub ¶
type PubSub struct {
// contains filtered or unexported fields
}
PubSub is a dedicated pub/sub connection. Callers read from PubSub.Channel and use PubSub.Subscribe / PubSub.Unsubscribe / PubSub.PSubscribe / PubSub.PUnsubscribe to alter the subscription set.
The connection is re-established automatically on transport errors: a hook registered with the underlying redisConn fires when the FD closes and a background goroutine dials a replacement, re-sends SUBSCRIBE/PSUBSCRIBE for every tracked channel/pattern, and resumes message delivery. Messages arriving while the conn is down are lost (at-most-once delivery).
Callers should treat subscribe/unsubscribe as "at most once" — the server ack is not tracked synchronously. The channel returned by PubSub.Channel is closed once PubSub.Close is called or the reconnect loop gives up.
func (*PubSub) Channel ¶
Channel returns the read-only message stream. It remains open until Close is called or the reconnect loop exhausts its retries.
func (*PubSub) Drops ¶
Drops returns the number of messages dropped because the channel buffer was full.
func (*PubSub) PSubscribe ¶
PSubscribe adds patterns to the subscription set. The ctx argument is kept for API symmetry; cancellation is not honored.
func (*PubSub) PUnsubscribe ¶
PUnsubscribe removes patterns. Empty list unsubscribes all patterns. The ctx argument is kept for API symmetry; cancellation is not honored.
func (*PubSub) SSubscribe ¶
SSubscribe adds shard channels (Redis 7+ SSUBSCRIBE) to the subscription set. Shard channels are scoped to the cluster slot of the channel name. The ctx argument is kept for API symmetry; cancellation is not honored.
func (*PubSub) SUnsubscribe ¶
SUnsubscribe removes shard channels. Empty list unsubscribes all shard channels. The ctx argument is kept for API symmetry; cancellation is not honored.
func (*PubSub) Subscribe ¶
Subscribe adds channels to the subscription set. The ctx argument is kept for API symmetry with Client.Subscribe; cancellation is not honored.
type RedisError ¶
RedisError wraps a server-side error reply. The Prefix (e.g. "WRONGTYPE", "ERR", "MOVED") distinguishes categories; the Msg holds the full text.
func (*RedisError) Error ¶
func (e *RedisError) Error() string
Error implements the error interface.
func (*RedisError) Is ¶
func (e *RedisError) Is(target error) bool
Is supports errors.Is for well-known prefixes.
type ScanIterator ¶
type ScanIterator struct {
// contains filtered or unexported fields
}
ScanIterator walks the keyspace via SCAN with automatic cursor paging. It is NOT safe for concurrent use; start one iterator per goroutine.
Typical use:
it := client.Scan(ctx, "user:*", 100)
for {
key, ok := it.Next(ctx)
if !ok {
break
}
// process key
}
if err := it.Err(); err != nil {
// transport or server error
}
func (*ScanIterator) Err ¶
func (it *ScanIterator) Err() error
Err returns any error encountered during iteration.
func (*ScanIterator) Next ¶
func (it *ScanIterator) Next(ctx context.Context) (string, bool)
Next returns the next key and true, or "" and false when the iteration ends or an error occurs. Call ScanIterator.Err to distinguish the two.
type SentinelClient ¶
type SentinelClient struct {
// contains filtered or unexported fields
}
SentinelClient is a Redis client managed by Redis Sentinel. It discovers the current primary via Sentinel, connects to it, and automatically reconnects to the new primary when a failover is detected.
func NewSentinelClient ¶
func NewSentinelClient(cfg SentinelConfig) (*SentinelClient, error)
NewSentinelClient creates a Sentinel-managed client. It connects to the first responsive sentinel, discovers the master address, and dials the master. A background goroutine subscribes to +switch-master on sentinel connections for automatic failover handling.
func (*SentinelClient) Close ¶
func (s *SentinelClient) Close() error
Close tears down the sentinel client and all sentinel connections.
func (*SentinelClient) MSet ¶
func (s *SentinelClient) MSet(ctx context.Context, pairs ...any) error
func (*SentinelClient) Pipeline ¶
func (s *SentinelClient) Pipeline() *Pipeline
func (*SentinelClient) Stats ¶
func (s *SentinelClient) Stats() async.PoolStats
type SentinelConfig ¶
type SentinelConfig struct {
// MasterName is the Sentinel master group name (required).
MasterName string
// SentinelAddrs is the list of sentinel addresses in "host:port" form.
// At least one is required.
SentinelAddrs []string
// Password is the AUTH credential for the Redis master (not sentinel).
Password string
// SentinelPassword is the AUTH credential for sentinel connections (Redis 6+).
SentinelPassword string
// Username is the ACL user for the Redis master (Redis 6+).
Username string
// DB is the database index applied via SELECT on the master.
DB int
PoolSize int
MaxIdlePerWorker int
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
Engine eventloop.ServerProvider
}
SentinelConfig configures a Sentinel-managed Redis client.
type StatusCmd ¶
type StatusCmd struct {
// contains filtered or unexported fields
}
StatusCmd is a deferred simple-string reply.
type StringCmd ¶
type StringCmd struct {
// contains filtered or unexported fields
}
StringCmd is a deferred string reply.
type Tx ¶
type Tx struct {
// contains filtered or unexported fields
}
Tx is a MULTI/EXEC transaction. Commands are buffered into a single wire write (MULTI + cmds + EXEC) and dispatched atomically. Per-command typed results are reachable via the *Cmd handles returned by each method — they populate only after Exec returns.
A Tx pins one connection for its entire lifetime. Exec releases the conn; Discard sends DISCARD and releases. Forgetting to call either leaks the conn until the pool's idle reaper closes it, so a sync.Pool-style "always defer tx.Close()" is recommended — Close is an alias for Discard that is a no-op after Exec.
func (*Tx) Discard ¶
Discard aborts the transaction. If Exec was not called, DISCARD is sent to the server to cancel MULTI state, and the conn is released. After Discard, the Tx is done.
func (*Tx) Exec ¶
Exec sends MULTI, every queued command, and EXEC in a single write, then dispatches the EXEC array's elements to the typed result slots.
Behaviour:
- The MULTI reply is +OK; each queued command's immediate reply is +QUEUED; the final EXEC reply is an array of N results.
- If EXEC returns a null array (RESP2 *-1 / RESP3 _), the transaction was aborted (e.g. a WATCHed key changed) — every typed result will return ErrTxAborted.
- If a single command errors server-side (WRONGTYPE, ...), that command's result returns the error but the others complete normally.
- If the server returns -EXECABORT, every typed result returns that error.
- After Exec, the tx is done: further Watch/Exec/command calls return an error. The conn is returned to the pool.