agents

package
v0.0.0-...-ba96703 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2025 License: MIT Imports: 14 Imported by: 0

README

How Orchestrator works high level

sequenceDiagram
    participant Client
    participant Orchestrator
    participant Analyzer
    participant TaskParser
    participant PlanCreator
    participant Processor

    Client->>Orchestrator: Process(task, context)
    activate Orchestrator
    
    Note over Orchestrator,Analyzer: Phase 1: Task Analysis
    Orchestrator->>Analyzer: Analyze task breakdown
    activate Analyzer
    Analyzer-->>Orchestrator: Raw analysis output (XML format)
    deactivate Analyzer
    
    Note over Orchestrator,TaskParser: Phase 2: Task Parsing
    Orchestrator->>TaskParser: Parse(analyzerOutput)
    activate TaskParser
    TaskParser-->>Orchestrator: Structured Task objects
    deactivate TaskParser
    
    Note over Orchestrator,PlanCreator: Phase 3: Plan Creation
    Orchestrator->>PlanCreator: CreatePlan(tasks)
    activate PlanCreator
    PlanCreator-->>Orchestrator: Execution phases
    deactivate PlanCreator
    
    Note over Orchestrator,Processor: Phase 4: Execution
    loop For each phase
        loop For each task in phase (parallel)
            Orchestrator->>Processor: Process(task, context)
            activate Processor
            Processor-->>Orchestrator: Task result
            deactivate Processor
        end
    end
    
    Orchestrator-->>Client: OrchestratorResult
    deactivate Orchestrator

Task dependency resolution


graph TD
    subgraph Task Structure
        A[Task] --> B[ID]
        A --> C[Type]
        A --> D[ProcessorType]
        A --> E[Dependencies]
        A --> F[Priority]
        A --> G[Metadata]
    end

    subgraph Plan Creation
        H[Input Tasks] --> I[Build Dependency Graph]
        I --> J[Detect Cycles]
        J --> K[Create Phases]
        K --> L[Sort by Priority]
        L --> M[Apply Max Concurrent]
    end

    subgraph Execution
        N[Phase Execution] --> O[Parallel Task Pool]
        O --> P[Process Task 1]
        O --> Q[Process Task 2]
        O --> R[Process Task N]
        P --> S[Collect Results]
        Q --> S
        R --> S
    end

Error handling and retry flow explained

stateDiagram-v2
    [*] --> TaskReceived
    TaskReceived --> Analyzing
    
    state Analyzing {
        [*] --> AttemptAnalysis
        AttemptAnalysis --> AnalysisSuccess
        AttemptAnalysis --> AnalysisFailure
        AnalysisFailure --> RetryAnalysis: Retry < MaxAttempts
        RetryAnalysis --> AttemptAnalysis
        AnalysisFailure --> AnalysisFailed: Retry >= MaxAttempts
    }
    
    state Execution {
        [*] --> ExecuteTask
        ExecuteTask --> TaskSuccess
        ExecuteTask --> TaskFailure
        TaskFailure --> RetryTask: Retry < MaxAttempts
        RetryTask --> ExecuteTask
        TaskFailure --> TaskFailed: Retry >= MaxAttempts
    }
    
    Analyzing --> Execution: Analysis Success
    Analyzing --> [*]: Analysis Failed
    Execution --> [*]: All Tasks Complete/Failed

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Agent

type Agent interface {
	// Execute runs the agent's task with given input and returns output
	Execute(ctx context.Context, input map[string]any) (map[string]any, error)

	// GetCapabilities returns the tools/capabilities available to this agent
	GetCapabilities() []core.Tool

	// GetMemory returns the agent's memory store
	GetMemory() Memory
}

type AnalyzerConfig

type AnalyzerConfig struct {
	// The base instruction for task analysis
	BaseInstruction string
	// Additional formatting instructions specific to the implementation
	FormatInstructions string
	// Any extra considerations for task analysis
	Considerations []string
}

New type to encapsulate analyzer-specific configuration.

type DefaultPlanCreator

type DefaultPlanCreator struct{}

DefaultPlanCreator provides a simple implementation for testing.

func (*DefaultPlanCreator) CreatePlan

func (p *DefaultPlanCreator) CreatePlan(tasks []Task) ([][]Task, error)

type DefaultTaskParser

type DefaultTaskParser struct{}

DefaultTaskParser provides a simple implementation for testing.

func (*DefaultTaskParser) Parse

func (p *DefaultTaskParser) Parse(analyzerOutput map[string]any) ([]Task, error)

type DependencyPlanCreator

type DependencyPlanCreator struct {
	// Optional configuration for planning
	MaxTasksPerPhase int
}

DependencyPlanCreator creates execution plans based on task dependencies.

func NewDependencyPlanCreator

func NewDependencyPlanCreator(maxTasksPerPhase int) *DependencyPlanCreator

func (*DependencyPlanCreator) CreatePlan

func (p *DependencyPlanCreator) CreatePlan(tasks []Task) ([][]Task, error)

type ErrorCode

type ErrorCode int

ErrorCode represents specific error conditions.

const (
	ErrNoTasksSection ErrorCode = iota
	ErrInvalidXML
	ErrMalformedTasks
)

type FlexibleOrchestrator

type FlexibleOrchestrator struct {
	Config *core.DSPYConfig
	// contains filtered or unexported fields
}

FlexibleOrchestrator coordinates intelligent task decomposition and execution.

func NewFlexibleOrchestrator

func NewFlexibleOrchestrator(memory Memory, config OrchestrationConfig, dspyConfig *core.DSPYConfig) *FlexibleOrchestrator

NewFlexibleOrchestrator creates a new orchestrator instance.

func (*FlexibleOrchestrator) GetProcessor

func (f *FlexibleOrchestrator) GetProcessor(processorType string) (TaskProcessor, error)

getProcessor returns the registered processor for a task type.

func (*FlexibleOrchestrator) Process

func (f *FlexibleOrchestrator) Process(ctx context.Context, task string, context map[string]any) (*OrchestratorResult, error)

Process handles complete orchestration workflow.

func (*FlexibleOrchestrator) RegisterProcessor

func (f *FlexibleOrchestrator) RegisterProcessor(processorType string, processor TaskProcessor)

RegisterProcessor adds a new task processor.

type InMemoryStore

type InMemoryStore struct {
	// contains filtered or unexported fields
}

Simple in-memory implementation.

func NewInMemoryStore

func NewInMemoryStore() *InMemoryStore

func (*InMemoryStore) CleanExpired

func (s *InMemoryStore) CleanExpired(ctx context.Context) (int64, error)

CleanExpired removes expired entries and returns count of removed items

func (*InMemoryStore) Clear

func (s *InMemoryStore) Clear() error

func (*InMemoryStore) Close

func (s *InMemoryStore) Close() error

Close is a no-op for InMemoryStore

func (*InMemoryStore) List

func (s *InMemoryStore) List() ([]string, error)

func (*InMemoryStore) Retrieve

func (s *InMemoryStore) Retrieve(key string) (any, error)

func (*InMemoryStore) Store

func (s *InMemoryStore) Store(key string, value any, opts ...StoreOption) error

type Memory

type Memory interface {
	// Store saves a value with a given key and optional TTL settings
	Store(key string, value any, opts ...StoreOption) error

	// Retrieve gets a value by key
	Retrieve(key string) (any, error)

	// List returns all stored keys
	List() ([]string, error)

	// Clear removes all stored values
	Clear() error

	// CleanExpired removes all expired entries
	CleanExpired(ctx context.Context) (int64, error)

	// Close releases resources
	Close() error
}

Memory provides storage capabilities for agents.

type OrchestrationConfig

type OrchestrationConfig struct {
	// MaxConcurrent controls maximum parallel task execution
	MaxConcurrent int

	// DefaultTimeout for task execution
	DefaultTimeout time.Duration

	// RetryConfig specifies retry behavior for failed tasks
	RetryConfig *RetryConfig

	// CustomProcessors maps processor types to implementations
	CustomProcessors map[string]TaskProcessor
	TaskParser       TaskParser
	PlanCreator      PlanCreator

	AnalyzerConfig AnalyzerConfig
	Options        core.Option
}

OrchestrationConfig allows customizing orchestrator behavior.

type OrchestratorResult

type OrchestratorResult struct {
	// CompletedTasks holds results from successful tasks
	CompletedTasks map[string]any

	// FailedTasks contains tasks that could not be completed
	FailedTasks map[string]error

	// Analysis contains orchestrator's task breakdown reasoning
	Analysis string

	// Metadata holds additional orchestration information
	Metadata map[string]any
	// contains filtered or unexported fields
}

OrchestratorResult contains orchestration outputs.

type PlanCreator

type PlanCreator interface {
	// CreatePlan organizes tasks into execution phases
	CreatePlan(tasks []Task) ([][]Task, error)
}

PlanCreator defines how to create an execution plan from tasks.

type RetryConfig

type RetryConfig struct {
	MaxAttempts       int
	BackoffMultiplier float64
}

RetryConfig specifies retry behavior.

type StoreOption

type StoreOption func(*StoreOptions)

StoreOption defines options for Store operations

func WithTTL

func WithTTL(ttl time.Duration) StoreOption

WithTTL creates an option to set a TTL for a stored value

type StoreOptions

type StoreOptions struct {
	TTL time.Duration
}

StoreOptions contains configuration for Store operations

type Task

type Task struct {
	// ID uniquely identifies the task
	ID string

	// Type indicates the kind of task
	Type string

	// Metadata holds task-specific information
	Metadata map[string]any

	// Dependencies lists task IDs that must complete before this task
	Dependencies []string

	// Priority indicates task importance (lower number = higher priority)
	Priority int

	// ProcessorType indicates which processor should handle this task
	ProcessorType string
}

Task represents a unit of work identified by the orchestrator.

type TaskParser

type TaskParser interface {
	// Parse converts analyzer output into a slice of tasks
	Parse(analyzerOutput map[string]any) ([]Task, error)
}

TaskParser defines how to parse tasks from analyzer output.

type TaskProcessor

type TaskProcessor interface {
	// Process handles a single task execution
	Process(ctx context.Context, task Task, taskContext map[string]any) (any, error)
}

TaskProcessor defines how to process individual tasks.

type XMLError

type XMLError struct {
	Code    ErrorCode
	Message string
	Details map[string]any
}

XMLError represents structured errors during XML processing.

func (*XMLError) Error

func (e *XMLError) Error() string

type XMLMetadata

type XMLMetadata struct {
	Items []XMLMetadataItem `xml:"item"`
}

type XMLMetadataItem

type XMLMetadataItem struct {
	Key   string `xml:"key,attr"`
	Value string `xml:",chardata"`
}

type XMLNormalizer

type XMLNormalizer struct {
	// contains filtered or unexported fields
}

XMLNormalizer handles cleaning and standardization of XML content.

func NewXMLNormalizer

func NewXMLNormalizer() *XMLNormalizer

NewXMLNormalizer creates a new normalizer with compiled regex patterns.

func (*XMLNormalizer) NormalizeXML

func (n *XMLNormalizer) NormalizeXML(content string) (string, error)

NormalizeXML cleans and standardizes XML content for parsing.

type XMLTask

type XMLTask struct {
	XMLName       xml.Name    `xml:"task"`
	ID            string      `xml:"id,attr"`
	Type          string      `xml:"type,attr"`      // Make sure this maps to the type attribute
	ProcessorType string      `xml:"processor,attr"` // Make sure this maps to the processor attribute
	Priority      int         `xml:"priority,attr"`
	Description   string      `xml:"description"`
	Dependencies  []string    `xml:"dependencies>dep"` // This maps to the <dependencies><dep>...</dep></dependencies> structure
	Metadata      XMLMetadata `xml:"metadata"`
}

type XMLTaskParser

type XMLTaskParser struct {
	// Configuration for XML parsing
	RequiredFields []string
}

func (*XMLTaskParser) Parse

func (p *XMLTaskParser) Parse(analyzerOutput map[string]any) ([]Task, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL