testcore

package
v1.31.0
Published: Apr 29, 2026 License: MIT Imports: 126 Imported by: 1

Documentation

Constants

const (
	DefaultPageSize   = 5
	PprofTestPort     = 7000
	TlsCertCommonName = "my-common-name"
	ClientSuiteLimit  = 10
	// TODO (alex): replace all sleeps that use WaitForESToSettle with s.Eventually()
	WaitForESToSettle = 4 * time.Second // give ES shards time to settle so that data is consistent
)
const (
	DirectionSend       = "send"
	DirectionRecv       = "recv"
	DirectionServerSend = "server_send"
	DirectionServerRecv = "server_recv"
)

Message direction constants

const NamespaceCacheRefreshInterval = time.Second

Variables

var (
	ErrEncodingIsNotSet       = errors.New("payload encoding metadata is not set")
	ErrEncodingIsNotSupported = errors.New("payload encoding is not supported")
)
var (
	ErrNoTasks = errors.New("no tasks")
)

Functions

func DecodeString

func DecodeString(t require.TestingT, pls *commonpb.Payloads) string

func EventBatchesToVersionHistory

func EventBatchesToVersionHistory(
	versionHistory *historyspb.VersionHistory,
	eventBatches []*historypb.History,
) (*historyspb.VersionHistory, error)

func ExtractReplicationMessages added in v1.30.0

func ExtractReplicationMessages(msg proto.Message) *replicationspb.WorkflowReplicationMessages

ExtractReplicationMessages extracts WorkflowReplicationMessages from a proto message. This is a helper for tests that need to inspect replication message contents.

func GetPersistenceTestDefaults added in v1.31.0

func GetPersistenceTestDefaults() persistencetests.TestBaseOptions

GetPersistenceTestDefaults returns the default persistence options based on CLI flags. Use this when creating TestClusterConfig to ensure proper database configuration.

func MustToPayload added in v1.31.0

func MustToPayload(t require.TestingT, v any) *commonpb.Payload

MustToPayload converts a value to a Payload using the default data converter.

func NewContext

func NewContext(parent ...context.Context) context.Context

NewContext creates a context with default 90-second timeout and RPC headers.

NOTE: If you're using testcore.NewEnv, you can use env.Context() directly - it already includes RPC headers. This function is primarily for legacy tests or creating standalone contexts outside of the TestEnv framework.

If a parent context is provided, the returned context will be canceled when either the timeout expires OR the parent is canceled.
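The timeout and parent-cancellation behavior described above can be approximated with the standard library alone. The following is a minimal sketch, not the package's implementation: `newContextSketch` and `defaultTimeout` are hypothetical names, the RPC headers are omitted, and unlike NewContext the sketch returns the cancel function so the caller can release the timer.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// defaultTimeout mirrors the 90-second default mentioned in the NewContext docs.
const defaultTimeout = 90 * time.Second

// newContextSketch is a hypothetical stand-in for NewContext. It omits the RPC
// headers and returns the cancel function so the caller can release the timer.
func newContextSketch(parent ...context.Context) (context.Context, context.CancelFunc) {
	base := context.Background()
	if len(parent) > 0 && parent[0] != nil {
		base = parent[0]
	}
	// A context derived from base is canceled when base is canceled or when
	// the timeout expires, whichever happens first.
	return context.WithTimeout(base, defaultTimeout)
}

func main() {
	parent, cancelParent := context.WithCancel(context.Background())
	ctx, cancel := newContextSketch(parent)
	defer cancel()

	_, hasDeadline := ctx.Deadline()
	fmt.Println("has deadline:", hasDeadline)

	cancelParent() // canceling the parent also cancels the derived context
	<-ctx.Done()
	fmt.Println("err after parent cancel:", ctx.Err())
}
```

Deriving the timeout context from the parent is what gives the "either the timeout expires OR the parent is canceled" behavior without any extra goroutine.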

func NewTestDataConverter

func NewTestDataConverter() converter.DataConverter

TODO (alex): use it by default in SdkClient everywhere?

func RandomizeStr

func RandomizeStr(id string) string

func RandomizedNexusEndpoint

func RandomizedNexusEndpoint(name string) string

func UseCassandraPersistence added in v1.29.0

func UseCassandraPersistence() bool

func UseSQLVisibility added in v1.27.0

func UseSQLVisibility() bool

func WithDumpHistory

func WithDumpHistory(o *PollAndProcessWorkflowTaskOptions)

func WithForceNewWorkflowTask

func WithForceNewWorkflowTask(o *PollAndProcessWorkflowTaskOptions)

func WithNoDumpCommands

func WithNoDumpCommands(o *PollAndProcessWorkflowTaskOptions)

func WithPollSticky

func WithPollSticky(o *PollAndProcessWorkflowTaskOptions)

func WithRespondSticky

func WithRespondSticky(o *PollAndProcessWorkflowTaskOptions)

func WithoutRetries

func WithoutRetries(o *PollAndProcessWorkflowTaskOptions)

Types

type ArchiverBase

type ArchiverBase struct {
	// contains filtered or unexported fields
}

ArchiverBase is a testcore struct for the archiver provider used in functional tests.

func (*ArchiverBase) HistoryURI

func (a *ArchiverBase) HistoryURI() string

func (*ArchiverBase) Metadata

func (a *ArchiverBase) Metadata() archiver.ArchivalMetadata

func (*ArchiverBase) Provider

func (a *ArchiverBase) Provider() provider.ArchiverProvider

func (*ArchiverBase) VisibilityURI

func (a *ArchiverBase) VisibilityURI() string

type CapturedReplicationMessage added in v1.30.0

type CapturedReplicationMessage struct {
	Timestamp     string          `json:"timestamp"`
	Method        string          `json:"method"`
	Direction     string          `json:"direction"`
	ClusterName   string          `json:"clusterName"`
	TargetAddress string          `json:"targetAddress"`
	MessageType   string          `json:"messageType"`
	IsStreamCall  bool            `json:"isStreamCall"`
	Request       proto.Message   `json:"-"` // Don't marshal directly
	Response      proto.Message   `json:"-"` // Don't marshal directly
	Message       json.RawMessage `json:"message,omitempty"`
}

CapturedReplicationMessage represents a captured replication message
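The struct tags on CapturedReplicationMessage determine what ends up in the JSON output: fields tagged `json:"-"` are skipped, and `Message` is emitted as pre-encoded JSON only when it is non-empty. The sketch below illustrates that tag behavior with a trimmed, hypothetical copy of the struct (`capturedMessage`); the `any` field stands in for `proto.Message`, and `"ExampleMethod"` is a made-up value. How the real recorder fills `Message` is not shown in this listing.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// capturedMessage is a trimmed, hypothetical copy of CapturedReplicationMessage.
// The `any` field stands in for proto.Message.
type capturedMessage struct {
	Method    string          `json:"method"`
	Direction string          `json:"direction"`
	Request   any             `json:"-"` // skipped by encoding/json
	Message   json.RawMessage `json:"message,omitempty"`
}

// encode marshals a captured message and returns the JSON text.
func encode(m capturedMessage) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	out, err := encode(capturedMessage{
		Method:    "ExampleMethod", // made-up value for illustration
		Direction: "send",
		Request:   "not serialized",
		Message:   json.RawMessage(`{"id":1}`),
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```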

type Env added in v1.31.0

type Env interface {
	// T returns the *testing.T. Deprecated: use the suite's T() method instead.
	T() *testing.T
	Namespace() namespace.Name
	NamespaceID() namespace.ID
	FrontendClient() workflowservice.WorkflowServiceClient
	AdminClient() adminservice.AdminServiceClient
	GetTestCluster() *TestCluster
	CloseShard(namespaceID string, workflowID string)
	OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())
	Context() context.Context
	InjectHook(hook testhooks.Hook) (cleanup func())
}

type FrontendConfig

type FrontendConfig struct {
	NumFrontendHosts int
}

FrontendConfig is the config for the frontend service

type FunctionalTestBase

type FunctionalTestBase struct {
	suite.Suite

	// `suite.Suite` embeds `*assert.Assertions` which, by default, makes all asserts (like `s.NoError(err)`)
	// only log the error, continue test execution, and only then fail the test.
	// This is not desired behavior in most cases. The idiomatic way to change this behavior
	// is to replace `*assert.Assertions` with `*require.Assertions` by embedding it in every test suite
	// (or base struct of every test suite).
	*require.Assertions

	protorequire.ProtoAssertions
	historyrequire.HistoryRequire
	updateutils.UpdateUtils

	Logger log.Logger
	// contains filtered or unexported fields
}

func (*FunctionalTestBase) AdminClient

func (*FunctionalTestBase) CloseShard added in v1.31.0

func (s *FunctionalTestBase) CloseShard(namespaceID string, workflowID string)

CloseShard closes the shard that contains the given workflow. This is a cluster-global operation and cannot be called on shared clusters.

func (*FunctionalTestBase) Context added in v1.31.0

func (s *FunctionalTestBase) Context() context.Context

Context returns a context with RPC headers for use in this test.

func (*FunctionalTestBase) DecodePayloadsByteSliceInt32

func (s *FunctionalTestBase) DecodePayloadsByteSliceInt32(ps *commonpb.Payloads) (r int32)

func (*FunctionalTestBase) DecodePayloadsInt

func (s *FunctionalTestBase) DecodePayloadsInt(ps *commonpb.Payloads) int

func (*FunctionalTestBase) DecodePayloadsString

func (s *FunctionalTestBase) DecodePayloadsString(ps *commonpb.Payloads) string

func (*FunctionalTestBase) DurationNear

func (s *FunctionalTestBase) DurationNear(value, target, tolerance time.Duration)

func (*FunctionalTestBase) ExternalNamespace added in v1.29.0

func (s *FunctionalTestBase) ExternalNamespace() namespace.Name

func (*FunctionalTestBase) FrontendClient

func (*FunctionalTestBase) FrontendGRPCAddress

func (s *FunctionalTestBase) FrontendGRPCAddress() string

func (*FunctionalTestBase) GetHistory

func (s *FunctionalTestBase) GetHistory(namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent

func (*FunctionalTestBase) GetHistoryFunc

func (s *FunctionalTestBase) GetHistoryFunc(namespace string, execution *commonpb.WorkflowExecution) func() []*historypb.HistoryEvent

func (*FunctionalTestBase) GetNamespaceID

func (s *FunctionalTestBase) GetNamespaceID(namespace string) string

func (*FunctionalTestBase) GetTestCluster

func (s *FunctionalTestBase) GetTestCluster() *TestCluster

func (*FunctionalTestBase) GetTestClusterConfig

func (s *FunctionalTestBase) GetTestClusterConfig() *TestClusterConfig

func (*FunctionalTestBase) HttpAPIAddress

func (s *FunctionalTestBase) HttpAPIAddress() string

func (*FunctionalTestBase) InjectHook added in v1.27.0

func (s *FunctionalTestBase) InjectHook(hook testhooks.Hook) (cleanup func())

InjectHook sets a test hook inside the cluster.

func (*FunctionalTestBase) MarkNamespaceAsDeleted added in v1.27.0

func (s *FunctionalTestBase) MarkNamespaceAsDeleted(
	nsName namespace.Name,
) error

func (*FunctionalTestBase) Namespace

func (s *FunctionalTestBase) Namespace() namespace.Name

func (*FunctionalTestBase) NamespaceID added in v1.27.0

func (s *FunctionalTestBase) NamespaceID() namespace.ID

func (*FunctionalTestBase) OperatorClient

func (*FunctionalTestBase) OverrideDynamicConfig

func (s *FunctionalTestBase) OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())
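OverrideDynamicConfig and InjectHook both return a `cleanup func()`. A common shape for that pattern is sketched below with a hypothetical in-memory `configStore`; the assumption that cleanup restores the previous value is ours and is not stated in this listing.

```go
package main

import "fmt"

// configStore is a hypothetical in-memory stand-in for a dynamic config client.
type configStore struct {
	values map[string]any
}

// override sets key to value and returns a cleanup function that restores the
// previous state. The restore behavior is an assumption made for this sketch.
func (c *configStore) override(key string, value any) (cleanup func()) {
	prev, had := c.values[key]
	c.values[key] = value
	return func() {
		if had {
			c.values[key] = prev
		} else {
			delete(c.values, key)
		}
	}
}

func main() {
	store := &configStore{values: map[string]any{"example.setting": 1}}
	cleanup := store.override("example.setting", 42)
	fmt.Println("during test:", store.values["example.setting"])
	cleanup()
	fmt.Println("after cleanup:", store.values["example.setting"])
}
```

In a test, the returned function would typically be deferred or registered with `t.Cleanup` so the override does not leak into later tests.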

func (*FunctionalTestBase) RegisterNamespace added in v1.27.0

func (s *FunctionalTestBase) RegisterNamespace(
	nsName namespace.Name,
	retentionDays int32,
	archivalState enumspb.ArchivalState,
	historyArchivalURI string,
	visibilityArchivalURI string,
) (namespace.ID, error)

RegisterNamespace registers the namespace through the persistence API because:

  1. The retention period is set to 0 for archival tests, and this can't be done through the frontend (FE),
  2. Updating search attributes would require an extra API call,
  3. One more extra API call would be necessary to get the namespace.ID.

func (*FunctionalTestBase) RunTestWithMatchingBehavior

func (s *FunctionalTestBase) RunTestWithMatchingBehavior(subtest func())

func (*FunctionalTestBase) SdkClient added in v1.28.0

func (s *FunctionalTestBase) SdkClient() sdkclient.Client

func (*FunctionalTestBase) SdkWorker added in v1.31.0

func (s *FunctionalTestBase) SdkWorker() sdkworker.Worker

func (*FunctionalTestBase) SendSignal added in v1.27.0

func (s *FunctionalTestBase) SendSignal(nsName string, execution *commonpb.WorkflowExecution, signalName string,
	input *commonpb.Payloads, identity string) error

TODO (alex): change to nsName namespace.Name

func (*FunctionalTestBase) SendToChannel added in v1.31.0

func (s *FunctionalTestBase) SendToChannel(ctx context.Context, ch chan struct{})

func (*FunctionalTestBase) SetupSubTest added in v1.27.0

func (s *FunctionalTestBase) SetupSubTest()

func (*FunctionalTestBase) SetupSuite

func (s *FunctionalTestBase) SetupSuite()

func (*FunctionalTestBase) SetupSuiteWithCluster added in v1.27.0

func (s *FunctionalTestBase) SetupSuiteWithCluster(options ...TestClusterOption)

func (*FunctionalTestBase) SetupTest

func (s *FunctionalTestBase) SetupTest()

All test suites that embed FunctionalTestBase and override SetupTest must call FunctionalTestBase.SetupTest so that the tests are distributed into partitions. Otherwise, the test suite will be executed multiple times in each partition.

func (*FunctionalTestBase) TaskPoller added in v1.28.0

func (s *FunctionalTestBase) TaskPoller() *taskpoller.TaskPoller

func (*FunctionalTestBase) TaskQueue added in v1.28.0

func (s *FunctionalTestBase) TaskQueue() string

func (*FunctionalTestBase) TearDownCluster added in v1.27.0

func (s *FunctionalTestBase) TearDownCluster()

func (*FunctionalTestBase) TearDownSubTest added in v1.28.0

func (s *FunctionalTestBase) TearDownSubTest()

**IMPORTANT**: When overriding this, make sure to invoke `s.FunctionalTestBase.TearDownSubTest()`.

func (*FunctionalTestBase) TearDownSuite

func (s *FunctionalTestBase) TearDownSuite()

func (*FunctionalTestBase) TearDownTest added in v1.28.0

func (s *FunctionalTestBase) TearDownTest()

**IMPORTANT**: When overriding this, make sure to invoke `s.FunctionalTestBase.TearDownTest()`.
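The reminder to call the base method follows from how Go embedding works: a method defined on the outer struct shadows the embedded one, so the embedded version runs only if it is called explicitly. A minimal sketch with hypothetical types (`baseSuite` stands in for FunctionalTestBase, `mySuite` for a concrete suite):

```go
package main

import "fmt"

// baseSuite is a hypothetical stand-in for FunctionalTestBase.
type baseSuite struct {
	calls []string
}

func (b *baseSuite) TearDownTest() {
	b.calls = append(b.calls, "base.TearDownTest")
}

// mySuite embeds baseSuite and overrides TearDownTest.
type mySuite struct {
	baseSuite
}

func (s *mySuite) TearDownTest() {
	s.calls = append(s.calls, "mySuite.TearDownTest")
	// Without this explicit call the embedded method would never run.
	s.baseSuite.TearDownTest()
}

func main() {
	s := &mySuite{}
	s.TearDownTest()
	fmt.Println(s.calls)
}
```

The same applies to SetupTest, SetupSubTest, and TearDownSubTest overrides.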

func (*FunctionalTestBase) WaitForChannel added in v1.27.0

func (s *FunctionalTestBase) WaitForChannel(ctx context.Context, ch chan struct{})

func (*FunctionalTestBase) WorkerGRPCAddress added in v1.31.0

func (s *FunctionalTestBase) WorkerGRPCAddress() string

type GlobalMetricCapture added in v1.31.0

type GlobalMetricCapture struct {
	// contains filtered or unexported fields
}

func (*GlobalMetricCapture) CollectMetric added in v1.31.0

CollectMetric returns the recordings for the named metric that the caller chooses to keep.

func (*GlobalMetricCapture) Metric added in v1.31.0

type HistoryConfig

type HistoryConfig struct {
	NumHistoryShards int32
	NumHistoryHosts  int
}

HistoryConfig contains configs for history service

type MatchingBehavior added in v1.31.0

type MatchingBehavior struct {
	ForceTaskForward bool
	ForcePollForward bool
	ForceAsync       bool
}

MatchingBehavior describes a test scenario for matching service behavior.

func AllMatchingBehaviors added in v1.31.0

func AllMatchingBehaviors() []MatchingBehavior

AllMatchingBehaviors returns all 8 combinations of matching behaviors for testing.
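The count of 8 follows from the three boolean fields of MatchingBehavior (2 × 2 × 2). The sketch below enumerates them with a local, hypothetical copy of the struct; the order in which the real function returns the combinations is not documented here, so the order shown is an assumption.

```go
package main

import "fmt"

// matchingBehavior is a local copy of the documented MatchingBehavior fields.
type matchingBehavior struct {
	ForceTaskForward bool
	ForcePollForward bool
	ForceAsync       bool
}

// allBehaviors enumerates every combination of the three flags.
func allBehaviors() []matchingBehavior {
	flags := []bool{false, true}
	var out []matchingBehavior
	for _, taskForward := range flags {
		for _, pollForward := range flags {
			for _, async := range flags {
				out = append(out, matchingBehavior{
					ForceTaskForward: taskForward,
					ForcePollForward: pollForward,
					ForceAsync:       async,
				})
			}
		}
	}
	return out
}

func main() {
	for _, b := range allBehaviors() {
		fmt.Printf("%+v\n", b)
	}
}
```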

func (MatchingBehavior) InjectHooks added in v1.31.0

func (b MatchingBehavior) InjectHooks(env hookInjector)

InjectHooks injects the test hooks for this matching behavior.

func (MatchingBehavior) Name added in v1.31.0

func (b MatchingBehavior) Name() string

Name returns a descriptive name for this behavior combination.

func (MatchingBehavior) Options added in v1.31.0

func (b MatchingBehavior) Options() []TestOption

Options returns the TestOptions to configure matching behavior.

type MatchingConfig

type MatchingConfig struct {
	NumMatchingHosts int
}

MatchingConfig is the config for the matching service

type NamespaceMetricCapture added in v1.31.0

type NamespaceMetricCapture struct {
	// contains filtered or unexported fields
}

func (*NamespaceMetricCapture) CollectMetric added in v1.31.0

CollectMetric returns the recordings for the named metric that belong to the test namespace and that the caller chooses to keep. It panics if the requested metric is not namespace-scoped.

func (*NamespaceMetricCapture) Metric added in v1.31.0

type PersistenceTestBaseFactory

type PersistenceTestBaseFactory interface {
	NewTestBase(options *persistencetests.TestBaseOptions) *persistencetests.TestBase
}

type PollAndProcessWorkflowTaskOptionFunc

type PollAndProcessWorkflowTaskOptionFunc func(*PollAndProcessWorkflowTaskOptions)

type PollAndProcessWorkflowTaskOptions

type PollAndProcessWorkflowTaskOptions struct {
	DumpHistory          bool
	DumpCommands         bool
	DropTask             bool
	PollSticky           bool
	RespondSticky        bool
	ExpectedAttemptCount int
	Retries              int
	ForceNewWorkflowTask bool
	QueryResult          *querypb.WorkflowQueryResult
}
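The `With...` functions listed under Functions share the signature of PollAndProcessWorkflowTaskOptionFunc, which is the usual functional-options pattern: each function mutates an options struct that the poller then reads. The sketch below uses trimmed, hypothetical local types; what each real option sets and the default `Retries` value of 5 are assumptions made for illustration.

```go
package main

import "fmt"

// options is a trimmed, hypothetical copy of PollAndProcessWorkflowTaskOptions.
type options struct {
	DumpHistory bool
	PollSticky  bool
	Retries     int
}

// optionFunc mirrors the shape of PollAndProcessWorkflowTaskOptionFunc.
type optionFunc func(*options)

// The bodies below are assumed behaviors, inferred from the option names.
func withDumpHistory(o *options) { o.DumpHistory = true }
func withPollSticky(o *options)  { o.PollSticky = true }

// applyOptions starts from defaults and applies each option in order.
func applyOptions(defaults options, funcs ...optionFunc) options {
	for _, f := range funcs {
		f(&defaults)
	}
	return defaults
}

func main() {
	// Retries: 5 is a made-up default for this sketch.
	opts := applyOptions(options{Retries: 5}, withDumpHistory, withPollSticky)
	fmt.Printf("%+v\n", opts)
}
```

PollAndProcessWorkflowTask accepts such functions variadically, while PollAndProcessWorkflowTaskWithOptions takes the populated struct directly.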

type PollAndProcessWorkflowTaskResponse

type PollAndProcessWorkflowTaskResponse struct {
	IsQueryTask bool
	NewTask     *workflowservice.RespondWorkflowTaskCompletedResponse
}

type RecordedTask added in v1.30.0

type RecordedTask struct {
	Timestamp   time.Time  `json:"timestamp"`
	TaskType    string     `json:"taskType"` // The specific task type (e.g., "TASK_TYPE_ACTIVITY_RETRY_TIMER")
	ShardID     int32      `json:"shardId"`
	RangeID     int64      `json:"rangeId,omitempty"`
	NamespaceID string     `json:"namespaceId"`
	WorkflowID  string     `json:"workflowId"`
	RunID       string     `json:"runId"`
	Task        tasks.Task `json:"task"` // The actual task object
}

RecordedTask wraps a task with metadata about when and where it was written

type ReplicationStreamRecorder added in v1.30.0

type ReplicationStreamRecorder struct {
	// contains filtered or unexported fields
}

ReplicationStreamRecorder captures replication stream messages for testing

func NewReplicationStreamRecorder added in v1.30.0

func NewReplicationStreamRecorder() *ReplicationStreamRecorder

func (*ReplicationStreamRecorder) Clear added in v1.30.0

func (r *ReplicationStreamRecorder) Clear()

func (*ReplicationStreamRecorder) GetMessages added in v1.30.0

func (*ReplicationStreamRecorder) SetOutputFile added in v1.30.0

func (r *ReplicationStreamRecorder) SetOutputFile(filePath string)

SetOutputFile sets the file path for writing captured messages on-demand

func (*ReplicationStreamRecorder) StreamInterceptor added in v1.30.0

func (r *ReplicationStreamRecorder) StreamInterceptor(clusterName string) grpc.StreamClientInterceptor

StreamInterceptor returns a gRPC stream client interceptor that captures stream messages

func (*ReplicationStreamRecorder) StreamServerInterceptor added in v1.30.0

func (r *ReplicationStreamRecorder) StreamServerInterceptor(clusterName string) grpc.StreamServerInterceptor

StreamServerInterceptor returns a gRPC stream server interceptor that captures stream messages

func (*ReplicationStreamRecorder) UnaryInterceptor added in v1.30.0

func (r *ReplicationStreamRecorder) UnaryInterceptor(clusterName string) grpc.UnaryClientInterceptor

UnaryInterceptor returns a gRPC unary client interceptor that captures messages

func (*ReplicationStreamRecorder) UnaryServerInterceptor added in v1.30.0

func (r *ReplicationStreamRecorder) UnaryServerInterceptor(clusterName string) grpc.UnaryServerInterceptor

UnaryServerInterceptor returns a gRPC unary server interceptor that captures messages

func (*ReplicationStreamRecorder) WriteToLog added in v1.30.0

func (r *ReplicationStreamRecorder) WriteToLog() error

WriteToLog writes all captured messages to the configured output file

type TaskFilter added in v1.30.0

type TaskFilter struct {
	NamespaceID string // Required: namespace ID to filter by
	WorkflowID  string // Optional: workflow ID to filter by (empty string means no filter)
	RunID       string // Optional: run ID to filter by (empty string means no filter)
}

TaskFilter specifies criteria for filtering recorded tasks

type TaskMatcher added in v1.30.0

type TaskMatcher func(RecordedTask) bool

TaskMatcher is a function that tests whether a RecordedTask matches some criteria
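A TaskMatcher is a plain predicate, so it can be written as a closure and passed to helpers such as MatchTasks or CountMatchingTasks. The following sketch shows the idea with hypothetical local types (`recordedTask`, `taskMatcher`, `matchTasks`); `"TASK_TYPE_EXAMPLE"` is a made-up task type.

```go
package main

import "fmt"

// recordedTask is a trimmed, hypothetical copy of RecordedTask.
type recordedTask struct {
	TaskType   string
	WorkflowID string
}

// taskMatcher mirrors the shape of TaskMatcher.
type taskMatcher func(recordedTask) bool

// matchTasks returns the tasks accepted by matcher, preserving their order.
func matchTasks(all []recordedTask, matcher taskMatcher) []recordedTask {
	var out []recordedTask
	for _, t := range all {
		if matcher(t) {
			out = append(out, t)
		}
	}
	return out
}

func main() {
	all := []recordedTask{
		{TaskType: "TASK_TYPE_ACTIVITY_RETRY_TIMER", WorkflowID: "wf-1"},
		{TaskType: "TASK_TYPE_EXAMPLE", WorkflowID: "wf-1"}, // made-up type
		{TaskType: "TASK_TYPE_ACTIVITY_RETRY_TIMER", WorkflowID: "wf-2"},
	}
	isRetryTimer := func(t recordedTask) bool {
		return t.TaskType == "TASK_TYPE_ACTIVITY_RETRY_TIMER"
	}
	fmt.Println("retry timers:", len(matchTasks(all, isRetryTimer)))
}
```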

type TaskPoller deprecated

type TaskPoller struct {
	Client                       workflowservice.WorkflowServiceClient
	Namespace                    string
	TaskQueue                    *taskqueuepb.TaskQueue
	StickyTaskQueue              *taskqueuepb.TaskQueue
	StickyScheduleToStartTimeout time.Duration
	Identity                     string
	WorkflowTaskHandler          WorkflowTaskHandler
	ActivityTaskHandler          ActivityTaskHandler
	QueryHandler                 QueryHandler
	MessageHandler               MessageHandler
	Logger                       log.Logger
	T                            *testing.T
}

Deprecated: TaskPoller is deprecated. Use taskpoller.TaskPoller instead. TaskPoller is used in functional tests to poll workflow or activity task queues.

func (*TaskPoller) HandlePartialWorkflowTask

func (p *TaskPoller) HandlePartialWorkflowTask(response *workflowservice.PollWorkflowTaskQueueResponse, forceCreateNewWorkflowTask bool) (*workflowservice.RespondWorkflowTaskCompletedResponse, error)

HandlePartialWorkflowTask for workflow task

func (*TaskPoller) PollAndProcessActivityTask

func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error

PollAndProcessActivityTask for activity tasks

func (*TaskPoller) PollAndProcessActivityTaskWithID

func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error

PollAndProcessActivityTaskWithID is similar to PollAndProcessActivityTask but using RespondActivityTask...ByID

func (*TaskPoller) PollAndProcessWorkflowTask

func (p *TaskPoller) PollAndProcessWorkflowTask(funcs ...PollAndProcessWorkflowTaskOptionFunc) (res PollAndProcessWorkflowTaskResponse, err error)

func (*TaskPoller) PollAndProcessWorkflowTaskWithOptions

func (p *TaskPoller) PollAndProcessWorkflowTaskWithOptions(opts *PollAndProcessWorkflowTaskOptions) (res PollAndProcessWorkflowTaskResponse, err error)

type TaskQueueRecorder added in v1.30.0

type TaskQueueRecorder struct {
	// contains filtered or unexported fields
}

TaskQueueRecorder wraps an ExecutionManager to record ALL task writes to the history task queues (transfer, timer, replication, visibility, archival, etc.). This is useful for integration tests where you want to assert on what tasks were generated and in what order. Tasks are stored flattened by category - all tasks of the same type are in a single list, with each task wrapped with metadata about when/where it was written.

func NewTaskQueueRecorder added in v1.30.0

func NewTaskQueueRecorder(delegate persistence.ExecutionManager, logger log.Logger) *TaskQueueRecorder

NewTaskQueueRecorder creates a recorder that wraps the given ExecutionManager

func (*TaskQueueRecorder) AddHistoryTasks added in v1.30.0

func (r *TaskQueueRecorder) AddHistoryTasks(
	ctx context.Context,
	request *persistence.AddHistoryTasksRequest,
) error

AddHistoryTasks records the task write and then delegates to the underlying manager
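"Record, then delegate" is a decorator over the wrapped manager. The sketch below reduces the idea to a hypothetical one-method interface (`taskWriter`) instead of the full persistence.ExecutionManager; it is not concurrency-safe and is only meant to show the call order.

```go
package main

import (
	"errors"
	"fmt"
)

// taskWriter is a hypothetical one-method slice of persistence.ExecutionManager.
type taskWriter interface {
	AddTasks(tasks []string) error
}

// fakeStore is a stand-in delegate that counts writes and can be told to fail.
type fakeStore struct {
	writes int
	fail   bool
}

func (f *fakeStore) AddTasks(tasks []string) error {
	f.writes++
	if f.fail {
		return errors.New("store unavailable")
	}
	return nil
}

// recorder records every write and then forwards it to the delegate.
type recorder struct {
	delegate taskWriter
	recorded []string
}

func (r *recorder) AddTasks(tasks []string) error {
	// Record first, so the write is captured even if the delegate fails.
	r.recorded = append(r.recorded, tasks...)
	return r.delegate.AddTasks(tasks)
}

func main() {
	store := &fakeStore{}
	rec := &recorder{delegate: store}
	if err := rec.AddTasks([]string{"transfer-1", "timer-1"}); err != nil {
		panic(err)
	}
	fmt.Println("recorded:", rec.recorded, "delegate writes:", store.writes)
}
```

Because the recorder satisfies the same interface as its delegate, it can be swapped in without changing callers, which is presumably why TaskQueueRecorder also exposes the remaining ExecutionManager methods.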

func (*TaskQueueRecorder) AppendHistoryNodes added in v1.30.0

func (*TaskQueueRecorder) AppendRawHistoryNodes added in v1.30.0

func (*TaskQueueRecorder) Close added in v1.30.0

func (r *TaskQueueRecorder) Close()

func (*TaskQueueRecorder) CompleteHistoryTask added in v1.30.0

func (r *TaskQueueRecorder) CompleteHistoryTask(
	ctx context.Context,
	request *persistence.CompleteHistoryTaskRequest,
) error

func (*TaskQueueRecorder) ConflictResolveWorkflowExecution added in v1.30.0

func (*TaskQueueRecorder) CountMatchingTasks added in v1.30.0

func (r *TaskQueueRecorder) CountMatchingTasks(category tasks.Category, matcher TaskMatcher) int

CountMatchingTasks returns the count of tasks in a category that match the given matcher

func (*TaskQueueRecorder) CountTasksForNamespace added in v1.30.0

func (r *TaskQueueRecorder) CountTasksForNamespace(
	category tasks.Category,
	namespaceID string,
	matcher TaskMatcher,
) int

CountTasksForNamespace returns the count of tasks in a category for a specific namespace

func (*TaskQueueRecorder) CountTasksForWorkflow added in v1.30.0

func (r *TaskQueueRecorder) CountTasksForWorkflow(
	category tasks.Category,
	namespaceID string,
	workflowID string,
	runID string,
	matcher TaskMatcher,
) int

CountTasksForWorkflow returns the count of tasks in a category for a specific workflow. If namespaceID is empty, it matches any namespace. If runID is empty, it matches any runID for the given workflowID.

func (*TaskQueueRecorder) CreateWorkflowExecution added in v1.30.0

func (*TaskQueueRecorder) DeleteCurrentWorkflowExecution added in v1.30.0

func (r *TaskQueueRecorder) DeleteCurrentWorkflowExecution(
	ctx context.Context,
	request *persistence.DeleteCurrentWorkflowExecutionRequest,
) error

func (*TaskQueueRecorder) DeleteHistoryBranch added in v1.30.0

func (r *TaskQueueRecorder) DeleteHistoryBranch(
	ctx context.Context,
	request *persistence.DeleteHistoryBranchRequest,
) error

func (*TaskQueueRecorder) DeleteReplicationTaskFromDLQ added in v1.30.0

func (r *TaskQueueRecorder) DeleteReplicationTaskFromDLQ(
	ctx context.Context,
	request *persistence.DeleteReplicationTaskFromDLQRequest,
) error

func (*TaskQueueRecorder) DeleteWorkflowExecution added in v1.30.0

func (r *TaskQueueRecorder) DeleteWorkflowExecution(
	ctx context.Context,
	request *persistence.DeleteWorkflowExecutionRequest,
) error

func (*TaskQueueRecorder) ForkHistoryBranch added in v1.30.0

func (*TaskQueueRecorder) GetAllHistoryTreeBranches added in v1.30.0

func (*TaskQueueRecorder) GetAllRecordedTasks added in v1.30.0

func (r *TaskQueueRecorder) GetAllRecordedTasks() map[tasks.Category][]RecordedTask

GetAllRecordedTasks returns all recorded tasks WITH metadata, grouped by category

func (*TaskQueueRecorder) GetAllTasks added in v1.30.0

func (r *TaskQueueRecorder) GetAllTasks() map[tasks.Category][]tasks.Task

GetAllTasks returns all tasks grouped by category (unwrapped, without metadata)

func (*TaskQueueRecorder) GetCurrentExecution added in v1.30.0

func (*TaskQueueRecorder) GetHistoryBranchUtil added in v1.30.0

func (r *TaskQueueRecorder) GetHistoryBranchUtil() persistence.HistoryBranchUtil

func (*TaskQueueRecorder) GetHistoryTasks added in v1.30.0

func (*TaskQueueRecorder) GetName added in v1.30.0

func (r *TaskQueueRecorder) GetName() string

func (*TaskQueueRecorder) GetRecordedTasksByCategoryFiltered added in v1.30.0

func (r *TaskQueueRecorder) GetRecordedTasksByCategoryFiltered(category tasks.Category, filter TaskFilter) []RecordedTask

GetRecordedTasksByCategoryFiltered returns recorded tasks WITH metadata for a specific category, filtered by namespace (required) and optionally by workflow ID and run ID. This is the preferred API for tests to ensure tasks are properly scoped.

func (*TaskQueueRecorder) GetReplicationTasksFromDLQ added in v1.30.0

func (*TaskQueueRecorder) GetWorkflowExecution added in v1.30.0

func (*TaskQueueRecorder) IsReplicationDLQEmpty added in v1.30.0

func (r *TaskQueueRecorder) IsReplicationDLQEmpty(
	ctx context.Context,
	request *persistence.GetReplicationTasksFromDLQRequest,
) (bool, error)

func (*TaskQueueRecorder) ListConcreteExecutions added in v1.30.0

func (*TaskQueueRecorder) MatchTasks added in v1.30.0

func (r *TaskQueueRecorder) MatchTasks(category tasks.Category, matcher TaskMatcher) []RecordedTask

MatchTasks returns all tasks in a category that match the given matcher function

func (*TaskQueueRecorder) MatchTasksForNamespace added in v1.30.0

func (r *TaskQueueRecorder) MatchTasksForNamespace(
	category tasks.Category,
	namespaceID string,
	matcher TaskMatcher,
) []RecordedTask

MatchTasksForNamespace returns all tasks in a category for a specific namespace

func (*TaskQueueRecorder) MatchTasksForWorkflow added in v1.30.0

func (r *TaskQueueRecorder) MatchTasksForWorkflow(
	category tasks.Category,
	namespaceID string,
	workflowID string,
	runID string,
	matcher TaskMatcher,
) []RecordedTask

MatchTasksForWorkflow returns all tasks in a category for a specific workflow. If namespaceID is empty, it matches any namespace. If runID is empty, it matches any runID for the given workflowID.
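The empty-string rules for namespaceID and runID amount to treating those two arguments as optional filters. A minimal sketch of that rule, using a hypothetical `recordedTask` type and `matchesWorkflow` helper; it assumes workflowID is always compared, since the method targets a specific workflow.

```go
package main

import "fmt"

// recordedTask is a trimmed, hypothetical copy of RecordedTask.
type recordedTask struct {
	NamespaceID string
	WorkflowID  string
	RunID       string
}

// matchesWorkflow applies the documented wildcard rules: an empty namespaceID
// or runID matches anything, while workflowID is assumed to be required.
func matchesWorkflow(t recordedTask, namespaceID, workflowID, runID string) bool {
	if namespaceID != "" && t.NamespaceID != namespaceID {
		return false
	}
	if t.WorkflowID != workflowID {
		return false
	}
	if runID != "" && t.RunID != runID {
		return false
	}
	return true
}

func main() {
	task := recordedTask{NamespaceID: "ns-1", WorkflowID: "wf-1", RunID: "run-1"}
	fmt.Println(matchesWorkflow(task, "", "wf-1", ""))         // any namespace, any run
	fmt.Println(matchesWorkflow(task, "ns-1", "wf-1", "run-2")) // run ID differs
}
```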

func (*TaskQueueRecorder) PutReplicationTaskToDLQ added in v1.30.0

func (r *TaskQueueRecorder) PutReplicationTaskToDLQ(
	ctx context.Context,
	request *persistence.PutReplicationTaskToDLQRequest,
) error

func (*TaskQueueRecorder) RangeCompleteHistoryTasks added in v1.30.0

func (r *TaskQueueRecorder) RangeCompleteHistoryTasks(
	ctx context.Context,
	request *persistence.RangeCompleteHistoryTasksRequest,
) error

func (*TaskQueueRecorder) RangeDeleteReplicationTaskFromDLQ added in v1.30.0

func (r *TaskQueueRecorder) RangeDeleteReplicationTaskFromDLQ(
	ctx context.Context,
	request *persistence.RangeDeleteReplicationTaskFromDLQRequest,
) error

func (*TaskQueueRecorder) ReadHistoryBranch added in v1.30.0

func (*TaskQueueRecorder) ReadHistoryBranchByBatch added in v1.30.0

func (*TaskQueueRecorder) ReadHistoryBranchReverse added in v1.30.0

func (*TaskQueueRecorder) ReadRawHistoryBranch added in v1.30.0

func (*TaskQueueRecorder) SetWorkflowExecution added in v1.30.0

func (*TaskQueueRecorder) TrimHistoryBranch added in v1.30.0

func (*TaskQueueRecorder) UpdateWorkflowExecution added in v1.30.0

func (*TaskQueueRecorder) WriteToLog added in v1.30.0

func (r *TaskQueueRecorder) WriteToLog(filePath string) error

WriteToLog writes all captured tasks to a file in JSON format

type TemporalImpl

type TemporalImpl struct {
	// contains filtered or unexported fields
}

func (*TemporalImpl) AdminClient

func (c *TemporalImpl) AdminClient() adminservice.AdminServiceClient

func (*TemporalImpl) Authorize

func (*TemporalImpl) CaptureMetricsHandler deprecated

func (c *TemporalImpl) CaptureMetricsHandler() *metricstest.CaptureHandler

Deprecated: metric capture is cluster-global. Use (*TestEnv).StartGlobalMetricCapture() or (*TestEnv).StartNamespaceMetricCapture() instead.

func (*TemporalImpl) ChasmEngine added in v1.29.0

func (c *TemporalImpl) ChasmEngine() (chasm.Engine, error)

func (*TemporalImpl) ChasmVisibilityManager added in v1.30.0

func (c *TemporalImpl) ChasmVisibilityManager() chasm.VisibilityManager

func (*TemporalImpl) DcClient

func (c *TemporalImpl) DcClient() *dynamicconfig.MemoryClient

func (*TemporalImpl) FrontendClient

func (*TemporalImpl) FrontendGRPCAddress

func (c *TemporalImpl) FrontendGRPCAddress() string

func (*TemporalImpl) FrontendHTTPAddress

func (c *TemporalImpl) FrontendHTTPAddress() string

func (*TemporalImpl) GetCHASMRegistry added in v1.29.0

func (c *TemporalImpl) GetCHASMRegistry() *chasm.Registry

func (*TemporalImpl) GetClaims

func (c *TemporalImpl) GetClaims(authInfo *authorization.AuthInfo) (*authorization.Claims, error)

func (*TemporalImpl) GetExecutionManager

func (c *TemporalImpl) GetExecutionManager() persistence.ExecutionManager

func (*TemporalImpl) GetGrpcClientInterceptor added in v1.29.0

func (c *TemporalImpl) GetGrpcClientInterceptor() *grpcinject.Interceptor

func (*TemporalImpl) GetMetricsHandler

func (c *TemporalImpl) GetMetricsHandler() metrics.Handler

func (*TemporalImpl) GetTLSConfigProvider

func (c *TemporalImpl) GetTLSConfigProvider() encryption.TLSConfigProvider

func (*TemporalImpl) GetTaskCategoryRegistry

func (c *TemporalImpl) GetTaskCategoryRegistry() tasks.TaskCategoryRegistry

func (*TemporalImpl) GetTaskQueueRecorder added in v1.30.0

func (c *TemporalImpl) GetTaskQueueRecorder() *TaskQueueRecorder

func (*TemporalImpl) HistoryClient

func (*TemporalImpl) MatchingClient

func (*TemporalImpl) NamespaceRegistries added in v1.27.0

func (c *TemporalImpl) NamespaceRegistries() []namespace.Registry

func (*TemporalImpl) OperatorClient

func (*TemporalImpl) RemoteFrontendGRPCAddress

func (c *TemporalImpl) RemoteFrontendGRPCAddress() string

Use this to get an address for a remote cluster to connect to.

func (*TemporalImpl) SchedulerClient added in v1.31.0

func (c *TemporalImpl) SchedulerClient() schedulerpb.SchedulerServiceClient

func (*TemporalImpl) SetOnAuthorize

func (*TemporalImpl) SetOnGetClaims

func (c *TemporalImpl) SetOnGetClaims(fn func(*authorization.AuthInfo) (*authorization.Claims, error))

func (*TemporalImpl) SetTaskQueueRecorder added in v1.30.0

func (c *TemporalImpl) SetTaskQueueRecorder(recorder *TaskQueueRecorder)

func (*TemporalImpl) Start

func (c *TemporalImpl) Start() error

func (*TemporalImpl) Stop

func (c *TemporalImpl) Stop() error

func (*TemporalImpl) TlsConfigProvider

func (c *TemporalImpl) TlsConfigProvider() *encryption.FixedTLSConfigProvider

func (*TemporalImpl) WorkerGRPCAddress added in v1.31.0

func (c *TemporalImpl) WorkerGRPCAddress() string

type TemporalParams

type TemporalParams struct {
	ClusterMetadataConfig            *cluster.Config
	PersistenceConfig                config.Persistence
	MetadataMgr                      persistence.MetadataManager
	ClusterMetadataManager           persistence.ClusterMetadataManager
	ShardMgr                         persistence.ShardManager
	ExecutionManager                 persistence.ExecutionManager
	TaskMgr                          persistence.TaskManager
	NamespaceReplicationQueue        persistence.NamespaceReplicationQueue
	AbstractDataStoreFactory         persistenceClient.AbstractDataStoreFactory
	VisibilityStoreFactory           visibility.VisibilityStoreFactory
	Logger                           log.Logger
	ArchiverMetadata                 carchiver.ArchivalMetadata
	ArchiverProvider                 provider.ArchiverProvider
	EnableReadHistoryFromArchival    bool
	FrontendConfig                   FrontendConfig
	HistoryConfig                    HistoryConfig
	MatchingConfig                   MatchingConfig
	WorkerConfig                     WorkerConfig
	ESConfig                         *esclient.Config
	ESClient                         esclient.Client
	MockAdminClient                  map[string]adminservice.AdminServiceClient
	NamespaceReplicationTaskExecutor nsreplication.TaskExecutor
	DynamicConfigOverrides           map[dynamicconfig.Key]any
	TLSConfigProvider                *encryption.FixedTLSConfigProvider
	CaptureMetricsHandler            *metricstest.CaptureHandler
	// ServiceFxOptions is populated by WithFxOptionsForService.
	ServiceFxOptions         map[primitives.ServiceName][]fx.Option
	TaskCategoryRegistry     tasks.TaskCategoryRegistry
	HostsByProtocolByService map[transferProtocol]map[primitives.ServiceName]static.Hosts
	SpanExporters            map[telemetry.SpanExporterType]sdktrace.SpanExporter
}

TemporalParams contains everything needed to bootstrap Temporal

type TestCluster

type TestCluster struct {
	// contains filtered or unexported fields
}

TestCluster is a testcore struct for functional tests

func (*TestCluster) AdminClient

func (tc *TestCluster) AdminClient() adminservice.AdminServiceClient

func (*TestCluster) ArchiverBase added in v1.27.0

func (tc *TestCluster) ArchiverBase() *ArchiverBase

func (*TestCluster) ClusterName added in v1.27.0

func (tc *TestCluster) ClusterName() string

func (*TestCluster) ExecutionManager

func (tc *TestCluster) ExecutionManager() persistence.ExecutionManager

ExecutionManager returns the execution manager from the test cluster

func (*TestCluster) FrontendClient

func (tc *TestCluster) FrontendClient() workflowservice.WorkflowServiceClient

func (*TestCluster) GetReplicationStreamRecorder added in v1.30.0

func (tc *TestCluster) GetReplicationStreamRecorder() *ReplicationStreamRecorder

func (*TestCluster) GetTaskQueueRecorder added in v1.30.0

func (tc *TestCluster) GetTaskQueueRecorder() *TaskQueueRecorder

func (*TestCluster) HistoryClient

func (tc *TestCluster) HistoryClient() historyservice.HistoryServiceClient

HistoryClient returns a history client from the test cluster

func (*TestCluster) Host

func (tc *TestCluster) Host() *TemporalImpl

TODO (alex): expose only needed objects from TemporalImpl.

func (*TestCluster) MatchingClient

func (tc *TestCluster) MatchingClient() matchingservice.MatchingServiceClient

MatchingClient returns a matching client from the test cluster

func (*TestCluster) OperatorClient

func (tc *TestCluster) OperatorClient() operatorservice.OperatorServiceClient

func (*TestCluster) OverrideDynamicConfig

func (tc *TestCluster) OverrideDynamicConfig(t *testing.T, key dynamicconfig.GenericSetting, value any) (cleanup func())

func (*TestCluster) SchedulerClient added in v1.31.0

func (tc *TestCluster) SchedulerClient() schedulerpb.SchedulerServiceClient

SchedulerClient returns a scheduler client from the test cluster

func (*TestCluster) TearDownCluster

func (tc *TestCluster) TearDownCluster() error

TearDownCluster tears down the test cluster

func (*TestCluster) TestBase

func (tc *TestCluster) TestBase() *persistencetests.TestBase

TODO (alex): remove this method. Replace usages with concrete methods.

func (*TestCluster) WorkerGRPCAddress added in v1.31.0

func (tc *TestCluster) WorkerGRPCAddress() string

type TestClusterConfig

type TestClusterConfig struct {
	EnableArchival                  bool
	IsMasterCluster                 bool
	ClusterMetadata                 cluster.Config
	Persistence                     persistencetests.TestBaseOptions
	FrontendConfig                  FrontendConfig
	HistoryConfig                   HistoryConfig
	MatchingConfig                  MatchingConfig
	WorkerConfig                    WorkerConfig
	ESConfig                        *esclient.Config
	MockAdminClient                 map[string]adminservice.AdminServiceClient
	FaultInjection                  *config.FaultInjection
	DynamicConfigOverrides          map[dynamicconfig.Key]any
	EnableMTLS                      bool
	EnableMetricsCapture            bool
	SpanExporters                   map[telemetry.SpanExporterType]sdktrace.SpanExporter
	CustomHistoryArchiverFactory    provider.CustomHistoryArchiverFactory
	CustomVisibilityArchiverFactory provider.CustomVisibilityArchiverFactory
	// ServiceFxOptions can be populated using WithFxOptionsForService.
	ServiceFxOptions map[primitives.ServiceName][]fx.Option
}

TestClusterConfig is the config for a test cluster

type TestClusterFactory

type TestClusterFactory interface {
	NewCluster(t *testing.T, clusterConfig *TestClusterConfig, logger log.Logger) (*TestCluster, error)
}

func NewTestClusterFactory

func NewTestClusterFactory() TestClusterFactory

func NewTestClusterFactoryWithCustomTestBaseFactory

func NewTestClusterFactoryWithCustomTestBaseFactory(tbFactory PersistenceTestBaseFactory) TestClusterFactory

type TestClusterOption added in v1.27.0

type TestClusterOption func(params *TestClusterParams)

func WithArchivalEnabled added in v1.27.0

func WithArchivalEnabled() TestClusterOption

func WithCustomHistoryArchiverFactory added in v1.31.0

func WithCustomHistoryArchiverFactory(factory provider.CustomHistoryArchiverFactory) TestClusterOption

func WithCustomVisibilityArchiverFactory added in v1.31.0

func WithCustomVisibilityArchiverFactory(factory provider.CustomVisibilityArchiverFactory) TestClusterOption

func WithDynamicConfigOverrides added in v1.27.0

func WithDynamicConfigOverrides(overrides map[dynamicconfig.Key]any) TestClusterOption

func WithFaultInjectionConfig added in v1.28.0

func WithFaultInjectionConfig(cfg *config.FaultInjection) TestClusterOption

func WithFxOptionsForService

func WithFxOptionsForService(serviceName primitives.ServiceName, options ...fx.Option) TestClusterOption

WithFxOptionsForService returns a TestClusterOption which, when passed as an argument to setupSuite, appends the given list of fx options to the end of the arguments to the fx.New call for the given service. For example, if you want to obtain the shard controller for the history service, you can do this:

var shardController shard.Controller
s.setupSuite(t, tests.WithFxOptionsForService(primitives.HistoryService, fx.Populate(&shardController)))
// now you can use shardController during your test

This is similar to the pattern of plumbing dependencies through the TestClusterConfig, but it's much more convenient, scalable and flexible. The reason we need to do this on a per-service basis is that there are separate fx apps for each one.

func WithMTLS added in v1.28.0

func WithMTLS() TestClusterOption

func WithNumHistoryShards added in v1.28.0

func WithNumHistoryShards(n int32) TestClusterOption

func WithSharedCluster added in v1.31.0

func WithSharedCluster() TestClusterOption

type TestClusterParams

type TestClusterParams struct {
	ServiceOptions                  map[primitives.ServiceName][]fx.Option
	DynamicConfigOverrides          map[dynamicconfig.Key]any
	ArchivalEnabled                 bool
	EnableMTLS                      bool
	FaultInjectionConfig            *config.FaultInjection
	NumHistoryShards                int32
	SharedCluster                   bool
	CustomHistoryArchiverFactory    provider.CustomHistoryArchiverFactory
	CustomVisibilityArchiverFactory provider.CustomVisibilityArchiverFactory
}

TestClusterParams contains the variables which are used to configure the test cluster via the TestClusterOption type.

func ApplyTestClusterOptions added in v1.27.0

func ApplyTestClusterOptions(options []TestClusterOption) TestClusterParams
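
The TestClusterOption, TestClusterParams and ApplyTestClusterOptions signatures above follow the functional-options pattern. The following is a minimal, self-contained sketch of that pattern using simplified stand-in types; clusterParams, clusterOption, withArchivalEnabled, withNumHistoryShards and applyOptions are hypothetical names for illustration, and starting from zero-valued params is an assumption, since the package's actual defaults are not shown here.

```go
package main

import "fmt"

// clusterParams is a simplified, hypothetical stand-in for TestClusterParams.
type clusterParams struct {
	ArchivalEnabled  bool
	EnableMTLS       bool
	NumHistoryShards int32
}

// clusterOption mirrors the shape of TestClusterOption: a func that mutates params.
type clusterOption func(params *clusterParams)

// withArchivalEnabled returns an option that switches archival on.
func withArchivalEnabled() clusterOption {
	return func(p *clusterParams) { p.ArchivalEnabled = true }
}

// withNumHistoryShards returns an option that sets the shard count.
func withNumHistoryShards(n int32) clusterOption {
	return func(p *clusterParams) { p.NumHistoryShards = n }
}

// applyOptions starts from zero-valued params (an assumption) and applies
// each option in order, so later options win over earlier ones.
func applyOptions(options []clusterOption) clusterParams {
	var params clusterParams
	for _, opt := range options {
		opt(&params)
	}
	return params
}

func main() {
	params := applyOptions([]clusterOption{
		withArchivalEnabled(),
		withNumHistoryShards(4),
	})
	fmt.Printf("%+v\n", params)
}
```

Because each option is an ordinary func value, new settings can be added without changing the signature of the function that consumes them.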

type TestDataConverter

type TestDataConverter struct {
	NumOfCallToPayloads   int // for testing to know testDataConverter is called as expected
	NumOfCallFromPayloads int
}

TestDataConverter implements encoded.DataConverter using gob
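
As an illustration of what a gob-based conversion involves, here is a minimal sketch of a round trip through the standard encoding/gob package. It is not the TestDataConverter implementation; encodeGob and decodeGob are hypothetical helpers, and the sketch works on raw byte slices rather than commonpb.Payload values.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// encodeGob serializes value into a byte slice using encoding/gob.
func encodeGob(value any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(value); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeGob deserializes data into valuePtr, which must be a pointer
// to a value of a type compatible with what was encoded.
func decodeGob(data []byte, valuePtr any) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(valuePtr)
}

func main() {
	data, err := encodeGob("hello")
	if err != nil {
		panic(err)
	}
	var out string
	if err := decodeGob(data, &out); err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```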

func (*TestDataConverter) FromPayload

func (tdc *TestDataConverter) FromPayload(payload *commonpb.Payload, valuePtr any) error

func (*TestDataConverter) FromPayloads

func (tdc *TestDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...any) error

func (*TestDataConverter) ToPayload

func (tdc *TestDataConverter) ToPayload(value any) (*commonpb.Payload, error)

func (*TestDataConverter) ToPayloads

func (tdc *TestDataConverter) ToPayloads(values ...any) (*commonpb.Payloads, error)

func (*TestDataConverter) ToString

func (tdc *TestDataConverter) ToString(payload *commonpb.Payload) string

func (*TestDataConverter) ToStrings

func (tdc *TestDataConverter) ToStrings(payloads *commonpb.Payloads) []string

type TestEnv added in v1.31.0

type TestEnv struct {
	*FunctionalTestBase

	// Shadows FunctionalTestBase.Assertions with a per-test instance bound to
	// this TestEnv's own *testing.T, avoiding data races when parallel tests
	// share the same *FunctionalTestBase cluster.
	// TODO: remove once all tests are migrated to TestEnv (and no longer use FunctionalTestBase directly).
	*require.Assertions

	Logger log.Logger
	// contains filtered or unexported fields
}

func NewEnv added in v1.31.0

func NewEnv(t *testing.T, opts ...TestOption) *TestEnv

NewEnv creates a new test environment with access to a Temporal cluster.

func (*TestEnv) Context added in v1.31.0

func (e *TestEnv) Context() context.Context

Context returns the test-level timeout context with RPC version headers already included. This context is canceled when the test timeout occurs. Use it directly for all RPC operations; there is no need to wrap it with NewContext or add headers manually.

For custom timeouts, use:

ctx, cancel := context.WithTimeout(env.Context(), 10*time.Second)
defer cancel()

func (*TestEnv) Error added in v1.31.0

func (e *TestEnv) Error(err error, msgAndArgs ...any)

Error asserts that err is not nil.

Deprecated: use require.Error with the parent test or suite instead.

TODO: remove once all tests are migrated to TestEnv (and no longer use FunctionalTestBase directly).

func (*TestEnv) InjectHook added in v1.31.0

func (e *TestEnv) InjectHook(hook testhooks.Hook) (cleanup func())

InjectHook sets a test hook inside the cluster.

It auto-detects the scope from the hook:

- For namespace-scoped hooks: scopes it to the test's namespace
- For global hooks: requires a dedicated cluster (fails early if used on a shared cluster)

func (*TestEnv) Namespace added in v1.31.0

func (e *TestEnv) Namespace() namespace.Name

Namespace returns the namespace specific to this test env. Use it for test isolation.

func (*TestEnv) NamespaceID added in v1.31.0

func (e *TestEnv) NamespaceID() namespace.ID

func (*TestEnv) NoError added in v1.31.0

func (e *TestEnv) NoError(err error, msgAndArgs ...any)

NoError asserts that err is nil.

Deprecated: use require.NoError with the parent test or suite instead.

TODO: remove once all tests are migrated to TestEnv (and no longer use FunctionalTestBase directly).

func (*TestEnv) OverrideDynamicConfig added in v1.31.0

func (e *TestEnv) OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())

OverrideDynamicConfig overrides a dynamic config setting for the duration of this test. For settings that can be namespace-scoped, a namespace constraint is applied. All others cannot be applied to a shared cluster and require `WithDedicatedCluster`.
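
The (cleanup func()) return value suggests an override-then-restore pattern. The following sketch shows how a cleanup func of this shape might restore the previous state; settingStore and its override method are hypothetical, keyed by plain strings, and say nothing about how the package stores or scopes overrides internally.

```go
package main

import "fmt"

// settingStore is a hypothetical stand-in for a set of config overrides.
type settingStore struct {
	values map[string]any
}

// override sets key to value and returns a cleanup func that restores
// whatever was stored under key before the override (or removes the key
// if it was not set).
func (s *settingStore) override(key string, value any) (cleanup func()) {
	previous, existed := s.values[key]
	s.values[key] = value
	return func() {
		if existed {
			s.values[key] = previous
			return
		}
		delete(s.values, key)
	}
}

func main() {
	store := &settingStore{values: map[string]any{"limit": 10}}

	cleanup := store.override("limit", 1)
	fmt.Println("during override:", store.values["limit"])

	cleanup()
	fmt.Println("after cleanup:", store.values["limit"])
}
```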

func (*TestEnv) Run added in v1.31.0

func (e *TestEnv) Run(name string, subtest func()) bool

Run executes a subtest.

Deprecated: use the suite's Run method instead.

TODO: remove once all tests are migrated to TestEnv (and no longer use FunctionalTestBase directly).

func (*TestEnv) SdkClient added in v1.31.0

func (e *TestEnv) SdkClient() sdkclient.Client

SdkClient returns the SDK client. It is lazily initialized on the first call.

func (*TestEnv) SdkWorker added in v1.31.0

func (e *TestEnv) SdkWorker() sdkworker.Worker

SdkWorker returns the SDK worker. It is lazily initialized on the first call.

func (*TestEnv) StartGlobalMetricCapture added in v1.31.0

func (e *TestEnv) StartGlobalMetricCapture() *GlobalMetricCapture

StartGlobalMetricCapture starts a cluster-global metrics capture for this test and automatically stops it during cleanup. Metric capture is cluster-global, so it is only safe on dedicated clusters. Misuse detection is best-effort and only applies to queried metrics that produced recordings.

func (*TestEnv) StartNamespaceMetricCapture added in v1.31.0

func (e *TestEnv) StartNamespaceMetricCapture() *NamespaceMetricCapture

StartNamespaceMetricCapture starts a metrics capture that only allows safe per-metric namespace filtering. Namespace captures are safe on shared clusters because reads are restricted to per-metric namespace-filtered iteration and reject non-namespaced metrics.

func (*TestEnv) T added in v1.31.0

func (e *TestEnv) T() *testing.T

T returns the *testing.T.

Deprecated: use the suite's T() method instead.

func (*TestEnv) TaskPoller added in v1.31.0

func (e *TestEnv) TaskPoller() *taskpoller.TaskPoller

func (*TestEnv) Tv added in v1.31.0

func (e *TestEnv) Tv() *testvars.TestVars

func (*TestEnv) WorkerTaskQueue added in v1.31.0

func (e *TestEnv) WorkerTaskQueue() string

WorkerTaskQueue returns the task queue name used by the SDK Worker.

type TestOption added in v1.31.0

type TestOption func(*testOptions)

func WithDedicatedCluster added in v1.31.0

func WithDedicatedCluster() TestOption

WithDedicatedCluster requests a dedicated (non-shared) cluster for the test. Use this for tests that have cluster-global side effects.

func WithDynamicConfig added in v1.31.0

func WithDynamicConfig(setting dynamicconfig.GenericSetting, value any) TestOption

WithDynamicConfig overrides a dynamic config setting for the test. For settings that can be namespace-scoped, a namespace constraint is applied. All other settings require a dedicated cluster, so for them this option implies `WithDedicatedCluster`.

func WithSdkWorker deprecated added in v1.31.0

func WithSdkWorker() TestOption

Deprecated: this option is no longer required and will be removed once all callers have been updated.

func WithTimeout added in v1.31.0

func WithTimeout(duration time.Duration) TestOption

WithTimeout sets a custom timeout for the test. The test will fail if it runs longer than this duration. The timeout is multiplied by debug.TimeoutMultiplier when debugging. The TEMPORAL_TEST_TIMEOUT environment variable can also set the default timeout in seconds.

type WorkerConfig

type WorkerConfig struct {
	NumWorkers    int
	DisableWorker bool // overrides NumWorkers
}

WorkerConfig is the config for the worker service
