cluster

package
v1.32.21
Published: Dec 3, 2025 License: BSD-3-Clause Imports: 26 Imported by: 0

Documentation

Constants

const (
	DefaultRequestQueueSize                   = 2000
	DefaultRequestQueueFullHttpStatus         = http.StatusTooManyRequests
	DefaultRequestQueueShutdownTimeoutSeconds = 90
)

Variables

var (
	ErrConcurrentTransaction = errors.New("concurrent transaction")
	ErrInvalidTransaction    = errors.New("invalid transaction")
	ErrExpiredTransaction    = errors.New("transaction TTL expired")
	ErrNotReady              = errors.New("server is not ready: either starting up or shutting down")
)

Functions

This section is empty.

Types

type AuthConfig added in v1.21.2

type AuthConfig struct {
	BasicAuth BasicAuth `json:"basic" yaml:"basic"`
}

type BasicAuth added in v1.21.2

type BasicAuth struct {
	Username string `json:"username" yaml:"username"`
	Password string `json:"password" yaml:"password"`
}

func (BasicAuth) Enabled added in v1.21.2

func (ba BasicAuth) Enabled() bool

type Client

type Client interface {
	OpenTransaction(ctx context.Context, host string, tx *Transaction) error
	AbortTransaction(ctx context.Context, host string, tx *Transaction) error
	CommitTransaction(ctx context.Context, host string, tx *Transaction) error
}

type CommitFn

type CommitFn func(ctx context.Context, tx *Transaction) error

type Config

type Config struct {
	Hostname                string     `json:"hostname" yaml:"hostname"`
	GossipBindPort          int        `json:"gossipBindPort" yaml:"gossipBindPort"`
	DataBindPort            int        `json:"dataBindPort" yaml:"dataBindPort"`
	Join                    string     `json:"join" yaml:"join"`
	IgnoreStartupSchemaSync bool       `json:"ignoreStartupSchemaSync" yaml:"ignoreStartupSchemaSync"`
	SkipSchemaSyncRepair    bool       `json:"skipSchemaSyncRepair" yaml:"skipSchemaSyncRepair"`
	AuthConfig              AuthConfig `json:"auth" yaml:"auth"`
	AdvertiseAddr           string     `json:"advertiseAddr" yaml:"advertiseAddr"`
	BindAddr                string     `json:"bindAddr" yaml:"bindAddr"`
	AdvertisePort           int        `json:"advertisePort" yaml:"advertisePort"`
	// MemberlistFastFailureDetection is mostly for testing purposes. It makes memberlist more
	// sensitive so that it detects failures (down nodes) faster.
	MemberlistFastFailureDetection bool `json:"memberlistFastFailureDetection" yaml:"memberlistFastFailureDetection"`
	// Localhost enables running a multi-node setup on the same localhost with different ports
	Localhost bool `json:"localhost" yaml:"localhost"`
	// MaintenanceNodes is experimental. You should not use this directly, but should use the
	// public methods on the State struct. This is a list of nodes (by Hostname) that are in
	// maintenance mode (eg return a 418 for all data requests). We use a list here instead of a
	// bool because it allows us to set the same config/env vars on all nodes to put a subset of
	// them in maintenance mode. In addition, we may want to have the cluster nodes not in
	// maintenance mode be aware of which nodes are in maintenance mode in the future.
	MaintenanceNodes []string `json:"maintenanceNodes" yaml:"maintenanceNodes"`
	// RequestQueueConfig is used to configure the request queue buffer for the replicated indices
	RequestQueueConfig RequestQueueConfig `json:"requestQueueConfig" yaml:"requestQueueConfig"`
}

type ConsensusFn

type ConsensusFn func(ctx context.Context,
	in []*Transaction) (*Transaction, error)

The Broadcaster is the link between the current node and all other nodes during a tx operation. This makes it a natural place to inject a consensus function for read transactions. How consensus is reached is completely opaque to the broadcaster and can be controlled through custom business logic.
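As an illustration of the kind of business logic a consensus function might carry, here is a minimal sketch that accepts a read transaction only if every node returned the same payload. The Transaction type below is a local stand-in with only the fields the sketch needs, and the unanimity rule (unanimous) is a hypothetical choice, not the rule this package applies.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
)

// Transaction is a local stand-in that mirrors only the fields used here.
type Transaction struct {
	ID      string
	Payload interface{}
}

// ConsensusFn mirrors the signature documented above.
type ConsensusFn func(ctx context.Context, in []*Transaction) (*Transaction, error)

// unanimous is a hypothetical consensus rule: all responses must carry
// an identical payload, otherwise the read is rejected.
func unanimous(_ context.Context, in []*Transaction) (*Transaction, error) {
	if len(in) == 0 {
		return nil, errors.New("no transactions to compare")
	}
	for _, tx := range in[1:] {
		if !reflect.DeepEqual(in[0].Payload, tx.Payload) {
			return nil, fmt.Errorf("tx %q: payloads differ between nodes", in[0].ID)
		}
	}
	return in[0], nil
}

func main() {
	var fn ConsensusFn = unanimous
	agreed, err := fn(context.Background(), []*Transaction{
		{ID: "tx-1", Payload: "schema-v7"},
		{ID: "tx-1", Payload: "schema-v7"},
	})
	fmt.Println(agreed.Payload, err) // schema-v7 <nil>
}
```

A function with this shape could be handed to SetConsensusFunction on a TxBroadcaster. Other rules, such as a majority vote, would fit the same signature.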

type DiskUsage added in v1.18.3

type DiskUsage struct {
	// Total disk space
	Total uint64
	// Total available space
	Available uint64
}

DiskUsage contains total and available space in bytes.

type HostnameSource

type HostnameSource interface {
	AllNames() []string
}

type IdealClusterState added in v1.17.4

type IdealClusterState struct {
	// contains filtered or unexported fields
}

func NewIdealClusterState added in v1.17.4

func NewIdealClusterState(s MemberLister, logger logrus.FieldLogger) *IdealClusterState

func (*IdealClusterState) Members added in v1.17.4

func (ics *IdealClusterState) Members() []string

func (*IdealClusterState) Validate added in v1.17.4

func (ics *IdealClusterState) Validate() error

Validate returns an error if the actual state does not match the assumed ideal state, e.g. because a node has died, or left unexpectedly.
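The idea behind such a check can be sketched as a set comparison between the members that were assumed at startup and the members that are currently live. This is an illustrative sketch only: the helper names (missingMembers, validate) and the error text are assumptions, and the package's actual implementation is not shown on this page.

```go
package main

import (
	"fmt"
	"sort"
)

// missingMembers returns the names present in ideal but absent from
// actual, sorted for stable output.
func missingMembers(ideal, actual []string) []string {
	live := make(map[string]struct{}, len(actual))
	for _, name := range actual {
		live[name] = struct{}{}
	}
	var missing []string
	for _, name := range ideal {
		if _, ok := live[name]; !ok {
			missing = append(missing, name)
		}
	}
	sort.Strings(missing)
	return missing
}

// validate reports an error if any ideal member is no longer live.
func validate(ideal, actual []string) error {
	if missing := missingMembers(ideal, actual); len(missing) > 0 {
		return fmt.Errorf("cluster is not in its ideal state, missing nodes: %v", missing)
	}
	return nil
}

func main() {
	ideal := []string{"node-1", "node-2", "node-3"}
	fmt.Println(validate(ideal, []string{"node-1", "node-2", "node-3"}))
	fmt.Println(validate(ideal, []string{"node-1", "node-3"}))
}
```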

type MemberLister

type MemberLister interface {
	AllNames() []string
	Hostnames() []string
}

type MockNodeSelector added in v1.31.1

type MockNodeSelector struct {
	mock.Mock
}

MockNodeSelector is an autogenerated mock type for the NodeSelector type

func NewMockNodeSelector added in v1.31.1

func NewMockNodeSelector(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockNodeSelector

NewMockNodeSelector creates a new instance of MockNodeSelector. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockNodeSelector) AllHostnames added in v1.31.1

func (_m *MockNodeSelector) AllHostnames() []string

AllHostnames provides a mock function with no fields

func (*MockNodeSelector) AllOtherClusterMembers added in v1.32.11

func (_m *MockNodeSelector) AllOtherClusterMembers(port int) map[string]string

AllOtherClusterMembers provides a mock function with given fields: port

func (*MockNodeSelector) EXPECT added in v1.31.1

func (*MockNodeSelector) Leave added in v1.32.11

func (_m *MockNodeSelector) Leave() error

Leave provides a mock function with no fields

func (*MockNodeSelector) LocalName added in v1.31.1

func (_m *MockNodeSelector) LocalName() string

LocalName provides a mock function with no fields

func (*MockNodeSelector) NodeAddress added in v1.31.1

func (_m *MockNodeSelector) NodeAddress(id string) string

NodeAddress provides a mock function with given fields: id

func (*MockNodeSelector) NodeGRPCPort added in v1.32.0

func (_m *MockNodeSelector) NodeGRPCPort(id string) (int, error)

NodeGRPCPort provides a mock function with given fields: id

func (*MockNodeSelector) NodeHostname added in v1.31.1

func (_m *MockNodeSelector) NodeHostname(name string) (string, bool)

NodeHostname provides a mock function with given fields: name

func (*MockNodeSelector) NonStorageNodes added in v1.31.1

func (_m *MockNodeSelector) NonStorageNodes() []string

NonStorageNodes provides a mock function with no fields

func (*MockNodeSelector) Shutdown added in v1.32.11

func (_m *MockNodeSelector) Shutdown() error

Shutdown provides a mock function with no fields

func (*MockNodeSelector) SortCandidates added in v1.31.1

func (_m *MockNodeSelector) SortCandidates(nodes []string) []string

SortCandidates provides a mock function with given fields: nodes

func (*MockNodeSelector) StorageCandidates added in v1.31.1

func (_m *MockNodeSelector) StorageCandidates() []string

StorageCandidates provides a mock function with no fields

type MockNodeSelector_AllHostnames_Call added in v1.31.1

type MockNodeSelector_AllHostnames_Call struct {
	*mock.Call
}

MockNodeSelector_AllHostnames_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllHostnames'

func (*MockNodeSelector_AllHostnames_Call) Return added in v1.31.1

func (*MockNodeSelector_AllHostnames_Call) Run added in v1.31.1

func (*MockNodeSelector_AllHostnames_Call) RunAndReturn added in v1.31.1

type MockNodeSelector_AllOtherClusterMembers_Call added in v1.32.11

type MockNodeSelector_AllOtherClusterMembers_Call struct {
	*mock.Call
}

MockNodeSelector_AllOtherClusterMembers_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllOtherClusterMembers'

func (*MockNodeSelector_AllOtherClusterMembers_Call) Return added in v1.32.11

func (*MockNodeSelector_AllOtherClusterMembers_Call) Run added in v1.32.11

func (*MockNodeSelector_AllOtherClusterMembers_Call) RunAndReturn added in v1.32.11

type MockNodeSelector_Expecter added in v1.31.1

type MockNodeSelector_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockNodeSelector_Expecter) AllHostnames added in v1.31.1

AllHostnames is a helper method to define mock.On call

func (*MockNodeSelector_Expecter) AllOtherClusterMembers added in v1.32.11

func (_e *MockNodeSelector_Expecter) AllOtherClusterMembers(port interface{}) *MockNodeSelector_AllOtherClusterMembers_Call

AllOtherClusterMembers is a helper method to define mock.On call

  • port int

func (*MockNodeSelector_Expecter) Leave added in v1.32.11

Leave is a helper method to define mock.On call

func (*MockNodeSelector_Expecter) LocalName added in v1.31.1

LocalName is a helper method to define mock.On call

func (*MockNodeSelector_Expecter) NodeAddress added in v1.31.1

func (_e *MockNodeSelector_Expecter) NodeAddress(id interface{}) *MockNodeSelector_NodeAddress_Call

NodeAddress is a helper method to define mock.On call

  • id string

func (*MockNodeSelector_Expecter) NodeGRPCPort added in v1.32.0

func (_e *MockNodeSelector_Expecter) NodeGRPCPort(id interface{}) *MockNodeSelector_NodeGRPCPort_Call

NodeGRPCPort is a helper method to define mock.On call

  • id string

func (*MockNodeSelector_Expecter) NodeHostname added in v1.31.1

func (_e *MockNodeSelector_Expecter) NodeHostname(name interface{}) *MockNodeSelector_NodeHostname_Call

NodeHostname is a helper method to define mock.On call

  • name string

func (*MockNodeSelector_Expecter) NonStorageNodes added in v1.31.1

NonStorageNodes is a helper method to define mock.On call

func (*MockNodeSelector_Expecter) Shutdown added in v1.32.11

Shutdown is a helper method to define mock.On call

func (*MockNodeSelector_Expecter) SortCandidates added in v1.31.1

func (_e *MockNodeSelector_Expecter) SortCandidates(nodes interface{}) *MockNodeSelector_SortCandidates_Call

SortCandidates is a helper method to define mock.On call

  • nodes []string

func (*MockNodeSelector_Expecter) StorageCandidates added in v1.31.1

StorageCandidates is a helper method to define mock.On call

type MockNodeSelector_Leave_Call added in v1.32.11

type MockNodeSelector_Leave_Call struct {
	*mock.Call
}

MockNodeSelector_Leave_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Leave'

func (*MockNodeSelector_Leave_Call) Return added in v1.32.11

func (*MockNodeSelector_Leave_Call) Run added in v1.32.11

func (*MockNodeSelector_Leave_Call) RunAndReturn added in v1.32.11

func (_c *MockNodeSelector_Leave_Call) RunAndReturn(run func() error) *MockNodeSelector_Leave_Call

type MockNodeSelector_LocalName_Call added in v1.31.1

type MockNodeSelector_LocalName_Call struct {
	*mock.Call
}

MockNodeSelector_LocalName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LocalName'

func (*MockNodeSelector_LocalName_Call) Return added in v1.31.1

func (*MockNodeSelector_LocalName_Call) Run added in v1.31.1

func (*MockNodeSelector_LocalName_Call) RunAndReturn added in v1.31.1

type MockNodeSelector_NodeAddress_Call added in v1.31.1

type MockNodeSelector_NodeAddress_Call struct {
	*mock.Call
}

MockNodeSelector_NodeAddress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NodeAddress'

func (*MockNodeSelector_NodeAddress_Call) Return added in v1.31.1

func (*MockNodeSelector_NodeAddress_Call) Run added in v1.31.1

func (*MockNodeSelector_NodeAddress_Call) RunAndReturn added in v1.31.1

type MockNodeSelector_NodeGRPCPort_Call added in v1.32.0

type MockNodeSelector_NodeGRPCPort_Call struct {
	*mock.Call
}

MockNodeSelector_NodeGRPCPort_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NodeGRPCPort'

func (*MockNodeSelector_NodeGRPCPort_Call) Return added in v1.32.0

func (*MockNodeSelector_NodeGRPCPort_Call) Run added in v1.32.0

func (*MockNodeSelector_NodeGRPCPort_Call) RunAndReturn added in v1.32.0

type MockNodeSelector_NodeHostname_Call added in v1.31.1

type MockNodeSelector_NodeHostname_Call struct {
	*mock.Call
}

MockNodeSelector_NodeHostname_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NodeHostname'

func (*MockNodeSelector_NodeHostname_Call) Return added in v1.31.1

func (*MockNodeSelector_NodeHostname_Call) Run added in v1.31.1

func (*MockNodeSelector_NodeHostname_Call) RunAndReturn added in v1.31.1

type MockNodeSelector_NonStorageNodes_Call added in v1.31.1

type MockNodeSelector_NonStorageNodes_Call struct {
	*mock.Call
}

MockNodeSelector_NonStorageNodes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NonStorageNodes'

func (*MockNodeSelector_NonStorageNodes_Call) Return added in v1.31.1

func (*MockNodeSelector_NonStorageNodes_Call) Run added in v1.31.1

func (*MockNodeSelector_NonStorageNodes_Call) RunAndReturn added in v1.31.1

type MockNodeSelector_Shutdown_Call added in v1.32.11

type MockNodeSelector_Shutdown_Call struct {
	*mock.Call
}

MockNodeSelector_Shutdown_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Shutdown'

func (*MockNodeSelector_Shutdown_Call) Return added in v1.32.11

func (*MockNodeSelector_Shutdown_Call) Run added in v1.32.11

func (*MockNodeSelector_Shutdown_Call) RunAndReturn added in v1.32.11

type MockNodeSelector_SortCandidates_Call added in v1.31.1

type MockNodeSelector_SortCandidates_Call struct {
	*mock.Call
}

MockNodeSelector_SortCandidates_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SortCandidates'

func (*MockNodeSelector_SortCandidates_Call) Return added in v1.31.1

func (*MockNodeSelector_SortCandidates_Call) Run added in v1.31.1

func (*MockNodeSelector_SortCandidates_Call) RunAndReturn added in v1.31.1

type MockNodeSelector_StorageCandidates_Call added in v1.31.1

type MockNodeSelector_StorageCandidates_Call struct {
	*mock.Call
}

MockNodeSelector_StorageCandidates_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StorageCandidates'

func (*MockNodeSelector_StorageCandidates_Call) Return added in v1.31.1

func (*MockNodeSelector_StorageCandidates_Call) Run added in v1.31.1

func (*MockNodeSelector_StorageCandidates_Call) RunAndReturn added in v1.31.1

type NodeInfo added in v1.18.3

type NodeInfo struct {
	DiskUsage
	LastTimeMilli int64 // last update time in milliseconds
}

NodeInfo holds a node's disk usage and the time of its last update.

type NodeIterationStrategy

type NodeIterationStrategy int

const (
	StartRandom NodeIterationStrategy = iota
	StartAfter
)

type NodeIterator

type NodeIterator struct {
	// contains filtered or unexported fields
}

func NewNodeIterator

func NewNodeIterator(nodeNames []string,
	strategy NodeIterationStrategy,
) (*NodeIterator, error)

func (*NodeIterator) Next

func (n *NodeIterator) Next() string

func (*NodeIterator) SetStartNode

func (n *NodeIterator) SetStartNode(startNode string)

type NodeMetadata added in v1.32.0

type NodeMetadata struct {
	RestPort int `json:"rest_port"`
	GrpcPort int `json:"grpc_port"`
}

type NodeSelector added in v1.25.13

type NodeSelector interface {
	// NodeAddress resolves node id into an ip address without the port.
	NodeAddress(id string) string
	// NodeGRPCPort returns the gRPC port for a specific node id.
	NodeGRPCPort(id string) (int, error)
	// StorageCandidates returns list of storage nodes (names)
	// sorted by the free amount of disk space in descending order
	StorageCandidates() []string
	// NonStorageNodes return nodes from member list which
	// they are configured not to be voter only
	NonStorageNodes() []string
	// SortCandidates Sort passed nodes names by the
	// free amount of disk space in descending order
	SortCandidates(nodes []string) []string
	// LocalName() return local node name
	LocalName() string
	// NodeHostname return hosts address for a specific node name
	NodeHostname(name string) (string, bool)
	AllHostnames() []string
	// AllOtherClusterMembers returns all cluster members discovered via memberlist with their raft addresses
	// This is useful for bootstrap when the join config is incomplete
	// TODO-RAFT: shall be removed once unifying with raft package
	AllOtherClusterMembers(port int) map[string]string
	// Leave marks the node as leaving the cluster (still visible but shutting down)
	Leave() error
	// Shutdown is called when the node leaves the cluster gracefully; it shuts down the memberlist instance
	Shutdown() error
}

NodeSelector is an interface to select a portion of the available nodes in memberlist

type Persistence added in v1.21.3

type Persistence interface {
	StoreTx(ctx context.Context, tx *Transaction) error
	DeleteTx(ctx context.Context, txID string) error
	IterateAll(ctx context.Context, cb func(tx *Transaction)) error
}

type Remote

type Remote interface {
	BroadcastTransaction(ctx context.Context, tx *Transaction) error
	BroadcastAbortTransaction(ctx context.Context, tx *Transaction) error
	BroadcastCommitTransaction(ctx context.Context, tx *Transaction) error
}

type RequestQueueConfig added in v1.30.19

type RequestQueueConfig struct {
	// IsEnabled is used to enable/disable the request queue, can be modified at runtime
	IsEnabled *configRuntime.DynamicValue[bool] `json:"isEnabled" yaml:"isEnabled"`
	// NumWorkers is used to configure the number of workers that handle requests from the queue
	NumWorkers int `json:"numWorkers" yaml:"numWorkers"`
	// QueueSize is used to configure the size of the request queue buffer
	QueueSize int `json:"queueSize" yaml:"queueSize"`
	// QueueFullHttpStatus is used to configure the http status code that is returned when the request queue is full
	// Should usually be set to 429 or 504 (429 will be retried by the coordinator, 504 will not)
	QueueFullHttpStatus int `json:"queueFullHttpStatus" yaml:"queueFullHttpStatus"`
	// QueueShutdownTimeoutSeconds is used to configure the timeout for the request queue shutdown.
	// This is the timeout for the workers to finish processing the requests in the queue
	// and for the request queue to be drained.
	// Should usually be set to 90 seconds, based on coordinator's timeout
	QueueShutdownTimeoutSeconds int `json:"queueShutdownTimeoutSeconds" yaml:"queueShutdownTimeoutSeconds"`
}

RequestQueueConfig is used to configure the request queue buffer for the replicated indices
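To make the QueueSize and QueueFullHttpStatus settings concrete, here is a minimal sketch of a bounded, non-blocking queue that falls back to the documented defaults. The boundedQueue type, its methods, and the 202 status returned on success are hypothetical; only the default values (2000 and 429) come from the constants on this page.

```go
package main

import (
	"fmt"
	"net/http"
)

// Values copied from the constants documented on this page.
const (
	defaultQueueSize       = 2000
	defaultQueueFullStatus = http.StatusTooManyRequests
)

// boundedQueue is a hypothetical stand-in for a request queue buffer.
type boundedQueue struct {
	jobs       chan func()
	fullStatus int
}

// newBoundedQueue applies the defaults when a setting is left unset.
func newBoundedQueue(size, fullStatus int) *boundedQueue {
	if size <= 0 {
		size = defaultQueueSize
	}
	if fullStatus == 0 {
		fullStatus = defaultQueueFullStatus
	}
	return &boundedQueue{jobs: make(chan func(), size), fullStatus: fullStatus}
}

// enqueue never blocks: it returns 202 (an assumption of this sketch) if
// the job was buffered, or the configured status if the buffer is full.
func (q *boundedQueue) enqueue(job func()) int {
	select {
	case q.jobs <- job:
		return http.StatusAccepted
	default:
		return q.fullStatus
	}
}

func main() {
	q := newBoundedQueue(2, 0) // capacity 2, default full status
	for i := 0; i < 3; i++ {
		fmt.Println(q.enqueue(func() {})) // 202, 202, then 429
	}
}
```

Per the field comment above, a 429 is retried by the coordinator while a 504 is not, so the choice of status decides whether a full queue leads to a retry or to an immediate failure.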

type ResponseFn

type ResponseFn func(ctx context.Context, tx *Transaction) ([]byte, error)

type State

type State struct {
	// contains filtered or unexported fields
}

func Init

func Init(userConfig Config, grpcPort, raftTimeoutsMultiplier int, dataPath string, nonStorageNodes map[string]struct{}, logger logrus.FieldLogger) (_ *State, err error)

func (*State) AllHostnames

func (s *State) AllHostnames() []string

AllHostnames for live members, including self.

func (*State) AllNames

func (s *State) AllNames() []string

All node names (not their hostnames!) for live members, including self.

func (*State) AllOtherClusterMembers added in v1.31.17

func (s *State) AllOtherClusterMembers(port int) map[string]string

AllOtherClusterMembers returns all cluster members discovered via memberlist with their raft addresses. This is useful for bootstrap when the join config is incomplete.

func (*State) ClusterHealthScore

func (s *State) ClusterHealthScore() int

func (*State) Hostnames

func (s *State) Hostnames() []string

Hostnames for all live members, except self. Each entry includes the data port. Use AllHostnames to include self.

func (*State) Leave added in v1.31.17

func (s *State) Leave() error

Leave marks the node as leaving the cluster (still visible but shutting down)

func (*State) LocalAddr added in v1.31.19

func (s *State) LocalAddr() string

LocalAddr() returns local address

func (*State) LocalBindAddr added in v1.31.19

func (s *State) LocalBindAddr() string

func (*State) LocalName

func (s *State) LocalName() string

LocalName returns the local node name.

func (*State) MaintenanceModeEnabledForLocalhost added in v1.27.7

func (s *State) MaintenanceModeEnabledForLocalhost() bool

MaintenanceModeEnabledForLocalhost is experimental, may be removed/changed. It returns true if this node is in maintenance mode (which means it should return an error for all data requests).
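The MaintenanceNodes comment in Config explains why maintenance mode is a list of hostnames rather than a bool: the same value can be set on every node, and each node decides locally whether it is affected. A minimal sketch of that local decision, assuming a plain membership test (the helper name inMaintenance is hypothetical, not part of the package):

```go
package main

import "fmt"

// inMaintenance reports whether localName appears in the configured
// list of maintenance nodes.
func inMaintenance(maintenanceNodes []string, localName string) bool {
	for _, name := range maintenanceNodes {
		if name == localName {
			return true
		}
	}
	return false
}

func main() {
	// The same list is configured on every node in the cluster.
	nodes := []string{"node-2", "node-3"}
	for _, local := range []string{"node-1", "node-2"} {
		fmt.Printf("%s in maintenance: %v\n", local, inMaintenance(nodes, local))
	}
}
```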

func (*State) NodeAddress added in v1.25.0

func (s *State) NodeAddress(id string) string

NodeAddress resolves the node name into an IP address without the port. TODO-RAFT-DB-63: shall be replaced by Members(), which returns members in the list.

func (*State) NodeCount

func (s *State) NodeCount() int

NodeCount returns the number of live members, including self.

func (*State) NodeGRPCPort added in v1.32.0

func (s *State) NodeGRPCPort(nodeID string) (int, error)

func (*State) NodeHostname

func (s *State) NodeHostname(nodeName string) (string, bool)

func (*State) NodeInfo added in v1.18.3

func (s *State) NodeInfo(node string) (NodeInfo, bool)

func (*State) NonStorageNodes added in v1.25.13

func (s *State) NonStorageNodes() []string

NonStorageNodes returns the nodes from the member list that are configured not to be voter only.

func (*State) SchemaSyncIgnored added in v1.17.4

func (s *State) SchemaSyncIgnored() bool

func (*State) SetMaintenanceModeForLocalhost added in v1.27.7

func (s *State) SetMaintenanceModeForLocalhost(enabled bool)

SetMaintenanceModeForLocalhost is experimental, may be removed/changed. Enables/disables maintenance mode for this node.

func (*State) Shutdown added in v1.31.17

func (s *State) Shutdown() error

Shutdown is called when the node leaves the cluster gracefully; it shuts down the memberlist instance.

func (*State) SkipSchemaRepair added in v1.22.0

func (s *State) SkipSchemaRepair() bool

func (*State) SortCandidates added in v1.25.13

func (s *State) SortCandidates(nodes []string) []string

SortCandidates sorts the passed node names by the amount of free disk space in descending order.

func (*State) StorageCandidates added in v1.25.13

func (s *State) StorageCandidates() []string

StorageCandidates returns list of storage nodes (names) sorted by the free amount of disk space in descending order
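The ordering described for SortCandidates and StorageCandidates can be sketched as a sort on available bytes, largest first. The DiskUsage type below is a local copy of the struct documented on this page; the function name sortByFreeSpace, the usage map, and the treatment of unknown nodes as having zero free space are assumptions of this sketch.

```go
package main

import (
	"fmt"
	"sort"
)

// DiskUsage is a local copy of the documented struct (values in bytes).
type DiskUsage struct {
	Total     uint64
	Available uint64
}

// sortByFreeSpace returns a copy of names ordered by available disk
// space, largest first. Names missing from usage count as 0 bytes free.
func sortByFreeSpace(names []string, usage map[string]DiskUsage) []string {
	out := append([]string(nil), names...)
	sort.SliceStable(out, func(i, j int) bool {
		return usage[out[i]].Available > usage[out[j]].Available
	})
	return out
}

func main() {
	usage := map[string]DiskUsage{
		"node-1": {Total: 100, Available: 10},
		"node-2": {Total: 100, Available: 70},
		"node-3": {Total: 100, Available: 40},
	}
	fmt.Println(sortByFreeSpace([]string{"node-1", "node-2", "node-3"}, usage))
	// [node-2 node-3 node-1]
}
```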

type Transaction

type Transaction struct {
	ID       string
	Type     TransactionType
	Payload  interface{}
	Deadline time.Time

	// If TolerateNodeFailures is false (the default) a transaction cannot be
	// opened or committed if a node is confirmed dead. If a node is only
	// suspected dead, the TxManager will try, but abort unless all nodes ACK.
	TolerateNodeFailures bool
}

type TransactionType

type TransactionType string

type TxBroadcaster

type TxBroadcaster struct {
	// contains filtered or unexported fields
}

func NewTxBroadcaster

func NewTxBroadcaster(state MemberLister, client Client, logger logrus.FieldLogger) *TxBroadcaster

func (*TxBroadcaster) BroadcastAbortTransaction

func (t *TxBroadcaster) BroadcastAbortTransaction(rootCtx context.Context, tx *Transaction) error

func (*TxBroadcaster) BroadcastCommitTransaction

func (t *TxBroadcaster) BroadcastCommitTransaction(rootCtx context.Context, tx *Transaction) error

func (*TxBroadcaster) BroadcastTransaction

func (t *TxBroadcaster) BroadcastTransaction(rootCtx context.Context, tx *Transaction) error

func (*TxBroadcaster) SetConsensusFunction

func (t *TxBroadcaster) SetConsensusFunction(fn ConsensusFn)

type TxManager

type TxManager struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewTxManager

func NewTxManager(remote Remote, persistence Persistence,
	logger logrus.FieldLogger,
) *TxManager

func (*TxManager) BeginTransaction

func (c *TxManager) BeginTransaction(ctx context.Context, trType TransactionType,
	payload interface{}, ttl time.Duration,
) (*Transaction, error)

Begin a Transaction with the specified type and payload. Transactions expire after the specified TTL. For a transaction that never expires, pass in a ttl of 0. When choosing TTLs, keep in mind that clocks in the cluster might be slightly skewed; therefore, set your TTL to desiredTTL + toleratedClockSkew.

Regular transactions cannot be opened if the cluster is not considered healthy.
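The TTL guidance above amounts to a small calculation. The sketch below shows one way to compute the value passed as ttl; the helper name ttlWithSkew is hypothetical. It keeps 0 as 0 so that a never-expiring transaction does not accidentally turn into one that expires after the skew allowance alone.

```go
package main

import (
	"fmt"
	"time"
)

// ttlWithSkew returns desiredTTL + toleratedClockSkew, except that a
// desired TTL of 0 (never expires) is passed through unchanged.
func ttlWithSkew(desiredTTL, toleratedClockSkew time.Duration) time.Duration {
	if desiredTTL == 0 {
		return 0
	}
	return desiredTTL + toleratedClockSkew
}

func main() {
	fmt.Println(ttlWithSkew(30*time.Second, 2*time.Second)) // 32s
	fmt.Println(ttlWithSkew(0, 2*time.Second))              // 0s
}
```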

func (*TxManager) BeginTransactionTolerateNodeFailures added in v1.17.4

func (c *TxManager) BeginTransactionTolerateNodeFailures(ctx context.Context, trType TransactionType,
	payload interface{}, ttl time.Duration,
) (*Transaction, error)

Begin a Transaction that does not require the whole cluster to be healthy. This can be used for example in bootstrapping situations when not all nodes are present yet, or in disaster recovery situations when a node needs to run a transaction in order to re-join a cluster.

func (*TxManager) CloseReadTransaction

func (c *TxManager) CloseReadTransaction(ctx context.Context,
	tx *Transaction,
) error

func (*TxManager) CommitWriteTransaction

func (c *TxManager) CommitWriteTransaction(ctx context.Context,
	tx *Transaction,
) error

func (*TxManager) HaveDanglingTxs added in v1.21.3

func (c *TxManager) HaveDanglingTxs(ctx context.Context,
	allowedTypes []TransactionType,
) (found bool)

HaveDanglingTxs is a way to check if there are any uncommitted transactions in the durable storage. This can be used to make decisions about whether a failed schema check can be temporarily ignored - with the assumption that applying the dangling txs will fix the issue.

func (*TxManager) IncomingAbortTransaction

func (c *TxManager) IncomingAbortTransaction(ctx context.Context,
	tx *Transaction,
)

func (*TxManager) IncomingBeginTransaction

func (c *TxManager) IncomingBeginTransaction(ctx context.Context,
	tx *Transaction,
) ([]byte, error)

func (*TxManager) IncomingCommitTransaction

func (c *TxManager) IncomingCommitTransaction(ctx context.Context,
	tx *Transaction,
) error

func (*TxManager) SetAllowUnready added in v1.21.3

func (c *TxManager) SetAllowUnready(types []TransactionType)

func (*TxManager) SetCommitFn

func (c *TxManager) SetCommitFn(fn CommitFn)

SetCommitFn sets a function that is used in Write Transactions. The function can read the transaction payload and use it to alter the local state.

func (*TxManager) SetResponseFn

func (c *TxManager) SetResponseFn(fn ResponseFn)

SetResponseFn sets a function that is used in Read Transactions. The function sets the local state (by writing it into the Tx Payload). It can then be sent to other nodes. Consensus is not part of the ResponseFn. The coordinator - who initiated the Tx - is responsible for coming up with consensus. Deciding on Consensus requires insights into business logic, as from the TX's perspective payloads are opaque.
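A ResponseFn along these lines might look like the sketch below: it writes the local state into the transaction payload and returns it serialized, leaving any comparison between nodes to the coordinator. The Transaction type is a local stand-in, and schemaSnapshot, snapshotResponder, and the JSON encoding are hypothetical choices made for illustration.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Transaction is a local stand-in that mirrors only the fields used here.
type Transaction struct {
	ID      string
	Payload interface{}
}

// ResponseFn mirrors the signature documented above.
type ResponseFn func(ctx context.Context, tx *Transaction) ([]byte, error)

// schemaSnapshot is hypothetical local state that a node reports.
type schemaSnapshot struct {
	Node    string `json:"node"`
	Version int    `json:"version"`
}

// snapshotResponder returns a ResponseFn that stores the local state in
// the tx payload and serializes it. It does not decide on consensus.
func snapshotResponder(local schemaSnapshot) ResponseFn {
	return func(_ context.Context, tx *Transaction) ([]byte, error) {
		tx.Payload = local
		return json.Marshal(tx.Payload)
	}
}

func main() {
	fn := snapshotResponder(schemaSnapshot{Node: "node-1", Version: 7})
	body, err := fn(context.Background(), &Transaction{ID: "read-1"})
	fmt.Println(string(body), err) // {"node":"node-1","version":7} <nil>
}
```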

func (*TxManager) Shutdown added in v1.21.3

func (c *TxManager) Shutdown()

func (*TxManager) StartAcceptIncoming added in v1.21.3

func (c *TxManager) StartAcceptIncoming()

func (*TxManager) TryResumeDanglingTxs added in v1.21.3

func (c *TxManager) TryResumeDanglingTxs(ctx context.Context,
	allowedTypes []TransactionType,
) (applied bool, err error)

TryResumeDanglingTxs loops over the existing transactions and applies them. It only does so if the transaction type is explicitly listed as allowed. This is because - at the time of creating this - we were not sure if all transaction commit functions are idempotent. If one would not be, then reapplying a tx or tx commit could potentially be dangerous, as we don't know if it was already applied prior to the node death.

For example, think of a "add property 'foo'" tx, that does nothing but append the property to the schema. If this ran twice, we might now end up with two duplicate properties with the name 'foo' which could in turn create other problems. To make sure all txs are resumable (which is what we want because that's the only way to avoid schema issues), we need to make sure that every single tx is idempotent, then add them to the allow list.

One other limitation is that this method currently does nothing to check if a tx was really committed or not. In an ideal world, the node would contact the other nodes and ask. However, this simpler implementation does not do this check. Instead [HaveDanglingTxs] is used in combination with the schema check. If the schema is not out of sync in the first place, no txs will be applied. This does not cover all edge cases, but it seems to work for now. This should be improved in the future.
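The two ideas above, an allow list of transaction types and idempotent commit functions, can be sketched as follows. The types are local stand-ins, and the helper names (resumable, addPropertyIdempotent) and the transaction type strings are hypothetical; this is not the package's implementation.

```go
package main

import "fmt"

// Local stand-ins that mirror only the fields used here.
type TransactionType string

type Transaction struct {
	ID      string
	Type    TransactionType
	Payload interface{}
}

// resumable keeps only the persisted transactions whose type is
// explicitly listed as allowed.
func resumable(persisted []*Transaction, allowed []TransactionType) []*Transaction {
	ok := make(map[TransactionType]bool, len(allowed))
	for _, t := range allowed {
		ok[t] = true
	}
	var out []*Transaction
	for _, tx := range persisted {
		if ok[tx.Type] {
			out = append(out, tx)
		}
	}
	return out
}

// addPropertyIdempotent appends name only if it is absent, so applying
// the same tx twice leaves the schema unchanged.
func addPropertyIdempotent(props []string, name string) []string {
	for _, p := range props {
		if p == name {
			return props
		}
	}
	return append(props, name)
}

func main() {
	persisted := []*Transaction{
		{ID: "1", Type: "add_property", Payload: "foo"},
		{ID: "2", Type: "delete_class", Payload: "Bar"},
	}
	props := []string{"foo"} // the tx was already applied before the node died
	for _, tx := range resumable(persisted, []TransactionType{"add_property"}) {
		props = addPropertyIdempotent(props, tx.Payload.(string))
	}
	fmt.Println(props) // [foo]
}
```

With a plain append instead of the idempotent variant, the replay would produce two 'foo' properties, which is the duplicate-property problem described above.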
