Overview ¶
Package agentictools provides the tool execution processor for the SemStreams agentic system. The processor is a component that routes tool calls to registered tool executors, with allowlist filtering and timeout support.
The agentic-tools processor executes tool calls from the agentic loop orchestrator. It receives ToolCall messages, dispatches them to registered tool executors, and publishes ToolResult messages back. The processor supports tool registration, allowlist filtering, per-execution timeouts, and concurrent execution.
This processor enables agents to interact with external systems (files, APIs, databases, etc.) through a well-defined tool interface.
Architecture ¶
The tools processor sits between the loop orchestrator and tool implementations:
┌───────────────┐     ┌────────────────┐     ┌──────────────────┐
│ agentic-loop  │────▶│ agentic-tools  │────▶│  Tool Executors  │
│               │     │   (this pkg)   │     │   (your code)    │
│               │◀────│                │◀────│                  │
└───────────────┘     └────────────────┘     └──────────────────┘
  tool.execute.*           Execute()          read_file, query_db,
  tool.result.*                               call_api, etc.
ToolExecutor Interface ¶
Tools are implemented by satisfying the ToolExecutor interface:
type ToolExecutor interface {
	Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
	ListTools() []agentic.ToolDefinition
}
Example implementation:
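type FileReader struct{}

func (f *FileReader) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
	// Reject a missing or non-string path instead of silently reading ""
	path, ok := call.Arguments["path"].(string)
	if !ok || path == "" {
		return agentic.ToolResult{
			CallID: call.ID,
			Error:  "path argument must be a non-empty string",
		}, nil
	}

	// Respect context cancellation
	select {
	case <-ctx.Done():
		return agentic.ToolResult{CallID: call.ID, Error: "cancelled"}, ctx.Err()
	default:
	}

	content, err := os.ReadFile(path)
	if err != nil {
		// Report the failure to the agent in the result, not as a Go error
		return agentic.ToolResult{
			CallID: call.ID,
			Error:  err.Error(),
		}, nil
	}

	return agentic.ToolResult{
		CallID:  call.ID,
		Content: string(content),
	}, nil
}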
func (f *FileReader) ListTools() []agentic.ToolDefinition {
	return []agentic.ToolDefinition{
		{
			Name:        "read_file",
			Description: "Read the contents of a file",
			Parameters: map[string]any{
				"type": "object",
				"properties": map[string]any{
					"path": map[string]any{
						"type":        "string",
						"description": "Path to the file",
					},
				},
				"required": []string{"path"},
			},
		},
	}
}
Tool Registration ¶
Tools can be registered in two ways:
1. Global registration via init() (preferred for reusable tools):
func init() {
	if err := agentictools.RegisterTool("my_tool", &MyToolExecutor{}); err != nil {
		panic(err)
	}
}
Global registration makes tools available to all agentic-tools components automatically. This pattern matches how components and rules are registered.
2. Per-component registration (for component-specific tools):
comp, err := agentictools.NewComponent(rawConfig, deps)
if err != nil {
	return err
}
toolsComp := comp.(*agentictools.Component)
err = toolsComp.RegisterToolExecutor(&FileReader{})
The processor extracts tool names from ListTools() for routing and validation.
ExecutorRegistry ¶
The ExecutorRegistry provides thread-safe tool management:
registry := NewExecutorRegistry()
// Register tools
registry.RegisterTool("read_file", &FileReader{})
registry.RegisterTool("query_db", &DatabaseQuerier{})
// Get executor by name
executor := registry.GetTool("read_file")
// List all available tools
tools := registry.ListTools()
// Execute a tool call
result, err := registry.Execute(ctx, toolCall)
The registry prevents duplicate registrations and returns descriptive errors for missing tools.
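The behavior described above (thread-safe access, duplicate rejection, descriptive errors for missing tools) can be sketched roughly as follows. This is an illustrative stand-in, not the package's implementation: the `executor` function type and the `registry` struct are hypothetical simplifications, and the sketch reports a missing tool as a plain Go error for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// executor is a hypothetical stand-in for ToolExecutor, reduced to a function.
type executor func(args map[string]any) (string, error)

// registry is an illustrative sketch, not the real ExecutorRegistry.
type registry struct {
	mu    sync.RWMutex
	tools map[string]executor
}

func newRegistry() *registry {
	return &registry{tools: make(map[string]executor)}
}

// register rejects a second registration under the same name.
func (r *registry) register(name string, e executor) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.tools[name]; exists {
		return fmt.Errorf("tool %q already registered", name)
	}
	r.tools[name] = e
	return nil
}

// get returns nil when the tool is not registered.
func (r *registry) get(name string) executor {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.tools[name]
}

// execute looks up the tool and reports a descriptive error if it is missing.
func (r *registry) execute(name string, args map[string]any) (string, error) {
	e := r.get(name)
	if e == nil {
		return "", fmt.Errorf("tool %q not found", name)
	}
	return e(args)
}

func main() {
	r := newRegistry()
	echo := func(args map[string]any) (string, error) {
		return fmt.Sprint(args["msg"]), nil
	}
	fmt.Println(r.register("echo", echo)) // first registration succeeds
	fmt.Println(r.register("echo", echo)) // second one is rejected

	out, err := r.execute("echo", map[string]any{"msg": "hi"})
	fmt.Println(out, err)

	_, err = r.execute("missing", nil)
	fmt.Println(err)
}
```

Readers take the read lock and writers take the write lock, so lookups from many goroutines do not block each other while registration stays exclusive.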
Tool Allowlist ¶
The processor supports allowlist filtering for security and control:
config := agentictools.Config{
AllowedTools: []string{"read_file", "list_dir"}, // Only these allowed
// ...
}
Behavior:
- Empty/nil AllowedTools: All registered tools are allowed
- Populated AllowedTools: Only listed tools can execute
- Blocked tools return an error result (not a Go error)
Example blocked response:
result := agentic.ToolResult{
CallID: "call_001",
Error: "tool 'delete_file' is not allowed",
}
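The allowlist rule can be illustrated with a small predicate. This is a minimal sketch of the documented behavior, not the processor's code; `isAllowed` is a hypothetical helper name.

```go
package main

import "fmt"

// isAllowed reports whether a tool may run under the given allowlist.
// An empty or nil allowlist permits every registered tool.
func isAllowed(allowed []string, tool string) bool {
	if len(allowed) == 0 {
		return true
	}
	for _, name := range allowed {
		if name == tool {
			return true
		}
	}
	return false
}

func main() {
	allow := []string{"read_file", "list_dir"}
	fmt.Println(isAllowed(allow, "read_file"))   // true
	fmt.Println(isAllowed(allow, "delete_file")) // false
	fmt.Println(isAllowed(nil, "delete_file"))   // true
}
```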
Timeout Handling ¶
Each tool execution runs with a configurable timeout:
config := agentictools.Config{
	Timeout: "60s", // Applied to each tool execution
// ...
}
The timeout is enforced via context cancellation. Tool implementations should respect ctx.Done() for proper cancellation:
func (t *SlowTool) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
	select {
	case <-ctx.Done():
		return agentic.ToolResult{CallID: call.ID, Error: "execution cancelled"}, ctx.Err()
	case result := <-t.doWork(call):
		return result, nil
	}
}
Timeout errors are returned as ToolResult.Error, not as Go errors.
Quick Start ¶
Configure and start the processor:
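config := agentictools.Config{
	StreamName:   "AGENT",
	AllowedTools: nil, // Allow all
	Timeout:      "60s",
}
rawConfig, err := json.Marshal(config)
if err != nil {
	return err
}
comp, err := agentictools.NewComponent(rawConfig, deps)
if err != nil {
	return err
}
// Register tools before starting
toolsComp := comp.(*agentictools.Component)
if err := toolsComp.RegisterToolExecutor(&FileReader{}); err != nil {
	return err
}
if err := toolsComp.RegisterToolExecutor(&WebFetcher{}); err != nil {
	return err
}
// Start
lc := comp.(component.LifecycleComponent)
if err := lc.Initialize(); err != nil {
	return err
}
if err := lc.Start(ctx); err != nil {
	return err
}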
Configuration Reference ¶
Full configuration schema:
{
  "allowed_tools": ["string", ...],
  "timeout": "string (default: 60s)",
  "stream_name": "string (default: AGENT)",
  "consumer_name_suffix": "string (optional)",
  "ports": {
    "inputs": [...],
    "outputs": [...]
  }
}
Configuration fields:
- allowed_tools: List of tool names to allow (nil/empty = allow all)
- timeout: Timeout applied to each tool execution, as a duration string (default: "60s")
- stream_name: JetStream stream name for agentic messages (default: "AGENT")
- consumer_name_suffix: Optional suffix for JetStream consumer names (for testing)
- ports: Port configuration for inputs and outputs
Ports ¶
Input ports (JetStream consumers):
- tool.execute: Tool execution requests from agentic-loop (subject: tool.execute.>)
Output ports (JetStream publishers):
- tool.result: Tool execution results to agentic-loop (subject: tool.result.*)
Message Flow ¶
The processor handles each tool call through:
- Receive ToolCall from tool.execute.>
- Validate tool is in allowlist (if configured)
- Look up executor in registry
- Create timeout context
- Execute tool with context
- Publish ToolResult to tool.result.{call_id}
- Acknowledge JetStream message
Error Handling ¶
Errors are categorized into two types:
**Tool execution errors** (returned in ToolResult.Error):
- Tool not found in registry
- Tool not in allowlist
- Tool execution failed
- Timeout exceeded
**System errors** (returned as Go error):
- JSON marshaling failures
- NATS publishing failures
Tool execution errors don't fail the loop - the agent can handle them:
if result.Error != "" {
// Agent sees: "Error: file not found"
// Agent can try alternative approach
}
Concurrent Execution ¶
The processor handles concurrent tool calls safely:
- ExecutorRegistry uses RWMutex for thread-safe access
- Each tool call gets its own goroutine
- Context cancellation propagates to all active executions
Multiple tools can execute in parallel when the loop sends concurrent calls:
// agentic-loop sends two tool calls
tool.execute.read_file → executes
tool.execute.query_db  → executes concurrently
Built-in Tools ¶
The package does not include built-in tools - all tools must be registered by the application. This keeps the processor focused and allows full control over available capabilities.
Common tools to implement:
- File operations: read_file, write_file, list_dir
- Web operations: fetch_url, call_api
- Database operations: query, insert, update
- Graph operations: graph_query (query knowledge graph)
Thread Safety ¶
The Component is safe for concurrent use after Start() is called:
- ExecutorRegistry uses RWMutex for all operations
- Tool registration should complete before Start()
- Multiple tool calls can execute concurrently
Testing ¶
For testing, use mock executors and unique consumer names:
type MockExecutor struct {
	result agentic.ToolResult
}

func (m *MockExecutor) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
	m.result.CallID = call.ID
	return m.result, nil
}

func (m *MockExecutor) ListTools() []agentic.ToolDefinition {
	return []agentic.ToolDefinition{{Name: "mock_tool"}}
}

// In test
config := agentictools.Config{
	StreamName:         "AGENT",
	ConsumerNameSuffix: "test-" + t.Name(),
}
Limitations ¶
Current limitations:
- No tool versioning (single version per name)
- No tool dependencies or ordering
- No streaming tool output
- No built-in rate limiting per tool
- One timeout value applies to all tools; it cannot be configured per tool
See Also ¶
Related packages:
- agentic: Shared types (ToolCall, ToolResult, ToolDefinition)
- processor/agentic-loop: Loop orchestration
- processor/agentic-model: LLM endpoint integration
JetStream Integration ¶
All messaging uses JetStream for durability. Tool call subjects require the AGENT stream to exist with subjects matching tool.execute.> and tool.result.>.
Consumer naming follows the pattern: agentic-tools-{subject-pattern}
The package also provides Prometheus metrics for the agentic-tools component.
Index ¶
- func ConsumerNameForTool(toolName string) string
- func ListRegisteredTools() []agentic.ToolDefinition
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry RegistryInterface) error
- func RegisterTool(name string, executor ToolExecutor) error
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) ListTools() []ToolDefinition
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) RegisterToolExecutor(executor ToolExecutor) error
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(timeout time.Duration) error
- type Config
- type ExecutorRegistry
- func (r *ExecutorRegistry) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
- func (r *ExecutorRegistry) GetTool(name string) ToolExecutor
- func (r *ExecutorRegistry) ListTools() []agentic.ToolDefinition
- func (r *ExecutorRegistry) RegisterTool(name string, executor ToolExecutor) error
- type InMemoryToolCallStore
- type KVToolCallStore
- type RecordingExecutor
- type RegistryInterface
- type ToolCallRecord
- type ToolCallStore
- type ToolDefinition
- type ToolExecutor
- type ToolListResponse
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConsumerNameForTool ¶
func ConsumerNameForTool(toolName string) string
ConsumerNameForTool generates a JetStream consumer name for a tool. It replaces dots and underscores with dashes and adds the "tool-exec-" prefix.
Examples:
file_read   → tool-exec-file-read
graph.query → tool-exec-graph-query
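The naming rule can be reproduced in a few lines. This is an illustrative re-implementation of the documented behavior, not the package's source; `consumerName` is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// consumerName applies the documented rule: dots and underscores become
// dashes, and "tool-exec-" is prepended.
func consumerName(toolName string) string {
	sanitized := strings.NewReplacer(".", "-", "_", "-").Replace(toolName)
	return "tool-exec-" + sanitized
}

func main() {
	fmt.Println(consumerName("file_read"))   // tool-exec-file-read
	fmt.Println(consumerName("graph.query")) // tool-exec-graph-query
}
```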
func ListRegisteredTools ¶
func ListRegisteredTools() []agentic.ToolDefinition
ListRegisteredTools returns all globally registered tool definitions.
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new agentic-tools processor component
func Register ¶
func Register(registry RegistryInterface) error
Register registers the agentic-tools processor component with the given registry
func RegisterTool ¶
func RegisterTool(name string, executor ToolExecutor) error
RegisterTool registers a tool executor globally via init(). Thread-safe and can be called from any package's init() function.
Example usage:
func init() {
	if err := agentictools.RegisterTool("my_tool", &MyToolExecutor{}); err != nil {
		panic(err)
	}
}
Types ¶
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component implements the agentic-tools processor
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns the current health status
func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize prepares the component (no-op for this component)
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns configured input port definitions
func (*Component) ListTools ¶
func (c *Component) ListTools() []ToolDefinition
ListTools returns all tool definitions for discovery. Combines tools from both the component's local registry and the global registry.
func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns configured output port definitions
func (*Component) RegisterToolExecutor ¶
func (c *Component) RegisterToolExecutor(executor ToolExecutor) error
RegisterToolExecutor registers a tool executor with the component. It extracts all tools from the executor and registers them individually.
type Config ¶
type Config struct {
Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
StreamName string `` /* 132-byte string literal not displayed */
ConsumerNameSuffix string `` /* 127-byte string literal not displayed */
DeleteConsumerOnStop bool `` /* 157-byte string literal not displayed */
Timeout string `json:"timeout" schema:"type:string,description:Tool execution timeout,category:advanced,default:60s"`
AllowedTools []string `json:"allowed_tools" schema:"type:array,description:List of allowed tools (nil/empty allows all),category:advanced"`
}
Config holds configuration for agentic-tools processor component
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns default configuration for agentic-tools processor
type ExecutorRegistry ¶
type ExecutorRegistry struct {
// contains filtered or unexported fields
}
ExecutorRegistry manages tool executors and provides thread-safe registration and execution
func GetGlobalRegistry ¶
func GetGlobalRegistry() *ExecutorRegistry
GetGlobalRegistry returns the global tool registry. Used by agentic-tools component to access registered tools.
func NewExecutorRegistry ¶
func NewExecutorRegistry() *ExecutorRegistry
NewExecutorRegistry creates a new empty executor registry
func (*ExecutorRegistry) Execute ¶
func (r *ExecutorRegistry) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
Execute executes a tool call using the registered executor. It returns an error result if the tool is not found or execution fails.
func (*ExecutorRegistry) GetTool ¶
func (r *ExecutorRegistry) GetTool(name string) ToolExecutor
GetTool retrieves a tool executor by name. It returns nil if the tool is not registered.
func (*ExecutorRegistry) ListTools ¶
func (r *ExecutorRegistry) ListTools() []agentic.ToolDefinition
ListTools returns all registered tool definitions. It returns an empty slice (not nil) when no tools are registered.
func (*ExecutorRegistry) RegisterTool ¶
func (r *ExecutorRegistry) RegisterTool(name string, executor ToolExecutor) error
RegisterTool registers a tool executor with the given name. It returns an error if a tool with the same name is already registered.
type InMemoryToolCallStore ¶
type InMemoryToolCallStore struct {
// contains filtered or unexported fields
}
InMemoryToolCallStore provides an in-memory implementation of ToolCallStore for testing purposes.
func NewInMemoryToolCallStore ¶
func NewInMemoryToolCallStore() *InMemoryToolCallStore
NewInMemoryToolCallStore creates a new in-memory store for testing.
func (*InMemoryToolCallStore) Close ¶
func (s *InMemoryToolCallStore) Close() error
Close marks the store as closed.
func (*InMemoryToolCallStore) Records ¶
func (s *InMemoryToolCallStore) Records() []ToolCallRecord
Records returns all stored records. For testing only.
func (*InMemoryToolCallStore) Reset ¶
func (s *InMemoryToolCallStore) Reset()
Reset clears all records and reopens the store. For testing only.
func (*InMemoryToolCallStore) Store ¶
func (s *InMemoryToolCallStore) Store(_ context.Context, record ToolCallRecord) error
Store adds a record to the in-memory store.
type KVToolCallStore ¶
type KVToolCallStore struct {
// contains filtered or unexported fields
}
KVToolCallStore implements ToolCallStore using NATS KV. It supports lazy initialization with retry logic and can be reset to allow re-initialization after failures.
func NewKVToolCallStore ¶
func NewKVToolCallStore(client *natsclient.Client, bucketName string) *KVToolCallStore
NewKVToolCallStore creates a new KV-backed tool call store. Initialization is deferred until the first Store call.
func (*KVToolCallStore) Close ¶
func (s *KVToolCallStore) Close() error
Close marks the store as closed. The underlying KV connection is managed by the natsclient and should not be closed here.
func (*KVToolCallStore) Reset ¶
func (s *KVToolCallStore) Reset()
Reset allows re-initialization after a failure. This is useful for testing or recovery scenarios.
func (*KVToolCallStore) Store ¶
func (s *KVToolCallStore) Store(ctx context.Context, record ToolCallRecord) error
Store persists a tool call record to the KV store. The key format is "{call_id}.{timestamp_ns}" to allow multiple records per call ID (e.g., retries).
type RecordingExecutor ¶
type RecordingExecutor struct {
// contains filtered or unexported fields
}
RecordingExecutor wraps a ToolExecutor and records all calls to a store. It uses a buffered channel and background goroutine for non-blocking recording.
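The buffered-channel pattern mentioned above might look like the sketch below. It is not the RecordingExecutor source: the `record` and `recorder` types are hypothetical, and dropping a record when the buffer is full is an assumed policy chosen so the caller never blocks.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical, trimmed-down ToolCallRecord.
type record struct{ CallID, Result string }

// recorder queues records on a buffered channel; a background goroutine
// drains the channel into the store function.
type recorder struct {
	ch   chan record
	done sync.WaitGroup
}

func newRecorder(buffer int, store func(record)) *recorder {
	r := &recorder{ch: make(chan record, buffer)}
	r.done.Add(1)
	go func() {
		defer r.done.Done()
		for rec := range r.ch {
			store(rec)
		}
	}()
	return r
}

// enqueue never blocks the caller: if the buffer is full the record is
// dropped and false is returned (an assumed policy for this sketch).
func (r *recorder) enqueue(rec record) bool {
	select {
	case r.ch <- rec:
		return true
	default:
		return false
	}
}

// shutdown stops accepting records and waits for the queue to drain.
func (r *recorder) shutdown() {
	close(r.ch)
	r.done.Wait()
}

func main() {
	var stored []record
	r := newRecorder(8, func(rec record) { stored = append(stored, rec) })
	r.enqueue(record{CallID: "call_001", Result: "ok"})
	r.enqueue(record{CallID: "call_002", Result: "ok"})
	r.shutdown()
	fmt.Println(len(stored)) // 2
}
```

The tool call returns as soon as the record is queued, so a slow store adds no latency to execution; the trade-off in this sketch is that records can be lost under sustained backpressure.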
func NewRecordingExecutor ¶
func NewRecordingExecutor(wrapped ToolExecutor, store ToolCallStore, logger *slog.Logger) *RecordingExecutor
NewRecordingExecutor creates a new recording executor that wraps the given executor and records all calls to the provided store.
func (*RecordingExecutor) Execute ¶
func (r *RecordingExecutor) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
Execute executes the wrapped tool and records the call
func (*RecordingExecutor) ListTools ¶
func (r *RecordingExecutor) ListTools() []agentic.ToolDefinition
ListTools returns tools from the wrapped executor
type RegistryInterface ¶
type RegistryInterface interface {
RegisterWithConfig(component.RegistrationConfig) error
}
RegistryInterface defines the minimal interface needed for registration
type ToolCallRecord ¶
type ToolCallRecord struct {
Call agentic.ToolCall `json:"call"`
Result agentic.ToolResult `json:"result"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Duration time.Duration `json:"duration"`
}
ToolCallRecord captures a tool call and its result for auditing
type ToolCallStore ¶
type ToolCallStore interface {
Store(ctx context.Context, record ToolCallRecord) error
Close() error
}
ToolCallStore interface for persisting tool call records
type ToolDefinition ¶
type ToolDefinition struct {
Name string `json:"name"`
Description string `json:"description"`
Provider string `json:"provider"`
Available bool `json:"available"`
}
ToolDefinition represents a tool definition for discovery responses
type ToolExecutor ¶
type ToolExecutor interface {
Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
ListTools() []agentic.ToolDefinition
}
ToolExecutor defines the interface for tool executors
type ToolListResponse ¶
type ToolListResponse struct {
Tools []ToolDefinition `json:"tools"`
}
ToolListResponse represents the response to a tool.list request