clickhouse

package
v1.5.0 (Latest)
Warning

This package is not in the latest version of its module.

Published: Apr 8, 2026 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation


Constants

const (
	// DefaultQueryTimeout is the default max_execution_time in seconds if not specified
	DefaultQueryTimeout = 60
	// MaxQueryTimeout is the maximum allowed timeout to prevent resource abuse
	MaxQueryTimeout = 300 // 5 minutes
)

Default values for query execution
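The two timeout constants bound an optional per-query timeout. The sketch below shows one way a caller could resolve an optional `*int` timeout against them. It assumes that nil or non-positive values fall back to the default and that values above the maximum are clamped rather than rejected. The package does not document which of the two it does, and `resolveTimeout` is a hypothetical helper.

```go
package main

// Local mirrors of the package constants (values copied from above).
const (
	DefaultQueryTimeout = 60
	MaxQueryTimeout     = 300
)

// resolveTimeout picks the effective max_execution_time in seconds.
// Assumption: nil or non-positive input means "use the default", and
// anything above MaxQueryTimeout is clamped down to it.
func resolveTimeout(requested *int) int {
	if requested == nil || *requested <= 0 {
		return DefaultQueryTimeout
	}
	if *requested > MaxQueryTimeout {
		return MaxQueryTimeout
	}
	return *requested
}
```

With this rule, `resolveTimeout(nil)` yields 60 and a request for 900 seconds yields 300.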

const (
	DefaultQueryLimit          = 100
	HealthCheckTimeout         = 1 * time.Second // Short timeout so health checks fail fast
	DefaultHealthCheckInterval = 30 * time.Second
)

Default values

const (
	// DefaultLimit is a conservative fallback used only when maxLimit is not configured.
	DefaultLimit = 1000
)
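DefaultLimit is described as a fallback used only when no maxLimit is configured. Here is a minimal sketch of how a requested row limit might be resolved against DefaultQueryLimit, a configured maxLimit, and DefaultLimit. It rests on two assumptions: a non-positive requested limit means "use DefaultQueryLimit", and a non-positive maxLimit means "not configured". `effectiveLimit` is a hypothetical name.

```go
package main

// Local mirrors of the package constants.
const (
	DefaultQueryLimit = 100
	DefaultLimit      = 1000
)

// effectiveLimit returns the row limit to apply to a log query.
// Assumptions: requested <= 0 selects DefaultQueryLimit, and
// maxLimit <= 0 means no ceiling was configured, so DefaultLimit is used.
func effectiveLimit(requested, maxLimit int) int {
	ceiling := maxLimit
	if ceiling <= 0 {
		ceiling = DefaultLimit
	}
	limit := requested
	if limit <= 0 {
		limit = DefaultQueryLimit
	}
	if limit > ceiling {
		limit = ceiling
	}
	return limit
}
```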

Variables

var (
	// ErrSourceNotConnected is returned when a source is not connected
	ErrSourceNotConnected = errors.New("source not connected")

	// ErrQueryTimeout is returned when a query times out
	ErrQueryTimeout = errors.New("query timeout exceeded")

	// ErrInvalidQuery is returned when a query is invalid
	ErrInvalidQuery = errors.New("invalid query")

	// ErrConnectionFailed is returned when a connection cannot be established
	ErrConnectionFailed = errors.New("connection failed")

	// ErrSourceExists is returned when trying to create a source that already exists
	ErrSourceExists = errors.New("source already exists")

	// ErrInvalidSourceType is returned when the source type is not supported
	ErrInvalidSourceType = errors.New("invalid source type")
)

Common errors for the clickhouse package
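These sentinel errors are plain `errors.New` values, so callers can match them with `errors.Is` even after they have been wrapped. The sketch uses a local copy of ErrSourceNotConnected and a hypothetical `lookupClient` helper to show the pattern. It does not assume anything about how the package itself wraps its errors.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-in for the package's sentinel error.
var ErrSourceNotConnected = errors.New("source not connected")

// lookupClient is a hypothetical helper that wraps the sentinel with context.
func lookupClient(connected map[string]bool, sourceID string) error {
	if !connected[sourceID] {
		return fmt.Errorf("source %q: %w", sourceID, ErrSourceNotConnected)
	}
	return nil
}

// isNotConnected reports whether err is, or wraps, ErrSourceNotConnected.
func isNotConnected(err error) bool {
	return errors.Is(err, ErrSourceNotConnected)
}
```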

Functions

func IsValidationError added in v1.5.0

func IsValidationError(err error) bool

IsValidationError checks if an error (or any in its chain) is a ValidationError.

func ValidateIdentifier added in v1.4.0

func ValidateIdentifier(name string) error

ValidateIdentifier checks that a string is a safe SQL identifier (column name, field name).

func ValidateTimezone added in v1.4.0

func ValidateTimezone(tz string) error

ValidateTimezone checks that a string is a safe timezone identifier for ClickHouse.

Types

type AllFieldValuesParams added in v1.0.0

type AllFieldValuesParams struct {
	TimestampField string    // Required: timestamp column name for time range filter
	StartTime      time.Time // Required: start of time range
	EndTime        time.Time // Required: end of time range
	Timezone       string    // Optional: timezone for time conversion (defaults to UTC)
	Limit          int       // Optional: max values per field (default 10, max 100)
	Timeout        *int      // Optional: query timeout in seconds (default 5s for String fields)
	LogchefQL      string    // Optional: LogchefQL query string - parsed on backend for proper SQL generation
}

AllFieldValuesParams holds parameters for fetching field values for filterable columns.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a connection to a ClickHouse database using the native protocol. It provides methods for executing queries and retrieving metadata.

func NewClient

func NewClient(opts ClientOptions, logger *slog.Logger) (*Client, error)

NewClient establishes a new connection to a ClickHouse server using the native protocol. It takes connection options and a logger, creates the connection, and returns a Client instance. It does not verify the connection with a ping; callers should call Ping if they need that guarantee.

func (*Client) AddQueryHook

func (c *Client) AddQueryHook(hook QueryHook)

AddQueryHook registers a hook to be executed before and after queries run by this client.

func (*Client) Close

func (c *Client) Close() error

Close terminates the underlying database connection with a timeout.

func (*Client) ColumnStats

func (c *Client) ColumnStats(ctx context.Context, database, table string) ([]TableColumnStat, error)

ColumnStats retrieves detailed statistics for each column of a specific table.

func (*Client) GetAllFilterableFieldValues added in v1.0.0

func (c *Client) GetAllFilterableFieldValues(ctx context.Context, database, table string, params AllFieldValuesParams) (map[string]*FieldValuesResult, error)

GetAllFilterableFieldValues retrieves distinct values for all filterable fields within a time range. Filterable fields include LowCardinality, String, Nullable(String), and Enum types. This is useful for populating a field sidebar with filterable values. For String fields, a shorter timeout is used so that high-cardinality columns are handled gracefully. IMPORTANT: A time range is required to avoid scanning entire tables.
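Deciding which columns count as filterable comes down to a check on the ClickHouse type string. Below is a minimal sketch of such a check for the four families listed above. It assumes that any LowCardinality wrapper qualifies and that parameterised Enum definitions such as `Enum8('info' = 1)` are matched by prefix. `isFilterableType` is a hypothetical helper, not the package's code.

```go
package main

import "strings"

// isFilterableType reports whether a ClickHouse column type belongs to the
// filterable families: LowCardinality(...), String, Nullable(String), Enum.
func isFilterableType(chType string) bool {
	t := strings.TrimSpace(chType)
	switch {
	case strings.HasPrefix(t, "LowCardinality("):
		return true
	case t == "String", t == "Nullable(String)":
		return true
	case strings.HasPrefix(t, "Enum8("), strings.HasPrefix(t, "Enum16("), strings.HasPrefix(t, "Enum("):
		return true
	}
	return false
}
```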

func (*Client) GetAllLowCardinalityFieldValues added in v1.0.0

func (c *Client) GetAllLowCardinalityFieldValues(ctx context.Context, database, table string, params AllFieldValuesParams) (map[string]*FieldValuesResult, error)

GetAllLowCardinalityFieldValues is kept for backward compatibility.

Deprecated: Use GetAllFilterableFieldValues instead.

func (*Client) GetFieldDistinctValues added in v1.0.0

func (c *Client) GetFieldDistinctValues(ctx context.Context, database, table string, params FieldValuesParams) (*FieldValuesResult, error)

GetFieldDistinctValues retrieves the top N distinct values for a field within a time range.

func (*Client) GetHistogramData

func (c *Client) GetHistogramData(ctx context.Context, tableName, timestampField string, params HistogramParams) (*HistogramResult, error)

GetHistogramData generates histogram data by grouping log counts into time buckets.

func (*Client) GetSurroundingLogs added in v1.0.0

func (c *Client) GetSurroundingLogs(ctx context.Context, tableName, timestampField string, params LogContextParams, queryTimeout *int) (*LogContextResult, error)

GetSurroundingLogs retrieves logs around a specific timestamp, similar to grep -C. It executes two queries: one for logs at or before the target time and one for logs after it. Logs at the target timestamp are included at the end of BeforeLogs (after reversing).

func (*Client) GetTableInfo

func (c *Client) GetTableInfo(ctx context.Context, database, table string) (*TableInfo, error)

GetTableInfo retrieves detailed metadata about a table, including handling for Distributed tables by inspecting the underlying local table.

func (*Client) IngestionStats added in v1.3.0

func (c *Client) IngestionStats(ctx context.Context, database, table, timestampField string) (*IngestionStats, error)

IngestionStats retrieves recent ingestion statistics for a specific table.

func (*Client) Ping

func (c *Client) Ping(ctx context.Context, database, table string) error

Ping checks the connectivity to the ClickHouse server and optionally verifies a table exists. It uses short timeouts internally. Returns nil on success, or an error indicating the failure reason.

func (*Client) Query

func (c *Client) Query(ctx context.Context, query string) (*models.QueryResult, error)

Query executes a SELECT query, processes the results, and applies query hooks. DDL statements are detected automatically and executed via execDDL.

func (*Client) QueryWithTimeout added in v0.2.2

func (c *Client) QueryWithTimeout(ctx context.Context, query string, timeoutSeconds *int) (*models.QueryResult, error)

QueryWithTimeout executes a SELECT query with a timeout setting. The timeoutSeconds parameter is required and will always be applied.

func (*Client) Reconnect

func (c *Client) Reconnect(ctx context.Context) error

Reconnect attempts to re-establish the connection to the ClickHouse server. This is useful for recovering from connection failures during health checks.

func (*Client) TableStats

func (c *Client) TableStats(ctx context.Context, database, table string) (*TableStat, error)

TableStats retrieves overall statistics for a specific table from active parts.

type ClientOptions

type ClientOptions struct {
	Host     string                 // Hostname or IP address.
	Database string                 // Target database name.
	Username string                 // Username for authentication.
	Password string                 // Password for authentication.
	Settings map[string]interface{} // Additional ClickHouse settings (e.g., max_execution_time).
	SourceID string                 // Source ID for metrics tracking.
	Source   *models.Source         // Source model for enhanced metrics.
}

ClientOptions holds configuration for establishing a new ClickHouse client connection.

type ExtendedColumnInfo

type ExtendedColumnInfo struct {
	Name              string `json:"name"`
	Type              string `json:"type"`
	IsNullable        bool   `json:"is_nullable"`
	IsPrimaryKey      bool   `json:"is_primary_key"`
	DefaultExpression string `json:"default_expression,omitempty"`
	Comment           string `json:"comment,omitempty"`
}

ExtendedColumnInfo provides detailed column metadata, including nullability, primary key status, default expressions, and comments, supplementing models.ColumnInfo.

type FieldValueInfo added in v1.0.0

type FieldValueInfo struct {
	Value string `json:"value"`
	Count int64  `json:"count"`
}

FieldValueInfo represents a distinct value with its count for a field.

type FieldValuesParams added in v1.0.0

type FieldValuesParams struct {
	FieldName      string
	FieldType      string
	TimestampField string    // Required: timestamp column name for time range filter
	StartTime      time.Time // Required: start of time range
	EndTime        time.Time // Required: end of time range
	Timezone       string    // Optional: timezone for time conversion (defaults to UTC)
	Limit          int       // Optional: max values to return (default 10, max 100)
	Timeout        *int      // Optional: query timeout in seconds
	LogchefQL      string    // Optional: LogchefQL query string - parsed on backend for proper SQL generation
}

FieldValuesParams holds parameters for fetching field distinct values.
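A "top N distinct values" lookup like GetFieldDistinctValues maps naturally onto a GROUP BY with a count. The sketch below builds such a query and applies the documented limit defaults (10 if unset, capped at 100). Several details are assumptions and not the package's actual SQL: the exact query shape, the inclusive time bounds, and the use of `?` bind parameters for StartTime and EndTime. `fieldValuesQuery` is hypothetical, and identifiers are assumed to have already passed ValidateIdentifier.

```go
package main

import "fmt"

// fieldValuesQuery builds a top-N distinct-values query for one field.
// The two "?" placeholders stand for the start and end of the time range.
func fieldValuesQuery(database, table, field, tsField string, limit int) string {
	if limit <= 0 {
		limit = 10 // documented default
	}
	if limit > 100 {
		limit = 100 // documented maximum
	}
	return fmt.Sprintf(
		"SELECT toString(`%s`) AS value, count() AS cnt FROM `%s`.`%s` "+
			"WHERE `%s` >= ? AND `%s` <= ? GROUP BY value ORDER BY cnt DESC LIMIT %d",
		field, database, table, tsField, tsField, limit)
}
```

Each result row maps onto a FieldValueInfo (value and count).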

type FieldValuesResult added in v1.0.0

type FieldValuesResult struct {
	FieldName     string           `json:"field_name"`
	FieldType     string           `json:"field_type"`
	IsLowCard     bool             `json:"is_low_cardinality"`
	Values        []FieldValueInfo `json:"values"`
	TotalDistinct int64            `json:"total_distinct"`
}

FieldValuesResult holds the distinct values for a field along with metadata.

type HistogramData

type HistogramData struct {
	Bucket     time.Time `json:"bucket"`      // Start time of the bucket.
	LogCount   int       `json:"log_count"`   // Number of logs in the bucket.
	GroupValue string    `json:"group_value"` // Value of the group for grouped histograms.
}

HistogramData represents a single time bucket and its log count in a histogram.

type HistogramParams

type HistogramParams struct {
	Window   TimeWindow
	Query    string // Raw SQL query to use as base for histogram
	GroupBy  string // Optional: Field to group by for segmented histograms.
	Timezone string // Optional: Timezone identifier for time-based operations.
	// Query execution timeout in seconds. If not specified, uses default timeout.
	QueryTimeout *int
}

HistogramParams defines parameters for generating histogram data.
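HistogramParams takes a raw base query plus a time window and an optional group-by field. One plausible way to turn those into histogram SQL is to wrap the base query and bucket rows with ClickHouse's `toStartOfInterval`. The sketch below is hypothetical. It assumes the window has already been converted to an interval expression such as `INTERVAL 5 MINUTE`, and it omits the Timezone parameter. A LIMIT left in the base query would cap the rows being counted; RemoveLimitClause is one way to strip it first.

```go
package main

import "fmt"

// histogramQuery wraps baseQuery and counts rows per time bucket, and per
// group value when groupBy is non-empty. Identifiers are assumed validated.
func histogramQuery(baseQuery, tsField, interval, groupBy string) string {
	bucket := fmt.Sprintf("toStartOfInterval(`%s`, %s)", tsField, interval)
	if groupBy == "" {
		return fmt.Sprintf(
			"SELECT %s AS bucket, count() AS log_count FROM (%s) GROUP BY bucket ORDER BY bucket",
			bucket, baseQuery)
	}
	return fmt.Sprintf(
		"SELECT %s AS bucket, toString(`%s`) AS group_value, count() AS log_count FROM (%s) "+
			"GROUP BY bucket, group_value ORDER BY bucket, group_value",
		bucket, groupBy, baseQuery)
}
```

The output columns line up with the bucket, log_count and group_value fields of HistogramData.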

type HistogramResult

type HistogramResult struct {
	Granularity string          `json:"granularity"` // The time window used (e.g., "5m").
	Data        []HistogramData `json:"data"`
}

HistogramResult holds the complete histogram data and its granularity.

type IngestionBucket added in v1.3.0

type IngestionBucket struct {
	Bucket time.Time `json:"bucket"`
	Rows   uint64    `json:"rows"`
}

IngestionBucket represents ingestion volume for a given time bucket.

type IngestionStats added in v1.3.0

type IngestionStats struct {
	Rows1h        uint64            `json:"rows_1h"`
	Rows24h       uint64            `json:"rows_24h"`
	Rows7d        uint64            `json:"rows_7d"`
	LatestTS      *time.Time        `json:"latest_ts,omitempty"`
	HourlyBuckets []IngestionBucket `json:"hourly_buckets"`
	DailyBuckets  []IngestionBucket `json:"daily_buckets"`
}

IngestionStats represents recent ingestion activity for a table.

type LogContextParams

type LogContextParams struct {
	TargetTime      time.Time
	BeforeLimit     int
	AfterLimit      int
	BeforeOffset    int  // Offset for before query (for pagination)
	AfterOffset     int  // Offset for after query (for pagination)
	ExcludeBoundary bool // When true, use < instead of <= for before query (for pagination)
}

LogContextParams defines parameters for fetching logs around a specific target time.
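GetSurroundingLogs is described as running two queries around the target time, and ExcludeBoundary switches the "before" comparison from `<=` to `<`. The sketch below shows how those parameters could shape the two queries. The "before" rows come back newest first and are then reversed into chronological order. It is an illustration under assumptions, not the package's SQL. The `contextParams` struct mirrors only some LogContextParams fields, and the `?` placeholder stands for TargetTime as a bind parameter.

```go
package main

import "fmt"

// contextParams mirrors the LogContextParams fields used here.
type contextParams struct {
	BeforeLimit, AfterLimit   int
	BeforeOffset, AfterOffset int
	ExcludeBoundary           bool
}

// surroundingQueries returns the "before" query (newest first) and the
// "after" query (oldest first). table and tsField are assumed validated.
func surroundingQueries(table, tsField string, p contextParams) (before, after string) {
	op := "<="
	if p.ExcludeBoundary {
		op = "<" // pagination: skip rows exactly at the boundary
	}
	before = fmt.Sprintf("SELECT * FROM %s WHERE `%s` %s ? ORDER BY `%s` DESC LIMIT %d OFFSET %d",
		table, tsField, op, tsField, p.BeforeLimit, p.BeforeOffset)
	after = fmt.Sprintf("SELECT * FROM %s WHERE `%s` > ? ORDER BY `%s` ASC LIMIT %d OFFSET %d",
		table, tsField, tsField, p.AfterLimit, p.AfterOffset)
	return before, after
}

// reverse flips the newest-first "before" rows into chronological order.
func reverse[T any](rows []T) {
	for i, j := 0, len(rows)-1; i < j; i, j = i+1, j-1 {
		rows[i], rows[j] = rows[j], rows[i]
	}
}
```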

type LogContextResult

type LogContextResult struct {
	BeforeLogs []map[string]interface{}
	TargetLogs []map[string]interface{} // Logs exactly at the target time.
	AfterLogs  []map[string]interface{}
	Stats      models.QueryStats
}

LogContextResult holds the logs retrieved before, at, and after the target time.

type LogQueryHook

type LogQueryHook struct {

	// Verbose logs all queries if true; otherwise, only logs failed queries.
	Verbose bool
	// contains filtered or unexported fields
}

LogQueryHook is a basic QueryHook implementation that logs query execution start and completion/failure, controlled by the Verbose flag.

func NewLogQueryHook

func NewLogQueryHook(logger *slog.Logger, verbose bool) *LogQueryHook

NewLogQueryHook creates a new LogQueryHook.

func (*LogQueryHook) AfterQuery

func (h *LogQueryHook) AfterQuery(ctx context.Context, query string, err error, duration time.Duration)

AfterQuery logs query completion status (success/failure) and duration. Success is only logged if Verbose is true.

func (*LogQueryHook) BeforeQuery

func (h *LogQueryHook) BeforeQuery(ctx context.Context, query string) (context.Context, error)

BeforeQuery optionally logs the query before execution if Verbose is true.

type LogQueryParams

type LogQueryParams struct {
	Limit    int
	MaxLimit int
	RawSQL   string
	// Query execution timeout in seconds. If not specified, uses default timeout.
	QueryTimeout *int
}

LogQueryParams defines parameters for querying logs.

type LogQueryResult

type LogQueryResult struct {
	Data    []map[string]interface{} `json:"data"`
	Stats   models.QueryStats        `json:"stats"`
	Columns []models.ColumnInfo      `json:"columns"`
}

LogQueryResult represents the structured result of a log query.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager handles pooling and management of multiple ClickHouse client connections, one per data source. It also manages query hooks and background health checks.

func NewManager

func NewManager(log *slog.Logger) *Manager

NewManager creates a new ClickHouse connection manager.

func (*Manager) AddQueryHook

func (m *Manager) AddQueryHook(hook QueryHook)

AddQueryHook adds a query hook to the manager's list. The hook will be applied to all currently managed clients and any subsequently added clients via AddSource.

func (*Manager) AddSource

func (m *Manager) AddSource(ctx context.Context, source *models.Source) error

AddSource creates a new ClickHouse client connection based on the source details, applies existing hooks, stores the client in the manager pool, and initializes its health status. A client entry is always created, even if the initial connection fails.

func (*Manager) Close

func (m *Manager) Close() error

Close iterates through all managed client connections and closes them, with a timeout for each client to prevent hanging on unhealthy connections. It also stops the background health checker and waits for it to complete.

func (*Manager) CreateTemporaryClient

func (m *Manager) CreateTemporaryClient(ctx context.Context, source *models.Source) (*Client, error)

CreateTemporaryClient creates a new, unmanaged ClickHouse client instance, typically used for validating connection details before adding a source. The caller is responsible for closing the returned client.

func (*Manager) GetCachedHealth

func (m *Manager) GetCachedHealth(sourceID models.SourceID) models.SourceHealth

GetCachedHealth retrieves the latest known health status for a source ID from the cache. Returns a default unhealthy status if the source hasn't been checked yet.

func (*Manager) GetClient

func (m *Manager) GetClient(sourceID models.SourceID) (*Client, error)

GetClient is an alias for GetConnection, kept for backward compatibility.

func (*Manager) GetConnection

func (m *Manager) GetConnection(sourceID models.SourceID) (*Client, error)

GetConnection returns the managed client connection for a given source ID. Returns ErrSourceNotConnected if the source is not currently managed.

func (*Manager) GetHealth

func (m *Manager) GetHealth(ctx context.Context, sourceID models.SourceID) models.SourceHealth

GetHealth performs a LIVE health check on a specific source and updates the cache.

Deprecated: Use GetCachedHealth for regular status checks. Use this only if an immediate, live check is explicitly required.

func (*Manager) RemoveSource

func (m *Manager) RemoveSource(sourceID models.SourceID) error

RemoveSource closes the connection for the given source ID and removes it from the manager.

func (*Manager) StartBackgroundHealthChecks

func (m *Manager) StartBackgroundHealthChecks(interval time.Duration)

StartBackgroundHealthChecks launches a goroutine that periodically checks the health of all managed connections. The goroutine intentionally uses its own context rather than a caller-supplied one.

func (*Manager) StopBackgroundHealthChecks

func (m *Manager) StopBackgroundHealthChecks()

StopBackgroundHealthChecks signals the health check goroutine to stop.

type QueryBuilder

type QueryBuilder struct {
	// contains filtered or unexported fields
}

QueryBuilder assists in building and validating ClickHouse SQL queries.

func NewExtendedQueryBuilder added in v1.2.0

func NewExtendedQueryBuilder(tableName string, maxLimit int) *QueryBuilder

NewExtendedQueryBuilder creates a QueryBuilder that allows any SELECT query. Only validates that the query is a SELECT statement (not INSERT/DELETE/UPDATE). The ClickHouse connection permissions are the real security boundary.

func NewQueryBuilder

func NewQueryBuilder(tableName string, maxLimit int) *QueryBuilder

NewQueryBuilder creates a new QueryBuilder for restricted mode. This validates that queries target the specified table and blocks JOINs/subqueries.

func (*QueryBuilder) BuildRawQuery

func (qb *QueryBuilder) BuildRawQuery(rawSQL string, limit int) (string, error)

BuildRawQuery parses, validates, and adds LIMIT to a SQL query.

func (*QueryBuilder) RemoveLimitClause

func (qb *QueryBuilder) RemoveLimitClause(rawSQL string) (string, error)

RemoveLimitClause parses the SQL and removes any LIMIT clause.

type QueryHook

type QueryHook interface {
	// BeforeQuery is called before a query is executed.
	// It can return a modified context for the query execution.
	BeforeQuery(ctx context.Context, query string) (context.Context, error)

	// AfterQuery is called after a query has finished executing,
	// regardless of whether it succeeded or failed.
	AfterQuery(ctx context.Context, query string, err error, duration time.Duration)
}

QueryHook defines an interface for intercepting ClickHouse queries. Hooks can modify the context or log query details before and after execution.

type QueryMode added in v1.2.0

type QueryMode int

QueryMode defines the validation strictness for SQL queries.

const (
	// RestrictedMode validates table reference and blocks JOINs/subqueries.
	// Used for LogchefQL-generated queries.
	RestrictedMode QueryMode = iota
	// ExtendedMode allows any SELECT query without table validation.
	// The ClickHouse connection permissions are the security boundary.
	ExtendedMode
)

type StructuredQueryLoggerHook

type StructuredQueryLoggerHook struct {
	// contains filtered or unexported fields
}

StructuredQueryLoggerHook implements QueryHook to log query details *before* execution using structured logging attributes.

func NewStructuredQueryLoggerHook

func NewStructuredQueryLoggerHook(logger *slog.Logger) *StructuredQueryLoggerHook

NewStructuredQueryLoggerHook creates a new StructuredQueryLoggerHook.

func (*StructuredQueryLoggerHook) AfterQuery

func (h *StructuredQueryLoggerHook) AfterQuery(ctx context.Context, query string, err error, duration time.Duration)

AfterQuery is a no-op for StructuredQueryLoggerHook.

func (*StructuredQueryLoggerHook) BeforeQuery

func (h *StructuredQueryLoggerHook) BeforeQuery(ctx context.Context, query string) (context.Context, error)

BeforeQuery logs the query details using structured logging attributes before execution.

type TableColumnStat

type TableColumnStat struct {
	Database     string  `json:"database"`
	Table        string  `json:"table"`
	Column       string  `json:"column"`
	Compressed   string  `json:"compressed"`   // Size on disk (human-readable).
	Uncompressed string  `json:"uncompressed"` // Original size (human-readable).
	ComprRatio   float64 `json:"compr_ratio"`  // Compression ratio.
	RowsCount    uint64  `json:"rows_count"`   // Number of rows in the column chunk.
	AvgRowSize   float64 `json:"avg_row_size"` // Average row size in bytes.
}

TableColumnStat represents statistics for a single column in a ClickHouse table, typically retrieved from system.parts_columns.

type TableInfo

type TableInfo struct {
	Database     string               `json:"database"`
	Name         string               `json:"name"`
	Engine       string               `json:"engine"`                  // e.g., "MergeTree", "Distributed"
	EngineParams []string             `json:"engine_params,omitempty"` // Parameters extracted from engine_full.
	Columns      []models.ColumnInfo  `json:"columns"`                 // Basic column info (Name, Type).
	ExtColumns   []ExtendedColumnInfo `json:"ext_columns,omitempty"`   // Detailed column info.
	SortKeys     []string             `json:"sort_keys"`               // Parsed sorting key columns.
	CreateQuery  string               `json:"create_query,omitempty"`  // Full CREATE TABLE statement.
}

TableInfo represents comprehensive metadata about a ClickHouse table, including engine details, column definitions (basic and extended), sorting keys, and the CREATE statement.

type TableStat

type TableStat struct {
	Database     string  `json:"database"`
	Table        string  `json:"table"`
	Compressed   string  `json:"compressed"`   // Total size on disk (human-readable).
	Uncompressed string  `json:"uncompressed"` // Total original size (human-readable).
	ComprRate    float64 `json:"compr_rate"`   // Overall compression rate.
	Rows         uint64  `json:"rows"`         // Total rows in the table partition/part.
	PartCount    uint64  `json:"part_count"`   // Number of data parts.
}

TableStat represents overall statistics for a ClickHouse table, typically retrieved from system.parts.

type TimeWindow

type TimeWindow string

TimeWindow represents the desired granularity for time-based aggregations.

const (
	// Second-based windows
	TimeWindow1s  TimeWindow = "1s"  // 1 second
	TimeWindow5s  TimeWindow = "5s"  // 5 seconds
	TimeWindow10s TimeWindow = "10s" // 10 seconds
	TimeWindow15s TimeWindow = "15s" // 15 seconds
	TimeWindow30s TimeWindow = "30s" // 30 seconds

	// Minute-based windows
	TimeWindow1m  TimeWindow = "1m"  // 1 minute
	TimeWindow5m  TimeWindow = "5m"  // 5 minutes
	TimeWindow10m TimeWindow = "10m" // 10 minutes
	TimeWindow15m TimeWindow = "15m" // 15 minutes
	TimeWindow30m TimeWindow = "30m" // 30 minutes

	// Hour-based windows
	TimeWindow1h  TimeWindow = "1h"  // 1 hour
	TimeWindow2h  TimeWindow = "2h"  // 2 hours
	TimeWindow3h  TimeWindow = "3h"  // 3 hours
	TimeWindow6h  TimeWindow = "6h"  // 6 hours
	TimeWindow12h TimeWindow = "12h" // 12 hours
	TimeWindow24h TimeWindow = "24h" // 24 hours
)
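Each TimeWindow value is a count followed by a unit suffix (s, m or h). That makes it straightforward to map onto a ClickHouse interval expression for bucketing. The sketch below performs that conversion. `toInterval` is a hypothetical helper, and the package's own conversion is not documented here.

```go
package main

import (
	"fmt"
	"strconv"
)

// toInterval converts a window such as "5m" into "INTERVAL 5 MINUTE".
// Only the s, m and h suffixes used by the TimeWindow constants are accepted.
func toInterval(window string) (string, error) {
	if len(window) < 2 {
		return "", fmt.Errorf("invalid time window %q", window)
	}
	n, err := strconv.Atoi(window[:len(window)-1])
	if err != nil || n <= 0 {
		return "", fmt.Errorf("invalid time window %q", window)
	}
	units := map[byte]string{'s': "SECOND", 'm': "MINUTE", 'h': "HOUR"}
	unit, ok := units[window[len(window)-1]]
	if !ok {
		return "", fmt.Errorf("invalid time window %q", window)
	}
	return fmt.Sprintf("INTERVAL %d %s", n, unit), nil
}
```

For example, TimeWindow24h ("24h") becomes `INTERVAL 24 HOUR`.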

type ValidationError added in v1.5.0

type ValidationError struct {
	Message string
}

ValidationError is returned for invalid inputs (field names, timezones). Callers can use errors.As to distinguish validation failures from DB errors.

func (*ValidationError) Error added in v1.5.0

func (e *ValidationError) Error() string
