dispatch

package
v1.54.0
Warning

This package is not in the latest version of its module.

Published: Jun 18, 2026 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Overview

Package dispatch contains logic to dispatch requests locally or to other nodes.

Constants

const DispatchIteratorType query.IteratorType = 'd'

DispatchIteratorType is the wire/registry byte for DispatchIterator. It uses lowercase 'd' so it does not collide with DatastoreIteratorType ('D').

const DispatchWrapAliasOptimizationName = "dispatch-wrap-alias"

DispatchWrapAliasOptimizationName is the registry name for the dispatch-wrap-alias optimization. Callers fetch the registered Optimizer by this name when they want to apply the wrap as a standalone step after the usual queryopt.OptimizersForRequest set.

Variables

This section is empty.

Functions

func AddResponseMetadata added in v1.12.0

func AddResponseMetadata(existing *v1.ResponseMeta, incoming *v1.ResponseMeta)

AddResponseMetadata adds the metadata found in the incoming metadata to the existing metadata, modifying the existing metadata in place.

func ApplyDispatchWrap added in v1.54.0

func ApplyDispatchWrap(co query.CanonicalOutline, params queryopt.RequestParams) (query.CanonicalOutline, error)

ApplyDispatchWrap runs only the dispatch-wrap-alias optimization on the given CanonicalOutline. It exists for callers (dispatch service handlers, query plan compilers) that already ran queryopt.OptimizersForRequest and want to bolt on the dispatch wrap as a final pass.

func CaveatContextFromPlanContext added in v1.53.0

func CaveatContextFromPlanContext(pc *v1.PlanContext) map[string]any

CaveatContextFromPlanContext extracts the caveat context map from a PlanContext.

func CheckDepth

func CheckDepth(ctx context.Context, req DispatchableRequest) error

CheckDepth returns ErrMaxDepth if there is insufficient depth remaining to dispatch.
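
The depth guard can be pictured with a minimal sketch. It is not the package's implementation: the `meta`, `request`, and `errMaxDepth` names are hypothetical stand-ins, and the assumption that request metadata carries a remaining-depth counter is made only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errMaxDepth is a hypothetical stand-in for the package's max-depth error.
var errMaxDepth = errors.New("max depth exceeded")

// meta is a hypothetical stand-in for resolver metadata. The
// depthRemaining field is an assumption made for this sketch.
type meta struct{ depthRemaining uint32 }

// request is a hypothetical dispatchable request.
type request struct{ meta *meta }

// checkDepth refuses to dispatch when no depth budget remains.
func checkDepth(req request) error {
	if req.meta == nil || req.meta.depthRemaining == 0 {
		return errMaxDepth
	}
	return nil
}

func main() {
	fmt.Println(checkDepth(request{meta: &meta{depthRemaining: 3}}))
	fmt.Println(checkDepth(request{meta: &meta{depthRemaining: 0}}))
}
```

Checking before every dispatch, rather than once at the entry point, is what bounds a recursive traversal regardless of which node ends up handling the sub-request.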

func ExtractTraversalTrace added in v1.52.0

func ExtractTraversalTrace(err error) *dispatchv1.LookupDebugInfo

ExtractTraversalTrace reads the accumulated traversal trace from a MaxDepthExceeded error. Returns nil if no trace is present or the error is not a MaxDepthExceeded.

func HandleTraversalTrace added in v1.52.0

func HandleTraversalTrace(originalErr error, resourceType, relation string, resourceIds []string) error

HandleTraversalTrace takes an error and populates the fields of a LookupDebugInfo accordingly.

Designed to be called once per dispatchIter level as the error unwinds: each level prepends its frame so the outermost caller sees the full path from root to leaf. Because the trace is stored in the gRPC ErrorInfo metadata it survives a network hop.
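
The prepend-while-unwinding behavior described above can be sketched as follows. This is an illustration under stated assumptions, not the package's code: `traceErr`, `addFrame`, and `walk` are hypothetical names, and frames are plain strings instead of LookupDebugInfo entries stored in gRPC ErrorInfo metadata.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// traceErr is a hypothetical error that carries traversal frames,
// ordered from root to leaf.
type traceErr struct {
	frames []string
}

func (e *traceErr) Error() string {
	return "max depth exceeded: " + strings.Join(e.frames, " -> ")
}

// addFrame prepends one frame to the trace carried by err. Errors that
// do not carry a trace (including nil) are returned unchanged.
func addFrame(err error, frame string) error {
	var te *traceErr
	if !errors.As(err, &te) {
		return err
	}
	frames := append([]string{frame}, te.frames...)
	return &traceErr{frames: frames}
}

// walk simulates nested dispatch levels: the deepest level fails and
// each enclosing level adds its own frame while the error unwinds.
func walk(levels []string) error {
	if len(levels) == 0 {
		return &traceErr{}
	}
	return addFrame(walk(levels[1:]), levels[0])
}

func main() {
	err := walk([]string{"document#view", "folder#view", "org#member"})
	fmt.Println(err)
}
```

Because each level prepends rather than appends, the outermost caller reads the frames in root-to-leaf order without any reversal step.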

func IsMaxDepthExceeded added in v1.52.0

func IsMaxDepthExceeded(err error) bool

IsMaxDepthExceeded returns true if err represents a max-depth-exceeded condition, whether it is an in-process MaxDepthExceededError or a gRPC status error that crossed a network boundary (identified by ERROR_REASON_MAXIMUM_DEPTH_EXCEEDED in ErrorInfo).
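
A simplified sketch of the two detection paths, using only the standard library. The `localDepthErr` and `remoteErr` types are hypothetical: the first stands in for the in-process error, the second for a gRPC status error whose ErrorInfo carries a reason string. The real function inspects gRPC status details, which this sketch does not attempt.

```go
package main

import (
	"errors"
	"fmt"
)

// localDepthErr is a hypothetical stand-in for the in-process error type.
type localDepthErr struct{}

func (*localDepthErr) Error() string { return "max depth exceeded" }

// remoteErr is a hypothetical stand-in for an error that crossed a
// network boundary and is identified only by its reason string.
type remoteErr struct{ reason string }

func (e *remoteErr) Error() string { return "remote: " + e.reason }

const reasonMaxDepth = "ERROR_REASON_MAXIMUM_DEPTH_EXCEEDED"

// isMaxDepth reports whether err, or anything it wraps, represents a
// max-depth condition in either form.
func isMaxDepth(err error) bool {
	var local *localDepthErr
	if errors.As(err, &local) {
		return true
	}
	var remote *remoteErr
	return errors.As(err, &remote) && remote.reason == reasonMaxDepth
}

// wrap adds context to err the way callers commonly do with %w.
func wrap(err error) error { return fmt.Errorf("dispatch failed: %w", err) }

func main() {
	fmt.Println(isMaxDepth(wrap(&localDepthErr{})))
	fmt.Println(isMaxDepth(wrap(&remoteErr{reason: reasonMaxDepth})))
	fmt.Println(isMaxDepth(errors.New("unrelated")))
}
```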

func NewMaxDepthExceededError added in v1.23.0

func NewMaxDepthExceededError(req DispatchableRequest) error

NewMaxDepthExceededError creates a new MaxDepthExceededError.

func NewPlanContext added in v1.53.0

func NewPlanContext(revision string, schemaHash datalayer.SchemaHash, caveatContext map[string]any, maxRecursionDepth int, datastoreLimit uint64) *v1.PlanContext

NewPlanContext builds a PlanContext proto from query.Context fields.

func NewQueryContext added in v1.53.0

func NewQueryContext(
	stdContext context.Context,
	dispatcher Dispatcher,
	planContext *v1.PlanContext,
	reader query.QueryDatastoreReader,
	caveatRunner *caveats.CaveatRunner,
	dispatchChunkSize uint16,
	opts ...query.ContextOption,
) *query.Context

NewQueryContext builds a query.Context whose Executor is a DispatchExecutor driven by the given PlanContext, so that dispatched sub-requests carry the same plan state. Plan-derived options (caveat context, recursion depth, datastore limit) are applied first; extra opts are appended after. dispatchChunkSize caps inputs-per-RPC for batched CheckMany operations.

func PaginationLimitFromPlanContext added in v1.53.0

func PaginationLimitFromPlanContext(pc *v1.PlanContext) *uint64

PaginationLimitFromPlanContext returns the datastore limit from a PlanContext, or nil if unset.

func PlanOperationLabel added in v1.54.0

func PlanOperationLabel(op v1.PlanOperation) string

PlanOperationLabel returns a stable, lowercase snake_case label string for the given plan operation, suitable for use as a Prometheus label value.

func QueryPathToResultPath added in v1.53.0

func QueryPathToResultPath(p *query.Path) *v1.ResultPath

QueryPathToResultPath converts a query.Path to a proto ResultPath.

Types

type Check

type Check interface {
	// DispatchCheck submits a single check request and returns its result.
	// The result should be safe to access from multiple goroutines.
	DispatchCheck(ctx context.Context, req *v1.DispatchCheckRequest) (*v1.DispatchCheckResponse, error)
}

Check interface describes just the methods required to dispatch check requests.

type CloningCollectingDispatchStream added in v1.46.0

type CloningCollectingDispatchStream[T VTCloneable[T]] struct {
	// contains filtered or unexported fields
}

CloningCollectingDispatchStream is a dispatch stream that collects results in memory, cloning when publishing to avoid mutation issues.

func NewCloningCollectingDispatchStream added in v1.46.0

func NewCloningCollectingDispatchStream[T VTCloneable[T]](ctx context.Context) *CloningCollectingDispatchStream[T]

NewCloningCollectingDispatchStream creates a new CloningCollectingDispatchStream.

func (*CloningCollectingDispatchStream[T]) Context added in v1.46.0

func (s *CloningCollectingDispatchStream[T]) Context() context.Context

func (*CloningCollectingDispatchStream[T]) Publish added in v1.46.0

func (s *CloningCollectingDispatchStream[T]) Publish(result T) error

func (*CloningCollectingDispatchStream[T]) Results added in v1.46.0

func (s *CloningCollectingDispatchStream[T]) Results() []T
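
Why cloning on publish matters can be shown with a small sketch. The `record` and `cloningCollector` types are hypothetical, the sketch omits the Context method, and it assumes the clone is taken inside Publish.

```go
package main

import (
	"fmt"
	"sync"
)

// vtCloneable mirrors the VTCloneable constraint documented in this package.
type vtCloneable[T any] interface{ CloneVT() T }

// record is a hypothetical result type with a deep-copying CloneVT.
type record struct{ tags []string }

func (r *record) CloneVT() *record {
	return &record{tags: append([]string(nil), r.tags...)}
}

// cloningCollector stores a clone of every published result, so later
// changes made by the publisher do not reach the collected copies.
type cloningCollector[T vtCloneable[T]] struct {
	mu      sync.Mutex
	results []T
}

func (s *cloningCollector[T]) Publish(result T) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.results = append(s.results, result.CloneVT())
	return nil
}

func (s *cloningCollector[T]) Results() []T {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.results
}

// publishThenMutate publishes a record, mutates the original, and
// returns the tag held by the collected copy.
func publishThenMutate() string {
	s := &cloningCollector[*record]{}
	original := &record{tags: []string{"a"}}
	_ = s.Publish(original)
	original.tags[0] = "changed"
	return s.Results()[0].tags[0]
}

func main() {
	fmt.Println(publishThenMutate())
}
```

A non-cloning collector would hold the same pointer as the publisher, so the collected result would read "changed" in this scenario.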

type CollectingDispatchStream added in v1.8.0

type CollectingDispatchStream[T any] struct {
	// contains filtered or unexported fields
}

CollectingDispatchStream is a dispatch stream that collects results in memory.

func NewCollectingDispatchStream added in v1.8.0

func NewCollectingDispatchStream[T any](ctx context.Context) *CollectingDispatchStream[T]

NewCollectingDispatchStream creates a new CollectingDispatchStream.

func (*CollectingDispatchStream[T]) Context added in v1.8.0

func (s *CollectingDispatchStream[T]) Context() context.Context

func (*CollectingDispatchStream[T]) Publish added in v1.8.0

func (s *CollectingDispatchStream[T]) Publish(result T) error

func (*CollectingDispatchStream[T]) Results added in v1.8.0

func (s *CollectingDispatchStream[T]) Results() []T
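
A minimal sketch of an in-memory collecting stream, written against a local copy of the two-method Stream interface. The `collecting` type and `publishAll` helper are hypothetical, and guarding the slice with a mutex is an assumption of the sketch, not a statement about the real type.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// stream mirrors the two-method Stream interface documented in this package.
type stream[T any] interface {
	Publish(T) error
	Context() context.Context
}

// collecting is a hypothetical in-memory stream.
type collecting[T any] struct {
	ctx     context.Context
	mu      sync.Mutex
	results []T
}

func (s *collecting[T]) Context() context.Context { return s.ctx }

func (s *collecting[T]) Publish(result T) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.results = append(s.results, result)
	return nil
}

func (s *collecting[T]) Results() []T {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.results
}

// publishAll publishes each value from its own goroutine and returns
// everything the stream collected, in arrival order.
func publishAll(values []int) []int {
	s := &collecting[int]{ctx: context.Background()}
	var out stream[int] = s // producers only see the interface
	var wg sync.WaitGroup
	for _, v := range values {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			_ = out.Publish(v)
		}(v)
	}
	wg.Wait()
	return s.Results()
}

func main() {
	fmt.Println(len(publishAll([]int{1, 2, 3, 4})))
}
```

This shape is convenient when a streaming dispatch call needs to be consumed as a plain slice, for example in tests.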

type CountingDispatchStream added in v1.22.0

type CountingDispatchStream[T any] struct {
	Stream Stream[T]
	// contains filtered or unexported fields
}

CountingDispatchStream is a dispatch stream that counts the number of items published. It uses an internal atomic int to ensure it is thread safe.

func NewCountingDispatchStream added in v1.22.0

func NewCountingDispatchStream[T any](wrapped Stream[T]) *CountingDispatchStream[T]

func (*CountingDispatchStream[T]) Context added in v1.22.0

func (s *CountingDispatchStream[T]) Context() context.Context

func (*CountingDispatchStream[T]) Publish added in v1.22.0

func (s *CountingDispatchStream[T]) Publish(result T) error

func (*CountingDispatchStream[T]) PublishedCount added in v1.22.0

func (s *CountingDispatchStream[T]) PublishedCount() uint64
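
The atomic-counter approach can be sketched like this. The `counting` and `discard` types are hypothetical, and counting only results that the wrapped stream accepted is an assumption of the sketch.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// stream mirrors the two-method Stream interface documented in this package.
type stream[T any] interface {
	Publish(T) error
	Context() context.Context
}

// discard is a hypothetical downstream stream that drops every result.
type discard[T any] struct{}

func (discard[T]) Publish(T) error          { return nil }
func (discard[T]) Context() context.Context { return context.Background() }

// counting forwards to the wrapped stream and counts successful publishes
// with an atomic counter, so no mutex is needed.
type counting[T any] struct {
	wrapped stream[T]
	count   atomic.Uint64
}

func (s *counting[T]) Context() context.Context { return s.wrapped.Context() }

func (s *counting[T]) Publish(result T) error {
	if err := s.wrapped.Publish(result); err != nil {
		return err
	}
	s.count.Add(1)
	return nil
}

func (s *counting[T]) PublishedCount() uint64 { return s.count.Load() }

// countConcurrent publishes n results from n goroutines and returns the count.
func countConcurrent(n int) uint64 {
	s := &counting[int]{wrapped: discard[int]{}}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			_ = s.Publish(v)
		}(i)
	}
	wg.Wait()
	return s.PublishedCount()
}

func main() {
	fmt.Println(countConcurrent(50))
}
```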

type DispatchExecutor added in v1.53.0

type DispatchExecutor struct {
	// contains filtered or unexported fields
}

DispatchExecutor is an Executor that selectively dispatches sub-operations through the dispatch infrastructure at alias iterator boundaries. For non-alias iterators, it delegates locally like LocalExecutor.

func NewDispatchExecutor added in v1.53.0

func NewDispatchExecutor(dispatcher Dispatcher, planContext *v1.PlanContext, dispatchChunkSize uint16) *DispatchExecutor

NewDispatchExecutor creates a new DispatchExecutor that dispatches alias iterator operations through the given Dispatcher chain. dispatchChunkSize caps the number of inputs per CheckMany RPC; if zero, defaults to 100.
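
The effect of dispatchChunkSize can be illustrated with a generic batching helper. The `chunk` function and `defaultChunkSize` constant are hypothetical names; only the default of 100 for a zero value comes from the documentation above.

```go
package main

import "fmt"

// defaultChunkSize is the fallback the documentation states for a zero value.
const defaultChunkSize = 100

// chunk splits inputs into batches of at most size elements, one batch
// per RPC. A size of zero selects defaultChunkSize.
func chunk[T any](inputs []T, size uint16) [][]T {
	n := int(size)
	if n == 0 {
		n = defaultChunkSize
	}
	var batches [][]T
	for start := 0; start < len(inputs); start += n {
		end := start + n
		if end > len(inputs) {
			end = len(inputs)
		}
		batches = append(batches, inputs[start:end])
	}
	return batches
}

func main() {
	ids := make([]string, 250)
	for i := range ids {
		ids[i] = fmt.Sprintf("doc-%d", i)
	}
	for i, batch := range chunk(ids, 0) {
		fmt.Printf("RPC %d carries %d inputs\n", i, len(batch))
	}
}
```

With 250 inputs and the default size, the helper yields three batches of 100, 100, and 50 inputs.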

func (*DispatchExecutor) Check added in v1.53.0

func (e *DispatchExecutor) Check(ctx *query.Context, it query.Iterator, resource query.Object, subject query.ObjectAndRelation) (*query.Path, error)

func (*DispatchExecutor) CheckManyResources added in v1.53.0

func (e *DispatchExecutor) CheckManyResources(ctx *query.Context, it query.Iterator, resources []query.Object, subject query.ObjectAndRelation) ([]*query.Path, error)

func (*DispatchExecutor) CheckManySubjects added in v1.53.0

func (e *DispatchExecutor) CheckManySubjects(ctx *query.Context, it query.Iterator, resource query.Object, subjects []query.ObjectAndRelation) ([]*query.Path, error)

func (*DispatchExecutor) IterResources added in v1.53.0

func (e *DispatchExecutor) IterResources(ctx *query.Context, it query.Iterator, subject query.ObjectAndRelation, filterResourceType query.ObjectType) (query.PathSeq, error)

func (*DispatchExecutor) IterSubjects added in v1.53.0

func (e *DispatchExecutor) IterSubjects(ctx *query.Context, it query.Iterator, resource query.Object, filterSubjectType query.ObjectType) (query.PathSeq, error)

type DispatchIterator added in v1.54.0

type DispatchIterator struct {
	// contains filtered or unexported fields
}

DispatchIterator is a single-child passthrough: every Plan method delegates straight to its sub-iterator. It carries no state of its own beyond a canonical key and the wrapped child, and exists as a marker node in the iterator tree that the dispatch layer can recognize and act on.

func NewDispatchIterator added in v1.54.0

func NewDispatchIterator(sub query.Iterator) *DispatchIterator

NewDispatchIterator wraps sub in a passthrough DispatchIterator.

func (*DispatchIterator) CanonicalKey added in v1.54.0

func (d *DispatchIterator) CanonicalKey() query.CanonicalKey

func (*DispatchIterator) CheckImpl added in v1.54.0

func (d *DispatchIterator) CheckImpl(ctx *query.Context, resource query.Object, subject query.ObjectAndRelation) (*query.Path, error)

func (*DispatchIterator) Clone added in v1.54.0

func (d *DispatchIterator) Clone() query.Iterator

func (*DispatchIterator) Explain added in v1.54.0

func (d *DispatchIterator) Explain() query.Explain

func (*DispatchIterator) IterResourcesImpl added in v1.54.0

func (d *DispatchIterator) IterResourcesImpl(ctx *query.Context, subject query.ObjectAndRelation, filterResourceType query.ObjectType) (query.PathSeq, error)

func (*DispatchIterator) IterSubjectsImpl added in v1.54.0

func (d *DispatchIterator) IterSubjectsImpl(ctx *query.Context, resource query.Object, filterSubjectType query.ObjectType) (query.PathSeq, error)

func (*DispatchIterator) ReplaceSubiterators added in v1.54.0

func (d *DispatchIterator) ReplaceSubiterators(newSubs []query.Iterator) (query.Iterator, error)

func (*DispatchIterator) ResourceType added in v1.54.0

func (d *DispatchIterator) ResourceType() ([]query.ObjectType, error)

func (*DispatchIterator) Serialize added in v1.54.0

func (d *DispatchIterator) Serialize(w io.Writer) error

func (*DispatchIterator) Subiterators added in v1.54.0

func (d *DispatchIterator) Subiterators() []query.Iterator

func (*DispatchIterator) SubjectTypes added in v1.54.0

func (d *DispatchIterator) SubjectTypes() ([]query.ObjectType, error)

type DispatchableRequest added in v1.23.0

type DispatchableRequest interface {
	zerolog.LogObjectMarshaler

	GetMetadata() *v1.ResolverMeta
}

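DispatchableRequest is the interface implemented by requests that can be dispatched: each one exposes its resolver metadata and can be marshaled as a zerolog log object.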


type Dispatcher

type Dispatcher interface {
	Check
	Expand
	LookupSubjects
	LookupResources2
	LookupResources3
	Plan

	// Close closes the dispatcher.
	Close() error

	// ReadyState reports whether the dispatcher is able to respond to requests.
	ReadyState() ReadyState
}

Dispatcher interface describes a method for passing subchecks off to additional machines.

type Expand

type Expand interface {
	// DispatchExpand submits a single expand request and returns its result.
	// If an error is returned, DispatchExpandResponse will still contain Metadata.
	DispatchExpand(ctx context.Context, req *v1.DispatchExpandRequest) (*v1.DispatchExpandResponse, error)
}

Expand interface describes just the methods required to dispatch expand requests.

type HandlingDispatchStream added in v1.12.0

type HandlingDispatchStream[T any] struct {
	// contains filtered or unexported fields
}

HandlingDispatchStream is a dispatch stream that executes a handler for each item published. It uses an internal mutex to ensure it is thread safe.

func (*HandlingDispatchStream[T]) Context added in v1.12.0

func (s *HandlingDispatchStream[T]) Context() context.Context

func (*HandlingDispatchStream[T]) Publish added in v1.12.0

func (s *HandlingDispatchStream[T]) Publish(result T) error

type LookupResources2 added in v1.35.0

type LookupResources2 interface {
	DispatchLookupResources2(
		req *v1.DispatchLookupResources2Request,
		stream LookupResources2Stream,
	) error
}

type LookupResources2Stream added in v1.35.0

type LookupResources2Stream = Stream[*v1.DispatchLookupResources2Response]

type LookupResources3 added in v1.45.4

type LookupResources3 interface {
	DispatchLookupResources3(
		req *v1.DispatchLookupResources3Request,
		stream LookupResources3Stream,
	) error
}

type LookupResources3Stream added in v1.45.4

type LookupResources3Stream = Stream[*v1.DispatchLookupResources3Response]

type LookupSubjects added in v1.12.0

type LookupSubjects interface {
	// DispatchLookupSubjects submits a single lookup subjects request, writing its results to the specified stream.
	DispatchLookupSubjects(
		req *v1.DispatchLookupSubjectsRequest,
		stream LookupSubjectsStream,
	) error
}

LookupSubjects interface describes just the methods required to dispatch lookup subjects requests.

type LookupSubjectsStream added in v1.12.0

type LookupSubjectsStream = Stream[*v1.DispatchLookupSubjectsResponse]

LookupSubjectsStream is an alias for the stream to which found subjects will be written.

type MaxDepthExceededError added in v1.23.0

type MaxDepthExceededError struct {

	// Request is the request that exceeded the maximum depth.
	Request DispatchableRequest
	// contains filtered or unexported fields
}

MaxDepthExceededError is an error returned when the maximum depth for dispatching has been exceeded.

func (MaxDepthExceededError) GRPCStatus added in v1.35.3

func (err MaxDepthExceededError) GRPCStatus() *status.Status

GRPCStatus implements retrieving the gRPC status for the error.

type MetricsOptions added in v1.54.0

type MetricsOptions struct {
	// PrometheusSubsystem is the subsystem name prepended to every metric
	// emitted by the dispatcher (e.g. "dispatch" or "dispatch_client"). It
	// namespaces the dispatcher's metrics so they don't collide with those of
	// other components registered to the same registry.
	PrometheusSubsystem string

	// PrometheusRegistry is the registry the dispatcher's metrics are registered with.
	PrometheusRegistry prometheus.Registerer
}

MetricsOptions configures how a dispatcher reports Prometheus metrics.

Metrics are only emitted when both fields are set. The zero value disables metrics entirely.

func (*MetricsOptions) Enabled added in v1.54.0

func (m *MetricsOptions) Enabled() bool

Enabled reports whether metrics should be emitted.
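
The both-fields rule can be written out as a short sketch. The `metricsOptions` type and `registerer` interface are hypothetical stand-ins for MetricsOptions and prometheus.Registerer, and treating a nil receiver as disabled is an assumption of the sketch.

```go
package main

import "fmt"

// registerer is a hypothetical stand-in for prometheus.Registerer.
type registerer interface {
	Register(name string) error
}

// memRegistry is a hypothetical registry that records metric names.
type memRegistry struct{ names []string }

func (r *memRegistry) Register(name string) error {
	r.names = append(r.names, name)
	return nil
}

// metricsOptions mirrors the two documented fields.
type metricsOptions struct {
	subsystem string
	registry  registerer
}

// enabled follows the documented rule: both fields must be set.
func (m *metricsOptions) enabled() bool {
	return m != nil && m.subsystem != "" && m.registry != nil
}

func main() {
	fmt.Println((&metricsOptions{}).enabled())
	fmt.Println((&metricsOptions{subsystem: "dispatch"}).enabled())
	fmt.Println((&metricsOptions{subsystem: "dispatch", registry: &memRegistry{}}).enabled())
}
```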

type Plan added in v1.52.0

type Plan interface {
	// DispatchQueryPlan submits a plan-based query request, writing its results to the specified stream.
	DispatchQueryPlan(
		req *v1.DispatchQueryPlanRequest,
		stream PlanStream,
	) error

	// LookupPlanCheck consults any caches in the dispatcher chain for a
	// Plan-Check answer matching lookup, without forcing the caller to
	// serialize the iterator subtree first. Returns (path, true, nil) on a
	// cache hit; (nil, false, nil) on a miss (caller should proceed to
	// DispatchQueryPlan); error on a cache I/O failure. Implementations that
	// hold no cache return (nil, false, nil) and should forward to any wrapped
	// delegate so chained caches all get probed.
	LookupPlanCheck(ctx context.Context, lookup PlanCheckLookup) (*v1.ResultPath, bool, error)
}

Plan interface describes the methods required to dispatch plan-based query planner requests.
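
The probe-then-dispatch contract of LookupPlanCheck can be sketched with reduced types. Everything here is hypothetical: `lookup` stands in for PlanCheckLookup, paths are plain strings, and `check` shows how a caller might fall back to the expensive dispatch only on a miss.

```go
package main

import "fmt"

// lookup is a hypothetical, comparable stand-in for PlanCheckLookup.
type lookup struct{ canonicalKey, resource, subject string }

// prober is the cache-probing half of the Plan interface, reduced to strings.
type prober interface {
	lookupPlanCheck(l lookup) (path string, found bool, err error)
}

// terminal holds no cache and has no delegate, so it always misses.
type terminal struct{}

func (terminal) lookupPlanCheck(lookup) (string, bool, error) { return "", false, nil }

// passthrough holds no cache of its own but forwards to its delegate,
// so caches further down the chain still get probed.
type passthrough struct{ next prober }

func (p passthrough) lookupPlanCheck(l lookup) (string, bool, error) {
	return p.next.lookupPlanCheck(l)
}

// caching answers from its map and forwards misses to its delegate.
type caching struct {
	entries map[lookup]string
	next    prober
}

func (c caching) lookupPlanCheck(l lookup) (string, bool, error) {
	if path, ok := c.entries[l]; ok {
		return path, true, nil
	}
	return c.next.lookupPlanCheck(l)
}

// check probes the chain first and calls dispatch only on a miss.
func check(chain prober, l lookup, dispatch func() (string, error)) (string, error) {
	path, found, err := chain.lookupPlanCheck(l)
	if err != nil {
		return "", err
	}
	if found {
		return path, nil
	}
	return dispatch()
}

func main() {
	known := lookup{"key-1", "doc:1#view", "user:alice"}
	chain := passthrough{next: caching{
		entries: map[lookup]string{known: "cached-path"},
		next:    terminal{},
	}}
	dispatch := func() (string, error) { return "computed-path", nil }

	fmt.Println(check(chain, known, dispatch))
	fmt.Println(check(chain, lookup{"key-2", "doc:2#view", "user:bob"}, dispatch))
}
```

The point of the separate probe is cost: the descriptor is cheap to build, while a full dispatch requires serializing the iterator subtree into a request.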

type PlanCheckLookup added in v1.54.0

type PlanCheckLookup struct {
	CanonicalKey string
	Resource     *core.ObjectAndRelation
	Subject      *core.ObjectAndRelation
	PlanContext  *v1.PlanContext
}

PlanCheckLookup is the small, cheap-to-construct descriptor a caller passes to LookupPlanCheck to consult dispatcher caches before paying to serialize a full iterator subtree into a DispatchQueryPlanRequest. The fields mirror the inputs the Plan-Check cache key hashes over (see keys.PlanCheckLookupKey): callers that produce identical descriptors for the same underlying problem will hit the same cache entry as the corresponding DispatchQueryPlan call.

type PlanStream added in v1.52.0

type PlanStream = Stream[*v1.DispatchQueryPlanResponse]

PlanStream is an alias for the stream to which plan results will be written.

type ReadyState added in v1.18.1

type ReadyState struct {
	// Message is a human-readable status message for the current state.
	Message string

	// IsReady indicates whether the dispatcher is ready.
	IsReady bool
}

ReadyState represents the ready state of the dispatcher.

type Stream added in v1.8.0

type Stream[T any] interface {
	// Publish publishes the result to the stream.
	Publish(T) error

	// Context returns the context for the stream.
	Context() context.Context
}

Stream defines the interface generically matching a streaming dispatch response.

func NewHandlingDispatchStream added in v1.12.0

func NewHandlingDispatchStream[T any](ctx context.Context, processor func(result T) error) Stream[T]

NewHandlingDispatchStream returns a new handling dispatch stream.

func StreamWithContext added in v1.8.0

func StreamWithContext[T any](context context.Context, stream Stream[T]) Stream[T]

StreamWithContext returns the given dispatch stream, wrapped to return the given context.

func WrapGRPCStream added in v1.8.0

func WrapGRPCStream[R any, S grpcStream[R]](grpcStream S) Stream[R]

WrapGRPCStream wraps a gRPC result stream with a concurrent-safe dispatch stream. This is necessary because gRPC response streams are not safe for concurrent use. See: https://groups.google.com/g/grpc-io/c/aI6L6M4fzQ0?pli=1

type VTCloneable added in v1.46.0

type VTCloneable[T any] interface {
	CloneVT() T
}

VTCloneable is an interface that requires a CloneVT method.

type WrappedDispatchStream added in v1.8.0

type WrappedDispatchStream[T any] struct {
	Stream    Stream[T]
	Ctx       context.Context
	Processor func(result T) (T, bool, error)
}

WrappedDispatchStream is a dispatch stream that wraps another dispatch stream and runs Processor on each result before forwarding it to the parent stream.

func (*WrappedDispatchStream[T]) Context added in v1.8.0

func (s *WrappedDispatchStream[T]) Context() context.Context

func (*WrappedDispatchStream[T]) Publish added in v1.8.0

func (s *WrappedDispatchStream[T]) Publish(result T) error
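
One plausible reading of the Processor signature is sketched below. The documentation above does not say what the boolean return value means; this sketch assumes it signals whether the processed result should be forwarded, and assumes a nil processor forwards results unchanged. The `wrapped`, `sink`, and `doubleEvens` names are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// stream mirrors the two-method Stream interface documented in this package.
type stream[T any] interface {
	Publish(T) error
	Context() context.Context
}

// sink is a hypothetical parent stream that records what it receives.
type sink[T any] struct{ items []T }

func (s *sink[T]) Publish(v T) error        { s.items = append(s.items, v); return nil }
func (s *sink[T]) Context() context.Context { return context.Background() }

// wrapped mirrors the three exported fields of WrappedDispatchStream.
type wrapped[T any] struct {
	stream    stream[T]
	ctx       context.Context
	processor func(result T) (T, bool, error)
}

func (s *wrapped[T]) Context() context.Context { return s.ctx }

// Publish runs the processor and forwards the processed result only when
// the processor returns true (an assumption of this sketch).
func (s *wrapped[T]) Publish(result T) error {
	if s.processor == nil {
		return s.stream.Publish(result)
	}
	processed, ok, err := s.processor(result)
	if err != nil {
		return err
	}
	if !ok {
		return nil
	}
	return s.stream.Publish(processed)
}

// doubleEvens drops odd values and doubles even ones before forwarding.
func doubleEvens(values []int) ([]int, error) {
	parent := &sink[int]{}
	w := &wrapped[int]{
		stream: parent,
		ctx:    context.Background(),
		processor: func(v int) (int, bool, error) {
			if v%2 != 0 {
				return v, false, nil
			}
			return v * 2, true, nil
		},
	}
	for _, v := range values {
		if err := w.Publish(v); err != nil {
			return nil, err
		}
	}
	return parent.items, nil
}

func main() {
	fmt.Println(doubleEvens([]int{1, 2, 3, 4}))
}
```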

Directories

Path Synopsis
Package combined implements a dispatcher that combines caching, redispatching and optional cluster dispatching.
Code generated by github.com/ecordell/optgen.
Package mocks is a generated GoMock package.
