kafka

package
v1.46.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2025 License: Apache-2.0 Imports: 20 Imported by: 0

README

Kafka Plugins

Kafka is a reliable message queue system. See #3 for steps to setup AWS MSK.

Running a data plugin

eopa --config-file kafka.yaml run -s -l debug testdata/transform.rego
# kafka.yaml: kafka plugin sample MSK configuration file
plugins:
  data: 
    kafka.messages:
      type: kafka
      urls: ["b-1-public...us-east-1.amazonaws.com:9196","b-2-public...us-east-1.amazonaws.com:9196"]
      topics: [styra-topic]
      rego_transform: "data.e2e.transform"
      tls_ca_cert: rootca.pem
      sasl_mechanism: scram-sha-512
      sasl_username: <user>
      sasl_password: <secret>
# transform.rego
package e2e

import future.keywords.contains
import future.keywords.if

transform contains {"op": "add", "path": payload.id, "value": val} if {
    input.topic == "styra-topic"

    payload := json.unmarshal(base64.decode(input.value))
    val := object.filter(payload, ["name", "roles"])
}

AWS MSK Producer script: See #2 on how to setup a terminal an ec2 instance that can talk to MSK.

./bin/kafka-console-producer.sh --bootstrap-server b-2....kafka.us-east-1.amazonaws.com:9096 --producer.config client_sasl.properties --topic styra-topic

Sample messages:

{"id": "d9eccc5c", "name": "Alice", "roles": ["developer", "reports-reader"]}
{"id": "5c0ba07e", "name": "Bob", "roles": ["reports-admin"]}
{"id": "413adc7a", "name": "Eve", "roles": ["database-reader", "database-writer"]}

Validate results:

curl -s localhost:8181/v1/data/kafka/messages
{"result":{"413adc7a":{"name":"Eve","roles":["database-reader","database-writer"]},"5c0ba07e":{"name":"Bob","roles":["reports-admin"]},"d9eccc5c":{"name":"Alice","roles":["developer","reports-reader"]}}}
  1. https://docs.styra.com/enterprise-opa/configuration/data/kafka-streams-api
  2. https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html
  3. https://docs.styra.com/das/policies/cloud-storage-management/kafka-platform#secure-kafka-platform-access

Documentation

Index

Constants

View Source
const Name = "kafka"

Variables

This section is empty.

Functions

func Factory

func Factory() plugins.Factory

Types

type Config

type Config struct {
	URLs   []string `json:"urls"`
	Topics []string `json:"topics"`
	Path   string   `json:"path"`

	From          string `json:"from,omitempty"`
	ConsumerGroup bool   `json:"consumer_group"`

	RegoTransformRule string `json:"rego_transform"`

	SkipVerification bool   `json:"tls_skip_verification,omitempty"`
	Cert             string `json:"tls_client_cert,omitempty"`
	CACert           string `json:"tls_ca_cert,omitempty"`
	PrivateKey       string `json:"tls_client_private_key,omitempty"`

	SASLMechanism string `json:"sasl_mechanism,omitempty"`
	SASLUsername  string `json:"sasl_username,omitempty"`
	SASLPassword  string `json:"sasl_password,omitempty"`
	SASLToken     bool   `json:"sasl_token,omitempty"` // optional for mechanism=scram, "Delegation Tokens" in Confluent docs
	// contains filtered or unexported fields
}

Config represents the configuration of the kafka data plugin

func (Config) Equal

func (c Config) Equal(other Config) bool

type Data

type Data struct {
	Config Config

	*transform.Rego
	// contains filtered or unexported fields
}

Data plugin

func (*Data) Name

func (c *Data) Name() string

dataPlugin accessors

func (*Data) Path

func (c *Data) Path() storage.Path

func (*Data) Reconfigure

func (c *Data) Reconfigure(ctx context.Context, next any)

func (*Data) Start

func (c *Data) Start(ctx context.Context) error

func (*Data) Stop

func (c *Data) Stop(ctx context.Context)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL