Documentation ¶
Index ¶
- Constants
- Variables
- func SendGossip(ctx context.Context, req *GossipMigration, kvStore kv.Store) error
- type ErrStealTableOwnershipFailed
- type HealthChecker
- type HealthStats
- type KVConsensusAdapter
- type ManagedNode
- type MigrationRepository
- type MigrationRepositoryKV
- func (m *MigrationRepositoryKV) AddGossipMigration(migration *Migration) error
- func (m *MigrationRepositoryKV) AddMigration(migration *Migration) error
- func (m *MigrationRepositoryKV) CommitAllMigrations(table string) error
- func (m *MigrationRepositoryKV) CommitMigrationExact(version *MigrationVersion) error
- func (m *MigrationRepositoryKV) DeleteMigration(version *MigrationVersion) error
- func (m *MigrationRepositoryKV) GetMigrationVersion(version *MigrationVersion) ([]*Migration, error)
- func (m *MigrationRepositoryKV) GetMigrationsByTable(tableName string) ([]*Migration, error)
- func (m *MigrationRepositoryKV) GetNextVersion(table string) (int64, error)
- func (m *MigrationRepositoryKV) GetUncommittedMigrations(table *Table) ([]*Migration, error)
- type NodeConnectionManager
- func (ncm *NodeConnectionManager) AddNode(ctx context.Context, node *Node) error
- func (ncm *NodeConnectionManager) ExecuteOnNode(nodeID int64, operation func(ConsensusClient) error) error
- func (ncm *NodeConnectionManager) GetActiveNodesByRegion(region string) []*ManagedNode
- func (ncm *NodeConnectionManager) GetAllActiveNodes() map[string][]*ManagedNode
- func (ncm *NodeConnectionManager) Shutdown()
- type NodeRepository
- type NodeRepositoryKV
- func (n *NodeRepositoryKV) AddNode(node *Node) error
- func (n *NodeRepositoryKV) DeleteNode(nodeID int64) error
- func (n *NodeRepositoryKV) GetNodeByAddress(address string, port uint) (*Node, error)
- func (n *NodeRepositoryKV) GetNodeById(id int64) (*Node, error)
- func (n *NodeRepositoryKV) GetNodesByRegion(region string) ([]*Node, error)
- func (n *NodeRepositoryKV) GetRandomNodes(num int64, excluding ...int64) ([]*Node, error)
- func (n *NodeRepositoryKV) GetRegions() ([]*Region, error)
- func (n *NodeRepositoryKV) Iterate(fn func(*Node) error) error
- func (n *NodeRepositoryKV) TotalCount() (int64, error)
- func (n *NodeRepositoryKV) UpdateNode(node *Node) error
- type NodeStatus
- type NodeStorageModel
- type NodeStorageModelKV
- type Quorum
- type QuorumManager
- type QuorumNode
- func (q *QuorumNode) AcceptMigration(ctx context.Context, in *WriteMigrationRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
- func (q *QuorumNode) Close()
- func (q *QuorumNode) Gossip(ctx context.Context, in *GossipMigration, opts ...grpc.CallOption) (*emptypb.Empty, error)
- func (q *QuorumNode) JoinCluster(ctx context.Context, in *Node, opts ...grpc.CallOption) (*JoinClusterResponse, error)
- func (q *QuorumNode) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error)
- func (q *QuorumNode) ReadKey(ctx context.Context, in *ReadKeyRequest, opts ...grpc.CallOption) (*ReadKeyResponse, error)
- func (q *QuorumNode) StealTableOwnership(ctx context.Context, in *StealTableOwnershipRequest, opts ...grpc.CallOption) (*StealTableOwnershipResponse, error)
- func (q *QuorumNode) WriteKey(ctx context.Context, in *WriteKeyRequest, opts ...grpc.CallOption) (*WriteKeyResponse, error)
- func (q *QuorumNode) WriteMigration(ctx context.Context, in *WriteMigrationRequest, opts ...grpc.CallOption) (*WriteMigrationResponse, error)
- type RegionName
- type Server
- func (s *Server) AcceptMigration(ctx context.Context, req *WriteMigrationRequest) (*emptypb.Empty, error)
- func (s *Server) Gossip(ctx context.Context, req *GossipMigration) (*emptypb.Empty, error)
- func (s *Server) JoinCluster(ctx context.Context, req *Node) (*JoinClusterResponse, error)
- func (s *Server) Ping(ctx context.Context, req *PingRequest) (*PingResponse, error)
- func (s *Server) ReadKey(ctx context.Context, req *ReadKeyRequest) (*ReadKeyResponse, error)
- func (s *Server) StealTableOwnership(ctx context.Context, req *StealTableOwnershipRequest) (*StealTableOwnershipResponse, error)
- func (s *Server) WriteKey(ctx context.Context, req *WriteKeyRequest) (*WriteKeyResponse, error)
- func (s *Server) WriteMigration(ctx context.Context, req *WriteMigrationRequest) (*WriteMigrationResponse, error)
- type TableRepository
- type TableRepositoryKV
- func (r *TableRepositoryKV) GetGroup(name string) (*TableGroup, error)
- func (r *TableRepositoryKV) GetShard(shard *Table, principals []*Principal) (*Shard, error)
- func (r *TableRepositoryKV) GetTable(name string) (*Table, error)
- func (r *TableRepositoryKV) GetTablesByReplicationLevel(level ReplicationLevel) ([]*Table, error)
- func (r *TableRepositoryKV) GetTablesOwnedByNode(nodeID int64) ([]*Table, error)
- func (r *TableRepositoryKV) InsertGroup(group *TableGroup) error
- func (r *TableRepositoryKV) InsertShard(shard *Shard) error
- func (r *TableRepositoryKV) InsertTable(table *Table) error
- func (r *TableRepositoryKV) UpdateGroup(group *TableGroup) error
- func (r *TableRepositoryKV) UpdateShard(shard *Shard) error
- func (r *TableRepositoryKV) UpdateTable(table *Table) error
- type TableStorageModel
Constants ¶
const NodeTable = "atlas.nodes"
Variables ¶
var ErrCannotChangeReplicationLevel = errors.New("cannot change replication level of a table")
var ErrCannotStealGroupOwnership = errors.New("cannot steal ownership of a table in a group")
var ErrKVPoolNotInitialized = errors.New("KV pool not initialized")
var ErrMetadataStoreClosed = errors.New("metadata store closed")
var ErrTablePolicyViolation = errors.New("table policy violation")
Functions ¶
Types ¶
type ErrStealTableOwnershipFailed ¶
type ErrStealTableOwnershipFailed struct {
Table *Table
}
func (ErrStealTableOwnershipFailed) Error ¶
func (e ErrStealTableOwnershipFailed) Error() string
type HealthChecker ¶
type HealthChecker struct {
// contains filtered or unexported fields
}
HealthChecker monitors node health and maintains active node lists
func NewHealthChecker ¶
func NewHealthChecker(manager *NodeConnectionManager) *HealthChecker
func NewHealthCheckerForTesting ¶
func NewHealthCheckerForTesting(manager *NodeConnectionManager) *HealthChecker
NewHealthCheckerForTesting creates a health checker with no jitter for testing
func (*HealthChecker) GetHealthStats ¶
func (hc *HealthChecker) GetHealthStats() *HealthStats
func (*HealthChecker) Start ¶
func (hc *HealthChecker) Start(ctx context.Context)
Start begins the health checking routine
type HealthStats ¶
type HealthStats struct {
TotalNodes int64 `json:"total_nodes"`
ActiveNodes int64 `json:"active_nodes"`
FailedNodes int64 `json:"failed_nodes"`
RegionStats map[string]int64 `json:"region_stats"`
AverageRTT time.Duration `json:"average_rtt"`
LastCheckTime time.Time `json:"last_check_time"`
}
HealthStats holds the health statistics for monitoring that GetHealthStats returns.
type KVConsensusAdapter ¶
type KVConsensusAdapter struct {
// contains filtered or unexported fields
}
KVConsensusAdapter provides a clean interface for KV operations using the existing w-paxos consensus
func NewKVConsensusAdapter ¶
func NewKVConsensusAdapter(server *Server, kvStore kv.Store) *KVConsensusAdapter
NewKVConsensusAdapter creates a new KV consensus adapter
type ManagedNode ¶
type ManagedNode struct {
*Node
// contains filtered or unexported fields
}
ManagedNode represents a node with its connection state
func (*ManagedNode) AddRTTMeasurement ¶
func (m *ManagedNode) AddRTTMeasurement(rtt time.Duration)
func (*ManagedNode) Close ¶
func (m *ManagedNode) Close()
func (*ManagedNode) GetAverageRTT ¶
func (m *ManagedNode) GetAverageRTT() time.Duration
func (*ManagedNode) GetFailures ¶
func (m *ManagedNode) GetFailures() int64
func (*ManagedNode) GetStatus ¶
func (m *ManagedNode) GetStatus() NodeStatus
func (*ManagedNode) UpdateStatus ¶
func (m *ManagedNode) UpdateStatus(status NodeStatus)
type MigrationRepository ¶
type MigrationRepository interface {
// GetUncommittedMigrations returns all uncommitted migrations -- from when this node was part of a previous quorum -- for a given table.
GetUncommittedMigrations(table *Table) ([]*Migration, error)
// AddMigration adds a migration to the migration table.
AddMigration(migration *Migration) error
// GetMigrationVersion returns all migrations for a given version.
GetMigrationVersion(version *MigrationVersion) ([]*Migration, error)
// CommitAllMigrations commits all migrations for a given table.
CommitAllMigrations(table string) error
// CommitMigrationExact commits a migration for a given version.
CommitMigrationExact(version *MigrationVersion) error
// AddGossipMigration adds a migration to the migration table as a gossiped migration.
AddGossipMigration(migration *Migration) error
// GetNextVersion returns the next version for a given table.
GetNextVersion(table string) (int64, error)
}
MigrationRepository is an interface that allows getting and maintaining migrations, which are uniquely identified by a table, table version, migration version, and sender.
func NewMigrationRepositoryKV ¶
func NewMigrationRepositoryKV(ctx context.Context, store kv.Store) MigrationRepository
NewMigrationRepositoryKV creates a new KV-based migration repository
type MigrationRepositoryKV ¶
type MigrationRepositoryKV struct {
// contains filtered or unexported fields
}
MigrationRepositoryKV implements MigrationRepository using key-value operations
func (*MigrationRepositoryKV) AddGossipMigration ¶
func (m *MigrationRepositoryKV) AddGossipMigration(migration *Migration) error
func (*MigrationRepositoryKV) AddMigration ¶
func (m *MigrationRepositoryKV) AddMigration(migration *Migration) error
func (*MigrationRepositoryKV) CommitAllMigrations ¶
func (m *MigrationRepositoryKV) CommitAllMigrations(table string) error
func (*MigrationRepositoryKV) CommitMigrationExact ¶
func (m *MigrationRepositoryKV) CommitMigrationExact(version *MigrationVersion) error
func (*MigrationRepositoryKV) DeleteMigration ¶
func (m *MigrationRepositoryKV) DeleteMigration(version *MigrationVersion) error
DeleteMigration removes a migration (useful for cleanup)
func (*MigrationRepositoryKV) GetMigrationVersion ¶
func (m *MigrationRepositoryKV) GetMigrationVersion(version *MigrationVersion) ([]*Migration, error)
func (*MigrationRepositoryKV) GetMigrationsByTable ¶
func (m *MigrationRepositoryKV) GetMigrationsByTable(tableName string) ([]*Migration, error)
GetMigrationsByTable provides efficient querying of migrations by table
func (*MigrationRepositoryKV) GetNextVersion ¶
func (m *MigrationRepositoryKV) GetNextVersion(table string) (int64, error)
func (*MigrationRepositoryKV) GetUncommittedMigrations ¶
func (m *MigrationRepositoryKV) GetUncommittedMigrations(table *Table) ([]*Migration, error)
type NodeConnectionManager ¶
type NodeConnectionManager struct {
// contains filtered or unexported fields
}
NodeConnectionManager centralizes all node connection management
func GetNodeConnectionManager ¶
func GetNodeConnectionManager(ctx context.Context) *NodeConnectionManager
GetNodeConnectionManager returns the singleton connection manager
func (*NodeConnectionManager) AddNode ¶
func (ncm *NodeConnectionManager) AddNode(ctx context.Context, node *Node) error
AddNode registers a new node and attempts connection
func (*NodeConnectionManager) ExecuteOnNode ¶
func (ncm *NodeConnectionManager) ExecuteOnNode(nodeID int64, operation func(ConsensusClient) error) error
ExecuteOnNode executes a consensus operation on a specific node
func (*NodeConnectionManager) GetActiveNodesByRegion ¶
func (ncm *NodeConnectionManager) GetActiveNodesByRegion(region string) []*ManagedNode
GetActiveNodesByRegion returns currently reachable nodes in a region
func (*NodeConnectionManager) GetAllActiveNodes ¶
func (ncm *NodeConnectionManager) GetAllActiveNodes() map[string][]*ManagedNode
GetAllActiveNodes returns all currently reachable nodes
func (*NodeConnectionManager) Shutdown ¶
func (ncm *NodeConnectionManager) Shutdown()
Shutdown gracefully closes all connections and stops background processes
type NodeRepository ¶
type NodeRepository interface {
GetNodeById(id int64) (*Node, error)
GetNodeByAddress(address string, port uint) (*Node, error)
GetNodesByRegion(region string) ([]*Node, error)
GetRegions() ([]*Region, error)
Iterate(fn func(*Node) error) error
TotalCount() (int64, error)
GetRandomNodes(num int64, excluding ...int64) ([]*Node, error)
}
func NewNodeRepositoryKV ¶
func NewNodeRepositoryKV(ctx context.Context, store kv.Store) NodeRepository
NewNodeRepositoryKV creates a new KV-based node repository
type NodeRepositoryKV ¶
type NodeRepositoryKV struct {
// contains filtered or unexported fields
}
NodeRepositoryKV implements NodeRepository using key-value operations
func (*NodeRepositoryKV) AddNode ¶
func (n *NodeRepositoryKV) AddNode(node *Node) error
AddNode adds a new node to the repository
func (*NodeRepositoryKV) DeleteNode ¶
func (n *NodeRepositoryKV) DeleteNode(nodeID int64) error
DeleteNode removes a node from the repository
func (*NodeRepositoryKV) GetNodeByAddress ¶
func (n *NodeRepositoryKV) GetNodeByAddress(address string, port uint) (*Node, error)
func (*NodeRepositoryKV) GetNodeById ¶
func (n *NodeRepositoryKV) GetNodeById(id int64) (*Node, error)
func (*NodeRepositoryKV) GetNodesByRegion ¶
func (n *NodeRepositoryKV) GetNodesByRegion(region string) ([]*Node, error)
func (*NodeRepositoryKV) GetRandomNodes ¶
func (n *NodeRepositoryKV) GetRandomNodes(num int64, excluding ...int64) ([]*Node, error)
func (*NodeRepositoryKV) GetRegions ¶
func (n *NodeRepositoryKV) GetRegions() ([]*Region, error)
func (*NodeRepositoryKV) Iterate ¶
func (n *NodeRepositoryKV) Iterate(fn func(*Node) error) error
func (*NodeRepositoryKV) TotalCount ¶
func (n *NodeRepositoryKV) TotalCount() (int64, error)
func (*NodeRepositoryKV) UpdateNode ¶
func (n *NodeRepositoryKV) UpdateNode(node *Node) error
UpdateNode updates an existing node
type NodeStatus ¶
type NodeStatus int
const (
NodeStatusUnknown NodeStatus = iota
NodeStatusConnecting
NodeStatusActive
NodeStatusFailed
NodeStatusRemoved
)
type NodeStorageModel ¶
type NodeStorageModel struct {
ID int64 `json:"id"`
Address string `json:"address"`
Port int64 `json:"port"`
Region string `json:"region"`
Active bool `json:"active"`
RTT int64 `json:"rtt_nanoseconds"`
CreatedAt time.Time `json:"created_at"`
}
NodeStorageModel represents node data in KV storage
type NodeStorageModelKV ¶
type NodeStorageModelKV struct {
ID int64 `json:"id"`
Address string `json:"address"`
Port int64 `json:"port"`
Region string `json:"region"`
Active bool `json:"active"`
RTT int64 `json:"rtt_nanoseconds"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
NodeStorageModelKV represents how node data is stored in KV format
type QuorumManager ¶
type QuorumManager interface {
GetQuorum(ctx context.Context, table string) (Quorum, error)
AddNode(ctx context.Context, node *Node) error
RemoveNode(nodeID int64) error
Send(node *Node, do func(quorumNode *QuorumNode) (interface{}, error)) (interface{}, error)
}
func GetDefaultQuorumManager ¶
func GetDefaultQuorumManager(ctx context.Context) QuorumManager
type QuorumNode ¶
type QuorumNode struct {
*Node
// contains filtered or unexported fields
}
func (*QuorumNode) AcceptMigration ¶
func (q *QuorumNode) AcceptMigration(ctx context.Context, in *WriteMigrationRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
func (*QuorumNode) Close ¶
func (q *QuorumNode) Close()
func (*QuorumNode) Gossip ¶
func (q *QuorumNode) Gossip(ctx context.Context, in *GossipMigration, opts ...grpc.CallOption) (*emptypb.Empty, error)
func (*QuorumNode) JoinCluster ¶
func (q *QuorumNode) JoinCluster(ctx context.Context, in *Node, opts ...grpc.CallOption) (*JoinClusterResponse, error)
func (*QuorumNode) Ping ¶
func (q *QuorumNode) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error)
func (*QuorumNode) ReadKey ¶
func (q *QuorumNode) ReadKey(ctx context.Context, in *ReadKeyRequest, opts ...grpc.CallOption) (*ReadKeyResponse, error)
func (*QuorumNode) StealTableOwnership ¶
func (q *QuorumNode) StealTableOwnership(ctx context.Context, in *StealTableOwnershipRequest, opts ...grpc.CallOption) (*StealTableOwnershipResponse, error)
func (*QuorumNode) WriteKey ¶
func (q *QuorumNode) WriteKey(ctx context.Context, in *WriteKeyRequest, opts ...grpc.CallOption) (*WriteKeyResponse, error)
func (*QuorumNode) WriteMigration ¶
func (q *QuorumNode) WriteMigration(ctx context.Context, in *WriteMigrationRequest, opts ...grpc.CallOption) (*WriteMigrationResponse, error)
type RegionName ¶
type RegionName string
type Server ¶
type Server struct {
UnimplementedConsensusServer
}
func (*Server) AcceptMigration ¶
func (*Server) Gossip ¶
Gossip is a method that randomly disseminates information to other nodes in the cluster. A leader disseminates this to every node in the cluster after a commit.
func (*Server) JoinCluster ¶
JoinCluster adds a node to the cluster on behalf of the node.
func (*Server) StealTableOwnership ¶
type TableRepository ¶
type TableRepository interface {
// GetTable returns a table by name.
GetTable(name string) (*Table, error)
// UpdateTable updates a table.
UpdateTable(*Table) error
// InsertTable inserts a table.
InsertTable(*Table) error
// GetGroup returns a group by name.
GetGroup(string) (*TableGroup, error)
// UpdateGroup updates a group.
UpdateGroup(*TableGroup) error
// InsertGroup inserts a group.
InsertGroup(*TableGroup) error
// GetShard returns a shard of a table, given the principal.
GetShard(*Table, []*Principal) (*Shard, error)
// UpdateShard updates a shard metadata.
UpdateShard(*Shard) error
// InsertShard inserts a shard metadata.
// Ensure principals are set and the shard meta-name will be updated before inserting.
InsertShard(*Shard) error
}
func NewTableRepositoryKV ¶
func NewTableRepositoryKV(ctx context.Context, store kv.Store) TableRepository
NewTableRepositoryKV creates a new KV-based table repository
type TableRepositoryKV ¶
type TableRepositoryKV struct {
// contains filtered or unexported fields
}
TableRepositoryKV implements TableRepository using key-value operations
func (*TableRepositoryKV) GetGroup ¶
func (r *TableRepositoryKV) GetGroup(name string) (*TableGroup, error)
func (*TableRepositoryKV) GetShard ¶
func (r *TableRepositoryKV) GetShard(shard *Table, principals []*Principal) (*Shard, error)
func (*TableRepositoryKV) GetTable ¶
func (r *TableRepositoryKV) GetTable(name string) (*Table, error)
func (*TableRepositoryKV) GetTablesByReplicationLevel ¶
func (r *TableRepositoryKV) GetTablesByReplicationLevel(level ReplicationLevel) ([]*Table, error)
GetTablesByReplicationLevel provides efficient querying by replication level
func (*TableRepositoryKV) GetTablesOwnedByNode ¶
func (r *TableRepositoryKV) GetTablesOwnedByNode(nodeID int64) ([]*Table, error)
GetTablesOwnedByNode provides efficient querying by owner node
func (*TableRepositoryKV) InsertGroup ¶
func (r *TableRepositoryKV) InsertGroup(group *TableGroup) error
func (*TableRepositoryKV) InsertShard ¶
func (r *TableRepositoryKV) InsertShard(shard *Shard) error
func (*TableRepositoryKV) InsertTable ¶
func (r *TableRepositoryKV) InsertTable(table *Table) error
func (*TableRepositoryKV) UpdateGroup ¶
func (r *TableRepositoryKV) UpdateGroup(group *TableGroup) error
func (*TableRepositoryKV) UpdateShard ¶
func (r *TableRepositoryKV) UpdateShard(shard *Shard) error
func (*TableRepositoryKV) UpdateTable ¶
func (r *TableRepositoryKV) UpdateTable(table *Table) error
type TableStorageModel ¶
type TableStorageModel struct {
Name string `json:"name"`
Version int64 `json:"version"`
ReplicationLevel string `json:"replication_level"`
AllowedRegions []string `json:"allowed_regions"`
RestrictedRegions []string `json:"restricted_regions"`
OwnerNodeID *int64 `json:"owner_node_id"`
CreatedAt time.Time `json:"created_at"`
GroupID string `json:"group_id,omitempty"`
TableType string `json:"table_type"`
ShardPrincipals []string `json:"shard_principals"`
Owner *NodeStorageModel `json:"owner,omitempty"`
}
TableStorageModel represents how table data is stored in KV format