utils

package
v0.0.0-...-493599c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 22, 2026 License: AGPL-3.0 Imports: 67 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AvroLocalStorage = iota
	AvroS3Storage
	AvroGCSStorage
)
View Source
const (
	ToxiproxyAPIPort = "18474"
	SSHServerPort    = "2222"
	ToxiproxyHost    = "localhost"
	SSHServerHost    = "openssh"
)
View Source
const FullTablePartitionID = "full-table-partition-id"
View Source
const MaxAWSSessionNameLength = 63 // Docs mention 64 as limit, but always good to stay under
View Source
const RDSAuthTokenTTL = 10 * time.Minute

RDSAuthTokenTTL is the cache TTL for RDS auth tokens. RDS Tokens Live for 15 minutes by default

View Source
const SSHKeepaliveInterval = 15 * time.Second

Variables

This section is empty.

Functions

func CreatePeerNoValidate

func CreatePeerNoValidate(
	ctx context.Context,
	pool shared.CatalogPool,
	peer *protos.Peer,
	allowUpdate bool,
) (*protos.CreatePeerResponse, error)

func CreateS3Client

func CreateS3Client(ctx context.Context, credsProvider AWSCredentialsProvider) (*s3.Client, error)

func CreateSSHProxy

func CreateSSHProxy(t *testing.T, client *toxiproxy.Client, name string, port int) *toxiproxy.Proxy

func DefaultOnRecord

func DefaultOnRecord(ls *lua.LState) int

func ExecuteTemplate

func ExecuteTemplate(tmpl string, params map[string]string) (string, error)

ExecuteTemplate executes a text template with the provided parameters and returns the resulting string.

func FileURLForS3Service

func FileURLForS3Service(endpoint string, region string, bucket string, filePath string) string

func FormatTableSize

func FormatTableSize(bytes int64) string

FormatTableSize converts bytes to human-readable format

func GetRDSToken

func GetRDSToken(ctx context.Context, connConfig RDSConnectionConfig, rdsAuth *RDSAuth, connectorName string) (string, error)

func GetSSHClientConfig

func GetSSHClientConfig(config *protos.SSHConfig) (*ssh.ClientConfig, error)

GetSSHClientConfig returns an *ssh.ClientConfig based on provided credentials.

func InitialiseTableRowsMap

func InitialiseTableRowsMap(tableMaps []*protos.TableMapping) map[string]*model.RecordTypeCounts

func IsLower

func IsLower(s string) bool

I think these only work with ASCII?

func IsUpper

func IsUpper(s string) bool

I think these only work with ASCII?

func KeysToString

func KeysToString(m map[string]struct{}) string

func LVAsReadOnlyBytes

func LVAsReadOnlyBytes(ls *lua.LState, v lua.LValue) ([]byte, error)

func LVAsStringOrNil

func LVAsStringOrNil(ls *lua.LState, v lua.LValue) (string, error)

func LoadScript

func LoadScript(ctx context.Context, script string, printfn lua.LGFunction) (*lua.LState, error)

func LuaPrintFn

func LuaPrintFn(fn func(string)) lua.LGFunction

func NewPeerDBOCFWriter

func NewPeerDBOCFWriter(
	stream *model.QRecordStream,
	avroSchema *model.QRecordAvroSchemaDefinition,
	avroCompressionCodec ocf.CodecName,
	targetDWH protos.DBType,
	sizeTracker *model.QRecordAvroChunkSizeTracker,
) *peerDBOCFWriter

func NewToxiproxyClient

func NewToxiproxyClient(t *testing.T) *toxiproxy.Client

func PutAndRemoveS3

func PutAndRemoveS3(ctx context.Context, client *s3.Client, bucket string, prefix string) error

Write an empty file and then delete it to check if we have write permissions

func QuoteLiteral

func QuoteLiteral(literal string) string

QuoteLiteral quotes a 'literal' (e.g. a parameter, often used to pass literal to DDL and other statements that do not accept parameters) to be used as part of an SQL statement. For example:

exp_date := pq.QuoteLiteral("2023-01-05 15:00:00Z")
err := db.Exec(fmt.Sprintf("CREATE ROLE my_user VALID UNTIL %s", exp_date))

Any single quotes in name will be escaped. Any backslashes (i.e. "\") will be replaced by two backslashes (i.e. "\\") and the C-style escape identifier that PostgreSQL provides ('E') will be prepended to the string.

func RemoveSpacesTabsNewlines

func RemoveSpacesTabsNewlines(s string) string

func RunSSHKeepaliveDownTest

func RunSSHKeepaliveDownTest(t *testing.T, cfg SSHKeepaliveTestConfig)

func RunSSHKeepaliveLatencyTest

func RunSSHKeepaliveLatencyTest(t *testing.T, cfg SSHKeepaliveTestConfig)

func RunSSHResetPeerTest

func RunSSHResetPeerTest(t *testing.T, cfg SSHKeepaliveTestConfig)

Types

type AWSCredentials

type AWSCredentials struct {
	EndpointUrl *string
	AWS         aws.Credentials
}

type AWSCredentialsProvider

type AWSCredentialsProvider interface {
	Retrieve(ctx context.Context) (AWSCredentials, error)
	GetUnderlyingProvider() aws.CredentialsProvider
	GetRegion() string
	GetEndpointURL() string
	GetTlsConfig() (*string, string)
}

func GetAWSCredentialsProvider

func GetAWSCredentialsProvider(ctx context.Context, connectorName string, peerCredentials PeerAWSCredentials) (AWSCredentialsProvider, error)

type AssumeRoleBasedAWSCredentialsProvider

type AssumeRoleBasedAWSCredentialsProvider struct {
	Provider aws.CredentialsProvider // New Credentials
	// contains filtered or unexported fields
}

func NewAssumeRoleBasedAWSCredentialsProvider

func NewAssumeRoleBasedAWSCredentialsProvider(
	ctx context.Context,
	config aws.Config,
	roleArn string,
	sessionName string,
) (*AssumeRoleBasedAWSCredentialsProvider, error)

func (*AssumeRoleBasedAWSCredentialsProvider) GetEndpointURL

func (a *AssumeRoleBasedAWSCredentialsProvider) GetEndpointURL() string

func (*AssumeRoleBasedAWSCredentialsProvider) GetRegion

func (*AssumeRoleBasedAWSCredentialsProvider) GetTlsConfig

func (a *AssumeRoleBasedAWSCredentialsProvider) GetTlsConfig() (*string, string)

func (*AssumeRoleBasedAWSCredentialsProvider) GetUnderlyingProvider

func (*AssumeRoleBasedAWSCredentialsProvider) Retrieve

type AvroFile

type AvroFile struct {
	FilePath        string              `json:"filePath"`
	StorageLocation AvroStorageLocation `json:"storageLocation"`
	NumRecords      int64               `json:"numRecords"`
}

func (*AvroFile) Cleanup

func (l *AvroFile) Cleanup(ctx context.Context)

type AvroStorageLocation

type AvroStorageLocation int64

type CDCStore

type CDCStore[Items model.Items] struct {
	// contains filtered or unexported fields
}

func NewCDCStore

func NewCDCStore[Items model.Items](ctx context.Context, env map[string]string, flowJobName string) (*CDCStore[Items], error)

func (*CDCStore[T]) Close

func (c *CDCStore[T]) Close() error

func (*CDCStore[T]) Get

func (c *CDCStore[T]) Get(key model.TableWithPkey) (model.Record[T], bool, error)

bool is to indicate if a record is found or not [similar to ok]

func (*CDCStore[T]) Set

func (c *CDCStore[T]) Set(key model.TableWithPkey, rec model.Record[T]) error

type ClickHouseS3Credentials

type ClickHouseS3Credentials struct {
	Provider   AWSCredentialsProvider
	BucketPath string
}

type ConfigBasedAWSCredentialsProvider

type ConfigBasedAWSCredentialsProvider struct {
	// contains filtered or unexported fields
}

func NewConfigBasedAWSCredentialsProvider

func NewConfigBasedAWSCredentialsProvider(config aws.Config) *ConfigBasedAWSCredentialsProvider

func (*ConfigBasedAWSCredentialsProvider) GetEndpointURL

func (r *ConfigBasedAWSCredentialsProvider) GetEndpointURL() string

func (*ConfigBasedAWSCredentialsProvider) GetRegion

func (*ConfigBasedAWSCredentialsProvider) GetTlsConfig

func (r *ConfigBasedAWSCredentialsProvider) GetTlsConfig() (*string, string)

func (*ConfigBasedAWSCredentialsProvider) GetUnderlyingProvider

func (r *ConfigBasedAWSCredentialsProvider) GetUnderlyingProvider() aws.CredentialsProvider

func (*ConfigBasedAWSCredentialsProvider) Retrieve

Retrieve should be called as late as possible in order to have credentials with latest expiry

type GcpServiceAccount

type GcpServiceAccount struct {
	Type                    string `json:"type"`
	ProjectID               string `json:"project_id"`
	PrivateKeyID            string `json:"private_key_id"`
	PrivateKey              string `json:"private_key"`
	ClientEmail             string `json:"client_email"`
	ClientID                string `json:"client_id"`
	AuthURI                 string `json:"auth_uri"`
	TokenURI                string `json:"token_uri"`
	AuthProviderX509CertURL string `json:"auth_provider_x509_cert_url"`
	ClientX509CertURL       string `json:"client_x509_cert_url"`
}

func GcpServiceAccountFromProto

func GcpServiceAccountFromProto(sa *protos.GcpServiceAccount) *GcpServiceAccount

func (*GcpServiceAccount) CreateBigQueryClient

func (sa *GcpServiceAccount) CreateBigQueryClient(ctx context.Context) (*bigquery.Client, error)

CreateBigQueryClient creates a new BigQuery client from a GcpServiceAccount.

func (*GcpServiceAccount) CreatePubSubClient

func (sa *GcpServiceAccount) CreatePubSubClient(ctx context.Context) (*pubsub.Client, error)

CreatePubSubClient creates a new PubSub client from a GcpServiceAccount.

func (*GcpServiceAccount) CreateStorageClient

func (sa *GcpServiceAccount) CreateStorageClient(ctx context.Context) (*storage.Client, error)

CreateStorageClient creates a new Storage client from a GcpServiceAccount.

func (*GcpServiceAccount) Validate

func (sa *GcpServiceAccount) Validate() error

Validates a GcpServiceAccount, that none of the fields are empty.

type LPool

type LPool[T any] struct {
	// contains filtered or unexported fields
}

func LuaPool

func LuaPool[T any](maxSize int, cons func() (*lua.LState, error), merge func(T)) (*LPool[T], error)

func (*LPool[T]) Close

func (pool *LPool[T]) Close()

func (*LPool[T]) Run

func (pool *LPool[T]) Run(f func(*lua.LState) T)

func (*LPool[T]) Spawn

func (pool *LPool[T]) Spawn() error

func (*LPool[T]) Wait

func (pool *LPool[T]) Wait(ctx context.Context) error

type LPoolMessage

type LPoolMessage[T any] struct {
	// contains filtered or unexported fields
}

type MeteredConn

type MeteredConn struct {
	net.Conn
	// contains filtered or unexported fields
}

func (*MeteredConn) Read

func (mc *MeteredConn) Read(b []byte) (int, error)

type MeteredDialer

type MeteredDialer struct {
	// contains filtered or unexported fields
}

func NewMeteredDialer

func NewMeteredDialer(totalBytesRead *atomic.Int64, deltaBytesRead *atomic.Int64,
	innerDialer innerDialer, noDeadlineRequired bool,
) MeteredDialer

func (*MeteredDialer) DialContext

func (md *MeteredDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error)

type NoDeadlineConn

type NoDeadlineConn struct{ net.Conn }

see: https://github.com/jackc/pgx/issues/382#issuecomment-1496586216

func (*NoDeadlineConn) SetDeadline

func (c *NoDeadlineConn) SetDeadline(t time.Time) error

func (*NoDeadlineConn) SetReadDeadline

func (c *NoDeadlineConn) SetReadDeadline(t time.Time) error

func (*NoDeadlineConn) SetWriteDeadline

func (c *NoDeadlineConn) SetWriteDeadline(t time.Time) error

type PartitionHelper

type PartitionHelper struct {
	// contains filtered or unexported fields
}

func NewPartitionHelper

func NewPartitionHelper(logger log.Logger) *PartitionHelper

func (*PartitionHelper) AddNullPartition

func (p *PartitionHelper) AddNullPartition()

func (*PartitionHelper) AddPartition

func (p *PartitionHelper) AddPartition(start any, end any) error

func (*PartitionHelper) AddPartitionsWithRange

func (p *PartitionHelper) AddPartitionsWithRange(start any, end any, numPartitions int64) error

func (*PartitionHelper) GetPartitions

func (p *PartitionHelper) GetPartitions() []*protos.QRepPartition

type PartitionRangeForComparison

type PartitionRangeForComparison struct {
	// contains filtered or unexported fields
}

type PartitionRangeType

type PartitionRangeType string
const (
	PartitionEndRangeType   PartitionRangeType = "end"
	PartitionStartRangeType PartitionRangeType = "start"
)

type PeerAWSCredentials

type PeerAWSCredentials struct {
	Credentials    aws.Credentials
	RoleArn        *string
	ChainedRoleArn *string
	EndpointUrl    *string
	Region         string
	RootCAs        *string
	TlsHost        string
}

func BuildPeerAWSCredentials

func BuildPeerAWSCredentials(awsAuth *protos.AwsAuthenticationConfig) PeerAWSCredentials

func NewPeerAWSCredentials

func NewPeerAWSCredentials(s3 *protos.S3Config) PeerAWSCredentials

type RDSAuth

type RDSAuth struct {
	AwsAuthConfig *protos.AwsAuthenticationConfig
	// contains filtered or unexported fields
}

func (*RDSAuth) VerifyAuthConfig

func (r *RDSAuth) VerifyAuthConfig() error

type RDSConnectionConfig

type RDSConnectionConfig struct {
	Host string
	User string
	Port uint32
}

type RecalculateV4Signature

type RecalculateV4Signature struct {
	// contains filtered or unexported fields
}

RecalculateV4Signature allow GCS over S3, removing Accept-Encoding header from sign https://stackoverflow.com/a/74382598/1204665 https://github.com/aws/aws-sdk-go-v2/issues/1816

func (*RecalculateV4Signature) RoundTrip

func (lt *RecalculateV4Signature) RoundTrip(req *http.Request) (*http.Response, error)

type S3BucketAndPrefix

type S3BucketAndPrefix struct {
	Bucket string
	Prefix string
}

func NewS3BucketAndPrefix

func NewS3BucketAndPrefix(s3Path string) (*S3BucketAndPrefix, error)

path would be something like s3://bucket/prefix

type SSHKeepaliveTestConfig

type SSHKeepaliveTestConfig struct {
	SSHProxy      *toxiproxy.Proxy
	KeepaliveChan <-chan struct{}
	RunLongQuery  func(ctx context.Context) error
}

Callbacks avoid adding test-only methods to the connector interfaces in core.go.

type SSHTunnel

type SSHTunnel struct {
	*ssh.Client
	// contains filtered or unexported fields
}

func NewSSHTunnel

func NewSSHTunnel(
	ctx context.Context,
	sshConfig *protos.SSHConfig,
) (*SSHTunnel, error)

func (*SSHTunnel) Close

func (tunnel *SSHTunnel) Close() error

func (*SSHTunnel) GetKeepaliveChan

func (tunnel *SSHTunnel) GetKeepaliveChan(ctx context.Context) <-chan struct{}

returns a channel that is closed if the SSH keepalive fails, or nil if no SSH tunnel is configured

func (*SSHTunnel) StartKeepalive

func (tunnel *SSHTunnel) StartKeepalive(ctx context.Context, onFailure func())

type StaticAWSCredentialsProvider

type StaticAWSCredentialsProvider struct {
	// contains filtered or unexported fields
}

func LoadPeerDBAWSEnvConfigProvider

func LoadPeerDBAWSEnvConfigProvider(connectorName string) *StaticAWSCredentialsProvider

func NewStaticAWSCredentialsProvider

func NewStaticAWSCredentialsProvider(credentials AWSCredentials, region string, rootCAs *string, tlsHost string) *StaticAWSCredentialsProvider

func (*StaticAWSCredentialsProvider) GetEndpointURL

func (s *StaticAWSCredentialsProvider) GetEndpointURL() string

func (*StaticAWSCredentialsProvider) GetRegion

func (s *StaticAWSCredentialsProvider) GetRegion() string

func (*StaticAWSCredentialsProvider) GetTlsConfig

func (s *StaticAWSCredentialsProvider) GetTlsConfig() (*string, string)

func (*StaticAWSCredentialsProvider) GetUnderlyingProvider

func (s *StaticAWSCredentialsProvider) GetUnderlyingProvider() aws.CredentialsProvider

func (*StaticAWSCredentialsProvider) Retrieve

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL