directorybridge

package
v1.0.0-alpha.21
Warning

This package is not in the latest version of its module.

Published: Mar 9, 2026 License: MIT Imports: 18 Imported by: 0

README

Directory Bridge Output Component

An output component that registers SemStreams agents with AGNTCY directories using OASF (Open Agentic Schema Framework) records.

Overview

The directory-bridge component enables SemStreams agents to participate in the Internet of Agents ecosystem by automatically registering them with AGNTCY directory services. It watches for OASF records generated by the oasf-generator processor and maintains persistent registrations through periodic heartbeats and DID-based identity verification.

This component implements Phase 2 of the AGNTCY integration strategy, providing federated discovery capabilities so external systems can find and invoke SemStreams agents through standard directories.

Architecture

flowchart TD
    subgraph "NATS Infrastructure"
        OASF[OASF_RECORDS KV]
        ENTITY[ENTITY_STATES KV]
    end

    subgraph "SemStreams Components"
        OASFGEN[OASF Generator<br/>processor]
        BRIDGE[Directory Bridge<br/>output]
        IDENTITY[Identity Provider]
    end

    subgraph "External Services"
        DIRECTORY[AGNTCY Directory<br/>API]
    end

    ENTITY -->|watch changes| OASFGEN
    OASFGEN -->|store records| OASF
    OASF -->|watch updates| BRIDGE
    IDENTITY -->|create/resolve DID| BRIDGE
    BRIDGE -->|register| DIRECTORY
    BRIDGE -->|heartbeat| DIRECTORY
    BRIDGE -->|deregister| DIRECTORY

    style BRIDGE fill:#4a90e2,stroke:#2e5c8a,color:#fff
    style DIRECTORY fill:#50c878,stroke:#2e7d4e,color:#fff
    style IDENTITY fill:#f5a623,stroke:#c47e1a,color:#fff

Features

  • Automatic Registration: Watches OASF KV bucket and registers agents as records are created/updated
  • DID Identity: Creates decentralized identifiers (DIDs) for agents using pluggable identity providers
  • Heartbeat Management: Maintains registrations with periodic heartbeats before TTL expiration
  • Graceful Deregistration: Removes all agent registrations on component shutdown
  • Retry Logic: Configurable retry behavior for failed registration attempts
  • Multiple Providers: Supports local (did:key) and AGNTCY identity providers

Configuration

Basic Configuration
components:
  - name: dir-bridge
    type: directory-bridge
    config:
      directory_url: "https://directory.agntcy.dev"
      heartbeat_interval: "30s"
      registration_ttl: "5m"
      identity_provider: "local"
      oasf_kv_bucket: "OASF_RECORDS"
Advanced Configuration
components:
  - name: dir-bridge
    type: directory-bridge
    config:
      directory_url: "https://directory.example.com"
      heartbeat_interval: "30s"
      registration_ttl: "5m"
      identity_provider: "local"
      oasf_kv_bucket: "OASF_RECORDS"
      retry_count: 3
      retry_delay: "1s"

      ports:
        inputs:
          - name: oasf_records
            subject: "oasf.record.generated.>"
            type: kv-watch
            required: true
            description: "Watch for generated OASF records"

        outputs:
          - name: registration_events
            subject: "directory.registration.*"
            type: jetstream
            required: false
            description: "Emit registration events"
Configuration Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| directory_url | string | - | AGNTCY directory service base URL |
| heartbeat_interval | duration | 30s | How often to send heartbeats |
| registration_ttl | duration | 5m | Registration time-to-live |
| identity_provider | string | local | Identity provider type (local, agntcy) |
| oasf_kv_bucket | string | OASF_RECORDS | KV bucket to watch for OASF records |
| retry_count | int | 3 | Number of retries for failed operations |
| retry_delay | duration | 1s | Initial delay between retries |
| ports | object | (default) | Input/output port configuration |

NATS Topology

Input Sources

| Source | Type | Purpose |
|--------|------|---------|
| OASF_RECORDS | KV Watch | Watches for agent OASF records from oasf-generator |

Output Destinations

| Destination | Type | Purpose |
|-------------|------|---------|
| directory.registration.* | JetStream | Optional registration event notifications |

Data Flow
sequenceDiagram
    participant OG as OASF Generator
    participant KV as OASF_RECORDS KV
    participant DB as Directory Bridge
    participant IP as Identity Provider
    participant DIR as AGNTCY Directory

    OG->>KV: Put OASF record
    KV-->>DB: KV watch notification
    DB->>DB: Parse OASF record
    DB->>IP: Create/resolve DID
    IP-->>DB: Return DID
    DB->>DIR: POST /v1/agents (register)
    DIR-->>DB: Registration ID + expiry
    DB->>DB: Store registration

    loop Every heartbeat_interval
        DB->>DB: Check expiration
        DB->>DIR: POST /v1/agents/{id}/heartbeat
        DIR-->>DB: Updated expiry
    end

    DB->>DIR: DELETE /v1/agents/{id}
    DIR-->>DB: Deregistration confirmed

Registration Lifecycle

1. Detection Phase

The component watches the OASF_RECORDS KV bucket for changes:

watcher, err := kv.Watch(ctx, ">", jetstream.IgnoreDeletes())

When an OASF record is created or updated, the watcher receives a notification.

2. Identity Resolution Phase

For each agent being registered, the component creates or resolves a DID identity:

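// Named agentIdentity so the variable does not shadow the identity package.
agentIdentity, err := identityProvider.CreateIdentity(ctx, identity.CreateIdentityOptions{
    DisplayName: record.Name,
})
if err != nil {
    return err
}
agentDID := agentIdentity.DIDString()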

Supported identity providers:

  • local: Creates did:key identities using Ed25519 key pairs
  • agntcy: Resolves identities through AGNTCY identity service (future)
3. Registration Phase

The component sends a registration request to the AGNTCY directory:

POST /v1/agents
{
  "agent_did": "did:key:z6Mk...",
  "oasf_record": { ... },
  "ttl": 300,
  "metadata": {
    "semstreams_entity_id": "acme.ops.agentic.system.agent.001",
    "source": "semstreams"
  }
}

Response contains registration ID and expiration time:

{
  "success": true,
  "registration_id": "reg_abc123",
  "expires_at": "2026-02-13T15:30:00Z"
}
4. Heartbeat Phase

A background goroutine periodically sends heartbeats to maintain registrations before they expire:

// Heartbeat loop runs at configured interval
ticker := time.NewTicker(heartbeatInterval)

// Only send heartbeat if expiry is approaching
if time.Until(registration.ExpiresAt) < heartbeatInterval*2 {
    client.Heartbeat(ctx, &HeartbeatRequest{
        RegistrationID: registration.RegistrationID,
        AgentDID:       registration.AgentDID,
    })
}
5. Deregistration Phase

On component shutdown, all active registrations are removed:

for _, registration := range registrations {
    client.Deregister(ctx, &DeregistrationRequest{
        RegistrationID: registration.RegistrationID,
        AgentDID:       registration.AgentDID,
    })
}

Usage Example

Complete Integration Example
# flow.yaml
components:
  # Generate OASF records from agent entities
  - name: oasf-gen
    type: oasf-generator
    config:
      entity_kv_bucket: ENTITY_STATES
      oasf_kv_bucket: OASF_RECORDS
      watch_pattern: "*.agent.*"

  # Register agents with AGNTCY directory
  - name: dir-bridge
    type: directory-bridge
    config:
      directory_url: "https://directory.agntcy.dev"
      heartbeat_interval: "30s"
      registration_ttl: "5m"
      identity_provider: "local"
      oasf_kv_bucket: "OASF_RECORDS"
Programmatic Usage
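import (
    "encoding/json"
    "fmt"
    "log"

    "github.com/c360studio/semstreams/component"
    directorybridge "github.com/c360studio/semstreams/output/directory-bridge"
)

// Register component with registry
func init() {
    if err := directorybridge.Register(registry); err != nil {
        log.Fatalf("register directory-bridge: %v", err)
    }
}

// Create component instance
config := directorybridge.DefaultConfig()
config.DirectoryURL = "https://directory.example.com"
config.HeartbeatInterval = "30s"

rawConfig, err := json.Marshal(config)
if err != nil {
    return err
}
comp, err := directorybridge.NewComponent(rawConfig, component.Dependencies{
    NATSClient: natsClient,
    Logger:     logger,
})
if err != nil {
    return err
}

// NewComponent returns component.Discoverable; assert the concrete type
// to reach Initialize, Start, and GetRegistrations.
bridge, ok := comp.(*directorybridge.Component)
if !ok {
    return fmt.Errorf("unexpected component type %T", comp)
}

// Initialize and start
if err := bridge.Initialize(); err != nil {
    return err
}
if err := bridge.Start(ctx); err != nil {
    return err
}

// Check active registrations
for _, reg := range bridge.GetRegistrations() {
    fmt.Printf("Agent %s registered as %s\n", reg.EntityID, reg.RegistrationID)
}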
Querying Registration Status
// bridge is the running *directorybridge.Component.
// Get a specific registration. Component exposes GetRegistrations only;
// per-entity lookup via GetRegistration is a RegistrationManager method.
for _, registration := range bridge.GetRegistrations() {
    if registration.EntityID != "acme.ops.agentic.system.agent.001" {
        continue
    }
    fmt.Printf("DID: %s\n", registration.AgentDID)
    fmt.Printf("Expires: %s\n", registration.ExpiresAt)
    fmt.Printf("Last Heartbeat: %s\n", registration.LastHeartbeat)
}

// List all registrations
registrations := bridge.GetRegistrations()
fmt.Printf("Active registrations: %d\n", len(registrations))

API Reference

DirectoryClient

HTTP client for AGNTCY directory API operations.

type DirectoryClient struct {
    baseURL    string
    httpClient *http.Client
}

// Register registers an agent with the directory
func (c *DirectoryClient) Register(ctx context.Context, req *RegistrationRequest) (*RegistrationResponse, error)

// Heartbeat sends a heartbeat to renew a registration
func (c *DirectoryClient) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error)

// Deregister removes an agent from the directory
func (c *DirectoryClient) Deregister(ctx context.Context, req *DeregistrationRequest) error

// Discover searches the directory for agents
func (c *DirectoryClient) Discover(ctx context.Context, query *DiscoveryQuery) (*DiscoveryResponse, error)
RegistrationManager

Manages the lifecycle of agent registrations including heartbeats.

type RegistrationManager struct {
    client           *DirectoryClient
    identityProvider identity.Provider
    config           Config
    logger           *slog.Logger
}

// RegisterAgent registers an agent with the directory
func (rm *RegistrationManager) RegisterAgent(ctx context.Context, entityID string,
    record *oasfgenerator.OASFRecord, agentIdentity *identity.AgentIdentity) error

// UpdateRegistration updates an existing registration with new OASF data
func (rm *RegistrationManager) UpdateRegistration(ctx context.Context, entityID string,
    record *oasfgenerator.OASFRecord) error

// Deregister removes an agent from the directory
func (rm *RegistrationManager) Deregister(ctx context.Context, entityID string) error

// GetRegistration returns the registration for an entity
func (rm *RegistrationManager) GetRegistration(entityID string) *Registration

// ListRegistrations returns all active registrations
func (rm *RegistrationManager) ListRegistrations() []*Registration
Registration

Represents an active directory registration.

type Registration struct {
    EntityID       string                      // SemStreams entity ID
    RegistrationID string                      // Directory registration ID
    AgentDID       string                      // Agent's DID
    OASFRecord     *oasfgenerator.OASFRecord  // Agent's OASF specification
    RegisteredAt   time.Time                   // Registration creation time
    ExpiresAt      time.Time                   // Registration expiration time
    LastHeartbeat  time.Time                   // Last heartbeat timestamp
    Retries        int                         // Number of registration retries
}

Testing

Unit Tests
# Run unit tests
cd output/directory-bridge
go test -v

# Run with race detector
go test -race -v

# Run specific test
go test -v -run TestComponent_Initialize
Integration Tests

Integration tests require NATS infrastructure via testcontainers:

# Run integration tests
go test -tags=integration -v

# Test directory registration flow
go test -tags=integration -v -run TestComponent_Registration
Mock Directory Server

For testing without external dependencies, use the mock directory implementation:

import directorybridge "github.com/c360studio/semstreams/output/directory-bridge"

// Create mock directory and shut it down when the test ends
mock := directorybridge.NewMockDirectory()
defer mock.Close()

// Configure component to use mock
config.DirectoryURL = mock.URL()

// Verify registrations
assert.Equal(t, 1, mock.RegistrationCount())

Metrics

The component exposes health and data flow metrics through the standard component interface:

// Health metrics
health := component.Health()
fmt.Printf("Healthy: %v\n", health.Healthy)
fmt.Printf("Uptime: %s\n", health.Uptime)
fmt.Printf("Errors: %d\n", health.ErrorCount)

// Data flow metrics
flow := component.DataFlow()
fmt.Printf("Error Rate: %.2f%%\n", flow.ErrorRate*100)
fmt.Printf("Last Activity: %s\n", flow.LastActivity)
Prometheus Metrics

When integrated with Prometheus observability:

| Metric | Type | Description |
|--------|------|-------------|
| directory_bridge_registrations_total | Counter | Total successful registrations |
| directory_bridge_registration_errors_total | Counter | Total registration failures |
| directory_bridge_heartbeats_sent_total | Counter | Total heartbeats sent |
| directory_bridge_heartbeat_errors_total | Counter | Total heartbeat failures |
| directory_bridge_active_registrations | Gauge | Currently active registrations |
| directory_bridge_registration_duration_seconds | Histogram | Registration operation duration |

Troubleshooting

Registration Failures

Symptom: Agents not appearing in directory

Common Causes:

  1. Missing OASF records: Verify oasf-generator is running and producing records
  2. Directory URL misconfigured: Check directory_url in configuration
  3. Network issues: Verify connectivity to directory service
  4. Identity provider errors: Check identity provider configuration

Diagnostics:

// Check for OASF records in KV
oasfKV, _ := natsClient.GetKeyValueBucket(ctx, "OASF_RECORDS")
keys, _ := oasfKV.Keys(ctx)
fmt.Printf("OASF records: %d\n", len(keys))

// Check component health
health := component.Health()
if !health.Healthy {
    fmt.Printf("Component unhealthy: %s\n", health.Status)
    fmt.Printf("Errors: %d\n", health.ErrorCount)
}
Heartbeat Issues

Symptom: Registrations expiring unexpectedly

Common Causes:

  1. Heartbeat interval too long: Set to less than 1/2 of TTL
  2. Component stopped: Verify component is running
  3. Network interruptions: Check for transient connectivity issues

Solution:

# Ensure heartbeat interval is well below TTL
heartbeat_interval: "30s"   # Heartbeat every 30s
registration_ttl: "5m"      # Expire after 5 minutes
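
The "less than half of TTL" rule can be checked before the component starts. The sketch below parses both settings with `time.ParseDuration` and rejects combinations that leave too little margin. `checkHeartbeatTiming` is a hypothetical helper, and whether `Config.Validate` enforces the same rule is not stated in this document.

```go
package main

import (
	"fmt"
	"time"
)

// checkHeartbeatTiming parses the two duration strings and enforces the rule
// of thumb from this section: the heartbeat interval must be less than half
// of the registration TTL.
func checkHeartbeatTiming(heartbeatInterval, registrationTTL string) error {
	interval, err := time.ParseDuration(heartbeatInterval)
	if err != nil {
		return fmt.Errorf("heartbeat_interval: %w", err)
	}
	ttl, err := time.ParseDuration(registrationTTL)
	if err != nil {
		return fmt.Errorf("registration_ttl: %w", err)
	}
	if interval <= 0 || ttl <= 0 {
		return fmt.Errorf("durations must be positive (interval=%s, ttl=%s)", interval, ttl)
	}
	if interval >= ttl/2 {
		return fmt.Errorf("heartbeat_interval %s must be less than half of registration_ttl %s", interval, ttl)
	}
	return nil
}
```

For example, `checkHeartbeatTiming("30s", "5m")` returns nil, while `checkHeartbeatTiming("3m", "5m")` returns an error.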
KV Watcher Not Receiving Updates

Symptom: New OASF records not triggering registrations

Common Causes:

  1. Bucket doesn't exist: OASF_RECORDS bucket not created
  2. Watch pattern mismatch: Watcher pattern doesn't match keys
  3. Consumer name collision: Multiple instances using same consumer

Solution:

# For testing, add unique consumer suffix
consumer_name_suffix: "-test-1"

# For production, ensure unique component names
name: dir-bridge-prod
Identity Provider Errors

Symptom: "Failed to create identity" errors in logs

Common Causes:

  1. Invalid provider type: Check identity_provider configuration
  2. Provider initialization failed: Review startup logs
  3. Cryptographic operation failed: Verify system entropy

Solution:

# Use local provider for development
identity_provider: "local"

# For AGNTCY provider (future)
identity_provider: "agntcy"
identity_service_url: "https://identity.agntcy.dev"

Performance Considerations

Heartbeat Optimization
  • Heartbeats are sent only when expiration approaches (less than 2× the heartbeat interval remaining)
  • Heartbeats for all registrations are sent in a single loop iteration
  • A failed heartbeat does not block heartbeats for other registrations
Memory Usage
  • Registration manager maintains in-memory map of active registrations
  • OASF records stored in NATS KV, not duplicated in memory
  • KV watcher uses bounded channel to prevent memory growth
Network Efficiency
  • HTTP client reuses connections with 30-second timeout
  • Response bodies limited to 1MB to prevent memory exhaustion
  • Graceful shutdown ensures all deregistrations complete

Security Considerations

DID Identity

Agents registered with the directory receive cryptographically verifiable DID identities:

  • Local provider: Creates did:key using Ed25519 key pairs
  • Private keys: Stored securely by identity provider implementation
  • Verifiable: External systems can cryptographically verify agent identity
Network Security
  • HTTPS required for production directory URLs
  • HTTP client validates TLS certificates
  • No sensitive data in OASF metadata (use extensions for custom fields)
Access Control

Directory-level access control is handled by the AGNTCY directory service. The bridge component:

  • Presents agent DIDs for authentication
  • Respects directory rate limits and policies
  • Includes metadata for audit trails

Documentation

Overview

Package directorybridge provides an output component that registers agents with AGNTCY directories using OASF (Open Agentic Schema Framework) records.

Overview

The directory-bridge component watches for OASF records in a NATS KV bucket and automatically registers/updates agents with AGNTCY directory services. It maintains registrations through periodic heartbeats and handles deregistration on shutdown.

Architecture

┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  OASF_RECORDS   │────▶│  Directory       │────▶│  AGNTCY         │
│  KV Bucket      │     │  Bridge          │     │  Directory      │
└─────────────────┘     └──────────────────┘     └─────────────────┘
                               │
                               ▼
                        ┌──────────────────┐
                        │  Identity        │
                        │  Provider        │
                        └──────────────────┘

Components

The package consists of several key components:

  • Component: The main LifecycleComponent that watches KV and orchestrates registration
  • DirectoryClient: HTTP client for communicating with AGNTCY directory APIs
  • RegistrationManager: Manages the lifecycle of agent registrations including heartbeats
  • Config: Configuration for directory URL, heartbeat intervals, and retry settings

Configuration

Example configuration:

{
  "directory_url": "https://directory.agntcy.dev",
  "heartbeat_interval": "30s",
  "registration_ttl": "5m",
  "identity_provider": "local",
  "oasf_kv_bucket": "OASF_RECORDS",
  "retry_count": 3,
  "retry_delay": "1s"
}

Identity Integration

The bridge uses the agentic/identity package to create or resolve DIDs for agents being registered. Supported identity providers:

  • "local": Creates did:key identities locally
  • "agntcy": Resolves identities through AGNTCY identity service (future)

NATS Topology

Input:

  • OASF_RECORDS KV bucket (watch): Receives OASF records from oasf-generator

Output:

  • directory.registration.* (optional): Emits registration events

Registration Lifecycle

  1. KV watcher detects new/updated OASF record
  2. Component parses record and extracts entity ID
  3. RegistrationManager creates/retrieves DID identity
  4. DirectoryClient sends registration request
  5. Registration stored with expiration time
  6. Heartbeat loop maintains registration before expiry
  7. On shutdown, all agents are deregistered

Usage

Register the component with the component registry:

import directorybridge "github.com/c360studio/semstreams/output/directory-bridge"

func init() {
    directorybridge.Register(registry)
}

See Also

  • processor/oasf-generator: Generates OASF records from entity predicates
  • agentic/identity: DID and verifiable credential management
  • docs/architecture/adr-019-agntcy-integration.md: Architecture decision record

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewComponent

func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewComponent creates a new directory bridge component.

func Register

func Register(registry RegistryInterface) error

Register registers the directory bridge output component with the given registry.

Types

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the directory bridge output component.

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema.

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics.

func (*Component) GetRegistrations

func (c *Component) GetRegistrations() []*Registration

GetRegistrations returns all active registrations.

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns the current health status.

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize prepares the component.

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns configured input port definitions.

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata.

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns configured output port definitions.

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins watching for OASF records and registering agents.

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop gracefully stops the component.

type Config

type Config struct {
	// Ports defines the input/output port configuration.
	Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`

	// DirectoryURL is the AGNTCY directory service URL.
	DirectoryURL string `json:"directory_url" schema:"type:string,description:AGNTCY directory service URL,category:basic"`

	// HeartbeatInterval is how often to send heartbeats to the directory.
	HeartbeatInterval string `json:"heartbeat_interval" schema:"type:string,description:Heartbeat interval,category:basic,default:30s"`

	// RegistrationTTL is the time-to-live for registrations.
	RegistrationTTL string `json:"registration_ttl" schema:"type:string,description:Registration time-to-live,category:basic,default:5m"`

	// IdentityProvider specifies which identity provider to use.
	// Values: "local", "agntcy"
	IdentityProvider string `json:"identity_provider" schema:"type:string,description:Identity provider type,category:basic,default:local"`

	// OASFKVBucket is the KV bucket to watch for OASF records.
	OASFKVBucket string `json:"oasf_kv_bucket" schema:"type:string,description:KV bucket for OASF records,category:basic,default:OASF_RECORDS"`

	// RetryCount is the number of retries for failed registrations.
	RetryCount int `json:"retry_count" schema:"type:int,description:Number of registration retries,category:advanced,default:3"`

	// RetryDelay is the initial delay between retries.
	RetryDelay string `json:"retry_delay" schema:"type:string,description:Initial retry delay,category:advanced,default:1s"`

	// ConsumerNameSuffix adds a suffix to consumer names for uniqueness in tests.
	ConsumerNameSuffix string `json:"consumer_name_suffix" schema:"type:string,description:Suffix for consumer names,category:advanced"`

	// DeleteConsumerOnStop enables consumer cleanup on stop (for testing).
	DeleteConsumerOnStop bool `` /* 128-byte string literal not displayed */
}

Config defines the configuration for the directory bridge component.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration.

func (*Config) GetHeartbeatInterval

func (c *Config) GetHeartbeatInterval() time.Duration

GetHeartbeatInterval returns the heartbeat interval.

func (*Config) GetRegistrationTTL

func (c *Config) GetRegistrationTTL() time.Duration

GetRegistrationTTL returns the registration TTL.

func (*Config) GetRetryDelay

func (c *Config) GetRetryDelay() time.Duration

GetRetryDelay returns the retry delay.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration.

type DeregistrationRequest

type DeregistrationRequest struct {
	// RegistrationID is the registration to remove.
	RegistrationID string `json:"registration_id"`

	// AgentDID is the agent's DID.
	AgentDID string `json:"agent_did"`
}

DeregistrationRequest represents a request to deregister an agent.

type DirectoryClient

type DirectoryClient struct {
	// contains filtered or unexported fields
}

DirectoryClient handles communication with the AGNTCY directory service.

func NewDirectoryClient

func NewDirectoryClient(baseURL string) *DirectoryClient

NewDirectoryClient creates a new directory client.

func (*DirectoryClient) Deregister

func (c *DirectoryClient) Deregister(ctx context.Context, req *DeregistrationRequest) error

Deregister removes an agent from the directory.

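func (*DirectoryClient) Discover

func (c *DirectoryClient) Discover(ctx context.Context, query *DiscoveryQuery) (*DiscoveryResponse, error)

Discover searches the directory for agents.

func (*DirectoryClient) Heartbeat

func (c *DirectoryClient) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error)

Heartbeat sends a heartbeat to renew a registration.

func (*DirectoryClient) Register

func (c *DirectoryClient) Register(ctx context.Context, req *RegistrationRequest) (*RegistrationResponse, error)

Register registers an agent with the directory.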

type DiscoveredAgent

type DiscoveredAgent struct {
	// AgentDID is the agent's DID.
	AgentDID string `json:"agent_did"`

	// OASFRecord is the agent's OASF specification.
	OASFRecord *oasfgenerator.OASFRecord `json:"oasf_record"`

	// RegisteredAt is when the agent registered.
	RegisteredAt time.Time `json:"registered_at"`

	// ExpiresAt is when the registration expires.
	ExpiresAt time.Time `json:"expires_at"`
}

DiscoveredAgent represents an agent found in the directory.

type DiscoveryQuery

type DiscoveryQuery struct {
	// Capabilities filters by required capabilities.
	Capabilities []string `json:"capabilities,omitempty"`

	// Domains filters by domains.
	Domains []string `json:"domains,omitempty"`

	// Limit limits the number of results.
	Limit int `json:"limit,omitempty"`
}

DiscoveryQuery represents a search query for agents.

type DiscoveryResponse

type DiscoveryResponse struct {
	// Agents are the matching agents.
	Agents []DiscoveredAgent `json:"agents"`

	// Total is the total number of matches.
	Total int `json:"total"`
}

DiscoveryResponse contains discovered agents.

type HeartbeatRequest

type HeartbeatRequest struct {
	// RegistrationID is the registration to renew.
	RegistrationID string `json:"registration_id"`

	// AgentDID is the agent's DID.
	AgentDID string `json:"agent_did"`
}

HeartbeatRequest represents a registration renewal request.

type HeartbeatResponse

type HeartbeatResponse struct {
	// Success indicates if the heartbeat succeeded.
	Success bool `json:"success"`

	// ExpiresAt is the new expiration time.
	ExpiresAt time.Time `json:"expires_at,omitempty"`

	// Error contains error details if heartbeat failed.
	Error string `json:"error,omitempty"`
}

HeartbeatResponse represents the heartbeat response.

type MockDirectory

type MockDirectory struct {

	// Call counters for assertions
	RegisterCalls   int
	HeartbeatCalls  int
	DeregisterCalls int
	DiscoverCalls   int
	// contains filtered or unexported fields
}

MockDirectory provides a test mock for the AGNTCY directory service.

func NewMockDirectory

func NewMockDirectory() *MockDirectory

NewMockDirectory creates a new mock directory server.

func (*MockDirectory) Close

func (md *MockDirectory) Close()

Close shuts down the mock server.

func (*MockDirectory) GetRegistration

func (md *MockDirectory) GetRegistration(id string) *RegistrationRequest

GetRegistration returns a stored registration by ID.

func (*MockDirectory) RegistrationCount

func (md *MockDirectory) RegistrationCount() int

RegistrationCount returns the number of active registrations.

func (*MockDirectory) SetFailNextDeregister

func (md *MockDirectory) SetFailNextDeregister(fail bool)

SetFailNextDeregister makes the next deregister call fail.

func (*MockDirectory) SetFailNextHeartbeat

func (md *MockDirectory) SetFailNextHeartbeat(fail bool)

SetFailNextHeartbeat makes the next heartbeat call fail.

func (*MockDirectory) SetFailNextRegister

func (md *MockDirectory) SetFailNextRegister(fail bool)

SetFailNextRegister makes the next register call fail.

func (*MockDirectory) SetRegisterDelay

func (md *MockDirectory) SetRegisterDelay(d time.Duration)

SetRegisterDelay adds a delay to register calls.

func (*MockDirectory) URL

func (md *MockDirectory) URL() string

URL returns the mock server's URL.

type Registration

type Registration struct {
	// EntityID is the SemStreams entity ID.
	EntityID string `json:"entity_id"`

	// RegistrationID is the directory's registration ID.
	RegistrationID string `json:"registration_id"`

	// AgentDID is the agent's decentralized identifier.
	AgentDID string `json:"agent_did"`

	// OASFRecord is the agent's OASF specification.
	OASFRecord *oasfgenerator.OASFRecord `json:"oasf_record"`

	// RegisteredAt is when the registration was created.
	RegisteredAt time.Time `json:"registered_at"`

	// ExpiresAt is when the registration expires.
	ExpiresAt time.Time `json:"expires_at"`

	// LastHeartbeat is when the last heartbeat was sent.
	LastHeartbeat time.Time `json:"last_heartbeat"`

	// Retries is the number of registration retries.
	Retries int `json:"retries"`
}

Registration represents an active directory registration.

type RegistrationError

type RegistrationError struct {
	EntityID string
	Message  string
}

RegistrationError represents a registration failure.

func (*RegistrationError) Error

func (e *RegistrationError) Error() string

type RegistrationManager

type RegistrationManager struct {
	// contains filtered or unexported fields
}

RegistrationManager handles the lifecycle of agent registrations.

func NewRegistrationManager

func NewRegistrationManager(client *DirectoryClient, identityProvider identity.Provider, config Config, logger *slog.Logger) *RegistrationManager

NewRegistrationManager creates a new registration manager.

func (*RegistrationManager) Deregister

func (rm *RegistrationManager) Deregister(ctx context.Context, entityID string) error

Deregister removes an agent from the directory.

func (*RegistrationManager) GetRegistration

func (rm *RegistrationManager) GetRegistration(entityID string) *Registration

GetRegistration returns the registration for an entity.

func (*RegistrationManager) ListRegistrations

func (rm *RegistrationManager) ListRegistrations() []*Registration

ListRegistrations returns all active registrations.

func (*RegistrationManager) RegisterAgent

func (rm *RegistrationManager) RegisterAgent(ctx context.Context, entityID string, record *oasfgenerator.OASFRecord, agentIdentity *identity.AgentIdentity) error

RegisterAgent registers an agent with the directory.

func (*RegistrationManager) Start

func (rm *RegistrationManager) Start(ctx context.Context) error

Start begins the heartbeat goroutine.

func (*RegistrationManager) Stop

func (rm *RegistrationManager) Stop(ctx context.Context) error

Stop stops the heartbeat goroutine and deregisters all agents.

func (*RegistrationManager) UpdateRegistration

func (rm *RegistrationManager) UpdateRegistration(ctx context.Context, entityID string, record *oasfgenerator.OASFRecord) error

UpdateRegistration updates an existing registration with new OASF data.

type RegistrationRequest

type RegistrationRequest struct {
	// AgentDID is the agent's decentralized identifier.
	AgentDID string `json:"agent_did"`

	// OASFRecord is the agent's OASF specification.
	OASFRecord *oasfgenerator.OASFRecord `json:"oasf_record"`

	// TTL is the registration time-to-live in seconds.
	TTL int `json:"ttl,omitempty"`

	// Metadata contains additional registration metadata.
	Metadata map[string]any `json:"metadata,omitempty"`
}

RegistrationRequest represents a request to register an agent.

type RegistrationResponse

type RegistrationResponse struct {
	// Success indicates if registration succeeded.
	Success bool `json:"success"`

	// RegistrationID is the unique ID for this registration.
	RegistrationID string `json:"registration_id,omitempty"`

	// ExpiresAt is when the registration expires.
	ExpiresAt time.Time `json:"expires_at,omitempty"`

	// Error contains error details if registration failed.
	Error string `json:"error,omitempty"`
}

RegistrationResponse represents the directory's response to a registration.

type RegistryInterface

type RegistryInterface interface {
	RegisterWithConfig(component.RegistrationConfig) error
}

RegistryInterface defines the minimal interface needed for registration.
