message

package
v0.0.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 20, 2025 License: MIT Imports: 11 Imported by: 0

README

message

简介

message 包提供高性能的自定义消息协议与连接封装,支持消息类型注册、心跳包、字符串消息、自动分包、并发安全等,适用于分布式服务、长连接、定制协议等多种场景。

主要特性
  • 统一消息接口,支持自定义消息类型、封包与解包
  • 消息工厂机制,支持类型注册与动态生成
  • 高性能连接封装,支持并发安全、自动分包、心跳机制
  • 内置心跳消息、字符串消息实现
  • 支持 bufio.Scanner 自动分割消息包
  • 完整单元测试覆盖
设计理念

message 包遵循"高效、灵活、可扩展"的设计理念,接口与实现分离,工厂注册机制灵活,连接层支持自动分包与心跳,适合高并发、定制协议、长连接等场景。

安装

前置条件
  • Go 版本要求:Go 1.18+
  • 依赖要求:
    • github.com/stretchr/testify(仅测试)
安装命令
go get -u github.com/fsyyft-go/kit/net/message

快速开始

基础用法

请查阅示例代码

详细指南

核心概念
  • Message 接口:统一封装消息类型、封包与解包方法
  • 消息工厂:支持类型注册与动态生成,便于扩展
  • Conn 接口:自定义消息连接,支持并发安全、自动分包、心跳
  • 心跳消息/字符串消息:内置常用消息类型
  • Scanner:支持 bufio.Scanner 自动分割消息包
常见用例
  • 分布式服务自定义协议通信
  • 长连接心跳与消息收发
  • 物联网、游戏、IM 等定制消息协议
  • 高并发消息网关
最佳实践
  • 注册所有自定义消息类型,避免类型冲突
  • 合理设置心跳间隔,防止连接假死
  • 使用消息工厂统一管理类型与生成逻辑
  • 充分利用并发安全的连接封装
添加自定义消息类型的详细步骤

要扩展一个新的消息类型,建议遵循如下流程:

1. 在 message_type.go 中定义消息类型常量并注册
// 步骤1:定义唯一的消息类型常量
const MyCustomMessageType MessageType = 0x20 // 需保证唯一,避免与已有类型冲突

// 步骤4:在 init() 中注册工厂函数
func init() {
    // ...已有注册...
    _ = FactoryRegister(MyCustomMessageType, GenerateMyCustomMessage)
}

说明:所有消息类型常量建议集中管理,注册必须在 init() 完成,确保工厂可用。

2. 定义扩展接口(如有需要)和消息结构体
// 步骤2:如有扩展需求,定义扩展接口
type MyCustomMessage interface {
    Message // 必须嵌入基础 Message 接口
    // 可扩展自定义方法
    CustomField() string
}

// 步骤2:实现消息结构体
type myCustomMessage struct {
    messageType MessageType
    customField string
    // 其它字段...
}

// 实现扩展接口方法
func (m *myCustomMessage) CustomField() string {
    return m.customField
}

说明:扩展接口便于类型断言和业务扩展,结构体需包含类型和自定义字段。

3. 实现 Message 接口
// 步骤3:实现 Message 接口
func (m *myCustomMessage) MessageType() MessageType {
    return m.messageType
}

func (m *myCustomMessage) Pack() ([]byte, error) {
    // 将 customField 等内容序列化为字节数组
    // 伪代码:return []byte(m.customField), nil
}

func (m *myCustomMessage) Unpack(payload []byte) error {
    // 从 payload 反序列化 customField
    // 伪代码:m.customField = string(payload); return nil
}

说明:Pack/Unpack 需保证与协议一致,错误处理要健壮。

4. 实现 GenerateMessageFunc 工厂函数
// 步骤4:实现工厂函数
func GenerateMyCustomMessage(messageType MessageType, payload []byte) (Message, error) {
    if messageType != MyCustomMessageType {
        // 类型校验,防止误用
        return nil, errors.New("消息类型不匹配")
    }
    m := &myCustomMessage{messageType: MyCustomMessageType}
    // 调用 Unpack 解析 payload
    if err := m.Unpack(payload); err != nil {
        return nil, err
    }
    return m, nil
}

说明:工厂函数用于工厂动态生成消息实例,需校验类型并处理 payload。


补充说明

  • 推荐为新类型编写单元测试,覆盖 Pack/Unpack/工厂函数的正常与异常场景。
  • 若有复杂字段,建议采用二进制序列化/反序列化方案,保证兼容性和健壮性。

API 文档

主要类型
// Message 消息接口
type Message interface {
    MessageType() MessageType
    Pack() ([]byte, error)
    Unpack(payload []byte) error
}

// Conn 消息连接接口
type Conn interface {
    Close() error
    LocalAddr() net.Addr
    RemoteAddr() net.Addr
    SetDeadline(time.Time) error
    SetReadDeadline(time.Time) error
    SetWriteDeadline(time.Time) error
    Closed() bool
    Start(context.Context)
    SendMessage(Message) error
    Message() <-chan Message
}

// 消息工厂注册与生成
func FactoryRegister(messageType MessageType, fn GenerateMessageFunc) error
func FactoryGenerate(messageType MessageType, payload []byte) (Message, error)

// 内置消息类型
const (
    HeartbeatMessageType    MessageType = 0x80
    SingleStringMessageType MessageType = 0x09
)

// 内置消息构造
func NewHeartbeatMessage(sn uint64) *heartbeatMessage
func NewSingleStringMessage(msg string) *singleStringMessage

// 连接封装
func WrapConn(c net.Conn, heartbeatInterval time.Duration) *conn
关键函数
  • WrapConn:将 net.Conn 封装为消息连接,支持心跳与自动分包
  • SendMessage:发送消息(并发安全)
  • Message:接收消息通道(只读)
  • FactoryRegister/FactoryGenerate:注册与生成自定义消息类型
  • NewHeartbeatMessage/NewSingleStringMessage:内置消息构造
  • NewScanner:创建自定义分包 Scanner

错误处理

  • 所有接口方法均返回 error,需检查
  • 消息类型未注册、payload 非法等均有详细错误
  • 连接关闭、超时、网络异常均有详细提示

性能指标

  • 单连接高并发安全,消息收发吞吐量高
  • 自动分包与缓冲队列,适合大规模消息场景
  • 心跳与超时机制对性能影响可控

测试覆盖率

  • 单元测试覆盖所有接口、边界、异常场景
  • 使用 testify,覆盖率高

调试指南

  • 检查消息类型注册与工厂逻辑
  • 合理设置心跳与缓冲区参数
  • 利用测试用例覆盖边界与异常

相关文档

贡献指南

欢迎提交 Issue、PR 或建议,详见 贡献指南

许可证

本项目采用 MIT License 许可证。详见 LICENSE

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FactoryRegister

func FactoryRegister(messageType MessageType, messageFunc GenerateMessageFunc) error

FactoryRegister 注册消息类型到默认工厂。

参数:

  • messageType: 消息类型。
  • messageFunc: 消息生成方法。

返回值:

  • error: 错误信息。

func NewHeartbeatMessage

func NewHeartbeatMessage(serialNumber uint64) *heartbeatMessage

NewHeartbeatMessage 创建一个心跳消息包。

参数:

  • serialNumber: 心跳包序列号。

返回值:

  • *heartbeatMessage: 新建的心跳消息包。

func NewMessageFactory

func NewMessageFactory() *messageFactory

NewMessageFactory 创建一个新消息工厂实例。

无参数。

返回值:

  • *messageFactory: 新建的消息工厂实例。

func NewScanner

func NewScanner(r io.Reader) *bufio.Scanner

NewScanner 创建自定义消息包分割的 bufio.Scanner。

参数:

  • r: 输入流。

返回值:

  • *bufio.Scanner: 自定义分割的 Scanner。

func NewSingleStringMessage

func NewSingleStringMessage(message string) *singleStringMessage

NewSingleStringMessage 创建一个简单的字符串消息包。

参数:

  • message: 字符串消息内容。

返回值:

  • *singleStringMessage: 新建的字符串消息包。

func WrapConn

func WrapConn(c net.Conn, heartbeatInterval time.Duration) *conn

WrapConn 将 net.Conn 包装成自定义消息包传输时使用的网络连接。

参数:

  • c: 底层网络连接。
  • heartbeatInterval: 心跳包发送间隔。

返回值:

  • *conn: 自定义连接实例。

Types

type Conn

type Conn interface {
	// Close 关闭连接。
	//
	// 返回值:
	//   - error: 错误信息。
	Close() error
	// LocalAddr 返回本地网络地址。
	//
	// 返回值:
	//   - net.Addr: 本地网络地址。
	LocalAddr() net.Addr
	// RemoteAddr 返回远程网络地址。
	//
	// 返回值:
	//   - net.Addr: 远程网络地址。
	RemoteAddr() net.Addr
	// SetDeadline 设置读写相关的截止时间。
	//
	// 参数:
	//   - t: 截止时间。
	//
	// 返回值:
	//   - error: 错误信息。
	SetDeadline(time.Time) error
	// SetReadDeadline 设置读截止时间。
	//
	// 参数:
	//   - t: 截止时间。
	//
	// 返回值:
	//   - error: 错误信息。
	SetReadDeadline(time.Time) error
	// SetWriteDeadline 设置写截止时间。
	//
	// 参数:
	//   - t: 截止时间。
	//
	// 返回值:
	//   - error: 错误信息。
	SetWriteDeadline(time.Time) error

	// Closed 返回连接是否已经关闭。
	//
	// 返回值:
	//   - bool: 连接是否关闭。
	Closed() bool
	// Start 启动消息读写 goroutine。
	//
	// 参数:
	//   - ctx: 上下文,用于控制 goroutine 生命周期。
	Start(context.Context)
	// SendMessage 发送消息。
	//
	// 参数:
	//   - message: 待发送的消息。
	//
	// 返回值:
	//   - error: 错误信息。
	SendMessage(Message) error
	// Message 返回只读消息通道。
	//
	// 返回值:
	//   - <-chan Message: 只读消息通道。
	Message() <-chan Message
}

Conn 自定义消息包传输时使用的网络连接接口。 多个 goroutine 可以同时调用 Conn 上的方法。

type GenerateMessageFunc

type GenerateMessageFunc func(MessageType, []byte) (Message, error)

GenerateMessageFunc 生成消息包结构体的方法类型,实现 Generator 接口。

参数:

  • messageType: 消息类型。
  • payload: 消息对应的字节数组的表示形式(不包含消息类型和长度)。

返回值:

  • Message: 生成的消息包。
  • error: 错误信息。

func (GenerateMessageFunc) GenerateMessage

func (f GenerateMessageFunc) GenerateMessage(messageType MessageType, payload []byte) (Message, error)

GenerateMessage 调用函数生成消息包结构体。

参数:

  • messageType: 消息类型。
  • payload: 消息对应的字节数组的表示形式(不包含消息类型和长度)。

返回值:

  • Message: 生成的消息包。
  • error: 错误信息。

type Generator

type Generator interface {
	// GenerateMessage 生成消息包结构体。
	//
	// 参数:
	//   - messageType: 消息类型。
	//   - payload: 消息对应的字节数组的表示形式(不包含消息类型和长度)。
	//
	// 返回值:
	//   - Message: 生成的消息包。
	//   - error: 错误信息。
	GenerateMessage(messageType MessageType, payload []byte) (Message, error)
}

Generator 消息生成器接口,定义生成消息包结构体的方法。

type HeartbeatMessage

type HeartbeatMessage interface {
	// SerialNumber 返回心跳包序列号。
	//
	// 返回值:
	//   - uint64: 心跳包序列号。
	SerialNumber() uint64
}

HeartbeatMessage 心跳消息包接口。

type Message

type Message interface {
	// MessageType 返回消息类型。
	//
	// 返回值:
	//   - MessageType: 消息类型。
	MessageType() MessageType

	// Pack 将消息内容转换为 payload 字节数组(不包含消息类型和长度)。
	//
	// 返回值:
	//   - []byte: 消息内容的字节数组。
	//   - error: 错误信息。
	Pack() ([]byte, error)

	// Unpack 将 payload 字节数组(不包含消息类型和长度)还原为消息内容。
	//
	// 参数:
	//   - payload: 消息内容的字节数组。
	//
	// 返回值:
	//   - error: 错误信息。
	Unpack(payload []byte) error
}

Message 消息包接口,定义消息类型、封包与拆包方法。

func FactoryGenerate

func FactoryGenerate(messageType MessageType, payload []byte) (Message, error)

FactoryGenerate 根据消息类型和消息负载数据创建消息。

参数:

  • messageType: 消息类型。
  • payload: 消息负载数据。

返回值:

  • Message: 生成的消息实例。
  • error: 错误信息。

func GenerateHeartbeatMessage

func GenerateHeartbeatMessage(messageType MessageType, payload []byte) (Message, error)

GenerateHeartbeatMessage 生成心跳消息包结构体。

参数:

  • messageType: 消息类型。
  • payload: 心跳包序列号的字节数组。

返回值:

  • Message: 生成的心跳消息包。
  • error: 错误信息。

func GenerateSingleStringMessage

func GenerateSingleStringMessage(messageType MessageType, payload []byte) (Message, error)

GenerateSingleStringMessage 生成简单的字符串消息包结构体。

参数:

  • messageType: 消息类型。
  • payload: 字符串消息的字节数组。

返回值:

  • Message: 生成的字符串消息包。
  • error: 错误信息。

type MessageFactory

type MessageFactory interface {
	// Register 注册消息类型到工厂。
	//
	// 参数:
	//   - messageType: 消息类型。
	//   - messageFunc: 消息生成方法。
	//
	// 返回值:
	//   - error: 错误信息。
	Register(messageType MessageType, messageFunc GenerateMessageFunc) error
	// Generate 根据消息类型和消息负载数据创建消息。
	//
	// 参数:
	//   - messageType: 消息类型。
	//   - payload: 消息负载数据。
	//
	// 返回值:
	//   - Message: 生成的消息实例。
	//   - error: 错误信息。
	Generate(messageType MessageType, payload []byte) (Message, error)
}

MessageFactory 消息工厂接口,定义消息类型注册与消息生成方法。

type MessageType

type MessageType uint16

MessageType 消息类型。

const (
	HeartbeatMessageType    MessageType = 0x80 // 心跳消息。
	SingleStringMessageType MessageType = 0x09 // 简单的字符串消息。
)

HeartbeatMessageType 心跳消息类型常量。

type SingleStringMessage

type SingleStringMessage interface {
	// Message 返回字符串消息内容。
	//
	// 返回值:
	//   - string: 字符串消息内容。
	Message() string
}

SingleStringMessage 简单的字符串消息包接口。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL