Documentation
¶
Index ¶
- type AgentHealthState
- func (s *AgentHealthState) ActiveConnections() int32
- func (s *AgentHealthState) CircuitState() CircuitBreakerState
- func (s *AgentHealthState) ConsecutiveFailures() int32
- func (s *AgentHealthState) DecrementConnections()
- func (s *AgentHealthState) GetStatistics() HealthStatistics
- func (s *AgentHealthState) HealthScore() float64
- func (s *AgentHealthState) IncrementConnections()
- func (s *AgentHealthState) IsAvailable() bool
- func (s *AgentHealthState) IsCircuitOpen() bool
- func (s *AgentHealthState) RecordFailure()
- func (s *AgentHealthState) RecordSuccess()
- type AgentSessionResolver
- type Candidate
- type CandidateStats
- type CircuitBreakerState
- type Dispatcher
- func (d *Dispatcher) CancelTask(ctx context.Context, taskID string) error
- func (d *Dispatcher) CleanupOldTasks(ttl time.Duration) error
- func (d *Dispatcher) Close() error
- func (d *Dispatcher) GetHealthTracker() *HealthTracker
- func (d *Dispatcher) GetLoadBalanceStrategy() LoadBalanceStrategy
- func (d *Dispatcher) GetLoadBalancer() *LoadBalancer
- func (d *Dispatcher) Invoke(ctx context.Context, functionID string, payload []byte) ([]byte, error)
- func (d *Dispatcher) InvokeRequest(ctx context.Context, req *sdkv1.InvokeRequest) (*sdkv1.InvokeResponse, error)
- func (d *Dispatcher) ListFunctionAgents(functionID string) []string
- func (d *Dispatcher) ListTaskRoutings() ([]*TaskRouting, error)
- func (d *Dispatcher) RegisterTask(taskID, agentID string)
- func (d *Dispatcher) SetHAEnabled(enabled bool)
- func (d *Dispatcher) SetLoadBalanceStrategy(strategy LoadBalanceStrategy)
- func (d *Dispatcher) SetSessionResolver(resolver AgentSessionResolver)
- func (d *Dispatcher) SetTLSConfig(cfg *tlsutil.ClientTLSConfig)
- func (d *Dispatcher) SetTaskEventQuery(query TaskEventQuery)
- func (d *Dispatcher) StartTask(ctx context.Context, functionID string, payload []byte) (string, error)
- func (d *Dispatcher) StartTaskRequest(ctx context.Context, req *sdkv1.InvokeRequest) (*sdkv1.StartTaskResponse, error)
- func (d *Dispatcher) Store() *reg.Store
- func (d *Dispatcher) StreamTask(ctx context.Context, taskID string) ([]*sdkv1.TaskEvent, bool, error)
- func (d *Dispatcher) StreamTaskAfterSeq(ctx context.Context, taskID string, afterSeq int64) ([]*sdkv1.TaskEvent, bool, error)
- func (d *Dispatcher) StreamTaskRealtime(ctx context.Context, taskID string, fn func(*sdkv1.TaskEvent) bool) (bool, error)
- func (d *Dispatcher) StreamTaskRealtimeAfterSeq(ctx context.Context, taskID string, afterSeq int64, fn func(*sdkv1.TaskEvent) bool) (bool, error)
- func (d *Dispatcher) TaskAgentID(taskID string) (string, bool)
- func (d *Dispatcher) UnregisterTask(taskID string)
- type FileTaskRoutingStore
- func (s *FileTaskRoutingStore) Cleanup(ttl time.Duration) error
- func (s *FileTaskRoutingStore) Close() error
- func (s *FileTaskRoutingStore) Delete(taskID string) error
- func (s *FileTaskRoutingStore) Get(taskID string) (*TaskRouting, error)
- func (s *FileTaskRoutingStore) List() ([]*TaskRouting, error)
- func (s *FileTaskRoutingStore) Set(taskID, agentID string) error
- type HealthCheckConfig
- type HealthStatistics
- type HealthTracker
- func (t *HealthTracker) DecrementConnections(agentID string)
- func (t *HealthTracker) GetAllStates() map[string]*AgentHealthState
- func (t *HealthTracker) GetAllStatistics() map[string]HealthStatistics
- func (t *HealthTracker) GetAvailableAgents() []*AgentHealthState
- func (t *HealthTracker) GetNextRoundRobinIndex(count int) int32
- func (t *HealthTracker) GetOrCreateState(agentID, routeHint string) *AgentHealthState
- func (t *HealthTracker) GetState(agentID string) (*AgentHealthState, bool)
- func (t *HealthTracker) GetStatistics(agentID string) (HealthStatistics, error)
- func (t *HealthTracker) IncrementConnections(agentID string)
- func (t *HealthTracker) RecordFailure(agentID string)
- func (t *HealthTracker) RecordSuccess(agentID string)
- func (t *HealthTracker) RegisterAgent(agentID, routeHint string) *AgentHealthState
- func (t *HealthTracker) Reset(agentID string)
- func (t *HealthTracker) Start()
- func (t *HealthTracker) Stop()
- func (t *HealthTracker) UnregisterAgent(agentID string)
- type LoadBalanceStrategy
- type LoadBalancer
- func (lb *LoadBalancer) BuildCandidates(sessions []*reg.AgentSession, functionID string) []*Candidate
- func (lb *LoadBalancer) GetStatistics() LoadBalancerStats
- func (lb *LoadBalancer) GetStrategy() LoadBalanceStrategy
- func (lb *LoadBalancer) Select(functionID string, candidates []*Candidate) (*Candidate, error)
- func (lb *LoadBalancer) SetStrategy(strategy LoadBalanceStrategy)
- type LoadBalancerStats
- type MemoryTaskRoutingStore
- func (s *MemoryTaskRoutingStore) Cleanup(ttl time.Duration) error
- func (s *MemoryTaskRoutingStore) Close() error
- func (s *MemoryTaskRoutingStore) Delete(taskID string) error
- func (s *MemoryTaskRoutingStore) Get(taskID string) (*TaskRouting, error)
- func (s *MemoryTaskRoutingStore) List() ([]*TaskRouting, error)
- func (s *MemoryTaskRoutingStore) Set(taskID, agentID string) error
- type ReconnectionManager
- func (m *ReconnectionManager) GetOrCreateState(key string) *ReconnectionState
- func (m *ReconnectionManager) GetPolicy() *ReconnectionPolicy
- func (m *ReconnectionManager) RemoveState(key string)
- func (m *ReconnectionManager) ResetAll()
- func (m *ReconnectionManager) SetPolicy(policy *ReconnectionPolicy)
- func (m *ReconnectionManager) Stats() map[string]ReconnectionStats
- type ReconnectionPolicy
- type ReconnectionState
- func (s *ReconnectionState) GetAttempt() int
- func (s *ReconnectionState) GetLastError() error
- func (s *ReconnectionState) IsEnabled() bool
- func (s *ReconnectionState) NextDelay() (time.Duration, error)
- func (s *ReconnectionState) RecordAttempt(err error)
- func (s *ReconnectionState) Reset()
- func (s *ReconnectionState) SetEnabled(enabled bool)
- func (s *ReconnectionState) ShouldRetry() bool
- type ReconnectionStats
- type TaskEventQuery
- type TaskEventQueryAdapter
- type TaskRouting
- type TaskRoutingStore
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AgentHealthState ¶ added in v0.1.9
type AgentHealthState struct {
// Immutable fields
AgentID string
Addr string
// contains filtered or unexported fields
}
AgentHealthState tracks the health state of a single agent
func NewAgentHealthState ¶ added in v0.1.9
func NewAgentHealthState(agentID, routeHint string, config *HealthCheckConfig) *AgentHealthState
NewAgentHealthState creates a new agent health state. routeHint is a best-effort compatibility address retained for diagnostics only.
func (*AgentHealthState) ActiveConnections ¶ added in v0.1.9
func (s *AgentHealthState) ActiveConnections() int32
ActiveConnections returns the current number of active connections
func (*AgentHealthState) CircuitState ¶ added in v0.1.9
func (s *AgentHealthState) CircuitState() CircuitBreakerState
CircuitState returns the current circuit breaker state
func (*AgentHealthState) ConsecutiveFailures ¶ added in v0.1.9
func (s *AgentHealthState) ConsecutiveFailures() int32
ConsecutiveFailures returns the current consecutive failure count
func (*AgentHealthState) DecrementConnections ¶ added in v0.1.9
func (s *AgentHealthState) DecrementConnections()
DecrementConnections decrements the active connection count
func (*AgentHealthState) GetStatistics ¶ added in v0.1.9
func (s *AgentHealthState) GetStatistics() HealthStatistics
GetStatistics returns the current statistics
func (*AgentHealthState) HealthScore ¶ added in v0.1.9
func (s *AgentHealthState) HealthScore() float64
HealthScore returns the current health score (0-100)
func (*AgentHealthState) IncrementConnections ¶ added in v0.1.9
func (s *AgentHealthState) IncrementConnections()
IncrementConnections increments the active connection count
func (*AgentHealthState) IsAvailable ¶ added in v0.1.9
func (s *AgentHealthState) IsAvailable() bool
IsAvailable returns true if the agent is available for routing
func (*AgentHealthState) IsCircuitOpen ¶ added in v0.1.9
func (s *AgentHealthState) IsCircuitOpen() bool
IsCircuitOpen returns true if the circuit is open
func (*AgentHealthState) RecordFailure ¶ added in v0.1.9
func (s *AgentHealthState) RecordFailure()
RecordFailure records a failed request
func (*AgentHealthState) RecordSuccess ¶ added in v0.1.9
func (s *AgentHealthState) RecordSuccess()
RecordSuccess records a successful request
type AgentSessionResolver ¶ added in v0.1.10
type AgentSessionResolver interface {
ResolveAgentConn(agentID string) (transport.SessionCaller, bool)
}
AgentSessionResolver finds active TCP sessions for connected Agents. The server package's AgentSessionStore implements this interface.
type Candidate ¶ added in v0.1.9
type Candidate struct {
AgentID string
Session *reg.AgentSession
Health *AgentHealthState
Available bool
}
Candidate represents an agent that can be selected for routing
type CandidateStats ¶ added in v0.1.9
type CandidateStats struct {
AgentID string `json:"agentId"`
ActiveConnections int32 `json:"activeConnections"`
ConsecutiveFailures int32 `json:"consecutiveFailures"`
HealthScore float64 `json:"healthScore"`
CircuitState string `json:"circuitState"`
Available bool `json:"available"`
}
CandidateStats contains statistics for a single candidate
type CircuitBreakerState ¶ added in v0.1.9
type CircuitBreakerState int32
CircuitBreakerState represents the state of the circuit breaker
const (
// CircuitClosed means the circuit is closed and requests are allowed
CircuitClosed CircuitBreakerState = iota
// CircuitOpen means the circuit is open and requests are blocked
CircuitOpen
// CircuitHalfOpen means the circuit is half-open and testing if the agent has recovered
CircuitHalfOpen
)
func (CircuitBreakerState) String ¶ added in v0.1.9
func (s CircuitBreakerState) String() string
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher routes function invocations to live agents discovered via registry store. Uses TCP session routing for all agent communication. Supports HA features: health tracking, circuit breaker, load balancing.
func NewDispatcher ¶
func NewDispatcher(store *reg.Store) *Dispatcher
func NewDispatcherWithHA ¶ added in v0.1.9
func NewDispatcherWithHA(store *reg.Store, taskStore TaskRoutingStore, taskEventQuery TaskEventQuery, haEnabled bool, strategy LoadBalanceStrategy, healthConfig *HealthCheckConfig) *Dispatcher
NewDispatcherWithHA creates a new Dispatcher with HA features enabled
func NewDispatcherWithTaskStore ¶ added in v0.1.10
func NewDispatcherWithTaskStore(store *reg.Store, taskStore TaskRoutingStore, taskEventQuery TaskEventQuery) *Dispatcher
NewDispatcherWithTaskStore creates a new Dispatcher with optional task routing store and task event query
func (*Dispatcher) CancelTask ¶ added in v0.1.10
func (d *Dispatcher) CancelTask(ctx context.Context, taskID string) error
func (*Dispatcher) CleanupOldTasks ¶ added in v0.1.10
func (d *Dispatcher) CleanupOldTasks(ttl time.Duration) error
CleanupOldTasks removes old task routing entries.
func (*Dispatcher) Close ¶
func (d *Dispatcher) Close() error
Close closes the dispatcher and its resources
func (*Dispatcher) GetHealthTracker ¶ added in v0.1.9
func (d *Dispatcher) GetHealthTracker() *HealthTracker
GetHealthTracker returns the health tracker (for monitoring/inspection)
func (*Dispatcher) GetLoadBalanceStrategy ¶ added in v0.1.9
func (d *Dispatcher) GetLoadBalanceStrategy() LoadBalanceStrategy
GetLoadBalanceStrategy returns the current load balancing strategy
func (*Dispatcher) GetLoadBalancer ¶ added in v0.1.9
func (d *Dispatcher) GetLoadBalancer() *LoadBalancer
GetLoadBalancer returns the load balancer (for monitoring/inspection)
func (*Dispatcher) InvokeRequest ¶
func (d *Dispatcher) InvokeRequest(ctx context.Context, req *sdkv1.InvokeRequest) (*sdkv1.InvokeResponse, error)
InvokeRequest forwards a fully populated InvokeRequest to a live agent.
func (*Dispatcher) ListFunctionAgents ¶
func (d *Dispatcher) ListFunctionAgents(functionID string) []string
ListFunctionAgents returns agent IDs that currently expose the function.
func (*Dispatcher) ListTaskRoutings ¶ added in v0.1.10
func (d *Dispatcher) ListTaskRoutings() ([]*TaskRouting, error)
ListTaskRoutings returns persisted task routing entries.
func (*Dispatcher) RegisterTask ¶ added in v0.1.10
func (d *Dispatcher) RegisterTask(taskID, agentID string)
RegisterTask registers a task routing.
func (*Dispatcher) SetHAEnabled ¶ added in v0.1.9
func (d *Dispatcher) SetHAEnabled(enabled bool)
SetHAEnabled enables or disables HA features at runtime
func (*Dispatcher) SetLoadBalanceStrategy ¶ added in v0.1.9
func (d *Dispatcher) SetLoadBalanceStrategy(strategy LoadBalanceStrategy)
SetLoadBalanceStrategy changes the load balancing strategy
func (*Dispatcher) SetSessionResolver ¶ added in v0.1.10
func (d *Dispatcher) SetSessionResolver(resolver AgentSessionResolver)
SetSessionResolver sets the TCP session resolver for routing requests over established Agent sessions.
func (*Dispatcher) SetTLSConfig ¶
func (d *Dispatcher) SetTLSConfig(cfg *tlsutil.ClientTLSConfig)
func (*Dispatcher) SetTaskEventQuery ¶ added in v0.1.10
func (d *Dispatcher) SetTaskEventQuery(query TaskEventQuery)
SetTaskEventQuery sets the task event query for persistent storage access.
func (*Dispatcher) StartTaskRequest ¶ added in v0.1.10
func (d *Dispatcher) StartTaskRequest(ctx context.Context, req *sdkv1.InvokeRequest) (*sdkv1.StartTaskResponse, error)
StartTaskRequest forwards a structured InvokeRequest to the agent StartTask RPC.
func (*Dispatcher) Store ¶
func (d *Dispatcher) Store() *reg.Store
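func (*Dispatcher) StreamTask ¶ added in v0.1.10
func (d *Dispatcher) StreamTask(ctx context.Context, taskID string) ([]*sdkv1.TaskEvent, bool, error)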
func (*Dispatcher) StreamTaskAfterSeq ¶ added in v0.1.10
func (d *Dispatcher) StreamTaskAfterSeq(ctx context.Context, taskID string, afterSeq int64) ([]*sdkv1.TaskEvent, bool, error)
StreamTaskAfterSeq streams task events after a given sequence number. Returns events, whether the task is done, and any error.
func (*Dispatcher) StreamTaskRealtime ¶ added in v0.1.10
func (d *Dispatcher) StreamTaskRealtime(ctx context.Context, taskID string, fn func(*sdkv1.TaskEvent) bool) (bool, error)
StreamTaskRealtime forwards task events to the provided callback.
func (*Dispatcher) StreamTaskRealtimeAfterSeq ¶ added in v0.1.10
func (d *Dispatcher) StreamTaskRealtimeAfterSeq(ctx context.Context, taskID string, afterSeq int64, fn func(*sdkv1.TaskEvent) bool) (bool, error)
StreamTaskRealtimeAfterSeq streams task events to the callback after a given sequence number. Returns whether the task is done and any error.
func (*Dispatcher) TaskAgentID ¶ added in v0.1.10
func (d *Dispatcher) TaskAgentID(taskID string) (string, bool)
TaskAgentID exposes tracked task routing (primarily for diagnostics). Returns the agentID for the task.
func (*Dispatcher) UnregisterTask ¶ added in v0.1.10
func (d *Dispatcher) UnregisterTask(taskID string)
UnregisterTask unregisters a task routing.
type FileTaskRoutingStore ¶ added in v0.1.10
type FileTaskRoutingStore struct {
// contains filtered or unexported fields
}
func NewFileTaskRoutingStore ¶ added in v0.1.10
func NewFileTaskRoutingStore(dataDir string) (*FileTaskRoutingStore, error)
func (*FileTaskRoutingStore) Cleanup ¶ added in v0.1.10
func (s *FileTaskRoutingStore) Cleanup(ttl time.Duration) error
func (*FileTaskRoutingStore) Close ¶ added in v0.1.10
func (s *FileTaskRoutingStore) Close() error
func (*FileTaskRoutingStore) Delete ¶ added in v0.1.10
func (s *FileTaskRoutingStore) Delete(taskID string) error
func (*FileTaskRoutingStore) Get ¶ added in v0.1.10
func (s *FileTaskRoutingStore) Get(taskID string) (*TaskRouting, error)
func (*FileTaskRoutingStore) List ¶ added in v0.1.10
func (s *FileTaskRoutingStore) List() ([]*TaskRouting, error)
func (*FileTaskRoutingStore) Set ¶ added in v0.1.10
func (s *FileTaskRoutingStore) Set(taskID, agentID string) error
type HealthCheckConfig ¶ added in v0.1.9
type HealthCheckConfig struct {
// Score decay rate per tick (0-1)
ScoreDecayRate float64
// Score bonus on successful request
ScoreSuccessBonus float64
// Score penalty on failed request
ScoreFailurePenalty float64
// Minimum health score (0-100)
MinScore float64
// Maximum health score (0-100)
MaxScore float64
// Score decay interval
DecayInterval time.Duration
// Circuit breaker failure threshold
FailureThreshold int32
// Circuit breaker open timeout
CircuitOpenTimeout time.Duration
// Half-open max requests before transitioning
HalfOpenMaxRequests int32
}
HealthCheckConfig configures the health check behavior
func DefaultHealthCheckConfig ¶ added in v0.1.9
func DefaultHealthCheckConfig() *HealthCheckConfig
DefaultHealthCheckConfig returns the default health check configuration
type HealthStatistics ¶ added in v0.1.9
type HealthStatistics struct {
TotalRequests int64
SuccessfulRequests int64
FailedRequests int64
ActiveConnections int32
ConsecutiveFailures int32
HealthScore float64
CircuitState CircuitBreakerState
LastSuccessTime time.Time
LastFailureTime time.Time
CircuitOpenedAt time.Time
LastStateChange time.Time
}
HealthStatistics contains the current health statistics
type HealthTracker ¶ added in v0.1.9
type HealthTracker struct {
// contains filtered or unexported fields
}
HealthTracker manages health states for all agents
func NewHealthTracker ¶ added in v0.1.9
func NewHealthTracker(config *HealthCheckConfig) *HealthTracker
NewHealthTracker creates a new health tracker
func (*HealthTracker) DecrementConnections ¶ added in v0.1.9
func (t *HealthTracker) DecrementConnections(agentID string)
DecrementConnections decrements the connection count for an agent
func (*HealthTracker) GetAllStates ¶ added in v0.1.9
func (t *HealthTracker) GetAllStates() map[string]*AgentHealthState
GetAllStates returns a snapshot of all agent states
func (*HealthTracker) GetAllStatistics ¶ added in v0.1.9
func (t *HealthTracker) GetAllStatistics() map[string]HealthStatistics
GetAllStatistics returns statistics for all agents
func (*HealthTracker) GetAvailableAgents ¶ added in v0.1.9
func (t *HealthTracker) GetAvailableAgents() []*AgentHealthState
GetAvailableAgents returns all agents that are available for routing
func (*HealthTracker) GetNextRoundRobinIndex ¶ added in v0.1.9
func (t *HealthTracker) GetNextRoundRobinIndex(count int) int32
GetNextRoundRobinIndex returns the next index for round-robin selection
func (*HealthTracker) GetOrCreateState ¶ added in v0.1.9
func (t *HealthTracker) GetOrCreateState(agentID, routeHint string) *AgentHealthState
GetOrCreateState returns the health state for an agent, creating it if necessary. routeHint is retained only for diagnostics while live routing uses sessions.
func (*HealthTracker) GetState ¶ added in v0.1.9
func (t *HealthTracker) GetState(agentID string) (*AgentHealthState, bool)
GetState returns the health state for an agent
func (*HealthTracker) GetStatistics ¶ added in v0.1.9
func (t *HealthTracker) GetStatistics(agentID string) (HealthStatistics, error)
GetStatistics returns statistics for a specific agent
func (*HealthTracker) IncrementConnections ¶ added in v0.1.9
func (t *HealthTracker) IncrementConnections(agentID string)
IncrementConnections increments the connection count for an agent
func (*HealthTracker) RecordFailure ¶ added in v0.1.9
func (t *HealthTracker) RecordFailure(agentID string)
RecordFailure records a failed request for an agent
func (*HealthTracker) RecordSuccess ¶ added in v0.1.9
func (t *HealthTracker) RecordSuccess(agentID string)
RecordSuccess records a successful request for an agent
func (*HealthTracker) RegisterAgent ¶ added in v0.1.9
func (t *HealthTracker) RegisterAgent(agentID, routeHint string) *AgentHealthState
RegisterAgent registers a new agent or updates an existing one. routeHint is a compatibility mirror used for observability, not routing.
func (*HealthTracker) Reset ¶ added in v0.1.9
func (t *HealthTracker) Reset(agentID string)
Reset resets the health state for an agent (e.g., after agent re-registration)
func (*HealthTracker) Start ¶ added in v0.1.9
func (t *HealthTracker) Start()
Start starts the health tracker background routines
func (*HealthTracker) Stop ¶ added in v0.1.9
func (t *HealthTracker) Stop()
Stop stops the health tracker
func (*HealthTracker) UnregisterAgent ¶ added in v0.1.9
func (t *HealthTracker) UnregisterAgent(agentID string)
UnregisterAgent removes an agent from tracking
type LoadBalanceStrategy ¶ added in v0.1.9
type LoadBalanceStrategy string
LoadBalanceStrategy defines the load balancing strategy
const (
// StrategyMinID selects agent with minimum ID (original behavior)
StrategyMinID LoadBalanceStrategy = "min_id"
// StrategyRoundRobin selects agents in round-robin fashion
StrategyRoundRobin LoadBalanceStrategy = "round_robin"
// StrategyLeastConn selects agent with least active connections
StrategyLeastConn LoadBalanceStrategy = "least_conn"
// StrategyWeighted selects agent based on health score weights
StrategyWeighted LoadBalanceStrategy = "weighted"
)
type LoadBalancer ¶ added in v0.1.9
type LoadBalancer struct {
// contains filtered or unexported fields
}
LoadBalancer selects the best agent for routing based on strategy
func NewLoadBalancer ¶ added in v0.1.9
func NewLoadBalancer(strategy LoadBalanceStrategy, tracker *HealthTracker) *LoadBalancer
NewLoadBalancer creates a new load balancer
func (*LoadBalancer) BuildCandidates ¶ added in v0.1.9
func (lb *LoadBalancer) BuildCandidates(sessions []*reg.AgentSession, functionID string) []*Candidate
BuildCandidates builds candidate list from agent sessions
func (*LoadBalancer) GetStatistics ¶ added in v0.1.9
func (lb *LoadBalancer) GetStatistics() LoadBalancerStats
GetStatistics returns load balancer statistics
func (*LoadBalancer) GetStrategy ¶ added in v0.1.9
func (lb *LoadBalancer) GetStrategy() LoadBalanceStrategy
GetStrategy returns the current load balancing strategy
func (*LoadBalancer) Select ¶ added in v0.1.9
func (lb *LoadBalancer) Select(functionID string, candidates []*Candidate) (*Candidate, error)
Select selects the best agent for the given function
func (*LoadBalancer) SetStrategy ¶ added in v0.1.9
func (lb *LoadBalancer) SetStrategy(strategy LoadBalanceStrategy)
SetStrategy updates the load balancing strategy
type LoadBalancerStats ¶ added in v0.1.9
type LoadBalancerStats struct {
Strategy LoadBalanceStrategy `json:"strategy"`
RoundRobinState map[string]int32 `json:"roundRobinState,omitempty"`
AgentStats map[string]CandidateStats `json:"agentStats,omitempty"`
}
LoadBalancerStats contains load balancer statistics
type MemoryTaskRoutingStore ¶ added in v0.1.10
type MemoryTaskRoutingStore struct {
// contains filtered or unexported fields
}
func NewMemoryTaskRoutingStore ¶ added in v0.1.10
func NewMemoryTaskRoutingStore() *MemoryTaskRoutingStore
func (*MemoryTaskRoutingStore) Cleanup ¶ added in v0.1.10
func (s *MemoryTaskRoutingStore) Cleanup(ttl time.Duration) error
func (*MemoryTaskRoutingStore) Close ¶ added in v0.1.10
func (s *MemoryTaskRoutingStore) Close() error
func (*MemoryTaskRoutingStore) Delete ¶ added in v0.1.10
func (s *MemoryTaskRoutingStore) Delete(taskID string) error
func (*MemoryTaskRoutingStore) Get ¶ added in v0.1.10
func (s *MemoryTaskRoutingStore) Get(taskID string) (*TaskRouting, error)
func (*MemoryTaskRoutingStore) List ¶ added in v0.1.10
func (s *MemoryTaskRoutingStore) List() ([]*TaskRouting, error)
func (*MemoryTaskRoutingStore) Set ¶ added in v0.1.10
func (s *MemoryTaskRoutingStore) Set(taskID, agentID string) error
type ReconnectionManager ¶ added in v0.1.9
type ReconnectionManager struct {
// contains filtered or unexported fields
}
ReconnectionManager manages reconnection states for multiple connections
func NewReconnectionManager ¶ added in v0.1.9
func NewReconnectionManager(policy *ReconnectionPolicy) *ReconnectionManager
NewReconnectionManager creates a new reconnection manager
func (*ReconnectionManager) GetOrCreateState ¶ added in v0.1.9
func (m *ReconnectionManager) GetOrCreateState(key string) *ReconnectionState
GetOrCreateState gets or creates a reconnection state for the given key
func (*ReconnectionManager) GetPolicy ¶ added in v0.1.9
func (m *ReconnectionManager) GetPolicy() *ReconnectionPolicy
GetPolicy returns the reconnection policy
func (*ReconnectionManager) RemoveState ¶ added in v0.1.9
func (m *ReconnectionManager) RemoveState(key string)
RemoveState removes the reconnection state for the given key
func (*ReconnectionManager) ResetAll ¶ added in v0.1.9
func (m *ReconnectionManager) ResetAll()
ResetAll resets all reconnection states
func (*ReconnectionManager) SetPolicy ¶ added in v0.1.9
func (m *ReconnectionManager) SetPolicy(policy *ReconnectionPolicy)
SetPolicy updates the reconnection policy
func (*ReconnectionManager) Stats ¶ added in v0.1.9
func (m *ReconnectionManager) Stats() map[string]ReconnectionStats
Stats returns reconnection statistics
type ReconnectionPolicy ¶ added in v0.1.9
type ReconnectionPolicy struct {
// MaxRetries is the maximum number of reconnection attempts (-1 for infinite)
MaxRetries int
// InitialDelay is the initial delay before the first retry
InitialDelay time.Duration
// MaxDelay is the maximum delay between retries
MaxDelay time.Duration
// Multiplier is the exponential backoff multiplier (e.g., 2.0 for doubling)
Multiplier float64
// Jitter is the random jitter factor (0-1) to add to delays
Jitter float64
// EnableAutoReconnect enables automatic reconnection
EnableAutoReconnect bool
// contains filtered or unexported fields
}
ReconnectionPolicy defines how reconnection attempts are made
func DefaultReconnectionPolicy ¶ added in v0.1.9
func DefaultReconnectionPolicy() *ReconnectionPolicy
DefaultReconnectionPolicy returns the default reconnection policy
func NewReconnectionPolicy ¶ added in v0.1.9
func NewReconnectionPolicy(maxRetries int, initialDelay, maxDelay time.Duration, multiplier, jitter float64, enableAuto bool) *ReconnectionPolicy
NewReconnectionPolicy creates a new reconnection policy with custom settings
func (*ReconnectionPolicy) NextDelay ¶ added in v0.1.9
func (p *ReconnectionPolicy) NextDelay(attempt int) (time.Duration, error)
NextDelay returns the delay before the next reconnection attempt
func (*ReconnectionPolicy) ShouldRetry ¶ added in v0.1.9
func (p *ReconnectionPolicy) ShouldRetry(attempt int) bool
ShouldRetry returns whether another reconnection attempt should be made
type ReconnectionState ¶ added in v0.1.9
type ReconnectionState struct {
// contains filtered or unexported fields
}
ReconnectionState tracks the reconnection state for a connection
func NewReconnectionState ¶ added in v0.1.9
func NewReconnectionState(policy *ReconnectionPolicy) *ReconnectionState
NewReconnectionState creates a new reconnection state
func (*ReconnectionState) GetAttempt ¶ added in v0.1.9
func (s *ReconnectionState) GetAttempt() int
GetAttempt returns the current attempt count
func (*ReconnectionState) GetLastError ¶ added in v0.1.9
func (s *ReconnectionState) GetLastError() error
GetLastError returns the last error encountered
func (*ReconnectionState) IsEnabled ¶ added in v0.1.9
func (s *ReconnectionState) IsEnabled() bool
IsEnabled returns whether automatic reconnection is enabled
func (*ReconnectionState) NextDelay ¶ added in v0.1.9
func (s *ReconnectionState) NextDelay() (time.Duration, error)
NextDelay returns the delay before the next reconnection attempt
func (*ReconnectionState) RecordAttempt ¶ added in v0.1.9
func (s *ReconnectionState) RecordAttempt(err error)
RecordAttempt records a reconnection attempt
func (*ReconnectionState) Reset ¶ added in v0.1.9
func (s *ReconnectionState) Reset()
Reset resets the reconnection state (e.g., after successful connection)
func (*ReconnectionState) SetEnabled ¶ added in v0.1.9
func (s *ReconnectionState) SetEnabled(enabled bool)
SetEnabled enables or disables automatic reconnection
func (*ReconnectionState) ShouldRetry ¶ added in v0.1.9
func (s *ReconnectionState) ShouldRetry() bool
ShouldRetry returns whether another reconnection attempt should be made
type ReconnectionStats ¶ added in v0.1.9
type ReconnectionStats struct {
Attempt int `json:"attempt"`
LastAttempt time.Time `json:"lastAttempt"`
LastError error `json:"lastError,omitempty"`
Enabled bool `json:"enabled"`
}
ReconnectionStats contains reconnection statistics for a single connection
type TaskEventQuery ¶ added in v0.1.10
type TaskEventQuery interface {
ListEvents(ctx context.Context, taskID string, afterSeq int64) ([]*sdkv1.TaskEvent, error)
GetRun(ctx context.Context, taskID string) (*sdkv1.TaskEvent, error)
}
TaskEventQuery queries task events from persistent storage.
type TaskEventQueryAdapter ¶ added in v0.1.10
type TaskEventQueryAdapter struct {
// contains filtered or unexported fields
}
TaskEventQueryAdapter implements TaskEventQuery using model.TaskEventModel and model.TaskRunModel.
func NewTaskEventQueryAdapter ¶ added in v0.1.10
func NewTaskEventQueryAdapter(events *model.TaskEventModel, runs *model.TaskRunModel) *TaskEventQueryAdapter
NewTaskEventQueryAdapter creates a new TaskEventQuery from database models.
func (*TaskEventQueryAdapter) GetRun ¶ added in v0.1.10
func (a *TaskEventQueryAdapter) GetRun(ctx context.Context, taskID string) (*sdkv1.TaskEvent, error)
GetRun returns the task run for the given task ID.
func (*TaskEventQueryAdapter) ListEvents ¶ added in v0.1.10
func (a *TaskEventQueryAdapter) ListEvents(ctx context.Context, taskID string, afterSeq int64) ([]*sdkv1.TaskEvent, error)
ListEvents returns task events after the given sequence number.