Documentation
¶
Index ¶
- Constants
- type AutoRefresh
- type Batch
- type CreateBatchesFunc
- type InMemoryAutoRefresh
- func (w *InMemoryAutoRefresh) Delete(key interface{})
- func (w *InMemoryAutoRefresh) DeleteDelayed(id ItemID) error
- func (w *InMemoryAutoRefresh) Get(id ItemID) (Item, error)
- func (w *InMemoryAutoRefresh) GetOrCreate(id ItemID, item Item) (Item, error)
- func (w *InMemoryAutoRefresh) Start(ctx context.Context) error
- func (w *InMemoryAutoRefresh) Update(id ItemID, item Item) (ok bool)
- type Item
- type ItemID
- type ItemSyncResponse
- type ItemWrapper
- type Option
- type Options
- type SyncAction
- type SyncFunc
Examples ¶
Constants ¶
const (
ErrNotFound errors.ErrorCode = "NOT_FOUND"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AutoRefresh ¶
type AutoRefresh interface {
// Starts background refresh of items. To shutdown the cache, cancel the context.
Start(ctx context.Context) error
// Get item by id.
Get(id ItemID) (Item, error)
// Get object if exists else create it.
GetOrCreate(id ItemID, item Item) (Item, error)
// DeleteDelayed queues an item for deletion. It Will get deleted as part of the next Sync cycle. Until the next sync
// cycle runs, Get and GetOrCreate will continue to return the Item in its previous state.
DeleteDelayed(id ItemID) error
}
AutoRefresh with regular GetOrCreate and Delete along with background asynchronous refresh. Caller provides callbacks for create, refresh and delete item. The cache doesn't provide apis to update items.
func NewAutoRefreshBatchedCache ¶
func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration, parallelizm, size uint, scope promutils.Scope) (AutoRefresh, error)
Instantiates a new AutoRefresh Cache that syncs items in batches.
func NewAutoRefreshCache ¶
func NewAutoRefreshCache(name string, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration, parallelizm, size uint, scope promutils.Scope) (AutoRefresh, error)
Instantiates a new AutoRefresh Cache that syncs items periodically.
Example ¶
package main
import (
"context"
"fmt"
"sync"
"time"
"k8s.io/client-go/util/workqueue"
"github.com/flyteorg/flyte/flytestdlib/errors"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)
type ExampleItemStatus string
const (
ExampleStatusNotStarted ExampleItemStatus = "Not-enqueueLoopRunning"
ExampleStatusStarted ExampleItemStatus = "Started"
ExampleStatusSucceeded ExampleItemStatus = "Completed"
)
type ExampleCacheItem struct {
status ExampleItemStatus
id string
}
func (e *ExampleCacheItem) IsTerminal() bool {
return e.status == ExampleStatusSucceeded
}
func (e *ExampleCacheItem) ID() string {
return e.id
}
type ExampleService struct {
jobStatus map[string]ExampleItemStatus
lock sync.RWMutex
}
func newExampleService() *ExampleService {
return &ExampleService{
jobStatus: make(map[string]ExampleItemStatus),
lock: sync.RWMutex{},
}
}
// advance the status to next, and return
func (f *ExampleService) getStatus(id string) *ExampleCacheItem {
f.lock.Lock()
defer f.lock.Unlock()
if _, ok := f.jobStatus[id]; !ok {
f.jobStatus[id] = ExampleStatusStarted
}
f.jobStatus[id] = ExampleStatusSucceeded
return &ExampleCacheItem{f.jobStatus[id], id}
}
func main() {
// This auto-refresh cache can be used for cases where keys are created by caller but processed by
// an external service and we want to asynchronously keep track of its progress.
exampleService := newExampleService()
// define a sync method that the cache can use to auto-refresh in background
syncItemCb := func(ctx context.Context, batch []ItemWrapper) ([]ItemSyncResponse, error) {
updatedItems := make([]ItemSyncResponse, 0, len(batch))
for _, obj := range batch {
oldItem := obj.GetItem().(*ExampleCacheItem)
newItem := exampleService.getStatus(oldItem.ID())
if newItem.status != oldItem.status {
updatedItems = append(updatedItems, ItemSyncResponse{
ID: oldItem.ID(),
Item: newItem,
Action: Update,
})
}
}
return updatedItems, nil
}
// define resync period as time duration we want cache to refresh. We can go as low as we want but cache
// would still be constrained by time it takes to run Sync call for each item.
resyncPeriod := time.Millisecond
// Since number of items in the cache is dynamic, rate limiter is our knob to control resources we spend on
// sync.
rateLimiter := workqueue.DefaultControllerRateLimiter()
// since cache refreshes itself asynchronously, it may not notice that an object has been deleted immediately,
// so users of the cache should have the delete logic aware of this shortcoming (eg. not-exists may be a valid
// error during removal if based on status in cache).
cache, err := NewAutoRefreshCache("my-cache", syncItemCb, rateLimiter, resyncPeriod, 10, 100, promutils.NewTestScope())
if err != nil {
panic(err)
}
// start the cache with a context that would be to stop the cache by cancelling the context
ctx, cancel := context.WithCancel(context.Background())
err = cache.Start(ctx)
if err != nil {
panic(err)
}
// creating objects that go through a couple of state transitions to reach the final state.
item1 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item1"}
item2 := &ExampleCacheItem{status: ExampleStatusNotStarted, id: "item2"}
_, err1 := cache.GetOrCreate(item1.id, item1)
_, err2 := cache.GetOrCreate(item2.id, item2)
if err1 != nil || err2 != nil {
fmt.Printf("unexpected error in create; err1: %v, err2: %v", err1, err2)
}
// wait for the cache to go through a few refresh cycles and then check status
time.Sleep(resyncPeriod * 10)
item, err := cache.Get(item1.ID())
if err != nil && errors.IsCausedBy(err, ErrNotFound) {
fmt.Printf("Item1 is no longer in the cache")
} else {
fmt.Printf("Current status for item1 is %v", item.(*ExampleCacheItem).status)
}
// stop the cache
cancel()
}
Output: Current status for item1 is Completed
type Batch ¶
type Batch = []ItemWrapper
func SingleItemBatches ¶
func SingleItemBatches(_ context.Context, snapshot []ItemWrapper) (batches []Batch, err error)
SingleItemBatches is a function that creates n batches of items, each with size 1
type CreateBatchesFunc ¶
type CreateBatchesFunc func(ctx context.Context, snapshot []ItemWrapper) (batches []Batch, err error)
CreateBatchesFunc is a func type. Your implementation of this function for your cache instance is responsible for subdividing the list of cache items into batches.
type InMemoryAutoRefresh ¶ added in v1.15.0
type InMemoryAutoRefresh struct {
// contains filtered or unexported fields
}
InMemoryAutoRefresh is an in-memory implementation of the AutoRefresh interface. It is a thread-safe general purpose auto-refresh cache that watches for updates asynchronously for the keys after they are added to the cache. An item can be inserted only once.
Get reads from sync.map while refresh is invoked on a snapshot of keys. Cache eventually catches up on deleted items.
Sync is run as a fixed-interval-scheduled-task, and is skipped if sync from previous cycle is still running.
func NewInMemoryAutoRefresh ¶ added in v1.15.0
func NewInMemoryAutoRefresh( name string, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration, parallelizm uint, size uint, scope promutils.Scope, options ...Option, ) (*InMemoryAutoRefresh, error)
NewInMemoryAutoRefresh creates a new InMemoryAutoRefresh
func (*InMemoryAutoRefresh) Delete ¶ added in v1.15.0
func (w *InMemoryAutoRefresh) Delete(key interface{})
Delete deletes the item from the cache if it exists.
func (*InMemoryAutoRefresh) DeleteDelayed ¶ added in v1.15.0
func (w *InMemoryAutoRefresh) DeleteDelayed(id ItemID) error
DeleteDelayed queues an item for deletion. It Will get deleted as part of the next Sync cycle. Until the next sync cycle runs, Get and GetOrCreate will continue to return the Item in its previous state.
func (*InMemoryAutoRefresh) Get ¶ added in v1.15.0
func (w *InMemoryAutoRefresh) Get(id ItemID) (Item, error)
func (*InMemoryAutoRefresh) GetOrCreate ¶ added in v1.15.0
func (w *InMemoryAutoRefresh) GetOrCreate(id ItemID, item Item) (Item, error)
Return the item if exists else create it. Create should be invoked only once. recreating the object is not supported.
type ItemSyncResponse ¶
type ItemSyncResponse struct {
ID ItemID
Item Item
Action SyncAction
}
Represents the response for the sync func
type ItemWrapper ¶
Items are wrapped inside an ItemWrapper to be stored in the cache.
type Option ¶ added in v1.15.0
type Option func(*Options)
Option for the KeyfuncProvider
func WithClock ¶ added in v1.15.0
func WithClock(clock clock.WithTicker) Option
WithClock configures the clock to use for time related operations. Mainly used for unit testing.
func WithCreateBatchesFunc ¶ added in v1.15.0
func WithCreateBatchesFunc(createBatchesCb CreateBatchesFunc) Option
WithCreateBatchesFunc configures how cache items should be batched for refresh. Defaults to single item batching.
func WithSyncOnCreate ¶ added in v1.15.0
WithSyncOnCreate configures whether the cache will attempt to sync items upon creation or wait until the next sync interval. Disabling this can be useful when the cache is under high load and synchronization both frequently and in large batches. Defaults to true.
type Options ¶ added in v1.15.0
type Options struct {
// contains filtered or unexported fields
}
Options are configurable options for the InMemoryAutoRefresh.
type SyncAction ¶
type SyncAction int
Possible actions for the cache to take as a result of running the sync function on any given cache item
const ( Unchanged SyncAction = iota // The item returned has been updated and should be updated in the cache Update )
type SyncFunc ¶
type SyncFunc func(ctx context.Context, batch Batch) ( updatedBatch []ItemSyncResponse, err error)
SyncFunc func type. Your implementation of this function for your cache instance is responsible for returning The new Item and what action should be taken. The sync function has no insight into your object, and needs to be told explicitly if the new item is different from the old one.