aggregator

package module
v0.0.0-...-92edfa0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2025 License: MIT Imports: 4 Imported by: 0

README

Aggregator

An aggregator is used to aggregate events of any type until either the time limit or the count limit is reached. It supports configuring a non-blocking behavior and may be combined with a static pool for increased concurrency.

Usage

Configuration
  • MaxDuration: Maximum duration for an event to wait for delivery.
  • MaxCount: Maximum number of events to be buffered.
  • Handler: Callback function for handling aggregated events.
  • QueueSize: Maximum number of aggregations that can be queued for handling.
  • OnQueueFull: Optional callback function to be called when the queue is full.
Example
package main

import (
	"context"
	"fmt"
	"github.com/gavraz/async/aggregator"
	"sync"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	conf := aggregator.Config[int]{
		MaxDuration: 5 * time.Second,
		MaxCount:    10,
		Handler: func(events []int) {
			fmt.Println("Handling events:", events)
		},
		QueueSize: 5,
		OnQueueFull: func(events []int) {
			fmt.Println("Queue full, dropping events:", events)
		},
	}

	agg := aggregator.StartNew(ctx, conf)

	for i := 0; i < 20; i++ {
		go func(event int) {
			agg.OnEvent(event)
		}(i)
	}

	time.Sleep(time.Second)

	// output:
	// Handling events: [2 1 3 4 5 6 0 8 7 9]
	// Handling events: [19 10 11 12 13 14 15 16 17 18]
}

Documentation

Index

Constants

View Source
const (
	// DisableTimeLimit can be passed as Config.MaxDuration to disable the time limit.
	DisableTimeLimit = maxDuration
	// DisableCountLimit can be passed as Config.MaxCount to disable the count limit.
	DisableCountLimit = 0
	// ImmediateDelivery can be passed as Config.MaxCount along with DisableTimeLimit to enable immediate delivery.
	ImmediateDelivery = 1
)

Variables

This section is empty.

Functions

func NewStarter

func NewStarter[T any](conf Config[T], clock quartz.Clock) (start func(ctx context.Context) *Aggregator[T])

NewStarter initiates an aggregator and returns a starter function to start and get the aggregator.

Types

type Aggregator

type Aggregator[T any] struct {
	// contains filtered or unexported fields
}

Aggregator is used to aggregate events of any type. Events are accumulated until either the time limit specified by Config.MaxDuration or the count limit specified by Config.MaxCount is reached. When a limit is reached, the events are processed using the handler defined in Config.Handler. To ensure non-blocking handling of events, Config.QueueSize can be set, which is useful if event handling can be slower than event generation. Optionally, a Config.OnQueueFull callback can be specified to handle situations when the buffer is full.

func StartNew

func StartNew[T any](ctx context.Context, conf Config[T]) *Aggregator[T]

StartNew initiates and starts a new aggregator for the provided type and configurations.

func (*Aggregator[T]) OnEvent

func (a *Aggregator[T]) OnEvent(event T)

OnEvent adds a new event to be processed by the aggregator.

func (*Aggregator[T]) QueueLen

func (a *Aggregator[T]) QueueLen() int

QueueLen returns the number of aggregations that are queued for handling.

type Config

type Config[T any] struct {
	// MaxDuration is the max duration for an event to wait for delivery.
	// This limit can be disabled using DisableTimeLimit.
	MaxDuration time.Duration
	// MaxCount is the max number of events to be buffered.
	// This limit can be disabled using DisableCountLimit.
	// A value of indicates no limit on buffering while a limit of 1 indicates immediate event delivery.
	MaxCount int
	// Handler is the callback for handling aggregated events.
	Handler func(events []T)
	// QueueSize represents max number of aggregations that can be queued for handling.
	QueueSize int
	// OnQueueFull is an optional callback to be called when the queue is full.
	OnQueueFull func(events []T)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL