kafka

package
v0.3.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 26, 2025 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CustomGroupBalancer

type CustomGroupBalancer struct {
	// contains filtered or unexported fields
}

CustomGroupBalancer ensures proper consumer ID distribution according to requirements

func (*CustomGroupBalancer) AssignGroups

func (b *CustomGroupBalancer) AssignGroups(members []kafka.GroupMember, partitions []kafka.Partition) kafka.GroupMemberAssignments

AssignGroups implements kafka.GroupBalancer interface

func (*CustomGroupBalancer) ProtocolName

func (b *CustomGroupBalancer) ProtocolName() string

ProtocolName implements kafka.GroupBalancer interface

func (*CustomGroupBalancer) UserData

func (b *CustomGroupBalancer) UserData() ([]byte, error)

UserData implements kafka.GroupBalancer interface

type ReaderConfig

type ReaderConfig struct {
	MaxThreads                  int
	ThreadsEqualTotalPartitions bool
	BootstrapServers            string
	ConsumerGroupID             string
	Dialer                      *kafka.Dialer
	AdminClient                 *kafka.Client
}

ReaderConfig holds configuration for creating Kafka readers

type ReaderManager

type ReaderManager struct {
	// contains filtered or unexported fields
}

ReaderManager manages Kafka readers and their metadata

func NewReaderManager

func NewReaderManager(config ReaderConfig) *ReaderManager

NewReaderManager creates a new Kafka reader manager

func (*ReaderManager) Close

func (r *ReaderManager) Close() error

func (*ReaderManager) CreateReaders

func (r *ReaderManager) CreateReaders(ctx context.Context, streams []types.StreamInterface, consumerGroupID string) error

CreateReaders creates Kafka readers based on the provided streams and configuration

func (*ReaderManager) FetchCommittedOffsets

func (r *ReaderManager) FetchCommittedOffsets(ctx context.Context, topic string, partitions []kafka.Partition) map[int]int64

FetchCommittedOffsets fetches committed offsets for a topic

func (*ReaderManager) GetPartitionIndex

func (r *ReaderManager) GetPartitionIndex(partitionKey string) (types.PartitionMetaData, bool)

GetPartitionIndex returns the partition index

func (*ReaderManager) GetReader

func (r *ReaderManager) GetReader(readerID string) *kafka.Reader

GetReaders returns the created readers

func (*ReaderManager) GetReaderClientID

func (r *ReaderManager) GetReaderClientID(readerID string) (string, bool)

GetReaderClientIDs returns the reader client IDs

func (*ReaderManager) GetReaderIDs

func (r *ReaderManager) GetReaderIDs() []string

return reader ids

func (*ReaderManager) GetTopicMetadata

func (r *ReaderManager) GetTopicMetadata(ctx context.Context, topic string) (*kafka.Topic, error)

GetTopicMetadata fetches metadata for a topic

func (*ReaderManager) SetPartitions

func (r *ReaderManager) SetPartitions(ctx context.Context, stream types.StreamInterface) error

sets partitions that need to be synced for a stream

func (*ReaderManager) ShouldMatchPartitionCount

func (r *ReaderManager) ShouldMatchPartitionCount() bool

ShouldMatchPartitionCount returns whether readers should match partition count

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL