Documentation
¶
Index ¶
- Constants
- type ClusterConfig
- func (c ClusterConfig) IsManagement() bool
- func (c ClusterConfig) IsWorker() bool
- type ClusterResult
- type Coordinator
- func NewCoordinator(config ClusterConfig, qe types.Querier) *Coordinator
- func (c *Coordinator) ActiveWorkers(ctx context.Context) ([]NodeInfo, error)
- func (c *Coordinator) Broadcast(ctx context.Context, query string, vars map[string]any) *ClusterResult
- func (c *Coordinator) LoadSource(ctx context.Context, name string) (*ClusterResult, error)
- func (c *Coordinator) RegisterStorage(ctx context.Context, params StorageParams) (*ClusterResult, error)
- func (c *Coordinator) ReloadSource(ctx context.Context, name string) (*ClusterResult, error)
- func (c *Coordinator) UnloadSource(ctx context.Context, name string) (*ClusterResult, error)
- func (c *Coordinator) UnregisterStorage(ctx context.Context, name string) (*ClusterResult, error)
- type NodeInfo
- type NodeResult
- type Source
- func (*Source) AsModule() bool
- func (s *Source) Attach(ctx context.Context, pool *db.Pool) error
- func (s *Source) Catalog(ctx context.Context) (cs.Catalog, error)
- func (s *Source) Close() error
- func (*Source) Engine() engines.Engine
- func (*Source) IsReadonly() bool
- func (*Source) Name() string
- func (s *Source) WaitRegistered(ctx context.Context) error
- type StorageParams
- type WorkerClient
- func NewWorkerClient(config ClusterConfig, qe types.Querier, pool *db.Pool) *WorkerClient
- func (w *WorkerClient) ForwardToManagement(ctx context.Context, query string, vars map[string]any) (*types.OperationResult, error)
- func (w *WorkerClient) HandleSourceLoad(ctx context.Context, name string) error
- func (w *WorkerClient) HandleSourceUnload(ctx context.Context, name string) error
- func (w *WorkerClient) ManagementURL(ctx context.Context) (string, error)
- func (w *WorkerClient) SyncSecrets(ctx context.Context) error
Constants ¶
const (
	RoleManagement = "management"
	RoleWorker     = "worker"
)
Cluster roles.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ClusterConfig ¶
type ClusterConfig struct {
Enabled bool // CLUSTER_ENABLED
Role string // CLUSTER_ROLE: "management" | "worker"
NodeName string // CLUSTER_NODE_NAME
NodeURL string // CLUSTER_NODE_URL (this node's IPC endpoint)
Secret string // CLUSTER_SECRET (inter-node auth)
Heartbeat time.Duration // CLUSTER_HEARTBEAT (default: 30s)
GhostTTL time.Duration // CLUSTER_GHOST_TTL (default: 2m)
PollInterval time.Duration // CLUSTER_POLL_INTERVAL (default: 30s)
}
ClusterConfig holds configuration for cluster mode. All fields are populated from CLUSTER_* environment variables.
func (ClusterConfig) IsManagement ¶
func (c ClusterConfig) IsManagement() bool
IsManagement returns true if this node is the management node.
func (ClusterConfig) IsWorker ¶
func (c ClusterConfig) IsWorker() bool
IsWorker returns true if this node is a worker node.
type ClusterResult ¶
type ClusterResult struct {
Success bool `json:"success"`
Results []NodeResult `json:"results"`
}
ClusterResult is the aggregated result of a broadcast operation.
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
Coordinator manages cluster operations on the management node. It reads the node registry via GraphQL, broadcasts commands to workers via GraphQL/IPC, and manages heartbeat/ghost cleanup.
func NewCoordinator ¶
func NewCoordinator(config ClusterConfig, qe types.Querier) *Coordinator
NewCoordinator creates a new cluster coordinator for the management node.
func (*Coordinator) ActiveWorkers ¶
func (c *Coordinator) ActiveWorkers(ctx context.Context) ([]NodeInfo, error)
ActiveWorkers reads active worker nodes via GraphQL.
func (*Coordinator) Broadcast ¶
func (c *Coordinator) Broadcast(ctx context.Context, query string, vars map[string]any) *ClusterResult
Broadcast sends a GraphQL mutation to all active workers in parallel. It returns a ClusterResult containing the per-node results.
func (*Coordinator) LoadSource ¶
func (c *Coordinator) LoadSource(ctx context.Context, name string) (*ClusterResult, error)
LoadSource compiles a source on the management node, increments the schema version, and broadcasts handle_source_load to all workers.
func (*Coordinator) RegisterStorage ¶
func (c *Coordinator) RegisterStorage(ctx context.Context, params StorageParams) (*ClusterResult, error)
RegisterStorage calls the storage module to register a secret locally, then broadcasts secret sync to all workers.
func (*Coordinator) ReloadSource ¶
func (c *Coordinator) ReloadSource(ctx context.Context, name string) (*ClusterResult, error)
ReloadSource reloads a source on the management node and broadcasts cache invalidation to workers.
func (*Coordinator) UnloadSource ¶
func (c *Coordinator) UnloadSource(ctx context.Context, name string) (*ClusterResult, error)
UnloadSource unloads a source on the management node, increments the schema version, and broadcasts handle_source_unload to all workers.
func (*Coordinator) UnregisterStorage ¶
func (c *Coordinator) UnregisterStorage(ctx context.Context, name string) (*ClusterResult, error)
UnregisterStorage calls the storage module to drop a secret locally, then broadcasts secret sync to all workers.
type NodeInfo ¶
type NodeInfo struct {
Name string `json:"name"`
URL string `json:"url"`
Role string `json:"role"`
Version string `json:"version"`
StartedAt time.Time `json:"started_at"`
LastHeartbeat time.Time `json:"last_heartbeat"`
Error string `json:"error"`
}
NodeInfo represents a registered cluster node.
type NodeResult ¶
type NodeResult struct {
Node string `json:"node"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
}
NodeResult is the per-node result of a broadcast operation.
type Source ¶
type Source struct {
// contains filtered or unexported fields
}
Source is the cluster RuntimeSource. It exposes cluster queries and mutations via GraphQL. It is role-aware: the management node executes operations locally and then broadcasts them, while workers forward mutations to the management node.
func (*Source) IsReadonly ¶
func (*Source) IsReadonly() bool
type StorageParams ¶
type StorageParams struct {
Type string
Name string
Scope string
Key string
Secret string
Region string
Endpoint string
UseSSL bool
URLStyle string
}
StorageParams holds parameters for registering an object storage secret.
type WorkerClient ¶
type WorkerClient struct {
// contains filtered or unexported fields
}
WorkerClient handles cluster operations on the worker side: secret sync from management, mutation forwarding, and schema version polling.
func NewWorkerClient ¶
func NewWorkerClient(config ClusterConfig, qe types.Querier, pool *db.Pool) *WorkerClient
NewWorkerClient creates a new worker client.
func (*WorkerClient) ForwardToManagement ¶
func (w *WorkerClient) ForwardToManagement(ctx context.Context, query string, vars map[string]any) (*types.OperationResult, error)
ForwardToManagement sends a GraphQL mutation to the management node and returns the OperationResult.
func (*WorkerClient) HandleSourceLoad ¶
func (w *WorkerClient) HandleSourceLoad(ctx context.Context, name string) error
HandleSourceLoad attaches a data source without compilation (broadcast target). Schema compilation is skipped because the datasources.Service has skipCatalogOps=true on worker nodes — schemas are managed by the management node.
func (*WorkerClient) HandleSourceUnload ¶
func (w *WorkerClient) HandleSourceUnload(ctx context.Context, name string) error
HandleSourceUnload detaches a data source (broadcast target).
func (*WorkerClient) ManagementURL ¶
func (w *WorkerClient) ManagementURL(ctx context.Context) (string, error)
ManagementURL discovers and caches the management node URL via GraphQL. It uses sync.Once to avoid races when called from multiple goroutines.
func (*WorkerClient) SyncSecrets ¶
func (w *WorkerClient) SyncSecrets(ctx context.Context) error
SyncSecrets queries the management node's duckdb_secrets via GraphQL and creates them locally in this node's DuckDB using CREATE OR REPLACE PERSISTENT SECRET.