Documentation
¶
Overview ¶
Package batching provides a batcher to collect points and emit them as batches.
Example (Batcher) ¶
package main
import (
"context"
"fmt"
"log"
"math/rand"
"time"
"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
)
func main() {
// Create a random number generator
r := rand.New(rand.NewSource(456))
// Instantiate a client using your credentials.
client, err := influxdb3.NewFromEnv()
if err != nil {
log.Fatal(err)
}
// Close the client when finished and raise any errors.
defer client.Close()
// Synchronous use
// Create a Batcher with a size of 5
b := batching.NewBatcher(batching.WithSize(5))
// Simulate delay of a second
t := time.Now().Add(-54 * time.Second)
// Write 54 points synchronously to the batcher
for range 54 {
p := influxdb3.NewPoint("stat",
map[string]string{"location": "Paris"},
map[string]any{
"temperature": 15 + r.Float64()*20,
"humidity": 30 + r.Int63n(40),
},
t)
// Add the point to the batcher
b.Add(p)
// Update time
t = t.Add(time.Second)
// If the batcher is ready, write the batch to the client and reset the batcher
if b.Ready() {
err := client.WritePoints(context.Background(), b.Emit())
if err != nil {
log.Fatal(err)
}
}
}
// Write the final batch to the client
err = client.WritePoints(context.Background(), b.Emit())
if err != nil {
panic(err)
}
// Asynchronous use
// Create a batcher with a size of 5, a ready callback and an emit callback to write the batch to the client
b = batching.NewBatcher(
batching.WithSize(5),
batching.WithReadyCallback(func() { fmt.Println("ready") }),
batching.WithEmitCallback(func(points []*influxdb3.Point) {
err = client.WritePoints(context.Background(), points)
if err != nil {
log.Fatal(err)
}
}),
)
// Simulate delay of a second
t = time.Now().Add(-54 * time.Second)
// Write 54 points synchronously to the batcher
for range 54 {
p := influxdb3.NewPoint("stat",
map[string]string{"location": "Madrid"},
map[string]any{
"temperature": 15 + r.Float64()*20,
"humidity": 30 + r.Int63n(40),
},
t)
// Add the point to the batcher
b.Add(p)
// Update time
t = t.Add(time.Second)
}
// Write the final batch to the client
err = client.WritePoints(context.Background(), b.Emit())
if err != nil {
log.Fatal(err)
}
}
Example (LineProtocol_batcher) ¶
package main
import (
"context"
"fmt"
"log"
"math/rand"
"time"
"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
)
func main() {
// Create a random number generator
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
// initialize data
dataTemplate := "cpu,host=%s load=%.3f,reg=%d %d"
syncHosts := []string{"r2d2", "c3po", "robbie"}
const recordCount = 200
var wErr error
// Instantiate a client using your credentials.
client, err := influxdb3.NewFromEnv()
if err != nil {
log.Fatal(err)
}
defer func(client *influxdb3.Client) {
err = client.Close()
if err != nil {
log.Fatal(err)
}
}(client)
// SYNCHRONOUS USAGE
// create a new Line Protocol Batcher with a batch size of 4096 bytes
slpb := batching.NewLPBatcher(batching.WithBufferSize(4096)) // Set buffer size
// Simulate delay of a second
t := time.Now().Add(-recordCount * time.Second)
// create and emit records
for range recordCount {
slpb.Add(fmt.Sprintf(dataTemplate,
syncHosts[rnd.Intn(len(syncHosts))],
rnd.Float64()*150,
rnd.Intn(32),
t))
t = t.Add(time.Second)
if slpb.Ready() {
wErr = client.Write(context.Background(), slpb.Emit())
if wErr != nil {
log.Fatal(wErr)
}
}
}
// write any remaining records in batcher to client
wErr = client.Write(context.Background(), slpb.Emit())
if wErr != nil {
log.Fatal(wErr)
}
// ASYNCHRONOUS USAGE
asyncHosts := []string{"Z80", "C64", "i8088"}
// create a new Line Protocol Batcher with a batch size of 4096 bytes
// ... a callback to handle when ready state reached and
// ... a callback to handle emits of bytes
alpb := batching.NewLPBatcher(batching.WithBufferSize(4096),
batching.WithByteEmitReadyCallback(func() { fmt.Println("ready") }),
batching.WithEmitBytesCallback(func(bytes []byte) {
wErr := client.Write(context.Background(), bytes)
if wErr != nil {
log.Fatal(wErr)
}
}))
// Simulate delay of a second
t = time.Now().Add(-recordCount * time.Second)
// create and add data to the batcher
for range recordCount {
alpb.Add(fmt.Sprintf(dataTemplate,
asyncHosts[rnd.Intn(len(asyncHosts))],
rnd.Float64()*150,
rnd.Intn(32),
t))
// update time
t = t.Add(time.Second)
}
// write any remaining records in batcher to client
wErr = client.Write(context.Background(), alpb.Emit())
if wErr != nil {
log.Fatal(wErr)
}
}
Index ¶
- Constants
- type Batcher
- func (b *Batcher) Add(p ...*influxdb3.Point)
- func (b *Batcher) CurrentLoadSize() int
- func (b *Batcher) Emit() []*influxdb3.Point
- func (b *Batcher) Flush() []*influxdb3.Point
- func (b *Batcher) Ready() bool
- func (b *Batcher) SetCapacity(c int)
- func (b *Batcher) SetEmitCallback(f func([]*influxdb3.Point))
- func (b *Batcher) SetReadyCallback(f func())
- func (b *Batcher) SetSize(s int)
- type ByteEmittable
- type Emittable
- type LPBatcher
- func (lpb *LPBatcher) Add(lines ...string)
- func (lpb *LPBatcher) CurrentLoadSize() int
- func (lpb *LPBatcher) Emit() []byte
- func (lpb *LPBatcher) Flush() []byte
- func (lpb *LPBatcher) Ready() bool
- func (lpb *LPBatcher) SetCapacity(c int)
- func (lpb *LPBatcher) SetEmitBytesCallback(f func([]byte))
- func (lpb *LPBatcher) SetReadyCallback(f func())
- func (lpb *LPBatcher) SetSize(s int)
- type LPOption
- type Option
- type PointEmittable
Examples ¶
Constants ¶
const DefaultBatchSize = 1000
DefaultBatchSize is the default number of points emitted
const DefaultBufferCapacity = DefaultByteBatchSize * 2
const DefaultByteBatchSize = 100000
const DefaultCapacity = 2 * DefaultBatchSize
DefaultCapacity is the default initial capacity of the point buffer
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batcher ¶
Batcher collects points and emits them as batches
func NewBatcher ¶
NewBatcher creates and initializes a new Batcher instance applying the specified options. By default, a batch-size is DefaultBatchSize and the initial capacity is DefaultCapacity.
func (*Batcher) CurrentLoadSize ¶ added in v0.14.0
func (*Batcher) Emit ¶
Emit returns a new batch of points with the provided batch size or with the remaining points. Please drain the points at the end of your processing to get the remaining points not filling up a batch.
func (*Batcher) Flush ¶ added in v0.14.0
Flush drains all points even if the internal buffer is currently larger than size. It does not call the callbackEmit method
func (*Batcher) SetCapacity ¶ added in v0.14.0
SetCapacity sets the initial Capacity of the internal []*influxdb3.Point buffer.
func (*Batcher) SetEmitCallback ¶ added in v0.14.0
SetEmitCallback sets the callbackEmit function.
func (*Batcher) SetReadyCallback ¶ added in v0.14.0
func (b *Batcher) SetReadyCallback(f func())
SetReadyCallback sets the callbackReady function.
type ByteEmittable ¶ added in v0.14.0
type ByteEmittable interface {
Emittable
SetEmitBytesCallback(ebcb func([]byte)) // callback for emitting bytes
}
ByteEmittable provides the basis for a type Emitting line protocol data as a byte array (i.e. []byte).
type Emittable ¶ added in v0.14.0
type Emittable interface {
SetSize(s int) // setsize
SetCapacity(c int) // set capacity
SetReadyCallback(rcb func()) // ready Callback
}
Emittable provides the base for any type that will collect and then emit data upon reaching a ready state.
type LPBatcher ¶ added in v0.14.0
LPBatcher collects line protocol strings storing them to a byte buffer and then emitting them as []byte.
func NewLPBatcher ¶ added in v0.14.0
NewLPBatcher creates and initializes a new LPBatcher instance applying the supplied options. By default a batch size is DefaultByteBatchSize and the initial capacity is the DefaultBufferCapacity.
func (*LPBatcher) Add ¶ added in v0.14.0
Add lines to the buffer and call appropriate callbacks when the ready state is reached.
func (*LPBatcher) CurrentLoadSize ¶ added in v0.14.0
CurrentLoadSize returns the current size of the internal buffer
func (*LPBatcher) Emit ¶ added in v0.14.0
Emit returns a new batch of bytes with upto to the provided batch size depending on when the last newline character in the potential batch is met, or with all the remaining bytes. Please drain the bytes at the end of your processing to get the remaining bytes not filling up a batch.
func (*LPBatcher) Flush ¶ added in v0.14.0
Flush drains all bytes even if buffer currently larger than size
func (*LPBatcher) SetCapacity ¶ added in v0.14.0
SetCapacity sets the initial capacity of the internal buffer
func (*LPBatcher) SetEmitBytesCallback ¶ added in v0.14.0
SetEmitBytesCallback sets the callbackByteEmit function
func (*LPBatcher) SetReadyCallback ¶ added in v0.14.0
func (lpb *LPBatcher) SetReadyCallback(f func())
SetReadyCallback sets the ReadyCallback function
type LPOption ¶ added in v0.14.0
type LPOption func(ByteEmittable)
func WithBufferCapacity ¶ added in v0.14.0
WithBufferCapacity changes the initial capacity of the internal buffer The unit is byte
func WithBufferSize ¶ added in v0.14.0
WithBufferSize changes the batch-size emitted by the LPbatcher The unit is byte
func WithByteEmitReadyCallback ¶ added in v0.14.0
func WithByteEmitReadyCallback(f func()) LPOption
WithByteEmitReadyCallback sets the function called when a new batch is ready. The batcher will wait for the callback to finish, so please return as fast as possible and move long-running processing to a go-routine.
func WithEmitBytesCallback ¶ added in v0.14.0
WithEmitBytesCallback sets the function called when a new batch is ready with the batch bytes. The batcher will wait for the callback to finish, so please return as quickly as possible and move any long-running processing to a go routine.
type Option ¶
type Option func(PointEmittable)
func WithCapacity ¶
WithCapacity changes the initial capacity of the internal buffer
func WithEmitCallback ¶
WithEmitCallback sets the function called when a new batch is ready with the batch of points. The batcher will wait for the callback to finish, so please return as fast as possible and move long-running processing to a go-routine.
func WithReadyCallback ¶
func WithReadyCallback(f func()) Option
WithReadyCallback sets the function called when a new batch is ready. The batcher will wait for the callback to finish, so please return as fast as possible and move long-running processing to a go-routine.
type PointEmittable ¶ added in v0.14.0
type PointEmittable interface {
Emittable
SetEmitCallback(epcb func([]*influxdb3.Point)) // callback for emitting points
}
PointEmittable provides the basis for any type emitting Point arrays as []*influxdb3.Point