kafka

package
v0.0.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 4, 2021 | License: MIT | Imports: 12 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	SetConfigOperation    int8 = 0
	DeleteConfigOperation int8 = 1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ConfigOperation

type ConfigOperation struct {
	Name  string
	Value *string
	Op    int8 // 0: SET, 1: DELETE, 2: APPEND, 3: SUBTRACT
}

An alter config operation

type ConfigOperations

type ConfigOperations []ConfigOperation

A slice of ConfigOperation values

func (ConfigOperations) Contains

func (c ConfigOperations) Contains(name string) bool

Contains reports whether the list includes an operation for the specified config key name

func (ConfigOperations) ContainsOp

func (c ConfigOperations) ContainsOp(operation int8) bool

ContainsOp reports whether the list includes an operation of the specified type (for example SetConfigOperation or DeleteConfigOperation)

type Metadata

type Metadata struct {
	ClusterId string
	Brokers   meta.Brokers
	Topics    []TopicMetadata
}

Cluster metadata

type ResourceAcls

type ResourceAcls struct {
	ResourceName string
	ResourceType string
	Acls         def.AclEntryGroups
}

Acls for a named resource

type ResourceConfigs

type ResourceConfigs struct {
	ResourceName string
	Configs      def.Configs
}

Configs for a named resource

type Service

type Service struct {
	// contains filtered or unexported fields
}

A Kafka service

func NewService

func NewService(
	cl *client.Client,
) *Service

Create a new service

func (*Service) AlterAllBrokerConfigs

func (s *Service) AlterAllBrokerConfigs(configOps ConfigOperations, validateOnly bool) error

Execute a request to alter cluster-wide broker configs (Kafka 0.11.0+/2.3.0+)

func (*Service) AlterBrokerConfigs

func (s *Service) AlterBrokerConfigs(brokerId string, configOps ConfigOperations, validateOnly bool) error

Execute a request to alter broker configs (Kafka 0.11.0+/2.3.0+)

func (*Service) AlterPartitionAssignments

func (s *Service) AlterPartitionAssignments(
	topic string,
	assignments def.PartitionAssignments,
) error

Execute a request to alter partition assignments (Kafka 2.4.0+)

func (*Service) AlterTopicConfigs

func (s *Service) AlterTopicConfigs(topic string, configOps ConfigOperations, validateOnly bool) error

Execute a request to alter topic configs (Kafka 0.11.0+/2.3.0+)

func (*Service) CreateAcls

func (s *Service) CreateAcls(
	name string,
	resourceType string,
	acls def.AclEntryGroups,
) error

Execute a request to create acls (Kafka 0.11.0+)

func (*Service) CreatePartitions

func (s *Service) CreatePartitions(
	topic string,
	partitions int,
	assignments def.PartitionAssignments,
	validateOnly bool,
) error

Execute a request to create partitions (Kafka 1.0.0+)

func (*Service) CreateTopic

func (s *Service) CreateTopic(
	topicDef def.TopicDefinition,
	assignments def.PartitionAssignments,
	validateOnly bool,
) error

Execute a request to create a topic (Kafka 0.10.1+)

func (*Service) DeleteAcls

func (s *Service) DeleteAcls(
	name string,
	resourceType string,
	acls def.AclEntryGroups,
) error

Execute a request to delete acls (Kafka 0.11.0+)

func (*Service) DescribeAllBrokerConfigs

func (s *Service) DescribeAllBrokerConfigs() (def.Configs, error)

Execute a request to describe all broker configs (Kafka 0.11.0+)

func (*Service) DescribeAllResourceAcls

func (s *Service) DescribeAllResourceAcls(
	resourceType string,
) ([]ResourceAcls, error)

Execute a request to describe acls for all resources (Kafka 0.11.0+)

func (*Service) DescribeBrokerConfigs

func (s *Service) DescribeBrokerConfigs(brokerId string) (def.Configs, error)

Execute a request to describe broker configs (Kafka 0.11.0+)

func (*Service) DescribeMetadata

func (s *Service) DescribeMetadata(topics []string, errorOnNonExistence bool) (*Metadata, error)

Execute a request for metadata (Kafka 0.8.0+)

func (*Service) DescribeResourceAcls

func (s *Service) DescribeResourceAcls(
	name string,
	resourceType string,
) (def.AclEntryGroups, error)

Execute a request to describe acls of a specific resource (Kafka 0.11.0+)

func (*Service) DescribeTopicConfigs

func (s *Service) DescribeTopicConfigs(topics []string) ([]ResourceConfigs, error)

Execute a request to describe topic configs (Kafka 0.11.0+)

func (*Service) IsKafkaReady

func (s *Service) IsKafkaReady(minBrokers int, timeoutSec int) bool

Execute describe cluster requests until a minimum number of brokers are alive (Kafka 2.8.0+)

func (*Service) ListPartitionReassignments

func (s *Service) ListPartitionReassignments(
	topic string,
	partitions []int32,
) (meta.PartitionReassignments, error)

Execute a request to list partition reassignments (Kafka 2.4.0+)

func (*Service) NewConfigOps

func (s *Service) NewConfigOps(
	localConfigs def.ConfigsMap,
	remoteConfigsMap def.ConfigsMap,
	remoteConfigs def.Configs,
	deleteUndefinedConfigs bool,
) (ConfigOperations, error)

Create alter configs operations

func (*Service) TryRequestTopic

func (s *Service) TryRequestTopic(topic string) (
	*def.TopicDefinition,
	def.Configs,
	meta.Brokers,
	error,
)

Execute a request for the metadata of a topic that may or may not exist (Kafka 0.11.0+)

type TopicMetadata

type TopicMetadata struct {
	Topic                    string
	PartitionAssignments     def.PartitionAssignments
	PartitionRackAssignments def.PartitionRackAssignments
	Exists                   bool
}

Topic metadata

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL