Documentation
¶
Overview ¶
Package loopback ships Harbor's V1 in-process drivers for both `distributed.MessageBus` and `distributed.RemoteTransport`. It is the test reference for the conformance suite — every later driver (durable bus at phase 86, A2A wire RemoteTransport at phase 29) inherits the same suite verbatim.
The MessageBus loopback projects each published envelope as a typed event on the configured `events.EventBus` (registering a synthetic EventType once at init). The RemoteTransport loopback maintains an in-memory `map[agentURL]Agent` registered via the `RegisterAgent` sidecar API; every RemoteTransport call dispatches synchronously to the registered Agent.
The Agent abstraction (see agent.go) is conformance-suite-facing only — production callers depend on `RemoteTransport`. The Agent exists so the loopback driver can simulate a remote A2A endpoint without leaving the process.
Index ¶
Constants ¶
const EventTypeDistributedBusEnvelope events.EventType = "distributed.bus_envelope"
EventTypeDistributedBusEnvelope is the canonical event type the loopback MessageBus emits onto the typed `events.EventBus` for each Publish. Subscribers wired to the event bus see envelopes by filtering on this type. Registered once at package init.
Variables ¶
This section is empty.
Functions ¶
func NewBus ¶
func NewBus(deps distributed.Dependencies) (distributed.MessageBus, error)
NewBus builds the loopback MessageBus directly. Exposed for tests that want to skip the registry.
func NewRemoteTransport ¶
func NewRemoteTransport(_ distributed.Dependencies) (distributed.RemoteTransport, error)
NewRemoteTransport builds the loopback RemoteTransport directly. Exposed for tests that want to skip the registry.
Deps are intentionally ignored: the loopback driver is in-process dispatch — it has no EventBus / Cfg consumers at V1. The Phase 29 wire RemoteTransport driver (post-V1) WILL read deps.EventBus to surface transport-level events (`distributed.send_failed` etc.) and deps.Cfg for endpoint configuration; reviewers porting this signature forward to the wire driver MUST replace this stub with real deps consumption.
Types ¶
type Agent ¶
type Agent interface {
// SendMessage maps to A2A SendMessage.
SendMessage(ctx context.Context, msg a2a.Message, cfg a2a.SendMessageConfiguration) (a2a.Task, error)
// SendStreamingMessage maps to A2A SendStreamingMessage. The
// returned channel is owned by the Agent; the Agent MUST close it
// when the stream is complete (the loopback transport's Recv
// observes the close as the end-of-stream signal).
SendStreamingMessage(ctx context.Context, msg a2a.Message, cfg a2a.SendMessageConfiguration) (<-chan a2a.StreamResponse, error)
// GetTask maps to A2A GetTask. Returns the Task or
// `distributed.ErrTaskNotFound` if absent.
GetTask(ctx context.Context, taskID, contextID string) (a2a.Task, error)
// ListTasks maps to A2A ListTasks.
ListTasks(ctx context.Context, filter ListTasksFilter) ([]a2a.Task, error)
// CancelTask maps to A2A CancelTask. Returns the terminal Task.
CancelTask(ctx context.Context, taskID, contextID string) (a2a.Task, error)
// SubscribeToTask maps to A2A SubscribeToTask. Channel ownership
// rules identical to SendStreamingMessage.
SubscribeToTask(ctx context.Context, taskID, contextID string) (<-chan a2a.StreamResponse, error)
// CreateTaskPushNotificationConfig maps to A2A
// CreateTaskPushNotificationConfig.
CreateTaskPushNotificationConfig(ctx context.Context, cfg a2a.TaskPushNotificationConfig) (a2a.TaskPushNotificationConfig, error)
// GetTaskPushNotificationConfig maps to A2A
// GetTaskPushNotificationConfig.
GetTaskPushNotificationConfig(ctx context.Context, taskID, configID string) (a2a.TaskPushNotificationConfig, error)
// ListTaskPushNotificationConfigs maps to A2A
// ListTaskPushNotificationConfigs.
ListTaskPushNotificationConfigs(ctx context.Context, taskID string) ([]a2a.TaskPushNotificationConfig, error)
// DeleteTaskPushNotificationConfig maps to A2A
// DeleteTaskPushNotificationConfig.
DeleteTaskPushNotificationConfig(ctx context.Context, taskID, configID string) error
// GetExtendedAgentCard maps to A2A GetExtendedAgentCard.
GetExtendedAgentCard(ctx context.Context) (a2a.AgentCard, error)
}
Agent is the in-process simulation of a remote A2A peer. The loopback RemoteTransport dispatches each method one-to-one to the registered Agent for the target URL. One method per A2A RPC.
Conformance-suite-facing only. Production callers depend on `distributed.RemoteTransport`.
type BusEnvelopePayload ¶
type BusEnvelopePayload struct {
events.SafeSealed
// Envelope is the published BusEnvelope, projected onto the typed
// event bus. Consumers idempotency-key on
// `(Envelope.TaskID, Envelope.Edge, Envelope.EventID)`.
Envelope distributed.BusEnvelope
}
BusEnvelopePayload is the typed event payload carrying a distributed.BusEnvelope projection. SafePayload — the envelope's Payload bytes are assumed pre-redacted by the publisher (D-020).
type ListTasksFilter ¶
type ListTasksFilter struct {
Tenant string
ContextID string
Status a2a.TaskState
PageSize int32
PageToken string
HistoryLength int32
StatusTimestampAfter int64 // Unix nanos; 0 = "no filter"
IncludeArtifacts bool
}
ListTasksFilter is the Agent-side view of the `distributed.RemoteTaskFilter` (the loopback driver translates one to the other). Kept separate so the Agent surface tracks the proto `ListTasksRequest` shape directly without leaking the distributed package's wrapping types.
type LoopbackTransport ¶
type LoopbackTransport interface {
distributed.RemoteTransport
// RegisterAgent installs agent for url. Replaces any prior
// registration at the same URL. The first registration becomes
// the "default" URL for GetExtendedAgentCard when no AgentURL
// is set on the request.
RegisterAgent(url string, agent Agent)
// UnregisterAgent removes the agent at url. No-op if absent.
UnregisterAgent(url string)
// SetDefaultAgentCard sets the AgentCard returned by
// GetExtendedAgentCard when the registered Agent itself returns
// a zero value (test convenience).
SetDefaultAgentCard(source func() *a2a.AgentCard)
}
LoopbackTransport is the concrete type exposed for tests so they can register / unregister Agents against the in-memory registry without resolving through the interface.