Documentation
¶
Overview ¶
Package proto 若涉及到字节序,则全部为大端序
Index ¶
- Constants
- Variables
- func AddDescriptor(m Message, text string) bool
- func CalcChecksum(data []byte) uint16
- func CalcSHA1(str string) string
- func CalcSHA256(str string) string
- func GetMessageResponseStatusText(status MessageResponseStatus) string
- func JsonMessageBuild(m Message) ([]byte, error)
- func JsonMessageBuildTo(writer io.Writer, m Message) (int, error)
- func JsonMessageParse(stream []byte, m Message) error
- func JsonMessageParseFrom(reader io.Reader, m Message) error
- func SetGlobalCrypto(c Crypto)
- type AckType
- type CMessage
- func (m *CMessage) Build() ([]byte, error)
- func (m *CMessage) BuildTo(writer io.Writer) (int, error)
- func (m *CMessage) Length() int
- func (m *CMessage) MarshalMethod() MarshalMethodType
- func (m *CMessage) MessageType() MessageType
- func (m *CMessage) Parse(stream []byte) error
- func (m *CMessage) ParseFrom(reader io.Reader) error
- func (m *CMessage) Reset()
- func (m *CMessage) String() string
- type CPMPool
- type ConsumerMessage
- type Counter
- type Crypto
- type DecryptFunc
- type Descriptor
- type EncryptFunc
- type FramePool
- type HCPMessagePool
- type HeartbeatMessage
- func (h *HeartbeatMessage) Build() ([]byte, error)
- func (h *HeartbeatMessage) BuildTo(writer io.Writer) (int, error)
- func (h *HeartbeatMessage) Length() int
- func (h *HeartbeatMessage) MarshalMethod() MarshalMethodType
- func (h *HeartbeatMessage) MessageType() MessageType
- func (h *HeartbeatMessage) Parse(stream []byte) error
- func (h *HeartbeatMessage) ParseFrom(reader io.Reader) error
- func (h *HeartbeatMessage) Reset()
- func (h *HeartbeatMessage) String() string
- type HumanMessage
- type LinkType
- type MarshalMethodType
- type Message
- type MessageResponse
- func (m *MessageResponse) Accepted() bool
- func (m *MessageResponse) Build() ([]byte, error)
- func (m *MessageResponse) BuildTo(writer io.Writer) (int, error)
- func (m *MessageResponse) Length() int
- func (m *MessageResponse) MarshalMethod() MarshalMethodType
- func (m *MessageResponse) MessageType() MessageType
- func (m *MessageResponse) Parse(stream []byte) error
- func (m *MessageResponse) ParseFrom(reader io.Reader) error
- func (m *MessageResponse) Reset()
- func (m *MessageResponse) String() string
- type MessageResponseStatus
- type MessageType
- type NoCopy
- type NotImplementMessage
- func (m NotImplementMessage) Build() ([]byte, error)
- func (m NotImplementMessage) BuildTo(_ io.Writer) (int, error)
- func (m NotImplementMessage) Length() int
- func (m NotImplementMessage) MarshalMethod() MarshalMethodType
- func (m NotImplementMessage) MessageType() MessageType
- func (m NotImplementMessage) Parse(_ []byte) error
- func (m NotImplementMessage) ParseFrom(_ io.Reader) error
- func (m NotImplementMessage) Reset()
- func (m NotImplementMessage) String() string
- type PMessage
- func (m *PMessage) Build() ([]byte, error)
- func (m *PMessage) BuildTo(writer io.Writer) (int, error)
- func (m *PMessage) Length() int
- func (m *PMessage) MarshalMethod() MarshalMethodType
- func (m *PMessage) MessageType() MessageType
- func (m *PMessage) Parse(stream []byte) error
- func (m *PMessage) ParseFrom(reader io.Reader) error
- func (m *PMessage) Reset()
- func (m *PMessage) String() string
- type ProducerMessage
- type Queue
- type RegisterMessage
- func (m *RegisterMessage) Build() ([]byte, error)
- func (m *RegisterMessage) BuildTo(writer io.Writer) (int, error)
- func (m *RegisterMessage) Length() int
- func (m *RegisterMessage) MarshalMethod() MarshalMethodType
- func (m *RegisterMessage) MessageType() MessageType
- func (m *RegisterMessage) Parse(stream []byte) error
- func (m *RegisterMessage) ParseFrom(reader io.Reader) error
- func (m *RegisterMessage) Reset()
- func (m *RegisterMessage) String() string
- type TransferFrame
- func (f *TransferFrame) Build() ([]byte, error)
- func (f *TransferFrame) BuildFrom(m Message) ([]byte, error)
- func (f *TransferFrame) BuildWith(typ MessageType, data []byte) ([]byte, error)
- func (f *TransferFrame) Length() int
- func (f *TransferFrame) MarshalMethod() MarshalMethodType
- func (f *TransferFrame) Parse(stream []byte) error
- func (f *TransferFrame) ParseChecksum() uint16
- func (f *TransferFrame) ParseDataLength() int
- func (f *TransferFrame) ParseFrom(reader io.Reader) error
- func (f *TransferFrame) Read(buf []byte) (int, error)
- func (f *TransferFrame) Reset()
- func (f *TransferFrame) String() string
- func (f *TransferFrame) Unmarshal(msg Message) error
- func (f *TransferFrame) UnmarshalTo() (Message, error)
- func (f *TransferFrame) Write(buf []byte) (int, error)
Constants ¶
const ( FrameHead = 0x3C // 0x3C (可见字符: <) FrameTail = 0x0D // 0x0D (回车符) )
const FrameMinLength int = 7
const TotalNumberOfMessages int = 256
Variables ¶
var ( ErrMethodNotImplemented = errors.New("method not implemented") ErrMessageNotFull = errors.New("message is not full") )
var CalcSHA = CalcSHA1
CalcSHA 计算一个字符串的hash值
Functions ¶
func AddDescriptor ¶
AddDescriptor 添加描述符号表, 对于已经实现的协议则不允许修改
func GetMessageResponseStatusText ¶ added in v0.3.1
func GetMessageResponseStatusText(status MessageResponseStatus) string
func JsonMessageBuild ¶
func JsonMessageParse ¶
func JsonMessageParseFrom ¶ added in v0.3.3
JsonMessageParseFrom 从reader解析消息,此操作不够优化,应考虑使用 Parse 方法
Types ¶
type CMessage ¶
type CMessage struct {
Offset []byte // uint64
ProductTime []byte // time.Time.Unix() 消息创建的Unix时间戳
PM *PMessage
}
CMessage 消费者消息记录, 不允许复制
消息结构: | TopicLen | Topic | KeyLen | key | ValueLen | Value | Offset | ProductTime | |--------------|-----------------|------------|-------------------|--------------|-----------|------------|-----------------| len | 1 | N [1-255] bytes | 1 | N [1-255] bytes | 2 | N | 8 | 8 | |--------------|-----------------|------------|-------------------|--------------|-----------|------------|-----------------|
打包后的总长度不能超过 65526 字节
func (*CMessage) MarshalMethod ¶
func (m *CMessage) MarshalMethod() MarshalMethodType
func (*CMessage) MessageType ¶
func (m *CMessage) MessageType() MessageType
type CPMPool ¶
type CPMPool struct {
// contains filtered or unexported fields
}
func NewCPMPool ¶
func NewCPMPool() *CPMPool
type ConsumerMessage ¶
type ConsumerMessage struct {
Topic string `json:"topic"`
Key string `json:"key"`
Value []byte `json:"value"`
Offset uint64 `json:"offset"`
ProductTime time.Time `json:"product_time"` // 服务端收到消息时的时间戳
// contains filtered or unexported fields
}
ConsumerMessage 直接投递给消费者的单条数据消息 需要从 TransferFrame 中转换
func (*ConsumerMessage) MarshalMethod ¶
func (m *ConsumerMessage) MarshalMethod() MarshalMethodType
func (*ConsumerMessage) MessageType ¶
func (m *ConsumerMessage) MessageType() MessageType
func (*ConsumerMessage) ParseFromCMessage ¶
func (m *ConsumerMessage) ParseFromCMessage(cm *CMessage)
func (*ConsumerMessage) Reset ¶
func (m *ConsumerMessage) Reset()
func (*ConsumerMessage) ShouldBindJSON ¶
func (m *ConsumerMessage) ShouldBindJSON(v any) error
ShouldBindJSON 将数据反序列化到一个JSON模型上
func (*ConsumerMessage) String ¶
func (m *ConsumerMessage) String() string
type Counter ¶
type Counter struct {
// contains filtered or unexported fields
}
Counter 计数器
func (*Counter) ValueBeforeIncrement ¶
ValueBeforeIncrement 首先获取当前计数器的数值,然后将计数器 +1
type Crypto ¶
type Crypto interface {
Encrypt(stream []byte) ([]byte, error) // 加密数据体
Decrypt(stream []byte) ([]byte, error) // 解密数据体
}
Crypto 加解密支持
type DecryptFunc ¶
type Descriptor ¶
type Descriptor struct {
// contains filtered or unexported fields
}
func GetDescriptor ¶
func GetDescriptor(code MessageType) *Descriptor
type EncryptFunc ¶
type FramePool ¶
type FramePool struct {
// contains filtered or unexported fields
}
func NewFramePool ¶
func NewFramePool() *FramePool
func (*FramePool) Get ¶
func (p *FramePool) Get() *TransferFrame
func (*FramePool) HistoryNum ¶ added in v0.3.3
HistoryNum 历史数量
func (*FramePool) Put ¶
func (p *FramePool) Put(v *TransferFrame)
type HCPMessagePool ¶
type HCPMessagePool struct {
// contains filtered or unexported fields
}
func NewHCPMPool ¶
func NewHCPMPool() *HCPMessagePool
func (*HCPMessagePool) CMHistoryNum ¶
func (m *HCPMessagePool) CMHistoryNum() uint64
CMHistoryNum ConsumerMessage 历史数量
func (*HCPMessagePool) GetCM ¶
func (m *HCPMessagePool) GetCM() *ConsumerMessage
func (*HCPMessagePool) GetPM ¶
func (m *HCPMessagePool) GetPM() *ProducerMessage
func (*HCPMessagePool) PMHistoryNum ¶
func (m *HCPMessagePool) PMHistoryNum() uint64
PMHistoryNum ProducerMessage 历史数量
func (*HCPMessagePool) PutCM ¶
func (m *HCPMessagePool) PutCM(v *ConsumerMessage)
func (*HCPMessagePool) PutPM ¶
func (m *HCPMessagePool) PutPM(v *ProducerMessage)
type HeartbeatMessage ¶ added in v0.3.3
type HeartbeatMessage struct {
Type LinkType `json:"type" description:"客户端类型"`
CreatedAt int64 `json:"created_at" description:"客户端创建时间戳"`
}
HeartbeatMessage 心跳
func (*HeartbeatMessage) Build ¶
func (h *HeartbeatMessage) Build() ([]byte, error)
func (*HeartbeatMessage) BuildTo ¶
func (h *HeartbeatMessage) BuildTo(writer io.Writer) (int, error)
func (*HeartbeatMessage) Length ¶
func (h *HeartbeatMessage) Length() int
func (*HeartbeatMessage) MarshalMethod ¶ added in v0.3.3
func (h *HeartbeatMessage) MarshalMethod() MarshalMethodType
func (*HeartbeatMessage) MessageType ¶ added in v0.3.3
func (h *HeartbeatMessage) MessageType() MessageType
func (*HeartbeatMessage) Parse ¶
func (h *HeartbeatMessage) Parse(stream []byte) error
func (*HeartbeatMessage) Reset ¶ added in v0.3.3
func (h *HeartbeatMessage) Reset()
func (*HeartbeatMessage) String ¶ added in v0.3.3
func (h *HeartbeatMessage) String() string
type HumanMessage ¶
type HumanMessage interface {
MessageType() MessageType // 消息类别
MarshalMethod() MarshalMethodType // 消息序列化方法
String() string
}
HumanMessage 直接返回给调用者的消息定义
type MarshalMethodType ¶
type MarshalMethodType string
const ( JsonMarshalMethod MarshalMethodType = "JSON" BinaryMarshalMethod MarshalMethodType = "BINARY" )
type MessageResponse ¶
type MessageResponse struct {
// 仅当 AcceptedStatus 时才认为服务器接受了请求并下方了有效的参数
Status MessageResponseStatus `json:"status"`
Offset uint64 `json:"offset"`
ReceiveTime int64 `json:"receive_time"`
// 定时器间隔,单位ms,仅生产者有效,生产者需要按照此间隔发送帧消息
TickerInterval int `json:"ticker_duration"`
// 消费者需要按照此参数,在此周期内向服务端发送心跳
// 生产者在此周期内若没有数据产生,也应发送心跳
Keepalive float64 `json:"keepalive"`
}
MessageResponse 消息响应, P和C通用
func (*MessageResponse) Accepted ¶ added in v0.3.1
func (m *MessageResponse) Accepted() bool
func (*MessageResponse) Build ¶
func (m *MessageResponse) Build() ([]byte, error)
func (*MessageResponse) Length ¶
func (m *MessageResponse) Length() int
func (*MessageResponse) MarshalMethod ¶
func (m *MessageResponse) MarshalMethod() MarshalMethodType
func (*MessageResponse) MessageType ¶
func (m *MessageResponse) MessageType() MessageType
MessageType 依据偏移量字段判断消息类型
func (*MessageResponse) Parse ¶
func (m *MessageResponse) Parse(stream []byte) error
func (*MessageResponse) ParseFrom ¶
func (m *MessageResponse) ParseFrom(reader io.Reader) error
ParseFrom 从reader解析消息,此操作不够优化,应考虑使用 Parse 方法
func (*MessageResponse) Reset ¶
func (m *MessageResponse) Reset()
func (*MessageResponse) String ¶
func (m *MessageResponse) String() string
type MessageResponseStatus ¶ added in v0.3.1
type MessageResponseStatus string
const ( AcceptedStatus MessageResponseStatus = "0" // 已接受,正常状态 RefusedStatus MessageResponseStatus = "1" TokenIncorrectStatus MessageResponseStatus = "10" // 密钥不争取 )
type MessageType ¶
type MessageType byte
const ( NotImplementMessageType MessageType = 0 RegisterMessageType MessageType = 1 // 客户端消费者/生产者注册消息类别 c -> s RegisterMessage RegisterMessageRespType MessageType = 2 // s -> c MessageResponse ReRegisterMessageType MessageType = 3 // s -> c 令客户端重新发起注册流程, 无消息体 HeartbeatMessageType MessageType = 4 // c -> s MessageRespType MessageType = 100 // 生产者消息响应 s -> c MessageResponse PMessageType MessageType = 101 // 生产者消息类别 c -> s PMessage CMessageType MessageType = 102 // 消费者消息类别s -> c CMessage )
如果增加了新的协议代码,则都需要在 descriptors 中添加其类型
type NotImplementMessage ¶
type NotImplementMessage struct{}
func (NotImplementMessage) Build ¶
func (m NotImplementMessage) Build() ([]byte, error)
func (NotImplementMessage) Length ¶
func (m NotImplementMessage) Length() int
func (NotImplementMessage) MarshalMethod ¶
func (m NotImplementMessage) MarshalMethod() MarshalMethodType
func (NotImplementMessage) MessageType ¶
func (m NotImplementMessage) MessageType() MessageType
func (NotImplementMessage) Parse ¶
func (m NotImplementMessage) Parse(_ []byte) error
func (NotImplementMessage) Reset ¶
func (m NotImplementMessage) Reset()
func (NotImplementMessage) String ¶
func (m NotImplementMessage) String() string
type PMessage ¶
type PMessage struct {
Topic []byte // 字符串转字节
Key []byte
Value []byte
// contains filtered or unexported fields
}
PMessage 生产者消息数据, 不允许复制
消息结构: | TopicLen | Topic | KeyLen | key | ValueLen | Value | |--------------|-----------------|------------|-------------------|--------------|-----------| len | 1 | N [1-255] bytes | 1 | N [1-255] bytes | 2 | N | |--------------|-----------------|------------|-------------------|--------------|-----------|
打包后的总长度不能超过 65526 字节
func (*PMessage) MarshalMethod ¶
func (m *PMessage) MarshalMethod() MarshalMethodType
func (*PMessage) MessageType ¶
func (m *PMessage) MessageType() MessageType
type ProducerMessage ¶
type ProducerMessage struct {
Topic string `json:"topic"`
Key string `json:"key"`
Value []byte `json:"value"`
// contains filtered or unexported fields
}
ProducerMessage 生产者直接发送的数据 会转换成 TransferFrame 后发送
func (*ProducerMessage) BindFromJSON ¶
func (m *ProducerMessage) BindFromJSON(v any) error
BindFromJSON 从JSON模型获取序列化数据
func (*ProducerMessage) MarshalMethod ¶
func (m *ProducerMessage) MarshalMethod() MarshalMethodType
func (*ProducerMessage) MessageType ¶
func (m *ProducerMessage) MessageType() MessageType
func (*ProducerMessage) Reset ¶
func (m *ProducerMessage) Reset()
func (*ProducerMessage) String ¶
func (m *ProducerMessage) String() string
type RegisterMessage ¶
type RegisterMessage struct {
Topics []string `json:"topics"` // 对于生产者,无意义
Ack AckType `json:"ack"`
Type LinkType `json:"type"`
Token string `json:"token,omitempty"` // 认证密钥的hash值,当此值不为空时强制有效
}
RegisterMessage 消息注册,适用于生产者和消费者
func (*RegisterMessage) Build ¶
func (m *RegisterMessage) Build() ([]byte, error)
func (*RegisterMessage) Length ¶
func (m *RegisterMessage) Length() int
func (*RegisterMessage) MarshalMethod ¶
func (m *RegisterMessage) MarshalMethod() MarshalMethodType
func (*RegisterMessage) MessageType ¶
func (m *RegisterMessage) MessageType() MessageType
func (*RegisterMessage) Parse ¶
func (m *RegisterMessage) Parse(stream []byte) error
func (*RegisterMessage) ParseFrom ¶
func (m *RegisterMessage) ParseFrom(reader io.Reader) error
ParseFrom 从reader解析消息,此操作不够优化,应考虑使用 Parse 方法
func (*RegisterMessage) Reset ¶
func (m *RegisterMessage) Reset()
func (*RegisterMessage) String ¶
func (m *RegisterMessage) String() string
type TransferFrame ¶
type TransferFrame struct {
Head byte // 恒为 FrameHead
Type MessageType // Data 包含的消息类型
DataSize []byte // 标识消息总长度,2个字节, Data 的长度, 同样适用于多帧报文
Data []byte // 若干个消息
Checksum []byte // Checksum 经典校验和算法,2个字节, Data 的校验和
Tail byte // 恒为 FrameTail
// contains filtered or unexported fields
}
TransferFrame TCP传输协议帧, 可一次性传输多条消息
帧结构: | Head | Type | DataSize | Data | Checksum | Tail | |----------|----------|--------------|--------------------|--------------|----------| len | 1 | 1 | 2 | N [0-65526] bytes | 2 | 1 | |----------|----------|--------------|--------------------|--------------|----------| 取值 | 0x3C | | | | | 0x0D | |----------|----------|--------------|--------------------|--------------|----------|
TODO: 实现 Reader 和 Writer 接口
func (*TransferFrame) BuildFrom ¶
func (f *TransferFrame) BuildFrom(m Message) ([]byte, error)
BuildFrom 从协议中构建消息帧
func (*TransferFrame) BuildWith ¶
func (f *TransferFrame) BuildWith(typ MessageType, data []byte) ([]byte, error)
BuildWith 补充字段,编码消息帧
func (*TransferFrame) MarshalMethod ¶
func (f *TransferFrame) MarshalMethod() MarshalMethodType
func (*TransferFrame) ParseChecksum ¶
func (f *TransferFrame) ParseChecksum() uint16
ParseChecksum 将校验和转换为uint16类型
func (*TransferFrame) ParseDataLength ¶
func (f *TransferFrame) ParseDataLength() int
ParseDataLength 获得消息的总长度, DataSize 由标识
func (*TransferFrame) ParseFrom ¶
func (f *TransferFrame) ParseFrom(reader io.Reader) error
ParseFrom 从流中解析数据帧
func (*TransferFrame) Read ¶
func (f *TransferFrame) Read(buf []byte) (int, error)
TODO: io.Reader 接口实现
func (*TransferFrame) Reset ¶
func (f *TransferFrame) Reset()
func (*TransferFrame) String ¶
func (f *TransferFrame) String() string
func (*TransferFrame) Unmarshal ¶
func (f *TransferFrame) Unmarshal(msg Message) error
Unmarshal 反序列化帧消息体
func (*TransferFrame) UnmarshalTo ¶
func (f *TransferFrame) UnmarshalTo() (Message, error)
UnmarshalTo 将帧消息解析成某一个具体的协议消息