gate

package
v1.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2025 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package gate 长连接网关定义

Package gate 网关配置

Index

Constants

View Source
const (
	// RPC_CLIENT_MSG RPC处理来自客户端的消息
	RPC_CLIENT_MSG string = "RPC_CLIENT_MSG"
	// RPC_CLIENT_DISCONNECT_MSG RPC处理客户端断开连接的消息
	RPC_CLIENT_DISCONNECT_MSG string = "RPC_CLIENT_MSG"

	PACK_HEAD_TOTAL_LEN_SIZE       = 2          // 包头中这几个字节存放总pack的长度值
	PACK_HEAD_MSG_ID_LEN_SIZE      = 2          // 包头中这几个字节存放msgId的长度值
	PACK_BODY_DEFAULT_SIZE_IN_POOL = 512 * 1024 // 缓存池中定义的缓存区大小

	CONTEXT_TRANSKEY_SESSION = "session" // 定义需要RPC传输session的ContextKey
)

Variables

This section is empty.

Functions

This section is empty.

Types

type FunRecvPackHandler added in v1.1.4

type FunRecvPackHandler func(session ISession, pack *Pack) error

FunRecvPackHandler 处理接收的消息包

type FunSendMessageHook added in v1.1.4

type FunSendMessageHook func(session ISession, topic string, msg []byte) ([]byte, error)

FunSendMessageHook 给客户端下发消息拦截器

type IAgentLearner

type IAgentLearner interface {
	Connect(a IClientAgent)    //当连接建立  并且协议握手成功
	DisConnect(a IClientAgent) //当连接关闭  或者客户端主动发送DisConnect命令
}

IAgentLearner 连接代理(内部使用)

type IClientAgent added in v1.1.2

type IClientAgent interface {
	Init(impl IClientAgent, gate IGate, conn network.Conn) error
	Close()         // 主动关闭(异常关闭or主动关闭)
	OnClose() error // Run() 结束后触发回调

	Run() (err error)

	ConnTime() time.Time  // 建立连接的时间
	IsClosed() bool       // 连接状态
	IsShaked() bool       // 连接就绪(有些协议会在连接成功后要先握手)
	RecvNum() int64       // 接收消息的数量
	SendNum() int64       // 发送消息的数量
	GetSession() ISession // 管理的ClientSession

	// 发送数据
	SendPack(pack *Pack) error

	// 发送编码Pack后的数据
	OnWriteEncodingPack(pack *Pack) []byte

	// 读取数据并解码出Pack
	OnReadDecodingPack() (*Pack, error)

	GetError() error // 连接断开的错误日志
}

IClientAgent 客户端代理定义

type IDelegater

type IDelegater interface {
	GetAgent(sessionId string) (IClientAgent, error)
	GetAgentNum() int
	SessionsRange(f func(key, value any) bool)
	OnDestroy() // 退出事件,当主动关闭时释放所有的连接

	// 获取最新Session数据
	OnRpcLoad(ctx context.Context, sessionId string) (ISession, error)

	// Bind the session with the the userId.
	OnRpcBind(ctx context.Context, sessionId string, userId string) (ISession, error)

	// UnBind the session with the the userId.
	OnRpcUnBind(ctx context.Context, sessionId string) (ISession, error)

	// Upd settings map value for the session.
	OnRpcPush(ctx context.Context, sessionId string, settings map[string]string) (ISession, error)

	// Set values (one or many) for the session.
	OnRpcSet(ctx context.Context, sessionId string, key string, value string) (ISession, error)

	// Del value from the session.
	OnRpcDel(ctx context.Context, sessionId string, key string) (ISession, error)

	// Send message to the session.
	OnRpcSend(ctx context.Context, sessionId string, topic string, body []byte) (bool, error)

	// 广播消息给网关所有在连客户端
	OnRpcBroadcast(ctx context.Context, topic string, body []byte) (int64, error)

	// 检查连接是否正常
	OnRpcConnected(ctx context.Context, sessionId string) (bool, error)

	// 主动关闭连接
	OnRpcClose(ctx context.Context, sessionId string) (bool, error)
}

IDelegater session管理接口

type IGate

type IGate interface {
	app.IRPCModule

	Options() Options

	GetDelegater() IDelegater
	GetAgentLearner() IAgentLearner
	GetSessionLearner() ISessionLearner
	GetStorageHandler() StorageHandler
	GetRouteHandler() RouteHandler
	GetSendMessageHook() FunSendMessageHook
	GetRecvPackHandler() FunRecvPackHandler
}

IGate 网关代理定义

type ISession

type ISession interface {
	mqrpc.Marshaler

	GetIP() string
	GetNetwork() string
	GetSessionID() string
	GetServerID() string // gate server id
	GetUserData() any    // 网关本地的额外数据,不会再rpc中传递

	// UserID(线程安全)
	GetUserID() string
	GetUserIDUint() uint64
	SetUserID(userId string)

	Get(key string) (string, bool)
	Set(key, value string) error
	Del(key string) error
	SetSettings(settings map[string]string)
	// 合并两个map 并且以 agent.(Agent).GetSession().Settings 已有的优先
	ImportSettings(map[string]string) error
	// 遍历Settings通过回调函数遍历kv值。回调函数的返回值(true-继续遍历; false-终止迭代)
	SettingsRange(func(k, v string) bool)

	// 每次rpc调用都拷贝一份新的Session进行传输
	Clone() ISession
	// 只Clone Settings
	CloneSettings() map[string]string

	// 是否是访客(未登录)
	IsGuest() bool

	// 调用RPC方法时通过context传递
	GenRPCContext() context.Context

	// 日志追踪
	GenTraceSpan()
	GetTraceSpan() log.TraceSpan

	// update local Session(从Gate拉取最新数据)
	ToUpdate() error
	// Bind the session with the the userId.
	ToBind(userId string) error
	// UnBind the session with the the userId.
	ToUnBind() error
	// Set values (one) for the session.
	ToSet(key string, value string) error
	// Set values (many) for the session(合并已存在的,直接用参数Push)
	ToSetBatch(settings map[string]string) error
	// Push all Settings values for the session(合并已存在的,拿自己的Settings去Push).
	ToPush() error
	// Remove value from the session.
	ToDel(key string) error
	// Send message to the session.
	ToSend(topic string, body []byte) error

	// the session is connect status
	ToConnected() (bool, error)
	// close the session connect
	ToClose() error
}

ISession session代表一个客户端连接,不是线程安全的

type ISessionLearner

type ISessionLearner interface {
	OnConnect(a ISession)    //当连接建立  并且协议握手成功
	OnDisConnect(a ISession) //当连接关闭	 或者客户端主动发送DisConnect命令
}

ISessionLearner 客户端代理(业务使用)

type Option

type Option func(*Options)

Option 网关配置项

func BufSize

func BufSize(s int) Option

BufSize 单个连接网络数据缓存大小

func CertFile

func CertFile(s string) Option

CertFile TLS 证书cert文件

func ConcurrentTasks

func ConcurrentTasks(s int) Option

ConcurrentTasks 设置单个连接允许的同时并发协程数(目前没用)

func EncryptKey

func EncryptKey(s string) Option

消息包加密Key

func HeartOverTimer

func HeartOverTimer(s time.Duration) Option

HeartOverTimer 心跳超时时间

func KeyFile

func KeyFile(s string) Option

KeyFile TLS 证书key文件

func MaxPackSize

func MaxPackSize(s int) Option

MaxPackSize 单个协议包数据最大值

func OverTime

func OverTime(s time.Duration) Option

OverTime 超时时间

func SendPackBuffNum

func SendPackBuffNum(n int) Option

SendPackBuffNum 发送消息的缓冲队列数量

func ServerOpts

func ServerOpts(s []server.Option) Option

ServerOpts ServerOpts

func TCPAddr

func TCPAddr(s string) Option

TCPAddr tcp监听端口

func TLS

func TLS(s bool) Option

TLS TLS

func TcpAddr

func TcpAddr(s string) Option

TcpAddr tcp监听地址 Deprecated: 因为命名规范问题函数将废弃,请用TCPAddr代替

func Tls

func Tls(s bool) Option

Tls Tls Deprecated: 因为命名规范问题函数将废弃,请用TLS代替

func WsAddr

func WsAddr(s string) Option

WsAddr websocket监听端口

type Options

type Options struct {
	ConcurrentTasks int // 单个连接允许的同时并发协程数,控制流量(20)(目前没用)
	BufSize         int // 连接数据缓存大小(2048)
	MaxPackSize     int // 单个协议包数据最大值(65535)
	SendPackBuffNum int // 发送消息的缓冲队列(100)
	TLS             bool
	TCPAddr         string
	WsAddr          string
	CertFile        string
	KeyFile         string
	EncryptKey      string        // 消息包加密key
	OverTime        time.Duration // 建立连接超时(10s)
	HeartOverTimer  time.Duration // 心跳超时时间(本质是读取超时)(60s)

	Opts []server.Option // 用来控制Module属性的
}

Options 网关配置项

func NewOptions

func NewOptions(opts ...Option) Options

NewOptions 网关配置项

type Pack

type Pack struct {
	Topic string // "moduleTyp/msgId"
	Body  []byte
}

Pack 消息包

type RouteHandler

type RouteHandler interface {
	/**
	是否需要对本次客户端请求转发规则进行hook, 返回true 表示拦截此请求
	*/
	OnRoute(session ISession, topic string, msg []byte) (bool, error)
}

RouteHandler 路由器

type StorageHandler

type StorageHandler interface {
	/**
	存储用户的Session信息
	Session Bind Userid以后每次设置 settings都会调用一次Storage
	*/
	Storage(session ISession) (err error)
	/**
	强制删除Session信息
	*/
	Delete(session ISession) (err error)
	/**
	获取用户Session信息
	Bind Userid时会调用Query获取最新信息
	*/
	Query(Userid string) (data []byte, err error)
	/**
	用户心跳,一般用户在线时1s发送一次
	可以用来延长Session信息过期时间
	*/
	Heartbeat(session ISession)
}

StorageHandler Session信息持久化

Directories

Path Synopsis
Package basegate handler
Package basegate handler

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL