Documentation
¶
Overview ¶
Package agentictools provides a tool executor processor component that routes tool calls to registered tool executors with filtering and timeout support.
Package agentictools provides the tool execution processor for the SemStreams agentic system.
Overview ¶
The agentic-tools processor executes tool calls from the agentic loop orchestrator. It receives ToolCall messages, dispatches them to registered tool executors, and publishes ToolResult messages back. The processor supports tool registration, allowlist filtering, per-execution timeouts, and concurrent execution.
This processor enables agents to interact with external systems (files, APIs, databases, etc.) through a well-defined tool interface.
Architecture ¶
The tools processor sits between the loop orchestrator and tool implementations:
┌───────────────┐ ┌────────────────┐ ┌──────────────────┐ │ agentic-loop │────▶│ agentic-tools │────▶│ Tool Executors │ │ │ │ (this pkg) │ │ (your code) │ │ │◀────│ │◀────│ │ └───────────────┘ └────────────────┘ └──────────────────┘ tool.execute.* Execute() read_file, query_db, tool.result.* call_api, etc.
ToolExecutor Interface ¶
Tools are implemented by satisfying the ToolExecutor interface:
type ToolExecutor interface {
Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
ListTools() []agentic.ToolDefinition
}
Example implementation:
type FileReader struct{}
func (f *FileReader) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
path, _ := call.Arguments["path"].(string)
// Respect context cancellation
select {
case <-ctx.Done():
return agentic.ToolResult{CallID: call.ID, Error: "cancelled"}, ctx.Err()
default:
}
content, err := os.ReadFile(path)
if err != nil {
return agentic.ToolResult{
CallID: call.ID,
Error: err.Error(),
}, nil
}
return agentic.ToolResult{
CallID: call.ID,
Content: string(content),
}, nil
}
func (f *FileReader) ListTools() []agentic.ToolDefinition {
return []agentic.ToolDefinition{
{
Name: "read_file",
Description: "Read the contents of a file",
Parameters: map[string]any{
"type": "object",
"properties": map[string]any{
"path": map[string]any{
"type": "string",
"description": "Path to the file",
},
},
"required": []string{"path"},
},
},
}
}
Tool Registration ¶
Tools can be registered in two ways:
1. Global registration via init() (preferred for reusable tools):
func init() {
agentictools.RegisterTool("my_tool", &MyToolExecutor{})
}
Global registration makes tools available to all agentic-tools components automatically. This pattern matches how components and rules are registered.
2. Per-component registration (for component-specific tools):
comp, _ := agentictools.NewComponent(rawConfig, deps)
toolsComp := comp.(*agentictools.Component)
err := toolsComp.RegisterToolExecutor(&FileReader{})
The processor extracts tool names from ListTools() for routing and validation.
ExecutorRegistry ¶
The ExecutorRegistry provides thread-safe tool management:
registry := NewExecutorRegistry()
// Register tools
registry.RegisterTool("read_file", &FileReader{})
registry.RegisterTool("query_db", &DatabaseQuerier{})
// Get executor by name
executor := registry.GetTool("read_file")
// List all available tools
tools := registry.ListTools()
// Execute a tool call
result, err := registry.Execute(ctx, toolCall)
The registry prevents duplicate registrations and returns descriptive errors for missing tools.
Tool Allowlist ¶
The processor supports allowlist filtering for security and control:
config := agentictools.Config{
AllowedTools: []string{"read_file", "list_dir"}, // Only these allowed
// ...
}
Behavior:
- Empty/nil AllowedTools: All registered tools are allowed
- Populated AllowedTools: Only listed tools can execute
- Blocked tools return an error result (not a Go error)
Example blocked response:
result := agentic.ToolResult{
CallID: "call_001",
Error: "tool 'delete_file' is not allowed",
}
Timeout Handling ¶
Each tool execution runs with a configurable timeout:
config := agentictools.Config{
Timeout: "60s", // Per-tool execution timeout
// ...
}
The timeout is enforced via context cancellation. Tool implementations should respect ctx.Done() for proper cancellation:
func (t *SlowTool) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
select {
case <-ctx.Done():
return agentic.ToolResult{CallID: call.ID, Error: "execution cancelled"}, ctx.Err()
case result := <-t.doWork(call):
return result, nil
}
}
Timeout errors are returned as ToolResult.Error, not as Go errors.
Quick Start ¶
Configure and start the processor:
config := agentictools.Config{
StreamName: "AGENT",
AllowedTools: nil, // Allow all
Timeout: "60s",
}
rawConfig, _ := json.Marshal(config)
comp, err := agentictools.NewComponent(rawConfig, deps)
// Register tools
toolsComp := comp.(*agentictools.Component)
toolsComp.RegisterToolExecutor(&FileReader{})
toolsComp.RegisterToolExecutor(&WebFetcher{})
// Start
lc := comp.(component.LifecycleComponent)
lc.Initialize()
lc.Start(ctx)
Configuration Reference ¶
Full configuration schema:
{
"allowed_tools": ["string", ...],
"timeout": "string (default: 60s)",
"stream_name": "string (default: AGENT)",
"consumer_name_suffix": "string (optional)",
"ports": {
"inputs": [...],
"outputs": [...]
}
}
Configuration fields:
- allowed_tools: List of tool names to allow (nil/empty = allow all)
- timeout: Per-tool execution timeout as duration string (default: "60s")
- stream_name: JetStream stream name for agentic messages (default: "AGENT")
- consumer_name_suffix: Optional suffix for JetStream consumer names (for testing)
- ports: Port configuration for inputs and outputs
Ports ¶
Input ports (JetStream consumers):
- tool.execute: Tool execution requests from agentic-loop (subject: tool.execute.>)
Output ports (JetStream publishers):
- tool.result: Tool execution results to agentic-loop (subject: tool.result.*)
Message Flow ¶
The processor handles each tool call through:
- Receive ToolCall from tool.execute.>
- Validate tool is in allowlist (if configured)
- Look up executor in registry
- Create timeout context
- Execute tool with context
- Publish ToolResult to tool.result.{call_id}
- Acknowledge JetStream message
Error Handling ¶
Errors are categorized into two types:
**Tool execution errors** (returned in ToolResult.Error):
- Tool not found in registry
- Tool not in allowlist
- Tool execution failed
- Timeout exceeded
**System errors** (returned as Go error):
- JSON marshaling failures
- NATS publishing failures
Tool execution errors don't fail the loop - the agent can handle them:
if result.Error != "" {
// Agent sees: "Error: file not found"
// Agent can try alternative approach
}
Concurrent Execution ¶
The processor handles concurrent tool calls safely:
- ExecutorRegistry uses RWMutex for thread-safe access
- Each tool call gets its own goroutine
- Context cancellation propagates to all active executions
Multiple tools can execute in parallel when the loop sends concurrent calls:
// agentic-loop sends two tool calls tool.execute.read_file → executes tool.execute.query_db → executes concurrently
Built-in Tools ¶
The package does not include built-in tools - all tools must be registered by the application. This keeps the processor focused and allows full control over available capabilities.
Common tools to implement:
- File operations: read_file, write_file, list_dir
- Web operations: fetch_url, call_api
- Database operations: query, insert, update
- Graph operations: graph_query (query knowledge graph)
Thread Safety ¶
The Component is safe for concurrent use after Start() is called:
- ExecutorRegistry uses RWMutex for all operations
- Tool registration should complete before Start()
- Multiple tool calls can execute concurrently
Testing ¶
For testing, use mock executors and unique consumer names:
type MockExecutor struct {
result agentic.ToolResult
}
func (m *MockExecutor) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error) {
m.result.CallID = call.ID
return m.result, nil
}
func (m *MockExecutor) ListTools() []agentic.ToolDefinition {
return []agentic.ToolDefinition{{Name: "mock_tool"}}
}
// In test
config := agentictools.Config{
StreamName: "AGENT",
ConsumerNameSuffix: "test-" + t.Name(),
}
Limitations ¶
Current limitations:
- No tool versioning (single version per name)
- No tool dependencies or ordering
- No streaming tool output
- No built-in rate limiting per tool
- Timeout is global, not per-tool configurable
See Also ¶
Related packages:
- agentic: Shared types (ToolCall, ToolResult, ToolDefinition)
- processor/agentic-loop: Loop orchestration
- processor/agentic-model: LLM endpoint integration
JetStream Integration ¶
All messaging uses JetStream for durability. Tool call subjects require the AGENT stream to exist with subjects matching tool.execute.> and tool.result.>.
Consumer naming follows the pattern: agentic-tools-{subject-pattern}
Package agentictools provides Prometheus metrics for agentic-tools component.
Index ¶
- Constants
- func ConsumerNameForTool(toolName string) string
- func IsApprovalRequired(reason string) bool
- func ListRegisteredTools() []agentic.ToolDefinition
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func ReadOnlyCategories() map[ToolCategory]bool
- func Register(registry RegistryInterface) error
- func RegisterTool(name string, executor ToolExecutor) error
- func RegisterToolCategory(toolName string, category ToolCategory)
- type ApprovalFilter
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) ListTools() []ToolDefinition
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) RegisterToolExecutor(executor ToolExecutor) error
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(timeout time.Duration) error
- type ComponentCatalogExecutor
- type Config
- type DecideExecutor
- type EmitDiagnosisExecutor
- type ExecutorRegistry
- func (r *ExecutorRegistry) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
- func (r *ExecutorRegistry) GetTool(name string) ToolExecutor
- func (r *ExecutorRegistry) ListTools() []agentic.ToolDefinition
- func (r *ExecutorRegistry) RegisterTool(name string, executor ToolExecutor) error
- type FlowMonitorExecutor
- type FlowState
- type FlowStateReader
- type InMemoryToolCallStore
- type KVToolCallStore
- type LoopKVScanner
- type LoopsKVReader
- type ReadLoopResultExecutor
- type RecordingExecutor
- type RegistryInterface
- type RetryPolicy
- type ToolCallRecord
- type ToolCallStore
- type ToolCategory
- type ToolDefinition
- type ToolExecutor
- type ToolListResponse
- type TriplePublisher
Constants ¶
const ApprovalRequiredPrefix = "approval_required: "
ApprovalRequiredPrefix is prepended to rejection reasons when a tool requires human approval. The agentic-loop detects this prefix and transitions the loop to LoopStateAwaitingApproval instead of storing a normal error result.
const ComponentCatalogToolName = "list_components"
ComponentCatalogToolName is the tool name agents use to invoke list_components.
const DecideToolName = "decide"
DecideToolName is the name agents use to invoke the coordinator's terminal decision tool.
const EmitDiagnosisToolName = "emit_diagnosis"
EmitDiagnosisToolName is the name agents use to invoke the ops agent's diagnosis emission tool.
const FlowMonitorToolName = "monitor_flow"
FlowMonitorToolName is the tool name agents use to invoke monitor_flow.
const ReadLoopResultToolName = "read_loop_result"
ReadLoopResultToolName is the name agents use to invoke the read_loop_result tool.
Variables ¶
This section is empty.
Functions ¶
func ConsumerNameForTool ¶
ConsumerNameForTool generates a JetStream consumer name for a tool. Sanitizes dots and underscores to dashes, adds "tool-exec-" prefix.
Examples:
file_read → tool-exec-file-read graph.query → tool-exec-graph-query
func IsApprovalRequired ¶
IsApprovalRequired checks whether a rejection reason indicates the tool needs human approval rather than being a genuine error.
func ListRegisteredTools ¶
func ListRegisteredTools() []agentic.ToolDefinition
ListRegisteredTools returns all globally registered tool definitions.
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new agentic-tools processor component
func ReadOnlyCategories ¶
func ReadOnlyCategories() map[ToolCategory]bool
ReadOnlyCategories returns the set of categories safe for explore sub-agents (no write operations, no orchestration, no meta-programming).
func Register ¶
func Register(registry RegistryInterface) error
Register registers the agentic-tools processor component with the given registry
func RegisterTool ¶
func RegisterTool(name string, executor ToolExecutor) error
RegisterTool registers a tool executor globally via init(). Thread-safe and can be called from any package's init() function.
Example usage:
func init() {
agentictools.RegisterTool("my_tool", &MyToolExecutor{})
}
func RegisterToolCategory ¶
func RegisterToolCategory(toolName string, category ToolCategory)
RegisterToolCategory registers a category for a tool name. This allows executor implementations to declare their category at registration time.
Types ¶
type ApprovalFilter ¶
type ApprovalFilter struct {
// contains filtered or unexported fields
}
ApprovalFilter implements agentic.ToolCallFilter. It checks each tool call against a configured list of tool names that require human approval. If a tool is in the list, the call is rejected with an ApprovalRequiredPrefix reason so the loop can transition to awaiting_approval.
func NewApprovalFilter ¶
func NewApprovalFilter(approvalRequired []string) *ApprovalFilter
NewApprovalFilter creates a filter from the configured list of tool names that require approval.
func (*ApprovalFilter) FilterToolCalls ¶
func (f *ApprovalFilter) FilterToolCalls(_ string, calls []agentic.ToolCall) (agentic.ToolCallFilterResult, error)
FilterToolCalls checks each call against the configured approval list.
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component implements the agentic-tools processor
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns the current health status
func (*Component) Initialize ¶
Initialize prepares the component (no-op for this component)
func (*Component) InputPorts ¶
InputPorts returns configured input port definitions
func (*Component) ListTools ¶
func (c *Component) ListTools() []ToolDefinition
ListTools returns all tool definitions for discovery. Combines tools from both the component's local registry and the global registry.
func (*Component) OutputPorts ¶
OutputPorts returns configured output port definitions
func (*Component) RegisterToolExecutor ¶
func (c *Component) RegisterToolExecutor(executor ToolExecutor) error
RegisterToolExecutor registers a tool executor with the component This method extracts all tools from the executor and registers them individually
type ComponentCatalogExecutor ¶
type ComponentCatalogExecutor struct {
// contains filtered or unexported fields
}
ComponentCatalogExecutor introspects the component factory registry and returns available component types. The optional type_filter arg narrows the response to a single entry, keeping LLM context pressure low for large registries (30+ factory types produce substantial JSON).
func NewComponentCatalogExecutor ¶
func NewComponentCatalogExecutor(registry *component.Registry, logger *slog.Logger) *ComponentCatalogExecutor
NewComponentCatalogExecutor constructs the executor. registry must be non-nil; callers that can't provide one should skip registration entirely (see executors.registerComponentCatalog).
func (*ComponentCatalogExecutor) Execute ¶
func (e *ComponentCatalogExecutor) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
Execute routes the tool call to the handler.
func (*ComponentCatalogExecutor) ListTools ¶
func (e *ComponentCatalogExecutor) ListTools() []agentic.ToolDefinition
ListTools describes the list_components tool.
type Config ¶
type Config struct {
Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
StreamName string `` /* 132-byte string literal not displayed */
ConsumerNameSuffix string `` /* 127-byte string literal not displayed */
DeleteConsumerOnStop bool `` /* 157-byte string literal not displayed */
Timeout string `json:"timeout" schema:"type:string,description:Tool execution timeout,category:advanced,default:60s"`
AllowedTools []string `json:"allowed_tools" schema:"type:array,description:List of allowed tools (nil/empty allows all),category:advanced"`
ApprovalRequired []string `` /* 137-byte string literal not displayed */
EnableCategories bool `` /* 150-byte string literal not displayed */
LoopsBucket string `` /* 168-byte string literal not displayed */
// ToolRetries is an opt-in per-tool retry policy. Tools not listed run
// without retries. Use this for tools where transient failures (timeout,
// external 5xx) are worth auto-retrying at the framework layer instead
// of burning LLM iteration budget. Validation-shaped errors
// (invalid_args, not_found) deliberately do NOT retry here by default —
// those need LLM feedback via the agent's iteration loop.
ToolRetries map[string]RetryPolicy `` /* 167-byte string literal not displayed */
}
Config holds configuration for agentic-tools processor component
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns default configuration for agentic-tools processor
type DecideExecutor ¶
type DecideExecutor struct {
// contains filtered or unexported fields
}
DecideExecutor is the coordinator's terminal tool. A coordinator agent calls decide() exactly once to signal its judgment; on success, the tool publishes a small set of metadata triples onto the coordinator's loop entity so downstream rules match deterministically, and returns StopLoop=true with the full decision payload in Content so downstream agents can fetch any bulky fields (subtopics list, retry hint) via read_loop_result without them riding in triples.
func NewDecideExecutor ¶
func NewDecideExecutor(publisher TriplePublisher, platform types.PlatformMeta) *DecideExecutor
NewDecideExecutor constructs the executor given a triple publisher and the platform identity used to build the coordinator's loop entity ID.
func (*DecideExecutor) Execute ¶
func (e *DecideExecutor) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
Execute routes the tool call to decide; any other tool name is treated as a routing bug.
func (*DecideExecutor) ListTools ¶
func (e *DecideExecutor) ListTools() []agentic.ToolDefinition
ListTools describes the decide tool. The action string is NOT enumerated at the tool level — different flows want different terminal actions. The coordinator's system prompt enumerates valid values per flow; downstream rules match on specific action values via the CoordinatorNextAction predicate.
type EmitDiagnosisExecutor ¶
type EmitDiagnosisExecutor struct {
// contains filtered or unexported fields
}
EmitDiagnosisExecutor is the ops agent's finding emission tool. Each call mints a new {org}.{platform}.ops.diagnosis.finding.{uuid} entity and publishes one triple per predicate plus an agent.action.executed_by back-link to the ops loop entity. StopLoop is false so the agent can emit multiple findings per loop before calling submit_work.
func NewEmitDiagnosisExecutor ¶
func NewEmitDiagnosisExecutor(publisher TriplePublisher, platform types.PlatformMeta, logger *slog.Logger) *EmitDiagnosisExecutor
NewEmitDiagnosisExecutor constructs the executor given a triple publisher, the platform identity used to build entity IDs, and a logger for instrumentation.
func (*EmitDiagnosisExecutor) Execute ¶
func (e *EmitDiagnosisExecutor) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
Execute routes the tool call to emitDiagnosis; any other name is a routing bug.
func (*EmitDiagnosisExecutor) ListTools ¶
func (e *EmitDiagnosisExecutor) ListTools() []agentic.ToolDefinition
ListTools describes the emit_diagnosis tool schema. The severity enum is enforced during execution (not just in the schema) because small models sometimes emit free-text severity values; the executor clamps invalid values to "info" rather than rejecting the call outright — see Execute.
type ExecutorRegistry ¶
type ExecutorRegistry struct {
// contains filtered or unexported fields
}
ExecutorRegistry manages tool executors and provides thread-safe registration and execution
func GetGlobalRegistry ¶
func GetGlobalRegistry() *ExecutorRegistry
GetGlobalRegistry returns the global tool registry. Used by agentic-tools component to access registered tools.
func NewExecutorRegistry ¶
func NewExecutorRegistry() *ExecutorRegistry
NewExecutorRegistry creates a new empty executor registry
func (*ExecutorRegistry) Execute ¶
func (r *ExecutorRegistry) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
Execute executes a tool call using the registered executor Returns an error result if the tool is not found or execution fails
func (*ExecutorRegistry) GetTool ¶
func (r *ExecutorRegistry) GetTool(name string) ToolExecutor
GetTool retrieves a tool executor by name Returns nil if the tool is not registered
func (*ExecutorRegistry) ListTools ¶
func (r *ExecutorRegistry) ListTools() []agentic.ToolDefinition
ListTools returns all registered tool definitions Returns an empty slice (not nil) when no tools are registered
func (*ExecutorRegistry) RegisterTool ¶
func (r *ExecutorRegistry) RegisterTool(name string, executor ToolExecutor) error
RegisterTool registers a tool executor with the given name Returns an error if a tool with the same name is already registered
type FlowMonitorExecutor ¶
type FlowMonitorExecutor struct {
// contains filtered or unexported fields
}
FlowMonitorExecutor aggregates completed-loop data for a given flow. It scans the AGENT_LOOPS KV bucket for COMPLETE_* keys, filters by WorkflowSlug, and returns aggregate counts plus a recency-capped list.
func NewFlowMonitorExecutor ¶
func NewFlowMonitorExecutor(kv LoopKVScanner, flows FlowStateReader, logger *slog.Logger) *FlowMonitorExecutor
NewFlowMonitorExecutor constructs the executor with its KV handle and flow state reader. Both must be non-nil; callers that can't satisfy either should skip registration (see executors.registerFlowMonitor).
func (*FlowMonitorExecutor) Execute ¶
func (e *FlowMonitorExecutor) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
Execute routes the tool call to the handler.
func (*FlowMonitorExecutor) ListTools ¶
func (e *FlowMonitorExecutor) ListTools() []agentic.ToolDefinition
ListTools describes the monitor_flow tool.
type FlowState ¶
type FlowState struct {
RuntimeState string
}
FlowState is the minimal projection of flowstore.Flow that monitor_flow needs. Returned by FlowStateReader.
type FlowStateReader ¶
FlowStateReader is the minimal flowstore surface needed to read runtime_state. FlowManager (declared in executors package) already satisfies this — we use a separate interface here to keep agentic-tools free of a direct dependency on the executors package.
type InMemoryToolCallStore ¶
type InMemoryToolCallStore struct {
// contains filtered or unexported fields
}
InMemoryToolCallStore provides an in-memory implementation of ToolCallStore for testing purposes.
func NewInMemoryToolCallStore ¶
func NewInMemoryToolCallStore() *InMemoryToolCallStore
NewInMemoryToolCallStore creates a new in-memory store for testing.
func (*InMemoryToolCallStore) Close ¶
func (s *InMemoryToolCallStore) Close() error
Close marks the store as closed.
func (*InMemoryToolCallStore) Records ¶
func (s *InMemoryToolCallStore) Records() []ToolCallRecord
Records returns all stored records. For testing only.
func (*InMemoryToolCallStore) Reset ¶
func (s *InMemoryToolCallStore) Reset()
Reset clears all records and reopens the store. For testing only.
func (*InMemoryToolCallStore) Store ¶
func (s *InMemoryToolCallStore) Store(_ context.Context, record ToolCallRecord) error
Store adds a record to the in-memory store.
type KVToolCallStore ¶
type KVToolCallStore struct {
// contains filtered or unexported fields
}
KVToolCallStore implements ToolCallStore using NATS KV. It supports lazy initialization with retry logic and can be reset to allow re-initialization after failures.
func NewKVToolCallStore ¶
func NewKVToolCallStore(client *natsclient.Client, bucketName string) *KVToolCallStore
NewKVToolCallStore creates a new KV-backed tool call store. Initialization is deferred until the first Store call.
func (*KVToolCallStore) Close ¶
func (s *KVToolCallStore) Close() error
Close marks the store as closed. The underlying KV connection is managed by the natsclient and should not be closed here.
func (*KVToolCallStore) Reset ¶
func (s *KVToolCallStore) Reset()
Reset allows re-initialization after a failure. This is useful for testing or recovery scenarios.
func (*KVToolCallStore) Store ¶
func (s *KVToolCallStore) Store(ctx context.Context, record ToolCallRecord) error
Store persists a tool call record to the KV store. The key format is "{call_id}.{timestamp_ns}" to allow multiple records per call ID (e.g., retries).
type LoopKVScanner ¶
type LoopKVScanner interface {
// Keys returns all current keys in the bucket (deleted keys excluded).
Keys(ctx context.Context) ([]string, error)
// Get fetches a single KV entry by exact key.
Get(ctx context.Context, key string) (*natsclient.KVEntry, error)
}
LoopKVScanner is the minimal KV surface monitor_flow needs: list keys and fetch individual entries. *natsclient.KVStore satisfies this interface by duck-typing. Declared here so unit tests can inject an in-memory fake without pulling in the NATS stack.
type LoopsKVReader ¶
LoopsKVReader is the minimal surface this executor consumes. It matches natsclient.KVStore.Get exactly, so production callers pass a *KVStore directly and tests supply an in-memory fake. Keeping the interface tiny and typed on natsclient.KVEntry avoids re-adapting jetstream types here — natsclient already owns that translation (see natsclient/kv.go).
type ReadLoopResultExecutor ¶
type ReadLoopResultExecutor struct {
// contains filtered or unexported fields
}
ReadLoopResultExecutor fetches a completed loop's full Result from the AGENT_LOOPS KV bucket so downstream agents can read another loop's output without having it injected into their prompt. Supports paging (max_bytes, offset) so small-context-window agents can consume large outputs a slice at a time.
func NewReadLoopResultExecutor ¶
func NewReadLoopResultExecutor(kv LoopsKVReader) *ReadLoopResultExecutor
NewReadLoopResultExecutor constructs the executor against a KV reader scoped to the loops bucket (AGENT_LOOPS by default).
func (*ReadLoopResultExecutor) Execute ¶
func (e *ReadLoopResultExecutor) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
Execute routes the tool call to the handler.
func (*ReadLoopResultExecutor) ListTools ¶
func (e *ReadLoopResultExecutor) ListTools() []agentic.ToolDefinition
ListTools describes the read_loop_result tool.
type RecordingExecutor ¶
type RecordingExecutor struct {
// contains filtered or unexported fields
}
RecordingExecutor wraps a ToolExecutor and records all calls to a store. It uses a buffered channel and background goroutine for non-blocking recording.
func NewRecordingExecutor ¶
func NewRecordingExecutor(wrapped ToolExecutor, store ToolCallStore, logger *slog.Logger) *RecordingExecutor
NewRecordingExecutor creates a new recording executor that wraps the given executor and records all calls to the provided store.
func (*RecordingExecutor) Execute ¶
func (r *RecordingExecutor) Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
Execute executes the wrapped tool and records the call
func (*RecordingExecutor) ListTools ¶
func (r *RecordingExecutor) ListTools() []agentic.ToolDefinition
ListTools returns tools from the wrapped executor
type RegistryInterface ¶
type RegistryInterface interface {
RegisterWithConfig(component.RegistrationConfig) error
}
RegistryInterface defines the minimal interface needed for registration
type RetryPolicy ¶
type RetryPolicy struct {
// MaxAttempts is the total number of tries including the first call.
// 1 means no retry. Values below 1 are clamped to 1.
MaxAttempts int `json:"max_attempts" schema:"type:int,description:Total attempts including the first call (1 = no retry),default:1"`
// BackoffInitialMs is the wait before the second attempt, in
// milliseconds. Subsequent attempts use exponential backoff
// (initial * 2^(attempt-1)) capped at BackoffMaxMs.
BackoffInitialMs int `json:"backoff_initial_ms,omitempty" schema:"type:int,description:Initial backoff before retry in milliseconds,default:100"`
// BackoffMaxMs caps the per-attempt backoff.
BackoffMaxMs int `json:"backoff_max_ms,omitempty" schema:"type:int,description:Maximum backoff between retries in milliseconds,default:2000"`
// RetryOnKinds names the ToolErrorKind values that should trigger
// retry. Defaults to ["timeout", "external", "network"] when unset. Pass an
// explicit empty list to retry only on raw executor errors (no
// tool-level error kind considered).
RetryOnKinds []string `` /* 160-byte string literal not displayed */
}
RetryPolicy controls how a single tool's transient failures are retried inside executeWithTimeout. Zero/empty values are replaced with defaults by the runtime; an all-zero policy still means "no retry" because MaxAttempts defaults to 1.
type ToolCallRecord ¶
type ToolCallRecord struct {
Call agentic.ToolCall `json:"call"`
Result agentic.ToolResult `json:"result"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Duration time.Duration `json:"duration"`
}
ToolCallRecord captures a tool call and its result for auditing
type ToolCallStore ¶
type ToolCallStore interface {
Store(ctx context.Context, record ToolCallRecord) error
Close() error
}
ToolCallStore interface for persisting tool call records
type ToolCategory ¶
type ToolCategory string
ToolCategory groups tools by their primary function for token optimization and role-based filtering. Inspired by semdragon's category system.
const ( // CategoryCore contains essential tools available to all agents (submit_work, etc.) CategoryCore ToolCategory = "core" // CategoryKnowledge contains graph query and search tools. CategoryKnowledge ToolCategory = "knowledge" // CategoryNetwork contains web search, HTTP request, and external API tools. CategoryNetwork ToolCategory = "network" // CategoryInspect contains code execution tools (bash, sandbox). CategoryInspect ToolCategory = "inspect" // CategoryOrchestration contains tools for spawning sub-agents and decomposing tasks. CategoryOrchestration ToolCategory = "orchestration" // CategoryMeta contains tools for self-programming (create_rule, manage_flow). CategoryMeta ToolCategory = "meta" )
func GetToolCategory ¶
func GetToolCategory(toolName string) ToolCategory
GetToolCategory returns the category for a tool name. Returns CategoryCore as default for unregistered tools.
type ToolDefinition ¶
type ToolDefinition struct {
Name string `json:"name"`
Description string `json:"description"`
Provider string `json:"provider"`
Available bool `json:"available"`
}
ToolDefinition represents a tool definition for discovery responses
type ToolExecutor ¶
type ToolExecutor interface {
Execute(ctx context.Context, call agentic.ToolCall) (agentic.ToolResult, error)
ListTools() []agentic.ToolDefinition
}
ToolExecutor defines the interface for tool executors
type ToolListResponse ¶
type ToolListResponse struct {
Tools []ToolDefinition `json:"tools"`
}
ToolListResponse represents the response to a tool.list request
type TriplePublisher ¶
TriplePublisher is the narrow surface DecideExecutor uses to write triples. Production satisfies it with a natsclient.Client adapter; tests use an in-memory recorder so they don't need a real NATS connection.
func NewNATSTriplePublisher ¶
func NewNATSTriplePublisher(client *natsclient.Client) TriplePublisher
NewNATSTriplePublisher builds a TriplePublisher backed by the shared graph.mutation.triple.add NATS surface.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package executors provides tool executor implementations for the agentic system.
|
Package executors provides tool executor implementations for the agentic system. |
|
Package sandbox provides an HTTP client for the sandbox server.
|
Package sandbox provides an HTTP client for the sandbox server. |