Documentation
¶
Index ¶
- Variables
- func ConnPropertyGet[T any](c IConnection, key string) T
- func ConnPropertyRemove(c IConnection, key string)
- func ConnPropertySet(c IConnection, key string, value any)
- func SetCustomServer(custom *CustomServer)
- type AppConf
- type BaseRouter
- type ConcurrentMap
- func (m ConcurrentMap[K, V]) Clear()
- func (m ConcurrentMap[K, V]) Count() int
- func (m ConcurrentMap[K, V]) Get(key K) (V, bool)
- func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V]
- func (m ConcurrentMap[K, V]) Has(key K) bool
- func (m ConcurrentMap[K, V]) IsEmpty() bool
- func (m ConcurrentMap[K, V]) Items() map[K]V
- func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V]
- func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V])
- func (m ConcurrentMap[K, V]) Keys() []K
- func (m ConcurrentMap[K, V]) MSet(data map[K]V)
- func (m ConcurrentMap[K, V]) MarshalJSON() ([]byte, error)
- func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool)
- func (m ConcurrentMap[K, V]) Remove(key K)
- func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool
- func (m ConcurrentMap[K, V]) Set(key K, value V)
- func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool
- func (m ConcurrentMap[K, V]) UnmarshalJSON(b []byte) (err error)
- func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V)
- type ConcurrentMapShared
- type ConnectionBase
- func (c *ConnectionBase) ByteToProtocol(byte []byte, target proto.Message) error
- func (c *ConnectionBase) DoTask(task func())
- func (c *ConnectionBase) FlowControl() (b bool)
- func (c *ConnectionBase) GetConnId() int
- func (c *ConnectionBase) GetProperty() any
- func (c *ConnectionBase) IsClose() bool
- func (c *ConnectionBase) ProtocolToByte(str proto.Message) []byte
- func (c *ConnectionBase) SendMsg(msgId int32, msgData proto.Message)
- func (c *ConnectionBase) Start(readerHandler func() bool, writerHandler func(data []byte) bool)
- func (c *ConnectionBase) Stop() bool
- type ConnectionManager
- func (c *ConnectionManager) Add(conn IConnection)
- func (c *ConnectionManager) ClearConn()
- func (c *ConnectionManager) ConnOnClosed(conn IConnection)
- func (c *ConnectionManager) ConnOnOpened(conn IConnection)
- func (c *ConnectionManager) ConnRateLimiting(conn IConnection)
- func (c *ConnectionManager) Get(connId int) (IConnection, bool)
- func (c *ConnectionManager) Len() int
- func (c *ConnectionManager) NewConnId() int
- func (c *ConnectionManager) RangeConnections(handler func(conn IConnection))
- func (c *ConnectionManager) Remove(conn IConnection)
- func (c *ConnectionManager) SetConnOnClosed(connCloseCallBack func(conn IConnection))
- func (c *ConnectionManager) SetConnOnOpened(connOpenCallBack func(conn IConnection))
- func (c *ConnectionManager) SetConnOnRateLimiting(limitCallBack func(conn IConnection))
- type CustomServer
- type IConnection
- func NewConnectionHTTP(server IServer, writer http.ResponseWriter, reader *http.Request) IConnection
- func NewConnectionKCP(server *serverKCP, conn net.Conn) IConnection
- func NewConnectionTCP(server IServer, conn *net.TCPConn) IConnection
- func NewConnectionWS(server IServer, conn *websocket.Conn) IConnection
- type IDataPack
- type IErrCapture
- type IFilter
- type IMessage
- type INewMsgStructTemplate
- type IReceiveMsgHandler
- type IServer
- type Integer
- type IterCb
- type Message
- type MsgHandler
- func (m *MsgHandler) AddRouter(msgId int32, msgTemplate INewMsgStructTemplate, msgHandler IReceiveMsgHandler)
- func (m *MsgHandler) GetApis() map[int32]*BaseRouter
- func (m *MsgHandler) GetErrCapture(conn IConnection, message IMessage)
- func (m *MsgHandler) GetFilter() IFilter
- func (m *MsgHandler) SetErrCapture(fun IErrCapture)
- func (m *MsgHandler) SetFilter(fun IFilter)
- type RemoveCb
- type ServerConf
- type ServerManager
- type Stringer
- type Tuple
- type UpsertCb
Constants ¶
This section is empty.
Variables ¶
var ( ConnPropertyHttpAuthorization = "HttpAuthorization" ConnPropertyHttpReader = "HttpReader" ConnPropertyHttpWriter = "HttpWriter" )
var SHARD_COUNT = 32
Functions ¶
Types ¶
type AppConf ¶
type AppConf struct {
AppName string // 服务名称
MaxPackSize int // 数据包最大长度
MaxConn int // 最大允许连接数
WorkerPoolSize int // 工作池容量
WorkerTaskMaxLen int // 每个工作队列可执行最大任务数量
MaxMsgChanLen int // 读写通道最大限度
MaxFlowSecond int // 每秒允许的最大请求数量
ProtocolIsJson bool // 是否使用json协议
ConnRWTimeOut int // 连接读写超时时间(秒)
ServerTCP ServerConf // tcp服务
ServerWS ServerConf // websocket服务
ServerHTTP ServerConf // http服务
ServerKCP ServerConf // kcp服务
}
type BaseRouter ¶
type BaseRouter struct {
// contains filtered or unexported fields
}
func (*BaseRouter) GetNewMsg ¶
func (b *BaseRouter) GetNewMsg() proto.Message
func (*BaseRouter) RunHandler ¶
func (b *BaseRouter) RunHandler(conn IConnection, message proto.Message)
func (*BaseRouter) SetHandler ¶
func (b *BaseRouter) SetHandler(msgHandler IReceiveMsgHandler)
func (*BaseRouter) SetMsg ¶
func (b *BaseRouter) SetMsg(msgTemplate INewMsgStructTemplate)
type ConcurrentMap ¶
type ConcurrentMap[K comparable, V any] struct { // contains filtered or unexported fields }
A "thread" safe map of type string:Anything. To avoid lock bottlenecks this map is divided into several (SHARD_COUNT) map shards.
func NewConcurrentMap ¶
func NewConcurrentMap[V any]() ConcurrentMap[string, V]
Creates a new concurrent map.
func NewConcurrentStringer ¶
func NewConcurrentStringer[K Stringer, V any]() ConcurrentMap[K, V]
Creates a new concurrent map.
func NewWithCustomShardingFunction ¶
func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V]
Creates a new concurrent map.
func (ConcurrentMap[K, V]) Clear ¶
func (m ConcurrentMap[K, V]) Clear()
Clear removes all items from map.
func (ConcurrentMap[K, V]) Count ¶
func (m ConcurrentMap[K, V]) Count() int
Count returns the number of elements within the map.
func (ConcurrentMap[K, V]) Get ¶
func (m ConcurrentMap[K, V]) Get(key K) (V, bool)
Get retrieves an element from map under given key.
func (ConcurrentMap[K, V]) GetShard ¶
func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V]
GetShard returns shard under given key
func (ConcurrentMap[K, V]) Has ¶
func (m ConcurrentMap[K, V]) Has(key K) bool
Has Looks up an item under specified key
func (ConcurrentMap[K, V]) IsEmpty ¶
func (m ConcurrentMap[K, V]) IsEmpty() bool
IsEmpty checks if map is empty.
func (ConcurrentMap[K, V]) Items ¶
func (m ConcurrentMap[K, V]) Items() map[K]V
Items returns all items as map[K]V
func (ConcurrentMap[K, V]) IterBuffered ¶
func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V]
IterBuffered returns a buffered iterator which could be used in a for range loop.
func (ConcurrentMap[K, V]) IterCb ¶
func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V])
Callback based iterator, cheapest way to read all elements in a map.
func (ConcurrentMap[K, V]) Keys ¶
func (m ConcurrentMap[K, V]) Keys() []K
Keys returns all keys as []K
func (ConcurrentMap[K, V]) MSet ¶
func (m ConcurrentMap[K, V]) MSet(data map[K]V)
func (ConcurrentMap[K, V]) MarshalJSON ¶
func (m ConcurrentMap[K, V]) MarshalJSON() ([]byte, error)
MarshalJSON reveals ConcurrentMap "private" variables to the JSON marshaller.
func (ConcurrentMap[K, V]) Pop ¶
func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool)
Pop removes an element from the map and returns it
func (ConcurrentMap[K, V]) Remove ¶
func (m ConcurrentMap[K, V]) Remove(key K)
Remove removes an element from the map.
func (ConcurrentMap[K, V]) RemoveCb ¶
func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool
RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params. If the callback returns true and the element exists, it will remove it from the map. Returns the value returned by the callback (even if the element was not present in the map).
func (ConcurrentMap[K, V]) Set ¶
func (m ConcurrentMap[K, V]) Set(key K, value V)
Set Sets the given value under the specified key.
func (ConcurrentMap[K, V]) SetIfAbsent ¶
func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool
SetIfAbsent Sets the given value under the specified key if no value was associated with it.
func (ConcurrentMap[K, V]) UnmarshalJSON ¶
func (m ConcurrentMap[K, V]) UnmarshalJSON(b []byte) (err error)
UnmarshalJSON Reverse process of Marshal.
func (ConcurrentMap[K, V]) Upsert ¶
func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V)
Upsert Insert or Update - updates existing element or inserts a new one using UpsertCb
type ConcurrentMapShared ¶
type ConcurrentMapShared[K comparable, V any] struct { // contains filtered or unexported fields }
ConcurrentMapShared A "thread" safe string to anything map.
type ConnectionBase ¶
type ConnectionBase struct {
// contains filtered or unexported fields
}
func (*ConnectionBase) ByteToProtocol ¶
func (c *ConnectionBase) ByteToProtocol(byte []byte, target proto.Message) error
func (*ConnectionBase) DoTask ¶
func (c *ConnectionBase) DoTask(task func())
func (*ConnectionBase) FlowControl ¶
func (c *ConnectionBase) FlowControl() (b bool)
func (*ConnectionBase) GetConnId ¶
func (c *ConnectionBase) GetConnId() int
func (*ConnectionBase) GetProperty ¶
func (c *ConnectionBase) GetProperty() any
func (*ConnectionBase) IsClose ¶
func (c *ConnectionBase) IsClose() bool
func (*ConnectionBase) ProtocolToByte ¶
func (c *ConnectionBase) ProtocolToByte(str proto.Message) []byte
func (*ConnectionBase) SendMsg ¶
func (c *ConnectionBase) SendMsg(msgId int32, msgData proto.Message)
func (*ConnectionBase) Start ¶
func (c *ConnectionBase) Start(readerHandler func() bool, writerHandler func(data []byte) bool)
func (*ConnectionBase) Stop ¶
func (c *ConnectionBase) Stop() bool
type ConnectionManager ¶
type ConnectionManager struct {
// contains filtered or unexported fields
}
func (*ConnectionManager) Add ¶
func (c *ConnectionManager) Add(conn IConnection)
func (*ConnectionManager) ClearConn ¶
func (c *ConnectionManager) ClearConn()
func (*ConnectionManager) ConnOnClosed ¶
func (c *ConnectionManager) ConnOnClosed(conn IConnection)
func (*ConnectionManager) ConnOnOpened ¶
func (c *ConnectionManager) ConnOnOpened(conn IConnection)
func (*ConnectionManager) ConnRateLimiting ¶
func (c *ConnectionManager) ConnRateLimiting(conn IConnection)
func (*ConnectionManager) Get ¶
func (c *ConnectionManager) Get(connId int) (IConnection, bool)
func (*ConnectionManager) Len ¶
func (c *ConnectionManager) Len() int
func (*ConnectionManager) NewConnId ¶
func (c *ConnectionManager) NewConnId() int
func (*ConnectionManager) RangeConnections ¶
func (c *ConnectionManager) RangeConnections(handler func(conn IConnection))
func (*ConnectionManager) Remove ¶
func (c *ConnectionManager) Remove(conn IConnection)
func (*ConnectionManager) SetConnOnClosed ¶
func (c *ConnectionManager) SetConnOnClosed(connCloseCallBack func(conn IConnection))
func (*ConnectionManager) SetConnOnOpened ¶
func (c *ConnectionManager) SetConnOnOpened(connOpenCallBack func(conn IConnection))
func (*ConnectionManager) SetConnOnRateLimiting ¶
func (c *ConnectionManager) SetConnOnRateLimiting(limitCallBack func(conn IConnection))
type CustomServer ¶
type CustomServer struct {
AppConf *AppConf // 服务启动配置
DataPack IDataPack // 自定义编码/解码器
Message func(id int32, data []byte) IMessage // 自定义消息
}
自定义服务器
type IConnection ¶
type IConnection interface {
// 启动连接(通过connmanager调用)
Start(readerHandler func() bool, writerHandler func(data []byte) bool)
// 停止连接(通过connmanager调用)
Stop() bool
// 启动接收消息协程
StartReader() bool
// 启动发送消息协程
StartWriter(data []byte) bool
// 执行任务
DoTask(task func())
// 获取当前连接Id
GetConnId() int
// 获取客户端地址信息
RemoteAddrStr() string
// 获取连接是否已关闭
IsClose() bool
// 获取连接绑定的属性列表
GetProperty() any
// 发送消息给客户端
SendMsg(msgId int32, msgData proto.Message)
// 限流控制
FlowControl() bool
// 序列化
ProtocolToByte(str proto.Message) []byte
// 反序列化
ByteToProtocol(byte []byte, target proto.Message) error
}
func NewConnectionHTTP ¶
func NewConnectionHTTP(server IServer, writer http.ResponseWriter, reader *http.Request) IConnection
func NewConnectionKCP ¶
func NewConnectionKCP(server *serverKCP, conn net.Conn) IConnection
func NewConnectionTCP ¶
func NewConnectionTCP(server IServer, conn *net.TCPConn) IConnection
func NewConnectionWS ¶
func NewConnectionWS(server IServer, conn *websocket.Conn) IConnection
type IDataPack ¶
type IDataPack interface {
// 获取消息头长度
GetHeadLen() int
// 消息封包
Pack(msg IMessage) []byte
// 消息拆包
UnPack([]byte) IMessage
}
封包拆包,通过固定的包头获取消息数据,解决TCP粘包问题
func NewDataPack ¶
func NewDataPack() IDataPack
type IErrCapture ¶
type IErrCapture func(conn IConnection, msg IMessage, panicInfo string)
type IFilter ¶
type IFilter func(conn IConnection, msg IMessage) bool
type IMessage ¶
type IMessage interface {
// 获取消息Id
GetMsgId() uint16
// 获取消息长度
GetDataLen() uint16
// 获取消息内容
GetData() []byte
// 设置消息内容
SetData([]byte)
}
定义消息模板
func NewMsgPackage ¶
type INewMsgStructTemplate ¶
type IReceiveMsgHandler ¶
type IReceiveMsgHandler func(conn IConnection, message proto.Message)
type IServer ¶
type IServer interface {
// 获取服务器名称
GetServerName() string
// 启动服务器
Start()
}
定义服务器接口
func GetServerHTTP ¶
func GetServerHTTP() IServer
func GetServerKCP ¶
func GetServerKCP() IServer
func GetServerTCP ¶
func GetServerTCP() IServer
func GetServerWS ¶
func GetServerWS() IServer
type IterCb ¶
type IterCb[K comparable, V any] func(key K, v V)
Iterator callback, called for every key, value found in maps. RLock is held for all calls for a given shard, therefore the callback sees a consistent view of a shard, but not across the shards.
type Message ¶
type Message struct {
proto.Message `json:"-"`
Id uint16 `protobuf:"bytes,1,opt,name=msg_id,proto3" json:"msg_id"` // 消息Id
Data string `protobuf:"bytes,2,opt,name=data,proto3" json:"data"` // 消息内容
DataLen uint16 `json:"-"` // 消息长度
}
func (*Message) GetDataLen ¶
type MsgHandler ¶
type MsgHandler struct {
// contains filtered or unexported fields
}
func (*MsgHandler) AddRouter ¶
func (m *MsgHandler) AddRouter(msgId int32, msgTemplate INewMsgStructTemplate, msgHandler IReceiveMsgHandler)
func (*MsgHandler) GetApis ¶
func (m *MsgHandler) GetApis() map[int32]*BaseRouter
func (*MsgHandler) GetErrCapture ¶
func (m *MsgHandler) GetErrCapture(conn IConnection, message IMessage)
func (*MsgHandler) GetFilter ¶
func (m *MsgHandler) GetFilter() IFilter
func (*MsgHandler) SetErrCapture ¶
func (m *MsgHandler) SetErrCapture(fun IErrCapture)
func (*MsgHandler) SetFilter ¶
func (m *MsgHandler) SetFilter(fun IFilter)
type RemoveCb ¶
RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held. If it returns true, the element will be removed from the map.
type ServerConf ¶
type ServerManager ¶
type ServerManager struct {
// contains filtered or unexported fields
}
func (*ServerManager) IsClose ¶
func (c *ServerManager) IsClose() bool
func (*ServerManager) RegisterServer ¶
func (c *ServerManager) RegisterServer(server ...IServer)
func (*ServerManager) Servers ¶
func (c *ServerManager) Servers() []IServer
func (*ServerManager) StopAll ¶
func (c *ServerManager) StopAll()
func (*ServerManager) WaitGroupAdd ¶
func (c *ServerManager) WaitGroupAdd(delta int)
func (*ServerManager) WaitGroupDone ¶
func (c *ServerManager) WaitGroupDone()
type Stringer ¶
type Stringer interface {
fmt.Stringer
comparable
}
type Tuple ¶
type Tuple[K comparable, V any] struct { Key K Val V }
Tuple is used by the Iter & IterBuffered functions to wrap two variables together over a channel.
