batching

package
v0.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 11, 2024 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package batching provides a batcher to collect points and emit them as batches.

Example (Batcher)
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/InfluxCommunity/influxdb3-go/influxdb3"
	"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
)

func main() {
	// Create a random number generator
	r := rand.New(rand.NewSource(456))

	// Instantiate a client using your credentials.
	client, err := influxdb3.NewFromEnv()
	if err != nil {
		log.Fatal(err)
	}

	// Close the client when finished and raise any errors.
	defer client.Close()

	// Synchronous use

	// Create a Batcher with a size of 5
	b := batching.NewBatcher(batching.WithSize(5))

	// Simulate delay of a second
	t := time.Now().Add(-54 * time.Second)

	// Write 54 points synchronously to the batcher
	for range 54 {
		p := influxdb3.NewPoint("stat",
			map[string]string{"location": "Paris"},
			map[string]any{
				"temperature": 15 + r.Float64()*20,
				"humidity":    30 + r.Int63n(40),
			},
			t)

		// Add the point to the batcher
		b.Add(p)
		// Update time
		t = t.Add(time.Second)

		// If the batcher is ready, write the batch to the client and reset the batcher
		if b.Ready() {
			err := client.WritePoints(context.Background(), b.Emit())
			if err != nil {
				log.Fatal(err)
			}
		}
	}

	// Write the final batch to the client
	err = client.WritePoints(context.Background(), b.Emit())
	if err != nil {
		panic(err)
	}

	// Asynchronous use

	// Create a batcher with a size of 5, a ready callback and an emit callback to write the batch to the client
	b = batching.NewBatcher(
		batching.WithSize(5),
		batching.WithReadyCallback(func() { fmt.Println("ready") }),
		batching.WithEmitCallback(func(points []*influxdb3.Point) {
			err = client.WritePoints(context.Background(), points)
			if err != nil {
				log.Fatal(err)
			}
		}),
	)

	// Simulate delay of a second
	t = time.Now().Add(-54 * time.Second)

	// Write 54 points synchronously to the batcher
	for range 54 {
		p := influxdb3.NewPoint("stat",
			map[string]string{"location": "Madrid"},
			map[string]any{
				"temperature": 15 + r.Float64()*20,
				"humidity":    30 + r.Int63n(40),
			},
			t)

		// Add the point to the batcher
		b.Add(p)
		// Update time
		t = t.Add(time.Second)
	}

	// Write the final batch to the client
	err = client.WritePoints(context.Background(), b.Emit())
	if err != nil {
		log.Fatal(err)
	}
}
Example (LineProtocol_batcher)
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/InfluxCommunity/influxdb3-go/influxdb3"
	"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
)

func main() {
	// Create a random number generator
	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

	// initialize data
	dataTemplate := "cpu,host=%s load=%.3f,reg=%d %d"
	syncHosts := []string{"r2d2", "c3po", "robbie"}
	const recordCount = 200

	var wErr error

	// Instantiate a client using your credentials.
	client, err := influxdb3.NewFromEnv()
	if err != nil {
		log.Fatal(err)
	}
	defer func(client *influxdb3.Client) {
		err = client.Close()
		if err != nil {
			log.Fatal(err)
		}
	}(client)

	// SYNCHRONOUS USAGE
	// create a new Line Protocol Batcher with a batch size of 4096 bytes
	slpb := batching.NewLPBatcher(batching.WithBufferSize(4096)) // Set buffer size

	// Simulate delay of a second
	t := time.Now().Add(-recordCount * time.Second)

	// create and emit records
	for range recordCount {
		slpb.Add(fmt.Sprintf(dataTemplate,
			syncHosts[rnd.Intn(len(syncHosts))],
			rnd.Float64()*150,
			rnd.Intn(32),
			t))

		t = t.Add(time.Second)

		if slpb.Ready() {
			wErr = client.Write(context.Background(), slpb.Emit())
			if wErr != nil {
				log.Fatal(wErr)
			}
		}
	}

	// write any remaining records in batcher to client
	wErr = client.Write(context.Background(), slpb.Emit())
	if wErr != nil {
		log.Fatal(wErr)
	}

	// ASYNCHRONOUS USAGE
	asyncHosts := []string{"Z80", "C64", "i8088"}
	// create a new Line Protocol Batcher with a batch size of 4096 bytes
	// ... a callback to handle when ready state reached and
	// ... a callback to handle emits of bytes
	alpb := batching.NewLPBatcher(batching.WithBufferSize(4096),
		batching.WithByteEmitReadyCallback(func() { fmt.Println("ready") }),
		batching.WithEmitBytesCallback(func(bytes []byte) {
			wErr := client.Write(context.Background(), bytes)
			if wErr != nil {
				log.Fatal(wErr)
			}
		}))

	// Simulate delay of a second
	t = time.Now().Add(-recordCount * time.Second)

	// create and add data to the batcher
	for range recordCount {
		alpb.Add(fmt.Sprintf(dataTemplate,
			asyncHosts[rnd.Intn(len(asyncHosts))],
			rnd.Float64()*150,
			rnd.Intn(32),
			t))

		// update time
		t = t.Add(time.Second)
	}

	// write any remaining records in batcher to client
	wErr = client.Write(context.Background(), alpb.Emit())
	if wErr != nil {
		log.Fatal(wErr)
	}
}

Index

Examples

Constants

View Source
const DefaultBatchSize = 1000

DefaultBatchSize is the default number of points emitted

View Source
const DefaultBufferCapacity = DefaultByteBatchSize * 2
View Source
const DefaultByteBatchSize = 100000
View Source
const DefaultCapacity = 2 * DefaultBatchSize

DefaultCapacity is the default initial capacity of the point buffer

Variables

This section is empty.

Functions

This section is empty.

Types

type Batcher

type Batcher struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Batcher collects points and emits them as batches

func NewBatcher

func NewBatcher(options ...Option) *Batcher

NewBatcher creates and initializes a new Batcher instance applying the specified options. By default, a batch-size is DefaultBatchSize and the initial capacity is DefaultCapacity.

func (*Batcher) Add

func (b *Batcher) Add(p ...*influxdb3.Point)

Add metric(s) to the batcher and call the given callbacks if any

func (*Batcher) CurrentLoadSize added in v0.14.0

func (b *Batcher) CurrentLoadSize() int

func (*Batcher) Emit

func (b *Batcher) Emit() []*influxdb3.Point

Emit returns a new batch of points with the provided batch size or with the remaining points. Please drain the points at the end of your processing to get the remaining points not filling up a batch.

func (*Batcher) Flush added in v0.14.0

func (b *Batcher) Flush() []*influxdb3.Point

Flush drains all points even if the internal buffer is currently larger than size. It does not call the callbackEmit method

func (*Batcher) Ready

func (b *Batcher) Ready() bool

Ready tells the call if a new batch is ready to be emitted

func (*Batcher) SetCapacity added in v0.14.0

func (b *Batcher) SetCapacity(c int)

SetCapacity sets the initial Capacity of the internal []*influxdb3.Point buffer.

func (*Batcher) SetEmitCallback added in v0.14.0

func (b *Batcher) SetEmitCallback(f func([]*influxdb3.Point))

SetEmitCallback sets the callbackEmit function.

func (*Batcher) SetReadyCallback added in v0.14.0

func (b *Batcher) SetReadyCallback(f func())

SetReadyCallback sets the callbackReady function.

func (*Batcher) SetSize added in v0.14.0

func (b *Batcher) SetSize(s int)

SetSize sets the batch size. Units are Points.

type ByteEmittable added in v0.14.0

type ByteEmittable interface {
	Emittable
	SetEmitBytesCallback(ebcb func([]byte)) // callback for emitting bytes
}

ByteEmittable provides the basis for a type Emitting line protocol data as a byte array (i.e. []byte).

type Emittable added in v0.14.0

type Emittable interface {
	SetSize(s int)               // setsize
	SetCapacity(c int)           // set capacity
	SetReadyCallback(rcb func()) // ready Callback
}

Emittable provides the base for any type that will collect and then emit data upon reaching a ready state.

type LPBatcher added in v0.14.0

type LPBatcher struct {
	sync.Mutex
	// contains filtered or unexported fields
}

LPBatcher collects line protocol strings storing them to a byte buffer and then emitting them as []byte.

func NewLPBatcher added in v0.14.0

func NewLPBatcher(options ...LPOption) *LPBatcher

NewLPBatcher creates and initializes a new LPBatcher instance applying the supplied options. By default a batch size is DefaultByteBatchSize and the initial capacity is the DefaultBufferCapacity.

func (*LPBatcher) Add added in v0.14.0

func (lpb *LPBatcher) Add(lines ...string)

Add lines to the buffer and call appropriate callbacks when the ready state is reached.

func (*LPBatcher) CurrentLoadSize added in v0.14.0

func (lpb *LPBatcher) CurrentLoadSize() int

CurrentLoadSize returns the current size of the internal buffer

func (*LPBatcher) Emit added in v0.14.0

func (lpb *LPBatcher) Emit() []byte

Emit returns a new batch of bytes with upto to the provided batch size depending on when the last newline character in the potential batch is met, or with all the remaining bytes. Please drain the bytes at the end of your processing to get the remaining bytes not filling up a batch.

func (*LPBatcher) Flush added in v0.14.0

func (lpb *LPBatcher) Flush() []byte

Flush drains all bytes even if buffer currently larger than size

func (*LPBatcher) Ready added in v0.14.0

func (lpb *LPBatcher) Ready() bool

Ready reports when the ready state is reached.

func (*LPBatcher) SetCapacity added in v0.14.0

func (lpb *LPBatcher) SetCapacity(c int)

SetCapacity sets the initial capacity of the internal buffer

func (*LPBatcher) SetEmitBytesCallback added in v0.14.0

func (lpb *LPBatcher) SetEmitBytesCallback(f func([]byte))

SetEmitBytesCallback sets the callbackByteEmit function

func (*LPBatcher) SetReadyCallback added in v0.14.0

func (lpb *LPBatcher) SetReadyCallback(f func())

SetReadyCallback sets the ReadyCallback function

func (*LPBatcher) SetSize added in v0.14.0

func (lpb *LPBatcher) SetSize(s int)

SetSize sets the batch size of the batcher

type LPOption added in v0.14.0

type LPOption func(ByteEmittable)

func WithBufferCapacity added in v0.14.0

func WithBufferCapacity(capacity int) LPOption

WithBufferCapacity changes the initial capacity of the internal buffer The unit is byte

func WithBufferSize added in v0.14.0

func WithBufferSize(size int) LPOption

WithBufferSize changes the batch-size emitted by the LPbatcher The unit is byte

func WithByteEmitReadyCallback added in v0.14.0

func WithByteEmitReadyCallback(f func()) LPOption

WithByteEmitReadyCallback sets the function called when a new batch is ready. The batcher will wait for the callback to finish, so please return as fast as possible and move long-running processing to a go-routine.

func WithEmitBytesCallback added in v0.14.0

func WithEmitBytesCallback(f func([]byte)) LPOption

WithEmitBytesCallback sets the function called when a new batch is ready with the batch bytes. The batcher will wait for the callback to finish, so please return as quickly as possible and move any long-running processing to a go routine.

type Option

type Option func(PointEmittable)

func WithCapacity

func WithCapacity(capacity int) Option

WithCapacity changes the initial capacity of the internal buffer

func WithEmitCallback

func WithEmitCallback(f func([]*influxdb3.Point)) Option

WithEmitCallback sets the function called when a new batch is ready with the batch of points. The batcher will wait for the callback to finish, so please return as fast as possible and move long-running processing to a go-routine.

func WithReadyCallback

func WithReadyCallback(f func()) Option

WithReadyCallback sets the function called when a new batch is ready. The batcher will wait for the callback to finish, so please return as fast as possible and move long-running processing to a go-routine.

func WithSize

func WithSize(size int) Option

WithSize changes the batch-size emitted by the batcher

type PointEmittable added in v0.14.0

type PointEmittable interface {
	Emittable
	SetEmitCallback(epcb func([]*influxdb3.Point)) // callback for emitting points
}

PointEmittable provides the basis for any type emitting Point arrays as []*influxdb3.Point

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL