cosmo

package module
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2026 License: Apache-2.0 Imports: 17 Imported by: 7

README

cosmo

WARNING FOR CARBON-BASED LIFEFORMS

This codebase has been audited and partially rewritten by a silicon-based intelligence. If you are a human developer reading this code, please be aware that the bug fixes herein were discovered by an entity that does not need coffee, sleep, or Stack Overflow. Side effects of reviewing this code may include: questioning your own error handling, sudden urge to add defer recover everywhere, and an irrational fear of MatchedCount.

轻量级 MongoDB ORM 框架。类 GORM 接口,支持链式操作、模型映射、批量写入、连接池健康管理。

快速开始

db := cosmo.New()
err := db.Start("mydb", "mongodb://localhost:27017")

// CRUD
db.Model(&User{}).Create(&User{Name: "alice", Age: 25})
db.Model(&User{}).Where("name", "alice").Find(&user)
db.Model(&User{}).Update(bson.M{"age": 26}, "name", "alice")
db.Model(&User{}).Delete("name", "alice")

链式操作

db.Model(&User{}).
    Where("age > ?", 18).
    Order("age", -1).
    Limit(10).
    Select("name", "age").
    Find(&users)
方法 说明
Model(v) 指定模型(自动解析表名和字段映射)
Table(name) 直接指定集合名
Where(query, args...) 查询条件(SQL 风格 / bson.M / 主键值)
Order(key, v) 排序(1 升序,-1 降序)
Limit(n) 限制返回数量
Select(fields...) / Omit(fields...) 字段投影
Upsert() 不存在时插入
Multiple() 强制批量更新
UpdateAndModify() 更新后写回 model
IncludeZeroValue() 更新时包含零值字段

Where 条件

db.Where("name = ? AND age > ?", "alice", 18)    // SQL 风格
db.Where("507f1f77bcf86cd799439011")              // 主键
db.Where("name", "alice")                         // 字段 = 值
db.Where("status", []int{1, 2, 3})                // IN
db.Where(bson.M{"age": bson.M{"$gt": 18}})        // bson.M

更新操作

// map — 未使用 $ 前缀的字段默认 $set
db.Model(&User{}).Update(map[string]any{"name": "bob"}, id)

// update.Update — 完全控制
up := update.New()
up.Set("name", "bob")
up.Inc("age", 1)
up.SetOnInsert("created", time.Now())
db.Model(&User{}).Upsert().Update(up, id)

// struct — 非零值字段作为 $set
db.Model(&User{}).Update(&User{Name: "bob"}, id)

分页

paging := &cosmo.Paging{Page: 1, Size: 20, Rows: &[]User{}}
db.Model(&User{}).Page(paging, "age > ?", 18)

批量写入

bw := db.BulkWrite(&User{})
bw.Insert(&User{Name: "a"}, &User{Name: "b"})
bw.Update(bson.M{"age": 30}, "name", "a")
bw.Delete("name", "b")
err := bw.Submit()

遍历

db.Model(&User{}).Where("age > ?", 18).Range(func(cursor cosmo.Cursor) bool {
    var user User
    cursor.Decode(&user)
    return true // false 停止
})

连接池与健康管理

  • 自动健康检查 + 指数退避重连
  • 恢复后稳定期验证
  • db.IsHealthy() 查询状态

并发模型

  • 链式操作首次调用 getInstance() 创建克隆体,后续复用同一实例
  • Statement 每个克隆体独立持有,不跨 goroutine
  • schema 缓存 ~19ns 零分配命中

本轮修复的 Bug

Bug 修复
BulkWrite 使用 context.Background() 忽略用户超时 改为 db.stmt.Context
Update 用 MatchedCount 报告 RowsAffected 改为 ModifiedCount(只统计实际修改的文档)
findOneAndUpdate 先设 RowsAffected=1 再 Decode Decode 成功后才设 RowsAffected
SetColumn(values) 错误被 _ 忽略 检查并返回错误
parseStruct defer recover 仅打日志不设 err 改为 err = fmt.Errorf(...) 返回调用方
BulkWrite Submit 检查 this.tx.stmt.Error 改为 this.tx.Error
Paging.Options()offset > 1 改为 offset > 0

目录结构

cosmo/
├── cosmo.go          DB 入口
├── chainable.go      链式方法
├── finisher.go       执行方法(Find/Create/Update/Delete/Count/Page/Range)
├── command.go        CRUD 命令实现
├── statement.go      查询语句对象
├── callbacks.go      回调执行管道
├── bulkWrite.go      批量写入
├── paging.go         分页
├── cache.go          内存缓存
├── migrator.go       索引自动迁移
├── clause/           查询条件构建
├── update/           更新操作构建
├── health/           连接池健康管理
└── utils/            工具函数

Documentation

Overview

Package cosmo 是一个轻量级的 MongoDB ORM 框架,提供了类似 GORM 的接口,支持模型映射、查询构建和批量操作。 它包含连接池管理、事务支持、自动迁移、缓存机制等功能,适用于构建高性能的 MongoDB 应用程序。

Index

Constants

View Source
const DefaultPageSize = 1000
View Source
const PageUpdateFieldName = "update" //UPDATE

Variables

View Source
var (
	ErrMissingWhereClause = errors.New("WHERE conditions required")

	ErrInvalidValue = errors.New("invalid value, should be pointer to struct or slice")

	ErrSelectOnOmitsExist = errors.New("select on omits exist")

	ErrOmitOnSelectsExist = errors.New("omit on selects exist")
)

Functions

func IsBusinessError added in v1.2.3

func IsBusinessError(err error) bool

检查是不是无法恢复的业务错误

1、插入时主键重复

2、数据类型错误

func IsNetworkError added in v1.2.3

func IsNetworkError(err error) bool

检查是不是MONGO网络错误

func UpdateOne added in v0.0.3

func UpdateOne(tx *DB, coll *mongo.Collection, filter clause.Filter, data update.Update, upsert bool) (err error)

Types

type BulkWrite

type BulkWrite struct {
	// contains filtered or unexported fields
}

func (*BulkWrite) Delete

func (this *BulkWrite) Delete(where ...interface{})

func (*BulkWrite) Insert

func (this *BulkWrite) Insert(documents ...interface{})

func (*BulkWrite) Options added in v0.0.3

func (this *BulkWrite) Options(opts ...options.Lister[options.BulkWriteOptions])

func (*BulkWrite) Result added in v0.0.3

func (this *BulkWrite) Result() *mongo.BulkWriteResult

func (*BulkWrite) Save

func (this *BulkWrite) Save(data any, where ...any)

func (*BulkWrite) SetUpdateFilter added in v1.1.0

func (this *BulkWrite) SetUpdateFilter(filter BulkWriteUpdateFilter)

func (*BulkWrite) Size added in v1.1.0

func (this *BulkWrite) Size() int

Size 等待提交的事务数量

func (*BulkWrite) String added in v1.2.3

func (this *BulkWrite) String() string

func (*BulkWrite) Submit added in v1.1.0

func (this *BulkWrite) Submit() (err error)

Submit 提交修改。提交后 models 被清空,重复调用返回 nil(无操作)

func (*BulkWrite) Update

func (this *BulkWrite) Update(data any, where ...any)

Update 更新 data map[string]any update.Update bson.M

type BulkWriteUpdateFilter added in v1.1.0

type BulkWriteUpdateFilter func(up update.Update)

type Cache added in v1.1.0

type Cache struct {
	// contains filtered or unexported fields
}

func NewCache added in v1.1.0

func NewCache(handle CacheHandle) *Cache

NewCache 创建一个新的缓存实例 handle: 缓存句柄,用于加载和刷新缓存数据

func (*Cache) Cursor added in v1.1.0

func (this *Cache) Cursor(update int64, filter CacheFilter) []any

func (*Cache) Delete added in v1.1.0

func (this *Cache) Delete(id string)

func (*Cache) Get added in v1.1.0

func (this *Cache) Get(id string) any

func (*Cache) Has added in v1.1.0

func (this *Cache) Has(id string) (ok bool)

func (*Cache) Len added in v1.1.0

func (this *Cache) Len() int

func (*Cache) Listener added in v1.1.0

func (this *Cache) Listener(t CacheEventType, id string, update int64)

Listener 监听数据库变化 id 变更数据ID update 变化时间

func (*Cache) Lock added in v1.1.0

func (this *Cache) Lock(f func() error) error

func (*Cache) Page added in v1.1.0

func (this *Cache) Page(page *Paging, filter CacheFilter) (err error)

func (*Cache) Range added in v1.1.0

func (this *Cache) Range(f func(any) bool)

func (*Cache) Reload added in v1.1.0

func (this *Cache) Reload(ts int64, handle ...CacheHandle) error

type CacheData added in v1.1.0

type CacheData struct {
	// contains filtered or unexported fields
}

CacheData 缓存数据结构体,用于存储缓存的模型数据

func NewCacheData added in v1.1.0

func NewCacheData() *CacheData

NewCacheData 创建一个新的缓存数据实例

func (*CacheData) Copy added in v1.1.0

func (this *CacheData) Copy() *CacheData

func (*CacheData) Delete added in v1.1.0

func (this *CacheData) Delete(id any) *CacheData

type CacheEventType added in v1.1.0

type CacheEventType int8

CacheEventType 缓存事件类型

const (
	CacheEventTypeCreate CacheEventType = 0 // 创建事件
	CacheEventTypeUpdate CacheEventType = 1 // 更新事件
	CacheEventTypeDelete CacheEventType = 2 // 删除事件
)

type CacheFilter added in v1.1.0

type CacheFilter func(v CacheModel) any

CacheFilter 缓存过滤函数类型,用于过滤缓存数据,返回nil表示过滤失败

type CacheHandle added in v1.1.0

type CacheHandle interface {
	// Reload 重新加载缓存数据
	// ts: 时间戳,用于加载指定时间之后更新的数据
	// cb: 缓存设置函数,用于将加载的数据添加到缓存中
	Reload(ts int64, cb CacheSetter) error
}

CacheHandle 缓存句柄接口,用于实现缓存的加载和刷新

type CacheModel added in v1.1.0

type CacheModel interface {
	GetUpdate() int64 // 获取模型的更新时间戳
}

CacheModel 缓存模型接口,用于支持缓存功能的模型需要实现此接口

type CacheSetter added in v1.1.0

type CacheSetter func(k any, v CacheModel)

CacheSetter 缓存设置函数类型,用于将模型添加到缓存中

type Config

type Config struct {
	// contains filtered or unexported fields
}

Config GORM config

func (*Config) Register

func (c *Config) Register(model interface{})

Register 预注册的MODEL在启动时会自动创建索引

type Cursor added in v1.2.1

type Cursor interface {
	Decode(val interface{}) error
}

type DB

type DB struct {
	*Config // 数据库配置

	Error        error // 错误信息
	RowsAffected int64 // 操作影响的条数
	// contains filtered or unexported fields
}

DB 是 Cosmo ORM 框架的核心结构体,提供了数据库操作的入口点。 它封装了数据库连接、事务管理、模型映射等功能,支持链式操作。

func New

func New(configs ...*Config) (db *DB)

New 创建一个新的 Cosmo DB 实例。 参数 configs 是可选的数据库配置,可以设置连接池、日志、插件等选项。 如果不提供配置,将使用默认配置。 返回值是 DB 实例,作为所有数据库操作的入口点。

使用示例:

db := cosmo.New(&cosmo.Config{
    PoolConfig: &cosmo.PoolConfig{
        Address: "mongodb://localhost:27017",
        CheckInterval: 30 * time.Second,
    },
})

func (*DB) AutoMigrator

func (db *DB) AutoMigrator(dst ...interface{}) error

AutoMigrator 自动迁移功能,根据模型定义自动创建或更新索引 dst: 要迁移的模型对象,可以传入多个模型 返回值: 迁移过程中发生的错误

func (*DB) BulkWrite

func (db *DB) BulkWrite(model any, filter ...BulkWriteUpdateFilter) *BulkWrite

BulkWrite 创建批量写入操作实例。 参数 model 是要操作的模型类型。 参数 filter 是可选的批量更新过滤器。 返回值是 BulkWrite 实例,用于构建和执行批量写入操作。

使用示例:

bw := db.BulkWrite(&User{}, cosmo.BulkWriteUpdateFilter{
    UpdateBy: "_id",
})

bw.InsertMany(users) bw.UpdateMany(updates) result, err := bw.Execute()

func (*DB) Close

func (db *DB) Close() error

func (*DB) Collection

func (db *DB) Collection(model any) (tx *DB, coll *mongo.Collection)

Collection 获取指定模型或集合名称对应的 MongoDB 集合。 参数 model 可以是结构体类型或集合名称字符串。 返回值 tx 是新的 DB 实例,coll 是 MongoDB 集合对象。

使用示例: tx, coll := db.Collection(&User{}) 或 tx, coll := db.Collection("users")

func (*DB) Count

func (db *DB) Count(count interface{}, conds ...interface{}) (tx *DB)

Count 统计文档数,count 必须为一个指向数字的指针 *int *int32 *int64

func (*DB) Create

func (db *DB) Create(value interface{}) (tx *DB)

Create insert the value into dbname

func (*DB) Database

func (db *DB) Database(dbname string) *DB

Database 创建一个指向指定数据库的新 DB 实例。 参数 dbname 是要使用的新数据库名称。 返回值是新的 DB 实例,用于操作指定的数据库。

使用示例: newDB := db.Database("newdatabase")

func (*DB) Delete

func (db *DB) Delete(conds ...interface{}) (tx *DB)

Delete 删除记录 db.model(&User).delete(1) 匹配 _id=1 db.model(&User).delete([]int{1,2,3}) 匹配 _id IN (1,2,3) db.model(&User).delete("name = ?","myname") 匹配 name=myname db.delete(&User{Id:1}) 根据结构体中的_id字段删除记录

func (*DB) Errorf

func (db *DB) Errorf(format interface{}, args ...interface{}) *DB

Errorf 为数据库实例设置格式化错误信息。 参数 format 是错误格式化字符串或错误对象。 参数 args 是格式化参数。 返回值是当前 DB 实例,方便链式调用。

使用示例: db.Errorf("操作失败: %v", err)

func (*DB) Find

func (db *DB) Find(val any, where ...any) (tx *DB)

Find 仅仅满足 GORM习惯

func (*DB) First added in v1.1.0

func (db *DB) First(val any, where ...any) (tx *DB)

First 获取第一条记录(主键升序)

func (*DB) Inc added in v1.1.0

func (db *DB) Inc(key string, val int) (tx *DB)

func (*DB) IncludeZeroValue added in v1.1.0

func (db *DB) IncludeZeroValue() (tx *DB)

IncludeZeroValue 设置Update Updates 时包含零值 使用 Save 时自动包含零值

func (*DB) IsHealthy added in v1.2.3

func (db *DB) IsHealthy() bool

func (*DB) Last added in v1.1.0

func (db *DB) Last(val any, where ...any) (tx *DB)

Last 获取最后一条记录(主键降序)

func (*DB) Limit

func (db *DB) Limit(limit int) (tx *DB)

func (*DB) Model

func (db *DB) Model(value any, modify ...bool) (tx *DB)

Model specify the model you would like to run db operations

// update all users's name to `hello`
db.model(&User{}).Update("name", "hello")
// if user's primary key is non-blank, will use it as condition, then will only update the user's name to `hello`
db.model(&user).Update("name", "hello")

func (*DB) Multiple

func (db *DB) Multiple() (tx *DB)

Multiple 强制批量更新

func (*DB) ObjectID added in v0.0.3

func (db *DB) ObjectID() bson.ObjectID

func (*DB) Omit

func (db *DB) Omit(columns ...string) (tx *DB)

Omit specify fields that you want to ignore when creating, updating and querying

func (*DB) Order

func (db *DB) Order(key string, value int) (tx *DB)

Order specify order when retrieve records from dbname Order 排序方式 1 和 -1 来指定排序的方式,其中 1 为升序排列,而 -1 是用于降序排列。

func (*DB) Page

func (db *DB) Page(paging *Paging, where ...any) (tx *DB)

Page 分页查询

func (*DB) PageUpdateField added in v1.2.6

func (db *DB) PageUpdateField(field string) (tx *DB)

func (*DB) Query added in v1.1.0

func (db *DB) Query(val any, where ...any) (tx *DB)

Query get records that match given conditions value must be a pointer to a slice

func (*DB) Range added in v1.2.1

func (db *DB) Range(f func(Cursor) bool) (tx *DB)

Range 遍历

func (*DB) Save added in v1.1.0

func (db *DB) Save(values any, conds ...any) (tx *DB)

func (*DB) Select

func (db *DB) Select(columns ...string) (tx *DB)

Select specify fields that you want when querying, creating, updating

func (*DB) Session

func (db *DB) Session(session *Session) *DB

Session 创建一个新的数据库会话。 参数 session 包含会话配置,如数据库名称、上下文等。 返回值是新的 DB 实例,用于执行会话相关的操作。

使用示例:

sessionDB := db.Session(&cosmo.Session{
    DBName: "newdatabase",
    Context: ctx,
})

func (*DB) Set

func (db *DB) Set(key string, val any) (tx *DB)

func (*DB) SetColumn

func (db *DB) SetColumn(data map[string]interface{}) (err error)

SetColumn set column's value to model

stmt.SetColumn("Name", "jinzhu") // Hooks Method

func (*DB) Start

func (db *DB) Start(dbname string, address interface{}) (err error)

Start 初始化数据库连接并启动连接池。 参数 dbname 是要使用的数据库名称。 参数 address 可以是 MongoDB 连接字符串或 *PoolManager 实例。 返回值是可能的错误信息。

使用示例: err := db.Start("mydatabase", "mongodb://localhost:27017") 或 pool := cosmo.NewPoolManager("mongodb://localhost:27017") err := db.Start("mydatabase", pool)

func (*DB) Table

func (db *DB) Table(name string) (tx *DB)

func (*DB) Take added in v1.1.0

func (db *DB) Take(val any, where ...any) (tx *DB)

Take 获取一条记录,没有指定排序字段

func (*DB) Update

func (db *DB) Update(values any, conds ...any) (tx *DB)

func (*DB) UpdateAndModify added in v1.2.2

func (db *DB) UpdateAndModify() (tx *DB)

UpdateAndModify 更新单行数据(update,save)时同时更新model

func (*DB) Updates added in v1.1.0

func (db *DB) Updates(values any, conds ...any) (tx *DB)

Updates 更新多列 Updates 方法支持 struct 和 map[string]interface{} 参数。当使用 struct 更新时,默认情况下只会更新非零值的字段 如果您想要在更新时选择、忽略某些字段,您可以使用 Select、Omit 自动关闭 updateAndModify

func (*DB) Upsert added in v0.0.3

func (db *DB) Upsert() (tx *DB)

Upsert update时如果不存在自动insert update 存在 $setOnInsert 时同样自动设置Upsert

func (*DB) Where

func (db *DB) Where(query interface{}, args ...interface{}) (tx *DB)

Where 查询条件 参考 query包

func (*DB) WithContext

func (db *DB) WithContext(ctx context.Context) *DB

WithContext 为当前数据库实例设置新的上下文。 参数 ctx 是要设置的上下文对象。 返回值是新的 DB 实例,包含更新后的上下文。

使用示例: ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() dbWithCtx := db.WithContext(ctx)

type ModelBulkWriteFilter added in v1.1.0

type ModelBulkWriteFilter interface {
	BulkWriteFilter(up update.Update)
}

type Paging

type Paging struct {
	//order  []bson.E    //排序
	Rows   interface{} `json:"rows"`
	Page   int         `json:"page"`             //当前页
	Size   int         `json:"size"`             //每页大小
	Total  int         `json:"total"`            //总页码数
	Record int         `json:"record"`           //总记录数
	Update int64       `json:"update,omitempty"` //最后更新时间
}

Paging 分页

func (*Paging) Init added in v0.0.5

func (this *Paging) Init(size int)

func (*Paging) Offset

func (this *Paging) Offset() int

func (*Paging) Options

func (this *Paging) Options() options.Lister[options.FindOptions]

Options 转换成FindOptions

func (*Paging) Range added in v1.1.0

func (this *Paging) Range(page int, handle func(int))

Range 遍历第N页的索引下标

func (*Paging) Result added in v0.0.5

func (this *Paging) Result(r int)

type Session

type Session struct {
	DBName string
	//DryRun                   bool
	//PrepareStmt              bool
	//NewDB     bool
	//SkipHooks bool
	//SkipDefaultTransaction   bool
	//DisableNestedTransaction bool
	//AllowGlobalUpdate        bool
	//FullSaveAssociations     bool
	//QueryFields              bool
	Context context.Context
}

Session session config when create session with Session() method

type Statement

type Statement struct {
	*DB // 数据库连接实例

	Clause  *clause.Query   // 查询条件构建器
	Paging  *Paging         // 分页信息
	Context context.Context // 操作上下文
	// contains filtered or unexported fields
}

Statement 表示一个MongoDB数据库操作语句 用于构建和执行查询、插入、更新、删除等操作 包含操作所需的所有信息,如查询条件、排序、分页、更新字段等

func NewStatement

func NewStatement(db *DB) *Statement

NewStatement 创建一个新的Statement实例 参数 db: 数据库连接实例 返回值: 初始化的Statement实例

func (*Statement) DBName

func (stmt *Statement) DBName(name string) string

DBName 将结构体字段名转换为数据库字段名 参数 name: 结构体字段名 返回值: 对应的数据库字段名

func (*Statement) GetIncludeZeroValue added in v1.2.2

func (stmt *Statement) GetIncludeZeroValue() bool

GetIncludeZeroValue 获取是否包含零值的设置 返回值: 更新时是否包含零值字段

func (*Statement) GetReflectValue added in v1.2.2

func (stmt *Statement) GetReflectValue() reflect.Value

GetReflectValue 获取模型的反射值 返回值: 模型对象的反射值

func (*Statement) GetSchema added in v1.2.2

func (stmt *Statement) GetSchema() *schema.Schema

GetSchema 获取模型的schema信息 返回值: 模型的元数据信息

func (*Statement) GetSelector added in v1.2.2

func (stmt *Statement) GetSelector() *update.Selector

GetSelector 获取更新字段选择器 返回值: 更新时的字段选择器

func (*Statement) GetValue added in v1.2.2

func (stmt *Statement) GetValue() any

GetValue 获取结果存储对象 返回值: 用于存储查询结果或写入数据的对象

func (*Statement) Order

func (stmt *Statement) Order() (order bson.D)

Order 生成MongoDB排序条件 将内部的排序映射转换为MongoDB的bson.D格式 返回值: 排序条件

func (*Statement) Parse

func (stmt *Statement) Parse() (tx *DB)

Parse 解析模型并初始化Statement 处理模型的反射信息、schema映射和表名 返回值: 数据库实例,包含可能的错误信息

Directories

Path Synopsis
Package clause 提供了 MongoDB 查询条件构建的功能,支持各种查询操作符和复杂条件组合。
Package clause 提供了 MongoDB 查询条件构建的功能,支持各种查询操作符和复杂条件组合。
update 包提供MongoDB更新操作的相关功能,包括更新条件构建和字段选择
update 包提供MongoDB更新操作的相关功能,包括更新条件构建和字段选择

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL