Documentation
¶
Index ¶
- Variables
- func BatchInsert[T any](ctx context.Context, c *Client, data []*T, mapper Mapper[T]) error
- func BoolToString(value *bool) string
- func BuildQuery(table string, filters map[string]interface{}, operators map[string]string, ...) (string, []interface{})
- func BuildQueryWithParams(table string, filters map[string]interface{}, operators map[string]string, ...) string
- func ConvertAnyToPointsSafe(pts []any) ([]*influxdb3.Point, error)
- func GetBoolPointTag(point *influxdb3.Point, name string) *bool
- func GetEnumPointTag[T ~int32](point *influxdb3.Point, name string, valueMap map[string]int32) *T
- func GetPointTag(point *influxdb3.Point, name string) *string
- func GetTimestampField(point *influxdb3.Point, name string) *timestamppb.Timestamp
- func GetUint32Field(point *influxdb3.Point, name string) *uint32
- func GetUint32PointTag(point *influxdb3.Point, name string) *uint32
- func GetUint64PointTag(point *influxdb3.Point, name string) *uint64
- func Insert[T any](ctx context.Context, c *Client, data *T, mapper Mapper[T]) error
- func ProtoMessageToPoint(msg proto.Message, overrides map[string]string) (*influxdb3.Point, error)
- func Query[T any](ctx context.Context, c *Client, query string, mapper Mapper[T]) ([]*T, error)
- func StructToPoint(s interface{}) (*influxdb3.Point, error)
- func Uint64ToString(value *uint64) string
- type Client
- func (c *Client) BatchInsert(ctx context.Context, points []*influxdb3.Point) error
- func (c *Client) Close()
- func (c *Client) Count(ctx context.Context, query string) (int64, error)
- func (c *Client) ExecInfluxQLQuery(ctx context.Context, query string, opts ...influxdb3.QueryOption) (*influxdb3.QueryIterator, error)
- func (c *Client) ExecSQLQuery(ctx context.Context, query string, opts ...influxdb3.QueryOption) (*influxdb3.QueryIterator, error)
- func (c *Client) Exist(ctx context.Context, query string) (bool, error)
- func (c *Client) Insert(ctx context.Context, point *influxdb3.Point) error
- func (c *Client) Query(ctx context.Context, query string) (*influxdb3.QueryIterator, error)
- func (c *Client) QueryWithParams(ctx context.Context, table string, filters map[string]interface{}, ...) (*influxdb3.QueryIterator, error)
- func (c *Client) ServerVersion() string
- func (c *Client) WritePoints(ctx context.Context, pts []any) error
- func (c *Client) WritePointsStrict(ctx context.Context, points []*influxdb3.Point) error
- type Mapper
- type Option
- func WithAuthScheme(authScheme string) Option
- func WithDatabase(database string) Option
- func WithHost(host string) Option
- func WithIdleConnectionTimeout(idleTimeout time.Duration) Option
- func WithLogger(logger log.Logger) Option
- func WithMaxIdleConnections(maxIdle int) Option
- func WithOptions(opts *influxdb3.ClientConfig) Option
- func WithOrganization(organization string) Option
- func WithQueryTimeout(timeout time.Duration) Option
- func WithTLSConfig(tlsConfig *tls.Config) Option
- func WithToken(token string) Option
- func WithWriteTimeout(timeout time.Duration) Option
- type Repository
- func (r *Repository[DTO, ENTITY]) BatchCreate(_ context.Context, dtos []*DTO) ([]*DTO, error)
- func (r *Repository[DTO, ENTITY]) Count(ctx context.Context, baseWhere string, whereArgs ...any) (int64, error)
- func (r *Repository[DTO, ENTITY]) Create(_ context.Context, dto *DTO) (*DTO, error)
- func (r *Repository[DTO, ENTITY]) Exists(ctx context.Context, baseWhere string, whereArgs ...any) (bool, error)
- func (r *Repository[DTO, ENTITY]) ListWithPagination(ctx context.Context, req *paginationV1.PaginationRequest) ([]*DTO, int64, error)
- func (r *Repository[DTO, ENTITY]) ListWithPaging(ctx context.Context, req *paginationV1.PagingRequest) ([]*DTO, int64, error)
Constants ¶
This section is empty.
Variables ¶
var (
	ErrInfluxDBClientNotInitialized = errors.InternalServer("INFLUXDB_CLIENT_NOT_INITIALIZED", "client not initialized")
	ErrInfluxDBConnectFailed        = errors.InternalServer("INFLUXDB_CONNECT_FAILED", "connect failed")
	ErrInfluxDBCreateDatabaseFailed = errors.InternalServer("INFLUXDB_CREATE_DATABASE_FAILED", "database create failed")
	ErrInfluxDBQueryFailed          = errors.InternalServer("INFLUXDB_QUERY_FAILED", "query failed")
	ErrClientNotConnected           = errors.InternalServer("INFLUXDB_CLIENT_NOT_CONNECTED", "client not connected")
	ErrInvalidPoint                 = errors.InternalServer("INFLUXDB_INVALID_POINT", "invalid point")
	ErrNoPointsToInsert             = errors.InternalServer("INFLUXDB_NO_POINTS_TO_INSERT", "no points to insert")
	ErrEmptyData                    = errors.InternalServer("INFLUXDB_EMPTY_DATA", "empty data")
	ErrBatchInsertFailed            = errors.InternalServer("INFLUXDB_BATCH_INSERT_FAILED", "batch insert failed")
	ErrInsertFailed                 = errors.InternalServer("INFLUXDB_INSERT_FAILED", "insert failed")
)
Functions ¶
func BatchInsert ¶
BatchInsert 批量插入数据
func BoolToString ¶
func BuildQuery ¶
func BuildQueryWithParams ¶
func ConvertAnyToPointsSafe ¶
ConvertAnyToPointsSafe 将 []any 逐元素断言为 []*influxdb3.Point
func GetEnumPointTag ¶
func GetTimestampField ¶
func GetTimestampField(point *influxdb3.Point, name string) *timestamppb.Timestamp
func ProtoMessageToPoint ¶
ProtoMessageToPoint 将 protobuf message 转为 influxdb3.Point。 参数 overrides 可选:map[key]=role,key 为 protobuf 字段的 JSON 名称(例如 "deviceId"),role 为 "measurement"/"tag"/"field"/"time"。 约定识别规则(在无 overrides 时):
- 字段名为 "measurement" -> measurement
- 名称以 "_tag" 为后缀或以 "tag_" 为前缀 -> tag
- 字段类型为 google.protobuf.Timestamp 或名为 "time"/"timestamp" -> time
- 其它标量 -> field
example.proto:
syntax = "proto3"; import "google/protobuf/timestamp.proto";
message SensorProto {
string measurement = 1; // 当作为 measurement 使用
string device_id = 2; // 作为 tag
string location = 3; // 作为 tag
double temperature = 4; // field
int64 battery = 5; // field
google.protobuf.Timestamp ts = 6; // time
}
Go 使用示例(将 pb 替换为实际生成包路径,例如 `github.com/you/project/pb`)
package influxdb_test
import (
	"fmt"
	"time"
	"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
	"google.golang.org/protobuf/types/known/timestamppb"
	// 替换为你的 protobuf 生成包
	pb "path/to/your/generated/pb"
	// 假设 ProtoMessageToPoint 位于本模块的 influxdb 包
	"your/module/path/influxdb"
)
func ExampleProtoMessageToPoint_basic() {
// 构造 protobuf message
msg := &pb.SensorProto{
Measurement: "sensors",
DeviceId: "dev-1",
Location: "room1",
Temperature: 23.5,
Battery: 95,
Ts: timestamppb.New(time.Now().UTC()),
}
// 不传 overrides:按约定自动分类(字段名 measurement -> measurement,ts -> time,带 tag hint 的字段 -> tag)
pt, err := influxdb.ProtoMessageToPoint(msg, nil)
if err != nil {
fmt.Println("err:", err)
return
}
// 打印示例:measurement/tags/fields/time
fmt.Println("measurement:", pt.GetMeasurement())
fmt.Println("tag device_id:", pt.GetTag("device_id"))
fmt.Println("field temperature:", pt.GetField("temperature"))
}
func ExampleProtoMessageToPoint_withOverrides() {
msg := &pb.SensorProto{
// 假设 proto 中没有 measurement 字段或你希望使用其它字段作为 measurement
DeviceId: "dev-1",
Location: "room1",
Temperature: 23.5,
Battery: 95,
Ts: timestamppb.New(time.Now().UTC()),
}
// overrides 的 key 使用 protobuf 字段的 JSON 名称(例如 proto 字段 device_id 的 json 名称通常为 "deviceId")
overrides := map[string]string{
"deviceId": "measurement", // 把 deviceId 当作 measurement
"location": "tag", // 强制 location 为 tag
"battery": "field",
"ts": "time",
"temperature": "field",
}
pt, err := influxdb.ProtoMessageToPoint(msg, overrides)
if err != nil {
fmt.Println("err:", err)
return
}
fmt.Println("measurement:", pt.GetMeasurement())
fmt.Println("tags:", pt.GetTags())
fmt.Println("fields:", pt.GetFields())
}
func StructToPoint ¶
StructToPoint 通用转换函数:将带 influx tag 的 struct 转为 influxdb3.Point。参数:s 为带 tag 的 struct 实例(不能是指针)。
func Uint64ToString ¶
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) BatchInsert ¶
BatchInsert 批量插入数据
func (*Client) ExecInfluxQLQuery ¶
func (c *Client) ExecInfluxQLQuery(ctx context.Context, query string, opts ...influxdb3.QueryOption) (*influxdb3.QueryIterator, error)
ExecInfluxQLQuery 执行 InfluxQL 查询并返回原始迭代器
func (*Client) ExecSQLQuery ¶
func (c *Client) ExecSQLQuery(ctx context.Context, query string, opts ...influxdb3.QueryOption) (*influxdb3.QueryIterator, error)
ExecSQLQuery 执行 SQL 查询并返回原始迭代器
func (*Client) QueryWithParams ¶
func (c *Client) QueryWithParams( ctx context.Context, table string, filters map[string]interface{}, operators map[string]string, fields []string, ) (*influxdb3.QueryIterator, error)
QueryWithParams 使用参数化方式查询数据
func (*Client) ServerVersion ¶
ServerVersion 获取InfluxDB服务器版本
func (*Client) WritePoints ¶
WritePoints 将通用点集合写入 InfluxDB;仅支持传入的元素为 *influxdb3.Point
type Mapper ¶
type Mapper[T any] interface {
	// ToPoint 将数据转换为InfluxDB的Point格式
	ToPoint(data *T) *influxdb3.Point
	// ToData 将InfluxDB的Point转换为原始数据
	ToData(point *influxdb3.Point) *T
}
Mapper 数据转换的接口
type Option ¶
type Option func(o *Client)
func WithAuthScheme ¶
func WithDatabase ¶
func WithLogger ¶
func WithMaxIdleConnections ¶
func WithOptions ¶
func WithOptions(opts *influxdb3.ClientConfig) Option
func WithOrganization ¶
func WithQueryTimeout ¶
func WithTLSConfig ¶
func WithWriteTimeout ¶
type Repository ¶
Repository InfluxDB 版仓库(泛型)
func NewRepository ¶
func (*Repository[DTO, ENTITY]) BatchCreate ¶
func (r *Repository[DTO, ENTITY]) BatchCreate(_ context.Context, dtos []*DTO) ([]*DTO, error)
BatchCreate 批量插入
func (*Repository[DTO, ENTITY]) Count ¶
func (r *Repository[DTO, ENTITY]) Count(ctx context.Context, baseWhere string, whereArgs ...any) (int64, error)
Count 按给定的 baseWhere 条件及其参数 whereArgs 统计数量
func (*Repository[DTO, ENTITY]) Create ¶
func (r *Repository[DTO, ENTITY]) Create(_ context.Context, dto *DTO) (*DTO, error)
Create 插入一条记录
func (*Repository[DTO, ENTITY]) Exists ¶
func (r *Repository[DTO, ENTITY]) Exists(ctx context.Context, baseWhere string, whereArgs ...any) (bool, error)
Exists 检查是否存在符合条件的记录
func (*Repository[DTO, ENTITY]) ListWithPagination ¶
func (r *Repository[DTO, ENTITY]) ListWithPagination(ctx context.Context, req *paginationV1.PaginationRequest) ([]*DTO, int64, error)
ListWithPagination 针对 paginationV1.PaginationRequest 的列表查询
func (*Repository[DTO, ENTITY]) ListWithPaging ¶
func (r *Repository[DTO, ENTITY]) ListWithPaging(ctx context.Context, req *paginationV1.PagingRequest) ([]*DTO, int64, error)
ListWithPaging 针对 paginationV1.PagingRequest 的列表查询(兼容 Query/OrQuery/FilterExpr)