namespace

package
v0.115.0-nightly Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2026 License: AGPL-3.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// NamespacePortRangeStart is the beginning of the reserved port range for namespace services
	NamespacePortRangeStart = 10000

	// NamespacePortRangeEnd is the end of the reserved port range for namespace services
	NamespacePortRangeEnd = 10099

	// PortsPerNamespace is the number of ports required per namespace instance on a node
	// RQLite HTTP (0), RQLite Raft (1), Olric HTTP (2), Olric Memberlist (3), Gateway HTTP (4)
	PortsPerNamespace = 5

	// MaxNamespacesPerNode is the maximum number of namespace instances a single node can host
	MaxNamespacesPerNode = (NamespacePortRangeEnd - NamespacePortRangeStart + 1) / PortsPerNamespace // 20
)

Port allocation constants

View Source
const (
	// SFU media port range: 20000-29999
	// Each namespace gets a 500-port sub-range for RTP media
	SFUMediaPortRangeStart    = 20000
	SFUMediaPortRangeEnd      = 29999
	SFUMediaPortsPerNamespace = 500

	// SFU signaling ports: 30000-30099
	// Each namespace gets 1 signaling port per node
	SFUSignalingPortRangeStart = 30000
	SFUSignalingPortRangeEnd   = 30099

	// TURN relay port range: 49152-65535
	// Each namespace gets an 800-port sub-range for TURN relay
	TURNRelayPortRangeStart    = 49152
	TURNRelayPortRangeEnd      = 65535
	TURNRelayPortsPerNamespace = 800

	// TURN listen ports (standard)
	TURNDefaultPort = 3478
	TURNSPort       = 5349 // TURNS (TURN over TLS on TCP)

	// Default TURN credential TTL in seconds (10 minutes)
	DefaultTURNCredentialTTL = 600

	// Default service counts per namespace
	DefaultSFUNodeCount  = 3 // SFU on all 3 nodes
	DefaultTURNNodeCount = 2 // TURN on 2 of 3 nodes for HA
)

WebRTC port allocation constants These are separate from the core namespace port range (10000-10099) to avoid breaking existing port blocks.

View Source
const (
	DefaultRQLiteNodeCount  = 3
	DefaultOlricNodeCount   = 3
	DefaultGatewayNodeCount = 3
	PublicRQLiteNodeCount   = 5
	PublicOlricNodeCount    = 5
)

Default cluster sizes

Variables

View Source
var (
	ErrNoPortsAvailable       = &ClusterError{Message: "no ports available on node"}
	ErrNodeAtCapacity         = &ClusterError{Message: "node has reached maximum namespace instances"}
	ErrInsufficientNodes      = &ClusterError{Message: "insufficient nodes available for cluster"}
	ErrClusterNotFound        = &ClusterError{Message: "namespace cluster not found"}
	ErrClusterAlreadyExists   = &ClusterError{Message: "namespace cluster already exists"}
	ErrProvisioningFailed     = &ClusterError{Message: "cluster provisioning failed"}
	ErrNamespaceNotFound      = &ClusterError{Message: "namespace not found"}
	ErrInvalidClusterStatus   = &ClusterError{Message: "invalid cluster status for operation"}
	ErrRecoveryInProgress     = &ClusterError{Message: "recovery already in progress for this cluster"}
	ErrWebRTCAlreadyEnabled   = &ClusterError{Message: "WebRTC is already enabled for this namespace"}
	ErrWebRTCNotEnabled       = &ClusterError{Message: "WebRTC is not enabled for this namespace"}
	ErrNoWebRTCPortsAvailable = &ClusterError{Message: "no WebRTC ports available on node"}
)

Functions

This section is empty.

Types

type ClusterError

type ClusterError struct {
	Message string
	Cause   error
}

Errors

func (*ClusterError) Error

func (e *ClusterError) Error() string

func (*ClusterError) Unwrap

func (e *ClusterError) Unwrap() error

type ClusterEvent

type ClusterEvent struct {
	ID                 string    `json:"id" db:"id"`
	NamespaceClusterID string    `json:"namespace_cluster_id" db:"namespace_cluster_id"`
	EventType          EventType `json:"event_type" db:"event_type"`
	NodeID             string    `json:"node_id,omitempty" db:"node_id"`
	Message            string    `json:"message,omitempty" db:"message"`
	Metadata           string    `json:"metadata,omitempty" db:"metadata"` // JSON
	CreatedAt          time.Time `json:"created_at" db:"created_at"`
}

ClusterEvent represents an audit event for cluster lifecycle

type ClusterLocalState

type ClusterLocalState struct {
	ClusterID     string                  `json:"cluster_id"`
	NamespaceName string                  `json:"namespace_name"`
	LocalNodeID   string                  `json:"local_node_id"`
	LocalIP       string                  `json:"local_ip"`
	LocalPorts    ClusterLocalStatePorts  `json:"local_ports"`
	AllNodes      []ClusterLocalStateNode `json:"all_nodes"`
	HasGateway    bool                    `json:"has_gateway"`
	BaseDomain    string                  `json:"base_domain"`
	SavedAt       time.Time               `json:"saved_at"`

	// WebRTC fields (zero values when WebRTC not enabled — backward compatible)
	HasSFU             bool   `json:"has_sfu,omitempty"`
	HasTURN            bool   `json:"has_turn,omitempty"`
	TURNSharedSecret   string `json:"turn_shared_secret,omitempty"` // Needed for gateway to generate TURN credentials on cold start
	TURNDomain         string `json:"turn_domain,omitempty"`        // TURN server domain for gateway config
	TURNCredentialTTL  int    `json:"turn_credential_ttl,omitempty"`
	SFUSignalingPort   int    `json:"sfu_signaling_port,omitempty"`
	SFUMediaPortStart  int    `json:"sfu_media_port_start,omitempty"`
	SFUMediaPortEnd    int    `json:"sfu_media_port_end,omitempty"`
	TURNListenPort     int    `json:"turn_listen_port,omitempty"`
	TURNTLSPort        int    `json:"turn_tls_port,omitempty"`
	TURNRelayPortStart int    `json:"turn_relay_port_start,omitempty"`
	TURNRelayPortEnd   int    `json:"turn_relay_port_end,omitempty"`
}

ClusterLocalState is persisted to disk so namespace processes can be restored without querying the main RQLite cluster (which may not have a leader yet on cold start).

type ClusterLocalStateNode

type ClusterLocalStateNode struct {
	NodeID              string `json:"node_id"`
	InternalIP          string `json:"internal_ip"`
	RQLiteHTTPPort      int    `json:"rqlite_http_port"`
	RQLiteRaftPort      int    `json:"rqlite_raft_port"`
	OlricHTTPPort       int    `json:"olric_http_port"`
	OlricMemberlistPort int    `json:"olric_memberlist_port"`
}

type ClusterLocalStatePorts

type ClusterLocalStatePorts struct {
	RQLiteHTTPPort      int `json:"rqlite_http_port"`
	RQLiteRaftPort      int `json:"rqlite_raft_port"`
	OlricHTTPPort       int `json:"olric_http_port"`
	OlricMemberlistPort int `json:"olric_memberlist_port"`
	GatewayHTTPPort     int `json:"gateway_http_port"`
}

type ClusterManager

type ClusterManager struct {
	// contains filtered or unexported fields
}

ClusterManager orchestrates namespace cluster provisioning and lifecycle

func NewClusterManager

func NewClusterManager(
	db rqlite.Client,
	cfg ClusterManagerConfig,
	logger *zap.Logger,
) *ClusterManager

NewClusterManager creates a new cluster manager

func NewClusterManagerWithComponents

func NewClusterManagerWithComponents(
	db rqlite.Client,
	portAllocator *NamespacePortAllocator,
	nodeSelector *ClusterNodeSelector,
	systemdSpawner *SystemdSpawner,
	cfg ClusterManagerConfig,
	logger *zap.Logger,
) *ClusterManager

NewClusterManagerWithComponents creates a cluster manager with custom components (useful for testing)

func (*ClusterManager) CheckNamespaceCluster

func (cm *ClusterManager) CheckNamespaceCluster(ctx context.Context, namespaceName string) (string, string, bool, error)

CheckNamespaceCluster checks if a namespace has a cluster and returns its status. Returns: (clusterID, status, needsProvisioning, error) - If the namespace is "default", returns ("", "default", false, nil) as it uses the global cluster - If a cluster exists and is ready/provisioning, returns (clusterID, status, false, nil) - If no cluster exists or cluster failed, returns ("", "", true, nil) to indicate provisioning is needed

func (*ClusterManager) DeprovisionCluster

func (cm *ClusterManager) DeprovisionCluster(ctx context.Context, namespaceID int64) error

DeprovisionCluster tears down a namespace cluster on all nodes. Stops namespace infrastructure (Gateway, Olric, RQLite) on every cluster node, deletes cluster-state.json, deallocates ports, removes DNS records, and cleans up DB.

func (*ClusterManager) DisableWebRTC

func (cm *ClusterManager) DisableWebRTC(ctx context.Context, namespaceName string) error

DisableWebRTC disables WebRTC for a namespace cluster. Stops SFU/TURN services, deallocates ports, and cleans up DNS/DB.

func (*ClusterManager) EnableWebRTC

func (cm *ClusterManager) EnableWebRTC(ctx context.Context, namespaceName, enabledBy string) error

EnableWebRTC enables WebRTC (SFU + TURN) for an existing namespace cluster. Allocates ports, spawns SFU on all 3 nodes and TURN on 2 nodes, creates TURN DNS records, and updates cluster state.

func (*ClusterManager) GetCluster

func (cm *ClusterManager) GetCluster(ctx context.Context, clusterID string) (*NamespaceCluster, error)

GetCluster retrieves a cluster by ID

func (*ClusterManager) GetClusterByNamespace

func (cm *ClusterManager) GetClusterByNamespace(ctx context.Context, namespaceName string) (*NamespaceCluster, error)

GetClusterByNamespace retrieves a cluster by namespace name

func (*ClusterManager) GetClusterByNamespaceID

func (cm *ClusterManager) GetClusterByNamespaceID(ctx context.Context, namespaceID int64) (*NamespaceCluster, error)

GetClusterByNamespaceID retrieves a cluster by namespace ID

func (*ClusterManager) GetClusterStatus

func (cm *ClusterManager) GetClusterStatus(ctx context.Context, clusterID string) (*ClusterProvisioningStatus, error)

GetClusterStatus returns the current status of a namespace cluster

func (*ClusterManager) GetClusterStatusByID

func (cm *ClusterManager) GetClusterStatusByID(ctx context.Context, clusterID string) (interface{}, error)

GetClusterStatusByID returns the full status of a cluster by ID. This method is part of the ClusterProvisioner interface used by the gateway. It returns a generic struct that matches the interface definition in auth/handlers.go.

func (*ClusterManager) GetWebRTCConfig

func (cm *ClusterManager) GetWebRTCConfig(ctx context.Context, namespaceName string) (*WebRTCConfig, error)

GetWebRTCConfig returns the WebRTC configuration for a namespace. Transparently decrypts the TURN shared secret if it was encrypted at rest.

func (*ClusterManager) GetWebRTCStatus

func (cm *ClusterManager) GetWebRTCStatus(ctx context.Context, namespaceName string) (interface{}, error)

GetWebRTCStatus returns the WebRTC config as an interface{} for the WebRTCManager interface.

func (*ClusterManager) HandleDeadNode

func (cm *ClusterManager) HandleDeadNode(ctx context.Context, deadNodeID string)

HandleDeadNode processes the death of a network node by recovering all affected namespace clusters and deployment replicas. It marks all deployment replicas on the dead node as failed, updates deployment statuses, and replaces namespace cluster nodes.

func (*ClusterManager) HandleRecoveredNode

func (cm *ClusterManager) HandleRecoveredNode(ctx context.Context, nodeID string)

HandleRecoveredNode handles a previously-dead node coming back online. It checks if the node was replaced during downtime and cleans up orphaned services.

func (*ClusterManager) HandleSuspectNode

func (cm *ClusterManager) HandleSuspectNode(ctx context.Context, suspectNodeID string)

HandleSuspectNode disables DNS records for a suspect node to prevent traffic from being routed to it. Called early (T+30s) when the node first becomes suspect, before confirming it's actually dead. If the node recovers, HandleSuspectRecovery will re-enable the records.

Safety: never disables the last active record for a namespace.

func (*ClusterManager) HandleSuspectRecovery

func (cm *ClusterManager) HandleSuspectRecovery(ctx context.Context, nodeID string)

HandleSuspectRecovery re-enables DNS records for a node that recovered from suspect state without going dead. Called when the health monitor detects that a previously suspect node is responding to probes again.

func (*ClusterManager) ProvisionCluster

func (cm *ClusterManager) ProvisionCluster(ctx context.Context, namespaceID int, namespaceName, provisionedBy string) (*NamespaceCluster, error)

ProvisionCluster provisions a new 3-node cluster for a namespace This is an async operation - returns immediately with cluster ID for polling

func (*ClusterManager) ProvisionNamespaceCluster

func (cm *ClusterManager) ProvisionNamespaceCluster(ctx context.Context, namespaceID int, namespaceName, wallet string) (string, string, error)

ProvisionNamespaceCluster triggers provisioning for a new namespace cluster. Returns: (clusterID, pollURL, error) This starts an async provisioning process and returns immediately with the cluster ID and a URL to poll for status updates.

func (*ClusterManager) RepairCluster

func (cm *ClusterManager) RepairCluster(ctx context.Context, namespaceName string) error

RepairCluster checks a namespace cluster for missing nodes and adds replacements without touching surviving nodes. This is used to repair under-provisioned clusters (e.g., after manual node removal) without data loss or downtime.

func (*ClusterManager) ReplaceClusterNode

func (cm *ClusterManager) ReplaceClusterNode(ctx context.Context, cluster *NamespaceCluster, deadNodeID string) error

ReplaceClusterNode replaces a dead node in a specific namespace cluster. It selects a new node, allocates ports, spawns services, updates DNS, and cleans up.

func (*ClusterManager) RestoreLocalClusters

func (cm *ClusterManager) RestoreLocalClusters(ctx context.Context) error

RestoreLocalClusters restores namespace cluster processes that should be running on this node. Called on node startup to re-spawn RQLite, Olric, and Gateway processes for clusters that were previously provisioned and assigned to this node.

func (*ClusterManager) RestoreLocalClustersFromDisk

func (cm *ClusterManager) RestoreLocalClustersFromDisk(ctx context.Context) (int, error)

RestoreLocalClustersFromDisk restores namespace processes using local state files, avoiding any dependency on the main RQLite cluster being available. Returns the number of namespaces restored, or -1 if no state files were found.

func (*ClusterManager) SetLocalNodeID

func (cm *ClusterManager) SetLocalNodeID(id string)

SetLocalNodeID sets this node's peer ID for local/remote dispatch during provisioning

type ClusterManagerConfig

type ClusterManagerConfig struct {
	BaseDomain      string // Base domain for namespace gateways (e.g., "orama-devnet.network")
	BaseDataDir     string // Base directory for namespace data (e.g., "~/.orama/data/namespaces")
	GlobalRQLiteDSN string // Global RQLite DSN for API key validation (e.g., "http://localhost:4001")
	// IPFS configuration for namespace gateways (defaults used if not set)
	IPFSClusterAPIURL     string        // IPFS Cluster API URL (default: "http://localhost:9094")
	IPFSAPIURL            string        // IPFS API URL (default: "http://localhost:4501")
	IPFSTimeout           time.Duration // Timeout for IPFS operations (default: 60s)
	IPFSReplicationFactor int           // IPFS replication factor (default: 3)

	// TurnEncryptionKey is a 32-byte AES-256 key for encrypting TURN shared secrets
	// in RQLite. Derived from cluster secret via HKDF(clusterSecret, "turn-encryption").
	// If nil, TURN secrets are stored in plaintext (backward compatibility).
	TurnEncryptionKey []byte
}

ClusterManagerConfig contains configuration for the cluster manager

type ClusterNode

type ClusterNode struct {
	ID                  string     `json:"id" db:"id"`
	NamespaceClusterID  string     `json:"namespace_cluster_id" db:"namespace_cluster_id"`
	NodeID              string     `json:"node_id" db:"node_id"`
	Role                NodeRole   `json:"role" db:"role"`
	RQLiteHTTPPort      int        `json:"rqlite_http_port,omitempty" db:"rqlite_http_port"`
	RQLiteRaftPort      int        `json:"rqlite_raft_port,omitempty" db:"rqlite_raft_port"`
	OlricHTTPPort       int        `json:"olric_http_port,omitempty" db:"olric_http_port"`
	OlricMemberlistPort int        `json:"olric_memberlist_port,omitempty" db:"olric_memberlist_port"`
	GatewayHTTPPort     int        `json:"gateway_http_port,omitempty" db:"gateway_http_port"`
	Status              NodeStatus `json:"status" db:"status"`
	ProcessPID          int        `json:"process_pid,omitempty" db:"process_pid"`
	LastHeartbeat       *time.Time `json:"last_heartbeat,omitempty" db:"last_heartbeat"`
	ErrorMessage        string     `json:"error_message,omitempty" db:"error_message"`
	RQLiteJoinAddress   string     `json:"rqlite_join_address,omitempty" db:"rqlite_join_address"`
	OlricPeers          string     `json:"olric_peers,omitempty" db:"olric_peers"` // JSON array
	CreatedAt           time.Time  `json:"created_at" db:"created_at"`
	UpdatedAt           time.Time  `json:"updated_at" db:"updated_at"`
}

ClusterNode represents a node participating in a namespace cluster

type ClusterNodeSelector

type ClusterNodeSelector struct {
	// contains filtered or unexported fields
}

ClusterNodeSelector selects optimal nodes for namespace clusters. It extends the existing capacity scoring system from deployments/home_node.go to select multiple nodes based on available capacity.

func NewClusterNodeSelector

func NewClusterNodeSelector(db rqlite.Client, portAllocator *NamespacePortAllocator, logger *zap.Logger) *ClusterNodeSelector

NewClusterNodeSelector creates a new node selector

func (*ClusterNodeSelector) SelectNodesForCluster

func (cns *ClusterNodeSelector) SelectNodesForCluster(ctx context.Context, nodeCount int) ([]NodeCapacity, error)

SelectNodesForCluster selects the optimal N nodes for a new namespace cluster. Returns the node IDs sorted by score (best first).

func (*ClusterNodeSelector) SelectReplacementNode

func (cns *ClusterNodeSelector) SelectReplacementNode(ctx context.Context, excludeNodeIDs []string) (*NodeCapacity, error)

SelectReplacementNode selects a single optimal node for replacing a dead node in an existing cluster. excludeNodeIDs contains nodes that should not be selected (dead node + existing cluster members).

type ClusterProvisioningStatus

type ClusterProvisioningStatus struct {
	ClusterID    string        `json:"cluster_id"`
	Namespace    string        `json:"namespace"`
	Status       ClusterStatus `json:"status"`
	Nodes        []string      `json:"nodes"`
	RQLiteReady  bool          `json:"rqlite_ready"`
	OlricReady   bool          `json:"olric_ready"`
	GatewayReady bool          `json:"gateway_ready"`
	DNSReady     bool          `json:"dns_ready"`
	Error        string        `json:"error,omitempty"`
	CreatedAt    time.Time     `json:"created_at"`
	ReadyAt      *time.Time    `json:"ready_at,omitempty"`
}

ClusterProvisioningStatus is the response format for the /v1/namespace/status endpoint

type ClusterStatus

type ClusterStatus string

ClusterStatus represents the current state of a namespace cluster

const (
	ClusterStatusNone           ClusterStatus = "none"           // No cluster provisioned
	ClusterStatusProvisioning   ClusterStatus = "provisioning"   // Cluster is being provisioned
	ClusterStatusReady          ClusterStatus = "ready"          // Cluster is operational
	ClusterStatusDegraded       ClusterStatus = "degraded"       // Some nodes are unhealthy
	ClusterStatusFailed         ClusterStatus = "failed"         // Cluster failed to provision/operate
	ClusterStatusDeprovisioning ClusterStatus = "deprovisioning" // Cluster is being deprovisioned
)

type DNSRecordManager

type DNSRecordManager struct {
	// contains filtered or unexported fields
}

DNSRecordManager manages DNS records for namespace clusters. It creates and deletes DNS A records for namespace gateway endpoints.

func NewDNSRecordManager

func NewDNSRecordManager(db rqlite.Client, baseDomain string, logger *zap.Logger) *DNSRecordManager

NewDNSRecordManager creates a new DNS record manager

func (*DNSRecordManager) AddNamespaceRecord

func (drm *DNSRecordManager) AddNamespaceRecord(ctx context.Context, namespaceName, ip string) error

AddNamespaceRecord adds DNS A records for a single IP to an existing namespace. Unlike CreateNamespaceRecords, this does NOT delete existing records — it's purely additive. Used when adding a new node to an under-provisioned cluster (repair).

func (*DNSRecordManager) CountActiveNamespaceRecords

func (drm *DNSRecordManager) CountActiveNamespaceRecords(ctx context.Context, namespaceName string) (int, error)

CountActiveNamespaceRecords returns the number of active A records for a namespace's main FQDN. Used as a safety check before disabling records to prevent disabling the last one.

func (*DNSRecordManager) CreateNamespaceRecords

func (drm *DNSRecordManager) CreateNamespaceRecords(ctx context.Context, namespaceName string, nodeIPs []string) error

CreateNamespaceRecords creates DNS A records for a namespace cluster. Each namespace gets records for ns-{namespace}.{baseDomain} pointing to its gateway nodes. Multiple A records enable round-robin DNS load balancing.

func (*DNSRecordManager) CreateTURNRecords

func (drm *DNSRecordManager) CreateTURNRecords(ctx context.Context, namespaceName string, turnIPs []string) error

CreateTURNRecords creates DNS A records for TURN servers. TURN records follow the pattern: turn.ns-{namespace}.{baseDomain} -> TURN node IPs

func (*DNSRecordManager) DeleteNamespaceRecords

func (drm *DNSRecordManager) DeleteNamespaceRecords(ctx context.Context, namespaceName string) error

DeleteNamespaceRecords deletes all DNS records for a namespace

func (*DNSRecordManager) DeleteTURNRecords

func (drm *DNSRecordManager) DeleteTURNRecords(ctx context.Context, namespaceName string) error

DeleteTURNRecords deletes all TURN DNS records for a namespace.

func (*DNSRecordManager) DisableNamespaceRecord

func (drm *DNSRecordManager) DisableNamespaceRecord(ctx context.Context, namespaceName, ip string) error

DisableNamespaceRecord marks a specific IP's record as inactive (for temporary failover)

func (*DNSRecordManager) EnableNamespaceRecord

func (drm *DNSRecordManager) EnableNamespaceRecord(ctx context.Context, namespaceName, ip string) error

EnableNamespaceRecord marks a specific IP's record as active (for recovery)

func (*DNSRecordManager) GetNamespaceGatewayIPs

func (drm *DNSRecordManager) GetNamespaceGatewayIPs(ctx context.Context, namespaceName string) ([]string, error)

GetNamespaceGatewayIPs returns the IP addresses for a namespace's gateway

func (*DNSRecordManager) UpdateNamespaceRecord

func (drm *DNSRecordManager) UpdateNamespaceRecord(ctx context.Context, namespaceName, oldIP, newIP string) error

UpdateNamespaceRecord updates a specific node's DNS record (for failover)

type EventType

type EventType string

EventType represents types of cluster lifecycle events

const (
	EventProvisioningStarted EventType = "provisioning_started"
	EventNodesSelected       EventType = "nodes_selected"
	EventPortsAllocated      EventType = "ports_allocated"
	EventRQLiteStarted       EventType = "rqlite_started"
	EventRQLiteJoined        EventType = "rqlite_joined"
	EventRQLiteLeaderElected EventType = "rqlite_leader_elected"
	EventOlricStarted        EventType = "olric_started"
	EventOlricJoined         EventType = "olric_joined"
	EventGatewayStarted      EventType = "gateway_started"
	EventDNSCreated          EventType = "dns_created"
	EventClusterReady        EventType = "cluster_ready"
	EventClusterDegraded     EventType = "cluster_degraded"
	EventClusterFailed       EventType = "cluster_failed"
	EventNodeFailed          EventType = "node_failed"
	EventNodeRecovered       EventType = "node_recovered"
	EventDeprovisionStarted  EventType = "deprovisioning_started"
	EventDeprovisioned       EventType = "deprovisioned"
	EventRecoveryStarted     EventType = "recovery_started"
	EventNodeReplaced        EventType = "node_replaced"
	EventRecoveryComplete    EventType = "recovery_complete"
	EventRecoveryFailed      EventType = "recovery_failed"
	EventWebRTCEnabled       EventType = "webrtc_enabled"
	EventWebRTCDisabled      EventType = "webrtc_disabled"
	EventSFUStarted          EventType = "sfu_started"
	EventSFUStopped          EventType = "sfu_stopped"
	EventTURNStarted         EventType = "turn_started"
	EventTURNStopped         EventType = "turn_stopped"
)

type NamespaceCluster

type NamespaceCluster struct {
	ID               string        `json:"id" db:"id"`
	NamespaceID      int           `json:"namespace_id" db:"namespace_id"`
	NamespaceName    string        `json:"namespace_name" db:"namespace_name"`
	Status           ClusterStatus `json:"status" db:"status"`
	RQLiteNodeCount  int           `json:"rqlite_node_count" db:"rqlite_node_count"`
	OlricNodeCount   int           `json:"olric_node_count" db:"olric_node_count"`
	GatewayNodeCount int           `json:"gateway_node_count" db:"gateway_node_count"`
	ProvisionedBy    string        `json:"provisioned_by" db:"provisioned_by"`
	ProvisionedAt    time.Time     `json:"provisioned_at" db:"provisioned_at"`
	ReadyAt          *time.Time    `json:"ready_at,omitempty" db:"ready_at"`
	LastHealthCheck  *time.Time    `json:"last_health_check,omitempty" db:"last_health_check"`
	ErrorMessage     string        `json:"error_message,omitempty" db:"error_message"`
	RetryCount       int           `json:"retry_count" db:"retry_count"`

	// Populated by queries, not stored directly
	Nodes []ClusterNode `json:"nodes,omitempty"`
}

NamespaceCluster represents a dedicated cluster for a namespace

type NamespacePortAllocator

type NamespacePortAllocator struct {
	// contains filtered or unexported fields
}

NamespacePortAllocator manages the reserved port range (10000-10099) for namespace services. Each namespace instance on a node gets a block of 5 consecutive ports.

func NewNamespacePortAllocator

func NewNamespacePortAllocator(db rqlite.Client, logger *zap.Logger) *NamespacePortAllocator

NewNamespacePortAllocator creates a new port allocator

func (*NamespacePortAllocator) AllocatePortBlock

func (npa *NamespacePortAllocator) AllocatePortBlock(ctx context.Context, nodeID, namespaceClusterID string) (*PortBlock, error)

AllocatePortBlock finds and allocates the next available 5-port block on a node. Returns an error if the node is at capacity (20 namespace instances).

func (*NamespacePortAllocator) DeallocateAllPortBlocks

func (npa *NamespacePortAllocator) DeallocateAllPortBlocks(ctx context.Context, namespaceClusterID string) error

DeallocateAllPortBlocks releases all port blocks for a namespace cluster

func (*NamespacePortAllocator) DeallocatePortBlock

func (npa *NamespacePortAllocator) DeallocatePortBlock(ctx context.Context, namespaceClusterID, nodeID string) error

DeallocatePortBlock releases a port block when a namespace is deprovisioned

func (*NamespacePortAllocator) GetAllPortBlocks

func (npa *NamespacePortAllocator) GetAllPortBlocks(ctx context.Context, namespaceClusterID string) ([]PortBlock, error)

GetAllPortBlocks retrieves all port blocks for a namespace cluster

func (*NamespacePortAllocator) GetNodeAllocationCount

func (npa *NamespacePortAllocator) GetNodeAllocationCount(ctx context.Context, nodeID string) (int, error)

GetNodeAllocationCount returns the number of namespace instances on a node

func (*NamespacePortAllocator) GetNodeCapacity

func (npa *NamespacePortAllocator) GetNodeCapacity(ctx context.Context, nodeID string) (int, error)

GetNodeCapacity returns how many more namespace instances a node can host

func (*NamespacePortAllocator) GetPortBlock

func (npa *NamespacePortAllocator) GetPortBlock(ctx context.Context, namespaceClusterID, nodeID string) (*PortBlock, error)

GetPortBlock retrieves the port block for a namespace on a specific node

type NodeCapacity

type NodeCapacity struct {
	NodeID                  string  `json:"node_id"`
	IPAddress               string  `json:"ip_address"`
	InternalIP              string  `json:"internal_ip"` // WireGuard IP for inter-node communication
	DeploymentCount         int     `json:"deployment_count"`
	AllocatedPorts          int     `json:"allocated_ports"`
	AvailablePorts          int     `json:"available_ports"`
	UsedMemoryMB            int     `json:"used_memory_mb"`
	AvailableMemoryMB       int     `json:"available_memory_mb"`
	UsedCPUPercent          int     `json:"used_cpu_percent"`
	NamespaceInstanceCount  int     `json:"namespace_instance_count"`  // Number of namespace clusters on this node
	AvailableNamespaceSlots int     `json:"available_namespace_slots"` // How many more namespace instances can fit
	Score                   float64 `json:"score"`
}

NodeCapacity represents the capacity metrics for a single node

type NodeRole

type NodeRole string

NodeRole represents the role of a node in a namespace cluster

const (
	NodeRoleRQLiteLeader   NodeRole = "rqlite_leader"
	NodeRoleRQLiteFollower NodeRole = "rqlite_follower"
	NodeRoleOlric          NodeRole = "olric"
	NodeRoleGateway        NodeRole = "gateway"
	NodeRoleSFU            NodeRole = "sfu"
	NodeRoleTURN           NodeRole = "turn"
)

type NodeStatus

type NodeStatus string

NodeStatus represents the status of a service on a node

const (
	NodeStatusPending  NodeStatus = "pending"
	NodeStatusStarting NodeStatus = "starting"
	NodeStatusRunning  NodeStatus = "running"
	NodeStatusStopped  NodeStatus = "stopped"
	NodeStatusFailed   NodeStatus = "failed"
)

type PortBlock

type PortBlock struct {
	ID                  string    `json:"id" db:"id"`
	NodeID              string    `json:"node_id" db:"node_id"`
	NamespaceClusterID  string    `json:"namespace_cluster_id" db:"namespace_cluster_id"`
	PortStart           int       `json:"port_start" db:"port_start"`
	PortEnd             int       `json:"port_end" db:"port_end"`
	RQLiteHTTPPort      int       `json:"rqlite_http_port" db:"rqlite_http_port"`
	RQLiteRaftPort      int       `json:"rqlite_raft_port" db:"rqlite_raft_port"`
	OlricHTTPPort       int       `json:"olric_http_port" db:"olric_http_port"`
	OlricMemberlistPort int       `json:"olric_memberlist_port" db:"olric_memberlist_port"`
	GatewayHTTPPort     int       `json:"gateway_http_port" db:"gateway_http_port"`
	AllocatedAt         time.Time `json:"allocated_at" db:"allocated_at"`
}

PortBlock represents an allocated block of ports for a namespace on a node

type ProvisioningResponse

type ProvisioningResponse struct {
	Status               string `json:"status"`
	ClusterID            string `json:"cluster_id"`
	PollURL              string `json:"poll_url"`
	EstimatedTimeSeconds int    `json:"estimated_time_seconds"`
}

ProvisioningResponse is returned when a new namespace triggers cluster provisioning

type SFUInstanceConfig

type SFUInstanceConfig struct {
	Namespace      string
	NodeID         string
	ListenAddr     string                 // WireGuard IP:port (e.g., "10.0.0.1:30000")
	MediaPortStart int                    // Start of RTP media port range
	MediaPortEnd   int                    // End of RTP media port range
	TURNServers    []sfu.TURNServerConfig // TURN servers to advertise to peers
	TURNSecret     string                 // HMAC-SHA1 shared secret
	TURNCredTTL    int                    // Credential TTL in seconds
	RQLiteDSN      string                 // Namespace-local RQLite DSN
}

SFUInstanceConfig holds configuration for spawning an SFU instance

type SystemdSpawner

type SystemdSpawner struct {
	// contains filtered or unexported fields
}

SystemdSpawner spawns namespace cluster processes using systemd services

func NewSystemdSpawner

func NewSystemdSpawner(namespaceBase string, logger *zap.Logger) *SystemdSpawner

NewSystemdSpawner creates a new systemd-based spawner

func (*SystemdSpawner) DeleteClusterState

func (s *SystemdSpawner) DeleteClusterState(namespace string) error

DeleteClusterState removes cluster state and config files for a namespace.

func (*SystemdSpawner) RestartGateway

func (s *SystemdSpawner) RestartGateway(ctx context.Context, namespace, nodeID string, cfg gateway.InstanceConfig) error

RestartGateway stops and re-spawns a Gateway instance with updated config. Used when gateway config changes at runtime (e.g., WebRTC enable/disable).

func (*SystemdSpawner) SaveClusterState

func (s *SystemdSpawner) SaveClusterState(namespace string, data []byte) error

SaveClusterState writes cluster state JSON to the namespace data directory. Used by the spawn handler to persist state received from the coordinator node.

func (*SystemdSpawner) SpawnGateway

func (s *SystemdSpawner) SpawnGateway(ctx context.Context, namespace, nodeID string, cfg gateway.InstanceConfig) error

SpawnGateway starts a Gateway instance using systemd

func (*SystemdSpawner) SpawnOlric

func (s *SystemdSpawner) SpawnOlric(ctx context.Context, namespace, nodeID string, cfg olric.InstanceConfig) error

SpawnOlric starts an Olric instance using systemd

func (*SystemdSpawner) SpawnRQLite

func (s *SystemdSpawner) SpawnRQLite(ctx context.Context, namespace, nodeID string, cfg rqlite.InstanceConfig) error

SpawnRQLite starts a RQLite instance using systemd

func (*SystemdSpawner) SpawnSFU

func (s *SystemdSpawner) SpawnSFU(ctx context.Context, namespace, nodeID string, cfg SFUInstanceConfig) error

SpawnSFU starts an SFU instance using systemd

func (*SystemdSpawner) SpawnTURN

func (s *SystemdSpawner) SpawnTURN(ctx context.Context, namespace, nodeID string, cfg TURNInstanceConfig) error

SpawnTURN starts a TURN instance using systemd

func (*SystemdSpawner) StopAll

func (s *SystemdSpawner) StopAll(ctx context.Context, namespace string) error

StopAll stops all services for a namespace, including deployment processes

func (*SystemdSpawner) StopGateway

func (s *SystemdSpawner) StopGateway(ctx context.Context, namespace, nodeID string) error

StopGateway stops a Gateway instance

func (*SystemdSpawner) StopOlric

func (s *SystemdSpawner) StopOlric(ctx context.Context, namespace, nodeID string) error

StopOlric stops an Olric instance

func (*SystemdSpawner) StopRQLite

func (s *SystemdSpawner) StopRQLite(ctx context.Context, namespace, nodeID string) error

StopRQLite stops a RQLite instance

func (*SystemdSpawner) StopSFU

func (s *SystemdSpawner) StopSFU(ctx context.Context, namespace, nodeID string) error

StopSFU stops an SFU instance

func (*SystemdSpawner) StopTURN

func (s *SystemdSpawner) StopTURN(ctx context.Context, namespace, nodeID string) error

StopTURN stops a TURN instance

type TURNInstanceConfig

type TURNInstanceConfig struct {
	Namespace       string
	NodeID          string
	ListenAddr      string // e.g., "0.0.0.0:3478"
	TURNSListenAddr string // e.g., "0.0.0.0:5349" (TURNS over TLS/TCP)
	PublicIP        string // Public IP for TURN relay allocations
	Realm           string // TURN realm (typically base domain)
	AuthSecret      string // HMAC-SHA1 shared secret
	RelayPortStart  int    // Start of relay port range
	RelayPortEnd    int    // End of relay port range
	TURNDomain      string // TURN domain for Let's Encrypt cert (e.g., "turn.ns-myapp.orama-devnet.network")
}

TURNInstanceConfig holds configuration for spawning a TURN instance

type WebRTCConfig

type WebRTCConfig struct {
	ID                 string     `json:"id" db:"id"`
	NamespaceClusterID string     `json:"namespace_cluster_id" db:"namespace_cluster_id"`
	NamespaceName      string     `json:"namespace_name" db:"namespace_name"`
	Enabled            bool       `json:"enabled" db:"enabled"`
	TURNSharedSecret   string     `json:"-" db:"turn_shared_secret"` // Never serialize secret to JSON
	TURNCredentialTTL  int        `json:"turn_credential_ttl" db:"turn_credential_ttl"`
	SFUNodeCount       int        `json:"sfu_node_count" db:"sfu_node_count"`
	TURNNodeCount      int        `json:"turn_node_count" db:"turn_node_count"`
	EnabledBy          string     `json:"enabled_by" db:"enabled_by"`
	EnabledAt          time.Time  `json:"enabled_at" db:"enabled_at"`
	DisabledAt         *time.Time `json:"disabled_at,omitempty" db:"disabled_at"`
}

WebRTCConfig represents the per-namespace WebRTC configuration stored in the database

type WebRTCPortAllocator

type WebRTCPortAllocator struct {
	// contains filtered or unexported fields
}

WebRTCPortAllocator manages port allocation for SFU and TURN services. Uses the webrtc_port_allocations table, separate from namespace_port_allocations, to avoid breaking existing port blocks.

func NewWebRTCPortAllocator

func NewWebRTCPortAllocator(db rqlite.Client, logger *zap.Logger) *WebRTCPortAllocator

NewWebRTCPortAllocator creates a new WebRTC port allocator

func (*WebRTCPortAllocator) AllocateSFUPorts

func (wpa *WebRTCPortAllocator) AllocateSFUPorts(ctx context.Context, nodeID, namespaceClusterID string) (*WebRTCPortBlock, error)

AllocateSFUPorts allocates SFU ports for a namespace on a node. Each namespace gets: 1 signaling port (30000-30099) + 500 media ports (20000-29999). Returns the existing allocation if one already exists (idempotent).

func (*WebRTCPortAllocator) AllocateTURNPorts

func (wpa *WebRTCPortAllocator) AllocateTURNPorts(ctx context.Context, nodeID, namespaceClusterID string) (*WebRTCPortBlock, error)

AllocateTURNPorts allocates TURN ports for a namespace on a node. Each namespace gets: standard listen ports (3478/443) + 800 relay ports (49152-65535). Returns the existing allocation if one already exists (idempotent).

func (*WebRTCPortAllocator) DeallocateAll

func (wpa *WebRTCPortAllocator) DeallocateAll(ctx context.Context, namespaceClusterID string) error

DeallocateAll releases all WebRTC port blocks for a namespace cluster.

func (*WebRTCPortAllocator) DeallocateByNode

func (wpa *WebRTCPortAllocator) DeallocateByNode(ctx context.Context, namespaceClusterID, nodeID, serviceType string) error

DeallocateByNode releases WebRTC port blocks for a specific node and service type.

func (*WebRTCPortAllocator) GetAllPorts

func (wpa *WebRTCPortAllocator) GetAllPorts(ctx context.Context, namespaceClusterID string) ([]WebRTCPortBlock, error)

GetAllPorts retrieves all WebRTC port blocks for a namespace cluster.

func (*WebRTCPortAllocator) GetSFUPorts

func (wpa *WebRTCPortAllocator) GetSFUPorts(ctx context.Context, namespaceClusterID, nodeID string) (*WebRTCPortBlock, error)

GetSFUPorts retrieves the SFU port allocation for a namespace on a node.

func (*WebRTCPortAllocator) GetTURNPorts

func (wpa *WebRTCPortAllocator) GetTURNPorts(ctx context.Context, namespaceClusterID, nodeID string) (*WebRTCPortBlock, error)

GetTURNPorts retrieves the TURN port allocation for a namespace on a node.

func (*WebRTCPortAllocator) NodeHasTURN

func (wpa *WebRTCPortAllocator) NodeHasTURN(ctx context.Context, nodeID string) (bool, error)

NodeHasTURN checks if a node already has a TURN allocation from any namespace. Used during node selection to avoid port conflicts on standard TURN ports (3478/443).

type WebRTCPortBlock

type WebRTCPortBlock struct {
	ID                 string `json:"id" db:"id"`
	NodeID             string `json:"node_id" db:"node_id"`
	NamespaceClusterID string `json:"namespace_cluster_id" db:"namespace_cluster_id"`
	ServiceType        string `json:"service_type" db:"service_type"` // "sfu" or "turn"

	// SFU ports
	SFUSignalingPort  int `json:"sfu_signaling_port,omitempty" db:"sfu_signaling_port"`
	SFUMediaPortStart int `json:"sfu_media_port_start,omitempty" db:"sfu_media_port_start"`
	SFUMediaPortEnd   int `json:"sfu_media_port_end,omitempty" db:"sfu_media_port_end"`

	// TURN ports
	TURNListenPort     int `json:"turn_listen_port,omitempty" db:"turn_listen_port"`
	TURNTLSPort        int `json:"turn_tls_port,omitempty" db:"turn_tls_port"`
	TURNRelayPortStart int `json:"turn_relay_port_start,omitempty" db:"turn_relay_port_start"`
	TURNRelayPortEnd   int `json:"turn_relay_port_end,omitempty" db:"turn_relay_port_end"`

	AllocatedAt time.Time `json:"allocated_at" db:"allocated_at"`
}

WebRTCPortBlock represents allocated WebRTC ports for a namespace on a node

type WebRTCRoom

type WebRTCRoom struct {
	ID                 string    `json:"id" db:"id"`
	NamespaceClusterID string    `json:"namespace_cluster_id" db:"namespace_cluster_id"`
	NamespaceName      string    `json:"namespace_name" db:"namespace_name"`
	RoomID             string    `json:"room_id" db:"room_id"`
	SFUNodeID          string    `json:"sfu_node_id" db:"sfu_node_id"`
	SFUInternalIP      string    `json:"sfu_internal_ip" db:"sfu_internal_ip"`
	SFUSignalingPort   int       `json:"sfu_signaling_port" db:"sfu_signaling_port"`
	ParticipantCount   int       `json:"participant_count" db:"participant_count"`
	MaxParticipants    int       `json:"max_participants" db:"max_participants"`
	CreatedAt          time.Time `json:"created_at" db:"created_at"`
	LastActivity       time.Time `json:"last_activity" db:"last_activity"`
}

WebRTCRoom represents an active WebRTC room tracked in the database

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL