agentictools

package
v1.0.0-alpha.12
Published: Mar 6, 2026 License: MIT Imports: 17 Imported by: 0

README

agentic-tools

Tool execution component for the agentic processing system.

Overview

The agentic-tools component executes tool calls from the agentic loop orchestrator. It receives ToolCall messages, dispatches them to registered tool executors, and publishes ToolResult messages back. It supports tool registration, allowlist filtering, per-execution timeouts, and concurrent execution.

Architecture

┌───────────────┐     ┌────────────────┐     ┌──────────────────┐
│ agentic-loop  │────►│ agentic-tools  │────►│ Tool Executors   │
│               │     │                │     │ (your code)      │
│               │◄────│                │◄────│                  │
└───────────────┘     └────────────────┘     └──────────────────┘
  tool.execute.*        Execute()           read_file, query_db,
  tool.result.*                             call_api, etc.

Features

  • Tool Registration: Register custom tool executors at runtime
  • Allowlist Filtering: Restrict which tools can execute
  • Timeout Handling: Per-execution timeout with context cancellation
  • Concurrent Execution: Multiple tools can run in parallel

Configuration

{
  "type": "processor",
  "name": "agentic-tools",
  "enabled": true,
  "config": {
    "stream_name": "AGENT",
    "timeout": "60s",
    "allowed_tools": null,
    "ports": {
      "inputs": [
        {"name": "tool_calls", "type": "jetstream", "subject": "tool.execute.>", "stream_name": "AGENT"}
      ],
      "outputs": [
        {"name": "tool_results", "type": "jetstream", "subject": "tool.result.*", "stream_name": "AGENT"}
      ]
    }
  }
}
Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| allowed_tools | []string | null | Tool allowlist (null/empty = allow all) |
| timeout | string | "60s" | Timeout applied to each tool execution (one value for all tools) |
| stream_name | string | "AGENT" | JetStream stream name |
| consumer_name_suffix | string | "" | Suffix for consumer names (for testing) |
| ports | object | (defaults) | Port configuration |

JetStream Integration

All messaging uses JetStream for durability. Tool call subjects require the AGENT stream to exist with subjects matching tool.execute.> and tool.result.>.
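The two wildcard forms used in these subjects behave differently in NATS: `*` matches exactly one token, while `>` matches one or more trailing tokens. The sketch below is a stand-alone illustration of that rule and is not code from this package; `subjectMatches` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a dot-separated subject matches a pattern.
// "*" matches exactly one token; ">" matches one or more trailing tokens.
// Hypothetical helper for illustration only.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" is only valid as the last token and needs at least one token to consume.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	fmt.Println(subjectMatches("tool.execute.>", "tool.execute.read_file"))
	fmt.Println(subjectMatches("tool.result.*", "tool.result.call_001"))
	fmt.Println(subjectMatches("tool.result.*", "tool.result.a.b"))
}
```

Under this rule a stream bound to tool.result.> also accepts anything published to a tool.result.* subject.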

Consumer naming: agentic-tools-{subject-pattern}

Ports

Inputs

| Name | Type | Subject | Description |
|---|---|---|---|
| tool_calls | jetstream | tool.execute.> | Tool calls from agentic-loop |

Outputs

| Name | Type | Subject | Description |
|---|---|---|---|
| tool_results | jetstream | tool.result.* | Tool results to agentic-loop |

Tool Registration

Tools implement the ToolExecutor interface:

type ToolExecutor interface {
    Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
    ListTools() []agentic.ToolDefinition
}
Global Registration (Preferred)

Register tools globally via init() so they're available to all agentic-tools components:

package mytools

import agentictools "github.com/c360/semstreams/processor/agentic-tools"

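func init() {
    // RegisterTool returns an error; fail fast at startup rather than ignore it.
    if err := agentictools.RegisterTool("read_file", &FileReader{}); err != nil {
        panic(err)
    }
    if err := agentictools.RegisterTool("query_graph", &GraphQueryExecutor{}); err != nil {
        panic(err)
    }
}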

This pattern matches how components and rules are registered in SemStreams. Globally registered tools are automatically available to all agentic-tools component instances.

Per-Component Registration

For component-specific tools, register after creating the component:

comp, _ := agentictools.NewComponent(rawConfig, deps)
toolsComp := comp.(*agentictools.Component)

// Register component-specific executors
toolsComp.RegisterToolExecutor(&CustomExecutor{})

// Start component
lc := comp.(component.LifecycleComponent)
lc.Initialize()
lc.Start(ctx)

Local tools take precedence over global tools with the same name.
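A minimal sketch of that precedence rule, assuming two plain maps stand in for the component-local and global registries. The `Executor` type and `lookup` function are hypothetical and only illustrate the lookup order; the package's internals may differ.

```go
package main

import "fmt"

// Executor is a hypothetical stand-in for the package's ToolExecutor.
type Executor func(args string) string

// lookup checks the component-local registry first and falls back to the global one.
func lookup(local, global map[string]Executor, name string) (Executor, bool) {
	if ex, ok := local[name]; ok {
		return ex, true
	}
	ex, ok := global[name]
	return ex, ok
}

func main() {
	global := map[string]Executor{
		"read_file":   func(string) string { return "global read_file" },
		"query_graph": func(string) string { return "global query_graph" },
	}
	local := map[string]Executor{
		"read_file": func(string) string { return "local read_file" },
	}
	for _, name := range []string{"read_file", "query_graph"} {
		if ex, ok := lookup(local, global, name); ok {
			fmt.Println(name, "->", ex(""))
		}
	}
}
```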

Example Implementation
type FileReader struct{}

func (f *FileReader) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
    var args struct {
        Path string `json:"path"`
    }
    if err := json.Unmarshal([]byte(call.Arguments), &args); err != nil {
        return agentic.ToolResult{CallID: call.ID, Error: err.Error()}, nil
    }

    // Respect context cancellation
    select {
    case <-ctx.Done():
        return agentic.ToolResult{CallID: call.ID, Error: "cancelled"}, ctx.Err()
    default:
    }

    content, err := os.ReadFile(args.Path)
    if err != nil {
        return agentic.ToolResult{CallID: call.ID, Error: err.Error()}, nil
    }

    return agentic.ToolResult{CallID: call.ID, Content: string(content)}, nil
}

func (f *FileReader) ListTools() []agentic.ToolDefinition {
    return []agentic.ToolDefinition{{
        Name:        "read_file",
        Description: "Read the contents of a file",
        Parameters: map[string]any{
            "type": "object",
            "properties": map[string]any{
                "path": map[string]any{"type": "string", "description": "File path"},
            },
            "required": []string{"path"},
        },
    }}
}

Tool Allowlist

Control which tools can execute:

{
  "allowed_tools": ["read_file", "list_dir", "query_graph"]
}
| Config | Behavior |
|---|---|
| null or [] | All registered tools allowed |
| ["tool1", "tool2"] | Only listed tools allowed |

Blocked tools return an error result (not a Go error):

{
  "call_id": "call_001",
  "error": "tool 'delete_file' is not allowed"
}
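The allowlist rule can be sketched as follows. This is an illustration under assumptions, not the component's code: `ToolResult` is a trimmed stand-in for agentic.ToolResult, and `isAllowed` and `checkAllowlist` are hypothetical helper names.

```go
package main

import "fmt"

// ToolResult is a trimmed stand-in for agentic.ToolResult.
type ToolResult struct {
	CallID string
	Error  string
}

// isAllowed treats a nil or empty allowlist as "allow all".
func isAllowed(allowed []string, name string) bool {
	if len(allowed) == 0 {
		return true
	}
	for _, a := range allowed {
		if a == name { // exact, case-sensitive match
			return true
		}
	}
	return false
}

// checkAllowlist returns ok=false plus an error result when the tool is blocked.
func checkAllowlist(allowed []string, callID, name string) (ToolResult, bool) {
	if isAllowed(allowed, name) {
		return ToolResult{}, true
	}
	msg := fmt.Sprintf("tool '%s' is not allowed", name)
	return ToolResult{CallID: callID, Error: msg}, false
}

func main() {
	allowed := []string{"read_file", "list_dir", "query_graph"}
	res, ok := checkAllowlist(allowed, "call_001", "delete_file")
	fmt.Println(ok, res.Error)
}
```

Returning the block as a result value rather than a Go error keeps the failure visible to the agent, which matches the behavior described above.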

Message Formats

ToolCall (Input)
{
  "id": "call_001",
  "name": "read_file",
  "arguments": {
    "path": "/etc/hosts"
  }
}
ToolResult (Output)
{
  "call_id": "call_001",
  "content": "127.0.0.1 localhost\n...",
  "error": "",
  "metadata": {}
}
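To show how these payloads map onto Go values, here is a sketch with stand-in structs whose JSON tags mirror the examples above. The real types live in the agentic package and their field types may differ (in particular, treating arguments as a map is an assumption); `decodeCall` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ToolCall and ToolResult are stand-ins shaped after the JSON examples;
// they are not the definitions from the agentic package.
type ToolCall struct {
	ID        string         `json:"id"`
	Name      string         `json:"name"`
	Arguments map[string]any `json:"arguments"`
}

type ToolResult struct {
	CallID   string         `json:"call_id"`
	Content  string         `json:"content"`
	Error    string         `json:"error"`
	Metadata map[string]any `json:"metadata"`
}

// decodeCall parses one ToolCall payload.
func decodeCall(data []byte) (ToolCall, error) {
	var call ToolCall
	err := json.Unmarshal(data, &call)
	return call, err
}

func main() {
	payload := []byte(`{"id":"call_001","name":"read_file","arguments":{"path":"/etc/hosts"}}`)
	call, err := decodeCall(payload)
	if err != nil {
		panic(err)
	}
	out, err := json.Marshal(ToolResult{CallID: call.ID, Content: "127.0.0.1 localhost", Metadata: map[string]any{}})
	if err != nil {
		panic(err)
	}
	fmt.Println(call.Name, call.Arguments["path"])
	fmt.Println(string(out))
}
```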

Common Tools to Implement

| Tool | Description |
|---|---|
| read_file | Read file contents |
| write_file | Write content to file |
| list_dir | List directory contents |
| fetch_url | HTTP GET request |
| call_api | Generic HTTP request |
| query_graph | Query knowledge graph |
| run_command | Execute shell command |

Troubleshooting

Tool not found

  • Verify tool executor is registered before Start()
  • Check tool name matches exactly (case-sensitive)
  • Ensure ListTools() returns the correct name

Tool timeout

  • Increase timeout for long-running operations
  • Implement context cancellation in executor
  • Check for blocking operations

Tool blocked by allowlist

  • Add tool name to allowed_tools array
  • Set allowed_tools: null to allow all
  • Verify tool name spelling

Concurrent execution issues

  • Ensure tool executor is thread-safe
  • Don't share mutable state between calls
  • Use proper synchronization if needed

Documentation

Overview

Package agentictools provides a tool executor processor component that routes tool calls to registered tool executors with filtering and timeout support.

Package agentictools provides the tool execution processor for the SemStreams agentic system.

Overview

The agentic-tools processor executes tool calls from the agentic loop orchestrator. It receives ToolCall messages, dispatches them to registered tool executors, and publishes ToolResult messages back. The processor supports tool registration, allowlist filtering, per-execution timeouts, and concurrent execution.

This processor enables agents to interact with external systems (files, APIs, databases, etc.) through a well-defined tool interface.

Architecture

The tools processor sits between the loop orchestrator and tool implementations:

┌───────────────┐     ┌────────────────┐     ┌──────────────────┐
│ agentic-loop  │────▶│ agentic-tools  │────▶│ Tool Executors   │
│               │     │ (this pkg)     │     │ (your code)      │
│               │◀────│                │◀────│                  │
└───────────────┘     └────────────────┘     └──────────────────┘
  tool.execute.*        Execute()           read_file, query_db,
  tool.result.*                             call_api, etc.

ToolExecutor Interface

Tools are implemented by satisfying the ToolExecutor interface:

type ToolExecutor interface {
    Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
    ListTools() []agentic.ToolDefinition
}

Example implementation:

type FileReader struct{}

func (f *FileReader) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
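    // Reject a missing or non-string "path" instead of reading an empty path.
    path, ok := call.Arguments["path"].(string)
    if !ok {
        return agentic.ToolResult{CallID: call.ID, Error: "path must be a string"}, nil
    }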

    // Respect context cancellation
    select {
    case <-ctx.Done():
        return agentic.ToolResult{CallID: call.ID, Error: "cancelled"}, ctx.Err()
    default:
    }

    content, err := os.ReadFile(path)
    if err != nil {
        return agentic.ToolResult{
            CallID: call.ID,
            Error:  err.Error(),
        }, nil
    }

    return agentic.ToolResult{
        CallID:  call.ID,
        Content: string(content),
    }, nil
}

func (f *FileReader) ListTools() []agentic.ToolDefinition {
    return []agentic.ToolDefinition{
        {
            Name:        "read_file",
            Description: "Read the contents of a file",
            Parameters: map[string]any{
                "type": "object",
                "properties": map[string]any{
                    "path": map[string]any{
                        "type":        "string",
                        "description": "Path to the file",
                    },
                },
                "required": []string{"path"},
            },
        },
    }
}

Tool Registration

Tools can be registered in two ways:

1. Global registration via init() (preferred for reusable tools):

func init() {
    agentictools.RegisterTool("my_tool", &MyToolExecutor{})
}

Global registration makes tools available to all agentic-tools components automatically. This pattern matches how components and rules are registered.

2. Per-component registration (for component-specific tools):

comp, _ := agentictools.NewComponent(rawConfig, deps)
toolsComp := comp.(*agentictools.Component)
err := toolsComp.RegisterToolExecutor(&FileReader{})

The processor extracts tool names from ListTools() for routing and validation.

ExecutorRegistry

The ExecutorRegistry provides thread-safe tool management:

registry := NewExecutorRegistry()

// Register tools
registry.RegisterTool("read_file", &FileReader{})
registry.RegisterTool("query_db", &DatabaseQuerier{})

// Get executor by name
executor := registry.GetTool("read_file")

// List all available tools
tools := registry.ListTools()

// Execute a tool call
result, err := registry.Execute(ctx, toolCall)

The registry prevents duplicate registrations and returns descriptive errors for missing tools.
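For intuition, a registry with these properties might look like the sketch below. It is not the package's implementation: the lowercase `registry` type, the `Executor` function type, and the error wording are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Executor is a hypothetical stand-in for the ToolExecutor interface.
type Executor func(args map[string]any) (string, error)

// registry guards a name-to-executor map with an RWMutex.
type registry struct {
	mu    sync.RWMutex
	tools map[string]Executor
}

func newRegistry() *registry {
	return &registry{tools: make(map[string]Executor)}
}

// register takes the write lock and rejects duplicate names.
func (r *registry) register(name string, ex Executor) error {
	if ex == nil {
		return errors.New("executor is nil")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.tools[name]; exists {
		return fmt.Errorf("tool %q is already registered", name)
	}
	r.tools[name] = ex
	return nil
}

// get takes only the read lock, so lookups can run in parallel.
func (r *registry) get(name string) Executor {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.tools[name]
}

func main() {
	reg := newRegistry()
	noop := func(map[string]any) (string, error) { return "ok", nil }
	fmt.Println(reg.register("read_file", noop))
	fmt.Println(reg.register("read_file", noop))
	fmt.Println(reg.get("query_db") == nil)
}
```

Readers share the lock while a registration holds it exclusively, which suits a workload where lookups far outnumber registrations.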

Tool Allowlist

The processor supports allowlist filtering for security and control:

config := agentictools.Config{
    AllowedTools: []string{"read_file", "list_dir"},  // Only these allowed
    // ...
}

Behavior:

  • Empty/nil AllowedTools: All registered tools are allowed
  • Populated AllowedTools: Only listed tools can execute
  • Blocked tools return an error result (not a Go error)

Example blocked response:

result := agentic.ToolResult{
    CallID: "call_001",
    Error:  "tool 'delete_file' is not allowed",
}

Timeout Handling

Each tool execution runs with a configurable timeout:

config := agentictools.Config{
    Timeout: "60s",  // Per-tool execution timeout
    // ...
}

The timeout is enforced via context cancellation. Tool implementations should respect ctx.Done() for proper cancellation:

func (t *SlowTool) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
    select {
    case <-ctx.Done():
        return agentic.ToolResult{CallID: call.ID, Error: "execution cancelled"}, ctx.Err()
    case result := <-t.doWork(call):
        return result, nil
    }
}

Timeouts reach the agent in ToolResult.Error, not as Go errors.

Quick Start

Configure and start the processor:

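// Runs inside a function that returns error; deps and ctx come from the caller.
config := agentictools.Config{
    StreamName:   "AGENT",
    AllowedTools: nil,  // Allow all
    Timeout:      "60s",
}

rawConfig, err := json.Marshal(config)
if err != nil {
    return err
}
comp, err := agentictools.NewComponent(rawConfig, deps)
if err != nil {
    return err
}

// Register tools before Start()
toolsComp := comp.(*agentictools.Component)
if err := toolsComp.RegisterToolExecutor(&FileReader{}); err != nil {
    return err
}
if err := toolsComp.RegisterToolExecutor(&WebFetcher{}); err != nil {
    return err
}

// Initialize and start
lc := comp.(component.LifecycleComponent)
if err := lc.Initialize(); err != nil {
    return err
}
if err := lc.Start(ctx); err != nil {
    return err
}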

Configuration Reference

Full configuration schema:

{
    "allowed_tools": ["string", ...],
    "timeout": "string (default: 60s)",
    "stream_name": "string (default: AGENT)",
    "consumer_name_suffix": "string (optional)",
    "ports": {
        "inputs": [...],
        "outputs": [...]
    }
}

Configuration fields:

  • allowed_tools: List of tool names to allow (nil/empty = allow all)
  • timeout: Timeout applied to each tool execution, as a duration string (default: "60s"); the same value applies to every tool
  • stream_name: JetStream stream name for agentic messages (default: "AGENT")
  • consumer_name_suffix: Optional suffix for JetStream consumer names (for testing)
  • ports: Port configuration for inputs and outputs

Ports

Input ports (JetStream consumers):

  • tool.execute: Tool execution requests from agentic-loop (subject: tool.execute.>)

Output ports (JetStream publishers):

  • tool.result: Tool execution results to agentic-loop (subject: tool.result.*)

Message Flow

The processor handles each tool call through:

  1. Receive ToolCall from tool.execute.>
  2. Validate tool is in allowlist (if configured)
  3. Look up executor in registry
  4. Create timeout context
  5. Execute tool with context
  6. Publish ToolResult to tool.result.{call_id}
  7. Acknowledge JetStream message

Error Handling

Errors are categorized into two types:

Tool execution errors (returned in ToolResult.Error):

  • Tool not found in registry
  • Tool not in allowlist
  • Tool execution failed
  • Timeout exceeded

System errors (returned as Go error):

  • JSON marshaling failures
  • NATS publishing failures

Tool execution errors don't fail the loop - the agent can handle them:

if result.Error != "" {
    // Agent sees: "Error: file not found"
    // Agent can try alternative approach
}

Concurrent Execution

The processor handles concurrent tool calls safely:

  • ExecutorRegistry uses RWMutex for thread-safe access
  • Each tool call gets its own goroutine
  • Context cancellation propagates to all active executions

Multiple tools can execute in parallel when the loop sends concurrent calls:

// agentic-loop sends two tool calls
tool.execute.read_file  → executes
tool.execute.query_db   → executes concurrently
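A goroutine-per-call fan-out can be sketched like this. The types are trimmed stand-ins and `executeAll` is a hypothetical helper; the processor itself receives calls from JetStream rather than from a slice.

```go
package main

import (
	"fmt"
	"sync"
)

// Stand-ins for the agentic types.
type ToolCall struct{ ID, Name string }
type ToolResult struct{ CallID, Content string }

// executeAll runs every call in its own goroutine and returns results in input order.
func executeAll(calls []ToolCall, run func(ToolCall) ToolResult) []ToolResult {
	results := make([]ToolResult, len(calls))
	var wg sync.WaitGroup
	for i, call := range calls {
		wg.Add(1)
		go func(i int, call ToolCall) {
			defer wg.Done()
			results[i] = run(call) // each goroutine writes only its own slot
		}(i, call)
	}
	wg.Wait()
	return results
}

func main() {
	calls := []ToolCall{{ID: "call_001", Name: "read_file"}, {ID: "call_002", Name: "query_db"}}
	results := executeAll(calls, func(c ToolCall) ToolResult {
		return ToolResult{CallID: c.ID, Content: "ran " + c.Name}
	})
	for _, r := range results {
		fmt.Println(r.CallID, r.Content)
	}
}
```

Because each goroutine owns one slice index, no mutex is needed here; an executor that keeps shared state of its own must still synchronize access to it.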

Built-in Tools

The package does not include built-in tools - all tools must be registered by the application. This keeps the processor focused and allows full control over available capabilities.

Common tools to implement:

  • File operations: read_file, write_file, list_dir
  • Web operations: fetch_url, call_api
  • Database operations: query, insert, update
  • Graph operations: query_graph (query knowledge graph)

Thread Safety

The Component is safe for concurrent use after Start() is called:

  • ExecutorRegistry uses RWMutex for all operations
  • Tool registration should complete before Start()
  • Multiple tool calls can execute concurrently

Testing

For testing, use mock executors and unique consumer names:

type MockExecutor struct {
    result agentic.ToolResult
}

func (m *MockExecutor) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
    m.result.CallID = call.ID
    return m.result, nil
}

func (m *MockExecutor) ListTools() []agentic.ToolDefinition {
    return []agentic.ToolDefinition{{Name: "mock_tool"}}
}

// In test
config := agentictools.Config{
    StreamName:         "AGENT",
    ConsumerNameSuffix: "test-" + t.Name(),
}

Limitations

Current limitations:

  • No tool versioning (single version per name)
  • No tool dependencies or ordering
  • No streaming tool output
  • No built-in rate limiting per tool
  • Timeout is global, not per-tool configurable

See Also

Related packages:

  • agentic: Shared types (ToolCall, ToolResult, ToolDefinition)
  • processor/agentic-loop: Loop orchestration
  • processor/agentic-model: LLM endpoint integration

JetStream Integration

All messaging uses JetStream for durability. Tool call subjects require the AGENT stream to exist with subjects matching tool.execute.> and tool.result.>.

Consumer naming follows the pattern: agentic-tools-{subject-pattern}

Package agentictools provides Prometheus metrics for the agentic-tools component.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConsumerNameForTool

func ConsumerNameForTool(toolName string) string

ConsumerNameForTool generates a JetStream consumer name for a tool. It replaces dots and underscores with dashes and adds the "tool-exec-" prefix.

Examples:

file_read → tool-exec-file-read
graph.query → tool-exec-graph-query
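The documented mapping can be reproduced with a short sketch. `consumerName` is a hypothetical re-implementation based only on the description and examples above, not the package's source.

```go
package main

import (
	"fmt"
	"strings"
)

// consumerName replaces dots and underscores with dashes and adds the prefix.
// Hypothetical equivalent of ConsumerNameForTool, written from its doc comment.
func consumerName(toolName string) string {
	sanitized := strings.NewReplacer(".", "-", "_", "-").Replace(toolName)
	return "tool-exec-" + sanitized
}

func main() {
	fmt.Println(consumerName("file_read"))
	fmt.Println(consumerName("graph.query"))
}
```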

func ListRegisteredTools

func ListRegisteredTools() []agentic.ToolDefinition

ListRegisteredTools returns all globally registered tool definitions.

func NewComponent

func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewComponent creates a new agentic-tools processor component

func Register

func Register(registry RegistryInterface) error

Register registers the agentic-tools processor component with the given registry

func RegisterTool

func RegisterTool(name string, executor ToolExecutor) error

RegisterTool registers a tool executor globally via init(). Thread-safe and can be called from any package's init() function.

Example usage:

func init() {
    agentictools.RegisterTool("my_tool", &MyToolExecutor{})
}

Types

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the agentic-tools processor

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Component) Execute

func (c *Component) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)

Execute executes a tool call (for testing and direct invocation)

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns the current health status

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize prepares the component (no-op for this component)

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns configured input port definitions

func (*Component) ListTools

func (c *Component) ListTools() []ToolDefinition

ListTools returns all tool definitions for discovery. Combines tools from both the component's local registry and the global registry.

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns configured output port definitions

func (*Component) RegisterToolExecutor

func (c *Component) RegisterToolExecutor(executor ToolExecutor) error

RegisterToolExecutor registers a tool executor with the component. This method extracts all tools from the executor and registers them individually.

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins processing tool calls

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop gracefully stops the component within the given timeout

type Config

type Config struct {
	Ports                *component.PortConfig `json:"ports"                schema:"type:ports,description:Port configuration,category:basic"`
	StreamName           string                `` /* 132-byte string literal not displayed */
	ConsumerNameSuffix   string                `` /* 127-byte string literal not displayed */
	DeleteConsumerOnStop bool                  `` /* 157-byte string literal not displayed */
	Timeout              string                `json:"timeout"              schema:"type:string,description:Tool execution timeout,category:advanced,default:60s"`
	AllowedTools         []string              `json:"allowed_tools"        schema:"type:array,description:List of allowed tools (nil/empty allows all),category:advanced"`
}

Config holds configuration for agentic-tools processor component

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default configuration for agentic-tools processor

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the configuration for errors

type ExecutorRegistry

type ExecutorRegistry struct {
	// contains filtered or unexported fields
}

ExecutorRegistry manages tool executors and provides thread-safe registration and execution

func GetGlobalRegistry

func GetGlobalRegistry() *ExecutorRegistry

GetGlobalRegistry returns the global tool registry. Used by agentic-tools component to access registered tools.

func NewExecutorRegistry

func NewExecutorRegistry() *ExecutorRegistry

NewExecutorRegistry creates a new empty executor registry

func (*ExecutorRegistry) Execute

Execute executes a tool call using the registered executor. Returns an error result if the tool is not found or execution fails.

func (*ExecutorRegistry) GetTool

func (r *ExecutorRegistry) GetTool(name string) ToolExecutor

GetTool retrieves a tool executor by name. Returns nil if the tool is not registered.

func (*ExecutorRegistry) ListTools

func (r *ExecutorRegistry) ListTools() []agentic.ToolDefinition

ListTools returns all registered tool definitions. Returns an empty slice (not nil) when no tools are registered.

func (*ExecutorRegistry) RegisterTool

func (r *ExecutorRegistry) RegisterTool(name string, executor ToolExecutor) error

RegisterTool registers a tool executor with the given name. Returns an error if a tool with the same name is already registered.

type InMemoryToolCallStore

type InMemoryToolCallStore struct {
	// contains filtered or unexported fields
}

InMemoryToolCallStore provides an in-memory implementation of ToolCallStore for testing purposes.

func NewInMemoryToolCallStore

func NewInMemoryToolCallStore() *InMemoryToolCallStore

NewInMemoryToolCallStore creates a new in-memory store for testing.

func (*InMemoryToolCallStore) Close

func (s *InMemoryToolCallStore) Close() error

Close marks the store as closed.

func (*InMemoryToolCallStore) Records

func (s *InMemoryToolCallStore) Records() []ToolCallRecord

Records returns all stored records. For testing only.

func (*InMemoryToolCallStore) Reset

func (s *InMemoryToolCallStore) Reset()

Reset clears all records and reopens the store. For testing only.

func (*InMemoryToolCallStore) Store

Store adds a record to the in-memory store.

type KVToolCallStore

type KVToolCallStore struct {
	// contains filtered or unexported fields
}

KVToolCallStore implements ToolCallStore using NATS KV. It supports lazy initialization with retry logic and can be reset to allow re-initialization after failures.

func NewKVToolCallStore

func NewKVToolCallStore(client *natsclient.Client, bucketName string) *KVToolCallStore

NewKVToolCallStore creates a new KV-backed tool call store. Initialization is deferred until the first Store call.

func (*KVToolCallStore) Close

func (s *KVToolCallStore) Close() error

Close marks the store as closed. The underlying KV connection is managed by the natsclient and should not be closed here.

func (*KVToolCallStore) Reset

func (s *KVToolCallStore) Reset()

Reset allows re-initialization after a failure. This is useful for testing or recovery scenarios.

func (*KVToolCallStore) Store

func (s *KVToolCallStore) Store(ctx context.Context, record ToolCallRecord) error

Store persists a tool call record to the KV store. The key format is "{call_id}.{timestamp_ns}" to allow multiple records per call ID (e.g., retries).

type RecordingExecutor

type RecordingExecutor struct {
	// contains filtered or unexported fields
}

RecordingExecutor wraps a ToolExecutor and records all calls to a store. It uses a buffered channel and background goroutine for non-blocking recording.

func NewRecordingExecutor

func NewRecordingExecutor(wrapped ToolExecutor, store ToolCallStore, logger *slog.Logger) *RecordingExecutor

NewRecordingExecutor creates a new recording executor that wraps the given executor and records all calls to the provided store.

func (*RecordingExecutor) Execute

Execute executes the wrapped tool and records the call

func (*RecordingExecutor) ListTools

func (r *RecordingExecutor) ListTools() []agentic.ToolDefinition

ListTools returns tools from the wrapped executor

func (*RecordingExecutor) Stop

func (r *RecordingExecutor) Stop(timeout time.Duration) error

Stop gracefully shuts down the recording executor, waiting for pending records to be stored up to the specified timeout.

type RegistryInterface

type RegistryInterface interface {
	RegisterWithConfig(component.RegistrationConfig) error
}

RegistryInterface defines the minimal interface needed for registration

type ToolCallRecord

type ToolCallRecord struct {
	Call      agentic.ToolCall   `json:"call"`
	Result    agentic.ToolResult `json:"result"`
	StartTime time.Time          `json:"start_time"`
	EndTime   time.Time          `json:"end_time"`
	Duration  time.Duration      `json:"duration"`
}

ToolCallRecord captures a tool call and its result for auditing

type ToolCallStore

type ToolCallStore interface {
	Store(ctx context.Context, record ToolCallRecord) error
	Close() error
}

ToolCallStore interface for persisting tool call records

type ToolDefinition

type ToolDefinition struct {
	Name        string `json:"name"`
	Description string `json:"description"`
	Provider    string `json:"provider"`
	Available   bool   `json:"available"`
}

ToolDefinition represents a tool definition for discovery responses

type ToolExecutor

type ToolExecutor interface {
	Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
	ListTools() []agentic.ToolDefinition
}

ToolExecutor defines the interface for tool executors

type ToolListResponse

type ToolListResponse struct {
	Tools []ToolDefinition `json:"tools"`
}

ToolListResponse represents the response to a tool.list request

Directories

Path Synopsis
Package executors provides tool executor implementations for the agentic-tools component.
