# go-pglock

Distributed locks using PostgreSQL session-level advisory locks.
## Overview
go-pglock provides a simple and reliable way to implement distributed locks using PostgreSQL's advisory lock mechanism. This is useful when you need to coordinate access to shared resources across multiple processes or servers.
## Key Features
- Simple API: Easy-to-use interface for acquiring and releasing locks
- Non-blocking locks: Try to acquire a lock without waiting
- Blocking locks: Wait until a lock becomes available
- Read-write locks: Support for shared (read) and exclusive (write) locks
- Context support: Timeout and cancellation support for all operations
- Lock stacking: The same session can acquire the same lock multiple times
- Automatic cleanup: Locks are automatically released when connections close
- No external dependencies: Uses only PostgreSQL (no Redis, ZooKeeper, etc.)
- Battle-tested: Used in production environments
## When to Use
Use go-pglock when you need to:
- Prevent duplicate execution of scheduled jobs across multiple servers
- Coordinate access to shared resources
- Implement leader election
- Ensure only one instance processes a particular task
- Serialize access to critical sections in distributed systems
- Manage resource pools across multiple processes
## Installation
```sh
go get github.com/allisson/go-pglock/v3
```
Requirements:
- Go 1.17 or higher
- PostgreSQL 9.6 or higher
## Quick Start
```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	"github.com/allisson/go-pglock/v3"
	_ "github.com/lib/pq"
)

func main() {
	// Connect to PostgreSQL
	db, err := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	ctx := context.Background()

	// Create a lock with ID 1
	lock, err := pglock.NewLock(ctx, 1, db)
	if err != nil {
		log.Fatal(err)
	}
	defer lock.Close()

	// Try to acquire the lock
	acquired, err := lock.Lock(ctx)
	if err != nil {
		log.Fatal(err)
	}

	if acquired {
		fmt.Println("Lock acquired! Doing work...")
		// Do your work here

		// Release the lock
		if err := lock.Unlock(ctx); err != nil {
			log.Fatal(err)
		}
		fmt.Println("Lock released!")
	} else {
		fmt.Println("Could not acquire lock - another process has it")
	}
}
```
## How It Works
PostgreSQL advisory locks are a powerful feature for implementing distributed locking:
- Session-level locks: Locks are held until explicitly released or the database connection closes
- Application-defined: You define the meaning of each lock using a numeric identifier (int64)
- Fast and efficient: No table bloat, faster than row-level locks
- Automatic cleanup: The server automatically releases locks when sessions end
- Lock stacking: A session can acquire the same lock multiple times (requiring an equal number of unlocks)
From the PostgreSQL documentation:
> PostgreSQL provides a means for creating locks that have application-defined meanings. These are called advisory locks, because the system does not enforce their use; it is up to the application to use them correctly. Advisory locks can be useful for locking strategies that are an awkward fit for the MVCC model.
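
Under the hood, session-level advisory locks are taken and released with PostgreSQL's built-in functions (`pg_try_advisory_lock`, `pg_advisory_lock`, `pg_advisory_unlock`, and their `_shared` variants). The library's internals may differ, but a minimal sketch of the raw mechanism, using `database/sql` directly with the `db` and `ctx` from the Quick Start, looks like this:

```go
// Session-level advisory locks belong to a single session, so pin one
// connection from the pool instead of querying *sql.DB directly.
conn, err := db.Conn(ctx)
if err != nil {
	log.Fatal(err)
}
defer conn.Close() // note: a pooled session keeps its locks, so unlock before returning it

var acquired bool
// pg_try_advisory_lock returns immediately: true if the lock was taken.
if err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", 42).Scan(&acquired); err != nil {
	log.Fatal(err)
}
if acquired {
	// ... critical section ...
	var released bool
	if err := conn.QueryRowContext(ctx, "SELECT pg_advisory_unlock($1)", 42).Scan(&released); err != nil {
		log.Fatal(err)
	}
}
```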
## API Reference

### Types

#### Locker Interface
```go
type Locker interface {
	Lock(ctx context.Context) (bool, error)
	RLock(ctx context.Context) (bool, error)
	WaitAndLock(ctx context.Context) error
	WaitAndRLock(ctx context.Context) error
	Unlock(ctx context.Context) error
	RUnlock(ctx context.Context) error
	Close() error
}
```
#### Lock Struct

```go
type Lock struct {
	// contains filtered or unexported fields
}
```
### Functions

#### `NewLock(ctx context.Context, id int64, db *sql.DB) (Lock, error)`

Creates a new `Lock` instance with a dedicated database connection.

- `ctx`: Context for managing the connection acquisition
- `id`: The lock identifier (int64)
- `db`: The database connection pool
- Returns: a `Lock` instance and an error
#### `Lock(ctx context.Context) (bool, error)`

Attempts to acquire an exclusive lock without waiting. Returns immediately: `true` if the lock was acquired, `false` otherwise.

#### `RLock(ctx context.Context) (bool, error)`

Attempts to acquire a shared (read) lock without waiting. Multiple sessions can hold shared locks simultaneously, but shared locks conflict with exclusive locks. Returns `true` if acquired, `false` otherwise.

#### `WaitAndLock(ctx context.Context) error`

Blocks until an exclusive lock is acquired. Respects context cancellation and timeouts.

#### `WaitAndRLock(ctx context.Context) error`

Blocks until a shared (read) lock is acquired. Multiple sessions can acquire shared locks concurrently. Respects context cancellation and timeouts.

#### `Unlock(ctx context.Context) error`

Releases one level of exclusive lock ownership. Must be called as many times as `Lock`/`WaitAndLock` was called.

#### `RUnlock(ctx context.Context) error`

Releases one level of shared lock ownership. Must be called as many times as `RLock`/`WaitAndRLock` was called.

#### `Close() error`

Closes the database connection and releases all locks (both exclusive and shared).
## Lock Types

### Exclusive Locks (Write Locks)
Exclusive locks are mutually exclusive with all other locks (both exclusive and shared):
- Only one session can hold an exclusive lock at a time
- No other locks (exclusive or shared) can be acquired while an exclusive lock is held
- Use for write operations or when you need exclusive access to a resource
- Acquired with `Lock()` or `WaitAndLock()`
- Released with `Unlock()`
### Shared Locks (Read Locks)
Shared locks allow multiple concurrent readers but prevent writers:
- Multiple sessions can hold shared locks simultaneously
- Shared locks conflict with exclusive locks (writers)
- Perfect for read-heavy workloads where multiple readers can safely access a resource
- Use when you need to read data but prevent writes during the read
- Acquired with `RLock()` or `WaitAndRLock()`
- Released with `RUnlock()`
### Lock Compatibility Matrix

| Current Lock | `Lock()` | `RLock()` |
|--------------|----------|-----------|
| None         | Succeeds | Succeeds  |
| Exclusive    | Blocks   | Blocks    |
| Shared       | Blocks   | Succeeds  |
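
To see the matrix in action, create several `Lock` instances for the same ID; each holds its own session, so they behave like separate processes. A sketch with error handling omitted for brevity (`ctx` and `db` as in the Quick Start):

```go
// Three instances, one lock ID, three separate sessions.
reader1, _ := pglock.NewLock(ctx, 7, db)
reader2, _ := pglock.NewLock(ctx, 7, db)
writer, _ := pglock.NewLock(ctx, 7, db)
defer reader1.Close()
defer reader2.Close()
defer writer.Close()

ok1, _ := reader1.RLock(ctx) // true: no lock held yet
ok2, _ := reader2.RLock(ctx) // true: shared locks coexist
ok3, _ := writer.Lock(ctx)   // false: exclusive conflicts with the shared locks
fmt.Println(ok1, ok2, ok3)   // true true false
```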
## Examples

### Basic Lock Usage
```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	"github.com/allisson/go-pglock/v3"
	_ "github.com/lib/pq"
)

func main() {
	db, err := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	ctx := context.Background()
	lockID := int64(100)

	// Create lock
	lock, err := pglock.NewLock(ctx, lockID, db)
	if err != nil {
		log.Fatal(err)
	}
	defer lock.Close()

	// Acquire lock
	acquired, err := lock.Lock(ctx)
	if err != nil {
		log.Fatal(err)
	}
	if !acquired {
		fmt.Println("Lock is held by another process")
		return
	}

	// Critical section
	fmt.Println("Executing critical section...")
	// Your code here

	// Release lock
	if err := lock.Unlock(ctx); err != nil {
		log.Fatal(err)
	}
}
```
### Try Lock (Non-blocking)
Perfect for scenarios where you want to skip work if another process is already doing it.
```go
func processDataIfAvailable(db *sql.DB) error {
	ctx := context.Background()
	lockID := int64(200)

	lock, err := pglock.NewLock(ctx, lockID, db)
	if err != nil {
		return err
	}
	defer lock.Close()

	// Try to acquire without waiting
	acquired, err := lock.Lock(ctx)
	if err != nil {
		return err
	}
	if !acquired {
		fmt.Println("Another process is already processing data, skipping...")
		return nil
	}
	defer lock.Unlock(ctx)

	// Process data
	fmt.Println("Processing data...")
	// Your processing logic here
	return nil
}
```
### Wait and Lock (Blocking)
Use when you must execute the task eventually, even if you have to wait.
```go
func processDataAndWait(db *sql.DB) error {
	ctx := context.Background()
	lockID := int64(300)

	lock, err := pglock.NewLock(ctx, lockID, db)
	if err != nil {
		return err
	}
	defer lock.Close()

	fmt.Println("Waiting for lock...")
	// Wait until the lock is available
	if err := lock.WaitAndLock(ctx); err != nil {
		return err
	}
	defer lock.Unlock(ctx)

	fmt.Println("Lock acquired, processing data...")
	// Your processing logic here
	return nil
}
```
### Lock with Timeout
Implement a timeout to avoid waiting indefinitely.
```go
func processWithTimeout(db *sql.DB, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	lockID := int64(400)
	lock, err := pglock.NewLock(ctx, lockID, db)
	if err != nil {
		return err
	}
	defer lock.Close()

	// This will fail if the lock is not acquired within the timeout
	if err := lock.WaitAndLock(ctx); err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			return fmt.Errorf("could not acquire lock within %v", timeout)
		}
		return err
	}
	defer lock.Unlock(context.Background()) // Use a background context for cleanup

	fmt.Println("Lock acquired, processing...")
	// Your processing logic here
	return nil
}
```
### Concurrent Workers
Coordinate multiple workers accessing a shared resource.
```go
func runWorker(workerID int, db *sql.DB, wg *sync.WaitGroup) {
	defer wg.Done()

	ctx := context.Background()
	lockID := int64(500) // Same lock ID for all workers

	lock, err := pglock.NewLock(ctx, lockID, db)
	if err != nil {
		log.Printf("Worker %d: failed to create lock: %v", workerID, err)
		return
	}
	defer lock.Close()

	fmt.Printf("Worker %d: waiting for lock...\n", workerID)
	if err := lock.WaitAndLock(ctx); err != nil {
		log.Printf("Worker %d: failed to acquire lock: %v", workerID, err)
		return
	}

	fmt.Printf("Worker %d: acquired lock, processing...\n", workerID)
	// Simulate work
	time.Sleep(1 * time.Second)

	fmt.Printf("Worker %d: releasing lock\n", workerID)
	lock.Unlock(ctx)
}

func main() {
	db, _ := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
	defer db.Close()

	var wg sync.WaitGroup
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go runWorker(i, db, &wg)
	}
	wg.Wait()
}
```
### Distributed Task Execution
Ensure a task runs only once across multiple servers.
```go
type TaskProcessor struct {
	db *sql.DB
}

func (tp *TaskProcessor) ProcessTask(taskID string) error {
	ctx := context.Background()

	// Use a hash of the task ID as the lock ID
	lockID := hashToInt64(taskID)

	lock, err := pglock.NewLock(ctx, lockID, tp.db)
	if err != nil {
		return err
	}
	defer lock.Close()

	// Try to acquire the lock
	acquired, err := lock.Lock(ctx)
	if err != nil {
		return err
	}
	if !acquired {
		return fmt.Errorf("task %s is already being processed", taskID)
	}
	defer lock.Unlock(ctx)

	fmt.Printf("Processing task %s...\n", taskID)
	// Execute the task
	if err := tp.executeTask(taskID); err != nil {
		return fmt.Errorf("failed to execute task: %w", err)
	}

	fmt.Printf("Task %s completed\n", taskID)
	return nil
}

func (tp *TaskProcessor) executeTask(taskID string) error {
	// Your task execution logic
	time.Sleep(2 * time.Second)
	return nil
}

func hashToInt64(s string) int64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	return int64(h.Sum64())
}
```
### Leader Election
Implement leader election in a cluster of services.
```go
type LeaderElector struct {
	db       *sql.DB
	lockID   int64
	isLeader bool
	mu       sync.RWMutex
}

func NewLeaderElector(db *sql.DB, clusterName string) *LeaderElector {
	return &LeaderElector{
		db:     db,
		lockID: hashToInt64(clusterName),
	}
}

func (le *LeaderElector) RunElection(ctx context.Context) error {
	lock, err := pglock.NewLock(ctx, le.lockID, le.db)
	if err != nil {
		return err
	}
	defer lock.Close()

	// Try to become the leader
	acquired, err := lock.Lock(ctx)
	if err != nil {
		return err
	}

	if acquired {
		le.mu.Lock()
		le.isLeader = true
		le.mu.Unlock()
		fmt.Println("Became leader")

		defer func() {
			le.mu.Lock()
			le.isLeader = false
			le.mu.Unlock()
			lock.Unlock(context.Background())
			fmt.Println("Lost leadership")
		}()

		// Perform leader duties
		le.performLeaderDuties(ctx)
	} else {
		fmt.Println("Another instance is the leader")
	}

	return nil
}

func (le *LeaderElector) IsLeader() bool {
	le.mu.RLock()
	defer le.mu.RUnlock()
	return le.isLeader
}

func (le *LeaderElector) performLeaderDuties(ctx context.Context) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			fmt.Println("Leader performing periodic task...")
			// Do leader work
		}
	}
}
```
### Resource Pool Management
Manage a limited pool of resources across multiple processes.
```go
type ResourcePool struct {
	db       *sql.DB
	poolSize int
}

func NewResourcePool(db *sql.DB, poolSize int) *ResourcePool {
	return &ResourcePool{
		db:       db,
		poolSize: poolSize,
	}
}

// AcquireResource tries to acquire one resource from the pool
func (rp *ResourcePool) AcquireResource(ctx context.Context) (resourceID int, release func(), err error) {
	// Try each resource slot
	for i := 1; i <= rp.poolSize; i++ {
		lockID := int64(10000 + i) // Base offset + slot number

		lock, err := pglock.NewLock(ctx, lockID, rp.db)
		if err != nil {
			continue
		}

		// Try to acquire this slot (non-blocking)
		acquired, err := lock.Lock(ctx)
		if err != nil {
			lock.Close()
			continue
		}
		if acquired {
			// Successfully acquired this resource slot
			release := func() {
				lock.Unlock(context.Background())
				lock.Close()
			}
			return i, release, nil
		}
		lock.Close()
	}
	return 0, nil, fmt.Errorf("no resources available in pool")
}

func main() {
	db, _ := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
	defer db.Close()

	pool := NewResourcePool(db, 3) // Pool of 3 resources
	ctx := context.Background()

	resourceID, release, err := pool.AcquireResource(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer release()

	fmt.Printf("Acquired resource %d\n", resourceID)
	// Use the resource
	time.Sleep(2 * time.Second)
	fmt.Printf("Releasing resource %d\n", resourceID)
}
```
### Database Migration Lock
Ensure database migrations run only once in multi-instance deployments.
```go
type MigrationRunner struct {
	db *sql.DB
}

func (mr *MigrationRunner) RunMigrations(ctx context.Context) error {
	const migrationLockID = int64(999999)

	lock, err := pglock.NewLock(ctx, migrationLockID, mr.db)
	if err != nil {
		return fmt.Errorf("failed to create migration lock: %w", err)
	}
	defer lock.Close()

	fmt.Println("Attempting to acquire migration lock...")

	// Use a timeout to avoid waiting too long
	lockCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	if err := lock.WaitAndLock(lockCtx); err != nil {
		return fmt.Errorf("failed to acquire migration lock: %w", err)
	}
	defer lock.Unlock(context.Background())

	fmt.Println("Migration lock acquired, checking migration status...")

	// Check if migrations are needed
	needsMigration, err := mr.checkMigrationStatus()
	if err != nil {
		return err
	}
	if !needsMigration {
		fmt.Println("Database is up to date")
		return nil
	}

	// Run migrations
	fmt.Println("Running migrations...")
	if err := mr.executeMigrations(); err != nil {
		return fmt.Errorf("migration failed: %w", err)
	}

	fmt.Println("Migrations completed successfully")
	return nil
}

func (mr *MigrationRunner) checkMigrationStatus() (bool, error) {
	// Check if migrations are needed
	// This is application-specific logic
	return true, nil
}

func (mr *MigrationRunner) executeMigrations() error {
	// Execute your migrations
	time.Sleep(2 * time.Second) // Simulate migration work
	return nil
}
```
### Scheduled Job Coordination
Coordinate scheduled jobs across multiple instances to prevent duplicate execution.
```go
type ScheduledJob struct {
	db     *sql.DB
	jobID  string
	lockID int64
}

func NewScheduledJob(db *sql.DB, jobID string) *ScheduledJob {
	return &ScheduledJob{
		db:     db,
		jobID:  jobID,
		lockID: hashToInt64(jobID),
	}
}

func (sj *ScheduledJob) Execute(ctx context.Context) error {
	lock, err := pglock.NewLock(ctx, sj.lockID, sj.db)
	if err != nil {
		return fmt.Errorf("failed to create lock: %w", err)
	}
	defer lock.Close()

	// Try to acquire the lock (non-blocking)
	acquired, err := lock.Lock(ctx)
	if err != nil {
		return fmt.Errorf("failed to acquire lock: %w", err)
	}
	if !acquired {
		fmt.Printf("Job %s is already running on another instance\n", sj.jobID)
		return nil
	}
	defer lock.Unlock(ctx)

	fmt.Printf("Executing job %s...\n", sj.jobID)
	// Execute the actual job
	if err := sj.run(ctx); err != nil {
		return fmt.Errorf("job execution failed: %w", err)
	}

	fmt.Printf("Job %s completed\n", sj.jobID)
	return nil
}

func (sj *ScheduledJob) run(ctx context.Context) error {
	// Your job logic here
	select {
	case <-time.After(5 * time.Second):
		// Job completed
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	db, _ := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
	defer db.Close()

	// Simulate a cron job running every minute
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	ctx := context.Background()
	job := NewScheduledJob(db, "cleanup-task")

	for range ticker.C {
		if err := job.Execute(ctx); err != nil {
			log.Printf("Job execution error: %v", err)
		}
	}
}
```
### Read-Write Lock: Multiple Readers, Single Writer
Use shared locks to allow multiple readers while preventing writers.
```go
type DataCache struct {
	db       *sql.DB
	recordID string
	lockID   int64
}

func NewDataCache(db *sql.DB, recordID string) *DataCache {
	return &DataCache{
		db:       db,
		recordID: recordID,
		lockID:   hashToInt64("cache-" + recordID),
	}
}

// ReadData acquires a shared lock for reading
func (dc *DataCache) ReadData(ctx context.Context) (string, error) {
	lock, err := pglock.NewLock(ctx, dc.lockID, dc.db)
	if err != nil {
		return "", err
	}
	defer lock.Close()

	// Acquire a shared lock - multiple readers can hold this simultaneously
	if err := lock.WaitAndRLock(ctx); err != nil {
		return "", fmt.Errorf("failed to acquire read lock: %w", err)
	}
	defer lock.RUnlock(ctx)

	fmt.Println("Reading data... (shared lock held)")
	// Simulate reading from a database or cache
	time.Sleep(100 * time.Millisecond)
	data := "cached-data-for-" + dc.recordID
	return data, nil
}

// WriteData acquires an exclusive lock for writing
func (dc *DataCache) WriteData(ctx context.Context, data string) error {
	lock, err := pglock.NewLock(ctx, dc.lockID, dc.db)
	if err != nil {
		return err
	}
	defer lock.Close()

	// Acquire an exclusive lock - blocks all other locks (read and write)
	if err := lock.WaitAndLock(ctx); err != nil {
		return fmt.Errorf("failed to acquire write lock: %w", err)
	}
	defer lock.Unlock(ctx)

	fmt.Println("Writing data... (exclusive lock held)")
	// Simulate writing to the database and invalidating the cache
	time.Sleep(200 * time.Millisecond)
	return nil
}

func main() {
	db, _ := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
	defer db.Close()

	cache := NewDataCache(db, "user-123")
	ctx := context.Background()
	var wg sync.WaitGroup

	// Spawn 5 concurrent readers
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go func(readerID int) {
			defer wg.Done()
			data, err := cache.ReadData(ctx)
			if err != nil {
				log.Printf("Reader %d error: %v", readerID, err)
				return
			}
			fmt.Printf("Reader %d got: %s\n", readerID, data)
		}(i)
	}

	// Spawn 1 writer after a delay
	time.Sleep(50 * time.Millisecond)
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := cache.WriteData(ctx, "new-data"); err != nil {
			log.Printf("Writer error: %v", err)
		}
		fmt.Println("Writer completed")
	}()

	wg.Wait()
}
```
### Read-Write Lock: Configuration Management
Manage application configuration with frequent reads and rare writes.
```go
type ConfigManager struct {
	db     *sql.DB
	lockID int64
}

func NewConfigManager(db *sql.DB) *ConfigManager {
	return &ConfigManager{
		db:     db,
		lockID: hashToInt64("app-config"),
	}
}

// GetConfig reads configuration (uses a shared lock)
func (cm *ConfigManager) GetConfig(ctx context.Context) (map[string]string, error) {
	lock, err := pglock.NewLock(ctx, cm.lockID, cm.db)
	if err != nil {
		return nil, err
	}
	defer lock.Close()

	// Use a shared lock - allows multiple concurrent readers
	acquired, err := lock.RLock(ctx)
	if err != nil {
		return nil, err
	}
	if !acquired {
		return nil, fmt.Errorf("config is being updated, try again")
	}
	defer lock.RUnlock(ctx)

	// Read config from the database
	fmt.Println("Reading configuration...")
	config := map[string]string{
		"db_host":     "localhost",
		"db_port":     "5432",
		"max_workers": "10",
	}
	return config, nil
}

// UpdateConfig writes configuration (uses an exclusive lock)
func (cm *ConfigManager) UpdateConfig(ctx context.Context, updates map[string]string) error {
	lock, err := pglock.NewLock(ctx, cm.lockID, cm.db)
	if err != nil {
		return err
	}
	defer lock.Close()

	// Use an exclusive lock with a timeout - blocks all readers and writers
	lockCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	if err := lock.WaitAndLock(lockCtx); err != nil {
		return fmt.Errorf("failed to acquire exclusive lock for config update: %w", err)
	}
	defer lock.Unlock(context.Background())

	fmt.Println("Updating configuration...")
	// Write config to the database
	time.Sleep(100 * time.Millisecond)
	fmt.Printf("Configuration updated with %d keys\n", len(updates))
	return nil
}

func main() {
	db, _ := sql.Open("postgres", "postgres://user:pass@localhost/mydb?sslmode=disable")
	defer db.Close()

	cm := NewConfigManager(db)
	ctx := context.Background()

	// Multiple services reading config concurrently
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(serviceID int) {
			defer wg.Done()
			config, err := cm.GetConfig(ctx)
			if err != nil {
				log.Printf("Service %d: %v", serviceID, err)
				return
			}
			fmt.Printf("Service %d loaded %d config keys\n", serviceID, len(config))
		}(i)
	}

	// Admin updating config
	wg.Add(1)
	go func() {
		defer wg.Done()
		updates := map[string]string{"max_workers": "20"}
		if err := cm.UpdateConfig(ctx, updates); err != nil {
			log.Printf("Update failed: %v", err)
		}
	}()

	wg.Wait()
}
```
### Read-Write Lock: Report Generation
Allow multiple users to view reports while preventing generation conflicts.
```go
type ReportGenerator struct {
	db     *sql.DB
	lockID int64
}

func NewReportGenerator(db *sql.DB, reportType string) *ReportGenerator {
	return &ReportGenerator{
		db:     db,
		lockID: hashToInt64("report-" + reportType),
	}
}

// ViewReport reads the report (uses a shared lock)
func (rg *ReportGenerator) ViewReport(ctx context.Context, userID string) error {
	lock, err := pglock.NewLock(ctx, rg.lockID, rg.db)
	if err != nil {
		return err
	}
	defer lock.Close()

	// Try to acquire a shared lock (non-blocking)
	acquired, err := lock.RLock(ctx)
	if err != nil {
		return err
	}
	if !acquired {
		return fmt.Errorf("report is being generated, please wait")
	}
	defer lock.RUnlock(ctx)

	fmt.Printf("User %s viewing report...\n", userID)
	time.Sleep(500 * time.Millisecond) // Simulate viewing
	return nil
}

// GenerateReport creates/updates the report (uses an exclusive lock)
func (rg *ReportGenerator) GenerateReport(ctx context.Context) error {
	lock, err := pglock.NewLock(ctx, rg.lockID, rg.db)
	if err != nil {
		return err
	}
	defer lock.Close()

	// Try to acquire an exclusive lock (non-blocking)
	acquired, err := lock.Lock(ctx)
	if err != nil {
		return err
	}
	if !acquired {
		return fmt.Errorf("report generation already in progress or report being viewed")
	}
	defer lock.Unlock(ctx)

	fmt.Println("Generating report...")
	time.Sleep(2 * time.Second) // Simulate generation
	fmt.Println("Report generation completed")
	return nil
}
```
## Best Practices

### 1. Always Close Locks
Use defer to ensure locks are closed even if errors occur:
```go
lock, err := pglock.NewLock(ctx, lockID, db)
if err != nil {
	return err
}
defer lock.Close() // Always close to release the connection
```
### 2. Match Lock and Unlock Calls
Locks stack, so ensure you unlock as many times as you lock:
```go
// Acquired twice
lock.Lock(ctx)
lock.Lock(ctx)

// Must unlock twice
lock.Unlock(ctx)
lock.Unlock(ctx)
```
The same applies to shared locks:
```go
// Acquired twice
lock.RLock(ctx)
lock.RLock(ctx)

// Must unlock twice
lock.RUnlock(ctx)
lock.RUnlock(ctx)
```
### 3. Use Correct Lock and Unlock Pairs
Always pair the correct lock and unlock methods:
```go
// Correct pairs
lock.Lock(ctx) // Pair with Unlock()
lock.Unlock(ctx)

lock.RLock(ctx) // Pair with RUnlock()
lock.RUnlock(ctx)

// Wrong - don't mix them!
// lock.Lock(ctx)
// lock.RUnlock(ctx) // Wrong!
```
### 4. Use Context Timeouts
Prevent indefinite waiting with context timeouts:
```go
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := lock.WaitAndLock(ctx); err != nil {
	// Handle timeout
}
```
### 5. Choose Appropriate Lock IDs

- Use meaningful, deterministic IDs based on resource names
- Use hash functions for string-based identifiers
- Document your lock ID allocation strategy
```go
// Good: Deterministic based on resource
lockID := hashToInt64("user-" + userID)

// Avoid: Random or non-deterministic IDs
lockID := rand.Int63() // Bad!
```
### 6. Handle Lock Acquisition Failures
Always check if lock acquisition succeeded:
```go
acquired, err := lock.Lock(ctx)
if err != nil {
	// Handle error
}
if !acquired {
	// Handle case where lock is held by another process
}
```
### 7. Use Connection Pooling Wisely
Each lock holds a dedicated connection. Consider your connection pool size:
```go
// Configure an appropriate pool size
db.SetMaxOpenConns(50) // Ensure enough connections for locks + queries
```
### 8. Choose the Right Lock Type

Use the appropriate lock type for your use case:

- Exclusive locks (`Lock`/`Unlock`): Use when you need to modify data or require exclusive access
- Shared locks (`RLock`/`RUnlock`): Use for read operations where multiple readers can work concurrently
```go
// Reading data - use a shared lock; multiple readers can read simultaneously
if acquired, _ := lock.RLock(ctx); acquired {
	defer lock.RUnlock(ctx)
	// read...
}

// Writing data - use an exclusive lock; only one writer, blocking all readers and writers
if acquired, _ := lock.Lock(ctx); acquired {
	defer lock.Unlock(ctx)
	// write...
}
```
### 9. Consider Lock Granularity

- Fine-grained locks: Better concurrency, but more complex
- Coarse-grained locks: Simpler, but may reduce throughput
### 10. Testing with Locks
When testing code that uses locks, consider using different lock IDs per test:
```go
func TestMyFunction(t *testing.T) {
	lockID := int64(time.Now().UnixNano()) // Unique per test run
	// ... test code
}
```
## Testing

### Running Tests Locally
The project includes a Docker Compose setup for easy local testing:
```sh
# Start PostgreSQL and run tests
make test-local

# Run tests with race detector
make test-race

# Generate coverage report
make test-coverage
```
### Manual Testing
```sh
# Start PostgreSQL
docker-compose up -d

# Set DATABASE_URL
export DATABASE_URL='postgres://test:test@localhost:5432/pglock?sslmode=disable'

# Run tests
go test -v ./...

# Clean up
docker-compose down
```
## Troubleshooting

### "pq: database "pglock" does not exist"
Create the database:
```sql
CREATE DATABASE pglock;
```
β οΈ "too many connections"
Increase PostgreSQL's max_connections or reduce your application's connection pool size:
```go
db.SetMaxOpenConns(25) // Reduce if hitting connection limits
```
### Deadlocks
Advisory locks can deadlock if acquired in different orders. Always acquire locks in a consistent order:
```go
// Good: every process acquires multiple locks in the same order
// (for example, by ascending lock ID).
lockA.WaitAndLock(ctx) // lock ID 1
lockB.WaitAndLock(ctx) // lock ID 2

// Bad: inconsistent ordering across processes can deadlock:
// process 1 takes lockA then lockB, while process 2 takes lockB then lockA.
```
### Lock Not Released
Locks are automatically released when:
- `Unlock()` is called
- `Close()` is called
- The database connection closes
- The database session ends
If locks aren't releasing, check for:
- Missing `Unlock()` calls
- Connection leaks
- Application crashes before cleanup
### Context Deadline Exceeded
If you see context deadline errors, either:
- Increase the timeout
- Investigate why locks are held for so long
- Use the non-blocking `Lock()` instead of `WaitAndLock()`
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
## License
This project is licensed under the MIT License - see the LICENSE file for details.