dcache

package
v0.10.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2026 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MIN_GOROUTINES = 10000 //nolint:staticcheck
	// TOPIC_REDIS_SET_DEL is the topic name to publish the entry associated with the key should update/delete event.
	TOPIC_REDIS_SET_DEL = "core-distributed-cache-set-del" //nolint:staticcheck
	// TOPIC_REDIS_DONE is the topic name to receive the entry associated with the key was update/delete event,
	// We should update/delete the local cache now.
	TOPIC_REDIS_DONE = "core-distributed-cache-done" //nolint:staticcheck

	GROUP_REDIS_SET_DEL = "core-distributed-cache-set-del" //nolint:staticcheck
	GROUP_REDIS_DONE    = "core-distributed-cache-done"    //nolint:staticcheck
)

Variables

This section is empty.

Functions

func Init

func Init() error

Init initializes the distributed cache system as a state node that manages Redis operations and coordinates cache synchronization across multiple distributed core nodes.

This function serves as the central coordinator for distributed cache operations by:

  • Consuming cache operation events (Set/Delete) from Kafka
  • Executing Redis operations in a consistent, ordered manner
  • Publishing completion events to notify other nodes to update their local caches
  • Maintaining data consistency through timestamp-based ordering and deduplication

Architecture Overview:

The distributed cache system consists of:
1. State Node (this Init function): Manages Redis and coordinates operations
2. Core Nodes: Maintain local secondary caches and send operation requests
3. Kafka: Message broker for event communication between nodes
4. Redis: Centralized cache storage for distributed data

Key Implementation Rules:

  1. Timestamp-based event filtering: Events with timestamps older than the recorded maximum timestamp are discarded to prevent out-of-order operations
  2. Per-key deduplication: Only the latest operation for each key is retained within a batch, ensuring consistency (e.g., if Set(11:14) and Delete(11:10) exist, only Set(11:14) will be executed)
  3. Ordered execution: Operations are sorted by timestamp and executed sequentially to maintain strict ordering of Redis cache operations
  4. Batch processing: Events are processed in batches with maximum timestamp tracking for efficient throughput and consistency guarantees

Error Handling:

  • Uses sync.Once to ensure single initialization
  • Validates Redis client availability before starting
  • Implements comprehensive error logging and metrics collection
  • Gracefully handles Kafka connection issues and message processing failures

Performance Optimizations:

  • Utilizes goroutine pools to control Kafka consumer concurrency
  • Implements batch processing to reduce Redis round trips
  • Uses concurrent maps for thread-safe timestamp tracking per key

func NewDistributedCache

func NewDistributedCache[T any](opts ...DistributedCacheOption[T]) (types.DistributedCache[T], error)

NewDistributedCache 为什么要为每种类型创建一个单独的缓存, 并放在一个并发 map 中? 每个类型的缓存都有自己的 goroutine 来监控 opSetDone, opDelDone 事件, 互不干涉 因为数据类型有限, 所以不会有太多的 goroutine 监听 kafka 事件, 监听者不多, 则效率会更高.

如果不这么做, 每调用一次 NewDistributedCache 就会创建一个 goroutine 监听 kafka 事件. 会导致创建过多的 kafka 消费者, 这完全不是我们想要的. 既然提供了这个函数, 我们没办法完全保证其他开发者不会频繁调用这个函数, 控制权还需要交给自己.

计算:

kafka 在单节点上的消费者数量: 服务进程个数 * DistributedCache个数, 一般都是跑一个服务进程的.
kafka 监听者总数量: 但节点上消费者个数 * 节点个数

func NewLocalCache

func NewLocalCache[T any]() (types.Cache[T], error)

NewLocalCache 创建的缓存不具备分布式的能力, 需要分布式缓存请使用 NewDistributedCache

func NewRedisCache

func NewRedisCache[T any](ctx context.Context, cli redis.UniversalClient, opts ...RedisCacheOption[T]) (types.Cache[T], error)

NewRedisCache creates CacheManager implementation that uses Redis as backend. It is your responsibility to ensure the redis client is valid.

Types

type CacheMetricsProvider

type CacheMetricsProvider interface {
	Metrics() *localMetrics
}

type DistributedCacheOption

type DistributedCacheOption[T any] func(*distributedCache[T]) error

func WithKafkaBrokers

func WithKafkaBrokers[T any](brokers []string) DistributedCacheOption[T]

func WithLocalCache

func WithLocalCache[T any](localCache types.Cache[T]) DistributedCacheOption[T]

func WithLogger

func WithLogger[T any](logger types.Logger) DistributedCacheOption[T]

func WithMaxGoroutines

func WithMaxGoroutines[T any](maxGoRoutines int) DistributedCacheOption[T]

func WithRedisCache

func WithRedisCache[T any](redisCache types.Cache[any]) DistributedCacheOption[T]

func WithTrace

func WithTrace[T any](trace bool) DistributedCacheOption[T]

type RedisCacheOption

type RedisCacheOption[T any] func(*redisCache[T]) error

RedisCacheOption is used to configure RedisCache.

func WithRedisKeyPrefix

func WithRedisKeyPrefix[T any](prefix string) RedisCacheOption[T]

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL