group

package
v1.1.0
Warning

This package is not in the latest version of its module.

Published: Mar 15, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Constants

const (
	ErrorCodeNone                     int16 = 0
	ErrorCodeUnknown                  int16 = -1
	ErrorCodeIllegalGeneration        int16 = 22
	ErrorCodeUnknownMemberID          int16 = 25
	ErrorCodeRebalanceInProgress      int16 = 27
	ErrorCodeInvalidSessionTimeout    int16 = 26
	ErrorCodeGroupIDNotFound          int16 = 69
	ErrorCodeGroupAuthorizationFailed int16 = 30
)

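Error codes used in the ErrorCode fields of the response types below. The numeric values match the Kafka protocol error codes of the same names.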

Variables

This section is empty.

Functions

This section is empty.

Types

type CoordinatorConfig

type CoordinatorConfig struct {
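	// DefaultSessionTimeoutMs is the default session timeout, in milliseconds.
	DefaultSessionTimeoutMs int32

	// DefaultRebalanceTimeoutMs is the default rebalance timeout, in milliseconds.
	DefaultRebalanceTimeoutMs int32

	// MinSessionTimeoutMs is the minimum session timeout, in milliseconds.
	MinSessionTimeoutMs int32

	// MaxSessionTimeoutMs is the maximum session timeout, in milliseconds.
	MaxSessionTimeoutMs int32

	// HeartbeatCheckIntervalMs is the interval between heartbeat checks, in milliseconds.
	HeartbeatCheckIntervalMs int32

	// OffsetRetentionMs is the offset retention time, in milliseconds.
	OffsetRetentionMs int64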
}

CoordinatorConfig holds coordinator configuration

func DefaultCoordinatorConfig

func DefaultCoordinatorConfig() CoordinatorConfig

DefaultCoordinatorConfig returns the default configuration
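
To illustrate how the minimum and maximum session timeouts could bound a member's requested value, here is a minimal sketch. It does not import this package: `coordinatorConfig` is a local mirror of three CoordinatorConfig fields, `checkSessionTimeout` is a hypothetical helper, and the numeric defaults are made up for the example. Treating a requested value of 0 as "use the default" and rejecting out-of-range values with ErrorCodeInvalidSessionTimeout are assumptions, not documented behavior.

```go
package main

import "fmt"

// Local mirrors of two documented error codes, so the sketch compiles on its own.
const (
	errorCodeNone                  int16 = 0
	errorCodeInvalidSessionTimeout int16 = 26
)

// coordinatorConfig mirrors the session-timeout fields of CoordinatorConfig.
type coordinatorConfig struct {
	DefaultSessionTimeoutMs int32
	MinSessionTimeoutMs     int32
	MaxSessionTimeoutMs     int32
}

// checkSessionTimeout is a hypothetical helper. It returns the effective
// timeout and an error code. Assumption: 0 selects the default, and values
// outside [Min, Max] are rejected.
func checkSessionTimeout(cfg coordinatorConfig, requestedMs int32) (int32, int16) {
	if requestedMs == 0 {
		return cfg.DefaultSessionTimeoutMs, errorCodeNone
	}
	if requestedMs < cfg.MinSessionTimeoutMs || requestedMs > cfg.MaxSessionTimeoutMs {
		return 0, errorCodeInvalidSessionTimeout
	}
	return requestedMs, errorCodeNone
}

func main() {
	// Illustrative values only; the package's real defaults are not documented here.
	cfg := coordinatorConfig{
		DefaultSessionTimeoutMs: 10000,
		MinSessionTimeoutMs:     6000,
		MaxSessionTimeoutMs:     300000,
	}
	for _, ms := range []int32{0, 3000, 30000} {
		effective, code := checkSessionTimeout(cfg, ms)
		fmt.Printf("requested=%d effective=%d code=%d\n", ms, effective, code)
	}
}
```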

type GroupCoordinator

type GroupCoordinator struct {
	// contains filtered or unexported fields
}

GroupCoordinator manages consumer groups

func NewGroupCoordinator

func NewGroupCoordinator(storage OffsetStorage, config CoordinatorConfig) *GroupCoordinator

NewGroupCoordinator creates a new group coordinator

func (*GroupCoordinator) GetCommittedOffset

func (gc *GroupCoordinator) GetCommittedOffset(groupID string, topic string, partition int32) (int64, error)

GetCommittedOffset returns the committed offset for a group/topic/partition

func (*GroupCoordinator) GetGroup

func (gc *GroupCoordinator) GetGroup(groupID string) *GroupMetadata

GetGroup returns a consumer group by ID

func (*GroupCoordinator) HandleHeartbeat

func (gc *GroupCoordinator) HandleHeartbeat(req *HeartbeatRequest) (*HeartbeatResponse, error)

HandleHeartbeat handles a heartbeat request

func (*GroupCoordinator) HandleJoinGroup

func (gc *GroupCoordinator) HandleJoinGroup(req *JoinGroupRequest) (*JoinGroupResponse, error)

HandleJoinGroup handles a join group request

func (*GroupCoordinator) HandleLeaveGroup

func (gc *GroupCoordinator) HandleLeaveGroup(req *LeaveGroupRequest) (*LeaveGroupResponse, error)

HandleLeaveGroup handles a leave group request

func (*GroupCoordinator) HandleOffsetCommit

func (gc *GroupCoordinator) HandleOffsetCommit(req *OffsetCommitRequest) (*OffsetCommitResponse, error)

HandleOffsetCommit handles an offset commit request

func (*GroupCoordinator) HandleOffsetFetch

func (gc *GroupCoordinator) HandleOffsetFetch(req *OffsetFetchRequest) (*OffsetFetchResponse, error)

HandleOffsetFetch handles an offset fetch request

func (*GroupCoordinator) HandleSyncGroup

func (gc *GroupCoordinator) HandleSyncGroup(req *SyncGroupRequest) (*SyncGroupResponse, error)

HandleSyncGroup handles a sync group request

func (*GroupCoordinator) ListGroups

func (gc *GroupCoordinator) ListGroups() []*GroupMetadata

ListGroups returns all consumer groups

func (*GroupCoordinator) PerformAssignment added in v1.1.0

func (gc *GroupCoordinator) PerformAssignment(
	groupID string,
	partitions []TopicPartition,
) (map[string][]TopicPartition, error)

PerformAssignment runs partition assignment using the group's protocol strategy. It builds a MemberSubscription list from the group's members and invokes the selected assignor.
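
The first step described above, turning group members into a MemberSubscription list, might look roughly like the following sketch. The types are local mirrors of MemberMetadata and MemberSubscription, `buildSubscriptions` is a hypothetical name, and sorting by member ID (to make the result deterministic) is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"sort"
)

// memberMetadata mirrors only the MemberMetadata fields this sketch needs.
type memberMetadata struct {
	MemberID     string
	Subscription []string
}

// memberSubscription mirrors MemberSubscription.
type memberSubscription struct {
	MemberID string
	Topics   []string
}

// buildSubscriptions converts a group's member map into a slice suitable for
// an assignor. Topics are copied so the assignor cannot mutate member state.
func buildSubscriptions(members map[string]*memberMetadata) []memberSubscription {
	subs := make([]memberSubscription, 0, len(members))
	for id, m := range members {
		topics := append([]string(nil), m.Subscription...)
		subs = append(subs, memberSubscription{MemberID: id, Topics: topics})
	}
	// Map iteration order is random in Go, so sort for a stable result.
	sort.Slice(subs, func(i, j int) bool { return subs[i].MemberID < subs[j].MemberID })
	return subs
}

func main() {
	members := map[string]*memberMetadata{
		"consumer-b": {MemberID: "consumer-b", Subscription: []string{"orders"}},
		"consumer-a": {MemberID: "consumer-a", Subscription: []string{"orders", "payments"}},
	}
	for _, s := range buildSubscriptions(members) {
		fmt.Println(s.MemberID, s.Topics)
	}
}
```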

func (*GroupCoordinator) Stop

func (gc *GroupCoordinator) Stop() error

Stop stops the coordinator

type GroupError

type GroupError struct {
	Code int16
}

GroupError represents a group-related error

func (*GroupError) Error

func (e *GroupError) Error() string
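
Because GroupError is used through a pointer receiver, callers can recover the code from a wrapped error with `errors.As`. The sketch below shows the pattern with a local mirror type; the message format in `Error`, and the helpers `wrapJoin` and `codeOf`, are assumptions made for illustration and are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// groupError mirrors GroupError. The message format is an assumption.
type groupError struct {
	Code int16
}

func (e *groupError) Error() string {
	return fmt.Sprintf("group error: code %d", e.Code)
}

// wrapJoin adds context while keeping the original error reachable
// through errors.As.
func wrapJoin(err error) error {
	return fmt.Errorf("join group: %w", err)
}

// codeOf returns 0 for a nil error, the embedded code for a groupError
// anywhere in the chain, and -1 (the ErrorCodeUnknown value) otherwise.
func codeOf(err error) int16 {
	if err == nil {
		return 0
	}
	var ge *groupError
	if errors.As(err, &ge) {
		return ge.Code
	}
	return -1
}

func main() {
	err := wrapJoin(&groupError{Code: 27}) // 27 is ErrorCodeRebalanceInProgress
	fmt.Println(err, "->", codeOf(err))
}
```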

type GroupMetadata

type GroupMetadata struct {
	// Group identifier
	GroupID string

	// Current group state
	State GroupState

	// Protocol type (e.g., "consumer")
	ProtocolType string

	// Protocol name (e.g., "range", "roundrobin")
	ProtocolName string

	// Group leader member ID
	LeaderID string

	// Generation ID (incremented on each rebalance)
	GenerationID int32

	// Members in the group
	Members map[string]*MemberMetadata

	// Timestamp of state change
	StateTimestamp time.Time

	// Rebalance timeout
	RebalanceTimeoutMs int32

	// Session timeout
	SessionTimeoutMs int32
}

GroupMetadata represents metadata about a consumer group

type GroupOffsets

type GroupOffsets struct {
	// Group identifier
	GroupID string

	// Offsets by topic and partition
	// Map structure: topic -> partition -> OffsetAndMetadata
	Offsets map[string]map[int32]*OffsetAndMetadata
}

GroupOffsets represents all committed offsets for a group

type GroupState

type GroupState string

GroupState represents the state of a consumer group

const (
	// GroupStateEmpty means the group has no members
	GroupStateEmpty GroupState = "Empty"
	// GroupStatePreparingRebalance means the group is preparing for rebalancing
	GroupStatePreparingRebalance GroupState = "PreparingRebalance"
	// GroupStateCompletingRebalance means rebalance assignments are being synced
	GroupStateCompletingRebalance GroupState = "CompletingRebalance"
	// GroupStateStable means the group is stable with active members
	GroupStateStable GroupState = "Stable"
	// GroupStateDead means the group has no members and is being deleted
	GroupStateDead GroupState = "Dead"
)
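
The legal transitions between these states are not documented here. As a sketch only, the table below encodes the transitions used by Apache Kafka's classic group coordinator, on the assumption that this package follows the same model; `validPrevious` and `canTransition` are hypothetical names.

```go
package main

import "fmt"

// groupState and its constants mirror GroupState above.
type groupState string

const (
	stateEmpty               groupState = "Empty"
	statePreparingRebalance  groupState = "PreparingRebalance"
	stateCompletingRebalance groupState = "CompletingRebalance"
	stateStable              groupState = "Stable"
	stateDead                groupState = "Dead"
)

// validPrevious lists, for each target state, the states it may be entered
// from. Assumption: modeled on Kafka's coordinator, not taken from this package.
var validPrevious = map[groupState][]groupState{
	stateEmpty:               {statePreparingRebalance},
	statePreparingRebalance:  {stateEmpty, stateCompletingRebalance, stateStable},
	stateCompletingRebalance: {statePreparingRebalance},
	stateStable:              {stateCompletingRebalance},
	stateDead:                {stateEmpty, statePreparingRebalance, stateCompletingRebalance, stateStable, stateDead},
}

// canTransition reports whether moving from one state to another is allowed
// by the table above.
func canTransition(from, to groupState) bool {
	for _, s := range validPrevious[to] {
		if s == from {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(stateStable, statePreparingRebalance)) // true
	fmt.Println(canTransition(stateEmpty, stateStable))              // false
}
```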

type HeartbeatRequest

type HeartbeatRequest struct {
	// Group identifier
	GroupID string

	// Generation ID
	GenerationID int32

	// Member identifier
	MemberID string
}

HeartbeatRequest represents a heartbeat from a group member

type HeartbeatResponse

type HeartbeatResponse struct {
	// Error code
	ErrorCode int16
}

HeartbeatResponse represents the response to a heartbeat

type JoinGroupMember

type JoinGroupMember struct {
	MemberID string
	Metadata []byte
}

JoinGroupMember represents a member in a join group response

type JoinGroupRequest

type JoinGroupRequest struct {
	// Group identifier
	GroupID string

	// Session timeout in milliseconds
	SessionTimeoutMs int32

	// Rebalance timeout in milliseconds
	RebalanceTimeoutMs int32

	// Member identifier (empty for new members)
	MemberID string

	// Client identifier
	ClientID string

	// Protocol type (e.g., "consumer")
	ProtocolType string

	// List of supported protocols and their metadata
	Protocols []ProtocolMetadata
}

JoinGroupRequest represents a request to join a consumer group

type JoinGroupResponse

type JoinGroupResponse struct {
	// Error code
	ErrorCode int16

	// Generation ID
	GenerationID int32

	// Selected protocol name
	ProtocolName string

	// Assigned member ID
	MemberID string

	// Leader member ID
	LeaderID string

	// All members (only populated for leader)
	Members []JoinGroupMember
}

JoinGroupResponse represents the response to a join group request

type LeaveGroupRequest

type LeaveGroupRequest struct {
	// Group identifier
	GroupID string

	// Member identifier
	MemberID string
}

LeaveGroupRequest represents a request to leave a group

type LeaveGroupResponse

type LeaveGroupResponse struct {
	// Error code
	ErrorCode int16
}

LeaveGroupResponse represents the response to a leave group request

type MemberAssignment

type MemberAssignment struct {
	// Version of assignment
	Version int16

	// Partition assignments by topic
	Partitions map[string][]int32

	// User data (for custom assignment strategies)
	UserData []byte
}

MemberAssignment represents partition assignments for a member

type MemberAssignmentData

type MemberAssignmentData struct {
	MemberID   string
	Assignment []byte
}

MemberAssignmentData represents assignment for a member

type MemberMetadata

type MemberMetadata struct {
	// Unique member identifier
	MemberID string

	// Client ID for logging
	ClientID string

	// Client host
	ClientHost string

	// Member state
	State MemberState

	// Topics the member is subscribed to
	Subscription []string

	// Current partition assignment
	Assignment *MemberAssignment

	// Protocol metadata provided by member
	ProtocolMetadata []byte

	// Timestamp of last heartbeat
	LastHeartbeat time.Time

	// Session timeout in milliseconds
	SessionTimeoutMs int32

	// Rebalance timeout in milliseconds
	RebalanceTimeoutMs int32

	// Join timestamp
	JoinTime time.Time
}

MemberMetadata represents metadata about a group member
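
LastHeartbeat and SessionTimeoutMs together are enough to decide whether a member's session has lapsed. The following sketch shows one way such a check could be written. The struct is a local mirror, `sessionExpired` and `expiredMembers` are hypothetical helpers, and the strict "greater than" comparison is an assumption.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// memberMetadata mirrors only the MemberMetadata fields this sketch needs.
type memberMetadata struct {
	MemberID         string
	LastHeartbeat    time.Time
	SessionTimeoutMs int32
}

// sessionExpired reports whether more than SessionTimeoutMs has elapsed
// since the member's last heartbeat.
func sessionExpired(m memberMetadata, now time.Time) bool {
	timeout := time.Duration(m.SessionTimeoutMs) * time.Millisecond
	return now.Sub(m.LastHeartbeat) > timeout
}

// expiredMembers returns the sorted IDs of all members whose session lapsed.
func expiredMembers(members map[string]memberMetadata, now time.Time) []string {
	var expired []string
	for id, m := range members {
		if sessionExpired(m, now) {
			expired = append(expired, id)
		}
	}
	sort.Strings(expired)
	return expired
}

func main() {
	now := time.Now()
	members := map[string]memberMetadata{
		"fresh": {MemberID: "fresh", LastHeartbeat: now.Add(-2 * time.Second), SessionTimeoutMs: 10000},
		"stale": {MemberID: "stale", LastHeartbeat: now.Add(-30 * time.Second), SessionTimeoutMs: 10000},
	}
	fmt.Println(expiredMembers(members, now))
}
```

A periodic check at an interval such as HeartbeatCheckIntervalMs could call a helper like this and trigger a rebalance for the returned members; whether the coordinator does so is not stated in this documentation.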

type MemberState

type MemberState string

MemberState represents the state of a group member

const (
	// MemberStateJoining means member is joining the group
	MemberStateJoining MemberState = "Joining"
	// MemberStateAwaitingSync means member is waiting for assignment
	MemberStateAwaitingSync MemberState = "AwaitingSync"
	// MemberStateStable means member is active with assignment
	MemberStateStable MemberState = "Stable"
	// MemberStateLeaving means member is leaving the group
	MemberStateLeaving MemberState = "Leaving"
)

type MemberSubscription added in v1.1.0

type MemberSubscription struct {
	MemberID string
	Topics   []string
}

MemberSubscription represents a member and the topics it subscribes to

type MemoryOffsetStorage

type MemoryOffsetStorage struct {
	// contains filtered or unexported fields
}

MemoryOffsetStorage is an in-memory implementation of OffsetStorage

func NewMemoryOffsetStorage

func NewMemoryOffsetStorage() *MemoryOffsetStorage

NewMemoryOffsetStorage creates a new in-memory offset storage

func (*MemoryOffsetStorage) DeleteOffsets

func (s *MemoryOffsetStorage) DeleteOffsets(groupID string) error

DeleteOffsets deletes all offsets for a group

func (*MemoryOffsetStorage) FetchOffset

func (s *MemoryOffsetStorage) FetchOffset(groupID string, topic string, partition int32) (*OffsetAndMetadata, error)

FetchOffset fetches an offset for a group/topic/partition

func (*MemoryOffsetStorage) FetchOffsets

func (s *MemoryOffsetStorage) FetchOffsets(groupID string) (*GroupOffsets, error)

FetchOffsets fetches all offsets for a group

func (*MemoryOffsetStorage) StoreOffset

func (s *MemoryOffsetStorage) StoreOffset(groupID string, topic string, partition int32, offset *OffsetAndMetadata) error

StoreOffset stores an offset for a group/topic/partition

type MetadataStore

type MetadataStore interface {
	StoreGroupOffset(groupID string, topic string, partition int32, offset *OffsetAndMetadata) error
	FetchGroupOffset(groupID string, topic string, partition int32) (*OffsetAndMetadata, error)
	FetchGroupOffsets(groupID string) (*GroupOffsets, error)
	DeleteGroupOffsets(groupID string) error
}

MetadataStore is the interface for persisting offsets

type OffsetAndMetadata

type OffsetAndMetadata struct {
	// Committed offset
	Offset int64

	// Metadata string (optional)
	Metadata string

	// Commit timestamp
	CommitTime time.Time

	// Expiration timestamp (for cleanup)
	ExpireTime time.Time

	// LeaderEpoch is the epoch of the partition leader when this offset was committed.
	// Used for offset validation - if leader epoch has changed significantly,
	// the committed offset may no longer be valid (truncation may have occurred).
	LeaderEpoch int64
}

OffsetAndMetadata represents a committed offset with metadata
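
One plausible way CommitTime and ExpireTime relate is that ExpireTime is the commit time plus a retention period such as CoordinatorConfig.OffsetRetentionMs. That relationship is an assumption; the sketch below only illustrates it with a local mirror type and the hypothetical helpers `newCommit` and `isExpired`.

```go
package main

import (
	"fmt"
	"time"
)

// offsetAndMetadata mirrors OffsetAndMetadata without LeaderEpoch.
type offsetAndMetadata struct {
	Offset     int64
	Metadata   string
	CommitTime time.Time
	ExpireTime time.Time
}

// newCommit stamps a commit with its commit time and derives the expiration
// time from a retention period given in milliseconds.
func newCommit(offset int64, metadata string, now time.Time, retentionMs int64) offsetAndMetadata {
	return offsetAndMetadata{
		Offset:     offset,
		Metadata:   metadata,
		CommitTime: now,
		ExpireTime: now.Add(time.Duration(retentionMs) * time.Millisecond),
	}
}

// isExpired reports whether the commit has reached its expiration time.
func isExpired(o offsetAndMetadata, now time.Time) bool {
	return !now.Before(o.ExpireTime)
}

func main() {
	now := time.Now()
	// Hypothetical 7-day retention; the package default is not documented here.
	c := newCommit(42, "checkpoint", now, 7*24*60*60*1000)
	fmt.Println(isExpired(c, now), isExpired(c, now.Add(8*24*time.Hour)))
}
```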

type OffsetCommitData

type OffsetCommitData struct {
	Offset   int64
	Metadata string
}

OffsetCommitData represents data for committing an offset

type OffsetCommitLog

type OffsetCommitLog struct {
	GroupID   string
	Topic     string
	Partition int32
	Offset    int64
	Metadata  string
	Timestamp int64
	MemberID  string
}

OffsetCommitLog represents a log entry for an offset commit, kept for audit or replay

type OffsetCommitRequest

type OffsetCommitRequest struct {
	// Group identifier
	GroupID string

	// Generation ID (-1 for simple consumer)
	GenerationID int32

	// Member identifier
	MemberID string

	// Offsets to commit by topic and partition
	Offsets map[string]map[int32]OffsetCommitData
}

OffsetCommitRequest represents a request to commit offsets

type OffsetCommitResponse

type OffsetCommitResponse struct {
	// Errors by topic and partition
	Errors map[string]map[int32]int16
}

OffsetCommitResponse represents the response to an offset commit request

type OffsetFetchData

type OffsetFetchData struct {
	Offset    int64
	Metadata  string
	ErrorCode int16
}

OffsetFetchData represents fetched offset data

type OffsetFetchRequest

type OffsetFetchRequest struct {
	// Group identifier
	GroupID string

	// Topics and partitions to fetch (nil means all)
	Topics map[string][]int32
}

OffsetFetchRequest represents a request to fetch committed offsets

type OffsetFetchResponse

type OffsetFetchResponse struct {
	// Offsets by topic and partition
	Offsets map[string]map[int32]OffsetFetchData
}

OffsetFetchResponse represents the response to an offset fetch request

type OffsetManager

type OffsetManager struct {
	// contains filtered or unexported fields
}

OffsetManager provides higher-level offset management operations

func NewOffsetManager

func NewOffsetManager(storage OffsetStorage) *OffsetManager

NewOffsetManager creates a new offset manager

func (*OffsetManager) CommitOffset

func (m *OffsetManager) CommitOffset(groupID string, topic string, partition int32, offset int64, metadata string) error

CommitOffset commits a single offset

func (*OffsetManager) GetAllOffsets

func (m *OffsetManager) GetAllOffsets(groupID string) (map[string]map[int32]int64, error)

GetAllOffsets gets all committed offsets for a group

func (*OffsetManager) GetOffset

func (m *OffsetManager) GetOffset(groupID string, topic string, partition int32) (int64, error)

GetOffset gets a committed offset

func (*OffsetManager) ResetOffsets

func (m *OffsetManager) ResetOffsets(groupID string) error

ResetOffsets resets all offsets for a group

func (*OffsetManager) ValidateOffset

func (m *OffsetManager) ValidateOffset(offset int64, minOffset int64, maxOffset int64) error

ValidateOffset validates that an offset is within valid range
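
The documentation does not say whether the bounds are inclusive or what error is returned. As a sketch under the assumption that both bounds are inclusive, a range check of this shape could look like the following; `validateOffset` is a standalone function written for illustration, not the package's method.

```go
package main

import "fmt"

// validateOffset returns nil when offset lies in [minOffset, maxOffset].
// Assumption: both bounds are inclusive.
func validateOffset(offset, minOffset, maxOffset int64) error {
	if minOffset > maxOffset {
		return fmt.Errorf("invalid range [%d, %d]", minOffset, maxOffset)
	}
	if offset < minOffset || offset > maxOffset {
		return fmt.Errorf("offset %d out of range [%d, %d]", offset, minOffset, maxOffset)
	}
	return nil
}

func main() {
	fmt.Println(validateOffset(5, 0, 10))
	fmt.Println(validateOffset(11, 0, 10))
}
```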

type OffsetStorage

type OffsetStorage interface {
	// StoreOffset stores an offset for a group/topic/partition
	StoreOffset(groupID string, topic string, partition int32, offset *OffsetAndMetadata) error

	// FetchOffset fetches an offset for a group/topic/partition
	FetchOffset(groupID string, topic string, partition int32) (*OffsetAndMetadata, error)

	// FetchOffsets fetches all offsets for a group
	FetchOffsets(groupID string) (*GroupOffsets, error)

	// DeleteOffsets deletes offsets for a group
	DeleteOffsets(groupID string) error
}

OffsetStorage is the interface for storing offsets
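
To show what an implementation with these four methods can look like, here is a minimal in-memory sketch. It is not MemoryOffsetStorage: the types are local mirrors, `memStorage` is a hypothetical name, and returning `nil, nil` for a missing offset is an assumption, since the documented contract does not say how a missing entry is reported.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// offsetAndMetadata mirrors a subset of OffsetAndMetadata.
type offsetAndMetadata struct {
	Offset   int64
	Metadata string
}

// groupOffsets mirrors GroupOffsets: topic -> partition -> offset.
type groupOffsets struct {
	GroupID string
	Offsets map[string]map[int32]*offsetAndMetadata
}

// memStorage keeps offsets in nested maps: group -> topic -> partition.
type memStorage struct {
	mu     sync.RWMutex
	groups map[string]map[string]map[int32]*offsetAndMetadata
}

func newMemStorage() *memStorage {
	return &memStorage{groups: make(map[string]map[string]map[int32]*offsetAndMetadata)}
}

// StoreOffset saves a copy of offset, creating intermediate maps on demand.
func (s *memStorage) StoreOffset(groupID, topic string, partition int32, offset *offsetAndMetadata) error {
	if offset == nil {
		return errors.New("nil offset")
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	topics, ok := s.groups[groupID]
	if !ok {
		topics = make(map[string]map[int32]*offsetAndMetadata)
		s.groups[groupID] = topics
	}
	parts, ok := topics[topic]
	if !ok {
		parts = make(map[int32]*offsetAndMetadata)
		topics[topic] = parts
	}
	cp := *offset
	parts[partition] = &cp
	return nil
}

// FetchOffset returns a copy of the stored offset, or nil if none exists.
func (s *memStorage) FetchOffset(groupID, topic string, partition int32) (*offsetAndMetadata, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	o, ok := s.groups[groupID][topic][partition]
	if !ok {
		return nil, nil
	}
	cp := *o
	return &cp, nil
}

// FetchOffsets returns a deep copy of every offset stored for the group.
func (s *memStorage) FetchOffsets(groupID string) (*groupOffsets, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := &groupOffsets{GroupID: groupID, Offsets: make(map[string]map[int32]*offsetAndMetadata)}
	for topic, parts := range s.groups[groupID] {
		out.Offsets[topic] = make(map[int32]*offsetAndMetadata, len(parts))
		for p, o := range parts {
			cp := *o
			out.Offsets[topic][p] = &cp
		}
	}
	return out, nil
}

// DeleteOffsets removes every offset stored for the group.
func (s *memStorage) DeleteOffsets(groupID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.groups, groupID)
	return nil
}

func main() {
	s := newMemStorage()
	_ = s.StoreOffset("billing", "orders", 0, &offsetAndMetadata{Offset: 120})
	_ = s.StoreOffset("billing", "orders", 1, &offsetAndMetadata{Offset: 98})
	all, _ := s.FetchOffsets("billing")
	fmt.Println(len(all.Offsets["orders"]), all.Offsets["orders"][0].Offset)
}
```

Copying values on the way in and out keeps callers from mutating stored state without holding the lock, which is the main design choice in this sketch.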

type PartitionAssignor added in v1.1.0

type PartitionAssignor interface {
	Name() string
	Assign(members []MemberSubscription, partitions []TopicPartition) map[string][]TopicPartition
}

PartitionAssignor assigns partitions to consumer group members

func GetAssignor added in v1.1.0

func GetAssignor(name string) PartitionAssignor

GetAssignor returns a PartitionAssignor by strategy name. Supported strategies: "range", "roundrobin", "sticky".

type PersistentOffsetStorage

type PersistentOffsetStorage struct {
	// contains filtered or unexported fields
}

PersistentOffsetStorage is a persistent implementation using the metadata store

func NewPersistentOffsetStorage

func NewPersistentOffsetStorage(store MetadataStore) *PersistentOffsetStorage

NewPersistentOffsetStorage creates a new persistent offset storage

func (*PersistentOffsetStorage) DeleteOffsets

func (s *PersistentOffsetStorage) DeleteOffsets(groupID string) error

DeleteOffsets deletes offsets from persistent storage

func (*PersistentOffsetStorage) FetchOffset

func (s *PersistentOffsetStorage) FetchOffset(groupID string, topic string, partition int32) (*OffsetAndMetadata, error)

FetchOffset fetches an offset from persistent storage

func (*PersistentOffsetStorage) FetchOffsets

func (s *PersistentOffsetStorage) FetchOffsets(groupID string) (*GroupOffsets, error)

FetchOffsets fetches all offsets for a group from persistent storage

func (*PersistentOffsetStorage) StoreOffset

func (s *PersistentOffsetStorage) StoreOffset(groupID string, topic string, partition int32, offset *OffsetAndMetadata) error

StoreOffset stores an offset persistently

type ProtocolMetadata

type ProtocolMetadata struct {
	// Protocol name (e.g., "range", "roundrobin")
	Name string

	// Protocol-specific metadata
	Metadata []byte
}

ProtocolMetadata represents a protocol and its metadata

type RangeAssignor added in v1.1.0

type RangeAssignor struct{}

RangeAssignor assigns partitions using a range-based strategy. For each topic, partitions are divided evenly among subscribed members. If the partition count is not evenly divisible by the member count, the first N members each get one extra partition, where N is the remainder.
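
The range strategy described above can be sketched as follows. This is an independent illustration, not RangeAssignor's source: the types are local mirrors of MemberSubscription and TopicPartition, `rangeAssign` is a hypothetical name, and ordering members by ID and partitions by number are assumptions.

```go
package main

import (
	"fmt"
	"sort"
)

type memberSubscription struct {
	MemberID string
	Topics   []string
}

type topicPartition struct {
	Topic     string
	Partition int32
}

// rangeAssign gives each subscriber of a topic a contiguous block of that
// topic's partitions; the first (count % subscribers) members get one extra.
func rangeAssign(members []memberSubscription, partitions []topicPartition) map[string][]topicPartition {
	result := make(map[string][]topicPartition, len(members))
	for _, m := range members {
		result[m.MemberID] = nil
	}
	byTopic := make(map[string][]int32)
	for _, tp := range partitions {
		byTopic[tp.Topic] = append(byTopic[tp.Topic], tp.Partition)
	}
	topics := make([]string, 0, len(byTopic))
	for topic := range byTopic {
		topics = append(topics, topic)
	}
	sort.Strings(topics)

	for _, topic := range topics {
		parts := byTopic[topic]
		sort.Slice(parts, func(i, j int) bool { return parts[i] < parts[j] })

		// Collect the members subscribed to this topic, ordered by ID.
		var subscribers []string
		for _, m := range members {
			for _, sub := range m.Topics {
				if sub == topic {
					subscribers = append(subscribers, m.MemberID)
					break
				}
			}
		}
		if len(subscribers) == 0 {
			continue
		}
		sort.Strings(subscribers)

		per, extra := len(parts)/len(subscribers), len(parts)%len(subscribers)
		start := 0
		for i, id := range subscribers {
			n := per
			if i < extra {
				n++
			}
			for _, p := range parts[start : start+n] {
				result[id] = append(result[id], topicPartition{Topic: topic, Partition: p})
			}
			start += n
		}
	}
	return result
}

func main() {
	members := []memberSubscription{
		{MemberID: "a", Topics: []string{"orders"}},
		{MemberID: "b", Topics: []string{"orders"}},
	}
	var parts []topicPartition
	for p := int32(0); p < 5; p++ {
		parts = append(parts, topicPartition{Topic: "orders", Partition: p})
	}
	// Five partitions over two members: "a" gets 0-2, "b" gets 3-4.
	fmt.Println(rangeAssign(members, parts))
}
```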

func (*RangeAssignor) Assign added in v1.1.0

func (r *RangeAssignor) Assign(members []MemberSubscription, partitions []TopicPartition) map[string][]TopicPartition

Assign distributes partitions to members using range assignment.

func (*RangeAssignor) Name added in v1.1.0

func (r *RangeAssignor) Name() string

Name returns the assignor strategy name.

type RoundRobinAssignor added in v1.1.0

type RoundRobinAssignor struct{}

RoundRobinAssignor assigns partitions in round-robin order across members. All partitions across all subscribed topics are sorted, then distributed one at a time to each consumer in order.
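
A round-robin distribution of this kind can be sketched as below. It is an illustration rather than RoundRobinAssignor's source: the types are local mirrors, `roundRobinAssign` and `subscribes` are hypothetical names, and skipping members that are not subscribed to a partition's topic is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

type memberSubscription struct {
	MemberID string
	Topics   []string
}

type topicPartition struct {
	Topic     string
	Partition int32
}

// subscribes reports whether the member lists the topic in its subscription.
func subscribes(m memberSubscription, topic string) bool {
	for _, sub := range m.Topics {
		if sub == topic {
			return true
		}
	}
	return false
}

// roundRobinAssign sorts members by ID and partitions by (topic, partition),
// then hands partitions out one at a time, skipping non-subscribers.
func roundRobinAssign(members []memberSubscription, partitions []topicPartition) map[string][]topicPartition {
	result := make(map[string][]topicPartition, len(members))
	ordered := append([]memberSubscription(nil), members...)
	sort.Slice(ordered, func(i, j int) bool { return ordered[i].MemberID < ordered[j].MemberID })
	for _, m := range ordered {
		result[m.MemberID] = nil
	}
	if len(ordered) == 0 {
		return result
	}
	parts := append([]topicPartition(nil), partitions...)
	sort.Slice(parts, func(i, j int) bool {
		if parts[i].Topic != parts[j].Topic {
			return parts[i].Topic < parts[j].Topic
		}
		return parts[i].Partition < parts[j].Partition
	})

	next := 0 // index of the member that should receive the next partition
	for _, tp := range parts {
		for tried := 0; tried < len(ordered); tried++ {
			idx := (next + tried) % len(ordered)
			if subscribes(ordered[idx], tp.Topic) {
				id := ordered[idx].MemberID
				result[id] = append(result[id], tp)
				next = (idx + 1) % len(ordered)
				break
			}
		}
	}
	return result
}

func main() {
	members := []memberSubscription{
		{MemberID: "a", Topics: []string{"orders"}},
		{MemberID: "b", Topics: []string{"orders"}},
		{MemberID: "c", Topics: []string{"orders"}},
	}
	var parts []topicPartition
	for p := int32(0); p < 4; p++ {
		parts = append(parts, topicPartition{Topic: "orders", Partition: p})
	}
	// Four partitions over three members: a gets 0 and 3, b gets 1, c gets 2.
	fmt.Println(roundRobinAssign(members, parts))
}
```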

func (*RoundRobinAssignor) Assign added in v1.1.0

func (rr *RoundRobinAssignor) Assign(members []MemberSubscription, partitions []TopicPartition) map[string][]TopicPartition

Assign distributes partitions to members using round-robin assignment.

func (*RoundRobinAssignor) Name added in v1.1.0

func (rr *RoundRobinAssignor) Name() string

Name returns the assignor strategy name.

type StickyAssignor added in v1.1.0

type StickyAssignor struct{}

StickyAssignor tries to preserve existing assignments when possible. On initial assignment it uses round-robin. On subsequent rebalances, it keeps existing assignments and only reassigns partitions that must move.

func (*StickyAssignor) Assign added in v1.1.0

func (s *StickyAssignor) Assign(members []MemberSubscription, partitions []TopicPartition) map[string][]TopicPartition

Assign distributes partitions to members using sticky assignment. Previous assignments are supplied through AssignWithPrevious. When called through the PartitionAssignor interface, which carries no previous assignments, Assign falls back to round-robin.

func (*StickyAssignor) AssignWithPrevious added in v1.1.0

func (s *StickyAssignor) AssignWithPrevious(
	members []MemberSubscription,
	partitions []TopicPartition,
	previousAssignments map[string][]TopicPartition,
) map[string][]TopicPartition

AssignWithPrevious distributes partitions preserving previous assignments where possible.
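
One way to preserve previous assignments while still balancing load is a three-step greedy pass: keep what is still valid, hand unowned partitions to the least-loaded member, then move partitions away from overloaded members. The sketch below illustrates that idea only. It is not StickyAssignor's algorithm, `stickyAssign` is a hypothetical name, and it assumes every member subscribes to every topic in the partition list, so subscriptions are not checked.

```go
package main

import (
	"fmt"
	"sort"
)

type memberSubscription struct {
	MemberID string
	Topics   []string
}

type topicPartition struct {
	Topic     string
	Partition int32
}

func stickyAssign(
	members []memberSubscription,
	partitions []topicPartition,
	previous map[string][]topicPartition,
) map[string][]topicPartition {
	result := make(map[string][]topicPartition, len(members))
	ids := make([]string, 0, len(members))
	for _, m := range members {
		result[m.MemberID] = nil
		ids = append(ids, m.MemberID)
	}
	if len(ids) == 0 {
		return result
	}
	sort.Strings(ids)

	// Step 1: keep previous assignments whose owner and partition still exist.
	valid := make(map[topicPartition]bool, len(partitions))
	for _, tp := range partitions {
		valid[tp] = true
	}
	owned := make(map[topicPartition]bool, len(partitions))
	for _, id := range ids {
		for _, tp := range previous[id] {
			if valid[tp] && !owned[tp] {
				result[id] = append(result[id], tp)
				owned[tp] = true
			}
		}
	}

	// least and most pick the member with the fewest or most partitions;
	// ties go to the lowest member ID because ids is sorted.
	least := func() string {
		best := ids[0]
		for _, id := range ids[1:] {
			if len(result[id]) < len(result[best]) {
				best = id
			}
		}
		return best
	}
	most := func() string {
		best := ids[0]
		for _, id := range ids[1:] {
			if len(result[id]) > len(result[best]) {
				best = id
			}
		}
		return best
	}

	// Step 2: give every unowned partition to the least-loaded member.
	for _, tp := range partitions {
		if !owned[tp] {
			id := least()
			result[id] = append(result[id], tp)
			owned[tp] = true
		}
	}

	// Step 3: move partitions until loads differ by at most one.
	for {
		hi, lo := most(), least()
		if len(result[hi])-len(result[lo]) <= 1 {
			break
		}
		last := len(result[hi]) - 1
		result[lo] = append(result[lo], result[hi][last])
		result[hi] = result[hi][:last]
	}
	return result
}

func main() {
	members := []memberSubscription{{MemberID: "a"}, {MemberID: "b"}}
	parts := []topicPartition{{"orders", 0}, {"orders", 1}, {"orders", 2}, {"orders", 3}}
	previous := map[string][]topicPartition{"a": parts} // "a" owned everything before "b" joined
	// "a" keeps partitions 0 and 1; partitions 2 and 3 move to "b".
	fmt.Println(stickyAssign(members, parts, previous))
}
```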

func (*StickyAssignor) Name added in v1.1.0

func (s *StickyAssignor) Name() string

Name returns the assignor strategy name.

type SyncGroupRequest

type SyncGroupRequest struct {
	// Group identifier
	GroupID string

	// Generation ID
	GenerationID int32

	// Member identifier
	MemberID string

	// Assignments (only provided by leader)
	Assignments []MemberAssignmentData
}

SyncGroupRequest represents a request to sync group assignments

type SyncGroupResponse

type SyncGroupResponse struct {
	// Error code
	ErrorCode int16

	// Member assignment
	Assignment []byte
}

SyncGroupResponse represents the response to a sync group request

type TopicPartition added in v1.1.0

type TopicPartition struct {
	Topic     string
	Partition int32
}

TopicPartition represents a topic and partition pair
