clickhouse

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package clickhouse provides a ClickHouse driver for Queen migrations.

Example

Example demonstrates basic usage of the ClickHouse driver.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/ClickHouse/clickhouse-go/v2"
	"github.com/honeynil/queen"
	"github.com/honeynil/queen/drivers/clickhouse"
)

func main() {
	// Connect to ClickHouse
	db, err := sql.Open("clickhouse", "clickhouse://default:password@localhost:9000/default?")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Create ClickHouse driver
	driver, err := clickhouse.New(db)
	if err != nil {
		log.Fatal(err)
	}

	// Create Queen instance
	q := queen.New(driver)
	defer q.Close()

	// Register migrations
	q.MustAdd(queen.M{
		Version: "001",
		Name:    "create_users_table",
		UpSQL: `
			CREATE TABLE users(
				id          UUID DEFAULT generateUUIDv4(), 
				email       String                                 NOT NULL,  -- UNIQUE не поддерживается
				name        String,
				created_at  DateTime DEFAULT now()
			)
			ENGINE = ReplacingMergeTree()
			ORDER BY (id)           
		`,
		DownSQL: `DROP TABLE users`,
	})

	q.MustAdd(queen.M{
		Version: "002",
		Name:    "add_users_bio",
		UpSQL:   `ALTER TABLE users ADD COLUMN bio String`,
		DownSQL: `ALTER TABLE users DROP COLUMN bio`,
	})

	// Apply all pending migrations
	ctx := context.Background()
	if err := q.Up(ctx); err != nil {
		log.Fatal(err)
	}

	fmt.Println("Migrations applied successfully!")
}
Example (CustomTableName)

Example_customTableName demonstrates using a custom table name for migrations.

package main

import (
	"database/sql"
	"log"

	_ "github.com/ClickHouse/clickhouse-go/v2"
	"github.com/honeynil/queen"
	"github.com/honeynil/queen/drivers/clickhouse"
)

func main() {
	db, err := sql.Open("clickhouse", "clickhouse://default:password@localhost:9000/default?")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Use custom table name
	driver, err := clickhouse.NewWithTableName(db, "my_custom_migrations")
	if err != nil {
		log.Fatal(err)
	}
	q := queen.New(driver)
	defer q.Close()

	// The migrations will be tracked in "my_custom_migrations" table
	// instead of the default "queen_migrations"
}
Example (GoFunctionMigration)

Example_goFunctionMigration demonstrates using Go functions for complex migrations.

package main

import (
	"context"
	"database/sql"
	"log"
	"strings"

	_ "github.com/ClickHouse/clickhouse-go/v2"
	"github.com/honeynil/queen"
	"github.com/honeynil/queen/drivers/clickhouse"
)

func main() {
	db, err := sql.Open("clickhouse", "clickhouse://default:password@localhost:9000/default?")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	driver, err := clickhouse.New(db)
	if err != nil {
		log.Fatal(err)
	}
	q := queen.New(driver)
	defer q.Close()

	q.MustAdd(queen.M{
		Version: "003",
		Name:    "add_users",
		UpSQL: `INSERT INTO users (email, name)
				  VALUES
					('alice@example.com',    'Alice Smith'),
					('bob@example.com',      'Bob Johnson'),
					('carol@example.com',    'Carol Williams'),
					('david@example.com',    'David Brown'),
					('eve@example.com',      'Eve Davis'),
					('frank@example.com',    'Frank Miller'),
					('grace@example.com',    'Grace Wilson'),
					('henry@example.com',    'Henry Moore'),
					('isabella@example.com', 'Isabella Taylor'),
					('jack@example.com',     'Jack Anderson');`,
		DownSQL: `DELETE FROM users
					WHERE email IN (
						'alice@example.com',
						'bob@example.com',
						'carol@example.com',
						'david@example.com',
						'eve@example.com',
						'frank@example.com',
						'grace@example.com',
						'henry@example.com',
						'isabella@example.com',
						'jack@example.com'
					);`,
	})

	q.MustAdd(queen.M{
		Version: "004",
		Name:    "modify_setting",
		UpSQL: `ALTER TABLE users
					MODIFY SETTING
						enable_block_number_column = 1,
						enable_block_offset_column = 1;`,
		DownSQL: `ALTER TABLE users
					MODIFY SETTING
						enable_block_number_column = 0,
						enable_block_offset_column = 0;`,
	})

	// Migration using Go function for complex logic
	q.MustAdd(queen.M{
		Version:        "005",
		Name:           "normalize_names",
		ManualChecksum: "v1", // Important: track function changes!
		UpFunc: func(ctx context.Context, tx *sql.Tx) error {
			// Fetch all users
			rows, err := tx.QueryContext(ctx, "SELECT id, name FROM users")
			if err != nil {
				return err
			}
			defer rows.Close()

			// Normalize each email
			for rows.Next() {
				var id string
				var name string
				if err := rows.Scan(&id, &name); err != nil {
					return err
				}

				// Convert to lowercase
				normalized := normalizeNames(name)

				// Update the email
				_, err = tx.ExecContext(ctx,
					"UPDATE users SET name = ? WHERE id = ?",
					normalized, id)
				if err != nil {
					return err
				}
			}

			return rows.Err()
		},
		DownFunc: func(ctx context.Context, tx *sql.Tx) error {
			// Rollback is not possible for this migration
			return nil
		},
	})

	ctx := context.Background()
	if err := q.Up(ctx); err != nil {
		log.Fatal(err)
	}
}

// Helper function for name normalization
func normalizeNames(name string) string {

	return strings.ToUpper(name)
}
Example (Status)

Example_status demonstrates checking migration status.

Note: This example requires a running ClickHouse server. It will be skipped in CI if MySQL is not available.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/ClickHouse/clickhouse-go/v2"
	"github.com/honeynil/queen"
	"github.com/honeynil/queen/drivers/clickhouse"
)

func main() {
	db, err := sql.Open("clickhouse", "clickhouse://default:password@localhost:9000/default?")
	if err != nil {
		fmt.Println("ClickHouse not available")
		return
	}
	defer db.Close()

	// Check if ClickHouse is actually available
	if err := db.Ping(); err != nil {
		fmt.Println("ClickHouse not available")
		return
	}

	driver, err := clickhouse.New(db)
	if err != nil {
		log.Fatal(err)
	}
	q := queen.New(driver)
	defer q.Close()

	// Register migrations
	q.MustAdd(queen.M{
		Version: "001",
		Name:    "create_users",
		UpSQL: `CREATE TABLE users(
				id          UUID DEFAULT generateUUIDv4(),
				name        String
			)
			ENGINE = ReplacingMergeTree()
			ORDER BY (id)`,
		DownSQL: `DROP TABLE users`,
	})

	q.MustAdd(queen.M{
		Version: "002",
		Name:    "add_users_bio",
		UpSQL:   `ALTER TABLE users ADD COLUMN bio String`,
		DownSQL: `ALTER TABLE users DROP COLUMN bio`,
	})

	ctx := context.Background()

	// Apply first migration only
	if err := q.UpSteps(ctx, 1); err != nil {
		log.Fatal(err)
	}

	// Check status
	statuses, err := q.Status(ctx)
	if err != nil {
		log.Fatal(err)
	}

	for _, s := range statuses {
		fmt.Printf("%s: %s (%s)\n", s.Version, s.Name, s.Status)
	}

	// Example output (when ClickHouse is available):
	// 001: create_users (applied)
	// 002: create_posts (pending)
}
Example (WithConfig)

Example_withConfig demonstrates using custom configuration.

package main

import (
	"database/sql"
	"log"
	"time"

	_ "github.com/ClickHouse/clickhouse-go/v2"
	"github.com/honeynil/queen"
	"github.com/honeynil/queen/drivers/clickhouse"
)

func main() {
	db, err := sql.Open("clickhouse", "clickhouse://default:password@localhost:9000/default?")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	driver, err := clickhouse.New(db)
	if err != nil {
		log.Fatal(err)
	}

	// Create Queen with custom config
	config := &queen.Config{
		TableName:   "custom_migrations",
		LockTimeout: 10 * time.Minute, // 10 minutes
	}
	q := queen.NewWithConfig(driver, config)
	defer q.Close()

	// Your migrations here
}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Driver

type Driver struct {
	base.Driver
	// contains filtered or unexported fields
}

Driver implements the queen.Driver interface for ClickHouse

func New

func New(db *sql.DB) (*Driver, error)

New creates a new ClickHouse driver.

The database connection should already be open and configured. The default migrations table name is "queen_migrations".

Example:

db, err := sql.Open("clickhouse", DSN)
if err != nil {
    log.Fatal(err)
}
driver, err := clickhouse.New(db)
if err != nil {
    log.Fatal(err)
}

func NewWithTableName

func NewWithTableName(db *sql.DB, tableName string) (*Driver, error)

NewWithTableName creates a new ClickHouse driver with a custom table name.

Use this when you need to manage multiple independent sets of migrations in the same database, or when you want to customize the table name for organizational purposes.

Example:

driver, err := clickhouse.NewWithTableName(db, "my_custom_migrations")
if err != nil {
    log.Fatal(err)
}

func (*Driver) Init

func (d *Driver) Init(ctx context.Context) error

Init creates the migrations tracking table and lock table if they don't exist.

The migrations table schema:

  • version: String - unique migration version
  • name: LowCardinality(String) - human-readable migration name
  • applied_at: DateTime64(3) DEFAULT now64(3) - when the migration was applied
  • checksum: String DEFAULT ” - hash of migration content for validation

The lock table schema:

  • lock_key: LowCardinality(String) - lock identifier
  • acquired_at: DateTime64(3) - when the lock was acquired
  • expires_at: DateTime64(3) - when the lock expires
  • TTL: expires_at + 10 SECOND - automatically removes expired locks

The TTL (Time To Live) on the lock table provides automatic cleanup of expired locks as a safety mechanism. This prevents abandoned locks from blocking migrations indefinitely if a process crashes without releasing the lock.

This method is idempotent and safe to call multiple times.

func (*Driver) Lock

func (d *Driver) Lock(ctx context.Context, timeout time.Duration) error

Lock acquires a distributed lock to prevent concurrent migrations.

ClickHouse doesn't have advisory locks like PostgreSQL. Instead, this driver uses a lock table with expiration times to implement distributed locking across multiple processes/containers.

The lock mechanism: 1. Cleans up expired locks using ALTER TABLE DELETE (async in ClickHouse) 2. Checks if an active lock exists using SELECT with FINAL 3. If no lock exists, attempts INSERT 4. Retries with exponential backoff until timeout or lock is acquired

IMPORTANT: Uses FINAL modifier with ReplacingMergeTree to ensure we see deduplicated data, not intermediate merge states. This is critical because ClickHouse operations are asynchronous by nature.

Exponential backoff starts at 50ms and doubles up to 1s maximum to reduce database load during lock contention.

If the lock cannot be acquired within the timeout, returns queen.ErrLockTimeout.

func (*Driver) Unlock

func (d *Driver) Unlock(ctx context.Context) error

Unlock releases the migration lock.

This removes the lock record from the lock table, allowing other processes to acquire the lock.

The unlock operation checks the owner_id to ensure only the process that acquired the lock can release it. This prevents race conditions where an expired lock is released by the wrong process.

This method is graceful: it returns nil if the lock doesn't exist, was already released, or belongs to another process. This prevents errors during cleanup when locks expire via TTL or in error recovery scenarios.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL