agentictools

package
v1.0.0-beta.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2026 License: MIT Imports: 24 Imported by: 0

README

agentic-tools

Tool execution component for the agentic processing system.

Overview

The agentic-tools component executes tool calls from the agentic loop orchestrator. It receives ToolCall messages, dispatches them to registered tool executors, and publishes ToolResult messages back. Supports tool registration, allowlist filtering, per-execution timeouts, and concurrent execution.

Architecture

┌───────────────┐     ┌────────────────┐     ┌──────────────────┐
│ agentic-loop  │────►│ agentic-tools  │────►│ Tool Executors   │
│               │     │                │     │ (your code)      │
│               │◄────│                │◄────│                  │
└───────────────┘     └────────────────┘     └──────────────────┘
  tool.execute.*        Execute()           read_file, query_db,
  tool.result.*                             call_api, etc.

Features

  • Tool Registration: Register custom tool executors at runtime
  • Allowlist Filtering: Restrict which tools can execute
  • Timeout Handling: Per-execution timeout with context cancellation
  • Concurrent Execution: Multiple tools can run in parallel

Configuration

{
  "type": "processor",
  "name": "agentic-tools",
  "enabled": true,
  "config": {
    "stream_name": "AGENT",
    "timeout": "60s",
    "allowed_tools": null,
    "ports": {
      "inputs": [
        {"name": "tool_calls", "type": "jetstream", "subject": "tool.execute.>", "stream_name": "AGENT"}
      ],
      "outputs": [
        {"name": "tool_results", "type": "jetstream", "subject": "tool.result.*", "stream_name": "AGENT"}
      ]
    }
  }
}
Configuration Options
Option Type Default Description
allowed_tools []string null Tool allowlist (null/empty = allow all)
timeout string "60s" Per-tool execution timeout
stream_name string "AGENT" JetStream stream name
consumer_name_suffix string "" Suffix for consumer names (for testing)
ports object (defaults) Port configuration
JetStream Integration

All messaging uses JetStream for durability. Tool call subjects require the AGENT stream to exist with subjects matching tool.execute.> and tool.result.>.

Consumer naming: agentic-tools-{subject-pattern}

Ports

Inputs
Name Type Subject Description
tool_calls jetstream tool.execute.> Tool calls from agentic-loop
Outputs
Name Type Subject Description
tool_results jetstream tool.result.* Tool results to agentic-loop

Tool Registration

Tools implement the ToolExecutor interface:

type ToolExecutor interface {
    Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
    ListTools() []agentic.ToolDefinition
}
Global Registration (Preferred)

Register tools globally via init() so they're available to all agentic-tools components:

package mytools

import agentictools "github.com/c360/semstreams/processor/agentic-tools"

func init() {
    agentictools.RegisterTool("read_file", &FileReader{})
    agentictools.RegisterTool("query_graph", &GraphQueryExecutor{})
}

This pattern matches how components and rules are registered in SemStreams. Globally registered tools are automatically available to all agentic-tools component instances.

Per-Component Registration

For component-specific tools, register after creating the component:

comp, _ := agentictools.NewComponent(rawConfig, deps)
toolsComp := comp.(*agentictools.Component)

// Register component-specific executors
toolsComp.RegisterToolExecutor(&CustomExecutor{})

// Start component
lc := comp.(component.LifecycleComponent)
lc.Initialize()
lc.Start(ctx)

Local tools take precedence over global tools with the same name.

Example Implementation
type FileReader struct{}

func (f *FileReader) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
    var args struct {
        Path string `json:"path"`
    }
    if err := json.Unmarshal([]byte(call.Arguments), &args); err != nil {
        return agentic.ToolResult{CallID: call.ID, Error: err.Error()}, nil
    }

    // Respect context cancellation
    select {
    case <-ctx.Done():
        return agentic.ToolResult{CallID: call.ID, Error: "cancelled"}, ctx.Err()
    default:
    }

    content, err := os.ReadFile(args.Path)
    if err != nil {
        return agentic.ToolResult{CallID: call.ID, Error: err.Error()}, nil
    }

    return agentic.ToolResult{CallID: call.ID, Content: string(content)}, nil
}

func (f *FileReader) ListTools() []agentic.ToolDefinition {
    return []agentic.ToolDefinition{{
        Name:        "read_file",
        Description: "Read the contents of a file",
        Parameters: map[string]any{
            "type": "object",
            "properties": map[string]any{
                "path": map[string]any{"type": "string", "description": "File path"},
            },
            "required": []string{"path"},
        },
    }}
}

Tool Allowlist

Control which tools can execute:

{
  "allowed_tools": ["read_file", "list_dir", "query_graph"]
}
Config Behavior
null or [] All registered tools allowed
["tool1", "tool2"] Only listed tools allowed

Blocked tools return an error result (not a Go error):

{
  "call_id": "call_001",
  "error": "tool 'delete_file' is not allowed"
}

Message Formats

ToolCall (Input)
{
  "id": "call_001",
  "name": "read_file",
  "arguments": {
    "path": "/etc/hosts"
  }
}
ToolResult (Output)
{
  "call_id": "call_001",
  "content": "127.0.0.1 localhost\n...",
  "error": "",
  "metadata": {}
}

Common Tools to Implement

Tool Description
read_file Read file contents
write_file Write content to file
list_dir List directory contents
fetch_url HTTP GET request
call_api Generic HTTP request
query_graph Query knowledge graph
run_command Execute shell command

Troubleshooting

Tool not found
  • Verify tool executor is registered before Start()
  • Check tool name matches exactly (case-sensitive)
  • Ensure ListTools() returns the correct name
Tool timeout
  • Increase timeout for long-running operations
  • Implement context cancellation in executor
  • Check for blocking operations
Tool blocked by allowlist
  • Add tool name to allowed_tools array
  • Set allowed_tools: null to allow all
  • Verify tool name spelling
Concurrent execution issues
  • Ensure tool executor is thread-safe
  • Don't share mutable state between calls
  • Use proper synchronization if needed

Documentation

Overview

Package agentictools provides a tool executor processor component that routes tool calls to registered tool executors with filtering and timeout support.

Package agentictools provides the tool execution processor for the SemStreams agentic system.

Overview

The agentic-tools processor executes tool calls from the agentic loop orchestrator. It receives ToolCall messages, dispatches them to registered tool executors, and publishes ToolResult messages back. The processor supports tool registration, allowlist filtering, per-execution timeouts, and concurrent execution.

This processor enables agents to interact with external systems (files, APIs, databases, etc.) through a well-defined tool interface.

Architecture

The tools processor sits between the loop orchestrator and tool implementations:

┌───────────────┐     ┌────────────────┐     ┌──────────────────┐
│ agentic-loop  │────▶│ agentic-tools  │────▶│ Tool Executors   │
│               │     │ (this pkg)     │     │ (your code)      │
│               │◀────│                │◀────│                  │
└───────────────┘     └────────────────┘     └──────────────────┘
  tool.execute.*        Execute()           read_file, query_db,
  tool.result.*                             call_api, etc.

ToolExecutor Interface

Tools are implemented by satisfying the ToolExecutor interface:

type ToolExecutor interface {
    Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
    ListTools() []agentic.ToolDefinition
}

Example implementation:

type FileReader struct{}

func (f *FileReader) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
    path, _ := call.Arguments["path"].(string)

    // Respect context cancellation
    select {
    case <-ctx.Done():
        return agentic.ToolResult{CallID: call.ID, Error: "cancelled"}, ctx.Err()
    default:
    }

    content, err := os.ReadFile(path)
    if err != nil {
        return agentic.ToolResult{
            CallID: call.ID,
            Error:  err.Error(),
        }, nil
    }

    return agentic.ToolResult{
        CallID:  call.ID,
        Content: string(content),
    }, nil
}

func (f *FileReader) ListTools() []agentic.ToolDefinition {
    return []agentic.ToolDefinition{
        {
            Name:        "read_file",
            Description: "Read the contents of a file",
            Parameters: map[string]any{
                "type": "object",
                "properties": map[string]any{
                    "path": map[string]any{
                        "type":        "string",
                        "description": "Path to the file",
                    },
                },
                "required": []string{"path"},
            },
        },
    }
}

Tool Registration

Tools can be registered in two ways:

1. Global registration via init() (preferred for reusable tools):

func init() {
    agentictools.RegisterTool("my_tool", &MyToolExecutor{})
}

Global registration makes tools available to all agentic-tools components automatically. This pattern matches how components and rules are registered.

2. Per-component registration (for component-specific tools):

comp, _ := agentictools.NewComponent(rawConfig, deps)
toolsComp := comp.(*agentictools.Component)
err := toolsComp.RegisterToolExecutor(&FileReader{})

The processor extracts tool names from ListTools() for routing and validation.

ExecutorRegistry

The ExecutorRegistry provides thread-safe tool management:

registry := NewExecutorRegistry()

// Register tools
registry.RegisterTool("read_file", &FileReader{})
registry.RegisterTool("query_db", &DatabaseQuerier{})

// Get executor by name
executor := registry.GetTool("read_file")

// List all available tools
tools := registry.ListTools()

// Execute a tool call
result, err := registry.Execute(ctx, toolCall)

The registry prevents duplicate registrations and returns descriptive errors for missing tools.

Tool Allowlist

The processor supports allowlist filtering for security and control:

config := agentictools.Config{
    AllowedTools: []string{"read_file", "list_dir"},  // Only these allowed
    // ...
}

Behavior:

  • Empty/nil AllowedTools: All registered tools are allowed
  • Populated AllowedTools: Only listed tools can execute
  • Blocked tools return an error result (not a Go error)

Example blocked response:

result := agentic.ToolResult{
    CallID: "call_001",
    Error:  "tool 'delete_file' is not allowed",
}

Timeout Handling

Each tool execution runs with a configurable timeout:

config := agentictools.Config{
    Timeout: "60s",  // Per-tool execution timeout
    // ...
}

The timeout is enforced via context cancellation. Tool implementations should respect ctx.Done() for proper cancellation:

func (t *SlowTool) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
    select {
    case <-ctx.Done():
        return agentic.ToolResult{CallID: call.ID, Error: "execution cancelled"}, ctx.Err()
    case result := <-t.doWork(call):
        return result, nil
    }
}

Timeout errors are returned as ToolResult.Error, not as Go errors.

Quick Start

Configure and start the processor:

config := agentictools.Config{
    StreamName:   "AGENT",
    AllowedTools: nil,  // Allow all
    Timeout:      "60s",
}

rawConfig, _ := json.Marshal(config)
comp, err := agentictools.NewComponent(rawConfig, deps)

// Register tools
toolsComp := comp.(*agentictools.Component)
toolsComp.RegisterToolExecutor(&FileReader{})
toolsComp.RegisterToolExecutor(&WebFetcher{})

// Start
lc := comp.(component.LifecycleComponent)
lc.Initialize()
lc.Start(ctx)

Configuration Reference

Full configuration schema:

{
    "allowed_tools": ["string", ...],
    "timeout": "string (default: 60s)",
    "stream_name": "string (default: AGENT)",
    "consumer_name_suffix": "string (optional)",
    "ports": {
        "inputs": [...],
        "outputs": [...]
    }
}

Configuration fields:

  • allowed_tools: List of tool names to allow (nil/empty = allow all)
  • timeout: Per-tool execution timeout as duration string (default: "60s")
  • stream_name: JetStream stream name for agentic messages (default: "AGENT")
  • consumer_name_suffix: Optional suffix for JetStream consumer names (for testing)
  • ports: Port configuration for inputs and outputs

Ports

Input ports (JetStream consumers):

  • tool.execute: Tool execution requests from agentic-loop (subject: tool.execute.>)

Output ports (JetStream publishers):

  • tool.result: Tool execution results to agentic-loop (subject: tool.result.*)

Message Flow

The processor handles each tool call through:

  1. Receive ToolCall from tool.execute.>
  2. Validate tool is in allowlist (if configured)
  3. Look up executor in registry
  4. Create timeout context
  5. Execute tool with context
  6. Publish ToolResult to tool.result.{call_id}
  7. Acknowledge JetStream message

Error Handling

Errors are categorized into two types:

**Tool execution errors** (returned in ToolResult.Error):

  • Tool not found in registry
  • Tool not in allowlist
  • Tool execution failed
  • Timeout exceeded

**System errors** (returned as Go error):

  • JSON marshaling failures
  • NATS publishing failures

Tool execution errors don't fail the loop - the agent can handle them:

if result.Error != "" {
    // Agent sees: "Error: file not found"
    // Agent can try alternative approach
}

Concurrent Execution

The processor handles concurrent tool calls safely:

  • ExecutorRegistry uses RWMutex for thread-safe access
  • Each tool call gets its own goroutine
  • Context cancellation propagates to all active executions

Multiple tools can execute in parallel when the loop sends concurrent calls:

// agentic-loop sends two tool calls
tool.execute.read_file  → executes
tool.execute.query_db   → executes concurrently

Built-in Tools

The package does not include built-in tools - all tools must be registered by the application. This keeps the processor focused and allows full control over available capabilities.

Common tools to implement:

  • File operations: read_file, write_file, list_dir
  • Web operations: fetch_url, call_api
  • Database operations: query, insert, update
  • Graph operations: graph_query (query knowledge graph)

Thread Safety

The Component is safe for concurrent use after Start() is called:

  • ExecutorRegistry uses RWMutex for all operations
  • Tool registration should complete before Start()
  • Multiple tool calls can execute concurrently

Testing

For testing, use mock executors and unique consumer names:

type MockExecutor struct {
    result agentic.ToolResult
}

func (m *MockExecutor) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
    m.result.CallID = call.ID
    return m.result, nil
}

func (m *MockExecutor) ListTools() []agentic.ToolDefinition {
    return []agentic.ToolDefinition{{Name: "mock_tool"}}
}

// In test
config := agentictools.Config{
    StreamName:         "AGENT",
    ConsumerNameSuffix: "test-" + t.Name(),
}

Limitations

Current limitations:

  • No tool versioning (single version per name)
  • No tool dependencies or ordering
  • No streaming tool output
  • No built-in rate limiting per tool
  • Timeout is global, not per-tool configurable

See Also

Related packages:

  • agentic: Shared types (ToolCall, ToolResult, ToolDefinition)
  • processor/agentic-loop: Loop orchestration
  • processor/agentic-model: LLM endpoint integration

JetStream Integration

All messaging uses JetStream for durability. Tool call subjects require the AGENT stream to exist with subjects matching tool.execute.> and tool.result.>.

Consumer naming follows the pattern: agentic-tools-{subject-pattern}

Package agentictools provides Prometheus metrics for agentic-tools component.

Index

Constants

View Source
const ApprovalRequiredPrefix = "approval_required: "

ApprovalRequiredPrefix is prepended to rejection reasons when a tool requires human approval. The agentic-loop detects this prefix and transitions the loop to LoopStateAwaitingApproval instead of storing a normal error result.

View Source
const ComponentCatalogToolName = "list_components"

ComponentCatalogToolName is the tool name agents use to invoke list_components.

View Source
const DecideToolName = "decide"

DecideToolName is the name agents use to invoke the coordinator's terminal decision tool.

View Source
const EmitDiagnosisToolName = "emit_diagnosis"

EmitDiagnosisToolName is the name agents use to invoke the ops agent's diagnosis emission tool.

View Source
const FlowMonitorToolName = "monitor_flow"

FlowMonitorToolName is the tool name agents use to invoke monitor_flow.

View Source
const ReadLoopResultToolName = "read_loop_result"

ReadLoopResultToolName is the name agents use to invoke the read_loop_result tool.

Variables

This section is empty.

Functions

func ConsumerNameForTool

func ConsumerNameForTool(toolName string) string

ConsumerNameForTool generates a JetStream consumer name for a tool. Sanitizes dots and underscores to dashes, adds "tool-exec-" prefix.

Examples:

file_read → tool-exec-file-read
graph.query → tool-exec-graph-query

func IsApprovalRequired

func IsApprovalRequired(reason string) bool

IsApprovalRequired checks whether a rejection reason indicates the tool needs human approval rather than being a genuine error.

func ListRegisteredTools

func ListRegisteredTools() []agentic.ToolDefinition

ListRegisteredTools returns all globally registered tool definitions.

func NewComponent

func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewComponent creates a new agentic-tools processor component

func ReadOnlyCategories

func ReadOnlyCategories() map[ToolCategory]bool

ReadOnlyCategories returns the set of categories safe for explore sub-agents (no write operations, no orchestration, no meta-programming).

func Register

func Register(registry RegistryInterface) error

Register registers the agentic-tools processor component with the given registry

func RegisterTool

func RegisterTool(name string, executor ToolExecutor) error

RegisterTool registers a tool executor globally via init(). Thread-safe and can be called from any package's init() function.

Example usage:

func init() {
    agentictools.RegisterTool("my_tool", &MyToolExecutor{})
}

func RegisterToolCategory

func RegisterToolCategory(toolName string, category ToolCategory)

RegisterToolCategory registers a category for a tool name. This allows executor implementations to declare their category at registration time.

Types

type ApprovalFilter

type ApprovalFilter struct {
	// contains filtered or unexported fields
}

ApprovalFilter implements agentic.ToolCallFilter. It checks each tool call against a configured list of tool names that require human approval. If a tool is in the list, the call is rejected with an ApprovalRequiredPrefix reason so the loop can transition to awaiting_approval.

func NewApprovalFilter

func NewApprovalFilter(approvalRequired []string) *ApprovalFilter

NewApprovalFilter creates a filter from the configured list of tool names that require approval.

func (*ApprovalFilter) FilterToolCalls

func (f *ApprovalFilter) FilterToolCalls(_ string, calls []agentic.ToolCall) (agentic.ToolCallFilterResult, error)

FilterToolCalls checks each call against the configured approval list.

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the agentic-tools processor

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Component) Execute

func (c *Component) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)

Execute executes a tool call (for testing and direct invocation)

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns the current health status

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize prepares the component (no-op for this component)

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns configured input port definitions

func (*Component) ListTools

func (c *Component) ListTools() []ToolDefinition

ListTools returns all tool definitions for discovery. Combines tools from both the component's local registry and the global registry.

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns configured output port definitions

func (*Component) RegisterToolExecutor

func (c *Component) RegisterToolExecutor(executor ToolExecutor) error

RegisterToolExecutor registers a tool executor with the component This method extracts all tools from the executor and registers them individually

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins processing tool calls

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop gracefully stops the component within the given timeout

type ComponentCatalogExecutor

type ComponentCatalogExecutor struct {
	// contains filtered or unexported fields
}

ComponentCatalogExecutor introspects the component factory registry and returns available component types. The optional type_filter arg narrows the response to a single entry, keeping LLM context pressure low for large registries (30+ factory types produce substantial JSON).

func NewComponentCatalogExecutor

func NewComponentCatalogExecutor(registry *component.Registry, logger *slog.Logger) *ComponentCatalogExecutor

NewComponentCatalogExecutor constructs the executor. registry must be non-nil; callers that can't provide one should skip registration entirely (see executors.registerComponentCatalog).

func (*ComponentCatalogExecutor) Execute

Execute routes the tool call to the handler.

func (*ComponentCatalogExecutor) ListTools

ListTools describes the list_components tool.

type Config

type Config struct {
	Ports                *component.PortConfig `json:"ports"                schema:"type:ports,description:Port configuration,category:basic"`
	StreamName           string                `` /* 132-byte string literal not displayed */
	ConsumerNameSuffix   string                `` /* 127-byte string literal not displayed */
	DeleteConsumerOnStop bool                  `` /* 157-byte string literal not displayed */
	Timeout              string                `json:"timeout"              schema:"type:string,description:Tool execution timeout,category:advanced,default:60s"`
	AllowedTools         []string              `json:"allowed_tools"        schema:"type:array,description:List of allowed tools (nil/empty allows all),category:advanced"`
	ApprovalRequired     []string              `` /* 137-byte string literal not displayed */
	EnableCategories     bool                  `` /* 150-byte string literal not displayed */
	LoopsBucket          string                `` /* 168-byte string literal not displayed */

	// ToolRetries is an opt-in per-tool retry policy. Tools not listed run
	// without retries. Use this for tools where transient failures (timeout,
	// external 5xx) are worth auto-retrying at the framework layer instead
	// of burning LLM iteration budget. Validation-shaped errors
	// (invalid_args, not_found) deliberately do NOT retry here by default —
	// those need LLM feedback via the agent's iteration loop.
	ToolRetries map[string]RetryPolicy `` /* 167-byte string literal not displayed */
}

Config holds configuration for agentic-tools processor component

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default configuration for agentic-tools processor

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the configuration for errors

type DecideExecutor

type DecideExecutor struct {
	// contains filtered or unexported fields
}

DecideExecutor is the coordinator's terminal tool. A coordinator agent calls decide() exactly once to signal its judgment; on success, the tool publishes a small set of metadata triples onto the coordinator's loop entity so downstream rules match deterministically, and returns StopLoop=true with the full decision payload in Content so downstream agents can fetch any bulky fields (subtopics list, retry hint) via read_loop_result without them riding in triples.

func NewDecideExecutor

func NewDecideExecutor(publisher TriplePublisher, platform types.PlatformMeta) *DecideExecutor

NewDecideExecutor constructs the executor given a triple publisher and the platform identity used to build the coordinator's loop entity ID.

func (*DecideExecutor) Execute

Execute routes the tool call to decide; any other tool name is treated as a routing bug.

func (*DecideExecutor) ListTools

func (e *DecideExecutor) ListTools() []agentic.ToolDefinition

ListTools describes the decide tool. The action string is NOT enumerated at the tool level — different flows want different terminal actions. The coordinator's system prompt enumerates valid values per flow; downstream rules match on specific action values via the CoordinatorNextAction predicate.

type EmitDiagnosisExecutor

type EmitDiagnosisExecutor struct {
	// contains filtered or unexported fields
}

EmitDiagnosisExecutor is the ops agent's finding emission tool. Each call mints a new {org}.{platform}.ops.diagnosis.finding.{uuid} entity and publishes one triple per predicate plus an agent.action.executed_by back-link to the ops loop entity. StopLoop is false so the agent can emit multiple findings per loop before calling submit_work.

func NewEmitDiagnosisExecutor

func NewEmitDiagnosisExecutor(publisher TriplePublisher, platform types.PlatformMeta, logger *slog.Logger) *EmitDiagnosisExecutor

NewEmitDiagnosisExecutor constructs the executor given a triple publisher, the platform identity used to build entity IDs, and a logger for instrumentation.

func (*EmitDiagnosisExecutor) Execute

Execute routes the tool call to emitDiagnosis; any other name is a routing bug.

func (*EmitDiagnosisExecutor) ListTools

func (e *EmitDiagnosisExecutor) ListTools() []agentic.ToolDefinition

ListTools describes the emit_diagnosis tool schema. The severity enum is enforced during execution (not just in the schema) because small models sometimes emit free-text severity values; the executor clamps invalid values to "info" rather than rejecting the call outright — see Execute.

type ExecutorRegistry

type ExecutorRegistry struct {
	// contains filtered or unexported fields
}

ExecutorRegistry manages tool executors and provides thread-safe registration and execution

func GetGlobalRegistry

func GetGlobalRegistry() *ExecutorRegistry

GetGlobalRegistry returns the global tool registry. Used by agentic-tools component to access registered tools.

func NewExecutorRegistry

func NewExecutorRegistry() *ExecutorRegistry

NewExecutorRegistry creates a new empty executor registry

func (*ExecutorRegistry) Execute

Execute executes a tool call using the registered executor Returns an error result if the tool is not found or execution fails

func (*ExecutorRegistry) GetTool

func (r *ExecutorRegistry) GetTool(name string) ToolExecutor

GetTool retrieves a tool executor by name Returns nil if the tool is not registered

func (*ExecutorRegistry) ListTools

func (r *ExecutorRegistry) ListTools() []agentic.ToolDefinition

ListTools returns all registered tool definitions Returns an empty slice (not nil) when no tools are registered

func (*ExecutorRegistry) RegisterTool

func (r *ExecutorRegistry) RegisterTool(name string, executor ToolExecutor) error

RegisterTool registers a tool executor with the given name Returns an error if a tool with the same name is already registered

type FlowMonitorExecutor

type FlowMonitorExecutor struct {
	// contains filtered or unexported fields
}

FlowMonitorExecutor aggregates completed-loop data for a given flow. It scans the AGENT_LOOPS KV bucket for COMPLETE_* keys, filters by WorkflowSlug, and returns aggregate counts plus a recency-capped list.

func NewFlowMonitorExecutor

func NewFlowMonitorExecutor(kv LoopKVScanner, flows FlowStateReader, logger *slog.Logger) *FlowMonitorExecutor

NewFlowMonitorExecutor constructs the executor with its KV handle and flow state reader. Both must be non-nil; callers that can't satisfy either should skip registration (see executors.registerFlowMonitor).

func (*FlowMonitorExecutor) Execute

Execute routes the tool call to the handler.

func (*FlowMonitorExecutor) ListTools

func (e *FlowMonitorExecutor) ListTools() []agentic.ToolDefinition

ListTools describes the monitor_flow tool.

type FlowState

type FlowState struct {
	RuntimeState string
}

FlowState is the minimal projection of flowstore.Flow that monitor_flow needs. Returned by FlowStateReader.

type FlowStateReader

type FlowStateReader interface {
	Get(ctx context.Context, id string) (FlowState, error)
}

FlowStateReader is the minimal flowstore surface needed to read runtime_state. FlowManager (declared in executors package) already satisfies this — we use a separate interface here to keep agentic-tools free of a direct dependency on the executors package.

type InMemoryToolCallStore

type InMemoryToolCallStore struct {
	// contains filtered or unexported fields
}

InMemoryToolCallStore provides an in-memory implementation of ToolCallStore for testing purposes.

func NewInMemoryToolCallStore

func NewInMemoryToolCallStore() *InMemoryToolCallStore

NewInMemoryToolCallStore creates a new in-memory store for testing.

func (*InMemoryToolCallStore) Close

func (s *InMemoryToolCallStore) Close() error

Close marks the store as closed.

func (*InMemoryToolCallStore) Records

func (s *InMemoryToolCallStore) Records() []ToolCallRecord

Records returns all stored records. For testing only.

func (*InMemoryToolCallStore) Reset

func (s *InMemoryToolCallStore) Reset()

Reset clears all records and reopens the store. For testing only.

func (*InMemoryToolCallStore) Store

Store adds a record to the in-memory store.

type KVToolCallStore

type KVToolCallStore struct {
	// contains filtered or unexported fields
}

KVToolCallStore implements ToolCallStore using NATS KV. It supports lazy initialization with retry logic and can be reset to allow re-initialization after failures.

func NewKVToolCallStore

func NewKVToolCallStore(client *natsclient.Client, bucketName string) *KVToolCallStore

NewKVToolCallStore creates a new KV-backed tool call store. Initialization is deferred until the first Store call.

func (*KVToolCallStore) Close

func (s *KVToolCallStore) Close() error

Close marks the store as closed. The underlying KV connection is managed by the natsclient and should not be closed here.

func (*KVToolCallStore) Reset

func (s *KVToolCallStore) Reset()

Reset allows re-initialization after a failure. This is useful for testing or recovery scenarios.

func (*KVToolCallStore) Store

func (s *KVToolCallStore) Store(ctx context.Context, record ToolCallRecord) error

Store persists a tool call record to the KV store. The key format is "{call_id}.{timestamp_ns}" to allow multiple records per call ID (e.g., retries).

type LoopKVScanner

type LoopKVScanner interface {
	// Keys returns all current keys in the bucket (deleted keys excluded).
	Keys(ctx context.Context) ([]string, error)
	// Get fetches a single KV entry by exact key.
	Get(ctx context.Context, key string) (*natsclient.KVEntry, error)
}

LoopKVScanner is the minimal KV surface monitor_flow needs: list keys and fetch individual entries. *natsclient.KVStore satisfies this interface by duck-typing. Declared here so unit tests can inject an in-memory fake without pulling in the NATS stack.

type LoopsKVReader

type LoopsKVReader interface {
	Get(ctx context.Context, key string) (*natsclient.KVEntry, error)
}

LoopsKVReader is the minimal surface this executor consumes. It matches natsclient.KVStore.Get exactly, so production callers pass a *KVStore directly and tests supply an in-memory fake. Keeping the interface tiny and typed on natsclient.KVEntry avoids re-adapting jetstream types here — natsclient already owns that translation (see natsclient/kv.go).

type ReadLoopResultExecutor

type ReadLoopResultExecutor struct {
	// contains filtered or unexported fields
}

ReadLoopResultExecutor fetches a completed loop's full Result from the AGENT_LOOPS KV bucket so downstream agents can read another loop's output without having it injected into their prompt. Supports paging (max_bytes, offset) so small-context-window agents can consume large outputs a slice at a time.

func NewReadLoopResultExecutor

func NewReadLoopResultExecutor(kv LoopsKVReader) *ReadLoopResultExecutor

NewReadLoopResultExecutor constructs the executor against a KV reader scoped to the loops bucket (AGENT_LOOPS by default).

func (*ReadLoopResultExecutor) Execute

Execute routes the tool call to the handler.

func (*ReadLoopResultExecutor) ListTools

ListTools describes the read_loop_result tool.

type RecordingExecutor

type RecordingExecutor struct {
	// contains filtered or unexported fields
}

RecordingExecutor wraps a ToolExecutor and records all calls to a store. It uses a buffered channel and background goroutine for non-blocking recording.

func NewRecordingExecutor

func NewRecordingExecutor(wrapped ToolExecutor, store ToolCallStore, logger *slog.Logger) *RecordingExecutor

NewRecordingExecutor creates a new recording executor that wraps the given executor and records all calls to the provided store.

func (*RecordingExecutor) Execute

Execute executes the wrapped tool and records the call

func (*RecordingExecutor) ListTools

func (r *RecordingExecutor) ListTools() []agentic.ToolDefinition

ListTools returns tools from the wrapped executor

func (*RecordingExecutor) Stop

func (r *RecordingExecutor) Stop(timeout time.Duration) error

Stop gracefully shuts down the recording executor, waiting for pending records to be stored up to the specified timeout.

type RegistryInterface

type RegistryInterface interface {
	RegisterWithConfig(component.RegistrationConfig) error
}

RegistryInterface defines the minimal interface needed for registration

type RetryPolicy

type RetryPolicy struct {
	// MaxAttempts is the total number of tries including the first call.
	// 1 means no retry. Values below 1 are clamped to 1.
	MaxAttempts int `json:"max_attempts" schema:"type:int,description:Total attempts including the first call (1 = no retry),default:1"`

	// BackoffInitialMs is the wait before the second attempt, in
	// milliseconds. Subsequent attempts use exponential backoff
	// (initial * 2^(attempt-1)) capped at BackoffMaxMs.
	BackoffInitialMs int `json:"backoff_initial_ms,omitempty" schema:"type:int,description:Initial backoff before retry in milliseconds,default:100"`

	// BackoffMaxMs caps the per-attempt backoff.
	BackoffMaxMs int `json:"backoff_max_ms,omitempty" schema:"type:int,description:Maximum backoff between retries in milliseconds,default:2000"`

	// RetryOnKinds names the ToolErrorKind values that should trigger
	// retry. Defaults to ["timeout", "external", "network"] when unset. Pass an
	// explicit empty list to retry only on raw executor errors (no
	// tool-level error kind considered).
	RetryOnKinds []string `` /* 160-byte string literal not displayed */
}

RetryPolicy controls how a single tool's transient failures are retried inside executeWithTimeout. Zero/empty values are replaced with defaults by the runtime; an all-zero policy still means "no retry" because MaxAttempts defaults to 1.

type ToolCallRecord

type ToolCallRecord struct {
	Call      agentic.ToolCall   `json:"call"`
	Result    agentic.ToolResult `json:"result"`
	StartTime time.Time          `json:"start_time"`
	EndTime   time.Time          `json:"end_time"`
	Duration  time.Duration      `json:"duration"`
}

ToolCallRecord captures a tool call and its result for auditing

type ToolCallStore

type ToolCallStore interface {
	Store(ctx context.Context, record ToolCallRecord) error
	Close() error
}

ToolCallStore interface for persisting tool call records

type ToolCategory

type ToolCategory string

ToolCategory groups tools by their primary function for token optimization and role-based filtering. Inspired by semdragon's category system.

const (
	// CategoryCore contains essential tools available to all agents (submit_work, etc.)
	CategoryCore ToolCategory = "core"

	// CategoryKnowledge contains graph query and search tools.
	CategoryKnowledge ToolCategory = "knowledge"

	// CategoryNetwork contains web search, HTTP request, and external API tools.
	CategoryNetwork ToolCategory = "network"

	// CategoryInspect contains code execution tools (bash, sandbox).
	CategoryInspect ToolCategory = "inspect"

	// CategoryOrchestration contains tools for spawning sub-agents and decomposing tasks.
	CategoryOrchestration ToolCategory = "orchestration"

	// CategoryMeta contains tools for self-programming (create_rule, manage_flow).
	CategoryMeta ToolCategory = "meta"
)

func GetToolCategory

func GetToolCategory(toolName string) ToolCategory

GetToolCategory returns the category for a tool name. Returns CategoryCore as default for unregistered tools.

type ToolDefinition

type ToolDefinition struct {
	Name        string `json:"name"`
	Description string `json:"description"`
	Provider    string `json:"provider"`
	Available   bool   `json:"available"`
}

ToolDefinition represents a tool definition for discovery responses

type ToolExecutor

type ToolExecutor interface {
	Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
	ListTools() []agentic.ToolDefinition
}

ToolExecutor defines the interface for tool executors

type ToolListResponse

type ToolListResponse struct {
	Tools []ToolDefinition `json:"tools"`
}

ToolListResponse represents the response to a tool.list request

type TriplePublisher

type TriplePublisher interface {
	AddTriple(ctx context.Context, triple message.Triple) error
}

TriplePublisher is the narrow surface DecideExecutor uses to write triples. Production satisfies it with a natsclient.Client adapter; tests use an in-memory recorder so they don't need a real NATS connection.

func NewNATSTriplePublisher

func NewNATSTriplePublisher(client *natsclient.Client) TriplePublisher

NewNATSTriplePublisher builds a TriplePublisher backed by the shared graph.mutation.triple.add NATS surface.

Directories

Path Synopsis
Package executors provides tool executor implementations for the agentic system.
Package executors provides tool executor implementations for the agentic system.
Package sandbox provides an HTTP client for the sandbox server.
Package sandbox provides an HTTP client for the sandbox server.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL