kafka

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CustomGroupBalancer

type CustomGroupBalancer struct {
	// contains filtered or unexported fields
}

CustomGroupBalancer ensures proper consumer ID distribution according to requirements

func (*CustomGroupBalancer) AssignGroups

func (b *CustomGroupBalancer) AssignGroups(members []kafka.GroupMember, partitions []kafka.Partition) kafka.GroupMemberAssignments

AssignGroups implements the kafka.GroupBalancer interface.

func (*CustomGroupBalancer) ProtocolName

func (b *CustomGroupBalancer) ProtocolName() string

ProtocolName implements the kafka.GroupBalancer interface.

func (*CustomGroupBalancer) UserData

func (b *CustomGroupBalancer) UserData() ([]byte, error)

UserData implements the kafka.GroupBalancer interface.

type ReaderConfig

type ReaderConfig struct {
	MaxThreads                  int
	ThreadsEqualTotalPartitions bool
	BootstrapServers            string
	ConsumerGroupID             string
	Dialer                      *kafka.Dialer
	AdminClient                 *kafka.Client
}

ReaderConfig holds configuration for creating Kafka readers

type ReaderManager

type ReaderManager struct {
	// contains filtered or unexported fields
}

ReaderManager manages Kafka readers and their metadata

func NewReaderManager

func NewReaderManager(config ReaderConfig) *ReaderManager

NewReaderManager creates a new Kafka reader manager

func (*ReaderManager) Close

func (r *ReaderManager) Close() error

func (*ReaderManager) CreateReaders

func (r *ReaderManager) CreateReaders(ctx context.Context, streams []types.StreamInterface, consumerGroupID string) error

CreateReaders creates Kafka readers based on the provided streams and configuration

func (*ReaderManager) FetchCommittedOffsets

func (r *ReaderManager) FetchCommittedOffsets(ctx context.Context, topic string, partitions []kafka.Partition) map[int]int64

FetchCommittedOffsets fetches committed offsets for a topic

func (*ReaderManager) GetPartitionIndex

func (r *ReaderManager) GetPartitionIndex(partitionKey string) (types.PartitionMetaData, bool)

GetPartitionIndex returns the partition index

func (*ReaderManager) GetReader

func (r *ReaderManager) GetReader(readerID int) *kafka.Reader

GetReader returns the created reader for the given reader ID.

func (*ReaderManager) GetReaderCount added in v0.3.16

func (r *ReaderManager) GetReaderCount() int

GetReaderCount returns the number of created readers.

func (*ReaderManager) GetReaderIDAndClientID added in v0.3.16

func (r *ReaderManager) GetReaderIDAndClientID(readerIndex int) (string, string)

GetReaderIDAndClientID returns the reader ID and the client ID for the reader at the given index.

func (*ReaderManager) GetTopicMetadata

func (r *ReaderManager) GetTopicMetadata(ctx context.Context, topic string) (*kafka.Topic, error)

GetTopicMetadata fetches metadata for a topic

func (*ReaderManager) SetPartitions

func (r *ReaderManager) SetPartitions(ctx context.Context, stream types.StreamInterface) error

SetPartitions sets the partitions that need to be synced for a stream.

func (*ReaderManager) ShouldMatchPartitionCount

func (r *ReaderManager) ShouldMatchPartitionCount() bool

ShouldMatchPartitionCount returns whether readers should match partition count

type SchemaRegistryClient added in v0.3.16

type SchemaRegistryClient struct {
	Endpoint string `json:"endpoint"`

	// Authentication
	Username    string `json:"username,omitempty"`
	Password    string `json:"password,omitempty"`
	BearerToken string `json:"bearer_token,omitempty"`
	// contains filtered or unexported fields
}

SchemaRegistryClient holds the schema registry client information

func (*SchemaRegistryClient) FetchSchema added in v0.3.16

func (c *SchemaRegistryClient) FetchSchema(schemaID uint32) (*types.RegisteredSchema, error)

FetchSchema fetches a registered schema by its schema ID. TODO: fetch the schema by subject strategy if needed (e.g. latest or a specific version); currently we only fetch by ID, which is sufficient for deserialization of consumed messages.

func (*SchemaRegistryClient) Init added in v0.3.16

func (c *SchemaRegistryClient) Init()

Init initializes the HTTP client for the SchemaRegistryClient

func (*SchemaRegistryClient) Validate added in v0.3.16

func (c *SchemaRegistryClient) Validate() error

Validate validates the schema registry connection using a lightweight /subject request.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL