Documentation
¶
Index ¶
Constants ¶
View Source
const OffsetTableName = "consumer_offset"
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerOffset ¶
type ConsumerOffset struct {
ConsumerGroup string `gorm:"column:consumer_group;primary_key"`
Topic string `gorm:"column:topic;primary_key"`
KafkaPartition int32 `gorm:"column:kafka_partition;primary_key"`
Offset int64 `gorm:"column:offset"`
Ts time.Time `gorm:"column:ts"`
Metadata string `gorm:"column:metadata"`
}
func (ConsumerOffset) TableName ¶
func (ConsumerOffset) TableName() string
type OffsetCommitRequest ¶
type OffsetCommitRequest struct {
ConsumerGroup string
ConsumerGroupGeneration int32 // nothing to do
ConsumerID string // nothing to do
RetentionTime int64 // nothing to do
// contains filtered or unexported fields
}
OffsetCommitRequest contains configuration options for requesting commit offset.
func (*OffsetCommitRequest) AddBlock ¶
func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string)
AddBlock to set commit offset of request block
func (*OffsetCommitRequest) Blocks ¶
func (r *OffsetCommitRequest) Blocks() map[string]map[int32]*OffsetCommitRequestBlock
type OffsetCommitResponse ¶
OffsetCommitResponse is the value for response of commit offset
func SaveOffsetToDB ¶
func SaveOffsetToDB(db *sql.DB, fullTableName string, request *OffsetCommitRequest) (*OffsetCommitResponse, error)
type OffsetFetchRequest ¶
type OffsetFetchRequest struct {
ConsumerGroup string
// contains filtered or unexported fields
}
OffsetFetchRequest contains configuration options for request of fetch offset
func (*OffsetFetchRequest) AddPartition ¶
func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)
AddPartition to set partition for request
func (*OffsetFetchRequest) Partitions ¶
func (r *OffsetFetchRequest) Partitions() map[string][]int32
type OffsetFetchResponse ¶
type OffsetFetchResponse struct {
Blocks map[string]map[int32]*OffsetFetchResponseBlock
LastUpdate time.Time //only for db position store
}
OffsetFetchResponse contains configuration options for response of fetch offset
func FetchOffsetFromDB ¶
func FetchOffsetFromDB(db *sql.DB, request *OffsetFetchRequest) (*OffsetFetchResponse, error)
func (*OffsetFetchResponse) AddBlock ¶
func (r *OffsetFetchResponse) AddBlock(topic string, partitionID int32, offset int64, metadata string)
AddBlock to set fetch offset of response block
func (*OffsetFetchResponse) GetBlock ¶
func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock
GetBlock to get fetch offset of response block
type OffsetFetchResponseBlock ¶
OffsetFetchResponseBlock contains configuration options for response of fetch offset block
Click to show internal directories.
Click to hide internal directories.