agents

package
v0.0.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2025 License: MIT Imports: 14 Imported by: 0

README

How Orchestrator works high level

sequenceDiagram
    participant Client
    participant Orchestrator
    participant Analyzer
    participant TaskParser
    participant PlanCreator
    participant Processor

    Client->>Orchestrator: Process(task, context)
    activate Orchestrator

    Note over Orchestrator,Analyzer: Phase 1: Task Analysis
    Orchestrator->>Analyzer: Analyze task breakdown
    activate Analyzer
    Analyzer-->>Orchestrator: Raw analysis output (XML format)
    deactivate Analyzer

    Note over Orchestrator,TaskParser: Phase 2: Task Parsing
    Orchestrator->>TaskParser: Parse(analyzerOutput)
    activate TaskParser
    TaskParser-->>Orchestrator: Structured Task objects
    deactivate TaskParser

    Note over Orchestrator,PlanCreator: Phase 3: Plan Creation
    Orchestrator->>PlanCreator: CreatePlan(tasks)
    activate PlanCreator
    PlanCreator-->>Orchestrator: Execution phases
    deactivate PlanCreator

    Note over Orchestrator,Processor: Phase 4: Execution
    loop For each phase
        loop For each task in phase (parallel)
            Orchestrator->>Processor: Process(task, context)
            activate Processor
            Processor-->>Orchestrator: Task result
            deactivate Processor
        end
    end

    Orchestrator-->>Client: OrchestratorResult
    deactivate Orchestrator

Task dependency resolution


graph TD
    subgraph Task Structure
        A[Task] --> B[ID]
        A --> C[Type]
        A --> D[ProcessorType]
        A --> E[Dependencies]
        A --> F[Priority]
        A --> G[Metadata]
    end

    subgraph Plan Creation
        H[Input Tasks] --> I[Build Dependency Graph]
        I --> J[Detect Cycles]
        J --> K[Create Phases]
        K --> L[Sort by Priority]
        L --> M[Apply Max Concurrent]
    end

    subgraph Execution
        N[Phase Execution] --> O[Parallel Task Pool]
        O --> P[Process Task 1]
        O --> Q[Process Task 2]
        O --> R[Process Task N]
        P --> S[Collect Results]
        Q --> S
        R --> S
    end

Error handling and retry flow explained

stateDiagram-v2
    [*] --> TaskReceived
    TaskReceived --> Analyzing

    state Analyzing {
        [*] --> AttemptAnalysis
        AttemptAnalysis --> AnalysisSuccess
        AttemptAnalysis --> AnalysisFailure
        AnalysisFailure --> RetryAnalysis: Retry < MaxAttempts
        RetryAnalysis --> AttemptAnalysis
        AnalysisFailure --> AnalysisFailed: Retry >= MaxAttempts
    }

    state Execution {
        [*] --> ExecuteTask
        ExecuteTask --> TaskSuccess
        ExecuteTask --> TaskFailure
        TaskFailure --> RetryTask: Retry < MaxAttempts
        RetryTask --> ExecuteTask
        TaskFailure --> TaskFailed: Retry >= MaxAttempts
    }

    Analyzing --> Execution: Analysis Success
    Analyzing --> [*]: Analysis Failed
    Execution --> [*]: All Tasks Complete/Failed

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Agent

type Agent interface {
	// Execute runs the agent's task with given input and returns output
	Execute(ctx context.Context, input map[string]interface{}) (map[string]interface{}, error)

	// GetCapabilities returns the tools/capabilities available to this agent
	GetCapabilities() []core.Tool

	// GetMemory returns the agent's memory store
	GetMemory() Memory
}

type AnalyzerConfig

type AnalyzerConfig struct {
	// The base instruction for task analysis
	BaseInstruction string
	// Additional formatting instructions specific to the implementation
	FormatInstructions string
	// Any extra considerations for task analysis
	Considerations []string
}

New type to encapsulate analyzer-specific configuration.

type DefaultPlanCreator

type DefaultPlanCreator struct{}

DefaultPlanCreator provides a simple implementation for testing.

func (*DefaultPlanCreator) CreatePlan

func (p *DefaultPlanCreator) CreatePlan(tasks []Task) ([][]Task, error)

type DefaultTaskParser

type DefaultTaskParser struct{}

DefaultTaskParser provides a simple implementation for testing.

func (*DefaultTaskParser) Parse

func (p *DefaultTaskParser) Parse(analyzerOutput map[string]interface{}) ([]Task, error)

type DependencyPlanCreator

type DependencyPlanCreator struct {
	// Optional configuration for planning
	MaxTasksPerPhase int
}

DependencyPlanCreator creates execution plans based on task dependencies.

func NewDependencyPlanCreator

func NewDependencyPlanCreator(maxTasksPerPhase int) *DependencyPlanCreator

func (*DependencyPlanCreator) CreatePlan

func (p *DependencyPlanCreator) CreatePlan(tasks []Task) ([][]Task, error)

type ErrorCode

type ErrorCode int

ErrorCode represents specific error conditions.

const (
	ErrNoTasksSection ErrorCode = iota
	ErrInvalidXML
	ErrMalformedTasks
)

type FlexibleOrchestrator

type FlexibleOrchestrator struct {
	// contains filtered or unexported fields
}

FlexibleOrchestrator coordinates intelligent task decomposition and execution.

func NewFlexibleOrchestrator

func NewFlexibleOrchestrator(memory Memory, config OrchestrationConfig) *FlexibleOrchestrator

NewFlexibleOrchestrator creates a new orchestrator instance.

func (*FlexibleOrchestrator) GetProcessor

func (f *FlexibleOrchestrator) GetProcessor(processorType string) (TaskProcessor, error)

getProcessor returns the registered processor for a task type.

func (*FlexibleOrchestrator) Process

func (f *FlexibleOrchestrator) Process(ctx context.Context, task string, context map[string]interface{}) (*OrchestratorResult, error)

Process handles complete orchestration workflow.

func (*FlexibleOrchestrator) RegisterProcessor

func (f *FlexibleOrchestrator) RegisterProcessor(processorType string, processor TaskProcessor)

RegisterProcessor adds a new task processor.

type InMemoryStore

type InMemoryStore struct {
	// contains filtered or unexported fields
}

Simple in-memory implementation.

func NewInMemoryStore

func NewInMemoryStore() *InMemoryStore

func (*InMemoryStore) Clear

func (s *InMemoryStore) Clear() error

func (*InMemoryStore) Delete

func (s *InMemoryStore) Delete(key string) error

func (*InMemoryStore) List

func (s *InMemoryStore) List() ([]string, error)

func (*InMemoryStore) Retrieve

func (s *InMemoryStore) Retrieve(key string) (interface{}, error)

func (*InMemoryStore) Store

func (s *InMemoryStore) Store(key string, value interface{}) error

type InterceptableAgent

type InterceptableAgent interface {
	Agent

	// ExecuteWithInterceptors runs the agent's task with interceptor support
	ExecuteWithInterceptors(ctx context.Context, input map[string]interface{}, interceptors []core.AgentInterceptor) (map[string]interface{}, error)

	// SetInterceptors sets the default interceptors for this agent instance
	SetInterceptors(interceptors []core.AgentInterceptor)

	// GetInterceptors returns the current interceptors for this agent
	GetInterceptors() []core.AgentInterceptor

	// ClearInterceptors removes all interceptors from this agent
	ClearInterceptors()

	// GetAgentID returns the unique identifier for this agent instance
	GetAgentID() string

	// GetAgentType returns the category/type of this agent
	GetAgentType() string
}

InterceptableAgent extends Agent with interceptor support. This interface provides backward-compatible enhancement for agents that support interceptors.

func WrapAgentWithInterceptors

func WrapAgentWithInterceptors(agent Agent, agentID, agentType string, interceptors ...core.AgentInterceptor) InterceptableAgent

WrapAgentWithInterceptors is a convenience function to wrap any agent with interceptor support.

type InterceptorAgentAdapter

type InterceptorAgentAdapter struct {
	// contains filtered or unexported fields
}

InterceptorAgentAdapter wraps an existing Agent to provide interceptor support. This allows any existing agent to be used with interceptors without modifying its implementation.

func NewInterceptorAgentAdapter

func NewInterceptorAgentAdapter(agent Agent, agentID, agentType string) *InterceptorAgentAdapter

NewInterceptorAgentAdapter creates a new adapter that wraps an existing agent with interceptor support.

func (*InterceptorAgentAdapter) ClearInterceptors

func (iaa *InterceptorAgentAdapter) ClearInterceptors()

ClearInterceptors removes all interceptors from this adapter.

func (*InterceptorAgentAdapter) Execute

func (iaa *InterceptorAgentAdapter) Execute(ctx context.Context, input map[string]interface{}) (map[string]interface{}, error)

Execute implements the basic Agent interface by calling the wrapped agent.

func (*InterceptorAgentAdapter) ExecuteWithInterceptors

func (iaa *InterceptorAgentAdapter) ExecuteWithInterceptors(ctx context.Context, input map[string]interface{}, interceptors []core.AgentInterceptor) (map[string]interface{}, error)

ExecuteWithInterceptors runs the agent's task with interceptor support.

func (*InterceptorAgentAdapter) GetAgentID

func (iaa *InterceptorAgentAdapter) GetAgentID() string

GetAgentID returns the unique identifier for this agent instance.

func (*InterceptorAgentAdapter) GetAgentType

func (iaa *InterceptorAgentAdapter) GetAgentType() string

GetAgentType returns the category/type of this agent.

func (*InterceptorAgentAdapter) GetCapabilities

func (iaa *InterceptorAgentAdapter) GetCapabilities() []core.Tool

GetCapabilities returns the tools/capabilities from the wrapped agent.

func (*InterceptorAgentAdapter) GetInterceptors

func (iaa *InterceptorAgentAdapter) GetInterceptors() []core.AgentInterceptor

GetInterceptors returns the current interceptors for this adapter.

func (*InterceptorAgentAdapter) GetMemory

func (iaa *InterceptorAgentAdapter) GetMemory() Memory

GetMemory returns the memory store from the wrapped agent.

func (*InterceptorAgentAdapter) SetInterceptors

func (iaa *InterceptorAgentAdapter) SetInterceptors(interceptors []core.AgentInterceptor)

SetInterceptors sets the default interceptors for this adapter.

type Memory

type Memory interface {
	// Store saves a value with a given key
	Store(key string, value interface{}) error

	// Retrieve gets a value by key
	Retrieve(key string) (interface{}, error)

	// Delete removes a value by key
	Delete(key string) error

	// List returns all stored keys
	List() ([]string, error)

	// Clear removes all stored values
	Clear() error
}

Memory provides storage capabilities for agents.

type OrchestrationConfig

type OrchestrationConfig struct {
	// MaxConcurrent controls maximum parallel task execution
	MaxConcurrent int

	// DefaultTimeout for task execution
	DefaultTimeout time.Duration

	// RetryConfig specifies retry behavior for failed tasks
	RetryConfig *RetryConfig

	// CustomProcessors maps processor types to implementations
	CustomProcessors map[string]TaskProcessor
	TaskParser       TaskParser
	PlanCreator      PlanCreator

	AnalyzerConfig AnalyzerConfig
	Options        core.Option
}

OrchestrationConfig allows customizing orchestrator behavior.

type OrchestratorResult

type OrchestratorResult struct {
	// CompletedTasks holds results from successful tasks
	CompletedTasks map[string]interface{}

	// FailedTasks contains tasks that could not be completed
	FailedTasks map[string]error

	// Analysis contains orchestrator's task breakdown reasoning
	Analysis string

	// Metadata holds additional orchestration information
	Metadata map[string]interface{}
	// contains filtered or unexported fields
}

OrchestratorResult contains orchestration outputs.

type PlanCreator

type PlanCreator interface {
	// CreatePlan organizes tasks into execution phases
	CreatePlan(tasks []Task) ([][]Task, error)
}

PlanCreator defines how to create an execution plan from tasks.

type RetryConfig

type RetryConfig struct {
	MaxAttempts       int
	BackoffMultiplier float64
}

RetryConfig specifies retry behavior.

type Task

type Task struct {
	// ID uniquely identifies the task
	ID string

	// Type indicates the kind of task
	Type string

	// Metadata holds task-specific information
	Metadata map[string]interface{}

	// Dependencies lists task IDs that must complete before this task
	Dependencies []string

	// Priority indicates task importance (lower number = higher priority)
	Priority int

	// ProcessorType indicates which processor should handle this task
	ProcessorType string
}

Task represents a unit of work identified by the orchestrator.

type TaskParser

type TaskParser interface {
	// Parse converts analyzer output into a slice of tasks
	Parse(analyzerOutput map[string]interface{}) ([]Task, error)
}

TaskParser defines how to parse tasks from analyzer output.

type TaskProcessor

type TaskProcessor interface {
	// Process handles a single task execution
	Process(ctx context.Context, task Task, taskContext map[string]interface{}) (interface{}, error)
}

TaskProcessor defines how to process individual tasks.

type XMLError

type XMLError struct {
	Code    ErrorCode
	Message string
	Details map[string]interface{}
}

XMLError represents structured errors during XML processing.

func (*XMLError) Error

func (e *XMLError) Error() string

type XMLMetadata

type XMLMetadata struct {
	Items []XMLMetadataItem `xml:"item"`
}

type XMLMetadataItem

type XMLMetadataItem struct {
	Key   string `xml:"key,attr"`
	Value string `xml:",chardata"`
}

type XMLNormalizer

type XMLNormalizer struct {
	// contains filtered or unexported fields
}

XMLNormalizer handles cleaning and standardization of XML content.

func NewXMLNormalizer

func NewXMLNormalizer() *XMLNormalizer

NewXMLNormalizer creates a new normalizer with compiled regex patterns.

func (*XMLNormalizer) NormalizeXML

func (n *XMLNormalizer) NormalizeXML(content string) (string, error)

NormalizeXML cleans and standardizes XML content for parsing.

type XMLTask

type XMLTask struct {
	XMLName       xml.Name    `xml:"task"`
	ID            string      `xml:"id,attr"`
	Type          string      `xml:"type,attr"`      // Make sure this maps to the type attribute
	ProcessorType string      `xml:"processor,attr"` // Make sure this maps to the processor attribute
	Priority      int         `xml:"priority,attr"`
	Description   string      `xml:"description"`
	Dependencies  []string    `xml:"dependencies>dep"` // This maps to the <dependencies><dep>...</dep></dependencies> structure
	Metadata      XMLMetadata `xml:"metadata"`
}

type XMLTaskParser

type XMLTaskParser struct {
	// Configuration for XML parsing
	RequiredFields []string
}

func (*XMLTaskParser) Parse

func (p *XMLTaskParser) Parse(analyzerOutput map[string]interface{}) ([]Task, error)

Directories

Path Synopsis
Package a2a implements Google's Agent-to-Agent (a2a) protocol for dspy-go.
Package a2a implements Google's Agent-to-Agent (a2a) protocol for dspy-go.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL