Documentation
¶
Index ¶
- Constants
- Variables
- func BytesLen(bytes []byte) int
- func CompactArrayLen(length int) int
- func CompactBytesLen(bytes []byte) int
- func CompactNullableBytesLen(bytes []byte) int
- func CompactNullableStrLen(str *string) int
- func CompactStrLen(str string) int
- func CompactVarintBytesLen(bytes []byte) int
- func ConvertCompactLen(compactLen int) int
- func FourByteLength(bytes []byte) int
- func NullableStrLen(str *string) int
- func PanicToError(r any, stack []byte) error
- func StrLen(str string) int
- type ApiCode
- type ApiReq
- type ApiResp
- type ApiRespVersion
- type BaseReq
- type BaseResp
- type BrokerMetadata
- type EnableMechanism
- type ErrorCode
- type FetchPartitionReq
- type FetchPartitionResp
- type FetchReq
- type FetchResp
- type FetchTopicReq
- type FetchTopicResp
- type FindCoordinatorReq
- type FindCoordinatorResp
- type GroupAssignment
- type GroupProtocol
- type Header
- type HeartbeatReq
- type HeartbeatResp
- type JoinGroupReq
- type JoinGroupResp
- type LeaveGroupMember
- type LeaveGroupReq
- type LeaveGroupResp
- type ListOffsetsPartition
- type ListOffsetsPartitionResp
- type ListOffsetsReq
- type ListOffsetsResp
- type ListOffsetsTopic
- type ListOffsetsTopicResp
- type Member
- type MetadataReq
- type MetadataResp
- type MetadataTopicReq
- type OffsetCommitPartitionReq
- type OffsetCommitPartitionResp
- type OffsetCommitReq
- type OffsetCommitResp
- type OffsetCommitTopicReq
- type OffsetCommitTopicResp
- type OffsetFetchPartitionReq
- type OffsetFetchPartitionResp
- type OffsetFetchReq
- type OffsetFetchResp
- type OffsetFetchTopicReq
- type OffsetFetchTopicResp
- type OffsetForLeaderEpochPartitionResp
- type OffsetForLeaderEpochReq
- type OffsetForLeaderEpochResp
- type OffsetForLeaderEpochTopicResp
- type OffsetLeaderEpochPartitionReq
- type OffsetLeaderEpochTopicReq
- type PartitionMetadata
- type ProducePartitionReq
- type ProducePartitionResp
- type ProduceReq
- type ProduceResp
- type ProduceTopicReq
- type ProduceTopicResp
- type Record
- type RecordBatch
- type RecordError
- type Replica
- type SaslAuthenticateReq
- type SaslAuthenticateResp
- type SaslHandshakeReq
- type SaslHandshakeResp
- type SyncGroupReq
- type SyncGroupResp
- type TopicMetadata
Constants ¶
View Source
const ( LenAbortTransactions = 4 LenAllowAutoTopicCreation = 1 LenApiKey = 2 LenApiV0to2 = 6 LenApiV3 = 7 LenApiVersion = 2 LenArray = 4 LenBaseSequence = 4 LenBatchIndex = 4 LenClusterAuthOperation = 4 LenControllerId = 4 LenCoordinatorType = 1 LenCorrId = 4 LenCrc32 = 4 LenErrorCode = 2 LenFetchBytes = 4 LenFetchMaxWaitTime = 4 LenFetchSessionId = 4 LenFetchSessionEpoch = 4 LenGenerationId = 4 LenIncludeClusterAuthorizedOperations = 1 LenIncludeTopicAuthorizedOperations = 1 LenIsInternalV1 = 1 LenIsInternalV9 = 4 LenIsolationLevel = 1 LenLastStableOffset = 8 LenLeaderEpoch = 4 LenLeaderId = 4 LenLength = 4 LenMagicByte = 1 LenMessageSize = 4 LenNodeId = 4 LenOffset = 8 LenOffsetDelta = 4 LenPartitionId = 4 LenPort = 4 LenProducerId = 8 LenProducerEpoch = 2 LenRecordAttributes = 1 LenReplicaId = 4 LenRequiredAcks = 2 LenRequireStableOffset = 1 LenSessionTimeout = 8 LenStartOffset = 8 LenTaggedField = 1 LenThrottleTime = 4 LenTime = 8 LenTimeout = 4 LenTopicAuthOperation = 4 LenTransactionalId = 2 )
Variables ¶
View Source
var InvalidProtocolContent = errors.New("InvalidProtocolContent")
Functions ¶
func CompactArrayLen ¶
func CompactBytesLen ¶
func CompactNullableBytesLen ¶
func CompactNullableStrLen ¶
func CompactStrLen ¶
func CompactVarintBytesLen ¶
func ConvertCompactLen ¶
ConvertCompactLen converts compactLen into realLen.
func FourByteLength ¶
func NullableStrLen ¶
func PanicToError ¶
Types ¶
type ApiCode ¶
type ApiCode int16
const ( Produce ApiCode = iota Fetch ListOffsets Metadata LeaderAndIsr StopReplica UpdateMetadata ControlledShutdown OffsetCommit OffsetFetch FindCoordinator JoinGroup Heartbeat LeaveGroup SyncGroup DescribeGroups ListGroups SaslHandshake ApiVersions CreateTopics DeleteTopics DeleteRecords InitProducerId OffsetForLeaderEpoch AddPartitionsToTxn AddOffsetsToTxn EndTxn WriteTxnMarkers TxnOffsetCommit DescribeAcls CreateAcls DeleteAcls DescribeConfigs AlterConfigs AlterReplicaLogDirs DescribeLogDirs SaslAuthenticate CreatePartitions CreateDelegationToken RenewDelegationToken ExpireDelegationToken DescribeDelegationToken DeleteGroups ElectLeaders IncrementalAlterConfigs AlterPartitionReassignments ListPartitionReassignments OffsetDelete DescribeClientQuotas AlterClientQuotas DescribeUserScramCredentials AlterUserScramCredentials AlterIsr UpdateFeatures DescribeCluster DescribeProducers )
type ApiResp ¶
type ApiResp struct {
BaseResp
ErrorCode ErrorCode
ApiRespVersions []*ApiRespVersion
ThrottleTime int
}
func (*ApiResp) BytesLength ¶
type ApiRespVersion ¶
type EnableMechanism ¶
type EnableMechanism struct {
SaslMechanism string
}
type ErrorCode ¶
type ErrorCode int16
const ( // UNKNOWN_SERVER_ERROR // The server experienced an unexpected error when processing the request. UNKNOWN_SERVER_ERROR ErrorCode = iota - 1 NONE // OFFSET_OUT_OF_RANGE // The requested offset is not within the range of offsets maintained by the server. OFFSET_OUT_OF_RANGE // CORRUPT_MESSAGE // This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt. CORRUPT_MESSAGE // UNKNOWN_TOPIC_OR_PARTITION // This server does not host this topic-partition. UNKNOWN_TOPIC_OR_PARTITION // INVALID_FETCH_SIZE // The requested fetch size is invalid. INVALID_FETCH_SIZE // LEADER_NOT_AVAILABLE // There is no leader for this topic-partition as we are in the middle of a leadership election. LEADER_NOT_AVAILABLE // NOT_LEADER_OR_FOLLOWER // For requests intended only for the leader, this error indicates that the broker is not the current leader. // For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. NOT_LEADER_OR_FOLLOWER // REQUEST_TIMED_OUT // The request timed out. 
REQUEST_TIMED_OUT BROKER_NOT_AVAILABLE REPLICA_NOT_AVAILABLE MESSAGE_TOO_LARGE STALE_CONTROLLER_EPOCH OFFSET_METADATA_TOO_LARGE NETWORK_EXCEPTION COORDINATOR_LOAD_IN_PROGRESS COORDINATOR_NOT_AVAILABLE NOT_COORDINATOR INVALID_TOPIC_EXCEPTION RECORD_LIST_TOO_LARGE NOT_ENOUGH_REPLICAS NOT_ENOUGH_REPLICAS_AFTER_APPEND INVALID_REQUIRED_ACKS ILLEGAL_GENERATION INCONSISTENT_GROUP_PROTOCOL INVALID_GROUP_ID UNKNOWN_MEMBER_ID INVALID_SESSION_TIMEOUT REBALANCE_IN_PROGRESS INVALID_COMMIT_OFFSET_SIZE TOPIC_AUTHORIZATION_FAILED GROUP_AUTHORIZATION_FAILED CLUSTER_AUTHORIZATION_FAILED INVALID_TIMESTAMP UNSUPPORTED_SASL_MECHANISM ILLEGAL_SASL_STATE UNSUPPORTED_VERSION TOPIC_ALREADY_EXISTS INVALID_PARTITIONS INVALID_REPLICATION_FACTOR INVALID_REPLICA_ASSIGNMENT INVALID_CONFIG NOT_CONTROLLER INVALID_REQUEST UNSUPPORTED_FOR_MESSAGE_FORMAT POLICY_VIOLATION OUT_OF_ORDER_SEQUENCE_NUMBER DUPLICATE_SEQUENCE_NUMBER INVALID_PRODUCER_EPOCH INVALID_TXN_STATE INVALID_PRODUCER_ID_MAPPING INVALID_TRANSACTION_TIMEOUT CONCURRENT_TRANSACTIONS TRANSACTION_COORDINATOR_FENCED TRANSACTIONAL_ID_AUTHORIZATION_FAILED SECURITY_DISABLED OPERATION_NOT_ATTEMPTED KAFKA_STORAGE_ERROR LOG_DIR_NOT_FOUND SASL_AUTHENTICATION_FAILED UNKNOWN_PRODUCER_ID REASSIGNMENT_IN_PROGRESS DELEGATION_TOKEN_AUTH_DISABLED DELEGATION_TOKEN_NOT_FOUND DELEGATION_TOKEN_OWNER_MISMATCH DELEGATION_TOKEN_REQUEST_NOT_ALLOWED DELEGATION_TOKEN_AUTHORIZATION_FAILED DELEGATION_TOKEN_EXPIRED INVALID_PRINCIPAL_TYPE NON_EMPTY_GROUP GROUP_ID_NOT_FOUND FETCH_SESSION_ID_NOT_FOUND INVALID_FETCH_SESSION_EPOCH LISTENER_NOT_FOUND TOPIC_DELETION_DISABLED FENCED_LEADER_EPOCH UNKNOWN_LEADER_EPOCH UNSUPPORTED_COMPRESSION_TYPE STALE_BROKER_EPOCH OFFSET_NOT_AVAILABLE // MEMBER_ID_REQUIRED // The group member needs to have a valid member id before actually entering a consumer group. 
MEMBER_ID_REQUIRED PREFERRED_LEADER_NOT_AVAILABLE GROUP_MAX_SIZE_REACHED FENCED_INSTANCE_ID ELIGIBLE_LEADERS_NOT_AVAILABLE ELECTION_NOT_NEEDED NO_REASSIGNMENT_IN_PROGRESS GROUP_SUBSCRIBED_TO_TOPIC INVALID_RECORD UNSTABLE_OFFSET_COMMIT THROTTLING_QUOTA_EXCEEDED PRODUCER_FENCED RESOURCE_NOT_FOUND DUPLICATE_RESOURCE UNACCEPTABLE_CREDENTIAL INCONSISTENT_VOTER_SET INVALID_UPDATE_VERSION FEATURE_UPDATE_FAILED PRINCIPAL_DESERIALIZATION_FAILURE SNAPSHOT_NOT_FOUND POSITION_OUT_OF_RANGE UNKNOWN_TOPIC_ID DUPLICATE_BROKER_REGISTRATION BROKER_ID_NOT_REGISTERED INCONSISTENT_TOPIC_ID INCONSISTENT_CLUSTER_ID )
type FetchPartitionReq ¶
type FetchPartitionResp ¶
type FetchReq ¶
type FetchReq struct {
BaseReq
ReplicaId int32
MaxWaitTime int
MinBytes int
MaxBytes int
IsolationLevel byte
FetchSessionId int
FetchSessionEpoch int32
TopicReqList []*FetchTopicReq
}
func DecodeFetchReq ¶
type FetchResp ¶
type FetchResp struct {
BaseResp
ThrottleTime int
ErrorCode ErrorCode
SessionId int
TopicRespList []*FetchTopicResp
}
func DecodeFetchResp ¶
func NewFetchResp ¶
func (*FetchResp) BytesLength ¶
type FetchTopicReq ¶
type FetchTopicReq struct {
Topic string
PartitionReqList []*FetchPartitionReq
}
type FetchTopicResp ¶
type FetchTopicResp struct {
Topic string
PartitionRespList []*FetchPartitionResp
}
type FindCoordinatorReq ¶
func DecodeFindCoordinatorReq ¶
func DecodeFindCoordinatorReq(bytes []byte, version int16) (findCoordinatorReq *FindCoordinatorReq, err error)
func (*FindCoordinatorReq) Bytes ¶
func (f *FindCoordinatorReq) Bytes(containLen bool, containApiKeyVersion bool) []byte
func (*FindCoordinatorReq) BytesLength ¶
func (f *FindCoordinatorReq) BytesLength(containLen bool, containApiKeyVersion bool) int
type FindCoordinatorResp ¶
type FindCoordinatorResp struct {
BaseResp
ErrorCode ErrorCode
ThrottleTime int
ErrorMessage *string
NodeId int32
Host string
Port int
}
func DecodeFindCoordinatorResp ¶
func DecodeFindCoordinatorResp(bytes []byte, version int16) (fResp *FindCoordinatorResp, err error)
func (*FindCoordinatorResp) Bytes ¶
func (f *FindCoordinatorResp) Bytes(version int16, containLen bool) []byte
func (*FindCoordinatorResp) BytesLength ¶
func (f *FindCoordinatorResp) BytesLength(version int16) int
type GroupAssignment ¶
type GroupProtocol ¶
type HeartbeatReq ¶
type HeartbeatReq struct {
BaseReq
GroupId string
GenerationId int
MemberId string
GroupInstanceId *string
}
func DecodeHeartbeatReq ¶
func DecodeHeartbeatReq(bytes []byte, version int16) (heartBeatReq *HeartbeatReq, err error)
func (*HeartbeatReq) Bytes ¶
func (h *HeartbeatReq) Bytes(containLen bool, containApiKeyVersion bool) []byte
func (*HeartbeatReq) BytesLength ¶
func (h *HeartbeatReq) BytesLength(containLen bool, containApiKeyVersion bool) int
type HeartbeatResp ¶
func DecodeHeartbeatResp ¶
func DecodeHeartbeatResp(bytes []byte, version int16) (resp *HeartbeatResp, err error)
func (*HeartbeatResp) BytesLength ¶
func (h *HeartbeatResp) BytesLength(version int16) int
type JoinGroupReq ¶
type JoinGroupReq struct {
BaseReq
GroupId string
SessionTimeout int
RebalanceTimeout int
MemberId string
GroupInstanceId *string
ProtocolType string
GroupProtocols []*GroupProtocol
}
func DecodeJoinGroupReq ¶
func DecodeJoinGroupReq(bytes []byte, version int16) (joinGroupReq *JoinGroupReq, err error)
func (*JoinGroupReq) Bytes ¶
func (j *JoinGroupReq) Bytes(containLen bool, containApiKeyVersion bool) []byte
func (*JoinGroupReq) BytesLength ¶
func (j *JoinGroupReq) BytesLength(containLen bool, containApiKeyVersion bool) int
type JoinGroupResp ¶
type JoinGroupResp struct {
BaseResp
ErrorCode ErrorCode
ThrottleTime int
GenerationId int
ProtocolType *string
ProtocolName string
LeaderId string
MemberId string
Members []*Member
}
func DecodeJoinGroupResp ¶
func DecodeJoinGroupResp(bytes []byte, version int16) (resp *JoinGroupResp, err error)
func (*JoinGroupResp) BytesLength ¶
func (j *JoinGroupResp) BytesLength(version int16) int
type LeaveGroupMember ¶
type LeaveGroupReq ¶
type LeaveGroupReq struct {
BaseReq
GroupId string
Members []*LeaveGroupMember
}
func DecodeLeaveGroupReq ¶
func DecodeLeaveGroupReq(bytes []byte, version int16) (leaveGroupReq *LeaveGroupReq, err error)
func (*LeaveGroupReq) Bytes ¶
func (l *LeaveGroupReq) Bytes(containLen bool, containApiKeyVersion bool) []byte
func (*LeaveGroupReq) BytesLength ¶
func (l *LeaveGroupReq) BytesLength(containLen bool, containApiKeyVersion bool) int
type LeaveGroupResp ¶
type LeaveGroupResp struct {
BaseResp
ErrorCode ErrorCode
ThrottleTime int
Members []*LeaveGroupMember
MemberErrorCode ErrorCode
}
func DecodeLeaveGroupResp ¶
func DecodeLeaveGroupResp(bytes []byte, version int16) (resp *LeaveGroupResp, err error)
func (*LeaveGroupResp) Bytes ¶
func (l *LeaveGroupResp) Bytes(version int16, containLen bool) []byte
func (*LeaveGroupResp) BytesLength ¶
func (l *LeaveGroupResp) BytesLength(version int16) int
type ListOffsetsPartition ¶
type ListOffsetsReq ¶
type ListOffsetsReq struct {
BaseReq
ReplicaId int32
IsolationLevel byte
TopicReqList []*ListOffsetsTopic
}
func DecodeListOffsetsReq ¶
func DecodeListOffsetsReq(bytes []byte, version int16) (offsetReq *ListOffsetsReq, err error)
func (*ListOffsetsReq) Bytes ¶
func (l *ListOffsetsReq) Bytes(containLen bool, containApiKeyVersion bool) []byte
func (*ListOffsetsReq) BytesLength ¶
func (l *ListOffsetsReq) BytesLength(containLen bool, containApiKeyVersion bool) int
type ListOffsetsResp ¶
type ListOffsetsResp struct {
BaseResp
ErrorCode ErrorCode
ThrottleTime int
TopicRespList []*ListOffsetsTopicResp
}
func DecodeListOffsetsResp ¶
func DecodeListOffsetsResp(bytes []byte, version int16) (resp *ListOffsetsResp, err error)
func (*ListOffsetsResp) Bytes ¶
func (o *ListOffsetsResp) Bytes(version int16, containLen bool) []byte
func (*ListOffsetsResp) BytesLength ¶
func (o *ListOffsetsResp) BytesLength(version int16) int
type ListOffsetsTopic ¶
type ListOffsetsTopic struct {
Topic string
PartitionReqList []*ListOffsetsPartition
}
type ListOffsetsTopicResp ¶
type ListOffsetsTopicResp struct {
Topic string
PartitionRespList []*ListOffsetsPartitionResp
}
type MetadataReq ¶
type MetadataReq struct {
BaseReq
Topics []*MetadataTopicReq
AllowAutoTopicCreation bool
IncludeClusterAuthorizedOperations bool
IncludeTopicAuthorizedOperations bool
}
func DecodeMetadataReq ¶
func DecodeMetadataReq(bytes []byte, version int16) (metadataReq *MetadataReq, err error)
func (*MetadataReq) Bytes ¶
func (m *MetadataReq) Bytes(containLen bool, containApiKeyVersion bool) []byte
func (*MetadataReq) BytesLength ¶
func (m *MetadataReq) BytesLength(containLen bool, containApiKeyVersion bool) int
type MetadataResp ¶
type MetadataResp struct {
BaseResp
ThrottleTime int
ErrorCode int16
BrokerMetadataList []*BrokerMetadata
ClusterId string
ControllerId int32
TopicMetadataList []*TopicMetadata
ClusterAuthorizedOperation int
}
func DecodeMetadataResp ¶
func DecodeMetadataResp(bytes []byte, version int16) (metadataResp *MetadataResp, err error)
func (*MetadataResp) BytesLength ¶
func (m *MetadataResp) BytesLength(version int16) int
type MetadataTopicReq ¶
type MetadataTopicReq struct {
Topic string
}
type OffsetCommitReq ¶
type OffsetCommitReq struct {
BaseReq
GroupId string
GenerationId int
MemberId string
RetentionTime int64
GroupInstanceId *string
TopicReqList []*OffsetCommitTopicReq
}
func DecodeOffsetCommitReq ¶
func DecodeOffsetCommitReq(bytes []byte, version int16) (offsetReq *OffsetCommitReq, err error)
func (*OffsetCommitReq) Bytes ¶
func (o *OffsetCommitReq) Bytes(containLen bool, containApiKeyVersion bool) []byte
func (*OffsetCommitReq) BytesLength ¶
func (o *OffsetCommitReq) BytesLength(containLen bool, containApiKeyVersion bool) int
type OffsetCommitResp ¶
type OffsetCommitResp struct {
BaseResp
ThrottleTime int
TopicRespList []*OffsetCommitTopicResp
}
func DecodeOffsetCommitResp ¶
func DecodeOffsetCommitResp(bytes []byte, version int16) (resp *OffsetCommitResp, err error)
func (*OffsetCommitResp) Bytes ¶
func (o *OffsetCommitResp) Bytes(version int16, containLen bool) []byte
func (*OffsetCommitResp) BytesLength ¶
func (o *OffsetCommitResp) BytesLength(version int16) int
type OffsetCommitTopicReq ¶
type OffsetCommitTopicReq struct {
Topic string
PartitionReqList []*OffsetCommitPartitionReq
}
type OffsetCommitTopicResp ¶
type OffsetCommitTopicResp struct {
Topic string
PartitionRespList []*OffsetCommitPartitionResp
}
type OffsetFetchPartitionReq ¶
type OffsetFetchPartitionReq struct {
PartitionId int
}
type OffsetFetchReq ¶
type OffsetFetchReq struct {
BaseReq
GroupId string
TopicReqList []*OffsetFetchTopicReq
RequireStableOffset bool
}
func DecodeOffsetFetchReq ¶
func DecodeOffsetFetchReq(bytes []byte, version int16) (fetchReq *OffsetFetchReq, err error)
func (*OffsetFetchReq) Bytes ¶
func (o *OffsetFetchReq) Bytes(containLen bool, containApiKeyVersion bool) []byte
func (*OffsetFetchReq) BytesLength ¶
func (o *OffsetFetchReq) BytesLength(containLen bool, containApiKeyVersion bool) int
type OffsetFetchResp ¶
type OffsetFetchResp struct {
BaseResp
ThrottleTime int
ErrorCode ErrorCode
TopicRespList []*OffsetFetchTopicResp
}
func DecodeOffsetFetchResp ¶
func DecodeOffsetFetchResp(bytes []byte, version int16) (resp *OffsetFetchResp, err error)
func (*OffsetFetchResp) Bytes ¶
func (o *OffsetFetchResp) Bytes(version int16, containLen bool) []byte
func (*OffsetFetchResp) BytesLength ¶
func (o *OffsetFetchResp) BytesLength(version int16) int
type OffsetFetchTopicReq ¶
type OffsetFetchTopicReq struct {
Topic string
PartitionReqList []*OffsetFetchPartitionReq
}
type OffsetFetchTopicResp ¶
type OffsetFetchTopicResp struct {
Topic string
PartitionRespList []*OffsetFetchPartitionResp
}
type OffsetForLeaderEpochReq ¶
type OffsetForLeaderEpochReq struct {
BaseReq
ReplicaId int32
TopicReqList []*OffsetLeaderEpochTopicReq
}
func DecodeOffsetForLeaderEpochReq ¶
func DecodeOffsetForLeaderEpochReq(bytes []byte, version int16) (leaderEpochReq *OffsetForLeaderEpochReq, err error)
func (*OffsetForLeaderEpochReq) Bytes ¶
func (o *OffsetForLeaderEpochReq) Bytes(containLen bool, containApiKeyVersion bool) []byte
func (*OffsetForLeaderEpochReq) BytesLength ¶
func (o *OffsetForLeaderEpochReq) BytesLength(containLen bool, containApiKeyVersion bool) int
type OffsetForLeaderEpochResp ¶
type OffsetForLeaderEpochResp struct {
BaseResp
ThrottleTime int
TopicRespList []*OffsetForLeaderEpochTopicResp
}
func DecodeOffsetForLeaderEpochResp ¶
func DecodeOffsetForLeaderEpochResp(bytes []byte, version int16) (resp *OffsetForLeaderEpochResp, err error)
func (*OffsetForLeaderEpochResp) Bytes ¶
func (o *OffsetForLeaderEpochResp) Bytes(version int16, containLen bool) []byte
func (*OffsetForLeaderEpochResp) BytesLength ¶
func (o *OffsetForLeaderEpochResp) BytesLength(version int16) int
type OffsetForLeaderEpochTopicResp ¶
type OffsetForLeaderEpochTopicResp struct {
Topic string
PartitionRespList []*OffsetForLeaderEpochPartitionResp
}
type OffsetLeaderEpochTopicReq ¶
type OffsetLeaderEpochTopicReq struct {
Topic string
PartitionReqList []*OffsetLeaderEpochPartitionReq
}
type PartitionMetadata ¶
type ProducePartitionReq ¶
type ProducePartitionReq struct {
PartitionId int
RecordBatch *RecordBatch
}
type ProducePartitionResp ¶
type ProduceReq ¶
type ProduceReq struct {
BaseReq
ClientId string
TransactionId *string
RequiredAcks int16
Timeout int
TopicReqList []*ProduceTopicReq
}
func DecodeProduceReq ¶
func DecodeProduceReq(bytes []byte, version int16) (produceReq *ProduceReq, err error)
func (*ProduceReq) Bytes ¶
func (p *ProduceReq) Bytes(containLen bool, containApiKeyVersion bool) []byte
func (*ProduceReq) BytesLength ¶
func (p *ProduceReq) BytesLength(containLen bool, containApiKeyVersion bool) int
type ProduceResp ¶
type ProduceResp struct {
BaseResp
TopicRespList []*ProduceTopicResp
ThrottleTime int
}
func DecodeProduceResp ¶
func DecodeProduceResp(bytes []byte, version int16) (resp *ProduceResp, err error)
func (*ProduceResp) BytesLength ¶
func (p *ProduceResp) BytesLength(version int16) int
type ProduceTopicReq ¶
type ProduceTopicReq struct {
Topic string
PartitionReqList []*ProducePartitionReq
}
type ProduceTopicResp ¶
type ProduceTopicResp struct {
Topic string
PartitionRespList []*ProducePartitionResp
}
type Record ¶
type Record struct {
RecordAttributes byte
RelativeTimestamp int64
RelativeOffset int
Key []byte
Value []byte
Headers []*Header
}
func DecodeRecord ¶
func (*Record) BytesLength ¶
type RecordBatch ¶
type RecordBatch struct {
Offset int64
MessageSize int
LeaderEpoch int32
MagicByte byte
Flags uint16
LastOffsetDelta int
FirstTimestamp int64
LastTimestamp int64
ProducerId int64
ProducerEpoch int16
BaseSequence int32
Records []*Record
}
func DecodeRecordBatch ¶
func DecodeRecordBatch(bytes []byte, version int16) *RecordBatch
func (*RecordBatch) Bytes ¶
func (r *RecordBatch) Bytes() []byte
func (*RecordBatch) BytesLength ¶
func (r *RecordBatch) BytesLength() int
func (*RecordBatch) RecordBatchMessageLength ¶
func (r *RecordBatch) RecordBatchMessageLength() int
type RecordError ¶
type SaslAuthenticateReq ¶
func DecodeSaslAuthenticateReq ¶
func DecodeSaslAuthenticateReq(bytes []byte, version int16) (authReq *SaslAuthenticateReq, err error)
func (*SaslAuthenticateReq) Bytes ¶
func (s *SaslAuthenticateReq) Bytes(containLen bool, containApiKeyVersion bool) []byte
func (*SaslAuthenticateReq) BytesLength ¶
func (s *SaslAuthenticateReq) BytesLength(containLen bool, containApiKeyVersion bool) int
type SaslAuthenticateResp ¶
type SaslAuthenticateResp struct {
BaseResp
ErrorCode ErrorCode
ErrorMessage string
AuthBytes []byte
SessionLifetime int64
}
func DecodeSaslAuthenticateResp ¶
func DecodeSaslAuthenticateResp(bytes []byte, version int16) (resp *SaslAuthenticateResp, err error)
func (*SaslAuthenticateResp) Bytes ¶
func (s *SaslAuthenticateResp) Bytes(version int16, containLen bool) []byte
func (*SaslAuthenticateResp) BytesLength ¶
func (s *SaslAuthenticateResp) BytesLength(version int16) int
type SaslHandshakeReq ¶
func DecodeSaslHandshakeReq ¶
func DecodeSaslHandshakeReq(bytes []byte, version int16) (saslHandshakeReq *SaslHandshakeReq, err error)
func (*SaslHandshakeReq) Bytes ¶
func (s *SaslHandshakeReq) Bytes(containLen bool, containApiKeyVersion bool) []byte
func (*SaslHandshakeReq) BytesLength ¶
func (s *SaslHandshakeReq) BytesLength(containLen bool, containApiKeyVersion bool) int
type SaslHandshakeResp ¶
type SaslHandshakeResp struct {
BaseResp
ErrorCode ErrorCode
EnableMechanisms []*EnableMechanism
}
func DecodeSaslHandshakeResp ¶
func DecodeSaslHandshakeResp(bytes []byte, version int16) (resp *SaslHandshakeResp, err error)
func (*SaslHandshakeResp) Bytes ¶
func (s *SaslHandshakeResp) Bytes(version int16, containLen bool) []byte
func (*SaslHandshakeResp) BytesLength ¶
func (s *SaslHandshakeResp) BytesLength(version int16) int
type SyncGroupReq ¶
type SyncGroupReq struct {
BaseReq
GroupId string
GenerationId int
MemberId string
GroupInstanceId *string
ProtocolType string
ProtocolName string
GroupAssignments []*GroupAssignment
}
func DecodeSyncGroupReq ¶
func DecodeSyncGroupReq(bytes []byte, version int16) (groupReq *SyncGroupReq, err error)
func (*SyncGroupReq) Bytes ¶
func (s *SyncGroupReq) Bytes(containLen bool, containApiKeyVersion bool) []byte
func (*SyncGroupReq) BytesLength ¶
func (s *SyncGroupReq) BytesLength(containLen bool, containApiKeyVersion bool) int
type SyncGroupResp ¶
type SyncGroupResp struct {
BaseResp
ThrottleTime int
ErrorCode ErrorCode
ProtocolType string
ProtocolName string
MemberAssignment []byte
}
func DecodeSyncGroupResp ¶
func DecodeSyncGroupResp(bytes []byte, version int16) (resp *SyncGroupResp, err error)
func (*SyncGroupResp) BytesLength ¶
func (s *SyncGroupResp) BytesLength(version int16) int
type TopicMetadata ¶
type TopicMetadata struct {
ErrorCode ErrorCode
Topic string
IsInternal bool
PartitionMetadataList []*PartitionMetadata
TopicAuthorizedOperation int
}
Source Files
¶
- api_code.go
- api_versions_req.go
- api_versions_resp.go
- base.go
- const.go
- crc32.go
- error.go
- error_code.go
- fetch_req.go
- fetch_resp.go
- find_coordinator_req.go
- find_coordinator_resp.go
- heartbeat_req.go
- heartbeat_resp.go
- join_group_req.go
- join_group_resp.go
- leave_group_common.go
- leave_group_req.go
- leave_group_resp.go
- list_offsets_req.go
- list_offsets_resp.go
- metadata_req.go
- metadata_resp.go
- offset_commit_req.go
- offset_commit_resp.go
- offset_fetch_req.go
- offset_fetch_resp.go
- offset_for_leader_epoch_req.go
- offset_leader_epoch_resp.go
- produce_req.go
- produce_resp.go
- record.go
- record_batch.go
- sasl_authenticate_req.go
- sasl_authenticate_resp.go
- sasl_handshake_req.go
- sasl_handshake_resp.go
- sync_group_req.go
- sync_group_resp.go
- test_util.go
- util_decode.go
- util_general_array.go
- util_general_bool.go
- util_general_byte.go
- util_general_bytes.go
- util_general_integer.go
- util_general_string.go
- util_general_varint.go
- util_kafka_array.go
- util_kafka_bool.go
- util_kafka_byte.go
- util_kafka_bytes.go
- util_kafka_field.go
- util_kafka_int.go
- util_kafka_int16.go
- util_kafka_int32.go
- util_kafka_int64.go
- util_kafka_record.go
- util_kafka_sasl.go
- util_kafka_string.go
- util_kafka_varint.go
- util_kafka_varint64.go
- util_length.go
Click to show internal directories.
Click to hide internal directories.