Documentation
¶
Index ¶
- type AgentHealthState
- func (s *AgentHealthState) ActiveConnections() int32
- func (s *AgentHealthState) CircuitState() CircuitBreakerState
- func (s *AgentHealthState) ConsecutiveFailures() int32
- func (s *AgentHealthState) DecrementConnections()
- func (s *AgentHealthState) GetStatistics() HealthStatistics
- func (s *AgentHealthState) HealthScore() float64
- func (s *AgentHealthState) IncrementConnections()
- func (s *AgentHealthState) IsAvailable() bool
- func (s *AgentHealthState) IsCircuitOpen() bool
- func (s *AgentHealthState) RecordFailure()
- func (s *AgentHealthState) RecordSuccess()
- type Candidate
- type CandidateStats
- type CircuitBreakerState
- type Dispatcher
- func (d *Dispatcher) CancelJob(ctx context.Context, jobID string) error
- func (d *Dispatcher) CleanupOldJobs(ttl time.Duration) error
- func (d *Dispatcher) Close() error
- func (d *Dispatcher) GetHealthTracker() *HealthTracker
- func (d *Dispatcher) GetLoadBalanceStrategy() LoadBalanceStrategy
- func (d *Dispatcher) GetLoadBalancer() *LoadBalancer
- func (d *Dispatcher) Invoke(ctx context.Context, functionID string, payload []byte) ([]byte, error)
- func (d *Dispatcher) InvokeRequest(ctx context.Context, req *sdkv1.InvokeRequest) (*sdkv1.InvokeResponse, error)
- func (d *Dispatcher) JobAddr(jobID string) (string, bool)
- func (d *Dispatcher) ListFunctionAgents(functionID string) []string
- func (d *Dispatcher) RegisterJob(jobID, addr string)
- func (d *Dispatcher) SetHAEnabled(enabled bool)
- func (d *Dispatcher) SetLoadBalanceStrategy(strategy LoadBalanceStrategy)
- func (d *Dispatcher) SetTLSConfig(cfg *tlsutil.ClientTLSConfig)
- func (d *Dispatcher) StartJob(ctx context.Context, functionID string, payload []byte) (string, error)
- func (d *Dispatcher) StartJobRequest(ctx context.Context, req *sdkv1.InvokeRequest) (*sdkv1.StartJobResponse, error)
- func (d *Dispatcher) Store() *reg.Store
- func (d *Dispatcher) StreamJob(ctx context.Context, jobID string) ([]*sdkv1.JobEvent, bool, error)
- func (d *Dispatcher) StreamJobRealtime(ctx context.Context, jobID string, fn func(*sdkv1.JobEvent) bool) (bool, error)
- func (d *Dispatcher) UnregisterJob(jobID string)
- type FileJobRoutingStore
- func (s *FileJobRoutingStore) Cleanup(ttl time.Duration) error
- func (s *FileJobRoutingStore) Close() error
- func (s *FileJobRoutingStore) Delete(jobID string) error
- func (s *FileJobRoutingStore) Get(jobID string) (*JobRouting, error)
- func (s *FileJobRoutingStore) List() ([]*JobRouting, error)
- func (s *FileJobRoutingStore) Set(jobID, agentAddr string) error
- type HealthCheckConfig
- type HealthStatistics
- type HealthTracker
- func (t *HealthTracker) DecrementConnections(agentID string)
- func (t *HealthTracker) GetAllStates() map[string]*AgentHealthState
- func (t *HealthTracker) GetAllStatistics() map[string]HealthStatistics
- func (t *HealthTracker) GetAvailableAgents() []*AgentHealthState
- func (t *HealthTracker) GetNextRoundRobinIndex(count int) int32
- func (t *HealthTracker) GetOrCreateState(agentID, addr string) *AgentHealthState
- func (t *HealthTracker) GetState(agentID string) (*AgentHealthState, bool)
- func (t *HealthTracker) GetStatistics(agentID string) (HealthStatistics, error)
- func (t *HealthTracker) IncrementConnections(agentID string)
- func (t *HealthTracker) RecordFailure(agentID string)
- func (t *HealthTracker) RecordSuccess(agentID string)
- func (t *HealthTracker) RegisterAgent(agentID, addr string) *AgentHealthState
- func (t *HealthTracker) Reset(agentID string)
- func (t *HealthTracker) Start()
- func (t *HealthTracker) Stop()
- func (t *HealthTracker) UnregisterAgent(agentID string)
- type JobRouting
- type JobRoutingStore
- type LoadBalanceStrategy
- type LoadBalancer
- func (lb *LoadBalancer) BuildCandidates(sessions []*reg.AgentSession, functionID string) []*Candidate
- func (lb *LoadBalancer) GetStatistics() LoadBalancerStats
- func (lb *LoadBalancer) GetStrategy() LoadBalanceStrategy
- func (lb *LoadBalancer) Select(functionID string, candidates []*Candidate) (*Candidate, error)
- func (lb *LoadBalancer) SetStrategy(strategy LoadBalanceStrategy)
- type LoadBalancerStats
- type MemoryJobRoutingStore
- func (s *MemoryJobRoutingStore) Cleanup(ttl time.Duration) error
- func (s *MemoryJobRoutingStore) Close() error
- func (s *MemoryJobRoutingStore) Delete(jobID string) error
- func (s *MemoryJobRoutingStore) Get(jobID string) (*JobRouting, error)
- func (s *MemoryJobRoutingStore) List() ([]*JobRouting, error)
- func (s *MemoryJobRoutingStore) Set(jobID, agentAddr string) error
- type ReconnectionManager
- func (m *ReconnectionManager) GetOrCreateState(key string) *ReconnectionState
- func (m *ReconnectionManager) GetPolicy() *ReconnectionPolicy
- func (m *ReconnectionManager) RemoveState(key string)
- func (m *ReconnectionManager) ResetAll()
- func (m *ReconnectionManager) SetPolicy(policy *ReconnectionPolicy)
- func (m *ReconnectionManager) Stats() map[string]ReconnectionStats
- type ReconnectionPolicy
- type ReconnectionState
- func (s *ReconnectionState) GetAttempt() int
- func (s *ReconnectionState) GetLastError() error
- func (s *ReconnectionState) IsEnabled() bool
- func (s *ReconnectionState) NextDelay() (time.Duration, error)
- func (s *ReconnectionState) RecordAttempt(err error)
- func (s *ReconnectionState) Reset()
- func (s *ReconnectionState) SetEnabled(enabled bool)
- func (s *ReconnectionState) ShouldRetry() bool
- type ReconnectionStats
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AgentHealthState ¶ added in v0.1.9
type AgentHealthState struct {
// Immutable fields
AgentID string
Addr string
// contains filtered or unexported fields
}
AgentHealthState tracks the health state of a single agent
func NewAgentHealthState ¶ added in v0.1.9
func NewAgentHealthState(agentID, addr string, config *HealthCheckConfig) *AgentHealthState
NewAgentHealthState creates a new agent health state
func (*AgentHealthState) ActiveConnections ¶ added in v0.1.9
func (s *AgentHealthState) ActiveConnections() int32
ActiveConnections returns the current number of active connections
func (*AgentHealthState) CircuitState ¶ added in v0.1.9
func (s *AgentHealthState) CircuitState() CircuitBreakerState
CircuitState returns the current circuit breaker state
func (*AgentHealthState) ConsecutiveFailures ¶ added in v0.1.9
func (s *AgentHealthState) ConsecutiveFailures() int32
ConsecutiveFailures returns the current consecutive failure count
func (*AgentHealthState) DecrementConnections ¶ added in v0.1.9
func (s *AgentHealthState) DecrementConnections()
DecrementConnections decrements the active connection count
func (*AgentHealthState) GetStatistics ¶ added in v0.1.9
func (s *AgentHealthState) GetStatistics() HealthStatistics
GetStatistics returns the current statistics
func (*AgentHealthState) HealthScore ¶ added in v0.1.9
func (s *AgentHealthState) HealthScore() float64
HealthScore returns the current health score (0-100)
func (*AgentHealthState) IncrementConnections ¶ added in v0.1.9
func (s *AgentHealthState) IncrementConnections()
IncrementConnections increments the active connection count
func (*AgentHealthState) IsAvailable ¶ added in v0.1.9
func (s *AgentHealthState) IsAvailable() bool
IsAvailable returns true if the agent is available for routing
func (*AgentHealthState) IsCircuitOpen ¶ added in v0.1.9
func (s *AgentHealthState) IsCircuitOpen() bool
IsCircuitOpen returns true if the circuit is open
func (*AgentHealthState) RecordFailure ¶ added in v0.1.9
func (s *AgentHealthState) RecordFailure()
RecordFailure records a failed request
func (*AgentHealthState) RecordSuccess ¶ added in v0.1.9
func (s *AgentHealthState) RecordSuccess()
RecordSuccess records a successful request
type Candidate ¶ added in v0.1.9
type Candidate struct {
AgentID string
Session *reg.AgentSession
Health *AgentHealthState
Available bool
}
Candidate represents an agent that can be selected for routing
type CandidateStats ¶ added in v0.1.9
type CandidateStats struct {
AgentID string `json:"agentId"`
ActiveConnections int32 `json:"activeConnections"`
ConsecutiveFailures int32 `json:"consecutiveFailures"`
HealthScore float64 `json:"healthScore"`
CircuitState string `json:"circuitState"`
Available bool `json:"available"`
}
CandidateStats contains statistics for a single candidate
type CircuitBreakerState ¶ added in v0.1.9
type CircuitBreakerState int32
CircuitBreakerState represents the state of the circuit breaker
const ( // CircuitClosed means the circuit is closed and requests are allowed CircuitClosed CircuitBreakerState = iota // CircuitOpen means the circuit is open and requests are blocked CircuitOpen // CircuitHalfOpen means the circuit is half-open and testing if the agent has recovered CircuitHalfOpen )
func (CircuitBreakerState) String ¶ added in v0.1.9
func (s CircuitBreakerState) String() string
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher routes function invocations to live agents discovered via registry store. Now uses NNG instead of gRPC for Agent communication. Supports HA features: health tracking, circuit breaker, load balancing.
func NewDispatcher ¶
func NewDispatcher(store *reg.Store) *Dispatcher
func NewDispatcherWithHA ¶ added in v0.1.9
func NewDispatcherWithHA(store *reg.Store, jobStore JobRoutingStore, haEnabled bool, strategy LoadBalanceStrategy, healthConfig *HealthCheckConfig) *Dispatcher
NewDispatcherWithHA creates a new Dispatcher with HA features enabled
func NewDispatcherWithJobStore ¶
func NewDispatcherWithJobStore(store *reg.Store, jobStore JobRoutingStore) *Dispatcher
NewDispatcherWithJobStore creates a new Dispatcher with optional job routing store
func (*Dispatcher) CancelJob ¶
func (d *Dispatcher) CancelJob(ctx context.Context, jobID string) error
func (*Dispatcher) CleanupOldJobs ¶
func (d *Dispatcher) CleanupOldJobs(ttl time.Duration) error
CleanupOldJobs removes old job routing entries
func (*Dispatcher) Close ¶
func (d *Dispatcher) Close() error
Close closes the dispatcher and its resources
func (*Dispatcher) GetHealthTracker ¶ added in v0.1.9
func (d *Dispatcher) GetHealthTracker() *HealthTracker
GetHealthTracker returns the health tracker (for monitoring/inspection)
func (*Dispatcher) GetLoadBalanceStrategy ¶ added in v0.1.9
func (d *Dispatcher) GetLoadBalanceStrategy() LoadBalanceStrategy
GetLoadBalanceStrategy returns the current load balancing strategy
func (*Dispatcher) GetLoadBalancer ¶ added in v0.1.9
func (d *Dispatcher) GetLoadBalancer() *LoadBalancer
GetLoadBalancer returns the load balancer (for monitoring/inspection)
func (*Dispatcher) InvokeRequest ¶
func (d *Dispatcher) InvokeRequest(ctx context.Context, req *sdkv1.InvokeRequest) (*sdkv1.InvokeResponse, error)
InvokeRequest forwards a fully populated InvokeRequest to a live agent.
func (*Dispatcher) JobAddr ¶
func (d *Dispatcher) JobAddr(jobID string) (string, bool)
JobAddr exposes tracked job routing addresses (primarily for diagnostics).
func (*Dispatcher) ListFunctionAgents ¶
func (d *Dispatcher) ListFunctionAgents(functionID string) []string
ListFunctionAgents returns agent IDs that currently expose the function.
func (*Dispatcher) RegisterJob ¶
func (d *Dispatcher) RegisterJob(jobID, addr string)
RegisterJob registers a job routing (exported method)
func (*Dispatcher) SetHAEnabled ¶ added in v0.1.9
func (d *Dispatcher) SetHAEnabled(enabled bool)
SetHAEnabled enables or disables HA features at runtime
func (*Dispatcher) SetLoadBalanceStrategy ¶ added in v0.1.9
func (d *Dispatcher) SetLoadBalanceStrategy(strategy LoadBalanceStrategy)
SetLoadBalanceStrategy changes the load balancing strategy
func (*Dispatcher) SetTLSConfig ¶
func (d *Dispatcher) SetTLSConfig(cfg *tlsutil.ClientTLSConfig)
func (*Dispatcher) StartJobRequest ¶
func (d *Dispatcher) StartJobRequest(ctx context.Context, req *sdkv1.InvokeRequest) (*sdkv1.StartJobResponse, error)
StartJobRequest forwards a structured InvokeRequest to the agent StartJob RPC.
func (*Dispatcher) Store ¶
func (d *Dispatcher) Store() *reg.Store
func (*Dispatcher) StreamJobRealtime ¶
func (d *Dispatcher) StreamJobRealtime(ctx context.Context, jobID string, fn func(*sdkv1.JobEvent) bool) (bool, error)
StreamJobRealtime forwards job events to the provided callback.
func (*Dispatcher) UnregisterJob ¶
func (d *Dispatcher) UnregisterJob(jobID string)
UnregisterJob unregisters a job routing (exported method)
type FileJobRoutingStore ¶
type FileJobRoutingStore struct {
// contains filtered or unexported fields
}
FileJobRoutingStore implements JobRoutingStore using file-based persistence
func NewFileJobRoutingStore ¶
func NewFileJobRoutingStore(dataDir string) (*FileJobRoutingStore, error)
NewFileJobRoutingStore creates a new file-based job routing store
func (*FileJobRoutingStore) Cleanup ¶
func (s *FileJobRoutingStore) Cleanup(ttl time.Duration) error
func (*FileJobRoutingStore) Close ¶
func (s *FileJobRoutingStore) Close() error
func (*FileJobRoutingStore) Delete ¶
func (s *FileJobRoutingStore) Delete(jobID string) error
func (*FileJobRoutingStore) Get ¶
func (s *FileJobRoutingStore) Get(jobID string) (*JobRouting, error)
func (*FileJobRoutingStore) List ¶
func (s *FileJobRoutingStore) List() ([]*JobRouting, error)
func (*FileJobRoutingStore) Set ¶
func (s *FileJobRoutingStore) Set(jobID, agentAddr string) error
type HealthCheckConfig ¶ added in v0.1.9
type HealthCheckConfig struct {
// Score decay rate per tick (0-1)
ScoreDecayRate float64
// Score bonus on successful request
ScoreSuccessBonus float64
// Score penalty on failed request
ScoreFailurePenalty float64
// Minimum health score (0-100)
MinScore float64
// Maximum health score (0-100)
MaxScore float64
// Score decay interval
DecayInterval time.Duration
// Circuit breaker failure threshold
FailureThreshold int32
// Circuit breaker open timeout
CircuitOpenTimeout time.Duration
// Half-open max requests before transitioning
HalfOpenMaxRequests int32
}
HealthCheckConfig configures the health check behavior
func DefaultHealthCheckConfig ¶ added in v0.1.9
func DefaultHealthCheckConfig() *HealthCheckConfig
DefaultHealthCheckConfig returns the default health check configuration
type HealthStatistics ¶ added in v0.1.9
type HealthStatistics struct {
TotalRequests int64
SuccessfulRequests int64
FailedRequests int64
ActiveConnections int32
ConsecutiveFailures int32
HealthScore float64
CircuitState CircuitBreakerState
LastSuccessTime time.Time
LastFailureTime time.Time
CircuitOpenedAt time.Time
LastStateChange time.Time
}
HealthStatistics contains the current health statistics
type HealthTracker ¶ added in v0.1.9
type HealthTracker struct {
// contains filtered or unexported fields
}
HealthTracker manages health states for all agents
func NewHealthTracker ¶ added in v0.1.9
func NewHealthTracker(config *HealthCheckConfig) *HealthTracker
NewHealthTracker creates a new health tracker
func (*HealthTracker) DecrementConnections ¶ added in v0.1.9
func (t *HealthTracker) DecrementConnections(agentID string)
DecrementConnections decrements the connection count for an agent
func (*HealthTracker) GetAllStates ¶ added in v0.1.9
func (t *HealthTracker) GetAllStates() map[string]*AgentHealthState
GetAllStates returns a snapshot of all agent states
func (*HealthTracker) GetAllStatistics ¶ added in v0.1.9
func (t *HealthTracker) GetAllStatistics() map[string]HealthStatistics
GetAllStatistics returns statistics for all agents
func (*HealthTracker) GetAvailableAgents ¶ added in v0.1.9
func (t *HealthTracker) GetAvailableAgents() []*AgentHealthState
GetAvailableAgents returns all agents that are available for routing
func (*HealthTracker) GetNextRoundRobinIndex ¶ added in v0.1.9
func (t *HealthTracker) GetNextRoundRobinIndex(count int) int32
GetNextRoundRobinIndex returns the next index for round-robin selection
func (*HealthTracker) GetOrCreateState ¶ added in v0.1.9
func (t *HealthTracker) GetOrCreateState(agentID, addr string) *AgentHealthState
GetOrCreateState returns the health state for an agent, creating it if necessary
func (*HealthTracker) GetState ¶ added in v0.1.9
func (t *HealthTracker) GetState(agentID string) (*AgentHealthState, bool)
GetState returns the health state for an agent
func (*HealthTracker) GetStatistics ¶ added in v0.1.9
func (t *HealthTracker) GetStatistics(agentID string) (HealthStatistics, error)
GetStatistics returns statistics for a specific agent
func (*HealthTracker) IncrementConnections ¶ added in v0.1.9
func (t *HealthTracker) IncrementConnections(agentID string)
IncrementConnections increments the connection count for an agent
func (*HealthTracker) RecordFailure ¶ added in v0.1.9
func (t *HealthTracker) RecordFailure(agentID string)
RecordFailure records a failed request for an agent
func (*HealthTracker) RecordSuccess ¶ added in v0.1.9
func (t *HealthTracker) RecordSuccess(agentID string)
RecordSuccess records a successful request for an agent
func (*HealthTracker) RegisterAgent ¶ added in v0.1.9
func (t *HealthTracker) RegisterAgent(agentID, addr string) *AgentHealthState
RegisterAgent registers a new agent or updates an existing one
func (*HealthTracker) Reset ¶ added in v0.1.9
func (t *HealthTracker) Reset(agentID string)
Reset resets the health state for an agent (e.g., after agent re-registration)
func (*HealthTracker) Start ¶ added in v0.1.9
func (t *HealthTracker) Start()
Start starts the health tracker background routines
func (*HealthTracker) Stop ¶ added in v0.1.9
func (t *HealthTracker) Stop()
Stop stops the health tracker
func (*HealthTracker) UnregisterAgent ¶ added in v0.1.9
func (t *HealthTracker) UnregisterAgent(agentID string)
UnregisterAgent removes an agent from tracking
type JobRouting ¶
type JobRouting struct {
JobID string `json:"job_id"`
AgentAddr string `json:"agent_addr"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
JobRouting represents a job routing entry
type JobRoutingStore ¶
type JobRoutingStore interface {
// Get retrieves job routing by job ID
Get(jobID string) (*JobRouting, error)
// Set stores or updates job routing
Set(jobID, agentAddr string) error
// Delete removes job routing
Delete(jobID string) error
// List returns all job routings
List() ([]*JobRouting, error)
// Cleanup removes old entries (older than ttl)
Cleanup(ttl time.Duration) error
// Close closes the store
Close() error
}
JobRoutingStore defines the interface for job routing persistence
type LoadBalanceStrategy ¶ added in v0.1.9
type LoadBalanceStrategy string
LoadBalanceStrategy defines the load balancing strategy
const ( // StrategyMinID selects agent with minimum ID (original behavior) StrategyMinID LoadBalanceStrategy = "min_id" // StrategyRoundRobin selects agents in round-robin fashion StrategyRoundRobin LoadBalanceStrategy = "round_robin" // StrategyLeastConn selects agent with least active connections StrategyLeastConn LoadBalanceStrategy = "least_conn" // StrategyWeighted selects agent based on health score weights StrategyWeighted LoadBalanceStrategy = "weighted" )
type LoadBalancer ¶ added in v0.1.9
type LoadBalancer struct {
// contains filtered or unexported fields
}
LoadBalancer selects the best agent for routing based on strategy
func NewLoadBalancer ¶ added in v0.1.9
func NewLoadBalancer(strategy LoadBalanceStrategy, tracker *HealthTracker) *LoadBalancer
NewLoadBalancer creates a new load balancer
func (*LoadBalancer) BuildCandidates ¶ added in v0.1.9
func (lb *LoadBalancer) BuildCandidates(sessions []*reg.AgentSession, functionID string) []*Candidate
BuildCandidates builds candidate list from agent sessions
func (*LoadBalancer) GetStatistics ¶ added in v0.1.9
func (lb *LoadBalancer) GetStatistics() LoadBalancerStats
GetStatistics returns load balancer statistics
func (*LoadBalancer) GetStrategy ¶ added in v0.1.9
func (lb *LoadBalancer) GetStrategy() LoadBalanceStrategy
GetStrategy returns the current load balancing strategy
func (*LoadBalancer) Select ¶ added in v0.1.9
func (lb *LoadBalancer) Select(functionID string, candidates []*Candidate) (*Candidate, error)
Select selects the best agent for the given function
func (*LoadBalancer) SetStrategy ¶ added in v0.1.9
func (lb *LoadBalancer) SetStrategy(strategy LoadBalanceStrategy)
SetStrategy updates the load balancing strategy
type LoadBalancerStats ¶ added in v0.1.9
type LoadBalancerStats struct {
Strategy LoadBalanceStrategy `json:"strategy"`
RoundRobinState map[string]int32 `json:"roundRobinState,omitempty"`
AgentStats map[string]CandidateStats `json:"agentStats,omitempty"`
}
LoadBalancerStats contains load balancer statistics
type MemoryJobRoutingStore ¶
type MemoryJobRoutingStore struct {
// contains filtered or unexported fields
}
MemoryJobRoutingStore implements JobRoutingStore using in-memory storage
func NewMemoryJobRoutingStore ¶
func NewMemoryJobRoutingStore() *MemoryJobRoutingStore
NewMemoryJobRoutingStore creates a new in-memory job routing store
func (*MemoryJobRoutingStore) Cleanup ¶
func (s *MemoryJobRoutingStore) Cleanup(ttl time.Duration) error
func (*MemoryJobRoutingStore) Close ¶
func (s *MemoryJobRoutingStore) Close() error
func (*MemoryJobRoutingStore) Delete ¶
func (s *MemoryJobRoutingStore) Delete(jobID string) error
func (*MemoryJobRoutingStore) Get ¶
func (s *MemoryJobRoutingStore) Get(jobID string) (*JobRouting, error)
func (*MemoryJobRoutingStore) List ¶
func (s *MemoryJobRoutingStore) List() ([]*JobRouting, error)
func (*MemoryJobRoutingStore) Set ¶
func (s *MemoryJobRoutingStore) Set(jobID, agentAddr string) error
type ReconnectionManager ¶ added in v0.1.9
type ReconnectionManager struct {
// contains filtered or unexported fields
}
ReconnectionManager manages reconnection states for multiple connections
func NewReconnectionManager ¶ added in v0.1.9
func NewReconnectionManager(policy *ReconnectionPolicy) *ReconnectionManager
NewReconnectionManager creates a new reconnection manager
func (*ReconnectionManager) GetOrCreateState ¶ added in v0.1.9
func (m *ReconnectionManager) GetOrCreateState(key string) *ReconnectionState
GetOrCreateState gets or creates a reconnection state for the given key
func (*ReconnectionManager) GetPolicy ¶ added in v0.1.9
func (m *ReconnectionManager) GetPolicy() *ReconnectionPolicy
GetPolicy returns the reconnection policy
func (*ReconnectionManager) RemoveState ¶ added in v0.1.9
func (m *ReconnectionManager) RemoveState(key string)
RemoveState removes the reconnection state for the given key
func (*ReconnectionManager) ResetAll ¶ added in v0.1.9
func (m *ReconnectionManager) ResetAll()
ResetAll resets all reconnection states
func (*ReconnectionManager) SetPolicy ¶ added in v0.1.9
func (m *ReconnectionManager) SetPolicy(policy *ReconnectionPolicy)
SetPolicy updates the reconnection policy
func (*ReconnectionManager) Stats ¶ added in v0.1.9
func (m *ReconnectionManager) Stats() map[string]ReconnectionStats
Stats returns reconnection statistics
type ReconnectionPolicy ¶ added in v0.1.9
type ReconnectionPolicy struct {
// MaxRetries is the maximum number of reconnection attempts (-1 for infinite)
MaxRetries int
// InitialDelay is the initial delay before the first retry
InitialDelay time.Duration
// MaxDelay is the maximum delay between retries
MaxDelay time.Duration
// Multiplier is the exponential backoff multiplier (e.g., 2.0 for doubling)
Multiplier float64
// Jitter is the random jitter factor (0-1) to add to delays
Jitter float64
// EnableAutoReconnect enables automatic reconnection
EnableAutoReconnect bool
// contains filtered or unexported fields
}
ReconnectionPolicy defines how reconnection attempts are made
func DefaultReconnectionPolicy ¶ added in v0.1.9
func DefaultReconnectionPolicy() *ReconnectionPolicy
DefaultReconnectionPolicy returns the default reconnection policy
func NewReconnectionPolicy ¶ added in v0.1.9
func NewReconnectionPolicy(maxRetries int, initialDelay, maxDelay time.Duration, multiplier, jitter float64, enableAuto bool) *ReconnectionPolicy
NewReconnectionPolicy creates a new reconnection policy with custom settings
func (*ReconnectionPolicy) NextDelay ¶ added in v0.1.9
func (p *ReconnectionPolicy) NextDelay(attempt int) (time.Duration, error)
NextDelay returns the delay before the next reconnection attempt
func (*ReconnectionPolicy) ShouldRetry ¶ added in v0.1.9
func (p *ReconnectionPolicy) ShouldRetry(attempt int) bool
ShouldRetry returns whether another reconnection attempt should be made
type ReconnectionState ¶ added in v0.1.9
type ReconnectionState struct {
// contains filtered or unexported fields
}
ReconnectionState tracks the reconnection state for a connection
func NewReconnectionState ¶ added in v0.1.9
func NewReconnectionState(policy *ReconnectionPolicy) *ReconnectionState
NewReconnectionState creates a new reconnection state
func (*ReconnectionState) GetAttempt ¶ added in v0.1.9
func (s *ReconnectionState) GetAttempt() int
GetAttempt returns the current attempt count
func (*ReconnectionState) GetLastError ¶ added in v0.1.9
func (s *ReconnectionState) GetLastError() error
GetLastError returns the last error encountered
func (*ReconnectionState) IsEnabled ¶ added in v0.1.9
func (s *ReconnectionState) IsEnabled() bool
IsEnabled returns whether automatic reconnection is enabled
func (*ReconnectionState) NextDelay ¶ added in v0.1.9
func (s *ReconnectionState) NextDelay() (time.Duration, error)
NextDelay returns the delay before the next reconnection attempt
func (*ReconnectionState) RecordAttempt ¶ added in v0.1.9
func (s *ReconnectionState) RecordAttempt(err error)
RecordAttempt records a reconnection attempt
func (*ReconnectionState) Reset ¶ added in v0.1.9
func (s *ReconnectionState) Reset()
Reset resets the reconnection state (e.g., after successful connection)
func (*ReconnectionState) SetEnabled ¶ added in v0.1.9
func (s *ReconnectionState) SetEnabled(enabled bool)
SetEnabled enables or disables automatic reconnection
func (*ReconnectionState) ShouldRetry ¶ added in v0.1.9
func (s *ReconnectionState) ShouldRetry() bool
ShouldRetry returns whether another reconnection attempt should be made