Documentation ¶
Overview ¶
Package federation provides merge policy logic for the federation processor. It applies namespace sovereignty, edge union, and provenance tracking to incoming graph events for cross-service federation.
Index ¶
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry RegistryInterface) error
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(_ time.Duration) error
- type Config
- type MergePolicy
- type Merger
- type RegistryInterface
Functions ¶
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a federation processor component from raw JSON config. It follows the semstreams factory pattern: unmarshal → apply defaults → validate.
func Register ¶
func Register(registry RegistryInterface) error
Register registers the federation processor component with the given registry. Returns an error if registry is nil or registration fails.
Types ¶
type Component ¶
type Component struct {
	// contains filtered or unexported fields
}
Component implements the federation processor. It subscribes to incoming EventPayload messages on a JetStream subject, applies federation merge policy via Merger, and republishes the filtered/merged event to the output subject.
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the pre-generated configuration schema.
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current message flow metrics.
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns the current health status of the component.
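func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize performs pre-start initialization.
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns the input port definitions.
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns the output port definitions.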
type Config ¶
type Config struct {
	// LocalNamespace is the org namespace this processor is authoritative for
	// (e.g. "acme"). Entities from other orgs are rejected unless they are
	// in the "public" namespace.
	LocalNamespace string `` /* 151-byte string literal not displayed */
	// MergePolicy controls the entity merge strategy. Valid values: "standard".
	MergePolicy MergePolicy `json:"merge_policy" schema:"type:string,description:Entity merge strategy,category:basic,enum:standard,default:standard"`
	// InputSubject is the JetStream subject this processor consumes events from.
	InputSubject string `` /* 149-byte string literal not displayed */
	// OutputSubject is the JetStream subject merged events are published to.
	OutputSubject string `` /* 148-byte string literal not displayed */
	// InputStream is the JetStream stream name for the input subject.
	InputStream string `json:"input_stream" schema:"type:string,description:Input JetStream stream name,default:FEDERATION_EVENTS,category:basic"`
	// OutputStream is the JetStream stream name for the output subject.
	OutputStream string `json:"output_stream" schema:"type:string,description:Output JetStream stream name,default:FEDERATION_MERGED,category:basic"`
	// Ports is the port configuration for inputs and outputs.
	Ports *component.PortConfig `json:"ports,omitempty" schema:"type:ports,description:Port configuration for inputs and outputs,category:basic"`
}
Config holds configuration for the federation processor.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config with sensible defaults. LocalNamespace is set to "public" as a safe starting point; callers should override it with the actual org namespace.
type MergePolicy ¶
type MergePolicy string
MergePolicy controls how incoming entities are merged against the local graph.
const (
	// MergePolicyStandard applies the standard merge rules:
	//   - public.* merges unconditionally
	//   - {org}.* merges only if org matches LocalNamespace
	//   - cross-org entities are rejected
	//   - edges use union semantics
	//   - provenance is always appended
	MergePolicyStandard MergePolicy = "standard"
)
type Merger ¶
type Merger struct {
	// contains filtered or unexported fields
}
Merger applies federation merge policy to Entities and Events. It is safe for concurrent use because all state is derived from the immutable Config.
func NewMerger ¶
NewMerger creates a Merger from the given Config. Callers should validate the Config before calling NewMerger.
func (*Merger) ApplyEvent ¶
func (m *Merger) ApplyEvent(event *fedtypes.Event, existing map[string]*fedtypes.Entity) (*fedtypes.Event, error)
ApplyEvent applies federation merge policy to an incoming Event.
For SEED and DELTA events, each entity is run through MergeEntity:
- Accepted entities are included in the output event.
- Rejected entities (cross-org) are silently filtered.
For RETRACT events, each entity ID is checked:
- IDs belonging to the local namespace or public namespace pass through.
- Cross-org retraction IDs are filtered (cannot retract another org's entities).
For HEARTBEAT events, the event passes through unchanged.
existing is the caller's current entity store (may be nil). When non-nil, it is used to look up the existing entity for edge-union and provenance-append. The map is treated as read-only: ApplyEvent never modifies it.
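The per-event-type behaviour described above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the package's code: sketchEvent and filterEvent are hypothetical, the namespace is assumed to be the first dot-separated segment of an entity ID, SEED and DELTA entities are only filtered here (the real method also merges them), and returning an error for an unknown event type is an assumption the documentation does not confirm.

```go
package main

import (
	"fmt"
	"strings"
)

// sketchEvent is a hypothetical, reduced stand-in for fedtypes.Event.
type sketchEvent struct {
	Type      string   // "SEED", "DELTA", "RETRACT", or "HEARTBEAT"
	EntityIDs []string // IDs of the entities (or retractions) the event carries
}

// allowed reports whether id belongs to the public or the local namespace.
// Assumption: the namespace is the ID segment before the first dot.
func allowed(localNamespace, id string) bool {
	ns, _, _ := strings.Cut(id, ".")
	return ns == "public" || ns == localNamespace
}

// filterEvent drops cross-org IDs from SEED, DELTA, and RETRACT events and
// passes HEARTBEAT events through unchanged. The input event is not modified.
func filterEvent(localNamespace string, ev sketchEvent) (sketchEvent, error) {
	switch ev.Type {
	case "HEARTBEAT":
		return ev, nil
	case "SEED", "DELTA", "RETRACT":
		out := sketchEvent{Type: ev.Type}
		for _, id := range ev.EntityIDs {
			if allowed(localNamespace, id) {
				out.EntityIDs = append(out.EntityIDs, id)
			}
		}
		return out, nil
	default:
		// Assumed behaviour: the documentation does not describe unknown types.
		return sketchEvent{}, fmt.Errorf("unknown event type %q", ev.Type)
	}
}

func main() {
	ev := sketchEvent{Type: "RETRACT", EntityIDs: []string{"acme.svc.api", "globex.svc.api", "public.lib.json"}}
	out, err := filterEvent("acme", ev)
	fmt.Println(out.EntityIDs, err)
}
```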
func (*Merger) MergeEntity ¶
func (m *Merger) MergeEntity(incoming fedtypes.Entity, existing *fedtypes.Entity) (*fedtypes.Entity, error)
MergeEntity applies federation merge policy to a single incoming entity.
Rules:
- public.* entities are accepted and merged unconditionally
- {org}.* entities are accepted only when org == cfg.LocalNamespace
- All other entities are rejected (cross-org overwrite prevented)
existing may be nil for a brand-new entity. On success, MergeEntity returns the merged entity and a nil error. On rejection, it returns nil and a descriptive error.
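A minimal sketch of these rules, combined with the edge-union and provenance-append semantics of the standard merge policy, might look like the following. The sketchEntity type, its fields, and the mergeSketch function are hypothetical; the real fedtypes.Entity layout is not shown in this documentation, and treating the first dot-separated ID segment as the namespace is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// sketchEntity is a hypothetical stand-in for fedtypes.Entity.
type sketchEntity struct {
	ID         string
	Edges      []string // edge identifiers, merged with union semantics
	Provenance []string // provenance records, always appended
}

// mergeSketch accepts public.* and {localNamespace}.* entities and rejects the rest.
// existing may be nil for a brand-new entity and is never modified.
func mergeSketch(localNamespace string, incoming sketchEntity, existing *sketchEntity) (*sketchEntity, error) {
	// Assumption: the namespace is the ID segment before the first dot.
	ns, _, _ := strings.Cut(incoming.ID, ".")
	if ns != "public" && ns != localNamespace {
		return nil, fmt.Errorf("entity %q rejected: namespace %q is neither public nor %q", incoming.ID, ns, localNamespace)
	}

	var prior sketchEntity
	if existing != nil {
		prior = *existing
	}

	merged := sketchEntity{ID: incoming.ID}

	// Edge union: keep each edge once, existing edges first.
	seen := make(map[string]bool)
	for _, e := range append(append([]string{}, prior.Edges...), incoming.Edges...) {
		if !seen[e] {
			seen[e] = true
			merged.Edges = append(merged.Edges, e)
		}
	}

	// Provenance append: existing history followed by the incoming records.
	merged.Provenance = append(append([]string{}, prior.Provenance...), incoming.Provenance...)
	return &merged, nil
}

func main() {
	existing := &sketchEntity{ID: "acme.svc.api", Edges: []string{"calls:db"}, Provenance: []string{"seed"}}
	incoming := sketchEntity{ID: "acme.svc.api", Edges: []string{"calls:db", "calls:cache"}, Provenance: []string{"delta"}}
	merged, err := mergeSketch("acme", incoming, existing)
	fmt.Println(merged, err)
}
```

Copying the existing slices before appending keeps the caller's entity untouched, which matches the read-only treatment of existing state described for ApplyEvent.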
type RegistryInterface ¶
type RegistryInterface interface {
	RegisterWithConfig(component.RegistrationConfig) error
}
RegistryInterface is the minimal interface required for component registration. The concrete *component.Registry satisfies this interface.
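Because Register depends only on this one-method interface, a test double is easy to write. The sketch below illustrates the idea with local, hypothetical types: registrationConfig stands in for component.RegistrationConfig (whose fields are not shown here), and registerSketch mimics the documented nil check rather than reproducing the real Register.

```go
package main

import (
	"errors"
	"fmt"
)

// registrationConfig is a hypothetical stand-in for component.RegistrationConfig.
type registrationConfig struct {
	Name string
}

// registry mirrors the shape of RegistryInterface using the stand-in type.
type registry interface {
	RegisterWithConfig(registrationConfig) error
}

// recordingRegistry is a test double that records every registered name.
type recordingRegistry struct {
	names []string
}

func (r *recordingRegistry) RegisterWithConfig(cfg registrationConfig) error {
	r.names = append(r.names, cfg.Name)
	return nil
}

// registerSketch returns an error for a nil registry and otherwise wraps
// any error reported by the registry itself.
func registerSketch(reg registry) error {
	if reg == nil {
		return errors.New("registry is nil")
	}
	if err := reg.RegisterWithConfig(registrationConfig{Name: "federation"}); err != nil {
		return fmt.Errorf("register federation processor: %w", err)
	}
	return nil
}

func main() {
	reg := &recordingRegistry{}
	fmt.Println(registerSketch(reg), reg.names)
	fmt.Println(registerSketch(nil))
}
```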