xserver

package
v1.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2023 License: GPL-2.0, GPL-2.0 Imports: 41 Imported by: 0

Documentation

Overview

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS.

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS.

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS.

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS.

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS.

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS.

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS.

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS.

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS.

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS.

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS.

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS. Package xserver: 提供了服务注册发现、服务互联互通、线路负载均衡、业务逻辑承载等功能.

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS.

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS.

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS.

Copyright (C) Wells Hsu, wellshsu@github.com, All rights reserved. Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This license is based on GPL, SEE LICENSE.md FOR MORE DETAILS.

Index

Constants

View Source
const (
	CONSUL_RESP_OK    = "ok"     // Consul响应200
	CONSUL_CHECK_PATH = "/check" // Consul心跳检测
)
View Source
const (
	LAN_CIN_MAX_FRAME  = 50000 // 最大输入网络帧数
	LAN_COUT_MAX_FRAME = 50000 // 最大输出网络帧数
)
View Source
const (
	EVT_SERVER_STARTED = -1 // 服务就绪(配置就绪 & 日志就绪 & DB就绪 & Redis就绪 & Lan就绪)
	EVT_SERVER_CHANGED = -2 // 服务变更(参数类型:[]interface{}{added map[string][]string, removed map[string][]string})
	EVT_SERVER_PREQUIT = -3 // 服务即将退出
)
View Source
const (
	SERVER_SLEEP time.Duration = 10 * time.Millisecond // 帧刷新间隔
)
View Source
const (
	UPDATE_SLEEP time.Duration = 10 * time.Millisecond // 帧刷新间隔
)

Variables

View Source
var (
	CslClt *consulapi.Client // Consul连接
	CslCfg *consulapi.Config // Consul配置
)
View Source
var (
	GMsg = xevt.NewEvtMgr(true)  // Msg消息中心
	GRpc = xevt.NewEvtMgr(false) // Rpc消息中心
	GCgi = xevt.NewEvtMgr(false) // Cgi消息中心
	GEvt = xevt.NewEvtMgr(true)  // Evt消息中心
)
View Source
var (
	ERR_SEND_CHAN_FULL  = errors.New("send chan is full")
	ERR_NO_ROUTE_FOUND  = errors.New("no route found")
	ERR_RPC_TIMEOUT     = errors.New("rpc call timeout")
	ERR_CGI_TIMEOUT     = errors.New("cgi call timeout")
	ERR_RPC_INTERRUPTED = errors.New("rpc call has been interrupted, see log context for more details.")
	ERR_CGI_INTERRUPTED = errors.New("cgi call has been interrupted, see log context for more details.")
)
View Source
var (
	GWrap   *Wrap   // 服务封装器
	GServer IServer // 全局服务
)
View Source
var CGIROUTEMAP map[int]*CgiRoute // cgi路由
View Source
var MSGROUTEMAP map[int]*MsgRoute // msg路由
View Source
var (
	MainTID int64 = -1 // 主线程ID
)
View Source
var RPCROUTEMAP map[int]*RpcRoute // rpc路由

Functions

func BackupLan

func BackupLan() int

线路备份

func ClearInterval

func ClearInterval(id int, tid ...int64)

取消间歇调用(务必在逻辑线程中调用或指定线程ID)

id: 定时器ID
tid: 线程ID

func ClearTimeout

func ClearTimeout(id int, tid ...int64)

取消超时调用(务必在逻辑线程中调用或指定线程ID)

id: 定时器ID
tid: 线程ID

func CloseLan

func CloseLan()

关闭线路

func MonitorLan

func MonitorLan()

监控线路

func NotifyCgi

func NotifyCgi(id int, creq *xproto.CgiReq, cresp *xproto.CgiResp) bool

广播Cgi消息(用于客户端和服务器之间交互)(全局)

id: 消息ID
creq: 消息请求
cresp: 消息响应

func NotifyEvt

func NotifyEvt(id int, param interface{}) bool

广播Evt消息(用于服务器内部)(全局)

id: 消息ID
param: 透传参数

func NotifyMsg

func NotifyMsg(id int, mreq *xproto.MsgReq) bool

广播Msg消息(用于客户端和服务器之间交互)(全局)

id: 消息ID
mreq: 消息对象

func NotifyRpc

func NotifyRpc(id int, rreq *xproto.RpcReq, rresp *xproto.RpcResp) bool

广播Rpc消息(用于服务器之间交互)(全局)

id: 消息ID
rreq: 消息请求
rresp: 消息响应

func PauseLan

func PauseLan()

暂停线路

func PostKV

func PostKV(key string, value string, version string, block ...bool) bool

推送KV(键值对)至Consul Storage

key: 键
value: 值
version: 版本号,以'VERSION_'为前缀
block-是否阻塞

func PullKV

func PullKV(key string) []byte

从Consul Storage中拉取KV(键值对)(阻塞)

key: 键

func RecvLan

func RecvLan()

线路接收

func RegCgi

func RegCgi(id int, fun func(creq *xproto.CgiReq, cresp *xproto.CgiResp)) int

注册Cgi消息(用于客户端和服务器之间交互)(全局)

id:	消息ID
fun: 消息回调

func RegCgiRoute

func RegCgiRoute(_map map[int]*CgiRoute)

注册Cgi路由

func RegEvt

func RegEvt(id int, fun func(interface{})) int

注册Evt消息(用于服务器内部)(全局)

id:	消息ID
fun: 消息回调

func RegMsg

func RegMsg(id int, fun func(*xproto.MsgReq)) int

注册Msg消息(用于客户端和服务器之间交互)(全局)

id:	消息ID
fun: 消息回调

func RegMsgRoute

func RegMsgRoute(_map map[int]*MsgRoute)

注册Msg路由

func RegRpc

func RegRpc(id int, fun func(rreq *xproto.RpcReq, rresp *xproto.RpcResp)) int

注册Rpc消息(用于服务器之间交互)(全局)

id:	消息ID
fun: 消息回调

func RegRpcRoute

func RegRpcRoute(_map map[int]*RpcRoute)

注册Rpc路由

func RestoreLan

func RestoreLan()

线路恢复

func ResumeLan

func ResumeLan()

恢复线路

func RunIn

func RunIn(tid int64, fun func()) chan bool

在指定逻辑线程中调用(返回的chan可用于阻塞当前线程)

tid: 线程ID
fun: 回调函数

func RunInMain

func RunInMain(fun func()) chan bool

在逻辑主线程中调用(返回的chan可用于阻塞当前线程)

fun: 回调函数

func SendAsync

func SendAsync(id int, uid int, req proto.Message, addr string, callback func(frame *xproto.RpcResp, err error), offsetAndTimeout ...int)

发送Rpc消息(异步)

id: 消息ID
uid: 用户ID(负载均衡)
req: 请求结构体
addr: 目标服务器
callback: 回调函数
offset: 目标协程ID偏移(基于protocol中定义)
timeout: 超时时长

func SendCgi

func SendCgi(id int, uid int, req *http.Request, addr string, timeout ...int) (cresp *xproto.CgiResp, err error)

发送Cgi消息(同步,否则ResponseWriter无法输出)

id: 消息ID
uid: 用户ID(负载均衡)
req: 请求结构体
addr: 目标服务器
timeout: 超时时长

func SendFrame

func SendFrame(frame xproto.IFrame) bool

发送网络帧(根据UID负载均衡)

frame: 网络帧

func SendMsg

func SendMsg(id int, msg proto.Message, mreq *xproto.MsgReq) bool

发送Msg消息

id: 消息ID
msg: 结构体
mreq: msg帧

func SendSync

func SendSync(id int, uid int, req proto.Message, resp proto.Message, addr string, offsetAndTimeout ...int) error

发送Rpc消息(同步)

id: 消息ID
uid: 用户ID(负载均衡)
req: 请求结构体
resp: 返回结构体
addr: 目标服务器
offset: 目标协程偏移(基于protocol中定义)
timeout: 超时时长

func Start

func Start(server IServer)

启动

server: 服务对象

func StartLan

func StartLan(lanCfg *LanCfg, handleMsg func(*xproto.MsgReq),
	handleRpc func(*xproto.RpcReq, *xproto.RpcResp),
	handleCgi func(*xproto.CgiReq, *xproto.CgiResp))

启动线路

lancfg: 线路配置
handleMsg: 消息处理函数

func Stop

func Stop()

停止

func SubKV

func SubKV(key string, interval int, onUpdate func(data []byte))

订阅Consul Storage中的KV(键值对)(阻塞)

key: 键(注意订阅的Key需要设置版本,以'VERSION_'为前缀)
interval: 间歇时间

func UnregCgi

func UnregCgi(id int, hid int) bool

注销Cgi消息(用于客户端和服务器之间交互)(全局)

id: 消息ID
hid: 句柄ID

func UnregEvt

func UnregEvt(id int, hid int) bool

注销Evt消息(用于服务器内部)(全局)

id: 消息ID
hid: 句柄ID

func UnregMsg

func UnregMsg(id int, hid int) bool

注销Msg消息(用于客户端和服务器之间交互)(全局)

id: 消息ID
hid: 句柄ID

func UnregRpc

func UnregRpc(id int, hid int) bool

注销Rpc消息(用于服务器之间交互)(全局)

id: 消息ID
hid: 句柄ID

func WatchSignal

func WatchSignal() <-chan string

Types

type CgiFunc

type CgiFunc func(req *xproto.CgiReq, resp *xproto.CgiResp)

Cgi函数类型

func (CgiFunc) Handle

func (this CgiFunc) Handle(reply *xevt.EvtReply, req interface{}, resp interface{})

处理回调

reply: 响应对象
param1: 参数1
param2: 参数2

type CgiRoute

type CgiRoute struct {
	Route
	Method  []string // 请求方式
	Timeout int      // 超时时间
}

Cgi路由

type EvtFunc

type EvtFunc func(interface{})

Evt函数类型

func (EvtFunc) Handle

func (this EvtFunc) Handle(reply *xevt.EvtReply, param1 interface{}, param2 interface{})

处理回调

reply: 响应对象
param1: 参数1
param2: 参数2

type IServer

type IServer interface {
	Init() bool                                         // 初始化
	Start()                                             // 服务启动
	Update(delta float32)                               // 服务循环
	Destroy()                                           // 服务结束
	PreQuit()                                           // 服务即将退出
	Name() string                                       // 服务名称
	InitConfig() bool                                   // 读取配置
	GetConfig() *SvrCfg                                 // 获取配置
	GetFPS() int                                        // 获取帧率
	UpdateTitle() string                                // 更新标题
	GetTitle() string                                   // 获取标题
	RecvMsg(mreq *xproto.MsgReq)                        // 接收Msg消息
	RecvRpc(rreq *xproto.RpcReq, rresp *xproto.RpcResp) // 接收Rpc消息
	RecvCgi(creq *xproto.CgiReq, cresp *xproto.CgiResp) // 接收Cgi消息
}

服务接口

type LanCfg

type LanCfg struct {
	Name     string // 名称
	Addr     string // tcp://$ip:$port
	Raw      string // $ip:$port
	IP       string // IP
	Port     int    // 端口
	GO       int    // 逻辑线程数
	MaxRx    int    // 最大接收字节数(KB)
	MsgProto string // msg消息协议类型,可选pb/json,默认pb
	CgiProto string // cgi消息协议类型,可选pb/json,默认json
}

线路配置

func NewLanCfg

func NewLanCfg(name, addr string) *LanCfg

创建线路配置

name: 线路名称
addr: 线路地址

func (*LanCfg) ServerID

func (this *LanCfg) ServerID() string

服务器ID($name@tcp://$ip:$port)

type LanClt

type LanClt struct {
	*LanCfg
	Sockets []mangos.Socket // Socket连接
}

线路连接

func NewLanClt

func NewLanClt(cfg *LanCfg) *LanClt

新建线路连接

func (*LanClt) Close

func (this *LanClt) Close()

关闭连接

func (*LanClt) Send

func (this *LanClt) Send(bytes []byte, idx int) error

发送数据

bytes: 数据
idx: 连接索引

type LanSvr

type LanSvr struct {
	*LanCfg
	mangos.Socket
	Clients  sync.Map // 连接池(map[string][]*LanClt)
	ClientID sync.Map // 连接映射(map[string]*LanClt)
	SClosed  bool     // 是否关闭
}

线路服务

var (
	GLan  *LanSvr // 全局线路服务
	GProc []*Proc // 全局业务处理器
)

func NewLanSvr

func NewLanSvr(cfg *LanCfg) *LanSvr

新建线路服务

cfg: 线路配置

func (*LanSvr) Close

func (this *LanSvr) Close()

线路关闭

func (*LanSvr) Recv

func (this *LanSvr) Recv() ([]byte, error)

线路接收

func (*LanSvr) SelectAll

func (this *LanSvr) SelectAll(svr string) []*LanClt

选择所有指定类型的线路

svr: 服务类型

func (*LanSvr) SelectRand

func (this *LanSvr) SelectRand(svr string) *LanClt

随机选择指定类型的线路

svr: 服务类型

func (*LanSvr) SendData

func (this *LanSvr) SendData(svr string, bytes []byte, idx int) error

发送数据

svr: 服务类型
bytes: 数据
idx: 连接索引

func (*LanSvr) Update

func (this *LanSvr) Update(smap map[string][]string)

路由更新

smap: 路由表

type MsgFunc

type MsgFunc func(*xproto.MsgReq)

Msg函数类型

func (MsgFunc) Handle

func (this MsgFunc) Handle(reply *xevt.EvtReply, param1 interface{}, param2 interface{})

处理回调

reply: 响应对象
param1: 参数1
param2: 参数2

type MsgRoute

type MsgRoute struct {
	Route
}

Msg路由

type Proc

type Proc struct {
	TID   int64              // 线路的GoID
	Num   int                // 线路线程总数
	CIN   chan xproto.IFrame // 输入队列
	COUT  chan xproto.IFrame // 输出队列
	Loop  bool               // 循环标识
	Pause bool               // 暂停标识
	Resp  sync.Map           // map[int64]chan *xproto.RpcReq/*xproto.CgiFrame

}

业务处理器

func NewProc

func NewProc() *Proc

新建业务处理器

func (*Proc) MaxID

func (this *Proc) MaxID() int64

自增ID

func (*Proc) PopCIN

func (this *Proc) PopCIN() (xproto.IFrame, bool)

弹出第一个输入网络帧

func (*Proc) PushCIN

func (this *Proc) PushCIN(frame xproto.IFrame) bool

压入一个输入网络帧

frame: 网络帧

type Route

type Route struct {
	ID   int      // 路由ID
	Name string   // 路由名称
	GoL  int      // 协程ID(左)
	GoR  int      //协程ID(右)
	RW   bool     // 可读可写(默认true)
	Log  int      // 日志层级(参考xlog的LogLevel)
	Dst  []string // 目标
}

路由信息

func (*Route) GetLog

func (this *Route) GetLog() int

获取日志层级,若未指定则使用全局日志层级

type RpcFunc

type RpcFunc func(rreq *xproto.RpcReq, rresp *xproto.RpcResp)

Rpc函数类型

func (RpcFunc) Handle

func (this RpcFunc) Handle(reply *xevt.EvtReply, param1 interface{}, param2 interface{})

处理回调

reply: 响应对象
param1: 参数1
param2: 参数2

type RpcRoute

type RpcRoute struct {
	Route
}

Rpc路由

type Server

type Server struct {
	xobj.OBJECT
	REAL   IServer
	Config *SvrCfg // 配置信息
	FPS    int     // 应用帧率
	Title  string  // 应用标题
}

服务对象

func (*Server) CTOR

func (this *Server) CTOR(CHILD interface{})

构造函数

func (*Server) Destroy

func (this *Server) Destroy()

服务结束

func (*Server) GetConfig

func (this *Server) GetConfig() *SvrCfg

获取配置

func (*Server) GetFPS added in v1.2.0

func (this *Server) GetFPS() int

获取帧率

func (*Server) GetTitle added in v1.2.0

func (this *Server) GetTitle() string

获取标题

func (*Server) Init

func (_this *Server) Init() bool

初始化

func (*Server) InitConfig

func (this *Server) InitConfig() bool

读取配置

func (*Server) Name

func (this *Server) Name() string

服务名称

func (*Server) PreQuit

func (_this *Server) PreQuit()

func (*Server) RecvCgi

func (this *Server) RecvCgi(rreq *xproto.CgiReq, rresp *xproto.CgiResp)

接收Cgi消息

func (*Server) RecvMsg

func (this *Server) RecvMsg(mreq *xproto.MsgReq)

func (*Server) RecvRpc

func (this *Server) RecvRpc(rreq *xproto.RpcReq, rresp *xproto.RpcResp)

接收Rpc消息

func (*Server) Start

func (this *Server) Start()

服务启动

func (*Server) Update

func (this *Server) Update(delta float32)

func (*Server) UpdateTitle

func (this *Server) UpdateTitle() string

type SvrCfg

type SvrCfg struct {
	Raw              xconfig.Configer
	Env              string  // 环境标识: 测试,内测,生产
	LanCfg           *LanCfg // 线路配置
	LinkServer       string  // 需要连接的内部服务器
	ConsulAddr       string  // Consul中心地址
	ConsulHttp       string  // Consul检测地址
	ConsulTimeout    string  // Consul超时时间
	ConsulInterval   string  // Consul访问间隔
	ConsulDeregister string  // Consul延迟注销
}

服务配置

func (*SvrCfg) Init

func (this *SvrCfg) Init(config string) bool

初始化

config: 配置内容

func (*SvrCfg) IsDebug added in v1.2.2

func (this *SvrCfg) IsDebug() bool

是否调试环境

func (*SvrCfg) SvrID

func (this *SvrCfg) SvrID() string

服务ID

func (*SvrCfg) SvrName

func (this *SvrCfg) SvrName() string

服务名称

type TimerEntity

type TimerEntity struct {
	ID      int         // 定时器ID
	Func    func()      // 定时器回调
	Time    int         // 定时时间
	RawTime int         // 初始时间
	Repeat  bool        // 循环调用
	Crash   bool        // 是否崩溃
	RW      bool        // 是否读写
	Tag     interface{} // 日志标签
	Log     int         // 日志层级
}

定时器对象

func RunInNext

func RunInNext(fun func()) *TimerEntity

在当前逻辑线程中的下一帧调用

fun: 回调函数

func SetInterval

func SetInterval(fun func(), interval float32, tid ...int64) *TimerEntity

设置间歇调用(务必在逻辑线程中调用或指定线程ID)

fun: 回调函数
interval: 间歇时间(秒)
tid: 线程ID

func SetTimeout

func SetTimeout(fun func(), timeout float32, tid ...int64) *TimerEntity

设置超时调用(务必在逻辑线程中调用或指定线程ID)

fun: 回调函数
timeout: 超时时间(秒)
tid: 线程ID

func (*TimerEntity) SetLog

func (this *TimerEntity) SetLog(log int) *TimerEntity

设置会话的日志层级

func (*TimerEntity) SetRW

func (this *TimerEntity) SetRW(sig bool) *TimerEntity

设置会话的可读性(默认为可读可写)

func (*TimerEntity) SetTag

func (this *TimerEntity) SetTag(tag interface{}) *TimerEntity

设置会话的标签

type TimerRecord

type TimerRecord struct {
	Timers   sync.Map // 定时器映射
	TimerID  int64    // 定时器ID
	LastTime int      // 上次时间
}

定时器句柄

func (*TimerRecord) MaxID

func (this *TimerRecord) MaxID() int64

自增ID

type Wrap

type Wrap struct {
	Svr    IServer
	ChQuit chan bool // 阻塞chan
}

服务封装器

func NewWrap

func NewWrap(server IServer) *Wrap

新建服务封装器

server: 服务对象

func (*Wrap) Destroy

func (this *Wrap) Destroy()

销毁

func (*Wrap) Init

func (this *Wrap) Init() bool

初始化

func (*Wrap) Run

func (this *Wrap) Run()

运行

func (*Wrap) Stop

func (this *Wrap) Stop()

停止

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL