Documentation
¶
Index ¶
- func BuildInsertSQL(table string, columns []string, rowsCount int) (string, error)
- func BuildSelectWithTable(table, column string) string
- func ExtractColumnsAndRows(slice []any) ([]string, [][]any, error)
- func ExtractColumnsAndValues(entity any) ([]string, []any, error)
- func QuoteIdentifier(id string) string
- type Client
- func (c *Client) BatchInsert(ctx context.Context, table string, columns []string, rows [][]any) (sql.Result, error)
- func (c *Client) BatchInsertProto(ctx context.Context, table string, protoArr []any) (sql.Result, error)
- func (c *Client) BatchInsertStruct(ctx context.Context, table string, structArr []any) (sql.Result, error)
- func (c *Client) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sqlx.Tx, error)
- func (c *Client) BeginTxWithSession(ctx context.Context, vars map[string]string, opts *sql.TxOptions) (*TxWithConn, error)
- func (c *Client) Close() error
- func (c *Client) DB() *sqlx.DB
- func (c *Client) Exec(query string, args ...any) (sql.Result, error)
- func (c *Client) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (c *Client) Get(dest any, query string, args ...any) error
- func (c *Client) GetContext(ctx context.Context, dest any, query string, args ...any) error
- func (c *Client) Insert(ctx context.Context, table string, entity any) error
- func (c *Client) Query(ctx context.Context, creator func() any, results *[]any, query string, ...) error
- func (c *Client) RunInTx(ctx context.Context, opts *sql.TxOptions, fn func(tx *sqlx.Tx) error) error
- func (c *Client) RunWithSession(ctx context.Context, vars map[string]string, ...) error
- func (c *Client) Select(dest any, query string, args ...any) error
- func (c *Client) SelectContext(ctx context.Context, dest any, query string, args ...any) error
- func (c *Client) SetSQLMode(ctx context.Context, mode string) error
- func (c *Client) SetSession(ctx context.Context, stmt string, args ...any) error
- func (c *Client) SetSessionVars(ctx context.Context, vars map[string]string) error
- func (c *Client) StreamLoad(ctx context.Context, db, table string, params map[string]string, ...) ([]byte, int, error)
- func (c *Client) WithSessionConn(ctx context.Context, sessionStmts []string, ...) error
- func (c *Client) WithTx(ctx context.Context, opts *sql.TxOptions, fn func(tx *sqlx.Tx) error) error
- func (c *Client) WithTxWithSession(ctx context.Context, vars map[string]string, opts *sql.TxOptions, ...) error
- type Option
- func WithConnMaxLifetime(d time.Duration) Option
- func WithDB(db *sqlx.DB) Option
- func WithDSN(dsn string) Option
- func WithHTTPClient(hc *http.Client) Option
- func WithLogger(l *log.Helper) Option
- func WithMaxIdleConns(n int) Option
- func WithMaxOpenConns(n int) Option
- func WithStreamLoadAuth(username, password string) Option
- func WithStreamLoadEndpoint(endpoint string) Option
- func WithStreamLoadMethod(method string) Option
- func WithStreamLoadTimeout(d time.Duration) Option
- type PagingResult
- type Repository
- func (r *Repository[DTO, ENTITY]) BatchCreate(ctx context.Context, dtos []*DTO, viewMask *fieldmaskpb.FieldMask) ([]*DTO, error)
- func (r *Repository[DTO, ENTITY]) BatchInsert(ctx context.Context, data any) error
- func (r *Repository[DTO, ENTITY]) BatchInsertStruct(ctx context.Context, data interface{}) error
- func (r *Repository[DTO, ENTITY]) Count(ctx context.Context, baseWhere string, whereArgs ...any) (uint64, error)
- func (r *Repository[DTO, ENTITY]) Create(ctx context.Context, dto *DTO, viewMask *fieldmaskpb.FieldMask) (*DTO, error)
- func (r *Repository[DTO, ENTITY]) CreateX(ctx context.Context, dto *DTO, viewMask *fieldmaskpb.FieldMask) (int64, error)
- func (r *Repository[DTO, ENTITY]) Delete(ctx context.Context, qb *query.Builder, notSoftDelete bool) (int64, error)
- func (r *Repository[DTO, ENTITY]) Exists(ctx context.Context, baseWhere string, whereArgs ...any) (bool, error)
- func (r *Repository[DTO, ENTITY]) Get(ctx context.Context, qb *query.Builder, viewMask *fieldmaskpb.FieldMask) (*DTO, error)
- func (r *Repository[DTO, ENTITY]) Insert(ctx context.Context, data any) error
- func (r *Repository[DTO, ENTITY]) ListWithPagination(ctx context.Context, req *paginationV1.PaginationRequest) (*PagingResult[DTO], error)
- func (r *Repository[DTO, ENTITY]) ListWithPaging(ctx context.Context, req *paginationV1.PagingRequest) (*PagingResult[DTO], error)
- func (r *Repository[DTO, ENTITY]) Only(ctx context.Context, qb *query.Builder, viewMask *fieldmaskpb.FieldMask) (*DTO, error)
- func (r *Repository[DTO, ENTITY]) SoftDelete(ctx context.Context, qb *query.Builder) (int64, error)
- func (r *Repository[DTO, ENTITY]) Update(ctx context.Context, dto *DTO, updateMask *fieldmaskpb.FieldMask) (*DTO, error)
- func (r *Repository[DTO, ENTITY]) UpdateX(ctx context.Context, dto *DTO, updateMask *fieldmaskpb.FieldMask) (int64, error)
- func (r *Repository[DTO, ENTITY]) Upsert(ctx context.Context, dto *DTO, updateMask *fieldmaskpb.FieldMask) (*DTO, error)
- type TxWithConn
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildInsertSQL ¶
BuildInsertSQL builds a parameterized bulk-insert SQL for given table, columns and rowsCount. Returns SQL like: INSERT INTO "table" ("c1","c2") VALUES (?,?),(?,?) ...
func BuildSelectWithTable ¶
BuildSelectWithTable builds `table`.`column` style select expression. table and column are quoted safely.
func ExtractColumnsAndRows ¶
ExtractColumnsAndRows extracts columns and rows from a slice of struct/proto. 只解析 db tag(如无则用字段名),并对 map 类型字段序列化为 json 字符串。 支持 db:"col,readonly",readonly 字段只读(可 select,不可 insert/update)。
func ExtractColumnsAndValues ¶
ExtractColumnsAndValues extracts columns and values from a struct entity. 支持 db:"col,readonly",readonly 字段只读(可 select,不可 insert/update)。 只返回非 readonly 字段。
func QuoteIdentifier ¶
QuoteIdentifier safely quotes an identifier for SQL (table or column). It wraps each part separated by '.' with double quotes and escapes any existing '"'.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) BatchInsert ¶
func (c *Client) BatchInsert(ctx context.Context, table string, columns []string, rows [][]any) (sql.Result, error)
BatchInsert inserts multiple rows into table. Each row must match columns length.
func (*Client) BatchInsertProto ¶
func (c *Client) BatchInsertProto(ctx context.Context, table string, protoArr []any) (sql.Result, error)
BatchInsertProto inserts a slice of proto.Message into the specified table.
func (*Client) BatchInsertStruct ¶
func (c *Client) BatchInsertStruct(ctx context.Context, table string, structArr []any) (sql.Result, error)
BatchInsertStruct inserts a slice of struct into the specified table.
func (*Client) BeginTx ¶
BeginTx starts a new transaction with given options and returns a *sqlx.Tx. Caller should Commit or Rollback the returned transaction.
func (*Client) BeginTxWithSession ¶
func (c *Client) BeginTxWithSession(ctx context.Context, vars map[string]string, opts *sql.TxOptions) (*TxWithConn, error)
BeginTxWithSession sets given session variables on a dedicated connection and begins a transaction on that same connection, returning a TxWithConn which must be Commit()ed or Rollback()ed.
func (*Client) ExecContext ¶
ExecContext executes a query with context
func (*Client) GetContext ¶
GetContext fetches one row into dest with context
func (*Client) Query ¶
func (c *Client) Query(ctx context.Context, creator func() any, results *[]any, query string, args ...any) error
Query 执行查询并返回结果
func (*Client) RunInTx ¶
func (c *Client) RunInTx(ctx context.Context, opts *sql.TxOptions, fn func(tx *sqlx.Tx) error) error
RunInTx is an alias for WithTx
func (*Client) RunWithSession ¶
func (c *Client) RunWithSession(ctx context.Context, vars map[string]string, fn func(ctx context.Context, conn *sql.Conn) error) error
RunWithSession sets given session variables on a single connection and runs fn on that connection. This guarantees the session variables are visible to operations inside fn.
func (*Client) SelectContext ¶
SelectContext fetches multiple rows into dest with context
func (*Client) SetSQLMode ¶
SetSQLMode sets the session sql_mode. Warning: this affects only the connection that executes the statement. Use WithSessionConn to ensure subsequent operations run on the same physical connection.
func (*Client) SetSession ¶
SetSession executes a session-level SET statement. Note: when executed on the DB pool, the statement runs on a single connection from the pool; subsequent queries may use different connections and therefore may not see the session variable. For a guaranteed per-connection session, use WithSessionConn.
func (*Client) SetSessionVars ¶
SetSessionVars sets multiple session variables on the server. Each value is formatted safely: numeric values (with optional unit K/M/G) are used as-is; others are quoted. Note: when executed via the connection pool, SET statements may apply to different connections; use RunWithSession to guarantee that subsequent operations run on the same connection.
func (*Client) StreamLoad ¶
func (c *Client) StreamLoad(ctx context.Context, db, table string, params map[string]string, data io.Reader) ([]byte, int, error)
StreamLoad uploads data to Doris using the Fe Stream Load API. - db: target database - table: target table - params: additional query params or stream load properties (e.g. columns, format) - data: io.Reader containing the payload (CSV/TSV/JSON as required) Returns the response body and the status code.
func (*Client) WithSessionConn ¶
func (c *Client) WithSessionConn(ctx context.Context, sessionStmts []string, fn func(ctx context.Context, conn *sql.Conn) error) error
WithSessionConn acquires a dedicated underlying *sql.Conn, executes the provided session statements (e.g. SET SESSION ...) on that connection, then runs fn using the same connection. This guarantees the session variables apply for the duration of fn. The provided fn receives the *sql.Conn and should perform operations using database/sql APIs (QueryContext/ExecContext) on that conn. The conn is closed after fn returns.
func (*Client) WithTx ¶
WithTx runs fn inside a transaction. It will commit if fn returns nil, otherwise rollback. It also rollbacks on panic and re-panics after rollback.
func (*Client) WithTxWithSession ¶
func (c *Client) WithTxWithSession(ctx context.Context, vars map[string]string, opts *sql.TxOptions, fn func(tx *sql.Tx) error) error
WithTxWithSession is a convenience wrapper to run a function inside a transaction that has session variables applied on the same underlying connection.
type Option ¶
type Option func(o *Client)
func WithConnMaxLifetime ¶
WithConnMaxLifetime sets maximum connection lifetime.
func WithHTTPClient ¶
WithHTTPClient injects an existing http.Client to use for stream load requests
func WithLogger ¶
WithLogger attaches a kratos log helper to the client.
func WithMaxIdleConns ¶
WithMaxIdleConns sets maximum idle connections.
func WithMaxOpenConns ¶
WithMaxOpenConns sets maximum open connections.
func WithStreamLoadAuth ¶
WithStreamLoadAuth sets basic auth credentials for stream load (username, password)
func WithStreamLoadEndpoint ¶
WithStreamLoadEndpoint sets the FE host (including scheme and port) for stream load, e.g. http://fe-host:8030
func WithStreamLoadMethod ¶
WithStreamLoadMethod sets HTTP method for stream load ("POST" or "PUT")
func WithStreamLoadTimeout ¶
WithStreamLoadTimeout sets timeout for stream load HTTP requests
type PagingResult ¶
PagingResult 是通用的分页返回结构,包含 items 和 total 字段
type Repository ¶
Repository GORM 仓库,包含常用的 CRUD 方法
func NewRepository ¶
func NewRepository[DTO any, ENTITY any]( client *Client, mapper *mapper.CopierMapper[DTO, ENTITY], table string, log *log.Helper, ) *Repository[DTO, ENTITY]
func (*Repository[DTO, ENTITY]) BatchCreate ¶
func (r *Repository[DTO, ENTITY]) BatchCreate(ctx context.Context, dtos []*DTO, viewMask *fieldmaskpb.FieldMask) ([]*DTO, error)
BatchCreate 批量创建记录,返回创建后的 DTO 列表
func (*Repository[DTO, ENTITY]) BatchInsert ¶
func (r *Repository[DTO, ENTITY]) BatchInsert(ctx context.Context, data any) error
BatchInsert 支持 map[string]any 数组或 struct 数组,批量插入
func (*Repository[DTO, ENTITY]) BatchInsertStruct ¶
func (r *Repository[DTO, ENTITY]) BatchInsertStruct(ctx context.Context, data interface{}) error
BatchInsertStruct 支持 struct slice 入参,批量插入
func (*Repository[DTO, ENTITY]) Count ¶
func (r *Repository[DTO, ENTITY]) Count(ctx context.Context, baseWhere string, whereArgs ...any) (uint64, error)
Count 使用 Doris client 计算符合 baseWhere 的记录数 baseWhere: 可以包含 "WHERE ..." 前缀或只写条件表达式(函数会自动拼接) 示例调用: total, err := q.Count(ctx, "id = ?", id) 支持当只传入一个切片参数时自动展开: q.Count(ctx, "id IN (?)", []int{1,2,3})
func (*Repository[DTO, ENTITY]) Create ¶
func (r *Repository[DTO, ENTITY]) Create(ctx context.Context, dto *DTO, viewMask *fieldmaskpb.FieldMask) (*DTO, error)
Create 在数据库中创建一条记录,返回创建后的 DTO
func (*Repository[DTO, ENTITY]) CreateX ¶
func (r *Repository[DTO, ENTITY]) CreateX(ctx context.Context, dto *DTO, viewMask *fieldmaskpb.FieldMask) (int64, error)
CreateX 使用传入的 db 创建记录,支持 viewMask 指定插入字段,返回受影响行数
func (*Repository[DTO, ENTITY]) Delete ¶
func (r *Repository[DTO, ENTITY]) Delete(ctx context.Context, qb *query.Builder, notSoftDelete bool) (int64, error)
Delete 使用传入的 query.Builder(可包含 Where)删除记录,支持软/硬删除
func (*Repository[DTO, ENTITY]) Exists ¶
func (r *Repository[DTO, ENTITY]) Exists(ctx context.Context, baseWhere string, whereArgs ...any) (bool, error)
Exists 使用传入的 db(可包含 Where)检查是否存在记录
func (*Repository[DTO, ENTITY]) Get ¶
func (r *Repository[DTO, ENTITY]) Get(ctx context.Context, qb *query.Builder, viewMask *fieldmaskpb.FieldMask) (*DTO, error)
Get 根据查询条件获取单条记录
func (*Repository[DTO, ENTITY]) Insert ¶
func (r *Repository[DTO, ENTITY]) Insert(ctx context.Context, data any) error
Insert 支持 map[string]any 或 struct 入参,插入单条数据
func (*Repository[DTO, ENTITY]) ListWithPagination ¶
func (r *Repository[DTO, ENTITY]) ListWithPagination(ctx context.Context, req *paginationV1.PaginationRequest) (*PagingResult[DTO], error)
ListWithPagination 使用 PaginationRequest 查询列表
func (*Repository[DTO, ENTITY]) ListWithPaging ¶
func (r *Repository[DTO, ENTITY]) ListWithPaging(ctx context.Context, req *paginationV1.PagingRequest) (*PagingResult[DTO], error)
ListWithPaging 使用 PagingRequest 查询列表
func (*Repository[DTO, ENTITY]) Only ¶
func (r *Repository[DTO, ENTITY]) Only(ctx context.Context, qb *query.Builder, viewMask *fieldmaskpb.FieldMask) (*DTO, error)
Only alias
func (*Repository[DTO, ENTITY]) SoftDelete ¶
SoftDelete 对符合 whereSelectors 的记录执行软删除 whereSelectors: 应用到查询的 where scopes(按顺序)
func (*Repository[DTO, ENTITY]) Update ¶
func (r *Repository[DTO, ENTITY]) Update(ctx context.Context, dto *DTO, updateMask *fieldmaskpb.FieldMask) (*DTO, error)
Update 使用传入的 db(可包含 Where)更新记录,支持 updateMask 指定更新字段
func (*Repository[DTO, ENTITY]) UpdateX ¶
func (r *Repository[DTO, ENTITY]) UpdateX(ctx context.Context, dto *DTO, updateMask *fieldmaskpb.FieldMask) (int64, error)
UpdateX 使用传入的 db(可包含 Where)更新记录,支持 updateMask 指定更新字段,返回受影响行数
func (*Repository[DTO, ENTITY]) Upsert ¶
func (r *Repository[DTO, ENTITY]) Upsert(ctx context.Context, dto *DTO, updateMask *fieldmaskpb.FieldMask) (*DTO, error)
Upsert 使用传入的 db(可包含 Where/其他 scope)执行插入或冲突更新,支持 updateMask 指定冲突时更新的字段
type TxWithConn ¶
TxWithConn wraps sqlx.Tx along with the underlying *sql.Conn so we can ensure the connection is closed when Commit or Rollback is called.
func (*TxWithConn) Commit ¶
func (t *TxWithConn) Commit() error
Commit commits the transaction and closes the underlying connection.
func (*TxWithConn) Rollback ¶
func (t *TxWithConn) Rollback() error
Rollback rollbacks the transaction and closes the underlying connection.