components

package
v0.2.4
Published: Apr 23, 2022 License: MIT Imports: 33 Imported by: 0

README

Transformation Components

SQL Query (Table Input)
  1. Input is a SQL statement.
  2. Output rows to a channel of map[string]interface{}, where the map keys are the SQL column names.

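To illustrate the shape of a table input step, here is a minimal, self-contained sketch (not this package's actual implementation, which uses stream.Record and control channels) that runs a query via database/sql and emits each row as a map keyed by column name:

	package tableinput

	import (
		"database/sql"
		"log"
	)

	// sqlQueryToChan runs the given SQL and emits each row on the returned
	// channel as a map keyed by the SQL column names.
	func sqlQueryToChan(db *sql.DB, query string) (<-chan map[string]interface{}, error) {
		rows, err := db.Query(query)
		if err != nil {
			return nil, err
		}
		cols, err := rows.Columns()
		if err != nil {
			rows.Close()
			return nil, err
		}
		out := make(chan map[string]interface{})
		go func() {
			defer close(out)
			defer rows.Close()
			vals := make([]interface{}, len(cols))
			ptrs := make([]interface{}, len(cols))
			for i := range vals {
				ptrs[i] = &vals[i] // Scan needs a pointer per destination column
			}
			for rows.Next() {
				if err := rows.Scan(ptrs...); err != nil {
					log.Println("scan:", err)
					return
				}
				rec := make(map[string]interface{}, len(cols))
				for i, c := range cols {
					rec[c] = vals[i]
				}
				out <- rec
			}
		}()
		return out, nil
	}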

SQL Query with Arguments (Table Input)
  1. Input is a SQL statement with bind variables populated by another (prior) SQL statement.
  2. Output rows to a channel of map[string]interface{}, where the map keys are the SQL column names.


Stream Lookup - Join Tables A & B (1:n)
  1. Input is two SQL row channels that can join by common field(s), where the cardinality of parent-child data is 1:n. Take the child rows and build a []map[string]interface{} before adding it to the parent row map.
  2. The channels must be sorted by the common join field(s) for this to work, e.g. use SQL ORDER BY to achieve this.
  3. Output results to a channel of map[string]interface{} so further processing can be performed by other steps.
  4. The map keys are the column names and these are case sensitive.

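A compressed sketch of this 1:n join follows. It assumes a single comparable join key, inputs sorted by that key, and that every child key has a matching parent; the names are illustrative, not this package's API:

	package streamlookup

	// joinSorted attaches each parent's child rows (gathered into a slice) to
	// the parent record under childField, then emits the parent.
	func joinSorted(parents, children <-chan map[string]interface{}, joinKey, childField string) <-chan map[string]interface{} {
		out := make(chan map[string]interface{})
		go func() {
			defer close(out)
			child, childOK := <-children
			for parent := range parents {
				var kids []map[string]interface{}
				// Consume children while their key matches the current parent;
				// this only works because both inputs are sorted by joinKey.
				for childOK && child[joinKey] == parent[joinKey] {
					kids = append(kids, child)
					child, childOK = <-children
				}
				parent[childField] = kids
				out <- parent
			}
		}()
		return out
	}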

Table Diff / Merge Diff
  1. Input is two channels containing an ordered stream of records of type map[string]interface{}: one with old data, one with new data. Use the table input steps above as input.
  2. Output is one map[string]interface{} per row with an added flag field showing whether a record is NEW, CHANGED, DELETED or IDENTICAL. This output can feed into the Table Sync or Merge step below. Output of IDENTICAL rows is optional.


Table Sync (Table Output)
  1. Input is one channel of records containing both table data fields and a flag field from the Table Diff / Merge Diff step above.
  2. Output is database writes for the field changes to an RDBMS table, where NEW rows cause INSERTs, CHANGED rows cause UPDATEs and DELETED rows cause DELETEs. IDENTICAL rows are ignored. Transaction size is configurable.


Table Merge (Table Output)
  1. Input is one channel of records containing table data fields. E.g. Data from the Table Input step above.
  2. Output is SQL MERGE statements executed.


Documentation

Index

Constants

This section is empty.

Variables

var Defaults = struct {
	ChanField4CSVFileName           string
	ChanField4FileName              string // the default map key that contains the file names found in the S3 bucket, used by input and output Channels.
	ChanField4FileNameWithoutPrefix string // the default map key that contains the file names found in the S3 bucket, used by input and output Channels.
	ChanField4BucketName            string // the default map key that contains the bucket name, used by input and output Channels.
	ChanField4BucketPrefix          string // the default map key that contains the bucket prefix, used by input and output Channels.
	ChanField4BucketRegion          string // the default map key that contains the bucket region, used by input and output Channels.
	ChanField4StageName             string // the default map key that contains the Snowflake stage name, used by input and output Channels.
	ChanField4TableName             string // the default map key that contains the Snowflake table name, used by input and output Channels.
}{
	ChanField4CSVFileName:           "#CSVFileName",
	ChanField4FileName:              "#DataFileName",
	ChanField4FileNameWithoutPrefix: "#DataFileNameWithoutPrefix",
	ChanField4BucketName:            "#BucketName",
	ChanField4BucketPrefix:          "#BucketPrefix",
	ChanField4BucketRegion:          "#BucketRegion",
	ChanField4StageName:             "#SnowflakeStageName",
	ChanField4TableName:             "#SnowflakeTargetTableName",
}

Default field names are used by components to know the names of input and output fields.
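
For example, a component can fall back to these defaults when its config leaves a field name empty; a sketch as it might appear inside this package:

	// fieldNameOrDefault returns the configured output field name, or the
	// package default (e.g. "#DataFileName") when the config leaves it empty.
	func fieldNameOrDefault(configured string) string {
		if configured != "" {
			return configured
		}
		return Defaults.ChanField4FileName
	}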

Functions

func GetSqlSliceSnowflakeCopyInto

func GetSqlSliceSnowflakeCopyInto(schemaTableName rdbms.SchemaTable, stageName string, fileName string, force bool) []string

GetSqlSliceSnowflakeCopyInto generates SQL to copy data from the supplied Snowflake STAGE/fileName into the given tableName.
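
The statements returned are Snowflake COPY INTO commands; a hedged sketch of the general shape (the real function takes an rdbms.SchemaTable and may add file format options) is:

	package snowflakesql

	import "fmt"

	// buildCopyInto shows the kind of statement a COPY INTO builder returns;
	// FILES and FORCE are standard Snowflake COPY INTO options.
	func buildCopyInto(schemaTable, stageName, fileName string, force bool) []string {
		stmt := fmt.Sprintf("COPY INTO %s FROM @%s FILES = ('%s')", schemaTable, stageName, fileName)
		if force {
			stmt += " FORCE = TRUE"
		}
		return []string{stmt}
	}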

func GetSqlSliceSnowflakeMerge

func GetSqlSliceSnowflakeMerge(cfg *SnowflakeMergeSqlConfig,
) func(schemaTableName rdbms.SchemaTable, stageName string, fileName string, force bool) []string

GetSqlSliceSnowflakeMerge will return a slice of SQL statements required to merge data from a given fileName into the target table.

func GetSqlSliceSnowflakeSyncTable

func GetSqlSliceSnowflakeSyncTable(cfg *SnowflakeSyncSqlConfig,
) func(schemaTableName rdbms.SchemaTable, stageName string, fileName string, force bool) []string

GetSqlSliceSnowflakeSyncTable will return a slice of SQL statements required to sync data from a given fileName into the target table.

func NewCartNIterator

func NewCartNIterator(sizes []int) *cartNIterator

func NewChannelBridge

func NewChannelBridge(i interface{}) (inputChan chan chan stream.Record, outputChan chan stream.Record)

NewChannelBridge returns an input channel upon which it waits to be given a chan stream.Record to read from. Once it receives a channel, it forwards the rows onto the output channel. After all rows are passed on, it waits for a new input channel again. This allows us to keep outputChan open while our input changes over time. For example: we have a transform that supplies data to us, and we want to take action when the first input chan is closed. We can keep outputChan open and wait for a new channel to be given to us for more of the same work. The use case is: writing a manifest file once the first input chan has closed, but still remaining open to write another manifest file once the next input chan is closed. Close inputChan to get this goroutine to end. A sketch of the pattern follows.
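
This is a minimal version of the bridge using plain maps instead of stream.Record:

	package bridge

	// newBridge forwards rows from each source channel received on inputChan
	// onto a single long-lived outputChan. Closing inputChan ends the goroutine.
	func newBridge() (chan chan map[string]interface{}, chan map[string]interface{}) {
		inputChan := make(chan chan map[string]interface{})
		outputChan := make(chan map[string]interface{})
		go func() {
			defer close(outputChan)
			for in := range inputChan { // wait for the next source channel
				for rec := range in { // forward all of its rows
					outputChan <- rec
				}
				// in is now closed; loop back for another source channel
				// while leaving outputChan open.
			}
		}()
		return inputChan, outputChan
	}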

func NewChannelCombiner

func NewChannelCombiner(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

NewChannelCombiner will accept 2 input channels and collect all rows onto the outputChan. TODO: consider using reflect.SelectCase to allow N channels as input (it would be slower).
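
The pattern is a standard two-channel fan-in; a minimal sketch using plain maps instead of stream.Record:

	package combiner

	import "sync"

	// combine collects all rows from both inputs onto one output channel and
	// closes the output once both inputs are drained.
	func combine(a, b <-chan map[string]interface{}) <-chan map[string]interface{} {
		out := make(chan map[string]interface{})
		var wg sync.WaitGroup
		forward := func(c <-chan map[string]interface{}) {
			defer wg.Done()
			for rec := range c {
				out <- rec
			}
		}
		wg.Add(2)
		go forward(a)
		go forward(b)
		go func() { wg.Wait(); close(out) }()
		return out
	}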

func NewCopyFilesToS3

func NewCopyFilesToS3(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

NewCopyFilesToS3 copies os files to S3. This passes InputChan rows to outputChan. It does not currently add details of the S3 bucket to the output row.

func NewCqnWithArgs

func NewCqnWithArgs(i interface{}) (mainOutputChan chan stream.Record, mainControlChan chan ControlAction)

NewCqnWithArgs registers a Continuous Query Notification (CQN) with the Oracle database connection provided. The SrcSqlQuery query is executed by the CQN and the results are compared to the data found in the target table, using SQL statement TgtSqlQuery and the target database connection TgtDBConnector. Then the following process is followed: 1) The target table is synchronised and the changes to the target table are output on this step's mainOutputChan; while the results are being synchronised to the target, all CQN row change events are buffered. 2) Once the initial source data set has been synced to the target, the buffered rows are processed by a similar comparison and the target is kept up to date. If a CQN change event shows that the full source table or query has been invalidated, then all rows are compared between source and target as per (1) above, before the process continues at step (2) above.

func NewCsvFileWriter

func NewCsvFileWriter(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

NewCsvFileWriter will dump cfg.InputChan to a CSV with spec defined in cfg. The CSV header must be specified for this func to pull out the map keys from the input chan in the correct order. outputChan contains the CSV file names produced (it does not pass input records to the output yet).

func NewDateRangeGenerator

func NewDateRangeGenerator(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

NewDateRangeGenerator will: read the input chan to get the FromDate(s); figure out the ToDate (it could be "now"); calculate the number of intervals between FromDate and ToDate using the interval size in seconds; output N rows of type date; and ensure daylight savings are accounted for.

func NewFieldMapper

func NewFieldMapper(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

NewFieldMapper uses FieldMapperConfig to map fields in records read from InputChan. Supply a slice of map step actions in cfg.Steps, where Steps.Type is one of the entries in mapFieldMappers used to look up a map function, and Steps.Data is a map of further config values to supply to the chosen map function.

func NewFilterRows

func NewFilterRows(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

NewFilterRows accepts a FilterRowsConfig{} and outputs rows if they match the given filter.

func NewGenerateRows

func NewGenerateRows(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

func NewManifestWriter

func NewManifestWriter(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

NewManifestWriter is expected to be used after CSV file generation. It expects one or more file names on the input channel field specified by InputChanField4FilePath. It writes a single manifest (CSV txt) file to the output directory specified containing each of the input file names. It produces a single record on outputChan with fields outputDir and manifestFileName. The manifest is only written and filename sent on the output channel once the input channel for this step is closed.

func NewMergeDiff

func NewMergeDiff(i interface{}) (chan stream.Record, chan ControlAction)

Produce an output channel (chanOutput) of records based on the map[string]interface{} data found in chanOld and chanNew. A new column is added to chanOutput (with the key name specified in resultsFlagKeyName) per record to show the merge-diff results as follows, where the value on the new column is one of:

N == new record found on chanNew that is not on chanOld (chanOutput contains the row from the chanNew rowset so you can do a database INSERT)
C == changes found to the record on chanOld compared to chanNew (chanOutput contains row from the chanNew rowset so you can do a database UPDATE)
D == record not found on chanNew (chanOutput contains row from the chanOld rowset so you have the key required to perform a database DELETE)
I == records are identical for compareKeyMap columns (chanOutput contains the row from the chanNew rowset)

NOTE that input channel records MUST be pre-sorted by the key fields for this to work! NOTE that the output channel (chanOutput) is closed by this function when it is done.

Here's how it works:

To perform the comparison, two input ordered_maps are required:

  1. joinKeys specifies the keys in chanOld & chanNew that should be compared in order to join the records from chanOld and chanNew. Key names are case-sensitive!
  2. compareKeys specifies the columns in chanOld & chanNew that are used for data comparison once the join keys match.

It is expected that each comparable column is of the same type, where supported types are: int and string for now.

Ordered maps are used as the comparison process exits early when inequality is found, so this can be used to speed up comparison if the user knows which keys to prioritise.

This function can be used to determine what action should be taken to sync a target database table (assume target table data is on chanOld) with the contents from a source table (assume source table data on chanNew).

Here's how to use this step output:

1) Fill chanOld with all rows from a target table (the table to be compared/updated). 2) Fill chanNew with data from a source table (reference data). 3) Feed chanOutput to a database table sync step where:

N rows are INSERTed into the target table (chanOutput contains the row from the chanNew rowset)
C rows are UPDATEd into the target table
D rows are DELETEd from the target table
I rows are identical for fields in compareKeyMap and these can be ignored

TODO: add flag to disable output of rows that are found to be unchanged i.e. resultValueIdentical. TODO: handle multiple mergeDiffResult column names by appending _1 etc. TODO: add handling of nested maps
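
A compressed sketch of the comparison loop follows, simplified to a single string join key and direct equality on the compare keys (the real component uses ordered maps of keys and exits each comparison early on inequality):

	package mergediff

	// diff merges two channels sorted ascending by joinKey and flags each
	// output record N, C, D or I as described above.
	func diff(chanOld, chanNew <-chan map[string]interface{}, joinKey string, compareKeys []string, flagKey string) <-chan map[string]interface{} {
		out := make(chan map[string]interface{})
		emit := func(rec map[string]interface{}, flag string) {
			rec[flagKey] = flag
			out <- rec
		}
		go func() {
			defer close(out)
			oldRec, oldOK := <-chanOld
			newRec, newOK := <-chanNew
			for oldOK || newOK {
				switch {
				case !oldOK: // old side drained: remaining new rows are inserts
					emit(newRec, "N")
					newRec, newOK = <-chanNew
				case !newOK: // new side drained: remaining old rows are deletes
					emit(oldRec, "D")
					oldRec, oldOK = <-chanOld
				case oldRec[joinKey].(string) < newRec[joinKey].(string): // old key absent from new
					emit(oldRec, "D")
					oldRec, oldOK = <-chanOld
				case oldRec[joinKey].(string) > newRec[joinKey].(string): // new key absent from old
					emit(newRec, "N")
					newRec, newOK = <-chanNew
				default: // keys match: compare the data columns
					flag := "I"
					for _, k := range compareKeys {
						if oldRec[k] != newRec[k] {
							flag = "C"
							break
						}
					}
					emit(newRec, flag)
					oldRec, oldOK = <-chanOld
					newRec, newOK = <-chanNew
				}
			}
		}()
		return out
	}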

func NewMergeNChannels

func NewMergeNChannels(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

NewMergeNChannels will consume all records from InputChan2 into memory and then merge the field values from those records into every record arriving on InputChan1, producing a cartesian product. Each merged combination is sent to outputChan.

func NewNumberRangeGenerator

func NewNumberRangeGenerator(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

NewNumberRangeGenerator will: read the input chan to get the LowNum and HighNum per input row; calculate the number of intervals between LowNum and HighNum using the interval size; and output N rows with low and high values of type int.
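
The interval arithmetic is simple; a sketch (assuming an integer interval size of at least 1, unlike the float64 IntervalSize in the config below):

	package ranges

	// splitRange divides [low, high] into chunks of intervalSize and returns
	// the (low, high) pair for each chunk.
	func splitRange(low, high, intervalSize int) [][2]int {
		if intervalSize < 1 {
			intervalSize = 1 // guard against a zero or negative interval
		}
		var out [][2]int
		for lo := low; lo <= high; lo += intervalSize {
			hi := lo + intervalSize - 1
			if hi > high {
				hi = high
			}
			out = append(out, [2]int{lo, hi})
		}
		return out
	}

For example, splitRange(1, 10, 4) yields the pairs (1,4), (5,8) and (9,10).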

func NewS3BucketList

func NewS3BucketList(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

NewS3BucketList fetches the list of objects from the given S3 bucket and produces records onto the output channel, where each record has: map key name = ChanField4FileName (or the default mentioned above); map value = the file name found in S3. TODO: add a test for filtering by filename prefix (not bucket prefix).

func NewS3ManifestReader

func NewS3ManifestReader(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

NewS3ManifestReader will open manifest files expected to be found on the S3 bucket specified and output the contents of the manifest files to outputChan.

func NewSnowflakeLoader

func NewSnowflakeLoader(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

NewSnowflakeLoader reads the input channel of records, expecting it to contain the following: 1) the data file name on S3 that exists via... 2) the Snowflake "stage" set up on a known S3 bucket which contains the above data files; 3) the table name to load data into. This component generates and executes COPY INTO SQL statements. If Use1Transaction is true, AUTOCOMMIT will be on; else it will turn AUTOCOMMIT OFF and commit once InputChan is closed. InputChan rows are copied to the outputChan.

func NewSnowflakeMerge

func NewSnowflakeMerge(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

func NewSnowflakeSync

func NewSnowflakeSync(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

func NewSqlExec

func NewSqlExec(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

func NewSqlQueryWithArgs

func NewSqlQueryWithArgs(i interface{}) (chan stream.Record, chan ControlAction)

Execute SQL and fetch rows onto the output channel. Use this when you have args to pass to the SQL directly. Args can be nil if you don't want to use bind variables. An alternative func is available to read args from another channel populated by its own SQL query. TODO: swap input cfg to be of type pointer to SqlQueryWithArgsConfig.

func NewSqlQueryWithInputChan

func NewSqlQueryWithInputChan(i interface{}) (chan stream.Record, chan ControlAction)

Execute SQL and fetch rows onto the output channel. SQL text is expected to use bind variables whose values are fetched from the input channel. This builds an args list from the input channel record. Only one record is expected on the input channel. A simple enhancement would be required to handle multiple input channel records, i.e. to cause multiple SQL executions.

func NewSqlQueryWithReplace

func NewSqlQueryWithReplace(i interface{}) (chan stream.Record, chan ControlAction)

NewSqlQueryWithReplace will execute SQL with args, but replace strings within the supplied SQL first.

func NewStdOutPassThrough

func NewStdOutPassThrough(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

NewStdOutPassThrough prints input records found on the InputChan to STDOUT and passes them on to the output channel outputChan. OutputFields may either be empty to write all fields found on the input stream, or supply a slice of field names, which must exist on the input stream. Optionally use AbortAfterCount to cause a panic after the supplied number of records has been sent. Supply an io.Writer for the records to be output to (the default launcher func uses STDOUT).

func NewTableMerge

func NewTableMerge(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

NewTableMerge will apply the output of a prior MergeDiff step to a target database table using SQL MERGE statements (instead of choosing the appropriate INSERT, UPDATE or DELETE).

func NewTableSync

func NewTableSync(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)

NewTableSync can be used to apply the output of a MergeDiff step to a target database table. Records are INSERTed, UPDATEd or DELETEd accordingly, based on the flag field FlagKeyName. The TableSync component adds a zero-based integer field to the output stream that increments per commit. This helps consumers because the component releases rows as they are processed instead of after each commit; it does, however, move the problem of knowing whether a batch has been committed downstream.

Types

type Action

type Action uint32
const (
	Shutdown Action = iota + 1
	Pause
	Resume
)

type ChannelBridgeConfig

type ChannelBridgeConfig struct {
	Log            logger.Logger
	Name           string
	StepWatcher    *stats.StepWatcher
	WaitCounter    ComponentWaiter
	PanicHandlerFn PanicHandlerFunc
}

type ChannelCombinerConfig

type ChannelCombinerConfig struct {
	Log            logger.Logger
	Name           string
	Chan1          chan stream.Record
	Chan2          chan stream.Record
	StepWatcher    *s.StepWatcher
	WaitCounter    ComponentWaiter
	PanicHandlerFn PanicHandlerFunc
}

type ColumnType

type ColumnType struct {
	// contains filtered or unexported fields
}

ColumnType contains the name and type of a column.

type ComponentStep

type ComponentStep struct {
	Type string            `json:"type" errorTxt:"step type" mandatory:"yes"`
	Data map[string]string `json:"data" errorTxt:"step data" mandatory:"yes"`
}

ComponentStep is a generic holder for FieldMapper config. TODO: can we use Type specific structs instead with some clever un-marshalling?

type ComponentWaiter

type ComponentWaiter interface {
	Add()
	Done()
}

ComponentWaiter is a simple interface for use around a wait group.
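
The obvious implementation wraps a sync.WaitGroup; a sketch (the Wait method is an assumed extra, not part of the interface):

	package example

	import "sync"

	// wgWaiter adapts a sync.WaitGroup to the ComponentWaiter interface,
	// adding one unit of work per Add call.
	type wgWaiter struct{ wg sync.WaitGroup }

	func (w *wgWaiter) Add()  { w.wg.Add(1) }
	func (w *wgWaiter) Done() { w.wg.Done() }
	func (w *wgWaiter) Wait() { w.wg.Wait() }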

type ControlAction

type ControlAction struct {
	Action       Action
	ResponseChan chan error // channel on which to send a response.
}

ControlAction is used to communicate with components.
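
For example, asking a component to shut down and waiting for its reply might look like this (a sketch as it might appear inside this package; the assumption that a nil error means a clean shutdown is not confirmed by these docs):

	// requestShutdown sends a Shutdown action to a component's control
	// channel and blocks until the component replies on the response channel.
	func requestShutdown(controlChan chan ControlAction) error {
		resp := make(chan error)
		controlChan <- ControlAction{Action: Shutdown, ResponseChan: resp}
		return <-resp // assumed: nil means the component shut down cleanly
	}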

type CopyFilesToS3Config

type CopyFilesToS3Config struct {
	Log               logger.Logger
	Name              string
	InputChan         chan stream.Record // the input channel of rows containing files (with full paths) to copy/move to S3.
	FileNameChanField string             // name of the field in InputChan that contains the files to move.
	BucketName        string             // target bucket
	BucketPrefix      string
	Region            string
	RemoveInputFiles  bool // true to delete the input files after successful copy to s3.
	StepWatcher       *stats.StepWatcher
	WaitCounter       ComponentWaiter
	PanicHandlerFn    PanicHandlerFunc
}

type CqnDownstreamSnowflakeSync

type CqnDownstreamSnowflakeSync struct {
	// Snowflake specific downstream components:
	// For CsvFileWriterConfig
	CsvFileNamePrefix string
	CsvHeaderFields   []string // the slice of key names to be found in InputChan that will be used as the CSV header.
	CsvMaxFileRows    int
	CsvMaxFileBytes   int
	// For CopyFilesToS3Config
	BucketPrefix string
	BucketName   string
	Region       string
	// For SnowflakeLoaderConfig
	StageName string
}

CqnDownstreamSnowflakeSync implements CqnDownstreamSyncer.

func (*CqnDownstreamSnowflakeSync) GetDownstreamSyncFunc

func (s *CqnDownstreamSnowflakeSync) GetDownstreamSyncFunc(cfg *CqnWithArgsConfig) func(ctx context.Context, inputChan chan stream.Record, tgtSqlQuery string) (outputChan chan stream.Record, controlChan chan ControlAction)

GetDownstreamSyncFunc returns a function that can be used to create all components required to start a Snowflake table sync process. The returned function accepts an inputChan that contains records that will be synced to Snowflake. It also accepts a context that must be cancelled once outputChan is complete. The reason for this is that this function creates multiple components and wraps their individual control channels in the single controlChan that is returned. Once the components have ended, there will be nothing to respond on the control channels. This function launches a goroutine to wait for any shutdown messages and propagate them to the components; we need to be able to tell that goroutine to exit. TODO: add tests for CqnDownstreamSnowflakeSync -> GetDownstreamSyncFunc().

type CqnDownstreamSyncer

type CqnDownstreamSyncer interface {
	GetDownstreamSyncFunc(cfg *CqnWithArgsConfig) func(ctx context.Context, inputChan chan stream.Record, tgtSqlQuery string) (outputChan chan stream.Record, controlChan chan ControlAction)
}

type CqnDownstreamTableSync

type CqnDownstreamTableSync struct {
	BatchSize int // commit interval for internal TableSync components
}

CqnDownstreamTableSync implements CqnDownstreamSyncer.

func (*CqnDownstreamTableSync) GetDownstreamSyncFunc

func (s *CqnDownstreamTableSync) GetDownstreamSyncFunc(cfg *CqnWithArgsConfig) func(ctx context.Context, inputChan chan stream.Record, tgtSqlQuery string) (outputChan chan stream.Record, controlChan chan ControlAction)

GetDownstreamSyncFunc returns a simple function that can be used to create a new TableSync component with pre-configured config. The returned function just needs to be given an inputChan upon which the output of a MergeDiff is expected, i.e. rows to sync arrive on inputChan.

type CqnWithArgsConfig

type CqnWithArgsConfig struct {
	Log                  logger.Logger
	Name                 string                   // component instance name for better logging
	SrcCqnConnection     shared.OracleCqnExecutor // relloyd/go-oci8 -> CqnConn{} database connection for CQN source query
	SrcDBConnector       shared.Connector         // another connection matching the source CQN connection. Used for standard SQL queries; passed to the cqnHandler used by this component
	TgtDBConnector       shared.Connector         // target database connector
	SrcSqlQuery          string                   // SQL statement for the CQN to execute; we can force ORDER BY using the MergeDiffJoinKeys, which are expected to be the primary keys
	TgtSqlQuery          string                   // SQL statement to fetch target table data for comparison (do we need ORDER BY on this?)
	SrcRowIdKey          string                   // field names holding Oracle row ID values
	TgtRowIdKey          string                   // field names holding Oracle row ID values
	TgtSchema            string                   // provided to perform a TableSync // TODO: can we do this without needing duplicate info?
	TgtTable             string                   // provided to perform a TableSync
	TgtKeyCols           *om.OrderedMap           // primary key fields to sync - ordered map of: key = chan field name; value = target table column name
	TgtOtherCols         *om.OrderedMap           // other fields to sync - ordered map of: key = chan field name; value = target table column name
	MergeDiffJoinKeys    *om.OrderedMap           // primary keys to join src and tgt result sets for comparison (map entries should be of type map[source]=target)
	MergeDiffCompareKeys *om.OrderedMap           // keys upon which to compare old vs new (tgt vs src) rows (map entries should be of type map[source]=target)
	StepWatcher          *s.StepWatcher           // optional ptr to object that can gather step stats
	WaitCounter          ComponentWaiter
	PanicHandlerFn       PanicHandlerFunc
	DownstreamHandler    CqnDownstreamSyncer
}

Oracle Continuous Query Notification Input Step.

type CsvFileWriterConfig

type CsvFileWriterConfig struct {
	Log                               logger.Logger
	Name                              string
	InputChan                         chan stream.Record // the input channel of rows to write to an output CSV file.
	OutputDir                         string             // set to empty string to use a system generated sub directory in OS temp space.
	FileNamePrefix                    string
	FileNameSuffixAppendCreationStamp bool
	FileNameSuffixDateFormat          string
	FileNameExtension                 string
	UseGzip                           bool
	MaxFileRows                       int
	MaxFileBytes                      int
	HeaderFields                      []string // the slice of key names to be found in InputChan that will be used as the CSV header.
	OutputChanField4FilePath          string   // the field on outputChan that will contain the file name.
	StepWatcher                       *s.StepWatcher
	WaitCounter                       ComponentWaiter
	PanicHandlerFn                    PanicHandlerFunc
}

type DateRangeGeneratorConfig

type DateRangeGeneratorConfig struct {
	Log                         log.Logger
	Name                        string
	InputChan                   chan stream.Record // input channel containing time.Time
	InputChanFieldName4FromDate string             // name of the field on InputChan which contains the FromDate values expected to be of type time.Time.
	InputChanFieldName4ToDate   string             // name of the field on InputChan which contains the ToDate values expected to be of type time.Time. This takes precedence over use of field ToDateRFC3339orNow.
	ToDateRFC3339orNow          string             // either supply "now" or a date in RFC3339 format which includes a time zone offset. If used, "now" will be truncated to the nearest second.
	UseUTC                      bool               // if true then the date generated by supplying "now" will be in UTC; else we expect local times.
	IntervalSizeSeconds         int                // number of seconds to split the duration between FromDate and ToDate into.
	OutputChanFieldName4LowDate string
	OutputChanFieldName4HiDate  string
	PassInputFieldsToOutput     bool
	StepWatcher                 *stats.StepWatcher
	WaitCounter                 ComponentWaiter
	PanicHandlerFn              PanicHandlerFunc
}

type FieldMapperConfig

type FieldMapperConfig struct {
	Log            logger.Logger
	Name           string
	InputChan      chan stream.Record // input channel of records to map
	Steps          []ComponentStep
	StepWatcher    *stats.StepWatcher
	WaitCounter    ComponentWaiter
	PanicHandlerFn PanicHandlerFunc
}

type FilterMetadata

type FilterMetadata string

type FilterRowsConfig

type FilterRowsConfig struct {
	Log            log.Logger
	Name           string
	InputChan      chan stream.Record // input channel of records to filter
	FilterType     FilterType         // one of the keys in the filterTypes map.
	FilterMetadata FilterMetadata     // the field found in stream.StreamRecordIface data map to operate on.
	StepWatcher    *stats.StepWatcher
	WaitCounter    ComponentWaiter
	PanicHandlerFn PanicHandlerFunc
}

type FilterType

type FilterType string

type GenerateRowsConfig

type GenerateRowsConfig struct {
	Log                    logger.Logger
	Name                   string
	FieldName4Sequence     string // optional field name to hold 1-based sequence number on the outputChan.
	MapFieldNamesValuesCSV string // optional CSV string of fieldName:fieldValue tokens to use for row generation.
	NumRows                int    // number of rows to generate on outputChan.
	SleepIntervalSeconds   int    // number of seconds to sleep between emitting rows.
	StepWatcher            *stats.StepWatcher
	WaitCounter            ComponentWaiter
	PanicHandlerFn         PanicHandlerFunc
}

type ManifestWriterConfig

type ManifestWriterConfig struct {
	Log                                       logger.Logger
	Name                                      string
	InputChan                                 chan stream.Record // the input channel of rows containing file names to include in the manifest.
	InputChanField4FilePath                   string
	OutputDir                                 string // set to empty string to use a system generated sub directory in OS temp space.
	ManifestFileNamePrefix                    string
	ManifestFileNameSuffixAppendCreationStamp bool
	ManifestFileNameSuffixDateFormat          string // golang Time format to be appended to Prefix. If not supplied, the default value is constants.TimeFormatYearSeconds.
	ManifestFileNameExtension                 string
	OutputChanField4ManifestDir               string
	OutputChanField4ManifestName              string
	OutputChanField4ManifestFullPath          string
	StepWatcher                               *s.StepWatcher
	WaitCounter                               ComponentWaiter
	PanicHandlerFn                            PanicHandlerFunc
}

type MergeDiffConfig

type MergeDiffConfig struct {
	Log                 logger.Logger
	Name                string
	ChanOld             chan stream.Record
	ChanNew             chan stream.Record
	JoinKeys            *om.OrderedMap
	CompareKeys         *om.OrderedMap
	ResultFlagKeyName   string
	OutputIdenticalRows bool
	StepWatcher         *s.StepWatcher
	WaitCounter         ComponentWaiter
	PanicHandlerFn      PanicHandlerFunc
}

type MergeNChannelsConfig

type MergeNChannelsConfig struct {
	Log                 logger.Logger
	Name                string
	InputChannels       []chan stream.Record
	AllowFieldOverwrite bool
	StepWatcher         *stats.StepWatcher
	WaitCounter         ComponentWaiter
	PanicHandlerFn      PanicHandlerFunc
}

type MockComponentWaiter

type MockComponentWaiter struct {
	// contains filtered or unexported fields
}

func (*MockComponentWaiter) Add

func (cw *MockComponentWaiter) Add()

func (*MockComponentWaiter) Done

func (cw *MockComponentWaiter) Done()

type NumberRangeGeneratorConfig

type NumberRangeGeneratorConfig struct {
	Log                         log.Logger
	Name                        string
	InputChan                   chan stream.Record // input channel containing low and high numbers to split by IntervalSize
	InputChanFieldName4LowNum   string             // name of the field on InputChan which contains the Low value, expected to be of type int
	InputChanFieldName4HighNum  string             // name of the field on InputChan which contains the Hi value, expected to be of type int
	IntervalSize                float64            // number of units to split the difference between LowNum and HighNum into
	OutputLeftPaddedNumZeros    int
	OutputChanFieldName4LowNum  string
	OutputChanFieldName4HighNum string
	PassInputFieldsToOutput     bool
	StepWatcher                 *stats.StepWatcher
	WaitCounter                 ComponentWaiter
	PanicHandlerFn              PanicHandlerFunc
}

type PanicHandlerFunc

type PanicHandlerFunc func()
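
A PanicHandlerFunc is typically deferred at the top of a component goroutine; a plausible sketch (this package's actual handler may differ):

	package example

	import "log"

	// newPanicHandler returns a closure that logs and swallows any panic it
	// recovers; defer it at the top of a component goroutine.
	func newPanicHandler(name string) func() {
		return func() {
			if r := recover(); r != nil {
				log.Printf("component %s panicked: %v", name, r)
			}
		}
	}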

type S3BucketListerConfig

type S3BucketListerConfig struct {
	Log                               logger.Logger
	Name                              string
	Region                            string // AWS region for the bucket.
	BucketName                        string // AWS bucket name.
	BucketPrefix                      string // AWS bucket prefix.
	ObjectNamePrefix                  string // list files whose names begin with this string (this is not the AWS bucket prefix). It is given to the S3 list command and acts as a simple filter.
	ObjectNameRegexp                  string // used to further filter the list of files fetched using the ObjectNamePrefix.
	OutputField4FileName              string // the map key on outputChan that contains the file names found in the S3 bucket. If this is an empty string then default to value found in this package var, Defaults.
	OutputField4FileNameWithoutPrefix string
	OutputField4BucketName            string             // the map key on outputChan that contains the bucket name. If this is an empty string then default to value found in this package var, Defaults.
	OutputField4BucketPrefix          string             // the map key on outputChan that contains the bucket prefix. If this is an empty string then default to value found in this package var, Defaults.
	OutputField4BucketRegion          string             // the map key on outputChan that contains the bucket region. If this is an empty string then default to value found in this package var, Defaults.
	StepWatcher                       *stats.StepWatcher // supply a StepWatcher or nil.
	WaitCounter                       ComponentWaiter
	PanicHandlerFn                    PanicHandlerFunc
}

type S3ManifestReaderConfig

type S3ManifestReaderConfig struct {
	Log                          logger.Logger
	Name                         string
	InputChan                    chan stream.Record // the input channel of rows containing manifest file names to read.
	InputChanField4ManifestName  string             // path to manifest files (s3:// or file://)
	BucketName                   string             // bucket containing manifest files
	BucketPrefix                 string
	Region                       string
	OutputChanField4DataFileName string // outputChan field to produce file names onto
	StepWatcher                  *stats.StepWatcher
	WaitCounter                  ComponentWaiter
	PanicHandlerFn               PanicHandlerFunc
}

type SnowflakeLoaderConfig

type SnowflakeLoaderConfig struct {
	Log                     logger.Logger
	Name                    string
	InputChan               chan stream.Record
	Db                      shared.Connector        // connection to target snowflake database abstracted via interface.
	InputChanField4FileName string                  // the field name found on InputChan that contains the file name to load.
	StageName               string                  // the external stage that can access the files to load.
	TargetSchemaTableName   rdbms.SchemaTable       // the [schema.]table to load into.
	DeleteAll               bool                    // set to true to SQL DELETE all table rows before loading begins (set Use1Transaction = true for a safe reload of data).
	FnGetSnowflakeSqlSlice  SnowflakeSqlBuilderFunc // func that will be used by NewSnowflakeLoader to fetch a slice of SQL statements to execute per input row.
	CommitSequenceKeyName   string                  // the field name added by this component to the outputChan record, incremented when a batch is committed; used by downstream components - see also TableSync component.
	StepWatcher             *stats.StepWatcher
	WaitCounter             ComponentWaiter
	PanicHandlerFn          PanicHandlerFunc
}

type SnowflakeMergeConfig

type SnowflakeMergeConfig struct {
	Log                     logger.Logger
	Name                    string
	InputChan               chan stream.Record
	Db                      shared.Connector  // connection to target snowflake database abstracted via interface.
	InputChanField4FileName string            // the field name found on InputChan that contains the file name to load.
	StageName               string            // the external stage that can access the files to load.
	TargetSchemaTableName   rdbms.SchemaTable // the [schema.]table to load into.
	CommitSequenceKeyName   string            // the field name added by this component to the outputChan record, incremented when a batch is committed; used by downstream components - see also TableSync component.
	TargetKeyCols           *om.OrderedMap    // ordered map of: key = chan field name; value = target table column name
	TargetOtherCols         *om.OrderedMap    // ordered map of: key = chan field name; value = target table column name
	StepWatcher             *stats.StepWatcher
	WaitCounter             ComponentWaiter
	PanicHandlerFn          PanicHandlerFunc
}

type SnowflakeMergeSqlConfig

type SnowflakeMergeSqlConfig struct {
	TargetKeyCols   *om.OrderedMap // ordered map of: key = chan field name; value = target table column name
	TargetOtherCols *om.OrderedMap // ordered map of: key = chan field name; value = target table column name
}

type SnowflakeSqlBuilderFunc

type SnowflakeSqlBuilderFunc func(tableName rdbms.SchemaTable, stageName string, fileName string, force bool) []string

SnowflakeSqlBuilderFunc should return a slice of SQL statements for NewSnowflakeLoader to execute.

type SnowflakeSyncConfig

type SnowflakeSyncConfig struct {
	Log                     logger.Logger
	Name                    string
	InputChan               chan stream.Record
	Db                      shared.Connector  // connection to target snowflake database abstracted via interface.
	InputChanField4FileName string            // the field name found on InputChan that contains the file name to load.
	StageName               string            // the external stage that can access the files to load.
	TargetSchemaTableName   rdbms.SchemaTable // the [schema.]table to load into.
	CommitSequenceKeyName   string            // the field name added by this component to the outputChan record, incremented when a batch is committed; used by downstream components - see also TableSync component.
	TargetKeyCols           *om.OrderedMap    // ordered map of: key = chan field name; value = target table column name
	TargetOtherCols         *om.OrderedMap    // ordered map of: key = chan field name; value = target table column name
	FlagField               string
	StepWatcher             *stats.StepWatcher
	WaitCounter             ComponentWaiter
	PanicHandlerFn          PanicHandlerFunc
}

type SnowflakeSyncSqlConfig

type SnowflakeSyncSqlConfig struct {
	TargetKeyCols   *om.OrderedMap // ordered map of: key = chan field name; value = target table column name
	TargetOtherCols *om.OrderedMap // ordered map of: key = chan field name; value = target table column name
	FlagField       string
}

type SqlExecConfig

type SqlExecConfig struct {
	Log                      logger.Logger
	Name                     string
	InputChan                chan stream.Record
	SqlQueryFieldName        string
	SqlRowsAffectedFieldName string
	OutputDb                 shared.Connector
	StepWatcher              *s.StepWatcher
	WaitCounter              ComponentWaiter
	PanicHandlerFn           PanicHandlerFunc
}

type SqlQueryWithArgsConfig

type SqlQueryWithArgsConfig struct {
	Log            logger.Logger
	Name           string
	Db             shared.Connector
	StepWatcher    *s.StepWatcher // optional ptr to object that can gather step stats.
	WaitCounter    ComponentWaiter
	Sqltext        string
	Args           []interface{}
	PanicHandlerFn PanicHandlerFunc
}

type SqlQueryWithChanConfig

type SqlQueryWithChanConfig struct {
	Log             logger.Logger
	Name            string
	Db              shared.Connector
	StepWatcher     *s.StepWatcher // optional ptr to object that can gather step stats.
	WaitCounter     ComponentWaiter
	Sqltext         string
	InputChan       chan stream.Record // optional input channel from which values for bind variables are fetched. If omitted, then Sqltext must not use binds.
	InputChanFields []string           // list of field names in the input channel for which to use as bind variables.
	PanicHandlerFn  PanicHandlerFunc
}

type SqlQueryWithReplace

type SqlQueryWithReplace struct {
	Log            logger.Logger
	Name           string
	Db             shared.Connector
	StepWatcher    *s.StepWatcher // optional ptr to object that can gather step stats.
	WaitCounter    ComponentWaiter
	Sqltext        string
	Args           []interface{}
	Replacements   map[string]string
	PanicHandlerFn PanicHandlerFunc
}

type StdOutPassThroughConfig

type StdOutPassThroughConfig struct {
	Log             logger.Logger
	Name            string
	InputChan       chan stream.Record
	Writer          io.Writer // the writer to output records to, usually STDOUT
	OutputFields    []string  // the list of fields to write to the Writer (leave empty for all fields)
	AbortAfterCount int64
	StepWatcher     *stats.StepWatcher
	WaitCounter     ComponentWaiter
	PanicHandlerFn  PanicHandlerFunc
}

StdOutPassThroughConfig is the config for the NewStdOutPassThrough component.

type TableMergeConfig

type TableMergeConfig struct {
	Log                                logger.Logger // TODO: do we need to find a way to stub this out?
	Name                               string
	InputChan                          chan stream.Record // input rows to write to database table.
	OutputDb                           shared.Connector   // target database connection for writes.
	ExecBatchSize                      int                // commit interval in num rows
	CommitBatchSize                    int
	shared.SqlStatementGeneratorConfig // config for target database table
	StepWatcher                        *s.StepWatcher
	WaitCounter                        ComponentWaiter
	PanicHandlerFn                     PanicHandlerFunc
}

type TableSyncConfig

type TableSyncConfig struct {
	Log             logger.Logger
	Name            string
	InputChan       chan stream.Record // input rows to write to database table.
	OutputDb        shared.Connector   // target database connection for writes.
	CommitBatchSize int                // commit interval in num rows
	TxtBatchNumRows int                // number of rows in a single SQL statement.
	// outputRowsAfterCommit bool                 // FEATURE NOT USED YET - this component will forward rows to its outputChan as they are processed (False) or after each transaction is committed (True). The latter means that extra memory is used to buffer rows the amount of which matches the batch size before they are released downstream.
	FlagKeyName                        string // name of the key in channel inputChan that contains values "N", "C", "D" (see constants for actual values) that can be used to distinguish, "new", "changed" and "deleted" rows, which resolve to database INSERTs/UPDATEs/DELETEs respectively.
	CommitSequenceKeyName              string // the field name added by this component to the outputChan record, incremented when a batch is committed.
	shared.SqlStatementGeneratorConfig        // config for target database table
	StepWatcher                        *s.StepWatcher
	WaitCounter                        ComponentWaiter
	PanicHandlerFn                     PanicHandlerFunc
}
