mts

package module
v0.0.0-...-f634b87 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

README

micro-ts

Go Version License

高性能微时序数据库,专为高吞吐写入和快速时间范围查询设计。

特性

  • 高性能写入:全局 WAL + 全局 MemTable,单点写入路径极短
  • 💾 分层存储:MemTable → unordered SSTable → stable L0 SSTable,逐级下沉
  • 🔄 异步 Compaction:后台自动将无序数据分拣排序写入有序层
  • 🛡️ 崩溃恢复:WAL 预写日志 + checkpoint 机制,重启自动恢复
  • 🗄️ 元数据管理:bbolt 嵌入式 KV 存储,ACID 事务保证一致性
  • 🔌 gRPC 接口:高性能远程访问接口
  • 🔍 时间范围查询:纳秒级精度,合并内存/无序/有序三层数据
  • 🗜️ 数据压缩:支持 Snappy / LZ4 块压缩

架构

                              ┌──────────────────────┐
                              │     gRPC API Layer    │
                              └──────────┬───────────┘
                                         │
                              ┌──────────▼───────────┐
                              │    Engine (引擎)      │
                              │  写入协调 | 查询合并   │
                              └──────────┬───────────┘
                                         │
              ┌──────────────────────────┼──────────────────────────┐
              │                          │                          │
    ┌─────────▼────────┐    ┌───────────▼──────────┐    ┌──────────▼─────────┐
    │  全局 WAL         │    │  全局 MemTable       │    │  ShardManager       │
    │  (预写日志)       │    │  (内存有序跳表)      │    │  (Shard 生命周期)   │
    └─────────┬────────┘    └───────────┬──────────┘    └──────────┬─────────┘
              │ 写后即返回              │ 背压触发刷盘              │
              │                         │                          │
              │              ┌──────────▼──────────┐               │
              │              │  FlushCoordinator    │               │
              │              │  (异步刷盘编排)      │               │
              │              └──────────┬──────────┘               │
              │                         │                          │
              │              ┌──────────▼──────────┐               │
              │              │  unordered/          │               │
              │              │  (未排序 SSTable)    │               │
              │              └──────────┬──────────┘               │
              │                         │                          │
              │              ┌──────────▼──────────┐    ┌──────────▼─────────┐
              │              │  UnorderedCompactor  │───▶│  stable/L0/         │
              │              │  (分拣排序 → L0)     │    │  (有序 SSTable)     │
              │              └─────────────────────┘    └─────────────────────┘
              │
    ┌─────────▼────────┐
    │  Metadata (bbolt) │
    │  Catalog | Series │
    │  | ShardIndex     │
    └──────────────────┘
核心组件
组件 职责
Engine 入口协调层:写入、查询合并、崩溃恢复
全局 WAL 单 WAL 实例,所有写入先入 WAL 保证持久性
全局 MemTable 单 MemTable 实例,跳表排序,双 buffer(active/passive)无锁 swap
FlushCoordinator 定时检查 MemTable 水位,触发 swap → 写入 unordered → 清理 WAL
unordered/ 存放从 MemTable 落盘但未排序的 SSTable 文件
UnorderedCompactor 扫描 unordered 文件,按 (db, measurement, shard) 分拣排序后写入 stable L0
ShardManager 管理 shard 目录生命周期,提供 L0 写入路径
SSTable 有序不可变字符串表,支持块压缩(Snappy/LZ4)和稀疏索引
Metadata bbolt 单文件存储,管理 Catalog、Series、ShardIndex

写流程

Point ──▶ engine.Write()
           │
           ├─ 1. 校验 & 自动创建 db/measurement
           ├─ 2. 分配 SID (Series ID)
           ├─ 3. Point → MemPoint (序列化 FieldData, 池化零拷贝)
           ├─ 4. MemPoint → WAL 序列化 (version + db + meas + ts + sid + fieldData)
           ├─ 5. WAL 写入 (LZ4 压缩, small payload <80B 跳过)
           │      └─ 写缓冲聚合 (1MB), 定时 fsync
           ├─ 6. MemTable.Write (跳表插入, O(log N))
           │
           └─ 背压检查: MemTable 满?
                  ├─ 是 → flushWithRetry (最多 10 次, 间隔 50ms)
                  └─ 否 → 返回
刷盘流程 (FlushCoordinator, 每 1s 检查)
触发条件: FlushMemorySize/FlushPointCount 达到 2x / FlushIdle 超时

active MemTable ──Swap──▶ passive ──▶ unordered.Write()
                                         │
                                         ├─ 按 (db, meas) 分组
                                         ├─ 每组写入 SSTable (unordered/db/meas/sst_N.bin)
                                         └─ 写入成功后释放 FieldData 回池
                              │
                              ├─ ClearPassive()
                              └─ WAL TruncateBefore (删除已刷盘的 WAL segment)
Compaction 流程 (UnorderedCompactor, 每 500ms)
unordered/sst_*.bin ──▶ 扫描所有文件
                         │
                         ├─ 读取每个 SSTable 的全部数据
                         ├─ 按 (db, measurement, shardStart) 分组
                         ├─ 每组内按 Timestamp 排序
                         ├─ 写入 stable/{db}/{meas}/{shardStart}_{shardEnd}/data/L0/
                         └─ 删除源 unordered 文件

通过 L0 Compaction,后续的 Level Compaction (L0→L1→...) 由 ShardManager 内部的 Compaction 机制处理。

读流程

QueryRange ──▶ engine.Iterator()
                │
                ├─ 1. 收集三层数据源:
                │      ├─ 全局 MemTable (未刷盘的热数据, 已排序)
                │      ├─ unordered SSTable (已刷盘但未排序, 按文件读取后排序)
                │      └─ stable Shard SSTable (已 compaction 的有序数据)
                │
                ├─ 2. 构建 MergeIterator:
                │      合并三层 Iterator, 按 Timestamp 归并输出
                │
                └─ 3. 流式返回:
                       Next() 按需拉取, 避免全量加载
数据流路径
写入时:  WAL ──▶ MemTable ──▶ unordered ──▶ stable L0 ──▶ L1...(Level Compaction)
查询时:  结果 = Merge(MemTable, unordered, stable)

崩溃恢复

启动 ──▶ 1. 加载 Metadata (bbolt)
        ├─ 2. 恢复 unordered seq (扫描最大序列号)
        ├─ 3. 发现已有 Shard (填充 ShardManager 缓存)
        ├─ 4. WAL.Replay()
        │      ├─ 加载 checkpoint, 跳过已持久化的旧 segment
        │      ├─ 反序列化每条 WAL 记录 → MemPoint
        │      ├─ MemTable 接近满时先刷盘
        │      └─ 写入全局 MemTable
        ├─ 5. MemTable.Sort() (重建跳表顺序)
        └─ 6. TruncateBefore (清理已回放的 WAL segment)

目录结构

{dataDir}/
├── wal/                          # 全局 WAL segment
│   ├── {gen}_{seq}.wal           # segment 文件 (含 LZ4 压缩)
│   └── checkpoint                # 已持久化标记
├── metadata.db                   # bbolt 元数据
├── unordered/                    # 未排序 SSTable
│   └── {db}/{meas}/sst_{N}.bin
├── {db}/{meas}/                  # 有序 SSTable
│   └── {shardStart}_{shardEnd}/
│       └── data/
│           └── L{N}/sst_{M}.bin  # Level Compaction 层级
└── internal/
    ├── engine/                   # 引擎 (写入、查询、恢复)
    ├── query/                    # 查询执行器 (归并迭代器)
    ├── storage/
    │   ├── wal/                  # WAL 实现
    │   ├── memtable/             # 内存跳表
    │   ├── unordered/            # 未排序 SSTable 管理
    │   ├── compaction/           # Compaction 策略 (含 UnorderedCompactor)
    │   ├── shard/                # Shard 管理 & SSTable
    │   │   ├── sstable/          # SSTable 读写
    │   │   └── compression/      # Snappy/LZ4
    │   └── metadata/             # bbolt 元数据
    ├── api/                      # gRPC 服务
    └── metrics/                  # 内部指标

快速开始

安装
go get codeberg.org/micro-ts/mts
基本使用
package main

import (
    "context"
    "log"
    "time"

    microts "codeberg.org/micro-ts/mts"
)

func main() {
    db, err := microts.Open(microts.Config{
        DataDir: "/tmp/microts",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // 写入
    point := &microts.Point{
        Database:    "metrics",
        Measurement: "cpu",
        Tags:        map[string]string{"host": "server1"},
        Timestamp:   time.Now().UnixNano(),
        Fields:      map[string]any{"usage": 85.5},
    }
    if err := db.Write(context.Background(), point); err != nil {
        log.Fatal(err)
    }

    // 批量写入
    points := []*microts.Point{point1, point2, point3}
    if err := db.WriteBatch(context.Background(), points); err != nil {
        log.Fatal(err)
    }

    // 范围查询
    resp, err := db.QueryRange(context.Background(), &microts.QueryRangeRequest{
        Database:    "metrics",
        Measurement: "cpu",
        StartTime:   time.Now().Add(-time.Hour).UnixNano(),
        EndTime:     time.Now().UnixNano(),
    })
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("查询到 %d 条记录", len(resp.Rows))
}

配置说明

config := microts.Config{
    DataDir:       "/var/lib/microts",
    ShardDuration: 24 * time.Hour,       // Shard 时间窗口 (默认 7d)

    MemTableCfg: &microts.MemTableConfig{
        FlushMemorySize: 64 * 1024 * 1024,   // 内存阈值 (默认 64MB)
        FlushPointCount: 50000,              // 条目阈值 (默认 10000)
        FlushIdleNanos: int64(time.Minute),  // 空闲刷盘时间 (默认 1min)
    },

    CompactionCfg: &microts.CompactionConfig{
        MaxSstableCount: 4,               // 触发 compaction 阈值
        CheckInterval:   time.Hour,       // 检查间隔
    },

    CompressionAlgorithm: microts.CompressionLZ4,  // none/snappy/lz4
    RetentionPeriod:      30 * 24 * time.Hour,     // 数据保留期 (0=永久)
}
配置建议
场景 FlushMemorySize FlushPointCount FlushIdle ShardDuration
高频写入 (IoT) 128MB 100000 30s 1h
中频写入 (监控) 64MB 50000 1min 24h
低频写入 (日志) 32MB 10000 5min 7d

性能

规模 TPS 内存占用 磁盘占用
1K ~552K ~7 MB ~0.4 MB
10K ~555K ~10 MB ~0.8 MB
100K ~568K ~38 MB ~5.5 MB
1M ~228K ~188 MB ~42 MB

1M 数据点: TotalAlloc ~3.3GB, GC 周期 ~26 次, 磁盘压缩比 ~1.8x (LZ4)。

运行基准测试
go test -bench=. -benchmem ./internal/storage/...
E2E 测试
# 写入性能
cd tests/e2e/write_1m && go run main.go

# 数据完整性
cd tests/e2e/integrity && go run main.go

# 崩溃恢复
cd tests/e2e/restart_recovery && go run main.go

开发

运行测试
go test ./...                        # 全部单元测试
cd tests/e2e/{test_dir} && go run main.go  # 单个 E2E 测试
代码规范
  • 代码行覆盖率 ≥ 90%
  • golangci-lint 代码检查
  • goimports-reviser 格式化导入
  • 禁止在 for 循环中使用 defer
  • 目录权限 0700,文件权限 0600
Git GPG 签名免交互配置

使用 GPG 签名提交时,默认会触发交互式密码输入。以下配置可实现自动化:

1. 配置 gpg-agent 使用 loopback 模式

echo "allow-loopback-pinentry" >> ~/.gnupg/gpg-agent.conf

2. 重启 gpg-agent

killall gpg-agent 2>/dev/null || true
gpg-agent --homedir ~/.gnupg --daemon

3. 配置 Git 使用 GPG 签名

git config --global gpg.program gpg
git config --global commit.gpgsign true
git config --global user.signingkey <YOUR_GPG_KEY_ID>

4. 设置环境变量自动提供密码

export GPG_TTY=$(tty)
export PINENTRY_USER_DATA="<YOUR_PASSPHRASE>"

或直接在命令行中指定:

git commit -s -S -m "提交信息"

5. 验证配置

git log --show-signature -1

License

Apache 2.0 © 2026 micro-ts Authors

Documentation

Overview

Package mts 提供高性能时序数据库功能。

mts(micro time-series)是一个专为高性能时间序列数据存储设计的数据库。 它支持高效的点写入、批量写入和范围查询操作。

基本使用示例:

db, err := mts.Open(mts.Config{
    DataDir:       "/var/lib/mts",
    ShardDuration: 7 * 24 * time.Hour,
})
if err != nil {
    log.Fatal(err)
}
defer db.Close()

// 写入数据点
point := &mts.Point{
    Database:    "metrics",
    Measurement: "cpu",
    Tags:        map[string]string{"host": "server1"},
    Timestamp:   time.Now().UnixNano(),
    Fields:      map[string]any{"usage": 45.2},
}
if err := db.Write(ctx, point); err != nil {
    log.Fatal(err)
}

Index

Constants

View Source
const (
	CompressionNone   = types.CompressionNone
	CompressionSnappy = types.CompressionSnappy
	CompressionLZ4    = types.CompressionLZ4
)

压缩算法常量

Variables

This section is empty.

Functions

func DefaultMemTableConfig

func DefaultMemTableConfig() *types.MemTableConfig

DefaultMemTableConfig 返回默认的 MemTable 配置。

默认配置:

  • FlushMemorySize: 64MB,内存表最大内存占用
  • FlushPointCount: 10000,最大条目数
  • FlushIdle: 1分钟,空闲时间阈值

返回:

MemTableConfig: 包含默认值的配置结构

使用示例:

cfg := mts.Config{
    DataDir:       "/data",
    MemTableCfg:   mts.DefaultMemTableConfig(),
}

Types

type CompactionConfig

type CompactionConfig = types.CompactionConfig

CompactionConfig 配置 Compaction 行为。

控制 Compaction 的触发策略、批处理大小和资源限制。

使用示例:

cfg := mts.CompactionConfig{
    MaxSSTableCount: 4,
    CheckInterval:   1 * time.Hour,
}

func DefaultCompactionConfig

func DefaultCompactionConfig() *CompactionConfig

DefaultCompactionConfig 返回默认的 Compaction 配置。

type Config

type Config = types.Config

Config 是数据库的配置选项。

DataDir 指定数据存储目录,必须可写。 ShardDuration 定义每个 shard 的时间窗口,最小 1 小时,默认 7 天。 MemTableCfg 配置内存表行为,使用 nil 时将采用 DefaultMemTableConfig() 的默认值。 CompactionCfg 配置 compaction 行为,使用 nil 时将采用默认配置。 RetentionPeriod 配置数据保留期,0 表示不自动删除过期数据。 RetentionCheckInterval 配置 retention 检查间隔,默认 1 小时。

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB 是微时序数据库的主结构。

提供完整的时序数据存储和查询功能。 DB 实例是并发安全的,可以从多个 goroutine 同时访问。

使用 Open 函数创建 DB 实例,使用 Close 方法关闭。 关闭后不可再使用,否则会导致未定义行为。

func Open

func Open(cfg *Config) (*DB, error)

Open 打开或创建一个时序数据库实例。

参数:

  • cfg: 数据库配置,包含数据目录、分片时长和 MemTable 配置

返回:

  • *DB: 数据库实例
  • error: 如果目录创建失败或引擎初始化失败则返回错误

配置文件默认值:

  • ShardDuration: 默认为 7 天
  • MemTableCfg: 使用 DefaultMemTableConfig()

使用示例:

db, err := mts.Open(&mts.Config{
    DataDir:       "/var/lib/mts",
    ShardDuration: 7 * 24 * time.Hour,
})
if err != nil {
    log.Fatal(err)
}
defer db.Close()

注意:必须调用 Close 释放资源。

func (*DB) Close

func (db *DB) Close() error

Close 关闭数据库,释放所有资源。

返回:

  • error: 关闭失败时返回错误

注意:

关闭前会尝试将所有 MemTable 数据刷写到 SSTable。
关闭后 DB 实例不可再使用。
多个 Close() 调用是安全的,但只有第一个会生效。
建议配合 defer 使用:

	db, err := mts.Open(cfg)
	if err != nil {
	    log.Fatal(err)
	}
	defer db.Close()

func (*DB) CreateDatabase

func (db *DB) CreateDatabase(ctx context.Context, database string, retention time.Duration, downsample *types.DownsampleConfig) error

CreateDatabase 创建一个新的数据库。

参数:

  • ctx: 上下文
  • database: 数据库名称

返回:

  • error: 创建失败时返回错误(当前不会返回错误)

说明:

创建 DatabaseMetaStore 并注册到 Engine。
实际的数据目录在第一次写入时才会创建。
如果数据库已存在,不会返回错误。

使用示例:

if err := db.CreateDatabase(ctx, "new_metrics"); err != nil {
    log.Fatal(err)
}

func (*DB) CreateEmptyMeasurement

func (db *DB) CreateEmptyMeasurement(database, measurement string) error

CreateEmptyMeasurement 创建一个空目录结构的 Measurement(用于兼容旧 API)。 此函数在首次写入数据时自动调用,通常不需要手动调用。

参数:

  • database: 数据库名称
  • measurement: Measurement 名称

返回:

  • error: 创建失败时返回错误

func (*DB) CreateMeasurement

func (db *DB) CreateMeasurement(ctx context.Context, database, measurement string) error

CreateMeasurement 在指定数据库中创建一个新的 Measurement。

参数:

  • ctx: 上下文
  • database: 数据库名称
  • measurement: Measurement 名称

返回:

  • error: 创建失败时返回错误

说明:

创建 DatabaseMetaStore(如果不存在)并创建 MeasurementMetaStore。
实际的数据目录在第一次写入时才会创建。
如果 measurement 已存在,不会返回错误。

使用示例:

if err := db.CreateMeasurement(ctx, "metrics", "cpu_usage"); err != nil {
    log.Fatal(err)
}

func (*DB) DataDir

func (db *DB) DataDir() string

DataDir 返回数据库的数据目录路径。

返回:

  • string: 数据目录路径

func (*DB) DropDatabase

func (db *DB) DropDatabase(ctx context.Context, database string) error

DropDatabase 删除指定的数据库。

参数:

  • ctx: 上下文
  • database: 数据库名称

返回:

  • error: 删除失败时返回错误

警告:

此操作会永久删除该数据库的所有元数据(包括所有 Measurement)。
磁盘上的数据文件不会被立即删除,需要单独清理。
如果数据库不存在,返回错误。

使用示例:

if err := db.DropDatabase(ctx, "old_metrics"); err != nil {
    log.Fatal(err)
}

func (*DB) DropMeasurement

func (db *DB) DropMeasurement(ctx context.Context, database, measurement string) error

DropMeasurement 删除指定的 Measurement。

参数:

  • ctx: 上下文
  • database: 数据库名称
  • measurement: Measurement 名称

返回:

  • error: 删除失败时返回错误

警告:

此操作会永久删除该 Measurement 的元数据(Schema、Series、Tag 索引)。
磁盘上的数据文件不会被立即删除,需要单独清理。
如果数据库或 Measurement 不存在,返回错误。

使用示例:

if err := db.DropMeasurement(ctx, "metrics", "old_metric"); err != nil {
    log.Fatal(err)
}

func (*DB) Execute

func (db *DB) Execute(ctx context.Context, plan *types.QueryPlan) (*query.RowIterator, error)

Execute 执行查询计划,通过算子 Pipeline 执行 Filter、GroupBy、Aggregate、Sort、Project 等操作。

参数:

  • ctx: 上下文
  • plan: 查询计划,由 query.Builder 构建

返回:

  • *query.RowIterator: 算子 Pipeline 迭代器
  • error: 执行失败时返回错误

使用示例:

plan := query.NewBuilder().
    Select("avg(cpu)", "host").
    From("metrics", "cpu").
    TimeRange(start, end).
    GroupBy("host").
    Build()
iter, err := db.Execute(ctx, plan)
if err != nil {
    log.Fatal(err)
}
defer iter.Close()
for iter.Next(ctx) {
    row := iter.Points()
    process(row)
}

func (*DB) FlushAll

func (db *DB) FlushAll() error

FlushAll 刷新所有 Shard 的 MemTable 到 SSTable。

返回:

  • error: 如果任一 Shard 刷盘失败则返回错误

说明:

遍历所有已创建的 Shard,调用其 Flush() 方法。
用于确保所有内存数据持久化,或在关闭前确保数据安全。

func (*DB) ForceDownsample

func (db *DB) ForceDownsample()

ForceDownsample 手动触发一次降采样处理(用于测试和运维)。

func (*DB) Iterator

func (db *DB) Iterator(ctx context.Context, req *types.QueryRangeRequest) (*query.Iterator, error)

Iterator 创建流式查询迭代器,用于处理大量数据而不占用大量内存。

流式迭代器只在需要时加载数据,适合处理海量数据查询。

参数:

  • ctx: 上下文,可用于取消迭代
  • req: 查询请求,支持时间范围、字段过滤和标签过滤

返回:

  • *query.Iterator: 查询迭代器,使用完后必须调用 Close()
  • error: 创建失败时返回错误

使用示例:

it, err := db.Iterator(ctx, &mts.QueryRangeRequest{
    Database:    "metrics",
    Measurement: "cpu",
    StartTime:   start.UnixNano(),
    EndTime:     end.UnixNano(),
})
if err != nil {
    log.Fatal(err)
}
defer it.Close()

for it.Next(ctx) {
    row := it.Points()
    process(row)
}

func (*DB) ListDatabases

func (db *DB) ListDatabases(ctx context.Context) ([]string, error)

ListDatabases 列出所有数据库名称。

参数:

  • ctx: 上下文

返回:

  • []string: 数据库名称列表(按字母序排序)
  • error: 查询失败时返回错误(当前不会返回错误)

实现说明:

从 Engine 的 dbMetaStores 获取所有数据库名称。
这反映了当前已知的所有数据库(有元数据的)。
返回的列表按字母序排序。

使用示例:

databases, err := db.ListDatabases(ctx)
if err != nil {
    log.Fatal(err)
}
for _, dbName := range databases {
    fmt.Println(dbName)
}

func (*DB) ListMeasurements

func (db *DB) ListMeasurements(ctx context.Context, database string) ([]string, error)

ListMeasurements 列出指定数据库中的所有 Measurement 名称。

参数:

  • ctx: 上下文
  • database: 数据库名称

返回:

  • []string: Measurement 名称列表(按字母序排序)
  • error: 如果数据库不存在返回错误

实现说明:

从 Engine 的 DatabaseMetaStore 获取元数据,遍历 measurement keys。
这反映了当前已知的所有 measurement(有元数据的)。
返回的列表按字母序排序。

使用示例:

measurements, err := db.ListMeasurements(ctx, "metrics")
if err != nil {
    log.Fatal(err)
}
for _, m := range measurements {
    fmt.Println(m)
}

func (*DB) QueryRange

func (db *DB) QueryRange(ctx context.Context, req *types.QueryRangeRequest) (*types.QueryRangeResponse, error)

QueryRange 执行范围查询,返回指定时间范围内的数据点。

查询会自动合并 MemTable(内存数据)和 SSTable(磁盘数据)的结果。 数据按时间戳升序返回。

参数:

  • ctx: 上下文,可用于取消查询
  • req: 查询请求,包含时间范围、字段列表、标签过滤和分页参数

返回:

  • *QueryRangeResponse: 包含查询结果行、总数和是否有更多数据
  • error: 查询失败时返回错误

说明:

底层使用流式迭代器查询,本方法会迭代读取所有数据后返回。
如果需要处理大量数据,建议使用 Iterator 直接流式处理,
可以避免在内存中累积所有结果。

分页处理:

使用 Offset 和 Limit 进行分页。当 HasMore 为 true 时,
可以通过设置 Offset = 已返回行数 来获取下一页。

字段过滤:

如果 Fields 为空,返回所有字段。
如果指定字段,只返回这些字段的值。

使用示例:

resp, err := db.QueryRange(ctx, &mts.QueryRangeRequest{
    Database:    "metrics",
    Measurement: "cpu",
    StartTime:   start.UnixNano(),
    EndTime:     end.UnixNano(),
    Fields:      []string{"usage", "temperature"},
    Tags:        map[string]string{"host": "server1"},
    Offset:      0,
    Limit:       1000,
})
if err != nil {
    log.Fatal(err)
}
for _, row := range resp.Rows {
    fmt.Printf("时间: %d, 字段: %v\n", row.Timestamp, row.Fields)
}

func (*DB) Write

func (db *DB) Write(ctx context.Context, point *types.Point) error

Write 写入单个数据点到数据库。

参数:

  • ctx: 上下文,可用于取消操作(会透传到引擎层)
  • point: 要写入的数据点,包含时间戳、标签和字段

返回:

  • error: 写入失败时返回错误,错误类型包括 IO 错误和序列化错误

时间戳排序:

MemTable 会自动维护按时间戳排序的条目。
当写入乱序数据时,会触发内部排序操作。

持久化保证:

数据首先写入 WAL (Write-Ahead Log),然后写入 MemTable。
只有 WAL 写入成功后,写入才被认为是成功的。

使用示例:

point := &mts.Point{
    Database:    "metrics",
    Measurement: "cpu",
    Tags:        map[string]string{"host": "server1"},
    Timestamp:   time.Now().UnixNano(),
    Fields:      map[string]any{"usage": 45.2},
}
if err := db.Write(ctx, point); err != nil {
    log.Printf("写入失败: %v", err)
}

func (*DB) WriteBatch

func (db *DB) WriteBatch(ctx context.Context, points []*types.Point) error

WriteBatch 批量写入多个数据点。

相比多次调用 Write,批量写入可以减少系统调用开销,提高吞吐量。 注意:WriteBatch 不保证原子性。如果某个点写入失败,已成功写入的点不会回滚, 调用方需要自行处理部分写入的情况(通过重试或丢弃受影响的数据)。

参数:

  • ctx: 上下文
  • points: 要写入的数据点切片

返回:

  • error: 写入失败时返回错误,包含失败点的时间戳信息

性能建议:

建议每个批次包含 100-1000 个点,具体取决于数据点大小。
批次过大可能导致内存占用过高。

使用示例:

points := []*mts.Point{p1, p2, p3}
if err := db.WriteBatch(ctx, points); err != nil {
    log.Printf("批量写入失败: %v", err)
}

type MemTableConfig

type MemTableConfig = types.MemTableConfig

MemTableConfig 配置内存表的行为。

控制 MemTable 的大小、条目数和空闲时间,当达到任一阈值时会触发刷盘。

使用示例:

cfg := mts.MemTableConfig{
    FlushMemorySize:  64 * 1024 * 1024, // 64MB
    FlushPointCount: 10000,
    FlushIdle:  5 * time.Minute,
}

type Point

type Point = types.Point

Point 是写入时序数据库的基本数据单元。

每个 Point 属于特定的 Database 和 Measurement,包含时间戳、标签集合和字段值。 标签用于标识时间序列,字段存储实际的数据值。 Timestamp 使用纳秒级 Unix 时间戳。

使用示例:

point := &mts.Point{
    Database:    "metrics",
    Measurement: "cpu",
    Tags: map[string]string{
        "host":   "server1",
        "region": "us-east-1",
    },
    Timestamp: time.Now().UnixNano(),
    Fields: map[string]any{
        "value": 45.2,
    },
}

type PointRow

type PointRow = types.PointRow

PointRow 是查询结果的单行数据。

包含 Series ID (SID)、时间戳、标签和字段值。 SID 是内部生成的序列标识符,用户通常不需要直接使用。

type QueryRangeRequest

type QueryRangeRequest = types.QueryRangeRequest

QueryRangeRequest 定义范围查询的请求参数。

支持按时间范围 (StartTime, EndTime)、字段列表和标签过滤器查询数据。 Offset 和 Limit 用于分页查询,Limit = 0 表示不限制。 时间戳使用纳秒级 Unix 时间戳。

使用示例:

req := &mts.QueryRangeRequest{
    Database:    "metrics",
    Measurement: "cpu",
    StartTime:   startTime.UnixNano(),
    EndTime:     endTime.UnixNano(),
    Fields:      []string{"value"},           // 只返回指定字段
    Tags:        map[string]string{"host": "server1"}, // 过滤标签
    Offset:      0,
    Limit:       1000,
}

type QueryRangeResponse

type QueryRangeResponse = types.QueryRangeResponse

QueryRangeResponse 是范围查询的响应。

Directories

Path Synopsis
cmd
server command
Package main provides the micro-ts gRPC server entry point.
Package main provides the micro-ts gRPC server entry point.
internal
api
Package api 实现 gRPC API 服务。
Package api 实现 gRPC API 服务。
api/auth
Package auth 提供 gRPC 认证拦截器。
Package auth 提供 gRPC 认证拦截器。
engine
Package engine 实现微时序数据库的存储引擎。
Package engine 实现微时序数据库的存储引擎。
metrics
Package metrics 提供 expvar 指标定义与埋点。
Package metrics 提供 expvar 指标定义与埋点。
query
Package query 实现查询处理和执行。
Package query 实现查询处理和执行。
storage
Package storage 提供存储相关的工具函数。
Package storage 提供存储相关的工具函数。
storage/compaction
Package compaction 实现 Level Compaction 策略。
Package compaction 实现 Level Compaction 策略。
storage/downsample
Package downsample 实现时序数据降采样服务。
Package downsample 实现时序数据降采样服务。
storage/metadata
Package metadata 提供全局元数据管理。
Package metadata 提供全局元数据管理。
storage/shard
Package shard 实现分片存储管理。
Package shard 实现分片存储管理。
storage/shard/compression
Package compression 提供数据压缩工具。
Package compression 提供数据压缩工具。
storage/shard/sstable
internal/storage/shard/sstable/index.go
internal/storage/shard/sstable/index.go
storage/unordered
Package unordered 管理未排序 SSTable 文件(immutable memtable 集合)。
Package unordered 管理未排序 SSTable 文件(immutable memtable 集合)。
storage/wal
Package wal 实现 Write-Ahead Log。
Package wal 实现 Write-Ahead Log。
tests
e2e/compaction_test command
tests/e2e/compaction_test/main.go
tests/e2e/compaction_test/main.go
e2e/compression_test command
tests/e2e/compression_test/main.go 端到端压缩测试:验证 none/snappy/lz4 压缩算法的写入、查询、恢复、压缩率
tests/e2e/compression_test/main.go 端到端压缩测试:验证 none/snappy/lz4 压缩算法的写入、查询、恢复、压缩率
e2e/diff_compact_disk_usage command
tests/e2e/diff_compact_disk_usage/main.go
tests/e2e/diff_compact_disk_usage/main.go
e2e/downsample_test command
tests/e2e/downsample_test/main.go
tests/e2e/downsample_test/main.go
e2e/grpc_write_query command
tests/e2e/grpc_write_query/main.go gRPC 端到端测试:写入 10K 条数据并通过查询 API 验证数据完整性
tests/e2e/grpc_write_query/main.go gRPC 端到端测试:写入 10K 条数据并通过查询 API 验证数据完整性
e2e/integrity command
tests/e2e/integrity/main.go
tests/e2e/integrity/main.go
e2e/persistence_test command
tests/e2e/persistence_test/main.go
tests/e2e/persistence_test/main.go
e2e/pkg/data_gen
tests/e2e/pkg/data_gen/data_gen.go
tests/e2e/pkg/data_gen/data_gen.go
e2e/pkg/framework
tests/e2e/pkg/framework/framework.go
tests/e2e/pkg/framework/framework.go
e2e/pkg/metrics
tests/e2e/pkg/metrics/metrics.go
tests/e2e/pkg/metrics/metrics.go
e2e/query_100k command
tests/e2e/query_100k/main.go 查询端测用例:100K 数据写入 → 多次刷盘 → Compaction 合并 → 分页查询延迟/内存分析
tests/e2e/query_100k/main.go 查询端测用例:100K 数据写入 → 多次刷盘 → Compaction 合并 → 分页查询延迟/内存分析
e2e/query_10k command
tests/e2e/query_10k/main.go 查询端测用例:10K 数据写入 → 多次刷盘 → Compaction 合并 → 查询延迟/内存分析
tests/e2e/query_10k/main.go 查询端测用例:10K 数据写入 → 多次刷盘 → Compaction 合并 → 查询延迟/内存分析
e2e/query_1k command
tests/e2e/query_1k/main.go 查询端测用例:1K 数据写入 → 多次刷盘 → Compaction 合并 → 查询延迟/内存分析
tests/e2e/query_1k/main.go 查询端测用例:1K 数据写入 → 多次刷盘 → Compaction 合并 → 查询延迟/内存分析
e2e/query_1m command
tests/e2e/query_1m/main.go 查询端测用例:1M 数据写入 → 多次刷盘 → Compaction 合并 → 分页查询延迟/内存分析
tests/e2e/query_1m/main.go 查询端测用例:1M 数据写入 → 多次刷盘 → Compaction 合并 → 分页查询延迟/内存分析
e2e/query_op_benchmark command
tests/e2e/query_op_benchmark/main.go
tests/e2e/query_op_benchmark/main.go
e2e/restart_recovery command
tests/e2e/restart_recovery/main.go
tests/e2e/restart_recovery/main.go
e2e/retention_test command
tests/e2e/retention_test/main.go
tests/e2e/retention_test/main.go
e2e/simple_integrity command
tests/e2e/simple_integrity/main.go
tests/e2e/simple_integrity/main.go
e2e/wal_compression_test command
tests/e2e/wal_compression_test/main.go
tests/e2e/wal_compression_test/main.go
e2e/wal_test command
tests/e2e/wal_test/main.go
tests/e2e/wal_test/main.go
e2e/write_100k command
tests/e2e/write_100k/main.go
tests/e2e/write_100k/main.go
e2e/write_100k_pprof command
tests/e2e/write_100k_pprof/main.go 写入 100K 数据点并生成 pprof CPU/heap profile 用于性能分析。
tests/e2e/write_100k_pprof/main.go 写入 100K 数据点并生成 pprof CPU/heap profile 用于性能分析。
e2e/write_10k command
tests/e2e/write_10k/main.go
tests/e2e/write_10k/main.go
e2e/write_10m_pprof command
tests/e2e/write_10m_pprof/main.go
tests/e2e/write_10m_pprof/main.go
e2e/write_1k command
tests/e2e/write_1k/main.go
tests/e2e/write_1k/main.go
e2e/write_1m command
tests/e2e/write_1m/main.go
tests/e2e/write_1m/main.go
e2e/write_1m_pprof command
tests/e2e/write_1m_pprof/main.go
tests/e2e/write_1m_pprof/main.go
e2e/write_20k_debug command
tests/e2e/write_20k_debug/main.go
tests/e2e/write_20k_debug/main.go
e2e/write_and_compact command
tests/e2e/write_and_compact/main.go
tests/e2e/write_and_compact/main.go
Package types 提供所有数据类型的定义和转换工具。
Package types 提供所有数据类型的定义和转换工具。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL