Documentation
¶
Overview ¶
Package server provides a set of struct definitions for the resource group that can be imported by other packages.
Index ¶
- Variables
- func CreateServerWrapper(cmd *cobra.Command, args []string)
- func NewService[T ConfigProvider](svr bs.Server) registry.RegistrableService
- type Config
- func (c *Config) Adjust(meta *toml.MetaData) error
- func (c *Config) GeBackendEndpoints() string
- func (c *Config) GetAdvertiseListenAddr() string
- func (c *Config) GetListenAddr() string
- func (c *Config) GetName() string
- func (c *Config) GetTLSConfig() *grpcutil.TLSConfig
- func (c *Config) Parse(flagSet *pflag.FlagSet) error
- func (c *Config) Validate() error
- type ConfigProvider
- type ControllerConfig
- type GroupStates
- type GroupTokenBucket
- type GroupTokenBucketState
- type Manager
- func (m *Manager) AddResourceGroup(grouppb *rmpb.ResourceGroup) error
- func (m *Manager) DeleteResourceGroup(name string) error
- func (m *Manager) GetBasicServer() bs.Server
- func (m *Manager) GetControllerConfig() *ControllerConfig
- func (m *Manager) GetMutableResourceGroup(name string) *ResourceGroup
- func (m *Manager) GetResourceGroup(name string, withStats bool) *ResourceGroup
- func (m *Manager) GetResourceGroupList(withStats bool) []*ResourceGroup
- func (m *Manager) Init(ctx context.Context) error
- func (m *Manager) ModifyResourceGroup(group *rmpb.ResourceGroup) error
- func (m *Manager) UpdateControllerConfigItem(key string, value any) error
- type RequestUnitConfig
- type RequestUnitSettings
- type ResourceGroup
- func (rg *ResourceGroup) Clone(withStats bool) *ResourceGroup
- func (rg *ResourceGroup) GetGroupStates() *GroupStates
- func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup
- func (rg *ResourceGroup) PatchSettings(metaGroup *rmpb.ResourceGroup) error
- func (rg *ResourceGroup) RequestRU(now time.Time, requiredToken float64, targetPeriodMs, clientUniqueID uint64) *rmpb.GrantedRUTokenBucket
- func (rg *ResourceGroup) SetStatesIntoResourceGroup(states *GroupStates)
- func (rg *ResourceGroup) String() string
- func (rg *ResourceGroup) UpdateRUConsumption(c *rmpb.Consumption)
- type Server
- func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error)
- func (s *Server) Close()
- func (s *Server) GetAddr() string
- func (s *Server) GetAdvertiseListenAddr() string
- func (s *Server) GetBackendEndpoints() string
- func (s *Server) GetControllerConfig() *ControllerConfig
- func (s *Server) GetLeaderListenUrls() []string
- func (s *Server) GetTLSConfig() *grpcutil.TLSConfig
- func (s *Server) IsClosed() bool
- func (s *Server) IsServing() bool
- func (s *Server) Name() string
- func (s *Server) RegisterGRPCService(grpcServer *grpc.Server)
- func (s *Server) Run() (err error)
- func (s *Server) ServerLoopWgAdd(n int)
- func (s *Server) ServerLoopWgDone()
- func (s *Server) SetLogLevel(level string) error
- func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup)
- type Service
- func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBucketsServer) error
- func (s *Service) AddResourceGroup(_ context.Context, req *rmpb.PutResourceGroupRequest) (*rmpb.PutResourceGroupResponse, error)
- func (s *Service) DeleteResourceGroup(_ context.Context, req *rmpb.DeleteResourceGroupRequest) (*rmpb.DeleteResourceGroupResponse, error)
- func (s *Service) GetManager() *Manager
- func (s *Service) GetResourceGroup(_ context.Context, req *rmpb.GetResourceGroupRequest) (*rmpb.GetResourceGroupResponse, error)
- func (s *Service) ListResourceGroups(_ context.Context, req *rmpb.ListResourceGroupsRequest) (*rmpb.ListResourceGroupsResponse, error)
- func (s *Service) ModifyResourceGroup(_ context.Context, req *rmpb.PutResourceGroupRequest) (*rmpb.PutResourceGroupResponse, error)
- func (s *Service) RegisterGRPCService(g *grpc.Server)
- func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) error
- type TokenSlot
Constants ¶
This section is empty.
Variables ¶
var SetUpRestHandler = func(*Service) (http.Handler, apiutil.APIServiceGroup) { return dummyRestService{}, apiutil.APIServiceGroup{} }
SetUpRestHandler is a hook that sets up the REST service.
Functions ¶
func CreateServerWrapper ¶
CreateServerWrapper encapsulates the configuration/log/metrics initialization and creates the server.
func NewService ¶
func NewService[T ConfigProvider](svr bs.Server) registry.RegistrableService
NewService creates a new resource manager service.
Types ¶
type Config ¶
type Config struct {
BackendEndpoints string `toml:"backend-endpoints" json:"backend-endpoints"`
ListenAddr string `toml:"listen-addr" json:"listen-addr"`
AdvertiseListenAddr string `toml:"advertise-listen-addr" json:"advertise-listen-addr"`
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"` // TODO: remove this after refactoring
EnableGRPCGateway bool `json:"enable-grpc-gateway"` // TODO: use it
Metric metricutil.MetricConfig `toml:"metric" json:"metric"`
// Log related config.
Log log.Config `toml:"log" json:"log"`
Logger *zap.Logger
LogProps *log.ZapProperties
Security configutil.SecurityConfig `toml:"security" json:"security"`
// WarningMsgs contains all warnings during parsing.
WarningMsgs []string
// LeaderLease defines the time within which a Resource Manager primary/leader must
// update its TTL in etcd, otherwise etcd will expire the leader key and other servers
// can campaign the primary/leader again. Etcd only supports TTLs in seconds, so this
// value is in seconds too.
LeaderLease int64 `toml:"lease" json:"lease"`
Controller ControllerConfig `toml:"controller" json:"controller"`
}
Config is the configuration for the resource manager.
func GenerateConfig ¶
GenerateConfig generates a new config with the given options.
func (*Config) GeBackendEndpoints ¶
GeBackendEndpoints returns the BackendEndpoints
func (*Config) GetAdvertiseListenAddr ¶
GetAdvertiseListenAddr returns the AdvertiseListenAddr
func (*Config) GetListenAddr ¶
GetListenAddr returns the ListenAddr
func (*Config) GetTLSConfig ¶
GetTLSConfig returns the TLS config.
type ConfigProvider ¶
type ConfigProvider interface {
GetControllerConfig() *ControllerConfig
}
ConfigProvider is used to get the resource manager config from the given `bs.Server` without modifying its interface.
type ControllerConfig ¶
type ControllerConfig struct {
// DegradedModeWaitDuration is the wait duration that controls when the resource control client enables degraded mode while the server is disconnected.
DegradedModeWaitDuration typeutil.Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`
// LTBMaxWaitDuration is the max wait time duration for local token bucket.
LTBMaxWaitDuration typeutil.Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`
// LTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC.
LTBTokenRPCMaxDelay typeutil.Duration `toml:"ltb-token-rpc-max-delay" json:"ltb-token-rpc-max-delay"`
// RequestUnit is the configuration that determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
// EnableControllerTraceLog controls whether the resource control client enables trace logging.
EnableControllerTraceLog bool `toml:"enable-controller-trace-log" json:"enable-controller-trace-log,string"`
}
ControllerConfig is the configuration of the resource manager controller, which includes some options needed by the client.
func (*ControllerConfig) Adjust ¶
func (rmc *ControllerConfig) Adjust(meta *configutil.ConfigMetaData)
Adjust adjusts the configuration and initializes it with the default value if necessary.
type GroupStates ¶
type GroupStates struct {
// RU tokens
RU *GroupTokenBucketState `json:"r_u,omitempty"`
// RU consumption
RUConsumption *rmpb.Consumption `json:"ru_consumption,omitempty"`
// raw resource tokens
CPU *GroupTokenBucketState `json:"cpu,omitempty"`
IORead *GroupTokenBucketState `json:"io_read,omitempty"`
IOWrite *GroupTokenBucketState `json:"io_write,omitempty"`
}
GroupStates is the tokens set of a resource group.
type GroupTokenBucket ¶
type GroupTokenBucket struct {
// Settings is the setting of TokenBucket.
// BurstLimit is used as below:
// - If b == 0, that means the limiter is unlimited capacity. default use in resource controller (burst with a rate within an unlimited capacity).
// - If b < 0, that means the limiter is unlimited capacity and fillrate(r) is ignored, can be seen as r == Inf (burst within an unlimited capacity).
// - If b > 0, that means the limiter is limited capacity.
// MaxTokens limits the number of tokens that can be accumulated
Settings *rmpb.TokenLimitSettings `json:"settings,omitempty"`
GroupTokenBucketState `json:"state,omitempty"`
}
GroupTokenBucket is a token bucket for a resource group. Currently, consumption is not saved in `GroupTokenBucket`; it is only recorded as statistics in Prometheus.
func NewGroupTokenBucket ¶
func NewGroupTokenBucket(resourceGroupName string, tokenBucket *rmpb.TokenBucket) *GroupTokenBucket
NewGroupTokenBucket returns a new GroupTokenBucket
func (*GroupTokenBucket) Clone ¶
func (gtb *GroupTokenBucket) Clone() *GroupTokenBucket
Clone returns the deep copy of GroupTokenBucket
func (*GroupTokenBucket) GetTokenBucket ¶
func (gtb *GroupTokenBucket) GetTokenBucket() *rmpb.TokenBucket
GetTokenBucket returns the gRPC protobuf struct of GroupTokenBucket.
type GroupTokenBucketState ¶
type GroupTokenBucketState struct {
Tokens float64 `json:"tokens,omitempty"`
LastUpdate *time.Time `json:"last_update,omitempty"`
Initialized bool `json:"initialized"`
// contains filtered or unexported fields
}
GroupTokenBucketState is the running state of TokenBucket.
func (*GroupTokenBucketState) Clone ¶
func (gts *GroupTokenBucketState) Clone() *GroupTokenBucketState
Clone returns the copy of GroupTokenBucketState
type Manager ¶
Manager is the manager of resource group.
func NewManager ¶
func NewManager[T ConfigProvider](srv bs.Server) *Manager
NewManager returns a new manager based on the given server, which should implement the `ConfigProvider` interface.
func (*Manager) AddResourceGroup ¶
func (m *Manager) AddResourceGroup(grouppb *rmpb.ResourceGroup) error
AddResourceGroup puts a resource group. NOTE: AddResourceGroup should also be idempotent because tidb depends on this retry mechanism.
func (*Manager) DeleteResourceGroup ¶
DeleteResourceGroup deletes a resource group.
func (*Manager) GetBasicServer ¶
GetBasicServer returns the basic server.
func (*Manager) GetControllerConfig ¶
func (m *Manager) GetControllerConfig() *ControllerConfig
GetControllerConfig returns the controller config.
func (*Manager) GetMutableResourceGroup ¶
func (m *Manager) GetMutableResourceGroup(name string) *ResourceGroup
GetMutableResourceGroup returns a mutable resource group.
func (*Manager) GetResourceGroup ¶
func (m *Manager) GetResourceGroup(name string, withStats bool) *ResourceGroup
GetResourceGroup returns a copy of a resource group.
func (*Manager) GetResourceGroupList ¶
func (m *Manager) GetResourceGroupList(withStats bool) []*ResourceGroup
GetResourceGroupList returns copies of resource group list.
func (*Manager) ModifyResourceGroup ¶
func (m *Manager) ModifyResourceGroup(group *rmpb.ResourceGroup) error
ModifyResourceGroup modifies an existing resource group.
type RequestUnitConfig ¶
type RequestUnitConfig struct {
// ReadBaseCost is the base cost for a read request. No matter how many bytes read/written or
// the CPU times taken for a request, this cost is inevitable.
ReadBaseCost float64 `toml:"read-base-cost" json:"read-base-cost"`
// ReadPerBatchBaseCost is the base cost for a read request with batch.
ReadPerBatchBaseCost float64 `toml:"read-per-batch-base-cost" json:"read-per-batch-base-cost"`
// ReadCostPerByte is the cost for each byte read. It's 1 RU = 64 KiB by default.
ReadCostPerByte float64 `toml:"read-cost-per-byte" json:"read-cost-per-byte"`
// WriteBaseCost is the base cost for a write request. No matter how many bytes read/written or
// the CPU times taken for a request, this cost is inevitable.
WriteBaseCost float64 `toml:"write-base-cost" json:"write-base-cost"`
// WritePerBatchBaseCost is the base cost for a write request with batch.
WritePerBatchBaseCost float64 `toml:"write-per-batch-base-cost" json:"write-per-batch-base-cost"`
// WriteCostPerByte is the cost for each byte written. It's 1 RU = 1 KiB by default.
WriteCostPerByte float64 `toml:"write-cost-per-byte" json:"write-cost-per-byte"`
// CPUMsCost is the cost for each millisecond of CPU time taken.
// It's 1 RU = 3 milliseconds by default.
CPUMsCost float64 `toml:"read-cpu-ms-cost" json:"read-cpu-ms-cost"`
}
RequestUnitConfig is the configuration of the request units, which determines the coefficients of the RRU and WRU cost. This configuration should be modified carefully.
func (*RequestUnitConfig) Adjust ¶
func (ruc *RequestUnitConfig) Adjust(meta *configutil.ConfigMetaData)
Adjust adjusts the configuration and initializes it with the default value if necessary.
type RequestUnitSettings ¶
type RequestUnitSettings struct {
RU *GroupTokenBucket `json:"r_u,omitempty"`
}
RequestUnitSettings is the definition of the RU settings.
func NewRequestUnitSettings ¶
func NewRequestUnitSettings(resourceGroupName string, tokenBucket *rmpb.TokenBucket) *RequestUnitSettings
NewRequestUnitSettings creates a new RequestUnitSettings with the given token bucket.
func (*RequestUnitSettings) Clone ¶
func (rus *RequestUnitSettings) Clone() *RequestUnitSettings
Clone returns a deep copy of the RequestUnitSettings.
type ResourceGroup ¶
type ResourceGroup struct {
syncutil.RWMutex
Name string `json:"name"`
Mode rmpb.GroupMode `json:"mode"`
// RU settings
RUSettings *RequestUnitSettings `json:"r_u_settings,omitempty"`
Priority uint32 `json:"priority"`
Runaway *rmpb.RunawaySettings `json:"runaway_settings,omitempty"`
Background *rmpb.BackgroundSettings `json:"background_settings,omitempty"`
// total ru consumption
RUConsumption *rmpb.Consumption `json:"ru_consumption,omitempty"`
}
ResourceGroup is the definition of a resource group, for REST API.
func FromProtoResourceGroup ¶
func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup
FromProtoResourceGroup converts a rmpb.ResourceGroup to a ResourceGroup.
func (*ResourceGroup) Clone ¶
func (rg *ResourceGroup) Clone(withStats bool) *ResourceGroup
Clone copies the resource group.
func (*ResourceGroup) GetGroupStates ¶
func (rg *ResourceGroup) GetGroupStates() *GroupStates
GetGroupStates gets the token set of ResourceGroup.
func (*ResourceGroup) IntoProtoResourceGroup ¶
func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup
IntoProtoResourceGroup converts a ResourceGroup to a rmpb.ResourceGroup.
func (*ResourceGroup) PatchSettings ¶
func (rg *ResourceGroup) PatchSettings(metaGroup *rmpb.ResourceGroup) error
PatchSettings patches the resource group settings. Only used to patch the resource group when updating. Note: the tokens value is the delta to patch.
func (*ResourceGroup) RequestRU ¶
func (rg *ResourceGroup) RequestRU( now time.Time, requiredToken float64, targetPeriodMs, clientUniqueID uint64, ) *rmpb.GrantedRUTokenBucket
RequestRU requests the RU of the resource group.
func (*ResourceGroup) SetStatesIntoResourceGroup ¶
func (rg *ResourceGroup) SetStatesIntoResourceGroup(states *GroupStates)
SetStatesIntoResourceGroup updates the state of resource group.
func (*ResourceGroup) String ¶
func (rg *ResourceGroup) String() string
func (*ResourceGroup) UpdateRUConsumption ¶
func (rg *ResourceGroup) UpdateRUConsumption(c *rmpb.Consumption)
UpdateRUConsumption adds delta consumption data to the group's RU statistics.
type Server ¶
type Server struct {
*server.BaseServer
diagnosticspb.DiagnosticsServer
// contains filtered or unexported fields
}
Server is the resource manager server, and it implements bs.Server.
func CreateServer ¶
CreateServer creates the Server
func NewTestServer ¶
func NewTestServer(ctx context.Context, re *require.Assertions, cfg *Config) (*Server, testutil.CleanupFunc, error)
NewTestServer creates a resource manager server for testing.
func (*Server) AddServiceReadyCallback ¶
AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
func (*Server) GetAdvertiseListenAddr ¶
GetAdvertiseListenAddr returns the advertise address of the server.
func (*Server) GetBackendEndpoints ¶
GetBackendEndpoints returns the backend endpoints.
func (*Server) GetControllerConfig ¶
func (s *Server) GetControllerConfig() *ControllerConfig
GetControllerConfig returns the controller config.
func (*Server) GetLeaderListenUrls ¶
GetLeaderListenUrls gets service endpoints from the leader in election group.
func (*Server) GetTLSConfig ¶
GetTLSConfig gets the security config.
func (*Server) IsServing ¶
IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise.
func (*Server) RegisterGRPCService ¶
RegisterGRPCService registers the grpc service.
func (*Server) ServerLoopWgAdd ¶
ServerLoopWgAdd increases the server loop wait group.
func (*Server) ServerLoopWgDone ¶
func (s *Server) ServerLoopWgDone()
ServerLoopWgDone decreases the server loop wait group.
func (*Server) SetLogLevel ¶
SetLogLevel sets log level.
func (*Server) SetUpRestHandler ¶
func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup)
SetUpRestHandler sets up the REST handler.
type Service ¶
type Service struct {
*Server
// contains filtered or unexported fields
}
Service is the gRPC service for resource manager.
func (*Service) AcquireTokenBuckets ¶
func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBucketsServer) error
AcquireTokenBuckets implements ResourceManagerServer.AcquireTokenBuckets.
func (*Service) AddResourceGroup ¶
func (s *Service) AddResourceGroup(_ context.Context, req *rmpb.PutResourceGroupRequest) (*rmpb.PutResourceGroupResponse, error)
AddResourceGroup implements ResourceManagerServer.AddResourceGroup.
func (*Service) DeleteResourceGroup ¶
func (s *Service) DeleteResourceGroup(_ context.Context, req *rmpb.DeleteResourceGroupRequest) (*rmpb.DeleteResourceGroupResponse, error)
DeleteResourceGroup implements ResourceManagerServer.DeleteResourceGroup.
func (*Service) GetManager ¶
GetManager returns the resource manager.
func (*Service) GetResourceGroup ¶
func (s *Service) GetResourceGroup(_ context.Context, req *rmpb.GetResourceGroupRequest) (*rmpb.GetResourceGroupResponse, error)
GetResourceGroup implements ResourceManagerServer.GetResourceGroup.
func (*Service) ListResourceGroups ¶
func (s *Service) ListResourceGroups(_ context.Context, req *rmpb.ListResourceGroupsRequest) (*rmpb.ListResourceGroupsResponse, error)
ListResourceGroups implements ResourceManagerServer.ListResourceGroups.
func (*Service) ModifyResourceGroup ¶
func (s *Service) ModifyResourceGroup(_ context.Context, req *rmpb.PutResourceGroupRequest) (*rmpb.PutResourceGroupResponse, error)
ModifyResourceGroup implements ResourceManagerServer.ModifyResourceGroup.
func (*Service) RegisterGRPCService ¶
RegisterGRPCService registers the service to gRPC server.