testcore

package
v1.30.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2026 License: MIT Imports: 118 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultPageSize   = 5
	PprofTestPort     = 7000
	TlsCertCommonName = "my-common-name"
	ClientSuiteLimit  = 10
	// TODO (alex): replace all sleeps with WaitForESToSettle with s.Eventually()
	WaitForESToSettle = 4 * time.Second // wait some time for ES shards to settle so that data is consistent
)
View Source
const (
	DirectionSend       = "send"
	DirectionRecv       = "recv"
	DirectionServerSend = "server_send"
	DirectionServerRecv = "server_recv"
)

Message direction constants

View Source
const NamespaceCacheRefreshInterval = time.Second

Variables

View Source
var (
	ErrEncodingIsNotSet       = errors.New("payload encoding metadata is not set")
	ErrEncodingIsNotSupported = errors.New("payload encoding is not supported")
)
View Source
var (
	ErrNoTasks = errors.New("no tasks")
)

Functions

func DecodeString

func DecodeString(t require.TestingT, pls *commonpb.Payloads) string

func EventBatchesToVersionHistory

func EventBatchesToVersionHistory(
	versionHistory *historyspb.VersionHistory,
	eventBatches []*historypb.History,
) (*historyspb.VersionHistory, error)

func ExtractReplicationMessages added in v1.30.0

func ExtractReplicationMessages(msg proto.Message) *replicationspb.WorkflowReplicationMessages

ExtractReplicationMessages extracts WorkflowReplicationMessages from a proto message. This is a helper for tests that need to inspect replication message contents.

func NewContext

func NewContext() context.Context

NewContext creates a new context with a default timeout of 90 seconds.

func NewTestDataConverter

func NewTestDataConverter() converter.DataConverter

TODO (alex): use it by default in SdkClient everywhere?

func RandomizeStr

func RandomizeStr(id string) string

func RandomizedNexusEndpoint

func RandomizedNexusEndpoint(name string) string

func UseCassandraPersistence added in v1.29.0

func UseCassandraPersistence() bool

func UseSQLVisibility added in v1.27.0

func UseSQLVisibility() bool

func WithDumpHistory

func WithDumpHistory(o *PollAndProcessWorkflowTaskOptions)

func WithForceNewWorkflowTask

func WithForceNewWorkflowTask(o *PollAndProcessWorkflowTaskOptions)

func WithNoDumpCommands

func WithNoDumpCommands(o *PollAndProcessWorkflowTaskOptions)

func WithPollSticky

func WithPollSticky(o *PollAndProcessWorkflowTaskOptions)

func WithRespondSticky

func WithRespondSticky(o *PollAndProcessWorkflowTaskOptions)

func WithoutRetries

func WithoutRetries(o *PollAndProcessWorkflowTaskOptions)

Types

type ArchiverBase

type ArchiverBase struct {
	// contains filtered or unexported fields
}

ArchiverBase is a testcore struct for archiver provider being used in functional tests

func (*ArchiverBase) HistoryURI

func (a *ArchiverBase) HistoryURI() string

func (*ArchiverBase) Metadata

func (a *ArchiverBase) Metadata() archiver.ArchivalMetadata

func (*ArchiverBase) Provider

func (a *ArchiverBase) Provider() provider.ArchiverProvider

func (*ArchiverBase) VisibilityURI

func (a *ArchiverBase) VisibilityURI() string

type CapturedReplicationMessage added in v1.30.0

type CapturedReplicationMessage struct {
	Timestamp     string          `json:"timestamp"`
	Method        string          `json:"method"`
	Direction     string          `json:"direction"`
	ClusterName   string          `json:"clusterName"`
	TargetAddress string          `json:"targetAddress"`
	MessageType   string          `json:"messageType"`
	IsStreamCall  bool            `json:"isStreamCall"`
	Request       proto.Message   `json:"-"` // Don't marshal directly
	Response      proto.Message   `json:"-"` // Don't marshal directly
	Message       json.RawMessage `json:"message,omitempty"`
}

CapturedReplicationMessage represents a captured replication message

type FrontendConfig

type FrontendConfig struct {
	NumFrontendHosts int
}

FrontendConfig is the config for the frontend service

type FunctionalTestBase

type FunctionalTestBase struct {
	suite.Suite

	// `suite.Suite` embeds `*assert.Assertions` which, by default, makes all asserts (like `s.NoError(err)`)
	// only log the error, continue test execution, and only then fail the test.
	// This is not desired behavior in most cases. The idiomatic way to change this behavior
	// is to replace `*assert.Assertions` with `*require.Assertions` by embedding it in every test suite
	// (or base struct of every test suite).
	*require.Assertions

	protorequire.ProtoAssertions
	historyrequire.HistoryRequire
	updateutils.UpdateUtils

	Logger log.Logger
	// contains filtered or unexported fields
}

func (*FunctionalTestBase) AdminClient

func (*FunctionalTestBase) DecodePayloadsByteSliceInt32

func (s *FunctionalTestBase) DecodePayloadsByteSliceInt32(ps *commonpb.Payloads) (r int32)

func (*FunctionalTestBase) DecodePayloadsInt

func (s *FunctionalTestBase) DecodePayloadsInt(ps *commonpb.Payloads) int

func (*FunctionalTestBase) DecodePayloadsString

func (s *FunctionalTestBase) DecodePayloadsString(ps *commonpb.Payloads) string

func (*FunctionalTestBase) DurationNear

func (s *FunctionalTestBase) DurationNear(value, target, tolerance time.Duration)

func (*FunctionalTestBase) ExternalNamespace added in v1.29.0

func (s *FunctionalTestBase) ExternalNamespace() namespace.Name

func (*FunctionalTestBase) FrontendClient

func (*FunctionalTestBase) FrontendGRPCAddress

func (s *FunctionalTestBase) FrontendGRPCAddress() string

func (*FunctionalTestBase) GetHistory

func (s *FunctionalTestBase) GetHistory(namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent

func (*FunctionalTestBase) GetHistoryFunc

func (s *FunctionalTestBase) GetHistoryFunc(namespace string, execution *commonpb.WorkflowExecution) func() []*historypb.HistoryEvent

func (*FunctionalTestBase) GetNamespaceID

func (s *FunctionalTestBase) GetNamespaceID(namespace string) string

func (*FunctionalTestBase) GetTestCluster

func (s *FunctionalTestBase) GetTestCluster() *TestCluster

func (*FunctionalTestBase) GetTestClusterConfig

func (s *FunctionalTestBase) GetTestClusterConfig() *TestClusterConfig

func (*FunctionalTestBase) HttpAPIAddress

func (s *FunctionalTestBase) HttpAPIAddress() string

func (*FunctionalTestBase) InjectHook added in v1.27.0

func (s *FunctionalTestBase) InjectHook(key testhooks.Key, value any) (cleanup func())

func (*FunctionalTestBase) MarkNamespaceAsDeleted added in v1.27.0

func (s *FunctionalTestBase) MarkNamespaceAsDeleted(
	nsName namespace.Name,
) error

func (*FunctionalTestBase) Namespace

func (s *FunctionalTestBase) Namespace() namespace.Name

func (*FunctionalTestBase) NamespaceID added in v1.27.0

func (s *FunctionalTestBase) NamespaceID() namespace.ID

func (*FunctionalTestBase) OperatorClient

func (*FunctionalTestBase) OverrideDynamicConfig

func (s *FunctionalTestBase) OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())

Overrides one dynamic config setting for the duration of this test (or sub-test). The change will automatically be reverted at the end of the test (using t.Cleanup). The cleanup function is also returned if you want to revert the change before the end of the test.

func (*FunctionalTestBase) RegisterNamespace added in v1.27.0

func (s *FunctionalTestBase) RegisterNamespace(
	nsName namespace.Name,
	retentionDays int32,
	archivalState enumspb.ArchivalState,
	historyArchivalURI string,
	visibilityArchivalURI string,
) (namespace.ID, error)

Register namespace using persistence API because:

  1. The Retention period is set to 0 for archival tests, and this can't be done through FE,
  2. Update search attributes would require an extra API call,
  3. One more extra API call would be necessary to get namespace.ID.

func (*FunctionalTestBase) RunTestWithMatchingBehavior

func (s *FunctionalTestBase) RunTestWithMatchingBehavior(subtest func())

func (*FunctionalTestBase) SdkClient added in v1.28.0

func (s *FunctionalTestBase) SdkClient() sdkclient.Client

func (*FunctionalTestBase) SendSignal added in v1.27.0

func (s *FunctionalTestBase) SendSignal(nsName string, execution *commonpb.WorkflowExecution, signalName string,
	input *commonpb.Payloads, identity string) error

TODO (alex): change to nsName namespace.Name

func (*FunctionalTestBase) SetupSubTest added in v1.27.0

func (s *FunctionalTestBase) SetupSubTest()

func (*FunctionalTestBase) SetupSuite

func (s *FunctionalTestBase) SetupSuite()

func (*FunctionalTestBase) SetupSuiteWithCluster added in v1.27.0

func (s *FunctionalTestBase) SetupSuiteWithCluster(options ...TestClusterOption)

func (*FunctionalTestBase) SetupTest

func (s *FunctionalTestBase) SetupTest()

All test suites that embed FunctionalTestBase and override SetupTest must call testcore's FunctionalTestBase.SetupTest function to distribute the tests into partitions. Otherwise, the test suite will be executed multiple times in each partition.

func (*FunctionalTestBase) TaskPoller added in v1.28.0

func (s *FunctionalTestBase) TaskPoller() *taskpoller.TaskPoller

func (*FunctionalTestBase) TaskQueue added in v1.28.0

func (s *FunctionalTestBase) TaskQueue() string

func (*FunctionalTestBase) TearDownCluster added in v1.27.0

func (s *FunctionalTestBase) TearDownCluster()

func (*FunctionalTestBase) TearDownSubTest added in v1.28.0

func (s *FunctionalTestBase) TearDownSubTest()

**IMPORTANT**: When overriding this, make sure to invoke `s.FunctionalTestBase.TearDownSubTest()`.

func (*FunctionalTestBase) TearDownSuite

func (s *FunctionalTestBase) TearDownSuite()

func (*FunctionalTestBase) TearDownTest added in v1.28.0

func (s *FunctionalTestBase) TearDownTest()

**IMPORTANT**: When overriding this, make sure to invoke `s.FunctionalTestBase.TearDownTest()`.

func (*FunctionalTestBase) WaitForChannel added in v1.27.0

func (s *FunctionalTestBase) WaitForChannel(ctx context.Context, ch chan struct{})

func (*FunctionalTestBase) Worker added in v1.28.0

func (s *FunctionalTestBase) Worker() sdkworker.Worker

type HistoryConfig

type HistoryConfig struct {
	NumHistoryShards int32
	NumHistoryHosts  int
}

HistoryConfig contains configs for history service

type MatchingConfig

type MatchingConfig struct {
	NumMatchingHosts int
}

MatchingConfig is the config for the matching service

type PersistenceTestBaseFactory

type PersistenceTestBaseFactory interface {
	NewTestBase(options *persistencetests.TestBaseOptions) *persistencetests.TestBase
}

type PollAndProcessWorkflowTaskOptionFunc

type PollAndProcessWorkflowTaskOptionFunc func(*PollAndProcessWorkflowTaskOptions)

type PollAndProcessWorkflowTaskOptions

type PollAndProcessWorkflowTaskOptions struct {
	DumpHistory          bool
	DumpCommands         bool
	DropTask             bool
	PollSticky           bool
	RespondSticky        bool
	ExpectedAttemptCount int
	Retries              int
	ForceNewWorkflowTask bool
	QueryResult          *querypb.WorkflowQueryResult
}

type PollAndProcessWorkflowTaskResponse

type PollAndProcessWorkflowTaskResponse struct {
	IsQueryTask bool
	NewTask     *workflowservice.RespondWorkflowTaskCompletedResponse
}

type RecordedTask added in v1.30.0

type RecordedTask struct {
	Timestamp   time.Time  `json:"timestamp"`
	TaskType    string     `json:"taskType"` // The specific task type (e.g., "TASK_TYPE_ACTIVITY_RETRY_TIMER")
	ShardID     int32      `json:"shardId"`
	RangeID     int64      `json:"rangeId,omitempty"`
	NamespaceID string     `json:"namespaceId"`
	WorkflowID  string     `json:"workflowId"`
	RunID       string     `json:"runId"`
	Task        tasks.Task `json:"task"` // The actual task object
}

RecordedTask wraps a task with metadata about when and where it was written

type ReplicationStreamRecorder added in v1.30.0

type ReplicationStreamRecorder struct {
	// contains filtered or unexported fields
}

ReplicationStreamRecorder captures replication stream messages for testing

func NewReplicationStreamRecorder added in v1.30.0

func NewReplicationStreamRecorder() *ReplicationStreamRecorder

func (*ReplicationStreamRecorder) Clear added in v1.30.0

func (r *ReplicationStreamRecorder) Clear()

func (*ReplicationStreamRecorder) GetMessages added in v1.30.0

func (*ReplicationStreamRecorder) SetOutputFile added in v1.30.0

func (r *ReplicationStreamRecorder) SetOutputFile(filePath string)

SetOutputFile sets the file path for writing captured messages on-demand

func (*ReplicationStreamRecorder) StreamInterceptor added in v1.30.0

func (r *ReplicationStreamRecorder) StreamInterceptor(clusterName string) grpc.StreamClientInterceptor

StreamInterceptor returns a gRPC stream client interceptor that captures stream messages

func (*ReplicationStreamRecorder) StreamServerInterceptor added in v1.30.0

func (r *ReplicationStreamRecorder) StreamServerInterceptor(clusterName string) grpc.StreamServerInterceptor

StreamServerInterceptor returns a gRPC stream server interceptor that captures stream messages

func (*ReplicationStreamRecorder) UnaryInterceptor added in v1.30.0

func (r *ReplicationStreamRecorder) UnaryInterceptor(clusterName string) grpc.UnaryClientInterceptor

UnaryInterceptor returns a gRPC unary client interceptor that captures messages

func (*ReplicationStreamRecorder) UnaryServerInterceptor added in v1.30.0

func (r *ReplicationStreamRecorder) UnaryServerInterceptor(clusterName string) grpc.UnaryServerInterceptor

UnaryServerInterceptor returns a gRPC unary server interceptor that captures messages

func (*ReplicationStreamRecorder) WriteToLog added in v1.30.0

func (r *ReplicationStreamRecorder) WriteToLog() error

WriteToLog writes all captured messages to the configured output file

type TaskFilter added in v1.30.0

type TaskFilter struct {
	NamespaceID string // Required: namespace ID to filter by
	WorkflowID  string // Optional: workflow ID to filter by (empty string means no filter)
	RunID       string // Optional: run ID to filter by (empty string means no filter)
}

TaskFilter specifies criteria for filtering recorded tasks

type TaskMatcher added in v1.30.0

type TaskMatcher func(RecordedTask) bool

TaskMatcher is a function that tests whether a RecordedTask matches some criteria

type TaskPoller deprecated

type TaskPoller struct {
	Client                       workflowservice.WorkflowServiceClient
	Namespace                    string
	TaskQueue                    *taskqueuepb.TaskQueue
	StickyTaskQueue              *taskqueuepb.TaskQueue
	StickyScheduleToStartTimeout time.Duration
	Identity                     string
	WorkflowTaskHandler          WorkflowTaskHandler
	ActivityTaskHandler          ActivityTaskHandler
	QueryHandler                 QueryHandler
	MessageHandler               MessageHandler
	Logger                       log.Logger
	T                            *testing.T
}

Deprecated: TaskPoller is deprecated. Use taskpoller.TaskPoller instead. TaskPoller is used in functional tests to poll workflow or activity task queues.

func (*TaskPoller) HandlePartialWorkflowTask

func (p *TaskPoller) HandlePartialWorkflowTask(response *workflowservice.PollWorkflowTaskQueueResponse, forceCreateNewWorkflowTask bool) (*workflowservice.RespondWorkflowTaskCompletedResponse, error)

HandlePartialWorkflowTask for workflow task

func (*TaskPoller) PollAndProcessActivityTask

func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error

PollAndProcessActivityTask for activity tasks

func (*TaskPoller) PollAndProcessActivityTaskWithID

func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error

PollAndProcessActivityTaskWithID is similar to PollAndProcessActivityTask but using RespondActivityTask...ByID

func (*TaskPoller) PollAndProcessWorkflowTask

func (p *TaskPoller) PollAndProcessWorkflowTask(funcs ...PollAndProcessWorkflowTaskOptionFunc) (res PollAndProcessWorkflowTaskResponse, err error)

func (*TaskPoller) PollAndProcessWorkflowTaskWithOptions

func (p *TaskPoller) PollAndProcessWorkflowTaskWithOptions(opts *PollAndProcessWorkflowTaskOptions) (res PollAndProcessWorkflowTaskResponse, err error)

type TaskQueueRecorder added in v1.30.0

type TaskQueueRecorder struct {
	// contains filtered or unexported fields
}

TaskQueueRecorder wraps an ExecutionManager to record ALL task writes to the history task queues (transfer, timer, replication, visibility, archival, etc.). This is useful for integration tests where you want to assert on what tasks were generated and in what order. Tasks are stored flattened by category - all tasks of the same type are in a single list, with each task wrapped with metadata about when/where it was written.

func NewTaskQueueRecorder added in v1.30.0

func NewTaskQueueRecorder(delegate persistence.ExecutionManager, logger log.Logger) *TaskQueueRecorder

NewTaskQueueRecorder creates a recorder that wraps the given ExecutionManager

func (*TaskQueueRecorder) AddHistoryTasks added in v1.30.0

func (r *TaskQueueRecorder) AddHistoryTasks(
	ctx context.Context,
	request *persistence.AddHistoryTasksRequest,
) error

AddHistoryTasks records the task write and then delegates to the underlying manager

func (*TaskQueueRecorder) AppendHistoryNodes added in v1.30.0

func (*TaskQueueRecorder) AppendRawHistoryNodes added in v1.30.0

func (*TaskQueueRecorder) Close added in v1.30.0

func (r *TaskQueueRecorder) Close()

func (*TaskQueueRecorder) CompleteHistoryTask added in v1.30.0

func (r *TaskQueueRecorder) CompleteHistoryTask(
	ctx context.Context,
	request *persistence.CompleteHistoryTaskRequest,
) error

func (*TaskQueueRecorder) ConflictResolveWorkflowExecution added in v1.30.0

func (*TaskQueueRecorder) CountMatchingTasks added in v1.30.0

func (r *TaskQueueRecorder) CountMatchingTasks(category tasks.Category, matcher TaskMatcher) int

CountMatchingTasks returns the count of tasks in a category that match the given matcher

func (*TaskQueueRecorder) CountTasksForNamespace added in v1.30.0

func (r *TaskQueueRecorder) CountTasksForNamespace(
	category tasks.Category,
	namespaceID string,
	matcher TaskMatcher,
) int

CountTasksForNamespace returns the count of tasks in a category for a specific namespace

func (*TaskQueueRecorder) CountTasksForWorkflow added in v1.30.0

func (r *TaskQueueRecorder) CountTasksForWorkflow(
	category tasks.Category,
	namespaceID string,
	workflowID string,
	runID string,
	matcher TaskMatcher,
) int

CountTasksForWorkflow returns the count of tasks in a category for a specific workflow. If namespaceID is empty, it matches any namespace. If runID is empty, it matches any runID for the given workflowID.

func (*TaskQueueRecorder) CreateWorkflowExecution added in v1.30.0

func (*TaskQueueRecorder) DeleteCurrentWorkflowExecution added in v1.30.0

func (r *TaskQueueRecorder) DeleteCurrentWorkflowExecution(
	ctx context.Context,
	request *persistence.DeleteCurrentWorkflowExecutionRequest,
) error

func (*TaskQueueRecorder) DeleteHistoryBranch added in v1.30.0

func (r *TaskQueueRecorder) DeleteHistoryBranch(
	ctx context.Context,
	request *persistence.DeleteHistoryBranchRequest,
) error

func (*TaskQueueRecorder) DeleteReplicationTaskFromDLQ added in v1.30.0

func (r *TaskQueueRecorder) DeleteReplicationTaskFromDLQ(
	ctx context.Context,
	request *persistence.DeleteReplicationTaskFromDLQRequest,
) error

func (*TaskQueueRecorder) DeleteWorkflowExecution added in v1.30.0

func (r *TaskQueueRecorder) DeleteWorkflowExecution(
	ctx context.Context,
	request *persistence.DeleteWorkflowExecutionRequest,
) error

func (*TaskQueueRecorder) ForkHistoryBranch added in v1.30.0

func (*TaskQueueRecorder) GetAllHistoryTreeBranches added in v1.30.0

func (*TaskQueueRecorder) GetAllRecordedTasks added in v1.30.0

func (r *TaskQueueRecorder) GetAllRecordedTasks() map[tasks.Category][]RecordedTask

GetAllRecordedTasks returns all recorded tasks WITH metadata, grouped by category

func (*TaskQueueRecorder) GetAllTasks added in v1.30.0

func (r *TaskQueueRecorder) GetAllTasks() map[tasks.Category][]tasks.Task

GetAllTasks returns all tasks grouped by category (unwrapped, without metadata)

func (*TaskQueueRecorder) GetCurrentExecution added in v1.30.0

func (*TaskQueueRecorder) GetHistoryBranchUtil added in v1.30.0

func (r *TaskQueueRecorder) GetHistoryBranchUtil() persistence.HistoryBranchUtil

func (*TaskQueueRecorder) GetHistoryTasks added in v1.30.0

func (*TaskQueueRecorder) GetName added in v1.30.0

func (r *TaskQueueRecorder) GetName() string

func (*TaskQueueRecorder) GetRecordedTasksByCategoryFiltered added in v1.30.0

func (r *TaskQueueRecorder) GetRecordedTasksByCategoryFiltered(category tasks.Category, filter TaskFilter) []RecordedTask

GetRecordedTasksByCategoryFiltered returns recorded tasks WITH metadata for a specific category, filtered by namespace (required) and optionally by workflow ID and run ID. This is the preferred API for tests to ensure tasks are properly scoped.

func (*TaskQueueRecorder) GetReplicationTasksFromDLQ added in v1.30.0

func (*TaskQueueRecorder) GetWorkflowExecution added in v1.30.0

func (*TaskQueueRecorder) IsReplicationDLQEmpty added in v1.30.0

func (r *TaskQueueRecorder) IsReplicationDLQEmpty(
	ctx context.Context,
	request *persistence.GetReplicationTasksFromDLQRequest,
) (bool, error)

func (*TaskQueueRecorder) ListConcreteExecutions added in v1.30.0

func (*TaskQueueRecorder) MatchTasks added in v1.30.0

func (r *TaskQueueRecorder) MatchTasks(category tasks.Category, matcher TaskMatcher) []RecordedTask

MatchTasks returns all tasks in a category that match the given matcher function

func (*TaskQueueRecorder) MatchTasksForNamespace added in v1.30.0

func (r *TaskQueueRecorder) MatchTasksForNamespace(
	category tasks.Category,
	namespaceID string,
	matcher TaskMatcher,
) []RecordedTask

MatchTasksForNamespace returns all tasks in a category for a specific namespace

func (*TaskQueueRecorder) MatchTasksForWorkflow added in v1.30.0

func (r *TaskQueueRecorder) MatchTasksForWorkflow(
	category tasks.Category,
	namespaceID string,
	workflowID string,
	runID string,
	matcher TaskMatcher,
) []RecordedTask

MatchTasksForWorkflow returns all tasks in a category for a specific workflow. If namespaceID is empty, it matches any namespace. If runID is empty, it matches any runID for the given workflowID.

func (*TaskQueueRecorder) PutReplicationTaskToDLQ added in v1.30.0

func (r *TaskQueueRecorder) PutReplicationTaskToDLQ(
	ctx context.Context,
	request *persistence.PutReplicationTaskToDLQRequest,
) error

func (*TaskQueueRecorder) RangeCompleteHistoryTasks added in v1.30.0

func (r *TaskQueueRecorder) RangeCompleteHistoryTasks(
	ctx context.Context,
	request *persistence.RangeCompleteHistoryTasksRequest,
) error

func (*TaskQueueRecorder) RangeDeleteReplicationTaskFromDLQ added in v1.30.0

func (r *TaskQueueRecorder) RangeDeleteReplicationTaskFromDLQ(
	ctx context.Context,
	request *persistence.RangeDeleteReplicationTaskFromDLQRequest,
) error

func (*TaskQueueRecorder) ReadHistoryBranch added in v1.30.0

func (*TaskQueueRecorder) ReadHistoryBranchByBatch added in v1.30.0

func (*TaskQueueRecorder) ReadHistoryBranchReverse added in v1.30.0

func (*TaskQueueRecorder) ReadRawHistoryBranch added in v1.30.0

func (*TaskQueueRecorder) SetWorkflowExecution added in v1.30.0

func (*TaskQueueRecorder) TrimHistoryBranch added in v1.30.0

func (*TaskQueueRecorder) UpdateWorkflowExecution added in v1.30.0

func (*TaskQueueRecorder) WriteToLog added in v1.30.0

func (r *TaskQueueRecorder) WriteToLog(filePath string) error

WriteToLog writes all captured tasks to a file in JSON format

type TemporalImpl

type TemporalImpl struct {
	// contains filtered or unexported fields
}

func (*TemporalImpl) AdminClient

func (c *TemporalImpl) AdminClient() adminservice.AdminServiceClient

func (*TemporalImpl) Authorize

func (*TemporalImpl) CaptureMetricsHandler

func (c *TemporalImpl) CaptureMetricsHandler() *metricstest.CaptureHandler

func (*TemporalImpl) ChasmEngine added in v1.29.0

func (c *TemporalImpl) ChasmEngine() (chasm.Engine, error)

func (*TemporalImpl) ChasmVisibilityManager added in v1.30.0

func (c *TemporalImpl) ChasmVisibilityManager() chasm.VisibilityManager

func (*TemporalImpl) DcClient

func (c *TemporalImpl) DcClient() *dynamicconfig.MemoryClient

func (*TemporalImpl) FrontendClient

func (*TemporalImpl) FrontendGRPCAddress

func (c *TemporalImpl) FrontendGRPCAddress() string

func (*TemporalImpl) FrontendHTTPAddress

func (c *TemporalImpl) FrontendHTTPAddress() string

func (*TemporalImpl) GetCHASMRegistry added in v1.29.0

func (c *TemporalImpl) GetCHASMRegistry() *chasm.Registry

func (*TemporalImpl) GetClaims

func (c *TemporalImpl) GetClaims(authInfo *authorization.AuthInfo) (*authorization.Claims, error)

func (*TemporalImpl) GetExecutionManager

func (c *TemporalImpl) GetExecutionManager() persistence.ExecutionManager

func (*TemporalImpl) GetGrpcClientInterceptor added in v1.29.0

func (c *TemporalImpl) GetGrpcClientInterceptor() *grpcinject.Interceptor

func (*TemporalImpl) GetMetricsHandler

func (c *TemporalImpl) GetMetricsHandler() metrics.Handler

func (*TemporalImpl) GetTLSConfigProvider

func (c *TemporalImpl) GetTLSConfigProvider() encryption.TLSConfigProvider

func (*TemporalImpl) GetTaskCategoryRegistry

func (c *TemporalImpl) GetTaskCategoryRegistry() tasks.TaskCategoryRegistry

func (*TemporalImpl) GetTaskQueueRecorder added in v1.30.0

func (c *TemporalImpl) GetTaskQueueRecorder() *TaskQueueRecorder

func (*TemporalImpl) HistoryClient

func (*TemporalImpl) MatchingClient

func (*TemporalImpl) NamespaceRegistries added in v1.27.0

func (c *TemporalImpl) NamespaceRegistries() []namespace.Registry

func (*TemporalImpl) OperatorClient

func (*TemporalImpl) RemoteFrontendGRPCAddress

func (c *TemporalImpl) RemoteFrontendGRPCAddress() string

Use this to get an address for a remote cluster to connect to.

func (*TemporalImpl) SetOnAuthorize

func (*TemporalImpl) SetOnGetClaims

func (c *TemporalImpl) SetOnGetClaims(fn func(*authorization.AuthInfo) (*authorization.Claims, error))

func (*TemporalImpl) SetTaskQueueRecorder added in v1.30.0

func (c *TemporalImpl) SetTaskQueueRecorder(recorder *TaskQueueRecorder)

func (*TemporalImpl) Start

func (c *TemporalImpl) Start() error

func (*TemporalImpl) Stop

func (c *TemporalImpl) Stop() error

func (*TemporalImpl) TlsConfigProvider

func (c *TemporalImpl) TlsConfigProvider() *encryption.FixedTLSConfigProvider

type TemporalParams

type TemporalParams struct {
	ClusterMetadataConfig            *cluster.Config
	PersistenceConfig                config.Persistence
	MetadataMgr                      persistence.MetadataManager
	ClusterMetadataManager           persistence.ClusterMetadataManager
	ShardMgr                         persistence.ShardManager
	ExecutionManager                 persistence.ExecutionManager
	TaskMgr                          persistence.TaskManager
	NamespaceReplicationQueue        persistence.NamespaceReplicationQueue
	AbstractDataStoreFactory         persistenceClient.AbstractDataStoreFactory
	VisibilityStoreFactory           visibility.VisibilityStoreFactory
	Logger                           log.Logger
	ArchiverMetadata                 carchiver.ArchivalMetadata
	ArchiverProvider                 provider.ArchiverProvider
	EnableReadHistoryFromArchival    bool
	FrontendConfig                   FrontendConfig
	HistoryConfig                    HistoryConfig
	MatchingConfig                   MatchingConfig
	WorkerConfig                     WorkerConfig
	ESConfig                         *esclient.Config
	ESClient                         esclient.Client
	MockAdminClient                  map[string]adminservice.AdminServiceClient
	NamespaceReplicationTaskExecutor nsreplication.TaskExecutor
	DynamicConfigOverrides           map[dynamicconfig.Key]interface{}
	TLSConfigProvider                *encryption.FixedTLSConfigProvider
	CaptureMetricsHandler            *metricstest.CaptureHandler
	// ServiceFxOptions is populated by WithFxOptionsForService.
	ServiceFxOptions         map[primitives.ServiceName][]fx.Option
	TaskCategoryRegistry     tasks.TaskCategoryRegistry
	HostsByProtocolByService map[transferProtocol]map[primitives.ServiceName]static.Hosts
	SpanExporters            map[telemetry.SpanExporterType]sdktrace.SpanExporter
}

TemporalParams contains everything needed to bootstrap Temporal

type TestCluster

type TestCluster struct {
	// contains filtered or unexported fields
}

TestCluster is a testcore struct for functional tests

func (*TestCluster) AdminClient

func (tc *TestCluster) AdminClient() adminservice.AdminServiceClient

func (*TestCluster) ArchiverBase added in v1.27.0

func (tc *TestCluster) ArchiverBase() *ArchiverBase

func (*TestCluster) ClusterName added in v1.27.0

func (tc *TestCluster) ClusterName() string

func (*TestCluster) ExecutionManager

func (tc *TestCluster) ExecutionManager() persistence.ExecutionManager

ExecutionManager returns the execution manager from the test cluster

func (*TestCluster) FrontendClient

func (tc *TestCluster) FrontendClient() workflowservice.WorkflowServiceClient

func (*TestCluster) GetReplicationStreamRecorder added in v1.30.0

func (tc *TestCluster) GetReplicationStreamRecorder() *ReplicationStreamRecorder

func (*TestCluster) GetTaskQueueRecorder added in v1.30.0

func (tc *TestCluster) GetTaskQueueRecorder() *TaskQueueRecorder

func (*TestCluster) HistoryClient

func (tc *TestCluster) HistoryClient() historyservice.HistoryServiceClient

HistoryClient returns a history client from the test cluster

func (*TestCluster) Host

func (tc *TestCluster) Host() *TemporalImpl

TODO (alex): expose only needed objects from TemporalImpl.

func (*TestCluster) MatchingClient

func (tc *TestCluster) MatchingClient() matchingservice.MatchingServiceClient

MatchingClient returns a matching client from the test cluster

func (*TestCluster) OperatorClient

func (tc *TestCluster) OperatorClient() operatorservice.OperatorServiceClient

func (*TestCluster) OverrideDynamicConfig

func (tc *TestCluster) OverrideDynamicConfig(t *testing.T, key dynamicconfig.GenericSetting, value any) (cleanup func())

func (*TestCluster) TearDownCluster

func (tc *TestCluster) TearDownCluster() error

TearDownCluster tears down the test cluster

func (*TestCluster) TestBase

func (tc *TestCluster) TestBase() *persistencetests.TestBase

TODO (alex): remove this method. Replace usages with concrete methods.

type TestClusterConfig

type TestClusterConfig struct {
	EnableArchival         bool
	IsMasterCluster        bool
	ClusterMetadata        cluster.Config
	Persistence            persistencetests.TestBaseOptions
	FrontendConfig         FrontendConfig
	HistoryConfig          HistoryConfig
	MatchingConfig         MatchingConfig
	WorkerConfig           WorkerConfig
	ESConfig               *esclient.Config
	MockAdminClient        map[string]adminservice.AdminServiceClient
	FaultInjection         *config.FaultInjection
	DynamicConfigOverrides map[dynamicconfig.Key]any
	EnableMTLS             bool
	EnableMetricsCapture   bool
	SpanExporters          map[telemetry.SpanExporterType]sdktrace.SpanExporter
	// ServiceFxOptions can be populated using WithFxOptionsForService.
	ServiceFxOptions map[primitives.ServiceName][]fx.Option
}

TestClusterConfig is the config for a test cluster

type TestClusterFactory

type TestClusterFactory interface {
	NewCluster(t *testing.T, clusterConfig *TestClusterConfig, logger log.Logger) (*TestCluster, error)
}

func NewTestClusterFactory

func NewTestClusterFactory() TestClusterFactory

func NewTestClusterFactoryWithCustomTestBaseFactory

func NewTestClusterFactoryWithCustomTestBaseFactory(tbFactory PersistenceTestBaseFactory) TestClusterFactory

type TestClusterOption added in v1.27.0

type TestClusterOption func(params *TestClusterParams)

func WithArchivalEnabled added in v1.27.0

func WithArchivalEnabled() TestClusterOption

func WithDynamicConfigOverrides added in v1.27.0

func WithDynamicConfigOverrides(overrides map[dynamicconfig.Key]any) TestClusterOption

func WithFaultInjectionConfig added in v1.28.0

func WithFaultInjectionConfig(cfg *config.FaultInjection) TestClusterOption

func WithFxOptionsForService

func WithFxOptionsForService(serviceName primitives.ServiceName, options ...fx.Option) TestClusterOption

WithFxOptionsForService returns an Option which, when passed as an argument to setupSuite, will append the given list of fx options to the end of the arguments to the fx.New call for the given service. For example, if you want to obtain the shard controller for the history service, you can do this:

var shardController shard.Controller
s.setupSuite(t, tests.WithFxOptionsForService(primitives.HistoryService, fx.Populate(&shardController)))
// now you can use shardController during your test

This is similar to the pattern of plumbing dependencies through the TestClusterConfig, but it's much more convenient, scalable and flexible. The reason we need to do this on a per-service basis is that there are separate fx apps for each one.

func WithMTLS added in v1.28.0

func WithMTLS() TestClusterOption

func WithNumHistoryShards added in v1.28.0

func WithNumHistoryShards(n int32) TestClusterOption

type TestClusterParams

type TestClusterParams struct {
	ServiceOptions         map[primitives.ServiceName][]fx.Option
	DynamicConfigOverrides map[dynamicconfig.Key]any
	ArchivalEnabled        bool
	EnableMTLS             bool
	FaultInjectionConfig   *config.FaultInjection
	NumHistoryShards       int32
}

TestClusterParams contains the variables which are used to configure test cluster via the TestClusterOption type.

func ApplyTestClusterOptions added in v1.27.0

func ApplyTestClusterOptions(options []TestClusterOption) TestClusterParams

type TestDataConverter

type TestDataConverter struct {
	NumOfCallToPayloads   int // for testing to know testDataConverter is called as expected
	NumOfCallFromPayloads int
}

TestDataConverter implements converter.DataConverter using gob

func (*TestDataConverter) FromPayload

func (tdc *TestDataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error

func (*TestDataConverter) FromPayloads

func (tdc *TestDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error

func (*TestDataConverter) ToPayload

func (tdc *TestDataConverter) ToPayload(value interface{}) (*commonpb.Payload, error)

func (*TestDataConverter) ToPayloads

func (tdc *TestDataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error)

func (*TestDataConverter) ToString

func (tdc *TestDataConverter) ToString(payload *commonpb.Payload) string

func (*TestDataConverter) ToStrings

func (tdc *TestDataConverter) ToStrings(payloads *commonpb.Payloads) []string

type WorkerConfig

type WorkerConfig struct {
	NumWorkers    int
	DisableWorker bool // overrides NumWorkers
}

WorkerConfig is the config for the worker service

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL