directorybridge

package
v1.0.0-beta.76 Latest
Published: May 17, 2026 License: MIT Imports: 27 Imported by: 0

README

Directory Bridge Output Component

An output component that registers SemStreams agents with AGNTCY directories using OASF (Open Agentic Schema Framework) records.

Overview

The directory-bridge component enables SemStreams agents to participate in the Internet of Agents ecosystem by automatically registering them with AGNTCY directory services. It watches for OASF records generated by the oasf-generator processor and maintains persistent registrations through periodic heartbeats and DID-based identity verification.

This component implements Phase 2 of the AGNTCY integration strategy, providing federated discovery capabilities so external systems can find and invoke SemStreams agents through standard directories.

Architecture

flowchart TD
    subgraph "NATS Infrastructure"
        OASF[OASF_RECORDS KV]
        ENTITY[ENTITY_STATES KV]
    end

    subgraph "SemStreams Components"
        OASFGEN[OASF Generator<br/>processor]
        BRIDGE[Directory Bridge<br/>output]
        IDENTITY[Identity Provider]
    end

    subgraph "External Services"
        DIRECTORY[AGNTCY Directory<br/>API]
    end

    ENTITY -->|watch changes| OASFGEN
    OASFGEN -->|store records| OASF
    OASF -->|watch updates| BRIDGE
    IDENTITY -->|create/resolve DID| BRIDGE
    BRIDGE -->|register| DIRECTORY
    BRIDGE -->|heartbeat| DIRECTORY
    BRIDGE -->|deregister| DIRECTORY

    style BRIDGE fill:#4a90e2,stroke:#2e5c8a,color:#fff
    style DIRECTORY fill:#50c878,stroke:#2e7d4e,color:#fff
    style IDENTITY fill:#f5a623,stroke:#c47e1a,color:#fff

Features

  • Automatic Registration: Watches OASF KV bucket and registers agents as records are created/updated
  • DID Identity: Creates decentralized identifiers (DIDs) for agents using pluggable identity providers
  • Heartbeat Management: Maintains registrations with periodic heartbeats before TTL expiration
  • Graceful Deregistration: Removes all agent registrations on component shutdown
  • Retry Logic: Configurable retry behavior for failed registration attempts
  • Multiple Providers: Supports local (did:key) and AGNTCY identity providers

Configuration

Basic Configuration
components:
  - name: dir-bridge
    type: directory-bridge
    config:
      directory_url: "https://directory.agntcy.dev"
      heartbeat_interval: "30s"
      registration_ttl: "5m"
      identity_provider: "local"
      oasf_kv_bucket: "OASF_RECORDS"
Advanced Configuration
components:
  - name: dir-bridge
    type: directory-bridge
    config:
      directory_url: "https://directory.example.com"
      heartbeat_interval: "30s"
      registration_ttl: "5m"
      identity_provider: "local"
      oasf_kv_bucket: "OASF_RECORDS"
      retry_count: 3
      retry_delay: "1s"

      ports:
        inputs:
          - name: oasf_records
            subject: "oasf.record.generated.>"
            type: kv-watch
            required: true
            description: "Watch for generated OASF records"

        outputs:
          - name: registration_events
            subject: "directory.registration.*"
            type: jetstream
            required: false
            description: "Emit registration events"

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| directory_url | string | - | AGNTCY directory service base URL |
| heartbeat_interval | duration | 30s | How often to send heartbeats |
| registration_ttl | duration | 5m | Registration time-to-live |
| identity_provider | string | local | Identity provider type (local, agntcy) |
| oasf_kv_bucket | string | OASF_RECORDS | KV bucket to watch for OASF records |
| retry_count | int | 3 | Number of retries for failed operations |
| retry_delay | duration | 1s | Initial delay between retries |
| ports | object | (default) | Input/output port configuration |

NATS Topology

Input Sources

| Source | Type | Purpose |
|---|---|---|
| OASF_RECORDS | KV Watch | Watches for agent OASF records from oasf-generator |

Output Destinations

| Destination | Type | Purpose |
|---|---|---|
| directory.registration.* | JetStream | Optional registration event notifications |

Data Flow
sequenceDiagram
    participant OG as OASF Generator
    participant KV as OASF_RECORDS KV
    participant DB as Directory Bridge
    participant IP as Identity Provider
    participant DIR as AGNTCY Directory

    OG->>KV: Put OASF record
    KV-->>DB: KV watch notification
    DB->>DB: Parse OASF record
    DB->>IP: Create/resolve DID
    IP-->>DB: Return DID
    DB->>DIR: POST /v1/agents (register)
    DIR-->>DB: Registration ID + expiry
    DB->>DB: Store registration

    loop Every heartbeat_interval
        DB->>DB: Check expiration
        DB->>DIR: POST /v1/agents/{id}/heartbeat
        DIR-->>DB: Updated expiry
    end

    DB->>DIR: DELETE /v1/agents/{id}
    DIR-->>DB: Deregistration confirmed

Registration Lifecycle

1. Detection Phase

The component watches the OASF_RECORDS KV bucket for changes:

watcher, err := kv.Watch(ctx, ">", jetstream.IgnoreDeletes())

When an OASF record is created or updated, the watcher receives a notification.

2. Identity Resolution Phase

For each agent being registered, the component creates or resolves a DID identity:

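// Use a variable name that does not shadow the identity package.
agentIdentity, err := identityProvider.CreateIdentity(ctx, identity.CreateIdentityOptions{
    DisplayName: record.Name,
})
if err != nil {
    return fmt.Errorf("create identity: %w", err)
}
agentDID := agentIdentity.DIDString()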

Supported identity providers:

  • local: Creates did:key identities using Ed25519 key pairs
  • agntcy: Resolves identities through AGNTCY identity service (future)
3. Registration Phase

The component sends a registration request to the AGNTCY directory:

POST /v1/agents
{
  "agent_did": "did:key:z6Mk...",
  "oasf_record": { ... },
  "ttl": 300,
  "metadata": {
    "semstreams_entity_id": "acme.ops.agentic.system.agent.001",
    "source": "semstreams"
  }
}

Response contains registration ID and expiration time:

{
  "success": true,
  "registration_id": "reg_abc123",
  "expires_at": "2026-02-13T15:30:00Z"
}
4. Heartbeat Phase

A background goroutine periodically sends heartbeats to maintain registrations before they expire:

// The heartbeat loop runs at the configured interval.
ticker := time.NewTicker(heartbeatInterval)
defer ticker.Stop()

for range ticker.C {
    // Only send a heartbeat once expiry is approaching.
    if time.Until(registration.ExpiresAt) >= heartbeatInterval*2 {
        continue
    }
    _, err := client.Heartbeat(ctx, &HeartbeatRequest{
        RegistrationID: registration.RegistrationID,
        AgentDID:       registration.AgentDID,
    })
    if err != nil {
        logger.Warn("heartbeat failed", "error", err)
    }
}
5. Deregistration Phase

On component shutdown, all active registrations are removed:

for _, registration := range registrations {
    client.Deregister(ctx, &DeregistrationRequest{
        RegistrationID: registration.RegistrationID,
        AgentDID:       registration.AgentDID,
    })
}

Usage Example

Complete Integration Example
# flow.yaml
components:
  # Generate OASF records from agent entities
  - name: oasf-gen
    type: oasf-generator
    config:
      entity_kv_bucket: ENTITY_STATES
      oasf_kv_bucket: OASF_RECORDS
      watch_pattern: "*.agent.*"

  # Register agents with AGNTCY directory
  - name: dir-bridge
    type: directory-bridge
    config:
      directory_url: "https://directory.agntcy.dev"
      heartbeat_interval: "30s"
      registration_ttl: "5m"
      identity_provider: "local"
      oasf_kv_bucket: "OASF_RECORDS"
Programmatic Usage
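import (
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/c360studio/semstreams/component"
    directorybridge "github.com/c360studio/semstreams/output/directory-bridge"
)

// Register the component with the registry.
func init() {
    if err := directorybridge.Register(registry); err != nil {
        log.Fatalf("register directory-bridge: %v", err)
    }
}

// Create a component instance.
config := directorybridge.DefaultConfig()
config.DirectoryURL = "https://directory.example.com"
config.HeartbeatInterval = "30s"

rawConfig, err := json.Marshal(config)
if err != nil {
    return err
}
discoverable, err := directorybridge.NewComponent(rawConfig, component.Dependencies{
    NATSClient: natsClient,
    Logger:     logger,
})
if err != nil {
    return err
}

// NewComponent returns component.Discoverable. The lifecycle methods and
// GetRegistrations are defined on *directorybridge.Component, so assert to it
// (this assumes the concrete type is *Component).
comp, ok := discoverable.(*directorybridge.Component)
if !ok {
    return fmt.Errorf("unexpected component type %T", discoverable)
}

// Initialize and start.
if err := comp.Initialize(); err != nil {
    return err
}
if err := comp.Start(ctx); err != nil {
    return err
}
defer comp.Stop(5 * time.Second)

// Check active registrations.
for _, reg := range comp.GetRegistrations() {
    fmt.Printf("Agent %s registered as %s\n", reg.EntityID, reg.RegistrationID)
}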
Querying Registration Status
// Find a specific registration by entity ID. In the method index on this
// page, Component exposes GetRegistrations only; GetRegistration is a
// RegistrationManager method.
registrations := comp.GetRegistrations()
for _, registration := range registrations {
    if registration.EntityID != "acme.ops.agentic.system.agent.001" {
        continue
    }
    fmt.Printf("DID: %s\n", registration.AgentDID)
    fmt.Printf("Expires: %s\n", registration.ExpiresAt)
    fmt.Printf("Last Heartbeat: %s\n", registration.LastHeartbeat)
}

// Count all registrations
fmt.Printf("Active registrations: %d\n", len(registrations))

API Reference

DirectoryClient

HTTP client for AGNTCY directory API operations.

type DirectoryClient struct {
    baseURL    string
    httpClient *http.Client
}

// Register registers an agent with the directory
func (c *DirectoryClient) Register(ctx context.Context, req *RegistrationRequest) (*RegistrationResponse, error)

// Heartbeat sends a heartbeat to renew a registration
func (c *DirectoryClient) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error)

// Deregister removes an agent from the directory
func (c *DirectoryClient) Deregister(ctx context.Context, req *DeregistrationRequest) error

// Discover searches the directory for agents
func (c *DirectoryClient) Discover(ctx context.Context, query *DiscoveryQuery) (*DiscoveryResponse, error)
RegistrationManager

Manages the lifecycle of agent registrations including heartbeats.

type RegistrationManager struct {
    client           *DirectoryClient
    identityProvider identity.Provider
    config           Config
    logger           *slog.Logger
}

// RegisterAgent registers an agent with the directory
func (rm *RegistrationManager) RegisterAgent(ctx context.Context, entityID string,
    record *oasfgenerator.OASFRecord, agentIdentity *identity.AgentIdentity) error

// UpdateRegistration updates an existing registration with new OASF data
func (rm *RegistrationManager) UpdateRegistration(ctx context.Context, entityID string,
    record *oasfgenerator.OASFRecord) error

// Deregister removes an agent from the directory
func (rm *RegistrationManager) Deregister(ctx context.Context, entityID string) error

// GetRegistration returns the registration for an entity
func (rm *RegistrationManager) GetRegistration(entityID string) *Registration

// ListRegistrations returns all active registrations
func (rm *RegistrationManager) ListRegistrations() []*Registration
Registration

Represents an active directory registration.

type Registration struct {
    EntityID       string                      // SemStreams entity ID
    RegistrationID string                      // Directory registration ID
    AgentDID       string                      // Agent's DID
    OASFRecord     *oasfgenerator.OASFRecord  // Agent's OASF specification
    RegisteredAt   time.Time                   // Registration creation time
    ExpiresAt      time.Time                   // Registration expiration time
    LastHeartbeat  time.Time                   // Last heartbeat timestamp
    Retries        int                         // Number of registration retries
}

Testing

Unit Tests
# Run unit tests
cd output/directory-bridge
go test -v

# Run with race detector
go test -race -v

# Run specific test
go test -v -run TestComponent_Initialize
Integration Tests

Integration tests require NATS infrastructure via testcontainers:

# Run integration tests
go test -tags=integration -v

# Test directory registration flow
go test -tags=integration -v -run TestComponent_Registration
Mock Directory Server

For testing without external dependencies, use the mock directory implementation:

import "github.com/c360studio/semstreams/output/directory-bridge"

// Create mock directory
mock := directorybridge.NewMockDirectory()

// Configure component to use mock
config.DirectoryURL = mock.URL()

// Verify registrations
registrations := mock.GetRegistrations()
assert.Equal(t, 1, len(registrations))

Metrics

The component exposes health and data flow metrics through the standard component interface:

// Health metrics (comp is the directory bridge component instance)
health := comp.Health()
fmt.Printf("Healthy: %v\n", health.Healthy)
fmt.Printf("Uptime: %s\n", health.Uptime)
fmt.Printf("Errors: %d\n", health.ErrorCount)

// Data flow metrics
flow := comp.DataFlow()
fmt.Printf("Error Rate: %.2f%%\n", flow.ErrorRate*100)
fmt.Printf("Last Activity: %s\n", flow.LastActivity)

Prometheus Metrics

When integrated with Prometheus observability:

| Metric | Type | Description |
|---|---|---|
| directory_bridge_registrations_total | Counter | Total successful registrations |
| directory_bridge_registration_errors_total | Counter | Total registration failures |
| directory_bridge_heartbeats_sent_total | Counter | Total heartbeats sent |
| directory_bridge_heartbeat_errors_total | Counter | Total heartbeat failures |
| directory_bridge_active_registrations | Gauge | Currently active registrations |
| directory_bridge_registration_duration_seconds | Histogram | Registration operation duration |

Troubleshooting

Registration Failures

Symptom: Agents not appearing in directory

Common Causes:

  1. Missing OASF records: Verify oasf-generator is running and producing records
  2. Directory URL misconfigured: Check directory_url in configuration
  3. Network issues: Verify connectivity to directory service
  4. Identity provider errors: Check identity provider configuration

Diagnostics:

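// Check for OASF records in KV
oasfKV, err := natsClient.GetKeyValueBucket(ctx, "OASF_RECORDS")
if err != nil {
    log.Fatalf("open OASF_RECORDS bucket: %v", err)
}
keys, err := oasfKV.Keys(ctx)
if err != nil {
    // Keys may also return an error when the bucket is empty.
    fmt.Printf("listing keys: %v\n", err)
}
fmt.Printf("OASF records: %d\n", len(keys))

// Check component health (comp is the directory bridge component instance)
health := comp.Health()
if !health.Healthy {
    fmt.Printf("Component unhealthy: %s\n", health.Status)
    fmt.Printf("Errors: %d\n", health.ErrorCount)
}

Heartbeat Issues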

Symptom: Registrations expiring unexpectedly

Common Causes:

  1. Heartbeat interval too long: Set to less than 1/2 of TTL
  2. Component stopped: Verify component is running
  3. Network interruptions: Check for transient connectivity issues

Solution:

# Ensure heartbeat interval is well below TTL
heartbeat_interval: "30s"   # Heartbeat every 30s
registration_ttl: "5m"      # Expire after 5 minutes
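
The rule of thumb above (interval below half the TTL) is easy to check before deploying a configuration. The following is a sketch of such a check; checkHeartbeatConfig is a hypothetical helper and is not part of the package's Config.Validate.

```go
package main

import (
	"fmt"
	"time"
)

// checkHeartbeatConfig parses the two duration strings and applies the rule
// of thumb from this section: heartbeat_interval should be less than half of
// registration_ttl.
func checkHeartbeatConfig(heartbeatInterval, registrationTTL string) error {
	interval, err := time.ParseDuration(heartbeatInterval)
	if err != nil {
		return fmt.Errorf("heartbeat_interval: %w", err)
	}
	ttl, err := time.ParseDuration(registrationTTL)
	if err != nil {
		return fmt.Errorf("registration_ttl: %w", err)
	}
	if interval <= 0 || ttl <= 0 {
		return fmt.Errorf("durations must be positive")
	}
	if interval >= ttl/2 {
		return fmt.Errorf("heartbeat_interval %s should be less than half of registration_ttl %s", interval, ttl)
	}
	return nil
}

func main() {
	fmt.Println(checkHeartbeatConfig("30s", "5m")) // defaults pass the check
	fmt.Println(checkHeartbeatConfig("3m", "5m"))  // interval too long
}
```
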
KV Watcher Not Receiving Updates

Symptom: New OASF records not triggering registrations

Common Causes:

  1. Bucket doesn't exist: OASF_RECORDS bucket not created
  2. Watch pattern mismatch: Watcher pattern doesn't match keys
  3. Consumer name collision: Multiple instances using same consumer

Solution:

# For testing, add unique consumer suffix
consumer_name_suffix: "-test-1"

# For production, ensure unique component names
name: dir-bridge-prod
Identity Provider Errors

Symptom: "Failed to create identity" errors in logs

Common Causes:

  1. Invalid provider type: Check identity_provider configuration
  2. Provider initialization failed: Review startup logs
  3. Cryptographic operation failed: Verify system entropy

Solution:

# Use local provider for development
identity_provider: "local"

# For AGNTCY provider (future)
identity_provider: "agntcy"
identity_service_url: "https://identity.agntcy.dev"

Performance Considerations

Heartbeat Optimization
  • Heartbeats only sent when expiration approaches (within 2x heartbeat interval)
  • Batch heartbeats for multiple registrations in single loop iteration
  • Failed heartbeats don't block other registrations
Memory Usage
  • Registration manager maintains in-memory map of active registrations
  • OASF records stored in NATS KV, not duplicated in memory
  • KV watcher uses bounded channel to prevent memory growth
Network Efficiency
  • HTTP client reuses connections with 30-second timeout
  • Response bodies limited to 1MB to prevent memory exhaustion
  • Graceful shutdown ensures all deregistrations complete

Security Considerations

DID Identity

Agents registered with the directory receive cryptographically verifiable DID identities:

  • Local provider: Creates did:key using Ed25519 key pairs
  • Private keys: Stored securely by identity provider implementation
  • Verifiable: External systems can cryptographically verify agent identity
Network Security
  • HTTPS required for production directory URLs
  • HTTP client validates TLS certificates
  • No sensitive data in OASF metadata (use extensions for custom fields)
Access Control

Directory-level access control is handled by the AGNTCY directory service. The bridge component:

  • Presents agent DIDs for authentication
  • Respects directory rate limits and policies
  • Includes metadata for audit trails

Documentation

Overview

Package directorybridge provides an output component that registers agents with AGNTCY directories using OASF (Open Agentic Schema Framework) records.

Overview

The directory-bridge component watches for OASF records in a NATS KV bucket and automatically registers/updates agents with AGNTCY directory services. It maintains registrations through periodic heartbeats and handles deregistration on shutdown.

Architecture

┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  OASF_RECORDS   │────▶│  Directory       │────▶│  AGNTCY         │
│  KV Bucket      │     │  Bridge          │     │  Directory      │
└─────────────────┘     └──────────────────┘     └─────────────────┘
                               │
                               ▼
                        ┌──────────────────┐
                        │  Identity        │
                        │  Provider        │
                        └──────────────────┘

Components

The package consists of several key components:

  • Component: The main LifecycleComponent that watches KV and orchestrates registration
  • DirectoryClient: HTTP client for communicating with AGNTCY directory APIs
  • RegistrationManager: Manages the lifecycle of agent registrations including heartbeats
  • Config: Configuration for directory URL, heartbeat intervals, and retry settings

Configuration

Example configuration:

{
  "directory_url": "https://directory.agntcy.dev",
  "heartbeat_interval": "30s",
  "registration_ttl": "5m",
  "identity_provider": "local",
  "oasf_kv_bucket": "OASF_RECORDS",
  "retry_count": 3,
  "retry_delay": "1s"
}

Identity Integration

The bridge uses the agentic/identity package to create or resolve DIDs for agents being registered. Supported identity providers:

  • "local": Creates did:key identities locally
  • "agntcy": Resolves identities through AGNTCY identity service (future)

NATS Topology

Input:

  • OASF_RECORDS KV bucket (watch): Receives OASF records from oasf-generator

Output:

  • directory.registration.* (optional): Emits registration events

Registration Lifecycle

  1. KV watcher detects new/updated OASF record
  2. Component parses record and extracts entity ID
  3. RegistrationManager creates/retrieves DID identity
  4. DirectoryClient sends registration request
  5. Registration stored with expiration time
  6. Heartbeat loop maintains registration before expiry
  7. On shutdown, all agents are deregistered

Usage

Register the component with the component registry:

import directorybridge "github.com/c360studio/semstreams/output/directory-bridge"

func init() {
    // Register returns an error; do not discard it.
    if err := directorybridge.Register(registry); err != nil {
        panic(err)
    }
}

See Also

  • processor/oasf-generator: Generates OASF records from entity predicates
  • agentic/identity: DID and verifiable credential management
  • docs/architecture/adr-019-agntcy-integration.md: Architecture decision record

Index

Constants

const (
	// BackendHTTP is the default — POST JSON to a SemStreams HTTP directory.
	BackendHTTP = "http"
	// BackendAgntcyGRPC is the AGNTCY agntcy/dir StoreService over gRPC.
	BackendAgntcyGRPC = "agntcy_grpc"
)

Backend constants accepted by Config.Backend.

Variables

var ErrRefreshNotSupported = errors.New("agntcy_grpc backend: refresh has no wire counterpart (records are CID-anchored)")

ErrRefreshNotSupported is returned by GRPCBackend.Refresh — the agntcy/dir StoreService stores content-addressed records (CIDs) that do not expire on the publisher side, so heartbeat-style refresh has no wire counterpart. The bridge's RegistrationManager already skips the Refresh path when ExpiresAt.IsZero() (set by Publish for this backend), so this sentinel only surfaces if a caller bypasses the manager and invokes Refresh directly. Use errors.Is to test for it.

Functions

func NewComponent

func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewComponent creates a new directory bridge component.

func Register

func Register(registry RegistryInterface) error

Register registers the directory bridge output component with the given registry.

Types

type AgntcyGRPCConfig

type AgntcyGRPCConfig struct {
	// Endpoint is the host:port the gRPC client dials.
	// Example: "prod.api.ads.outshift.io:443".
	Endpoint string `json:"endpoint" schema:"type:string,description:gRPC endpoint host:port,category:basic"`

	// TLS controls whether the client establishes TLS on dial. The
	// hosted hub requires TLS (true); local dev / bufconn tests use
	// false. When false the client uses insecure transport credentials
	// (grpc.WithTransportCredentials(insecure.NewCredentials())) —
	// suitable only for trusted networks and local development.
	TLS bool `json:"tls" schema:"type:bool,description:Establish TLS on dial (required for the hosted hub),category:basic"`

	// Auth carries optional per-RPC OIDC authentication. nil or
	// Type=="none" disables auth (suitable for local dev / private
	// deployments). The hosted hub generally requires Type=="oidc".
	Auth *AuthConfig `json:"auth,omitempty" schema:"type:object,description:Per-RPC OIDC auth (omit for unauthenticated),category:basic"`
}

AgntcyGRPCConfig carries settings for the agntcy_grpc backend.

func (*AgntcyGRPCConfig) Validate

func (g *AgntcyGRPCConfig) Validate() error

Validate validates an AgntcyGRPCConfig.

type AuthConfig

type AuthConfig struct {
	// Type selects the auth flow. One of "none" or "oidc". Empty
	// defaults to "none".
	Type string `json:"type" schema:"type:string,description:Auth flow (none or oidc),category:basic,default:none"`

	// Issuer is the OIDC token endpoint URL (the `token_endpoint` from
	// the issuer's OIDC discovery document). Required when Type=="oidc".
	Issuer string `json:"issuer,omitempty" schema:"type:string,description:OIDC token endpoint URL,category:basic"`

	// ClientID is the OIDC client identifier. Prefer ClientIDEnv for
	// secret management hygiene. Required when Type=="oidc" if
	// ClientIDEnv is not set.
	ClientID string `` /* 128-byte string literal not displayed */

	// ClientIDEnv names an environment variable to read the OIDC
	// client_id from at runtime. Wins over ClientID when both set.
	ClientIDEnv string `` /* 129-byte string literal not displayed */

	// ClientSecretEnv names an environment variable to read the OIDC
	// client_secret from at runtime. Required when Type=="oidc".
	// No inline client_secret field — secrets do not live in JSON
	// config on disk; this is intentional.
	ClientSecretEnv string `` /* 152-byte string literal not displayed */

	// Scopes is the OIDC scope list requested at token-endpoint time.
	Scopes []string `json:"scopes,omitempty" schema:"type:array,description:OIDC scope list,category:basic"`
}

AuthConfig configures per-RPC authentication for the agntcy_grpc backend. The only auth flow supported today is OIDC client credentials (the deployment pattern AGNTCY's hosted hub uses).

func (*AuthConfig) Validate

func (a *AuthConfig) Validate() error

Validate validates an AuthConfig.

type AuthProvider

type AuthProvider interface {
	// PerRPC returns the per-RPC credentials to attach to gRPC calls, or
	// nil for "no auth". The bridge feeds the result into
	// grpc.WithPerRPCCredentials at dial time; a nil return is the
	// signal to omit that DialOption.
	PerRPC() credentials.PerRPCCredentials

	// Close releases any provider-held resources (token caches, HTTP
	// clients). Providers without resources to release implement this
	// as a no-op.
	Close() error
}

AuthProvider supplies per-RPC credentials for the agntcy_grpc backend. Implementations either source a bearer token (OIDC) or signal that no authentication should be attached to outgoing RPCs (NoOp).

The provider lifecycle parallels the Backend's: the bridge constructs one provider per Component.Initialize and closes it via Close at Component.Stop. Token caching / refresh lives inside the provider — callers see only the wrapped credentials.PerRPCCredentials.

func NewAuthProvider

func NewAuthProvider(cfg *AuthConfig) (AuthProvider, error)

NewAuthProvider constructs an AuthProvider from the given config. A nil config or Type "" / "none" returns the NoOp provider. "oidc" reads the client_id and client_secret from the configured env vars (or inline field) and returns an OIDC client-credentials provider that auto-refreshes tokens.

type Backend

type Backend interface {
	// Publish registers (or re-registers) an agent record with the directory.
	// Returns the backend-assigned RecordID, which the bridge stores so it
	// can later Refresh or Withdraw without re-deriving it.
	Publish(ctx context.Context, req *PublishRequest) (*PublishResult, error)

	// Refresh extends the lifetime of a previously published record. The
	// HTTP backend implements this as a heartbeat POST; future gRPC
	// backends may push a renewed CID-anchored record.
	Refresh(ctx context.Context, req *RefreshRequest) (*PublishResult, error)

	// Withdraw removes a previously published record. Called when the
	// source entity is deleted from ENTITY_STATES, and on component Stop
	// for every active registration.
	Withdraw(ctx context.Context, req *WithdrawRequest) error

	// Close releases any backend-held resources (open connections, pools,
	// auth token caches). Called once during component Stop. Backends
	// without resources to release implement this as a no-op.
	Close() error
}

Backend abstracts the wire-level publish/withdraw operations against a directory. The directory-bridge component owns lifecycle, retry, KV watching, and heartbeat scheduling; backends own protocol, encoding, and authentication.

All methods must be safe for concurrent use — the heartbeat loop and the KV watcher goroutine call them concurrently.

Publish must be idempotent. The bridge calls it on every KV change AND when re-registering after a Refresh that returns a new RecordID. Backends that natively support upsert (HTTP POST /v1/agents in our v1 spec; gRPC PushReferrer in agntcy/dir) satisfy this trivially. Backends that don't must internally deduplicate by EntityID.

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the directory bridge output component.

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema.

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics.

func (*Component) GetRegistrations

func (c *Component) GetRegistrations() []*Registration

GetRegistrations returns all active registrations.

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns the current health status.

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize prepares the component, selecting a Backend implementation based on config.Backend. Empty / "http" selects the legacy HTTP wire format; "agntcy_grpc" selects the AGNTCY gRPC StoreService wire format with optional per-RPC OIDC auth.

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns configured input port definitions.

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata.

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns configured output port definitions.

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins watching for OASF records and registering agents.

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop gracefully stops the component.

type Config

type Config struct {
	// Ports defines the input/output port configuration.
	Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`

	// Backend selects the directory wire format. One of:
	//   - "http" (default) — POST /v1/agents JSON to DirectoryURL.
	//     The legacy SemStreams-defined HTTP protocol that the in-tree
	//     mock and any privately hosted HTTP directory speak.
	//   - "agntcy_grpc" — AGNTCY agntcy/dir StoreService over gRPC.
	//     Use this for the hosted hub at prod.api.ads.outshift.io and
	//     for any AGNTCY-conformant directory.
	// Empty defaults to "http" for backwards compatibility with
	// pre-PR-C configs.
	Backend string `` /* 127-byte string literal not displayed */

	// DirectoryURL is the HTTP backend's directory service URL.
	// Ignored when Backend == "agntcy_grpc" — that path uses AgntcyGRPC.Endpoint.
	DirectoryURL string `json:"directory_url" schema:"type:string,description:AGNTCY directory service URL (HTTP backend only),category:basic"`

	// AgntcyGRPC carries the agntcy_grpc backend's settings. Ignored
	// when Backend is not "agntcy_grpc".
	AgntcyGRPC *AgntcyGRPCConfig `json:"agntcy_grpc,omitempty" schema:"type:object,description:agntcy_grpc backend settings,category:basic"`

	// HeartbeatInterval is how often to send heartbeats to the directory.
	HeartbeatInterval string `json:"heartbeat_interval" schema:"type:string,description:Heartbeat interval,category:basic,default:30s"`

	// RegistrationTTL is the time-to-live for registrations.
	RegistrationTTL string `json:"registration_ttl" schema:"type:string,description:Registration time-to-live,category:basic,default:5m"`

	// IdentityProvider specifies which identity provider to use.
	// Values: "local", "agntcy"
	IdentityProvider string `json:"identity_provider" schema:"type:string,description:Identity provider type,category:basic,default:local"`

	// OASFKVBucket is the KV bucket to watch for OASF records.
	OASFKVBucket string `json:"oasf_kv_bucket" schema:"type:string,description:KV bucket for OASF records,category:basic,default:OASF_RECORDS"`

	// RetryCount is the number of retries for failed registrations.
	RetryCount int `json:"retry_count" schema:"type:int,description:Number of registration retries,category:advanced,default:3"`

	// RetryDelay is the initial delay between retries.
	RetryDelay string `json:"retry_delay" schema:"type:string,description:Initial retry delay,category:advanced,default:1s"`

	// ConsumerNameSuffix adds a suffix to consumer names for uniqueness in tests.
	ConsumerNameSuffix string `json:"consumer_name_suffix" schema:"type:string,description:Suffix for consumer names,category:advanced"`

	// DeleteConsumerOnStop enables consumer cleanup on stop (for testing).
	DeleteConsumerOnStop bool `` /* 128-byte string literal not displayed */
}

Config defines the configuration for the directory bridge component.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration.

func (*Config) GetHeartbeatInterval

func (c *Config) GetHeartbeatInterval() time.Duration

GetHeartbeatInterval returns the heartbeat interval.

func (*Config) GetRegistrationTTL

func (c *Config) GetRegistrationTTL() time.Duration

GetRegistrationTTL returns the registration TTL.

func (*Config) GetRetryDelay

func (c *Config) GetRetryDelay() time.Duration

GetRetryDelay returns the retry delay.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration.

type DeregistrationRequest

type DeregistrationRequest struct {
	// RegistrationID is the registration to remove.
	RegistrationID string `json:"registration_id"`

	// AgentDID is the agent's DID.
	AgentDID string `json:"agent_did"`
}

DeregistrationRequest represents a request to deregister an agent.

type DirectoryClient

type DirectoryClient struct {
	// contains filtered or unexported fields
}

DirectoryClient handles communication with the AGNTCY directory service.

func NewDirectoryClient

func NewDirectoryClient(baseURL string) *DirectoryClient

NewDirectoryClient creates a new directory client.

func (*DirectoryClient) Deregister

func (c *DirectoryClient) Deregister(ctx context.Context, req *DeregistrationRequest) error

Deregister removes an agent from the directory.

func (*DirectoryClient) Discover

Discover searches the directory for agents.

func (*DirectoryClient) Heartbeat

Heartbeat sends a heartbeat to renew a registration.

func (*DirectoryClient) Register

Register registers an agent with the directory.

type DiscoveredAgent

type DiscoveredAgent struct {
	// AgentDID is the agent's DID.
	AgentDID string `json:"agent_did"`

	// OASFRecord is the agent's OASF specification.
	OASFRecord *oasfgenerator.OASFRecord `json:"oasf_record"`

	// RegisteredAt is when the agent registered.
	RegisteredAt time.Time `json:"registered_at"`

	// ExpiresAt is when the registration expires.
	ExpiresAt time.Time `json:"expires_at"`
}

DiscoveredAgent represents an agent found in the directory.

type DiscoveryQuery

type DiscoveryQuery struct {
	// Capabilities filters by required capabilities.
	Capabilities []string `json:"capabilities,omitempty"`

	// Domains filters by domains.
	Domains []string `json:"domains,omitempty"`

	// Limit limits the number of results.
	Limit int `json:"limit,omitempty"`
}

DiscoveryQuery represents a search query for agents.

type DiscoveryResponse

type DiscoveryResponse struct {
	// Agents are the matching agents.
	Agents []DiscoveredAgent `json:"agents"`

	// Total is the total number of matches.
	Total int `json:"total"`
}

DiscoveryResponse contains discovered agents.
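
Because every DiscoveryQuery field carries omitempty, unset filters drop out of the encoded form. The sketch below mirrors the documented fields in a local stand-in type and marshals it. How Discover actually transmits the query is not shown in this section, so the JSON body here is only an illustration of the tags.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// discoveryQuery mirrors the documented DiscoveryQuery fields and JSON tags.
type discoveryQuery struct {
	Capabilities []string `json:"capabilities,omitempty"`
	Domains      []string `json:"domains,omitempty"`
	Limit        int      `json:"limit,omitempty"`
}

// encodeQuery is a hypothetical helper that returns the JSON form of q.
func encodeQuery(q discoveryQuery) string {
	b, err := json.Marshal(q)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	q := discoveryQuery{Capabilities: []string{"summarize"}, Limit: 5}
	fmt.Println(encodeQuery(q)) // prints {"capabilities":["summarize"],"limit":5}
}
```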

type GRPCBackend

type GRPCBackend struct {
	// contains filtered or unexported fields
}

GRPCBackend implements Backend by talking to an AGNTCY-compatible directory over the agntcy/dir gRPC StoreService.

The wire model differs from the HTTP backend in a few ways the rest of the bridge needs to understand:

  • Records are content-addressed. PublishResult.RecordID is the CID the server returns from Push; it never changes for an unchanged record.
  • There is no expiry. PublishResult.ExpiresAt is always zero. The bridge's heartbeat scheduler is documented to skip refresh when ExpiresAt.IsZero() (see backend.go contract), so the manager never calls Refresh on this backend; a direct call returns ErrRefreshNotSupported (see Refresh).
  • Push and Delete are streaming RPCs. We send exactly one record/ref per call because the bridge invokes us one entity at a time. The ceremony around streams is local to this file.

Authentication is not handled here. Operators pass grpc.DialOption values (TLS credentials, per-RPC OIDC tokens) at construction; this keeps auth strategies pluggable without growing the backend's surface.

func NewGRPCBackend

func NewGRPCBackend(target string, opts ...grpc.DialOption) (*GRPCBackend, error)

NewGRPCBackend dials the directory at target and returns a backend ready to Publish/Withdraw. Caller is responsible for closing it via Close().

Pass grpc.WithTransportCredentials(...) for TLS; for the hosted hub at prod.api.ads.outshift.io that means real cert validation (credentials.NewTLS(nil) covers the default OS root pool case). For local dev / bufconn tests, pass grpc.WithTransportCredentials(insecure.NewCredentials()) — from google.golang.org/grpc/credentials/insecure. Production wire-up uses the NewGRPCBackendWithAuth constructor and the buildGRPCDialOptions helper in auth.go to assemble both transport credentials and per-RPC OIDC auth in one shot.

func NewGRPCBackendFromClient

func NewGRPCBackendFromClient(conn *grpc.ClientConn) *GRPCBackend

NewGRPCBackendFromClient is the test-side constructor: callers (notably the bufconn tests) build their own *grpc.ClientConn and inject it. Close() will still tear down the connection — tests that want to share a conn across backends should build one backend or call Close themselves.

func NewGRPCBackendWithAuth

func NewGRPCBackendWithAuth(target string, auth AuthProvider, dialOpts ...grpc.DialOption) (*GRPCBackend, error)

NewGRPCBackendWithAuth is the production constructor used by Component.Initialize. It dials target with the supplied DialOptions and binds the AuthProvider to the backend's lifecycle so Close() releases both the gRPC connection and any provider-held resources (OIDC token cache HTTP clients, etc.).

The dialOpts slice is expected to already include transport credentials (TLS or insecure) and any grpc.WithPerRPCCredentials needed for auth; see buildGRPCDialOptions in auth.go for the canonical construction.

Note on error semantics: grpc.NewClient is non-blocking — it only validates dial arguments and returns. Connection establishment happens lazily on the first RPC. As a result, unreachable hosts / DNS failures / TLS handshake errors will not surface here; the first Publish or Withdraw call returns them instead. The error returned by this constructor reflects argument-validation failures only.

func (*GRPCBackend) Close

func (b *GRPCBackend) Close() error

Close tears down the underlying gRPC client connection and releases any bound AuthProvider resources. Call it once; subsequent calls return whatever grpc.ClientConn.Close returns for an already-closed connection (in grpc-go this is ErrClientConnClosing, so do not assume nil).

func (*GRPCBackend) Publish

func (b *GRPCBackend) Publish(ctx context.Context, req *PublishRequest) (*PublishResult, error)

Publish marshals the OASF record into a structpb.Struct, drives one round of the StoreService.Push bidi stream, and returns the server-assigned CID. ExpiresAt is intentionally zero — see type comment.

func (*GRPCBackend) Refresh

func (b *GRPCBackend) Refresh(_ context.Context, req *RefreshRequest) (*PublishResult, error)

Refresh returns ErrRefreshNotSupported. The bridge's RegistrationManager skips this code path for CID-anchored backends (Publish returns zero ExpiresAt, and sendHeartbeats short-circuits on IsZero), so this method only fires when a caller bypasses the manager and invokes Refresh directly. Pre-PR-C behaviour was to echo the RecordID back silently — the typed sentinel surfaces the misuse loudly, per the go-reviewer pass on PR #70.
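
A caller that does invoke Refresh directly can detect the sentinel with errors.Is. The sketch below uses a local stand-in sentinel and a trimmed interface; the names errRefreshNotSupported, refresher, and describeRefresh are illustrative and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errRefreshNotSupported is a local stand-in for the package's
// ErrRefreshNotSupported sentinel.
var errRefreshNotSupported = errors.New("refresh not supported")

// refresher is the slice of the backend contract this sketch needs.
type refresher interface {
	Refresh(recordID string) error
}

// cidBackend imitates a CID-anchored backend that cannot refresh.
type cidBackend struct{}

func (cidBackend) Refresh(string) error {
	return fmt.Errorf("refresh %w", errRefreshNotSupported)
}

// describeRefresh classifies the outcome; errors.Is also matches wrapped errors.
func describeRefresh(b refresher, recordID string) string {
	err := b.Refresh(recordID)
	switch {
	case err == nil:
		return "refreshed"
	case errors.Is(err, errRefreshNotSupported):
		return "backend has no expiry; do not schedule heartbeats"
	default:
		return "refresh failed: " + err.Error()
	}
}

func main() {
	fmt.Println(describeRefresh(cidBackend{}, "cid-123"))
}
```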

func (*GRPCBackend) Withdraw

func (b *GRPCBackend) Withdraw(ctx context.Context, req *WithdrawRequest) error

Withdraw drives the StoreService.Delete client-streaming RPC for one RecordRef and waits for the Empty response.

type HTTPBackend

type HTTPBackend struct {
	// contains filtered or unexported fields
}

HTTPBackend implements Backend by delegating to the in-package DirectoryClient, which speaks the SemStreams-defined HTTP/JSON directory protocol (POST /v1/agents, POST /v1/agents/{id}/heartbeat, DELETE /v1/agents/{id}).

The test mock and any privately hosted directory server speak this protocol. GRPCBackend is the second backend; it implements the AGNTCY agntcy/dir gRPC service.

func NewHTTPBackend

func NewHTTPBackend(baseURL string) *HTTPBackend

NewHTTPBackend constructs a backend pointing at baseURL. baseURL must be non-empty; if it is empty, the underlying client returns errors at call time rather than at construction, preserving the original behavior.

func NewHTTPBackendFromClient

func NewHTTPBackendFromClient(client *DirectoryClient) *HTTPBackend

NewHTTPBackendFromClient lets callers (notably tests) inject a pre-configured *DirectoryClient. Keeps the existing DirectoryClient unit tests usable as-is.

func (*HTTPBackend) Close

func (b *HTTPBackend) Close() error

Close is a no-op for the HTTP backend — net/http's Transport pools idle connections internally and reclaims them on GC. No backend-owned resources to release.

func (*HTTPBackend) Publish

func (b *HTTPBackend) Publish(ctx context.Context, req *PublishRequest) (*PublishResult, error)

Publish translates the domain request into the HTTP RegistrationRequest and adapts the response back. A 2xx response whose body reports success=false is treated as an error so the bridge can retry the same way it did before the refactor.
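
The handling of a 2xx reply that still reports failure can be sketched against a throwaway httptest server. Everything here is a stand-in: the types are trimmed copies of the documented request and response shapes, and newStubDirectory and register are hypothetical helpers, not the package's client.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Trimmed stand-ins for the documented request and response shapes.
type registrationRequest struct {
	AgentDID string `json:"agent_did"`
	TTL      int    `json:"ttl,omitempty"`
}

type registrationResponse struct {
	Success        bool   `json:"success"`
	RegistrationID string `json:"registration_id,omitempty"`
	Error          string `json:"error,omitempty"`
}

// newStubDirectory serves only POST /v1/agents. It always answers 200 and
// signals rejection through success=false in the body.
func newStubDirectory() *httptest.Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/v1/agents", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		var req registrationRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.AgentDID == "" {
			json.NewEncoder(w).Encode(registrationResponse{Error: "agent_did required"})
			return
		}
		json.NewEncoder(w).Encode(registrationResponse{Success: true, RegistrationID: "reg-1"})
	})
	return httptest.NewServer(mux)
}

// register posts req and treats success=false as an error even on a 2xx status.
func register(baseURL string, req registrationRequest) (string, error) {
	body, err := json.Marshal(req)
	if err != nil {
		return "", err
	}
	resp, err := http.Post(baseURL+"/v1/agents", "application/json", bytes.NewReader(body))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return "", fmt.Errorf("directory returned %s", resp.Status)
	}
	var out registrationResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return "", err
	}
	if !out.Success {
		return "", fmt.Errorf("registration rejected: %s", out.Error)
	}
	return out.RegistrationID, nil
}

func main() {
	srv := newStubDirectory()
	defer srv.Close()
	id, err := register(srv.URL, registrationRequest{AgentDID: "did:example:123", TTL: 300})
	fmt.Println(id, err)
}
```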

func (*HTTPBackend) Refresh

func (b *HTTPBackend) Refresh(ctx context.Context, req *RefreshRequest) (*PublishResult, error)

Refresh sends a heartbeat to extend the registration's lifetime.

func (*HTTPBackend) Withdraw

func (b *HTTPBackend) Withdraw(ctx context.Context, req *WithdrawRequest) error

Withdraw deregisters a record by ID.

type HeartbeatRequest

type HeartbeatRequest struct {
	// RegistrationID is the registration to renew.
	RegistrationID string `json:"registration_id"`

	// AgentDID is the agent's DID.
	AgentDID string `json:"agent_did"`
}

HeartbeatRequest represents a registration renewal request.

type HeartbeatResponse

type HeartbeatResponse struct {
	// Success indicates if the heartbeat succeeded.
	Success bool `json:"success"`

	// ExpiresAt is the new expiration time.
	ExpiresAt time.Time `json:"expires_at,omitempty"`

	// Error contains error details if heartbeat failed.
	Error string `json:"error,omitempty"`
}

HeartbeatResponse represents the heartbeat response.

type MockDirectory

type MockDirectory struct {

	// Call counters for assertions
	RegisterCalls   int
	HeartbeatCalls  int
	DeregisterCalls int
	DiscoverCalls   int
	// contains filtered or unexported fields
}

MockDirectory provides a test mock for the AGNTCY directory service.

func NewMockDirectory

func NewMockDirectory() *MockDirectory

NewMockDirectory creates a new mock directory server.

func (*MockDirectory) Close

func (md *MockDirectory) Close()

Close shuts down the mock server.

func (*MockDirectory) GetRegistration

func (md *MockDirectory) GetRegistration(id string) *RegistrationRequest

GetRegistration returns a stored registration by ID.

func (*MockDirectory) RegistrationCount

func (md *MockDirectory) RegistrationCount() int

RegistrationCount returns the number of active registrations.

func (*MockDirectory) SetFailNextDeregister

func (md *MockDirectory) SetFailNextDeregister(fail bool)

SetFailNextDeregister makes the next deregister call fail.

func (*MockDirectory) SetFailNextHeartbeat

func (md *MockDirectory) SetFailNextHeartbeat(fail bool)

SetFailNextHeartbeat makes the next heartbeat call fail.

func (*MockDirectory) SetFailNextRegister

func (md *MockDirectory) SetFailNextRegister(fail bool)

SetFailNextRegister makes the next register call fail.

func (*MockDirectory) SetRegisterDelay

func (md *MockDirectory) SetRegisterDelay(d time.Duration)

SetRegisterDelay adds a delay to register calls.

func (*MockDirectory) URL

func (md *MockDirectory) URL() string

URL returns the mock server's URL.
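
The one-shot failure switches and call counters follow a common test-double pattern. A minimal sketch of that pattern is below, assuming an httptest-backed server and a 500 status for the injected failure. The flakyDirectory type is a stand-in, not the real MockDirectory.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// flakyDirectory is a stand-in test double with a call counter and a
// one-shot failure flag.
type flakyDirectory struct {
	mu       sync.Mutex
	calls    int
	failNext bool
	srv      *httptest.Server
}

func newFlakyDirectory() *flakyDirectory {
	fd := &flakyDirectory{}
	fd.srv = httptest.NewServer(http.HandlerFunc(fd.handle))
	return fd
}

func (fd *flakyDirectory) handle(w http.ResponseWriter, r *http.Request) {
	fd.mu.Lock()
	fd.calls++
	fail := fd.failNext
	fd.failNext = false // the flag applies to a single request
	fd.mu.Unlock()

	if fail {
		// The 500 status is an assumption about how a failure is reported.
		http.Error(w, "injected failure", http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
}

func (fd *flakyDirectory) setFailNext(fail bool) {
	fd.mu.Lock()
	defer fd.mu.Unlock()
	fd.failNext = fail
}

func (fd *flakyDirectory) callCount() int {
	fd.mu.Lock()
	defer fd.mu.Unlock()
	return fd.calls
}

// post sends an empty POST and returns the status code.
func post(url string) (int, error) {
	resp, err := http.Post(url, "application/json", nil)
	if err != nil {
		return 0, err
	}
	resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	fd := newFlakyDirectory()
	defer fd.srv.Close()

	fd.setFailNext(true)
	first, _ := post(fd.srv.URL)
	second, _ := post(fd.srv.URL)
	fmt.Println(first, second, fd.callCount()) // prints 500 200 2
}
```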

type NoOpAuthProvider

type NoOpAuthProvider struct{}

NoOpAuthProvider attaches no credentials. Suitable for local dev, bufconn tests, and privately hosted directories that don't gate on auth. The hosted hub at prod.api.ads.outshift.io generally does, so production deployments swap this for OIDCAuthProvider.

func (NoOpAuthProvider) Close

func (NoOpAuthProvider) Close() error

Close is a no-op for NoOpAuthProvider.

func (NoOpAuthProvider) PerRPC

PerRPC returns nil — the bridge skips grpc.WithPerRPCCredentials.

type OIDCAuthProvider

type OIDCAuthProvider struct {
	// contains filtered or unexported fields
}

OIDCAuthProvider sources per-RPC bearer tokens from an OIDC issuer via the client-credentials flow. Tokens are cached and auto-refreshed by the underlying clientcredentials.Config.TokenSource, so callers don't need to manage lifetimes — each PerRPC().GetRequestMetadata call returns whatever the cached source hands out.

func (*OIDCAuthProvider) Close

func (p *OIDCAuthProvider) Close() error

Close is a no-op — clientcredentials.Config.TokenSource uses net/http's default transport, which is reclaimed by GC.

func (*OIDCAuthProvider) PerRPC

PerRPC wraps the OAuth2 token source as gRPC PerRPCCredentials.

type PublishRequest

type PublishRequest struct {
	// EntityID is the SemStreams entity ID for the agent. Backends may
	// echo it in protocol-specific metadata so the directory can correlate
	// records back to their source system.
	EntityID string

	// AgentDID is the agent's decentralized identifier, generated by the
	// identity provider before this call. Required.
	AgentDID string

	// Record is the OASF document to publish. The bridge guarantees it is
	// non-nil and has passed oasfgenerator.OASFRecord.Validate.
	Record *oasfgenerator.OASFRecord

	// TTL is the requested lifetime of the registration. The backend may
	// honor it exactly, clamp it to a server-side maximum, or ignore it
	// (CID-anchored backends don't expire on the publisher side).
	TTL time.Duration

	// Metadata is opaque key/value data forwarded by the bridge. Today the
	// bridge sets {"semstreams_entity_id": EntityID, "source": "semstreams"};
	// future callers may add lineage or trace context.
	Metadata map[string]any
}

PublishRequest is the bridge → backend contract for adding or replacing an agent record. Domain-shaped: backends translate to their wire format.

type PublishResult

type PublishResult struct {
	// RecordID is the backend-assigned identifier for this publication.
	//   - HTTP backend: server-issued registration_id
	//   - gRPC backend: the OASF record CID
	// Persisted by the bridge so subsequent Refresh/Withdraw can address
	// the same record.
	RecordID string

	// ExpiresAt is when the bridge should next refresh. Zero means the
	// backend does not expire records on its own (CID-anchored or
	// long-lived OCI artifact); the bridge skips heartbeat scheduling in
	// that case.
	ExpiresAt time.Time
}

PublishResult is what the backend returns after a successful publish or refresh.

type RefreshRequest

type RefreshRequest struct {
	// RecordID is the previously-issued PublishResult.RecordID.
	RecordID string

	// AgentDID is the agent's DID; some backends bind heartbeat auth to it.
	AgentDID string
}

RefreshRequest is the bridge → backend contract for heartbeating.

type Registration

type Registration struct {
	// EntityID is the SemStreams entity ID.
	EntityID string `json:"entity_id"`

	// RegistrationID is the directory's registration ID.
	RegistrationID string `json:"registration_id"`

	// AgentDID is the agent's decentralized identifier.
	AgentDID string `json:"agent_did"`

	// OASFRecord is the agent's OASF specification.
	OASFRecord *oasfgenerator.OASFRecord `json:"oasf_record"`

	// RegisteredAt is when the registration was created.
	RegisteredAt time.Time `json:"registered_at"`

	// ExpiresAt is when the registration expires.
	ExpiresAt time.Time `json:"expires_at"`

	// LastHeartbeat is when the last heartbeat was sent.
	LastHeartbeat time.Time `json:"last_heartbeat"`

	// Retries is the number of registration retries.
	Retries int `json:"retries"`
}

Registration represents an active directory registration.

type RegistrationError

type RegistrationError struct {
	EntityID string
	Message  string
}

RegistrationError represents a registration failure.

func (*RegistrationError) Error

func (e *RegistrationError) Error() string

type RegistrationManager

type RegistrationManager struct {
	// contains filtered or unexported fields
}

RegistrationManager handles the lifecycle of agent registrations. It owns the heartbeat scheduler and the entityID→Registration map; the concrete protocol/wire format is delegated to a Backend.
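
The entityID→Registration map needs to be safe for use from both the heartbeat goroutine and callers. One way to structure such a map is sketched below, under the assumption of a read-write mutex and snapshot-style listing. The registry type and its methods are hypothetical and do not reflect the manager's actual internals.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// registration is a trimmed local stand-in for the documented Registration.
type registration struct {
	EntityID       string
	RegistrationID string
}

// registry is a hypothetical entityID -> registration map guarded by a mutex.
type registry struct {
	mu       sync.RWMutex
	byEntity map[string]*registration
}

func newRegistry() *registry {
	return &registry{byEntity: make(map[string]*registration)}
}

func (r *registry) put(reg *registration) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.byEntity[reg.EntityID] = reg
}

func (r *registry) remove(entityID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.byEntity, entityID)
}

// list returns a snapshot sorted by entity ID, so callers can iterate
// without holding the lock.
func (r *registry) list() []*registration {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]*registration, 0, len(r.byEntity))
	for _, reg := range r.byEntity {
		out = append(out, reg)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].EntityID < out[j].EntityID })
	return out
}

func main() {
	r := newRegistry()
	r.put(&registration{EntityID: "agent-b", RegistrationID: "reg-2"})
	r.put(&registration{EntityID: "agent-a", RegistrationID: "reg-1"})
	r.remove("agent-b")
	for _, reg := range r.list() {
		fmt.Println(reg.EntityID, reg.RegistrationID) // prints agent-a reg-1
	}
}
```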

func NewRegistrationManager

func NewRegistrationManager(backend Backend, identityProvider identity.Provider, config Config, logger *slog.Logger) *RegistrationManager

NewRegistrationManager creates a new registration manager bound to the given Backend. For the HTTP wire format, callers pass a backend built with NewHTTPBackend(baseURL) or wrap an existing *DirectoryClient with NewHTTPBackendFromClient.

func (*RegistrationManager) Deregister

func (rm *RegistrationManager) Deregister(ctx context.Context, entityID string) error

Deregister removes an agent from the directory.

func (*RegistrationManager) GetRegistration

func (rm *RegistrationManager) GetRegistration(entityID string) *Registration

GetRegistration returns the registration for an entity.

func (*RegistrationManager) ListRegistrations

func (rm *RegistrationManager) ListRegistrations() []*Registration

ListRegistrations returns all active registrations.

func (*RegistrationManager) RegisterAgent

func (rm *RegistrationManager) RegisterAgent(ctx context.Context, entityID string, record *oasfgenerator.OASFRecord, agentIdentity *identity.AgentIdentity) error

RegisterAgent registers an agent with the directory.

func (*RegistrationManager) Start

func (rm *RegistrationManager) Start(ctx context.Context) error

Start begins the heartbeat goroutine.

func (*RegistrationManager) Stop

func (rm *RegistrationManager) Stop(ctx context.Context) error

Stop stops the heartbeat goroutine and deregisters all agents.

func (*RegistrationManager) UpdateRegistration

func (rm *RegistrationManager) UpdateRegistration(ctx context.Context, entityID string, record *oasfgenerator.OASFRecord) error

UpdateRegistration updates an existing registration with new OASF data.

type RegistrationRequest

type RegistrationRequest struct {
	// AgentDID is the agent's decentralized identifier.
	AgentDID string `json:"agent_did"`

	// OASFRecord is the agent's OASF specification.
	OASFRecord *oasfgenerator.OASFRecord `json:"oasf_record"`

	// TTL is the registration time-to-live in seconds.
	TTL int `json:"ttl,omitempty"`

	// Metadata contains additional registration metadata.
	Metadata map[string]any `json:"metadata,omitempty"`
}

RegistrationRequest represents a request to register an agent.

type RegistrationResponse

type RegistrationResponse struct {
	// Success indicates if registration succeeded.
	Success bool `json:"success"`

	// RegistrationID is the unique ID for this registration.
	RegistrationID string `json:"registration_id,omitempty"`

	// ExpiresAt is when the registration expires.
	ExpiresAt time.Time `json:"expires_at,omitempty"`

	// Error contains error details if registration failed.
	Error string `json:"error,omitempty"`
}

RegistrationResponse represents the directory's response to a registration.

type RegistryInterface

type RegistryInterface interface {
	RegisterWithConfig(component.RegistrationConfig) error
}

RegistryInterface defines the minimal interface needed for registration.

type WithdrawRequest

type WithdrawRequest struct {
	// RecordID is the previously-issued PublishResult.RecordID.
	RecordID string

	// AgentDID is the agent's DID; some backends bind withdrawal auth to it.
	AgentDID string
}

WithdrawRequest is the bridge → backend contract for removal.
