Documentation ¶
Overview ¶
Package llmflow provides a pipeline architecture for processing LLM interactions with configurable request and response processors.
The llmflow package implements the core processing pipeline that handles LLM requests and responses through a series of configurable processors. It provides the foundation for sophisticated agent workflows by enabling modular processing of authentication, content transformation, function calling, agent transfers, code execution, and more.
Architecture Overview ¶
The package follows a pipeline architecture where LLM requests and responses flow through a series of processors:
┌─────────────┐    ┌──────────────────┐    ┌─────────────┐    ┌───────────────────┐
│   Request   │───▶│     Request      │───▶│     LLM     │───▶│     Response      │
│             │    │    Processors    │    │    Call     │    │    Processors     │
└─────────────┘    └──────────────────┘    └─────────────┘    └───────────────────┘
Core Components ¶
The package provides several key components:
- LLMFlow: Base flow that orchestrates the entire pipeline
- Request Processors: Process and modify requests before sending to LLM
- Response Processors: Process and transform responses after receiving from LLM
- Predefined Pipelines: Common processor configurations for different use cases
LLMFlow Base Class ¶
The LLMFlow struct provides the foundation for all LLM processing:
flow := &llmflow.LLMFlow{
	RequestProcessors: []types.LLMRequestProcessor{
		&BasicLLMRequestProcessor{},
		&AuthLLMRequestProcessor{},
		&InstructionsLlmRequestProcessor{},
	},
	ResponseProcessors: []types.LLMResponseProcessor{
		&CodeExecutionResponseProcessor{},
	},
}

// Configure the flow.
flow.WithLogger(logger).
	WithRequestProcessors(customProcessor).
	WithResponseProcessors(customResponseProcessor)
Predefined Pipelines ¶
The package provides two main predefined processor pipelines:
## Single Flow Pipeline
For simple LLM interactions without agent transfers:
requestProcessors := llmflow.SingleRequestProcessor()
responseProcessors := llmflow.SingleResponseProcessor()

// Includes:
// - BasicLLMRequestProcessor: Core LLM interaction
// - AuthLLMRequestProcessor: Authentication handling
// - InstructionsLlmRequestProcessor: System instructions
// - IdentityLlmRequestProcessor: Identity management
// - ContentLLMRequestProcessor: Content processing
// - NLPlanningRequestProcessor: Natural language planning
// - CodeExecutionRequestProcessor: Code execution support
## Auto Flow Pipeline
For complex workflows with agent transfer capabilities:
requestProcessors := llmflow.AutoRequestProcessor()
responseProcessors := llmflow.AutoResponseProcessor()

// Includes all Single Flow processors plus:
// - AgentTransferLlmRequestProcessor: Agent transfer support
Request Processors ¶
Request processors modify and enhance LLM requests before sending:
## BasicLLMRequestProcessor
Handles core LLM interaction and model management:
processor := &BasicLLMRequestProcessor{}
// Automatically manages model creation, request formatting, and basic error handling
## AuthLLMRequestProcessor
Processes authentication requirements for tools:
processor := &AuthLLMRequestProcessor{}
// Handles credential requests, OAuth flows, API key management
## InstructionsLlmRequestProcessor
Manages system instructions and context:
processor := &InstructionsLlmRequestProcessor{}
// Applies agent instructions, context-specific prompts, and system messages
## ContentLLMRequestProcessor
Processes and transforms content before sending to LLM:
processor := &ContentLLMRequestProcessor{}
// Handles content optimization, artifact management, and context preparation
## CodeExecutionRequestProcessor
Prepares code execution context and optimizes data files:
processor := &CodeExecutionRequestProcessor{}
// Handles code block detection, execution environment setup, and data file optimization
## AgentTransferLlmRequestProcessor
Enables agent transfer capabilities:
processor := &AgentTransferLlmRequestProcessor{}
// Manages parent/peer agent transfers, hierarchy navigation, and delegation
Response Processors ¶
Response processors handle LLM outputs and execute actions:
## CodeExecutionResponseProcessor
Executes code blocks and handles results:
processor := &CodeExecutionResponseProcessor{}
// Detects code blocks, executes them securely, and integrates results
## NLPlanningResponseProcessor
Processes natural language planning responses:
processor := &NLPlanningResponseProcessor{}
// Handles planning markup, thought processing, and structured reasoning
Function Calling Integration ¶
The pipeline includes sophisticated function calling support:
// Function calls are automatically handled through the processor pipeline
// with parallel execution, proper error handling, and result integration.
for event, err := range flow.Run(ctx, ictx) {
	if err != nil {
		log.Printf("Flow error: %v", err)
		continue
	}
	if event.Actions != nil && len(event.Actions.FunctionCalls) > 0 {
		// Function calls are automatically executed in parallel.
		// Results are integrated back into the conversation.
	}
}
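The parallel dispatch described above can be sketched with stdlib primitives. FunctionCall and the tools map below are simplified stand-ins for genai.FunctionCall and the pipeline's tools dictionary; the real HandleFunctionCalls implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// FunctionCall is a simplified stand-in for genai.FunctionCall.
type FunctionCall struct {
	Name string
	Arg  int
}

// executeParallel runs every call in its own goroutine and collects the
// results in the original call order.
func executeParallel(calls []FunctionCall, tools map[string]func(int) int) []int {
	results := make([]int, len(calls))
	var wg sync.WaitGroup
	for i, c := range calls {
		wg.Add(1)
		go func(i int, c FunctionCall) {
			defer wg.Done()
			results[i] = tools[c.Name](c.Arg) // each goroutine writes its own slot
		}(i, c)
	}
	wg.Wait()
	return results
}

func main() {
	tools := map[string]func(int) int{"double": func(n int) int { return 2 * n }}
	fmt.Println(executeParallel([]FunctionCall{{"double", 3}, {"double", 5}}, tools))
	// prints [6 10]
}
```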
Authentication Flow ¶
Authentication is seamlessly integrated through the auth processor:
// Tools can request credentials during execution.
tool.RequestCredential("github_token", &types.AuthConfig{
	Type:     types.AuthTypeOAuth2,
	ClientID: "your-client-id",
	Scopes:   []string{"repo", "user"},
})

// Auth processor handles the flow automatically:
// 1. Detects credential requests
// 2. Initiates appropriate auth flow (OAuth2, API Key, etc.)
// 3. Stores and manages credentials securely
// 4. Provides credentials to tools when needed
Agent Transfer Support ¶
The auto flow supports sophisticated agent transfer:
// Agents can transfer to parent or peer agents.
event := &types.Event{
	Actions: &types.EventActions{
		AgentTransfer: &types.AgentTransfer{
			Target: "parent", // or a specific agent name
			Reason: "Escalation needed for complex analysis",
		},
	},
}

// Transfer processor handles:
// - Target agent resolution
// - Context preservation
// - Conversation continuity
// - Proper delegation workflow
Code Execution Pipeline ¶
Code execution is integrated throughout the pipeline:
// Request processor optimizes data files and prepares execution context.
// Response processor detects and executes code blocks.

// Example: Python code execution
response := `Here's the analysis:
'''python
import pandas as pd
data = pd.read_csv('data.csv')
print(data.describe())
'''`

// Processor automatically:
// 1. Detects code block
// 2. Sets up secure execution environment
// 3. Executes code with proper isolation
// 4. Captures output and integrates into conversation
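Code-block detection of the kind described above can be sketched with a regular expression. The pattern and helper below are illustrative, not the processor's actual implementation.

```go
package main

import (
	"fmt"
	"regexp"
)

// codeBlockRE matches fenced blocks such as ```python ... ```; the
// real processor's pattern may differ.
var codeBlockRE = regexp.MustCompile("(?s)```(\\w+)\\n(.*?)```")

// extractCodeBlocks returns (language, code) pairs found in text.
func extractCodeBlocks(text string) [][2]string {
	var blocks [][2]string
	for _, m := range codeBlockRE.FindAllStringSubmatch(text, -1) {
		blocks = append(blocks, [2]string{m[1], m[2]})
	}
	return blocks
}

func main() {
	resp := "Here's the analysis:\n```python\nprint('hi')\n```"
	for _, b := range extractCodeBlocks(resp) {
		fmt.Printf("lang=%s code=%q\n", b[0], b[1])
	}
}
```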
Error Handling and Retry Logic ¶
The pipeline includes comprehensive error handling:
for event, err := range flow.Run(ctx, ictx) {
	if err != nil {
		// Handle different error types.
		if rateLimitErr, ok := err.(*types.RateLimitError); ok {
			// Wait and retry.
			time.Sleep(rateLimitErr.RetryAfter)
			continue
		}
		if execErr, ok := err.(*types.ExecutionError); ok {
			// Code execution failed.
			log.Printf("Execution failed after %d attempts: %v",
				execErr.Attempts, execErr.LastError)
		}
		// Other error handling.
	}
	// Process successful events.
}
Streaming and Real-time Processing ¶
All processors support streaming for real-time interactions:
for event, err := range flow.Run(ctx, ictx) {
	if err != nil {
		continue
	}
	// Stream text deltas in real time.
	if event.TextDelta != "" {
		fmt.Print(event.TextDelta)
	}
	// Handle function calls as they occur.
	if event.Actions != nil && len(event.Actions.FunctionCalls) > 0 {
		// Process function calls immediately.
	}
}
Custom Processor Development ¶
Create custom processors for specialized workflows:
type CustomRequestProcessor struct{}

func (p *CustomRequestProcessor) Run(ctx context.Context, ictx *types.InvocationContext, request *types.LLMRequest) iter.Seq2[*types.Event, error] {
	return func(yield func(*types.Event, error) bool) {
		// Custom request processing logic.

		// Modify the request.
		request.GenerationConfig.Temperature = 0.1

		// Add custom system instructions.
		if request.SystemInstruction == nil {
			request.SystemInstruction = &genai.Content{}
		}
		// ... custom logic
	}
}

// Integrate into the pipeline.
flow.WithRequestProcessors(&CustomRequestProcessor{})
Integration with Agent System ¶
The flow seamlessly integrates with the agent framework:
agent := agent.NewLLMAgent(ctx, "assistant",
	agent.WithModel("gemini-1.5-pro"),
	agent.WithInstruction("You are a helpful assistant"),
	agent.WithTools(tool1, tool2),
)

// The agent automatically uses the appropriate flow based on configuration:
// SingleFlow for simple agents, AutoFlow for complex hierarchical agents.
Performance Optimization ¶
The pipeline includes several performance optimizations:
- Parallel function execution with proper synchronization
- Connection pooling for model requests
- Content caching and optimization
- Streaming response processing
- Efficient memory management with object pooling
- Request batching where supported
Security Considerations ¶
The pipeline implements security best practices:
- Secure credential storage and management
- Code execution sandboxing and isolation
- Input validation and sanitization
- Rate limiting and quota management
- Audit logging for sensitive operations
- Proper authentication and authorization
Thread Safety ¶
All processors are designed to be safe for concurrent use across multiple goroutines. The pipeline can handle multiple concurrent requests with proper isolation.
Best Practices ¶
When working with the flow pipeline:
- Use predefined pipelines (Single/Auto) for standard use cases
- Add custom processors only when specific functionality is needed
- Order processors carefully - some depend on others (e.g., content before planning)
- Handle streaming events promptly to avoid blocking
- Implement proper error handling for different error types
- Use appropriate flow type based on agent hierarchy needs
- Consider performance implications of processor ordering
- Test custom processors thoroughly with various input scenarios
Configuration Examples ¶
## Basic LLM Agent Flow
flow := &llmflow.LLMFlow{
	RequestProcessors:  llmflow.SingleRequestProcessor(),
	ResponseProcessors: llmflow.SingleResponseProcessor(),
}
## Advanced Agent with Custom Processing
customProcessor := &MyCustomProcessor{}

flow := &llmflow.LLMFlow{
	RequestProcessors:  append(llmflow.AutoRequestProcessor(), customProcessor),
	ResponseProcessors: llmflow.AutoResponseProcessor(),
}
## Code-Heavy Workflow
flow := &llmflow.LLMFlow{
	RequestProcessors: []types.LLMRequestProcessor{
		&BasicLLMRequestProcessor{},
		&ContentLLMRequestProcessor{},
		&CodeExecutionRequestProcessor{},
	},
	ResponseProcessors: []types.LLMResponseProcessor{
		&CodeExecutionResponseProcessor{},
	},
}
The llmflow package provides the essential pipeline infrastructure for building sophisticated AI agent workflows with comprehensive LLM integration capabilities.
Index ¶
- Constants
- Variables
- func AutoRequestProcessor() []types.LLMRequestProcessor
- func AutoResponseProcessor() []types.LLMResponseProcessor
- func FindIterChannel(pattern *regexp.Regexp, text string) <-chan Match
- func FindMatchingFunctionCall(ctx context.Context, events []*types.Event) *types.Event
- func GenerateAuthEvent(ctx context.Context, ictx *types.InvocationContext, ...) (*types.Event, error)
- func GenerateClientFunctioncallID() string
- func GetLongRunningFunctionCalls(ctx context.Context, funcCalls []*genai.FunctionCall, ...) py.Set[string]
- func HandleFunctionCalls(ctx context.Context, ictx *types.InvocationContext, ...) (*types.Event, error)
- func HandleFunctionCallsLive(ctx context.Context, ictx *types.InvocationContext, ...) (*types.Event, error)
- func PopulateClientFunctionCallID(ctx context.Context, modelResponseEvent *types.Event)
- func RemoveClientFunctionCallID(content *genai.Content) *genai.Content
- func SingleRequestProcessor() []types.LLMRequestProcessor
- func SingleResponseProcessor() []types.LLMResponseProcessor
- type AgentTransferLlmRequestProcessor
- type AudioTranscriber
- type AuthLLMRequestProcessor
- type AutoFlow
- type BasicLLMRequestProcessor
- type CodeExecutionRequestProcessor
- type CodeExecutionResponseProcessor
- type ContentLLMRequestProcessor
- type DataFileUtil
- type IdentityLlmRequestProcessor
- type InstructionsLlmRequestProcessor
- type LLMFlow
- func (f *LLMFlow) Run(ctx context.Context, ic *types.InvocationContext) iter.Seq2[*types.Event, error]
- func (f *LLMFlow) RunLive(ctx context.Context, ictx *types.InvocationContext) iter.Seq2[*types.Event, error]
- func (f *LLMFlow) WithLogger(logger *slog.Logger) *LLMFlow
- func (f *LLMFlow) WithRequestProcessors(processors ...types.LLMRequestProcessor) *LLMFlow
- func (f *LLMFlow) WithResponseProcessors(processors ...types.LLMResponseProcessor) *LLMFlow
- type Match
- type NLPlanningRequestProcessor
- type NLPlanningResponseProcessor
- type SingleFlow
Constants ¶
const (
	FunctionCallIDPrefix       = "adk-"
	RequestEUCFunctionCallName = "adk_request_credential"
)
const DataFileHelperLib = `` /* 1120-byte string literal not displayed */
Variables ¶
var DataFileUtilMap = map[string]*DataFileUtil{
"text/csv": {
Extension: ".csv",
LoaderCodeTemplate: "pd.read_csv('%s')",
},
}
Functions ¶
func AutoRequestProcessor ¶
func AutoRequestProcessor() []types.LLMRequestProcessor
AutoRequestProcessor returns the default types.LLMRequestProcessor for AutoFlow.
func AutoResponseProcessor ¶
func AutoResponseProcessor() []types.LLMResponseProcessor
AutoResponseProcessor returns the default types.LLMResponseProcessor for AutoFlow.
func FindIterChannel ¶
FindIterChannel returns a channel that yields match objects one by one.
func FindMatchingFunctionCall ¶
FindMatchingFunctionCall finds the function call event that matches the function response id of the last event.
func GenerateAuthEvent ¶
func GenerateAuthEvent(ctx context.Context, ictx *types.InvocationContext, funcResponseEvent *types.Event) (*types.Event, error)
GenerateAuthEvent generates an authentication event for the given function response event.
func GenerateClientFunctioncallID ¶
func GenerateClientFunctioncallID() string
GenerateClientFunctioncallID generates a unique function call ID for the client.
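One plausible sketch of such a generator combines the documented FunctionCallIDPrefix ("adk-") with a random suffix; the real suffix format is an assumption here.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newCallID sketches a client function call ID generator: the documented
// "adk-" prefix plus a random hex suffix. The suffix scheme is an
// assumption, not the package's actual implementation.
func newCallID() string {
	b := make([]byte, 8)
	rand.Read(b) // crypto/rand; error ignored for brevity in this sketch
	return "adk-" + hex.EncodeToString(b)
}

func main() {
	fmt.Println(newCallID()) // e.g. adk-3f9c1a2b4d5e6f70
}
```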
func GetLongRunningFunctionCalls ¶
func GetLongRunningFunctionCalls(ctx context.Context, funcCalls []*genai.FunctionCall, toolsDict map[string]types.Tool) py.Set[string]
GetLongRunningFunctionCalls returns a set of long-running function call IDs from the given function calls.
func HandleFunctionCalls ¶
func HandleFunctionCalls(ctx context.Context, ictx *types.InvocationContext, functionCallEvent *types.Event, toolsDict map[string]types.Tool, filters py.Set[string]) (*types.Event, error)
HandleFunctionCalls processes function calls asynchronously.
func HandleFunctionCallsLive ¶
func HandleFunctionCallsLive(ctx context.Context, ictx *types.InvocationContext, functionCallEvent *types.Event, toolsDict map[string]types.Tool) (*types.Event, error)
HandleFunctionCallsLive calls the functions and returns the function response event.
func PopulateClientFunctionCallID ¶
PopulateClientFunctionCallID populates the function call ID for each function call in the model response event.
func RemoveClientFunctionCallID ¶
RemoveClientFunctionCallID removes the client function call ID from each function call in the given content.
func SingleRequestProcessor ¶
func SingleRequestProcessor() []types.LLMRequestProcessor
SingleRequestProcessor returns the default types.LLMRequestProcessor for SingleFlow.
func SingleResponseProcessor ¶
func SingleResponseProcessor() []types.LLMResponseProcessor
SingleResponseProcessor returns the default types.LLMResponseProcessor for SingleFlow.
Types ¶
type AgentTransferLlmRequestProcessor ¶
type AgentTransferLlmRequestProcessor struct{}
AgentTransferLlmRequestProcessor represents an agent transfer request processor.
func (*AgentTransferLlmRequestProcessor) Run ¶
func (rp *AgentTransferLlmRequestProcessor) Run(ctx context.Context, ictx *types.InvocationContext, request *types.LLMRequest) iter.Seq2[*types.Event, error]
Run implements [LLMRequestProcessor].
type AudioTranscriber ¶
type AudioTranscriber struct {
// contains filtered or unexported fields
}
AudioTranscriber transcribes audio using Google Cloud Speech-to-Text.
func NewAudioTranscriber ¶
func NewAudioTranscriber(ctx context.Context) (*AudioTranscriber, error)
NewAudioTranscriber creates a new AudioTranscriber instance.
func (*AudioTranscriber) TranscribeFile ¶
func (f *AudioTranscriber) TranscribeFile(ctx context.Context, ictx *types.InvocationContext) ([]*genai.Content, error)
TranscribeFile transcribes audio, bundling consecutive segments from the same speaker.
The ordering of speakers is preserved. Audio blobs from the same speaker are merged as much as possible to reduce transcription latency.
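The bundling behavior can be sketched as follows; segment and mergeConsecutive are illustrative stand-ins (the real transcriber works on audio blobs, not strings), not the package's API.

```go
package main

import "fmt"

// segment is an illustrative stand-in for a per-speaker audio piece.
type segment struct {
	Speaker string
	Data    string
}

// mergeConsecutive preserves speaker order while concatenating runs of
// segments from the same speaker, mirroring the bundling described above.
func mergeConsecutive(segs []segment) []segment {
	var out []segment
	for _, s := range segs {
		if n := len(out); n > 0 && out[n-1].Speaker == s.Speaker {
			out[n-1].Data += s.Data // extend the current speaker's run
			continue
		}
		out = append(out, s)
	}
	return out
}

func main() {
	fmt.Println(mergeConsecutive([]segment{
		{"user", "a"}, {"user", "b"}, {"model", "c"}, {"user", "d"},
	}))
	// prints [{user ab} {model c} {user d}]
}
```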
type AuthLLMRequestProcessor ¶
type AuthLLMRequestProcessor struct {
// contains filtered or unexported fields
}
AuthLLMRequestProcessor handles auth information to build the LLM request.
func NewAuthPreprocessor ¶
func NewAuthPreprocessor() *AuthLLMRequestProcessor
NewAuthPreprocessor creates a new authentication *AuthLLMRequestProcessor.
func (*AuthLLMRequestProcessor) Run ¶
func (p *AuthLLMRequestProcessor) Run(ctx context.Context, ictx *types.InvocationContext, request *types.LLMRequest) iter.Seq2[*types.Event, error]
Run implements types.LLMRequestProcessor.
func (*AuthLLMRequestProcessor) WithLogger ¶
func (p *AuthLLMRequestProcessor) WithLogger(logger *slog.Logger) *AuthLLMRequestProcessor
WithLogger sets the logger for the Preprocessor.
type AutoFlow ¶
type AutoFlow struct {
*LLMFlow
}
AutoFlow is SingleFlow with agent transfer capability.
Agent transfer is allowed in the following directions:
- from parent to sub-agent;
- from sub-agent to parent;
- from sub-agent to its peer agents;
Peer-agent transfers are enabled only when all of the conditions below are met:
- The parent agent is also of AutoFlow;
- `disallow_transfer_to_peer` option of this agent is False (default).
Depending on the target agent flow type, the transfer may be automatically reversed. The condition is as below:
- If the flow type of the transferee agent is also auto, the transferee agent will remain the active agent and will respond to the user's next message directly.
- If the flow type of the transferee agent is not auto, the active agent will be reverted back to the previous agent.
TODO(adk-python): allow user to config auto-reverse function.
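The peer-transfer conditions above can be sketched as a predicate; the agent struct and its field names are illustrative assumptions, not the package's API.

```go
package main

import "fmt"

// agent is an illustrative stand-in capturing just the fields the
// peer-transfer rules depend on; names are assumptions, not the API.
type agent struct {
	Name                   string
	Parent                 *agent
	IsAutoFlow             bool
	DisallowTransferToPeer bool
}

// canTransferToPeer encodes the two documented conditions: the parent
// agent must also run AutoFlow, and peer transfer must not be disallowed.
func canTransferToPeer(a *agent) bool {
	return a.Parent != nil &&
		a.Parent.IsAutoFlow &&
		!a.DisallowTransferToPeer
}

func main() {
	parent := &agent{Name: "root", IsAutoFlow: true}
	child := &agent{Name: "sub", Parent: parent}
	fmt.Println(canTransferToPeer(child)) // prints true
}
```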
func NewAutoFlow ¶
func NewAutoFlow() *AutoFlow
NewAutoFlow creates a new AutoFlow with the default types.LLMRequestProcessor and types.LLMResponseProcessor.
type BasicLLMRequestProcessor ¶
type BasicLLMRequestProcessor struct{}
BasicLLMRequestProcessor is a simple request processor that just passes the content to the LLM and returns the response.
func (*BasicLLMRequestProcessor) Run ¶
func (f *BasicLLMRequestProcessor) Run(ctx context.Context, ictx *types.InvocationContext, request *types.LLMRequest) iter.Seq2[*types.Event, error]
Run implements [LLMRequestProcessor].
type CodeExecutionRequestProcessor ¶
type CodeExecutionRequestProcessor struct{}
CodeExecutionRequestProcessor processes code execution requests.
func (*CodeExecutionRequestProcessor) Run ¶
func (p *CodeExecutionRequestProcessor) Run(ctx context.Context, ictx *types.InvocationContext, request *types.LLMRequest) iter.Seq2[*types.Event, error]
Run implements types.LLMRequestProcessor.
type CodeExecutionResponseProcessor ¶
type CodeExecutionResponseProcessor struct{}
CodeExecutionResponseProcessor processes code execution responses.
func (*CodeExecutionResponseProcessor) Run ¶
func (p *CodeExecutionResponseProcessor) Run(ctx context.Context, ictx *types.InvocationContext, response *types.LLMResponse) iter.Seq2[*types.Event, error]
Run implements types.LLMResponseProcessor.
type ContentLLMRequestProcessor ¶
type ContentLLMRequestProcessor struct{}
ContentLLMRequestProcessor builds the contents for the LLM request.
func (*ContentLLMRequestProcessor) Run ¶
func (cp *ContentLLMRequestProcessor) Run(ctx context.Context, ictx *types.InvocationContext, request *types.LLMRequest) iter.Seq2[*types.Event, error]
Run implements [LLMRequestProcessor].
type DataFileUtil ¶
type DataFileUtil struct {
	// The file extension (e.g., ".csv").
	Extension string

	// The code template to load the data file.
	LoaderCodeTemplate string
}
DataFileUtil describes a data file type: its file extension and the code template used to load it.
type IdentityLlmRequestProcessor ¶
type IdentityLlmRequestProcessor struct{}
IdentityLlmRequestProcessor gives the agent its identity from the framework.
func (*IdentityLlmRequestProcessor) Run ¶
func (p *IdentityLlmRequestProcessor) Run(ctx context.Context, ictx *types.InvocationContext, request *types.LLMRequest) iter.Seq2[*types.Event, error]
Run implements [LLMRequestProcessor].
type InstructionsLlmRequestProcessor ¶
type InstructionsLlmRequestProcessor struct{}
InstructionsLlmRequestProcessor handles instructions and global instructions for the LLM flow.
func (*InstructionsLlmRequestProcessor) Run ¶
func (p *InstructionsLlmRequestProcessor) Run(ctx context.Context, ictx *types.InvocationContext, request *types.LLMRequest) iter.Seq2[*types.Event, error]
Run implements [LLMRequestProcessor].
type LLMFlow ¶
type LLMFlow struct {
	RequestProcessors  []types.LLMRequestProcessor
	ResponseProcessors []types.LLMResponseProcessor

	Logger *slog.Logger
}
LLMFlow represents a base flow that calls the LLM in a loop until a final response is generated.
This flow ends when it transfers to another agent.
func NewLLMFlow ¶
func NewLLMFlow() *LLMFlow
NewLLMFlow creates a new LLMFlow.
func (*LLMFlow) Run ¶
func (f *LLMFlow) Run(ctx context.Context, ic *types.InvocationContext) iter.Seq2[*types.Event, error]
Run implements [Flow].
func (*LLMFlow) RunLive ¶
func (f *LLMFlow) RunLive(ctx context.Context, ictx *types.InvocationContext) iter.Seq2[*types.Event, error]
RunLive implements [Flow].
TODO(zchee): support OTel tracing.
func (*LLMFlow) WithLogger ¶
WithLogger sets the logger for the flow and returns the flow for chaining.
func (*LLMFlow) WithRequestProcessors ¶
func (f *LLMFlow) WithRequestProcessors(processors ...types.LLMRequestProcessor) *LLMFlow
WithRequestProcessors adds a request processor to the LLMFlow.
func (*LLMFlow) WithResponseProcessors ¶
func (f *LLMFlow) WithResponseProcessors(processors ...types.LLMResponseProcessor) *LLMFlow
WithResponseProcessors adds a response processor to the LLMFlow.
type NLPlanningRequestProcessor ¶
type NLPlanningRequestProcessor struct{}
NLPlanningRequestProcessor represents a processor for NL planning.
func (*NLPlanningRequestProcessor) Run ¶
func (p *NLPlanningRequestProcessor) Run(ctx context.Context, ictx *types.InvocationContext, request *types.LLMRequest) iter.Seq2[*types.Event, error]
Run implements types.LLMRequestProcessor.
type NLPlanningResponseProcessor ¶
type NLPlanningResponseProcessor struct{}
func (*NLPlanningResponseProcessor) Run ¶
func (p *NLPlanningResponseProcessor) Run(ctx context.Context, ictx *types.InvocationContext, response *types.LLMResponse) iter.Seq2[*types.Event, error]
type SingleFlow ¶
type SingleFlow struct {
*LLMFlow
}
SingleFlow is the LLM flow that handles tool calls.
A single flow considers only the agent itself and its tools; no sub-agents are allowed for a single flow.
func NewSingleFlow ¶
func NewSingleFlow() *SingleFlow
NewSingleFlow creates a new SingleFlow with the default types.LLMRequestProcessor and types.LLMResponseProcessor.