Documentation
¶
Overview ¶
Package controller provides the core Controller struct.
Index ¶
- Constants
- func NewErrCustom() error
- func NewErrDirectiveSendFailure(msg string) error
- func NewErrInternal(msg string) error
- func NewErrInvalidRequest(msg string) error
- func NewErrNoAvailableNode() error
- func NewErrNodeExists(addr dax.Address) error
- func NewErrNodeKeyInvalid(addr dax.Address) error
- func NewErrRoleTypeInvalid(roleType dax.RoleType) error
- func NewErrUnassignedJobs(jobs []dax.Job) error
- type AddressSet
- type Balancer
- type ComputeNode
- type Config
- type Controller
- func (c *Controller) AddAddresses(ctx context.Context, addrs ...dax.Address) error
- func (c *Controller) AddShards(ctx context.Context, qtid dax.QualifiedTableID, shards ...dax.VersionedShard) error
- func (c *Controller) CheckInNode(ctx context.Context, n *dax.Node) error
- func (c *Controller) ComputeNodes(ctx context.Context, qtid dax.QualifiedTableID, shards dax.ShardNums, ...) ([]ComputeNode, error)
- func (c *Controller) CreateField(ctx context.Context, qtid dax.QualifiedTableID, fld *dax.Field) error
- func (c *Controller) CreateTable(ctx context.Context, qtbl *dax.QualifiedTable) error
- func (c *Controller) DebugNodes(ctx context.Context) ([]*dax.Node, error)
- func (c *Controller) DeregisterNodes(ctx context.Context, addresses ...dax.Address) error
- func (c *Controller) DropField(ctx context.Context, qtid dax.QualifiedTableID, fldName dax.FieldName) error
- func (c *Controller) DropTable(ctx context.Context, qtid dax.QualifiedTableID) error
- func (c *Controller) IngestPartition(ctx context.Context, qtid dax.QualifiedTableID, partition dax.PartitionNum) (dax.Address, error)
- func (c *Controller) InitializePoller(ctx context.Context) error
- func (c *Controller) Nodes(ctx context.Context, role dax.Role, createMissing bool) ([]dax.AssignedNode, error)
- func (c *Controller) RegisterNode(ctx context.Context, n *dax.Node) error
- func (c *Controller) RegisterNodes(ctx context.Context, nodes ...*dax.Node) error
- func (c *Controller) RemoveAddresses(ctx context.Context, addrs ...dax.Address) error
- func (c *Controller) RemoveShards(ctx context.Context, qtid dax.QualifiedTableID, shards ...dax.VersionedShard) error
- func (c *Controller) Run() error
- func (c *Controller) SetPoller(poller dax.AddressManager)
- func (c *Controller) SnapshotFieldKeys(ctx context.Context, qtid dax.QualifiedTableID, field dax.FieldName) error
- func (c *Controller) SnapshotShardData(ctx context.Context, qtid dax.QualifiedTableID, shardNum dax.ShardNum) error
- func (c *Controller) SnapshotTable(ctx context.Context, qtid dax.QualifiedTableID) error
- func (c *Controller) SnapshotTableKeys(ctx context.Context, qtid dax.QualifiedTableID, partitionNum dax.PartitionNum) error
- func (c *Controller) Stop()
- func (c *Controller) Table(ctx context.Context, qtid dax.QualifiedTableID) (*dax.QualifiedTable, error)
- func (c *Controller) Tables(ctx context.Context, qual dax.TableQualifier, ids ...dax.TableID) ([]*dax.QualifiedTable, error)
- func (c *Controller) TranslateNodes(ctx context.Context, qtid dax.QualifiedTableID, partitions dax.PartitionNums, ...) ([]TranslateNode, error)
- type Director
- type NewBalancerFn
- type NopBalancer
- func (b *NopBalancer) AddJobs(ctx context.Context, job ...fmt.Stringer) ([]dax.WorkerDiff, error)
- func (b *NopBalancer) AddWorker(ctx context.Context, worker fmt.Stringer) ([]dax.WorkerDiff, error)
- func (b *NopBalancer) Balance(ctx context.Context) ([]dax.WorkerDiff, error)
- func (b *NopBalancer) CurrentState(ctx context.Context) ([]dax.WorkerInfo, error)
- func (b *NopBalancer) RemoveJob(ctx context.Context, job fmt.Stringer) ([]dax.WorkerDiff, error)
- func (b *NopBalancer) RemoveWorker(ctx context.Context, worker fmt.Stringer) ([]dax.WorkerDiff, error)
- func (b *NopBalancer) WorkerState(ctx context.Context, worker dax.Worker) (dax.WorkerInfo, error)
- func (b *NopBalancer) WorkersForJobPrefix(ctx context.Context, prefix string) ([]dax.WorkerInfo, error)
- func (b *NopBalancer) WorkersForJobs(ctx context.Context, jobs []dax.Job) ([]dax.WorkerInfo, error)
- type NopDirector
- func (d *NopDirector) SendDirective(ctx context.Context, dir *dax.Directive) error
- func (d *NopDirector) SendSnapshotFieldKeysRequest(ctx context.Context, req *dax.SnapshotFieldKeysRequest) error
- func (d *NopDirector) SendSnapshotShardDataRequest(ctx context.Context, req *dax.SnapshotShardDataRequest) error
- func (d *NopDirector) SendSnapshotTableKeysRequest(ctx context.Context, req *dax.SnapshotTableKeysRequest) error
- type StringSet
- type TableSet
- type TranslateNode
Constants ¶
const (
// ErrCodeCustom can be used to return a custom error message.
ErrCodeCustom errors.Code = "CustomError"
// ErrCodeInternal can be used when the cause of an error can't be
// determined. It can be accompanied by a single string message.
ErrCodeInternal errors.Code = "InternalError"
// ErrCodeTODO can be used as a placeholder until a proper error code is
// created and assigned.
ErrCodeTODO errors.Code = "TODOError"
ErrCodeNodeExists errors.Code = "NodeExists"
ErrCodeNodeKeyInvalid errors.Code = "NodeKeyInvalid"
ErrCodeNoAvailableNode errors.Code = "NoAvailableNode"
ErrCodeRoleTypeInvalid errors.Code = "RoleTypeInvalid"
ErrCodeDirectiveSendFailure errors.Code = "DirectiveSendFailure"
ErrCodeInvalidRequest errors.Code = "InvalidRequest"
ErrCodeUnassignedJobs errors.Code = "UnassignedJobs"
UndefinedErrorMessage string = "undefined message format"
)
Variables ¶
This section is empty.
Functions ¶
func NewErrCustom ¶
func NewErrCustom() error
NewErrCustom can be used to return a custom error message.
func NewErrInternal ¶
NewErrInternal can be used when the cause of an error can't be determined. It can be accompanied by a single string message.
func NewErrInvalidRequest ¶
func NewErrNoAvailableNode ¶
func NewErrNoAvailableNode() error
func NewErrNodeExists ¶
func NewErrNodeKeyInvalid ¶
func NewErrRoleTypeInvalid ¶
func NewErrUnassignedJobs ¶
Types ¶
type AddressSet ¶
AddressSet is a set of addresses (dax.Address values).
func NewAddressSet ¶
func NewAddressSet() AddressSet
func (AddressSet) Add ¶
func (s AddressSet) Add(p dax.Address)
func (AddressSet) Minus ¶
func (s AddressSet) Minus(m AddressSet) []dax.Address
func (AddressSet) Remove ¶
func (s AddressSet) Remove(p dax.Address)
func (AddressSet) SortedSlice ¶
func (s AddressSet) SortedSlice() []dax.Address
type Balancer ¶
type Balancer interface {
AddWorker(ctx context.Context, worker fmt.Stringer) ([]dax.WorkerDiff, error)
RemoveWorker(ctx context.Context, worker fmt.Stringer) ([]dax.WorkerDiff, error)
AddJobs(ctx context.Context, job ...fmt.Stringer) ([]dax.WorkerDiff, error)
RemoveJob(ctx context.Context, job fmt.Stringer) ([]dax.WorkerDiff, error)
Balance(ctx context.Context) ([]dax.WorkerDiff, error)
CurrentState(ctx context.Context) ([]dax.WorkerInfo, error)
WorkerState(ctx context.Context, worker dax.Worker) (dax.WorkerInfo, error)
WorkersForJobs(ctx context.Context, jobs []dax.Job) ([]dax.WorkerInfo, error)
// WorkersForJobPrefix returns all workers and their job
// assignments which start with `prefix` for all jobs that start
// with `prefix`. If there are free jobs that start with `prefix`
// an error is returned.
//
// The motivating use case is getting all workers for a particular
// table so we can execute a query that will hit every shard in a
// table. If there are jobs representing shards in that table
// which are not assigned to any worker, that means the query
// would return incomplete data, so we want to error.
WorkersForJobPrefix(ctx context.Context, prefix string) ([]dax.WorkerInfo, error)
}
type ComputeNode ¶
type ComputeNode struct {
Address dax.Address `json:"address"`
Table dax.TableKey `json:"table"`
Shards dax.ShardNums `json:"shards"`
}
ComputeNode represents a compute node and the table/shards for which it is responsible.
type Config ¶
type Config struct {
Director Director
Schemar schemar.Schemar
ComputeBalancer Balancer
TranslateBalancer Balancer
StorageMethod string
BoltDB *boltdb.DB
// RegistrationBatchTimeout is the time that the controller will
// wait after a node registers itself to see if any more nodes
// will register before sending out directives to all nodes which
// have been registered.
RegistrationBatchTimeout time.Duration
Logger logger.Logger
}
type Controller ¶
type Controller struct {
// Schemar used by the controller to get table information. The controller
// should NOT call Schemar methods which modify data. Schema mutations are
// made outside of the controller (at this point that happens in MDS).
Schemar schemar.Schemar
ComputeBalancer Balancer
TranslateBalancer Balancer
// Director is used to send directives to compute workers.
Director Director
// contains filtered or unexported fields
}
func New ¶
func New(cfg Config) *Controller
New returns a new instance of Controller with default values.
func (*Controller) AddAddresses ¶
func (*Controller) AddShards ¶
func (c *Controller) AddShards(ctx context.Context, qtid dax.QualifiedTableID, shards ...dax.VersionedShard) error
AddShards registers the table/shard combinations with the controller and sends the necessary directive.
func (*Controller) CheckInNode ¶
CheckInNode handles a "check-in" from a compute node. These come periodically, and if the controller already knows about the compute node, it can simply no-op. If, however, the controller is not aware of the node checking in, then that probably means that the poller has removed that node from its list (perhaps due to a network fault) and therefore the node needs to be re-registered.
func (*Controller) ComputeNodes ¶
func (c *Controller) ComputeNodes(ctx context.Context, qtid dax.QualifiedTableID, shards dax.ShardNums, isWrite bool) ([]ComputeNode, error)
func (*Controller) CreateField ¶
func (c *Controller) CreateField(ctx context.Context, qtid dax.QualifiedTableID, fld *dax.Field) error
func (*Controller) CreateTable ¶
func (c *Controller) CreateTable(ctx context.Context, qtbl *dax.QualifiedTable) error
CreateTable adds a table to the versionStore and schemar, and then sends directives to all affected nodes based on the change.
func (*Controller) DebugNodes ¶
func (*Controller) DeregisterNodes ¶
DeregisterNodes removes nodes from the controller's list of registered nodes. It sends directives to the removed nodes, but ignores errors.
func (*Controller) DropField ¶
func (c *Controller) DropField(ctx context.Context, qtid dax.QualifiedTableID, fldName dax.FieldName) error
func (*Controller) DropTable ¶
func (c *Controller) DropTable(ctx context.Context, qtid dax.QualifiedTableID) error
DropTable removes a table from the schema and sends directives to all affected nodes based on the change.
func (*Controller) IngestPartition ¶
func (c *Controller) IngestPartition(ctx context.Context, qtid dax.QualifiedTableID, partition dax.PartitionNum) (dax.Address, error)
func (*Controller) InitializePoller ¶
func (c *Controller) InitializePoller(ctx context.Context) error
InitializePoller sends the list of known nodes (to be polled) to the poller. This is useful in the case where MDS has restarted (or has been replaced) and its poller is empty (i.e. it doesn't know about any nodes).
func (*Controller) Nodes ¶
func (c *Controller) Nodes(ctx context.Context, role dax.Role, createMissing bool) ([]dax.AssignedNode, error)
Nodes returns the list of assigned nodes responsible for the jobs included in the given role. If createMissing is true, the Controller will create new jobs for any of which it isn't currently aware.
func (*Controller) RegisterNode ¶
RegisterNode adds a node to the controller's list of registered nodes. It makes no guarantees about when the node will actually be used for anything or assigned any jobs.
func (*Controller) RegisterNodes ¶
RegisterNodes adds nodes to the controller's list of registered nodes.
func (*Controller) RemoveAddresses ¶
func (*Controller) RemoveShards ¶
func (c *Controller) RemoveShards(ctx context.Context, qtid dax.QualifiedTableID, shards ...dax.VersionedShard) error
RemoveShards deregisters the table/shard combinations with the controller and sends the necessary directives.
func (*Controller) Run ¶
func (c *Controller) Run() error
Run starts the node registration goroutine.
func (*Controller) SetPoller ¶
func (c *Controller) SetPoller(poller dax.AddressManager)
func (*Controller) SnapshotFieldKeys ¶
func (c *Controller) SnapshotFieldKeys(ctx context.Context, qtid dax.QualifiedTableID, field dax.FieldName) error
SnapshotFieldKeys forces the translate node responsible for the given field to snapshot the keys for that field, then increment its version for logs written to the WriteLogger.
func (*Controller) SnapshotShardData ¶
func (c *Controller) SnapshotShardData(ctx context.Context, qtid dax.QualifiedTableID, shardNum dax.ShardNum) error
SnapshotShardData forces the compute node responsible for the given shard to snapshot that shard, then increment its shard version for logs written to the WriteLogger.
func (*Controller) SnapshotTable ¶
func (c *Controller) SnapshotTable(ctx context.Context, qtid dax.QualifiedTableID) error
SnapshotTable snapshots a table.
func (*Controller) SnapshotTableKeys ¶
func (c *Controller) SnapshotTableKeys(ctx context.Context, qtid dax.QualifiedTableID, partitionNum dax.PartitionNum) error
SnapshotTableKeys forces the translate node responsible for the given partition to snapshot the table keys for that partition, then increment its version for logs written to the WriteLogger.
func (*Controller) Table ¶
func (c *Controller) Table(ctx context.Context, qtid dax.QualifiedTableID) (*dax.QualifiedTable, error)
Table returns a table by qualified table id.
func (*Controller) Tables ¶
func (c *Controller) Tables(ctx context.Context, qual dax.TableQualifier, ids ...dax.TableID) ([]*dax.QualifiedTable, error)
Tables returns a list of tables by name.
func (*Controller) TranslateNodes ¶
func (c *Controller) TranslateNodes(ctx context.Context, qtid dax.QualifiedTableID, partitions dax.PartitionNums, isWrite bool) ([]TranslateNode, error)
type Director ¶
type Director interface {
SendDirective(ctx context.Context, dir *dax.Directive) error
SendSnapshotShardDataRequest(ctx context.Context, req *dax.SnapshotShardDataRequest) error
SendSnapshotTableKeysRequest(ctx context.Context, req *dax.SnapshotTableKeysRequest) error
SendSnapshotFieldKeysRequest(ctx context.Context, req *dax.SnapshotFieldKeysRequest) error
}
type NopBalancer ¶
type NopBalancer struct{}
NopBalancer is a no-op implementation of the Balancer interface.
func NewNopBalancer ¶
func NewNopBalancer() *NopBalancer
func (*NopBalancer) AddJobs ¶
func (b *NopBalancer) AddJobs(ctx context.Context, job ...fmt.Stringer) ([]dax.WorkerDiff, error)
func (*NopBalancer) AddWorker ¶
func (b *NopBalancer) AddWorker(ctx context.Context, worker fmt.Stringer) ([]dax.WorkerDiff, error)
func (*NopBalancer) Balance ¶
func (b *NopBalancer) Balance(ctx context.Context) ([]dax.WorkerDiff, error)
func (*NopBalancer) CurrentState ¶
func (b *NopBalancer) CurrentState(ctx context.Context) ([]dax.WorkerInfo, error)
func (*NopBalancer) RemoveJob ¶
func (b *NopBalancer) RemoveJob(ctx context.Context, job fmt.Stringer) ([]dax.WorkerDiff, error)
func (*NopBalancer) RemoveWorker ¶
func (b *NopBalancer) RemoveWorker(ctx context.Context, worker fmt.Stringer) ([]dax.WorkerDiff, error)
func (*NopBalancer) WorkerState ¶
func (b *NopBalancer) WorkerState(ctx context.Context, worker dax.Worker) (dax.WorkerInfo, error)
func (*NopBalancer) WorkersForJobPrefix ¶
func (b *NopBalancer) WorkersForJobPrefix(ctx context.Context, prefix string) ([]dax.WorkerInfo, error)
func (*NopBalancer) WorkersForJobs ¶
func (b *NopBalancer) WorkersForJobs(ctx context.Context, jobs []dax.Job) ([]dax.WorkerInfo, error)
type NopDirector ¶
type NopDirector struct{}
NopDirector is a no-op implementation of the Director interface.
func NewNopDirector ¶
func NewNopDirector() *NopDirector
func (*NopDirector) SendDirective ¶
func (*NopDirector) SendSnapshotFieldKeysRequest ¶
func (d *NopDirector) SendSnapshotFieldKeysRequest(ctx context.Context, req *dax.SnapshotFieldKeysRequest) error
func (*NopDirector) SendSnapshotShardDataRequest ¶
func (d *NopDirector) SendSnapshotShardDataRequest(ctx context.Context, req *dax.SnapshotShardDataRequest) error
func (*NopDirector) SendSnapshotTableKeysRequest ¶
func (d *NopDirector) SendSnapshotTableKeysRequest(ctx context.Context, req *dax.SnapshotTableKeysRequest) error
type StringSet ¶
type StringSet map[string]struct{}
StringSet is a set of strings.
func NewStringSet ¶
func NewStringSet() StringSet
func (StringSet) SortedSlice ¶
type TableSet ¶
TableSet is a set of strings.
func NewTableSet ¶
func NewTableSet() TableSet
func (TableSet) QualifiedSortedSlice ¶
func (s TableSet) QualifiedSortedSlice() map[dax.TableQualifier]dax.TableIDs
func (TableSet) SortedSlice ¶
type TranslateNode ¶
type TranslateNode struct {
Address dax.Address `json:"address"`
Table dax.TableKey `json:"table"`
Partitions dax.PartitionNums `json:"partitions"`
}
TranslateNode represents a translate node and the table/partitions for which it is responsible.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package alpha contains inter-service implementations of interfaces.
|
Package alpha contains inter-service implementations of interfaces. |
|
Package http provides the http implementation of the Director interface.
|
Package http provides the http implementation of the Director interface. |
|
Package naive contains a naive implementation of the Balancer interface.
|
Package naive contains a naive implementation of the Balancer interface. |
|
boltdb
Package boltdb contains the boltdb implementation of the Balancer interface.
|
Package boltdb contains the boltdb implementation of the Balancer interface. |
|
Package partitioner provides the Partitioner type, which provides helper methods for determining partitions based on string keys.
|
Package partitioner provides the Partitioner type, which provides helper methods for determining partitions based on string keys. |