batching

package
v0.13.0
Published: Oct 22, 2024 License: MIT Imports: 2 Imported by: 0

Documentation

Overview

Package batching provides a batcher to collect points and emit them as batches.

Example (Batcher)
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/InfluxCommunity/influxdb3-go/influxdb3"
	"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
)

func main() {
	// Create a random number generator
	r := rand.New(rand.NewSource(456))

	// Instantiate a client using your credentials.
	client, err := influxdb3.NewFromEnv()
	if err != nil {
		log.Fatal(err)
	}

	// Close the client when finished and raise any errors.
	defer func() {
		if err := client.Close(); err != nil {
			panic(err)
		}
	}()

	// Synchronous use

	// Create a Batcher with a size of 5
	b := batching.NewBatcher(batching.WithSize(5))

	// Start 54 seconds in the past so that each point gets its own one-second timestamp
	t := time.Now().Add(-54 * time.Second)

	// Write 54 points synchronously to the batcher
	for range 54 {
		p := influxdb3.NewPoint("stat",
			map[string]string{"location": "Paris"},
			map[string]any{
				"temperature": 15 + r.Float64()*20,
				"humidity":    30 + r.Int63n(40),
			},
			t)

		// Add the point to the batcher
		b.Add(p)
		// Update time
		t = t.Add(time.Second)

		// If the batcher is ready, write the batch to the client and reset the batcher
		if b.Ready() {
			err := client.WritePoints(context.Background(), b.Emit())
			if err != nil {
				log.Fatal(err)
			}
		}
	}

	// Write the final, partial batch to the client (54 is not a multiple of 5, so 4 points remain)
	err = client.WritePoints(context.Background(), b.Emit())
	if err != nil {
		log.Fatal(err)
	}

	// Asynchronous use

	// Create a batcher with a size of 5, a ready callback and an emit callback to write the batch to the client
	b = batching.NewBatcher(
		batching.WithSize(5),
		batching.WithReadyCallback(func() { fmt.Println("ready") }),
		batching.WithEmitCallback(func(points []*influxdb3.Point) {
			// Use a local err so the callback does not overwrite the outer variable
			err := client.WritePoints(context.Background(), points)
			if err != nil {
				log.Fatal(err)
			}
		}),
	)

	// Start 54 seconds in the past so that each point gets its own one-second timestamp
	t = time.Now().Add(-54 * time.Second)

	// Add 54 points to the batcher; the emit callback writes each full batch
	for range 54 {
		p := influxdb3.NewPoint("stat",
			map[string]string{"location": "Madrid"},
			map[string]any{
				"temperature": 15 + r.Float64()*20,
				"humidity":    30 + r.Int63n(40),
			},
			t)

		// Add the point to the batcher
		b.Add(p)
		// Update time
		t = t.Add(time.Second)
	}

	// Write the final batch to the client
	err = client.WritePoints(context.Background(), b.Emit())
	if err != nil {
		log.Fatal(err)
	}
}

Constants

const DefaultBatchSize = 1000

DefaultBatchSize is the default number of points emitted per batch.

const DefaultCapacity = 2 * DefaultBatchSize

DefaultCapacity is the default initial capacity of the point buffer.
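To illustrate why an initial capacity matters, here is a minimal sketch that assumes the point buffer behaves like an ordinary Go slice. That is an assumption, since the Batcher's fields are unexported. The helper `growths` is a hypothetical name, and `int` values stand in for points. It counts how often `append` has to reallocate while the buffer fills.

```go
package main

import "fmt"

// growths counts how many times append reallocates while adding n
// elements to a slice created with the given initial capacity.
func growths(initialCap, n int) int {
	buf := make([]int, 0, initialCap)
	count := 0
	for i := 0; i < n; i++ {
		before := cap(buf)
		buf = append(buf, i)
		if cap(buf) != before {
			count++ // the backing array was replaced by a larger one
		}
	}
	return count
}

func main() {
	fmt.Println(growths(2000, 2000))    // prints 0: the buffer never had to grow
	fmt.Println(growths(0, 2000) > 0)   // prints true: an empty buffer grows repeatedly
}
```

A capacity of twice the batch size leaves room for new points to arrive while a full batch is still waiting to be emitted.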

Types

type Batcher

type Batcher struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Batcher collects points and emits them as batches.

func NewBatcher

func NewBatcher(options ...Option) *Batcher

NewBatcher creates and initializes a new Batcher instance, applying the specified options. By default, the batch size is DefaultBatchSize and the initial capacity is DefaultCapacity.

func (*Batcher) Add

func (b *Batcher) Add(p ...*influxdb3.Point)

Add appends one or more points to the batcher and calls the configured callbacks, if any.

func (*Batcher) Emit

func (b *Batcher) Emit() []*influxdb3.Point

Emit returns the next batch of points: a full batch of the configured size if enough points are buffered, otherwise all remaining points. Call Emit once more at the end of your processing to drain any points that did not fill a complete batch.
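The emit-and-drain behavior can be modeled with a small, self-contained sketch. This is not the package's implementation: `miniBatcher`, its fields, and `batchLens` are hypothetical names, and plain `int` values stand in for `*influxdb3.Point`. It shows that each emit returns at most one batch and that a final call picks up the leftover points.

```go
package main

import "fmt"

// miniBatcher is a hypothetical stand-in for batching.Batcher.
// It buffers ints instead of *influxdb3.Point values.
type miniBatcher struct {
	size   int
	points []int
}

// add appends values to the buffer.
func (b *miniBatcher) add(p ...int) { b.points = append(b.points, p...) }

// ready reports whether at least one full batch is buffered.
func (b *miniBatcher) ready() bool { return len(b.points) >= b.size }

// emit returns up to size points and removes them from the buffer.
func (b *miniBatcher) emit() []int {
	n := b.size
	if len(b.points) < n {
		n = len(b.points)
	}
	batch := append([]int(nil), b.points[:n]...) // copy so the caller owns the batch
	b.points = b.points[n:]
	return batch
}

// batchLens feeds total values through a batcher of the given size and
// returns the length of every emitted batch, including the final partial one.
func batchLens(total, size int) []int {
	b := &miniBatcher{size: size}
	var lens []int
	for i := 0; i < total; i++ {
		b.add(i)
		if b.ready() {
			lens = append(lens, len(b.emit()))
		}
	}
	// Drain whatever did not fill a complete batch.
	if rest := b.emit(); len(rest) > 0 {
		lens = append(lens, len(rest))
	}
	return lens
}

func main() {
	fmt.Println(batchLens(54, 5)) // prints [5 5 5 5 5 5 5 5 5 5 4]
}
```

With 54 points and a batch size of 5, ten full batches are emitted inside the loop and the final drain returns the remaining 4 points.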

func (*Batcher) Ready

func (b *Batcher) Ready() bool

Ready reports whether a new batch is ready to be emitted.

type Option

type Option func(*Batcher)

Option is a function that adjusts the properties of a Batcher.
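Since Option is declared as `func(*Batcher)`, the package follows what is commonly called the functional options pattern. The sketch below illustrates that pattern in isolation; `config`, `option`, `withSize`, `withCapacity`, and `newConfig` are hypothetical names that only mirror the shape of the real API, and the defaults are copied from the constants documented here.

```go
package main

import "fmt"

// config is a hypothetical settings struct standing in for the
// Batcher's unexported fields.
type config struct {
	size     int
	capacity int
}

// option mirrors the shape of batching.Option: a function that mutates its target.
type option func(*config)

// withSize returns an option that sets the batch size.
func withSize(n int) option { return func(c *config) { c.size = n } }

// withCapacity returns an option that sets the initial buffer capacity.
func withCapacity(n int) option { return func(c *config) { c.capacity = n } }

// newConfig starts from the defaults and applies the options in order.
func newConfig(opts ...option) *config {
	c := &config{size: 1000, capacity: 2000}
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	c := newConfig(withSize(5))
	fmt.Println(c.size, c.capacity) // prints "5 2000"
}
```

Options that are not passed keep their defaults, which is why the constructor can take a variadic list instead of a long fixed parameter list.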

func WithCapacity

func WithCapacity(capacity int) Option

WithCapacity sets the initial capacity of the point buffer.

func WithEmitCallback

func WithEmitCallback(f func([]*influxdb3.Point)) Option

WithEmitCallback sets the function that is called with the batch of points whenever a new batch is ready. The batcher waits for the callback to finish, so return as quickly as possible and move long-running processing to a goroutine.
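One way to keep an emit callback short is to have it only enqueue the batch and let a separate goroutine do the slow work. The sketch below shows that hand-off using the standard library alone; `processAsync` and `onEmit` are hypothetical names, and `[]int` stands in for `[]*influxdb3.Point`. It is an illustration of the advice above, not code taken from the package.

```go
package main

import (
	"fmt"
	"sync"
)

// processAsync passes each batch to a worker goroutine through a channel
// and returns the number of points the worker handled.
func processAsync(batches [][]int) int {
	queue := make(chan []int, len(batches)) // buffered so the callback does not block
	var wg sync.WaitGroup
	total := 0

	wg.Add(1)
	go func() {
		defer wg.Done()
		for batch := range queue {
			total += len(batch) // stand-in for a slow write to the database
		}
	}()

	// onEmit has the shape of an emit callback: it enqueues and returns at once.
	onEmit := func(batch []int) { queue <- batch }

	for _, b := range batches {
		onEmit(b)
	}

	close(queue) // no more batches; lets the worker loop end
	wg.Wait()    // total is safe to read once the worker has finished
	return total
}

func main() {
	fmt.Println(processAsync([][]int{{1, 2, 3}, {4, 5}})) // prints 5
}
```

If the queue is bounded and fills up, the send inside the callback blocks and the batcher waits with it. That acts as backpressure, so the buffer size is a trade-off between memory use and how long the callback can stall.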

func WithReadyCallback

func WithReadyCallback(f func()) Option

WithReadyCallback sets the function that is called whenever a new batch is ready. The batcher waits for the callback to finish, so return as quickly as possible and move long-running processing to a goroutine.

func WithSize

func WithSize(size int) Option

WithSize sets the number of points in each batch emitted by the batcher.
