Documentation
Constants
const (
	// DisableTimeLimit can be passed as Config.MaxDuration to disable the time limit.
	DisableTimeLimit = maxDuration

	// DisableCountLimit can be passed as Config.MaxCount to disable the count limit.
	DisableCountLimit = 0

	// ImmediateDelivery can be passed as Config.MaxCount along with DisableTimeLimit to enable immediate delivery.
	ImmediateDelivery = 1
)
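The following is a minimal sketch of how limit values like these could be interpreted. It does not use the package itself: the lowercase constants and the helpers `countLimitReached` and `timeLimitEnabled` are hypothetical stand-ins, and the value chosen for `disableTimeLimit` is an assumption, since the package's `maxDuration` is unexported.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Local stand-ins mirroring the documented constants. The value of
// disableTimeLimit is an assumption; the package's maxDuration is unexported.
const (
	disableTimeLimit  = time.Duration(math.MaxInt64)
	disableCountLimit = 0
	immediateDelivery = 1
)

// countLimitReached reports whether a buffer holding n events should be
// flushed under maxCount. Hypothetical helper, not part of the package API.
func countLimitReached(n, maxCount int) bool {
	if maxCount == disableCountLimit {
		return false // count limit disabled: never flush on count
	}
	return n >= maxCount
}

// timeLimitEnabled reports whether a flush timer is needed at all.
// Hypothetical helper, not part of the package API.
func timeLimitEnabled(maxDuration time.Duration) bool {
	return maxDuration != disableTimeLimit
}

func main() {
	fmt.Println(countLimitReached(1, immediateDelivery))    // true
	fmt.Println(countLimitReached(1000, disableCountLimit)) // false
	fmt.Println(timeLimitEnabled(disableTimeLimit))         // false
	fmt.Println(timeLimitEnabled(5 * time.Second))          // true
}
```

Under this reading, a count limit of 1 flushes after every event, which matches the documented meaning of ImmediateDelivery.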
Variables
This section is empty.
Functions
func NewStarter
func NewStarter[T any](conf Config[T], clock quartz.Clock) (start func(ctx context.Context) *Aggregator[T])
NewStarter initializes an aggregator that uses the given clock and returns a start function. Calling the start function with a context starts the aggregator and returns it.
Types
type Aggregator
type Aggregator[T any] struct {
	// contains filtered or unexported fields
}
Aggregator is used to aggregate events of any type. Events are accumulated until either the time limit specified by Config.MaxDuration or the count limit specified by Config.MaxCount is reached. When a limit is reached, the events are processed using the handler defined in Config.Handler. To ensure non-blocking handling of events, Config.QueueSize can be set, which is useful if event handling can be slower than event generation. Optionally, a Config.OnQueueFull callback can be specified to handle situations when the buffer is full.
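The batching behavior described above can be sketched without the package. The `batch` function below is a hypothetical, simplified stand-in, not the package's implementation: it flushes when `maxCount` events have accumulated or when `maxDuration` has passed since the first buffered event. Flushing leftover events when the input channel closes is an assumption of this sketch, as are the helper `collect` and the constant `longWait`.

```go
package main

import (
	"fmt"
	"time"
)

// longWait is long enough that the time limit never fires in this demo.
const longWait = time.Hour

// batch reads events from in and calls handler with the buffered events when
// maxCount events have accumulated (0 disables the count limit) or when
// maxDuration has elapsed since the first buffered event. Remaining events
// are flushed when in is closed (an assumption of this sketch).
func batch[T any](in <-chan T, maxCount int, maxDuration time.Duration, handler func([]T)) {
	var buf []T
	var timeout <-chan time.Time // nil while the buffer is empty

	flush := func() {
		if len(buf) > 0 {
			handler(buf)
			buf = nil // start a fresh slice so handed-off batches are not reused
		}
		timeout = nil
	}

	for {
		select {
		case ev, ok := <-in:
			if !ok {
				flush()
				return
			}
			if len(buf) == 0 {
				timeout = time.After(maxDuration)
			}
			buf = append(buf, ev)
			if maxCount > 0 && len(buf) >= maxCount {
				flush()
			}
		case <-timeout:
			flush()
		}
	}
}

// collect feeds events through batch and returns the batches the handler saw.
func collect(events []int, maxCount int, maxDuration time.Duration) [][]int {
	in := make(chan int)
	done := make(chan struct{})
	var batches [][]int
	go func() {
		defer close(done)
		batch(in, maxCount, maxDuration, func(b []int) { batches = append(batches, b) })
	}()
	for _, ev := range events {
		in <- ev
	}
	close(in)
	<-done
	return batches
}

func main() {
	fmt.Println(collect([]int{1, 2, 3, 4, 5}, 2, longWait)) // [[1 2] [3 4] [5]]
}
```

In this sketch the handler runs on the batching goroutine, so a slow handler delays further buffering; the documented Config.QueueSize exists to decouple those two steps.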
func StartNew
func StartNew[T any](ctx context.Context, conf Config[T]) *Aggregator[T]
StartNew creates and starts a new aggregator for the provided type and configuration.
func (*Aggregator[T]) OnEvent
func (a *Aggregator[T]) OnEvent(event T)
OnEvent adds a new event to be processed by the aggregator.
func (*Aggregator[T]) QueueLen
func (a *Aggregator[T]) QueueLen() int
QueueLen returns the number of aggregations that are queued for handling.
type Config
type Config[T any] struct {
	// MaxDuration is the max duration for an event to wait for delivery.
	// This limit can be disabled using DisableTimeLimit.
	MaxDuration time.Duration

	// MaxCount is the max number of events to be buffered.
	// This limit can be disabled using DisableCountLimit.
	// A value of 0 indicates no limit on buffering while a limit of 1 indicates immediate event delivery.
	MaxCount int

	// Handler is the callback for handling aggregated events.
	Handler func(events []T)

	// QueueSize represents max number of aggregations that can be queued for handling.
	QueueSize int

	// OnQueueFull is an optional callback to be called when the queue is full.
	OnQueueFull func(events []T)
}
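To illustrate how QueueSize and OnQueueFull might interact, here is a small sketch of a bounded, non-blocking hand-off queue. The `queue` type and its methods `offer` and `queueLen` are hypothetical and are not the package's internals. The documentation does not say what happens to an aggregation after OnQueueFull is called; this sketch assumes it is dropped.

```go
package main

import "fmt"

// queue is a hypothetical stand-in for a bounded hand-off between the code
// that builds aggregations and the code that handles them.
type queue[T any] struct {
	ch          chan []T
	onQueueFull func([]T) // optional; may be nil
}

// newQueue creates a queue that holds at most size pending aggregations.
func newQueue[T any](size int, onQueueFull func([]T)) *queue[T] {
	return &queue[T]{ch: make(chan []T, size), onQueueFull: onQueueFull}
}

// offer enqueues events without blocking. If the queue is full, it calls the
// optional callback and reports false; the aggregation is then dropped
// (an assumption of this sketch).
func (q *queue[T]) offer(events []T) bool {
	select {
	case q.ch <- events:
		return true
	default:
		if q.onQueueFull != nil {
			q.onQueueFull(events)
		}
		return false
	}
}

// queueLen returns the number of aggregations waiting to be handled.
func (q *queue[T]) queueLen() int {
	return len(q.ch)
}

func main() {
	var rejected [][]int
	q := newQueue[int](2, func(events []int) { rejected = append(rejected, events) })

	fmt.Println(q.offer([]int{1, 2}))   // true
	fmt.Println(q.offer([]int{3, 4}))   // true
	fmt.Println(q.offer([]int{5, 6}))   // false: the queue already holds 2 aggregations
	fmt.Println(q.queueLen(), rejected) // 2 [[5 6]]
}
```

The `select` with a `default` branch is what keeps the producer from blocking when the handler falls behind.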