kafka

package
v0.65.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2025 License: BSD-3-Clause Imports: 12 Imported by: 0

README

Kafka output

Sends event batches to Kafka brokers using the franz-go library.

Config params

brokers []string required

List of kafka brokers to write to.


default_topic string required

The default topic name, used when no topic is found in the event field or use_topic_field isn't set.


client_id string default=file-d

Kafka client ID.


use_topic_field bool default=false

If set, the plugin will use topic name from the event field.


topic_field string default=topic

Which event field to use as the topic name. It works only if use_topic_field is set.


workers_count cfg.Expression default=gomaxprocs*4

How many workers will be instantiated to send batches.


batch_size cfg.Expression default=capacity/4

A maximum quantity of the events to pack into one batch.


batch_size_bytes cfg.Expression default=0

A minimum size of events in a batch to send. If both batch_size and batch_size_bytes are set, they will work together.


batch_flush_timeout cfg.Duration default=200ms

After this timeout the batch will be sent even if batch isn't full.


max_message_bytes cfg.Expression default=1000000

The maximum permitted size of a message. Should be set equal to or smaller than the broker's message.max.bytes.


compression string default=none options=none|gzip|snappy|lz4|zstd

Compression codec


ack string default=leader options=no|leader|all-isr

Required acks for produced records


retry int default=10

Retries of insertion. If File.d cannot insert for this number of attempts, File.d will fail with a non-zero exit code or skip the message (see fatal_on_failed_insert).

There are situations when one of the brokers is disconnected and the client does not have time to update the metadata before all the remaining retries are finished. To avoid this situation, the client.ForceMetadataRefresh() function is used for some ProduceSync errors:

  • kerr.LeaderNotAvailable - There is no leader for this topic-partition as we are in the middle of a leadership election.
  • kerr.NotLeaderForPartition - This server is not the leader for that topic-partition.

fatal_on_failed_insert bool default=false

Whether File.d should fail with a non-zero exit code after an insert error. Experimental feature.


retention cfg.Duration default=50ms

Retention (delay) between retries, given as a duration.


retention_exponentially_multiplier int default=2

Multiplier for exponential increase of retention between retries


is_sasl_enabled bool default=false

If set, the plugin will use SASL authentication.


sasl_mechanism string default=SCRAM-SHA-512 options=PLAIN|SCRAM-SHA-256|SCRAM-SHA-512

SASL mechanism to use.


sasl_username string default=user

SASL username.


sasl_password string default=password

SASL password.


is_ssl_enabled bool default=false

If set, the plugin will connect over SSL/TLS.


ssl_skip_verify bool default=false

If set, the plugin will skip SSL/TLS verification.


client_cert string

Path or content of a PEM-encoded client certificate file.


client_key string

Path or content of a PEM-encoded client key file.


ca_cert string

Path or content of a PEM-encoded CA file.



Generated using insane-doc

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Factory

func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig)

func NewClient added in v0.29.0

func NewClient(c *Config, l *zap.Logger) *kgo.Client

Types

type Config

// Config holds the settings of the Kafka output plugin.
//
// Comment lines starting with "// >" are the source of the generated README
// ("Generated using insane-doc"), so their markers are kept as-is.
// NOTE(review): fields whose names end in an underscore (WorkersCount_, BatchSize_, ...)
// presumably hold the parsed value of the preceding expression/duration field — confirm.
type Config struct {
	// > @3@4@5@6
	// >
	// > List of kafka brokers to write to.
	Brokers []string `json:"brokers" required:"true"` // *

	// > @3@4@5@6
	// >
	// > The default topic name, used when no topic is found in the event field or `use_topic_field` isn't set.
	DefaultTopic string `json:"default_topic" required:"true"` // *

	// > @3@4@5@6
	// >
	// > Kafka client ID.
	ClientID string `json:"client_id" default:"file-d"` // *

	// > @3@4@5@6
	// >
	// > If set, the plugin will use topic name from the event field.
	UseTopicField bool `json:"use_topic_field" default:"false"` // *

	// > @3@4@5@6
	// >
	// > Which event field to use as the topic name. It works only if `use_topic_field` is set.
	TopicField string `json:"topic_field" default:"topic"` // *

	// > @3@4@5@6
	// >
	// > How many workers will be instantiated to send batches.
	WorkersCount  cfg.Expression `json:"workers_count" default:"gomaxprocs*4" parse:"expression"` // *
	WorkersCount_ int

	// > @3@4@5@6
	// >
	// > A maximum quantity of the events to pack into one batch.
	BatchSize  cfg.Expression `json:"batch_size" default:"capacity/4" parse:"expression"` // *
	BatchSize_ int

	// > @3@4@5@6
	// >
	// > A minimum size of events in a batch to send.
	// > If both batch_size and batch_size_bytes are set, they will work together.
	BatchSizeBytes  cfg.Expression `json:"batch_size_bytes" default:"0" parse:"expression"` // *
	BatchSizeBytes_ int

	// > @3@4@5@6
	// >
	// > After this timeout the batch will be sent even if batch isn't full.
	BatchFlushTimeout  cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // *
	BatchFlushTimeout_ time.Duration

	// > @3@4@5@6
	// >
	// > The maximum permitted size of a message.
	// > Should be set equal to or smaller than the broker's `message.max.bytes`.
	MaxMessageBytes  cfg.Expression `json:"max_message_bytes" default:"1000000" parse:"expression"` // *
	MaxMessageBytes_ int

	// > @3@4@5@6
	// >
	// > Compression codec
	Compression string `json:"compression" default:"none" options:"none|gzip|snappy|lz4|zstd"` // *

	// > @3@4@5@6
	// >
	// > Required acks for produced records
	Ack string `json:"ack" default:"leader" options:"no|leader|all-isr"` // *

	// > @3@4@5@6
	// >
	// > Retries of insertion. If File.d cannot insert for this number of attempts,
	// > File.d will fail with a non-zero exit code or skip the message (see fatal_on_failed_insert).
	// >
	// > There are situations when one of the brokers is disconnected and the client does not have time to
	// > update the metadata before all the remaining retries are finished. To avoid this situation,
	// > the client.ForceMetadataRefresh() function is used for some ProduceSync errors:
	// > - kerr.LeaderNotAvailable - There is no leader for this topic-partition as we are in the middle of a leadership election.
	// > - kerr.NotLeaderForPartition - This server is not the leader for that topic-partition.
	Retry int `json:"retry" default:"10"` // *

	// > @3@4@5@6
	// >
	// > Whether File.d should fail with a non-zero exit code after an insert error.
	// > **Experimental feature**
	FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *

	// > @3@4@5@6
	// >
	// > Retention (delay) between retries, given as a duration.
	Retention  cfg.Duration `json:"retention" default:"50ms" parse:"duration"` // *
	Retention_ time.Duration

	// > @3@4@5@6
	// >
	// > Multiplier for exponential increase of retention between retries
	RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *

	// > @3@4@5@6
	// >
	// > If set, the plugin will use SASL authentication.
	SaslEnabled bool `json:"is_sasl_enabled" default:"false"` // *

	// > @3@4@5@6
	// >
	// > SASL mechanism to use.
	SaslMechanism string `json:"sasl_mechanism" default:"SCRAM-SHA-512" options:"PLAIN|SCRAM-SHA-256|SCRAM-SHA-512"` // *

	// > @3@4@5@6
	// >
	// > SASL username.
	SaslUsername string `json:"sasl_username" default:"user"` // *

	// > @3@4@5@6
	// >
	// > SASL password.
	SaslPassword string `json:"sasl_password" default:"password"` // *

	// > @3@4@5@6
	// >
	// > If set, the plugin will connect over SSL/TLS.
	SslEnabled bool `json:"is_ssl_enabled" default:"false"` // *

	// > @3@4@5@6
	// >
	// > If set, the plugin will skip SSL/TLS verification.
	SslSkipVerify bool `json:"ssl_skip_verify" default:"false"` // *

	// > @3@4@5@6
	// >
	// > Path or content of a PEM-encoded client certificate file.
	ClientCert string `json:"client_cert"` // *

	// > @3@4@5@6
	// >
	// > Path or content of a PEM-encoded client key file.
	ClientKey string `json:"client_key"` // *

	// > @3@4@5@6
	// >
	// > Path or content of a PEM-encoded CA file.
	CACert string `json:"ca_cert"` // *
}

! config-params ^ config-params

func (*Config) GetBrokers added in v0.29.0

func (c *Config) GetBrokers() []string

func (*Config) GetClientID added in v0.29.0

func (c *Config) GetClientID() string

func (*Config) GetSaslConfig added in v0.29.0

func (c *Config) GetSaslConfig() cfg.KafkaClientSaslConfig

func (*Config) GetSslConfig added in v0.29.0

func (c *Config) GetSslConfig() cfg.KafkaClientSslConfig

func (*Config) IsSaslEnabled added in v0.29.0

func (c *Config) IsSaslEnabled() bool

func (*Config) IsSslEnabled added in v0.29.0

func (c *Config) IsSslEnabled() bool

type KafkaClient added in v0.29.0

// KafkaClient lists the Kafka client operations declared for this package:
// producing records, closing the client and forcing a metadata refresh.
// NOTE(review): presumably satisfied by the *kgo.Client returned by NewClient — confirm.
type KafkaClient interface {
	// ProduceSync takes a context and any number of records and returns
	// the produce results for them.
	ProduceSync(ctx context.Context, rs ...*kgo.Record) kgo.ProduceResults
	// Close takes no arguments and returns nothing.
	// NOTE(review): presumably releases the client's resources — confirm against franz-go.
	Close()
	// ForceMetadataRefresh is, per the package README, called after some
	// ProduceSync errors (kerr.LeaderNotAvailable, kerr.NotLeaderForPartition)
	// so that stale metadata does not exhaust the remaining retries.
	ForceMetadataRefresh()
}

type Plugin

// Plugin is the Kafka output plugin type; its exported methods are Start, Out and Stop.
type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) Out

func (p *Plugin) Out(event *pipeline.Event)

func (*Plugin) Start

func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams)

func (*Plugin) Stop

func (p *Plugin) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL