agentictools

package
v1.0.0-beta.19

This package is not in the latest version of its module.

Published: Apr 28, 2026 License: MIT Imports: 24 Imported by: 0

README

agentic-tools

Tool execution component for the agentic processing system.

Overview

The agentic-tools component executes tool calls from the agentic loop orchestrator. It receives ToolCall messages, dispatches them to registered tool executors, and publishes ToolResult messages back. It supports tool registration, allowlist filtering, per-execution timeouts, and concurrent execution.

Architecture

┌───────────────┐     ┌────────────────┐     ┌──────────────────┐
│ agentic-loop  │────►│ agentic-tools  │────►│ Tool Executors   │
│               │     │                │     │ (your code)      │
│               │◄────│                │◄────│                  │
└───────────────┘     └────────────────┘     └──────────────────┘
  tool.execute.*        Execute()           read_file, query_db,
  tool.result.*                             call_api, etc.

Features

  • Tool Registration: Register custom tool executors at runtime
  • Allowlist Filtering: Restrict which tools can execute
  • Timeout Handling: Per-execution timeout with context cancellation
  • Concurrent Execution: Multiple tools can run in parallel

Configuration

{
  "type": "processor",
  "name": "agentic-tools",
  "enabled": true,
  "config": {
    "stream_name": "AGENT",
    "timeout": "60s",
    "allowed_tools": null,
    "ports": {
      "inputs": [
        {"name": "tool_calls", "type": "jetstream", "subject": "tool.execute.>", "stream_name": "AGENT"}
      ],
      "outputs": [
        {"name": "tool_results", "type": "jetstream", "subject": "tool.result.*", "stream_name": "AGENT"}
      ]
    }
  }
}

Configuration Options

Option                Type      Default     Description
allowed_tools         []string  null        Tool allowlist (null/empty = allow all)
timeout               string    "60s"       Per-tool execution timeout
stream_name           string    "AGENT"     JetStream stream name
consumer_name_suffix  string    ""          Suffix for consumer names (for testing)
ports                 object    (defaults)  Port configuration

JetStream Integration

All messaging uses JetStream for durability. Tool call subjects require the AGENT stream to exist with subjects matching tool.execute.> and tool.result.>.

Consumer naming: agentic-tools-{subject-pattern}

Ports

Inputs

Name        Type       Subject         Description
tool_calls  jetstream  tool.execute.>  Tool calls from agentic-loop

Outputs

Name          Type       Subject        Description
tool_results  jetstream  tool.result.*  Tool results to agentic-loop
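
The input port subscribes with `>` while the output port uses `*`. In NATS, `*` matches exactly one subject token and `>` matches one or more trailing tokens. The sketch below is a simplified stand-in for the server's matching logic (the function name `subjectMatches` is an assumption made for illustration), showing which subjects each pattern would accept:

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a NATS-style subject matches a pattern.
// "*" matches exactly one token; ">" matches one or more trailing tokens.
// This is an illustrative simplification, not the NATS server's code.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one subject token.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	fmt.Println(subjectMatches("tool.execute.>", "tool.execute.read_file"))   // true
	fmt.Println(subjectMatches("tool.execute.>", "tool.execute.graph.query")) // true
	fmt.Println(subjectMatches("tool.result.*", "tool.result.call_001"))      // true
	fmt.Println(subjectMatches("tool.result.*", "tool.result.a.b"))           // false
}
```

A tool name containing dots (such as graph.query) therefore still reaches the input port, while results are expected on a single-token suffix.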

Tool Registration

Tools implement the ToolExecutor interface:

type ToolExecutor interface {
    Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
    ListTools() []agentic.ToolDefinition
}

The tool registry follows ADR-029 Pattern A (boot-registry): the embedding binary constructs an *ExecutorRegistry, registers builtins and any custom tools at startup, then plumbs the registry through component.Dependencies.ToolRegistry. There is no package-level singleton — every process owns its registry explicitly. This mirrors how component.Registry is wired in cmd/semstreams/main.go.

Shared Registration (Preferred)

The shared registry is built once at boot and is what all components in the process resolve against. Embedders typically register builtins via executors.RegisterBuiltins, then layer custom tools on top:

package main

import (
    "context"

    "github.com/c360studio/semstreams/component"
    agentictools "github.com/c360studio/semstreams/processor/agentic-tools"
    "github.com/c360studio/semstreams/processor/agentic-tools/executors"
    mytools "example.com/mytools"
)

func bootstrap(ctx context.Context /* ... */) error {
    reg := agentictools.NewExecutorRegistry()

    // Builtin tools (bash, http_request, github_*, rule CRUD, etc.).
    if err := executors.RegisterBuiltins(ctx, reg, executors.ToolDependencies{
        // ...nats client, platform, managers...
    }); err != nil {
        return err
    }

    // Custom tools.
    if err := reg.RegisterTool("read_file", &mytools.FileReader{}); err != nil {
        return err
    }
    if err := reg.RegisterTool("query_graph", &mytools.GraphQueryExecutor{}); err != nil {
        return err
    }

    // Plumb into the dependencies that components see.
    deps := component.Dependencies{
        ToolRegistry: reg,
        // ...nats client, etc...
    }
    _ = deps
    return nil
}

Duplicate registration returns an error — boot-time conflicts surface immediately rather than being silently swallowed.

Per-Component Registration

For component-specific tools, register after creating the component:

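comp, err := agentictools.NewComponent(rawConfig, deps)
if err != nil {
    return err
}
toolsComp := comp.(*agentictools.Component)

// Register component-specific executors
if err := toolsComp.RegisterToolExecutor(&CustomExecutor{}); err != nil {
    return err
}

// Start component
lc := comp.(component.LifecycleComponent)
if err := lc.Initialize(); err != nil {
    return err
}
if err := lc.Start(ctx); err != nil {
    return err
}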

The component-local registry is dispatched first; the shared registry from deps.ToolRegistry is the fallback. Local beats shared — register a thin ToolExecutor wrapper locally to override or post-process a builtin without disturbing the shared registry that other components see.
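
The local-before-shared lookup order can be sketched with two plain maps. This is a minimal illustration under stated assumptions: the `executor` interface, the `echo` type, and the `resolve` function are hypothetical stand-ins, not the package's types.

```go
package main

import "fmt"

// executor is a hypothetical stand-in for ToolExecutor, reduced to one method.
type executor interface {
	Run(args string) string
}

// echo labels its output so the example can show which registry answered.
type echo struct{ label string }

func (e echo) Run(args string) string { return e.label + ":" + args }

// resolve checks the component-local map first and falls back to the shared one.
func resolve(local, shared map[string]executor, name string) (executor, bool) {
	if ex, ok := local[name]; ok {
		return ex, true
	}
	ex, ok := shared[name]
	return ex, ok
}

func main() {
	shared := map[string]executor{
		"http_request": echo{label: "shared"},
		"bash":         echo{label: "shared"},
	}
	local := map[string]executor{
		"http_request": echo{label: "local"}, // overrides the shared entry
	}
	for _, name := range []string{"http_request", "bash", "missing"} {
		if ex, ok := resolve(local, shared, name); ok {
			fmt.Println(name, "->", ex.Run("x"))
		} else {
			fmt.Println(name, "-> not found")
		}
	}
}
```

Because the shared map is only read, other components that resolve against it are unaffected by the local override.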

Extending an existing tool (wrapping pattern)

To customise a builtin (e.g., transform http_request results before they reach the loop), wrap the inner executor and register the wrapper at the same name on the component-local registry:

type loggedHTTP struct{ inner agentictools.ToolExecutor }

func (l loggedHTTP) ListTools() []agentic.ToolDefinition { return l.inner.ListTools() }

func (l loggedHTTP) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
    res, err := l.inner.Execute(ctx, call)
    // post-process / annotate / metricize / redact / ...
    return res, err
}

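// In the binary that owns the component:
shared := /* the deps.ToolRegistry built at boot */
inner := shared.GetTool("http_request") // GetTool returns nil when the tool is not registered
if inner == nil {
    return errors.New("http_request is not registered in the shared registry")
}
if err := toolsComp.RegisterToolExecutor(loggedHTTP{inner: inner}); err != nil { // local wins for this component
    return err
}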

The shared registry is untouched, so other components in the process still resolve the original http_request. This is the recommended path for downstream consumers; see the test file wrapping_pattern_test.go for the contract this guarantees.

Example Implementation

type FileReader struct{}

func (f *FileReader) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
    var args struct {
        Path string `json:"path"`
    }
    if err := json.Unmarshal([]byte(call.Arguments), &args); err != nil {
        return agentic.ToolResult{CallID: call.ID, Error: err.Error()}, nil
    }

    // Respect context cancellation
    select {
    case <-ctx.Done():
        return agentic.ToolResult{CallID: call.ID, Error: "cancelled"}, ctx.Err()
    default:
    }

    content, err := os.ReadFile(args.Path)
    if err != nil {
        return agentic.ToolResult{CallID: call.ID, Error: err.Error()}, nil
    }

    return agentic.ToolResult{CallID: call.ID, Content: string(content)}, nil
}

func (f *FileReader) ListTools() []agentic.ToolDefinition {
    return []agentic.ToolDefinition{{
        Name:        "read_file",
        Description: "Read the contents of a file",
        Parameters: map[string]any{
            "type": "object",
            "properties": map[string]any{
                "path": map[string]any{"type": "string", "description": "File path"},
            },
            "required": []string{"path"},
        },
    }}
}

Tool Allowlist

Control which tools can execute:

{
  "allowed_tools": ["read_file", "list_dir", "query_graph"]
}

Config              Behavior
null or []          All registered tools allowed
["tool1", "tool2"]  Only listed tools allowed

Blocked tools return an error result (not a Go error):

{
  "call_id": "call_001",
  "error": "tool 'delete_file' is not allowed"
}
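
A minimal sketch of the allowlist rule described above, assuming a plain string slice for the list. The `toolResult` type and `checkAllowed` function are hypothetical names used only for illustration; the error text follows the example response.

```go
package main

import "fmt"

// toolResult mirrors the JSON shape shown above; it is a stand-in, not agentic.ToolResult.
type toolResult struct {
	CallID string
	Error  string
}

// checkAllowed returns a blocked result when the allowlist is non-empty and
// does not contain name. A nil or empty list allows every tool.
func checkAllowed(allowed []string, callID, name string) *toolResult {
	if len(allowed) == 0 {
		return nil
	}
	for _, a := range allowed {
		if a == name {
			return nil
		}
	}
	return &toolResult{CallID: callID, Error: fmt.Sprintf("tool '%s' is not allowed", name)}
}

func main() {
	allowed := []string{"read_file", "list_dir", "query_graph"}
	fmt.Println(checkAllowed(allowed, "call_001", "read_file") == nil) // true
	if blocked := checkAllowed(allowed, "call_001", "delete_file"); blocked != nil {
		fmt.Println(blocked.Error) // tool 'delete_file' is not allowed
	}
	fmt.Println(checkAllowed(nil, "call_002", "delete_file") == nil) // true
}
```

Returning a result rather than a Go error keeps the rejection visible to the agent, which can then pick a different tool.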

Message Formats

ToolCall (Input)

{
  "id": "call_001",
  "name": "read_file",
  "arguments": {
    "path": "/etc/hosts"
  }
}

ToolResult (Output)

{
  "call_id": "call_001",
  "content": "127.0.0.1 localhost\n...",
  "error": "",
  "metadata": {}
}
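
The two payloads above can be decoded and encoded with encoding/json. The structs below are hypothetical stand-ins whose tags are copied from the JSON examples; the real types live in the agentic package, and treating arguments as `map[string]any` is an assumption made for this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toolCall and toolResult are illustrative stand-ins for the agentic types.
type toolCall struct {
	ID        string         `json:"id"`
	Name      string         `json:"name"`
	Arguments map[string]any `json:"arguments"`
}

type toolResult struct {
	CallID   string         `json:"call_id"`
	Content  string         `json:"content"`
	Error    string         `json:"error"`
	Metadata map[string]any `json:"metadata"`
}

// decodeCall parses a ToolCall payload such as one received on tool.execute.>.
func decodeCall(payload []byte) (toolCall, error) {
	var c toolCall
	err := json.Unmarshal(payload, &c)
	return c, err
}

func main() {
	call, err := decodeCall([]byte(`{"id":"call_001","name":"read_file","arguments":{"path":"/etc/hosts"}}`))
	if err != nil {
		panic(err)
	}
	path, _ := call.Arguments["path"].(string)

	out, err := json.Marshal(toolResult{
		CallID:   call.ID,
		Content:  "contents of " + path,
		Metadata: map[string]any{},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```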

Common Tools to Implement

Tool         Description
read_file    Read file contents
write_file   Write content to file
list_dir     List directory contents
fetch_url    HTTP GET request
call_api     Generic HTTP request
query_graph  Query knowledge graph
run_command  Execute shell command

Troubleshooting

Tool not found

  • Verify tool executor is registered before Start()
  • Check tool name matches exactly (case-sensitive)
  • Ensure ListTools() returns the correct name

Tool timeout

  • Increase timeout for long-running operations
  • Implement context cancellation in executor
  • Check for blocking operations

Tool blocked by allowlist

  • Add tool name to allowed_tools array
  • Set allowed_tools: null to allow all
  • Verify tool name spelling

Concurrent execution issues

  • Ensure tool executor is thread-safe
  • Don't share mutable state between calls
  • Use proper synchronization if needed

Documentation

Overview

Package agentictools provides a tool executor processor component that routes tool calls to registered tool executors with filtering and timeout support.

Package agentictools provides the tool execution processor for the SemStreams agentic system.

Overview

The agentic-tools processor executes tool calls from the agentic loop orchestrator. It receives ToolCall messages, dispatches them to registered tool executors, and publishes ToolResult messages back. The processor supports tool registration, allowlist filtering, per-execution timeouts, and concurrent execution.

This processor enables agents to interact with external systems (files, APIs, databases, etc.) through a well-defined tool interface.

Architecture

The tools processor sits between the loop orchestrator and tool implementations:

┌───────────────┐     ┌────────────────┐     ┌──────────────────┐
│ agentic-loop  │────▶│ agentic-tools  │────▶│ Tool Executors   │
│               │     │ (this pkg)     │     │ (your code)      │
│               │◀────│                │◀────│                  │
└───────────────┘     └────────────────┘     └──────────────────┘
  tool.execute.*        Execute()           read_file, query_db,
  tool.result.*                             call_api, etc.

ToolExecutor Interface

Tools are implemented by satisfying the ToolExecutor interface:

type ToolExecutor interface {
    Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
    ListTools() []agentic.ToolDefinition
}

Example implementation:

type FileReader struct{}

func (f *FileReader) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
    path, _ := call.Arguments["path"].(string)

    // Respect context cancellation
    select {
    case <-ctx.Done():
        return agentic.ToolResult{CallID: call.ID, Error: "cancelled"}, ctx.Err()
    default:
    }

    content, err := os.ReadFile(path)
    if err != nil {
        return agentic.ToolResult{
            CallID: call.ID,
            Error:  err.Error(),
        }, nil
    }

    return agentic.ToolResult{
        CallID:  call.ID,
        Content: string(content),
    }, nil
}

func (f *FileReader) ListTools() []agentic.ToolDefinition {
    return []agentic.ToolDefinition{
        {
            Name:        "read_file",
            Description: "Read the contents of a file",
            Parameters: map[string]any{
                "type": "object",
                "properties": map[string]any{
                    "path": map[string]any{
                        "type":        "string",
                        "description": "Path to the file",
                    },
                },
                "required": []string{"path"},
            },
        },
    }
}

Tool Registration

The tool registry is a constructor-injected ADR-029 Pattern A registry, matching component.Registry. The embedding binary owns one *ExecutorRegistry per process and plumbs it through component.Dependencies.ToolRegistry. There is no package-level singleton.

Tools can be registered in two ways:

1. Shared registration (preferred for tools used across components):

	reg := agentictools.NewExecutorRegistry()
	if err := reg.RegisterTool("my_tool", &MyToolExecutor{}); err != nil {
	    return err
	}
	deps := component.Dependencies{ToolRegistry: reg}

2. Per-component registration (for component-specific overrides or wrappers that should not be visible to other components):

	comp, _ := agentictools.NewComponent(rawConfig, deps)
	toolsComp := comp.(*agentictools.Component)
	err := toolsComp.RegisterToolExecutor(&FileReader{})

Component-local registrations beat the shared registry for the same tool name — see wrapping_pattern_test.go for the precedence contract.

The processor extracts tool names from ListTools() for routing and validation.

ExecutorRegistry

The ExecutorRegistry provides thread-safe tool management:

registry := NewExecutorRegistry()

// Register tools
registry.RegisterTool("read_file", &FileReader{})
registry.RegisterTool("query_db", &DatabaseQuerier{})

// Get executor by name
executor := registry.GetTool("read_file")

// List all available tools
tools := registry.ListTools()

// Execute a tool call
result, err := registry.Execute(ctx, toolCall)

The registry prevents duplicate registrations and returns descriptive errors for missing tools.
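
How a registry of this kind can combine an RWMutex with duplicate detection is sketched below. The `registry` and `handler` types are hypothetical and much smaller than ExecutorRegistry; the error wording is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// handler is a hypothetical stand-in for a ToolExecutor.
type handler func(args string) string

// registry sketches a name-to-handler map guarded by a sync.RWMutex.
type registry struct {
	mu    sync.RWMutex
	tools map[string]handler
}

func newRegistry() *registry { return &registry{tools: make(map[string]handler)} }

// register takes the write lock and rejects duplicate names.
func (r *registry) register(name string, h handler) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.tools[name]; exists {
		return fmt.Errorf("tool %q already registered", name)
	}
	r.tools[name] = h
	return nil
}

// get takes the read lock, so concurrent lookups do not block each other.
// It returns nil when the name is unknown.
func (r *registry) get(name string) handler {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.tools[name]
}

func main() {
	reg := newRegistry()
	fmt.Println(reg.register("read_file", func(a string) string { return "read " + a }))
	fmt.Println(reg.register("read_file", func(a string) string { return "again" }))
	fmt.Println(reg.get("query_db") == nil)
}
```

Registration is rare and lookups are frequent, which is the access pattern a read-write lock is suited to.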

Tool Allowlist

The processor supports allowlist filtering for security and control:

config := agentictools.Config{
    AllowedTools: []string{"read_file", "list_dir"},  // Only these allowed
    // ...
}

Behavior:

  • Empty/nil AllowedTools: All registered tools are allowed
  • Populated AllowedTools: Only listed tools can execute
  • Blocked tools return an error result (not a Go error)

Example blocked response:

result := agentic.ToolResult{
    CallID: "call_001",
    Error:  "tool 'delete_file' is not allowed",
}

Timeout Handling

Each tool execution runs with a configurable timeout:

config := agentictools.Config{
    Timeout: "60s",  // Per-tool execution timeout
    // ...
}

The timeout is enforced via context cancellation. Tool implementations should respect ctx.Done() for proper cancellation:

func (t *SlowTool) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
    select {
    case <-ctx.Done():
        return agentic.ToolResult{CallID: call.ID, Error: "execution cancelled"}, ctx.Err()
    case result := <-t.doWork(call):
        return result, nil
    }
}

Timeout errors are returned as ToolResult.Error, not as Go errors.
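
One way a caller could enforce the deadline and fold a timeout into the result is sketched below. It assumes context.WithTimeout as the mechanism; the `result` type, the `runWithTimeout` helper, and the error text are hypothetical and not taken from the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// result is a hypothetical stand-in for agentic.ToolResult.
type result struct {
	CallID  string
	Content string
	Error   string
}

// runWithTimeout runs work under a deadline and reports a timeout in
// result.Error instead of returning it as a Go error.
func runWithTimeout(parent context.Context, callID string, timeout time.Duration,
	work func(ctx context.Context) (string, error)) result {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	content, err := work(ctx)
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		return result{CallID: callID, Error: fmt.Sprintf("tool timed out after %s", timeout)}
	case err != nil:
		return result{CallID: callID, Error: err.Error()}
	}
	return result{CallID: callID, Content: content}
}

// slow simulates a tool that takes d to finish and honours ctx.Done().
func slow(d time.Duration) func(context.Context) (string, error) {
	return func(ctx context.Context) (string, error) {
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(d):
			return "done", nil
		}
	}
}

// demo runs one call that finishes in time and one that exceeds its deadline.
func demo() (fast, timedOut result) {
	fast = runWithTimeout(context.Background(), "call_001", 200*time.Millisecond, slow(time.Millisecond))
	timedOut = runWithTimeout(context.Background(), "call_002", 10*time.Millisecond, slow(time.Second))
	return fast, timedOut
}

func main() {
	fast, timedOut := demo()
	fmt.Printf("%+v\n%+v\n", fast, timedOut)
}
```

The deadline only takes effect promptly when the tool watches ctx.Done(), which is why the executor examples above check it.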

Quick Start

Configure and start the processor:

config := agentictools.Config{
    StreamName:   "AGENT",
    AllowedTools: nil,  // Allow all
    Timeout:      "60s",
}

rawConfig, err := json.Marshal(config)
if err != nil {
    return err
}
comp, err := agentictools.NewComponent(rawConfig, deps)
if err != nil {
    return err
}

// Register tools
toolsComp := comp.(*agentictools.Component)
if err := toolsComp.RegisterToolExecutor(&FileReader{}); err != nil {
    return err
}
if err := toolsComp.RegisterToolExecutor(&WebFetcher{}); err != nil {
    return err
}

// Start
lc := comp.(component.LifecycleComponent)
if err := lc.Initialize(); err != nil {
    return err
}
if err := lc.Start(ctx); err != nil {
    return err
}

Configuration Reference

Full configuration schema:

{
    "allowed_tools": ["string", ...],
    "timeout": "string (default: 60s)",
    "stream_name": "string (default: AGENT)",
    "consumer_name_suffix": "string (optional)",
    "ports": {
        "inputs": [...],
        "outputs": [...]
    }
}

Configuration fields:

  • allowed_tools: List of tool names to allow (nil/empty = allow all)
  • timeout: Per-tool execution timeout as duration string (default: "60s")
  • stream_name: JetStream stream name for agentic messages (default: "AGENT")
  • consumer_name_suffix: Optional suffix for JetStream consumer names (for testing)
  • ports: Port configuration for inputs and outputs

Ports

Input ports (JetStream consumers):

  • tool.execute: Tool execution requests from agentic-loop (subject: tool.execute.>)

Output ports (JetStream publishers):

  • tool.result: Tool execution results to agentic-loop (subject: tool.result.*)

Message Flow

The processor handles each tool call through:

  1. Receive ToolCall from tool.execute.>
  2. Validate tool is in allowlist (if configured)
  3. Look up executor in registry
  4. Create timeout context
  5. Execute tool with context
  6. Publish ToolResult to tool.result.{call_id}
  7. Acknowledge JetStream message
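
Steps 2 through 6 can be sketched as a single handler. Everything here is a hypothetical stand-in (the `processor` struct, its fields, the `publish` callback, and the "not found" wording); receiving the message and acknowledging it are left to the caller.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type call struct{ ID, Name string }
type result struct{ CallID, Content, Error string }
type execFn func(ctx context.Context, c call) (string, error)

// processor holds illustrative stand-ins for the allowlist, registry, and publisher.
type processor struct {
	allowed map[string]bool // empty means allow all
	tools   map[string]execFn
	timeout time.Duration
	publish func(subject string, r result)
}

// handle walks steps 2-6 for one call.
func (p *processor) handle(ctx context.Context, c call) {
	res := result{CallID: c.ID}
	fn, found := p.tools[c.Name]
	switch {
	case len(p.allowed) > 0 && !p.allowed[c.Name]: // step 2: allowlist
		res.Error = fmt.Sprintf("tool '%s' is not allowed", c.Name)
	case !found: // step 3: registry lookup
		res.Error = fmt.Sprintf("tool '%s' not found", c.Name)
	default:
		tctx, cancel := context.WithTimeout(ctx, p.timeout) // step 4
		content, err := fn(tctx, c)                          // step 5
		cancel()
		if err != nil {
			res.Error = err.Error()
		} else {
			res.Content = content
		}
	}
	p.publish("tool.result."+c.ID, res) // step 6
}

// run feeds three calls through handle and returns what was published.
func run() []string {
	var out []string
	p := &processor{
		allowed: map[string]bool{"read_file": true, "list_dir": true},
		tools: map[string]execFn{
			"read_file": func(context.Context, call) (string, error) { return "127.0.0.1 localhost", nil },
		},
		timeout: time.Second,
		publish: func(subject string, r result) {
			out = append(out, fmt.Sprintf("%s content=%q error=%q", subject, r.Content, r.Error))
		},
	}
	p.handle(context.Background(), call{ID: "call_001", Name: "read_file"})
	p.handle(context.Background(), call{ID: "call_002", Name: "delete_file"})
	p.handle(context.Background(), call{ID: "call_003", Name: "list_dir"})
	return out
}

func main() {
	for _, line := range run() {
		fmt.Println(line)
	}
}
```

Every branch ends in a publish, so the loop receives a result for each call even when the tool is blocked or missing.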

Error Handling

Errors are categorized into two types:

**Tool execution errors** (returned in ToolResult.Error):

  • Tool not found in registry
  • Tool not in allowlist
  • Tool execution failed
  • Timeout exceeded

**System errors** (returned as Go error):

  • JSON marshaling failures
  • NATS publishing failures

Tool execution errors don't fail the loop - the agent can handle them:

if result.Error != "" {
    // Agent sees: "Error: file not found"
    // Agent can try alternative approach
}

Concurrent Execution

The processor handles concurrent tool calls safely:

  • ExecutorRegistry uses RWMutex for thread-safe access
  • Each tool call gets its own goroutine
  • Context cancellation propagates to all active executions

Multiple tools can execute in parallel when the loop sends concurrent calls:

// agentic-loop sends two tool calls
tool.execute.read_file  → executes
tool.execute.query_db   → executes concurrently
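
A minimal sketch of the goroutine-per-call model, assuming a sync.WaitGroup to wait for completion. The `runAll` helper is hypothetical; the processor's own dispatch code may differ.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runAll executes every call in its own goroutine and waits for all of them.
// Each goroutine writes to a distinct slice index, so no lock is needed.
func runAll(names []string, exec func(name string) string) []string {
	results := make([]string, len(names))
	var wg sync.WaitGroup
	for i, name := range names {
		wg.Add(1)
		go func(i int, name string) {
			defer wg.Done()
			results[i] = exec(name)
		}(i, name)
	}
	wg.Wait()
	return results
}

func main() {
	start := time.Now()
	res := runAll([]string{"read_file", "query_db"}, func(name string) string {
		time.Sleep(50 * time.Millisecond) // simulated work
		return name + " done"
	})
	// Elapsed time stays close to one sleep, not two, because the calls overlap.
	fmt.Println(res, "elapsed:", time.Since(start).Round(10*time.Millisecond))
}
```

The exec function is shared across goroutines, which is why executors must not keep unsynchronized mutable state.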

Built-in Tools

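The processor registers no tools on its own: every tool, including the builtins provided through executors.RegisterBuiltins, must be registered by the embedding application at boot. This keeps the processor focused and gives the application full control over available capabilities.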

Common tools to implement:

  • File operations: read_file, write_file, list_dir
  • Web operations: fetch_url, call_api
  • Database operations: query, insert, update
  • Graph operations: graph_query (query knowledge graph)

Thread Safety

The Component is safe for concurrent use after Start() is called:

  • ExecutorRegistry uses RWMutex for all operations
  • Tool registration should complete before Start()
  • Multiple tool calls can execute concurrently

Testing

For testing, use mock executors and unique consumer names:

type MockExecutor struct {
    result agentic.ToolResult
}

func (m *MockExecutor) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
    m.result.CallID = call.ID
    return m.result, nil
}

func (m *MockExecutor) ListTools() []agentic.ToolDefinition {
    return []agentic.ToolDefinition{{Name: "mock_tool"}}
}

// In test
config := agentictools.Config{
    StreamName:         "AGENT",
    ConsumerNameSuffix: "test-" + t.Name(),
}

Limitations

Current limitations:

  • No tool versioning (single version per name)
  • No tool dependencies or ordering
  • No streaming tool output
  • No built-in rate limiting per tool
  • A single timeout applies to every execution; it cannot be configured per tool

See Also

Related packages:

  • agentic: Shared types (ToolCall, ToolResult, ToolDefinition)
  • processor/agentic-loop: Loop orchestration
  • processor/agentic-model: LLM endpoint integration

JetStream Integration

All messaging uses JetStream for durability. Tool call subjects require the AGENT stream to exist with subjects matching tool.execute.> and tool.result.>.

Consumer naming follows the pattern: agentic-tools-{subject-pattern}

Package agentictools provides Prometheus metrics for agentic-tools component.

Constants

const ApprovalRequiredPrefix = agentic.ApprovalRequiredPrefix

ApprovalRequiredPrefix is an alias for agentic.ApprovalRequiredPrefix retained so existing call sites in this package compile unchanged. The canonical declaration lives in the agentic package because both agentic-tools (filter producer) and agentic-loop (filter consumer) need to reference it; keeping it here would force agentic-loop to import a sibling component.

const ComponentCatalogToolName = "list_components"

ComponentCatalogToolName is the tool name agents use to invoke list_components.

const DecideToolName = "decide"

DecideToolName is the name agents use to invoke the coordinator's terminal decision tool.

const EmitDiagnosisToolName = "emit_diagnosis"

EmitDiagnosisToolName is the name agents use to invoke the ops agent's diagnosis emission tool.

const FlowMonitorToolName = "monitor_flow"

FlowMonitorToolName is the tool name agents use to invoke monitor_flow.

const ReadLoopResultToolName = "read_loop_result"

ReadLoopResultToolName is the name agents use to invoke the read_loop_result tool.

Functions

func ConsumerNameForTool

func ConsumerNameForTool(toolName string) string

ConsumerNameForTool generates a JetStream consumer name for a tool. It replaces dots and underscores with dashes and adds the "tool-exec-" prefix.

Examples:

file_read → tool-exec-file-read
graph.query → tool-exec-graph-query
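
The documented behaviour can be reproduced in a few lines. The `consumerName` function below is an illustrative re-implementation based only on the description and examples above, not the package's source.

```go
package main

import (
	"fmt"
	"strings"
)

// consumerName mirrors the documented rule: dots and underscores become
// dashes, and "tool-exec-" is prepended. Illustrative only.
func consumerName(toolName string) string {
	return "tool-exec-" + strings.NewReplacer(".", "-", "_", "-").Replace(toolName)
}

func main() {
	fmt.Println(consumerName("file_read"))   // tool-exec-file-read
	fmt.Println(consumerName("graph.query")) // tool-exec-graph-query
}
```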

func IsApprovalRequired

func IsApprovalRequired(reason string) bool

IsApprovalRequired delegates to agentic.IsApprovalRequired so this package's existing test suite keeps working without churn.

func NewComponent

func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewComponent creates a new agentic-tools processor component

func ReadOnlyCategories

func ReadOnlyCategories() map[ToolCategory]bool

ReadOnlyCategories returns the set of categories safe for explore sub-agents (no write operations, no orchestration, no meta-programming).

func Register

func Register(registry RegistryInterface) error

Register registers the agentic-tools processor component with the given registry

func RegisterToolCategory

func RegisterToolCategory(toolName string, category ToolCategory)

RegisterToolCategory registers a category for a tool name. This allows executor implementations to declare their category at registration time.

Types

type ApprovalFilter

type ApprovalFilter struct {
	// contains filtered or unexported fields
}

ApprovalFilter implements agentic.ToolCallFilter. It checks each tool call against a configured list of tool names that require human approval. If a tool is in the list, the call is rejected with an ApprovalRequiredPrefix reason so the loop can transition to awaiting_approval.

func NewApprovalFilter

func NewApprovalFilter(approvalRequired []string) *ApprovalFilter

NewApprovalFilter creates a filter from the configured list of tool names that require approval.

func (*ApprovalFilter) FilterToolCalls

func (f *ApprovalFilter) FilterToolCalls(_ string, calls []agentic.ToolCall) (agentic.ToolCallFilterResult, error)

FilterToolCalls checks each call against the configured approval list. A call carrying a non-empty ApprovedBy is the explicit bypass token: the loop only sets it when re-dispatching after a human ApprovalResponse arrived, so we trust it and pass the call through regardless of the gating list. Empty ApprovedBy means the call has not been through approval and the gating list applies normally.
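
The gating rule and the ApprovedBy bypass can be sketched as follows. All names are hypothetical: the prefix value is a placeholder for agentic.ApprovalRequiredPrefix, the return shape is simpler than agentic.ToolCallFilterResult, and checking the prefix with strings.HasPrefix is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// approvalPrefix is a placeholder; the real value is agentic.ApprovalRequiredPrefix.
const approvalPrefix = "approval_required:"

// toolCall is a stand-in carrying only the fields the filter reads.
type toolCall struct {
	ID, Name, ApprovedBy string
}

// filterCalls splits calls into approved ones and a map of call ID to rejection reason.
func filterCalls(gated map[string]bool, calls []toolCall) (approved []toolCall, rejected map[string]string) {
	rejected = make(map[string]string)
	for _, c := range calls {
		if c.ApprovedBy == "" && gated[c.Name] {
			rejected[c.ID] = approvalPrefix + " " + c.Name
			continue
		}
		approved = append(approved, c) // ungated, or carries the bypass token
	}
	return approved, rejected
}

// isApprovalRequired reports whether a rejection reason asks for human approval.
func isApprovalRequired(reason string) bool {
	return strings.HasPrefix(reason, approvalPrefix)
}

func main() {
	gated := map[string]bool{"bash": true}
	approved, rejected := filterCalls(gated, []toolCall{
		{ID: "c1", Name: "read_file"},
		{ID: "c2", Name: "bash"},
		{ID: "c3", Name: "bash", ApprovedBy: "alice"},
	})
	fmt.Println(len(approved), rejected, isApprovalRequired(rejected["c2"]))
}
```

The third call passes even though bash is gated, because a non-empty ApprovedBy marks it as already approved.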

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the agentic-tools processor

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Component) Execute

func (c *Component) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)

Execute executes a tool call (for testing and direct invocation)

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns the current health status

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize prepares the component (no-op for this component)

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns configured input port definitions

func (*Component) ListTools

func (c *Component) ListTools() []ToolDefinition

ListTools returns all tool definitions for discovery. Combines tools from both the component's local registry and the shared (deps-injected) registry. Local entries override shared entries with the same name.

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns configured output port definitions

func (*Component) RegisterToolExecutor

func (c *Component) RegisterToolExecutor(executor ToolExecutor) error

RegisterToolExecutor registers a tool executor with the component. This method extracts all tools from the executor and registers them individually.

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins processing tool calls

func (*Component) Stop

func (c *Component) Stop(timeout time.Duration) error

Stop gracefully stops the component within the given timeout

type ComponentCatalogExecutor

type ComponentCatalogExecutor struct {
	// contains filtered or unexported fields
}

ComponentCatalogExecutor introspects the component factory registry and returns available component types. The optional type_filter arg narrows the response to a single entry, keeping LLM context pressure low for large registries (30+ factory types produce substantial JSON).

func NewComponentCatalogExecutor

func NewComponentCatalogExecutor(registry *component.Registry, logger *slog.Logger) *ComponentCatalogExecutor

NewComponentCatalogExecutor constructs the executor. registry must be non-nil; callers that can't provide one should skip registration entirely (see executors.registerComponentCatalog).

func (*ComponentCatalogExecutor) Execute

Execute routes the tool call to the handler.

func (*ComponentCatalogExecutor) ListTools

ListTools describes the list_components tool.

type Config

type Config struct {
	Ports                *component.PortConfig `json:"ports"                schema:"type:ports,description:Port configuration,category:basic"`
	StreamName           string                `` /* 132-byte string literal not displayed */
	ConsumerNameSuffix   string                `` /* 127-byte string literal not displayed */
	DeleteConsumerOnStop bool                  `` /* 157-byte string literal not displayed */
	Timeout              string                `json:"timeout"              schema:"type:string,description:Tool execution timeout,category:advanced,default:60s"`
	AllowedTools         []string              `json:"allowed_tools"        schema:"type:array,description:List of allowed tools (nil/empty allows all),category:advanced"`
	ApprovalRequired     []string              `` /* 137-byte string literal not displayed */
	EnableCategories     bool                  `` /* 150-byte string literal not displayed */
	LoopsBucket          string                `` /* 168-byte string literal not displayed */

	// ToolRetries is an opt-in per-tool retry policy. Tools not listed run
	// without retries. Use this for tools where transient failures (timeout,
	// external 5xx) are worth auto-retrying at the framework layer instead
	// of burning LLM iteration budget. Validation-shaped errors
	// (invalid_args, not_found) deliberately do NOT retry here by default —
	// those need LLM feedback via the agent's iteration loop.
	ToolRetries map[string]RetryPolicy `` /* 167-byte string literal not displayed */
}

Config holds configuration for agentic-tools processor component

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default configuration for agentic-tools processor

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the configuration for errors

type DecideExecutor

type DecideExecutor struct {
	// contains filtered or unexported fields
}

DecideExecutor is the coordinator's terminal tool. A coordinator agent calls decide() exactly once to signal its judgment; on success, the tool publishes a small set of metadata triples onto the coordinator's loop entity so downstream rules match deterministically, and returns StopLoop=true with the full decision payload in Content so downstream agents can fetch any bulky fields (subtopics list, retry hint) via read_loop_result without them riding in triples.

func NewDecideExecutor

func NewDecideExecutor(publisher TriplePublisher, platform types.PlatformMeta) *DecideExecutor

NewDecideExecutor constructs the executor given a triple publisher and the platform identity used to build the coordinator's loop entity ID.

func (*DecideExecutor) Execute

Execute routes the tool call to decide; any other tool name is treated as a routing bug.

func (*DecideExecutor) ListTools

func (e *DecideExecutor) ListTools() []agentic.ToolDefinition

ListTools describes the decide tool. The action string is NOT enumerated at the tool level — different flows want different terminal actions. The coordinator's system prompt enumerates valid values per flow; downstream rules match on specific action values via the CoordinatorNextAction predicate.

type EmitDiagnosisExecutor

type EmitDiagnosisExecutor struct {
	// contains filtered or unexported fields
}

EmitDiagnosisExecutor is the ops agent's finding emission tool. Each call mints a new {org}.{platform}.ops.diagnosis.finding.{uuid} entity and publishes one triple per predicate plus an agent.action.executed_by back-link to the ops loop entity. StopLoop is false so the agent can emit multiple findings per loop before calling submit_work.

func NewEmitDiagnosisExecutor

func NewEmitDiagnosisExecutor(publisher TriplePublisher, platform types.PlatformMeta, logger *slog.Logger) *EmitDiagnosisExecutor

NewEmitDiagnosisExecutor constructs the executor given a triple publisher, the platform identity used to build entity IDs, and a logger for instrumentation.

func (*EmitDiagnosisExecutor) Execute

Execute routes the tool call to emitDiagnosis; any other name is a routing bug.

func (*EmitDiagnosisExecutor) ListTools

func (e *EmitDiagnosisExecutor) ListTools() []agentic.ToolDefinition

ListTools describes the emit_diagnosis tool schema. The severity enum is enforced during execution (not just in the schema) because small models sometimes emit free-text severity values; the executor clamps invalid values to "info" rather than rejecting the call outright — see Execute.

type ExecutorRegistry

type ExecutorRegistry struct {
	// contains filtered or unexported fields
}

ExecutorRegistry manages tool executors and provides thread-safe registration and execution

Example

ExampleExecutorRegistry shows how a downstream binary registers a custom tool executor against the shared tool registry. The binary builds the registry once at boot, registers builtins via executors.RegisterBuiltins (omitted for brevity), then layers any custom tools on top, and plumbs the registry into component.Dependencies.ToolRegistry.

package main

import (
	"context"
	"fmt"

	"github.com/c360studio/semstreams/agentic"
	agentictools "github.com/c360studio/semstreams/processor/agentic-tools"
)

// myExecutor is a minimal tool implementation used in the example.
type myExecutor struct{}

func (myExecutor) ListTools() []agentic.ToolDefinition {
	return []agentic.ToolDefinition{{
		Name:        "my_tool",
		Description: "example tool",
		Parameters:  map[string]any{"type": "object", "properties": map[string]any{}},
	}}
}

func (myExecutor) Execute(_ context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
	return agentic.ToolResult{CallID: call.ID, Content: "executed"}, nil
}

// ExampleExecutorRegistry shows how a downstream binary registers a custom
// tool executor against the shared tool registry. The binary builds the
// registry once at boot, registers builtins via executors.RegisterBuiltins
// (omitted for brevity), then layers any custom tools on top, and plumbs
// the registry into component.Dependencies.ToolRegistry.
func main() {
	reg := agentictools.NewExecutorRegistry()

	// Register a custom tool. Duplicate names error — register once.
	if err := reg.RegisterTool("my_tool", myExecutor{}); err != nil {
		panic(err)
	}

	// In production, the registry is plumbed via component.Dependencies:
	//
	//   deps := component.Dependencies{ToolRegistry: reg}
	//
	// For this example we just call Execute directly.
	res, err := reg.Execute(context.Background(), agentic.ToolCall{
		ID:   "call-1",
		Name: "my_tool",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(res.Content)
}
Output:
executed

func NewExecutorRegistry

func NewExecutorRegistry() *ExecutorRegistry

NewExecutorRegistry creates a new empty executor registry

func (*ExecutorRegistry) Execute

Execute executes a tool call using the registered executor.

On a miss, Execute returns a populated ToolResult (so callers can publish it as an error response without further work) plus a wrapped agentic.ErrToolNotFound. Callers detect the miss via errors.Is(err, agentic.ErrToolNotFound) instead of parsing error strings. This replaces the string-match dispatch fallback that lived in component.go before the registry refactor.

func (*ExecutorRegistry) GetTool

func (r *ExecutorRegistry) GetTool(name string) ToolExecutor

GetTool retrieves a tool executor by name. Returns nil if the tool is not registered.

func (*ExecutorRegistry) ListTools

func (r *ExecutorRegistry) ListTools() []agentic.ToolDefinition

ListTools returns all registered tool definitions. Returns an empty slice (not nil) when no tools are registered.

func (*ExecutorRegistry) RegisterTool

func (r *ExecutorRegistry) RegisterTool(name string, executor ToolExecutor) error

RegisterTool registers a tool executor with the given name. Returns an error if a tool with the same name is already registered.

type FlowMonitorExecutor

type FlowMonitorExecutor struct {
	// contains filtered or unexported fields
}

FlowMonitorExecutor aggregates completed-loop data for a given flow. It scans the AGENT_LOOPS KV bucket for COMPLETE_* keys, filters by WorkflowSlug, and returns aggregate counts plus a recency-capped list.

func NewFlowMonitorExecutor

func NewFlowMonitorExecutor(kv LoopKVScanner, flows FlowStateReader, logger *slog.Logger) *FlowMonitorExecutor

NewFlowMonitorExecutor constructs the executor with its KV handle and flow state reader. Both must be non-nil; callers that can't satisfy either should skip registration (see executors.registerFlowMonitor).

func (*FlowMonitorExecutor) Execute

Execute routes the tool call to the handler.

func (*FlowMonitorExecutor) ListTools

func (e *FlowMonitorExecutor) ListTools() []agentic.ToolDefinition

ListTools describes the monitor_flow tool.

type FlowState

type FlowState struct {
	RuntimeState string
}

FlowState is the minimal projection of flowstore.Flow that monitor_flow needs. Returned by FlowStateReader.

type FlowStateReader

type FlowStateReader interface {
	Get(ctx context.Context, id string) (FlowState, error)
}

FlowStateReader is the minimal flowstore surface needed to read runtime_state. FlowManager (declared in executors package) already satisfies this — we use a separate interface here to keep agentic-tools free of a direct dependency on the executors package.

type InMemoryToolCallStore

type InMemoryToolCallStore struct {
	// contains filtered or unexported fields
}

InMemoryToolCallStore provides an in-memory implementation of ToolCallStore for testing purposes.

func NewInMemoryToolCallStore

func NewInMemoryToolCallStore() *InMemoryToolCallStore

NewInMemoryToolCallStore creates a new in-memory store for testing.

func (*InMemoryToolCallStore) Close

func (s *InMemoryToolCallStore) Close() error

Close marks the store as closed.

func (*InMemoryToolCallStore) Records

func (s *InMemoryToolCallStore) Records() []ToolCallRecord

Records returns all stored records. For testing only.

func (*InMemoryToolCallStore) Reset

func (s *InMemoryToolCallStore) Reset()

Reset clears all records and reopens the store. For testing only.

func (*InMemoryToolCallStore) Store

Store adds a record to the in-memory store.

type KVToolCallStore

type KVToolCallStore struct {
	// contains filtered or unexported fields
}

KVToolCallStore implements ToolCallStore using NATS KV. It supports lazy initialization with retry logic and can be reset to allow re-initialization after failures.

func NewKVToolCallStore

func NewKVToolCallStore(client *natsclient.Client, bucketName string) *KVToolCallStore

NewKVToolCallStore creates a new KV-backed tool call store. Initialization is deferred until the first Store call.

func (*KVToolCallStore) Close

func (s *KVToolCallStore) Close() error

Close marks the store as closed. The underlying KV connection is managed by the natsclient and should not be closed here.

func (*KVToolCallStore) Reset

func (s *KVToolCallStore) Reset()

Reset allows re-initialization after a failure. This is useful for testing or recovery scenarios.

func (*KVToolCallStore) Store

func (s *KVToolCallStore) Store(ctx context.Context, record ToolCallRecord) error

Store persists a tool call record to the KV store. The key format is "{call_id}.{timestamp_ns}" to allow multiple records per call ID (e.g., retries).

type LoopKVScanner

type LoopKVScanner interface {
	// Keys returns all current keys in the bucket (deleted keys excluded).
	Keys(ctx context.Context) ([]string, error)
	// Get fetches a single KV entry by exact key.
	Get(ctx context.Context, key string) (*natsclient.KVEntry, error)
}

LoopKVScanner is the minimal KV surface monitor_flow needs: list keys and fetch individual entries. *natsclient.KVStore satisfies this interface by duck-typing. Declared here so unit tests can inject an in-memory fake without pulling in the NATS stack.

type LoopsKVReader

type LoopsKVReader interface {
	Get(ctx context.Context, key string) (*natsclient.KVEntry, error)
}

LoopsKVReader is the minimal surface this executor consumes. It matches natsclient.KVStore.Get exactly, so production callers pass a *KVStore directly and tests supply an in-memory fake. Keeping the interface tiny and typed on natsclient.KVEntry avoids re-adapting jetstream types here — natsclient already owns that translation (see natsclient/kv.go).

type ReadLoopResultExecutor

type ReadLoopResultExecutor struct {
	// contains filtered or unexported fields
}

ReadLoopResultExecutor fetches a completed loop's full Result from the AGENT_LOOPS KV bucket so downstream agents can read another loop's output without having it injected into their prompt. Supports paging (max_bytes, offset) so small-context-window agents can consume large outputs a slice at a time.
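The paging idea can be illustrated with a small slice helper. This is only a sketch: page is a hypothetical function, and treating a non-positive max_bytes as "no limit" is an assumption, not documented behaviour of read_loop_result.

```go
package main

import "fmt"

// page returns the slice of result that starts at offset and is at most
// maxBytes long, the offset to request next, and whether more data
// remains. A non-positive maxBytes is treated as "no limit" (assumption).
// Slicing is byte-based, so a multi-byte UTF-8 rune may be split.
func page(result string, offset, maxBytes int) (chunk string, next int, more bool) {
	if offset < 0 {
		offset = 0
	}
	if offset > len(result) {
		offset = len(result)
	}
	end := len(result)
	if maxBytes > 0 && offset+maxBytes < end {
		end = offset + maxBytes
	}
	return result[offset:end], end, end < len(result)
}

func main() {
	result := "abcdefghij"
	offset := 0
	for {
		chunk, next, more := page(result, offset, 4)
		fmt.Printf("%q next=%d more=%v\n", chunk, next, more)
		if !more {
			break
		}
		offset = next
	}
}
```

An agent with a small context window would call the tool repeatedly, feeding the returned offset back in until no more data remains.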

func NewReadLoopResultExecutor

func NewReadLoopResultExecutor(kv LoopsKVReader) *ReadLoopResultExecutor

NewReadLoopResultExecutor constructs the executor against a KV reader scoped to the loops bucket (AGENT_LOOPS by default).

func (*ReadLoopResultExecutor) Execute

Execute routes the tool call to the handler.

func (*ReadLoopResultExecutor) ListTools

ListTools describes the read_loop_result tool.

type RecordingExecutor

type RecordingExecutor struct {
	// contains filtered or unexported fields
}

RecordingExecutor wraps a ToolExecutor and records all calls to a store. It uses a buffered channel and background goroutine for non-blocking recording.

func NewRecordingExecutor

func NewRecordingExecutor(wrapped ToolExecutor, store ToolCallStore, logger *slog.Logger) *RecordingExecutor

NewRecordingExecutor creates a new recording executor that wraps the given executor and records all calls to the provided store.

func (*RecordingExecutor) Execute

Execute runs the wrapped tool and records the call.

func (*RecordingExecutor) ListTools

func (r *RecordingExecutor) ListTools() []agentic.ToolDefinition

ListTools returns tools from the wrapped executor

func (*RecordingExecutor) Stop

func (r *RecordingExecutor) Stop(timeout time.Duration) error

Stop gracefully shuts down the recording executor, waiting for pending records to be stored up to the specified timeout.

type RegistryInterface

type RegistryInterface interface {
	RegisterWithConfig(component.RegistrationConfig) error
}

RegistryInterface defines the minimal interface needed for registration

type RetryPolicy

type RetryPolicy struct {
	// MaxAttempts is the total number of tries including the first call.
	// 1 means no retry. Values below 1 are clamped to 1.
	MaxAttempts int `json:"max_attempts" schema:"type:int,description:Total attempts including the first call (1 = no retry),default:1"`

	// BackoffInitialMs is the wait before the second attempt, in
	// milliseconds. Subsequent attempts use exponential backoff
	// (initial * 2^(attempt-1)) capped at BackoffMaxMs.
	BackoffInitialMs int `json:"backoff_initial_ms,omitempty" schema:"type:int,description:Initial backoff before retry in milliseconds,default:100"`

	// BackoffMaxMs caps the per-attempt backoff.
	BackoffMaxMs int `json:"backoff_max_ms,omitempty" schema:"type:int,description:Maximum backoff between retries in milliseconds,default:2000"`

	// RetryOnKinds names the ToolErrorKind values that should trigger
	// retry. Defaults to ["timeout", "external", "network"] when unset. Pass an
	// explicit empty list to retry only on raw executor errors (no
	// tool-level error kind considered).
	RetryOnKinds []string `` /* 160-byte string literal not displayed */
}

RetryPolicy controls how a single tool's transient failures are retried inside executeWithTimeout. Zero/empty values are replaced with defaults by the runtime; an all-zero policy still means "no retry" because MaxAttempts defaults to 1.

type ToolCallRecord

type ToolCallRecord struct {
	Call      agentic.ToolCall   `json:"call"`
	Result    agentic.ToolResult `json:"result"`
	StartTime time.Time          `json:"start_time"`
	EndTime   time.Time          `json:"end_time"`
	Duration  time.Duration      `json:"duration"`
}

ToolCallRecord captures a tool call and its result for auditing

type ToolCallStore

type ToolCallStore interface {
	Store(ctx context.Context, record ToolCallRecord) error
	Close() error
}

ToolCallStore is the interface for persisting tool call records.

type ToolCategory

type ToolCategory string

ToolCategory groups tools by their primary function for token optimization and role-based filtering. Inspired by semdragon's category system.

const (
	// CategoryCore contains essential tools available to all agents (submit_work, etc.)
	CategoryCore ToolCategory = "core"

	// CategoryKnowledge contains graph query and search tools.
	CategoryKnowledge ToolCategory = "knowledge"

	// CategoryNetwork contains web search, HTTP request, and external API tools.
	CategoryNetwork ToolCategory = "network"

	// CategoryInspect contains code execution tools (bash, sandbox).
	CategoryInspect ToolCategory = "inspect"

	// CategoryOrchestration contains tools for spawning sub-agents and decomposing tasks.
	CategoryOrchestration ToolCategory = "orchestration"

	// CategoryMeta contains tools for self-programming (create_rule, manage_flow).
	CategoryMeta ToolCategory = "meta"
)

func GetToolCategory

func GetToolCategory(toolName string) ToolCategory

GetToolCategory returns the category for a tool name. Returns CategoryCore as default for unregistered tools.
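The default-to-core lookup can be pictured as a map with a fallback. The sketch below uses local stand-in types, and the tool-to-category table is an assumption assembled from the tool names mentioned in the constant comments. The real mapping lives inside the package and may differ.

```go
package main

import "fmt"

// toolCategory mirrors the shape of ToolCategory for this sketch.
type toolCategory string

const (
	categoryCore    toolCategory = "core"
	categoryInspect toolCategory = "inspect"
	categoryMeta    toolCategory = "meta"
)

// categories is a hypothetical table; the real package may map these
// tool names differently.
var categories = map[string]toolCategory{
	"submit_work": categoryCore,
	"bash":        categoryInspect,
	"create_rule": categoryMeta,
	"manage_flow": categoryMeta,
}

// categoryFor returns the mapped category and falls back to core for
// names that are not in the table.
func categoryFor(name string) toolCategory {
	if c, ok := categories[name]; ok {
		return c
	}
	return categoryCore
}

func main() {
	fmt.Println(categoryFor("manage_flow"))
	fmt.Println(categoryFor("never_registered"))
}
```

One consequence of a core default is that a custom tool with no category is visible to every agent, so role-based filtering only restricts tools that have been categorized explicitly.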

type ToolDefinition

type ToolDefinition struct {
	Name        string `json:"name"`
	Description string `json:"description"`
	Provider    string `json:"provider"`
	Available   bool   `json:"available"`
}

ToolDefinition represents a tool definition for discovery responses

type ToolExecutor

type ToolExecutor interface {
	Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
	ListTools() []agentic.ToolDefinition
}

ToolExecutor defines the interface for tool executors

type ToolListResponse

type ToolListResponse struct {
	Tools []ToolDefinition `json:"tools"`
}

ToolListResponse represents the response to a tool.list request

type TriplePublisher

type TriplePublisher interface {
	AddTriple(ctx context.Context, triple message.Triple) error
}

TriplePublisher is the narrow surface DecideExecutor uses to write triples. Production satisfies it with a natsclient.Client adapter; tests use an in-memory recorder so they don't need a real NATS connection.

func NewNATSTriplePublisher

func NewNATSTriplePublisher(client *natsclient.Client) TriplePublisher

NewNATSTriplePublisher builds a TriplePublisher backed by the shared graph.mutation.triple.add NATS surface.

Directories

Path Synopsis
Package executors provides tool executor implementations for the agentic system.
Package sandbox provides an HTTP client for the sandbox server.
