concurrency

package
v2.3.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 1, 2025 License: MIT Imports: 4 Imported by: 2

Documentation

Overview

Package concurrency contains functions to support concurrent programming, e.g. goroutines and channels.

Package concurrency contains functions to support concurrent programming, e.g. goroutines, channels, and lockers.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel

type Channel[T any] struct {
}

Channel is a logical object that can generate or manipulate Go channels. All methods of Channel are taken from the book titled《Concurrency in Go》.

func NewChannel

func NewChannel[T any]() *Channel[T]

NewChannel returns a Channel instance.

func (*Channel[T]) Bridge

func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-chan T

Bridge links multiple channels into one channel. Play: https://go.dev/play/p/qmWSy1NVF-Y

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m1 := make(map[int]int)
m2 := make(map[int]int)
c := NewChannel[int]()
genVals := func() <-chan <-chan int {
	out := make(chan (<-chan int))
	go func() {
		defer close(out)
		for i := 1; i <= 5; i++ {
			stream := make(chan int, 1)
			stream <- i
			m1[i]++
			close(stream)
			out <- stream
		}
	}()
	return out
}

for v := range c.Bridge(ctx, genVals()) {
	m2[v]++
}
for k, v := range m1 {
	fmt.Println(m2[k] == v)
}
Output:

true
true
true
true
true

func (*Channel[T]) FanIn

func (c *Channel[T]) FanIn(ctx context.Context, channels ...<-chan T) <-chan T

FanIn merges multiple channels into one channel. Play: https://go.dev/play/p/2VYFMexEvTm

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c := NewChannel[int]()
channels := make([]<-chan int, 2)

for i := 0; i < 2; i++ {
	channels[i] = c.Take(ctx, c.Repeat(ctx, i), 2)
}

chs := c.FanIn(ctx, channels...)

for v := range chs {
	fmt.Println(v) //1 1 0 0 or 0 0 1 1
}

func (*Channel[T]) Generate

func (c *Channel[T]) Generate(ctx context.Context, values ...T) <-chan T

Generate creates a channel, then puts the values into the channel. Play: https://go.dev/play/p/7aB4KyMMp9A

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c := NewChannel[int]()
intStream := c.Generate(ctx, 1, 2, 3)

fmt.Println(<-intStream)
fmt.Println(<-intStream)
fmt.Println(<-intStream)
Output:

1
2
3

func (*Channel[T]) Or

func (c *Channel[T]) Or(channels ...<-chan T) <-chan T

Or reads one or more channels into one channel, which closes when any of the input channels is closed. Play: https://go.dev/play/p/Wqz9rwioPww

Example
sig := func(after time.Duration) <-chan any {
	c := make(chan any)
	go func() {
		defer close(c)
		time.Sleep(after)
	}()
	return c
}

start := time.Now()

c := NewChannel[any]()
<-c.Or(
	sig(1*time.Second),
	sig(2*time.Second),
	sig(3*time.Second),
)

if time.Since(start).Seconds() < 2 {
	fmt.Println("ok")
}
Output:

ok

func (*Channel[T]) OrDone

func (c *Channel[T]) OrDone(ctx context.Context, channel <-chan T) <-chan T

OrDone reads a channel into another channel until the context is canceled. Play: https://go.dev/play/p/lm_GoS6aDjo

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c := NewChannel[int]()
intStream := c.Take(ctx, c.Repeat(ctx, 1), 3)

for v := range c.OrDone(ctx, intStream) {
	fmt.Println(v)
}
Output:

1
1
1

func (*Channel[T]) Repeat

func (c *Channel[T]) Repeat(ctx context.Context, values ...T) <-chan T

Repeat creates a channel and puts the values into it repeatedly until the context is canceled. Play: https://go.dev/play/p/k5N_ALVmYjE

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c := NewChannel[int]()
intStream := c.Take(ctx, c.Repeat(ctx, 1, 2), 4)

for v := range intStream {
	fmt.Println(v)
}
Output:

1
2
1
2

func (*Channel[T]) RepeatFn

func (c *Channel[T]) RepeatFn(ctx context.Context, fn func() T) <-chan T

RepeatFn creates a channel, executes fn repeatedly, and puts each result into the channel until the context is canceled. Play: https://go.dev/play/p/4J1zAWttP85

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fn := func() string {
	return "hello"
}

c := NewChannel[string]()
intStream := c.Take(ctx, c.RepeatFn(ctx, fn), 3)

for v := range intStream {
	fmt.Println(v)
}
Output:

hello
hello
hello

func (*Channel[T]) Take

func (c *Channel[T]) Take(ctx context.Context, valueStream <-chan T, number int) <-chan T

Take creates a channel whose values are taken from another channel, up to the given number of values. Play: https://go.dev/play/p/9Utt-1pDr2J

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

numbers := make(chan int, 5)
numbers <- 1
numbers <- 2
numbers <- 3
numbers <- 4
numbers <- 5
defer close(numbers)

c := NewChannel[int]()
intStream := c.Take(ctx, numbers, 3)

for v := range intStream {
	fmt.Println(v)
}
Output:

1
2
3

func (*Channel[T]) Tee

func (c *Channel[T]) Tee(ctx context.Context, in <-chan T) (<-chan T, <-chan T)

Tee splits one channel into two channels until the context is canceled. Play: https://go.dev/play/p/3TQPKnCirrP

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c := NewChannel[int]()
intStream := c.Take(ctx, c.Repeat(ctx, 1), 2)

ch1, ch2 := c.Tee(ctx, intStream)

for v := range ch1 {
	fmt.Println(v)
	fmt.Println(<-ch2)
}
Output:

1
1
1
1

type KeyedLocker added in v2.3.6

type KeyedLocker[K comparable] struct {
	// contains filtered or unexported fields
}

KeyedLocker is a simple implementation of a keyed locker that allows for non-blocking lock acquisition.

func NewKeyedLocker added in v2.3.6

func NewKeyedLocker[K comparable](ttl time.Duration) *KeyedLocker[K]

NewKeyedLocker creates a new KeyedLocker with the specified TTL for lock expiration. The TTL is used to automatically release locks that are no longer held. Play: https://go.dev/play/p/GzeyC33T5rw

func (*KeyedLocker[K]) Do added in v2.3.6

func (l *KeyedLocker[K]) Do(ctx context.Context, key K, fn func()) error

Do acquires a lock for the specified key and executes the provided function. It returns an error if the context is canceled before the function completes. Play: https://go.dev/play/p/GzeyC33T5rw

Example
locker := NewKeyedLocker[string](2 * time.Second)

task := func() {
	fmt.Println("Executing task...")
	time.Sleep(1 * time.Second)
	fmt.Println("Task completed.")
}

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

if err := locker.Do(ctx, "mykey", task); err != nil {
	log.Fatalf("Error executing task: %v\n", err)
} else {
	fmt.Println("Task successfully executed.")
}

ctx2, cancel2 := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel2()

if err := locker.Do(ctx2, "mykey", task); err != nil {
	log.Fatalf("Error executing task: %v\n", err)
} else {
	fmt.Println("Task successfully executed.")
}
Output:

Executing task...
Task completed.
Task successfully executed.
Executing task...
Task completed.
Task successfully executed.

type RWKeyedLocker added in v2.3.6

type RWKeyedLocker[K comparable] struct {
	// contains filtered or unexported fields
}

RWKeyedLocker is a read-write version of KeyedLocker.

func NewRWKeyedLocker added in v2.3.6

func NewRWKeyedLocker[K comparable](ttl time.Duration) *RWKeyedLocker[K]

NewRWKeyedLocker creates a new RWKeyedLocker with the specified TTL for lock expiration. The TTL is used to automatically release locks that are no longer held. Play: https://go.dev/play/p/CkaJWWwZm9

func (*RWKeyedLocker[K]) Lock added in v2.3.6

func (l *RWKeyedLocker[K]) Lock(ctx context.Context, key K, fn func()) error

Lock acquires a write lock for the specified key and executes the provided function. It returns an error if the context is canceled before the function completes. Play: https://go.dev/play/p/WgAcXbOPKGk

Example
locker := NewRWKeyedLocker[string](2 * time.Second)

// Simulate a key
key := "resource_key"

fn := func() {
	fmt.Println("Starting write operation...")
	// Simulate a write operation, assuming it takes 200 milliseconds
	time.Sleep(200 * time.Millisecond)
	fmt.Println("Write operation completed!")
}

// Acquire the write lock and execute the operation
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

// Execute the lock operation with a 1-second timeout
err := locker.Lock(ctx, key, fn)
if err != nil {
	return
}
Output:

Starting write operation...
Write operation completed!

func (*RWKeyedLocker[K]) RLock added in v2.3.6

func (l *RWKeyedLocker[K]) RLock(ctx context.Context, key K, fn func()) error

RLock acquires a read lock for the specified key and executes the provided function. It returns an error if the context is canceled before the function completes. Play: https://go.dev/play/p/ZrCr8sMo77T

Example
locker := NewRWKeyedLocker[string](2 * time.Second)

// Simulate a key
key := "resource_key"

fn := func() {
	fmt.Println("Starting write operation...")
	// Simulate the operation, assuming it takes 200 milliseconds
	time.Sleep(200 * time.Millisecond)
	fmt.Println("Write operation completed!")
}

// Acquire the read lock and execute the operation
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

// Execute the read-lock operation with a 1-second timeout
err := locker.RLock(ctx, key, fn)
if err != nil {
	return
}
Output:

Starting write operation...
Write operation completed!

type TryKeyedLocker added in v2.3.6

type TryKeyedLocker[K comparable] struct {
	// contains filtered or unexported fields
}

TryKeyedLocker is a non-blocking version of KeyedLocker. It allows for trying to acquire a lock without blocking if the lock is already held.

Example
locker := NewTryKeyedLocker[string]()

key := "resource_key"

if locker.TryLock(key) {
	fmt.Println("Lock acquired")
	time.Sleep(1 * time.Second)
	// Unlock after work is done
	locker.Unlock(key)
	fmt.Println("Lock released")
} else {
	fmt.Println("Lock failed")
}
Output:

Lock acquired
Lock released

func NewTryKeyedLocker added in v2.3.6

func NewTryKeyedLocker[K comparable]() *TryKeyedLocker[K]

NewTryKeyedLocker creates a new TryKeyedLocker. Play: https://go.dev/play/p/VG9qLvyetE2

func (*TryKeyedLocker[K]) TryLock added in v2.3.6

func (l *TryKeyedLocker[K]) TryLock(key K) bool

TryLock tries to acquire a lock for the specified key. It returns true if the lock was acquired, false otherwise. Play: https://go.dev/play/p/VG9qLvyetE2

Example
locker := NewTryKeyedLocker[string]()

key := "resource_key"

done := make(chan struct{})
go func() {
	if locker.TryLock(key) {
		time.Sleep(2 * time.Second)
		locker.Unlock(key)
	}
	close(done)
}()

time.Sleep(100 * time.Millisecond)

if locker.TryLock(key) {
	fmt.Println("Lock acquired")
	locker.Unlock(key)
} else {
	fmt.Println("Lock failed")
}

// wait for the goroutine to finish
<-done

fmt.Println("Retrying...")
time.Sleep(100 * time.Millisecond)

if locker.TryLock(key) {
	fmt.Println("Lock acquired")
	locker.Unlock(key)
	fmt.Println("Lock released")
} else {
	fmt.Println("Lock failed")
}
Output:

Lock failed
Retrying...
Lock acquired
Lock released

func (*TryKeyedLocker[K]) Unlock added in v2.3.6

func (l *TryKeyedLocker[K]) Unlock(key K)

Unlock releases the lock for the specified key. Play: https://go.dev/play/p/VG9qLvyetE2

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL