Documentation
¶
Index ¶
- Constants
- Variables
- func DecodeString(t require.TestingT, pls *commonpb.Payloads) string
- func EventBatchesToVersionHistory(versionHistory *historyspb.VersionHistory, eventBatches []*historypb.History) (*historyspb.VersionHistory, error)
- func ExtractReplicationMessages(msg proto.Message) *replicationspb.WorkflowReplicationMessages
- func GetPersistenceTestDefaults() persistencetests.TestBaseOptions
- func MustToPayload(t require.TestingT, v any) *commonpb.Payload
- func NewContext(parent ...context.Context) context.Context
- func NewTestDataConverter() converter.DataConverter
- func RandomizeStr(id string) string
- func RandomizedNexusEndpoint(name string) string
- func UseCassandraPersistence() bool
- func UseSQLVisibility() bool
- func WithDropTask(o *PollAndProcessWorkflowTaskOptions)
- func WithDumpHistory(o *PollAndProcessWorkflowTaskOptions)
- func WithForceNewWorkflowTask(o *PollAndProcessWorkflowTaskOptions)
- func WithNoDumpCommands(o *PollAndProcessWorkflowTaskOptions)
- func WithPollSticky(o *PollAndProcessWorkflowTaskOptions)
- func WithRespondSticky(o *PollAndProcessWorkflowTaskOptions)
- func WithoutRetries(o *PollAndProcessWorkflowTaskOptions)
- type ActivityTaskHandler
- type ArchiverBase
- type CapturedReplicationMessage
- type Env
- type FrontendConfig
- type FunctionalTestBase
- func (s *FunctionalTestBase) AdminClient() adminservice.AdminServiceClient
- func (s *FunctionalTestBase) CloseShard(namespaceID string, workflowID string)
- func (s *FunctionalTestBase) Context() context.Context
- func (s *FunctionalTestBase) DecodePayloadsByteSliceInt32(ps *commonpb.Payloads) (r int32)
- func (s *FunctionalTestBase) DecodePayloadsInt(ps *commonpb.Payloads) int
- func (s *FunctionalTestBase) DecodePayloadsString(ps *commonpb.Payloads) string
- func (s *FunctionalTestBase) DurationNear(value, target, tolerance time.Duration)
- func (s *FunctionalTestBase) ExternalNamespace() namespace.Name
- func (s *FunctionalTestBase) FrontendClient() workflowservice.WorkflowServiceClient
- func (s *FunctionalTestBase) FrontendGRPCAddress() string
- func (s *FunctionalTestBase) GetHistory(namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent
- func (s *FunctionalTestBase) GetHistoryFunc(namespace string, execution *commonpb.WorkflowExecution) func() []*historypb.HistoryEvent
- func (s *FunctionalTestBase) GetNamespaceID(namespace string) string
- func (s *FunctionalTestBase) GetTestCluster() *TestCluster
- func (s *FunctionalTestBase) GetTestClusterConfig() *TestClusterConfig
- func (s *FunctionalTestBase) HttpAPIAddress() string
- func (s *FunctionalTestBase) InjectHook(hook testhooks.Hook) (cleanup func())
- func (s *FunctionalTestBase) MarkNamespaceAsDeleted(nsName namespace.Name) error
- func (s *FunctionalTestBase) Namespace() namespace.Name
- func (s *FunctionalTestBase) NamespaceID() namespace.ID
- func (s *FunctionalTestBase) OperatorClient() operatorservice.OperatorServiceClient
- func (s *FunctionalTestBase) OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())
- func (s *FunctionalTestBase) RegisterNamespace(nsName namespace.Name, retentionDays int32, ...) (namespace.ID, error)
- func (s *FunctionalTestBase) RunTestWithMatchingBehavior(subtest func())
- func (s *FunctionalTestBase) SdkClient() sdkclient.Client
- func (s *FunctionalTestBase) SdkWorker() sdkworker.Worker
- func (s *FunctionalTestBase) SendSignal(nsName string, execution *commonpb.WorkflowExecution, signalName string, ...) error
- func (s *FunctionalTestBase) SendToChannel(ctx context.Context, ch chan struct{})
- func (s *FunctionalTestBase) SetupSubTest()
- func (s *FunctionalTestBase) SetupSuite()
- func (s *FunctionalTestBase) SetupSuiteWithCluster(options ...TestClusterOption)
- func (s *FunctionalTestBase) SetupTest()
- func (s *FunctionalTestBase) TaskPoller() *taskpoller.TaskPoller
- func (s *FunctionalTestBase) TaskQueue() string
- func (s *FunctionalTestBase) TearDownCluster()
- func (s *FunctionalTestBase) TearDownSubTest()
- func (s *FunctionalTestBase) TearDownSuite()
- func (s *FunctionalTestBase) TearDownTest()
- func (s *FunctionalTestBase) WaitForChannel(ctx context.Context, ch chan struct{})
- func (s *FunctionalTestBase) WorkerGRPCAddress() string
- type GlobalMetricCapture
- type HistoryConfig
- type MatchingBehavior
- type MatchingConfig
- type MessageHandler
- type NamespaceMetricCapture
- type PersistenceTestBaseFactory
- type PollAndProcessWorkflowTaskOptionFunc
- type PollAndProcessWorkflowTaskOptions
- type PollAndProcessWorkflowTaskResponse
- type QueryHandler
- type RecordedTask
- type ReplicationStreamRecorder
- func (r *ReplicationStreamRecorder) Clear()
- func (r *ReplicationStreamRecorder) GetMessages() []CapturedReplicationMessage
- func (r *ReplicationStreamRecorder) SetOutputFile(filePath string)
- func (r *ReplicationStreamRecorder) StreamInterceptor(clusterName string) grpc.StreamClientInterceptor
- func (r *ReplicationStreamRecorder) StreamServerInterceptor(clusterName string) grpc.StreamServerInterceptor
- func (r *ReplicationStreamRecorder) UnaryInterceptor(clusterName string) grpc.UnaryClientInterceptor
- func (r *ReplicationStreamRecorder) UnaryServerInterceptor(clusterName string) grpc.UnaryServerInterceptor
- func (r *ReplicationStreamRecorder) WriteToLog() error
- type TaskFilter
- type TaskMatcher
- type TaskPoller (deprecated)
- func (p *TaskPoller) HandlePartialWorkflowTask(response *workflowservice.PollWorkflowTaskQueueResponse, ...) (*workflowservice.RespondWorkflowTaskCompletedResponse, error)
- func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error
- func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error
- func (p *TaskPoller) PollAndProcessWorkflowTask(funcs ...PollAndProcessWorkflowTaskOptionFunc) (res PollAndProcessWorkflowTaskResponse, err error)
- func (p *TaskPoller) PollAndProcessWorkflowTaskWithOptions(opts *PollAndProcessWorkflowTaskOptions) (res PollAndProcessWorkflowTaskResponse, err error)
- type TaskQueueRecorder
- func (r *TaskQueueRecorder) AddHistoryTasks(ctx context.Context, request *persistence.AddHistoryTasksRequest) error
- func (r *TaskQueueRecorder) AppendHistoryNodes(ctx context.Context, request *persistence.AppendHistoryNodesRequest) (*persistence.AppendHistoryNodesResponse, error)
- func (r *TaskQueueRecorder) AppendRawHistoryNodes(ctx context.Context, request *persistence.AppendRawHistoryNodesRequest) (*persistence.AppendHistoryNodesResponse, error)
- func (r *TaskQueueRecorder) Close()
- func (r *TaskQueueRecorder) CompleteHistoryTask(ctx context.Context, request *persistence.CompleteHistoryTaskRequest) error
- func (r *TaskQueueRecorder) ConflictResolveWorkflowExecution(ctx context.Context, ...) (*persistence.ConflictResolveWorkflowExecutionResponse, error)
- func (r *TaskQueueRecorder) CountMatchingTasks(category tasks.Category, matcher TaskMatcher) int
- func (r *TaskQueueRecorder) CountTasksForNamespace(category tasks.Category, namespaceID string, matcher TaskMatcher) int
- func (r *TaskQueueRecorder) CountTasksForWorkflow(category tasks.Category, namespaceID string, workflowID string, runID string, ...) int
- func (r *TaskQueueRecorder) CreateWorkflowExecution(ctx context.Context, request *persistence.CreateWorkflowExecutionRequest) (*persistence.CreateWorkflowExecutionResponse, error)
- func (r *TaskQueueRecorder) DeleteCurrentWorkflowExecution(ctx context.Context, ...) error
- func (r *TaskQueueRecorder) DeleteHistoryBranch(ctx context.Context, request *persistence.DeleteHistoryBranchRequest) error
- func (r *TaskQueueRecorder) DeleteReplicationTaskFromDLQ(ctx context.Context, request *persistence.DeleteReplicationTaskFromDLQRequest) error
- func (r *TaskQueueRecorder) DeleteWorkflowExecution(ctx context.Context, request *persistence.DeleteWorkflowExecutionRequest) error
- func (r *TaskQueueRecorder) ForkHistoryBranch(ctx context.Context, request *persistence.ForkHistoryBranchRequest) (*persistence.ForkHistoryBranchResponse, error)
- func (r *TaskQueueRecorder) GetAllHistoryTreeBranches(ctx context.Context, request *persistence.GetAllHistoryTreeBranchesRequest) (*persistence.GetAllHistoryTreeBranchesResponse, error)
- func (r *TaskQueueRecorder) GetAllRecordedTasks() map[tasks.Category][]RecordedTask
- func (r *TaskQueueRecorder) GetAllTasks() map[tasks.Category][]tasks.Task
- func (r *TaskQueueRecorder) GetCurrentExecution(ctx context.Context, request *persistence.GetCurrentExecutionRequest) (*persistence.GetCurrentExecutionResponse, error)
- func (r *TaskQueueRecorder) GetHistoryBranchUtil() persistence.HistoryBranchUtil
- func (r *TaskQueueRecorder) GetHistoryTasks(ctx context.Context, request *persistence.GetHistoryTasksRequest) (*persistence.GetHistoryTasksResponse, error)
- func (r *TaskQueueRecorder) GetName() string
- func (r *TaskQueueRecorder) GetRecordedTasksByCategoryFiltered(category tasks.Category, filter TaskFilter) []RecordedTask
- func (r *TaskQueueRecorder) GetReplicationTasksFromDLQ(ctx context.Context, request *persistence.GetReplicationTasksFromDLQRequest) (*persistence.GetHistoryTasksResponse, error)
- func (r *TaskQueueRecorder) GetWorkflowExecution(ctx context.Context, request *persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error)
- func (r *TaskQueueRecorder) IsReplicationDLQEmpty(ctx context.Context, request *persistence.GetReplicationTasksFromDLQRequest) (bool, error)
- func (r *TaskQueueRecorder) ListConcreteExecutions(ctx context.Context, request *persistence.ListConcreteExecutionsRequest) (*persistence.ListConcreteExecutionsResponse, error)
- func (r *TaskQueueRecorder) MatchTasks(category tasks.Category, matcher TaskMatcher) []RecordedTask
- func (r *TaskQueueRecorder) MatchTasksForNamespace(category tasks.Category, namespaceID string, matcher TaskMatcher) []RecordedTask
- func (r *TaskQueueRecorder) MatchTasksForWorkflow(category tasks.Category, namespaceID string, workflowID string, runID string, ...) []RecordedTask
- func (r *TaskQueueRecorder) PutReplicationTaskToDLQ(ctx context.Context, request *persistence.PutReplicationTaskToDLQRequest) error
- func (r *TaskQueueRecorder) RangeCompleteHistoryTasks(ctx context.Context, request *persistence.RangeCompleteHistoryTasksRequest) error
- func (r *TaskQueueRecorder) RangeDeleteReplicationTaskFromDLQ(ctx context.Context, ...) error
- func (r *TaskQueueRecorder) ReadHistoryBranch(ctx context.Context, request *persistence.ReadHistoryBranchRequest) (*persistence.ReadHistoryBranchResponse, error)
- func (r *TaskQueueRecorder) ReadHistoryBranchByBatch(ctx context.Context, request *persistence.ReadHistoryBranchRequest) (*persistence.ReadHistoryBranchByBatchResponse, error)
- func (r *TaskQueueRecorder) ReadHistoryBranchReverse(ctx context.Context, request *persistence.ReadHistoryBranchReverseRequest) (*persistence.ReadHistoryBranchReverseResponse, error)
- func (r *TaskQueueRecorder) ReadRawHistoryBranch(ctx context.Context, request *persistence.ReadHistoryBranchRequest) (*persistence.ReadRawHistoryBranchResponse, error)
- func (r *TaskQueueRecorder) SetWorkflowExecution(ctx context.Context, request *persistence.SetWorkflowExecutionRequest) (*persistence.SetWorkflowExecutionResponse, error)
- func (r *TaskQueueRecorder) TrimHistoryBranch(ctx context.Context, request *persistence.TrimHistoryBranchRequest) (*persistence.TrimHistoryBranchResponse, error)
- func (r *TaskQueueRecorder) UpdateWorkflowExecution(ctx context.Context, request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error)
- func (r *TaskQueueRecorder) WriteToLog(filePath string) error
- type TemporalImpl
- func (c *TemporalImpl) AdminClient() adminservice.AdminServiceClient
- func (c *TemporalImpl) Authorize(ctx context.Context, caller *authorization.Claims, ...) (authorization.Result, error)
- func (c *TemporalImpl) CaptureMetricsHandler() *metricstest.CaptureHandler (deprecated)
- func (c *TemporalImpl) ChasmEngine() (chasm.Engine, error)
- func (c *TemporalImpl) ChasmVisibilityManager() chasm.VisibilityManager
- func (c *TemporalImpl) DcClient() *dynamicconfig.MemoryClient
- func (c *TemporalImpl) FrontendClient() workflowservice.WorkflowServiceClient
- func (c *TemporalImpl) FrontendGRPCAddress() string
- func (c *TemporalImpl) FrontendHTTPAddress() string
- func (c *TemporalImpl) GetCHASMRegistry() *chasm.Registry
- func (c *TemporalImpl) GetClaims(authInfo *authorization.AuthInfo) (*authorization.Claims, error)
- func (c *TemporalImpl) GetExecutionManager() persistence.ExecutionManager
- func (c *TemporalImpl) GetGrpcClientInterceptor() *grpcinject.Interceptor
- func (c *TemporalImpl) GetMetricsHandler() metrics.Handler
- func (c *TemporalImpl) GetTLSConfigProvider() encryption.TLSConfigProvider
- func (c *TemporalImpl) GetTaskCategoryRegistry() tasks.TaskCategoryRegistry
- func (c *TemporalImpl) GetTaskQueueRecorder() *TaskQueueRecorder
- func (c *TemporalImpl) HistoryClient() historyservice.HistoryServiceClient
- func (c *TemporalImpl) MatchingClient() matchingservice.MatchingServiceClient
- func (c *TemporalImpl) NamespaceRegistries() []namespace.Registry
- func (c *TemporalImpl) OperatorClient() operatorservice.OperatorServiceClient
- func (c *TemporalImpl) RemoteFrontendGRPCAddress() string
- func (c *TemporalImpl) SchedulerClient() schedulerpb.SchedulerServiceClient
- func (c *TemporalImpl) SetOnAuthorize(...)
- func (c *TemporalImpl) SetOnGetClaims(fn func(*authorization.AuthInfo) (*authorization.Claims, error))
- func (c *TemporalImpl) SetTaskQueueRecorder(recorder *TaskQueueRecorder)
- func (c *TemporalImpl) Start() error
- func (c *TemporalImpl) Stop() error
- func (c *TemporalImpl) TlsConfigProvider() *encryption.FixedTLSConfigProvider
- func (c *TemporalImpl) WorkerGRPCAddress() string
- type TemporalParams
- type TestCluster
- func (tc *TestCluster) AdminClient() adminservice.AdminServiceClient
- func (tc *TestCluster) ArchiverBase() *ArchiverBase
- func (tc *TestCluster) ClusterName() string
- func (tc *TestCluster) ExecutionManager() persistence.ExecutionManager
- func (tc *TestCluster) FrontendClient() workflowservice.WorkflowServiceClient
- func (tc *TestCluster) GetReplicationStreamRecorder() *ReplicationStreamRecorder
- func (tc *TestCluster) GetTaskQueueRecorder() *TaskQueueRecorder
- func (tc *TestCluster) HistoryClient() historyservice.HistoryServiceClient
- func (tc *TestCluster) Host() *TemporalImpl
- func (tc *TestCluster) MatchingClient() matchingservice.MatchingServiceClient
- func (tc *TestCluster) OperatorClient() operatorservice.OperatorServiceClient
- func (tc *TestCluster) OverrideDynamicConfig(t *testing.T, key dynamicconfig.GenericSetting, value any) (cleanup func())
- func (tc *TestCluster) SchedulerClient() schedulerpb.SchedulerServiceClient
- func (tc *TestCluster) TearDownCluster() error
- func (tc *TestCluster) TestBase() *persistencetests.TestBase
- func (tc *TestCluster) WorkerGRPCAddress() string
- type TestClusterConfig
- type TestClusterFactory
- type TestClusterOption
- func WithArchivalEnabled() TestClusterOption
- func WithCustomHistoryArchiverFactory(factory provider.CustomHistoryArchiverFactory) TestClusterOption
- func WithCustomVisibilityArchiverFactory(factory provider.CustomVisibilityArchiverFactory) TestClusterOption
- func WithDynamicConfigOverrides(overrides map[dynamicconfig.Key]any) TestClusterOption
- func WithFaultInjectionConfig(cfg *config.FaultInjection) TestClusterOption
- func WithFxOptionsForService(serviceName primitives.ServiceName, options ...fx.Option) TestClusterOption
- func WithMTLS() TestClusterOption
- func WithNumHistoryShards(n int32) TestClusterOption
- func WithSharedCluster() TestClusterOption
- type TestClusterParams
- type TestDataConverter
- func (tdc *TestDataConverter) FromPayload(payload *commonpb.Payload, valuePtr any) error
- func (tdc *TestDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...any) error
- func (tdc *TestDataConverter) ToPayload(value any) (*commonpb.Payload, error)
- func (tdc *TestDataConverter) ToPayloads(values ...any) (*commonpb.Payloads, error)
- func (tdc *TestDataConverter) ToString(payload *commonpb.Payload) string
- func (tdc *TestDataConverter) ToStrings(payloads *commonpb.Payloads) []string
- type TestEnv
- func (e *TestEnv) Context() context.Context
- func (e *TestEnv) Error(err error, msgAndArgs ...any)
- func (e *TestEnv) InjectHook(hook testhooks.Hook) (cleanup func())
- func (e *TestEnv) Namespace() namespace.Name
- func (e *TestEnv) NamespaceID() namespace.ID
- func (e *TestEnv) NoError(err error, msgAndArgs ...any)
- func (e *TestEnv) OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())
- func (e *TestEnv) Run(name string, subtest func()) bool
- func (e *TestEnv) SdkClient() sdkclient.Client
- func (e *TestEnv) SdkWorker() sdkworker.Worker
- func (e *TestEnv) StartGlobalMetricCapture() *GlobalMetricCapture
- func (e *TestEnv) StartNamespaceMetricCapture() *NamespaceMetricCapture
- func (e *TestEnv) T() *testing.T
- func (e *TestEnv) TaskPoller() *taskpoller.TaskPoller
- func (e *TestEnv) Tv() *testvars.TestVars
- func (e *TestEnv) WorkerTaskQueue() string
- type TestOption
- type WorkerConfig
- type WorkflowTaskHandler
Constants ¶
const (
	DefaultPageSize   = 5
	PprofTestPort     = 7000
	TlsCertCommonName = "my-common-name"
	ClientSuiteLimit  = 10
	// TODO (alex): replace all sleeps with WaitForESToSettle with s.Eventually()
	WaitForESToSettle = 4 * time.Second // wait for ES shards to settle so that data is consistent
)
const ( DirectionSend = "send" DirectionRecv = "recv" DirectionServerSend = "server_send" DirectionServerRecv = "server_recv" )
Message direction constants
const NamespaceCacheRefreshInterval = time.Second
Variables ¶
var ( ErrEncodingIsNotSet = errors.New("payload encoding metadata is not set") ErrEncodingIsNotSupported = errors.New("payload encoding is not supported") )
var (
ErrNoTasks = errors.New("no tasks")
)
Functions ¶
func EventBatchesToVersionHistory ¶
func EventBatchesToVersionHistory( versionHistory *historyspb.VersionHistory, eventBatches []*historypb.History, ) (*historyspb.VersionHistory, error)
func ExtractReplicationMessages ¶ added in v1.30.0
func ExtractReplicationMessages(msg proto.Message) *replicationspb.WorkflowReplicationMessages
ExtractReplicationMessages extracts WorkflowReplicationMessages from a proto message. This is a helper for tests that need to inspect replication message contents.
func GetPersistenceTestDefaults ¶ added in v1.31.0
func GetPersistenceTestDefaults() persistencetests.TestBaseOptions
GetPersistenceTestDefaults returns the default persistence options based on CLI flags. Use this when creating TestClusterConfig to ensure proper database configuration.
func MustToPayload ¶ added in v1.31.0
MustToPayload converts a value to a Payload using the default data converter.
func NewContext ¶
NewContext creates a context with default 90-second timeout and RPC headers.
NOTE: If you're using testcore.NewEnv, you can use env.Context() directly - it already includes RPC headers. This function is primarily for legacy tests or creating standalone contexts outside of the TestEnv framework.
If a parent context is provided, the returned context will be canceled when either the timeout expires OR the parent is canceled.
func NewTestDataConverter ¶
func NewTestDataConverter() converter.DataConverter
TODO (alex): use it by default in SdkClient everywhere?
func RandomizeStr ¶
func RandomizedNexusEndpoint ¶
func UseCassandraPersistence ¶ added in v1.29.0
func UseCassandraPersistence() bool
func UseSQLVisibility ¶ added in v1.27.0
func UseSQLVisibility() bool
func WithDropTask ¶
func WithDropTask(o *PollAndProcessWorkflowTaskOptions)
func WithDumpHistory ¶
func WithDumpHistory(o *PollAndProcessWorkflowTaskOptions)
func WithForceNewWorkflowTask ¶
func WithForceNewWorkflowTask(o *PollAndProcessWorkflowTaskOptions)
func WithNoDumpCommands ¶
func WithNoDumpCommands(o *PollAndProcessWorkflowTaskOptions)
func WithPollSticky ¶
func WithPollSticky(o *PollAndProcessWorkflowTaskOptions)
func WithRespondSticky ¶
func WithRespondSticky(o *PollAndProcessWorkflowTaskOptions)
func WithoutRetries ¶
func WithoutRetries(o *PollAndProcessWorkflowTaskOptions)
Types ¶
type ActivityTaskHandler ¶
type ActivityTaskHandler func(task *workflowservice.PollActivityTaskQueueResponse) (*commonpb.Payloads, bool, error)
type ArchiverBase ¶
type ArchiverBase struct {
// contains filtered or unexported fields
}
ArchiverBase is a testcore struct for archiver provider being used in functional tests
func (*ArchiverBase) HistoryURI ¶
func (a *ArchiverBase) HistoryURI() string
func (*ArchiverBase) Metadata ¶
func (a *ArchiverBase) Metadata() archiver.ArchivalMetadata
func (*ArchiverBase) Provider ¶
func (a *ArchiverBase) Provider() provider.ArchiverProvider
func (*ArchiverBase) VisibilityURI ¶
func (a *ArchiverBase) VisibilityURI() string
type CapturedReplicationMessage ¶ added in v1.30.0
type CapturedReplicationMessage struct {
Timestamp string `json:"timestamp"`
Method string `json:"method"`
Direction string `json:"direction"`
ClusterName string `json:"clusterName"`
TargetAddress string `json:"targetAddress"`
MessageType string `json:"messageType"`
IsStreamCall bool `json:"isStreamCall"`
Request proto.Message `json:"-"` // Don't marshal directly
Response proto.Message `json:"-"` // Don't marshal directly
Message json.RawMessage `json:"message,omitempty"`
}
CapturedReplicationMessage represents a captured replication message
type Env ¶ added in v1.31.0
type Env interface {
// T returns the *testing.T. Deprecated: use the suite's T() method instead.
T() *testing.T
Namespace() namespace.Name
NamespaceID() namespace.ID
FrontendClient() workflowservice.WorkflowServiceClient
AdminClient() adminservice.AdminServiceClient
GetTestCluster() *TestCluster
CloseShard(namespaceID string, workflowID string)
OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())
Context() context.Context
InjectHook(hook testhooks.Hook) (cleanup func())
}
type FrontendConfig ¶
type FrontendConfig struct {
NumFrontendHosts int
}
FrontendConfig is the config for the frontend service
type FunctionalTestBase ¶
type FunctionalTestBase struct {
suite.Suite
// `suite.Suite` embeds `*assert.Assertions` which, by default, makes all asserts (like `s.NoError(err)`)
// only log the error, continue test execution, and only then fail the test.
// This is not desired behavior in most cases. The idiomatic way to change this behavior
// is to replace `*assert.Assertions` with `*require.Assertions` by embedding it in every test suite
// (or base struct of every test suite).
*require.Assertions
protorequire.ProtoAssertions
historyrequire.HistoryRequire
updateutils.UpdateUtils
Logger log.Logger
// contains filtered or unexported fields
}
func (*FunctionalTestBase) AdminClient ¶
func (s *FunctionalTestBase) AdminClient() adminservice.AdminServiceClient
func (*FunctionalTestBase) CloseShard ¶ added in v1.31.0
func (s *FunctionalTestBase) CloseShard(namespaceID string, workflowID string)
CloseShard closes the shard that contains the given workflow. This is a cluster-global operation and cannot be called on shared clusters.
func (*FunctionalTestBase) Context ¶ added in v1.31.0
func (s *FunctionalTestBase) Context() context.Context
Context returns a context with RPC headers for use in this test.
func (*FunctionalTestBase) DecodePayloadsByteSliceInt32 ¶
func (s *FunctionalTestBase) DecodePayloadsByteSliceInt32(ps *commonpb.Payloads) (r int32)
func (*FunctionalTestBase) DecodePayloadsInt ¶
func (s *FunctionalTestBase) DecodePayloadsInt(ps *commonpb.Payloads) int
func (*FunctionalTestBase) DecodePayloadsString ¶
func (s *FunctionalTestBase) DecodePayloadsString(ps *commonpb.Payloads) string
func (*FunctionalTestBase) DurationNear ¶
func (s *FunctionalTestBase) DurationNear(value, target, tolerance time.Duration)
func (*FunctionalTestBase) ExternalNamespace ¶ added in v1.29.0
func (s *FunctionalTestBase) ExternalNamespace() namespace.Name
func (*FunctionalTestBase) FrontendClient ¶
func (s *FunctionalTestBase) FrontendClient() workflowservice.WorkflowServiceClient
func (*FunctionalTestBase) FrontendGRPCAddress ¶
func (s *FunctionalTestBase) FrontendGRPCAddress() string
func (*FunctionalTestBase) GetHistory ¶
func (s *FunctionalTestBase) GetHistory(namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent
func (*FunctionalTestBase) GetHistoryFunc ¶
func (s *FunctionalTestBase) GetHistoryFunc(namespace string, execution *commonpb.WorkflowExecution) func() []*historypb.HistoryEvent
func (*FunctionalTestBase) GetNamespaceID ¶
func (s *FunctionalTestBase) GetNamespaceID(namespace string) string
func (*FunctionalTestBase) GetTestCluster ¶
func (s *FunctionalTestBase) GetTestCluster() *TestCluster
func (*FunctionalTestBase) GetTestClusterConfig ¶
func (s *FunctionalTestBase) GetTestClusterConfig() *TestClusterConfig
func (*FunctionalTestBase) HttpAPIAddress ¶
func (s *FunctionalTestBase) HttpAPIAddress() string
func (*FunctionalTestBase) InjectHook ¶ added in v1.27.0
func (s *FunctionalTestBase) InjectHook(hook testhooks.Hook) (cleanup func())
InjectHook sets a test hook inside the cluster.
func (*FunctionalTestBase) MarkNamespaceAsDeleted ¶ added in v1.27.0
func (s *FunctionalTestBase) MarkNamespaceAsDeleted( nsName namespace.Name, ) error
func (*FunctionalTestBase) Namespace ¶
func (s *FunctionalTestBase) Namespace() namespace.Name
func (*FunctionalTestBase) NamespaceID ¶ added in v1.27.0
func (s *FunctionalTestBase) NamespaceID() namespace.ID
func (*FunctionalTestBase) OperatorClient ¶
func (s *FunctionalTestBase) OperatorClient() operatorservice.OperatorServiceClient
func (*FunctionalTestBase) OverrideDynamicConfig ¶
func (s *FunctionalTestBase) OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())
func (*FunctionalTestBase) RegisterNamespace ¶ added in v1.27.0
func (s *FunctionalTestBase) RegisterNamespace( nsName namespace.Name, retentionDays int32, archivalState enumspb.ArchivalState, historyArchivalURI string, visibilityArchivalURI string, ) (namespace.ID, error)
Register namespace using persistence API because:
- The Retention period is set to 0 for archival tests, and this can't be done through FE,
- Update search attributes would require an extra API call,
- One more extra API call would be necessary to get namespace.ID.
func (*FunctionalTestBase) RunTestWithMatchingBehavior ¶
func (s *FunctionalTestBase) RunTestWithMatchingBehavior(subtest func())
func (*FunctionalTestBase) SdkClient ¶ added in v1.28.0
func (s *FunctionalTestBase) SdkClient() sdkclient.Client
func (*FunctionalTestBase) SdkWorker ¶ added in v1.31.0
func (s *FunctionalTestBase) SdkWorker() sdkworker.Worker
func (*FunctionalTestBase) SendSignal ¶ added in v1.27.0
func (s *FunctionalTestBase) SendSignal(nsName string, execution *commonpb.WorkflowExecution, signalName string, input *commonpb.Payloads, identity string) error
TODO (alex): change to nsName namespace.Name
func (*FunctionalTestBase) SendToChannel ¶ added in v1.31.0
func (s *FunctionalTestBase) SendToChannel(ctx context.Context, ch chan struct{})
func (*FunctionalTestBase) SetupSubTest ¶ added in v1.27.0
func (s *FunctionalTestBase) SetupSubTest()
func (*FunctionalTestBase) SetupSuite ¶
func (s *FunctionalTestBase) SetupSuite()
func (*FunctionalTestBase) SetupSuiteWithCluster ¶ added in v1.27.0
func (s *FunctionalTestBase) SetupSuiteWithCluster(options ...TestClusterOption)
func (*FunctionalTestBase) SetupTest ¶
func (s *FunctionalTestBase) SetupTest()
All test suites that embed FunctionalTestBase and override SetupTest must call testcore's FunctionalTestBase.SetupTest so that tests are distributed into partitions. Otherwise, the test suite will be executed multiple times, once in each partition.
func (*FunctionalTestBase) TaskPoller ¶ added in v1.28.0
func (s *FunctionalTestBase) TaskPoller() *taskpoller.TaskPoller
func (*FunctionalTestBase) TaskQueue ¶ added in v1.28.0
func (s *FunctionalTestBase) TaskQueue() string
func (*FunctionalTestBase) TearDownCluster ¶ added in v1.27.0
func (s *FunctionalTestBase) TearDownCluster()
func (*FunctionalTestBase) TearDownSubTest ¶ added in v1.28.0
func (s *FunctionalTestBase) TearDownSubTest()
**IMPORTANT**: When overriding this, make sure to invoke `s.FunctionalTestBase.TearDownSubTest()`.
func (*FunctionalTestBase) TearDownSuite ¶
func (s *FunctionalTestBase) TearDownSuite()
func (*FunctionalTestBase) TearDownTest ¶ added in v1.28.0
func (s *FunctionalTestBase) TearDownTest()
**IMPORTANT**: When overriding this, make sure to invoke `s.FunctionalTestBase.TearDownTest()`.
func (*FunctionalTestBase) WaitForChannel ¶ added in v1.27.0
func (s *FunctionalTestBase) WaitForChannel(ctx context.Context, ch chan struct{})
func (*FunctionalTestBase) WorkerGRPCAddress ¶ added in v1.31.0
func (s *FunctionalTestBase) WorkerGRPCAddress() string
type GlobalMetricCapture ¶ added in v1.31.0
type GlobalMetricCapture struct {
// contains filtered or unexported fields
}
func (*GlobalMetricCapture) CollectMetric ¶ added in v1.31.0
func (c *GlobalMetricCapture) CollectMetric(name string, keep func(*metricstest.CapturedRecording) bool) []*metricstest.CapturedRecording
CollectMetric returns the recordings for the named metric that the caller chooses to keep.
func (*GlobalMetricCapture) Metric ¶ added in v1.31.0
func (c *GlobalMetricCapture) Metric(name string) []*metricstest.CapturedRecording
type HistoryConfig ¶
HistoryConfig contains configs for history service
type MatchingBehavior ¶ added in v1.31.0
MatchingBehavior describes a test scenario for matching service behavior.
func AllMatchingBehaviors ¶ added in v1.31.0
func AllMatchingBehaviors() []MatchingBehavior
AllMatchingBehaviors returns all 8 combinations of matching behaviors for testing.
func (MatchingBehavior) InjectHooks ¶ added in v1.31.0
func (b MatchingBehavior) InjectHooks(env hookInjector)
InjectHooks injects the test hooks for this matching behavior.
func (MatchingBehavior) Name ¶ added in v1.31.0
func (b MatchingBehavior) Name() string
Name returns a descriptive name for this behavior combination.
func (MatchingBehavior) Options ¶ added in v1.31.0
func (b MatchingBehavior) Options() []TestOption
Options returns the TestOptions to configure matching behavior.
type MatchingConfig ¶
type MatchingConfig struct {
NumMatchingHosts int
}
MatchingConfig is the config for the matching service
type MessageHandler ¶
type MessageHandler func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error)
type NamespaceMetricCapture ¶ added in v1.31.0
type NamespaceMetricCapture struct {
// contains filtered or unexported fields
}
func (*NamespaceMetricCapture) CollectMetric ¶ added in v1.31.0
func (c *NamespaceMetricCapture) CollectMetric(name string, keep func(*metricstest.CapturedRecording) bool) []*metricstest.CapturedRecording
CollectMetric returns the recordings for the named metric that belong to the test namespace and that the caller chooses to keep. It panics if the requested metric is not namespace-scoped.
func (*NamespaceMetricCapture) Metric ¶ added in v1.31.0
func (c *NamespaceMetricCapture) Metric(name string) []*metricstest.CapturedRecording
type PersistenceTestBaseFactory ¶
type PersistenceTestBaseFactory interface {
NewTestBase(options *persistencetests.TestBaseOptions) *persistencetests.TestBase
}
type PollAndProcessWorkflowTaskOptionFunc ¶
type PollAndProcessWorkflowTaskOptionFunc func(*PollAndProcessWorkflowTaskOptions)
func WithExpectedAttemptCount ¶
func WithExpectedAttemptCount(c int) PollAndProcessWorkflowTaskOptionFunc
func WithRetries ¶
func WithRetries(c int) PollAndProcessWorkflowTaskOptionFunc
type PollAndProcessWorkflowTaskResponse ¶
type PollAndProcessWorkflowTaskResponse struct {
IsQueryTask bool
NewTask *workflowservice.RespondWorkflowTaskCompletedResponse
}
type QueryHandler ¶
type QueryHandler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*commonpb.Payloads, error)
type RecordedTask ¶ added in v1.30.0
type RecordedTask struct {
Timestamp time.Time `json:"timestamp"`
TaskType string `json:"taskType"` // The specific task type (e.g., "TASK_TYPE_ACTIVITY_RETRY_TIMER")
ShardID int32 `json:"shardId"`
RangeID int64 `json:"rangeId,omitempty"`
NamespaceID string `json:"namespaceId"`
WorkflowID string `json:"workflowId"`
RunID string `json:"runId"`
Task tasks.Task `json:"task"` // The actual task object
}
RecordedTask wraps a task with metadata about when and where it was written
type ReplicationStreamRecorder ¶ added in v1.30.0
type ReplicationStreamRecorder struct {
// contains filtered or unexported fields
}
ReplicationStreamRecorder captures replication stream messages for testing
func NewReplicationStreamRecorder ¶ added in v1.30.0
func NewReplicationStreamRecorder() *ReplicationStreamRecorder
func (*ReplicationStreamRecorder) Clear ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) Clear()
func (*ReplicationStreamRecorder) GetMessages ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) GetMessages() []CapturedReplicationMessage
func (*ReplicationStreamRecorder) SetOutputFile ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) SetOutputFile(filePath string)
SetOutputFile sets the file path for writing captured messages on-demand
func (*ReplicationStreamRecorder) StreamInterceptor ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) StreamInterceptor(clusterName string) grpc.StreamClientInterceptor
StreamInterceptor returns a gRPC stream client interceptor that captures stream messages
func (*ReplicationStreamRecorder) StreamServerInterceptor ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) StreamServerInterceptor(clusterName string) grpc.StreamServerInterceptor
StreamServerInterceptor returns a gRPC stream server interceptor that captures stream messages
func (*ReplicationStreamRecorder) UnaryInterceptor ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) UnaryInterceptor(clusterName string) grpc.UnaryClientInterceptor
UnaryInterceptor returns a gRPC unary client interceptor that captures messages
func (*ReplicationStreamRecorder) UnaryServerInterceptor ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) UnaryServerInterceptor(clusterName string) grpc.UnaryServerInterceptor
UnaryServerInterceptor returns a gRPC unary server interceptor that captures messages
func (*ReplicationStreamRecorder) WriteToLog ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) WriteToLog() error
WriteToLog writes all captured messages to the configured output file
type TaskFilter ¶ added in v1.30.0
type TaskFilter struct {
NamespaceID string // Required: namespace ID to filter by
WorkflowID string // Optional: workflow ID to filter by (empty string means no filter)
RunID string // Optional: run ID to filter by (empty string means no filter)
}
TaskFilter specifies criteria for filtering recorded tasks
type TaskMatcher ¶ added in v1.30.0
type TaskMatcher func(RecordedTask) bool
TaskMatcher is a function that tests whether a RecordedTask matches some criteria
type TaskPoller
deprecated
type TaskPoller struct {
Client workflowservice.WorkflowServiceClient
Namespace string
TaskQueue *taskqueuepb.TaskQueue
StickyTaskQueue *taskqueuepb.TaskQueue
StickyScheduleToStartTimeout time.Duration
Identity string
WorkflowTaskHandler WorkflowTaskHandler
ActivityTaskHandler ActivityTaskHandler
QueryHandler QueryHandler
MessageHandler MessageHandler
Logger log.Logger
T *testing.T
}
Deprecated: TaskPoller is deprecated. Use taskpoller.TaskPoller instead. TaskPoller is used in functional tests to poll workflow or activity task queues.
func (*TaskPoller) HandlePartialWorkflowTask ¶
func (p *TaskPoller) HandlePartialWorkflowTask(response *workflowservice.PollWorkflowTaskQueueResponse, forceCreateNewWorkflowTask bool) (*workflowservice.RespondWorkflowTaskCompletedResponse, error)
HandlePartialWorkflowTask for workflow task
func (*TaskPoller) PollAndProcessActivityTask ¶
func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error
PollAndProcessActivityTask for activity tasks
func (*TaskPoller) PollAndProcessActivityTaskWithID ¶
func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error
PollAndProcessActivityTaskWithID is similar to PollAndProcessActivityTask, but it responds using RespondActivityTask...ByID.
func (*TaskPoller) PollAndProcessWorkflowTask ¶
func (p *TaskPoller) PollAndProcessWorkflowTask(funcs ...PollAndProcessWorkflowTaskOptionFunc) (res PollAndProcessWorkflowTaskResponse, err error)
func (*TaskPoller) PollAndProcessWorkflowTaskWithOptions ¶
func (p *TaskPoller) PollAndProcessWorkflowTaskWithOptions(opts *PollAndProcessWorkflowTaskOptions) (res PollAndProcessWorkflowTaskResponse, err error)
type TaskQueueRecorder ¶ added in v1.30.0
type TaskQueueRecorder struct {
// contains filtered or unexported fields
}
TaskQueueRecorder wraps an ExecutionManager to record ALL task writes to the history task queues (transfer, timer, replication, visibility, archival, etc.). This is useful for integration tests where you want to assert on what tasks were generated and in what order. Tasks are stored flattened by category - all tasks of the same type are in a single list, with each task wrapped with metadata about when/where it was written.
func NewTaskQueueRecorder ¶ added in v1.30.0
func NewTaskQueueRecorder(delegate persistence.ExecutionManager, logger log.Logger) *TaskQueueRecorder
NewTaskQueueRecorder creates a recorder that wraps the given ExecutionManager
func (*TaskQueueRecorder) AddHistoryTasks ¶ added in v1.30.0
func (r *TaskQueueRecorder) AddHistoryTasks( ctx context.Context, request *persistence.AddHistoryTasksRequest, ) error
AddHistoryTasks records the task write and then delegates to the underlying manager
func (*TaskQueueRecorder) AppendHistoryNodes ¶ added in v1.30.0
func (r *TaskQueueRecorder) AppendHistoryNodes( ctx context.Context, request *persistence.AppendHistoryNodesRequest, ) (*persistence.AppendHistoryNodesResponse, error)
func (*TaskQueueRecorder) AppendRawHistoryNodes ¶ added in v1.30.0
func (r *TaskQueueRecorder) AppendRawHistoryNodes( ctx context.Context, request *persistence.AppendRawHistoryNodesRequest, ) (*persistence.AppendHistoryNodesResponse, error)
func (*TaskQueueRecorder) Close ¶ added in v1.30.0
func (r *TaskQueueRecorder) Close()
func (*TaskQueueRecorder) CompleteHistoryTask ¶ added in v1.30.0
func (r *TaskQueueRecorder) CompleteHistoryTask( ctx context.Context, request *persistence.CompleteHistoryTaskRequest, ) error
func (*TaskQueueRecorder) ConflictResolveWorkflowExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) ConflictResolveWorkflowExecution( ctx context.Context, request *persistence.ConflictResolveWorkflowExecutionRequest, ) (*persistence.ConflictResolveWorkflowExecutionResponse, error)
func (*TaskQueueRecorder) CountMatchingTasks ¶ added in v1.30.0
func (r *TaskQueueRecorder) CountMatchingTasks(category tasks.Category, matcher TaskMatcher) int
CountMatchingTasks returns the count of tasks in a category that match the given matcher
func (*TaskQueueRecorder) CountTasksForNamespace ¶ added in v1.30.0
func (r *TaskQueueRecorder) CountTasksForNamespace( category tasks.Category, namespaceID string, matcher TaskMatcher, ) int
CountTasksForNamespace returns the count of tasks in a category for a specific namespace
func (*TaskQueueRecorder) CountTasksForWorkflow ¶ added in v1.30.0
func (r *TaskQueueRecorder) CountTasksForWorkflow( category tasks.Category, namespaceID string, workflowID string, runID string, matcher TaskMatcher, ) int
CountTasksForWorkflow returns the count of tasks in a category for a specific workflow. If namespaceID is empty, it matches any namespace. If runID is empty, it matches any runID for the given workflowID.
func (*TaskQueueRecorder) CreateWorkflowExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) CreateWorkflowExecution( ctx context.Context, request *persistence.CreateWorkflowExecutionRequest, ) (*persistence.CreateWorkflowExecutionResponse, error)
func (*TaskQueueRecorder) DeleteCurrentWorkflowExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) DeleteCurrentWorkflowExecution( ctx context.Context, request *persistence.DeleteCurrentWorkflowExecutionRequest, ) error
func (*TaskQueueRecorder) DeleteHistoryBranch ¶ added in v1.30.0
func (r *TaskQueueRecorder) DeleteHistoryBranch( ctx context.Context, request *persistence.DeleteHistoryBranchRequest, ) error
func (*TaskQueueRecorder) DeleteReplicationTaskFromDLQ ¶ added in v1.30.0
func (r *TaskQueueRecorder) DeleteReplicationTaskFromDLQ( ctx context.Context, request *persistence.DeleteReplicationTaskFromDLQRequest, ) error
func (*TaskQueueRecorder) DeleteWorkflowExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) DeleteWorkflowExecution( ctx context.Context, request *persistence.DeleteWorkflowExecutionRequest, ) error
func (*TaskQueueRecorder) ForkHistoryBranch ¶ added in v1.30.0
func (r *TaskQueueRecorder) ForkHistoryBranch( ctx context.Context, request *persistence.ForkHistoryBranchRequest, ) (*persistence.ForkHistoryBranchResponse, error)
func (*TaskQueueRecorder) GetAllHistoryTreeBranches ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetAllHistoryTreeBranches( ctx context.Context, request *persistence.GetAllHistoryTreeBranchesRequest, ) (*persistence.GetAllHistoryTreeBranchesResponse, error)
func (*TaskQueueRecorder) GetAllRecordedTasks ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetAllRecordedTasks() map[tasks.Category][]RecordedTask
GetAllRecordedTasks returns all recorded tasks WITH metadata, grouped by category
func (*TaskQueueRecorder) GetAllTasks ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetAllTasks() map[tasks.Category][]tasks.Task
GetAllTasks returns all tasks grouped by category (unwrapped, without metadata)
func (*TaskQueueRecorder) GetCurrentExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetCurrentExecution( ctx context.Context, request *persistence.GetCurrentExecutionRequest, ) (*persistence.GetCurrentExecutionResponse, error)
func (*TaskQueueRecorder) GetHistoryBranchUtil ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetHistoryBranchUtil() persistence.HistoryBranchUtil
func (*TaskQueueRecorder) GetHistoryTasks ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetHistoryTasks( ctx context.Context, request *persistence.GetHistoryTasksRequest, ) (*persistence.GetHistoryTasksResponse, error)
func (*TaskQueueRecorder) GetName ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetName() string
func (*TaskQueueRecorder) GetRecordedTasksByCategoryFiltered ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetRecordedTasksByCategoryFiltered(category tasks.Category, filter TaskFilter) []RecordedTask
GetRecordedTasksByCategoryFiltered returns recorded tasks WITH metadata for a specific category, filtered by namespace (required) and optionally by workflow ID and run ID. This is the preferred API for tests to ensure tasks are properly scoped.
func (*TaskQueueRecorder) GetReplicationTasksFromDLQ ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetReplicationTasksFromDLQ( ctx context.Context, request *persistence.GetReplicationTasksFromDLQRequest, ) (*persistence.GetHistoryTasksResponse, error)
func (*TaskQueueRecorder) GetWorkflowExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetWorkflowExecution( ctx context.Context, request *persistence.GetWorkflowExecutionRequest, ) (*persistence.GetWorkflowExecutionResponse, error)
func (*TaskQueueRecorder) IsReplicationDLQEmpty ¶ added in v1.30.0
func (r *TaskQueueRecorder) IsReplicationDLQEmpty( ctx context.Context, request *persistence.GetReplicationTasksFromDLQRequest, ) (bool, error)
func (*TaskQueueRecorder) ListConcreteExecutions ¶ added in v1.30.0
func (r *TaskQueueRecorder) ListConcreteExecutions( ctx context.Context, request *persistence.ListConcreteExecutionsRequest, ) (*persistence.ListConcreteExecutionsResponse, error)
func (*TaskQueueRecorder) MatchTasks ¶ added in v1.30.0
func (r *TaskQueueRecorder) MatchTasks(category tasks.Category, matcher TaskMatcher) []RecordedTask
MatchTasks returns all tasks in a category that match the given matcher function
func (*TaskQueueRecorder) MatchTasksForNamespace ¶ added in v1.30.0
func (r *TaskQueueRecorder) MatchTasksForNamespace( category tasks.Category, namespaceID string, matcher TaskMatcher, ) []RecordedTask
MatchTasksForNamespace returns all tasks in a category for a specific namespace
func (*TaskQueueRecorder) MatchTasksForWorkflow ¶ added in v1.30.0
func (r *TaskQueueRecorder) MatchTasksForWorkflow( category tasks.Category, namespaceID string, workflowID string, runID string, matcher TaskMatcher, ) []RecordedTask
MatchTasksForWorkflow returns all tasks in a category for a specific workflow. If namespaceID is empty, it matches any namespace. If runID is empty, it matches any runID for the given workflowID.
func (*TaskQueueRecorder) PutReplicationTaskToDLQ ¶ added in v1.30.0
func (r *TaskQueueRecorder) PutReplicationTaskToDLQ( ctx context.Context, request *persistence.PutReplicationTaskToDLQRequest, ) error
func (*TaskQueueRecorder) RangeCompleteHistoryTasks ¶ added in v1.30.0
func (r *TaskQueueRecorder) RangeCompleteHistoryTasks( ctx context.Context, request *persistence.RangeCompleteHistoryTasksRequest, ) error
func (*TaskQueueRecorder) RangeDeleteReplicationTaskFromDLQ ¶ added in v1.30.0
func (r *TaskQueueRecorder) RangeDeleteReplicationTaskFromDLQ( ctx context.Context, request *persistence.RangeDeleteReplicationTaskFromDLQRequest, ) error
func (*TaskQueueRecorder) ReadHistoryBranch ¶ added in v1.30.0
func (r *TaskQueueRecorder) ReadHistoryBranch( ctx context.Context, request *persistence.ReadHistoryBranchRequest, ) (*persistence.ReadHistoryBranchResponse, error)
func (*TaskQueueRecorder) ReadHistoryBranchByBatch ¶ added in v1.30.0
func (r *TaskQueueRecorder) ReadHistoryBranchByBatch( ctx context.Context, request *persistence.ReadHistoryBranchRequest, ) (*persistence.ReadHistoryBranchByBatchResponse, error)
func (*TaskQueueRecorder) ReadHistoryBranchReverse ¶ added in v1.30.0
func (r *TaskQueueRecorder) ReadHistoryBranchReverse( ctx context.Context, request *persistence.ReadHistoryBranchReverseRequest, ) (*persistence.ReadHistoryBranchReverseResponse, error)
func (*TaskQueueRecorder) ReadRawHistoryBranch ¶ added in v1.30.0
func (r *TaskQueueRecorder) ReadRawHistoryBranch( ctx context.Context, request *persistence.ReadHistoryBranchRequest, ) (*persistence.ReadRawHistoryBranchResponse, error)
func (*TaskQueueRecorder) SetWorkflowExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) SetWorkflowExecution( ctx context.Context, request *persistence.SetWorkflowExecutionRequest, ) (*persistence.SetWorkflowExecutionResponse, error)
func (*TaskQueueRecorder) TrimHistoryBranch ¶ added in v1.30.0
func (r *TaskQueueRecorder) TrimHistoryBranch( ctx context.Context, request *persistence.TrimHistoryBranchRequest, ) (*persistence.TrimHistoryBranchResponse, error)
func (*TaskQueueRecorder) UpdateWorkflowExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) UpdateWorkflowExecution( ctx context.Context, request *persistence.UpdateWorkflowExecutionRequest, ) (*persistence.UpdateWorkflowExecutionResponse, error)
func (*TaskQueueRecorder) WriteToLog ¶ added in v1.30.0
func (r *TaskQueueRecorder) WriteToLog(filePath string) error
WriteToLog writes all captured tasks to a file in JSON format
type TemporalImpl ¶
type TemporalImpl struct {
// contains filtered or unexported fields
}
func (*TemporalImpl) AdminClient ¶
func (c *TemporalImpl) AdminClient() adminservice.AdminServiceClient
func (*TemporalImpl) Authorize ¶
func (c *TemporalImpl) Authorize( ctx context.Context, caller *authorization.Claims, target *authorization.CallTarget, ) (authorization.Result, error)
func (*TemporalImpl) CaptureMetricsHandler
deprecated
func (c *TemporalImpl) CaptureMetricsHandler() *metricstest.CaptureHandler
Deprecated: metric capture is cluster-global. Use (*TestEnv).StartGlobalMetricCapture() or (*TestEnv).StartNamespaceMetricCapture() instead.
func (*TemporalImpl) ChasmEngine ¶ added in v1.29.0
func (c *TemporalImpl) ChasmEngine() (chasm.Engine, error)
func (*TemporalImpl) ChasmVisibilityManager ¶ added in v1.30.0
func (c *TemporalImpl) ChasmVisibilityManager() chasm.VisibilityManager
func (*TemporalImpl) DcClient ¶
func (c *TemporalImpl) DcClient() *dynamicconfig.MemoryClient
func (*TemporalImpl) FrontendClient ¶
func (c *TemporalImpl) FrontendClient() workflowservice.WorkflowServiceClient
func (*TemporalImpl) FrontendGRPCAddress ¶
func (c *TemporalImpl) FrontendGRPCAddress() string
func (*TemporalImpl) FrontendHTTPAddress ¶
func (c *TemporalImpl) FrontendHTTPAddress() string
func (*TemporalImpl) GetCHASMRegistry ¶ added in v1.29.0
func (c *TemporalImpl) GetCHASMRegistry() *chasm.Registry
func (*TemporalImpl) GetClaims ¶
func (c *TemporalImpl) GetClaims(authInfo *authorization.AuthInfo) (*authorization.Claims, error)
func (*TemporalImpl) GetExecutionManager ¶
func (c *TemporalImpl) GetExecutionManager() persistence.ExecutionManager
func (*TemporalImpl) GetGrpcClientInterceptor ¶ added in v1.29.0
func (c *TemporalImpl) GetGrpcClientInterceptor() *grpcinject.Interceptor
func (*TemporalImpl) GetMetricsHandler ¶
func (c *TemporalImpl) GetMetricsHandler() metrics.Handler
func (*TemporalImpl) GetTLSConfigProvider ¶
func (c *TemporalImpl) GetTLSConfigProvider() encryption.TLSConfigProvider
func (*TemporalImpl) GetTaskCategoryRegistry ¶
func (c *TemporalImpl) GetTaskCategoryRegistry() tasks.TaskCategoryRegistry
func (*TemporalImpl) GetTaskQueueRecorder ¶ added in v1.30.0
func (c *TemporalImpl) GetTaskQueueRecorder() *TaskQueueRecorder
func (*TemporalImpl) HistoryClient ¶
func (c *TemporalImpl) HistoryClient() historyservice.HistoryServiceClient
func (*TemporalImpl) MatchingClient ¶
func (c *TemporalImpl) MatchingClient() matchingservice.MatchingServiceClient
func (*TemporalImpl) NamespaceRegistries ¶ added in v1.27.0
func (c *TemporalImpl) NamespaceRegistries() []namespace.Registry
func (*TemporalImpl) OperatorClient ¶
func (c *TemporalImpl) OperatorClient() operatorservice.OperatorServiceClient
func (*TemporalImpl) RemoteFrontendGRPCAddress ¶
func (c *TemporalImpl) RemoteFrontendGRPCAddress() string
Use this to get an address for a remote cluster to connect to.
func (*TemporalImpl) SchedulerClient ¶ added in v1.31.0
func (c *TemporalImpl) SchedulerClient() schedulerpb.SchedulerServiceClient
func (*TemporalImpl) SetOnAuthorize ¶
func (c *TemporalImpl) SetOnAuthorize( fn func(context.Context, *authorization.Claims, *authorization.CallTarget) (authorization.Result, error), )
func (*TemporalImpl) SetOnGetClaims ¶
func (c *TemporalImpl) SetOnGetClaims(fn func(*authorization.AuthInfo) (*authorization.Claims, error))
func (*TemporalImpl) SetTaskQueueRecorder ¶ added in v1.30.0
func (c *TemporalImpl) SetTaskQueueRecorder(recorder *TaskQueueRecorder)
func (*TemporalImpl) Start ¶
func (c *TemporalImpl) Start() error
func (*TemporalImpl) Stop ¶
func (c *TemporalImpl) Stop() error
func (*TemporalImpl) TlsConfigProvider ¶
func (c *TemporalImpl) TlsConfigProvider() *encryption.FixedTLSConfigProvider
func (*TemporalImpl) WorkerGRPCAddress ¶ added in v1.31.0
func (c *TemporalImpl) WorkerGRPCAddress() string
type TemporalParams ¶
type TemporalParams struct {
ClusterMetadataConfig *cluster.Config
PersistenceConfig config.Persistence
MetadataMgr persistence.MetadataManager
ClusterMetadataManager persistence.ClusterMetadataManager
ShardMgr persistence.ShardManager
ExecutionManager persistence.ExecutionManager
TaskMgr persistence.TaskManager
NamespaceReplicationQueue persistence.NamespaceReplicationQueue
AbstractDataStoreFactory persistenceClient.AbstractDataStoreFactory
VisibilityStoreFactory visibility.VisibilityStoreFactory
Logger log.Logger
ArchiverMetadata carchiver.ArchivalMetadata
ArchiverProvider provider.ArchiverProvider
EnableReadHistoryFromArchival bool
FrontendConfig FrontendConfig
HistoryConfig HistoryConfig
MatchingConfig MatchingConfig
WorkerConfig WorkerConfig
ESConfig *esclient.Config
ESClient esclient.Client
MockAdminClient map[string]adminservice.AdminServiceClient
NamespaceReplicationTaskExecutor nsreplication.TaskExecutor
DynamicConfigOverrides map[dynamicconfig.Key]any
TLSConfigProvider *encryption.FixedTLSConfigProvider
CaptureMetricsHandler *metricstest.CaptureHandler
// ServiceFxOptions is populated by WithFxOptionsForService.
ServiceFxOptions map[primitives.ServiceName][]fx.Option
TaskCategoryRegistry tasks.TaskCategoryRegistry
HostsByProtocolByService map[transferProtocol]map[primitives.ServiceName]static.Hosts
SpanExporters map[telemetry.SpanExporterType]sdktrace.SpanExporter
}
TemporalParams contains everything needed to bootstrap Temporal
type TestCluster ¶
type TestCluster struct {
// contains filtered or unexported fields
}
TestCluster is a testcore struct for functional tests
func (*TestCluster) AdminClient ¶
func (tc *TestCluster) AdminClient() adminservice.AdminServiceClient
func (*TestCluster) ArchiverBase ¶ added in v1.27.0
func (tc *TestCluster) ArchiverBase() *ArchiverBase
func (*TestCluster) ClusterName ¶ added in v1.27.0
func (tc *TestCluster) ClusterName() string
func (*TestCluster) ExecutionManager ¶
func (tc *TestCluster) ExecutionManager() persistence.ExecutionManager
ExecutionManager returns the execution manager from the test cluster
func (*TestCluster) FrontendClient ¶
func (tc *TestCluster) FrontendClient() workflowservice.WorkflowServiceClient
func (*TestCluster) GetReplicationStreamRecorder ¶ added in v1.30.0
func (tc *TestCluster) GetReplicationStreamRecorder() *ReplicationStreamRecorder
func (*TestCluster) GetTaskQueueRecorder ¶ added in v1.30.0
func (tc *TestCluster) GetTaskQueueRecorder() *TaskQueueRecorder
func (*TestCluster) HistoryClient ¶
func (tc *TestCluster) HistoryClient() historyservice.HistoryServiceClient
HistoryClient returns a history client from the test cluster
func (*TestCluster) Host ¶
func (tc *TestCluster) Host() *TemporalImpl
TODO (alex): expose only needed objects from TemporalImpl.
func (*TestCluster) MatchingClient ¶
func (tc *TestCluster) MatchingClient() matchingservice.MatchingServiceClient
MatchingClient returns a matching client from the test cluster
func (*TestCluster) OperatorClient ¶
func (tc *TestCluster) OperatorClient() operatorservice.OperatorServiceClient
func (*TestCluster) OverrideDynamicConfig ¶
func (tc *TestCluster) OverrideDynamicConfig(t *testing.T, key dynamicconfig.GenericSetting, value any) (cleanup func())
func (*TestCluster) SchedulerClient ¶ added in v1.31.0
func (tc *TestCluster) SchedulerClient() schedulerpb.SchedulerServiceClient
SchedulerClient returns a scheduler client from the test cluster
func (*TestCluster) TearDownCluster ¶
func (tc *TestCluster) TearDownCluster() error
TearDownCluster tears down the test cluster
func (*TestCluster) TestBase ¶
func (tc *TestCluster) TestBase() *persistencetests.TestBase
TODO (alex): remove this method. Replace usages with concrete methods.
func (*TestCluster) WorkerGRPCAddress ¶ added in v1.31.0
func (tc *TestCluster) WorkerGRPCAddress() string
type TestClusterConfig ¶
type TestClusterConfig struct {
EnableArchival bool
IsMasterCluster bool
ClusterMetadata cluster.Config
Persistence persistencetests.TestBaseOptions
FrontendConfig FrontendConfig
HistoryConfig HistoryConfig
MatchingConfig MatchingConfig
WorkerConfig WorkerConfig
ESConfig *esclient.Config
MockAdminClient map[string]adminservice.AdminServiceClient
FaultInjection *config.FaultInjection
DynamicConfigOverrides map[dynamicconfig.Key]any
EnableMTLS bool
EnableMetricsCapture bool
SpanExporters map[telemetry.SpanExporterType]sdktrace.SpanExporter
CustomHistoryArchiverFactory provider.CustomHistoryArchiverFactory
CustomVisibilityArchiverFactory provider.CustomVisibilityArchiverFactory
// ServiceFxOptions can be populated using WithFxOptionsForService.
ServiceFxOptions map[primitives.ServiceName][]fx.Option
}
TestClusterConfig is the config for a test cluster
type TestClusterFactory ¶
type TestClusterFactory interface {
NewCluster(t *testing.T, clusterConfig *TestClusterConfig, logger log.Logger) (*TestCluster, error)
}
func NewTestClusterFactory ¶
func NewTestClusterFactory() TestClusterFactory
func NewTestClusterFactoryWithCustomTestBaseFactory ¶
func NewTestClusterFactoryWithCustomTestBaseFactory(tbFactory PersistenceTestBaseFactory) TestClusterFactory
type TestClusterOption ¶ added in v1.27.0
type TestClusterOption func(params *TestClusterParams)
func WithArchivalEnabled ¶ added in v1.27.0
func WithArchivalEnabled() TestClusterOption
func WithCustomHistoryArchiverFactory ¶ added in v1.31.0
func WithCustomHistoryArchiverFactory(factory provider.CustomHistoryArchiverFactory) TestClusterOption
func WithCustomVisibilityArchiverFactory ¶ added in v1.31.0
func WithCustomVisibilityArchiverFactory(factory provider.CustomVisibilityArchiverFactory) TestClusterOption
func WithDynamicConfigOverrides ¶ added in v1.27.0
func WithDynamicConfigOverrides(overrides map[dynamicconfig.Key]any) TestClusterOption
func WithFaultInjectionConfig ¶ added in v1.28.0
func WithFaultInjectionConfig(cfg *config.FaultInjection) TestClusterOption
func WithFxOptionsForService ¶
func WithFxOptionsForService(serviceName primitives.ServiceName, options ...fx.Option) TestClusterOption
WithFxOptionsForService returns an Option which, when passed as an argument to setupSuite, will append the given list of fx options to the end of the arguments to the fx.New call for the given service. For example, if you want to obtain the shard controller for the history service, you can do this:
var shardController shard.Controller
s.setupSuite(t, tests.WithFxOptionsForService(primitives.HistoryService, fx.Populate(&shardController)))
// now you can use shardController during your test
This is similar to the pattern of plumbing dependencies through the TestClusterConfig, but it's much more convenient, scalable and flexible. The reason we need to do this on a per-service basis is that there are separate fx apps for each one.
func WithMTLS ¶ added in v1.28.0
func WithMTLS() TestClusterOption
func WithNumHistoryShards ¶ added in v1.28.0
func WithNumHistoryShards(n int32) TestClusterOption
func WithSharedCluster ¶ added in v1.31.0
func WithSharedCluster() TestClusterOption
type TestClusterParams ¶
type TestClusterParams struct {
ServiceOptions map[primitives.ServiceName][]fx.Option
DynamicConfigOverrides map[dynamicconfig.Key]any
ArchivalEnabled bool
EnableMTLS bool
FaultInjectionConfig *config.FaultInjection
NumHistoryShards int32
CustomHistoryArchiverFactory provider.CustomHistoryArchiverFactory
CustomVisibilityArchiverFactory provider.CustomVisibilityArchiverFactory
}
TestClusterParams contains the variables which are used to configure test cluster via the TestClusterOption type.
func ApplyTestClusterOptions ¶ added in v1.27.0
func ApplyTestClusterOptions(options []TestClusterOption) TestClusterParams
type TestDataConverter ¶
type TestDataConverter struct {
NumOfCallToPayloads int // for testing to know testDataConverter is called as expected
NumOfCallFromPayloads int
}
TestDataConverter implements encoded.DataConverter using gob
func (*TestDataConverter) FromPayload ¶
func (tdc *TestDataConverter) FromPayload(payload *commonpb.Payload, valuePtr any) error
func (*TestDataConverter) FromPayloads ¶
func (tdc *TestDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...any) error
func (*TestDataConverter) ToPayload ¶
func (tdc *TestDataConverter) ToPayload(value any) (*commonpb.Payload, error)
func (*TestDataConverter) ToPayloads ¶
func (tdc *TestDataConverter) ToPayloads(values ...any) (*commonpb.Payloads, error)
type TestEnv ¶ added in v1.31.0
type TestEnv struct {
*FunctionalTestBase
// Shadows FunctionalTestBase.Assertions with a per-test instance bound to
// this TestEnv's own *testing.T, avoiding data races when parallel tests
// share the same *FunctionalTestBase cluster.
// TODO: remove once all tests are migrated to TestEnv (and no longer use FunctionalTestBase directly).
*require.Assertions
Logger log.Logger
// contains filtered or unexported fields
}
func NewEnv ¶ added in v1.31.0
func NewEnv(t *testing.T, opts ...TestOption) *TestEnv
NewEnv creates a new test environment with access to a Temporal cluster.
func (*TestEnv) Context ¶ added in v1.31.0
Context returns the test-level timeout context with RPC version headers already included. This context will be canceled when the test timeout occurs. Use this directly for all RPC operations - no need to wrap with NewContext or add headers manually.
For custom timeouts, use:
ctx, cancel := context.WithTimeout(env.Context(), 10*time.Second)
defer cancel()
func (*TestEnv) Error ¶ added in v1.31.0
Error asserts that err is not nil. Deprecated: use require.Error with the parent test or suite instead. TODO: remove once all tests are migrated to TestEnv (and no longer use FunctionalTestBase directly).
func (*TestEnv) InjectHook ¶ added in v1.31.0
InjectHook sets a test hook inside the cluster.
It auto-detects the scope from the hook:
- For namespace-scoped hooks: scopes it to the test's namespace.
- For global hooks: requires a dedicated cluster (fails early if used on a shared cluster).
func (*TestEnv) Namespace ¶ added in v1.31.0
Namespace returns the test env-specific namespace, which is used for test isolation.
func (*TestEnv) NamespaceID ¶ added in v1.31.0
func (*TestEnv) NoError ¶ added in v1.31.0
NoError asserts that err is nil. Deprecated: use require.NoError with the parent test or suite instead. TODO: remove once all tests are migrated to TestEnv (and no longer use FunctionalTestBase directly).
func (*TestEnv) OverrideDynamicConfig ¶ added in v1.31.0
func (e *TestEnv) OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())
OverrideDynamicConfig overrides a dynamic config setting for the duration of this test. For settings that can be namespace-scoped, a namespace constraint is applied. All others cannot be applied to a shared cluster and require `WithDedicatedCluster`.
func (*TestEnv) Run ¶ added in v1.31.0
Run executes a subtest. Deprecated: use the suite's Run method instead. TODO: remove once all tests are migrated to TestEnv (and no longer use FunctionalTestBase directly).
func (*TestEnv) SdkClient ¶ added in v1.31.0
SdkClient returns the SDK client. It is lazily initialized on the first call.
func (*TestEnv) SdkWorker ¶ added in v1.31.0
SdkWorker returns the SDK worker. It is lazily initialized on the first call.
func (*TestEnv) StartGlobalMetricCapture ¶ added in v1.31.0
func (e *TestEnv) StartGlobalMetricCapture() *GlobalMetricCapture
StartGlobalMetricCapture starts a cluster-global metrics capture for this test and automatically stops it during cleanup. Metric capture is cluster-global, so it is only safe on dedicated clusters. Misuse detection is best-effort and only applies to queried metrics that produced recordings.
func (*TestEnv) StartNamespaceMetricCapture ¶ added in v1.31.0
func (e *TestEnv) StartNamespaceMetricCapture() *NamespaceMetricCapture
StartNamespaceMetricCapture starts a metrics capture that only allows safe per-metric namespace filtering. Namespace captures are safe on shared clusters because reads are restricted to per-metric namespace-filtered iteration and reject non-namespaced metrics.
func (*TestEnv) T ¶ added in v1.31.0
T returns the *testing.T. Deprecated: use the suite's T() method instead.
func (*TestEnv) TaskPoller ¶ added in v1.31.0
func (e *TestEnv) TaskPoller() *taskpoller.TaskPoller
func (*TestEnv) WorkerTaskQueue ¶ added in v1.31.0
WorkerTaskQueue returns the task queue name used by the SDK Worker.
type TestOption ¶ added in v1.31.0
type TestOption func(*testOptions)
func WithDedicatedCluster ¶ added in v1.31.0
func WithDedicatedCluster() TestOption
WithDedicatedCluster requests a dedicated (non-shared) cluster for the test. Use this for tests that have cluster-global side effects.
func WithDynamicConfig ¶ added in v1.31.0
func WithDynamicConfig(setting dynamicconfig.GenericSetting, value any) TestOption
WithDynamicConfig overrides a dynamic config setting for the test. For settings that can be namespace-scoped, a namespace constraint is applied. For all others that require a dedicated cluster, this implies `WithDedicatedCluster`.
func WithSdkWorker
deprecated
added in
v1.31.0
func WithSdkWorker() TestOption
Deprecated: this option is no longer required and will be removed once all callers have been updated.
func WithTimeout ¶ added in v1.31.0
func WithTimeout(duration time.Duration) TestOption
WithTimeout sets a custom timeout for the test. The test will fail if it runs longer than this duration. The timeout is multiplied by debug.TimeoutMultiplier when debugging. The TEMPORAL_TEST_TIMEOUT environment variable can also set the default timeout in seconds.
type WorkerConfig ¶
WorkerConfig is the config for the worker service
type WorkflowTaskHandler ¶
type WorkflowTaskHandler func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error)