Documentation
¶
Overview ¶
Package batching provides a batcher to collect points and emit them as batches.
Example (Batcher) ¶
package main
import (
"context"
"fmt"
"log"
"math/rand"
"time"
"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
)
// main runs the batching example and exits with a non-zero status if any step fails.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run demonstrates the two ways of using a Batcher: polling Ready/Emit
// manually (synchronous use) and letting the batcher hand full batches to an
// emit callback (asynchronous use).
//
// It returns the first error encountered. Errors are returned instead of
// calling log.Fatal in place so that the deferred client.Close always runs;
// log.Fatal exits the process without running deferred functions.
func run() (err error) {
	// pointCount is the number of points written in each half of the example.
	const pointCount = 54

	ctx := context.Background()

	// Create a random number generator with a fixed seed.
	r := rand.New(rand.NewSource(456))

	// Instantiate a client using your credentials.
	client, err := influxdb3.NewFromEnv()
	if err != nil {
		return err
	}
	// Close the client when finished. A close error is reported unless an
	// earlier error is already being returned.
	defer func() {
		if cerr := client.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	// Synchronous use

	// Create a Batcher with a size of 5.
	b := batching.NewBatcher(batching.WithSize(5))

	// Start in the past so that the points, spaced one second apart, end at
	// about the current time.
	t := time.Now().Add(-pointCount * time.Second)

	// Add the points to the batcher one at a time.
	for range pointCount {
		p := influxdb3.NewPoint("stat",
			map[string]string{"location": "Paris"},
			map[string]any{
				"temperature": 15 + r.Float64()*20,
				"humidity":    30 + r.Int63n(40),
			},
			t)

		// Add the point to the batcher.
		b.Add(p)

		// Advance the timestamp for the next point.
		t = t.Add(time.Second)

		// If the batcher is ready, write the batch to the client; Emit hands
		// over the batched points.
		if b.Ready() {
			if err := client.WritePoints(ctx, b.Emit()); err != nil {
				return err
			}
		}
	}

	// Write the final, partially filled batch to the client.
	if err := client.WritePoints(ctx, b.Emit()); err != nil {
		return err
	}

	// Asynchronous use

	// writeErr records a failed write from the emit callback. The batcher
	// waits for the callback to finish, so the value is checked right after
	// each Add.
	var writeErr error

	// Create a batcher with a size of 5, a ready callback and an emit callback
	// that writes the batch to the client.
	b = batching.NewBatcher(
		batching.WithSize(5),
		batching.WithReadyCallback(func() { fmt.Println("ready") }),
		batching.WithEmitCallback(func(points []*influxdb3.Point) {
			writeErr = client.WritePoints(ctx, points)
		}),
	)

	// Start in the past again, as in the synchronous part.
	t = time.Now().Add(-pointCount * time.Second)

	// Add the points to the batcher; full batches are written by the emit
	// callback.
	for range pointCount {
		p := influxdb3.NewPoint("stat",
			map[string]string{"location": "Madrid"},
			map[string]any{
				"temperature": 15 + r.Float64()*20,
				"humidity":    30 + r.Int63n(40),
			},
			t)

		// Add the point to the batcher.
		b.Add(p)
		if writeErr != nil {
			return writeErr
		}

		// Advance the timestamp for the next point.
		t = t.Add(time.Second)
	}

	// Write the final, partially filled batch to the client.
	return client.WritePoints(ctx, b.Emit())
}
Index ¶
Examples ¶
Constants ¶
const DefaultBatchSize = 1000
DefaultBatchSize is the default number of points emitted
const DefaultCapacity = 2 * DefaultBatchSize
DefaultCapacity is the default initial capacity of the point buffer
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batcher ¶
Batcher collects points and emits them as batches
func NewBatcher ¶
NewBatcher creates and initializes a new Batcher instance, applying the specified options. By default, the batch size is DefaultBatchSize and the initial capacity is DefaultCapacity.
type Option ¶
type Option func(*Batcher)
Option is a function that adapts the properties of a Batcher.
func WithCapacity ¶
WithCapacity changes the initial capacity of the points buffer
func WithEmitCallback ¶
WithEmitCallback sets the function that is called with the batch of points when a new batch is ready. The batcher waits for the callback to finish, so return as quickly as possible and move long-running processing to a goroutine.
func WithReadyCallback ¶
func WithReadyCallback(f func()) Option
WithReadyCallback sets the function that is called when a new batch is ready. The batcher waits for the callback to finish, so return as quickly as possible and move long-running processing to a goroutine.