vkutil

package module
v0.20.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2025 License: MIT Imports: 11 Imported by: 1

README

🧰 vkutil Build Status codecov Go Report Card

A go library of Valkey utilities built on the redigo client library.

[!IMPORTANT] Because this library is built on redigo it doesn't support cluster mode. However care has been taken to ensure this is possible in future by 1) not dynamically constructing keys in LUA scripts and 2) using hashtags to ensure that keys that are accessed together would hash to the same hash slot.

Interval Based Structs

IntervalSet

Creating very large numbers of keys can hurt performance, but putting them all in a single set requires that they all have the same expiration. IntervalSet is a way to have multiple sets based on time intervals, accessible like a single set. You trade accuracy of expiry times for a significantly reduced key space. For example using 2 intervals of 24 hours:

set := vkutil.NewIntervalSet("foos", time.Hour*24, 2)
set.Add(ctx, vc, "A")  // time is 2021-12-02T09:00
...
set.Add(ctx, vc, "B")  // time is 2021-12-03T10:00
set.Add(ctx, vc, "C")  // time is 2021-12-03T11:00

Creates 2 sets as follows:

{foos}:2021-12-02 => {"A"}       // expires at 2021-12-04T09:00
{foos}:2021-12-03 => {"B", "C"}  // expires at 2021-12-05T11:00

But can be accessed like a single set:

set.IsMember(ctx, vc, "A")   // true
set.IsMember(ctx, vc, "B")   // true
set.IsMember(ctx, vc, "D")   // false
IntervalHash

Same idea as IntervalSet but for hashes, and works well for caching values. For example using 2 intervals of 1 hour:

hash := vkutil.NewIntervalHash("foos", time.Hour, 2)
hash.Set(ctx, vc, "A", "1")  // time is 2021-12-02T09:10
...
hash.Set(ctx, vc, "B", "2")  // time is 2021-12-02T10:15
hash.Set(ctx, vc, "C", "3")  // time is 2021-12-02T10:20

Creates 2 hashes like:

{foos}:2021-12-02T09:00 => {"A": "1"}            // expires at 2021-12-02T11:10
{foos}:2021-12-02T10:00 => {"B": "2", "C": "3"}  // expires at 2021-12-02T12:20

But can be accessed like a single hash:

hash.Get(ctx, vc, "A")   // "1"
hash.Get(ctx, vc, "B")   // "2"
hash.Get(ctx, vc, "D")   // ""
IntervalSeries

When getting a value from an IntervalHash you're getting the newest value by looking back through the intervals. IntervalSeries however lets you get an accumulated value from each interval.

For example using 3 intervals of 1 hour:

series := vkutil.NewIntervalSeries("foos", time.Hour, 3)
series.Record(ctx, vc, "A", 1)  // time is 2021-12-02T09:10
series.Record(ctx, vc, "A", 2)  // time is 2021-12-02T09:15
...
series.Record(ctx, vc, "A", 3)  // time is 2021-12-02T10:15
series.Record(ctx, vc, "A", 4)  // time is 2021-12-02T10:20
...
series.Record(ctx, vc, "A", 5)  // time is 2021-12-02T11:25
series.Record(ctx, vc, "B", 1)  // time is 2021-12-02T11:30

Creates 3 hashes like:

{foos}:2021-12-02T09:00 => {"A": "3"}            // expires at 2021-12-02T12:15
{foos}:2021-12-02T10:00 => {"A": "7"}            // expires at 2021-12-02T13:20
{foos}:2021-12-02T11:00 => {"A": "5", "B": "1"}  // expires at 2021-12-02T14:30

But lets us retrieve values across intervals:

series.Get(ctx, vc, "A")   // [5, 7, 3]
series.Get(ctx, vc, "B")   // [1, 0, 0]
series.Get(ctx, vc, "C")   // [0, 0, 0]

Queues

Fair
import "github.com/nyaruka/vkutil/queues"

queue := queues.NewFair("jobs", 10)
queue.Push(ctx, vc, "owner1", true, []byte(`{...}`))
queue.Push(ctx, vc, "owner2", false, []byte(`{...}`))
owner, task, err := queue.Pop(ctx, vc)
...
queue.Done(ctx, vc, owner)

Locks

Locker
import "github.com/nyaruka/vkutil/locks"

locker := locks.NewLocker("mylock", time.Minute)
lock, err := locker.Grab(ctx, vp, 10 * time.Second)
...
locker.Release(ctx, vc, lock)

Other

NewPool

Simplifies creating a new connection pool, with optional auth, and tests that the connection works:

vp, err := vkutil.NewPool(
    "valkey://username:password@localhost:6379/15", 
    vkutil.WithMaxActive(10), 
    vkutil.WithMaxIdle(3), 
    vkutil.WithIdleTimeout(time.Minute)
)
CappedZSet

The CappedZSet type is based on a sorted set but enforces a cap on size, by only retaining the highest ranked members.

cset := vkutil.NewCappedZSet("foos", 3, time.Hour*24)
cset.Add(ctx, vc, "A", 1) 
cset.Add(ctx, vc, "C", 3) 
cset.Add(ctx, vc, "D", 4)
cset.Add(ctx, vc, "B", 2) 
cset.Add(ctx, vc, "E", 5) 
cset.Members(ctx, vc)      // ["C", "D", "E"] / [3, 4, 5]

Testing

Asserts

The assertvk package contains several asserts useful for testing the state of a database.

vp := assertvk.TestDB()
vc := vp.Get()
defer vc.Close()

assertvk.Keys(t, vc, "*", []string{"foo", "bar"})
assertvk.Exists(t, vc, "foo")
assertvk.NotExists(t, vc, "bar")
assertvk.Get(t, vc, "foo", "123")
assertvk.SCard(t, vc, "foo_set", 2)
assertvk.SMembers(t, vc, "foo_set", []string{"123", "234"})

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewPool

func NewPool(redisURL string, options ...func(*valkey.Pool)) (*valkey.Pool, error)

NewPool creates a new pool with the given options

func RandomBase64

func RandomBase64(n int) string

RandomBase64 creates a random string of the length passed in

func StringsWithScores

func StringsWithScores(reply any, err error) ([]string, []float64, error)

StringsWithScores parses an array reply which is alternating pairs of strings and scores (floats)

func WithIdleTimeout

func WithIdleTimeout(v time.Duration) func(*valkey.Pool)

WithIdleTimeout configures how long to wait before reaping a connection

func WithMaxActive

func WithMaxActive(v int) func(*valkey.Pool)

WithMaxActive configures maximum number of concurrent connections to allow

func WithMaxIdle

func WithMaxIdle(v int) func(*valkey.Pool)

WithMaxIdle configures the maximum number of idle connections to keep

Types

type CappedZSet

type CappedZSet struct {
	// contains filtered or unexported fields
}

CappedZSet is a sorted set but enforces a cap on size

func NewCappedZSet

func NewCappedZSet(key string, cap int, expire time.Duration) *CappedZSet

NewCappedZSet creates a new capped sorted set

func (*CappedZSet) Add

func (z *CappedZSet) Add(ctx context.Context, vc valkey.Conn, member string, score float64) error

Add adds an element to the set, if its score puts in the top `cap` members

func (*CappedZSet) Card

func (z *CappedZSet) Card(ctx context.Context, vc valkey.Conn) (int, error)

Card returns the cardinality of the set

func (*CappedZSet) Members

func (z *CappedZSet) Members(ctx context.Context, vc valkey.Conn) ([]string, []float64, error)

Members returns all members of the set, ordered by ascending rank

type IntervalHash

type IntervalHash struct {
	// contains filtered or unexported fields
}

IntervalHash operates like a hash map but with expiring intervals

func NewIntervalHash

func NewIntervalHash(keyBase string, interval time.Duration, size int) *IntervalHash

NewIntervalHash creates a new empty interval hash

func (*IntervalHash) Clear

func (h *IntervalHash) Clear(ctx context.Context, vc valkey.Conn) error

Clear removes all fields

func (*IntervalHash) Del

func (h *IntervalHash) Del(ctx context.Context, vc valkey.Conn, fields ...string) error

Del removes the given fields

func (*IntervalHash) Get

func (h *IntervalHash) Get(ctx context.Context, vc valkey.Conn, field string) (string, error)

Get returns the value of the given field

func (*IntervalHash) MGet

func (h *IntervalHash) MGet(ctx context.Context, vc valkey.Conn, fields ...string) ([]string, error)

MGet returns the values of the given fields

func (*IntervalHash) Set

func (h *IntervalHash) Set(ctx context.Context, vc valkey.Conn, field, value string) error

Set sets the value of the given field

type IntervalSeries

type IntervalSeries struct {
	// contains filtered or unexported fields
}

IntervalSeries returns all values from interval based hashes.

func NewIntervalSeries

func NewIntervalSeries(keyBase string, interval time.Duration, size int) *IntervalSeries

NewIntervalSeries creates a new empty series

func (*IntervalSeries) Get

func (s *IntervalSeries) Get(ctx context.Context, vc valkey.Conn, field string) ([]int64, error)

Get gets the values of field in all intervals

func (*IntervalSeries) Record

func (s *IntervalSeries) Record(ctx context.Context, vc valkey.Conn, field string, value int64) error

Record increments the value of field by value in the current interval

func (*IntervalSeries) Total

func (s *IntervalSeries) Total(ctx context.Context, vc valkey.Conn, field string) (int64, error)

Total gets the total value of field across all intervals

type IntervalSet

type IntervalSet struct {
	// contains filtered or unexported fields
}

IntervalSet operates like a set but with expiring intervals

func NewIntervalSet

func NewIntervalSet(keyBase string, interval time.Duration, size int) *IntervalSet

NewIntervalSet creates a new empty interval set

func (*IntervalSet) Add

func (s *IntervalSet) Add(ctx context.Context, vc valkey.Conn, member string) error

Add adds the given value

func (*IntervalSet) Clear

func (s *IntervalSet) Clear(ctx context.Context, vc valkey.Conn) error

Clear removes all values

func (*IntervalSet) IsMember

func (s *IntervalSet) IsMember(ctx context.Context, vc valkey.Conn, member string) (bool, error)

IsMember returns whether we contain the given value

func (*IntervalSet) Rem

func (s *IntervalSet) Rem(ctx context.Context, vc valkey.Conn, members ...string) error

Rem removes the given values

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL