kafka

package
v0.13.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 19, 2026 | License: Apache-2.0 | Imports: 14 | Imported by: 0

README

Kafka

Publish metadata as protobuf-serialized messages to an Apache Kafka topic.

Usage

sinks:
  - name: kafka
    config:
      brokers: "localhost:9092"
      topic: metadata-topic
      key_path: ".Urn"

Configuration

| Key | Type | Example | Description | Requirement |
| :-- | :--- | :------ | :---------- | :---------- |
| brokers | string | localhost:9092 | Comma-separated list of Kafka broker addresses | required |
| topic | string | metadata-topic | Kafka topic to publish messages to | required |
| key_path | string | .Urn | Path to the Entity field used as the Kafka message key. Only top-level fields are supported (e.g. .Urn, .Type, .Name). | optional |
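
Because brokers is a single comma-separated string, it has to be split into individual addresses before a client can connect. The following is a minimal sketch of that step using only the standard library. The helper name splitBrokers, the whitespace trimming, and the skipping of empty entries are assumptions made for illustration, not documented behavior of this sink.

```go
package main

import (
	"fmt"
	"strings"
)

// splitBrokers turns a comma-separated brokers value into a slice of
// addresses. Trimming whitespace and skipping empty entries are assumptions
// of this sketch, not documented behavior of the sink.
func splitBrokers(brokers string) []string {
	var out []string
	for _, b := range strings.Split(brokers, ",") {
		if b = strings.TrimSpace(b); b != "" {
			out = append(out, b)
		}
	}
	return out
}

func main() {
	// Hypothetical broker addresses, used only for the demonstration.
	for _, addr := range splitBrokers("kafka-1:9092, kafka-2:9092") {
		fmt.Println(addr)
	}
}
```

Listing more than one broker gives the client alternative bootstrap addresses if the first one is unreachable.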

Behavior

Each Record's Entity is serialized as a Protocol Buffers message and published to the configured Kafka topic. Edges are not included in the message (there is currently no proto wrapper for a full Record).

If key_path is set, the value of that Entity field is used as the Kafka message key, which controls partition assignment. If omitted, messages are published without a key and distributed across partitions by the producer.
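
To make the key_path behavior concrete, here is a rough sketch of resolving a top-level field and mapping the resulting key to a partition. It uses a plain struct named entity as a hypothetical stand-in for the protobuf Entity, reads the field with the reflect package rather than protobuf reflection, and hashes with FNV-1a purely for illustration. The hash a real producer uses depends on the Kafka client, and the helper names keyFor and partitionFor are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
	"reflect"
	"strings"
)

// entity is a hypothetical stand-in for the protobuf Entity message.
type entity struct {
	Urn  string
	Type string
	Name string
}

// keyFor resolves a key_path such as ".Urn" against e.
// Only a single top-level string field is accepted in this sketch.
func keyFor(e entity, keyPath string) (string, error) {
	if !strings.HasPrefix(keyPath, ".") {
		return "", errors.New("key_path must start with a dot, e.g. .Urn")
	}
	name := strings.TrimPrefix(keyPath, ".")
	if name == "" || strings.Contains(name, ".") {
		return "", errors.New("key_path must name one top-level field, e.g. .Urn")
	}
	f := reflect.ValueOf(e).FieldByName(name)
	if !f.IsValid() || f.Kind() != reflect.String {
		return "", fmt.Errorf("unknown or non-string field %q", name)
	}
	return f.String(), nil
}

// partitionFor maps a key to one of n partitions with FNV-1a; n must be
// greater than zero. Real producers choose their own hash function, so this
// only illustrates that equal keys land on the same partition.
func partitionFor(key string, n uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	return h.Sum32() % n
}

func main() {
	e := entity{Urn: "urn:table:orders", Type: "table", Name: "orders"}
	key, err := keyFor(e, ".Urn")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("key=%s partition=%d\n", key, partitionFor(key, 6))
}
```

Keying by a stable field such as the URN means updates to the same entity go to the same partition, so a consumer sees them in the order they were published.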

Contributing

Refer to the contribution guidelines for information on contributing to this module.

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func New(logger log.Logger) plugins.Syncer

Types

type Config

type Config struct {
	Brokers string `mapstructure:"brokers" validate:"required"`
	Topic   string `mapstructure:"topic" validate:"required"`
	KeyPath string `mapstructure:"key_path"`
}
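
The struct tags carry two pieces of information: mapstructure names the configuration key each field is read from, and validate:"required" marks fields that must be non-empty. The sketch below imitates both with the standard library only, to show what the tags imply. The decode helper is hypothetical and is not how this package, or the libraries that normally consume these tags, actually work.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// Config mirrors the struct documented above.
type Config struct {
	Brokers string `mapstructure:"brokers" validate:"required"`
	Topic   string `mapstructure:"topic" validate:"required"`
	KeyPath string `mapstructure:"key_path"`
}

// decode fills a Config from raw settings, using each field's mapstructure
// tag as the lookup key, and reports fields tagged validate:"required" that
// are still empty. It is a stdlib-only stand-in for real decoding and
// validation libraries.
func decode(raw map[string]string) (Config, error) {
	var cfg Config
	v := reflect.ValueOf(&cfg).Elem()
	t := v.Type()
	var missing []string
	for i := 0; i < t.NumField(); i++ {
		field := t.Field(i)
		key := field.Tag.Get("mapstructure")
		v.Field(i).SetString(raw[key])
		if field.Tag.Get("validate") == "required" && raw[key] == "" {
			missing = append(missing, key)
		}
	}
	if len(missing) > 0 {
		return cfg, fmt.Errorf("missing required config: %s", strings.Join(missing, ", "))
	}
	return cfg, nil
}

func main() {
	cfg, err := decode(map[string]string{
		"brokers":  "localhost:9092",
		"topic":    "metadata-topic",
		"key_path": ".Urn",
	})
	fmt.Printf("%+v err=%v\n", cfg, err)
}
```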

type ProtoReflector

type ProtoReflector interface {
	ProtoReflect() protoreflect.Message
}

type Sink

type Sink struct {
	plugins.BasePlugin
	// contains filtered or unexported fields
}

func (*Sink) Close

func (s *Sink) Close() (err error)

func (*Sink) Init

func (s *Sink) Init(ctx context.Context, config plugins.Config) (err error)

func (*Sink) Sink

func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error)
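
The three methods suggest a lifecycle of Init once, Sink per batch, and Close at the end. The following sketch walks through that order with stand-in types, since the real plugins.Config and models.Record definitions are not shown on this page. Everything here (record, sink, run, demo, and the in-memory sent slice) is hypothetical and only mimics the documented method set. It does not talk to Kafka.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// record and sink are hypothetical stand-ins for models.Record and this
// package's Sink type.
type record struct{ Urn string }

type sink struct {
	ready bool
	sent  []string
}

// Init validates the settings and marks the sink ready.
func (s *sink) Init(ctx context.Context, config map[string]string) error {
	if config["brokers"] == "" || config["topic"] == "" {
		return errors.New("brokers and topic are required")
	}
	s.ready = true
	return nil
}

// Sink records each entry in memory; a real sink would publish to Kafka here.
func (s *sink) Sink(ctx context.Context, batch []record) error {
	if !s.ready {
		return errors.New("sink used before Init")
	}
	for _, r := range batch {
		s.sent = append(s.sent, r.Urn)
	}
	return nil
}

// Close releases resources; in this sketch it only clears the ready flag.
func (s *sink) Close() error {
	s.ready = false
	return nil
}

// run drives one sink through Init, Sink and Close, in that order.
func run(ctx context.Context, s *sink, cfg map[string]string, batch []record) (err error) {
	if err = s.Init(ctx, cfg); err != nil {
		return err
	}
	defer func() {
		// Keep the Sink error if there is one; otherwise surface Close's.
		if cerr := s.Close(); err == nil {
			err = cerr
		}
	}()
	return s.Sink(ctx, batch)
}

// demo runs the lifecycle once and returns what the stand-in sink recorded.
func demo() ([]string, error) {
	s := &sink{}
	cfg := map[string]string{"brokers": "localhost:9092", "topic": "metadata-topic"}
	batch := []record{{Urn: "urn:table:orders"}, {Urn: "urn:table:users"}}
	err := run(context.Background(), s, cfg, batch)
	return s.sent, err
}

func main() {
	sent, err := demo()
	fmt.Println(sent, err)
}
```

Closing in a deferred call keeps the cleanup step from being skipped when a batch fails partway through.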
