redis

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2026 License: GPL-3.0 Imports: 17 Imported by: 0

Documentation

Overview

Package redis provides production-ready Redis integration for the OpenCTEM application.

Overview

This package provides four main components:

  • Client: Connection management with TLS, pooling, and retry logic
  • Cache[T]: Type-safe generic caching with TTL support
  • TokenStore: JWT blacklist, session management, and refresh tokens
  • RateLimiter: Distributed rate limiting using sliding window algorithm

Quick Start

Initialize the Redis client:

cfg := &config.RedisConfig{
	Host:          "localhost",
	Port:          6379,
	Password:      "secret",
	DB:            0,
	PoolSize:      10,
	MinIdleConns:  2,
	DialTimeout:   5 * time.Second,
	ReadTimeout:   3 * time.Second,
	WriteTimeout:  3 * time.Second,
	TLSEnabled:    true,  // Required in production
	TLSSkipVerify: false, // Must be false in production
	MaxRetries:    3,
	MinRetryDelay: 100 * time.Millisecond,
	MaxRetryDelay: 3 * time.Second,
}

client, err := redis.New(cfg, logger)
if err != nil {
	log.Fatal(err)
}
defer client.Close()

Using the Generic Cache

Create a type-safe cache for any struct:

type User struct {
	ID    string `json:"id"`
	Name  string `json:"name"`
	Email string `json:"email"`
}

// Create cache with 1 hour TTL
userCache, err := redis.NewCache[User](client, "users", time.Hour)
if err != nil {
	log.Fatal(err)
}

// Store a user
user := User{ID: "123", Name: "John", Email: "john@example.com"}
if err := userCache.Set(ctx, user.ID, user); err != nil {
	log.Error("failed to cache user", "error", err)
}

// Retrieve a user
cached, err := userCache.Get(ctx, "123")
if errors.Is(err, redis.ErrCacheMiss) {
	// Cache miss - load from database
} else if err != nil {
	log.Error("cache error", "error", err)
}

// Get or set pattern (cache-aside)
loaded, err := userCache.GetOrSet(ctx, "123", func(ctx context.Context) (*User, error) {
	return userRepo.FindByID(ctx, "123")
})

Using the Token Store

Manage JWT blacklist and sessions:

tokenStore, err := redis.NewTokenStore(client, logger)
if err != nil {
	log.Fatal(err)
}

// Blacklist a JWT token (for logout)
err = tokenStore.BlacklistToken(ctx, jti, tokenExpiry)

// Check if token is blacklisted (in auth middleware)
blacklisted, err := tokenStore.IsBlacklisted(ctx, jti)
if blacklisted {
	return ErrTokenRevoked
}

// Store user session
sessionData := map[string]string{
	"user_agent": r.UserAgent(),
	"ip":         r.RemoteAddr,
	"created_at": time.Now().Format(time.RFC3339),
}
err = tokenStore.StoreSession(ctx, userID, sessionID, sessionData, 24*time.Hour)

// Delete all sessions (force logout from all devices)
err = tokenStore.DeleteAllUserSessions(ctx, userID)

// Refresh token rotation
err = tokenStore.RotateRefreshToken(ctx, userID, oldHash, newHash, 7*24*time.Hour)

Using the Rate Limiter

Distributed rate limiting:

rateLimiter, err := redis.NewRateLimiter(
	client,
	"api",           // key prefix
	100,             // 100 requests
	time.Minute,     // per minute
	logger,
)
if err != nil {
	log.Fatal(err)
}

// In HTTP middleware
result, err := rateLimiter.Allow(ctx, clientIP)
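if err != nil {
	// Redis error - decide on fallback strategy (this example fails closed).
	// Do not fall through: result may be nil when err is non-nil.
	log.Error("rate limit check failed", "error", err)
	http.Error(w, "rate limiter unavailable", http.StatusServiceUnavailable)
	return
}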
if !result.Allowed {
	w.Header().Set("Retry-After", result.RetryAt.UTC().Format(http.TimeFormat))
	http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
	return
}
w.Header().Set("X-RateLimit-Remaining", strconv.Itoa(result.Remaining))

Production Configuration

Required settings for production:

# .env (production)
REDIS_HOST=redis.internal
REDIS_PORT=6379
REDIS_PASSWORD=<strong-password>
REDIS_DB=0
REDIS_POOL_SIZE=25
REDIS_MIN_IDLE_CONNS=5
REDIS_DIAL_TIMEOUT=5s
REDIS_READ_TIMEOUT=3s
REDIS_WRITE_TIMEOUT=3s
REDIS_TLS_ENABLED=true
REDIS_TLS_SKIP_VERIFY=false
REDIS_MAX_RETRIES=3
REDIS_MIN_RETRY_DELAY=100ms
REDIS_MAX_RETRY_DELAY=3s

Health Checks

Use the Ping method for health checks:

func (h *HealthHandler) Ready(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()

	if err := h.redis.Ping(ctx); err != nil {
		http.Error(w, "redis unavailable", http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusOK)
}

Error Handling

The package defines specific errors for common cases:

var (
	ErrKeyNotFound = errors.New("redis: key not found")
	ErrCacheMiss   = errors.New("cache: key not found")
)

Use errors.Is for error checking:

if errors.Is(err, redis.ErrCacheMiss) {
	// Handle cache miss
}

Thread Safety

All components are safe for concurrent use. The underlying go-redis client manages connection pooling automatically.

Graceful Shutdown

Always close the client on application shutdown:

// In main.go
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan

// Graceful shutdown
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := redisClient.Close(); err != nil {
	log.Error("failed to close redis", "error", err)
}

Index

Constants

View Source
const (
	// JobNotifyChannel is the Redis pub/sub channel for job notifications.
	JobNotifyChannel = "platform:jobs:notify"

	// JobNotifyChannelPrefix is the prefix for capability-specific channels.
	JobNotifyChannelPrefix = "platform:jobs:notify:"
)

Variables

View Source
var (
	// ErrKeyNotFound is returned when a key does not exist.
	ErrKeyNotFound = errors.New("redis: key not found")

	// ErrCacheMiss is returned when a cached item is not found.
	ErrCacheMiss = errors.New("cache: key not found")
)

Redis-specific errors.

Functions

func StartPoolStatsCollector

func StartPoolStatsCollector(ctx context.Context, client *Client, interval time.Duration) func()

StartPoolStatsCollector starts a goroutine that periodically updates pool stats. Returns a cancel function to stop the collector.

func Timed

func Timed(operation string) func(error)

Timed is a helper to time operations. It returns a function that must be called with the operation's error once the operation completes:

done := redis.Timed("get")
result, err := cache.Get(ctx, key)
done(err)

Types

type AgentHeartbeat

type AgentHeartbeat struct {
	AgentID       string    `json:"agent_id"`
	TenantID      string    `json:"tenant_id,omitempty"` // Empty for platform agents
	IsPlatform    bool      `json:"is_platform"`
	Status        string    `json:"status"`
	Health        string    `json:"health"`
	CurrentJobs   int       `json:"current_jobs"`
	MaxConcurrent int       `json:"max_concurrent"`
	LastHeartbeat time.Time `json:"last_heartbeat"`
	IPAddress     string    `json:"ip_address,omitempty"`
	Region        string    `json:"region,omitempty"`
	Version       string    `json:"version,omitempty"`
	// Extended metrics for load balancing
	CPUPercent    float64 `json:"cpu_percent,omitempty"`
	MemoryPercent float64 `json:"memory_percent,omitempty"`
	LoadScore     float64 `json:"load_score,omitempty"` // Weighted load score (lower is better)
}

AgentHeartbeat represents the heartbeat data stored in Redis.

type AgentStateStore

type AgentStateStore struct {
	// contains filtered or unexported fields
}

AgentStateStore manages ephemeral agent state in Redis.

func NewAgentStateStore

func NewAgentStateStore(client *Client, log *logger.Logger) *AgentStateStore

NewAgentStateStore creates a new AgentStateStore.

func (*AgentStateStore) CleanupStaleAgents

func (s *AgentStateStore) CleanupStaleAgents(ctx context.Context, threshold time.Duration) (int, error)

CleanupStaleAgents removes stale agent data from Redis.

func (*AgentStateStore) GetAgentActiveJobCount

func (s *AgentStateStore) GetAgentActiveJobCount(ctx context.Context, agentID shared.ID) (int64, error)

GetAgentActiveJobCount returns the count of active jobs for an agent.

func (*AgentStateStore) GetAgentActiveJobs

func (s *AgentStateStore) GetAgentActiveJobs(ctx context.Context, agentID shared.ID) ([]string, error)

GetAgentActiveJobs returns all active job IDs for an agent.

func (*AgentStateStore) GetAgentConfig

func (s *AgentStateStore) GetAgentConfig(ctx context.Context, agentID shared.ID) (*CachedAgentConfig, error)

GetAgentConfig retrieves cached agent configuration. Returns nil, nil if not cached (caller should load from DB and cache).

func (*AgentStateStore) GetAgentsWithStaleHeartbeat

func (s *AgentStateStore) GetAgentsWithStaleHeartbeat(ctx context.Context, threshold time.Duration) ([]string, error)

GetAgentsWithStaleHeartbeat returns agent IDs whose heartbeat is older than the threshold. Used by health monitor to detect agents that went offline.

func (*AgentStateStore) GetHeartbeat

func (s *AgentStateStore) GetHeartbeat(ctx context.Context, agentID shared.ID) (*AgentHeartbeat, error)

GetHeartbeat retrieves the latest heartbeat for an agent.

func (*AgentStateStore) GetLastHeartbeatTime

func (s *AgentStateStore) GetLastHeartbeatTime(ctx context.Context, agentID shared.ID) (time.Time, error)

GetLastHeartbeatTime returns the last heartbeat timestamp for an agent. Returns zero time if no heartbeat exists.

func (*AgentStateStore) GetOnlinePlatformAgentCount

func (s *AgentStateStore) GetOnlinePlatformAgentCount(ctx context.Context) (int64, error)

GetOnlinePlatformAgentCount returns the count of online platform agents.

func (*AgentStateStore) GetOnlinePlatformAgents

func (s *AgentStateStore) GetOnlinePlatformAgents(ctx context.Context) ([]string, error)

GetOnlinePlatformAgents returns all online platform agent IDs.

func (*AgentStateStore) GetPlatformAgentState

func (s *AgentStateStore) GetPlatformAgentState(ctx context.Context, agentID shared.ID) (*PlatformAgentState, error)

GetPlatformAgentState retrieves the state of a platform agent.

func (*AgentStateStore) GetPreviousHealthState

func (s *AgentStateStore) GetPreviousHealthState(ctx context.Context, agentID shared.ID) (string, error)

GetPreviousHealthState returns the previous health state of an agent. Used to detect state transitions (offline -> online, online -> offline).

func (*AgentStateStore) GetQueueStats

func (s *AgentStateStore) GetQueueStats(ctx context.Context) (*QueueStats, error)

GetQueueStats retrieves queue statistics.

func (*AgentStateStore) IncrementQueueStat

func (s *AgentStateStore) IncrementQueueStat(ctx context.Context, field string, delta int64) error

IncrementQueueStat increments a specific queue stat counter.

func (*AgentStateStore) InvalidateAgentConfig

func (s *AgentStateStore) InvalidateAgentConfig(ctx context.Context, agentID shared.ID) error

InvalidateAgentConfig removes cached agent configuration (e.g., after admin update).

func (*AgentStateStore) IsAgentOnline

func (s *AgentStateStore) IsAgentOnline(ctx context.Context, agentID shared.ID) (bool, error)

IsAgentOnline checks if an agent is online based on heartbeat.

func (*AgentStateStore) MarkAgentOfflineInCache

func (s *AgentStateStore) MarkAgentOfflineInCache(ctx context.Context, agentID shared.ID) error

MarkAgentOfflineInCache marks an agent as offline in the cache. Called by health monitor when heartbeat timeout is detected.

func (*AgentStateStore) RecordHeartbeat

func (s *AgentStateStore) RecordHeartbeat(ctx context.Context, hb *AgentHeartbeat) error

RecordHeartbeat records an agent heartbeat.

func (*AgentStateStore) RemoveHeartbeat

func (s *AgentStateStore) RemoveHeartbeat(ctx context.Context, agentID shared.ID) error

RemoveHeartbeat removes an agent's heartbeat (for clean shutdown).

func (*AgentStateStore) SetAgentConfig

func (s *AgentStateStore) SetAgentConfig(ctx context.Context, config *CachedAgentConfig) error

SetAgentConfig caches agent configuration to avoid DB reads on every heartbeat.

func (*AgentStateStore) SetPlatformAgentState

func (s *AgentStateStore) SetPlatformAgentState(ctx context.Context, state *PlatformAgentState) error

SetPlatformAgentState stores the state of a platform agent.

func (*AgentStateStore) SetPreviousHealthState

func (s *AgentStateStore) SetPreviousHealthState(ctx context.Context, agentID shared.ID, health string) error

SetPreviousHealthState stores the previous health state of an agent. TTL is set longer than heartbeat to ensure we can detect offline -> online transitions.

func (*AgentStateStore) TrackAgentJob

func (s *AgentStateStore) TrackAgentJob(ctx context.Context, agentID, jobID shared.ID) error

TrackAgentJob adds a job to an agent's active job set.

func (*AgentStateStore) UntrackAgentJob

func (s *AgentStateStore) UntrackAgentJob(ctx context.Context, agentID, jobID shared.ID) error

UntrackAgentJob removes a job from an agent's active job set.

func (*AgentStateStore) UpdateQueueStats

func (s *AgentStateStore) UpdateQueueStats(ctx context.Context, stats *QueueStats) error

UpdateQueueStats updates queue statistics.

func (*AgentStateStore) WasAgentOffline

func (s *AgentStateStore) WasAgentOffline(ctx context.Context, agentID shared.ID) (bool, error)

WasAgentOffline checks if the agent was previously offline (for detecting the offline -> online transition). It returns true if either of the following holds: (1) the previous health state was "offline" or "error"; or (2) no previous heartbeat exists (a new agent, or the first heartbeat after a long downtime).

type Cache

type Cache[T any] struct {
	// contains filtered or unexported fields
}

Cache provides type-safe caching operations.

func MustNewCache

func MustNewCache[T any](client *Client, prefix string, ttl time.Duration) *Cache[T]

MustNewCache creates a new cache or panics on error. Use only in initialization code where failure is unrecoverable.

func NewCache

func NewCache[T any](client *Client, prefix string, ttl time.Duration) (*Cache[T], error)

NewCache creates a new type-safe cache. Returns error if any parameter is invalid.

func (*Cache[T]) Delete

func (c *Cache[T]) Delete(ctx context.Context, key string) error

Delete removes a key from the cache.

func (*Cache[T]) DeletePattern

func (c *Cache[T]) DeletePattern(ctx context.Context, pattern string) error

DeletePattern removes all keys matching a pattern. Pattern example: "*" removes all keys with the cache prefix.

func (*Cache[T]) Exists

func (c *Cache[T]) Exists(ctx context.Context, key string) (bool, error)

Exists checks if a key exists in the cache.

func (*Cache[T]) Get

func (c *Cache[T]) Get(ctx context.Context, key string) (*T, error)

Get retrieves a cached value by key. Returns ErrCacheMiss if the key does not exist.

func (*Cache[T]) GetOrSet

func (c *Cache[T]) GetOrSet(ctx context.Context, key string, loader func(ctx context.Context) (*T, error)) (*T, error)

GetOrSet retrieves a value from cache, or calls the loader function and caches the result. On cache miss, loads from source and caches the result. On Redis errors (connection, timeout), fails fast to prevent cascading failures. Cache set errors are logged but do not fail the operation.

func (*Cache[T]) GetOrSetFallback

func (c *Cache[T]) GetOrSetFallback(ctx context.Context, key string, loader func(ctx context.Context) (*T, error)) (*T, error)

GetOrSetFallback is like GetOrSet but falls back to loader on any cache error. Use this when availability is more important than protecting the database. WARNING: If Redis is down, ALL requests will hit your database.

func (*Cache[T]) GetOrSetFallbackWithTTL

func (c *Cache[T]) GetOrSetFallbackWithTTL(ctx context.Context, key string, ttl time.Duration, loader func(ctx context.Context) (*T, error)) (*T, error)

GetOrSetFallbackWithTTL is like GetOrSetFallback but with a custom TTL.

func (*Cache[T]) GetOrSetWithTTL

func (c *Cache[T]) GetOrSetWithTTL(ctx context.Context, key string, ttl time.Duration, loader func(ctx context.Context) (*T, error)) (*T, error)

GetOrSetWithTTL is like GetOrSet but with a custom TTL. On cache miss, loads from source and caches the result. On Redis errors (connection, timeout), fails fast to prevent cascading failures. Cache set errors are logged but do not fail the operation.

func (*Cache[T]) Invalidate

func (c *Cache[T]) Invalidate(ctx context.Context, key string) error

Invalidate removes a key from the cache (alias for Delete).

func (*Cache[T]) MGet

func (c *Cache[T]) MGet(ctx context.Context, keys ...string) (map[string]*T, error)

MGet retrieves multiple values by keys. Returns a map of key to value. Missing keys are not included in the result.

func (*Cache[T]) MSet

func (c *Cache[T]) MSet(ctx context.Context, items map[string]T) error

MSet stores multiple values in the cache.

func (*Cache[T]) Prefix

func (c *Cache[T]) Prefix() string

Prefix returns the key prefix for this cache.

func (*Cache[T]) Refresh

func (c *Cache[T]) Refresh(ctx context.Context, key string) error

Refresh updates the TTL of a cached key without changing its value.

func (*Cache[T]) Set

func (c *Cache[T]) Set(ctx context.Context, key string, value T) error

Set stores a value in the cache with the default TTL.

func (*Cache[T]) SetWithTTL

func (c *Cache[T]) SetWithTTL(ctx context.Context, key string, value T, ttl time.Duration) error

SetWithTTL stores a value in the cache with a custom TTL.

func (*Cache[T]) TTL

func (c *Cache[T]) TTL() time.Duration

TTL returns the default TTL for this cache.

type CacheStore

type CacheStore[T any] interface {
	// Get retrieves a cached value by key.
	// Returns ErrCacheMiss if the key does not exist.
	Get(ctx context.Context, key string) (*T, error)

	// Set stores a value in the cache with the default TTL.
	Set(ctx context.Context, key string, value T) error

	// SetWithTTL stores a value in the cache with a custom TTL.
	SetWithTTL(ctx context.Context, key string, value T, ttl time.Duration) error

	// Delete removes a key from the cache.
	Delete(ctx context.Context, key string) error

	// Exists checks if a key exists in the cache.
	Exists(ctx context.Context, key string) (bool, error)

	// GetOrSet retrieves from cache or calls loader and caches the result.
	GetOrSet(ctx context.Context, key string, loader func(ctx context.Context) (*T, error)) (*T, error)
}

CacheStore defines the interface for cache operations. Use this interface in application code for better testability.

type CachedAgentConfig

type CachedAgentConfig struct {
	AgentID       string   `json:"agent_id"`
	TenantID      string   `json:"tenant_id,omitempty"` // Empty for platform agents
	IsPlatform    bool     `json:"is_platform"`
	Status        string   `json:"status"` // Admin-controlled status (active, disabled, revoked)
	Capabilities  []string `json:"capabilities"`
	Tools         []string `json:"tools"`
	MaxConcurrent int      `json:"max_concurrent"`
	Region        string   `json:"region,omitempty"`
	CachedAt      int64    `json:"cached_at"` // Unix timestamp
}

CachedAgentConfig represents the cached agent configuration to avoid DB reads on every heartbeat.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps redis.Client with additional functionality.

func New

func New(cfg *config.RedisConfig, log *logger.Logger) (*Client, error)

New creates a new Redis client.

func (*Client) Client

func (c *Client) Client() *redis.Client

Client returns the underlying redis.Client for advanced operations.

func (*Client) Close

func (c *Client) Close() error

Close closes the Redis connection.

func (*Client) Del

func (c *Client) Del(ctx context.Context, keys ...string) error

Del deletes one or more keys.

func (*Client) Exists

func (c *Client) Exists(ctx context.Context, key string) (bool, error)

Exists checks if a key exists.

func (*Client) Expire

func (c *Client) Expire(ctx context.Context, key string, ttl time.Duration) error

Expire sets a TTL on a key.

func (*Client) Get

func (c *Client) Get(ctx context.Context, key string) (string, error)

Get retrieves a string value by key.

func (*Client) Keys

func (c *Client) Keys(ctx context.Context, pattern string) ([]string, error)

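Keys returns all keys matching a pattern. Warning: Use with caution in production; it examines the entire keyspace in a single command and can be slow on large databases. Prefer Scan for production use.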

func (*Client) Logger

func (c *Client) Logger() *logger.Logger

Logger returns the client's logger for use by other redis components.

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

Ping checks if Redis is available.

func (*Client) PoolStats

func (c *Client) PoolStats() *redis.PoolStats

PoolStats returns connection pool statistics.

func (*Client) Scan

func (c *Client) Scan(ctx context.Context, pattern string, count int64) ([]string, error)

Scan iterates through keys matching a pattern. This is more production-safe than Keys().

func (*Client) Set

func (c *Client) Set(ctx context.Context, key, value string, ttl time.Duration) error

Set stores a string value with optional TTL.

func (*Client) TTL

func (c *Client) TTL(ctx context.Context, key string) (time.Duration, error)

TTL returns the remaining TTL of a key.

type Closer

type Closer interface {
	Close() error
}

Closer is an interface for graceful shutdown.

type JobNotification

type JobNotification struct {
	JobID        string   `json:"job_id"`
	TenantID     string   `json:"tenant_id"`
	Capabilities []string `json:"capabilities,omitempty"`
	Tools        []string `json:"tools,omitempty"`
	Priority     int      `json:"priority"`
	CreatedAt    int64    `json:"created_at"` // Unix timestamp
}

JobNotification represents a notification about a new platform job.

type JobNotifier

type JobNotifier struct {
	// contains filtered or unexported fields
}

JobNotifier handles pub/sub notifications for platform jobs. It uses Redis pub/sub to notify waiting agents when new jobs are available.

func NewJobNotifier

func NewJobNotifier(client *Client, log *logger.Logger) *JobNotifier

NewJobNotifier creates a new JobNotifier.

func (*JobNotifier) NotifyNewJob

func (n *JobNotifier) NotifyNewJob(ctx context.Context, notification *JobNotification) error

NotifyNewJob publishes a notification that a new job is available. This should be called when a new platform job is created.

func (*JobNotifier) NotifyNewPlatformJob

func (n *JobNotifier) NotifyNewPlatformJob(ctx context.Context, jobID, tenantID string, capabilities, tools []string, priority int, createdAt int64) error

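NotifyNewPlatformJob is an adapter method that accepts the individual fields of a platform job notification (job ID, tenant ID, capabilities, tools, priority, and creation time as a Unix timestamp) and converts them to the internal JobNotification format. This allows JobNotifier to implement the app.PlatformJobNotifier interface.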

func (*JobNotifier) StartListener

func (n *JobNotifier) StartListener(ctx context.Context) error

StartListener starts listening for Redis pub/sub messages and dispatches them to subscribed agents. This should be called once when the application starts.

func (*JobNotifier) Subscribe

func (n *JobNotifier) Subscribe(agentID string, capabilities []string) <-chan *JobNotification

Subscribe creates a subscription for an agent to receive job notifications. Returns a channel that will receive notifications when jobs are available. The caller should call Unsubscribe when done.

func (*JobNotifier) SubscriberCount

func (n *JobNotifier) SubscriberCount() int

SubscriberCount returns the current number of subscribers.

func (*JobNotifier) Unsubscribe

func (n *JobNotifier) Unsubscribe(agentID string)

Unsubscribe removes an agent's subscription.

func (*JobNotifier) WaitForJob

func (n *JobNotifier) WaitForJob(ctx context.Context, agentID string, capabilities []string, timeout time.Duration) bool

WaitForJob waits for a job notification or timeout. This is the main method used by the long-polling handler. Returns true if a notification was received, false on timeout.

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics holds all Redis-related Prometheus metrics.

var DefaultMetrics *Metrics

DefaultMetrics is the default metrics instance.

func NewMetrics

func NewMetrics(namespace string) *Metrics

NewMetrics creates a new Metrics instance with the given namespace.

func (*Metrics) ObserveOperation

func (m *Metrics) ObserveOperation(operation string, duration time.Duration, err error)

ObserveOperation records the duration and result of a Redis operation.

func (*Metrics) RecordCacheHit

func (m *Metrics) RecordCacheHit(cacheName string)

RecordCacheHit records a cache hit for the given cache name.

func (*Metrics) RecordCacheMiss

func (m *Metrics) RecordCacheMiss(cacheName string)

RecordCacheMiss records a cache miss for the given cache name.

func (*Metrics) RecordRateLimitResult

func (m *Metrics) RecordRateLimitResult(limiterName string, allowed bool)

RecordRateLimitResult records the result of a rate limit check.

func (*Metrics) UpdatePoolStats

func (m *Metrics) UpdatePoolStats(client *Client)

UpdatePoolStats updates the connection pool metrics from the client.

type MetricsCollector

type MetricsCollector struct {
	// contains filtered or unexported fields
}

MetricsCollector implements prometheus.Collector for Redis pool stats.

func NewMetricsCollector

func NewMetricsCollector(client *Client, metrics *Metrics) *MetricsCollector

NewMetricsCollector creates a new MetricsCollector.

func (*MetricsCollector) Collect

func (c *MetricsCollector) Collect(_ chan<- prometheus.Metric)

Collect implements prometheus.Collector.

func (*MetricsCollector) Describe

func (c *MetricsCollector) Describe(_ chan<- *prometheus.Desc)

Describe implements prometheus.Collector.

type MiddlewareAdapter

type MiddlewareAdapter struct {
	// contains filtered or unexported fields
}

MiddlewareAdapter wraps RateLimiter to implement the middleware interface. This adapter converts the internal RateLimitResult to the middleware's expected type.

func NewMiddlewareAdapter

func NewMiddlewareAdapter(rl *RateLimiter) *MiddlewareAdapter

NewMiddlewareAdapter creates an adapter for use with the HTTP middleware.

func (*MiddlewareAdapter) Allow

Allow checks if a request is allowed and returns the result in middleware format.

func (*MiddlewareAdapter) Limit

func (a *MiddlewareAdapter) Limit() int

Limit returns the configured maximum requests per window.

type MiddlewareRateLimitResult

type MiddlewareRateLimitResult struct {
	Allowed   bool
	Remaining int
	ResetAt   time.Time
	RetryAt   time.Time
}

MiddlewareRateLimitResult is the result type expected by the middleware.

type Pinger

type Pinger interface {
	Ping(ctx context.Context) error
}

Pinger is an interface for health check operations.

type PlatformAgentState

type PlatformAgentState struct {
	AgentID       string    `json:"agent_id"`
	Health        string    `json:"health"`
	CurrentJobs   int       `json:"current_jobs"`
	MaxConcurrent int       `json:"max_concurrent"`
	Region        string    `json:"region"`
	Capabilities  []string  `json:"capabilities"`
	Tools         []string  `json:"tools"`
	LastHeartbeat time.Time `json:"last_heartbeat"`
	LastJobAt     time.Time `json:"last_job_at,omitempty"`
	TotalJobs     int64     `json:"total_jobs"`
	FailedJobs    int64     `json:"failed_jobs"`
	// Extended metrics for load balancing
	CPUPercent    float64 `json:"cpu_percent,omitempty"`
	MemoryPercent float64 `json:"memory_percent,omitempty"`
	LoadScore     float64 `json:"load_score,omitempty"` // Weighted load score (lower is better)
}

PlatformAgentState represents the state of a platform agent.

type QueueStats

type QueueStats struct {
	TotalQueued       int64     `json:"total_queued"`
	TotalProcessing   int64     `json:"total_processing"`
	TotalCompleted    int64     `json:"total_completed"`
	TotalFailed       int64     `json:"total_failed"`
	AvgWaitTimeSec    float64   `json:"avg_wait_time_sec"`
	AvgProcessTimeSec float64   `json:"avg_process_time_sec"`
	LastUpdated       time.Time `json:"last_updated"`
}

QueueStats represents queue statistics.

type RateLimitResult

type RateLimitResult struct {
	// Allowed indicates if the request is permitted.
	Allowed bool

	// Remaining is the number of requests left in the current window.
	Remaining int

	// ResetAt is when the rate limit window resets.
	ResetAt time.Time

	// RetryAt is when the client should retry (only set when not allowed).
	RetryAt time.Time
}

RateLimitResult contains the result of a rate limit check.

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter implements distributed rate limiting using Redis. It uses the sliding window log algorithm with sorted sets for accurate rate limiting across distributed systems.

The sliding window algorithm tracks individual request timestamps, providing more accurate rate limiting compared to fixed windows.

func MustNewRateLimiter

func MustNewRateLimiter(client *Client, prefix string, limit int, window time.Duration, log *logger.Logger) *RateLimiter

MustNewRateLimiter creates a rate limiter or panics on error. Use only in initialization code where failure is unrecoverable.

func NewRateLimiter

func NewRateLimiter(client *Client, prefix string, limit int, window time.Duration, log *logger.Logger) (*RateLimiter, error)

NewRateLimiter creates a new distributed rate limiter.

Parameters:

  • client: Redis client for storage
  • prefix: Key prefix for namespacing (e.g., "ratelimit:api")
  • limit: Maximum requests allowed per window
  • window: Time window duration
  • log: Logger for debugging

Example:

rl, err := redis.NewRateLimiter(client, "api", 100, time.Minute, logger)

func (*RateLimiter) Allow

func (rl *RateLimiter) Allow(ctx context.Context, key string) (*RateLimitResult, error)

Allow checks if a request is allowed and consumes one token atomically. Returns the result with remaining count and reset time.

This method is safe for concurrent use and uses Lua scripting to ensure atomic check-and-update operations.

func (*RateLimiter) AllowN

func (rl *RateLimiter) AllowN(ctx context.Context, key string, n int) (*RateLimitResult, error)

AllowN checks if N requests are allowed and consumes them atomically. This is useful for operations that consume multiple tokens (e.g., bulk APIs).

func (*RateLimiter) Limit

func (rl *RateLimiter) Limit() int

Limit returns the configured maximum requests per window.

func (*RateLimiter) Reset

func (rl *RateLimiter) Reset(ctx context.Context, key string) error

Reset removes the rate limit for a key, allowing immediate access. Use with caution as this bypasses rate limiting protections.

func (*RateLimiter) Status

func (rl *RateLimiter) Status(ctx context.Context, key string) (*RateLimitResult, error)

Status returns the current rate limit status without consuming a token. This is useful for displaying rate limit information to clients.

This method uses Lua scripting to ensure atomic reads, preventing race conditions between cleanup and count operations.

func (*RateLimiter) Window

func (rl *RateLimiter) Window() time.Duration

Window returns the configured time window duration.

type RateLimiterStore

type RateLimiterStore interface {
	// Allow checks if a request is allowed and consumes one token.
	Allow(ctx context.Context, key string) (*RateLimitResult, error)

	// AllowN checks if N requests are allowed and consumes them.
	AllowN(ctx context.Context, key string, n int) (*RateLimitResult, error)

	// Status returns the current rate limit status without consuming a token.
	Status(ctx context.Context, key string) (*RateLimitResult, error)

	// Reset removes the rate limit for a key.
	Reset(ctx context.Context, key string) error

	// Limit returns the configured limit.
	Limit() int

	// Window returns the configured window duration.
	Window() time.Duration
}

RateLimiterStore defines the interface for rate limiting operations. Use this interface in application code for better testability.

type RedisClient

type RedisClient interface {
	Ping(ctx context.Context) *goredis.StatusCmd
	Get(ctx context.Context, key string) *goredis.StringCmd
	Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *goredis.StatusCmd
	Del(ctx context.Context, keys ...string) *goredis.IntCmd
	Exists(ctx context.Context, keys ...string) *goredis.IntCmd
	Expire(ctx context.Context, key string, expiration time.Duration) *goredis.BoolCmd
	TTL(ctx context.Context, key string) *goredis.DurationCmd
	Close() error
}

RedisClient is an interface that wraps the essential redis.Client methods. This allows for easier testing with mock implementations.

type SessionData

// SessionData represents session metadata.
//
// NOTE(review): the field descriptions below are inferred from the field
// names; nothing visible here shows where this struct is populated or read.
type SessionData struct {
	// Identifiers: presumably the owning user and the session itself.
	UserID, SessionID string

	// Client details: presumably the User-Agent string and remote IP address.
	UserAgent, IP string

	// Lifetime bounds: presumably when the session began and when it lapses.
	CreatedAt, ExpiresAt time.Time

	// Data carries additional string key/value pairs for the session.
	Data map[string]string
}

SessionData represents session metadata.

type TokenStore

// TokenStore manages JWT tokens, sessions, and refresh tokens.
type TokenStore struct {
	// contains filtered or unexported fields
}

TokenStore manages JWT tokens, sessions, and refresh tokens.

func MustNewTokenStore

func MustNewTokenStore(client *Client, log *logger.Logger) *TokenStore

MustNewTokenStore creates a token store or panics on error.

func NewTokenStore

func NewTokenStore(client *Client, log *logger.Logger) (*TokenStore, error)

NewTokenStore creates a new token store.

func (*TokenStore) BlacklistToken

func (ts *TokenStore) BlacklistToken(ctx context.Context, jti string, expiry time.Duration) error

BlacklistToken adds a token to the blacklist. The token will be automatically removed after the expiry duration.

func (*TokenStore) CountActiveSessions

func (ts *TokenStore) CountActiveSessions(ctx context.Context, userID string) (int64, error)

CountActiveSessions returns the number of active sessions for a user.

func (*TokenStore) DeleteAllUserSessions

func (ts *TokenStore) DeleteAllUserSessions(ctx context.Context, userID string) error

DeleteAllUserSessions removes all sessions for a user atomically.

func (*TokenStore) DeleteSession

func (ts *TokenStore) DeleteSession(ctx context.Context, userID, sessionID string) error

DeleteSession removes a user session atomically.

func (*TokenStore) GetSession

func (ts *TokenStore) GetSession(ctx context.Context, userID, sessionID string) (map[string]string, error)

GetSession retrieves a user session.

func (*TokenStore) GetUserSessions

func (ts *TokenStore) GetUserSessions(ctx context.Context, userID string) ([]string, error)

GetUserSessions returns all active session IDs for a user.

func (*TokenStore) IsBlacklisted

func (ts *TokenStore) IsBlacklisted(ctx context.Context, jti string) (bool, error)

IsBlacklisted checks if a token is blacklisted.

func (*TokenStore) RefreshSession

func (ts *TokenStore) RefreshSession(ctx context.Context, userID, sessionID string, ttl time.Duration) error

RefreshSession extends the TTL of a session atomically. It uses Lua scripting to prevent race conditions between the existence check and the TTL update.

func (*TokenStore) RevokeAllRefreshTokens

func (ts *TokenStore) RevokeAllRefreshTokens(ctx context.Context, userID string) error

RevokeAllRefreshTokens revokes all refresh tokens for a user atomically.

func (*TokenStore) RevokeRefreshToken

func (ts *TokenStore) RevokeRefreshToken(ctx context.Context, userID, tokenHash string) error

RevokeRefreshToken removes a refresh token atomically.

func (*TokenStore) RotateRefreshToken

func (ts *TokenStore) RotateRefreshToken(ctx context.Context, userID, oldTokenHash, newTokenHash string, ttl time.Duration) error

RotateRefreshToken atomically revokes the old token and stores the new one.

func (*TokenStore) StoreRefreshToken

func (ts *TokenStore) StoreRefreshToken(ctx context.Context, userID, tokenHash string, ttl time.Duration) error

StoreRefreshToken stores a refresh token hash atomically.

func (*TokenStore) StoreSession

func (ts *TokenStore) StoreSession(ctx context.Context, userID, sessionID string, data map[string]string, ttl time.Duration) error

StoreSession stores a user session atomically. All operations (HSet, Expire, SAdd) are executed in a transaction.

func (*TokenStore) ValidateRefreshToken

func (ts *TokenStore) ValidateRefreshToken(ctx context.Context, userID, tokenHash string) (bool, error)

ValidateRefreshToken checks if a refresh token is valid.

type TokenStorer

// TokenStorer defines the interface for token/session operations. Use this
// interface in application code for better testability.
type TokenStorer interface {
	// JWT Blacklist

	// BlacklistToken adds a token to the blacklist; the entry is removed
	// automatically after the expiry duration.
	BlacklistToken(ctx context.Context, jti string, expiry time.Duration) error
	// IsBlacklisted checks if a token is blacklisted.
	IsBlacklisted(ctx context.Context, jti string) (bool, error)

	// Session Management

	// StoreSession stores a user session atomically.
	StoreSession(ctx context.Context, userID, sessionID string, data map[string]string, ttl time.Duration) error
	// GetSession retrieves a user session.
	GetSession(ctx context.Context, userID, sessionID string) (map[string]string, error)
	// DeleteSession removes a user session atomically.
	DeleteSession(ctx context.Context, userID, sessionID string) error
	// DeleteAllUserSessions removes all sessions for a user atomically.
	DeleteAllUserSessions(ctx context.Context, userID string) error
	// GetUserSessions returns all active session IDs for a user.
	GetUserSessions(ctx context.Context, userID string) ([]string, error)
	// RefreshSession extends the TTL of a session atomically.
	RefreshSession(ctx context.Context, userID, sessionID string, ttl time.Duration) error
	// CountActiveSessions returns the number of active sessions for a user.
	CountActiveSessions(ctx context.Context, userID string) (int64, error)

	// Refresh Tokens

	// StoreRefreshToken stores a refresh token hash atomically.
	StoreRefreshToken(ctx context.Context, userID, tokenHash string, ttl time.Duration) error
	// ValidateRefreshToken checks if a refresh token is valid.
	ValidateRefreshToken(ctx context.Context, userID, tokenHash string) (bool, error)
	// RevokeRefreshToken removes a refresh token atomically.
	RevokeRefreshToken(ctx context.Context, userID, tokenHash string) error
	// RevokeAllRefreshTokens revokes all refresh tokens for a user atomically.
	RevokeAllRefreshTokens(ctx context.Context, userID string) error
	// RotateRefreshToken atomically revokes the old token and stores the new one.
	RotateRefreshToken(ctx context.Context, userID, oldTokenHash, newTokenHash string, ttl time.Duration) error
}

TokenStorer defines the interface for token/session operations. Use this interface in application code for better testability.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL