stcp

package
v1.3.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2025 License: MIT Imports: 10 Imported by: 0

README

TCP Server 框架(基于消息驱动的接口分层设计)

该模块提供一个轻量、高性能、类型安全的 TCP Server 框架,采用消息驱动的接口分层架构:

  • 通过 NewTcpServer(cnf, readProcessor, connIOFactory) 创建服务,支持工厂模式注入不同的连接处理器
  • ReadProcessor 完全由业务方实现,负责处理从连接读取的数据(定长、变长、心跳等策略均由外部控制)
  • 引入IMsg消息接口系统,提供类型安全的消息处理机制
  • 框架内部管理连接生命周期、并发处理,通过ConnIOFactory支持灵活的连接实现策略

核心组件

  • TcpServer: 主服务器,负责Accept循环和连接管理
  • ConnHandler: 连接处理器,管理单个连接的生命周期,包含panic恢复机制
  • IMsg: 消息接口,定义统一的消息抽象,支持类型安全的消息处理
  • BytesMsg: 字节消息实现,封装字节数组为消息对象
  • BasicConnIO: 基础连接IO实现,提供通用的连接操作功能
  • QSendConn: 队列连接实现,基于BasicConnIO,提供异步消息发送能力
  • IConnSender: 连接发送器接口,定义消息发送和队列管理能力
  • IConnReader: 连接读取器接口,定义数据读取能力,支持不同的帧读取策略
  • IConnIO: 连接IO接口,组合发送器和读取器,提供完整的连接操作能力
  • ConnIOFactory: 连接IO工厂,用于创建不同类型的连接实例

接口分层架构

框架采用三层接口设计,实现关注点分离:

IConnSender - 连接发送器接口

负责消息发送和队列管理功能,提供类型安全的消息处理:

type IConnSender interface {
    // 核心连接管理 (线程安全)
    Conn() net.Conn                    // 获取底层连接
    SetMetaInfo(m MetaInfo)            // 设置元信息 (重入安全)
    MetaInfo() MetaInfo                // 获取元信息
    Close() error                      // 关闭连接 (重入安全)
    
    // 消息处理 (线程安全,重入安全)
    PutMsg(msg IMsg) error             // 发送消息到队列,类型安全
    PopMsgBytes() ([]byte, error)      // 从队列取出消息字节
    // 注意:sendBytes2Conn 是内部方法,不对外暴露
}
IConnReader - 连接读取器接口

负责数据读取相关功能,支持不同的帧读取策略:

type IConnReader interface {
    ReadFrame(conn net.Conn) ([]byte, error)  // 读取一帧数据(一条完整消息的字节流)
}
IConnIO - 连接IO接口

组合发送器和读取器,提供完整的连接操作能力:

type IConnIO interface {
    IConnSender                        // 继承发送能力
    IConnReader                        // 继承读取能力
}
工厂接口
type ConnIOFactory func(conn net.Conn) IConnIO

说明: ConnIOFactory负责创建IConnIO实例。在工厂函数内部,您需要创建IConnReader实例并传递给具体的连接实现。

API 接口

Server 构造
func NewTcpServer(cnf *ServerAcceptCnf, readProcessor ReadProcessor, connIOFactory ConnIOFactory) *TcpServer
ReadProcessor 函数定义
type ReadProcessor func(iConnIO IConnIO, buffer []byte) error

框架会在每个连接上循环调用 ReadProcessor;当返回非 nil 错误时,该连接将被关闭并触发退出钩子。

配置结构
type ServerAcceptCnf struct {
    Address        string         // 监听地址,如 ":9000" 或 "127.0.0.1:9000"
    AcceptDelay    timex.Duration // 初始 accept 退避延迟
    AcceptMaxDelay timex.Duration // 最大 accept 退避延迟
    AcceptMaxRetry int            // 最大 accept 重试次数
    MaxConn        int32          // 最大并发连接数
}

// 获取默认配置
func DefaultServerAcceptCnf() *ServerAcceptCnf
ConnReader 函数定义
type ConnReaderFunc func(handler IConnSender, conn net.Conn) error

框架会在每个连接上循环调用 ConnReaderFunc;当返回非 nil 错误时,该连接将被关闭并触发退出钩子。

启动与关闭
  • 启动服务:Run(errChan chan<- error)
  • 关闭服务:Close() error(停止监听,已建立连接会按钩子流程退出)
Hook 机制
  • 设置连接启动钩子:SetStartHooker(hooker ConnStartEvent)
  • 设置连接退出钩子:SetExitHooker(hooker ConnExitEvent)

Hook函数定义:

type ConnStartEvent func(iConnIO IConnIO)
type ConnExitEvent func(iConnIO IConnIO)

重要: 框架对Hook函数提供panic恢复机制,Hook函数中的panic不会导致整个连接或服务器崩溃,错误会被记录到日志中。

消息发送
  • 类型安全发送:iConnIO.PutMsg(NewBytesMsg([]byte)) error(类型安全的消息发送)
  • 连接关闭:iConnIO.Close() error(线程安全,支持重入调用)
  • 写超时控制:SetWriteTimeout(d time.Duration),默认 5s

重要: 框架内部会自动处理消息发送,用户代码应使用PutMsg()方法来发送消息,而不是直接调用底层发送方法。

连接信息
  • 获取监听地址:Address() string
  • 获取当前连接数:ConnCount() int32
日志与元信息
  • 默认 MetaInfo 为 BasicMetaInfo{RemoteAddr}
  • 可调用 iConnIO.SetMetaInfo(...) 替换为自定义 MetaInfo(需实现 zapcore.ObjectMarshaler)以丰富日志字段
  • SetMetaInfo 方法线程安全,支持在任何时候更新连接元信息

IConnIO 接口设计

线程安全与重入性

IConnIO 接口及其组合的 IConnSenderIConnReader 接口的所有公开方法都经过精心设计,确保:

  • 线程安全: 所有方法都可以在多个 goroutine 中并发调用
  • 重入安全: Close() 方法支持多次调用,不会 panic,确保资源清理的幂等性
  • 错误处理: 方法可能返回错误,但保证不会 panic
  • panic恢复: ConnHandler对关键操作提供panic恢复机制,确保单个连接的异常不影响整体服务
方法分类
  • 必需方法: Conn(), SetMetaInfo(), MetaInfo(), Close(), ReadFrame()
  • 消息方法: PutMsg(), PopMsgBytes()
  • 内部机制: ConnHandler内部自动调用PopMsgBytes()和sendBytes2Conn()完成发送循环
生命周期管理

通过 IConnIO.Close() 可以优雅地关闭连接:

// 业务代码中的任何地方都可以安全调用
err := iConnIO.Close()  // 线程安全,重入安全
if err != nil {
    // 处理关闭错误,但不会 panic
}

调用 Close() 后会触发:

  1. 发送队列关闭,PopMsgBytes() 返回错误,发送循环退出
  2. 网络连接关闭,ReadFrame() 返回错误,接收循环退出
  3. ConnHandler.Exit() 执行,触发退出钩子

快速上手示例

基础服务启动

package main

import (
    "io"
    "net"
    "time"

    "github.com/pinealctx/neptune/stcp"
    "github.com/pinealctx/neptune/timex"
)

func main() {
    // ReadProcessor 函数:负责处理从连接读取的数据
    readProcessor := func(iConnIO stcp.IConnIO, buffer []byte) error {
        // 示例:回显数据(使用新的消息系统)
        msg := stcp.NewBytesMsg(buffer)
        return iConnIO.PutMsg(msg)  // 类型安全的消息发送
    }

    // 创建连接读取器工厂(实现自定义帧读取逻辑)
    reader := &MyConnReader{}  // 直接创建读取器实例

    // 创建连接IO工厂(使用队列连接实现)
    connIOFactory := func(conn net.Conn) stcp.IConnIO {
        reader := &MyConnReader{}  // 自定义读取器实现
        return stcp.NewQSendConnHandler(conn, 1024, reader)  // 队列容量1024
    }

    // 创建服务器配置
    cnf := stcp.DefaultServerAcceptCnf()
    cnf.Address = ":9000"
    cnf.MaxConn = 1000
    
    // 创建服务器
    srv := stcp.NewTcpServer(cnf, readProcessor, connIOFactory)
    
    // 设置连接钩子(可选)
    srv.SetStartHooker(func(iConnIO stcp.IConnIO) {
        // 连接建立时的处理
    })
    srv.SetExitHooker(func(iConnIO stcp.IConnIO) {
        // 连接断开时的处理
    })
    
    // 设置全局写超时(可选)
    stcp.SetWriteTimeout(5 * time.Second)

    // 启动服务器
    errCh := make(chan error, 1)
    srv.Run(errCh)

    // 等待服务器退出
    if err := <-errCh; err != nil {
        panic(err)
    }
}

// 示例:自定义连接读取器实现
type MyConnReader struct {
    // 不需要持有连接,设计更纯净
}

func (r *MyConnReader) ReadFrame(conn net.Conn) ([]byte, error) {
    // 示例:读取一行数据
    buf := make([]byte, 1024)
    n, err := conn.Read(buf)
    if err != nil {
        return nil, err
    }
    return buf[:n], nil
}

高级特性:灵活的连接IO工厂

