semaphore

package
v1.20.0 Latest
Published: Mar 10, 2026 License: MIT Imports: 8 Imported by: 0

README

Semaphore Package

High-level semaphore implementation with integrated progress bar support for concurrent goroutine management in Go.

Overview

The semaphore package controls concurrent worker execution with optional visual progress tracking. It combines the base semaphore operations (sem subpackage) with progress bars (bar subpackage), so a single value handles both worker concurrency limits and real-time progress display through the MPB (Multi-Progress Bar) library.

Architecture

The package is organized into a main package and three subpackages:

semaphore/
├── Main Package (github.com/nabbar/golib/semaphore)
│   └── High-level wrapper combining sem + bar + progress tracking
├── sem/ (github.com/nabbar/golib/semaphore/sem)
│   └── Base semaphore implementations (weighted & WaitGroup-based)
├── bar/ (github.com/nabbar/golib/semaphore/bar)
│   └── Progress bar integration with semaphore operations
└── types/ (github.com/nabbar/golib/semaphore/types)
    └── Core interfaces and type definitions
Component Responsibilities
┌─────────────────────────────────────────────────────────┐
│                   Main Package                          │
│  ┌────────────────┐  ┌──────────────┐  ┌─────────────┐│
│  │   Semaphore    │  │   Progress   │  │   Context   ││
│  │   Interface    │  │   Bar        │  │   Support   ││
│  └────────┬───────┘  └──────┬───────┘  └──────┬──────┘│
│           │                  │                  │       │
└───────────┼──────────────────┼──────────────────┼───────┘
            │                  │                  │
    ┌───────▼────────┐  ┌─────▼──────┐  ┌───────▼──────┐
    │  sem Package   │  │bar Package │  │types Package │
    │                │  │            │  │              │
    │ • Weighted     │  │ • Progress │  │ • Sem        │
    │ • WaitGroup    │  │   Tracking │  │ • SemBar     │
    │ • Worker Mgmt  │  │ • MPB      │  │ • SemPgb     │
    └────────────────┘  └────────────┘  └──────────────┘

Features

Core Capabilities
  • Concurrency Control: Limit the number of concurrent goroutines
  • Flexible Modes:
    • Weighted semaphore (with concurrency limits)
    • WaitGroup-based (unlimited concurrency)
  • Progress Visualization: Optional MPB progress bars
  • Context Integration: Full context.Context support for lifecycle management
  • Type Safety: Well-defined interfaces in types subpackage
  • Thread-Safe: All operations are safe for concurrent use
Progress Bar Types
  • BarBytes: For byte-based operations (downloads, file processing)
  • BarTime: For time-based operations with ETA
  • BarNumber: For numeric counters
  • BarOpts: Custom progress bars with MPB options

Performance

Based on the package's test suite (168 test specs):

Metric            Value
Test Coverage     Main: 100%, sem: 100%, bar: 95%
Total Specs       168 (33 main + 66 sem + 68 bar + 3 race)
Concurrency       Up to 1000 concurrent workers tested
Race Conditions   None detected (tested with -race)
Execution Time    ~3.2s for full suite
Benchmark Results
  • Worker throughput: >10,000 workers/second
  • Progress updates: <1ms latency per update
  • Memory overhead: Minimal (~40 bytes per worker slot)

Installation

go get github.com/nabbar/golib/semaphore

Quick Start

Basic Semaphore (No Progress)
package main

import (
    "context"
    "fmt"
    "time"
    
    "github.com/nabbar/golib/semaphore"
)

func main() {
    ctx := context.Background()
    
    // Create semaphore limiting to 5 concurrent workers
    sem := semaphore.New(ctx, 5, false)
    defer sem.DeferMain()
    
    // Process 20 tasks with max 5 concurrent
    for i := 0; i < 20; i++ {
        if err := sem.NewWorker(); err != nil {
            fmt.Printf("Failed to acquire worker: %v\n", err)
            continue
        }
        
        go func(id int) {
            defer sem.DeferWorker()
            
            // Simulate work
            time.Sleep(100 * time.Millisecond)
            fmt.Printf("Task %d completed\n", id)
        }(i)
    }
    
    // Wait for all workers to complete
    if err := sem.WaitAll(); err != nil {
        fmt.Printf("Error waiting: %v\n", err)
    }
}
With Progress Bar
package main

import (
    "context"
    "time"
    
    "github.com/nabbar/golib/semaphore"
)

func main() {
    ctx := context.Background()
    
    // Create semaphore with progress visualization
    sem := semaphore.New(ctx, 5, true) // true = enable MPB
    defer sem.DeferMain()
    
    // Create a progress bar for 100 items
    bar := sem.BarNumber("Processing", "items", 100, false, nil)
    
    for i := 0; i < 100; i++ {
        if err := bar.NewWorker(); err != nil {
            continue
        }
        
        go func(id int) {
            defer bar.DeferWorker() // Auto-increments progress
            
            // Simulate work
            time.Sleep(50 * time.Millisecond)
        }(i)
    }
    
    bar.WaitAll()
}
Advanced: Multiple Progress Bars
package main

import (
    "context"
    "sync"
    "time"
    
    "github.com/nabbar/golib/semaphore"
)

func main() {
    ctx := context.Background()
    sem := semaphore.New(ctx, 10, true)
    defer sem.DeferMain()
    
    // Create multiple progress bars
    downloadBar := sem.BarBytes("Download", "files", 1024*1024*100, false, nil)
    processBar := sem.BarNumber("Process", "items", 50, false, downloadBar) // Queue after downloadBar
    
    // Track the two launcher goroutines so main cannot return
    // before they have started (and waited for) their workers
    var launchers sync.WaitGroup
    launchers.Add(2)
    
    // Download simulation
    go func() {
        defer launchers.Done()
        for i := 0; i < 10; i++ {
            if err := downloadBar.NewWorker(); err == nil {
                go func() {
                    defer downloadBar.DeferWorker()
                    downloadBar.Inc64(1024 * 1024 * 10) // 10MB
                    time.Sleep(100 * time.Millisecond)
                }()
            }
        }
        downloadBar.WaitAll()
    }()
    
    // Processing simulation
    go func() {
        defer launchers.Done()
        for i := 0; i < 50; i++ {
            if err := processBar.NewWorker(); err == nil {
                go func() {
                    defer processBar.DeferWorker()
                    time.Sleep(50 * time.Millisecond)
                }()
            }
        }
        processBar.WaitAll()
    }()
    
    // Wait for both launchers, then for any remaining workers
    launchers.Wait()
    sem.WaitAll()
}

Use Cases

1. Concurrent File Processing
sem := semaphore.New(ctx, 20, true)
defer sem.DeferMain()

bar := sem.BarBytes("Processing", "files", totalBytes, false, nil)

for _, file := range files {
    if err := bar.NewWorker(); err == nil {
        go func(f string) {
            defer bar.DeferWorker()
            
            data, err := os.ReadFile(f)
            if err != nil {
                log.Printf("read %s: %v", f, err)
                return
            }
            bar.Inc64(int64(len(data)))
            // Process file...
        }(file)
    }
}

bar.WaitAll()
2. API Rate Limiting
// Limit to 10 concurrent API requests
sem := semaphore.New(ctx, 10, false)
defer sem.DeferMain()

for _, endpoint := range endpoints {
    if err := sem.NewWorker(); err == nil {
        go func(url string) {
            defer sem.DeferWorker()
            
            resp, err := http.Get(url)
            if err != nil {
                log.Printf("GET %s failed: %v", url, err)
                return
            }
            defer resp.Body.Close()
            // Handle response...
        }(endpoint)
    }
}

sem.WaitAll()
3. Database Connection Pooling
// Create semaphore matching DB connection pool size
sem := semaphore.New(ctx, dbPoolSize, true)
defer sem.DeferMain()

bar := sem.BarNumber("Queries", "executing", int64(len(queries)), false, nil)

for _, query := range queries {
    if err := bar.NewWorker(); err == nil {
        go func(q string) {
            defer bar.DeferWorker()
            
            // Execute query within pool limits
            db.Exec(q)
        }(query)
    }
}

bar.WaitAll()
4. Batch Job Processing
sem := semaphore.New(ctx, runtime.NumCPU(), true)
defer sem.DeferMain()

bar := sem.BarNumber("Jobs", "processing", int64(len(jobs)), false, nil)

for _, job := range jobs {
    if err := bar.NewWorker(); err == nil {
        go func(j Job) {
            defer bar.DeferWorker()
            
            j.Execute()
        }(job)
    }
}

bar.WaitAll()

API Reference

Main Package
Creating Semaphores
// New creates a semaphore with optional progress
func New(ctx context.Context, nbrSimultaneous int, progress bool, opt ...mpb.ContainerOption) Semaphore

// MaxSimultaneous returns GOMAXPROCS value
func MaxSimultaneous() int

// SetSimultaneous calculates actual limit
func SetSimultaneous(n int) int64
Semaphore Interface
type Semaphore interface {
    context.Context  // Lifecycle management
    types.Sem        // Worker management
    types.Progress   // Progress bar creation
    
    Clone() Semaphore
}
Worker Management
// Blocking acquisition
NewWorker() error

// Non-blocking acquisition
NewWorkerTry() bool

// Release worker slot
DeferWorker()

// Wait for all workers
WaitAll() error

// Get concurrency limit (-1 = unlimited)
Weighted() int64
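
To illustrate the difference between blocking and non-blocking acquisition, here is a minimal sketch built on a buffered channel. The `slots` type and its methods are hypothetical names chosen for this illustration; they only mimic the NewWorker / NewWorkerTry / DeferWorker pattern and are not the package's implementation.

```go
package main

import "fmt"

// slots is a hypothetical counting semaphore backed by a buffered channel.
// The channel capacity is the concurrency limit.
type slots chan struct{}

// acquire blocks until a slot is free (the NewWorker pattern).
func (s slots) acquire() { s <- struct{}{} }

// tryAcquire returns immediately and reports whether a slot was taken
// (the NewWorkerTry pattern).
func (s slots) tryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees one slot (the DeferWorker pattern).
func (s slots) release() { <-s }

func main() {
	s := make(slots, 2)
	s.acquire()
	fmt.Println(s.tryAcquire()) // true: the second slot was free
	fmt.Println(s.tryAcquire()) // false: both slots are taken
	s.release()
	fmt.Println(s.tryAcquire()) // true: a slot was released
}
```

A non-blocking attempt is useful when the caller would rather skip or defer a task than wait for capacity.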
Progress Bars
// Byte-based progress bar
BarBytes(name, job string, tot int64, drop bool, bar SemBar) SemBar

// Time-based progress bar
BarTime(name, job string, tot int64, drop bool, bar SemBar) SemBar

// Number-based progress bar
BarNumber(name, job string, tot int64, drop bool, bar SemBar) SemBar

// Custom progress bar
BarOpts(tot int64, drop bool, opts ...mpb.BarOption) SemBar
Subpackages
sem - Base Semaphore

Provides core semaphore functionality:

  • Weighted semaphores (with limits)
  • WaitGroup-based (unlimited)
  • Context integration

See: sem/README.md

bar - Progress Bars

Provides progress bar integration:

  • MPB integration
  • Progress tracking
  • Auto-increment on worker release

See: bar/README.md

types - Interfaces

Defines core interfaces:

  • Sem: Base semaphore interface
  • SemBar: Semaphore + progress bar
  • SemPgb: Semaphore + MPB container
  • Bar, Progress: Progress bar interfaces

Concurrency Modes

Weighted Semaphore (Limited)
// Limit to 10 concurrent workers
sem := semaphore.New(ctx, 10, false)

When to use:

  • Resource-limited operations (DB connections, file handles)
  • Rate limiting
  • Memory-constrained environments
WaitGroup Mode (Unlimited)
// No concurrency limit
sem := semaphore.New(ctx, -1, false)

When to use:

  • Pure tracking (no resource limits)
  • Dynamic workloads
  • High-throughput scenarios
Auto-Detect (Use GOMAXPROCS)
// Use MaxSimultaneous() (GOMAXPROCS) as the limit
sem := semaphore.New(ctx, 0, false)

When to use:

  • CPU-bound operations
  • Default choice for most cases
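
The three modes above can be summarised as a mapping from the nbrSimultaneous argument to an effective limit. The sketch below assumes the rules as documented for New (negative means unlimited, zero means GOMAXPROCS, positive is used as given); `resolveLimit` is a hypothetical helper written for this illustration, not a function of the package.

```go
package main

import (
	"fmt"
	"runtime"
)

// resolveLimit is a hypothetical helper that mirrors the documented mode
// selection. It returns -1 for unlimited (WaitGroup) mode, matching the
// convention documented for Weighted().
func resolveLimit(nbrSimultaneous int) int64 {
	switch {
	case nbrSimultaneous < 0:
		return -1 // WaitGroup mode: no concurrency limit
	case nbrSimultaneous == 0:
		return int64(runtime.GOMAXPROCS(0)) // auto-detect
	default:
		return int64(nbrSimultaneous) // weighted mode with an explicit limit
	}
}

func main() {
	for _, n := range []int{10, -1, 0} {
		fmt.Printf("nbrSimultaneous=%d -> limit %d\n", n, resolveLimit(n))
	}
}
```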

Thread Safety

All semaphore operations are fully thread-safe and can be called concurrently from multiple goroutines:

  • Worker Management: NewWorker(), NewWorkerTry(), DeferWorker() are safe for concurrent calls
  • Progress Updates: Inc(), Dec(), Inc64(), Dec64() use atomic operations
  • State Access: Current(), Total(), Completed() can be read concurrently
  • Timestamp Tracking: Internal timestamp tracking uses atomic.Int64 with Swap() for atomic read-modify-write operations

The package is designed for high-concurrency scenarios and has been tested with:

  • Up to 1000 concurrent goroutines
  • Rapid increment/decrement cycles from multiple goroutines
  • Mixed read/write operations
  • Go race detector (-race flag)
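
As a rough illustration of the atomic techniques listed above, the following sketch updates a counter and a timestamp from many goroutines without a mutex. The `progress` type and its fields are assumptions made for this example; the package's internal layout may differ.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// progress is a hypothetical stand-in for a bar's internal state.
type progress struct {
	current atomic.Int64 // units completed so far
	lastTS  atomic.Int64 // UnixNano of the most recently stored update
}

// inc64 adds n to the counter and returns the previously stored timestamp.
// Swap reads the old value and stores the new one as a single atomic step.
func (p *progress) inc64(n int64) int64 {
	p.current.Add(n)
	return p.lastTS.Swap(time.Now().UnixNano())
}

// runWorkers increments the counter once from each of the given number of
// goroutines and returns the final count.
func runWorkers(workers int) int64 {
	var (
		p  progress
		wg sync.WaitGroup
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.inc64(1)
		}()
	}
	wg.Wait()
	return p.current.Load()
}

func main() {
	fmt.Println(runWorkers(1000)) // 1000
}
```

Running a program like this with `go run -race` is a quick way to confirm that no unsynchronised access slipped in.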

Best Practices

1. Always Use Defer
sem := semaphore.New(ctx, 10, false)
defer sem.DeferMain() // Cleanup resources

if err := sem.NewWorker(); err == nil {
    defer sem.DeferWorker() // Release worker slot
    // Do work...
}
2. Handle Errors
if err := sem.NewWorker(); err != nil {
    log.Printf("Failed to acquire worker: %v", err)
    return
}
defer sem.DeferWorker()
3. Use Context for Timeouts
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

sem := semaphore.New(ctx, 10, false)
defer sem.DeferMain()
4. Monitor Progress
// Enable progress for long-running operations
sem := semaphore.New(ctx, 10, true)
defer sem.DeferMain()

bar := sem.BarNumber("Tasks", "processing", total, false, nil)
// Progress automatically tracked
5. Graceful Shutdown
// Use context cancellation for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sem := semaphore.New(ctx, 10, true)
defer sem.DeferMain()

// On shutdown signal
go func() {
    <-shutdownChan
    cancel() // Stop accepting new workers
}()

Error Handling

Common Errors
// Context cancelled
err := sem.NewWorker()
if errors.Is(err, context.Canceled) {
    // Handle cancellation
}

// Context deadline exceeded
if errors.Is(err, context.DeadlineExceeded) {
    // Handle timeout
}
Error Recovery
for i := 0; i < tasks; i++ {
    if err := sem.NewWorker(); err != nil {
        log.Printf("Task %d failed to acquire worker: %v", i, err)
        continue // Skip this task
    }
    
    go func(id int) {
        defer sem.DeferWorker()
        
        // Task execution with recovery
        defer func() {
            if r := recover(); r != nil {
                log.Printf("Task %d panicked: %v", id, r)
            }
        }()
        
        // Do work...
    }(i)
}
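
If the caller needs the panic as a value rather than a log line, the recovery step can be wrapped in a small helper that converts it into an error. The `safeRun` function below is a hypothetical addition for illustration and is not provided by the package.

```go
package main

import "fmt"

// safeRun is a hypothetical wrapper: it runs task and converts a panic
// into an ordinary error so the worker goroutine can report it.
func safeRun(id int, task func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("task %d panicked: %v", id, r)
		}
	}()
	task()
	return nil
}

func main() {
	fmt.Println(safeRun(1, func() {}))               // <nil>
	fmt.Println(safeRun(2, func() { panic("boom") })) // task 2 panicked: boom
}
```

Inside a worker goroutine, the returned error could be sent on a channel or collected under a mutex after `defer sem.DeferWorker()` has been registered.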

Contributing

Contributions are welcome! Please note:

  • Do not use AI for package implementation (core logic)
  • AI may assist with tests, documentation, and bug fixes
  • All contributions must include tests
  • Follow existing code style and patterns

License

MIT License - See LICENSE file for details

AI Transparency Notice

This package uses AI assistance for testing, documentation, and bug fixing under human supervision, in compliance with AI Act Article 50.4.

Documentation

Overview

Package semaphore provides a high-level semaphore implementation with integrated progress bar support. It combines the functionality of github.com/nabbar/golib/semaphore/sem (base semaphore) with github.com/nabbar/golib/semaphore/bar (progress bars) for convenient concurrent worker management with optional visual progress tracking.

See also:

  • github.com/nabbar/golib/semaphore/sem - Base semaphore implementations
  • github.com/nabbar/golib/semaphore/bar - Progress bar functionality
  • github.com/nabbar/golib/semaphore/types - Core interfaces


Functions

func MaxSimultaneous added in v1.13.0

func MaxSimultaneous() int

MaxSimultaneous returns the maximum number of concurrent goroutines based on GOMAXPROCS.

See: github.com/nabbar/golib/semaphore/sem.MaxSimultaneous

func SetSimultaneous added in v1.13.0

func SetSimultaneous(n int) int64

SetSimultaneous calculates the actual simultaneous limit based on the requested value.

Returns:

  • MaxSimultaneous() if n < 1
  • MaxSimultaneous() if n > MaxSimultaneous()
  • n otherwise

See: github.com/nabbar/golib/semaphore/sem.SetSimultaneous
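
The return rules above amount to clamping n to the range 1..GOMAXPROCS. A minimal sketch of that logic, assuming the rules exactly as listed, could look like this; `maxSimultaneous` and `clampSimultaneous` are hypothetical re-implementations written for this example, not the library's source.

```go
package main

import (
	"fmt"
	"runtime"
)

// maxSimultaneous reports the current GOMAXPROCS setting.
// Passing 0 to runtime.GOMAXPROCS queries the value without changing it.
func maxSimultaneous() int {
	return runtime.GOMAXPROCS(0)
}

// clampSimultaneous applies the documented rules: values below 1 or above
// the maximum fall back to the maximum, anything else is returned as is.
func clampSimultaneous(n int) int64 {
	limit := maxSimultaneous()
	if n < 1 || n > limit {
		return int64(limit)
	}
	return int64(n)
}

func main() {
	fmt.Println(clampSimultaneous(1))                             // 1
	fmt.Println(clampSimultaneous(0) == int64(maxSimultaneous())) // true
}
```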

Types

type Semaphore added in v1.13.0

type Semaphore interface {
	context.Context // Lifecycle management
	semtps.Sem      // Worker management
	semtps.Progress // Progress bar creation

	// Clone creates a copy of this semaphore with independent worker management
	// but shared MPB progress container (if enabled).
	Clone() Semaphore
}

Semaphore is the main interface combining semaphore functionality with progress bar support. It embeds the context.Context, Sem, and Progress interfaces to provide:

  • Context lifecycle management (cancellation, deadlines)
  • Worker concurrency control (acquire/release)
  • Progress bar creation and management

See: github.com/nabbar/golib/semaphore/types for interface details

func New added in v1.13.0

func New(ctx context.Context, nbrSimultaneous int, progress bool, opt ...sdkmpb.ContainerOption) Semaphore

New creates a new Semaphore instance.

Parameters:

  • ctx: Parent context for lifecycle management
  • nbrSimultaneous: Number of concurrent workers allowed
    • == 0: Uses MaxSimultaneous() as the limit
    • > 0: Uses the provided value as the limit
    • < 0: No limit (WaitGroup mode)
  • progress: Enable visual progress bars (MPB)
  • opt: Optional MPB container options (only used if progress is true)

Returns:

  • Semaphore: A new semaphore instance

Example:

sem := semaphore.New(ctx, 10, true)
defer sem.DeferMain()

bar := sem.BarNumber("Tasks", "processing", 100, false, nil)
// Use bar for worker management with progress tracking

Directories

Path Synopsis
Package bar provides a semaphore wrapper with integrated progress bar support.
Package sem provides semaphore implementations for controlling concurrent goroutine execution.
Package types defines the core interfaces for semaphore implementations with progress tracking.
