Documentation
¶
Overview ¶
Package agent provides the public interfaces for building agents with Aixgo.
This package exports the core Agent, Message, and Runtime interfaces that external projects need to build custom agents or interact with the Aixgo framework.
Basic Usage ¶
To create a custom agent, implement the Agent interface:
type MyAgent struct {
name string
ready bool
}
func (a *MyAgent) Name() string { return a.name }
func (a *MyAgent) Role() string { return "custom" }
func (a *MyAgent) Ready() bool { return a.ready }
// Start marks the agent ready and then blocks until ctx is canceled.
//
// The Agent interface asks Start to block until the context is canceled or the
// agent hits a fatal error, so this method must not return right after setup.
// It returns nil once ctx is done.
func (a *MyAgent) Start(ctx context.Context) error {
	// Flag readiness first so Ready() reports true while the agent is running.
	a.ready = true

	// Start any background processing here.

	// Wait for cancellation instead of returning immediately.
	<-ctx.Done()
	return nil
}
func (a *MyAgent) Execute(ctx context.Context, input *agent.Message) (*agent.Message, error) {
// Process the input and return a response
return agent.NewMessage("response", result), nil
}
func (a *MyAgent) Stop(ctx context.Context) error {
a.ready = false
return nil
}
Runtime Usage ¶
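Use aixgo.NewRuntime() (or agent.NewLocalRuntime(), the constructor exported by this package and used in the examples below) to coordinate multiple agents: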
rt := aixgo.NewRuntime()
rt.Register(myAgent)
rt.Start(ctx)
// Call an agent synchronously
response, err := rt.Call(ctx, "myagent", input)
// Call multiple agents in parallel
results, errs := rt.CallParallel(ctx, []string{"agent1", "agent2"}, input)
Message Format ¶
Messages are the standard unit of communication between agents:
msg := agent.NewMessage("analysis_request", payload).
WithMetadata("priority", "high").
WithMetadata("source", "api")
See the Aixgo documentation at https://aixgo.dev for more examples and patterns.
Example ¶
Example demonstrates how to use the agent package: it registers a custom agent with a LocalRuntime and calls it synchronously.
package main
import (
"context"
"fmt"
"github.com/aixgo-dev/aixgo/agent"
)
// AnalyzerAgent is an example custom agent for document analysis
type AnalyzerAgent struct {
name string
ready bool
}
func NewAnalyzerAgent(name string) *AnalyzerAgent {
return &AnalyzerAgent{
name: name,
ready: false,
}
}
func (a *AnalyzerAgent) Name() string { return a.name }
func (a *AnalyzerAgent) Role() string { return "analyzer" }
func (a *AnalyzerAgent) Ready() bool { return a.ready }
func (a *AnalyzerAgent) Start(ctx context.Context) error {
a.ready = true
<-ctx.Done()
return nil
}
// Execute decodes the request payload and replies with an "analysis_result" message.
// It returns a wrapped "invalid request" error when the payload cannot be decoded.
func (a *AnalyzerAgent) Execute(ctx context.Context, input *agent.Message) (*agent.Message, error) {
// AnalysisRequest lists the JSON fields this agent reads from the payload.
type AnalysisRequest struct {
DocumentID string `json:"document_id"`
Content string `json:"content"`
}
var req AnalysisRequest
if err := input.UnmarshalPayload(&req); err != nil {
return nil, fmt.Errorf("invalid request: %w", err)
}
result := map[string]interface{}{
"document_id": req.DocumentID,
"status": "analyzed",
// NOTE: len counts bytes, so despite its name "word_count" holds the byte
// length of Content (41 for the sample request sent in main), not a word count.
"word_count": len(req.Content),
}
return agent.NewMessage("analysis_result", result), nil
}
func (a *AnalyzerAgent) Stop(ctx context.Context) error {
a.ready = false
return nil
}
// main registers an AnalyzerAgent with a LocalRuntime, calls it once with
// rt.Call, and prints the decoded result.
func main() {
// Create a runtime
rt := agent.NewLocalRuntime()
// Register custom agents (the returned error is discarded in this example)
analyzer := NewAnalyzerAgent("document-analyzer")
_ = rt.Register(analyzer)
// rt.Start is not called in this example: the agent is flagged ready by hand
// below so that rt.Call will accept it
ctx := context.Background()
analyzer.ready = true // Simulate ready state for example
// Create an analysis request
type Request struct {
DocumentID string `json:"document_id"`
Content string `json:"content"`
}
input := agent.NewMessage("analyze", Request{
DocumentID: "doc-123",
Content: "This is a sample privacy policy document.",
}).WithMetadata("priority", "high")
// Call the analyzer synchronously
response, err := rt.Call(ctx, "document-analyzer", input)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
// Process the response; WordCount receives the "word_count" value, which
// Execute fills with len(req.Content)
type Result struct {
DocumentID string `json:"document_id"`
Status string `json:"status"`
WordCount int `json:"word_count"`
}
var result Result
if err := response.UnmarshalPayload(&result); err != nil {
fmt.Printf("Error unmarshaling: %v\n", err)
return
}
fmt.Printf("Analysis complete: %s (status: %s, words: %d)\n",
result.DocumentID, result.Status, result.WordCount)
}
Output: Analysis complete: doc-123 (status: analyzed, words: 41)
Example (AsyncCommunication) ¶
Example_asyncCommunication demonstrates asynchronous message passing with Send and Recv.
package main
import (
"context"
"fmt"
"github.com/aixgo-dev/aixgo/agent"
)
// AnalyzerAgent is an example custom agent for document analysis
type AnalyzerAgent struct {
name string
ready bool
}
func NewAnalyzerAgent(name string) *AnalyzerAgent {
return &AnalyzerAgent{
name: name,
ready: false,
}
}
func (a *AnalyzerAgent) Name() string { return a.name }
func (a *AnalyzerAgent) Role() string { return "analyzer" }
func (a *AnalyzerAgent) Ready() bool { return a.ready }
func (a *AnalyzerAgent) Start(ctx context.Context) error {
a.ready = true
<-ctx.Done()
return nil
}
// Execute decodes the request payload and replies with an "analysis_result" message.
// It returns a wrapped "invalid request" error when the payload cannot be decoded.
func (a *AnalyzerAgent) Execute(ctx context.Context, input *agent.Message) (*agent.Message, error) {
// AnalysisRequest lists the JSON fields this agent reads from the payload.
type AnalysisRequest struct {
DocumentID string `json:"document_id"`
Content string `json:"content"`
}
var req AnalysisRequest
if err := input.UnmarshalPayload(&req); err != nil {
return nil, fmt.Errorf("invalid request: %w", err)
}
result := map[string]interface{}{
"document_id": req.DocumentID,
"status": "analyzed",
// NOTE: len counts bytes, so despite its name "word_count" holds the byte
// length of Content, not a word count.
"word_count": len(req.Content),
}
return agent.NewMessage("analysis_result", result), nil
}
func (a *AnalyzerAgent) Stop(ctx context.Context) error {
a.ready = false
return nil
}
// main shows asynchronous messaging: it registers one agent, obtains the
// agent's receive channel, sends a notification, and prints the type of the
// message read back from the channel.
//
// Every runtime call that can fail is checked. In particular a failed Recv
// would leave a nil channel, and receiving from it would block forever.
func main() {
	rt := agent.NewLocalRuntime()

	// Register the agent and flag it ready by hand (Start is not called here).
	agent1 := NewAnalyzerAgent("agent1")
	agent1.ready = true
	if err := rt.Register(agent1); err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	// Get a channel to receive messages
	recvCh, err := rt.Recv("agent1")
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	// Send a message asynchronously
	msg := agent.NewMessage("notification", map[string]string{
		"event": "document_updated",
	})
	if err := rt.Send("agent1", msg); err != nil {
		// Nothing was queued, so the receive below would never complete.
		fmt.Printf("Error: %v\n", err)
		return
	}

	// Receive and process
	received := <-recvCh
	fmt.Printf("Received message type: %s\n", received.Type)
}
Output: Received message type: notification
Example (MessageMetadata) ¶
Example_messageMetadata demonstrates attaching metadata with WithMetadata and reading it back with GetMetadataString.
package main
import (
"fmt"
"github.com/aixgo-dev/aixgo/agent"
)
func main() {
// Create a message with metadata for tracing and correlation
msg := agent.NewMessage("request", map[string]string{"action": "analyze"}).
WithMetadata("correlation_id", "req-123").
WithMetadata("user_id", "user-456").
WithMetadata("priority", "high").
WithMetadata("source", "api")
// Access metadata
correlationID := msg.GetMetadataString("correlation_id", "")
priority := msg.GetMetadataString("priority", "normal")
fmt.Printf("Processing request %s with priority %s\n", correlationID, priority)
}
Output: Processing request req-123 with priority high
Example (ParallelAnalysis) ¶
Example_parallelAnalysis demonstrates parallel agent execution with CallParallel.
package main
import (
"context"
"fmt"
"github.com/aixgo-dev/aixgo/agent"
)
// AnalyzerAgent is an example custom agent for document analysis
type AnalyzerAgent struct {
name string
ready bool
}
func NewAnalyzerAgent(name string) *AnalyzerAgent {
return &AnalyzerAgent{
name: name,
ready: false,
}
}
func (a *AnalyzerAgent) Name() string { return a.name }
func (a *AnalyzerAgent) Role() string { return "analyzer" }
func (a *AnalyzerAgent) Ready() bool { return a.ready }
func (a *AnalyzerAgent) Start(ctx context.Context) error {
a.ready = true
<-ctx.Done()
return nil
}
// Execute decodes the request payload and replies with an "analysis_result" message.
// It returns a wrapped "invalid request" error when the payload cannot be decoded.
func (a *AnalyzerAgent) Execute(ctx context.Context, input *agent.Message) (*agent.Message, error) {
// AnalysisRequest lists the JSON fields this agent reads from the payload.
type AnalysisRequest struct {
DocumentID string `json:"document_id"`
Content string `json:"content"`
}
var req AnalysisRequest
if err := input.UnmarshalPayload(&req); err != nil {
return nil, fmt.Errorf("invalid request: %w", err)
}
result := map[string]interface{}{
"document_id": req.DocumentID,
"status": "analyzed",
// NOTE: len counts bytes, so despite its name "word_count" holds the byte
// length of Content, not a word count.
"word_count": len(req.Content),
}
return agent.NewMessage("analysis_result", result), nil
}
func (a *AnalyzerAgent) Stop(ctx context.Context) error {
a.ready = false
return nil
}
// main registers three analyzers with a LocalRuntime, flags them ready, and
// invokes all of them concurrently with CallParallel, printing how many
// completed and whether any returned an error.
func main() {
	rt := agent.NewLocalRuntime()

	// One list of names drives registration, the ready loop, and the parallel
	// call, so the three steps cannot drift apart.
	targets := []string{"syntax-analyzer", "risk-analyzer", "compliance-analyzer"}

	// Register multiple analyzers
	for _, name := range targets {
		if err := rt.Register(NewAnalyzerAgent(name)); err != nil {
			fmt.Printf("Error: %v\n", err)
			return
		}
	}

	ctx := context.Background()

	// Mark all as ready (in real usage, Start would be called)
	for _, name := range targets {
		a, err := rt.Get(name)
		if err != nil {
			fmt.Printf("Error: %v\n", err)
			return
		}
		if analyzer, ok := a.(*AnalyzerAgent); ok {
			analyzer.ready = true
		}
	}

	// Prepare input
	type Request struct {
		DocumentID string `json:"document_id"`
		Content    string `json:"content"`
	}
	input := agent.NewMessage("analyze", Request{
		DocumentID: "doc-456",
		Content:    "Privacy policy content...",
	})

	// Call all analyzers in parallel. The error map is named errs so it does
	// not shadow the standard library package name "errors".
	results, errs := rt.CallParallel(ctx, targets, input)

	// Check results
	fmt.Printf("Completed: %d/%d analyzers\n", len(results), len(targets))
	if len(errs) > 0 {
		fmt.Printf("Errors: %d\n", len(errs))
	} else {
		fmt.Println("All analyzers completed successfully")
	}
}
Output:
Completed: 3/3 analyzers
All analyzers completed successfully
Index ¶
- type Agent
- type LocalRuntime
- func (r *LocalRuntime) Broadcast(msg *Message) error
- func (r *LocalRuntime) Call(ctx context.Context, target string, input *Message) (*Message, error)
- func (r *LocalRuntime) CallParallel(ctx context.Context, targets []string, input *Message) (map[string]*Message, map[string]error)
- func (r *LocalRuntime) Get(name string) (Agent, error)
- func (r *LocalRuntime) List() []string
- func (r *LocalRuntime) Recv(source string) (<-chan *Message, error)
- func (r *LocalRuntime) Register(agent Agent) error
- func (r *LocalRuntime) Send(target string, msg *Message) error
- func (r *LocalRuntime) Start(ctx context.Context) error
- func (r *LocalRuntime) StartAgentsPhased(ctx context.Context, dependencies map[string][]string) error
- func (r *LocalRuntime) Stop(ctx context.Context) error
- func (r *LocalRuntime) Unregister(name string) error
- type Message
- func (m *Message) Clone() *Message
- func (m *Message) GetMetadata(key string, defaultValue interface{}) interface{}
- func (m *Message) GetMetadataString(key, defaultValue string) string
- func (m *Message) MarshalPayload() []byte
- func (m *Message) String() string
- func (m *Message) UnmarshalPayload(v interface{}) error
- func (m *Message) WithMetadata(key string, value interface{}) *Message
- type Runtime
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Agent ¶
type Agent interface {
// Name returns the unique identifier for this agent instance.
// Agent names must be unique within a Runtime.
Name() string
// Role returns the agent's role type (e.g., "react", "classifier", "supervisor").
// The role determines the agent's behavior and capabilities.
Role() string
// Start initializes the agent and prepares it to receive messages.
// This method is called when the Runtime starts the agent.
// For asynchronous agents, this typically runs a message processing loop.
// The method should block until the context is canceled or the agent encounters a fatal error.
Start(ctx context.Context) error
// Execute processes an input message and returns a response synchronously.
// This method is used by orchestration patterns for direct agent invocation.
// The implementation should be idempotent and thread-safe.
Execute(ctx context.Context, input *Message) (*Message, error)
// Stop gracefully shuts down the agent.
// This method is called when the Runtime stops the agent or when the context is canceled.
// Implementations should clean up resources and ensure all pending operations complete.
Stop(ctx context.Context) error
// Ready returns true if the agent is ready to process messages.
// The Runtime will not invoke Execute on an agent that is not ready.
Ready() bool
}
Agent is the interface that all agents must implement. External packages should implement this interface for custom agents.
Agents support both synchronous (Execute) and asynchronous (Start) execution modes. The Execute method is used for request-response patterns, while Start is used for agents that run continuously and process messages asynchronously.
type LocalRuntime ¶
type LocalRuntime struct {
// contains filtered or unexported fields
}
LocalRuntime is a single-process runtime for agent coordination. It uses in-memory channels for message passing and is suitable for applications that run all agents in a single Go binary.
LocalRuntime is thread-safe and can be used concurrently.
Example ¶
// Create a runtime and register agents
rt := NewLocalRuntime()
agent := NewMockAgent("analyzer", "analysis")
_ = rt.Register(agent)
agent.ready = true
// Call an agent synchronously
ctx := context.Background()
input := NewMessage("analyze", map[string]string{"text": "sample"})
response, _ := rt.Call(ctx, "analyzer", input)
_ = response // Process the response
// Output demonstrates the runtime was used
func NewLocalRuntime ¶
func NewLocalRuntime() *LocalRuntime
NewLocalRuntime creates a new local runtime.
func (*LocalRuntime) Broadcast ¶
func (r *LocalRuntime) Broadcast(msg *Message) error
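Broadcast sends a message to all registered agents asynchronously.
func (*LocalRuntime) Call ¶
func (r *LocalRuntime) Call(ctx context.Context, target string, input *Message) (*Message, error)
Call sends a message to an agent and waits for a synchronous response.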
func (*LocalRuntime) CallParallel ¶
func (r *LocalRuntime) CallParallel(ctx context.Context, targets []string, input *Message) (map[string]*Message, map[string]error)
CallParallel invokes multiple agents concurrently and returns all results.
func (*LocalRuntime) Get ¶
func (r *LocalRuntime) Get(name string) (Agent, error)
Get retrieves a registered agent by name.
func (*LocalRuntime) List ¶
func (r *LocalRuntime) List() []string
List returns all registered agent names.
func (*LocalRuntime) Recv ¶
func (r *LocalRuntime) Recv(source string) (<-chan *Message, error)
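Recv returns a channel to receive messages from an agent. In the AsyncCommunication example above, a message sent with Send("agent1", msg) is read from the channel returned by Recv("agent1").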
func (*LocalRuntime) Register ¶
func (r *LocalRuntime) Register(agent Agent) error
Register adds an agent to the runtime.
func (*LocalRuntime) Send ¶
func (r *LocalRuntime) Send(target string, msg *Message) error
Send sends a message to an agent asynchronously.
func (*LocalRuntime) Start ¶
func (r *LocalRuntime) Start(ctx context.Context) error
Start starts all registered agents concurrently and waits for them to be ready. It returns an error if any agent fails to start or doesn't become ready within a reasonable time. All agents are started in parallel for performance, but Start() blocks until all agents report Ready() == true.
func (*LocalRuntime) StartAgentsPhased ¶ added in v0.2.3
func (r *LocalRuntime) StartAgentsPhased(ctx context.Context, dependencies map[string][]string) error
StartAgentsPhased starts all registered agents in dependency order. The dependencies map specifies which agents each agent depends on (agent name -> dependency names). Agents are started in phases based on their dependencies:
- Phase 0: Agents with no dependencies
- Phase N: Agents whose dependencies are all in phases < N
Within each phase, agents are started concurrently and the method waits for all of them to report Ready() before proceeding to the next phase.
This method should be called after all agents are registered and after Start() has been called to initialize the runtime.
func (*LocalRuntime) Stop ¶
func (r *LocalRuntime) Stop(ctx context.Context) error
Stop gracefully shuts down all registered agents.
func (*LocalRuntime) Unregister ¶
func (r *LocalRuntime) Unregister(name string) error
Unregister removes an agent from the runtime.
type Message ¶
type Message struct {
// ID is a unique identifier for this message, automatically generated.
ID string
// Type identifies the message type (e.g., "analysis_request", "analysis_result").
// The type is used by agents to route and process messages appropriately.
Type string
// Payload contains the message data as a JSON string.
// Use UnmarshalPayload to deserialize into a specific type.
Payload string
// Timestamp is the ISO 8601 timestamp when the message was created.
Timestamp string
// Metadata contains optional key-value pairs for routing, tracing, correlation, etc.
Metadata map[string]interface{}
}
Message is the standard message format for agent communication. Messages are used for both synchronous (Call) and asynchronous (Send/Recv) communication.
func NewMessage ¶
NewMessage creates a new message with the given type and payload. The payload is automatically serialized to JSON. A unique ID and timestamp are automatically generated.
Example ¶
Example usage
// Create a message with a structured payload
type AnalysisRequest struct {
DocumentID string `json:"document_id"`
Priority string `json:"priority"`
}
payload := AnalysisRequest{
DocumentID: "doc-123",
Priority: "high",
}
msg := NewMessage("analysis_request", payload).
WithMetadata("source", "api").
WithMetadata("user_id", "user-456")
// Marshal to JSON for inspection
data, _ := json.Marshal(msg)
_ = data // In real code, you'd send this somewhere
// Output demonstrates the message was created
// (actual output would vary due to dynamic ID and timestamp)
func (*Message) Clone ¶
func (m *Message) Clone() *Message
Clone creates a deep copy of the message. This is useful when you need to modify a message without affecting the original.
func (*Message) GetMetadata ¶
func (m *Message) GetMetadata(key string, defaultValue interface{}) interface{}
GetMetadata retrieves metadata by key, returning the default value if not found.
func (*Message) GetMetadataString ¶
func (m *Message) GetMetadataString(key, defaultValue string) string
GetMetadataString is a convenience method to get metadata as a string.
func (*Message) MarshalPayload ¶
func (m *Message) MarshalPayload() []byte
MarshalPayload is a convenience method that returns the payload as JSON bytes. This is equivalent to []byte(m.Payload).
func (*Message) String ¶
func (m *Message) String() string
String returns a human-readable representation of the message for debugging.
func (*Message) UnmarshalPayload ¶
func (m *Message) UnmarshalPayload(v interface{}) error
UnmarshalPayload deserializes the message payload into the provided value. The value should be a pointer to the desired type.
var req AnalysisRequest
if err := msg.UnmarshalPayload(&req); err != nil {
return err
}
func (*Message) WithMetadata ¶
func (m *Message) WithMetadata(key string, value interface{}) *Message
WithMetadata adds metadata to the message and returns it for chaining. This allows for fluent construction:
msg := NewMessage("request", data).
WithMetadata("priority", "high").
WithMetadata("source", "api")
type Runtime ¶
type Runtime interface {
// Register adds an agent to the runtime.
// Returns an error if an agent with the same name is already registered.
Register(agent Agent) error
// Unregister removes an agent from the runtime.
// Returns an error if the agent is not found.
Unregister(name string) error
// Get retrieves a registered agent by name.
// Returns an error if the agent is not found.
Get(name string) (Agent, error)
// List returns all registered agent names.
List() []string
// Call sends a message to an agent and waits for a synchronous response.
// This is used for request-response patterns and orchestration.
// The target agent's Execute method is invoked.
// Returns an error if the agent is not found, not ready, or execution fails.
Call(ctx context.Context, target string, input *Message) (*Message, error)
// CallParallel invokes multiple agents concurrently and returns all results.
// Execution continues even if some agents fail (partial results are returned).
// Returns a map of successful responses and a map of errors keyed by agent name.
CallParallel(ctx context.Context, targets []string, input *Message) (map[string]*Message, map[string]error)
// Send sends a message to an agent asynchronously without waiting for a response.
// The message is placed in the target agent's message channel.
// Returns an error if the channel is full or the target is not found.
Send(target string, msg *Message) error
// Recv returns a channel to receive messages from an agent.
// This is used for asynchronous message passing patterns.
// The channel is created if it doesn't exist.
Recv(source string) (<-chan *Message, error)
// Broadcast sends a message to all registered agents asynchronously.
// Returns an error if any send operation fails (but continues sending to others).
Broadcast(msg *Message) error
// Start starts the runtime and all registered agents.
// For distributed runtimes, this starts the gRPC server.
// For local runtimes, this starts all agent Start methods.
Start(ctx context.Context) error
// Stop gracefully shuts down the runtime and all registered agents.
// This should clean up all resources and ensure pending operations complete.
Stop(ctx context.Context) error
}
Runtime provides the message passing and coordination infrastructure for agents. It supports both local (single binary) and distributed (gRPC) deployment modes.
The Runtime manages agent lifecycle, message routing, and orchestration patterns. It provides both synchronous (Call) and asynchronous (Send/Recv) communication.