rater

package
v1.7.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package rater provides the functionality to calculate the processing rate of each vertex partition. The processing rate is calculated based on the metric forwarder_data_read_total.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CalculateLookback added in v1.6.0

func CalculateLookback(counts []*TimestampedCounts, startIndex, endIndex int) int64

CalculateLookback computes the maximum duration (in seconds) for which the count of messages processed across all the partitions of a given vertex remain unchanged. This helps determine how long the system should look back when calculating processing rates.

The function analyzes timestamped count data to find the longest period during which any partition had no change in its processed message count. This is useful for two scenarios: 1. Slow processing vertices - where a vertex takes a long time to process messages 2. Slow data sources - where data arrives infrequently, causing long gaps between count changes

The returned duration represents the maximum "unchanged period" across all partitions, which helps the system adjust its lookback window to ensure it captures enough data for accurate rate calculations.

func CalculatePending added in v1.6.0

func CalculatePending(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) int64

CalculatePending calculates the pending messages for a given partition in the last lookback seconds

func CalculateRate

func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) float64

CalculateRate calculates the rate of the vertex partition in the last lookback seconds

Types

type Option

type Option func(*options)

func WithTaskInterval

func WithTaskInterval(duration time.Duration) Option

func WithWorkers

func WithWorkers(n int) Option

type PodInfo added in v1.7.0

type PodInfo struct {
	// contains filtered or unexported fields
}

PodInfo represents the information of a pod that is used for tracking the processing rate

type PodPendingCount added in v1.6.0

type PodPendingCount struct {
	// contains filtered or unexported fields
}

func (*PodPendingCount) Name added in v1.6.0

func (p *PodPendingCount) Name() string

func (*PodPendingCount) PartitionPendingCounts added in v1.6.0

func (p *PodPendingCount) PartitionPendingCounts() map[string]float64

type PodReadCount added in v0.9.0

type PodReadCount struct {
	// contains filtered or unexported fields
}

PodReadCount is a struct to maintain count of messages read from each partition by a pod

func (*PodReadCount) Name added in v0.9.0

func (p *PodReadCount) Name() string

func (*PodReadCount) PartitionReadCounts added in v0.9.0

func (p *PodReadCount) PartitionReadCounts() map[string]float64

type PodTracker

type PodTracker struct {
	// contains filtered or unexported fields
}

PodTracker maintains a set of active pods for a pipeline It periodically sends http requests to pods to check if they are still active

func NewPodTracker

func NewPodTracker(ctx context.Context, p *v1alpha1.Pipeline, opts ...PodTrackerOption) *PodTracker

func (*PodTracker) GetActivePodsCount added in v0.10.0

func (pt *PodTracker) GetActivePodsCount() int

GetActivePodsCount returns the number of active pods.

func (*PodTracker) GetPodInfo added in v1.0.0

func (pt *PodTracker) GetPodInfo(key string) (*PodInfo, error)

func (*PodTracker) IsActive added in v0.10.0

func (pt *PodTracker) IsActive(podKey string) bool

IsActive returns true if the pod is active, false otherwise.

func (*PodTracker) LeastRecentlyUsed added in v0.10.0

func (pt *PodTracker) LeastRecentlyUsed() string

LeastRecentlyUsed returns the least recently used pod from the active pod list. if there are no active pods, it returns an empty string.

func (*PodTracker) Start

func (pt *PodTracker) Start(ctx context.Context) error

type PodTrackerOption

type PodTrackerOption func(*PodTracker)

func WithRefreshInterval

func WithRefreshInterval(d time.Duration) PodTrackerOption

WithRefreshInterval sets how often to refresh the rate metrics.

type Ratable added in v0.9.0

type Ratable interface {
	Start(ctx context.Context) error
	GetRates(vertexName, partitionName string) map[string]*wrapperspb.DoubleValue
	GetPending(pipelineName, vertexName, vertexType, partitionName string) map[string]*wrapperspb.Int64Value
}

type Rater

type Rater struct {
	// contains filtered or unexported fields
}

Rater is a struct that maintains information about the processing rate of each vertex. It monitors the number of processed messages for each pod in a vertex and calculates the rate.

func NewRater

func NewRater(ctx context.Context, p *v1alpha1.Pipeline, opts ...Option) *Rater

func (*Rater) GetPending added in v1.6.0

func (r *Rater) GetPending(pipelineName, vertexName, vertexType, partitionName string) map[string]*wrapperspb.Int64Value

GetPending returns the pending count for the vertex partition in the format of lookback second to pending mappings

func (*Rater) GetRates

func (r *Rater) GetRates(vertexName, partitionName string) map[string]*wrapperspb.DoubleValue

GetRates returns the processing rates of the vertex partition in the format of lookback second to rate mappings

func (*Rater) Start

func (r *Rater) Start(ctx context.Context) error

type TimestampedCounts

type TimestampedCounts struct {
	// contains filtered or unexported fields
}

TimestampedCounts track the total count of processed messages for a list of pods at a given timestamp

func NewTimestampedCounts

func NewTimestampedCounts(t int64) *TimestampedCounts

func (*TimestampedCounts) PodPartitionCountSnapshot added in v0.9.0

func (tc *TimestampedCounts) PodPartitionCountSnapshot() map[string]map[string]float64

PodPartitionCountSnapshot returns a copy of podPartitionCount it's used to ensure the returned map is not modified by other goroutines

func (*TimestampedCounts) PodTimestamp added in v1.6.0

func (tc *TimestampedCounts) PodTimestamp() int64

func (*TimestampedCounts) String added in v0.10.0

func (tc *TimestampedCounts) String() string

String returns a string representation of the TimestampedCounts it's used for debugging purpose

func (*TimestampedCounts) Update

func (tc *TimestampedCounts) Update(podReadCount *PodReadCount)

Update updates the count of processed messages for a pod

func (*TimestampedCounts) UpdatePending added in v1.6.0

func (tc *TimestampedCounts) UpdatePending(podPendingCount *PodPendingCount)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL