controller

package
v3.27.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2023 License: Apache-2.0, Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package controller provides the core Controller struct.

Index

Constants

View Source
const (
	// ErrCodeCustom can be used to return a custom error message.
	ErrCodeCustom errors.Code = "CustomError"

	// ErrCodeInternal can be used when the cause of an error can't be
	// determined. It can be accompanied by a single string message.
	ErrCodeInternal errors.Code = "InternalError"

	// ErrCodeTODO can be used as a placeholder until a proper error code is
	// created and assigned.
	ErrCodeTODO errors.Code = "TODOError"

	ErrCodeNodeExists      errors.Code = "NodeExists"
	ErrCodeNodeKeyInvalid  errors.Code = "NodeKeyInvalid"
	ErrCodeNoAvailableNode errors.Code = "NoAvailableNode"

	ErrCodeRoleTypeInvalid errors.Code = "RoleTypeInvalid"

	ErrCodeDirectiveSendFailure errors.Code = "DirectiveSendFailure"

	ErrCodeInvalidRequest errors.Code = "InvalidRequest"

	ErrCodeUnassignedJobs errors.Code = "UnassignedJobs"

	UndefinedErrorMessage string = "undefined message format"
)

Variables

This section is empty.

Functions

func NewErrCustom

func NewErrCustom() error

NewErrCustom can be used to return a custom error message.

func NewErrDirectiveSendFailure

func NewErrDirectiveSendFailure(msg string) error

func NewErrInternal

func NewErrInternal(msg string) error

NewErrInternal can be used when the cause of an error can't be determined. It can be accompanied by a single string message.

func NewErrInvalidRequest

func NewErrInvalidRequest(msg string) error

func NewErrNoAvailableNode

func NewErrNoAvailableNode() error

func NewErrNodeExists

func NewErrNodeExists(addr dax.Address) error

func NewErrNodeKeyInvalid

func NewErrNodeKeyInvalid(addr dax.Address) error

func NewErrRoleTypeInvalid

func NewErrRoleTypeInvalid(roleType dax.RoleType) error

func NewErrUnassignedJobs

func NewErrUnassignedJobs(jobs []dax.Job) error

Types

type AddressSet

type AddressSet map[dax.Address]struct{}

AddressSet is a set of strings.

func NewAddressSet

func NewAddressSet() AddressSet

func (AddressSet) Add

func (s AddressSet) Add(p dax.Address)

func (AddressSet) Contains

func (s AddressSet) Contains(p dax.Address) bool

func (AddressSet) Minus

func (s AddressSet) Minus(m AddressSet) []dax.Address

func (AddressSet) Remove

func (s AddressSet) Remove(p dax.Address)

func (AddressSet) SortedSlice

func (s AddressSet) SortedSlice() []dax.Address

type Balancer

type Balancer interface {
	AddWorker(ctx context.Context, worker fmt.Stringer) ([]dax.WorkerDiff, error)
	RemoveWorker(ctx context.Context, worker fmt.Stringer) ([]dax.WorkerDiff, error)
	AddJobs(ctx context.Context, job ...fmt.Stringer) ([]dax.WorkerDiff, error)
	RemoveJob(ctx context.Context, job fmt.Stringer) ([]dax.WorkerDiff, error)
	Balance(ctx context.Context) ([]dax.WorkerDiff, error)
	CurrentState(ctx context.Context) ([]dax.WorkerInfo, error)
	WorkerState(ctx context.Context, worker dax.Worker) (dax.WorkerInfo, error)
	WorkersForJobs(ctx context.Context, jobs []dax.Job) ([]dax.WorkerInfo, error)

	// WorkersForJobPrefix returns all workers and their job
	// assignments which start with `prefix` for all jobs that start
	// with `prefix`. If there are free jobs that start with `prefix`
	// an error is returned.
	//
	// The motivating use case is getting all workers for a particular
	// table so we can execute a query that will hit every shard in a
	// table. If there are jobs representing shards in that table
	// which are not assigned to any worker, that means the query
	// would return incomplete data, so we want to error.
	WorkersForJobPrefix(ctx context.Context, prefix string) ([]dax.WorkerInfo, error)

	// RemoveJobs is for e.g. when dropping a table remove all jobs
	// associated with that table without needing to look up in
	// advance which shards or partitions are actually present.
	RemoveJobs(ctx context.Context, prefix string) ([]dax.WorkerDiff, error)
}

type Config

type Config struct {
	Director          Director
	Schemar           schemar.Schemar
	ComputeBalancer   Balancer
	TranslateBalancer Balancer

	StorageMethod string
	BoltDB        *boltdb.DB

	// RegistrationBatchTimeout is the time that the controller will
	// wait after a node registers itself to see if any more nodes
	// will register before sending out directives to all nodes which
	// have been registered.
	RegistrationBatchTimeout time.Duration

	// SnappingTurtleTimeout is the period on which the automatic
	// snapshotting routine will run. If performing all the snapshots
	// takes longer than this amount of time, snapshotting will run
	// continuously. If it finishes before the timeout, it will wait
	// until the timeout expires to start another round of snapshots.
	SnappingTurtleTimeout time.Duration

	Logger logger.Logger
}

type Controller

type Controller struct {

	// Schemar used by the controller to get table information. The controller
	// should NOT call Schemar methods which modify data. Schema mutations are
	// made outside of the controller (at this point that happens in MDS).
	Schemar schemar.Schemar

	ComputeBalancer   Balancer
	TranslateBalancer Balancer

	// Director is used to send directives to computer workers.
	Director Director
	// contains filtered or unexported fields
}

func New

func New(cfg Config) *Controller

New returns a new instance of Controller with default values.

func (*Controller) AddAddresses

func (c *Controller) AddAddresses(ctx context.Context, addrs ...dax.Address) error

func (*Controller) AddShards

func (c *Controller) AddShards(ctx context.Context, qtid dax.QualifiedTableID, shards ...dax.ShardNum) error

AddShards registers the table/shard combinations with the controller and sends the necessary directive.

func (*Controller) CheckInNode

func (c *Controller) CheckInNode(ctx context.Context, n *dax.Node) error

CheckInNode handles a "check-in" from a compute node. These come periodically, and if the controller already knows about the compute node, it can simply no-op. If, however, the controller is not aware of the node checking in, then that probably means that the poller has removed that node from its list (perhaps due to a network fault) and therefore the node needs to be re-registered.

func (*Controller) ComputeNodes

func (c *Controller) ComputeNodes(ctx context.Context, qtid dax.QualifiedTableID, shards dax.ShardNums, isWrite bool) ([]dax.ComputeNode, error)

func (*Controller) CreateField

func (c *Controller) CreateField(ctx context.Context, qtid dax.QualifiedTableID, fld *dax.Field) error

func (*Controller) CreateTable

func (c *Controller) CreateTable(ctx context.Context, qtbl *dax.QualifiedTable) error

CreateTable adds a table to the versionStore and schemar, and then sends directives to all affected nodes based on the change.

func (*Controller) DebugNodes

func (c *Controller) DebugNodes(ctx context.Context) ([]*dax.Node, error)

func (*Controller) DeregisterNodes

func (c *Controller) DeregisterNodes(ctx context.Context, addresses ...dax.Address) error

DeregisterNodes removes nodes from the controller's list of registered nodes. It sends directives to the removed nodes, but ignores errors.

func (*Controller) DropField

func (c *Controller) DropField(ctx context.Context, qtid dax.QualifiedTableID, fldName dax.FieldName) error

func (*Controller) DropTable

func (c *Controller) DropTable(ctx context.Context, qtid dax.QualifiedTableID) error

DropTable removes a table from the schema and sends directives to all affected nodes based on the change.

func (*Controller) IngestPartition

func (c *Controller) IngestPartition(ctx context.Context, qtid dax.QualifiedTableID, partition dax.PartitionNum) (dax.Address, error)

func (*Controller) InitializePoller

func (c *Controller) InitializePoller(ctx context.Context) error

InitializePoller sends the list of known nodes (to be polled) to the poller. This is useful in the case where MDS has restarted (or has been replaced) and its poller is emtpy (i.e. it doesn't know about any nodes).

func (*Controller) Nodes

func (c *Controller) Nodes(ctx context.Context, role dax.Role, createMissing bool) ([]dax.AssignedNode, error)

Nodes returns the list of assigned nodes responsible for the jobs included in the given role. If createMissing is true, the Controller will create new jobs for any of which it isn't currently aware.

func (*Controller) RegisterNode

func (c *Controller) RegisterNode(ctx context.Context, n *dax.Node) error

RegisterNode adds a node to the controller's list of registered nodes. It makes no guarantees about when the node will actually be used for anything or assigned any jobs.

func (*Controller) RegisterNodes

func (c *Controller) RegisterNodes(ctx context.Context, nodes ...*dax.Node) error

RegisterNodes adds nodes to the controller's list of registered nodes.

func (*Controller) RemoveAddresses

func (c *Controller) RemoveAddresses(ctx context.Context, addrs ...dax.Address) error

func (*Controller) RemoveShards

func (c *Controller) RemoveShards(ctx context.Context, qtid dax.QualifiedTableID, shards ...dax.ShardNum) error

RemoveShards deregisters the table/shard combinations with the controller and sends the necessary directives.

func (*Controller) Run

func (c *Controller) Run() error

Run starts long running subroutines.

func (*Controller) SetPoller

func (c *Controller) SetPoller(poller dax.AddressManager)

func (*Controller) SnapshotFieldKeys

func (c *Controller) SnapshotFieldKeys(ctx context.Context, qtid dax.QualifiedTableID, field dax.FieldName) error

SnapshotFieldKeys forces the translate node responsible for the given field to snapshot the keys for that field, then increment its version for logs written to the WriteLogger.

func (*Controller) SnapshotShardData

func (c *Controller) SnapshotShardData(ctx context.Context, qtid dax.QualifiedTableID, shardNum dax.ShardNum) error

SnapshotShardData forces the compute node responsible for the given shard to snapshot that shard, then increment its shard version for logs written to the WriteLogger.

func (*Controller) SnapshotTable

func (c *Controller) SnapshotTable(ctx context.Context, qtid dax.QualifiedTableID) error

SnapshotTable snapshots a table. It might also snapshot everything else... no guarantees here, only used in tests as of this writing.

func (*Controller) SnapshotTableKeys

func (c *Controller) SnapshotTableKeys(ctx context.Context, qtid dax.QualifiedTableID, partitionNum dax.PartitionNum) error

SnapshotTableKeys forces the translate node responsible for the given partition to snapshot the table keys for that partition, then increment its version for logs written to the WriteLogger.

func (*Controller) Stop

func (c *Controller) Stop()

Stop stops the node registration routine.

func (*Controller) Table

Table returns a table by quaified table id.

func (*Controller) Tables

func (c *Controller) Tables(ctx context.Context, qual dax.TableQualifier, ids ...dax.TableID) ([]*dax.QualifiedTable, error)

Tables returns a list of tables by name.

func (*Controller) TranslateNodes

func (c *Controller) TranslateNodes(ctx context.Context, qtid dax.QualifiedTableID, partitions dax.PartitionNums, isWrite bool) ([]dax.TranslateNode, error)

type Director

type Director interface {
	SendDirective(ctx context.Context, dir *dax.Directive) error
	SendSnapshotShardDataRequest(ctx context.Context, req *dax.SnapshotShardDataRequest) error
	SendSnapshotTableKeysRequest(ctx context.Context, req *dax.SnapshotTableKeysRequest) error
	SendSnapshotFieldKeysRequest(ctx context.Context, req *dax.SnapshotFieldKeysRequest) error
}

type NewBalancerFn

type NewBalancerFn func(string, logger.Logger) Balancer

type NopBalancer

type NopBalancer struct{}

NopBalancer is a no-op implementation of the Balancer interface.

func NewNopBalancer

func NewNopBalancer() *NopBalancer

func (*NopBalancer) AddJobs

func (b *NopBalancer) AddJobs(ctx context.Context, job ...fmt.Stringer) ([]dax.WorkerDiff, error)

func (*NopBalancer) AddWorker

func (b *NopBalancer) AddWorker(ctx context.Context, worker fmt.Stringer) ([]dax.WorkerDiff, error)

func (*NopBalancer) Balance

func (b *NopBalancer) Balance(ctx context.Context) ([]dax.WorkerDiff, error)

func (*NopBalancer) CurrentState

func (b *NopBalancer) CurrentState(ctx context.Context) ([]dax.WorkerInfo, error)

func (*NopBalancer) RemoveJob

func (b *NopBalancer) RemoveJob(ctx context.Context, job fmt.Stringer) ([]dax.WorkerDiff, error)

func (*NopBalancer) RemoveJobs added in v3.27.0

func (b *NopBalancer) RemoveJobs(ctx context.Context, prefix string) ([]dax.WorkerDiff, error)

func (*NopBalancer) RemoveWorker

func (b *NopBalancer) RemoveWorker(ctx context.Context, worker fmt.Stringer) ([]dax.WorkerDiff, error)

func (*NopBalancer) WorkerState

func (b *NopBalancer) WorkerState(ctx context.Context, worker dax.Worker) (dax.WorkerInfo, error)

func (*NopBalancer) WorkersForJobPrefix

func (b *NopBalancer) WorkersForJobPrefix(ctx context.Context, prefix string) ([]dax.WorkerInfo, error)

func (*NopBalancer) WorkersForJobs

func (b *NopBalancer) WorkersForJobs(ctx context.Context, jobs []dax.Job) ([]dax.WorkerInfo, error)

type NopDirector

type NopDirector struct{}

NopDirector is a no-op implementation of the Director interface.

func NewNopDirector

func NewNopDirector() *NopDirector

func (*NopDirector) SendDirective

func (d *NopDirector) SendDirective(ctx context.Context, dir *dax.Directive) error

func (*NopDirector) SendSnapshotFieldKeysRequest

func (d *NopDirector) SendSnapshotFieldKeysRequest(ctx context.Context, req *dax.SnapshotFieldKeysRequest) error

func (*NopDirector) SendSnapshotShardDataRequest

func (d *NopDirector) SendSnapshotShardDataRequest(ctx context.Context, req *dax.SnapshotShardDataRequest) error

func (*NopDirector) SendSnapshotTableKeysRequest

func (d *NopDirector) SendSnapshotTableKeysRequest(ctx context.Context, req *dax.SnapshotTableKeysRequest) error

type StringSet

type StringSet map[string]struct{}

StringSet is a set of strings.

func NewStringSet

func NewStringSet() StringSet

func (StringSet) Add

func (s StringSet) Add(p string)

func (StringSet) Contains

func (s StringSet) Contains(p string) bool

func (StringSet) Minus

func (s StringSet) Minus(m StringSet) []string

func (StringSet) Remove

func (s StringSet) Remove(p string)

func (StringSet) SortedSlice

func (s StringSet) SortedSlice() []string

type TableSet

type TableSet map[dax.TableKey]struct{}

TableSet is a set of strings.

func NewTableSet

func NewTableSet() TableSet

func (TableSet) Add

func (s TableSet) Add(t dax.TableKey)

func (TableSet) Contains

func (s TableSet) Contains(t dax.TableKey) bool

func (TableSet) Minus

func (s TableSet) Minus(m TableSet) dax.TableKeys

func (TableSet) QualifiedSortedSlice

func (s TableSet) QualifiedSortedSlice() map[dax.TableQualifier]dax.TableIDs

func (TableSet) Remove

func (s TableSet) Remove(t dax.TableKey)

func (TableSet) SortedSlice

func (s TableSet) SortedSlice() dax.TableKeys

Directories

Path Synopsis
Package http provides the http implementation of the Director interface.
Package http provides the http implementation of the Director interface.
Package naive contains a naive implementation of the Balancer interface.
Package naive contains a naive implementation of the Balancer interface.
boltdb
Package boltdb contains the boltdb implementation of the Balancer interface.
Package boltdb contains the boltdb implementation of the Balancer interface.
Package partitioner provides the Partitioner type, which provides helper methods for determining partitions based on string keys.
Package partitioner provides the Partitioner type, which provides helper methods for determining partitions based on string keys.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL