cluster

package
v0.3.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
// Cluster roles: the string values that identify a node's role in the cluster.
const (
	// RoleManagement is the role value of the management node.
	RoleManagement = "management"
	// RoleWorker is the role value of a worker node.
	RoleWorker     = "worker"
)

Cluster roles.

Variables

This section is empty.

Functions

This section is empty.

Types

type ClusterConfig

// ClusterConfig holds configuration for cluster mode.
// Each field is annotated with the CLUSTER_* environment variable it is
// populated from, together with the documented default where one exists.
type ClusterConfig struct {
	Enabled      bool          // CLUSTER_ENABLED
	Role         string        // CLUSTER_ROLE: "management" | "worker"
	NodeName     string        // CLUSTER_NODE_NAME
	NodeURL      string        // CLUSTER_NODE_URL (this node's IPC endpoint)
	Secret       string        // CLUSTER_SECRET (inter-node auth)
	Heartbeat    time.Duration // CLUSTER_HEARTBEAT (default: 30s)
	GhostTTL     time.Duration // CLUSTER_GHOST_TTL (default: 2m)
	PollInterval time.Duration // CLUSTER_POLL_INTERVAL (default: 30s)
}

ClusterConfig holds configuration for cluster mode. All fields are populated from CLUSTER_* environment variables.

func (ClusterConfig) IsManagement

func (c ClusterConfig) IsManagement() bool

IsManagement returns true if this node is the management node.

func (ClusterConfig) IsWorker

func (c ClusterConfig) IsWorker() bool

IsWorker returns true if this node is a worker node.

type ClusterResult

// ClusterResult is the aggregated result of a broadcast operation.
type ClusterResult struct {
	// Success is the overall outcome flag, serialized as "success".
	// NOTE(review): presumably true only when every node succeeded — confirm
	// against the Broadcast implementation.
	Success bool         `json:"success"`
	// Results holds one NodeResult per node, serialized as "results".
	Results []NodeResult `json:"results"`
}

ClusterResult is the aggregated result of a broadcast operation.

type Coordinator

// Coordinator manages cluster operations on the management node.
// Its fields are unexported; create one with NewCoordinator.
type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator manages cluster operations on the management node. It reads the node registry via GraphQL, broadcasts commands to workers via GraphQL/IPC, and manages heartbeat/ghost cleanup.

func NewCoordinator

func NewCoordinator(config ClusterConfig, qe types.Querier) *Coordinator

NewCoordinator creates a new cluster coordinator for the management node.

func (*Coordinator) ActiveWorkers

func (c *Coordinator) ActiveWorkers(ctx context.Context) ([]NodeInfo, error)

ActiveWorkers reads active worker nodes via GraphQL.

func (*Coordinator) Broadcast

func (c *Coordinator) Broadcast(ctx context.Context, query string, vars map[string]any) *ClusterResult

Broadcast sends a GraphQL mutation to all active workers in parallel and returns the per-node results.

func (*Coordinator) LoadSource

func (c *Coordinator) LoadSource(ctx context.Context, name string) (*ClusterResult, error)

LoadSource compiles a source on management, increments schema version, and broadcasts handle_source_load to all workers.

func (*Coordinator) RegisterStorage

func (c *Coordinator) RegisterStorage(ctx context.Context, params StorageParams) (*ClusterResult, error)

RegisterStorage calls the storage module to register a secret locally, then broadcasts secret sync to all workers.

func (*Coordinator) ReloadSource

func (c *Coordinator) ReloadSource(ctx context.Context, name string) (*ClusterResult, error)

ReloadSource reloads a source on management and broadcasts cache invalidation.

func (*Coordinator) UnloadSource

func (c *Coordinator) UnloadSource(ctx context.Context, name string) (*ClusterResult, error)

UnloadSource unloads a source on management, increments schema version, and broadcasts handle_source_unload to all workers.

func (*Coordinator) UnregisterStorage

func (c *Coordinator) UnregisterStorage(ctx context.Context, name string) (*ClusterResult, error)

UnregisterStorage calls the storage module to drop a secret locally, then broadcasts secret sync to all workers.

type NodeInfo

// NodeInfo represents a registered cluster node.
type NodeInfo struct {
	// Name is the node's name, serialized as "name".
	Name          string    `json:"name"`
	// URL is the node's URL, serialized as "url".
	// NOTE(review): presumably the node's IPC endpoint (cf. CLUSTER_NODE_URL) — confirm.
	URL           string    `json:"url"`
	// Role is the node's role, serialized as "role".
	// NOTE(review): presumably one of RoleManagement / RoleWorker — confirm.
	Role          string    `json:"role"`
	// Version is serialized as "version".
	// NOTE(review): unclear from here whether this is a build or schema version — confirm.
	Version       string    `json:"version"`
	// StartedAt is serialized as "started_at".
	StartedAt     time.Time `json:"started_at"`
	// LastHeartbeat is serialized as "last_heartbeat".
	LastHeartbeat time.Time `json:"last_heartbeat"`
	// Error is serialized as "error"; the tag has no omitempty, so the key is
	// emitted even when the string is empty.
	Error         string    `json:"error"`
}

NodeInfo represents a registered cluster node.

type NodeResult

// NodeResult is the per-node result of a broadcast operation.
type NodeResult struct {
	// Node identifies the node this result belongs to, serialized as "node".
	// NOTE(review): presumably the node's name — confirm.
	Node    string `json:"node"`
	// Success reports the outcome for this node, serialized as "success".
	Success bool   `json:"success"`
	// Error carries an error message for this node; it is omitted from the
	// JSON output when empty (omitempty).
	Error   string `json:"error,omitempty"`
}

NodeResult is the per-node result of a broadcast operation.

type Source

// Source is the cluster RuntimeSource.
// Its fields are unexported; create one with NewSource.
type Source struct {
	// contains filtered or unexported fields
}

Source is the cluster RuntimeSource. It exposes cluster queries and mutations via GraphQL. It is role-aware: the management node executes operations locally and then broadcasts to workers, while worker nodes forward mutations to the management node.

func NewSource

func NewSource(config ClusterConfig, qe types.Querier, provider *catalogdb.Provider) *Source

NewSource creates a new cluster runtime source.

func (*Source) AsModule

func (*Source) AsModule() bool

func (*Source) Attach

func (s *Source) Attach(ctx context.Context, pool *db.Pool) error

func (*Source) Catalog

func (s *Source) Catalog(ctx context.Context) (cs.Catalog, error)

func (*Source) Close

func (s *Source) Close() error

Close stops background goroutines.

func (*Source) Engine

func (*Source) Engine() engines.Engine

func (*Source) IsReadonly

func (*Source) IsReadonly() bool

func (*Source) Name

func (*Source) Name() string

func (*Source) WaitRegistered

func (s *Source) WaitRegistered(ctx context.Context) error

WaitRegistered blocks until the node has registered itself in the cluster or the context is cancelled.

type StorageParams

// StorageParams holds parameters for registering an object storage secret.
//
// NOTE(review): the field names look like they mirror the options of a DuckDB
// object-storage secret (type, key, secret, region, endpoint, SSL, URL style);
// confirm against the storage module before relying on that mapping.
type StorageParams struct {
	Type     string
	Name     string
	Scope    string
	Key      string
	Secret   string
	Region   string
	Endpoint string
	UseSSL   bool
	URLStyle string
}

StorageParams holds parameters for registering an object storage secret.

type WorkerClient

// WorkerClient handles cluster operations on the worker side.
// Its fields are unexported; create one with NewWorkerClient.
type WorkerClient struct {
	// contains filtered or unexported fields
}

WorkerClient handles cluster operations on the worker side: secret sync from management, mutation forwarding, and schema version polling.

func NewWorkerClient

func NewWorkerClient(config ClusterConfig, qe types.Querier, pool *db.Pool) *WorkerClient

NewWorkerClient creates a new worker client.

func (*WorkerClient) ForwardToManagement

func (w *WorkerClient) ForwardToManagement(ctx context.Context, query string, vars map[string]any) (*types.OperationResult, error)

ForwardToManagement sends a GraphQL mutation to the management node and returns the OperationResult.

func (*WorkerClient) HandleSourceLoad

func (w *WorkerClient) HandleSourceLoad(ctx context.Context, name string) error

HandleSourceLoad attaches a data source without compilation (broadcast target). Schema compilation is skipped because the datasources.Service has skipCatalogOps=true on worker nodes — schemas are managed by the management node.

func (*WorkerClient) HandleSourceUnload

func (w *WorkerClient) HandleSourceUnload(ctx context.Context, name string) error

HandleSourceUnload detaches a data source (broadcast target).

func (*WorkerClient) ManagementURL

func (w *WorkerClient) ManagementURL(ctx context.Context) (string, error)

ManagementURL discovers and caches the management node URL via GraphQL. Uses sync.Once to avoid races when called from multiple goroutines.

func (*WorkerClient) SyncSecrets

func (w *WorkerClient) SyncSecrets(ctx context.Context) error

SyncSecrets queries management's duckdb_secrets via GraphQL and creates them locally in this node's DuckDB using CREATE OR REPLACE PERSISTENT SECRET.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL