clickhouse

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2026 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package clickhouse provides a ClickHouse driver for Queen migrations.

Example

Example demonstrates basic usage of the ClickHouse driver.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/ClickHouse/clickhouse-go/v2"
	"github.com/honeynil/queen"
	"github.com/honeynil/queen/drivers/clickhouse"
)

func main() {
	// Connect to ClickHouse
	db, err := sql.Open("clickhouse", "clickhouse://default:password@localhost:9000/default?")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Create ClickHouse driver
	driver := clickhouse.New(db)

	// Create Queen instance
	q := queen.New(driver)
	defer q.Close()

	// Register migrations
	q.MustAdd(queen.M{
		Version: "001",
		Name:    "create_users_table",
		UpSQL: `
			CREATE TABLE users(
				id          UUID DEFAULT generateUUIDv4(), 
				email       String                                 NOT NULL,  -- UNIQUE не поддерживается
				name        String,
				created_at  DateTime DEFAULT now()
			)
			ENGINE = ReplacingMergeTree()
			ORDER BY (id)           
		`,
		DownSQL: `DROP TABLE users`,
	})

	q.MustAdd(queen.M{
		Version: "002",
		Name:    "add_users_bio",
		UpSQL:   `ALTER TABLE users ADD COLUMN bio String`,
		DownSQL: `ALTER TABLE users DROP COLUMN bio`,
	})

	// Apply all pending migrations
	ctx := context.Background()
	if err := q.Up(ctx); err != nil {
		log.Fatal(err)
	}

	fmt.Println("Migrations applied successfully!")
}
Example (CustomTableName)

Example_customTableName demonstrates using a custom table name for migrations.

package main

import (
	"database/sql"

	_ "github.com/ClickHouse/clickhouse-go/v2"
	"github.com/honeynil/queen"
	"github.com/honeynil/queen/drivers/clickhouse"
)

func main() {
	db, err := sql.Open("clickhouse", "clickhouse://default:password@localhost:9000/default?")
	if err != nil {
		// sql.Open returns a nil *sql.DB on failure; stop here instead of
		// passing the nil handle to the driver and to the deferred Close.
		panic(err)
	}
	defer db.Close()

	// Use custom table name
	driver := clickhouse.NewWithTableName(db, "my_custom_migrations")
	q := queen.New(driver)
	defer q.Close()

	// The migrations will be tracked in "my_custom_migrations" table
	// instead of the default "queen_migrations"
}
Example (GoFunctionMigration)

Example_goFunctionMigration demonstrates using Go functions for complex migrations.

package main

import (
	"context"
	"database/sql"
	"log"
	"strings"

	_ "github.com/ClickHouse/clickhouse-go/v2"
	"github.com/honeynil/queen"
	"github.com/honeynil/queen/drivers/clickhouse"
)

func main() {
	db, err := sql.Open("clickhouse", "clickhouse://default:password@localhost:9000/default?")
	if err != nil {
		// sql.Open returns a nil *sql.DB on failure; stop instead of using it.
		log.Fatal(err)
	}
	defer db.Close()

	driver := clickhouse.New(db)
	q := queen.New(driver)
	defer q.Close()

	// Plain SQL migration: seed ten users; the down step deletes them by email.
	q.MustAdd(queen.M{
		Version: "003",
		Name:    "add_users",
		UpSQL: `INSERT INTO users (email, name)
				  VALUES
					('alice@example.com',    'Alice Smith'),
					('bob@example.com',      'Bob Johnson'),
					('carol@example.com',    'Carol Williams'),
					('david@example.com',    'David Brown'),
					('eve@example.com',      'Eve Davis'),
					('frank@example.com',    'Frank Miller'),
					('grace@example.com',    'Grace Wilson'),
					('henry@example.com',    'Henry Moore'),
					('isabella@example.com', 'Isabella Taylor'),
					('jack@example.com',     'Jack Anderson');`,
		DownSQL: `DELETE FROM users
					WHERE email IN (
						'alice@example.com',
						'bob@example.com',
						'carol@example.com',
						'david@example.com',
						'eve@example.com',
						'frank@example.com',
						'grace@example.com',
						'henry@example.com',
						'isabella@example.com',
						'jack@example.com'
					);`,
	})

	// Plain SQL migration: toggle two table-level settings on (up) and off (down).
	q.MustAdd(queen.M{
		Version: "004",
		Name:    "modify_setting",
		UpSQL: `ALTER TABLE users
					MODIFY SETTING
						enable_block_number_column = 1,
						enable_block_offset_column = 1;`,
		DownSQL: `ALTER TABLE users
					MODIFY SETTING
						enable_block_number_column = 0,
						enable_block_offset_column = 0;`,
	})

	// Migration using Go function for complex logic
	q.MustAdd(queen.M{
		Version:        "005",
		Name:           "normalize_names",
		ManualChecksum: "v1", // Important: track function changes!
		UpFunc: func(ctx context.Context, tx *sql.Tx) error {
			type user struct {
				id   string
				name string
			}

			// Fetch all users up front. A *sql.Tx is bound to a single
			// connection, and many drivers cannot execute another statement
			// on it while a result set is still open, so the rows are fully
			// read and closed before any update is issued.
			rows, err := tx.QueryContext(ctx, "SELECT id, name FROM users")
			if err != nil {
				return err
			}
			defer rows.Close() // covers early returns; Close is idempotent

			var users []user
			for rows.Next() {
				var u user
				if err := rows.Scan(&u.id, &u.name); err != nil {
					return err
				}
				users = append(users, u)
			}
			if err := rows.Err(); err != nil {
				return err
			}
			if err := rows.Close(); err != nil {
				return err
			}

			// Write the normalized name back for every user.
			// NOTE(review): ClickHouse has historically expressed row updates
			// as ALTER TABLE ... UPDATE mutations; confirm the target server
			// version accepts this standard UPDATE statement.
			for _, u := range users {
				// normalizeNames upper-cases the name.
				normalized := normalizeNames(u.name)

				if _, err := tx.ExecContext(ctx,
					"UPDATE users SET name = ? WHERE id = ?",
					normalized, u.id); err != nil {
					return err
				}
			}

			return nil
		},
		DownFunc: func(ctx context.Context, tx *sql.Tx) error {
			// The up step overwrites the original names, so there is nothing
			// to restore; rolling back is a deliberate no-op.
			return nil
		},
	})

	// Apply all pending migrations
	ctx := context.Background()
	if err := q.Up(ctx); err != nil {
		log.Fatal(err)
	}
}

// normalizeNames returns name with all letters converted to upper case.
func normalizeNames(name string) string {
	upper := strings.ToUpper(name)
	return upper
}
Example (Status)

Example_status demonstrates checking migration status.

Note: This example requires a running ClickHouse server. It will be skipped in CI if ClickHouse is not available.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/ClickHouse/clickhouse-go/v2"
	"github.com/honeynil/queen"
	"github.com/honeynil/queen/drivers/clickhouse"
)

func main() {
	db, err := sql.Open("clickhouse", "clickhouse://default:password@localhost:9000/default?")
	if err != nil {
		fmt.Println("ClickHouse not available")
		return
	}
	defer db.Close()

	// Check if ClickHouse is actually available
	if err := db.Ping(); err != nil {
		fmt.Println("ClickHouse not available")
		return
	}

	driver := clickhouse.New(db)
	q := queen.New(driver)
	defer q.Close()

	// Register migrations
	q.MustAdd(queen.M{
		Version: "001",
		Name:    "create_users",
		UpSQL: `CREATE TABLE users(
				id          UUID DEFAULT generateUUIDv4(),
				name        String
			)
			ENGINE = ReplacingMergeTree()
			ORDER BY (id)`,
		DownSQL: `DROP TABLE users`,
	})

	q.MustAdd(queen.M{
		Version: "002",
		Name:    "add_users_bio",
		UpSQL:   `ALTER TABLE users ADD COLUMN bio String`,
		DownSQL: `ALTER TABLE users DROP COLUMN bio`,
	})

	ctx := context.Background()

	// Apply first migration only
	if err := q.UpSteps(ctx, 1); err != nil {
		log.Fatal(err)
	}

	// Check status
	statuses, err := q.Status(ctx)
	if err != nil {
		log.Fatal(err)
	}

	for _, s := range statuses {
		fmt.Printf("%s: %s (%s)\n", s.Version, s.Name, s.Status)
	}

	// Example output (when ClickHouse is available):
	// 001: create_users (applied)
	// 002: create_posts (pending)
}
Example (WithConfig)

Example_withConfig demonstrates using custom configuration.

package main

import (
	"database/sql"
	"time"

	_ "github.com/ClickHouse/clickhouse-go/v2"
	"github.com/honeynil/queen"
	"github.com/honeynil/queen/drivers/clickhouse"
)

func main() {
	db, err := sql.Open("clickhouse", "clickhouse://default:password@localhost:9000/default?")
	if err != nil {
		// sql.Open returns a nil *sql.DB on failure; stop here instead of
		// passing the nil handle to the driver and to the deferred Close.
		panic(err)
	}
	defer db.Close()

	driver := clickhouse.New(db)

	// Create Queen with custom config: a custom tracking table name and a
	// ten-minute lock timeout.
	config := &queen.Config{
		TableName:   "custom_migrations",
		LockTimeout: 10 * time.Minute,
	}
	q := queen.NewWithConfig(driver, config)
	defer q.Close()

	// Your migrations here
}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Driver

type Driver struct {
	// contains filtered or unexported fields
}

Driver implements the queen.Driver interface for ClickHouse

func New

func New(db *sql.DB) *Driver

New creates a new ClickHouse driver.

The database connection should already be open and configured. The default migrations table name is "queen_migrations".

Example:

db, err := sql.Open("clickhouse", DSN)
if err != nil {
    log.Fatal(err)
}
driver := clickhouse.New(db)

func NewWithTableName

func NewWithTableName(db *sql.DB, tableName string) *Driver

NewWithTableName creates a new ClickHouse driver with a custom table name.

Use this when you need to manage multiple independent sets of migrations in the same database, or when you want to customize the table name for organizational purposes.

Example:

driver := clickhouse.NewWithTableName(db, "my_custom_migrations")

func (*Driver) Close

func (d *Driver) Close() error

Close closes the database connection.

func (*Driver) Exec

func (d *Driver) Exec(ctx context.Context, fn func(*sql.Tx) error) error

Exec executes a function within a transaction.

IMPORTANT: ClickHouse transaction support is LIMITED and EXPERIMENTAL.

Transaction limitations in ClickHouse:

  • Only works for MergeTree engine family tables (e.g., MergeTree, ReplacingMergeTree)
  • Requires experimental feature flag: allow_experimental_transactions=1
  • Provides atomicity only for the current session, not full ACID guarantees
  • Cross-table atomicity is limited
  • Not suitable for high-concurrency OLTP workloads

Despite these limitations, transactions are used here to provide best-effort atomicity for migration execution. Most migration DDL operations (CREATE TABLE, ALTER TABLE) are atomic by nature in ClickHouse.

If the function returns an error, the transaction is rolled back. Otherwise, the transaction is committed.

See: https://clickhouse.com/docs/en/guides/developer/transactional

func (*Driver) GetApplied

func (d *Driver) GetApplied(ctx context.Context) ([]queen.Applied, error)

GetApplied returns all applied migrations sorted by applied_at in ascending order.

This is used by Queen to determine which migrations have already been applied and which are pending.

func (*Driver) Init

func (d *Driver) Init(ctx context.Context) error

Init creates the migrations tracking table and lock table if they don't exist.

The migrations table schema:

  • version: String - unique migration version
  • name: LowCardinality(String) - human-readable migration name
  • applied_at: DateTime64(3) DEFAULT now64(3) - when the migration was applied
  • checksum: String DEFAULT '' - hash of migration content for validation

The lock table schema:

  • lock_key: LowCardinality(String) - lock identifier
  • acquired_at: DateTime64(3) - when the lock was acquired
  • expires_at: DateTime64(3) - when the lock expires
  • TTL: expires_at + 10 SECOND - automatically removes expired locks

The TTL (Time To Live) on the lock table provides automatic cleanup of expired locks as a safety mechanism. This prevents abandoned locks from blocking migrations indefinitely if a process crashes without releasing the lock.

This method is idempotent and safe to call multiple times.

func (*Driver) Lock

func (d *Driver) Lock(ctx context.Context, timeout time.Duration) error

Lock acquires a distributed lock to prevent concurrent migrations.

ClickHouse doesn't have advisory locks like PostgreSQL. Instead, this driver uses a lock table with expiration times to implement distributed locking across multiple processes/containers.

The lock mechanism:

  1. Cleans up expired locks using ALTER TABLE DELETE (async in ClickHouse)
  2. Checks if an active lock exists using SELECT with FINAL
  3. If no lock exists, attempts INSERT
  4. Retries with exponential backoff until timeout or lock is acquired

IMPORTANT: Uses FINAL modifier with ReplacingMergeTree to ensure we see deduplicated data, not intermediate merge states. This is critical because ClickHouse operations are asynchronous by nature.

Exponential backoff starts at 50ms and doubles up to 1s maximum to reduce database load during lock contention.

If the lock cannot be acquired within the timeout, returns queen.ErrLockTimeout.

func (*Driver) Record

func (d *Driver) Record(ctx context.Context, m *queen.Migration) error

Record marks a migration as applied in the database.

This should be called after successfully executing a migration's up function. The checksum is automatically computed from the migration content.

func (*Driver) Remove

func (d *Driver) Remove(ctx context.Context, version string) error

Remove removes a migration record from the database.

This should be called after successfully rolling back a migration's down function.

func (*Driver) Unlock

func (d *Driver) Unlock(ctx context.Context) error

Unlock releases the migration lock.

This removes the lock record from the lock table, allowing other processes to acquire the lock.

This method is graceful: it returns nil if the lock doesn't exist or was already released. This prevents errors during cleanup when locks expire via TTL or in error recovery scenarios.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL