Documentation ¶
Index ¶
- Constants
- func Init() error
- func NewDistributedCache[T any](opts ...DistributedCacheOption[T]) (types.DistributedCache[T], error)
- func NewLocalCache[T any]() (types.Cache[T], error)
- func NewRedisCache[T any](ctx context.Context, cli redis.UniversalClient, opts ...RedisCacheOption[T]) (types.Cache[T], error)
- type CacheMetricsProvider
- type DistributedCacheOption
- func WithKafkaBrokers[T any](brokers []string) DistributedCacheOption[T]
- func WithLocalCache[T any](localCache types.Cache[T]) DistributedCacheOption[T]
- func WithLogger[T any](logger types.Logger) DistributedCacheOption[T]
- func WithMaxGoroutines[T any](maxGoRoutines int) DistributedCacheOption[T]
- func WithRedisCache[T any](redisCache types.Cache[any]) DistributedCacheOption[T]
- func WithTrace[T any](trace bool) DistributedCacheOption[T]
- type RedisCacheOption
- func WithRedisKeyPrefix[T any](prefix string) RedisCacheOption[T]
Constants ¶
const (
	MIN_GOROUTINES = 10000 //nolint:staticcheck

	// TOPIC_REDIS_SET_DEL is the topic to which nodes publish an event when
	// the entry associated with a key should be updated or deleted.
	TOPIC_REDIS_SET_DEL = "core-distributed-cache-set-del" //nolint:staticcheck

	// TOPIC_REDIS_DONE is the topic on which nodes receive an event once the
	// entry associated with a key has been updated or deleted.
	// The local cache should then be updated or deleted accordingly.
	TOPIC_REDIS_DONE = "core-distributed-cache-done" //nolint:staticcheck

	GROUP_REDIS_SET_DEL = "core-distributed-cache-set-del" //nolint:staticcheck
	GROUP_REDIS_DONE    = "core-distributed-cache-done"    //nolint:staticcheck
)
Variables ¶
This section is empty.
Functions ¶
func Init ¶
func Init() error
Init initializes the distributed cache system as a state node that manages Redis operations and coordinates cache synchronization across multiple distributed core nodes.
This function serves as the central coordinator for distributed cache operations by:
- Consuming cache operation events (Set/Delete) from Kafka
- Executing Redis operations in a consistent, ordered manner
- Publishing completion events to notify other nodes to update their local caches
- Maintaining data consistency through timestamp-based ordering and deduplication
Architecture Overview:
The distributed cache system consists of:
1. State Node (this Init function): Manages Redis and coordinates operations
2. Core Nodes: Maintain local secondary caches and send operation requests
3. Kafka: Message broker for event communication between nodes
4. Redis: Centralized cache storage for distributed data
Key Implementation Rules:
- Timestamp-based event filtering: Events with timestamps older than the recorded maximum timestamp are discarded to prevent out-of-order operations
- Per-key deduplication: Only the latest operation for each key is retained within a batch, ensuring consistency (e.g., if a batch contains both Set at 11:14 and Delete at 11:10 for the same key, only the Set at 11:14 is executed)
- Ordered execution: Operations are sorted by timestamp and executed sequentially to maintain strict ordering of Redis cache operations
- Batch processing: Events are processed in batches with maximum timestamp tracking for efficient throughput and consistency guarantees
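The rules above can be illustrated with a minimal sketch. The Event and OpKind types, the PlanBatch function, and the plain map used for timestamp tracking are hypothetical names chosen for this example; the package's actual event representation is not shown on this page. The sketch also assumes that the maximum timestamp is tracked per key and that an event with a timestamp equal to the recorded maximum is still accepted.

```go
package main

import (
	"fmt"
	"sort"
)

// OpKind and Event are hypothetical types used only for this illustration.
type OpKind int

const (
	OpSet OpKind = iota
	OpDelete
)

type Event struct {
	Key       string
	Kind      OpKind
	Timestamp int64 // assumed to be assigned by the sending node
}

// PlanBatch applies three rules to one batch of events:
//  1. drop events older than the maximum timestamp recorded for their key,
//  2. keep only the latest remaining event per key,
//  3. return the survivors sorted by timestamp (ties broken by key).
//
// It records the timestamp of every kept event in maxSeen.
func PlanBatch(batch []Event, maxSeen map[string]int64) []Event {
	latest := make(map[string]Event)
	for _, ev := range batch {
		if ts, ok := maxSeen[ev.Key]; ok && ev.Timestamp < ts {
			continue // stale: an equal or newer operation was already applied
		}
		if cur, ok := latest[ev.Key]; !ok || ev.Timestamp > cur.Timestamp {
			latest[ev.Key] = ev
		}
	}

	plan := make([]Event, 0, len(latest))
	for _, ev := range latest {
		plan = append(plan, ev)
		maxSeen[ev.Key] = ev.Timestamp
	}
	sort.Slice(plan, func(i, j int) bool {
		if plan[i].Timestamp != plan[j].Timestamp {
			return plan[i].Timestamp < plan[j].Timestamp
		}
		return plan[i].Key < plan[j].Key
	})
	return plan
}

func main() {
	maxSeen := map[string]int64{"a": 1112}
	batch := []Event{
		{Key: "user", Kind: OpDelete, Timestamp: 1110},
		{Key: "user", Kind: OpSet, Timestamp: 1114},
		{Key: "a", Kind: OpSet, Timestamp: 1100}, // older than the recorded maximum for "a"
		{Key: "b", Kind: OpSet, Timestamp: 1105},
	}
	for _, ev := range PlanBatch(batch, maxSeen) {
		fmt.Println(ev.Key, ev.Kind, ev.Timestamp)
	}
}
```

In this example the stale event for key "a" and the older Delete for key "user" are dropped, leaving the Set for "b" followed by the Set for "user". A real implementation would need a concurrency-safe map if batches are planned from several goroutines.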
Error Handling:
- Uses sync.Once to ensure single initialization
- Validates Redis client availability before starting
- Implements comprehensive error logging and metrics collection
- Gracefully handles Kafka connection issues and message processing failures
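As a rough illustration of the first two points, the sketch below guards initialization with sync.Once and checks the client before starting any work. The coordinator type, the pinger interface, and fakeClient are hypothetical stand-ins invented for this example; they are not part of the package's API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// pinger is a hypothetical stand-in for the Redis client held by the package.
type pinger interface {
	Ping() error
}

// coordinator is a hypothetical holder for one-time initialization state.
type coordinator struct {
	once   sync.Once
	err    error
	client pinger
	starts int // counts how many times the start-up body actually ran
}

// Init runs the start-up body at most once and returns the result of that
// first attempt on every call.
func (c *coordinator) Init() error {
	c.once.Do(func() {
		if c.client == nil {
			c.err = errors.New("redis client is not available")
			return
		}
		if err := c.client.Ping(); err != nil {
			c.err = fmt.Errorf("redis client check failed: %w", err)
			return
		}
		c.starts++ // a real implementation would start its consumers here
	})
	return c.err
}

// fakeClient is a test double that returns a preset error from Ping.
type fakeClient struct{ err error }

func (f fakeClient) Ping() error { return f.err }

func main() {
	ok := &coordinator{client: fakeClient{}}
	fmt.Println(ok.Init(), ok.Init(), ok.starts)

	missing := &coordinator{}
	fmt.Println(missing.Init())
}
```

One consequence of this pattern is that a failed first attempt is remembered: later calls return the same error without retrying. Whether Init behaves this way is not stated on this page.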
Performance Optimizations:
- Utilizes goroutine pools to control Kafka consumer concurrency
- Implements batch processing to reduce Redis round trips
- Uses concurrent maps for thread-safe timestamp tracking per key
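One common way to bound consumer concurrency is a buffered channel used as a semaphore. The sketch below shows that technique only; it is not a claim about how this package implements its goroutine pool, and processAll is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processAll calls handle for every message with at most limit handlers
// running at the same time, and returns the peak concurrency it observed.
func processAll(msgs []string, limit int, handle func(string)) int64 {
	if limit < 1 {
		limit = 1
	}
	sem := make(chan struct{}, limit) // one slot per allowed handler
	var wg sync.WaitGroup
	var inFlight, peak int64

	for _, m := range msgs {
		sem <- struct{}{} // blocks while limit handlers are already running
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			n := atomic.AddInt64(&inFlight, 1)
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			handle(m)
			atomic.AddInt64(&inFlight, -1)
		}(m)
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	msgs := []string{"m1", "m2", "m3", "m4", "m5"}
	peak := processAll(msgs, 2, func(m string) {
		fmt.Println("handled", m)
	})
	fmt.Println("peak concurrency:", peak)
}
```

The producer loop blocks as soon as the limit is reached, which applies back-pressure to whatever feeds it messages instead of spawning an unbounded number of goroutines.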
func NewDistributedCache ¶
func NewDistributedCache[T any](opts ...DistributedCacheOption[T]) (types.DistributedCache[T], error)
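NewDistributedCache: why create a separate cache for each type and keep them all in a concurrent map? Each type's cache has its own goroutine that watches for opSetDone and opDelDone events, so the caches do not interfere with one another. Because the number of data types is limited, there will not be many goroutines listening for Kafka events, and fewer listeners means higher efficiency.
Without this design, every call to NewDistributedCache would start a goroutine listening for Kafka events and would therefore create too many Kafka consumers, which is not what we want. Because this function is public, we cannot guarantee that other developers will not call it frequently, so control has to remain inside the package.
Calculation:
Number of Kafka consumers on a single node: number of service processes * number of DistributedCache instances (usually a node runs a single service process). Total number of Kafka listeners: number of consumers on a single node * number of nodes.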
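The per-type design described above can be sketched with a sync.Map keyed by reflect.Type. The typedCache type, the cacheFor function, and the listener counter are hypothetical names for this illustration; the sketch only shows how repeated calls for the same type can share one instance and therefore one listener.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
	"sync/atomic"
)

// typedCache is a hypothetical stand-in for a per-type distributed cache.
type typedCache[T any] struct {
	items map[string]T
}

var (
	registry  sync.Map // reflect.Type -> *typedCache[T]
	listeners int64    // how many listener goroutines would have been started
)

// cacheFor returns the single cache instance for type T, creating it on the
// first call. Only the call that stores the instance counts a new listener.
func cacheFor[T any]() *typedCache[T] {
	key := reflect.TypeOf((*T)(nil)).Elem()
	if v, ok := registry.Load(key); ok {
		return v.(*typedCache[T])
	}
	fresh := &typedCache[T]{items: make(map[string]T)}
	actual, loaded := registry.LoadOrStore(key, fresh)
	if !loaded {
		// A real implementation would start its event listener here.
		atomic.AddInt64(&listeners, 1)
	}
	return actual.(*typedCache[T])
}

// listenerCount reports how many distinct types have been registered.
func listenerCount() int64 {
	return atomic.LoadInt64(&listeners)
}

func main() {
	a := cacheFor[int]()
	b := cacheFor[int]()
	c := cacheFor[string]()
	a.items["x"] = 1
	fmt.Println(a == b, len(b.items), len(c.items), listenerCount())
}
```

With this pattern the listener count grows with the number of distinct types rather than with the number of calls. Applying the formula above to hypothetical numbers, 1 service process per node with 5 cache types gives 1 * 5 = 5 consumers per node, and 3 such nodes give 5 * 3 = 15 listeners in total.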
func NewLocalCache ¶
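func NewLocalCache[T any]() (types.Cache[T], error)
NewLocalCache creates a cache that has no distributed capability. If you need a distributed cache, use NewDistributedCache.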
func NewRedisCache ¶
func NewRedisCache[T any](ctx context.Context, cli redis.UniversalClient, opts ...RedisCacheOption[T]) (types.Cache[T], error)
NewRedisCache creates a CacheManager implementation that uses Redis as its backend. The caller is responsible for ensuring that the Redis client is valid.
Types ¶
type CacheMetricsProvider ¶
type CacheMetricsProvider interface {
	Metrics() *localMetrics
}
type DistributedCacheOption ¶
func WithKafkaBrokers ¶
func WithKafkaBrokers[T any](brokers []string) DistributedCacheOption[T]
func WithLocalCache ¶
func WithLocalCache[T any](localCache types.Cache[T]) DistributedCacheOption[T]
func WithLogger ¶
func WithLogger[T any](logger types.Logger) DistributedCacheOption[T]
func WithMaxGoroutines ¶
func WithMaxGoroutines[T any](maxGoRoutines int) DistributedCacheOption[T]
func WithRedisCache ¶
func WithRedisCache[T any](redisCache types.Cache[any]) DistributedCacheOption[T]
func WithTrace ¶
func WithTrace[T any](trace bool) DistributedCacheOption[T]
type RedisCacheOption ¶
RedisCacheOption is used to configure RedisCache.
func WithRedisKeyPrefix ¶
func WithRedisKeyPrefix[T any](prefix string) RedisCacheOption[T]