框架支持通过ConnIOFactory工厂模式注入不同的连接实现,满足各种业务场景需求。所有连接实现都必须遵循 IConnIO 接口的线程安全约定。

不同队列容量的连接实现
// 大容量队列连接(适合高并发场景)
largeQueueFactory := func(conn net.Conn) stcp.IConnIO {
    reader := &MyConnReader{}  // 创建读取器实例
    return stcp.NewQSendConnHandler(conn, 10000, reader)
}

// 小容量队列连接(适合内存敏感场景)
smallQueueFactory := func(conn net.Conn) stcp.IConnIO {
    reader := &MyConnReader{}
    return stcp.NewQSendConnHandler(conn, 100, reader)
}

// 无限容量队列连接
unlimitedFactory := func(conn net.Conn) stcp.IConnIO {
    reader := &MyConnReader{}
    return stcp.NewQSendConnHandler(conn, 0, reader)  // 0表示无限容量
}
条件化连接选择
// 根据连接来源选择不同的连接实现
conditionalFactory := func(conn net.Conn) stcp.IConnIO {
    remoteAddr := conn.RemoteAddr().String()
    
    reader := &MyConnReader{}
    
    if strings.Contains(remoteAddr, "127.0.0.1") {
        // 本地连接使用大队列
        return stcp.NewQSendConnHandler(conn, 10000, reader)
    } else {
        // 外部连接使用小队列
        return stcp.NewQSendConnHandler(conn, 1000, reader)
    }
}
自定义连接实现
// 如果你有自定义的IConnIO实现
customFactory := func(conn net.Conn) stcp.IConnIO {
    // return your custom implementation
    // return NewMyCustomConnIO(conn, customConfig)
    
    // 示例中仍使用QSendConn
    reader := &MyConnReader{}
    return stcp.NewQSendConnHandler(conn, 1024, reader)
}

连接生命周期管理示例

业务代码中主动关闭连接
func businessLogic(iConnIO stcp.IConnIO, buffer []byte) error {
    // 检查是否需要关闭连接
    if shouldCloseConnection(buffer) {
        // 线程安全地关闭连接,触发完整的清理流程
        if err := iConnIO.Close(); err != nil {
            // 记录错误但不会 panic
            log.Printf("Close connection error: %v", err)
        }
        return io.EOF // 返回错误让 ReadProcessor 退出
    }
    
    // 继续处理消息(使用新的消息系统)
    msg := stcp.NewBytesMsg(processMessage(buffer))
    return iConnIO.PutMsg(msg)
}
多goroutine环境下的安全调用
func multiGoroutineExample(iConnIO stcp.IConnIO) {
    // Goroutine 1: 处理业务逻辑
    go func() {
        for {
            // 线程安全的元信息更新
            iConnIO.SetMetaInfo(&MyMetaInfo{
                UserID:    getCurrentUser(),
                Timestamp: time.Now().Unix(),
            })
            time.Sleep(time.Second)
        }
    }()
    
    // Goroutine 2: 发送心跳
    go func() {
        ticker := time.NewTicker(30 * time.Second)
        defer ticker.Stop()
        
        for range ticker.C {
            // 线程安全的消息发送(使用新的消息系统)
            heartbeat := stcp.NewBytesMsg([]byte("PING"))
            if err := iConnIO.PutMsg(heartbeat); err != nil {
                // 连接可能已关闭,安全退出
                return
            }
        }
    }()
    
    // Goroutine 3: 条件关闭
    go func() {
        <-shutdownSignal
        // 多个goroutine可以安全地调用Close()
        iConnIO.Close() // 重入安全,不会panic
    }()
}

定长消息读取示例

import (
    "io"
    "net"
    
    "github.com/pinealctx/neptune/stcp"
)

// 定长消息读取器实现
type FixedLengthReader struct {
    // 不需要持有连接引用
}

func (r *FixedLengthReader) ReadFrame(conn net.Conn) ([]byte, error) {
    const messageSize = 128
    buf := make([]byte, messageSize)
    if _, err := io.ReadFull(conn, buf); err != nil {
        return nil, err // 读不足或连接关闭则退出
    }
    return buf, nil
}

// ReadProcessor 处理定长消息
func fixedLengthProcessor(iConnIO stcp.IConnIO, buffer []byte) error {
    // 处理定长消息...(使用新的消息系统)
    msg := stcp.NewBytesMsg(buffer)
    return iConnIO.PutMsg(msg)
}

变长消息读取示例(前置长度头)

import (
    "encoding/binary"
    "io"
    "net"
    
    "github.com/pinealctx/neptune/stcp"
)

// 变长消息读取器实现
type VarLengthReader struct {
    // 更纯净的设计,连接作为参数传入
}

func (r *VarLengthReader) ReadFrame(conn net.Conn) ([]byte, error) {
    // 读取 4 字节长度头(大端序)
    var hdr [4]byte
    if _, err := io.ReadFull(conn, hdr[:]); err != nil {
        return nil, err
    }
    
    length := binary.BigEndian.Uint32(hdr[:])
    if length == 0 || length > 10<<20 { // 防御:限制最大包 10MB
        return nil, io.ErrUnexpectedEOF
    }
    
    // 读取消息体
    body := make([]byte, length)
    if _, err := io.ReadFull(conn, body); err != nil {
        return nil, err
    }
    
    return body, nil
}

// ReadProcessor 处理变长消息
func varLengthProcessor(iConnIO stcp.IConnIO, buffer []byte) error {
    // 处理变长消息...
    msg := stcp.NewBytesMsg(buffer)
    return iConnIO.PutMsg(msg)
}

心跳与读超时控制示例

import (
    "io"
    "net"
    "time"
    
    "github.com/pinealctx/neptune/stcp"
)

// 心跳读取器实现
type HeartbeatReader struct {
    // 无状态设计,更灵活
}

func (r *HeartbeatReader) ReadFrame(conn net.Conn) ([]byte, error) {
    const heartbeatInterval = 15 * time.Second
    const gracePeriod = 5 * time.Second
    
    // 设置读超时
    if err := conn.SetReadDeadline(time.Now().Add(heartbeatInterval + gracePeriod)); err != nil {
        return nil, err
    }

    // 读取消息
    var hdr [4]byte
    if _, err := io.ReadFull(conn, hdr[:]); err != nil {
        return nil, err
    }
    
    // 成功读到数据后,重置下一次的读超时
    if err := conn.SetReadDeadline(time.Now().Add(heartbeatInterval + gracePeriod)); err != nil {
        return nil, err
    }
    
    // 继续读取消息体...
    return hdr[:], nil
}

// ReadProcessor 处理心跳消息
func heartbeatProcessor(iConnIO stcp.IConnIO, buffer []byte) error {
    // 处理心跳或业务消息...
    return nil
}

自定义日志元信息

在握手成功或识别到业务身份后,可设置更丰富的 MetaInfo,便于日志检索:

import (
    "github.com/pinealctx/neptune/stcp"
    "go.uber.org/zap/zapcore"
)

// 自定义MetaInfo结构
type MyMetaInfo struct {
    UserID     string
    RemoteAddr string
    SessionID  string
}

func (m *MyMetaInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error {
    enc.AddString("userId", m.UserID)
    enc.AddString("remoteAddr", m.RemoteAddr)
    enc.AddString("sessionId", m.SessionID)
    return nil
}

func (m *MyMetaInfo) GetRemoteAddr() string {
    return m.RemoteAddr
}

// ReadProcessor 中设置自定义MetaInfo
func customMetaProcessor(iConnIO stcp.IConnIO, buffer []byte) error {
    // 例如:在握手后设置自定义MetaInfo
    customMeta := &MyMetaInfo{
        UserID:     "user123",
        RemoteAddr: iConnIO.Conn().RemoteAddr().String(),
        SessionID:  "session456",
    }
    iConnIO.SetMetaInfo(customMeta)
    
    // 继续处理消息(使用新的消息系统)
    msg := stcp.NewBytesMsg(buffer)
    return iConnIO.PutMsg(msg)
}

优雅关闭示例

import (
    "context"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    // 创建连接IO工厂
    connIOFactory := func(conn net.Conn) stcp.IConnIO {
        readerFactory := func(c net.Conn) stcp.IConnReader {
            return &MyConnReader{}
        }
        return stcp.NewQSendConnHandler(conn, 1024, readerFactory)
    }
    
    srv := stcp.NewTcpServer(cnf, readProcessor, connIOFactory)
    
    errCh := make(chan error, 1)
    srv.Run(errCh)
    
    // 监听系统信号
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    
    select {
    case err := <-errCh:
        if err != nil {
            panic(err)
        }
    case <-sigCh:
        // 优雅关闭
        if err := srv.Close(); err != nil {
            log.Printf("Server close error: %v", err)
        }
        // 等待现有连接处理完成
        time.Sleep(time.Second)
    }
}

运行时行为说明

连接处理流程

  1. 服务器在指定地址监听TCP连接
  2. 每个新连接会通过ConnIOFactory创建对应的IConnIO实例
  3. ConnHandler启动两个goroutine管理连接:
    • 接收goroutine:循环调用IConnIO.ReadFrame(conn)获取数据,然后调用ReadProcessor处理
    • 发送goroutine:循环调用IConnIO.PopMsgBytes()获取消息,然后通过内部方法sendBytes2Conn()发送
  4. 当ReadFrame()、ReadProcessor或PopMsgBytes()返回错误时,对应goroutine退出,连接被关闭
  5. ConnHandler提供panic恢复机制,确保单个连接异常不影响整体服务

连接数限制

  • 达到MaxConn限制时,新连接会被立即关闭并记录日志
  • 连接计数通过原子操作维护,在连接建立时递增,退出时递减

错误处理与重试

  • Accept操作出错时采用指数退避策略重试
  • 可配置最大重试次数和延迟时间
  • 写操作支持超时控制,默认5秒
  • Hook函数异常会被捕获并记录,不会影响连接正常运行

内存与性能

  • 发送队列支持容量限制,防止内存无限增长
  • 队列满时PutMsg会返回错误,应用层应优雅处理
  • 所有关键路径都经过并发安全设计
  • IConnIO 接口方法的线程安全实现确保高并发场景下的稳定性
  • Close() 方法使用 sync.Once 实现,避免重复资源释放的开销
  • 消息系统提供类型安全,避免运行时类型转换错误

设计原则与最佳实践

接口分层原则
  • IMsg: 定义消息抽象,支持类型安全的消息处理
  • IConnSender: 专注于消息发送和队列管理,提供PutMsg/PopMsgBytes方法,sendBytes2Conn为内部方法
  • IConnReader: 专注于读取能力,支持不同的帧解析策略
  • IConnIO: 组合两者,提供完整的连接操作能力
  • BasicConnIO: 提供基础实现,支持组合设计模式
  • 业务代码只需依赖相应接口,无需了解具体实现细节
工厂模式设计
  • ConnIOFactory: 支持不同的连接实现(队列、直接发送、批处理等)
  • 消息工厂: NewBytesMsg等便捷构造函数简化消息创建
  • 便于依赖注入和单元测试Mock,提高代码可测试性
并发安全设计
  • 所有公开方法都是线程安全的,可在多 goroutine 环境下安全使用
  • Close() 方法支持重入调用,多次调用不会导致 panic
  • 资源清理操作具有幂等性,确保系统的健壮性
  • panic恢复机制确保单个连接异常不影响整体服务
错误处理策略
  • 方法可能返回错误,但保证不会 panic
  • 错误信息结构化,便于日志记录和问题诊断
  • 支持优雅降级,连接异常时不影响整体服务稳定性
  • Hook函数异常会被隔离,不会传播到核心逻辑

架构特点

  • 消息驱动设计:IMsg接口提供类型安全的消息处理,支持扩展不同消息类型
  • 接口分层架构:IConnSender/IConnReader/IConnIO三层接口实现关注点分离
  • 组合设计模式:BasicConnIO提供基础实现,子类通过组合复用功能
  • 工厂模式:ConnIOFactory支持灵活的实现策略
  • 接口驱动:所有核心功能基于接口定义,便于扩展和测试
  • 插拔式架构:可以轻松替换和扩展连接读取器、发送器实现
  • 异步发送:队列化发送避免阻塞接收处理,支持PutMsg/PopMsgBytes分离设计
  • 完善日志:结构化日志支持,MetaInfo可自定义
  • Hook机制:连接生命周期钩子便于监控和扩展,具备panic恢复能力
  • 并发安全:所有共享状态都有适当的同步保护,支持高并发场景
  • 重入安全:关键方法如 Close() 支持多次调用,确保资源清理的可靠性
  • 测试友好:工厂模式便于单元测试时注入Mock实现
  • 关注点分离:读取、发送、连接管理职责清晰,便于维护和扩展
  • 生命周期管理:通过接口方法即可完整控制连接的创建、运行和销毁
  • 错误隔离:panic恢复机制确保单个连接异常不影响整体服务稳定性
  • 类型安全:消息系统在编译期检查类型,避免运行时错误

如需更多示例或适配特定协议,可通过实现IConnReader接口按需定制帧读取逻辑。


IConnIO 接口实现指南

如果需要实现自定义的 IConnIO,请遵循以下约定:

接口定义

type IConnIO interface {
    IConnSender    // 继承发送能力
    IConnReader    // 继承读取能力
}

type IConnSender interface {
    // 必需方法 - 必须实现
    Conn() net.Conn              // 返回底层连接
    SetMetaInfo(m MetaInfo)       // 设置元信息,线程安全
    MetaInfo() MetaInfo          // 获取元信息,线程安全
    Close() error                // 关闭连接,重入安全,不能panic
    
    // 消息处理方法 - 必须实现
    PutMsg(msg IMsg) error           // 发送消息到队列,类型安全
    PopMsgBytes() ([]byte, error)    // 从队列取出消息字节
    
    // 内部方法 - 仅供ConnHandler调用
    sendBytes2Conn(bs []byte) error  // 内部发送方法,外部代码禁止调用
}

type IConnReader interface {
    ReadFrame(conn net.Conn) ([]byte, error)  // 读取一帧数据(一条完整消息的字节流)
}

实现要求

1. 线程安全性
  • sendBytes2Conn() 外的所有方法都必须是线程安全的
  • ReadFrame(conn net.Conn) 在单一goroutine中调用
  • sendBytes2Conn() 仅在ConnHandler的发送goroutine中调用,无需考虑线程安全
  • 可以使用 sync.Mutex, atomic.Value, sync.Once 等同步原语
2. 重入安全性
  • Close() 方法必须支持多次调用
  • 推荐使用 sync.Once 确保资源只清理一次
  • 多次调用应该是无害的,可以返回错误但不能panic
3. 错误处理
  • 方法可以返回错误,但绝对不能panic
  • 错误信息应该具有描述性,便于调试
4. 内部方法约定
  • sendBytes2Conn() 是内部方法,仅供ConnHandler调用
  • 外部代码绝对不能直接调用此方法,这会破坏消息队列机制
  • 用户应始终使用 PutMsg() 来发送消息
  • ReadFrame()返回错误会导致连接关闭
  • PutMsg()需要验证消息类型,返回具体的类型错误信息
4. 资源管理
  • Close() 中确保所有资源得到正确清理
  • 队列、连接、goroutine等都应该被适当关闭
  • 考虑错误包装,提供清晰的错误上下文

实现示例模板

type MyCustomConnIO struct {
    *stcp.BasicConnIO  // 组合基础实现
    // 其他自定义字段...
}

func (c *MyCustomConnIO) PutMsg(msg stcp.IMsg) error {
    // 实现消息类型检查和处理
    if msg == nil {
        return fmt.Errorf("MyCustomConnIO.PutMsg: nil message")
    }
    
    switch m := msg.(type) {
    case stcp.BytesMsg:
        // 处理字节消息
        return c.handleBytesMsg(m)
    default:
        return fmt.Errorf("MyCustomConnIO.PutMsg: unsupported message type: %s", msg.Name())
    }
}

func (c *MyCustomConnIO) PopMsgBytes() ([]byte, error) {
    // 实现消息队列的出队逻辑
    return c.queue.Pop()
}

func (c *MyCustomConnIO) sendBytes2Conn(bs []byte) error {
    // 实现网络发送逻辑 - 这是内部方法,仅供ConnHandler调用
    return stcp.SendBytes2Conn(c.Conn(), bs)
}

// 实现其他必需的方法...

最佳实践

组合优于继承
  • 推荐组合BasicConnIO基础实现而不是从头实现所有方法
  • 可以重用框架提供的基础组件,专注于业务逻辑
  • 新的接口设计使Reader更加无状态,便于复用
消息类型设计
  • 为不同业务场景定义特定的消息类型:
type ProtobufMsg struct {
    MessageType string
    Data        []byte
}

func (m *ProtobufMsg) Name() string {
    return fmt.Sprintf("ProtobufMsg:%s", m.MessageType)
}
错误包装
  • 在PutMsg()中进行类型检查和错误包装:
func (c *MyCustomConnIO) PutMsg(msg stcp.IMsg) error {
    if msg == nil {
        return fmt.Errorf("MyCustomConnIO.PutMsg: nil message")
    }
    
    bsMsg, ok := msg.(stcp.BytesMsg)
    if !ok {
        return fmt.Errorf("MyCustomConnIO.PutMsg: expected BytesMsg, got %s", msg.Name())
    }
    
    return c.processBytesMsg(bsMsg)
}
工厂函数设计
  • 提供便于使用的工厂函数:
func NewMyCustomConnIO(conn net.Conn, reader stcp.IConnReader) stcp.IConnIO {
    return &MyCustomConnIO{
        BasicConnIO: stcp.NewBasicConnIO(conn, reader),
        // 初始化其他字段...
    }
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SendBytes2Conn added in v1.3.0

func SendBytes2Conn(conn net.Conn, bs []byte) error

SendBytes2Conn send bytes to connection Utility function

func SetWriteTimeout added in v1.2.12

func SetWriteTimeout(d time.Duration)

SetWriteTimeout set write timeout Utility function, it's a global setting If not set, default is 5 seconds

Types

type BasicConnIO added in v1.3.6

type BasicConnIO struct {
	// contains filtered or unexported fields
}

BasicConnIO basic connection io

func NewBasicConnIO added in v1.3.6

func NewBasicConnIO(conn net.Conn, reader IConnReader) *BasicConnIO

NewBasicConnIO : new basic connection io

func (*BasicConnIO) CloseX added in v1.3.6

func (x *BasicConnIO) CloseX(closeFn func() error) error

CloseX closes connection handler (required, goroutine-safe, re-entrant) This method can be called directly via IConnSender/IConnIO interface to gracefully shutdown the connection and trigger the associated ConnHandler.Exit() through the goroutine defer chain Multiple calls are safe and will not panic closeFn : custom close function, if nil, use x.conn.Close()

func (*BasicConnIO) Conn added in v1.3.6

func (x *BasicConnIO) Conn() net.Conn

Conn returns the underlying network connection

func (*BasicConnIO) MetaInfo added in v1.3.6

func (x *BasicConnIO) MetaInfo() MetaInfo

MetaInfo gets meta info for logging (required, goroutine-safe)

func (*BasicConnIO) ReadFrame added in v1.3.6

func (x *BasicConnIO) ReadFrame(conn net.Conn) ([]byte, error)

ReadFrame reads one frame(an entire message bytes) from connection

func (*BasicConnIO) SetMetaInfo added in v1.3.6

func (x *BasicConnIO) SetMetaInfo(m MetaInfo)

SetMetaInfo sets meta info for logging (required, goroutine-safe, re-entrant)

type BasicMetaInfo added in v1.2.12

type BasicMetaInfo struct {
	RemoteAddr string
}

BasicMetaInfo basic meta info

func NewBasicMetaInfo added in v1.3.0

func NewBasicMetaInfo(conn net.Conn) *BasicMetaInfo

NewBasicMetaInfo new basic meta info

func (*BasicMetaInfo) GetRemoteAddr added in v1.3.0

func (m *BasicMetaInfo) GetRemoteAddr() string

GetRemoteAddr get remote address

func (*BasicMetaInfo) MarshalLogObject added in v1.2.12

func (m *BasicMetaInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error

MarshalLogObject marshal log object

type BytesMsg added in v1.3.6

type BytesMsg struct {
	Bs []byte
}

BytesMsg : bytes message

func NewBytesMsg added in v1.3.6

func NewBytesMsg(bs []byte) BytesMsg

NewBytesMsg : new bytes message

func (BytesMsg) Name added in v1.3.6

func (x BytesMsg) Name() string

Name : message name

type ConnExitEvent added in v1.2.12

type ConnExitEvent func(iConnIO IConnIO)

ConnExitEvent on connection exit

type ConnHandler added in v1.2.12

type ConnHandler struct {
	// contains filtered or unexported fields
}

ConnHandler connection handler

func NewConnHandler added in v1.2.12

func NewConnHandler(readerProcessor ReadProcessor, iConnIO IConnIO) *ConnHandler

NewConnHandler : new connection handler

func (*ConnHandler) AddExitHook added in v1.2.12

func (x *ConnHandler) AddExitHook(hook ConnExitEvent)

AddExitHook add exit hook

func (*ConnHandler) AddStartHook added in v1.2.12

func (x *ConnHandler) AddStartHook(hook ConnStartEvent)

AddStartHook add start hook

func (*ConnHandler) Exit added in v1.3.0

func (x *ConnHandler) Exit()

Exit : exit connection handler

func (*ConnHandler) GetIConn added in v1.3.3

func (x *ConnHandler) GetIConn() IConnIO

GetIConn get connection interface

func (*ConnHandler) Start added in v1.2.12

func (x *ConnHandler) Start()

Start : start connection handler

type ConnIOFactory added in v1.3.3

type ConnIOFactory func(conn net.Conn) IConnIO

ConnIOFactory connection io factory

type ConnReaderFactory added in v1.3.3

type ConnReaderFactory func(conn net.Conn) IConnReader

ConnReaderFactory connection reader factory

type ConnStartEvent added in v1.2.12

type ConnStartEvent func(iConnIO IConnIO)

ConnStartEvent on connection start

type IConnIO added in v1.3.3

type IConnIO interface {
	// IConnSender connection sender interface
	IConnSender
	// IConnReader connection reader interface
	IConnReader
}

IConnIO connection io interface

Thread Safety & Re-entrance Requirements: - Multiple calls to Close() should be safe (may return error but MUST NOT panic) - Resource cleanup operations should be idempotent

type IConnReader added in v1.3.3

type IConnReader interface {
	// ReadFrame reads one frame(an entire message bytes) from connection
	// conn : connection to read from
	// return : read buffer and error if any
	ReadFrame(conn net.Conn) ([]byte, error)
}

IConnReader connection reader interface

type IConnSender added in v1.3.0

type IConnSender interface {
	// Conn returns the underlying network connection (required, goroutine-safe)
	Conn() net.Conn

	// SetMetaInfo sets meta info for logging (required, goroutine-safe, re-entrant)
	SetMetaInfo(m MetaInfo)

	// MetaInfo gets meta info for logging (required, goroutine-safe)
	MetaInfo() MetaInfo

	// Close closes the connection (required, goroutine-safe, re-entrant)
	// Multiple calls should be safe, may return error but MUST NOT panic
	Close() error

	// PutMsg put message to send (required, goroutine-safe)
	// return : error if any
	PutMsg(msg IMsg) error

	// PopMsgBytes pop message bytes to send (required, goroutine-safe)
	// return : message bytes and error if any
	PopMsgBytes() ([]byte, error)
	// contains filtered or unexported methods
}

IConnSender connection sender interface

Thread Safety & Re-entrance Requirements: - ALL methods MUST be goroutine-safe and re-entrant - Multiple calls to Close() should be safe (may return error but MUST NOT panic) - Resource cleanup operations should be idempotent

type IMsg added in v1.3.6

type IMsg interface {
	// Name : message name
	Name() string
}

IMsg : message interface

type MetaInfo added in v1.2.12

type MetaInfo interface {
	// ObjectMarshaler marshal log object
	zapcore.ObjectMarshaler
	// GetRemoteAddr get remote address
	GetRemoteAddr() string
}

MetaInfo meta info for logging

type QSendConn added in v1.3.3

type QSendConn struct {
	*BasicConnIO
	// contains filtered or unexported fields
}

QSendConn queue send connection sender send queue based connection sender user can put bytes to send queue async then send bytes in queue one by one

func NewQSendConnHandler added in v1.3.0

func NewQSendConnHandler(conn net.Conn, sendQSize int, reader IConnReader) *QSendConn

NewQSendConnHandler : new queue send connection handler sendQSize : send queue size, if sendQSize is 0, the send queue has unlimited capacity

otherwise, the send queue has limited capacity.
if the send queue is full, Put2Queue will return error

func (*QSendConn) Close added in v1.3.3

func (x *QSendConn) Close() error

Close closes connection handler (required, goroutine-safe, re-entrant) This method can be called directly via IConnSender/IConnIO interface to gracefully shutdown the connection and trigger the associated ConnHandler.Exit() through the goroutine defer chain Multiple calls are safe and will not panic

func (*QSendConn) PopMsgBytes added in v1.3.6

func (x *QSendConn) PopMsgBytes() ([]byte, error)

PopMsgBytes pop message bytes to send (optional, goroutine-safe, re-entrant)

func (*QSendConn) PutMsg added in v1.3.6

func (x *QSendConn) PutMsg(msg IMsg) error

PutMsg send bytes async(put to send queue) (optional, goroutine-safe, re-entrant)

type ReadProcessor added in v1.3.3

type ReadProcessor func(iConnIO IConnIO, buffer []byte) error

ReadProcessor read handler logic iConnIO : connection io interface buffer : read buffer return : error if any Actually, this is the core function to process the read data

type ServerAcceptCnf added in v1.2.12

type ServerAcceptCnf struct {
	Address        string         `json:"address"`
	AcceptDelay    timex.Duration `json:"acceptDelay"`
	AcceptMaxDelay timex.Duration `json:"acceptMaxDelay"`
	AcceptMaxRetry int            `json:"acceptMaxRetry"`
	MaxConn        int32          `json:"maxConn"`
}

ServerAcceptCnf server start config

func DefaultServerAcceptCnf added in v1.2.12

func DefaultServerAcceptCnf() *ServerAcceptCnf

DefaultServerAcceptCnf : get default start cnf

type TcpServer added in v1.3.0

type TcpServer struct {
	// contains filtered or unexported fields
}

TcpServer tcp server

func NewTcpServer added in v1.3.0

func NewTcpServer(cnf *ServerAcceptCnf, readerProcessor ReadProcessor, connIOFactory ConnIOFactory) *TcpServer

NewTcpServer : new tcp server

func (*TcpServer) Address added in v1.3.0

func (x *TcpServer) Address() string

Address : get listen address

func (*TcpServer) Close added in v1.3.0

func (x *TcpServer) Close() error

Close : close the server listener

func (*TcpServer) ConnCount added in v1.3.0

func (x *TcpServer) ConnCount() int32

ConnCount : get current connection count

func (*TcpServer) Run added in v1.3.0

func (x *TcpServer) Run(errChan chan<- error)

Run : run server errChan : error channel if server exit, the error will be sent to errChan

func (*TcpServer) SetExitHooker added in v1.3.0

func (x *TcpServer) SetExitHooker(hooker ConnExitEvent)

SetExitHooker : set connection exit hooker

func (*TcpServer) SetStartHooker added in v1.3.0

func (x *TcpServer) SetStartHooker(hooker ConnStartEvent)

SetStartHooker : set connection start hooker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL