Documentation ¶
Overview ¶
Package concurrency provides concurrency limiting for Go applications.
A concurrency limiter controls the number of operations that can run simultaneously, acting as a semaphore with additional features like context support, dynamic configuration, and comprehensive state inspection.
Basic usage:
limiter, err := concurrency.NewSafe(10) // Allow 10 concurrent operations
if err != nil {
log.Fatal(err)
}
if limiter.Acquire() {
defer limiter.Release()
// Do work
}
Key Features:
The concurrency limiter provides:
- Non-blocking permit acquisition (Acquire/AcquireN)
- Context-aware blocking operations (Wait/WaitN)
- Batch permit operations for efficiency
- Dynamic capacity adjustment at runtime
- Comprehensive state inspection
- Graceful handling of context cancellation
Use Cases:
Concurrency limiting is ideal for:
- Database connection pools
- HTTP server request limiting
- Worker pool management
- Resource-intensive operation control
- API rate limiting by active requests
- Memory-bound operation throttling
Worker Pool Pattern:
limiter, err := concurrency.NewSafe(5) // Max 5 concurrent workers
if err != nil {
log.Fatal(err)
}
var wg sync.WaitGroup
for _, task := range tasks {
wg.Add(1)
go func(t Task) {
defer wg.Done()
// Wait for available slot
if err := limiter.Wait(ctx); err != nil {
return // Context canceled/timeout
}
defer limiter.Release()
// Process task
processTask(t)
}(task)
}
wg.Wait()
Database Connection Limiting:
dbLimiter, err := concurrency.NewSafe(20) // Max 20 DB connections
if err != nil {
log.Fatal(err)
}
func queryDatabase(query string) error {
// Acquire connection permit
if !dbLimiter.Acquire() {
return errors.New("database busy")
}
defer dbLimiter.Release()
// Execute query (assumes db is a *sql.DB, whose Exec returns a result and an error)
_, err := db.Exec(query)
return err
}
HTTP Server Limiting:
requestLimiter, err := concurrency.NewSafe(1000) // Max 1000 concurrent requests
if err != nil {
log.Fatal(err)
}
func handler(w http.ResponseWriter, r *http.Request) {
if !requestLimiter.Acquire() {
http.Error(w, "Server busy", http.StatusTooManyRequests)
return
}
defer requestLimiter.Release()
// Handle request
handleRequest(w, r)
}
Batch Operations:
// Acquire multiple permits for batch processing
if limiter.AcquireN(batchSize) {
defer limiter.ReleaseN(batchSize)
// Process batch
for _, item := range batch {
processItem(item)
}
}
Context Integration:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := limiter.Wait(ctx); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
// Handle timeout
}
return err
}
defer limiter.Release()
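To illustrate how a context-aware wait behaves when no permit frees up in time, here is a minimal sketch built on a buffered channel and the standard library only. It is not the package's implementation; `acquireWithin` and the `slots` channel are hypothetical names introduced for this example.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// acquireWithin tries to take one slot from slots, giving up after timeout.
// It is a hypothetical stand-in for a context-aware wait, built on a buffered
// channel rather than on the concurrency package.
func acquireWithin(slots chan struct{}, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	select {
	case slots <- struct{}{}:
		return nil // slot taken; the caller must receive from slots to release it
	case <-ctx.Done():
		return ctx.Err() // context.DeadlineExceeded once the timeout passes
	}
}

func main() {
	slots := make(chan struct{}, 1)
	fmt.Println(acquireWithin(slots, 10*time.Millisecond)) // <nil>
	fmt.Println(acquireWithin(slots, 10*time.Millisecond)) // context deadline exceeded
}
```

The second call fails because the only slot is still held, which mirrors the timeout path handled in the snippet above.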
Dynamic Configuration:
// Start with capacity for normal load
limiter, err := concurrency.NewSafe(10)
if err != nil {
log.Fatal(err)
}
// Increase capacity during peak hours
limiter.SetCapacity(50)
// Monitor and adjust based on system metrics
if systemLoad < 0.5 {
limiter.SetCapacity(limiter.Capacity() + 10)
}
State Inspection:
capacity := limiter.Capacity() // Maximum concurrent operations
available := limiter.Available() // Currently available permits
inUse := limiter.InUse() // Currently active operations
// Calculate utilization
utilization := float64(inUse) / float64(capacity)
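The utilization ratio divides by the capacity, so a small helper that guards against a zero capacity may be useful in monitoring code. The sketch below takes plain integers so that it stays self-contained; `utilization` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// utilization returns the fraction of permits in use.
// It is a hypothetical helper, not part of the concurrency package; pass it
// the values returned by limiter.InUse() and limiter.Capacity().
func utilization(inUse, capacity int) float64 {
	if capacity <= 0 {
		return 0 // avoid dividing by zero when there is no capacity
	}
	return float64(inUse) / float64(capacity)
}

func main() {
	fmt.Printf("%.2f\n", utilization(5, 10)) // 0.50
	fmt.Printf("%.2f\n", utilization(0, 0))  // 0.00
}
```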
Advanced Configuration:
config := concurrency.Config{
Capacity: 100,
InitialAvailable: 50, // Start with 50 permits available (the other 50 count as in use)
}
limiter, err := concurrency.NewWithConfigSafe(config)
if err != nil {
log.Fatal(err)
}
Graceful Shutdown:
// Allow pending operations to complete, but reject new ones
limiter.SetCapacity(0) // Stop accepting new operations
// Wait for active operations to complete
for limiter.InUse() > 0 {
time.Sleep(100 * time.Millisecond)
}
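The polling loop above never gives up if an operation hangs. One possible variation bounds the wait with a timeout. The sketch below uses only the standard library; `drainWithin` is a hypothetical helper, and its `inUse` argument stands in for a call such as `limiter.InUse`.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// drainWithin polls inUse every interval until it reports zero, or returns
// the context error once timeout has elapsed. inUse stands in for a call
// such as limiter.InUse; drainWithin itself is a hypothetical helper.
func drainWithin(inUse func() int, timeout, interval time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for inUse() > 0 {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
	return nil
}

func main() {
	var active atomic.Int64
	active.Store(2)
	go func() { // simulate two operations finishing
		for i := 0; i < 2; i++ {
			time.Sleep(5 * time.Millisecond)
			active.Add(-1)
		}
	}()
	err := drainWithin(func() int { return int(active.Load()) }, time.Second, time.Millisecond)
	fmt.Println("drained:", err == nil)
}
```

A caller could log the returned error and proceed with a forced shutdown when the timeout expires.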
Error Handling:
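The limiter panics on the following conditions; the NewSafe and NewWithConfigSafe constructors instead return an error for invalid input: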
- Invalid capacity (zero or negative)
- Releasing more permits than capacity
- Invalid configuration parameters
Context errors are returned for:
- context.Canceled: Context was canceled
- context.DeadlineExceeded: Context timeout exceeded
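The two context errors listed above can be told apart with `errors.Is`, which also matches errors that have been wrapped. The sketch below uses only the standard library; `describeWaitError` is a hypothetical helper that could be applied to the error returned by a wait call.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// describeWaitError maps an error from a context-aware wait to a short label.
// It is a hypothetical helper, not part of the concurrency package.
func describeWaitError(err error) string {
	switch {
	case err == nil:
		return "acquired"
	case errors.Is(err, context.DeadlineExceeded):
		return "timed out"
	case errors.Is(err, context.Canceled):
		return "canceled"
	default:
		return "unexpected: " + err.Error()
	}
}

func main() {
	// A context whose deadline has passed reports DeadlineExceeded.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	<-ctx.Done()
	fmt.Println(describeWaitError(ctx.Err())) // timed out

	// A canceled context reports Canceled, even when the error is wrapped.
	ctx2, cancel2 := context.WithCancel(context.Background())
	cancel2()
	fmt.Println(describeWaitError(fmt.Errorf("wait: %w", ctx2.Err()))) // canceled
}
```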
Thread Safety:
All operations are safe for concurrent use. The limiter uses mutex-based synchronization to ensure consistency while maintaining good performance for high-throughput scenarios.
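As a rough illustration of mutex-based synchronization, the sketch below guards a permit counter with a `sync.Mutex`. It is a simplified stand-in, not the package's implementation; `permitCounter`, `tryAcquire`, and `release` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// permitCounter is a hypothetical, simplified stand-in for a limiter.
type permitCounter struct {
	mu       sync.Mutex
	capacity int
	inUse    int
}

// tryAcquire takes n permits if all of them are available, without blocking.
func (p *permitCounter) tryAcquire(n int) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if n <= 0 || p.inUse+n > p.capacity {
		return false
	}
	p.inUse += n
	return true
}

// release returns n permits to the counter.
func (p *permitCounter) release(n int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.inUse -= n
}

func main() {
	p := &permitCounter{capacity: 3}
	var (
		wg              sync.WaitGroup
		resMu           sync.Mutex
		granted, denied int
	)
	// Ten goroutines compete for three permits; none are released.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ok := p.tryAcquire(1)
			resMu.Lock()
			defer resMu.Unlock()
			if ok {
				granted++
			} else {
				denied++
			}
		}()
	}
	wg.Wait()
	fmt.Println(granted, denied) // 3 7
}
```

Because every check and update happens under the lock, exactly three goroutines succeed regardless of scheduling order.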
Performance Characteristics:
- Acquire/Release: O(1) for immediate success/failure
- Wait operations: O(n) where n is the number of waiters
- Memory usage scales with number of waiting goroutines
- Optimized for high-frequency acquire/release patterns
Comparison with Other Patterns:
vs Channel-based semaphores:
- Better performance for high-frequency operations
- More features (state inspection, dynamic capacity)
- Context integration built-in
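For reference, a channel-based semaphore of the kind being compared can be sketched as follows. This is the generic Go idiom, shown with hypothetical names (`chanSemaphore`, `tryAcquire`, `release`); it is not taken from the package.

```go
package main

import "fmt"

// chanSemaphore is a counting semaphore backed by a buffered channel.
// The buffer size is fixed when the channel is created.
type chanSemaphore chan struct{}

func newChanSemaphore(n int) chanSemaphore { return make(chanSemaphore, n) }

// tryAcquire takes one slot without blocking.
func (s chanSemaphore) tryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees one slot; it blocks if no slot is held.
func (s chanSemaphore) release() { <-s }

func main() {
	sem := newChanSemaphore(2)
	fmt.Println(sem.tryAcquire(), sem.tryAcquire(), sem.tryAcquire()) // true true false
	sem.release()
	fmt.Println(len(sem), cap(sem)) // 1 2
}
```

Since a channel's buffer cannot be resized after creation, changing the limit means replacing the channel, and taking several slots requires several sends rather than one all-or-nothing step.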
vs sync.WaitGroup:
- Controls concurrency rather than coordination
- Supports timeouts and cancellation
- Runtime capacity adjustment
vs Worker pools:
- More flexible - works with any operation pattern
- Lower overhead - no goroutine pool management
- Dynamic scaling based on demand
Example ¶
Example demonstrates basic usage of the concurrency limiter
package main
import (
"fmt"
"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)
func main() {
// Create a limiter that allows 3 concurrent operations
limiter, err := concurrency.NewSafe(3)
if err != nil {
panic(fmt.Sprintf("Failed to create limiter: %v", err))
}
// Try to acquire a permit (non-blocking)
if limiter.Acquire() {
fmt.Println("Operation permitted")
// Do work...
limiter.Release() // Don't forget to release!
} else {
fmt.Println("Operation denied - at capacity")
}
}
Output: Operation permitted
Example (CustomConfiguration) ¶
Example_customConfiguration demonstrates advanced configuration
package main
import (
"fmt"
"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)
func main() {
// Create limiter with custom initial state
config := concurrency.Config{
Capacity: 10,
InitialAvailable: 5, // Start with only 5 available (simulating 5 in use)
}
limiter, err := concurrency.NewWithConfigSafe(config)
if err != nil {
panic(fmt.Sprintf("Failed to create limiter: %v", err))
}
fmt.Printf("Custom limiter: capacity=%d, available=%d, in_use=%d\n",
limiter.Capacity(), limiter.Available(), limiter.InUse())
}
Output: Custom limiter: capacity=10, available=5, in_use=5
Example (DatabaseConnections) ¶
Example_databaseConnections demonstrates limiting database connections
package main
import (
"fmt"
"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)
func main() {
// Limit to 3 concurrent database connections
dbLimiter, err := concurrency.NewSafe(3)
if err != nil {
panic(fmt.Sprintf("Failed to create limiter: %v", err))
}
fmt.Printf("Database limiter capacity: %d\n", dbLimiter.Capacity())
fmt.Printf("Available connections: %d\n", dbLimiter.Available())
// Simulate acquiring connections
for i := 1; i <= 4; i++ {
if dbLimiter.Acquire() {
fmt.Printf("Connection %d acquired. Available: %d, In use: %d\n",
i, dbLimiter.Available(), dbLimiter.InUse())
} else {
fmt.Printf("Connection %d denied. Available: %d, In use: %d\n",
i, dbLimiter.Available(), dbLimiter.InUse())
}
}
// Release a connection
dbLimiter.Release()
fmt.Printf("Connection released. Available: %d, In use: %d\n",
dbLimiter.Available(), dbLimiter.InUse())
}
Output:
Database limiter capacity: 3
Available connections: 3
Connection 1 acquired. Available: 2, In use: 1
Connection 2 acquired. Available: 1, In use: 2
Connection 3 acquired. Available: 0, In use: 3
Connection 4 denied. Available: 0, In use: 3
Connection released. Available: 1, In use: 2
Example (DynamicCapacity) ¶
Example_dynamicCapacity demonstrates changing capacity at runtime
package main
import (
"fmt"
"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)
func main() {
limiter, err := concurrency.NewSafe(3)
if err != nil {
panic(fmt.Sprintf("Failed to create limiter: %v", err))
}
// Use some permits
limiter.AcquireN(2)
fmt.Printf("Initial: capacity=%d, available=%d, in_use=%d\n",
limiter.Capacity(), limiter.Available(), limiter.InUse())
// Increase capacity
limiter.SetCapacity(5)
fmt.Printf("After increase: capacity=%d, available=%d, in_use=%d\n",
limiter.Capacity(), limiter.Available(), limiter.InUse())
// Decrease capacity
limiter.SetCapacity(3)
fmt.Printf("After decrease: capacity=%d, available=%d, in_use=%d\n",
limiter.Capacity(), limiter.Available(), limiter.InUse())
}
Output:
Initial: capacity=3, available=1, in_use=2
After increase: capacity=5, available=3, in_use=2
After decrease: capacity=3, available=1, in_use=2
Example (GracefulShutdown) ¶
Example_gracefulShutdown demonstrates graceful shutdown pattern
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)
func main() {
limiter, err := concurrency.NewSafe(2)
if err != nil {
panic(fmt.Sprintf("Failed to create limiter: %v", err))
}
// Start some work
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// Acquire permit with timeout for shutdown
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
if err := limiter.Wait(ctx); err != nil {
fmt.Printf("Worker %d: shutdown timeout\n", id)
return
}
defer limiter.Release()
time.Sleep(50 * time.Millisecond)
}(i)
}
wg.Wait()
fmt.Printf("Final state: available=%d, in_use=%d\n",
limiter.Available(), limiter.InUse())
}
Output: Final state: available=2, in_use=0
Example (HttpServerLimiting) ¶
Example_httpServerLimiting demonstrates HTTP request limiting pattern
package main
import (
"fmt"
"sync"
"time"
"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)
func main() {
// Limit concurrent HTTP requests
requestLimiter, err := concurrency.NewSafe(100)
if err != nil {
panic(fmt.Sprintf("Failed to create limiter: %v", err))
}
// Simulate request handler
handleRequest := func(requestID int) {
if !requestLimiter.Acquire() {
fmt.Printf("Request %d: Server busy, try again later\n", requestID)
return
}
defer requestLimiter.Release()
// Simulate request processing time
time.Sleep(10 * time.Millisecond)
}
// Simulate burst of requests
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
handleRequest(id)
}(i)
}
wg.Wait()
fmt.Printf("All requests processed. Server state: %d/%d in use\n",
requestLimiter.InUse(), requestLimiter.Capacity())
}
Output: All requests processed. Server state: 0/100 in use
Example (MultiplePermits) ¶
Example_multiplePermits demonstrates acquiring multiple permits at once
package main
import (
"fmt"
"time"
"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)
func main() {
limiter, err := concurrency.NewSafe(5)
if err != nil {
panic(fmt.Sprintf("Failed to create limiter: %v", err))
}
// Acquire multiple permits for batch operation
if limiter.AcquireN(3) {
fmt.Printf("Batch operation started. Available: %d, In use: %d\n",
limiter.Available(), limiter.InUse())
// Simulate batch work
time.Sleep(10 * time.Millisecond)
// Release all permits
limiter.ReleaseN(3)
fmt.Printf("Batch operation completed. Available: %d, In use: %d\n",
limiter.Available(), limiter.InUse())
} else {
fmt.Println("Not enough permits available for batch operation")
}
}
Output:
Batch operation started. Available: 2, In use: 3
Batch operation completed. Available: 5, In use: 0
Example (WithTimeout) ¶
Example_withTimeout demonstrates using timeouts with concurrency limiter
package main
import (
"context"
"fmt"
"time"
"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)
func main() {
// Small limiter to demonstrate blocking
limiter, err := concurrency.NewSafe(1)
if err != nil {
panic(fmt.Sprintf("Failed to create limiter: %v", err))
}
// Fill the limiter
limiter.Acquire()
// Try to acquire with timeout
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err = limiter.Wait(ctx)
if err != nil {
fmt.Printf("Failed to acquire permit: %v\n", err)
} else {
fmt.Println("Permit acquired")
limiter.Release()
}
}
Output: Failed to acquire permit: context deadline exceeded
Example (WorkerPool) ¶
Example_workerPool demonstrates using concurrency limiter for a worker pool
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/1mb-dev/goflow/pkg/ratelimit/concurrency"
)
func main() {
// Limit concurrent workers to 2
limiter, err := concurrency.NewSafe(2)
if err != nil {
panic(fmt.Sprintf("Failed to create limiter: %v", err))
}
tasks := []string{"task1", "task2", "task3", "task4", "task5"}
var wg sync.WaitGroup
for _, task := range tasks {
wg.Add(1)
go func(taskName string) {
defer wg.Done()
// Wait for permit (blocks if necessary)
ctx := context.Background()
if err := limiter.Wait(ctx); err != nil {
fmt.Printf("Failed to acquire permit for %s: %v\n", taskName, err)
return
}
defer limiter.Release()
// Simulate work
time.Sleep(100 * time.Millisecond)
}(task)
}
wg.Wait()
fmt.Println("All tasks completed")
}
Output: All tasks completed
Types ¶
type Config ¶
type Config struct {
// Capacity is the maximum number of concurrent operations allowed.
Capacity int
// InitialAvailable is the initial number of available permits.
// If negative or greater than Capacity, defaults to Capacity.
InitialAvailable int
}
Config holds configuration options for creating a new concurrency Limiter.
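The defaulting rule documented for InitialAvailable can be restated as a small function. This is only a reading of the field comment above, not the package's code; `effectiveInitialAvailable` is a hypothetical name.

```go
package main

import "fmt"

// effectiveInitialAvailable restates the documented rule: a value that is
// negative or greater than capacity falls back to capacity. It is a
// hypothetical helper based on the field comment, not the package's code.
func effectiveInitialAvailable(capacity, initial int) int {
	if initial < 0 || initial > capacity {
		return capacity
	}
	return initial
}

func main() {
	fmt.Println(effectiveInitialAvailable(10, 5))  // 5
	fmt.Println(effectiveInitialAvailable(10, -1)) // 10
	fmt.Println(effectiveInitialAvailable(10, 11)) // 10
}
```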
type Limiter ¶
type Limiter interface {
// Acquire attempts to acquire a permit for one operation.
// It returns true if a permit was available, false otherwise.
// This method does not block.
Acquire() bool
// AcquireN attempts to acquire n permits for operations.
// It returns true if all permits were available, false otherwise.
// This method does not block.
AcquireN(n int) bool
// Wait blocks until a permit is available for one operation.
// It returns an error if the context is canceled or deadline exceeded.
Wait(ctx context.Context) error
// WaitN blocks until n permits are available for operations.
// It returns an error if the context is canceled or deadline exceeded.
WaitN(ctx context.Context, n int) error
// Release releases one permit back to the limiter.
// It panics if more permits are released than were acquired.
Release()
// ReleaseN releases n permits back to the limiter.
// It panics if more permits are released than were acquired.
ReleaseN(n int)
// SetCapacity changes the maximum number of concurrent operations allowed.
// If the new capacity is less than current usage, it will take effect
// as operations complete and permits are released.
SetCapacity(capacity int)
// Capacity returns the maximum number of concurrent operations allowed.
Capacity() int
// Available returns the number of permits currently available.
Available() int
// InUse returns the number of permits currently in use.
InUse() int
}
Limiter controls the number of concurrent operations that can happen at any given time. It acts as a semaphore with additional features like context support and state inspection.
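The relationship between Capacity, InUse, and Available after a SetCapacity call can be sketched with plain arithmetic. The first three rows below match the output of the DynamicCapacity example; the last row, where usage exceeds the new capacity, is an assumption about how "takes effect as permits are released" might surface. The `available` function is a hypothetical helper.

```go
package main

import "fmt"

// available derives free permits from capacity and usage, never going
// negative. It is a hypothetical helper; the clamp to zero when usage
// exceeds capacity is an assumption, not confirmed package behavior.
func available(capacity, inUse int) int {
	if inUse >= capacity {
		return 0
	}
	return capacity - inUse
}

func main() {
	inUse := 2
	for _, capacity := range []int{3, 5, 3, 1} {
		fmt.Printf("capacity=%d available=%d in_use=%d\n",
			capacity, available(capacity, inUse), inUse)
	}
}
```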
func NewSafe ¶
NewSafe creates a new concurrency limiter with validation that returns an error instead of panicking. This is the recommended way to create concurrency limiters for production use.
func NewWithConfigSafe ¶
NewWithConfigSafe creates a new concurrency limiter from a Config, validating the configuration and returning an error instead of panicking. This is the recommended way to create a limiter with a custom initial state for production use.