distrlock

package
v0.4.0
Published: Feb 18, 2025 License: MIT Imports: 9 Imported by: 0

README

distrlock

distrlock is a Go package that implements distributed locking using SQL databases. It allows multiple processes or services to coordinate access to shared resources by acquiring and releasing locks stored in a database.

Features
  • Distributed lock management using SQL databases (PostgreSQL and MySQL are currently supported).
  • Support for acquiring, releasing, and extending locks.
  • Configurable lock expiration times.

How It Works

distrlock uses a relational database to implement distributed locking. When a process acquires a lock, a record is inserted or updated in a designated table within the database. The lock entry includes a unique key, a token for verification, and an expiration time to handle failures or crashes. Other processes attempting to acquire the same lock must wait until it is released or expires. If required, the lock can be extended before expiration to prevent unintended release.

This approach ensures reliable concurrency control without requiring an external distributed coordination system like Zookeeper or etcd, making it lightweight and easy to integrate into existing systems that already use SQL databases.
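
To make the lifecycle described above concrete, here is a minimal in-memory sketch of the lock-table semantics. It is not the package's implementation: the `lockTable` type, its methods, the two error values, and the rule that a matching token counts as the current holder are all assumptions made for illustration. Time is passed in explicitly so the timeline is deterministic.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical errors for this sketch (the package itself exports
// ErrLockAlreadyAcquired and ErrLockAlreadyReleased).
var (
	errAlreadyAcquired = errors.New("lock already acquired")
	errAlreadyReleased = errors.New("lock already released")
)

// lockRow models one row of the lock table.
type lockRow struct {
	token    string    // identifies the current holder
	expireAt time.Time // from this moment on the lock is free again
}

// lockTable is an in-memory stand-in for the database table, keyed by lock key.
// It is not safe for concurrent use; a real database provides the atomicity.
type lockTable map[string]lockRow

// acquire stores a row for key unless another holder's row is still unexpired.
func (t lockTable) acquire(key, token string, ttl time.Duration, now time.Time) error {
	if row, ok := t[key]; ok && row.token != token && now.Before(row.expireAt) {
		return errAlreadyAcquired
	}
	t[key] = lockRow{token: token, expireAt: now.Add(ttl)}
	return nil
}

// extend pushes the expiration forward, but only for the current holder.
func (t lockTable) extend(key, token string, ttl time.Duration, now time.Time) error {
	row, ok := t[key]
	if !ok || row.token != token || !now.Before(row.expireAt) {
		return errAlreadyReleased
	}
	t[key] = lockRow{token: token, expireAt: now.Add(ttl)}
	return nil
}

// release frees the lock, but only if the caller still holds it.
func (t lockTable) release(key, token string, now time.Time) error {
	row, ok := t[key]
	if !ok || row.token != token || !now.Before(row.expireAt) {
		return errAlreadyReleased
	}
	delete(t, key)
	return nil
}

// scenario replays a small timeline and reports the outcome of each step.
func scenario() []error {
	const ttl = 10 * time.Second
	start := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
	at := func(sec int) time.Time { return start.Add(time.Duration(sec) * time.Second) }

	t := lockTable{}
	return []error{
		t.acquire("job", "A", ttl, at(0)),  // free: A gets the lock
		t.acquire("job", "B", ttl, at(5)),  // held by A: B is rejected
		t.extend("job", "A", ttl, at(8)),   // A extends until second 18
		t.acquire("job", "B", ttl, at(15)), // still held thanks to the extension
		t.acquire("job", "B", ttl, at(20)), // expired: B takes over
		t.release("job", "A", at(21)),      // A no longer holds the lock
	}
}

func main() {
	for i, err := range scenario() {
		fmt.Printf("step %d: %v\n", i+1, err)
	}
}
```

The token check is what keeps a slow process from releasing or extending a lock that has already expired and been taken over by someone else.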

Usage

distrlock provides a simple API for acquiring and releasing locks.

The following basic example demonstrates how to use distrlock to ensure exclusive execution of a critical section of code:

package main

import (
	"context"
	"database/sql"
	"log"
	"os"
	"time"

	"github.com/acronis/go-dbkit"
	"github.com/acronis/go-dbkit/distrlock"
	_ "github.com/go-sql-driver/mysql" // Registers the "mysql" driver used by sql.Open below.
)

func main() {
	// Setup database connection
	db, err := sql.Open("mysql", os.Getenv("MYSQL_DSN"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	ctx := context.Background()

	// Create "distributed_locks" table for locks.
	createTableSQL, err := distrlock.CreateTableSQL(dbkit.DialectMySQL)
	if err != nil {
		log.Fatal(err)
	}
	_, err = db.ExecContext(ctx, createTableSQL)
	if err != nil {
		log.Fatal(err)
	}

	// Do some work exclusively.
	const lockKey = "test-lock-key-1" // Unique key that will be used to ensure exclusive execution among multiple instances
	err = distrlock.DoExclusively(ctx, db, dbkit.DialectMySQL, lockKey, func(ctx context.Context) error {
		time.Sleep(10 * time.Second) // Simulate work.
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}

If you need more customization or control over the lock lifecycle, you can use the DBManager and DBLock objects directly:

package main

import (
	"context"
	"database/sql"
	"log"
	"os"
	"time"

	"github.com/acronis/go-dbkit"
	"github.com/acronis/go-dbkit/distrlock"
	_ "github.com/go-sql-driver/mysql" // Registers the "mysql" driver used by sql.Open below.
)

func main() {
	// Setup database connection
	db, err := sql.Open("mysql", os.Getenv("MYSQL_DSN"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Create DBManager
	lockManager, err := distrlock.NewDBManager(dbkit.DialectMySQL,
		distrlock.WithTableName("my_distributed_locks"))
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()

	// Create table for locks.
	_, err = db.ExecContext(ctx, lockManager.CreateTableSQL())
	if err != nil {
		log.Fatal(err)
	}

	const lockKey = "test-lock-key-2" // Unique key that will be used to ensure exclusive execution among multiple instances

	// Create lock.
	lock, err := lockManager.NewLock(ctx, db, lockKey)
	if err != nil {
		log.Fatal(err)
	}

	// Acquire lock, do some work and release lock.
	const lockTTL = 10 * time.Second
	if err = lock.Acquire(ctx, db, lockTTL); err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err = lock.Release(ctx, db); err != nil {
			log.Fatal(err)
		}
	}()

	time.Sleep(5 * time.Second) // Simulate work that finishes before lockTTL elapses
}

License

Copyright © 2024 Acronis International GmbH.

Licensed under MIT License.

Documentation

Overview

Package distrlock contains a DLM (distributed lock manager) implementation. Currently only a manager backed by an SQL database is available (PostgreSQL and MySQL are supported). Other implementations (for example, based on Redis) may be added in the future.

Constants

const DefaultTableName = "distributed_locks"

DefaultTableName is the default name of the table that stores distributed locks.

Variables

var (
	ErrLockAlreadyAcquired = errors.New("distributed lock already acquired")
	ErrLockAlreadyReleased = errors.New("distributed lock already released")
)

Distributed lock errors.
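
A caller that wants to wait for a busy lock can retry on the "already acquired" error. The sketch below shows one way to do that with `errors.Is`. It assumes that a failed acquisition is reported through ErrLockAlreadyAcquired; to stay self-contained it uses a local stand-in error, and `acquireWithRetry` and `simulate` are hypothetical helpers, not part of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errLockAlreadyAcquired stands in for distrlock.ErrLockAlreadyAcquired so
// that this sketch compiles without the package.
var errLockAlreadyAcquired = errors.New("distributed lock already acquired")

// acquireWithRetry calls acquire up to attempts times, sleeping for delay
// between tries while the lock is held by someone else. Any other error
// (or success) is returned immediately.
func acquireWithRetry(
	ctx context.Context,
	attempts int,
	delay time.Duration,
	acquire func(ctx context.Context) error,
) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = acquire(ctx); !errors.Is(err, errLockAlreadyAcquired) {
			return err // nil on success, or an unrelated error
		}
		if i == attempts-1 {
			break // no point in sleeping after the last try
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
	}
	return err
}

// simulate pretends the lock is busy for the first busyFor calls and
// reports how many acquisition attempts were made.
func simulate(busyFor int) (calls int, err error) {
	err = acquireWithRetry(context.Background(), 3, 10*time.Millisecond,
		func(context.Context) error {
			calls++
			if calls <= busyFor {
				return errLockAlreadyAcquired
			}
			return nil
		})
	return calls, err
}

func main() {
	calls, err := simulate(2)
	fmt.Println("calls:", calls, "error:", err)
}
```

With a real lock, the `acquire` argument would be a closure around `lock.Acquire(ctx, db, lockTTL)`, and the comparison would use `distrlock.ErrLockAlreadyAcquired`.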

Functions

func CreateTableSQL added in v0.4.0

func CreateTableSQL(dialect dbkit.Dialect) (string, error)

CreateTableSQL returns SQL query for creating a table that stores distributed locks. DefaultTableName is used for the table name. If you need to use a custom table name, construct DBManager and DBLock manually instead.

func DoExclusively added in v0.4.0

func DoExclusively(
	ctx context.Context,
	dbConn *sql.DB,
	dbDialect dbkit.Dialect,
	key string,
	fn func(ctx context.Context) error,
	options ...DoOption,
) error

DoExclusively acquires distributed lock, calls passed function and releases the lock when the function is finished. It's a ready-to-use helper function that creates a new DBManager, initializes a lock with the given key, and calls DoExclusively on it. DefaultTableName is used for the table name. If you need to use a custom table name, construct DBManager and DBLock manually instead. See DBLock.DoExclusively for more details.

Example
// Setup database connection
db, err := sql.Open("mysql", os.Getenv("MYSQL_DSN"))
if err != nil {
	log.Fatal(err)
}
defer db.Close()

ctx := context.Background()

// Create "distributed_locks" table for locks.
createTableSQL, err := distrlock.CreateTableSQL(dbkit.DialectMySQL)
if err != nil {
	log.Fatal(err)
}
_, err = db.ExecContext(ctx, createTableSQL)
if err != nil {
	log.Fatal(err)
}

// Do some work exclusively.
const lockKey = "test-lock-key-1" // Unique key that will be used to ensure exclusive execution among multiple instances
err = distrlock.DoExclusively(ctx, db, dbkit.DialectMySQL, lockKey, func(ctx context.Context) error {
	time.Sleep(10 * time.Second) // Simulate work.
	return nil
})
if err != nil {
	log.Fatal(err)
}

func DropTableSQL added in v0.4.0

func DropTableSQL(dialect dbkit.Dialect) (string, error)

DropTableSQL returns SQL query for dropping a table that stores distributed locks. DefaultTableName is used for the table name. If you need to use a custom table name, construct DBManager and DBLock manually instead.

Types

type DBLock

type DBLock struct {
	Key string
	TTL time.Duration
	// contains filtered or unexported fields
}

DBLock represents a lock object in the database.

func (*DBLock) Acquire

func (l *DBLock) Acquire(ctx context.Context, executor SQLExecutor, lockTTL time.Duration) error

Acquire acquires lock for the key in the database.

func (*DBLock) AcquireWithStaticToken

func (l *DBLock) AcquireWithStaticToken(ctx context.Context, executor SQLExecutor, token string, lockTTL time.Duration) error

AcquireWithStaticToken acquires lock for the key in the database with a static token.

There are two use cases for this method:

  1. When you need to repeatedly acquire the same lock while preventing other processes from acquiring it at the same time. For example, you can block an old version of workers before an upgrade and then start a new version of them.
  2. When you need several processes to acquire the same lock.

Please use Acquire instead of this method unless you have a good reason to use it.

func (*DBLock) DoExclusively

func (l *DBLock) DoExclusively(
	ctx context.Context,
	dbConn *sql.DB,
	fn func(ctx context.Context) error,
	options ...DoOption,
) error

DoExclusively acquires distributed lock, calls passed function and releases the lock when the function is finished. Lock is acquired with a default TTL of 1 minute. TTL can be configured with WithLockTTL option. Additionally, the lock is extended periodically within a separate goroutine. Extension interval can be configured with WithPeriodicExtendInterval option. By default, it's half of the lock TTL. When the function is finished, acquired lock is released. Timeout for lock release can be configured with WithReleaseTimeout option. By default, it's 5 seconds.
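
The periodic extension described above can be pictured as a ticker-driven goroutine that runs alongside the protected function. The following is a sketch of that general pattern using only the standard library, not the package's actual code: `runWithPeriodicExtend`, `waitOrCancel`, and the demo functions are hypothetical names, and canceling the function's context when an extension fails is a design choice of this sketch rather than documented behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// runWithPeriodicExtend calls extend every interval in a background
// goroutine while fn runs, and stops the goroutine when fn returns.
func runWithPeriodicExtend(
	ctx context.Context,
	interval time.Duration,
	extend func(ctx context.Context) error,
	fn func(ctx context.Context) error,
) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if err := extend(ctx); err != nil {
					cancel() // The lock is lost: ask fn to stop.
					return
				}
			}
		}
	}()

	err := fn(ctx)
	cancel()
	<-done // Wait for the extender to exit before the caller releases the lock.
	return err
}

// waitOrCancel simulates work that takes d but stops early when ctx is canceled.
func waitOrCancel(d time.Duration) func(context.Context) error {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// demoHealthy counts extensions while a 150ms job runs with a 20ms interval.
func demoHealthy() (int64, error) {
	var extensions atomic.Int64
	err := runWithPeriodicExtend(context.Background(), 20*time.Millisecond,
		func(context.Context) error { extensions.Add(1); return nil },
		waitOrCancel(150*time.Millisecond))
	return extensions.Load(), err
}

// demoLostLock shows the job being canceled when an extension fails.
func demoLostLock() error {
	return runWithPeriodicExtend(context.Background(), 20*time.Millisecond,
		func(context.Context) error { return errors.New("lock lost") },
		waitOrCancel(5*time.Second))
}

func main() {
	n, err := demoHealthy()
	fmt.Println("extensions:", n, "error:", err)
	fmt.Println("lost lock:", demoLostLock())
}
```

Keeping the extension interval well below the TTL, as the default of half the TTL does, leaves room for one missed or slow extension before the lock expires.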

func (*DBLock) Extend

func (l *DBLock) Extend(ctx context.Context, executor SQLExecutor) error

Extend resets expiration timeout for already acquired lock. ErrLockAlreadyReleased error will be returned if lock is already released, in this case lock should be acquired again.

func (*DBLock) Release

func (l *DBLock) Release(ctx context.Context, executor SQLExecutor) error

Release releases lock for the key in the database.

func (*DBLock) Token

func (l *DBLock) Token() string

Token returns token of the last acquired lock. May be used in logs to make the investigation process easier.

type DBManager

type DBManager struct {
	// contains filtered or unexported fields
}

DBManager provides management functionality for distributed locks based on the SQL database.

func NewDBManager

func NewDBManager(dialect dbkit.Dialect, options ...DBManagerOption) (*DBManager, error)

NewDBManager creates a new distributed lock manager that uses SQL database as a backend.

Example
// Setup database connection
db, err := sql.Open("mysql", os.Getenv("MYSQL_DSN"))
if err != nil {
	log.Fatal(err)
}
defer db.Close()

// Create DBManager
lockManager, err := distrlock.NewDBManager(dbkit.DialectMySQL,
	distrlock.WithTableName("my_distributed_locks"))
if err != nil {
	log.Fatal(err)
}

ctx := context.Background()

// Create table for locks.
_, err = db.ExecContext(ctx, lockManager.CreateTableSQL())
if err != nil {
	log.Fatal(err)
}

const lockKey = "test-lock-key-2" // Unique key that will be used to ensure exclusive execution among multiple instances

// Create lock.
lock, err := lockManager.NewLock(ctx, db, lockKey)
if err != nil {
	log.Fatal(err)
}

// Acquire lock, do some work and release lock.
const lockTTL = 10 * time.Second
if err = lock.Acquire(ctx, db, lockTTL); err != nil {
	log.Fatal(err)
}
defer func() {
	if err = lock.Release(ctx, db); err != nil {
		log.Fatal(err)
	}
}()

time.Sleep(5 * time.Second) // Simulate work that finishes before lockTTL elapses

func (*DBManager) CreateTableSQL added in v0.4.0

func (m *DBManager) CreateTableSQL() string

CreateTableSQL returns SQL query for creating a table that stores distributed locks.

func (*DBManager) DropTableSQL added in v0.4.0

func (m *DBManager) DropTableSQL() string

DropTableSQL returns SQL query for dropping a table that stores distributed locks.

func (*DBManager) Migrations

func (m *DBManager) Migrations() []migrate.Migration

Migrations returns set of migrations that must be applied before creating new locks.

func (*DBManager) NewLock

func (m *DBManager) NewLock(ctx context.Context, executor SQLExecutor, key string) (DBLock, error)

NewLock creates new initialized (but not acquired) distributed lock.

type DBManagerOption added in v0.4.0

type DBManagerOption func(*dbManagerOptions)

DBManagerOption is an option for NewDBManager.

func WithTableName added in v0.4.0

func WithTableName(tableName string) DBManagerOption

WithTableName sets a custom table name for the table that stores distributed locks.

type DoOption added in v0.4.0

type DoOption func(*doOptions)

DoOption is an option for DoExclusively method.

func WithLockTTL added in v0.4.0

func WithLockTTL(ttl time.Duration) DoOption

WithLockTTL sets TTL for the lock acquired by DoExclusively.

func WithLogger added in v0.4.0

func WithLogger(logger Logger) DoOption

WithLogger sets logger for DoExclusively.

func WithPeriodicExtendInterval added in v0.4.0

func WithPeriodicExtendInterval(interval time.Duration) DoOption

WithPeriodicExtendInterval sets interval for periodic lock extension.

func WithReleaseTimeout added in v0.4.0

func WithReleaseTimeout(timeout time.Duration) DoOption

WithReleaseTimeout sets timeout for lock release.

type Logger added in v0.4.0

type Logger interface {
	Errorf(format string, args ...interface{})
}

Logger is an interface for logging errors.
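
Because the interface has a single method, most loggers can be adapted in a few lines. As an illustration, the sketch below wraps the standard library's `*log.Logger`. The `Logger` interface is copied locally so the example compiles on its own; `stdLogger`, `render`, and the "ERROR " prefix are hypothetical choices of this sketch.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger mirrors the interface shown above so that this sketch is
// self-contained.
type Logger interface {
	Errorf(format string, args ...interface{})
}

// stdLogger adapts *log.Logger, which has Printf but no Errorf, to Logger.
type stdLogger struct{ l *log.Logger }

// Errorf forwards the message to the wrapped logger with an "ERROR " prefix.
func (s stdLogger) Errorf(format string, args ...interface{}) {
	s.l.Printf("ERROR "+format, args...)
}

// Compile-time check that the adapter satisfies the interface.
var _ Logger = stdLogger{}

// render logs one message into a buffer and returns what was written.
func render(format string, args ...interface{}) string {
	var buf bytes.Buffer
	var logger Logger = stdLogger{l: log.New(&buf, "", 0)}
	logger.Errorf(format, args...)
	return buf.String()
}

func main() {
	fmt.Print(render("failed to extend lock %q: %v", "test-lock-key-1", "connection reset"))
}
```

An adapter like this could then be passed to DoExclusively through the WithLogger option.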

type SQLExecutor added in v0.4.0

type SQLExecutor interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
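
The interface is deliberately small, so the standard `database/sql` handle types already satisfy it at the type level. The sketch below checks this at compile time against a local copy of the interface and adds a tiny test double; `recordingExecutor` and `demo` are hypothetical names introduced for illustration.

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"fmt"
)

// SQLExecutor mirrors the interface shown above so that this sketch is
// self-contained.
type SQLExecutor interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

// Compile-time checks: all three standard handle types provide ExecContext.
var (
	_ SQLExecutor = (*sql.DB)(nil)
	_ SQLExecutor = (*sql.Tx)(nil)
	_ SQLExecutor = (*sql.Conn)(nil)
)

// recordingExecutor is a test double that records queries instead of
// sending them to a database.
type recordingExecutor struct {
	queries []string
}

// ExecContext records the query and pretends that one row was affected.
func (r *recordingExecutor) ExecContext(_ context.Context, query string, _ ...interface{}) (sql.Result, error) {
	r.queries = append(r.queries, query)
	return driver.RowsAffected(1), nil
}

// demo runs one statement through the double and reports what it saw.
func demo() (queries int, affected int64, err error) {
	rec := &recordingExecutor{}
	var exec SQLExecutor = rec
	res, err := exec.ExecContext(context.Background(), "SELECT 1")
	if err != nil {
		return 0, 0, err
	}
	affected, err = res.RowsAffected()
	return len(rec.queries), affected, err
}

func main() {
	q, n, err := demo()
	fmt.Println("queries:", q, "affected:", n, "error:", err)
}
```

A double like `recordingExecutor` can be handy in unit tests that only need to observe which statements are issued.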
