Documentation ¶
Overview ¶
Package workgraph provides some low-level utilities for coordinating a number of workers that are all collaborating to produce different parts of some overall result, with dynamically-discovered dependencies between those workers.
Workers and requests form a bipartite graph. Every request has exactly one worker responsible for resolving it, and every worker is waiting for zero or one requests to be resolved. If worker A attempts to wait for a result that will be produced by worker B, while B is itself waiting for another result that A is responsible for, then all requests in that chain fail immediately to avoid deadlock.
This is a "nuts-and-bolts" abstraction intended to be used as an implementation detail of a higher-level system, and is not intended to be treated as a cross-cutting concern that appears in another library's exported API. Use idiomatic Go features like blocking calls or channels to represent relationships between concurrent work in larger scopes.
The Workgraph Rules ¶
The Go programming language (intentionally) does not have anything resembling "goroutine-local storage", and so workgraph must unfortunately perform its own tracking of the relationships between goroutines in order to detect self-reference problems and return an error instead of deadlocking. This requires cooperation with the calling program to produce an accurate graph of the current work in progress, and so there are some rules that any caller must follow.
The most important rule is that each goroutine that interacts with the workgraph API must use exactly one Worker object to do so. Using the same worker object across multiple goroutines or using multiple workers in the same goroutine will cause workgraph's view of the world to be inconsistent with the Go runtime's view of the world, which unfortunately means that deadlocks and data races can become possible.
Another important rule is that two goroutines that are using workgraph must communicate with one another using workgraph promises exclusively. If one goroutine blocks on another in any way other than calling Promise.Await then workgraph will be unaware of that relationship and so again deadlocks can become possible.
At any instant each active Promise has exactly one Worker that is responsible for resolving it using its Resolver. The initially-responsible worker is the one passed to NewRequest. If a promise is not resolved before its responsible worker is garbage-collected then an await on the promise returns ErrUnresolved. For this mechanism to work it's important that each Worker object become eligible for garbage collection once its associated goroutine exits. The worker responsible for a promise can transfer that responsibility to another worker by passing the associated resolver as an argument to NewWorker. Only the currently-responsible worker for a promise is allowed to resolve it.
At any instant each Worker is awaiting the result of either zero or one Promise. "Awaiting" means making a blocking call to Promise.Await. Whenever a worker is about to begin waiting, workgraph checks to make sure that this would not cause a cycle in the bipartite graph of promises and workers. Awaiting fails with ErrSelfDependency if such a cycle is detected.
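The cycle check described above can be pictured with a toy model. The sketch below is not the library's implementation: the `graph` type, the `workerID` and `promiseID` identifiers, and the `cycleIfAwaited` method are all hypothetical names invented for illustration. It relies only on the two invariants stated above (one responsible worker per promise, at most one awaited promise per worker), which make the check a simple walk along a single chain.

```go
package main

import "fmt"

// workerID and promiseID are hypothetical identifiers for this toy model;
// they are not types from the workgraph package.
type workerID string
type promiseID string

// graph is a toy bipartite graph: every promise has exactly one responsible
// worker, and every worker awaits at most one promise.
type graph struct {
	responsible map[promiseID]workerID // promise -> worker that must resolve it
	awaiting    map[workerID]promiseID // worker -> promise it is blocked on
}

// cycleIfAwaited returns the promises that would form a cycle if worker w
// started awaiting promise p, or nil if the wait is safe. It assumes the
// existing graph is cycle-free, which holds if every wait was checked first.
func (g *graph) cycleIfAwaited(w workerID, p promiseID) []promiseID {
	chain := []promiseID{p}
	cur := g.responsible[p]
	// Each step follows one "awaiting" edge, so the walk needs at most
	// len(g.awaiting)+1 iterations.
	for i := 0; i <= len(g.awaiting); i++ {
		if cur == w {
			return chain // the chain leads back to the would-be waiter
		}
		next, blocked := g.awaiting[cur]
		if !blocked {
			return nil // the chain ends at a worker that can still make progress
		}
		chain = append(chain, next)
		cur = g.responsible[next]
	}
	return nil
}

func main() {
	g := &graph{
		responsible: map[promiseID]workerID{"pA": "A", "pB": "B"},
		awaiting:    map[workerID]promiseID{"B": "pA"}, // B already waits on A's promise
	}
	fmt.Println(g.cycleIfAwaited("A", "pB")) // A waiting on pB would close the loop
	fmt.Println(g.cycleIfAwaited("C", "pB")) // an unrelated worker C may wait safely
}
```

In this model, worker A asking to wait on pB is rejected because B, the worker responsible for pB, is already waiting on a promise that A must resolve.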
Usage Tips ¶
The previous section described the rules that callers must follow for this library to work as intended. This section has some practical tips for how to structure code so that it's harder to accidentally violate those rules.
First, consider using Once or OnceFunc if they are appropriate for your situation. Although using those does not completely avoid the need for callers to take care of the rules, it does encapsulate the problem of deciding which worker is responsible for resolution -- whichever one happens to request the result first -- so the calling workers don't need to be directly aware of each other or coordinate responsibility themselves.
Consider using WithNewAsyncWorker to start any new goroutines that will interact with workgraph. This utility really just calls NewWorker and passes its result to the given function on a new goroutine, but that means that the lifetime of the Worker object is closely matched to the goroutine as long as a pointer to the worker never escapes to the heap.
Regardless of whether or not you use WithNewAsyncWorker, store pointers to Worker objects only on the stack. In other words: only in local variables, function arguments, and return values. This makes sure that a worker object becomes eligible for garbage collection no later than when the associated goroutine exits. If you find you do need to store a pointer to a worker on the heap, such as when capturing within the closure for a function pointer that escapes, then ensure that the outer object referring to that allocation is only reachable from the stack.
Try to structure your code so that only code in the goroutine associated with the worker responsible for a promise can reach that promise's resolver. For example, if you transfer responsibility to a new worker when calling NewWorker, consider letting the variable that previously contained the resolver go out of scope or assign the zero value of Resolver to it to reinforce that access through that variable after that point is incorrect.
If you're intending to use the request ids from ErrSelfDependency errors to produce end-user-friendly error messages describing what participated in a self-dependency cycle, make sure that each end-user-relevant work item correlates to one NewRequest call and that each one is the responsibility of a separate Worker, because that will then cause workgraph's internal graph of the work in progress to match the end-user model of execution as closely as possible.
If you build your own abstractions that wrap promises in similar ways as Once does, consider exposing the underlying RequestID for each encapsulated promise as an aid to error handling and debugging. Unless your wrapping abstraction encapsulates the decision about who is responsible for resolving a promise (as Once does), consider also having your type implement ResolverContainer so that callers can pass it directly to NewWorker to delegate responsibility without having to unwrap and then re-wrap it.
Overall, try to keep your usage of workgraph scoped to as small a part of your program as possible and minimize how it's exposed in your public API. It's much harder to ensure that the overall program is correctly following the workgraph rules when the uses are distributed widely across a program. If multiple subsystems in your program need to collaborate using workgraph, try to build a wrapping abstraction that's better tailored to your specific use-case and which encapsulates the workgraph rules in terms of its own rules that are more natural for the overall shape of your program.
Index ¶
- func NewRequest[T any](responsibleWorker *Worker) (Resolver[T], Promise[T])
- func OnceFunc[T any](f func(*Worker) (T, error)) func(*Worker) (T, error)
- func WithNewAsyncWorker(f func(*Worker), delegatedResults ...ResolverContainer)
- type AnyResolver
- type ErrSelfDependency
- type ErrUnresolved
- type Once
- type Promise
- type RequestID
- type Resolver
- func (r Resolver[T]) ContainedResolvers() iter.Seq[AnyResolver]
- func (r Resolver[T]) Report(resolvingWorker *Worker, val T, err error)
- func (r Resolver[T]) ReportError(resolvingWorker *Worker, err error)
- func (r Resolver[T]) ReportSuccess(resolvingWorker *Worker, val T)
- func (r Resolver[T]) RequestID() RequestID
- type ResolverContainer
- type Worker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewRequest ¶
func NewRequest[T any](responsibleWorker *Worker) (Resolver[T], Promise[T])
NewRequest begins a new request and returns both its resolver and its promise.
The given worker is initially responsible for resolving the request.
func OnceFunc ¶
func OnceFunc[T any](f func(*Worker) (T, error)) func(*Worker) (T, error)
OnceFunc returns a function that, when called for the first time, runs f using a newly-created Worker. That call and all subsequent calls then return whatever f returned.
This is a convenience wrapper for Once in situations where the underlying RequestID is unimportant and so just a function pointer is sufficient, and where it's helpful to capture the function to run inside the result so it can be called from many different locations.
Example ¶
This contrived example just returns static strings to keep things simple, but in more typical use these "getters" could perform more complex operations such as fetching data from elsewhere, blocking for as long as they need, provided that they block on each other only through workgraph-based features.
package main

import (
	"fmt"
	"log"

	"github.com/apparentlymart/go-workgraph/workgraph"
)

func main() {
	// A more realistic usage would involve these "getters" being constructed
	// in different parts of the program and then composed together, but
	// this is all written inline just to avoid distracting with unrelated
	// complexity.
	greetingGetter := workgraph.OnceFunc(func(w *workgraph.Worker) (string, error) {
		return "Hello", nil
	})
	nameGetter := workgraph.OnceFunc(func(w *workgraph.Worker) (string, error) {
		return "world", nil
	})
	msgGetter := workgraph.OnceFunc(func(w *workgraph.Worker) (string, error) {
		// The call to greetingGetter blocks until the greeting is ready.
		greeting, err := greetingGetter(w)
		if err != nil {
			return "", fmt.Errorf("greeting failed: %w", err)
		}
		// The call to nameGetter blocks until the name is ready.
		name, err := nameGetter(w)
		if err != nil {
			return "", fmt.Errorf("name failed: %w", err)
		}
		return fmt.Sprintf("%s, %s!", greeting, name), nil
	})

	// We need an initial worker to represent the main goroutine.
	w := workgraph.NewWorker()

	// This call to msgGetter blocks until the full message is ready, which
	// indirectly blocks on greetingGetter and nameGetter's results.
	msg, err := msgGetter(w)
	if err != nil {
		log.Fatalf("message failed: %s", err)
	}
	fmt.Println(msg)
}
Output: Hello, world!
func WithNewAsyncWorker ¶
func WithNewAsyncWorker(f func(*Worker), delegatedResults ...ResolverContainer)
WithNewAsyncWorker is a helper wrapper around NewWorker for the common case of associating a new worker with a new goroutine.
The Worker passed to the given function is immediately responsible for any resolvers given as additional arguments, as if those had been passed to NewWorker.
Types ¶
type AnyResolver ¶
type AnyResolver interface {
// contains filtered or unexported methods
}
AnyResolver is an interface implemented by all instantiations of the generic type Resolver, regardless of their result type.
This is used along with ResolverContainer to delegate resolvers from one worker to another, where it doesn't matter what value type each resolver has.
type ErrSelfDependency ¶
type ErrSelfDependency struct {
// RequestIDs are the identifiers of the requests included in the
// dependency cycle. Callers may use this in conjunction with their own
// records of the meaning of each request ID to return a higher-level error
// that describes the set of requested operations that together caused the
// problem.
RequestIDs []RequestID
}
ErrSelfDependency is returned by Promise.Await if a direct or indirect self-dependency is created in the worker-and-request graph by this or some other call to Promise.Await.
All Await calls blocking on any result in the detected dependency cycle immediately fail with this error.
Example ¶
package main

import (
	"fmt"

	"github.com/apparentlymart/go-workgraph/workgraph"
)

func main() {
	// This example uses [OnceFunc] just for simplicity's sake, but this
	// general idea applies to any interaction between workers where a
	// worker tries to block on a result it's currently responsible for either
	// directly or indirectly.
	var a, b func(*workgraph.Worker) (string, error)
	a = workgraph.OnceFunc(func(w *workgraph.Worker) (string, error) {
		return b(w)
	})
	b = workgraph.OnceFunc(func(w *workgraph.Worker) (string, error) {
		return a(w)
	})

	// Initial worker for the main goroutine
	w := workgraph.NewWorker()

	result, err := a(w)
	switch err := err.(type) {
	case nil:
		fmt.Printf("result is %q\n", result)
	case workgraph.ErrSelfDependency:
		fmt.Printf("self-dependency error involving %d requests", len(err.RequestIDs))
		// NOTE: If you actually want to make use of the request IDs then you'd
		// need to either work with [workgraph.Resolver] objects directly or
		// use [workgraph.Once] in order to have request ids to compare with
		// those in the error message.
	default:
		fmt.Printf("unexpected error: %s\n", err)
	}
}
Output: self-dependency error involving 2 requests
func (ErrSelfDependency) Error ¶
func (err ErrSelfDependency) Error() string
type ErrUnresolved ¶
type ErrUnresolved struct {
// RequestID is the request that was unresolved. This is always the ID of
// the request whose [Promise] the Await method was called on.
RequestID RequestID
}
ErrUnresolved is returned by Promise.Await if the Worker responsible for resolving the request is garbage-collected before the request is resolved.
This suggests a bug in the implementation of the responsible worker, since it should ensure that all requests it is responsible for are either resolved or delegated to another worker before its Worker object goes out of scope.
Note that the Go runtime does not guarantee to collect unused objects at any particular time, and so this is only a best-effort mechanism to try to ensure that workers awaiting unresolved promises can unblock themselves *eventually*, but that might actually be some time after the relevant worker stops running.
func (ErrUnresolved) Error ¶
func (err ErrUnresolved) Error() string
type Once ¶
type Once[T any] struct {
	// contains filtered or unexported fields
}
Once is similar in principle to the standard library's sync.Once, but implemented in terms of the workgraph concepts so that it can provide similar guarantees such as detecting when a Once execution ends up indirectly depending on its own result.
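For comparison, the following sketch shows the run-once behavior of the standard library alone, using `sync.OnceValues`. The helper `newCountingGetter` is a hypothetical name invented for this illustration. The standard library caches the result but does no dependency tracking: the `sync.Once` documentation notes that if f causes Do to be called again, it deadlocks, which is the situation Once is described above as detecting instead.

```go
package main

import (
	"fmt"
	"sync"
)

// newCountingGetter returns a getter that computes its value at most once,
// plus a function reporting how many times the computation actually ran.
// Both names are illustrative only.
func newCountingGetter() (get func() (string, error), runs func() int) {
	n := 0
	get = sync.OnceValues(func() (string, error) {
		n++ // incremented only on the first call
		return "Hello", nil
	})
	return get, func() int { return n }
}

func main() {
	get, runs := newCountingGetter()
	first, _ := get()
	second, _ := get() // returns the cached result without running again
	fmt.Println(first, second, runs())
}
```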
func (*Once[T]) Do ¶
Do calls the function f if and only if Do is being called for the first time on this instance of Once.
Given a specific instance of Once, only the first call will invoke the given f, even if f has a different value on each call.
forWorker is the handle for the worker that the result is requested on behalf of. The given function is called with its own separate Worker that is responsible for providing the return value. If f directly or indirectly causes another call to Do on the same Once then all affected calls will fail with ErrSelfDependency.
type Promise ¶
type Promise[T any] struct {
	// contains filtered or unexported fields
}
Promise is a handle through which many different workers can wait for the result of a request to become available.
type RequestID ¶
type RequestID struct {
// contains filtered or unexported fields
}
RequestID represents an opaque but comparable unique identifier for a request, whose resolver may or may not still be live.
RequestID values are used in some error types returned by this package when reporting situations that could cause deadlock. Callers can therefore maintain a lookup table from RequestID to some higher-level representation of the meaning of a request to allow including more relevant context in externally-facing error results.
Use Resolver.RequestID to find the identity of the request that a particular resolver is associated with.
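The lookup-table idea can be sketched as follows. To keep the block compilable on its own, `requestID` is a hypothetical stand-in for RequestID (the sketch relies only on IDs being comparable), and `describeCycle` is an invented helper name. In real use the slice would presumably come from the RequestIDs field of an ErrSelfDependency.

```go
package main

import (
	"fmt"
	"strings"
)

// requestID is a hypothetical stand-in for workgraph.RequestID, used so this
// sketch compiles on its own. All it relies on is that IDs are comparable.
type requestID int

// describeCycle turns the request IDs from a self-dependency error into a
// readable message using a caller-maintained lookup table.
func describeCycle(ids []requestID, names map[requestID]string) string {
	parts := make([]string, 0, len(ids))
	for _, id := range ids {
		name, ok := names[id]
		if !ok {
			name = "(unknown request)" // the caller never recorded this request
		}
		parts = append(parts, name)
	}
	return "self-dependency between: " + strings.Join(parts, ", ")
}

func main() {
	// The table is filled in by the caller as it starts each request.
	names := map[requestID]string{1: "load config", 2: "resolve module"}
	fmt.Println(describeCycle([]requestID{1, 2, 3}, names))
}
```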
var NoRequest RequestID
NoRequest is the zero value of RequestID, representing the absence of any request.
No actual request has this request ID; this value can be used, for example, when a caller asks for the request ID for a request that hasn't actually started yet, such as with Once.RequestID.
func (RequestID) Equal ¶
Equal returns true if other is the same RequestID as the receiver.
This is equivalent to using the "==" operator to compare two values, but is implemented here to work better with libraries like Google's "go-cmp" which try to perform deep comparison when no Equal method is present.
type Resolver ¶
type Resolver[T any] struct {
	// contains filtered or unexported fields
}
A Resolver is used by the Worker that is responsible for resolving a request to report its result, thereby unblocking any other workers that are waiting for its resolution.
func (Resolver[T]) ContainedResolvers ¶
func (r Resolver[T]) ContainedResolvers() iter.Seq[AnyResolver]
ContainedResolvers implements ResolverContainer, reporting the receiver itself as the only resolver in the container.
func (Resolver[T]) Report ¶
func (r Resolver[T]) Report(resolvingWorker *Worker, val T, err error)
Report resolves the request with both a result value and an error, both of which will be returned from any Promise.Await calls for the associated request.
func (Resolver[T]) ReportError ¶
func (r Resolver[T]) ReportError(resolvingWorker *Worker, err error)
ReportError is a helper for Resolver.Report which automatically sets the value part of the result to the zero value of T, suggesting an error result without any useful accompanying value.
func (Resolver[T]) ReportSuccess ¶
func (r Resolver[T]) ReportSuccess(resolvingWorker *Worker, val T)
ReportSuccess is a helper for Resolver.Report which automatically sets the error to nil, suggesting a successful result.
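The relationship between the three reporting methods can be illustrated with a small stand-in type. This is a sketch of the documented behavior only, not the library's code: `resolver` is a hypothetical local type, it omits the resolvingWorker argument that the real methods take, and it simply records the result rather than unblocking any waiters.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a sample error used by this sketch.
var errBoom = errors.New("boom")

// resolver is a hypothetical stand-in, not workgraph.Resolver. It omits the
// resolvingWorker argument and only records what was reported.
type resolver[T any] struct {
	val  T
	err  error
	done bool
}

// Report records both a value and an error.
func (r *resolver[T]) Report(val T, err error) {
	r.val, r.err, r.done = val, err, true
}

// ReportSuccess reports val with a nil error.
func (r *resolver[T]) ReportSuccess(val T) { r.Report(val, nil) }

// ReportError reports err with the zero value of T.
func (r *resolver[T]) ReportError(err error) {
	var zero T
	r.Report(zero, err)
}

func main() {
	ok := &resolver[int]{}
	ok.ReportSuccess(42)
	failed := &resolver[int]{}
	failed.ReportError(errBoom)
	fmt.Println(ok.val, ok.err, failed.val, failed.err)
}
```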
type ResolverContainer ¶
type ResolverContainer interface {
ContainedResolvers() iter.Seq[AnyResolver]
}
ResolverContainer is implemented by types that in some sense "contain" Resolver objects, allowing the responsibility for all of those results to be passed in aggregate to a new worker when calling NewWorker.
Resolver itself implements this interface, so callers with no need for any higher-level aggregation can pass individual Resolver values directly instead of implementing this interface themselves.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
A Worker represents a specific linear codepath that will ultimately resolve zero or more results.
It's ultimately up to the caller to decide what exactly "linear codepath" means. The simplest mental model is for each Worker to belong to one goroutine and for that worker to go out of scope once the goroutine exits, with no other goroutine interacting with it.
However, the only hard constraint is that each worker can only be waiting on zero or one results at a time, and so two goroutines can potentially share a single worker as long as they somehow arrange for at most one of them to interact with the worker at a time.
A Worker object must be kept live (i.e. not garbage collected) until it has either provided or delegated all of the results that it's responsible for, or else those results will all fail with an error. This is a best-effort mechanism to reclaim other workers that could otherwise be blocked indefinitely, but should not be relied on in any "happy path" because the Go garbage collection details are intentionally underspecified to allow for future improvements.
func NewWorker ¶
func NewWorker(delegatedResolvers ...ResolverContainer) *Worker
NewWorker allocates a new Worker, optionally transferring responsibility for resolving some requests.
Callers are responsible for ensuring that the codepath passing resolvers to this function was previously responsible for those resolvers. Although there are no immediate checks that the caller was already responsible for the given requests (the relationship between codepaths and workers is the caller's concern), incorrect use can potentially be detected later if the previously responsible worker subsequently attempts to resolve a request that was delegated.