kafka

package
v0.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

README

Apache Kafka

Extract topic and consumer group metadata from Apache Kafka.

Usage

source:
  name: kafka
  scope: my-kafka-cluster
  config:
    broker: "localhost:9092"
    # extract specifies which entity types to extract.
    # Defaults to all: ["topics", "consumer_groups"]
    extract:
      - topics
      - consumer_groups
    auth_config:
      tls:
        enabled: true
        insecure_skip_verify: false
        cert_file: "/opt/client.cer.pem"
        key_file: "/opt/client.key.pem"
        ca_file: "/opt/caCertFile.cer.pem"
      sasl:
        enabled: false
        mechanism: "OAUTHBEARER"

Configuration

Key Type Required Default Description
broker string Yes Kafka broker address.
extract []string No ["topics", "consumer_groups"] Entity types to extract.
auth_config.tls.enabled bool No false Enable TLS authentication.
auth_config.tls.insecure_skip_verify bool No false Skip server certificate verification.
auth_config.tls.cert_file string No Path to client certificate file.
auth_config.tls.key_file string No Path to client key file.
auth_config.tls.ca_file string No Path to CA certificate file.
auth_config.sasl.enabled bool No false Enable SASL authentication.
auth_config.sasl.mechanism string No SASL mechanism (e.g. OAUTHBEARER).

Entities

Topic
  • Type: topic
  • URN format: urn:kafka:{scope}:topic:{topic_name}
Properties
Property Type Description
number_of_partitions int64 Number of partitions for the topic.
replication_factor int64 Number of replicas per partition.
retention_ms string Topic retention period in milliseconds.
cleanup_policy string Topic cleanup policy (e.g. delete, compact).
min_insync_replicas string Minimum number of in-sync replicas.
Consumer Group
  • Type: consumer_group
  • URN format: urn:kafka:{scope}:consumer_group:{group_id}
Properties
Property Type Description
state string Consumer group state (e.g. Stable, Empty).
protocol string Partition assignment protocol.
protocol_type string Protocol type (e.g. consumer).
num_members int64 Number of members in the group.
members []object List of group members with member_id, client_id, and host.

Edges

Source Target Type Description
consumer_group topic consumed_by Consumer group consumes from a topic.

Contributing

Refer to the contribution guidelines for information on contributing to this module.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AuthConfig

type AuthConfig struct {
	TLS struct {
		Enabled            bool   `mapstructure:"enabled"`
		InsecureSkipVerify bool   `mapstructure:"insecure_skip_verify"`
		CertFile           string `mapstructure:"cert_file" validate:"required_if=Enabled true"`
		KeyFile            string `mapstructure:"key_file" validate:"required_if=Enabled true"`
		CAFile             string `mapstructure:"ca_file" validate:"required_if=Enabled true"`
	} `mapstructure:"tls"`

	SASL struct {
		Enabled   bool   `mapstructure:"enabled"`
		Mechanism string `mapstructure:"mechanism" validate:"required_if=Enabled true"`
	} `mapstructure:"sasl"`
}

type Config

type Config struct {
	Broker  string     `json:"broker" yaml:"broker" mapstructure:"broker" validate:"required"`
	Auth    AuthConfig `json:"auth_config" yaml:"auth_config" mapstructure:"auth_config"`
	Extract []string   `json:"extract" yaml:"extract" mapstructure:"extract" validate:"omitempty,dive,oneof=topics consumer_groups"`
}

Config holds the set of configuration for the kafka extractor

type Extractor

type Extractor struct {
	plugins.BaseExtractor
	// contains filtered or unexported fields
}

Extractor manages the extraction of data from a kafka broker

func New

func New(logger log.Logger) *Extractor

New returns a pointer to an initialized Extractor Object

func (*Extractor) Extract

func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)

Extract checks if the extractor is ready to extract if so, then extracts metadata from the kafka broker

func (*Extractor) Init

func (e *Extractor) Init(ctx context.Context, config plugins.Config) error

Init initializes the extractor

type KubernetesTokenProvider added in v0.11.0

type KubernetesTokenProvider struct {
	// contains filtered or unexported fields
}

func NewKubernetesTokenProvider added in v0.11.0

func NewKubernetesTokenProvider(opts ...TokenProviderOption) *KubernetesTokenProvider

NewKubernetesTokenProvider creates a new TokenProvider that reads the token from kubernetes pod service account token file. By default, the token file path for kafka is stored in `/var/run/secrets/kafka/serviceaccount/token`. User need to make sure there a valid projected service account token on that path.

func (*KubernetesTokenProvider) Token added in v0.11.0

Token returns the token from the service account token file.

type TokenProviderOption added in v0.11.0

type TokenProviderOption func(*TokenProviderOptions)

func WithTokenFilePath added in v0.11.0

func WithTokenFilePath(path string) TokenProviderOption

WithTokenFilePath sets the file path to the token.

type TokenProviderOptions added in v0.11.0

type TokenProviderOptions struct {
	// FilePath is the path to the file containing the token.
	FilePath string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL