Documentation
¶
Index ¶
- type CustomGroupBalancer
- type ReaderConfig
- type ReaderManager
- func (r *ReaderManager) Close() error
- func (r *ReaderManager) CreateReaders(ctx context.Context, streams []types.StreamInterface, consumerGroupID string) error
- func (r *ReaderManager) FetchCommittedOffsets(ctx context.Context, topic string, partitions []kafka.Partition) map[int]int64
- func (r *ReaderManager) GetPartitionIndex(partitionKey string) (types.PartitionMetaData, bool)
- func (r *ReaderManager) GetReader(readerID string) *kafka.Reader
- func (r *ReaderManager) GetReaderClientID(readerID string) (string, bool)
- func (r *ReaderManager) GetReaderIDs() []string
- func (r *ReaderManager) GetTopicMetadata(ctx context.Context, topic string) (*kafka.Topic, error)
- func (r *ReaderManager) SetPartitions(ctx context.Context, stream types.StreamInterface) error
- func (r *ReaderManager) ShouldMatchPartitionCount() bool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CustomGroupBalancer ¶
type CustomGroupBalancer struct {
// contains filtered or unexported fields
}
CustomGroupBalancer ensures proper consumer ID distribution according to requirements
func (*CustomGroupBalancer) AssignGroups ¶
func (b *CustomGroupBalancer) AssignGroups(members []kafka.GroupMember, partitions []kafka.Partition) kafka.GroupMemberAssignments
AssignGroups implements kafka.GroupBalancer interface
func (*CustomGroupBalancer) ProtocolName ¶
func (b *CustomGroupBalancer) ProtocolName() string
ProtocolName implements kafka.GroupBalancer interface
func (*CustomGroupBalancer) UserData ¶
func (b *CustomGroupBalancer) UserData() ([]byte, error)
UserData implements kafka.GroupBalancer interface
type ReaderConfig ¶
type ReaderConfig struct {
MaxThreads int
ThreadsEqualTotalPartitions bool
BootstrapServers string
ConsumerGroupID string
Dialer *kafka.Dialer
AdminClient *kafka.Client
}
ReaderConfig holds configuration for creating Kafka readers
type ReaderManager ¶
type ReaderManager struct {
// contains filtered or unexported fields
}
ReaderManager manages Kafka readers and their metadata
func NewReaderManager ¶
func NewReaderManager(config ReaderConfig) *ReaderManager
NewReaderManager creates a new Kafka reader manager
func (*ReaderManager) Close ¶
func (r *ReaderManager) Close() error
func (*ReaderManager) CreateReaders ¶
func (r *ReaderManager) CreateReaders(ctx context.Context, streams []types.StreamInterface, consumerGroupID string) error
CreateReaders creates Kafka readers based on the provided streams and configuration
func (*ReaderManager) FetchCommittedOffsets ¶
func (r *ReaderManager) FetchCommittedOffsets(ctx context.Context, topic string, partitions []kafka.Partition) map[int]int64
FetchCommittedOffsets fetches committed offsets for a topic
func (*ReaderManager) GetPartitionIndex ¶
func (r *ReaderManager) GetPartitionIndex(partitionKey string) (types.PartitionMetaData, bool)
GetPartitionIndex returns the partition index
func (*ReaderManager) GetReader ¶
func (r *ReaderManager) GetReader(readerID string) *kafka.Reader
GetReader returns the reader for the given reader ID.
func (*ReaderManager) GetReaderClientID ¶
func (r *ReaderManager) GetReaderClientID(readerID string) (string, bool)
GetReaderClientID returns the client ID for the given reader ID.
func (*ReaderManager) GetReaderIDs ¶
func (r *ReaderManager) GetReaderIDs() []string
GetReaderIDs returns the reader IDs.
func (*ReaderManager) GetTopicMetadata ¶
func (r *ReaderManager) GetTopicMetadata(ctx context.Context, topic string) (*kafka.Topic, error)
GetTopicMetadata fetches metadata for a topic
func (*ReaderManager) SetPartitions ¶
func (r *ReaderManager) SetPartitions(ctx context.Context, stream types.StreamInterface) error
SetPartitions sets the partitions that need to be synced for a stream.
func (*ReaderManager) ShouldMatchPartitionCount ¶
func (r *ReaderManager) ShouldMatchPartitionCount() bool
ShouldMatchPartitionCount returns whether readers should match partition count