dispatch

package
v0.1.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 19, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentHealthState added in v0.1.9

type AgentHealthState struct {
	// Immutable fields
	AgentID string
	Addr    string
	// contains filtered or unexported fields
}

AgentHealthState tracks the health state of a single agent

func NewAgentHealthState added in v0.1.9

func NewAgentHealthState(agentID, addr string, config *HealthCheckConfig) *AgentHealthState

NewAgentHealthState creates a new agent health state

func (*AgentHealthState) ActiveConnections added in v0.1.9

func (s *AgentHealthState) ActiveConnections() int32

ActiveConnections returns the current number of active connections

func (*AgentHealthState) CircuitState added in v0.1.9

func (s *AgentHealthState) CircuitState() CircuitBreakerState

CircuitState returns the current circuit breaker state

func (*AgentHealthState) ConsecutiveFailures added in v0.1.9

func (s *AgentHealthState) ConsecutiveFailures() int32

ConsecutiveFailures returns the current consecutive failure count

func (*AgentHealthState) DecrementConnections added in v0.1.9

func (s *AgentHealthState) DecrementConnections()

DecrementConnections decrements the active connection count

func (*AgentHealthState) GetStatistics added in v0.1.9

func (s *AgentHealthState) GetStatistics() HealthStatistics

GetStatistics returns the current statistics

func (*AgentHealthState) HealthScore added in v0.1.9

func (s *AgentHealthState) HealthScore() float64

HealthScore returns the current health score (0-100)

func (*AgentHealthState) IncrementConnections added in v0.1.9

func (s *AgentHealthState) IncrementConnections()

IncrementConnections increments the active connection count

func (*AgentHealthState) IsAvailable added in v0.1.9

func (s *AgentHealthState) IsAvailable() bool

IsAvailable returns true if the agent is available for routing

func (*AgentHealthState) IsCircuitOpen added in v0.1.9

func (s *AgentHealthState) IsCircuitOpen() bool

IsCircuitOpen returns true if the circuit is open

func (*AgentHealthState) RecordFailure added in v0.1.9

func (s *AgentHealthState) RecordFailure()

RecordFailure records a failed request

func (*AgentHealthState) RecordSuccess added in v0.1.9

func (s *AgentHealthState) RecordSuccess()

RecordSuccess records a successful request

type Candidate added in v0.1.9

type Candidate struct {
	AgentID   string
	Session   *reg.AgentSession
	Health    *AgentHealthState
	Available bool
}

Candidate represents an agent that can be selected for routing

type CandidateStats added in v0.1.9

type CandidateStats struct {
	AgentID             string  `json:"agentId"`
	ActiveConnections   int32   `json:"activeConnections"`
	ConsecutiveFailures int32   `json:"consecutiveFailures"`
	HealthScore         float64 `json:"healthScore"`
	CircuitState        string  `json:"circuitState"`
	Available           bool    `json:"available"`
}

CandidateStats contains statistics for a single candidate

type CircuitBreakerState added in v0.1.9

type CircuitBreakerState int32

CircuitBreakerState represents the state of the circuit breaker

const (
	// CircuitClosed means the circuit is closed and requests are allowed
	CircuitClosed CircuitBreakerState = iota
	// CircuitOpen means the circuit is open and requests are blocked
	CircuitOpen
	// CircuitHalfOpen means the circuit is half-open and testing if the agent has recovered
	CircuitHalfOpen
)

func (CircuitBreakerState) String added in v0.1.9

func (s CircuitBreakerState) String() string

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher routes function invocations to live agents discovered via registry store. Now uses NNG instead of gRPC for Agent communication. Supports HA features: health tracking, circuit breaker, load balancing.

func NewDispatcher

func NewDispatcher(store *reg.Store) *Dispatcher

func NewDispatcherWithHA added in v0.1.9

func NewDispatcherWithHA(store *reg.Store, jobStore JobRoutingStore, haEnabled bool, strategy LoadBalanceStrategy, healthConfig *HealthCheckConfig) *Dispatcher

NewDispatcherWithHA creates a new Dispatcher with HA features enabled

func NewDispatcherWithJobStore

func NewDispatcherWithJobStore(store *reg.Store, jobStore JobRoutingStore) *Dispatcher

NewDispatcherWithJobStore creates a new Dispatcher with optional job routing store

func (*Dispatcher) CancelJob

func (d *Dispatcher) CancelJob(ctx context.Context, jobID string) error

func (*Dispatcher) CleanupOldJobs

func (d *Dispatcher) CleanupOldJobs(ttl time.Duration) error

CleanupOldJobs removes old job routing entries

func (*Dispatcher) Close

func (d *Dispatcher) Close() error

Close closes the dispatcher and its resources

func (*Dispatcher) GetHealthTracker added in v0.1.9

func (d *Dispatcher) GetHealthTracker() *HealthTracker

GetHealthTracker returns the health tracker (for monitoring/inspection)

func (*Dispatcher) GetLoadBalanceStrategy added in v0.1.9

func (d *Dispatcher) GetLoadBalanceStrategy() LoadBalanceStrategy

GetLoadBalanceStrategy returns the current load balancing strategy

func (*Dispatcher) GetLoadBalancer added in v0.1.9

func (d *Dispatcher) GetLoadBalancer() *LoadBalancer

GetLoadBalancer returns the load balancer (for monitoring/inspection)

func (*Dispatcher) Invoke

func (d *Dispatcher) Invoke(ctx context.Context, functionID string, payload []byte) ([]byte, error)

func (*Dispatcher) InvokeRequest

func (d *Dispatcher) InvokeRequest(ctx context.Context, req *sdkv1.InvokeRequest) (*sdkv1.InvokeResponse, error)

InvokeRequest forwards a fully populated InvokeRequest to a live agent.

func (*Dispatcher) JobAddr

func (d *Dispatcher) JobAddr(jobID string) (string, bool)

JobAddr exposes tracked job routing addresses (primarily for diagnostics).

func (*Dispatcher) ListFunctionAgents

func (d *Dispatcher) ListFunctionAgents(functionID string) []string

ListFunctionAgents returns agent IDs that currently expose the function.

func (*Dispatcher) RegisterJob

func (d *Dispatcher) RegisterJob(jobID, addr string)

RegisterJob registers a job routing (exported method)

func (*Dispatcher) SetHAEnabled added in v0.1.9

func (d *Dispatcher) SetHAEnabled(enabled bool)

SetHAEnabled enables or disables HA features at runtime

func (*Dispatcher) SetLoadBalanceStrategy added in v0.1.9

func (d *Dispatcher) SetLoadBalanceStrategy(strategy LoadBalanceStrategy)

SetLoadBalanceStrategy changes the load balancing strategy

func (*Dispatcher) SetTLSConfig

func (d *Dispatcher) SetTLSConfig(cfg *tlsutil.ClientTLSConfig)

func (*Dispatcher) StartJob

func (d *Dispatcher) StartJob(ctx context.Context, functionID string, payload []byte) (string, error)

func (*Dispatcher) StartJobRequest

func (d *Dispatcher) StartJobRequest(ctx context.Context, req *sdkv1.InvokeRequest) (*sdkv1.StartJobResponse, error)

StartJobRequest forwards a structured InvokeRequest to the agent StartJob RPC.

func (*Dispatcher) Store

func (d *Dispatcher) Store() *reg.Store

func (*Dispatcher) StreamJob

func (d *Dispatcher) StreamJob(ctx context.Context, jobID string) ([]*sdkv1.JobEvent, bool, error)

func (*Dispatcher) StreamJobRealtime

func (d *Dispatcher) StreamJobRealtime(ctx context.Context, jobID string, fn func(*sdkv1.JobEvent) bool) (bool, error)

StreamJobRealtime forwards job events to the provided callback.

func (*Dispatcher) UnregisterJob

func (d *Dispatcher) UnregisterJob(jobID string)

UnregisterJob unregisters a job routing (exported method)

type FileJobRoutingStore

type FileJobRoutingStore struct {
	// contains filtered or unexported fields
}

FileJobRoutingStore implements JobRoutingStore using file-based persistence

func NewFileJobRoutingStore

func NewFileJobRoutingStore(dataDir string) (*FileJobRoutingStore, error)

NewFileJobRoutingStore creates a new file-based job routing store

func (*FileJobRoutingStore) Cleanup

func (s *FileJobRoutingStore) Cleanup(ttl time.Duration) error

func (*FileJobRoutingStore) Close

func (s *FileJobRoutingStore) Close() error

func (*FileJobRoutingStore) Delete

func (s *FileJobRoutingStore) Delete(jobID string) error

func (*FileJobRoutingStore) Get

func (s *FileJobRoutingStore) Get(jobID string) (*JobRouting, error)

func (*FileJobRoutingStore) List

func (s *FileJobRoutingStore) List() ([]*JobRouting, error)

func (*FileJobRoutingStore) Set

func (s *FileJobRoutingStore) Set(jobID, agentAddr string) error

type HealthCheckConfig added in v0.1.9

type HealthCheckConfig struct {
	// Score decay rate per tick (0-1)
	ScoreDecayRate float64
	// Score bonus on successful request
	ScoreSuccessBonus float64
	// Score penalty on failed request
	ScoreFailurePenalty float64
	// Minimum health score (0-100)
	MinScore float64
	// Maximum health score (0-100)
	MaxScore float64
	// Score decay interval
	DecayInterval time.Duration
	// Circuit breaker failure threshold
	FailureThreshold int32
	// Circuit breaker open timeout
	CircuitOpenTimeout time.Duration
	// Half-open max requests before transitioning
	HalfOpenMaxRequests int32
}

HealthCheckConfig configures the health check behavior

func DefaultHealthCheckConfig added in v0.1.9

func DefaultHealthCheckConfig() *HealthCheckConfig

DefaultHealthCheckConfig returns the default health check configuration

type HealthStatistics added in v0.1.9

type HealthStatistics struct {
	TotalRequests       int64
	SuccessfulRequests  int64
	FailedRequests      int64
	ActiveConnections   int32
	ConsecutiveFailures int32
	HealthScore         float64
	CircuitState        CircuitBreakerState
	LastSuccessTime     time.Time
	LastFailureTime     time.Time
	CircuitOpenedAt     time.Time
	LastStateChange     time.Time
}

HealthStatistics contains the current health statistics

type HealthTracker added in v0.1.9

type HealthTracker struct {
	// contains filtered or unexported fields
}

HealthTracker manages health states for all agents

func NewHealthTracker added in v0.1.9

func NewHealthTracker(config *HealthCheckConfig) *HealthTracker

NewHealthTracker creates a new health tracker

func (*HealthTracker) DecrementConnections added in v0.1.9

func (t *HealthTracker) DecrementConnections(agentID string)

DecrementConnections decrements the connection count for an agent

func (*HealthTracker) GetAllStates added in v0.1.9

func (t *HealthTracker) GetAllStates() map[string]*AgentHealthState

GetAllStates returns a snapshot of all agent states

func (*HealthTracker) GetAllStatistics added in v0.1.9

func (t *HealthTracker) GetAllStatistics() map[string]HealthStatistics

GetAllStatistics returns statistics for all agents

func (*HealthTracker) GetAvailableAgents added in v0.1.9

func (t *HealthTracker) GetAvailableAgents() []*AgentHealthState

GetAvailableAgents returns all agents that are available for routing

func (*HealthTracker) GetNextRoundRobinIndex added in v0.1.9

func (t *HealthTracker) GetNextRoundRobinIndex(count int) int32

GetNextRoundRobinIndex returns the next index for round-robin selection

func (*HealthTracker) GetOrCreateState added in v0.1.9

func (t *HealthTracker) GetOrCreateState(agentID, addr string) *AgentHealthState

GetOrCreateState returns the health state for an agent, creating it if necessary

func (*HealthTracker) GetState added in v0.1.9

func (t *HealthTracker) GetState(agentID string) (*AgentHealthState, bool)

GetState returns the health state for an agent

func (*HealthTracker) GetStatistics added in v0.1.9

func (t *HealthTracker) GetStatistics(agentID string) (HealthStatistics, error)

GetStatistics returns statistics for a specific agent

func (*HealthTracker) IncrementConnections added in v0.1.9

func (t *HealthTracker) IncrementConnections(agentID string)

IncrementConnections increments the connection count for an agent

func (*HealthTracker) RecordFailure added in v0.1.9

func (t *HealthTracker) RecordFailure(agentID string)

RecordFailure records a failed request for an agent

func (*HealthTracker) RecordSuccess added in v0.1.9

func (t *HealthTracker) RecordSuccess(agentID string)

RecordSuccess records a successful request for an agent

func (*HealthTracker) RegisterAgent added in v0.1.9

func (t *HealthTracker) RegisterAgent(agentID, addr string) *AgentHealthState

RegisterAgent registers a new agent or updates an existing one

func (*HealthTracker) Reset added in v0.1.9

func (t *HealthTracker) Reset(agentID string)

Reset resets the health state for an agent (e.g., after agent re-registration)

func (*HealthTracker) Start added in v0.1.9

func (t *HealthTracker) Start()

Start starts the health tracker background routines

func (*HealthTracker) Stop added in v0.1.9

func (t *HealthTracker) Stop()

Stop stops the health tracker

func (*HealthTracker) UnregisterAgent added in v0.1.9

func (t *HealthTracker) UnregisterAgent(agentID string)

UnregisterAgent removes an agent from tracking

type JobRouting

type JobRouting struct {
	JobID     string    `json:"job_id"`
	AgentAddr string    `json:"agent_addr"`
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

JobRouting represents a job routing entry

type JobRoutingStore

type JobRoutingStore interface {
	// Get retrieves job routing by job ID
	Get(jobID string) (*JobRouting, error)

	// Set stores or updates job routing
	Set(jobID, agentAddr string) error

	// Delete removes job routing
	Delete(jobID string) error

	// List returns all job routings
	List() ([]*JobRouting, error)

	// Cleanup removes old entries (older than ttl)
	Cleanup(ttl time.Duration) error

	// Close closes the store
	Close() error
}

JobRoutingStore defines the interface for job routing persistence

type LoadBalanceStrategy added in v0.1.9

type LoadBalanceStrategy string

LoadBalanceStrategy defines the load balancing strategy

const (
	// StrategyMinID selects agent with minimum ID (original behavior)
	StrategyMinID LoadBalanceStrategy = "min_id"
	// StrategyRoundRobin selects agents in round-robin fashion
	StrategyRoundRobin LoadBalanceStrategy = "round_robin"
	// StrategyLeastConn selects agent with least active connections
	StrategyLeastConn LoadBalanceStrategy = "least_conn"
	// StrategyWeighted selects agent based on health score weights
	StrategyWeighted LoadBalanceStrategy = "weighted"
)

type LoadBalancer added in v0.1.9

type LoadBalancer struct {
	// contains filtered or unexported fields
}

LoadBalancer selects the best agent for routing based on strategy

func NewLoadBalancer added in v0.1.9

func NewLoadBalancer(strategy LoadBalanceStrategy, tracker *HealthTracker) *LoadBalancer

NewLoadBalancer creates a new load balancer

func (*LoadBalancer) BuildCandidates added in v0.1.9

func (lb *LoadBalancer) BuildCandidates(sessions []*reg.AgentSession, functionID string) []*Candidate

BuildCandidates builds candidate list from agent sessions

func (*LoadBalancer) GetStatistics added in v0.1.9

func (lb *LoadBalancer) GetStatistics() LoadBalancerStats

GetStatistics returns load balancer statistics

func (*LoadBalancer) GetStrategy added in v0.1.9

func (lb *LoadBalancer) GetStrategy() LoadBalanceStrategy

GetStrategy returns the current load balancing strategy

func (*LoadBalancer) Select added in v0.1.9

func (lb *LoadBalancer) Select(functionID string, candidates []*Candidate) (*Candidate, error)

Select selects the best agent for the given function

func (*LoadBalancer) SetStrategy added in v0.1.9

func (lb *LoadBalancer) SetStrategy(strategy LoadBalanceStrategy)

SetStrategy updates the load balancing strategy

type LoadBalancerStats added in v0.1.9

type LoadBalancerStats struct {
	Strategy        LoadBalanceStrategy       `json:"strategy"`
	RoundRobinState map[string]int32          `json:"roundRobinState,omitempty"`
	AgentStats      map[string]CandidateStats `json:"agentStats,omitempty"`
}

LoadBalancerStats contains load balancer statistics

type MemoryJobRoutingStore

type MemoryJobRoutingStore struct {
	// contains filtered or unexported fields
}

MemoryJobRoutingStore implements JobRoutingStore using in-memory storage

func NewMemoryJobRoutingStore

func NewMemoryJobRoutingStore() *MemoryJobRoutingStore

NewMemoryJobRoutingStore creates a new in-memory job routing store

func (*MemoryJobRoutingStore) Cleanup

func (s *MemoryJobRoutingStore) Cleanup(ttl time.Duration) error

func (*MemoryJobRoutingStore) Close

func (s *MemoryJobRoutingStore) Close() error

func (*MemoryJobRoutingStore) Delete

func (s *MemoryJobRoutingStore) Delete(jobID string) error

func (*MemoryJobRoutingStore) Get

func (s *MemoryJobRoutingStore) Get(jobID string) (*JobRouting, error)

func (*MemoryJobRoutingStore) List

func (s *MemoryJobRoutingStore) List() ([]*JobRouting, error)

func (*MemoryJobRoutingStore) Set

func (s *MemoryJobRoutingStore) Set(jobID, agentAddr string) error

type ReconnectionManager added in v0.1.9

type ReconnectionManager struct {
	// contains filtered or unexported fields
}

ReconnectionManager manages reconnection states for multiple connections

func NewReconnectionManager added in v0.1.9

func NewReconnectionManager(policy *ReconnectionPolicy) *ReconnectionManager

NewReconnectionManager creates a new reconnection manager

func (*ReconnectionManager) GetOrCreateState added in v0.1.9

func (m *ReconnectionManager) GetOrCreateState(key string) *ReconnectionState

GetOrCreateState gets or creates a reconnection state for the given key

func (*ReconnectionManager) GetPolicy added in v0.1.9

func (m *ReconnectionManager) GetPolicy() *ReconnectionPolicy

GetPolicy returns the reconnection policy

func (*ReconnectionManager) RemoveState added in v0.1.9

func (m *ReconnectionManager) RemoveState(key string)

RemoveState removes the reconnection state for the given key

func (*ReconnectionManager) ResetAll added in v0.1.9

func (m *ReconnectionManager) ResetAll()

ResetAll resets all reconnection states

func (*ReconnectionManager) SetPolicy added in v0.1.9

func (m *ReconnectionManager) SetPolicy(policy *ReconnectionPolicy)

SetPolicy updates the reconnection policy

func (*ReconnectionManager) Stats added in v0.1.9

Stats returns reconnection statistics

type ReconnectionPolicy added in v0.1.9

type ReconnectionPolicy struct {
	// MaxRetries is the maximum number of reconnection attempts (-1 for infinite)
	MaxRetries int
	// InitialDelay is the initial delay before the first retry
	InitialDelay time.Duration
	// MaxDelay is the maximum delay between retries
	MaxDelay time.Duration
	// Multiplier is the exponential backoff multiplier (e.g., 2.0 for doubling)
	Multiplier float64
	// Jitter is the random jitter factor (0-1) to add to delays
	Jitter float64
	// EnableAutoReconnect enables automatic reconnection
	EnableAutoReconnect bool
	// contains filtered or unexported fields
}

ReconnectionPolicy defines how reconnection attempts are made

func DefaultReconnectionPolicy added in v0.1.9

func DefaultReconnectionPolicy() *ReconnectionPolicy

DefaultReconnectionPolicy returns the default reconnection policy

func NewReconnectionPolicy added in v0.1.9

func NewReconnectionPolicy(maxRetries int, initialDelay, maxDelay time.Duration, multiplier, jitter float64, enableAuto bool) *ReconnectionPolicy

NewReconnectionPolicy creates a new reconnection policy with custom settings

func (*ReconnectionPolicy) NextDelay added in v0.1.9

func (p *ReconnectionPolicy) NextDelay(attempt int) (time.Duration, error)

NextDelay returns the delay before the next reconnection attempt

func (*ReconnectionPolicy) ShouldRetry added in v0.1.9

func (p *ReconnectionPolicy) ShouldRetry(attempt int) bool

ShouldRetry returns whether another reconnection attempt should be made

type ReconnectionState added in v0.1.9

type ReconnectionState struct {
	// contains filtered or unexported fields
}

ReconnectionState tracks the reconnection state for a connection

func NewReconnectionState added in v0.1.9

func NewReconnectionState(policy *ReconnectionPolicy) *ReconnectionState

NewReconnectionState creates a new reconnection state

func (*ReconnectionState) GetAttempt added in v0.1.9

func (s *ReconnectionState) GetAttempt() int

GetAttempt returns the current attempt count

func (*ReconnectionState) GetLastError added in v0.1.9

func (s *ReconnectionState) GetLastError() error

GetLastError returns the last error encountered

func (*ReconnectionState) IsEnabled added in v0.1.9

func (s *ReconnectionState) IsEnabled() bool

IsEnabled returns whether automatic reconnection is enabled

func (*ReconnectionState) NextDelay added in v0.1.9

func (s *ReconnectionState) NextDelay() (time.Duration, error)

NextDelay returns the delay before the next reconnection attempt

func (*ReconnectionState) RecordAttempt added in v0.1.9

func (s *ReconnectionState) RecordAttempt(err error)

RecordAttempt records a reconnection attempt

func (*ReconnectionState) Reset added in v0.1.9

func (s *ReconnectionState) Reset()

Reset resets the reconnection state (e.g., after successful connection)

func (*ReconnectionState) SetEnabled added in v0.1.9

func (s *ReconnectionState) SetEnabled(enabled bool)

SetEnabled enables or disables automatic reconnection

func (*ReconnectionState) ShouldRetry added in v0.1.9

func (s *ReconnectionState) ShouldRetry() bool

ShouldRetry returns whether another reconnection attempt should be made

type ReconnectionStats added in v0.1.9

type ReconnectionStats struct {
	Attempt     int       `json:"attempt"`
	LastAttempt time.Time `json:"lastAttempt"`
	LastError   error     `json:"lastError,omitempty"`
	Enabled     bool      `json:"enabled"`
}

ReconnectionStats contains reconnection statistics for a single connection

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL