Documentation
¶
Index ¶
- Constants
- type CoordinatorConfig
- func DefaultCoordinatorConfig() CoordinatorConfig
- type GroupCoordinator
- func NewGroupCoordinator(storage OffsetStorage, config CoordinatorConfig) *GroupCoordinator
- func (gc *GroupCoordinator) GetCommittedOffset(groupID string, topic string, partition int32) (int64, error)
- func (gc *GroupCoordinator) GetGroup(groupID string) *GroupMetadata
- func (gc *GroupCoordinator) HandleHeartbeat(req *HeartbeatRequest) (*HeartbeatResponse, error)
- func (gc *GroupCoordinator) HandleJoinGroup(req *JoinGroupRequest) (*JoinGroupResponse, error)
- func (gc *GroupCoordinator) HandleLeaveGroup(req *LeaveGroupRequest) (*LeaveGroupResponse, error)
- func (gc *GroupCoordinator) HandleOffsetCommit(req *OffsetCommitRequest) (*OffsetCommitResponse, error)
- func (gc *GroupCoordinator) HandleOffsetFetch(req *OffsetFetchRequest) (*OffsetFetchResponse, error)
- func (gc *GroupCoordinator) HandleSyncGroup(req *SyncGroupRequest) (*SyncGroupResponse, error)
- func (gc *GroupCoordinator) ListGroups() []*GroupMetadata
- func (gc *GroupCoordinator) Stop() error
- type GroupError
- func (e *GroupError) Error() string
- type GroupMetadata
- type GroupOffsets
- type GroupState
- type HeartbeatRequest
- type HeartbeatResponse
- type JoinGroupMember
- type JoinGroupRequest
- type JoinGroupResponse
- type LeaveGroupRequest
- type LeaveGroupResponse
- type MemberAssignment
- type MemberAssignmentData
- type MemberMetadata
- type MemberState
- type MemoryOffsetStorage
- func NewMemoryOffsetStorage() *MemoryOffsetStorage
- func (s *MemoryOffsetStorage) DeleteOffsets(groupID string) error
- func (s *MemoryOffsetStorage) FetchOffset(groupID string, topic string, partition int32) (*OffsetAndMetadata, error)
- func (s *MemoryOffsetStorage) FetchOffsets(groupID string) (*GroupOffsets, error)
- func (s *MemoryOffsetStorage) StoreOffset(groupID string, topic string, partition int32, offset *OffsetAndMetadata) error
- type MetadataStore
- type OffsetAndMetadata
- type OffsetCommitData
- type OffsetCommitLog
- type OffsetCommitRequest
- type OffsetCommitResponse
- type OffsetFetchData
- type OffsetFetchRequest
- type OffsetFetchResponse
- type OffsetManager
- func NewOffsetManager(storage OffsetStorage) *OffsetManager
- func (m *OffsetManager) CommitOffset(groupID string, topic string, partition int32, offset int64, metadata string) error
- func (m *OffsetManager) GetAllOffsets(groupID string) (map[string]map[int32]int64, error)
- func (m *OffsetManager) GetOffset(groupID string, topic string, partition int32) (int64, error)
- func (m *OffsetManager) ResetOffsets(groupID string) error
- func (m *OffsetManager) ValidateOffset(offset int64, minOffset int64, maxOffset int64) error
- type OffsetStorage
- type PersistentOffsetStorage
- func NewPersistentOffsetStorage(store MetadataStore) *PersistentOffsetStorage
- func (s *PersistentOffsetStorage) DeleteOffsets(groupID string) error
- func (s *PersistentOffsetStorage) FetchOffset(groupID string, topic string, partition int32) (*OffsetAndMetadata, error)
- func (s *PersistentOffsetStorage) FetchOffsets(groupID string) (*GroupOffsets, error)
- func (s *PersistentOffsetStorage) StoreOffset(groupID string, topic string, partition int32, offset *OffsetAndMetadata) error
- type ProtocolMetadata
- type SyncGroupRequest
- type SyncGroupResponse
Constants ¶
const (
	ErrorCodeNone                     int16 = 0
	ErrorCodeUnknown                  int16 = -1
	ErrorCodeIllegalGeneration        int16 = 22
	ErrorCodeUnknownMemberID          int16 = 25
	ErrorCodeRebalanceInProgress      int16 = 27
	ErrorCodeInvalidSessionTimeout    int16 = 26
	ErrorCodeGroupIDNotFound          int16 = 69
	ErrorCodeGroupAuthorizationFailed int16 = 30
)
ErrorCode constants
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CoordinatorConfig ¶
type CoordinatorConfig struct {
// Default session timeout
DefaultSessionTimeoutMs int32
// Default rebalance timeout
DefaultRebalanceTimeoutMs int32
// Min session timeout
MinSessionTimeoutMs int32
// Max session timeout
MaxSessionTimeoutMs int32
// Heartbeat check interval
HeartbeatCheckIntervalMs int32
// Offset retention time
OffsetRetentionMs int64
}
CoordinatorConfig holds coordinator configuration
func DefaultCoordinatorConfig ¶
func DefaultCoordinatorConfig() CoordinatorConfig
DefaultCoordinatorConfig returns default configuration
type GroupCoordinator ¶
type GroupCoordinator struct {
// contains filtered or unexported fields
}
GroupCoordinator manages consumer groups
func NewGroupCoordinator ¶
func NewGroupCoordinator(storage OffsetStorage, config CoordinatorConfig) *GroupCoordinator
NewGroupCoordinator creates a new group coordinator
func (*GroupCoordinator) GetCommittedOffset ¶
func (gc *GroupCoordinator) GetCommittedOffset(groupID string, topic string, partition int32) (int64, error)
GetCommittedOffset returns the committed offset for a group/topic/partition
func (*GroupCoordinator) GetGroup ¶
func (gc *GroupCoordinator) GetGroup(groupID string) *GroupMetadata
GetGroup returns a consumer group by ID
func (*GroupCoordinator) HandleHeartbeat ¶
func (gc *GroupCoordinator) HandleHeartbeat(req *HeartbeatRequest) (*HeartbeatResponse, error)
HandleHeartbeat handles a heartbeat request
func (*GroupCoordinator) HandleJoinGroup ¶
func (gc *GroupCoordinator) HandleJoinGroup(req *JoinGroupRequest) (*JoinGroupResponse, error)
HandleJoinGroup handles a join group request
func (*GroupCoordinator) HandleLeaveGroup ¶
func (gc *GroupCoordinator) HandleLeaveGroup(req *LeaveGroupRequest) (*LeaveGroupResponse, error)
HandleLeaveGroup handles a leave group request
func (*GroupCoordinator) HandleOffsetCommit ¶
func (gc *GroupCoordinator) HandleOffsetCommit(req *OffsetCommitRequest) (*OffsetCommitResponse, error)
HandleOffsetCommit handles an offset commit request
func (*GroupCoordinator) HandleOffsetFetch ¶
func (gc *GroupCoordinator) HandleOffsetFetch(req *OffsetFetchRequest) (*OffsetFetchResponse, error)
HandleOffsetFetch handles an offset fetch request
func (*GroupCoordinator) HandleSyncGroup ¶
func (gc *GroupCoordinator) HandleSyncGroup(req *SyncGroupRequest) (*SyncGroupResponse, error)
HandleSyncGroup handles a sync group request
func (*GroupCoordinator) ListGroups ¶
func (gc *GroupCoordinator) ListGroups() []*GroupMetadata
ListGroups returns all consumer groups
func (*GroupCoordinator) Stop ¶
func (gc *GroupCoordinator) Stop() error
type GroupError ¶
type GroupError struct {
Code int16
}
GroupError represents a group-related error
func (*GroupError) Error ¶
func (e *GroupError) Error() string
type GroupMetadata ¶
type GroupMetadata struct {
// Group identifier
GroupID string
// Current group state
State GroupState
// Protocol type (e.g., "consumer")
ProtocolType string
// Protocol name (e.g., "range", "roundrobin")
ProtocolName string
// Group leader member ID
LeaderID string
// Generation ID (incremented on each rebalance)
GenerationID int32
// Members in the group
Members map[string]*MemberMetadata
// Timestamp of state change
StateTimestamp time.Time
// Rebalance timeout
RebalanceTimeoutMs int32
// Session timeout
SessionTimeoutMs int32
}
GroupMetadata represents metadata about a consumer group
type GroupOffsets ¶
type GroupOffsets struct {
// Group identifier
GroupID string
// Offsets by topic and partition
// Map structure: topic -> partition -> OffsetAndMetadata
Offsets map[string]map[int32]*OffsetAndMetadata
}
GroupOffsets represents all committed offsets for a group
type GroupState ¶
type GroupState string
GroupState represents the state of a consumer group
const (
	// GroupStateEmpty means the group has no members
	GroupStateEmpty GroupState = "Empty"
	// GroupStatePreparingRebalance means the group is preparing for rebalancing
	GroupStatePreparingRebalance GroupState = "PreparingRebalance"
	// GroupStateCompletingRebalance means rebalance assignments are being synced
	GroupStateCompletingRebalance GroupState = "CompletingRebalance"
	// GroupStateStable means the group is stable with active members
	GroupStateStable GroupState = "Stable"
	// GroupStateDead means the group has no members and is being deleted
	GroupStateDead GroupState = "Dead"
)
type HeartbeatRequest ¶
type HeartbeatRequest struct {
// Group identifier
GroupID string
// Generation ID
GenerationID int32
// Member identifier
MemberID string
}
HeartbeatRequest represents a heartbeat from a group member
type HeartbeatResponse ¶
type HeartbeatResponse struct {
// Error code
ErrorCode int16
}
HeartbeatResponse represents the response to a heartbeat
type JoinGroupMember ¶
JoinGroupMember represents a member in a join group response
type JoinGroupRequest ¶
type JoinGroupRequest struct {
// Group identifier
GroupID string
// Session timeout in milliseconds
SessionTimeoutMs int32
// Rebalance timeout in milliseconds
RebalanceTimeoutMs int32
// Member identifier (empty for new members)
MemberID string
// Client identifier
ClientID string
// Protocol type (e.g., "consumer")
ProtocolType string
// List of supported protocols and their metadata
Protocols []ProtocolMetadata
}
JoinGroupRequest represents a request to join a consumer group
type JoinGroupResponse ¶
type JoinGroupResponse struct {
// Error code
ErrorCode int16
// Generation ID
GenerationID int32
// Selected protocol name
ProtocolName string
// Assigned member ID
MemberID string
// Leader member ID
LeaderID string
// All members (only populated for leader)
Members []JoinGroupMember
}
JoinGroupResponse represents the response to a join group request
type LeaveGroupRequest ¶
type LeaveGroupRequest struct {
// Group identifier
GroupID string
// Member identifier
MemberID string
}
LeaveGroupRequest represents a request to leave a group
type LeaveGroupResponse ¶
type LeaveGroupResponse struct {
// Error code
ErrorCode int16
}
LeaveGroupResponse represents the response to a leave group request
type MemberAssignment ¶
type MemberAssignment struct {
// Version of assignment
Version int16
// Partition assignments by topic
Partitions map[string][]int32
// User data (for custom assignment strategies)
UserData []byte
}
MemberAssignment represents partition assignments for a member
type MemberAssignmentData ¶
MemberAssignmentData represents assignment for a member
type MemberMetadata ¶
type MemberMetadata struct {
// Unique member identifier
MemberID string
// Client ID for logging
ClientID string
// Client host
ClientHost string
// Member state
State MemberState
// Topics the member is subscribed to
Subscription []string
// Current partition assignment
Assignment *MemberAssignment
// Protocol metadata provided by member
ProtocolMetadata []byte
// Timestamp of last heartbeat
LastHeartbeat time.Time
// Session timeout in milliseconds
SessionTimeoutMs int32
// Rebalance timeout in milliseconds
RebalanceTimeoutMs int32
// Join timestamp
JoinTime time.Time
}
MemberMetadata represents metadata about a group member
type MemberState ¶
type MemberState string
MemberState represents the state of a group member
const (
	// MemberStateJoining means member is joining the group
	MemberStateJoining MemberState = "Joining"
	// MemberStateAwaitingSync means member is waiting for assignment
	MemberStateAwaitingSync MemberState = "AwaitingSync"
	// MemberStateStable means member is active with assignment
	MemberStateStable MemberState = "Stable"
	// MemberStateLeaving means member is leaving the group
	MemberStateLeaving MemberState = "Leaving"
)
type MemoryOffsetStorage ¶
type MemoryOffsetStorage struct {
// contains filtered or unexported fields
}
MemoryOffsetStorage is an in-memory implementation of OffsetStorage
func NewMemoryOffsetStorage ¶
func NewMemoryOffsetStorage() *MemoryOffsetStorage
NewMemoryOffsetStorage creates a new in-memory offset storage
func (*MemoryOffsetStorage) DeleteOffsets ¶
func (s *MemoryOffsetStorage) DeleteOffsets(groupID string) error
DeleteOffsets deletes all offsets for a group
func (*MemoryOffsetStorage) FetchOffset ¶
func (s *MemoryOffsetStorage) FetchOffset(groupID string, topic string, partition int32) (*OffsetAndMetadata, error)
FetchOffset fetches an offset for a group/topic/partition
func (*MemoryOffsetStorage) FetchOffsets ¶
func (s *MemoryOffsetStorage) FetchOffsets(groupID string) (*GroupOffsets, error)
FetchOffsets fetches all offsets for a group
func (*MemoryOffsetStorage) StoreOffset ¶
func (s *MemoryOffsetStorage) StoreOffset(groupID string, topic string, partition int32, offset *OffsetAndMetadata) error
StoreOffset stores an offset for a group/topic/partition
type MetadataStore ¶
type MetadataStore interface {
StoreGroupOffset(groupID string, topic string, partition int32, offset *OffsetAndMetadata) error
FetchGroupOffset(groupID string, topic string, partition int32) (*OffsetAndMetadata, error)
FetchGroupOffsets(groupID string) (*GroupOffsets, error)
DeleteGroupOffsets(groupID string) error
}
MetadataStore interface for persisting offsets
type OffsetAndMetadata ¶
type OffsetAndMetadata struct {
// Committed offset
Offset int64
// Metadata string (optional)
Metadata string
// Commit timestamp
CommitTime time.Time
// Expiration timestamp (for cleanup)
ExpireTime time.Time
}
OffsetAndMetadata represents a committed offset with metadata
type OffsetCommitData ¶
OffsetCommitData represents data for committing an offset
type OffsetCommitLog ¶
type OffsetCommitLog struct {
GroupID string
Topic string
Partition int32
Offset int64
Metadata string
Timestamp int64
MemberID string
}
OffsetCommitLog represents a log of offset commits for audit/replay
type OffsetCommitRequest ¶
type OffsetCommitRequest struct {
// Group identifier
GroupID string
// Generation ID (-1 for simple consumer)
GenerationID int32
// Member identifier
MemberID string
// Offsets to commit by topic and partition
Offsets map[string]map[int32]OffsetCommitData
}
OffsetCommitRequest represents a request to commit offsets
type OffsetCommitResponse ¶
type OffsetCommitResponse struct {
// Errors by topic and partition
Errors map[string]map[int32]int16
}
OffsetCommitResponse represents the response to an offset commit request
type OffsetFetchData ¶
OffsetFetchData represents fetched offset data
type OffsetFetchRequest ¶
type OffsetFetchRequest struct {
// Group identifier
GroupID string
// Topics and partitions to fetch (nil means all)
Topics map[string][]int32
}
OffsetFetchRequest represents a request to fetch committed offsets
type OffsetFetchResponse ¶
type OffsetFetchResponse struct {
// Offsets by topic and partition
Offsets map[string]map[int32]OffsetFetchData
}
OffsetFetchResponse represents the response to an offset fetch request
type OffsetManager ¶
type OffsetManager struct {
// contains filtered or unexported fields
}
OffsetManager provides higher-level offset management operations
func NewOffsetManager ¶
func NewOffsetManager(storage OffsetStorage) *OffsetManager
NewOffsetManager creates a new offset manager
func (*OffsetManager) CommitOffset ¶
func (m *OffsetManager) CommitOffset(groupID string, topic string, partition int32, offset int64, metadata string) error
CommitOffset commits a single offset
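func (*OffsetManager) GetAllOffsets ¶
func (m *OffsetManager) GetAllOffsets(groupID string) (map[string]map[int32]int64, error)
GetAllOffsets gets all committed offsets for a group
func (*OffsetManager) GetOffset ¶
func (m *OffsetManager) GetOffset(groupID string, topic string, partition int32) (int64, error)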
func (*OffsetManager) ResetOffsets ¶
func (m *OffsetManager) ResetOffsets(groupID string) error
ResetOffsets resets all offsets for a group
func (*OffsetManager) ValidateOffset ¶
func (m *OffsetManager) ValidateOffset(offset int64, minOffset int64, maxOffset int64) error
ValidateOffset validates that an offset is within valid range
type OffsetStorage ¶
type OffsetStorage interface {
// StoreOffset stores an offset for a group/topic/partition
StoreOffset(groupID string, topic string, partition int32, offset *OffsetAndMetadata) error
// FetchOffset fetches an offset for a group/topic/partition
FetchOffset(groupID string, topic string, partition int32) (*OffsetAndMetadata, error)
// FetchOffsets fetches all offsets for a group
FetchOffsets(groupID string) (*GroupOffsets, error)
// DeleteOffsets deletes offsets for a group
DeleteOffsets(groupID string) error
}
OffsetStorage interface for storing offsets
type PersistentOffsetStorage ¶
type PersistentOffsetStorage struct {
// contains filtered or unexported fields
}
PersistentOffsetStorage is a persistent implementation using the metadata store
func NewPersistentOffsetStorage ¶
func NewPersistentOffsetStorage(store MetadataStore) *PersistentOffsetStorage
NewPersistentOffsetStorage creates a new persistent offset storage
func (*PersistentOffsetStorage) DeleteOffsets ¶
func (s *PersistentOffsetStorage) DeleteOffsets(groupID string) error
DeleteOffsets deletes offsets from persistent storage
func (*PersistentOffsetStorage) FetchOffset ¶
func (s *PersistentOffsetStorage) FetchOffset(groupID string, topic string, partition int32) (*OffsetAndMetadata, error)
FetchOffset fetches an offset from persistent storage
func (*PersistentOffsetStorage) FetchOffsets ¶
func (s *PersistentOffsetStorage) FetchOffsets(groupID string) (*GroupOffsets, error)
FetchOffsets fetches all offsets for a group from persistent storage
func (*PersistentOffsetStorage) StoreOffset ¶
func (s *PersistentOffsetStorage) StoreOffset(groupID string, topic string, partition int32, offset *OffsetAndMetadata) error
StoreOffset stores an offset persistently
type ProtocolMetadata ¶
type ProtocolMetadata struct {
// Protocol name (e.g., "range", "roundrobin")
Name string
// Protocol-specific metadata
Metadata []byte
}
ProtocolMetadata represents a protocol and its metadata
type SyncGroupRequest ¶
type SyncGroupRequest struct {
// Group identifier
GroupID string
// Generation ID
GenerationID int32
// Member identifier
MemberID string
// Assignments (only provided by leader)
Assignments []MemberAssignmentData
}
SyncGroupRequest represents a request to sync group assignment
type SyncGroupResponse ¶
type SyncGroupResponse struct {
// Error code
ErrorCode int16
// Member assignment
Assignment []byte
}
SyncGroupResponse represents the response to a sync group request