package cluster
v1.38.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 5, 2026 License: BSD-3-Clause Imports: 55 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config added in v1.25.2

type Config struct {
	// WorkDir is the directory RAFT will use to store config & snapshot
	WorkDir string
	// NodeID is this node's ID
	NodeID string
	// Host is this node's host name
	Host string
	// BindAddr is the address to bind to
	BindAddr string
	// RaftPort is used by internal RAFT communication
	RaftPort int
	// RPCPort is used by Weaviate's internal gRPC communication
	RPCPort int
	// RaftRPCMessageMaxSize is the maximum message size allowed on the internal RPC communication
	// TODO: Remove Raft prefix to avoid confusion between RAFT and RPC.
	RaftRPCMessageMaxSize int

	// NodeNameToPortMap maps server names to port numbers
	NodeNameToPortMap map[string]int

	// HeartbeatTimeout specifies the time in follower state without contact
	// from a leader before we attempt an election.
	HeartbeatTimeout time.Duration
	// ElectionTimeout specifies the time in candidate state without contact
	// from a leader before we attempt an election.
	ElectionTimeout time.Duration
	// LeaderLeaseTimeout specifies how long the leader may go without contact
	// from a quorum of followers before it steps down.
	LeaderLeaseTimeout time.Duration
	// TimeoutsMultiplier is the multiplier for the timeout values for
	// raft election, heartbeat, and leader lease
	TimeoutsMultiplier int

	// SnapshotThreshold controls how many outstanding logs there must be before
	// we perform a snapshot. This is to prevent excessive snapshotting by
	// replaying a small set of logs instead. The value passed here is the initial
	// setting used. This can be tuned during operation using ReloadConfig.
	SnapshotThreshold uint64

	// SnapshotInterval controls how often we check if we should perform a
	// snapshot. We randomly stagger between this value and 2x this value to prevent
	// the entire cluster from performing a snapshot at once. The value passed
	// here is the initial setting used. This can be tuned during operation using
	// ReloadConfig.
	SnapshotInterval time.Duration

	// TrailingLogs controls how many logs we leave after a snapshot. This is used
	// so that we can quickly replay logs on a follower instead of being forced to
	// send an entire snapshot. The value passed here is the initial setting used.
	// This can be tuned during operation using ReloadConfig.
	TrailingLogs uint64

	// BootstrapTimeout is the time after which a node notifies other nodes that it is ready to
	// bootstrap a cluster if it can't find an existing cluster to join
	BootstrapTimeout time.Duration
	// BootstrapExpect is the number of nodes from which a notification is expected before
	// bootstrapping a cluster
	BootstrapExpect int

	// ConsistencyWaitTimeout is the duration we will wait for a schema version to land on this node
	ConsistencyWaitTimeout time.Duration
	// NodeSelector is the memberlist interface to RAFT
	NodeSelector cluster.NodeSelector
	Logger       *logrus.Logger
	Voter        bool

	// MetadataOnlyVoters configures the voters to store metadata exclusively, without storing any other data
	MetadataOnlyVoters bool

	// DB is the interface to the Weaviate database. It is necessary so that schema changes are reflected in the DB
	DB schema.Indexer
	// Parser parses class fields after deserialization
	Parser schema.Parser
	// LoadLegacySchema is responsible for loading the old schema from BoltDB
	LoadLegacySchema schema.LoadLegacySchema
	// IsLocalHost is only required when running Weaviate from the console on localhost
	IsLocalHost bool

	// SentryEnabled configures the Sentry integration to add internal middlewares to the RPC client/server to set
	// spans & capture traces
	SentryEnabled bool

	// EnableOneNodeRecovery enables the actual one-node recovery logic, so that it does not run all the time when
	// unnecessary
	EnableOneNodeRecovery bool
	// ForceOneNodeRecovery will force the single-node recovery routine to run. This is useful if the cluster has
	// committed a wrong peer configuration entry that prevents it from obtaining a quorum to start.
	// WARNING: This should be run on an *actual* one-node cluster only.
	ForceOneNodeRecovery bool

	// AuthzController manages RBAC commands and applies them to Casbin
	AuthzController authorization.Controller
	AuthNConfig     config.Authentication
	RBAC            *rbac.Manager

	DynamicUserController *apikey.DBUser

	NamespacesController *usecasesNamespaces.Controller
	NamespacesEnabled    bool

	// ReplicaCopier copies shard replicas between nodes
	ReplicaCopier replicationTypes.ReplicaCopier

	// ReplicationEngineMaxWorkers is the maximum number of workers for the replication engine
	ReplicationEngineMaxWorkers int

	// DistributedTasks is the configuration for the distributed task manager.
	DistributedTasks config.DistributedTasksConfig

	// DistributedTaskCollectionExtractors are registered on the
	// distributed-task Manager at FSM construction time, BEFORE RAFT
	// replay runs, so the DELETE_CLASS cascade fires on catchup-replay
	// (schemaOnly) apply too. Late post-construction registration would
	// miss tasks resurrected by replay. See [distributedtask.CollectionExtractor]
	// and weaviate/0-weaviate-issues#231.
	DistributedTaskCollectionExtractors map[string]distributedtask.CollectionExtractor

	ReplicaMovementEnabled bool

	// DrainSleep is the time the node will wait for the cluster to process any ongoing
	// operations before shutting down.
	DrainSleep time.Duration
}

type Raft added in v1.25.2

type Raft struct {
	// contains filtered or unexported fields
}

Raft abstracts away the Raft store, providing clients with an interface that encompasses all query and write operations. It ensures that these operations are executed on the current leader, whichever node that is. If the current node is the leader, changes are applied locally and bypass any network requests.

func NewRaft added in v1.25.2

func NewRaft(selector cluster.NodeSelector, store *Store, client client) *Raft

func (*Raft) ActivateUser added in v1.30.0

func (s *Raft) ActivateUser(ctx context.Context, userId string) error

func (*Raft) AddClass added in v1.25.2

func (s *Raft) AddClass(ctx context.Context, cls *models.Class, ss *sharding.State) (uint64, error)

func (*Raft) AddDistributedTask added in v1.31.0

func (s *Raft) AddDistributedTask(ctx context.Context, namespace, taskID string, taskPayload any, unitIDs []string) error

AddDistributedTask is the no-barrier variant — task uses pre-barrier semantics (STARTED → SWAPPING directly on AllUnitsTerminal). Use AddDistributedTaskWithBarrier when the task needs cluster-wide swap coordination (semantic migrations in the reindex provider; future providers needing the same property).

func (*Raft) AddDistributedTaskWithBarrier added in v1.38.0

func (s *Raft) AddDistributedTaskWithBarrier(
	ctx context.Context, namespace, taskID string,
	taskPayload any, unitIDs []string, needsPreparationBarrier bool,
) error

AddDistributedTaskWithBarrier is the PREP-barrier-aware counterpart to AddDistributedTask. When needsPreparationBarrier=true the task uses the two-phase RAFT-coordinated swap: AllUnitsTerminal routes to PREPARING (not SWAPPING directly), and each node's PREP completion must ack before any node fires its atomic swap.

AddDistributedTask is equivalent to AddDistributedTaskWithBarrier(..., false) for callers that don't need cluster-wide swap coordination (format-only migrations, debug-originated tasks).
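The barrier flag changes only where a task goes once all of its units are terminal. The sketch below models the two documented routes and the documented equivalence between the two add methods. The task, taskStore, and onAllUnitsTerminal names are hypothetical simplifications, not this package's FSM code.

```go
package main

import "fmt"

type taskStatus string

const (
	statusPreparing taskStatus = "PREPARING"
	statusSwapping  taskStatus = "SWAPPING"
)

// task is a hypothetical, heavily simplified task record.
type task struct {
	ID                      string
	UnitIDs                 []string
	NeedsPreparationBarrier bool
}

type taskStore struct{ tasks map[string]task }

// addWithBarrier records a task and whether it needs the PREP barrier.
func (s *taskStore) addWithBarrier(id string, units []string, needsBarrier bool) {
	s.tasks[id] = task{ID: id, UnitIDs: units, NeedsPreparationBarrier: needsBarrier}
}

// add mirrors the documented equivalence: the no-barrier variant is the
// barrier-aware one called with false.
func (s *taskStore) add(id string, units []string) {
	s.addWithBarrier(id, units, false)
}

// onAllUnitsTerminal returns the status a task moves to once all units finish.
func onAllUnitsTerminal(t task) taskStatus {
	if t.NeedsPreparationBarrier {
		return statusPreparing // every node must ack PREP before any node swaps
	}
	return statusSwapping // pre-barrier semantics: swap directly
}

func main() {
	s := &taskStore{tasks: map[string]task{}}
	s.add("format-only", []string{"u1"})
	s.addWithBarrier("semantic", []string{"u1", "u2"}, true)
	fmt.Println(onAllUnitsTerminal(s.tasks["format-only"])) // SWAPPING
	fmt.Println(onAllUnitsTerminal(s.tasks["semantic"]))    // PREPARING
}
```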

func (*Raft) AddDistributedTaskWithGroups added in v1.37.0

func (s *Raft) AddDistributedTaskWithGroups(
	ctx context.Context, namespace, taskID string,
	taskPayload any, unitSpecs []distributedtask.UnitSpec,
) error

AddDistributedTaskWithGroups creates a task with units that have explicit group assignments. UnitSpecs take precedence over UnitIds when both are present.
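A minimal sketch of the precedence rule, assuming a unit is identified by an ID plus an optional group ID. The unitSpec type and resolveUnits function are hypothetical stand-ins for distributedtask.UnitSpec and the internal resolution logic; the real field names may differ.

```go
package main

import "fmt"

// unitSpec is a hypothetical stand-in for distributedtask.UnitSpec.
type unitSpec struct {
	ID      string
	GroupID string // empty means "no explicit group"
}

// resolveUnits returns the units a task should track. When specs are
// present they win; bare unit IDs are only used as a fallback.
func resolveUnits(unitIDs []string, specs []unitSpec) []unitSpec {
	if len(specs) > 0 {
		return specs
	}
	out := make([]unitSpec, 0, len(unitIDs))
	for _, id := range unitIDs {
		out = append(out, unitSpec{ID: id})
	}
	return out
}

func main() {
	ids := []string{"shard-1", "shard-2"}
	specs := []unitSpec{{ID: "shard-1", GroupID: "g1"}}
	fmt.Println(resolveUnits(ids, specs)) // [{shard-1 g1}]
	fmt.Println(resolveUnits(ids, nil))   // [{shard-1 } {shard-2 }]
}
```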

func (*Raft) AddDistributedTaskWithGroupsBarrier added in v1.38.0

func (s *Raft) AddDistributedTaskWithGroupsBarrier(
	ctx context.Context, namespace, taskID string,
	taskPayload any, unitSpecs []distributedtask.UnitSpec, needsPreparationBarrier bool,
) error

AddDistributedTaskWithGroupsBarrier is the grouped, PREP-barrier-aware counterpart to AddDistributedTaskWithGroups. See AddDistributedTaskWithBarrier for the barrier semantics; this variant takes UnitSpecs (with optional GroupID per unit) instead of bare unit IDs.

func (*Raft) AddNamespace added in v1.38.0

func (s *Raft) AddNamespace(ctx context.Context, ns cmd.Namespace) (cmd.Namespace, uint64, error)

AddNamespace proposes an AddNamespace RAFT command and returns the persisted namespace alongside the apply version. An empty ns.HomeNodes is filled from the cluster's storage candidates before propose. The apply side rejects duplicates with [namespaces.ErrAlreadyExists] and invalid names with [namespaces.ErrBadRequest]. Callers that need a follow-up local read on a non-leader node should pass the returned version to WaitForUpdate.

func (*Raft) AddProperty added in v1.25.2

func (s *Raft) AddProperty(ctx context.Context, class string, props ...*models.Property) (uint64, error)

func (*Raft) AddReplicaToShard added in v1.31.0

func (s *Raft) AddReplicaToShard(ctx context.Context, class, shard, targetNode string) (uint64, error)

func (*Raft) AddRolesForUser added in v1.28.0

func (s *Raft) AddRolesForUser(user string, roles []string) error

func (*Raft) AddTenants added in v1.25.2

func (s *Raft) AddTenants(ctx context.Context, class string, req *cmd.AddTenantsRequest) (uint64, error)

func (*Raft) ApplyReplicationScalePlan added in v1.31.20

func (s *Raft) ApplyReplicationScalePlan(ctx context.Context, scalePlan api.ReplicationScalePlan) (opsUUIDs []strfmt.UUID, err error)

func (*Raft) CancelDistributedTask added in v1.31.0

func (s *Raft) CancelDistributedTask(ctx context.Context, namespace, taskID string, taskVersion uint64) error

func (*Raft) CancelReplication added in v1.31.0

func (s *Raft) CancelReplication(ctx context.Context, uuid strfmt.UUID) error

func (*Raft) ChangeNamespaceState added in v1.38.0

func (s *Raft) ChangeNamespaceState(ctx context.Context, name string, target cmd.NamespaceState) (uint64, error)

ChangeNamespaceState proposes a ChangeNamespaceState RAFT command and returns the apply version. The apply side returns [namespaces.ErrNotFound] for unknown namespaces and [namespaces.ErrInvalidStateTransition] for forbidden transitions; same-state transitions are idempotent.

func (*Raft) CheckUserIdentifierExists added in v1.30.0

func (s *Raft) CheckUserIdentifierExists(userIdentifier string) (bool, error)

func (*Raft) CleanUpDistributedTask added in v1.31.0

func (s *Raft) CleanUpDistributedTask(ctx context.Context, namespace, taskID string, taskVersion uint64) error

func (*Raft) Close added in v1.25.2

func (s *Raft) Close(ctx context.Context) (err error)

func (*Raft) CreateAlias added in v1.32.0

func (s *Raft) CreateAlias(ctx context.Context, alias string, class *models.Class) (uint64, error)

func (*Raft) CreateRolesPermissions added in v1.28.9

func (s *Raft) CreateRolesPermissions(roles map[string][]authorization.Policy) error

func (*Raft) CreateUser added in v1.30.0

func (s *Raft) CreateUser(ctx context.Context, userId, secureHash, userIdentifier, apiKeyFirstLetters, namespace string, createdAt time.Time) error

func (*Raft) CreateUserWithKey added in v1.30.10

func (s *Raft) CreateUserWithKey(ctx context.Context, userId, apiKeyFirstLetters string, weakHash [sha256.Size]byte, createdAt time.Time) error

func (*Raft) DeactivateUser added in v1.30.0

func (s *Raft) DeactivateUser(ctx context.Context, userId string, revokeKey bool) error

func (*Raft) DeleteAlias added in v1.32.0

func (s *Raft) DeleteAlias(ctx context.Context, alias string) (uint64, error)

func (*Raft) DeleteAllReplications added in v1.31.0

func (s *Raft) DeleteAllReplications(ctx context.Context) error

func (*Raft) DeleteClass added in v1.25.2

func (s *Raft) DeleteClass(ctx context.Context, name string) (uint64, error)

func (*Raft) DeleteReplicaFromShard added in v1.31.0

func (s *Raft) DeleteReplicaFromShard(ctx context.Context, class, shard, targetNode string) (uint64, error)

func (*Raft) DeleteReplication added in v1.31.0

func (s *Raft) DeleteReplication(ctx context.Context, uuid strfmt.UUID) error

func (*Raft) DeleteReplicationsByCollection added in v1.31.0

func (s *Raft) DeleteReplicationsByCollection(ctx context.Context, collection string) error

func (*Raft) DeleteReplicationsByTenants added in v1.31.0

func (s *Raft) DeleteReplicationsByTenants(ctx context.Context, collection string, tenants []string) error

func (*Raft) DeleteRoles added in v1.28.0

func (s *Raft) DeleteRoles(names ...string) error

func (*Raft) DeleteTenants added in v1.25.2

func (s *Raft) DeleteTenants(ctx context.Context, class string, req *cmd.DeleteTenantsRequest) (uint64, error)

func (*Raft) DeleteUser added in v1.30.0

func (s *Raft) DeleteUser(ctx context.Context, userId string) error

func (*Raft) DeleteUsersInNamespace added in v1.38.0

func (s *Raft) DeleteUsersInNamespace(ctx context.Context, namespace string) error

func (*Raft) Execute added in v1.25.2

func (s *Raft) Execute(ctx context.Context, req *cmd.ApplyRequest) (uint64, error)

func (*Raft) ForceDeleteAllReplications added in v1.31.0

func (s *Raft) ForceDeleteAllReplications(ctx context.Context) error

func (*Raft) ForceDeleteReplicationByUuid added in v1.31.0

func (s *Raft) ForceDeleteReplicationByUuid(ctx context.Context, uuid strfmt.UUID) error

func (*Raft) ForceDeleteReplicationsByCollection added in v1.31.0

func (s *Raft) ForceDeleteReplicationsByCollection(ctx context.Context, collection string) error

func (*Raft) ForceDeleteReplicationsByCollectionAndShard added in v1.31.0

func (s *Raft) ForceDeleteReplicationsByCollectionAndShard(ctx context.Context, collection, shard string) error

func (*Raft) ForceDeleteReplicationsByTargetNode added in v1.31.0

func (s *Raft) ForceDeleteReplicationsByTargetNode(ctx context.Context, node string) error

func (*Raft) GetAlias added in v1.32.5

func (s *Raft) GetAlias(ctx context.Context, aliasName string) (*models.Alias, error)

func (*Raft) GetAliases added in v1.32.0

func (s *Raft) GetAliases(ctx context.Context, alias string, class *models.Class) ([]*models.Alias, error)

func (*Raft) GetAllReplicationDetails added in v1.31.0

func (s *Raft) GetAllReplicationDetails(ctx context.Context) ([]api.ReplicationDetailsResponse, error)

func (*Raft) GetNamespaces added in v1.38.0

func (s *Raft) GetNamespaces(names ...string) ([]cmd.Namespace, error)

GetNamespaces returns the namespaces with the given names. An empty names slice returns all known namespaces; missing names are silently omitted.
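The two lookup rules above (an empty name list means all namespaces; unknown names are dropped instead of reported as errors) can be sketched against a plain map. The namespace type and filterNamespaces function are hypothetical; the real method returns cmd.Namespace values, and the ordering used here (sorted by name for the empty case, request order otherwise) is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// namespace is a hypothetical stand-in for cmd.Namespace.
type namespace struct{ Name string }

// filterNamespaces returns all known namespaces when names is empty, and
// otherwise only the requested ones that exist.
func filterNamespaces(known map[string]namespace, names ...string) []namespace {
	if len(names) == 0 {
		out := make([]namespace, 0, len(known))
		for _, ns := range known {
			out = append(out, ns)
		}
		sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
		return out
	}
	out := make([]namespace, 0, len(names))
	for _, n := range names {
		if ns, ok := known[n]; ok { // missing names are silently skipped
			out = append(out, ns)
		}
	}
	return out
}

func main() {
	known := map[string]namespace{"acme": {Name: "acme"}, "beta": {Name: "beta"}}
	fmt.Println(filterNamespaces(known))                  // [{acme} {beta}]
	fmt.Println(filterNamespaces(known, "beta", "ghost")) // [{beta}]
}
```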

func (*Raft) GetReplicationDetailsByCollection added in v1.31.0

func (s *Raft) GetReplicationDetailsByCollection(ctx context.Context, collection string) ([]api.ReplicationDetailsResponse, error)

func (*Raft) GetReplicationDetailsByCollectionAndShard added in v1.31.0

func (s *Raft) GetReplicationDetailsByCollectionAndShard(ctx context.Context, collection string, shard string) ([]api.ReplicationDetailsResponse, error)

func (*Raft) GetReplicationDetailsByReplicationId added in v1.31.0

func (s *Raft) GetReplicationDetailsByReplicationId(ctx context.Context, uuid strfmt.UUID) (api.ReplicationDetailsResponse, error)

func (*Raft) GetReplicationDetailsByTargetNode added in v1.31.0

func (s *Raft) GetReplicationDetailsByTargetNode(ctx context.Context, node string) ([]api.ReplicationDetailsResponse, error)

func (*Raft) GetReplicationScalePlan added in v1.31.20

func (s *Raft) GetReplicationScalePlan(ctx context.Context, collection string, replicationFactor int) (api.ReplicationScalePlan, error)

func (*Raft) GetRoles added in v1.28.0

func (s *Raft) GetRoles(names ...string) (map[string][]authorization.Policy, error)

func (*Raft) GetRolesForUserOrGroup added in v1.32.5

func (s *Raft) GetRolesForUserOrGroup(user string, authType authentication.AuthType, isGroup bool) (map[string][]authorization.Policy, error)

func (*Raft) GetUsers added in v1.30.0

func (s *Raft) GetUsers(userIds ...string) (map[string]apikey.UserView, error)

func (*Raft) GetUsersOrGroupForRole added in v1.32.5

func (s *Raft) GetUsersOrGroupForRole(role string, authType authentication.AuthType, isGroup bool) ([]string, error)

func (*Raft) GetUsersOrGroupsWithRoles added in v1.32.5

func (s *Raft) GetUsersOrGroupsWithRoles(isGroup bool, authType authentication.AuthType) ([]string, error)

func (*Raft) HasPermission added in v1.28.0

func (s *Raft) HasPermission(roleName string, permission *authorization.Policy) (bool, error)

HasPermission performs a consistent permission check by asking the leader.

func (*Raft) IsLeader added in v1.35.0

func (s *Raft) IsLeader() bool

func (*Raft) Join added in v1.25.2

func (s *Raft) Join(ctx context.Context, id, addr string, voter bool) error

func (*Raft) LeaderWithID added in v1.25.2

func (s *Raft) LeaderWithID() (string, string)

LeaderWithID returns the current leader's address and ID. It may return empty strings if there is no current leader or the leader is unknown.

func (*Raft) ListDistributedTasks added in v1.31.0

func (s *Raft) ListDistributedTasks(ctx context.Context) (map[string][]*distributedtask.Task, error)

func (*Raft) MarkDistributedTaskFinalized added in v1.38.0

func (s *Raft) MarkDistributedTaskFinalized(ctx context.Context, namespace, taskID string, taskVersion uint64) error

func (*Raft) NamespaceCount added in v1.38.0

func (s *Raft) NamespaceCount() int

NamespaceCount returns the number of known namespaces. Used by the startup invariant check in configure_api.go. Reads local FSM state (no RAFT round trip) — safe to call after the meta store is ready.

func (*Raft) NodeSelector added in v1.32.0

func (s *Raft) NodeSelector() cluster.NodeSelector

func (*Raft) Open added in v1.25.2

func (s *Raft) Open(ctx context.Context, db schema.Indexer) error

Open opens this store service and marks it as open. It constructs a new Raft node using the provided configuration. Any existing state, such as snapshots, logs, and peers, is restored.

func (*Raft) Query added in v1.25.2

func (s *Raft) Query(ctx context.Context, req *cmd.QueryRequest) (*cmd.QueryResponse, error)

Query receives a QueryRequest, ensures it is executed on the leader, and returns the related QueryResponse. If an error occurs, it is returned.

func (*Raft) QueryClassVersions added in v1.27.11

func (s *Raft) QueryClassVersions(classes ...string) (map[string]uint64, error)

QueryClassVersions returns the current version of the requested classes.

func (*Raft) QueryCollectionsCount added in v1.30.0

func (s *Raft) QueryCollectionsCount(namespace string) (int, error)

QueryCollectionsCount issues a leader-directed count query. An empty namespace returns the cluster-global total; a non-empty namespace returns the count restricted to classes in that namespace.

func (*Raft) QueryReadOnlyClasses added in v1.25.2

func (s *Raft) QueryReadOnlyClasses(classes ...string) (map[string]versioned.Class, error)

QueryReadOnlyClasses verifies that the requested classes are non-empty and then builds a Query that is directed to the leader, so that the classes are read with strong consistency.

func (*Raft) QuerySchema added in v1.25.2

func (s *Raft) QuerySchema() (models.Schema, error)

QuerySchema builds a Query to read the schema and directs it to the leader, so that the schema is read with strong consistency.

func (*Raft) QueryShardOwner added in v1.25.2

func (s *Raft) QueryShardOwner(class, shard string) (string, uint64, error)

QueryShardOwner builds a Query to read the tenants of a given class. The query is directed to the leader so that the tenant is read with strong consistency, and the shard owner node is returned.

func (*Raft) QueryShardingState added in v1.25.2

func (s *Raft) QueryShardingState(class string) (*sharding.State, uint64, error)

QueryShardingState builds a Query to read the sharding state of a given class. The request is directed to the leader so that the sharding state is read with strong consistency, and the state and its version are returned.

func (*Raft) QueryShardingStateByCollection added in v1.31.0

func (s *Raft) QueryShardingStateByCollection(ctx context.Context, collection string) (api.ShardingState, error)

func (*Raft) QueryShardingStateByCollectionAndShard added in v1.31.0

func (s *Raft) QueryShardingStateByCollectionAndShard(ctx context.Context, collection string, shard string) (api.ShardingState, error)

func (*Raft) QueryTenants added in v1.25.2

func (s *Raft) QueryTenants(class string, tenants []string) ([]*models.Tenant, uint64, error)

QueryTenants builds a Query to read the tenants of a given class. The query is directed to the leader so that the tenants are read with strong consistency.

func (*Raft) QueryTenantsShards added in v1.25.2

func (s *Raft) QueryTenantsShards(class string, tenants ...string) (map[string]string, uint64, error)

QueryTenantsShards builds a Query to read the tenants of a given class and their activity status. The request is directed to the leader so that the tenants are read with strong consistency, and the shard owner node is returned.

func (*Raft) Ready added in v1.25.2

func (s *Raft) Ready() bool

func (*Raft) RecordDistributedTaskPostCompletionAck added in v1.38.0

func (s *Raft) RecordDistributedTaskPostCompletionAck(
	ctx context.Context,
	namespace, taskID string,
	taskVersion uint64,
	nodeID string,
	success bool,
	errMsg string,
) error

RecordDistributedTaskPostCompletionAck commits one node's SWAP-phase callback result to the FSM (OnGroupCompleted for non-barrier tasks, OnSwapRequested for barrier tasks). The scheduler tick on each node fires this after its local SWAP body has returned so the cluster has durable evidence of which nodes' post-completion work succeeded before MarkDistributedTaskFinalized is allowed to land.

See distributedtask.PostCompletionAckRecorder.

func (*Raft) RecordDistributedTaskPreparationCompleteAck added in v1.38.0

func (s *Raft) RecordDistributedTaskPreparationCompleteAck(
	ctx context.Context,
	namespace, taskID string,
	taskVersion uint64,
	nodeID string,
	success bool,
	errMsg string,
) error

RecordDistributedTaskPreparationCompleteAck commits one node's OnGroupCompleted PREP-phase result to the FSM. The scheduler tick on each node fires this after its local PREP body has returned, so the cluster has durable evidence of which nodes' prep work succeeded before the PREPARING → SWAPPING transition is committed. This is the load-bearing barrier.

See distributedtask.PostCompletionAckRecorder.
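Both ack methods durably record per-node results so that a later transition (PREPARING → SWAPPING, or finalization) can be gated on them. The sketch below shows such a gate. The ackLedger type is hypothetical, and two of its rules are assumptions rather than documented behavior: acks for a different task version are treated as stale, and any failed ack blocks the transition.

```go
package main

import "fmt"

type ack struct {
	success bool
	errMsg  string
}

// ackLedger records one ack per node for a single task version (hypothetical).
type ackLedger struct {
	taskVersion uint64
	expected    []string
	acks        map[string]ack
}

func newAckLedger(version uint64, nodes []string) *ackLedger {
	return &ackLedger{taskVersion: version, expected: nodes, acks: map[string]ack{}}
}

// record stores a node's result; acks for another task version are ignored
// (assumption) and reported as not recorded.
func (l *ackLedger) record(version uint64, nodeID string, success bool, errMsg string) bool {
	if version != l.taskVersion {
		return false
	}
	l.acks[nodeID] = ack{success: success, errMsg: errMsg}
	return true
}

// ready reports whether every expected node has acked successfully.
func (l *ackLedger) ready() bool {
	for _, n := range l.expected {
		a, ok := l.acks[n]
		if !ok || !a.success {
			return false
		}
	}
	return true
}

func main() {
	l := newAckLedger(7, []string{"node-a", "node-b"})
	l.record(7, "node-a", true, "")
	fmt.Println(l.ready()) // false: node-b has not acked yet
	l.record(6, "node-b", true, "") // stale version, ignored
	fmt.Println(l.ready()) // false
	l.record(7, "node-b", true, "")
	fmt.Println(l.ready()) // true
}
```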

func (*Raft) RecordDistributedTaskUnitCompletion added in v1.37.0

func (s *Raft) RecordDistributedTaskUnitCompletion(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID string) error

func (*Raft) RecordDistributedTaskUnitFailure added in v1.37.0

func (s *Raft) RecordDistributedTaskUnitFailure(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID, errMsg string) error

func (*Raft) RegisterDistributedTaskCollectionExtractor added in v1.38.0

func (s *Raft) RegisterDistributedTaskCollectionExtractor(namespace string, extractor distributedtask.CollectionExtractor)

RegisterDistributedTaskCollectionExtractor opts a task namespace into the DELETE_CLASS cascade. See distributedtask.CollectionExtractor and weaviate/0-weaviate-issues#231.
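The DELETE_CLASS cascade can be pictured as a per-namespace registry of extractors that map a task payload to the collection it touches; when a class is deleted, tasks whose extracted collection matches are selected. The extractor signature, the cascadeRegistry type, the "reindex" namespace name, and returning matching task IDs are all hypothetical; see distributedtask.CollectionExtractor for the real contract. The sketch also illustrates why registration must happen before replay: a namespace without a registered extractor is simply skipped.

```go
package main

import (
	"fmt"
	"sort"
)

// collectionExtractor is a hypothetical extractor signature.
type collectionExtractor func(payload any) (collection string, ok bool)

type taskRef struct {
	Namespace string
	ID        string
	Payload   any
}

type cascadeRegistry struct{ extractors map[string]collectionExtractor }

func (r *cascadeRegistry) register(namespace string, e collectionExtractor) {
	r.extractors[namespace] = e
}

// onDeleteClass returns the IDs of tasks that reference the deleted class.
// Tasks in namespaces without an extractor are not considered.
func (r *cascadeRegistry) onDeleteClass(class string, tasks []taskRef) []string {
	var hit []string
	for _, t := range tasks {
		extract, ok := r.extractors[t.Namespace]
		if !ok {
			continue
		}
		if c, ok := extract(t.Payload); ok && c == class {
			hit = append(hit, t.ID)
		}
	}
	sort.Strings(hit)
	return hit
}

// reindexPayload is a hypothetical task payload naming one collection.
type reindexPayload struct{ Collection string }

func main() {
	r := &cascadeRegistry{extractors: map[string]collectionExtractor{}}
	r.register("reindex", func(p any) (string, bool) {
		rp, ok := p.(reindexPayload)
		return rp.Collection, ok
	})
	tasks := []taskRef{
		{Namespace: "reindex", ID: "t1", Payload: reindexPayload{Collection: "Articles"}},
		{Namespace: "reindex", ID: "t2", Payload: reindexPayload{Collection: "Authors"}},
		{Namespace: "other", ID: "t3", Payload: reindexPayload{Collection: "Articles"}},
	}
	fmt.Println(r.onDeleteClass("Articles", tasks)) // [t1]
}
```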

func (*Raft) Remove added in v1.25.2

func (s *Raft) Remove(ctx context.Context, id string) error

func (*Raft) RemoveNamespaceEntity added in v1.38.0

func (s *Raft) RemoveNamespaceEntity(ctx context.Context, name string) (uint64, error)

RemoveNamespaceEntity proposes a RemoveNamespaceEntity RAFT command and returns the apply version. The apply side returns [namespaces.ErrNotFound] for unknown namespaces and [namespaces.ErrInvalidState] when called on an active namespace.

func (*Raft) RemovePermissions added in v1.28.0

func (s *Raft) RemovePermissions(role string, permissions []*authorization.Policy) error

func (*Raft) ReplaceAlias added in v1.32.0

func (s *Raft) ReplaceAlias(ctx context.Context, alias *models.Alias, newClass *models.Class) (uint64, error)

func (*Raft) ReplicationAddReplicaToShard added in v1.31.0

func (s *Raft) ReplicationAddReplicaToShard(ctx context.Context, class, shard, targetNode string, opId uint64) (uint64, error)

func (*Raft) ReplicationAllPeersAtLeast added in v1.38.0

func (s *Raft) ReplicationAllPeersAtLeast(opID uint64, target cmd.ShardReplicationState) (bool, error)

func (*Raft) ReplicationCancellationComplete added in v1.31.0

func (s *Raft) ReplicationCancellationComplete(ctx context.Context, id uint64) error

func (*Raft) ReplicationFsm added in v1.32.0

func (s *Raft) ReplicationFsm() *replication.ShardReplicationFSM

func (*Raft) ReplicationGetReplicaOpStatus added in v1.31.0

func (s *Raft) ReplicationGetReplicaOpStatus(ctx context.Context, id uint64) (api.ShardReplicationState, error)

func (*Raft) ReplicationRegisterError added in v1.31.0

func (s *Raft) ReplicationRegisterError(ctx context.Context, id uint64, errorToRegister string) error

func (*Raft) ReplicationRemoveReplicaOp added in v1.31.0

func (s *Raft) ReplicationRemoveReplicaOp(ctx context.Context, id uint64) error

func (*Raft) ReplicationReplicateReplica added in v1.31.0

func (s *Raft) ReplicationReplicateReplica(ctx context.Context, uuid strfmt.UUID, sourceNode string, sourceCollection string, sourceShard string, targetNode string, transferType string) error

func (*Raft) ReplicationStoreSchemaVersion added in v1.31.0

func (s *Raft) ReplicationStoreSchemaVersion(ctx context.Context, id uint64, schemaVersion uint64) error

func (*Raft) ReplicationUpdateReplicaOpStatus added in v1.31.0

func (s *Raft) ReplicationUpdateReplicaOpStatus(ctx context.Context, id uint64, state api.ShardReplicationState) error

func (*Raft) RestoreClass added in v1.25.2

func (s *Raft) RestoreClass(ctx context.Context, cls *models.Class, ss *sharding.State) (uint64, error)

func (*Raft) RevokeRolesForUser added in v1.28.0

func (s *Raft) RevokeRolesForUser(user string, roles ...string) error

func (*Raft) RotateKey added in v1.30.0

func (s *Raft) RotateKey(ctx context.Context, userId, apiKeyFirstLetters, secureHash, oldIdentifier, newIdentifier string) error

func (*Raft) SchemaReader added in v1.25.2

func (s *Raft) SchemaReader() schema.SchemaReader

func (*Raft) SetDistributedTaskConflictDetectors added in v1.38.0

func (s *Raft) SetDistributedTaskConflictDetectors(detectors map[string]distributedtask.ConflictDetector)

SetDistributedTaskConflictDetectors installs the per-namespace conflict-detection hooks on the underlying distributed task FSM Manager. Called once at startup from MakeAppState, after the providers are registered. See distributedtask.ConflictDetector for the FSM-determinism contract.

func (*Raft) SetDistributedTaskSchedulerNotifier added in v1.38.0

func (s *Raft) SetDistributedTaskSchedulerNotifier(notifier distributedtask.SchedulerNotifier)

SetDistributedTaskSchedulerNotifier installs the wake-up notifier on the underlying distributed task FSM Manager. Called once at startup from MakeAppState, after both Raft and the Scheduler exist. See distributedtask.SchedulerNotifier for the contract.

func (*Raft) SetDistributedTaskSchemaMutationDetectors added in v1.38.0

func (s *Raft) SetDistributedTaskSchemaMutationDetectors(detectors map[string]distributedtask.SchemaMutationDetector)

SetDistributedTaskSchemaMutationDetectors installs the per-namespace schema-mutation detectors consulted from the schema FSM's UpdateProperty apply path. Called once at startup from MakeAppState, after the providers are registered. See distributedtask.SchemaMutationDetector for the contract and motivating failure mode.

func (*Raft) SetInflightDrainer added in v1.38.0

func (s *Raft) SetInflightDrainer(fn func(ctx context.Context, class, shard string) error)

func (*Raft) Stats added in v1.25.2

func (s *Raft) Stats() map[string]any

func (*Raft) StorageCandidates added in v1.25.13

func (s *Raft) StorageCandidates() []string

StorageCandidates returns the nodes that can store data: either the nodes in the Raft configuration or the memberlist storage nodes, depending on whether the cluster is configured with metadata-only voter nodes.

func (*Raft) SubmitNodeReachedState added in v1.38.0

func (s *Raft) SubmitNodeReachedState(ctx context.Context, req *api.ReplicationNodeReachedStateRequest) error

func (*Raft) SyncShard added in v1.31.0

func (s *Raft) SyncShard(ctx context.Context, collection, shard, nodeId string) (uint64, error)

func (*Raft) UpdateClass added in v1.25.2

func (s *Raft) UpdateClass(ctx context.Context, cls *models.Class, _ *sharding.State) (uint64, error)

func (*Raft) UpdateDistributedTaskUnitProgress added in v1.37.0

func (s *Raft) UpdateDistributedTaskUnitProgress(ctx context.Context, namespace, taskID string, version uint64, nodeID, unitID string, progress float32) error

func (*Raft) UpdateNamespace added in v1.38.0

func (s *Raft) UpdateNamespace(ctx context.Context, ns cmd.Namespace) (uint64, error)

UpdateNamespace proposes an UpdateNamespace RAFT command and returns the apply version. The apply side returns [namespaces.ErrNotFound] when the target namespace does not exist, and [namespaces.ErrBadRequest] when HomeNodes does not contain exactly one entry. Only HomeNodes is mutable; existing live shards are not moved.

func (*Raft) UpdateProperty added in v1.36.0

func (s *Raft) UpdateProperty(ctx context.Context, class string, property *models.Property, fields ...string) (uint64, error)

UpdateProperty schedules a RAFT command to merge `property` into the named class. When `fields` is non-empty, the FSM merges ONLY the listed fields onto an existing property (see api.PropertyField* constants); the rest are kept from the existing class state, so two concurrent updaters that touch different fields cannot clobber each other. An empty `fields` preserves the legacy "replace every field" semantics — public API callers that don't pass a mask are unaffected.

This is the public entry point: callers reach it from the REST / gRPC schema handlers. The cluster-side MutationGuard at apply time blocks updates while a reindex on the same (collection, property) is STARTED or FINALIZING. Internal callers driven by the distributed-task scheduler's own completion path must use Raft.UpdatePropertyFromMigration instead, which sets the bypass flag.
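The field-mask merge can be illustrated on a simplified property type. The property type, the fieldDescription and fieldIndexFilterable names, and mergeProperty are hypothetical stand-ins for models.Property and the api.PropertyField* constants. The point is only that an empty mask replaces the whole property, while a non-empty mask copies just the listed fields, so two updaters touching different fields do not clobber each other.

```go
package main

import "fmt"

// property is a hypothetical, simplified stand-in for models.Property.
type property struct {
	Name            string
	Description     string
	IndexFilterable bool
}

// Hypothetical field names standing in for api.PropertyField* constants.
const (
	fieldDescription     = "description"
	fieldIndexFilterable = "indexFilterable"
)

// mergeProperty applies update onto existing. An empty fields list keeps the
// legacy replace-everything behavior; otherwise only listed fields change.
// Unknown field names are ignored in this sketch.
func mergeProperty(existing, update property, fields ...string) property {
	if len(fields) == 0 {
		return update
	}
	merged := existing
	for _, f := range fields {
		switch f {
		case fieldDescription:
			merged.Description = update.Description
		case fieldIndexFilterable:
			merged.IndexFilterable = update.IndexFilterable
		}
	}
	return merged
}

func main() {
	base := property{Name: "title", Description: "old", IndexFilterable: false}
	// Two concurrent updaters, each touching a different field.
	a := property{Name: "title", Description: "new"}
	b := property{Name: "title", IndexFilterable: true}
	s := mergeProperty(base, a, fieldDescription)
	s = mergeProperty(s, b, fieldIndexFilterable)
	fmt.Printf("%+v\n", s) // {Name:title Description:new IndexFilterable:true}
}
```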

func (*Raft) UpdatePropertyFromMigration added in v1.38.0

func (s *Raft) UpdatePropertyFromMigration(ctx context.Context, class string, property *models.Property, fields ...string) (uint64, error)

UpdatePropertyFromMigration schedules a RAFT command for a property schema flip emitted by the in-process distributed-task scheduler's own completion handler (see adapters/repos/db/reindex_provider.flipSemanticMigrationSchema). The resulting command carries api.UpdatePropertyRequest.FromInFlightMigration = true, which the schema FSM uses to bypass the in-flight-reindex MutationGuard for this single update.

Public REST / gRPC handlers must not call this; they go through Raft.UpdateProperty. The migration-only bypass exists because the scheduler's OnTaskCompleted fires while the task is still FINALIZING (status not yet FINISHED), so the same MutationGuard that protects the property from external mutations would otherwise reject the migration's own scheduled flip.

func (*Raft) UpdateRolesPermissions added in v1.28.9

func (s *Raft) UpdateRolesPermissions(roles map[string][]authorization.Policy) error

func (*Raft) UpdateShardStatus added in v1.25.2

func (s *Raft) UpdateShardStatus(ctx context.Context, class, shard, status string) (uint64, error)

func (*Raft) UpdateTenants added in v1.25.2

func (s *Raft) UpdateTenants(ctx context.Context, class string, req *cmd.UpdateTenantsRequest) (uint64, error)

func (*Raft) UpdateTenantsProcess added in v1.26.0

func (s *Raft) UpdateTenantsProcess(ctx context.Context, class string, req *cmd.TenantProcessRequest) (uint64, error)

func (*Raft) WaitForUpdate added in v1.31.0

func (s *Raft) WaitForUpdate(ctx context.Context, schemaVersion uint64) error

func (*Raft) WaitUntilDBRestored added in v1.25.2

func (s *Raft) WaitUntilDBRestored(ctx context.Context, period time.Duration, close chan struct{}) error

type Response added in v1.25.2

type Response struct {
	Error   error
	Version uint64
}

type SchemaNamespaceLister added in v1.38.0

type SchemaNamespaceLister struct {
	// contains filtered or unexported fields
}

SchemaNamespaceLister lists the classes and aliases whose names start with "<namespace>:".

func NewSchemaNamespaceLister added in v1.38.0

func NewSchemaNamespaceLister(src SchemaSource) *SchemaNamespaceLister

func (*SchemaNamespaceLister) AliasesInNamespace added in v1.38.0

func (a *SchemaNamespaceLister) AliasesInNamespace(namespace string) []string

func (*SchemaNamespaceLister) ClassesInNamespace added in v1.38.0

func (a *SchemaNamespaceLister) ClassesInNamespace(namespace string) ([]string, error)

type SchemaSource added in v1.38.0

type SchemaSource interface {
	ReadSchema(reader func(models.Class, uint64)) error
	Aliases() map[string]string
}

SchemaSource is the subset of the schema reader used here.
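The prefix filtering that SchemaNamespaceLister performs over a SchemaSource can be sketched as below. This is an illustration under assumptions: `Class` replaces `models.Class`, the keys of `Aliases()` are assumed to be alias names, and the sorted output is a choice of this sketch rather than documented behaviour.

```go
package main

import (
	"sort"
	"strings"
)

// Class is a hypothetical stand-in for models.Class; only the name matters.
type Class struct{ Class string }

// SchemaSource mirrors the documented interface, with Class in place of models.Class.
type SchemaSource interface {
	ReadSchema(reader func(Class, uint64)) error
	Aliases() map[string]string
}

// NamespaceLister filters schema names by the "<namespace>:" prefix.
type NamespaceLister struct{ src SchemaSource }

func (l *NamespaceLister) ClassesInNamespace(ns string) ([]string, error) {
	prefix := ns + ":" // the colon stops "acme" from matching "acmeX:..."
	var out []string
	err := l.src.ReadSchema(func(c Class, _ uint64) {
		if strings.HasPrefix(c.Class, prefix) {
			out = append(out, c.Class)
		}
	})
	if err != nil {
		return nil, err
	}
	sort.Strings(out)
	return out, nil
}

func (l *NamespaceLister) AliasesInNamespace(ns string) []string {
	prefix := ns + ":"
	var out []string
	for alias := range l.src.Aliases() { // assumes keys are alias names
		if strings.HasPrefix(alias, prefix) {
			out = append(out, alias)
		}
	}
	sort.Strings(out) // map iteration order is random; sort for stable output
	return out
}

// memSource is an in-memory SchemaSource for illustration.
type memSource struct {
	classes []Class
	aliases map[string]string
}

func (m memSource) ReadSchema(reader func(Class, uint64)) error {
	for _, c := range m.classes {
		reader(c, 0)
	}
	return nil
}

func (m memSource) Aliases() map[string]string { return m.aliases }
```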

type Service

type Service struct {
	*Raft
	// contains filtered or unexported fields
}

Service is the primary entry point for the Raft layer; it manages and coordinates the key functionality of the distributed consensus protocol.

func New

func New(cfg Config, authZController authorization.Controller, snapshotter fsm.Snapshotter, svrMetrics *monitoring.GRPCServerMetrics) *Service

New returns a Service configured with cfg. The service initializes the internal gRPC API and the clients to other cluster nodes. The Raft store is initialized and ready to be started. To start the service, call Open().

func (*Service) Close

func (c *Service) Close(ctx context.Context) error

Close closes the raft service and frees all allocated resources. The internal RAFT store is closed and, if this node holds leadership, leadership is transferred to another node. The gRPC server and clients are also closed.

func (*Service) LeaderWithID

func (c *Service) LeaderWithID() (string, string)

LeaderWithID is used to return the current leader address and ID of the cluster. It may return empty strings if there is no current leader or the leader is unknown.

func (*Service) Open

func (c *Service) Open(ctx context.Context, db schema.Indexer) error

Open opens the internal RPC service that handles node communication, bootstraps the Raft node, and restores the database state.

func (*Service) Ready

func (c *Service) Ready() bool

Ready reports whether the node is ready to accept requests.

func (*Service) StorageCandidates added in v1.25.13

func (c *Service) StorageCandidates() []string

type Store added in v1.25.2

type Store struct {
	// contains filtered or unexported fields
}

Store is the implementation of RAFT on this local node. It handles the local schema and RAFT operations (startup, bootstrap, snapshot, etc.). On start, it ensures that a raft cluster is set up with the remote nodes, either by recovering from old state or by bootstrapping itself based on the provided configuration.

func NewFSM added in v1.25.2

func NewFSM(cfg Config, authZController authorization.Controller, snapshotter fsm.Snapshotter, reg prometheus.Registerer) Store

func (*Store) Apply added in v1.25.2

func (st *Store) Apply(l *raft.Log) any

Apply is called once a log entry is committed by a majority of the cluster. Apply should apply the log to the FSM. Apply must be deterministic and produce the same result on all peers in the cluster. The returned value is returned to the client as the ApplyFuture.Response.

func (*Store) Close added in v1.25.2

func (st *Store) Close(ctx context.Context) error

func (*Store) Execute added in v1.25.2

func (st *Store) Execute(req *api.ApplyRequest) (uint64, error)

func (*Store) FSMHasCaughtUp added in v1.31.0

func (st *Store) FSMHasCaughtUp() bool

func (*Store) ID added in v1.25.2

func (st *Store) ID() string

func (*Store) IsLeader added in v1.25.2

func (st *Store) IsLeader() bool

IsLeader reports whether this node is the leader of the cluster.

func (*Store) IsVoter added in v1.25.2

func (st *Store) IsVoter() bool

func (*Store) Join added in v1.25.2

func (st *Store) Join(id, addr string, voter bool) error

Join adds the given peer to the cluster. This operation must be executed on the leader; otherwise it fails with ErrNotLeader. If the cluster has not been opened yet, it returns ErrNotOpen.

func (*Store) LastAppliedCommand added in v1.25.11

func (st *Store) LastAppliedCommand() (uint64, error)

func (*Store) Leader added in v1.25.2

func (st *Store) Leader() string

Leader is used to return the current leader address. It may return an empty string if there is no current leader or the leader is unknown.

func (*Store) LeaderWithID added in v1.25.2

func (st *Store) LeaderWithID() (raft.ServerAddress, raft.ServerID)

func (*Store) Notify added in v1.25.2

func (st *Store) Notify(id, addr string) (err error)

Notify signals this Store that a node is ready for bootstrapping at the specified address. Bootstrapping will be initiated once the number of known nodes reaches the expected level, which includes this node.
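The bootstrap threshold described for Notify can be sketched as a small gate that counts distinct candidates, seeded with this node, and fires once. This is a hypothetical illustration: `bootstrapGate` and its callback are not part of the package, and the real Notify also returns an error.

```go
package main

import "sync"

// bootstrapGate is a hypothetical sketch of the Notify contract.
type bootstrapGate struct {
	mu         sync.Mutex
	expected   int               // total nodes needed, including this one
	candidates map[string]string // node ID -> address, seeded with this node
	bootstrap  func(map[string]string)
	done       bool
}

func newBootstrapGate(selfID, selfAddr string, expected int, bootstrap func(map[string]string)) *bootstrapGate {
	return &bootstrapGate{
		expected:   expected,
		candidates: map[string]string{selfID: selfAddr},
		bootstrap:  bootstrap,
	}
}

// Notify records that node id is ready at addr and fires bootstrap exactly
// once, as soon as the candidate count reaches the expected level.
func (g *bootstrapGate) Notify(id, addr string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.done {
		return
	}
	g.candidates[id] = addr // re-notifying the same ID does not inflate the count
	if len(g.candidates) >= g.expected {
		g.done = true
		g.bootstrap(g.candidates) // called while holding the lock in this sketch
	}
}
```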

func (*Store) Open added in v1.25.2

func (st *Store) Open(ctx context.Context) (err error)

Open opens this store and marks it as open. It constructs a new Raft node using the provided configuration. Any old state, such as snapshots, logs, and peers, is restored.

func (*Store) Persist added in v1.28.13

func (s *Store) Persist(sink raft.SnapshotSink) (err error)

Persist should dump all necessary state to the WriteCloser 'sink', and call sink.Close() when finished or call sink.Cancel() on error.

func (*Store) Query added in v1.25.2

func (st *Store) Query(req *cmd.QueryRequest) (*cmd.QueryResponse, error)

func (*Store) Ready added in v1.25.2

func (st *Store) Ready() bool

func (*Store) RegisterDistributedTaskCollectionExtractor added in v1.38.0

func (st *Store) RegisterDistributedTaskCollectionExtractor(namespace string, extractor distributedtask.CollectionExtractor)

RegisterDistributedTaskCollectionExtractor opts a task namespace into the cascade-delete of task records performed by [SchemaManager.DeleteClass]. See weaviate/0-weaviate-issues#231.
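One way to read the opt-in is as a registry keyed by namespace: on class deletion, only tasks whose namespace registered an extractor, and whose payload names the deleted collection, are dropped. The sketch below assumes that model; `CollectionExtractor` here is a hypothetical func type, not the real `distributedtask.CollectionExtractor`, and `taskRegistry` and `Task` are illustrative.

```go
package main

// CollectionExtractor is a hypothetical stand-in: it returns the collection a
// task payload refers to, and false if the payload names none.
type CollectionExtractor func(payload []byte) (collection string, ok bool)

// Task is a simplified distributed task record.
type Task struct {
	Namespace, ID string
	Payload       []byte
}

type taskRegistry struct {
	extractors map[string]CollectionExtractor
	tasks      []Task
}

// RegisterCollectionExtractor opts namespace ns into cascade deletes.
func (r *taskRegistry) RegisterCollectionExtractor(ns string, ex CollectionExtractor) {
	if r.extractors == nil {
		r.extractors = map[string]CollectionExtractor{}
	}
	r.extractors[ns] = ex
}

// onDeleteClass drops tasks that opted in and reference class, returning their
// IDs. Tasks in namespaces without an extractor are always kept.
func (r *taskRegistry) onDeleteClass(class string) (deleted []string) {
	kept := r.tasks[:0] // filter in place; writes never overtake reads
	for _, t := range r.tasks {
		if ex, ok := r.extractors[t.Namespace]; ok {
			if c, found := ex(t.Payload); found && c == class {
				deleted = append(deleted, t.ID)
				continue
			}
		}
		kept = append(kept, t)
	}
	r.tasks = kept
	return deleted
}
```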

func (*Store) Release added in v1.28.13

func (s *Store) Release()

Release is invoked when we are finished with the snapshot. It satisfies the raft.FSMSnapshot interface.

func (*Store) Remove added in v1.25.2

func (st *Store) Remove(id string) error

Remove removes the peer with the given id from the cluster.

func (*Store) Restore added in v1.25.2

func (st *Store) Restore(rc io.ReadCloser) error

Restore is used to restore an FSM from a snapshot. It is not called concurrently with any other command. The FSM must discard all previous state before restoring the snapshot.

func (*Store) SchemaReader added in v1.25.2

func (st *Store) SchemaReader() schema.SchemaReader

SchemaReader returns a SchemaReader from the underlying schema manager using a wait function that will make it wait for a raft log entry to be applied in the FSM Store before authorizing the read to continue.

func (*Store) SetDB added in v1.25.2

func (st *Store) SetDB(db schema.Indexer)

func (*Store) SetDistributedTaskConflictDetectors added in v1.38.0

func (st *Store) SetDistributedTaskConflictDetectors(detectors map[string]distributedtask.ConflictDetector)

SetDistributedTaskConflictDetectors installs the per-namespace conflict-detection hooks on the distributed task FSM Manager. Called once at startup from MakeAppState. See distributedtask.ConflictDetector for the contract.

func (*Store) SetDistributedTaskSchedulerNotifier added in v1.38.0

func (st *Store) SetDistributedTaskSchedulerNotifier(notifier distributedtask.SchedulerNotifier)

SetDistributedTaskSchedulerNotifier installs the wake-up notifier on the distributed task FSM Manager so it can poke the Scheduler from RAFT-apply paths (see distributedtask.SchedulerNotifier for the rationale). Called once at startup after both Store and Scheduler exist, from MakeAppState's wiring.

func (*Store) SetDistributedTaskSchemaMutationDetectors added in v1.38.0

func (st *Store) SetDistributedTaskSchemaMutationDetectors(detectors map[string]distributedtask.SchemaMutationDetector)

SetDistributedTaskSchemaMutationDetectors installs the per-namespace detectors consulted from the schema FSM's UpdateProperty apply path via the Manager's CheckPropertyUpdate method. Called once at startup from MakeAppState, after the providers exist. See distributedtask.SchemaMutationDetector for the FSM-determinism contract and motivating failure mode.

func (*Store) Snapshot added in v1.25.2

func (st *Store) Snapshot() (raft.FSMSnapshot, error)

Snapshot returns an FSMSnapshot used to: support log compaction, to restore the FSM to a previous state, or to bring out-of-date followers up to a recent log index.

The Snapshot implementation should return quickly, because Apply cannot be called while Snapshot is running. Generally this means Snapshot should only capture a pointer to the state, and any expensive IO should happen as part of FSMSnapshot.Persist.

Apply and Snapshot are always called from the same thread, but Apply will be called concurrently with FSMSnapshot.Persist. This means the FSM should be implemented to allow for concurrent updates while a snapshot is happening.
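The "capture a pointer, do IO in Persist" rule can be illustrated with a copy-on-write FSM: Apply replaces the state map instead of mutating it, so a snapshot can keep the old map and encode it while new entries are applied. This is a sketch under assumptions; `snapshotSink` only mirrors part of hashicorp/raft's `SnapshotSink`, and copying the whole map per Apply is chosen for clarity, not performance.

```go
package main

import (
	"bytes"
	"encoding/json"
	"io"
	"sync"
)

// snapshotSink mirrors the parts of hashicorp/raft's SnapshotSink used here.
type snapshotSink interface {
	io.WriteCloser
	Cancel() error
}

// fsm keeps its state in a map that Apply replaces wholesale (copy-on-write),
// so a snapshot can safely hold on to the previous map.
type fsm struct {
	mu    sync.Mutex
	state map[string]string
}

func (f *fsm) Apply(key, value string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	next := make(map[string]string, len(f.state)+1)
	for k, v := range f.state {
		next[k] = v
	}
	next[key] = value
	f.state = next
}

// Snapshot is cheap: it only captures the current map pointer.
func (f *fsm) Snapshot() *fsmSnapshot {
	f.mu.Lock()
	defer f.mu.Unlock()
	return &fsmSnapshot{state: f.state}
}

type fsmSnapshot struct{ state map[string]string }

// Persist does the expensive encoding and may run concurrently with Apply.
func (s *fsmSnapshot) Persist(sink snapshotSink) error {
	if err := json.NewEncoder(sink).Encode(s.state); err != nil {
		_ = sink.Cancel() // abandon the partial snapshot on error
		return err
	}
	return sink.Close()
}

func (s *fsmSnapshot) Release() {}

// bufSink is an in-memory sink for illustration.
type bufSink struct {
	bytes.Buffer
	closed, cancelled bool
}

func (b *bufSink) Close() error  { b.closed = true; return nil }
func (b *bufSink) Cancel() error { b.cancelled = true; return nil }
```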

func (*Store) Stats added in v1.25.2

func (st *Store) Stats() map[string]any

Stats returns internal statistics from this store, for informational/debugging purposes only.

The statistics directly from raft are nested under the "raft" key. If the raft statistics are not yet available, then the "raft" key will not exist. See https://pkg.go.dev/github.com/hashicorp/raft#Raft.Stats for the default raft stats.

The values of "leader_address" and "leader_id" are the respective address/ID for the current leader of the cluster. They may be empty strings if there is no current leader or the leader is unknown.

The value of "ready" indicates whether this store is ready, see Store.Ready.

The value of "is_voter" indicates whether this store is a voter, see Store.IsVoter.

The value of "open" indicates whether this store is open, see Store.open.

The value of "bootstrapped" indicates whether this store has completed bootstrapping, see Store.bootstrapped.

The value of "candidates" is a map[string]string of the current candidates IDs/addresses, see Store.candidates.

The value of "last_store_log_applied_index" is the index of the last applied command found when the store was opened, see Store.lastAppliedIndexToDB.

The value of "last_applied_index" is the index of the latest update to the store, see Store.lastAppliedIndex.

The value of "db_loaded" indicates whether the DB has finished loading, see Store.dbLoaded.

Since this is for information/debugging we want to avoid enforcing unnecessary restrictions on what can go in these stats, thus we're returning map[string]any. However, any values added to this map should be able to be JSON encoded.
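A minimal sketch of assembling such a map and enforcing the JSON rule with `encoding/json`. The helper names are hypothetical; the keys are the ones listed above, and `raftStats` is typed `map[string]string` because that is what hashicorp/raft's `Stats` returns.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildStats assembles a few of the documented top-level keys. raftStats is
// nil when raft statistics are not yet available, in which case the "raft"
// key is omitted entirely rather than set to nil.
func buildStats(ready, isVoter bool, leaderAddr, leaderID string, raftStats map[string]string) map[string]any {
	stats := map[string]any{
		"ready":          ready,
		"is_voter":       isVoter,
		"leader_address": leaderAddr, // empty when the leader is unknown
		"leader_id":      leaderID,
	}
	if raftStats != nil {
		stats["raft"] = raftStats
	}
	return stats
}

// checkEncodable enforces the "values must be JSON encodable" rule.
func checkEncodable(stats map[string]any) error {
	if _, err := json.Marshal(stats); err != nil {
		return fmt.Errorf("stats not JSON encodable: %w", err)
	}
	return nil
}
```

Returning `map[string]any` keeps the map open to new keys, so a check like `checkEncodable` in a test is one way to catch values such as channels or functions that `json.Marshal` rejects.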

func (*Store) StoreConfiguration added in v1.27.26

func (st *Store) StoreConfiguration(index uint64, _ raft.Configuration)

StoreConfiguration keeps the `lastAppliedIndex` metric at the correct value by also accounting for Raft commands of type `LogConfiguration`.

func (*Store) WaitForAppliedIndex added in v1.25.2

func (st *Store) WaitForAppliedIndex(ctx context.Context, period time.Duration, version uint64) error

WaitForAppliedIndex waits until the update with the given version has been propagated to this follower node.

func (*Store) WaitToRestoreDB added in v1.25.2

func (st *Store) WaitToRestoreDB(ctx context.Context, period time.Duration, close chan struct{}) error

WaitToRestoreDB waits for the DB to be loaded. The DB might be loaded only after RAFT is in a healthy state, which is when the leader has been elected and there is consensus on the log.

Directories

Path Synopsis
Package distributedtask coordinates long-running operations (e.g.
Package namespaces is the RAFT FSM adapter for namespace control-plane state.
proto
api
changelog
Package changelog provides an append-only, per-op change capture log used by replica movement to deterministically replay writes that land on the source after the file snapshot is taken.
copier/internal/changelogdrain
Package changelogdrain pulls ChangeLogStreamEntry messages from a gRPC server-streaming client, batches them, and flushes via a caller-supplied apply func.
Package router provides an abstraction for determining the optimal routing plans for reads and writes within a Weaviate cluster.