Documentation ¶
Overview ¶
Package rater provides the functionality to calculate the processing rate of each vertex partition. The processing rate is calculated based on the metric forwarder_data_read_total.
Index ¶
- func CalculateLookback(counts []*TimestampedCounts, startIndex, endIndex int) int64
- func CalculatePending(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, ...) int64
- func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, ...) float64
- type Option
- type PodInfo
- type PodPendingCount
- type PodReadCount
- type PodTracker
- type PodTrackerOption
- type Ratable
- type Rater
- type TimestampedCounts
- func (tc *TimestampedCounts) PodPartitionCountSnapshot() map[string]map[string]float64
- func (tc *TimestampedCounts) PodTimestamp() int64
- func (tc *TimestampedCounts) String() string
- func (tc *TimestampedCounts) Update(podReadCount *PodReadCount)
- func (tc *TimestampedCounts) UpdatePending(podPendingCount *PodPendingCount)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CalculateLookback ¶ added in v1.6.0
func CalculateLookback(counts []*TimestampedCounts, startIndex, endIndex int) int64
CalculateLookback computes the maximum duration (in seconds) for which the count of messages processed across all the partitions of a given vertex remains unchanged. This helps determine how long the system should look back when calculating processing rates.
The function analyzes timestamped count data to find the longest period during which any partition had no change in its processed message count. This is useful for two scenarios:
1. Slow processing vertices - where a vertex takes a long time to process messages
2. Slow data sources - where data arrives infrequently, causing long gaps between count changes
The returned duration represents the maximum "unchanged period" across all partitions, which helps the system adjust its lookback window to ensure it captures enough data for accurate rate calculations.
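The scan described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the `snapshot` type and the `longestUnchanged` function are hypothetical stand-ins for `TimestampedCounts` and `CalculateLookback`, and the sketch assumes each snapshot carries a cumulative count per partition.

```go
package main

import "fmt"

// snapshot is a hypothetical stand-in for a timestamped count record:
// a Unix timestamp plus a cumulative read count per partition.
type snapshot struct {
	ts     int64
	counts map[string]float64
}

// longestUnchanged returns the longest span, in seconds, during which any
// partition's cumulative count stayed the same across consecutive snapshots.
func longestUnchanged(snaps []snapshot) int64 {
	type run struct {
		since int64   // timestamp at which the current value was first seen
		value float64 // value observed since then
	}
	runs := make(map[string]run)
	var longest int64
	for _, s := range snaps {
		for partition, c := range s.counts {
			r, seen := runs[partition]
			if !seen || c != r.value {
				// First sighting or a changed count: start a new run.
				runs[partition] = run{since: s.ts, value: c}
				continue
			}
			// Count is unchanged: extend the current run.
			if d := s.ts - r.since; d > longest {
				longest = d
			}
		}
	}
	return longest
}

func main() {
	snaps := []snapshot{
		{ts: 0, counts: map[string]float64{"p0": 10, "p1": 5}},
		{ts: 10, counts: map[string]float64{"p0": 20, "p1": 5}},
		{ts: 20, counts: map[string]float64{"p0": 30, "p1": 5}},
		{ts: 30, counts: map[string]float64{"p0": 40, "p1": 9}},
	}
	// p1 stays at 5 from ts=0 through ts=20, so this prints 20.
	fmt.Println(longestUnchanged(snaps))
}
```

In this sketch a single idle partition is enough to stretch the result, which matches the "maximum across all partitions" wording above.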
func CalculatePending ¶ added in v1.6.0
func CalculatePending(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) int64
CalculatePending calculates the number of pending messages for the given partition over the last lookbackSeconds seconds.
func CalculateRate ¶
func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) float64
CalculateRate calculates the processing rate of the given vertex partition over the last lookbackSeconds seconds.
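As a rough illustration of a lookback-window rate, the sketch below divides the change in a cumulative counter by the elapsed time inside the window. It is an assumption about the general technique, not the package's algorithm: the `sample` type and `rateOver` function are hypothetical, and counter resets (for example after a pod restart) are not handled.

```go
package main

import "fmt"

// sample is a hypothetical cumulative-counter reading.
type sample struct {
	ts    int64   // Unix seconds
	total float64 // cumulative messages read up to ts
}

// rateOver returns messages per second between the oldest sample that lies
// within lookback seconds of the newest sample and the newest sample itself.
// samples must be ordered by ascending timestamp.
func rateOver(samples []sample, lookback int64) float64 {
	if len(samples) < 2 {
		return 0 // not enough data to compute a rate
	}
	last := samples[len(samples)-1]
	start := len(samples) - 1
	// Walk backwards while the previous sample is still inside the window.
	for start > 0 && last.ts-samples[start-1].ts <= lookback {
		start--
	}
	first := samples[start]
	elapsed := last.ts - first.ts
	if elapsed <= 0 {
		return 0
	}
	return (last.total - first.total) / float64(elapsed)
}

func main() {
	samples := []sample{
		{ts: 0, total: 0},
		{ts: 10, total: 100},
		{ts: 20, total: 300},
		{ts: 30, total: 600},
	}
	fmt.Println(rateOver(samples, 20)) // window covers ts=10..30
	fmt.Println(rateOver(samples, 60)) // window covers all samples
}
```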
Types ¶
type PodInfo ¶ added in v1.7.0
type PodInfo struct {
// contains filtered or unexported fields
}
PodInfo represents the information about a pod that is used for tracking the processing rate.
type PodPendingCount ¶ added in v1.6.0
type PodPendingCount struct {
// contains filtered or unexported fields
}
func (*PodPendingCount) Name ¶ added in v1.6.0
func (p *PodPendingCount) Name() string
func (*PodPendingCount) PartitionPendingCounts ¶ added in v1.6.0
func (p *PodPendingCount) PartitionPendingCounts() map[string]float64
type PodReadCount ¶ added in v0.9.0
type PodReadCount struct {
// contains filtered or unexported fields
}
PodReadCount is a struct to maintain count of messages read from each partition by a pod
func (*PodReadCount) Name ¶ added in v0.9.0
func (p *PodReadCount) Name() string
func (*PodReadCount) PartitionReadCounts ¶ added in v0.9.0
func (p *PodReadCount) PartitionReadCounts() map[string]float64
type PodTracker ¶
type PodTracker struct {
// contains filtered or unexported fields
}
PodTracker maintains a set of active pods for a pipeline. It periodically sends HTTP requests to pods to check whether they are still active.
func NewPodTracker ¶
func NewPodTracker(ctx context.Context, p *v1alpha1.Pipeline, opts ...PodTrackerOption) *PodTracker
func (*PodTracker) GetActivePodsCount ¶ added in v0.10.0
func (pt *PodTracker) GetActivePodsCount() int
GetActivePodsCount returns the number of active pods.
func (*PodTracker) GetPodInfo ¶ added in v1.0.0
func (pt *PodTracker) GetPodInfo(key string) (*PodInfo, error)
func (*PodTracker) IsActive ¶ added in v0.10.0
func (pt *PodTracker) IsActive(podKey string) bool
IsActive returns true if the pod is active, false otherwise.
func (*PodTracker) LeastRecentlyUsed ¶ added in v0.10.0
func (pt *PodTracker) LeastRecentlyUsed() string
LeastRecentlyUsed returns the least recently used pod from the active pod list. If there are no active pods, it returns an empty string.
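The behavior documented for GetActivePodsCount, IsActive, and LeastRecentlyUsed can be pictured with a small ordered set. The sketch below is an assumption about one way to get those semantics, not PodTracker's actual internals: `activePods` and its methods are hypothetical names, and the type is not safe for concurrent use.

```go
package main

import (
	"container/list"
	"fmt"
)

// activePods is a hypothetical ordered set of pod keys.
type activePods struct {
	order *list.List               // front = least recently used
	index map[string]*list.Element // pod key -> position in order
}

func newActivePods() *activePods {
	return &activePods{order: list.New(), index: make(map[string]*list.Element)}
}

// touch marks a pod as most recently used, adding it if it is not present.
func (a *activePods) touch(key string) {
	if e, ok := a.index[key]; ok {
		a.order.MoveToBack(e)
		return
	}
	a.index[key] = a.order.PushBack(key)
}

// remove drops a pod that is no longer active.
func (a *activePods) remove(key string) {
	if e, ok := a.index[key]; ok {
		a.order.Remove(e)
		delete(a.index, key)
	}
}

// isActive reports whether the pod is currently in the set.
func (a *activePods) isActive(key string) bool {
	_, ok := a.index[key]
	return ok
}

// count returns the number of active pods.
func (a *activePods) count() int { return a.order.Len() }

// leastRecentlyUsed returns the oldest pod key, or "" if the set is empty.
func (a *activePods) leastRecentlyUsed() string {
	front := a.order.Front()
	if front == nil {
		return ""
	}
	return front.Value.(string)
}

func main() {
	pods := newActivePods()
	pods.touch("pod-a")
	pods.touch("pod-b")
	pods.touch("pod-c")
	pods.touch("pod-a") // pod-a becomes most recently used
	fmt.Println(pods.leastRecentlyUsed(), pods.count(), pods.isActive("pod-c"))
}
```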
type PodTrackerOption ¶
type PodTrackerOption func(*PodTracker)
func WithRefreshInterval ¶
func WithRefreshInterval(d time.Duration) PodTrackerOption
WithRefreshInterval sets how often to refresh the rate metrics.
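The signature `type PodTrackerOption func(*PodTracker)` follows Go's functional options pattern. The sketch below shows that pattern with local stand-ins so it runs on its own: `tracker`, `trackerOption`, `withRefreshInterval`, `newTracker`, and the 30-second default are all hypothetical and are not taken from the package.

```go
package main

import (
	"fmt"
	"time"
)

// defaultRefreshInterval is an illustrative default, not the package's value.
const defaultRefreshInterval = 30 * time.Second

// tracker is a hypothetical stand-in for PodTracker.
type tracker struct {
	refreshInterval time.Duration
}

// trackerOption mirrors the shape of PodTrackerOption.
type trackerOption func(*tracker)

// withRefreshInterval returns an option that overrides the refresh interval.
func withRefreshInterval(d time.Duration) trackerOption {
	return func(t *tracker) { t.refreshInterval = d }
}

// newTracker applies defaults first, then each option in order.
func newTracker(opts ...trackerOption) *tracker {
	t := &tracker{refreshInterval: defaultRefreshInterval}
	for _, opt := range opts {
		if opt != nil {
			opt(t)
		}
	}
	return t
}

func main() {
	fmt.Println(newTracker().refreshInterval)                                   // default
	fmt.Println(newTracker(withRefreshInterval(5 * time.Second)).refreshInterval) // overridden
}
```

With this pattern, callers that are happy with the defaults pass no options at all, and new settings can be added later without changing the constructor's signature.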
type Ratable ¶ added in v0.9.0
type Ratable interface {
Start(ctx context.Context) error
GetRates(vertexName, partitionName string) map[string]*wrapperspb.DoubleValue
GetPending(pipelineName, vertexName, vertexType, partitionName string) map[string]*wrapperspb.Int64Value
}
type Rater ¶
type Rater struct {
// contains filtered or unexported fields
}
Rater is a struct that maintains information about the processing rate of each vertex. It monitors the number of processed messages for each pod in a vertex and calculates the rate.
func (*Rater) GetPending ¶ added in v1.6.0
func (r *Rater) GetPending(pipelineName, vertexName, vertexType, partitionName string) map[string]*wrapperspb.Int64Value
GetPending returns the pending count for the vertex partition as a mapping from lookback seconds to pending count.
func (*Rater) GetRates ¶
func (r *Rater) GetRates(vertexName, partitionName string) map[string]*wrapperspb.DoubleValue
GetRates returns the processing rates of the vertex partition as a mapping from lookback seconds to rate.
type TimestampedCounts ¶
type TimestampedCounts struct {
// contains filtered or unexported fields
}
TimestampedCounts tracks the total count of processed messages for a list of pods at a given timestamp.
func NewTimestampedCounts ¶
func NewTimestampedCounts(t int64) *TimestampedCounts
func (*TimestampedCounts) PodPartitionCountSnapshot ¶ added in v0.9.0
func (tc *TimestampedCounts) PodPartitionCountSnapshot() map[string]map[string]float64
PodPartitionCountSnapshot returns a copy of podPartitionCount. Returning a copy ensures that the returned map is not modified by other goroutines.
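A copy only isolates the caller if the nested maps are copied too. The sketch below illustrates that idea with a hypothetical `counts` type guarded by a `sync.RWMutex`; the type, its field names, and its methods are assumptions made for illustration and do not describe how `TimestampedCounts` is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// counts is a hypothetical holder of per-pod, per-partition read counts.
type counts struct {
	mu   sync.RWMutex
	data map[string]map[string]float64 // pod name -> partition name -> count
}

func newCounts() *counts {
	return &counts{data: make(map[string]map[string]float64)}
}

// update stores a private copy of the partition counts for a pod.
func (c *counts) update(pod string, partitionCounts map[string]float64) {
	inner := make(map[string]float64, len(partitionCounts))
	for partition, v := range partitionCounts {
		inner[partition] = v
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[pod] = inner
}

// snapshot returns a deep copy, so callers can read or modify the result
// without racing against later updates.
func (c *counts) snapshot() map[string]map[string]float64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	out := make(map[string]map[string]float64, len(c.data))
	for pod, parts := range c.data {
		inner := make(map[string]float64, len(parts))
		for partition, v := range parts {
			inner[partition] = v
		}
		out[pod] = inner
	}
	return out
}

func main() {
	c := newCounts()
	c.update("pod-0", map[string]float64{"p0": 7})
	snap := c.snapshot()
	snap["pod-0"]["p0"] = 999 // mutating the snapshot leaves c untouched
	fmt.Println(c.snapshot()["pod-0"]["p0"])
}
```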
func (*TimestampedCounts) PodTimestamp ¶ added in v1.6.0
func (tc *TimestampedCounts) PodTimestamp() int64
func (*TimestampedCounts) String ¶ added in v0.10.0
func (tc *TimestampedCounts) String() string
String returns a string representation of the TimestampedCounts. It is used for debugging purposes.
func (*TimestampedCounts) Update ¶
func (tc *TimestampedCounts) Update(podReadCount *PodReadCount)
Update updates the count of processed messages for a pod.
func (*TimestampedCounts) UpdatePending ¶ added in v1.6.0
func (tc *TimestampedCounts) UpdatePending(podPendingCount *PodPendingCount)