kafkactl

command module
v5.18.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

README

:toc:
:toclevels: 2

= kafkactl

A command-line interface for interaction with Apache Kafka

image:https://github.com/deviceinsight/kafkactl/actions/workflows/lint_test.yml/badge.svg[Build Status,link=https://github.com/deviceinsight/kafkactl/actions]
| image:https://img.shields.io/badge/command-docs-blue.svg[command docs,link=https://deviceinsight.github.io/kafkactl/]

== Features

* command auto-completion for bash, zsh, fish shell including dynamic completion for e.g. topics or consumer groups.
* support for avro schemas
* support for JSON schema registry
* Configuration of different contexts
* directly access kafka clusters inside your kubernetes cluster
* support for consuming and producing protobuf-encoded messages

image::https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr.svg[asciicast,link=https://asciinema.org/a/vmxrTA0h8CAXPnJnSFk5uHKzr]

== Installation

You can install the pre-compiled binary or compile from source.

=== Install the pre-compiled binary

*homebrew*:

[,bash]
----
# install kafkactl
brew install kafkactl
# upgrade kafkactl
brew upgrade kafkactl
----

*winget*:
[,bash]
----
winget install kafkactl
----

*deb/rpm*:

Download the .deb or .rpm from the https://github.com/deviceinsight/kafkactl/releases[releases page] and install with dpkg -i and rpm -i respectively.

*yay (AUR)*

There's a kafkactl https://aur.archlinux.org/packages/kafkactl/[AUR package] available for Arch. Install it with your AUR helper of choice (e.g. https://github.com/Jguer/yay[yay]):

[,bash]
----
yay -S kafkactl
----

*manually*:

Download the pre-compiled binaries from the https://github.com/deviceinsight/kafkactl/releases[releases page] and copy to the desired location.

=== Compiling from source

[,bash]
----
go install github.com/deviceinsight/kafkactl/v5@latest
----

*NOTE:* make sure that `kafkactl` is on PATH otherwise auto-completion won't work.

== Configuration

If no config file is found, a default config is generated in `$HOME/.config/kafkactl/config.yml`.
This configuration is suitable to get started with a single node cluster on a local machine.

=== Create a config file

Create `$HOME/.config/kafkactl/config.yml` with a definition of contexts that should be available

[,yaml]
----
contexts:
  default:
    brokers:
      - localhost:9092
  remote-cluster:
    brokers:
      - remote-cluster001:9092
      - remote-cluster002:9092
      - remote-cluster003:9092

    # optional: tls config
    tls:
      enabled: true
      ca: my-ca
      cert: my-cert
      certKey: my-key
      # set insecure to true to ignore all tls verification (defaults to false)
      insecure: false

    # optional: sasl support
    sasl:
      enabled: true
      username: admin
      password: admin
      # optional configure sasl mechanism as plaintext, scram-sha256, scram-sha512, oauth (defaults to plaintext)
      mechanism: oauth
      # optional configure sasl version as v0, v1 (defaults to not configured), Refer to: https://github.com/IBM/sarama/issues/3000#issuecomment-2415829478
      version: v0
      # optional tokenProvider configuration (only used for 'sasl.mechanism=oauth')
      tokenprovider:
        # plugin to use as token provider implementation (see plugin section)
        plugin: azure
        # optional: additional options passed to the plugin
        options:
          key: value

    # optional: access clusters running kubernetes
    kubernetes:
      enabled: false
      binary: kubectl #optional
      kubeConfig: ~/.kube/config #optional
      kubeContext: my-cluster
      namespace: my-namespace
      # optional: docker image to use (the tag of the image will be suffixed by `-scratch` or `-ubuntu` depending on command)
      image: private.registry.com/deviceinsight/kafkactl
      # optional: secret for private docker registry
      imagePullSecret: registry-secret
      # optional: secret containing tls certificates (e.g. ca.crt, cert.crt, key.key)
      tlsSecret: tls-secret
      # optional: Username to impersonate for the kubectl command
      asUser: user
      # optional: serviceAccount to use for the pod
      serviceAccount: my-service-account
      # optional: keep pod after exit (can be set to true for debugging)
      keepPod: true
      # optional: labels to add to the pod
      labels:
        key: value
      # optional: annotations to add to the pod
      annotations:
        key: value
      # optional: nodeSelector to add to the pod
      nodeSelector:
        key: value
      # optional: resource limits to add to the pod
      resources:
        requests:
          memory: "64Mi"
          cpu: "250m"
        limits:
          memory: "128Mi"
          cpu: "500m"

      # optional: affinity to add to the pod
      affinity:
        # note: other types of affinity also supported
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: "<key>"
                    operator: "<operator>"
                    values: [ "<value>" ]

      # optional: tolerations to add to the pod
      tolerations:
        - key: "<key>"
          operator: "<operator>"
          value: "<value>"
          effect: "<effect>"

    # optional: clientID config (defaults to kafkactl-{username})
    clientID: my-client-id

    # optional: kafkaVersion (defaults to 2.5.0)
    kafkaVersion: 1.1.1

    # optional: timeout for admin requests (defaults to 3s)
    requestTimeout: 10s

    # optional: avro configuration
    avro:
      # optional: configure codec for (de)serialization as standard,avro (defaults to standard)
      # see: https://github.com/deviceinsight/kafkactl/issues/123
      jsonCodec: avro

    # optional: schema registry
    schemaRegistry:
      url: localhost:8081

      # optional: timeout for requests (defaults to 5s)
      requestTimeout: 10s

      # optional: basic auth credentials
      username: admin
      password: admin

      # optional: tls config for avro
      tls:
        enabled: true
        ca: my-ca
        cert: my-cert
        certKey: my-key
        # set insecure to true to ignore all tls verification (defaults to false)
        insecure: false

    # optional: default protobuf messages search paths
    protobuf:
      importPaths:
        - "/usr/include/protobuf"
      protoFiles:
        - "someMessage.proto"
        - "otherMessage.proto"
      protosetFiles:
        - "/usr/include/protoset/other.protoset"
      # see: https://pkg.go.dev/google.golang.org/protobuf@v1.36.6/encoding/protojson#MarshalOptions
      marshalOptions:
        allowPartial: true
        useProtoNames: true
        useEnumNumbers: true
        emitUnpopulated: true
        emitDefaultValues: true

    producer:
      # optional: changes the default partitioner
      partitioner: "hash"

      # optional: changes default required acks in produce request
      # see: https://pkg.go.dev/github.com/IBM/sarama?utm_source=godoc#RequiredAcks
      requiredAcks: "WaitForAll"

      # optional: maximum permitted size of a message (defaults to 1000000)
      maxMessageBytes: 1000000

    consumer:
      # optional: isolationLevel (defaults to ReadCommitted)
      isolationLevel: ReadUncommitted
----

[#_config_file_read_order]
The config file location is resolved by

. checking for a provided commandline argument: `--config-file=$PATH_TO_CONFIG`
. evaluating the environment variable: `export KAFKA_CTL_CONFIG=$PATH_TO_CONFIG`
. checking for a project config file in the working directory (see <<_project_config_files>>)
. as default the config file is looked up from one of the following locations:
 ** `$HOME/.config/kafkactl/config.yml`
 ** `$HOME/.kafkactl/config.yml`
 ** `$APPDATA/kafkactl/config.yml`
 ** `/etc/kafkactl/config.yml`

[#_project_config_files]
==== Project config files

In addition to the config file locations above, _kafkactl_ allows to create a config file on project level.
A project config file is meant to be placed at the root level of a git repo and declares the kafka configuration
for this repository/project.

In order to identify the config file as belonging to _kafkactl_ the following names can be used:

* `kafkactl.yml`
* `.kafkactl.yml`

During initialization _kafkactl_ starts from the current working directory and recursively looks for a project level
config file. The recursive lookup ends at the boundary of a git repository (i.e. if a `.git` folder is found).
This way, _kafkactl_ can be used conveniently anywhere in the git repository.

[#_current_context]
==== Current context

The current context can be set via commandline argument `--context`, environment variable `CURRENT_CONTEXT` or
it can be defined in a file.

If no current context is defined, the first context in the config file is used as current context. Additionally,
in this case a file storing the current context is created.

The file is typically stored next to the config file and named `current-context.yml`.
The location of the file can be overridden via environment variable `KAFKA_CTL_WRITABLE_CONFIG`.

=== Auto completion

==== bash

----
source <(kafkactl completion bash)
----

To load completions for each session, execute once:
Linux:

----
kafkactl completion bash > /etc/bash_completion.d/kafkactl
----

MacOS:

----
kafkactl completion bash > /usr/local/etc/bash_completion.d/kafkactl
----

==== zsh

If shell completion is not already enabled in your environment,
you will need to enable it. You can execute the following once:

----
echo "autoload -U compinit; compinit" >> ~/.zshrc
----

To load completions for each session, execute once:

----
kafkactl completion zsh > "${fpath[1]}/_kafkactl"
----

You will need to start a new shell for this setup to take effect.

==== Fish

----
kafkactl completion fish | source
----

To load completions for each session, execute once:

----
kafkactl completion fish > ~/.config/fish/completions/kafkactl.fish
----

== Documentation

The documentation for all available commands can be found here:

image::https://img.shields.io/badge/command-docs-blue.svg[command docs,link=https://deviceinsight.github.io/kafkactl/]

== Running in docker

Assuming your Kafka brokers are accessible under `kafka1:9092` and `kafka2:9092`, you can list topics by running:

[,bash]
----
docker run --env BROKERS="kafka1:9092 kafka2:9092" deviceinsight/kafkactl:latest get topics
----

If a more elaborate config is needed, you can mount it as a volume:

[,bash]
----
docker run -v /absolute/path/to/config.yml:/etc/kafkactl/config.yml deviceinsight/kafkactl get topics
----

== Running in Kubernetes

____
:construction: This feature is still experimental.
____

If your kafka cluster is not directly accessible from your machine, but it is accessible from a kubernetes cluster
which in turn is accessible via `kubectl` from your machine you can configure kubernetes support:

[,$yaml]
----
contexts:
  kafka-cluster:
    brokers:
      - broker1:9092
      - broker2:9092
    kubernetes:
      enabled: true
      binary: kubectl #optional
      kubeContext: k8s-cluster
      namespace: k8s-namespace
----

If you are using `tlsSecret`, make sure that you set `ca`, `cert` and `certKey`. For example secret created by
Strimzi Kafka operator will have values `ca.crt`, `user.crt`, `user.key`:

[,$yaml]
----
contexts:
  kafka-cluster:
    brokers:
      - broker1:9092
      - broker2:9092
    kubernetes:
      enabled: true
      binary: kubectl #optional
      kubeContext: k8s-cluster
      namespace: k8s-namespace
      tlsSecret: tls-secret
    tls:
      enabled: true
      ca: ca.crt
      cert: user.crt
      certKey: user.key
      # set insecure to true to ignore all tls verification (defaults to false)
      insecure: false
----

Instead of directly talking to kafka brokers a kafkactl docker image is deployed as a pod into the kubernetes
cluster, and the defined namespace. Standard-Input and Standard-Output are then wired between the pod and your shell
running kafkactl.

There are two options:

. You can run `kafkactl attach` with your kubernetes cluster configured. This will use `kubectl run` to create a pod
in the configured kubeContext/namespace which runs an image of kafkactl and gives you a `bash` into the container.
Standard-in is piped to the pod and standard-out, standard-err directly to your shell. You even get auto-completion.
. You can run any other kafkactl command with your kubernetes cluster configured. Instead of directly
querying the cluster a pod is deployed, and input/output are wired between pod and your shell.

The names of the brokers have to match the service names used to access kafka in your cluster. A command like this should
give you this information:

[,bash]
----
kubectl get svc | grep kafka
----

____
:bulb: The first option takes a bit longer to start up since an Ubuntu based docker image is used in order to have
a bash available. The second option uses a docker image build from scratch and should therefore be quicker.
Which option is more suitable, will depend on your use-case.
____

== Configuration via environment variables

Every key in the `config.yml` can be overwritten via environment variables. The corresponding environment variable
for a key can be found by applying the following rules:

. replace `.` by `_`
. replace `-` by `_`
. write the key name in ALL CAPS

e.g. the key `contexts.default.tls.certKey` has the corresponding environment variable `CONTEXTS_DEFAULT_TLS_CERTKEY`.

*NOTE:* an array variable can be written using whitespace as delimiter. For example `BROKERS` can be provided as
`BROKERS="broker1:9092 broker2:9092 broker3:9092"`.

If environment variables for the `default` context should be set, the prefix `CONTEXTS_DEFAULT_` can be omitted.
So, instead of `CONTEXTS_DEFAULT_TLS_CERTKEY` one can also set `TLS_CERTKEY`.
See *root_test.go* for more examples.

== Plugins

_kafkactl_ supports plugins to cope with specifics when using Kafka-compatible clusters available from cloud providers such as Azure or AWS.

At the moment, plugins can only be used to implement a `tokenProvider` for _oauth_ authentication.
In the future, plugins might implement additional commands to query data or configuration which is not part of the Kafka-API. One example would be Eventhub consumer groups/offsets for Azure.

See the plugin documentation for additional documentation and usage examples.

Available plugins:

* https://github.com/deviceinsight/kafkactl-plugins/blob/main/aws/README.adoc[aws plugin]
* https://github.com/deviceinsight/kafkactl-plugins/blob/main/azure/README.adoc[azure plugin]

=== Generic Token Provider

For cases where a custom plugin is not needed, _kafkactl_ provides a built-in `generic` token provider that executes a script to retrieve OAuth tokens dynamically.

==== Configuration

[,yaml]
----
contexts:
  my-cluster:
    sasl:
      enabled: true
      mechanism: oauth
      tokenprovider:
        plugin: generic
        options:
          script: /path/to/get-token.sh
          args:
            - intended-scope
----

==== Script Output Format

The script must output a JSON object with the following structure:

[,json]
----
{
  "token": "your-oauth-access-token",
  "extensions": {
    "key": "value"
  }
}
----

* `token` (required): The OAuth access token string
* `extensions` (optional): A map of string key-value pairs for OAuth extensions

==== Example Script

[,bash]
----
#!/bin/bash
# Example script that fetches a token from an OAuth provider
SCOPE=${1:-kafka}

curl -s --fail -X POST "$AUTH_SERVER/oauth/token" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=client_credentials" \
  -d "client_id=$CLIENT_ID" \
  -d "client_secret=$CLIENT_SECRET" \
  -d "scope=$SCOPE" | jq -r '{"token": .access_token}'
----

The script is executed each time a token is needed, allowing for automatic token refresh.

== Examples

=== Consuming messages

Consuming messages from a topic can be done with:

[,bash]
----
kafkactl consume my-topic
----

In order to consume starting from the oldest offset use:

[,bash]
----
kafkactl consume my-topic --from-beginning
----

The following example prints message `key` and `timestamp` as well as `partition` and `offset` in `yaml` format:

[,bash]
----
kafkactl consume my-topic --print-keys --print-timestamps -o yaml
----

To print partition in default output format use:

[,bash]
----
kafkactl consume my-topic --print-partitions
----

Headers of kafka messages can be printed with the parameter `--print-headers` e.g.:

[,bash]
----
kafkactl consume my-topic --print-headers -o yaml
----

If one is only interested in the last `n` messages this can be achieved by `--tail` e.g.:

[,bash]
----
kafkactl consume my-topic --tail=5
----

The consumer can be stopped when the latest offset is reached using `--exit` parameter e.g.:

[,bash]
----
kafkactl consume my-topic --from-beginning --exit
----

The consumer can compute the offset it starts from using a timestamp:

[,bash]
----
kafkactl consume my-topic --from-timestamp 1384216367189
kafkactl consume my-topic --from-timestamp 2014-04-26T17:24:37.123Z
kafkactl consume my-topic --from-timestamp 2014-04-26T17:24:37.123
kafkactl consume my-topic --from-timestamp 2009-08-12T22:15:09Z
kafkactl consume my-topic --from-timestamp 2017-07-19T03:21:51
kafkactl consume my-topic --from-timestamp 2013-04-01T22:43
kafkactl consume my-topic --from-timestamp 2014-04-26
----

The `from-timestamp` parameter supports different timestamp formats. It can either be a number representing the epoch milliseconds
or a string with a timestamp in one of the https://github.com/deviceinsight/kafkactl/blob/main/internal/util/util.go#L10[supported date formats].

*NOTE:* `--from-timestamp` is not designed to schedule the beginning of consumer's consumption. The offset corresponding to the timestamp is computed at the beginning of the process. So if you set it to a date in the future, the consumer will start from the latest offset.

The consumer can be stopped when the offset corresponding to a particular timestamp is reached:

[,bash]
----
kafkactl consume my-topic --from-timestamp 2017-07-19T03:30:00 --to-timestamp 2017-07-19T04:30:00
----

The `to-timestamp` parameter supports the same formats as `from-timestamp`.

*NOTE:* `--to-timestamp` is not designed to schedule the end of consumer's consumption. The offset corresponding to the timestamp is computed at the beginning of the process. So if you set it to a date in the future, the consumer will stop at the current latest offset.

The following example prints keys in hex and values in base64:

[,bash]
----
kafkactl consume my-topic --print-keys --key-encoding=hex --value-encoding=base64
----

The consumer can convert protobuf messages to JSON in keys (optional) and values:

[,bash]
----
kafkactl consume my-topic --value-proto-type MyTopicValue --key-proto-type MyTopicKey --proto-file kafkamsg.proto
----

To join a consumer group and consume messages as a member of the group:

[,bash]
----
kafkactl consume my-topic --group my-consumer-group
----

If you want to limit the number of messages that will be read, specify `--max-messages`:

[,bash]
----
kafkactl consume my-topic --max-messages 2
----

Messages can be filtered by key, value, or headers using glob patterns:

[,bash]
----
# filter by key pattern
kafkactl consume my-topic --filter-key "user-*" --print-keys

# filter by value pattern
kafkactl consume my-topic --filter-value "error:*"

# filter by header pattern
kafkactl consume my-topic --filter-header "trace-id=abc-*" --print-headers

# combine multiple filters (all must match)
kafkactl consume my-topic --filter-key "user-*" --filter-value "*error*" --filter-header "env=prod"
----

Glob patterns support the following wildcards:

* `*` - matches any sequence of characters
* `?` - matches a single character
* `[abc]` - matches any character in the set
* `[!abc]` - matches any character not in the set
* `{a,b,c}` - matches any of the alternatives

Examples of glob patterns:

[,bash]
----
# match keys starting with "user-"
--filter-key "user-*"

# match keys for user or admin
--filter-key "{user,admin}-*"

# match values containing "error" or "warn"
--filter-value "*error*" or --filter-value "*warn*"

# match specific header patterns
--filter-header "trace-id=abc-???"
----

=== Producing messages

Producing messages can be done in multiple ways. If we want to produce a message with `key='my-key'`,
`value='my-value'` to the topic `my-topic` this can be achieved with one of the following commands:

[,bash]
----
echo "my-key#my-value" | kafkactl produce my-topic --separator=#
echo "my-value" | kafkactl produce my-topic --key=my-key
kafkactl produce my-topic --key=my-key --value=my-value
----

If we have a file containing messages where each line contains `key` and `value` separated by `#`, the file can be
used as input to produce messages to topic `my-topic`:

[,bash]
----
cat myfile | kafkactl produce my-topic --separator=#
----

The same can be accomplished without piping the file to stdin with the `--file` parameter:

[,bash]
----
kafkactl produce my-topic --separator=# --file=myfile
----

If the messages in the input file need to be split by a different delimiter than `\n` a custom line separator can be provided:

[,bash]
----
kafkactl produce my-topic --separator=# --lineSeparator=|| --file=myfile
----

*NOTE:* if the file was generated with `kafkactl consume --print-keys --print-timestamps my-topic` the produce
command is able to detect the message timestamp in the input and will ignore it.

It is also possible to produce messages in json format:

[,bash]
----
# each line in myfile.json is expected to contain a json object with fields key, value and headers
kafkactl produce my-topic --file=myfile.json --input-format=json
cat myfile.json | kafkactl produce my-topic --input-format=json
echo '{"value": "my-value"}' | kafkactl produce my-topic --input-format=json
echo '{"key": "my-key", "value": "my-value", "headers": {"my-header": "val"}}' | kafkactl produce my-topic --input-format=json
----

the number of messages produced per second can be controlled with the `--rate` parameter:

[,bash]
----
cat myfile | kafkactl produce my-topic --separator=# --rate=200
----

It is also possible to specify the partition to insert the message:

[,bash]
----
kafkactl produce my-topic --key=my-key --value=my-value --partition=2
----

Additionally, a different partitioning scheme can be used. When a `key` is provided the default partitioner
uses the `hash` of the `key` to assign a partition. So the same `key` will end up in the same partition:

[,bash]
----
# the following 3 messages will all be inserted to the same partition
kafkactl produce my-topic --key=my-key --value=my-value
kafkactl produce my-topic --key=my-key --value=my-value
kafkactl produce my-topic --key=my-key --value=my-value

# the following 3 messages will probably be inserted to different partitions
kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random
kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random
kafkactl produce my-topic --key=my-key --value=my-value --partitioner=random
----

Message headers can also be written:

[,bash]
----
kafkactl produce my-topic --key=my-key --value=my-value --header key1:value1 --header key2:value\:2
----

The following example writes the key from base64 and value from hex:

[,bash]
----
kafkactl produce my-topic --key=dGVzdC1rZXk= --key-encoding=base64 --value=0000000000000000 --value-encoding=hex
----

You can control how many replica acknowledgements are needed for a response:

[,bash]
----
kafkactl produce my-topic --key=my-key --value=my-value --required-acks=WaitForAll
----

Producing null values (tombstone record) is also possible:

[,bash]
----
 kafkactl produce my-topic --null-value
----

Producing protobuf message converted from JSON:

[,bash]
----
kafkactl produce my-topic --key='{"keyField":123}' --key-proto-type MyKeyMessage --value='{"valueField":"value"}' --value-proto-type MyValueMessage --proto-file kafkamsg.proto
----

A more complex protobuf message converted from a multi-line JSON string can be produced using a file input with custom separators.

For example, if you have the following protobuf definition (`complex.proto`):

[,protobuf]
----
syntax = "proto3";

import "google/protobuf/timestamp.proto";

message ComplexMessage {
  CustomerInfo customer_info = 1;
  DeviceInfo device_info = 2;
}

message CustomerInfo {
  string customer_id = 1;
  string name = 2;
}

message DeviceInfo {
  string serial = 1;
  google.protobuf.Timestamp last_update  = 2;
}
----

And you have the following file (`complex-msg.txt`) that contains the key and value of the message:

[,text]
----
msg-key##
{
    "customer_info": {
        "customer_id": "12345",
        "name": "Bob"
    },
    "device_info": {
        "serial": "abcde",
        "last_update": "2024-03-02T07:01:02.000Z"
    }
}
+++
----

The command to produce the protobuf message using sample protobuf definition and input file would be:

[,bash]
----
kafkactl produce my-topic --value-proto-type=ComplexMessage --proto-file=complex.proto --lineSeparator='+++' --separator='##' --file=complex-msg.txt
----

=== Avro support

In order to enable avro support you just have to add the schema registry to your configuration:

[,$yaml]
----
contexts:
  localhost:
    schemaRegistry:
      url: localhost:8081
----

==== Producing to an avro topic

`kafkactl` will lookup the topic in the schema registry in order to determine if key or value needs to be avro encoded.
If producing with the latest `schemaVersion` is sufficient, no additional configuration is needed an `kafkactl` handles
this automatically.

If however one needs to produce an older `schemaVersion` this can be achieved by providing the parameters `keySchemaVersion`, `valueSchemaVersion`.

===== Example

[,bash]
----
# create a topic
kafkactl create topic avro_topic
# add a schema for the topic value
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"LongList\", \"fields\" : [{\"name\": \"next\", \"type\": [\"null\", \"LongList\"], \"default\": null}]}"}' \
http://localhost:8081/subjects/avro_topic-value/versions
# produce a message
kafkactl produce avro_topic --value {\"next\":{\"next\":{}}}
# consume the message
kafkactl consume avro_topic --from-beginning --print-schema -o yaml
----

==== Consuming from an avro topic

As for producing `kafkactl` will also lookup the topic in the schema registry to determine if key or value needs to be
decoded with an avro schema.

The `consume` command handles this automatically and no configuration is needed.

An additional parameter `print-schema` can be provided to display the schema used for decoding.

=== JSON schema support

JSON schema support works similarly to Avro. When a schema registry is configured, `kafkactl` will automatically detect
topics with JSON schemas registered and encode/decode messages using the Confluent wire format.

==== Producing to a JSON schema topic

`kafkactl` will lookup the topic in the schema registry in order to determine if key or value has a JSON schema registered.
The input JSON is validated against the schema before producing. If producing with the latest `schemaVersion` is sufficient,
no additional configuration is needed and `kafkactl` handles this automatically.

If however one needs to produce an older `schemaVersion` this can be achieved by providing the parameters `keySchemaVersion`, `valueSchemaVersion`.

===== Example

[,bash]
----
# create a topic
kafkactl create topic json_topic
# add a JSON schema for the topic value
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schemaType": "JSON", "schema": "{\"type\": \"object\", \"properties\": {\"name\": {\"type\": \"string\"}, \"age\": {\"type\": \"integer\"}}, \"required\": [\"name\", \"age\"]}"}' \
http://localhost:8081/subjects/json_topic-value/versions
# produce a message
kafkactl produce json_topic --value '{"name":"Alice","age":30}'
# consume the message
kafkactl consume json_topic --from-beginning --print-schema -o yaml
----

==== Consuming from a JSON schema topic

As for producing, `kafkactl` will also lookup the topic in the schema registry to determine if key or value needs to be
decoded with a JSON schema.

The `consume` command handles this automatically and no configuration is needed.

An additional parameter `print-schema` can be provided to display the schema used for decoding.

=== Protobuf support

`kafkactl` can consume and produce protobuf-encoded messages. In order to enable protobuf serialization/deserialization
you should add flag `--value-proto-type` and optionally `--key-proto-type` (if keys encoded in protobuf format)
with type name. Protobuf-encoded messages are mapped with https://developers.google.com/protocol-buffers/docs/proto3#json[pbjson].

`kafkactl` will search messages in following order:

. Protoset files specified in `--protoset-file` flag
. Protoset files specified in `context.protobuf.protosetFiles` config value
. Proto files specified in `--proto-file` flag
. Proto files specified in `context.protobuf.protoFiles` config value

Proto files may require some dependencies in `import` sections. To specify additional lookup paths use
`--proto-import-path` flag or `context.protobuf.importPaths` config value.

If provided message types was not found `kafkactl` will return error.

Note that if you want to use raw proto files `protoc` installation don't need to be installed.

Also note that protoset files must be compiled with included imports:

[,bash]
----
protoc -o kafkamsg.protoset --include_imports kafkamsg.proto
----

==== Example

Assume you have following proto schema in `kafkamsg.proto`:

[,protobuf]
----
syntax = "proto3";

import "google/protobuf/timestamp.proto";

message TopicMessage {
  google.protobuf.Timestamp produced_at = 1;
  int64 num = 2;
}

message TopicKey {
  float fvalue = 1;
}
----

"well-known" `google/protobuf` types are included so no additional proto files needed.

To produce message run

[,bash]
----
kafkactl produce <topic> --key '{"fvalue":1.2}' --key-proto-type TopicKey --value '{"producedAt":"2021-12-01T14:10:12Z","num":"1"}' --value-proto-type TopicValue --proto-file kafkamsg.proto
----

or with protoset

[,bash]
----
kafkactl produce <topic> --key '{"fvalue":1.2}' --key-proto-type TopicKey --value '{"producedAt":"2021-12-01T14:10:12Z","num":"1"}' --value-proto-type TopicValue --protoset-file kafkamsg.protoset
----

To consume messages run

[,bash]
----
kafkactl consume <topic> --key-proto-type TopicKey --value-proto-type TopicValue --proto-file kafkamsg.proto
----

or with protoset

[,bash]
----
kafkactl consume <topic> --key-proto-type TopicKey --value-proto-type TopicValue --protoset-file kafkamsg.protoset
----



=== Topic management

==== List topics

In order to get a list of topics the `get topics` command can be used:

[,bash]
----
kafkactl get topics
kafkactl list topics
----

==== Describe topic

A detailed description of a topic can be obtained with `describe topic`:

[,bash]
----
kafkactl describe topic my-topic
----

Per default only overwritten config entries are printed. To print all config entries including defaults use:

[,bash]
----
kafkactl describe topic my-topic --all-configs
----

To print only partition details of partitions with messages use:
[,bash]
----
kafkactl describe topic my-topic --skip-empty
----

==== Create topic

The `create topic` allows you to create one or multiple topics.

Basic usage:
[,bash]
----
kafkactl create topic my-topic
----

The partition count can be specified with:
[,bash]
----
kafkactl create topic my-topic --partitions 32
----

The replication factor can be specified with:
[,bash]
----
kafkactl create topic my-topic --replication-factor 3
----

Configs can also be provided:
[,bash]
----
kafkactl create topic my-topic --config retention.ms=3600000 --config=cleanup.policy=compact
----

The topic configuration can also be taken from an existing topic using the following:
[,bash]
----
kafkactl describe topic my-topic -o json > my-topic-config.json
kafkactl create topic my-topic-clone --file my-topic-config.json
----

==== Altering topics

Using the `alter topic` command allows you to change the partition count, replication factor and topic-level
configurations of an existing topic.

The partition count can be increased with:

[,bash]
----
kafkactl alter topic my-topic --partitions 32
----

The replication factor can be altered with:

[,bash]
----
kafkactl alter topic my-topic --replication-factor 2
----

____
:information_source: when altering replication factor, kafkactl tries to keep the number of replicas assigned to each
broker balanced. If you need more control over the assigned replicas use `alter partition` directly.
____

The topic configs can be edited by supplying key value pairs as follows:

[,bash]
----
kafkactl alter topic my-topic --config retention.ms=3600000 --config cleanup.policy=compact
----

____
:bulb: use the flag `--validate-only` to perform a dry-run without actually modifying the topic
____

==== Altering partitions

The assigned replicas of a partition can directly be altered with:

[,bash]
----
# set brokers 102,103 as replicas for partition 3 of topic my-topic
kafkactl alter partition my-topic 3 -r 102,103
----

==== Clone topic

New topic may be created from existing topic as follows:

[,bash]
----
kafkactl clone topic source-topic target-topic
----

Source topic must exist, target topic must not exist.
`kafkactl` clones partitions count, replication factor and config entries.


==== Delete Records from a topics

Command to be used to delete records from partition, which have an offset smaller than the provided offset.

[,bash]
----
# delete records with offset < 123 from partition 0 and offset < 456 from partition 1
kafkactl delete records my-topic --offset 0=123 --offset 1=456
----


=== Consumer group management

==== List Consumer groups

In order to get a list of consumer groups the `get consumer-groups` command can be used:

[,bash]
----
# all available consumer groups
kafkactl get consumer-groups
# only consumer groups for a single topic
kafkactl get consumer-groups --topic my-topic
# using command alias
kafkactl get cg
----

To get detailed information about the consumer group use `describe consumer-group`. If the parameter `--partitions`
is provided details will be printed for each partition otherwise the partitions are aggregated to the clients.

==== Describe Consumer group

[,bash]
----
# describe a consumer group
kafkactl describe consumer-group my-group
# show partition details only for partitions with lag
kafkactl describe consumer-group my-group --only-with-lag
# show details only for a single topic
kafkactl describe consumer-group my-group --topic my-topic
# using command alias
kafkactl describe cg my-group
----

==== Create consumer groups

A consumer-group can be created as follows:

[,bash]
----
# create group with offset for all partitions set to oldest
kafkactl create consumer-group my-group --topic my-topic --oldest
# create group with offset for all partitions set to newest
kafkactl create consumer-group my-group --topic my-topic --newest
# create group with offset for a single partition set to specific offset
kafkactl create consumer-group my-group --topic my-topic --partition 5 --offset 100
# create group for multiple topics with offset for all partitions set to oldest
kafkactl create consumer-group my-group --topic my-topic-a --topic my-topic-b --oldest
----

==== Clone consumer group

A consumer group may be created as clone of another consumer group as follows:

[,bash]
----
kafkactl clone consumer-group source-group target-group
----

Source group must exist and have committed offsets. Target group must not exist or don't have committed offsets.
`kafkactl` clones topic assignment and partition offsets.

==== Reset consumer group offsets

in order to ensure the reset does what it is expected, per default only
the results are printed without actually executing it. Use the additional parameter `--execute` to perform the reset.

[,bash]
----
# reset offset of for all partitions to oldest offset
kafkactl reset offset my-group --topic my-topic --oldest
# reset offset of for all partitions to newest offset
kafkactl reset offset my-group --topic my-topic --newest
# reset offset for a single partition to specific offset
kafkactl reset offset my-group --topic my-topic --partition 5 --offset 100
# reset offset to newest for all topics in the group
kafkactl reset offset my-group --all-topics --newest
# reset offset of for all partitions on multiple topics to oldest offset
kafkactl reset offset my-group --topic my-topic-a --topic my-topic-b --oldest
# reset offset to offset at a given timestamp(epoch)/datetime
kafkactl reset offset my-group --topic my-topic-a --to-datetime 2014-04-26T17:24:37.123Z
# reset offset to offset at a given timestamp(epoch)/datetime
kafkactl reset offset my-group --topic my-topic-a --to-datetime 1697726906352
----

==== Delete consumer group offsets

In order to delete a consumer group offset use `delete offset`

[,bash]
----
# delete offset for all partitions of topic my-topic
kafkactl delete offset my-group --topic my-topic
# delete offset for partition 1 of topic my-topic
kafkactl delete offset my-group --topic my-topic --partition 1
----

==== Delete consumer groups

In order to delete a consumer group or a list of consumer groups use `delete consumer-group`

[,bash]
----
# delete consumer group my-group
kafkactl delete consumer-group my-group
----

=== ACL Management

Available ACL operations are documented https://docs.confluent.io/platform/current/kafka/authorization.html#operations[here].

==== Create a new ACL

[,bash]
----
# create an acl that allows topic read for a user 'consumer'
kafkactl create acl --topic my-topic --operation read --principal User:consumer --allow
# create an acl that denies topic write for a user 'consumer' coming from a specific host
kafkactl create acl --topic my-topic --operation write --host 1.2.3.4 --principal User:consumer --deny
# allow multiple operations
kafkactl create acl --topic my-topic --operation read --operation describe --principal User:consumer --allow
# allow on all topics with prefix common prefix
kafkactl create acl --topic my-prefix --pattern prefixed --operation read --principal User:consumer --allow
----

==== List ACLs

[,bash]
----
# list all acl
kafkactl get acl
# list all acl (alias command)
kafkactl get access-control-list
# filter only topic resources
kafkactl get acl --topics
# filter only consumer group resources with operation read
kafkactl get acl --groups --operation read
# filter specific topic and user
kafkactl get acl --resource-name my-topic --principal User:myUser
# filter specific topic and host
kafkactl get acl --resource-name my-topic --host my-host
----

==== Delete ACLs

[,bash]
----
# delete all topic read acls
kafkactl delete acl --topics --operation read --pattern any
# delete all topic acls for any operation
kafkactl delete acl --topics --operation any --pattern any
# delete all cluster acls for any operation
kafkactl delete acl --cluster --operation any --pattern any
# delete all consumer-group acls with operation describe, patternType prefixed and permissionType allow
kafkactl delete acl --groups --operation describe --pattern prefixed --allow
# delete all topic acls for a principal
kafkactl delete acl --topics --operation any --pattern any --prinicipal User:myUser
# delete all topic acls for a host
kafkactl delete acl --topics --operation any --pattern any --host my-host
----

=== Broker Management

==== Getting Brokers

To get the list of brokers of a kafka cluster use `get brokers`

[,bash]
----
# get the list of brokers
kafkactl get brokers
----

==== Describe Broker

To view configs for a single broker use `describe broker`

[,bash]
----
# describe broker
kafkactl describe broker 1
----

Per default only dynamic configs are shown. To view all configs use `--all-configs`

[,bash]
----
kafkactl describe broker 1 --all-configs
----

Additionally, only default configs can be shown with:
[,bash]
----
kafkactl describe broker default
----

==== Altering brokers

Using the `alter broker` command allows you to change dynamic broker configurations for individual brokers or cluster-wide defaults.

To alter a configuration for a specific broker:

[,bash]
----
kafkactl alter broker 101 --config background.threads=8
----

To alter a cluster-wide default configuration (affects brokers without individual overrides):

[,bash]
----
kafkactl alter broker default --config background.threads=8
----

Multiple configurations can be altered simultaneously:

[,bash]
----
kafkactl alter broker 101 --config background.threads=8 --config log.cleaner.threads=2
----

____
:bulb: use the flag `--validate-only` to perform a dry-run without actually modifying the broker configuration
____

____
:information_source: only dynamically configurable broker properties can be altered. Static properties like `broker.id` or `log.dirs` require a broker restart to change.
____


=== SCRAM User Management

kafkactl provides comprehensive SCRAM (Salted Challenge Response Authentication Mechanism) user management capabilities for Kafka clusters that support SCRAM authentication. This allows you to create, modify, and manage user credentials directly through kafkactl.

==== Requirements

* Kafka 2.7.0+ (for SCRAM user management APIs)
* Admin privileges on the Kafka cluster
* SCRAM-enabled listeners configured on Kafka brokers

==== Supported Mechanisms

* `SCRAM-SHA-256` (default)
* `SCRAM-SHA-512`

==== Create SCRAM Users

Create a new SCRAM user with default settings:

[,bash]
----
# Create user with SCRAM-SHA-256 (default mechanism)
kafkactl create user myuser --password mypassword

# Create user with specific mechanism
kafkactl create user myuser --password mypassword --mechanism SCRAM-SHA-512

# Create user with custom iterations (default: 4096)
kafkactl create user myuser --password mypassword --iterations 8192

# Create user with custom base64-encoded salt
kafkactl create user myuser --password mypassword --salt "c2FsdA=="
----

==== Alter SCRAM Users

Update existing user credentials:

[,bash]
----
# Update user password (keeps existing mechanism)
kafkactl alter user myuser --password newpassword

# Update user with different mechanism
kafkactl alter user myuser --password newpassword --mechanism SCRAM-SHA-512

# Update with custom iterations
kafkactl alter user myuser --password newpassword --iterations 16384
----

==== Delete SCRAM Users

Remove SCRAM credentials by mechanism:

[,bash]
----
# Delete SCRAM-SHA-256 credentials (default)
kafkactl delete user myuser --mechanism SCRAM-SHA-256

# Delete SCRAM-SHA-512 credentials
kafkactl delete user myuser --mechanism SCRAM-SHA-512

# Note: A user may have multiple mechanisms, delete each separately
----

==== List SCRAM Users

Get all SCRAM users and their mechanisms:

[,bash]
----
# List all users (table format)
kafkactl get users

# List users in JSON format
kafkactl get users -o json

# List users in YAML format
kafkactl get users -o yaml
----

==== Describe SCRAM User

Get detailed information about a specific user:

[,bash]
----
# Describe user (table format)
kafkactl describe user myuser

# Describe user in JSON format
kafkactl describe user myuser -o json

# Describe user in YAML format
kafkactl describe user myuser -o yaml
----

==== Security Considerations

* **Salt Generation**: kafkactl automatically generates cryptographically secure random salts unless custom salts are provided
* **Password Security**: Passwords are transmitted securely to Kafka and never stored by kafkactl
* **Mechanism Support**: Users can have credentials for multiple SCRAM mechanisms simultaneously
* **Admin Privileges**: SCRAM user management requires admin-level access to the Kafka cluster

==== Multi-Mechanism Users

A single user can have credentials for multiple SCRAM mechanisms:

[,bash]
----
# Create user with SCRAM-SHA-256
kafkactl create user myuser --password mypassword --mechanism SCRAM-SHA-256

# Add SCRAM-SHA-512 credentials to the same user
kafkactl create user myuser --password mypassword --mechanism SCRAM-SHA-512

# User now has both mechanisms
kafkactl describe user myuser
----

== Development

In order to see linter errors before commit, add the following pre-commit hook:

[,bash]
----
pip install --user pre-commit
pre-commit install
----

Documentation

The Go Gopher

There is no documentation for this package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL