consensus

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2025 License: AGPL-3.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const NodeTable = "atlas.nodes"

Variables

View Source
var ErrCannotChangeReplicationLevel = errors.New("cannot change replication level of a table")
View Source
var ErrCannotStealGroupOwnership = errors.New("cannot steal ownership of a table in a group")
View Source
var ErrKVPoolNotInitialized = errors.New("KV pool not initialized")
View Source
var ErrMetadataStoreClosed = errors.New("metadata store closed")
View Source
var ErrTablePolicyViolation = errors.New("table policy violation")

Functions

func SendGossip

func SendGossip(ctx context.Context, req *GossipMigration, kvStore kv.Store) error

Types

type ErrStealTableOwnershipFailed

type ErrStealTableOwnershipFailed struct {
	Table *Table
}

func (ErrStealTableOwnershipFailed) Error

type HealthChecker

type HealthChecker struct {
	// contains filtered or unexported fields
}

HealthChecker monitors node health and maintains active node lists

func NewHealthChecker

func NewHealthChecker(manager *NodeConnectionManager) *HealthChecker

func NewHealthCheckerForTesting

func NewHealthCheckerForTesting(manager *NodeConnectionManager) *HealthChecker

NewHealthCheckerForTesting creates a health checker with no jitter for testing

func (*HealthChecker) GetHealthStats

func (hc *HealthChecker) GetHealthStats() *HealthStats

func (*HealthChecker) Start

func (hc *HealthChecker) Start(ctx context.Context)

Start begins the health checking routine

func (*HealthChecker) Stop

func (hc *HealthChecker) Stop()

Stop halts the health checking routine

type HealthStats

type HealthStats struct {
	TotalNodes    int64            `json:"total_nodes"`
	ActiveNodes   int64            `json:"active_nodes"`
	FailedNodes   int64            `json:"failed_nodes"`
	RegionStats   map[string]int64 `json:"region_stats"`
	AverageRTT    time.Duration    `json:"average_rtt"`
	LastCheckTime time.Time        `json:"last_check_time"`
}

GetHealthStats returns health statistics for monitoring

type KVConsensusAdapter

type KVConsensusAdapter struct {
	// contains filtered or unexported fields
}

KVConsensusAdapter provides a clean interface for KV operations using the existing w-paxos consensus

func NewKVConsensusAdapter

func NewKVConsensusAdapter(server *Server, kvStore kv.Store) *KVConsensusAdapter

NewKVConsensusAdapter creates a new KV consensus adapter

func (*KVConsensusAdapter) GetKey

func (a *KVConsensusAdapter) GetKey(ctx context.Context, key string) ([]byte, error)

GetKey implements distributed GET operation with freshness checking

type ManagedNode

type ManagedNode struct {
	*Node
	// contains filtered or unexported fields
}

ManagedNode represents a node with its connection state

func (*ManagedNode) AddRTTMeasurement

func (m *ManagedNode) AddRTTMeasurement(rtt time.Duration)

func (*ManagedNode) Close

func (m *ManagedNode) Close()

func (*ManagedNode) GetAverageRTT

func (m *ManagedNode) GetAverageRTT() time.Duration

func (*ManagedNode) GetFailures

func (m *ManagedNode) GetFailures() int64

func (*ManagedNode) GetStatus

func (m *ManagedNode) GetStatus() NodeStatus

func (*ManagedNode) UpdateStatus

func (m *ManagedNode) UpdateStatus(status NodeStatus)

type MigrationRepository

type MigrationRepository interface {
	// GetUncommittedMigrations returns all uncommitted migrations -- from when this node was part of a previous quorum -- for a given table.
	GetUncommittedMigrations(table *Table) ([]*Migration, error)
	// AddMigration adds a migration to the migration table.
	AddMigration(migration *Migration) error
	// GetMigrationVersion returns all migrations for a given version.
	GetMigrationVersion(version *MigrationVersion) ([]*Migration, error)
	// CommitAllMigrations commits all migrations for a given table.
	CommitAllMigrations(table string) error
	// CommitMigrationExact commits a migration for a given version.
	CommitMigrationExact(version *MigrationVersion) error
	// AddGossipMigration adds a migration to the migration table as a gossiped migration.
	AddGossipMigration(migration *Migration) error
	// GetNextVersion returns the next version for a given table.
	GetNextVersion(table string) (int64, error)
}

MigrationRepository is an interface that allows getting and maintaining migrations, which are uniquely identified by a table, table version, migration version, and sender.

func NewMigrationRepositoryKV

func NewMigrationRepositoryKV(ctx context.Context, store kv.Store) MigrationRepository

