concurrency

package
v1.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package concurrency provides concurrency limiting for Go applications.

A concurrency limiter controls the number of operations that can run simultaneously, acting as a semaphore with additional features like context support, dynamic configuration, and comprehensive state inspection.

Basic usage:

limiter, err := concurrency.NewSafe(10) // Allow 10 concurrent operations
if err != nil {
	log.Fatal(err)
}

if limiter.Acquire() {
	defer limiter.Release()
	// Do work
}

Key Features:

The concurrency limiter provides:

  • Non-blocking permit acquisition (Acquire/AcquireN)
  • Context-aware blocking operations (Wait/WaitN)
  • Batch permit operations for efficiency
  • Dynamic capacity adjustment at runtime
  • Comprehensive state inspection
  • Graceful handling of context cancellation

Use Cases:

Concurrency limiting is ideal for:

  • Database connection pools
  • HTTP server request limiting
  • Worker pool management
  • Resource-intensive operation control
  • API rate limiting by active requests
  • Memory-bound operation throttling

Worker Pool Pattern:

limiter, err := concurrency.NewSafe(5) // Max 5 concurrent workers
if err != nil {
	log.Fatal(err)
}

var wg sync.WaitGroup
for _, task := range tasks {
	wg.Add(1)
	go func(t Task) {
		defer wg.Done()

		// Wait for available slot
		if err := limiter.Wait(ctx); err != nil {
			return // Context canceled/timeout
		}
		defer limiter.Release()

		// Process task
		processTask(t)
	}(task)
}
wg.Wait()

Database Connection Limiting:

dbLimiter, err := concurrency.NewSafe(20) // Max 20 DB connections
if err != nil {
	log.Fatal(err)
}

func queryDatabase(query string) error {
	// Acquire connection permit
	if !dbLimiter.Acquire() {
		return errors.New("database busy")
	}
	defer dbLimiter.Release()

	// Execute query
	return db.Query(query)
}

HTTP Server Limiting:

requestLimiter, err := concurrency.NewSafe(1000) // Max 1000 concurrent requests
if err != nil {
	log.Fatal(err)
}

func handler(w http.ResponseWriter, r *http.Request) {
	if !requestLimiter.Acquire() {
		http.Error(w, "Server busy", http.StatusTooManyRequests)
		return
	}
	defer requestLimiter.Release()

	// Handle request
	handleRequest(w, r)
}

Batch Operations:

// Acquire multiple permits for batch processing
if limiter.AcquireN(batchSize) {
	defer limiter.ReleaseN(batchSize)

	// Process batch
	for _, item := range batch {
		processItem(item)
	}
}

Context Integration:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := limiter.Wait(ctx); err != nil {
	if err == context.DeadlineExceeded {
		// Handle timeout
	}
	return err
}
defer limiter.Release()

Dynamic Configuration:

// Start with capacity for normal load
limiter, err := concurrency.NewSafe(10)
if err != nil {
	log.Fatal(err)
}

// Increase capacity during peak hours
limiter.SetCapacity(50)

// Monitor and adjust based on system metrics
if systemLoad < 0.5 {
	limiter.SetCapacity(limiter.Capacity() + 10)
}

State Inspection:

capacity := limiter.Capacity()   // Maximum concurrent operations
available := limiter.Available() // Currently available permits
inUse := limiter.InUse()        // Currently active operations

// Calculate utilization
utilization := float64(inUse) / float64(capacity)

Advanced Configuration:

config := concurrency.Config{
	Capacity:         100,
	InitialAvailable: 50, // Start with 50 operations "in use"
}
limiter, err := concurrency.NewWithConfigSafe(config)
if err != nil {
	log.Fatal(err)
}

Graceful Shutdown:

// Allow pending operations to complete, but reject new ones
limiter.SetCapacity(0) // Stop accepting new operations

// Wait for active operations to complete
for limiter.InUse() > 0 {
	time.Sleep(100 * time.Millisecond)
}

Error Handling:

The limiter panics on:

  • Invalid capacity (zero or negative)
  • Releasing more permits than capacity
  • Invalid configuration parameters

Context errors are returned for:

  • context.Canceled: Context was canceled
  • context.DeadlineExceeded: Context timeout exceeded

Thread Safety:

All operations are safe for concurrent use. The limiter uses mutex-based synchronization to ensure consistency while maintaining good performance for high-throughput scenarios.

Performance Characteristics:

  • Acquire/Release: O(1) for immediate success/failure
  • Wait operations: O(n) where n is the number of waiters
  • Memory usage scales with number of waiting goroutines
  • Optimized for high-frequency acquire/release patterns

Comparison with Other Patterns:

vs Channel-based semaphores:

  • Better performance for high-frequency operations
  • More features (state inspection, dynamic capacity)
  • Context integration built-in

vs sync.WaitGroup:

  • Controls concurrency rather than coordination
  • Supports timeouts and cancellation
  • Runtime capacity adjustment

vs Worker pools:

  • More flexible - works with any operation pattern
  • Lower overhead - no goroutine pool management
  • Dynamic scaling based on demand
Example

Example demonstrates basic usage of the concurrency limiter

package main

import (
	"fmt"

	"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)

func main() {
	// Create a limiter that allows 3 concurrent operations
	limiter, err := concurrency.NewSafe(3)
	if err != nil {
		panic(fmt.Sprintf("Failed to create limiter: %v", err))
	}

	// Try to acquire a permit (non-blocking)
	if limiter.Acquire() {
		fmt.Println("Operation permitted")
		// Do work...
		limiter.Release() // Don't forget to release!
	} else {
		fmt.Println("Operation denied - at capacity")
	}

}
Output:

Operation permitted
Example (CustomConfiguration)

Example_customConfiguration demonstrates advanced configuration

package main

import (
	"fmt"

	"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)

func main() {
	// Create limiter with custom initial state
	config := concurrency.Config{
		Capacity:         10,
		InitialAvailable: 5, // Start with only 5 available (simulating 5 in use)
	}

	limiter, err := concurrency.NewWithConfigSafe(config)
	if err != nil {
		panic(fmt.Sprintf("Failed to create limiter: %v", err))
	}

	fmt.Printf("Custom limiter: capacity=%d, available=%d, in_use=%d\n",
		limiter.Capacity(), limiter.Available(), limiter.InUse())

}
Output:

Custom limiter: capacity=10, available=5, in_use=5
Example (DatabaseConnections)

Example_databaseConnections demonstrates limiting database connections

package main

import (
	"fmt"

	"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)

func main() {
	// Limit to 3 concurrent database connections
	dbLimiter, err := concurrency.NewSafe(3)
	if err != nil {
		panic(fmt.Sprintf("Failed to create limiter: %v", err))
	}

	fmt.Printf("Database limiter capacity: %d\n", dbLimiter.Capacity())
	fmt.Printf("Available connections: %d\n", dbLimiter.Available())

	// Simulate acquiring connections
	for i := 1; i <= 4; i++ {
		if dbLimiter.Acquire() {
			fmt.Printf("Connection %d acquired. Available: %d, In use: %d\n",
				i, dbLimiter.Available(), dbLimiter.InUse())
		} else {
			fmt.Printf("Connection %d denied. Available: %d, In use: %d\n",
				i, dbLimiter.Available(), dbLimiter.InUse())
		}
	}

	// Release a connection
	dbLimiter.Release()
	fmt.Printf("Connection released. Available: %d, In use: %d\n",
		dbLimiter.Available(), dbLimiter.InUse())

}
Output:

Database limiter capacity: 3
Available connections: 3
Connection 1 acquired. Available: 2, In use: 1
Connection 2 acquired. Available: 1, In use: 2
Connection 3 acquired. Available: 0, In use: 3
Connection 4 denied. Available: 0, In use: 3
Connection released. Available: 1, In use: 2
Example (DynamicCapacity)

Example_dynamicCapacity demonstrates changing capacity at runtime

package main

import (
	"fmt"

	"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)

func main() {
	limiter, err := concurrency.NewSafe(3)
	if err != nil {
		panic(fmt.Sprintf("Failed to create limiter: %v", err))
	}

	// Use some permits
	limiter.AcquireN(2)
	fmt.Printf("Initial: capacity=%d, available=%d, in_use=%d\n",
		limiter.Capacity(), limiter.Available(), limiter.InUse())

	// Increase capacity
	limiter.SetCapacity(5)
	fmt.Printf("After increase: capacity=%d, available=%d, in_use=%d\n",
		limiter.Capacity(), limiter.Available(), limiter.InUse())

	// Decrease capacity
	limiter.SetCapacity(3)
	fmt.Printf("After decrease: capacity=%d, available=%d, in_use=%d\n",
		limiter.Capacity(), limiter.Available(), limiter.InUse())

}
Output:

Initial: capacity=3, available=1, in_use=2
After increase: capacity=5, available=3, in_use=2
After decrease: capacity=3, available=1, in_use=2
Example (GracefulShutdown)

Example_gracefulShutdown demonstrates graceful shutdown pattern

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)

func main() {
	limiter, err := concurrency.NewSafe(2)
	if err != nil {
		panic(fmt.Sprintf("Failed to create limiter: %v", err))
	}

	// Start some work
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			// Acquire permit with timeout for shutdown
			ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
			defer cancel()

			if err := limiter.Wait(ctx); err != nil {
				fmt.Printf("Worker %d: shutdown timeout\n", id)
				return
			}
			defer limiter.Release()

			time.Sleep(50 * time.Millisecond)
		}(i)
	}

	wg.Wait()
	fmt.Printf("Final state: available=%d, in_use=%d\n",
		limiter.Available(), limiter.InUse())

}
Output:

Final state: available=2, in_use=0
Example (HttpServerLimiting)

Example_httpServerLimiting demonstrates HTTP request limiting pattern

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)

func main() {
	// Limit concurrent HTTP requests
	requestLimiter, err := concurrency.NewSafe(100)
	if err != nil {
		panic(fmt.Sprintf("Failed to create limiter: %v", err))
	}

	// Simulate request handler
	handleRequest := func(requestID int) {
		if !requestLimiter.Acquire() {
			fmt.Printf("Request %d: Server busy, try again later\n", requestID)
			return
		}
		defer requestLimiter.Release()

		// Simulate request processing time
		time.Sleep(10 * time.Millisecond)
	}

	// Simulate burst of requests
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			handleRequest(id)
		}(i)
	}

	wg.Wait()
	fmt.Printf("All requests processed. Server state: %d/%d in use\n",
		requestLimiter.InUse(), requestLimiter.Capacity())

}
Output:

All requests processed. Server state: 0/100 in use
Example (MultiplePermits)

Example_multiplePermits demonstrates acquiring multiple permits at once

package main

import (
	"fmt"
	"time"

	"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)

func main() {
	limiter, err := concurrency.NewSafe(5)
	if err != nil {
		panic(fmt.Sprintf("Failed to create limiter: %v", err))
	}

	// Acquire multiple permits for batch operation
	if limiter.AcquireN(3) {
		fmt.Printf("Batch operation started. Available: %d, In use: %d\n",
			limiter.Available(), limiter.InUse())

		// Simulate batch work
		time.Sleep(10 * time.Millisecond)

		// Release all permits
		limiter.ReleaseN(3)
		fmt.Printf("Batch operation completed. Available: %d, In use: %d\n",
			limiter.Available(), limiter.InUse())
	} else {
		fmt.Println("Not enough permits available for batch operation")
	}

}
Output:

Batch operation started. Available: 2, In use: 3
Batch operation completed. Available: 5, In use: 0
Example (WithTimeout)

Example_withTimeout demonstrates using timeouts with concurrency limiter

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)

func main() {
	// Small limiter to demonstrate blocking
	limiter, err := concurrency.NewSafe(1)
	if err != nil {
		panic(fmt.Sprintf("Failed to create limiter: %v", err))
	}

	// Fill the limiter
	limiter.Acquire()

	// Try to acquire with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	err = limiter.Wait(ctx)
	if err != nil {
		fmt.Printf("Failed to acquire permit: %v\n", err)
	} else {
		fmt.Println("Permit acquired")
		limiter.Release()
	}

}
Output:

Failed to acquire permit: context deadline exceeded
Example (WorkerPool)

Example_workerPool demonstrates using concurrency limiter for a worker pool

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)

func main() {
	// Limit concurrent workers to 2
	limiter, err := concurrency.NewSafe(2)
	if err != nil {
		panic(fmt.Sprintf("Failed to create limiter: %v", err))
	}

	tasks := []string{"task1", "task2", "task3", "task4", "task5"}
	var wg sync.WaitGroup

	for _, task := range tasks {
		wg.Add(1)
		go func(taskName string) {
			defer wg.Done()

			// Wait for permit (blocks if necessary)
			ctx := context.Background()
			if err := limiter.Wait(ctx); err != nil {
				fmt.Printf("Failed to acquire permit for %s: %v\n", taskName, err)
				return
			}
			defer limiter.Release()

			// Simulate work
			time.Sleep(100 * time.Millisecond)
		}(task)
	}

	wg.Wait()
	fmt.Println("All tasks completed")

}
Output:

All tasks completed

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Capacity is the maximum number of concurrent operations allowed.
	Capacity int

	// InitialAvailable is the initial number of available permits.
	// If negative or greater than Capacity, defaults to Capacity.
	InitialAvailable int
}

Config holds configuration options for creating a new concurrency Limiter.

type Limiter

type Limiter interface {
	// Acquire attempts to acquire a permit for one operation.
	// It returns true if a permit was available, false otherwise.
	// This method does not block.
	Acquire() bool

	// AcquireN attempts to acquire n permits for operations.
	// It returns true if all permits were available, false otherwise.
	// This method does not block.
	AcquireN(n int) bool

	// Wait blocks until a permit is available for one operation.
	// It returns an error if the context is canceled or deadline exceeded.
	Wait(ctx context.Context) error

	// WaitN blocks until n permits are available for operations.
	// It returns an error if the context is canceled or deadline exceeded.
	WaitN(ctx context.Context, n int) error

	// Release releases one permit back to the limiter.
	// It panics if more permits are released than were acquired.
	Release()

	// ReleaseN releases n permits back to the limiter.
	// It panics if more permits are released than were acquired.
	ReleaseN(n int)

	// SetCapacity changes the maximum number of concurrent operations allowed.
	// If the new capacity is less than current usage, it will take effect
	// as operations complete and permits are released.
	SetCapacity(capacity int)

	// Capacity returns the maximum number of concurrent operations allowed.
	Capacity() int

	// Available returns the number of permits currently available.
	Available() int

	// InUse returns the number of permits currently in use.
	InUse() int
}

Limiter controls the number of concurrent operations that can happen at any given time. It acts as a semaphore with additional features like context support and state inspection.

func NewSafe

func NewSafe(capacity int) (Limiter, error)

NewSafe creates a new concurrency limiter with validation that returns an error instead of panicking. This is the recommended way to create concurrency limiters for production use.

func NewWithConfigSafe

func NewWithConfigSafe(config Config) (Limiter, error)

NewWithConfigSafe creates a new concurrency limiter with validation that returns an error instead of panicking. This is the recommended way to create concurrency limiters for production use.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL