Documentation
¶
Index ¶
- Variables
- func NewClusterMetadata(t *testing.T, options *TestClusterConfig) cluster.Metadata
- func NewPersistenceTestCluster(t *testing.T, clusterConfig *TestClusterConfig) testcluster.PersistenceTestCluster
- func NewSimpleResolver(serviceName string, hosts map[string][]membership.HostInfo, ...) membership.Resolver
- func PrettyPrintHistory(history *types.History, logger log.Logger)
- type AdminClient
- type ArchiverBase
- type AsyncWFIntegrationSuite
- type Cadence
- type CadenceParams
- type ClientIntegrationSuite
- type FrontendClient
- type HistoryClient
- type HistoryConfig
- type HistorySimulationConfig
- type IntegrationBase
- type IntegrationBaseParams
- type IntegrationQueueV2Suite
- type IntegrationSuite
- type MatchingClient
- type MatchingConfig
- type MatchingSimulationConfig
- type MessagingClientConfig
- type Service
- type SimulationBacklogConfiguration
- type SimulationPollerConfiguration
- type SimulationTaskConfiguration
- type SizeLimitIntegrationSuite
- type TaskListIsolationIntegrationSuite
- type TaskPoller
- func (p *TaskPoller) HandlePartialDecision(response *types.PollForDecisionTaskResponse) (*types.RespondDecisionTaskCompletedResponse, error)
- func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error
- func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error
- func (p *TaskPoller) PollAndProcessDecisionTask(dumpHistory bool, dropTask bool) (isQueryTask bool, err error)
- func (p *TaskPoller) PollAndProcessDecisionTaskWithAttempt(dumpHistory bool, dropTask bool, pollStickyTaskList bool, ...) (isQueryTask bool, err error)
- func (p *TaskPoller) PollAndProcessDecisionTaskWithAttemptAndRetry(dumpHistory bool, dropTask bool, pollStickyTaskList bool, ...) (isQueryTask bool, err error)
- func (p *TaskPoller) PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(dumpHistory bool, dropTask bool, pollStickyTaskList bool, ...) (isQueryTask bool, newTask *types.RespondDecisionTaskCompletedResponse, ...)
- func (p *TaskPoller) PollAndProcessDecisionTaskWithSticky(dumpHistory bool, dropTask bool) (isQueryTask bool, err error)
- func (p *TaskPoller) PollAndProcessDecisionTaskWithoutRetry(dumpHistory bool, dropTask bool) (isQueryTask bool, err error)
- func (p *TaskPoller) PollAndProcessDecisions() context.CancelFunc
- type TasksProduceSpec
- type TestCluster
- func (tc *TestCluster) GetAdminClient() AdminClient
- func (tc *TestCluster) GetExecutionManagerFactory() persistence.ExecutionManagerFactory
- func (tc *TestCluster) GetFrontendClient() FrontendClient
- func (tc *TestCluster) GetHistoryClient() HistoryClient
- func (tc *TestCluster) GetMatchingClient() MatchingClient
- func (tc *TestCluster) GetMatchingClients() []MatchingClient
- func (tc *TestCluster) TearDownCluster()
- type TestClusterConfig
- type WorkerConfig
- type WorkflowIDInternalRateLimitIntegrationSuite
- type WorkflowIDRateLimitIntegrationSuite
Constants ¶
This section is empty.
Variables ¶
var TestFlags struct { FrontendAddr string PersistenceType string SQLPluginName string TestClusterConfigFile string }
TestFlags contains the feature flags for integration tests
Functions ¶
func NewClusterMetadata ¶ added in v0.20.0
func NewClusterMetadata(t *testing.T, options *TestClusterConfig) cluster.Metadata
NewClusterMetadata returns cluster metadata from config
func NewPersistenceTestCluster ¶ added in v0.20.0
func NewPersistenceTestCluster(t *testing.T, clusterConfig *TestClusterConfig) testcluster.PersistenceTestCluster
func NewSimpleResolver ¶ added in v0.24.0
func NewSimpleResolver(serviceName string, hosts map[string][]membership.HostInfo, currentHost membership.HostInfo) membership.Resolver
NewSimpleResolver returns a membership resolver interface
Types ¶
type AdminClient ¶ added in v0.5.0
AdminClient is the interface exposed by admin service client
func NewAdminClient ¶ added in v0.5.0
func NewAdminClient(d *yarpc.Dispatcher) AdminClient
NewAdminClient creates a client for the cadence admin service
type ArchiverBase ¶ added in v0.7.0
type ArchiverBase struct {
// contains filtered or unexported fields
}
ArchiverBase is a base struct for archiver provider being used in integration tests
type AsyncWFIntegrationSuite ¶ added in v1.2.8
type AsyncWFIntegrationSuite struct {
*require.Assertions
*IntegrationBase
}
type Cadence ¶
type Cadence interface {
Start() error
Stop()
GetAdminClient() adminClient.Client
GetFrontendClient() frontendClient.Client
FrontendHost() membership.HostInfo
GetHistoryClient() historyClient.Client
GetMatchingClient() matchingClient.Client
GetMatchingClients() []matchingClient.Client
GetExecutionManagerFactory() persistence.ExecutionManagerFactory
}
Cadence hosts all of cadence services in one process
func NewCadence ¶
func NewCadence(params *CadenceParams) Cadence
NewCadence returns an instance that hosts full cadence in one process
type CadenceParams ¶ added in v0.5.7
type CadenceParams struct {
ClusterMetadata cluster.Metadata
PersistenceConfig config.Persistence
MessagingClient messaging.Client
DomainManager persistence.DomainManager
HistoryV2Mgr persistence.HistoryManager
ExecutionMgrFactory persistence.ExecutionManagerFactory
DomainReplicationQueue domain.ReplicationQueue
Logger log.Logger
ClusterNo int
ArchiverMetadata carchiver.ArchivalMetadata
ArchiverProvider provider.ArchiverProvider
EnableReadHistoryFromArchival bool
HistoryConfig *HistoryConfig
MatchingConfig *MatchingConfig
ESConfig *config.ElasticSearchConfig
ESClient elasticsearch.GenericClient
WorkerConfig *WorkerConfig
MockAdminClient map[string]adminClient.Client
DomainReplicationTaskExecutor domain.ReplicationTaskExecutor
AuthorizationConfig config.Authorization
PinotConfig *config.PinotVisibilityConfig
PinotClient pinot.GenericClient
AsyncWFQueues map[string]config.AsyncWorkflowQueueProvider
TimeSource clock.TimeSource
FrontendDynCfgOverrides map[dynamicproperties.Key]interface{}
HistoryDynCfgOverrides map[dynamicproperties.Key]interface{}
MatchingDynCfgOverrides map[dynamicproperties.Key]interface{}
WorkerDynCfgOverrides map[dynamicproperties.Key]interface{}
}
CadenceParams contains everything needed to bootstrap Cadence
type ClientIntegrationSuite ¶ added in v0.20.0
type ClientIntegrationSuite struct {
// override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test,
// not merely log an error
*require.Assertions
*IntegrationBase
// contains filtered or unexported fields
}
type FrontendClient ¶ added in v0.5.0
FrontendClient is the interface exposed by frontend service client
func NewFrontendClient ¶ added in v0.5.0
func NewFrontendClient(d *yarpc.Dispatcher) FrontendClient
NewFrontendClient creates a client for the cadence frontend service
type HistoryClient ¶ added in v0.9.3
HistoryClient is the interface exposed by history service client
func NewHistoryClient ¶ added in v0.9.3
func NewHistoryClient(d *yarpc.Dispatcher) HistoryClient
NewHistoryClient creates a client for the cadence history service
type HistoryConfig ¶ added in v0.5.7
type HistoryConfig struct {
// When MockClient is set, rest of the configs are ignored, history service is not started
// and mock history client is passed to other services
MockClient HistoryClient
NumHistoryShards int
NumHistoryHosts int
HistoryCountLimitError int
HistoryCountLimitWarn int
SimulationConfig HistorySimulationConfig
}
HistoryConfig contains configs for history service
type HistorySimulationConfig ¶ added in v1.3.1
type HistorySimulationConfig struct {
NumWorkflows int
// WorkflowDeletionJitterRange defines the duration in minutes for workflow close tasks jittering
// defaults to 0 to remove jittering
WorkflowDeletionJitterRange int
// EnableTransferQueueV2 enables queue v2 framework for transfer queue
EnableTransferQueueV2 bool
// EnableTimerQueueV2 enables queue v2 framework for timer queue
EnableTimerQueueV2 bool
}
type IntegrationBase ¶ added in v0.4.0
type IntegrationBase struct {
suite.Suite
TestCluster *TestCluster
TestClusterConfig *TestClusterConfig
Engine FrontendClient
AdminClient AdminClient
Logger log.Logger
DomainName string
SecondaryDomainName string
TestRawHistoryDomainName string
ForeignDomainName string
ArchivalDomainName string
DefaultTestCluster testcluster.PersistenceTestCluster
VisibilityTestCluster testcluster.PersistenceTestCluster
}
IntegrationBase is a base struct for integration tests
func NewIntegrationBase ¶ added in v0.20.0
func NewIntegrationBase(params IntegrationBaseParams) *IntegrationBase
func (*IntegrationBase) RandomizeStr ¶ added in v1.2.17
func (s *IntegrationBase) RandomizeStr(id string) string
func (*IntegrationBase) RegisterDomain ¶ added in v1.2.17
func (s *IntegrationBase) RegisterDomain( domain string, retentionDays int, historyArchivalStatus types.ArchivalStatus, historyArchivalURI string, visibilityArchivalStatus types.ArchivalStatus, visibilityArchivalURI string, ) error
func (*IntegrationBase) SetupLogger ¶ added in v1.2.17
func (s *IntegrationBase) SetupLogger()
func (*IntegrationBase) TearDownBaseSuite ¶ added in v1.2.17
func (s *IntegrationBase) TearDownBaseSuite()
type IntegrationBaseParams ¶ added in v0.20.0
type IntegrationBaseParams struct {
T *testing.T
DefaultTestCluster testcluster.PersistenceTestCluster
VisibilityTestCluster testcluster.PersistenceTestCluster
TestClusterConfig *TestClusterConfig
}
type IntegrationQueueV2Suite ¶ added in v1.3.2
type IntegrationQueueV2Suite struct {
*IntegrationSuite
}
type IntegrationSuite ¶ added in v0.20.0
type IntegrationSuite struct {
// override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test,
// not merely log an error
*require.Assertions
*IntegrationBase
}
type MatchingClient ¶ added in v1.2.13
type MatchingConfig ¶ added in v1.2.13
type MatchingConfig struct {
// number of matching hosts can be at most 4 due to existing static port assignments in onebox.go.
// can be changed easily.
NumMatchingHosts int
SimulationConfig MatchingSimulationConfig
}
type MatchingSimulationConfig ¶ added in v1.2.13
type MatchingSimulationConfig struct {
// Number of task list write partitions defaults to 1
TaskListWritePartitions int
// Number of task list read partitions defaults to 1
TaskListReadPartitions int
// At most N polls will be forwarded at a time. defaults to 20
ForwarderMaxOutstandingPolls int
// At most N tasks will be forwarded at a time. defaults to 1
ForwarderMaxOutstandingTasks int
// Forwarder rps limit defaults to 10
ForwarderMaxRatePerSecond int
// Children per node. defaults to 20
ForwarderMaxChildrenPerNode int
// LocalPollWaitTime. defaults to 0ms.
LocalPollWaitTime time.Duration
// LocalTaskWaitTime. defaults to 0ms.
LocalTaskWaitTime time.Duration
// RecordDecisionTaskStartedTime. The amount of time spent by History to complete RecordDecisionTaskStarted
RecordDecisionTaskStartedTime time.Duration
// TasklistLoadBalancerStrategy the strategy of load balancer. defaults to "random".
TasklistLoadBalancerStrategy string
// The pollers that will be created to process
Pollers []SimulationPollerConfiguration
Tasks []SimulationTaskConfiguration
Backlogs []SimulationBacklogConfiguration
// GetPartitionConfigFromDB indicates whether to get the partition config from DB or not.
// This is a prerequisite for adaptive scaler.
GetPartitionConfigFromDB bool
// Adaptive scaler configurations
EnableAdaptiveScaler bool
PartitionDownscaleFactor float64
PartitionUpscaleRPS int
PartitionUpscaleSustainedDuration time.Duration
PartitionDownscaleSustainedDuration time.Duration
AdaptiveScalerUpdateInterval time.Duration
QPSTrackerInterval time.Duration
TaskIsolationDuration time.Duration
}
type MessagingClientConfig ¶ added in v0.5.7
type MessagingClientConfig struct {
UseMock bool
KafkaConfig *config.KafkaConfig
}
MessagingClientConfig is the config for the messaging client
type Service ¶ added in v0.24.0
type Service interface {
Start()
Stop()
GetLogger() log.Logger
GetThrottledLogger() log.Logger
GetMetricsClient() metrics.Client
GetClientBean() client.Bean
GetTimeSource() clock.TimeSource
GetDispatcher() *yarpc.Dispatcher
GetMembershipResolver() membership.Resolver
GetHostInfo() membership.HostInfo
GetClusterMetadata() cluster.Metadata
GetMessagingClient() messaging.Client
GetBlobstoreClient() blobstore.Client
GetArchivalMetadata() archiver.ArchivalMetadata
GetArchiverProvider() provider.ArchiverProvider
GetPayloadSerializer() persistence.PayloadSerializer
}
Service is the interface which must be implemented by all the services. TODO: Service contains many methods that are not used now that we have resource bean; these should be cleaned up.
func NewService ¶ added in v0.24.0
NewService instantiates a Service instance. TODO: have a better name for Service.
type SimulationBacklogConfiguration ¶ added in v1.2.15
type SimulationPollerConfiguration ¶ added in v1.2.13
type SimulationPollerConfiguration struct {
// The isolation group that pollers will be created with. Optional.
IsolationGroup string
// The number of pollers that will be created with this configuration. Defaults to 1
NumPollers int
// TaskProcessTime. The amount of time spent by the poller in-between requests. Defaults to 1ms
TaskProcessTime time.Duration
// Poll request timeout defaults to 15 seconds
PollTimeout time.Duration
}
type SimulationTaskConfiguration ¶ added in v1.2.13
type SimulationTaskConfiguration struct {
// The isolation groups that tasks will be evenly distributed between
IsolationGroups []string
// Number of task generators defaults to 1
NumTaskGenerators int
// Upper limit of tasks to generate. Task generators will stop if total number of tasks generated reaches MaxTaskToGenerate during simulation
// Defaults to 2k
MaxTaskToGenerate int
// Task generation QPS. Defaults to 40.
TasksPerSecond int
// The burst value for the rate limiter for task generation. Controls the maximum number of AddTask requests
// that can be sent concurrently. For example, if you have TasksPerSecond, TasksBurst, and NumTaskGenerators all
// set to 10 then every second you'll get 10 tasks added right at the start of the second. If you instead set
// TasksBurst to 1 then you'd get a steady stream of tasks, with one task every 100ms.
TasksBurst int
// OverTime is a list of TasksProduceSpec that will be used to change the qps over time.
// Each item has a duration and they will be applied in the given order.
// If this is set, TasksPerSecond and TasksBurst will be ignored.
OverTime []TasksProduceSpec
}
type SizeLimitIntegrationSuite ¶ added in v0.20.0
type SizeLimitIntegrationSuite struct {
// override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test,
// not merely log an error
*require.Assertions
*IntegrationBase
}
type TaskListIsolationIntegrationSuite ¶ added in v1.2.15
type TaskListIsolationIntegrationSuite struct {
*require.Assertions
*IntegrationBase
}
type TaskPoller ¶ added in v0.4.0
type TaskPoller struct {
Engine FrontendClient
Domain string
TaskList *types.TaskList
StickyTaskList *types.TaskList
StickyScheduleToStartTimeoutSeconds *int32
Identity string
DecisionHandler decisionTaskHandler
ActivityHandler activityTaskHandler
QueryHandler queryHandler
Logger log.Logger
T *testing.T
CallOptions []yarpc.CallOption
}
TaskPoller is used in integration tests to poll decision or activity tasks
func (*TaskPoller) HandlePartialDecision ¶ added in v0.4.0
func (p *TaskPoller) HandlePartialDecision(response *types.PollForDecisionTaskResponse) ( *types.RespondDecisionTaskCompletedResponse, error)
HandlePartialDecision for decision task
func (*TaskPoller) PollAndProcessActivityTask ¶ added in v0.4.0
func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error
PollAndProcessActivityTask for activity tasks
func (*TaskPoller) PollAndProcessActivityTaskWithID ¶ added in v0.4.0
func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error
PollAndProcessActivityTaskWithID is similar to PollAndProcessActivityTask but using RespondActivityTask...ByID
func (*TaskPoller) PollAndProcessDecisionTask ¶ added in v0.4.0
func (p *TaskPoller) PollAndProcessDecisionTask(dumpHistory bool, dropTask bool) (isQueryTask bool, err error)
PollAndProcessDecisionTask for decision tasks
func (*TaskPoller) PollAndProcessDecisionTaskWithAttempt ¶ added in v0.4.0
func (p *TaskPoller) PollAndProcessDecisionTaskWithAttempt( dumpHistory bool, dropTask bool, pollStickyTaskList bool, respondStickyTaskList bool, decisionAttempt int64, ) (isQueryTask bool, err error)
PollAndProcessDecisionTaskWithAttempt for decision tasks
func (*TaskPoller) PollAndProcessDecisionTaskWithAttemptAndRetry ¶ added in v0.4.0
func (p *TaskPoller) PollAndProcessDecisionTaskWithAttemptAndRetry( dumpHistory bool, dropTask bool, pollStickyTaskList bool, respondStickyTaskList bool, decisionAttempt int64, retryCount int, ) (isQueryTask bool, err error)
PollAndProcessDecisionTaskWithAttemptAndRetry for decision tasks
func (*TaskPoller) PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision ¶ added in v0.4.0
func (p *TaskPoller) PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision( dumpHistory bool, dropTask bool, pollStickyTaskList bool, respondStickyTaskList bool, decisionAttempt int64, retryCount int, forceCreateNewDecision bool, queryResult *types.WorkflowQueryResult, ) (isQueryTask bool, newTask *types.RespondDecisionTaskCompletedResponse, err error)
PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision for decision tasks
func (*TaskPoller) PollAndProcessDecisionTaskWithSticky ¶ added in v0.4.0
func (p *TaskPoller) PollAndProcessDecisionTaskWithSticky(dumpHistory bool, dropTask bool) (isQueryTask bool, err error)
PollAndProcessDecisionTaskWithSticky for decision tasks
func (*TaskPoller) PollAndProcessDecisionTaskWithoutRetry ¶ added in v0.4.0
func (p *TaskPoller) PollAndProcessDecisionTaskWithoutRetry(dumpHistory bool, dropTask bool) (isQueryTask bool, err error)
PollAndProcessDecisionTaskWithoutRetry for decision tasks
func (*TaskPoller) PollAndProcessDecisions ¶ added in v1.2.15
func (p *TaskPoller) PollAndProcessDecisions() context.CancelFunc
type TasksProduceSpec ¶ added in v1.2.15
type TasksProduceSpec struct {
// Task generation qps
TasksPerSecond int
// The burst value for the rate limiter for task generation.
TasksBurst int
// The duration for which the settings will be applied.
// If the duration is unset, the settings will be applied indefinitely.
Duration *time.Duration
}
type TestCluster ¶ added in v0.5.7
type TestCluster struct {
// contains filtered or unexported fields
}
TestCluster is a base struct for integration tests
func NewCluster ¶ added in v0.5.7
func NewCluster(t *testing.T, options *TestClusterConfig, logger log.Logger, params persistencetests.TestBaseParams) (*TestCluster, error)
NewCluster creates and sets up the test cluster
func NewPinotTestCluster ¶ added in v1.2.5
func NewPinotTestCluster(t *testing.T, options *TestClusterConfig, logger log.Logger, params persistencetests.TestBaseParams) (*TestCluster, error)
func (*TestCluster) GetAdminClient ¶ added in v0.5.7
func (tc *TestCluster) GetAdminClient() AdminClient
GetAdminClient returns an admin client from the test cluster
func (*TestCluster) GetExecutionManagerFactory ¶ added in v0.11.0
func (tc *TestCluster) GetExecutionManagerFactory() persistence.ExecutionManagerFactory
GetExecutionManagerFactory returns an execution manager factory from the test cluster
func (*TestCluster) GetFrontendClient ¶ added in v0.5.7
func (tc *TestCluster) GetFrontendClient() FrontendClient
GetFrontendClient returns a frontend client from the test cluster
func (*TestCluster) GetHistoryClient ¶ added in v0.9.3
func (tc *TestCluster) GetHistoryClient() HistoryClient
GetHistoryClient returns a history client from the test cluster
func (*TestCluster) GetMatchingClient ¶ added in v1.2.13
func (tc *TestCluster) GetMatchingClient() MatchingClient
GetMatchingClient returns a matching client from the test cluster
func (*TestCluster) GetMatchingClients ¶ added in v1.2.14
func (tc *TestCluster) GetMatchingClients() []MatchingClient
func (*TestCluster) TearDownCluster ¶ added in v0.5.7
func (tc *TestCluster) TearDownCluster()
TearDownCluster tears down the test cluster
type TestClusterConfig ¶ added in v0.5.7
type TestClusterConfig struct {
FrontendAddress string
EnableArchival bool
IsPrimaryCluster bool
ClusterNo int
ClusterGroupMetadata config.ClusterGroupMetadata
MessagingClientConfig *MessagingClientConfig
Persistence persistencetests.TestBaseOptions
HistoryConfig *HistoryConfig
MatchingConfig *MatchingConfig
ESConfig *config.ElasticSearchConfig
WorkerConfig *WorkerConfig
MockAdminClient map[string]adminClient.Client
PinotConfig *config.PinotVisibilityConfig
AsyncWFQueues map[string]config.AsyncWorkflowQueueProvider
// TimeSource is used to override the time source of internal components.
// Note that most components don't respect this, and it's only used in a few places.
// e.g. async workflow test's consumer manager and domain manager
TimeSource clock.MockedTimeSource
FrontendDynamicConfigOverrides map[dynamicproperties.Key]interface{}
HistoryDynamicConfigOverrides map[dynamicproperties.Key]interface{}
MatchingDynamicConfigOverrides map[dynamicproperties.Key]interface{}
WorkerDynamicConfigOverrides map[dynamicproperties.Key]interface{}
}
TestClusterConfig is the config for a test cluster
func GetTestClusterConfig ¶ added in v0.5.7
func GetTestClusterConfig(configFile string) (*TestClusterConfig, error)
GetTestClusterConfig returns the test cluster config
func GetTestClusterConfigs ¶ added in v0.20.0
func GetTestClusterConfigs(configFile string) ([]*TestClusterConfig, error)
GetTestClusterConfigs returns the test cluster configs
type WorkerConfig ¶ added in v0.5.7
type WorkerConfig struct {
EnableArchiver bool
EnableIndexer bool
EnableReplicator bool
EnableAsyncWFConsumer bool
}
WorkerConfig is the config for enabling/disabling cadence worker
type WorkflowIDInternalRateLimitIntegrationSuite ¶ added in v1.2.10
type WorkflowIDInternalRateLimitIntegrationSuite struct {
*require.Assertions
*IntegrationBase
}
type WorkflowIDRateLimitIntegrationSuite ¶ added in v1.2.9
type WorkflowIDRateLimitIntegrationSuite struct {
*require.Assertions
*IntegrationBase
}