NewMigrationRepositoryKV creates a new KV-based migration repository

type MigrationRepositoryKV

type MigrationRepositoryKV struct {
	// contains filtered or unexported fields
}

MigrationRepositoryKV implements MigrationRepository using key-value operations

func (*MigrationRepositoryKV) AddGossipMigration

func (m *MigrationRepositoryKV) AddGossipMigration(migration *Migration) error

func (*MigrationRepositoryKV) AddMigration

func (m *MigrationRepositoryKV) AddMigration(migration *Migration) error

func (*MigrationRepositoryKV) CommitAllMigrations

func (m *MigrationRepositoryKV) CommitAllMigrations(table string) error

func (*MigrationRepositoryKV) CommitMigrationExact

func (m *MigrationRepositoryKV) CommitMigrationExact(version *MigrationVersion) error

func (*MigrationRepositoryKV) DeleteMigration

func (m *MigrationRepositoryKV) DeleteMigration(version *MigrationVersion) error

DeleteMigration removes a migration (useful for cleanup)

func (*MigrationRepositoryKV) GetMigrationVersion

func (m *MigrationRepositoryKV) GetMigrationVersion(version *MigrationVersion) ([]*Migration, error)

func (*MigrationRepositoryKV) GetMigrationsByTable

func (m *MigrationRepositoryKV) GetMigrationsByTable(tableName string) ([]*Migration, error)

GetMigrationsByTable provides efficient querying of migrations by table

func (*MigrationRepositoryKV) GetNextVersion

func (m *MigrationRepositoryKV) GetNextVersion(table string) (int64, error)

func (*MigrationRepositoryKV) GetUncommittedMigrations

func (m *MigrationRepositoryKV) GetUncommittedMigrations(table *Table) ([]*Migration, error)

type NodeConnectionManager

type NodeConnectionManager struct {
	// contains filtered or unexported fields
}

NodeConnectionManager centralizes all node connection management

func GetNodeConnectionManager

func GetNodeConnectionManager(ctx context.Context) *NodeConnectionManager

GetNodeConnectionManager returns the singleton connection manager

func (*NodeConnectionManager) AddNode

func (ncm *NodeConnectionManager) AddNode(ctx context.Context, node *Node) error

AddNode registers a new node and attempts connection

func (*NodeConnectionManager) ExecuteOnNode

func (ncm *NodeConnectionManager) ExecuteOnNode(nodeID int64, operation func(ConsensusClient) error) error

ExecuteOnNode executes a consensus operation on a specific node

func (*NodeConnectionManager) GetActiveNodesByRegion

func (ncm *NodeConnectionManager) GetActiveNodesByRegion(region string) []*ManagedNode

GetActiveNodesByRegion returns currently reachable nodes in a region

func (*NodeConnectionManager) GetAllActiveNodes

func (ncm *NodeConnectionManager) GetAllActiveNodes() map[string][]*ManagedNode

GetAllActiveNodes returns all currently reachable nodes

func (*NodeConnectionManager) Shutdown

func (ncm *NodeConnectionManager) Shutdown()

Shutdown gracefully closes all connections and stops background processes

type NodeRepository

type NodeRepository interface {
	GetNodeById(id int64) (*Node, error)
	GetNodeByAddress(address string, port uint) (*Node, error)
	GetNodesByRegion(region string) ([]*Node, error)
	GetRegions() ([]*Region, error)
	Iterate(fn func(*Node) error) error
	TotalCount() (int64, error)
	GetRandomNodes(num int64, excluding ...int64) ([]*Node, error)
}

func NewNodeRepositoryKV

func NewNodeRepositoryKV(ctx context.Context, store kv.Store) NodeRepository

NewNodeRepositoryKV creates a new KV-based node repository

type NodeRepositoryKV

type NodeRepositoryKV struct {
	// contains filtered or unexported fields
}

NodeRepositoryKV implements NodeRepository using key-value operations

func (*NodeRepositoryKV) AddNode

func (n *NodeRepositoryKV) AddNode(node *Node) error

AddNode adds a new node to the repository

func (*NodeRepositoryKV) DeleteNode

func (n *NodeRepositoryKV) DeleteNode(nodeID int64) error

DeleteNode removes a node from the repository

func (*NodeRepositoryKV) GetNodeByAddress

func (n *NodeRepositoryKV) GetNodeByAddress(address string, port uint) (*Node, error)

func (*NodeRepositoryKV) GetNodeById

func (n *NodeRepositoryKV) GetNodeById(id int64) (*Node, error)

func (*NodeRepositoryKV) GetNodesByRegion

func (n *NodeRepositoryKV) GetNodesByRegion(region string) ([]*Node, error)

func (*NodeRepositoryKV) GetRandomNodes

func (n *NodeRepositoryKV) GetRandomNodes(num int64, excluding ...int64) ([]*Node, error)

func (*NodeRepositoryKV) GetRegions

func (n *NodeRepositoryKV) GetRegions() ([]*Region, error)

func (*NodeRepositoryKV) Iterate

func (n *NodeRepositoryKV) Iterate(fn func(*Node) error) error

func (*NodeRepositoryKV) TotalCount

func (n *NodeRepositoryKV) TotalCount() (int64, error)

func (*NodeRepositoryKV) UpdateNode

func (n *NodeRepositoryKV) UpdateNode(node *Node) error

UpdateNode updates an existing node

type NodeStatus

type NodeStatus int
const (
	NodeStatusUnknown NodeStatus = iota
	NodeStatusConnecting
	NodeStatusActive
	NodeStatusFailed
	NodeStatusRemoved
)

type NodeStorageModel

type NodeStorageModel struct {
	ID        int64     `json:"id"`
	Address   string    `json:"address"`
	Port      int64     `json:"port"`
	Region    string    `json:"region"`
	Active    bool      `json:"active"`
	RTT       int64     `json:"rtt_nanoseconds"`
	CreatedAt time.Time `json:"created_at"`
}

NodeStorageModel represents node data in KV storage

type NodeStorageModelKV

type NodeStorageModelKV struct {
	ID        int64     `json:"id"`
	Address   string    `json:"address"`
	Port      int64     `json:"port"`
	Region    string    `json:"region"`
	Active    bool      `json:"active"`
	RTT       int64     `json:"rtt_nanoseconds"`
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

NodeStorageModelKV represents how node data is stored in KV format

type Quorum

type Quorum interface {
	ConsensusClient
	CurrentNodeInReplicationQuorum() bool
	CurrentNodeInMigrationQuorum() bool
}

type QuorumManager

type QuorumManager interface {
	GetQuorum(ctx context.Context, table string) (Quorum, error)
	AddNode(ctx context.Context, node *Node) error
	RemoveNode(nodeID int64) error
	Send(node *Node, do func(quorumNode *QuorumNode) (interface{}, error)) (interface{}, error)
}

func GetDefaultQuorumManager

func GetDefaultQuorumManager(ctx context.Context) QuorumManager

type QuorumNode

type QuorumNode struct {
	*Node
	// contains filtered or unexported fields
}

func (*QuorumNode) AcceptMigration

func (q *QuorumNode) AcceptMigration(ctx context.Context, in *WriteMigrationRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)

func (*QuorumNode) Close

func (q *QuorumNode) Close()

func (*QuorumNode) Gossip

func (q *QuorumNode) Gossip(ctx context.Context, in *GossipMigration, opts ...grpc.CallOption) (*emptypb.Empty, error)

func (*QuorumNode) JoinCluster

func (q *QuorumNode) JoinCluster(ctx context.Context, in *Node, opts ...grpc.CallOption) (*JoinClusterResponse, error)

func (*QuorumNode) Ping

func (q *QuorumNode) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error)

func (*QuorumNode) ReadKey

func (q *QuorumNode) ReadKey(ctx context.Context, in *ReadKeyRequest, opts ...grpc.CallOption) (*ReadKeyResponse, error)

func (*QuorumNode) StealTableOwnership

func (q *QuorumNode) StealTableOwnership(ctx context.Context, in *StealTableOwnershipRequest, opts ...grpc.CallOption) (*StealTableOwnershipResponse, error)

func (*QuorumNode) WriteKey

func (q *QuorumNode) WriteKey(ctx context.Context, in *WriteKeyRequest, opts ...grpc.CallOption) (*WriteKeyResponse, error)

func (*QuorumNode) WriteMigration

func (q *QuorumNode) WriteMigration(ctx context.Context, in *WriteMigrationRequest, opts ...grpc.CallOption) (*WriteMigrationResponse, error)

type RegionName

type RegionName string

type Server

type Server struct {
	UnimplementedConsensusServer
}

func (*Server) AcceptMigration

func (s *Server) AcceptMigration(ctx context.Context, req *WriteMigrationRequest) (*emptypb.Empty, error)

func (*Server) Gossip

func (s *Server) Gossip(ctx context.Context, req *GossipMigration) (*emptypb.Empty, error)

Gossip is a method that randomly disseminates information to other nodes in the cluster. A leader disseminates this to every node in the cluster after a commit.

func (*Server) JoinCluster

func (s *Server) JoinCluster(ctx context.Context, req *Node) (*JoinClusterResponse, error)

JoinCluster adds a node to the cluster on behalf of the node.

func (*Server) Ping

func (s *Server) Ping(ctx context.Context, req *PingRequest) (*PingResponse, error)

Ping implements a simple health check endpoint

func (*Server) ReadKey

func (s *Server) ReadKey(ctx context.Context, req *ReadKeyRequest) (*ReadKeyResponse, error)

func (*Server) StealTableOwnership

func (s *Server) StealTableOwnership(ctx context.Context, req *StealTableOwnershipRequest) (*StealTableOwnershipResponse, error)

func (*Server) WriteKey

func (s *Server) WriteKey(ctx context.Context, req *WriteKeyRequest) (*WriteKeyResponse, error)

func (*Server) WriteMigration

func (s *Server) WriteMigration(ctx context.Context, req *WriteMigrationRequest) (*WriteMigrationResponse, error)

type TableRepository

type TableRepository interface {
	// GetTable returns a table by name.
	GetTable(name string) (*Table, error)
	// UpdateTable updates a table.
	UpdateTable(*Table) error
	// InsertTable inserts a table.
	InsertTable(*Table) error
	// GetGroup returns a group by name.
	GetGroup(string) (*TableGroup, error)
	// UpdateGroup updates a group.
	UpdateGroup(*TableGroup) error
	// InsertGroup inserts a group.
	InsertGroup(*TableGroup) error
	// GetShard returns a shard of a table, given the principal.
	GetShard(*Table, []*Principal) (*Shard, error)
	// UpdateShard updates a shard metadata.
	UpdateShard(*Shard) error
	// InsertShard inserts a shard metadata.
	// Ensure principals are set and the shard meta-name will be updated before inserting.
	InsertShard(*Shard) error
}

func NewTableRepositoryKV

func NewTableRepositoryKV(ctx context.Context, store kv.Store) TableRepository

NewTableRepositoryKV creates a new KV-based table repository

type TableRepositoryKV

type TableRepositoryKV struct {
	// contains filtered or unexported fields
}

TableRepositoryKV implements TableRepository using key-value operations

func (*TableRepositoryKV) GetGroup

func (r *TableRepositoryKV) GetGroup(name string) (*TableGroup, error)

func (*TableRepositoryKV) GetShard

func (r *TableRepositoryKV) GetShard(shard *Table, principals []*Principal) (*Shard, error)

func (*TableRepositoryKV) GetTable

func (r *TableRepositoryKV) GetTable(name string) (*Table, error)

func (*TableRepositoryKV) GetTablesByReplicationLevel

func (r *TableRepositoryKV) GetTablesByReplicationLevel(level ReplicationLevel) ([]*Table, error)

GetTablesByReplicationLevel provides efficient querying by replication level

func (*TableRepositoryKV) GetTablesOwnedByNode

func (r *TableRepositoryKV) GetTablesOwnedByNode(nodeID int64) ([]*Table, error)

GetTablesOwnedByNode provides efficient querying by owner node

func (*TableRepositoryKV) InsertGroup

func (r *TableRepositoryKV) InsertGroup(group *TableGroup) error

func (*TableRepositoryKV) InsertShard

func (r *TableRepositoryKV) InsertShard(shard *Shard) error

func (*TableRepositoryKV) InsertTable

func (r *TableRepositoryKV) InsertTable(table *Table) error

func (*TableRepositoryKV) UpdateGroup

func (r *TableRepositoryKV) UpdateGroup(group *TableGroup) error

func (*TableRepositoryKV) UpdateShard

func (r *TableRepositoryKV) UpdateShard(shard *Shard) error

func (*TableRepositoryKV) UpdateTable

func (r *TableRepositoryKV) UpdateTable(table *Table) error

type TableStorageModel

type TableStorageModel struct {
	Name              string            `json:"name"`
	Version           int64             `json:"version"`
	ReplicationLevel  string            `json:"replication_level"`
	AllowedRegions    []string          `json:"allowed_regions"`
	RestrictedRegions []string          `json:"restricted_regions"`
	OwnerNodeID       *int64            `json:"owner_node_id"`
	CreatedAt         time.Time         `json:"created_at"`
	GroupID           string            `json:"group_id,omitempty"`
	TableType         string            `json:"table_type"`
	ShardPrincipals   []string          `json:"shard_principals"`
	Owner             *NodeStorageModel `json:"owner,omitempty"`
}

TableStorageModel represents how table data is stored in KV format

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL