win

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2023 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrorKeyBehind is the package's sentinel error; its message is "Key is behind".
	// NOTE(review): presumably returned by Sliding.Add when the key is older
	// than what the window still accepts — confirm against the implementation.
	ErrorKeyBehind = errors.New("Key is behind")
)

Functions

func NewBuckets

func NewBuckets[K Ordered, V any]() *buckets[K, V]

Types

type Bucket

// Bucket groups the values stored under a single key.
type Bucket[K Ordered, V any] struct {
	// Key is the key this bucket belongs to.
	Key K

	// Mu is an exported read-write mutex kept alongside the values.
	// NOTE(review): presumably guards Value; callers touching Value directly
	// should likely hold it — confirm against Add/Values.
	Mu    sync.RWMutex
	// Value holds the values of the bucket.
	// NOTE(review): presumably appended to by Add and returned by Values — confirm.
	Value []V
}

Bucket holds a key together with the values collected under that key.

func (*Bucket[K, V]) Add

func (b *Bucket[K, V]) Add(v V)

Add adds a value to the bucket.

func (*Bucket[K, V]) Values

func (b *Bucket[K, V]) Values() []V

Values returns the values in the bucket.

type Ordered

// Ordered is an alias for constraints.Ordered; it is the constraint placed on
// the key type parameter K of Bucket, Result and Sliding.
type Ordered = constraints.Ordered

type Result

// Result describes one slide of the window.
type Result[K Ordered, V any] struct {
	// SlideOut is the bucket that slid out of the window.
	SlideOut      *Bucket[K, V]
	// CurrentWindow lists the buckets in the current window.
	CurrentWindow []*Bucket[K, V]
}

Result is the outcome of a slide: the bucket that slid out and the buckets in the current window.

type Sliding

type Sliding[K Ordered, V any] struct {

	// SlidedChan is the channel on which Results for slid-out buckets are received.
	SlidedChan chan Result[K, V] // slid-out buckets are sent here to inform watchers
	// contains filtered or unexported fields
}

Sliding is a sliding window.

Each granularity has one bucket. One window contains several buckets.

The window slides forward by granularity and slides out a bucket.

Example
package main

import (
	"log"
	"math/rand"
	"time"

	"github.com/smallnest/exp/stat/win"
)

// Metric is one sample recorded into the sliding window by this example.
type Metric struct {
	TimeStamp     int64  // sample time; the example stores time.Now().UnixNano()
	DestIP        string // destination IP the sample refers to
	Success, Fail int    // success and failure counters for the sample
}

// main demonstrates the sliding window: a background goroutine records a
// random Metric roughly once per millisecond, keyed by the current Unix
// second, while the main goroutine logs the totals of every bucket that
// slides out of the window.
func main() {
	// Arguments are window, granularity, delay and channel size.
	w, err := win.NewChanSize[int64, Metric](time.Second, time.Second, 5*time.Second, 100)
	if err != nil {
		log.Fatal(err)
	}

	// done tells the producer goroutine to exit. Closing it when main returns
	// ensures the goroutine is actually signalled; previously the channel was
	// never closed, so the stop branch below could never fire.
	done := make(chan struct{})
	defer close(done)

	// use the sliding window in your code for stat.
	go func() {
		for {
			// Non-blocking check for the stop signal before each sample.
			select {
			case <-done:
				return
			default:
			}

			ts := time.Now().UnixNano()
			key := ts / int64(time.Second) // nanoseconds -> whole seconds: one key per second
			addErr := w.Add(key, Metric{
				TimeStamp: ts,
				DestIP:    "192.168.1.1",
				Success:   rand.Intn(10000),
				Fail:      rand.Intn(10),
			})
			if addErr != nil {
				// Report rejected samples instead of silently dropping them.
				log.Printf("add key %d: %v", key, addErr)
			}

			time.Sleep(time.Millisecond)
		}
	}()

	for b := range w.SlidedChan {
		// A Result without a slid-out bucket ends the example.
		if b.SlideOut == nil {
			return
		}

		key := b.SlideOut.Key

		// Aggregate the counters of every sample in the slid-out bucket.
		var total, fail int
		for _, v := range b.SlideOut.Values() {
			total += v.Success
			fail += v.Fail
		}

		log.Printf("key: %s, total: %d, fail: %d, %d buckets in current window", time.Unix(key, 0).Format(time.DateTime), total, fail, len(b.CurrentWindow))
	}
}

func New

func New[K Ordered, V any](window, granularity, delay time.Duration) (*Sliding[K, V], error)

New creates a new sliding window.

func NewChanSize

func NewChanSize[K Ordered, V any](window, granularity, delay time.Duration, chanSize int) (*Sliding[K, V], error)

NewChanSize creates a new sliding window with a channel size.

func (*Sliding[K, V]) Add

func (s *Sliding[K, V]) Add(key K, v V) error

Add adds a value to the current window.

func (*Sliding[K, V]) ForceForward

func (s *Sliding[K, V]) ForceForward()

ForceForward forces the window to slide forward once.

func (*Sliding[K, V]) Last

func (s *Sliding[K, V]) Last() *Bucket[K, V]

Last returns the last bucket.

func (*Sliding[_, _]) Stop

func (s *Sliding[_, _]) Stop()

Stop stops the sliding window.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL