client

package
v1.32.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2026 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
// Package-level defaults. DefaultRefreshInterval is a time.Duration; the two
// ratio/multiplier defaults are untyped floating-point constants.
const (
	// DefaultRefreshInterval is 10 seconds.
	// NOTE(review): presumably the refresh period used by the dynamic
	// (health-based) rate limiter — confirm where it is read.
	DefaultRefreshInterval       = 10 * time.Second
	// DefaultRateBurstRatio is 1.0.
	// NOTE(review): presumably the default burst-to-rate ratio — confirm.
	DefaultRateBurstRatio        = 1.0
	// DefaultInitialRateMultiplier is 1.0.
	// NOTE(review): presumably the starting multiplier applied to the
	// configured rate — confirm.
	DefaultInitialRateMultiplier = 1.0
)

Variables

View Source
// Request-priority tables. Priorities are small integers in the range 0-6;
// every value used in the maps below also appears in RequestPrioritiesOrdered.
// NOTE(review): lower numbers appear to mean higher priority (operator is 0,
// preemptable is 6) — confirm against the rate limiter that consumes these.
var (
	// CallerTypeDefaultPriority maps each caller type from the headers
	// package to its default priority.
	CallerTypeDefaultPriority = map[string]int{
		headers.CallerTypeOperator:       0,
		headers.CallerTypeAPI:            2,
		headers.CallerTypeBackgroundHigh: 4,
		headers.CallerTypeBackgroundLow:  5,
		headers.CallerTypePreemptable:    6,
	}

	// APITypeCallOriginPriorityOverride assigns priority 1 to the listed
	// workflow API names.
	// NOTE(review): presumably keyed by call origin for API-type callers —
	// confirm where this map is looked up.
	APITypeCallOriginPriorityOverride = map[string]int{
		"StartWorkflowExecution":           1,
		"SignalWithStartWorkflowExecution": 1,
		"SignalWorkflowExecution":          1,
		"RequestCancelWorkflowExecution":   1,
		"TerminateWorkflowExecution":       1,
		"GetWorkflowExecutionHistory":      1,
		"UpdateWorkflowExecution":          1,
	}

	// BackgroundTypeAPIPriorityOverride assigns priorities to specific API
	// names: the two shard operations and RangeCompleteHistoryTasks get 1,
	// GetHistoryTasks gets 3. History-task keys are built with
	// p.ConstructHistoryTaskAPI for the transfer, timer, and visibility
	// task categories.
	// NOTE(review): presumably applied to background-type callers — confirm.
	BackgroundTypeAPIPriorityOverride = map[string]int{
		"GetOrCreateShard": 1,
		"UpdateShard":      1,

		p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTransfer):   1,
		p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTimer):      1,
		p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryVisibility): 1,

		p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTransfer):   3,
		p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTimer):      3,
		p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryVisibility): 3,
	}

	// RequestPrioritiesOrdered lists the priority values 0 through 6 in
	// ascending order.
	RequestPrioritiesOrdered = []int{0, 1, 2, 3, 4, 5, 6}
)
View Source
// Module is the fx option set for this package. It provides the data store
// factory and invokes its lifetime hooks, provides one persistence manager per
// Factory constructor method (each wrapped by managerProvider), and provides
// the cluster name, health signal aggregator, DLQ metrics emitter, event blob
// cache, and the two dynamic-config flag types.
// NOTE(review): no provider for Factory itself appears in this list, although
// the manager providers reference Factory methods — presumably the consuming
// service supplies it (e.g. via FactoryProvider); confirm.
var Module = fx.Options(
	fx.Provide(DataStoreFactoryProvider),
	fx.Invoke(DataStoreFactoryLifetimeHooks),
	// One provider per manager type, each derived from a Factory method.
	fx.Provide(managerProvider(Factory.NewClusterMetadataManager)),
	fx.Provide(managerProvider(Factory.NewMetadataManager)),
	fx.Provide(managerProvider(Factory.NewTaskManager)),
	fx.Provide(managerProvider(Factory.NewFairTaskManager)),
	fx.Provide(managerProvider(Factory.NewNamespaceReplicationQueue)),
	fx.Provide(managerProvider(Factory.NewShardManager)),
	fx.Provide(managerProvider(Factory.NewExecutionManager)),
	fx.Provide(managerProvider(Factory.NewHistoryTaskQueueManager)),
	fx.Provide(managerProvider(Factory.NewNexusEndpointManager)),

	// Supporting values consumed by the persistence layer.
	fx.Provide(ClusterNameProvider),
	fx.Provide(HealthSignalAggregatorProvider),
	fx.Provide(persistence.NewDLQMetricsEmitter),
	fx.Provide(EventBlobCacheProvider),
	fx.Provide(EnableDataLossMetricsProvider),
	fx.Provide(EnableBestEffortDeleteTasksOnWorkflowUpdateProvider),
)

Functions

func DataStoreFactoryLifetimeHooks

func DataStoreFactoryLifetimeHooks(lc fx.Lifecycle, f persistence.DataStoreFactory)

func DataStoreFactoryProvider

func DataStoreFactoryProvider(
	clusterName ClusterName,
	r resolver.ServiceResolver,
	cfg *config.Persistence,
	abstractDataStoreFactory AbstractDataStoreFactory,
	logger log.Logger,
	metricsHandler metrics.Handler,
	tracerProvider trace.TracerProvider,
	serializer serialization.Serializer,
) persistence.DataStoreFactory

func EventBlobCacheProvider

func EventBlobCacheProvider(
	dc *dynamicconfig.Collection,
	logger log.Logger,
	serializer serialization.Serializer,
) persistence.XDCCache

func HealthSignalAggregatorProvider

func HealthSignalAggregatorProvider(
	dynamicCollection *dynamicconfig.Collection,
	metricsHandler metrics.Handler,
	logger log.ThrottledLogger,
) persistence.HealthSignalAggregator

func IsNamespaceQueueTransientError

func IsNamespaceQueueTransientError(err error) bool

func IsPersistenceTransientError

func IsPersistenceTransientError(err error) bool

func NewPriorityNamespaceRateLimiter

func NewPriorityNamespaceRateLimiter(
	hostMaxQPS PersistenceMaxQps,
	namespaceMaxQPS PersistenceNamespaceMaxQps,
	requestPriorityFn quotas.RequestPriorityFn,
	operatorRPSRatio OperatorRPSRatio,
	burstRatio PersistenceBurstRatio,
) quotas.RequestRateLimiter

func NewPriorityNamespaceShardRateLimiter

func NewPriorityNamespaceShardRateLimiter(
	hostMaxQPS PersistenceMaxQps,
	perShardNamespaceMaxQPS PersistencePerShardNamespaceMaxQPS,
	requestPriorityFn quotas.RequestPriorityFn,
	operatorRPSRatio OperatorRPSRatio,
	burstRatio PersistenceBurstRatio,
) quotas.RequestRateLimiter

func NewPriorityRateLimiter

func NewPriorityRateLimiter(
	hostMaxQPS PersistenceMaxQps,
	requestPriorityFn quotas.RequestPriorityFn,
	operatorRPSRatio OperatorRPSRatio,
	burstRatio PersistenceBurstRatio,
	healthSignals p.HealthSignalAggregator,
	dynamicParams DynamicRateLimitingParams,
	metricsHandler metrics.Handler,
	logger log.Logger,
) quotas.RequestRateLimiter

func RequestPriorityFn

func RequestPriorityFn(req quotas.Request) int

Types

type AbstractDataStoreFactory

// AbstractDataStoreFactory creates a persistence.DataStoreFactory. It can be
// used to implement custom datastore support outside of the Temporal core.
type AbstractDataStoreFactory interface {
	// NewFactory takes a custom datastore config, a service resolver, the
	// cluster name, a logger, a metrics handler, and a serializer, and
	// returns a persistence.DataStoreFactory.
	NewFactory(
		cfg config.CustomDatastoreConfig,
		r resolver.ServiceResolver,
		clusterName string,
		logger log.Logger,
		metricsHandler metrics.Handler,
		serializer serialization.Serializer,
	) persistence.DataStoreFactory
}

AbstractDataStoreFactory creates a DataStoreFactory. It can be used to implement custom datastore support outside of the Temporal core.

type ClusterName

type ClusterName string

func ClusterNameProvider

func ClusterNameProvider(config *cluster.Config) ClusterName

type EnableBestEffortDeleteTasksOnWorkflowUpdate

type EnableBestEffortDeleteTasksOnWorkflowUpdate dynamicconfig.BoolPropertyFn

type EnableDataLossMetrics

type EnableDataLossMetrics dynamicconfig.BoolPropertyFn

func EnableDataLossMetricsProvider

func EnableDataLossMetricsProvider(
	dc *dynamicconfig.Collection,
) EnableDataLossMetrics

type Factory

// Factory defines the interface for any implementation that can vend
// persistence layer objects backed by a datastore. The actual datastore is an
// implementation detail hidden behind this interface. Every constructor
// method returns the manager together with an error.
type Factory interface {
	// Close closes the factory.
	Close()
	// NewTaskManager returns a new task manager
	NewTaskManager() (persistence.TaskManager, error)
	// NewFairTaskManager returns a new fair task manager
	NewFairTaskManager() (persistence.FairTaskManager, error)
	// NewShardManager returns a new shard manager
	NewShardManager() (persistence.ShardManager, error)
	// NewMetadataManager returns a new metadata manager
	NewMetadataManager() (persistence.MetadataManager, error)
	// NewExecutionManager returns a new execution manager
	NewExecutionManager() (persistence.ExecutionManager, error)
	// NewNamespaceReplicationQueue returns a new queue for namespace replication
	NewNamespaceReplicationQueue() (persistence.NamespaceReplicationQueue, error)
	// NewClusterMetadataManager returns a new manager for cluster specific metadata
	NewClusterMetadataManager() (persistence.ClusterMetadataManager, error)
	// NewHistoryTaskQueueManager returns a new manager for history task queues
	NewHistoryTaskQueueManager() (persistence.HistoryTaskQueueManager, error)
	// NewNexusEndpointManager returns a new manager for nexus endpoints
	NewNexusEndpointManager() (persistence.NexusEndpointManager, error)
}

Factory defines the interface for any implementation that can vend persistence layer objects backed by a datastore. The actual datastore is an implementation detail hidden behind this interface.

func FactoryProvider

func FactoryProvider(
	params NewFactoryParams,
) Factory

func NewFactory

func NewFactory(
	dataStoreFactory persistence.DataStoreFactory,
	cfg *config.Persistence,
	systemRateLimiter quotas.RequestRateLimiter,
	namespaceRateLimiter quotas.RequestRateLimiter,
	shardRateLimiter quotas.RequestRateLimiter,
	serializer serialization.Serializer,
	eventBlobCache persistence.XDCCache,
	clusterName string,
	metricsHandler metrics.Handler,
	logger log.Logger,
	healthSignals persistence.HealthSignalAggregator,
	enableDataLossMetrics EnableDataLossMetrics,
	enableBestEffortDeleteTasksOnWorkflowUpdate EnableBestEffortDeleteTasksOnWorkflowUpdate,
) Factory

NewFactory returns an implementation of factory that vends persistence objects based on specified configuration. This factory takes as input a config.Persistence object which specifies the datastore to be used for a given type of object. This config also contains config for individual datastores themselves.

The objects returned by this factory enforce rate limits and maximum connection counts according to the given configuration. In addition, all objects emit metrics automatically.

type FactoryProviderFn

type FactoryProviderFn func(NewFactoryParams) Factory

type HealthRequestRateLimiterImpl

// HealthRequestRateLimiterImpl is created by NewHealthRequestRateLimiterImpl
// and exposes Allow, Reserve, and Wait methods on its pointer receiver.
// NOTE(review): presumably a quotas.RequestRateLimiter that adjusts its rate
// from persistence health signals — confirm against the implementation.
type HealthRequestRateLimiterImpl struct {
	// contains filtered or unexported fields
}

func NewHealthRequestRateLimiterImpl

func NewHealthRequestRateLimiterImpl(
	healthSignals persistence.HealthSignalAggregator,
	rateFn quotas.RateFn,
	params DynamicRateLimitingParams,
	burstRatio PersistenceBurstRatio,
	metricsHandler metrics.Handler,
	logger log.Logger,
) *HealthRequestRateLimiterImpl

func (*HealthRequestRateLimiterImpl) Allow

func (rl *HealthRequestRateLimiterImpl) Allow(now time.Time, request quotas.Request) bool

func (*HealthRequestRateLimiterImpl) Reserve

func (*HealthRequestRateLimiterImpl) Wait

type NewFactoryParams

// NewFactoryParams groups the dependencies handed to FactoryProvider and to
// any FactoryProviderFn (func(NewFactoryParams) Factory).
type NewFactoryParams struct {
	// fx.In marks this as an fx parameter struct, so fx populates the
	// exported fields below from the dependency graph.
	fx.In

	DataStoreFactory                            persistence.DataStoreFactory
	EventBlobCache                              persistence.XDCCache
	Cfg                                         *config.Persistence
	// QPS limits, operator ratio, and burst ratio are dynamic-config-backed
	// function types declared in this package.
	PersistenceMaxQPS                           PersistenceMaxQps
	PersistenceNamespaceMaxQPS                  PersistenceNamespaceMaxQps
	PersistencePerShardNamespaceMaxQPS          PersistencePerShardNamespaceMaxQPS
	OperatorRPSRatio                            OperatorRPSRatio
	PersistenceBurstRatio                       PersistenceBurstRatio
	ClusterName                                 ClusterName
	ServiceName                                 primitives.ServiceName
	MetricsHandler                              metrics.Handler
	Logger                                      log.Logger
	HealthSignals                               persistence.HealthSignalAggregator
	DynamicRateLimitingParams                   DynamicRateLimitingParams
	EnableDataLossMetrics                       EnableDataLossMetrics
	EnableBestEffortDeleteTasksOnWorkflowUpdate EnableBestEffortDeleteTasksOnWorkflowUpdate
	Serializer                                  serialization.Serializer
}

type OperatorRPSRatio

type OperatorRPSRatio dynamicconfig.FloatPropertyFn

type PersistenceBurstRatio

type PersistenceBurstRatio dynamicconfig.FloatPropertyFn

type PersistenceMaxQps

type PersistenceMaxQps dynamicconfig.IntPropertyFn

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL