Documentation ¶
Index ¶
- Variables
- func GetSqlSliceSnowflakeCopyInto(schemaTableName rdbms.SchemaTable, stageName string, fileName string, ...) []string
- func GetSqlSliceSnowflakeMerge(cfg *SnowflakeMergeSqlConfig) ...
- func GetSqlSliceSnowflakeSyncTable(cfg *SnowflakeSyncSqlConfig) ...
- func NewCartNIterator(sizes []int) *cartNIterator
- func NewChannelBridge(i interface{}) (inputChan chan chan stream.Record, outputChan chan stream.Record)
- func NewChannelCombiner(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewCopyFilesToS3(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewCqnWithArgs(i interface{}) (mainOutputChan chan stream.Record, mainControlChan chan ControlAction)
- func NewCsvFileWriter(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewDateRangeGenerator(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewFieldMapper(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewFilterRows(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewGenerateRows(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewManifestWriter(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewMergeDiff(i interface{}) (chan stream.Record, chan ControlAction)
- func NewMergeNChannels(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewNumberRangeGenerator(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewS3BucketList(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewS3ManifestReader(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewSnowflakeLoader(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewSnowflakeMerge(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewSnowflakeSync(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewSqlExec(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewSqlQueryWithArgs(i interface{}) (chan stream.Record, chan ControlAction)
- func NewSqlQueryWithInputChan(i interface{}) (chan stream.Record, chan ControlAction)
- func NewSqlQueryWithReplace(i interface{}) (chan stream.Record, chan ControlAction)
- func NewStdOutPassThrough(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewTableMerge(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- func NewTableSync(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
- type Action
- type ChannelBridgeConfig
- type ChannelCombinerConfig
- type ColumnType
- type ComponentStep
- type ComponentWaiter
- type ControlAction
- type CopyFilesToS3Config
- type CqnDownstreamSnowflakeSync
- type CqnDownstreamSyncer
- type CqnDownstreamTableSync
- type CqnWithArgsConfig
- type CsvFileWriterConfig
- type DateRangeGeneratorConfig
- type FieldMapperConfig
- type FilterMetadata
- type FilterRowsConfig
- type FilterType
- type GenerateRowsConfig
- type ManifestWriterConfig
- type MergeDiffConfig
- type MergeNChannelsConfig
- type MockComponentWaiter
- type NumberRangeGeneratorConfig
- type PanicHandlerFunc
- type S3BucketListerConfig
- type S3ManifestReaderConfig
- type SnowflakeLoaderConfig
- type SnowflakeMergeConfig
- type SnowflakeMergeSqlConfig
- type SnowflakeSqlBuilderFunc
- type SnowflakeSyncConfig
- type SnowflakeSyncSqlConfig
- type SqlExecConfig
- type SqlQueryWithArgsConfig
- type SqlQueryWithChanConfig
- type SqlQueryWithReplace
- type StdOutPassThroughConfig
- type TableMergeConfig
- type TableSyncConfig
Constants ¶
This section is empty.
Variables ¶
var Defaults = struct {
	ChanField4CSVFileName           string
	ChanField4FileName              string // the default map key that contains the file names found in the S3 bucket, used by input and output Channels.
	ChanField4FileNameWithoutPrefix string // the default map key that contains the file names found in the S3 bucket, used by input and output Channels.
	ChanField4BucketName            string // the default map key that contains the bucket name, used by input and output Channels.
	ChanField4BucketPrefix          string // the default map key that contains the bucket prefix, used by input and output Channels.
	ChanField4BucketRegion          string // the default map key that contains the bucket region, used by input and output Channels.
	ChanField4StageName             string // the default map key that contains the Snowflake stage name, used by input and output Channels.
	ChanField4TableName             string // the default map key that contains the Snowflake table name, used by input and output Channels.
}{
	ChanField4CSVFileName:           "#CSVFileName",
	ChanField4FileName:              "#DataFileName",
	ChanField4FileNameWithoutPrefix: "#DataFileNameWithoutPrefix",
	ChanField4BucketName:            "#BucketName",
	ChanField4BucketPrefix:          "#BucketPrefix",
	ChanField4BucketRegion:          "#BucketRegion",
	ChanField4StageName:             "#SnowflakeStageName",
	ChanField4TableName:             "#SnowflakeTargetTableName",
}
Default field names are used by components to know the names of input and output fields.
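A minimal sketch (assuming this sits inside the package, and that the constructor accepts a *S3BucketListerConfig, since its parameter is interface{}) of wiring the defaults into an S3BucketListerConfig so that downstream steps agree on the map keys; bucket details are placeholders:

func listLandingFiles() (chan stream.Record, chan ControlAction) {
	lister := S3BucketListerConfig{
		Name:                   "list-landing-files",
		Region:                 "eu-west-1",         // placeholder
		BucketName:             "my-landing-bucket", // placeholder
		OutputField4FileName:   Defaults.ChanField4FileName,   // "#DataFileName"
		OutputField4BucketName: Defaults.ChanField4BucketName, // "#BucketName"
	}
	return NewS3BucketList(&lister)
}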
Functions ¶
func GetSqlSliceSnowflakeCopyInto ¶
func GetSqlSliceSnowflakeCopyInto(schemaTableName rdbms.SchemaTable, stageName string, fileName string, force bool) []string
GetSqlSliceSnowflakeCopyInto generates SQL to copy data from the supplied Snowflake STAGE/fileName into the given tableName.
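A sketch of inspecting the generated statements (same-package code, needing the fmt import; the SchemaTable value is passed in rather than constructed, as its API is not shown here):

func printCopyIntoSql(tbl rdbms.SchemaTable) {
	// One staged file, no FORCE option.
	for _, s := range GetSqlSliceSnowflakeCopyInto(tbl, "my_stage", "data_0001.csv.gz", false) {
		fmt.Println(s) // each element is a single SQL statement
	}
}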
func GetSqlSliceSnowflakeMerge ¶
func GetSqlSliceSnowflakeMerge(cfg *SnowflakeMergeSqlConfig) func(schemaTableName rdbms.SchemaTable, stageName string, fileName string, force bool) []string
GetSqlSliceSnowflakeMerge will return a slice of SQL statements required to merge data from a given fileName into the target table.
func GetSqlSliceSnowflakeSyncTable ¶
func GetSqlSliceSnowflakeSyncTable(cfg *SnowflakeSyncSqlConfig) func(schemaTableName rdbms.SchemaTable, stageName string, fileName string, force bool) []string
GetSqlSliceSnowflakeSyncTable will return a slice of SQL statements required to sync data from a given fileName into the target table.
func NewCartNIterator ¶
func NewCartNIterator(sizes []int) *cartNIterator
func NewChannelBridge ¶
func NewChannelBridge(i interface{}) (inputChan chan chan stream.Record, outputChan chan stream.Record)
NewChannelBridge returns an input channel upon which it waits to be given a chan StreamRecordIface to read from. Once it receives a channel, it will forward the rows onto the output channel. After all rows are passed on, it waits for a new input channel again. This allows us to keep outputChan open while our input changes over time. For example: we have a transform that supplies data to us, and we want to take action when the first input chan is closed. We can keep outputChan open and wait for a new channel to be given to us for more of the same work. The use case is: writing a manifest file once the first input chan has closed, but still remaining open to write another manifest file once the next input chan is closed. Close inputChan to get this goroutine to end.
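A usage sketch, assuming the constructor accepts a *ChannelBridgeConfig (its parameter is interface{}) and that most config fields can be omitted:

func demoBridge() {
	inputChan, outputChan := NewChannelBridge(&ChannelBridgeConfig{Name: "bridge"})

	go func() {
		for rec := range outputChan { // outputChan stays open across source changeovers
			_ = rec // e.g. collect file names to write a manifest per source
		}
	}()

	first := make(chan stream.Record) // first upstream source
	inputChan <- first
	close(first) // the bridge now waits for the next source

	second := make(chan stream.Record) // a later source; outputChan is still open
	inputChan <- second
	close(second)

	close(inputChan) // per the docs above, this ends the bridge goroutine
}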
func NewChannelCombiner ¶
func NewChannelCombiner(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
NewChannelCombiner will accept 2 input channels and collect all rows onto the outputChan. TODO: consider using reflect.SelectCase to allow N channels as input (it would be slower).
func NewCopyFilesToS3 ¶
func NewCopyFilesToS3(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
NewCopyFilesToS3 copies OS files to S3. It passes InputChan rows to outputChan. It does not currently add details of the S3 bucket to the output row.
func NewCqnWithArgs ¶
func NewCqnWithArgs(i interface{}) (mainOutputChan chan stream.Record, mainControlChan chan ControlAction)
NewCqnWithArgs registers a Continuous Query Notification with the Oracle database connection provided. The SrcSqlQuery query is executed by the CQN and the results are compared to the data found in the target table using SQL statement TgtSqlQuery and the target database connection TgtDBConnector. Then the following process is followed:
1) The target table is synchronised, and the changes to the target table are output by this step's mainOutputChan. While the results are being synchronised to the target, all CQN row change events are buffered.
2) Once the initial source data set has been synced to the target, the buffered rows will be processed by a similar comparison and the target kept up to date.
If a CQN change event shows that the full source table or query has been invalidated, then all rows are compared between source and target as per (1) above, before the process continues at step (2) above.
func NewCsvFileWriter ¶
func NewCsvFileWriter(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
NewCsvFileWriter will dump cfg.InputChan to a CSV with spec defined in cfg. The CSV header must be specified for this func to pull out the map keys from the input chan in the correct order. outputChan contains the CSV file names produced (it does not pass input records to the output yet).
func NewDateRangeGenerator ¶
func NewDateRangeGenerator(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
NewDateRangeGenerator will:
- Read the input chan to get the FromDate(s).
- Figure out the ToDate (it could be "now").
- Calculate the number of intervals between FromDate and ToDate using the interval size in seconds.
- Output N rows of type date.
- Ensure daylight savings are accounted for.
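A config sketch, assuming the constructor accepts a *DateRangeGeneratorConfig; the field values are illustrative:

func hourlyRanges(inputChan chan stream.Record) (chan stream.Record, chan ControlAction) {
	return NewDateRangeGenerator(&DateRangeGeneratorConfig{
		Name:                        "hourly-ranges",
		InputChan:                   inputChan, // must carry time.Time FromDate values
		InputChanFieldName4FromDate: "fromDate",
		ToDateRFC3339orNow:          "now",
		UseUTC:                      true,
		IntervalSizeSeconds:         3600, // one output row per hour of the range
		OutputChanFieldName4LowDate: "lowDate",
		OutputChanFieldName4HiDate:  "hiDate",
	})
}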
func NewFieldMapper ¶
func NewFieldMapper(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
NewFieldMapper uses FieldMapperConfig to map fields in records read from InputChan. Supply a slice of map step actions in cfg.Steps, where: Steps.Type is one of the entries in mapFieldMappers to lookup a map function. Steps.Data is a map of further config values to supply to the chosen map function.
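A sketch of the Steps config. The Type value must be a key in the package's mapFieldMappers, so "regexpReplace" and the Data keys below are hypothetical placeholders; check the package source for the real names:

func renameFields(inputChan chan stream.Record) (chan stream.Record, chan ControlAction) {
	return NewFieldMapper(&FieldMapperConfig{
		Name:      "clean-city-names",
		InputChan: inputChan,
		Steps: []ComponentStep{
			{
				Type: "regexpReplace", // hypothetical mapper name
				Data: map[string]string{ // hypothetical config keys
					"fieldName":   "city",
					"match":       "-",
					"replacement": "_",
				},
			},
		},
	})
}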
func NewFilterRows ¶
func NewFilterRows(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
NewFilterRows accepts a FilterRowsConfig{} and outputs rows if they match the given filter.
func NewGenerateRows ¶
func NewGenerateRows(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
func NewManifestWriter ¶
func NewManifestWriter(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
NewManifestWriter is expected to be used after CSV file generation. It expects one or more file names on the input channel field specified by InputChanField4FilePath. It writes a single manifest (CSV txt) file to the output directory specified containing each of the input file names. It produces a single record on outputChan with fields outputDir and manifestFileName. The manifest is only written and filename sent on the output channel once the input channel for this step is closed.
func NewMergeDiff ¶
func NewMergeDiff(i interface{}) (chan stream.Record, chan ControlAction)
Produce an output channel (chanOutput) of records based on the map[string]interface{} data found in chanOld and chanNew. A new column is added to chanOutput (with the key name specified in resultsFlagKeyName) per record to show the merge-diff results as follows, where the value on the new column is one of:
- N == new record found on chanNew that is not on chanOld (chanOutput contains the row from the chanNew rowset so you can do a database INSERT)
- C == changes found to the record on chanOld compared to chanNew (chanOutput contains the row from the chanNew rowset so you can do a database UPDATE)
- D == record not found on chanNew (chanOutput contains the row from the chanOld rowset so you have the key required to perform a database DELETE)
- I == records are identical for compareKeyMap columns (chanOutput contains the row from the chanNew rowset)
NOTE that input channel records MUST be pre-sorted by the key fields for this to work! NOTE that the output channel (chanOutput) is closed by this function when it is done.
Here's how it works:
To perform the comparison, two input ordered_maps are required:
- joinKeys specifies the keys in chanOld & chanNew that should be compared in order to join the records from chanOld and chanNew. Key names are case-sensitive!
- compareKeys specifies the columns in chanOld & chanNew that are used for data comparison once the join keys match.
It is expected that each comparable column is of the same type, where supported types are: int and string for now.
Ordered maps are used as the comparison process exits early when inequality is found, so this can be used to speed up comparison if the user knows which keys to prioritise.
This function can be used to determine what action should be taken to sync a target database table (assume target table data is on chanOld) with the contents from a source table (assume source table data on chanNew).
Here's how to use this step output:
1) fill chanOld with all rows from a target table (the table to be compared/updated)
2) fill chanNew with data from a source table (reference data)
3) feed chanOutput to a database table sync step where:
- N rows are INSERTed into the target table (chanOutput contains the row from the chanNew rowset)
- C rows are UPDATEd in the target table
- D rows are DELETEd from the target table
- I rows are identical for fields in compareKeyMap and these can be ignored
TODO: add flag to disable output of rows that are found to be unchanged i.e. resultValueIdentical. TODO: handle multiple mergeDiffResult column names by appending _1 etc. TODO: add handling of nested maps
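A consumer sketch that routes records by the merge-diff flag values listed above; the record accessor and the flag key name are hypothetical, and the key must match ResultFlagKeyName:

func routeMergeDiff(chanOutput chan stream.Record) {
	for rec := range chanOutput {
		switch rec.GetData("mergeDiffResult") { // hypothetical accessor and key name
		case "N": // INSERT into the target table
		case "C": // UPDATE the target table
		case "D": // DELETE from the target table
		case "I": // identical; usually ignored
		}
	}
}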
func NewMergeNChannels ¶
func NewMergeNChannels(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
NewMergeNChannels will consume all records from InputChan2 into memory and then add the field values from those records to all records arriving on InputChan1, producing a cartesian product. Each merged combination is sent to outputChan.
func NewNumberRangeGenerator ¶
func NewNumberRangeGenerator(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
NewNumberRangeGenerator will:
- Read the input chan to get the LowNum and HighNum, per input row.
- Calculate the number of intervals between LowNum and HighNum using the interval size.
- Output N rows with low and high values of type int.
func NewS3BucketList ¶
func NewS3BucketList(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
NewS3BucketList fetches the list of objects from the given S3 bucket and produces records onto the output channel, where each record on the channel has:
- map key name = ChanField4FileName (or the default mentioned above)
- map value = the file name found in S3
TODO: add a test for filtering by filename prefix (not bucket prefix)
func NewS3ManifestReader ¶
func NewS3ManifestReader(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
NewS3ManifestReader will open manifest files expected to be found on the S3 bucket specified and output the contents of the manifest files to outputChan.
func NewSnowflakeLoader ¶
func NewSnowflakeLoader(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
NewSnowflakeLoader reads the input channel of records expecting it to contain the following:
1) the data file name on S3 that exists via...
2) the Snowflake "stage" set up on a known S3 bucket which contains the above data files
3) the table name to load data into.
This component generates and executes COPY INTO SQL statements. If Use1Transaction is true, AUTOCOMMIT will be on; else it will turn AUTOCOMMIT OFF and commit once InputChan is closed. InputChan rows are copied to the outputChan.
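A config sketch; values are placeholders, db is an opened shared.Connector, and the table is passed in. Note that GetSqlSliceSnowflakeCopyInto matches the SnowflakeSqlBuilderFunc signature, so it can be supplied directly:

func loadSales(inputChan chan stream.Record, db shared.Connector, tgt rdbms.SchemaTable) (chan stream.Record, chan ControlAction) {
	return NewSnowflakeLoader(&SnowflakeLoaderConfig{
		Name:                    "load-sales",
		InputChan:               inputChan,
		Db:                      db,
		InputChanField4FileName: Defaults.ChanField4FileName,
		StageName:               "my_stage", // placeholder
		TargetSchemaTableName:   tgt,
		FnGetSnowflakeSqlSlice:  GetSqlSliceSnowflakeCopyInto, // matches SnowflakeSqlBuilderFunc
		CommitSequenceKeyName:   "#CommitSequence", // placeholder key name
	})
}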
func NewSnowflakeMerge ¶
func NewSnowflakeMerge(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
func NewSnowflakeSync ¶
func NewSnowflakeSync(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
func NewSqlExec ¶
func NewSqlExec(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
func NewSqlQueryWithArgs ¶
func NewSqlQueryWithArgs(i interface{}) (chan stream.Record, chan ControlAction)
Execute SQL and fetch rows onto the output channel. Use this when you have args to pass to the SQL directly. Args can be nil if you don't want to use bind variables. An alternative func is available to read args from another channel populated by its own SQL query. TODO: swap input cfg to be of type pointer to SqlQueryWithArgsConfig.
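A usage sketch, assuming the constructor accepts a *SqlQueryWithArgsConfig; the bind placeholder syntax is driver-specific, so :1 below is illustrative:

func fetchOrders(db shared.Connector) (chan stream.Record, chan ControlAction) {
	return NewSqlQueryWithArgs(&SqlQueryWithArgsConfig{
		Name:    "fetch-orders",
		Db:      db,
		Sqltext: "select order_id, status from orders where created_date > :1",
		Args:    []interface{}{"2020-01-01"},
	})
}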
func NewSqlQueryWithInputChan ¶
func NewSqlQueryWithInputChan(i interface{}) (chan stream.Record, chan ControlAction)
Execute SQL and fetch rows onto the output channel. SQL text is expected to use bind variables whose values are fetched from the input channel. This builds an args list from the input channel record. Only one record is expected on the input channel. A simple enhancement would be required to handle multiple input channel records, i.e. to cause multiple SQL executions.
func NewSqlQueryWithReplace ¶
func NewSqlQueryWithReplace(i interface{}) (chan stream.Record, chan ControlAction)
NewSqlQueryWithReplace will execute SQL with args, but replace strings within the supplied SQL first.
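A sketch; the ${...} token format is an assumption here, so check the component for the real replacement convention:

func fetchPerSchema(db shared.Connector) (chan stream.Record, chan ControlAction) {
	return NewSqlQueryWithReplace(&SqlQueryWithReplace{
		Name:    "fetch-per-schema",
		Db:      db,
		Sqltext: "select * from ${schema}.${table}", // tokens replaced before execution
		Replacements: map[string]string{
			"${schema}": "sales",
			"${table}":  "orders",
		},
	})
}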
func NewStdOutPassThrough ¶
func NewStdOutPassThrough(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
NewStdOutPassThrough prints input records found on the InputChan to STDOUT and passes them on to the output channel outputChan. OutputFields may either be empty to write all fields found on the input stream, or supply a slice of field names, which must exist on the input stream. Optionally use AbortAfterCount to cause a panic after the supplied number of records has been sent. Supply an io.Writer for the records to be output to (the default launcher func uses STDOUT).
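A debugging-tap sketch (same-package code, needing the os import; os.Stdout satisfies io.Writer):

func debugTap(inputChan chan stream.Record) (chan stream.Record, chan ControlAction) {
	return NewStdOutPassThrough(&StdOutPassThroughConfig{
		Name:         "debug-tap",
		InputChan:    inputChan,
		Writer:       os.Stdout,
		OutputFields: []string{"order_id", "status"}, // must exist on the input stream
	})
}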
func NewTableMerge ¶
func NewTableMerge(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
NewTableMerge will apply the output of a prior MergeDiff step to a target database table using SQL MERGE statements (instead of choosing the appropriate INSERT, UPDATE or DELETE).
func NewTableSync ¶
func NewTableSync(i interface{}) (outputChan chan stream.Record, controlChan chan ControlAction)
NewTableSync can be used to apply the output of a MergeDiff step to a target database table. Records are INSERTed, UPDATEd or DELETEd accordingly, based on the flag field, FlagKeyName. The TableSync component adds a zero-based integer field to the output stream that increments per commit. This helps consumers because the component releases rows as they are processed instead of after each commit, though it moves the question of whether a batch has been committed downstream.
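A downstream sketch for detecting commit boundaries via that per-commit field; the record accessor is hypothetical, and the key name must match CommitSequenceKeyName:

func watchCommits(outputChan chan stream.Record) {
	var lastSeq interface{}
	for rec := range outputChan {
		seq := rec.GetData("#CommitSequence") // hypothetical accessor; placeholder key
		if lastSeq != nil && seq != lastSeq {
			// the batch carrying lastSeq has now been committed
		}
		lastSeq = seq
	}
}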
Types ¶
type ChannelBridgeConfig ¶
type ChannelBridgeConfig struct {
Log logger.Logger
Name string
StepWatcher *stats.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type ChannelCombinerConfig ¶
type ChannelCombinerConfig struct {
Log logger.Logger
Name string
Chan1 chan stream.Record
Chan2 chan stream.Record
StepWatcher *s.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type ColumnType ¶
type ColumnType struct {
// contains filtered or unexported fields
}
ColumnType contains the name and type of a column.
type ComponentStep ¶
type ComponentStep struct {
Type string `json:"type" errorTxt:"step type" mandatory:"yes"`
Data map[string]string `json:"data" errorTxt:"step data" mandatory:"yes"`
}
ComponentStep is a generic holder for FieldMapper config. TODO: can we use Type specific structs instead with some clever un-marshalling?
type ComponentWaiter ¶
type ComponentWaiter interface {
Add()
Done()
}
ComponentWaiter is a simple interface for use around a wait group.
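A minimal sketch (needing the sync import) that satisfies ComponentWaiter with a sync.WaitGroup, so a pipeline can block until every component has called Done:

type wgWaiter struct{ wg *sync.WaitGroup }

func (w *wgWaiter) Add()  { w.wg.Add(1) }
func (w *wgWaiter) Done() { w.wg.Done() }

Any of the configs in this package could then be given WaitCounter: &wgWaiter{wg: &wg}, with wg.Wait() called once all components are launched.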
type ControlAction ¶
type ControlAction struct {
Action Action
ResponseChan chan error // channel to send a response channel on.
}
ControlAction is used to communicate with components.
type CopyFilesToS3Config ¶
type CopyFilesToS3Config struct {
Log logger.Logger
Name string
InputChan chan stream.Record // the input channel of rows containing files (with full paths) to copy/move to S3.
FileNameChanField string // name of the field in InputChan that contains the files to move.
BucketName string // target bucket
BucketPrefix string
Region string
RemoveInputFiles bool // true to delete the input files after successful copy to s3.
StepWatcher *stats.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type CqnDownstreamSnowflakeSync ¶
type CqnDownstreamSnowflakeSync struct {
// Snowflake specific downstream components:
// For CsvFileWriterConfig
CsvFileNamePrefix string
CsvHeaderFields []string // the slice of key names to be found in InputChan that will be used as the CSV header.
CsvMaxFileRows int
CsvMaxFileBytes int
// For CopyFilesToS3Config
BucketPrefix string
BucketName string
Region string
// For SnowflakeLoaderConfig
StageName string
}
CqnDownstreamSnowflakeSync implements CqnDownstreamSyncer.
func (*CqnDownstreamSnowflakeSync) GetDownstreamSyncFunc ¶
func (s *CqnDownstreamSnowflakeSync) GetDownstreamSyncFunc(cfg *CqnWithArgsConfig) func(ctx context.Context, inputChan chan stream.Record, tgtSqlQuery string) (outputChan chan stream.Record, controlChan chan ControlAction)
GetDownstreamSyncFunc returns a function that can be used to create all components required to start a Snowflake table sync process. The returned function accepts an inputChan that contains records to be synced to Snowflake. It also accepts a context that must be cancelled once outputChan is complete. The reason for this is that this function creates multiple components and wraps their individual control channels in the single controlChan that is returned. Once the components have ended, there will be nothing to respond on the control channels. This function launches a goroutine to wait for any shutdown messages and propagate them to the components, and we need to be able to tell that goroutine to exit. TODO: add tests for CqnDownstreamSnowflakeSync -> GetDownstreamSyncFunc().
type CqnDownstreamSyncer ¶
type CqnDownstreamSyncer interface {
GetDownstreamSyncFunc(cfg *CqnWithArgsConfig) func(ctx context.Context, inputChan chan stream.Record, tgtSqlQuery string) (outputChan chan stream.Record, controlChan chan ControlAction)
}
type CqnDownstreamTableSync ¶
type CqnDownstreamTableSync struct {
BatchSize int // commit interval for internal TableSync components
}
CqnDownstreamTableSync implements CqnDownstreamSyncer.
func (*CqnDownstreamTableSync) GetDownstreamSyncFunc ¶
func (s *CqnDownstreamTableSync) GetDownstreamSyncFunc(cfg *CqnWithArgsConfig) func(ctx context.Context, inputChan chan stream.Record, tgtSqlQuery string) (outputChan chan stream.Record, controlChan chan ControlAction)
GetDownstreamSyncFunc returns a simple function that can be used to create a new TableSync component with pre-configured config. When called, the returned function just needs to be given an inputChan upon which the output of a MergeDiff is expected, i.e. rows to sync arrive on inputChan.
type CqnWithArgsConfig ¶
type CqnWithArgsConfig struct {
Log logger.Logger
Name string // component instance name for better logging
SrcCqnConnection shared.OracleCqnExecutor // relloyd/go-oci8 -> CqnConn{} database connection for CQN source query
SrcDBConnector shared.Connector // another connection matching the source CQN connection. Used for standard SQL queries; passed to the cqnHandler used by this component
TgtDBConnector shared.Connector // target database connector
SrcSqlQuery string // SQL statement to execute CQN // we can force ORDER BY using the MergeDiffJoinKeys which are expected to be the primary keys
TgtSqlQuery string // SQL statement to fetch target table data for comparison (do we need ORDER BY on this?)
SrcRowIdKey string // field names holding Oracle row ID values
TgtRowIdKey string // field names holding Oracle row ID values
TgtSchema string // provided to perform a TableSync // TODO: can we do this without needing duplicate info?
TgtTable string // provided to perform a TableSync
TgtKeyCols *om.OrderedMap // primary key fields to sync - ordered map of: key = chan field name; value = target table column name
TgtOtherCols *om.OrderedMap // other fields to sync - ordered map of: key = chan field name; value = target table column name
MergeDiffJoinKeys *om.OrderedMap // primary keys to join src and tgt result sets for comparison (map entries should be of type map[source]=target)
MergeDiffCompareKeys *om.OrderedMap // keys upon which to compare old vs new (tgt vs src) rows (map entries should be of type map[source]=target)
StepWatcher *s.StepWatcher // optional ptr to object that can gather step stats
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
DownstreamHandler CqnDownstreamSyncer
}
Oracle Continuous Query Notification Input Step.
type CsvFileWriterConfig ¶
type CsvFileWriterConfig struct {
Log logger.Logger
Name string
InputChan chan stream.Record // the input channel of rows to write to an output CSV file.
OutputDir string // set to empty string to use a system generated sub directory in OS temp space.
FileNamePrefix string
FileNameSuffixAppendCreationStamp bool
FileNameSuffixDateFormat string
FileNameExtension string
UseGzip bool
MaxFileRows int
MaxFileBytes int
HeaderFields []string // the slice of key names to be found in InputChan that will be used as the CSV header.
OutputChanField4FilePath string // the field on outputChan that will contain the file name.
StepWatcher *s.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type DateRangeGeneratorConfig ¶
type DateRangeGeneratorConfig struct {
Log log.Logger
Name string
InputChan chan stream.Record // input channel containing time.Time
InputChanFieldName4FromDate string // name of the field on InputChan which contains the FromDate values expected to be of type time.Time.
InputChanFieldName4ToDate string // name of the field on InputChan which contains the ToDate values expected to be of type time.Time. This takes precedence over use of field ToDateRFC3339orNow.
ToDateRFC3339orNow string // either supply "now" or a date in RFC3339 format which includes a time zone offset. If used, "now" will be truncated to the nearest second.
UseUTC bool // if true then the date generated by supplying "now" will be in UTC; else we expect local times.
IntervalSizeSeconds int // number of seconds to split the duration between FromDate and ToDate into.
OutputChanFieldName4LowDate string
OutputChanFieldName4HiDate string
PassInputFieldsToOutput bool
StepWatcher *stats.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type FieldMapperConfig ¶
type FieldMapperConfig struct {
Log logger.Logger
Name string
InputChan chan stream.Record // input channel containing time.Time
Steps []ComponentStep
StepWatcher *stats.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type FilterMetadata ¶
type FilterMetadata string
type FilterRowsConfig ¶
type FilterRowsConfig struct {
Log log.Logger
Name string
InputChan chan stream.Record // input channel containing time.Time
FilterType FilterType // one of the keys in the filterTypes map.
FilterMetadata FilterMetadata // the field found in stream.StreamRecordIface data map to operate on.
StepWatcher *stats.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type FilterType ¶
type FilterType string
type GenerateRowsConfig ¶
type GenerateRowsConfig struct {
Log logger.Logger
Name string
FieldName4Sequence string // optional field name to hold 1-based sequence number on the outputChan.
MapFieldNamesValuesCSV string // optional CSV string of fieldName:fieldValue tokens to use for row generation.
NumRows int // number of rows to generate on outputChan.
SleepIntervalSeconds int // number of seconds to sleep between emitting rows.
StepWatcher *stats.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type ManifestWriterConfig ¶
type ManifestWriterConfig struct {
Log logger.Logger
Name string
InputChan chan stream.Record // the input channel of rows to write to an output CSV file.
InputChanField4FilePath string
OutputDir string // set to empty string to use a system generated sub directory in OS temp space.
ManifestFileNamePrefix string
ManifestFileNameSuffixAppendCreationStamp bool
ManifestFileNameSuffixDateFormat string // golang Time format to be appended to Prefix. If not supplied, the default value is constants.TimeFormatYearSeconds.
ManifestFileNameExtension string
OutputChanField4ManifestDir string
OutputChanField4ManifestName string
OutputChanField4ManifestFullPath string
StepWatcher *s.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type MergeDiffConfig ¶
type MergeDiffConfig struct {
Log logger.Logger
Name string
ChanOld chan stream.Record
ChanNew chan stream.Record
JoinKeys *om.OrderedMap
CompareKeys *om.OrderedMap
ResultFlagKeyName string
OutputIdenticalRows bool
StepWatcher *s.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type MergeNChannelsConfig ¶
type MergeNChannelsConfig struct {
Log logger.Logger
Name string
InputChannels []chan stream.Record
AllowFieldOverwrite bool
StepWatcher *stats.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type MockComponentWaiter ¶
type MockComponentWaiter struct {
// contains filtered or unexported fields
}
func (*MockComponentWaiter) Add ¶
func (cw *MockComponentWaiter) Add()
func (*MockComponentWaiter) Done ¶
func (cw *MockComponentWaiter) Done()
type NumberRangeGeneratorConfig ¶
type NumberRangeGeneratorConfig struct {
Log log.Logger
Name string
InputChan chan stream.Record // input channel containing low and high numbers to split by IntervalSize
InputChanFieldName4LowNum string // name of the field on InputChan which contains the Low value, expected to be of type int
InputChanFieldName4HighNum string // name of the field on InputChan which contains the Hi value, expected to be of type int
IntervalSize float64 // number of units to split the difference between LowNum and HighNum into
OutputLeftPaddedNumZeros int
OutputChanFieldName4LowNum string
OutputChanFieldName4HighNum string
PassInputFieldsToOutput bool
StepWatcher *stats.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type PanicHandlerFunc ¶
type PanicHandlerFunc func()
type S3BucketListerConfig ¶
type S3BucketListerConfig struct {
Log logger.Logger
Name string
Region string // AWS region for the bucket.
BucketName string // AWS bucket name.
BucketPrefix string // AWS bucket prefix.
ObjectNamePrefix string // list files where the beginning of their names matches this string (this is not the AWS bucket prefix). This is given to S3 list command and a dumb filter.
ObjectNameRegexp string // used to further filter the list of files fetched using the ObjectNamePrefix.
OutputField4FileName string // the map key on outputChan that contains the file names found in the S3 bucket. If this is an empty string then default to value found in this package var, Defaults.
OutputField4FileNameWithoutPrefix string
OutputField4BucketName string // the map key on outputChan that contains the bucket name. If this is an empty string then default to value found in this package var, Defaults.
OutputField4BucketPrefix string // the map key on outputChan that contains the bucket prefix. If this is an empty string then default to value found in this package var, Defaults.
OutputField4BucketRegion string // the map key on outputChan that contains the bucket region. If this is an empty string then default to value found in this package var, Defaults.
StepWatcher *stats.StepWatcher // supply a StepWatcher or nil.
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type S3ManifestReaderConfig ¶
type S3ManifestReaderConfig struct {
Log logger.Logger
Name string
InputChan chan stream.Record // the input channel of rows to write to an output CSV file.
InputChanField4ManifestName string // path to manifest files (s3:// or file://)
BucketName string // bucket containing manifest files
BucketPrefix string
Region string
OutputChanField4DataFileName string // outputChan field to produce file names onto
StepWatcher *stats.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type SnowflakeLoaderConfig ¶
type SnowflakeLoaderConfig struct {
Log logger.Logger
Name string
InputChan chan stream.Record
Db shared.Connector // connection to target snowflake database abstracted via interface.
InputChanField4FileName string // the field name found on InputChan that contains the file name to load.
StageName string // the external stage that can access the files to load.
TargetSchemaTableName rdbms.SchemaTable // the [schema.]table to load into.
DeleteAll bool // set to true to SQL DELETE all table rows before loading begins (set Use1Transaction = true for a safe reload of data).
FnGetSnowflakeSqlSlice SnowflakeSqlBuilderFunc // func that will be used by NewSnowflakeLoader to fetch a slice of SQL statements to execute per input row.
CommitSequenceKeyName string // the field name added by this component to the outputChan record, incremented when a batch is committed; used by downstream components - see also TableSync component.
StepWatcher *stats.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type SnowflakeMergeConfig ¶
type SnowflakeMergeConfig struct {
Log logger.Logger
Name string
InputChan chan stream.Record
Db shared.Connector // connection to target snowflake database abstracted via interface.
InputChanField4FileName string // the field name found on InputChan that contains the file name to load.
StageName string // the external stage that can access the files to load.
TargetSchemaTableName rdbms.SchemaTable // the [schema.]table to load into.
CommitSequenceKeyName string // the field name added by this component to the outputChan record, incremented when a batch is committed; used by downstream components - see also TableSync component.
TargetKeyCols *om.OrderedMap // ordered map of: key = chan field name; value = target table column name
TargetOtherCols *om.OrderedMap // ordered map of: key = chan field name; value = target table column name
StepWatcher *stats.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type SnowflakeMergeSqlConfig ¶
type SnowflakeMergeSqlConfig struct {
TargetKeyCols *om.OrderedMap // ordered map of: key = chan field name; value = target table column name
TargetOtherCols *om.OrderedMap // ordered map of: key = chan field name; value = target table column name
}
type SnowflakeSqlBuilderFunc ¶
type SnowflakeSqlBuilderFunc func(tableName rdbms.SchemaTable, stageName string, fileName string, force bool) []string
SnowflakeSqlBuilderFunc should return a slice of SQL statements for NewSnowflakeLoader to execute.
type SnowflakeSyncConfig ¶
type SnowflakeSyncConfig struct {
Log logger.Logger
Name string
InputChan chan stream.Record
Db shared.Connector // connection to target snowflake database abstracted via interface.
InputChanField4FileName string // the field name found on InputChan that contains the file name to load.
StageName string // the external stage that can access the files to load.
TargetSchemaTableName rdbms.SchemaTable // the [schema.]table to load into.
CommitSequenceKeyName string // the field name added by this component to the outputChan record, incremented when a batch is committed; used by downstream components - see also TableSync component.
TargetKeyCols *om.OrderedMap // ordered map of: key = chan field name; value = target table column name
TargetOtherCols *om.OrderedMap // ordered map of: key = chan field name; value = target table column name
FlagField string
StepWatcher *stats.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type SnowflakeSyncSqlConfig ¶
type SnowflakeSyncSqlConfig struct {
TargetKeyCols *om.OrderedMap // ordered map of: key = chan field name; value = target table column name
TargetOtherCols *om.OrderedMap // ordered map of: key = chan field name; value = target table column name
FlagField string
}
type SqlExecConfig ¶
type SqlExecConfig struct {
Log logger.Logger
Name string
InputChan chan stream.Record
SqlQueryFieldName string
SqlRowsAffectedFieldName string
OutputDb shared.Connector
StepWatcher *s.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type SqlQueryWithArgsConfig ¶
type SqlQueryWithArgsConfig struct {
Log logger.Logger
Name string
Db shared.Connector
StepWatcher *s.StepWatcher // optional ptr to object that can gather step stats.
WaitCounter ComponentWaiter
Sqltext string
Args []interface{}
PanicHandlerFn PanicHandlerFunc
}
type SqlQueryWithChanConfig ¶
type SqlQueryWithChanConfig struct {
Log logger.Logger
Name string
Db shared.Connector
StepWatcher *s.StepWatcher // optional ptr to object that can gather step stats.
WaitCounter ComponentWaiter
Sqltext string
InputChan chan stream.Record // optional input channel from which values for bind variables are fetched. If omitted, then Sqltext must not use binds.
InputChanFields []string // list of field names in the input channel for which to use as bind variables.
PanicHandlerFn PanicHandlerFunc
}
type SqlQueryWithReplace ¶
type SqlQueryWithReplace struct {
Log logger.Logger
Name string
Db shared.Connector
StepWatcher *s.StepWatcher // optional ptr to object that can gather step stats.
WaitCounter ComponentWaiter
Sqltext string
Args []interface{}
Replacements map[string]string
PanicHandlerFn PanicHandlerFunc
}
type StdOutPassThroughConfig ¶
type StdOutPassThroughConfig struct {
Log logger.Logger
Name string
InputChan chan stream.Record
Writer io.Writer // the writer to output records to, usually STDOUT
OutputFields []string // the list of fields to write to the Writer (leave empty for all fields)
AbortAfterCount int64
StepWatcher *stats.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
StdOutPassThroughConfig is the config for the NewStdOutPassThrough component.
type TableMergeConfig ¶
type TableMergeConfig struct {
Log logger.Logger // TODO: do we need to find a way to stub this out?
Name string
InputChan chan stream.Record // input rows to write to database table.
OutputDb shared.Connector // target database connection for writes.
ExecBatchSize int // commit interval in num rows
CommitBatchSize int
shared.SqlStatementGeneratorConfig // config for target database table
StepWatcher *s.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
type TableSyncConfig ¶
type TableSyncConfig struct {
Log logger.Logger
Name string
InputChan chan stream.Record // input rows to write to database table.
OutputDb shared.Connector // target database connection for writes.
CommitBatchSize int // commit interval in num rows
TxtBatchNumRows int // number of rows in a single SQL statement.
// outputRowsAfterCommit bool // FEATURE NOT USED YET - this component will forward rows to its outputChan as they are processed (False) or after each transaction is committed (True). The latter means that extra memory is used to buffer rows the amount of which matches the batch size before they are released downstream.
FlagKeyName string // name of the key in channel inputChan that contains values "N", "C", "D" (see constants for actual values) that can be used to distinguish, "new", "changed" and "deleted" rows, which resolve to database INSERTs/UPDATEs/DELETEs respectively.
CommitSequenceKeyName string // the field name added by this component to the outputChan record, incremented when a batch is committed.
shared.SqlStatementGeneratorConfig // config for target database table
StepWatcher *s.StepWatcher
WaitCounter ComponentWaiter
PanicHandlerFn PanicHandlerFunc
}
Source Files ¶
- channel-bridge.go
- channel-cartesian-product.go
- channel-combiner.go
- copy-files-to-s3.go
- csv-file-writer.go
- defaults.go
- field-mapper.go
- filter-rows.go
- generate-date-range-rows.go
- generate-number-range-rows.go
- generate-rows.go
- helper.go
- interface.go
- manifest-reader.go
- manifest-writer.go
- merge-diff.go
- mock.go
- s3-bucket-list-input.go
- snowflake-loader.go
- snowflake-merge.go
- snowflake-sync.go
- sql-exec.go
- stdout-pass-through.go
- table-input-cqn-query.go
- table-input-sql-query.go
- table-output-merge.go
- table-output-sync.go
- types.go