stcp

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 25, 2025 License: MIT Imports: 10 Imported by: 0

README

TCP Server 框架(基于组合式设计)

该模块提供一个轻量、高性能的 TCP Server 框架:

  • 通过 NewTcpServer(cnf, connReader, connSenderFactory) 创建服务,支持工厂模式注入不同的连接发送器
  • ConnReader 完全由业务方实现,负责从 net.Conn 读取数据并处理(定长、变长、心跳等策略均由外部控制)
  • 框架内部管理连接生命周期、并发处理,通过ConnSenderFactory支持灵活的发送策略,包括完整的配置和Hook机制

核心组件

  • TcpServer: 主服务器,负责Accept循环和连接管理
  • ConnHandler: 连接处理器,管理单个连接的生命周期
  • QSender: 队列发送器,提供异步消息发送能力(IConnSender的一个实现)
  • IConnSender: 连接发送器接口,支持不同的发送策略
  • ConnSenderFactory: 连接发送器工厂,用于创建不同类型的发送器实例

API 接口

Server 构造
func NewTcpServer(cnf *ServerAcceptCnf, connReader ConnReaderFunc, connSenderFactory ConnSenderFactory) *TcpServer
连接发送器工厂
type ConnSenderFactory func(conn net.Conn) IConnSender

工厂函数用于为每个新连接创建相应的发送器实例,支持不同的发送策略。

配置结构
type ServerAcceptCnf struct {
    Address        string         // 监听地址,如 ":9000" 或 "127.0.0.1:9000"
    AcceptDelay    timex.Duration // 初始 accept 退避延迟
    AcceptMaxDelay timex.Duration // 最大 accept 退避延迟
    AcceptMaxRetry int            // 最大 accept 重试次数
    MaxConn        int32          // 最大并发连接数
}

// 获取默认配置
func DefaultServerAcceptCnf() *ServerAcceptCnf
ConnReader 函数定义
type ConnReaderFunc func(handler IConnSender, conn net.Conn) error

框架会在每个连接上循环调用 ConnReaderFunc;当返回非 nil 错误时,该连接将被关闭并触发退出钩子。

启动与关闭
  • 启动服务:Run(errChan chan<- error)
  • 关闭服务:Close() error(停止监听,已建立连接会按钩子流程退出)
Hook 机制
  • 设置连接启动钩子:SetStartHooker(hooker ConnStartEvent)
  • 设置连接退出钩子:SetExitHooker(hooker ConnExitEvent)
消息发送
  • 异步发送:handler.Put2Queue([]byte) error(写入发送队列)
  • 写超时控制:SetWriteTimeout(d time.Duration),默认 5s
连接信息
  • 获取监听地址:Address() string
  • 获取当前连接数:ConnCount() int32
日志与元信息
  • 默认 MetaInfo 为 BasicMetaInfo{RemoteAddr}
  • 可调用 handler.SetMetaInfo(...) 替换为自定义 MetaInfo(需实现 zapcore.ObjectMarshaler)以丰富日志字段

快速上手示例

基础服务启动

package main

import (
    "io"
    "net"
    "time"

    "github.com/pinealctx/neptune/stcp"
    "github.com/pinealctx/neptune/timex"
)

func main() {
    // ConnReader 函数:负责从连接读取数据并处理
    connReader := func(sender stcp.IConnSender, conn net.Conn) error {
        // 示例:回显一行数据
        buf := make([]byte, 1024)
        n, err := conn.Read(buf)
        if err != nil {
            return err // io.EOF 或其他错误会触发退出
        }
        // 业务处理...
        return sender.Put2Queue(buf[:n])  // 异步发送响应
    }

    // 创建连接发送器工厂(使用队列发送器)
    senderFactory := func(conn net.Conn) stcp.IConnSender {
        return stcp.NewQSendConnHandler(conn, 1024)  // 队列容量1024
    }

    // 创建服务器配置
    cnf := stcp.DefaultServerAcceptCnf()
    cnf.Address = ":9000"
    cnf.MaxConn = 1000
    
    // 创建服务器
    srv := stcp.NewTcpServer(cnf, connReader, senderFactory)
    
    // 设置连接钩子(可选)
    srv.SetStartHooker(func(sender stcp.IConnSender) {
        // 连接建立时的处理
    })
    srv.SetExitHooker(func(sender stcp.IConnSender) {
        // 连接断开时的处理
    })
    
    // 设置全局写超时(可选)
    stcp.SetWriteTimeout(5 * time.Second)

    // 启动服务器
    errCh := make(chan error, 1)
    srv.Run(errCh)

    // 等待服务器退出
    if err := <-errCh; err != nil {
        panic(err)
    }
}

高级特性:灵活的连接发送器工厂

框架支持通过ConnSenderFactory工厂模式注入不同的发送器实现,满足各种业务场景需求:

不同队列容量的发送器
// 大容量队列发送器(适合高并发场景)
largeQueueFactory := func(conn net.Conn) stcp.IConnSender {
    return stcp.NewQSendConnHandler(conn, 10000)
}

// 小容量队列发送器(适合内存敏感场景)
smallQueueFactory := func(conn net.Conn) stcp.IConnSender {
    return stcp.NewQSendConnHandler(conn, 100)
}

// 无限容量队列发送器
unlimitedFactory := func(conn net.Conn) stcp.IConnSender {
    return stcp.NewQSendConnHandler(conn, 0)  // 0表示无限容量
}
条件化发送器选择
// 根据连接来源选择不同的发送器
conditionalFactory := func(conn net.Conn) stcp.IConnSender {
    remoteAddr := conn.RemoteAddr().String()
    
    if strings.Contains(remoteAddr, "127.0.0.1") {
        // 本地连接使用大队列
        return stcp.NewQSendConnHandler(conn, 10000)
    } else {
        // 外部连接使用小队列
        return stcp.NewQSendConnHandler(conn, 1000)
    }
}
自定义发送器实现
// 如果你有自定义的IConnSender实现
customFactory := func(conn net.Conn) stcp.IConnSender {
    // return your custom implementation
    // return NewMyCustomSender(conn, customConfig)
    return stcp.NewQSendConnHandler(conn, 1024)  // 示例中仍使用QSender
}

定长消息读取示例

import (
    "io"
    "net"
    
    "github.com/pinealctx/neptune/stcp"
)

func fixedLengthReader(sender stcp.IConnSender, conn net.Conn) error {
    const messageSize = 128
    buf := make([]byte, messageSize)
    if _, err := io.ReadFull(conn, buf); err != nil {
        return err // 读不足或连接关闭则退出
    }
    // 处理定长消息...
    return sender.Put2Queue(buf)
}

变长消息读取示例(前置长度头)

import (
    "encoding/binary"
    "io"
    "net"
    
    "github.com/pinealctx/neptune/stcp"
)

func varLengthReader(sender stcp.IConnSender, conn net.Conn) error {
    // 读取 4 字节长度头(大端序)
    var hdr [4]byte
    if _, err := io.ReadFull(conn, hdr[:]); err != nil {
        return err
    }
    
    length := binary.BigEndian.Uint32(hdr[:])
    if length == 0 || length > 10<<20 { // 防御:限制最大包 10MB
        return io.ErrUnexpectedEOF
    }
    
    // 读取消息体
    body := make([]byte, length)
    if _, err := io.ReadFull(conn, body); err != nil {
        return err
    }
    
    // 处理变长消息...
    return sender.Put2Queue(body)
}

心跳与读超时控制示例

import (
    "io"
    "net"
    "time"
    
    "github.com/pinealctx/neptune/stcp"
)

func heartbeatReader(sender stcp.IConnSender, conn net.Conn) error {
    const heartbeatInterval = 15 * time.Second
    const gracePeriod = 5 * time.Second
    
    // 设置读超时
    if err := conn.SetReadDeadline(time.Now().Add(heartbeatInterval + gracePeriod)); err != nil {
        return err
    }

    // 读取消息
    var hdr [4]byte
    if _, err := io.ReadFull(conn, hdr[:]); err != nil {
        return err
    }
    
    // 成功读到数据后,重置下一次的读超时
    if err := conn.SetReadDeadline(time.Now().Add(heartbeatInterval + gracePeriod)); err != nil {
        return err
    }
    
    // 继续处理消息...
    return nil
}

自定义日志元信息

在握手成功或识别到业务身份后,可设置更丰富的 MetaInfo,便于日志检索:

import (
    "github.com/pinealctx/neptune/stcp"
    "go.uber.org/zap/zapcore"
)

// 自定义MetaInfo结构
type MyMetaInfo struct {
    UserID     string
    RemoteAddr string
    SessionID  string
}

func (m *MyMetaInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error {
    enc.AddString("userId", m.UserID)
    enc.AddString("remoteAddr", m.RemoteAddr)
    enc.AddString("sessionId", m.SessionID)
    return nil
}

func (m *MyMetaInfo) GetRemoteAddr() string {
    return m.RemoteAddr
}

func customMetaReader(sender stcp.IConnSender, conn net.Conn) error {
    // 例如:在握手后设置自定义MetaInfo
    customMeta := &MyMetaInfo{
        UserID:     "user123",
        RemoteAddr: conn.RemoteAddr().String(),
        SessionID:  "session456",
    }
    sender.SetMetaInfo(customMeta)
    
    // 继续处理消息...
    return nil
}

优雅关闭示例

import (
    "context"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    // 创建连接发送器工厂
    senderFactory := func(conn net.Conn) stcp.IConnSender {
        return stcp.NewQSendConnHandler(conn, 1024)
    }
    srv := stcp.NewTcpServer(cnf, connReader, senderFactory)
    
    errCh := make(chan error, 1)
    srv.Run(errCh)
    
    // 监听系统信号
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    
    select {
    case err := <-errCh:
        if err != nil {
            panic(err)
        }
    case <-sigCh:
        // 优雅关闭
        if err := srv.Close(); err != nil {
            log.Printf("Server close error: %v", err)
        }
        // 等待现有连接处理完成
        time.Sleep(time.Second)
    }
}

运行时行为说明

连接处理流程

  1. 服务器在指定地址监听TCP连接
  2. 每个新连接会创建一个ConnHandler来管理
  3. ConnHandler启动两个goroutine:
    • 接收goroutine:循环调用ConnReader处理入站数据
    • 发送goroutine:从发送队列取数据并写入连接
  4. 当ConnReader返回错误时,连接被关闭并触发退出钩子

连接数限制

  • 达到MaxConn限制时,新连接会被立即关闭并记录日志
  • 连接计数通过原子操作维护,在连接建立时递增,退出时递减

错误处理与重试

  • Accept操作出错时采用指数退避策略重试
  • 可配置最大重试次数和延迟时间
  • 写操作支持超时控制,默认5秒

内存与性能

  • 发送队列支持容量限制,防止内存无限增长
  • 队列满时Put2Queue会返回错误,连接处理器应优雅退出
  • 所有关键路径都经过并发安全设计

架构特点

  • 组合式设计:避免复杂继承,各组件职责清晰
  • 工厂模式:ConnSenderFactory支持灵活的发送器创建策略
  • 接口驱动:IConnSender接口支持不同发送策略扩展
  • 插拔式架构:可以轻松替换和扩展连接发送器实现
  • 异步发送:队列化发送避免阻塞接收处理
  • 完善日志:结构化日志支持,MetaInfo可自定义
  • Hook机制:连接生命周期钩子便于监控和扩展
  • 并发安全:所有共享状态都有适当的同步保护
  • 测试友好:工厂模式便于单元测试时注入Mock实现

如需更多示例或适配特定协议,可在ConnReader内按需实现相应的读取和处理逻辑。

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SendBytes2Conn added in v1.3.0

func SendBytes2Conn(conn net.Conn, bs []byte) error

SendBytes2Conn send bytes to connection Utility function

func SetWriteTimeout added in v1.2.12

func SetWriteTimeout(d time.Duration)

SetWriteTimeout set write timeout Utility function, it's a global setting If not set, default is 5 seconds

Types

type BasicMetaInfo added in v1.2.12

type BasicMetaInfo struct {
	RemoteAddr string
}

BasicMetaInfo basic meta info

func NewBasicMetaInfo added in v1.3.0

func NewBasicMetaInfo(conn net.Conn) *BasicMetaInfo

NewBasicMetaInfo new basic meta info

func (*BasicMetaInfo) GetRemoteAddr added in v1.3.0

func (m *BasicMetaInfo) GetRemoteAddr() string

GetRemoteAddr get remote address

func (*BasicMetaInfo) MarshalLogObject added in v1.2.12

func (m *BasicMetaInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error

MarshalLogObject marshal log object

type ConnExitEvent added in v1.2.12

type ConnExitEvent func(handler IConnSender)

ConnExitEvent on connection exit

type ConnHandler added in v1.2.12

type ConnHandler struct {
	// contains filtered or unexported fields
}

ConnHandler connection handler

func NewConnRunner added in v1.3.0

func NewConnRunner(connReader ConnReaderFunc, connSender IConnSender) *ConnHandler

NewConnRunner : new connection runner

func (*ConnHandler) AddExitHook added in v1.2.12

func (x *ConnHandler) AddExitHook(hook ConnExitEvent)

AddExitHook add exit hook

func (*ConnHandler) AddStartHook added in v1.2.12

func (x *ConnHandler) AddStartHook(hook ConnStartEvent)

AddStartHook add start hook

func (*ConnHandler) Exit added in v1.3.0

func (x *ConnHandler) Exit()

Exit : exit connection handler

func (*ConnHandler) GetConnSender added in v1.3.0

func (x *ConnHandler) GetConnSender() IConnSender

GetConnSender get connection sender

func (*ConnHandler) Start added in v1.2.12

func (x *ConnHandler) Start()

Start : start connection handler

type ConnReaderFunc added in v1.3.0

type ConnReaderFunc func(handler IConnSender, conn net.Conn) error

ConnReaderFunc connection reader function

type ConnSenderFactory added in v1.3.0

type ConnSenderFactory func(conn net.Conn) IConnSender

ConnSenderFactory connection sender factory

type ConnStartEvent added in v1.2.12

type ConnStartEvent func(handler IConnSender)

ConnStartEvent on connection start

type IConnSender added in v1.3.0

type IConnSender interface {
	// Conn : get connection (required)
	Conn() net.Conn
	// LoopSend : loop to send (required)
	LoopSend()
	// SetMetaInfo set meta info (required)
	SetMetaInfo(m MetaInfo)
	// MetaInfo get meta info (required)
	MetaInfo() MetaInfo
	// Close : close connection handler
	Close() error

	// Put2Queue put bytes to send queue (optional)
	Put2Queue(bs []byte) error
	// Put2SendMap put bytes to send map (optional)
	Put2SendMap(key uint32, bs []byte) error
	// Put2SendSMap put bytes to send map (optional)
	Put2SendSMap(key string, bs []byte) error
	// Put2SendMaps put multiple key uint32 and bytes pairs to send map (optional)
	Put2SendMaps(pairs []KeyIntBytesPair) error
	// Put2SendSMaps put multiple key string and bytes pairs to send map (optional)
	Put2SendSMaps(pairs []KeyStrBytesPair) error
}

IConnSender connection handler interface all methods are goroutine safe user must implement this interface Conn/LoopSend/SetMetaInfo/MetaInfo/Close are required Put2Queue/Put2SendMap/Put2SendSMap/Put2SendMaps/Put2SendSMaps are optional, but at least one of them should be implemented, others can be no-op

type KeyIntBytesPair added in v1.3.0

type KeyIntBytesPair struct {
	Key uint32
	Val []byte
}

KeyIntBytesPair key uint32 and bytes pair

type KeyStrBytesPair added in v1.3.0

type KeyStrBytesPair struct {
	Key string
	Val []byte
}

KeyStrBytesPair key string and bytes pair

type MetaInfo added in v1.2.12

type MetaInfo interface {
	zapcore.ObjectMarshaler
	GetRemoteAddr() string
}

MetaInfo meta info for logging

type QSender added in v1.3.0

type QSender struct {
	// contains filtered or unexported fields
}

QSender queue send connection sender send queue based connection sender user can put bytes to send queue async and LoopSend will send bytes in queue one by one

func NewQSendConnHandler added in v1.3.0

func NewQSendConnHandler(conn net.Conn, sendQSize int) *QSender

NewQSendConnHandler : new queue send connection handler sendQSize : send queue size, if sendQSize is 0, the send queue has unlimited capacity

otherwise, the send queue has limited capacity.
if the send queue is full, Put2Queue will return error

func (*QSender) Close added in v1.3.0

func (x *QSender) Close() error

Close : close connection handler

func (*QSender) Conn added in v1.3.0

func (x *QSender) Conn() net.Conn

Conn : get connection (required)

func (*QSender) LoopSend added in v1.3.0

func (x *QSender) LoopSend()

LoopSend : loop to send (required)

func (*QSender) MetaInfo added in v1.3.0

func (x *QSender) MetaInfo() MetaInfo

MetaInfo get meta info (required)

func (*QSender) Put2Queue added in v1.3.0

func (x *QSender) Put2Queue(bs []byte) error

Put2Queue send bytes async(put to send queue)

func (*QSender) Put2SendMap added in v1.3.0

func (x *QSender) Put2SendMap(_ uint32, bs []byte) error

Put2SendMap put bytes to send map (not supported)

func (*QSender) Put2SendMaps added in v1.3.0

func (x *QSender) Put2SendMaps(_ []KeyIntBytesPair) error

Put2SendMaps put multiple key uint32 and bytes pairs to send map (not supported)

func (*QSender) Put2SendSMap added in v1.3.0

func (x *QSender) Put2SendSMap(_ string, bs []byte) error

Put2SendSMap put bytes to send map (not supported)

func (*QSender) Put2SendSMaps added in v1.3.0

func (x *QSender) Put2SendSMaps(_ []KeyStrBytesPair) error

Put2SendSMaps put multiple key string and bytes pairs to send map (not supported)

func (*QSender) SetMetaInfo added in v1.3.0

func (x *QSender) SetMetaInfo(m MetaInfo)

SetMetaInfo set meta info (required)

type ServerAcceptCnf added in v1.2.12

type ServerAcceptCnf struct {
	Address        string         `json:"address"`
	AcceptDelay    timex.Duration `json:"acceptDelay"`
	AcceptMaxDelay timex.Duration `json:"acceptMaxDelay"`
	AcceptMaxRetry int            `json:"acceptMaxRetry"`
	MaxConn        int32          `json:"maxConn"`
}

ServerAcceptCnf server start config

func DefaultServerAcceptCnf added in v1.2.12

func DefaultServerAcceptCnf() *ServerAcceptCnf

DefaultServerAcceptCnf : get default start cnf

type TcpServer added in v1.3.0

type TcpServer struct {
	// contains filtered or unexported fields
}

TcpServer tcp server

func NewTcpServer added in v1.3.0

func NewTcpServer(cnf *ServerAcceptCnf, connReader ConnReaderFunc, connSenderFactory ConnSenderFactory) *TcpServer

NewTcpServer : new tcp server

func (*TcpServer) Address added in v1.3.0

func (x *TcpServer) Address() string

Address : get listen address

func (*TcpServer) Close added in v1.3.0

func (x *TcpServer) Close() error

Close : close the server listener

func (*TcpServer) ConnCount added in v1.3.0

func (x *TcpServer) ConnCount() int32

ConnCount : get current connection count

func (*TcpServer) Run added in v1.3.0

func (x *TcpServer) Run(errChan chan<- error)

Run : run server errChan : error channel if server exit, the error will be sent to errChan

func (*TcpServer) SetExitHooker added in v1.3.0

func (x *TcpServer) SetExitHooker(hooker ConnExitEvent)

SetExitHooker : set connection exit hooker

func (*TcpServer) SetStartHooker added in v1.3.0

func (x *TcpServer) SetStartHooker(hooker ConnStartEvent)

SetStartHooker : set connection start hooker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL