message

package
v0.0.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 28, 2026 License: MIT Imports: 12 Imported by: 0

README

message

简介

message 包提供高性能的自定义消息协议与连接封装,支持消息类型注册、心跳包、字符串消息、自动分包、并发安全等,适用于分布式服务、长连接、定制协议等多种场景。

主要特性
  • 统一消息接口,支持自定义消息类型、封包与解包
  • 消息工厂机制,支持类型注册与动态生成
  • 高性能连接封装,支持并发安全、自动分包、心跳机制
  • 内置心跳消息、字符串消息实现
  • 支持 bufio.Scanner 自动分割消息包
  • 完整单元测试覆盖
设计理念

message 包遵循"高效、灵活、可扩展"的设计理念,接口与实现分离,工厂注册机制灵活,连接层支持自动分包与心跳,适合高并发、定制协议、长连接等场景。

安装

前置条件
  • Go 版本要求:Go 1.18+
  • 依赖要求:
    • github.com/stretchr/testify(仅测试)
安装命令
go get -u github.com/fsyyft-go/kit/net/message

快速开始

基础用法

请查阅示例代码

详细指南

核心概念
  • Message 接口:统一封装消息类型、封包与解包方法
  • 消息工厂:支持类型注册与动态生成,便于扩展
  • Conn 接口:自定义消息连接,支持并发安全、自动分包、心跳
  • 心跳消息/字符串消息:内置常用消息类型
  • Scanner:支持 bufio.Scanner 自动分割消息包
常见用例
  • 分布式服务自定义协议通信
  • 长连接心跳与消息收发
  • 物联网、游戏、IM 等定制消息协议
  • 高并发消息网关
最佳实践
  • 注册所有自定义消息类型,避免类型冲突
  • 合理设置心跳间隔,防止连接假死
  • 使用消息工厂统一管理类型与生成逻辑
  • 充分利用并发安全的连接封装
添加自定义消息类型的详细步骤

要扩展一个新的消息类型,建议遵循如下流程:

1. 在 message_type.go 中定义消息类型常量并注册
// 步骤1:定义唯一的消息类型常量
const MyCustomMessageType MessageType = 0x20 // 需保证唯一,避免与已有类型冲突

// 步骤4:在 init() 中注册工厂函数
func init() {
    // ...已有注册...
    _ = FactoryRegister(MyCustomMessageType, GenerateMyCustomMessage)
}

说明:所有消息类型常量建议集中管理,注册必须在 init() 完成,确保工厂可用。

2. 定义扩展接口(如有需要)和消息结构体
// 步骤2:如有扩展需求,定义扩展接口
type MyCustomMessage interface {
    Message // 必须嵌入基础 Message 接口
    // 可扩展自定义方法
    CustomField() string
}

// 步骤2:实现消息结构体
type myCustomMessage struct {
    messageType MessageType
    customField string
    // 其它字段...
}

// 实现扩展接口方法
func (m *myCustomMessage) CustomField() string {
    return m.customField
}

说明:扩展接口便于类型断言和业务扩展,结构体需包含类型和自定义字段。

3. 实现 Message 接口
// 步骤3:实现 Message 接口
func (m *myCustomMessage) MessageType() MessageType {
    return m.messageType
}

func (m *myCustomMessage) Pack() ([]byte, error) {
    // 将 customField 等内容序列化为字节数组
    // 伪代码:return []byte(m.customField), nil
}

func (m *myCustomMessage) Unpack(payload []byte) error {
    // 从 payload 反序列化 customField
    // 伪代码:m.customField = string(payload); return nil
}

说明:Pack/Unpack 需保证与协议一致,错误处理要健壮。

4. 实现 GenerateMessageFunc 工厂函数
// 步骤4:实现工厂函数
func GenerateMyCustomMessage(messageType MessageType, payload []byte) (Message, error) {
    if messageType != MyCustomMessageType {
        // 类型校验,防止误用
        return nil, errors.New("消息类型不匹配")
    }
    m := &myCustomMessage{messageType: MyCustomMessageType}
    // 调用 Unpack 解析 payload
    if err := m.Unpack(payload); err != nil {
        return nil, err
    }
    return m, nil
}

说明:工厂函数用于工厂动态生成消息实例,需校验类型并处理 payload。


补充说明

  • 推荐为新类型编写单元测试,覆盖 Pack/Unpack/工厂函数的正常与异常场景。
  • 若有复杂字段,建议采用二进制序列化/反序列化方案,保证兼容性和健壮性。

API 文档

主要类型
// Message 消息接口
type Message interface {
    MessageType() MessageType
    Pack() ([]byte, error)
    Unpack(payload []byte) error
}

// Conn 消息连接接口
type Conn interface {
    Close() error
    LocalAddr() net.Addr
    RemoteAddr() net.Addr
    SetDeadline(time.Time) error
    SetReadDeadline(time.Time) error
    SetWriteDeadline(time.Time) error
    Closed() bool
    Start(context.Context)
    SendMessage(Message) error
    Message() <-chan Message
}

// 消息工厂注册与生成
func FactoryRegister(messageType MessageType, fn GenerateMessageFunc) error
func FactoryGenerate(messageType MessageType, payload []byte) (Message, error)

// 内置消息类型
const (
    HeartbeatMessageType    MessageType = 0x80
    SingleStringMessageType MessageType = 0x09
)

// 内置消息构造
func NewHeartbeatMessage(sn uint64) *heartbeatMessage
func NewSingleStringMessage(msg string) *singleStringMessage

// 连接封装
func WrapConn(c net.Conn, heartbeatInterval time.Duration) *conn
关键函数
  • WrapConn:将 net.Conn 封装为消息连接,支持心跳与自动分包
  • SendMessage:发送消息(并发安全)
  • Message:接收消息通道(只读)
  • FactoryRegister/FactoryGenerate:注册与生成自定义消息类型
  • NewHeartbeatMessage/NewSingleStringMessage:内置消息构造
  • NewScanner:创建自定义分包 Scanner

错误处理

  • 所有接口方法均返回 error,需检查
  • 消息类型未注册、payload 非法等均有详细错误
  • 连接关闭、超时、网络异常均有详细提示

性能指标

  • 单连接高并发安全,消息收发吞吐量高
  • 自动分包与缓冲队列,适合大规模消息场景
  • 心跳与超时机制对性能影响可控

测试覆盖率

  • 单元测试覆盖所有接口、边界、异常场景
  • 使用 testify,覆盖率高

调试指南

  • 检查消息类型注册与工厂逻辑
  • 合理设置心跳与缓冲区参数
  • 利用测试用例覆盖边界与异常

相关文档

贡献指南

欢迎提交 Issue、PR 或建议,详见 贡献指南

许可证

本项目采用 MIT License 许可证。详见 LICENSE

Documentation

Overview

Package message 提供基于自定义二进制协议的消息类型、工厂注册和连接封装。

本包使用固定 4 字节头部编码消息:前 2 字节为 MessageType,后 2 字节为 payload 长度, 因此单条消息的 payload 最大为 uint16 上限。调用方可以通过 Message、MessageFactory 和 FactoryRegister 扩展自定义消息类型;内置实现提供心跳消息、单字符串消息以及与该协议配套的 Scanner。

WrapConn 会把 net.Conn 包装为按上述协议收发消息的连接,并可按给定间隔发送心跳包。 连接上的并发、生命周期和共享 channel 约束以 Conn 及其方法文档为准。

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FactoryRegister

func FactoryRegister(messageType MessageType, messageFunc GenerateMessageFunc) error

FactoryRegister 将消息类型注册到默认工厂。

参数:

  • messageType: 待注册的消息类型。
  • messageFunc: 对应的消息生成函数。

返回:

  • error: 默认工厂注册失败时返回错误。

func NewHeartbeatMessage

func NewHeartbeatMessage(serialNumber uint64) *heartbeatMessage

NewHeartbeatMessage 创建心跳消息。

参数:

  • serialNumber: 要写入消息的心跳序列号。

返回:

  • *heartbeatMessage: 新创建的心跳消息实例。

func NewMessageFactory

func NewMessageFactory() *messageFactory

NewMessageFactory 创建新的消息工厂实例。

返回的工厂可并发调用 Register;Generate 依赖底层 map 读取,通常应在完成注册后再供并发生成使用。

参数:无。

返回:

  • *messageFactory: 新创建的消息工厂实例。

func NewScanner

func NewScanner(r io.Reader) *bufio.Scanner

NewScanner 创建按本包协议拆分消息包的 bufio.Scanner

返回的 Scanner 使用 [scanMessage] 作为 SplitFunc,并将 token 缓冲区上限设置为协议允许的最大完整包长度。

参数:

  • r: 提供协议字节流的输入源。

返回:

  • *bufio.Scanner: 按本包协议拆分消息包的 Scanner。

func NewSingleStringMessage

func NewSingleStringMessage(message string) *singleStringMessage

NewSingleStringMessage 创建单字符串消息。

参数:

  • message: 要写入消息 payload 的字符串内容。

返回:

  • *singleStringMessage: 新创建的单字符串消息实例。

func WrapConn

func WrapConn(c net.Conn, heartbeatInterval time.Duration) *conn

WrapConn 将底层 net.Conn 包装为按本包协议异步收发消息的连接。

返回的连接会创建容量为 5120 的接收与发送队列,但不会自动启动后台任务; 调用方需要显式调用 Conn.Start 启动读写循环,且 Start 只应调用一次。 heartbeatInterval 大于 0 时,Start 会额外提交定时心跳发送任务。

参数:

  • c: 待包装的底层网络连接,必须非 nil;调用方负责保证其满足所需的 net.Conn 语义,传入 nil 会导致后续使用时 panic。
  • heartbeatInterval: 心跳发送间隔;小于等于 0 时不会启动心跳任务。

返回:

  • *conn: 包装后的协议连接实例,初始处于未关闭状态;调用方应在不再使用时调用 Close。

Types

type Conn

type Conn interface {
	// Close 关闭连接并通知内部 goroutine 退出。
	//
	// Close 可与 Closed 和 SendMessage 并发调用;重复调用只会在首次关闭时关闭底层连接,
	// 并关闭 [Conn.Message] 返回的共享接收 channel。
	//
	// 参数:无。
	//
	// 返回:
	//   - error: 首次关闭底层连接时返回的错误;连接已关闭时返回 nil。
	Close() error
	// LocalAddr 返回底层连接的本地网络地址。
	//
	// 参数:无。
	//
	// 返回:
	//   - net.Addr: 本地网络地址。
	LocalAddr() net.Addr
	// RemoteAddr 返回底层连接的远程网络地址。
	//
	// 参数:无。
	//
	// 返回:
	//   - net.Addr: 远程网络地址。
	RemoteAddr() net.Addr
	// SetDeadline 设置底层连接的读写截止时间。
	//
	// 调用后会同时影响 [Conn.Start] 启动的内部收发流程。
	//
	// 参数:
	//   - time.Time: 截止时间;零值表示取消已设置的读写截止时间。
	//
	// 返回:
	//   - error: 底层连接设置截止时间失败时返回错误。
	SetDeadline(time.Time) error
	// SetReadDeadline 设置底层连接的读截止时间。
	//
	// 该设置会影响 [Conn.Start] 启动的内部接收流程。
	//
	// 参数:
	//   - time.Time: 截止时间;零值表示取消已设置的读截止时间。
	//
	// 返回:
	//   - error: 底层连接设置读截止时间失败时返回错误。
	SetReadDeadline(time.Time) error
	// SetWriteDeadline 设置底层连接的写截止时间。
	//
	// 该设置会影响 [Conn.Start] 启动的内部发送流程。
	//
	// 参数:
	//   - time.Time: 截止时间;零值表示取消已设置的写截止时间。
	//
	// 返回:
	//   - error: 底层连接设置写截止时间失败时返回错误。
	SetWriteDeadline(time.Time) error

	// Closed 返回连接是否已经关闭。
	//
	// Closed 可与 Close 和 SendMessage 并发调用。
	//
	// 参数:无。
	//
	// 返回:
	//   - bool: 连接是否已经进入关闭状态。
	Closed() bool
	// Start 提交内部发送、接收以及可选心跳任务。
	//
	// Start 不会自行去重,调用方只应调用一次。传入的上下文结束或连接关闭后,
	// 已成功启动的内部任务会退出。任务提交通过包级 goroutine 池完成,提交失败时
	// 当前签名不会向调用方返回错误。
	//
	// 参数:
	//   - context.Context: 控制内部 goroutine 生命周期的上下文,不能为空。
	Start(context.Context)
	// SendMessage 将消息放入内部发送队列。
	//
	// SendMessage 可与 Close 和 Closed 并发调用。返回 nil 仅表示消息已入队,
	// 不表示已经写入底层连接;当发送队列已满时会阻塞,直到队列腾出空间或连接关闭。
	//
	// 参数:
	//   - Message: 待异步发送的消息;调用方应保证其非 nil。
	//
	// 返回:
	//   - error: 连接已关闭,或消息在入队前因收到关闭通知而被拒绝时返回错误。
	SendMessage(Message) error
	// Message 返回连接的共享接收 channel。
	//
	// 该 channel 只创建一次;多个消费者同时读取时会竞争消费消息。
	// 连接关闭时,该 channel 也会被关闭。
	//
	// 参数:无。
	//
	// 返回:
	//   - <-chan Message: 共享的只读消息通道。
	Message() <-chan Message
}

Conn 定义按本包协议收发消息的连接契约。

Close、Closed 和 SendMessage 可以并发调用。Start 不会自行去重, 调用方只应调用一次;重复调用会重复提交内部收发任务和可选心跳任务, 并竞争同一底层连接。Message 返回单个共享接收 channel,多个消费者 同时读取时会竞争消费消息;连接关闭时该 channel 会被关闭。 地址查询方法直接委托给底层 net.Conn;截止时间设置会同步影响 Conn.Start 启动的内部收发流程。

type GenerateMessageFunc

type GenerateMessageFunc func(MessageType, []byte) (Message, error)

GenerateMessageFunc 适配普通函数为 Generator 实现。

适配函数收到的 payload 不包含协议头中的 MessageType 和长度字段。

参数:

  • MessageType: 目标消息类型。
  • []byte: 待解码的消息 payload。

返回:

  • Message: 生成的消息实例。
  • error: 生成失败时返回错误。

func (GenerateMessageFunc) GenerateMessage

func (f GenerateMessageFunc) GenerateMessage(messageType MessageType, payload []byte) (Message, error)

GenerateMessage 调用底层函数生成消息实例。

参数:

  • messageType: 目标消息类型。
  • payload: 待解码的消息 payload。

返回:

  • Message: 生成的消息实例。
  • error: 底层适配函数返回的错误。

type Generator

type Generator interface {
	// GenerateMessage 根据消息类型和 payload 生成消息实例。
	//
	// payload 不包含协议头中的 MessageType 和长度字段。
	//
	// 参数:
	//   - messageType: 目标消息类型。
	//   - payload: 待解码的消息 payload。
	//
	// 返回:
	//   - Message: 生成的消息实例。
	//   - error: 生成失败时返回错误。
	GenerateMessage(messageType MessageType, payload []byte) (Message, error)
}

Generator 根据消息类型和 payload 生成具体消息实例。

type HeartbeatMessage

type HeartbeatMessage interface {
	// SerialNumber 返回心跳消息中的序列号。
	//
	// 参数:无。
	//
	// 返回:
	//   - uint64: 心跳消息中的序列号。
	SerialNumber() uint64
}

HeartbeatMessage 表示携带心跳序列号的消息。

type Message

type Message interface {
	// MessageType 返回消息的协议类型。
	//
	// 参数:无。
	//
	// 返回:
	//   - MessageType: 当前消息的协议类型。
	MessageType() MessageType

	// Pack 将消息编码为 payload。
	//
	// 返回结果不包含协议头中的 MessageType 和长度字段。
	//
	// 参数:无。
	//
	// 返回:
	//   - []byte: 当前消息编码后的 payload。
	//   - error: 编码失败时返回错误。
	Pack() ([]byte, error)

	// Unpack 使用 payload 还原消息内容。
	//
	// payload 不包含协议头中的 MessageType 和长度字段。
	//
	// 参数:
	//   - payload: 需要解码的消息 payload。
	//
	// 返回:
	//   - error: 解码失败时返回错误。
	Unpack(payload []byte) error
}

Message 定义协议消息的类型、封包和解包契约。

Pack 和 Unpack 只处理 payload,不包含协议头中的 MessageType 和长度字段。

func FactoryGenerate

func FactoryGenerate(messageType MessageType, payload []byte) (Message, error)

FactoryGenerate 使用默认工厂根据消息类型和 payload 创建消息实例。

参数:

  • messageType: 目标消息类型。
  • payload: 待解码的消息 payload。

返回:

  • Message: 生成的消息实例。
  • error: 默认工厂生成失败时返回错误。

func GenerateHeartbeatMessage

func GenerateHeartbeatMessage(messageType MessageType, payload []byte) (Message, error)

GenerateHeartbeatMessage 根据消息类型和 payload 生成心跳消息。

messageType 必须等于 HeartbeatMessageType,payload 不能为空。

参数:

  • messageType: 目标消息类型。
  • payload: 待解码的心跳消息 payload。

返回:

  • Message: 生成的心跳消息实例。
  • error: messageType 不匹配、payload 为 nil 或解码失败时返回错误。

func GenerateSingleStringMessage

func GenerateSingleStringMessage(messageType MessageType, payload []byte) (Message, error)

GenerateSingleStringMessage 根据消息类型和 payload 生成单字符串消息。

messageType 必须等于 SingleStringMessageType;nil payload 会被拒绝,非 nil 的空 payload 表示合法空字符串。

参数:

  • messageType: 目标消息类型。
  • payload: 待解码的字符串消息 payload。

返回:

  • Message: 生成的单字符串消息实例。
  • error: messageType 不匹配、payload 为 nil 或解码失败时返回错误。

type MessageFactory

type MessageFactory interface {
	// Register 将消息类型注册到工厂。
	//
	// 同一 messageType 只能注册一次;messageFunc 不能为空。
	//
	// 参数:
	//   - messageType: 待注册的消息类型。
	//   - messageFunc: 对应的消息生成函数。
	//
	// 返回:
	//   - error: 消息类型已存在或 messageFunc 为空时返回错误。
	Register(messageType MessageType, messageFunc GenerateMessageFunc) error
	// Generate 根据消息类型和 payload 创建消息实例。
	//
	// payload 不能为空,但可以是非 nil 的空切片。
	//
	// 参数:
	//   - messageType: 目标消息类型。
	//   - payload: 待解码的消息 payload。
	//
	// 返回:
	//   - Message: 生成的消息实例。
	//   - error: 消息类型未注册、payload 为 nil 或生成失败时返回错误。
	Generate(messageType MessageType, payload []byte) (Message, error)
}

MessageFactory 定义消息类型注册和消息生成契约。

Register 负责将消息类型映射到生成函数;Generate 根据消息类型和 payload 还原消息实例。

type MessageType

type MessageType uint16

MessageType 标识协议中的消息类型。

内置消息类型包括:

  • HeartbeatMessageType: 心跳消息类型。
  • SingleStringMessageType: 仅携带单个字符串 payload 的消息类型。

调用方可通过 FactoryRegister 注册其它 uint16 值作为自定义消息类型。

const (
	// HeartbeatMessageType 表示心跳消息类型。
	HeartbeatMessageType MessageType = 0x80
	// SingleStringMessageType 表示仅携带单个字符串 payload 的消息类型。
	SingleStringMessageType MessageType = 0x09
)

type SingleStringMessage

type SingleStringMessage interface {
	// Message 返回消息中的字符串内容。
	//
	// 参数:无。
	//
	// 返回:
	//   - string: 当前消息中的字符串内容。
	Message() string
}

SingleStringMessage 表示仅携带一个字符串 payload 的消息。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL