Documentation
¶
Overview ¶
Package a2asrv provides a configurable A2A protocol server implementation.
The default implementation can be created using NewHandler. The function takes a single required AgentExecutor dependency and a variable number of RequestHandlerOption-s used to customize handler behavior.
AgentExecutor implementation is responsible for invoking the agent, translating its outputs to a2a core types and emitting them for processing. A2A server stack will handle task state modification, persistence and notification dispatch to registered push endpoint and connected clients.
RequestHandler is transport-agnostic and needs to be wrapped in a transport-specific translation layer like NewJSONRPCHandler. JSONRPC transport implementation can be registered with the standard http.Server:
handler := a2asrv.NewHandler(
agentExecutor,
task.WithTaskStore(customDB),
a2asrv.WithPushNotifications(configStore, sender),
a2asrv.WithCallInterceptor(customMiddleware),
...
)
mux := http.NewServeMux()
mux.Handle("/invoke", a2asrv.NewJSONRPCHandler(handler))
The package provides utilities for serving public a2a.AgentCard-s. These return handler implementations which can be registered with a standard http server. Since the card is public, CORS policy allows requests from any domain.
mux.Handle(a2asrv, a2asrv.NewStaticAgentCardHandler(card)) // or for more advanced use cases mux.Handle(a2asrv, a2asrv.NewAgentCardHandler(producer))
Index ¶
- Constants
- func NewAgentCardHandler(producer AgentCardProducer) http.Handler
- func NewJSONRPCHandler(handler RequestHandler, options ...TransportOption) http.Handler
- func NewRESTHandler(handler RequestHandler, opts ...TransportOption) http.Handler
- func NewStaticAgentCardHandler(card *a2a.AgentCard) http.Handler
- func NewTaskStoreAuthenticator() taskstore.Authenticator
- func NewTenantRESTHandler(tenantTemplate string, handler RequestHandler, opts ...TransportOption) http.Handler
- type AgentCardJSONProducer
- type AgentCardProducer
- type AgentCardProducerFn
- type AgentExecutionCleaner
- type AgentExecutor
- type CallContext
- type CallInterceptor
- type ClusterConfig
- type ExecutorContext
- type ExecutorContextInterceptor
- type ExtendedAgentCardProducer
- type ExtendedAgentCardProducerFn
- type Extensions
- type InterceptedHandler
- func (h *InterceptedHandler) CancelTask(ctx context.Context, req *a2a.CancelTaskRequest) (*a2a.Task, error)
- func (h *InterceptedHandler) CreateTaskPushConfig(ctx context.Context, req *a2a.CreateTaskPushConfigRequest) (*a2a.TaskPushConfig, error)
- func (h *InterceptedHandler) DeleteTaskPushConfig(ctx context.Context, req *a2a.DeleteTaskPushConfigRequest) error
- func (h *InterceptedHandler) GetExtendedAgentCard(ctx context.Context, req *a2a.GetExtendedAgentCardRequest) (*a2a.AgentCard, error)
- func (h *InterceptedHandler) GetTask(ctx context.Context, req *a2a.GetTaskRequest) (*a2a.Task, error)
- func (h *InterceptedHandler) GetTaskPushConfig(ctx context.Context, req *a2a.GetTaskPushConfigRequest) (*a2a.TaskPushConfig, error)
- func (h *InterceptedHandler) ListTaskPushConfigs(ctx context.Context, req *a2a.ListTaskPushConfigRequest) ([]*a2a.TaskPushConfig, error)
- func (h *InterceptedHandler) ListTasks(ctx context.Context, req *a2a.ListTasksRequest) (*a2a.ListTasksResponse, error)
- func (h *InterceptedHandler) SendMessage(ctx context.Context, req *a2a.SendMessageRequest) (a2a.SendMessageResult, error)
- func (h *InterceptedHandler) SendStreamingMessage(ctx context.Context, req *a2a.SendMessageRequest) iter.Seq2[a2a.Event, error]
- func (h *InterceptedHandler) SubscribeToTask(ctx context.Context, req *a2a.SubscribeToTaskRequest) iter.Seq2[a2a.Event, error]
- type LoggingConfig
- type PassthroughCallInterceptor
- type ReferencedTasksLoader
- type Request
- type RequestHandler
- type RequestHandlerOption
- func WithCallInterceptors(interceptors ...CallInterceptor) RequestHandlerOption
- func WithCapabilityChecks(capabilities *a2a.AgentCapabilities) RequestHandlerOption
- func WithClusterMode(config ClusterConfig) RequestHandlerOption
- func WithConcurrencyConfig(config limiter.ConcurrencyConfig) RequestHandlerOption
- func WithEventQueueManager(manager eventqueue.Manager) RequestHandlerOption
- func WithExecutionPanicHandler(handler func(r any) error) RequestHandlerOption
- func WithExecutorContextInterceptor(interceptor ExecutorContextInterceptor) RequestHandlerOption
- func WithExtendedAgentCard(card *a2a.AgentCard) RequestHandlerOption
- func WithExtendedAgentCardProducer(cardProducer ExtendedAgentCardProducer) RequestHandlerOption
- func WithLogger(logger *slog.Logger) RequestHandlerOption
- func WithPushNotifications(store push.ConfigStore, sender push.Sender) RequestHandlerOption
- func WithTaskStore(store taskstore.Store) RequestHandlerOption
- type Response
- type ServiceParams
- type TransportConfig
- type TransportOption
- type User
Examples ¶
Constants ¶
const WellKnownAgentCardPath = "/.well-known/agent-card.json"
WellKnownAgentCardPath is the standard HTTP path for retrieving the agent card as defined in A2A spec.
Variables ¶
This section is empty.
Functions ¶
func NewAgentCardHandler ¶
func NewAgentCardHandler(producer AgentCardProducer) http.Handler
NewAgentCardHandler creates an http.Handler implementation for serving a PUBLIC a2a.AgentCard. The information contained in this card can be queried from any origin.
Example ¶
package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"github.com/a2aproject/a2a-go/v2/a2a"
"github.com/a2aproject/a2a-go/v2/a2asrv"
)
func main() {
producer := a2asrv.AgentCardProducerFn(func(_ context.Context) (*a2a.AgentCard, error) {
return &a2a.AgentCard{
Name: "Dynamic Agent",
Version: "2.0.0",
SupportedInterfaces: []*a2a.AgentInterface{
a2a.NewAgentInterface("http://localhost:8080", a2a.TransportProtocolJSONRPC),
},
}, nil
})
handler := a2asrv.NewAgentCardHandler(producer)
server := httptest.NewServer(handler)
defer server.Close()
resp, err := http.Get(server.URL)
if err != nil {
fmt.Println("Error:", err)
return
}
defer func() { _ = resp.Body.Close() }()
var result map[string]any
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
fmt.Println("Error:", err)
return
}
fmt.Println("Name:", result["name"])
fmt.Println("Version:", result["version"])
}
Output: Name: Dynamic Agent Version: 2.0.0
func NewJSONRPCHandler ¶
func NewJSONRPCHandler(handler RequestHandler, options ...TransportOption) http.Handler
NewJSONRPCHandler creates an http.Handler which implements JSONRPC A2A protocol binding.
Example ¶
package main
import (
"context"
"fmt"
"iter"
"net/http"
"github.com/a2aproject/a2a-go/v2/a2a"
"github.com/a2aproject/a2a-go/v2/a2asrv"
)
type echoExecutor struct {
ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}
func (e *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
if e.ExecuteFn != nil {
return e.ExecuteFn(ctx, execCtx)
}
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo")), nil)
}
}
func (e *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil), nil)
}
}
func main() {
executor := &echoExecutor{}
handler := a2asrv.NewHandler(executor)
jsonrpcHandler := a2asrv.NewJSONRPCHandler(handler)
mux := http.NewServeMux()
mux.Handle("/", jsonrpcHandler)
fmt.Println("JSON-RPC handler registered:", jsonrpcHandler != nil)
}
Output: JSON-RPC handler registered: true
func NewRESTHandler ¶
func NewRESTHandler(handler RequestHandler, opts ...TransportOption) http.Handler
NewRESTHandler creates an http.Handler which implements the HTTP+JSON A2A protocol binding.
func NewStaticAgentCardHandler ¶
NewStaticAgentCardHandler creates an http.Handler implementation for serving a PUBLIC a2a.AgentCard which is not expected to change while the program is running. The information contained in this card can be queried from any origin. The method panics if the argument json marhsaling fails.
Example ¶
package main
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"github.com/a2aproject/a2a-go/v2/a2a"
"github.com/a2aproject/a2a-go/v2/a2asrv"
)
func main() {
card := &a2a.AgentCard{
Name: "Echo Agent",
Version: "1.0.0",
SupportedInterfaces: []*a2a.AgentInterface{
a2a.NewAgentInterface("http://localhost:8080", a2a.TransportProtocolJSONRPC),
},
}
handler := a2asrv.NewStaticAgentCardHandler(card)
server := httptest.NewServer(handler)
defer server.Close()
resp, err := http.Get(server.URL)
if err != nil {
fmt.Println("Error:", err)
return
}
defer func() { _ = resp.Body.Close() }()
var result map[string]any
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
fmt.Println("Error:", err)
return
}
fmt.Println("Name:", result["name"])
fmt.Println("Version:", result["version"])
fmt.Println("Content-Type:", resp.Header.Get("Content-Type"))
}
Output: Name: Echo Agent Version: 1.0.0 Content-Type: application/json
func NewTaskStoreAuthenticator ¶
func NewTaskStoreAuthenticator() taskstore.Authenticator
NewTaskStoreAuthenticator returns a taskstore.Authenticator which uses the CallContext to get the user name.
func NewTenantRESTHandler ¶
func NewTenantRESTHandler(tenantTemplate string, handler RequestHandler, opts ...TransportOption) http.Handler
NewTenantRESTHandler creates an http.Handler which implements the HTTP+JSON A2A protocol binding. It extracts tenant information from the URL path based on the provided template, strips the prefix, and attaches the tenant ID (part inside {}) to the request context. Examples of templates: - "/{*}" - "/locations/*/projects/{*}" - "/{locations/*/projects/*}"
Types ¶
type AgentCardJSONProducer ¶
type AgentCardJSONProducer interface {
// CardJSON returns an [a2a.AgentCard] as raw json.
CardJSON(ctx context.Context) ([]byte, error)
}
AgentCardJSONProducer creates an AgentCard instances used for agent discovery and capability negotiation as raw json.
type AgentCardProducer ¶
type AgentCardProducer interface {
// Card returns a self-describing manifest for an agent. It provides essential
// metadata including the agent's identity, capabilities, skills, supported
// communication methods, and security requirements.
Card(ctx context.Context) (*a2a.AgentCard, error)
}
AgentCardProducer creates public AgentCard instances used for agent discovery and capability negotiation.
type AgentCardProducerFn ¶
AgentCardProducerFn is a function type which implements AgentCardProducer.
type AgentExecutionCleaner ¶
type AgentExecutionCleaner interface {
// Cleanup is called after an agent execution completes with either result or an error.
Cleanup(ctx context.Context, execCtx *ExecutorContext, result a2a.SendMessageResult, err error)
}
AgentExecutionCleaner is an optional interface AgentExecutor can implement to perform cleanup after execution finishes.
type AgentExecutor ¶
type AgentExecutor interface {
// Execute invokes the agent passing information about the request which triggered the execution,
// translates agent outputs to A2A events and emits them for processing.
// Every invocation runs in a dedicated goroutine.
//
// Failures should generally be reported by writing events carrying the cancelation information
// and task state. An error should be returned in special cases or before a task was created.
Execute(ctx context.Context, execCtx *ExecutorContext) iter.Seq2[a2a.Event, error]
// Cancel is called when a client requests the agent to stop working on a task.
// The simplest implementation can emit an [a2a.TaskStatusUpdateEvent] with [a2a.TaskStateCanceled].
//
// Optimistic concurrent control is used during task store updates to prevent concurrent writes
// and propagate cancelation signal in a server cluster when execution and cancelation is handled
// by different processes.
//
// TaskStatusUpdateEvent with a failed state is handled differently in terms
// of retries. If a concurrent task modification is detected server stack will re-fetch
// the latest state from the task store and retry update request.
//
// If the event gets applied during an active execution, the next execution update will
// fail on OCC and execution will be canceled.
//
// An error should be returned if the cancelation request cannot be processed.
Cancel(ctx context.Context, execCtx *ExecutorContext) iter.Seq2[a2a.Event, error]
}
AgentExecutor implementations translate agent outputs to A2A events. The provided ExecutorContext should be used as a a2a.TaskInfoProvider argument for a2a.Event-s constructor functions, for example:
a2a.NewSubmittedTask(execCtx, a2a.TaskStateSubmitted, nil) a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateWorking, nil) a2a.NewArtifactEvent(execCtx, parts...) a2a.NewArtifactUpdateEvent(execCtx, artifactID, parts...)
For streaming responses a2a.TaskArtifactUpdatEvent-s should be emitted. A2A server stops processing events after one of these events:
- An a2a.Message with any payload.
- An a2a.Task or a2a.TaskStatusUpdateEvent with a a2a.TaskState.Terminal equal to true.
- An a2a.Task or a2a.TaskStatusUpdateEvent in a2a.TaskStateInputRequired state.
In general, the executor should not emit an error after the first event was emitted, but an a2a.TaskStatusUpdateEvent with a failed state.
The following code can be used as a streaming implementation template with generateOutputs and toParts missing:
func Execute(ctx context.Context, execCtx *ExecutorContext) iter.Seq2[a2a.Event, error] {
return func(yield func(a2a.Event, error) bool) {
if execCtx.StoredTask == nil {
if !yield(a2a.NewSubmittedTask(execCtx, a2a.TaskStateSubmitted, nil), nil) {
return
}
}
/* performs the necessary setup */
if !yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateWorking, nil), nil) {
return
}
var artifactID a2a.ArtifactID
for output, err := range generateOutputs() {
if err != nil {
event := a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateFailed, toErrorMessage(err))
if !yield(event, nil) {
return
}
}
parts := toParts(output)
var event *a2a.TaskArtifactUpdateEvent
if artifactID == "" {
event = a2a.NewArtifactEvent(execCtx, parts...)
artifactID = event.Artifact.ID
} else {
event = a2a.NewArtifactUpdateEvent(execCtx, artifactID, parts...)
}
if !yield(event, nil) {
return
}
}
if !yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCompleted, nil), nil) {
return
}
}
}
Example ¶
package main
import (
"context"
"fmt"
"iter"
"github.com/a2aproject/a2a-go/v2/a2a"
"github.com/a2aproject/a2a-go/v2/a2asrv"
)
type echoExecutor struct {
ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}
func (e *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
if e.ExecuteFn != nil {
return e.ExecuteFn(ctx, execCtx)
}
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo")), nil)
}
}
func (e *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil), nil)
}
}
func main() {
executor := &echoExecutor{
ExecuteFn: func(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
return func(yield func(a2a.Event, error) bool) {
if !yield(a2a.NewSubmittedTask(execCtx, execCtx.Message), nil) {
return
}
if !yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateWorking, nil), nil) {
return
}
if !yield(a2a.NewArtifactEvent(execCtx, a2a.NewTextPart("generated output")), nil) {
return
}
yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCompleted, nil), nil)
}
},
}
handler := a2asrv.NewHandler(executor)
msg := a2a.NewMessage(a2a.MessageRoleUser, a2a.NewTextPart("generate something"))
result, err := handler.SendMessage(context.Background(), &a2a.SendMessageRequest{Message: msg})
if err != nil {
fmt.Println("Error:", err)
return
}
task, ok := result.(*a2a.Task)
if !ok {
fmt.Println("Expected task result")
return
}
fmt.Println("State:", task.Status.State)
fmt.Println("Artifacts:", len(task.Artifacts))
}
Output: State: TASK_STATE_COMPLETED Artifacts: 1
type CallContext ¶
type CallContext struct {
// User can be set by authentication middleware to provide information about
// the user who initiated the request.
User *User
// contains filtered or unexported fields
}
CallContext holds information about the current server call scope.
func CallContextFrom ¶
func CallContextFrom(ctx context.Context) (*CallContext, bool)
CallContextFrom allows to get a CallContext struct which holds additional information about the current execution scope.
func NewCallContext ¶
func NewCallContext(ctx context.Context, params *ServiceParams) (context.Context, *CallContext)
NewCallContext can be called by a transport implementations to provide request metadata to RequestHandler or to have access to the list of activated extensions via the returned CallContext after the call ends. If context already had a CallContext attached, the old context will be shadowed.
func (*CallContext) Extensions ¶
func (cc *CallContext) Extensions() *Extensions
Extensions returns a struct which provides an API for working with extensions in the current call context.
func (*CallContext) Method ¶
func (cc *CallContext) Method() string
Method returns the name of the RequestHandler method which is being executed.
func (*CallContext) ServiceParams ¶
func (cc *CallContext) ServiceParams() *ServiceParams
ServiceParams returns metadata of the request which created the call context.
func (*CallContext) Tenant ¶
func (cc *CallContext) Tenant() string
Tenant returns the tenant ID of the current call context.
type CallInterceptor ¶
type CallInterceptor interface {
// Before allows to observe, modify or reject a Request.
// A new context.Context can be returned to pass information down the call stack.
// If either the result (2nd return value) or the error (3rd return value) is non nil,
// the actual handler will not be called and the value will be returned to the client.
Before(ctx context.Context, callCtx *CallContext, req *Request) (context.Context, any, error)
// After allows to observe, modify or reject a Response.
// Context passed to this method will be the same as returned from [CallInterceptor.Before].
After(ctx context.Context, callCtx *CallContext, resp *Response) error
}
CallInterceptor can be attached to an RequestHandler. If multiple interceptors are added:
- Before will be executed in the order of attachment sequentially.
- After will be executed in the reverse order sequentially.
func NewLoggingInterceptor ¶ added in v2.1.0
func NewLoggingInterceptor(config *LoggingConfig) CallInterceptor
NewLoggingInterceptor creates a CallInterceptor that logs A2A method invocations. Incoming requests are logged at the configured level and errors are logged at the error level.
type ClusterConfig ¶
type ClusterConfig struct {
QueueManager eventqueue.Manager
WorkQueue workqueue.Queue
TaskStore taskstore.Store
}
ClusterConfig groups the necessary dependencies for A2A cluster mode operation.
type ExecutorContext ¶
type ExecutorContext struct {
// A message which triggered the execution. nil for cancelation request.
Message *a2a.Message
// TaskID is an ID of the task or a newly generated UUIDv4 in case Message did not reference any Task.
TaskID a2a.TaskID
// StoredTask is present if request message specified a TaskID.
StoredTask *a2a.Task
// RelatedTasks can be present when Message includes Task references and RequestContextBuilder is configured to load them.
RelatedTasks []*a2a.Task
// ContextID is a server-generated identifier for maintaining context across multiple related tasks or interactions. Matches the Task ContextID.
ContextID string
// Metadata of the request which triggered the call.
Metadata map[string]any
// User who made the request which triggered the execution.
User *User
// ServiceParams of the request which triggered the execution.
ServiceParams *ServiceParams
// Tenant is an optional ID of the agent owner.
Tenant string
}
ExecutorContext provides information about an incoming A2A request to AgentExecutor.
func (*ExecutorContext) TaskInfo ¶
func (ec *ExecutorContext) TaskInfo() a2a.TaskInfo
TaskInfo returns information used for associating events with a task.
type ExecutorContextInterceptor ¶
type ExecutorContextInterceptor interface {
// Intercept can modify the [ExecutorContext] before it gets passed to the [AgentExecutor].
Intercept(ctx context.Context, execCtx *ExecutorContext) (context.Context, error)
}
ExecutorContextInterceptor defines an extension point for modifying the information which gets passed to the agent when it is invoked.
type ExtendedAgentCardProducer ¶
type ExtendedAgentCardProducer interface {
// ExtendedCard returns a self-describing manifest for an agent. It contains extended data
// for authenticated clients.
ExtendedCard(ctx context.Context, req *a2a.GetExtendedAgentCardRequest) (*a2a.AgentCard, error)
}
ExtendedAgentCardProducer creates AgentCard instances used for communicating extended capabilities to authenticated clients.
type ExtendedAgentCardProducerFn ¶
type ExtendedAgentCardProducerFn func(ctx context.Context, req *a2a.GetExtendedAgentCardRequest) (*a2a.AgentCard, error)
ExtendedAgentCardProducerFn is a function type which implements ExtendedAgentCardProducer.
func (ExtendedAgentCardProducerFn) ExtendedCard ¶
func (fn ExtendedAgentCardProducerFn) ExtendedCard(ctx context.Context, req *a2a.GetExtendedAgentCardRequest) (*a2a.AgentCard, error)
ExtendedCard implements ExtendedAgentCardProducer.
type Extensions ¶
type Extensions struct {
// contains filtered or unexported fields
}
Extensions provides utility methods for accessing extensions requested by the client and keeping track of extensions activated during request processing.
func ExtensionsFrom ¶
func ExtensionsFrom(ctx context.Context) (*Extensions, bool)
ExtensionsFrom is a helper function for quick access to Extensions in the current CallContext.
func (*Extensions) Activate ¶
func (e *Extensions) Activate(extension *a2a.AgentExtension)
Activate marks extension as activated in the current CallContext. A list of activated extensions might be attached as response metadata by a transport implementation.
func (*Extensions) ActivatedURIs ¶
func (e *Extensions) ActivatedURIs() []string
ActivatedURIs returns URIs of all extensions activated during call processing.
func (*Extensions) Active ¶
func (e *Extensions) Active(extension *a2a.AgentExtension) bool
Active returns true if an extension has already been activated in the current CallContext using ExtensionContext.Activate.
func (*Extensions) Requested ¶
func (e *Extensions) Requested(extension *a2a.AgentExtension) bool
Requested returns true if the provided extension was requested by the client.
func (*Extensions) RequestedURIs ¶
func (e *Extensions) RequestedURIs() []string
RequestedURIs returns URIs of all extensions requested by the client.
type InterceptedHandler ¶
type InterceptedHandler struct {
// Handler is responsible for the actual processing of every call.
Handler RequestHandler
// Interceptors is a list of call interceptors which will be applied before and after each call.
Interceptors []CallInterceptor
// Logger is the logger which will be accessible from request scope context using log package
// methods. Defaults to slog.Default() if not set.
Logger *slog.Logger
// contains filtered or unexported fields
}
InterceptedHandler implements RequestHandler. It can be used to attach call interceptors and initialize call context for every method of the wrapped handler.
func (*InterceptedHandler) CancelTask ¶
func (h *InterceptedHandler) CancelTask(ctx context.Context, req *a2a.CancelTaskRequest) (*a2a.Task, error)
CancelTask implements RequestHandler.
func (*InterceptedHandler) CreateTaskPushConfig ¶
func (h *InterceptedHandler) CreateTaskPushConfig(ctx context.Context, req *a2a.CreateTaskPushConfigRequest) (*a2a.TaskPushConfig, error)
CreateTaskPushConfig implements RequestHandler.
func (*InterceptedHandler) DeleteTaskPushConfig ¶
func (h *InterceptedHandler) DeleteTaskPushConfig(ctx context.Context, req *a2a.DeleteTaskPushConfigRequest) error
DeleteTaskPushConfig implements RequestHandler.
func (*InterceptedHandler) GetExtendedAgentCard ¶
func (h *InterceptedHandler) GetExtendedAgentCard(ctx context.Context, req *a2a.GetExtendedAgentCardRequest) (*a2a.AgentCard, error)
GetExtendedAgentCard implements RequestHandler.
func (*InterceptedHandler) GetTask ¶
func (h *InterceptedHandler) GetTask(ctx context.Context, req *a2a.GetTaskRequest) (*a2a.Task, error)
GetTask implements RequestHandler.
func (*InterceptedHandler) GetTaskPushConfig ¶
func (h *InterceptedHandler) GetTaskPushConfig(ctx context.Context, req *a2a.GetTaskPushConfigRequest) (*a2a.TaskPushConfig, error)
GetTaskPushConfig implements RequestHandler.
func (*InterceptedHandler) ListTaskPushConfigs ¶
func (h *InterceptedHandler) ListTaskPushConfigs(ctx context.Context, req *a2a.ListTaskPushConfigRequest) ([]*a2a.TaskPushConfig, error)
ListTaskPushConfigs implements RequestHandler.
func (*InterceptedHandler) ListTasks ¶
func (h *InterceptedHandler) ListTasks(ctx context.Context, req *a2a.ListTasksRequest) (*a2a.ListTasksResponse, error)
ListTasks implements RequestHandler.
func (*InterceptedHandler) SendMessage ¶
func (h *InterceptedHandler) SendMessage(ctx context.Context, req *a2a.SendMessageRequest) (a2a.SendMessageResult, error)
SendMessage implements RequestHandler.
func (*InterceptedHandler) SendStreamingMessage ¶
func (h *InterceptedHandler) SendStreamingMessage(ctx context.Context, req *a2a.SendMessageRequest) iter.Seq2[a2a.Event, error]
SendStreamingMessage implements RequestHandler.
func (*InterceptedHandler) SubscribeToTask ¶
func (h *InterceptedHandler) SubscribeToTask(ctx context.Context, req *a2a.SubscribeToTaskRequest) iter.Seq2[a2a.Event, error]
SubscribeToTask implements RequestHandler.
type LoggingConfig ¶ added in v2.1.0
type LoggingConfig struct {
// Level is the log level for incoming requests. Default: slog.LevelInfo.
Level slog.Level
// ErrorLevel is the log level for failed requests. Default: slog.LevelInfo.
ErrorLevel slog.Level
// LogPayload enables logging of request and response payloads.
LogPayload bool
}
LoggingConfig controls the behavior of the logging CallInterceptor created by NewLoggingInterceptor.
type PassthroughCallInterceptor ¶
type PassthroughCallInterceptor struct{}
PassthroughCallInterceptor can be used by CallInterceptor implementers who don't need all methods. The struct can be embedded for providing a no-op implementation.
Example ¶
package main
import (
"context"
"fmt"
"iter"
"github.com/a2aproject/a2a-go/v2/a2a"
"github.com/a2aproject/a2a-go/v2/a2asrv"
)
type echoExecutor struct {
ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}
func (e *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
if e.ExecuteFn != nil {
return e.ExecuteFn(ctx, execCtx)
}
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo")), nil)
}
}
func (e *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil), nil)
}
}
func main() {
type myInterceptor struct {
a2asrv.PassthroughCallInterceptor
}
handler := a2asrv.NewHandler(&echoExecutor{}, a2asrv.WithCallInterceptors(myInterceptor{}))
fmt.Println("Handler created:", handler != nil)
}
Output: Handler created: true
func (PassthroughCallInterceptor) After ¶
func (PassthroughCallInterceptor) After(ctx context.Context, callCtx *CallContext, resp *Response) error
After implements [CallInterceptor.After].
type ReferencedTasksLoader ¶
ReferencedTasksLoader implements ExecutorContextInterceptor. It populates [ExecutorContext.RelatedTasks] with Tasks referenced in the a2a.Message.ReferenceTasks of the message which triggered the agent execution.
func (*ReferencedTasksLoader) Intercept ¶
func (ri *ReferencedTasksLoader) Intercept(ctx context.Context, execCtx *ExecutorContext) (context.Context, error)
Intercept implements ExecutorContextInterceptor. It loads referenced tasks from the task store and populates [ExecutorContext.RelatedTasks].
type Request ¶
type Request struct {
// Payload is one of a2a package core types. It is nil when a request does not have any parameters.
Payload any
}
Request represents a transport-agnostic request received by the A2A server.
type RequestHandler ¶
type RequestHandler interface {
// GetTask handles the 'GetTask' protocol method.
GetTask(context.Context, *a2a.GetTaskRequest) (*a2a.Task, error)
// ListTasks handles the 'ListTasks' protocol method.
ListTasks(context.Context, *a2a.ListTasksRequest) (*a2a.ListTasksResponse, error)
// CancelTask handles the 'CancelTask' protocol method.
CancelTask(context.Context, *a2a.CancelTaskRequest) (*a2a.Task, error)
// SendMessage handles the 'SendMessage' protocol method (non-streaming).
SendMessage(context.Context, *a2a.SendMessageRequest) (a2a.SendMessageResult, error)
// SubscribeToTask handles the `SubscribeToTask` protocol method.
SubscribeToTask(context.Context, *a2a.SubscribeToTaskRequest) iter.Seq2[a2a.Event, error]
// SendStreamingMessage handles the 'SendStreamingMessage' protocol method (streaming).
SendStreamingMessage(context.Context, *a2a.SendMessageRequest) iter.Seq2[a2a.Event, error]
// GetTaskPushConfig handles the `GetTaskPushNotificationConfig` protocol method.
GetTaskPushConfig(context.Context, *a2a.GetTaskPushConfigRequest) (*a2a.TaskPushConfig, error)
// ListTaskPushConfigs handles the `ListTaskPushNotificationConfigs` protocol method.
ListTaskPushConfigs(context.Context, *a2a.ListTaskPushConfigRequest) ([]*a2a.TaskPushConfig, error)
// CreateTaskPushConfig handles the `CreateTaskPushNotificationConfig` protocol method.
CreateTaskPushConfig(context.Context, *a2a.CreateTaskPushConfigRequest) (*a2a.TaskPushConfig, error)
// DeleteTaskPushConfig handles the `DeleteTaskPushNotificationConfig` protocol method.
DeleteTaskPushConfig(context.Context, *a2a.DeleteTaskPushConfigRequest) error
// GetExtendedAgentCard handles the `GetExtendedAgentCard` protocol method.
GetExtendedAgentCard(context.Context, *a2a.GetExtendedAgentCardRequest) (*a2a.AgentCard, error)
}
RequestHandler defines a transport-agnostic interface for handling incoming A2A requests.
func NewHandler ¶
func NewHandler(executor AgentExecutor, options ...RequestHandlerOption) RequestHandler
NewHandler creates a new request handler.
Example ¶
package main
import (
"context"
"fmt"
"iter"
"github.com/a2aproject/a2a-go/v2/a2a"
"github.com/a2aproject/a2a-go/v2/a2asrv"
)
type echoExecutor struct {
ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}
func (e *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
if e.ExecuteFn != nil {
return e.ExecuteFn(ctx, execCtx)
}
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo")), nil)
}
}
func (e *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil), nil)
}
}
func main() {
executor := &echoExecutor{}
handler := a2asrv.NewHandler(executor)
fmt.Println("Handler created:", handler != nil)
}
Output: Handler created: true
Example (FullServer) ¶
package main
import (
"context"
"fmt"
"iter"
"net/http"
"github.com/a2aproject/a2a-go/v2/a2a"
"github.com/a2aproject/a2a-go/v2/a2asrv"
)
type echoExecutor struct {
ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}
func (e *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
if e.ExecuteFn != nil {
return e.ExecuteFn(ctx, execCtx)
}
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo")), nil)
}
}
func (e *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil), nil)
}
}
func main() {
executor := &echoExecutor{}
handler := a2asrv.NewHandler(executor)
card := &a2a.AgentCard{
Name: "My Agent",
Version: "1.0.0",
SupportedInterfaces: []*a2a.AgentInterface{
a2a.NewAgentInterface("http://localhost:8080", a2a.TransportProtocolJSONRPC),
},
}
mux := http.NewServeMux()
mux.Handle(a2asrv.WellKnownAgentCardPath, a2asrv.NewStaticAgentCardHandler(card))
mux.Handle("/", a2asrv.NewJSONRPCHandler(handler))
fmt.Println("Agent card path:", a2asrv.WellKnownAgentCardPath)
fmt.Println("Server ready")
}
Output: Agent card path: /.well-known/agent-card.json Server ready
Example (WithOptions) ¶
package main
import (
"context"
"fmt"
"iter"
"github.com/a2aproject/a2a-go/v2/a2a"
"github.com/a2aproject/a2a-go/v2/a2asrv"
)
type echoExecutor struct {
ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}
func (e *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
if e.ExecuteFn != nil {
return e.ExecuteFn(ctx, execCtx)
}
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo")), nil)
}
}
func (e *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil), nil)
}
}
func main() {
executor := &echoExecutor{}
handler := a2asrv.NewHandler(
executor,
a2asrv.WithExtendedAgentCard(&a2a.AgentCard{Name: "Extended Agent"}),
a2asrv.WithCallInterceptors(&a2asrv.PassthroughCallInterceptor{}),
)
fmt.Println("Handler with options created:", handler != nil)
}
Output: Handler with options created: true
type RequestHandlerOption ¶
type RequestHandlerOption func(*InterceptedHandler, *defaultRequestHandler)
RequestHandlerOption can be used to customize the default RequestHandler implementation behavior.
func WithCallInterceptors ¶
func WithCallInterceptors(interceptors ...CallInterceptor) RequestHandlerOption
WithCallInterceptors adds a CallInterceptor which will be applied to all requests and responses.
func WithCapabilityChecks ¶
func WithCapabilityChecks(capabilities *a2a.AgentCapabilities) RequestHandlerOption
WithCapabilityChecks sets the provided capabilities for the request handler.
func WithClusterMode ¶
func WithClusterMode(config ClusterConfig) RequestHandlerOption
WithClusterMode is an experimental feature where work queue is used to distribute tasks across multiple instances.
func WithConcurrencyConfig ¶
func WithConcurrencyConfig(config limiter.ConcurrencyConfig) RequestHandlerOption
WithConcurrencyConfig allows to set limits on the number of concurrent executions.
func WithEventQueueManager ¶
func WithEventQueueManager(manager eventqueue.Manager) RequestHandlerOption
WithEventQueueManager overrides eventqueue.Manager with custom implementation
func WithExecutionPanicHandler ¶
func WithExecutionPanicHandler(handler func(r any) error) RequestHandlerOption
WithExecutionPanicHandler allows to set a custom handler for panics occurred during execution.
func WithExecutorContextInterceptor ¶
func WithExecutorContextInterceptor(interceptor ExecutorContextInterceptor) RequestHandlerOption
WithExecutorContextInterceptor overrides the default ExecutorContextInterceptor with a custom implementation.
func WithExtendedAgentCard ¶
func WithExtendedAgentCard(card *a2a.AgentCard) RequestHandlerOption
WithExtendedAgentCard sets a static extended authenticated agent card.
func WithExtendedAgentCardProducer ¶
func WithExtendedAgentCardProducer(cardProducer ExtendedAgentCardProducer) RequestHandlerOption
WithExtendedAgentCardProducer sets a dynamic extended authenticated agent card producer.
func WithLogger ¶
func WithLogger(logger *slog.Logger) RequestHandlerOption
WithLogger sets a custom logger. Request scoped parameters will be attached to this logger on method invocations. Any injected dependency will be able to access the logger using github.com/a2aproject/a2a-go/v2/log package-level functions. If not provided, defaults to slog.Default().
func WithPushNotifications ¶
func WithPushNotifications(store push.ConfigStore, sender push.Sender) RequestHandlerOption
WithPushNotifications adds support for push notifications. If dependencies are not provided push-related methods will be returning a2a.ErrPushNotificationNotSupported,
func WithTaskStore ¶
func WithTaskStore(store taskstore.Store) RequestHandlerOption
WithTaskStore overrides TaskStore with a custom implementation. If not provided, default to an in-memory implementation.
type Response ¶
type Response struct {
// Payload is one of a2a package core types. It is nil when Err is set or when a request does not return any value.
Payload any
// Err is set to indicate that request processing failed.
Err error
}
Response represents a transport-agnostic response generated by the A2A server. Payload is one of a2a package core types.
type ServiceParams ¶
type ServiceParams struct {
// contains filtered or unexported fields
}
ServiceParams holds the metadata associated with a request. Custom transport implementations can call NewCallContext to make it accessible during request processing.
Example ¶
package main
import (
"context"
"fmt"
"iter"
"net/http"
"net/http/httptest"
"github.com/a2aproject/a2a-go/v2/a2a"
"github.com/a2aproject/a2a-go/v2/a2asrv"
)
type echoExecutor struct {
ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}
func (e *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
if e.ExecuteFn != nil {
return e.ExecuteFn(ctx, execCtx)
}
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo")), nil)
}
}
func (e *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil), nil)
}
}
type testInterceptor struct {
BeforeFn func(ctx context.Context, callCtx *a2asrv.CallContext, req *a2asrv.Request) (context.Context, any, error)
AfterFn func(ctx context.Context, callCtx *a2asrv.CallContext, resp *a2asrv.Response) error
}
func (ti *testInterceptor) Before(ctx context.Context, callCtx *a2asrv.CallContext, req *a2asrv.Request) (context.Context, any, error) {
if ti.BeforeFn != nil {
return ti.BeforeFn(ctx, callCtx, req)
}
return ctx, nil, nil
}
func (ti *testInterceptor) After(ctx context.Context, callCtx *a2asrv.CallContext, resp *a2asrv.Response) error {
if ti.AfterFn != nil {
return ti.AfterFn(ctx, callCtx, resp)
}
return nil
}
func main() {
var capturedHeader string
interceptor := &testInterceptor{
BeforeFn: func(ctx context.Context, callCtx *a2asrv.CallContext, _ *a2asrv.Request) (context.Context, any, error) {
if vals, ok := callCtx.ServiceParams().Get("x-custom-header"); ok && len(vals) > 0 {
capturedHeader = vals[0]
}
return ctx, nil, nil
},
}
executor := &echoExecutor{}
handler := a2asrv.NewHandler(executor, a2asrv.WithCallInterceptors(interceptor))
restHandler := a2asrv.NewRESTHandler(handler)
server := httptest.NewServer(restHandler)
defer server.Close()
req, _ := http.NewRequest("GET", server.URL+"/tasks/task-123", nil)
req.Header.Set("X-Custom-Header", "my-value")
resp, err := http.DefaultClient.Do(req)
if err != nil {
fmt.Println("Error:", err)
return
}
defer func() { _ = resp.Body.Close() }()
// The task won't be found, but the interceptor still captures the header.
fmt.Println("Header in ServiceParams:", capturedHeader)
}
Output: Header in ServiceParams: my-value
func NewServiceParams ¶
func NewServiceParams(src map[string][]string) *ServiceParams
NewServiceParams is a ServiceParams constructor function.
func (*ServiceParams) Get ¶
func (sp *ServiceParams) Get(key string) ([]string, bool)
Get performs a case-insensitive lookup of values for the given key.
func (*ServiceParams) List ¶
func (sp *ServiceParams) List() iter.Seq2[string, []string]
List allows to inspect all request meta values.
func (*ServiceParams) With ¶
func (sp *ServiceParams) With(additional map[string][]string) *ServiceParams
With allows to create a ServiceParams instance holding the extended set of values.
type TransportConfig ¶
TransportConfig holds the configuration for transport bindings.
type TransportOption ¶
type TransportOption func(*TransportConfig)
TransportOption is a functional option for configuring protocol binding implementations.
func WithTransportKeepAlive ¶
func WithTransportKeepAlive(interval time.Duration) TransportOption
WithTransportKeepAlive enables SSE keep-alive messages at the specified interval. Keep-alive messages prevent API gateways from dropping idle connections. If interval is 0 or negative, keep-alive is disabled (default behavior).
func WithTransportPanicHandler ¶
func WithTransportPanicHandler(handler func(r any) error) TransportOption
WithTransportPanicHandler sets a custom panic handler for the transport bindings. This gives the ability to recovery from panic by returning an error to the client.
type User ¶
type User struct {
// Name is a username.
Name string
// Authenticated is true if the request was authenticated.
Authenticated bool
// Attributes is a map of attributes associated with the user.
Attributes map[string]any
}
User can be attached to CallContext by authentication middleware.
Example ¶
package main
import (
"context"
"fmt"
"iter"
"strings"
"github.com/a2aproject/a2a-go/v2/a2a"
"github.com/a2aproject/a2a-go/v2/a2asrv"
)
type echoExecutor struct {
ExecuteFn func(context.Context, *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error]
}
func (e *echoExecutor) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
if e.ExecuteFn != nil {
return e.ExecuteFn(ctx, execCtx)
}
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo")), nil)
}
}
func (e *echoExecutor) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
return func(yield func(a2a.Event, error) bool) {
yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCanceled, nil), nil)
}
}
type testInterceptor struct {
BeforeFn func(ctx context.Context, callCtx *a2asrv.CallContext, req *a2asrv.Request) (context.Context, any, error)
AfterFn func(ctx context.Context, callCtx *a2asrv.CallContext, resp *a2asrv.Response) error
}
func (ti *testInterceptor) Before(ctx context.Context, callCtx *a2asrv.CallContext, req *a2asrv.Request) (context.Context, any, error) {
if ti.BeforeFn != nil {
return ti.BeforeFn(ctx, callCtx, req)
}
return ctx, nil, nil
}
func (ti *testInterceptor) After(ctx context.Context, callCtx *a2asrv.CallContext, resp *a2asrv.Response) error {
if ti.AfterFn != nil {
return ti.AfterFn(ctx, callCtx, resp)
}
return nil
}
func main() {
authenticate := func(_ string) string { return "user" }
interceptor := &testInterceptor{
BeforeFn: func(ctx context.Context, callCtx *a2asrv.CallContext, req *a2asrv.Request) (context.Context, any, error) {
if auth, ok := callCtx.ServiceParams().Get("authorization"); ok && len(auth) > 0 && strings.HasPrefix(auth[0], "Bearer ") {
if name := authenticate(auth[0]); name != "" {
callCtx.User = a2asrv.NewAuthenticatedUser(name, nil)
}
}
return ctx, nil, nil
},
}
executor := &echoExecutor{
ExecuteFn: func(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
return func(yield func(a2a.Event, error) bool) {
fmt.Println("Auth found:", execCtx.User.Name)
yield(a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("echo")), nil)
}
},
}
ctx, _ := a2asrv.NewCallContext(context.Background(), a2asrv.NewServiceParams(map[string][]string{
"Authorization": {"Bearer token"},
}))
handler := a2asrv.NewHandler(executor, a2asrv.WithCallInterceptors(interceptor))
_, err := handler.SendMessage(ctx, &a2a.SendMessageRequest{
Message: a2a.NewMessage(a2a.MessageRoleUser, a2a.NewTextPart("echo")),
})
fmt.Println("Error:", err)
}
Output: Auth found: user Error: <nil>
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package eventqueue provides implementation for in-memory queue management and event processing.
|
Package eventqueue provides implementation for in-memory queue management and event processing. |
|
Package limiter provides configurations for controlling concurrency limit.
|
Package limiter provides configurations for controlling concurrency limit. |
|
Package push provides a basic implementation of push notification functionality.
|
Package push provides a basic implementation of push notification functionality. |
|
Package taskstore provides the contract and in-memory implementation of a2a.Task storage.
|
Package taskstore provides the contract and in-memory implementation of a2a.Task storage. |
|
Package workqueue provides a work queue implementation for task execution.
|
Package workqueue provides a work queue implementation for task execution. |