distributed

package
v0.4.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
// DefaultMaxConcurrency is the default maximum concurrency.
const DefaultMaxConcurrency = 100

Variables

View Source
// Sentinel errors related to the circuit breaker.
// NOTE(review): presumably returned by CircuitBreaker.Execute — confirm
// against the implementation, which is not shown here.
var (
	// ErrCircuitOpen is returned when the circuit is open.
	ErrCircuitOpen = errors.New("circuit breaker is open")
	// ErrTooManyRequests is returned when too many requests are attempted
	// in the half-open state.
	ErrTooManyRequests = errors.New("too many requests in half-open state")
)

Functions

This section is empty.

Types

type AgentTask

// AgentTask describes a single agent task. Slices of AgentTask are the input
// of Coordinator.ExecuteParallel and Coordinator.ExecuteSequential.
//
// NOTE(review): the fields mirror the serviceName, agentName and input
// parameters of Coordinator.ExecuteAgent; presumably they are forwarded to
// it unchanged — confirm against the implementation.
type AgentTask struct {
	ServiceName string                // service name
	AgentName   string                // agent name
	Input       *agentcore.AgentInput // agent input
}

AgentTask Agent 任务

type AgentTaskResult

// AgentTaskResult is the result of a single agent task. It is the element
// type of the slices returned by Coordinator.ExecuteParallel and
// Coordinator.ExecuteSequential.
//
// NOTE(review): presumably Output is only meaningful when Error is nil —
// confirm against the implementation.
type AgentTaskResult struct {
	Task   AgentTask              // task associated with this result
	Output *agentcore.AgentOutput // agent output
	Error  error                  // error recorded for the task, if any
}

AgentTaskResult Agent 任务结果

type CircuitBreaker

// CircuitBreaker implements the circuit breaker pattern to prevent cascading
// failures. Create one with NewCircuitBreaker.
type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern to prevent cascading failures

func NewCircuitBreaker

func NewCircuitBreaker(config *CircuitBreakerConfig) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker with the given configuration

func (*CircuitBreaker) Execute

func (cb *CircuitBreaker) Execute(fn func() error) error

Execute runs the given function through the circuit breaker

func (*CircuitBreaker) Failures

func (cb *CircuitBreaker) Failures() uint32

Failures returns the current failure count

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset resets the circuit breaker to closed state with zero failures

func (*CircuitBreaker) State

func (cb *CircuitBreaker) State() CircuitState

State returns the current state of the circuit breaker

type CircuitBreakerConfig

// CircuitBreakerConfig holds configuration for the circuit breaker.
// DefaultCircuitBreakerConfig returns a default configuration.
type CircuitBreakerConfig struct {
	// MaxFailures is the number of consecutive failures before opening the
	// circuit.
	MaxFailures uint32
	// Timeout is the duration to wait before transitioning from open to
	// half-open.
	Timeout time.Duration
	// OnStateChange is called when the circuit state changes (optional).
	// It receives the previous state (from) and the new state (to).
	OnStateChange func(from, to CircuitState)
}

CircuitBreakerConfig holds configuration for the circuit breaker

func DefaultCircuitBreakerConfig

func DefaultCircuitBreakerConfig() *CircuitBreakerConfig

DefaultCircuitBreakerConfig returns a default configuration

type CircuitState

// CircuitState represents the current state of the circuit breaker.
type CircuitState int32

CircuitState represents the current state of the circuit breaker

// Possible values of CircuitState, numbered from zero by iota.
const (
	// StateClosed (0) allows all requests through.
	StateClosed CircuitState = iota
	// StateOpen (1) blocks all requests.
	StateOpen
	// StateHalfOpen (2) allows a single test request through.
	StateHalfOpen
)

func (CircuitState) String

func (s CircuitState) String() string

String returns the string representation of the circuit state

type Client

// Client is a remote agent client. It is responsible for calling agents
// hosted by remote services.
type Client struct {
	// contains filtered or unexported fields
}

Client 远程 Agent 客户端，负责调用远程服务的 Agent。

func NewClient

func NewClient(logger core.Logger) *Client

NewClient 创建客户端

func NewClientWithCircuitBreaker

func NewClientWithCircuitBreaker(logger core.Logger, cbConfig *CircuitBreakerConfig) *Client

NewClientWithCircuitBreaker 创建带自定义熔断器配置的客户端

func (*Client) CircuitBreaker

func (c *Client) CircuitBreaker() *CircuitBreaker

CircuitBreaker returns the client's circuit breaker for monitoring

func (*Client) ExecuteAgent

func (c *Client) ExecuteAgent(ctx context.Context, endpoint, agentName string, input *agentcore.AgentInput) (*agentcore.AgentOutput, error)

ExecuteAgent 执行远程 Agent

func (*Client) ExecuteAgentAsync

func (c *Client) ExecuteAgentAsync(ctx context.Context, endpoint, agentName string, input *agentcore.AgentInput) (string, error)

ExecuteAgentAsync 异步执行远程 Agent

func (*Client) GetAsyncResult

func (c *Client) GetAsyncResult(ctx context.Context, endpoint, taskID string) (*agentcore.AgentOutput, bool, error)

GetAsyncResult 获取异步执行结果

func (*Client) ListAgents

func (c *Client) ListAgents(ctx context.Context, endpoint string) ([]string, error)

ListAgents 列出服务支持的所有 Agent

func (*Client) Ping

func (c *Client) Ping(ctx context.Context, endpoint string) error

Ping 检查服务健康状态

func (*Client) WaitForAsyncResult

func (c *Client) WaitForAsyncResult(ctx context.Context, endpoint, taskID string, pollInterval time.Duration) (*agentcore.AgentOutput, error)

WaitForAsyncResult 等待异步执行完成

type Coordinator

// Coordinator is the distributed agent coordinator. It is responsible for
// calling and coordinating agents across services.
type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator 分布式 Agent 协调器，负责跨服务的 Agent 调用和协调。

func NewCoordinator

func NewCoordinator(registry *Registry, client *Client, logger core.Logger, opts ...CoordinatorOption) *Coordinator

NewCoordinator 创建协调器

func (*Coordinator) ExecuteAgent

func (c *Coordinator) ExecuteAgent(ctx context.Context, serviceName, agentName string, input *agentcore.AgentInput) (*agentcore.AgentOutput, error)

ExecuteAgent 执行远程 Agent

func (*Coordinator) ExecuteAgentWithRetry

func (c *Coordinator) ExecuteAgentWithRetry(ctx context.Context, serviceName, agentName string, input *agentcore.AgentInput, maxRetries int) (*agentcore.AgentOutput, error)

ExecuteAgentWithRetry 执行 Agent 并支持重试

func (*Coordinator) ExecuteParallel

func (c *Coordinator) ExecuteParallel(ctx context.Context, tasks []AgentTask) ([]AgentTaskResult, error)

ExecuteParallel 并行执行多个 Agent。使用信号量模式限制并发 goroutine 数量，防止资源耗尽。

func (*Coordinator) ExecuteSequential

func (c *Coordinator) ExecuteSequential(ctx context.Context, tasks []AgentTask) ([]AgentTaskResult, error)

ExecuteSequential 顺序执行多个 Agent

type CoordinatorOption

type CoordinatorOption func(*Coordinator)

CoordinatorOption Coordinator 配置选项

func WithMaxConcurrency

func WithMaxConcurrency(max int) CoordinatorOption

WithMaxConcurrency 设置最大并发数

type Registry

// Registry is the agent registry. It manages the registration information of
// all service instances.
type Registry struct {
	// contains filtered or unexported fields
}

Registry Agent 注册中心，管理所有服务实例的注册信息。

func NewRegistry

func NewRegistry(logger core.Logger) *Registry

NewRegistry 创建注册中心

func (*Registry) Deregister

func (r *Registry) Deregister(instanceID string) error

Deregister 注销服务实例

func (*Registry) GetAllInstances

func (r *Registry) GetAllInstances(serviceName string) ([]*ServiceInstance, error)

GetAllInstances 获取所有服务实例

func (*Registry) GetHealthyInstances

func (r *Registry) GetHealthyInstances(serviceName string) ([]*ServiceInstance, error)

GetHealthyInstances 获取健康的服务实例

func (*Registry) GetInstance

func (r *Registry) GetInstance(instanceID string) (*ServiceInstance, error)

GetInstance 获取实例

func (*Registry) GetStatistics

func (r *Registry) GetStatistics() map[string]interface{}

GetStatistics 获取统计信息

func (*Registry) Heartbeat

func (r *Registry) Heartbeat(instanceID string) error

Heartbeat 更新实例心跳

func (*Registry) ListServices

func (r *Registry) ListServices() []string

ListServices 列出所有服务

func (*Registry) MarkHealthy

func (r *Registry) MarkHealthy(instanceID string)

MarkHealthy 标记实例为健康

func (*Registry) MarkUnhealthy

func (r *Registry) MarkUnhealthy(instanceID string)

MarkUnhealthy 标记实例为不健康

func (*Registry) Register

func (r *Registry) Register(instance *ServiceInstance) error

Register 注册服务实例

type ServiceInstance

// ServiceInstance describes a single service instance. It is the value
// accepted by Registry.Register and returned by the Registry lookup methods.
type ServiceInstance struct {
	ID          string                 // instance ID
	ServiceName string                 // service name
	Endpoint    string                 // service endpoint (e.g., http://localhost:8080)
	Agents      []string               // list of supported agents
	Metadata    map[string]interface{} // metadata
	RegisterAt  time.Time              // registration time
	LastSeen    time.Time              // time of the last heartbeat
	Healthy     bool                   // health status
}

ServiceInstance 服务实例

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL