Documentation
¶
Overview ¶
Package pipeline implements reverse expansion for authorization queries.
Reverse expansion answers the question: "What objects can a user access?" Given a target type/relation and a user, the pipeline traverses the authorization model graph to find all objects where the user has the specified relation.
Architecture ¶
The pipeline constructs a network of workers, one per node in the authorization graph. Workers communicate through message passing, processing tuples and propagating results toward the query origin.
Pipeline.Expand()
│
▼
resolve() ─── builds worker graph from authorization model
│
▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Worker │◄────│ Worker │◄────│ Worker │
│ (root) │ │ │ │ (leaf) │
└─────────┘ └─────────┘ └─────────┘
│
▼
iter.Seq[Object] ─── results streamed to caller
Trade-offs ¶
Message-Passing Concurrency:
- Workers communicate via bounded buffers, avoiding shared memory
- Natural backpressure prevents memory exhaustion
- Graph structure determines parallelism
Streaming Results:
- Memory usage remains constant regardless of result set size
- Enables early termination when caller stops iteration
Cycle Handling:
- Cycles require input deduplication to prevent infinite loops
- Coordinated shutdown ensures all in-flight messages are processed
Usage ¶
p := pipeline.New(graph, reader,
pipeline.WithNumProcs(4),
pipeline.WithChunkSize(100),
)
seq, err := p.Expand(ctx, pipeline.Spec{
ObjectType: "document",
Relation: "viewer",
User: "user:alice",
})
for obj := range seq {
id, err := obj.Object()
if err != nil {
// handle error
}
// id is an object ID like "document:readme"
}
Index ¶
- Variables
- func NewValidator(ctx context.Context, ts *typesystem.TypeSystem, obj *structpb.Struct) validation.Validator[*openfgav1.TupleKey]
- type Config
- type Edge
- type Graph
- type Item
- type Node
- type ObjectQuery
- type ObjectReader
- type Option
- type Pipeline
- type Reader
- type ReaderOption
- type Receiver
- type Spec
- type TupleKeyItemReceiver
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidBufferCapacity = errors.New("buffer capacity must be a positive number") ErrInvalidChunkSize = errors.New("chunk size must be greater than zero") ErrInvalidNumProcs = errors.New("process number must be greater than zero") ErrInvalidObject = errors.New("invalid object") ErrInvalidUser = errors.New("invalid user") )
Functions ¶
func NewValidator ¶
func NewValidator( ctx context.Context, ts *typesystem.TypeSystem, obj *structpb.Struct, ) validation.Validator[*openfgav1.TupleKey]
NewValidator returns a validator that combines condition evaluation and type-system filtering for use with WithReaderValidator.
Types ¶
type Config ¶
Config contains pipeline tuning parameters.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a balanced configuration suitable for most workloads.
type Edge ¶
type Edge = weightedGraph.WeightedAuthorizationModelEdge
Edge is an alias for the weighted authorization model edge type.
type Graph ¶
type Graph = weightedGraph.WeightedAuthorizationModelGraph
Graph is an alias for the weighted authorization model graph type.
type Item ¶
Item is an alias for a single result from a worker, carrying either a value or an error.
type Node ¶
type Node = weightedGraph.WeightedAuthorizationModelNode
Node is an alias for the weighted authorization model node type.
type ObjectQuery ¶
ObjectQuery describes a reverse lookup: find objects of ObjectType where any of Users holds Relation, optionally constrained by Conditions.
type ObjectReader ¶
type ObjectReader interface {
Read(context.Context, ObjectQuery) Receiver[Item]
}
ObjectReader reads relationship tuples from storage.
type Option ¶
type Option func(*Config)
Option configures a Pipeline.
func WithBufferCapacity ¶
WithBufferCapacity sets the capacity of pipes between workers. A value of 0 creates unbuffered channels. Larger buffers reduce blocking but increase memory.
func WithChunkSize ¶
WithChunkSize sets how many items are batched before sending between workers. Larger chunks improve throughput but increase latency to first result.
func WithNumProcs ¶
WithNumProcs sets goroutines per worker for parallel message processing. Higher values improve throughput but increase scheduling overhead.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline holds the dependencies and configuration for reverse expansion queries.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader implements ObjectReader by querying a storage.RelationshipTupleReader and optionally filtering results through a validator.
func NewReader ¶
func NewReader( store storage.RelationshipTupleReader, storeID string, opts ...ReaderOption, ) *Reader
NewReader returns a Reader that queries store for relationship tuples.
type ReaderOption ¶
type ReaderOption func(o *Reader)
ReaderOption configures a Reader.
func WithReaderConsistency ¶
func WithReaderConsistency(pref openfgav1.ConsistencyPreference) ReaderOption
WithReaderConsistency sets the consistency preference for storage reads.
func WithReaderValidator ¶
func WithReaderValidator(fn func(*openfgav1.TupleKey) (bool, error)) ReaderOption
WithReaderValidator sets a function that filters tuples during iteration. Tuples for which fn returns false are silently skipped.
type Receiver ¶ added in v1.14.1
Receiver is a generic streaming result interface used throughout the pipeline to consume values one at a time without buffering the entire result set.
type TupleKeyItemReceiver ¶ added in v1.14.1
type TupleKeyItemReceiver struct {
// contains filtered or unexported fields
}
TupleKeyItemReceiver adapts a storage.TupleKeyIterator into a Receiver of Item values, extracting the object identifier from each tuple key.
func (*TupleKeyItemReceiver) Close ¶ added in v1.14.1
func (r *TupleKeyItemReceiver) Close()
Close stops the underlying iterator. It is safe to call multiple times.
Directories
¶
| Path | Synopsis |
|---|---|
|
internal
|
|
|
track
Package track provides coordination primitives for detecting when a group of concurrent sources have all reached a ready state and the number of in-flight operations has dropped to zero (quiescence).
|
Package track provides coordination primitives for detecting when a group of concurrent sources have all reached a ready state and the number of in-flight operations has dropped to zero (quiescence). |
|
worker
Package worker implements the concurrent processing nodes that form a reverse expansion pipeline.
|
Package worker implements the concurrent processing nodes that form a reverse expansion pipeline. |