Documentation
¶
Index ¶
- type CustomGroupBalancer
- type ReaderConfig
- type ReaderManager
- func (r *ReaderManager) Close() error
- func (r *ReaderManager) CreateReaders(ctx context.Context, streams []types.StreamInterface, consumerGroupID string) error
- func (r *ReaderManager) FetchCommittedOffsets(ctx context.Context, topic string, partitions []kafka.Partition) map[int]int64
- func (r *ReaderManager) GetPartitionIndex(partitionKey string) (types.PartitionMetaData, bool)
- func (r *ReaderManager) GetReader(readerID int) *kafka.Reader
- func (r *ReaderManager) GetReaderCount() int
- func (r *ReaderManager) GetReaderIDAndClientID(readerIndex int) (string, string)
- func (r *ReaderManager) GetTopicMetadata(ctx context.Context, topic string) (*kafka.Topic, error)
- func (r *ReaderManager) SetPartitions(ctx context.Context, stream types.StreamInterface) error
- func (r *ReaderManager) ShouldMatchPartitionCount() bool
- type SchemaRegistryClient
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CustomGroupBalancer ¶
type CustomGroupBalancer struct {
// contains filtered or unexported fields
}
CustomGroupBalancer ensures proper consumer ID distribution according to requirements
func (*CustomGroupBalancer) AssignGroups ¶
func (b *CustomGroupBalancer) AssignGroups(members []kafka.GroupMember, partitions []kafka.Partition) kafka.GroupMemberAssignments
AssignGroups implements kafka.GroupBalancer interface
func (*CustomGroupBalancer) ProtocolName ¶
func (b *CustomGroupBalancer) ProtocolName() string
ProtocolName implements kafka.GroupBalancer interface
func (*CustomGroupBalancer) UserData ¶
func (b *CustomGroupBalancer) UserData() ([]byte, error)
UserData implements kafka.GroupBalancer interface
type ReaderConfig ¶
type ReaderConfig struct {
MaxThreads int
ThreadsEqualTotalPartitions bool
BootstrapServers string
ConsumerGroupID string
Dialer *kafka.Dialer
AdminClient *kafka.Client
}
ReaderConfig holds configuration for creating Kafka readers
type ReaderManager ¶
type ReaderManager struct {
// contains filtered or unexported fields
}
ReaderManager manages Kafka readers and their metadata
func NewReaderManager ¶
func NewReaderManager(config ReaderConfig) *ReaderManager
NewReaderManager creates a new Kafka reader manager
func (*ReaderManager) Close ¶
func (r *ReaderManager) Close() error
func (*ReaderManager) CreateReaders ¶
func (r *ReaderManager) CreateReaders(ctx context.Context, streams []types.StreamInterface, consumerGroupID string) error
CreateReaders creates Kafka readers based on the provided streams and configuration
func (*ReaderManager) FetchCommittedOffsets ¶
func (r *ReaderManager) FetchCommittedOffsets(ctx context.Context, topic string, partitions []kafka.Partition) map[int]int64
FetchCommittedOffsets fetches committed offsets for a topic
func (*ReaderManager) GetPartitionIndex ¶
func (r *ReaderManager) GetPartitionIndex(partitionKey string) (types.PartitionMetaData, bool)
GetPartitionIndex returns the partition index
func (*ReaderManager) GetReader ¶
func (r *ReaderManager) GetReader(readerID int) *kafka.Reader
GetReaders returns the created readers
func (*ReaderManager) GetReaderCount ¶ added in v0.3.16
func (r *ReaderManager) GetReaderCount() int
GetReaders returns the created readers
func (*ReaderManager) GetReaderIDAndClientID ¶ added in v0.3.16
func (r *ReaderManager) GetReaderIDAndClientID(readerIndex int) (string, string)
GetReaderClientIDs returns the reader client IDs
func (*ReaderManager) GetTopicMetadata ¶
GetTopicMetadata fetches metadata for a topic
func (*ReaderManager) SetPartitions ¶
func (r *ReaderManager) SetPartitions(ctx context.Context, stream types.StreamInterface) error
sets partitions that need to be synced for a stream
func (*ReaderManager) ShouldMatchPartitionCount ¶
func (r *ReaderManager) ShouldMatchPartitionCount() bool
ShouldMatchPartitionCount returns whether readers should match partition count
type SchemaRegistryClient ¶ added in v0.3.16
type SchemaRegistryClient struct {
Endpoint string `json:"endpoint"`
// Authentication
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
BearerToken string `json:"bearer_token,omitempty"`
// contains filtered or unexported fields
}
SchemaRegistryClient holds the schema registry client information
func (*SchemaRegistryClient) FetchSchema ¶ added in v0.3.16
func (c *SchemaRegistryClient) FetchSchema(schemaID uint32) (*types.RegisteredSchema, error)
TODO: fetch schema by subject strategy if needed (e.g. latest, version specific) currently we only fetch by ID which is sufficient for deserialization of consumed messages
func (*SchemaRegistryClient) Init ¶ added in v0.3.16
func (c *SchemaRegistryClient) Init()
Init initializes the HTTP client for the SchemaRegistryClient
func (*SchemaRegistryClient) Validate ¶ added in v0.3.16
func (c *SchemaRegistryClient) Validate() error
Validate validates the schema registry connection using lightweight /subject request