controller

package
v3.29.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2023 License: Apache-2.0, Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package controller provides the core Controller struct.

Index

Constants

View Source
const (
	// ErrCodeCustom can be used to return a custom error message.
	ErrCodeCustom errors.Code = "CustomError"

	// ErrCodeInternal can be used when the cause of an error can't be
	// determined. It can be accompanied by a single string message.
	ErrCodeInternal errors.Code = "InternalError"

	// ErrCodeTODO can be used as a placeholder until a proper error code is
	// created and assigned.
	ErrCodeTODO errors.Code = "TODOError"

	ErrCodeNodeExists      errors.Code = "NodeExists"
	ErrCodeNodeKeyInvalid  errors.Code = "NodeKeyInvalid"
	ErrCodeNoAvailableNode errors.Code = "NoAvailableNode"

	ErrCodeRoleTypeInvalid errors.Code = "RoleTypeInvalid"

	ErrCodeDirectiveSendFailure errors.Code = "DirectiveSendFailure"

	ErrCodeInvalidRequest errors.Code = "InvalidRequest"

	ErrCodeUnassignedJobs errors.Code = "UnassignedJobs"

	UndefinedErrorMessage string = "undefined message format"
)

Variables

This section is empty.

Functions

func NewErrCustom

func NewErrCustom() error

NewErrCustom can be used to return a custom error message.

func NewErrDirectiveSendFailure

func NewErrDirectiveSendFailure(msg string) error

func NewErrInternal

func NewErrInternal(msg string) error

NewErrInternal can be used when the cause of an error can't be determined. It can be accompanied by a single string message.

func NewErrInvalidRequest

func NewErrInvalidRequest(msg string) error

func NewErrNoAvailableNode

func NewErrNoAvailableNode() error

func NewErrNodeExists

func NewErrNodeExists(addr dax.Address) error

func NewErrNodeKeyInvalid

func NewErrNodeKeyInvalid(addr dax.Address) error

func NewErrRoleTypeInvalid

func NewErrRoleTypeInvalid(roleType dax.RoleType) error

func NewErrUnassignedJobs

func NewErrUnassignedJobs(jobs []dax.Job) error

func NewNopNodeService added in v3.29.0

func NewNopNodeService() *nopNodeService

Types

type AddressSet

type AddressSet map[dax.Address]struct{}

AddressSet is a set of addresses (dax.Address values).

func NewAddressSet

func NewAddressSet() AddressSet

func (AddressSet) Add

func (s AddressSet) Add(p dax.Address)

func (AddressSet) Contains

func (s AddressSet) Contains(p dax.Address) bool

func (AddressSet) Merge added in v3.29.0

func (s AddressSet) Merge(o AddressSet)

func (AddressSet) Minus

func (s AddressSet) Minus(m AddressSet) []dax.Address

func (AddressSet) Remove

func (s AddressSet) Remove(p dax.Address)

func (AddressSet) SortedSlice

func (s AddressSet) SortedSlice() []dax.Address

type Balancer

type Balancer interface {
	// AddWorker adds a worker to the global pool of available workers.
	AddWorker(tx dax.Transaction, node *dax.Node) ([]dax.WorkerDiff, error)

	// RemoveWorker removes a worker from the system. If the worker is currently
	// assigned to a database and has jobs, it will be removed and its jobs will
	// be either transferred to other workers or placed on the free job list.
	RemoveWorker(tx dax.Transaction, addr dax.Address) ([]dax.WorkerDiff, error)

	// AddJobs adds new jobs for the given database.
	AddJobs(tx dax.Transaction, roleType dax.RoleType, qtid dax.QualifiedTableID, jobs ...dax.Job) ([]dax.WorkerDiff, error)

	// RemoveJobs removes jobs for the given database.
	RemoveJobs(tx dax.Transaction, roleType dax.RoleType, qtid dax.QualifiedTableID, jobs ...dax.Job) ([]dax.WorkerDiff, error)

	// BalanceDatabase forces a database balance. TODO(tlt): currently this is
	// only used in tests, so perhaps we can get rid of it.
	BalanceDatabase(tx dax.Transaction, qdbid dax.QualifiedDatabaseID) ([]dax.WorkerDiff, error)

	// CurrentState returns the workers and jobs currently active for the given
	// database.
	CurrentState(tx dax.Transaction, roleType dax.RoleType, qdbid dax.QualifiedDatabaseID) ([]dax.WorkerInfo, error)

	// WorkerState returns the jobs currently active for the given worker.
	WorkerState(tx dax.Transaction, roleType dax.RoleType, addr dax.Address) (dax.WorkerInfo, error)

	// WorkersForJobs returns the workers and jobs currently responsible for the
	// given jobs.
	WorkersForJobs(tx dax.Transaction, roleType dax.RoleType, qdbid dax.QualifiedDatabaseID, jobs ...dax.Job) ([]dax.WorkerInfo, error)

	// WorkersForTable returns the workers responsible for any job related to
	// the given table.
	WorkersForTable(tx dax.Transaction, roleType dax.RoleType, qtid dax.QualifiedTableID) ([]dax.WorkerInfo, error)

	// ReadNode returns the node for the given address.
	ReadNode(tx dax.Transaction, addr dax.Address) (*dax.Node, error)

	// Nodes returns all nodes known by the Balancer.
	Nodes(tx dax.Transaction) ([]*dax.Node, error)
}

type Config

type Config struct {
	Director Director
	Schemar  schemar.Schemar

	Balancer Balancer

	StorageMethod string
	BoltDB        *boltdb.DB

	SnapshotterDir string
	WriteloggerDir string

	// RegistrationBatchTimeout is the time that the controller will
	// wait after a node registers itself to see if any more nodes
	// will register before sending out directives to all nodes which
	// have been registered.
	RegistrationBatchTimeout time.Duration

	// SnappingTurtleTimeout is the period on which the automatic
	// snapshotting routine will run. If performing all the snapshots
	// takes longer than this amount of time, snapshotting will run
	// continuously. If it finishes before the timeout, it will wait
	// until the timeout expires to start another round of snapshots.
	SnappingTurtleTimeout time.Duration

	Logger logger.Logger
}

type Controller

type Controller struct {
	// Schemar used by the controller to get table information. The controller
	// should NOT call Schemar methods which modify data. Schema mutations are
	// made outside of the controller (at this point that happens in MDS).
	Schemar schemar.Schemar

	Balancer Balancer

	Snapshotter *snapshotter.Snapshotter
	Writelogger *writelogger.Writelogger

	// Director is used to send directives to compute workers.
	Director Director
	// contains filtered or unexported fields
}

func New

func New(cfg Config) *Controller

New returns a new instance of Controller with default values.

func (*Controller) AddAddresses

func (c *Controller) AddAddresses(ctx context.Context, addrs ...dax.Address) error

func (*Controller) CheckInNode

func (c *Controller) CheckInNode(ctx context.Context, n *dax.Node) error

CheckInNode handles a "check-in" from a compute node. These come periodically, and if the controller already knows about the compute node, it can simply no-op. If, however, the controller is not aware of the node checking in, then that probably means that the poller has removed that node from its list (perhaps due to a network fault) and therefore the node needs to be re-registered.

func (*Controller) ComputeNodes

func (c *Controller) ComputeNodes(ctx context.Context, qtid dax.QualifiedTableID, shards dax.ShardNums) ([]dax.ComputeNode, error)

ComputeNodes returns the compute nodes for the given table/shards. It always uses a read transaction. The writable equivalent to this method is `IngestShard`.

func (*Controller) CreateDatabase added in v3.29.0

func (c *Controller) CreateDatabase(ctx context.Context, qdb *dax.QualifiedDatabase) error

CreateDatabase adds a database to the schemar.

func (*Controller) CreateField

func (c *Controller) CreateField(ctx context.Context, qtid dax.QualifiedTableID, fld *dax.Field) error

func (*Controller) CreateNode added in v3.29.0

func (c *Controller) CreateNode(context.Context, dax.Address, *dax.Node) error

func (*Controller) CreateTable

func (c *Controller) CreateTable(ctx context.Context, qtbl *dax.QualifiedTable) error

CreateTable adds a table to the schemar, and then sends directives to all affected nodes based on the change.

func (*Controller) DatabaseByID added in v3.29.0

func (c *Controller) DatabaseByID(ctx context.Context, qdbid dax.QualifiedDatabaseID) (*dax.QualifiedDatabase, error)

DatabaseByID returns the database for the given id.

func (*Controller) DatabaseByName added in v3.29.0

func (c *Controller) DatabaseByName(ctx context.Context, orgID dax.OrganizationID, dbname dax.DatabaseName) (*dax.QualifiedDatabase, error)

DatabaseByName returns the database for the given name.

func (*Controller) Databases added in v3.29.0

func (c *Controller) Databases(ctx context.Context, orgID dax.OrganizationID, ids ...dax.DatabaseID) ([]*dax.QualifiedDatabase, error)

func (*Controller) DebugNodes

func (c *Controller) DebugNodes(ctx context.Context) ([]*dax.Node, error)

func (*Controller) DeleteNode added in v3.29.0

func (c *Controller) DeleteNode(context.Context, dax.Address) error

func (*Controller) DeregisterNodes

func (c *Controller) DeregisterNodes(ctx context.Context, addresses ...dax.Address) error

DeregisterNodes removes nodes from the controller's list of registered nodes. It sends directives to the removed nodes, but ignores errors.

func (*Controller) DropDatabase added in v3.29.0

func (c *Controller) DropDatabase(ctx context.Context, qdbid dax.QualifiedDatabaseID) error

func (*Controller) DropField

func (c *Controller) DropField(ctx context.Context, qtid dax.QualifiedTableID, fldName dax.FieldName) error

func (*Controller) DropTable

func (c *Controller) DropTable(ctx context.Context, qtid dax.QualifiedTableID) error

DropTable removes a table from the schema and sends directives to all affected nodes based on the change.

func (*Controller) IngestPartition

func (c *Controller) IngestPartition(ctx context.Context, qtid dax.QualifiedTableID, partition dax.PartitionNum) (dax.Address, error)

func (*Controller) IngestShard added in v3.29.0

func (c *Controller) IngestShard(ctx context.Context, qtid dax.QualifiedTableID, shrdNum dax.ShardNum) (dax.Address, error)

IngestShard handles an ingest shard request.

func (*Controller) Nodes

func (c *Controller) Nodes(ctx context.Context) ([]*dax.Node, error)

func (*Controller) ReadNode added in v3.29.0

func (c *Controller) ReadNode(context.Context, dax.Address) (*dax.Node, error)

func (*Controller) RegisterNode

func (c *Controller) RegisterNode(ctx context.Context, n *dax.Node) error

RegisterNode adds a node to the controller's list of registered nodes. It makes no guarantees about when the node will actually be used for anything or assigned any jobs.

func (*Controller) RegisterNodes

func (c *Controller) RegisterNodes(ctx context.Context, nodes ...*dax.Node) error

RegisterNodes adds nodes to the controller's list of registered nodes.

func (*Controller) RemoveAddresses

func (c *Controller) RemoveAddresses(ctx context.Context, addrs ...dax.Address) error

func (*Controller) RemoveShards

func (c *Controller) RemoveShards(ctx context.Context, qtid dax.QualifiedTableID, shards ...dax.ShardNum) error

RemoveShards deregisters the table/shard combinations with the controller and sends the necessary directives.

func (*Controller) Run

func (c *Controller) Run() error

Run starts long running subroutines.

func (*Controller) SetDatabaseOptions added in v3.29.0

func (c *Controller) SetDatabaseOptions(ctx context.Context, qdbid dax.QualifiedDatabaseID, opts dax.DatabaseOptions) error

SetDatabaseOptions sets the options on the given database.

func (*Controller) SnapshotFieldKeys

func (c *Controller) SnapshotFieldKeys(ctx context.Context, qtid dax.QualifiedTableID, field dax.FieldName) error

SnapshotFieldKeys forces the translate node responsible for the given field to snapshot the keys for that field, then increment its version for logs written to the Writelogger.

func (*Controller) SnapshotShardData

func (c *Controller) SnapshotShardData(ctx context.Context, qtid dax.QualifiedTableID, shardNum dax.ShardNum) error

SnapshotShardData forces the compute node responsible for the given shard to snapshot that shard, then increment its shard version for logs written to the Writelogger.

func (*Controller) SnapshotTable

func (c *Controller) SnapshotTable(ctx context.Context, qtid dax.QualifiedTableID) error

SnapshotTable snapshots a table. It might also snapshot everything else... no guarantees here, only used in tests as of this writing.

func (*Controller) SnapshotTableKeys

func (c *Controller) SnapshotTableKeys(ctx context.Context, qtid dax.QualifiedTableID, partitionNum dax.PartitionNum) error

SnapshotTableKeys forces the translate node responsible for the given partition to snapshot the table keys for that partition, then increment its version for logs written to the Writelogger.

func (*Controller) Stop

func (c *Controller) Stop()

Stop stops the node registration routine.

func (*Controller) TableByID added in v3.29.0

func (c *Controller) TableByID(ctx context.Context, qtid dax.QualifiedTableID) (*dax.QualifiedTable, error)

TableByID returns a table by qualified table id.

func (*Controller) TableByName added in v3.29.0

func (c *Controller) TableByName(ctx context.Context, qdbid dax.QualifiedDatabaseID, name dax.TableName) (*dax.QualifiedTable, error)

TableByName gets the full table by name.

func (*Controller) TableID added in v3.29.0

TableID returns the table id by table name. TODO(tlt): try to phase this out in favor of TableByName().

func (*Controller) Tables

func (c *Controller) Tables(ctx context.Context, qdbid dax.QualifiedDatabaseID, ids ...dax.TableID) ([]*dax.QualifiedTable, error)

Tables returns a list of tables in the given database, optionally restricted to the given table IDs.

func (*Controller) TranslateNodes

func (c *Controller) TranslateNodes(ctx context.Context, qtid dax.QualifiedTableID, partitions dax.PartitionNums) ([]dax.TranslateNode, error)

TranslateNodes returns the translate nodes for the given table/partitions. It always uses a read transaction. The writable equivalent to this method is `IngestPartition`.

type Director

type Director interface {
	SendDirective(ctx context.Context, dir *dax.Directive) error
	SendSnapshotShardDataRequest(ctx context.Context, req *dax.SnapshotShardDataRequest) error
	SendSnapshotTableKeysRequest(ctx context.Context, req *dax.SnapshotTableKeysRequest) error
	SendSnapshotFieldKeysRequest(ctx context.Context, req *dax.SnapshotFieldKeysRequest) error
}

type NewBalancerFn

type NewBalancerFn func(string, logger.Logger) Balancer

type NodeService added in v3.29.0

type NodeService interface {
	CreateNode(dax.Transaction, dax.Address, *dax.Node) error
	ReadNode(dax.Transaction, dax.Address) (*dax.Node, error)
	DeleteNode(dax.Transaction, dax.Address) error
	Nodes(dax.Transaction) ([]*dax.Node, error)
}

NodeService represents a service for managing Nodes.

type NopBalancer

type NopBalancer struct{}

NopBalancer is a no-op implementation of the Balancer interface.

func NewNopBalancer

func NewNopBalancer() *NopBalancer

func (*NopBalancer) AddJobs

func (b *NopBalancer) AddJobs(tx dax.Transaction, roleType dax.RoleType, qtid dax.QualifiedTableID, jobs ...dax.Job) ([]dax.WorkerDiff, error)

func (*NopBalancer) AddWorker

func (b *NopBalancer) AddWorker(tx dax.Transaction, node *dax.Node) ([]dax.WorkerDiff, error)

func (*NopBalancer) BalanceDatabase added in v3.29.0

func (b *NopBalancer) BalanceDatabase(tx dax.Transaction, qdbid dax.QualifiedDatabaseID) ([]dax.WorkerDiff, error)

func (*NopBalancer) CurrentState

func (b *NopBalancer) CurrentState(tx dax.Transaction, roleType dax.RoleType, qdbid dax.QualifiedDatabaseID) ([]dax.WorkerInfo, error)

func (*NopBalancer) Nodes added in v3.29.0

func (b *NopBalancer) Nodes(tx dax.Transaction) ([]*dax.Node, error)

func (*NopBalancer) ReadNode added in v3.29.0

func (b *NopBalancer) ReadNode(tx dax.Transaction, addr dax.Address) (*dax.Node, error)

func (*NopBalancer) RemoveJobs added in v3.27.0

func (b *NopBalancer) RemoveJobs(tx dax.Transaction, roleType dax.RoleType, qtid dax.QualifiedTableID, jobs ...dax.Job) ([]dax.WorkerDiff, error)

func (*NopBalancer) RemoveWorker

func (b *NopBalancer) RemoveWorker(tx dax.Transaction, addr dax.Address) ([]dax.WorkerDiff, error)

func (*NopBalancer) WorkerState

func (b *NopBalancer) WorkerState(tx dax.Transaction, roleType dax.RoleType, addr dax.Address) (dax.WorkerInfo, error)

func (*NopBalancer) WorkersForJobs

func (b *NopBalancer) WorkersForJobs(tx dax.Transaction, roleType dax.RoleType, qdbid dax.QualifiedDatabaseID, jobs ...dax.Job) ([]dax.WorkerInfo, error)

func (*NopBalancer) WorkersForTable added in v3.29.0

func (b *NopBalancer) WorkersForTable(tx dax.Transaction, roleType dax.RoleType, qtid dax.QualifiedTableID) ([]dax.WorkerInfo, error)

type NopDirector

type NopDirector struct{}

NopDirector is a no-op implementation of the Director interface.

func NewNopDirector

func NewNopDirector() *NopDirector

func (*NopDirector) SendDirective

func (d *NopDirector) SendDirective(ctx context.Context, dir *dax.Directive) error

func (*NopDirector) SendSnapshotFieldKeysRequest

func (d *NopDirector) SendSnapshotFieldKeysRequest(ctx context.Context, req *dax.SnapshotFieldKeysRequest) error

func (*NopDirector) SendSnapshotShardDataRequest

func (d *NopDirector) SendSnapshotShardDataRequest(ctx context.Context, req *dax.SnapshotShardDataRequest) error

func (*NopDirector) SendSnapshotTableKeysRequest

func (d *NopDirector) SendSnapshotTableKeysRequest(ctx context.Context, req *dax.SnapshotTableKeysRequest) error

type StringSet

type StringSet map[string]struct{}

StringSet is a set of strings.

func NewStringSet

func NewStringSet() StringSet

func (StringSet) Add

func (s StringSet) Add(p string)

func (StringSet) Contains

func (s StringSet) Contains(p string) bool

func (StringSet) Minus

func (s StringSet) Minus(m StringSet) []string

func (StringSet) Remove

func (s StringSet) Remove(p string)

func (StringSet) SortedSlice

func (s StringSet) SortedSlice() []string

type TableSet

type TableSet map[dax.TableKey]struct{}

TableSet is a set of table keys (dax.TableKey values).

func NewTableSet

func NewTableSet() TableSet

func (TableSet) Add

func (s TableSet) Add(t dax.TableKey)

func (TableSet) Contains

func (s TableSet) Contains(t dax.TableKey) bool

func (TableSet) Minus

func (s TableSet) Minus(m TableSet) dax.TableKeys

func (TableSet) QualifiedSortedSlice

func (s TableSet) QualifiedSortedSlice() map[dax.QualifiedDatabaseID]dax.TableIDs

func (TableSet) Remove

func (s TableSet) Remove(t dax.TableKey)

func (TableSet) SortedSlice

func (s TableSet) SortedSlice() dax.TableKeys

Directories

Path Synopsis
balancer
Package balancer is an implementation of the controller's Balancer interface.
boltdb
Package boltdb contains the boltdb implementation of the Balancer interface.
http
Package http provides the http implementation of the Director interface.
partitioner
Package partitioner provides the Partitioner type, which provides helper methods for determining partitions based on string keys.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL