Documentation
¶
Overview ¶
Package dispatch contains logic to dispatch requests locally or to other nodes.
Index ¶
- Constants
- func AddResponseMetadata(existing *v1.ResponseMeta, incoming *v1.ResponseMeta)
- func ApplyDispatchWrap(co query.CanonicalOutline, params queryopt.RequestParams) (query.CanonicalOutline, error)
- func CaveatContextFromPlanContext(pc *v1.PlanContext) map[string]any
- func CheckDepth(ctx context.Context, req DispatchableRequest) error
- func ExtractTraversalTrace(err error) *dispatchv1.LookupDebugInfo
- func HandleTraversalTrace(originalErr error, resourceType, relation string, resourceIds []string) error
- func IsMaxDepthExceeded(err error) bool
- func NewMaxDepthExceededError(req DispatchableRequest) error
- func NewPlanContext(revision string, schemaHash datalayer.SchemaHash, caveatContext map[string]any, ...) *v1.PlanContext
- func NewQueryContext(stdContext context.Context, dispatcher Dispatcher, planContext *v1.PlanContext, ...) *query.Context
- func PaginationLimitFromPlanContext(pc *v1.PlanContext) *uint64
- func PlanOperationLabel(op v1.PlanOperation) string
- func QueryPathToResultPath(p *query.Path) *v1.ResultPath
- type Check
- type CloningCollectingDispatchStream
- type CollectingDispatchStream
- type CountingDispatchStream
- type DispatchExecutor
- func (e *DispatchExecutor) Check(ctx *query.Context, it query.Iterator, resource query.Object, ...) (*query.Path, error)
- func (e *DispatchExecutor) CheckManyResources(ctx *query.Context, it query.Iterator, resources []query.Object, ...) ([]*query.Path, error)
- func (e *DispatchExecutor) CheckManySubjects(ctx *query.Context, it query.Iterator, resource query.Object, ...) ([]*query.Path, error)
- func (e *DispatchExecutor) IterResources(ctx *query.Context, it query.Iterator, subject query.ObjectAndRelation, ...) (query.PathSeq, error)
- func (e *DispatchExecutor) IterSubjects(ctx *query.Context, it query.Iterator, resource query.Object, ...) (query.PathSeq, error)
- type DispatchIterator
- func (d *DispatchIterator) CanonicalKey() query.CanonicalKey
- func (d *DispatchIterator) CheckImpl(ctx *query.Context, resource query.Object, subject query.ObjectAndRelation) (*query.Path, error)
- func (d *DispatchIterator) Clone() query.Iterator
- func (d *DispatchIterator) Explain() query.Explain
- func (d *DispatchIterator) IterResourcesImpl(ctx *query.Context, subject query.ObjectAndRelation, ...) (query.PathSeq, error)
- func (d *DispatchIterator) IterSubjectsImpl(ctx *query.Context, resource query.Object, filterSubjectType query.ObjectType) (query.PathSeq, error)
- func (d *DispatchIterator) ReplaceSubiterators(newSubs []query.Iterator) (query.Iterator, error)
- func (d *DispatchIterator) ResourceType() ([]query.ObjectType, error)
- func (d *DispatchIterator) Serialize(w io.Writer) error
- func (d *DispatchIterator) Subiterators() []query.Iterator
- func (d *DispatchIterator) SubjectTypes() ([]query.ObjectType, error)
- type DispatchableRequest
- type Dispatcher
- type Expand
- type HandlingDispatchStream
- type LookupResources2
- type LookupResources2Stream
- type LookupResources3
- type LookupResources3Stream
- type LookupSubjects
- type LookupSubjectsStream
- type MaxDepthExceededError
- type MetricsOptions
- type Plan
- type PlanCheckLookup
- type PlanStream
- type ReadyState
- type Stream
- type VTCloneable
- type WrappedDispatchStream
Constants ¶
const DispatchIteratorType query.IteratorType = 'd'
DispatchIteratorType is the wire/registry byte for DispatchIterator. It uses lowercase 'd' so it does not collide with DatastoreIteratorType ('D').
const DispatchWrapAliasOptimizationName = "dispatch-wrap-alias"
DispatchWrapAliasOptimizationName is the registry name for the dispatch-wrap-alias optimization. Callers fetch the registered Optimizer by this name when they want to apply the wrap as a standalone step after the usual queryopt.OptimizersForRequest set.
Variables ¶
This section is empty.
Functions ¶
func AddResponseMetadata ¶ added in v1.12.0
func AddResponseMetadata(existing *v1.ResponseMeta, incoming *v1.ResponseMeta)
AddResponseMetadata adds the metadata found in the incoming metadata to the existing metadata, *modifying it in place*.
func ApplyDispatchWrap ¶ added in v1.54.0
func ApplyDispatchWrap(co query.CanonicalOutline, params queryopt.RequestParams) (query.CanonicalOutline, error)
ApplyDispatchWrap runs only the dispatch-wrap-alias optimization on the given CanonicalOutline. It exists for callers (dispatch service handlers, query plan compilers) that already ran queryopt.OptimizersForRequest and want to bolt on the dispatch wrap as a final pass.
func CaveatContextFromPlanContext ¶ added in v1.53.0
func CaveatContextFromPlanContext(pc *v1.PlanContext) map[string]any
CaveatContextFromPlanContext extracts the caveat context map from a PlanContext.
func CheckDepth ¶
func CheckDepth(ctx context.Context, req DispatchableRequest) error
CheckDepth returns ErrMaxDepth if there is insufficient depth remaining to dispatch.
func ExtractTraversalTrace ¶ added in v1.52.0
func ExtractTraversalTrace(err error) *dispatchv1.LookupDebugInfo
ExtractTraversalTrace reads the accumulated traversal trace from a MaxDepthExceeded error. Returns nil if no trace is present or the error is not a MaxDepthExceeded.
func HandleTraversalTrace ¶ added in v1.52.0
func HandleTraversalTrace(originalErr error, resourceType, relation string, resourceIds []string) error
HandleTraversalTrace takes an error and populates the fields of a LookupDebugInfo accordingly.
Designed to be called once per dispatchIter level as the error unwinds: each level prepends its frame so the outermost caller sees the full path from root to leaf. Because the trace is stored in the gRPC ErrorInfo metadata it survives a network hop.
func IsMaxDepthExceeded ¶ added in v1.52.0
IsMaxDepthExceeded returns true if err represents a max-depth-exceeded condition, whether it is an in-process MaxDepthExceededError or a gRPC status error that crossed a network boundary (identified by ERROR_REASON_MAXIMUM_DEPTH_EXCEEDED in ErrorInfo).
func NewMaxDepthExceededError ¶ added in v1.23.0
func NewMaxDepthExceededError(req DispatchableRequest) error
NewMaxDepthExceededError creates a new MaxDepthExceededError.
func NewPlanContext ¶ added in v1.53.0
func NewPlanContext(revision string, schemaHash datalayer.SchemaHash, caveatContext map[string]any, maxRecursionDepth int, datastoreLimit uint64) *v1.PlanContext
NewPlanContext builds a PlanContext proto from query.Context fields.
func NewQueryContext ¶ added in v1.53.0
func NewQueryContext( stdContext context.Context, dispatcher Dispatcher, planContext *v1.PlanContext, reader query.QueryDatastoreReader, caveatRunner *caveats.CaveatRunner, dispatchChunkSize uint16, opts ...query.ContextOption, ) *query.Context
NewQueryContext builds a query.Context whose Executor is a DispatchExecutor driven by the given PlanContext, so that dispatched sub-requests carry the same plan state. Plan-derived options (caveat context, recursion depth, datastore limit) are applied first; extra opts are appended after. dispatchChunkSize caps inputs-per-RPC for batched CheckMany operations.
func PaginationLimitFromPlanContext ¶ added in v1.53.0
func PaginationLimitFromPlanContext(pc *v1.PlanContext) *uint64
PaginationLimitFromPlanContext returns the datastore limit from a PlanContext, or nil if unset.
func PlanOperationLabel ¶ added in v1.54.0
func PlanOperationLabel(op v1.PlanOperation) string
PlanOperationLabel returns a stable, lowercase snake_case label string for the given plan operation, suitable for use as a Prometheus label value.
func QueryPathToResultPath ¶ added in v1.53.0
func QueryPathToResultPath(p *query.Path) *v1.ResultPath
QueryPathToResultPath converts a query.Path to a proto ResultPath.
Types ¶
type Check ¶
type Check interface {
// DispatchCheck submits a single check request and returns its result.
// The result should be safe to access from multiple goroutines.
DispatchCheck(ctx context.Context, req *v1.DispatchCheckRequest) (*v1.DispatchCheckResponse, error)
}
Check interface describes just the methods required to dispatch check requests.
type CloningCollectingDispatchStream ¶ added in v1.46.0
type CloningCollectingDispatchStream[T VTCloneable[T]] struct { // contains filtered or unexported fields }
CloningCollectingDispatchStream is a dispatch stream that collects results in memory, cloning when publishing to avoid mutation issues.
func NewCloningCollectingDispatchStream ¶ added in v1.46.0
func NewCloningCollectingDispatchStream[T VTCloneable[T]](ctx context.Context) *CloningCollectingDispatchStream[T]
NewCloningCollectingDispatchStream creates a new CloningCollectingDispatchStream.
func (*CloningCollectingDispatchStream[T]) Context ¶ added in v1.46.0
func (s *CloningCollectingDispatchStream[T]) Context() context.Context
func (*CloningCollectingDispatchStream[T]) Publish ¶ added in v1.46.0
func (s *CloningCollectingDispatchStream[T]) Publish(result T) error
func (*CloningCollectingDispatchStream[T]) Results ¶ added in v1.46.0
func (s *CloningCollectingDispatchStream[T]) Results() []T
type CollectingDispatchStream ¶ added in v1.8.0
type CollectingDispatchStream[T any] struct { // contains filtered or unexported fields }
CollectingDispatchStream is a dispatch stream that collects results in memory.
func NewCollectingDispatchStream ¶ added in v1.8.0
func NewCollectingDispatchStream[T any](ctx context.Context) *CollectingDispatchStream[T]
NewCollectingDispatchStream creates a new CollectingDispatchStream.
func (*CollectingDispatchStream[T]) Context ¶ added in v1.8.0
func (s *CollectingDispatchStream[T]) Context() context.Context
func (*CollectingDispatchStream[T]) Publish ¶ added in v1.8.0
func (s *CollectingDispatchStream[T]) Publish(result T) error
func (*CollectingDispatchStream[T]) Results ¶ added in v1.8.0
func (s *CollectingDispatchStream[T]) Results() []T
type CountingDispatchStream ¶ added in v1.22.0
type CountingDispatchStream[T any] struct { Stream Stream[T] // contains filtered or unexported fields }
CountingDispatchStream is a dispatch stream that counts the number of items published. It uses an internal atomic int to ensure it is thread safe.
func NewCountingDispatchStream ¶ added in v1.22.0
func NewCountingDispatchStream[T any](wrapped Stream[T]) *CountingDispatchStream[T]
func (*CountingDispatchStream[T]) Context ¶ added in v1.22.0
func (s *CountingDispatchStream[T]) Context() context.Context
func (*CountingDispatchStream[T]) Publish ¶ added in v1.22.0
func (s *CountingDispatchStream[T]) Publish(result T) error
func (*CountingDispatchStream[T]) PublishedCount ¶ added in v1.22.0
func (s *CountingDispatchStream[T]) PublishedCount() uint64
type DispatchExecutor ¶ added in v1.53.0
type DispatchExecutor struct {
// contains filtered or unexported fields
}
DispatchExecutor is an Executor that selectively dispatches sub-operations through the dispatch infrastructure at alias iterator boundaries. For non-alias iterators, it delegates locally like LocalExecutor.
func NewDispatchExecutor ¶ added in v1.53.0
func NewDispatchExecutor(dispatcher Dispatcher, planContext *v1.PlanContext, dispatchChunkSize uint16) *DispatchExecutor
NewDispatchExecutor creates a new DispatchExecutor that dispatches alias iterator operations through the given Dispatcher chain. dispatchChunkSize caps the number of inputs per CheckMany RPC; if zero, defaults to 100.
func (*DispatchExecutor) CheckManyResources ¶ added in v1.53.0
func (*DispatchExecutor) CheckManySubjects ¶ added in v1.53.0
func (*DispatchExecutor) IterResources ¶ added in v1.53.0
func (e *DispatchExecutor) IterResources(ctx *query.Context, it query.Iterator, subject query.ObjectAndRelation, filterResourceType query.ObjectType) (query.PathSeq, error)
func (*DispatchExecutor) IterSubjects ¶ added in v1.53.0
type DispatchIterator ¶ added in v1.54.0
type DispatchIterator struct {
// contains filtered or unexported fields
}
DispatchIterator is a single-child passthrough: every Plan method delegates straight to its sub-iterator. It carries no state of its own beyond a canonical key and the wrapped child, and exists as a marker node in the iterator tree that the dispatch layer can recognize and act on.
func NewDispatchIterator ¶ added in v1.54.0
func NewDispatchIterator(sub query.Iterator) *DispatchIterator
NewDispatchIterator wraps sub in a passthrough DispatchIterator.
func (*DispatchIterator) CanonicalKey ¶ added in v1.54.0
func (d *DispatchIterator) CanonicalKey() query.CanonicalKey
func (*DispatchIterator) CheckImpl ¶ added in v1.54.0
func (d *DispatchIterator) CheckImpl(ctx *query.Context, resource query.Object, subject query.ObjectAndRelation) (*query.Path, error)
func (*DispatchIterator) Clone ¶ added in v1.54.0
func (d *DispatchIterator) Clone() query.Iterator
func (*DispatchIterator) Explain ¶ added in v1.54.0
func (d *DispatchIterator) Explain() query.Explain
func (*DispatchIterator) IterResourcesImpl ¶ added in v1.54.0
func (d *DispatchIterator) IterResourcesImpl(ctx *query.Context, subject query.ObjectAndRelation, filterResourceType query.ObjectType) (query.PathSeq, error)
func (*DispatchIterator) IterSubjectsImpl ¶ added in v1.54.0
func (d *DispatchIterator) IterSubjectsImpl(ctx *query.Context, resource query.Object, filterSubjectType query.ObjectType) (query.PathSeq, error)
func (*DispatchIterator) ReplaceSubiterators ¶ added in v1.54.0
func (*DispatchIterator) ResourceType ¶ added in v1.54.0
func (d *DispatchIterator) ResourceType() ([]query.ObjectType, error)
func (*DispatchIterator) Serialize ¶ added in v1.54.0
func (d *DispatchIterator) Serialize(w io.Writer) error
func (*DispatchIterator) Subiterators ¶ added in v1.54.0
func (d *DispatchIterator) Subiterators() []query.Iterator
func (*DispatchIterator) SubjectTypes ¶ added in v1.54.0
func (d *DispatchIterator) SubjectTypes() ([]query.ObjectType, error)
type DispatchableRequest ¶ added in v1.23.0
type DispatchableRequest interface {
zerolog.LogObjectMarshaler
GetMetadata() *v1.ResolverMeta
}
DispatchableRequest is an interface for requests.
type Dispatcher ¶
type Dispatcher interface {
Check
Expand
LookupSubjects
LookupResources2
LookupResources3
Plan
// Close closes the dispatcher.
Close() error
// ReadyState returns true when dispatcher is able to respond to requests
ReadyState() ReadyState
}
Dispatcher interface describes a method for passing subchecks off to additional machines.
type Expand ¶
type Expand interface {
// DispatchExpand submits a single expand request and returns its result.
// If an error is returned, DispatchExpandResponse will still contain Metadata.
DispatchExpand(ctx context.Context, req *v1.DispatchExpandRequest) (*v1.DispatchExpandResponse, error)
}
Expand interface describes just the methods required to dispatch expand requests.
type HandlingDispatchStream ¶ added in v1.12.0
type HandlingDispatchStream[T any] struct { // contains filtered or unexported fields }
HandlingDispatchStream is a dispatch stream that executes a handler for each item published. It uses an internal mutex to ensure it is thread safe.
func (*HandlingDispatchStream[T]) Context ¶ added in v1.12.0
func (s *HandlingDispatchStream[T]) Context() context.Context
func (*HandlingDispatchStream[T]) Publish ¶ added in v1.12.0
func (s *HandlingDispatchStream[T]) Publish(result T) error
type LookupResources2 ¶ added in v1.35.0
type LookupResources2 interface {
DispatchLookupResources2(
req *v1.DispatchLookupResources2Request,
stream LookupResources2Stream,
) error
}
type LookupResources2Stream ¶ added in v1.35.0
type LookupResources2Stream = Stream[*v1.DispatchLookupResources2Response]
type LookupResources3 ¶ added in v1.45.4
type LookupResources3 interface {
DispatchLookupResources3(
req *v1.DispatchLookupResources3Request,
stream LookupResources3Stream,
) error
}
type LookupResources3Stream ¶ added in v1.45.4
type LookupResources3Stream = Stream[*v1.DispatchLookupResources3Response]
type LookupSubjects ¶ added in v1.12.0
type LookupSubjects interface {
// DispatchLookupSubjects submits a single lookup subjects request, writing its results to the specified stream.
DispatchLookupSubjects(
req *v1.DispatchLookupSubjectsRequest,
stream LookupSubjectsStream,
) error
}
LookupSubjects interface describes just the methods required to dispatch lookup subjects requests.
type LookupSubjectsStream ¶ added in v1.12.0
type LookupSubjectsStream = Stream[*v1.DispatchLookupSubjectsResponse]
LookupSubjectsStream is an alias for the stream to which found subjects will be written.
type MaxDepthExceededError ¶ added in v1.23.0
type MaxDepthExceededError struct {
// Request is the request that exceeded the maximum depth.
Request DispatchableRequest
// contains filtered or unexported fields
}
MaxDepthExceededError is an error returned when the maximum depth for dispatching has been exceeded.
func (MaxDepthExceededError) GRPCStatus ¶ added in v1.35.3
func (err MaxDepthExceededError) GRPCStatus() *status.Status
GRPCStatus implements retrieving the gRPC status for the error.
type MetricsOptions ¶ added in v1.54.0
type MetricsOptions struct {
// PrometheusSubsystem is the subsystem name prepended to every metric
// emitted by the dispatcher (e.g. "dispatch" or "dispatch_client"). It
// namespaces the dispatcher's metrics so they don't collide with those of
// other components registered to the same registry.
PrometheusSubsystem string
// PrometheusRegistry is the registry the dispatcher's metrics are registered with.
PrometheusRegistry prometheus.Registerer
}
MetricsOptions configures how a dispatcher reports Prometheus metrics.
Metrics are only emitted when both fields are set. The zero value disables metrics entirely.
func (*MetricsOptions) Enabled ¶ added in v1.54.0
func (m *MetricsOptions) Enabled() bool
Enabled reports whether metrics should be emitted.
type Plan ¶ added in v1.52.0
type Plan interface {
// DispatchQueryPlan submits a plan-based query request, writing its results to the specified stream.
DispatchQueryPlan(
req *v1.DispatchQueryPlanRequest,
stream PlanStream,
) error
// LookupPlanCheck consults any caches in the dispatcher chain for a
// Plan-Check answer matching lookup, without forcing the caller to
// serialize the iterator subtree first. Returns (path, true, nil) on a
// cache hit; (nil, false, nil) on a miss (caller should proceed to
// DispatchQueryPlan); error on a cache I/O failure. Implementations that
// hold no cache return (nil, false, nil) and should forward to any wrapped
// delegate so chained caches all get probed.
LookupPlanCheck(ctx context.Context, lookup PlanCheckLookup) (*v1.ResultPath, bool, error)
}
Plan interface describes the methods required to dispatch plan-based query planner requests.
type PlanCheckLookup ¶ added in v1.54.0
type PlanCheckLookup struct {
CanonicalKey string
Resource *core.ObjectAndRelation
Subject *core.ObjectAndRelation
PlanContext *v1.PlanContext
}
PlanCheckLookup is the small, cheap-to-construct descriptor a caller passes to LookupPlanCheck to consult dispatcher caches before paying to serialize a full iterator subtree into a DispatchQueryPlanRequest. The fields mirror the inputs the Plan-Check cache key hashes over (see keys.PlanCheckLookupKey): callers that produce identical descriptors for the same underlying problem will hit the same cache entry as the corresponding DispatchQueryPlan call.
type PlanStream ¶ added in v1.52.0
type PlanStream = Stream[*v1.DispatchQueryPlanResponse]
PlanStream is an alias for the stream to which plan results will be written.
type ReadyState ¶ added in v1.18.1
type ReadyState struct {
// Message is a human-readable status message for the current state.
Message string
// IsReady indicates whether the datastore is ready.
IsReady bool
}
ReadyState represents the ready state of the dispatcher.
type Stream ¶ added in v1.8.0
type Stream[T any] interface { // Publish publishes the result to the stream. Publish(T) error // Context returns the context for the stream. Context() context.Context }
Stream defines the interface generically matching a streaming dispatch response.
func NewHandlingDispatchStream ¶ added in v1.12.0
func NewHandlingDispatchStream[T any](ctx context.Context, processor func(result T) error) Stream[T]
NewHandlingDispatchStream returns a new handling dispatch stream.
func StreamWithContext ¶ added in v1.8.0
StreamWithContext returns the given dispatch stream, wrapped to return the given context.
func WrapGRPCStream ¶ added in v1.8.0
WrapGRPCStream wraps a gRPC result stream with a concurrent-safe dispatch stream. This is necessary because gRPC response streams are *not concurrent safe*. See: https://groups.google.com/g/grpc-io/c/aI6L6M4fzQ0?pli=1
type VTCloneable ¶ added in v1.46.0
type VTCloneable[T any] interface { CloneVT() T }
VTCloneable is an interface that requires a CloneVT method.
type WrappedDispatchStream ¶ added in v1.8.0
type WrappedDispatchStream[T any] struct { Stream Stream[T] Ctx context.Context Processor func(result T) (T, bool, error) }
WrappedDispatchStream is a dispatch stream that wraps another dispatch stream, and performs an operation on each result before puppeting back up to the parent stream.
func (*WrappedDispatchStream[T]) Context ¶ added in v1.8.0
func (s *WrappedDispatchStream[T]) Context() context.Context
func (*WrappedDispatchStream[T]) Publish ¶ added in v1.8.0
func (s *WrappedDispatchStream[T]) Publish(result T) error
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package combined implements a dispatcher that combines caching, redispatching and optional cluster dispatching.
|
Package combined implements a dispatcher that combines caching, redispatching and optional cluster dispatching. |
|
Code generated by github.com/ecordell/optgen.
|
Code generated by github.com/ecordell/optgen. |
|
Package mocks is a generated GoMock package.
|
Package mocks is a generated GoMock package. |