cache

package
v2.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2026 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Examples

Constants

View Source
const (
	// ErrNotFound is the error code carried by errors returned when a
	// requested item is not present in the cache; callers can detect it
	// with errors.IsCausedBy(err, ErrNotFound).
	ErrNotFound errors.ErrorCode = "NOT_FOUND"
)

Variables

This section is empty.

Functions

func MustRegisterSubsection added in v2.0.1

func MustRegisterSubsection(name string, cfg config.Config) config.Section

func New added in v2.0.1

func New[T any](name string, cacheType Type, f Factory, loadFunc cache.LoadFunction[T], scope promutils.Scope) (cache.CacheInterface[T], error)

New creates a new cache with the given name and load function.

func NewRedisClient added in v2.0.1

func NewRedisClient(ctx context.Context, cfg RedisOptions, secretManager SecretManager, scope promutils.Scope) (*redis.Client, error)

NewRedisClient initializes a new redis client with the given options.

Types

type AutoRefresh

// AutoRefresh is a cache that, in addition to the usual GetOrCreate and
// delayed-delete operations, refreshes its items asynchronously in the
// background using caller-supplied callbacks. It exposes no direct API for
// updating items — updates flow through the sync callback.
type AutoRefresh interface {
	// Start begins the background refresh of items. To shut the cache down,
	// cancel the context.
	Start(ctx context.Context) error

	// Get returns the item with the given id, or an error (caused by
	// ErrNotFound when the item is absent).
	Get(id ItemID) (Item, error)

	// GetOrCreate returns the existing item for id if present; otherwise it
	// stores and returns the supplied item.
	GetOrCreate(id ItemID, item Item) (Item, error)

	// DeleteDelayed queues an item for deletion. It will get deleted as part
	// of the next Sync cycle. Until the next sync cycle runs, Get and
	// GetOrCreate will continue to return the Item in its previous state.
	DeleteDelayed(id ItemID) error
}

AutoRefresh provides regular GetOrCreate and Delete operations along with background asynchronous refresh. The caller provides callbacks for creating, refreshing and deleting items. The cache doesn't provide APIs to update items.

func NewAutoRefreshBatchedCache

func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter,
	resyncPeriod time.Duration, parallelizm, size uint, scope promutils.Scope) (AutoRefresh, error)

Instantiates a new AutoRefresh Cache that syncs items in batches.

func NewAutoRefreshCache

func NewAutoRefreshCache(name string, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration,
	parallelizm, size uint, scope promutils.Scope) (AutoRefresh, error)

Instantiates a new AutoRefresh Cache that syncs items periodically.

Example
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"k8s.io/client-go/util/workqueue"

	"github.com/flyteorg/flyte/v2/flytestdlib/errors"
	"github.com/flyteorg/flyte/v2/flytestdlib/promutils"
)

// ExampleItemStatus is the lifecycle state of a job tracked by the example
// cache.
type ExampleItemStatus string

const (
	// NOTE(review): the literal "Not-enqueueLoopRunning" looks like the
	// residue of an over-eager find-and-replace (presumably it was meant to
	// read "Not-Started"). It is harmless here because the value never
	// appears in the example's output — confirm before changing.
	ExampleStatusNotStarted ExampleItemStatus = "Not-enqueueLoopRunning"
	ExampleStatusStarted    ExampleItemStatus = "Started"
	ExampleStatusSucceeded  ExampleItemStatus = "Completed"
)

// ExampleCacheItem is the value stored in the auto-refresh cache: a job id
// together with its last observed status.
type ExampleCacheItem struct {
	status ExampleItemStatus
	id     string
}

// IsTerminal reports whether the item has reached its final state, i.e. it
// no longer needs background refreshing.
func (e *ExampleCacheItem) IsTerminal() bool {
	done := e.status == ExampleStatusSucceeded
	return done
}

// ID returns the unique identifier of this cache item.
func (e *ExampleCacheItem) ID() string {
	identifier := e.id
	return identifier
}

// ExampleService simulates an external service that tracks job statuses.
// jobStatus is guarded by lock so the cache's sync callback may call
// getStatus concurrently.
type ExampleService struct {
	jobStatus map[string]ExampleItemStatus
	lock      sync.RWMutex
}

// newExampleService constructs an ExampleService with an empty job-status
// table. The zero value of sync.RWMutex is ready to use, so only the map
// needs explicit initialization.
func newExampleService() *ExampleService {
	svc := &ExampleService{}
	svc.jobStatus = map[string]ExampleItemStatus{}
	return svc
}

// getStatus marks the job as succeeded and returns a cache item reflecting
// the new status. It is safe for concurrent use; the lock guards jobStatus.
//
// The original version first wrote ExampleStatusStarted for unseen ids and
// then unconditionally overwrote it with ExampleStatusSucceeded on the next
// line, making the first write a dead store (and contradicting its own
// "advance the status to next" comment). The dead branch is removed here;
// observable behavior is unchanged — every call returns an item in the
// Succeeded state.
func (f *ExampleService) getStatus(id string) *ExampleCacheItem {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.jobStatus[id] = ExampleStatusSucceeded
	return &ExampleCacheItem{status: f.jobStatus[id], id: id}
}

func main() {
	// This auto-refresh cache can be used for cases where keys are created by caller but processed by
	// an external service and we want to asynchronously keep track of its progress.
	exampleService := newExampleService()

	// define a sync method that the cache can use to auto-refresh in background.
	// Only items whose status actually changed are reported back with an Update
	// action; unchanged items are skipped.
	syncItemCb := func(ctx context.Context, batch []ItemWrapper) ([]ItemSyncResponse, error) {
		updatedItems := make([]ItemSyncResponse, 0, len(batch))
		for _, obj := range batch {
			oldItem := obj.GetItem().(*ExampleCacheItem)
			newItem := exampleService.getStatus(oldItem.ID())
			if newItem.status != oldItem.status {
				updatedItems = append(updatedItems, ItemSyncResponse{
					ID:     oldItem.ID(),
					Item:   newItem,
					Action: Update,
				})
			}
		}

		return updatedItems, nil
	}

	// define resync period as time duration we want cache to refresh. We can go as low as we want but cache
	// would still be constrained by time it takes to run Sync call for each item.
	resyncPeriod := time.Millisecond

	// Since number of items in the cache is dynamic, rate limiter is our knob to control resources we spend on
	// sync.
	rateLimiter := workqueue.DefaultControllerRateLimiter()

	// since cache refreshes itself asynchronously, it may not notice that an object has been deleted immediately,
	// so users of the cache should have the delete logic aware of this shortcoming (eg. not-exists may be a valid
	// error during removal if based on status in cache).
	cache, err := NewAutoRefreshCache("my-cache", syncItemCb, rateLimiter, resyncPeriod, 10, 100, promutils.NewTestScope())
	if err != nil {
		panic(err)
	}

	// start the cache with a cancellable context; cancelling the context (below)
	// is what stops the cache's background refresh.
	ctx, cancel := context.WithCancel(context.Background())
	err = cache.Start(ctx)
	if err != nil {
		panic(err)
	}

	// creating objects that go through a couple of state transitions to reach the final state.
	item1 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item1"}
	item2 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item2"}
	_, err1 := cache.GetOrCreate(item1.id, item1)
	_, err2 := cache.GetOrCreate(item2.id, item2)
	if err1 != nil || err2 != nil {
		fmt.Printf("unexpected error in create; err1: %v, err2: %v", err1, err2)
	}

	// wait for the cache to go through a few refresh cycles and then check status.
	// NOTE(review): sleeping 10x the resync period is a timing heuristic, not a
	// guarantee that a sync completed — could be flaky on a heavily loaded machine.
	time.Sleep(resyncPeriod * 10)
	item, err := cache.Get(item1.ID())
	if err != nil && errors.IsCausedBy(err, ErrNotFound) {
		fmt.Printf("Item1 is no longer in the cache")
	} else {
		fmt.Printf("Current status for item1 is %v", item.(*ExampleCacheItem).status)
	}

	// stop the cache by cancelling its context
	cancel()

}
Output:

Current status for item1 is Completed

type Batch

type Batch = []ItemWrapper

func SingleItemBatches

func SingleItemBatches(_ context.Context, snapshot []ItemWrapper) (batches []Batch, err error)

SingleItemBatches is a function that creates n batches of items, each with size 1

type Config added in v2.0.1

type Config struct {
	// Type of cache to use
	Type Type `json:"type" pflag:",type, Type of cache to use"`

	// Config for in-memory cache
	InMemoryFixedSize InMemoryFixedSizeConfig `json:"inMemoryFixedSize" pflag:"-,Config for in-memory cache"`

	// Config for Redis cache
	Redis RedisConfig `json:"redis" pflag:"-,Config for Redis cache"`
}

func GetConfig added in v2.0.1

func GetConfig() *Config

type CreateBatchesFunc

type CreateBatchesFunc func(ctx context.Context, snapshot []ItemWrapper) (batches []Batch, err error)

CreateBatchesFunc is a func type. Your implementation of this function for your cache instance is responsible for subdividing the list of cache items into batches.

type Factory added in v2.0.1

type Factory struct {
	// contains filtered or unexported fields
}

func NewFactory added in v2.0.1

func NewFactory(ctx context.Context, c *Config, secretManager SecretManager, scope promutils.Scope) (Factory, error)

NewFactory initializes a new cache factory that should be used whenever typed caches are created.

func (Factory) Type added in v2.0.1

func (f Factory) Type() Type

type InMemoryAutoRefresh

type InMemoryAutoRefresh struct {
	// contains filtered or unexported fields
}

InMemoryAutoRefresh is an in-memory implementation of the AutoRefresh interface. It is a thread-safe general purpose auto-refresh cache that watches for updates asynchronously for the keys after they are added to the cache. An item can be inserted only once.

Get reads from sync.map while refresh is invoked on a snapshot of keys. Cache eventually catches up on deleted items.

Sync is run as a fixed-interval-scheduled-task, and is skipped if sync from previous cycle is still running.

func NewInMemoryAutoRefresh

func NewInMemoryAutoRefresh(
	name string,
	syncCb SyncFunc,
	syncRateLimiter workqueue.RateLimiter,
	resyncPeriod time.Duration,
	parallelizm uint,
	size uint,
	scope promutils.Scope,
	options ...Option,
) (*InMemoryAutoRefresh, error)

NewInMemoryAutoRefresh creates a new InMemoryAutoRefresh

func (*InMemoryAutoRefresh) Delete

func (w *InMemoryAutoRefresh) Delete(key interface{})

Delete deletes the item from the cache if it exists.

func (*InMemoryAutoRefresh) DeleteDelayed

func (w *InMemoryAutoRefresh) DeleteDelayed(id ItemID) error

DeleteDelayed queues an item for deletion. It will get deleted as part of the next Sync cycle. Until the next sync cycle runs, Get and GetOrCreate will continue to return the Item in its previous state.

func (*InMemoryAutoRefresh) Get

func (w *InMemoryAutoRefresh) Get(id ItemID) (Item, error)

func (*InMemoryAutoRefresh) GetOrCreate

func (w *InMemoryAutoRefresh) GetOrCreate(id ItemID, item Item) (Item, error)

Returns the item if it exists, else creates it. Create should be invoked only once; recreating the object is not supported.

func (*InMemoryAutoRefresh) Start

func (w *InMemoryAutoRefresh) Start(ctx context.Context) error

func (*InMemoryAutoRefresh) Update

func (w *InMemoryAutoRefresh) Update(id ItemID, item Item) (ok bool)

Update updates the item only if it exists in the cache, return true if we updated the item.

type InMemoryFixedSizeConfig added in v2.0.1

type InMemoryFixedSizeConfig struct {
	Size              *resource.Quantity `` /* 150-byte string literal not displayed */
	DefaultExpiration config.Duration    `json:"defaultExpiration" pflag:",Default expiration time for items"`
}

InMemoryFixedSizeConfig is a copy of ristretto.Config that can be used in config files (removed func references)

type Item

type Item interface {
	IsTerminal() bool
}

type ItemID

type ItemID = string

type ItemSyncResponse

type ItemSyncResponse struct {
	ID     ItemID
	Item   Item
	Action SyncAction
}

Represents the response for the sync func

type ItemWrapper

type ItemWrapper interface {
	GetID() ItemID
	GetItem() Item
}

Items are wrapped inside an ItemWrapper to be stored in the cache.

type Marshaler added in v2.0.1

type Marshaler struct {
	cache.CacheInterface[any]
	// contains filtered or unexported fields
}

Marshaler is the struct that marshal and unmarshal cache values

func NewMsgPackMarshaler added in v2.0.1

func NewMsgPackMarshaler(cache cache.CacheInterface[any]) *Marshaler

NewMsgPackMarshaler creates a new marshaler that marshals/unmarshals cache values using MsgPack.

func NewProtoMarshaler added in v2.0.1

func NewProtoMarshaler(cache cache.CacheInterface[any]) *Marshaler

NewProtoMarshaler creates a new marshaler that marshals/unmarshals cache values using proto.Marshal.

func (*Marshaler) Get added in v2.0.1

func (c *Marshaler) Get(ctx context.Context, key any, returnObj any) (any, error)

Get obtains a value from cache and unmarshal value with given object

func (*Marshaler) Set added in v2.0.1

func (c *Marshaler) Set(ctx context.Context, key, object any, options ...store.Option) (err error)

Set sets a value in cache by marshaling value

type NamespacedCache added in v2.0.1

type NamespacedCache[T any] struct {
	cache.CacheInterface[T]
	// contains filtered or unexported fields
}

NamespacedCache is a wrapper around a cache that adds a namespace to all keys

func NewNamespacedCache added in v2.0.1

func NewNamespacedCache[T any](namespace string, underlying cache.CacheInterface[T]) NamespacedCache[T]

NewNamespacedCache creates a new namespaced cache that prefixes any key with the given namespace

func (NamespacedCache[T]) Delete added in v2.0.1

func (n NamespacedCache[T]) Delete(ctx context.Context, key any) error

func (NamespacedCache[T]) Get added in v2.0.1

func (n NamespacedCache[T]) Get(ctx context.Context, key any) (T, error)

func (NamespacedCache[T]) GetType added in v2.0.1

func (n NamespacedCache[T]) GetType() string

func (NamespacedCache[T]) Set added in v2.0.1

func (n NamespacedCache[T]) Set(ctx context.Context, key any, object T, options ...store.Option) error

type Option

type Option func(*Options)

Option configures Options for the InMemoryAutoRefresh.

func WithClock

func WithClock(clock clock.WithTicker) Option

WithClock configures the clock to use for time related operations. Mainly used for unit testing.

func WithCreateBatchesFunc

func WithCreateBatchesFunc(createBatchesCb CreateBatchesFunc) Option

WithCreateBatchesFunc configures how cache items should be batched for refresh. Defaults to single item batching.

func WithSyncOnCreate

func WithSyncOnCreate(syncOnCreate bool) Option

WithSyncOnCreate configures whether the cache will attempt to sync items upon creation or wait until the next sync interval. Disabling this can be useful when the cache is under high load and would otherwise synchronize both frequently and in large batches. Defaults to true.

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options are configurable options for the InMemoryAutoRefresh.

type RedisConfig added in v2.0.1

type RedisConfig struct {
	Options           RedisOptions    `json:"options" pflag:"-,Redis options."`
	DefaultExpiration config.Duration `json:"defaultExpiration" pflag:",Default expiration time for items."`
}

type RedisInstrumentationHook added in v2.0.1

type RedisInstrumentationHook struct {
	// contains filtered or unexported fields
}

A hook that adds metrics around redis operations.

Usage:

redisClient := redis.NewClient(...)
redisClient.AddHook(NewRedisInstrumentationHook(scope.NewSubScope("redis_client")))

func NewRedisInstrumentationHook added in v2.0.1

func NewRedisInstrumentationHook(scope promutils.Scope) *RedisInstrumentationHook

func (RedisInstrumentationHook) DialHook added in v2.0.1

func (RedisInstrumentationHook) ProcessHook added in v2.0.1

func (RedisInstrumentationHook) ProcessPipelineHook added in v2.0.1

type RedisOptions added in v2.0.1

// RedisOptions is a copy of redis.Options usable in config files (the
// func-typed fields of the original are removed).
type RedisOptions struct {
	// The network type, either tcp or unix.
	// Default is tcp.
	Network string
	// host:port address.
	Addr string

	// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
	ClientName string

	// Protocol 2 or 3. Use the version to negotiate RESP version with redis-server.
	// Default is 3.
	Protocol int
	// Use the specified Username to authenticate the current connection
	// with one of the connections defined in the ACL list when connecting
	// to a Redis 6.0 instance, or greater, that is using the Redis ACL system.
	Username string
	// Optional password. Must match the password specified in the
	// requirepass server configuration option (if connecting to a Redis 5.0 instance, or lower),
	// or the User Password when connecting to a Redis 6.0 instance, or greater,
	// that is using the Redis ACL system.
	Password string

	// PasswordSecretName is the name of the secret that contains the password.
	PasswordSecretName string

	// Database to be selected after connecting to the server.
	DB int

	// Maximum number of retries before giving up.
	// Default is 3 retries; -1 (not 0) disables retries.
	MaxRetries int
	// Minimum backoff between each retry.
	// Default is 8 milliseconds; -1 disables backoff.
	MinRetryBackoff config.Duration
	// Maximum backoff between each retry.
	// Default is 512 milliseconds; -1 disables backoff.
	MaxRetryBackoff config.Duration

	// Dial timeout for establishing new connections.
	// Default is 5 seconds.
	DialTimeout config.Duration
	// Timeout for socket reads. If reached, commands will fail
	// with a timeout instead of blocking. Supported values:
	//   - `0` - default timeout (3 seconds).
	//   - `-1` - no timeout (block indefinitely).
	//   - `-2` - disables SetReadDeadline calls completely.
	ReadTimeout config.Duration
	// Timeout for socket writes. If reached, commands will fail
	// with a timeout instead of blocking.  Supported values:
	//   - `0` - default timeout (3 seconds).
	//   - `-1` - no timeout (block indefinitely).
	//   - `-2` - disables SetWriteDeadline calls completely.
	WriteTimeout config.Duration
	// ContextTimeoutEnabled controls whether the client respects context timeouts and deadlines.
	// See https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts
	ContextTimeoutEnabled bool

	// Type of connection pool.
	// true for FIFO pool, false for LIFO pool.
	// Note that FIFO has slightly higher overhead compared to LIFO,
	// but it helps closing idle connections faster reducing the pool size.
	PoolFIFO bool
	// Base number of socket connections.
	// Default is 10 connections per every available CPU as reported by runtime.GOMAXPROCS.
	// If there is not enough connections in the pool, new connections will be allocated in excess of PoolSize,
	// you can limit it through MaxActiveConns
	PoolSize int
	// Amount of time client waits for connection if all connections
	// are busy before returning an error.
	// Default is ReadTimeout + 1 second.
	PoolTimeout config.Duration
	// Minimum number of idle connections which is useful when establishing
	// new connection is slow.
	// Default is 0. the idle connections are not closed by default.
	MinIdleConns int
	// Maximum number of idle connections.
	// Default is 0. the idle connections are not closed by default.
	MaxIdleConns int
	// Maximum number of connections allocated by the pool at a given time.
	// When zero, there is no limit on the number of connections in the pool.
	MaxActiveConns int
	// ConnMaxIdleTime is the maximum amount of time a connection may be idle.
	// Should be less than server's timeout.
	//
	// Expired connections may be closed lazily before reuse.
	// If d <= 0, connections are not closed due to a connection's idle time.
	//
	// Default is 30 minutes. -1 disables idle timeout check.
	ConnMaxIdleTime config.Duration
	// ConnMaxLifetime is the maximum amount of time a connection may be reused.
	//
	// Expired connections may be closed lazily before reuse.
	// If <= 0, connections are not closed due to a connection's age.
	//
	// Default is to not close idle connections.
	ConnMaxLifetime config.Duration

	// TLS Config to use. When set, TLS will be negotiated.
	TLSConfig *tls.Config

	// Disable set-lib on connect. Default is false.
	// NOTE(review): the field name misspells "Identity", but it mirrors
	// go-redis's own DisableIndentity option — presumably kept for
	// config-mapping compatibility; confirm before renaming.
	DisableIndentity bool
}

RedisOptions is a copy of redis.Options that can be used in config files (removed func references)

func (*RedisOptions) GetOptions added in v2.0.1

func (r *RedisOptions) GetOptions(ctx context.Context, secretManager SecretManager) (*redis.Options, error)

type SecretManager added in v2.0.1

type SecretManager interface {
	Get(ctx context.Context, key string) (string, error)
}

type StringCache added in v2.0.1

type StringCache = cache.CacheInterface[string]

type SyncAction

type SyncAction int

Possible actions for the cache to take as a result of running the sync function on any given cache item

const (
	// Unchanged indicates the item did not change during sync; the cache
	// needs no update for it.
	Unchanged SyncAction = iota

	// Update indicates the item returned has been updated and should be
	// updated in the cache.
	Update
)

type SyncFunc

type SyncFunc func(ctx context.Context, batch Batch) (
	updatedBatch []ItemSyncResponse, err error)

SyncFunc func type. Your implementation of this function for your cache instance is responsible for returning The new Item and what action should be taken. The sync function has no insight into your object, and needs to be told explicitly if the new item is different from the old one.

type Type added in v2.0.1

type Type uint8
const (
	TypeInMemoryFixedSize Type = iota
	TypeRedis
)

func TypeString added in v2.0.1

func TypeString(s string) (Type, error)

TypeString retrieves an enum value from the enum constants string name. Throws an error if the param is not part of the enum.

func TypeValues added in v2.0.1

func TypeValues() []Type

TypeValues returns all values of the enum

func (Type) IsAType added in v2.0.1

func (i Type) IsAType() bool

IsAType returns "true" if the value is listed in the enum definition. "false" otherwise

func (Type) MarshalJSON added in v2.0.1

func (i Type) MarshalJSON() ([]byte, error)

MarshalJSON implements the json.Marshaler interface for Type

func (Type) MarshalYAML added in v2.0.1

func (i Type) MarshalYAML() (interface{}, error)

MarshalYAML implements a YAML Marshaler for Type

func (Type) String added in v2.0.1

func (i Type) String() string

func (*Type) UnmarshalJSON added in v2.0.1

func (i *Type) UnmarshalJSON(data []byte) error

UnmarshalJSON implements the json.Unmarshaler interface for Type

func (*Type) UnmarshalYAML added in v2.0.1

func (i *Type) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML implements a YAML Unmarshaler for Type

type TypedMarshaler added in v2.0.1

type TypedMarshaler[T any] struct {
	*Marshaler
	// contains filtered or unexported fields
}

TypedMarshaler is the struct that handles typed marshaling and unmarshaling

func NewTypedMarshaler added in v2.0.1

func NewTypedMarshaler[T any](marshaler *Marshaler) *TypedMarshaler[T]

NewTypedMarshaler creates a new typed marshaler. It takes a marshaler and a type T. It returns a typed marshaler that can be used to marshal and unmarshal values of type T. If T is a pointer type, it will unmarshal into a new instance of T. Otherwise, it will unmarshal into a new instance of *T.

func (*TypedMarshaler[T]) Get added in v2.0.1

func (t *TypedMarshaler[T]) Get(ctx context.Context, key any) (T, error)

func (*TypedMarshaler[T]) Set added in v2.0.1

func (t *TypedMarshaler[T]) Set(ctx context.Context, key any, object T, options ...store.Option) error

type UInt64Cache added in v2.0.1

type UInt64Cache = cache.CacheInterface[uint64]

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL