loopback

package
v1.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 11, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package loopback ships Harbor's V1 in-process drivers for both `distributed.MessageBus` and `distributed.RemoteTransport`. It is the test reference for the conformance suite — every later driver (durable bus at phase 86, A2A wire RemoteTransport at phase 29) inherits the same suite verbatim.

The MessageBus loopback projects each published envelope as a typed event on the configured `events.EventBus` (registering a synthetic EventType once at init). The RemoteTransport loopback maintains an in-memory `map[agentURL]Agent` registered via the `RegisterAgent` sidecar API; every RemoteTransport call dispatches synchronously to the registered Agent.

The Agent abstraction (see agent.go) is conformance-suite-facing only — production callers depend on `RemoteTransport`. The Agent exists so the loopback driver can simulate a remote A2A endpoint without leaving the process.

Index

Constants

View Source
const EventTypeDistributedBusEnvelope events.EventType = "distributed.bus_envelope"

EventTypeDistributedBusEnvelope is the canonical event type the loopback MessageBus emits onto the typed `events.EventBus` for each Publish. Subscribers wired to the event bus see envelopes by filtering on this type. Registered once at package init.

Variables

This section is empty.

Functions

func NewBus

NewBus builds the loopback MessageBus directly. Exposed for tests that want to skip the registry.

func NewRemoteTransport

func NewRemoteTransport(_ distributed.Dependencies) (distributed.RemoteTransport, error)

NewRemoteTransport builds the loopback RemoteTransport directly. Exposed for tests that want to skip the registry.

Deps are intentionally ignored: the loopback driver is in-process dispatch — it has no EventBus / Cfg consumers at V1. The Phase 29 wire RemoteTransport driver (post-V1) WILL read deps.EventBus to surface transport-level events (`distributed.send_failed` etc.) and deps.Cfg for endpoint configuration; reviewers porting this signature forward to the wire driver MUST replace this stub with real deps consumption.

Types

type Agent

type Agent interface {
	// SendMessage maps to A2A SendMessage.
	SendMessage(ctx context.Context, msg a2a.Message, cfg a2a.SendMessageConfiguration) (a2a.Task, error)
	// SendStreamingMessage maps to A2A SendStreamingMessage. The
	// returned channel is owned by the Agent; the Agent MUST close it
	// when the stream is complete (the loopback transport's Recv
	// observes the close as the end-of-stream signal).
	SendStreamingMessage(ctx context.Context, msg a2a.Message, cfg a2a.SendMessageConfiguration) (<-chan a2a.StreamResponse, error)
	// GetTask maps to A2A GetTask. Returns the Task or
	// `distributed.ErrTaskNotFound` if absent.
	GetTask(ctx context.Context, taskID, contextID string) (a2a.Task, error)
	// ListTasks maps to A2A ListTasks.
	ListTasks(ctx context.Context, filter ListTasksFilter) ([]a2a.Task, error)
	// CancelTask maps to A2A CancelTask. Returns the terminal Task.
	CancelTask(ctx context.Context, taskID, contextID string) (a2a.Task, error)
	// SubscribeToTask maps to A2A SubscribeToTask. Channel ownership
	// rules identical to SendStreamingMessage.
	SubscribeToTask(ctx context.Context, taskID, contextID string) (<-chan a2a.StreamResponse, error)
	// CreateTaskPushNotificationConfig maps to A2A
	// CreateTaskPushNotificationConfig.
	CreateTaskPushNotificationConfig(ctx context.Context, cfg a2a.TaskPushNotificationConfig) (a2a.TaskPushNotificationConfig, error)
	// GetTaskPushNotificationConfig maps to A2A
	// GetTaskPushNotificationConfig.
	GetTaskPushNotificationConfig(ctx context.Context, taskID, configID string) (a2a.TaskPushNotificationConfig, error)
	// ListTaskPushNotificationConfigs maps to A2A
	// ListTaskPushNotificationConfigs.
	ListTaskPushNotificationConfigs(ctx context.Context, taskID string) ([]a2a.TaskPushNotificationConfig, error)
	// DeleteTaskPushNotificationConfig maps to A2A
	// DeleteTaskPushNotificationConfig.
	DeleteTaskPushNotificationConfig(ctx context.Context, taskID, configID string) error
	// GetExtendedAgentCard maps to A2A GetExtendedAgentCard.
	GetExtendedAgentCard(ctx context.Context) (a2a.AgentCard, error)
}

Agent is the in-process simulation of a remote A2A peer. The loopback RemoteTransport dispatches each method one-to-one to the registered Agent for the target URL. One method per A2A RPC.

Conformance-suite-facing only. Production callers depend on `distributed.RemoteTransport`.

type BusEnvelopePayload

type BusEnvelopePayload struct {
	events.SafeSealed
	// Envelope is the published BusEnvelope, projected onto the typed
	// event bus. Consumers idempotency-key on
	// `(Envelope.TaskID, Envelope.Edge, Envelope.EventID)`.
	Envelope distributed.BusEnvelope
}

BusEnvelopePayload is the typed event payload carrying a distributed.BusEnvelope projection. SafePayload — the envelope's Payload bytes are assumed pre-redacted by the publisher (D-020).

type ListTasksFilter

type ListTasksFilter struct {
	Tenant               string
	ContextID            string
	Status               a2a.TaskState
	PageSize             int32
	PageToken            string
	HistoryLength        int32
	StatusTimestampAfter int64 // Unix nanos; 0 = "no filter"
	IncludeArtifacts     bool
}

ListTasksFilter is the Agent-side view of the `distributed.RemoteTaskFilter` (the loopback driver translates one to the other). Kept separate so the Agent surface tracks the proto `ListTasksRequest` shape directly without leaking the distributed package's wrapping types.

type LoopbackTransport

type LoopbackTransport interface {
	distributed.RemoteTransport
	// RegisterAgent installs agent for url. Replaces any prior
	// registration at the same URL. The first registration becomes
	// the "default" URL for GetExtendedAgentCard when no AgentURL
	// is set on the request.
	RegisterAgent(url string, agent Agent)
	// UnregisterAgent removes the agent at url. No-op if absent.
	UnregisterAgent(url string)
	// SetDefaultAgentCard sets the AgentCard returned by
	// GetExtendedAgentCard when the registered Agent itself returns
	// a zero value (test convenience).
	SetDefaultAgentCard(source func() *a2a.AgentCard)
}

LoopbackTransport is the concrete type exposed for tests so they can register / unregister Agents against the in-memory registry without resolving through the interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL