Documentation
¶
Index ¶
- Constants
- Variables
- func DecodeString(t require.TestingT, pls *commonpb.Payloads) string
- func EventBatchesToVersionHistory(versionHistory *historyspb.VersionHistory, eventBatches []*historypb.History) (*historyspb.VersionHistory, error)
- func ExtractReplicationMessages(msg proto.Message) *replicationspb.WorkflowReplicationMessages
- func GetPersistenceTestDefaults() persistencetests.TestBaseOptions
- func MustRunSequential(t *testing.T, reason string)
- func MustToPayload(t require.TestingT, v any) *commonpb.Payload
- func NewContext(parent ...context.Context) context.Context
- func NewTestDataConverter() converter.DataConverter
- func RandomizeStr(id string) string
- func RandomizedNexusEndpoint(name string) string
- func UseCassandraPersistence() bool
- func UseSQLVisibility() bool
- func WithDropTask(o *PollAndProcessWorkflowTaskOptions)
- func WithDumpHistory(o *PollAndProcessWorkflowTaskOptions)
- func WithForceNewWorkflowTask(o *PollAndProcessWorkflowTaskOptions)
- func WithNoDumpCommands(o *PollAndProcessWorkflowTaskOptions)
- func WithPollSticky(o *PollAndProcessWorkflowTaskOptions)
- func WithRespondSticky(o *PollAndProcessWorkflowTaskOptions)
- func WithoutRetries(o *PollAndProcessWorkflowTaskOptions)
- type ActivityTaskHandler
- type ArchiverBase
- type CapturedReplicationMessage
- type Env
- type FrontendConfig
- type FunctionalTestBase
- func (s *FunctionalTestBase) AdminClient() adminservice.AdminServiceClient
- func (s *FunctionalTestBase) CloseShard(namespaceID string, workflowID string)
- func (s *FunctionalTestBase) Context() context.Context
- func (s *FunctionalTestBase) DecodePayloadsByteSliceInt32(ps *commonpb.Payloads) (r int32)
- func (s *FunctionalTestBase) DecodePayloadsInt(ps *commonpb.Payloads) int
- func (s *FunctionalTestBase) DecodePayloadsString(ps *commonpb.Payloads) string
- func (s *FunctionalTestBase) DurationNear(value, target, tolerance time.Duration)
- func (s *FunctionalTestBase) ExternalNamespace() namespace.Name
- func (s *FunctionalTestBase) FrontendClient() workflowservice.WorkflowServiceClient
- func (s *FunctionalTestBase) FrontendGRPCAddress() string
- func (s *FunctionalTestBase) GetHistory(namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent
- func (s *FunctionalTestBase) GetHistoryFunc(namespace string, execution *commonpb.WorkflowExecution) func() []*historypb.HistoryEvent
- func (s *FunctionalTestBase) GetNamespaceID(namespace string) string
- func (s *FunctionalTestBase) GetTestCluster() *TestCluster
- func (s *FunctionalTestBase) GetTestClusterConfig() *TestClusterConfig
- func (s *FunctionalTestBase) HttpAPIAddress() string
- func (s *FunctionalTestBase) InjectHook(hook testhooks.Hook) (cleanup func())
- func (s *FunctionalTestBase) MarkNamespaceAsDeleted(nsName namespace.Name) error
- func (s *FunctionalTestBase) Namespace() namespace.Name
- func (s *FunctionalTestBase) NamespaceID() namespace.ID
- func (s *FunctionalTestBase) OperatorClient() operatorservice.OperatorServiceClient
- func (s *FunctionalTestBase) OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())
- func (s *FunctionalTestBase) RegisterNamespace(nsName namespace.Name, retentionDays int32, ...) (namespace.ID, error)
- func (s *FunctionalTestBase) RunTestWithMatchingBehavior(subtest func())
- func (s *FunctionalTestBase) SdkClient() sdkclient.Client
- func (s *FunctionalTestBase) SdkWorker() sdkworker.Worker
- func (s *FunctionalTestBase) SendSignal(nsName string, execution *commonpb.WorkflowExecution, signalName string, ...) error
- func (s *FunctionalTestBase) SetupSubTest()
- func (s *FunctionalTestBase) SetupSuite()
- func (s *FunctionalTestBase) SetupSuiteWithCluster(options ...TestClusterOption)
- func (s *FunctionalTestBase) SetupTest()
- func (s *FunctionalTestBase) TaskPoller() *taskpoller.TaskPoller
- func (s *FunctionalTestBase) TaskQueue() string
- func (s *FunctionalTestBase) TearDownCluster()
- func (s *FunctionalTestBase) TearDownSubTest()
- func (s *FunctionalTestBase) TearDownSuite()
- func (s *FunctionalTestBase) TearDownTest()
- func (s *FunctionalTestBase) WaitForChannel(ctx context.Context, ch chan struct{})
- func (s *FunctionalTestBase) WorkerGRPCAddress() string
- type HistoryConfig
- type MatchingBehavior
- type MatchingConfig
- type MessageHandler
- type PersistenceTestBaseFactory
- type PollAndProcessWorkflowTaskOptionFunc
- type PollAndProcessWorkflowTaskOptions
- type PollAndProcessWorkflowTaskResponse
- type QueryHandler
- type RecordedTask
- type ReplicationStreamRecorder
- func (r *ReplicationStreamRecorder) Clear()
- func (r *ReplicationStreamRecorder) GetMessages() []CapturedReplicationMessage
- func (r *ReplicationStreamRecorder) SetOutputFile(filePath string)
- func (r *ReplicationStreamRecorder) StreamInterceptor(clusterName string) grpc.StreamClientInterceptor
- func (r *ReplicationStreamRecorder) StreamServerInterceptor(clusterName string) grpc.StreamServerInterceptor
- func (r *ReplicationStreamRecorder) UnaryInterceptor(clusterName string) grpc.UnaryClientInterceptor
- func (r *ReplicationStreamRecorder) UnaryServerInterceptor(clusterName string) grpc.UnaryServerInterceptor
- func (r *ReplicationStreamRecorder) WriteToLog() error
- type TaskFilter
- type TaskMatcher
- type TaskPoller (deprecated)
- func (p *TaskPoller) HandlePartialWorkflowTask(response *workflowservice.PollWorkflowTaskQueueResponse, ...) (*workflowservice.RespondWorkflowTaskCompletedResponse, error)
- func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error
- func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error
- func (p *TaskPoller) PollAndProcessWorkflowTask(funcs ...PollAndProcessWorkflowTaskOptionFunc) (res PollAndProcessWorkflowTaskResponse, err error)
- func (p *TaskPoller) PollAndProcessWorkflowTaskWithOptions(opts *PollAndProcessWorkflowTaskOptions) (res PollAndProcessWorkflowTaskResponse, err error)
- type TaskQueueRecorder
- func (r *TaskQueueRecorder) AddHistoryTasks(ctx context.Context, request *persistence.AddHistoryTasksRequest) error
- func (r *TaskQueueRecorder) AppendHistoryNodes(ctx context.Context, request *persistence.AppendHistoryNodesRequest) (*persistence.AppendHistoryNodesResponse, error)
- func (r *TaskQueueRecorder) AppendRawHistoryNodes(ctx context.Context, request *persistence.AppendRawHistoryNodesRequest) (*persistence.AppendHistoryNodesResponse, error)
- func (r *TaskQueueRecorder) Close()
- func (r *TaskQueueRecorder) CompleteHistoryTask(ctx context.Context, request *persistence.CompleteHistoryTaskRequest) error
- func (r *TaskQueueRecorder) ConflictResolveWorkflowExecution(ctx context.Context, ...) (*persistence.ConflictResolveWorkflowExecutionResponse, error)
- func (r *TaskQueueRecorder) CountMatchingTasks(category tasks.Category, matcher TaskMatcher) int
- func (r *TaskQueueRecorder) CountTasksForNamespace(category tasks.Category, namespaceID string, matcher TaskMatcher) int
- func (r *TaskQueueRecorder) CountTasksForWorkflow(category tasks.Category, namespaceID string, workflowID string, runID string, ...) int
- func (r *TaskQueueRecorder) CreateWorkflowExecution(ctx context.Context, request *persistence.CreateWorkflowExecutionRequest) (*persistence.CreateWorkflowExecutionResponse, error)
- func (r *TaskQueueRecorder) DeleteCurrentWorkflowExecution(ctx context.Context, ...) error
- func (r *TaskQueueRecorder) DeleteHistoryBranch(ctx context.Context, request *persistence.DeleteHistoryBranchRequest) error
- func (r *TaskQueueRecorder) DeleteReplicationTaskFromDLQ(ctx context.Context, request *persistence.DeleteReplicationTaskFromDLQRequest) error
- func (r *TaskQueueRecorder) DeleteWorkflowExecution(ctx context.Context, request *persistence.DeleteWorkflowExecutionRequest) error
- func (r *TaskQueueRecorder) ForkHistoryBranch(ctx context.Context, request *persistence.ForkHistoryBranchRequest) (*persistence.ForkHistoryBranchResponse, error)
- func (r *TaskQueueRecorder) GetAllHistoryTreeBranches(ctx context.Context, request *persistence.GetAllHistoryTreeBranchesRequest) (*persistence.GetAllHistoryTreeBranchesResponse, error)
- func (r *TaskQueueRecorder) GetAllRecordedTasks() map[tasks.Category][]RecordedTask
- func (r *TaskQueueRecorder) GetAllTasks() map[tasks.Category][]tasks.Task
- func (r *TaskQueueRecorder) GetCurrentExecution(ctx context.Context, request *persistence.GetCurrentExecutionRequest) (*persistence.GetCurrentExecutionResponse, error)
- func (r *TaskQueueRecorder) GetHistoryBranchUtil() persistence.HistoryBranchUtil
- func (r *TaskQueueRecorder) GetHistoryTasks(ctx context.Context, request *persistence.GetHistoryTasksRequest) (*persistence.GetHistoryTasksResponse, error)
- func (r *TaskQueueRecorder) GetName() string
- func (r *TaskQueueRecorder) GetRecordedTasksByCategoryFiltered(category tasks.Category, filter TaskFilter) []RecordedTask
- func (r *TaskQueueRecorder) GetReplicationTasksFromDLQ(ctx context.Context, request *persistence.GetReplicationTasksFromDLQRequest) (*persistence.GetHistoryTasksResponse, error)
- func (r *TaskQueueRecorder) GetWorkflowExecution(ctx context.Context, request *persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error)
- func (r *TaskQueueRecorder) IsReplicationDLQEmpty(ctx context.Context, request *persistence.GetReplicationTasksFromDLQRequest) (bool, error)
- func (r *TaskQueueRecorder) ListConcreteExecutions(ctx context.Context, request *persistence.ListConcreteExecutionsRequest) (*persistence.ListConcreteExecutionsResponse, error)
- func (r *TaskQueueRecorder) MatchTasks(category tasks.Category, matcher TaskMatcher) []RecordedTask
- func (r *TaskQueueRecorder) MatchTasksForNamespace(category tasks.Category, namespaceID string, matcher TaskMatcher) []RecordedTask
- func (r *TaskQueueRecorder) MatchTasksForWorkflow(category tasks.Category, namespaceID string, workflowID string, runID string, ...) []RecordedTask
- func (r *TaskQueueRecorder) PutReplicationTaskToDLQ(ctx context.Context, request *persistence.PutReplicationTaskToDLQRequest) error
- func (r *TaskQueueRecorder) RangeCompleteHistoryTasks(ctx context.Context, request *persistence.RangeCompleteHistoryTasksRequest) error
- func (r *TaskQueueRecorder) RangeDeleteReplicationTaskFromDLQ(ctx context.Context, ...) error
- func (r *TaskQueueRecorder) ReadHistoryBranch(ctx context.Context, request *persistence.ReadHistoryBranchRequest) (*persistence.ReadHistoryBranchResponse, error)
- func (r *TaskQueueRecorder) ReadHistoryBranchByBatch(ctx context.Context, request *persistence.ReadHistoryBranchRequest) (*persistence.ReadHistoryBranchByBatchResponse, error)
- func (r *TaskQueueRecorder) ReadHistoryBranchReverse(ctx context.Context, request *persistence.ReadHistoryBranchReverseRequest) (*persistence.ReadHistoryBranchReverseResponse, error)
- func (r *TaskQueueRecorder) ReadRawHistoryBranch(ctx context.Context, request *persistence.ReadHistoryBranchRequest) (*persistence.ReadRawHistoryBranchResponse, error)
- func (r *TaskQueueRecorder) SetWorkflowExecution(ctx context.Context, request *persistence.SetWorkflowExecutionRequest) (*persistence.SetWorkflowExecutionResponse, error)
- func (r *TaskQueueRecorder) TrimHistoryBranch(ctx context.Context, request *persistence.TrimHistoryBranchRequest) (*persistence.TrimHistoryBranchResponse, error)
- func (r *TaskQueueRecorder) UpdateWorkflowExecution(ctx context.Context, request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error)
- func (r *TaskQueueRecorder) WriteToLog(filePath string) error
- type TemporalImpl
- func (c *TemporalImpl) AdminClient() adminservice.AdminServiceClient
- func (c *TemporalImpl) Authorize(ctx context.Context, caller *authorization.Claims, ...) (authorization.Result, error)
- func (c *TemporalImpl) CaptureMetricsHandler() *metricstest.CaptureHandler
- func (c *TemporalImpl) ChasmEngine() (chasm.Engine, error)
- func (c *TemporalImpl) ChasmVisibilityManager() chasm.VisibilityManager
- func (c *TemporalImpl) DcClient() *dynamicconfig.MemoryClient
- func (c *TemporalImpl) FrontendClient() workflowservice.WorkflowServiceClient
- func (c *TemporalImpl) FrontendGRPCAddress() string
- func (c *TemporalImpl) FrontendHTTPAddress() string
- func (c *TemporalImpl) GetCHASMRegistry() *chasm.Registry
- func (c *TemporalImpl) GetClaims(authInfo *authorization.AuthInfo) (*authorization.Claims, error)
- func (c *TemporalImpl) GetExecutionManager() persistence.ExecutionManager
- func (c *TemporalImpl) GetGrpcClientInterceptor() *grpcinject.Interceptor
- func (c *TemporalImpl) GetMetricsHandler() metrics.Handler
- func (c *TemporalImpl) GetTLSConfigProvider() encryption.TLSConfigProvider
- func (c *TemporalImpl) GetTaskCategoryRegistry() tasks.TaskCategoryRegistry
- func (c *TemporalImpl) GetTaskQueueRecorder() *TaskQueueRecorder
- func (c *TemporalImpl) HistoryClient() historyservice.HistoryServiceClient
- func (c *TemporalImpl) MatchingClient() matchingservice.MatchingServiceClient
- func (c *TemporalImpl) NamespaceRegistries() []namespace.Registry
- func (c *TemporalImpl) OperatorClient() operatorservice.OperatorServiceClient
- func (c *TemporalImpl) RemoteFrontendGRPCAddress() string
- func (c *TemporalImpl) SchedulerClient() schedulerpb.SchedulerServiceClient
- func (c *TemporalImpl) SetOnAuthorize(...)
- func (c *TemporalImpl) SetOnGetClaims(fn func(*authorization.AuthInfo) (*authorization.Claims, error))
- func (c *TemporalImpl) SetTaskQueueRecorder(recorder *TaskQueueRecorder)
- func (c *TemporalImpl) Start() error
- func (c *TemporalImpl) Stop() error
- func (c *TemporalImpl) TlsConfigProvider() *encryption.FixedTLSConfigProvider
- func (c *TemporalImpl) WorkerGRPCAddress() string
- type TemporalParams
- type TestCluster
- func (tc *TestCluster) AdminClient() adminservice.AdminServiceClient
- func (tc *TestCluster) ArchiverBase() *ArchiverBase
- func (tc *TestCluster) ClusterName() string
- func (tc *TestCluster) ExecutionManager() persistence.ExecutionManager
- func (tc *TestCluster) FrontendClient() workflowservice.WorkflowServiceClient
- func (tc *TestCluster) GetReplicationStreamRecorder() *ReplicationStreamRecorder
- func (tc *TestCluster) GetTaskQueueRecorder() *TaskQueueRecorder
- func (tc *TestCluster) HistoryClient() historyservice.HistoryServiceClient
- func (tc *TestCluster) Host() *TemporalImpl
- func (tc *TestCluster) MatchingClient() matchingservice.MatchingServiceClient
- func (tc *TestCluster) OperatorClient() operatorservice.OperatorServiceClient
- func (tc *TestCluster) OverrideDynamicConfig(t *testing.T, key dynamicconfig.GenericSetting, value any) (cleanup func())
- func (tc *TestCluster) SchedulerClient() schedulerpb.SchedulerServiceClient
- func (tc *TestCluster) TearDownCluster() error
- func (tc *TestCluster) TestBase() *persistencetests.TestBase
- func (tc *TestCluster) WorkerGRPCAddress() string
- type TestClusterConfig
- type TestClusterFactory
- type TestClusterOption
- func WithArchivalEnabled() TestClusterOption
- func WithDynamicConfigOverrides(overrides map[dynamicconfig.Key]any) TestClusterOption
- func WithFaultInjectionConfig(cfg *config.FaultInjection) TestClusterOption
- func WithFxOptionsForService(serviceName primitives.ServiceName, options ...fx.Option) TestClusterOption
- func WithMTLS() TestClusterOption
- func WithNumHistoryShards(n int32) TestClusterOption
- func WithSharedCluster() TestClusterOption
- type TestClusterParams
- type TestDataConverter
- func (tdc *TestDataConverter) FromPayload(payload *commonpb.Payload, valuePtr any) error
- func (tdc *TestDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...any) error
- func (tdc *TestDataConverter) ToPayload(value any) (*commonpb.Payload, error)
- func (tdc *TestDataConverter) ToPayloads(values ...any) (*commonpb.Payloads, error)
- func (tdc *TestDataConverter) ToString(payload *commonpb.Payload) string
- func (tdc *TestDataConverter) ToStrings(payloads *commonpb.Payloads) []string
- type TestEnv
- func (e *TestEnv) Context() context.Context
- func (e *TestEnv) InjectHook(hook testhooks.Hook) (cleanup func())
- func (e *TestEnv) Namespace() namespace.Name
- func (e *TestEnv) NamespaceID() namespace.ID
- func (e *TestEnv) OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())
- func (e *TestEnv) SdkClient() sdkclient.Client
- func (e *TestEnv) SdkWorker() sdkworker.Worker
- func (e *TestEnv) T() *testing.T
- func (e *TestEnv) TaskPoller() *taskpoller.TaskPoller
- func (e *TestEnv) Tv() *testvars.TestVars
- func (e *TestEnv) WorkerTaskQueue() string
- type TestOption
- type WorkerConfig
- type WorkflowTaskHandler
Constants ¶
const (
	DefaultPageSize   = 5
	PprofTestPort     = 7000
	TlsCertCommonName = "my-common-name"
	ClientSuiteLimit  = 10
	// TODO (alex): replace all sleeps with WaitForESToSettle with s.Eventually()
	WaitForESToSettle = 4 * time.Second // wait es shards for some time ensure data consistent
)
const ( DirectionSend = "send" DirectionRecv = "recv" DirectionServerSend = "server_send" DirectionServerRecv = "server_recv" )
Message direction constants
const NamespaceCacheRefreshInterval = time.Second
Variables ¶
var ( ErrEncodingIsNotSet = errors.New("payload encoding metadata is not set") ErrEncodingIsNotSupported = errors.New("payload encoding is not supported") )
var (
ErrNoTasks = errors.New("no tasks")
)
Functions ¶
func EventBatchesToVersionHistory ¶
func EventBatchesToVersionHistory( versionHistory *historyspb.VersionHistory, eventBatches []*historypb.History, ) (*historyspb.VersionHistory, error)
func ExtractReplicationMessages ¶
func ExtractReplicationMessages(msg proto.Message) *replicationspb.WorkflowReplicationMessages
ExtractReplicationMessages extracts WorkflowReplicationMessages from a proto message. This is a helper for tests that need to inspect replication message contents.
func GetPersistenceTestDefaults ¶
func GetPersistenceTestDefaults() persistencetests.TestBaseOptions
GetPersistenceTestDefaults returns the default persistence options based on CLI flags. Use this when creating TestClusterConfig to ensure proper database configuration.
func MustRunSequential ¶
MustRunSequential marks a test suite to run its tests sequentially instead of in parallel. Call this at the start of your test suite before any subtests are created. A single dedicated cluster will be created for this suite and torn down when the suite completes.
func MustToPayload ¶
MustToPayload converts a value to a Payload using the default data converter.
func NewContext ¶
NewContext creates a context with a default 90-second timeout and RPC headers.
NOTE: If you're using testcore.NewEnv, you can use env.Context() directly - it already includes RPC headers. This function is primarily for legacy tests or creating standalone contexts outside of the TestEnv framework.
If a parent context is provided, the returned context will be canceled when either the timeout expires OR the parent is canceled.
func NewTestDataConverter ¶
func NewTestDataConverter() converter.DataConverter
TODO (alex): use it by default in SdkClient everywhere?
func RandomizeStr ¶
func RandomizedNexusEndpoint ¶
func UseCassandraPersistence ¶
func UseCassandraPersistence() bool
func UseSQLVisibility ¶
func UseSQLVisibility() bool
func WithDropTask ¶
func WithDropTask(o *PollAndProcessWorkflowTaskOptions)
func WithDumpHistory ¶
func WithDumpHistory(o *PollAndProcessWorkflowTaskOptions)
func WithForceNewWorkflowTask ¶
func WithForceNewWorkflowTask(o *PollAndProcessWorkflowTaskOptions)
func WithNoDumpCommands ¶
func WithNoDumpCommands(o *PollAndProcessWorkflowTaskOptions)
func WithPollSticky ¶
func WithPollSticky(o *PollAndProcessWorkflowTaskOptions)
func WithRespondSticky ¶
func WithRespondSticky(o *PollAndProcessWorkflowTaskOptions)
func WithoutRetries ¶
func WithoutRetries(o *PollAndProcessWorkflowTaskOptions)
Types ¶
type ActivityTaskHandler ¶
type ActivityTaskHandler func(task *workflowservice.PollActivityTaskQueueResponse) (*commonpb.Payloads, bool, error)
type ArchiverBase ¶
type ArchiverBase struct {
// contains filtered or unexported fields
}
ArchiverBase is a testcore struct for the archiver provider used in functional tests.
func (*ArchiverBase) HistoryURI ¶
func (a *ArchiverBase) HistoryURI() string
func (*ArchiverBase) Metadata ¶
func (a *ArchiverBase) Metadata() archiver.ArchivalMetadata
func (*ArchiverBase) Provider ¶
func (a *ArchiverBase) Provider() provider.ArchiverProvider
func (*ArchiverBase) VisibilityURI ¶
func (a *ArchiverBase) VisibilityURI() string
type CapturedReplicationMessage ¶
type CapturedReplicationMessage struct {
Timestamp string `json:"timestamp"`
Method string `json:"method"`
Direction string `json:"direction"`
ClusterName string `json:"clusterName"`
TargetAddress string `json:"targetAddress"`
MessageType string `json:"messageType"`
IsStreamCall bool `json:"isStreamCall"`
Request proto.Message `json:"-"` // Don't marshal directly
Response proto.Message `json:"-"` // Don't marshal directly
Message json.RawMessage `json:"message,omitempty"`
}
CapturedReplicationMessage represents a captured replication message
type Env ¶
type Env interface {
T() *testing.T
Namespace() namespace.Name
NamespaceID() namespace.ID
FrontendClient() workflowservice.WorkflowServiceClient
AdminClient() adminservice.AdminServiceClient
GetTestCluster() *TestCluster
CloseShard(namespaceID string, workflowID string)
OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())
Context() context.Context
InjectHook(hook testhooks.Hook) (cleanup func())
}
type FrontendConfig ¶
type FrontendConfig struct {
NumFrontendHosts int
}
FrontendConfig is the config for the frontend service
type FunctionalTestBase ¶
type FunctionalTestBase struct {
suite.Suite
// `suite.Suite` embeds `*assert.Assertions` which, by default, makes all asserts (like `s.NoError(err)`)
// only log the error, continue test execution, and only then fail the test.
// This is not desired behavior in most cases. The idiomatic way to change this behavior
// is to replace `*assert.Assertions` with `*require.Assertions` by embedding it in every test suite
// (or base struct of every test suite).
*require.Assertions
protorequire.ProtoAssertions
historyrequire.HistoryRequire
updateutils.UpdateUtils
Logger log.Logger
// contains filtered or unexported fields
}
func (*FunctionalTestBase) AdminClient ¶
func (s *FunctionalTestBase) AdminClient() adminservice.AdminServiceClient
func (*FunctionalTestBase) CloseShard ¶
func (s *FunctionalTestBase) CloseShard(namespaceID string, workflowID string)
CloseShard closes the shard that contains the given workflow. This is a cluster-global operation and cannot be called on shared clusters.
func (*FunctionalTestBase) Context ¶
func (s *FunctionalTestBase) Context() context.Context
Context returns a context with RPC headers for use in this test.
func (*FunctionalTestBase) DecodePayloadsByteSliceInt32 ¶
func (s *FunctionalTestBase) DecodePayloadsByteSliceInt32(ps *commonpb.Payloads) (r int32)
func (*FunctionalTestBase) DecodePayloadsInt ¶
func (s *FunctionalTestBase) DecodePayloadsInt(ps *commonpb.Payloads) int
func (*FunctionalTestBase) DecodePayloadsString ¶
func (s *FunctionalTestBase) DecodePayloadsString(ps *commonpb.Payloads) string
func (*FunctionalTestBase) DurationNear ¶
func (s *FunctionalTestBase) DurationNear(value, target, tolerance time.Duration)
func (*FunctionalTestBase) ExternalNamespace ¶
func (s *FunctionalTestBase) ExternalNamespace() namespace.Name
func (*FunctionalTestBase) FrontendClient ¶
func (s *FunctionalTestBase) FrontendClient() workflowservice.WorkflowServiceClient
func (*FunctionalTestBase) FrontendGRPCAddress ¶
func (s *FunctionalTestBase) FrontendGRPCAddress() string
func (*FunctionalTestBase) GetHistory ¶
func (s *FunctionalTestBase) GetHistory(namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent
func (*FunctionalTestBase) GetHistoryFunc ¶
func (s *FunctionalTestBase) GetHistoryFunc(namespace string, execution *commonpb.WorkflowExecution) func() []*historypb.HistoryEvent
func (*FunctionalTestBase) GetNamespaceID ¶
func (s *FunctionalTestBase) GetNamespaceID(namespace string) string
func (*FunctionalTestBase) GetTestCluster ¶
func (s *FunctionalTestBase) GetTestCluster() *TestCluster
func (*FunctionalTestBase) GetTestClusterConfig ¶
func (s *FunctionalTestBase) GetTestClusterConfig() *TestClusterConfig
func (*FunctionalTestBase) HttpAPIAddress ¶
func (s *FunctionalTestBase) HttpAPIAddress() string
func (*FunctionalTestBase) InjectHook ¶
func (s *FunctionalTestBase) InjectHook(hook testhooks.Hook) (cleanup func())
InjectHook sets a test hook inside the cluster.
func (*FunctionalTestBase) MarkNamespaceAsDeleted ¶
func (s *FunctionalTestBase) MarkNamespaceAsDeleted( nsName namespace.Name, ) error
func (*FunctionalTestBase) Namespace ¶
func (s *FunctionalTestBase) Namespace() namespace.Name
func (*FunctionalTestBase) NamespaceID ¶
func (s *FunctionalTestBase) NamespaceID() namespace.ID
func (*FunctionalTestBase) OperatorClient ¶
func (s *FunctionalTestBase) OperatorClient() operatorservice.OperatorServiceClient
func (*FunctionalTestBase) OverrideDynamicConfig ¶
func (s *FunctionalTestBase) OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())
func (*FunctionalTestBase) RegisterNamespace ¶
func (s *FunctionalTestBase) RegisterNamespace( nsName namespace.Name, retentionDays int32, archivalState enumspb.ArchivalState, historyArchivalURI string, visibilityArchivalURI string, ) (namespace.ID, error)
Register namespace using persistence API because:
- The Retention period is set to 0 for archival tests, and this can't be done through FE,
- Update search attributes would require an extra API call,
- One more extra API call would be necessary to get namespace.ID.
func (*FunctionalTestBase) RunTestWithMatchingBehavior ¶
func (s *FunctionalTestBase) RunTestWithMatchingBehavior(subtest func())
func (*FunctionalTestBase) SdkClient ¶
func (s *FunctionalTestBase) SdkClient() sdkclient.Client
func (*FunctionalTestBase) SdkWorker ¶
func (s *FunctionalTestBase) SdkWorker() sdkworker.Worker
func (*FunctionalTestBase) SendSignal ¶
func (s *FunctionalTestBase) SendSignal(nsName string, execution *commonpb.WorkflowExecution, signalName string, input *commonpb.Payloads, identity string) error
TODO (alex): change to nsName namespace.Name
func (*FunctionalTestBase) SetupSubTest ¶
func (s *FunctionalTestBase) SetupSubTest()
func (*FunctionalTestBase) SetupSuite ¶
func (s *FunctionalTestBase) SetupSuite()
func (*FunctionalTestBase) SetupSuiteWithCluster ¶
func (s *FunctionalTestBase) SetupSuiteWithCluster(options ...TestClusterOption)
func (*FunctionalTestBase) SetupTest ¶
func (s *FunctionalTestBase) SetupTest()
All test suites that embed FunctionalTestBase and override SetupTest must call this testcore FunctionalTestBase.SetupTest function to distribute the tests into partitions. Otherwise, the test suite will be executed multiple times, once in each partition.
func (*FunctionalTestBase) TaskPoller ¶
func (s *FunctionalTestBase) TaskPoller() *taskpoller.TaskPoller
func (*FunctionalTestBase) TaskQueue ¶
func (s *FunctionalTestBase) TaskQueue() string
func (*FunctionalTestBase) TearDownCluster ¶
func (s *FunctionalTestBase) TearDownCluster()
func (*FunctionalTestBase) TearDownSubTest ¶
func (s *FunctionalTestBase) TearDownSubTest()
**IMPORTANT**: When overriding this, make sure to invoke `s.FunctionalTestBase.TearDownSubTest()`.
func (*FunctionalTestBase) TearDownSuite ¶
func (s *FunctionalTestBase) TearDownSuite()
func (*FunctionalTestBase) TearDownTest ¶
func (s *FunctionalTestBase) TearDownTest()
**IMPORTANT**: When overriding this, make sure to invoke `s.FunctionalTestBase.TearDownTest()`.
func (*FunctionalTestBase) WaitForChannel ¶
func (s *FunctionalTestBase) WaitForChannel(ctx context.Context, ch chan struct{})
func (*FunctionalTestBase) WorkerGRPCAddress ¶
func (s *FunctionalTestBase) WorkerGRPCAddress() string
type HistoryConfig ¶
HistoryConfig contains configs for history service
type MatchingBehavior ¶
MatchingBehavior describes a test scenario for matching service behavior.
func AllMatchingBehaviors ¶
func AllMatchingBehaviors() []MatchingBehavior
AllMatchingBehaviors returns all 8 combinations of matching behaviors for testing.
func (MatchingBehavior) InjectHooks ¶
func (b MatchingBehavior) InjectHooks(env Env)
InjectHooks injects the test hooks for this matching behavior.
func (MatchingBehavior) Name ¶
func (b MatchingBehavior) Name() string
Name returns a descriptive name for this behavior combination.
func (MatchingBehavior) Options ¶
func (b MatchingBehavior) Options() []TestOption
Options returns the TestOptions to configure matching behavior.
type MatchingConfig ¶
type MatchingConfig struct {
NumMatchingHosts int
}
MatchingConfig is the config for the matching service
type MessageHandler ¶
type MessageHandler func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error)
type PersistenceTestBaseFactory ¶
type PersistenceTestBaseFactory interface {
NewTestBase(options *persistencetests.TestBaseOptions) *persistencetests.TestBase
}
type PollAndProcessWorkflowTaskOptionFunc ¶
type PollAndProcessWorkflowTaskOptionFunc func(*PollAndProcessWorkflowTaskOptions)
func WithExpectedAttemptCount ¶
func WithExpectedAttemptCount(c int) PollAndProcessWorkflowTaskOptionFunc
func WithRetries ¶
func WithRetries(c int) PollAndProcessWorkflowTaskOptionFunc
type PollAndProcessWorkflowTaskResponse ¶
type PollAndProcessWorkflowTaskResponse struct {
IsQueryTask bool
NewTask *workflowservice.RespondWorkflowTaskCompletedResponse
}
type QueryHandler ¶
type QueryHandler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*commonpb.Payloads, error)
type RecordedTask ¶
type RecordedTask struct {
Timestamp time.Time `json:"timestamp"`
TaskType string `json:"taskType"` // The specific task type (e.g., "TASK_TYPE_ACTIVITY_RETRY_TIMER")
ShardID int32 `json:"shardId"`
RangeID int64 `json:"rangeId,omitempty"`
NamespaceID string `json:"namespaceId"`
WorkflowID string `json:"workflowId"`
RunID string `json:"runId"`
Task tasks.Task `json:"task"` // The actual task object
}
RecordedTask wraps a task with metadata about when and where it was written
type ReplicationStreamRecorder ¶
type ReplicationStreamRecorder struct {
// contains filtered or unexported fields
}
ReplicationStreamRecorder captures replication stream messages for testing
func NewReplicationStreamRecorder ¶
func NewReplicationStreamRecorder() *ReplicationStreamRecorder
func (*ReplicationStreamRecorder) Clear ¶
func (r *ReplicationStreamRecorder) Clear()
func (*ReplicationStreamRecorder) GetMessages ¶
func (r *ReplicationStreamRecorder) GetMessages() []CapturedReplicationMessage
func (*ReplicationStreamRecorder) SetOutputFile ¶
func (r *ReplicationStreamRecorder) SetOutputFile(filePath string)
SetOutputFile sets the file path for writing captured messages on-demand
func (*ReplicationStreamRecorder) StreamInterceptor ¶
func (r *ReplicationStreamRecorder) StreamInterceptor(clusterName string) grpc.StreamClientInterceptor
StreamInterceptor returns a gRPC stream client interceptor that captures stream messages
func (*ReplicationStreamRecorder) StreamServerInterceptor ¶
func (r *ReplicationStreamRecorder) StreamServerInterceptor(clusterName string) grpc.StreamServerInterceptor
StreamServerInterceptor returns a gRPC stream server interceptor that captures stream messages
func (*ReplicationStreamRecorder) UnaryInterceptor ¶
func (r *ReplicationStreamRecorder) UnaryInterceptor(clusterName string) grpc.UnaryClientInterceptor
UnaryInterceptor returns a gRPC unary client interceptor that captures messages
func (*ReplicationStreamRecorder) UnaryServerInterceptor ¶
func (r *ReplicationStreamRecorder) UnaryServerInterceptor(clusterName string) grpc.UnaryServerInterceptor
UnaryServerInterceptor returns a gRPC unary server interceptor that captures messages
func (*ReplicationStreamRecorder) WriteToLog ¶
func (r *ReplicationStreamRecorder) WriteToLog() error
WriteToLog writes all captured messages to the configured output file
type TaskFilter ¶
type TaskFilter struct {
NamespaceID string // Required: namespace ID to filter by
WorkflowID string // Optional: workflow ID to filter by (empty string means no filter)
RunID string // Optional: run ID to filter by (empty string means no filter)
}
TaskFilter specifies criteria for filtering recorded tasks
type TaskMatcher ¶
type TaskMatcher func(RecordedTask) bool
TaskMatcher is a function that tests whether a RecordedTask matches some criteria
type TaskPoller
deprecated
type TaskPoller struct {
Client workflowservice.WorkflowServiceClient
Namespace string
TaskQueue *taskqueuepb.TaskQueue
StickyTaskQueue *taskqueuepb.TaskQueue
StickyScheduleToStartTimeout time.Duration
Identity string
WorkflowTaskHandler WorkflowTaskHandler
ActivityTaskHandler ActivityTaskHandler
QueryHandler QueryHandler
MessageHandler MessageHandler
Logger log.Logger
T *testing.T
}
Deprecated: TaskPoller is deprecated. Use taskpoller.TaskPoller instead. TaskPoller is used in functional tests to poll workflow or activity task queues.
func (*TaskPoller) HandlePartialWorkflowTask ¶
func (p *TaskPoller) HandlePartialWorkflowTask(response *workflowservice.PollWorkflowTaskQueueResponse, forceCreateNewWorkflowTask bool) (*workflowservice.RespondWorkflowTaskCompletedResponse, error)
HandlePartialWorkflowTask for workflow task
func (*TaskPoller) PollAndProcessActivityTask ¶
func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error
PollAndProcessActivityTask for activity tasks
func (*TaskPoller) PollAndProcessActivityTaskWithID ¶
func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error
PollAndProcessActivityTaskWithID is similar to PollAndProcessActivityTask but uses the RespondActivityTask...ByID APIs.
func (*TaskPoller) PollAndProcessWorkflowTask ¶
func (p *TaskPoller) PollAndProcessWorkflowTask(funcs ...PollAndProcessWorkflowTaskOptionFunc) (res PollAndProcessWorkflowTaskResponse, err error)
func (*TaskPoller) PollAndProcessWorkflowTaskWithOptions ¶
func (p *TaskPoller) PollAndProcessWorkflowTaskWithOptions(opts *PollAndProcessWorkflowTaskOptions) (res PollAndProcessWorkflowTaskResponse, err error)
type TaskQueueRecorder ¶
type TaskQueueRecorder struct {
// contains filtered or unexported fields
}
TaskQueueRecorder wraps an ExecutionManager to record ALL task writes to the history task queues (transfer, timer, replication, visibility, archival, etc.). This is useful for integration tests where you want to assert on what tasks were generated and in what order. Tasks are stored flattened by category - all tasks of the same type are in a single list, with each task wrapped with metadata about when/where it was written.
func NewTaskQueueRecorder ¶
func NewTaskQueueRecorder(delegate persistence.ExecutionManager, logger log.Logger) *TaskQueueRecorder
NewTaskQueueRecorder creates a recorder that wraps the given ExecutionManager
func (*TaskQueueRecorder) AddHistoryTasks ¶
func (r *TaskQueueRecorder) AddHistoryTasks( ctx context.Context, request *persistence.AddHistoryTasksRequest, ) error
AddHistoryTasks records the task write and then delegates to the underlying manager
func (*TaskQueueRecorder) AppendHistoryNodes ¶
func (r *TaskQueueRecorder) AppendHistoryNodes( ctx context.Context, request *persistence.AppendHistoryNodesRequest, ) (*persistence.AppendHistoryNodesResponse, error)
func (*TaskQueueRecorder) AppendRawHistoryNodes ¶
func (r *TaskQueueRecorder) AppendRawHistoryNodes( ctx context.Context, request *persistence.AppendRawHistoryNodesRequest, ) (*persistence.AppendHistoryNodesResponse, error)
func (*TaskQueueRecorder) Close ¶
func (r *TaskQueueRecorder) Close()
func (*TaskQueueRecorder) CompleteHistoryTask ¶
func (r *TaskQueueRecorder) CompleteHistoryTask( ctx context.Context, request *persistence.CompleteHistoryTaskRequest, ) error
func (*TaskQueueRecorder) ConflictResolveWorkflowExecution ¶
func (r *TaskQueueRecorder) ConflictResolveWorkflowExecution( ctx context.Context, request *persistence.ConflictResolveWorkflowExecutionRequest, ) (*persistence.ConflictResolveWorkflowExecutionResponse, error)
func (*TaskQueueRecorder) CountMatchingTasks ¶
func (r *TaskQueueRecorder) CountMatchingTasks(category tasks.Category, matcher TaskMatcher) int
CountMatchingTasks returns the count of tasks in a category that match the given matcher
func (*TaskQueueRecorder) CountTasksForNamespace ¶
func (r *TaskQueueRecorder) CountTasksForNamespace( category tasks.Category, namespaceID string, matcher TaskMatcher, ) int
CountTasksForNamespace returns the count of tasks in a category for a specific namespace
func (*TaskQueueRecorder) CountTasksForWorkflow ¶
func (r *TaskQueueRecorder) CountTasksForWorkflow( category tasks.Category, namespaceID string, workflowID string, runID string, matcher TaskMatcher, ) int
CountTasksForWorkflow returns the count of tasks in a category for a specific workflow. If namespaceID is empty, it matches any namespace. If runID is empty, it matches any runID for the given workflowID.
func (*TaskQueueRecorder) CreateWorkflowExecution ¶
func (r *TaskQueueRecorder) CreateWorkflowExecution( ctx context.Context, request *persistence.CreateWorkflowExecutionRequest, ) (*persistence.CreateWorkflowExecutionResponse, error)
func (*TaskQueueRecorder) DeleteCurrentWorkflowExecution ¶
func (r *TaskQueueRecorder) DeleteCurrentWorkflowExecution( ctx context.Context, request *persistence.DeleteCurrentWorkflowExecutionRequest, ) error
func (*TaskQueueRecorder) DeleteHistoryBranch ¶
func (r *TaskQueueRecorder) DeleteHistoryBranch( ctx context.Context, request *persistence.DeleteHistoryBranchRequest, ) error
func (*TaskQueueRecorder) DeleteReplicationTaskFromDLQ ¶
func (r *TaskQueueRecorder) DeleteReplicationTaskFromDLQ( ctx context.Context, request *persistence.DeleteReplicationTaskFromDLQRequest, ) error
func (*TaskQueueRecorder) DeleteWorkflowExecution ¶
func (r *TaskQueueRecorder) DeleteWorkflowExecution( ctx context.Context, request *persistence.DeleteWorkflowExecutionRequest, ) error
func (*TaskQueueRecorder) ForkHistoryBranch ¶
func (r *TaskQueueRecorder) ForkHistoryBranch( ctx context.Context, request *persistence.ForkHistoryBranchRequest, ) (*persistence.ForkHistoryBranchResponse, error)
func (*TaskQueueRecorder) GetAllHistoryTreeBranches ¶
func (r *TaskQueueRecorder) GetAllHistoryTreeBranches( ctx context.Context, request *persistence.GetAllHistoryTreeBranchesRequest, ) (*persistence.GetAllHistoryTreeBranchesResponse, error)
func (*TaskQueueRecorder) GetAllRecordedTasks ¶
func (r *TaskQueueRecorder) GetAllRecordedTasks() map[tasks.Category][]RecordedTask
GetAllRecordedTasks returns all recorded tasks WITH metadata, grouped by category
func (*TaskQueueRecorder) GetAllTasks ¶
func (r *TaskQueueRecorder) GetAllTasks() map[tasks.Category][]tasks.Task
GetAllTasks returns all tasks grouped by category (unwrapped, without metadata)
func (*TaskQueueRecorder) GetCurrentExecution ¶
func (r *TaskQueueRecorder) GetCurrentExecution( ctx context.Context, request *persistence.GetCurrentExecutionRequest, ) (*persistence.GetCurrentExecutionResponse, error)
func (*TaskQueueRecorder) GetHistoryBranchUtil ¶
func (r *TaskQueueRecorder) GetHistoryBranchUtil() persistence.HistoryBranchUtil
func (*TaskQueueRecorder) GetHistoryTasks ¶
func (r *TaskQueueRecorder) GetHistoryTasks( ctx context.Context, request *persistence.GetHistoryTasksRequest, ) (*persistence.GetHistoryTasksResponse, error)
func (*TaskQueueRecorder) GetName ¶
func (r *TaskQueueRecorder) GetName() string
func (*TaskQueueRecorder) GetRecordedTasksByCategoryFiltered ¶
func (r *TaskQueueRecorder) GetRecordedTasksByCategoryFiltered(category tasks.Category, filter TaskFilter) []RecordedTask
GetRecordedTasksByCategoryFiltered returns recorded tasks WITH metadata for a specific category, filtered by namespace (required) and optionally by workflow ID and run ID. This is the preferred API for tests to ensure tasks are properly scoped.
func (*TaskQueueRecorder) GetReplicationTasksFromDLQ ¶
func (r *TaskQueueRecorder) GetReplicationTasksFromDLQ( ctx context.Context, request *persistence.GetReplicationTasksFromDLQRequest, ) (*persistence.GetHistoryTasksResponse, error)
func (*TaskQueueRecorder) GetWorkflowExecution ¶
func (r *TaskQueueRecorder) GetWorkflowExecution( ctx context.Context, request *persistence.GetWorkflowExecutionRequest, ) (*persistence.GetWorkflowExecutionResponse, error)
func (*TaskQueueRecorder) IsReplicationDLQEmpty ¶
func (r *TaskQueueRecorder) IsReplicationDLQEmpty( ctx context.Context, request *persistence.GetReplicationTasksFromDLQRequest, ) (bool, error)
func (*TaskQueueRecorder) ListConcreteExecutions ¶
func (r *TaskQueueRecorder) ListConcreteExecutions( ctx context.Context, request *persistence.ListConcreteExecutionsRequest, ) (*persistence.ListConcreteExecutionsResponse, error)
func (*TaskQueueRecorder) MatchTasks ¶
func (r *TaskQueueRecorder) MatchTasks(category tasks.Category, matcher TaskMatcher) []RecordedTask
MatchTasks returns all tasks in a category that match the given matcher function
func (*TaskQueueRecorder) MatchTasksForNamespace ¶
func (r *TaskQueueRecorder) MatchTasksForNamespace( category tasks.Category, namespaceID string, matcher TaskMatcher, ) []RecordedTask
MatchTasksForNamespace returns all tasks in a category for a specific namespace
func (*TaskQueueRecorder) MatchTasksForWorkflow ¶
func (r *TaskQueueRecorder) MatchTasksForWorkflow( category tasks.Category, namespaceID string, workflowID string, runID string, matcher TaskMatcher, ) []RecordedTask
MatchTasksForWorkflow returns all tasks in a category for a specific workflow. If namespaceID is empty, it matches any namespace. If runID is empty, it matches any runID for the given workflowID.
func (*TaskQueueRecorder) PutReplicationTaskToDLQ ¶
func (r *TaskQueueRecorder) PutReplicationTaskToDLQ( ctx context.Context, request *persistence.PutReplicationTaskToDLQRequest, ) error
func (*TaskQueueRecorder) RangeCompleteHistoryTasks ¶
func (r *TaskQueueRecorder) RangeCompleteHistoryTasks( ctx context.Context, request *persistence.RangeCompleteHistoryTasksRequest, ) error
func (*TaskQueueRecorder) RangeDeleteReplicationTaskFromDLQ ¶
func (r *TaskQueueRecorder) RangeDeleteReplicationTaskFromDLQ( ctx context.Context, request *persistence.RangeDeleteReplicationTaskFromDLQRequest, ) error
func (*TaskQueueRecorder) ReadHistoryBranch ¶
func (r *TaskQueueRecorder) ReadHistoryBranch( ctx context.Context, request *persistence.ReadHistoryBranchRequest, ) (*persistence.ReadHistoryBranchResponse, error)
func (*TaskQueueRecorder) ReadHistoryBranchByBatch ¶
func (r *TaskQueueRecorder) ReadHistoryBranchByBatch( ctx context.Context, request *persistence.ReadHistoryBranchRequest, ) (*persistence.ReadHistoryBranchByBatchResponse, error)
func (*TaskQueueRecorder) ReadHistoryBranchReverse ¶
func (r *TaskQueueRecorder) ReadHistoryBranchReverse( ctx context.Context, request *persistence.ReadHistoryBranchReverseRequest, ) (*persistence.ReadHistoryBranchReverseResponse, error)
func (*TaskQueueRecorder) ReadRawHistoryBranch ¶
func (r *TaskQueueRecorder) ReadRawHistoryBranch( ctx context.Context, request *persistence.ReadHistoryBranchRequest, ) (*persistence.ReadRawHistoryBranchResponse, error)
func (*TaskQueueRecorder) SetWorkflowExecution ¶
func (r *TaskQueueRecorder) SetWorkflowExecution( ctx context.Context, request *persistence.SetWorkflowExecutionRequest, ) (*persistence.SetWorkflowExecutionResponse, error)
func (*TaskQueueRecorder) TrimHistoryBranch ¶
func (r *TaskQueueRecorder) TrimHistoryBranch( ctx context.Context, request *persistence.TrimHistoryBranchRequest, ) (*persistence.TrimHistoryBranchResponse, error)
func (*TaskQueueRecorder) UpdateWorkflowExecution ¶
func (r *TaskQueueRecorder) UpdateWorkflowExecution( ctx context.Context, request *persistence.UpdateWorkflowExecutionRequest, ) (*persistence.UpdateWorkflowExecutionResponse, error)
func (*TaskQueueRecorder) WriteToLog ¶
func (r *TaskQueueRecorder) WriteToLog(filePath string) error
WriteToLog writes all captured tasks to a file in JSON format
type TemporalImpl ¶
type TemporalImpl struct {
// contains filtered or unexported fields
}
func (*TemporalImpl) AdminClient ¶
func (c *TemporalImpl) AdminClient() adminservice.AdminServiceClient
func (*TemporalImpl) Authorize ¶
func (c *TemporalImpl) Authorize( ctx context.Context, caller *authorization.Claims, target *authorization.CallTarget, ) (authorization.Result, error)
func (*TemporalImpl) CaptureMetricsHandler ¶
func (c *TemporalImpl) CaptureMetricsHandler() *metricstest.CaptureHandler
func (*TemporalImpl) ChasmEngine ¶
func (c *TemporalImpl) ChasmEngine() (chasm.Engine, error)
func (*TemporalImpl) ChasmVisibilityManager ¶
func (c *TemporalImpl) ChasmVisibilityManager() chasm.VisibilityManager
func (*TemporalImpl) DcClient ¶
func (c *TemporalImpl) DcClient() *dynamicconfig.MemoryClient
func (*TemporalImpl) FrontendClient ¶
func (c *TemporalImpl) FrontendClient() workflowservice.WorkflowServiceClient
func (*TemporalImpl) FrontendGRPCAddress ¶
func (c *TemporalImpl) FrontendGRPCAddress() string
func (*TemporalImpl) FrontendHTTPAddress ¶
func (c *TemporalImpl) FrontendHTTPAddress() string
func (*TemporalImpl) GetCHASMRegistry ¶
func (c *TemporalImpl) GetCHASMRegistry() *chasm.Registry
func (*TemporalImpl) GetClaims ¶
func (c *TemporalImpl) GetClaims(authInfo *authorization.AuthInfo) (*authorization.Claims, error)
func (*TemporalImpl) GetExecutionManager ¶
func (c *TemporalImpl) GetExecutionManager() persistence.ExecutionManager
func (*TemporalImpl) GetGrpcClientInterceptor ¶
func (c *TemporalImpl) GetGrpcClientInterceptor() *grpcinject.Interceptor
func (*TemporalImpl) GetMetricsHandler ¶
func (c *TemporalImpl) GetMetricsHandler() metrics.Handler
func (*TemporalImpl) GetTLSConfigProvider ¶
func (c *TemporalImpl) GetTLSConfigProvider() encryption.TLSConfigProvider
func (*TemporalImpl) GetTaskCategoryRegistry ¶
func (c *TemporalImpl) GetTaskCategoryRegistry() tasks.TaskCategoryRegistry
func (*TemporalImpl) GetTaskQueueRecorder ¶
func (c *TemporalImpl) GetTaskQueueRecorder() *TaskQueueRecorder
func (*TemporalImpl) HistoryClient ¶
func (c *TemporalImpl) HistoryClient() historyservice.HistoryServiceClient
func (*TemporalImpl) MatchingClient ¶
func (c *TemporalImpl) MatchingClient() matchingservice.MatchingServiceClient
func (*TemporalImpl) NamespaceRegistries ¶
func (c *TemporalImpl) NamespaceRegistries() []namespace.Registry
func (*TemporalImpl) OperatorClient ¶
func (c *TemporalImpl) OperatorClient() operatorservice.OperatorServiceClient
func (*TemporalImpl) RemoteFrontendGRPCAddress ¶
func (c *TemporalImpl) RemoteFrontendGRPCAddress() string
Use this to get an address for a remote cluster to connect to.
func (*TemporalImpl) SchedulerClient ¶
func (c *TemporalImpl) SchedulerClient() schedulerpb.SchedulerServiceClient
func (*TemporalImpl) SetOnAuthorize ¶
func (c *TemporalImpl) SetOnAuthorize( fn func(context.Context, *authorization.Claims, *authorization.CallTarget) (authorization.Result, error), )
func (*TemporalImpl) SetOnGetClaims ¶
func (c *TemporalImpl) SetOnGetClaims(fn func(*authorization.AuthInfo) (*authorization.Claims, error))
func (*TemporalImpl) SetTaskQueueRecorder ¶
func (c *TemporalImpl) SetTaskQueueRecorder(recorder *TaskQueueRecorder)
func (*TemporalImpl) Start ¶
func (c *TemporalImpl) Start() error
func (*TemporalImpl) Stop ¶
func (c *TemporalImpl) Stop() error
func (*TemporalImpl) TlsConfigProvider ¶
func (c *TemporalImpl) TlsConfigProvider() *encryption.FixedTLSConfigProvider
func (*TemporalImpl) WorkerGRPCAddress ¶
func (c *TemporalImpl) WorkerGRPCAddress() string
type TemporalParams ¶
type TemporalParams struct {
ClusterMetadataConfig *cluster.Config
PersistenceConfig config.Persistence
MetadataMgr persistence.MetadataManager
ClusterMetadataManager persistence.ClusterMetadataManager
ShardMgr persistence.ShardManager
ExecutionManager persistence.ExecutionManager
TaskMgr persistence.TaskManager
NamespaceReplicationQueue persistence.NamespaceReplicationQueue
AbstractDataStoreFactory persistenceClient.AbstractDataStoreFactory
VisibilityStoreFactory visibility.VisibilityStoreFactory
Logger log.Logger
ArchiverMetadata carchiver.ArchivalMetadata
ArchiverProvider provider.ArchiverProvider
EnableReadHistoryFromArchival bool
FrontendConfig FrontendConfig
HistoryConfig HistoryConfig
MatchingConfig MatchingConfig
WorkerConfig WorkerConfig
ESConfig *esclient.Config
ESClient esclient.Client
MockAdminClient map[string]adminservice.AdminServiceClient
NamespaceReplicationTaskExecutor nsreplication.TaskExecutor
DynamicConfigOverrides map[dynamicconfig.Key]any
TLSConfigProvider *encryption.FixedTLSConfigProvider
CaptureMetricsHandler *metricstest.CaptureHandler
// ServiceFxOptions is populated by WithFxOptionsForService.
ServiceFxOptions map[primitives.ServiceName][]fx.Option
TaskCategoryRegistry tasks.TaskCategoryRegistry
HostsByProtocolByService map[transferProtocol]map[primitives.ServiceName]static.Hosts
SpanExporters map[telemetry.SpanExporterType]sdktrace.SpanExporter
}
TemporalParams contains everything needed to bootstrap Temporal
type TestCluster ¶
type TestCluster struct {
// contains filtered or unexported fields
}
TestCluster is a testcore struct for functional tests
func (*TestCluster) AdminClient ¶
func (tc *TestCluster) AdminClient() adminservice.AdminServiceClient
func (*TestCluster) ArchiverBase ¶
func (tc *TestCluster) ArchiverBase() *ArchiverBase
func (*TestCluster) ClusterName ¶
func (tc *TestCluster) ClusterName() string
func (*TestCluster) ExecutionManager ¶
func (tc *TestCluster) ExecutionManager() persistence.ExecutionManager
ExecutionManager returns the execution manager from the test cluster
func (*TestCluster) FrontendClient ¶
func (tc *TestCluster) FrontendClient() workflowservice.WorkflowServiceClient
func (*TestCluster) GetReplicationStreamRecorder ¶
func (tc *TestCluster) GetReplicationStreamRecorder() *ReplicationStreamRecorder
func (*TestCluster) GetTaskQueueRecorder ¶
func (tc *TestCluster) GetTaskQueueRecorder() *TaskQueueRecorder
func (*TestCluster) HistoryClient ¶
func (tc *TestCluster) HistoryClient() historyservice.HistoryServiceClient
HistoryClient returns a history client from the test cluster
func (*TestCluster) Host ¶
func (tc *TestCluster) Host() *TemporalImpl
TODO (alex): expose only needed objects from TemporalImpl.
func (*TestCluster) MatchingClient ¶
func (tc *TestCluster) MatchingClient() matchingservice.MatchingServiceClient
MatchingClient returns a matching client from the test cluster
func (*TestCluster) OperatorClient ¶
func (tc *TestCluster) OperatorClient() operatorservice.OperatorServiceClient
func (*TestCluster) OverrideDynamicConfig ¶
func (tc *TestCluster) OverrideDynamicConfig(t *testing.T, key dynamicconfig.GenericSetting, value any) (cleanup func())
func (*TestCluster) SchedulerClient ¶
func (tc *TestCluster) SchedulerClient() schedulerpb.SchedulerServiceClient
SchedulerClient returns a scheduler client from the test cluster
func (*TestCluster) TearDownCluster ¶
func (tc *TestCluster) TearDownCluster() error
TearDownCluster tears down the test cluster
func (*TestCluster) TestBase ¶
func (tc *TestCluster) TestBase() *persistencetests.TestBase
TODO (alex): remove this method. Replace usages with concrete methods.
func (*TestCluster) WorkerGRPCAddress ¶
func (tc *TestCluster) WorkerGRPCAddress() string
type TestClusterConfig ¶
type TestClusterConfig struct {
EnableArchival bool
IsMasterCluster bool
ClusterMetadata cluster.Config
Persistence persistencetests.TestBaseOptions
FrontendConfig FrontendConfig
HistoryConfig HistoryConfig
MatchingConfig MatchingConfig
WorkerConfig WorkerConfig
ESConfig *esclient.Config
MockAdminClient map[string]adminservice.AdminServiceClient
FaultInjection *config.FaultInjection
DynamicConfigOverrides map[dynamicconfig.Key]any
EnableMTLS bool
EnableMetricsCapture bool
SpanExporters map[telemetry.SpanExporterType]sdktrace.SpanExporter
// ServiceFxOptions can be populated using WithFxOptionsForService.
ServiceFxOptions map[primitives.ServiceName][]fx.Option
}
TestClusterConfig is the config for a test cluster
type TestClusterFactory ¶
type TestClusterFactory interface {
NewCluster(t *testing.T, clusterConfig *TestClusterConfig, logger log.Logger) (*TestCluster, error)
}
func NewTestClusterFactory ¶
func NewTestClusterFactory() TestClusterFactory
func NewTestClusterFactoryWithCustomTestBaseFactory ¶
func NewTestClusterFactoryWithCustomTestBaseFactory(tbFactory PersistenceTestBaseFactory) TestClusterFactory
type TestClusterOption ¶
type TestClusterOption func(params *TestClusterParams)
func WithArchivalEnabled ¶
func WithArchivalEnabled() TestClusterOption
func WithDynamicConfigOverrides ¶
func WithDynamicConfigOverrides(overrides map[dynamicconfig.Key]any) TestClusterOption
func WithFaultInjectionConfig ¶
func WithFaultInjectionConfig(cfg *config.FaultInjection) TestClusterOption
func WithFxOptionsForService ¶
func WithFxOptionsForService(serviceName primitives.ServiceName, options ...fx.Option) TestClusterOption
WithFxOptionsForService returns an Option which, when passed as an argument to setupSuite, will append the given list of fx options to the end of the arguments to the fx.New call for the given service. For example, if you want to obtain the shard controller for the history service, you can do this:
var shardController shard.Controller
s.setupSuite(t, tests.WithFxOptionsForService(primitives.HistoryService, fx.Populate(&shardController)))
// now you can use shardController during your test
This is similar to the pattern of plumbing dependencies through the TestClusterConfig, but it's much more convenient, scalable and flexible. The reason we need to do this on a per-service basis is that there are separate fx apps for each one.
func WithMTLS ¶
func WithMTLS() TestClusterOption
func WithNumHistoryShards ¶
func WithNumHistoryShards(n int32) TestClusterOption
func WithSharedCluster ¶
func WithSharedCluster() TestClusterOption
type TestClusterParams ¶
type TestClusterParams struct {
ServiceOptions map[primitives.ServiceName][]fx.Option
DynamicConfigOverrides map[dynamicconfig.Key]any
ArchivalEnabled bool
EnableMTLS bool
FaultInjectionConfig *config.FaultInjection
NumHistoryShards int32
}
TestClusterParams contains the variables which are used to configure test cluster via the TestClusterOption type.
func ApplyTestClusterOptions ¶
func ApplyTestClusterOptions(options []TestClusterOption) TestClusterParams
type TestDataConverter ¶
type TestDataConverter struct {
NumOfCallToPayloads int // for testing to know testDataConverter is called as expected
NumOfCallFromPayloads int
}
TestDataConverter implements encoded.DataConverter using gob
func (*TestDataConverter) FromPayload ¶
func (tdc *TestDataConverter) FromPayload(payload *commonpb.Payload, valuePtr any) error
func (*TestDataConverter) FromPayloads ¶
func (tdc *TestDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...any) error
func (*TestDataConverter) ToPayload ¶
func (tdc *TestDataConverter) ToPayload(value any) (*commonpb.Payload, error)
func (*TestDataConverter) ToPayloads ¶
func (tdc *TestDataConverter) ToPayloads(values ...any) (*commonpb.Payloads, error)
type TestEnv ¶
type TestEnv struct {
*FunctionalTestBase
*require.Assertions
historyrequire.HistoryRequire
Logger log.Logger
// contains filtered or unexported fields
}
func NewEnv ¶
func NewEnv(t *testing.T, opts ...TestOption) *TestEnv
NewEnv creates a new test environment with access to a Temporal cluster.
By default, tests are marked as parallel. Use MustRunSequential on the test's parent `testing.T` to run them sequentially instead.
func (*TestEnv) Context ¶
Context returns the test-level timeout context with RPC version headers already included. This context will be canceled when the test timeout occurs. Use this directly for all RPC operations - no need to wrap with NewContext or add headers manually.
For custom timeouts, use:
ctx, cancel := context.WithTimeout(env.Context(), 10*time.Second)
defer cancel()
func (*TestEnv) InjectHook ¶
InjectHook sets a test hook inside the cluster.
It auto-detects the scope from the hook:
- For namespace-scoped hooks: scopes it to the test's namespace.
- For global hooks: requires a dedicated cluster (fails early if used on a shared cluster).
func (*TestEnv) NamespaceID ¶
func (*TestEnv) OverrideDynamicConfig ¶
func (e *TestEnv) OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())
OverrideDynamicConfig overrides a dynamic config setting for the duration of this test. For settings that can be namespace-scoped, a namespace constraint is applied. All others cannot be applied to a shared cluster and require `WithDedicatedCluster`.
func (*TestEnv) SdkClient ¶
SdkClient returns the SDK client created by WithSdkWorker. Panics if WithSdkWorker was not passed to NewEnv.
func (*TestEnv) SdkWorker ¶
SdkWorker returns the SDK worker created by WithSdkWorker. Panics if WithSdkWorker was not passed to NewEnv.
func (*TestEnv) TaskPoller ¶
func (e *TestEnv) TaskPoller() *taskpoller.TaskPoller
func (*TestEnv) WorkerTaskQueue ¶
WorkerTaskQueue returns the task queue name used by the SDK Worker. Panics if WithSdkWorker was not passed to NewEnv.
type TestOption ¶
type TestOption func(*testOptions)
func WithDedicatedCluster ¶
func WithDedicatedCluster() TestOption
WithDedicatedCluster requests a dedicated (non-shared) cluster for the test. Use this for tests that have cluster-global side effects.
func WithDynamicConfig ¶
func WithDynamicConfig(setting dynamicconfig.GenericSetting, value any) TestOption
WithDynamicConfig overrides a dynamic config setting for the test. For settings that can be namespace-scoped, a namespace constraint is applied. For all others that require a dedicated cluster, this implies `WithDedicatedCluster`.
func WithSdkWorker ¶
func WithSdkWorker() TestOption
WithSdkWorker sets up an SDK client and worker for the test. Cleanup is handled automatically via t.Cleanup().
func WithTimeout ¶
func WithTimeout(duration time.Duration) TestOption
WithTimeout sets a custom timeout for the test. The test will fail if it runs longer than this duration. The timeout is multiplied by debug.TimeoutMultiplier when debugging. The TEMPORAL_TEST_TIMEOUT environment variable can also set the default timeout in seconds.
type WorkerConfig ¶
WorkerConfig is the config for the worker service
type WorkflowTaskHandler ¶
type WorkflowTaskHandler func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error)