Documentation
¶
Overview ¶
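Package routing implements federation request routing, including a distributed circuit breaker, adaptive load balancing, instance health checking, an instance registry, query optimization, and routing metrics.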
Index ¶
- Constants
- Variables
- type AdaptiveLoadBalancer
- func (alb *AdaptiveLoadBalancer) Balance(routes []*types.Route, load int) map[string]int
- func (alb *AdaptiveLoadBalancer) DecrementConnections(routeID string)
- func (alb *AdaptiveLoadBalancer) GetCurrentWeights() map[string]float64
- func (alb *AdaptiveLoadBalancer) IncrementConnections(routeID string)
- func (alb *AdaptiveLoadBalancer) SetAlgorithm(algorithm LoadBalancingAlgorithm)
- func (alb *AdaptiveLoadBalancer) UpdateWeights(metrics map[string]*types.RouteMetrics) error
- type AggregatedHealth
- type BackpressureRule
- type BatchQueryCoordinator
- func (bc *BatchQueryCoordinator) AddInstanceQuery(ctx context.Context, instanceID string, priority int) (interface{}, error)
- func (bc *BatchQueryCoordinator) AddMetricsQuery(ctx context.Context, routeID string, priority int) (interface{}, error)
- func (bc *BatchQueryCoordinator) AddStatusQuery(ctx context.Context, status string, priority int) (interface{}, error)
- func (bc *BatchQueryCoordinator) GetBatchStats() map[string]interface{}
- func (bc *BatchQueryCoordinator) Start()
- func (bc *BatchQueryCoordinator) Stop()
- type CircuitBreaker
- type DeliveryQueue
- type DistributedCircuitBreaker
- func (dcb *DistributedCircuitBreaker) AssessRouteHealthAndAdjustCircuit(ctx context.Context, routeID string, metrics *types.RouteMetrics) error
- func (dcb *DistributedCircuitBreaker) CanAttempt(instanceID string) bool
- func (dcb *DistributedCircuitBreaker) Close(instanceID string) error
- func (dcb *DistributedCircuitBreaker) GetBackpressureRules() map[MessagePriority]BackpressureRule
- func (dcb *DistributedCircuitBreaker) GetMetrics(instanceID string) map[string]any
- func (dcb *DistributedCircuitBreaker) GetStatus(instanceID string) types.CircuitStatus
- func (dcb *DistributedCircuitBreaker) HalfOpen(instanceID string) error
- func (dcb *DistributedCircuitBreaker) IsOpen(instanceID string) bool
- func (dcb *DistributedCircuitBreaker) Open(instanceID string, reason string) error
- func (dcb *DistributedCircuitBreaker) RecordFailure(instanceID string, err error) error
- func (dcb *DistributedCircuitBreaker) RecordSuccess(instanceID string) error
- func (dcb *DistributedCircuitBreaker) ShouldEnterEmergencyMode(healthyRoutes, totalRoutes int) bool
- type FederationInstanceRepository
- type GlobalMetrics
- type HealthChecker
- type HealthSummary
- type InstanceHealthChecker
- func (hc *InstanceHealthChecker) CheckHealth(instance *types.Instance) (*types.HealthStatus, error)
- func (hc *InstanceHealthChecker) GetAggregatedHealth(instanceID string, window time.Duration) (*AggregatedHealth, error)
- func (hc *InstanceHealthChecker) GetHealthHistory(instanceID string, duration time.Duration) ([]*types.HealthStatus, error)
- func (hc *InstanceHealthChecker) StartMonitoring(_ *types.Instance) error
- func (hc *InstanceHealthChecker) StopMonitoring(_ string) error
- type InstanceHealthDetail
- type InstanceMetrics
- type InstanceRegistry
- func (ir *InstanceRegistry) BatchCreateInstances(ctx context.Context, instances []*types.Instance) error
- func (ir *InstanceRegistry) BatchGetInstances(ctx context.Context, instanceIDs []string) ([]*types.Instance, error)
- func (ir *InstanceRegistry) BatchUpdateInstancesHealth(ctx context.Context, healthUpdates map[string]*types.HealthStatus) error
- func (ir *InstanceRegistry) BatchUpdateInstancesUsage(ctx context.Context, usageUpdates map[string]int64) error
- func (ir *InstanceRegistry) ClearCache()
- func (ir *InstanceRegistry) GetCacheStats() map[string]interface{}
- func (ir *InstanceRegistry) GetHealthHistory(ctx context.Context, instanceID string, duration time.Duration) ([]*types.HealthStatus, error)
- func (ir *InstanceRegistry) GetInstance(ctx context.Context, instanceID string) (*types.Instance, error)
- func (ir *InstanceRegistry) GetInstanceByDomain(ctx context.Context, domain string) (*types.Instance, error)
- func (ir *InstanceRegistry) GetInstancesByStatus(ctx context.Context, status types.InstanceStatus, limit int) ([]*types.Instance, error)
- func (ir *InstanceRegistry) GetInstancesByTier(ctx context.Context, tier types.TierLevel, limit int) ([]*types.Instance, error)
- func (ir *InstanceRegistry) ListHealthyInstances(ctx context.Context) ([]*types.Instance, error)
- func (ir *InstanceRegistry) ListInstances(ctx context.Context, limit int, startKey map[string]interface{}) ([]*types.Instance, map[string]interface{}, error)
- func (ir *InstanceRegistry) RegisterInstance(ctx context.Context, instance *types.Instance) error
- func (ir *InstanceRegistry) SearchInstances(ctx context.Context, domainPattern string, limit int) ([]*types.Instance, error)
- func (ir *InstanceRegistry) UnregisterInstance(ctx context.Context, instanceID string) error
- func (ir *InstanceRegistry) UpdateInstance(ctx context.Context, instance *types.Instance) error
- func (ir *InstanceRegistry) UpdateInstanceHealth(ctx context.Context, instanceID string, health *types.HealthStatus) error
- func (ir *InstanceRegistry) UpdateInstanceUsage(ctx context.Context, instanceID string, bytesUsed int64) error
- type LoadBalancer
- type LoadBalancingAlgorithm
- type Manager
- func (m *Manager) CloseCircuit(instanceID string) error
- func (m *Manager) DeliverMessage(ctx context.Context, message *types.FederationMessage, ...) (*types.DeliveryResult, error)
- func (m *Manager) DetectUnhealthyInstances() ([]string, error)
- func (m *Manager) GetCircuitStatus(instanceID string) types.CircuitStatus
- func (m *Manager) GetHealthSummary() (*HealthSummary, error)
- func (m *Manager) GetInstance(instanceID string) (*types.Instance, error)
- func (m *Manager) GetRouteMetrics(destination string) (*types.RouteMetrics, error)
- func (m *Manager) GetRoutes(destination string) ([]*types.Route, error)
- func (m *Manager) ListHealthyInstances() ([]*types.Instance, error)
- func (m *Manager) MonitorInstanceHealth() error
- func (m *Manager) OpenCircuit(instanceID string, reason string) error
- func (m *Manager) OptimizeRoutes() error
- func (m *Manager) PerformHealthCheck(instanceID string) (*types.HealthStatus, error)
- func (m *Manager) RecoverInstances() error
- func (m *Manager) RegisterInstance(instance *types.Instance) error
- func (m *Manager) SelectRoute(destination string, messageType types.MessageType) (*types.Route, error)
- func (m *Manager) UpdateInstanceHealth(instanceID string, health *types.HealthStatus) error
- type ManagerConfig
- type MessagePriority
- type MetricEventType
- type OptimizerConfig
- type QueryOptimizer
- func (qo *QueryOptimizer) GetBatchCoordinator() *BatchQueryCoordinator
- func (qo *QueryOptimizer) InvalidateCache(pattern string)
- func (qo *QueryOptimizer) OptimizedBatchGetInstances(ctx context.Context, instanceIDs []string) ([]*fedTypes.Instance, error)
- func (qo *QueryOptimizer) OptimizedGetInstance(ctx context.Context, instanceID string) (*fedTypes.Instance, error)
- func (qo *QueryOptimizer) OptimizedQueryByStatus(ctx context.Context, status fedTypes.InstanceStatus) ([]*fedTypes.Instance, error)
- func (qo *QueryOptimizer) OptimizedQueryRecentMetrics(ctx context.Context, routeID string, limit int) ([]*fedTypes.DeliveryResult, error)
- func (qo *QueryOptimizer) PrewarmCache(ctx context.Context) error
- func (qo *QueryOptimizer) Shutdown()
- type QueryRequest
- type RecoveryStep
- type RepositoryInterface
- type RouteHealthAssessment
- type RouteHealthStatus
- type RouteManager
- type RouteOptimizer
- type RouteRanking
- type RouteSelector
- type RouteThresholdManager
- func (rtm *RouteThresholdManager) AssessRouteHealth(_ context.Context, routeID string, metrics *types.RouteMetrics) *RouteHealthAssessment
- func (rtm *RouteThresholdManager) CalculateRouteCacheKey(sourceInstance, targetInstance string, activityType types.MessageType, ...) string
- func (rtm *RouteThresholdManager) GetCacheTTLForMessageType(messageType types.MessageType, routeHealth RouteHealthStatus) time.Duration
- func (rtm *RouteThresholdManager) GetEmergencyBackpressureRules() map[MessagePriority]BackpressureRule
- func (rtm *RouteThresholdManager) GetMessageSizeClass(messageSize int64) int
- func (rtm *RouteThresholdManager) GetRecoverySteps() []RecoveryStep
- func (rtm *RouteThresholdManager) ShouldEnterEmergencyMode(healthyRoutes int, totalRoutes int) bool
- type RoutingMetrics
- func (rm *RoutingMetrics) Flush(ctx context.Context) error
- func (rm *RoutingMetrics) GetGlobalMetrics(ctx context.Context, window time.Duration) (*GlobalMetrics, error)
- func (rm *RoutingMetrics) GetInstanceMetrics(ctx context.Context, instanceID string, window time.Duration) (*InstanceMetrics, error)
- func (rm *RoutingMetrics) GetRouteMetrics(ctx context.Context, routeID string, window time.Duration) (*fedTypes.RouteMetrics, error)
- func (rm *RoutingMetrics) RecordCircuitChange(instanceID string, oldState, newState fedTypes.CircuitStatus)
- func (rm *RoutingMetrics) RecordDelivery(result *fedTypes.DeliveryResult)
- func (rm *RoutingMetrics) RecordHealthCheck(instanceID string, health *fedTypes.HealthStatus)
- func (rm *RoutingMetrics) RecordRouteSelection(routeID, destination string, messageType fedTypes.MessageType)
- type SmartRouteOptimizer
- func (sro *SmartRouteOptimizer) EstimateCost(route *types.Route, messageSize int64) float64
- func (sro *SmartRouteOptimizer) GetRouteMetrics(ctx context.Context, routeID string) (*types.RouteMetrics, error)
- func (sro *SmartRouteOptimizer) OptimizeRoutes(ctx context.Context, routes []*types.Route, messageSize int64) ([]*types.Route, error)
- func (sro *SmartRouteOptimizer) PredictLatency(route *types.Route, messageSize int64) time.Duration
- func (sro *SmartRouteOptimizer) RecordDeliveryResult(ctx context.Context, result *types.DeliveryResult) error
- type ThresholdConfig
Constants ¶
const (
	QueryTypeInstance = "instance"
	QueryTypeStatus   = "status"
	QueryTypeMetrics  = "metrics"
)
Query type constants
Variables ¶
var (
	// ErrCircuitStateRetrieveFailed is returned when circuit state cannot be retrieved
	ErrCircuitStateRetrieveFailed = errors.FailedToGet("circuit state", stdErrors.New("failed to get circuit state"))
	// ErrCircuitStateSaveFailed is returned when circuit state cannot be saved
	ErrCircuitStateSaveFailed = errors.FailedToSave("circuit state", stdErrors.New("failed to save circuit state"))
	// ErrCircuitStateUpdateFailed is returned when circuit state cannot be updated
	ErrCircuitStateUpdateFailed = errors.FailedToUpdate("circuit state", stdErrors.New("failed to update circuit state"))
	// ErrInvalidCircuitTransition is returned when attempting an invalid state transition
	ErrInvalidCircuitTransition = errors.InvalidValue("circuit_state", []string{"open", "half-open", "closed"}, "half-open")
)
Circuit breaker errors - consolidated to use centralized error system
var (
	// ErrHealthHistoryRetrieveFailed is returned when health history cannot be retrieved
	ErrHealthHistoryRetrieveFailed = errors.FailedToGet("health history", stdErrors.New("failed to get health history"))
	// ErrNoHealthDataAvailable is returned when no health data is available for aggregation
	ErrNoHealthDataAvailable = errors.ResourceUnavailable("health data")
	// ErrInvalidURL is returned when an instance URL is invalid
	ErrInvalidURL = errors.URLInvalid("")
	// ErrHealthCheckRequestFailed is returned when a health check HTTP request fails
	ErrHealthCheckRequestFailed = errors.NetworkError("health check request", stdErrors.New("health check request failed"))
	// ErrServerError is returned when the server returns a 5xx status code
	ErrServerError = errors.ExternalAPIError("health check", 500, stdErrors.New("server error"))
	// ErrClientError is returned when the server returns a 4xx status code
	ErrClientError = errors.ExternalAPIError("health check", 400, stdErrors.New("client error"))
)
Health checking errors - consolidated to use centralized error system
var (
	// ErrBatchQueryFailed is returned when a batch query operation fails
	ErrBatchQueryFailed = errors.BatchOperationFailed("query", stdErrors.New("batch query failed"))
	// ErrInstanceNotFound is returned when a requested instance is not found
	ErrInstanceNotFound = errors.ItemNotFound("instance")
	// ErrInvalidResultType is returned when query result has unexpected type
	ErrInvalidResultType = errors.InvalidFormat("result_type", "expected format")
	// ErrBatchStatusQueryFailed is returned when a batch status query fails
	ErrBatchStatusQueryFailed = errors.BatchOperationFailed("status query", stdErrors.New("batch status query failed"))
	// ErrFallbackStatusQueryFailed is returned when fallback status query fails
	ErrFallbackStatusQueryFailed = errors.FailedToQuery("status", stdErrors.New("fallback status query failed"))
	// ErrPrewarmActiveInstancesFailed is returned when prewarming active instances fails
	ErrPrewarmActiveInstancesFailed = errors.ProcessingFailed("instance prewarming", stdErrors.New("instance prewarming failed"))
	// ErrPrewarmActiveInstancesInMemoryFailed is returned when prewarming active instances in memory fails
	ErrPrewarmActiveInstancesInMemoryFailed = errors.ProcessingFailed("in-memory instance prewarming", stdErrors.New("in-memory instance prewarming failed"))
	// ErrCoordinatorStopped is returned when the batch coordinator is stopped
	ErrCoordinatorStopped = errors.ServiceUnavailable("batch coordinator")
	// ErrBatchQueryTimeout is returned when a batch query times out
	ErrBatchQueryTimeout = errors.TimeoutError("batch query")
	// ErrUnknownQueryType is returned when an unknown query type is encountered
	ErrUnknownQueryType = errors.InvalidValue("query_type", []string{"batch", "single", "fallback"}, "")
	// ErrBatchGetInstancesFailed is returned when batch get instances operation fails
	ErrBatchGetInstancesFailed = errors.BatchOperationFailed("get instances", stdErrors.New("batch get instances failed"))
)
Query optimization errors - consolidated to use centralized error system
var (
	// ErrInstanceRegistrationFailed is returned when instance registration fails
	ErrInstanceRegistrationFailed = errors.FailedToCreate("instance", stdErrors.New("failed to register instance"))
	// ErrInstanceUpdateFailed is returned when instance update fails
	ErrInstanceUpdateFailed = errors.FailedToUpdate("instance", stdErrors.New("failed to update instance"))
	// ErrInstanceUnregistrationFailed is returned when instance unregistration fails
	ErrInstanceUnregistrationFailed = errors.FailedToDelete("instance", stdErrors.New("failed to unregister instance"))
	// ErrInstanceHealthUpdateFailed is returned when instance health update fails
	ErrInstanceHealthUpdateFailed = errors.FailedToUpdate("instance health", stdErrors.New("failed to update instance health"))
	// ErrInstanceUsageUpdateFailed is returned when instance usage update fails
	ErrInstanceUsageUpdateFailed = errors.FailedToUpdate("instance usage", stdErrors.New("failed to update instance usage"))
	// ErrInstanceBatchGetFailed is returned when batch instance retrieval fails
	ErrInstanceBatchGetFailed = errors.BatchOperationFailed("get instances", stdErrors.New("batch get instances failed"))
	// ErrInstanceBatchCreateFailed is returned when batch instance creation fails
	ErrInstanceBatchCreateFailed = errors.BatchOperationFailed("create instances", stdErrors.New("batch create instances failed"))
	// ErrInstanceBatchHealthUpdateFailed is returned when batch health update fails
	ErrInstanceBatchHealthUpdateFailed = errors.BatchOperationFailed("update instances health", stdErrors.New("batch health update failed"))
	// ErrInstanceBatchUsageUpdateFailed is returned when batch usage update fails
	ErrInstanceBatchUsageUpdateFailed = errors.BatchOperationFailed("update instances usage", stdErrors.New("batch usage update failed"))
)
Instance registry errors - consolidated to use centralized error system
var (
	// ErrGetRoutesFailed is returned when retrieving routes fails
	ErrGetRoutesFailed = errors.FailedToGet("routes", stdErrors.New("failed to get routes"))
	// ErrGetInstancesFailed is returned when retrieving instances fails
	ErrGetInstancesFailed = errors.FailedToGet("instances", stdErrors.New("failed to get instances"))
	// ErrRegisterInstanceFailed is returned when registering an instance fails
	ErrRegisterInstanceFailed = errors.FailedToCreate("instance", stdErrors.New("failed to register instance"))
	// ErrUpdateHealthFailed is returned when updating instance health fails
	ErrUpdateHealthFailed = errors.FailedToUpdate("health", stdErrors.New("failed to update health"))
	// ErrGetInstanceFailed is returned when retrieving a specific instance fails
	ErrGetInstanceFailed = errors.FailedToGet("instance", stdErrors.New("failed to get instance"))
	// ErrListInstancesFailed is returned when listing instances fails
	ErrListInstancesFailed = errors.FailedToList("instances", stdErrors.New("failed to list instances"))
	// ErrNoRoutesForDestination is returned when no routes exist for a destination
	ErrNoRoutesForDestination = errors.ItemNotFound("route")
	// ErrHealthRepositoryNotAvailable is returned when health repository is not configured
	ErrHealthRepositoryNotAvailable = errors.ServiceNotAvailable("health repository")
	// ErrGetUnhealthyInstancesFailed is returned when retrieving unhealthy instances fails
	ErrGetUnhealthyInstancesFailed = errors.FailedToGet("unhealthy instances", stdErrors.New("failed to get unhealthy instances"))
	// ErrNoRoutesAvailable is returned when no routes are available for any target
	ErrNoRoutesAvailable = errors.ResourceUnavailable("routes")
	// ErrGetRoutesInEmergencyMode is returned when getting routes fails in emergency mode
	ErrGetRoutesInEmergencyMode = errors.FailedToGet("routes in emergency mode", stdErrors.New("failed to get routes in emergency mode"))
	// ErrInvalidInboxURLs is returned when instance has invalid inbox URLs
	ErrInvalidInboxURLs = errors.URLInvalid("inbox URL")
)
Route management errors - consolidated to use centralized error system
var (
	// ErrNoMessageTypeSupport is returned when no routes support the message type
	ErrNoMessageTypeSupport = errors.ResourceUnavailable("routes for message type")
	// ErrMessageQueuedBackpressure is returned when message is queued due to backpressure
	ErrMessageQueuedBackpressure = errors.TooManyRequests("message delivery")
	// ErrMessageQueuedEmergency is returned when message is queued due to emergency mode
	ErrMessageQueuedEmergency = errors.ServiceUnavailable("message delivery")
	// ErrMessageDroppedEmergency is returned when message is dropped due to emergency mode
	ErrMessageDroppedEmergency = errors.ServiceUnavailable("message delivery")
	// ErrGetSigningActorFailed is returned when getting signing actor fails
	ErrGetSigningActorFailed = errors.ActorNotFound("")
	// ErrFederationStoreNotConfigured is returned when federation store is not configured
	ErrFederationStoreNotConfigured = errors.ConfigurationMissing("federation_store")
	// ErrExtractUsernameFromActorID is returned when username cannot be extracted from actor ID
	ErrExtractUsernameFromActorID = errors.ParsingFailed("username from actor ID", stdErrors.New("failed to extract username"))
	// ErrGetActorFailed is returned when getting actor fails
	ErrGetActorFailed = errors.ActorFetchFailed("", stdErrors.New("failed to fetch actor"))
	// ErrMarshalActivityFailed is returned when marshaling activity fails
	ErrMarshalActivityFailed = errors.MarshalingFailed("activity", stdErrors.New("failed to marshal activity"))
	// ErrCreateRequestFailed is returned when creating HTTP request fails
	ErrCreateRequestFailed = errors.ProcessingFailed("request creation", stdErrors.New("request creation failed"))
	// ErrFederationStoreNotConfiguredForSigning is returned when federation store is not configured for signing
	ErrFederationStoreNotConfiguredForSigning = errors.ConfigurationMissing("federation_store_signing")
	// ErrGetPrivateKeyFailed is returned when getting private key fails
	ErrGetPrivateKeyFailed = errors.SigningKeyNotFound("")
	// ErrParsePrivateKeyFailed is returned when parsing private key fails
	ErrParsePrivateKeyFailed = errors.SigningKeyInvalid("")
	// ErrSignRequestFailed is returned when signing HTTP request fails
	ErrSignRequestFailed = errors.SignatureVerificationFailed()
	// ErrSendRequestFailed is returned when sending HTTP request fails
	ErrSendRequestFailed = errors.NetworkError("send request", stdErrors.New("failed to send request"))
	// ErrHTTPDeliveryFailed is returned when HTTP delivery fails with an error status
	ErrHTTPDeliveryFailed = errors.DeliveryFailed("", stdErrors.New("HTTP delivery failed"))
	// ErrInstanceUnhealthy is returned when instance health check determines instance is unhealthy
	ErrInstanceUnhealthy = errors.HealthCheckFailed("", stdErrors.New("instance unhealthy"))
	// ErrHealthCheckFailed is returned when a health check fails for an instance
	ErrHealthCheckFailed = errors.HealthCheckFailed("", stdErrors.New("health check failed"))
)
Message delivery errors - consolidated to use centralized error system
var (
	// ErrQueryRouteMetricsFailed is returned when querying route metrics fails
	ErrQueryRouteMetricsFailed = errors.FailedToQuery("route metrics", stdErrors.New("failed to query route metrics"))
	// ErrQueryInstanceMetricsFailed is returned when querying instance metrics fails
	ErrQueryInstanceMetricsFailed = errors.FailedToQuery("instance metrics", stdErrors.New("failed to query instance metrics"))
	// ErrQueryGlobalMetricsFailed is returned when querying global metrics fails
	ErrQueryGlobalMetricsFailed = errors.FailedToQuery("global metrics", stdErrors.New("failed to query global metrics"))
	// ErrPersistMetricsWindowFailed is returned when persisting metrics window fails
	ErrPersistMetricsWindowFailed = errors.FailedToStore("metrics window", stdErrors.New("failed to persist metrics window"))
	// ErrBatchWriteMetricsFailed is returned when batch writing metrics fails
	ErrBatchWriteMetricsFailed = errors.BatchOperationFailed("write metrics", stdErrors.New("batch write metrics failed"))
)
Metrics errors - consolidated to use centralized error system
var (
	// ErrRecordDeliveryResultFailed is returned when recording delivery result fails
	ErrRecordDeliveryResultFailed = errors.FailedToStore("delivery result", stdErrors.New("failed to record delivery result"))
)
Route optimization errors - consolidated to use centralized error system
Functions ¶
This section is empty.
Types ¶
type AdaptiveLoadBalancer ¶
type AdaptiveLoadBalancer struct {
// contains filtered or unexported fields
}
AdaptiveLoadBalancer implements intelligent load distribution
func NewAdaptiveLoadBalancer ¶
func NewAdaptiveLoadBalancer(logger *zap.Logger) *AdaptiveLoadBalancer
NewAdaptiveLoadBalancer creates a new load balancer
func (*AdaptiveLoadBalancer) DecrementConnections ¶
func (alb *AdaptiveLoadBalancer) DecrementConnections(routeID string)
DecrementConnections decrements active connections for a route
func (*AdaptiveLoadBalancer) GetCurrentWeights ¶
func (alb *AdaptiveLoadBalancer) GetCurrentWeights() map[string]float64
GetCurrentWeights returns current weights for all routes
func (*AdaptiveLoadBalancer) IncrementConnections ¶
func (alb *AdaptiveLoadBalancer) IncrementConnections(routeID string)
IncrementConnections increments active connections for a route
func (*AdaptiveLoadBalancer) SetAlgorithm ¶
func (alb *AdaptiveLoadBalancer) SetAlgorithm(algorithm LoadBalancingAlgorithm)
SetAlgorithm changes the load balancing algorithm
func (*AdaptiveLoadBalancer) UpdateWeights ¶
func (alb *AdaptiveLoadBalancer) UpdateWeights(metrics map[string]*types.RouteMetrics) error
UpdateWeights updates route weights based on metrics
type AggregatedHealth ¶
type AggregatedHealth struct {
InstanceID string
Window time.Duration
SampleCount int
LastCheck time.Time
Availability float64
AvgResponseTime time.Duration
ErrorRate float64
AvgBacklog int
MaxBacklog int
StatusCodes map[int]int
HealthScore float64 // 0-100
}
AggregatedHealth represents aggregated health metrics
type BackpressureRule ¶
type BackpressureRule struct {
Threshold float64 // Health ratio threshold
Action string // "allow", "queue", "drop", "queue_if_below_threshold"
RateLimit time.Duration // Rate limiting interval
QueueDepth int // Maximum queue depth
}
BackpressureRule defines how to handle messages during degraded conditions
type BatchQueryCoordinator ¶
type BatchQueryCoordinator struct {
// contains filtered or unexported fields
}
BatchQueryCoordinator aggregates queries into efficient batches
func NewBatchQueryCoordinator ¶
func NewBatchQueryCoordinator(cacheRepo *repositories.QueryCacheRepository, logger *zap.Logger) *BatchQueryCoordinator
NewBatchQueryCoordinator creates a new batch query coordinator
func (*BatchQueryCoordinator) AddInstanceQuery ¶
func (bc *BatchQueryCoordinator) AddInstanceQuery(ctx context.Context, instanceID string, priority int) (interface{}, error)
AddInstanceQuery adds an instance query to the batch
func (*BatchQueryCoordinator) AddMetricsQuery ¶
func (bc *BatchQueryCoordinator) AddMetricsQuery(ctx context.Context, routeID string, priority int) (interface{}, error)
AddMetricsQuery adds a metrics query to the batch
func (*BatchQueryCoordinator) AddStatusQuery ¶
func (bc *BatchQueryCoordinator) AddStatusQuery(ctx context.Context, status string, priority int) (interface{}, error)
AddStatusQuery adds a status query to the batch
func (*BatchQueryCoordinator) GetBatchStats ¶
func (bc *BatchQueryCoordinator) GetBatchStats() map[string]interface{}
GetBatchStats returns statistics about batch operations
func (*BatchQueryCoordinator) Start ¶
func (bc *BatchQueryCoordinator) Start()
Start begins background batch processing
func (*BatchQueryCoordinator) Stop ¶
func (bc *BatchQueryCoordinator) Stop()
Stop gracefully shuts down the batch coordinator
type CircuitBreaker ¶
type CircuitBreaker interface {
// Circuit control
Open(instanceID string, reason string) error
Close(instanceID string) error
HalfOpen(instanceID string) error
// Status checks
IsOpen(instanceID string) bool
CanAttempt(instanceID string) bool
RecordSuccess(instanceID string) error
RecordFailure(instanceID string, err error) error
// Configuration
SetThreshold(instanceID string, threshold int) error
SetTimeout(instanceID string, timeout time.Duration) error
}
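CircuitBreaker defines a per-instance circuit breaker: circuit control (open, close, half-open), status checks and outcome recording, and threshold/timeout configuration.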
type DeliveryQueue ¶
type DeliveryQueue interface {
Enqueue(message *types.FederationMessage, options types.DeliveryOptions) error
Dequeue(count int) ([]*types.QueuedMessage, error)
// Retry management
ScheduleRetry(messageID string, after time.Duration) error
GetRetryMessages() ([]*types.QueuedMessage, error)
// Dead letter queue
MoveToDLQ(messageID string, reason string) error
GetDLQMessages(limit int) ([]*types.QueuedMessage, error)
// Metrics
GetQueueDepth() (int64, error)
GetQueueMetrics() (*types.QueueMetrics, error)
}
DeliveryQueue manages message delivery
type DistributedCircuitBreaker ¶
type DistributedCircuitBreaker struct {
// contains filtered or unexported fields
}
DistributedCircuitBreaker implements the circuit breaker pattern with DynamORM persistence.
func NewDistributedCircuitBreaker ¶
func NewDistributedCircuitBreaker(repo *repositories.CircuitBreakerRepository, thresholdManager *RouteThresholdManager, logger *zap.Logger, config *models.CircuitBreakerConfig) *DistributedCircuitBreaker
NewDistributedCircuitBreaker creates a new circuit breaker
func (*DistributedCircuitBreaker) AssessRouteHealthAndAdjustCircuit ¶
func (dcb *DistributedCircuitBreaker) AssessRouteHealthAndAdjustCircuit(ctx context.Context, routeID string, metrics *types.RouteMetrics) error
AssessRouteHealthAndAdjustCircuit uses threshold manager to assess route health and adjust circuit state
func (*DistributedCircuitBreaker) CanAttempt ¶
func (dcb *DistributedCircuitBreaker) CanAttempt(instanceID string) bool
CanAttempt checks if a request can be attempted
func (*DistributedCircuitBreaker) Close ¶
func (dcb *DistributedCircuitBreaker) Close(instanceID string) error
Close closes the circuit for an instance
func (*DistributedCircuitBreaker) GetBackpressureRules ¶
func (dcb *DistributedCircuitBreaker) GetBackpressureRules() map[MessagePriority]BackpressureRule
GetBackpressureRules returns backpressure rules for emergency mode
func (*DistributedCircuitBreaker) GetMetrics ¶
func (dcb *DistributedCircuitBreaker) GetMetrics(instanceID string) map[string]any
GetMetrics returns circuit breaker metrics
func (*DistributedCircuitBreaker) GetStatus ¶
func (dcb *DistributedCircuitBreaker) GetStatus(instanceID string) types.CircuitStatus
GetStatus returns the current circuit status
func (*DistributedCircuitBreaker) HalfOpen ¶
func (dcb *DistributedCircuitBreaker) HalfOpen(instanceID string) error
HalfOpen puts the circuit in half-open state for testing
func (*DistributedCircuitBreaker) IsOpen ¶
func (dcb *DistributedCircuitBreaker) IsOpen(instanceID string) bool
IsOpen checks if the circuit is open
func (*DistributedCircuitBreaker) Open ¶
func (dcb *DistributedCircuitBreaker) Open(instanceID string, reason string) error
Open opens the circuit for an instance
func (*DistributedCircuitBreaker) RecordFailure ¶
func (dcb *DistributedCircuitBreaker) RecordFailure(instanceID string, err error) error
RecordFailure records a failed request
func (*DistributedCircuitBreaker) RecordSuccess ¶
func (dcb *DistributedCircuitBreaker) RecordSuccess(instanceID string) error
RecordSuccess records a successful request
func (*DistributedCircuitBreaker) ShouldEnterEmergencyMode ¶
func (dcb *DistributedCircuitBreaker) ShouldEnterEmergencyMode(healthyRoutes, totalRoutes int) bool
ShouldEnterEmergencyMode checks if the system should enter emergency mode
type FederationInstanceRepository ¶
type FederationInstanceRepository interface {
// Instance CRUD operations
CreateInstance(ctx context.Context, instance *types.Instance) error
GetInstance(ctx context.Context, instanceID string) (*types.Instance, error)
GetInstanceByDomain(ctx context.Context, domain string) (*types.Instance, error)
UpdateInstance(ctx context.Context, instance *types.Instance) error
DeleteInstance(ctx context.Context, instanceID string) error
// Instance queries
ListInstancesByStatus(ctx context.Context, status types.InstanceStatus, limit int) ([]*types.Instance, error)
ListHealthyInstances(ctx context.Context) ([]*types.Instance, error)
GetInstancesByTier(ctx context.Context, tier types.TierLevel, limit int) ([]*types.Instance, error)
BatchGetInstances(ctx context.Context, instanceIDs []string) ([]*types.Instance, error)
SearchInstances(ctx context.Context, domainPattern string, limit int) ([]*types.Instance, error)
ListAllInstances(ctx context.Context, limit int, startKey map[string]interface{}) ([]*types.Instance, map[string]interface{}, error)
// Batch operations for efficiency
BatchCreateInstances(ctx context.Context, instances []*types.Instance) error
BatchUpdateInstancesHealth(ctx context.Context, healthUpdates map[string]*types.HealthStatus) error
BatchUpdateInstancesUsage(ctx context.Context, usageUpdates map[string]int64) error
// Instance health and metrics
UpdateInstanceHealth(ctx context.Context, instanceID string, health *types.HealthStatus) error
UpdateInstanceUsage(ctx context.Context, instanceID string, bytesUsed int64) error
GetHealthHistory(ctx context.Context, instanceID string, duration time.Duration) ([]*types.HealthStatus, error)
}
FederationInstanceRepository is the instance repository interface, used for dependency injection.
type GlobalMetrics ¶
type GlobalMetrics struct {
Window time.Duration
TotalMessages int64
TotalBytes int64
TotalCost float64
ActiveRoutes int64
ActiveInstances int64
HourlyVolume map[int]int64
TopRoutes []RouteRanking
LastUpdated time.Time
}
GlobalMetrics represents system-wide metrics
type HealthChecker ¶
type HealthChecker interface {
CheckHealth(instance *types.Instance) (*types.HealthStatus, error)
StartMonitoring(instance *types.Instance) error
StopMonitoring(instanceID string) error
GetHealthHistory(instanceID string, duration time.Duration) ([]*types.HealthStatus, error)
}
HealthChecker monitors instance health
type HealthSummary ¶
type HealthSummary struct {
Timestamp time.Time `json:"timestamp"`
TotalInstances int `json:"total_instances"`
HealthyInstances int `json:"healthy_instances"`
DegradedInstances int `json:"degraded_instances"`
UnhealthyInstances int `json:"unhealthy_instances"`
OverallHealth float64 `json:"overall_health_percentage"`
InstanceDetails map[string]InstanceHealthDetail `json:"instance_details"`
}
HealthSummary represents overall health status of the federation system
type InstanceHealthChecker ¶
type InstanceHealthChecker struct {
// contains filtered or unexported fields
}
InstanceHealthChecker monitors instance health using DynamORM
func NewHealthChecker ¶
func NewHealthChecker(healthRepo *repositories.InstanceHealthRepository, logger *zap.Logger, config *types.RoutingConfig) *InstanceHealthChecker
NewHealthChecker creates a new health checker using DynamORM
func (*InstanceHealthChecker) CheckHealth ¶
func (hc *InstanceHealthChecker) CheckHealth(instance *types.Instance) (*types.HealthStatus, error)
CheckHealth performs a health check on an instance and stores the result
func (*InstanceHealthChecker) GetAggregatedHealth ¶
func (hc *InstanceHealthChecker) GetAggregatedHealth(instanceID string, window time.Duration) (*AggregatedHealth, error)
GetAggregatedHealth returns aggregated health metrics using DynamORM
func (*InstanceHealthChecker) GetHealthHistory ¶
func (hc *InstanceHealthChecker) GetHealthHistory(instanceID string, duration time.Duration) ([]*types.HealthStatus, error)
GetHealthHistory retrieves health history for an instance using DynamORM
func (*InstanceHealthChecker) StartMonitoring ¶
func (hc *InstanceHealthChecker) StartMonitoring(_ *types.Instance) error
StartMonitoring is deprecated; use serverless health checking instead. This method is kept for compatibility but does nothing.
func (*InstanceHealthChecker) StopMonitoring ¶
func (hc *InstanceHealthChecker) StopMonitoring(_ string) error
StopMonitoring is deprecated; use serverless health checking instead. This method is kept for compatibility but does nothing.
type InstanceHealthDetail ¶
type InstanceHealthDetail struct {
Domain string `json:"domain"`
LastChecked time.Time `json:"last_checked"`
HealthScore float64 `json:"health_score"`
ResponseTime time.Duration `json:"response_time"`
ErrorRate float64 `json:"error_rate"`
CircuitStatus types.CircuitStatus `json:"circuit_status"`
}
InstanceHealthDetail represents detailed health information for a single instance
type InstanceMetrics ¶
type InstanceMetrics struct {
InstanceID string
Window time.Duration
TotalMessages int64
TotalBytes int64
TotalCost float64
Availability float64
MessageTypes map[fedTypes.MessageType]int64
LastUpdated time.Time
}
InstanceMetrics represents metrics for a specific instance
type InstanceRegistry ¶
type InstanceRegistry struct {
// contains filtered or unexported fields
}
InstanceRegistry manages federated instance data using DynamORM
func NewInstanceRegistry ¶
func NewInstanceRegistry(repo FederationInstanceRepository, logger *zap.Logger) *InstanceRegistry
NewInstanceRegistry creates a new instance registry
func (*InstanceRegistry) BatchCreateInstances ¶
func (ir *InstanceRegistry) BatchCreateInstances(ctx context.Context, instances []*types.Instance) error
BatchCreateInstances creates multiple instances efficiently for federation discovery
func (*InstanceRegistry) BatchGetInstances ¶
func (ir *InstanceRegistry) BatchGetInstances(ctx context.Context, instanceIDs []string) ([]*types.Instance, error)
BatchGetInstances retrieves multiple instances efficiently
func (*InstanceRegistry) BatchUpdateInstancesHealth ¶
func (ir *InstanceRegistry) BatchUpdateInstancesHealth(ctx context.Context, healthUpdates map[string]*types.HealthStatus) error
BatchUpdateInstancesHealth updates health status for multiple instances efficiently
func (*InstanceRegistry) BatchUpdateInstancesUsage ¶
func (ir *InstanceRegistry) BatchUpdateInstancesUsage(ctx context.Context, usageUpdates map[string]int64) error
BatchUpdateInstancesUsage updates usage counters for multiple instances efficiently
func (*InstanceRegistry) ClearCache ¶
func (ir *InstanceRegistry) ClearCache()
ClearCache clears the local cache (useful for testing)
func (*InstanceRegistry) GetCacheStats ¶
func (ir *InstanceRegistry) GetCacheStats() map[string]interface{}
GetCacheStats returns cache statistics for monitoring
func (*InstanceRegistry) GetHealthHistory ¶
func (ir *InstanceRegistry) GetHealthHistory(ctx context.Context, instanceID string, duration time.Duration) ([]*types.HealthStatus, error)
GetHealthHistory retrieves health history for an instance
func (*InstanceRegistry) GetInstance ¶
func (ir *InstanceRegistry) GetInstance(ctx context.Context, instanceID string) (*types.Instance, error)
GetInstance retrieves an instance by ID with caching
func (*InstanceRegistry) GetInstanceByDomain ¶
func (ir *InstanceRegistry) GetInstanceByDomain(ctx context.Context, domain string) (*types.Instance, error)
GetInstanceByDomain retrieves an instance by domain name
func (*InstanceRegistry) GetInstancesByStatus ¶
func (ir *InstanceRegistry) GetInstancesByStatus(ctx context.Context, status types.InstanceStatus, limit int) ([]*types.Instance, error)
GetInstancesByStatus retrieves instances by status
func (*InstanceRegistry) GetInstancesByTier ¶
func (ir *InstanceRegistry) GetInstancesByTier(ctx context.Context, tier types.TierLevel, limit int) ([]*types.Instance, error)
GetInstancesByTier retrieves instances by tier level
func (*InstanceRegistry) ListHealthyInstances ¶
ListHealthyInstances returns all healthy instances
func (*InstanceRegistry) ListInstances ¶
func (ir *InstanceRegistry) ListInstances(ctx context.Context, limit int, startKey map[string]interface{}) ([]*types.Instance, map[string]interface{}, error)
ListInstances returns all instances with optional pagination
func (*InstanceRegistry) RegisterInstance ¶
RegisterInstance registers a new federated instance
func (*InstanceRegistry) SearchInstances ¶
func (ir *InstanceRegistry) SearchInstances(ctx context.Context, domainPattern string, limit int) ([]*types.Instance, error)
SearchInstances searches for instances by domain pattern
func (*InstanceRegistry) UnregisterInstance ¶
func (ir *InstanceRegistry) UnregisterInstance(ctx context.Context, instanceID string) error
UnregisterInstance removes an instance
func (*InstanceRegistry) UpdateInstance ¶
UpdateInstance updates an existing instance
func (*InstanceRegistry) UpdateInstanceHealth ¶
func (ir *InstanceRegistry) UpdateInstanceHealth(ctx context.Context, instanceID string, health *types.HealthStatus) error
UpdateInstanceHealth updates instance health metrics
func (*InstanceRegistry) UpdateInstanceUsage ¶
func (ir *InstanceRegistry) UpdateInstanceUsage(ctx context.Context, instanceID string, bytesUsed int64) error
UpdateInstanceUsage updates usage counters
type LoadBalancer ¶
type LoadBalancer interface {
Balance(routes []*types.Route, load int) map[string]int
UpdateWeights(metrics map[string]*types.RouteMetrics) error
GetCurrentWeights() map[string]float64
}
LoadBalancer distributes load across routes
type LoadBalancingAlgorithm ¶
type LoadBalancingAlgorithm string
LoadBalancingAlgorithm represents the type of load balancing algorithm
const ( AlgorithmRoundRobin LoadBalancingAlgorithm = "round_robin" AlgorithmWeightedRandom LoadBalancingAlgorithm = "weighted_random" AlgorithmLeastConnections LoadBalancingAlgorithm = "least_connections" AlgorithmAdaptive LoadBalancingAlgorithm = "adaptive" )
Load balancing algorithm constants
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager implements the RouteManager interface
func NewManager ¶
func NewManager( instanceRepo FederationInstanceRepository, instanceHealthRepo interface{}, circuitBreakerRepo *repositories.CircuitBreakerRepository, routeOptimRepo *repositories.RouteOptimizerRepository, routingMetricsRepo *repositories.RoutingMetricsRepository, costTrackingRepo *repositories.FederationCostRepository, logger *zap.Logger, config *ManagerConfig, ) *Manager
NewManager creates a new route manager with dependency injection
func (*Manager) CloseCircuit ¶
CloseCircuit closes the circuit for an instance
func (*Manager) DeliverMessage ¶
func (m *Manager) DeliverMessage(ctx context.Context, message *types.FederationMessage, options types.DeliveryOptions) (*types.DeliveryResult, error)
DeliverMessage delivers a federation message using optimal routing
func (*Manager) DetectUnhealthyInstances ¶
DetectUnhealthyInstances identifies instances that should be removed from rotation
func (*Manager) GetCircuitStatus ¶
func (m *Manager) GetCircuitStatus(instanceID string) types.CircuitStatus
GetCircuitStatus returns the circuit status for an instance
func (*Manager) GetHealthSummary ¶
func (m *Manager) GetHealthSummary() (*HealthSummary, error)
GetHealthSummary provides a comprehensive overview of the federation system's health
func (*Manager) GetInstance ¶
GetInstance retrieves instance information
func (*Manager) GetRouteMetrics ¶
func (m *Manager) GetRouteMetrics(destination string) (*types.RouteMetrics, error)
GetRouteMetrics retrieves metrics for a destination
func (*Manager) ListHealthyInstances ¶
ListHealthyInstances lists all healthy instances
func (*Manager) MonitorInstanceHealth ¶
MonitorInstanceHealth performs continuous health monitoring for all instances
func (*Manager) OpenCircuit ¶
OpenCircuit opens the circuit for an instance
func (*Manager) OptimizeRoutes ¶
OptimizeRoutes triggers route optimization
func (*Manager) PerformHealthCheck ¶
func (m *Manager) PerformHealthCheck(instanceID string) (*types.HealthStatus, error)
PerformHealthCheck performs a comprehensive health check on an instance
func (*Manager) RecoverInstances ¶
RecoverInstances attempts to recover instances from unhealthy state
func (*Manager) RegisterInstance ¶
RegisterInstance registers a new federated instance
func (*Manager) SelectRoute ¶
func (m *Manager) SelectRoute(destination string, messageType types.MessageType) (*types.Route, error)
SelectRoute selects the best route for a destination with emergency mode handling
func (*Manager) UpdateInstanceHealth ¶
func (m *Manager) UpdateInstanceHealth(instanceID string, health *types.HealthStatus) error
UpdateInstanceHealth updates instance health metrics
type ManagerConfig ¶
type ManagerConfig struct {
RoutingConfig *types.RoutingConfig
OptimizerConfig *OptimizerConfig
CircuitBreakerConfig *models.CircuitBreakerConfig
CacheTTL time.Duration
FederationStore federation.FederationStorage
}
ManagerConfig holds configuration for the route manager
type MessagePriority ¶
type MessagePriority int
MessagePriority represents priority levels for message prioritization during degraded conditions
const ( // PriorityCritical for direct replies and mentions PriorityCritical MessagePriority = iota // PriorityHigh for follows and likes from verified accounts PriorityHigh // PriorityNormal for regular posts and boosts PriorityNormal // PriorityLow for deletes and updates to old content PriorityLow )
Message priority levels
type MetricEventType ¶
type MetricEventType string
MetricEventType represents the type of metric event
const ( EventRouteSelected MetricEventType = "route_selected" EventDeliveryResult MetricEventType = "delivery_result" EventCircuitChange MetricEventType = "circuit_change" EventHealthCheck MetricEventType = "health_check" )
Metric event types
type OptimizerConfig ¶
type OptimizerConfig struct {
// Weights for scoring
LatencyWeight float64
ReliabilityWeight float64
CostWeight float64
// Thresholds
MaxAcceptableLatency time.Duration
MinAcceptableSuccess float64
MaxCostPerMB float64
// Learning parameters
HistoryWindow time.Duration
MinSamplesRequired int
AdaptationRate float64
}
OptimizerConfig contains configuration for route optimization
type QueryOptimizer ¶
type QueryOptimizer struct {
// contains filtered or unexported fields
}
QueryOptimizer optimizes DynamoDB query patterns for federation routing
func NewQueryOptimizer ¶
func NewQueryOptimizer(cacheRepo *repositories.QueryCacheRepository, logger *zap.Logger) *QueryOptimizer
NewQueryOptimizer creates a new query optimizer
func (*QueryOptimizer) GetBatchCoordinator ¶
func (qo *QueryOptimizer) GetBatchCoordinator() *BatchQueryCoordinator
GetBatchCoordinator returns the batch coordinator (for testing)
func (*QueryOptimizer) InvalidateCache ¶
func (qo *QueryOptimizer) InvalidateCache(pattern string)
InvalidateCache invalidates cache entries matching a pattern
func (*QueryOptimizer) OptimizedBatchGetInstances ¶
func (qo *QueryOptimizer) OptimizedBatchGetInstances(ctx context.Context, instanceIDs []string) ([]*fedTypes.Instance, error)
OptimizedBatchGetInstances retrieves multiple instances efficiently
func (*QueryOptimizer) OptimizedGetInstance ¶
func (qo *QueryOptimizer) OptimizedGetInstance(ctx context.Context, instanceID string) (*fedTypes.Instance, error)
OptimizedGetInstance retrieves an instance with caching and batching
func (*QueryOptimizer) OptimizedQueryByStatus ¶
func (qo *QueryOptimizer) OptimizedQueryByStatus(ctx context.Context, status fedTypes.InstanceStatus) ([]*fedTypes.Instance, error)
OptimizedQueryByStatus queries instances by status with result caching
func (*QueryOptimizer) OptimizedQueryRecentMetrics ¶
func (qo *QueryOptimizer) OptimizedQueryRecentMetrics(ctx context.Context, routeID string, limit int) ([]*fedTypes.DeliveryResult, error)
OptimizedQueryRecentMetrics queries recent metrics with intelligent pagination
func (*QueryOptimizer) PrewarmCache ¶
func (qo *QueryOptimizer) PrewarmCache(ctx context.Context) error
PrewarmCache preloads frequently accessed data
func (*QueryOptimizer) Shutdown ¶
func (qo *QueryOptimizer) Shutdown()
Shutdown gracefully shuts down the query optimizer
type QueryRequest ¶
type QueryRequest struct {
Type string
Key string
Priority int
Deadline time.Time
Context context.Context
Callback chan interface{}
}
QueryRequest represents a query that can be batched
type RecoveryStep ¶
type RecoveryStep struct {
Load float64 // Traffic load percentage (0.0-1.0)
Duration time.Duration // How long to maintain this load
Description string // Human-readable description
}
RecoveryStep represents a step in the gradual recovery process
type RepositoryInterface ¶
type RepositoryInterface interface {
RecordDeliveryResult(ctx context.Context, result *types.DeliveryResult) error
GetRouteMetrics(ctx context.Context, routeID string) (*types.RouteMetrics, error)
GetRoutePerformance(ctx context.Context, routeID string) (interface{}, error)
StoreOptimizationDecision(ctx context.Context, routes []*types.Route, messageSize int64) error
}
RepositoryInterface defines the methods needed from a repository
type RouteHealthAssessment ¶
type RouteHealthAssessment struct {
RouteID string
Status RouteHealthStatus
SuccessRate float64
AvgLatency time.Duration
P95Latency time.Duration
P99Latency time.Duration
SampleCount int
LastUpdated time.Time
CacheTTL time.Duration
RecommendedAction string
DegradationReason string
}
RouteHealthAssessment contains the complete health assessment of a route
type RouteHealthStatus ¶
type RouteHealthStatus int
RouteHealthStatus represents the current health status of a route
const ( // RouteHealthUnknown indicates unknown health status RouteHealthUnknown RouteHealthStatus = iota // RouteHealthPreferred indicates preferred route status RouteHealthPreferred // RouteHealthHealthy indicates healthy route status RouteHealthHealthy // RouteHealthMonitored indicates route is being monitored RouteHealthMonitored // RouteHealthDegraded indicates degraded performance RouteHealthDegraded // RouteHealthCritical indicates critical issues RouteHealthCritical // RouteHealthEmergency indicates emergency status RouteHealthEmergency )
Route health status levels
func (RouteHealthStatus) String ¶
func (rhs RouteHealthStatus) String() string
String returns the string representation of RouteHealthStatus
type RouteManager ¶
type RouteManager interface {
// Route selection
SelectRoute(destination string, messageType types.MessageType) (*types.Route, error)
GetRoutes(destination string) ([]*types.Route, error)
// Instance management
RegisterInstance(instance *types.Instance) error
UpdateInstanceHealth(instanceID string, health *types.HealthStatus) error
GetInstance(instanceID string) (*types.Instance, error)
ListHealthyInstances() ([]*types.Instance, error)
// Route optimization
OptimizeRoutes() error
GetRouteMetrics(destination string) (*types.RouteMetrics, error)
// Circuit breaker
OpenCircuit(instanceID string, reason string) error
CloseCircuit(instanceID string) error
GetCircuitStatus(instanceID string) types.CircuitStatus
}
RouteManager manages federation message routing
type RouteOptimizer ¶
type RouteOptimizer interface {
Optimize(routes []*types.Route, history []*types.DeliveryResult) ([]*types.Route, error)
PredictLatency(route *types.Route, messageSize int64) time.Duration
EstimateCost(route *types.Route, messageSize int64) float64
RecommendBatchSize(route *types.Route) int
}
RouteOptimizer optimizes routing decisions
type RouteRanking ¶
type RouteRanking struct {
RouteID string
SuccessRate float64
MessageCount int64
AvgLatency time.Duration
}
RouteRanking represents a route's performance ranking
type RouteSelector ¶
type RouteSelector interface {
SelectBestRoute(routes []*types.Route, options types.SelectionOptions) (*types.Route, error)
RankRoutes(routes []*types.Route) []*types.Route
}
RouteSelector implements routing algorithms
type RouteThresholdManager ¶
type RouteThresholdManager struct {
// contains filtered or unexported fields
}
RouteThresholdManager manages route health thresholds and decisions based on the route optimization guidance document
func NewRouteThresholdManager ¶
func NewRouteThresholdManager(logger *zap.Logger, config *ThresholdConfig) *RouteThresholdManager
NewRouteThresholdManager creates a new threshold manager with the guidance document's defaults
func (*RouteThresholdManager) AssessRouteHealth ¶
func (rtm *RouteThresholdManager) AssessRouteHealth(_ context.Context, routeID string, metrics *types.RouteMetrics) *RouteHealthAssessment
AssessRouteHealth assesses the health of a route based on metrics
func (*RouteThresholdManager) CalculateRouteCacheKey ¶
func (rtm *RouteThresholdManager) CalculateRouteCacheKey(sourceInstance, targetInstance string, activityType types.MessageType, messageSizeClass int) string
CalculateRouteCacheKey generates a cache key that accounts for all factors from the guidance document
func (*RouteThresholdManager) GetCacheTTLForMessageType ¶
func (rtm *RouteThresholdManager) GetCacheTTLForMessageType(messageType types.MessageType, routeHealth RouteHealthStatus) time.Duration
GetCacheTTLForMessageType returns the appropriate cache TTL based on the message type's priority and the route's health status
func (*RouteThresholdManager) GetEmergencyBackpressureRules ¶
func (rtm *RouteThresholdManager) GetEmergencyBackpressureRules() map[MessagePriority]BackpressureRule
GetEmergencyBackpressureRules returns backpressure rules for emergency mode
func (*RouteThresholdManager) GetMessageSizeClass ¶
func (rtm *RouteThresholdManager) GetMessageSizeClass(messageSize int64) int
GetMessageSizeClass classifies message size for cache key generation
func (*RouteThresholdManager) GetRecoverySteps ¶
func (rtm *RouteThresholdManager) GetRecoverySteps() []RecoveryStep
GetRecoverySteps returns the gradual recovery steps from the guidance document
func (*RouteThresholdManager) ShouldEnterEmergencyMode ¶
func (rtm *RouteThresholdManager) ShouldEnterEmergencyMode(healthyRoutes int, totalRoutes int) bool
ShouldEnterEmergencyMode determines if the system should enter emergency mode
type RoutingMetrics ¶
type RoutingMetrics struct {
// contains filtered or unexported fields
}
RoutingMetrics tracks and aggregates routing performance metrics
func NewRoutingMetrics ¶
func NewRoutingMetrics(db core.DB, logger *zap.Logger) *RoutingMetrics
NewRoutingMetrics creates a new metrics tracker
func (*RoutingMetrics) Flush ¶
func (rm *RoutingMetrics) Flush(ctx context.Context) error
Flush manually flushes accumulated metrics to DynamoDB (via TableTheory). This should be called at the end of Lambda invocations.
func (*RoutingMetrics) GetGlobalMetrics ¶
func (rm *RoutingMetrics) GetGlobalMetrics(ctx context.Context, window time.Duration) (*GlobalMetrics, error)
GetGlobalMetrics retrieves system-wide metrics
func (*RoutingMetrics) GetInstanceMetrics ¶
func (rm *RoutingMetrics) GetInstanceMetrics(ctx context.Context, instanceID string, window time.Duration) (*InstanceMetrics, error)
GetInstanceMetrics retrieves metrics for a specific instance
func (*RoutingMetrics) GetRouteMetrics ¶
func (rm *RoutingMetrics) GetRouteMetrics(ctx context.Context, routeID string, window time.Duration) (*fedTypes.RouteMetrics, error)
GetRouteMetrics retrieves metrics for a specific route
func (*RoutingMetrics) RecordCircuitChange ¶
func (rm *RoutingMetrics) RecordCircuitChange(instanceID string, oldState, newState fedTypes.CircuitStatus)
RecordCircuitChange records a circuit breaker state change
func (*RoutingMetrics) RecordDelivery ¶
func (rm *RoutingMetrics) RecordDelivery(result *fedTypes.DeliveryResult)
RecordDelivery records a delivery result
func (*RoutingMetrics) RecordHealthCheck ¶
func (rm *RoutingMetrics) RecordHealthCheck(instanceID string, health *fedTypes.HealthStatus)
RecordHealthCheck records a health check result
func (*RoutingMetrics) RecordRouteSelection ¶
func (rm *RoutingMetrics) RecordRouteSelection(routeID, destination string, messageType fedTypes.MessageType)
RecordRouteSelection records a route selection event
type SmartRouteOptimizer ¶
type SmartRouteOptimizer struct {
// contains filtered or unexported fields
}
SmartRouteOptimizer implements intelligent route optimization
func NewSmartRouteOptimizer ¶
func NewSmartRouteOptimizer(repo *repositories.RouteOptimizerRepository, logger *zap.Logger, config *OptimizerConfig) *SmartRouteOptimizer
NewSmartRouteOptimizer creates a new route optimizer backed by a concrete repository
func NewSmartRouteOptimizerFromInterface ¶
func NewSmartRouteOptimizerFromInterface(repo RepositoryInterface, logger *zap.Logger, config *OptimizerConfig) *SmartRouteOptimizer
NewSmartRouteOptimizerFromInterface creates a new route optimizer from any RepositoryInterface implementation
func (*SmartRouteOptimizer) EstimateCost ¶
func (sro *SmartRouteOptimizer) EstimateCost(route *types.Route, messageSize int64) float64
EstimateCost estimates the cost of sending a message through a route
func (*SmartRouteOptimizer) GetRouteMetrics ¶
func (sro *SmartRouteOptimizer) GetRouteMetrics(ctx context.Context, routeID string) (*types.RouteMetrics, error)
GetRouteMetrics retrieves detailed metrics for a route
func (*SmartRouteOptimizer) OptimizeRoutes ¶
func (sro *SmartRouteOptimizer) OptimizeRoutes(ctx context.Context, routes []*types.Route, messageSize int64) ([]*types.Route, error)
OptimizeRoutes optimizes route selection based on historical performance
func (*SmartRouteOptimizer) PredictLatency ¶
PredictLatency predicts latency for a route based on historical data
func (*SmartRouteOptimizer) RecordDeliveryResult ¶
func (sro *SmartRouteOptimizer) RecordDeliveryResult(ctx context.Context, result *types.DeliveryResult) error
RecordDeliveryResult records the result of a delivery for learning
type ThresholdConfig ¶
type ThresholdConfig struct {
// Latency Thresholds (from guidance document)
P95LatencyThreshold time.Duration // 5s - Trigger route change
P99LatencyThreshold time.Duration // 10s - Hard limit before timeout
AvgLatencyThreshold time.Duration // 2s - Sustained poor performance
DegradationWindow time.Duration // 5min - Window for degradation detection
DegradationIncrease float64 // 0.5 - 50% increase triggers degradation
// Success Rate Thresholds
CriticalSuccessRate float64 // 0.5 - < 50% = Open circuit immediately
DegradedSuccessRate float64 // 0.7 - < 70% = Mark as degraded, reduce traffic
MonitorSuccessRate float64 // 0.9 - < 90% = Monitor closely
PreferredSuccessRate float64 // 0.95 - > 95% = Preferred route
// Decision Windows
SampleWindow time.Duration // 5min or last 100 requests
MinSamplesRequired int // 10 - Minimum samples before decisions
// Cache TTL Strategy
HealthyRouteTTL time.Duration // 5min - Stable routes
DegradedRouteTTL time.Duration // 30s - Re-evaluate frequently
UnknownRouteTTL time.Duration // 1min - New routes
HighPriorityTTL time.Duration // 10s - Direct messages, mentions
NormalPriorityTTL time.Duration // 2min - Regular posts
LowPriorityTTL time.Duration // 10min - Bulk updates, deletes
// Emergency Mode Configuration
EmergencyThreshold float64 // 0.3 - When to enter emergency mode
RecoveryProbeInterval time.Duration // 30s - Probe interval during recovery
RecoverySuccessThreshold int // 3 - Consecutive successes to mark healthy
}
ThresholdConfig defines the thresholds from the route optimization guidance
func DefaultThresholdConfig ¶
func DefaultThresholdConfig() *ThresholdConfig
DefaultThresholdConfig returns the default configuration from the guidance document