Documentation
¶
Overview ¶
Package redis provides production-ready Redis integration for the OpenCTEM application.
Overview ¶
This package provides four main components:
- Client: Connection management with TLS, pooling, and retry logic
- Cache[T]: Type-safe generic caching with TTL support
- TokenStore: JWT blacklist, session management, and refresh tokens
- RateLimiter: Distributed rate limiting using a sliding window algorithm
Quick Start ¶
Initialize the Redis client:
cfg := &config.RedisConfig{
Host: "localhost",
Port: 6379,
Password: "secret",
DB: 0,
PoolSize: 10,
MinIdleConns: 2,
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
TLSEnabled: true, // Required in production
TLSSkipVerify: false, // Must be false in production
MaxRetries: 3,
MinRetryDelay: 100 * time.Millisecond,
MaxRetryDelay: 3 * time.Second,
}
client, err := redis.New(cfg, logger)
if err != nil {
log.Fatal(err)
}
defer client.Close()
Using the Generic Cache ¶
Create a type-safe cache for any struct:
type User struct {
ID string `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
}
// Create cache with 1 hour TTL
userCache, err := redis.NewCache[User](client, "users", time.Hour)
if err != nil {
log.Fatal(err)
}
// Store a user
user := User{ID: "123", Name: "John", Email: "john@example.com"}
if err := userCache.Set(ctx, user.ID, user); err != nil {
log.Error("failed to cache user", "error", err)
}
// Retrieve a user
cached, err := userCache.Get(ctx, "123")
if errors.Is(err, redis.ErrCacheMiss) {
// Cache miss - load from database
} else if err != nil {
log.Error("cache error", "error", err)
}
// Get or set pattern (cache-aside); GetOrSet returns *User, so use a new variable
loaded, err := userCache.GetOrSet(ctx, "123", func(ctx context.Context) (*User, error) {
return userRepo.FindByID(ctx, "123")
})
Using the Token Store ¶
Manage JWT blacklist and sessions:
tokenStore, err := redis.NewTokenStore(client, logger)
if err != nil {
log.Fatal(err)
}
// Blacklist a JWT token (for logout)
err = tokenStore.BlacklistToken(ctx, jti, tokenExpiry)
// Check if token is blacklisted (in auth middleware)
blacklisted, err := tokenStore.IsBlacklisted(ctx, jti)
if blacklisted {
return ErrTokenRevoked
}
// Store user session
sessionData := map[string]string{
"user_agent": r.UserAgent(),
"ip": r.RemoteAddr,
"created_at": time.Now().Format(time.RFC3339),
}
err = tokenStore.StoreSession(ctx, userID, sessionID, sessionData, 24*time.Hour)
// Delete all sessions (force logout from all devices)
err = tokenStore.DeleteAllUserSessions(ctx, userID)
// Refresh token rotation
err = tokenStore.RotateRefreshToken(ctx, userID, oldHash, newHash, 7*24*time.Hour)
Using the Rate Limiter ¶
Distributed rate limiting:
rateLimiter, err := redis.NewRateLimiter(
client,
"api", // key prefix
100, // 100 requests
time.Minute, // per minute
logger,
)
if err != nil {
log.Fatal(err)
}
// In HTTP middleware
result, err := rateLimiter.Allow(ctx, clientIP)
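if err != nil {
// Redis error - result must not be used; decide on a fallback strategy
// (this example fails closed; call the next handler instead to fail open)
log.Error("rate limit check failed", "error", err)
http.Error(w, "rate limiter unavailable", http.StatusServiceUnavailable)
return
}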
if !result.Allowed {
w.Header().Set("Retry-After", result.RetryAt.UTC().Format(http.TimeFormat))
http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
return
}
w.Header().Set("X-RateLimit-Remaining", strconv.Itoa(result.Remaining))
Production Configuration ¶
Required settings for production:
# .env (production)
REDIS_HOST=redis.internal
REDIS_PORT=6379
REDIS_PASSWORD=<strong-password>
REDIS_DB=0
REDIS_POOL_SIZE=25
REDIS_MIN_IDLE_CONNS=5
REDIS_DIAL_TIMEOUT=5s
REDIS_READ_TIMEOUT=3s
REDIS_WRITE_TIMEOUT=3s
REDIS_TLS_ENABLED=true
REDIS_TLS_SKIP_VERIFY=false
REDIS_MAX_RETRIES=3
REDIS_MIN_RETRY_DELAY=100ms
REDIS_MAX_RETRY_DELAY=3s
Health Checks ¶
Use the Ping method for health checks:
func (h *HealthHandler) Ready(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
if err := h.redis.Ping(ctx); err != nil {
http.Error(w, "redis unavailable", http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusOK)
}
Error Handling ¶
The package defines specific errors for common cases:
var (
ErrKeyNotFound = errors.New("redis: key not found")
ErrCacheMiss = errors.New("cache: key not found")
)
Use errors.Is for error checking:
if errors.Is(err, redis.ErrCacheMiss) {
// Handle cache miss
}
Thread Safety ¶
All components are safe for concurrent use. The underlying go-redis client manages connection pooling automatically.
Graceful Shutdown ¶
Always close the client on application shutdown:
// In main.go
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
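// Graceful shutdown: ctx bounds the shutdown of context-aware components
// (e.g. an HTTP server); Client.Close itself takes no context.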
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := redisClient.Close(); err != nil {
log.Error("failed to close redis", "error", err)
}
Index ¶
- Constants
- Variables
- func StartPoolStatsCollector(ctx context.Context, client *Client, interval time.Duration) func()
- func Timed(operation string) func(error)
- type AgentHeartbeat
- type AgentStateStore
- func (s *AgentStateStore) CleanupStaleAgents(ctx context.Context, threshold time.Duration) (int, error)
- func (s *AgentStateStore) GetAgentActiveJobCount(ctx context.Context, agentID shared.ID) (int64, error)
- func (s *AgentStateStore) GetAgentActiveJobs(ctx context.Context, agentID shared.ID) ([]string, error)
- func (s *AgentStateStore) GetAgentConfig(ctx context.Context, agentID shared.ID) (*CachedAgentConfig, error)
- func (s *AgentStateStore) GetAgentsWithStaleHeartbeat(ctx context.Context, threshold time.Duration) ([]string, error)
- func (s *AgentStateStore) GetHeartbeat(ctx context.Context, agentID shared.ID) (*AgentHeartbeat, error)
- func (s *AgentStateStore) GetLastHeartbeatTime(ctx context.Context, agentID shared.ID) (time.Time, error)
- func (s *AgentStateStore) GetOnlinePlatformAgentCount(ctx context.Context) (int64, error)
- func (s *AgentStateStore) GetOnlinePlatformAgents(ctx context.Context) ([]string, error)
- func (s *AgentStateStore) GetPlatformAgentState(ctx context.Context, agentID shared.ID) (*PlatformAgentState, error)
- func (s *AgentStateStore) GetPreviousHealthState(ctx context.Context, agentID shared.ID) (string, error)
- func (s *AgentStateStore) GetQueueStats(ctx context.Context) (*QueueStats, error)
- func (s *AgentStateStore) IncrementQueueStat(ctx context.Context, field string, delta int64) error
- func (s *AgentStateStore) InvalidateAgentConfig(ctx context.Context, agentID shared.ID) error
- func (s *AgentStateStore) IsAgentOnline(ctx context.Context, agentID shared.ID) (bool, error)
- func (s *AgentStateStore) MarkAgentOfflineInCache(ctx context.Context, agentID shared.ID) error
- func (s *AgentStateStore) RecordHeartbeat(ctx context.Context, hb *AgentHeartbeat) error
- func (s *AgentStateStore) RemoveHeartbeat(ctx context.Context, agentID shared.ID) error
- func (s *AgentStateStore) SetAgentConfig(ctx context.Context, config *CachedAgentConfig) error
- func (s *AgentStateStore) SetPlatformAgentState(ctx context.Context, state *PlatformAgentState) error
- func (s *AgentStateStore) SetPreviousHealthState(ctx context.Context, agentID shared.ID, health string) error
- func (s *AgentStateStore) TrackAgentJob(ctx context.Context, agentID, jobID shared.ID) error
- func (s *AgentStateStore) UntrackAgentJob(ctx context.Context, agentID, jobID shared.ID) error
- func (s *AgentStateStore) UpdateQueueStats(ctx context.Context, stats *QueueStats) error
- func (s *AgentStateStore) WasAgentOffline(ctx context.Context, agentID shared.ID) (bool, error)
- type Cache
- func (c *Cache[T]) Delete(ctx context.Context, key string) error
- func (c *Cache[T]) DeletePattern(ctx context.Context, pattern string) error
- func (c *Cache[T]) Exists(ctx context.Context, key string) (bool, error)
- func (c *Cache[T]) Get(ctx context.Context, key string) (*T, error)
- func (c *Cache[T]) GetOrSet(ctx context.Context, key string, loader func(ctx context.Context) (*T, error)) (*T, error)
- func (c *Cache[T]) GetOrSetFallback(ctx context.Context, key string, loader func(ctx context.Context) (*T, error)) (*T, error)
- func (c *Cache[T]) GetOrSetFallbackWithTTL(ctx context.Context, key string, ttl time.Duration, ...) (*T, error)
- func (c *Cache[T]) GetOrSetWithTTL(ctx context.Context, key string, ttl time.Duration, ...) (*T, error)
- func (c *Cache[T]) Invalidate(ctx context.Context, key string) error
- func (c *Cache[T]) MGet(ctx context.Context, keys ...string) (map[string]*T, error)
- func (c *Cache[T]) MSet(ctx context.Context, items map[string]T) error
- func (c *Cache[T]) Prefix() string
- func (c *Cache[T]) Refresh(ctx context.Context, key string) error
- func (c *Cache[T]) Set(ctx context.Context, key string, value T) error
- func (c *Cache[T]) SetWithTTL(ctx context.Context, key string, value T, ttl time.Duration) error
- func (c *Cache[T]) TTL() time.Duration
- type CacheStore
- type CachedAgentConfig
- type Client
- func (c *Client) Client() *redis.Client
- func (c *Client) Close() error
- func (c *Client) Del(ctx context.Context, keys ...string) error
- func (c *Client) Exists(ctx context.Context, key string) (bool, error)
- func (c *Client) Expire(ctx context.Context, key string, ttl time.Duration) error
- func (c *Client) Get(ctx context.Context, key string) (string, error)
- func (c *Client) Keys(ctx context.Context, pattern string) ([]string, error)
- func (c *Client) Logger() *logger.Logger
- func (c *Client) Ping(ctx context.Context) error
- func (c *Client) PoolStats() *redis.PoolStats
- func (c *Client) Scan(ctx context.Context, pattern string, count int64) ([]string, error)
- func (c *Client) Set(ctx context.Context, key, value string, ttl time.Duration) error
- func (c *Client) TTL(ctx context.Context, key string) (time.Duration, error)
- type Closer
- type JobNotification
- type JobNotifier
- func (n *JobNotifier) NotifyNewJob(ctx context.Context, notification *JobNotification) error
- func (n *JobNotifier) NotifyNewPlatformJob(ctx context.Context, jobID, tenantID string, capabilities, tools []string, ...) error
- func (n *JobNotifier) StartListener(ctx context.Context) error
- func (n *JobNotifier) Subscribe(agentID string, capabilities []string) <-chan *JobNotification
- func (n *JobNotifier) SubscriberCount() int
- func (n *JobNotifier) Unsubscribe(agentID string)
- func (n *JobNotifier) WaitForJob(ctx context.Context, agentID string, capabilities []string, ...) bool
- type Metrics
- func (m *Metrics) ObserveOperation(operation string, duration time.Duration, err error)
- func (m *Metrics) RecordCacheHit(cacheName string)
- func (m *Metrics) RecordCacheMiss(cacheName string)
- func (m *Metrics) RecordRateLimitResult(limiterName string, allowed bool)
- func (m *Metrics) UpdatePoolStats(client *Client)
- type MetricsCollector
- type MiddlewareAdapter
- type MiddlewareRateLimitResult
- type Pinger
- type PlatformAgentState
- type QueueStats
- type RateLimitResult
- type RateLimiter
- func (rl *RateLimiter) Allow(ctx context.Context, key string) (*RateLimitResult, error)
- func (rl *RateLimiter) AllowN(ctx context.Context, key string, n int) (*RateLimitResult, error)
- func (rl *RateLimiter) Limit() int
- func (rl *RateLimiter) Reset(ctx context.Context, key string) error
- func (rl *RateLimiter) Status(ctx context.Context, key string) (*RateLimitResult, error)
- func (rl *RateLimiter) Window() time.Duration
- type RateLimiterStore
- type RedisClient
- type SessionData
- type TokenStore
- func (ts *TokenStore) BlacklistToken(ctx context.Context, jti string, expiry time.Duration) error
- func (ts *TokenStore) CountActiveSessions(ctx context.Context, userID string) (int64, error)
- func (ts *TokenStore) DeleteAllUserSessions(ctx context.Context, userID string) error
- func (ts *TokenStore) DeleteSession(ctx context.Context, userID, sessionID string) error
- func (ts *TokenStore) GetSession(ctx context.Context, userID, sessionID string) (map[string]string, error)
- func (ts *TokenStore) GetUserSessions(ctx context.Context, userID string) ([]string, error)
- func (ts *TokenStore) IsBlacklisted(ctx context.Context, jti string) (bool, error)
- func (ts *TokenStore) RefreshSession(ctx context.Context, userID, sessionID string, ttl time.Duration) error
- func (ts *TokenStore) RevokeAllRefreshTokens(ctx context.Context, userID string) error
- func (ts *TokenStore) RevokeRefreshToken(ctx context.Context, userID, tokenHash string) error
- func (ts *TokenStore) RotateRefreshToken(ctx context.Context, userID, oldTokenHash, newTokenHash string, ...) error
- func (ts *TokenStore) StoreRefreshToken(ctx context.Context, userID, tokenHash string, ttl time.Duration) error
- func (ts *TokenStore) StoreSession(ctx context.Context, userID, sessionID string, data map[string]string, ...) error
- func (ts *TokenStore) ValidateRefreshToken(ctx context.Context, userID, tokenHash string) (bool, error)
- type TokenStorer
Constants ¶
const (
// JobNotifyChannel is the Redis pub/sub channel for job notifications.
JobNotifyChannel = "platform:jobs:notify"
// JobNotifyChannelPrefix is the prefix for capability-specific channels.
JobNotifyChannelPrefix = "platform:jobs:notify:"
)
Variables ¶
var (
// ErrKeyNotFound is returned when a key does not exist.
ErrKeyNotFound = errors.New("redis: key not found")
// ErrCacheMiss is returned when a cached item is not found.
ErrCacheMiss = errors.New("cache: key not found")
)
Redis-specific errors.
Functions ¶
func StartPoolStatsCollector ¶
StartPoolStatsCollector starts a goroutine that periodically updates pool stats. Returns a cancel function to stop the collector.
Types ¶
type AgentHeartbeat ¶
type AgentHeartbeat struct {
AgentID string `json:"agent_id"`
TenantID string `json:"tenant_id,omitempty"` // Empty for platform agents
IsPlatform bool `json:"is_platform"`
Status string `json:"status"`
Health string `json:"health"`
CurrentJobs int `json:"current_jobs"`
MaxConcurrent int `json:"max_concurrent"`
LastHeartbeat time.Time `json:"last_heartbeat"`
IPAddress string `json:"ip_address,omitempty"`
Region string `json:"region,omitempty"`
Version string `json:"version,omitempty"`
// Extended metrics for load balancing
CPUPercent float64 `json:"cpu_percent,omitempty"`
MemoryPercent float64 `json:"memory_percent,omitempty"`
LoadScore float64 `json:"load_score,omitempty"` // Weighted load score (lower is better)
}
AgentHeartbeat represents the heartbeat data stored in Redis.
type AgentStateStore ¶
type AgentStateStore struct {
// contains filtered or unexported fields
}
AgentStateStore manages ephemeral agent state in Redis.
func NewAgentStateStore ¶
func NewAgentStateStore(client *Client, log *logger.Logger) *AgentStateStore
NewAgentStateStore creates a new AgentStateStore.
func (*AgentStateStore) CleanupStaleAgents ¶
func (s *AgentStateStore) CleanupStaleAgents(ctx context.Context, threshold time.Duration) (int, error)
CleanupStaleAgents removes stale agent data from Redis.
func (*AgentStateStore) GetAgentActiveJobCount ¶
func (s *AgentStateStore) GetAgentActiveJobCount(ctx context.Context, agentID shared.ID) (int64, error)
GetAgentActiveJobCount returns the count of active jobs for an agent.
func (*AgentStateStore) GetAgentActiveJobs ¶
func (s *AgentStateStore) GetAgentActiveJobs(ctx context.Context, agentID shared.ID) ([]string, error)
GetAgentActiveJobs returns all active job IDs for an agent.
func (*AgentStateStore) GetAgentConfig ¶
func (s *AgentStateStore) GetAgentConfig(ctx context.Context, agentID shared.ID) (*CachedAgentConfig, error)
GetAgentConfig retrieves cached agent configuration. Returns nil, nil if not cached (caller should load from DB and cache).
func (*AgentStateStore) GetAgentsWithStaleHeartbeat ¶
func (s *AgentStateStore) GetAgentsWithStaleHeartbeat(ctx context.Context, threshold time.Duration) ([]string, error)
GetAgentsWithStaleHeartbeat returns agent IDs whose heartbeat is older than the threshold. Used by health monitor to detect agents that went offline.
func (*AgentStateStore) GetHeartbeat ¶
func (s *AgentStateStore) GetHeartbeat(ctx context.Context, agentID shared.ID) (*AgentHeartbeat, error)
GetHeartbeat retrieves the latest heartbeat for an agent.
func (*AgentStateStore) GetLastHeartbeatTime ¶
func (s *AgentStateStore) GetLastHeartbeatTime(ctx context.Context, agentID shared.ID) (time.Time, error)
GetLastHeartbeatTime returns the last heartbeat timestamp for an agent. Returns zero time if no heartbeat exists.
func (*AgentStateStore) GetOnlinePlatformAgentCount ¶
func (s *AgentStateStore) GetOnlinePlatformAgentCount(ctx context.Context) (int64, error)
GetOnlinePlatformAgentCount returns the count of online platform agents.
func (*AgentStateStore) GetOnlinePlatformAgents ¶
func (s *AgentStateStore) GetOnlinePlatformAgents(ctx context.Context) ([]string, error)
GetOnlinePlatformAgents returns all online platform agent IDs.
func (*AgentStateStore) GetPlatformAgentState ¶
func (s *AgentStateStore) GetPlatformAgentState(ctx context.Context, agentID shared.ID) (*PlatformAgentState, error)
GetPlatformAgentState retrieves the state of a platform agent.
func (*AgentStateStore) GetPreviousHealthState ¶
func (s *AgentStateStore) GetPreviousHealthState(ctx context.Context, agentID shared.ID) (string, error)
GetPreviousHealthState returns the previous health state of an agent. Used to detect state transitions (offline -> online, online -> offline).
func (*AgentStateStore) GetQueueStats ¶
func (s *AgentStateStore) GetQueueStats(ctx context.Context) (*QueueStats, error)
GetQueueStats retrieves queue statistics.
func (*AgentStateStore) IncrementQueueStat ¶
IncrementQueueStat increments a specific queue stat counter.
func (*AgentStateStore) InvalidateAgentConfig ¶
InvalidateAgentConfig removes cached agent configuration (e.g., after admin update).
func (*AgentStateStore) IsAgentOnline ¶
IsAgentOnline checks if an agent is online based on heartbeat.
func (*AgentStateStore) MarkAgentOfflineInCache ¶
MarkAgentOfflineInCache marks an agent as offline in the cache. Called by health monitor when heartbeat timeout is detected.
func (*AgentStateStore) RecordHeartbeat ¶
func (s *AgentStateStore) RecordHeartbeat(ctx context.Context, hb *AgentHeartbeat) error
RecordHeartbeat records an agent heartbeat.
func (*AgentStateStore) RemoveHeartbeat ¶
RemoveHeartbeat removes an agent's heartbeat (for clean shutdown).
func (*AgentStateStore) SetAgentConfig ¶
func (s *AgentStateStore) SetAgentConfig(ctx context.Context, config *CachedAgentConfig) error
SetAgentConfig caches agent configuration to avoid DB reads on every heartbeat.
func (*AgentStateStore) SetPlatformAgentState ¶
func (s *AgentStateStore) SetPlatformAgentState(ctx context.Context, state *PlatformAgentState) error
SetPlatformAgentState stores the state of a platform agent.
func (*AgentStateStore) SetPreviousHealthState ¶
func (s *AgentStateStore) SetPreviousHealthState(ctx context.Context, agentID shared.ID, health string) error
SetPreviousHealthState stores the previous health state of an agent. TTL is set longer than heartbeat to ensure we can detect offline -> online transitions.
func (*AgentStateStore) TrackAgentJob ¶
TrackAgentJob adds a job to an agent's active job set.
func (*AgentStateStore) UntrackAgentJob ¶
UntrackAgentJob removes a job from an agent's active job set.
func (*AgentStateStore) UpdateQueueStats ¶
func (s *AgentStateStore) UpdateQueueStats(ctx context.Context, stats *QueueStats) error
UpdateQueueStats updates queue statistics.
func (*AgentStateStore) WasAgentOffline ¶
WasAgentOffline checks whether the agent was previously offline (for detecting an online transition). Returns true if:
1. Previous health state was "offline" or "error"
2. No previous heartbeat exists (new agent or first heartbeat after long downtime)
type Cache ¶
type Cache[T any] struct {
// contains filtered or unexported fields
}
Cache provides type-safe caching operations.
func MustNewCache ¶
MustNewCache creates a new cache or panics on error. Use only in initialization code where failure is unrecoverable.
func (*Cache[T]) DeletePattern ¶
DeletePattern removes all keys matching a pattern. Pattern example: "*" removes all keys with the cache prefix.
func (*Cache[T]) Get ¶
Get retrieves a cached value by key. Returns ErrCacheMiss if the key does not exist.
func (*Cache[T]) GetOrSet ¶
func (c *Cache[T]) GetOrSet(ctx context.Context, key string, loader func(ctx context.Context) (*T, error)) (*T, error)
GetOrSet retrieves a value from cache, or calls the loader function and caches the result. On cache miss, loads from source and caches the result. On Redis errors (connection, timeout), fails fast to prevent cascading failures. Cache set errors are logged but do not fail the operation.
func (*Cache[T]) GetOrSetFallback ¶
func (c *Cache[T]) GetOrSetFallback(ctx context.Context, key string, loader func(ctx context.Context) (*T, error)) (*T, error)
GetOrSetFallback is like GetOrSet but falls back to loader on any cache error. Use this when availability is more important than protecting the database. WARNING: If Redis is down, ALL requests will hit your database.
func (*Cache[T]) GetOrSetFallbackWithTTL ¶
func (c *Cache[T]) GetOrSetFallbackWithTTL(ctx context.Context, key string, ttl time.Duration, loader func(ctx context.Context) (*T, error)) (*T, error)
GetOrSetFallbackWithTTL is like GetOrSetFallback but with a custom TTL.
func (*Cache[T]) GetOrSetWithTTL ¶
func (c *Cache[T]) GetOrSetWithTTL(ctx context.Context, key string, ttl time.Duration, loader func(ctx context.Context) (*T, error)) (*T, error)
GetOrSetWithTTL is like GetOrSet but with a custom TTL. On cache miss, loads from source and caches the result. On Redis errors (connection, timeout), fails fast to prevent cascading failures. Cache set errors are logged but do not fail the operation.
func (*Cache[T]) Invalidate ¶
Invalidate removes a key from the cache (alias for Delete).
func (*Cache[T]) MGet ¶
MGet retrieves multiple values by keys. Returns a map of key to value. Missing keys are not included in the result.
func (*Cache[T]) SetWithTTL ¶
SetWithTTL stores a value in the cache with a custom TTL.
type CacheStore ¶
type CacheStore[T any] interface {
// Get retrieves a cached value by key.
// Returns ErrCacheMiss if the key does not exist.
Get(ctx context.Context, key string) (*T, error)
// Set stores a value in the cache with the default TTL.
Set(ctx context.Context, key string, value T) error
// SetWithTTL stores a value in the cache with a custom TTL.
SetWithTTL(ctx context.Context, key string, value T, ttl time.Duration) error
// Delete removes a key from the cache.
Delete(ctx context.Context, key string) error
// Exists checks if a key exists in the cache.
Exists(ctx context.Context, key string) (bool, error)
// GetOrSet retrieves from cache or calls loader and caches the result.
GetOrSet(ctx context.Context, key string, loader func(ctx context.Context) (*T, error)) (*T, error)
}
CacheStore defines the interface for cache operations. Use this interface in application code for better testability.
type CachedAgentConfig ¶
type CachedAgentConfig struct {
AgentID string `json:"agent_id"`
TenantID string `json:"tenant_id,omitempty"` // Empty for platform agents
IsPlatform bool `json:"is_platform"`
Status string `json:"status"` // Admin-controlled status (active, disabled, revoked)
Capabilities []string `json:"capabilities"`
Tools []string `json:"tools"`
MaxConcurrent int `json:"max_concurrent"`
Region string `json:"region,omitempty"`
CachedAt int64 `json:"cached_at"` // Unix timestamp
}
CachedAgentConfig represents the cached agent configuration to avoid DB reads on every heartbeat.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client wraps redis.Client with additional functionality.
func (*Client) Keys ¶
Keys returns all keys matching a pattern. Warning: Use with caution in production as it can be slow.
func (*Client) Scan ¶
Scan iterates through keys matching a pattern. This is more production-safe than Keys().
type JobNotification ¶
type JobNotification struct {
JobID string `json:"job_id"`
TenantID string `json:"tenant_id"`
Capabilities []string `json:"capabilities,omitempty"`
Tools []string `json:"tools,omitempty"`
Priority int `json:"priority"`
CreatedAt int64 `json:"created_at"` // Unix timestamp
}
JobNotification represents a notification about a new platform job.
type JobNotifier ¶
type JobNotifier struct {
// contains filtered or unexported fields
}
JobNotifier handles pub/sub notifications for platform jobs. It uses Redis pub/sub to notify waiting agents when new jobs are available.
func NewJobNotifier ¶
func NewJobNotifier(client *Client, log *logger.Logger) *JobNotifier
NewJobNotifier creates a new JobNotifier.
func (*JobNotifier) NotifyNewJob ¶
func (n *JobNotifier) NotifyNewJob(ctx context.Context, notification *JobNotification) error
NotifyNewJob publishes a notification that a new job is available. This should be called when a new platform job is created.
func (*JobNotifier) NotifyNewPlatformJob ¶
func (n *JobNotifier) NotifyNewPlatformJob(ctx context.Context, jobID, tenantID string, capabilities, tools []string, priority int, createdAt int64) error
NotifyNewPlatformJob is an adapter method that accepts the individual fields of a platform job notification and converts them to the internal JobNotification format. This allows JobNotifier to implement the app.PlatformJobNotifier interface.
func (*JobNotifier) StartListener ¶
func (n *JobNotifier) StartListener(ctx context.Context) error
StartListener starts listening for Redis pub/sub messages and dispatches them to subscribed agents. This should be called once when the application starts.
func (*JobNotifier) Subscribe ¶
func (n *JobNotifier) Subscribe(agentID string, capabilities []string) <-chan *JobNotification
Subscribe creates a subscription for an agent to receive job notifications. Returns a channel that will receive notifications when jobs are available. The caller should call Unsubscribe when done.
func (*JobNotifier) SubscriberCount ¶
func (n *JobNotifier) SubscriberCount() int
SubscriberCount returns the current number of subscribers.
func (*JobNotifier) Unsubscribe ¶
func (n *JobNotifier) Unsubscribe(agentID string)
Unsubscribe removes an agent's subscription.
func (*JobNotifier) WaitForJob ¶
func (n *JobNotifier) WaitForJob(ctx context.Context, agentID string, capabilities []string, timeout time.Duration) bool
WaitForJob waits for a job notification or timeout. This is the main method used by the long-polling handler. Returns true if a notification was received, false on timeout.
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
Metrics holds all Redis-related Prometheus metrics.
var DefaultMetrics *Metrics
DefaultMetrics is the default metrics instance.
func NewMetrics ¶
NewMetrics creates a new Metrics instance with the given namespace.
func (*Metrics) ObserveOperation ¶
ObserveOperation records the duration and result of a Redis operation.
func (*Metrics) RecordCacheHit ¶
RecordCacheHit records a cache hit for the given cache name.
func (*Metrics) RecordCacheMiss ¶
RecordCacheMiss records a cache miss for the given cache name.
func (*Metrics) RecordRateLimitResult ¶
RecordRateLimitResult records the result of a rate limit check.
func (*Metrics) UpdatePoolStats ¶
UpdatePoolStats updates the connection pool metrics from the client.
type MetricsCollector ¶
type MetricsCollector struct {
// contains filtered or unexported fields
}
MetricsCollector implements prometheus.Collector for Redis pool stats.
func NewMetricsCollector ¶
func NewMetricsCollector(client *Client, metrics *Metrics) *MetricsCollector
NewMetricsCollector creates a new MetricsCollector.
func (*MetricsCollector) Collect ¶
func (c *MetricsCollector) Collect(_ chan<- prometheus.Metric)
Collect implements prometheus.Collector.
func (*MetricsCollector) Describe ¶
func (c *MetricsCollector) Describe(_ chan<- *prometheus.Desc)
Describe implements prometheus.Collector.
type MiddlewareAdapter ¶
type MiddlewareAdapter struct {
// contains filtered or unexported fields
}
MiddlewareAdapter wraps RateLimiter to implement the middleware interface. This adapter converts the internal RateLimitResult to the middleware's expected type.
func NewMiddlewareAdapter ¶
func NewMiddlewareAdapter(rl *RateLimiter) *MiddlewareAdapter
NewMiddlewareAdapter creates an adapter for use with the HTTP middleware.
func (*MiddlewareAdapter) Allow ¶
func (a *MiddlewareAdapter) Allow(ctx context.Context, key string) (*MiddlewareRateLimitResult, error)
Allow checks if a request is allowed and returns the result in middleware format.
func (*MiddlewareAdapter) Limit ¶
func (a *MiddlewareAdapter) Limit() int
Limit returns the configured maximum requests per window.
type MiddlewareRateLimitResult ¶
type MiddlewareRateLimitResult struct {
Allowed bool
Remaining int
ResetAt time.Time
RetryAt time.Time
}
MiddlewareRateLimitResult is the result type expected by the middleware.
type PlatformAgentState ¶
type PlatformAgentState struct {
AgentID string `json:"agent_id"`
Health string `json:"health"`
CurrentJobs int `json:"current_jobs"`
MaxConcurrent int `json:"max_concurrent"`
Region string `json:"region"`
Capabilities []string `json:"capabilities"`
Tools []string `json:"tools"`
LastHeartbeat time.Time `json:"last_heartbeat"`
LastJobAt time.Time `json:"last_job_at,omitempty"`
TotalJobs int64 `json:"total_jobs"`
FailedJobs int64 `json:"failed_jobs"`
// Extended metrics for load balancing
CPUPercent float64 `json:"cpu_percent,omitempty"`
MemoryPercent float64 `json:"memory_percent,omitempty"`
LoadScore float64 `json:"load_score,omitempty"` // Weighted load score (lower is better)
}
PlatformAgentState represents the state of a platform agent.
type QueueStats ¶
type QueueStats struct {
TotalQueued int64 `json:"total_queued"`
TotalProcessing int64 `json:"total_processing"`
TotalCompleted int64 `json:"total_completed"`
TotalFailed int64 `json:"total_failed"`
AvgWaitTimeSec float64 `json:"avg_wait_time_sec"`
AvgProcessTimeSec float64 `json:"avg_process_time_sec"`
LastUpdated time.Time `json:"last_updated"`
}
QueueStats represents queue statistics.
type RateLimitResult ¶
type RateLimitResult struct {
// Allowed indicates if the request is permitted.
Allowed bool
// Remaining is the number of requests left in the current window.
Remaining int
// ResetAt is when the rate limit window resets.
ResetAt time.Time
// RetryAt is when the client should retry (only set when not allowed).
RetryAt time.Time
}
RateLimitResult contains the result of a rate limit check.
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter implements distributed rate limiting using Redis. It uses the sliding window log algorithm with sorted sets for accurate rate limiting across distributed systems.
The sliding window algorithm tracks individual request timestamps, providing more accurate rate limiting compared to fixed windows.
func MustNewRateLimiter ¶
func MustNewRateLimiter(client *Client, prefix string, limit int, window time.Duration, log *logger.Logger) *RateLimiter
MustNewRateLimiter creates a rate limiter or panics on error. Use only in initialization code where failure is unrecoverable.
func NewRateLimiter ¶
func NewRateLimiter(client *Client, prefix string, limit int, window time.Duration, log *logger.Logger) (*RateLimiter, error)
NewRateLimiter creates a new distributed rate limiter.
Parameters:
- client: Redis client for storage
- prefix: Key prefix for namespacing (e.g., "ratelimit:api")
- limit: Maximum requests allowed per window
- window: Time window duration
- log: Logger for debugging
Example:
rl, err := redis.NewRateLimiter(client, "api", 100, time.Minute, logger)
func (*RateLimiter) Allow ¶
func (rl *RateLimiter) Allow(ctx context.Context, key string) (*RateLimitResult, error)
Allow checks if a request is allowed and consumes one token atomically. Returns the result with remaining count and reset time.
This method is safe for concurrent use and uses Lua scripting to ensure atomic check-and-update operations.
func (*RateLimiter) AllowN ¶
func (rl *RateLimiter) AllowN(ctx context.Context, key string, n int) (*RateLimitResult, error)
AllowN checks if N requests are allowed and consumes them atomically. This is useful for operations that consume multiple tokens (e.g., bulk APIs).
func (*RateLimiter) Limit ¶
func (rl *RateLimiter) Limit() int
Limit returns the configured maximum requests per window.
func (*RateLimiter) Reset ¶
func (rl *RateLimiter) Reset(ctx context.Context, key string) error
Reset clears the rate limit state for a key, allowing immediate access. Use with caution as this bypasses rate limiting protections.
func (*RateLimiter) Status ¶
func (rl *RateLimiter) Status(ctx context.Context, key string) (*RateLimitResult, error)
Status returns the current rate limit status without consuming a token. This is useful for displaying rate limit information to clients.
This method uses Lua scripting to ensure atomic reads, preventing race conditions between cleanup and count operations.
func (*RateLimiter) Window ¶
func (rl *RateLimiter) Window() time.Duration
Window returns the configured time window duration.
type RateLimiterStore ¶
type RateLimiterStore interface {
// Allow checks if a request is allowed and consumes one token.
Allow(ctx context.Context, key string) (*RateLimitResult, error)
// AllowN checks if N requests are allowed and consumes them.
AllowN(ctx context.Context, key string, n int) (*RateLimitResult, error)
// Status returns the current rate limit status without consuming a token.
Status(ctx context.Context, key string) (*RateLimitResult, error)
// Reset removes the rate limit for a key.
Reset(ctx context.Context, key string) error
// Limit returns the configured limit.
Limit() int
// Window returns the configured window duration.
Window() time.Duration
}
RateLimiterStore defines the interface for rate limiting operations. Use this interface in application code for better testability.
type RedisClient ¶
type RedisClient interface {
Ping(ctx context.Context) *goredis.StatusCmd
Get(ctx context.Context, key string) *goredis.StringCmd
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *goredis.StatusCmd
Del(ctx context.Context, keys ...string) *goredis.IntCmd
Exists(ctx context.Context, keys ...string) *goredis.IntCmd
Expire(ctx context.Context, key string, expiration time.Duration) *goredis.BoolCmd
TTL(ctx context.Context, key string) *goredis.DurationCmd
Close() error
}
RedisClient is an interface that wraps the essential redis.Client methods. This allows for easier testing with mock implementations.
type SessionData ¶
type SessionData struct {
UserID string
SessionID string
UserAgent string
IP string
CreatedAt time.Time
ExpiresAt time.Time
Data map[string]string
}
SessionData represents session metadata.
type TokenStore ¶
type TokenStore struct {
// contains filtered or unexported fields
}
TokenStore manages JWT tokens, sessions, and refresh tokens.
func MustNewTokenStore ¶
func MustNewTokenStore(client *Client, log *logger.Logger) *TokenStore
MustNewTokenStore creates a token store or panics on error. Use only in initialization code where failure is unrecoverable.
func NewTokenStore ¶
func NewTokenStore(client *Client, log *logger.Logger) (*TokenStore, error)
NewTokenStore creates a new token store.
func (*TokenStore) BlacklistToken ¶
func (ts *TokenStore) BlacklistToken(ctx context.Context, jti string, expiry time.Duration) error
BlacklistToken adds a token to the blacklist. The token will be automatically removed after the expiry duration.
func (*TokenStore) CountActiveSessions ¶
func (ts *TokenStore) CountActiveSessions(ctx context.Context, userID string) (int64, error)
CountActiveSessions returns the number of active sessions for a user.
func (*TokenStore) DeleteAllUserSessions ¶
func (ts *TokenStore) DeleteAllUserSessions(ctx context.Context, userID string) error
DeleteAllUserSessions removes all sessions for a user atomically.
func (*TokenStore) DeleteSession ¶
func (ts *TokenStore) DeleteSession(ctx context.Context, userID, sessionID string) error
DeleteSession removes a user session atomically.
func (*TokenStore) GetSession ¶
func (ts *TokenStore) GetSession(ctx context.Context, userID, sessionID string) (map[string]string, error)
GetSession retrieves a user session.
func (*TokenStore) GetUserSessions ¶
func (ts *TokenStore) GetUserSessions(ctx context.Context, userID string) ([]string, error)
GetUserSessions returns all active session IDs for a user.
func (*TokenStore) IsBlacklisted ¶
func (ts *TokenStore) IsBlacklisted(ctx context.Context, jti string) (bool, error)
IsBlacklisted checks if a token is blacklisted.
func (*TokenStore) RefreshSession ¶
func (ts *TokenStore) RefreshSession(ctx context.Context, userID, sessionID string, ttl time.Duration) error
RefreshSession extends the TTL of a session atomically. Uses Lua scripting to prevent race conditions between existence check and TTL update.
func (*TokenStore) RevokeAllRefreshTokens ¶
func (ts *TokenStore) RevokeAllRefreshTokens(ctx context.Context, userID string) error
RevokeAllRefreshTokens revokes all refresh tokens for a user atomically.
func (*TokenStore) RevokeRefreshToken ¶
func (ts *TokenStore) RevokeRefreshToken(ctx context.Context, userID, tokenHash string) error
RevokeRefreshToken removes a refresh token atomically.
func (*TokenStore) RotateRefreshToken ¶
func (ts *TokenStore) RotateRefreshToken(ctx context.Context, userID, oldTokenHash, newTokenHash string, ttl time.Duration) error
RotateRefreshToken atomically revokes the old token and stores the new one.
func (*TokenStore) StoreRefreshToken ¶
func (ts *TokenStore) StoreRefreshToken(ctx context.Context, userID, tokenHash string, ttl time.Duration) error
StoreRefreshToken stores a refresh token hash atomically.
func (*TokenStore) StoreSession ¶
func (ts *TokenStore) StoreSession(ctx context.Context, userID, sessionID string, data map[string]string, ttl time.Duration) error
StoreSession stores a user session atomically. All operations (HSet, Expire, SAdd) are executed in a transaction.
func (*TokenStore) ValidateRefreshToken ¶
func (ts *TokenStore) ValidateRefreshToken(ctx context.Context, userID, tokenHash string) (bool, error)
ValidateRefreshToken checks if a refresh token is valid.
type TokenStorer ¶
type TokenStorer interface {
// JWT Blacklist
BlacklistToken(ctx context.Context, jti string, expiry time.Duration) error
IsBlacklisted(ctx context.Context, jti string) (bool, error)
// Session Management
StoreSession(ctx context.Context, userID, sessionID string, data map[string]string, ttl time.Duration) error
GetSession(ctx context.Context, userID, sessionID string) (map[string]string, error)
DeleteSession(ctx context.Context, userID, sessionID string) error
DeleteAllUserSessions(ctx context.Context, userID string) error
GetUserSessions(ctx context.Context, userID string) ([]string, error)
RefreshSession(ctx context.Context, userID, sessionID string, ttl time.Duration) error
CountActiveSessions(ctx context.Context, userID string) (int64, error)
// Refresh Tokens
StoreRefreshToken(ctx context.Context, userID, tokenHash string, ttl time.Duration) error
ValidateRefreshToken(ctx context.Context, userID, tokenHash string) (bool, error)
RevokeRefreshToken(ctx context.Context, userID, tokenHash string) error
RevokeAllRefreshTokens(ctx context.Context, userID string) error
RotateRefreshToken(ctx context.Context, userID, oldTokenHash, newTokenHash string, ttl time.Duration) error
}
TokenStorer defines the interface for token/session operations. Use this interface in application code for better testability.