Documentation
¶
Overview ¶
Package dspy is a Go implementation of the DSPy framework for using language models to solve complex tasks through composable steps and prompting techniques.
DSPy-Go provides a collection of modules, optimizers, and tools for building reliable LLM-powered applications. It focuses on making it easy to:
- Break down complex tasks into modular steps
- Optimize prompts and chain-of-thought reasoning
- Build flexible agent-based systems
- Handle common LLM interaction patterns
- Evaluate and improve system performance
Key Components:
Core: Fundamental abstractions like Module, Signature, LLM and Program for defining and executing LLM-based workflows.
Modules: Building blocks for composing LLM workflows:
Predict: Basic prediction module for simple LLM interactions
ChainOfThought: Implements step-by-step reasoning with rationale tracking
ReAct: Implements Reasoning and Acting with tool integration
Refine: Quality improvement through multiple attempts with reward functions and temperature variation
Parallel: Concurrent execution wrapper for batch processing with any module
MultiChainComparison: Compares multiple reasoning attempts and synthesizes holistic evaluation
Optimizers: Tools for improving prompt effectiveness:
BootstrapFewShot: Automatically selects high-quality examples for few-shot learning
MIPRO: Multi-step interactive prompt optimization
Copro: Collaborative prompt optimization
SIMBA: Stochastic Introspective Mini-Batch Ascent with self-analysis
GEPA: Generative Evolutionary Prompt Adaptation with multi-objective Pareto optimization, LLM-based self-reflection, semantic diversity metrics, and elite archive management
TPE: Tree-structured Parzen Estimator for Bayesian optimization
Agents: Advanced patterns for building sophisticated AI systems:
Memory: Different memory implementations for tracking conversation history
Tools: Integration with external tools and APIs, including:
Smart Tool Registry: Intelligent tool selection using Bayesian inference
Performance Tracking: Real-time metrics and reliability scoring
Auto-Discovery: Dynamic tool registration from MCP servers
MCP (Model Context Protocol) support for seamless integrations
Tool Chaining: Sequential execution of tools in pipelines with data transformation
Tool Composition: Combining multiple tools into reusable composite units
Parallel Execution: Advanced parallel tool execution with intelligent scheduling
Dependency Resolution: Automatic execution planning based on tool dependencies
Workflows:
Chain: Sequential execution of steps
Parallel: Concurrent execution of multiple workflow steps
Router: Dynamic routing based on classification
Advanced Patterns: ForEach, While, Until loops with conditional execution
Orchestrator: Flexible task decomposition and execution
Integration with multiple LLM providers:
Anthropic Claude
Google Gemini (with multimodal support)
OpenAI (with flexible configuration for compatible APIs)
Ollama
LlamaCPP
Multimodal Capabilities:
Image Analysis: Analyze and describe images with natural language
Vision Question Answering: Ask specific questions about visual content
Multimodal Chat: Interactive conversations with images
Streaming Multimodal: Real-time processing of multimodal content
Multiple Image Analysis: Compare and analyze multiple images simultaneously
Content Block System: Flexible handling of text, image, and future audio content
Simple Example:
import (
"context"
"fmt"
"log"
"github.com/darwishdev/dspy-go/pkg/core"
"github.com/darwishdev/dspy-go/pkg/llms"
"github.com/darwishdev/dspy-go/pkg/modules"
"github.com/darwishdev/dspy-go/pkg/config"
)
func main() {
// Configure the default LLM
llms.EnsureFactory()
err := config.ConfigureDefaultLLM("your-api-key", core.ModelAnthropicSonnet)
if err != nil {
log.Fatalf("Failed to configure LLM: %v", err)
}
// Create a signature for question answering
signature := core.NewSignature(
[]core.InputField{{Field: core.Field{Name: "question"}}},
[]core.OutputField{{Field: core.Field{Name: "answer"}}},
)
// Create a ChainOfThought module
cot := modules.NewChainOfThought(signature)
// Create a program
program := core.NewProgram(
map[string]core.Module{"cot": cot},
func(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error) {
return cot.Process(ctx, inputs)
},
)
// Execute the program
result, err := program.Execute(context.Background(), map[string]interface{}{
"question": "What is the capital of France?",
})
if err != nil {
log.Fatalf("Error executing program: %v", err)
}
fmt.Printf("Answer: %s\n", result["answer"])
}
OpenAI-Compatible API Example:
import (
"context"
"fmt"
"log"
"time"
"github.com/darwishdev/dspy-go/pkg/core"
"github.com/darwishdev/dspy-go/pkg/llms"
"github.com/darwishdev/dspy-go/pkg/modules"
)
func main() {
// Configure LiteLLM (OpenAI-compatible)
llm, err := llms.NewOpenAILLM(core.ModelOpenAIGPT4,
llms.WithAPIKey("your-api-key"),
llms.WithOpenAIBaseURL("http://localhost:4000"),
llms.WithOpenAITimeout(60*time.Second))
if err != nil {
log.Fatalf("Failed to create LiteLLM instance: %v", err)
}
// Or configure LocalAI
localLLM, err := llms.NewOpenAILLM(core.ModelOpenAIGPT4,
llms.WithOpenAIBaseURL("http://localhost:8080"),
llms.WithOpenAIPath("/v1/chat/completions"))
if err != nil {
log.Fatalf("Failed to create LocalAI instance: %v", err)
}
// Or configure custom OpenAI-compatible API
customLLM, err := llms.NewOpenAILLM(core.ModelOpenAIGPT4,
llms.WithAPIKey("custom-key"),
llms.WithOpenAIBaseURL("https://api.custom-provider.com"),
llms.WithHeader("X-Custom-Header", "value"))
if err != nil {
log.Fatalf("Failed to create custom LLM instance: %v", err)
}
// Use any of these LLMs with DSPy-Go modules
signature := core.NewSignature(
[]core.InputField{{Field: core.Field{Name: "question"}}},
[]core.OutputField{{Field: core.Field{Name: "answer"}}},
)
cot := modules.NewChainOfThought(signature)
cot.SetLLM(llm) // Use LiteLLM, LocalAI, or custom provider
result, err := cot.Process(context.Background(), map[string]interface{}{
"question": "What are the benefits of using OpenAI-compatible APIs?",
})
if err != nil {
log.Fatalf("Error processing: %v", err)
}
fmt.Printf("Answer: %s\n", result["answer"])
}
Multimodal Example:
import (
"context"
"fmt"
"log"
"os"
"github.com/darwishdev/dspy-go/pkg/core"
"github.com/darwishdev/dspy-go/pkg/llms"
"github.com/darwishdev/dspy-go/pkg/modules"
)
func main() {
// Create Gemini LLM instance with multimodal support
llm, err := llms.NewGeminiLLM(os.Getenv("GEMINI_API_KEY"), core.ModelGoogleGeminiFlash)
if err != nil {
log.Fatalf("Failed to create Gemini LLM: %v", err)
}
// Define multimodal signature for image analysis
signature := core.NewSignature(
[]core.InputField{
{Field: core.NewImageField("image", core.WithDescription("The image to analyze"))},
{Field: core.NewTextField("question", core.WithDescription("Question about the image"))},
},
[]core.OutputField{
{Field: core.NewTextField("answer", core.WithDescription("Analysis result"))},
},
).WithInstruction("Analyze the provided image and answer the given question about it.")
// Create prediction module
predictor := modules.NewPredict(signature)
predictor.SetLLM(llm)
// Load and process image
imageData, err := os.ReadFile("path/to/image.jpg")
if err != nil {
log.Fatalf("Failed to load image: %v", err)
}
// Execute multimodal prediction
ctx := core.WithExecutionState(context.Background())
result, err := predictor.Process(ctx, map[string]interface{}{
"image": core.NewImageBlock(imageData, "image/jpeg"),
"question": "What objects can you see in this image?",
})
if err != nil {
log.Fatalf("Error processing image: %v", err)
}
fmt.Printf("Answer: %s\n", result["answer"])
}
GEPA Optimizer Example:
import (
"context"
"fmt"
"log"
"github.com/darwishdev/dspy-go/pkg/core"
"github.com/darwishdev/dspy-go/pkg/llms"
"github.com/darwishdev/dspy-go/pkg/modules"
"github.com/darwishdev/dspy-go/pkg/optimizers"
)
func main() {
// Configure LLM
llm, err := llms.NewAnthropicLLM("your-api-key", core.ModelAnthropicSonnet)
if err != nil {
log.Fatalf("Failed to create LLM: %v", err)
}
core.SetDefaultLLM(llm)
// Create program to optimize
signature := core.NewSignature(
[]core.InputField{{Field: core.Field{Name: "question"}}},
[]core.OutputField{{Field: core.Field{Name: "answer"}}},
)
module := modules.NewPredict(signature)
program := core.NewProgram(
map[string]core.Module{"predictor": module},
func(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error) {
return module.Process(ctx, inputs)
},
)
// Configure GEPA optimizer with multi-objective optimization
config := &optimizers.GEPAConfig{
PopulationSize: 20, // Population size for evolutionary search
MaxGenerations: 10, // Maximum number of generations
SelectionStrategy: "adaptive_pareto", // Use adaptive Pareto-based selection
MutationRate: 0.3, // Probability of mutation
CrossoverRate: 0.7, // Probability of crossover
ReflectionFreq: 2, // Perform reflection every 2 generations
}
gepa, err := optimizers.NewGEPA(config)
if err != nil {
log.Fatalf("Failed to create GEPA optimizer: %v", err)
}
// Create training dataset
dataset := []core.Example{
{
Inputs: map[string]interface{}{"question": "What is 2+2?"},
Outputs: map[string]interface{}{"answer": "4"},
},
{
Inputs: map[string]interface{}{"question": "What is the capital of Japan?"},
Outputs: map[string]interface{}{"answer": "Tokyo"},
},
// Add more examples...
}
// Define evaluation metric
metric := func(expected, actual map[string]interface{}) float64 {
if expected["answer"] == actual["answer"] {
return 1.0
}
return 0.0
}
// Optimize the program using GEPA's multi-objective evolutionary approach
optimizedProgram, err := gepa.Compile(context.Background(), program, dataset, metric)
if err != nil {
log.Fatalf("GEPA optimization failed: %v", err)
}
// Get optimization results
state := gepa.GetOptimizationState()
fmt.Printf("Optimization completed in %d generations\n", state.CurrentGeneration)
fmt.Printf("Best fitness achieved: %.3f\n", state.BestFitness)
fmt.Printf("Best prompt: %s\n", state.BestCandidate.Instruction)
// Access Pareto archive of elite solutions
archive := state.GetParetoArchive()
fmt.Printf("Elite solutions in archive: %d\n", len(archive))
// Use the optimized program
result, err := optimizedProgram.Execute(context.Background(), map[string]interface{}{
"question": "What is machine learning?",
})
if err != nil {
log.Fatalf("Error executing optimized program: %v", err)
}
fmt.Printf("Optimized Answer: %s\n", result["answer"])
}
Advanced Features:
Tracing and Logging: Detailed tracing and structured logging for debugging and optimization Execution context is tracked and passed through the pipeline for debugging and analysis.
Error Handling: Comprehensive error management with custom error types and centralized handling
Metric-Based Optimization: Improve module performance based on custom evaluation metrics
Smart Tool Management: Intelligent tool selection, performance tracking, auto-discovery, chaining, and composition for building complex tool workflows
Custom Tool Integration: Extend ReAct modules with domain-specific tools
Workflow Retry Logic: Resilient execution with configurable retry mechanisms and backoff strategies
Streaming Support: Process LLM outputs incrementally as they're generated
Text streaming for regular LLM interactions
Multimodal streaming for image analysis and vision tasks
Data Storage: Integration with various storage backends for persistence of examples and results
Dataset Management: Built-in support for downloading and managing datasets like GSM8K and HotPotQA
Arrow Support: Integration with Apache Arrow for efficient data handling and processing
Working with Smart Tool Registry:
import "github.com/darwishdev/dspy-go/pkg/tools"
// Create intelligent tool registry
config := &tools.SmartToolRegistryConfig{
AutoDiscoveryEnabled: true,
PerformanceTrackingEnabled: true,
FallbackEnabled: true,
}
registry := tools.NewSmartToolRegistry(config)
// Register tools
registry.Register(mySearchTool)
registry.Register(myAnalysisTool)
// Intelligent tool selection based on intent
tool, err := registry.SelectBest(ctx, "find user information")
result, err := registry.ExecuteWithTracking(ctx, tool.Name(), params)
Working with Tool Chaining and Composition:
// Tool Chaining - Sequential execution with data transformation
pipeline, err := tools.NewPipelineBuilder("data_processing", registry).
Step("data_extractor").
StepWithTransformer("data_validator", tools.TransformExtractField("result")).
ConditionalStep("data_enricher", tools.ConditionExists("validated")).
StepWithRetries("data_transformer", 3).
FailFast().
EnableCaching().
Build()
result, err := pipeline.Execute(ctx, input)
// Dependency-aware execution with automatic parallelization
graph := tools.NewDependencyGraph()
graph.AddNode(&tools.DependencyNode{
ToolName: "extractor",
Dependencies: []string{},
Outputs: []string{"raw_data"},
})
graph.AddNode(&tools.DependencyNode{
ToolName: "validator",
Dependencies: []string{"extractor"},
Inputs: []string{"raw_data"},
})
depPipeline, err := tools.NewDependencyPipeline("smart_pipeline", registry, graph, options)
result, err := depPipeline.ExecuteWithDependencies(ctx, input)
// Parallel execution with advanced scheduling
executor := tools.NewParallelExecutor(registry, 4)
tasks := []*tools.ParallelTask{
{ID: "task1", ToolName: "analyzer", Input: data1, Priority: 1},
{ID: "task2", ToolName: "processor", Input: data2, Priority: 2},
}
results, err := executor.ExecuteParallel(ctx, tasks, &tools.PriorityScheduler{})
// Tool Composition - Create reusable composite tools
type CompositeTool struct {
name string
pipeline *tools.ToolPipeline
}
textProcessor, err := NewCompositeTool("text_processor", registry,
func(builder *tools.PipelineBuilder) *tools.PipelineBuilder {
return builder.Step("uppercase").Step("reverse").Step("length")
})
// Register and use composite tool like any other tool
registry.Register(textProcessor)
result, err := textProcessor.Execute(ctx, input)
Working with Multimodal Streaming:
// Create multimodal content with text and image
content := []core.ContentBlock{
core.NewTextBlock("Please describe this image in detail:"),
core.NewImageBlock(imageData, "image/jpeg"),
}
// Stream multimodal response
streamResp, err := llm.StreamGenerateWithContent(ctx, content)
if err != nil {
log.Fatalf("Failed to start streaming: %v", err)
}
// Process streaming chunks
for chunk := range streamResp.ChunkChannel {
if chunk.Error != nil {
log.Printf("Streaming error: %v", chunk.Error)
break
}
fmt.Print(chunk.Content)
}
Working with Workflows:
// Chain workflow example
workflow := workflows.NewChainWorkflow(store)
workflow.AddStep(&workflows.Step{
ID: "step1",
Module: modules.NewPredict(signature1),
})
workflow.AddStep(&workflows.Step{
ID: "step2",
Module: modules.NewPredict(signature2),
// Configurable retry logic
RetryConfig: &workflows.RetryConfig{
MaxAttempts: 3,
BackoffMultiplier: 2.0,
},
// Conditional execution
Condition: func(state map[string]interface{}) bool {
return someCondition(state)
},
})
For more examples and detailed documentation, visit: https://github.com/darwishdev/dspy-go
DSPy-Go is released under the MIT License.
Directories
¶
| Path | Synopsis |
|---|---|
|
examples
|
|
|
a2a_composition
command
|
|
|
agents
command
|
|
|
gsm8k
command
|
|
|
hotpotqa
command
|
|
|
mcp_optimizer
command
|
|
|
multi_chain_comparison
command
|
|
|
multimodal
command
|
|
|
others/gepa
command
|
|
|
others/html
command
|
|
|
others/mcp
command
|
|
|
others/mipro
command
|
|
|
others/simba
command
|
|
|
parallel
command
|
|
|
react_agent
command
|
|
|
reactive_simple
command
|
|
|
refine
command
|
|
|
smart_tool_registry
command
|
|
|
tool_chaining
command
|
|
|
tool_composition
command
|
|
|
xml_adapter/basic_usage
command
|
|
|
xml_adapter/composable
command
|
|
|
xml_adapter/custom_config
command
|
|
|
xml_adapter/predict_xml
command
|
|
|
xml_adapter/react_xml
command
|
|
|
internal
|
|
|
pkg
|
|
|
agents/communication
Package a2a implements Google's Agent-to-Agent (a2a) protocol for dspy-go.
|
Package a2a implements Google's Agent-to-Agent (a2a) protocol for dspy-go. |
|
interceptors
Package interceptors provides middleware components for dspy-go modules and agents.
|
Package interceptors provides middleware components for dspy-go modules and agents. |
|
logging
Package logging provides FlightRecorder integration for Go 1.25+.
|
Package logging provides FlightRecorder integration for Go 1.25+. |
|
modules
Package modules provides the Parallel execution wrapper for DSPy-Go.
|
Package modules provides the Parallel execution wrapper for DSPy-Go. |
|
optimizers
Package optimizers implements the GEPA (Generative Evolutionary Prompt Adaptation) algorithm.
|
Package optimizers implements the GEPA (Generative Evolutionary Prompt Adaptation) algorithm. |
|
tools
pkg/tools/func.go
|
pkg/tools/func.go |