federation

package
v1.0.0-alpha.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package federation provides merge policy logic for the federation processor. It applies namespace sovereignty, edge union, and provenance tracking to incoming graph events for cross-service federation.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewComponent

func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewComponent creates a federation processor component from raw JSON config. Follows the semstreams factory pattern: unmarshal → apply defaults → validate.

func Register

func Register(registry RegistryInterface) error

Register registers the federation processor component with the given registry. Returns an error if registry is nil or registration fails.

Types

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the federation processor. It subscribes to incoming EventPayload messages on a JetStream subject, applies federation merge policy via Merger, and republishes the filtered/merged event to the output subject.

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the pre-generated configuration schema.

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current message flow metrics.

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns the current health status of the component.

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize performs pre-start initialization.

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns the input port definitions.

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata.

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns the output port definitions.

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start subscribes to the input JetStream subject and begins processing EventPayload messages in the background.

func (*Component) Stop

func (c *Component) Stop(_ time.Duration) error

Stop cancels the background goroutine and marks the component as stopped.

type Config

type Config struct {
	// LocalNamespace is the org namespace this processor is authoritative for
	// (e.g. "acme"). Entities from other orgs are rejected unless they are
	// in the "public" namespace.
	LocalNamespace string `` /* 151-byte string literal not displayed */

	// MergePolicy controls the entity merge strategy. Valid values: "standard".
	MergePolicy MergePolicy `json:"merge_policy" schema:"type:string,description:Entity merge strategy,category:basic,enum:standard,default:standard"`

	// InputSubject is the JetStream subject this processor consumes events from.
	InputSubject string `` /* 149-byte string literal not displayed */

	// OutputSubject is the JetStream subject merged events are published to.
	OutputSubject string `` /* 148-byte string literal not displayed */

	// InputStream is the JetStream stream name for the input subject.
	InputStream string `json:"input_stream" schema:"type:string,description:Input JetStream stream name,default:FEDERATION_EVENTS,category:basic"`

	// OutputStream is the JetStream stream name for the output subject.
	OutputStream string `json:"output_stream" schema:"type:string,description:Output JetStream stream name,default:FEDERATION_MERGED,category:basic"`

	// Ports is the port configuration for inputs and outputs.
	Ports *component.PortConfig `json:"ports,omitempty" schema:"type:ports,description:Port configuration for inputs and outputs,category:basic"`
}

Config holds configuration for the federation processor.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with sensible defaults. LocalNamespace is set to "public" as a safe starting point; callers should override it with the actual org namespace.

func (Config) Validate

func (c Config) Validate() error

Validate checks that Config contains all required and valid field values.

type MergePolicy

type MergePolicy string

MergePolicy controls how incoming entities are merged against the local graph.

const (
	// MergePolicyStandard applies the standard merge rules:
	//   - public.* merges unconditionally
	//   - {org}.* merges only if org matches LocalNamespace
	//   - cross-org entities are rejected
	//   - edges use union semantics
	//   - provenance is always appended
	MergePolicyStandard MergePolicy = "standard"
)

type Merger

type Merger struct {
	// contains filtered or unexported fields
}

Merger applies federation merge policy to Entities and Events. It is safe for concurrent use — all state is derived from the immutable Config.

func NewMerger

func NewMerger(cfg Config) *Merger

NewMerger creates a Merger from the given Config. Callers should validate the Config before calling NewMerger.

func (*Merger) ApplyEvent

func (m *Merger) ApplyEvent(event *fedtypes.Event, existing map[string]*fedtypes.Entity) (*fedtypes.Event, error)

ApplyEvent applies federation merge policy to an incoming Event.

For SEED and DELTA events, each entity is run through MergeEntity:

  • Accepted entities are included in the output event.
  • Rejected entities (cross-org) are silently filtered.

For RETRACT events, each entity ID is checked:

  • IDs belonging to the local namespace or public namespace pass through.
  • Cross-org retraction IDs are filtered (cannot retract another org's entities).

For HEARTBEAT events, the event passes through unchanged.

existing is the caller's current entity store (may be nil). When non-nil, it is used to look up the existing entity for edge-union and provenance-append. The map is read-only — ApplyEvent never modifies it.

func (*Merger) MergeEntity

func (m *Merger) MergeEntity(incoming fedtypes.Entity, existing *fedtypes.Entity) (*fedtypes.Entity, error)

MergeEntity applies federation merge policy to a single incoming entity.

Rules:

  • public.* entities are accepted and merged unconditionally
  • {org}.* entities are accepted only when org == cfg.LocalNamespace
  • All other entities are rejected (cross-org overwrite prevented)

existing may be nil for a brand-new entity. On success returns the merged entity and nil error. On rejection returns nil and a descriptive error.

type RegistryInterface

type RegistryInterface interface {
	RegisterWithConfig(component.RegistrationConfig) error
}

RegistryInterface is the minimal interface required for component registration. The concrete *component.Registry satisfies this interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL