shared

package
v0.2.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2022 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultDsnConnectionKeyNames = struct {
	Dsn string
}{
	Dsn: "dsn",
}

Functions

func DsnConnectionDetailsToMap

func DsnConnectionDetailsToMap(m map[string]string, c *DsnConnectionDetails) map[string]string

DsnConnectionDetailsToMap converts populates the supplied map with details from the connection-specific ODBC struct. The keys used to populate the map are those defaults held in DefaultDsnConnectionKeyNames. TODO: remove this by fixing 12factor code to use new methods above

func FixSqlStatementGeneratorConfig

func FixSqlStatementGeneratorConfig(cfg *SqlStatementGeneratorConfig)

func OracleConnectionDetailsToDSN

func OracleConnectionDetailsToDSN(d *OracleConnectionDetails) (retval string, err error)

OracleConnectionDetailsToDSN is a helper function to build a connection string. DBParams is optional.

Types

type ConnectionDetails

type ConnectionDetails struct {
	Type        string            `json:"type" errorTxt:"database type" mandatory:"yes" yaml:"type"`
	LogicalName string            `json:"logicalName" errorTxt:"database logical name" mandatory:"yes" yaml:"logicalName"`
	Data        map[string]string `json:"data" yaml:"data"`
}

ConnectionDetails is intended to hold credentials for a logical database connection. TODO: consider binding functions like OracleConnectionDetailsToMap to this struct for each connection type.

func (ConnectionDetails) MustGetDateFilterSql

func (c ConnectionDetails) MustGetDateFilterSql(dateVarName string) string

GetDateFilterSql returns a SQL snippet that can be used as a part of a date-time filter/predicate, where the SQL returned is database-type specific. We assume the literal string ${maxDateInTarget} will be replaced by the caller later.

func (ConnectionDetails) MustGetSysDateSql

func (c ConnectionDetails) MustGetSysDateSql() string

func (ConnectionDetails) String

func (c ConnectionDetails) String() string

String redacts passwords and pretty-prints the contents of ConnectionDetails. TODO: add tests for "dsn" redaction of passwords.

type ConnectionGetter

type ConnectionGetter interface {
	LoadConnection(name string) (ConnectionDetails, error)
}

type Connector

type Connector interface {
	// Go SQL entry points:
	Begin() (Transacter, error)
	Exec(query string, args ...interface{}) (Result, error)
	ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error)
	Query(query string, args ...interface{}) (*HpRows, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*HpRows, error)
	Close()
	// Halfpipe functionality:
	GetType() string
	GetDmlGenerator() DmlGenerator
}

Connector abstracts all access to Go SQL functionality.

func NewMockConnectionWithMockTx

func NewMockConnectionWithMockTx(log logger.Logger, dbType string) (Connector, chan string)

type CqnConnector

type CqnConnector interface {
	NewCqnConnection(dsnString string) (OracleCqnExecutor, error)
}

type DBConnections

type DBConnections map[string]ConnectionDetails

DBConnections is used by transform code and JSON pipelines definitions.

func (*DBConnections) LoadConnection

func (c *DBConnections) LoadConnection(i ConnectionGetter, connectionName string) error

LoadConnection will load the supplied *c[connectionName], which is expected to be in c, using the interface to do the actual loading.

type DmlGenerator

type DmlGenerator interface {
	NewInsertGenerator(cfg *SqlStatementGeneratorConfig) SqlStmtGenerator
	NewUpdateGenerator(cfg *SqlStatementGeneratorConfig) SqlStmtGenerator
	NewDeleteGenerator(cfg *SqlStatementGeneratorConfig) SqlStmtGenerator
	NewMergeGenerator(cfg *SqlStatementGeneratorConfig) SqlStmtGenerator
}

type DmlGeneratorTxtBatch

type DmlGeneratorTxtBatch struct{}

func (*DmlGeneratorTxtBatch) NewDeleteGenerator

NewDeleteGenerator. Configure defaults in SqlStatementGeneratorConfig.

func (*DmlGeneratorTxtBatch) NewInsertGenerator

NewInsertGenerator creates a new SqlStmtGenerator that implements interface SqlStmtTxtBatcher. Configure defaults in SqlStatementGeneratorConfig.

func (*DmlGeneratorTxtBatch) NewMergeGenerator

NewMergeGenerator Configure defaults in SqlStatementGeneratorConfig.

func (*DmlGeneratorTxtBatch) NewUpdateGenerator

NewUpdateGenerator. Configure defaults in SqlStatementGeneratorConfig.

type DsnConnectionDetails

type DsnConnectionDetails struct {
	Dsn            string `errorTxt:"data source name i.e. connect string" mandatory:"yes"`
	OriginalScheme string
}

DsnConnectionDetails is a simple struct to hold a DSN only.

func GetDsnConnectionDetails

func GetDsnConnectionDetails(c *ConnectionDetails) *DsnConnectionDetails

GetDsnConnectionDetails converts generic ConnectionDetails to DsnConnectionDetails and returns a pointer to the new struct.

func (DsnConnectionDetails) GetMap

func (d DsnConnectionDetails) GetMap(m map[string]string) map[string]string

func (DsnConnectionDetails) GetScheme

func (d DsnConnectionDetails) GetScheme() (string, error)

func (DsnConnectionDetails) Parse

func (d DsnConnectionDetails) Parse() error

func (DsnConnectionDetails) String

func (d DsnConnectionDetails) String() string

String returns the DSN with redacted password.

type HpColumnType

type HpColumnType struct {
	// contains filtered or unexported fields
}

func (*HpColumnType) DatabaseTypeName

func (c *HpColumnType) DatabaseTypeName() string

func (*HpColumnType) DecimalSize

func (c *HpColumnType) DecimalSize() (precision, scale int64, ok bool)

func (*HpColumnType) Length

func (c *HpColumnType) Length() (length int64, ok bool)

func (*HpColumnType) Name

func (c *HpColumnType) Name() string

func (*HpColumnType) Nullable

func (c *HpColumnType) Nullable() (nullable, ok bool)

func (*HpColumnType) ScanType

func (c *HpColumnType) ScanType() reflect.Type

type HpConnection

type HpConnection struct {
	DbRelloyd *relloyd.DB
	DbSql     *sql.DB
	Dml       DmlGenerator
	DbType    string
}

HpConnection is a wrapper around: 1) Go native sql.DB 2) relloyd/go-sql.DB It also adds the DmlGenerator interface for use in components that output records to a database.

func (*HpConnection) Begin

func (c *HpConnection) Begin() (Transacter, error)

func (*HpConnection) Close

func (c *HpConnection) Close()

func (*HpConnection) Exec

func (c *HpConnection) Exec(query string, args ...interface{}) (Result, error)

func (*HpConnection) ExecContext

func (c *HpConnection) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error)

func (*HpConnection) GetDmlGenerator

func (c *HpConnection) GetDmlGenerator() DmlGenerator

func (*HpConnection) GetType

func (c *HpConnection) GetType() string

func (*HpConnection) Query

func (c *HpConnection) Query(query string, args ...interface{}) (*HpRows, error)

func (*HpConnection) QueryContext

func (c *HpConnection) QueryContext(ctx context.Context, query string, args ...interface{}) (*HpRows, error)

type HpRows

type HpRows struct {
	// contains filtered or unexported fields
}

func (*HpRows) Close

func (r *HpRows) Close() error

func (*HpRows) ColumnTypes

func (r *HpRows) ColumnTypes() ([]*HpColumnType, error)

func (*HpRows) Columns

func (r *HpRows) Columns() ([]string, error)

func (*HpRows) Err

func (r *HpRows) Err() error

func (*HpRows) Next

func (r *HpRows) Next() bool

func (*HpRows) NextResultSet

func (r *HpRows) NextResultSet() bool

func (*HpRows) Scan

func (r *HpRows) Scan(dest ...interface{}) error

type HpStmt

type HpStmt struct {
	// contains filtered or unexported fields
}

func (*HpStmt) Close

func (s *HpStmt) Close() error

func (*HpStmt) Exec

func (s *HpStmt) Exec(args ...interface{}) (Result, error)

func (*HpStmt) ExecBatch

func (s *HpStmt) ExecBatch(args [][]interface{}) (Result, error)

func (*HpStmt) ExecBatchContext

func (s *HpStmt) ExecBatchContext(ctx context.Context, args [][]interface{}) (Result, error)

func (*HpStmt) ExecContext

func (s *HpStmt) ExecContext(ctx context.Context, args ...interface{}) (Result, error)

func (*HpStmt) Query

func (s *HpStmt) Query(args ...interface{}) (*HpRows, error)

func (*HpStmt) QueryContext

func (s *HpStmt) QueryContext(ctx context.Context, args ...interface{}) (*HpRows, error)

type HpTx

type HpTx struct {
	// contains filtered or unexported fields
}

func (*HpTx) Commit

func (t *HpTx) Commit() error

func (*HpTx) Exec

func (t *HpTx) Exec(query string, args ...interface{}) (Result, error)

func (*HpTx) ExecContext

func (t *HpTx) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error)

func (*HpTx) Prepare

func (t *HpTx) Prepare(query string) (StatementBatch, error)

func (*HpTx) PrepareContext

func (t *HpTx) PrepareContext(ctx context.Context, query string) (StatementBatch, error)

func (*HpTx) Rollback

func (t *HpTx) Rollback() error

type MockConnectionWithMockTx

type MockConnectionWithMockTx struct {
	OutputChan      chan string  // channel to be supplied by caller and used to return SQL and values generated by Exec().
	Dml             DmlGenerator // TODO: implement the mock for this!
	DbType          string
	DbHasBeenClosed bool
}

func (*MockConnectionWithMockTx) Begin

func (*MockConnectionWithMockTx) Close

func (c *MockConnectionWithMockTx) Close()

func (*MockConnectionWithMockTx) Exec

func (c *MockConnectionWithMockTx) Exec(query string, args ...interface{}) (Result, error)

func (*MockConnectionWithMockTx) ExecContext

func (c *MockConnectionWithMockTx) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error)

func (*MockConnectionWithMockTx) GetDmlGenerator

func (c *MockConnectionWithMockTx) GetDmlGenerator() DmlGenerator

func (*MockConnectionWithMockTx) GetType

func (c *MockConnectionWithMockTx) GetType() string

func (*MockConnectionWithMockTx) NewDeleteGenerator

func (*MockConnectionWithMockTx) NewInsertGenerator

func (*MockConnectionWithMockTx) NewUpdateGenerator

func (*MockConnectionWithMockTx) Query

func (c *MockConnectionWithMockTx) Query(query string, args ...interface{}) (*HpRows, error)

func (*MockConnectionWithMockTx) QueryContext

func (c *MockConnectionWithMockTx) QueryContext(ctx context.Context, query string, args ...interface{}) (*HpRows, error)

func (*MockConnectionWithMockTx) WasClosed

func (c *MockConnectionWithMockTx) WasClosed() bool

type NetezzaConnectionDetails

type NetezzaConnectionDetails struct {
	Dsn            string `errorTxt:"data source name i.e. connect string" mandatory:"yes"`
	OriginalScheme string
}

func (NetezzaConnectionDetails) GetMap

func (d NetezzaConnectionDetails) GetMap(m map[string]string) map[string]string

func (NetezzaConnectionDetails) GetNzgoConnectionString

func (d NetezzaConnectionDetails) GetNzgoConnectionString() (string, error)

getNzgoConnectionString will parse the connection string and convert it to the format required by nzgo library which is space separated key=value. https://pkg.go.dev/github.com/IBM/nzgo

func (NetezzaConnectionDetails) GetScheme

func (d NetezzaConnectionDetails) GetScheme() (string, error)

func (NetezzaConnectionDetails) Parse

func (d NetezzaConnectionDetails) Parse() error

func (NetezzaConnectionDetails) String

func (d NetezzaConnectionDetails) String() string

type OdbcConnector

type OdbcConnector interface {
	NewOdbcConnection(log logger.Logger, d *DsnConnectionDetails) (Connector, error)
}

type OracleConnectionDetails

type OracleConnectionDetails struct {
	DBName   string `errorTxt:"Oracle database name" mandatory:"yes"`
	DBUser   string `errorTxt:"Oracle username" mandatory:"yes"`
	DBPass   string `errorTxt:"Oracle password" mandatory:"yes"`
	DBHost   string `errorTxt:"Oracle hostname" mandatory:"yes"`
	DBPort   string `errorTxt:"Oracle port" mandatory:"yes"`
	DBParams string // param1=value1&param2=value2
	Dsn      string
}

OracleConnectionDetails is a helper type to build connection details with. user:password@host:port/sid?param1=value1&param2=value2

func OracleDsnToOracleConnectionDetails

func OracleDsnToOracleConnectionDetails(d string) (*OracleConnectionDetails, error)

func (OracleConnectionDetails) GetMap

func (d OracleConnectionDetails) GetMap(m map[string]string) map[string]string

func (OracleConnectionDetails) GetScheme

func (d OracleConnectionDetails) GetScheme() (string, error)

func (OracleConnectionDetails) Parse

func (d OracleConnectionDetails) Parse() error

func (OracleConnectionDetails) String

func (d OracleConnectionDetails) String() string

type OracleConnector

type OracleConnector interface {
	NewOracleConnection(log logger.Logger, d *DsnConnectionDetails) Connector
	OracleExecWithConnDetails(log logger.Logger, connDetails *DsnConnectionDetails, sql string) error
}

type OracleConnectorMock

type OracleConnectorMock interface {
	NewMockConnectionWithBatchWithMockTx(log logger.Logger) (Connector, chan string)
}

type OracleCqnExecutor

type OracleCqnExecutor interface {
	Execute(h types.SubscriptionHandler, query string, args []interface{}) (driver.Rows, error)
	RemoveSubscription()
	CloseCqnConnection() error
}

type Result

type Result interface {
	LastInsertId() (int64, error)
	RowsAffected() (int64, error)
}

type SqUpdateTxtBatch

type SqUpdateTxtBatch struct {
	SqlStatementGeneratorConfig // mandatory to be populated.

	ColList    []string // list of columns extracted from SqlStatementGeneratorConfig.
	KeyList    []string
	AllColList []string
	// contains filtered or unexported fields
}

Oracle-specific implementation of interface SqlStmtTxtBatcher is able to generate UPDATE statements with batches of rows supplied.

func (*SqUpdateTxtBatch) AddValuesToBatch

func (o *SqUpdateTxtBatch) AddValuesToBatch(values []interface{}) (batchIsFull bool, err error)

func (*SqUpdateTxtBatch) GetStatement

func (o *SqUpdateTxtBatch) GetStatement() string

func (*SqUpdateTxtBatch) GetValues

func (o *SqUpdateTxtBatch) GetValues() []interface{}

func (*SqUpdateTxtBatch) InitBatch

func (o *SqUpdateTxtBatch) InitBatch(batchSize int)

type SqlDeleteTxtBatch

type SqlDeleteTxtBatch struct {
	SqlStatementGeneratorConfig // mandatory to be populated.

	KeyList []string
	// contains filtered or unexported fields
}

Oracle-specific implementation of interface SqlStmtTxtBatcher is able to generate DELETE statements with batches of rows supplied.

func (*SqlDeleteTxtBatch) AddValuesToBatch

func (o *SqlDeleteTxtBatch) AddValuesToBatch(values []interface{}) (batchIsFull bool, err error)

func (*SqlDeleteTxtBatch) GetStatement

func (o *SqlDeleteTxtBatch) GetStatement() string

func (*SqlDeleteTxtBatch) GetValues

func (o *SqlDeleteTxtBatch) GetValues() []interface{}

func (*SqlDeleteTxtBatch) InitBatch

func (o *SqlDeleteTxtBatch) InitBatch(batchSize int)

type SqlInsertTxtBatch

type SqlInsertTxtBatch struct {
	SqlStatementGeneratorConfig // mandatory to be populated.

	ColList []string // list of columns extracted from SqlStatementGeneratorConfig.
	// contains filtered or unexported fields
}

Oracle-specific implementation of interface SqlStmtTxtBatcher is able to generate INSERT statements with batches of rows supplied.

func (*SqlInsertTxtBatch) AddValuesToBatch

func (o *SqlInsertTxtBatch) AddValuesToBatch(values []interface{}) (batchIsFull bool, err error)

func (*SqlInsertTxtBatch) GetStatement

func (o *SqlInsertTxtBatch) GetStatement() string

func (*SqlInsertTxtBatch) GetValues

func (o *SqlInsertTxtBatch) GetValues() []interface{}

func (*SqlInsertTxtBatch) InitBatch

func (o *SqlInsertTxtBatch) InitBatch(batchSize int)

type SqlMergeTxtBatch

type SqlMergeTxtBatch struct {
	SqlStatementGeneratorConfig // mandatory to be populated.

	AllCols   []string
	KeyCols   []string // list of columns extracted from SqlStatementGeneratorConfig.
	OtherCols []string
	// contains filtered or unexported fields
}

Oracle-specific implementation of interface SqlStmtTxtBatcher is able to generate MERGE statements with batches of rows supplied.

func (*SqlMergeTxtBatch) AddValuesToBatch

func (o *SqlMergeTxtBatch) AddValuesToBatch(values []interface{}) (batchIsFull bool, err error)

Build a SQL MERGE statement. The ordering of values is important: supply the key columns followed by all key+other columns.

func (*SqlMergeTxtBatch) GetStatement

func (o *SqlMergeTxtBatch) GetStatement() string

func (*SqlMergeTxtBatch) GetValues

func (o *SqlMergeTxtBatch) GetValues() []interface{}

func (*SqlMergeTxtBatch) InitBatch

func (o *SqlMergeTxtBatch) InitBatch(batchSize int)

type SqlResultHandler

type SqlResultHandler interface {
	HandleHeader(i []interface{}) error
	HandleRow(i []interface{}) error
}

type SqlStatementGeneratorConfig

type SqlStatementGeneratorConfig struct {
	Log             logger.Logger
	OutputSchema    string
	SchemaSeparator string
	OutputTable     string
	TargetKeyCols   *om.OrderedMap // ordered map of: key = chan field name; value = target table column name
	TargetOtherCols *om.OrderedMap // ordered map of: key = chan field name; value = target table column name
}

type SqlStmtGenerator

type SqlStmtGenerator interface {
	GetStatement() string
}

SqlStmtGenerator is used as part of SqlStmtTxtBatcher. This is implemented by:

Connector.GetDmlGenerator() DmlGenerator -> <multiple functions>.SqlStmtGenerator.
i.e. Oracle batch struct as well as other SQL text-batch structs.

type SqlStmtTxtBatcher

type SqlStmtTxtBatcher interface {
	SqlStmtGenerator
	InitBatch(batchSize int)                             // reset variables and preallocate slices for the given batch size.
	AddValuesToBatch(values []interface{}) (bool, error) // add values to SQL statement.
	GetValues() []interface{}                            // get all values added to the batch so they can be supplied as args to exec the SQL returned by getStatement().
}

SqlStmtTxtBatcher is used to combine DML statements that affect individual records into one statement, aiming to improve performance and reduce network round trips.

type StatementBatch

type StatementBatch interface {
	ExecBatch(args [][]interface{}) (Result, error)
}

StatementBatch provides ability to execute bulk SQL via relloyd/go-sql library. This is used by component NewTableSync in table-output-sync.go when the connection is using Oracle OCI array binds.

type Transacter

type Transacter interface {
	Prepare(query string) (StatementBatch, error)
	PrepareContext(ctx context.Context, query string) (StatementBatch, error)
	Exec(query string, args ...interface{}) (Result, error)
	ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error)
	Commit() error
	Rollback() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL