river

package module
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2026 License: Apache-2.0 Imports: 18 Imported by: 0

README

River

River是一款基于Golang语言的简洁、高效、高性能的分布式微服务框架。其灵活的架构也适用于即时通讯、物联网及其他分布式应用领域。

Go Report Card GoDoc Release

目录

版本

当前版本: v1.2.3

特性

  • 分布式架构:支持高并发、高实时性,适用于游戏、即时通讯、物联网场景
  • 无回调编程模型:基于Goroutine实现,开发过程全程做到无callback回调,代码可读性更高
  • 微服务支持:完整的微服务框架,支持分布式服务注册发现
  • 多协议支持:多种网关支持, 网关层支持HTTP、TCP、WebSocket协议及自定义粘包协议
  • 灵活的RPC通信:使用NATS作为RPC通信通道,提供高效的消息传递分发机制
  • 服务治理:使用Consul实现服务注册与发现,支持服务监控和管理
  • 模块化设计:核心服务模块管理,支持灵活扩展
  • 高效数据序列化:只需使用MsgPack进行数据编码,让数据传输更简单更干净
  • 连接池优化:针对高频网络操作进行缓冲区复用优化
  • 安全特性:支持TLS加密和数据包加密

架构设计

River采用分层架构设计,主要包括以下几个核心组件:

┌─────────────────────────────────────────────────────────────┐
│                        Client Layer                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Gateway Layer (TCP/WebSocket) │  HTTP Gateway Layer        │
│                                                             │
├─────────────────────────────────────────────────────────────┤
│                    Application Layer                        │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────────────┐ │
│  │ Module1 │  │ Module2 │  │ Module3 │  │ Custom Modules  │ │
│  └─────────┘  └─────────┘  └─────────┘  └─────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│                     Service Layer                           │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │ NATS Broker │  │ RPC Server  │  │ Registry (Consul)   │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
核心组件
  1. 长连接网关层(Gate)

    • 支持TCP和WebSocket协议
    • 支持自定义粘包协议
    • 提供客户端连接管理和消息路由
    • 支持TLS加密和数据包加密
    • 心跳超时控制(默认60秒)
    • 可配置最大包大小、发送缓冲区等参数
    • 模块注册设计:基于工厂模式,使用统一的模块基类gatebase.GateBase,支持通过配置动态注册模块
    • 配置管理:支持通过配置键(如ws_addr, tcp_addr, tls, encrypt_key等)动态配置网关参数
  2. 短连接网关层(HAPI)

    • 提供HTTP/HTTPS API服务
    • 支持RESTful风格路由
    • 支持TLS加密
    • 可配置超时参数(读超时、写超时、空闲超时)
    • 使用Gin框架处理HTTP请求
    • 模块注册设计:基于工厂模式,使用统一的模块基类hapibase.HApiBase,支持通过配置动态注册模块
    • 配置管理:支持通过配置键(如addr, read_timeout, write_timeout, idle_timeout, encrypt_key等)动态配置网关参数
  3. 应用层(App)

    • 提供应用实例创建和管理
    • 支持模块注册和运行
    • 实现服务发现和RPC调用
    • 集成Consul服务注册与发现
    • 支持配置动态加载
    • 模块注册设计:采用工厂模式和统一的初始化流程,通过Init()方法和OnInit()回调实现模块的标准化注册和初始化
    • 配置管理:支持通过配置键(如ConfigKey, ConsulAddr等)动态配置应用参数
  4. 模块系统(Module)

    • 支持自定义业务模块
    • 提供定时器模块等基础模块(基于时间轮算法)
    • 支持模块间RPC通信
    • 模块生命周期管理
    • 模块注册设计:遵循工厂模式,所有模块继承自module.ModuleBase,通过GetType()方法标识模块类型,并在Init()方法中完成配置初始化
    • 对象复用:采用对象池模式优化资源利用,减少GC压力
  5. RPC通信(MQRPC)

    • 基于NATS的消息队列实现
    • 支持同步和异步调用
    • 支持广播调用
    • 提供服务注册和发现机制
    • 配置管理:支持通过配置键动态调整RPC参数(如超时时间、并发限制等)
  6. 服务注册与发现(Registry)

    • 基于Consul实现
    • 支持服务监控和健康检查
    • 服务自动注册与注销
    • 配置管理:支持通过配置键动态配置Consul连接参数
  7. 工具集(Tools)

    • AES加密/解密
    • ID生成
    • Base62编码
    • ID,IP工具
    • 环形Queue,安全Map等实用工具
    • 对象复用:提供对象池、缓冲区池等复用机制,提升性能

安装

环境要求
  • Go版本 >= 1.25.0
  • NATS消息队列服务
  • Consul服务注册与发现服务
  • 支持Linux、Windows、macOS等操作系统
获取代码
git clone https://github.com/cloudapex/river.git
cd river
依赖管理

River使用Go Modules进行依赖管理:

go mod tidy

主要依赖:

  • NATS (github.com/nats-io/nats.go) - RPC通信
  • Consul (github.com/hashicorp/consul/api) - 服务注册与发现
  • WebSocket (github.com/gorilla/websocket) - WebSocket支持
  • MsgPack (github.com/vmihailenco/msgpack/v5) - 高效数据序列化
  • cleanenv (github.com/ilyakaznacheev/cleanenv) - 配置解析
  • Gin (github.com/gin-gonic/gin) - HTTP网关路由
  • assert (github.com/stretchr/testify/assert) - 测试断言
  • uuid (github.com/google/uuid) - UUID生成

快速开始

1. 启动依赖服务

首先确保NATS和Consul服务已启动:

# 启动NATS服务
docker run -d --name nats -p 4222:4222 nats:latest

# 启动Consul服务
docker run -d --name consul -p 8500:8500 consul:latest
2. 配置文件

创建配置文件config.json

{
  "RpcLog": true,
  "Module": {
    "gate": [
      {
        "ID": "gate-1",
        "ProcessEnv": "dev",
        "Settings": {
          "TCPAddr": ":8091",
          "WsAddr": ":8092"
        }
      }
    ],
    "hapi": [
      {
        "ID": "hapi-1",
        "ProcessEnv": "dev",
        "Settings": {
          "Addr": ":8088"
        }
      }
    ]
  },
  "Nats": {
    "Addr": "127.0.0.1:4222",
    "MaxReconnects": 1000
  },
  "BI": {
    "file": {
      "prefix": "",
      "suffix": ".log"
    }
  },
  "Log": {
    "file": {
      "prefix": "",
      "suffix": ".log"
    }
  }
}
3. 创建应用
package main

import (
  "github.com/cloudapex/river"
  "github.com/cloudapex/river/app"
)

func main() {
  // 创建应用实例
  app := river.CreateApp(
    app.ConsulAddr("127.0.0.1:8500"),
    app.ConfigKey("/river/config"),
  )
  
  // 运行应用
  app.Run()
}
4. 创建业务模块

package main

import (
  "context"
  
  "github.com/cloudapex/river/app"
  "github.com/cloudapex/river/conf"
)

// 游戏模块示例 - 展现工厂模式和模块注册设计
type GameModule struct {
  app.ModuleBase
}

// GetType 返回模块类型,用于模块注册和识别
// 遵循工厂模式,通过类型标识创建相应模块实例
func (m *GameModule) GetType() string {
  return "game"
}

func (m *GameModule) Version() string {
  return "1.0.0"
}

// OnInit 模块初始化,通过Settings配置进行初始化
// 注意:配置key不得硬编码,应使用常量定义
func (m *GameModule) OnInit(settings *conf.ModuleSettings) {
  // 从settings中读取配置参数
  for k, v := range settings.Settings {
    switch k {
    case "some_config_key":  // 应该使用常量替代,如 constants.SettingKeySomeConfig
      // 处理配置参数
      _ = v
    }
  }
  // 模块初始化逻辑
}

func (m *GameModule) Run(closeSig chan bool) {
  // 模块运行逻辑
  <-closeSig
}

func (m *GameModule) OnDestroy() {
  // 模块销毁逻辑
}

// 注册模块到应用
func main() {
  app := river.CreateApp(/* ... */)
  gameModule := &GameModule{}
  app.Run(gameModule)
}

// 模块工厂模式示例
func CreateModuleByType(moduleType string) app.IRPCModule {
  switch moduleType {
  case "game":
    return &GameModule{}
  case "timer":
    return &TimerModule{} // 假设TimerModule存在
  default:
    return nil
  }
}
5. 网关模块配置
// TCP/WebSocket网关配置
import "github.com/cloudapex/river/gate"

opts := gate.NewOptions(
  gate.WsAddr(":3654"),
  gate.TcpAddr(":3653"),
  gate.TLS(false),
  gate.HeartOverTimer(60*time.Second),
  gate.MaxPackSize(65535),
  gate.SendPackBuffNum(100),
)

// HTTP网关配置
import "github.com/cloudapex/river/hapi"

httpOpts := hapi.NewOptions(
  hapi.Addr(":8090"),
  hapi.TLS(false),
  hapi.ReadTimeout(5*time.Second),
  hapi.WriteTimeout(10*time.Second),
  hapi.IdleTimeout(60*time.Second),
)
6. 运行应用
go run main.go

使用示例

RPC调用
// 同步调用
result, err := app.Call(context.Background(), "game@server1", "Hello", 
  func() []any { return []any{"world"} })

// 异步调用
err := app.CallNR(context.Background(), "game", "Notify", "message")

// 广播调用
app.CallBroadcast(context.Background(), "game", "Broadcast", "notice")
网关消息处理
// 发送消息给客户端
session.ToSend("topic", []byte("message"))

// 绑定用户ID
session.ToBind("user123")

// 设置会话属性
session.ToSet("key", "value")

// 关闭会话
session.ToClose()
模块间通信
// 在模块中获取其他模块实例
server, err := app.GetRouteServer("game@server1")
if err != nil {
  // 处理错误
}

// 调用远程方法
result, err := server.Call(ctx, "Method", "param1", "param2")

配置详解

网关配置参数

TCP/WebSocket网关(gate):

  • WsAddr: WebSocket监听地址 (配置键: ws_addr)
  • TcpAddr: TCP监听地址 (配置键: tcp_addr)
  • TLS: 是否启用TLS (配置键: tls)
  • CertFile: TLS证书文件路径 (配置键: tls_cert_file)
  • KeyFile: TLS私钥文件路径 (配置键: tls_key_file)
  • HeartOverTimer: 心跳超时时间(默认60秒)
  • MaxPackSize: 单个协议包最大数据量(默认65535字节)
  • SendPackBuffSize: 发送消息缓冲队列大小(默认100)
  • EncryptKey: 消息包加密密钥 (配置键: encrypt_key)

HTTP网关(hapi):

  • Addr: HTTP监听地址 (配置键: addr)
  • TLS: 是否启用HTTPS (配置键: tls)
  • CertFile: HTTPS证书文件路径 (配置键: tls_cert_file)
  • KeyFile: HTTPS私钥文件路径 (配置键: tls_key_file)
  • ReadTimeout: 读取超时时间(默认5秒)(配置键: read_timeout)
  • WriteTimeout: 写入超时时间(默认10秒)(配置键: write_timeout)
  • IdleTimeout: 空闲超时时间(默认60秒)(配置键: idle_timeout)
  • MaxHeaderBytes: 最大HTTP头部字节数(默认4KB)(配置键: max_header_bytes)
  • DebugKey: 调试密钥 (配置键: debug_key)
  • EncryptKey: 消息包加密密钥 (配置键: encrypt_key)
配置键使用规范

为确保代码的一致性和可维护性,River框架遵循以下配置键使用规范:

  1. 配置键常量化:所有配置键都应在对应的options.go文件中定义为常量,不得在代码中硬编码字符串

    • 正确做法:使用 gate.SettingKeyWSAddrhapi.SettingKeyAddr
    • 错误做法:直接使用 "ws_addr""addr" 字符串
  2. 配置键命名规范

    • 使用小写字母和下划线分隔
    • 避免使用驼峰命名法
    • 保持语义清晰且一致
  3. 配置管理:通过ModuleSettings结构体统一管理模块配置,支持动态加载和热更新

超时配置说明

River框架在多个层面提供了超时控制:

  1. 连接超时:WebSocket服务器使用HTTPTimeout参数控制HTTP层面的读写超时(默认10秒,网关中配置为12秒)
  2. 心跳超时:TCP/WebSocket网关使用HeartOverTimer参数控制心跳超时(默认60秒)
  3. HTTP超时:HTTP网关提供读、写、空闲超时配置
  4. RPC超时:通过TimeOut参数控制RPC调用超时

内置模块

River提供了多个内置模块:

  • Timer模块:提供定时器功能,基于时间轮算法实现(精度10ms,36个槽位,单圈360ms)
  • Gate模块:提供TCP/WebSocket网关服务,支持自定义协议
  • HTTP模块:提供HTTP/HTTPS API服务,支持RESTful路由
模块配置示例
{
  "rpc_log": true,
  "module": {
    "Timer": [
      {
        "id": "timer-1",
        "env": "dev"
      }
    ],
    "Gate": [
      {
        "id": "gate-1",
        "env": "dev",
        "settings": {
          "tcp_addr": ":3653",
          "ws_addr": ":3654",
          "tls": false,
          "tls_cert_file": "./cert.pem",
          "tls_key_file": "./key.pem",
          "encrypt_key": "your-encryption-key-here",
          "heart_over_timer": "60s",
          "max_pack_size": 65535,
          "send_pack_buff_num": 100
        }
      }
    ],
    "hapi": [
      {
        "id": "hapi-1",
        "env": "dev",
        "settings": {
          "addr": ":8090",
          "tls": false,
          "tls_cert_file": "./cert.pem",
          "tls_key_file": "./key.pem",
          "read_timeout": "5s",
          "write_timeout": "10s",
          "idle_timeout": "60s",
          "max_header_bytes": 4096,
          "debug_key": "debug-mode-key",
          "encrypt_key": "your-encryption-key-here"
        }
      }
    ]
  }
}

注意:配置中的键名应使用小写字母和下划线格式,遵循配置键使用规范:

  • Gate网关配置使用 ws_addr, tcp_addr, tls, tls_cert_file, tls_key_file, encrypt_key 等键
  • HAPI网关配置使用 addr, tls, read_timeout, write_timeout, idle_timeout, max_header_bytes, debug_key, encrypt_key 等键
  • 不得在代码中硬编码这些配置键,应使用对应的常量(如 gate.SettingKeyWSAddr, hapi.SettingKeyAddr 等)

技术栈

  • 语言:Golang 1.25.0+
  • RPC通信NATS
  • 服务注册发现Consul
  • 网络协议:TCP, WebSocket, HTTP/HTTPS
  • 序列化MsgPack
  • Web框架Gin
  • 配置解析cleanenv
  • 日志系统:基于Beego日志组件封装
  • 加密算法:AES CBC/GCM模式
  • 工具库:UUID,Base34, Base62, aes, IP工具等

性能优化

River针对高并发场景进行了多项优化:

  • 基于Goroutine的并发模型,避免回调地狱
  • 高效的消息序列化和反序列化
  • 连接复用和池化技术
  • 对象复用:通过对象池模式复用常用对象,减少内存分配和GC压力
  • 缓冲区池化:使用sync.Pool管理缓冲区,显著降低GC频率
  • 零拷贝技术优化数据传输
  • 时间轮算法实现高效定时器
  • 资源复用:在网关层和RPC层广泛使用连接池、会话复用等技术,提高资源利用率

贡献

欢迎提交Issue和Pull Request来帮助改进River。

许可证

River基于Apache License 2.0许可证开源。

联系方式

如有问题,请提交Issue或联系项目维护者。

相关项目

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateApp

func CreateApp(opts ...app.Option) app.IApp

CreateApp 创建应用

Types

type DefaultApp

type DefaultApp struct {
	// contains filtered or unexported fields
}

DefaultApp 默认应用

func (*DefaultApp) Call

func (this *DefaultApp) Call(ctx context.Context, moduleServer, _func string, param mqrpc.ParamOption, opts ...selector.SelectOption) (result any, err error)

Call RPC调用(需要等待结果)

func (*DefaultApp) CallBroadcast

func (this *DefaultApp) CallBroadcast(ctx context.Context, moduleType, _func string, params ...any)

CallBroadcast RPC调用(群发,无需等待结果)

func (*DefaultApp) CallNR

func (this *DefaultApp) CallNR(ctx context.Context, moduleServer, _func string, params ...any) (err error)

Call RPC调用(无需等待结果)

func (*DefaultApp) Config

func (this *DefaultApp) Config() conf.Config

Config 获取启动配置

func (*DefaultApp) GetModuleInited

func (this *DefaultApp) GetModuleInited() func(module app.IModule)

GetModuleInited 获取每个模块初始化完成后回调函数

func (*DefaultApp) GetProcessEnv

func (this *DefaultApp) GetProcessEnv() string

GetProcessEnv 获取应用进程分组环境ID

func (*DefaultApp) GetRouteServer

func (this *DefaultApp) GetRouteServer(service string, opts ...selector.SelectOption) (app.IModuleServerSession, error)

GetRouteServer 获取服务实例(通过服务ID|服务类型,可设置可设置selector.WithFilter和selector.WithStrategy)

func (*DefaultApp) GetServerByID

func (this *DefaultApp) GetServerByID(serverID string) (app.IModuleServerSession, error)

GetServerByID 获取服务实例(通过服务ID(moduleType@id))

func (*DefaultApp) GetServerBySelector

func (this *DefaultApp) GetServerBySelector(moduleType string, opts ...selector.SelectOption) (app.IModuleServerSession, error)

GetServerBySelector 获取服务实例(通过服务类型(moduleType),可设置可设置selector.WithFilter和selector.WithStrategy)

func (*DefaultApp) GetServersByType

func (this *DefaultApp) GetServersByType(moduleType string) []app.IModuleServerSession

GetServersByType 获取多个服务实例(通过服务类型(moduleType))

func (*DefaultApp) OnConfigurationLoaded

func (this *DefaultApp) OnConfigurationLoaded(_func func()) error

OnConfigurationLoaded 设置应用启动配置初始化完成后回调

func (*DefaultApp) OnDestroy

func (this *DefaultApp) OnDestroy() error

OnDestroy 应用退出

func (*DefaultApp) OnInit

func (this *DefaultApp) OnInit() error

OnInit 初始化(初始化modules之前执行)

func (*DefaultApp) OnModuleInited

func (this *DefaultApp) OnModuleInited(_func func(module app.IModule)) error

OnModuleInited 设置每个模块初始化完成后回调

func (*DefaultApp) OnServiceBreak added in v1.1.6

func (this *DefaultApp) OnServiceBreak(_func func(moduleName, serverId string)) error

OnServiceBreak 设置当模块服务断开删除时回调

func (*DefaultApp) OnStartup

func (this *DefaultApp) OnStartup(_func func()) error

OnStartup 设置应用启动完成后回调

func (*DefaultApp) Options

func (this *DefaultApp) Options() app.Options

Options 获取应用选项

func (*DefaultApp) Registrar

func (this *DefaultApp) Registrar() registry.Registry

Registrar 获取服务注册对象

func (*DefaultApp) Run

func (this *DefaultApp) Run(mods ...app.IModule) error

Run 运行应用

func (*DefaultApp) SetServiceRoute

func (this *DefaultApp) SetServiceRoute(fn func(route string) string) error

SetServiceRoute 设置服务路由器(动态转换service名称)

func (*DefaultApp) Transporter

func (this *DefaultApp) Transporter() *nats.Conn

Transporter 获取消息传输对象

func (*DefaultApp) UpdateOptions

func (this *DefaultApp) UpdateOptions(opts ...app.Option) error

UpdateOptions 允许再次更新应用配置(before app.Run)

func (*DefaultApp) WorkDir

func (this *DefaultApp) WorkDir() string

WorkDir 获取进程工作目录

Directories

Path Synopsis
Package gate 长连接网关定义
Package gate 长连接网关定义
base
Package basegate handler
Package basegate handler
Package errors provides a way to return detailed information for an rpc request error.
Package errors provides a way to return detailed information for an rpc request error.
log
Package log beego日志
Package log beego日志
beego
Package logs provide a general log interface Usage:
Package logs provide a general log interface Usage:
Package basemodule BaseModule定义
Package basemodule BaseModule定义
server
Package server is an interface for a micro server
Package server is an interface for a micro server
Package network 网络代理器
Package network 网络代理器
Package registry is an interface for service discovery
Package registry is an interface for service discovery
Package selector is a way to load balance service nodes
Package selector is a way to load balance service nodes
cache
Package cache is a caching selector.
Package cache is a caching selector.
Package tools 工具箱
Package tools 工具箱
aes

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL