nets

package module
v1.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2025 License: Apache-2.0 Imports: 20 Imported by: 0

README

license issues issues contributors
watchers forks stars

NETS 简介

一个追求轻量、性能、实用、可快速上手的网络框架。采用工作池模式,已实现协程复用并且可根据并发数量自动扩容协程池。建立连接只需占用3个协程(1个读协程、1个写协程、1个协程池内的工作协程)

使用面向接口编程和组合设计模式,最大程度提高系统的灵活性、可维护性和可扩展性

TODO : 连接断开时,需要等待任务队列全部执行完毕之后再执行onClose / 丢弃等待执行的任务

现已支持:

  • 服务:
    • TCP
    • WebSocket(s)
    • UDP / KCP (🚧进行中)
  • 协议:
    • Protocol Buffer
    • JSON
  • 功能:
    • 设置连接建立时的前置
    • 设置连接断开时的后置
    • 绑定消息属性
    • 消息处理中间件
    • 自定义编码/解码器
    • 消息业务panic阻断
    • 停服时优雅关闭所有连接
    • 分组广播
    • 全服广播
    • 广播历史记录 (🚧进行中)

future:
完善消息广播功能,✅支持创建广播组、✅加入广播组、✅退出广播组、❌广播组解散 (标记不可用,记录保留)

架构图

架构图

使用说明

=> 环境配置

Golang >= 1.18

=> 快速上手
  • 一个简单的例子
// 启动TCP服务
serverTCP := network.NewServerTCP(nil)
serverTCP.Listen()

// 启动WebSocket服务
serverWS := network.NewServerWS(nil)
serverWS.Listen()

// 阻塞主进程
network.ServerWaitFlag.Wait()
  • 连接管理器 ( iface.IConnManager ) 的应用
    network.GetInstanceConnManager() 为单例模式,保持全局唯一
connManager := network.GetInstanceConnManager()

// 设置连接建立时的处理
connManager.OnConnOpen(func(conn iface.IConnection) {
    // do something ...
})

// 设置连接断开时的处理
connManager.OnConnClose(func(conn iface.IConnection) {
    // do something ...
})
  • 消息处理器 ( iface.IMsgHandler ) 的应用
msgHandler := network.GetInstanceMsgHandler()

// 添加一个路由
msgHandler.AddRouter(int32(pb.MSgID_PlayerLogin_Req), func() proto.Message { return &pb.PlayerLoginRequest{} }, func(con iface.IConnection, message proto.Message) {
    // do something ...
})

// 自定义消息过滤器。返回 true 时可正常执行,返回 false 则不会执行路由方法
msgHandler.SetFilter(func(request iface.IRequest, msgData proto.Message) bool {
    // do something ...
    return true
})

// 自定义panic捕获。保障业务逻辑不会导致服务整体崩溃
msgHandler.SetErrCapture(func(request iface.IRequest, r any) {
    // do something ...
})
  • 广播管理器 ( iface.IBroadcastManager ) 的应用
	broadcastManager := network.GetInstanceBroadcastManager()

=> Issues

致谢

许可证

⚖️Apache-2.0 license

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ConnPropertyHttpAuthorization = "HttpAuthorization"
	ConnPropertyHttpReader        = "HttpReader"
	ConnPropertyHttpWriter        = "HttpWriter"
)
View Source
var SHARD_COUNT = 32

Functions

func ConnPropertyGet

func ConnPropertyGet[T any](c IConnection, key string) T

获取连接属性

func ConnPropertyRemove

func ConnPropertyRemove(c IConnection, key string)

删除连接属性

func ConnPropertySet

func ConnPropertySet(c IConnection, key string, value any)

设置连接属性

func SetCustomServer

func SetCustomServer(custom *CustomServer)

设置自定义服务器参数

Types

type AppConf

type AppConf struct {
	AppName          string     // 服务名称
	MaxPackSize      int        // 数据包最大长度
	MaxConn          int        // 最大允许连接数
	WorkerPoolSize   int        // 工作池容量
	WorkerTaskMaxLen int        // 每个工作队列可执行最大任务数量
	MaxMsgChanLen    int        // 读写通道最大限度
	MaxFlowSecond    int        // 每秒允许的最大请求数量
	ProtocolIsJson   bool       // 是否使用json协议
	ConnRWTimeOut    int        // 连接读写超时时间(秒)
	ServerTCP        ServerConf // tcp服务
	ServerWS         ServerConf // websocket服务
	ServerHTTP       ServerConf // http服务
	ServerKCP        ServerConf // http服务
}

func GetServerConf

func GetServerConf() *AppConf

获取默认配置

type BaseRouter

type BaseRouter struct {
	// contains filtered or unexported fields
}

func (*BaseRouter) GetNewMsg

func (b *BaseRouter) GetNewMsg() proto.Message

func (*BaseRouter) RunHandler

func (b *BaseRouter) RunHandler(conn IConnection, message proto.Message)

func (*BaseRouter) SetHandler

func (b *BaseRouter) SetHandler(msgHandler IReceiveMsgHandler)

func (*BaseRouter) SetMsg

func (b *BaseRouter) SetMsg(msgTemplate INewMsgStructTemplate)

type ConcurrentMap

type ConcurrentMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

A "thread" safe map of type string:Anything. To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.

func NewConcurrentMap

func NewConcurrentMap[V any]() ConcurrentMap[string, V]

Creates a new concurrent map.

func NewConcurrentStringer

func NewConcurrentStringer[K Stringer, V any]() ConcurrentMap[K, V]

Creates a new concurrent map.

func NewWithCustomShardingFunction

func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V]

Creates a new concurrent map.

func (ConcurrentMap[K, V]) Clear

func (m ConcurrentMap[K, V]) Clear()

Clear removes all items from map.

func (ConcurrentMap[K, V]) Count

func (m ConcurrentMap[K, V]) Count() int

Count returns the number of elements within the map.

func (ConcurrentMap[K, V]) Get

func (m ConcurrentMap[K, V]) Get(key K) (V, bool)

Get retrieves an element from map under given key.

func (ConcurrentMap[K, V]) GetShard

func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V]

GetShard returns shard under given key

func (ConcurrentMap[K, V]) Has

func (m ConcurrentMap[K, V]) Has(key K) bool

Has Looks up an item under specified key

func (ConcurrentMap[K, V]) IsEmpty

func (m ConcurrentMap[K, V]) IsEmpty() bool

IsEmpty checks if map is empty.

func (ConcurrentMap[K, V]) Items

func (m ConcurrentMap[K, V]) Items() map[K]V

Items returns all items as map[string]V

func (ConcurrentMap[K, V]) IterBuffered

func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V]

IterBuffered returns a buffered iterator which could be used in a for range loop.

func (ConcurrentMap[K, V]) IterCb

func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V])

Callback based iterator, cheapest way to read all elements in a map.

func (ConcurrentMap[K, V]) Keys

func (m ConcurrentMap[K, V]) Keys() []K

Keys returns all keys as []string

func (ConcurrentMap[K, V]) MSet

func (m ConcurrentMap[K, V]) MSet(data map[K]V)

func (ConcurrentMap[K, V]) MarshalJSON

func (m ConcurrentMap[K, V]) MarshalJSON() ([]byte, error)

MarshalJSON Reviles ConcurrentMap "private" variables to json marshal.

func (ConcurrentMap[K, V]) Pop

func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool)

Pop removes an element from the map and returns it

func (ConcurrentMap[K, V]) Remove

func (m ConcurrentMap[K, V]) Remove(key K)

Remove removes an element from the map.

func (ConcurrentMap[K, V]) RemoveCb

func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool

locks the shard containing the key, retrieves its current value and calls the callback with those params If callback returns true and element exists, it will remove it from the map Returns the value returned by the callback (even if element was not present in the map)

func (ConcurrentMap[K, V]) Set

func (m ConcurrentMap[K, V]) Set(key K, value V)

Set Sets the given value under the specified key.

func (ConcurrentMap[K, V]) SetIfAbsent

func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool

SetIfAbsent Sets the given value under the specified key if no value was associated with it.

func (ConcurrentMap[K, V]) UnmarshalJSON

func (m ConcurrentMap[K, V]) UnmarshalJSON(b []byte) (err error)

UnmarshalJSON Reverse process of Marshal.

func (ConcurrentMap[K, V]) Upsert

func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V)

Upsert Insert or Update - updates existing element or inserts a new one using UpsertCb

type ConcurrentMapShared

type ConcurrentMapShared[K comparable, V any] struct {
	sync.RWMutex // Read Write mutex, guards access to internal map.
	// contains filtered or unexported fields
}

ConcurrentMapShared A "thread" safe string to anything map.

type ConnectionBase

type ConnectionBase struct {
	// contains filtered or unexported fields
}

func (*ConnectionBase) ByteToProtocol

func (c *ConnectionBase) ByteToProtocol(byte []byte, target proto.Message) error

func (*ConnectionBase) DoTask

func (c *ConnectionBase) DoTask(task func())

func (*ConnectionBase) FlowControl

func (c *ConnectionBase) FlowControl() (b bool)

func (*ConnectionBase) GetConnId

func (c *ConnectionBase) GetConnId() int

func (*ConnectionBase) GetProperty

func (c *ConnectionBase) GetProperty() any

func (*ConnectionBase) IsClose

func (c *ConnectionBase) IsClose() bool

func (*ConnectionBase) ProtocolToByte

func (c *ConnectionBase) ProtocolToByte(str proto.Message) []byte

func (*ConnectionBase) SendMsg

func (c *ConnectionBase) SendMsg(msgId int32, msgData proto.Message)

func (*ConnectionBase) Start

func (c *ConnectionBase) Start(readerHandler func() bool, writerHandler func(data []byte) bool)

func (*ConnectionBase) Stop

func (c *ConnectionBase) Stop() bool

type ConnectionManager

type ConnectionManager struct {
	// contains filtered or unexported fields
}

func GetInstanceConnManager

func GetInstanceConnManager() *ConnectionManager

连接管理器

func (*ConnectionManager) Add

func (c *ConnectionManager) Add(conn IConnection)

func (*ConnectionManager) ClearConn

func (c *ConnectionManager) ClearConn()

func (*ConnectionManager) ConnOnClosed

func (c *ConnectionManager) ConnOnClosed(conn IConnection)

func (*ConnectionManager) ConnOnOpened

func (c *ConnectionManager) ConnOnOpened(conn IConnection)

func (*ConnectionManager) ConnRateLimiting

func (c *ConnectionManager) ConnRateLimiting(conn IConnection)

func (*ConnectionManager) Get

func (c *ConnectionManager) Get(connId int) (IConnection, bool)

func (*ConnectionManager) Len

func (c *ConnectionManager) Len() int

func (*ConnectionManager) NewConnId

func (c *ConnectionManager) NewConnId() int

func (*ConnectionManager) RangeConnections

func (c *ConnectionManager) RangeConnections(handler func(conn IConnection))

func (*ConnectionManager) Remove

func (c *ConnectionManager) Remove(conn IConnection)

func (*ConnectionManager) SetConnOnClosed

func (c *ConnectionManager) SetConnOnClosed(connCloseCallBack func(conn IConnection))

func (*ConnectionManager) SetConnOnOpened

func (c *ConnectionManager) SetConnOnOpened(connOpenCallBack func(conn IConnection))

func (*ConnectionManager) SetConnOnRateLimiting

func (c *ConnectionManager) SetConnOnRateLimiting(limitCallBack func(conn IConnection))

type CustomServer

type CustomServer struct {
	AppConf  *AppConf                             // 服务启动配置
	DataPack IDataPack                            // 自定义编码/解码器
	Message  func(id int32, data []byte) IMessage // 自定消息

}

自定义服务器

type IConnection

type IConnection interface {
	// 启动连接(通过connmanager调用)
	Start(readerHandler func() bool, writerHandler func(data []byte) bool)
	// 停止连接(通过connmanager调用)
	Stop() bool

	// 启动接收消息协程
	StartReader() bool
	// 启动发送消息协程
	StartWriter(data []byte) bool
	// 执行任务
	DoTask(task func())

	// 获取当前连接Id
	GetConnId() int
	// 获取客户端地址信息
	RemoteAddrStr() string
	// 获取连接是否已关闭
	IsClose() bool
	// 获取连接绑定的属性列表
	GetProperty() any

	// 发送消息给客户端
	SendMsg(msgId int32, msgData proto.Message)

	// 限流控制
	FlowControl() bool

	// 序列化
	ProtocolToByte(str proto.Message) []byte
	// 反序列化
	ByteToProtocol(byte []byte, target proto.Message) error
}

func NewConnectionHTTP

func NewConnectionHTTP(server IServer, writer http.ResponseWriter, reader *http.Request) IConnection

func NewConnectionKCP

func NewConnectionKCP(server *serverKCP, conn net.Conn) IConnection

func NewConnectionTCP

func NewConnectionTCP(server IServer, conn *net.TCPConn) IConnection

func NewConnectionWS

func NewConnectionWS(server IServer, conn *websocket.Conn) IConnection

type IDataPack

type IDataPack interface {
	// 获取消息头长度
	GetHeadLen() int
	// 消息封包
	Pack(msg IMessage) []byte
	// 消息拆包
	UnPack([]byte) IMessage
}

封包拆包,通过固定的包头获取消息数据,解决TCP粘包问题

func NewDataPack

func NewDataPack() IDataPack

type IErrCapture

type IErrCapture func(conn IConnection, msg IMessage, panicInfo string)

type IFilter

type IFilter func(conn IConnection, msg IMessage) bool

type IMessage

type IMessage interface {
	// 获取消息Id
	GetMsgId() uint16
	// 获取消息长度
	GetDataLen() uint16
	// 获取消息内容
	GetData() []byte
	// 设置消息内容
	SetData([]byte)
}

定义消息模板

func NewMsgPackage

func NewMsgPackage(id int32, data []byte) IMessage

type INewMsgStructTemplate

type INewMsgStructTemplate func() proto.Message

type IReceiveMsgHandler

type IReceiveMsgHandler func(conn IConnection, message proto.Message)

type IServer

type IServer interface {
	// 获取服务器名称
	GetServerName() string
	// 启动服务器
	Start()
}

定义服务器接口

func GetServerHTTP

func GetServerHTTP() IServer

func GetServerKCP

func GetServerKCP() IServer

func GetServerTCP

func GetServerTCP() IServer

func GetServerWS

func GetServerWS() IServer

type Integer

type Integer int

func (Integer) String

func (i Integer) String() string

type IterCb

type IterCb[K comparable, V any] func(key K, v V)

Iterator callbacalled for every key,value found in maps. RLock is held for all calls for a given shard therefore callback sess consistent view of a shard, but not across the shards

type Message

type Message struct {
	proto.Message `json:"-"`
	Id            uint16 `protobuf:"bytes,1,opt,name=msg_id,proto3" json:"msg_id"` // 消息Id
	Data          string `protobuf:"bytes,2,opt,name=data,proto3" json:"data"`     // 消息内容
	DataLen       uint16 `json:"-"`                                                // 消息长度
}

func (*Message) GetData

func (m *Message) GetData() []byte

func (*Message) GetDataLen

func (m *Message) GetDataLen() uint16

func (*Message) GetMsgId

func (m *Message) GetMsgId() uint16

func (*Message) SetData

func (m *Message) SetData(bytes []byte)

type MsgHandler

type MsgHandler struct {
	// contains filtered or unexported fields
}

func GetInstanceMsgHandler

func GetInstanceMsgHandler() *MsgHandler

消息处理器

func (*MsgHandler) AddRouter

func (m *MsgHandler) AddRouter(msgId int32, msgTemplate INewMsgStructTemplate, msgHandler IReceiveMsgHandler)

func (*MsgHandler) GetApis

func (m *MsgHandler) GetApis() map[int32]*BaseRouter

func (*MsgHandler) GetErrCapture

func (m *MsgHandler) GetErrCapture(conn IConnection, message IMessage)

func (*MsgHandler) GetFilter

func (m *MsgHandler) GetFilter() IFilter

func (*MsgHandler) SetErrCapture

func (m *MsgHandler) SetErrCapture(fun IErrCapture)

func (*MsgHandler) SetFilter

func (m *MsgHandler) SetFilter(fun IFilter)

type RemoveCb

type RemoveCb[K any, V any] func(key K, v V, exists bool) bool

is a callback executed in a map.RemoveCb() call, while Lock is held If returns true, the element will be removed from the map

type ServerConf

type ServerConf struct {
	Address     string // IP地址
	Port        string // 端口
	TLSCertPath string // ssl证书路径
	TLSKeyPath  string // ssl密钥路径
}

type ServerManager

type ServerManager struct {
	// contains filtered or unexported fields
}

func GetInstanceServerManager

func GetInstanceServerManager() *ServerManager

服务管理器

func (*ServerManager) IsClose

func (c *ServerManager) IsClose() bool

func (*ServerManager) RegisterServer

func (c *ServerManager) RegisterServer(server ...IServer)

func (*ServerManager) Servers

func (c *ServerManager) Servers() []IServer

func (*ServerManager) StopAll

func (c *ServerManager) StopAll()

func (*ServerManager) WaitGroupAdd

func (c *ServerManager) WaitGroupAdd(delta int)

func (*ServerManager) WaitGroupDone

func (c *ServerManager) WaitGroupDone()

type Stringer

type Stringer interface {
	fmt.Stringer
	comparable
}

type Tuple

type Tuple[K comparable, V any] struct {
	Key K
	Val V
}

Tuple Used by the Iter & IterBuffered functions to wrap two variables together over a channel,

type UpsertCb

type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V

Callback to return new element to be inserted into the map It is called while lock is held, therefore it MUST NOT try to access other keys in same map, as it can lead to deadlock since Go sync.RWLock is not reentrant

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL