e2e

package
v0.0.0-...-30435ab Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2026 License: AGPL-3.0 Imports: 66 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddSuffix

func AddSuffix(s Suite, str string) string

func AttachSchema

func AttachSchema(s Suite, table string) string

func CompareTableSchemas

func CompareTableSchemas(x *protos.TableSchema, y *protos.TableSchema) bool

func ConsumeAllMessages

func ConsumeAllMessages(
	ctx context.Context,
	namespaceName string,
	eventhubName string,
	expectedNum int,
	config *protos.EventHubGroupConfig,
) ([]string, error)

consume all messages from the eventhub with the given name“. returns as a list of strings.

func CreatePeer

func CreatePeer(t *testing.T, peer *protos.Peer)

func CreateQRepWorkflowConfig

func CreateQRepWorkflowConfig(
	t *testing.T,
	flowJobName string,
	sourceTable string,
	dstTable string,
	query string,
	dest string,
	stagingPath string,
	setupDst bool,
	syncedAtCol string,
	isDeletedCol string,
) *protos.QRepConfig

func CreateTableForQRep

func CreateTableForQRep(ctx context.Context, conn *pgx.Conn, suffix string, tableName string) error

func DeleteEventhub

func DeleteEventhub(
	ctx context.Context,
	eventhubName string,
	eventhubConfig *protos.EventHubConfig,
) error

func EnvEqualRecordBatches

func EnvEqualRecordBatches(t *testing.T, env WorkflowRun, q *model.QRecordBatch, other *model.QRecordBatch)

func EnvEqualTables

func EnvEqualTables[TSource connectors.Connector](env WorkflowRun, suite RowSource, table string, cols string)

func EnvEqualTablesWithNames

func EnvEqualTablesWithNames(
	env WorkflowRun, suite RowSource, srcTable string, dstTable string, cols string,
)

func EnvGetRunID

func EnvGetRunID(t *testing.T, env WorkflowRun) string

func EnvGetWorkflowState

func EnvGetWorkflowState(t *testing.T, env WorkflowRun) cdc_state.CDCFlowWorkflowState

func EnvNoError

func EnvNoError(t *testing.T, env WorkflowRun, err error)

Helper function to assert errors in go routines running concurrent to workflows This achieves two goals: 1. cancel workflow to avoid waiting on goroutine which has failed 2. get around t.FailNow being incorrect when called from non initial goroutine

func EnvTrue

func EnvTrue(t *testing.T, env WorkflowRun, val bool)

func EnvWaitFor

func EnvWaitFor(t *testing.T, env WorkflowRun, timeout time.Duration, reason string, f func() bool)

func EnvWaitForCount

func EnvWaitForCount(
	env WorkflowRun,
	suite RowSource,
	reason string,
	dstTable string,
	cols string,
	expectedCount int,
)

func EnvWaitForEqualTables

func EnvWaitForEqualTables(
	env WorkflowRun,
	suite RowSource,
	reason string,
	table string,
	cols string,
)

func EnvWaitForEqualTablesWithNames

func EnvWaitForEqualTablesWithNames(
	env WorkflowRun,
	suite RowSource,
	reason string,
	srcTable string,
	dstTable string,
	cols string,
)

func EnvWaitForEqualTablesWithNames_Only

func EnvWaitForEqualTablesWithNames_Only(
	env WorkflowRun,
	suite RowSource,
	reason string,
	srcTable string,
	dstTable string,
	cols string,
)

func EnvWaitForFinished

func EnvWaitForFinished(t *testing.T, env WorkflowRun, timeout time.Duration)

func ExpectedDestinationIdentifier

func ExpectedDestinationIdentifier(s GenericSuite, ident string) string

func ExpectedDestinationTableName

func ExpectedDestinationTableName(s GenericSuite, table string) string

func GeneratePostgresPeer

func GeneratePostgresPeer(t *testing.T) *protos.Peer

func GetOwnersSchema

func GetOwnersSchema() *types.QRecordSchema

func GetOwnersSelectorStringsSF

func GetOwnersSelectorStringsSF() [2]string

func GetPostgresToxicProxy

func GetPostgresToxicProxy(t *testing.T, suffix string, port uint32) (*tp.Proxy, error)

GetPostgresToxicProxy gets or creates the PostgreSQL proxy

func GetTestDatabase

func GetTestDatabase(suffix string) string

func InitToxiproxy

func InitToxiproxy() error

InitToxiproxy initializes the Toxiproxy client (singleton pattern)

func NewApiClient

func NewApiClient() (protos.FlowServiceClient, error)

func NewTemporalClient

func NewTemporalClient(t *testing.T) client.Client

func PopulateSourceTable

func PopulateSourceTable(ctx context.Context, conn *pgx.Conn, suffix string, tableName string, rowCount int) error

func RequireEmptyDestinationTable

func RequireEmptyDestinationTable(suite RowSource, dstTable string, cols string)

func RequireEnvCanceled

func RequireEnvCanceled(t *testing.T, env WorkflowRun)

func RequireEqualRecordBatches

func RequireEqualRecordBatches(t *testing.T, q *model.QRecordBatch, other *model.QRecordBatch)

func RequireEqualTableSchemas

func RequireEqualTableSchemas(t *testing.T, expected *protos.TableSchema, actual *protos.TableSchema) bool

func RequireEqualTables

func RequireEqualTables(suite RowSource, table string, cols string)

func RequireEqualTablesWithNames

func RequireEqualTablesWithNames(suite RowSource, srcTable string, dstTable string, cols string)

func RevokePermissionForTableColumns

func RevokePermissionForTableColumns(ctx context.Context, conn *pgx.Conn, tableIdentifier string, selectedColumns []string) error

func Schema

func Schema(s Suite) string

func SetupCDCFlowStatusQuery

func SetupCDCFlowStatusQuery(t *testing.T, env WorkflowRun, config *protos.FlowConnectionConfigs)

func SetupClickHouseSuite

func SetupClickHouseSuite[TSource SuiteSource](
	t *testing.T,
	cluster bool,
	setupSource func(*testing.T) (TSource, string, error),
) func(*testing.T) ClickHouseSuite

func SetupElasticSuite

func SetupElasticSuite(t *testing.T) elasticsearchSuite

func SignalWorkflow

func SignalWorkflow[T any](ctx context.Context, env WorkflowRun, signal model.TypedSignal[T], value T)

func TableMappings

func TableMappings(s GenericSuite, tables ...string) []*protos.TableMapping

func TearDownPostgres

func TearDownPostgres(ctx context.Context, s Suite)

Types

type BigQueryTestHelper

type BigQueryTestHelper struct {
	Config *protos.BigqueryConfig
	// contains filtered or unexported fields
}

func NewBigQueryTestHelper

func NewBigQueryTestHelper(t *testing.T, datasetID string) (*BigQueryTestHelper, error)

NewBigQueryTestHelper creates a new BigQueryTestHelper.

func (*BigQueryTestHelper) CheckNull

func (b *BigQueryTestHelper) CheckNull(ctx context.Context, tableName string, colName []string) (bool, error)

returns whether the function errors or there are no nulls

func (*BigQueryTestHelper) CountObjectsInGCSPath

func (b *BigQueryTestHelper) CountObjectsInGCSPath(ctx context.Context, gcsPath string) (int, error)

CountObjectsInGCSPath counts the number of objects in a GCS path (gs://bucket/path/prefix)

func (*BigQueryTestHelper) DropDataset

func (b *BigQueryTestHelper) DropDataset(ctx context.Context, datasetName string) error

DropDataset drops the dataset.

func (*BigQueryTestHelper) ExecuteAndProcessQuery

func (b *BigQueryTestHelper) ExecuteAndProcessQuery(ctx context.Context, query string) (*model.QRecordBatch, error)

func (*BigQueryTestHelper) RecreateDataset

func (b *BigQueryTestHelper) RecreateDataset(ctx context.Context) error

RecreateDataset recreates the dataset, i.e, deletes it if exists and creates it again.

func (*BigQueryTestHelper) RunInt64Query

func (b *BigQueryTestHelper) RunInt64Query(ctx context.Context, query string) (int64, error)

func (*BigQueryTestHelper) SelectRow

func (b *BigQueryTestHelper) SelectRow(ctx context.Context, tableName string, cols ...string) ([]bigquery.Value, error)

check if NaN, Inf double values are null

type ClickHouseMVManager

type ClickHouseMVManager struct {
	// contains filtered or unexported fields
}

ClickHouseMVManager manages materialized views for ClickHouse testing

func (*ClickHouseMVManager) CreateBadMV

func (m *ClickHouseMVManager) CreateBadMV(ctx context.Context) error

CreateBadMV creates a materialized view on the configured table that will fail when data is inserted. Source schema is assumed to correspond to (id UInt64, val String).

func (*ClickHouseMVManager) DropBadMV

func (m *ClickHouseMVManager) DropBadMV(ctx context.Context) error

DropBadMV removes the materialized view and its target table

type ClickHouseSuite

type ClickHouseSuite struct {
	// contains filtered or unexported fields
}

func (ClickHouseSuite) Conn

func (s ClickHouseSuite) Conn() *pgx.Conn

func (ClickHouseSuite) Connector

func (ClickHouseSuite) CreateRMTTable

func (s ClickHouseSuite) CreateRMTTable(tableName string, columns []TestClickHouseColumn, orderingKey string) error

CreateRMTTable creates a ReplacingMergeTree table with the given name and columns.

func (ClickHouseSuite) DestinationConnector

func (s ClickHouseSuite) DestinationConnector() connectors.Connector

func (ClickHouseSuite) DestinationTable

func (s ClickHouseSuite) DestinationTable(table string) string

func (ClickHouseSuite) DropTable

func (s ClickHouseSuite) DropTable(tableName string) error

func (ClickHouseSuite) GetRows

func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch, error)

func (ClickHouseSuite) NewMVManager

func (s ClickHouseSuite) NewMVManager(tableName string, suffix string) *ClickHouseMVManager

func (ClickHouseSuite) Peer

func (s ClickHouseSuite) Peer() *protos.Peer

func (ClickHouseSuite) PeerForDatabase

func (s ClickHouseSuite) PeerForDatabase(dbname string) *protos.Peer

func (ClickHouseSuite) Source

func (s ClickHouseSuite) Source() SuiteSource

func (ClickHouseSuite) Suffix

func (s ClickHouseSuite) Suffix() string

func (ClickHouseSuite) T

func (s ClickHouseSuite) T() *testing.T

func (ClickHouseSuite) Teardown

func (s ClickHouseSuite) Teardown(ctx context.Context)

type FlowConnectionGenerationConfig

type FlowConnectionGenerationConfig struct {
	FlowJobName      string
	TableNameMapping map[string]string
	Destination      string
	TableMappings    []*protos.TableMapping
	SoftDelete       bool
}

func (*FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs

func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs(s Suite) *protos.FlowConnectionConfigs

type GenericSuite

type GenericSuite interface {
	RowSource
	Peer() *protos.Peer
	DestinationConnector() connectors.Connector
	DestinationTable(table string) string
}

type MongoSource

type MongoSource struct {
	// contains filtered or unexported fields
}

func SetupMongo

func SetupMongo(t *testing.T, suffix string) (*MongoSource, error)

func (*MongoSource) AdminClient

func (s *MongoSource) AdminClient() *mongo.Client

func (*MongoSource) Connector

func (s *MongoSource) Connector() connectors.Connector

func (*MongoSource) Exec

func (s *MongoSource) Exec(ctx context.Context, sql string) error

func (*MongoSource) GeneratePeer

func (s *MongoSource) GeneratePeer(t *testing.T) *protos.Peer

func (*MongoSource) GetRows

func (s *MongoSource) GetRows(ctx context.Context, suffix, table, cols string) (*model.QRecordBatch, error)

func (*MongoSource) Teardown

func (s *MongoSource) Teardown(t *testing.T, ctx context.Context, suffix string)

type MySqlSource

type MySqlSource struct {
	*connmysql.MySqlConnector
	Config *protos.MySqlConfig
}

func SetupMyCore

func SetupMyCore(t *testing.T, suffix string, replication protos.MySqlReplicationMechanism, flavor protos.MySqlFlavor) (*MySqlSource, error)

func SetupMySQL

func SetupMySQL(t *testing.T, suffix string) (*MySqlSource, error)

func (*MySqlSource) Connector

func (s *MySqlSource) Connector() connectors.Connector

func (*MySqlSource) Exec

func (s *MySqlSource) Exec(ctx context.Context, sql string) error

func (*MySqlSource) GeneratePeer

func (s *MySqlSource) GeneratePeer(t *testing.T) *protos.Peer

func (*MySqlSource) GetRows

func (s *MySqlSource) GetRows(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error)

func (*MySqlSource) Teardown

func (s *MySqlSource) Teardown(t *testing.T, ctx context.Context, suffix string)

type PeerFlowE2ETestSuiteBQ

type PeerFlowE2ETestSuiteBQ struct {
	// contains filtered or unexported fields
}

func SetupBigquerySuite

func SetupBigquerySuite(t *testing.T) PeerFlowE2ETestSuiteBQ

func (PeerFlowE2ETestSuiteBQ) Conn

func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn

func (PeerFlowE2ETestSuiteBQ) Connector

func (PeerFlowE2ETestSuiteBQ) DestinationConnector

func (s PeerFlowE2ETestSuiteBQ) DestinationConnector() connectors.Connector

func (PeerFlowE2ETestSuiteBQ) DestinationTable

func (s PeerFlowE2ETestSuiteBQ) DestinationTable(table string) string

func (PeerFlowE2ETestSuiteBQ) GetRows

func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error)

func (PeerFlowE2ETestSuiteBQ) GetRowsWhere

func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName string, colsString string, where string) (*model.QRecordBatch, error)

func (PeerFlowE2ETestSuiteBQ) Peer

func (PeerFlowE2ETestSuiteBQ) Source

func (PeerFlowE2ETestSuiteBQ) Suffix

func (s PeerFlowE2ETestSuiteBQ) Suffix() string

func (PeerFlowE2ETestSuiteBQ) T

func (PeerFlowE2ETestSuiteBQ) Teardown

func (s PeerFlowE2ETestSuiteBQ) Teardown(ctx context.Context)

type PeerFlowE2ETestSuitePG

type PeerFlowE2ETestSuitePG struct {
	// contains filtered or unexported fields
}

func SetupPostgresSuite

func SetupPostgresSuite(t *testing.T) PeerFlowE2ETestSuitePG

func (PeerFlowE2ETestSuitePG) Conn

func (s PeerFlowE2ETestSuitePG) Conn() *pgx.Conn

func (PeerFlowE2ETestSuitePG) Connector

func (PeerFlowE2ETestSuitePG) DestinationConnector

func (s PeerFlowE2ETestSuitePG) DestinationConnector() connectors.Connector

func (PeerFlowE2ETestSuitePG) DestinationTable

func (s PeerFlowE2ETestSuitePG) DestinationTable(table string) string

func (PeerFlowE2ETestSuitePG) Exec

func (PeerFlowE2ETestSuitePG) GetRows

func (s PeerFlowE2ETestSuitePG) GetRows(table string, cols string) (*model.QRecordBatch, error)

func (PeerFlowE2ETestSuitePG) Peer

func (PeerFlowE2ETestSuitePG) Source

func (PeerFlowE2ETestSuitePG) Suffix

func (s PeerFlowE2ETestSuitePG) Suffix() string

func (PeerFlowE2ETestSuitePG) T

func (PeerFlowE2ETestSuitePG) Teardown

func (s PeerFlowE2ETestSuitePG) Teardown(ctx context.Context)

type PeerFlowE2ETestSuiteSF

type PeerFlowE2ETestSuiteSF struct {
	// contains filtered or unexported fields
}

func SetupSnowflakeSuite

func SetupSnowflakeSuite(t *testing.T) PeerFlowE2ETestSuiteSF

func (PeerFlowE2ETestSuiteSF) Conn

func (s PeerFlowE2ETestSuiteSF) Conn() *pgx.Conn

func (PeerFlowE2ETestSuiteSF) Connector

func (PeerFlowE2ETestSuiteSF) DestinationConnector

func (s PeerFlowE2ETestSuiteSF) DestinationConnector() connectors.Connector

func (PeerFlowE2ETestSuiteSF) DestinationTable

func (s PeerFlowE2ETestSuiteSF) DestinationTable(table string) string

func (PeerFlowE2ETestSuiteSF) GetRows

func (s PeerFlowE2ETestSuiteSF) GetRows(tableName string, sfSelector string) (*model.QRecordBatch, error)

func (PeerFlowE2ETestSuiteSF) Peer

func (PeerFlowE2ETestSuiteSF) Source

func (PeerFlowE2ETestSuiteSF) Suffix

func (s PeerFlowE2ETestSuiteSF) Suffix() string

func (PeerFlowE2ETestSuiteSF) T

func (PeerFlowE2ETestSuiteSF) Teardown

func (s PeerFlowE2ETestSuiteSF) Teardown(ctx context.Context)

type PostgresSource

type PostgresSource struct {
	*connpostgres.PostgresConnector
}

func SetupPostgres

func SetupPostgres(t *testing.T, suffix string) (*PostgresSource, error)

func SetupPostgresWithToxiproxy

func SetupPostgresWithToxiproxy(t *testing.T, suffix string, port uint32) (*PostgresSource, *tp.Proxy, error)

SetupPostgresWithToxiproxy creates a PostgreSQL source that connects through Toxiproxy

func (*PostgresSource) Connector

func (s *PostgresSource) Connector() connectors.Connector

func (*PostgresSource) Exec

func (s *PostgresSource) Exec(ctx context.Context, sql string) error

func (*PostgresSource) GeneratePeer

func (s *PostgresSource) GeneratePeer(t *testing.T) *protos.Peer

func (*PostgresSource) GetLogCount

func (s *PostgresSource) GetLogCount(ctx context.Context, flowJobName, errorType, pattern string) (int, error)

func (*PostgresSource) GetRows

func (s *PostgresSource) GetRows(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error)

func (*PostgresSource) GetRowsOnly

func (s *PostgresSource) GetRowsOnly(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error)

to avoid fetching rows from "child" tables ala Postgres table inheritance

func (*PostgresSource) Query

func (s *PostgresSource) Query(ctx context.Context, query string) (*model.QRecordBatch, error)

func (*PostgresSource) Teardown

func (s *PostgresSource) Teardown(t *testing.T, ctx context.Context, suffix string)

type RowSource

type RowSource interface {
	Suite
	GetRows(table, cols string) (*model.QRecordBatch, error)
}

type S3Environment

type S3Environment int
const (
	Aws S3Environment = iota
	Gcs
	Minio
	MinioTls
)

type S3PeerCredentials

type S3PeerCredentials struct {
	AccessKeyID     string `json:"accessKeyId"`
	SecretAccessKey string `json:"secretAccessKey"`
	AwsRoleArn      string `json:"awsRoleArn"`
	SessionToken    string `json:"sessionToken"`
	Region          string `json:"region"`
	Endpoint        string `json:"endpoint"`
}

type S3TestHelper

type S3TestHelper struct {
	S3Config   *protos.S3Config
	BucketName string
	// contains filtered or unexported fields
}

func NewS3TestHelper

func NewS3TestHelper(ctx context.Context, s3environment S3Environment) (*S3TestHelper, error)

func (*S3TestHelper) CleanUp

func (h *S3TestHelper) CleanUp(ctx context.Context) error

Delete all generated objects during the test

func (*S3TestHelper) ListAllFiles

func (h *S3TestHelper) ListAllFiles(
	ctx context.Context,
	jobName string,
) ([]s3types.Object, error)

List all files from the S3 bucket. returns as a list of S3Objects.

type SnowflakeTestHelper

type SnowflakeTestHelper struct {
	// config is the Snowflake config.
	Config *protos.SnowflakeConfig
	// contains filtered or unexported fields
}

func NewSnowflakeTestHelper

func NewSnowflakeTestHelper(t *testing.T) (*SnowflakeTestHelper, error)

func (*SnowflakeTestHelper) CheckNull

func (s *SnowflakeTestHelper) CheckNull(ctx context.Context, tableName string, colNames []string) (bool, error)

func (*SnowflakeTestHelper) Cleanup

func (s *SnowflakeTestHelper) Cleanup(ctx context.Context) error

Cleanup drops the database.

func (*SnowflakeTestHelper) CountNonNullRows

func (s *SnowflakeTestHelper) CountNonNullRows(ctx context.Context, tableName string, columnName string) (int64, error)

CountRows(tableName) returns the non-null number of rows in the given table.

func (*SnowflakeTestHelper) CountRows

func (s *SnowflakeTestHelper) CountRows(ctx context.Context, tableName string) (int64, error)

CountRows(tableName) returns the number of rows in the given table.

func (*SnowflakeTestHelper) CountSRIDs

func (s *SnowflakeTestHelper) CountSRIDs(ctx context.Context, tableName string, columnName string) (int64, error)

func (*SnowflakeTestHelper) ExecuteAndProcessQuery

func (s *SnowflakeTestHelper) ExecuteAndProcessQuery(ctx context.Context, query string) (*model.QRecordBatch, error)

func (*SnowflakeTestHelper) RunCommand

func (s *SnowflakeTestHelper) RunCommand(ctx context.Context, command string) error

RunCommand runs the given command.

func (*SnowflakeTestHelper) RunIntQuery

func (s *SnowflakeTestHelper) RunIntQuery(ctx context.Context, query string) (int, error)

runs a query that returns an int result

type Suite

type Suite interface {
	e2eshared.Suite
	T() *testing.T
	Connector() *connpostgres.PostgresConnector
	Suffix() string
	Source() SuiteSource
}

type SuiteSource

type SuiteSource interface {
	Teardown(t *testing.T, ctx context.Context, suffix string)
	GeneratePeer(t *testing.T) *protos.Peer
	Connector() connectors.Connector
	Exec(ctx context.Context, sql string) error
	GetRows(ctx context.Context, suffix, table, cols string) (*model.QRecordBatch, error)
}

type TestClickHouseColumn

type TestClickHouseColumn struct {
	Name string
	Type string
}

type WorkflowRun

type WorkflowRun struct {
	client.WorkflowRun
	// contains filtered or unexported fields
}

func ExecuteDropFlow

func ExecuteDropFlow(ctx context.Context, tc client.Client, config *protos.FlowConnectionConfigs, tableMappingsVersion int64) WorkflowRun

func ExecutePeerflow

func ExecutePeerflow(t *testing.T, tc client.Client, config *protos.FlowConnectionConfigs) WorkflowRun

func ExecuteWorkflow

func ExecuteWorkflow(ctx context.Context, tc client.Client, taskQueueID shared.TaskQueueID, wf any, args ...any) WorkflowRun

func GetPeerflow

func GetPeerflow(ctx context.Context, catalog *pgx.Conn, tc client.Client, flowName string) (WorkflowRun, error)

func RunQRepFlowWorkflow

func RunQRepFlowWorkflow(t *testing.T, tc client.Client, config *protos.QRepConfig) WorkflowRun

func (WorkflowRun) Cancel

func (env WorkflowRun) Cancel(ctx context.Context)

func (WorkflowRun) Error

func (env WorkflowRun) Error(ctx context.Context) error

func (WorkflowRun) Finished

func (env WorkflowRun) Finished(ctx context.Context) bool

func (WorkflowRun) GetFlowStatus

func (env WorkflowRun) GetFlowStatus(t *testing.T) protos.FlowStatus

func (WorkflowRun) Query

func (env WorkflowRun) Query(ctx context.Context, queryType string, args ...any) (converter.EncodedValue, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL