clickhouse

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2026 License: MIT Imports: 27 Imported by: 0

README

ClickHouse

Docker部署

docker pull bitnami/clickhouse:latest

docker run -itd \
    --name clickhouse-server \
    --network=app-tier \
    -p 8123:8123 \
    -p 9000:9000 \
    -p 9004:9004 \
    -e ALLOW_EMPTY_PASSWORD=no \
    -e CLICKHOUSE_ADMIN_USER=default \
    -e CLICKHOUSE_ADMIN_PASSWORD=123456 \
    bitnami/clickhouse:latest

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level error values exposed by this package. Each one is built from
// a reason code and a human-readable message. All of them use
// errors.InternalServer except ErrInvalidArgument, which uses
// errors.BadRequest.
// NOTE(review): the errors package here is not the standard library one
// (it provides InternalServer/BadRequest) — presumably Kratos; confirm.
var (
	// ErrInvalidColumnName is returned when an invalid column name is used.
	ErrInvalidColumnName = errors.InternalServer("INVALID_COLUMN_NAME", "invalid column name")

	// ErrInvalidTableName is returned when an invalid table name is used.
	ErrInvalidTableName = errors.InternalServer("INVALID_TABLE_NAME", "invalid table name")

	// ErrInvalidCondition is returned when an invalid condition is used in a query.
	ErrInvalidCondition = errors.InternalServer("INVALID_CONDITION", "invalid condition in query")

	// ErrQueryExecutionFailed is returned when a query execution fails.
	ErrQueryExecutionFailed = errors.InternalServer("QUERY_EXECUTION_FAILED", "query execution failed")

	// ErrExecutionFailed is returned when a general execution fails.
	ErrExecutionFailed = errors.InternalServer("EXECUTION_FAILED", "execution failed")

	// ErrAsyncInsertFailed is returned when an asynchronous insert operation fails.
	ErrAsyncInsertFailed = errors.InternalServer("ASYNC_INSERT_FAILED", "async insert operation failed")

	// ErrRowScanFailed is returned when scanning rows from a query result fails.
	ErrRowScanFailed = errors.InternalServer("ROW_SCAN_FAILED", "row scan failed")

	// ErrRowsIterationError is returned when there is an error iterating over rows.
	ErrRowsIterationError = errors.InternalServer("ROWS_ITERATION_ERROR", "rows iteration error")

	// ErrRowNotFound is returned when a specific row is not found in the result set.
	ErrRowNotFound = errors.InternalServer("ROW_NOT_FOUND", "row not found")

	// ErrConnectionFailed is returned when the connection to ClickHouse fails.
	ErrConnectionFailed = errors.InternalServer("CONNECTION_FAILED", "failed to connect to ClickHouse")

	// ErrDatabaseNotFound is returned when the specified database is not found.
	ErrDatabaseNotFound = errors.InternalServer("DATABASE_NOT_FOUND", "specified database not found")

	// ErrTableNotFound is returned when the specified table is not found.
	ErrTableNotFound = errors.InternalServer("TABLE_NOT_FOUND", "specified table not found")

	// ErrInsertFailed is returned when an insert operation fails.
	ErrInsertFailed = errors.InternalServer("INSERT_FAILED", "insert operation failed")

	// ErrUpdateFailed is returned when an update operation fails.
	ErrUpdateFailed = errors.InternalServer("UPDATE_FAILED", "update operation failed")

	// ErrDeleteFailed is returned when a delete operation fails.
	ErrDeleteFailed = errors.InternalServer("DELETE_FAILED", "delete operation failed")

	// ErrTransactionFailed is returned when a transaction fails.
	ErrTransactionFailed = errors.InternalServer("TRANSACTION_FAILED", "transaction failed")

	// ErrClientNotInitialized is returned when the ClickHouse client is not initialized.
	ErrClientNotInitialized = errors.InternalServer("CLIENT_NOT_INITIALIZED", "clickhouse client not initialized")

	// ErrGetServerVersionFailed is returned when getting the server version fails.
	ErrGetServerVersionFailed = errors.InternalServer("GET_SERVER_VERSION_FAILED", "failed to get server version")

	// ErrPingFailed is returned when a ping to the ClickHouse server fails.
	ErrPingFailed = errors.InternalServer("PING_FAILED", "ping to ClickHouse server failed")

	// ErrCreatorFunctionNil is returned when the creator function is nil.
	ErrCreatorFunctionNil = errors.InternalServer("CREATOR_FUNCTION_NIL", "creator function cannot be nil")

	// ErrBatchPrepareFailed is returned when a batch prepare operation fails.
	ErrBatchPrepareFailed = errors.InternalServer("BATCH_PREPARE_FAILED", "batch prepare operation failed")

	// ErrBatchSendFailed is returned when a batch send operation fails.
	ErrBatchSendFailed = errors.InternalServer("BATCH_SEND_FAILED", "batch send operation failed")

	// ErrBatchAppendFailed is returned when appending to a batch fails.
	ErrBatchAppendFailed = errors.InternalServer("BATCH_APPEND_FAILED", "batch append operation failed")

	// ErrBatchInsertFailed is returned when a batch insert operation fails.
	ErrBatchInsertFailed = errors.InternalServer("BATCH_INSERT_FAILED", "batch insert operation failed")

	// ErrInvalidDSN is returned when the data source name (DSN) is invalid.
	ErrInvalidDSN = errors.InternalServer("INVALID_DSN", "invalid data source name")

	// ErrInvalidProxyURL is returned when the proxy URL is invalid.
	ErrInvalidProxyURL = errors.InternalServer("INVALID_PROXY_URL", "invalid proxy URL")

	// ErrPrepareInsertDataFailed is returned when preparing insert data fails.
	ErrPrepareInsertDataFailed = errors.InternalServer("PREPARE_INSERT_DATA_FAILED", "failed to prepare insert data")

	// ErrInvalidColumnData is returned when the column data type is invalid.
	ErrInvalidColumnData = errors.InternalServer("INVALID_COLUMN_DATA", "invalid column data type")

	// ErrInvalidArgument reports that an invalid argument was provided. It is
	// the only error in this block built with errors.BadRequest instead of
	// errors.InternalServer.
	ErrInvalidArgument = errors.BadRequest("INVALID_ARGUMENT", "invalid argument provided")
)

Functions

This section is empty.

Types

type BatchInserter

type BatchInserter struct {
	// contains filtered or unexported fields
}

BatchInserter 批量插入器

func NewBatchInserter

func NewBatchInserter(
	ctx context.Context,
	conn clickhouseV2.Conn,
	tableName string,
	batchSize int,
	columns []string,
) (*BatchInserter, error)

NewBatchInserter 创建新的批量插入器

func (*BatchInserter) Add

func (bi *BatchInserter) Add(row interface{}) error

Add 添加数据行

func (*BatchInserter) Close

func (bi *BatchInserter) Close() error

Close 关闭插入器并提交剩余数据

func (*BatchInserter) Flush

func (bi *BatchInserter) Flush() error

Flush 强制提交当前批次

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(opts ...Option) (*Client, error)

func (*Client) AsyncInsert

func (c *Client) AsyncInsert(ctx context.Context, tableName string, data any, wait bool) error

AsyncInsert 异步插入数据

func (*Client) AsyncInsertMany

func (c *Client) AsyncInsertMany(ctx context.Context, tableName string, data []any, wait bool) error

AsyncInsertMany 批量异步插入数据

func (*Client) BatchInsert

func (c *Client) BatchInsert(ctx context.Context, tableName string, data []any) error

BatchInsert 批量插入数据

func (*Client) BatchStructs

func (c *Client) BatchStructs(ctx context.Context, query string, data []any) error

BatchStructs 批量插入结构体数据

func (*Client) CheckConnection

func (c *Client) CheckConnection(ctx context.Context) error

CheckConnection 检查ClickHouse客户端连接是否正常

func (*Client) Close

func (c *Client) Close()

Close 关闭ClickHouse客户端连接

func (*Client) Exec

func (c *Client) Exec(ctx context.Context, query string, args ...any) error

Exec 执行非查询语句

func (*Client) GetServerVersion

func (c *Client) GetServerVersion() string

GetServerVersion 获取ClickHouse服务器版本

func (*Client) Insert

func (c *Client) Insert(ctx context.Context, tableName string, in any) error

Insert 插入数据到指定表

func (*Client) InsertMany

func (c *Client) InsertMany(ctx context.Context, tableName string, data []any) error

func (*Client) Query

func (c *Client) Query(ctx context.Context, creator Creator, results *[]any, query string, args ...any) error

Query 执行查询并返回结果

func (*Client) QueryRow

func (c *Client) QueryRow(ctx context.Context, dest any, query string, args ...any) error

QueryRow 执行查询并返回单行结果

func (*Client) Select

func (c *Client) Select(ctx context.Context, dest any, query string, args ...any) error

Select 封装 SELECT 子句

type Creator

type Creator func() any

type Option

type Option func(o *Client)

func WithAddresses

func WithAddresses(addresses ...string) Option

func WithBlockBufferSize

func WithBlockBufferSize(blockBufferSize uint8) Option

func WithCompressionLevel

func WithCompressionLevel(compressionLevel int) Option

func WithCompressionMethod

func WithCompressionMethod(compressionMethod string) Option

func WithConnMaxLifetime

func WithConnMaxLifetime(connMaxLifetime time.Duration) Option

func WithConnectionOpenStrategy

func WithConnectionOpenStrategy(connectionOpenStrategy string) Option

func WithDatabase

func WithDatabase(database string) Option

func WithDebug

func WithDebug(debug bool) Option

func WithDebugMode

func WithDebugMode(debug bool) Option

func WithDialTimeout

func WithDialTimeout(dialTimeout time.Duration) Option

func WithDsn

func WithDsn(dsn string) Option

func WithEnableMetrics

func WithEnableMetrics(enableMetrics bool) Option

func WithEnableTracing

func WithEnableTracing(enableTracing bool) Option

func WithHttpProxy

func WithHttpProxy(httpProxy string) Option

func WithLogger

func WithLogger(logger log.Logger) Option

func WithMaxCompressionBuffer

func WithMaxCompressionBuffer(maxCompressionBuffer int) Option

func WithMaxIdleConns

func WithMaxIdleConns(maxIdleConns int) Option

func WithMaxOpenConns

func WithMaxOpenConns(maxOpenConns int) Option

func WithOptions

func WithOptions(opts *clickhouseV2.Options) Option

func WithPassword

func WithPassword(password string) Option

func WithReadTimeout

func WithReadTimeout(readTimeout time.Duration) Option

func WithScheme

func WithScheme(scheme string) Option

func WithTLSConfig

func WithTLSConfig(tlsConfig *tls.Config) Option

func WithUsername

func WithUsername(username string) Option

type PagingResult

// PagingResult is a generic paged-result container with an items list and a
// total, parameterized by the element type E.
type PagingResult[E any] struct {
	// Items holds pointers to the elements; serialized under the JSON key "items".
	Items []*E   `json:"items"`
	// Total is serialized under the JSON key "total".
	// NOTE(review): presumably the total number of matching records, not
	// len(Items) — confirm against the list methods that populate it.
	Total uint64 `json:"total"`
}

PagingResult 是通用的分页返回结构,包含 items 和 total 字段

type Repository

type Repository[DTO any, ENTITY any] struct {
	// contains filtered or unexported fields
}

Repository ClickHouse 仓库,包含常用的 CRUD 方法

func NewRepository

func NewRepository[DTO any, ENTITY any](client *Client, mapper *mapper.CopierMapper[DTO, ENTITY], table string, log *log.Helper) *Repository[DTO, ENTITY]

func (*Repository[DTO, ENTITY]) BatchCreate

func (r *Repository[DTO, ENTITY]) BatchCreate(ctx context.Context, dtos []*DTO, viewMask *fieldmaskpb.FieldMask) ([]*DTO, error)

BatchCreate 批量创建记录,返回创建后的 DTO 列表

func (*Repository[DTO, ENTITY]) Count

func (r *Repository[DTO, ENTITY]) Count(ctx context.Context, baseWhere string, whereArgs ...any) (uint64, error)

Count 使用 ClickHouse client 计算符合 baseWhere 的记录数。baseWhere:可以包含 "WHERE ..." 前缀,也可以只写条件表达式(函数会自动拼接)。示例调用:total, err := r.Count(ctx, "id = ?", id)。当只传入一个切片参数时支持自动展开:r.Count(ctx, "id IN (?)", []int{1,2,3})。

func (*Repository[DTO, ENTITY]) Create

func (r *Repository[DTO, ENTITY]) Create(ctx context.Context, dto *DTO, viewMask *fieldmaskpb.FieldMask) (*DTO, error)

Create 在数据库中创建一条记录,返回创建后的 DTO

func (*Repository[DTO, ENTITY]) CreateX

func (r *Repository[DTO, ENTITY]) CreateX(ctx context.Context, dto *DTO, viewMask *fieldmaskpb.FieldMask) (int64, error)

CreateX 创建记录,支持 viewMask 指定插入字段,返回受影响行数

func (*Repository[DTO, ENTITY]) Delete

func (r *Repository[DTO, ENTITY]) Delete(ctx context.Context, notSoftDelete bool) (int64, error)

Delete 删除记录

func (*Repository[DTO, ENTITY]) Exists

func (r *Repository[DTO, ENTITY]) Exists(ctx context.Context, baseWhere string, whereArgs ...any) (bool, error)

Exists 检查是否存在符合 baseWhere 条件的记录

func (*Repository[DTO, ENTITY]) Get

func (r *Repository[DTO, ENTITY]) Get(ctx context.Context, qb *query.Builder, viewMask *fieldmaskpb.FieldMask) (*DTO, error)

Get 根据查询条件获取单条记录

func (*Repository[DTO, ENTITY]) ListWithPagination

func (r *Repository[DTO, ENTITY]) ListWithPagination(ctx context.Context, req *paginationV1.PaginationRequest) (*PagingResult[DTO], error)

ListWithPagination 使用 PaginationRequest 查询列表

func (*Repository[DTO, ENTITY]) ListWithPaging

func (r *Repository[DTO, ENTITY]) ListWithPaging(ctx context.Context, req *paginationV1.PagingRequest) (*PagingResult[DTO], error)

ListWithPaging 使用 PagingRequest 查询列表

func (*Repository[DTO, ENTITY]) Only

func (r *Repository[DTO, ENTITY]) Only(ctx context.Context, qb *query.Builder, viewMask *fieldmaskpb.FieldMask) (*DTO, error)

Only 是 Get 的别名(参数与返回值与 Get 相同)

func (*Repository[DTO, ENTITY]) SoftDelete

func (r *Repository[DTO, ENTITY]) SoftDelete(ctx context.Context) (int64, error)

SoftDelete 对符合 whereSelectors 的记录执行软删除 whereSelectors: 应用到查询的 where scopes(按顺序)

func (*Repository[DTO, ENTITY]) Update

func (r *Repository[DTO, ENTITY]) Update(ctx context.Context, dto *DTO, updateMask *fieldmaskpb.FieldMask) (*DTO, error)

Update 更新记录,支持 updateMask 指定更新字段

func (*Repository[DTO, ENTITY]) UpdateX

func (r *Repository[DTO, ENTITY]) UpdateX(ctx context.Context, dto *DTO, updateMask *fieldmaskpb.FieldMask) (int64, error)

UpdateX 更新记录,支持 updateMask 指定更新字段,返回受影响行数

func (*Repository[DTO, ENTITY]) Upsert

func (r *Repository[DTO, ENTITY]) Upsert(ctx context.Context, dto *DTO, updateMask *fieldmaskpb.FieldMask) (*DTO, error)

Upsert 执行插入或冲突更新,支持 updateMask 指定冲突时更新的字段

func (*Repository[DTO, ENTITY]) UpsertX

func (r *Repository[DTO, ENTITY]) UpsertX(ctx context.Context, dto *DTO, updateMask *fieldmaskpb.FieldMask) (int64, error)

UpsertX 执行插入或冲突更新,支持 updateMask 指定冲突时更新的字段,返回受影响行数

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL