Documentation
¶
Index ¶
- Variables
- type DistributedOption
- type DistributedRuntime
- func (r *DistributedRuntime) Broadcast(msg *agent.Message) error
- func (r *DistributedRuntime) Call(ctx context.Context, target string, input *agent.Message) (*agent.Message, error)
- func (r *DistributedRuntime) CallParallel(ctx context.Context, targets []string, input *agent.Message) (map[string]*agent.Message, map[string]error)
- func (r *DistributedRuntime) CallWithSession(ctx context.Context, target string, input *publicAgent.Message, ...) (*publicAgent.Message, error)
- func (r *DistributedRuntime) Config() RuntimeConfig
- func (r *DistributedRuntime) Connect(name, addr string) error
- func (r *DistributedRuntime) Get(name string) (agent.Agent, error)
- func (r *DistributedRuntime) GetChannelStats(name string) (capacity, length int, err error)
- func (r *DistributedRuntime) List() []string
- func (r *DistributedRuntime) ListenAddr() string
- func (r *DistributedRuntime) MessagesSent() uint64
- func (r *DistributedRuntime) Recv(source string) (<-chan *agent.Message, error)
- func (r *DistributedRuntime) Register(a agent.Agent) error
- func (r *DistributedRuntime) Send(target string, msg *agent.Message) error
- func (r *DistributedRuntime) SessionManager() session.Manager
- func (r *DistributedRuntime) SetSessionManager(sm session.Manager)
- func (r *DistributedRuntime) Start(ctx context.Context) error
- func (r *DistributedRuntime) StartAgentsPhased(ctx context.Context, agentDefs map[string]agent.AgentDef) error
- func (r *DistributedRuntime) Stop(ctx context.Context) error
- func (r *DistributedRuntime) Unregister(name string) error
- type DistributedRuntimeConfig
- type LocalRuntime (deprecated)
- func (r *LocalRuntime) Broadcast(msg *agent.Message) error
- func (r *LocalRuntime) Call(ctx context.Context, target string, input *agent.Message) (*agent.Message, error)
- func (r *LocalRuntime) CallParallel(ctx context.Context, targets []string, input *agent.Message) (map[string]*agent.Message, map[string]error)
- func (r *LocalRuntime) Get(name string) (agent.Agent, error)
- func (r *LocalRuntime) GetChannelStats(name string) (capacity, length int, err error)
- func (r *LocalRuntime) List() []string
- func (r *LocalRuntime) Recv(source string) (<-chan *agent.Message, error)
- func (r *LocalRuntime) Register(a agent.Agent) error
- func (r *LocalRuntime) Send(target string, msg *agent.Message) error
- func (r *LocalRuntime) Start(ctx context.Context) error
- func (r *LocalRuntime) StartAgentsPhased(ctx context.Context, agentDefs map[string]agent.AgentDef) error
- func (r *LocalRuntime) Stop(ctx context.Context) error
- func (r *LocalRuntime) Unregister(name string) error
- type Option
- type RuntimeConfig
- type TLSConfig
Constants ¶
This section is empty.
Variables ¶
var ( // ErrAgentNotFound is returned when an agent is not registered ErrAgentNotFound = errors.New("agent not found") // ErrAgentAlreadyRegistered is returned when trying to register an agent with a duplicate name ErrAgentAlreadyRegistered = errors.New("agent already registered") // ErrAgentNotReady is returned when trying to execute an agent that is not ready ErrAgentNotReady = errors.New("agent not ready") // ErrRuntimeNotStarted is returned when trying to use a runtime that hasn't been started ErrRuntimeNotStarted = errors.New("runtime not started") // ErrRuntimeAlreadyStarted is returned when trying to start an already running runtime ErrRuntimeAlreadyStarted = errors.New("runtime already started") )
Functions ¶
This section is empty.
Types ¶
type DistributedOption ¶ added in v0.3.0
type DistributedOption func(*DistributedRuntime)
DistributedOption configures a DistributedRuntime.
func WithDistributedSessionManager ¶ added in v0.3.0
func WithDistributedSessionManager(sm session.Manager) DistributedOption
WithDistributedSessionManager sets the session manager for the distributed runtime.
func WithTLS ¶ added in v0.3.0
func WithTLS(cfg *TLSConfig) DistributedOption
WithTLS configures TLS for secure gRPC connections.
type DistributedRuntime ¶
type DistributedRuntime struct {
// contains filtered or unexported fields
}
DistributedRuntime provides distributed agent execution using gRPC. Agents can run in separate processes or on different machines.
func NewDistributedRuntime ¶
func NewDistributedRuntime(listenAddr string, opts ...any) *DistributedRuntime
NewDistributedRuntime creates a new DistributedRuntime. The listenAddr is the address on which to listen for incoming gRPC connections (e.g., ":50051"). Pass DistributedOption values such as WithTLS or WithDistributedSessionManager in opts to configure TLS and session management.
func (*DistributedRuntime) Broadcast ¶
func (r *DistributedRuntime) Broadcast(msg *agent.Message) error
Broadcast sends a message to all registered agents asynchronously
func (*DistributedRuntime) Call ¶
func (r *DistributedRuntime) Call(ctx context.Context, target string, input *agent.Message) (*agent.Message, error)
Call invokes an agent synchronously and waits for response
func (*DistributedRuntime) CallParallel ¶
func (r *DistributedRuntime) CallParallel(ctx context.Context, targets []string, input *agent.Message) (map[string]*agent.Message, map[string]error)
CallParallel invokes multiple agents concurrently and returns all results
func (*DistributedRuntime) CallWithSession ¶ added in v0.3.0
func (r *DistributedRuntime) CallWithSession( ctx context.Context, target string, input *publicAgent.Message, sessionID string, ) (*publicAgent.Message, error)
CallWithSession invokes an agent with session context. The input message is appended to the session before execution, and the result is appended after execution.
If the agent implements sessionAwareAgent, it will receive the full conversation history during execution.
func (*DistributedRuntime) Config ¶ added in v0.3.0
func (r *DistributedRuntime) Config() RuntimeConfig
Config returns a copy of the runtime configuration.
func (*DistributedRuntime) Connect ¶
func (r *DistributedRuntime) Connect(name, addr string) error
Connect establishes a connection to a remote agent. It uses TLS if configured; otherwise it falls back to an insecure connection.
func (*DistributedRuntime) Get ¶
func (r *DistributedRuntime) Get(name string) (agent.Agent, error)
Get retrieves a registered agent by name. Only locally registered agents are looked up.
func (*DistributedRuntime) GetChannelStats ¶ added in v0.3.0
func (r *DistributedRuntime) GetChannelStats(name string) (capacity, length int, err error)
GetChannelStats returns statistics for a channel: its capacity and its current length. An error is returned if the channel doesn't exist.
func (*DistributedRuntime) List ¶
func (r *DistributedRuntime) List() []string
List returns the names of all registered agents, both local and remote.
func (*DistributedRuntime) ListenAddr ¶ added in v0.3.0
func (r *DistributedRuntime) ListenAddr() string
ListenAddr returns the address the gRPC server is listening on. It returns an empty string if the server has not been started.
func (*DistributedRuntime) MessagesSent ¶ added in v0.3.0
func (r *DistributedRuntime) MessagesSent() uint64
MessagesSent returns the total number of messages sent via Send().
func (*DistributedRuntime) Recv ¶
func (r *DistributedRuntime) Recv(source string) (<-chan *agent.Message, error)
Recv returns a channel to receive messages from a source agent. For local agents, returns the local channel directly. For remote agents, establishes a gRPC streaming connection.
func (*DistributedRuntime) Register ¶
func (r *DistributedRuntime) Register(a agent.Agent) error
Register registers a local agent with the runtime
func (*DistributedRuntime) Send ¶
func (r *DistributedRuntime) Send(target string, msg *agent.Message) error
Send sends a message to a target agent asynchronously
func (*DistributedRuntime) SessionManager ¶ added in v0.3.0
func (r *DistributedRuntime) SessionManager() session.Manager
SessionManager returns the configured session manager.
func (*DistributedRuntime) SetSessionManager ¶ added in v0.3.0
func (r *DistributedRuntime) SetSessionManager(sm session.Manager)
SetSessionManager sets the session manager for this runtime.
func (*DistributedRuntime) Start ¶
func (r *DistributedRuntime) Start(ctx context.Context) error
Start starts the runtime and gRPC server. The gRPC server will listen on the address provided during construction.
func (*DistributedRuntime) StartAgentsPhased ¶ added in v0.2.3
func (r *DistributedRuntime) StartAgentsPhased(ctx context.Context, agentDefs map[string]agent.AgentDef) error
StartAgentsPhased starts all registered LOCAL agents in dependency order. Remote agents are assumed to be already running on their respective nodes.
Agents are started in phases based on their dependencies:
- Phase 0: Agents with no dependencies
- Phase N: Agents whose dependencies are all in phases < N
Within each phase, agents are started concurrently and the method waits for all of them to report Ready() before proceeding to the next phase.
This method should be called after all agents are registered and after Start() has been called to initialize the runtime.
func (*DistributedRuntime) Stop ¶
func (r *DistributedRuntime) Stop(ctx context.Context) error
Stop gracefully shuts down the runtime
func (*DistributedRuntime) Unregister ¶
func (r *DistributedRuntime) Unregister(name string) error
Unregister removes an agent from the runtime
type DistributedRuntimeConfig ¶
type DistributedRuntimeConfig struct {
*RuntimeConfig
ListenAddr string // Address to listen for gRPC connections (e.g., ":50051")
TLS *TLSConfig // TLS configuration for secure connections
}
DistributedRuntimeConfig extends RuntimeConfig with distributed-specific options
type LocalRuntime (deprecated)
type LocalRuntime struct {
// contains filtered or unexported fields
}
LocalRuntime provides in-process agent execution using Go channels. All agents run in the same process, ideal for single-binary deployments.
Deprecated: Use aixgo.Runtime instead. LocalRuntime is kept for internal compatibility but will be removed in a future version. The unified Runtime in the main aixgo package provides all features of LocalRuntime plus session support.
func NewLocalRuntime (deprecated)
func NewLocalRuntime(opts ...Option) *LocalRuntime
NewLocalRuntime creates a new LocalRuntime with the given options.
Deprecated: Use aixgo.NewRuntime() instead. This function is kept for internal compatibility but will be removed in a future version.
func (*LocalRuntime) Broadcast ¶
func (r *LocalRuntime) Broadcast(msg *agent.Message) error
Broadcast sends a message to all registered agents asynchronously
func (*LocalRuntime) Call ¶
func (r *LocalRuntime) Call(ctx context.Context, target string, input *agent.Message) (*agent.Message, error)
Call invokes an agent synchronously and waits for response
func (*LocalRuntime) CallParallel ¶
func (r *LocalRuntime) CallParallel(ctx context.Context, targets []string, input *agent.Message) (map[string]*agent.Message, map[string]error)
CallParallel invokes multiple agents concurrently and returns all results
func (*LocalRuntime) Get ¶
func (r *LocalRuntime) Get(name string) (agent.Agent, error)
Get retrieves a registered agent by name
func (*LocalRuntime) GetChannelStats ¶
func (r *LocalRuntime) GetChannelStats(name string) (capacity, length int, err error)
GetChannelStats returns statistics for a channel
func (*LocalRuntime) List ¶
func (r *LocalRuntime) List() []string
List returns all registered agent names
func (*LocalRuntime) Recv ¶
func (r *LocalRuntime) Recv(source string) (<-chan *agent.Message, error)
Recv returns a channel to receive messages from a source agent
func (*LocalRuntime) Register ¶
func (r *LocalRuntime) Register(a agent.Agent) error
Register registers an agent with the runtime
func (*LocalRuntime) Send ¶
func (r *LocalRuntime) Send(target string, msg *agent.Message) error
Send sends a message to a target agent asynchronously
func (*LocalRuntime) Start ¶
func (r *LocalRuntime) Start(ctx context.Context) error
Start starts the runtime. This is a no-op for LocalRuntime; agents are started individually.
func (*LocalRuntime) StartAgentsPhased ¶ added in v0.2.3
func (r *LocalRuntime) StartAgentsPhased(ctx context.Context, agentDefs map[string]agent.AgentDef) error
StartAgentsPhased starts all registered agents in dependency order. Agents are started in phases based on their dependencies:
- Phase 0: Agents with no dependencies
- Phase N: Agents whose dependencies are all in phases < N
Within each phase, agents are started concurrently and the method waits for all of them to report Ready() before proceeding to the next phase.
This method should be called after all agents are registered and after Start() has been called to initialize the runtime.
func (*LocalRuntime) Stop ¶
func (r *LocalRuntime) Stop(ctx context.Context) error
Stop gracefully shuts down the runtime
func (*LocalRuntime) Unregister ¶
func (r *LocalRuntime) Unregister(name string) error
Unregister removes an agent from the runtime
type Option ¶
type Option func(*RuntimeConfig)
Option is a functional option for configuring a runtime
func WithAgentStartTimeout ¶ added in v0.2.3
WithAgentStartTimeout sets the timeout for waiting for agents to become ready
func WithChannelBufferSize ¶
WithChannelBufferSize sets the channel buffer size for LocalRuntime
func WithMaxConcurrentCalls ¶
WithMaxConcurrentCalls sets the maximum number of concurrent agent calls
func WithMetrics ¶
WithMetrics enables or disables metrics collection
func WithSendTimeout ¶ added in v0.3.0
WithSendTimeout sets the timeout for Send operations
func WithTracing ¶ added in v0.3.0
WithTracing enables or disables OpenTelemetry tracing
type RuntimeConfig ¶
type RuntimeConfig struct {
// ChannelBufferSize sets the buffer size for message channels
// Default: 100
ChannelBufferSize int
// MaxConcurrentCalls limits parallel agent executions (0 = unlimited)
// Default: 0 (unlimited)
MaxConcurrentCalls int
// EnableMetrics enables runtime performance metrics collection
// Default: true
EnableMetrics bool
// EnableTracing enables OpenTelemetry tracing
// Default: false
EnableTracing bool
// AgentStartTimeout is the maximum time to wait for an agent to become ready
// Default: 30 seconds
AgentStartTimeout time.Duration
// SendTimeout is the timeout for Send operations
// Default: 5 seconds
SendTimeout time.Duration
// ChannelFullWarningThreshold triggers a warning when channel utilization exceeds this percentage
// Default: 80
ChannelFullWarningThreshold int
}
RuntimeConfig contains configuration options for creating a runtime
func DefaultConfig ¶
func DefaultConfig() *RuntimeConfig
DefaultConfig returns a RuntimeConfig with sensible defaults
type TLSConfig ¶ added in v0.3.0
type TLSConfig struct {
// Enabled turns on TLS encryption.
Enabled bool
// CertFile is the path to the server certificate.
CertFile string
// KeyFile is the path to the server private key.
KeyFile string
// CAFile is the path to the CA certificate (for mTLS).
CAFile string
// ServerName is used for SNI verification.
ServerName string
// InsecureSkipVerify skips certificate verification (development only).
// Warning: This logs a security warning. Do not use in production.
InsecureSkipVerify bool
// ExternalTLS indicates TLS is handled by a service mesh (Istio, Linkerd, etc.).
// When true, app-level TLS is disabled entirely since the mesh sidecar handles
// encryption. This takes precedence over other TLS settings.
ExternalTLS bool
}
TLSConfig holds TLS configuration for gRPC connections.