a2aserver

package module
v1.3.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2026 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package a2aserver provides a standalone A2A-protocol HTTP server that can be backed by any conversation implementation satisfying the interfaces defined here. It imports only runtime/ and has no dependency on sdk/.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTaskNotFound      = errors.New("a2a: task not found")
	ErrTaskAlreadyExists = errors.New("a2a: task already exists")
	ErrInvalidTransition = errors.New("a2a: invalid state transition")
	ErrTaskTerminal      = errors.New("a2a: task is in a terminal state")
)

Task store errors.

View Source
var ErrTooManySubscribers = fmt.Errorf("a2a: too many subscribers")

ErrTooManySubscribers is returned when a broadcaster has reached its subscriber limit.

Functions

This section is empty.

Types

type AgentCardProvider

type AgentCardProvider interface {
	AgentCard(r *http.Request) (*a2a.AgentCard, error)
}

AgentCardProvider returns the agent card to serve at /.well-known/agent.json.

type Authenticator

type Authenticator interface {
	Authenticate(r *http.Request) error
}

Authenticator validates incoming requests. Return a non-nil error to reject.

type Conversation

type Conversation interface {
	Send(ctx context.Context, message any) (SendResult, error)
	Close() error
}

Conversation is the non-streaming conversation interface the server uses.

type ConversationOpener

type ConversationOpener func(contextID string) (Conversation, error)

ConversationOpener creates or retrieves a conversation for a given context ID.

type EventKind

type EventKind int

EventKind discriminates the payload of a StreamEvent.

const (
	// EventText indicates the event contains text content.
	EventText EventKind = iota

	// EventToolCall indicates a tool call (suppressed by the server).
	EventToolCall

	// EventMedia indicates the event contains media content.
	EventMedia

	// EventDone indicates the stream is complete.
	EventDone

	// EventClientTool indicates a client tool request awaiting fulfillment.
	EventClientTool
)

type HealthChecker added in v1.3.10

type HealthChecker interface {
	Check(ctx context.Context) error
}

HealthChecker performs a named health check. Implementations should return nil when healthy and a non-nil error describing the problem otherwise.

type HealthCheckerFunc added in v1.3.10

type HealthCheckerFunc func(ctx context.Context) error

HealthCheckerFunc adapts an ordinary function to the HealthChecker interface.

func (HealthCheckerFunc) Check added in v1.3.10

func (f HealthCheckerFunc) Check(ctx context.Context) error

Check calls f(ctx).

type InMemoryTaskStore

type InMemoryTaskStore struct {
	// contains filtered or unexported fields
}

InMemoryTaskStore is a concurrency-safe, in-memory implementation of TaskStore.

func NewInMemoryTaskStore

func NewInMemoryTaskStore() *InMemoryTaskStore

NewInMemoryTaskStore creates a new InMemoryTaskStore.

func (*InMemoryTaskStore) AddArtifacts

func (s *InMemoryTaskStore) AddArtifacts(taskID string, artifacts []a2a.Artifact) error

AddArtifacts appends artifacts to a task.

func (*InMemoryTaskStore) Cancel

func (s *InMemoryTaskStore) Cancel(taskID string) error

Cancel transitions the task to the canceled state from any non-terminal state.

func (*InMemoryTaskStore) Create

func (s *InMemoryTaskStore) Create(taskID, contextID string) (*a2a.Task, error)

Create initializes a new task in the submitted state.

func (*InMemoryTaskStore) EvictTerminal

func (s *InMemoryTaskStore) EvictTerminal(cutoff time.Time) []string

EvictTerminal removes tasks in a terminal state whose last status timestamp is older than cutoff. It returns the IDs of evicted tasks.

func (*InMemoryTaskStore) Get

func (s *InMemoryTaskStore) Get(taskID string) (*a2a.Task, error)

Get retrieves a deep copy of a task by ID. The returned task is safe to read/modify without holding the store lock.

func (*InMemoryTaskStore) List

func (s *InMemoryTaskStore) List(contextID string, limit, offset int) ([]*a2a.Task, error)

List returns deep copies of tasks matching the given contextID with pagination. If contextID is empty, all tasks are returned. Results are sorted by ID for deterministic pagination. Offset and limit control pagination.

func (*InMemoryTaskStore) SetState

func (s *InMemoryTaskStore) SetState(taskID string, state a2a.TaskState, msg *a2a.Message) error

SetState transitions the task to a new state with an optional status message.

type Option

type Option func(*Server)

Option configures a Server.

func WithAuthenticator

func WithAuthenticator(auth Authenticator) Option

WithAuthenticator sets an authenticator for incoming requests.

func WithCard

func WithCard(card *a2a.AgentCard) Option

WithCard sets the agent card served at /.well-known/agent.json.

func WithCardProvider

func WithCardProvider(p AgentCardProvider) Option

WithCardProvider sets a dynamic agent card provider.

func WithConversationTTL

func WithConversationTTL(d time.Duration) Option

WithConversationTTL sets how long idle conversations are retained before automatic eviction. A conversation is considered idle when its last-use timestamp exceeds this duration. Default: 1 hour. Set to 0 to disable.

func WithHealthCheck added in v1.3.10

func WithHealthCheck(name string, checker HealthChecker) Option

WithHealthCheck registers a named health checker that is evaluated by the /readyz endpoint. Multiple checkers can be registered; each is reported individually in the response body.

func WithIdleTimeout

func WithIdleTimeout(d time.Duration) Option

WithIdleTimeout sets the maximum amount of time to wait for the next request when keep-alives are enabled. Default: 120s.

func WithMaxBodySize

func WithMaxBodySize(n int64) Option

WithMaxBodySize sets the maximum allowed request body size in bytes. Default: 10 MB.

func WithPort

func WithPort(port int) Option

WithPort sets the TCP port for ListenAndServe.

func WithReadTimeout

func WithReadTimeout(d time.Duration) Option

WithReadTimeout sets the maximum duration for reading the entire request. Default: 30s.

func WithTaskStore

func WithTaskStore(store TaskStore) Option

WithTaskStore sets a custom task store. Defaults to an in-memory store.

func WithTaskTTL

func WithTaskTTL(d time.Duration) Option

WithTaskTTL sets how long completed/failed/canceled tasks are retained before automatic eviction. Default: 1 hour. Set to 0 to disable eviction.

func WithWriteTimeout

func WithWriteTimeout(d time.Duration) Option

WithWriteTimeout sets the maximum duration before timing out writes of the response. Default: 60s.

type PendingClientToolInfo added in v1.3.8

type PendingClientToolInfo struct {
	CallID     string         `json:"call_id"`
	ToolName   string         `json:"tool_name"`
	Args       map[string]any `json:"args,omitempty"`
	ConsentMsg string         `json:"consent_message,omitempty"`
}

PendingClientToolInfo describes a client-side tool call awaiting fulfillment.

type ResumableConversation added in v1.3.8

type ResumableConversation interface {
	Conversation
	SendToolResult(callID string, result any) error
	RejectClientTool(callID string, reason string)
	Resume(ctx context.Context) (SendResult, error)
	ResumeStream(ctx context.Context) <-chan StreamEvent
}

ResumableConversation extends Conversation with the ability to submit client-side tool results and resume pipeline execution.

type SendResult

type SendResult interface {
	// HasPendingTools reports whether there are tools awaiting approval
	// (HITL or client-side). When true the task transitions to input_required.
	HasPendingTools() bool

	// HasPendingClientTools reports whether there are client-side tools
	// awaiting fulfillment by the caller.
	HasPendingClientTools() bool

	// PendingClientTools returns metadata for each pending client tool.
	PendingClientTools() []PendingClientToolInfo

	// Parts returns the content parts of the response.
	Parts() []types.ContentPart

	// Text returns the text content of the response as a fallback
	// when Parts() is empty.
	Text() string
}

SendResult is what the server needs from a completed conversation turn.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is an HTTP server that exposes a Conversation as an A2A-compliant JSON-RPC endpoint.

func NewServer

func NewServer(opener ConversationOpener, opts ...Option) *Server

NewServer creates a new A2A server.

func (*Server) Handler

func (s *Server) Handler() http.Handler

Handler returns an http.Handler implementing the A2A protocol.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe() error

ListenAndServe starts the HTTP server on the configured port.

WriteTimeout is set to 0 (disabled) because SSE streaming endpoints (message/stream, tasks/subscribe) hold the connection open indefinitely. A non-zero WriteTimeout would kill long-lived SSE connections. Non-streaming endpoints rely on the request context deadline for timeout enforcement.

func (*Server) Serve

func (s *Server) Serve(ln net.Listener) error

Serve starts the HTTP server on the given listener. See ListenAndServe for the rationale behind WriteTimeout: 0.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the server: stops the eviction goroutine, drains HTTP requests, cancels in-flight tasks, and closes all conversations.

type StaticCard

type StaticCard struct {
	Card a2a.AgentCard
}

StaticCard is an AgentCardProvider that always returns the same card.

func (*StaticCard) AgentCard

func (s *StaticCard) AgentCard(*http.Request) (*a2a.AgentCard, error)

AgentCard returns the static card.

type StreamEvent

type StreamEvent struct {
	Kind       EventKind
	Text       string
	Media      *types.MediaContent
	ClientTool *PendingClientToolInfo
	Error      error
}

StreamEvent is a single event on a streaming channel.

type StreamingConversation

type StreamingConversation interface {
	Conversation
	Stream(ctx context.Context, message any) <-chan StreamEvent
}

StreamingConversation extends Conversation with streaming support.

type TaskStore

type TaskStore interface {
	Create(taskID, contextID string) (*a2a.Task, error)
	Get(taskID string) (*a2a.Task, error)
	SetState(taskID string, state a2a.TaskState, msg *a2a.Message) error
	AddArtifacts(taskID string, artifacts []a2a.Artifact) error
	Cancel(taskID string) error
	List(contextID string, limit, offset int) ([]*a2a.Task, error)

	// EvictTerminal removes tasks in a terminal state whose last status
	// timestamp is older than the given cutoff time. It returns the IDs
	// of evicted tasks so callers can clean up associated resources.
	EvictTerminal(olderThan time.Time) []string
}

TaskStore defines the interface for task persistence and lifecycle management.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL