sturdyc

package module
v1.0.8

This package is not in the latest version of its module.

Published: Dec 21, 2024 License: MIT Imports: 14 Imported by: 21



sturdyc: a caching library for building sturdy systems


Sturdyc is an in-memory cache that supports non-blocking reads and has a configurable number of shards that makes it possible to achieve writes without any lock contention. The xxhash algorithm is used for efficient key distribution.

It has all the functionality you would expect from a caching library, but what sets it apart are the features designed to make I/O heavy applications both robust and highly performant.

Installing

go get github.com/viccon/sturdyc

At a glance

Deduplication

sturdyc performs in-flight tracking for every key. This also works for batch operations, where it can deduplicate a batch of cache misses and then assemble the response by picking records from multiple in-flight requests.

Early refreshes

There is also a lot of extra functionality you can enable. One example is early refreshes, which instruct the cache to refresh the keys that are in active rotation, thereby preventing them from ever expiring. This can have a huge impact on an application's latency, as you're able to continuously serve the most frequently used data from memory:

sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, exponentialBackOff)
Batching

When the cache retrieves data from a batchable source, it will disassemble the response and then cache each record individually based on the permutations of the options with which it was fetched.

This can be used to significantly reduce the application's outgoing requests by also enabling refresh coalescing. Internally, sturdyc creates a buffer for each unique option set and gathers IDs until the idealBatchSize is reached or the batchBufferTimeout expires:

sturdyc.WithRefreshCoalescing(idealBatchSize, batchBufferTimeout)
Distributed key-value store

You can also configure sturdyc to synchronize its in-memory cache with a distributed key-value store of your choosing:

sturdyc.WithDistributedStorage(storage),
Evictions

The cache runs a background job which continuously evicts expired records from each shard. However, there are options to both tweak the interval and disable the functionality altogether. This can give you a slight performance boost in situations where you're unlikely to exceed any memory limits.

When the cache reaches its capacity, a fallback eviction is triggered. This process performs evictions on a per-shard basis, selecting records for removal based on recency. The eviction algorithm uses quickselect, which runs in O(N) time on average and doesn't require reads to take write locks in order to update a recency list.
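Selecting the least recently used records with quickselect can be sketched like this. The sketch is an assumption about the general technique rather than sturdyc's code: a shard is modeled as a plain map from key to last-access timestamp, and evictOldest (a hypothetical name) removes a given percentage of its entries. In this model, reads only have to record when a key was accessed; the ordering work happens once, at eviction time.

```go
package main

import (
	"fmt"
	"math/rand"
)

// entry is a cached key together with the time it was last accessed
// (a plain int64 here, e.g. a UnixNano timestamp).
type entry struct {
	key        string
	lastAccess int64
}

// partition moves the pivot to its final sorted position within s[lo:hi+1]
// and returns that position. Smaller timestamps end up to its left.
func partition(s []entry, lo, hi, pivotIdx int) int {
	s[pivotIdx], s[hi] = s[hi], s[pivotIdx]
	pivot := s[hi].lastAccess
	store := lo
	for i := lo; i < hi; i++ {
		if s[i].lastAccess < pivot {
			s[i], s[store] = s[store], s[i]
			store++
		}
	}
	s[store], s[hi] = s[hi], s[store]
	return store
}

// quickselect reorders s so that the k least recently used entries end up
// in s[:k]. It runs in O(N) on average; random pivots make the O(N^2)
// worst case unlikely.
func quickselect(s []entry, k int) {
	if k <= 0 || k >= len(s) {
		return // s[:k] is trivially correct
	}
	lo, hi := 0, len(s)-1
	for lo < hi {
		p := partition(s, lo, hi, lo+rand.Intn(hi-lo+1))
		switch {
		case p == k:
			return
		case p < k:
			lo = p + 1
		default:
			hi = p - 1
		}
	}
}

// evictOldest removes percentage% of a shard's entries, picking the least
// recently used ones, and returns the evicted keys.
func evictOldest(shard map[string]int64, percentage int) []string {
	n := len(shard) * percentage / 100
	if n == 0 {
		return nil
	}
	entries := make([]entry, 0, len(shard))
	for k, t := range shard {
		entries = append(entries, entry{k, t})
	}
	quickselect(entries, n)
	evicted := make([]string, 0, n)
	for _, e := range entries[:n] {
		delete(shard, e.key)
		evicted = append(evicted, e.key)
	}
	return evicted
}

func main() {
	shard := map[string]int64{"a": 5, "b": 1, "c": 9, "d": 3, "e": 7, "f": 2, "g": 8, "h": 4, "i": 6, "j": 10}
	evicted := evictOldest(shard, 20)
	fmt.Println(len(evicted), len(shard)) // 2 8
}
```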

Latency improvements

Below is a screenshot showing the latency improvements we've observed after replacing our old cache with this package:

[Screenshot (2024-05-10): latency improvements after replacing our old cache with sturdyc]

In addition to this, we've seen our number of outgoing requests decrease by more than 90% after enabling refresh coalescing.

Adding sturdyc to your application:

I have tried to design the API in a way that should make it effortless to add sturdyc to an existing application. We'll use the following two methods of an API client as examples:

// Order retrieves a single order by ID.
func (c *Client) Order(ctx context.Context, id string) (Order, error) {
	timeoutCtx, cancel := context.WithTimeout(ctx, c.timeout)
	defer cancel()

	var response Order
	err := requests.URL(c.orderURL).
		Pathf("/order/%s", id).
		ToJSON(&response).
		Fetch(timeoutCtx)

	return response, err
}

// Orders retrieves a batch of orders by their IDs.
func (c *Client) Orders(ctx context.Context, ids []string) (map[string]Order, error) {
	timeoutCtx, cancel := context.WithTimeout(ctx, c.timeout)
	defer cancel()

	var response map[string]Order
	err := requests.URL(c.orderURL).
		Path("/orders").
		Param("ids", strings.Join(ids, ",")).
		ToJSON(&response).
		Fetch(timeoutCtx)

	return response, err
}

All we have to do is wrap the code that retrieves the data in a function, and then hand it over to our cache client:

func (c *Client) Order(ctx context.Context, id string) (Order, error) {
	fetchFunc := func(ctx context.Context) (Order, error) {
		timeoutCtx, cancel := context.WithTimeout(ctx, c.timeout)
		defer cancel()

		var response Order
		err := requests.URL(c.orderURL).
			Pathf("/order/%s", id).
			ToJSON(&response).
			Fetch(timeoutCtx)

		return response, err
	}

	return c.cache.GetOrFetch(ctx, "order-"+id, fetchFunc)
}

func (c *Client) Orders(ctx context.Context, ids []string) (map[string]Order, error) {
	fetchFunc := func(ctx context.Context, cacheMisses []string) (map[string]Order, error) {
		timeoutCtx, cancel := context.WithTimeout(ctx, c.timeout)
		defer cancel()

		var response map[string]Order
		err := requests.URL(c.orderURL).
			Path("/orders").
			Param("ids", strings.Join(cacheMisses, ",")).
			ToJSON(&response).
			Fetch(timeoutCtx)

		return response, err
	}

	return c.cache.GetOrFetchBatch(ctx, ids, c.cache.BatchKeyFn("orders"), fetchFunc)
}

The example above retrieves the data from an HTTP API, but it's just as easy to wrap a database query, a remote procedure call, a disk read, or any other I/O operation. We can also use closures to make sure that the function that retrieves the data has all of the values it needs.

Next, we'll look at how to configure the cache in more detail.

Table of contents

I've included examples that cover the entire API, and I encourage you to read these examples in the order they appear. Most of them build on each other, and many share configurations. Here is a brief overview of what the examples are going to cover:

Creating a cache client

The first thing you will have to do is to create a cache client to hold your configuration:

	// Maximum number of entries in the cache. Exceeding this number will trigger
	// an eviction (as long as the "evictionPercentage" is greater than 0).
	capacity := 10000
	// Number of shards to use. Increasing this number will reduce write lock collisions.
	numShards := 10
	// Time-to-live for cache entries.
	ttl := 2 * time.Hour
	// Percentage of entries to evict when the cache reaches its capacity. Setting this
	// to 0 will make writes a no-op until an item has either expired or been deleted.
	evictionPercentage := 10

	// Create a cache client with the specified configuration.
	cacheClient := sturdyc.New[int](capacity, numShards, ttl, evictionPercentage)

	cacheClient.Set("key1", 99)
	log.Println(cacheClient.Size())
	log.Println(cacheClient.Get("key1"))

	cacheClient.Delete("key1")
	log.Println(cacheClient.Size())
	log.Println(cacheClient.Get("key1"))

Next, we'll look at some of the more advanced features.

Stampede protection

Cache stampedes (also known as thundering herd) occur when many requests for a particular piece of data, which has just expired or been evicted from the cache, come in at once.

Preventing this has been one of the key objectives for this package. We do not want to cause a significant load on an underlying data source every time one of our keys expires.

The GetOrFetch function takes a key and a function for retrieving the data if it's not in the cache. The cache is going to ensure that we never have more than a single request per key. It achieves this by tracking all of the in-flight requests:

	var count atomic.Int32
	fetchFn := func(_ context.Context) (int, error) {
		count.Add(1)
		time.Sleep(time.Second)
		return 1337, nil
	}

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			// We can ignore the error given the fetchFn we're using.
			val, _ := cacheClient.GetOrFetch(context.Background(), "key2", fetchFn)
			log.Printf("got value: %d\n", val)
			wg.Done()
		}()
	}
	wg.Wait()

	log.Printf("fetchFn was called %d time\n", count.Load())
	log.Println(cacheClient.Get("key2"))

Running this program we'll see that our requests for "key2" were deduplicated, and that the fetchFn only got called once:

❯ go run .
2024/05/21 08:06:29 got value: 1337
2024/05/21 08:06:29 got value: 1337
2024/05/21 08:06:29 got value: 1337
2024/05/21 08:06:29 got value: 1337
2024/05/21 08:06:29 got value: 1337
2024/05/21 08:06:29 fetchFn was called 1 time
2024/05/21 08:06:29 1337 true

For data sources that support batching, we can use the GetOrFetchBatch function. To demonstrate this, I'll create a mock function that sleeps for 5 seconds and then returns a map with a numerical value for every ID:

	var count atomic.Int32
	fetchFn := func(_ context.Context, ids []string) (map[string]int, error) {
		count.Add(1)
		time.Sleep(time.Second * 5)

		response := make(map[string]int, len(ids))
		for _, id := range ids {
			num, _ := strconv.Atoi(id)
			response[id] = num
		}

		return response, nil
	}

Next, we'll need some batches to test with, so I created three batches with 5 IDs each:

	batches := [][]string{
		{"1", "2", "3", "4", "5"},
		{"6", "7", "8", "9", "10"},
		{"11", "12", "13", "14", "15"},
	}

IDs can often be fetched from multiple data sources. Hence, we'll want to prefix each ID to make the cache key unique. The package provides more functionality for this that we'll see later on, but for now we'll use the simplest version, which adds a string prefix to every ID:

	keyPrefixFn := cacheClient.BatchKeyFn("my-data-source")

We can now request each batch in a separate goroutine:

	for _, batch := range batches {
		go func() {
			res, _ := cacheClient.GetOrFetchBatch(context.Background(), batch, keyPrefixFn, fetchFn)
			log.Printf("got batch: %v\n", res)
		}()
	}

	// Give the goroutines above a chance to run to ensure that the batches are in-flight.
	time.Sleep(time.Second * 3)

At this point, the cache should have in-flight requests for IDs 1-15. Knowing this, we'll test the stampede protection by launching another five goroutines. Each goroutine is going to request two random IDs from the first two batches:

	// Launch another 5 goroutines that each pick two random IDs from the first two batches (IDs 1-4 and 6-9).
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			ids := []string{batches[rand.IntN(2)][rand.IntN(4)], batches[rand.IntN(2)][rand.IntN(4)]}
			res, _ := cacheClient.GetOrFetchBatch(context.Background(), ids, keyPrefixFn, fetchFn)
			log.Printf("got batch: %v\n", res)
			wg.Done()
		}()
	}

	wg.Wait()
	log.Printf("fetchFn was called %d times\n", count.Load())

Running this program, and looking at the logs, we'll see that the cache is able to pick IDs from different batches:

❯ go run .
2024/05/21 09:14:23 got batch: map[8:8 9:9]
2024/05/21 09:14:23 got batch: map[4:4 9:9] <---- NOTE: ID 4 and 9 are part of different batches
2024/05/21 09:14:23 got batch: map[11:11 12:12 13:13 14:14 15:15]
2024/05/21 09:14:23 got batch: map[1:1 7:7] <---- NOTE: ID 1 and 7 are part of different batches
2024/05/21 09:14:23 got batch: map[10:10 6:6 7:7 8:8 9:9]
2024/05/21 09:14:23 got batch: map[3:3 9:9] <---- NOTE: ID 3 and 9 are part of different batches
2024/05/21 09:14:23 got batch: map[1:1 2:2 3:3 4:4 5:5]
2024/05/21 09:14:23 got batch: map[4:4 9:9] <---- NOTE: ID 4 and 9 are part of different batches
2024/05/21 09:14:23 fetchFn was called 3 times <---- NOTE: We only generated 3 outgoing requests.

And on the last line, we can see that the additional calls didn't generate any further outgoing requests. The entire example is available here.

Early refreshes

Being able to prevent your most frequently used records from ever expiring can have a significant impact on your application's latency. Therefore, the package provides a WithEarlyRefreshes option, which instructs the cache to continuously refresh these records in the background before they expire.

A refresh gets scheduled if a key is requested again after a configurable amount of time has passed. This is an important distinction because it means that the cache doesn't just naively refresh every key it's ever seen. Instead, it only refreshes the records that are actually in active rotation, while allowing unused keys to be deleted once their TTL expires.

Below is an example configuration that you can use to enable this functionality:

func main() {
	// Set a minimum and maximum refresh delay for the record. This is
	// used to spread out the refreshes of our entries evenly over time.
	// We don't want our outgoing requests graph to look like a comb that
	// sends a spike of refreshes every 30 ms.
	minRefreshDelay := time.Millisecond * 10
	maxRefreshDelay := time.Millisecond * 30
	// The base used for exponential backoff when retrying a refresh. Most of the
	// time, we perform refreshes well in advance of the record's expiry time.
	// Hence, we can use this to make it easier for a system that is having
	// trouble to get back on its feet by making fewer refreshes when we're
	// seeing a lot of errors. Once we receive a successful response, the
	// refreshes return to their original frequency. You can set this to 0
	// if you don't want this behavior.
	retryBaseDelay := time.Millisecond * 10

	// Create a cache client with the specified configuration. capacity, numShards,
	// ttl and evictionPercentage are defined as in the "Creating a cache client" example.
	cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
		sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
	)
}

And to get a feeling for how this works, we'll use the configuration above and then create a simple API client which embeds the cache:

type API struct {
	*sturdyc.Client[string]
}

func NewAPI(c *sturdyc.Client[string]) *API {
	return &API{c}
}

func (a *API) Get(ctx context.Context, key string) (string, error) {
	// This could be an API call, a database query, etc.
	fetchFn := func(_ context.Context) (string, error) {
		log.Printf("Fetching value for key: %s\n", key)
		return "value", nil
	}
	return a.GetOrFetch(ctx, key, fetchFn)
}

Now we can return to our main function to create an instance of it, and then call the Get method in a loop:

func main() {
	// ...

	cacheClient := sturdyc.New[string](...)

	// Create a new API instance with the cache client.
	api := NewAPI(cacheClient)

	// We are going to retrieve the values every 10 milliseconds, however the
	// logs will reveal that actual refreshes fluctuate randomly within a 10-30
	// millisecond range.
	for i := 0; i < 100; i++ {
		val, err := api.Get(context.Background(), "key")
		if err != nil {
			log.Println("Failed to  retrieve the record from the cache.")
			continue
		}
		log.Printf("Value: %s\n", val)
		time.Sleep(minRefreshDelay)
	}
}

Running this program, we're going to see that the value gets refreshed once every 2-3 retrievals:

go run .
2024/04/07 09:05:29 Fetching value for key: key
2024/04/07 09:05:29 Value: value
2024/04/07 09:05:29 Value: value
2024/04/07 09:05:29 Fetching value for key: key
2024/04/07 09:05:29 Value: value
2024/04/07 09:05:29 Value: value
2024/04/07 09:05:29 Value: value
2024/04/07 09:05:29 Fetching value for key: key
...

This is going to reduce your response times significantly because none of your users will have to wait for the I/O operation that refreshes the data. It's always performed in the background as long as the key is being continuously requested. Being afraid that the record might get too stale if users stop requesting it is an indication of a TTL that is set too high. Remember, even if the TTL is exceeded and the key expires, you'll still get deduplication if it's suddenly requested in a burst again. The only difference is that the users will have to wait for the I/O operation that retrieves it.

Additionally, to provide a degraded experience when an upstream system encounters issues, you can set a high TTL and a low refresh time. When everything is working as expected, the records will be refreshed continuously. However, if the upstream system encounters issues and stops responding, you can fall back to cached records for the duration of the TTL.

What if the record was deleted? Our cache might use a 2-hour-long TTL, and we definitely don't want it to take that long for the deletion to propagate.

However, if we were to modify our client so that it returns an error after the first request:

type API struct {
	count int
	*sturdyc.Client[string]
}

func NewAPI(c *sturdyc.Client[string]) *API {
	return &API{0, c}
}

func (a *API) Get(ctx context.Context, key string) (string, error) {
	fetchFn := func(_ context.Context) (string, error) {
		a.count++
		log.Printf("Fetching value for key: %s\n", key)
		if a.count == 1 {
			return "value", nil
		}
		return "", errors.New("error this key does not exist")
	}
	return a.GetOrFetch(ctx, key, fetchFn)
}

and then run the program again:

cd examples/stampede
go run .

We'll see that the exponential backoff kicks in, resulting in more iterations for every refresh, but the value is still being printed:

2024/05/09 13:22:03 Fetching value for key: key
2024/05/09 13:22:03 Value: value
2024/05/09 13:22:03 Value: value
2024/05/09 13:22:03 Value: value
2024/05/09 13:22:03 Value: value
2024/05/09 13:22:03 Value: value
2024/05/09 13:22:03 Value: value
2024/05/09 13:22:03 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Value: value
2024/05/09 13:22:04 Fetching value for key: key

This is a bit tricky because how you determine if a record has been deleted is going to vary based on your data source. It could be a status code, zero value, empty list, specific error message, etc. There is no way for the cache to figure this out implicitly.

It couldn't simply delete a record every time it receives an error. If an upstream system goes down, we want to be able to serve stale data for the duration of the TTL, while reducing the frequency of our refreshes to make it easier for them to recover.

Therefore, if a record is deleted, we'll have to explicitly inform the cache about it by returning a custom error:

	fetchFn := func(_ context.Context) (string, error) {
		a.count++
		log.Printf("Fetching value for key: %s\n", key)
		if a.count == 1 {
			return "value", nil
		}
		return "", sturdyc.ErrNotFound
	}

This tells the cache that the record is no longer available at the underlying data source. Therefore, if this record is being fetched as part of a background refresh, the cache will check whether it holds a record for this key and, if so, delete it.

If we run this application again we'll see that it works, and that we're no longer getting any cache hits. This leads to outgoing requests for every iteration:

2024/05/09 13:40:47 Fetching value for key: key
2024/05/09 13:40:47 Value: value
2024/05/09 13:40:47 Value: value
2024/05/09 13:40:47 Value: value
2024/05/09 13:40:47 Fetching value for key: key
2024/05/09 13:40:47 Failed to  retrieve the record from the cache.
2024/05/09 13:40:47 Fetching value for key: key
2024/05/09 13:40:47 Failed to  retrieve the record from the cache.
2024/05/09 13:40:47 Fetching value for key: key
2024/05/09 13:40:47 Failed to  retrieve the record from the cache.
2024/05/09 13:40:47 Fetching value for key: key
2024/05/09 13:40:47 Failed to  retrieve the record from the cache.

Please note that we only have to return the sturdyc.ErrNotFound when we're using GetOrFetch. For GetOrFetchBatch, we'll simply omit the key from the map we're returning. I think this inconsistency is a little unfortunate, but it was the best API I could come up with. Having to return an error like this if just a single ID wasn't found:

	batchFetchFn := func(_ context.Context, cacheMisses []string) (map[string]string, error) {
		response, err := myDataSource(cacheMisses)
		if err != nil {
			return nil, err
		}
		for _, id := range cacheMisses {
			// NOTE: Don't do this, it's just an example.
			if _, ok := response[id]; !ok {
				return response, sturdyc.ErrNotFound
			}
		}
		return response, nil
	}

and then have the cache swallow that error and return nil, felt much less intuitive.

The entire example is available here.

Non-existent records

In the example above, we could see that once we delete the key, the following iterations lead to a continuous stream of outgoing requests. This will happen for every ID that doesn't exist at the data source. If we can't retrieve it, we can't cache it. If we can't cache it, we can't serve it from memory. If this happens frequently, we'll experience a lot of I/O operations, which will significantly increase our system's latency.

The reasons why someone might request IDs that don't exist can vary. It could be due to a faulty CMS configuration, or perhaps it's caused by a slow ingestion process where it takes time for a new entity to propagate through a distributed system. Regardless, this will negatively impact our system's performance.

To address this issue, we can instruct the cache to mark these IDs as missing records. Missing records are refreshed at the same frequency as regular records. Hence, if an ID is continuously requested, and the upstream eventually returns a valid response, we'll see it propagate to our cache.

To illustrate, I'll make some small modifications to the code from the previous example. The only thing I'm going to change is to make the API client return an ErrNotFound for the first three requests:

type API struct {
	*sturdyc.Client[string]
	count int
}

func NewAPI(c *sturdyc.Client[string]) *API {
	return &API{c, 0}
}

func (a *API) Get(ctx context.Context, key string) (string, error) {
	fetchFn := func(_ context.Context) (string, error) {
		a.count++
		log.Printf("Fetching value for key: %s\n", key)
		if a.count > 3 {
			return "value", nil
		}
		// This error tells the cache that the data does not exist at the source.
		return "", sturdyc.ErrNotFound
	}
	return a.GetOrFetch(ctx, key, fetchFn)
}

Next, we just have to enable missing record storage, which tells the cache that any time it gets an ErrNotFound error, it should mark the key as missing:

func main() {
	// ...

	cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
		sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
		sturdyc.WithMissingRecordStorage(),
	)

	api := NewAPI(cacheClient)

	// ...
	for i := 0; i < 100; i++ {
		val, err := api.Get(context.Background(), "key")
		// The cache returns ErrMissingRecord for any key that has been marked as missing.
		// You can use this to exit-early, or return some type of default state.
		if errors.Is(err, sturdyc.ErrMissingRecord) {
			log.Println("Record does not exist.")
		}
		if err == nil {
			log.Printf("Value: %s\n", val)
		}
		time.Sleep(minRefreshDelay)
	}
}

Running this program, we'll see that the record is missing during the first 3 refreshes, and then transitions into having a value:

❯ go run .
2024/05/09 21:25:28 Fetching value for key: key
2024/05/09 21:25:28 Record does not exist.
2024/05/09 21:25:28 Record does not exist.
2024/05/09 21:25:28 Record does not exist.
2024/05/09 21:25:28 Fetching value for key: key
2024/05/09 21:25:28 Record does not exist.
2024/05/09 21:25:28 Record does not exist.
2024/05/09 21:25:28 Fetching value for key: key
2024/05/09 21:25:28 Record does not exist.
2024/05/09 21:25:28 Record does not exist.
2024/05/09 21:25:28 Fetching value for key: key
2024/05/09 21:25:28 Value: value
2024/05/09 21:25:28 Value: value
2024/05/09 21:25:28 Value: value
2024/05/09 21:25:28 Fetching value for key: key
...

Please note that this functionality is implicit for GetOrFetchBatch. You simply have to omit the key from the map:

	batchFetchFn := func(_ context.Context, cacheMisses []string) (map[string]string, error) {
		// The cache will check if every ID in cacheMisses is present in the response.
		// If it finds any IDs that are missing, it will proceed to mark them as missing
		// if missing record storage is enabled.
		response, err := myDataSource(cacheMisses)
		return response, err
	}

The entire example is available here.

Batch endpoints

One challenge with caching batchable endpoints is that you have to find a way to reduce the number of keys. To illustrate, let's say that we have 10 000 records, and an endpoint for fetching them that allows for batches of 20. The IDs for the batch are supplied as query parameters, for example, https://example.com?ids=1,2,3,4,5,...20. If we were to use this as the cache key, the way many CDNs would do, we could quickly calculate the number of keys we would generate like this:

$$ C(n, k) = \binom{n}{k} = \frac{n!}{k!(n-k)!} $$

For $n = 10{,}000$ and $k = 20$, this becomes:

$$ C(10{,}000, 20) = \binom{10{,}000}{20} = \frac{10{,}000!}{20!\,(10{,}000-20)!} $$

This results in an approximate value of:

$$ \approx 4.032 \times 10^{61} $$

And this is if we're sending perfect batches of 20. If we were to send anywhere from 1 to 20 IDs (not just exactly 20 each time), the total number of combinations would be the sum of the combinations for each k from 1 to 20.

At this point, we would essentially just be paying for extra RAM, as the hit rate for each key would be so low that we'd have better odds of winning the lottery.

To prevent this, sturdyc pulls the response apart and caches each record individually. This prevents super-polynomial growth in the number of cache keys, because the batch itself is never included in the key.

To get a feeling for how this works, let's once again build a small example application. This time, we'll start with the API client:

type API struct {
	*sturdyc.Client[string]
}

func NewAPI(c *sturdyc.Client[string]) *API {
	return &API{c}
}

func (a *API) GetBatch(ctx context.Context, ids []string) (map[string]string, error) {
	// We are going to use a cache key function that prefixes each id.
	// This makes it possible to save the same id for different data sources.
	cacheKeyFn := a.BatchKeyFn("some-prefix")

	// The fetchFn is only going to retrieve the IDs that are not in the cache.
	fetchFn := func(_ context.Context, cacheMisses []string) (map[string]string, error) {
		log.Printf("Cache miss. Fetching ids: %s\n", strings.Join(cacheMisses, ", "))
		// Batch functions should return a map where the key is the id of the record.
		response := make(map[string]string, len(cacheMisses))
		for _, id := range cacheMisses {
			response[id] = "value"
		}
		return response, nil
	}

	return a.GetOrFetchBatch(ctx, ids, cacheKeyFn, fetchFn)
}

and we're going to use the same cache configuration as the previous example, so I've omitted it for brevity:

func main() {
	// ...

	// Create a new API instance with the cache client.
	api := NewAPI(cacheClient)

	// Make an initial call to make sure that IDs 1-10 are retrieved and cached.
	log.Println("Seeding ids 1-10")
	ids := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}
	api.GetBatch(context.Background(), ids)
	log.Println("Seed completed")

	// To demonstrate that the records have been cached individually, we can continue
	// fetching a random subset of records from the original batch, plus a new
	// ID. By examining the logs, we should be able to see that the cache only
	// fetches the ID that wasn't present in the original batch, indicating that
	// the batch itself isn't part of the key.
	for i := 1; i <= 100; i++ {
		// Get N ids from the original batch.
		recordsToFetch := rand.IntN(10) + 1
		batch := make([]string, recordsToFetch)
		copy(batch, ids[:recordsToFetch])
		// Add a random ID between 10 and 1009 to the batch.
		batch = append(batch, strconv.Itoa(rand.IntN(1000)+10))
		values, _ := api.GetBatch(context.Background(), batch)
		// Print the records we retrieved from the cache.
		log.Println(values)
	}
}

Running this code, we can see that we only end up fetching the randomized ID, while continuously getting cache hits for IDs 1-10, regardless of what the batch looks like:

2024/04/07 11:09:58 Seed completed
2024/04/07 11:09:58 Cache miss. Fetching ids: 173
2024/04/07 11:09:58 map[1:value 173:value 2:value 3:value 4:value]
2024/04/07 11:09:58 Cache miss. Fetching ids: 12
2024/04/07 11:09:58 map[1:value 12:value 2:value 3:value 4:value]
2024/04/07 11:09:58 Cache miss. Fetching ids: 730
2024/04/07 11:09:58 map[1:value 2:value 3:value 4:value 730:value]
2024/04/07 11:09:58 Cache miss. Fetching ids: 520
2024/04/07 11:09:58 map[1:value 2:value 3:value 4:value 5:value 520:value 6:value 7:value 8:value]
...

The entire example is available here.

Cache key permutations

If you're attempting to cache data from an upstream system, the ID alone may be insufficient to uniquely identify the record in your cache. The endpoint you're calling might accept a variety of options that transform the data in different ways.

Consider this:

curl https://movie-api/movies?ids=1,2,3&filterUpcoming=true&includeTrailers=false
curl https://movie-api/movies?ids=1,2,3&filterUpcoming=false&includeTrailers=true

The IDs might be enough to uniquely identify these records in a database. However, when you're consuming them through another system, they will probably appear completely different as transformations are applied based on the options you pass it. Hence, it's important that we store these records once for each unique option set.

The options do not have to be query parameters either. The data source you're consuming could still be a database, and the options that you want to make part of the cache key could be different types of filters.

Below is a small example application to showcase this functionality:

type OrderOptions struct {
	CarrierName        string
	LatestDeliveryTime string
}

type OrderAPI struct {
	*sturdyc.Client[string]
}

func NewOrderAPI(c *sturdyc.Client[string]) *OrderAPI {
	return &OrderAPI{c}
}

func (a *OrderAPI) OrderStatus(ctx context.Context, ids []string, opts OrderOptions) (map[string]string, error) {
	// We use the PermutatedBatchKeyFn when an ID isn't enough to uniquely identify a
	// record. The cache is going to store each id once per set of options.
	cacheKeyFn := a.PermutatedBatchKeyFn("key", opts)

	// We'll create a fetchFn with a closure that captures the options. For this
	// simple example, it logs and returns the status for each order, but you could
	// just as easily have called an external API.
	fetchFn := func(_ context.Context, cacheMisses []string) (map[string]string, error) {
		log.Printf("Fetching: %v, carrier: %s, delivery time: %s\n", cacheMisses, opts.CarrierName, opts.LatestDeliveryTime)
		response := map[string]string{}
		for _, id := range cacheMisses {
			response[id] = fmt.Sprintf("Available for %s", opts.CarrierName)
		}
		return response, nil
	}
	return a.GetOrFetchBatch(ctx, ids, cacheKeyFn, fetchFn)
}

The main difference from the previous example is that we're using PermutatedBatchKeyFn instead of BatchKeyFn. Internally, the cache will use reflection to extract the names and values of every exported field in the opts struct, and then include them when it constructs the cache keys.

The struct should be flat without nesting. The fields can be time.Time values, as well as any basic types, pointers to these types, and slices containing them.

Now, let's try to use this client:

func main() {
	// ...

	// Create a new cache client with the specified configuration.
	cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
		sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
	)

	// We will fetch these IDs using three different option sets.
	ids := []string{"id1", "id2", "id3"}
	optionSetOne := OrderOptions{CarrierName: "FEDEX", LatestDeliveryTime: "2024-04-06"}
	optionSetTwo := OrderOptions{CarrierName: "DHL", LatestDeliveryTime: "2024-04-07"}
	optionSetThree := OrderOptions{CarrierName: "UPS", LatestDeliveryTime: "2024-04-08"}

	orderClient := NewOrderAPI(cacheClient)
	ctx := context.Background()

	// Next, we'll call the orderClient to make sure that we've retrieved and cached
	// these IDs for all of our option sets.
	log.Println("Filling the cache with all IDs for all option sets")
	orderClient.OrderStatus(ctx, ids, optionSetOne)
	orderClient.OrderStatus(ctx, ids, optionSetTwo)
	orderClient.OrderStatus(ctx, ids, optionSetThree)
	log.Println("Cache filled")
}

At this point, the cache has stored each record individually for each option set. We can imagine that the keys would look something like this:

FEDEX-2024-04-06-id1
DHL-2024-04-07-id1
UPS-2024-04-08-id1
etc.

Next, we'll add a sleep to make sure that all of the records are due for a refresh, and then request the ids individually for each set of options:

func main() {
	// ...

	// Sleep to make sure that all records are due for a refresh.
	time.Sleep(maxRefreshDelay + 1)

	// Fetch each id for each option set.
	for i := 0; i < len(ids); i++ {
		// NOTE: We're using the same ID for these requests.
		orderClient.OrderStatus(ctx, []string{ids[i]}, optionSetOne)
		orderClient.OrderStatus(ctx, []string{ids[i]}, optionSetTwo)
		orderClient.OrderStatus(ctx, []string{ids[i]}, optionSetThree)
	}

	// Sleep for a second to allow the refresh logs to print.
	time.Sleep(time.Second)
}

Running this program, we can see that the records are refreshed once per unique id+option combination:

go run .
2024/04/07 13:33:56 Filling the cache with all IDs for all option sets
2024/04/07 13:33:56 Fetching: [id1 id2 id3], carrier: FEDEX, delivery time: 2024-04-06
2024/04/07 13:33:56 Fetching: [id1 id2 id3], carrier: DHL, delivery time: 2024-04-07
2024/04/07 13:33:56 Fetching: [id1 id2 id3], carrier: UPS, delivery time: 2024-04-08
2024/04/07 13:33:56 Cache filled
2024/04/07 13:33:58 Fetching: [id1], carrier: FEDEX, delivery time: 2024-04-06
2024/04/07 13:33:58 Fetching: [id1], carrier: UPS, delivery time: 2024-04-08
2024/04/07 13:33:58 Fetching: [id1], carrier: DHL, delivery time: 2024-04-07
2024/04/07 13:33:58 Fetching: [id2], carrier: UPS, delivery time: 2024-04-08
2024/04/07 13:33:58 Fetching: [id2], carrier: FEDEX, delivery time: 2024-04-06
2024/04/07 13:33:58 Fetching: [id2], carrier: DHL, delivery time: 2024-04-07
2024/04/07 13:33:58 Fetching: [id3], carrier: FEDEX, delivery time: 2024-04-06
2024/04/07 13:33:58 Fetching: [id3], carrier: UPS, delivery time: 2024-04-08
2024/04/07 13:33:58 Fetching: [id3], carrier: DHL, delivery time: 2024-04-07

The entire example is available here.

Refresh coalescing

As seen in the example above, we're storing the records once for every set of options. However, we're not really utilizing the fact that the endpoint is batchable when we're performing the refreshes.

To make this more efficient, we can enable the refresh coalescing functionality. Internally, the cache is going to create a buffer for every cache key permutation. It then collects IDs until the buffer reaches the configured batch size or the buffer timeout expires, whichever happens first.

The only change we have to make to the previous example is to enable this feature:

func main() {
	// ...

	// With refresh coalescing enabled, the cache will buffer refreshes
	// until the batch size is reached or the buffer timeout is hit.
	batchSize := 3
	batchBufferTimeout := time.Second * 30

	// Create a new cache client with the specified configuration.
	cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
		sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
		sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout),
	)

	// ...
}

and now we can see that the cache performs the refreshes in batches per permutation of our query params:

go run .
2024/04/07 13:45:42 Filling the cache with all IDs for all option sets
2024/04/07 13:45:42 Fetching: [id1 id2 id3], carrier: FEDEX, delivery time: 2024-04-06
2024/04/07 13:45:42 Fetching: [id1 id2 id3], carrier: DHL, delivery time: 2024-04-07
2024/04/07 13:45:42 Fetching: [id1 id2 id3], carrier: UPS, delivery time: 2024-04-08
2024/04/07 13:45:42 Cache filled
2024/04/07 13:45:44 Fetching: [id1 id2 id3], carrier: FEDEX, delivery time: 2024-04-06
2024/04/07 13:45:44 Fetching: [id1 id3 id2], carrier: DHL, delivery time: 2024-04-07
2024/04/07 13:45:44 Fetching: [id1 id2 id3], carrier: UPS, delivery time: 2024-04-08

The number of outgoing requests for the refreshes went from 9 to 3. Imagine what a batch size of 50 would do for your application's performance!
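The drop from 9 to 3 follows from simple arithmetic. Each option set needs ceil(ids / batchSize) refresh calls. This assumes every ID is due at the same time and every buffer fills before its timeout. `refreshRequests` is a hypothetical helper written only to make that arithmetic explicit.

```go
package main

import "fmt"

// refreshRequests returns the number of outgoing refresh calls, assuming every
// ID is due at once and every buffer fills before its timeout expires.
func refreshRequests(numIDs, numOptionSets, batchSize int) int {
	perOptionSet := (numIDs + batchSize - 1) / batchSize // ceil(numIDs / batchSize)
	return numOptionSets * perOptionSet
}

func main() {
	fmt.Println(refreshRequests(3, 3, 1))    // 9: without coalescing, one call per ID
	fmt.Println(refreshRequests(3, 3, 3))    // 3: batch size 3, as in the example
	fmt.Println(refreshRequests(500, 3, 50)) // 30
}
```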

The entire example is available here.

Passthrough

There are times when you want to always retrieve the latest data from the source and only use the in-memory cache as a fallback. In such scenarios, you can use the Passthrough and PassthroughBatch functions. The cache will still perform in-flight request tracking and deduplicate your requests.

Distributed storage

I think it's important to read the previous sections before jumping here in order to understand all the heavy lifting sturdyc does when it comes to creating cache keys, tracking in-flight requests, refreshing records in the background to improve latency, and buffering/coalescing requests to minimize the number of round trips to underlying data sources.

Adding distributed storage to the cache is, from the package's point of view, essentially just another data source with a higher priority. Hence, we're still able to take great advantage of all the features we've seen so far, and these efficiency gains will hopefully allow you to use a much cheaper cluster.

Slightly simplified, we can think of the cache's interaction with the distributed storage like this:

// NOTE: This is an example. The cache has this functionality internally.
func (o *OrderAPI) OrderStatus(ctx context.Context, id string) (string, error) {
	cacheKey := "order-status-" + id
	fetchFn := func(ctx context.Context) (string, error) {
		// Check redis cache first.
		if orderStatus, ok := o.redisClient.Get(cacheKey); ok {
			return orderStatus, nil
		}

		// Fetch the order status from the underlying data source.
		var response OrderStatusResponse
		err := requests.URL(o.baseURL).
			Param("id", id).
			ToJSON(&response).
			Fetch(ctx)
		if err != nil {
			return "", err
		}

		// Add the order status to the redis cache.
		go func() { o.redisClient.Set(cacheKey, response.OrderStatus, time.Hour) }()

		return response.OrderStatus, nil
	}

	return o.GetOrFetch(ctx, cacheKey, fetchFn)
}

Syncing the keys and values to a distributed storage like this can be highly beneficial, especially when we're deploying new containers where the in-memory cache will be empty, as it prevents sudden bursts of traffic to the underlying data sources.

Keeping the in-memory caches in sync with a distributed storage requires a bit more work, though. sturdyc has therefore been designed to work with an abstraction that can represent any key-value store of your choosing; all you have to do is implement this interface:

type DistributedStorage interface {
	Get(ctx context.Context, key string) ([]byte, bool)
	Set(ctx context.Context, key string, value []byte)
	GetBatch(ctx context.Context, keys []string) map[string][]byte
	SetBatch(ctx context.Context, records map[string][]byte)
}

and then pass it to the WithDistributedStorage option when you create your cache client:

cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
	sturdyc.WithDistributedStorage(storage),
)

Please note that you are responsible for configuring the TTL and eviction policies of this storage. sturdyc will only make sure that it's being kept up-to-date with the data it has in-memory.

I've included an example to showcase this functionality here.

When running that application, you should see output that looks something like this:

❯ go run .
2024/06/07 10:32:56 Getting key shipping-options-1234-asc from the distributed storage
2024/06/07 10:32:56 Fetching shipping options from the underlying data source
2024/06/07 10:32:56 The shipping options were retrieved successfully!
2024/06/07 10:32:56 Writing key shipping-options-1234-asc to the distributed storage
2024/06/07 10:32:56 The shipping options were retrieved successfully!
2024/06/07 10:32:57 The shipping options were retrieved successfully!
2024/06/07 10:32:57 Getting key shipping-options-1234-asc from the distributed storage
2024/06/07 10:32:57 The shipping options were retrieved successfully!
2024/06/07 10:32:57 Getting key shipping-options-1234-asc from the distributed storage
2024/06/07 10:32:57 The shipping options were retrieved successfully!
2024/06/07 10:32:57 The shipping options were retrieved successfully!
2024/06/07 10:32:57 Getting key shipping-options-1234-asc from the distributed storage
2024/06/07 10:32:58 The shipping options were retrieved successfully!
2024/06/07 10:32:58 The shipping options were retrieved successfully!
2024/06/07 10:32:58 Getting key shipping-options-1234-asc from the distributed storage
2024/06/07 10:32:58 The shipping options were retrieved successfully!
2024/06/07 10:32:58 Getting key shipping-options-1234-asc from the distributed storage
2024/06/07 10:32:58 The shipping options were retrieved successfully!

Above we can see that the underlying data source was only visited once, and the in-memory cache performed a background refresh from the distributed storage every 2 to 3 retrievals to ensure that it's being kept up-to-date.

This sequence of events will repeat once the TTL expires.

Distributed storage early refreshes

Similar to the in-memory cache, we're also able to use a distributed storage where the data is refreshed before the TTL expires.

This would also allow us to serve stale data if an upstream were to experience any downtime:

cacheClient := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
	sturdyc.WithDistributedStorageEarlyRefreshes(storage, time.Minute),
)

With the configuration above, we're essentially saying that we'd prefer the data to be refreshed once it's more than a minute old. However, if you're writing records with a 60-minute TTL, the cache will continuously fall back to these records if the refreshes were to fail, so the interaction with the distributed storage would look something like this:

  • Start by trying to retrieve the key from the distributed storage. If the data is fresh, it's returned immediately and written to the in-memory cache.
  • If the key was found in the distributed storage, but wasn't fresh enough, we'll visit the underlying data source, and then write the response to both the distributed cache and the one we have in-memory.
  • If the call to refresh the data failed, the cache will use the value from the distributed storage as a fallback.

However, there is one more scenario we must cover that requires two additional methods to be implemented:

type DistributedStorageWithDeletions interface {
	DistributedStorage
	Delete(ctx context.Context, key string)
	DeleteBatch(ctx context.Context, keys []string)
}

These delete methods will be called when a refresh occurs and the cache notices that it can no longer find the key at the underlying data source. This indicates that the key has been deleted, and we will want this change to propagate to the distributed key-value store.

Please note that you are still responsible for setting the TTL and eviction policies for the distributed store. The cache will only invoke the delete methods when a record has gone missing from the underlying data source. If you're using missing record storage, it will write the key as a missing record instead.
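In a batch refresh, a record "goes missing" when the response does not contain a requested ID. The helper below is a hypothetical sketch of that comparison. It returns the IDs that were requested but not returned. Depending on configuration, a cache would then either delete those keys from the distributed storage or store them as missing records.

```go
package main

import "fmt"

// missingIDs returns the requested IDs that are absent from a batch response.
func missingIDs[T any](requested []string, response map[string]T) []string {
	var missing []string
	for _, id := range requested {
		if _, ok := response[id]; !ok {
			missing = append(missing, id)
		}
	}
	return missing
}

func main() {
	response := map[string]string{"id1": "shipped", "id3": "pending"}
	fmt.Println(missingIDs([]string{"id1", "id2", "id3"}, response)) // [id2]
}
```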

I've included an example to showcase this functionality here.

Custom metrics

The cache can be configured to report custom metrics for:

  • Size of the cache
  • Cache hits
  • Cache misses
  • Evictions
  • Forced evictions
  • The number of entries evicted
  • Shard distribution
  • The size of the refresh buckets

There are also distributed metrics if you're using the cache with a distributed storage, which adds the following metrics in addition to what we've seen above:

  • Distributed cache hits
  • Distributed cache misses
  • Distributed stale fallback

All you have to do is implement one of these interfaces:

type MetricsRecorder interface {
	CacheHit()
	CacheMiss()
	Refresh()
	MissingRecord()
	ForcedEviction()
	EntriesEvicted(int)
	ShardIndex(int)
	CacheBatchRefreshSize(size int)
	ObserveCacheSize(callback func() int)
}

type DistributedMetricsRecorder interface {
	MetricsRecorder
	DistributedCacheHit()
	DistributedCacheMiss()
	DistributedRefresh()
	DistributedMissingRecord()
	DistributedFallback()
}

and pass it as an option when you create the client:

cacheBasicMetrics := sturdyc.New[any](
	cacheSize,
	numShards,
	cacheTTL,
	evictWhenFullPercentage,
	sturdyc.WithMetrics(metricsRecorder),
)

cacheDistributedMetrics := sturdyc.New[any](
	cacheSize,
	numShards,
	cacheTTL,
	evictWhenFullPercentage,
	sturdyc.WithDistributedStorage(storage),
	sturdyc.WithDistributedMetrics(distributedMetricsRecorder),
)
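A recorder can start out as a set of atomic counters. The sketch below redeclares the MetricsRecorder interface from the Types section further down, so it compiles on its own. `counterRecorder` and `hitRate` are hypothetical. In practice you would forward these calls to a metrics system such as Prometheus or StatsD.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// MetricsRecorder matches the interface in the Types section below.
type MetricsRecorder interface {
	CacheHit()
	CacheMiss()
	Refresh()
	MissingRecord()
	ForcedEviction()
	EntriesEvicted(int)
	ShardIndex(int)
	CacheBatchRefreshSize(size int)
	ObserveCacheSize(callback func() int)
}

// counterRecorder is a hypothetical recorder that keeps in-process counters.
type counterRecorder struct {
	hits            atomic.Int64
	misses          atomic.Int64
	refreshes       atomic.Int64
	missing         atomic.Int64
	forcedEvictions atomic.Int64
	evicted         atomic.Int64
	sizeFn          func() int // registered once; poll it when exporting metrics
}

var _ MetricsRecorder = (*counterRecorder)(nil)

func (r *counterRecorder) CacheHit()                      { r.hits.Add(1) }
func (r *counterRecorder) CacheMiss()                     { r.misses.Add(1) }
func (r *counterRecorder) Refresh()                       { r.refreshes.Add(1) }
func (r *counterRecorder) MissingRecord()                 { r.missing.Add(1) }
func (r *counterRecorder) ForcedEviction()                { r.forcedEvictions.Add(1) }
func (r *counterRecorder) EntriesEvicted(n int)           { r.evicted.Add(int64(n)) }
func (r *counterRecorder) ShardIndex(int)                 {} // e.g. feed a histogram
func (r *counterRecorder) CacheBatchRefreshSize(int)      {} // e.g. feed a histogram
func (r *counterRecorder) ObserveCacheSize(fn func() int) { r.sizeFn = fn }

// hitRate returns the share of lookups that were served from memory.
func (r *counterRecorder) hitRate() float64 {
	hits, misses := r.hits.Load(), r.misses.Load()
	if hits+misses == 0 {
		return 0
	}
	return float64(hits) / float64(hits+misses)
}

func main() {
	r := &counterRecorder{}
	for i := 0; i < 3; i++ {
		r.CacheHit()
	}
	r.CacheMiss()
	fmt.Println(r.hitRate()) // 0.75
}
```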

Below are a few images where these metrics have been visualized in Grafana:

(Screenshot) Here we can see how often we're able to serve from memory.
(Screenshot) This image displays the number of items we have cached.
(Screenshot) This chart shows the batch sizes for the buffered refreshes.
(Screenshot) And lastly, we can see the average batch size of our refreshes for two different data sources.

You are also able to visualize evictions, forced evictions (which occur when the cache has reached its capacity), and the distribution between the shards.

Generics

Personally, I tend to create caches based on how frequently the data needs to be refreshed rather than what type of data it stores. I'll often have one transient cache which refreshes the data every 2-5 milliseconds, and another cache where I'm fine if the data is up to a minute old.

Hence, I don't want to tie the cache to any specific type, so I'll often just use any:

	cacheClient := sturdyc.New[any](capacity, numShards, ttl, evictionPercentage,
		sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, retryBaseDelay),
		sturdyc.WithRefreshCoalescing(10, time.Second*15),
	)

However, having all client methods return any can quickly add a lot of boilerplate if you're storing more than a handful of types, and need to make type assertions.

If you want to avoid this, you can use any of the package-level exports:

  • GetOrFetch
  • GetOrFetchBatch
  • Passthrough
  • PassthroughBatch

They will take the cache, call the function for you, and perform the type conversions internally. If the type conversions were to fail, you'll get an ErrInvalidType error.

Below is an example of what an API client that uses these functions could look like:

type OrderAPI struct {
	cacheClient *sturdyc.Client[any]
}

func NewOrderAPI(c *sturdyc.Client[any]) *OrderAPI {
	return &OrderAPI{cacheClient: c}
}

func (a *OrderAPI) OrderStatus(ctx context.Context, ids []string) (map[string]string, error) {
	cacheKeyFn := a.cacheClient.BatchKeyFn("order-status")
	fetchFn := func(_ context.Context, cacheMisses []string) (map[string]string, error) {
		response := make(map[string]string, len(cacheMisses))
		for _, id := range cacheMisses {
			response[id] = "Order status: pending"
		}
		return response, nil
	}
	return sturdyc.GetOrFetchBatch(ctx, a.cacheClient, ids, cacheKeyFn, fetchFn)
}

func (a *OrderAPI) DeliveryTime(ctx context.Context, ids []string) (map[string]time.Time, error) {
	cacheKeyFn := a.cacheClient.BatchKeyFn("delivery-time")
	fetchFn := func(_ context.Context, cacheMisses []string) (map[string]time.Time, error) {
		response := make(map[string]time.Time, len(cacheMisses))
		for _, id := range cacheMisses {
			response[id] = time.Now()
		}
		return response, nil
	}
	return sturdyc.GetOrFetchBatch(ctx, a.cacheClient, ids, cacheKeyFn, fetchFn)
}

The entire example is available here.

Documentation


Constants

This section is empty.

Variables

var (

	// ErrNotFound should be returned from a FetchFn to indicate that a record is
	// missing at the underlying data source. This helps the cache to determine
	// if a record should be deleted or stored as a missing record if you have
	// that functionality enabled. Missing records are refreshed like any other
	// record, and if your FetchFn returns a value for it, the record will no
	// longer be considered missing. Please note that this only applies to
	// client.GetOrFetch and client.Passthrough. For client.GetOrFetchBatch and
	// client.PassthroughBatch, this works implicitly if you return
	// a map without the ID, and have store missing records enabled.
	ErrNotFound = errors.New("sturdyc: err not found")
	// ErrMissingRecord is returned by client.GetOrFetch and client.Passthrough when a record has been marked
	// as missing. The cache will still try to refresh the record in the background if it's being requested.
	ErrMissingRecord = errors.New("sturdyc: the record has been marked as missing in the cache")
	// ErrOnlyCachedRecords is returned by client.GetOrFetchBatch and client.PassthroughBatch
	// when some of the requested records are available in the cache, but the attempt to
	// fetch the remaining records failed. As the consumer, you can then decide whether to
	// proceed with the cached records or if the entire batch is necessary.
	ErrOnlyCachedRecords = errors.New("sturdyc: failed to fetch the records that were not in the cache")
	// ErrInvalidType is returned when you try to use one of the generic
	// package level functions but the type assertion fails.
	ErrInvalidType = errors.New("sturdyc: invalid response type")
)

Functions

func FindCutoff

func FindCutoff(times []time.Time, percentile float64) time.Time

FindCutoff returns the time that is the k-th smallest time in the slice.

func GetOrFetch

func GetOrFetch[V, T any](ctx context.Context, c *Client[T], key string, fetchFn FetchFn[V]) (V, error)

GetOrFetch is a convenience function that performs type assertion on the result of client.GetOrFetch.

Parameters:

ctx - The context to be used for the request.
c - The cache client.
key - The key to be fetched.
fetchFn - Used to retrieve the data from the underlying data source if the key is not found in the cache.

Returns:

The value corresponding to the key and an error if one occurred.

Type Parameters:

V - The type returned by the fetchFn. Must be assignable to T.
T - The type stored in the cache.

func GetOrFetchBatch

func GetOrFetchBatch[V, T any](ctx context.Context, c *Client[T], ids []string, keyFn KeyFn, fetchFn BatchFetchFn[V]) (map[string]V, error)

func Passthrough

func Passthrough[T, V any](ctx context.Context, c *Client[T], key string, fetchFn FetchFn[V]) (V, error)

Passthrough is a convenience function that performs type assertion on the result of client.Passthrough.

Parameters:

ctx - The context to be used for the request.
c - The cache client.
key - The key to be fetched.
fetchFn - Used to retrieve the data from the underlying data source.

Returns:

The value and an error if one occurred and the key was not found in the cache.

Type Parameters:

V - The type returned by the fetchFn. Must be assignable to T.
T - The type stored in the cache.

func PassthroughBatch

func PassthroughBatch[V, T any](ctx context.Context, c *Client[T], ids []string, keyFn KeyFn, fetchFn BatchFetchFn[V]) (map[string]V, error)

PassthroughBatch is a convenience function that performs type assertion on the result of client.PassthroughBatch.

Parameters:

ctx - The context to be used for the request.
c - The cache client.
ids - The list of IDs to be fetched.
keyFn - Used to prefix each ID in order to create a unique cache key.
fetchFn - Used to retrieve the data from the underlying data source.

Returns:

A map of ids to their corresponding values and an error if one occurred.

Type Parameters:

V - The type returned by the fetchFn. Must be assignable to T.
T - The type stored in the cache.

Types

type BatchFetchFn

type BatchFetchFn[T any] func(ctx context.Context, ids []string) (map[string]T, error)

BatchFetchFn represents a function that can be used to fetch multiple records from a data source.

type BatchResponse

type BatchResponse[T any] map[string]T

type Client

type Client[T any] struct {
	*Config
	// contains filtered or unexported fields
}

Client represents a cache client that can be used to store and retrieve values.

func New

func New[T any](capacity, numShards int, ttl time.Duration, evictionPercentage int, opts ...Option) *Client[T]

New creates a new Client instance with the specified configuration.

`capacity` defines the maximum number of entries that the cache can store.
`numShards` sets the number of shards. It has to be greater than 0.
`ttl` sets the time to live for each entry in the cache. It has to be greater than 0.
`evictionPercentage` is the percentage of items to evict when the cache exceeds its capacity.
`opts` allows for additional configurations to be applied to the cache client.

func (*Client[T]) BatchKeyFn

func (c *Client[T]) BatchKeyFn(prefix string) KeyFn

BatchKeyFn provides a function that can be used in conjunction with "GetOrFetchBatch". It takes in a prefix and returns a function that will append the ID as a suffix for each item.

Parameters:

prefix - The prefix to be used for each cache key.

Returns:

A function that takes an ID and returns a cache key string with the given prefix and ID.
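A key function is a closure over the prefix. The sketch below uses a `prefix-ID-id` layout purely for illustration. The separator sturdyc uses is not documented here, so treat real cache keys as opaque and create them with BatchKeyFn rather than by hand. `batchKeyFn` is hypothetical.

```go
package main

import "fmt"

// KeyFn matches the type in the Types section below.
type KeyFn func(id string) string

// batchKeyFn is a hypothetical equivalent of BatchKeyFn.
func batchKeyFn(prefix string) KeyFn {
	return func(id string) string {
		return prefix + "-ID-" + id // assumed layout, for illustration only
	}
}

func main() {
	keyFn := batchKeyFn("order-status")
	fmt.Println(keyFn("1234")) // order-status-ID-1234
}
```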

func (*Client[T]) Delete

func (c *Client[T]) Delete(key string)

Delete removes a single entry from the cache.

Parameters:

key - The key of the entry to be removed.

func (*Client[T]) Get

func (c *Client[T]) Get(key string) (T, bool)

Get retrieves a single value from the cache.

Parameters:

key - The key to be retrieved.

Returns:

The value corresponding to the key and a boolean indicating if the value was found.

func (*Client[T]) GetMany

func (c *Client[T]) GetMany(keys []string) map[string]T

GetMany retrieves multiple values from the cache.

Parameters:

keys - The list of keys to be retrieved.

Returns:

A map of keys to their corresponding values.

func (*Client[T]) GetManyKeyFn

func (c *Client[T]) GetManyKeyFn(ids []string, keyFn KeyFn) map[string]T

GetManyKeyFn follows the same API as GetOrFetchBatch and PassthroughBatch. You provide it with a slice of IDs and a keyFn, which is applied to create the cache key. The returned map uses the IDs as keys instead of the cache key. If you've used ScanKeys to retrieve the actual keys, you can retrieve the records using GetMany instead.

Parameters:

ids - The list of IDs to be retrieved.
keyFn - A function that generates the cache key for each ID.

Returns:

A map of IDs to their corresponding values.

func (*Client[T]) GetOrFetch

func (c *Client[T]) GetOrFetch(ctx context.Context, key string, fetchFn FetchFn[T]) (T, error)

GetOrFetch attempts to retrieve the specified key from the cache. If the value is absent, it invokes the fetchFn function to obtain it and then stores the result. Additionally, when background refreshes are enabled, GetOrFetch determines if the record needs refreshing and, if necessary, schedules this task for background execution.

Parameters:

ctx - The context to be used for the request.
key - The key to be fetched.
fetchFn - Used to retrieve the data from the underlying data source if the key is not found in the cache.

Returns:

The value corresponding to the key and an error if one occurred.

func (*Client[T]) GetOrFetchBatch

func (c *Client[T]) GetOrFetchBatch(ctx context.Context, ids []string, keyFn KeyFn, fetchFn BatchFetchFn[T]) (map[string]T, error)

GetOrFetchBatch attempts to retrieve the specified IDs from the cache. If any of the values are absent, it invokes the fetchFn function to obtain them and then stores the result. Additionally, when background refreshes are enabled, GetOrFetchBatch determines if any of the records need refreshing and, if necessary, schedules this to be performed in the background.

Parameters:

ctx - The context to be used for the request.
ids - The list of IDs to be fetched.
keyFn - Used to generate the cache key for each ID.
fetchFn - Used to retrieve the data from the underlying data source if any IDs are not found in the cache.

Returns:

A map of IDs to their corresponding values and an error if one occurred.

func (*Client[T]) NumKeysInflight

func (c *Client[T]) NumKeysInflight() int

NumKeysInflight returns the number of keys that are currently being fetched.

Returns:

An integer representing the total number of keys that are currently being fetched.

func (*Client[T]) Passthrough

func (c *Client[T]) Passthrough(ctx context.Context, key string, fetchFn FetchFn[T]) (T, error)

Passthrough attempts to retrieve the latest data by calling the provided fetchFn. If fetchFn encounters an error, the cache is used as a fallback.

Parameters:

ctx - The context to be used for the request.
key - The key to be fetched.
fetchFn - Used to retrieve the data from the underlying data source.

Returns:

The value and an error if one occurred and the key was not found in the cache.

func (*Client[T]) PassthroughBatch

func (c *Client[T]) PassthroughBatch(ctx context.Context, ids []string, keyFn KeyFn, fetchFn BatchFetchFn[T]) (map[string]T, error)

PassthroughBatch attempts to retrieve the latest data by calling the provided fetchFn. If fetchFn encounters an error, the cache is used as a fallback.

Parameters:

ctx - The context to be used for the request.
ids - The list of IDs to be fetched.
keyFn - Used to prefix each ID in order to create a unique cache key.
fetchFn - Used to retrieve the data from the underlying data source.

Returns:

A map of IDs to their corresponding values, and an error if one occurred and none of the IDs were found in the cache.

func (*Client[T]) PermutatedBatchKeyFn

func (c *Client[T]) PermutatedBatchKeyFn(prefix string, permutationStruct interface{}) KeyFn

PermutatedBatchKeyFn provides a function that can be used in conjunction with GetOrFetchBatch. It takes a prefix and a struct where the fields are concatenated with the ID in order to make a unique cache key. Passing anything but a struct for "permutationStruct" will result in a panic. The cache will only use the EXPORTED fields of the struct to construct the key. The permutation struct should be FLAT, with no nested structs. The fields can be any of the basic types, as well as slices and time.Time values.

Parameters:

prefix - The prefix for the cache key.
permutationStruct - A struct whose fields are concatenated to form a unique cache key. Only exported fields are used.

Returns:

A function that takes an ID and returns a cache key string with the given prefix, permutation struct fields, and ID.

func (*Client[T]) PermutatedKey

func (c *Client[T]) PermutatedKey(prefix string, permutationStruct interface{}) string

PermutatedKey takes a prefix and a struct where the fields are concatenated in order to create a unique cache key. Passing anything but a struct for "permutationStruct" will result in a panic. The cache will only use the EXPORTED fields of the struct to construct the key. The permutation struct should be FLAT, with no nested structs. The fields can be any of the basic types, as well as slices and time.Time values.

Parameters:

prefix - The prefix for the cache key.
permutationStruct - A struct whose fields are concatenated to form a unique cache key. Only exported fields are used.

Returns:

A string to be used as the cache key.

func (*Client[T]) ScanKeys

func (c *Client[T]) ScanKeys() []string

ScanKeys returns a list of all keys in the cache.

Returns:

A slice of strings representing all the keys in the cache.

func (*Client[T]) Set

func (c *Client[T]) Set(key string, value T) bool

Set writes a single value to the cache.

Parameters:

key - The key to be set.
value - The value to be associated with the key.

Returns:

A boolean indicating if the set operation triggered an eviction.

func (*Client[T]) SetMany

func (c *Client[T]) SetMany(records map[string]T) bool

SetMany writes a map of key-value pairs to the cache.

Parameters:

records - A map of keys to values to be set in the cache.

Returns:

A boolean indicating if any of the set operations triggered an eviction.

func (*Client[T]) SetManyKeyFn

func (c *Client[T]) SetManyKeyFn(records map[string]T, cacheKeyFn KeyFn) bool

SetManyKeyFn follows the same API as GetOrFetchBatch and PassthroughBatch. It takes a map of records where the keyFn is applied to each key in the map before it's stored in the cache.

Parameters:

records - A map of IDs to values to be set in the cache.
cacheKeyFn - A function that generates the cache key for each ID.

Returns:

A boolean indicating if any of the set operations triggered an eviction.

func (*Client[T]) Size

func (c *Client[T]) Size() int

Size returns the number of entries in the cache.

Returns:

An integer representing the total number of entries in the cache.

func (*Client[T]) StoreMissingRecord

func (c *Client[T]) StoreMissingRecord(key string) bool

StoreMissingRecord marks a single key as a missing record in the cache. It returns true if the write triggered an eviction.

type Clock

type Clock interface {
	Now() time.Time
	NewTicker(d time.Duration) (<-chan time.Time, func())
	NewTimer(d time.Duration) (<-chan time.Time, func() bool)
	Since(t time.Time) time.Duration
}

Clock is an abstraction over the time package that allows the cache's notion of time to be controlled in tests.

type Config

type Config struct {
	// contains filtered or unexported fields
}

Config represents the configuration that can be applied to the cache.

type DistributedMetricsRecorder

type DistributedMetricsRecorder interface {
	MetricsRecorder
	// DistributedCacheHit is called for every key that results in a cache hit.
	DistributedCacheHit()
	// DistributedCacheMiss is called for every key that results in a cache miss.
	DistributedCacheMiss()
	// DistributedRefresh is called when we retrieve a record from
	// the distributed storage that should be refreshed.
	DistributedRefresh()
	// DistributedMissingRecord is called when we retrieve a record from the
	// distributed storage that has been marked as a missing record.
	DistributedMissingRecord()
	// DistributedFallback is called when you are using a distributed storage
	// with early refreshes, and the call for a value was supposed to refresh it,
	// but the call failed. When that happens, the cache falls back to the latest
	// value from the distributed storage.
	DistributedFallback()
}

type DistributedStorage

type DistributedStorage interface {
	Get(ctx context.Context, key string) ([]byte, bool)
	Set(ctx context.Context, key string, value []byte)
	GetBatch(ctx context.Context, keys []string) map[string][]byte
	SetBatch(ctx context.Context, records map[string][]byte)
}

DistributedStorage is an abstraction that the cache interacts with in order to keep the distributed storage and in-memory cache in sync. Please note that you are responsible for setting the TTL and eviction policy of this storage.

type DistributedStorageWithDeletions

type DistributedStorageWithDeletions interface {
	DistributedStorage
	Delete(ctx context.Context, key string)
	DeleteBatch(ctx context.Context, keys []string)
}

DistributedStorageWithDeletions is an abstraction that the cache interacts with when you want to use a distributed storage with early refreshes. Please note that you are responsible for setting the TTL and eviction policy of this storage. The cache will only call the delete functions when it performs a refresh and notices that the record has been deleted at the underlying data source.

type FetchFn

type FetchFn[T any] func(ctx context.Context) (T, error)

FetchFn represents a function that can be used to fetch a single record from a data source.

type KeyFn

type KeyFn func(id string) string

KeyFn is invoked for each record that a batch fetch operation returns. It is used to create unique cache keys.

type Logger

type Logger interface {
	Warn(msg string, args ...any)
	Error(msg string, args ...any)
}

type MetricsRecorder

type MetricsRecorder interface {
	// CacheHit is called for every key that results in a cache hit.
	CacheHit()
	// CacheMiss is called for every key that results in a cache miss.
	CacheMiss()
	// Refresh is called when a get operation results in a refresh.
	Refresh()
	// MissingRecord is called every time the cache is asked to
	// look up a key which has been marked as missing.
	MissingRecord()
	// ForcedEviction is called when the cache reaches its capacity, and has to
	// evict keys in order to write a new one.
	ForcedEviction()
	// EntriesEvicted is called when the cache evicts keys from a shard.
	EntriesEvicted(int)
	// ShardIndex is called to report which shard performed an operation.
	ShardIndex(int)
	// CacheBatchRefreshSize is called to report the size of the batch refresh.
	CacheBatchRefreshSize(size int)
	// ObserveCacheSize is called to report the size of the cache.
	ObserveCacheSize(callback func() int)
}
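A MetricsRecorder can be as simple as a set of in-memory counters. The sketch below is a hypothetical implementation that uses atomic counters, on the assumption that the cache may report from many goroutines; the interface is copied locally, and ShardIndex is deliberately ignored. In practice you would forward these calls to a metrics library such as Prometheus.

```go
package main

import "sync/atomic"

// MetricsRecorder is a local copy of the documented interface.
type MetricsRecorder interface {
	CacheHit()
	CacheMiss()
	Refresh()
	MissingRecord()
	ForcedEviction()
	EntriesEvicted(int)
	ShardIndex(int)
	CacheBatchRefreshSize(size int)
	ObserveCacheSize(callback func() int)
}

// counters is a hypothetical recorder that keeps running totals in memory.
type counters struct {
	hits, misses, refreshes, missingRecords, forcedEvictions atomic.Int64
	evicted                                                  atomic.Int64
	lastBatchSize                                            atomic.Int64
	sizeFn                                                   atomic.Pointer[func() int]
}

func (c *counters) CacheHit()                      { c.hits.Add(1) }
func (c *counters) CacheMiss()                     { c.misses.Add(1) }
func (c *counters) Refresh()                       { c.refreshes.Add(1) }
func (c *counters) MissingRecord()                 { c.missingRecords.Add(1) }
func (c *counters) ForcedEviction()                { c.forcedEvictions.Add(1) }
func (c *counters) EntriesEvicted(n int)           { c.evicted.Add(int64(n)) }
func (c *counters) ShardIndex(int)                 {} // per-shard distribution ignored here
func (c *counters) CacheBatchRefreshSize(size int) { c.lastBatchSize.Store(int64(size)) }

// ObserveCacheSize stores the callback so the size can be sampled later,
// for example when a metrics endpoint is scraped.
func (c *counters) ObserveCacheSize(callback func() int) { c.sizeFn.Store(&callback) }

// cacheSize samples the registered callback, or returns 0 if none is set.
func (c *counters) cacheSize() int {
	if fn := c.sizeFn.Load(); fn != nil {
		return (*fn)()
	}
	return 0
}

var _ MetricsRecorder = (*counters)(nil)
```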

type NoopLogger

type NoopLogger struct{}

func (*NoopLogger) Error

func (l *NoopLogger) Error(_ string, _ ...any)

func (*NoopLogger) Warn

func (l *NoopLogger) Warn(_ string, _ ...any)

type Option

type Option func(*Config)

func WithClock

func WithClock(clock Clock) Option

WithClock can be used to change the clock that the cache uses. This is useful for testing.

func WithDistributedMetrics

func WithDistributedMetrics(metricsRecorder DistributedMetricsRecorder) Option

WithDistributedMetrics instructs the cache to report additional metrics regarding its interaction with the distributed storage.

func WithDistributedStorage

func WithDistributedStorage(storage DistributedStorage) Option

WithDistributedStorage allows you to use the cache with a distributed key-value store. The "GetOrFetch" and "GetOrFetchBatch" functions will check this store first and only proceed to the underlying data source if the key is missing. When a record is retrieved from the underlying data source, it is written both to memory and to the distributed storage. You are responsible for setting TTL and eviction policies for the distributed storage. Sturdyc will only read and write records.

func WithDistributedStorageEarlyRefreshes

func WithDistributedStorageEarlyRefreshes(storage DistributedStorageWithDeletions, refreshAfter time.Duration) Option

WithDistributedStorageEarlyRefreshes is the distributed equivalent of the "WithEarlyRefreshes" option. It allows distributed records to be refreshed before their TTL expires. If a refresh fails, the cache falls back to the value returned by the distributed storage, so data can still be served for the duration of the TTL even if an upstream system goes down. To use this option, your storage must implement DistributedStorageWithDeletions, which adds two methods for deleting records to the interface required by the simpler "WithDistributedStorage" option. The deletions are needed because a distributed cache used with this option might have short refresh intervals but long TTLs: if a record is deleted from the underlying data source, the deletion has to be propagated to the distributed storage before the TTL expires. However, please note that you are still responsible for managing the TTL and eviction policies of the distributed storage. Sturdyc will only delete records that have been removed at the underlying data source.

func WithEarlyRefreshes

func WithEarlyRefreshes(minRefreshTime, maxRefreshTime, retryBaseDelay time.Duration) Option

WithEarlyRefreshes instructs the cache to refresh the keys that are in active rotation, thereby preventing them from ever expiring. This can have a significant impact on your application's latency as you're able to continuously serve frequently used keys from memory. A background refresh is scheduled when a key is requested again after a random duration between minRefreshTime and maxRefreshTime has passed. This is an important distinction because it means that the cache won't just naively refresh every key it's ever seen.

func WithEvictionInterval

func WithEvictionInterval(interval time.Duration) Option

WithEvictionInterval sets the interval at which the cache scans a shard to evict expired entries. Setting this to a higher value will increase cache performance and is advised if you don't think you'll exceed the capacity. If the capacity is reached, the cache will still trigger an eviction.

func WithLog

func WithLog(log Logger) Option

WithLog allows you to set a custom logger for the cache. The cache isn't chatty, and will only log warnings and errors that would otherwise be a nightmare to debug. If you absolutely don't want any logs, you can pass in a &sturdyc.NoopLogger{}.

func WithMetrics

func WithMetrics(recorder MetricsRecorder) Option

WithMetrics makes the cache report metrics to the provided MetricsRecorder.

func WithMissingRecordStorage

func WithMissingRecordStorage() Option

WithMissingRecordStorage allows the cache to mark keys as missing from the underlying data source. This allows you to stop streams of outgoing requests for records that don't exist. The keys will still have the same TTL and refresh durations as any other record in the cache.

func WithNoContinuousEvictions

func WithNoContinuousEvictions() Option

WithNoContinuousEvictions improves cache performance when the cache capacity is unlikely to be exceeded. While this setting disables the continuous eviction job, it still allows for the eviction of the least recently used items once the cache reaches its full capacity.

func WithRefreshCoalescing

func WithRefreshCoalescing(bufferSize int, bufferDuration time.Duration) Option

WithRefreshCoalescing makes the cache refresh data from batchable endpoints more efficiently. It creates a buffer for each cache key permutation and gathers IDs until the bufferSize is reached or the bufferDuration has passed.

NOTE: This requires the WithEarlyRefreshes functionality to be enabled.

func WithRelativeTimeKeyFormat

func WithRelativeTimeKeyFormat(truncation time.Duration) Option

WithRelativeTimeKeyFormat allows you to control the truncation of time.Time values that are passed into the cache key functions.

type RealClock

type RealClock struct{}

RealClock provides methods that wrap the standard library's time package.

func NewClock

func NewClock() *RealClock

NewClock returns a new RealClock.

func (*RealClock) NewTicker

func (c *RealClock) NewTicker(d time.Duration) (<-chan time.Time, func())

NewTicker creates a ticker from the standard library and returns its channel and stop function.

func (*RealClock) NewTimer

func (c *RealClock) NewTimer(d time.Duration) (<-chan time.Time, func() bool)

NewTimer creates a timer from the standard library and returns its channel and stop function.

func (*RealClock) Now

func (c *RealClock) Now() time.Time

Now wraps time.Now() from the standard library.

func (*RealClock) Since

func (c *RealClock) Since(t time.Time) time.Duration

Since wraps time.Since() from the standard library.

type TestClock

type TestClock struct {
	// contains filtered or unexported fields
}

TestClock is a clock that satisfies the Clock interface. It should only be used for testing.

func NewTestClock

func NewTestClock(time time.Time) *TestClock

NewTestClock returns a new TestClock with the specified time.

func (*TestClock) Add

func (c *TestClock) Add(d time.Duration)

Add adds the duration to the internal time of the test clock and triggers any timers or tickers that should fire.

func (*TestClock) NewTicker

func (c *TestClock) NewTicker(d time.Duration) (<-chan time.Time, func())

NewTicker creates a new ticker that will fire every time the internal clock advances by the specified duration.

func (*TestClock) NewTimer

func (c *TestClock) NewTimer(d time.Duration) (<-chan time.Time, func() bool)

NewTimer creates a new timer that will fire once the internal time of the clock has been advanced past the specified duration.

func (*TestClock) Now

func (c *TestClock) Now() time.Time

Now returns the internal time of the test clock.

func (*TestClock) Set

func (c *TestClock) Set(t time.Time)

Set sets the internal time of the test clock and triggers any timers or tickers that should fire.

func (*TestClock) Since

func (c *TestClock) Since(t time.Time) time.Duration

Since returns the duration between the internal time of the clock and the specified time.

Directories

Path Synopsis
examples
basic command
batch command
buffering command
distribution command
generics command
missing command
permutations command
refreshes command
