kafka

package
v0.12.0
Published: Mar 28, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

README

Apache Kafka

Usage

sinks:
  - name: kafka
    config:
      brokers: "localhost:9092"
      topic: metadata-topic
      key_path: ".resource.urn"

Config

Key       Value   Example         Required  Description
brokers   string  localhost:9092  yes       Comma-separated list of Kafka broker addresses.
topic     string  metadata-topic  yes       Kafka topic to publish messages to.
key_path  string  .resource.urn   no        JSON path used to extract the Kafka message key from the asset. Only root-level keys are supported.
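
As an illustration of the comma-separated brokers format, the following is a minimal sketch of how such a value could be split into individual addresses. The helper name splitBrokers is hypothetical and not part of this package; the trimming of whitespace and empty entries is an assumption made for the sketch, not documented behavior of the sink.

```go
package main

import (
	"fmt"
	"strings"
)

// splitBrokers turns a comma-separated brokers value into a slice of
// addresses, trimming surrounding whitespace and dropping empty entries.
// Hypothetical helper for illustration; not part of this package's API.
func splitBrokers(brokers string) []string {
	var addrs []string
	for _, part := range strings.Split(brokers, ",") {
		if addr := strings.TrimSpace(part); addr != "" {
			addrs = append(addrs, addr)
		}
	}
	return addrs
}

func main() {
	// prints [broker1:9092 broker2:9092]
	fmt.Println(splitBrokers("broker1:9092, broker2:9092"))
}
```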

Behavior

  • Each asset is serialized as a JSON message and published to the configured Kafka topic.
  • If key_path is set, the value at that path in the asset is used as the Kafka message key. With Kafka's default partitioner, messages that share a key go to the same partition, which is useful for partitioning messages by asset URN or type.
  • If key_path is not set, messages are published without a key, and the Kafka producer decides how they are distributed across partitions.
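
The key selection described above can be sketched roughly as follows. This is not the sink's actual implementation, which is not shown on this page: the helper name messageKey, the use of encoding/json, and the sample asset values are all assumptions made for illustration. The sketch walks every dot-separated segment of the path; the config table states that only root-level keys are supported, so the real sink may resolve paths differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// messageKey looks up keyPath (for example ".resource.urn") in a
// JSON-encoded asset and returns the string found there. ok is false when
// the path is empty, missing, or does not point at a string.
// Hypothetical helper for illustration; not part of this package's API.
func messageKey(assetJSON []byte, keyPath string) (key string, ok bool) {
	path := strings.TrimPrefix(keyPath, ".")
	if path == "" {
		return "", false
	}
	var current any
	if err := json.Unmarshal(assetJSON, &current); err != nil {
		return "", false
	}
	// Descend one object level per dot-separated segment.
	for _, segment := range strings.Split(path, ".") {
		obj, isObj := current.(map[string]any)
		if !isObj {
			return "", false
		}
		next, found := obj[segment]
		if !found {
			return "", false
		}
		current = next
	}
	key, ok = current.(string)
	return key, ok
}

func main() {
	// Made-up asset payload; the field values are illustrative only.
	asset := []byte(`{"resource": {"urn": "example-urn", "type": "table"}}`)

	if key, ok := messageKey(asset, ".resource.urn"); ok {
		fmt.Println("keyed message:", key)
	}
	if _, ok := messageKey(asset, ""); !ok {
		fmt.Println("no key_path: message is published without a key")
	}
}
```

Returning a boolean alongside the key keeps the "no key" case explicit, so the caller can publish an unkeyed message instead of an empty-string key.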

Examples

Basic metadata streaming

name: bigquery-to-kafka
version: v1beta1
source:
  name: bigquery
  config:
    project_id: my-project
sinks:
  - name: kafka
    config:
      brokers: "broker1:9092,broker2:9092"
      topic: metadata-stream
      key_path: ".resource.urn"

Contributing

Refer to the contribution guidelines for information on contributing to this module.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func New(logger log.Logger) plugins.Syncer

Types

type Config

type Config struct {
	Brokers string `mapstructure:"brokers" validate:"required"`
	Topic   string `mapstructure:"topic" validate:"required"`
	KeyPath string `mapstructure:"key_path"`
}
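
The `validate:"required"` tags on Brokers and Topic imply that both must be non-empty, while KeyPath may be left out. A minimal sketch of that rule in plain Go is shown below. It redeclares Config locally (without struct tags) so the example compiles on its own, and the validate function and its error messages are hypothetical stand-ins for whatever validation library the package actually uses.

```go
package main

import (
	"errors"
	"fmt"
)

// Config mirrors the fields of the package's Config struct so that this
// sketch is self-contained.
type Config struct {
	Brokers string
	Topic   string
	KeyPath string
}

// validate is a hypothetical stand-in for the `validate:"required"` tags:
// Brokers and Topic must be set, KeyPath is optional.
func validate(c Config) error {
	if c.Brokers == "" {
		return errors.New("brokers is required")
	}
	if c.Topic == "" {
		return errors.New("topic is required")
	}
	return nil
}

func main() {
	// Both required fields set: prints <nil>
	fmt.Println(validate(Config{Brokers: "localhost:9092", Topic: "metadata-topic"}))
	// Topic missing: prints topic is required
	fmt.Println(validate(Config{Brokers: "localhost:9092"}))
}
```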

type ProtoReflector

type ProtoReflector interface {
	ProtoReflect() protoreflect.Message
}

type Sink

type Sink struct {
	plugins.BasePlugin
	// contains filtered or unexported fields
}

func (*Sink) Close

func (s *Sink) Close() (err error)

func (*Sink) Init

func (s *Sink) Init(ctx context.Context, config plugins.Config) (err error)

func (*Sink) Sink

func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error)
