Documentation ¶
Overview ¶
Package proto provides a Kafka binary protocol implementation.
Index ¶
- Constants
- Variables
- func ComputeCrc(m *Message, compression Compression) uint32
- func ConfigureParser(c ParserConfig) error
- func NewDecoder(r io.Reader) *decoder
- func NewEncoder(w io.Writer) *encoder
- func ReadReq(r io.Reader) (requestKind int16, b []byte, err error)
- func ReadResp(r io.Reader) (correlationID int32, b []byte, err error)
- type Compression
- type ConsumerMetadataReq
- type ConsumerMetadataResp
- type FetchReq
- type FetchReqPartition
- type FetchReqTopic
- type FetchResp
- type FetchRespAbortedTransaction
- type FetchRespPartition
- type FetchRespTopic
- type KafkaError
- type Message
- type MetadataReq
- type MetadataResp
- type MetadataRespBroker
- type MetadataRespPartition
- type MetadataRespTopic
- type OffsetCommitReq
- type OffsetCommitReqPartition
- type OffsetCommitReqTopic
- type OffsetCommitResp
- type OffsetCommitRespPartition
- type OffsetCommitRespTopic
- type OffsetFetchReq
- type OffsetFetchReqTopic
- type OffsetFetchResp
- type OffsetFetchRespPartition
- type OffsetFetchRespTopic
- type OffsetReq
- type OffsetReqPartition
- type OffsetReqTopic
- type OffsetResp
- type OffsetRespPartition
- type OffsetRespTopic
- type ParserConfig
- type ProduceReq
- type ProduceReqPartition
- type ProduceReqTopic
- type ProduceResp
- type ProduceRespPartition
- type ProduceRespTopic
Constants ¶
const (
	KafkaV0 int16 = iota
	KafkaV1
	KafkaV2
	KafkaV3
	KafkaV4
	KafkaV5
)
const (
	ProduceReqKind          = 0
	FetchReqKind            = 1
	OffsetReqKind           = 2
	MetadataReqKind         = 3
	OffsetCommitReqKind     = 8
	OffsetFetchReqKind      = 9
	ConsumerMetadataReqKind = 10

	// Receive the latest offset (i.e. the offset of the next coming message).
	OffsetReqTimeLatest = -1

	// Receive the earliest available offset. Note that because offsets are
	// pulled in descending order, asking for the earliest offset will always
	// return you a single element.
	OffsetReqTimeEarliest = -2

	// Server will not send any response.
	RequiredAcksNone = 0

	// Server will block until the message is committed by all in-sync
	// replicas before sending a response.
	RequiredAcksAll = -1

	// Server will wait until the data is written to the local log before
	// sending a response.
	RequiredAcksLocal = 1
)
const (
	CorrelationTypeGroup       int8 = 0
	CorrelationTypeTransaction      = 1
)
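As a minimal sketch of how the acknowledgement constants are meant to be used, a produce request that should not be acknowledged until every in-sync replica has the data would set RequiredAcksAll (topic and partition contents are elided here; see ProduceReq below):

func newDurableProduceReq() *ProduceReq {
	return &ProduceReq{
		RequiredAcks: RequiredAcksAll,        // wait for all in-sync replicas
		Timeout:      500 * time.Millisecond, // how long the broker may block
	}
}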
Variables ¶
var (
	ErrUnknown                                 = &KafkaError{-1, "unknown error"}
	ErrOffsetOutOfRange                        = &KafkaError{1, "offset out of range"}
	ErrInvalidMessage                          = &KafkaError{2, "invalid message"}
	ErrUnknownTopicOrPartition                 = &KafkaError{3, "unknown topic or partition"}
	ErrInvalidMessageSize                      = &KafkaError{4, "invalid message size"}
	ErrLeaderNotAvailable                      = &KafkaError{5, "leader not available"}
	ErrNotLeaderForPartition                   = &KafkaError{6, "not leader for partition"}
	ErrRequestTimeout                          = &KafkaError{7, "request timed out"}
	ErrBrokerNotAvailable                      = &KafkaError{8, "broker not available"}
	ErrReplicaNotAvailable                     = &KafkaError{9, "replica not available"}
	ErrMessageSizeTooLarge                     = &KafkaError{10, "message size too large"}
	ErrScaleControllerEpoch                    = &KafkaError{11, "stale controller epoch"}
	ErrOffsetMetadataTooLarge                  = &KafkaError{12, "offset metadata too large"}
	ErrNetwork                                 = &KafkaError{13, "server disconnected before response was received"}
	ErrOffsetLoadInProgress                    = &KafkaError{14, "offsets load in progress"}
	ErrNoCoordinator                           = &KafkaError{15, "consumer coordinator not available"}
	ErrNotCoordinator                          = &KafkaError{16, "not coordinator for consumer"}
	ErrInvalidTopic                            = &KafkaError{17, "operation on an invalid topic"}
	ErrRecordListTooLarge                      = &KafkaError{18, "message batch larger than the configured segment size"}
	ErrNotEnoughReplicas                       = &KafkaError{19, "not enough in-sync replicas"}
	ErrNotEnoughReplicasAfterAppend            = &KafkaError{20, "messages are written to the log, but to fewer in-sync replicas than required"}
	ErrInvalidRequiredAcks                     = &KafkaError{21, "invalid value for required acks"}
	ErrIllegalGeneration                       = &KafkaError{22, "consumer generation id is not valid"}
	ErrInconsistentPartitionAssignmentStrategy = &KafkaError{23, "partition assignment strategy does not match that of the group"}
	ErrUnknownParititonAssignmentStrategy      = &KafkaError{24, "partition assignment strategy is unknown to the broker"}
	ErrUnknownConsumerID                       = &KafkaError{25, "coordinator is not aware of this consumer"}
	ErrInvalidSessionTimeout                   = &KafkaError{26, "invalid session timeout"}
	ErrRebalanceInProgress                     = &KafkaError{27, "group is rebalancing, so a rejoin is needed"}
	ErrInvalidCommitOffsetSize                 = &KafkaError{28, "offset data size is not valid"}
	ErrTopicAuthorizationFailed                = &KafkaError{29, "topic authorization failed"}
	ErrGroupAuthorizationFailed                = &KafkaError{30, "group authorization failed"}
	ErrClusterAuthorizationFailed              = &KafkaError{31, "cluster authorization failed"}
	ErrInvalidTimeStamp                        = &KafkaError{32, "timestamp of the message is out of acceptable range"}
)
var ErrInvalidArrayLen = errors.New("invalid array length")
var ErrNotEnoughData = errors.New("not enough data")
Functions ¶
func ComputeCrc ¶
func ComputeCrc(m *Message, compression Compression) uint32
ComputeCrc returns the CRC32 hash of the given message content.
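A minimal sketch of stamping a message with its checksum before producing; the key and value are illustrative:

m := &Message{Key: []byte("key"), Value: []byte("value")}
m.Crc = ComputeCrc(m, CompressionNone) // CRC32 over the serialized message content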
func ConfigureParser ¶
func ConfigureParser(c ParserConfig) error
ConfigureParser configures the parser. It must be called before parsing any messages, as the parser structure is currently not prepared for concurrent access.
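A minimal sketch of configuring the parser once at program startup, before any message is parsed:

func init() {
	// Must run before any parsing; the parser state is not safe for
	// concurrent reconfiguration.
	if err := ConfigureParser(ParserConfig{SimplifiedMessageSetParsing: true}); err != nil {
		panic(err)
	}
}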
func NewDecoder ¶
func NewDecoder(r io.Reader) *decoder
func NewEncoder ¶
func NewEncoder(w io.Writer) *encoder
func ReadReq ¶
func ReadReq(r io.Reader) (requestKind int16, b []byte, err error)
ReadReq returns the request kind ID and the byte representation of the whole message in wire protocol format.
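A hedged sketch of server-side dispatch on the returned kind, assuming conn is an established net.Conn (bytes and net are the only imports needed beyond this package; the handler hand-off is elided):

func handleConn(conn net.Conn) error {
	kind, b, err := ReadReq(conn)
	if err != nil {
		return err
	}
	switch kind {
	case ProduceReqKind:
		req, err := ReadProduceReq(bytes.NewReader(b))
		if err != nil {
			return err
		}
		_ = req // dispatch to a produce handler
	case MetadataReqKind:
		req, err := ReadMetadataReq(bytes.NewReader(b))
		if err != nil {
			return err
		}
		_ = req // dispatch to a metadata handler
	}
	return nil
}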
func ReadResp ¶
func ReadResp(r io.Reader) (correlationID int32, b []byte, err error)
ReadResp returns the message correlation ID and the byte representation of the whole message in wire protocol format read from the given stream, including the 4 bytes of the message size itself. The byte representation returned by ReadResp can be parsed by all response readers to transform it into a specialized response structure.
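A minimal sketch of reading one response off a connection and decoding it as metadata; matching the correlation ID against in-flight requests is application-specific:

func readMetadata(conn net.Conn, want int32) (*MetadataResp, error) {
	correlationID, b, err := ReadResp(conn)
	if err != nil {
		return nil, err
	}
	if correlationID != want {
		return nil, fmt.Errorf("unexpected correlation id: %d", correlationID)
	}
	return ReadMetadataResp(bytes.NewReader(b))
}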
Types ¶
type Compression ¶
type Compression int8
const ( CompressionNone Compression = 0 CompressionGzip Compression = 1 CompressionSnappy Compression = 2 )
type ConsumerMetadataReq ¶
type ConsumerMetadataReq struct {
Version int16
CorrelationID int32
ClientID string
ConsumerGroup string
CoordinatorType int8 // >= KafkaV1
}
func ReadConsumerMetadataReq ¶
func ReadConsumerMetadataReq(r io.Reader) (*ConsumerMetadataReq, error)
func (*ConsumerMetadataReq) Bytes ¶
func (r *ConsumerMetadataReq) Bytes() ([]byte, error)
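A hedged sketch of a coordinator lookup over an existing broker connection; the client and group names are hypothetical:

func lookupCoordinator(conn net.Conn, group string) (*ConsumerMetadataResp, error) {
	req := &ConsumerMetadataReq{
		CorrelationID: 1,
		ClientID:      "example-client", // hypothetical
		ConsumerGroup: group,
	}
	b, err := req.Bytes()
	if err != nil {
		return nil, err
	}
	if _, err := conn.Write(b); err != nil {
		return nil, err
	}
	return ReadConsumerMetadataResp(conn)
}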
type ConsumerMetadataResp ¶
type ConsumerMetadataResp struct {
Version int16
CorrelationID int32
ThrottleTime time.Duration // >= KafkaV1
Err error
ErrMsg string // >= KafkaV1
CoordinatorID int32
CoordinatorHost string
CoordinatorPort int32
}
func ReadConsumerMetadataResp ¶
func ReadConsumerMetadataResp(r io.Reader) (*ConsumerMetadataResp, error)
func (*ConsumerMetadataResp) Bytes ¶
func (r *ConsumerMetadataResp) Bytes() ([]byte, error)
type FetchReq ¶
type FetchReqPartition ¶
type FetchReqTopic ¶
type FetchReqTopic struct {
Name string
Partitions []FetchReqPartition
}
type FetchResp ¶
type FetchResp struct {
Version int16
CorrelationID int32
ThrottleTime time.Duration
Topics []FetchRespTopic
}
type FetchRespPartition ¶
type FetchRespTopic ¶
type FetchRespTopic struct {
Name string
Partitions []FetchRespPartition
}
type KafkaError ¶
type KafkaError struct {
// contains filtered or unexported fields
}
func (*KafkaError) Errno ¶
func (err *KafkaError) Errno() int
func (*KafkaError) Error ¶
func (err *KafkaError) Error() string
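Err fields on response structures hold *KafkaError values. A minimal sketch of classifying one by numeric code; which codes count as retriable is an illustrative choice, not something this package prescribes:

func isRetriable(err error) bool {
	kerr, ok := err.(*KafkaError)
	if !ok {
		return false
	}
	switch kerr.Errno() {
	// 5, 6 and 14 correspond to ErrLeaderNotAvailable,
	// ErrNotLeaderForPartition and ErrOffsetLoadInProgress above.
	case 5, 6, 14:
		return true
	}
	return false
}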
type Message ¶
type Message struct {
Key []byte
Value []byte
Offset int64 // set when fetching and after successful producing
Crc uint32 // set when fetching, ignored when producing
Topic string // set when fetching, ignored when producing
Partition int32 // set when fetching, ignored when producing
TipOffset int64 // set when fetching, ignored when producing
}
Message represents a single entity of a message set.
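Because TipOffset carries the partition's newest offset at fetch time, a fetched message can report approximate consumer lag; a minimal sketch, assuming msgs came from a fetch response:

func logLag(msgs []*Message) {
	for _, m := range msgs {
		// The gap between the partition tip and this message's offset
		// approximates how far behind the consumer is.
		fmt.Printf("%s/%d offset=%d lag=%d\n",
			m.Topic, m.Partition, m.Offset, m.TipOffset-m.Offset)
	}
}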
type MetadataReq ¶
type MetadataReq struct {
Version int16
CorrelationID int32
ClientID string
Topics []string
AllowAutoTopicCreation bool // >= KafkaV4 only
}
func ReadMetadataReq ¶
func ReadMetadataReq(r io.Reader) (*MetadataReq, error)
func (*MetadataReq) Bytes ¶
func (r *MetadataReq) Bytes() ([]byte, error)
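A minimal sketch of requesting metadata for selected topics over an established connection; the client name is hypothetical:

func fetchMetadata(conn net.Conn, topics []string) (*MetadataResp, error) {
	req := &MetadataReq{
		CorrelationID: 1,
		ClientID:      "example-client", // hypothetical
		Topics:        topics,
	}
	b, err := req.Bytes()
	if err != nil {
		return nil, err
	}
	if _, err := conn.Write(b); err != nil {
		return nil, err
	}
	return ReadMetadataResp(conn)
}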
type MetadataResp ¶
type MetadataResp struct {
Version int16
CorrelationID int32
ThrottleTime time.Duration // >= KafkaV3
Brokers []MetadataRespBroker
ClusterID string // >= KafkaV2
ControllerID int32 // >= KafkaV1
Topics []MetadataRespTopic
}
func ReadMetadataResp ¶
func ReadMetadataResp(r io.Reader) (*MetadataResp, error)
func (*MetadataResp) Bytes ¶
func (r *MetadataResp) Bytes() ([]byte, error)
type MetadataRespBroker ¶
type MetadataRespPartition ¶
type MetadataRespTopic ¶
type MetadataRespTopic struct {
Name string
Err error
IsInternal bool // >= KafkaV1
Partitions []MetadataRespPartition
}
type OffsetCommitReq ¶
type OffsetCommitReq struct {
Version int16
CorrelationID int32
ClientID string
ConsumerGroup string
GroupGenerationID int32 // >= KafkaV1 only
MemberID string // >= KafkaV1 only
RetentionTime int64 // >= KafkaV2 only
Topics []OffsetCommitReqTopic
}
func ReadOffsetCommitReq ¶
func ReadOffsetCommitReq(r io.Reader) (*OffsetCommitReq, error)
func (*OffsetCommitReq) Bytes ¶
func (r *OffsetCommitReq) Bytes() ([]byte, error)
type OffsetCommitReqTopic ¶
type OffsetCommitReqTopic struct {
Name string
Partitions []OffsetCommitReqPartition
}
type OffsetCommitResp ¶
type OffsetCommitResp struct {
Version int16
CorrelationID int32
ThrottleTime time.Duration // >= KafkaV3 only
Topics []OffsetCommitRespTopic
}
func ReadOffsetCommitResp ¶
func ReadOffsetCommitResp(r io.Reader) (*OffsetCommitResp, error)
func (*OffsetCommitResp) Bytes ¶
func (r *OffsetCommitResp) Bytes() ([]byte, error)
type OffsetCommitRespTopic ¶
type OffsetCommitRespTopic struct {
Name string
Partitions []OffsetCommitRespPartition
}
type OffsetFetchReq ¶
type OffsetFetchReq struct {
Version int16
CorrelationID int32
ClientID string
ConsumerGroup string
Topics []OffsetFetchReqTopic
}
func ReadOffsetFetchReq ¶
func ReadOffsetFetchReq(r io.Reader) (*OffsetFetchReq, error)
func (*OffsetFetchReq) Bytes ¶
func (r *OffsetFetchReq) Bytes() ([]byte, error)
type OffsetFetchReqTopic ¶
type OffsetFetchResp ¶
type OffsetFetchResp struct {
Version int16
CorrelationID int32
ThrottleTime time.Duration // >= KafkaV3
Topics []OffsetFetchRespTopic
Err error // >= KafkaV2
}
func ReadOffsetFetchResp ¶
func ReadOffsetFetchResp(r io.Reader) (*OffsetFetchResp, error)
func (*OffsetFetchResp) Bytes ¶
func (r *OffsetFetchResp) Bytes() ([]byte, error)
type OffsetFetchRespTopic ¶
type OffsetFetchRespTopic struct {
Name string
Partitions []OffsetFetchRespPartition
}
type OffsetReq ¶
type OffsetReqPartition ¶
type OffsetReqTopic ¶
type OffsetReqTopic struct {
Name string
Partitions []OffsetReqPartition
}
type OffsetResp ¶
type OffsetResp struct {
Version int16
CorrelationID int32
ThrottleTime time.Duration
Topics []OffsetRespTopic
}
func ReadOffsetResp ¶
func ReadOffsetResp(r io.Reader) (*OffsetResp, error)
func (*OffsetResp) Bytes ¶
func (r *OffsetResp) Bytes() ([]byte, error)
type OffsetRespPartition ¶
type OffsetRespTopic ¶
type OffsetRespTopic struct {
Name string
Partitions []OffsetRespPartition
}
type ParserConfig ¶
type ParserConfig struct {
// SimplifiedMessageSetParsing enables a simplified version of the
// MessageSet parser which will not split MessageSet into slices of
// Message structures. Instead, the entire MessageSet will be read
// over. This mode improves parsing speed due to reduced memory reads at
// the cost of not providing access to the message payload after
// parsing.
SimplifiedMessageSetParsing bool
}
ParserConfig is optional configuration for the parser. It can be applied via ConfigureParser.
type ProduceReq ¶
type ProduceReq struct {
Version int16
CorrelationID int32
ClientID string
Compression Compression // only used when sending ProduceReqs
TransactionalID string
RequiredAcks int16
Timeout time.Duration
Topics []ProduceReqTopic
}
func ReadProduceReq ¶
func ReadProduceReq(r io.Reader) (*ProduceReq, error)
func (*ProduceReq) Bytes ¶
func (r *ProduceReq) Bytes() ([]byte, error)
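A hedged sketch of serializing a produce request and reading the broker's response; partition payloads are elided because ProduceReqPartition's fields are not listed in this index, and the topic and client names are hypothetical:

func produce(conn net.Conn) (*ProduceResp, error) {
	req := &ProduceReq{
		CorrelationID: 2,
		ClientID:      "example-client",  // hypothetical
		RequiredAcks:  RequiredAcksLocal, // leader-only acknowledgement
		Timeout:       5 * time.Second,
		Topics: []ProduceReqTopic{
			{Name: "events"}, // Partitions elided
		},
	}
	b, err := req.Bytes()
	if err != nil {
		return nil, err
	}
	if _, err := conn.Write(b); err != nil {
		return nil, err
	}
	return ReadProduceResp(conn)
}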
type ProduceReqPartition ¶
type ProduceReqTopic ¶
type ProduceReqTopic struct {
Name string
Partitions []ProduceReqPartition
}
type ProduceResp ¶
type ProduceResp struct {
Version int16
CorrelationID int32
Topics []ProduceRespTopic
ThrottleTime time.Duration
}
func ReadProduceResp ¶
func ReadProduceResp(r io.Reader) (*ProduceResp, error)
func (*ProduceResp) Bytes ¶
func (r *ProduceResp) Bytes() ([]byte, error)
type ProduceRespPartition ¶
type ProduceRespTopic ¶
type ProduceRespTopic struct {
Name string
Partitions []ProduceRespPartition
}