a2asrv

package
v2.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2026 License: Apache-2.0 Imports: 29 Imported by: 11

Documentation

Overview

Package a2asrv provides a configurable A2A protocol server implementation.

The default implementation can be created using NewHandler. The function takes a single required AgentExecutor dependency and a variable number of RequestHandlerOption-s used to customize handler behavior.

An AgentExecutor implementation is responsible for invoking the agent, translating its outputs to a2a core types and emitting them for processing. The A2A server stack handles task state modification, persistence, and notification dispatch to registered push endpoints and connected clients.

RequestHandler is transport-agnostic and needs to be wrapped in a transport-specific translation layer like NewJSONRPCHandler. JSONRPC transport implementation can be registered with the standard http.Server:

handler := a2asrv.NewHandler(
	agentExecutor,
	a2asrv.WithTaskStore(customDB),
	a2asrv.WithPushNotifications(configStore, sender),
	a2asrv.WithCallInterceptors(customMiddleware),
	...
)

mux := http.NewServeMux()
mux.Handle("/invoke", a2asrv.NewJSONRPCHandler(handler))

The package provides utilities for serving public a2a.AgentCard-s. These return handler implementations which can be registered with a standard http server. Since the card is public, CORS policy allows requests from any domain.

mux.Handle(a2asrv.WellKnownAgentCardPath, a2asrv.NewStaticAgentCardHandler(card))

// or for more advanced use cases

mux.Handle(a2asrv.WellKnownAgentCardPath, a2asrv.NewAgentCardHandler(producer))

Index

Examples

Constants

View Source
const WellKnownAgentCardPath = "/.well-known/agent-card.json"

WellKnownAgentCardPath is the standard HTTP path for retrieving the agent card as defined in A2A spec.

Variables

This section is empty.

Functions

func NewAgentCardHandler

func NewAgentCardHandler(producer AgentCardProducer) http.Handler

NewAgentCardHandler creates an http.Handler implementation for serving a PUBLIC a2a.AgentCard. The information contained in this card can be queried from any origin.

Example
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"

	"github.com/a2aproject/a2a-go/v2/a2a"
	"github.com/a2aproject/a2a-go/v2/a2asrv"
)

func main() {
	producer := a2asrv.AgentCardProducerFn(func(_ context.Context) (*a2a.AgentCard, error) {
		return &a2a.AgentCard{
			Name:    "Dynamic Agent",
			Version: "2.0.0",
			SupportedInterfaces: []*a2a.AgentInterface{
				a2a.NewAgentInterface("http://localhost:8080", a2a.TransportProtocolJSONRPC),
			},
		}, nil
	})
	handler := a2asrv.NewAgentCardHandler(producer)

	server := httptest.NewServer(handler)
	defer server.Close()

	resp, err := http.Get(server.URL)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	defer func() { _ = resp.Body.Close() }()

	var result map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		fmt.Println("Error:", err)
		return
	}

	fmt.Println("Name:", result["name"])
	fmt.Println("Version:", result["version"])
}
Output:
Name: Dynamic Agent
Version: 2.0.0

func NewJSONRPCHandler

func NewJSONRPCHandler(handler RequestHandler, options ...TransportOption) http.Handler

NewJSONRPCHandler creates an http.Handler which implements JSONRPC A2A protocol binding.

Example
package main

import (
	"context"
	"fmt"
	"iter"
	"net/http"

	"github.com/a2aproject/a2a-go/v2/a2a"
	"github.com/a2aproject/a2a-go/v2/a2asrv"
)

// echoExecutor is a minimal executor used by the example. When ExecuteFn is
// set it replaces the default "echo" reply.
type echoExecutor struct {
	ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}

// Execute delegates to ExecuteFn when one is configured; otherwise it emits a
// single agent message with the text "echo".
func (x *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	if custom := x.ExecuteFn; custom != nil {
		return custom(ctx, execCtx)
	}
	return func(emit func(a2a.Event, error) bool) {
		reply := a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo"))
		emit(reply, nil)
	}
}

// Cancel emits a single status update moving the task to the canceled state.
func (x *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	return func(emit func(a2a.Event, error) bool) {
		canceled := a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil)
		emit(canceled, nil)
	}
}

func main() {
	executor := &echoExecutor{}
	handler := a2asrv.NewHandler(executor)
	jsonrpcHandler := a2asrv.NewJSONRPCHandler(handler)

	mux := http.NewServeMux()
	mux.Handle("/", jsonrpcHandler)

	fmt.Println("JSON-RPC handler registered:", jsonrpcHandler != nil)
}
Output:
JSON-RPC handler registered: true

func NewRESTHandler

func NewRESTHandler(handler RequestHandler, opts ...TransportOption) http.Handler

NewRESTHandler creates an http.Handler which implements the HTTP+JSON A2A protocol binding.

func NewStaticAgentCardHandler

func NewStaticAgentCardHandler(card *a2a.AgentCard) http.Handler

NewStaticAgentCardHandler creates an http.Handler implementation for serving a PUBLIC a2a.AgentCard which is not expected to change while the program is running. The information contained in this card can be queried from any origin. The function panics if JSON marshaling of the argument fails.

Example
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"

	"github.com/a2aproject/a2a-go/v2/a2a"
	"github.com/a2aproject/a2a-go/v2/a2asrv"
)

// main serves a fixed agent card from a test HTTP server, fetches it back and
// prints the card's name, version and the response content type.
func main() {
	endpoint := a2a.NewAgentInterface("http://localhost:8080", a2a.TransportProtocolJSONRPC)
	staticCard := &a2a.AgentCard{
		Name:                "Echo Agent",
		Version:             "1.0.0",
		SupportedInterfaces: []*a2a.AgentInterface{endpoint},
	}

	srv := httptest.NewServer(a2asrv.NewStaticAgentCardHandler(staticCard))
	defer srv.Close()

	response, getErr := http.Get(srv.URL)
	if getErr != nil {
		fmt.Println("Error:", getErr)
		return
	}
	// The close error is deliberately ignored: the body has been consumed by then.
	defer func() { _ = response.Body.Close() }()

	// Decode into a generic map so the example does not depend on the card's Go type.
	var fields map[string]any
	decodeErr := json.NewDecoder(response.Body).Decode(&fields)
	if decodeErr != nil {
		fmt.Println("Error:", decodeErr)
		return
	}

	fmt.Println("Name:", fields["name"])
	fmt.Println("Version:", fields["version"])
	fmt.Println("Content-Type:", response.Header.Get("Content-Type"))
}
Output:
Name: Echo Agent
Version: 1.0.0
Content-Type: application/json

func NewTaskStoreAuthenticator

func NewTaskStoreAuthenticator() taskstore.Authenticator

NewTaskStoreAuthenticator returns a taskstore.Authenticator which uses the CallContext to get the user name.

func NewTenantRESTHandler

func NewTenantRESTHandler(tenantTemplate string, handler RequestHandler, opts ...TransportOption) http.Handler

NewTenantRESTHandler creates an http.Handler which implements the HTTP+JSON A2A protocol binding. It extracts tenant information from the URL path based on the provided template, strips the prefix, and attaches the tenant ID (the part inside {}) to the request context. Examples of templates: "/{*}", "/locations/*/projects/{*}", "/{locations/*/projects/*}".

Types

type AgentCardJSONProducer

type AgentCardJSONProducer interface {
	// CardJSON returns an [a2a.AgentCard] as raw json.
	CardJSON(ctx context.Context) ([]byte, error)
}

AgentCardJSONProducer creates AgentCard instances, used for agent discovery and capability negotiation, as raw JSON.

type AgentCardProducer

type AgentCardProducer interface {
	// Card returns a self-describing manifest for an agent. It provides essential
	// metadata including the agent's identity, capabilities, skills, supported
	// communication methods, and security requirements.
	Card(ctx context.Context) (*a2a.AgentCard, error)
}

AgentCardProducer creates public AgentCard instances used for agent discovery and capability negotiation.

type AgentCardProducerFn

type AgentCardProducerFn func(ctx context.Context) (*a2a.AgentCard, error)

AgentCardProducerFn is a function type which implements AgentCardProducer.

func (AgentCardProducerFn) Card

Card implements AgentCardProducer.

type AgentExecutionCleaner

type AgentExecutionCleaner interface {
	// Cleanup is called after an agent execution completes with either result or an error.
	Cleanup(ctx context.Context, execCtx *ExecutorContext, result a2a.SendMessageResult, err error)
}

AgentExecutionCleaner is an optional interface AgentExecutor can implement to perform cleanup after execution finishes.

type AgentExecutor

type AgentExecutor interface {
	// Execute invokes the agent passing information about the request which triggered the execution,
	// translates agent outputs to A2A events and emits them for processing.
	// Every invocation runs in a dedicated goroutine.
	//
	// Failures should generally be reported by writing events carrying the failure information
	// and task state. An error should be returned in special cases or before a task was created.
	Execute(ctx context.Context, execCtx *ExecutorContext) iter.Seq2[a2a.Event, error]

	// Cancel is called when a client requests the agent to stop working on a task.
	// The simplest implementation can emit an [a2a.TaskStatusUpdateEvent] with [a2a.TaskStateCanceled].
	//
	// Optimistic concurrency control is used during task store updates to prevent concurrent writes
	// and to propagate the cancelation signal in a server cluster when execution and cancelation are
	// handled by different processes.
	//
	// TaskStatusUpdateEvent with a failed state is handled differently in terms
	// of retries. If a concurrent task modification is detected, the server stack will re-fetch
	// the latest state from the task store and retry the update request.
	//
	// If the event gets applied during an active execution, the next execution update will
	// fail on OCC and execution will be canceled.
	//
	// An error should be returned if the cancelation request cannot be processed.
	Cancel(ctx context.Context, execCtx *ExecutorContext) iter.Seq2[a2a.Event, error]
}

AgentExecutor implementations translate agent outputs to A2A events. The provided ExecutorContext should be used as an a2a.TaskInfoProvider argument for a2a.Event constructor functions, for example:

a2a.NewSubmittedTask(execCtx, execCtx.Message)
a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateWorking, nil)
a2a.NewArtifactEvent(execCtx, parts...)
a2a.NewArtifactUpdateEvent(execCtx, artifactID, parts...)

For streaming responses a2a.TaskArtifactUpdateEvent-s should be emitted. A2A server stops processing events after one of these events:

In general, the executor should not emit an error after the first event was emitted, but an a2a.TaskStatusUpdateEvent with a failed state.

The following code can be used as a streaming implementation template; generateOutputs, toParts and toErrorMessage are left for the implementer to provide:

// Execute is a streaming implementation template. generateOutputs, toParts and
// toErrorMessage are placeholders that the implementer must provide.
//
// The returned sequence emits, in order: a submitted task (only when the request
// did not reference a stored task), a working status update, one artifact event
// per generated output, and finally either a completed or a failed status update.
// Iteration stops as soon as the consumer's yield returns false.
func Execute(ctx context.Context, execCtx *ExecutorContext) iter.Seq2[a2a.Event, error] {
	return func(yield func(a2a.Event, error) bool) {
		// StoredTask is only present when the request message specified a TaskID,
		// so a new task has to be announced otherwise.
		if execCtx.StoredTask == nil {
			if !yield(a2a.NewSubmittedTask(execCtx, execCtx.Message), nil) {
				return
			}
		}

		/* performs the necessary setup */

		if !yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateWorking, nil), nil) {
			return
		}

		var artifactID a2a.ArtifactID
		for output, err := range generateOutputs() {
			if err != nil {
				// Report the failure as a status update and stop. Continuing would
				// process an invalid output and later emit a completed status on
				// top of the failed one.
				yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateFailed, toErrorMessage(err)), nil)
				return
			}

			parts := toParts(output)
			var event *a2a.TaskArtifactUpdateEvent
			if artifactID == "" {
				// The first chunk creates the artifact; remember its ID for later updates.
				event = a2a.NewArtifactEvent(execCtx, parts...)
				artifactID = event.Artifact.ID
			} else {
				event = a2a.NewArtifactUpdateEvent(execCtx, artifactID, parts...)
			}

			if !yield(event, nil) {
				return
			}
		}

		// Last event of the sequence: nothing follows, so the yield result is not needed.
		yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCompleted, nil), nil)
	}
}
Example
package main

import (
	"context"
	"fmt"
	"iter"

	"github.com/a2aproject/a2a-go/v2/a2a"
	"github.com/a2aproject/a2a-go/v2/a2asrv"
)

// echoExecutor is a minimal executor used by the example. When ExecuteFn is
// set it replaces the default "echo" reply.
type echoExecutor struct {
	ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}

// Execute delegates to ExecuteFn when one is configured; otherwise it emits a
// single agent message with the text "echo".
func (x *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	if custom := x.ExecuteFn; custom != nil {
		return custom(ctx, execCtx)
	}
	return func(emit func(a2a.Event, error) bool) {
		reply := a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo"))
		emit(reply, nil)
	}
}

// Cancel emits a single status update moving the task to the canceled state.
func (x *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	return func(emit func(a2a.Event, error) bool) {
		canceled := a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil)
		emit(canceled, nil)
	}
}

func main() {
	executor := &echoExecutor{
		ExecuteFn: func(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
			return func(yield func(a2a.Event, error) bool) {
				if !yield(a2a.NewSubmittedTask(execCtx, execCtx.Message), nil) {
					return
				}

				if !yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateWorking, nil), nil) {
					return
				}

				if !yield(a2a.NewArtifactEvent(execCtx, a2a.NewTextPart("generated output")), nil) {
					return
				}

				yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCompleted, nil), nil)
			}
		},
	}

	handler := a2asrv.NewHandler(executor)

	msg := a2a.NewMessage(a2a.MessageRoleUser, a2a.NewTextPart("generate something"))
	result, err := handler.SendMessage(context.Background(), &a2a.SendMessageRequest{Message: msg})
	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	task, ok := result.(*a2a.Task)
	if !ok {
		fmt.Println("Expected task result")
		return
	}

	fmt.Println("State:", task.Status.State)
	fmt.Println("Artifacts:", len(task.Artifacts))
}
Output:
State: TASK_STATE_COMPLETED
Artifacts: 1

type CallContext

type CallContext struct {

	// User can be set by authentication middleware to provide information about
	// the user who initiated the request.
	User *User
	// contains filtered or unexported fields
}

CallContext holds information about the current server call scope.

func CallContextFrom

func CallContextFrom(ctx context.Context) (*CallContext, bool)

CallContextFrom allows to get a CallContext struct which holds additional information about the current execution scope.

func NewCallContext

func NewCallContext(ctx context.Context, params *ServiceParams) (context.Context, *CallContext)

NewCallContext can be called by transport implementations to provide request metadata to RequestHandler or to have access to the list of activated extensions via the returned CallContext after the call ends. If the context already had a CallContext attached, the old one will be shadowed.

func (*CallContext) Extensions

func (cc *CallContext) Extensions() *Extensions

Extensions returns a struct which provides an API for working with extensions in the current call context.

func (*CallContext) Method

func (cc *CallContext) Method() string

Method returns the name of the RequestHandler method which is being executed.

func (*CallContext) ServiceParams

func (cc *CallContext) ServiceParams() *ServiceParams

ServiceParams returns metadata of the request which created the call context.

func (*CallContext) Tenant

func (cc *CallContext) Tenant() string

Tenant returns the tenant ID of the current call context.

type CallInterceptor

type CallInterceptor interface {
	// Before allows to observe, modify or reject a Request.
	// A new context.Context can be returned to pass information down the call stack.
	// If either the result (2nd return value) or the error (3rd return value) is non nil,
	// the actual handler will not be called and the value will be returned to the client.
	Before(ctx context.Context, callCtx *CallContext, req *Request) (context.Context, any, error)

	// After allows to observe, modify or reject a Response.
	// Context passed to this method will be the same as returned from [CallInterceptor.Before].
	After(ctx context.Context, callCtx *CallContext, resp *Response) error
}

CallInterceptor can be attached to a RequestHandler. If multiple interceptors are added:

  • Before will be executed in the order of attachment sequentially.
  • After will be executed in the reverse order sequentially.

func NewLoggingInterceptor added in v2.1.0

func NewLoggingInterceptor(config *LoggingConfig) CallInterceptor

NewLoggingInterceptor creates a CallInterceptor that logs A2A method invocations. Incoming requests are logged at the configured level and errors are logged at the error level.

type ClusterConfig

type ClusterConfig struct {
	// QueueManager is the event queue manager used in cluster mode.
	// NOTE(review): presumably it must be backed by storage shared between instances — confirm.
	QueueManager eventqueue.Manager
	// WorkQueue is the work queue; the WithClusterMode documentation states it is
	// used to distribute tasks across multiple instances.
	WorkQueue    workqueue.Queue
	// TaskStore is the task store used in cluster mode.
	// NOTE(review): presumably shared between instances as well — confirm.
	TaskStore    taskstore.Store
}

ClusterConfig groups the necessary dependencies for A2A cluster mode operation.

type ExecutorContext

type ExecutorContext struct {
	// Message is the message which triggered the execution. It is nil for a cancelation request.
	Message *a2a.Message
	// TaskID is an ID of the task or a newly generated UUIDv4 in case Message did not reference any Task.
	TaskID a2a.TaskID
	// StoredTask is present if request message specified a TaskID.
	StoredTask *a2a.Task
	// RelatedTasks can be present when Message includes Task references and an
	// ExecutorContextInterceptor such as ReferencedTasksLoader is configured to load them.
	RelatedTasks []*a2a.Task
	// ContextID is a server-generated identifier for maintaining context across multiple related tasks or interactions. Matches the Task ContextID.
	ContextID string
	// Metadata of the request which triggered the call.
	Metadata map[string]any
	// User who made the request which triggered the execution.
	User *User
	// ServiceParams of the request which triggered the execution.
	ServiceParams *ServiceParams
	// Tenant is an optional ID of the agent owner.
	Tenant string
}

ExecutorContext provides information about an incoming A2A request to AgentExecutor.

func (*ExecutorContext) TaskInfo

func (ec *ExecutorContext) TaskInfo() a2a.TaskInfo

TaskInfo returns information used for associating events with a task.

type ExecutorContextInterceptor

type ExecutorContextInterceptor interface {
	// Intercept can modify the [ExecutorContext] before it gets passed to the [AgentExecutor].
	Intercept(ctx context.Context, execCtx *ExecutorContext) (context.Context, error)
}

ExecutorContextInterceptor defines an extension point for modifying the information which gets passed to the agent when it is invoked.

type ExtendedAgentCardProducer

type ExtendedAgentCardProducer interface {
	// ExtendedCard returns a self-describing manifest for an agent. It contains extended data
	// for authenticated clients.
	ExtendedCard(ctx context.Context, req *a2a.GetExtendedAgentCardRequest) (*a2a.AgentCard, error)
}

ExtendedAgentCardProducer creates AgentCard instances used for communicating extended capabilities to authenticated clients.

type ExtendedAgentCardProducerFn

type ExtendedAgentCardProducerFn func(ctx context.Context, req *a2a.GetExtendedAgentCardRequest) (*a2a.AgentCard, error)

ExtendedAgentCardProducerFn is a function type which implements ExtendedAgentCardProducer.

func (ExtendedAgentCardProducerFn) ExtendedCard

ExtendedCard implements ExtendedAgentCardProducer.

type Extensions

type Extensions struct {
	// contains filtered or unexported fields
}

Extensions provides utility methods for accessing extensions requested by the client and keeping track of extensions activated during request processing.

func ExtensionsFrom

func ExtensionsFrom(ctx context.Context) (*Extensions, bool)

ExtensionsFrom is a helper function for quick access to Extensions in the current CallContext.

func (*Extensions) Activate

func (e *Extensions) Activate(extension *a2a.AgentExtension)

Activate marks extension as activated in the current CallContext. A list of activated extensions might be attached as response metadata by a transport implementation.

func (*Extensions) ActivatedURIs

func (e *Extensions) ActivatedURIs() []string

ActivatedURIs returns URIs of all extensions activated during call processing.

func (*Extensions) Active

func (e *Extensions) Active(extension *a2a.AgentExtension) bool

Active returns true if an extension has already been activated in the current CallContext using Extensions.Activate.

func (*Extensions) Requested

func (e *Extensions) Requested(extension *a2a.AgentExtension) bool

Requested returns true if the provided extension was requested by the client.

func (*Extensions) RequestedURIs

func (e *Extensions) RequestedURIs() []string

RequestedURIs returns URIs of all extensions requested by the client.

type InterceptedHandler

type InterceptedHandler struct {
	// Handler is responsible for the actual processing of every call.
	Handler RequestHandler
	// Interceptors is a list of call interceptors which will be applied before and after each call.
	Interceptors []CallInterceptor
	// Logger is the logger which will be accessible from request scope context using log package
	// methods. Defaults to slog.Default() if not set.
	Logger *slog.Logger
	// contains filtered or unexported fields
}

InterceptedHandler implements RequestHandler. It can be used to attach call interceptors and initialize call context for every method of the wrapped handler.

func (*InterceptedHandler) CancelTask

func (h *InterceptedHandler) CancelTask(ctx context.Context, req *a2a.CancelTaskRequest) (*a2a.Task, error)

CancelTask implements RequestHandler.

func (*InterceptedHandler) CreateTaskPushConfig

CreateTaskPushConfig implements RequestHandler.

func (*InterceptedHandler) DeleteTaskPushConfig

func (h *InterceptedHandler) DeleteTaskPushConfig(ctx context.Context, req *a2a.DeleteTaskPushConfigRequest) error

DeleteTaskPushConfig implements RequestHandler.

func (*InterceptedHandler) GetExtendedAgentCard

func (h *InterceptedHandler) GetExtendedAgentCard(ctx context.Context, req *a2a.GetExtendedAgentCardRequest) (*a2a.AgentCard, error)

GetExtendedAgentCard implements RequestHandler.

func (*InterceptedHandler) GetTask

func (h *InterceptedHandler) GetTask(ctx context.Context, req *a2a.GetTaskRequest) (*a2a.Task, error)

GetTask implements RequestHandler.

func (*InterceptedHandler) GetTaskPushConfig

GetTaskPushConfig implements RequestHandler.

func (*InterceptedHandler) ListTaskPushConfigs

func (h *InterceptedHandler) ListTaskPushConfigs(ctx context.Context, req *a2a.ListTaskPushConfigRequest) ([]*a2a.TaskPushConfig, error)

ListTaskPushConfigs implements RequestHandler.

func (*InterceptedHandler) ListTasks

ListTasks implements RequestHandler.

func (*InterceptedHandler) SendMessage

SendMessage implements RequestHandler.

func (*InterceptedHandler) SendStreamingMessage

func (h *InterceptedHandler) SendStreamingMessage(ctx context.Context, req *a2a.SendMessageRequest) iter.Seq2[a2a.Event, error]

SendStreamingMessage implements RequestHandler.

func (*InterceptedHandler) SubscribeToTask

SubscribeToTask implements RequestHandler.

type LoggingConfig added in v2.1.0

type LoggingConfig struct {
	// Level is the log level for incoming requests. Default: slog.LevelInfo.
	Level slog.Level
	// ErrorLevel is the log level for failed requests. Default: slog.LevelInfo.
	// NOTE(review): the NewLoggingInterceptor documentation says errors are logged
	// at the error level — confirm which default actually applies.
	ErrorLevel slog.Level
	// LogPayload enables logging of request and response payloads.
	LogPayload bool
}

LoggingConfig controls the behavior of the logging CallInterceptor created by NewLoggingInterceptor.

type PassthroughCallInterceptor

type PassthroughCallInterceptor struct{}

PassthroughCallInterceptor can be used by CallInterceptor implementers who don't need all methods. The struct can be embedded for providing a no-op implementation.

Example
package main

import (
	"context"
	"fmt"
	"iter"

	"github.com/a2aproject/a2a-go/v2/a2a"
	"github.com/a2aproject/a2a-go/v2/a2asrv"
)

// echoExecutor is a minimal executor used by the example. When ExecuteFn is
// set it replaces the default "echo" reply.
type echoExecutor struct {
	ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}

// Execute delegates to ExecuteFn when one is configured; otherwise it emits a
// single agent message with the text "echo".
func (x *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	if custom := x.ExecuteFn; custom != nil {
		return custom(ctx, execCtx)
	}
	return func(emit func(a2a.Event, error) bool) {
		reply := a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo"))
		emit(reply, nil)
	}
}

// Cancel emits a single status update moving the task to the canceled state.
func (x *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	return func(emit func(a2a.Event, error) bool) {
		canceled := a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil)
		emit(canceled, nil)
	}
}

// main builds a handler with an interceptor that only embeds
// PassthroughCallInterceptor and reports whether the handler was created.
func main() {
	// Embedding supplies both interceptor methods, so no overrides are needed.
	type myInterceptor struct {
		a2asrv.PassthroughCallInterceptor
	}

	withInterceptor := a2asrv.WithCallInterceptors(myInterceptor{})
	handler := a2asrv.NewHandler(&echoExecutor{}, withInterceptor)

	created := handler != nil
	fmt.Println("Handler created:", created)
}
Output:
Handler created: true

func (PassthroughCallInterceptor) After

After implements [CallInterceptor.After].

func (PassthroughCallInterceptor) Before

Before implements [CallInterceptor.Before].

type ReferencedTasksLoader

type ReferencedTasksLoader struct {
	// Store is the task store from which referenced tasks are loaded.
	Store taskstore.Store
}

ReferencedTasksLoader implements ExecutorContextInterceptor. It populates [ExecutorContext.RelatedTasks] with Tasks referenced in the a2a.Message.ReferenceTasks of the message which triggered the agent execution.

func (*ReferencedTasksLoader) Intercept

func (ri *ReferencedTasksLoader) Intercept(ctx context.Context, execCtx *ExecutorContext) (context.Context, error)

Intercept implements ExecutorContextInterceptor. It loads referenced tasks from the task store and populates [ExecutorContext.RelatedTasks].

type Request

type Request struct {
	// Payload is one of a2a package core types. It is nil when a request does not have any parameters.
	Payload any
}

Request represents a transport-agnostic request received by the A2A server.

type RequestHandler

type RequestHandler interface {
	// GetTask handles the 'GetTask' protocol method.
	GetTask(context.Context, *a2a.GetTaskRequest) (*a2a.Task, error)

	// ListTasks handles the 'ListTasks' protocol method.
	ListTasks(context.Context, *a2a.ListTasksRequest) (*a2a.ListTasksResponse, error)

	// CancelTask handles the 'CancelTask' protocol method.
	CancelTask(context.Context, *a2a.CancelTaskRequest) (*a2a.Task, error)

	// SendMessage handles the 'SendMessage' protocol method (non-streaming).
	SendMessage(context.Context, *a2a.SendMessageRequest) (a2a.SendMessageResult, error)

	// SubscribeToTask handles the `SubscribeToTask` protocol method.
	SubscribeToTask(context.Context, *a2a.SubscribeToTaskRequest) iter.Seq2[a2a.Event, error]

	// SendStreamingMessage handles the 'SendStreamingMessage' protocol method (streaming).
	SendStreamingMessage(context.Context, *a2a.SendMessageRequest) iter.Seq2[a2a.Event, error]

	// GetTaskPushConfig handles the `GetTaskPushNotificationConfig` protocol method.
	GetTaskPushConfig(context.Context, *a2a.GetTaskPushConfigRequest) (*a2a.TaskPushConfig, error)

	// ListTaskPushConfigs handles the `ListTaskPushNotificationConfigs` protocol method.
	ListTaskPushConfigs(context.Context, *a2a.ListTaskPushConfigRequest) ([]*a2a.TaskPushConfig, error)

	// CreateTaskPushConfig handles the `CreateTaskPushNotificationConfig` protocol method.
	CreateTaskPushConfig(context.Context, *a2a.CreateTaskPushConfigRequest) (*a2a.TaskPushConfig, error)

	// DeleteTaskPushConfig handles the `DeleteTaskPushNotificationConfig` protocol method.
	DeleteTaskPushConfig(context.Context, *a2a.DeleteTaskPushConfigRequest) error

	// GetExtendedAgentCard handles the `GetExtendedAgentCard` protocol method.
	GetExtendedAgentCard(context.Context, *a2a.GetExtendedAgentCardRequest) (*a2a.AgentCard, error)
}

RequestHandler defines a transport-agnostic interface for handling incoming A2A requests.

func NewHandler

func NewHandler(executor AgentExecutor, options ...RequestHandlerOption) RequestHandler

NewHandler creates a new request handler.

Example
package main

import (
	"context"
	"fmt"
	"iter"

	"github.com/a2aproject/a2a-go/v2/a2a"
	"github.com/a2aproject/a2a-go/v2/a2asrv"
)

// echoExecutor is a minimal executor used by the example. When ExecuteFn is
// set it replaces the default "echo" reply.
type echoExecutor struct {
	ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}

// Execute delegates to ExecuteFn when one is configured; otherwise it emits a
// single agent message with the text "echo".
func (x *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	if custom := x.ExecuteFn; custom != nil {
		return custom(ctx, execCtx)
	}
	return func(emit func(a2a.Event, error) bool) {
		reply := a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo"))
		emit(reply, nil)
	}
}

// Cancel emits a single status update moving the task to the canceled state.
func (x *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	return func(emit func(a2a.Event, error) bool) {
		canceled := a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil)
		emit(canceled, nil)
	}
}

func main() {
	executor := &echoExecutor{}
	handler := a2asrv.NewHandler(executor)

	fmt.Println("Handler created:", handler != nil)
}
Output:
Handler created: true
Example (FullServer)
package main

import (
	"context"
	"fmt"
	"iter"
	"net/http"

	"github.com/a2aproject/a2a-go/v2/a2a"
	"github.com/a2aproject/a2a-go/v2/a2asrv"
)

// echoExecutor is a minimal executor used by the example. When ExecuteFn is
// set it replaces the default "echo" reply.
type echoExecutor struct {
	ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}

// Execute delegates to ExecuteFn when one is configured; otherwise it emits a
// single agent message with the text "echo".
func (x *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	if custom := x.ExecuteFn; custom != nil {
		return custom(ctx, execCtx)
	}
	return func(emit func(a2a.Event, error) bool) {
		reply := a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo"))
		emit(reply, nil)
	}
}

// Cancel emits a single status update moving the task to the canceled state.
func (x *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	return func(emit func(a2a.Event, error) bool) {
		canceled := a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil)
		emit(canceled, nil)
	}
}

func main() {
	executor := &echoExecutor{}
	handler := a2asrv.NewHandler(executor)

	card := &a2a.AgentCard{
		Name:    "My Agent",
		Version: "1.0.0",
		SupportedInterfaces: []*a2a.AgentInterface{
			a2a.NewAgentInterface("http://localhost:8080", a2a.TransportProtocolJSONRPC),
		},
	}

	mux := http.NewServeMux()
	mux.Handle(a2asrv.WellKnownAgentCardPath, a2asrv.NewStaticAgentCardHandler(card))
	mux.Handle("/", a2asrv.NewJSONRPCHandler(handler))

	fmt.Println("Agent card path:", a2asrv.WellKnownAgentCardPath)
	fmt.Println("Server ready")
}
Output:
Agent card path: /.well-known/agent-card.json
Server ready
Example (WithOptions)
package main

import (
	"context"
	"fmt"
	"iter"

	"github.com/a2aproject/a2a-go/v2/a2a"
	"github.com/a2aproject/a2a-go/v2/a2asrv"
)

// echoExecutor is a minimal executor used by the example. When ExecuteFn is
// set it replaces the default "echo" reply.
type echoExecutor struct {
	ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}

// Execute delegates to ExecuteFn when one is configured; otherwise it emits a
// single agent message with the text "echo".
func (x *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	if custom := x.ExecuteFn; custom != nil {
		return custom(ctx, execCtx)
	}
	return func(emit func(a2a.Event, error) bool) {
		reply := a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo"))
		emit(reply, nil)
	}
}

// Cancel emits a single status update moving the task to the canceled state.
func (x *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	return func(emit func(a2a.Event, error) bool) {
		canceled := a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil)
		emit(canceled, nil)
	}
}

func main() {
	executor := &echoExecutor{}
	handler := a2asrv.NewHandler(
		executor,
		a2asrv.WithExtendedAgentCard(&a2a.AgentCard{Name: "Extended Agent"}),
		a2asrv.WithCallInterceptors(&a2asrv.PassthroughCallInterceptor{}),
	)

	fmt.Println("Handler with options created:", handler != nil)
}
Output:
Handler with options created: true

type RequestHandlerOption

type RequestHandlerOption func(*InterceptedHandler, *defaultRequestHandler)

RequestHandlerOption can be used to customize the default RequestHandler implementation behavior.

func WithCallInterceptors

func WithCallInterceptors(interceptors ...CallInterceptor) RequestHandlerOption

WithCallInterceptors adds one or more CallInterceptor-s which will be applied to all requests and responses.

func WithCapabilityChecks

func WithCapabilityChecks(capabilities *a2a.AgentCapabilities) RequestHandlerOption

WithCapabilityChecks sets the provided capabilities for the request handler.

func WithClusterMode

func WithClusterMode(config ClusterConfig) RequestHandlerOption

WithClusterMode is an experimental feature where work queue is used to distribute tasks across multiple instances.

func WithConcurrencyConfig

func WithConcurrencyConfig(config limiter.ConcurrencyConfig) RequestHandlerOption

WithConcurrencyConfig allows to set limits on the number of concurrent executions.

func WithEventQueueManager

func WithEventQueueManager(manager eventqueue.Manager) RequestHandlerOption

WithEventQueueManager overrides eventqueue.Manager with a custom implementation.

func WithExecutionPanicHandler

func WithExecutionPanicHandler(handler func(r any) error) RequestHandlerOption

WithExecutionPanicHandler allows setting a custom handler for panics that occur during execution.

func WithExecutorContextInterceptor

func WithExecutorContextInterceptor(interceptor ExecutorContextInterceptor) RequestHandlerOption

WithExecutorContextInterceptor overrides the default ExecutorContextInterceptor with a custom implementation.

func WithExtendedAgentCard

func WithExtendedAgentCard(card *a2a.AgentCard) RequestHandlerOption

WithExtendedAgentCard sets a static extended authenticated agent card.

func WithExtendedAgentCardProducer

func WithExtendedAgentCardProducer(cardProducer ExtendedAgentCardProducer) RequestHandlerOption

WithExtendedAgentCardProducer sets a dynamic extended authenticated agent card producer.

func WithLogger

func WithLogger(logger *slog.Logger) RequestHandlerOption

WithLogger sets a custom logger. Request scoped parameters will be attached to this logger on method invocations. Any injected dependency will be able to access the logger using github.com/a2aproject/a2a-go/v2/log package-level functions. If not provided, defaults to slog.Default().

func WithPushNotifications

func WithPushNotifications(store push.ConfigStore, sender push.Sender) RequestHandlerOption

WithPushNotifications adds support for push notifications. If dependencies are not provided, push-related methods will return a2a.ErrPushNotificationNotSupported.

func WithTaskStore

func WithTaskStore(store taskstore.Store) RequestHandlerOption

WithTaskStore overrides TaskStore with a custom implementation. If not provided, defaults to an in-memory implementation.

type Response

// Response represents a transport-agnostic response generated by the A2A server.
type Response struct {
	// Payload is one of a2a package core types. It is nil when Err is set or when a request does not return any value.
	Payload any
	// Err is set to indicate that request processing failed.
	Err error
}

Response represents a transport-agnostic response generated by the A2A server. Payload is one of a2a package core types.

type ServiceParams

// ServiceParams holds the metadata associated with a request.
type ServiceParams struct {
	// contains filtered or unexported fields
}

ServiceParams holds the metadata associated with a request. Custom transport implementations can call NewCallContext to make it accessible during request processing.

Example
package main

import (
	"context"
	"fmt"
	"iter"
	"net/http"
	"net/http/httptest"

	"github.com/a2aproject/a2a-go/v2/a2a"
	"github.com/a2aproject/a2a-go/v2/a2asrv"
)

// echoExecutor is the minimal executor used by this example; it is passed to
// a2asrv.NewHandler as the agent executor.
type echoExecutor struct {
	// ExecuteFn, when non-nil, is called by Execute instead of the default
	// behavior of yielding a single "echo" agent message.
	ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}

// Execute delegates to ExecuteFn when one is configured. Otherwise it returns
// a sequence that yields a single agent message carrying the text part "echo".
func (e *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	if e.ExecuteFn == nil {
		echo := func(yield func(a2a.Event, error) bool) {
			reply := a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo"))
			yield(reply, nil)
		}
		return echo
	}
	return e.ExecuteFn(ctx, execCtx)
}

// Cancel returns a sequence that yields a single status update event moving
// the execution to a2a.TaskStateCanceled. The context argument is unused.
func (e *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	emit := func(yield func(a2a.Event, error) bool) {
		canceled := a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil)
		yield(canceled, nil)
	}
	return emit
}

// testInterceptor is a call interceptor for this example whose behavior is
// supplied through optional function fields. It is registered with the handler
// via a2asrv.WithCallInterceptors.
type testInterceptor struct {
	// BeforeFn, when non-nil, is invoked by Before and its results are returned as-is.
	BeforeFn func(ctx context.Context, callCtx *a2asrv.CallContext, req *a2asrv.Request) (context.Context, any, error)
	// AfterFn, when non-nil, is invoked by After and its result is returned as-is.
	AfterFn  func(ctx context.Context, callCtx *a2asrv.CallContext, resp *a2asrv.Response) error
}

// Before forwards the call to BeforeFn when it is set. Without a hook it is a
// no-op that returns the incoming context unchanged, a nil result and a nil error.
func (ti *testInterceptor) Before(ctx context.Context, callCtx *a2asrv.CallContext, req *a2asrv.Request) (context.Context, any, error) {
	if ti.BeforeFn == nil {
		return ctx, nil, nil
	}
	return ti.BeforeFn(ctx, callCtx, req)
}

// After forwards the call to AfterFn when it is set. Without a hook it is a
// no-op that returns nil.
func (ti *testInterceptor) After(ctx context.Context, callCtx *a2asrv.CallContext, resp *a2asrv.Response) error {
	if ti.AfterFn == nil {
		return nil
	}
	return ti.AfterFn(ctx, callCtx, resp)
}

// main demonstrates that HTTP request headers are visible to call interceptors
// through ServiceParams when the handler is served over the REST transport.
func main() {
	var capturedHeader string

	// ServiceParams.Get is case-insensitive, so the lower-case key matches the
	// "X-Custom-Header" header set on the request below.
	interceptor := &testInterceptor{
		BeforeFn: func(ctx context.Context, callCtx *a2asrv.CallContext, _ *a2asrv.Request) (context.Context, any, error) {
			if vals, ok := callCtx.ServiceParams().Get("x-custom-header"); ok && len(vals) > 0 {
				capturedHeader = vals[0]
			}
			return ctx, nil, nil
		},
	}

	executor := &echoExecutor{}
	handler := a2asrv.NewHandler(executor, a2asrv.WithCallInterceptors(interceptor))
	restHandler := a2asrv.NewRESTHandler(handler)

	server := httptest.NewServer(restHandler)
	defer server.Close()

	// Check the request construction error instead of discarding it, so a
	// malformed URL is reported rather than causing a nil dereference below.
	req, err := http.NewRequest(http.MethodGet, server.URL+"/tasks/task-123", nil)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	req.Header.Set("X-Custom-Header", "my-value")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	// The close error is irrelevant for this example and is deliberately ignored.
	defer func() { _ = resp.Body.Close() }()

	// The task won't be found, but the interceptor still captures the header.
	fmt.Println("Header in ServiceParams:", capturedHeader)
}
Output:
Header in ServiceParams: my-value

func NewServiceParams

func NewServiceParams(src map[string][]string) *ServiceParams

NewServiceParams is a ServiceParams constructor function.

func (*ServiceParams) Get

func (sp *ServiceParams) Get(key string) ([]string, bool)

Get performs a case-insensitive lookup of values for the given key.

func (*ServiceParams) List

func (sp *ServiceParams) List() iter.Seq2[string, []string]

List allows inspecting all request meta values.

func (*ServiceParams) With

func (sp *ServiceParams) With(additional map[string][]string) *ServiceParams

With allows creating a ServiceParams instance holding the extended set of values.

type TransportConfig

// TransportConfig holds the configuration for transport bindings.
// It is the value modified by TransportOption functions.
type TransportConfig struct {
	// KeepAliveInterval is the keep-alive interval for transport bindings.
	// NOTE(review): presumably the value set by WithTransportKeepAlive, for which
	// 0 or a negative interval leaves SSE keep-alive disabled — confirm.
	KeepAliveInterval time.Duration
	// PanicHandler receives the recovered panic value and returns an error.
	// NOTE(review): presumably the handler set by WithTransportPanicHandler — confirm.
	PanicHandler      func(r any) error
}

TransportConfig holds the configuration for transport bindings.

type TransportOption

type TransportOption func(*TransportConfig)

TransportOption is a functional option for configuring protocol binding implementations.

func WithTransportKeepAlive

func WithTransportKeepAlive(interval time.Duration) TransportOption

WithTransportKeepAlive enables SSE keep-alive messages at the specified interval. Keep-alive messages prevent API gateways from dropping idle connections. If interval is 0 or negative, keep-alive is disabled (default behavior).

func WithTransportPanicHandler

func WithTransportPanicHandler(handler func(r any) error) TransportOption

WithTransportPanicHandler sets a custom panic handler for the transport bindings. This makes it possible to recover from a panic by returning an error to the client.

type User

// User can be attached to CallContext by authentication middleware.
type User struct {
	// Name is a username.
	Name string
	// Authenticated is true if the request was authenticated.
	Authenticated bool
	// Attributes is a map of attributes associated with the user.
	Attributes map[string]any
}

User can be attached to CallContext by authentication middleware.

Example
package main

import (
	"context"
	"fmt"
	"iter"
	"strings"

	"github.com/a2aproject/a2a-go/v2/a2a"
	"github.com/a2aproject/a2a-go/v2/a2asrv"
)

// echoExecutor is the minimal executor used by this example; it is passed to
// a2asrv.NewHandler as the agent executor.
type echoExecutor struct {
	// ExecuteFn, when non-nil, is called by Execute instead of the default
	// behavior of yielding a single "echo" agent message.
	ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}

// Execute delegates to ExecuteFn when one is configured. Otherwise it returns
// a sequence that yields a single agent message carrying the text part "echo".
func (e *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	if e.ExecuteFn == nil {
		echo := func(yield func(a2a.Event, error) bool) {
			reply := a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo"))
			yield(reply, nil)
		}
		return echo
	}
	return e.ExecuteFn(ctx, execCtx)
}

// Cancel returns a sequence that yields a single status update event moving
// the execution to a2a.TaskStateCanceled. The context argument is unused.
func (e *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
	emit := func(yield func(a2a.Event, error) bool) {
		canceled := a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil)
		yield(canceled, nil)
	}
	return emit
}

// testInterceptor is a call interceptor for this example whose behavior is
// supplied through optional function fields. It is registered with the handler
// via a2asrv.WithCallInterceptors.
type testInterceptor struct {
	// BeforeFn, when non-nil, is invoked by Before and its results are returned as-is.
	BeforeFn func(ctx context.Context, callCtx *a2asrv.CallContext, req *a2asrv.Request) (context.Context, any, error)
	// AfterFn, when non-nil, is invoked by After and its result is returned as-is.
	AfterFn  func(ctx context.Context, callCtx *a2asrv.CallContext, resp *a2asrv.Response) error
}

// Before forwards the call to BeforeFn when it is set. Without a hook it is a
// no-op that returns the incoming context unchanged, a nil result and a nil error.
func (ti *testInterceptor) Before(ctx context.Context, callCtx *a2asrv.CallContext, req *a2asrv.Request) (context.Context, any, error) {
	if ti.BeforeFn == nil {
		return ctx, nil, nil
	}
	return ti.BeforeFn(ctx, callCtx, req)
}

// After forwards the call to AfterFn when it is set. Without a hook it is a
// no-op that returns nil.
func (ti *testInterceptor) After(ctx context.Context, callCtx *a2asrv.CallContext, resp *a2asrv.Response) error {
	if ti.AfterFn == nil {
		return nil
	}
	return ti.AfterFn(ctx, callCtx, resp)
}

// main demonstrates how an authentication interceptor can attach a User to the
// CallContext based on the "authorization" service parameter, and how the
// executor then observes that user through its ExecutorContext.
func main() {
	// authenticate stands in for real token validation: it accepts any token
	// and resolves it to the user name "user".
	authenticate := func(_ string) string { return "user" }

	interceptor := &testInterceptor{
		BeforeFn: func(ctx context.Context, callCtx *a2asrv.CallContext, _ *a2asrv.Request) (context.Context, any, error) {
			auth, ok := callCtx.ServiceParams().Get("authorization")
			if !ok || len(auth) == 0 {
				return ctx, nil, nil
			}
			// Strip the scheme so that authenticate receives only the token,
			// not the whole "Bearer <token>" header value.
			token, isBearer := strings.CutPrefix(auth[0], "Bearer ")
			if !isBearer {
				return ctx, nil, nil
			}
			if name := authenticate(token); name != "" {
				callCtx.User = a2asrv.NewAuthenticatedUser(name, nil)
			}
			return ctx, nil, nil
		},
	}

	// The executor prints the name of the user it sees before echoing a reply.
	executor := &echoExecutor{
		ExecuteFn: func(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
			return func(yield func(a2a.Event, error) bool) {
				fmt.Println("Auth found:", execCtx.User.Name)
				yield(a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo")), nil)
			}
		},
	}

	// ServiceParams lookups are case-insensitive, so the "Authorization" key is
	// found by the interceptor's lower-case "authorization" lookup.
	ctx, _ := a2asrv.NewCallContext(context.Background(), a2asrv.NewServiceParams(map[string][]string{
		"Authorization": {"Bearer token"},
	}))
	handler := a2asrv.NewHandler(executor, a2asrv.WithCallInterceptors(interceptor))
	_, err := handler.SendMessage(ctx, &a2a.SendMessageRequest{
		Message: a2a.NewMessage(a2a.MessageRoleUser, a2a.NewTextPart("echo")),
	})
	fmt.Println("Error:", err)
}
Output:
Auth found: user
Error: <nil>

func NewAuthenticatedUser

func NewAuthenticatedUser(username string, attrs map[string]any) *User

NewAuthenticatedUser returns a new User instance with the specified username and attributes.

Directories

Path Synopsis
eventqueue — Package eventqueue provides implementation for in-memory queue management and event processing.
limiter — Package limiter provides configurations for controlling concurrency limit.
push — Package push provides a basic implementation of push notification functionality.
taskstore — Package taskstore provides the contract and in-memory implementation of a2a.Task storage.
workqueue — Package workqueue provides a work queue implementation for task execution.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL