Documentation
¶
Index ¶
- func AddSuffix(s Suite, str string) string
- func AttachSchema(s Suite, table string) string
- func CompareTableSchemas(x *protos.TableSchema, y *protos.TableSchema) bool
- func ConsumeAllMessages(ctx context.Context, namespaceName string, eventhubName string, ...) ([]string, error)
- func CreatePeer(t *testing.T, peer *protos.Peer)
- func CreateQRepWorkflowConfig(t *testing.T, flowJobName string, sourceTable string, dstTable string, ...) *protos.QRepConfig
- func CreateTableForQRep(ctx context.Context, conn *pgx.Conn, suffix string, tableName string) error
- func DeleteEventhub(ctx context.Context, eventhubName string, ...) error
- func EnvEqualRecordBatches(t *testing.T, env WorkflowRun, q *model.QRecordBatch, ...)
- func EnvEqualTables[TSource connectors.Connector](env WorkflowRun, suite RowSource, table string, cols string)
- func EnvEqualTablesWithNames(env WorkflowRun, suite RowSource, srcTable string, dstTable string, ...)
- func EnvGetRunID(t *testing.T, env WorkflowRun) string
- func EnvGetWorkflowState(t *testing.T, env WorkflowRun) cdc_state.CDCFlowWorkflowState
- func EnvNoError(t *testing.T, env WorkflowRun, err error)
- func EnvTrue(t *testing.T, env WorkflowRun, val bool)
- func EnvWaitFor(t *testing.T, env WorkflowRun, timeout time.Duration, reason string, ...)
- func EnvWaitForCount(env WorkflowRun, suite RowSource, reason string, dstTable string, cols string, ...)
- func EnvWaitForEqualTables(env WorkflowRun, suite RowSource, reason string, table string, cols string)
- func EnvWaitForEqualTablesWithNames(env WorkflowRun, suite RowSource, reason string, srcTable string, ...)
- func EnvWaitForEqualTablesWithNames_Only(env WorkflowRun, suite RowSource, reason string, srcTable string, ...)
- func EnvWaitForFinished(t *testing.T, env WorkflowRun, timeout time.Duration)
- func ExpectedDestinationIdentifier(s GenericSuite, ident string) string
- func ExpectedDestinationTableName(s GenericSuite, table string) string
- func GeneratePostgresPeer(t *testing.T) *protos.Peer
- func GetOwnersSchema() *types.QRecordSchema
- func GetOwnersSelectorStringsSF() [2]string
- func GetPostgresToxicProxy(t *testing.T, suffix string, port uint32) (*tp.Proxy, error)
- func GetTestDatabase(suffix string) string
- func InitToxiproxy() error
- func NewApiClient() (protos.FlowServiceClient, error)
- func NewTemporalClient(t *testing.T) client.Client
- func PopulateSourceTable(ctx context.Context, conn *pgx.Conn, suffix string, tableName string, ...) error
- func RequireEmptyDestinationTable(suite RowSource, dstTable string, cols string)
- func RequireEnvCanceled(t *testing.T, env WorkflowRun)
- func RequireEqualRecordBatches(t *testing.T, q *model.QRecordBatch, other *model.QRecordBatch)
- func RequireEqualTableSchemas(t *testing.T, expected *protos.TableSchema, actual *protos.TableSchema) bool
- func RequireEqualTables(suite RowSource, table string, cols string)
- func RequireEqualTablesWithNames(suite RowSource, srcTable string, dstTable string, cols string)
- func RevokePermissionForTableColumns(ctx context.Context, conn *pgx.Conn, tableIdentifier string, ...) error
- func Schema(s Suite) string
- func SetupCDCFlowStatusQuery(t *testing.T, env WorkflowRun, config *protos.FlowConnectionConfigs)
- func SetupClickHouseSuite[TSource SuiteSource](t *testing.T, cluster bool, ...) func(*testing.T) ClickHouseSuite
- func SetupElasticSuite(t *testing.T) elasticsearchSuite
- func SignalWorkflow[T any](ctx context.Context, env WorkflowRun, signal model.TypedSignal[T], value T)
- func TableMappings(s GenericSuite, tables ...string) []*protos.TableMapping
- func TearDownPostgres(ctx context.Context, s Suite)
- type BigQueryTestHelper
- func (b *BigQueryTestHelper) CheckNull(ctx context.Context, tableName string, colName []string) (bool, error)
- func (b *BigQueryTestHelper) CountObjectsInGCSPath(ctx context.Context, gcsPath string) (int, error)
- func (b *BigQueryTestHelper) DropDataset(ctx context.Context, datasetName string) error
- func (b *BigQueryTestHelper) ExecuteAndProcessQuery(ctx context.Context, query string) (*model.QRecordBatch, error)
- func (b *BigQueryTestHelper) RecreateDataset(ctx context.Context) error
- func (b *BigQueryTestHelper) RunInt64Query(ctx context.Context, query string) (int64, error)
- func (b *BigQueryTestHelper) SelectRow(ctx context.Context, tableName string, cols ...string) ([]bigquery.Value, error)
- type ClickHouseMVManager
- type ClickHouseSuite
- func (s ClickHouseSuite) Conn() *pgx.Conn
- func (s ClickHouseSuite) Connector() *connpostgres.PostgresConnector
- func (s ClickHouseSuite) CreateRMTTable(tableName string, columns []TestClickHouseColumn, orderingKey string) error
- func (s ClickHouseSuite) DestinationConnector() connectors.Connector
- func (s ClickHouseSuite) DestinationTable(table string) string
- func (s ClickHouseSuite) DropTable(tableName string) error
- func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch, error)
- func (s ClickHouseSuite) NewMVManager(tableName string, suffix string) *ClickHouseMVManager
- func (s ClickHouseSuite) Peer() *protos.Peer
- func (s ClickHouseSuite) PeerForDatabase(dbname string) *protos.Peer
- func (s ClickHouseSuite) Source() SuiteSource
- func (s ClickHouseSuite) Suffix() string
- func (s ClickHouseSuite) T() *testing.T
- func (s ClickHouseSuite) Teardown(ctx context.Context)
- type FlowConnectionGenerationConfig
- type GenericSuite
- type MongoSource
- func (s *MongoSource) AdminClient() *mongo.Client
- func (s *MongoSource) Connector() connectors.Connector
- func (s *MongoSource) Exec(ctx context.Context, sql string) error
- func (s *MongoSource) GeneratePeer(t *testing.T) *protos.Peer
- func (s *MongoSource) GetRows(ctx context.Context, suffix, table, cols string) (*model.QRecordBatch, error)
- func (s *MongoSource) Teardown(t *testing.T, ctx context.Context, suffix string)
- type MySqlSource
- func (s *MySqlSource) Connector() connectors.Connector
- func (s *MySqlSource) Exec(ctx context.Context, sql string) error
- func (s *MySqlSource) GeneratePeer(t *testing.T) *protos.Peer
- func (s *MySqlSource) GetRows(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error)
- func (s *MySqlSource) Teardown(t *testing.T, ctx context.Context, suffix string)
- type PeerFlowE2ETestSuiteBQ
- func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn
- func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector
- func (s PeerFlowE2ETestSuiteBQ) DestinationConnector() connectors.Connector
- func (s PeerFlowE2ETestSuiteBQ) DestinationTable(table string) string
- func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error)
- func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName string, colsString string, where string) (*model.QRecordBatch, error)
- func (s PeerFlowE2ETestSuiteBQ) Peer() *protos.Peer
- func (s PeerFlowE2ETestSuiteBQ) Source() SuiteSource
- func (s PeerFlowE2ETestSuiteBQ) Suffix() string
- func (s PeerFlowE2ETestSuiteBQ) T() *testing.T
- func (s PeerFlowE2ETestSuiteBQ) Teardown(ctx context.Context)
- type PeerFlowE2ETestSuitePG
- func (s PeerFlowE2ETestSuitePG) Conn() *pgx.Conn
- func (s PeerFlowE2ETestSuitePG) Connector() *connpostgres.PostgresConnector
- func (s PeerFlowE2ETestSuitePG) DestinationConnector() connectors.Connector
- func (s PeerFlowE2ETestSuitePG) DestinationTable(table string) string
- func (s PeerFlowE2ETestSuitePG) Exec(ctx context.Context, sql string) error
- func (s PeerFlowE2ETestSuitePG) GetRows(table string, cols string) (*model.QRecordBatch, error)
- func (s PeerFlowE2ETestSuitePG) Peer() *protos.Peer
- func (s PeerFlowE2ETestSuitePG) Source() SuiteSource
- func (s PeerFlowE2ETestSuitePG) Suffix() string
- func (s PeerFlowE2ETestSuitePG) T() *testing.T
- func (s PeerFlowE2ETestSuitePG) Teardown(ctx context.Context)
- type PeerFlowE2ETestSuiteSF
- func (s PeerFlowE2ETestSuiteSF) Conn() *pgx.Conn
- func (s PeerFlowE2ETestSuiteSF) Connector() *connpostgres.PostgresConnector
- func (s PeerFlowE2ETestSuiteSF) DestinationConnector() connectors.Connector
- func (s PeerFlowE2ETestSuiteSF) DestinationTable(table string) string
- func (s PeerFlowE2ETestSuiteSF) GetRows(tableName string, sfSelector string) (*model.QRecordBatch, error)
- func (s PeerFlowE2ETestSuiteSF) Peer() *protos.Peer
- func (s PeerFlowE2ETestSuiteSF) Source() SuiteSource
- func (s PeerFlowE2ETestSuiteSF) Suffix() string
- func (s PeerFlowE2ETestSuiteSF) T() *testing.T
- func (s PeerFlowE2ETestSuiteSF) Teardown(ctx context.Context)
- type PostgresSource
- func (s *PostgresSource) Connector() connectors.Connector
- func (s *PostgresSource) Exec(ctx context.Context, sql string) error
- func (s *PostgresSource) GeneratePeer(t *testing.T) *protos.Peer
- func (s *PostgresSource) GetLogCount(ctx context.Context, flowJobName, errorType, pattern string) (int, error)
- func (s *PostgresSource) GetRows(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error)
- func (s *PostgresSource) GetRowsOnly(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error)
- func (s *PostgresSource) Query(ctx context.Context, query string) (*model.QRecordBatch, error)
- func (s *PostgresSource) Teardown(t *testing.T, ctx context.Context, suffix string)
- type RowSource
- type S3Environment
- type S3PeerCredentials
- type S3TestHelper
- type SnowflakeTestHelper
- func (s *SnowflakeTestHelper) CheckNull(ctx context.Context, tableName string, colNames []string) (bool, error)
- func (s *SnowflakeTestHelper) Cleanup(ctx context.Context) error
- func (s *SnowflakeTestHelper) CountNonNullRows(ctx context.Context, tableName string, columnName string) (int64, error)
- func (s *SnowflakeTestHelper) CountRows(ctx context.Context, tableName string) (int64, error)
- func (s *SnowflakeTestHelper) CountSRIDs(ctx context.Context, tableName string, columnName string) (int64, error)
- func (s *SnowflakeTestHelper) ExecuteAndProcessQuery(ctx context.Context, query string) (*model.QRecordBatch, error)
- func (s *SnowflakeTestHelper) RunCommand(ctx context.Context, command string) error
- func (s *SnowflakeTestHelper) RunIntQuery(ctx context.Context, query string) (int, error)
- type Suite
- type SuiteSource
- type TestClickHouseColumn
- type WorkflowRun
- func ExecuteDropFlow(ctx context.Context, tc client.Client, config *protos.FlowConnectionConfigs, ...) WorkflowRun
- func ExecutePeerflow(t *testing.T, tc client.Client, config *protos.FlowConnectionConfigs) WorkflowRun
- func ExecuteWorkflow(ctx context.Context, tc client.Client, taskQueueID shared.TaskQueueID, wf any, ...) WorkflowRun
- func GetPeerflow(ctx context.Context, catalog *pgx.Conn, tc client.Client, flowName string) (WorkflowRun, error)
- func RunQRepFlowWorkflow(t *testing.T, tc client.Client, config *protos.QRepConfig) WorkflowRun
- func (env WorkflowRun) Cancel(ctx context.Context)
- func (env WorkflowRun) Error(ctx context.Context) error
- func (env WorkflowRun) Finished(ctx context.Context) bool
- func (env WorkflowRun) GetFlowStatus(t *testing.T) protos.FlowStatus
- func (env WorkflowRun) Query(ctx context.Context, queryType string, args ...any) (converter.EncodedValue, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AttachSchema ¶
func CompareTableSchemas ¶
func CompareTableSchemas(x *protos.TableSchema, y *protos.TableSchema) bool
func ConsumeAllMessages ¶
func ConsumeAllMessages( ctx context.Context, namespaceName string, eventhubName string, expectedNum int, config *protos.EventHubGroupConfig, ) ([]string, error)
ConsumeAllMessages consumes all messages from the eventhub with the given name and returns them as a list of strings.
func CreateTableForQRep ¶
func DeleteEventhub ¶
func EnvEqualRecordBatches ¶
func EnvEqualRecordBatches(t *testing.T, env WorkflowRun, q *model.QRecordBatch, other *model.QRecordBatch)
func EnvEqualTables ¶
func EnvEqualTables[TSource connectors.Connector](env WorkflowRun, suite RowSource, table string, cols string)
func EnvEqualTablesWithNames ¶
func EnvEqualTablesWithNames( env WorkflowRun, suite RowSource, srcTable string, dstTable string, cols string, )
func EnvGetRunID ¶
func EnvGetRunID(t *testing.T, env WorkflowRun) string
func EnvGetWorkflowState ¶
func EnvGetWorkflowState(t *testing.T, env WorkflowRun) cdc_state.CDCFlowWorkflowState
func EnvNoError ¶
func EnvNoError(t *testing.T, env WorkflowRun, err error)
Helper function to assert errors in goroutines running concurrently with workflows. This achieves two goals: 1. cancel the workflow to avoid waiting on a goroutine which has failed; 2. get around t.FailNow being incorrect when called from a non-initial goroutine.
func EnvWaitFor ¶
func EnvWaitForCount ¶
func EnvWaitForEqualTables ¶
func EnvWaitForEqualTables( env WorkflowRun, suite RowSource, reason string, table string, cols string, )
func EnvWaitForFinished ¶
func EnvWaitForFinished(t *testing.T, env WorkflowRun, timeout time.Duration)
func ExpectedDestinationIdentifier ¶
func ExpectedDestinationIdentifier(s GenericSuite, ident string) string
func ExpectedDestinationTableName ¶
func ExpectedDestinationTableName(s GenericSuite, table string) string
func GetOwnersSchema ¶
func GetOwnersSchema() *types.QRecordSchema
func GetOwnersSelectorStringsSF ¶
func GetOwnersSelectorStringsSF() [2]string
func GetPostgresToxicProxy ¶
GetPostgresToxicProxy gets or creates the PostgreSQL proxy
func GetTestDatabase ¶
func InitToxiproxy ¶
func InitToxiproxy() error
InitToxiproxy initializes the Toxiproxy client (singleton pattern)
func NewApiClient ¶
func NewApiClient() (protos.FlowServiceClient, error)
func PopulateSourceTable ¶
func RequireEnvCanceled ¶
func RequireEnvCanceled(t *testing.T, env WorkflowRun)
func RequireEqualRecordBatches ¶
func RequireEqualRecordBatches(t *testing.T, q *model.QRecordBatch, other *model.QRecordBatch)
func RequireEqualTableSchemas ¶
func RequireEqualTableSchemas(t *testing.T, expected *protos.TableSchema, actual *protos.TableSchema) bool
func RequireEqualTables ¶
func SetupCDCFlowStatusQuery ¶
func SetupCDCFlowStatusQuery(t *testing.T, env WorkflowRun, config *protos.FlowConnectionConfigs)
func SetupClickHouseSuite ¶
func SetupElasticSuite ¶
func SignalWorkflow ¶
func SignalWorkflow[T any](ctx context.Context, env WorkflowRun, signal model.TypedSignal[T], value T)
func TableMappings ¶
func TableMappings(s GenericSuite, tables ...string) []*protos.TableMapping
func TearDownPostgres ¶
Types ¶
type BigQueryTestHelper ¶
type BigQueryTestHelper struct {
Config *protos.BigqueryConfig
// contains filtered or unexported fields
}
func NewBigQueryTestHelper ¶
func NewBigQueryTestHelper(t *testing.T, datasetID string) (*BigQueryTestHelper, error)
NewBigQueryTestHelper creates a new BigQueryTestHelper.
func (*BigQueryTestHelper) CheckNull ¶
func (b *BigQueryTestHelper) CheckNull(ctx context.Context, tableName string, colName []string) (bool, error)
CheckNull reports whether there are no nulls in the given columns; a non-nil error is returned if the check itself fails.
func (*BigQueryTestHelper) CountObjectsInGCSPath ¶
func (b *BigQueryTestHelper) CountObjectsInGCSPath(ctx context.Context, gcsPath string) (int, error)
CountObjectsInGCSPath counts the number of objects in a GCS path (gs://bucket/path/prefix)
func (*BigQueryTestHelper) DropDataset ¶
func (b *BigQueryTestHelper) DropDataset(ctx context.Context, datasetName string) error
DropDataset drops the dataset.
func (*BigQueryTestHelper) ExecuteAndProcessQuery ¶
func (b *BigQueryTestHelper) ExecuteAndProcessQuery(ctx context.Context, query string) (*model.QRecordBatch, error)
func (*BigQueryTestHelper) RecreateDataset ¶
func (b *BigQueryTestHelper) RecreateDataset(ctx context.Context) error
RecreateDataset recreates the dataset, i.e., deletes it if it exists and creates it again.
func (*BigQueryTestHelper) RunInt64Query ¶
type ClickHouseMVManager ¶
type ClickHouseMVManager struct {
// contains filtered or unexported fields
}
ClickHouseMVManager manages materialized views for ClickHouse testing
func (*ClickHouseMVManager) CreateBadMV ¶
func (m *ClickHouseMVManager) CreateBadMV(ctx context.Context) error
CreateBadMV creates a materialized view on the configured table that will fail when data is inserted. Source schema is assumed to correspond to (id UInt64, val String).
type ClickHouseSuite ¶
type ClickHouseSuite struct {
// contains filtered or unexported fields
}
func (ClickHouseSuite) Conn ¶
func (s ClickHouseSuite) Conn() *pgx.Conn
func (ClickHouseSuite) Connector ¶
func (s ClickHouseSuite) Connector() *connpostgres.PostgresConnector
func (ClickHouseSuite) CreateRMTTable ¶
func (s ClickHouseSuite) CreateRMTTable(tableName string, columns []TestClickHouseColumn, orderingKey string) error
CreateRMTTable creates a ReplacingMergeTree table with the given name and columns.
func (ClickHouseSuite) DestinationConnector ¶
func (s ClickHouseSuite) DestinationConnector() connectors.Connector
func (ClickHouseSuite) DestinationTable ¶
func (s ClickHouseSuite) DestinationTable(table string) string
func (ClickHouseSuite) DropTable ¶
func (s ClickHouseSuite) DropTable(tableName string) error
func (ClickHouseSuite) GetRows ¶
func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch, error)
func (ClickHouseSuite) NewMVManager ¶
func (s ClickHouseSuite) NewMVManager(tableName string, suffix string) *ClickHouseMVManager
func (ClickHouseSuite) Peer ¶
func (s ClickHouseSuite) Peer() *protos.Peer
func (ClickHouseSuite) PeerForDatabase ¶
func (s ClickHouseSuite) PeerForDatabase(dbname string) *protos.Peer
func (ClickHouseSuite) Source ¶
func (s ClickHouseSuite) Source() SuiteSource
func (ClickHouseSuite) Suffix ¶
func (s ClickHouseSuite) Suffix() string
func (ClickHouseSuite) T ¶
func (s ClickHouseSuite) T() *testing.T
func (ClickHouseSuite) Teardown ¶
func (s ClickHouseSuite) Teardown(ctx context.Context)
type FlowConnectionGenerationConfig ¶
type FlowConnectionGenerationConfig struct {
FlowJobName string
TableNameMapping map[string]string
Destination string
TableMappings []*protos.TableMapping
SoftDelete bool
}
func (*FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs ¶
func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs(s Suite) *protos.FlowConnectionConfigs
type GenericSuite ¶
type MongoSource ¶
type MongoSource struct {
// contains filtered or unexported fields
}
func SetupMongo ¶
func SetupMongo(t *testing.T, suffix string) (*MongoSource, error)
func (*MongoSource) AdminClient ¶
func (s *MongoSource) AdminClient() *mongo.Client
func (*MongoSource) Connector ¶
func (s *MongoSource) Connector() connectors.Connector
func (*MongoSource) GeneratePeer ¶
func (s *MongoSource) GeneratePeer(t *testing.T) *protos.Peer
func (*MongoSource) GetRows ¶
func (s *MongoSource) GetRows(ctx context.Context, suffix, table, cols string) (*model.QRecordBatch, error)
type MySqlSource ¶
type MySqlSource struct {
*connmysql.MySqlConnector
Config *protos.MySqlConfig
}
func SetupMyCore ¶
func SetupMyCore(t *testing.T, suffix string, replication protos.MySqlReplicationMechanism, flavor protos.MySqlFlavor) (*MySqlSource, error)
func SetupMySQL ¶
func SetupMySQL(t *testing.T, suffix string) (*MySqlSource, error)
func (*MySqlSource) Connector ¶
func (s *MySqlSource) Connector() connectors.Connector
func (*MySqlSource) GeneratePeer ¶
func (s *MySqlSource) GeneratePeer(t *testing.T) *protos.Peer
type PeerFlowE2ETestSuiteBQ ¶
type PeerFlowE2ETestSuiteBQ struct {
// contains filtered or unexported fields
}
func SetupBigquerySuite ¶
func SetupBigquerySuite(t *testing.T) PeerFlowE2ETestSuiteBQ
func (PeerFlowE2ETestSuiteBQ) Conn ¶
func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn
func (PeerFlowE2ETestSuiteBQ) Connector ¶
func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector
func (PeerFlowE2ETestSuiteBQ) DestinationConnector ¶
func (s PeerFlowE2ETestSuiteBQ) DestinationConnector() connectors.Connector
func (PeerFlowE2ETestSuiteBQ) DestinationTable ¶
func (s PeerFlowE2ETestSuiteBQ) DestinationTable(table string) string
func (PeerFlowE2ETestSuiteBQ) GetRows ¶
func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error)
func (PeerFlowE2ETestSuiteBQ) GetRowsWhere ¶
func (s PeerFlowE2ETestSuiteBQ) GetRowsWhere(tableName string, colsString string, where string) (*model.QRecordBatch, error)
func (PeerFlowE2ETestSuiteBQ) Peer ¶
func (s PeerFlowE2ETestSuiteBQ) Peer() *protos.Peer
func (PeerFlowE2ETestSuiteBQ) Source ¶
func (s PeerFlowE2ETestSuiteBQ) Source() SuiteSource
func (PeerFlowE2ETestSuiteBQ) Suffix ¶
func (s PeerFlowE2ETestSuiteBQ) Suffix() string
func (PeerFlowE2ETestSuiteBQ) T ¶
func (s PeerFlowE2ETestSuiteBQ) T() *testing.T
func (PeerFlowE2ETestSuiteBQ) Teardown ¶
func (s PeerFlowE2ETestSuiteBQ) Teardown(ctx context.Context)
type PeerFlowE2ETestSuitePG ¶
type PeerFlowE2ETestSuitePG struct {
// contains filtered or unexported fields
}
func SetupPostgresSuite ¶
func SetupPostgresSuite(t *testing.T) PeerFlowE2ETestSuitePG
func (PeerFlowE2ETestSuitePG) Conn ¶
func (s PeerFlowE2ETestSuitePG) Conn() *pgx.Conn
func (PeerFlowE2ETestSuitePG) Connector ¶
func (s PeerFlowE2ETestSuitePG) Connector() *connpostgres.PostgresConnector
func (PeerFlowE2ETestSuitePG) DestinationConnector ¶
func (s PeerFlowE2ETestSuitePG) DestinationConnector() connectors.Connector
func (PeerFlowE2ETestSuitePG) DestinationTable ¶
func (s PeerFlowE2ETestSuitePG) DestinationTable(table string) string
func (PeerFlowE2ETestSuitePG) Exec ¶
func (s PeerFlowE2ETestSuitePG) Exec(ctx context.Context, sql string) error
func (PeerFlowE2ETestSuitePG) GetRows ¶
func (s PeerFlowE2ETestSuitePG) GetRows(table string, cols string) (*model.QRecordBatch, error)
func (PeerFlowE2ETestSuitePG) Peer ¶
func (s PeerFlowE2ETestSuitePG) Peer() *protos.Peer
func (PeerFlowE2ETestSuitePG) Source ¶
func (s PeerFlowE2ETestSuitePG) Source() SuiteSource
func (PeerFlowE2ETestSuitePG) Suffix ¶
func (s PeerFlowE2ETestSuitePG) Suffix() string
func (PeerFlowE2ETestSuitePG) T ¶
func (s PeerFlowE2ETestSuitePG) T() *testing.T
func (PeerFlowE2ETestSuitePG) Teardown ¶
func (s PeerFlowE2ETestSuitePG) Teardown(ctx context.Context)
type PeerFlowE2ETestSuiteSF ¶
type PeerFlowE2ETestSuiteSF struct {
// contains filtered or unexported fields
}
func SetupSnowflakeSuite ¶
func SetupSnowflakeSuite(t *testing.T) PeerFlowE2ETestSuiteSF
func (PeerFlowE2ETestSuiteSF) Conn ¶
func (s PeerFlowE2ETestSuiteSF) Conn() *pgx.Conn
func (PeerFlowE2ETestSuiteSF) Connector ¶
func (s PeerFlowE2ETestSuiteSF) Connector() *connpostgres.PostgresConnector
func (PeerFlowE2ETestSuiteSF) DestinationConnector ¶
func (s PeerFlowE2ETestSuiteSF) DestinationConnector() connectors.Connector
func (PeerFlowE2ETestSuiteSF) DestinationTable ¶
func (s PeerFlowE2ETestSuiteSF) DestinationTable(table string) string
func (PeerFlowE2ETestSuiteSF) GetRows ¶
func (s PeerFlowE2ETestSuiteSF) GetRows(tableName string, sfSelector string) (*model.QRecordBatch, error)
func (PeerFlowE2ETestSuiteSF) Peer ¶
func (s PeerFlowE2ETestSuiteSF) Peer() *protos.Peer
func (PeerFlowE2ETestSuiteSF) Source ¶
func (s PeerFlowE2ETestSuiteSF) Source() SuiteSource
func (PeerFlowE2ETestSuiteSF) Suffix ¶
func (s PeerFlowE2ETestSuiteSF) Suffix() string
func (PeerFlowE2ETestSuiteSF) T ¶
func (s PeerFlowE2ETestSuiteSF) T() *testing.T
func (PeerFlowE2ETestSuiteSF) Teardown ¶
func (s PeerFlowE2ETestSuiteSF) Teardown(ctx context.Context)
type PostgresSource ¶
type PostgresSource struct {
*connpostgres.PostgresConnector
}
func SetupPostgres ¶
func SetupPostgres(t *testing.T, suffix string) (*PostgresSource, error)
func SetupPostgresWithToxiproxy ¶
func SetupPostgresWithToxiproxy(t *testing.T, suffix string, port uint32) (*PostgresSource, *tp.Proxy, error)
SetupPostgresWithToxiproxy creates a PostgreSQL source that connects through Toxiproxy
func (*PostgresSource) Connector ¶
func (s *PostgresSource) Connector() connectors.Connector
func (*PostgresSource) GeneratePeer ¶
func (s *PostgresSource) GeneratePeer(t *testing.T) *protos.Peer
func (*PostgresSource) GetLogCount ¶
func (*PostgresSource) GetRows ¶
func (s *PostgresSource) GetRows(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error)
func (*PostgresSource) GetRowsOnly ¶
func (s *PostgresSource) GetRowsOnly(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error)
GetRowsOnly avoids fetching rows from "child" tables, as in Postgres table inheritance.
func (*PostgresSource) Query ¶
func (s *PostgresSource) Query(ctx context.Context, query string) (*model.QRecordBatch, error)
type RowSource ¶
type RowSource interface {
Suite
GetRows(table, cols string) (*model.QRecordBatch, error)
}
type S3PeerCredentials ¶
type S3TestHelper ¶
type S3TestHelper struct {
S3Config *protos.S3Config
BucketName string
// contains filtered or unexported fields
}
func NewS3TestHelper ¶
func NewS3TestHelper(ctx context.Context, s3environment S3Environment) (*S3TestHelper, error)
func (*S3TestHelper) CleanUp ¶
func (h *S3TestHelper) CleanUp(ctx context.Context) error
CleanUp deletes all objects generated during the test.
func (*S3TestHelper) ListAllFiles ¶
func (h *S3TestHelper) ListAllFiles( ctx context.Context, jobName string, ) ([]s3types.Object, error)
ListAllFiles lists all files from the S3 bucket and returns them as a list of S3 objects.
type SnowflakeTestHelper ¶
type SnowflakeTestHelper struct {
// config is the Snowflake config.
Config *protos.SnowflakeConfig
// contains filtered or unexported fields
}
func NewSnowflakeTestHelper ¶
func NewSnowflakeTestHelper(t *testing.T) (*SnowflakeTestHelper, error)
func (*SnowflakeTestHelper) Cleanup ¶
func (s *SnowflakeTestHelper) Cleanup(ctx context.Context) error
Cleanup drops the database.
func (*SnowflakeTestHelper) CountNonNullRows ¶
func (s *SnowflakeTestHelper) CountNonNullRows(ctx context.Context, tableName string, columnName string) (int64, error)
CountNonNullRows(tableName, columnName) returns the number of rows in the given table whose given column is non-null.
func (*SnowflakeTestHelper) CountRows ¶
CountRows(tableName) returns the number of rows in the given table.
func (*SnowflakeTestHelper) CountSRIDs ¶
func (*SnowflakeTestHelper) ExecuteAndProcessQuery ¶
func (s *SnowflakeTestHelper) ExecuteAndProcessQuery(ctx context.Context, query string) (*model.QRecordBatch, error)
func (*SnowflakeTestHelper) RunCommand ¶
func (s *SnowflakeTestHelper) RunCommand(ctx context.Context, command string) error
RunCommand runs the given command.
func (*SnowflakeTestHelper) RunIntQuery ¶
RunIntQuery runs a query that returns an int result.
type Suite ¶
type Suite interface {
e2eshared.Suite
T() *testing.T
Connector() *connpostgres.PostgresConnector
Suffix() string
Source() SuiteSource
}
type SuiteSource ¶
type TestClickHouseColumn ¶
type WorkflowRun ¶
type WorkflowRun struct {
client.WorkflowRun
// contains filtered or unexported fields
}
func ExecuteDropFlow ¶
func ExecuteDropFlow(ctx context.Context, tc client.Client, config *protos.FlowConnectionConfigs, tableMappingsVersion int64) WorkflowRun
func ExecutePeerflow ¶
func ExecutePeerflow(t *testing.T, tc client.Client, config *protos.FlowConnectionConfigs) WorkflowRun
func ExecuteWorkflow ¶
func ExecuteWorkflow(ctx context.Context, tc client.Client, taskQueueID shared.TaskQueueID, wf any, args ...any) WorkflowRun
func GetPeerflow ¶
func RunQRepFlowWorkflow ¶
func RunQRepFlowWorkflow(t *testing.T, tc client.Client, config *protos.QRepConfig) WorkflowRun
func (WorkflowRun) Cancel ¶
func (env WorkflowRun) Cancel(ctx context.Context)
func (WorkflowRun) GetFlowStatus ¶
func (env WorkflowRun) GetFlowStatus(t *testing.T) protos.FlowStatus
func (WorkflowRun) Query ¶
func (env WorkflowRun) Query(ctx context.Context, queryType string, args ...any) (converter.EncodedValue, error)