Documentation
¶
Overview ¶
Package mts 提供高性能时序数据库功能。
mts(micro time-series)是一个专为高性能时间序列数据存储设计的数据库。 它支持高效的点写入、批量写入和范围查询操作。
基本使用示例:
db, err := mts.Open(mts.Config{
DataDir: "/var/lib/mts",
ShardDuration: 7 * 24 * time.Hour,
})
if err != nil {
log.Fatal(err)
}
defer db.Close()
// 写入数据点
point := &mts.Point{
Database: "metrics",
Measurement: "cpu",
Tags: map[string]string{"host": "server1"},
Timestamp: time.Now().UnixNano(),
Fields: map[string]any{"usage": 45.2},
}
if err := db.Write(ctx, point); err != nil {
log.Fatal(err)
}
Index ¶
- Constants
- func DefaultMemTableConfig() *types.MemTableConfig
- type CompactionConfig
- type Config
- type DB
- func (db *DB) Close() error
- func (db *DB) CreateDatabase(ctx context.Context, database string, retention time.Duration, ...) error
- func (db *DB) CreateEmptyMeasurement(database, measurement string) error
- func (db *DB) CreateMeasurement(ctx context.Context, database, measurement string) error
- func (db *DB) DataDir() string
- func (db *DB) DropDatabase(ctx context.Context, database string) error
- func (db *DB) DropMeasurement(ctx context.Context, database, measurement string) error
- func (db *DB) Execute(ctx context.Context, plan *types.QueryPlan) (*query.RowIterator, error)
- func (db *DB) FlushAll() error
- func (db *DB) ForceDownsample()
- func (db *DB) Iterator(ctx context.Context, req *types.QueryRangeRequest) (*query.Iterator, error)
- func (db *DB) ListDatabases(ctx context.Context) ([]string, error)
- func (db *DB) ListMeasurements(ctx context.Context, database string) ([]string, error)
- func (db *DB) QueryRange(ctx context.Context, req *types.QueryRangeRequest) (*types.QueryRangeResponse, error)
- func (db *DB) Write(ctx context.Context, point *types.Point) error
- func (db *DB) WriteBatch(ctx context.Context, points []*types.Point) error
- type MemTableConfig
- type Point
- type PointRow
- type QueryRangeRequest
- type QueryRangeResponse
Constants ¶
const ( CompressionNone = types.CompressionNone CompressionSnappy = types.CompressionSnappy CompressionLZ4 = types.CompressionLZ4 )
压缩算法常量
Variables ¶
This section is empty.
Functions ¶
func DefaultMemTableConfig ¶
func DefaultMemTableConfig() *types.MemTableConfig
DefaultMemTableConfig 返回默认的 MemTable 配置。
默认配置:
- FlushMemorySize: 64MB,内存表最大内存占用
- FlushPointCount: 10000,最大条目数
- FlushIdle: 1分钟,空闲时间阈值
返回:
MemTableConfig: 包含默认值的配置结构
使用示例:
cfg := mts.Config{
DataDir: "/data",
MemTableCfg: mts.DefaultMemTableConfig(),
}
Types ¶
type CompactionConfig ¶
type CompactionConfig = types.CompactionConfig
CompactionConfig 配置 Compaction 行为。
控制 Compaction 的触发策略、批处理大小和资源限制。
使用示例:
cfg := mts.CompactionConfig{
MaxSSTableCount: 4,
CheckInterval: 1 * time.Hour,
}
func DefaultCompactionConfig ¶
func DefaultCompactionConfig() *CompactionConfig
DefaultCompactionConfig 返回默认的 Compaction 配置。
type Config ¶
Config 是数据库的配置选项。
DataDir 指定数据存储目录,必须可写。 ShardDuration 定义每个 shard 的时间窗口,最小 1 小时,默认 7 天。 MemTableCfg 配置内存表行为,使用 nil 时将采用 DefaultMemTableConfig() 的默认值。 CompactionCfg 配置 compaction 行为,使用 nil 时将采用默认配置。 RetentionPeriod 配置数据保留期,0 表示不自动删除过期数据。 RetentionCheckInterval 配置 retention 检查间隔,默认 1 小时。
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB 是微时序数据库的主结构。
提供完整的时序数据存储和查询功能。 DB 实例是并发安全的,可以从多个 goroutine 同时访问。
使用 Open 函数创建 DB 实例,使用 Close 方法关闭。 关闭后不可再使用,否则会导致未定义行为。
func Open ¶
Open 打开或创建一个时序数据库实例。
参数:
- cfg: 数据库配置,包含数据目录、分片时长和 MemTable 配置
返回:
- *DB: 数据库实例
- error: 如果目录创建失败或引擎初始化失败则返回错误
配置文件默认值:
- ShardDuration: 默认为 7 天
- MemTableCfg: 使用 DefaultMemTableConfig()
使用示例:
db, err := mts.Open(&mts.Config{
DataDir: "/var/lib/mts",
ShardDuration: 7 * 24 * time.Hour,
})
if err != nil {
log.Fatal(err)
}
defer db.Close()
注意:必须调用 Close 释放资源。
func (*DB) Close ¶
Close 关闭数据库,释放所有资源。
返回:
- error: 关闭失败时返回错误
注意:
关闭前会尝试将所有 MemTable 数据刷写到 SSTable。
关闭后 DB 实例不可再使用。
多个 Close() 调用是安全的,但只有第一个会生效。
建议配合 defer 使用:
db, err := mts.Open(cfg)
if err != nil {
log.Fatal(err)
}
defer db.Close()
func (*DB) CreateDatabase ¶
func (db *DB) CreateDatabase(ctx context.Context, database string, retention time.Duration, downsample *types.DownsampleConfig) error
CreateDatabase 创建一个新的数据库。
参数:
- ctx: 上下文
- database: 数据库名称
返回:
- error: 创建失败时返回错误(当前不会返回错误)
说明:
创建 DatabaseMetaStore 并注册到 Engine。 实际的数据目录在第一次写入时才会创建。 如果数据库已存在,不会返回错误。
使用示例:
if err := db.CreateDatabase(ctx, "new_metrics"); err != nil {
log.Fatal(err)
}
func (*DB) CreateEmptyMeasurement ¶
CreateEmptyMeasurement 创建一个空目录结构的 Measurement(用于兼容旧 API)。 此函数在首次写入数据时自动调用,通常不需要手动调用。
参数:
- database: 数据库名称
- measurement: Measurement 名称
返回:
- error: 创建失败时返回错误
func (*DB) CreateMeasurement ¶
CreateMeasurement 在指定数据库中创建一个新的 Measurement。
参数:
- ctx: 上下文
- database: 数据库名称
- measurement: Measurement 名称
返回:
- error: 创建失败时返回错误
说明:
创建 DatabaseMetaStore(如果不存在)并创建 MeasurementMetaStore。 实际的数据目录在第一次写入时才会创建。 如果 measurement 已存在,不会返回错误。
使用示例:
if err := db.CreateMeasurement(ctx, "metrics", "cpu_usage"); err != nil {
log.Fatal(err)
}
func (*DB) DropDatabase ¶
DropDatabase 删除指定的数据库。
参数:
- ctx: 上下文
- database: 数据库名称
返回:
- error: 删除失败时返回错误
警告:
此操作会永久删除该数据库的所有元数据(包括所有 Measurement)。 磁盘上的数据文件不会被立即删除,需要单独清理。 如果数据库不存在,返回错误。
使用示例:
if err := db.DropDatabase(ctx, "old_metrics"); err != nil {
log.Fatal(err)
}
func (*DB) DropMeasurement ¶
DropMeasurement 删除指定的 Measurement。
参数:
- ctx: 上下文
- database: 数据库名称
- measurement: Measurement 名称
返回:
- error: 删除失败时返回错误
警告:
此操作会永久删除该 Measurement 的元数据(Schema、Series、Tag 索引)。 磁盘上的数据文件不会被立即删除,需要单独清理。 如果数据库或 Measurement 不存在,返回错误。
使用示例:
if err := db.DropMeasurement(ctx, "metrics", "old_metric"); err != nil {
log.Fatal(err)
}
func (*DB) Execute ¶
Execute 执行查询计划,通过算子 Pipeline 执行 Filter、GroupBy、Aggregate、Sort、Project 等操作。
参数:
- ctx: 上下文
- plan: 查询计划,由 query.Builder 构建
返回:
- *query.RowIterator: 算子 Pipeline 迭代器
- error: 执行失败时返回错误
使用示例:
plan := query.NewBuilder().
Select("avg(cpu)", "host").
From("metrics", "cpu").
TimeRange(start, end).
GroupBy("host").
Build()
iter, err := db.Execute(ctx, plan)
if err != nil {
log.Fatal(err)
}
defer iter.Close()
for iter.Next(ctx) {
row := iter.Points()
process(row)
}
func (*DB) FlushAll ¶
FlushAll 刷新所有 Shard 的 MemTable 到 SSTable。
返回:
- error: 如果任一 Shard 刷盘失败则返回错误
说明:
遍历所有已创建的 Shard,调用其 Flush() 方法。 用于确保所有内存数据持久化,或在关闭前确保数据安全。
func (*DB) Iterator ¶
Iterator 创建流式查询迭代器,用于处理大量数据而不占用大量内存。
流式迭代器只在需要时加载数据,适合处理海量数据查询。
参数:
- ctx: 上下文,可用于取消迭代
- req: 查询请求,支持时间范围、字段过滤和标签过滤
返回:
- *query.Iterator: 查询迭代器,使用完后必须调用 Close()
- error: 创建失败时返回错误
使用示例:
it, err := db.Iterator(ctx, &mts.QueryRangeRequest{
Database: "metrics",
Measurement: "cpu",
StartTime: start.UnixNano(),
EndTime: end.UnixNano(),
})
if err != nil {
log.Fatal(err)
}
defer it.Close()
for it.Next(ctx) {
row := it.Points()
process(row)
}
func (*DB) ListDatabases ¶
ListDatabases 列出所有数据库名称。
参数:
- ctx: 上下文
返回:
- []string: 数据库名称列表(按字母序排序)
- error: 查询失败时返回错误(当前不会返回错误)
实现说明:
从 Engine 的 dbMetaStores 获取所有数据库名称。 这反映了当前已知的所有数据库(有元数据的)。 返回的列表按字母序排序。
使用示例:
databases, err := db.ListDatabases(ctx)
if err != nil {
log.Fatal(err)
}
for _, dbName := range databases {
fmt.Println(dbName)
}
func (*DB) ListMeasurements ¶
ListMeasurements 列出指定数据库中的所有 Measurement 名称。
参数:
- ctx: 上下文
- database: 数据库名称
返回:
- []string: Measurement 名称列表(按字母序排序)
- error: 如果数据库不存在返回错误
实现说明:
从 Engine 的 DatabaseMetaStore 获取元数据,遍历 measurement keys。 这反映了当前已知的所有 measurement(有元数据的)。 返回的列表按字母序排序。
使用示例:
measurements, err := db.ListMeasurements(ctx, "metrics")
if err != nil {
log.Fatal(err)
}
for _, m := range measurements {
fmt.Println(m)
}
func (*DB) QueryRange ¶
func (db *DB) QueryRange(ctx context.Context, req *types.QueryRangeRequest) (*types.QueryRangeResponse, error)
QueryRange 执行范围查询,返回指定时间范围内的数据点。
查询会自动合并 MemTable(内存数据)和 SSTable(磁盘数据)的结果。 数据按时间戳升序返回。
参数:
- ctx: 上下文,可用于取消查询
- req: 查询请求,包含时间范围、字段列表、标签过滤和分页参数
返回:
- *QueryRangeResponse: 包含查询结果行、总数和是否有更多数据
- error: 查询失败时返回错误
说明:
底层使用流式迭代器查询,本方法会迭代读取所有数据后返回。 如果需要处理大量数据,建议使用 Iterator 直接流式处理, 可以避免在内存中累积所有结果。
分页处理:
使用 Offset 和 Limit 进行分页。当 HasMore 为 true 时, 可以通过设置 Offset = 已返回行数 来获取下一页。
字段过滤:
如果 Fields 为空,返回所有字段。 如果指定字段,只返回这些字段的值。
使用示例:
resp, err := db.QueryRange(ctx, &mts.QueryRangeRequest{
Database: "metrics",
Measurement: "cpu",
StartTime: start.UnixNano(),
EndTime: end.UnixNano(),
Fields: []string{"usage", "temperature"},
Tags: map[string]string{"host": "server1"},
Offset: 0,
Limit: 1000,
})
if err != nil {
log.Fatal(err)
}
for _, row := range resp.Rows {
fmt.Printf("时间: %d, 字段: %v\n", row.Timestamp, row.Fields)
}
func (*DB) Write ¶
Write 写入单个数据点到数据库。
参数:
- ctx: 上下文,可用于取消操作(会透传到引擎层)
- point: 要写入的数据点,包含时间戳、标签和字段
返回:
- error: 写入失败时返回错误,错误类型包括 IO 错误和序列化错误
时间戳排序:
MemTable 会自动维护按时间戳排序的条目。 当写入乱序数据时,会触发内部排序操作。
持久化保证:
数据首先写入 WAL (Write-Ahead Log),然后写入 MemTable。 只有 WAL 写入成功后,写入才被认为是成功的。
使用示例:
point := &mts.Point{
Database: "metrics",
Measurement: "cpu",
Tags: map[string]string{"host": "server1"},
Timestamp: time.Now().UnixNano(),
Fields: map[string]any{"usage": 45.2},
}
if err := db.Write(ctx, point); err != nil {
log.Printf("写入失败: %v", err)
}
func (*DB) WriteBatch ¶
WriteBatch 批量写入多个数据点。
相比多次调用 Write,批量写入可以减少系统调用开销,提高吞吐量。 注意:WriteBatch 不保证原子性。如果某个点写入失败,已成功写入的点不会回滚, 调用方需要自行处理部分写入的情况(通过重试或丢弃受影响的数据)。
参数:
- ctx: 上下文
- points: 要写入的数据点切片
返回:
- error: 写入失败时返回错误,包含失败点的时间戳信息
性能建议:
建议每个批次包含 100-1000 个点,具体取决于数据点大小。 批次过大可能导致内存占用过高。
使用示例:
points := []*mts.Point{p1, p2, p3}
if err := db.WriteBatch(ctx, points); err != nil {
log.Printf("批量写入失败: %v", err)
}
type MemTableConfig ¶
type MemTableConfig = types.MemTableConfig
MemTableConfig 配置内存表的行为。
控制 MemTable 的大小、条目数和空闲时间,当达到任一阈值时会触发刷盘。
使用示例:
cfg := mts.MemTableConfig{
FlushMemorySize: 64 * 1024 * 1024, // 64MB
FlushPointCount: 10000,
FlushIdle: 5 * time.Minute,
}
type Point ¶
Point 是写入时序数据库的基本数据单元。
每个 Point 属于特定的 Database 和 Measurement,包含时间戳、标签集合和字段值。 标签用于标识时间序列,字段存储实际的数据值。 Timestamp 使用纳秒级 Unix 时间戳。
使用示例:
point := &mts.Point{
Database: "metrics",
Measurement: "cpu",
Tags: map[string]string{
"host": "server1",
"region": "us-east-1",
},
Timestamp: time.Now().UnixNano(),
Fields: map[string]any{
"value": 45.2,
},
}
type QueryRangeRequest ¶
type QueryRangeRequest = types.QueryRangeRequest
QueryRangeRequest 定义范围查询的请求参数。
支持按时间范围 (StartTime, EndTime)、字段列表和标签过滤器查询数据。 Offset 和 Limit 用于分页查询,Limit = 0 表示不限制。 时间戳使用纳秒级 Unix 时间戳。
使用示例:
req := &mts.QueryRangeRequest{
Database: "metrics",
Measurement: "cpu",
StartTime: startTime.UnixNano(),
EndTime: endTime.UnixNano(),
Fields: []string{"value"}, // 只返回指定字段
Tags: map[string]string{"host": "server1"}, // 过滤标签
Offset: 0,
Limit: 1000,
}
type QueryRangeResponse ¶
type QueryRangeResponse = types.QueryRangeResponse
QueryRangeResponse 是范围查询的响应。
Directories
¶
| Path | Synopsis |
|---|---|
|
cmd
|
|
|
server
command
Package main provides the micro-ts gRPC server entry point.
|
Package main provides the micro-ts gRPC server entry point. |
|
internal
|
|
|
api
Package api 实现 gRPC API 服务。
|
Package api 实现 gRPC API 服务。 |
|
api/auth
Package auth 提供 gRPC 认证拦截器。
|
Package auth 提供 gRPC 认证拦截器。 |
|
engine
Package engine 实现微时序数据库的存储引擎。
|
Package engine 实现微时序数据库的存储引擎。 |
|
metrics
Package metrics 提供 expvar 指标定义与埋点。
|
Package metrics 提供 expvar 指标定义与埋点。 |
|
query
Package query 实现查询处理和执行。
|
Package query 实现查询处理和执行。 |
|
storage
Package storage 提供存储相关的工具函数。
|
Package storage 提供存储相关的工具函数。 |
|
storage/compaction
Package compaction 实现 Level Compaction 策略。
|
Package compaction 实现 Level Compaction 策略。 |
|
storage/downsample
Package downsample 实现时序数据降采样服务。
|
Package downsample 实现时序数据降采样服务。 |
|
storage/metadata
Package metadata 提供全局元数据管理。
|
Package metadata 提供全局元数据管理。 |
|
storage/shard
Package shard 实现分片存储管理。
|
Package shard 实现分片存储管理。 |
|
storage/shard/compression
Package compression 提供数据压缩工具。
|
Package compression 提供数据压缩工具。 |
|
storage/shard/sstable
internal/storage/shard/sstable/index.go
|
internal/storage/shard/sstable/index.go |
|
storage/unordered
Package unordered 管理未排序 SSTable 文件(immutable memtable 集合)。
|
Package unordered 管理未排序 SSTable 文件(immutable memtable 集合)。 |
|
storage/wal
Package wal 实现 Write-Ahead Log。
|
Package wal 实现 Write-Ahead Log。 |
|
tests
|
|
|
e2e/check_fields
command
|
|
|
e2e/check_schema
command
|
|
|
e2e/compaction_test
command
tests/e2e/compaction_test/main.go
|
tests/e2e/compaction_test/main.go |
|
e2e/compression_test
command
tests/e2e/compression_test/main.go 端到端压缩测试:验证 none/snappy/lz4 压缩算法的写入、查询、恢复、压缩率
|
tests/e2e/compression_test/main.go 端到端压缩测试:验证 none/snappy/lz4 压缩算法的写入、查询、恢复、压缩率 |
|
e2e/diff_compact_disk_usage
command
tests/e2e/diff_compact_disk_usage/main.go
|
tests/e2e/diff_compact_disk_usage/main.go |
|
e2e/downsample_test
command
tests/e2e/downsample_test/main.go
|
tests/e2e/downsample_test/main.go |
|
e2e/grpc_write_query
command
tests/e2e/grpc_write_query/main.go gRPC 端到端测试:写入 10K 条数据并通过查询 API 验证数据完整性
|
tests/e2e/grpc_write_query/main.go gRPC 端到端测试:写入 10K 条数据并通过查询 API 验证数据完整性 |
|
e2e/integrity
command
tests/e2e/integrity/main.go
|
tests/e2e/integrity/main.go |
|
e2e/persistence_test
command
tests/e2e/persistence_test/main.go
|
tests/e2e/persistence_test/main.go |
|
e2e/pkg/data_gen
tests/e2e/pkg/data_gen/data_gen.go
|
tests/e2e/pkg/data_gen/data_gen.go |
|
e2e/pkg/framework
tests/e2e/pkg/framework/framework.go
|
tests/e2e/pkg/framework/framework.go |
|
e2e/pkg/metrics
tests/e2e/pkg/metrics/metrics.go
|
tests/e2e/pkg/metrics/metrics.go |
|
e2e/query_100k
command
tests/e2e/query_100k/main.go 查询端测用例:100K 数据写入 → 多次刷盘 → Compaction 合并 → 分页查询延迟/内存分析
|
tests/e2e/query_100k/main.go 查询端测用例:100K 数据写入 → 多次刷盘 → Compaction 合并 → 分页查询延迟/内存分析 |
|
e2e/query_10k
command
tests/e2e/query_10k/main.go 查询端测用例:10K 数据写入 → 多次刷盘 → Compaction 合并 → 查询延迟/内存分析
|
tests/e2e/query_10k/main.go 查询端测用例:10K 数据写入 → 多次刷盘 → Compaction 合并 → 查询延迟/内存分析 |
|
e2e/query_1k
command
tests/e2e/query_1k/main.go 查询端测用例:1K 数据写入 → 多次刷盘 → Compaction 合并 → 查询延迟/内存分析
|
tests/e2e/query_1k/main.go 查询端测用例:1K 数据写入 → 多次刷盘 → Compaction 合并 → 查询延迟/内存分析 |
|
e2e/query_1m
command
tests/e2e/query_1m/main.go 查询端测用例:1M 数据写入 → 多次刷盘 → Compaction 合并 → 分页查询延迟/内存分析
|
tests/e2e/query_1m/main.go 查询端测用例:1M 数据写入 → 多次刷盘 → Compaction 合并 → 分页查询延迟/内存分析 |
|
e2e/query_op_benchmark
command
tests/e2e/query_op_benchmark/main.go
|
tests/e2e/query_op_benchmark/main.go |
|
e2e/restart_recovery
command
tests/e2e/restart_recovery/main.go
|
tests/e2e/restart_recovery/main.go |
|
e2e/retention_test
command
tests/e2e/retention_test/main.go
|
tests/e2e/retention_test/main.go |
|
e2e/simple_integrity
command
tests/e2e/simple_integrity/main.go
|
tests/e2e/simple_integrity/main.go |
|
e2e/wal_compression_test
command
tests/e2e/wal_compression_test/main.go
|
tests/e2e/wal_compression_test/main.go |
|
e2e/wal_test
command
tests/e2e/wal_test/main.go
|
tests/e2e/wal_test/main.go |
|
e2e/write_100k
command
tests/e2e/write_100k/main.go
|
tests/e2e/write_100k/main.go |
|
e2e/write_100k_pprof
command
tests/e2e/write_100k_pprof/main.go 写入 100K 数据点并生成 pprof CPU/heap profile 用于性能分析。
|
tests/e2e/write_100k_pprof/main.go 写入 100K 数据点并生成 pprof CPU/heap profile 用于性能分析。 |
|
e2e/write_10k
command
tests/e2e/write_10k/main.go
|
tests/e2e/write_10k/main.go |
|
e2e/write_10m_pprof
command
tests/e2e/write_10m_pprof/main.go
|
tests/e2e/write_10m_pprof/main.go |
|
e2e/write_1k
command
tests/e2e/write_1k/main.go
|
tests/e2e/write_1k/main.go |
|
e2e/write_1m
command
tests/e2e/write_1m/main.go
|
tests/e2e/write_1m/main.go |
|
e2e/write_1m_pprof
command
tests/e2e/write_1m_pprof/main.go
|
tests/e2e/write_1m_pprof/main.go |
|
e2e/write_20k_debug
command
tests/e2e/write_20k_debug/main.go
|
tests/e2e/write_20k_debug/main.go |
|
e2e/write_and_compact
command
tests/e2e/write_and_compact/main.go
|
tests/e2e/write_and_compact/main.go |
|
Package types 提供所有数据类型的定义和转换工具。
|
Package types 提供所有数据类型的定义和转换工具。 |