doris

package module
v1.0.0 Latest
Published: Apr 10, 2026 License: MIT Imports: 26 Imported by: 0

README

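Apache Doris: Introduction and Usage Guide

This document gives a brief introduction to Apache Doris and explains how to connect to and use Doris through the Go client provided in this repository, covering common scenarios such as SQL, Stream Load, sessions/transactions, and batch inserts.

If you only want to start a local Doris cluster quickly, see the Docker deployment example kept at the end of this document.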

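What is Doris

Apache Doris (formerly the Apache Incubator project Doris, known commercially as Palo) is a distributed columnar database for real-time analytics (OLAP). Its main characteristics are:

  • MySQL protocol compatibility: you can connect with MySQL clients or drivers, which makes integration with existing tools easy.
  • High-performance analytics: columnar storage, vectorized execution, and an MPP (massively parallel processing) distributed query engine raise analytical query throughput.
  • Multiple ingestion paths: batch import, Stream Load (real-time import), Broker Load, and others.
  • Flexible deployment: single-node trials, cluster deployment (FE/BE), and cloud deployments.

Suitable use cases: interactive analysis, dashboards, and near-real-time OLAP scenarios such as log/event analysis, time-series analysis, and BI reporting.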

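Architecture overview

  • FE (Frontend): manages metadata, parses SQL, performs global scheduling, and exposes the external SQL/HTTP interfaces (for example, the Stream Load HTTP interface is usually exposed on the FE).
  • BE (Backend): stores data (columnar storage) and performs the actual query execution (scans, aggregation, and so on).

A cluster usually has several FEs (for high availability) and several BEs (to scale storage and compute horizontally).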

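Key features

  • MySQL protocol support -> accessible with MySQL drivers and clients.
  • Stream Load -> pushes CSV, JSON, or other formats into a table over HTTP in real time (commonly used for near-real-time ingestion).
  • Session-level variables (SET SESSION ...) control execution behavior such as the query plan, memory limits, and time zone.
  • Transaction semantics are supported to a limited degree, mainly for small-scope atomic operations; do not expect strongly consistent OLTP behavior in every scenario.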

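Overview of this repository's Doris client (Go)

The doris package in this repository implements a lightweight Doris client intended to make the following easy in Go applications:

  • Running SQL queries and batch inserts through database/sql / sqlx.
  • Uploading large amounts of data through the HTTP Stream Load interface.
  • Managing session variables (SET SESSION ...), and setting them on a single connection before running a transaction so that they apply to every query inside the transaction.

Main interfaces (excerpt):

  • NewClient(opts...) (*Client, error): creates a client.
  • Exec / Get / Select / BatchInsert: basic SQL operations and batch-insert support.
  • StreamLoad(ctx, db, table, params, data): uploads data through the FE's Stream Load HTTP interface.
  • SetSessionVars(ctx, vars): runs multiple SET SESSION statements through the connection pool.
  • RunWithSession(ctx, vars, fn): sets session variables on a single physical connection and runs the callback on it, so the settings are visible inside the callback.
  • BeginTxWithSession(ctx, vars, opts) (*TxWithConn, error): sets session variables and begins a transaction on the same connection, returning a transaction object that must be committed or rolled back.
  • WithTxWithSession(ctx, vars, opts, fn): convenience method that sets session variables, begins a transaction on the same connection, runs the callback, and commits or rolls back automatically.

Example (using the client implemented in this repository):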

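// Create a client (example only; replace the DSN with your own).
// The Doris FE serves the MySQL protocol on its query port (9030 in the Docker setup below).
cli, err := doris.NewClient(doris.WithDSN("user:pass@tcp(host:9030)/dbname"))
if err != nil {
    return err
}
defer cli.Close()

// Batch insert: each row must have the same length as cols.
cols := []string{"id", "name"}
rows := [][]any{{1, "alice"}, {2, "bob"}}
if _, err = cli.BatchInsert(ctx, "posts", cols, rows); err != nil {
    return err
}

// Stream Load (uploads to the FE endpoint /api/{db}/{table}/_stream_load).
// column_separator is set explicitly because Doris's CSV default separator is a tab.
params := map[string]string{"columns": "id,name", "format": "csv", "column_separator": ",", "label": "label_123"}
data := strings.NewReader("1,alice\n2,bob\n")
body, status, err := cli.StreamLoad(ctx, "mydb", "mytable", params, data)
if err != nil {
    return err
}
log.Printf("stream load: status=%d body=%s", status, body)

// Set session variables and run statements on the same connection and transaction.
vars := map[string]string{"exec_mem_limit": "4G", "time_zone": "Asia/Shanghai"}
err = cli.WithTxWithSession(ctx, vars, nil, func(tx *sql.Tx) error {
    // Statements here run with the session variables above applied.
    _, err := tx.ExecContext(ctx, "INSERT INTO ...")
    return err
})
if err != nil {
    return err
}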

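Note: when you run SET SESSION directly on the connection pool, the SET affects only the physical connection that executed it. If a subsequent series of statements (especially inside a transaction) must all see the setting, use RunWithSession, BeginTxWithSession, or WithTxWithSession to set the variables and run the statements on the same connection.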

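Common session variable examples

Some common session variables (set them through SetSessionVars or with SET on a single connection):

  • SET enable_profile = true; : enables query profiling.
  • SET sql_select_limit = 10000; : limits the maximum number of rows returned by default.
  • SET exec_mem_limit = 4G; : limits the memory used by a single query (a unit may be given).
  • SET time_zone = 'Asia/Shanghai'; : sets the time zone (string values need quotes).

In this client, SetSessionVars validates variable names against a whitelist and formats values safely:

  • Boolean values true/false are not quoted.
  • Numeric values with a unit, such as 4G, are used as-is.
  • Ordinary strings are wrapped in single quotes, with embedded single quotes escaped.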
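
The formatting rules above can be sketched as follows. This is an illustrative reimplementation, not the package's code: the names allowedVars, formatSessionValue, and buildSetStatement are hypothetical, the whitelist contents are made up from the variables listed above, and escaping a single quote by doubling it is an assumption about how the escaping is done.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// allowedVars is a hypothetical whitelist; the real client's list is not shown in this README.
var allowedVars = map[string]bool{
	"enable_profile":   true,
	"sql_select_limit": true,
	"exec_mem_limit":   true,
	"time_zone":        true,
}

// numericWithUnit matches plain integers and integers with a K/M/G suffix, e.g. 10000 or 4G.
var numericWithUnit = regexp.MustCompile(`^[0-9]+[KMGkmg]?$`)

// formatSessionValue renders a value for the right-hand side of a SET statement.
func formatSessionValue(v string) string {
	switch {
	case strings.EqualFold(v, "true"), strings.EqualFold(v, "false"):
		return strings.ToLower(v) // booleans stay unquoted
	case numericWithUnit.MatchString(v):
		return v // numbers (with optional unit) are used as-is
	default:
		// Assumption: embedded single quotes are escaped by doubling them.
		return "'" + strings.ReplaceAll(v, "'", "''") + "'"
	}
}

// buildSetStatement checks the name against the whitelist and builds one statement.
func buildSetStatement(name, value string) (string, error) {
	if !allowedVars[name] {
		return "", fmt.Errorf("session variable %q is not allowed", name)
	}
	return fmt.Sprintf("SET SESSION %s = %s", name, formatSessionValue(value)), nil
}

func main() {
	for _, kv := range [][2]string{
		{"enable_profile", "true"},
		{"exec_mem_limit", "4G"},
		{"time_zone", "Asia/Shanghai"},
	} {
		stmt, err := buildSetStatement(kv[0], kv[1])
		fmt.Println(stmt, err)
	}
}
```

Rejecting unknown names matters here because variable names cannot be passed as bound parameters, so the whitelist is what keeps arbitrary text out of the statement.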

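Operations and performance recommendations

  • For large-scale imports, prefer Stream Load or Broker Load; Stream Load suits near-real-time, small-batch imports.
  • Set exec_mem_limit and sql_select_limit sensibly so that a single query cannot consume excessive resources.
  • Use multiple BE nodes to share storage and query work for better scalability.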

Docker quick deployment

The following example quickly starts an FE and a BE (carried over from the repository's original content):

Docker run
docker pull apache/doris:be-4.0.4
docker pull apache/doris:fe-4.0.4

docker network create --driver bridge --subnet=172.20.80.0/24 doris-network

mkdir -p /data/fe/{doris-meta,conf,log}
mkdir -p /data/be/{storage,conf,log}
chmod -R 777 /data/fe /data/be

docker run -itd \
    --name=doris-fe \
    --env FE_SERVERS="fe1:172.20.80.2:9010" \
    --env FE_ID=1 \
    -p 8030:8030 \
    -p 9030:9030 \
    -p 9010:9010 \
    -v /data/fe/doris-meta:/opt/apache-doris/fe/doris-meta \
    -v /data/fe/conf:/opt/apache-doris/fe/conf \
    -v /data/fe/log:/opt/apache-doris/fe/log \
    --network=doris-network \
    --ip=172.20.80.2 \
    apache/doris:fe-4.0.4

docker run -itd \
    --name=doris-be \
    --env FE_SERVERS="fe1:172.20.80.2:9010" \
    --env BE_ADDR="172.20.80.3:9050" \
    -p 8040:8040 \
    -p 9050:9050 \
    -v /data/be/storage:/opt/apache-doris/be/storage \
    -v /data/be/conf:/opt/apache-doris/be/conf \
    -v /data/be/log:/opt/apache-doris/be/log \
    --network=doris-network \
    --ip=172.20.80.3 \
    apache/doris:be-4.0.4
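
Docker compose

Set DORIS_QUICK_START_VERSION (for example 4.0.4, matching the image tags above) before running this file.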
version: "3"
networks:
  custom_network:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.80.0/24

services:
  fe:
    image: apache/doris:fe-${DORIS_QUICK_START_VERSION}
    hostname: fe
    ports:
      - 8030:8030
      - 9030:9030
      - 9010:9010
    environment:
      - FE_SERVERS=fe1:172.20.80.2:9010
      - FE_ID=1
    networks:
      custom_network:
        ipv4_address: 172.20.80.2

  be:
    image: apache/doris:be-${DORIS_QUICK_START_VERSION}
    hostname: be
    ports:
      - 8040:8040
      - 9050:9050
    environment:
      - FE_SERVERS=fe1:172.20.80.2:9010
      - BE_ADDR=172.20.80.3:9050
    depends_on:
      - fe
    networks:
      custom_network:
        ipv4_address: 172.20.80.3


Documentation

Functions

func BuildInsertSQL

func BuildInsertSQL(table string, columns []string, rowsCount int) (string, error)

BuildInsertSQL builds a parameterized bulk-insert SQL for given table, columns and rowsCount. Returns SQL like: INSERT INTO "table" ("c1","c2") VALUES (?,?),(?,?) ...
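
To make the shape of the generated statement concrete, here is a minimal sketch of how such a builder could work. It is not the package's implementation: buildInsertSQL and quote are hypothetical names, and the validation rules and the doubling of embedded double quotes are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// quote wraps an identifier in double quotes, doubling embedded quotes (assumed escaping).
func quote(id string) string {
	return `"` + strings.ReplaceAll(id, `"`, `""`) + `"`
}

// buildInsertSQL builds INSERT INTO "t" ("c1","c2") VALUES (?,?),(?,?) for rowsCount rows.
func buildInsertSQL(table string, columns []string, rowsCount int) (string, error) {
	if table == "" || len(columns) == 0 || rowsCount <= 0 {
		return "", errors.New("table, columns and rowsCount must be non-empty")
	}
	quoted := make([]string, len(columns))
	for i, c := range columns {
		quoted[i] = quote(c)
	}
	// One "(?,?,...)" placeholder group per row.
	group := "(" + strings.TrimSuffix(strings.Repeat("?,", len(columns)), ",") + ")"
	groups := make([]string, rowsCount)
	for i := range groups {
		groups[i] = group
	}
	return fmt.Sprintf("INSERT INTO %s (%s) VALUES %s",
		quote(table), strings.Join(quoted, ","), strings.Join(groups, ",")), nil
}

func main() {
	sql, err := buildInsertSQL("posts", []string{"id", "name"}, 2)
	fmt.Println(sql, err)
}
```

The caller then passes the row values flattened in row order as the statement arguments, one per placeholder.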

func BuildSelectWithTable

func BuildSelectWithTable(table, column string) string

BuildSelectWithTable builds `table`.`column` style select expression. table and column are quoted safely.

func ExtractColumnsAndRows

func ExtractColumnsAndRows(slice []any) ([]string, [][]any, error)

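ExtractColumnsAndRows extracts columns and rows from a slice of struct/proto. It reads only the db tag (falling back to the field name when the tag is absent) and serializes map-typed fields to JSON strings. It supports db:"col,readonly"; readonly fields can be selected but not inserted or updated.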

func ExtractColumnsAndValues

func ExtractColumnsAndValues(entity any) ([]string, []any, error)

ExtractColumnsAndValues extracts columns and values from a struct entity. It supports db:"col,readonly"; readonly fields can be selected but not inserted or updated. Only non-readonly fields are returned.

func QuoteIdentifier

func QuoteIdentifier(id string) string

QuoteIdentifier safely quotes an identifier for SQL (table or column). It wraps each part separated by '.' with double quotes and escapes any existing '"'.
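
As a rough illustration of the behavior described, the sketch below quotes each dot-separated part on its own. The function name quoteIdentifier is hypothetical, and escaping an embedded double quote by doubling it is an assumption; the package may escape differently.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdentifier quotes every dot-separated part of id with double quotes.
func quoteIdentifier(id string) string {
	parts := strings.Split(id, ".")
	for i, p := range parts {
		// Assumption: an embedded double quote is escaped by doubling it.
		parts[i] = `"` + strings.ReplaceAll(p, `"`, `""`) + `"`
	}
	return strings.Join(parts, ".")
}

func main() {
	fmt.Println(quoteIdentifier("mydb.posts"))
	fmt.Println(quoteIdentifier(`we"ird`))
}
```

Quoting part by part keeps a qualified name such as mydb.posts usable as database plus table, rather than turning it into one identifier that contains a dot.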

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(opts ...Option) (*Client, error)

func (*Client) BatchInsert

func (c *Client) BatchInsert(ctx context.Context, table string, columns []string, rows [][]any) (sql.Result, error)

BatchInsert inserts multiple rows into table. Each row must match columns length.

func (*Client) BatchInsertProto

func (c *Client) BatchInsertProto(ctx context.Context, table string, protoArr []any) (sql.Result, error)

BatchInsertProto inserts a slice of proto.Message into the specified table.

func (*Client) BatchInsertStruct

func (c *Client) BatchInsertStruct(ctx context.Context, table string, structArr []any) (sql.Result, error)

BatchInsertStruct inserts a slice of struct into the specified table.

func (*Client) BeginTx

func (c *Client) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sqlx.Tx, error)

BeginTx starts a new transaction with given options and returns a *sqlx.Tx. Caller should Commit or Rollback the returned transaction.

func (*Client) BeginTxWithSession

func (c *Client) BeginTxWithSession(ctx context.Context, vars map[string]string, opts *sql.TxOptions) (*TxWithConn, error)

BeginTxWithSession sets given session variables on a dedicated connection and begins a transaction on that same connection, returning a TxWithConn which must be Commit()ed or Rollback()ed.

func (*Client) Close

func (c *Client) Close() error

Close closes underlying DB if present

func (*Client) DB

func (c *Client) DB() *sqlx.DB

DB returns the underlying *sqlx.DB

func (*Client) Exec

func (c *Client) Exec(query string, args ...any) (sql.Result, error)

Exec executes a query

func (*Client) ExecContext

func (c *Client) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

ExecContext executes a query with context

func (*Client) Get

func (c *Client) Get(dest any, query string, args ...any) error

Get fetches one row into dest

func (*Client) GetContext

func (c *Client) GetContext(ctx context.Context, dest any, query string, args ...any) error

GetContext fetches one row into dest with context

func (*Client) Insert

func (c *Client) Insert(ctx context.Context, table string, entity any) error

Insert inserts a single struct entity into the specified table.

func (*Client) Query

func (c *Client) Query(ctx context.Context, creator func() any, results *[]any, query string, args ...any) error

Query executes a query and returns the results.

func (*Client) RunInTx

func (c *Client) RunInTx(ctx context.Context, opts *sql.TxOptions, fn func(tx *sqlx.Tx) error) error

RunInTx is an alias for WithTx

func (*Client) RunWithSession

func (c *Client) RunWithSession(ctx context.Context, vars map[string]string, fn func(ctx context.Context, conn *sql.Conn) error) error

RunWithSession sets given session variables on a single connection and runs fn on that connection. This guarantees the session variables are visible to operations inside fn.

func (*Client) Select

func (c *Client) Select(dest any, query string, args ...any) error

Select fetches multiple rows into dest

func (*Client) SelectContext

func (c *Client) SelectContext(ctx context.Context, dest any, query string, args ...any) error

SelectContext fetches multiple rows into dest with context

func (*Client) SetSQLMode

func (c *Client) SetSQLMode(ctx context.Context, mode string) error

SetSQLMode sets the session sql_mode. Warning: this affects only the connection that executes the statement. Use WithSessionConn to ensure subsequent operations run on the same physical connection.

func (*Client) SetSession

func (c *Client) SetSession(ctx context.Context, stmt string, args ...any) error

SetSession executes a session-level SET statement. Note: when executed on the DB pool, the statement runs on a single connection from the pool; subsequent queries may use different connections and therefore may not see the session variable. For a guaranteed per-connection session, use WithSessionConn.

func (*Client) SetSessionVars

func (c *Client) SetSessionVars(ctx context.Context, vars map[string]string) error

SetSessionVars sets multiple session variables on the server. Each value is formatted safely: numeric values (with optional unit K/M/G) are used as-is; others are quoted. Note: when executed via the connection pool, SET statements may apply to different connections; use RunWithSession to guarantee that subsequent operations run on the same connection.

func (*Client) StreamLoad

func (c *Client) StreamLoad(ctx context.Context, db, table string, params map[string]string, data io.Reader) ([]byte, int, error)

StreamLoad uploads data to Doris using the FE Stream Load API.

  • db: target database
  • table: target table
  • params: additional query params or stream load properties (e.g. columns, format)
  • data: io.Reader containing the payload (CSV/TSV/JSON as required)

Returns the response body and the status code.

func (*Client) WithSessionConn

func (c *Client) WithSessionConn(ctx context.Context, sessionStmts []string, fn func(ctx context.Context, conn *sql.Conn) error) error

WithSessionConn acquires a dedicated underlying *sql.Conn, executes the provided session statements (e.g. SET SESSION ...) on that connection, then runs fn using the same connection. This guarantees the session variables apply for the duration of fn. The provided fn receives the *sql.Conn and should perform operations using database/sql APIs (QueryContext/ExecContext) on that conn. The conn is closed after fn returns.

func (*Client) WithTx

func (c *Client) WithTx(ctx context.Context, opts *sql.TxOptions, fn func(tx *sqlx.Tx) error) error

WithTx runs fn inside a transaction. It commits if fn returns nil and rolls back otherwise. It also rolls back on panic and re-panics after the rollback.

func (*Client) WithTxWithSession

func (c *Client) WithTxWithSession(ctx context.Context, vars map[string]string, opts *sql.TxOptions, fn func(tx *sql.Tx) error) error

WithTxWithSession is a convenience wrapper to run a function inside a transaction that has session variables applied on the same underlying connection.

type Option

type Option func(o *Client)

func WithConnMaxLifetime

func WithConnMaxLifetime(d time.Duration) Option

WithConnMaxLifetime sets maximum connection lifetime.

func WithDB

func WithDB(db *sqlx.DB) Option

WithDB injects an existing *sqlx.DB into the client (skips Open).

func WithDSN

func WithDSN(dsn string) Option

WithDSN sets the data source name used to open the DB connection.

func WithHTTPClient

func WithHTTPClient(hc *http.Client) Option

WithHTTPClient injects an existing http.Client to use for stream load requests

func WithLogger

func WithLogger(l *log.Helper) Option

WithLogger attaches a kratos log helper to the client.

func WithMaxIdleConns

func WithMaxIdleConns(n int) Option

WithMaxIdleConns sets maximum idle connections.

func WithMaxOpenConns

func WithMaxOpenConns(n int) Option

WithMaxOpenConns sets maximum open connections.

func WithStreamLoadAuth

func WithStreamLoadAuth(username, password string) Option

WithStreamLoadAuth sets basic auth credentials for stream load (username, password)

func WithStreamLoadEndpoint

func WithStreamLoadEndpoint(endpoint string) Option

WithStreamLoadEndpoint sets the FE host (including scheme and port) for stream load, e.g. http://fe-host:8030

func WithStreamLoadMethod

func WithStreamLoadMethod(method string) Option

WithStreamLoadMethod sets HTTP method for stream load ("POST" or "PUT")

func WithStreamLoadTimeout

func WithStreamLoadTimeout(d time.Duration) Option

WithStreamLoadTimeout sets timeout for stream load HTTP requests

type PagingResult

type PagingResult[E any] struct {
	Items []*E   `json:"items"`
	Total uint64 `json:"total"`
}

PagingResult is a generic paging result structure containing the items and total fields.

type Repository

type Repository[DTO any, ENTITY any] struct {
	// contains filtered or unexported fields
}

Repository is a generic repository built on the Doris Client that bundles common CRUD methods.

func NewRepository

func NewRepository[DTO any, ENTITY any](
	client *Client,
	mapper *mapper.CopierMapper[DTO, ENTITY],
	table string,
	log *log.Helper,
) *Repository[DTO, ENTITY]

func (*Repository[DTO, ENTITY]) BatchCreate

func (r *Repository[DTO, ENTITY]) BatchCreate(ctx context.Context, dtos []*DTO, viewMask *fieldmaskpb.FieldMask) ([]*DTO, error)

BatchCreate creates records in batch and returns the list of created DTOs.

func (*Repository[DTO, ENTITY]) BatchInsert

func (r *Repository[DTO, ENTITY]) BatchInsert(ctx context.Context, data any) error

BatchInsert inserts in batch; it accepts a slice of map[string]any or a slice of structs.

func (*Repository[DTO, ENTITY]) BatchInsertStruct

func (r *Repository[DTO, ENTITY]) BatchInsertStruct(ctx context.Context, data interface{}) error

BatchInsertStruct inserts in batch; it accepts a struct slice.

func (*Repository[DTO, ENTITY]) Count

func (r *Repository[DTO, ENTITY]) Count(ctx context.Context, baseWhere string, whereArgs ...any) (uint64, error)

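Count uses the Doris client to count the records that match baseWhere. baseWhere may include the "WHERE ..." prefix or contain only the condition expression (the function adds the prefix automatically). Example call: total, err := q.Count(ctx, "id = ?", id). When a single slice argument is passed, it is expanded automatically: q.Count(ctx, "id IN (?)", []int{1,2,3}).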

func (*Repository[DTO, ENTITY]) Create

func (r *Repository[DTO, ENTITY]) Create(ctx context.Context, dto *DTO, viewMask *fieldmaskpb.FieldMask) (*DTO, error)

Create creates one record in the database and returns the created DTO.

func (*Repository[DTO, ENTITY]) CreateX

func (r *Repository[DTO, ENTITY]) CreateX(ctx context.Context, dto *DTO, viewMask *fieldmaskpb.FieldMask) (int64, error)

CreateX creates a record, with viewMask selecting the fields to insert, and returns the number of affected rows.

func (*Repository[DTO, ENTITY]) Delete

func (r *Repository[DTO, ENTITY]) Delete(ctx context.Context, qb *query.Builder, notSoftDelete bool) (int64, error)

Delete deletes the records matched by the given query.Builder (which may carry Where conditions); both soft and hard deletion are supported.

func (*Repository[DTO, ENTITY]) Exists

func (r *Repository[DTO, ENTITY]) Exists(ctx context.Context, baseWhere string, whereArgs ...any) (bool, error)

Exists reports whether any record matches the given condition (baseWhere and whereArgs).

func (*Repository[DTO, ENTITY]) Get

func (r *Repository[DTO, ENTITY]) Get(ctx context.Context, qb *query.Builder, viewMask *fieldmaskpb.FieldMask) (*DTO, error)

Get fetches a single record that matches the query conditions.

func (*Repository[DTO, ENTITY]) Insert

func (r *Repository[DTO, ENTITY]) Insert(ctx context.Context, data any) error

Insert inserts a single record; it accepts a map[string]any or a struct.

func (*Repository[DTO, ENTITY]) ListWithPagination

func (r *Repository[DTO, ENTITY]) ListWithPagination(ctx context.Context, req *paginationV1.PaginationRequest) (*PagingResult[DTO], error)

ListWithPagination queries a list using a PaginationRequest.

func (*Repository[DTO, ENTITY]) ListWithPaging

func (r *Repository[DTO, ENTITY]) ListWithPaging(ctx context.Context, req *paginationV1.PagingRequest) (*PagingResult[DTO], error)

ListWithPaging queries a list using a PagingRequest.

func (*Repository[DTO, ENTITY]) Only

func (r *Repository[DTO, ENTITY]) Only(ctx context.Context, qb *query.Builder, viewMask *fieldmaskpb.FieldMask) (*DTO, error)

Only is an alias (its signature is identical to that of Get).

func (*Repository[DTO, ENTITY]) SoftDelete

func (r *Repository[DTO, ENTITY]) SoftDelete(ctx context.Context, qb *query.Builder) (int64, error)

SoftDelete soft-deletes the records that match the where scopes of the query, which are applied in order.

func (*Repository[DTO, ENTITY]) Update

func (r *Repository[DTO, ENTITY]) Update(ctx context.Context, dto *DTO, updateMask *fieldmaskpb.FieldMask) (*DTO, error)

Update updates a record; updateMask selects the fields to update.

func (*Repository[DTO, ENTITY]) UpdateX

func (r *Repository[DTO, ENTITY]) UpdateX(ctx context.Context, dto *DTO, updateMask *fieldmaskpb.FieldMask) (int64, error)

UpdateX updates a record, with updateMask selecting the fields to update, and returns the number of affected rows.

func (*Repository[DTO, ENTITY]) Upsert

func (r *Repository[DTO, ENTITY]) Upsert(ctx context.Context, dto *DTO, updateMask *fieldmaskpb.FieldMask) (*DTO, error)

Upsert inserts a record or, on conflict, updates it; updateMask selects the fields updated on conflict.

type TxWithConn

type TxWithConn struct {
	Tx *sql.Tx
	// contains filtered or unexported fields
}

TxWithConn wraps a *sql.Tx along with the underlying *sql.Conn so that the connection is closed when Commit or Rollback is called.

func (*TxWithConn) Commit

func (t *TxWithConn) Commit() error

Commit commits the transaction and closes the underlying connection.

func (*TxWithConn) Rollback

func (t *TxWithConn) Rollback() error

Rollback rolls back the transaction and closes the underlying connection.
