proxy

package
v0.0.0-...-a779712 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 23, 2026 License: AGPL-3.0 Imports: 18 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Backend

type Backend interface {
	// Do sends a single command and returns its result.
	Do(ctx context.Context, args ...any) *redis.Cmd
	// Pipeline sends multiple commands in a pipeline.
	Pipeline(ctx context.Context, cmds [][]any) ([]*redis.Cmd, error)
	// Close releases the underlying connection.
	Close() error
	// Name identifies this backend for logging and metrics.
	Name() string
}

Backend abstracts a Redis-protocol endpoint (real Redis or ElasticKV).

type BackendOptions

type BackendOptions struct {
	DB           int
	Password     string
	PoolSize     int
	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
}

BackendOptions configures the underlying go-redis connection pool.

func DefaultBackendOptions

func DefaultBackendOptions() BackendOptions

DefaultBackendOptions returns reasonable defaults for a proxy backend.

type CommandCategory

type CommandCategory int

CommandCategory classifies a Redis command for routing purposes.

const (
	CmdRead     CommandCategory = iota // GET, HGET, LRANGE, ZRANGE, etc.
	CmdWrite                           // SET, DEL, HSET, LPUSH, ZADD, etc.
	CmdBlocking                        // BLPOP, BRPOP, BZPOPMIN, XREAD (with BLOCK)
	CmdPubSub                          // SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBSUB (note: PUBLISH is CmdWrite)
	CmdAdmin                           // PING, INFO, CLIENT, SELECT, QUIT, DBSIZE, SCAN, AUTH
	CmdTxn                             // MULTI, EXEC, DISCARD
	CmdScript                          // EVAL, EVALSHA
)

func ClassifyCommand

func ClassifyCommand(name string, args [][]byte) CommandCategory

ClassifyCommand returns the category for a Redis command name. XREAD and XREADGROUP are classified as CmdBlocking if args contain BLOCK, and as CmdRead otherwise. Unknown commands default to CmdWrite, so they are sent to both backends.
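The classification rules above can be illustrated with a small stand-alone sketch. It is not the package's implementation: `category`, `classify`, and the short command lists are hypothetical stand-ins, and stopping the scan at the STREAMS keyword (so that a stream key named "block" is not mistaken for the option) is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// category is a local stand-in for the package's CommandCategory.
type category int

const (
	read category = iota
	write
	blocking
)

// classify is a hypothetical, much-reduced version of the documented rules:
// a few known reads, BLOCK-aware handling of XREAD/XREADGROUP, and a
// write-style default for anything unrecognised.
func classify(name string, args [][]byte) category {
	switch strings.ToUpper(name) {
	case "GET", "HGET", "LRANGE", "ZRANGE":
		return read
	case "BLPOP", "BRPOP", "BZPOPMIN":
		return blocking
	case "XREAD", "XREADGROUP":
		for _, a := range args {
			switch strings.ToUpper(string(a)) {
			case "BLOCK":
				return blocking
			case "STREAMS":
				// Options end here; everything after is keys and IDs.
				return read
			}
		}
		return read
	}
	// Unknown commands take the write path, mirroring the documented default.
	return write
}

func main() {
	args := [][]byte{[]byte("BLOCK"), []byte("0"), []byte("STREAMS"), []byte("jobs"), []byte("$")}
	fmt.Println(classify("XREAD", args) == blocking) // true
	fmt.Println(classify("XREAD", args[2:]) == read) // true
	fmt.Println(classify("FLUSHALL", nil) == write)  // true
}
```

Defaulting unknown commands to the write path is the conservative choice for a dual-write proxy: a command that might mutate state reaches both backends rather than only one.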

type Divergence

type Divergence struct {
	Command    string
	Key        string
	Pattern    string // non-empty for PSUBSCRIBE/pmessage divergences
	Kind       DivergenceKind
	Primary    any
	Secondary  any
	DetectedAt time.Time
}

Divergence records a detected mismatch between primary and secondary.

type DivergenceKind

type DivergenceKind int

DivergenceKind classifies the nature of a shadow-read mismatch.

const (
	DivMigrationGap DivergenceKind = iota // Secondary nil/empty, Primary has data → expected during migration
	DivDataMismatch                       // Both have data but differ → real inconsistency
	DivExtraData                          // Primary nil, Secondary has data → unexpected
)
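To make the three kinds concrete, the sketch below classifies a pair of optional string values the way the comments above describe. The names `kind` and `classifyDivergence` are hypothetical, and treating an empty string the same as nil on both sides is an assumption of this sketch; the package's own comparison operates on arbitrary Redis replies.

```go
package main

import "fmt"

// kind is a local stand-in for the package's DivergenceKind.
type kind int

const (
	migrationGap kind = iota // secondary missing, primary has data
	dataMismatch             // both have data but differ
	extraData                // primary missing, secondary has data
)

// classifyDivergence compares two optional values. The boolean result is
// false when the values agree and there is nothing to report.
func classifyDivergence(primary, secondary *string) (kind, bool) {
	pEmpty := primary == nil || *primary == ""
	sEmpty := secondary == nil || *secondary == ""
	switch {
	case pEmpty && sEmpty:
		return 0, false
	case sEmpty:
		return migrationGap, true
	case pEmpty:
		return extraData, true
	case *primary != *secondary:
		return dataMismatch, true
	default:
		return 0, false
	}
}

func main() {
	a, b := "v1", "v2"
	k, ok := classifyDivergence(&a, nil)
	fmt.Println(k == migrationGap, ok) // true true
	k, ok = classifyDivergence(&a, &b)
	fmt.Println(k == dataMismatch, ok) // true true
	_, ok = classifyDivergence(&a, &a)
	fmt.Println(ok) // false
}
```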

func (DivergenceKind) String

func (k DivergenceKind) String() string

type DualWriter

type DualWriter struct {
	// contains filtered or unexported fields
}

DualWriter routes commands to primary and secondary backends based on mode.

func NewDualWriter

func NewDualWriter(primary, secondary Backend, cfg ProxyConfig, metrics *ProxyMetrics, sentryReporter *SentryReporter, logger *slog.Logger) *DualWriter

NewDualWriter creates a DualWriter with the given backends.

func (*DualWriter) Admin

func (d *DualWriter) Admin(ctx context.Context, cmd string, args [][]byte) (any, error)

Admin forwards an admin command to the primary only. cmd must be the pre-uppercased command name.

func (*DualWriter) Blocking

func (d *DualWriter) Blocking(ctx context.Context, cmd string, args [][]byte) (any, error)

Blocking forwards a blocking command to the primary and returns the primary's result. It can optionally send a short-timeout version of the command to the secondary as a warmup. cmd must be the pre-uppercased command name.

func (*DualWriter) Close

func (d *DualWriter) Close()

Close waits for all in-flight async goroutines to finish. Should be called during graceful shutdown.

func (*DualWriter) Primary

func (d *DualWriter) Primary() Backend

Primary returns the primary backend for direct use (e.g., PubSub).

func (*DualWriter) PubSubBackend

func (d *DualWriter) PubSubBackend() PubSubBackend

PubSubBackend returns the primary backend as a PubSubBackend, or nil if the primary does not implement that interface.

func (*DualWriter) Read

func (d *DualWriter) Read(ctx context.Context, cmd string, args [][]byte) (any, error)

Read sends a read command to the primary and optionally performs a shadow read. cmd must be the pre-uppercased command name.

func (*DualWriter) Script

func (d *DualWriter) Script(ctx context.Context, cmd string, args [][]byte) (any, error)

Script forwards EVAL/EVALSHA to the primary and asynchronously replays the command to the secondary. cmd must be the pre-uppercased command name.

func (*DualWriter) Secondary

func (d *DualWriter) Secondary() Backend

Secondary returns the secondary backend.

func (*DualWriter) ShadowPubSubBackend

func (d *DualWriter) ShadowPubSubBackend() PubSubBackend

ShadowPubSubBackend returns the secondary backend as a PubSubBackend when shadow mode is active, or nil otherwise.

func (*DualWriter) Write

func (d *DualWriter) Write(ctx context.Context, cmd string, args [][]byte) (any, error)

Write sends a write command to the primary synchronously, then to the secondary asynchronously. cmd must be the pre-uppercased command name.
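The write path described here (synchronous primary, asynchronous secondary, with Close waiting for in-flight work) can be sketched with the standard library alone. Everything below is a hypothetical stand-in: `store`, `memStore`, and `dualWriter` are not the package's types, and skipping the secondary replay when the primary fails is an assumption of this sketch rather than documented behaviour.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// store is a hypothetical stand-in for the package's Backend interface.
type store interface {
	Do(ctx context.Context, args ...string) (string, error)
}

// memStore counts the commands it receives.
type memStore struct {
	mu    sync.Mutex
	calls int
}

func (m *memStore) Do(_ context.Context, _ ...string) (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.calls++
	return "OK", nil
}

func (m *memStore) count() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.calls
}

type dualWriter struct {
	primary, secondary store
	secondaryTimeout   time.Duration
	wg                 sync.WaitGroup // tracks in-flight secondary writes
}

// write answers the caller from the primary, then replays to the secondary
// in a goroutine so secondary latency never reaches the client.
func (d *dualWriter) write(ctx context.Context, args ...string) (string, error) {
	reply, err := d.primary.Do(ctx, args...)
	if err != nil {
		return "", err // assumption: no replay when the primary fails
	}
	d.wg.Add(1)
	go func() {
		defer d.wg.Done()
		// Detached from the caller's context so the replay outlives the request.
		sctx, cancel := context.WithTimeout(context.Background(), d.secondaryTimeout)
		defer cancel()
		_, _ = d.secondary.Do(sctx, args...) // a real proxy would count errors here
	}()
	return reply, nil
}

// close blocks until every async replay has finished.
func (d *dualWriter) close() { d.wg.Wait() }

// run performs one write and reports the reply and per-backend call counts.
func run() (string, int, int) {
	p, s := &memStore{}, &memStore{}
	d := &dualWriter{primary: p, secondary: s, secondaryTimeout: time.Second}
	reply, _ := d.write(context.Background(), "SET", "k", "v")
	d.close()
	return reply, p.count(), s.count()
}

func main() {
	fmt.Println(run()) // OK 1 1
}
```

Waiting on the WaitGroup before reading the secondary's counter is what makes the result deterministic; without it the replay may still be in flight, which is the situation the Close method addresses during shutdown.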

type ProxyConfig

type ProxyConfig struct {
	ListenAddr          string
	PrimaryAddr         string
	PrimaryDB           int
	PrimaryPassword     string
	SecondaryAddr       string
	SecondaryDB         int
	SecondaryPassword   string
	Mode                ProxyMode
	SecondaryTimeout    time.Duration
	ShadowTimeout       time.Duration
	SentryDSN           string
	SentryEnv           string
	SentrySampleRate    float64
	MetricsAddr         string
	PubSubCompareWindow time.Duration
}

ProxyConfig holds all configuration for the dual-write proxy.

func DefaultConfig

func DefaultConfig() ProxyConfig

DefaultConfig returns a ProxyConfig with sensible defaults.

type ProxyMetrics

type ProxyMetrics struct {
	CommandTotal    *prometheus.CounterVec
	CommandDuration *prometheus.HistogramVec

	PrimaryWriteErrors   prometheus.Counter
	SecondaryWriteErrors prometheus.Counter
	PrimaryReadErrors    prometheus.Counter
	ShadowReadErrors     prometheus.Counter
	Divergences          *prometheus.CounterVec
	MigrationGaps        *prometheus.CounterVec

	ActiveConnections prometheus.Gauge

	AsyncDrops prometheus.Counter

	PubSubShadowDivergences *prometheus.CounterVec
	PubSubShadowErrors      prometheus.Counter
}

ProxyMetrics holds all Prometheus metrics for the dual-write proxy.

func NewProxyMetrics

func NewProxyMetrics(reg prometheus.Registerer) *ProxyMetrics

NewProxyMetrics creates and registers all proxy metrics.

type ProxyMode

type ProxyMode int

ProxyMode controls which backends receive reads and writes.

const (
	ModeRedisOnly        ProxyMode = iota // Redis only (passthrough)
	ModeDualWrite                         // Write to both, read from Redis
	ModeDualWriteShadow                   // Write to both, read from Redis + shadow read from ElasticKV
	ModeElasticKVPrimary                  // Write to both, read from ElasticKV + shadow read from Redis
	ModeElasticKVOnly                     // ElasticKV only
)
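The comments on the mode constants amount to a small routing table, which the following sketch spells out. The `mode`, `plan`, and `planFor` names and the "redis"/"elastickv" labels are hypothetical and exist only for illustration; the routing itself is taken from the comments above.

```go
package main

import "fmt"

// mode is a local stand-in for the package's ProxyMode, in the same order.
type mode int

const (
	redisOnly mode = iota
	dualWrite
	dualWriteShadow
	elasticKVPrimary
	elasticKVOnly
)

// plan lists which backend serves each role. shadowFrom is empty when the
// mode performs no shadow read.
type plan struct {
	writeTo    []string
	readFrom   string
	shadowFrom string
}

// planFor returns the routing for a mode, or false for an unknown value.
func planFor(m mode) (plan, bool) {
	both := []string{"redis", "elastickv"}
	switch m {
	case redisOnly:
		return plan{writeTo: []string{"redis"}, readFrom: "redis"}, true
	case dualWrite:
		return plan{writeTo: both, readFrom: "redis"}, true
	case dualWriteShadow:
		return plan{writeTo: both, readFrom: "redis", shadowFrom: "elastickv"}, true
	case elasticKVPrimary:
		return plan{writeTo: both, readFrom: "elastickv", shadowFrom: "redis"}, true
	case elasticKVOnly:
		return plan{writeTo: []string{"elastickv"}, readFrom: "elastickv"}, true
	}
	return plan{}, false
}

func main() {
	for m := redisOnly; m <= elasticKVOnly; m++ {
		p, _ := planFor(m)
		fmt.Printf("mode %d: write=%v read=%s shadow=%q\n", m, p.writeTo, p.readFrom, p.shadowFrom)
	}
}
```

Read top to bottom, the five modes describe a migration: each step changes exactly one of the write set, the read source, or the shadow source.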

func ParseProxyMode

func ParseProxyMode(s string) (ProxyMode, bool)

ParseProxyMode converts a string to ProxyMode. Returns false if unknown.

func (ProxyMode) String

func (m ProxyMode) String() string

type ProxyServer

type ProxyServer struct {
	// contains filtered or unexported fields
}

ProxyServer is a Redis-protocol proxy that dual-writes to two backends.

func NewProxyServer

func NewProxyServer(cfg ProxyConfig, dual *DualWriter, metrics *ProxyMetrics, sentryReporter *SentryReporter, logger *slog.Logger) *ProxyServer

NewProxyServer creates a proxy server with the given configuration and backends.

func (*ProxyServer) ListenAndServe

func (p *ProxyServer) ListenAndServe(ctx context.Context) error

ListenAndServe starts the redcon proxy server.

type PubSubBackend

type PubSubBackend interface {
	NewPubSub(ctx context.Context) *redis.PubSub
}

PubSubBackend is an optional interface for backends that support creating dedicated PubSub connections.

type RedisBackend

type RedisBackend struct {
	// contains filtered or unexported fields
}

RedisBackend connects to an upstream Redis instance via go-redis.

func NewRedisBackend

func NewRedisBackend(addr string, name string) *RedisBackend

NewRedisBackend creates a Backend targeting a Redis server with default pool options.

func NewRedisBackendWithOptions

func NewRedisBackendWithOptions(addr string, name string, opts BackendOptions) *RedisBackend

NewRedisBackendWithOptions creates a Backend with explicit pool configuration.

func (*RedisBackend) Close

func (b *RedisBackend) Close() error

func (*RedisBackend) Do

func (b *RedisBackend) Do(ctx context.Context, args ...any) *redis.Cmd

func (*RedisBackend) DoWithTimeout

func (b *RedisBackend) DoWithTimeout(ctx context.Context, timeout time.Duration, args ...any) *redis.Cmd

DoWithTimeout executes a command using a per-call socket timeout override. This is used for blocking commands whose wait time exceeds the backend's default read timeout.

func (*RedisBackend) Name

func (b *RedisBackend) Name() string

func (*RedisBackend) NewPubSub

func (b *RedisBackend) NewPubSub(ctx context.Context) *redis.PubSub

NewPubSub creates a dedicated PubSub connection (not from the pool).

func (*RedisBackend) Pipeline

func (b *RedisBackend) Pipeline(ctx context.Context, cmds [][]any) ([]*redis.Cmd, error)

type SentryReporter

type SentryReporter struct {
	// contains filtered or unexported fields
}

SentryReporter sends anomaly events to Sentry with de-duplication.

func NewSentryReporter

func NewSentryReporter(dsn string, environment string, sampleRate float64, logger *slog.Logger) *SentryReporter

NewSentryReporter initialises Sentry. If dsn is empty, reporting is disabled.

func (*SentryReporter) CaptureDivergence

func (r *SentryReporter) CaptureDivergence(div Divergence)

CaptureDivergence reports a data divergence to Sentry with cooldown-based de-duplication.

func (*SentryReporter) CaptureException

func (r *SentryReporter) CaptureException(err error, operation string, args [][]byte)

CaptureException reports an error to Sentry.

func (*SentryReporter) Flush

func (r *SentryReporter) Flush(timeout time.Duration)

Flush waits for pending Sentry events.

func (*SentryReporter) ShouldReport

func (r *SentryReporter) ShouldReport(fingerprint string) bool

ShouldReport checks if this fingerprint has been reported recently (cooldown-based). Evicts expired entries when the map reaches maxReportEntries to bound memory usage. Returns false (drops the report) if the map is still at capacity after eviction. Returns false immediately if Sentry reporting is disabled.

type ShadowReader

type ShadowReader struct {
	// contains filtered or unexported fields
}

ShadowReader compares primary and secondary read results.

func NewShadowReader

func NewShadowReader(secondary Backend, metrics *ProxyMetrics, sentryReporter *SentryReporter, logger *slog.Logger, timeout time.Duration) *ShadowReader

NewShadowReader creates a ShadowReader.

func (*ShadowReader) Compare

func (s *ShadowReader) Compare(ctx context.Context, cmd string, args [][]byte, primaryResp any, primaryErr error)

Compare issues the same read to the secondary and checks for divergence.
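The idea of a timeout-bounded shadow comparison can be sketched as follows. The `lookup` function type, the `outcome` values, and `shadowCompare` are hypothetical and compare plain strings; the real method works on Redis replies and also receives the primary's error. Treating a secondary timeout as an error rather than a divergence is an assumption of this sketch.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// lookup is a hypothetical stand-in for a read against the secondary backend.
type lookup func(ctx context.Context, key string) (string, error)

type outcome string

const (
	match       outcome = "match"
	mismatch    outcome = "mismatch"
	shadowError outcome = "shadow-error"
)

// shadowCompare repeats a read on the secondary under its own deadline and
// compares the result with the value the primary already returned.
func shadowCompare(parent context.Context, secondary lookup, timeout time.Duration, key, primaryVal string) outcome {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()
	got, err := secondary(ctx, key)
	switch {
	case err != nil:
		return shadowError // includes deadline exceeded
	case got != primaryVal:
		return mismatch
	}
	return match
}

// demo runs the comparison against a fast and a deliberately slow secondary.
func demo() []outcome {
	fast := func(_ context.Context, _ string) (string, error) { return "v1", nil }
	slow := func(ctx context.Context, _ string) (string, error) {
		select {
		case <-time.After(time.Second):
			return "v1", nil
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
	bg := context.Background()
	return []outcome{
		shadowCompare(bg, fast, 20*time.Millisecond, "k", "v1"),
		shadowCompare(bg, fast, 20*time.Millisecond, "k", "v2"),
		shadowCompare(bg, slow, 20*time.Millisecond, "k", "v1"),
	}
}

func main() {
	fmt.Println(demo()) // [match mismatch shadow-error]
}
```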
