Documentation
¶
Overview ¶
Package server provides callback functionality for hooking into agent execution lifecycle.
Callbacks allow you to intercept and modify behavior at various points during agent execution:
- BeforeAgent/AfterAgent: Hook into the overall agent execution
- BeforeModel/AfterModel: Hook into LLM calls for caching, guardrails, etc.
- BeforeTool/AfterTool: Hook into tool execution for authorization, logging, etc.
Flow Control ¶
Before callbacks can skip default behavior by returning a non-nil value:
- BeforeAgent returning a Message skips agent execution
- BeforeModel returning an LLMResponse skips the LLM call
- BeforeTool returning a map skips tool execution
After callbacks can modify outputs by returning a non-nil value:
- AfterAgent returning a Message replaces the agent output
- AfterModel returning an LLMResponse replaces the LLM response
- AfterTool returning a map replaces the tool result
Example Usage ¶
callbacks := &CallbackConfig{
BeforeAgent: []BeforeAgentCallback{
func(ctx context.Context, callbackCtx *CallbackContext) *types.Message {
// Log, validate, or return early
return nil // proceed with execution
},
},
BeforeModel: []BeforeModelCallback{
func(ctx context.Context, callbackCtx *CallbackContext, req *LLMRequest) *LLMResponse {
// Implement caching or guardrails
return nil // proceed with LLM call
},
},
}
agent, err := NewAgentBuilder(logger).
WithCallbacks(callbacks).
Build()
Index ¶
- Variables
- func GenerateTaskID() string
- func GetSupportedProviders() []string
- func JSONTool(result any) (string, error)
- func NewEmptyMessagePartsError() error
- func NewStreamingNotImplementedError() error
- func NewTaskNotCancelableError(taskID string, state types.TaskState) error
- func NewTaskNotFoundError(taskID string) error
- func RegisterStorageProvider(provider string, factory StorageFactory)
- func StringPtr(s string) *string
- type A2AProtocolHandler
- type A2AServer
- func CustomA2AServer(cfg config.Config, logger *zap.Logger, pollingTaskHandler TaskHandler, ...) (A2AServer, error)
- func CustomA2AServerWithAgent(cfg config.Config, logger *zap.Logger, agent OpenAICompatibleAgent, ...) (A2AServer, error)
- func SimpleA2AServerWithAgent(cfg config.Config, logger *zap.Logger, agent OpenAICompatibleAgent, ...) (A2AServer, error)
- type A2AServerBuilder
- type A2AServerBuilderImpl
- func (b *A2AServerBuilderImpl) Build() (A2AServer, error)
- func (b *A2AServerBuilderImpl) WithAgent(agent OpenAICompatibleAgent) A2AServerBuilder
- func (b *A2AServerBuilderImpl) WithAgentCard(agentCard types.AgentCard) A2AServerBuilder
- func (b *A2AServerBuilderImpl) WithAgentCardFromFile(filePath string, overrides map[string]any) A2AServerBuilder
- func (b *A2AServerBuilderImpl) WithArtifactService(service ArtifactService) A2AServerBuilder
- func (b *A2AServerBuilderImpl) WithBackgroundTaskHandler(handler TaskHandler) A2AServerBuilder
- func (b *A2AServerBuilderImpl) WithDefaultBackgroundTaskHandler() A2AServerBuilder
- func (b *A2AServerBuilderImpl) WithDefaultStreamingTaskHandler() A2AServerBuilder
- func (b *A2AServerBuilderImpl) WithDefaultTaskHandlers() A2AServerBuilder
- func (b *A2AServerBuilderImpl) WithLogger(logger *zap.Logger) A2AServerBuilder
- func (b *A2AServerBuilderImpl) WithStreamingTaskHandler(handler StreamableTaskHandler) A2AServerBuilder
- func (b *A2AServerBuilderImpl) WithTaskResultProcessor(processor TaskResultProcessor) A2AServerBuilder
- type A2AServerImpl
- func NewA2AServer(cfg *config.Config, logger *zap.Logger, otel otel.OpenTelemetry) *A2AServerImpl
- func NewA2AServerEnvironmentAware(cfg *config.Config, logger *zap.Logger, otel otel.OpenTelemetry) *A2AServerImpl
- func NewA2AServerWithAgent(cfg *config.Config, logger *zap.Logger, otel otel.OpenTelemetry, ...) *A2AServerImpl
- func NewDefaultA2AServer(cfg *config.Config) *A2AServerImpl
- func (s *A2AServerImpl) GetAgent() OpenAICompatibleAgent
- func (s *A2AServerImpl) GetAgentCard() *types.AgentCard
- func (s *A2AServerImpl) GetBackgroundTaskHandler() TaskHandler
- func (s *A2AServerImpl) GetStreamingTaskHandler() StreamableTaskHandler
- func (s *A2AServerImpl) LoadAgentCardFromFile(filePath string, overrides map[string]any) error
- func (s *A2AServerImpl) SetAgent(agent OpenAICompatibleAgent)
- func (s *A2AServerImpl) SetAgentCard(agentCard types.AgentCard)
- func (s *A2AServerImpl) SetAgentDescription(description string)
- func (s *A2AServerImpl) SetAgentName(name string)
- func (s *A2AServerImpl) SetAgentURL(url string)
- func (s *A2AServerImpl) SetAgentVersion(version string)
- func (s *A2AServerImpl) SetBackgroundTaskHandler(handler TaskHandler)
- func (s *A2AServerImpl) SetStreamingTaskHandler(handler StreamableTaskHandler)
- func (s *A2AServerImpl) SetTaskResultProcessor(processor TaskResultProcessor)
- func (s *A2AServerImpl) Start(ctx context.Context) error
- func (s *A2AServerImpl) StartTaskProcessor(ctx context.Context)
- func (s *A2AServerImpl) Stop(ctx context.Context) error
- type AfterAgentCallback
- type AfterModelCallback
- type AfterToolCallback
- type AgentBuilder
- type AgentBuilderImpl
- func (b *AgentBuilderImpl) Build() (*OpenAICompatibleAgentImpl, error)
- func (b *AgentBuilderImpl) GetConfig() *config.AgentConfig
- func (b *AgentBuilderImpl) WithCallbacks(config *CallbackConfig) AgentBuilder
- func (b *AgentBuilderImpl) WithConfig(userConfig *config.AgentConfig) AgentBuilder
- func (b *AgentBuilderImpl) WithDefaultToolBox() AgentBuilder
- func (b *AgentBuilderImpl) WithLLMClient(client LLMClient) AgentBuilder
- func (b *AgentBuilderImpl) WithMaxChatCompletion(max int) AgentBuilder
- func (b *AgentBuilderImpl) WithMaxConversationHistory(max int) AgentBuilder
- func (b *AgentBuilderImpl) WithSystemPrompt(prompt string) AgentBuilder
- func (b *AgentBuilderImpl) WithToolBox(toolBox ToolBox) AgentBuilder
- type ArtifactMetadata
- type ArtifactService
- type ArtifactServiceImpl
- func (as *ArtifactServiceImpl) AddArtifactToTask(task *types.Task, artifact types.Artifact)
- func (as *ArtifactServiceImpl) AddArtifactsToTask(task *types.Task, artifacts []types.Artifact)
- func (as *ArtifactServiceImpl) CleanupExpiredArtifacts(ctx context.Context, maxAge time.Duration) (int, error)
- func (as *ArtifactServiceImpl) CleanupOldestArtifacts(ctx context.Context, maxArtifacts int) (int, error)
- func (as *ArtifactServiceImpl) Close() error
- func (as *ArtifactServiceImpl) CreateDataArtifact(name, description string, data map[string]any) types.Artifact
- func (as *ArtifactServiceImpl) CreateFileArtifact(name, description, filename string, data []byte, mimeType *string) (types.Artifact, error)
- func (as *ArtifactServiceImpl) CreateFileArtifactFromURI(name, description, filename, uri string, mimeType *string) types.Artifact
- func (as *ArtifactServiceImpl) CreateMultiPartArtifact(name, description string, parts []types.Part) types.Artifact
- func (as *ArtifactServiceImpl) CreateTaskArtifactUpdateEvent(taskID, contextID string, artifact types.Artifact, append, lastChunk *bool) types.TaskArtifactUpdateEvent
- func (as *ArtifactServiceImpl) CreateTextArtifact(name, description, text string) types.Artifact
- func (as *ArtifactServiceImpl) Exists(ctx context.Context, artifactID, filename string) (bool, error)
- func (as *ArtifactServiceImpl) GetArtifactByID(task *types.Task, artifactID string) (*types.Artifact, bool)
- func (as *ArtifactServiceImpl) GetArtifactsByType(task *types.Task, partKind string) []types.Artifact
- func (as *ArtifactServiceImpl) GetMimeTypeFromExtension(filename string) *string
- func (as *ArtifactServiceImpl) Retrieve(ctx context.Context, artifactID, filename string) (io.ReadCloser, error)
- func (as *ArtifactServiceImpl) ValidateArtifact(artifact types.Artifact) error
- type ArtifactStorageProvider
- type ArtifactsServer
- type ArtifactsServerBuilder
- type ArtifactsServerBuilderImpl
- type ArtifactsServerImpl
- type BasicTool
- type BeforeAgentCallback
- type BeforeModelCallback
- type BeforeToolCallback
- type CallbackConfig
- type CallbackContext
- type CallbackExecutor
- type ContextKey
- type DefaultA2AProtocolHandler
- func (h *DefaultA2AProtocolHandler) CreateTaskFromMessage(ctx context.Context, params types.MessageSendParams) (*types.Task, error)
- func (h *DefaultA2AProtocolHandler) HandleMessageSend(c *gin.Context, req types.JSONRPCRequest)
- func (h *DefaultA2AProtocolHandler) HandleMessageStream(c *gin.Context, req types.JSONRPCRequest, ...)
- func (h *DefaultA2AProtocolHandler) HandleTaskCancel(c *gin.Context, req types.JSONRPCRequest)
- func (h *DefaultA2AProtocolHandler) HandleTaskGet(c *gin.Context, req types.JSONRPCRequest)
- func (h *DefaultA2AProtocolHandler) HandleTaskList(c *gin.Context, req types.JSONRPCRequest)
- func (h *DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigDelete(c *gin.Context, req types.JSONRPCRequest)
- func (h *DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigGet(c *gin.Context, req types.JSONRPCRequest)
- func (h *DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigList(c *gin.Context, req types.JSONRPCRequest)
- func (h *DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigSet(c *gin.Context, req types.JSONRPCRequest)
- type DefaultBackgroundTaskHandler
- type DefaultCallbackExecutor
- func (ce *DefaultCallbackExecutor) ExecuteAfterAgent(ctx context.Context, callbackContext *CallbackContext, ...) *types.Message
- func (ce *DefaultCallbackExecutor) ExecuteAfterModel(ctx context.Context, callbackContext *CallbackContext, ...) *LLMResponse
- func (ce *DefaultCallbackExecutor) ExecuteAfterTool(ctx context.Context, tool Tool, args map[string]any, toolContext *ToolContext, ...) map[string]any
- func (ce *DefaultCallbackExecutor) ExecuteBeforeAgent(ctx context.Context, callbackContext *CallbackContext) *types.Message
- func (ce *DefaultCallbackExecutor) ExecuteBeforeModel(ctx context.Context, callbackContext *CallbackContext, llmRequest *LLMRequest) *LLMResponse
- func (ce *DefaultCallbackExecutor) ExecuteBeforeTool(ctx context.Context, tool Tool, args map[string]any, toolContext *ToolContext) map[string]any
- type DefaultResponseSender
- type DefaultStreamingTaskHandler
- func (sth *DefaultStreamingTaskHandler) GetAgent() OpenAICompatibleAgent
- func (sth *DefaultStreamingTaskHandler) HandleStreamingTask(ctx context.Context, task *types.Task, message *types.Message) (<-chan cloudevents.Event, error)
- func (sth *DefaultStreamingTaskHandler) SetAgent(agent OpenAICompatibleAgent)
- type DefaultTaskManager
- func (tm *DefaultTaskManager) CancelTask(taskID string) error
- func (tm *DefaultTaskManager) CleanupCompletedTasks()
- func (tm *DefaultTaskManager) CreateTask(contextID string, state types.TaskState, message *types.Message) *types.Task
- func (tm *DefaultTaskManager) CreateTaskWithHistory(contextID string, state types.TaskState, message *types.Message, ...) *types.Task
- func (tm *DefaultTaskManager) DeleteTaskPushNotificationConfig(params types.DeleteTaskPushNotificationConfigParams) error
- func (tm *DefaultTaskManager) GetConversationHistory(contextID string) []types.Message
- func (tm *DefaultTaskManager) GetStorage() Storage
- func (tm *DefaultTaskManager) GetTask(taskID string) (*types.Task, bool)
- func (tm *DefaultTaskManager) GetTaskPushNotificationConfig(params types.GetTaskPushNotificationConfigParams) (*types.TaskPushNotificationConfig, error)
- func (tm *DefaultTaskManager) IsTaskPaused(taskID string) (bool, error)
- func (tm *DefaultTaskManager) ListTaskPushNotificationConfigs(params types.ListTaskPushNotificationConfigParams) ([]types.TaskPushNotificationConfig, error)
- func (tm *DefaultTaskManager) ListTasks(params types.TaskListParams) (*types.TaskList, error)
- func (tm *DefaultTaskManager) PauseTaskForInput(taskID string, message *types.Message) error
- func (tm *DefaultTaskManager) PollTaskStatus(taskID string, interval time.Duration, timeout time.Duration) (*types.Task, error)
- func (tm *DefaultTaskManager) RegisterTaskCancelFunc(taskID string, cancelFunc context.CancelFunc)
- func (tm *DefaultTaskManager) ResumeTaskWithInput(taskID string, message *types.Message) error
- func (tm *DefaultTaskManager) SetNotificationSender(sender PushNotificationSender)
- func (tm *DefaultTaskManager) SetRetentionConfig(retentionConfig config.TaskRetentionConfig)
- func (tm *DefaultTaskManager) SetTaskPushNotificationConfig(config types.TaskPushNotificationConfig) (*types.TaskPushNotificationConfig, error)
- func (tm *DefaultTaskManager) StopCleanup()
- func (tm *DefaultTaskManager) UnregisterTaskCancelFunc(taskID string)
- func (tm *DefaultTaskManager) UpdateConversationHistory(contextID string, messages []types.Message)
- func (tm *DefaultTaskManager) UpdateError(taskID string, message *types.Message) error
- func (tm *DefaultTaskManager) UpdateState(taskID string, state types.TaskState) error
- func (tm *DefaultTaskManager) UpdateTask(task *types.Task) error
- type DefaultToolBox
- func (tb *DefaultToolBox) AddTool(tool Tool)
- func (tb *DefaultToolBox) ExecuteTool(ctx context.Context, toolName string, arguments map[string]any) (string, error)
- func (tb *DefaultToolBox) GetTool(toolName string) (Tool, bool)
- func (tb *DefaultToolBox) GetToolNames() []string
- func (tb *DefaultToolBox) GetTools() []sdk.ChatCompletionTool
- func (tb *DefaultToolBox) HasTool(toolName string) bool
- type EmptyMessagePartsError
- type FilesystemArtifactStorage
- func (fs *FilesystemArtifactStorage) CleanupExpiredArtifacts(ctx context.Context, maxAge time.Duration) (int, error)
- func (fs *FilesystemArtifactStorage) CleanupOldestArtifacts(ctx context.Context, maxCount int) (int, error)
- func (fs *FilesystemArtifactStorage) Close() error
- func (fs *FilesystemArtifactStorage) Delete(ctx context.Context, artifactID string, filename string) error
- func (fs *FilesystemArtifactStorage) Exists(ctx context.Context, artifactID string, filename string) (bool, error)
- func (fs *FilesystemArtifactStorage) GetURL(artifactID string, filename string) string
- func (fs *FilesystemArtifactStorage) Retrieve(ctx context.Context, artifactID string, filename string) (io.ReadCloser, error)
- func (fs *FilesystemArtifactStorage) Store(ctx context.Context, artifactID string, filename string, data io.Reader) (string, error)
- type HTTPPushNotificationSender
- type InMemoryStorage
- func (s *InMemoryStorage) CleanupCompletedTasks() int
- func (s *InMemoryStorage) CleanupOldConversations(maxAge int64) int
- func (s *InMemoryStorage) CleanupTasksWithRetention(maxCompleted, maxFailed int) int
- func (s *InMemoryStorage) ClearQueue() error
- func (s *InMemoryStorage) CreateActiveTask(task *types.Task) error
- func (s *InMemoryStorage) DeleteContext(contextID string) error
- func (s *InMemoryStorage) DeleteContextAndTasks(contextID string) error
- func (s *InMemoryStorage) DeleteTask(taskID string) error
- func (s *InMemoryStorage) DequeueTask(ctx context.Context) (*QueuedTask, error)
- func (s *InMemoryStorage) EnqueueTask(task *types.Task, requestID any) error
- func (s *InMemoryStorage) GetActiveTask(taskID string) (*types.Task, error)
- func (s *InMemoryStorage) GetContexts() []string
- func (s *InMemoryStorage) GetContextsWithTasks() []string
- func (s *InMemoryStorage) GetQueueLength() int
- func (s *InMemoryStorage) GetStats() StorageStats
- func (s *InMemoryStorage) GetTask(taskID string) (*types.Task, bool)
- func (s *InMemoryStorage) GetTaskByContextAndID(contextID, taskID string) (*types.Task, bool)
- func (s *InMemoryStorage) ListTasks(filter TaskFilter) ([]*types.Task, error)
- func (s *InMemoryStorage) ListTasksByContext(contextID string, filter TaskFilter) ([]*types.Task, error)
- func (s *InMemoryStorage) StoreDeadLetterTask(task *types.Task) error
- func (s *InMemoryStorage) UpdateActiveTask(task *types.Task) error
- type InMemoryStorageFactory
- type JRPCErrorCode
- type LLMClient
- type LLMConfig
- type LLMRequest
- type LLMResponse
- type MinIOArtifactStorage
- func (m *MinIOArtifactStorage) CleanupExpiredArtifacts(ctx context.Context, maxAge time.Duration) (int, error)
- func (m *MinIOArtifactStorage) CleanupOldestArtifacts(ctx context.Context, maxCount int) (int, error)
- func (m *MinIOArtifactStorage) Close() error
- func (m *MinIOArtifactStorage) Delete(ctx context.Context, artifactID string, filename string) error
- func (m *MinIOArtifactStorage) Exists(ctx context.Context, artifactID string, filename string) (bool, error)
- func (m *MinIOArtifactStorage) GetURL(artifactID string, filename string) string
- func (m *MinIOArtifactStorage) Retrieve(ctx context.Context, artifactID string, filename string) (io.ReadCloser, error)
- func (m *MinIOArtifactStorage) Store(ctx context.Context, artifactID string, filename string, data io.Reader) (string, error)
- type OpenAICompatibleAgent
- type OpenAICompatibleAgentImpl
- func AgentWithConfig(logger *zap.Logger, config *config.AgentConfig) (*OpenAICompatibleAgentImpl, error)
- func AgentWithLLM(logger *zap.Logger, llmClient LLMClient) (*OpenAICompatibleAgentImpl, error)
- func FullyConfiguredAgent(logger *zap.Logger, config *config.AgentConfig, llmClient LLMClient, ...) (*OpenAICompatibleAgentImpl, error)
- func NewOpenAICompatibleAgent(logger *zap.Logger) *OpenAICompatibleAgentImpl
- func NewOpenAICompatibleAgentWithConfig(logger *zap.Logger, cfg *config.AgentConfig) *OpenAICompatibleAgentImpl
- func NewOpenAICompatibleAgentWithLLM(logger *zap.Logger, llmClient LLMClient) *OpenAICompatibleAgentImpl
- func NewOpenAICompatibleAgentWithLLMConfig(logger *zap.Logger, config *config.AgentConfig) (*OpenAICompatibleAgentImpl, error)
- func SimpleAgent(logger *zap.Logger) (*OpenAICompatibleAgentImpl, error)
- func (a *OpenAICompatibleAgentImpl) GetCallbackExecutor() CallbackExecutor
- func (a *OpenAICompatibleAgentImpl) RunWithStream(ctx context.Context, messages []types.Message) (<-chan cloudevents.Event, error)
- func (a *OpenAICompatibleAgentImpl) SetCallbackExecutor(executor CallbackExecutor)
- func (a *OpenAICompatibleAgentImpl) SetLLMClient(client LLMClient)
- func (a *OpenAICompatibleAgentImpl) SetToolBox(toolBox ToolBox)
- type OpenAICompatibleLLMClient
- func (c *OpenAICompatibleLLMClient) CreateChatCompletion(ctx context.Context, messages []sdk.Message, tools ...sdk.ChatCompletionTool) (*sdk.CreateChatCompletionResponse, error)
- func (c *OpenAICompatibleLLMClient) CreateStreamingChatCompletion(ctx context.Context, messages []sdk.Message, tools ...sdk.ChatCompletionTool) (<-chan *sdk.CreateChatCompletionStreamResponse, <-chan error)
- type PushNotificationSender
- type QueuedTask
- type RedisStorage
- func (s *RedisStorage) CleanupCompletedTasks() int
- func (s *RedisStorage) CleanupTasksWithRetention(maxCompleted, maxFailed int) int
- func (s *RedisStorage) ClearQueue() error
- func (s *RedisStorage) CreateActiveTask(task *types.Task) error
- func (s *RedisStorage) DeleteContext(contextID string) error
- func (s *RedisStorage) DeleteContextAndTasks(contextID string) error
- func (s *RedisStorage) DeleteTask(taskID string) error
- func (s *RedisStorage) DequeueTask(ctx context.Context) (*QueuedTask, error)
- func (s *RedisStorage) EnqueueTask(task *types.Task, requestID any) error
- func (s *RedisStorage) GetActiveTask(taskID string) (*types.Task, error)
- func (s *RedisStorage) GetContexts() []string
- func (s *RedisStorage) GetContextsWithTasks() []string
- func (s *RedisStorage) GetQueueLength() int
- func (s *RedisStorage) GetStats() StorageStats
- func (s *RedisStorage) GetTask(taskID string) (*types.Task, bool)
- func (s *RedisStorage) GetTaskByContextAndID(contextID, taskID string) (*types.Task, bool)
- func (s *RedisStorage) ListTasks(filter TaskFilter) ([]*types.Task, error)
- func (s *RedisStorage) ListTasksByContext(contextID string, filter TaskFilter) ([]*types.Task, error)
- func (s *RedisStorage) StoreDeadLetterTask(task *types.Task) error
- func (s *RedisStorage) UpdateActiveTask(task *types.Task) error
- type RedisStorageFactory
- type ResponseSender
- type SortOrder
- type Storage
- type StorageFactory
- type StorageFactoryRegistry
- func (r *StorageFactoryRegistry) CreateStorage(ctx context.Context, config config.QueueConfig, logger *zap.Logger) (Storage, error)
- func (r *StorageFactoryRegistry) GetFactory(provider string) (StorageFactory, error)
- func (r *StorageFactoryRegistry) GetProviders() []string
- func (r *StorageFactoryRegistry) Register(provider string, factory StorageFactory)
- type StorageStats
- type StreamableTaskHandler
- type StreamingNotImplementedError
- type TaskFilter
- type TaskHandler
- type TaskManager
- type TaskNotCancelableError
- type TaskNotFoundError
- type TaskResultProcessor
- type TaskSortField
- type TaskUpdateNotification
- type Tool
- type ToolBox
- type ToolContext
- type ToolNotFoundError
- type UsageTracker
- func (ut *UsageTracker) AddMessages(count int)
- func (ut *UsageTracker) AddTokenUsage(usage sdk.CompletionUsage)
- func (ut *UsageTracker) GetMetadata() map[string]any
- func (ut *UsageTracker) HasUsage() bool
- func (ut *UsageTracker) IncrementFailedTools()
- func (ut *UsageTracker) IncrementIteration()
- func (ut *UsageTracker) IncrementToolCalls()
Constants ¶
This section is empty.
Variables ¶
var ( BuildAgentName = "" BuildAgentDescription = "" BuildAgentVersion = "" )
Build-time metadata variables set via LD flags
Functions ¶
func GenerateTaskID ¶
func GenerateTaskID() string
GenerateTaskID generates a unique task ID using UUID v4
func GetSupportedProviders ¶ added in v0.9.0
func GetSupportedProviders() []string
GetSupportedProviders returns a list of all registered providers
func NewEmptyMessagePartsError ¶
func NewEmptyMessagePartsError() error
NewEmptyMessagePartsError creates a new EmptyMessagePartsError
func NewStreamingNotImplementedError ¶
func NewStreamingNotImplementedError() error
NewStreamingNotImplementedError creates a new StreamingNotImplementedError
func NewTaskNotCancelableError ¶ added in v0.8.0
NewTaskNotCancelableError creates a new TaskNotCancelableError
func NewTaskNotFoundError ¶
NewTaskNotFoundError creates a new TaskNotFoundError
func RegisterStorageProvider ¶ added in v0.9.0
func RegisterStorageProvider(provider string, factory StorageFactory)
RegisterStorageProvider registers a storage provider factory
Types ¶
type A2AProtocolHandler ¶ added in v0.9.4
type A2AProtocolHandler interface {
// HandleMessageSend processes message/send requests
HandleMessageSend(c *gin.Context, req types.JSONRPCRequest)
// HandleMessageStream processes message/stream requests
HandleMessageStream(c *gin.Context, req types.JSONRPCRequest, streamingHandler StreamableTaskHandler)
// HandleTaskGet processes tasks/get requests
HandleTaskGet(c *gin.Context, req types.JSONRPCRequest)
// HandleTaskList processes tasks/list requests
HandleTaskList(c *gin.Context, req types.JSONRPCRequest)
// HandleTaskCancel processes tasks/cancel requests
HandleTaskCancel(c *gin.Context, req types.JSONRPCRequest)
// HandleTaskPushNotificationConfigSet processes tasks/pushNotificationConfig/set requests
HandleTaskPushNotificationConfigSet(c *gin.Context, req types.JSONRPCRequest)
// HandleTaskPushNotificationConfigGet processes tasks/pushNotificationConfig/get requests
HandleTaskPushNotificationConfigGet(c *gin.Context, req types.JSONRPCRequest)
// HandleTaskPushNotificationConfigList processes tasks/pushNotificationConfig/list requests
HandleTaskPushNotificationConfigList(c *gin.Context, req types.JSONRPCRequest)
// HandleTaskPushNotificationConfigDelete processes tasks/pushNotificationConfig/delete requests
HandleTaskPushNotificationConfigDelete(c *gin.Context, req types.JSONRPCRequest)
}
A2AProtocolHandler defines the interface for handling A2A protocol requests
type A2AServer ¶
type A2AServer interface {
// Start starts the A2A server on the configured port
Start(ctx context.Context) error
// Stop gracefully stops the A2A server
Stop(ctx context.Context) error
// GetAgentCard returns the agent's capabilities and metadata
// Returns nil if no agent card has been explicitly set
GetAgentCard() *types.AgentCard
// StartTaskProcessor starts the background task processor
StartTaskProcessor(ctx context.Context)
// SetPollingTaskHandler sets the task handler for polling/queue-based scenarios
SetBackgroundTaskHandler(handler TaskHandler)
// GetPollingTaskHandler returns the configured polling task handler
GetBackgroundTaskHandler() TaskHandler
// SetStreamingTaskHandler sets the task handler for streaming scenarios
SetStreamingTaskHandler(handler StreamableTaskHandler)
// GetStreamingTaskHandler returns the configured streaming task handler
GetStreamingTaskHandler() StreamableTaskHandler
// SetAgent sets the OpenAI-compatible agent for processing tasks
SetAgent(agent OpenAICompatibleAgent)
// GetAgent returns the configured OpenAI-compatible agent
GetAgent() OpenAICompatibleAgent
// SetAgentName sets the agent's name dynamically
SetAgentName(name string)
// SetAgentDescription sets the agent's description dynamically
SetAgentDescription(description string)
// SetAgentURL sets the agent's URL dynamically
SetAgentURL(url string)
// SetAgentVersion sets the agent's version dynamically
SetAgentVersion(version string)
// SetAgentCard sets a custom agent card that overrides the default card generation
SetAgentCard(agentCard types.AgentCard)
// LoadAgentCardFromFile loads and sets an agent card from a JSON file
// The optional overrides map allows dynamic replacement of JSON attribute values
LoadAgentCardFromFile(filePath string, overrides map[string]any) error
}
A2AServer defines the interface for an A2A-compatible server
func CustomA2AServer ¶
func CustomA2AServer( cfg config.Config, logger *zap.Logger, pollingTaskHandler TaskHandler, streamingTaskHandler StreamableTaskHandler, taskResultProcessor TaskResultProcessor, agentCard types.AgentCard, ) (A2AServer, error)
CustomA2AServer creates an A2A server with custom components This provides more control over the server configuration
func CustomA2AServerWithAgent ¶
func CustomA2AServerWithAgent( cfg config.Config, logger *zap.Logger, agent OpenAICompatibleAgent, toolBox ToolBox, taskResultProcessor TaskResultProcessor, agentCard types.AgentCard, ) (A2AServer, error)
CustomA2AServerWithAgent creates an A2A server with custom components and an agent This provides maximum control over the server configuration
func SimpleA2AServerWithAgent ¶
func SimpleA2AServerWithAgent(cfg config.Config, logger *zap.Logger, agent OpenAICompatibleAgent, agentCard types.AgentCard) (A2AServer, error)
SimpleA2AServerWithAgent creates a basic A2A server with an OpenAI-compatible agent This is a convenience function for agent-based use cases
type A2AServerBuilder ¶
type A2AServerBuilder interface {
// WithBackgroundTaskHandler sets a custom task handler for polling/queue-based scenarios.
// This handler will be used for message/send requests and background queue processing.
WithBackgroundTaskHandler(handler TaskHandler) A2AServerBuilder
// WithStreamingTaskHandler sets a custom task handler for streaming scenarios.
// This handler will be used for message/stream requests.
WithStreamingTaskHandler(handler StreamableTaskHandler) A2AServerBuilder
// WithDefaultBackgroundTaskHandler sets a default background task handler optimized for background scenarios.
// This handler automatically handles input-required pausing without requiring custom implementation.
WithDefaultBackgroundTaskHandler() A2AServerBuilder // WithDefaultStreamingTaskHandler sets a default streaming task handler optimized for streaming scenarios.
// This handler automatically handles input-required pausing with streaming-aware behavior.
WithDefaultStreamingTaskHandler() A2AServerBuilder
// WithDefaultTaskHandlers sets both default polling and streaming task handlers.
// This is a convenience method that sets up optimized handlers for both scenarios.
WithDefaultTaskHandlers() A2AServerBuilder
// WithTaskResultProcessor sets a custom task result processor for handling tool call results.
// This allows custom business logic for determining when tasks should be completed.
WithTaskResultProcessor(processor TaskResultProcessor) A2AServerBuilder
// WithAgent sets a pre-configured OpenAI-compatible agent for processing tasks.
// This is useful when you have already configured an agent with specific settings.
WithAgent(agent OpenAICompatibleAgent) A2AServerBuilder
// WithAgentCard sets a custom agent card that overrides the default card generation.
// This gives full control over the agent's advertised capabilities and metadata.
WithAgentCard(agentCard types.AgentCard) A2AServerBuilder
// WithAgentCardFromFile loads and sets an agent card from a JSON file.
// This provides a convenient way to load agent configuration from a static file.
// The optional overrides map allows dynamic replacement of JSON attribute values.
WithAgentCardFromFile(filePath string, overrides map[string]any) A2AServerBuilder
// WithLogger sets a custom logger for the builder and resulting server.
// This allows using a logger configured with appropriate level based on the Debug config.
WithLogger(logger *zap.Logger) A2AServerBuilder
// WithArtifactService sets the artifact service for the server.
// This enables URI-based artifacts instead of base64-encoded ones.
WithArtifactService(service ArtifactService) A2AServerBuilder
// Build creates and returns the configured A2A server.
// This method applies configuration defaults and initializes all components.
Build() (A2AServer, error)
}
A2AServerBuilder provides a fluent interface for building A2A servers with custom configurations. This interface allows for flexible server construction with optional components and settings. Use NewA2AServerBuilder to create an instance, then chain method calls to configure the server.
Example:
server := NewA2AServerBuilder(config, logger). WithAgent(agent). Build()
func NewA2AServerBuilder ¶
func NewA2AServerBuilder(cfg config.Config, logger *zap.Logger) A2AServerBuilder
NewA2AServerBuilder creates a new server builder with required dependencies. The configuration passed here will be used to configure the server. Any nil nested configuration objects will be populated with sensible defaults when Build() is called.
Parameters:
- cfg: The base configuration for the server (agent name, port, etc.)
- logger: Logger instance to use for the server (should match cfg.Debug level)
Returns:
A2AServerBuilder interface that can be used to further configure the server before building.
Example:
cfg := config.Config{
AgentName: "my-agent",
Port: "8080",
Debug: true,
}
logger, _ := zap.NewDevelopment() // Use development logger for debug
server := NewA2AServerBuilder(cfg, logger).
WithAgent(myAgent).
Build()
type A2AServerBuilderImpl ¶
type A2AServerBuilderImpl struct {
// contains filtered or unexported fields
}
A2AServerBuilderImpl is the concrete implementation of the A2AServerBuilder interface. It provides a fluent interface for building A2A servers with custom configurations. This struct holds the configuration and optional components that will be used to create the server.
func (*A2AServerBuilderImpl) Build ¶
func (b *A2AServerBuilderImpl) Build() (A2AServer, error)
Build creates and returns the configured A2A server.
func (*A2AServerBuilderImpl) WithAgent ¶
func (b *A2AServerBuilderImpl) WithAgent(agent OpenAICompatibleAgent) A2AServerBuilder
WithAgent sets a custom OpenAI-compatible agent
func (*A2AServerBuilderImpl) WithAgentCard ¶
func (b *A2AServerBuilderImpl) WithAgentCard(agentCard types.AgentCard) A2AServerBuilder
WithAgentCard sets a custom agent card that overrides the default card generation
func (*A2AServerBuilderImpl) WithAgentCardFromFile ¶
func (b *A2AServerBuilderImpl) WithAgentCardFromFile(filePath string, overrides map[string]any) A2AServerBuilder
WithAgentCardFromFile loads and sets an agent card from a JSON file The optional overrides map allows dynamic replacement of JSON attribute values
func (*A2AServerBuilderImpl) WithArtifactService ¶ added in v0.15.0
func (b *A2AServerBuilderImpl) WithArtifactService(service ArtifactService) A2AServerBuilder
WithArtifactService sets the artifact service
func (*A2AServerBuilderImpl) WithBackgroundTaskHandler ¶ added in v0.9.0
func (b *A2AServerBuilderImpl) WithBackgroundTaskHandler(handler TaskHandler) A2AServerBuilder
WithBackgroundTaskHandler sets a custom task handler for polling/queue-based scenarios
func (*A2AServerBuilderImpl) WithDefaultBackgroundTaskHandler ¶ added in v0.9.0
func (b *A2AServerBuilderImpl) WithDefaultBackgroundTaskHandler() A2AServerBuilder
WithDefaultBackgroundTaskHandler sets a default background task handler optimized for background scenarios
func (*A2AServerBuilderImpl) WithDefaultStreamingTaskHandler ¶ added in v0.9.0
func (b *A2AServerBuilderImpl) WithDefaultStreamingTaskHandler() A2AServerBuilder
WithDefaultStreamingTaskHandler sets a default streaming task handler optimized for streaming scenarios
func (*A2AServerBuilderImpl) WithDefaultTaskHandlers ¶ added in v0.9.0
func (b *A2AServerBuilderImpl) WithDefaultTaskHandlers() A2AServerBuilder
WithDefaultTaskHandlers sets both default background and streaming task handlers
func (*A2AServerBuilderImpl) WithLogger ¶
func (b *A2AServerBuilderImpl) WithLogger(logger *zap.Logger) A2AServerBuilder
WithLogger sets a custom logger for the builder
func (*A2AServerBuilderImpl) WithStreamingTaskHandler ¶ added in v0.9.0
func (b *A2AServerBuilderImpl) WithStreamingTaskHandler(handler StreamableTaskHandler) A2AServerBuilder
WithStreamingTaskHandler sets a custom task handler for streaming scenarios
func (*A2AServerBuilderImpl) WithTaskResultProcessor ¶
func (b *A2AServerBuilderImpl) WithTaskResultProcessor(processor TaskResultProcessor) A2AServerBuilder
WithTaskResultProcessor sets a custom task result processor
type A2AServerImpl ¶
type A2AServerImpl struct {
// contains filtered or unexported fields
}
func NewA2AServer ¶
func NewA2AServer(cfg *config.Config, logger *zap.Logger, otel otel.OpenTelemetry) *A2AServerImpl
NewA2AServer creates a new A2A server with the provided configuration and logger
func NewA2AServerEnvironmentAware ¶
func NewA2AServerEnvironmentAware(cfg *config.Config, logger *zap.Logger, otel otel.OpenTelemetry) *A2AServerImpl
NewA2AServerEnvironmentAware creates a new A2A server with environment-aware configuration.
func NewA2AServerWithAgent ¶
func NewA2AServerWithAgent(cfg *config.Config, logger *zap.Logger, otel otel.OpenTelemetry, agent OpenAICompatibleAgent) *A2AServerImpl
NewA2AServerWithAgent creates a new A2A server with an optional OpenAI-compatible agent
func NewDefaultA2AServer ¶
func NewDefaultA2AServer(cfg *config.Config) *A2AServerImpl
NewDefaultA2AServer creates a new default A2A server implementation
func (*A2AServerImpl) GetAgent ¶
func (s *A2AServerImpl) GetAgent() OpenAICompatibleAgent
GetAgent returns the configured OpenAI-compatible agent
func (*A2AServerImpl) GetAgentCard ¶
func (s *A2AServerImpl) GetAgentCard() *types.AgentCard
GetAgentCard returns the agent's capabilities and metadata Returns nil if no agent card has been explicitly set
func (*A2AServerImpl) GetBackgroundTaskHandler ¶ added in v0.9.0
func (s *A2AServerImpl) GetBackgroundTaskHandler() TaskHandler
GetBackgroundTaskHandler returns the configured polling task handler
func (*A2AServerImpl) GetStreamingTaskHandler ¶ added in v0.9.0
func (s *A2AServerImpl) GetStreamingTaskHandler() StreamableTaskHandler
GetStreamingTaskHandler returns the configured streaming task handler
func (*A2AServerImpl) LoadAgentCardFromFile ¶
func (s *A2AServerImpl) LoadAgentCardFromFile(filePath string, overrides map[string]any) error
LoadAgentCardFromFile loads and sets an agent card from a JSON file The optional overrides map allows dynamic replacement of JSON attribute values
func (*A2AServerImpl) SetAgent ¶
func (s *A2AServerImpl) SetAgent(agent OpenAICompatibleAgent)
SetAgent sets the OpenAI-compatible agent for processing tasks
func (*A2AServerImpl) SetAgentCard ¶
func (s *A2AServerImpl) SetAgentCard(agentCard types.AgentCard)
SetAgentCard sets a custom agent card that overrides the default card generation
func (*A2AServerImpl) SetAgentDescription ¶
func (s *A2AServerImpl) SetAgentDescription(description string)
SetAgentDescription sets the agent's description dynamically
func (*A2AServerImpl) SetAgentName ¶
func (s *A2AServerImpl) SetAgentName(name string)
SetAgentName sets the agent's name dynamically
func (*A2AServerImpl) SetAgentURL ¶
func (s *A2AServerImpl) SetAgentURL(url string)
SetAgentURL sets the agent's URL dynamically
func (*A2AServerImpl) SetAgentVersion ¶
func (s *A2AServerImpl) SetAgentVersion(version string)
SetAgentVersion sets the agent's version dynamically
func (*A2AServerImpl) SetBackgroundTaskHandler ¶ added in v0.9.0
func (s *A2AServerImpl) SetBackgroundTaskHandler(handler TaskHandler)
SetBackgroundTaskHandler sets the task handler for polling/queue-based scenarios
func (*A2AServerImpl) SetStreamingTaskHandler ¶ added in v0.9.0
func (s *A2AServerImpl) SetStreamingTaskHandler(handler StreamableTaskHandler)
SetStreamingTaskHandler sets the task handler for streaming scenarios
func (*A2AServerImpl) SetTaskResultProcessor ¶
func (s *A2AServerImpl) SetTaskResultProcessor(processor TaskResultProcessor)
SetTaskResultProcessor sets the task result processor for custom business logic
func (*A2AServerImpl) Start ¶
func (s *A2AServerImpl) Start(ctx context.Context) error
Start starts the A2A server
func (*A2AServerImpl) StartTaskProcessor ¶
func (s *A2AServerImpl) StartTaskProcessor(ctx context.Context)
StartTaskProcessor starts the background task processing goroutine
type AfterAgentCallback ¶ added in v0.14.0
type AfterAgentCallback func(ctx context.Context, callbackContext *CallbackContext, agentOutput *types.Message) *types.Message
AfterAgentCallback is called immediately after the agent's execution completes
It does not run if the agent was skipped due to BeforeAgentCallback returning a types.Message It's purpose can be for clean up tasks, post-execution validation, logging or modifying the final agent output.
Return nil to use the original output, or return types.Message to replace the agent's output
type AfterModelCallback ¶ added in v0.14.0
type AfterModelCallback func(ctx context.Context, callbackContext *CallbackContext, llmResponse *LLMResponse) *LLMResponse
AfterModelCallback is called just after receiving a response from the LLM, before it's processed further by the invoking agent.
It's purpose is to allow inspection or modification of the raw LLM response. E.g. log model outputs, reformat responses, censor sensitive information, parsing structured data or handling specific error codes.
It's not clear in the official ADK docs whether the after callback is skipped if the BeforeModelCallback returns an LLMResponse. To avoid confusion the AfterModelCallback will always be invoked regardless if the LLMResponse is real or modified by the BeforeModelCallback.
Return nil to use the original response, or return LLMResponse to replace the LLM response
type AfterToolCallback ¶ added in v0.14.0
type AfterToolCallback func(ctx context.Context, tool Tool, args map[string]any, toolContext *ToolContext, toolResult map[string]any) map[string]any
AfterToolCallback is called just after a tool's execution completes successfully.
It's purpose is to allow inspection or modification of the tool's result before it's sent back to the LLM. Useful for logging, post-processing, or in the future saving parts of the result to the session state (not implemented yet).
Return nil to use the original result, or return map[string]any to replace the tool result
type AgentBuilder ¶
type AgentBuilder interface {
// WithConfig sets the agent configuration
WithConfig(config *config.AgentConfig) AgentBuilder
// WithLLMClient sets a pre-configured LLM client
WithLLMClient(client LLMClient) AgentBuilder
// WithToolBox sets a custom toolbox
WithToolBox(toolBox ToolBox) AgentBuilder
// WithDefaultToolBox sets the default toolbox
WithDefaultToolBox() AgentBuilder
// WithSystemPrompt sets the system prompt (overrides config)
WithSystemPrompt(prompt string) AgentBuilder
// WithMaxChatCompletion sets the maximum chat completion iterations for the agent
WithMaxChatCompletion(max int) AgentBuilder
// WithMaxConversationHistory sets the maximum conversation history for the agent
WithMaxConversationHistory(max int) AgentBuilder
// WithCallbacks sets the callback configuration for the agent
// Callbacks allow you to hook into various points of the agent's execution lifecycle
// including before/after agent execution, model calls, and tool execution
WithCallbacks(config *CallbackConfig) AgentBuilder
// GetConfig returns the current agent configuration (for testing purposes)
GetConfig() *config.AgentConfig
// Build creates and returns the configured agent
Build() (*OpenAICompatibleAgentImpl, error)
}
AgentBuilder provides a fluent interface for building OpenAI-compatible agents with custom configurations. This interface allows for flexible agent construction with optional components and settings. Use NewAgentBuilder to create an instance, then chain method calls to configure the agent.
Example:
agent := NewAgentBuilder(logger).
WithConfig(agentConfig).
WithLLMClient(client).
WithCallbacks(&CallbackConfig{
BeforeAgent: []BeforeAgentCallback{myBeforeAgentCallback},
AfterAgent: []AfterAgentCallback{myAfterAgentCallback},
}).
Build()
func NewAgentBuilder ¶
func NewAgentBuilder(logger *zap.Logger) AgentBuilder
NewAgentBuilder creates a new agent builder with required dependencies.
Parameters:
- logger: Logger instance to use for the agent
Returns:
AgentBuilder interface that can be used to configure the agent before building.
Example:
logger, _ := zap.NewDevelopment() agent, err := NewAgentBuilder(logger). WithConfig(agentConfig). Build()
type AgentBuilderImpl ¶
type AgentBuilderImpl struct {
// contains filtered or unexported fields
}
AgentBuilderImpl is the concrete implementation of the AgentBuilder interface. It provides a fluent interface for building OpenAI-compatible agents with custom configurations.
func (*AgentBuilderImpl) Build ¶
func (b *AgentBuilderImpl) Build() (*OpenAICompatibleAgentImpl, error)
Build creates and returns the configured agent
func (*AgentBuilderImpl) GetConfig ¶
func (b *AgentBuilderImpl) GetConfig() *config.AgentConfig
GetConfig returns the current agent configuration (for testing purposes)
func (*AgentBuilderImpl) WithCallbacks ¶ added in v0.16.0
func (b *AgentBuilderImpl) WithCallbacks(config *CallbackConfig) AgentBuilder
WithCallbacks sets the callback configuration for the agent This allows hooking into the agent lifecycle, model calls, and tool execution
func (*AgentBuilderImpl) WithConfig ¶
func (b *AgentBuilderImpl) WithConfig(userConfig *config.AgentConfig) AgentBuilder
WithConfig sets the agent configuration
func (*AgentBuilderImpl) WithDefaultToolBox ¶ added in v0.15.0
func (b *AgentBuilderImpl) WithDefaultToolBox() AgentBuilder
WithDefaultToolBox sets the default toolbox
func (*AgentBuilderImpl) WithLLMClient ¶
func (b *AgentBuilderImpl) WithLLMClient(client LLMClient) AgentBuilder
WithLLMClient sets a pre-configured LLM client
func (*AgentBuilderImpl) WithMaxChatCompletion ¶
func (b *AgentBuilderImpl) WithMaxChatCompletion(max int) AgentBuilder
WithMaxChatCompletion sets the maximum chat completion iterations for the agent
func (*AgentBuilderImpl) WithMaxConversationHistory ¶
func (b *AgentBuilderImpl) WithMaxConversationHistory(max int) AgentBuilder
WithMaxConversationHistory sets the maximum conversation history for the agent
func (*AgentBuilderImpl) WithSystemPrompt ¶
func (b *AgentBuilderImpl) WithSystemPrompt(prompt string) AgentBuilder
WithSystemPrompt sets the system prompt (overrides config)
func (*AgentBuilderImpl) WithToolBox ¶
func (b *AgentBuilderImpl) WithToolBox(toolBox ToolBox) AgentBuilder
WithToolBox sets a custom toolbox
type ArtifactMetadata ¶ added in v0.12.0
type ArtifactMetadata struct {
ArtifactID string `json:"artifact_id"`
Filename string `json:"filename"`
Size int64 `json:"size"`
ContentType string `json:"content_type"`
UploadedAt time.Time `json:"uploaded_at"`
}
ArtifactMetadata holds metadata about stored artifacts
type ArtifactService ¶ added in v0.15.0
type ArtifactService interface {
// CreateTextArtifact creates a text artifact
CreateTextArtifact(name, description, text string) types.Artifact
// CreateFileArtifact creates a file artifact with URI by storing the content
CreateFileArtifact(name, description, filename string, data []byte, mimeType *string) (types.Artifact, error)
// CreateFileArtifactFromURI creates a file artifact from an existing URI
CreateFileArtifactFromURI(name, description, filename, uri string, mimeType *string) types.Artifact
// CreateDataArtifact creates a structured data artifact
CreateDataArtifact(name, description string, data map[string]any) types.Artifact
// CreateMultiPartArtifact creates an artifact with multiple parts
CreateMultiPartArtifact(name, description string, parts []types.Part) types.Artifact
// AddArtifactToTask adds an artifact to a task's artifact collection
AddArtifactToTask(task *types.Task, artifact types.Artifact)
// AddArtifactsToTask adds multiple artifacts to a task's artifact collection
AddArtifactsToTask(task *types.Task, artifacts []types.Artifact)
// GetArtifactByID retrieves an artifact from a task by its ID
GetArtifactByID(task *types.Task, artifactID string) (*types.Artifact, bool)
// GetArtifactsByType retrieves all artifacts from a task that contain parts of a specific type
GetArtifactsByType(task *types.Task, partKind string) []types.Artifact
// ValidateArtifact validates that an artifact conforms to the A2A protocol specification
ValidateArtifact(artifact types.Artifact) error
// GetMimeTypeFromExtension returns a MIME type based on file extension
GetMimeTypeFromExtension(filename string) *string
// CreateTaskArtifactUpdateEvent creates an artifact update event for streaming
CreateTaskArtifactUpdateEvent(taskID, contextID string, artifact types.Artifact, append, lastChunk *bool) types.TaskArtifactUpdateEvent
// Storage operations for artifacts server
// Exists checks if an artifact file exists
Exists(ctx context.Context, artifactID, filename string) (bool, error)
// Retrieve retrieves an artifact file
Retrieve(ctx context.Context, artifactID, filename string) (io.ReadCloser, error)
// CleanupExpiredArtifacts removes artifacts older than maxAge
CleanupExpiredArtifacts(ctx context.Context, maxAge time.Duration) (int, error)
// CleanupOldestArtifacts removes oldest artifacts keeping only maxArtifacts
CleanupOldestArtifacts(ctx context.Context, maxArtifacts int) (int, error)
// Close closes the artifact service and releases resources
Close() error
}
ArtifactService defines the interface for artifact operations with storage support. It provides a clean API for artifact creation, management, and storage operations.
func NewArtifactService ¶ added in v0.15.0
func NewArtifactService(cfg *config.ArtifactsConfig, logger *zap.Logger) (ArtifactService, error)
NewArtifactService creates a new artifact service from configuration. It creates and manages its own storage provider internally.
type ArtifactServiceImpl ¶ added in v0.15.0
type ArtifactServiceImpl struct {
// contains filtered or unexported fields
}
ArtifactServiceImpl is the concrete implementation of ArtifactService. It encapsulates the storage dependency and provides a clean API for artifact creation.
func (*ArtifactServiceImpl) AddArtifactToTask ¶ added in v0.15.0
func (as *ArtifactServiceImpl) AddArtifactToTask(task *types.Task, artifact types.Artifact)
AddArtifactToTask adds an artifact to a task's artifact collection
func (*ArtifactServiceImpl) AddArtifactsToTask ¶ added in v0.15.0
func (as *ArtifactServiceImpl) AddArtifactsToTask(task *types.Task, artifacts []types.Artifact)
AddArtifactsToTask adds multiple artifacts to a task's artifact collection
func (*ArtifactServiceImpl) CleanupExpiredArtifacts ¶ added in v0.15.0
func (as *ArtifactServiceImpl) CleanupExpiredArtifacts(ctx context.Context, maxAge time.Duration) (int, error)
CleanupExpiredArtifacts removes artifacts older than maxAge
func (*ArtifactServiceImpl) CleanupOldestArtifacts ¶ added in v0.15.0
func (as *ArtifactServiceImpl) CleanupOldestArtifacts(ctx context.Context, maxArtifacts int) (int, error)
CleanupOldestArtifacts removes oldest artifacts keeping only maxArtifacts
func (*ArtifactServiceImpl) Close ¶ added in v0.15.0
func (as *ArtifactServiceImpl) Close() error
Close closes the artifact service and releases resources
func (*ArtifactServiceImpl) CreateDataArtifact ¶ added in v0.15.0
func (as *ArtifactServiceImpl) CreateDataArtifact(name, description string, data map[string]any) types.Artifact
CreateDataArtifact creates a structured data artifact
func (*ArtifactServiceImpl) CreateFileArtifact ¶ added in v0.15.0
func (as *ArtifactServiceImpl) CreateFileArtifact(name, description, filename string, data []byte, mimeType *string) (types.Artifact, error)
CreateFileArtifact creates a file artifact with URI by storing the content
func (*ArtifactServiceImpl) CreateFileArtifactFromURI ¶ added in v0.15.0
func (as *ArtifactServiceImpl) CreateFileArtifactFromURI(name, description, filename, uri string, mimeType *string) types.Artifact
CreateFileArtifactFromURI creates a file artifact from an existing URI
func (*ArtifactServiceImpl) CreateMultiPartArtifact ¶ added in v0.15.0
func (as *ArtifactServiceImpl) CreateMultiPartArtifact(name, description string, parts []types.Part) types.Artifact
CreateMultiPartArtifact creates an artifact with multiple parts
func (*ArtifactServiceImpl) CreateTaskArtifactUpdateEvent ¶ added in v0.15.0
func (as *ArtifactServiceImpl) CreateTaskArtifactUpdateEvent(taskID, contextID string, artifact types.Artifact, append, lastChunk *bool) types.TaskArtifactUpdateEvent
CreateTaskArtifactUpdateEvent creates an artifact update event for streaming
func (*ArtifactServiceImpl) CreateTextArtifact ¶ added in v0.15.0
func (as *ArtifactServiceImpl) CreateTextArtifact(name, description, text string) types.Artifact
CreateTextArtifact creates a text artifact
func (*ArtifactServiceImpl) Exists ¶ added in v0.15.0
func (as *ArtifactServiceImpl) Exists(ctx context.Context, artifactID, filename string) (bool, error)
Exists checks if an artifact file exists
func (*ArtifactServiceImpl) GetArtifactByID ¶ added in v0.15.0
func (as *ArtifactServiceImpl) GetArtifactByID(task *types.Task, artifactID string) (*types.Artifact, bool)
GetArtifactByID retrieves an artifact from a task by its ID
func (*ArtifactServiceImpl) GetArtifactsByType ¶ added in v0.15.0
func (as *ArtifactServiceImpl) GetArtifactsByType(task *types.Task, partKind string) []types.Artifact
GetArtifactsByType retrieves all artifacts from a task that contain parts of a specific type
func (*ArtifactServiceImpl) GetMimeTypeFromExtension ¶ added in v0.15.0
func (as *ArtifactServiceImpl) GetMimeTypeFromExtension(filename string) *string
GetMimeTypeFromExtension returns a MIME type based on file extension
func (*ArtifactServiceImpl) Retrieve ¶ added in v0.15.0
func (as *ArtifactServiceImpl) Retrieve(ctx context.Context, artifactID, filename string) (io.ReadCloser, error)
Retrieve retrieves an artifact file
func (*ArtifactServiceImpl) ValidateArtifact ¶ added in v0.15.0
func (as *ArtifactServiceImpl) ValidateArtifact(artifact types.Artifact) error
ValidateArtifact validates that an artifact conforms to the A2A protocol specification
type ArtifactStorageProvider ¶ added in v0.12.0
type ArtifactStorageProvider interface {
// Store stores an artifact and returns its URL for retrieval
Store(ctx context.Context, artifactID string, filename string, data io.Reader) (string, error)
// Retrieve retrieves an artifact by its ID and filename
Retrieve(ctx context.Context, artifactID string, filename string) (io.ReadCloser, error)
// Delete removes an artifact from storage
Delete(ctx context.Context, artifactID string, filename string) error
// Exists checks if an artifact exists in storage
Exists(ctx context.Context, artifactID string, filename string) (bool, error)
// GetURL returns the public URL for accessing an artifact
GetURL(artifactID string, filename string) string
// Close closes the storage provider and cleans up resources
Close() error
// CleanupExpiredArtifacts removes artifacts older than maxAge
CleanupExpiredArtifacts(ctx context.Context, maxAge time.Duration) (int, error)
// CleanupOldestArtifacts removes old artifacts keeping only maxCount per artifact ID
CleanupOldestArtifacts(ctx context.Context, maxCount int) (int, error)
}
ArtifactStorageProvider defines the interface for artifact storage backends
type ArtifactsServer ¶ added in v0.12.0
type ArtifactsServer interface {
// Start starts the artifacts server
Start(ctx context.Context) error
// Stop stops the artifacts server
Stop(ctx context.Context) error
}
ArtifactsServer provides HTTP endpoints for artifact download
func NewArtifactsServer ¶ added in v0.12.0
func NewArtifactsServer(cfg *config.ArtifactsConfig, logger *zap.Logger, artifactService ArtifactService) ArtifactsServer
NewArtifactsServer creates a new artifacts server instance with the provided service
type ArtifactsServerBuilder ¶ added in v0.12.0
type ArtifactsServerBuilder interface {
// WithLogger sets a custom logger for the builder and resulting server
WithLogger(logger *zap.Logger) ArtifactsServerBuilder
// WithArtifactService sets a pre-configured artifact service for the server.
WithArtifactService(service ArtifactService) ArtifactsServerBuilder
// Build creates and returns the configured artifacts server
Build() (ArtifactsServer, error)
}
ArtifactsServerBuilder provides a fluent interface for building artifacts servers with custom configurations. This interface allows for flexible server construction with optional components and settings. Use NewArtifactsServerBuilder to create an instance, then chain method calls to configure the server.
Example:
artifactsServer := NewArtifactsServerBuilder(cfg, logger). Build()
func NewArtifactsServerBuilder ¶ added in v0.12.0
func NewArtifactsServerBuilder(cfg *config.ArtifactsConfig, logger *zap.Logger) ArtifactsServerBuilder
NewArtifactsServerBuilder creates a new artifacts server builder with required dependencies. The configuration passed here will be used to configure the server.
Parameters:
- cfg: The artifacts configuration for the server
- logger: Logger instance to use for the server
Returns:
ArtifactsServerBuilder interface that can be used to further configure the server before building.
Example:
cfg := &config.ArtifactsConfig{
Enable: true,
ServerConfig: config.ArtifactsServerConfig{
Port: "8081",
},
StorageConfig: config.ArtifactsStorageConfig{
Provider: "filesystem",
BasePath: "./artifacts",
},
}
logger, _ := zap.NewDevelopment()
server := NewArtifactsServerBuilder(cfg, logger).
Build()
type ArtifactsServerBuilderImpl ¶ added in v0.12.0
type ArtifactsServerBuilderImpl struct {
// contains filtered or unexported fields
}
ArtifactsServerBuilderImpl is the concrete implementation of the ArtifactsServerBuilder interface. It provides a fluent interface for building artifacts servers with custom configurations. This struct holds the configuration and optional components that will be used to create the server.
func (*ArtifactsServerBuilderImpl) Build ¶ added in v0.12.0
func (b *ArtifactsServerBuilderImpl) Build() (ArtifactsServer, error)
Build creates and returns the configured artifacts server
func (*ArtifactsServerBuilderImpl) WithArtifactService ¶ added in v0.15.2
func (b *ArtifactsServerBuilderImpl) WithArtifactService(service ArtifactService) ArtifactsServerBuilder
WithArtifactService sets a pre-configured artifact service for the server
func (*ArtifactsServerBuilderImpl) WithLogger ¶ added in v0.12.0
func (b *ArtifactsServerBuilderImpl) WithLogger(logger *zap.Logger) ArtifactsServerBuilder
WithLogger sets a custom logger for the builder
type ArtifactsServerImpl ¶ added in v0.12.0
type ArtifactsServerImpl struct {
// contains filtered or unexported fields
}
ArtifactsServerImpl implements the ArtifactsServer interface
type BasicTool ¶
type BasicTool struct {
// contains filtered or unexported fields
}
BasicTool is a simple implementation of the Tool interface using function callbacks
func NewBasicTool ¶
func NewBasicTool( name string, description string, parameters map[string]any, executor func(ctx context.Context, arguments map[string]any) (string, error), ) *BasicTool
NewBasicTool creates a new BasicTool
func (*BasicTool) GetDescription ¶
func (*BasicTool) GetParameters ¶
type BeforeAgentCallback ¶ added in v0.14.0
type BeforeAgentCallback func(ctx context.Context, callbackContext *CallbackContext) *types.Message
BeforeAgentCallback is called immediately before the agent's main execution logic starts Return nil to allow normal execution, or return types.Message to skip agent execution and use its result as the final response.
It's purpose is for setting up resources or state required for a specific agent run, performing validation checks on the session state (not implemented yet) before execution starts, adding additional logging points for agent activity or modifying the agent context before the core logic uses it.
type BeforeModelCallback ¶ added in v0.14.0
type BeforeModelCallback func(ctx context.Context, callbackContext *CallbackContext, llmRequest *LLMRequest) *LLMResponse
BeforeModelCallback is called just before sending a request to the LLM
It's purpose is to allow inspection and modification of the request going to the LLM. E.g. adding dynamic instructions, guardrails, modifying the model config or implementing request-level caching.
Return nil to allow the request to proceed, or return LLMResponse to skip the LLM call. The returned LLMResponse is used directly as if it came from the model making it a powerful option for implementing guardrails or caching.
type BeforeToolCallback ¶ added in v0.14.0
type BeforeToolCallback func(ctx context.Context, tool Tool, args map[string]any, toolContext *ToolContext) map[string]any
BeforeToolCallback is called just before a tool's execution
It's purpose is to allow inspection and modification of the tool arguments, perform authz checks, logging or implementing tool-level caching.
Return nil to allow the tool to execute, or return map[string]any to skip tool execution. The returned map is used directly as the result of the tool call. Making it useful for either caching or overriding the tool behavior completely.
type CallbackConfig ¶ added in v0.14.0
type CallbackConfig struct {
// Agent lifecycle callbacks
BeforeAgent []BeforeAgentCallback
AfterAgent []AfterAgentCallback
// LLM interaction callbacks
BeforeModel []BeforeModelCallback
AfterModel []AfterModelCallback
// Tool execution callbacks
BeforeTool []BeforeToolCallback
AfterTool []AfterToolCallback
}
CallbackConfig holds all callback configurations for an agent
type CallbackContext ¶ added in v0.14.0
type CallbackContext struct {
// AgentName is the name of the agent being executed
AgentName string
// InvocationID uniquely identifies this execution invocation
InvocationID string
// TaskID is the ID of the current task being processed
TaskID string
// ContextID is the conversation context ID
ContextID string
// State provides access to session state that can be read and modified
State map[string]any
// Logger provides access to the logger for callback implementations
Logger *zap.Logger
}
CallbackContext provides context information to callback functions during execution
type CallbackExecutor ¶ added in v0.14.0
type CallbackExecutor interface {
// ExecuteBeforeAgent executes the before agent callback if configured
ExecuteBeforeAgent(ctx context.Context, callbackContext *CallbackContext) *types.Message
// ExecuteAfterAgent executes the after agent callback if configured
ExecuteAfterAgent(ctx context.Context, callbackContext *CallbackContext, agentOutput *types.Message) *types.Message
// ExecuteBeforeModel executes the before model callback if configured
ExecuteBeforeModel(ctx context.Context, callbackContext *CallbackContext, llmRequest *LLMRequest) *LLMResponse
// ExecuteAfterModel executes the after model callback if configured
ExecuteAfterModel(ctx context.Context, callbackContext *CallbackContext, llmResponse *LLMResponse) *LLMResponse
// ExecuteBeforeTool executes the before tool callback if configured
ExecuteBeforeTool(ctx context.Context, tool Tool, args map[string]any, toolContext *ToolContext) map[string]any
// ExecuteAfterTool executes the after tool callback if configured
ExecuteAfterTool(ctx context.Context, tool Tool, args map[string]any, toolContext *ToolContext, toolResult map[string]any) map[string]any
}
CallbackExecutor handles the execution of callbacks with proper flow control
func NewCallbackExecutor ¶ added in v0.14.0
func NewCallbackExecutor(config *CallbackConfig, logger *zap.Logger) CallbackExecutor
NewCallbackExecutor creates a new callback executor with the given configuration
type ContextKey ¶ added in v0.13.0
type ContextKey string
Context keys for injecting dependencies into tool execution
const ( TaskContextKey ContextKey = "task" ArtifactServiceContextKey ContextKey = "artifactService" UsageTrackerContextKey ContextKey = "usageTracker" )
type DefaultA2AProtocolHandler ¶ added in v0.9.4
type DefaultA2AProtocolHandler struct {
// contains filtered or unexported fields
}
DefaultA2AProtocolHandler implements the A2AProtocolHandler interface
func NewDefaultA2AProtocolHandler ¶ added in v0.9.4
func NewDefaultA2AProtocolHandler( logger *zap.Logger, storage Storage, taskManager TaskManager, responseSender ResponseSender, ) *DefaultA2AProtocolHandler
NewDefaultA2AProtocolHandler creates a new default A2A protocol handler
func (*DefaultA2AProtocolHandler) CreateTaskFromMessage ¶ added in v0.11.2
func (h *DefaultA2AProtocolHandler) CreateTaskFromMessage(ctx context.Context, params types.MessageSendParams) (*types.Task, error)
CreateTaskFromMessage creates a task directly from message parameters
func (*DefaultA2AProtocolHandler) HandleMessageSend ¶ added in v0.9.4
func (h *DefaultA2AProtocolHandler) HandleMessageSend(c *gin.Context, req types.JSONRPCRequest)
HandleMessageSend processes message/send requests
func (*DefaultA2AProtocolHandler) HandleMessageStream ¶ added in v0.9.4
func (h *DefaultA2AProtocolHandler) HandleMessageStream(c *gin.Context, req types.JSONRPCRequest, streamingHandler StreamableTaskHandler)
HandleMessageStream processes message/stream requests
func (*DefaultA2AProtocolHandler) HandleTaskCancel ¶ added in v0.9.4
func (h *DefaultA2AProtocolHandler) HandleTaskCancel(c *gin.Context, req types.JSONRPCRequest)
HandleTaskCancel processes tasks/cancel requests
func (*DefaultA2AProtocolHandler) HandleTaskGet ¶ added in v0.9.4
func (h *DefaultA2AProtocolHandler) HandleTaskGet(c *gin.Context, req types.JSONRPCRequest)
HandleTaskGet processes tasks/get requests
func (*DefaultA2AProtocolHandler) HandleTaskList ¶ added in v0.9.4
func (h *DefaultA2AProtocolHandler) HandleTaskList(c *gin.Context, req types.JSONRPCRequest)
HandleTaskList processes tasks/list requests
func (*DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigDelete ¶ added in v0.9.4
func (h *DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigDelete(c *gin.Context, req types.JSONRPCRequest)
HandleTaskPushNotificationConfigDelete processes tasks/pushNotificationConfig/delete requests
func (*DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigGet ¶ added in v0.9.4
func (h *DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigGet(c *gin.Context, req types.JSONRPCRequest)
HandleTaskPushNotificationConfigGet processes tasks/pushNotificationConfig/get requests
func (*DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigList ¶ added in v0.9.4
func (h *DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigList(c *gin.Context, req types.JSONRPCRequest)
HandleTaskPushNotificationConfigList processes tasks/pushNotificationConfig/list requests
func (*DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigSet ¶ added in v0.9.4
func (h *DefaultA2AProtocolHandler) HandleTaskPushNotificationConfigSet(c *gin.Context, req types.JSONRPCRequest)
HandleTaskPushNotificationConfigSet processes tasks/pushNotificationConfig/set requests
type DefaultBackgroundTaskHandler ¶ added in v0.9.0
type DefaultBackgroundTaskHandler struct {
// contains filtered or unexported fields
}
DefaultBackgroundTaskHandler implements the TaskHandler interface optimized for background scenarios This handler automatically handles input-required pausing without requiring custom implementation
func NewDefaultBackgroundTaskHandler ¶ added in v0.9.0
func NewDefaultBackgroundTaskHandler(logger *zap.Logger, agent OpenAICompatibleAgent) *DefaultBackgroundTaskHandler
NewDefaultBackgroundTaskHandler creates a new default background task handler
func NewDefaultBackgroundTaskHandlerWithAgent ¶ added in v0.9.0
func NewDefaultBackgroundTaskHandlerWithAgent(logger *zap.Logger, agent OpenAICompatibleAgent) *DefaultBackgroundTaskHandler
NewDefaultBackgroundTaskHandlerWithAgent creates a new default background task handler with an agent
func (*DefaultBackgroundTaskHandler) GetAgent ¶ added in v0.9.0
func (bth *DefaultBackgroundTaskHandler) GetAgent() OpenAICompatibleAgent
GetAgent returns the configured agent
func (*DefaultBackgroundTaskHandler) HandleTask ¶ added in v0.9.0
func (bth *DefaultBackgroundTaskHandler) HandleTask(ctx context.Context, task *types.Task, message *types.Message) (*types.Task, error)
HandleTask processes a task with optimized logic for background scenarios
func (*DefaultBackgroundTaskHandler) SetAgent ¶ added in v0.9.0
func (bth *DefaultBackgroundTaskHandler) SetAgent(agent OpenAICompatibleAgent)
SetAgent sets the agent for the task handler
type DefaultCallbackExecutor ¶ added in v0.14.0
type DefaultCallbackExecutor struct {
// contains filtered or unexported fields
}
DefaultCallbackExecutor implements CallbackExecutor with proper error handling and logging
func (*DefaultCallbackExecutor) ExecuteAfterAgent ¶ added in v0.14.0
func (ce *DefaultCallbackExecutor) ExecuteAfterAgent(ctx context.Context, callbackContext *CallbackContext, agentOutput *types.Message) *types.Message
ExecuteAfterAgent executes all after agent callbacks if configured Chains the callbacks - each callback receives the output of the previous callback Returns the final modified output, or nil if no callbacks modify the output
func (*DefaultCallbackExecutor) ExecuteAfterModel ¶ added in v0.14.0
func (ce *DefaultCallbackExecutor) ExecuteAfterModel(ctx context.Context, callbackContext *CallbackContext, llmResponse *LLMResponse) *LLMResponse
ExecuteAfterModel executes all after model callbacks if configured Chains the callbacks - each callback receives the response from the previous callback Returns the final modified response, or nil if no callbacks modify the response
func (*DefaultCallbackExecutor) ExecuteAfterTool ¶ added in v0.14.0
func (ce *DefaultCallbackExecutor) ExecuteAfterTool(ctx context.Context, tool Tool, args map[string]any, toolContext *ToolContext, toolResult map[string]any) map[string]any
ExecuteAfterTool executes all after tool callbacks if configured Chains the callbacks - each callback receives the result from the previous callback Returns the final modified result, or nil if no callbacks modify the result
func (*DefaultCallbackExecutor) ExecuteBeforeAgent ¶ added in v0.14.0
func (ce *DefaultCallbackExecutor) ExecuteBeforeAgent(ctx context.Context, callbackContext *CallbackContext) *types.Message
ExecuteBeforeAgent executes all before agent callbacks if configured Returns the result of the first callback that returns a non-nil value (flow control) If all callbacks return nil, execution continues normally
func (*DefaultCallbackExecutor) ExecuteBeforeModel ¶ added in v0.14.0
func (ce *DefaultCallbackExecutor) ExecuteBeforeModel(ctx context.Context, callbackContext *CallbackContext, llmRequest *LLMRequest) *LLMResponse
ExecuteBeforeModel executes all before model callbacks if configured Returns the result of the first callback that returns a non-nil value (flow control) If all callbacks return nil, execution continues normally with LLM call
func (*DefaultCallbackExecutor) ExecuteBeforeTool ¶ added in v0.14.0
func (ce *DefaultCallbackExecutor) ExecuteBeforeTool(ctx context.Context, tool Tool, args map[string]any, toolContext *ToolContext) map[string]any
ExecuteBeforeTool executes all before tool callbacks if configured Returns the result of the first callback that returns a non-nil value (flow control) If all callbacks return nil, execution continues normally with tool call
type DefaultResponseSender ¶
type DefaultResponseSender struct {
// contains filtered or unexported fields
}
DefaultResponseSender implements the ResponseSender interface
func NewDefaultResponseSender ¶
func NewDefaultResponseSender(logger *zap.Logger) *DefaultResponseSender
NewDefaultResponseSender creates a new default response sender
func (*DefaultResponseSender) SendSuccess ¶
func (rs *DefaultResponseSender) SendSuccess(c *gin.Context, id any, result any)
SendSuccess sends a JSON-RPC success response
type DefaultStreamingTaskHandler ¶ added in v0.9.0
type DefaultStreamingTaskHandler struct {
// contains filtered or unexported fields
}
DefaultStreamingTaskHandler implements the TaskHandler interface optimized for streaming scenarios This handler automatically handles input-required pausing with streaming-aware behavior
func NewDefaultStreamingTaskHandler ¶ added in v0.9.0
func NewDefaultStreamingTaskHandler(logger *zap.Logger, agent OpenAICompatibleAgent) *DefaultStreamingTaskHandler
NewDefaultStreamingTaskHandler creates a new default streaming task handler
func (*DefaultStreamingTaskHandler) GetAgent ¶ added in v0.9.0
func (sth *DefaultStreamingTaskHandler) GetAgent() OpenAICompatibleAgent
GetAgent returns the configured agent
func (*DefaultStreamingTaskHandler) HandleStreamingTask ¶ added in v0.9.4
func (sth *DefaultStreamingTaskHandler) HandleStreamingTask(ctx context.Context, task *types.Task, message *types.Message) (<-chan cloudevents.Event, error)
HandleStreamingTask processes a task and returns a channel of CloudEvents It forwards events from the agent directly without conversion
func (*DefaultStreamingTaskHandler) SetAgent ¶ added in v0.9.0
func (sth *DefaultStreamingTaskHandler) SetAgent(agent OpenAICompatibleAgent)
SetAgent sets the agent for the task handler
type DefaultTaskManager ¶
type DefaultTaskManager struct {
// contains filtered or unexported fields
}
DefaultTaskManager implements the TaskManager interface
func NewDefaultTaskManager ¶
func NewDefaultTaskManager(logger *zap.Logger) *DefaultTaskManager
NewDefaultTaskManager creates a new default task manager
func NewDefaultTaskManagerWithNotifications ¶
func NewDefaultTaskManagerWithNotifications(logger *zap.Logger, notificationSender PushNotificationSender) *DefaultTaskManager
NewDefaultTaskManagerWithNotifications creates a new default task manager with push notification support
func NewDefaultTaskManagerWithStorage ¶ added in v0.8.0
func NewDefaultTaskManagerWithStorage(logger *zap.Logger, storage Storage) *DefaultTaskManager
NewDefaultTaskManagerWithStorage creates a new default task manager with custom storage
func (*DefaultTaskManager) CancelTask ¶
func (tm *DefaultTaskManager) CancelTask(taskID string) error
CancelTask cancels a task
func (*DefaultTaskManager) CleanupCompletedTasks ¶
func (tm *DefaultTaskManager) CleanupCompletedTasks()
CleanupCompletedTasks removes old completed tasks from memory
func (*DefaultTaskManager) CreateTask ¶
func (tm *DefaultTaskManager) CreateTask(contextID string, state types.TaskState, message *types.Message) *types.Task
CreateTask creates a new task with message history managed within the task
func (*DefaultTaskManager) CreateTaskWithHistory ¶ added in v0.8.0
func (tm *DefaultTaskManager) CreateTaskWithHistory(contextID string, state types.TaskState, message *types.Message, history []types.Message) *types.Task
CreateTaskWithHistory creates a new task with existing conversation history
func (*DefaultTaskManager) DeleteTaskPushNotificationConfig ¶
func (tm *DefaultTaskManager) DeleteTaskPushNotificationConfig(params types.DeleteTaskPushNotificationConfigParams) error
DeleteTaskPushNotificationConfig deletes a push notification configuration
func (*DefaultTaskManager) GetConversationHistory ¶
func (tm *DefaultTaskManager) GetConversationHistory(contextID string) []types.Message
GetConversationHistory retrieves conversation history for a context ID
func (*DefaultTaskManager) GetStorage ¶ added in v0.8.0
func (tm *DefaultTaskManager) GetStorage() Storage
GetStorage returns the storage interface used by this task manager
func (*DefaultTaskManager) GetTask ¶
func (tm *DefaultTaskManager) GetTask(taskID string) (*types.Task, bool)
GetTask retrieves a task by ID
func (*DefaultTaskManager) GetTaskPushNotificationConfig ¶
func (tm *DefaultTaskManager) GetTaskPushNotificationConfig(params types.GetTaskPushNotificationConfigParams) (*types.TaskPushNotificationConfig, error)
GetTaskPushNotificationConfig gets push notification configuration for a task
func (*DefaultTaskManager) IsTaskPaused ¶ added in v0.8.0
func (tm *DefaultTaskManager) IsTaskPaused(taskID string) (bool, error)
IsTaskPaused checks if a task is currently paused (in input-required state)
func (*DefaultTaskManager) ListTaskPushNotificationConfigs ¶
func (tm *DefaultTaskManager) ListTaskPushNotificationConfigs(params types.ListTaskPushNotificationConfigParams) ([]types.TaskPushNotificationConfig, error)
ListTaskPushNotificationConfigs lists all push notification configurations for a task
func (*DefaultTaskManager) ListTasks ¶
func (tm *DefaultTaskManager) ListTasks(params types.TaskListParams) (*types.TaskList, error)
ListTasks retrieves a list of tasks based on the provided parameters
func (*DefaultTaskManager) PauseTaskForInput ¶ added in v0.8.0
func (tm *DefaultTaskManager) PauseTaskForInput(taskID string, message *types.Message) error
PauseTaskForInput pauses a task waiting for additional input from the client
func (*DefaultTaskManager) PollTaskStatus ¶
func (tm *DefaultTaskManager) PollTaskStatus(taskID string, interval time.Duration, timeout time.Duration) (*types.Task, error)
PollTaskStatus periodically checks the status of a task until it is completed or failed
func (*DefaultTaskManager) RegisterTaskCancelFunc ¶ added in v0.15.1
func (tm *DefaultTaskManager) RegisterTaskCancelFunc(taskID string, cancelFunc context.CancelFunc)
RegisterTaskCancelFunc registers a cancel function for a running task
func (*DefaultTaskManager) ResumeTaskWithInput ¶ added in v0.8.0
func (tm *DefaultTaskManager) ResumeTaskWithInput(taskID string, message *types.Message) error
ResumeTaskWithInput resumes a paused task with new input from the client
func (*DefaultTaskManager) SetNotificationSender ¶
func (tm *DefaultTaskManager) SetNotificationSender(sender PushNotificationSender)
SetNotificationSender sets the push notification sender
func (*DefaultTaskManager) SetRetentionConfig ¶ added in v0.8.0
func (tm *DefaultTaskManager) SetRetentionConfig(retentionConfig config.TaskRetentionConfig)
SetRetentionConfig sets the task retention configuration and starts automatic cleanup
func (*DefaultTaskManager) SetTaskPushNotificationConfig ¶
func (tm *DefaultTaskManager) SetTaskPushNotificationConfig(config types.TaskPushNotificationConfig) (*types.TaskPushNotificationConfig, error)
SetTaskPushNotificationConfig sets push notification configuration for a task
func (*DefaultTaskManager) StopCleanup ¶ added in v0.8.0
func (tm *DefaultTaskManager) StopCleanup()
StopCleanup stops the automatic cleanup process
func (*DefaultTaskManager) UnregisterTaskCancelFunc ¶ added in v0.15.1
func (tm *DefaultTaskManager) UnregisterTaskCancelFunc(taskID string)
UnregisterTaskCancelFunc removes the cancel function for a completed task
func (*DefaultTaskManager) UpdateConversationHistory ¶
func (tm *DefaultTaskManager) UpdateConversationHistory(contextID string, messages []types.Message)
UpdateConversationHistory updates conversation history for a context ID
func (*DefaultTaskManager) UpdateError ¶ added in v0.8.0
func (tm *DefaultTaskManager) UpdateError(taskID string, message *types.Message) error
UpdateError updates a task to failed state with an error message
func (*DefaultTaskManager) UpdateState ¶ added in v0.8.0
func (tm *DefaultTaskManager) UpdateState(taskID string, state types.TaskState) error
UpdateState updates a task's state
func (*DefaultTaskManager) UpdateTask ¶
func (tm *DefaultTaskManager) UpdateTask(task *types.Task) error
UpdateTask updates a complete task (including history, state, and message)
type DefaultToolBox ¶
type DefaultToolBox struct {
// contains filtered or unexported fields
}
DefaultToolBox is a default implementation of ToolBox
func NewDefaultToolBox ¶
func NewDefaultToolBox(cfg *config.ToolBoxConfig) *DefaultToolBox
NewDefaultToolBox creates a new DefaultToolBox with built-in tools The config parameter determines which tools are enabled
func NewToolBox ¶ added in v0.8.0
func NewToolBox() *DefaultToolBox
NewToolBox creates a new empty DefaultToolBox
func (*DefaultToolBox) AddTool ¶
func (tb *DefaultToolBox) AddTool(tool Tool)
AddTool adds a tool to the toolbox
func (*DefaultToolBox) ExecuteTool ¶
func (tb *DefaultToolBox) ExecuteTool(ctx context.Context, toolName string, arguments map[string]any) (string, error)
ExecuteTool executes a tool by name with the provided arguments
func (*DefaultToolBox) GetTool ¶ added in v0.16.0
func (tb *DefaultToolBox) GetTool(toolName string) (Tool, bool)
GetTool retrieves a tool by name, returning the tool and a boolean indicating if it was found
func (*DefaultToolBox) GetToolNames ¶
func (tb *DefaultToolBox) GetToolNames() []string
GetToolNames returns a list of all available tool names
func (*DefaultToolBox) GetTools ¶
func (tb *DefaultToolBox) GetTools() []sdk.ChatCompletionTool
GetTools returns all available tools in OpenAI function call format
func (*DefaultToolBox) HasTool ¶
func (tb *DefaultToolBox) HasTool(toolName string) bool
HasTool checks if a tool with the given name exists
type EmptyMessagePartsError ¶
type EmptyMessagePartsError struct{}
EmptyMessagePartsError represents an error for empty message parts
func (*EmptyMessagePartsError) Error ¶
func (e *EmptyMessagePartsError) Error() string
type FilesystemArtifactStorage ¶ added in v0.12.0
type FilesystemArtifactStorage struct {
// contains filtered or unexported fields
}
FilesystemArtifactStorage implements ArtifactStorageProvider using local filesystem
func NewFilesystemArtifactStorage ¶ added in v0.12.0
func NewFilesystemArtifactStorage(cfg *config.ArtifactsStorageConfig) (*FilesystemArtifactStorage, error)
NewFilesystemArtifactStorage creates a new filesystem-based artifact storage provider
func (*FilesystemArtifactStorage) CleanupExpiredArtifacts ¶ added in v0.12.0
func (fs *FilesystemArtifactStorage) CleanupExpiredArtifacts(ctx context.Context, maxAge time.Duration) (int, error)
CleanupExpiredArtifacts removes artifacts older than maxAge
func (*FilesystemArtifactStorage) CleanupOldestArtifacts ¶ added in v0.12.0
func (fs *FilesystemArtifactStorage) CleanupOldestArtifacts(ctx context.Context, maxCount int) (int, error)
CleanupOldestArtifacts removes old artifacts keeping only maxCount per artifact ID
func (*FilesystemArtifactStorage) Close ¶ added in v0.12.0
func (fs *FilesystemArtifactStorage) Close() error
Close cleans up the filesystem storage (no-op for filesystem)
func (*FilesystemArtifactStorage) Delete ¶ added in v0.12.0
func (fs *FilesystemArtifactStorage) Delete(ctx context.Context, artifactID string, filename string) error
Delete removes an artifact from the filesystem
func (*FilesystemArtifactStorage) Exists ¶ added in v0.12.0
func (fs *FilesystemArtifactStorage) Exists(ctx context.Context, artifactID string, filename string) (bool, error)
Exists checks if an artifact exists in the filesystem
func (*FilesystemArtifactStorage) GetURL ¶ added in v0.12.0
func (fs *FilesystemArtifactStorage) GetURL(artifactID string, filename string) string
GetURL returns the public URL for accessing an artifact
func (*FilesystemArtifactStorage) Retrieve ¶ added in v0.12.0
func (fs *FilesystemArtifactStorage) Retrieve(ctx context.Context, artifactID string, filename string) (io.ReadCloser, error)
Retrieve retrieves an artifact from the local filesystem
type HTTPPushNotificationSender ¶
type HTTPPushNotificationSender struct {
// contains filtered or unexported fields
}
HTTPPushNotificationSender implements push notifications via HTTP webhooks
func NewHTTPPushNotificationSender ¶
func NewHTTPPushNotificationSender(logger *zap.Logger) *HTTPPushNotificationSender
NewHTTPPushNotificationSender creates a new HTTP-based push notification sender
func (*HTTPPushNotificationSender) SendTaskUpdate ¶
func (s *HTTPPushNotificationSender) SendTaskUpdate(ctx context.Context, config types.PushNotificationConfig, task *types.Task) error
SendTaskUpdate sends a push notification about a task update
type InMemoryStorage ¶ added in v0.8.0
type InMemoryStorage struct {
// contains filtered or unexported fields
}
InMemoryStorage implements Storage interface using in-memory storage
func NewInMemoryStorage ¶ added in v0.8.0
func NewInMemoryStorage(logger *zap.Logger, maxConversationHistory int) *InMemoryStorage
NewInMemoryStorage creates a new in-memory storage instance
func (*InMemoryStorage) CleanupCompletedTasks ¶ added in v0.8.0
func (s *InMemoryStorage) CleanupCompletedTasks() int
CleanupCompletedTasks removes completed, failed, and canceled tasks
func (*InMemoryStorage) CleanupOldConversations ¶ added in v0.8.0
func (s *InMemoryStorage) CleanupOldConversations(maxAge int64) int
CleanupOldConversations removes conversations older than maxAge (in seconds)
func (*InMemoryStorage) CleanupTasksWithRetention ¶ added in v0.8.0
func (s *InMemoryStorage) CleanupTasksWithRetention(maxCompleted, maxFailed int) int
CleanupTasksWithRetention removes old completed and failed tasks while keeping the specified number of most recent ones
func (*InMemoryStorage) ClearQueue ¶ added in v0.8.0
func (s *InMemoryStorage) ClearQueue() error
ClearQueue removes all tasks from the queue
func (*InMemoryStorage) CreateActiveTask ¶ added in v0.8.0
func (s *InMemoryStorage) CreateActiveTask(task *types.Task) error
CreateActiveTask creates a new active task in the active tasks storage
func (*InMemoryStorage) DeleteContext ¶ added in v0.8.0
func (s *InMemoryStorage) DeleteContext(contextID string) error
DeleteContext deletes all tasks for a context (not applicable since no conversation history)
func (*InMemoryStorage) DeleteContextAndTasks ¶ added in v0.8.0
func (s *InMemoryStorage) DeleteContextAndTasks(contextID string) error
DeleteContextAndTasks deletes all tasks for a context
func (*InMemoryStorage) DeleteTask ¶ added in v0.8.0
func (s *InMemoryStorage) DeleteTask(taskID string) error
DeleteTask deletes a task from dead letter queue and cleans up context mapping
func (*InMemoryStorage) DequeueTask ¶ added in v0.8.0
func (s *InMemoryStorage) DequeueTask(ctx context.Context) (*QueuedTask, error)
DequeueTask retrieves and removes the next task from the processing queue Blocks until a task is available or context is cancelled
func (*InMemoryStorage) EnqueueTask ¶ added in v0.8.0
func (s *InMemoryStorage) EnqueueTask(task *types.Task, requestID any) error
EnqueueTask adds a task to the processing queue
func (*InMemoryStorage) GetActiveTask ¶ added in v0.8.0
func (s *InMemoryStorage) GetActiveTask(taskID string) (*types.Task, error)
GetActiveTask retrieves an active task by ID (from queue or processing)
func (*InMemoryStorage) GetContexts ¶ added in v0.8.0
func (s *InMemoryStorage) GetContexts() []string
GetContexts returns all context IDs that have tasks (both active and dead letter)
func (*InMemoryStorage) GetContextsWithTasks ¶ added in v0.8.0
func (s *InMemoryStorage) GetContextsWithTasks() []string
GetContextsWithTasks returns all context IDs that have tasks
func (*InMemoryStorage) GetQueueLength ¶ added in v0.8.0
func (s *InMemoryStorage) GetQueueLength() int
GetQueueLength returns the current number of tasks in the queue
func (*InMemoryStorage) GetStats ¶ added in v0.8.0
func (s *InMemoryStorage) GetStats() StorageStats
GetStats provides statistics about the storage
func (*InMemoryStorage) GetTask ¶ added in v0.8.0
func (s *InMemoryStorage) GetTask(taskID string) (*types.Task, bool)
GetTask retrieves a task by ID from dead letter queue
func (*InMemoryStorage) GetTaskByContextAndID ¶ added in v0.8.0
func (s *InMemoryStorage) GetTaskByContextAndID(contextID, taskID string) (*types.Task, bool)
GetTaskByContextAndID retrieves a task by context ID and task ID from dead letter queue
func (*InMemoryStorage) ListTasks ¶ added in v0.8.0
func (s *InMemoryStorage) ListTasks(filter TaskFilter) ([]*types.Task, error)
ListTasks retrieves a list of tasks based on the provided filter from both active and dead letter queues
func (*InMemoryStorage) ListTasksByContext ¶ added in v0.8.0
func (s *InMemoryStorage) ListTasksByContext(contextID string, filter TaskFilter) ([]*types.Task, error)
ListTasksByContext retrieves tasks for a specific context with filtering from dead letter queue
func (*InMemoryStorage) StoreDeadLetterTask ¶ added in v0.8.0
func (s *InMemoryStorage) StoreDeadLetterTask(task *types.Task) error
StoreDeadLetterTask stores a completed/failed task in the dead letter queue for audit
func (*InMemoryStorage) UpdateActiveTask ¶ added in v0.8.0
func (s *InMemoryStorage) UpdateActiveTask(task *types.Task) error
UpdateActiveTask updates an active task's metadata
type InMemoryStorageFactory ¶ added in v0.9.0
type InMemoryStorageFactory struct{}
InMemoryStorageFactory implements StorageFactory for in-memory storage
func (*InMemoryStorageFactory) CreateStorage ¶ added in v0.9.0
func (f *InMemoryStorageFactory) CreateStorage(ctx context.Context, config config.QueueConfig, logger *zap.Logger) (Storage, error)
CreateStorage creates an in-memory storage instance
func (*InMemoryStorageFactory) SupportedProvider ¶ added in v0.9.0
func (f *InMemoryStorageFactory) SupportedProvider() string
SupportedProvider returns the provider name
func (*InMemoryStorageFactory) ValidateConfig ¶ added in v0.9.0
func (f *InMemoryStorageFactory) ValidateConfig(config config.QueueConfig) error
ValidateConfig validates the configuration for in-memory storage
type JRPCErrorCode ¶
type JRPCErrorCode int
JRPCErrorCode represents JSON-RPC error codes
const ( ErrParseError JRPCErrorCode = -32700 ErrInvalidRequest JRPCErrorCode = -32600 ErrMethodNotFound JRPCErrorCode = -32601 ErrInvalidParams JRPCErrorCode = -32602 ErrInternalError JRPCErrorCode = -32603 ErrServerError JRPCErrorCode = -32000 )
type LLMClient ¶
type LLMClient interface {
// CreateChatCompletion sends a chat completion request using SDK messages
CreateChatCompletion(ctx context.Context, messages []sdk.Message, tools ...sdk.ChatCompletionTool) (*sdk.CreateChatCompletionResponse, error)
// CreateStreamingChatCompletion sends a streaming chat completion request using SDK messages
CreateStreamingChatCompletion(ctx context.Context, messages []sdk.Message, tools ...sdk.ChatCompletionTool) (<-chan *sdk.CreateChatCompletionStreamResponse, <-chan error)
}
LLMClient defines the interface for Language Model clients
type LLMConfig ¶ added in v0.14.0
type LLMConfig struct {
// SystemInstruction is the system prompt/instruction for the LLM
SystemInstruction *types.Message
// Temperature controls randomness in LLM responses (0.0-2.0)
Temperature *float64
// MaxTokens is the maximum number of tokens to generate
MaxTokens *int
}
LLMConfig contains configuration for LLM requests
type LLMRequest ¶ added in v0.14.0
type LLMRequest struct {
// Contents are the conversation messages being sent to the LLM
Contents []types.Message
// Config contains LLM configuration like system instruction, temperature, etc.
Config *LLMConfig
}
LLMRequest represents a request to be sent to the LLM
type LLMResponse ¶ added in v0.14.0
type LLMResponse struct {
// Content is the main response content from the LLM
Content *types.Message
}
LLMResponse represents a response from the LLM
type MinIOArtifactStorage ¶ added in v0.12.0
type MinIOArtifactStorage struct {
// contains filtered or unexported fields
}
MinIOArtifactStorage implements ArtifactStorageProvider using MinIO/S3
func NewMinIOArtifactStorage ¶ added in v0.12.0
func NewMinIOArtifactStorage(cfg *config.ArtifactsStorageConfig) (*MinIOArtifactStorage, error)
NewMinIOArtifactStorage creates a new MinIO-based artifact storage provider
func (*MinIOArtifactStorage) CleanupExpiredArtifacts ¶ added in v0.12.0
func (m *MinIOArtifactStorage) CleanupExpiredArtifacts(ctx context.Context, maxAge time.Duration) (int, error)
CleanupExpiredArtifacts removes artifacts older than maxAge
func (*MinIOArtifactStorage) CleanupOldestArtifacts ¶ added in v0.12.0
func (m *MinIOArtifactStorage) CleanupOldestArtifacts(ctx context.Context, maxCount int) (int, error)
CleanupOldestArtifacts removes old artifacts keeping only maxCount per artifact ID
func (*MinIOArtifactStorage) Close ¶ added in v0.12.0
func (m *MinIOArtifactStorage) Close() error
Close closes the MinIO connection
func (*MinIOArtifactStorage) Delete ¶ added in v0.12.0
func (m *MinIOArtifactStorage) Delete(ctx context.Context, artifactID string, filename string) error
Delete removes an artifact from MinIO
func (*MinIOArtifactStorage) Exists ¶ added in v0.12.0
func (m *MinIOArtifactStorage) Exists(ctx context.Context, artifactID string, filename string) (bool, error)
Exists checks if an artifact exists in MinIO
func (*MinIOArtifactStorage) GetURL ¶ added in v0.12.0
func (m *MinIOArtifactStorage) GetURL(artifactID string, filename string) string
GetURL returns the public URL for accessing an artifact
func (*MinIOArtifactStorage) Retrieve ¶ added in v0.12.0
func (m *MinIOArtifactStorage) Retrieve(ctx context.Context, artifactID string, filename string) (io.ReadCloser, error)
Retrieve retrieves an artifact from MinIO
type OpenAICompatibleAgent ¶
type OpenAICompatibleAgent interface {
// RunWithStream processes a conversation and returns a streaming response
// Uses the agent's configured toolbox for tool execution
// Events are emitted for deltas, tool execution, completions, and errors
RunWithStream(ctx context.Context, messages []types.Message) (<-chan cloudevents.Event, error)
}
OpenAICompatibleAgent represents an agent that can interact with OpenAI-compatible LLM APIs and execute tools The agent is stateless and does not maintain conversation history Tools are configured during agent creation via the toolbox All agent execution is event-driven via RunWithStream
type OpenAICompatibleAgentImpl ¶ added in v0.8.0
type OpenAICompatibleAgentImpl struct {
// contains filtered or unexported fields
}
OpenAICompatibleAgentImpl is the implementation of OpenAICompatibleAgent This implementation is stateless and does not maintain conversation history
func AgentWithConfig ¶
func AgentWithConfig(logger *zap.Logger, config *config.AgentConfig) (*OpenAICompatibleAgentImpl, error)
AgentWithConfig creates an agent with the provided configuration
func AgentWithLLM ¶
func AgentWithLLM(logger *zap.Logger, llmClient LLMClient) (*OpenAICompatibleAgentImpl, error)
AgentWithLLM creates an agent with a pre-configured LLM client
func FullyConfiguredAgent ¶
func FullyConfiguredAgent(logger *zap.Logger, config *config.AgentConfig, llmClient LLMClient, toolBox ToolBox) (*OpenAICompatibleAgentImpl, error)
FullyConfiguredAgent creates an agent with all components configured
func NewOpenAICompatibleAgent ¶ added in v0.8.0
func NewOpenAICompatibleAgent(logger *zap.Logger) *OpenAICompatibleAgentImpl
NewOpenAICompatibleAgent creates a new OpenAICompatibleAgentImpl
func NewOpenAICompatibleAgentWithConfig ¶
func NewOpenAICompatibleAgentWithConfig(logger *zap.Logger, cfg *config.AgentConfig) *OpenAICompatibleAgentImpl
NewOpenAICompatibleAgentWithConfig creates a new OpenAICompatibleAgentImpl with configuration
func NewOpenAICompatibleAgentWithLLM ¶
func NewOpenAICompatibleAgentWithLLM(logger *zap.Logger, llmClient LLMClient) *OpenAICompatibleAgentImpl
NewOpenAICompatibleAgentWithLLM creates a new agent with an LLM client
func NewOpenAICompatibleAgentWithLLMConfig ¶ added in v0.8.0
func NewOpenAICompatibleAgentWithLLMConfig(logger *zap.Logger, config *config.AgentConfig) (*OpenAICompatibleAgentImpl, error)
NewOpenAICompatibleAgentWithLLMConfig creates a new agent with LLM configuration
func SimpleAgent ¶
func SimpleAgent(logger *zap.Logger) (*OpenAICompatibleAgentImpl, error)
SimpleAgent creates a basic agent with default configuration
func (*OpenAICompatibleAgentImpl) GetCallbackExecutor ¶ added in v0.14.0
func (a *OpenAICompatibleAgentImpl) GetCallbackExecutor() CallbackExecutor
GetCallbackExecutor returns the callback executor for the agent if available or a provided default
func (*OpenAICompatibleAgentImpl) RunWithStream ¶ added in v0.8.0
func (a *OpenAICompatibleAgentImpl) RunWithStream(ctx context.Context, messages []types.Message) (<-chan cloudevents.Event, error)
RunWithStream processes a conversation and returns a streaming response with iterative tool calling support
func (*OpenAICompatibleAgentImpl) SetCallbackExecutor ¶ added in v0.14.0
func (a *OpenAICompatibleAgentImpl) SetCallbackExecutor(executor CallbackExecutor)
SetCallbackExecutor sets the callback executor for the agent
func (*OpenAICompatibleAgentImpl) SetLLMClient ¶ added in v0.8.0
func (a *OpenAICompatibleAgentImpl) SetLLMClient(client LLMClient)
SetLLMClient sets the LLM client for the agent
func (*OpenAICompatibleAgentImpl) SetToolBox ¶ added in v0.8.0
func (a *OpenAICompatibleAgentImpl) SetToolBox(toolBox ToolBox)
SetToolBox sets the tool box for the agent
type OpenAICompatibleLLMClient ¶
type OpenAICompatibleLLMClient struct {
// contains filtered or unexported fields
}
OpenAICompatibleLLMClient implements LLMClient using an OpenAI-compatible API via the Inference Gateway SDK
func NewOpenAICompatibleLLMClient ¶
func NewOpenAICompatibleLLMClient(cfg *config.AgentConfig, logger *zap.Logger) (*OpenAICompatibleLLMClient, error)
NewOpenAICompatibleLLMClient creates a new OpenAI-compatible LLM client
func (*OpenAICompatibleLLMClient) CreateChatCompletion ¶
func (c *OpenAICompatibleLLMClient) CreateChatCompletion(ctx context.Context, messages []sdk.Message, tools ...sdk.ChatCompletionTool) (*sdk.CreateChatCompletionResponse, error)
CreateChatCompletion implements LLMClient.CreateChatCompletion using SDK messages
func (*OpenAICompatibleLLMClient) CreateStreamingChatCompletion ¶
func (c *OpenAICompatibleLLMClient) CreateStreamingChatCompletion(ctx context.Context, messages []sdk.Message, tools ...sdk.ChatCompletionTool) (<-chan *sdk.CreateChatCompletionStreamResponse, <-chan error)
CreateStreamingChatCompletion implements LLMClient.CreateStreamingChatCompletion using SDK messages
type PushNotificationSender ¶
type PushNotificationSender interface {
SendTaskUpdate(ctx context.Context, config types.PushNotificationConfig, task *types.Task) error
}
PushNotificationSender handles sending push notifications
type QueuedTask ¶
QueuedTask represents a task in the processing queue
type RedisStorage ¶ added in v0.8.0
type RedisStorage struct {
// contains filtered or unexported fields
}
RedisStorage implements Storage interface using Redis
func (*RedisStorage) CleanupCompletedTasks ¶ added in v0.8.0
func (s *RedisStorage) CleanupCompletedTasks() int
CleanupCompletedTasks removes completed, failed, and canceled tasks
func (*RedisStorage) CleanupTasksWithRetention ¶ added in v0.9.0
func (s *RedisStorage) CleanupTasksWithRetention(maxCompleted, maxFailed int) int
CleanupTasksWithRetention removes old completed and failed tasks while keeping specified number
func (*RedisStorage) ClearQueue ¶ added in v0.9.0
func (s *RedisStorage) ClearQueue() error
ClearQueue removes all tasks from the queue
func (*RedisStorage) CreateActiveTask ¶ added in v0.9.0
func (s *RedisStorage) CreateActiveTask(task *types.Task) error
CreateActiveTask creates a new active task
func (*RedisStorage) DeleteContext ¶ added in v0.8.0
func (s *RedisStorage) DeleteContext(contextID string) error
DeleteContext deletes all tasks for a context
func (*RedisStorage) DeleteContextAndTasks ¶ added in v0.8.0
func (s *RedisStorage) DeleteContextAndTasks(contextID string) error
DeleteContextAndTasks deletes all tasks for a context
func (*RedisStorage) DeleteTask ¶ added in v0.8.0
func (s *RedisStorage) DeleteTask(taskID string) error
DeleteTask deletes a task from dead letter queue and cleans up context mapping
func (*RedisStorage) DequeueTask ¶ added in v0.9.0
func (s *RedisStorage) DequeueTask(ctx context.Context) (*QueuedTask, error)
DequeueTask retrieves and removes the next task from the processing queue
func (*RedisStorage) EnqueueTask ¶ added in v0.9.0
func (s *RedisStorage) EnqueueTask(task *types.Task, requestID any) error
EnqueueTask adds a task to the processing queue
func (*RedisStorage) GetActiveTask ¶ added in v0.9.0
func (s *RedisStorage) GetActiveTask(taskID string) (*types.Task, error)
GetActiveTask retrieves an active task by ID
func (*RedisStorage) GetContexts ¶ added in v0.8.0
func (s *RedisStorage) GetContexts() []string
GetContexts returns all context IDs that have tasks
func (*RedisStorage) GetContextsWithTasks ¶ added in v0.8.0
func (s *RedisStorage) GetContextsWithTasks() []string
GetContextsWithTasks returns all context IDs that have tasks
func (*RedisStorage) GetQueueLength ¶ added in v0.9.0
func (s *RedisStorage) GetQueueLength() int
GetQueueLength returns the current number of tasks in the queue
func (*RedisStorage) GetStats ¶ added in v0.8.0
func (s *RedisStorage) GetStats() StorageStats
GetStats provides statistics about the storage
func (*RedisStorage) GetTask ¶ added in v0.8.0
func (s *RedisStorage) GetTask(taskID string) (*types.Task, bool)
GetTask retrieves a task by ID from dead letter queue
func (*RedisStorage) GetTaskByContextAndID ¶ added in v0.8.0
func (s *RedisStorage) GetTaskByContextAndID(contextID, taskID string) (*types.Task, bool)
GetTaskByContextAndID retrieves a task by context ID and task ID
func (*RedisStorage) ListTasks ¶ added in v0.8.0
func (s *RedisStorage) ListTasks(filter TaskFilter) ([]*types.Task, error)
ListTasks retrieves a list of tasks based on the provided filter
func (*RedisStorage) ListTasksByContext ¶ added in v0.8.0
func (s *RedisStorage) ListTasksByContext(contextID string, filter TaskFilter) ([]*types.Task, error)
ListTasksByContext retrieves tasks for a specific context
func (*RedisStorage) StoreDeadLetterTask ¶ added in v0.9.0
func (s *RedisStorage) StoreDeadLetterTask(task *types.Task) error
StoreDeadLetterTask stores a completed/failed task in the dead letter queue
func (*RedisStorage) UpdateActiveTask ¶ added in v0.9.0
func (s *RedisStorage) UpdateActiveTask(task *types.Task) error
UpdateActiveTask updates an active task's metadata
type RedisStorageFactory ¶ added in v0.9.0
type RedisStorageFactory struct{}
RedisStorageFactory implements StorageFactory for Redis storage
func (*RedisStorageFactory) CreateStorage ¶ added in v0.9.0
func (f *RedisStorageFactory) CreateStorage(ctx context.Context, config config.QueueConfig, logger *zap.Logger) (Storage, error)
CreateStorage creates a Redis storage instance
func (*RedisStorageFactory) SupportedProvider ¶ added in v0.9.0
func (f *RedisStorageFactory) SupportedProvider() string
SupportedProvider returns the provider name
func (*RedisStorageFactory) ValidateConfig ¶ added in v0.9.0
func (f *RedisStorageFactory) ValidateConfig(config config.QueueConfig) error
ValidateConfig validates the configuration for Redis storage
type ResponseSender ¶
type ResponseSender interface {
// SendSuccess sends a JSON-RPC success response
SendSuccess(c *gin.Context, id any, result any)
// SendError sends a JSON-RPC error response
SendError(c *gin.Context, id any, code int, message string)
}
ResponseSender defines how to send JSON-RPC responses
type Storage ¶ added in v0.8.0
type Storage interface {
// Task Queue Management (primary storage for active tasks)
EnqueueTask(task *types.Task, requestID any) error
DequeueTask(ctx context.Context) (*QueuedTask, error)
GetQueueLength() int
ClearQueue() error
// Active Task Queries (for tasks currently in queue or being processed)
GetActiveTask(taskID string) (*types.Task, error)
CreateActiveTask(task *types.Task) error
UpdateActiveTask(task *types.Task) error
// Dead Letter Queue (completed/failed tasks with full history for audit)
StoreDeadLetterTask(task *types.Task) error
GetTask(taskID string) (*types.Task, bool)
GetTaskByContextAndID(contextID, taskID string) (*types.Task, bool)
DeleteTask(taskID string) error
ListTasks(filter TaskFilter) ([]*types.Task, error)
ListTasksByContext(contextID string, filter TaskFilter) ([]*types.Task, error)
// Context Management (contexts are implicit from tasks)
GetContexts() []string
GetContextsWithTasks() []string
DeleteContext(contextID string) error
DeleteContextAndTasks(contextID string) error
// Cleanup Operations
CleanupCompletedTasks() int
CleanupTasksWithRetention(maxCompleted, maxFailed int) int
// Health and Statistics
GetStats() StorageStats
}
Storage defines the interface for queue-centric task management Tasks carry their complete message history and flow through: Queue -> Processing -> Dead Letter
func CreateStorage ¶ added in v0.9.0
func CreateStorage(ctx context.Context, config config.QueueConfig, logger *zap.Logger) (Storage, error)
CreateStorage creates a storage instance using the registered factories
type StorageFactory ¶ added in v0.9.0
type StorageFactory interface {
// CreateStorage creates a storage instance with the given configuration
CreateStorage(ctx context.Context, config config.QueueConfig, logger *zap.Logger) (Storage, error)
// SupportedProvider returns the provider name this factory supports
SupportedProvider() string
// ValidateConfig validates the configuration for this provider
ValidateConfig(config config.QueueConfig) error
}
StorageFactory defines the interface for creating storage instances
func GetStorageProvider ¶ added in v0.9.0
func GetStorageProvider(provider string) (StorageFactory, error)
GetStorageProvider retrieves a storage provider factory
type StorageFactoryRegistry ¶ added in v0.9.0
type StorageFactoryRegistry struct {
// contains filtered or unexported fields
}
StorageFactoryRegistry manages registered storage providers
func (*StorageFactoryRegistry) CreateStorage ¶ added in v0.9.0
func (r *StorageFactoryRegistry) CreateStorage(ctx context.Context, config config.QueueConfig, logger *zap.Logger) (Storage, error)
CreateStorage creates a storage instance using the appropriate factory
func (*StorageFactoryRegistry) GetFactory ¶ added in v0.9.0
func (r *StorageFactoryRegistry) GetFactory(provider string) (StorageFactory, error)
GetFactory retrieves a factory for a provider
func (*StorageFactoryRegistry) GetProviders ¶ added in v0.9.0
func (r *StorageFactoryRegistry) GetProviders() []string
GetProviders returns a list of all registered provider names
func (*StorageFactoryRegistry) Register ¶ added in v0.9.0
func (r *StorageFactoryRegistry) Register(provider string, factory StorageFactory)
Register registers a factory for a provider
type StorageStats ¶ added in v0.8.0
type StorageStats struct {
TotalTasks int `json:"total_tasks"`
TasksByState map[string]int `json:"tasks_by_state"`
TotalContexts int `json:"total_contexts"`
ContextsWithTasks int `json:"contexts_with_tasks"`
AverageTasksPerContext float64 `json:"average_tasks_per_context"`
TotalMessages int `json:"total_messages"`
AverageMessagesPerContext float64 `json:"average_messages_per_context"`
}
StorageStats provides statistics about the storage
type StreamableTaskHandler ¶ added in v0.9.4
type StreamableTaskHandler interface {
// HandleStreamingTask processes a task and returns a channel of CloudEvents
// The channel should be closed when streaming is complete
// Event flow: agent → handler → protocol handler → client
HandleStreamingTask(ctx context.Context, task *types.Task, message *types.Message) (<-chan cloudevents.Event, error)
// SetAgent sets the OpenAI-compatible agent for the task handler
SetAgent(agent OpenAICompatibleAgent)
// GetAgent returns the configured OpenAI-compatible agent
GetAgent() OpenAICompatibleAgent
}
StreamableTaskHandler defines how to handle streaming task processing This interface should be implemented by streaming task handlers that need to return real-time data
type StreamingNotImplementedError ¶
type StreamingNotImplementedError struct{}
StreamingNotImplementedError represents an error for unimplemented streaming
func (*StreamingNotImplementedError) Error ¶
func (e *StreamingNotImplementedError) Error() string
type TaskFilter ¶ added in v0.8.0
type TaskFilter struct {
State *types.TaskState
ContextID *string
Limit int
Offset int
SortBy TaskSortField
SortOrder SortOrder
}
TaskFilter defines filtering criteria for listing tasks
type TaskHandler ¶
type TaskHandler interface {
// HandleTask processes a task and returns the updated task
// This is where the main business logic should be implemented
HandleTask(ctx context.Context, task *types.Task, message *types.Message) (*types.Task, error)
// SetAgent sets the OpenAI-compatible agent for the task handler
SetAgent(agent OpenAICompatibleAgent)
// GetAgent returns the configured OpenAI-compatible agent
GetAgent() OpenAICompatibleAgent
}
TaskHandler defines how to handle task processing This interface should be implemented by domain-specific task handlers
type TaskManager ¶
type TaskManager interface {
// CreateTask creates a new task and stores it
CreateTask(contextID string, state types.TaskState, message *types.Message) *types.Task
// CreateTaskWithHistory creates a new task with existing conversation history
CreateTaskWithHistory(contextID string, state types.TaskState, message *types.Message, history []types.Message) *types.Task
// UpdateState updates a task's state
UpdateState(taskID string, state types.TaskState) error
// UpdateTask updates a complete task (including history, state, and message)
UpdateTask(task *types.Task) error
// UpdateError updates a task to failed state with an error message
UpdateError(taskID string, message *types.Message) error
// GetTask retrieves a task by ID
GetTask(taskID string) (*types.Task, bool)
// ListTasks retrieves a list of tasks based on the provided parameters
ListTasks(params types.TaskListParams) (*types.TaskList, error)
// CancelTask cancels a task
CancelTask(taskID string) error
// CleanupCompletedTasks removes old completed tasks from memory
CleanupCompletedTasks()
// PollTaskStatus periodically checks the status of a task until it is completed or failed
PollTaskStatus(taskID string, interval time.Duration, timeout time.Duration) (*types.Task, error)
// GetConversationHistory retrieves conversation history for a context ID
GetConversationHistory(contextID string) []types.Message
// UpdateConversationHistory updates conversation history for a context ID
UpdateConversationHistory(contextID string, messages []types.Message)
// SetTaskPushNotificationConfig sets push notification configuration for a task
SetTaskPushNotificationConfig(config types.TaskPushNotificationConfig) (*types.TaskPushNotificationConfig, error)
// GetTaskPushNotificationConfig gets push notification configuration for a task
GetTaskPushNotificationConfig(params types.GetTaskPushNotificationConfigParams) (*types.TaskPushNotificationConfig, error)
// ListTaskPushNotificationConfigs lists all push notification configurations for a task
ListTaskPushNotificationConfigs(params types.ListTaskPushNotificationConfigParams) ([]types.TaskPushNotificationConfig, error)
// DeleteTaskPushNotificationConfig deletes a push notification configuration
DeleteTaskPushNotificationConfig(params types.DeleteTaskPushNotificationConfigParams) error
// PauseTaskForInput pauses a task waiting for additional input from the client
PauseTaskForInput(taskID string, message *types.Message) error
// ResumeTaskWithInput resumes a paused task with new input from the client
ResumeTaskWithInput(taskID string, message *types.Message) error
// IsTaskPaused checks if a task is currently paused (in input-required state)
IsTaskPaused(taskID string) (bool, error)
// SetRetentionConfig sets the task retention configuration and starts automatic cleanup
SetRetentionConfig(retentionConfig config.TaskRetentionConfig)
// StopCleanup stops the automatic cleanup process
StopCleanup()
}
TaskManager defines task lifecycle management
type TaskNotCancelableError ¶ added in v0.8.0
TaskNotCancelableError represents an error when a task cannot be canceled due to its current state
func (*TaskNotCancelableError) Error ¶ added in v0.8.0
func (e *TaskNotCancelableError) Error() string
type TaskNotFoundError ¶
type TaskNotFoundError struct {
TaskID string
}
TaskNotFoundError represents an error when a task is not found
func (*TaskNotFoundError) Error ¶
func (e *TaskNotFoundError) Error() string
type TaskResultProcessor ¶
type TaskResultProcessor interface {
// ProcessToolResult processes a tool call result and returns a completion message if the task should be completed
// Returns nil if the task should continue processing
ProcessToolResult(toolCallResult string) *types.Message
}
TaskResultProcessor defines how to process tool call results for task completion
type TaskSortField ¶ added in v0.8.0
type TaskSortField string
TaskSortField defines the fields that can be used for sorting tasks
const ( TaskSortFieldCreatedAt TaskSortField = "created_at" TaskSortFieldUpdatedAt TaskSortField = "updated_at" TaskSortFieldState TaskSortField = "state" TaskSortFieldContextID TaskSortField = "context_id" )
type TaskUpdateNotification ¶
type TaskUpdateNotification struct {
Type string `json:"type"`
TaskID string `json:"taskId"`
State string `json:"state"`
Timestamp string `json:"timestamp"`
Task *types.Task `json:"task,omitempty"`
}
TaskUpdateNotification represents the payload sent to webhook URLs
type Tool ¶
type Tool interface {
// GetName returns the name of the tool
GetName() string
// GetDescription returns a description of what the tool does
GetDescription() string
// GetParameters returns the JSON schema for the tool parameters
GetParameters() map[string]any
// Execute runs the tool with the provided arguments
Execute(ctx context.Context, arguments map[string]any) (string, error)
}
Tool represents a single tool that can be executed
type ToolBox ¶
type ToolBox interface {
// GetTools returns all available tools in OpenAI function call format
GetTools() []sdk.ChatCompletionTool
// ExecuteTool executes a tool by name with the provided arguments
// Returns the tool result as a string and any error that occurred
ExecuteTool(ctx context.Context, toolName string, arguments map[string]any) (string, error)
// GetToolNames returns a list of all available tool names
GetToolNames() []string
// HasTool checks if a tool with the given name exists
HasTool(toolName string) bool
// GetTool retrieves a tool by name, returning the tool and a boolean indicating if it was found
GetTool(toolName string) (Tool, bool)
}
ToolBox defines the interface for a collection of tools that can be used by OpenAI-compatible agents
type ToolContext ¶ added in v0.14.0
type ToolContext struct {
// AgentName is the name of the agent executing the tool
AgentName string
// InvocationID uniquely identifies this execution invocation
InvocationID string
// TaskID is the ID of the current task being processed
TaskID string
// ContextID is the conversation context ID
ContextID string
// State provides access to session state that can be read and modified
State map[string]any
// Logger provides access to the logger for callback implementations
Logger *zap.Logger
}
ToolContext provides context information to tool-related callback functions
type ToolNotFoundError ¶
type ToolNotFoundError struct {
ToolName string
}
ToolNotFoundError represents an error when a requested tool is not found
func (*ToolNotFoundError) Error ¶
func (e *ToolNotFoundError) Error() string
type UsageTracker ¶ added in v0.17.0
type UsageTracker struct {
// contains filtered or unexported fields
}
UsageTracker tracks token usage and execution statistics during agent execution
func NewUsageTracker ¶ added in v0.17.0
func NewUsageTracker() *UsageTracker
NewUsageTracker creates a new usage tracker
func (*UsageTracker) AddMessages ¶ added in v0.17.0
func (ut *UsageTracker) AddMessages(count int)
AddMessages adds to the message count
func (*UsageTracker) AddTokenUsage ¶ added in v0.17.0
func (ut *UsageTracker) AddTokenUsage(usage sdk.CompletionUsage)
AddTokenUsage adds token usage from an LLM response
func (*UsageTracker) GetMetadata ¶ added in v0.17.0
func (ut *UsageTracker) GetMetadata() map[string]any
GetMetadata returns the collected metrics as a metadata map
func (*UsageTracker) HasUsage ¶ added in v0.17.0
func (ut *UsageTracker) HasUsage() bool
HasUsage returns true if any metrics have been collected
func (*UsageTracker) IncrementFailedTools ¶ added in v0.17.0
func (ut *UsageTracker) IncrementFailedTools()
IncrementFailedTools increments the failed tool counter
func (*UsageTracker) IncrementIteration ¶ added in v0.17.0
func (ut *UsageTracker) IncrementIteration()
IncrementIteration increments the iteration counter
func (*UsageTracker) IncrementToolCalls ¶ added in v0.17.0
func (ut *UsageTracker) IncrementToolCalls()
IncrementToolCalls increments the tool call counter
Source Files
¶
- agent.go
- agent_builder.go
- agent_llm_client.go
- agent_streamable.go
- agent_toolbox.go
- artifacts_server.go
- artifacts_server_builder.go
- artifacts_service.go
- artifacts_storage.go
- artifacts_storage_filesystem.go
- artifacts_storage_minio.go
- callbacks.go
- errors.go
- metadata.go
- push_notification_sender.go
- response_sender.go
- server.go
- server_builder.go
- storage.go
- storage_factory.go
- storage_redis.go
- task_handler.go
- task_manager.go
- test_helpers.go
- usage_tracker.go
- utils.go