cx

Version: v3.0.6 (not the latest version of this module)
Published: Jun 10, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

README

Clickhouse Buffer

An easy-to-use, powerful and productive package for writing data to the Clickhouse columnar database

Install

  • for go-clickhouse v1 $ go get -u github.com/zikwall/clickhouse-buffer
  • for go-clickhouse v2 $ go get -u github.com/zikwall/clickhouse-buffer/v3

Why and why

When you use Clickhouse in real projects, you often end up reinventing the wheel: you build your own queues and collectors that accumulate rows until a target count is reached or a time period elapses, and then send them to Clickhouse as one large batch.

The reason is that Clickhouse handles inserts best when new data arrives in batches, which is also what its authors recommend.

Features

  • non-blocking (recommended) - the asynchronous write client uses implicit batching. Rows are written asynchronously to the underlying buffer and sent to the server automatically when the buffer reaches the batch size (default 5000) or when the flush interval (default 1s) elapses. The asynchronous client is recommended for frequent periodic writes.
  • blocking - rows are written to Clickhouse synchronously, without implicit batching.
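
The size-or-interval rule described above can be sketched in plain Go. This is only an illustration of the idea, not the package's implementation: the names batcher, newBatcher, flushedSizes and demoSizes are hypothetical, and row stands in for cx.Vector.

```go
package main

import (
	"fmt"
	"time"
)

// row is a stand-in for one record; the real package uses cx.Vector.
type row []interface{}

// batcher is a hypothetical helper, not part of clickhouse-buffer.
type batcher struct {
	rows chan row
	done chan struct{}
}

// newBatcher starts a goroutine that calls flush when batchSize rows are
// pending or when interval elapses, whichever happens first.
func newBatcher(batchSize int, interval time.Duration, flush func([]row)) *batcher {
	b := &batcher{rows: make(chan row), done: make(chan struct{})}
	go func() {
		defer close(b.done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		pending := make([]row, 0, batchSize)
		send := func() {
			if len(pending) == 0 {
				return
			}
			flush(pending)
			pending = make([]row, 0, batchSize)
		}
		for {
			select {
			case r, ok := <-b.rows:
				if !ok {
					send() // channel closed: flush the remainder and stop
					return
				}
				pending = append(pending, r)
				if len(pending) >= batchSize {
					send()
				}
			case <-ticker.C:
				send()
			}
		}
	}()
	return b
}

// write hands one row to the batching goroutine.
func (b *batcher) write(r row) { b.rows <- r }

// close stops the batcher and waits for the final flush.
func (b *batcher) close() {
	close(b.rows)
	<-b.done
}

// flushedSizes writes n rows and returns the size of every flushed batch.
func flushedSizes(n, batchSize int, interval time.Duration) []int {
	var sizes []int
	b := newBatcher(batchSize, interval, func(batch []row) {
		sizes = append(sizes, len(batch))
	})
	for i := 0; i < n; i++ {
		b.write(row{i})
	}
	b.close()
	return sizes
}

// demoSizes uses a long interval so that only the size trigger fires.
func demoSizes() []int { return flushedSizes(7, 3, time.Hour) }

func main() {
	fmt.Println(demoSizes()) // [3 3 1]
}
```

With a short interval and a slow producer, the ticker branch would flush partially filled batches instead.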

Client buffer engines:

  • in-memory - use native channels and slices
  • redis - use redis server as queue and buffer
  • in-memory-sync - a synchronized in-memory buffer; it helps avoid data races if you access the buffer directly
  • retries - resends packets that were "broken" or were not sent for some other reason

Usage

import (
    "database/sql"

    "github.com/zikwall/clickhouse-buffer/v3/src/database/cxnative"
    "github.com/zikwall/clickhouse-buffer/v3/src/database/cxsql"
)

// if you already have a connection to Clickhouse, you can just use the wrappers
// with the native interface (conn is a driver.Conn)
ch := cxnative.NewClickhouseWithConn(conn)
// or with the database/sql interface (db is a *sql.DB)
ch := cxsql.NewClickhouseWithConn(db)
// if you don't want to create connections yourself,
// the package can do it for you; just call the constructor you need:

// with native interface
ch, conn, err := cxnative.NewClickhouse(ctx, &clickhouse.Options{
        Addr: ctx.StringSlice("clickhouse-host"),
        Auth: clickhouse.Auth{
            Database:  ctx.String("clickhouse-database"),
            Username:  ctx.String("clickhouse-username"),
            Password:  ctx.String("clickhouse-password"),
        },
        Settings: clickhouse.Settings{
            "max_execution_time": 60,
        },
        DialTimeout: 5 * time.Second,
        Compression: &clickhouse.Compression{
            Method: clickhouse.CompressionLZ4,
        },
        Debug: ctx.Bool("debug"),
})
// or with database/sql interface
ch, conn, err := cxsql.NewClickhouse(ctx, &clickhouse.Options{
        Addr: ctx.StringSlice("clickhouse-host"),
        Auth: clickhouse.Auth{
            Database:  ctx.String("clickhouse-database"),
            Username:  ctx.String("clickhouse-username"),
            Password:  ctx.String("clickhouse-password"),
        },
        Settings: clickhouse.Settings{
            "max_execution_time": 60,
        },
        DialTimeout: 5 * time.Second,
        Compression: &clickhouse.Compression{
            Method: clickhouse.CompressionLZ4,
        },
        Debug: ctx.Bool("debug"),
}, &cxsql.RuntimeOptions{})

Create main data streamer client and write data

import (
    "time"

    cx "github.com/zikwall/clickhouse-buffer/v3"
    "github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxmem"
    "github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxredis"
    "github.com/zikwall/clickhouse-buffer/v3/src/database/cxnative"
)
// create root client
client := cx.NewClientWithOptions(ctx, ch,
    cx.DefaultOptions().SetFlushInterval(1000).SetBatchSize(5000),
)
// create buffer engine
buffer := cxmem.NewBuffer(
    client.Options().BatchSize(),
)
// or use redis (redisClient is a *redis.Client)
buffer := cxredis.NewBuffer(
    ctx, redisClient, "bucket", client.Options().BatchSize(),
)
// create new writer api: table name with columns
writeAPI := client.Writer(
	cx.NewView("clickhouse_database.clickhouse_table", []string{"id", "uuid", "insert_ts"}), 
	buffer,
)

// define your custom data structure
type MyCustomDataView struct {
	id       int
	uuid     string
	insertTS time.Time
}
// and implement the cx.Vectorable interface
func (t *MyCustomDataView) Row() cx.Vector {
	return cx.Vector{t.id, t.uuid, t.insertTS.Format(time.RFC822)}
}
// async write your data
writeAPI.WriteRow(&MyCustomDataView{
    id: 1, uuid: "1", insertTS: time.Now(),
})

When using the non-blocking writer, you can track errors through a dedicated error channel:

errorsCh := writeAPI.Errors()
go func() {
	for err := range errorsCh {
		log.Printf("clickhouse write error: %v", err)
	}
}()

Using the blocking writer interface

// create new writer api: table name with columns
writerBlocking := client.WriterBlocking(cx.View{
    Name:    "clickhouse_database.clickhouse_table",
    Columns: []string{"id", "uuid", "insert_ts"},
})
// synchronous write of data directly to Clickhouse;
// the slice element type must be cx.Vectorable to be spread with "..."
err := writerBlocking.WriteRow(ctx, []cx.Vectorable{
    &MyCustomDataView{id: 1, uuid: "1", insertTS: time.Now()},
    &MyCustomDataView{id: 2, uuid: "2", insertTS: time.Now()},
    &MyCustomDataView{id: 3, uuid: "3", insertTS: time.Now()},
}...)

More

Retries:

Packet resending is disabled by default. To enable it, call (*Options).SetRetryIsEnabled(true).

  • in-memory use channels (default)
  • redis
  • rabbitMQ
  • kafka
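
To make the idea of resending concrete, here is a minimal sketch of a bounded retry loop. It is not the package's retry implementation: packet, retryAll, failFirst and the maxAttempts limit are hypothetical names and assumptions used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// packet is a stand-in for a failed batch; it only tracks delivery attempts.
type packet struct{ attempts int }

// retryAll tries to write every queued packet. A packet that fails is put
// back at the end of the queue until it has been tried maxAttempts times.
func retryAll(queue []*packet, maxAttempts int, write func(*packet) error) (sent, dropped int) {
	for len(queue) > 0 {
		p := queue[0]
		queue = queue[1:]
		p.attempts++
		if err := write(p); err == nil {
			sent++
			continue
		}
		if p.attempts >= maxAttempts {
			dropped++ // give up on this packet
			continue
		}
		queue = append(queue, p) // try again later
	}
	return sent, dropped
}

// failFirst returns a writer that fails its first n calls and succeeds afterwards.
func failFirst(n int) func(*packet) error {
	calls := 0
	return func(*packet) error {
		calls++
		if calls <= n {
			return errors.New("clickhouse unavailable")
		}
		return nil
	}
}

func main() {
	sent, dropped := retryAll([]*packet{{}, {}}, 3, failFirst(1))
	fmt.Println(sent, dropped) // 2 0
}
```

A real engine would read packets from a channel or an external queue rather than from a slice, and would typically wait between attempts.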

You can implement your own queue engine by implementing the Queueable interface:

type Queueable interface {
	Queue(packet *RetryPacket)
	Retries() <-chan *RetryPacket
}

and set it as an engine:

cx.DefaultOptions().SetDebugMode(true).SetRetryIsEnabled(true).SetQueueEngine(CustomQueueable)
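
As an illustration of what a custom engine might look like, the sketch below implements the two Queueable methods on top of a buffered channel. RetryPacket and Queueable are redeclared locally so the example compiles on its own (the real RetryPacket has unexported fields); boundedQueue, its label field and the drop-when-full policy are assumptions, not behaviour of the package.

```go
package main

import "fmt"

// RetryPacket is a local stand-in; label exists only for this demo.
type RetryPacket struct{ label string }

// Queueable mirrors the interface shown above.
type Queueable interface {
	Queue(packet *RetryPacket)
	Retries() <-chan *RetryPacket
}

// boundedQueue is a hypothetical engine backed by a buffered channel.
type boundedQueue struct {
	packets chan *RetryPacket
}

func newBoundedQueue(capacity int) *boundedQueue {
	return &boundedQueue{packets: make(chan *RetryPacket, capacity)}
}

// Queue never blocks: when the queue is full, the packet is dropped.
func (q *boundedQueue) Queue(packet *RetryPacket) {
	select {
	case q.packets <- packet:
	default:
	}
}

// Retries exposes the packets waiting to be resent.
func (q *boundedQueue) Retries() <-chan *RetryPacket { return q.packets }

// pending reports how many packets are waiting.
func (q *boundedQueue) pending() int { return len(q.packets) }

// Compile-time check that boundedQueue satisfies Queueable.
var _ Queueable = (*boundedQueue)(nil)

func main() {
	q := newBoundedQueue(2)
	q.Queue(&RetryPacket{label: "a"})
	q.Queue(&RetryPacket{label: "b"})
	q.Queue(&RetryPacket{label: "c"}) // dropped, the queue is full
	fmt.Println(q.pending())          // 2
	fmt.Println((<-q.Retries()).label) // a
}
```

Dropping on overflow keeps writers from blocking; an engine backed by redis or kafka would persist the packet instead.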

Logs:

You can plug in your own logger by implementing the Logger interface and passing it in the options:

type Logger interface {
	Log(message interface{})
	Logf(format string, v ...interface{})
}
// example with default options
cx.DefaultOptions().SetDebugMode(true).SetLogger(SomeLogger)
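
For example, a logger built on the standard library's log package could look like the sketch below. The Logger interface is redeclared locally so the example is self-contained; stdLogger, newStdLogger and demoOutput are hypothetical names.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
)

// Logger mirrors the interface shown above.
type Logger interface {
	Log(message interface{})
	Logf(format string, v ...interface{})
}

// stdLogger is a hypothetical adapter around the standard log.Logger.
type stdLogger struct{ out *log.Logger }

// newStdLogger writes to w and puts prefix in front of every line.
func newStdLogger(w io.Writer, prefix string) *stdLogger {
	return &stdLogger{out: log.New(w, prefix, 0)}
}

func (l *stdLogger) Log(message interface{}) { l.out.Print(message) }

func (l *stdLogger) Logf(format string, v ...interface{}) { l.out.Printf(format, v...) }

// Compile-time check that stdLogger satisfies Logger.
var _ Logger = (*stdLogger)(nil)

// demoOutput logs two messages into a buffer and returns the result.
func demoOutput() string {
	var buf bytes.Buffer
	var logger Logger = newStdLogger(&buf, "[cx] ")
	logger.Log("connected")
	logger.Logf("flushed %d rows", 3)
	return buf.String()
}

func main() {
	fmt.Print(demoOutput())
}
```

An instance such as newStdLogger(os.Stderr, "[cx] ") could then be passed to SetLogger in place of SomeLogger.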

Tests:

  • $ go test -v ./...
  • $ golangci-lint run --config ./.golangci.yml

Integration Tests:

export CLICKHOUSE_HOST=111.11.11.11:9000
export REDIS_HOST=111.11.11.11:6379
export REDIS_PASS=password_if_needed

$ go test -v ./... -tags=integration

TODO:

  • buffer interfaces
  • more retry buffer interfaces
  • rewrite retry lib
  • create binary app for streaming data to clickhouse
    • client and server with HTTP interface
    • client and server with gRPC interface

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch holds information for sending rows batch

func NewBatch

func NewBatch(rows []Vector) *Batch

NewBatch creates new batch

func (*Batch) Rows

func (b *Batch) Rows() []Vector

type Buffer

type Buffer interface {
	Write(Vector)
	Read() []Vector
	Len() int
	Flush()
}

Buffer is the interface for a data buffer (temporary storage). Implementing this interface is enough to plug in your own temporary storage.
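
A custom storage only has to provide these four methods. The sketch below is a mutex-guarded, slice-backed buffer; Vector and Buffer are redeclared locally so it compiles on its own, lockedBuffer is a hypothetical name, and the exact semantics chosen here (Read returns a snapshot copy, Flush empties the buffer) are assumptions rather than documented requirements.

```go
package main

import (
	"fmt"
	"sync"
)

// Vector and Buffer mirror the declarations in this package.
type Vector []interface{}

type Buffer interface {
	Write(Vector)
	Read() []Vector
	Len() int
	Flush()
}

// lockedBuffer is a hypothetical slice-backed buffer guarded by a mutex.
type lockedBuffer struct {
	mu   sync.RWMutex
	rows []Vector
}

func newLockedBuffer(capacity uint) *lockedBuffer {
	return &lockedBuffer{rows: make([]Vector, 0, capacity)}
}

// Write appends one row.
func (b *lockedBuffer) Write(row Vector) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.rows = append(b.rows, row)
}

// Read returns a copy, so a later Flush does not affect the caller's rows.
func (b *lockedBuffer) Read() []Vector {
	b.mu.RLock()
	defer b.mu.RUnlock()
	snapshot := make([]Vector, len(b.rows))
	copy(snapshot, b.rows)
	return snapshot
}

// Len reports the number of buffered rows.
func (b *lockedBuffer) Len() int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return len(b.rows)
}

// Flush empties the buffer but keeps the allocated capacity.
func (b *lockedBuffer) Flush() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.rows = b.rows[:0]
}

// Compile-time check that lockedBuffer satisfies Buffer.
var _ Buffer = (*lockedBuffer)(nil)

func main() {
	var buf Buffer = newLockedBuffer(16)
	buf.Write(Vector{1, "a"})
	buf.Write(Vector{2, "b"})
	rows := buf.Read()
	buf.Flush()
	fmt.Println(len(rows), buf.Len()) // 2 0
}
```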

type Clickhouse

type Clickhouse interface {
	Insert(context.Context, View, []Vector) (uint64, error)
	Close() error
}

type Client

type Client interface {
	// Options returns the options associated with client
	Options() *Options
	// WriteBatch sends data to Clickhouse. It is used implicitly by the non-blocking writer
	// and explicitly by the blocking writer
	WriteBatch(context.Context, View, *Batch) error
	// Writer returns the asynchronous, non-blocking, Writer client.
	// Ensures using a single Writer instance for each table pair.
	Writer(View, Buffer) Writer
	// WriterBlocking returns the synchronous, blocking, WriterBlocking client.
	// Ensures using a single WriterBlocking instance for each table pair.
	WriterBlocking(View) WriterBlocking
	// RetryClient returns the retry client
	RetryClient() Retryable
	// Close ensures all ongoing asynchronous write clients finish.
	Close()
}

func NewClient

func NewClient(ctx context.Context, clickhouse Clickhouse) Client

func NewClientWithOptions

func NewClientWithOptions(ctx context.Context, clickhouse Clickhouse, options *Options) Client

type Closable

type Closable interface {
	Close() error
	CloseMessage() string
}

type Countable

type Countable interface {
	Inc() uint64
	Dec() uint64
	Val() uint64
}

type Logger

type Logger interface {
	Log(message interface{})
	Logf(format string, v ...interface{})
}

func NewDefaultLogger

func NewDefaultLogger() Logger

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options holds write configuration properties

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions returns Options object with default values

func (*Options) BatchSize

func (o *Options) BatchSize() uint

BatchSize returns size of batch

func (*Options) FlushInterval

func (o *Options) FlushInterval() uint

FlushInterval returns flush interval in ms

func (*Options) SetBatchSize

func (o *Options) SetBatchSize(batchSize uint) *Options

SetBatchSize sets number of rows sent in single request

func (*Options) SetDebugMode

func (o *Options) SetDebugMode(isDebug bool) *Options

func (*Options) SetFlushInterval

func (o *Options) SetFlushInterval(flushIntervalMs uint) *Options

SetFlushInterval sets the interval, in ms, after which the buffer is flushed if it has not already been written

func (*Options) SetLogger

func (o *Options) SetLogger(logger Logger) *Options

func (*Options) SetQueueEngine

func (o *Options) SetQueueEngine(queue Queueable) *Options

func (*Options) SetRetryIsEnabled

func (o *Options) SetRetryIsEnabled(enabled bool) *Options

type Queueable

type Queueable interface {
	Queue(packet *RetryPacket)
	Retries() <-chan *RetryPacket
}

func NewImMemoryQueueEngine

func NewImMemoryQueueEngine() Queueable

type RetryPacket

type RetryPacket struct {
	// contains filtered or unexported fields
}

func NewRetryPacket

func NewRetryPacket(view View, batch *Batch) *RetryPacket

type Retryable

type Retryable interface {
	Retry(packet *RetryPacket)
	Metrics() (uint64, uint64, uint64)
}

func NewRetry

func NewRetry(ctx context.Context, engine Queueable, writer Writeable, logger Logger, isDebug bool) Retryable

type Vector

type Vector []interface{}

func (Vector) Encode

func (rw Vector) Encode() ([]byte, error)

Encode turns the Vector type into an array of bytes. This method is used for data serialization and storage in remote buffers, such as redis.Buffer

type VectorDecoded

type VectorDecoded string

func (VectorDecoded) Decode

func (rd VectorDecoded) Decode() (Vector, error)

Decode is the reverse of Encode: it deserializes an array of bytes back into a Vector
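
The round trip can be illustrated with the standard encoding/gob package. Whether the package itself uses gob is an assumption here; encode, decode and roundTrip are hypothetical helpers, and Vector is redeclared locally. Note that with gob, values of non-basic types stored in a Vector (for example time.Time) would first have to be registered with gob.Register.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Vector mirrors the type declared in this package.
type Vector []interface{}

// encode serializes a Vector into bytes (assumption: gob encoding).
func encode(v Vector) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode restores a Vector from bytes produced by encode.
func decode(data []byte) (Vector, error) {
	var v Vector
	if err := gob.NewDecoder(bytes.NewReader(data)).Decode(&v); err != nil {
		return nil, err
	}
	return v, nil
}

// roundTrip encodes and immediately decodes a Vector.
func roundTrip(v Vector) (Vector, error) {
	data, err := encode(v)
	if err != nil {
		return nil, err
	}
	return decode(data)
}

func main() {
	out, err := roundTrip(Vector{1, "abc"})
	fmt.Println(out, err)
}
```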

type Vectorable

type Vectorable interface {
	Row() Vector
}

Vectorable helps put the fields of a record in the correct order before the data is sent to Clickhouse
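
The order matters because values are matched to the columns of a View by position. The sketch below shows that pairing with locally redeclared types; visit and bind are hypothetical and only illustrate the positional relationship.

```go
package main

import "fmt"

// Vector, Vectorable and View mirror the declarations in this package.
type Vector []interface{}

type Vectorable interface {
	Row() Vector
}

type View struct {
	Name    string
	Columns []string
}

// visit is a hypothetical record type.
type visit struct {
	id       int
	uuid     string
	insertTS string
}

// Row lists the fields in the same order as the view's columns.
func (v *visit) Row() Vector { return Vector{v.id, v.uuid, v.insertTS} }

// bind is a hypothetical helper that pairs each column with the value
// found at the same position in the row.
func bind(view View, item Vectorable) (map[string]interface{}, error) {
	row := item.Row()
	if len(row) != len(view.Columns) {
		return nil, fmt.Errorf("%s: %d columns but %d values", view.Name, len(view.Columns), len(row))
	}
	bound := make(map[string]interface{}, len(row))
	for i, column := range view.Columns {
		bound[column] = row[i]
	}
	return bound, nil
}

func main() {
	view := View{Name: "db.visits", Columns: []string{"id", "uuid", "insert_ts"}}
	bound, err := bind(view, &visit{id: 1, uuid: "a1", insertTS: "2022-06-10 00:00:00"})
	fmt.Println(bound["uuid"], err) // a1 <nil>
}
```

If Row returned the fields in a different order, the values would end up under the wrong column names, so Row and the View's Columns should be kept in sync.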

type View

type View struct {
	Name    string
	Columns []string
}

func NewView

func NewView(name string, columns []string) View

type Writeable

type Writeable interface {
	Write(ctx context.Context, view View, batch *Batch) (uint64, error)
}

func NewDefaultWriter

func NewDefaultWriter(conn Clickhouse) Writeable

type Writer

type Writer interface {
	// WriteRow asynchronously writes a row into the buffer.
	WriteRow(vector Vectorable)
	// Errors returns a channel for reading errors which occurs during async writes.
	Errors() <-chan error
	// Close writer
	Close()
}

Writer is the client interface with non-blocking methods for writing rows asynchronously, in batches, to a Clickhouse server. Writer can be used concurrently. When writing from multiple goroutines, use a single Writer instance in all of them.

func NewWriter

func NewWriter(ctx context.Context, client Client, view View, engine Buffer) Writer

NewWriter returns new non-blocking write client for writing rows to Clickhouse table

type WriterBlocking

type WriterBlocking interface {
	// WriteRow writes row(s) into bucket.
	// WriteRow writes without implicit batching. Batch is created from given number of records
	// Non-blocking alternative is available in the Writer interface
	WriteRow(ctx context.Context, row ...Vectorable) error
}

func NewWriterBlocking

func NewWriterBlocking(client Client, view View) WriterBlocking
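
Because WriteRow takes a variadic list of Vectorable, rows can be passed one by one or spread from a slice whose element type is the interface. The sketch below shows both call styles with locally redeclared types; point and toBatch are hypothetical and only mimic how rows might be collected into a batch.

```go
package main

import "fmt"

// Vector and Vectorable mirror the declarations in this package.
type Vector []interface{}

type Vectorable interface {
	Row() Vector
}

// point is a hypothetical record type.
type point struct {
	id   int
	name string
}

func (p *point) Row() Vector { return Vector{p.id, p.name} }

// toBatch is a hypothetical helper that collects variadic rows into vectors.
func toBatch(rows ...Vectorable) []Vector {
	batch := make([]Vector, 0, len(rows))
	for _, r := range rows {
		batch = append(batch, r.Row())
	}
	return batch
}

func main() {
	// rows passed as individual arguments
	a := toBatch(&point{1, "a"}, &point{2, "b"})

	// a slice can be spread with "..." only if its element type is Vectorable;
	// a []*point would not compile here
	rows := []Vectorable{&point{3, "c"}, &point{4, "d"}, &point{5, "e"}}
	b := toBatch(rows...)

	fmt.Println(len(a), len(b)) // 2 3
}
```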

Directories

Path                  Synopsis
example
  cmd/redis           command
  cmd/redis_sql       command
  cmd/simple          command
  cmd/simple_2        command
  cmd/simple_sql      command
src
