Versions in this module

v1
v1.5.0 Jan 30, 2026
v1.4.2 Jan 19, 2026

Changes in this version

+ var ErrInvalidTopic = errors.New("invalid topic configuration")
+ var ErrStoreUnavailable = errors.New("metadata store unavailable")
+ var ErrTopicExists = errors.New("topic already exists")
+ var ErrUnknownTopic = errors.New("unknown topic")
+ func BrokerRegistrationKey(brokerID string) string
+ func ConsumerGroupKey(groupID string) string
+ func ConsumerGroupPrefix() string
+ func ConsumerOffsetKey(groupID, topic string, partition int32) string
+ func DecodeConsumerGroup(data []byte) (*metadatapb.ConsumerGroup, error)
+ func DecodePartitionState(data []byte) (*metadatapb.PartitionState, error)
+ func DecodeTopicConfig(data []byte) (*metadatapb.TopicConfig, error)
+ func EncodeConsumerGroup(group *metadatapb.ConsumerGroup) ([]byte, error)
+ func EncodePartitionState(state *metadatapb.PartitionState) ([]byte, error)
+ func EncodeTopicConfig(cfg *metadatapb.TopicConfig) ([]byte, error)
+ func ParseConsumerGroupID(key string) (string, bool)
+ func ParseConsumerOffsetKey(key string) (string, string, int32, bool)
+ func PartitionAssignmentKey(topic string, partition int32) string
+ func PartitionStateKey(topic string, partition int32) string
+ func TopicConfigKey(topic string) string
+ func TopicIDForName(name string) [16]byte
+ type ClusterMetadata struct
    + Brokers []protocol.MetadataBroker
    + ClusterID *string
    + ClusterName *string
    + ControllerID int32
    + Topics []protocol.MetadataTopic
+ type ConsumerOffset struct
    + Group string
    + Offset int64
    + Partition int32
    + Topic string
+ type EtcdStore struct
    + func NewEtcdStore(ctx context.Context, snapshot ClusterMetadata, cfg EtcdStoreConfig) (*EtcdStore, error)
    + func (s *EtcdStore) Available() bool
    + func (s *EtcdStore) CommitConsumerOffset(ctx context.Context, group, topic string, partition int32, offset int64, ...) error
    + func (s *EtcdStore) CreatePartitions(ctx context.Context, topic string, partitionCount int32) error
    + func (s *EtcdStore) CreateTopic(ctx context.Context, spec TopicSpec) (*protocol.MetadataTopic, error)
    + func (s *EtcdStore) DeleteConsumerGroup(ctx context.Context, groupID string) error
    + func (s *EtcdStore) DeleteTopic(ctx context.Context, name string) error
    + func (s *EtcdStore) FetchConsumerGroup(ctx context.Context, groupID string) (*metadatapb.ConsumerGroup, error)
    + func (s *EtcdStore) FetchConsumerOffset(ctx context.Context, group, topic string, partition int32) (int64, string, error)
    + func (s *EtcdStore) FetchTopicConfig(ctx context.Context, topic string) (*metadatapb.TopicConfig, error)
    + func (s *EtcdStore) ListConsumerGroups(ctx context.Context) ([]*metadatapb.ConsumerGroup, error)
    + func (s *EtcdStore) ListConsumerOffsets(ctx context.Context) ([]ConsumerOffset, error)
    + func (s *EtcdStore) Metadata(ctx context.Context, topics []string) (*ClusterMetadata, error)
    + func (s *EtcdStore) NextOffset(ctx context.Context, topic string, partition int32) (int64, error)
    + func (s *EtcdStore) PutConsumerGroup(ctx context.Context, group *metadatapb.ConsumerGroup) error
    + func (s *EtcdStore) UpdateOffsets(ctx context.Context, topic string, partition int32, lastOffset int64) error
    + func (s *EtcdStore) UpdateTopicConfig(ctx context.Context, cfg *metadatapb.TopicConfig) error
+ type EtcdStoreConfig struct
    + DialTimeout time.Duration
    + Endpoints []string
    + Password string
    + Username string
+ type InMemoryStore struct
    + func NewInMemoryStore(state ClusterMetadata) *InMemoryStore
    + func (s *InMemoryStore) CommitConsumerOffset(ctx context.Context, group, topic string, partition int32, offset int64, ...) error
    + func (s *InMemoryStore) CreatePartitions(ctx context.Context, topic string, partitionCount int32) error
    + func (s *InMemoryStore) CreateTopic(ctx context.Context, spec TopicSpec) (*protocol.MetadataTopic, error)
    + func (s *InMemoryStore) DeleteConsumerGroup(ctx context.Context, groupID string) error
    + func (s *InMemoryStore) DeleteTopic(ctx context.Context, name string) error
    + func (s *InMemoryStore) FetchConsumerGroup(ctx context.Context, groupID string) (*metadatapb.ConsumerGroup, error)
    + func (s *InMemoryStore) FetchConsumerOffset(ctx context.Context, group, topic string, partition int32) (int64, string, error)
    + func (s *InMemoryStore) FetchTopicConfig(ctx context.Context, topic string) (*metadatapb.TopicConfig, error)
    + func (s *InMemoryStore) ListConsumerGroups(ctx context.Context) ([]*metadatapb.ConsumerGroup, error)
    + func (s *InMemoryStore) ListConsumerOffsets(ctx context.Context) ([]ConsumerOffset, error)
    + func (s *InMemoryStore) Metadata(ctx context.Context, topics []string) (*ClusterMetadata, error)
    + func (s *InMemoryStore) NextOffset(ctx context.Context, topic string, partition int32) (int64, error)
    + func (s *InMemoryStore) PutConsumerGroup(ctx context.Context, group *metadatapb.ConsumerGroup) error
    + func (s *InMemoryStore) Update(state ClusterMetadata)
    + func (s *InMemoryStore) UpdateOffsets(ctx context.Context, topic string, partition int32, lastOffset int64) error
    + func (s *InMemoryStore) UpdateTopicConfig(ctx context.Context, cfg *metadatapb.TopicConfig) error
+ type Store interface
    + CommitConsumerOffset func(ctx context.Context, group, topic string, partition int32, offset int64, ...) error
    + CreatePartitions func(ctx context.Context, topic string, partitionCount int32) error
    + CreateTopic func(ctx context.Context, spec TopicSpec) (*protocol.MetadataTopic, error)
    + DeleteConsumerGroup func(ctx context.Context, groupID string) error
    + DeleteTopic func(ctx context.Context, name string) error
    + FetchConsumerGroup func(ctx context.Context, groupID string) (*metadatapb.ConsumerGroup, error)
    + FetchConsumerOffset func(ctx context.Context, group, topic string, partition int32) (int64, string, error)
    + FetchTopicConfig func(ctx context.Context, topic string) (*metadatapb.TopicConfig, error)
    + ListConsumerGroups func(ctx context.Context) ([]*metadatapb.ConsumerGroup, error)
    + ListConsumerOffsets func(ctx context.Context) ([]ConsumerOffset, error)
    + Metadata func(ctx context.Context, topics []string) (*ClusterMetadata, error)
    + NextOffset func(ctx context.Context, topic string, partition int32) (int64, error)
    + PutConsumerGroup func(ctx context.Context, group *metadatapb.ConsumerGroup) error
    + UpdateOffsets func(ctx context.Context, topic string, partition int32, lastOffset int64) error
    + UpdateTopicConfig func(ctx context.Context, cfg *metadatapb.TopicConfig) error
+ type TopicSpec struct
    + Name string
    + NumPartitions int32
    + ReplicationFactor int16