protocol

package
v0.9.0-dev.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2025 License: Apache-2.0 Imports: 3 Imported by: 0

Documentation

Index

Constants

View Source
// Kafka API keys handled by this package, listed in ascending numeric order.
// The values follow the Apache Kafka protocol's API key assignments.
const (
	APIKeyProduce         int16 = 0
	APIKeyFetch           int16 = 1
	APIKeyListOffsets     int16 = 2
	APIKeyMetadata        int16 = 3
	APIKeyOffsetCommit    int16 = 8
	APIKeyOffsetFetch     int16 = 9
	APIKeyFindCoordinator int16 = 10
	APIKeyJoinGroup       int16 = 11
	APIKeyHeartbeat       int16 = 12
	APIKeyLeaveGroup      int16 = 13
	APIKeySyncGroup       int16 = 14
	APIKeyApiVersion      int16 = 18
	APIKeyCreateTopics    int16 = 19
	APIKeyDeleteTopics    int16 = 20
	APIKeyDescribeConfigs int16 = 32
	APIKeyDeleteGroups    int16 = 42
)

API keys supported by Kafscale in milestone 1.

View Source
// Kafka protocol error codes, listed in ascending numeric order. The names
// and values follow the Apache Kafka error code table.
// NOTE(review): presumably these are the values placed in the ErrorCode
// fields of the response types in this package; confirm against the handlers.
const (
	UNKNOWN_SERVER_ERROR       int16 = -1
	NONE                       int16 = 0
	OFFSET_OUT_OF_RANGE        int16 = 1
	UNKNOWN_TOPIC_OR_PARTITION int16 = 3
	REQUEST_TIMED_OUT          int16 = 7
	INVALID_TOPIC_EXCEPTION    int16 = 17
	ILLEGAL_GENERATION         int16 = 22
	UNKNOWN_MEMBER_ID          int16 = 25
	REBALANCE_IN_PROGRESS      int16 = 27
	UNSUPPORTED_VERSION        int16 = 35
	TOPIC_ALREADY_EXISTS       int16 = 36
	UNKNOWN_TOPIC_ID           int16 = 100
)

Variables

This section is empty.

Functions

func EncodeApiVersionsResponse

func EncodeApiVersionsResponse(resp *ApiVersionsResponse) ([]byte, error)

EncodeApiVersionsResponse renders bytes ready to send on the wire.

func EncodeCreateTopicsResponse

func EncodeCreateTopicsResponse(resp *CreateTopicsResponse) ([]byte, error)

func EncodeDeleteTopicsResponse

func EncodeDeleteTopicsResponse(resp *DeleteTopicsResponse) ([]byte, error)

func EncodeFetchResponse

func EncodeFetchResponse(resp *FetchResponse, version int16) ([]byte, error)

EncodeFetchResponse renders bytes for fetch responses.

func EncodeFindCoordinatorResponse

func EncodeFindCoordinatorResponse(resp *FindCoordinatorResponse, version int16) ([]byte, error)

func EncodeHeartbeatResponse

func EncodeHeartbeatResponse(resp *HeartbeatResponse, version int16) ([]byte, error)

func EncodeJoinGroupResponse

func EncodeJoinGroupResponse(resp *JoinGroupResponse, version int16) ([]byte, error)

func EncodeLeaveGroupResponse

func EncodeLeaveGroupResponse(resp *LeaveGroupResponse) ([]byte, error)

func EncodeListOffsetsResponse

func EncodeListOffsetsResponse(version int16, resp *ListOffsetsResponse) ([]byte, error)

func EncodeMetadataResponse

func EncodeMetadataResponse(resp *MetadataResponse, version int16) ([]byte, error)

EncodeMetadataResponse renders bytes for metadata responses. version should match the Metadata request version that triggered this response.

func EncodeOffsetCommitResponse

func EncodeOffsetCommitResponse(resp *OffsetCommitResponse) ([]byte, error)

func EncodeOffsetFetchResponse

func EncodeOffsetFetchResponse(resp *OffsetFetchResponse, version int16) ([]byte, error)

func EncodeProduceResponse

func EncodeProduceResponse(resp *ProduceResponse, version int16) ([]byte, error)

EncodeProduceResponse renders bytes for produce responses.

func EncodeResponse

func EncodeResponse(payload []byte) ([]byte, error)

EncodeResponse wraps a response payload into a Kafka frame.

func EncodeSyncGroupResponse

func EncodeSyncGroupResponse(resp *SyncGroupResponse, version int16) ([]byte, error)

func ParseRequest

func ParseRequest(b []byte) (*RequestHeader, Request, error)

ParseRequest decodes a request header and body from bytes.

func WriteFrame

func WriteFrame(w io.Writer, payload []byte) error

WriteFrame writes payload prefixed with its length to w.

Types

type ApiVersion

// ApiVersion describes the supported version range for one API: APIKey
// identifies the API and MinVersion/MaxVersion bound the range. It is the
// element type of ApiVersionsResponse.Versions.
// NOTE(review): the bounds are presumably inclusive, as in Kafka's
// ApiVersions response; confirm against the encoder.
type ApiVersion struct {
	APIKey     int16
	MinVersion int16
	MaxVersion int16
}

ApiVersion describes the supported version range for an API.

type ApiVersionsRequest

// ApiVersionsRequest describes the ApiVersions call. It carries no fields;
// its APIKey method gives it the method set required by Request.
type ApiVersionsRequest struct{}

ApiVersionsRequest describes the ApiVersions call.

func (ApiVersionsRequest) APIKey

func (ApiVersionsRequest) APIKey() int16

type ApiVersionsResponse

// ApiVersionsResponse describes server capabilities and is the input to
// EncodeApiVersionsResponse. Versions lists the supported version range of
// each advertised API.
// NOTE(review): CorrelationID presumably echoes RequestHeader.CorrelationID
// of the request being answered; confirm against the caller.
type ApiVersionsResponse struct {
	CorrelationID int32
	ErrorCode     int16
	Versions      []ApiVersion
}

ApiVersionsResponse describes server capabilities.

type CreateTopicConfig

// CreateTopicConfig names one topic to create together with its requested
// partition count and replication factor. It is the element type of
// CreateTopicsRequest.Topics.
type CreateTopicConfig struct {
	Name              string
	NumPartitions     int32
	ReplicationFactor int16
}

type CreateTopicResult

// CreateTopicResult reports the outcome for one topic: its name, an error
// code, and an error message. It is the element type of
// CreateTopicsResponse.Topics.
// NOTE(review): ErrorCode presumably holds one of this package's Kafka error
// code constants (NONE on success); confirm against the handler.
type CreateTopicResult struct {
	Name         string
	ErrorCode    int16
	ErrorMessage string
}

type CreateTopicsRequest

// CreateTopicsRequest lists the topics to create. Its APIKey method gives it
// the method set required by Request.
type CreateTopicsRequest struct {
	Topics []CreateTopicConfig
}

func (CreateTopicsRequest) APIKey

func (CreateTopicsRequest) APIKey() int16

type CreateTopicsResponse

// CreateTopicsResponse carries the per-topic results of a CreateTopics call
// and is the input to EncodeCreateTopicsResponse.
// NOTE(review): CorrelationID presumably echoes RequestHeader.CorrelationID;
// confirm against the caller.
type CreateTopicsResponse struct {
	CorrelationID int32
	Topics        []CreateTopicResult
}

type DeleteTopicResult

// DeleteTopicResult reports the outcome for one topic: its name, an error
// code, and an error message. It is the element type of
// DeleteTopicsResponse.Topics.
// NOTE(review): ErrorCode presumably holds one of this package's Kafka error
// code constants (NONE on success); confirm against the handler.
type DeleteTopicResult struct {
	Name         string
	ErrorCode    int16
	ErrorMessage string
}

type DeleteTopicsRequest

// DeleteTopicsRequest lists, by name, the topics to delete. Its APIKey method
// gives it the method set required by Request.
type DeleteTopicsRequest struct {
	TopicNames []string
}

func (DeleteTopicsRequest) APIKey

func (DeleteTopicsRequest) APIKey() int16

type DeleteTopicsResponse

// DeleteTopicsResponse carries the per-topic results of a DeleteTopics call
// and is the input to EncodeDeleteTopicsResponse.
// NOTE(review): CorrelationID presumably echoes RequestHeader.CorrelationID;
// confirm against the caller.
type DeleteTopicsResponse struct {
	CorrelationID int32
	Topics        []DeleteTopicResult
}

type FetchAbortedTransaction

// FetchAbortedTransaction pairs a producer ID with a first offset. It is the
// element type of FetchPartitionResponse.AbortedTransactions.
// NOTE(review): presumably mirrors the aborted-transaction entry of Kafka's
// Fetch response; confirm against the encoder.
type FetchAbortedTransaction struct {
	ProducerID  int64
	FirstOffset int64
}

type FetchPartitionRequest

// FetchPartitionRequest is the per-partition part of a fetch: the partition
// index, the offset to fetch from, and a byte limit. It is the element type
// of FetchTopicRequest.Partitions.
// NOTE(review): MaxBytes is presumably the per-partition cap, distinct from
// FetchRequest.MaxBytes; confirm against the fetch handler.
type FetchPartitionRequest struct {
	Partition   int32
	FetchOffset int64
	MaxBytes    int32
}

type FetchPartitionResponse

// FetchPartitionResponse is the per-partition part of a fetch response. It is
// the element type of FetchTopicResponse.Partitions.
// NOTE(review): RecordSet presumably holds already-encoded Kafka record
// batches written to the wire as-is, and which of the offset/replica fields
// are emitted presumably depends on the version passed to
// EncodeFetchResponse; confirm against the encoder.
type FetchPartitionResponse struct {
	Partition            int32
	ErrorCode            int16
	HighWatermark        int64
	LastStableOffset     int64
	LogStartOffset       int64
	PreferredReadReplica int32
	RecordSet            []byte
	AbortedTransactions  []FetchAbortedTransaction
}

type FetchRequest

// FetchRequest represents a subset of Kafka FetchRequest v13. Topics lists
// the topics and partitions to read; the remaining fields carry request-level
// settings. Its APIKey method gives it the method set required by Request.
// NOTE(review): which fields are populated presumably depends on the request
// version being parsed; confirm in ParseRequest.
type FetchRequest struct {
	ReplicaID      int32
	MaxWaitMs      int32
	MinBytes       int32
	MaxBytes       int32
	IsolationLevel int8
	SessionID      int32
	SessionEpoch   int32
	Topics         []FetchTopicRequest
}

FetchRequest represents a subset of Kafka FetchRequest v13.

func (FetchRequest) APIKey

func (FetchRequest) APIKey() int16

type FetchResponse

// FetchResponse represents data returned to consumers and is the input to
// EncodeFetchResponse. Topics carries the per-topic, per-partition results.
// NOTE(review): CorrelationID presumably echoes RequestHeader.CorrelationID;
// confirm against the caller.
type FetchResponse struct {
	CorrelationID int32
	ThrottleMs    int32
	ErrorCode     int16
	SessionID     int32
	Topics        []FetchTopicResponse
}

FetchResponse represents data returned to consumers.

type FetchTopicRequest

// FetchTopicRequest groups the partitions to fetch for one topic. It is the
// element type of FetchRequest.Topics.
// NOTE(review): the topic is carried both by Name and by a 16-byte TopicID;
// which one is set presumably depends on the request version. Confirm in the
// parser.
type FetchTopicRequest struct {
	Name       string
	TopicID    [16]byte
	Partitions []FetchPartitionRequest
}

type FetchTopicResponse

// FetchTopicResponse groups the per-partition fetch results for one topic. It
// is the element type of FetchResponse.Topics.
// NOTE(review): the topic is carried both by Name and by a 16-byte TopicID;
// which one is encoded presumably depends on the response version. Confirm in
// EncodeFetchResponse.
type FetchTopicResponse struct {
	Name       string
	TopicID    [16]byte
	Partitions []FetchPartitionResponse
}

type FindCoordinatorRequest

// FindCoordinatorRequest targets a group coordinator lookup. Its APIKey
// method gives it the method set required by Request.
// NOTE(review): Key is presumably the group ID and KeyType the Kafka
// coordinator type (0 = group); confirm against the handler.
type FindCoordinatorRequest struct {
	KeyType int8
	Key     string
}

FindCoordinatorRequest targets a group coordinator lookup.

func (FindCoordinatorRequest) APIKey

func (FindCoordinatorRequest) APIKey() int16

type FindCoordinatorResponse

// FindCoordinatorResponse carries the result of a coordinator lookup (an
// error code plus a node ID, host, and port) and is the input to
// EncodeFindCoordinatorResponse. ErrorMessage is a pointer and may be nil.
// NOTE(review): a nil ErrorMessage is presumably encoded as a Kafka nullable
// string; confirm against the encoder.
type FindCoordinatorResponse struct {
	CorrelationID int32
	ThrottleMs    int32
	ErrorCode     int16
	ErrorMessage  *string
	NodeID        int32
	Host          string
	Port          int32
}

type Frame

// Frame represents a Kafka request or response frame. ReadFrame returns one
// per size-prefixed frame read.
// NOTE(review): Length is presumably the size prefix read from the wire and
// equal to len(Payload); confirm in ReadFrame.
type Frame struct {
	Length  int32
	Payload []byte
}

Frame represents a Kafka request or response frame.

func ReadFrame

func ReadFrame(r io.Reader) (*Frame, error)

ReadFrame reads a single size-prefixed frame from r.

type HeartbeatRequest

// HeartbeatRequest identifies a group member (group ID, generation ID, member
// ID) for a Heartbeat call. InstanceID is a pointer and may be nil. Its APIKey
// method gives it the method set required by Request.
// NOTE(review): InstanceID is presumably Kafka's optional group.instance.id
// for static membership; confirm against the parser.
type HeartbeatRequest struct {
	GroupID      string
	GenerationID int32
	MemberID     string
	InstanceID   *string
}

func (HeartbeatRequest) APIKey

func (HeartbeatRequest) APIKey() int16

type HeartbeatResponse

// HeartbeatResponse carries the result of a Heartbeat call and is the input
// to EncodeHeartbeatResponse.
// NOTE(review): CorrelationID presumably echoes RequestHeader.CorrelationID;
// confirm against the caller.
type HeartbeatResponse struct {
	CorrelationID int32
	ThrottleMs    int32
	ErrorCode     int16
}

type JoinGroupMember

// JoinGroupMember describes one group member in a JoinGroupResponse: its
// member ID, an optional instance ID (nil when absent), and opaque metadata
// bytes. It is the element type of JoinGroupResponse.Members.
// NOTE(review): Metadata is presumably the member's protocol metadata as
// submitted in JoinGroupProtocol.Metadata; confirm against the coordinator.
type JoinGroupMember struct {
	MemberID   string
	InstanceID *string
	Metadata   []byte
}

type JoinGroupProtocol

// JoinGroupProtocol pairs a protocol name with opaque metadata bytes. It is
// the element type of JoinGroupRequest.Protocols.
type JoinGroupProtocol struct {
	Name     string
	Metadata []byte
}

type JoinGroupRequest

// JoinGroupRequest carries the fields of a JoinGroup call: the group and
// member IDs, session and rebalance timeouts in milliseconds, the protocol
// type, and the protocols the member offers. Its APIKey method gives it the
// method set required by Request.
// NOTE(review): an empty MemberID presumably marks a first-time join, as in
// Kafka; confirm against the coordinator.
type JoinGroupRequest struct {
	GroupID            string
	SessionTimeoutMs   int32
	RebalanceTimeoutMs int32
	MemberID           string
	ProtocolType       string
	Protocols          []JoinGroupProtocol
}

func (JoinGroupRequest) APIKey

func (JoinGroupRequest) APIKey() int16

type JoinGroupResponse

// JoinGroupResponse carries the result of a JoinGroup call and is the input
// to EncodeJoinGroupResponse. It holds the generation ID, the protocol name,
// the leader's and the recipient's member IDs, and a member list.
// NOTE(review): Members is presumably populated only for the group leader, as
// in Kafka; confirm against the coordinator.
type JoinGroupResponse struct {
	CorrelationID int32
	ThrottleMs    int32
	ErrorCode     int16
	GenerationID  int32
	ProtocolName  string
	LeaderID      string
	MemberID      string
	Members       []JoinGroupMember
}

type LeaveGroupRequest

// LeaveGroupRequest names the group and the member for a LeaveGroup call. Its
// APIKey method gives it the method set required by Request.
type LeaveGroupRequest struct {
	GroupID  string
	MemberID string
}

func (LeaveGroupRequest) APIKey

func (LeaveGroupRequest) APIKey() int16

type LeaveGroupResponse

// LeaveGroupResponse carries the result of a LeaveGroup call and is the input
// to EncodeLeaveGroupResponse. Unlike most responses in this package it has no
// ThrottleMs field.
type LeaveGroupResponse struct {
	CorrelationID int32
	ErrorCode     int16
}

type ListOffsetsPartition

// ListOffsetsPartition is the per-partition part of a ListOffsets request. It
// is the element type of ListOffsetsTopic.Partitions.
// NOTE(review): Timestamp presumably follows Kafka's convention (-1 = latest,
// -2 = earliest) and MaxNumOffsets is presumably only meaningful for the
// legacy v0 request; confirm against the parser and handler.
type ListOffsetsPartition struct {
	Partition     int32
	Timestamp     int64
	MaxNumOffsets int32
}

type ListOffsetsPartitionResponse

// ListOffsetsPartitionResponse is the per-partition part of a ListOffsets
// response. It is the element type of ListOffsetsTopicResponse.Partitions.
// NOTE(review): OldStyleOffsets is presumably the multi-offset result used by
// the legacy v0 response, with Timestamp/Offset/LeaderEpoch used by later
// versions; confirm in EncodeListOffsetsResponse.
type ListOffsetsPartitionResponse struct {
	Partition       int32
	ErrorCode       int16
	Timestamp       int64
	Offset          int64
	LeaderEpoch     int32
	OldStyleOffsets []int64
}

type ListOffsetsRequest

// ListOffsetsRequest carries a replica ID and the topics and partitions whose
// offsets are requested. Its APIKey method gives it the method set required
// by Request.
type ListOffsetsRequest struct {
	ReplicaID int32
	Topics    []ListOffsetsTopic
}

func (ListOffsetsRequest) APIKey

func (ListOffsetsRequest) APIKey() int16

type ListOffsetsResponse

// ListOffsetsResponse carries the per-topic results of a ListOffsets call and
// is the input to EncodeListOffsetsResponse. Note that the encoder takes the
// version as its first argument and the response as its second.
type ListOffsetsResponse struct {
	CorrelationID int32
	ThrottleMs    int32
	Topics        []ListOffsetsTopicResponse
}

type ListOffsetsTopic

// ListOffsetsTopic groups the requested partitions for one topic name. It is
// the element type of ListOffsetsRequest.Topics.
type ListOffsetsTopic struct {
	Name       string
	Partitions []ListOffsetsPartition
}

type ListOffsetsTopicResponse

// ListOffsetsTopicResponse groups the per-partition results for one topic
// name. It is the element type of ListOffsetsResponse.Topics.
type ListOffsetsTopicResponse struct {
	Name       string
	Partitions []ListOffsetsPartitionResponse
}

type MetadataBroker

// MetadataBroker describes a broker in a Metadata response: its node ID,
// host, port, and an optional rack (nil when absent). It is the element type
// of MetadataResponse.Brokers.
type MetadataBroker struct {
	NodeID int32
	Host   string
	Port   int32
	Rack   *string
}

MetadataBroker describes a broker in Metadata response.

type MetadataPartition

// MetadataPartition describes partition metadata: the partition index, its
// leader and leader epoch, and the replica, ISR, and offline-replica node
// lists. It is the element type of MetadataTopic.Partitions.
// NOTE(review): the node lists presumably hold broker node IDs matching
// MetadataBroker.NodeID; confirm against the metadata handler.
type MetadataPartition struct {
	ErrorCode       int16
	PartitionIndex  int32
	LeaderID        int32
	LeaderEpoch     int32
	ReplicaNodes    []int32
	ISRNodes        []int32
	OfflineReplicas []int32
}

MetadataPartition describes partition metadata.

type MetadataRequest

// MetadataRequest asks for cluster metadata. Empty Topics means "all". Its
// APIKey method gives it the method set required by Request.
// NOTE(review): TopicIDs presumably carries 16-byte topic IDs for request
// versions that address topics by ID, and how it interacts with the
// "empty Topics means all" rule is not visible here; confirm in the parser.
type MetadataRequest struct {
	Topics                 []string
	TopicIDs               [][16]byte
	AllowAutoTopicCreation bool
	IncludeClusterAuthOps  bool
	IncludeTopicAuthOps    bool
}

MetadataRequest asks for cluster metadata. Empty Topics means "all".

func (MetadataRequest) APIKey

func (MetadataRequest) APIKey() int16

type MetadataResponse

// MetadataResponse holds topic and broker information and is the input to
// EncodeMetadataResponse, whose version argument should match the Metadata
// request version that triggered the response. ClusterID is a pointer and may
// be nil.
type MetadataResponse struct {
	CorrelationID               int32
	ThrottleMs                  int32
	Brokers                     []MetadataBroker
	ClusterID                   *string
	ControllerID                int32
	Topics                      []MetadataTopic
	ClusterAuthorizedOperations int32
}

MetadataResponse holds topic and broker information.

type MetadataTopic

// MetadataTopic describes a topic in a Metadata response: an error code, the
// topic's name and 16-byte ID, whether it is internal, and its partitions. It
// is the element type of MetadataResponse.Topics.
type MetadataTopic struct {
	ErrorCode                 int16
	Name                      string
	TopicID                   [16]byte
	IsInternal                bool
	Partitions                []MetadataPartition
	TopicAuthorizedOperations int32
}

MetadataTopic describes a topic in Metadata response.

type OffsetCommitPartition

// OffsetCommitPartition carries the offset and metadata string to commit for
// one partition. It is the element type of OffsetCommitTopic.Partitions.
type OffsetCommitPartition struct {
	Partition int32
	Offset    int64
	Metadata  string
}

type OffsetCommitPartitionResponse

// OffsetCommitPartitionResponse reports the commit result (an error code) for
// one partition. It is the element type of
// OffsetCommitTopicResponse.Partitions.
type OffsetCommitPartitionResponse struct {
	Partition int32
	ErrorCode int16
}

type OffsetCommitRequest

// OffsetCommitRequest carries the committing member's identity (group ID,
// generation ID, member ID), a retention time in milliseconds, and the
// per-topic offsets to commit. Its APIKey method gives it the method set
// required by Request.
// NOTE(review): RetentionMs presumably only exists in older OffsetCommit
// request versions; confirm in the parser.
type OffsetCommitRequest struct {
	GroupID      string
	GenerationID int32
	MemberID     string
	RetentionMs  int64
	Topics       []OffsetCommitTopic
}

func (OffsetCommitRequest) APIKey

func (OffsetCommitRequest) APIKey() int16

type OffsetCommitResponse

// OffsetCommitResponse carries the per-topic results of an OffsetCommit call
// and is the input to EncodeOffsetCommitResponse.
type OffsetCommitResponse struct {
	CorrelationID int32
	ThrottleMs    int32
	Topics        []OffsetCommitTopicResponse
}

type OffsetCommitTopic

// OffsetCommitTopic groups the partitions to commit for one topic name. It is
// the element type of OffsetCommitRequest.Topics.
type OffsetCommitTopic struct {
	Name       string
	Partitions []OffsetCommitPartition
}

type OffsetCommitTopicResponse

// OffsetCommitTopicResponse groups the per-partition commit results for one
// topic name. It is the element type of OffsetCommitResponse.Topics.
type OffsetCommitTopicResponse struct {
	Name       string
	Partitions []OffsetCommitPartitionResponse
}

type OffsetFetchPartition

// OffsetFetchPartition names one partition whose committed offset is
// requested. It is the element type of OffsetFetchTopic.Partitions.
type OffsetFetchPartition struct {
	Partition int32
}

type OffsetFetchPartitionResponse

// OffsetFetchPartitionResponse reports the fetched offset, leader epoch,
// optional metadata (nil when absent), and error code for one partition. It
// is the element type of OffsetFetchTopicResponse.Partitions.
// NOTE(review): Offset is presumably -1 when no offset has been committed, as
// in Kafka; confirm against the handler.
type OffsetFetchPartitionResponse struct {
	Partition   int32
	Offset      int64
	LeaderEpoch int32
	Metadata    *string
	ErrorCode   int16
}

type OffsetFetchRequest

// OffsetFetchRequest names a group and the topics and partitions whose
// committed offsets are requested. Its APIKey method gives it the method set
// required by Request.
type OffsetFetchRequest struct {
	GroupID string
	Topics  []OffsetFetchTopic
}

func (OffsetFetchRequest) APIKey

func (OffsetFetchRequest) APIKey() int16

type OffsetFetchResponse

// OffsetFetchResponse carries the per-topic results of an OffsetFetch call
// plus a top-level error code, and is the input to EncodeOffsetFetchResponse.
// NOTE(review): the top-level ErrorCode is presumably only encoded for
// response versions that define it; confirm in the encoder.
type OffsetFetchResponse struct {
	CorrelationID int32
	ThrottleMs    int32
	Topics        []OffsetFetchTopicResponse
	ErrorCode     int16
}

type OffsetFetchTopic

// OffsetFetchTopic groups the requested partitions for one topic name. It is
// the element type of OffsetFetchRequest.Topics.
type OffsetFetchTopic struct {
	Name       string
	Partitions []OffsetFetchPartition
}

type OffsetFetchTopicResponse

// OffsetFetchTopicResponse groups the per-partition results for one topic
// name. It is the element type of OffsetFetchResponse.Topics.
type OffsetFetchTopicResponse struct {
	Name       string
	Partitions []OffsetFetchPartitionResponse
}

type ProducePartition

// ProducePartition carries the record bytes destined for one partition. It is
// the element type of ProduceTopic.Partitions.
// NOTE(review): Records presumably holds the raw, still-encoded Kafka record
// batch as received from the client; confirm in the parser.
type ProducePartition struct {
	Partition int32
	Records   []byte
}

type ProducePartitionResponse

// ProducePartitionResponse is the per-partition acknowledgement of a produce:
// an error code, the base offset, the log append time in milliseconds, and
// the log start offset. It is the element type of
// ProduceTopicResponse.Partitions.
// NOTE(review): BaseOffset is presumably the offset assigned to the first
// record of the appended batch; confirm against the produce handler.
type ProducePartitionResponse struct {
	Partition       int32
	ErrorCode       int16
	BaseOffset      int64
	LogAppendTimeMs int64
	LogStartOffset  int64
}

type ProduceRequest

// ProduceRequest is a simplified representation of Kafka ProduceRequest v9.
// It carries the acks setting, a timeout in milliseconds, an optional
// transactional ID (nil when absent), and the per-topic record data. Its
// APIKey method gives it the method set required by Request.
type ProduceRequest struct {
	Acks            int16
	TimeoutMs       int32
	TransactionalID *string
	Topics          []ProduceTopic
}

ProduceRequest is a simplified representation of Kafka ProduceRequest v9.

func (ProduceRequest) APIKey

func (ProduceRequest) APIKey() int16

type ProduceResponse

// ProduceResponse contains per-partition acknowledgement info and is the
// input to EncodeProduceResponse.
// NOTE(review): CorrelationID presumably echoes RequestHeader.CorrelationID;
// confirm against the caller.
type ProduceResponse struct {
	CorrelationID int32
	Topics        []ProduceTopicResponse
	ThrottleMs    int32
}

ProduceResponse contains per-partition acknowledgement info.

type ProduceTopic

// ProduceTopic groups the partitions being produced to for one topic name. It
// is the element type of ProduceRequest.Topics.
type ProduceTopic struct {
	Name       string
	Partitions []ProducePartition
}

type ProduceTopicResponse

// ProduceTopicResponse groups the per-partition acknowledgements for one
// topic name. It is the element type of ProduceResponse.Topics.
type ProduceTopicResponse struct {
	Name       string
	Partitions []ProducePartitionResponse
}

type Request

// Request is implemented by concrete protocol requests. ParseRequest returns
// the decoded body as a Request alongside its RequestHeader.
type Request interface {
	// APIKey reports the API key of the request as an int16.
	// NOTE(review): presumably one of the APIKey* constants; confirm in the
	// implementations.
	APIKey() int16
}

Request is implemented by concrete protocol requests.

type RequestHeader

// RequestHeader matches Kafka RequestHeader v1 (simplified without tagged
// fields). It is returned by ParseRequest and ParseRequestHeader. ClientID is
// a pointer and may be nil.
// NOTE(review): a nil ClientID presumably corresponds to a null client_id on
// the wire; confirm in ParseRequestHeader.
type RequestHeader struct {
	APIKey        int16
	APIVersion    int16
	CorrelationID int32
	ClientID      *string
}

RequestHeader matches Kafka RequestHeader v1 (simplified without tagged fields).

func ParseRequestHeader

func ParseRequestHeader(b []byte) (*RequestHeader, *byteReader, error)

ParseRequestHeader decodes the header portion from raw bytes.

type SyncGroupAssignment

// SyncGroupAssignment pairs a member ID with opaque assignment bytes. It is
// the element type of SyncGroupRequest.Assignments.
type SyncGroupAssignment struct {
	MemberID   string
	Assignment []byte
}

type SyncGroupRequest

// SyncGroupRequest carries the sending member's identity (group ID,
// generation ID, member ID) and a list of per-member assignments. Its APIKey
// method gives it the method set required by Request.
// NOTE(review): Assignments is presumably non-empty only when sent by the
// group leader, as in Kafka; confirm against the coordinator.
type SyncGroupRequest struct {
	GroupID      string
	GenerationID int32
	MemberID     string
	Assignments  []SyncGroupAssignment
}

func (SyncGroupRequest) APIKey

func (SyncGroupRequest) APIKey() int16

type SyncGroupResponse

// SyncGroupResponse carries the result of a SyncGroup call (an error code and
// the recipient's assignment bytes) and is the input to
// EncodeSyncGroupResponse. ProtocolType and ProtocolName are pointers and may
// be nil.
// NOTE(review): the protocol fields are presumably only encoded for response
// versions that define them; confirm in the encoder.
type SyncGroupResponse struct {
	CorrelationID int32
	ThrottleMs    int32
	ErrorCode     int16
	ProtocolType  *string
	ProtocolName  *string
	Assignment    []byte
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL