Documentation
Overview
Package derrgroup is a low-level group abstraction, providing synchronization, error propagation, and cancellation callbacks for groups of goroutines working on subtasks of a common task.
derrgroup is a fork of golang.org/x/sync/errgroup commit 6e8e738ad208923de99951fe0b48239bfd864f28 (2020-06-04). The fork adds only things that cannot reasonably be implemented on top of errgroup itself; for example, it is impossible to add goroutine enumeration on top of errgroup without duplicating, and doubling up on, all of errgroup's synchronization and locking. Anything that can reasonably be implemented *on top of* derrgroup is not included in derrgroup:
- Managing `context.Context`s (this is something that errgroup partly does, but derrgroup ripped it out, because it can trivially be implemented on top of derrgroup)
- Signal handling
- Logging
- Hard/soft cancellation
- Having `Wait()` timeout on a shutdown that takes too long
Those are all good and useful things to have. But they should be implemented in a layer *on top of* derrgroup. "derrgroup.Group" was originally called "llGroup" for "low-level group"; it is intentionally low-level in order to be a clean primitive for other things to build on top of.
At one point there were at least 3 Go implementations of "group" functionality in use at Datawire at the same time (pkg/dgroup and pkg/supervisor are the two still in use), each offering some subset of the above. derrgroup offers them a common, robust base. If you're writing new application code, you should use one of those, and not use derrgroup directly. If you're writing a new "group" abstraction, you should use derrgroup instead of implementing your own locking/synchronization.
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type GoroutineState
type GoroutineState int
const (
	GoroutineRunning GoroutineState = iota
	GoroutineExited
	GoroutineErrored
)
func (GoroutineState) String
func (s GoroutineState) String() string
type Group
type Group struct {
// contains filtered or unexported fields
}
A Group is a collection of goroutines working on subtasks that are part of the same overall task.
A zero Group is valid and does not cancel on error.
Example (JustErrors)
JustErrors illustrates the use of a Group in place of a sync.WaitGroup to simplify goroutine counting and error handling. This example is derived from the sync.WaitGroup example at https://golang.org/pkg/sync/#example_WaitGroup.
package main

import (
	"fmt"
	"net/http"

	errgroup "github.com/datawire/dlib/derrgroup"
)

func main() {
	g := new(errgroup.Group)
	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}
	for _, url := range urls {
		// Launch a goroutine to fetch the URL.
		url := url // https://golang.org/doc/faq#closures_and_goroutines
		g.Go(url, func() error {
			// Fetch the URL.
			resp, err := http.Get(url)
			if err == nil {
				resp.Body.Close()
			}
			return err
		})
	}
	// Wait for all HTTP fetches to complete.
	if err := g.Wait(); err == nil {
		fmt.Println("Successfully fetched all URLs.")
	}
}
Example (Parallel)
Parallel illustrates the use of a Group for synchronizing a simple parallel task: the "Google Search 2.0" function from https://talks.golang.org/2012/concurrency.slide#46, augmented with a Context and error-handling.
package main

import (
	"context"
	"fmt"
	"os"

	errgroup "github.com/datawire/dlib/derrgroup"
)

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

type Result string

type Search func(ctx context.Context, query string) (Result, error)

func fakeSearch(kind string) Search {
	return func(_ context.Context, query string) (Result, error) {
		return Result(fmt.Sprintf("%s result for %q", kind, query)), nil
	}
}

func main() {
	Google := func(ctx context.Context, query string) ([]Result, error) {
		// The group calls cancel on the first error; the deferred call
		// guarantees the context is released on every return path.
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		g := errgroup.NewGroup(cancel, false)

		searches := []Search{Web, Image, Video}
		results := make([]Result, len(searches))
		for i, search := range searches {
			i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines
			g.Go(fmt.Sprintf("search-%d", i), func() error {
				result, err := search(ctx, query)
				if err == nil {
					results[i] = result
				}
				return err
			})
		}
		if err := g.Wait(); err != nil {
			return nil, err
		}
		return results, nil
	}

	results, err := Google(context.Background(), "golang")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	for _, result := range results {
		fmt.Println(result)
	}
}
Output:
web result for "golang"
image result for "golang"
video result for "golang"
Example (Pipeline)
Pipeline demonstrates the use of a Group to implement a multi-stage pipeline: a version of the MD5All function with bounded parallelism from https://blog.golang.org/pipelines.
package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"log"
	"os"
	"path/filepath"

	errgroup "github.com/datawire/dlib/derrgroup"
)

// Pipeline demonstrates the use of a Group to implement a multi-stage
// pipeline: a version of the MD5All function with bounded parallelism from
// https://blog.golang.org/pipelines.
func main() {
	m, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}
	for k, sum := range m {
		fmt.Printf("%s:\t%x\n", k, sum)
	}
}

type result struct {
	path string
	sum  [md5.Size]byte
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// The group calls cancel the first time a goroutine returns an error, and
	// the deferred cancel releases ctx on every return path. When this version
	// of MD5All returns - even in case of error! - we know that all of the
	// goroutines have finished and the memory they were using can be
	// garbage-collected.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	g := errgroup.NewGroup(cancel, false)

	paths := make(chan string)
	g.Go("walk", func() error {
		defer close(paths)
		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
	})

	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result)
	const numDigesters = 20
	for i := 0; i < numDigesters; i++ {
		g.Go(fmt.Sprintf("digester-%d", i), func() error {
			for path := range paths {
				data, err := os.ReadFile(path)
				if err != nil {
					return err
				}
				select {
				case c <- result{path, md5.Sum(data)}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}
	go func() {
		_ = g.Wait()
		close(c)
	}()

	m := make(map[string][md5.Size]byte)
	for r := range c {
		m[r.path] = r.sum
	}
	// Check whether any of the goroutines failed. Since g records the first
	// error, we don't need to send errors (or check for them) in the
	// individual results sent on the channel.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return m, nil
}
func NewGroup
NewGroup returns a new Group.
The provided 'cancel' function is called the first time a function passed to Go returns a non-nil error.
func (*Group) Go
Go calls the given function in a new goroutine.
The first function passed to Go that returns a non-nil error cancels the group (that is, it triggers the group's cancel function, if the group has one); that error will be returned by Wait.
func (*Group) List
func (g *Group) List() map[string]GoroutineState
List returns a listing of all goroutines launched with Go.