pglock

v3.2.0
Published: Feb 9, 2026 License: MIT Imports: 2 Imported by: 1

README

🔒 go-pglock

🐘 Distributed locks using PostgreSQL session level advisory locks.

📖 Overview

go-pglock provides a simple and reliable way to implement distributed locks using PostgreSQL's advisory lock mechanism. This is useful when you need to coordinate access to shared resources across multiple processes or servers.

✨ Key Features

  • 🎯 Simple API: Easy-to-use interface for acquiring and releasing locks
  • ⚡ Non-blocking locks: Try to acquire a lock without waiting
  • ⏳ Blocking locks: Wait until a lock becomes available
  • 📚 Read-write locks: Support for shared (read) and exclusive (write) locks
  • ⏱️ Context support: Timeout and cancellation support for all operations
  • 📦 Lock stacking: Same session can acquire the same lock multiple times
  • 🧹 Automatic cleanup: Locks are automatically released when connections close
  • 🔌 No external dependencies: Uses only PostgreSQL (no Redis, ZooKeeper, etc.)
  • 💪 Battle-tested: Used in production environments

🎯 When to Use

Use go-pglock when you need to:

  • 🔄 Prevent duplicate execution of scheduled jobs across multiple servers
  • 🤝 Coordinate access to shared resources
  • 👑 Implement leader election
  • ✅ Ensure only one instance processes a particular task
  • 🔐 Serialize access to critical sections in distributed systems
  • 🎰 Manage resource pools across multiple processes

📦 Installation

go get github.com/allisson/go-pglock/v3

Requirements:

  • 🔵 Go 1.17 or higher
  • 🐘 PostgreSQL 9.6 or higher

🚀 Quick Start

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"

    "github.com/allisson/go-pglock/v3"
    _ "github.com/lib/pq"
)

func main() {
    // Connect to PostgreSQL
    db, err := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx := context.Background()

    // Create a lock with ID 1
    lock, err := pglock.NewLock(ctx, 1, db)
    if err != nil {
        log.Fatal(err)
    }
    defer lock.Close()

    // Try to acquire the lock
    acquired, err := lock.Lock(ctx)
    if err != nil {
        log.Fatal(err)
    }

    if acquired {
        fmt.Println("Lock acquired! Doing work...")
        // Do your work here
        
        // Release the lock
        if err := lock.Unlock(ctx); err != nil {
            log.Fatal(err)
        }
        fmt.Println("Lock released!")
    } else {
        fmt.Println("Could not acquire lock - another process has it")
    }
}

⚙️ How It Works

PostgreSQL advisory locks are a powerful feature for implementing distributed locking:

  • 🔗 Session-level locks: Locks are held until explicitly released or the database connection closes
  • 🏷️ Application-defined: You define the meaning of each lock using a numeric identifier (int64)
  • ⚡ Fast and efficient: No table bloat, faster than row-level locks
  • 🧹 Automatic cleanup: Server automatically releases locks when sessions end
  • 📚 Lock stacking: A session can acquire the same lock multiple times (requires equal unlocks)

From the PostgreSQL documentation:

PostgreSQL provides a means for creating locks that have application-defined meanings. These are called advisory locks, because the system does not enforce their use; it is up to the application to use them correctly. Advisory locks can be useful for locking strategies that are an awkward fit for the MVCC model.
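
To make the session-level behaviour concrete, here is a minimal sketch of the SQL calls involved, using only database/sql. The helper names tryLock and unlock are hypothetical and this is not go-pglock's source; the package documentation only confirms that Lock is built on pg_try_advisory_lock, so the pairing with pg_advisory_unlock is an assumption based on PostgreSQL's documented advisory lock functions.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// SQL for two of PostgreSQL's built-in advisory lock functions (single bigint key).
const (
	tryLockSQL = "SELECT pg_try_advisory_lock($1)"
	unlockSQL  = "SELECT pg_advisory_unlock($1)"
)

// tryLock (hypothetical helper) attempts a non-blocking exclusive lock.
// conn must be one pinned connection obtained from db.Conn, because the
// lock belongs to the PostgreSQL session behind that connection.
func tryLock(ctx context.Context, conn *sql.Conn, id int64) (bool, error) {
	var acquired bool
	err := conn.QueryRowContext(ctx, tryLockSQL, id).Scan(&acquired)
	return acquired, err
}

// unlock (hypothetical helper) releases one level of the lock.
// PostgreSQL reports false if this session did not hold it.
func unlock(ctx context.Context, conn *sql.Conn, id int64) (bool, error) {
	var released bool
	err := conn.QueryRowContext(ctx, unlockSQL, id).Scan(&released)
	return released, err
}

func main() {
	// No database is contacted here; the statements are only printed.
	fmt.Println(tryLockSQL)
	fmt.Println(unlockSQL)
}
```

Running both statements on the same *sql.Conn matters: if they went through the pool, the unlock could land on a different session than the one holding the lock.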

📚 API Reference

Types
Locker Interface
type Locker interface {
    Lock(ctx context.Context) (bool, error)
    RLock(ctx context.Context) (bool, error)
    WaitAndLock(ctx context.Context) error
    WaitAndRLock(ctx context.Context) error
    Unlock(ctx context.Context) error
    RUnlock(ctx context.Context) error
    Close() error
}
Lock Struct
type Lock struct {
    // contains filtered or unexported fields
}
Functions
NewLock(ctx context.Context, id int64, db *sql.DB) (Lock, error)

Creates a new Lock instance with a dedicated database connection.

  • ctx: Context for managing the connection acquisition
  • id: The lock identifier (int64)
  • db: Database connection pool
  • Returns: Lock instance and error
Lock(ctx context.Context) (bool, error)

Attempts to acquire an exclusive lock without waiting. Returns immediately with true if acquired, false otherwise.

RLock(ctx context.Context) (bool, error)

Attempts to acquire a shared (read) lock without waiting. Multiple sessions can hold shared locks simultaneously, but shared locks conflict with exclusive locks. Returns true if acquired, false otherwise.

WaitAndLock(ctx context.Context) error

Blocks until an exclusive lock is acquired. Respects context cancellation and timeouts.

WaitAndRLock(ctx context.Context) error

Blocks until a shared (read) lock is acquired. Multiple sessions can acquire shared locks concurrently. Respects context cancellation and timeouts.

Unlock(ctx context.Context) error

Releases one level of exclusive lock ownership. Call it once for each successful Lock or WaitAndLock call.

RUnlock(ctx context.Context) error

Releases one level of shared lock ownership. Call it once for each successful RLock or WaitAndRLock call.

Close() error

Closes the database connection and releases all locks (both exclusive and shared).

🔐 Lock Types

🔒 Exclusive Locks (Write Locks)

Exclusive locks are mutually exclusive with all other locks (both exclusive and shared):

  • ✅ Only one session can hold an exclusive lock at a time
  • ❌ No other locks (exclusive or shared) can be acquired while an exclusive lock is held
  • ✏️ Use for write operations or when you need exclusive access to a resource
  • 🎯 Acquired with Lock() or WaitAndLock()
  • 🔓 Released with Unlock()

📖 Shared Locks (Read Locks)

Shared locks allow multiple concurrent readers but prevent writers:

  • 👥 Multiple sessions can hold shared locks simultaneously
  • ⚠️ Shared locks conflict with exclusive locks (writers)
  • 🚀 Perfect for read-heavy workloads where multiple readers can safely access a resource
  • 📚 Use when you need to read data but prevent writes during the read
  • 🎯 Acquired with RLock() or WaitAndRLock()
  • 🔓 Released with RUnlock()
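
Lock Compatibility Matrix

Current Lock | Lock()                   | RLock()
None         | ✅ Succeeds              | ✅ Succeeds
Exclusive    | ❌ Fails (returns false) | ❌ Fails (returns false)
Shared       | ❌ Fails (returns false) | ✅ Succeeds

"Current Lock" is the lock held by another session. The non-blocking Lock() and RLock() return false on a conflict; WaitAndLock() and WaitAndRLock() block instead until the conflicting lock is released.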
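
The matrix can also be written as a small pure function, which is handy for reasoning about or unit-testing code that depends on these rules. This is an illustrative sketch; the Mode type and the compatible function are hypothetical names and are not part of go-pglock.

```go
package main

import "fmt"

// Mode is the kind of advisory lock a session holds or requests.
// (Hypothetical type for illustration only.)
type Mode int

const (
	None Mode = iota
	Shared
	Exclusive
)

// compatible reports whether a request for mode requested (Shared or
// Exclusive) can be granted while another session holds mode held on
// the same lock ID.
func compatible(held, requested Mode) bool {
	switch held {
	case None:
		return true
	case Shared:
		return requested == Shared
	default: // Exclusive conflicts with every request
		return false
	}
}

func main() {
	fmt.Println(compatible(None, Exclusive))   // true
	fmt.Println(compatible(Shared, Shared))    // true
	fmt.Println(compatible(Shared, Exclusive)) // false
	fmt.Println(compatible(Exclusive, Shared)) // false
}
```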

💡 Examples

Basic Lock Usage
package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"

    "github.com/allisson/go-pglock/v3"
    _ "github.com/lib/pq"
)

func main() {
    db, err := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx := context.Background()
    lockID := int64(100)

    // Create lock
    lock, err := pglock.NewLock(ctx, lockID, db)
    if err != nil {
        log.Fatal(err)
    }
    defer lock.Close()

    // Acquire lock
    acquired, err := lock.Lock(ctx)
    if err != nil {
        log.Fatal(err)
    }

    if !acquired {
        fmt.Println("Lock is held by another process")
        return
    }

    // Critical section
    fmt.Println("Executing critical section...")
    // Your code here

    // Release lock
    if err := lock.Unlock(ctx); err != nil {
        log.Fatal(err)
    }
}
Try Lock (Non-blocking)

Perfect for scenarios where you want to skip work if another process is already doing it.

func processDataIfAvailable(db *sql.DB) error {
    ctx := context.Background()
    lockID := int64(200)

    lock, err := pglock.NewLock(ctx, lockID, db)
    if err != nil {
        return err
    }
    defer lock.Close()

    // Try to acquire without waiting
    acquired, err := lock.Lock(ctx)
    if err != nil {
        return err
    }

    if !acquired {
        fmt.Println("Another process is already processing data, skipping...")
        return nil
    }
    defer lock.Unlock(ctx)

    // Process data
    fmt.Println("Processing data...")
    // Your processing logic here
    
    return nil
}
Wait and Lock (Blocking)

Use when you must execute the task eventually, even if you have to wait.

func processDataAndWait(db *sql.DB) error {
    ctx := context.Background()
    lockID := int64(300)

    lock, err := pglock.NewLock(ctx, lockID, db)
    if err != nil {
        return err
    }
    defer lock.Close()

    fmt.Println("Waiting for lock...")
    
    // Wait until lock is available
    if err := lock.WaitAndLock(ctx); err != nil {
        return err
    }
    defer lock.Unlock(ctx)

    fmt.Println("Lock acquired, processing data...")
    // Your processing logic here
    
    return nil
}
Lock with Timeout

Implement a timeout to avoid waiting indefinitely.

func processWithTimeout(db *sql.DB, timeout time.Duration) error {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    lockID := int64(400)

    lock, err := pglock.NewLock(ctx, lockID, db)
    if err != nil {
        return err
    }
    defer lock.Close()

    // This will fail if lock is not acquired within timeout
    if err := lock.WaitAndLock(ctx); err != nil {
        if errors.Is(err, context.DeadlineExceeded) {
            return fmt.Errorf("could not acquire lock within %v", timeout)
        }
        return err
    }
    defer lock.Unlock(context.Background()) // Use background for cleanup

    fmt.Println("Lock acquired, processing...")
    // Your processing logic here
    
    return nil
}
Concurrent Workers

Coordinate multiple workers accessing a shared resource.

func runWorker(workerID int, db *sql.DB, wg *sync.WaitGroup) {
    defer wg.Done()

    ctx := context.Background()
    lockID := int64(500) // Same lock ID for all workers

    lock, err := pglock.NewLock(ctx, lockID, db)
    if err != nil {
        log.Printf("Worker %d: failed to create lock: %v", workerID, err)
        return
    }
    defer lock.Close()

    fmt.Printf("Worker %d: waiting for lock...\n", workerID)
    
    if err := lock.WaitAndLock(ctx); err != nil {
        log.Printf("Worker %d: failed to acquire lock: %v", workerID, err)
        return
    }
    
    fmt.Printf("Worker %d: acquired lock, processing...\n", workerID)
    
    // Simulate work
    time.Sleep(1 * time.Second)
    
    fmt.Printf("Worker %d: releasing lock\n", workerID)
    lock.Unlock(ctx)
}

func main() {
    db, _ := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
    defer db.Close()

    var wg sync.WaitGroup
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go runWorker(i, db, &wg)
    }
    wg.Wait()
}
Distributed Task Execution

Ensure a task runs only once across multiple servers.

type TaskProcessor struct {
    db *sql.DB
}

func (tp *TaskProcessor) ProcessTask(taskID string) error {
    ctx := context.Background()
    
    // Use hash of task ID as lock ID
    lockID := hashToInt64(taskID)

    lock, err := pglock.NewLock(ctx, lockID, tp.db)
    if err != nil {
        return err
    }
    defer lock.Close()

    // Try to acquire lock
    acquired, err := lock.Lock(ctx)
    if err != nil {
        return err
    }

    if !acquired {
        return fmt.Errorf("task %s is already being processed", taskID)
    }
    defer lock.Unlock(ctx)

    fmt.Printf("Processing task %s...\n", taskID)
    
    // Execute task
    if err := tp.executeTask(taskID); err != nil {
        return fmt.Errorf("failed to execute task: %w", err)
    }

    fmt.Printf("Task %s completed\n", taskID)
    return nil
}

func (tp *TaskProcessor) executeTask(taskID string) error {
    // Your task execution logic
    time.Sleep(2 * time.Second)
    return nil
}

func hashToInt64(s string) int64 {
    h := fnv.New64a()
    h.Write([]byte(s))
    return int64(h.Sum64())
}
Leader Election

Implement leader election in a cluster of services.

type LeaderElector struct {
    db       *sql.DB
    lockID   int64
    isLeader bool
    mu       sync.RWMutex
}

func NewLeaderElector(db *sql.DB, clusterName string) *LeaderElector {
    return &LeaderElector{
        db:     db,
        lockID: hashToInt64(clusterName),
    }
}

func (le *LeaderElector) RunElection(ctx context.Context) error {
    lock, err := pglock.NewLock(ctx, le.lockID, le.db)
    if err != nil {
        return err
    }
    defer lock.Close()

    // Try to become leader
    acquired, err := lock.Lock(ctx)
    if err != nil {
        return err
    }

    if acquired {
        le.mu.Lock()
        le.isLeader = true
        le.mu.Unlock()

        fmt.Println("✓ Became leader")
        defer func() {
            le.mu.Lock()
            le.isLeader = false
            le.mu.Unlock()
            lock.Unlock(context.Background())
            fmt.Println("✗ Lost leadership")
        }()

        // Perform leader duties
        le.performLeaderDuties(ctx)
    } else {
        fmt.Println("Another instance is the leader")
    }

    return nil
}

func (le *LeaderElector) IsLeader() bool {
    le.mu.RLock()
    defer le.mu.RUnlock()
    return le.isLeader
}

func (le *LeaderElector) performLeaderDuties(ctx context.Context) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            fmt.Println("Leader performing periodic task...")
            // Do leader work
        }
    }
}
Resource Pool Management

Manage a limited pool of resources across multiple processes.

type ResourcePool struct {
    db       *sql.DB
    poolSize int
}

func NewResourcePool(db *sql.DB, poolSize int) *ResourcePool {
    return &ResourcePool{
        db:       db,
        poolSize: poolSize,
    }
}

// AcquireResource tries to acquire one resource from the pool
func (rp *ResourcePool) AcquireResource(ctx context.Context) (resourceID int, release func(), err error) {
    // Try each resource slot
    for i := 1; i <= rp.poolSize; i++ {
        lockID := int64(10000 + i) // Base offset + slot number
        
        lock, err := pglock.NewLock(ctx, lockID, rp.db)
        if err != nil {
            continue
        }

        // Try to acquire this slot (non-blocking)
        acquired, err := lock.Lock(ctx)
        if err != nil {
            lock.Close()
            continue
        }

        if acquired {
            // Successfully acquired this resource slot
            release := func() {
                lock.Unlock(context.Background())
                lock.Close()
            }
            return i, release, nil
        }

        lock.Close()
    }

    return 0, nil, fmt.Errorf("no resources available in pool")
}

func main() {
    db, _ := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
    defer db.Close()

    pool := NewResourcePool(db, 3) // Pool of 3 resources

    ctx := context.Background()
    resourceID, release, err := pool.AcquireResource(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer release()

    fmt.Printf("Acquired resource %d\n", resourceID)
    
    // Use the resource
    time.Sleep(2 * time.Second)
    
    fmt.Printf("Releasing resource %d\n", resourceID)
}
Database Migration Lock

Ensure database migrations run only once in multi-instance deployments.

type MigrationRunner struct {
    db *sql.DB
}

func (mr *MigrationRunner) RunMigrations(ctx context.Context) error {
    const migrationLockID = int64(999999)

    lock, err := pglock.NewLock(ctx, migrationLockID, mr.db)
    if err != nil {
        return fmt.Errorf("failed to create migration lock: %w", err)
    }
    defer lock.Close()

    fmt.Println("Attempting to acquire migration lock...")

    // Use a timeout to avoid waiting too long
    lockCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()

    if err := lock.WaitAndLock(lockCtx); err != nil {
        return fmt.Errorf("failed to acquire migration lock: %w", err)
    }
    defer lock.Unlock(context.Background())

    fmt.Println("Migration lock acquired, checking migration status...")

    // Check if migrations are needed
    needsMigration, err := mr.checkMigrationStatus()
    if err != nil {
        return err
    }

    if !needsMigration {
        fmt.Println("Database is up to date")
        return nil
    }

    // Run migrations
    fmt.Println("Running migrations...")
    if err := mr.executeMigrations(); err != nil {
        return fmt.Errorf("migration failed: %w", err)
    }

    fmt.Println("Migrations completed successfully")
    return nil
}

func (mr *MigrationRunner) checkMigrationStatus() (bool, error) {
    // Check if migrations are needed
    // This is application-specific logic
    return true, nil
}

func (mr *MigrationRunner) executeMigrations() error {
    // Execute your migrations
    time.Sleep(2 * time.Second) // Simulate migration work
    return nil
}
Scheduled Job Coordination

Coordinate scheduled jobs across multiple instances to prevent duplicate execution.

type ScheduledJob struct {
    db     *sql.DB
    jobID  string
    lockID int64
}

func NewScheduledJob(db *sql.DB, jobID string) *ScheduledJob {
    return &ScheduledJob{
        db:     db,
        jobID:  jobID,
        lockID: hashToInt64(jobID),
    }
}

func (sj *ScheduledJob) Execute(ctx context.Context) error {
    lock, err := pglock.NewLock(ctx, sj.lockID, sj.db)
    if err != nil {
        return fmt.Errorf("failed to create lock: %w", err)
    }
    defer lock.Close()

    // Try to acquire lock (non-blocking)
    acquired, err := lock.Lock(ctx)
    if err != nil {
        return fmt.Errorf("failed to acquire lock: %w", err)
    }

    if !acquired {
        fmt.Printf("Job %s is already running on another instance\n", sj.jobID)
        return nil
    }
    defer lock.Unlock(ctx)

    fmt.Printf("Executing job %s...\n", sj.jobID)
    
    // Execute the actual job
    if err := sj.run(ctx); err != nil {
        return fmt.Errorf("job execution failed: %w", err)
    }

    fmt.Printf("Job %s completed\n", sj.jobID)
    return nil
}

func (sj *ScheduledJob) run(ctx context.Context) error {
    // Your job logic here
    select {
    case <-time.After(5 * time.Second):
        // Job completed
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    db, _ := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
    defer db.Close()

    // Simulate a cron job running every minute
    ticker := time.NewTicker(1 * time.Minute)
    defer ticker.Stop()

    ctx := context.Background()
    job := NewScheduledJob(db, "cleanup-task")

    // Receive ticks until the ticker is stopped
    for range ticker.C {
        if err := job.Execute(ctx); err != nil {
            log.Printf("Job execution error: %v", err)
        }
    }
}
Read-Write Lock: Multiple Readers, Single Writer

Use shared locks to allow multiple readers while preventing writers.

type DataCache struct {
	db       *sql.DB
	recordID string
	lockID   int64
}

func NewDataCache(db *sql.DB, recordID string) *DataCache {
	return &DataCache{
		db:       db,
		recordID: recordID,
		lockID:   hashToInt64("cache-" + recordID),
	}
}

// ReadData acquires a shared lock for reading
func (dc *DataCache) ReadData(ctx context.Context) (string, error) {
	lock, err := pglock.NewLock(ctx, dc.lockID, dc.db)
	if err != nil {
		return "", err
	}
	defer lock.Close()

	// Acquire shared lock - multiple readers can hold this simultaneously
	if err := lock.WaitAndRLock(ctx); err != nil {
		return "", fmt.Errorf("failed to acquire read lock: %w", err)
	}
	defer lock.RUnlock(ctx)

	fmt.Println("Reading data... (shared lock held)")
	// Simulate reading from database or cache
	time.Sleep(100 * time.Millisecond)
	data := "cached-data-for-" + dc.recordID

	return data, nil
}

// WriteData acquires an exclusive lock for writing
func (dc *DataCache) WriteData(ctx context.Context, data string) error {
	lock, err := pglock.NewLock(ctx, dc.lockID, dc.db)
	if err != nil {
		return err
	}
	defer lock.Close()

	// Acquire exclusive lock - blocks all other locks (read and write)
	if err := lock.WaitAndLock(ctx); err != nil {
		return fmt.Errorf("failed to acquire write lock: %w", err)
	}
	defer lock.Unlock(ctx)

	fmt.Println("Writing data... (exclusive lock held)")
	// Simulate writing to database and invalidating cache
	time.Sleep(200 * time.Millisecond)

	return nil
}

func main() {
	db, _ := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
	defer db.Close()

	cache := NewDataCache(db, "user-123")
	ctx := context.Background()

	var wg sync.WaitGroup

	// Spawn 5 concurrent readers
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go func(readerID int) {
			defer wg.Done()
			data, err := cache.ReadData(ctx)
			if err != nil {
				log.Printf("Reader %d error: %v", readerID, err)
				return
			}
			fmt.Printf("Reader %d got: %s\n", readerID, data)
		}(i)
	}

	// Spawn 1 writer after a delay
	time.Sleep(50 * time.Millisecond)
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := cache.WriteData(ctx, "new-data"); err != nil {
			log.Printf("Writer error: %v", err)
		}
		fmt.Println("Writer completed")
	}()

	wg.Wait()
}
Read-Write Lock: Configuration Management

Manage application configuration with frequent reads and rare writes.

type ConfigManager struct {
	db     *sql.DB
	lockID int64
}

func NewConfigManager(db *sql.DB) *ConfigManager {
	return &ConfigManager{
		db:     db,
		lockID: hashToInt64("app-config"),
	}
}

// GetConfig reads configuration (uses shared lock)
func (cm *ConfigManager) GetConfig(ctx context.Context) (map[string]string, error) {
	lock, err := pglock.NewLock(ctx, cm.lockID, cm.db)
	if err != nil {
		return nil, err
	}
	defer lock.Close()

	// Use shared lock - allows multiple concurrent readers
	acquired, err := lock.RLock(ctx)
	if err != nil {
		return nil, err
	}

	if !acquired {
		return nil, fmt.Errorf("config is being updated, try again")
	}
	defer lock.RUnlock(ctx)

	// Read config from database
	fmt.Println("Reading configuration...")
	config := map[string]string{
		"db_host":     "localhost",
		"db_port":     "5432",
		"max_workers": "10",
	}

	return config, nil
}

// UpdateConfig writes configuration (uses exclusive lock)
func (cm *ConfigManager) UpdateConfig(ctx context.Context, updates map[string]string) error {
	lock, err := pglock.NewLock(ctx, cm.lockID, cm.db)
	if err != nil {
		return err
	}
	defer lock.Close()

	// Use exclusive lock with timeout - blocks all readers and writers
	lockCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	if err := lock.WaitAndLock(lockCtx); err != nil {
		return fmt.Errorf("failed to acquire exclusive lock for config update: %w", err)
	}
	defer lock.Unlock(context.Background())

	fmt.Println("Updating configuration...")
	// Write config to database
	time.Sleep(100 * time.Millisecond)

	fmt.Printf("Configuration updated with %d keys\n", len(updates))
	return nil
}

func main() {
	db, _ := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
	defer db.Close()

	cm := NewConfigManager(db)
	ctx := context.Background()

	// Multiple services reading config concurrently
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(serviceID int) {
			defer wg.Done()
			config, err := cm.GetConfig(ctx)
			if err != nil {
				log.Printf("Service %d: %v", serviceID, err)
				return
			}
			fmt.Printf("Service %d loaded %d config keys\n", serviceID, len(config))
		}(i)
	}

	// Admin updating config
	wg.Add(1)
	go func() {
		defer wg.Done()
		updates := map[string]string{"max_workers": "20"}
		if err := cm.UpdateConfig(ctx, updates); err != nil {
			log.Printf("Update failed: %v", err)
		}
	}()

	wg.Wait()
}
Read-Write Lock: Report Generation

Allow multiple users to view reports while preventing generation conflicts.

type ReportGenerator struct {
	db     *sql.DB
	lockID int64
}

func NewReportGenerator(db *sql.DB, reportType string) *ReportGenerator {
	return &ReportGenerator{
		db:     db,
		lockID: hashToInt64("report-" + reportType),
	}
}

// ViewReport reads the report (uses shared lock)
func (rg *ReportGenerator) ViewReport(ctx context.Context, userID string) error {
	lock, err := pglock.NewLock(ctx, rg.lockID, rg.db)
	if err != nil {
		return err
	}
	defer lock.Close()

	// Try to acquire shared lock (non-blocking)
	acquired, err := lock.RLock(ctx)
	if err != nil {
		return err
	}

	if !acquired {
		return fmt.Errorf("report is being generated, please wait")
	}
	defer lock.RUnlock(ctx)

	fmt.Printf("User %s viewing report...\n", userID)
	time.Sleep(500 * time.Millisecond) // Simulate viewing

	return nil
}

// GenerateReport creates/updates the report (uses exclusive lock)
func (rg *ReportGenerator) GenerateReport(ctx context.Context) error {
	lock, err := pglock.NewLock(ctx, rg.lockID, rg.db)
	if err != nil {
		return err
	}
	defer lock.Close()

	// Try to acquire exclusive lock (non-blocking)
	acquired, err := lock.Lock(ctx)
	if err != nil {
		return err
	}

	if !acquired {
		return fmt.Errorf("report generation already in progress or being viewed")
	}
	defer lock.Unlock(ctx)

	fmt.Println("Generating report...")
	time.Sleep(2 * time.Second) // Simulate generation
	fmt.Println("Report generation completed")

	return nil
}

✅ Best Practices

1. 🔒 Always Close Locks

Use defer to ensure locks are closed even if errors occur:

lock, err := pglock.NewLock(ctx, lockID, db)
if err != nil {
    return err
}
defer lock.Close() // Always close to release the connection

2. 🔄 Match Lock and Unlock Calls

Locks stack, so ensure you unlock as many times as you lock:

// Acquired twice
lock.Lock(ctx)
lock.Lock(ctx)

// Must unlock twice
lock.Unlock(ctx)
lock.Unlock(ctx)

The same applies to shared locks:

// Acquired twice
lock.RLock(ctx)
lock.RLock(ctx)

// Must unlock twice
lock.RUnlock(ctx)
lock.RUnlock(ctx)
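
The stacking rule can be pictured as a per-session hold counter. The sketch below is an in-memory model only: stackedLock is a hypothetical type, it talks to no database, and it is not how go-pglock is implemented. Returning an error for an unmatched Unlock is a choice of this model; how the library itself reports that case is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// stackedLock models one session's hold count on a single lock ID.
type stackedLock struct {
	depth int
}

// Lock adds one level of ownership.
func (s *stackedLock) Lock() { s.depth++ }

// Unlock releases one level. In this model an unmatched Unlock is an error.
func (s *stackedLock) Unlock() error {
	if s.depth == 0 {
		return errors.New("lock not held")
	}
	s.depth--
	return nil
}

// Held reports whether other sessions would still be locked out.
func (s *stackedLock) Held() bool { return s.depth > 0 }

func main() {
	var l stackedLock
	l.Lock()
	l.Lock()
	_ = l.Unlock()
	fmt.Println(l.Held()) // true: one level is still held
	_ = l.Unlock()
	fmt.Println(l.Held()) // false: fully released
}
```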

3. ✅ Use Correct Lock and Unlock Pairs

Always pair the correct lock and unlock methods:

// Correct pairs
lock.Lock(ctx)   // Use Unlock()
lock.Unlock(ctx)

lock.RLock(ctx)   // Use RUnlock()
lock.RUnlock(ctx)

// Wrong - don't mix them!
// lock.Lock(ctx)
// lock.RUnlock(ctx)  // Wrong!
4. ⏱️ Use Context Timeouts

Prevent indefinite waiting with context timeouts:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := lock.WaitAndLock(ctx); err != nil {
    // Handle timeout
}
5. 🎯 Choose Appropriate Lock IDs
  • πŸ“ Use meaningful, deterministic IDs based on resource names
  • #️⃣ Use hash functions for string-based identifiers
  • πŸ“‹ Document your lock ID allocation strategy
// Good: Deterministic based on resource
lockID := hashToInt64("user-" + userID)

// Avoid: Random or non-deterministic IDs
lockID := rand.Int63() // Bad!
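
One way to keep an ID allocation strategy documented in code is to derive every ID from a namespace plus a resource name. The helper below is a sketch under that assumption: lockIDFor is a hypothetical name, and it reuses the same FNV-1a hashing as the hashToInt64 example above.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// lockIDFor (hypothetical helper) derives a deterministic advisory lock ID
// from a namespace and a resource name using 64-bit FNV-1a.
func lockIDFor(namespace, name string) int64 {
	h := fnv.New64a()
	h.Write([]byte(namespace))
	h.Write([]byte{0}) // separator between the two parts
	h.Write([]byte(name))
	return int64(h.Sum64()) // may be negative; any int64 is a valid key
}

func main() {
	a := lockIDFor("user", "1")
	b := lockIDFor("user", "1")
	c := lockIDFor("user", "2")
	fmt.Println(a == b) // true: same inputs always give the same ID
	fmt.Println(a == c) // false
}
```

Hashing maps an unbounded set of names onto 64 bits, so two unrelated resources can in principle share an ID. That only causes extra contention, not incorrect locking, but it is worth noting wherever the allocation strategy is documented.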
6. ⚠️ Handle Lock Acquisition Failures

Always check if lock acquisition succeeded:

acquired, err := lock.Lock(ctx)
if err != nil {
    // Handle error
}
if !acquired {
    // Handle case where lock is held by another process
}

7. 🔌 Use Connection Pooling Wisely

Each lock holds a dedicated connection. Consider your connection pool size:

// Configure appropriate pool size
db.SetMaxOpenConns(50) // Ensure enough connections for locks + queries
8. 🎯 Choose the Right Lock Type

Use the appropriate lock type for your use case:

  • πŸ”’ Exclusive locks (Lock/Unlock): Use when you need to modify data or require exclusive access
  • πŸ“– Shared locks (RLock/RUnlock): Use for read operations where multiple readers can work concurrently
// Reading data - use shared lock
acquired, _ := lock.RLock(ctx)
defer lock.RUnlock(ctx)
// Multiple readers can read simultaneously

// Writing data - use exclusive lock
acquired, _ := lock.Lock(ctx)
defer lock.Unlock(ctx)
// Only one writer, blocks all readers and writers
9. 🎚️ Consider Lock Granularity
  • πŸ”¬ Fine-grained locks: Better concurrency, more complex
  • 🎯 Coarse-grained locks: Simpler, but may reduce throughput
10. πŸ§ͺ Testing with Locks

When testing code that uses locks, consider using different lock IDs per test:

func TestMyFunction(t *testing.T) {
    lockID := time.Now().UnixNano() // Unique per test run; UnixNano already returns int64
    // ... test code
}

🧪 Testing

Running Tests Locally

The project includes a Docker Compose setup for easy local testing:

# Start PostgreSQL and run tests
make test-local

# Run tests with race detector
make test-race

# Generate coverage report
make test-coverage
Manual Testing
# Start PostgreSQL
docker-compose up -d

# Set DATABASE_URL
export DATABASE_URL='postgres://test:test@localhost:5432/pglock?sslmode=disable'

# Run tests
go test -v ./...

# Clean up
docker-compose down

🔧 Troubleshooting

❌ "pq: database "pglock" does not exist"

Create the database:

CREATE DATABASE pglock;
⚠️ "too many connections"

Increase PostgreSQL's max_connections or reduce your application's connection pool size:

db.SetMaxOpenConns(25) // Reduce if hitting connection limits
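
🔒 Deadlocks

Advisory locks can deadlock if two sessions acquire the same locks in different orders. Always acquire locks in a consistent order:

// Good: every code path takes lockA (ID 1) before lockB (ID 2)
if err := lockA.WaitAndLock(ctx); err != nil {
    return err
}
if err := lockB.WaitAndLock(ctx); err != nil {
    return err
}

// Bad: the order depends on a runtime condition, so one process can hold
// lockA while waiting for lockB and another can hold lockB while waiting for lockA
// if someCondition {
//     lockA, then lockB
// } else {
//     lockB, then lockA
// }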
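
A consistent order is easiest to guarantee when it is computed rather than remembered. The sketch below sorts lock IDs before acquiring them and rolls back on failure. The names waitLocker, lockAll, recorder and demoOrder are hypothetical; waitLocker mirrors two methods of the Locker interface, so a *pglock.Lock should satisfy it, but that pairing is an assumption and the demo uses an in-memory stand-in instead of a database.

```go
package main

import (
	"context"
	"fmt"
	"sort"
)

// waitLocker is the subset of the Locker interface this sketch needs.
type waitLocker interface {
	WaitAndLock(ctx context.Context) error
	Unlock(ctx context.Context) error
}

// lockAll acquires every lock in ascending ID order so that all callers
// agree on the order. On failure it releases what it already holds,
// in reverse order, and returns the error.
func lockAll(ctx context.Context, locks map[int64]waitLocker) error {
	ids := make([]int64, 0, len(locks))
	for id := range locks {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })

	for n, id := range ids {
		if err := locks[id].WaitAndLock(ctx); err != nil {
			for k := n - 1; k >= 0; k-- {
				_ = locks[ids[k]].Unlock(context.Background())
			}
			return fmt.Errorf("acquire lock %d: %w", id, err)
		}
	}
	return nil
}

// recorder is an in-memory stand-in that records acquisition order.
type recorder struct {
	id    int64
	order *[]int64
}

func (r recorder) WaitAndLock(context.Context) error {
	*r.order = append(*r.order, r.id)
	return nil
}

func (r recorder) Unlock(context.Context) error { return nil }

// demoOrder acquires three stand-in locks and returns the order used.
func demoOrder() []int64 {
	var order []int64
	locks := map[int64]waitLocker{
		3: recorder{id: 3, order: &order},
		1: recorder{id: 1, order: &order},
		2: recorder{id: 2, order: &order},
	}
	if err := lockAll(context.Background(), locks); err != nil {
		return nil
	}
	return order
}

func main() {
	fmt.Println(demoOrder()) // [1 2 3]
}
```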

🔓 Lock Not Released

Locks are released when:

  • ✅ Unlock() or RUnlock() has been called once for every successful acquisition
  • ✅ Close() is called
  • ✅ Database connection closes
  • ✅ Database session ends

If locks aren't releasing, check for:

  • ❌ Missing Unlock() calls
  • ⚠️ Connection leaks
  • 💥 Application crashes before cleanup

⏱️ Context Deadline Exceeded

If you see context deadline errors, consider one of the following:

  • ⏫ Increase the timeout
  • 🔍 Investigate why locks are held for so long
  • 🔄 Use non-blocking Lock() instead of WaitAndLock()
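
If you switch to the non-blocking Lock(), a bounded retry loop is one way to keep some tolerance for short contention. The sketch below is an assumption about how such a loop could look, not a feature of go-pglock: tryFunc, lockWithRetry and demo are hypothetical names, and tryFunc simply matches the signature of the Lock method so that lock.Lock can be passed directly.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// tryFunc matches the shape of the non-blocking Lock method.
type tryFunc func(ctx context.Context) (bool, error)

// lockWithRetry calls try up to attempts times, doubling the delay after
// each failed attempt. It stops early on error, success, or when ctx is done.
func lockWithRetry(ctx context.Context, try tryFunc, attempts int, delay time.Duration) (bool, error) {
	for i := 0; i < attempts; i++ {
		acquired, err := try(ctx)
		if err != nil || acquired {
			return acquired, err
		}
		if i == attempts-1 {
			break // no point sleeping after the final attempt
		}
		select {
		case <-ctx.Done():
			return false, ctx.Err()
		case <-time.After(delay):
			delay *= 2
		}
	}
	return false, nil
}

// demo simulates a lock that becomes free on the third attempt.
func demo() (acquired bool, calls int) {
	try := func(context.Context) (bool, error) {
		calls++
		return calls == 3, nil
	}
	acquired, _ = lockWithRetry(context.Background(), try, 5, time.Millisecond)
	return acquired, calls
}

func main() {
	acquired, calls := demo()
	fmt.Println(acquired, calls) // true 3
}
```

With a real lock this would be called as lockWithRetry(ctx, lock.Lock, 5, 100*time.Millisecond), and the caller still needs to handle the case where every attempt returns false.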

🀝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Overview

Package pglock provides distributed locks using PostgreSQL session level advisory locks.

PostgreSQL advisory locks are application-defined locks that can be useful for locking strategies that are an awkward fit for the MVCC model. They are faster than table-based locking mechanisms, avoid table bloat, and are automatically cleaned up by the server at the end of the session.

This package uses session-level advisory locks, which are held until explicitly released or the session ends. Unlike transaction-level locks, session-level advisory locks do not honor transaction semantics: a lock acquired during a transaction that is later rolled back will still be held following the rollback.

Types

type Lock

type Lock struct {
	// contains filtered or unexported fields
}

Lock implements the Locker interface using PostgreSQL advisory locks.

A Lock holds a dedicated database connection and a lock identifier. The connection is obtained from a connection pool and is held for the lifetime of the Lock instance to maintain the session-level advisory lock.

func NewLock

func NewLock(ctx context.Context, id int64, db *sql.DB) (Lock, error)

NewLock creates a new Lock instance with a dedicated database connection.

This function obtains a connection from the provided database connection pool and stores it for use in lock and unlock operations. The connection is held for the lifetime of the Lock instance to maintain session-level advisory locks.

Parameters:

  • ctx: Context for managing the connection acquisition
  • id: The lock identifier (a 64-bit integer used as the PostgreSQL advisory lock key)
  • db: A database connection pool from which to obtain a dedicated connection

The caller is responsible for calling Close on the returned Lock to release the connection back to the pool and clean up any held advisory locks.

Returns a Lock instance, or an error if the connection cannot be obtained.
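
Because id is a bare 64-bit integer, callers that name their locks after resources need some way to turn a name into a key. The following is a minimal sketch of one possible approach, hashing the name with FNV-1a from the standard library. The lockID helper and the resource names are hypothetical and not part of pglock; distinct names can in principle collide, so treat this as an illustration rather than a recommendation.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// lockID is a hypothetical helper (not part of pglock) that hashes a
// resource name into a 64-bit advisory lock key using FNV-1a.
func lockID(name string) int64 {
	h := fnv.New64a()
	h.Write([]byte(name)) // Write on a hash.Hash never returns an error
	// The conversion may yield a negative number; PostgreSQL advisory lock
	// keys are bigint values, so any int64 is acceptable.
	return int64(h.Sum64())
}

func main() {
	// The same name always maps to the same key, so every process that
	// agrees on the name contends for the same lock.
	fmt.Println(lockID("jobs:nightly-report"))
	fmt.Println(lockID("jobs:nightly-report") == lockID("jobs:nightly-report"))
}
```

The resulting value would then be passed as the id argument of NewLock.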

func (*Lock) Close

func (l *Lock) Close() error

Close closes the database connection, releasing all advisory locks held by this Lock.

Since advisory locks are automatically cleaned up when a database session ends, closing the connection will release all locks held on this connection, regardless of how many times they were acquired. This provides a reliable way to ensure all locks are released when the Lock instance is no longer needed.

After calling Close, the Lock instance should not be used for any further operations. Returns an error if closing the connection fails.

func (*Lock) Lock

func (l *Lock) Lock(ctx context.Context) (bool, error)

Lock attempts to obtain an exclusive session level advisory lock without waiting.

This method uses PostgreSQL's pg_try_advisory_lock function, which is non-blocking. It will either obtain the lock immediately and return true, or return false if the lock is already held by another session. This is similar to WaitAndLock, except it will not wait for the lock to become available.

Multiple lock requests stack within the same session, meaning if a resource is locked three times, it must be unlocked three times to be fully released.

Returns true if the lock was successfully acquired, false if it's already held by another session, and an error if the database operation fails.
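
The three possible outcomes of Lock (acquired, held elsewhere, failed) suggest a "run only if free" pattern. The sketch below is written against a small interface that matches the Lock and Unlock signatures documented here, so a *pglock.Lock should fit it. The names tryLocker, runIfFree, fakeServer, and fakeSession are hypothetical; the fake types are an in-memory stand-in for PostgreSQL so the sketch runs without a database.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// tryLocker is the subset of the Locker interface this sketch needs.
type tryLocker interface {
	Lock(ctx context.Context) (bool, error)
	Unlock(ctx context.Context) error
}

// runIfFree runs job only when the lock is acquired immediately.
// It reports whether job ran; a lock held elsewhere is not an error.
func runIfFree(ctx context.Context, l tryLocker, job func() error) (ran bool, err error) {
	ok, err := l.Lock(ctx)
	if err != nil || !ok {
		return false, err
	}
	defer func() {
		// Surface the unlock error only if job itself succeeded.
		if uerr := l.Unlock(ctx); err == nil {
			err = uerr
		}
	}()
	return true, job()
}

// fakeServer models a single advisory lock key on the server side.
type fakeServer struct {
	mu    sync.Mutex
	owner *fakeSession
	depth int
}

// fakeSession models one database session contending for that key.
type fakeSession struct{ srv *fakeServer }

func (s *fakeSession) Lock(context.Context) (bool, error) {
	s.srv.mu.Lock()
	defer s.srv.mu.Unlock()
	if s.srv.owner != nil && s.srv.owner != s {
		return false, nil // held by another session
	}
	s.srv.owner = s
	s.srv.depth++ // requests from the owning session stack
	return true, nil
}

func (s *fakeSession) Unlock(context.Context) error {
	s.srv.mu.Lock()
	defer s.srv.mu.Unlock()
	if s.srv.owner == s {
		if s.srv.depth--; s.srv.depth == 0 {
			s.srv.owner = nil
		}
	}
	return nil
}

// demo has session b try the lock while session a is inside its job.
func demo() (first, second bool) {
	ctx := context.Background()
	srv := &fakeServer{}
	a, b := &fakeSession{srv: srv}, &fakeSession{srv: srv}
	first, _ = runIfFree(ctx, a, func() error {
		second, _ = runIfFree(ctx, b, func() error { return nil })
		return nil
	})
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first, second)
}
```

Returning a boolean rather than an error for the "held elsewhere" case mirrors the Lock signature and keeps skipped work distinguishable from real failures.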

func (*Lock) RLock (added in v3.2.0)

func (l *Lock) RLock(ctx context.Context) (bool, error)

RLock attempts to obtain a shared session level advisory lock without waiting.

This method uses PostgreSQL's pg_try_advisory_lock_shared function, which is non-blocking. It will either obtain the shared lock immediately and return true, or return false if an exclusive lock is already held by another session. Multiple sessions can hold shared locks simultaneously, but shared locks conflict with exclusive locks.

Shared locks are ideal for read operations where multiple readers can safely access a resource concurrently, but writers need to be prevented from modifying it during the read.

Multiple lock requests stack within the same session, meaning if a shared lock is acquired three times, it must be released three times to be fully released.

Returns true if the shared lock was successfully acquired, false if an exclusive lock is held by another session, and an error if the database operation fails.

func (*Lock) RUnlock (added in v3.2.0)

func (l *Lock) RUnlock(ctx context.Context) error

RUnlock releases a previously acquired shared advisory lock.

This method uses PostgreSQL's pg_advisory_unlock_shared function to release one level of shared lock ownership. Because shared lock requests stack within a session, each RUnlock call only decrements the shared lock count by one. If the same shared lock was acquired multiple times, it must be unlocked the same number of times to be fully released.

Note that unlocking a shared lock that is not currently held will not return an error; PostgreSQL's pg_advisory_unlock_shared returns false in that case and the server reports an SQL warning. It's the caller's responsibility to ensure shared locks and unlocks are properly paired, and to use RUnlock only for locks acquired with RLock or WaitAndRLock (not with Lock or WaitAndLock).

Returns an error if the database operation fails.
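
One way to keep shared and exclusive calls paired is to wrap each mode in its own helper, so the acquire and the matching release always sit next to each other. The sketch below assumes only the method signatures documented here; rwLocker, tryRead, tryWrite, and recorder are hypothetical names, and recorder merely logs calls instead of talking to PostgreSQL.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// rwLocker is the subset of the Locker interface used in this sketch.
type rwLocker interface {
	Lock(ctx context.Context) (bool, error)
	RLock(ctx context.Context) (bool, error)
	Unlock(ctx context.Context) error
	RUnlock(ctx context.Context) error
}

// tryRead runs fn under a shared lock and releases it with RUnlock.
func tryRead(ctx context.Context, l rwLocker, fn func()) (bool, error) {
	ok, err := l.RLock(ctx)
	if err != nil || !ok {
		return false, err
	}
	defer l.RUnlock(ctx) // shared acquire pairs with shared release; error ignored for brevity
	fn()
	return true, nil
}

// tryWrite runs fn under an exclusive lock and releases it with Unlock.
func tryWrite(ctx context.Context, l rwLocker, fn func()) (bool, error) {
	ok, err := l.Lock(ctx)
	if err != nil || !ok {
		return false, err
	}
	defer l.Unlock(ctx) // exclusive acquire pairs with exclusive release; error ignored for brevity
	fn()
	return true, nil
}

// recorder is a stand-in that logs each call in order.
type recorder struct{ calls []string }

func (r *recorder) note(s string) { r.calls = append(r.calls, s) }

func (r *recorder) Lock(context.Context) (bool, error)  { r.note("Lock"); return true, nil }
func (r *recorder) RLock(context.Context) (bool, error) { r.note("RLock"); return true, nil }
func (r *recorder) Unlock(context.Context) error        { r.note("Unlock"); return nil }
func (r *recorder) RUnlock(context.Context) error       { r.note("RUnlock"); return nil }

// demo performs one read and one write and returns the call sequence.
func demo() string {
	ctx := context.Background()
	rec := &recorder{}
	tryRead(ctx, rec, func() { rec.note("read") })
	tryWrite(ctx, rec, func() { rec.note("write") })
	return strings.Join(rec.calls, " ")
}

func main() {
	fmt.Println(demo())
}
```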

func (*Lock) Unlock

func (l *Lock) Unlock(ctx context.Context) error

Unlock releases a previously acquired advisory lock.

This method uses PostgreSQL's pg_advisory_unlock function to release one level of lock ownership. Because lock requests stack within a session, each Unlock call only decrements the lock count by one. If the same lock was acquired multiple times, it must be unlocked the same number of times to be fully released.

Note that unlocking a lock that is not currently held will not return an error; PostgreSQL's pg_advisory_unlock returns false in that case and the server reports an SQL warning. It's the caller's responsibility to ensure locks and unlocks are properly paired.

Returns an error if the database operation fails.
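
Because each Unlock releases only one level, code that acquires the same lock from several places may want to track how many acquisitions are outstanding. The following is a rough sketch of such bookkeeping, not something pglock provides: stacked, Acquire, ReleaseAll, and countingFake are hypothetical names, the wrapper is not safe for concurrent use, and countingFake only counts calls in place of a real database session.

```go
package main

import (
	"context"
	"fmt"
)

// exclusiveLocker is the subset of the Locker interface used here.
type exclusiveLocker interface {
	WaitAndLock(ctx context.Context) error
	Unlock(ctx context.Context) error
}

// stacked counts successful acquisitions so they can all be released.
type stacked struct {
	l     exclusiveLocker
	depth int
}

// Acquire takes one more level of the lock and records it.
func (s *stacked) Acquire(ctx context.Context) error {
	if err := s.l.WaitAndLock(ctx); err != nil {
		return err
	}
	s.depth++
	return nil
}

// ReleaseAll calls Unlock once per outstanding acquisition.
func (s *stacked) ReleaseAll(ctx context.Context) error {
	for s.depth > 0 {
		if err := s.l.Unlock(ctx); err != nil {
			return err
		}
		s.depth--
	}
	return nil
}

// countingFake stands in for a real lock and counts the calls it sees.
type countingFake struct{ locks, unlocks int }

func (c *countingFake) WaitAndLock(context.Context) error { c.locks++; return nil }
func (c *countingFake) Unlock(context.Context) error      { c.unlocks++; return nil }

// demo acquires three levels and then releases all of them.
func demo() (locks, unlocks int) {
	ctx := context.Background()
	fake := &countingFake{}
	s := &stacked{l: fake}
	for i := 0; i < 3; i++ {
		s.Acquire(ctx)
	}
	s.ReleaseAll(ctx)
	return fake.locks, fake.unlocks
}

func main() {
	locks, unlocks := demo()
	fmt.Println(locks, unlocks)
}
```

When the Lock instance is no longer needed at all, calling Close is the simpler option, since closing the connection releases every level at once.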

func (*Lock) WaitAndLock

func (l *Lock) WaitAndLock(ctx context.Context) error

WaitAndLock obtains an exclusive session level advisory lock, waiting if necessary.

This method uses PostgreSQL's pg_advisory_lock function, which will block until the lock becomes available. If another session already holds a lock on the same resource identifier, this function will wait until the resource becomes available.

Multiple lock requests stack within the same session, meaning if a resource is locked three times, it must be unlocked three times to be fully released. If the session already holds the given advisory lock, additional requests will always succeed immediately.

The lock persists until explicitly released via Unlock or until the session ends. Returns an error if the database operation fails or if the context is cancelled while waiting for the lock.
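
Since WaitAndLock honors context cancellation, a bounded wait can be sketched with context.WithTimeout. The example below assumes only the WaitAndLock signature shown here; waitLocker, lockWithin, and chanLock are hypothetical names. chanLock is a channel-based stand-in that does not model same-session stacking, so the second call in demo plays the role of a different session. The exact error a real database driver returns on cancellation may vary, which is why the sketch inspects the context rather than the error value.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitLocker is the subset of the Locker interface used here.
type waitLocker interface {
	WaitAndLock(ctx context.Context) error
}

// lockWithin waits at most d for the lock. It returns (false, nil) when
// the wait was abandoned because the deadline passed or parent was cancelled.
func lockWithin(parent context.Context, l waitLocker, d time.Duration) (bool, error) {
	ctx, cancel := context.WithTimeout(parent, d)
	defer cancel()

	err := l.WaitAndLock(ctx)
	switch {
	case err == nil:
		return true, nil
	case ctx.Err() != nil:
		// Simplification: any error seen after the context ended is
		// treated as "gave up waiting".
		return false, nil
	default:
		return false, err
	}
}

// chanLock is a stand-in: a buffered channel of size one acts as the lock.
type chanLock chan struct{}

func (c chanLock) WaitAndLock(ctx context.Context) error {
	select {
	case c <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo acquires the free lock, then times out on the second attempt.
func demo() (first, second bool) {
	ctx := context.Background()
	l := make(chanLock, 1)
	first, _ = lockWithin(ctx, l, 50*time.Millisecond)
	second, _ = lockWithin(ctx, l, 50*time.Millisecond)
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first, second)
}
```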

func (*Lock) WaitAndRLock (added in v3.2.0)

func (l *Lock) WaitAndRLock(ctx context.Context) error

WaitAndRLock obtains a shared session level advisory lock, waiting if necessary.

This method uses PostgreSQL's pg_advisory_lock_shared function, which will block until the lock becomes available. If another session holds an exclusive lock on the same resource identifier, this function will wait until the exclusive lock is released. Multiple sessions can hold shared locks concurrently.

Shared locks are ideal for read operations where multiple readers can safely access a resource at the same time, but writers need to be prevented from modifying it during the reads.

Multiple lock requests stack within the same session, meaning if a shared lock is acquired three times, it must be released three times to be fully released. If the session already holds the given shared advisory lock, additional requests will always succeed immediately.

The lock persists until explicitly released via RUnlock or until the session ends. Returns an error if the database operation fails or if the context is cancelled while waiting for the lock.

type Locker

type Locker interface {
	Lock(ctx context.Context) (bool, error)
	RLock(ctx context.Context) (bool, error)
	WaitAndLock(ctx context.Context) error
	WaitAndRLock(ctx context.Context) error
	Unlock(ctx context.Context) error
	RUnlock(ctx context.Context) error
	Close() error
}

Locker is an interface for PostgreSQL advisory locks.

All methods use PostgreSQL session-level advisory locks, which persist until explicitly released or the database connection is closed. These locks are based on a numeric identifier and can be used to coordinate access to shared resources across multiple database connections.
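
Depending on the interface rather than on the concrete Lock type makes it possible to substitute a test double in unit tests. The sketch below copies the interface definition from above so that it compiles on its own; noopLocker and runJob are hypothetical names, and a real program would import the interface from the package instead of redeclaring it.

```go
package main

import (
	"context"
	"fmt"
)

// Locker mirrors the interface documented above, copied here so the
// sketch is self-contained.
type Locker interface {
	Lock(ctx context.Context) (bool, error)
	RLock(ctx context.Context) (bool, error)
	WaitAndLock(ctx context.Context) error
	WaitAndRLock(ctx context.Context) error
	Unlock(ctx context.Context) error
	RUnlock(ctx context.Context) error
	Close() error
}

// noopLocker is a test double: every acquisition succeeds immediately.
type noopLocker struct{}

func (noopLocker) Lock(context.Context) (bool, error)  { return true, nil }
func (noopLocker) RLock(context.Context) (bool, error) { return true, nil }
func (noopLocker) WaitAndLock(context.Context) error   { return nil }
func (noopLocker) WaitAndRLock(context.Context) error  { return nil }
func (noopLocker) Unlock(context.Context) error        { return nil }
func (noopLocker) RUnlock(context.Context) error       { return nil }
func (noopLocker) Close() error                        { return nil }

// Compile-time check that noopLocker satisfies Locker.
var _ Locker = noopLocker{}

// runJob is application code that accepts any Locker.
func runJob(ctx context.Context, l Locker) (string, error) {
	ok, err := l.Lock(ctx)
	if err != nil {
		return "", err
	}
	if !ok {
		return "skipped", nil
	}
	defer l.Unlock(ctx) // error ignored for brevity
	return "done", nil
}

// demo runs the job against the test double.
func demo() string {
	status, _ := runJob(context.Background(), noopLocker{})
	return status
}

func main() {
	fmt.Println(demo())
}
```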

Directories

  • examples/basic (command)
  • examples/leader-election (command)
  • examples/task-processing (command)
  • examples/timeout (command)
  • examples/workers (command)
