signalutils

Version: v1.12.2
Warning

This package is not in the latest version of its module.

Published: Aug 1, 2020 License: MIT Imports: 7 Imported by: 4

README

signalutils

Event/signal processing utilities library for Go: online moving averager, linear regression, timed values, worker frequency control and more.

See API documentation at https://pkg.go.dev/github.com/flaviostutz/signalutils?tab=doc

Usage

package main
import (
	"fmt"
	"github.com/flaviostutz/signalutils"
)

func main() {
	fmt.Printf("Moving Average\n")
	ma := signalutils.NewMovingAverage(5)
	ma.AddSample(0.00)
	ma.AddSample(99999.00)
	fmt.Printf("Average is %f\n", ma.Average())
	ma.AddSample(1000.00)
	ma.AddSample(2000.00)
	fmt.Printf("Average is %f\n", ma.Average())
	ma.AddSample(3000.00)
	ma.AddSample(4000.00)
	fmt.Printf("Average is %f\n", ma.Average())
	ma.AddSample(5000.00)
	ma.AddSample(6000.00)
	fmt.Printf("Average is %f\n", ma.Average())
}

Results

Moving Average
Average is 49999.500000
Average is 25749.750000
Average is 21999.800000
Average is 4000.000000

Utilities

  • MovingAverage - add values to an array with a fixed max size (or, as in this example, a fixed time window) and query the average of the values it currently holds
	ma := NewMovingAverageTimeWindow(1*time.Second, 10)
	ma.AddSample(1000)
	ma.AddSample(2000)
	ma.AddSample(3000)
	ma.AddSample(4000)
	ma.AddSample(3000)
	ma.AddSample(2000)
	ma.AddSample(3000)
	ma.AddSample(2000)
	assert.Equal(t, 1000.0, ma.Average())
	time.Sleep(100 * time.Millisecond)
	ma.AddSample(3000)
	assert.Equal(t, 2000.0, ma.Average())
	ma.AddSample(2000)
	assert.Equal(t, 2000.0, ma.Average())
  • SchmittTrigger - set current values and track the current up/down state using the Schmitt trigger algorithm
	st, _ := NewSchmittTrigger(10, 20, false)
	assert.False(t, st.IsUpperRange())
	st.SetCurrentValue(11)
	assert.False(t, st.IsUpperRange())
	st.SetCurrentValue(15)
	assert.False(t, st.IsUpperRange())

	st.SetCurrentValue(21)
	assert.True(t, st.IsUpperRange())
	st.SetCurrentValue(25)
	assert.True(t, st.IsUpperRange())
	st.SetCurrentValue(18)
	assert.True(t, st.IsUpperRange())
  • DynamicSchmittTrigger - set current values and track the current up/down state using the Schmitt trigger algorithm. Trigger points are adjusted dynamically over time.

  • StateTracker - set state identifications and, once a state has enough successive repetitions, perform a state transition. Useful for filtering noise out of state changes.

	st := NewStateTracker("state1", 3, onChange, 0, nil, true)
	st.SetTransientState("state2")
	st.SetTransientState("state2")
	assert.Equal(t, "state1", st.CurrentState.Name)
	st.SetTransientState("state2")
	assert.Equal(t, "state2", notifiedNewState.Name)
	st.SetTransientState("state3")
	assert.Equal(t, "state2", st.CurrentState.Name)
	st.SetTransientState("state3")
	st.SetTransientState("state2")
	st.SetTransientState("state3")
	st.SetTransientState("state3")
	assert.Equal(t, "state2", st.CurrentState.Name)
  • Timeseries - time/value array with a max time span to keep its size under control. If you request a value between two time points, it will be interpolated.
	ts := NewTimeseries(1000 * time.Millisecond)

	ts.Add(-100)
	time.Sleep(500 * time.Millisecond)
	ts.Add(-1000)

	nv, ok := ts.Get(time.Now().Add(-250 * time.Millisecond))
	assert.True(t, ok)
	assert.InDeltaf(t, float64(-555), nv.Value, float64(20), "")
  • TimeseriesCounterRate - add counter values to a timeseries and query the rate over any time range. It resembles "rate(metric_name[1m])" in Prometheus queries, for example.
	ts := NewTimeseriesCounterRate(1 * time.Second)

	_, ok := ts.Rate(1 * time.Second)
	assert.False(t, ok)

	ts.Inc(100000) //100000
	time.Sleep(300 * time.Millisecond)
	ts.Inc(100000) //200000
	time.Sleep(300 * time.Millisecond)
	ts.Inc(200000) //400000
	time.Sleep(300 * time.Millisecond)
	ts.Inc(100000) //500000
	time.Sleep(300 * time.Millisecond)

	_, ok = ts.Rate(2 * time.Second)
	assert.False(t, ok)
  • Worker - useful for workloads that run in a "while true" loop. It launches a goroutine with a function, limits the loop frequency, measures the actual frequency and alerts if the frequency is outside the desired limits.
	w := StartWorker(context.Background(), "test1", func() error {
		//do some real work here
		time.Sleep(15 * time.Millisecond)
		return nil
	}, 1, 5, true) // minFreq=1 is an assumed value; maxFreq=5
	time.Sleep(200 * time.Millisecond)
	assert.True(t, w.active)
	time.Sleep(2000 * time.Millisecond)
	assert.InDeltaf(t, 5, w.CurrentFreq, 2, "")
	assert.InDeltaf(t, 15, w.CurrentStepTime.Milliseconds(), 5, "")
	w.Stop()
	time.Sleep(300 * time.Millisecond)
	assert.False(t, w.active)

Documentation

Types

type DynamicSchmittTrigger

type DynamicSchmittTrigger struct {
	// contains filtered or unexported fields
}

DynamicSchmittTrigger adjusts its internal lower and upper limits according to a moving average of the observed values. Only initialize this with NewDynamicSchmittTriggerTimeWindow(..).

func NewDynamicSchmittTriggerTimeWindow

func NewDynamicSchmittTriggerTimeWindow(minMaxMovingAverageTime time.Duration, maxMovingAverageSamples int, groupByMinMaxSamples int, ignoreSamplesTooDifferentRatio float64, minMaxUpperLowerRatio float64, upperRange bool) (DynamicSchmittTrigger, error)

NewDynamicSchmittTriggerTimeWindow creates a new dynamic Schmitt trigger.
minMaxMovingAverageTime - time window of the moving average that defines the min/max values for the Schmitt trigger
maxMovingAverageSamples - max number of samples in the min/max averager. If samples are set at too high a rate, some may be ignored
groupByMinMaxSamples - number of signal samples used to calculate each min/max sampling
ignoreSamplesTooDifferentRatio - if SetCurrentValue sets a value that is too high or too low relative to the min/max moving average, it is ignored
minMaxUpperLowerRatio - 1.0 places the lower and upper limits exactly at the min/max moving average, which is not very practical. A number between 0.3 and 0.7 is good here

func (*DynamicSchmittTrigger) GetLowerUpperLimits

func (s *DynamicSchmittTrigger) GetLowerUpperLimits() (float64, float64)

GetLowerUpperLimits gets the current lower and upper limits for this Schmitt trigger

func (*DynamicSchmittTrigger) IsUpperRange

func (s *DynamicSchmittTrigger) IsUpperRange() bool

IsUpperRange returns whether this trigger is in the upper range (true) or the lower range (false)

func (*DynamicSchmittTrigger) SetCurrentValue

func (s *DynamicSchmittTrigger) SetCurrentValue(value float64) (bool, float64)

SetCurrentValue sets the current value and calculates whether it is in the upper or lower range. It returns:

1 - true or false, depending on whether the value was accepted by the internal moving averager (rate not too high)

2 - how far the current value is from the lower limit (if in the 'upperRange' state) or from the upper limit (if in the 'lowerRange' state) for a new change to occur in the trigger, expressed as a ratio of the max-min range

type MovingAverage

type MovingAverage struct {
	Size    int
	Samples []float64
	// contains filtered or unexported fields
}

MovingAverage is a running moving averager. Only initialize this with NewMovingAverage(..).

func NewMovingAverage

func NewMovingAverage(size int) MovingAverage

NewMovingAverage creates a new moving averager with a fixed size

func NewMovingAverageTimeWindow

func NewMovingAverageTimeWindow(samplesDuration time.Duration, maxSamples int) MovingAverage

NewMovingAverageTimeWindow creates a new moving averager that averages samples no older than 'samplesDuration', limiting the number of samples in the time window to 'maxSamples'. If two consecutive samples are added to the averager less than samplesDuration/maxSamples apart, the second one is ignored.

func (*MovingAverage) AddSample

func (m *MovingAverage) AddSample(value float64) bool

AddSample adds a new sample to the moving average. If there are more than 'size' samples, the oldest sample is removed. If this is a timed window averager and the last sample was added less than samplesDuration/maxSamples ago, the new sample is ignored.
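The time-window rule described above can be sketched in a few lines. This is a minimal standalone illustration, not the library's implementation: the names `timeWindowAverage`, `addAt` and `epoch` are hypothetical, timestamps are passed in explicitly so the behaviour is deterministic, and `maxSamples` is assumed to be greater than zero.

```go
package main

import (
	"fmt"
	"time"
)

// epoch is an arbitrary fixed start time used by the example.
var epoch = time.Date(2020, 8, 1, 0, 0, 0, 0, time.UTC)

type timedSample struct {
	at    time.Time
	value float64
}

// timeWindowAverage is a hypothetical averager limited by sample age and rate.
type timeWindowAverage struct {
	window      time.Duration
	minInterval time.Duration // window / maxSamples
	samples     []timedSample
}

// newTimeWindowAverage assumes maxSamples > 0.
func newTimeWindowAverage(window time.Duration, maxSamples int) *timeWindowAverage {
	return &timeWindowAverage{window: window, minInterval: window / time.Duration(maxSamples)}
}

// addAt rejects a sample that arrives sooner than minInterval after the last
// accepted one, then drops samples older than the window.
func (a *timeWindowAverage) addAt(now time.Time, v float64) bool {
	if n := len(a.samples); n > 0 && now.Sub(a.samples[n-1].at) < a.minInterval {
		return false
	}
	a.samples = append(a.samples, timedSample{at: now, value: v})
	cutoff := now.Add(-a.window)
	i := 0
	for i < len(a.samples) && a.samples[i].at.Before(cutoff) {
		i++
	}
	a.samples = a.samples[i:]
	return true
}

// average returns the mean of the held samples, or 0 when there are none.
func (a *timeWindowAverage) average() float64 {
	if len(a.samples) == 0 {
		return 0
	}
	sum := 0.0
	for _, s := range a.samples {
		sum += s.value
	}
	return sum / float64(len(a.samples))
}

func main() {
	a := newTimeWindowAverage(time.Second, 10) // min interval is 100ms
	fmt.Println(a.addAt(epoch, 1000))                           // true
	fmt.Println(a.addAt(epoch.Add(10*time.Millisecond), 2000))  // false
	fmt.Println(a.addAt(epoch.Add(100*time.Millisecond), 3000)) // true
	fmt.Println(a.average())                                    // 2000
}
```

With a one-second window and ten samples, anything arriving within 100ms of the last accepted sample is dropped, which is why the README test sees an average of 1000 after a burst of eight calls.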

func (*MovingAverage) AddSampleIfNearAverage

func (m *MovingAverage) AddSampleIfNearAverage(value float64, avgDiff float64) bool

AddSampleIfNearAverage adds the sample only if its value is near the current average, to keep spurious samples out of the average. An 'avgDiff' of 1 means samples between [-currentAvg, +currentAvg] will be accepted. Returns true if the sample was accepted.

func (*MovingAverage) Average

func (m *MovingAverage) Average() float64

Average computes average with current samples in fixed length list
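As a rough illustration of the fixed-size behaviour, the sketch below keeps at most `size` samples and averages whatever is currently held. It is a standalone approximation under that assumption, not the library's code, and the name `windowAverage` is hypothetical.

```go
package main

import "fmt"

// windowAverage is a hypothetical fixed-size moving averager.
type windowAverage struct {
	size    int
	samples []float64
}

func newWindowAverage(size int) *windowAverage {
	return &windowAverage{size: size}
}

// add appends a sample and drops the oldest one once more than size are held.
func (w *windowAverage) add(v float64) {
	w.samples = append(w.samples, v)
	if len(w.samples) > w.size {
		w.samples = w.samples[1:]
	}
}

// average returns the mean of the held samples, or 0 when there are none.
func (w *windowAverage) average() float64 {
	if len(w.samples) == 0 {
		return 0
	}
	sum := 0.0
	for _, v := range w.samples {
		sum += v
	}
	return sum / float64(len(w.samples))
}

func main() {
	ma := newWindowAverage(5)
	for _, v := range []float64{0, 99999, 1000, 2000, 3000, 4000, 5000, 6000} {
		ma.add(v)
	}
	// Only 2000, 3000, 4000, 5000 and 6000 remain in the window.
	fmt.Printf("Average is %f\n", ma.average()) // Average is 4000.000000
}
```

The input values are the ones from the README usage example, so the final average matches its last printed result.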

func (*MovingAverage) AverageMinMax

func (m *MovingAverage) AverageMinMax(groupBySamples int) (float64, float64)

AverageMinMax returns the min/max values in the current window. It groups samples every 'groupBySamples', takes the min/max of each group and averages these values to produce the resulting min and max.

func (*MovingAverage) Reset

func (m *MovingAverage) Reset()

Reset internal samples

type SchmittTrigger

type SchmittTrigger struct {
	LowerLimit float64
	UpperLimit float64
	UpperRange bool
	// contains filtered or unexported fields
}

SchmittTrigger utility. Only initialize this with NewSchmittTrigger(..).

func NewSchmittTrigger

func NewSchmittTrigger(lowerLimit float64, upperLimit float64, upperRange bool) (SchmittTrigger, error)

NewSchmittTrigger creates a new Schmitt trigger

func (*SchmittTrigger) IsUpperRange

func (s *SchmittTrigger) IsUpperRange() bool

IsUpperRange returns whether the trigger is in the upper range or not

func (*SchmittTrigger) SetCurrentValue

func (s *SchmittTrigger) SetCurrentValue(value float64)

SetCurrentValue sets the current value and calculates whether it is in the upper or lower range

func (*SchmittTrigger) UpdateLowerUpperLimits

func (s *SchmittTrigger) UpdateLowerUpperLimits(lowerLimit float64, upperLimit float64)

UpdateLowerUpperLimits changes current lower/upper limits for schmitt trigger
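For readers unfamiliar with the technique, a Schmitt trigger is a threshold with hysteresis: the state only flips to "upper" above the upper limit and back to "lower" below the lower limit, so values inside the band leave the state unchanged. The sketch below is a standalone illustration with hypothetical names (`hysteresis`, `set`); whether the library treats values exactly equal to a limit as crossing it is an assumption here.

```go
package main

import "fmt"

// hysteresis is a hypothetical minimal Schmitt trigger.
type hysteresis struct {
	lower, upper float64
	high         bool
}

func newHysteresis(lower, upper float64, high bool) *hysteresis {
	return &hysteresis{lower: lower, upper: upper, high: high}
}

// set updates the state for a new value and returns the resulting state.
// Values between lower and upper (inclusive, by assumption) keep the state.
func (h *hysteresis) set(v float64) bool {
	if v > h.upper {
		h.high = true
	} else if v < h.lower {
		h.high = false
	}
	return h.high
}

func main() {
	h := newHysteresis(10, 20, false)
	for _, v := range []float64{11, 15, 21, 25, 18, 9} {
		fmt.Printf("%v -> upper range: %v\n", v, h.set(v))
	}
}
```

The first five values are the ones used in the README example: 11 and 15 stay in the lower range, 21 switches to the upper range, and 18 stays there because it has not dropped below the lower limit.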

type State added in v1.3.0

type State struct {
	Name         string
	Start        time.Time
	Stop         *time.Time
	Data         interface{}
	Level        *float64
	HighestLevel *float64
	HighestTime  *time.Time
	HighestData  interface{}
}

State event struct

type StateTracker

type StateTracker struct {
	CurrentState   *State
	CandidateState string
	CandidateCount int
	// contains filtered or unexported fields
}

StateTracker state transition tracker

func NewStateTracker

func NewStateTracker(initialState string, changeConfirmations int, onChange func(*State, *State), unchangedTimer time.Duration, onUnchanged func(*State), resetHighestOnunchanged bool) *StateTracker

NewStateTracker instantiates a new state transition tracker.
initialState - states are simply strings; a different string denotes a new state
changeConfirmations - number of sequential state samples with a different state before transitioning
onChange - listener function that will be called on state transition, e.g. func(newState, previousState) {}. A nil value disables this
unchangedTimer - after this time without a state change, 'onUnchanged' will be invoked recurrently. The current highest sample is calculated based on this time slice
onUnchanged - listener function invoked if the state has not changed after 'unchangedTimer', called as onUnchanged(state). A nil value disables this feature
resetHighestOnunchanged - calculate the highest level over the whole state duration (false) or only within each recurrent 'onUnchanged' timer period (true)

func (*StateTracker) Close

func (s *StateTracker) Close()

Close closes the internal timers for notifying unchanged

func (*StateTracker) SetTransientState

func (s *StateTracker) SetTransientState(stateName string) (*State, error)

SetTransientState sets a transient state on the tracker so that it can detect a possible transition if this state recurs. Returns the time this state started.

func (*StateTracker) SetTransientStateWithData

func (s *StateTracker) SetTransientStateWithData(stateName string, level float64, data interface{}) (*State, error)

SetTransientStateWithData sets a transient state on the tracker so that it can detect a possible transition if this state recurs. 'data' is any value that will be sent to the listener function. Returns the current state count.
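The confirmation logic can be pictured as a debouncer: a candidate state must be observed a number of times in a row before it replaces the current state. The sketch below is a standalone approximation with hypothetical names (`debouncer`, `set`). The reset rules (an observation of the current state or of a different candidate restarts the count) are an assumption chosen to reproduce the README example, not a statement about the library's internals.

```go
package main

import "fmt"

// debouncer is a hypothetical state tracker that needs 'confirmations'
// successive observations of a new state before switching to it.
type debouncer struct {
	current       string
	candidate     string
	count         int
	confirmations int
}

func newDebouncer(initial string, confirmations int) *debouncer {
	return &debouncer{current: initial, confirmations: confirmations}
}

// set records one observation and returns true when it caused a transition.
func (d *debouncer) set(name string) bool {
	if name == d.current {
		// Seeing the current state again discards any pending candidate.
		d.candidate, d.count = "", 0
		return false
	}
	if name != d.candidate {
		// A different candidate restarts the count.
		d.candidate, d.count = name, 0
	}
	d.count++
	if d.count < d.confirmations {
		return false
	}
	d.current = name
	d.candidate, d.count = "", 0
	return true
}

func main() {
	d := newDebouncer("state1", 3)
	seq := []string{"state2", "state2", "state2", "state3", "state3", "state2", "state3", "state3"}
	for _, s := range seq {
		d.set(s)
	}
	fmt.Println(d.current) // state2
}
```

The sequence is the one from the README: three successive "state2" observations trigger the transition, while "state3" never reaches three in a row and is filtered out as noise.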

type StepFunc added in v1.5.0

type StepFunc func() error

StepFunc function interface for the application that will be called in a loop

type TimeValue added in v1.3.0

type TimeValue struct {
	Time  time.Time
	Value float64
}

TimeValue a point in time

type Timeseries added in v1.3.0

type Timeseries struct {
	TimeseriesSpan time.Duration
	Values         []TimeValue
	// contains filtered or unexported fields
}

Timeseries utility. Only initialize this with NewTimeseries(..).

func NewTimeseries added in v1.3.0

func NewTimeseries(maxTimeseriesSpan time.Duration) Timeseries

NewTimeseries creates a new timeseries with a limited size in time. Beyond that limit, older values are deleted from time to time to avoid excessive memory usage.

func (*Timeseries) Add added in v1.7.0

func (t *Timeseries) Add(value float64)

Add adds a new sample to this timeseries using time.Now()

func (*Timeseries) AddWithTime added in v1.9.0

func (t *Timeseries) AddWithTime(value float64, when time.Time) error

AddWithTime adds a new sample to the head of this timeseries. 'when' must be after the last element (no middle insertions allowed).

func (*Timeseries) Avg added in v1.7.0

func (t *Timeseries) Avg(from time.Time, to time.Time) (value float64, ok bool)

Avg calculates the average value of the points comprised between times 'from' and 'to'. No interpolation is used here.

func (*Timeseries) Get added in v1.7.0

func (t *Timeseries) Get(time time.Time) (tv TimeValue, ok bool)

Get gets the value at a specific time in the timeseries. If the time is between two points inside the timeseries, the value is interpolated according to the requested time and the neighboring values. If the time is before or after the timeseries points, ok is false.
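The interpolation described above can be sketched as plain linear interpolation between the two neighboring points. This is a standalone illustration that assumes the points are sorted by time; the names `point`, `valueAt` and `epoch` are hypothetical and the library may handle edge cases differently.

```go
package main

import (
	"fmt"
	"time"
)

// epoch is an arbitrary fixed start time used by the example.
var epoch = time.Date(2020, 8, 1, 0, 0, 0, 0, time.UTC)

type point struct {
	t time.Time
	v float64
}

// valueAt linearly interpolates the value at 'at'. It reports false when
// 'at' lies outside the time range covered by points (sorted ascending).
func valueAt(points []point, at time.Time) (float64, bool) {
	if len(points) == 0 || at.Before(points[0].t) || at.After(points[len(points)-1].t) {
		return 0, false
	}
	for i := 1; i < len(points); i++ {
		p1, p2 := points[i-1], points[i]
		if at.After(p2.t) {
			continue
		}
		span := p2.t.Sub(p1.t)
		if span == 0 {
			return p2.v, true
		}
		frac := float64(at.Sub(p1.t)) / float64(span)
		return p1.v + frac*(p2.v-p1.v), true
	}
	// Only reached with a single point whose time equals 'at'.
	return points[0].v, true
}

func main() {
	points := []point{
		{t: epoch, v: -100},
		{t: epoch.Add(500 * time.Millisecond), v: -1000},
	}
	v, ok := valueAt(points, epoch.Add(250*time.Millisecond))
	fmt.Println(v, ok) // -550 true
}
```

Halfway between -100 and -1000 the interpolated value is -550, which is consistent with the README test accepting roughly -555 within a tolerance of 20.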

func (*Timeseries) Last added in v1.7.0

func (t *Timeseries) Last() (tv TimeValue, ok bool)

Last gets the last point in time, the head element

func (*Timeseries) LinearRegression added in v1.8.0

func (t *Timeseries) LinearRegression(from time.Time, to time.Time) (alpha float64, beta float64, rsquared float64)

LinearRegression calculates the linear regression coefficients for the time range. x is in the range of time.UnixNano(). Returns alpha and beta as in y = alpha + beta*x, and rsquared as the goodness of fit, from 0 to 1.
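For reference, an ordinary least squares fit of y = alpha + beta*x can be computed as below. This is a generic sketch, not the library's code: the function name `linearRegression` is hypothetical, and it takes plain float64 x values (for example, seconds since the first point) rather than nanosecond timestamps, which is an assumption made to keep the arithmetic easy to follow.

```go
package main

import "fmt"

// linearRegression fits y = alpha + beta*x by ordinary least squares and
// returns the coefficient of determination as rsquared.
// ok is false with fewer than two points or when all x values are equal.
func linearRegression(xs, ys []float64) (alpha, beta, rsquared float64, ok bool) {
	if len(xs) < 2 || len(xs) != len(ys) {
		return 0, 0, 0, false
	}
	n := float64(len(xs))
	var sumX, sumY float64
	for i := range xs {
		sumX += xs[i]
		sumY += ys[i]
	}
	meanX, meanY := sumX/n, sumY/n

	var sxx, sxy, syy float64
	for i := range xs {
		dx, dy := xs[i]-meanX, ys[i]-meanY
		sxx += dx * dx
		sxy += dx * dy
		syy += dy * dy
	}
	if sxx == 0 {
		return 0, 0, 0, false
	}
	beta = sxy / sxx
	alpha = meanY - beta*meanX
	if syy == 0 {
		// All y values are equal: the horizontal line fits them exactly.
		return alpha, beta, 1, true
	}
	rsquared = (sxy * sxy) / (sxx * syy)
	return alpha, beta, rsquared, true
}

func main() {
	xs := []float64{0, 1, 2, 3}
	ys := []float64{1, 3, 5, 7}
	alpha, beta, r2, ok := linearRegression(xs, ys)
	fmt.Printf("alpha=%.2f beta=%.2f r2=%.2f ok=%v\n", alpha, beta, r2, ok)
	// alpha=1.00 beta=2.00 r2=1.00 ok=true
}
```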

func (*Timeseries) Pos added in v1.7.0

func (t *Timeseries) Pos(time time.Time) (i1 int, i2 int, ok bool)

Pos searches for the two point indexes that surround the desired time. If the time is exactly the same as a point's time, the two returned indexes will be equal.

func (*Timeseries) Reset added in v1.3.0

func (t *Timeseries) Reset()

Reset removes all elements from this timeseries

func (*Timeseries) Size added in v1.3.0

func (t *Timeseries) Size() int

Size current number of elements in this timeseries

func (*Timeseries) StdDev added in v1.8.0

func (t *Timeseries) StdDev(from time.Time, to time.Time) (std float64, mean float64)

StdDev calculates the standard deviation and mean for the time range. Returns the standard deviation and the mean value.
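As a quick reference for the statistic itself, the sketch below computes the mean and the population standard deviation (dividing by n) of a slice of values. Whether the library uses the population or the sample formula is not stated in this documentation, so the choice here is an assumption, and the function name `meanStdDev` is hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// meanStdDev returns the mean and the population standard deviation
// (dividing by n, an assumption) of values. ok is false for an empty slice.
func meanStdDev(values []float64) (mean, std float64, ok bool) {
	if len(values) == 0 {
		return 0, 0, false
	}
	n := float64(len(values))
	for _, v := range values {
		mean += v
	}
	mean /= n

	var sumSq float64
	for _, v := range values {
		d := v - mean
		sumSq += d * d
	}
	return mean, math.Sqrt(sumSq / n), true
}

func main() {
	mean, std, ok := meanStdDev([]float64{2, 4, 4, 4, 5, 5, 7, 9})
	fmt.Printf("mean=%.2f std=%.2f ok=%v\n", mean, std, ok)
	// mean=5.00 std=2.00 ok=true
}
```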

func (*Timeseries) ValuesRange added in v1.8.0

func (t *Timeseries) ValuesRange(from time.Time, to time.Time) (timeValues []TimeValue, values []float64)

ValuesRange gets the values in the time range. Returns an array of TimeValue and an array with just the float values.

type TimeseriesCounterRate added in v1.3.0

type TimeseriesCounterRate struct {
	Timeseries Timeseries
	// contains filtered or unexported fields
}

TimeseriesCounterRate is a utility for storing counter values over time while enabling the measurement of rates over various time spans without having to average over all points. The optimization strategy relies on the fact that this timeseries contains a counter, so averages between two times are calculated from just the first and last points, not all the points in between. Very useful for metrics monitoring. See more at https://prometheus.io/docs/concepts/metric_types/#counter
Only initialize this with NewTimeseriesCounterRate(..).

func NewTimeseriesCounterRate added in v1.3.0

func NewTimeseriesCounterRate(timeseriesSpan time.Duration) TimeseriesCounterRate

NewTimeseriesCounterRate creates a timeseries with a max time span of timeseriesSpan

func (*TimeseriesCounterRate) Inc added in v1.3.0

func (t *TimeseriesCounterRate) Inc(value float64) error

Inc increments the last value from the timeseries by 'value' and adds the new point with the time.Now() time

func (*TimeseriesCounterRate) Rate added in v1.3.0

func (t *TimeseriesCounterRate) Rate(timeSpan time.Duration) (float64, bool)

Rate calculates the rate of change between the last point in time of this timeseries and the time in past, specified by timeSpan
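Because the stored values form a monotonically increasing counter, a rate over a span needs only the counter value at its start and at its end. The sketch below shows that idea in isolation. The names `counterPoint`, `rate` and `epoch` are hypothetical, the result is expressed per second by assumption, and the library may additionally interpolate the counter at the exact span boundaries.

```go
package main

import (
	"fmt"
	"time"
)

// epoch is an arbitrary fixed start time used by the example.
var epoch = time.Date(2020, 8, 1, 0, 0, 0, 0, time.UTC)

// counterPoint is a hypothetical counter reading at a point in time.
type counterPoint struct {
	at    time.Time
	total float64
}

// rate returns the counter increase per second between first and last.
// ok is false when no time has elapsed or the counter went backwards.
func rate(first, last counterPoint) (perSecond float64, ok bool) {
	elapsed := last.at.Sub(first.at).Seconds()
	if elapsed <= 0 || last.total < first.total {
		return 0, false
	}
	return (last.total - first.total) / elapsed, true
}

func main() {
	first := counterPoint{at: epoch, total: 100000}
	last := counterPoint{at: epoch.Add(2 * time.Second), total: 500000}
	fmt.Println(rate(first, last)) // 200000 true
}
```

Intermediate readings never enter the calculation, which is what makes the query cheap regardless of how many points fall inside the span.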

func (*TimeseriesCounterRate) RateOverTime added in v1.9.0

func (t *TimeseriesCounterRate) RateOverTime(rateLen time.Duration, timeseriesSpan time.Duration) (ts Timeseries, ok bool)

RateOverTime calculates a new Timeseries of rates, in which each value is a rate over 'rateLen'. The resulting timeseries has a total length of 'timeseriesSpan'. For each point in the counter timeseries, the counter rate is calculated and added to the resulting Timeseries.

func (*TimeseriesCounterRate) RateRange added in v1.9.0

func (t *TimeseriesCounterRate) RateRange(from time.Time, to time.Time) (float64, bool)

RateRange calculates the rate of change in the date range

func (*TimeseriesCounterRate) Set added in v1.4.0

func (t *TimeseriesCounterRate) Set(value float64) error

Set sets the absolute value at time time.Now(). The value cannot be less than the last value in the timeseries, as this must be a counter.

type Worker added in v1.5.0

type Worker struct {
	CurrentFreq     float64
	CurrentStepTime time.Duration
	// contains filtered or unexported fields
}

Worker is a utility for launching goroutines that loop over a function. The max frequency of calls to this function is limited and the actual frequency is measured.

func StartWorker added in v1.5.0

func StartWorker(ctx context.Context, name string, step StepFunc, minFreq float64, maxFreq float64, stopOnErr bool) *Worker

StartWorker launches a goroutine that loops over the step function, limited by maxFreq. If the function runs at a frequency lower than minFreq, a logrus.Debug log will report it. This happens when the function is too slow.
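One simple way to cap a loop's frequency is to measure how long each step took and sleep for the remainder of the period 1/maxFreq. The sketch below illustrates that approach on its own. It is not the library's implementation: `pauseFor` and `runLoop` are hypothetical names, and frequency measurement and minFreq logging are left out.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pauseFor returns how long to wait after a step that took stepTime so that
// steps run at most maxFreq times per second. A non-positive maxFreq is
// treated as "no limit" (an assumption of this sketch).
func pauseFor(stepTime time.Duration, maxFreq float64) time.Duration {
	if maxFreq <= 0 {
		return 0
	}
	period := time.Duration(float64(time.Second) / maxFreq)
	if stepTime >= period {
		return 0
	}
	return period - stepTime
}

// runLoop calls step repeatedly, limited to maxFreq, until ctx is done or
// step fails while stopOnErr is set.
func runLoop(ctx context.Context, step func() error, maxFreq float64, stopOnErr bool) error {
	for {
		started := time.Now()
		if err := step(); err != nil && stopOnErr {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(pauseFor(time.Since(started), maxFreq)):
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	steps := 0
	err := runLoop(ctx, func() error {
		steps++ // real work would go here
		return nil
	}, 50, true)
	fmt.Println("steps:", steps, "stopped by:", err)
}
```

With a 15ms step and a limit of 5 calls per second, `pauseFor` yields 185ms, so each iteration takes 200ms in total. When a step takes longer than the period, the pause drops to zero and the loop simply runs below the cap.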
