conc

Version: v0.1.0
Warning

This package is not in the latest version of its module.

Published: Jan 2, 2023 | License: MIT | Imports: 5 | Imported by: 143

README

conc: better structured concurrency for Go

conc is your toolbelt for structured concurrency in Go, making common tasks easier and safer.

The main goals of the package are:

  1. Make it harder to leak goroutines
  2. Handle panics gracefully
  3. Make concurrent code easier to read

Goal #1: Make it harder to leak goroutines

A common pain point when working with goroutines is cleaning them up. It's really easy to fire off a go statement and fail to properly wait for it to complete.

conc takes the opinionated stance that all concurrency should be scoped. That is, goroutines should have an owner and that owner should always ensure that its owned goroutines exit properly.

In conc, the owner of a goroutine is always a conc.WaitGroup. Goroutines are spawned in a WaitGroup with (*WaitGroup).Go(), and (*WaitGroup).Wait() should always be called before the WaitGroup goes out of scope.
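The ownership rule can be illustrated with the standard library alone. The sketch below uses a hypothetical `scope` type (not conc's implementation) that wraps sync.WaitGroup so that spawning and waiting both go through a single owner; once Wait returns, no goroutine started through the scope is still running.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// scope is a hypothetical owner for goroutines; it is not conc's WaitGroup.
type scope struct {
	wg sync.WaitGroup
}

// Go starts f in a new goroutine that the scope is responsible for.
func (s *scope) Go(f func()) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		f()
	}()
}

// Wait blocks until every goroutine started with Go has returned.
func (s *scope) Wait() {
	s.wg.Wait()
}

// runScoped spawns n goroutines in one scope and counts how many finished
// by the time Wait returns.
func runScoped(n int) int64 {
	var s scope
	var done atomic.Int64
	for i := 0; i < n; i++ {
		s.Go(func() { done.Add(1) })
	}
	s.Wait() // no goroutine outlives this call
	return done.Load()
}

func main() {
	fmt.Println(runScoped(10)) // 10
}
```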

In some cases, you might want a spawned goroutine to outlast the scope of the caller. In that case, you could pass a WaitGroup into the spawning function.

func main() {
    var wg conc.WaitGroup
    defer wg.Wait()

    startTheThing(&wg)
}

func startTheThing(wg *conc.WaitGroup) {
    wg.Go(func() { ... })
}

For some more discussion on why scoped concurrency is nice, check out this blog post.

Goal #2: Handle panics gracefully

A frequent problem with goroutines in long-running applications is handling panics. A goroutine spawned without a panic handler will crash the whole process on panic. This is usually undesirable.

However, if you do add a panic handler to a goroutine, what do you do with the panic once you catch it? Some options:

  1. Ignore it
  2. Log it
  3. Turn it into an error and return that to the goroutine spawner
  4. Propagate the panic to the goroutine spawner
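Option (3) can be sketched with the standard library. The helper `callSafely` and the `panicError` type are hypothetical names used only for illustration, not part of conc: a deferred recover turns a panic into an error carrying the panic value and the stack captured with runtime/debug.Stack, and the spawner receives that error over a channel instead of the process crashing.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// panicError is a hypothetical error type holding a recovered panic.
type panicError struct {
	value any
	stack []byte
}

func (e *panicError) Error() string {
	return fmt.Sprintf("panic: %v\n%s", e.value, e.stack)
}

// callSafely runs f and returns a *panicError if f panics, or nil otherwise.
func callSafely(f func()) (err error) {
	defer func() {
		if v := recover(); v != nil {
			// Overwrite the named result with the converted panic.
			err = &panicError{value: v, stack: debug.Stack()}
		}
	}()
	f()
	return nil
}

func main() {
	// The spawner receives the error over a channel instead of crashing.
	errc := make(chan error, 1)
	go func() { errc <- callSafely(func() { panic("boom") }) }()
	fmt.Println(<-errc != nil) // true
}
```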

Ignoring panics is a bad idea since panics usually mean there is actually something wrong and someone should fix it.

Just logging panics isn't great either because then there is no indication to the spawner that something bad happened, and it might just continue on as normal even though your program is in a really bad state.

Both (3) and (4) are reasonable options, but both require the goroutine to have an owner that can actually receive the message that something went wrong. A goroutine started with a bare go statement generally has no such owner, but in conc every goroutine has an owner that must wait for it. Any call to Wait() panics if any of the spawned goroutines panicked, and conc decorates the panic value with a stacktrace from the child goroutine so that you don't lose information about what caused the panic.
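Option (4) applied to many goroutines can be sketched as follows. The `group` and `recovered` types are hypothetical and only approximate the behavior described above; they are not conc's implementation. Each child recovers its own panic, the first one is recorded under a mutex together with debug.Stack() taken in the child goroutine, and Wait re-panics with it after all children have exited.

```go
package main

import (
	"fmt"
	"runtime/debug"
	"sync"
)

// recovered is a hypothetical record of a child goroutine's panic.
type recovered struct {
	value any
	stack []byte
}

// group owns goroutines and propagates the first child panic from Wait.
type group struct {
	wg    sync.WaitGroup
	mu    sync.Mutex
	first *recovered
}

// Go runs f in a child goroutine, catching any panic it raises.
func (g *group) Go(f func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done() // runs last, after the panic is recorded
		defer func() {
			if v := recover(); v != nil {
				g.mu.Lock()
				if g.first == nil { // keep only the first panic
					g.first = &recovered{value: v, stack: debug.Stack()}
				}
				g.mu.Unlock()
			}
		}()
		f()
	}()
}

// Wait blocks until all children exit, then re-panics if any of them panicked.
func (g *group) Wait() {
	g.wg.Wait()
	if g.first != nil {
		panic(g.first)
	}
}

func main() {
	defer func() {
		if r, ok := recover().(*recovered); ok {
			fmt.Println("propagated:", r.value) // propagated: child failed
		}
	}()
	var g group
	g.Go(func() {})
	g.Go(func() { panic("child failed") })
	g.Wait()
}
```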

Doing all of this correctly every time you spawn something with go is not trivial, and it requires a lot of boilerplate that makes the important parts of the code harder to read, so conc does this for you.

stdlib:

import (
	"fmt"
	"runtime/debug"
)

type caughtPanicError struct {
	val   any
	stack []byte
}

func (e *caughtPanicError) Error() string {
	return fmt.Sprintf("panic: %v\n%s", e.val, string(e.stack))
}

func spawn() {
	done := make(chan error)
	go func() {
		defer func() {
			if val := recover(); val != nil {
				// Report the panic value together with the child's stack.
				done <- &caughtPanicError{
					val:   val,
					stack: debug.Stack(),
				}
			} else {
				done <- nil
			}
		}()
		doSomethingThatMightPanic()
	}()
	err := <-done
	if err != nil {
		// Re-panic in the spawner so the failure is not silently lost.
		panic(err)
	}
}

conc:

import "github.com/sourcegraph/conc"

func spawn() {
	var wg conc.WaitGroup
	wg.Go(doSomethingThatMightPanic)
	// Wait panics if doSomethingThatMightPanic panicked.
	wg.Wait()
}

Goal #3: Make concurrent code easier to read

Doing concurrency correctly is difficult. Doing it in a way that doesn't obfuscate what the code is actually doing is more difficult. The conc package attempts to make common operations easier by abstracting as much boilerplate complexity as possible.

Want to run a set of concurrent tasks with a bounded set of goroutines? Use pool.New(). Want to process a stream of results concurrently while preserving their order? Try stream.New(). What about a concurrent map over a slice? Take a peek at iter.Map().

Browse some examples below for some comparisons with doing these by hand.

Examples

The stdlib versions of these examples forgo propagating panics for simplicity. To see what kind of complexity propagation would add, check out the "Goal #2" section above.

Spawn a set of goroutines and wait for them to finish:

stdlib:

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// if doSomething panics, the process crashes!
			doSomething()
		}()
	}
	wg.Wait()
}

conc:

func main() {
	var wg conc.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Go(doSomething)
	}
	wg.Wait()
}

Process each element of a stream in a static pool of goroutines:

stdlib:

func process(stream chan int) {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for elem := range stream {
				handle(elem)
			}
		}()
	}
	wg.Wait()
}

conc:

func process(stream chan int) {
	p := pool.New().WithMaxGoroutines(10)
	for elem := range stream {
		elem := elem // copy the loop variable for the closure (needed before Go 1.22)
		p.Go(func() {
			handle(elem)
		})
	}
	p.Wait()
}
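The conc version caps concurrency with WithMaxGoroutines(10). A common hand-rolled way to impose such a cap is a buffered channel used as a counting semaphore; below is a minimal stdlib sketch of that technique. The names `boundedRun` and `demo` are hypothetical, and this is not a claim about how conc's pool is implemented.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// boundedRun calls handle for each value received from stream, with at most
// limit calls running at once. It returns after stream is closed and every
// call has finished.
func boundedRun(stream <-chan int, limit int, handle func(int)) {
	sem := make(chan struct{}, limit) // each slot is one running goroutine
	var wg sync.WaitGroup
	for elem := range stream {
		elem := elem      // per-iteration copy for the closure (needed before Go 1.22)
		sem <- struct{}{} // blocks while limit goroutines are already running
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when handle returns
			handle(elem)
		}()
	}
	wg.Wait()
}

// demo feeds 1..20 through boundedRun with a limit of 3 and reports the sum
// of handled values and the peak number of concurrent handle calls.
func demo() (sum, peak int) {
	stream := make(chan int)
	go func() {
		defer close(stream)
		for i := 1; i <= 20; i++ {
			stream <- i
		}
	}()

	var mu sync.Mutex
	active := 0
	boundedRun(stream, 3, func(v int) {
		mu.Lock()
		active++
		if active > peak {
			peak = active
		}
		sum += v
		mu.Unlock()

		time.Sleep(time.Millisecond) // simulated work so calls overlap

		mu.Lock()
		active--
		mu.Unlock()
	})
	return sum, peak
}

func main() {
	sum, peak := demo()
	fmt.Println(sum, peak <= 3) // 210 true
}
```

The semaphore is acquired before spawning, so the loop itself stops pulling from the stream while the limit is reached, which keeps the number of live goroutines bounded rather than just the number of running handlers.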

Process each element of a slice in a static pool of goroutines:

stdlib:

func main() {
	values := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

	feeder := make(chan int, 8)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for elem := range feeder {
				handle(elem)
			}
		}()
	}

	for _, value := range values {
		feeder <- value
	}
	// Close feeder so the workers' range loops end; otherwise wg.Wait blocks forever.
	close(feeder)

	wg.Wait()
}

conc:

func main() {
	values := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	iter.ForEach(values, handle)
}

Concurrently map a slice:

stdlib:

func concMap(input []int, f func(int) int) []int {
	res := make([]int, len(input))
	var idx atomic.Int64

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for {
				// Claim the next unprocessed index.
				i := int(idx.Add(1) - 1)
				if i >= len(input) {
					return
				}

				res[i] = f(input[i])
			}
		}()
	}
	wg.Wait()
	return res
}

conc:

func concMap(input []int, f func(int) int) []int {
	return iter.Map(input, f)
}

Process an ordered stream concurrently:

stdlib:

func mapStream(input chan int, output chan int, f func(int) int) {
	tasks := make(chan func())
	// Buffered so the feeder can run ahead of the ordered reader; unbuffered,
	// the feeder would stall behind the reader and tasks would run roughly
	// one at a time.
	taskResults := make(chan chan int, 10)

	// Spawn the worker goroutines
	var workerWg sync.WaitGroup
	for i := 0; i < 10; i++ {
		workerWg.Add(1)
		go func() {
			defer workerWg.Done()
			for task := range tasks {
				task()
			}
		}()
	}

	// Spawn the goroutine that reads results in order
	var readerWg sync.WaitGroup
	readerWg.Add(1)
	go func() {
		defer readerWg.Done()
		for taskResult := range taskResults {
			// Block until this task finishes, preserving input order.
			output <- <-taskResult
		}
	}()

	// Feed the workers with tasks
	for elem := range input {
		elem := elem // copy the loop variable for the closure (needed before Go 1.22)
		resultCh := make(chan int, 1)
		taskResults <- resultCh
		tasks <- func() {
			resultCh <- f(elem)
		}
	}

	// We've exhausted input. Wait for everything to finish
	close(tasks)
	workerWg.Wait()
	close(taskResults)
	readerWg.Wait()
}

conc:

func mapStream(input chan int, output chan int, f func(int) int) {
	s := stream.New().WithMaxGoroutines(10)
	for elem := range input {
		elem := elem
		// Each task returns a callback; callbacks run in submission order.
		s.Go(func() stream.Callback {
			res := f(elem)
			return func() { output <- res }
		})
	}
	s.Wait()
}

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type PanicCatcher

type PanicCatcher struct {
	// contains filtered or unexported fields
}

PanicCatcher is used to catch panics. You can execute a function with Try, which will catch any panic it raises. Try can be called any number of times, from any number of goroutines. Once all calls to Try have completed, you can get the first caught panic (if any) with Recovered(), or you can propagate it (re-panic) with Repanic().

Example
var pc PanicCatcher
i := 0
pc.Try(func() { i += 1 })
pc.Try(func() { panic("abort!") })
pc.Try(func() { i += 1 })

rc := pc.Recovered()

fmt.Println(i)
fmt.Println(rc.Value.(string))
Output:

2
abort!
Example (Callers)
var pc PanicCatcher
pc.Try(func() { panic("mayday!") })

recovered := pc.Recovered()

// For debugging, the pre-formatted recovered.Stack is easier to use than
// recovered.Callers. Stack is not used in this example because its output
// is machine-specific.

frames := runtime.CallersFrames(recovered.Callers)
for {
	frame, more := frames.Next()

	fmt.Println(frame.Function)

	if !more {
		break
	}
}
Output:

github.com/sourcegraph/conc.(*PanicCatcher).tryRecover
runtime.gopanic
github.com/sourcegraph/conc.ExamplePanicCatcher_callers.func1
github.com/sourcegraph/conc.(*PanicCatcher).Try
github.com/sourcegraph/conc.ExamplePanicCatcher_callers
testing.runExample
testing.runExamples
testing.(*M).Run
main.main
runtime.main
runtime.goexit

func (*PanicCatcher) Recovered

func (p *PanicCatcher) Recovered() *RecoveredPanic

Recovered returns the first panic caught by Try as a *RecoveredPanic, or nil if no calls to Try panicked.

func (*PanicCatcher) Repanic

func (p *PanicCatcher) Repanic()

Repanic panics if any calls to Try caught a panic. It will panic with the value of the first panic caught, wrapped in a RecoveredPanic with caller information.

func (*PanicCatcher) Try

func (p *PanicCatcher) Try(f func())

Try executes f, catching any panic it raises. It is safe to call from multiple goroutines simultaneously.
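A rough stdlib sketch of the Try/Recovered/Repanic contract described above, assuming first-panic-wins semantics implemented with atomic.Pointer.CompareAndSwap (Go 1.19+). The `catcher` and `caught` names are hypothetical, and conc's actual implementation may differ.

```go
package main

import (
	"fmt"
	"runtime/debug"
	"sync"
	"sync/atomic"
)

// caught is a hypothetical record of one recovered panic.
type caught struct {
	Value any
	Stack []byte
}

// catcher records the first panic raised by any function passed to Try.
type catcher struct {
	first atomic.Pointer[caught]
}

// Try runs f and records its panic, if any. Safe for concurrent use.
func (c *catcher) Try(f func()) {
	defer func() {
		if v := recover(); v != nil {
			// CompareAndSwap from nil succeeds only for the first panic.
			c.first.CompareAndSwap(nil, &caught{Value: v, Stack: debug.Stack()})
		}
	}()
	f()
}

// Recovered returns the first caught panic, or nil if none occurred.
func (c *catcher) Recovered() *caught {
	return c.first.Load()
}

// Repanic re-raises the first caught panic, if there was one.
func (c *catcher) Repanic() {
	if r := c.Recovered(); r != nil {
		panic(r)
	}
}

func main() {
	var c catcher
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Try(func() { panic("abort!") })
		}()
	}
	wg.Wait()
	fmt.Println(c.Recovered().Value) // abort!
}
```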

type RecoveredPanic

type RecoveredPanic struct {
	// The original value of the panic
	Value any
	// The caller list as returned by runtime.Callers when the panic was
	// recovered. Can be used to produce a more detailed stack information with
	// runtime.CallersFrames.
	Callers []uintptr
	// The formatted stacktrace from the goroutine where the panic was recovered.
	// Easier to use than Callers.
	Stack []byte
}

RecoveredPanic is a panic that was caught with recover().

func NewRecoveredPanic

func NewRecoveredPanic(skip int, value any) RecoveredPanic

NewRecoveredPanic creates a RecoveredPanic from a panic value and a collected stacktrace. The skip parameter allows the caller to skip stack frames when collecting the stacktrace. Calling with a skip of 0 means include the call to NewRecoveredPanic in the stacktrace.
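The skip convention resembles that of runtime.Callers, where skip 0 names Callers itself and 1 names its caller. A hypothetical helper `captureCallers` (not conc's code) offsets the value by one so that skip 0 includes the helper's own frame, matching the convention described for NewRecoveredPanic. `//go:noinline` is added only so the printed frame names are predictable.

```go
package main

import (
	"fmt"
	"runtime"
)

// captureCallers records the calling goroutine's stack. With skip == 0 the
// first recorded frame is captureCallers itself; each additional skip drops
// one more frame from the top.
//
//go:noinline
func captureCallers(skip int) []uintptr {
	pcs := make([]uintptr, 64)
	// runtime.Callers counts its own frame as 0, so add 1 to start at this function.
	n := runtime.Callers(skip+1, pcs)
	return pcs[:n]
}

// topFunction returns the name of the first (innermost) recorded frame.
func topFunction(pcs []uintptr) string {
	frame, _ := runtime.CallersFrames(pcs).Next()
	return frame.Function
}

func main() {
	fmt.Println(topFunction(captureCallers(0))) // main.captureCallers
	fmt.Println(topFunction(captureCallers(1))) // main.main
}
```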

func (*RecoveredPanic) Error

func (c *RecoveredPanic) Error() string

func (*RecoveredPanic) Unwrap

func (c *RecoveredPanic) Unwrap() error

type WaitGroup

type WaitGroup struct {
	// contains filtered or unexported fields
}

WaitGroup is the primary building block for scoped concurrency. Goroutines can be spawned in the WaitGroup with the Go method, and calling Wait() will ensure that each of those goroutines exits before continuing. Any panics in a child goroutine will be caught and propagated to the caller of Wait().

func (*WaitGroup) Go

func (h *WaitGroup) Go(f func())

Go spawns a new goroutine in the WaitGroup.

func (*WaitGroup) Wait

func (h *WaitGroup) Wait()

Wait blocks until all goroutines spawned with Go exit, then propagates a panic if any child goroutine panicked.

Directories

Path Synopsis
Package stream provides a concurrent, ordered stream implementation.
