Documentation
Overview ¶
Package directorybridge provides an output component that registers agents with AGNTCY directories using OASF (Open Agentic Schema Framework) records.
The directory-bridge component watches a NATS KV bucket for OASF records and registers or updates the corresponding agents with AGNTCY directory services. It keeps registrations alive with periodic heartbeats and deregisters all agents on shutdown.
Architecture ¶
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  OASF_RECORDS   │────▶│    Directory     │────▶│     AGNTCY      │
│    KV Bucket    │     │      Bridge      │     │    Directory    │
└─────────────────┘     └──────────────────┘     └─────────────────┘
                                 │
                                 ▼
                        ┌──────────────────┐
                        │     Identity     │
                        │     Provider     │
                        └──────────────────┘
Components ¶
The package consists of several key components:
- Component: The main LifecycleComponent that watches KV and orchestrates registration
- DirectoryClient: HTTP client for communicating with AGNTCY directory APIs
- RegistrationManager: Manages the lifecycle of agent registrations including heartbeats
- Config: Configuration for directory URL, heartbeat intervals, and retry settings
Configuration ¶
Example configuration:
{
  "directory_url": "https://directory.agntcy.dev",
  "heartbeat_interval": "30s",
  "registration_ttl": "5m",
  "identity_provider": "local",
  "oasf_kv_bucket": "OASF_RECORDS",
  "retry_count": 3,
  "retry_delay": "1s"
}
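The duration fields in the configuration are strings, so a consumer has to parse them before use. The following is a minimal sketch of decoding the example above with the standard library. The bridgeConfig struct is a local stand-in that mirrors a subset of the documented fields, and the check that the heartbeat interval is shorter than the TTL is an assumption for illustration, not documented package behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// bridgeConfig mirrors a subset of the documented Config fields.
// It is a local stand-in for illustration, not the package's own type.
type bridgeConfig struct {
	DirectoryURL      string `json:"directory_url"`
	HeartbeatInterval string `json:"heartbeat_interval"`
	RegistrationTTL   string `json:"registration_ttl"`
	RetryCount        int    `json:"retry_count"`
	RetryDelay        string `json:"retry_delay"`
}

const exampleConfig = `{
  "directory_url": "https://directory.agntcy.dev",
  "heartbeat_interval": "30s",
  "registration_ttl": "5m",
  "retry_count": 3,
  "retry_delay": "1s"
}`

// parseConfig decodes the JSON and parses the two timing fields.
// Rejecting heartbeat >= ttl is an assumed sanity check.
func parseConfig(raw []byte) (heartbeat, ttl time.Duration, err error) {
	var cfg bridgeConfig
	if err = json.Unmarshal(raw, &cfg); err != nil {
		return 0, 0, fmt.Errorf("decode config: %w", err)
	}
	if heartbeat, err = time.ParseDuration(cfg.HeartbeatInterval); err != nil {
		return 0, 0, fmt.Errorf("heartbeat_interval: %w", err)
	}
	if ttl, err = time.ParseDuration(cfg.RegistrationTTL); err != nil {
		return 0, 0, fmt.Errorf("registration_ttl: %w", err)
	}
	if heartbeat >= ttl {
		return 0, 0, fmt.Errorf("heartbeat_interval %s must be shorter than registration_ttl %s", heartbeat, ttl)
	}
	return heartbeat, ttl, nil
}

func main() {
	heartbeat, ttl, err := parseConfig([]byte(exampleConfig))
	if err != nil {
		panic(err)
	}
	fmt.Println("heartbeat:", heartbeat, "ttl:", ttl)
}
```

With the example values, ten heartbeats fit inside one TTL window, which leaves room for a few missed heartbeats before a registration would lapse.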
Identity Integration ¶
The bridge uses the agentic/identity package to create or resolve DIDs for agents being registered. Supported identity providers:
- "local": Creates did:key identities locally
- "agntcy": Resolves identities through AGNTCY identity service (future)
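To make the "local" provider more concrete, here is a sketch of how a did:key identifier can be derived from a freshly generated key. It assumes an Ed25519 key, which this documentation does not confirm, and it does not use the agentic/identity package. The helper names base58Encode, didKeyFromEd25519, and newLocalDID are hypothetical.

```go
package main

import (
	"crypto/ed25519"
	"crypto/rand"
	"fmt"
	"math/big"
)

const base58Alphabet = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"

// base58Encode encodes data using the Bitcoin base58 alphabet.
func base58Encode(data []byte) string {
	n := new(big.Int).SetBytes(data)
	radix := big.NewInt(58)
	mod := new(big.Int)
	var out []byte
	for n.Sign() > 0 {
		n.DivMod(n, radix, mod)
		out = append(out, base58Alphabet[mod.Int64()])
	}
	// Each leading zero byte is represented by a leading '1'.
	for _, b := range data {
		if b != 0 {
			break
		}
		out = append(out, '1')
	}
	// Digits were produced least significant first, so reverse them.
	for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
		out[i], out[j] = out[j], out[i]
	}
	return string(out)
}

// didKeyFromEd25519 prepends the Ed25519 multicodec prefix (0xed 0x01)
// and encodes the result as multibase base58btc (leading 'z').
func didKeyFromEd25519(pub ed25519.PublicKey) string {
	prefixed := append([]byte{0xed, 0x01}, pub...)
	return "did:key:z" + base58Encode(prefixed)
}

// newLocalDID generates a key pair and returns its did:key identifier.
// A real provider would also keep the private key; this sketch discards it.
func newLocalDID() (string, error) {
	pub, _, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return "", err
	}
	return didKeyFromEd25519(pub), nil
}

func main() {
	did, err := newLocalDID()
	if err != nil {
		panic(err)
	}
	fmt.Println(did)
}
```

Because the identifier is computed from the public key alone, no network call is needed, which fits a provider that creates identities locally.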
NATS Topology ¶
Input:
- OASF_RECORDS KV bucket (watch): Receives OASF records from oasf-generator
Output:
- directory.registration.* (optional): Emits registration events
Registration Lifecycle ¶
- KV watcher detects new/updated OASF record
- Component parses record and extracts entity ID
- RegistrationManager creates/retrieves DID identity
- DirectoryClient sends registration request
- Registration stored with expiration time
- Heartbeat loop maintains registration before expiry
- On shutdown, all agents are deregistered
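The steps above can be sketched with an in-memory model. This is not the package's RegistrationManager: the manager and registration types below are simplified stand-ins, no directory is contacted, and dropping an expired entry during a heartbeat is an assumption made to keep the example small.

```go
package main

import (
	"fmt"
	"time"
)

// registration is a simplified stand-in for the package's Registration type.
type registration struct {
	EntityID      string
	ExpiresAt     time.Time
	LastHeartbeat time.Time
}

// manager tracks registrations in memory only; no directory is contacted.
type manager struct {
	ttl  time.Duration
	regs map[string]*registration
}

func newManager(ttl time.Duration) *manager {
	return &manager{ttl: ttl, regs: make(map[string]*registration)}
}

// register stores or replaces a registration that expires one TTL from now.
func (m *manager) register(entityID string, now time.Time) {
	m.regs[entityID] = &registration{EntityID: entityID, ExpiresAt: now.Add(m.ttl)}
}

// heartbeat renews every registration that has not expired yet and drops
// the ones that have. It returns how many were renewed.
func (m *manager) heartbeat(now time.Time) int {
	renewed := 0
	for id, reg := range m.regs {
		if now.After(reg.ExpiresAt) {
			delete(m.regs, id) // missed its window (assumed handling)
			continue
		}
		reg.ExpiresAt = now.Add(m.ttl)
		reg.LastHeartbeat = now
		renewed++
	}
	return renewed
}

// stop deregisters everything and reports how many entries were removed.
func (m *manager) stop() int {
	n := len(m.regs)
	m.regs = make(map[string]*registration)
	return n
}

// simulate walks two agents through the lifecycle using fixed timestamps.
func simulate() (renewed, deregistered, remaining int) {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	m := newManager(5 * time.Minute)
	m.register("agent-a", start)
	m.register("agent-b", start)
	renewed = m.heartbeat(start.Add(30 * time.Second))
	deregistered = m.stop()
	return renewed, deregistered, len(m.regs)
}

func main() {
	renewed, deregistered, remaining := simulate()
	fmt.Println(renewed, deregistered, remaining)
}
```

Passing the current time in as a parameter keeps the sketch deterministic; a real heartbeat loop would presumably be driven by a ticker at the configured interval.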
Usage ¶
Register the component with the component registry:
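import directorybridge "github.com/c360studio/semstreams/output/directory-bridge"
func init() {
	// registry is the application's component registry; it must satisfy RegistryInterface.
	if err := directorybridge.Register(registry); err != nil {
		panic(err)
	}
}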
See Also ¶
- processor/oasf-generator: Generates OASF records from entity predicates
- agentic/identity: DID and verifiable credential management
- docs/architecture/adr-019-agntcy-integration.md: Architecture decision record
Index ¶
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry RegistryInterface) error
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) GetRegistrations() []*Registration
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(timeout time.Duration) error
- type Config
- type DeregistrationRequest
- type DirectoryClient
- func (c *DirectoryClient) Deregister(ctx context.Context, req *DeregistrationRequest) error
- func (c *DirectoryClient) Discover(ctx context.Context, query *DiscoveryQuery) (*DiscoveryResponse, error)
- func (c *DirectoryClient) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error)
- func (c *DirectoryClient) Register(ctx context.Context, req *RegistrationRequest) (*RegistrationResponse, error)
- type DiscoveredAgent
- type DiscoveryQuery
- type DiscoveryResponse
- type HeartbeatRequest
- type HeartbeatResponse
- type MockDirectory
- func (md *MockDirectory) Close()
- func (md *MockDirectory) GetRegistration(id string) *RegistrationRequest
- func (md *MockDirectory) RegistrationCount() int
- func (md *MockDirectory) SetFailNextDeregister(fail bool)
- func (md *MockDirectory) SetFailNextHeartbeat(fail bool)
- func (md *MockDirectory) SetFailNextRegister(fail bool)
- func (md *MockDirectory) SetRegisterDelay(d time.Duration)
- func (md *MockDirectory) URL() string
- type Registration
- type RegistrationError
- type RegistrationManager
- func (rm *RegistrationManager) Deregister(ctx context.Context, entityID string) error
- func (rm *RegistrationManager) GetRegistration(entityID string) *Registration
- func (rm *RegistrationManager) ListRegistrations() []*Registration
- func (rm *RegistrationManager) RegisterAgent(ctx context.Context, entityID string, record *oasfgenerator.OASFRecord, ...) error
- func (rm *RegistrationManager) Start(ctx context.Context) error
- func (rm *RegistrationManager) Stop(ctx context.Context) error
- func (rm *RegistrationManager) UpdateRegistration(ctx context.Context, entityID string, record *oasfgenerator.OASFRecord) error
- type RegistrationRequest
- type RegistrationResponse
- type RegistryInterface
Functions ¶
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new directory bridge component.
func Register ¶
func Register(registry RegistryInterface) error
Register registers the directory bridge output component with the given registry.
Types ¶
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component implements the directory bridge output component.
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema.
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics.
func (*Component) GetRegistrations ¶
func (c *Component) GetRegistrations() []*Registration
GetRegistrations returns all active registrations.
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns the current health status.
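func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize prepares the component.
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns configured input port definitions.
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns configured output port definitions.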
type Config ¶
type Config struct {
// Ports defines the input/output port configuration.
Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
// DirectoryURL is the AGNTCY directory service URL.
DirectoryURL string `json:"directory_url" schema:"type:string,description:AGNTCY directory service URL,category:basic"`
// HeartbeatInterval is how often to send heartbeats to the directory.
HeartbeatInterval string `json:"heartbeat_interval" schema:"type:string,description:Heartbeat interval,category:basic,default:30s"`
// RegistrationTTL is the time-to-live for registrations.
RegistrationTTL string `json:"registration_ttl" schema:"type:string,description:Registration time-to-live,category:basic,default:5m"`
// IdentityProvider specifies which identity provider to use.
// Values: "local", "agntcy"
IdentityProvider string `json:"identity_provider" schema:"type:string,description:Identity provider type,category:basic,default:local"`
// OASFKVBucket is the KV bucket to watch for OASF records.
OASFKVBucket string `json:"oasf_kv_bucket" schema:"type:string,description:KV bucket for OASF records,category:basic,default:OASF_RECORDS"`
// RetryCount is the number of retries for failed registrations.
RetryCount int `json:"retry_count" schema:"type:int,description:Number of registration retries,category:advanced,default:3"`
// RetryDelay is the initial delay between retries.
RetryDelay string `json:"retry_delay" schema:"type:string,description:Initial retry delay,category:advanced,default:1s"`
// ConsumerNameSuffix adds a suffix to consumer names for uniqueness in tests.
ConsumerNameSuffix string `json:"consumer_name_suffix" schema:"type:string,description:Suffix for consumer names,category:advanced"`
// DeleteConsumerOnStop enables consumer cleanup on stop (for testing).
DeleteConsumerOnStop bool `` /* 128-byte string literal not displayed */
}
Config defines the configuration for the directory bridge component.
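RetryCount and RetryDelay describe a retry budget with an initial delay. One way such settings could be applied is sketched below. The doubling of the delay after each failure is an assumption (the documentation only says the delay is "initial"), and retry and demoRetry are hypothetical helpers, not part of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retry runs op once and then up to retryCount more times, doubling the
// delay after each failure. It returns the number of attempts made.
func retry(ctx context.Context, retryCount int, initialDelay time.Duration, op func() error) (attempts int, err error) {
	delay := initialDelay
	for attempts = 1; ; attempts++ {
		if err = op(); err == nil {
			return attempts, nil
		}
		if attempts > retryCount {
			return attempts, fmt.Errorf("giving up after %d attempts: %w", attempts, err)
		}
		// Wait for the delay, but stop early if the context is cancelled.
		select {
		case <-ctx.Done():
			return attempts, ctx.Err()
		case <-time.After(delay):
		}
		delay *= 2
	}
}

// demoRetry runs an operation that fails the given number of times before
// succeeding, with retry_count 3 and a short delay to keep the demo fast.
func demoRetry(failures int) (int, error) {
	calls := 0
	op := func() error {
		calls++
		if calls <= failures {
			return errors.New("directory unavailable")
		}
		return nil
	}
	return retry(context.Background(), 3, time.Millisecond, op)
}

func main() {
	attempts, err := demoRetry(2)
	fmt.Println(attempts, err)
}
```

With a retry count of 3, the operation is attempted at most four times: the initial call plus three retries.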
func (*Config) GetHeartbeatInterval ¶
GetHeartbeatInterval returns the heartbeat interval.
func (*Config) GetRegistrationTTL ¶
GetRegistrationTTL returns the registration TTL.
func (*Config) GetRetryDelay ¶
GetRetryDelay returns the retry delay.
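The signatures of these getters are not shown here. Since the underlying fields are strings with schema defaults (30s, 5m, 1s), a plausible shape is a parse with a fallback, sketched below. durationOr is a hypothetical helper, and falling back on an empty, invalid, or non-positive value is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// durationOr parses value as a Go duration string and returns fallback
// when the value is empty, malformed, or not positive.
func durationOr(value string, fallback time.Duration) time.Duration {
	d, err := time.ParseDuration(value)
	if err != nil || d <= 0 {
		return fallback
	}
	return d
}

func main() {
	// Defaults taken from the schema tags on Config.
	fmt.Println(durationOr("", 30*time.Second))        // unset heartbeat_interval
	fmt.Println(durationOr("10m", 5*time.Minute))      // explicit registration_ttl
	fmt.Println(durationOr("soon", 1*time.Second))     // malformed retry_delay
}
```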
type DeregistrationRequest ¶
type DeregistrationRequest struct {
// RegistrationID is the registration to remove.
RegistrationID string `json:"registration_id"`
// AgentDID is the agent's DID.
AgentDID string `json:"agent_did"`
}
DeregistrationRequest represents a request to deregister an agent.
type DirectoryClient ¶
type DirectoryClient struct {
// contains filtered or unexported fields
}
DirectoryClient handles communication with the AGNTCY directory service.
func NewDirectoryClient ¶
func NewDirectoryClient(baseURL string) *DirectoryClient
NewDirectoryClient creates a new directory client.
func (*DirectoryClient) Deregister ¶
func (c *DirectoryClient) Deregister(ctx context.Context, req *DeregistrationRequest) error
Deregister removes an agent from the directory.
func (*DirectoryClient) Discover ¶
func (c *DirectoryClient) Discover(ctx context.Context, query *DiscoveryQuery) (*DiscoveryResponse, error)
Discover searches the directory for agents.
func (*DirectoryClient) Heartbeat ¶
func (c *DirectoryClient) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error)
Heartbeat sends a heartbeat to renew a registration.
func (*DirectoryClient) Register ¶
func (c *DirectoryClient) Register(ctx context.Context, req *RegistrationRequest) (*RegistrationResponse, error)
Register registers an agent with the directory.
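The wire format of a registration call can be illustrated with the standard library alone. The sketch below posts a JSON body shaped like RegistrationRequest to a stand-in server and decodes a body shaped like RegistrationResponse. The "/register" path, the handler, and the lowercase types are assumptions for illustration; the real DirectoryClient endpoints are not documented here.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// Local stand-ins that reuse the documented JSON field names.
type registrationRequest struct {
	AgentDID string `json:"agent_did"`
	TTL      int    `json:"ttl,omitempty"` // seconds
}

type registrationResponse struct {
	Success        bool      `json:"success"`
	RegistrationID string    `json:"registration_id,omitempty"`
	ExpiresAt      time.Time `json:"expires_at"`
	Error          string    `json:"error,omitempty"`
}

// register POSTs the request as JSON. The "/register" path is hypothetical.
func register(ctx context.Context, baseURL string, req registrationRequest) (*registrationResponse, error) {
	body, err := json.Marshal(req)
	if err != nil {
		return nil, err
	}
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, baseURL+"/register", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	httpReq.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	var out registrationResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, fmt.Errorf("decode response: %w", err)
	}
	if !out.Success {
		return &out, fmt.Errorf("registration rejected: %s", out.Error)
	}
	return &out, nil
}

// demo starts a stand-in directory and registers one agent against it.
func demo() (*registrationResponse, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		var req registrationRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		json.NewEncoder(w).Encode(registrationResponse{
			Success:        true,
			RegistrationID: "reg-1",
			ExpiresAt:      time.Now().Add(time.Duration(req.TTL) * time.Second),
		})
	}))
	defer srv.Close()

	ttl := 5 * time.Minute
	return register(context.Background(), srv.URL, registrationRequest{
		AgentDID: "did:key:example",
		TTL:      int(ttl.Seconds()), // the TTL field is expressed in seconds
	})
}

func main() {
	resp, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.RegistrationID, resp.Success)
}
```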
type DiscoveredAgent ¶
type DiscoveredAgent struct {
// AgentDID is the agent's DID.
AgentDID string `json:"agent_did"`
// OASFRecord is the agent's OASF specification.
OASFRecord *oasfgenerator.OASFRecord `json:"oasf_record"`
// RegisteredAt is when the agent registered.
RegisteredAt time.Time `json:"registered_at"`
// ExpiresAt is when the registration expires.
ExpiresAt time.Time `json:"expires_at"`
}
DiscoveredAgent represents an agent found in the directory.
type DiscoveryQuery ¶
type DiscoveryQuery struct {
// Capabilities filters by required capabilities.
Capabilities []string `json:"capabilities,omitempty"`
// Domains filters by domains.
Domains []string `json:"domains,omitempty"`
// Limit limits the number of results.
Limit int `json:"limit,omitempty"`
}
DiscoveryQuery represents a search query for agents.
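All three fields carry the omitempty tag, so unset filters are left out of the encoded query. The sketch below shows this with a local copy of the struct; the capability name "navigation" is made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// discoveryQuery copies the documented fields and JSON tags.
type discoveryQuery struct {
	Capabilities []string `json:"capabilities,omitempty"`
	Domains      []string `json:"domains,omitempty"`
	Limit        int      `json:"limit,omitempty"`
}

// encodeQuery returns the JSON form of the query.
func encodeQuery(q discoveryQuery) string {
	b, err := json.Marshal(q)
	if err != nil {
		// Marshalling strings and ints cannot fail; this guards future changes.
		panic(err)
	}
	return string(b)
}

func main() {
	fmt.Println(encodeQuery(discoveryQuery{Capabilities: []string{"navigation"}, Limit: 10}))
	// prints {"capabilities":["navigation"],"limit":10}
	fmt.Println(encodeQuery(discoveryQuery{}))
	// prints {}
}
```

A consequence of omitempty on Limit is that a limit of zero is indistinguishable from no limit at all on the wire.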
type DiscoveryResponse ¶
type DiscoveryResponse struct {
// Agents are the matching agents.
Agents []DiscoveredAgent `json:"agents"`
// Total is the total number of matches.
Total int `json:"total"`
}
DiscoveryResponse contains discovered agents.
type HeartbeatRequest ¶
type HeartbeatRequest struct {
// RegistrationID is the registration to renew.
RegistrationID string `json:"registration_id"`
// AgentDID is the agent's DID.
AgentDID string `json:"agent_did"`
}
HeartbeatRequest represents a registration renewal request.
type HeartbeatResponse ¶
type HeartbeatResponse struct {
// Success indicates if the heartbeat succeeded.
Success bool `json:"success"`
// ExpiresAt is the new expiration time.
ExpiresAt time.Time `json:"expires_at,omitempty"`
// Error contains error details if heartbeat failed.
Error string `json:"error,omitempty"`
}
HeartbeatResponse represents the heartbeat response.
type MockDirectory ¶
type MockDirectory struct {
// Call counters for assertions
RegisterCalls int
HeartbeatCalls int
DeregisterCalls int
DiscoverCalls int
// contains filtered or unexported fields
}
MockDirectory provides a test mock for the AGNTCY directory service.
func NewMockDirectory ¶
func NewMockDirectory() *MockDirectory
NewMockDirectory creates a new mock directory server.
func (*MockDirectory) GetRegistration ¶
func (md *MockDirectory) GetRegistration(id string) *RegistrationRequest
GetRegistration returns a stored registration by ID.
func (*MockDirectory) RegistrationCount ¶
func (md *MockDirectory) RegistrationCount() int
RegistrationCount returns the number of active registrations.
func (*MockDirectory) SetFailNextDeregister ¶
func (md *MockDirectory) SetFailNextDeregister(fail bool)
SetFailNextDeregister makes the next deregister call fail.
func (*MockDirectory) SetFailNextHeartbeat ¶
func (md *MockDirectory) SetFailNextHeartbeat(fail bool)
SetFailNextHeartbeat makes the next heartbeat call fail.
func (*MockDirectory) SetFailNextRegister ¶
func (md *MockDirectory) SetFailNextRegister(fail bool)
SetFailNextRegister makes the next register call fail.
func (*MockDirectory) SetRegisterDelay ¶
func (md *MockDirectory) SetRegisterDelay(d time.Duration)
SetRegisterDelay adds a delay to register calls.
func (*MockDirectory) URL ¶
func (md *MockDirectory) URL() string
URL returns the mock server's URL.
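The fail-next pattern used by the SetFailNext* methods can be sketched with net/http/httptest. The mockDirectory type below is a reduced, hypothetical version written for this example, not the package's MockDirectory. It assumes the failure flag is one-shot, meaning it clears itself after the next call.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// mockDirectory is a reduced stand-in with a single register endpoint.
type mockDirectory struct {
	mu               sync.Mutex
	failNextRegister bool
	registerCalls    int
	server           *httptest.Server
}

func newMockDirectory() *mockDirectory {
	md := &mockDirectory{}
	md.server = httptest.NewServer(http.HandlerFunc(md.handleRegister))
	return md
}

func (md *mockDirectory) handleRegister(w http.ResponseWriter, _ *http.Request) {
	md.mu.Lock()
	md.registerCalls++
	fail := md.failNextRegister
	md.failNextRegister = false // one-shot: only the next call fails
	md.mu.Unlock()

	if fail {
		http.Error(w, "injected failure", http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
}

// SetFailNextRegister arms or disarms the injected failure.
func (md *mockDirectory) SetFailNextRegister(fail bool) {
	md.mu.Lock()
	md.failNextRegister = fail
	md.mu.Unlock()
}

// Calls returns how many register requests the mock has seen.
func (md *mockDirectory) Calls() int {
	md.mu.Lock()
	defer md.mu.Unlock()
	return md.registerCalls
}

func (md *mockDirectory) URL() string { return md.server.URL }
func (md *mockDirectory) Close()      { md.server.Close() }

// post sends an empty POST and returns the HTTP status code.
func post(url string) (int, error) {
	resp, err := http.Post(url, "application/json", nil)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	md := newMockDirectory()
	defer md.Close()

	md.SetFailNextRegister(true)
	first, _ := post(md.URL())
	second, _ := post(md.URL())
	fmt.Println(first, second, md.Calls())
}
```

A test can arm the failure, trigger a registration, and then assert that a retry succeeded by checking the call counter.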
type Registration ¶
type Registration struct {
// EntityID is the SemStreams entity ID.
EntityID string `json:"entity_id"`
// RegistrationID is the directory's registration ID.
RegistrationID string `json:"registration_id"`
// AgentDID is the agent's decentralized identifier.
AgentDID string `json:"agent_did"`
// OASFRecord is the agent's OASF specification.
OASFRecord *oasfgenerator.OASFRecord `json:"oasf_record"`
// RegisteredAt is when the registration was created.
RegisteredAt time.Time `json:"registered_at"`
// ExpiresAt is when the registration expires.
ExpiresAt time.Time `json:"expires_at"`
// LastHeartbeat is when the last heartbeat was sent.
LastHeartbeat time.Time `json:"last_heartbeat"`
// Retries is the number of registration retries.
Retries int `json:"retries"`
}
Registration represents an active directory registration.
type RegistrationError ¶
RegistrationError represents a registration failure.
func (*RegistrationError) Error ¶
func (e *RegistrationError) Error() string
type RegistrationManager ¶
type RegistrationManager struct {
// contains filtered or unexported fields
}
RegistrationManager handles the lifecycle of agent registrations.
func NewRegistrationManager ¶
func NewRegistrationManager(client *DirectoryClient, identityProvider identity.Provider, config Config, logger *slog.Logger) *RegistrationManager
NewRegistrationManager creates a new registration manager.
func (*RegistrationManager) Deregister ¶
func (rm *RegistrationManager) Deregister(ctx context.Context, entityID string) error
Deregister removes an agent from the directory.
func (*RegistrationManager) GetRegistration ¶
func (rm *RegistrationManager) GetRegistration(entityID string) *Registration
GetRegistration returns the registration for an entity.
func (*RegistrationManager) ListRegistrations ¶
func (rm *RegistrationManager) ListRegistrations() []*Registration
ListRegistrations returns all active registrations.
func (*RegistrationManager) RegisterAgent ¶
func (rm *RegistrationManager) RegisterAgent(ctx context.Context, entityID string, record *oasfgenerator.OASFRecord, agentIdentity *identity.AgentIdentity) error
RegisterAgent registers an agent with the directory.
func (*RegistrationManager) Start ¶
func (rm *RegistrationManager) Start(ctx context.Context) error
Start begins the heartbeat goroutine.
func (*RegistrationManager) Stop ¶
func (rm *RegistrationManager) Stop(ctx context.Context) error
Stop stops the heartbeat goroutine and deregisters all agents.
func (*RegistrationManager) UpdateRegistration ¶
func (rm *RegistrationManager) UpdateRegistration(ctx context.Context, entityID string, record *oasfgenerator.OASFRecord) error
UpdateRegistration updates an existing registration with new OASF data.
type RegistrationRequest ¶
type RegistrationRequest struct {
// AgentDID is the agent's decentralized identifier.
AgentDID string `json:"agent_did"`
// OASFRecord is the agent's OASF specification.
OASFRecord *oasfgenerator.OASFRecord `json:"oasf_record"`
// TTL is the registration time-to-live in seconds.
TTL int `json:"ttl,omitempty"`
// Metadata contains additional registration metadata.
Metadata map[string]any `json:"metadata,omitempty"`
}
RegistrationRequest represents a request to register an agent.
type RegistrationResponse ¶
type RegistrationResponse struct {
// Success indicates if registration succeeded.
Success bool `json:"success"`
// RegistrationID is the unique ID for this registration.
RegistrationID string `json:"registration_id,omitempty"`
// ExpiresAt is when the registration expires.
ExpiresAt time.Time `json:"expires_at,omitempty"`
// Error contains error details if registration failed.
Error string `json:"error,omitempty"`
}
RegistrationResponse represents the directory's response to a registration.
type RegistryInterface ¶
type RegistryInterface interface {
RegisterWithConfig(component.RegistrationConfig) error
}
RegistryInterface defines the minimal interface needed for registration.
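A single-method interface like this makes it easy to pass a fake registry in tests. The sketch below illustrates the idea with local stand-ins: registrationConfig replaces component.RegistrationConfig, whose fields are not shown in this documentation, so its Name field and the "directory-bridge" value are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// registrationConfig stands in for component.RegistrationConfig.
// Its Name field is hypothetical.
type registrationConfig struct {
	Name string
}

// registryInterface mirrors the single-method shape of RegistryInterface.
type registryInterface interface {
	RegisterWithConfig(registrationConfig) error
}

// fakeRegistry records registered names and rejects duplicates.
type fakeRegistry struct {
	names map[string]bool
}

func (f *fakeRegistry) RegisterWithConfig(cfg registrationConfig) error {
	if f.names[cfg.Name] {
		return errors.New("already registered: " + cfg.Name)
	}
	if f.names == nil {
		f.names = make(map[string]bool)
	}
	f.names[cfg.Name] = true
	return nil
}

// register plays the role of the package-level Register function.
func register(r registryInterface) error {
	return r.RegisterWithConfig(registrationConfig{Name: "directory-bridge"})
}

func main() {
	reg := &fakeRegistry{}
	fmt.Println(register(reg)) // first registration
	fmt.Println(register(reg)) // duplicate registration
}
```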