Documentation
¶
Overview ¶
Package pipeline implements reverse expansion for authorization queries.
Reverse expansion answers the question: "What objects can a user access?" Given a target type/relation and a user, the pipeline traverses the authorization model graph to find all objects where the user has the specified relation.
Architecture ¶
The pipeline constructs a network of workers, one per node in the authorization graph. Workers communicate through message passing, processing tuples and propagating results toward the query origin.
Builder.Build(ctx, graph, spec)
│
▼
DFS walk ─── builds worker graph from authorization model
│
▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Worker │◄────│ Worker │◄────│ Worker │
│ (root) │ │ │ │ (leaf) │
└─────────┘ └─────────┘ └─────────┘
│
▼
Pipeline.Recv() ─── results streamed to caller
Trade-offs ¶
Message-Passing Concurrency:
- Workers communicate via bounded buffers, avoiding shared memory
- Natural backpressure prevents memory exhaustion
- Graph structure determines parallelism
Streaming Results:
- Memory usage remains constant regardless of result set size
- Enables early termination when caller calls Close
Cycle Handling:
- Cycles require input deduplication to prevent infinite loops
- Coordinated shutdown ensures all in-flight messages are processed
Usage ¶
builder, err := pipeline.NewBuilder(store,
pipeline.WithNumProcs(4),
pipeline.WithChunkSize(100),
)
if err != nil {
return err
}
p, err := builder.Build(ctx, graph, pipeline.Spec{
ObjectType: "document",
ObjectRelation: "viewer",
SubjectType: "user",
SubjectID: "alice",
})
if err != nil {
return err
}
defer p.Close()
for {
id, ok := p.Recv(ctx)
if !ok {
break
}
// id is an object ID like "document:readme"
}
if err := p.Err(); err != nil {
// handle error (e.g. storage failure, context cancellation)
return err
}
Index ¶
- Variables
- func NewValidator(ctx context.Context, ts *typesystem.TypeSystem, obj *structpb.Struct) validation.Validator[*openfgav1.TupleKey]
- type Builder
- type Config
- type Edge
- type Graph
- type Item
- type Node
- type ObjectQuery
- type ObjectStore
- type Option
- type Pipeline
- type Receiver
- type Spec
- type StoreOption
- type TupleKeyItemReceiver
- type ValidatingStore
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidStore = errors.New("store is nil") ErrInvalidGraph = errors.New("graph is nil") ErrInvalidBufferCapacity = errors.New("buffer capacity must be a positive number") ErrInvalidChunkSize = errors.New("chunk size must be greater than zero") ErrInvalidNumProcs = errors.New("process number must be greater than zero") ErrInvalidObject = errors.New("invalid object") ErrInvalidSubject = errors.New("invalid subject") ErrUnreachable = errors.New("no path exists") )
Functions ¶
func NewValidator ¶
func NewValidator( ctx context.Context, ts *typesystem.TypeSystem, obj *structpb.Struct, ) validation.Validator[*openfgav1.TupleKey]
NewValidator returns a validator that combines condition evaluation and type-system filtering for use with WithStoreValidator.
Types ¶
type Builder ¶ added in v1.15.0
type Builder struct {
// contains filtered or unexported fields
}
Builder holds infrastructure configuration for constructing Pipelines. A single Builder can produce multiple Pipelines concurrently.
func NewBuilder ¶ added in v1.15.0
func NewBuilder(store ObjectStore, options ...Option) (*Builder, error)
NewBuilder returns a Builder that uses store for tuple reads. Options configure tuning parameters (buffer size, chunk size, concurrency).
type Config ¶
Config contains pipeline tuning parameters.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a balanced configuration suitable for most workloads.
type Edge ¶
type Edge = weightedGraph.WeightedAuthorizationModelEdge
Edge is an alias for the weighted authorization model edge type.
type Graph ¶
type Graph = weightedGraph.WeightedAuthorizationModelGraph
Graph is an alias for the weighted authorization model graph type.
type Item ¶
Item is an alias for a single result from a worker, carrying either a value or an error.
type Node ¶
type Node = weightedGraph.WeightedAuthorizationModelNode
Node is an alias for the weighted authorization model node type.
type ObjectQuery ¶
ObjectQuery describes a reverse lookup: find objects of ObjectType where any of Users holds Relation, optionally constrained by Conditions.
type ObjectStore ¶ added in v1.15.0
type ObjectStore interface {
Read(context.Context, ObjectQuery) Receiver[Item]
}
ObjectStore reads relationship tuples from storage.
type Option ¶
type Option func(*Config)
Option configures a Builder.
func WithBufferCapacity ¶
WithBufferCapacity sets the capacity of pipes between workers. A value of 0 creates unbuffered channels. Larger buffers reduce blocking but increase memory.
func WithChunkSize ¶
WithChunkSize sets how many items are batched before sending between workers. Larger chunks improve throughput but increase latency to first result.
func WithNumProcs ¶
WithNumProcs sets goroutines per worker for parallel message processing. Higher values improve throughput but increase scheduling overhead.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline holds the state for a single reverse expansion query.
func (*Pipeline) Close ¶ added in v1.15.0
func (p *Pipeline) Close()
Close cancels the pipeline context, drains any remaining output messages, and waits for all workers to finish. It is nil-safe and idempotent: subsequent calls after the first are no-ops. After workers complete, it closes and drains the error accumulator, storing the first non-context-cancellation error for retrieval via Err. Close must be called to avoid leaking goroutines.
func (*Pipeline) Err ¶ added in v1.15.0
Err returns the first error encountered during pipeline execution, or nil if the pipeline completed successfully. It should be called after Recv returns ("", false).
func (*Pipeline) Recv ¶ added in v1.15.0
Recv returns the next result from the pipeline. It is nil-safe and returns ("", false) immediately if the pipeline is nil or has already encountered an error. Otherwise it blocks until a value is available, draining any buffered values before checking for new errors or output. When an error is received from a worker, it is stored and the pipeline is automatically closed. When the output stream is exhausted without context cancellation, the pipeline is also closed. On context cancellation, Recv returns ("", false) without closing, leaving that responsibility to the caller. After Recv returns ("", false), call Err to distinguish between clean exhaustion and failure.
type Receiver ¶ added in v1.14.1
Receiver is a generic streaming result interface used throughout the pipeline to consume values one at a time without buffering the entire result set.
type StoreOption ¶ added in v1.15.0
type StoreOption func(o *ValidatingStore)
StoreOption configures a ValidatingStore.
func WithStoreConsistency ¶ added in v1.15.0
func WithStoreConsistency(pref openfgav1.ConsistencyPreference) StoreOption
WithStoreConsistency sets the consistency preference for storage reads.
func WithStoreValidator ¶ added in v1.15.0
func WithStoreValidator(fn func(*openfgav1.TupleKey) (bool, error)) StoreOption
WithStoreValidator sets a function that filters tuples during iteration. Tuples for which fn returns false are silently skipped.
type TupleKeyItemReceiver ¶ added in v1.14.1
type TupleKeyItemReceiver struct {
// contains filtered or unexported fields
}
TupleKeyItemReceiver adapts a storage.TupleKeyIterator into a Receiver of Item values, extracting the object identifier from each tuple key.
func (*TupleKeyItemReceiver) Close ¶ added in v1.14.1
func (r *TupleKeyItemReceiver) Close()
Close stops the underlying iterator. It is safe to call multiple times.
type ValidatingStore ¶ added in v1.15.0
type ValidatingStore struct {
// contains filtered or unexported fields
}
ValidatingStore implements ObjectStore by querying a storage.RelationshipTupleReader and optionally filtering results through a validator.
func NewValidatingStore ¶ added in v1.15.0
func NewValidatingStore( store storage.RelationshipTupleReader, storeID string, opts ...StoreOption, ) *ValidatingStore
NewValidatingStore returns a ValidatingStore that queries store for relationship tuples.
func (*ValidatingStore) Read ¶ added in v1.15.0
func (r *ValidatingStore) Read( ctx context.Context, q ObjectQuery, ) Receiver[Item]
Read queries storage for tuples matching q and returns the matching object identifiers as a streaming sequence.
Directories
¶
| Path | Synopsis |
|---|---|
|
internal
|
|
|
track
Package track provides coordination primitives for detecting when a group of concurrent sources have all reached a ready state and the number of in-flight operations has dropped to zero (quiescence).
|
Package track provides coordination primitives for detecting when a group of concurrent sources have all reached a ready state and the number of in-flight operations has dropped to zero (quiescence). |
|
worker
Package worker implements the concurrent processing nodes that form a reverse expansion pipeline.
|
Package worker implements the concurrent processing nodes that form a reverse expansion pipeline. |