pipeline

package
v1.15.0
Published: Apr 27, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Package pipeline implements reverse expansion for authorization queries.

Reverse expansion answers the question: "What objects can a user access?" Given a target type/relation and a user, the pipeline traverses the authorization model graph to find all objects where the user has the specified relation.

Architecture

The pipeline constructs a network of workers, one per node in the authorization graph. Workers communicate through message passing, processing tuples and propagating results toward the query origin.

Builder.Build(ctx, graph, spec)
       │
       ▼
  DFS walk ─── builds worker graph from authorization model
       │
       ▼
  ┌─────────┐     ┌─────────┐     ┌─────────┐
  │ Worker  │◄────│ Worker  │◄────│ Worker  │
  │ (root)  │     │         │     │ (leaf)  │
  └─────────┘     └─────────┘     └─────────┘
       │
       ▼
  Pipeline.Recv() ─── results streamed to caller

Trade-offs

Message-Passing Concurrency:

  • Workers communicate via bounded buffers, avoiding shared memory
  • Natural backpressure prevents memory exhaustion
  • Graph structure determines parallelism

Streaming Results:

  • Memory usage remains constant regardless of result set size
  • Enables early termination when caller calls Close

Cycle Handling:

  • Cycles require input deduplication to prevent infinite loops
  • Coordinated shutdown ensures all in-flight messages are processed

Usage

builder, err := pipeline.NewBuilder(store,
    pipeline.WithNumProcs(4),
    pipeline.WithChunkSize(100),
)
if err != nil {
    return err
}

p, err := builder.Build(ctx, graph, pipeline.Spec{
    ObjectType:     "document",
    ObjectRelation: "viewer",
    SubjectType:    "user",
    SubjectID:      "alice",
})
if err != nil {
    return err
}
defer p.Close()

for {
    id, ok := p.Recv(ctx)
    if !ok {
        break
    }
    _ = id // id is an object ID like "document:readme"
}

if err := p.Err(); err != nil {
    // handle error (e.g. storage failure, context cancellation)
    return err
}

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidStore          = errors.New("store is nil")
	ErrInvalidGraph          = errors.New("graph is nil")
	ErrInvalidBufferCapacity = errors.New("buffer capacity must be a positive number")
	ErrInvalidChunkSize      = errors.New("chunk size must be greater than zero")
	ErrInvalidNumProcs       = errors.New("process number must be greater than zero")
	ErrInvalidObject         = errors.New("invalid object")
	ErrInvalidSubject        = errors.New("invalid subject")
	ErrUnreachable           = errors.New("no path exists")
)

Functions

func NewValidator

NewValidator returns a validator that combines condition evaluation and type-system filtering for use with WithStoreValidator.

Types

type Builder added in v1.15.0

type Builder struct {
	// contains filtered or unexported fields
}

Builder holds infrastructure configuration for constructing Pipelines. A single Builder can produce multiple Pipelines concurrently.

func NewBuilder added in v1.15.0

func NewBuilder(store ObjectStore, options ...Option) (*Builder, error)

NewBuilder returns a Builder that uses store for tuple reads. Options configure tuning parameters (buffer size, chunk size, concurrency).

func (*Builder) Build added in v1.15.0

func (b *Builder) Build(
	ctx context.Context,
	graph *Graph,
	spec Spec,
) (*Pipeline, error)

Build constructs a Pipeline for the given graph and spec, starts all workers, injects the subject identifier, and closes the input stream. The caller must call Close on the returned Pipeline to avoid leaking goroutines.

type Config

type Config struct {
	BufferCapacity int
	ChunkSize      int
	NumProcs       int
}

Config contains pipeline tuning parameters.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a balanced configuration suitable for most workloads.

func (*Config) Validate

func (config *Config) Validate() error

Validate returns an error if any configuration value is out of range.

type Edge

Edge is an alias for the weighted authorization model edge type.

type Graph

Graph is an alias for the weighted authorization model graph type.

type Item

type Item = worker.Item

Item is an alias for a single result from a worker, carrying either a value or an error.

type Node

Node is an alias for the weighted authorization model node type.

type ObjectQuery

type ObjectQuery struct {
	ObjectType string
	Relation   string
	Users      []string
	Conditions []string
}

ObjectQuery describes a reverse lookup: find objects of ObjectType where any of Users holds Relation, optionally constrained by Conditions.

type ObjectStore added in v1.15.0

type ObjectStore interface {
	Read(context.Context, ObjectQuery) Receiver[Item]
}

ObjectStore reads relationship tuples from storage.

type Option

type Option func(*Config)

Option configures a Builder.

func WithBufferCapacity

func WithBufferCapacity(size int) Option

WithBufferCapacity sets the capacity of pipes between workers. A value of 0 creates unbuffered channels. Larger buffers reduce blocking but increase memory.

func WithChunkSize

func WithChunkSize(size int) Option

WithChunkSize sets how many items are batched before sending between workers. Larger chunks improve throughput but increase latency to first result.

func WithConfig

func WithConfig(c Config) Option

WithConfig replaces the entire configuration.

func WithNumProcs

func WithNumProcs(num int) Option

WithNumProcs sets goroutines per worker for parallel message processing. Higher values improve throughput but increase scheduling overhead.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline holds the state for a single reverse expansion query.

func (*Pipeline) Close added in v1.15.0

func (p *Pipeline) Close()

Close cancels the pipeline context, drains any remaining output messages, and waits for all workers to finish. It is nil-safe and idempotent: subsequent calls after the first are no-ops. After workers complete, it closes and drains the error accumulator, storing the first non-context-cancellation error for retrieval via Err. Close must be called to avoid leaking goroutines.

func (*Pipeline) Err added in v1.15.0

func (p *Pipeline) Err() error

Err returns the first error encountered during pipeline execution, or nil if the pipeline completed successfully. It should be called after Recv returns ("", false).

func (*Pipeline) Recv added in v1.15.0

func (p *Pipeline) Recv(ctx context.Context) (string, bool)

Recv returns the next result from the pipeline. It is nil-safe and returns ("", false) immediately if the pipeline is nil or has already encountered an error. Otherwise it blocks until a value is available, draining any buffered values before checking for new errors or output. When an error is received from a worker, it is stored and the pipeline is automatically closed. When the output stream is exhausted without context cancellation, the pipeline is also closed. On context cancellation, Recv returns ("", false) without closing, leaving that responsibility to the caller. After Recv returns ("", false), call Err to distinguish between clean exhaustion and failure.

type Receiver added in v1.14.1

type Receiver[T any] = worker.Receiver[T]

Receiver is a generic streaming result interface used throughout the pipeline to consume values one at a time without buffering the entire result set.

type Spec

type Spec struct {
	ObjectType     string
	ObjectRelation string
	SubjectType    string
	SubjectID      string
}

Spec identifies the target of a reverse expansion query.

type StoreOption added in v1.15.0

type StoreOption func(o *ValidatingStore)

StoreOption configures a ValidatingStore.

func WithStoreConsistency added in v1.15.0

func WithStoreConsistency(pref openfgav1.ConsistencyPreference) StoreOption

WithStoreConsistency sets the consistency preference for storage reads.

func WithStoreValidator added in v1.15.0

func WithStoreValidator(fn func(*openfgav1.TupleKey) (bool, error)) StoreOption

WithStoreValidator sets a function that filters tuples during iteration. Tuples for which fn returns false are silently skipped.

type TupleKeyItemReceiver added in v1.14.1

type TupleKeyItemReceiver struct {
	// contains filtered or unexported fields
}

TupleKeyItemReceiver adapts a storage.TupleKeyIterator into a Receiver of Item values, extracting the object identifier from each tuple key.

func (*TupleKeyItemReceiver) Close added in v1.14.1

func (r *TupleKeyItemReceiver) Close()

Close stops the underlying iterator. It is safe to call multiple times.

func (*TupleKeyItemReceiver) Recv added in v1.14.1

func (r *TupleKeyItemReceiver) Recv(ctx context.Context) (Item, bool)

Recv returns the next object identifier from the underlying iterator. It returns false when the iterator is exhausted, closed, or the context is cancelled.

type ValidatingStore added in v1.15.0

type ValidatingStore struct {
	// contains filtered or unexported fields
}

ValidatingStore implements ObjectStore by querying a storage.RelationshipTupleReader and optionally filtering results through a validator.

func NewValidatingStore added in v1.15.0

func NewValidatingStore(
	store storage.RelationshipTupleReader,
	storeID string,
	opts ...StoreOption,
) *ValidatingStore

NewValidatingStore returns a ValidatingStore that queries store for relationship tuples.

func (*ValidatingStore) Read added in v1.15.0

Read queries storage for tuples matching q and returns the matching object identifiers as a streaming sequence.

Directories

Path Synopsis
internal
track
Package track provides coordination primitives for detecting when a group of concurrent sources have all reached a ready state and the number of in-flight operations has dropped to zero (quiescence).
worker
Package worker implements the concurrent processing nodes that form a reverse expansion pipeline.
