proto

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 6, 2023 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package proto 若涉及到字节序,则全部为大端序

Index

Constants

View Source
const (
	FrameHead = 0x3C // 0x3C (可见字符: <)
	FrameTail = 0x0D // 0x0D (回车符)
)
View Source
const FrameMinLength int = 7
View Source
const TotalNumberOfMessages int = 256

Variables

View Source
var (
	ErrMethodNotImplemented = errors.New("method not implemented")
	ErrMessageNotFull       = errors.New("message is not full")
)
View Source
var CalcSHA = CalcSHA1

CalcSHA 计算一个字符串的hash值

Functions

func AddDescriptor

func AddDescriptor(m Message, text string) bool

AddDescriptor 添加描述符号表, 对于已经实现的协议则不允许修改

func BuildCMessages deprecated

func BuildCMessages(cms ...*CMessage) (slice []byte)

Deprecated: BuildCMessages 构造消费者消息序列 TODO: 改为 io.Writer

func BuildPMessages deprecated

func BuildPMessages(pms ...*PMessage) (slice []byte)

Deprecated: BuildPMessages 构造生产者消息序列 TODO: 改为 io.Writer

func CalcChecksum

func CalcChecksum(data []byte) uint16

CalcChecksum 经典校验和算法

func CalcSHA1 added in v0.3.1

func CalcSHA1(str string) string

CalcSHA1 计算字符串的哈希值

func CalcSHA256 added in v0.3.1

func CalcSHA256(str string) string

CalcSHA256 计算字符串的哈希值

func GetMessageResponseStatusText added in v0.3.1

func GetMessageResponseStatusText(status MessageResponseStatus) string

func SetGlobalCrypto

func SetGlobalCrypto(c Crypto)

SetGlobalCrypto 替换全局加解密对象

Types

type AckType

type AckType string
const (
	NoConfirm     AckType = "0"
	LeaderConfirm AckType = "1"
	AllConfirm    AckType = "all"
)

type CMessage

type CMessage struct {
	Offset      []byte // uint64
	ProductTime []byte // time.Time.Unix() 消息创建的Unix时间戳
	PM          *PMessage
}

CMessage 消费者消息记录, 不允许复制

消息结构:
	|   TopicLen   |      Topic      |   KeyLen   |        key        |   ValueLen   |   Value   |   Offset   |   ProductTime   |
	|--------------|-----------------|------------|-------------------|--------------|-----------|------------|-----------------|
len	|      1       | N [1-255] bytes |      1     |  N [1-255] bytes  |       2      |     N     |      8     |         8       |
   	|--------------|-----------------|------------|-------------------|--------------|-----------|------------|-----------------|

打包后的总长度不能超过 65526 字节

func (*CMessage) Build

func (m *CMessage) Build() ([]byte, error)

func (*CMessage) BuildTo

func (m *CMessage) BuildTo(writer io.Writer) (int, error)

func (*CMessage) Length

func (m *CMessage) Length() int

func (*CMessage) MarshalMethod

func (m *CMessage) MarshalMethod() MarshalMethodType

func (*CMessage) MessageType

func (m *CMessage) MessageType() MessageType

func (*CMessage) Parse

func (m *CMessage) Parse(stream []byte) error

func (*CMessage) ParseFrom

func (m *CMessage) ParseFrom(reader io.Reader) error

func (*CMessage) Reset

func (m *CMessage) Reset()

func (*CMessage) String

func (m *CMessage) String() string

type CPMPool

type CPMPool struct {
	// contains filtered or unexported fields
}

func NewCPMPool

func NewCPMPool() *CPMPool

func (*CPMPool) GetCM

func (p *CPMPool) GetCM() *CMessage

GetCM Attention: PM is nil

func (*CPMPool) GetPM

func (p *CPMPool) GetPM() *PMessage

func (*CPMPool) PutCM

func (p *CPMPool) PutCM(v *CMessage)

func (*CPMPool) PutPM

func (p *CPMPool) PutPM(v *PMessage)

type ConsumerMessage

type ConsumerMessage struct {
	Topic       string    `json:"topic"`
	Key         string    `json:"key"`
	Value       []byte    `json:"value"`
	Offset      uint64    `json:"offset"`
	ProductTime time.Time `json:"product_time"` // 服务端收到消息时的时间戳
	// contains filtered or unexported fields
}

ConsumerMessage 直接投递给消费者的单条数据消息 需要从 TransferFrame 中转换

func (*ConsumerMessage) MarshalMethod

func (m *ConsumerMessage) MarshalMethod() MarshalMethodType

func (*ConsumerMessage) MessageType

func (m *ConsumerMessage) MessageType() MessageType

func (*ConsumerMessage) ParseFromCMessage

func (m *ConsumerMessage) ParseFromCMessage(cm *CMessage)

func (*ConsumerMessage) Reset

func (m *ConsumerMessage) Reset()

func (*ConsumerMessage) ShouldBindJSON

func (m *ConsumerMessage) ShouldBindJSON(v any) error

ShouldBindJSON 将数据反序列化到一个JSON模型上

func (*ConsumerMessage) String

func (m *ConsumerMessage) String() string

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

Counter 计数器

func NewCounter

func NewCounter() *Counter

NewCounter 创建一个新的计数器

func (*Counter) Increment

func (c *Counter) Increment()

Increment 将计数器 +1,本方法没有返回值;如需读取数值请使用 Value 或 ValueBeforeIncrement

func (*Counter) Value

func (c *Counter) Value() uint64

Value 获取当前计数器的数值

func (*Counter) ValueBeforeIncrement

func (c *Counter) ValueBeforeIncrement() uint64

ValueBeforeIncrement 首先获取当前计数器的数值,然后将计数器 +1

type Crypto

type Crypto interface {
	Encrypt(stream []byte) ([]byte, error) // 加密数据体
	Decrypt(stream []byte) ([]byte, error) // 解密数据体
}

Crypto 加解密支持

func DefaultCrypto

func DefaultCrypto() Crypto

DefaultCrypto 默认的加解密器,就是不加密

func GlobalCrypto

func GlobalCrypto() Crypto

GlobalCrypto 全局加解密对象

type DecryptFunc

type DecryptFunc = func(stream []byte) ([]byte, error)

type Descriptor

type Descriptor struct {
	// contains filtered or unexported fields
}

func GetDescriptor

func GetDescriptor(code MessageType) *Descriptor

func (Descriptor) Message

func (m Descriptor) Message() Message

Message 协议定义,可能为nil

func (Descriptor) MessageType

func (m Descriptor) MessageType() MessageType

MessageType 协议类别

func (Descriptor) Text

func (m Descriptor) Text() string

Text 协议类别的文字描述

func (Descriptor) UserDefined

func (m Descriptor) UserDefined() bool

UserDefined 是否是用户自定义协议

type EncryptFunc

type EncryptFunc = func(stream []byte) ([]byte, error)

type FramePool

type FramePool struct {
	// contains filtered or unexported fields
}

func NewFramePool

func NewFramePool() *FramePool

func (*FramePool) Get

func (p *FramePool) Get() *TransferFrame

func (*FramePool) Put

func (p *FramePool) Put(v *TransferFrame)

type HCPMessagePool

type HCPMessagePool struct {
	// contains filtered or unexported fields
}

func NewHCPMPool

func NewHCPMPool() *HCPMessagePool

func (*HCPMessagePool) GetCM

func (m *HCPMessagePool) GetCM() *ConsumerMessage

func (*HCPMessagePool) GetPM

func (m *HCPMessagePool) GetPM() *ProducerMessage

func (*HCPMessagePool) PutCM

func (m *HCPMessagePool) PutCM(v *ConsumerMessage)

func (*HCPMessagePool) PutPM

func (m *HCPMessagePool) PutPM(v *ProducerMessage)

type HumanMessage

type HumanMessage interface {
	MessageType() MessageType         // 消息类别
	MarshalMethod() MarshalMethodType // 消息序列化方法
	String() string
}

HumanMessage 直接返回给调用者的消息定义

type LinkType

type LinkType string
const (
	ConsumerLinkType LinkType = "CONSUMER"
	ProducerLinkType LinkType = "PRODUCER"
)

type MarshalMethodType

type MarshalMethodType string
const (
	JsonMarshalMethod   MarshalMethodType = "JSON"
	BinaryMarshalMethod MarshalMethodType = "BINARY"
)

type Message

type Message interface {
	HumanMessage                           // 类别和消息解码方法
	Length() int                           // 编码后的消息序列长度
	Reset()                                // 重置消息体
	Parse(stream []byte) error             // 从字节序中解析消息
	ParseFrom(reader io.Reader) error      // 从流中解析一个消息
	Build() ([]byte, error)                // 构建消息序列
	BuildTo(writer io.Writer) (int, error) // 直接将待构建的消息序列写入流内
}

type MessageResponse

type MessageResponse struct {
	// 仅当 AcceptedStatus 时才认为服务器接受了请求并下发了有效的参数
	Status      MessageResponseStatus `json:"status"`
	Offset      uint64                `json:"offset,omitempty"`
	ReceiveTime time.Time             `json:"receive_time,omitempty"`
	// 定时器间隔,单位ms,仅生产者有效,生产者需要按照此间隔发送帧消息
	TickerInterval time.Duration `json:"ticker_duration"`
}

MessageResponse 消息响应, P和C通用

func (*MessageResponse) Accepted added in v0.3.1

func (m *MessageResponse) Accepted() bool

func (*MessageResponse) Build

func (m *MessageResponse) Build() ([]byte, error)

func (*MessageResponse) BuildTo

func (m *MessageResponse) BuildTo(writer io.Writer) (int, error)

func (*MessageResponse) Length

func (m *MessageResponse) Length() int

func (*MessageResponse) MarshalMethod

func (m *MessageResponse) MarshalMethod() MarshalMethodType

func (*MessageResponse) MessageType

func (m *MessageResponse) MessageType() MessageType

MessageType 依据偏移量字段判断消息类型

func (*MessageResponse) Parse

func (m *MessageResponse) Parse(stream []byte) error

func (*MessageResponse) ParseFrom

func (m *MessageResponse) ParseFrom(reader io.Reader) error

ParseFrom 从reader解析消息,此操作不够优化,应考虑使用 Parse 方法

func (*MessageResponse) Reset

func (m *MessageResponse) Reset()

func (*MessageResponse) String

func (m *MessageResponse) String() string

type MessageResponseStatus added in v0.3.1

type MessageResponseStatus string
const (
	AcceptedStatus       MessageResponseStatus = "0" // 已接受,正常状态
	RefusedStatus        MessageResponseStatus = "1"
	TokenIncorrectStatus MessageResponseStatus = "10" // 密钥不正确
)

type MessageType

type MessageType byte
const (
	NotImplementMessageType MessageType = 0
	RegisterMessageType     MessageType = 1   // 客户端消费者/生产者注册消息类别 c -> s
	RegisterMessageRespType MessageType = 2   // s -> c
	ReRegisterMessageType   MessageType = 3   // s -> c 令客户端重新发起注册流程
	MessageRespType         MessageType = 100 // 生产者消息响应 s -> c
	PMessageType            MessageType = 101 // 生产者消息类别 c -> s
	CMessageType            MessageType = 102 // 消费者消息类别 s -> c
)

如果增加了新的协议代码,则都需要在 descriptors 中添加其类型

type NoCopy

type NoCopy struct{}

func (*NoCopy) Lock

func (*NoCopy) Lock()

func (*NoCopy) Unlock

func (*NoCopy) Unlock()

type NotImplementMessage

type NotImplementMessage struct{}

func (NotImplementMessage) Build

func (m NotImplementMessage) Build() ([]byte, error)

func (NotImplementMessage) BuildTo

func (m NotImplementMessage) BuildTo(_ io.Writer) (int, error)

func (NotImplementMessage) Length

func (m NotImplementMessage) Length() int

func (NotImplementMessage) MarshalMethod

func (m NotImplementMessage) MarshalMethod() MarshalMethodType

func (NotImplementMessage) MessageType

func (m NotImplementMessage) MessageType() MessageType

func (NotImplementMessage) Parse

func (m NotImplementMessage) Parse(_ []byte) error

func (NotImplementMessage) ParseFrom

func (m NotImplementMessage) ParseFrom(_ io.Reader) error

func (NotImplementMessage) Reset

func (m NotImplementMessage) Reset()

func (NotImplementMessage) String

func (m NotImplementMessage) String() string

type PMessage

type PMessage struct {
	Topic []byte // 字符串转字节
	Key   []byte
	Value []byte
	// contains filtered or unexported fields
}

PMessage 生产者消息数据, 不允许复制

消息结构:
	|   TopicLen   |      Topic      |   KeyLen   |        key        |   ValueLen   |   Value   |
	|--------------|-----------------|------------|-------------------|--------------|-----------|
len	|      1       | N [1-255] bytes |      1     |  N [1-255] bytes  |       2      |     N     |
   	|--------------|-----------------|------------|-------------------|--------------|-----------|

打包后的总长度不能超过 65526 字节

func (*PMessage) Build

func (m *PMessage) Build() ([]byte, error)

func (*PMessage) BuildTo

func (m *PMessage) BuildTo(writer io.Writer) (int, error)

func (*PMessage) Length

func (m *PMessage) Length() int

Length 获取编码后的消息序列长度

func (*PMessage) MarshalMethod

func (m *PMessage) MarshalMethod() MarshalMethodType

func (*PMessage) MessageType

func (m *PMessage) MessageType() MessageType

func (*PMessage) Parse

func (m *PMessage) Parse(stream []byte) error

func (*PMessage) ParseFrom

func (m *PMessage) ParseFrom(reader io.Reader) error

func (*PMessage) Reset

func (m *PMessage) Reset()

func (*PMessage) String

func (m *PMessage) String() string

type ProducerMessage

type ProducerMessage struct {
	Topic string `json:"topic"`
	Key   string `json:"key"`
	Value []byte `json:"value"`
	// contains filtered or unexported fields
}

ProducerMessage 生产者直接发送的数据 会转换成 TransferFrame 后发送

func (*ProducerMessage) BindFromJSON

func (m *ProducerMessage) BindFromJSON(v any) error

BindFromJSON 从JSON模型获取序列化数据

func (*ProducerMessage) MarshalMethod

func (m *ProducerMessage) MarshalMethod() MarshalMethodType

func (*ProducerMessage) MessageType

func (m *ProducerMessage) MessageType() MessageType

func (*ProducerMessage) Reset

func (m *ProducerMessage) Reset()

func (*ProducerMessage) String

func (m *ProducerMessage) String() string

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(capacity int) *Queue

func (*Queue) Append

func (q *Queue) Append(value any)

func (*Queue) Capacity

func (q *Queue) Capacity() int

func (*Queue) Length

func (q *Queue) Length() int

func (*Queue) PopLeft

func (q *Queue) PopLeft() any

type RegisterMessage

type RegisterMessage struct {
	Topics []string `json:"topics"` // 对于生产者,无意义
	Ack    AckType  `json:"ack"`
	Type   LinkType `json:"type"`
	Token  string   `json:"token,omitempty"` // 认证密钥的hash值,当此值不为空时强制有效
}

RegisterMessage 消息注册,适用于生产者和消费者

func (*RegisterMessage) Build

func (m *RegisterMessage) Build() ([]byte, error)

func (*RegisterMessage) BuildTo

func (m *RegisterMessage) BuildTo(writer io.Writer) (int, error)

func (*RegisterMessage) Length

func (m *RegisterMessage) Length() int

func (*RegisterMessage) MarshalMethod

func (m *RegisterMessage) MarshalMethod() MarshalMethodType

func (*RegisterMessage) MessageType

func (m *RegisterMessage) MessageType() MessageType

func (*RegisterMessage) Parse

func (m *RegisterMessage) Parse(stream []byte) error

func (*RegisterMessage) ParseFrom

func (m *RegisterMessage) ParseFrom(reader io.Reader) error

ParseFrom 从reader解析消息,此操作不够优化,应考虑使用 Parse 方法

func (*RegisterMessage) Reset

func (m *RegisterMessage) Reset()

func (*RegisterMessage) String

func (m *RegisterMessage) String() string

type TransferFrame

type TransferFrame struct {
	Head     byte        // 恒为 FrameHead
	Type     MessageType // Data 包含的消息类型
	DataSize []byte      // 标识消息总长度,2个字节, Data 的长度, 同样适用于多帧报文
	Data     []byte      // 若干个消息
	Checksum []byte      // Checksum 经典校验和算法,2个字节, Data 的校验和
	Tail     byte        // 恒为 FrameTail
	// contains filtered or unexported fields
}

TransferFrame TCP传输协议帧, 可一次性传输多条消息

帧结构:
	|   Head   |   Type   |   DataSize   |        Data        |   Checksum   |   Tail   |
	|----------|----------|--------------|--------------------|--------------|----------|
len	|    1     |    1     |       2      |  N [0-65526] bytes |       2      |     1    |
   	|----------|----------|--------------|--------------------|--------------|----------|
取值	|   0x3C   |          |              |                    |              |   0x0D   |
	|----------|----------|--------------|--------------------|--------------|----------|

TODO: 实现 Reader 和 Writer 接口

func (*TransferFrame) Build

func (f *TransferFrame) Build() ([]byte, error)

Build 编码消息帧 (最终方法)

func (*TransferFrame) BuildFrom

func (f *TransferFrame) BuildFrom(m Message) ([]byte, error)

BuildFrom 从协议中构建消息帧

func (*TransferFrame) BuildWith

func (f *TransferFrame) BuildWith(typ MessageType, data []byte) ([]byte, error)

BuildWith 补充字段,编码消息帧

func (*TransferFrame) Length

func (f *TransferFrame) Length() int

Length 获得帧总长

func (*TransferFrame) MarshalMethod

func (f *TransferFrame) MarshalMethod() MarshalMethodType

func (*TransferFrame) Parse

func (f *TransferFrame) Parse(stream []byte) error

Parse 从字节序中解析数据帧

func (*TransferFrame) ParseChecksum

func (f *TransferFrame) ParseChecksum() uint16

ParseChecksum 将校验和转换为uint16类型

func (*TransferFrame) ParseDataLength

func (f *TransferFrame) ParseDataLength() int

ParseDataLength 获得消息的总长度, 由 DataSize 标识

func (*TransferFrame) ParseFrom

func (f *TransferFrame) ParseFrom(reader io.Reader) error

ParseFrom 从流中解析数据帧

func (*TransferFrame) Read

func (f *TransferFrame) Read(buf []byte) (int, error)

TODO: io.Reader 接口实现

func (*TransferFrame) Reset

func (f *TransferFrame) Reset()

func (*TransferFrame) String

func (f *TransferFrame) String() string

func (*TransferFrame) Unmarshal

func (f *TransferFrame) Unmarshal(msg Message) error

Unmarshal 反序列化帧消息体

func (*TransferFrame) UnmarshalTo

func (f *TransferFrame) UnmarshalTo() (Message, error)

UnmarshalTo 将帧消息解析成某一个具体的协议消息

func (*TransferFrame) Write

func (f *TransferFrame) Write(buf []byte) (int, error)

io.Writer 接口实现

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL