Documentation
¶
Overview ¶
Package directorybridge provides an output component that registers agents with AGNTCY directories using OASF (Open Agent Specification Framework) records.
Overview ¶
The directory-bridge component watches for OASF records in a NATS KV bucket and automatically registers/updates agents with AGNTCY directory services. It maintains registrations through periodic heartbeats and handles deregistration on shutdown.
Architecture ¶
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ OASF_RECORDS │────▶│ Directory │────▶│ AGNTCY │
│ KV Bucket │ │ Bridge │ │ Directory │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│
▼
┌──────────────────┐
│ Identity │
│ Provider │
└──────────────────┘
Components ¶
The package consists of several key components:
- Component: The main LifecycleComponent that watches KV and orchestrates registration
- DirectoryClient: HTTP client for communicating with AGNTCY directory APIs
- RegistrationManager: Manages the lifecycle of agent registrations including heartbeats
- Config: Configuration for directory URL, heartbeat intervals, and retry settings
Configuration ¶
Example configuration:
{
"directory_url": "https://directory.agntcy.dev",
"heartbeat_interval": "30s",
"registration_ttl": "5m",
"identity_provider": "local",
"oasf_kv_bucket": "OASF_RECORDS",
"retry_count": 3,
"retry_delay": "1s"
}
Identity Integration ¶
The bridge uses the agentic/identity package to create or resolve DIDs for agents being registered. Supported identity providers:
- "local": Creates did:key identities locally
- "agntcy": Resolves identities through AGNTCY identity service (future)
NATS Topology ¶
Input:
- OASF_RECORDS KV bucket (watch): Receives OASF records from oasf-generator
Output:
- directory.registration.* (optional): Emits registration events
Registration Lifecycle ¶
- KV watcher detects new/updated OASF record
- Component parses record and extracts entity ID
- RegistrationManager creates/retrieves DID identity
- DirectoryClient sends registration request
- Registration stored with expiration time
- Heartbeat loop maintains registration before expiry
- On shutdown, all agents are deregistered
Usage ¶
Register the component with the component registry:
import directorybridge "github.com/c360studio/semstreams/output/directory-bridge"
func init() {
directorybridge.Register(registry)
}
See Also ¶
- processor/oasf-generator: Generates OASF records from entity predicates
- agentic/identity: DID and verifiable credential management
- docs/architecture/adr-019-agntcy-integration.md: Architecture decision record
Index ¶
- Constants
- Variables
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry RegistryInterface) error
- type AgntcyGRPCConfig
- type AuthConfig
- type AuthProvider
- type Backend
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) GetRegistrations() []*Registration
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(timeout time.Duration) error
- type Config
- type DeregistrationRequest
- type DirectoryClient
- func (c *DirectoryClient) Deregister(ctx context.Context, req *DeregistrationRequest) error
- func (c *DirectoryClient) Discover(ctx context.Context, query *DiscoveryQuery) (*DiscoveryResponse, error)
- func (c *DirectoryClient) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error)
- func (c *DirectoryClient) Register(ctx context.Context, req *RegistrationRequest) (*RegistrationResponse, error)
- type DiscoveredAgent
- type DiscoveryQuery
- type DiscoveryResponse
- type GRPCBackend
- func (b *GRPCBackend) Close() error
- func (b *GRPCBackend) Publish(ctx context.Context, req *PublishRequest) (*PublishResult, error)
- func (b *GRPCBackend) Refresh(_ context.Context, req *RefreshRequest) (*PublishResult, error)
- func (b *GRPCBackend) Withdraw(ctx context.Context, req *WithdrawRequest) error
- type HTTPBackend
- func (b *HTTPBackend) Close() error
- func (b *HTTPBackend) Publish(ctx context.Context, req *PublishRequest) (*PublishResult, error)
- func (b *HTTPBackend) Refresh(ctx context.Context, req *RefreshRequest) (*PublishResult, error)
- func (b *HTTPBackend) Withdraw(ctx context.Context, req *WithdrawRequest) error
- type HeartbeatRequest
- type HeartbeatResponse
- type MockDirectory
- func (md *MockDirectory) Close()
- func (md *MockDirectory) GetRegistration(id string) *RegistrationRequest
- func (md *MockDirectory) RegistrationCount() int
- func (md *MockDirectory) SetFailNextDeregister(fail bool)
- func (md *MockDirectory) SetFailNextHeartbeat(fail bool)
- func (md *MockDirectory) SetFailNextRegister(fail bool)
- func (md *MockDirectory) SetRegisterDelay(d time.Duration)
- func (md *MockDirectory) URL() string
- type NoOpAuthProvider
- type OIDCAuthProvider
- type PublishRequest
- type PublishResult
- type RefreshRequest
- type Registration
- type RegistrationError
- type RegistrationManager
- func (rm *RegistrationManager) Deregister(ctx context.Context, entityID string) error
- func (rm *RegistrationManager) GetRegistration(entityID string) *Registration
- func (rm *RegistrationManager) ListRegistrations() []*Registration
- func (rm *RegistrationManager) RegisterAgent(ctx context.Context, entityID string, record *oasfgenerator.OASFRecord, ...) error
- func (rm *RegistrationManager) Start(ctx context.Context) error
- func (rm *RegistrationManager) Stop(ctx context.Context) error
- func (rm *RegistrationManager) UpdateRegistration(ctx context.Context, entityID string, record *oasfgenerator.OASFRecord) error
- type RegistrationRequest
- type RegistrationResponse
- type RegistryInterface
- type WithdrawRequest
Constants ¶
const ( // BackendHTTP is the default — POST JSON to a SemStreams HTTP directory. BackendHTTP = "http" // BackendAgntcyGRPC is the AGNTCY agntcy/dir StoreService over gRPC. BackendAgntcyGRPC = "agntcy_grpc" )
Backend constants accepted by Config.Backend.
Variables ¶
var ErrRefreshNotSupported = errors.New("agntcy_grpc backend: refresh has no wire counterpart (records are CID-anchored)")
ErrRefreshNotSupported is returned by GRPCBackend.Refresh — the agntcy/dir StoreService stores content-addressed records (CIDs) that do not expire on the publisher side, so heartbeat-style refresh has no wire counterpart. The bridge's RegistrationManager already skips the Refresh path when ExpiresAt.IsZero() (set by Publish for this backend), so this sentinel only surfaces if a caller bypasses the manager and invokes Refresh directly. Use errors.Is to test for it.
Functions ¶
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new directory bridge component.
func Register ¶
func Register(registry RegistryInterface) error
Register registers the directory bridge output component with the given registry.
Types ¶
type AgntcyGRPCConfig ¶
type AgntcyGRPCConfig struct {
// Endpoint is the host:port the gRPC client dials.
// Example: "prod.api.ads.outshift.io:443".
Endpoint string `json:"endpoint" schema:"type:string,description:gRPC endpoint host:port,category:basic"`
// TLS controls whether the client establishes TLS on dial. The
// hosted hub requires TLS (true); local dev / bufconn tests use
// false. When false the client uses insecure transport credentials
// (grpc.WithTransportCredentials(insecure.NewCredentials())) —
// suitable only for trusted networks and local development.
TLS bool `json:"tls" schema:"type:bool,description:Establish TLS on dial (required for the hosted hub),category:basic"`
// Auth carries optional per-RPC OIDC authentication. nil or
// Type=="none" disables auth (suitable for local dev / private
// deployments). The hosted hub generally requires Type=="oidc".
Auth *AuthConfig `json:"auth,omitempty" schema:"type:object,description:Per-RPC OIDC auth (omit for unauthenticated),category:basic"`
}
AgntcyGRPCConfig carries settings for the agntcy_grpc backend.
func (*AgntcyGRPCConfig) Validate ¶
func (g *AgntcyGRPCConfig) Validate() error
Validate validates an AgntcyGRPCConfig.
type AuthConfig ¶
type AuthConfig struct {
// Type selects the auth flow. One of "none" or "oidc". Empty
// defaults to "none".
Type string `json:"type" schema:"type:string,description:Auth flow (none or oidc),category:basic,default:none"`
// Issuer is the OIDC token endpoint URL (the `token_endpoint` from
// the issuer's OIDC discovery document). Required when Type=="oidc".
Issuer string `json:"issuer,omitempty" schema:"type:string,description:OIDC token endpoint URL,category:basic"`
// ClientID is the OIDC client identifier. Prefer ClientIDEnv for
// secret management hygiene. Required when Type=="oidc" if
// ClientIDEnv is not set.
ClientID string `` /* 128-byte string literal not displayed */
// ClientIDEnv names an environment variable to read the OIDC
// client_id from at runtime. Wins over ClientID when both set.
ClientIDEnv string `` /* 129-byte string literal not displayed */
// ClientSecretEnv names an environment variable to read the OIDC
// client_secret from at runtime. Required when Type=="oidc".
// No inline client_secret field — secrets do not live in JSON
// config on disk; this is intentional.
ClientSecretEnv string `` /* 152-byte string literal not displayed */
// Scopes is the OIDC scope list requested at token-endpoint time.
Scopes []string `json:"scopes,omitempty" schema:"type:array,description:OIDC scope list,category:basic"`
}
AuthConfig configures per-RPC authentication for the agntcy_grpc backend. The only auth flow supported today is OIDC client credentials (the deployment pattern AGNTCY's hosted hub uses).
func (*AuthConfig) Validate ¶
func (a *AuthConfig) Validate() error
Validate validates an AuthConfig.
type AuthProvider ¶
type AuthProvider interface {
// PerRPC returns the per-RPC credentials to attach to gRPC calls, or
// nil for "no auth". The bridge feeds the result into
// grpc.WithPerRPCCredentials at dial time; a nil return is the
// signal to omit that DialOption.
PerRPC() credentials.PerRPCCredentials
// Close releases any provider-held resources (token caches, HTTP
// clients). Providers without resources to release implement this
// as a no-op.
Close() error
}
AuthProvider supplies per-RPC credentials for the agntcy_grpc backend. Implementations either source a bearer token (OIDC) or signal that no authentication should be attached to outgoing RPCs (NoOp).
The provider lifecycle parallels the Backend's: the bridge constructs one provider per Component.Initialize and closes it via Close at Component.Stop. Token caching / refresh lives inside the provider — callers see only the wrapped credentials.PerRPCCredentials.
func NewAuthProvider ¶
func NewAuthProvider(cfg *AuthConfig) (AuthProvider, error)
NewAuthProvider constructs an AuthProvider from the given config. A nil config or Type "" / "none" returns the NoOp provider. "oidc" reads the client_id and client_secret from the configured env vars (or inline field) and returns an OIDC client-credentials provider that auto-refreshes tokens.
type Backend ¶
type Backend interface {
// Publish registers (or re-registers) an agent record with the directory.
// Returns the backend-assigned RecordID, which the bridge stores so it
// can later Refresh or Withdraw without re-deriving it.
Publish(ctx context.Context, req *PublishRequest) (*PublishResult, error)
// Refresh extends the lifetime of a previously published record. The
// HTTP backend implements this as a heartbeat POST; future gRPC
// backends may push a renewed CID-anchored record.
Refresh(ctx context.Context, req *RefreshRequest) (*PublishResult, error)
// Withdraw removes a previously published record. Called when the
// source entity is deleted from ENTITY_STATES, and on component Stop
// for every active registration.
Withdraw(ctx context.Context, req *WithdrawRequest) error
// Close releases any backend-held resources (open connections, pools,
// auth token caches). Called once during component Stop. Backends
// without resources to release implement this as a no-op.
Close() error
}
Backend abstracts the wire-level publish/withdraw operations against a directory. The directory-bridge component owns lifecycle, retry, KV watching, and heartbeat scheduling; backends own protocol, encoding, and authentication.
All methods must be safe for concurrent use — the heartbeat loop and the KV watcher goroutine call them concurrently.
Publish must be idempotent. The bridge calls it on every KV change AND when re-registering after a Refresh that returns a new RecordID. Backends that natively support upsert (HTTP POST /v1/agents in our v1 spec; gRPC PushReferrer in agntcy/dir) satisfy this trivially. Backends that don't must internally deduplicate by EntityID.
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component implements the directory bridge output component.
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema.
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics.
func (*Component) GetRegistrations ¶
func (c *Component) GetRegistrations() []*Registration
GetRegistrations returns all active registrations.
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns the current health status.
func (*Component) Initialize ¶
Initialize prepares the component, selecting a Backend implementation based on config.Backend. Empty / "http" selects the legacy HTTP wire format; "agntcy_grpc" selects the AGNTCY gRPC StoreService wire format with optional per-RPC OIDC auth.
func (*Component) InputPorts ¶
InputPorts returns configured input port definitions.
func (*Component) OutputPorts ¶
OutputPorts returns configured output port definitions.
type Config ¶
type Config struct {
// Ports defines the input/output port configuration.
Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
// Backend selects the directory wire format. One of:
// - "http" (default) — POST /v1/agents JSON to DirectoryURL.
// The legacy SemStreams-defined HTTP protocol that the in-tree
// mock and any privately hosted HTTP directory speak.
// - "agntcy_grpc" — AGNTCY agntcy/dir StoreService over gRPC.
// Use this for the hosted hub at prod.api.ads.outshift.io and
// for any AGNTCY-conformant directory.
// Empty defaults to "http" for backwards compatibility with
// pre-PR-C configs.
Backend string `` /* 127-byte string literal not displayed */
// DirectoryURL is the HTTP backend's directory service URL.
// Ignored when Backend == "agntcy_grpc" — that path uses AgntcyGRPC.Endpoint.
DirectoryURL string `json:"directory_url" schema:"type:string,description:AGNTCY directory service URL (HTTP backend only),category:basic"`
// AgntcyGRPC carries the agntcy_grpc backend's settings. Ignored
// when Backend is not "agntcy_grpc".
AgntcyGRPC *AgntcyGRPCConfig `json:"agntcy_grpc,omitempty" schema:"type:object,description:agntcy_grpc backend settings,category:basic"`
// HeartbeatInterval is how often to send heartbeats to the directory.
HeartbeatInterval string `json:"heartbeat_interval" schema:"type:string,description:Heartbeat interval,category:basic,default:30s"`
// RegistrationTTL is the time-to-live for registrations.
RegistrationTTL string `json:"registration_ttl" schema:"type:string,description:Registration time-to-live,category:basic,default:5m"`
// IdentityProvider specifies which identity provider to use.
// Values: "local", "agntcy"
IdentityProvider string `json:"identity_provider" schema:"type:string,description:Identity provider type,category:basic,default:local"`
// OASFKVBucket is the KV bucket to watch for OASF records.
OASFKVBucket string `json:"oasf_kv_bucket" schema:"type:string,description:KV bucket for OASF records,category:basic,default:OASF_RECORDS"`
// RetryCount is the number of retries for failed registrations.
RetryCount int `json:"retry_count" schema:"type:int,description:Number of registration retries,category:advanced,default:3"`
// RetryDelay is the initial delay between retries.
RetryDelay string `json:"retry_delay" schema:"type:string,description:Initial retry delay,category:advanced,default:1s"`
// ConsumerNameSuffix adds a suffix to consumer names for uniqueness in tests.
ConsumerNameSuffix string `json:"consumer_name_suffix" schema:"type:string,description:Suffix for consumer names,category:advanced"`
// DeleteConsumerOnStop enables consumer cleanup on stop (for testing).
DeleteConsumerOnStop bool `` /* 128-byte string literal not displayed */
}
Config defines the configuration for the directory bridge component.
func (*Config) GetHeartbeatInterval ¶
GetHeartbeatInterval returns the heartbeat interval.
func (*Config) GetRegistrationTTL ¶
GetRegistrationTTL returns the registration TTL.
func (*Config) GetRetryDelay ¶
GetRetryDelay returns the retry delay.
type DeregistrationRequest ¶
type DeregistrationRequest struct {
// RegistrationID is the registration to remove.
RegistrationID string `json:"registration_id"`
// AgentDID is the agent's DID.
AgentDID string `json:"agent_did"`
}
DeregistrationRequest represents a request to deregister an agent.
type DirectoryClient ¶
type DirectoryClient struct {
// contains filtered or unexported fields
}
DirectoryClient handles communication with the AGNTCY directory service.
func NewDirectoryClient ¶
func NewDirectoryClient(baseURL string) *DirectoryClient
NewDirectoryClient creates a new directory client.
func (*DirectoryClient) Deregister ¶
func (c *DirectoryClient) Deregister(ctx context.Context, req *DeregistrationRequest) error
Deregister removes an agent from the directory.
func (*DirectoryClient) Discover ¶
func (c *DirectoryClient) Discover(ctx context.Context, query *DiscoveryQuery) (*DiscoveryResponse, error)
Discover searches the directory for agents.
func (*DirectoryClient) Heartbeat ¶
func (c *DirectoryClient) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error)
Heartbeat sends a heartbeat to renew a registration.
func (*DirectoryClient) Register ¶
func (c *DirectoryClient) Register(ctx context.Context, req *RegistrationRequest) (*RegistrationResponse, error)
Register registers an agent with the directory.
type DiscoveredAgent ¶
type DiscoveredAgent struct {
// AgentDID is the agent's DID.
AgentDID string `json:"agent_did"`
// OASFRecord is the agent's OASF specification.
OASFRecord *oasfgenerator.OASFRecord `json:"oasf_record"`
// RegisteredAt is when the agent registered.
RegisteredAt time.Time `json:"registered_at"`
// ExpiresAt is when the registration expires.
ExpiresAt time.Time `json:"expires_at"`
}
DiscoveredAgent represents an agent found in the directory.
type DiscoveryQuery ¶
type DiscoveryQuery struct {
// Capabilities filters by required capabilities.
Capabilities []string `json:"capabilities,omitempty"`
// Domains filters by domains.
Domains []string `json:"domains,omitempty"`
// Limit limits the number of results.
Limit int `json:"limit,omitempty"`
}
DiscoveryQuery represents a search query for agents.
type DiscoveryResponse ¶
type DiscoveryResponse struct {
// Agents are the matching agents.
Agents []DiscoveredAgent `json:"agents"`
// Total is the total number of matches.
Total int `json:"total"`
}
DiscoveryResponse contains discovered agents.
type GRPCBackend ¶
type GRPCBackend struct {
// contains filtered or unexported fields
}
GRPCBackend implements Backend by talking to an AGNTCY-compatible directory over the agntcy/dir gRPC StoreService.
The wire model differs from the HTTP backend in a few ways the rest of the bridge needs to understand:
- Records are content-addressed. PublishResult.RecordID is the CID the server returns from Push; it never changes for an unchanged record.
- There is no expiry. PublishResult.ExpiresAt is always zero. The bridge's heartbeat scheduler is documented to skip refresh when ExpiresAt.IsZero() (see backend.go contract), so Refresh here is a no-op that simply echoes the RecordID.
- Push and Delete are streaming RPCs. We send exactly one record/ref per call because the bridge invokes us one entity at a time. The ceremony around streams is local to this file.
Authentication is not handled here. Operators pass grpc.DialOption values (TLS credentials, per-RPC OIDC tokens) at construction; this keeps auth strategies pluggable without growing the backend's surface.
func NewGRPCBackend ¶
func NewGRPCBackend(target string, opts ...grpc.DialOption) (*GRPCBackend, error)
NewGRPCBackend dials the directory at target and returns a backend ready to Publish/Withdraw. Caller is responsible for closing it via Close().
Pass grpc.WithTransportCredentials(...) for TLS; for the hosted hub at prod.api.ads.outshift.io that means real cert validation (credentials.NewTLS(nil) covers the default OS root pool case). For local dev / bufconn tests, pass grpc.WithTransportCredentials(insecure.NewCredentials()) — from google.golang.org/grpc/credentials/insecure. Production wire-up uses the NewGRPCBackendWithAuth constructor and the buildGRPCDialOptions helper in auth.go to assemble both transport credentials and per-RPC OIDC auth in one shot.
func NewGRPCBackendFromClient ¶
func NewGRPCBackendFromClient(conn *grpc.ClientConn) *GRPCBackend
NewGRPCBackendFromClient is the test-side constructor: callers (notably the bufconn tests) build their own *grpc.ClientConn and inject it. Close() will still tear down the connection — tests that want to share a conn across backends should build one backend or call Close themselves.
func NewGRPCBackendWithAuth ¶
func NewGRPCBackendWithAuth(target string, auth AuthProvider, dialOpts ...grpc.DialOption) (*GRPCBackend, error)
NewGRPCBackendWithAuth is the production constructor used by Component.Initialize. It dials target with the supplied DialOptions and binds the AuthProvider to the backend's lifecycle so Close() releases both the gRPC connection and any provider-held resources (OIDC token cache HTTP clients, etc.).
The dialOpts slice is expected to already include transport credentials (TLS or insecure) and any grpc.WithPerRPCCredentials needed for auth; see buildGRPCDialOptions in auth.go for the canonical construction.
Note on error semantics: grpc.NewClient is non-blocking — it only validates dial arguments and returns. Connection establishment happens lazily on the first RPC. As a result, unreachable hosts / DNS failures / TLS handshake errors will not surface here; the first Publish or Withdraw call returns them instead. The error returned by this constructor reflects argument-validation failures only.
func (*GRPCBackend) Close ¶
func (b *GRPCBackend) Close() error
Close tears down the underlying gRPC client connection and releases any bound AuthProvider resources. Safe to call once; subsequent calls return whatever grpc.ClientConn.Close returns (typically nil even when already closed).
func (*GRPCBackend) Publish ¶
func (b *GRPCBackend) Publish(ctx context.Context, req *PublishRequest) (*PublishResult, error)
Publish marshals the OASF record into a structpb.Struct, drives one round of the StoreService.Push bidi stream, and returns the server-assigned CID. ExpiresAt is intentionally zero — see type comment.
func (*GRPCBackend) Refresh ¶
func (b *GRPCBackend) Refresh(_ context.Context, req *RefreshRequest) (*PublishResult, error)
Refresh returns ErrRefreshNotSupported. The bridge's RegistrationManager skips this code path for CID-anchored backends (Publish returns zero ExpiresAt, and sendHeartbeats short-circuits on IsZero), so this method only fires when a caller bypasses the manager and invokes Refresh directly. Pre-PR-C behaviour was to echo the RecordID back silently — the typed sentinel surfaces the misuse loudly, per the go-reviewer pass on PR #70.
func (*GRPCBackend) Withdraw ¶
func (b *GRPCBackend) Withdraw(ctx context.Context, req *WithdrawRequest) error
Withdraw drives the StoreService.Delete client-streaming RPC for one RecordRef and waits for the Empty response.
type HTTPBackend ¶
type HTTPBackend struct {
// contains filtered or unexported fields
}
HTTPBackend implements Backend by delegating to the in-package DirectoryClient, which speaks the SemStreams-defined HTTP/JSON directory protocol (POST /v1/agents, POST /v1/agents/{id}/heartbeat, DELETE /v1/agents/{id}).
This is the only backend wired today; the test mock and any privately hosted directory server speak this protocol. A second backend implementing the AGNTCY agntcy/dir gRPC service is planned but not yet in tree.
func NewHTTPBackend ¶
func NewHTTPBackend(baseURL string) *HTTPBackend
NewHTTPBackend constructs a backend pointing at baseURL. baseURL must be non-empty; the underlying client returns errors at call time if it is, preserving the original behavior.
func NewHTTPBackendFromClient ¶
func NewHTTPBackendFromClient(client *DirectoryClient) *HTTPBackend
NewHTTPBackendFromClient lets callers (notably tests) inject a pre-configured *DirectoryClient. Keeps the existing DirectoryClient unit tests usable as-is.
func (*HTTPBackend) Close ¶
func (b *HTTPBackend) Close() error
Close is a no-op for the HTTP backend — net/http's Transport pools idle connections internally and reclaims them on GC. No backend-owned resources to release.
func (*HTTPBackend) Publish ¶
func (b *HTTPBackend) Publish(ctx context.Context, req *PublishRequest) (*PublishResult, error)
Publish translates the domain request into the HTTP RegistrationRequest and adapts the response back. A non-success response from a 2xx status is treated as an error so the bridge can retry the same way it did before the refactor.
func (*HTTPBackend) Refresh ¶
func (b *HTTPBackend) Refresh(ctx context.Context, req *RefreshRequest) (*PublishResult, error)
Refresh sends a heartbeat to extend the registration's lifetime.
func (*HTTPBackend) Withdraw ¶
func (b *HTTPBackend) Withdraw(ctx context.Context, req *WithdrawRequest) error
Withdraw deregisters a record by ID.
type HeartbeatRequest ¶
type HeartbeatRequest struct {
// RegistrationID is the registration to renew.
RegistrationID string `json:"registration_id"`
// AgentDID is the agent's DID.
AgentDID string `json:"agent_did"`
}
HeartbeatRequest represents a registration renewal request.
type HeartbeatResponse ¶
type HeartbeatResponse struct {
// Success indicates if the heartbeat succeeded.
Success bool `json:"success"`
// ExpiresAt is the new expiration time.
ExpiresAt time.Time `json:"expires_at,omitempty"`
// Error contains error details if heartbeat failed.
Error string `json:"error,omitempty"`
}
HeartbeatResponse represents the heartbeat response.
type MockDirectory ¶
type MockDirectory struct {
// Call counters for assertions
RegisterCalls int
HeartbeatCalls int
DeregisterCalls int
DiscoverCalls int
// contains filtered or unexported fields
}
MockDirectory provides a test mock for the AGNTCY directory service.
func NewMockDirectory ¶
func NewMockDirectory() *MockDirectory
NewMockDirectory creates a new mock directory server.
func (*MockDirectory) GetRegistration ¶
func (md *MockDirectory) GetRegistration(id string) *RegistrationRequest
GetRegistration returns a stored registration by ID.
func (*MockDirectory) RegistrationCount ¶
func (md *MockDirectory) RegistrationCount() int
RegistrationCount returns the number of active registrations.
func (*MockDirectory) SetFailNextDeregister ¶
func (md *MockDirectory) SetFailNextDeregister(fail bool)
SetFailNextDeregister makes the next deregister call fail.
func (*MockDirectory) SetFailNextHeartbeat ¶
func (md *MockDirectory) SetFailNextHeartbeat(fail bool)
SetFailNextHeartbeat makes the next heartbeat call fail.
func (*MockDirectory) SetFailNextRegister ¶
func (md *MockDirectory) SetFailNextRegister(fail bool)
SetFailNextRegister makes the next register call fail.
func (*MockDirectory) SetRegisterDelay ¶
func (md *MockDirectory) SetRegisterDelay(d time.Duration)
SetRegisterDelay adds a delay to register calls.
func (*MockDirectory) URL ¶
func (md *MockDirectory) URL() string
URL returns the mock server's URL.
type NoOpAuthProvider ¶
type NoOpAuthProvider struct{}
NoOpAuthProvider attaches no credentials. Suitable for local dev, bufconn tests, and privately hosted directories that don't gate on auth. The hosted hub at prod.api.ads.outshift.io generally does, so production deployments swap this for OIDCAuthProvider.
func (NoOpAuthProvider) Close ¶
func (NoOpAuthProvider) Close() error
Close is a no-op for NoOpAuthProvider.
func (NoOpAuthProvider) PerRPC ¶
func (NoOpAuthProvider) PerRPC() credentials.PerRPCCredentials
PerRPC returns nil — the bridge skips grpc.WithPerRPCCredentials.
type OIDCAuthProvider ¶
type OIDCAuthProvider struct {
// contains filtered or unexported fields
}
OIDCAuthProvider sources per-RPC bearer tokens from an OIDC issuer via the client-credentials flow. Tokens are cached and auto-refreshed by the underlying clientcredentials.Config.TokenSource, so callers don't need to manage lifetimes — each PerRPC().GetRequestMetadata call returns whatever the cached source hands out.
func (*OIDCAuthProvider) Close ¶
func (p *OIDCAuthProvider) Close() error
Close is a no-op — clientcredentials.Config.TokenSource uses net/http's default transport, which is reclaimed by GC.
func (*OIDCAuthProvider) PerRPC ¶
func (p *OIDCAuthProvider) PerRPC() credentials.PerRPCCredentials
PerRPC wraps the OAuth2 token source as gRPC PerRPCCredentials.
type PublishRequest ¶
type PublishRequest struct {
// EntityID is the SemStreams entity ID for the agent. Backends may
// echo it in protocol-specific metadata so the directory can correlate
// records back to their source system.
EntityID string
// AgentDID is the agent's decentralized identifier, generated by the
// identity provider before this call. Required.
AgentDID string
// Record is the OASF document to publish. The bridge guarantees it is
// non-nil and has passed oasfgenerator.OASFRecord.Validate.
Record *oasfgenerator.OASFRecord
// TTL is the requested lifetime of the registration. The backend may
// honor it exactly, clamp it to a server-side maximum, or ignore it
// (CID-anchored backends don't expire on the publisher side).
TTL time.Duration
// Metadata is opaque key/value data forwarded by the bridge. Today the
// bridge sets {"semstreams_entity_id": EntityID, "source": "semstreams"};
// future callers may add lineage or trace context.
Metadata map[string]any
}
PublishRequest is the bridge → backend contract for adding or replacing an agent record. Domain-shaped: backends translate to their wire format.
type PublishResult ¶
type PublishResult struct {
// RecordID is the backend-assigned identifier for this publication.
// - HTTP backend: server-issued registration_id
// - future gRPC backend: the OASF record CID
// Persisted by the bridge so subsequent Refresh/Withdraw can address
// the same record.
RecordID string
// ExpiresAt is when the bridge should next refresh. Zero means the
// backend does not expire records on its own (CID-anchored or
// long-lived OCI artifact); the bridge skips heartbeat scheduling in
// that case.
ExpiresAt time.Time
}
PublishResult is what the backend returns after a successful publish or refresh.
type RefreshRequest ¶
type RefreshRequest struct {
// RecordID is the previously-issued PublishResult.RecordID.
RecordID string
// AgentDID is the agent's DID; some backends bind heartbeat auth to it.
AgentDID string
}
RefreshRequest is the bridge → backend contract for heartbeating.
type Registration ¶
type Registration struct {
// EntityID is the SemStreams entity ID.
EntityID string `json:"entity_id"`
// RegistrationID is the directory's registration ID.
RegistrationID string `json:"registration_id"`
// AgentDID is the agent's decentralized identifier.
AgentDID string `json:"agent_did"`
// OASFRecord is the agent's OASF specification.
OASFRecord *oasfgenerator.OASFRecord `json:"oasf_record"`
// RegisteredAt is when the registration was created.
RegisteredAt time.Time `json:"registered_at"`
// ExpiresAt is when the registration expires.
ExpiresAt time.Time `json:"expires_at"`
// LastHeartbeat is when the last heartbeat was sent.
LastHeartbeat time.Time `json:"last_heartbeat"`
// Retries is the number of registration retries.
Retries int `json:"retries"`
}
Registration represents an active directory registration.
type RegistrationError ¶
RegistrationError represents a registration failure.
func (*RegistrationError) Error ¶
func (e *RegistrationError) Error() string
type RegistrationManager ¶
type RegistrationManager struct {
// contains filtered or unexported fields
}
RegistrationManager handles the lifecycle of agent registrations. It owns the heartbeat scheduler and the entityID→Registration map; the concrete protocol/wire format is delegated to a Backend.
func NewRegistrationManager ¶
func NewRegistrationManager(backend Backend, identityProvider identity.Provider, config Config, logger *slog.Logger) *RegistrationManager
NewRegistrationManager creates a new registration manager bound to the given Backend. Callers wrap a *DirectoryClient via NewHTTPBackend or NewHTTPBackendFromClient when constructing for the current HTTP wire format.
func (*RegistrationManager) Deregister ¶
func (rm *RegistrationManager) Deregister(ctx context.Context, entityID string) error
Deregister removes an agent from the directory.
func (*RegistrationManager) GetRegistration ¶
func (rm *RegistrationManager) GetRegistration(entityID string) *Registration
GetRegistration returns the registration for an entity.
func (*RegistrationManager) ListRegistrations ¶
func (rm *RegistrationManager) ListRegistrations() []*Registration
ListRegistrations returns all active registrations.
func (*RegistrationManager) RegisterAgent ¶
func (rm *RegistrationManager) RegisterAgent(ctx context.Context, entityID string, record *oasfgenerator.OASFRecord, agentIdentity *identity.AgentIdentity) error
RegisterAgent registers an agent with the directory.
func (*RegistrationManager) Start ¶
func (rm *RegistrationManager) Start(ctx context.Context) error
Start begins the heartbeat goroutine.
func (*RegistrationManager) Stop ¶
func (rm *RegistrationManager) Stop(ctx context.Context) error
Stop stops the heartbeat goroutine and deregisters all agents.
func (*RegistrationManager) UpdateRegistration ¶
func (rm *RegistrationManager) UpdateRegistration(ctx context.Context, entityID string, record *oasfgenerator.OASFRecord) error
UpdateRegistration updates an existing registration with new OASF data.
type RegistrationRequest ¶
type RegistrationRequest struct {
// AgentDID is the agent's decentralized identifier.
AgentDID string `json:"agent_did"`
// OASFRecord is the agent's OASF specification.
OASFRecord *oasfgenerator.OASFRecord `json:"oasf_record"`
// TTL is the registration time-to-live in seconds.
TTL int `json:"ttl,omitempty"`
// Metadata contains additional registration metadata.
Metadata map[string]any `json:"metadata,omitempty"`
}
RegistrationRequest represents a request to register an agent.
type RegistrationResponse ¶
type RegistrationResponse struct {
// Success indicates if registration succeeded.
Success bool `json:"success"`
// RegistrationID is the unique ID for this registration.
RegistrationID string `json:"registration_id,omitempty"`
// ExpiresAt is when the registration expires.
ExpiresAt time.Time `json:"expires_at,omitempty"`
// Error contains error details if registration failed.
Error string `json:"error,omitempty"`
}
RegistrationResponse represents the directory's response to a registration.
type RegistryInterface ¶
type RegistryInterface interface {
RegisterWithConfig(component.RegistrationConfig) error
}
RegistryInterface defines the minimal interface needed for registration.
type WithdrawRequest ¶
type WithdrawRequest struct {
// RecordID is the previously-issued PublishResult.RecordID.
RecordID string
// AgentDID is the agent's DID; some backends bind withdrawal auth to it.
AgentDID string
}
WithdrawRequest is the bridge → backend contract for removal.