testcore

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2026 License: MIT Imports: 125 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultPageSize   = 5
	PprofTestPort     = 7000
	TlsCertCommonName = "my-common-name"
	ClientSuiteLimit  = 10
	// TODO (alex): replace all sleeps with WaitForESToSettle with s.Eventually()
	WaitForESToSettle = 4 * time.Second // wait es shards for some time ensure data consistent
)
View Source
const (
	DirectionSend       = "send"
	DirectionRecv       = "recv"
	DirectionServerSend = "server_send"
	DirectionServerRecv = "server_recv"
)

Message direction constants

View Source
const NamespaceCacheRefreshInterval = time.Second

Variables

View Source
var (
	ErrEncodingIsNotSet       = errors.New("payload encoding metadata is not set")
	ErrEncodingIsNotSupported = errors.New("payload encoding is not supported")
)
View Source
var (
	ErrNoTasks = errors.New("no tasks")
)

Functions

func DecodeString

func DecodeString(t require.TestingT, pls *commonpb.Payloads) string

func EventBatchesToVersionHistory

func EventBatchesToVersionHistory(
	versionHistory *historyspb.VersionHistory,
	eventBatches []*historypb.History,
) (*historyspb.VersionHistory, error)

func ExtractReplicationMessages

func ExtractReplicationMessages(msg proto.Message) *replicationspb.WorkflowReplicationMessages

ExtractReplicationMessages extracts WorkflowReplicationMessages from a proto message. This is a helper for tests that need to inspect replication message contents.

func GetPersistenceTestDefaults

func GetPersistenceTestDefaults() persistencetests.TestBaseOptions

GetPersistenceTestDefaults returns the default persistence options based on CLI flags. Use this when creating TestClusterConfig to ensure proper database configuration.

func MustRunSequential

func MustRunSequential(t *testing.T, reason string)

MustRunSequential marks a test suite to run its tests sequentially instead of in parallel. Call this at the start of your test suite before any subtests are created. A single dedicated cluster will be created for this suite and torn down when the suite completes.

func MustToPayload

func MustToPayload(t require.TestingT, v any) *commonpb.Payload

MustToPayload converts a value to a Payload using the default data converter.

func NewContext

func NewContext(parent ...context.Context) context.Context

NewContext creates a context with default 90-second timeout and RPC headers.

NOTE: If you're using testcore.NewEnv, you can use env.Context() directly - it already includes RPC headers. This function is primarily for legacy tests or creating standalone contexts outside of the TestEnv framework.

If a parent context is provided, the returned context will be canceled when either the timeout expires OR the parent is canceled.

func NewTestDataConverter

func NewTestDataConverter() converter.DataConverter

TODO (alex): use it by default SdkCleint everywhere?

func RandomizeStr

func RandomizeStr(id string) string

func RandomizedNexusEndpoint

func RandomizedNexusEndpoint(name string) string

func UseCassandraPersistence

func UseCassandraPersistence() bool

func UseSQLVisibility

func UseSQLVisibility() bool

func WithDumpHistory

func WithDumpHistory(o *PollAndProcessWorkflowTaskOptions)

func WithForceNewWorkflowTask

func WithForceNewWorkflowTask(o *PollAndProcessWorkflowTaskOptions)

func WithNoDumpCommands

func WithNoDumpCommands(o *PollAndProcessWorkflowTaskOptions)

func WithPollSticky

func WithPollSticky(o *PollAndProcessWorkflowTaskOptions)

func WithRespondSticky

func WithRespondSticky(o *PollAndProcessWorkflowTaskOptions)

func WithoutRetries

func WithoutRetries(o *PollAndProcessWorkflowTaskOptions)

Types

type ArchiverBase

type ArchiverBase struct {
	// contains filtered or unexported fields
}

ArchiverBase is a testcore struct for archiver provider being used in functional tests

func (*ArchiverBase) HistoryURI

func (a *ArchiverBase) HistoryURI() string

func (*ArchiverBase) Metadata

func (a *ArchiverBase) Metadata() archiver.ArchivalMetadata

func (*ArchiverBase) Provider

func (a *ArchiverBase) Provider() provider.ArchiverProvider

func (*ArchiverBase) VisibilityURI

func (a *ArchiverBase) VisibilityURI() string

type CapturedReplicationMessage

type CapturedReplicationMessage struct {
	Timestamp     string          `json:"timestamp"`
	Method        string          `json:"method"`
	Direction     string          `json:"direction"`
	ClusterName   string          `json:"clusterName"`
	TargetAddress string          `json:"targetAddress"`
	MessageType   string          `json:"messageType"`
	IsStreamCall  bool            `json:"isStreamCall"`
	Request       proto.Message   `json:"-"` // Don't marshal directly
	Response      proto.Message   `json:"-"` // Don't marshal directly
	Message       json.RawMessage `json:"message,omitempty"`
}

CapturedReplicationMessage represents a captured replication message

type Env

type Env interface {
	T() *testing.T
	Namespace() namespace.Name
	NamespaceID() namespace.ID
	FrontendClient() workflowservice.WorkflowServiceClient
	AdminClient() adminservice.AdminServiceClient
	GetTestCluster() *TestCluster
	CloseShard(namespaceID string, workflowID string)
	OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())
	Context() context.Context
	InjectHook(hook testhooks.Hook) (cleanup func())
}

type FrontendConfig

type FrontendConfig struct {
	NumFrontendHosts int
}

FrontendConfig is the config for the frontend service

type FunctionalTestBase

type FunctionalTestBase struct {
	suite.Suite

	// `suite.Suite` embeds `*assert.Assertions` which, by default, makes all asserts (like `s.NoError(err)`)
	// only log the error, continue test execution, and only then fail the test.
	// This is not desired behavior in most cases. The idiomatic way to change this behavior
	// is to replace `*assert.Assertions` with `*require.Assertions` by embedding it in every test suite
	// (or base struct of every test suite).
	*require.Assertions

	protorequire.ProtoAssertions
	historyrequire.HistoryRequire
	updateutils.UpdateUtils

	Logger log.Logger
	// contains filtered or unexported fields
}

func (*FunctionalTestBase) AdminClient

func (*FunctionalTestBase) CloseShard

func (s *FunctionalTestBase) CloseShard(namespaceID string, workflowID string)

CloseShard closes the shard that contains the given workflow. This is a cluster-global operation and cannot be called on shared clusters.

func (*FunctionalTestBase) Context

func (s *FunctionalTestBase) Context() context.Context

Context returns a context with RPC headers for use in this test.

func (*FunctionalTestBase) DecodePayloadsByteSliceInt32

func (s *FunctionalTestBase) DecodePayloadsByteSliceInt32(ps *commonpb.Payloads) (r int32)

func (*FunctionalTestBase) DecodePayloadsInt

func (s *FunctionalTestBase) DecodePayloadsInt(ps *commonpb.Payloads) int

func (*FunctionalTestBase) DecodePayloadsString

func (s *FunctionalTestBase) DecodePayloadsString(ps *commonpb.Payloads) string

func (*FunctionalTestBase) DurationNear

func (s *FunctionalTestBase) DurationNear(value, target, tolerance time.Duration)

func (*FunctionalTestBase) ExternalNamespace

func (s *FunctionalTestBase) ExternalNamespace() namespace.Name

func (*FunctionalTestBase) FrontendClient

func (*FunctionalTestBase) FrontendGRPCAddress

func (s *FunctionalTestBase) FrontendGRPCAddress() string

func (*FunctionalTestBase) GetHistory

func (s *FunctionalTestBase) GetHistory(namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent

func (*FunctionalTestBase) GetHistoryFunc

func (s *FunctionalTestBase) GetHistoryFunc(namespace string, execution *commonpb.WorkflowExecution) func() []*historypb.HistoryEvent

func (*FunctionalTestBase) GetNamespaceID

func (s *FunctionalTestBase) GetNamespaceID(namespace string) string

func (*FunctionalTestBase) GetTestCluster

func (s *FunctionalTestBase) GetTestCluster() *TestCluster

func (*FunctionalTestBase) GetTestClusterConfig

func (s *FunctionalTestBase) GetTestClusterConfig() *TestClusterConfig

func (*FunctionalTestBase) HttpAPIAddress

func (s *FunctionalTestBase) HttpAPIAddress() string

func (*FunctionalTestBase) InjectHook

func (s *FunctionalTestBase) InjectHook(hook testhooks.Hook) (cleanup func())

InjectHook sets a test hook inside the cluster.

func (*FunctionalTestBase) MarkNamespaceAsDeleted

func (s *FunctionalTestBase) MarkNamespaceAsDeleted(
	nsName namespace.Name,
) error

func (*FunctionalTestBase) Namespace

func (s *FunctionalTestBase) Namespace() namespace.Name

func (*FunctionalTestBase) NamespaceID

func (s *FunctionalTestBase) NamespaceID() namespace.ID

func (*FunctionalTestBase) OperatorClient

func (*FunctionalTestBase) OverrideDynamicConfig

func (s *FunctionalTestBase) OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())

func (*FunctionalTestBase) RegisterNamespace

func (s *FunctionalTestBase) RegisterNamespace(
	nsName namespace.Name,
	retentionDays int32,
	archivalState enumspb.ArchivalState,
	historyArchivalURI string,
	visibilityArchivalURI string,
) (namespace.ID, error)

Register namespace using persistence API because:

  1. The Retention period is set to 0 for archival tests, and this can't be done through FE,
  2. Update search attributes would require an extra API call,
  3. One more extra API call would be necessary to get namespace.ID.

func (*FunctionalTestBase) RunTestWithMatchingBehavior

func (s *FunctionalTestBase) RunTestWithMatchingBehavior(subtest func())

func (*FunctionalTestBase) SdkClient

func (s *FunctionalTestBase) SdkClient() sdkclient.Client

func (*FunctionalTestBase) SdkWorker

func (s *FunctionalTestBase) SdkWorker() sdkworker.Worker

func (*FunctionalTestBase) SendSignal

func (s *FunctionalTestBase) SendSignal(nsName string, execution *commonpb.WorkflowExecution, signalName string,
	input *commonpb.Payloads, identity string) error

TODO (alex): change to nsName namespace.Name

func (*FunctionalTestBase) SetupSubTest

func (s *FunctionalTestBase) SetupSubTest()

func (*FunctionalTestBase) SetupSuite

func (s *FunctionalTestBase) SetupSuite()

func (*FunctionalTestBase) SetupSuiteWithCluster

func (s *FunctionalTestBase) SetupSuiteWithCluster(options ...TestClusterOption)

func (*FunctionalTestBase) SetupTest

func (s *FunctionalTestBase) SetupTest()

All test suites that inherit FunctionalTestBase and overwrite SetupTest must call this testcore FunctionalTestBase.SetupTest function to distribute the tests into partitions. Otherwise, the test suite will be executed multiple times in each partition.

func (*FunctionalTestBase) TaskPoller

func (s *FunctionalTestBase) TaskPoller() *taskpoller.TaskPoller

func (*FunctionalTestBase) TaskQueue

func (s *FunctionalTestBase) TaskQueue() string

func (*FunctionalTestBase) TearDownCluster

func (s *FunctionalTestBase) TearDownCluster()

func (*FunctionalTestBase) TearDownSubTest

func (s *FunctionalTestBase) TearDownSubTest()

**IMPORTANT**: When overridding this, make sure to invoke `s.FunctionalTestBase.TearDownSubTest()`.

func (*FunctionalTestBase) TearDownSuite

func (s *FunctionalTestBase) TearDownSuite()

func (*FunctionalTestBase) TearDownTest

func (s *FunctionalTestBase) TearDownTest()

**IMPORTANT**: When overridding this, make sure to invoke `s.FunctionalTestBase.TearDownTest()`.

func (*FunctionalTestBase) WaitForChannel

func (s *FunctionalTestBase) WaitForChannel(ctx context.Context, ch chan struct{})

func (*FunctionalTestBase) WorkerGRPCAddress

func (s *FunctionalTestBase) WorkerGRPCAddress() string

type HistoryConfig

type HistoryConfig struct {
	NumHistoryShards int32
	NumHistoryHosts  int
}

HistoryConfig contains configs for history service

type MatchingBehavior

type MatchingBehavior struct {
	ForceTaskForward bool
	ForcePollForward bool
	ForceAsync       bool
}

MatchingBehavior describes a test scenario for matching service behavior.

func AllMatchingBehaviors

func AllMatchingBehaviors() []MatchingBehavior

AllMatchingBehaviors returns all 8 combinations of matching behaviors for testing.

func (MatchingBehavior) InjectHooks

func (b MatchingBehavior) InjectHooks(env Env)

InjectHooks injects the test hooks for this matching behavior.

func (MatchingBehavior) Name

func (b MatchingBehavior) Name() string

Name returns a descriptive name for this behavior combination.

func (MatchingBehavior) Options

func (b MatchingBehavior) Options() []TestOption

Options returns the TestOptions to configure matching behavior.

type MatchingConfig

type MatchingConfig struct {
	NumMatchingHosts int
}

MatchingConfig is the config for the matching service

type PersistenceTestBaseFactory

type PersistenceTestBaseFactory interface {
	NewTestBase(options *persistencetests.TestBaseOptions) *persistencetests.TestBase
}

type PollAndProcessWorkflowTaskOptionFunc

type PollAndProcessWorkflowTaskOptionFunc func(*PollAndProcessWorkflowTaskOptions)

type PollAndProcessWorkflowTaskOptions

type PollAndProcessWorkflowTaskOptions struct {
	DumpHistory          bool
	DumpCommands         bool
	DropTask             bool
	PollSticky           bool
	RespondSticky        bool
	ExpectedAttemptCount int
	Retries              int
	ForceNewWorkflowTask bool
	QueryResult          *querypb.WorkflowQueryResult
}

type PollAndProcessWorkflowTaskResponse

type PollAndProcessWorkflowTaskResponse struct {
	IsQueryTask bool
	NewTask     *workflowservice.RespondWorkflowTaskCompletedResponse
}

type RecordedTask

type RecordedTask struct {
	Timestamp   time.Time  `json:"timestamp"`
	TaskType    string     `json:"taskType"` // The specific task type (e.g., "TASK_TYPE_ACTIVITY_RETRY_TIMER")
	ShardID     int32      `json:"shardId"`
	RangeID     int64      `json:"rangeId,omitempty"`
	NamespaceID string     `json:"namespaceId"`
	WorkflowID  string     `json:"workflowId"`
	RunID       string     `json:"runId"`
	Task        tasks.Task `json:"task"` // The actual task object
}

RecordedTask wraps a task with metadata about when and where it was written

type ReplicationStreamRecorder

type ReplicationStreamRecorder struct {
	// contains filtered or unexported fields
}

ReplicationStreamRecorder captures replication stream messages for testing

func NewReplicationStreamRecorder

func NewReplicationStreamRecorder() *ReplicationStreamRecorder

func (*ReplicationStreamRecorder) Clear

func (r *ReplicationStreamRecorder) Clear()

func (*ReplicationStreamRecorder) GetMessages

func (*ReplicationStreamRecorder) SetOutputFile

func (r *ReplicationStreamRecorder) SetOutputFile(filePath string)

SetOutputFile sets the file path for writing captured messages on-demand

func (*ReplicationStreamRecorder) StreamInterceptor

func (r *ReplicationStreamRecorder) StreamInterceptor(clusterName string) grpc.StreamClientInterceptor

StreamInterceptor returns a gRPC stream client interceptor that captures stream messages

func (*ReplicationStreamRecorder) StreamServerInterceptor

func (r *ReplicationStreamRecorder) StreamServerInterceptor(clusterName string) grpc.StreamServerInterceptor

StreamServerInterceptor returns a gRPC stream server interceptor that captures stream messages

func (*ReplicationStreamRecorder) UnaryInterceptor

func (r *ReplicationStreamRecorder) UnaryInterceptor(clusterName string) grpc.UnaryClientInterceptor

UnaryInterceptor returns a gRPC unary client interceptor that captures messages

func (*ReplicationStreamRecorder) UnaryServerInterceptor

func (r *ReplicationStreamRecorder) UnaryServerInterceptor(clusterName string) grpc.UnaryServerInterceptor

UnaryServerInterceptor returns a gRPC unary server interceptor that captures messages

func (*ReplicationStreamRecorder) WriteToLog

func (r *ReplicationStreamRecorder) WriteToLog() error

WriteToLog writes all captured messages to the configured output file

type TaskFilter

type TaskFilter struct {
	NamespaceID string // Required: namespace ID to filter by
	WorkflowID  string // Optional: workflow ID to filter by (empty string means no filter)
	RunID       string // Optional: run ID to filter by (empty string means no filter)
}

TaskFilter specifies criteria for filtering recorded tasks

type TaskMatcher

type TaskMatcher func(RecordedTask) bool

TaskMatcher is a function that tests whether a RecordedTask matches some criteria

type TaskPoller deprecated

type TaskPoller struct {
	Client                       workflowservice.WorkflowServiceClient
	Namespace                    string
	TaskQueue                    *taskqueuepb.TaskQueue
	StickyTaskQueue              *taskqueuepb.TaskQueue
	StickyScheduleToStartTimeout time.Duration
	Identity                     string
	WorkflowTaskHandler          WorkflowTaskHandler
	ActivityTaskHandler          ActivityTaskHandler
	QueryHandler                 QueryHandler
	MessageHandler               MessageHandler
	Logger                       log.Logger
	T                            *testing.T
}

Deprecated: TaskPoller is deprecated. Use taskpoller.TaskPoller instead. TaskPoller is used in functional tests to poll workflow or activity task queues.

func (*TaskPoller) HandlePartialWorkflowTask

func (p *TaskPoller) HandlePartialWorkflowTask(response *workflowservice.PollWorkflowTaskQueueResponse, forceCreateNewWorkflowTask bool) (*workflowservice.RespondWorkflowTaskCompletedResponse, error)

HandlePartialWorkflowTask for workflow task

func (*TaskPoller) PollAndProcessActivityTask

func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error

PollAndProcessActivityTask for activity tasks

func (*TaskPoller) PollAndProcessActivityTaskWithID

func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error

PollAndProcessActivityTaskWithID is similar to PollAndProcessActivityTask but using RespondActivityTask...ByID

func (*TaskPoller) PollAndProcessWorkflowTask

func (p *TaskPoller) PollAndProcessWorkflowTask(funcs ...PollAndProcessWorkflowTaskOptionFunc) (res PollAndProcessWorkflowTaskResponse, err error)

func (*TaskPoller) PollAndProcessWorkflowTaskWithOptions

func (p *TaskPoller) PollAndProcessWorkflowTaskWithOptions(opts *PollAndProcessWorkflowTaskOptions) (res PollAndProcessWorkflowTaskResponse, err error)

type TaskQueueRecorder

type TaskQueueRecorder struct {
	// contains filtered or unexported fields
}

TaskQueueRecorder wraps an ExecutionManager to record ALL task writes to the history task queues (transfer, timer, replication, visibility, archival, etc.). This is useful for integration tests where you want to assert on what tasks were generated and in what order. Tasks are stored flattened by category - all tasks of the same type are in a single list, with each task wrapped with metadata about when/where it was written.

func NewTaskQueueRecorder

func NewTaskQueueRecorder(delegate persistence.ExecutionManager, logger log.Logger) *TaskQueueRecorder

NewTaskQueueRecorder creates a recorder that wraps the given ExecutionManager

func (*TaskQueueRecorder) AddHistoryTasks

func (r *TaskQueueRecorder) AddHistoryTasks(
	ctx context.Context,
	request *persistence.AddHistoryTasksRequest,
) error

AddHistoryTasks records the task write and then delegates to the underlying manager

func (*TaskQueueRecorder) AppendHistoryNodes

func (*TaskQueueRecorder) AppendRawHistoryNodes

func (*TaskQueueRecorder) Close

func (r *TaskQueueRecorder) Close()

func (*TaskQueueRecorder) CompleteHistoryTask

func (r *TaskQueueRecorder) CompleteHistoryTask(
	ctx context.Context,
	request *persistence.CompleteHistoryTaskRequest,
) error

func (*TaskQueueRecorder) CountMatchingTasks

func (r *TaskQueueRecorder) CountMatchingTasks(category tasks.Category, matcher TaskMatcher) int

CountMatchingTasks returns the count of tasks in a category that match the given matcher

func (*TaskQueueRecorder) CountTasksForNamespace

func (r *TaskQueueRecorder) CountTasksForNamespace(
	category tasks.Category,
	namespaceID string,
	matcher TaskMatcher,
) int

CountTasksForNamespace returns the count of tasks in a category for a specific namespace

func (*TaskQueueRecorder) CountTasksForWorkflow

func (r *TaskQueueRecorder) CountTasksForWorkflow(
	category tasks.Category,
	namespaceID string,
	workflowID string,
	runID string,
	matcher TaskMatcher,
) int

CountTasksForWorkflow returns the count of tasks in a category for a specific workflow If namespaceID is empty, it matches any namespace If runID is empty, it matches any runID for the given workflowID

func (*TaskQueueRecorder) DeleteCurrentWorkflowExecution

func (r *TaskQueueRecorder) DeleteCurrentWorkflowExecution(
	ctx context.Context,
	request *persistence.DeleteCurrentWorkflowExecutionRequest,
) error

func (*TaskQueueRecorder) DeleteHistoryBranch

func (r *TaskQueueRecorder) DeleteHistoryBranch(
	ctx context.Context,
	request *persistence.DeleteHistoryBranchRequest,
) error

func (*TaskQueueRecorder) DeleteReplicationTaskFromDLQ

func (r *TaskQueueRecorder) DeleteReplicationTaskFromDLQ(
	ctx context.Context,
	request *persistence.DeleteReplicationTaskFromDLQRequest,
) error

func (*TaskQueueRecorder) DeleteWorkflowExecution

func (r *TaskQueueRecorder) DeleteWorkflowExecution(
	ctx context.Context,
	request *persistence.DeleteWorkflowExecutionRequest,
) error

func (*TaskQueueRecorder) ForkHistoryBranch

func (*TaskQueueRecorder) GetAllRecordedTasks

func (r *TaskQueueRecorder) GetAllRecordedTasks() map[tasks.Category][]RecordedTask

GetAllRecordedTasks returns all recorded tasks WITH metadata, grouped by category

func (*TaskQueueRecorder) GetAllTasks

func (r *TaskQueueRecorder) GetAllTasks() map[tasks.Category][]tasks.Task

GetAllTasks returns all tasks grouped by category (unwrapped, without metadata)

func (*TaskQueueRecorder) GetCurrentExecution

func (*TaskQueueRecorder) GetHistoryBranchUtil

func (r *TaskQueueRecorder) GetHistoryBranchUtil() persistence.HistoryBranchUtil

func (*TaskQueueRecorder) GetHistoryTasks

func (*TaskQueueRecorder) GetName

func (r *TaskQueueRecorder) GetName() string

func (*TaskQueueRecorder) GetRecordedTasksByCategoryFiltered

func (r *TaskQueueRecorder) GetRecordedTasksByCategoryFiltered(category tasks.Category, filter TaskFilter) []RecordedTask

GetRecordedTasksByCategoryFiltered returns recorded tasks WITH metadata for a specific category, filtered by namespace (required) and optionally by workflow ID and run ID. This is the preferred API for tests to ensure tasks are properly scoped.

func (*TaskQueueRecorder) GetReplicationTasksFromDLQ

func (*TaskQueueRecorder) GetWorkflowExecution

func (*TaskQueueRecorder) IsReplicationDLQEmpty

func (r *TaskQueueRecorder) IsReplicationDLQEmpty(
	ctx context.Context,
	request *persistence.GetReplicationTasksFromDLQRequest,
) (bool, error)

func (*TaskQueueRecorder) MatchTasks

func (r *TaskQueueRecorder) MatchTasks(category tasks.Category, matcher TaskMatcher) []RecordedTask

MatchTasks returns all tasks in a category that match the given matcher function

func (*TaskQueueRecorder) MatchTasksForNamespace

func (r *TaskQueueRecorder) MatchTasksForNamespace(
	category tasks.Category,
	namespaceID string,
	matcher TaskMatcher,
) []RecordedTask

MatchTasksForNamespace returns all tasks in a category for a specific namespace

func (*TaskQueueRecorder) MatchTasksForWorkflow

func (r *TaskQueueRecorder) MatchTasksForWorkflow(
	category tasks.Category,
	namespaceID string,
	workflowID string,
	runID string,
	matcher TaskMatcher,
) []RecordedTask

MatchTasksForWorkflow returns all tasks in a category for a specific workflow If namespaceID is empty, it matches any namespace If runID is empty, it matches any runID for the given workflowID

func (*TaskQueueRecorder) PutReplicationTaskToDLQ

func (r *TaskQueueRecorder) PutReplicationTaskToDLQ(
	ctx context.Context,
	request *persistence.PutReplicationTaskToDLQRequest,
) error

func (*TaskQueueRecorder) RangeCompleteHistoryTasks

func (r *TaskQueueRecorder) RangeCompleteHistoryTasks(
	ctx context.Context,
	request *persistence.RangeCompleteHistoryTasksRequest,
) error

func (*TaskQueueRecorder) RangeDeleteReplicationTaskFromDLQ

func (r *TaskQueueRecorder) RangeDeleteReplicationTaskFromDLQ(
	ctx context.Context,
	request *persistence.RangeDeleteReplicationTaskFromDLQRequest,
) error

func (*TaskQueueRecorder) ReadHistoryBranch

func (*TaskQueueRecorder) ReadHistoryBranchByBatch

func (*TaskQueueRecorder) ReadRawHistoryBranch

func (*TaskQueueRecorder) SetWorkflowExecution

func (*TaskQueueRecorder) TrimHistoryBranch

func (*TaskQueueRecorder) WriteToLog

func (r *TaskQueueRecorder) WriteToLog(filePath string) error

WriteToLog writes all captured tasks to a file in JSON format

type TemporalImpl

type TemporalImpl struct {
	// contains filtered or unexported fields
}

func (*TemporalImpl) AdminClient

func (c *TemporalImpl) AdminClient() adminservice.AdminServiceClient

func (*TemporalImpl) Authorize

func (*TemporalImpl) CaptureMetricsHandler

func (c *TemporalImpl) CaptureMetricsHandler() *metricstest.CaptureHandler

func (*TemporalImpl) ChasmEngine

func (c *TemporalImpl) ChasmEngine() (chasm.Engine, error)

func (*TemporalImpl) ChasmVisibilityManager

func (c *TemporalImpl) ChasmVisibilityManager() chasm.VisibilityManager

func (*TemporalImpl) DcClient

func (c *TemporalImpl) DcClient() *dynamicconfig.MemoryClient

func (*TemporalImpl) FrontendClient

func (*TemporalImpl) FrontendGRPCAddress

func (c *TemporalImpl) FrontendGRPCAddress() string

func (*TemporalImpl) FrontendHTTPAddress

func (c *TemporalImpl) FrontendHTTPAddress() string

func (*TemporalImpl) GetCHASMRegistry

func (c *TemporalImpl) GetCHASMRegistry() *chasm.Registry

func (*TemporalImpl) GetClaims

func (c *TemporalImpl) GetClaims(authInfo *authorization.AuthInfo) (*authorization.Claims, error)

func (*TemporalImpl) GetExecutionManager

func (c *TemporalImpl) GetExecutionManager() persistence.ExecutionManager

func (*TemporalImpl) GetGrpcClientInterceptor

func (c *TemporalImpl) GetGrpcClientInterceptor() *grpcinject.Interceptor

func (*TemporalImpl) GetMetricsHandler

func (c *TemporalImpl) GetMetricsHandler() metrics.Handler

func (*TemporalImpl) GetTLSConfigProvider

func (c *TemporalImpl) GetTLSConfigProvider() encryption.TLSConfigProvider

func (*TemporalImpl) GetTaskCategoryRegistry

func (c *TemporalImpl) GetTaskCategoryRegistry() tasks.TaskCategoryRegistry

func (*TemporalImpl) GetTaskQueueRecorder

func (c *TemporalImpl) GetTaskQueueRecorder() *TaskQueueRecorder

func (*TemporalImpl) HistoryClient

func (*TemporalImpl) MatchingClient

func (*TemporalImpl) NamespaceRegistries

func (c *TemporalImpl) NamespaceRegistries() []namespace.Registry

func (*TemporalImpl) OperatorClient

func (*TemporalImpl) RemoteFrontendGRPCAddress

func (c *TemporalImpl) RemoteFrontendGRPCAddress() string

Use this to get an address for a remote cluster to connect to.

func (*TemporalImpl) SchedulerClient

func (c *TemporalImpl) SchedulerClient() schedulerpb.SchedulerServiceClient

func (*TemporalImpl) SetOnAuthorize

func (*TemporalImpl) SetOnGetClaims

func (c *TemporalImpl) SetOnGetClaims(fn func(*authorization.AuthInfo) (*authorization.Claims, error))

func (*TemporalImpl) SetTaskQueueRecorder

func (c *TemporalImpl) SetTaskQueueRecorder(recorder *TaskQueueRecorder)

func (*TemporalImpl) Start

func (c *TemporalImpl) Start() error

func (*TemporalImpl) Stop

func (c *TemporalImpl) Stop() error

func (*TemporalImpl) TlsConfigProvider

func (c *TemporalImpl) TlsConfigProvider() *encryption.FixedTLSConfigProvider

func (*TemporalImpl) WorkerGRPCAddress

func (c *TemporalImpl) WorkerGRPCAddress() string

type TemporalParams

type TemporalParams struct {
	ClusterMetadataConfig            *cluster.Config
	PersistenceConfig                config.Persistence
	MetadataMgr                      persistence.MetadataManager
	ClusterMetadataManager           persistence.ClusterMetadataManager
	ShardMgr                         persistence.ShardManager
	ExecutionManager                 persistence.ExecutionManager
	TaskMgr                          persistence.TaskManager
	NamespaceReplicationQueue        persistence.NamespaceReplicationQueue
	AbstractDataStoreFactory         persistenceClient.AbstractDataStoreFactory
	VisibilityStoreFactory           visibility.VisibilityStoreFactory
	Logger                           log.Logger
	ArchiverMetadata                 carchiver.ArchivalMetadata
	ArchiverProvider                 provider.ArchiverProvider
	EnableReadHistoryFromArchival    bool
	FrontendConfig                   FrontendConfig
	HistoryConfig                    HistoryConfig
	MatchingConfig                   MatchingConfig
	WorkerConfig                     WorkerConfig
	ESConfig                         *esclient.Config
	ESClient                         esclient.Client
	MockAdminClient                  map[string]adminservice.AdminServiceClient
	NamespaceReplicationTaskExecutor nsreplication.TaskExecutor
	DynamicConfigOverrides           map[dynamicconfig.Key]any
	TLSConfigProvider                *encryption.FixedTLSConfigProvider
	CaptureMetricsHandler            *metricstest.CaptureHandler
	// ServiceFxOptions is populated by WithFxOptionsForService.
	ServiceFxOptions         map[primitives.ServiceName][]fx.Option
	TaskCategoryRegistry     tasks.TaskCategoryRegistry
	HostsByProtocolByService map[transferProtocol]map[primitives.ServiceName]static.Hosts
	SpanExporters            map[telemetry.SpanExporterType]sdktrace.SpanExporter
}

TemporalParams contains everything needed to bootstrap Temporal

type TestCluster

type TestCluster struct {
	// contains filtered or unexported fields
}

TestCluster is a testcore struct for functional tests

func (*TestCluster) AdminClient

func (tc *TestCluster) AdminClient() adminservice.AdminServiceClient

func (*TestCluster) ArchiverBase

func (tc *TestCluster) ArchiverBase() *ArchiverBase

func (*TestCluster) ClusterName

func (tc *TestCluster) ClusterName() string

func (*TestCluster) ExecutionManager

func (tc *TestCluster) ExecutionManager() persistence.ExecutionManager

ExecutionManager returns an execution manager factory from the test cluster

func (*TestCluster) FrontendClient

func (tc *TestCluster) FrontendClient() workflowservice.WorkflowServiceClient

func (*TestCluster) GetReplicationStreamRecorder

func (tc *TestCluster) GetReplicationStreamRecorder() *ReplicationStreamRecorder

func (*TestCluster) GetTaskQueueRecorder

func (tc *TestCluster) GetTaskQueueRecorder() *TaskQueueRecorder

func (*TestCluster) HistoryClient

func (tc *TestCluster) HistoryClient() historyservice.HistoryServiceClient

HistoryClient returns a history client from the test cluster

func (*TestCluster) Host

func (tc *TestCluster) Host() *TemporalImpl

TODO (alex): expose only needed objects from TemporalImpl.

func (*TestCluster) MatchingClient

func (tc *TestCluster) MatchingClient() matchingservice.MatchingServiceClient

MatchingClient returns a matching client from the test cluster

func (*TestCluster) OperatorClient

func (tc *TestCluster) OperatorClient() operatorservice.OperatorServiceClient

func (*TestCluster) OverrideDynamicConfig

func (tc *TestCluster) OverrideDynamicConfig(t *testing.T, key dynamicconfig.GenericSetting, value any) (cleanup func())

func (*TestCluster) SchedulerClient

func (tc *TestCluster) SchedulerClient() schedulerpb.SchedulerServiceClient

SchedulerClient returns a scheduler client from the test cluster

func (*TestCluster) TearDownCluster

func (tc *TestCluster) TearDownCluster() error

TearDownCluster tears down the test cluster

func (*TestCluster) TestBase

func (tc *TestCluster) TestBase() *persistencetests.TestBase

TODO (alex): remove this method. Replace usages with concrete methods.

func (*TestCluster) WorkerGRPCAddress

func (tc *TestCluster) WorkerGRPCAddress() string

type TestClusterConfig

type TestClusterConfig struct {
	EnableArchival         bool
	IsMasterCluster        bool
	ClusterMetadata        cluster.Config
	Persistence            persistencetests.TestBaseOptions
	FrontendConfig         FrontendConfig
	HistoryConfig          HistoryConfig
	MatchingConfig         MatchingConfig
	WorkerConfig           WorkerConfig
	ESConfig               *esclient.Config
	MockAdminClient        map[string]adminservice.AdminServiceClient
	FaultInjection         *config.FaultInjection
	DynamicConfigOverrides map[dynamicconfig.Key]any
	EnableMTLS             bool
	EnableMetricsCapture   bool
	SpanExporters          map[telemetry.SpanExporterType]sdktrace.SpanExporter
	// ServiceFxOptions can be populated using WithFxOptionsForService.
	ServiceFxOptions map[primitives.ServiceName][]fx.Option
}

TestClusterConfig are config for a test cluster

type TestClusterFactory

type TestClusterFactory interface {
	NewCluster(t *testing.T, clusterConfig *TestClusterConfig, logger log.Logger) (*TestCluster, error)
}

func NewTestClusterFactory

func NewTestClusterFactory() TestClusterFactory

func NewTestClusterFactoryWithCustomTestBaseFactory

func NewTestClusterFactoryWithCustomTestBaseFactory(tbFactory PersistenceTestBaseFactory) TestClusterFactory

type TestClusterOption

type TestClusterOption func(params *TestClusterParams)

func WithArchivalEnabled

func WithArchivalEnabled() TestClusterOption

func WithDynamicConfigOverrides

func WithDynamicConfigOverrides(overrides map[dynamicconfig.Key]any) TestClusterOption

func WithFaultInjectionConfig

func WithFaultInjectionConfig(cfg *config.FaultInjection) TestClusterOption

func WithFxOptionsForService

func WithFxOptionsForService(serviceName primitives.ServiceName, options ...fx.Option) TestClusterOption

WithFxOptionsForService returns an Option which, when passed as an argument to setupSuite, will append the given list of fx options to the end of the arguments to the fx.New call for the given service. For example, if you want to obtain the shard controller for the history service, you can do this:

var shardController shard.Controller
s.setupSuite(t, tests.WithFxOptionsForService(primitives.HistoryService, fx.Populate(&shardController)))
// now you can use shardController during your test

This is similar to the pattern of plumbing dependencies through the TestClusterConfig, but it's much more convenient, scalable and flexible. The reason we need to do this on a per-service basis is that there are separate fx apps for each one.

func WithMTLS

func WithMTLS() TestClusterOption

func WithNumHistoryShards

func WithNumHistoryShards(n int32) TestClusterOption

func WithSharedCluster

func WithSharedCluster() TestClusterOption

type TestClusterParams

type TestClusterParams struct {
	ServiceOptions         map[primitives.ServiceName][]fx.Option
	DynamicConfigOverrides map[dynamicconfig.Key]any
	ArchivalEnabled        bool
	EnableMTLS             bool
	FaultInjectionConfig   *config.FaultInjection
	NumHistoryShards       int32
	SharedCluster          bool
}

TestClusterParams contains the variables which are used to configure test cluster via the TestClusterOption type.

func ApplyTestClusterOptions

func ApplyTestClusterOptions(options []TestClusterOption) TestClusterParams

type TestDataConverter

type TestDataConverter struct {
	NumOfCallToPayloads   int // for testing to know testDataConverter is called as expected
	NumOfCallFromPayloads int
}

TestDataConverter implements encoded.DataConverter using gob

func (*TestDataConverter) FromPayload

func (tdc *TestDataConverter) FromPayload(payload *commonpb.Payload, valuePtr any) error

func (*TestDataConverter) FromPayloads

func (tdc *TestDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...any) error

func (*TestDataConverter) ToPayload

func (tdc *TestDataConverter) ToPayload(value any) (*commonpb.Payload, error)

func (*TestDataConverter) ToPayloads

func (tdc *TestDataConverter) ToPayloads(values ...any) (*commonpb.Payloads, error)

func (*TestDataConverter) ToString

func (tdc *TestDataConverter) ToString(payload *commonpb.Payload) string

func (*TestDataConverter) ToStrings

func (tdc *TestDataConverter) ToStrings(payloads *commonpb.Payloads) []string

type TestEnv

type TestEnv struct {
	*FunctionalTestBase
	*require.Assertions
	historyrequire.HistoryRequire

	Logger log.Logger
	// contains filtered or unexported fields
}

func NewEnv

func NewEnv(t *testing.T, opts ...TestOption) *TestEnv

NewEnv creates a new test environment with access to a Temporal cluster.

By default, tests are marked as parallel. Use MustRunSequential on the test's parent `testing.T` to run them sequentially instead.

func (*TestEnv) Context

func (e *TestEnv) Context() context.Context

Context returns the test-level timeout context with RPC version headers already included. This context will be canceled when the test timeout occurs. Use this directly for all RPC operations - no need to wrap with NewContext or add headers manually.

For custom timeouts, use:

ctx, cancel := context.WithTimeout(env.Context(), 10*time.Second)
defer cancel()

func (*TestEnv) InjectHook

func (e *TestEnv) InjectHook(hook testhooks.Hook) (cleanup func())

InjectHook sets a test hook inside the cluster.

It auto-detects the scope from the hook: - For namespace-scoped hooks: scopes it to the test's namespace - For global hooks: requires a dedicated cluster (fails early if used on shared cluster)

func (*TestEnv) Namespace

func (e *TestEnv) Namespace() namespace.Name

Use test env-specific namespace here for test isolation.

func (*TestEnv) NamespaceID

func (e *TestEnv) NamespaceID() namespace.ID

func (*TestEnv) OverrideDynamicConfig

func (e *TestEnv) OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())

OverrideDynamicConfig overrides a dynamic config setting for the duration of this test. For settings that can be namespace-scoped, a namespace constraint is applied. All others cannot be applied to a shared cluster and require `WithDedicatedCluster`.

func (*TestEnv) SdkClient

func (e *TestEnv) SdkClient() sdkclient.Client

SdkClient returns the SDK client created by WithSdkWorker. Panics if WithSdkWorker was not passed to NewEnv.

func (*TestEnv) SdkWorker

func (e *TestEnv) SdkWorker() sdkworker.Worker

SdkWorker returns the SDK worker created by WithSdkWorker. Panics if WithSdkWorker was not passed to NewEnv.

func (*TestEnv) T

func (e *TestEnv) T() *testing.T

func (*TestEnv) TaskPoller

func (e *TestEnv) TaskPoller() *taskpoller.TaskPoller

func (*TestEnv) Tv

func (e *TestEnv) Tv() *testvars.TestVars

func (*TestEnv) WorkerTaskQueue

func (e *TestEnv) WorkerTaskQueue() string

WorkerTaskQueue returns the task queue name used by the SDK Worker. Panics if WithSdkWorker was not passed to NewEnv.

type TestOption

type TestOption func(*testOptions)

func WithDedicatedCluster

func WithDedicatedCluster() TestOption

WithDedicatedCluster requests a dedicated (non-shared) cluster for the test. Use this for tests that have cluster-global side effects.

func WithDynamicConfig

func WithDynamicConfig(setting dynamicconfig.GenericSetting, value any) TestOption

WithDynamicConfig overrides a dynamic config setting for the test. For settings that can be namespace-scoped, a namespace constraint is applied. For all others that require a dedicated cluster, this implies `WithDedicatedCluster`.

func WithSdkWorker

func WithSdkWorker() TestOption

WithSdkWorker sets up an SDK client and worker for the test. Cleanup is handled automatically via t.Cleanup().

func WithTimeout

func WithTimeout(duration time.Duration) TestOption

WithTimeout sets a custom timeout for the test. The test will fail if it runs longer than this duration. The timeout is multiplied by debug.TimeoutMultiplier when debugging. The TEMPORAL_TEST_TIMEOUT environment variable can also set the default timeout in seconds.

type WorkerConfig

type WorkerConfig struct {
	NumWorkers    int
	DisableWorker bool // overrides NumWorkers
}

WorkerConfig is the config for the worker service

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL