Documentation
¶
Index ¶
- Constants
- Variables
- func DataStoreFactoryLifetimeHooks(lc fx.Lifecycle, f persistence.DataStoreFactory)
- func DataStoreFactoryProvider(clusterName ClusterName, r resolver.ServiceResolver, cfg *config.Persistence, ...) persistence.DataStoreFactory
- func EventBlobCacheProvider(dc *dynamicconfig.Collection, logger log.Logger, ...) persistence.XDCCache
- func HealthSignalAggregatorProvider(dynamicCollection *dynamicconfig.Collection, metricsHandler metrics.Handler, ...) persistence.HealthSignalAggregator
- func IsNamespaceQueueTransientError(err error) bool
- func IsPersistenceTransientError(err error) bool
- func NewPriorityNamespaceRateLimiter(hostMaxQPS PersistenceMaxQps, namespaceMaxQPS PersistenceNamespaceMaxQps, ...) quotas.RequestRateLimiter
- func NewPriorityNamespaceShardRateLimiter(hostMaxQPS PersistenceMaxQps, ...) quotas.RequestRateLimiter
- func NewPriorityRateLimiter(hostMaxQPS PersistenceMaxQps, requestPriorityFn quotas.RequestPriorityFn, ...) quotas.RequestRateLimiter
- func RequestPriorityFn(req quotas.Request) int
- type AbstractDataStoreFactory
- type ClusterName
- type DynamicRateLimitingParams
- type EnableBestEffortDeleteTasksOnWorkflowUpdate
- type EnableDataLossMetrics
- type Factory
- type FactoryProviderFn
- type HealthRequestRateLimiterImpl
- type NewFactoryParams
- type OperatorRPSRatio
- type PersistenceBurstRatio
- type PersistenceMaxQps
- type PersistenceNamespaceMaxQps
- type PersistencePerShardNamespaceMaxQPS
Constants ¶
// Package-level defaults. Their consumers are not visible in this listing;
// the names suggest rate-limiter tuning (refresh cadence, burst ratio and
// initial rate multiplier) — NOTE(review): confirm against the implementation.
const (
	// DefaultRefreshInterval is a time.Duration of ten seconds.
	DefaultRefreshInterval = 10 * time.Second
	// DefaultRateBurstRatio is the untyped floating-point constant 1.0.
	DefaultRateBurstRatio = 1.0
	// DefaultInitialRateMultiplier is the untyped floating-point constant 1.0.
	DefaultInitialRateMultiplier = 1.0
)
Variables ¶
var ( CallerTypeDefaultPriority = map[string]int{ headers.CallerTypeOperator: 0, headers.CallerTypeAPI: 2, headers.CallerTypeBackgroundHigh: 4, headers.CallerTypeBackgroundLow: 5, headers.CallerTypePreemptable: 6, } APITypeCallOriginPriorityOverride = map[string]int{ "StartWorkflowExecution": 1, "SignalWithStartWorkflowExecution": 1, "SignalWorkflowExecution": 1, "RequestCancelWorkflowExecution": 1, "TerminateWorkflowExecution": 1, "GetWorkflowExecutionHistory": 1, "UpdateWorkflowExecution": 1, } BackgroundTypeAPIPriorityOverride = map[string]int{ "GetOrCreateShard": 1, "UpdateShard": 1, p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTransfer): 1, p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTimer): 1, p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryVisibility): 1, p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTransfer): 3, p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTimer): 3, p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryVisibility): 3, } RequestPrioritiesOrdered = []int{0, 1, 2, 3, 4, 5, 6} )
// Module is the fx option set for this package. It provides the
// DataStoreFactory, registers DataStoreFactoryLifetimeHooks as an invoke
// target, provides one manager per Factory constructor method (each wrapped
// by the unexported managerProvider helper), and provides the cluster name,
// health-signal aggregator, DLQ metrics emitter, event blob cache and the two
// dynamic-config flag providers.
var Module = fx.Options(
	fx.Provide(DataStoreFactoryProvider),
	fx.Invoke(DataStoreFactoryLifetimeHooks),

	// Persistence managers, one per Factory constructor.
	fx.Provide(managerProvider(Factory.NewClusterMetadataManager)),
	fx.Provide(managerProvider(Factory.NewMetadataManager)),
	fx.Provide(managerProvider(Factory.NewTaskManager)),
	fx.Provide(managerProvider(Factory.NewFairTaskManager)),
	fx.Provide(managerProvider(Factory.NewNamespaceReplicationQueue)),
	fx.Provide(managerProvider(Factory.NewShardManager)),
	fx.Provide(managerProvider(Factory.NewExecutionManager)),
	fx.Provide(managerProvider(Factory.NewHistoryTaskQueueManager)),
	fx.Provide(managerProvider(Factory.NewNexusEndpointManager)),

	// Supporting providers.
	fx.Provide(ClusterNameProvider),
	fx.Provide(HealthSignalAggregatorProvider),
	fx.Provide(persistence.NewDLQMetricsEmitter),
	fx.Provide(EventBlobCacheProvider),
	fx.Provide(EnableDataLossMetricsProvider),
	fx.Provide(EnableBestEffortDeleteTasksOnWorkflowUpdateProvider),
)
Functions ¶
func DataStoreFactoryLifetimeHooks ¶
func DataStoreFactoryLifetimeHooks(lc fx.Lifecycle, f persistence.DataStoreFactory)
func DataStoreFactoryProvider ¶
func DataStoreFactoryProvider( clusterName ClusterName, r resolver.ServiceResolver, cfg *config.Persistence, abstractDataStoreFactory AbstractDataStoreFactory, logger log.Logger, metricsHandler metrics.Handler, tracerProvider trace.TracerProvider, serializer serialization.Serializer, ) persistence.DataStoreFactory
func EventBlobCacheProvider ¶
func EventBlobCacheProvider( dc *dynamicconfig.Collection, logger log.Logger, serializer serialization.Serializer, ) persistence.XDCCache
func HealthSignalAggregatorProvider ¶
func HealthSignalAggregatorProvider( dynamicCollection *dynamicconfig.Collection, metricsHandler metrics.Handler, logger log.ThrottledLogger, ) persistence.HealthSignalAggregator
func NewPriorityNamespaceRateLimiter ¶
func NewPriorityNamespaceRateLimiter( hostMaxQPS PersistenceMaxQps, namespaceMaxQPS PersistenceNamespaceMaxQps, requestPriorityFn quotas.RequestPriorityFn, operatorRPSRatio OperatorRPSRatio, burstRatio PersistenceBurstRatio, ) quotas.RequestRateLimiter
func NewPriorityNamespaceShardRateLimiter ¶
func NewPriorityNamespaceShardRateLimiter( hostMaxQPS PersistenceMaxQps, perShardNamespaceMaxQPS PersistencePerShardNamespaceMaxQPS, requestPriorityFn quotas.RequestPriorityFn, operatorRPSRatio OperatorRPSRatio, burstRatio PersistenceBurstRatio, ) quotas.RequestRateLimiter
func NewPriorityRateLimiter ¶
func NewPriorityRateLimiter( hostMaxQPS PersistenceMaxQps, requestPriorityFn quotas.RequestPriorityFn, operatorRPSRatio OperatorRPSRatio, burstRatio PersistenceBurstRatio, healthSignals p.HealthSignalAggregator, dynamicParams DynamicRateLimitingParams, metricsHandler metrics.Handler, logger log.Logger, ) quotas.RequestRateLimiter
func RequestPriorityFn ¶
Types ¶
type AbstractDataStoreFactory ¶
type AbstractDataStoreFactory interface {
// NewFactory takes the custom datastore configuration, a service resolver,
// the cluster name, a logger, a metrics handler and a serializer, and
// returns a persistence.DataStoreFactory.
NewFactory(
cfg config.CustomDatastoreConfig,
r resolver.ServiceResolver,
clusterName string,
logger log.Logger,
metricsHandler metrics.Handler,
serializer serialization.Serializer,
) persistence.DataStoreFactory
}
AbstractDataStoreFactory creates a DataStoreFactory. It can be used to implement custom datastore support outside of the Temporal core.
type ClusterName ¶
type ClusterName string
func ClusterNameProvider ¶
func ClusterNameProvider(config *cluster.Config) ClusterName
type DynamicRateLimitingParams ¶
type DynamicRateLimitingParams dynamicconfig.TypedPropertyFn[dynamicconfig.DynamicRateLimitingParams]
type EnableBestEffortDeleteTasksOnWorkflowUpdate ¶
type EnableBestEffortDeleteTasksOnWorkflowUpdate dynamicconfig.BoolPropertyFn
func EnableBestEffortDeleteTasksOnWorkflowUpdateProvider ¶
func EnableBestEffortDeleteTasksOnWorkflowUpdateProvider( dc *dynamicconfig.Collection, ) EnableBestEffortDeleteTasksOnWorkflowUpdate
type EnableDataLossMetrics ¶
type EnableDataLossMetrics dynamicconfig.BoolPropertyFn
func EnableDataLossMetricsProvider ¶
func EnableDataLossMetricsProvider( dc *dynamicconfig.Collection, ) EnableDataLossMetrics
type Factory ¶
type Factory interface {
// Close closes the factory.
Close()
// NewTaskManager returns a new task manager.
NewTaskManager() (persistence.TaskManager, error)
// NewFairTaskManager returns a new fair task manager.
NewFairTaskManager() (persistence.FairTaskManager, error)
// NewShardManager returns a new shard manager.
NewShardManager() (persistence.ShardManager, error)
// NewMetadataManager returns a new metadata manager.
NewMetadataManager() (persistence.MetadataManager, error)
// NewExecutionManager returns a new execution manager.
NewExecutionManager() (persistence.ExecutionManager, error)
// NewNamespaceReplicationQueue returns a new queue for namespace replication.
NewNamespaceReplicationQueue() (persistence.NamespaceReplicationQueue, error)
// NewClusterMetadataManager returns a new manager for cluster-specific metadata.
NewClusterMetadataManager() (persistence.ClusterMetadataManager, error)
// NewHistoryTaskQueueManager returns a new manager for history task queues.
NewHistoryTaskQueueManager() (persistence.HistoryTaskQueueManager, error)
// NewNexusEndpointManager returns a new manager for Nexus endpoints.
NewNexusEndpointManager() (persistence.NexusEndpointManager, error)
}
Factory defines the interface for any implementation that can vend persistence layer objects backed by a datastore. The actual datastore is an implementation detail hidden behind this interface.
func FactoryProvider ¶
func FactoryProvider( params NewFactoryParams, ) Factory
func NewFactory ¶
func NewFactory( dataStoreFactory persistence.DataStoreFactory, cfg *config.Persistence, systemRateLimiter quotas.RequestRateLimiter, namespaceRateLimiter quotas.RequestRateLimiter, shardRateLimiter quotas.RequestRateLimiter, serializer serialization.Serializer, eventBlobCache persistence.XDCCache, clusterName string, metricsHandler metrics.Handler, logger log.Logger, healthSignals persistence.HealthSignalAggregator, enableDataLossMetrics EnableDataLossMetrics, enableBestEffortDeleteTasksOnWorkflowUpdate EnableBestEffortDeleteTasksOnWorkflowUpdate, ) Factory
NewFactory returns an implementation of Factory that vends persistence objects based on the specified configuration. It takes as input a config.Persistence object, which specifies the datastore to be used for a given type of object and also contains the configuration for the individual datastores themselves.
The objects returned by this factory enforce rate limits and maximum connections (maxconns) according to the given configuration. In addition, all objects emit metrics automatically.
type FactoryProviderFn ¶
type FactoryProviderFn func(NewFactoryParams) Factory
type HealthRequestRateLimiterImpl ¶
type HealthRequestRateLimiterImpl struct {
// contains filtered or unexported fields
}
func NewHealthRequestRateLimiterImpl ¶
func NewHealthRequestRateLimiterImpl( healthSignals persistence.HealthSignalAggregator, rateFn quotas.RateFn, params DynamicRateLimitingParams, burstRatio PersistenceBurstRatio, metricsHandler metrics.Handler, logger log.Logger, ) *HealthRequestRateLimiterImpl
func (*HealthRequestRateLimiterImpl) Reserve ¶
func (rl *HealthRequestRateLimiterImpl) Reserve(now time.Time, request quotas.Request) quotas.Reservation
type NewFactoryParams ¶
type NewFactoryParams struct {
// fx.In marks this struct as an fx parameter object, so fx populates the
// fields below from the dependency graph when a constructor such as a
// FactoryProviderFn takes NewFactoryParams as its argument.
fx.In
DataStoreFactory persistence.DataStoreFactory
EventBlobCache persistence.XDCCache
Cfg *config.Persistence
// The next five fields have the same types as the hostMaxQPS,
// namespaceMaxQPS, perShardNamespaceMaxQPS, operatorRPSRatio and burstRatio
// parameters of the NewPriority*RateLimiter constructors.
PersistenceMaxQPS PersistenceMaxQps
PersistenceNamespaceMaxQPS PersistenceNamespaceMaxQps
PersistencePerShardNamespaceMaxQPS PersistencePerShardNamespaceMaxQPS
OperatorRPSRatio OperatorRPSRatio
PersistenceBurstRatio PersistenceBurstRatio
ClusterName ClusterName
ServiceName primitives.ServiceName
MetricsHandler metrics.Handler
Logger log.Logger
// HealthSignals and DynamicRateLimitingParams have the types that
// NewPriorityRateLimiter accepts as healthSignals and dynamicParams.
HealthSignals persistence.HealthSignalAggregator
DynamicRateLimitingParams DynamicRateLimitingParams
// The two Enable* fields have the types NewFactory accepts as
// enableDataLossMetrics and enableBestEffortDeleteTasksOnWorkflowUpdate.
EnableDataLossMetrics EnableDataLossMetrics
EnableBestEffortDeleteTasksOnWorkflowUpdate EnableBestEffortDeleteTasksOnWorkflowUpdate
Serializer serialization.Serializer
}
type OperatorRPSRatio ¶
type OperatorRPSRatio dynamicconfig.FloatPropertyFn
type PersistenceBurstRatio ¶
type PersistenceBurstRatio dynamicconfig.FloatPropertyFn
type PersistenceMaxQps ¶
type PersistenceMaxQps dynamicconfig.IntPropertyFn
type PersistenceNamespaceMaxQps ¶
type PersistenceNamespaceMaxQps dynamicconfig.IntPropertyFnWithNamespaceFilter
type PersistencePerShardNamespaceMaxQPS ¶
type PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter