Documentation
¶
Index ¶
- Constants
- Variables
- type ClusterError
- type ClusterEvent
- type ClusterLocalState
- type ClusterLocalStateNode
- type ClusterLocalStatePorts
- type ClusterManager
- func (cm *ClusterManager) CheckNamespaceCluster(ctx context.Context, namespaceName string) (string, string, bool, error)
- func (cm *ClusterManager) DeprovisionCluster(ctx context.Context, namespaceID int64) error
- func (cm *ClusterManager) DisableWebRTC(ctx context.Context, namespaceName string) error
- func (cm *ClusterManager) EnableWebRTC(ctx context.Context, namespaceName, enabledBy string) error
- func (cm *ClusterManager) GetCluster(ctx context.Context, clusterID string) (*NamespaceCluster, error)
- func (cm *ClusterManager) GetClusterByNamespace(ctx context.Context, namespaceName string) (*NamespaceCluster, error)
- func (cm *ClusterManager) GetClusterByNamespaceID(ctx context.Context, namespaceID int64) (*NamespaceCluster, error)
- func (cm *ClusterManager) GetClusterStatus(ctx context.Context, clusterID string) (*ClusterProvisioningStatus, error)
- func (cm *ClusterManager) GetClusterStatusByID(ctx context.Context, clusterID string) (interface{}, error)
- func (cm *ClusterManager) GetWebRTCConfig(ctx context.Context, namespaceName string) (*WebRTCConfig, error)
- func (cm *ClusterManager) GetWebRTCStatus(ctx context.Context, namespaceName string) (interface{}, error)
- func (cm *ClusterManager) HandleDeadNode(ctx context.Context, deadNodeID string)
- func (cm *ClusterManager) HandleRecoveredNode(ctx context.Context, nodeID string)
- func (cm *ClusterManager) HandleSuspectNode(ctx context.Context, suspectNodeID string)
- func (cm *ClusterManager) HandleSuspectRecovery(ctx context.Context, nodeID string)
- func (cm *ClusterManager) ProvisionCluster(ctx context.Context, namespaceID int, namespaceName, provisionedBy string) (*NamespaceCluster, error)
- func (cm *ClusterManager) ProvisionNamespaceCluster(ctx context.Context, namespaceID int, namespaceName, wallet string) (string, string, error)
- func (cm *ClusterManager) RepairCluster(ctx context.Context, namespaceName string) error
- func (cm *ClusterManager) ReplaceClusterNode(ctx context.Context, cluster *NamespaceCluster, deadNodeID string) error
- func (cm *ClusterManager) RestoreLocalClusters(ctx context.Context) error
- func (cm *ClusterManager) RestoreLocalClustersFromDisk(ctx context.Context) (int, error)
- func (cm *ClusterManager) SetLocalNodeID(id string)
- type ClusterManagerConfig
- type ClusterNode
- type ClusterNodeSelector
- type ClusterProvisioningStatus
- type ClusterStatus
- type DNSRecordManager
- func (drm *DNSRecordManager) AddNamespaceRecord(ctx context.Context, namespaceName, ip string) error
- func (drm *DNSRecordManager) CountActiveNamespaceRecords(ctx context.Context, namespaceName string) (int, error)
- func (drm *DNSRecordManager) CreateNamespaceRecords(ctx context.Context, namespaceName string, nodeIPs []string) error
- func (drm *DNSRecordManager) CreateTURNRecords(ctx context.Context, namespaceName string, turnIPs []string) error
- func (drm *DNSRecordManager) DeleteNamespaceRecords(ctx context.Context, namespaceName string) error
- func (drm *DNSRecordManager) DeleteTURNRecords(ctx context.Context, namespaceName string) error
- func (drm *DNSRecordManager) DisableNamespaceRecord(ctx context.Context, namespaceName, ip string) error
- func (drm *DNSRecordManager) EnableNamespaceRecord(ctx context.Context, namespaceName, ip string) error
- func (drm *DNSRecordManager) GetNamespaceGatewayIPs(ctx context.Context, namespaceName string) ([]string, error)
- func (drm *DNSRecordManager) UpdateNamespaceRecord(ctx context.Context, namespaceName, oldIP, newIP string) error
- type EventType
- type NamespaceCluster
- type NamespacePortAllocator
- func (npa *NamespacePortAllocator) AllocatePortBlock(ctx context.Context, nodeID, namespaceClusterID string) (*PortBlock, error)
- func (npa *NamespacePortAllocator) DeallocateAllPortBlocks(ctx context.Context, namespaceClusterID string) error
- func (npa *NamespacePortAllocator) DeallocatePortBlock(ctx context.Context, namespaceClusterID, nodeID string) error
- func (npa *NamespacePortAllocator) GetAllPortBlocks(ctx context.Context, namespaceClusterID string) ([]PortBlock, error)
- func (npa *NamespacePortAllocator) GetNodeAllocationCount(ctx context.Context, nodeID string) (int, error)
- func (npa *NamespacePortAllocator) GetNodeCapacity(ctx context.Context, nodeID string) (int, error)
- func (npa *NamespacePortAllocator) GetPortBlock(ctx context.Context, namespaceClusterID, nodeID string) (*PortBlock, error)
- type NodeCapacity
- type NodeRole
- type NodeStatus
- type PortBlock
- type ProvisioningResponse
- type SFUInstanceConfig
- type SystemdSpawner
- func (s *SystemdSpawner) DeleteClusterState(namespace string) error
- func (s *SystemdSpawner) RestartGateway(ctx context.Context, namespace, nodeID string, cfg gateway.InstanceConfig) error
- func (s *SystemdSpawner) SaveClusterState(namespace string, data []byte) error
- func (s *SystemdSpawner) SpawnGateway(ctx context.Context, namespace, nodeID string, cfg gateway.InstanceConfig) error
- func (s *SystemdSpawner) SpawnOlric(ctx context.Context, namespace, nodeID string, cfg olric.InstanceConfig) error
- func (s *SystemdSpawner) SpawnRQLite(ctx context.Context, namespace, nodeID string, cfg rqlite.InstanceConfig) error
- func (s *SystemdSpawner) SpawnSFU(ctx context.Context, namespace, nodeID string, cfg SFUInstanceConfig) error
- func (s *SystemdSpawner) SpawnTURN(ctx context.Context, namespace, nodeID string, cfg TURNInstanceConfig) error
- func (s *SystemdSpawner) StopAll(ctx context.Context, namespace string) error
- func (s *SystemdSpawner) StopGateway(ctx context.Context, namespace, nodeID string) error
- func (s *SystemdSpawner) StopOlric(ctx context.Context, namespace, nodeID string) error
- func (s *SystemdSpawner) StopRQLite(ctx context.Context, namespace, nodeID string) error
- func (s *SystemdSpawner) StopSFU(ctx context.Context, namespace, nodeID string) error
- func (s *SystemdSpawner) StopTURN(ctx context.Context, namespace, nodeID string) error
- type TURNInstanceConfig
- type WebRTCConfig
- type WebRTCPortAllocator
- func (wpa *WebRTCPortAllocator) AllocateSFUPorts(ctx context.Context, nodeID, namespaceClusterID string) (*WebRTCPortBlock, error)
- func (wpa *WebRTCPortAllocator) AllocateTURNPorts(ctx context.Context, nodeID, namespaceClusterID string) (*WebRTCPortBlock, error)
- func (wpa *WebRTCPortAllocator) DeallocateAll(ctx context.Context, namespaceClusterID string) error
- func (wpa *WebRTCPortAllocator) DeallocateByNode(ctx context.Context, namespaceClusterID, nodeID, serviceType string) error
- func (wpa *WebRTCPortAllocator) GetAllPorts(ctx context.Context, namespaceClusterID string) ([]WebRTCPortBlock, error)
- func (wpa *WebRTCPortAllocator) GetSFUPorts(ctx context.Context, namespaceClusterID, nodeID string) (*WebRTCPortBlock, error)
- func (wpa *WebRTCPortAllocator) GetTURNPorts(ctx context.Context, namespaceClusterID, nodeID string) (*WebRTCPortBlock, error)
- func (wpa *WebRTCPortAllocator) NodeHasTURN(ctx context.Context, nodeID string) (bool, error)
- type WebRTCPortBlock
- type WebRTCRoom
Constants ¶
const (
	// NamespacePortRangeStart is the first port of the reserved range for namespace services.
	NamespacePortRangeStart = 10000

	// NamespacePortRangeEnd is the last port (inclusive) of the reserved range for namespace services.
	NamespacePortRangeEnd = 10099

	// PortsPerNamespace is the number of ports required per namespace instance on a node.
	// Port offsets within each block: RQLite HTTP (0), RQLite Raft (1), Olric HTTP (2),
	// Olric Memberlist (3), Gateway HTTP (4).
	PortsPerNamespace = 5

	// MaxNamespacesPerNode is the maximum number of namespace instances a single node can host.
	MaxNamespacesPerNode = (NamespacePortRangeEnd - NamespacePortRangeStart + 1) / PortsPerNamespace // (10099-10000+1)/5 = 20
)
Port allocation constants for the core namespace services (RQLite, Olric, Gateway).
const (
	// SFU media port range: 20000-29999 (10000 ports).
	// Each namespace gets a 500-port sub-range for RTP media, leaving room for 20 sub-ranges.
	SFUMediaPortRangeStart    = 20000
	SFUMediaPortRangeEnd      = 29999
	SFUMediaPortsPerNamespace = 500

	// SFU signaling ports: 30000-30099.
	// Each namespace gets 1 signaling port per node.
	SFUSignalingPortRangeStart = 30000
	SFUSignalingPortRangeEnd   = 30099

	// TURN relay port range: 49152-65535 (16384 ports).
	// Each namespace gets an 800-port sub-range for TURN relay, leaving room for 20 full sub-ranges.
	TURNRelayPortRangeStart    = 49152
	TURNRelayPortRangeEnd      = 65535
	TURNRelayPortsPerNamespace = 800

	// TURN listen ports (standard).
	TURNDefaultPort = 3478
	TURNSPort       = 5349 // TURNS (TURN over TLS on TCP)

	// Default TURN credential TTL in seconds (10 minutes).
	DefaultTURNCredentialTTL = 600

	// Default service counts per namespace.
	DefaultSFUNodeCount  = 3 // SFU on all 3 nodes
	DefaultTURNNodeCount = 2 // TURN on 2 of 3 nodes for HA
)
WebRTC port allocation constants. These ranges are kept separate from the core namespace port range (10000-10099) so that existing namespace port blocks are not affected.
const (
	// Node counts per service for a standard namespace cluster.
	DefaultRQLiteNodeCount  = 3
	DefaultOlricNodeCount   = 3
	DefaultGatewayNodeCount = 3

	// Larger RQLite/Olric node counts; presumably used for a public/shared namespace — TODO confirm where these are applied.
	PublicRQLiteNodeCount = 5
	PublicOlricNodeCount  = 5
)
Default cluster sizes
Variables ¶
var (
	// Sentinel errors. Each is a distinct *ClusterError pointer, so callers can match
	// them with errors.Is (identity comparison, following Unwrap chains).

	// Node capacity and port allocation errors.
	ErrNoPortsAvailable  = &ClusterError{Message: "no ports available on node"}
	ErrNodeAtCapacity    = &ClusterError{Message: "node has reached maximum namespace instances"}
	ErrInsufficientNodes = &ClusterError{Message: "insufficient nodes available for cluster"}

	// Cluster and namespace lifecycle errors.
	ErrClusterNotFound      = &ClusterError{Message: "namespace cluster not found"}
	ErrClusterAlreadyExists = &ClusterError{Message: "namespace cluster already exists"}
	ErrProvisioningFailed   = &ClusterError{Message: "cluster provisioning failed"}
	ErrNamespaceNotFound    = &ClusterError{Message: "namespace not found"}
	ErrInvalidClusterStatus = &ClusterError{Message: "invalid cluster status for operation"}
	ErrRecoveryInProgress   = &ClusterError{Message: "recovery already in progress for this cluster"}

	// WebRTC errors.
	ErrWebRTCAlreadyEnabled   = &ClusterError{Message: "WebRTC is already enabled for this namespace"}
	ErrWebRTCNotEnabled       = &ClusterError{Message: "WebRTC is not enabled for this namespace"}
	ErrNoWebRTCPortsAvailable = &ClusterError{Message: "no WebRTC ports available on node"}
)
Functions ¶
This section is empty.
Types ¶
type ClusterError ¶
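ClusterError is the error type for namespace cluster operations. The package's sentinel errors (ErrClusterNotFound, ErrNodeAtCapacity, and the other Err* variables) are *ClusterError values; Unwrap returns the wrapped error, if any.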
func (*ClusterError) Error ¶
func (e *ClusterError) Error() string
func (*ClusterError) Unwrap ¶
func (e *ClusterError) Unwrap() error
type ClusterEvent ¶
type ClusterEvent struct {
// ID is the event's identifier (the "id" column).
ID string `json:"id" db:"id"`
// NamespaceClusterID is the ID of the NamespaceCluster this event belongs to.
NamespaceClusterID string `json:"namespace_cluster_id" db:"namespace_cluster_id"`
// EventType is the kind of lifecycle event; expected to be one of the Event* constants.
EventType EventType `json:"event_type" db:"event_type"`
// NodeID is the node the event concerns; may be empty, in which case it is omitted from JSON.
NodeID string `json:"node_id,omitempty" db:"node_id"`
// Message is optional free-form detail; omitted from JSON when empty.
Message string `json:"message,omitempty" db:"message"`
// Metadata is optional extra data held as a JSON-encoded string. Because the field is a
// string, it marshals as a quoted JSON string, not a nested object.
Metadata string `json:"metadata,omitempty" db:"metadata"` // JSON
// CreatedAt is when the event was recorded.
CreatedAt time.Time `json:"created_at" db:"created_at"`
}
ClusterEvent represents an audit event for cluster lifecycle
type ClusterLocalState ¶
type ClusterLocalState struct {
// ClusterID is the ID of the namespace cluster this state describes.
ClusterID string `json:"cluster_id"`
// NamespaceName is the namespace the cluster serves.
NamespaceName string `json:"namespace_name"`
// LocalNodeID and LocalIP identify the local node within the cluster.
LocalNodeID string `json:"local_node_id"`
LocalIP string `json:"local_ip"`
// LocalPorts holds the ports assigned to the local node's namespace services.
LocalPorts ClusterLocalStatePorts `json:"local_ports"`
// AllNodes lists the cluster's nodes (presumably including the local one — confirm).
AllNodes []ClusterLocalStateNode `json:"all_nodes"`
// HasGateway reports whether the local node runs the namespace gateway.
HasGateway bool `json:"has_gateway"`
// BaseDomain is the base domain for namespace hostnames (presumably ClusterManagerConfig.BaseDomain — confirm).
BaseDomain string `json:"base_domain"`
// SavedAt records when this state was written.
SavedAt time.Time `json:"saved_at"`
// WebRTC fields. They are zero when WebRTC is not enabled and are tagged omitempty,
// so state files written before WebRTC support still decode cleanly (backward compatible).
HasSFU bool `json:"has_sfu,omitempty"`
HasTURN bool `json:"has_turn,omitempty"`
TURNDomain string `json:"turn_domain,omitempty"` // TURN server domain for gateway config
// TURNCredentialTTL is the TURN credential lifetime; presumably in seconds, like DefaultTURNCredentialTTL — confirm.
TURNCredentialTTL int `json:"turn_credential_ttl,omitempty"`
// SFU signaling port and media port range (whether *End is inclusive is not visible here — confirm).
SFUSignalingPort int `json:"sfu_signaling_port,omitempty"`
SFUMediaPortStart int `json:"sfu_media_port_start,omitempty"`
SFUMediaPortEnd int `json:"sfu_media_port_end,omitempty"`
// TURN listen ports (plain and TLS) and relay port range.
TURNListenPort int `json:"turn_listen_port,omitempty"`
TURNTLSPort int `json:"turn_tls_port,omitempty"`
TURNRelayPortStart int `json:"turn_relay_port_start,omitempty"`
TURNRelayPortEnd int `json:"turn_relay_port_end,omitempty"`
}
ClusterLocalState is persisted to disk so namespace processes can be restored without querying the main RQLite cluster (which may not have a leader yet on cold start).
type ClusterLocalStateNode ¶
type ClusterLocalStatePorts ¶
type ClusterManager ¶
type ClusterManager struct {
// contains filtered or unexported fields
}
ClusterManager orchestrates namespace cluster provisioning and lifecycle
func NewClusterManager ¶
func NewClusterManager( db rqlite.Client, cfg ClusterManagerConfig, logger *zap.Logger, ) *ClusterManager
NewClusterManager creates a new cluster manager
func NewClusterManagerWithComponents ¶
func NewClusterManagerWithComponents( db rqlite.Client, portAllocator *NamespacePortAllocator, nodeSelector *ClusterNodeSelector, systemdSpawner *SystemdSpawner, cfg ClusterManagerConfig, logger *zap.Logger, ) *ClusterManager
NewClusterManagerWithComponents creates a cluster manager with custom components (useful for testing)
func (*ClusterManager) CheckNamespaceCluster ¶
func (cm *ClusterManager) CheckNamespaceCluster(ctx context.Context, namespaceName string) (string, string, bool, error)
CheckNamespaceCluster checks whether a namespace has a cluster and returns its status as (clusterID, status, needsProvisioning, error):
- If the namespace is "default", it returns ("", "default", false, nil), because the default namespace uses the global cluster.
- If a cluster exists and is ready or provisioning, it returns (clusterID, status, false, nil).
- If no cluster exists or the cluster failed, it returns ("", "", true, nil) to indicate that provisioning is needed.
func (*ClusterManager) DeprovisionCluster ¶
func (cm *ClusterManager) DeprovisionCluster(ctx context.Context, namespaceID int64) error
DeprovisionCluster tears down a namespace cluster on all nodes. It stops the namespace infrastructure (Gateway, Olric, RQLite) on every cluster node, deletes cluster-state.json, deallocates ports, removes DNS records, and cleans up the database.
func (*ClusterManager) DisableWebRTC ¶
func (cm *ClusterManager) DisableWebRTC(ctx context.Context, namespaceName string) error
DisableWebRTC disables WebRTC for a namespace cluster. Stops SFU/TURN services, deallocates ports, and cleans up DNS/DB.
func (*ClusterManager) EnableWebRTC ¶
func (cm *ClusterManager) EnableWebRTC(ctx context.Context, namespaceName, enabledBy string) error
EnableWebRTC enables WebRTC (SFU + TURN) for an existing namespace cluster. Allocates ports, spawns SFU on all 3 nodes and TURN on 2 nodes, creates TURN DNS records, and updates cluster state.
func (*ClusterManager) GetCluster ¶
func (cm *ClusterManager) GetCluster(ctx context.Context, clusterID string) (*NamespaceCluster, error)
GetCluster retrieves a cluster by ID
func (*ClusterManager) GetClusterByNamespace ¶
func (cm *ClusterManager) GetClusterByNamespace(ctx context.Context, namespaceName string) (*NamespaceCluster, error)
GetClusterByNamespace retrieves a cluster by namespace name
func (*ClusterManager) GetClusterByNamespaceID ¶
func (cm *ClusterManager) GetClusterByNamespaceID(ctx context.Context, namespaceID int64) (*NamespaceCluster, error)
GetClusterByNamespaceID retrieves a cluster by namespace ID
func (*ClusterManager) GetClusterStatus ¶
func (cm *ClusterManager) GetClusterStatus(ctx context.Context, clusterID string) (*ClusterProvisioningStatus, error)
GetClusterStatus returns the current status of a namespace cluster
func (*ClusterManager) GetClusterStatusByID ¶
func (cm *ClusterManager) GetClusterStatusByID(ctx context.Context, clusterID string) (interface{}, error)
GetClusterStatusByID returns the full status of a cluster by ID. This method is part of the ClusterProvisioner interface used by the gateway. It returns a generic struct that matches the interface definition in auth/handlers.go.
func (*ClusterManager) GetWebRTCConfig ¶
func (cm *ClusterManager) GetWebRTCConfig(ctx context.Context, namespaceName string) (*WebRTCConfig, error)
GetWebRTCConfig returns the WebRTC configuration for a namespace. Transparently decrypts the TURN shared secret if it was encrypted at rest.
func (*ClusterManager) GetWebRTCStatus ¶
func (cm *ClusterManager) GetWebRTCStatus(ctx context.Context, namespaceName string) (interface{}, error)
GetWebRTCStatus returns the WebRTC config as an interface{} for the WebRTCManager interface.
func (*ClusterManager) HandleDeadNode ¶
func (cm *ClusterManager) HandleDeadNode(ctx context.Context, deadNodeID string)
HandleDeadNode processes the death of a network node by recovering all affected namespace clusters and deployment replicas. It marks all deployment replicas on the dead node as failed, updates deployment statuses, and replaces namespace cluster nodes.
func (*ClusterManager) HandleRecoveredNode ¶
func (cm *ClusterManager) HandleRecoveredNode(ctx context.Context, nodeID string)
HandleRecoveredNode handles a previously-dead node coming back online. It checks if the node was replaced during downtime and cleans up orphaned services.
func (*ClusterManager) HandleSuspectNode ¶
func (cm *ClusterManager) HandleSuspectNode(ctx context.Context, suspectNodeID string)
HandleSuspectNode disables DNS records for a suspect node to prevent traffic from being routed to it. Called early (T+30s) when the node first becomes suspect, before confirming it's actually dead. If the node recovers, HandleSuspectRecovery will re-enable the records.
Safety: never disables the last active record for a namespace.
func (*ClusterManager) HandleSuspectRecovery ¶
func (cm *ClusterManager) HandleSuspectRecovery(ctx context.Context, nodeID string)
HandleSuspectRecovery re-enables DNS records for a node that recovered from suspect state without going dead. Called when the health monitor detects that a previously suspect node is responding to probes again.
func (*ClusterManager) ProvisionCluster ¶
func (cm *ClusterManager) ProvisionCluster(ctx context.Context, namespaceID int, namespaceName, provisionedBy string) (*NamespaceCluster, error)
ProvisionCluster provisions a new 3-node cluster for a namespace. This is an asynchronous operation: it returns immediately with the cluster ID, which callers can use to poll for status.
func (*ClusterManager) ProvisionNamespaceCluster ¶
func (cm *ClusterManager) ProvisionNamespaceCluster(ctx context.Context, namespaceID int, namespaceName, wallet string) (string, string, error)
ProvisionNamespaceCluster triggers provisioning for a new namespace cluster and returns (clusterID, pollURL, error). It starts an asynchronous provisioning process and returns immediately with the cluster ID and a URL to poll for status updates.
func (*ClusterManager) RepairCluster ¶
func (cm *ClusterManager) RepairCluster(ctx context.Context, namespaceName string) error
RepairCluster checks a namespace cluster for missing nodes and adds replacements without touching surviving nodes. This is used to repair under-provisioned clusters (e.g., after manual node removal) without data loss or downtime.
func (*ClusterManager) ReplaceClusterNode ¶
func (cm *ClusterManager) ReplaceClusterNode(ctx context.Context, cluster *NamespaceCluster, deadNodeID string) error
ReplaceClusterNode replaces a dead node in a specific namespace cluster. It selects a new node, allocates ports, spawns services, updates DNS, and cleans up.
func (*ClusterManager) RestoreLocalClusters ¶
func (cm *ClusterManager) RestoreLocalClusters(ctx context.Context) error
RestoreLocalClusters restores namespace cluster processes that should be running on this node. Called on node startup to re-spawn RQLite, Olric, and Gateway processes for clusters that were previously provisioned and assigned to this node.
func (*ClusterManager) RestoreLocalClustersFromDisk ¶
func (cm *ClusterManager) RestoreLocalClustersFromDisk(ctx context.Context) (int, error)
RestoreLocalClustersFromDisk restores namespace processes using local state files, avoiding any dependency on the main RQLite cluster being available. Returns the number of namespaces restored, or -1 if no state files were found.
func (*ClusterManager) SetLocalNodeID ¶
func (cm *ClusterManager) SetLocalNodeID(id string)
SetLocalNodeID sets this node's peer ID, which is used to choose between local and remote dispatch during provisioning.
type ClusterManagerConfig ¶
type ClusterManagerConfig struct {
BaseDomain string // Base domain for namespace gateways (e.g., "orama-devnet.network")
BaseDataDir string // Base directory for namespace data (e.g., "~/.orama/data/namespaces")
GlobalRQLiteDSN string // Global RQLite DSN for API key validation (e.g., "http://localhost:4001")
// IPFS configuration for namespace gateways. Zero-valued fields fall back to the
// default noted on each line.
IPFSClusterAPIURL string // IPFS Cluster API URL (default: "http://localhost:9094")
IPFSAPIURL string // IPFS API URL (default: "http://localhost:4501")
IPFSTimeout time.Duration // Timeout for IPFS operations (default: 60s)
IPFSReplicationFactor int // IPFS replication factor (default: 3)
// TurnEncryptionKey is a 32-byte AES-256 key for encrypting TURN shared secrets
// in RQLite. Derived from cluster secret via HKDF(clusterSecret, "turn-encryption").
// If nil, TURN secrets are stored in plaintext (backward compatibility).
// GetWebRTCConfig transparently decrypts secrets that were encrypted at rest.
TurnEncryptionKey []byte
}
ClusterManagerConfig contains configuration for the cluster manager
type ClusterNode ¶
type ClusterNode struct {
// ID is this membership row's identifier; NodeID (below) is the participating node's ID.
ID string `json:"id" db:"id"`
// NamespaceClusterID is the NamespaceCluster this node belongs to.
NamespaceClusterID string `json:"namespace_cluster_id" db:"namespace_cluster_id"`
NodeID string `json:"node_id" db:"node_id"`
// Role is the node's role within the cluster (see NodeRole).
Role NodeRole `json:"role" db:"role"`
// Service ports on this node (omitted from JSON when 0). They mirror the slots of the
// node's PortBlock: RQLite HTTP, RQLite Raft, Olric HTTP, Olric Memberlist, Gateway HTTP.
RQLiteHTTPPort int `json:"rqlite_http_port,omitempty" db:"rqlite_http_port"`
RQLiteRaftPort int `json:"rqlite_raft_port,omitempty" db:"rqlite_raft_port"`
OlricHTTPPort int `json:"olric_http_port,omitempty" db:"olric_http_port"`
OlricMemberlistPort int `json:"olric_memberlist_port,omitempty" db:"olric_memberlist_port"`
GatewayHTTPPort int `json:"gateway_http_port,omitempty" db:"gateway_http_port"`
// Status is the node's service state (expected to be one of the NodeStatus* constants).
Status NodeStatus `json:"status" db:"status"`
// ProcessPID is the service process ID, if known.
ProcessPID int `json:"process_pid,omitempty" db:"process_pid"`
// LastHeartbeat is nil when no heartbeat has been recorded.
LastHeartbeat *time.Time `json:"last_heartbeat,omitempty" db:"last_heartbeat"`
// ErrorMessage holds failure detail, if any.
ErrorMessage string `json:"error_message,omitempty" db:"error_message"`
// RQLiteJoinAddress is the address this node's RQLite instance joins through, if any.
RQLiteJoinAddress string `json:"rqlite_join_address,omitempty" db:"rqlite_join_address"`
// OlricPeers is a JSON array of Olric peers, stored as a string.
OlricPeers string `json:"olric_peers,omitempty" db:"olric_peers"` // JSON array
CreatedAt time.Time `json:"created_at" db:"created_at"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
}
ClusterNode represents a node participating in a namespace cluster
type ClusterNodeSelector ¶
type ClusterNodeSelector struct {
// contains filtered or unexported fields
}
ClusterNodeSelector selects optimal nodes for namespace clusters. It extends the existing capacity scoring system from deployments/home_node.go to select multiple nodes based on available capacity.
func NewClusterNodeSelector ¶
func NewClusterNodeSelector(db rqlite.Client, portAllocator *NamespacePortAllocator, logger *zap.Logger) *ClusterNodeSelector
NewClusterNodeSelector creates a new node selector
func (*ClusterNodeSelector) SelectNodesForCluster ¶
func (cns *ClusterNodeSelector) SelectNodesForCluster(ctx context.Context, nodeCount int) ([]NodeCapacity, error)
SelectNodesForCluster selects the optimal N nodes for a new namespace cluster. It returns the capacity records of the selected nodes (each carrying its NodeID), sorted by score (best first).
func (*ClusterNodeSelector) SelectReplacementNode ¶
func (cns *ClusterNodeSelector) SelectReplacementNode(ctx context.Context, excludeNodeIDs []string) (*NodeCapacity, error)
SelectReplacementNode selects a single optimal node for replacing a dead node in an existing cluster. excludeNodeIDs contains nodes that should not be selected (dead node + existing cluster members).
type ClusterProvisioningStatus ¶
type ClusterProvisioningStatus struct {
ClusterID string `json:"cluster_id"`
Namespace string `json:"namespace"`
// Status is the cluster's current lifecycle state.
Status ClusterStatus `json:"status"`
// Nodes lists the cluster's nodes (presumably node IDs — confirm). It has no omitempty,
// so a nil slice encodes as JSON null rather than [].
Nodes []string `json:"nodes"`
// Per-service readiness flags.
RQLiteReady bool `json:"rqlite_ready"`
OlricReady bool `json:"olric_ready"`
GatewayReady bool `json:"gateway_ready"`
DNSReady bool `json:"dns_ready"`
// Error carries a failure description, if any; omitted from JSON when empty.
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
// ReadyAt is nil (and omitted from JSON) until set, presumably once the cluster becomes ready.
ReadyAt *time.Time `json:"ready_at,omitempty"`
}
ClusterProvisioningStatus is the response format for the /v1/namespace/status endpoint
type ClusterStatus ¶
type ClusterStatus string // expected values: the ClusterStatus* constants; stored in NamespaceCluster.Status
ClusterStatus represents the current state of a namespace cluster
// Lifecycle states of a NamespaceCluster (stored in NamespaceCluster.Status).
const (
	ClusterStatusNone           ClusterStatus = "none"           // No cluster provisioned
	ClusterStatusProvisioning   ClusterStatus = "provisioning"   // Cluster is being provisioned
	ClusterStatusReady          ClusterStatus = "ready"          // Cluster is operational
	ClusterStatusDegraded       ClusterStatus = "degraded"       // Some nodes are unhealthy
	ClusterStatusFailed         ClusterStatus = "failed"         // Cluster failed to provision/operate
	ClusterStatusDeprovisioning ClusterStatus = "deprovisioning" // Cluster is being deprovisioned
)
type DNSRecordManager ¶
type DNSRecordManager struct {
// contains filtered or unexported fields
}
DNSRecordManager manages DNS records for namespace clusters. It creates and deletes DNS A records for namespace gateway endpoints.
func NewDNSRecordManager ¶
NewDNSRecordManager creates a new DNS record manager
func (*DNSRecordManager) AddNamespaceRecord ¶
func (drm *DNSRecordManager) AddNamespaceRecord(ctx context.Context, namespaceName, ip string) error
AddNamespaceRecord adds DNS A records for a single IP to an existing namespace. Unlike CreateNamespaceRecords, this does NOT delete existing records — it's purely additive. Used when adding a new node to an under-provisioned cluster (repair).
func (*DNSRecordManager) CountActiveNamespaceRecords ¶
func (drm *DNSRecordManager) CountActiveNamespaceRecords(ctx context.Context, namespaceName string) (int, error)
CountActiveNamespaceRecords returns the number of active A records for a namespace's main FQDN. Used as a safety check before disabling records to prevent disabling the last one.
func (*DNSRecordManager) CreateNamespaceRecords ¶
func (drm *DNSRecordManager) CreateNamespaceRecords(ctx context.Context, namespaceName string, nodeIPs []string) error
CreateNamespaceRecords creates DNS A records for a namespace cluster. Each namespace gets records for ns-{namespace}.{baseDomain} pointing to its gateway nodes. Multiple A records enable round-robin DNS load balancing.
func (*DNSRecordManager) CreateTURNRecords ¶
func (drm *DNSRecordManager) CreateTURNRecords(ctx context.Context, namespaceName string, turnIPs []string) error
CreateTURNRecords creates DNS A records for TURN servers. TURN records follow the pattern: turn.ns-{namespace}.{baseDomain} -> TURN node IPs
func (*DNSRecordManager) DeleteNamespaceRecords ¶
func (drm *DNSRecordManager) DeleteNamespaceRecords(ctx context.Context, namespaceName string) error
DeleteNamespaceRecords deletes all DNS records for a namespace
func (*DNSRecordManager) DeleteTURNRecords ¶
func (drm *DNSRecordManager) DeleteTURNRecords(ctx context.Context, namespaceName string) error
DeleteTURNRecords deletes all TURN DNS records for a namespace.
func (*DNSRecordManager) DisableNamespaceRecord ¶
func (drm *DNSRecordManager) DisableNamespaceRecord(ctx context.Context, namespaceName, ip string) error
DisableNamespaceRecord marks a specific IP's record as inactive (for temporary failover).
func (*DNSRecordManager) EnableNamespaceRecord ¶
func (drm *DNSRecordManager) EnableNamespaceRecord(ctx context.Context, namespaceName, ip string) error
EnableNamespaceRecord marks a specific IP's record as active again (for recovery).
func (*DNSRecordManager) GetNamespaceGatewayIPs ¶
func (drm *DNSRecordManager) GetNamespaceGatewayIPs(ctx context.Context, namespaceName string) ([]string, error)
GetNamespaceGatewayIPs returns the IP addresses for a namespace's gateway
func (*DNSRecordManager) UpdateNamespaceRecord ¶
func (drm *DNSRecordManager) UpdateNamespaceRecord(ctx context.Context, namespaceName, oldIP, newIP string) error
UpdateNamespaceRecord updates a specific node's DNS record (for failover)
type EventType ¶
type EventType string // expected values: the Event* constants; stored in ClusterEvent.EventType
EventType represents types of cluster lifecycle events
// Cluster lifecycle event kinds. The string values are persisted in the event_type
// column (see ClusterEvent), so changing a value breaks existing rows.
const (
	// Provisioning progress.
	EventProvisioningStarted EventType = "provisioning_started"
	EventNodesSelected       EventType = "nodes_selected"
	EventPortsAllocated      EventType = "ports_allocated"
	EventRQLiteStarted       EventType = "rqlite_started"
	EventRQLiteJoined        EventType = "rqlite_joined"
	EventRQLiteLeaderElected EventType = "rqlite_leader_elected"
	EventOlricStarted        EventType = "olric_started"
	EventOlricJoined         EventType = "olric_joined"
	EventGatewayStarted      EventType = "gateway_started"
	EventDNSCreated          EventType = "dns_created"
	EventClusterReady        EventType = "cluster_ready"

	// Health changes.
	EventClusterDegraded EventType = "cluster_degraded"
	EventClusterFailed   EventType = "cluster_failed"
	EventNodeFailed      EventType = "node_failed"
	EventNodeRecovered   EventType = "node_recovered"

	// Deprovisioning. NOTE: EventDeprovisionStarted's value is "deprovisioning_started"
	// (not "deprovision_started"); keep it as-is since it is a persisted value.
	EventDeprovisionStarted EventType = "deprovisioning_started"
	EventDeprovisioned      EventType = "deprovisioned"

	// Dead-node recovery.
	EventRecoveryStarted  EventType = "recovery_started"
	EventNodeReplaced     EventType = "node_replaced"
	EventRecoveryComplete EventType = "recovery_complete"
	EventRecoveryFailed   EventType = "recovery_failed"

	// WebRTC (SFU / TURN) lifecycle.
	EventWebRTCEnabled  EventType = "webrtc_enabled"
	EventWebRTCDisabled EventType = "webrtc_disabled"
	EventSFUStarted     EventType = "sfu_started"
	EventSFUStopped     EventType = "sfu_stopped"
	EventTURNStarted    EventType = "turn_started"
	EventTURNStopped    EventType = "turn_stopped"
)
type NamespaceCluster ¶
type NamespaceCluster struct {
// ID is the cluster's identifier (referenced as NamespaceClusterID by ClusterNode, ClusterEvent and PortBlock).
ID string `json:"id" db:"id"`
// NamespaceID and NamespaceName identify the namespace this cluster serves.
// NOTE(review): NamespaceID is int here, while DeprovisionCluster and
// GetClusterByNamespaceID take an int64 namespace ID.
NamespaceID int `json:"namespace_id" db:"namespace_id"`
NamespaceName string `json:"namespace_name" db:"namespace_name"`
// Status is the cluster's lifecycle state (expected to be one of the ClusterStatus* constants).
Status ClusterStatus `json:"status" db:"status"`
// Node counts per service (defaults: the Default*NodeCount constants).
RQLiteNodeCount int `json:"rqlite_node_count" db:"rqlite_node_count"`
OlricNodeCount int `json:"olric_node_count" db:"olric_node_count"`
GatewayNodeCount int `json:"gateway_node_count" db:"gateway_node_count"`
// ProvisionedBy records who requested provisioning (presumably ProvisionCluster's provisionedBy argument — confirm).
ProvisionedBy string `json:"provisioned_by" db:"provisioned_by"`
ProvisionedAt time.Time `json:"provisioned_at" db:"provisioned_at"`
// ReadyAt and LastHealthCheck are nil (and omitted from JSON) until set.
ReadyAt *time.Time `json:"ready_at,omitempty" db:"ready_at"`
LastHealthCheck *time.Time `json:"last_health_check,omitempty" db:"last_health_check"`
ErrorMessage string `json:"error_message,omitempty" db:"error_message"`
RetryCount int `json:"retry_count" db:"retry_count"`
// Nodes is populated by queries and has no db tag, so it is not stored as a column.
Nodes []ClusterNode `json:"nodes,omitempty"`
}
NamespaceCluster represents a dedicated cluster for a namespace
type NamespacePortAllocator ¶
type NamespacePortAllocator struct {
// contains filtered or unexported fields
}
NamespacePortAllocator manages the reserved port range (10000-10099) for namespace services. Each namespace instance on a node gets a block of 5 consecutive ports.
func NewNamespacePortAllocator ¶
func NewNamespacePortAllocator(db rqlite.Client, logger *zap.Logger) *NamespacePortAllocator
NewNamespacePortAllocator creates a new port allocator
func (*NamespacePortAllocator) AllocatePortBlock ¶
func (npa *NamespacePortAllocator) AllocatePortBlock(ctx context.Context, nodeID, namespaceClusterID string) (*PortBlock, error)
AllocatePortBlock finds and allocates the next available 5-port block on a node. Returns an error if the node is at capacity (20 namespace instances).
func (*NamespacePortAllocator) DeallocateAllPortBlocks ¶
func (npa *NamespacePortAllocator) DeallocateAllPortBlocks(ctx context.Context, namespaceClusterID string) error
DeallocateAllPortBlocks releases all port blocks for a namespace cluster
func (*NamespacePortAllocator) DeallocatePortBlock ¶
func (npa *NamespacePortAllocator) DeallocatePortBlock(ctx context.Context, namespaceClusterID, nodeID string) error
DeallocatePortBlock releases a port block when a namespace is deprovisioned
func (*NamespacePortAllocator) GetAllPortBlocks ¶
func (npa *NamespacePortAllocator) GetAllPortBlocks(ctx context.Context, namespaceClusterID string) ([]PortBlock, error)
GetAllPortBlocks retrieves all port blocks for a namespace cluster
func (*NamespacePortAllocator) GetNodeAllocationCount ¶
func (npa *NamespacePortAllocator) GetNodeAllocationCount(ctx context.Context, nodeID string) (int, error)
GetNodeAllocationCount returns the number of namespace instances on a node
func (*NamespacePortAllocator) GetNodeCapacity ¶
GetNodeCapacity returns how many more namespace instances a node can host
func (*NamespacePortAllocator) GetPortBlock ¶
func (npa *NamespacePortAllocator) GetPortBlock(ctx context.Context, namespaceClusterID, nodeID string) (*PortBlock, error)
GetPortBlock retrieves the port block for a namespace on a specific node
type NodeCapacity ¶
type NodeCapacity struct {
NodeID string `json:"node_id"`
IPAddress string `json:"ip_address"`
InternalIP string `json:"internal_ip"` // WireGuard IP for inter-node communication
DeploymentCount int `json:"deployment_count"`
AllocatedPorts int `json:"allocated_ports"`
AvailablePorts int `json:"available_ports"`
UsedMemoryMB int `json:"used_memory_mb"`
AvailableMemoryMB int `json:"available_memory_mb"`
UsedCPUPercent int `json:"used_cpu_percent"`
NamespaceInstanceCount int `json:"namespace_instance_count"` // Number of namespace clusters on this node
AvailableNamespaceSlots int `json:"available_namespace_slots"` // How many more namespace instances can fit
Score float64 `json:"score"`
}
NodeCapacity represents the capacity metrics for a single node
type NodeStatus ¶
type NodeStatus string
NodeStatus represents the status of a service on a node
const ( NodeStatusPending NodeStatus = "pending" NodeStatusStarting NodeStatus = "starting" NodeStatusRunning NodeStatus = "running" NodeStatusStopped NodeStatus = "stopped" NodeStatusFailed NodeStatus = "failed" )
type PortBlock ¶
type PortBlock struct {
ID string `json:"id" db:"id"`
NodeID string `json:"node_id" db:"node_id"`
NamespaceClusterID string `json:"namespace_cluster_id" db:"namespace_cluster_id"`
PortStart int `json:"port_start" db:"port_start"`
PortEnd int `json:"port_end" db:"port_end"`
RQLiteHTTPPort int `json:"rqlite_http_port" db:"rqlite_http_port"`
RQLiteRaftPort int `json:"rqlite_raft_port" db:"rqlite_raft_port"`
OlricHTTPPort int `json:"olric_http_port" db:"olric_http_port"`
OlricMemberlistPort int `json:"olric_memberlist_port" db:"olric_memberlist_port"`
GatewayHTTPPort int `json:"gateway_http_port" db:"gateway_http_port"`
AllocatedAt time.Time `json:"allocated_at" db:"allocated_at"`
}
PortBlock represents an allocated block of ports for a namespace on a node
type ProvisioningResponse ¶
type ProvisioningResponse struct {
Status string `json:"status"`
ClusterID string `json:"cluster_id"`
PollURL string `json:"poll_url"`
EstimatedTimeSeconds int `json:"estimated_time_seconds"`
}
ProvisioningResponse is returned when a new namespace triggers cluster provisioning
type SFUInstanceConfig ¶
type SFUInstanceConfig struct {
Namespace string
NodeID string
ListenAddr string // WireGuard IP:port (e.g., "10.0.0.1:30000")
MediaPortStart int // Start of RTP media port range
MediaPortEnd int // End of RTP media port range
TURNServers []sfu.TURNServerConfig // TURN servers to advertise to peers
TURNSecret string // HMAC-SHA1 shared secret
TURNCredTTL int // Credential TTL in seconds
RQLiteDSN string // Namespace-local RQLite DSN
}
SFUInstanceConfig holds configuration for spawning an SFU instance
type SystemdSpawner ¶
type SystemdSpawner struct {
// contains filtered or unexported fields
}
SystemdSpawner spawns namespace cluster processes using systemd services
func NewSystemdSpawner ¶
func NewSystemdSpawner(namespaceBase string, logger *zap.Logger) *SystemdSpawner
NewSystemdSpawner creates a new systemd-based spawner
func (*SystemdSpawner) DeleteClusterState ¶
func (s *SystemdSpawner) DeleteClusterState(namespace string) error
DeleteClusterState removes cluster state and config files for a namespace.
func (*SystemdSpawner) RestartGateway ¶
func (s *SystemdSpawner) RestartGateway(ctx context.Context, namespace, nodeID string, cfg gateway.InstanceConfig) error
RestartGateway stops and re-spawns a Gateway instance with updated config. Used when gateway config changes at runtime (e.g., WebRTC enable/disable).
func (*SystemdSpawner) SaveClusterState ¶
func (s *SystemdSpawner) SaveClusterState(namespace string, data []byte) error
SaveClusterState writes cluster state JSON to the namespace data directory. Used by the spawn handler to persist state received from the coordinator node.
func (*SystemdSpawner) SpawnGateway ¶
func (s *SystemdSpawner) SpawnGateway(ctx context.Context, namespace, nodeID string, cfg gateway.InstanceConfig) error
SpawnGateway starts a Gateway instance using systemd
func (*SystemdSpawner) SpawnOlric ¶
func (s *SystemdSpawner) SpawnOlric(ctx context.Context, namespace, nodeID string, cfg olric.InstanceConfig) error
SpawnOlric starts an Olric instance using systemd
func (*SystemdSpawner) SpawnRQLite ¶
func (s *SystemdSpawner) SpawnRQLite(ctx context.Context, namespace, nodeID string, cfg rqlite.InstanceConfig) error
SpawnRQLite starts an RQLite instance using systemd
func (*SystemdSpawner) SpawnSFU ¶
func (s *SystemdSpawner) SpawnSFU(ctx context.Context, namespace, nodeID string, cfg SFUInstanceConfig) error
SpawnSFU starts an SFU instance using systemd
func (*SystemdSpawner) SpawnTURN ¶
func (s *SystemdSpawner) SpawnTURN(ctx context.Context, namespace, nodeID string, cfg TURNInstanceConfig) error
SpawnTURN starts a TURN instance using systemd
func (*SystemdSpawner) StopAll ¶
func (s *SystemdSpawner) StopAll(ctx context.Context, namespace string) error
StopAll stops all services for a namespace, including deployment processes
func (*SystemdSpawner) StopGateway ¶
func (s *SystemdSpawner) StopGateway(ctx context.Context, namespace, nodeID string) error
StopGateway stops a Gateway instance
func (*SystemdSpawner) StopOlric ¶
func (s *SystemdSpawner) StopOlric(ctx context.Context, namespace, nodeID string) error
StopOlric stops an Olric instance
func (*SystemdSpawner) StopRQLite ¶
func (s *SystemdSpawner) StopRQLite(ctx context.Context, namespace, nodeID string) error
StopRQLite stops an RQLite instance
type TURNInstanceConfig ¶
type TURNInstanceConfig struct {
Namespace string
NodeID string
ListenAddr string // e.g., "0.0.0.0:3478"
TURNSListenAddr string // e.g., "0.0.0.0:5349" (TURNS over TLS/TCP)
PublicIP string // Public IP for TURN relay allocations
Realm string // TURN realm (typically base domain)
AuthSecret string // HMAC-SHA1 shared secret
RelayPortStart int // Start of relay port range
RelayPortEnd int // End of relay port range
TURNDomain string // TURN domain for Let's Encrypt cert (e.g., "turn.ns-myapp.orama-devnet.network")
}
TURNInstanceConfig holds configuration for spawning a TURN instance
type WebRTCConfig ¶
type WebRTCConfig struct {
ID string `json:"id" db:"id"`
NamespaceClusterID string `json:"namespace_cluster_id" db:"namespace_cluster_id"`
NamespaceName string `json:"namespace_name" db:"namespace_name"`
Enabled bool `json:"enabled" db:"enabled"`
TURNCredentialTTL int `json:"turn_credential_ttl" db:"turn_credential_ttl"`
SFUNodeCount int `json:"sfu_node_count" db:"sfu_node_count"`
TURNNodeCount int `json:"turn_node_count" db:"turn_node_count"`
EnabledBy string `json:"enabled_by" db:"enabled_by"`
EnabledAt time.Time `json:"enabled_at" db:"enabled_at"`
DisabledAt *time.Time `json:"disabled_at,omitempty" db:"disabled_at"`
}
WebRTCConfig represents the per-namespace WebRTC configuration stored in the database
type WebRTCPortAllocator ¶
type WebRTCPortAllocator struct {
// contains filtered or unexported fields
}
WebRTCPortAllocator manages port allocation for SFU and TURN services. Uses the webrtc_port_allocations table, separate from namespace_port_allocations, to avoid breaking existing port blocks.
func NewWebRTCPortAllocator ¶
func NewWebRTCPortAllocator(db rqlite.Client, logger *zap.Logger) *WebRTCPortAllocator
NewWebRTCPortAllocator creates a new WebRTC port allocator
func (*WebRTCPortAllocator) AllocateSFUPorts ¶
func (wpa *WebRTCPortAllocator) AllocateSFUPorts(ctx context.Context, nodeID, namespaceClusterID string) (*WebRTCPortBlock, error)
AllocateSFUPorts allocates SFU ports for a namespace on a node. Each namespace gets one signaling port from the 30000-30099 range plus a block of 500 media ports from the 20000-29999 range. Returns the existing allocation if one already exists (idempotent).
func (*WebRTCPortAllocator) AllocateTURNPorts ¶
func (wpa *WebRTCPortAllocator) AllocateTURNPorts(ctx context.Context, nodeID, namespaceClusterID string) (*WebRTCPortBlock, error)
AllocateTURNPorts allocates TURN ports for a namespace on a node. Each namespace gets the standard TURN listen ports (3478 and 443) plus a block of 800 relay ports from the 49152-65535 range. Returns the existing allocation if one already exists (idempotent).
func (*WebRTCPortAllocator) DeallocateAll ¶
func (wpa *WebRTCPortAllocator) DeallocateAll(ctx context.Context, namespaceClusterID string) error
DeallocateAll releases all WebRTC port blocks for a namespace cluster.
func (*WebRTCPortAllocator) DeallocateByNode ¶
func (wpa *WebRTCPortAllocator) DeallocateByNode(ctx context.Context, namespaceClusterID, nodeID, serviceType string) error
DeallocateByNode releases WebRTC port blocks for a specific node and service type.
func (*WebRTCPortAllocator) GetAllPorts ¶
func (wpa *WebRTCPortAllocator) GetAllPorts(ctx context.Context, namespaceClusterID string) ([]WebRTCPortBlock, error)
GetAllPorts retrieves all WebRTC port blocks for a namespace cluster.
func (*WebRTCPortAllocator) GetSFUPorts ¶
func (wpa *WebRTCPortAllocator) GetSFUPorts(ctx context.Context, namespaceClusterID, nodeID string) (*WebRTCPortBlock, error)
GetSFUPorts retrieves the SFU port allocation for a namespace on a node.
func (*WebRTCPortAllocator) GetTURNPorts ¶
func (wpa *WebRTCPortAllocator) GetTURNPorts(ctx context.Context, namespaceClusterID, nodeID string) (*WebRTCPortBlock, error)
GetTURNPorts retrieves the TURN port allocation for a namespace on a node.
func (*WebRTCPortAllocator) NodeHasTURN ¶
NodeHasTURN checks if a node already has a TURN allocation from any namespace. Used during node selection to avoid port conflicts on standard TURN ports (3478/443).
type WebRTCPortBlock ¶
type WebRTCPortBlock struct {
ID string `json:"id" db:"id"`
NodeID string `json:"node_id" db:"node_id"`
NamespaceClusterID string `json:"namespace_cluster_id" db:"namespace_cluster_id"`
ServiceType string `json:"service_type" db:"service_type"` // "sfu" or "turn"
// SFU ports
SFUSignalingPort int `json:"sfu_signaling_port,omitempty" db:"sfu_signaling_port"`
SFUMediaPortStart int `json:"sfu_media_port_start,omitempty" db:"sfu_media_port_start"`
SFUMediaPortEnd int `json:"sfu_media_port_end,omitempty" db:"sfu_media_port_end"`
// TURN ports
TURNListenPort int `json:"turn_listen_port,omitempty" db:"turn_listen_port"`
TURNTLSPort int `json:"turn_tls_port,omitempty" db:"turn_tls_port"`
TURNRelayPortStart int `json:"turn_relay_port_start,omitempty" db:"turn_relay_port_start"`
TURNRelayPortEnd int `json:"turn_relay_port_end,omitempty" db:"turn_relay_port_end"`
AllocatedAt time.Time `json:"allocated_at" db:"allocated_at"`
}
WebRTCPortBlock represents allocated WebRTC ports for a namespace on a node
type WebRTCRoom ¶
type WebRTCRoom struct {
ID string `json:"id" db:"id"`
NamespaceClusterID string `json:"namespace_cluster_id" db:"namespace_cluster_id"`
NamespaceName string `json:"namespace_name" db:"namespace_name"`
RoomID string `json:"room_id" db:"room_id"`
SFUNodeID string `json:"sfu_node_id" db:"sfu_node_id"`
SFUInternalIP string `json:"sfu_internal_ip" db:"sfu_internal_ip"`
SFUSignalingPort int `json:"sfu_signaling_port" db:"sfu_signaling_port"`
ParticipantCount int `json:"participant_count" db:"participant_count"`
MaxParticipants int `json:"max_participants" db:"max_participants"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
LastActivity time.Time `json:"last_activity" db:"last_activity"`
}
WebRTCRoom represents an active WebRTC room tracked in the database