pipeline

package
v1.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package pipeline implements reverse expansion for authorization queries.

Reverse expansion answers the question: "What objects can a user access?" Given a target type/relation and a user, the pipeline traverses the authorization model graph to find all objects where the user has the specified relation.

Architecture

The pipeline constructs a network of workers, one per node in the authorization graph. Workers communicate through message passing, processing tuples and propagating results toward the query origin.

Pipeline.Expand()
       │
       ▼
  resolve() ─── builds worker graph from authorization model
       │
       ▼
  ┌─────────┐     ┌─────────┐     ┌─────────┐
  │ Worker  │◄────│ Worker  │◄────│ Worker  │
  │ (root)  │     │         │     │ (leaf)  │
  └─────────┘     └─────────┘     └─────────┘
       │
       ▼
  iter.Seq[Object] ─── results streamed to caller

Trade-offs

Message-Passing Concurrency:

  • Workers communicate via bounded buffers, avoiding shared memory
  • Natural backpressure prevents memory exhaustion
  • Graph structure determines parallelism

Streaming Results:

  • Memory usage remains constant regardless of result set size
  • Enables early termination when caller stops iteration

Cycle Handling:

  • Cycles require input deduplication to prevent infinite loops
  • Coordinated shutdown ensures all in-flight messages are processed

Usage

p := pipeline.New(graph, reader,
    pipeline.WithNumProcs(4),
    pipeline.WithChunkSize(100),
)

seq, err := p.Expand(ctx, pipeline.Spec{
    ObjectType: "document",
    Relation:   "viewer",
    User:       "user:alice",
})

for obj := range seq {
    id, err := obj.Object()
    if err != nil {
        // handle error
    }
    // id is an object ID like "document:readme"
}

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidBufferCapacity = errors.New("buffer capacity must be a positive number")
	ErrInvalidChunkSize      = errors.New("chunk size must be greater than zero")
	ErrInvalidNumProcs       = errors.New("process number must be greater than zero")
	ErrInvalidObject         = errors.New("invalid object")
	ErrInvalidUser           = errors.New("invalid user")
)

Functions

func NewValidator

NewValidator returns a validator that combines condition evaluation and type-system filtering for use with WithReaderValidator.

Types

type Config

type Config struct {
	BufferCapacity int
	ChunkSize      int
	NumProcs       int
}

Config contains pipeline tuning parameters.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a balanced configuration suitable for most workloads.

func (*Config) Validate

func (config *Config) Validate() error

Validate returns an error if any configuration value is out of range.

type Edge

Edge is an alias for the weighted authorization model edge type.

type Graph

Graph is an alias for the weighted authorization model graph type.

type Item

type Item = worker.Item

Item is an alias for a single result from a worker, carrying either a value or an error.

type Node

Node is an alias for the weighted authorization model node type.

type ObjectQuery

type ObjectQuery struct {
	ObjectType string
	Relation   string
	Users      []string
	Conditions []string
}

ObjectQuery describes a reverse lookup: find objects of ObjectType where any of Users holds Relation, optionally constrained by Conditions.

type ObjectReader

type ObjectReader interface {
	Read(context.Context, ObjectQuery) iter.Seq[Item]
}

ObjectReader reads relationship tuples from storage.

type Option

type Option func(*Config)

Option configures a Pipeline.

func WithBufferCapacity

func WithBufferCapacity(size int) Option

WithBufferCapacity sets the capacity of pipes between workers. A value of 0 creates unbuffered channels. Larger buffers reduce blocking but increase memory.

func WithChunkSize

func WithChunkSize(size int) Option

WithChunkSize sets how many items are batched before sending between workers. Larger chunks improve throughput but increase latency to first result.

func WithConfig

func WithConfig(c Config) Option

WithConfig replaces the entire configuration.

func WithNumProcs

func WithNumProcs(num int) Option

WithNumProcs sets goroutines per worker for parallel message processing. Higher values improve throughput but increase scheduling overhead.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline holds the dependencies and configuration for reverse expansion queries.

func New

func New(graph *Graph, reader ObjectReader, options ...Option) *Pipeline

New creates a Pipeline with the given graph and reader.

func (*Pipeline) Expand

func (pl *Pipeline) Expand(ctx context.Context, spec Spec) (iter.Seq[Item], error)

Expand returns a streaming iterator of objects accessible to the user specified in spec. Iteration can be stopped early; the pipeline will clean up resources automatically.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader implements ObjectReader by querying a storage.RelationshipTupleReader and optionally filtering results through a validator.

func NewReader

func NewReader(
	store storage.RelationshipTupleReader,
	storeID string,
	opts ...ReaderOption,
) *Reader

NewReader returns a Reader that queries store for relationship tuples.

func (*Reader) Read

func (r *Reader) Read(
	ctx context.Context,
	q ObjectQuery,
) iter.Seq[Item]

Read queries storage for tuples matching q and returns the matching object identifiers as a streaming sequence.

type ReaderOption

type ReaderOption func(o *Reader)

ReaderOption configures a Reader.

func WithReaderConsistency

func WithReaderConsistency(pref openfgav1.ConsistencyPreference) ReaderOption

WithReaderConsistency sets the consistency preference for storage reads.

func WithReaderValidator

func WithReaderValidator(fn func(*openfgav1.TupleKey) (bool, error)) ReaderOption

WithReaderValidator sets a function that filters tuples during iteration. Tuples for which fn returns false are silently skipped.

type Spec

type Spec struct {
	ObjectType string
	Relation   string
	User       string
}

Spec identifies the target of a reverse expansion query.

Directories

Path Synopsis
internal
track
Package track provides coordination primitives for detecting when a group of concurrent sources have all reached a ready state and the number of in-flight operations has dropped to zero (quiescence).
Package track provides coordination primitives for detecting when a group of concurrent sources have all reached a ready state and the number of in-flight operations has dropped to zero (quiescence).
worker
Package worker implements the concurrent processing nodes that form a reverse expansion pipeline.
Package worker implements the concurrent processing nodes that form a reverse expansion pipeline.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL