Documentation
¶
Index ¶
- type Backend
- type BackendOptions
- type CommandCategory
- type Divergence
- type DivergenceKind
- type DualWriter
- func (d *DualWriter) Admin(ctx context.Context, cmd string, args [][]byte) (any, error)
- func (d *DualWriter) Blocking(ctx context.Context, cmd string, args [][]byte) (any, error)
- func (d *DualWriter) Close()
- func (d *DualWriter) Primary() Backend
- func (d *DualWriter) PubSubBackend() PubSubBackend
- func (d *DualWriter) Read(ctx context.Context, cmd string, args [][]byte) (any, error)
- func (d *DualWriter) Script(ctx context.Context, cmd string, args [][]byte) (any, error)
- func (d *DualWriter) Secondary() Backend
- func (d *DualWriter) ShadowPubSubBackend() PubSubBackend
- func (d *DualWriter) Write(ctx context.Context, cmd string, args [][]byte) (any, error)
- type ProxyConfig
- type ProxyMetrics
- type ProxyMode
- type ProxyServer
- type PubSubBackend
- type RedisBackend
- func (b *RedisBackend) Close() error
- func (b *RedisBackend) Do(ctx context.Context, args ...any) *redis.Cmd
- func (b *RedisBackend) DoWithTimeout(ctx context.Context, timeout time.Duration, args ...any) *redis.Cmd
- func (b *RedisBackend) Name() string
- func (b *RedisBackend) NewPubSub(ctx context.Context) *redis.PubSub
- func (b *RedisBackend) Pipeline(ctx context.Context, cmds [][]any) ([]*redis.Cmd, error)
- type SentryReporter
- type ShadowReader
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Backend ¶
type Backend interface {
// Do sends a single command and returns its result.
Do(ctx context.Context, args ...any) *redis.Cmd
// Pipeline sends multiple commands in a pipeline.
Pipeline(ctx context.Context, cmds [][]any) ([]*redis.Cmd, error)
// Close releases the underlying connection.
Close() error
// Name identifies this backend for logging and metrics.
Name() string
}
Backend abstracts a Redis-protocol endpoint (real Redis or ElasticKV).
type BackendOptions ¶
type BackendOptions struct {
DB int
Password string
PoolSize int
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
}
BackendOptions configures the underlying go-redis connection pool.
func DefaultBackendOptions ¶
func DefaultBackendOptions() BackendOptions
DefaultBackendOptions returns reasonable defaults for a proxy backend.
type CommandCategory ¶
type CommandCategory int
CommandCategory classifies a Redis command for routing purposes.
const ( CmdRead CommandCategory = iota // GET, HGET, LRANGE, ZRANGE, etc. CmdWrite // SET, DEL, HSET, LPUSH, ZADD, etc. CmdBlocking // BLPOP, BRPOP, BZPOPMIN, XREAD (with BLOCK) CmdPubSub // SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBSUB (note: PUBLISH is CmdWrite) CmdAdmin // PING, INFO, CLIENT, SELECT, QUIT, DBSIZE, SCAN, AUTH CmdTxn // MULTI, EXEC, DISCARD CmdScript // EVAL, EVALSHA )
func ClassifyCommand ¶
func ClassifyCommand(name string, args [][]byte) CommandCategory
ClassifyCommand returns the category for a Redis command name. XREAD/XREADGROUP is classified as CmdBlocking if args contain BLOCK, otherwise CmdRead. Unknown commands default to CmdWrite (sent to both backends).
type Divergence ¶
type Divergence struct {
Command string
Key string
Pattern string // non-empty for PSUBSCRIBE/pmessage divergences
Kind DivergenceKind
Primary any
Secondary any
DetectedAt time.Time
}
Divergence records a detected mismatch between primary and secondary.
type DivergenceKind ¶
type DivergenceKind int
DivergenceKind classifies the nature of a shadow-read mismatch.
const ( DivMigrationGap DivergenceKind = iota // Secondary nil/empty, Primary has data → expected during migration DivDataMismatch // Both have data but differ → real inconsistency DivExtraData // Primary nil, Secondary has data → unexpected )
func (DivergenceKind) String ¶
func (k DivergenceKind) String() string
type DualWriter ¶
type DualWriter struct {
// contains filtered or unexported fields
}
DualWriter routes commands to primary and secondary backends based on mode.
func NewDualWriter ¶
func NewDualWriter(primary, secondary Backend, cfg ProxyConfig, metrics *ProxyMetrics, sentryReporter *SentryReporter, logger *slog.Logger) *DualWriter
NewDualWriter creates a DualWriter with the given backends.
func (*DualWriter) Admin ¶
Admin forwards an admin command to the primary only. cmd must be the pre-uppercased command name.
func (*DualWriter) Blocking ¶
Blocking forwards a blocking command to the primary only. Optionally sends a short-timeout version to secondary for warmup. cmd must be the pre-uppercased command name.
func (*DualWriter) Close ¶
func (d *DualWriter) Close()
Close waits for all in-flight async goroutines to finish. Should be called during graceful shutdown.
func (*DualWriter) Primary ¶
func (d *DualWriter) Primary() Backend
Primary returns the primary backend for direct use (e.g., PubSub).
func (*DualWriter) PubSubBackend ¶
func (d *DualWriter) PubSubBackend() PubSubBackend
PubSubBackend returns the primary backend as a PubSubBackend, or nil.
func (*DualWriter) Read ¶
Read sends a read command to the primary and optionally performs a shadow read. cmd must be the pre-uppercased command name.
func (*DualWriter) Script ¶
Script forwards EVAL/EVALSHA to the primary, and async replays to secondary. cmd must be the pre-uppercased command name.
func (*DualWriter) Secondary ¶
func (d *DualWriter) Secondary() Backend
Secondary returns the secondary backend.
func (*DualWriter) ShadowPubSubBackend ¶
func (d *DualWriter) ShadowPubSubBackend() PubSubBackend
ShadowPubSubBackend returns the secondary backend as a PubSubBackend when shadow mode is active, or nil otherwise.
type ProxyConfig ¶
type ProxyConfig struct {
ListenAddr string
PrimaryAddr string
PrimaryDB int
PrimaryPassword string
SecondaryAddr string
SecondaryDB int
SecondaryPassword string
Mode ProxyMode
SecondaryTimeout time.Duration
ShadowTimeout time.Duration
SentryDSN string
SentryEnv string
SentrySampleRate float64
MetricsAddr string
PubSubCompareWindow time.Duration
}
ProxyConfig holds all configuration for the dual-write proxy.
func DefaultConfig ¶
func DefaultConfig() ProxyConfig
DefaultConfig returns a ProxyConfig with sensible defaults.
type ProxyMetrics ¶
type ProxyMetrics struct {
CommandTotal *prometheus.CounterVec
CommandDuration *prometheus.HistogramVec
PrimaryWriteErrors prometheus.Counter
SecondaryWriteErrors prometheus.Counter
PrimaryReadErrors prometheus.Counter
ShadowReadErrors prometheus.Counter
Divergences *prometheus.CounterVec
MigrationGaps *prometheus.CounterVec
ActiveConnections prometheus.Gauge
AsyncDrops prometheus.Counter
PubSubShadowDivergences *prometheus.CounterVec
PubSubShadowErrors prometheus.Counter
}
ProxyMetrics holds all Prometheus metrics for the dual-write proxy.
func NewProxyMetrics ¶
func NewProxyMetrics(reg prometheus.Registerer) *ProxyMetrics
NewProxyMetrics creates and registers all proxy metrics.
type ProxyMode ¶
type ProxyMode int
ProxyMode controls which backends receive reads and writes.
const ( ModeRedisOnly ProxyMode = iota // Redis only (passthrough) ModeDualWrite // Write to both, read from Redis ModeDualWriteShadow // Write to both, read from Redis + shadow read from ElasticKV ModeElasticKVPrimary // Write to both, read from ElasticKV + shadow read from Redis ModeElasticKVOnly // ElasticKV only )
func ParseProxyMode ¶
ParseProxyMode converts a string to ProxyMode. Returns false if unknown.
type ProxyServer ¶
type ProxyServer struct {
// contains filtered or unexported fields
}
ProxyServer is a Redis-protocol proxy that dual-writes to two backends.
func NewProxyServer ¶
func NewProxyServer(cfg ProxyConfig, dual *DualWriter, metrics *ProxyMetrics, sentryReporter *SentryReporter, logger *slog.Logger) *ProxyServer
NewProxyServer creates a proxy server with the given configuration and backends.
func (*ProxyServer) ListenAndServe ¶
func (p *ProxyServer) ListenAndServe(ctx context.Context) error
ListenAndServe starts the redcon proxy server.
type PubSubBackend ¶
PubSubBackend is an optional interface for backends that support creating dedicated PubSub connections.
type RedisBackend ¶
type RedisBackend struct {
// contains filtered or unexported fields
}
RedisBackend connects to an upstream Redis instance via go-redis.
func NewRedisBackend ¶
func NewRedisBackend(addr string, name string) *RedisBackend
NewRedisBackend creates a Backend targeting a Redis server with default pool options.
func NewRedisBackendWithOptions ¶
func NewRedisBackendWithOptions(addr string, name string, opts BackendOptions) *RedisBackend
NewRedisBackendWithOptions creates a Backend with explicit pool configuration.
func (*RedisBackend) Close ¶
func (b *RedisBackend) Close() error
func (*RedisBackend) DoWithTimeout ¶
func (b *RedisBackend) DoWithTimeout(ctx context.Context, timeout time.Duration, args ...any) *redis.Cmd
DoWithTimeout executes a command using a per-call socket timeout override. This is used for blocking commands whose wait time exceeds the backend's default read timeout.
func (*RedisBackend) Name ¶
func (b *RedisBackend) Name() string
type SentryReporter ¶
type SentryReporter struct {
// contains filtered or unexported fields
}
SentryReporter sends anomaly events to Sentry with de-duplication.
func NewSentryReporter ¶
func NewSentryReporter(dsn string, environment string, sampleRate float64, logger *slog.Logger) *SentryReporter
NewSentryReporter initialises Sentry. If dsn is empty, reporting is disabled.
func (*SentryReporter) CaptureDivergence ¶
func (r *SentryReporter) CaptureDivergence(div Divergence)
CaptureDivergence reports a data divergence to Sentry with cooldown-based de-duplication.
func (*SentryReporter) CaptureException ¶
func (r *SentryReporter) CaptureException(err error, operation string, args [][]byte)
CaptureException reports an error to Sentry.
func (*SentryReporter) Flush ¶
func (r *SentryReporter) Flush(timeout time.Duration)
Flush waits for pending Sentry events.
func (*SentryReporter) ShouldReport ¶
func (r *SentryReporter) ShouldReport(fingerprint string) bool
ShouldReport checks if this fingerprint has been reported recently (cooldown-based). Evicts expired entries when the map reaches maxReportEntries to bound memory usage. Returns false (drops the report) if the map is still at capacity after eviction. Returns false immediately if Sentry reporting is disabled.
type ShadowReader ¶
type ShadowReader struct {
// contains filtered or unexported fields
}
ShadowReader compares primary and secondary read results.
func NewShadowReader ¶
func NewShadowReader(secondary Backend, metrics *ProxyMetrics, sentryReporter *SentryReporter, logger *slog.Logger, timeout time.Duration) *ShadowReader
NewShadowReader creates a ShadowReader.