Documentation
¶
Index ¶
- Variables
- func ClientDelete(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ClientGet(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ClientOptions(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ClientPatch(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ClientPost(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ClientPut(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func Decode(sp *iop.StreamProcessor, decoder *encoding.Decoder, val string) (string, error)
- func GetJWTFromKey(masterServerURL, key string) (string, error)
- func LoadProject(path string)
- func ParseUUID(sp *iop.StreamProcessor, val string) (string, error)
- func SetStreamDefaults(stream *ReplicationStreamConfig, replicationCfg ReplicationConfig)
- func Sling(cfg *Config) (err error)
- type ColumnCasing
- type Config
- func (cfg *Config) DetermineType() (Type JobType, err error)
- func (cfg *Config) FormatTargetObjectName() (err error)
- func (cfg *Config) GetFormatMap() (m map[string]any, err error)
- func (cfg *Config) Marshal() (cfgBytes []byte, err error)
- func (cfg *Config) Prepare() (err error)
- func (cfg *Config) Scan(value interface{}) error
- func (cfg *Config) SetDefault()
- func (cfg *Config) Unmarshal(cfgStr string) error
- func (cfg Config) Value() (driver.Value, error)
- type ConfigOptions
- type ExecStatus
- type ExecutionStatus
- type JobType
- type Mode
- type NotificationConfig
- type ProgressBar
- type Project
- type ProjectConfig
- type ReplicationConfig
- type ReplicationStreamConfig
- type RouteName
- type Source
- type SourceOptions
- type Target
- type TargetOptions
- type TaskExecution
- func (t *TaskExecution) AddCleanupTask(f func())
- func (t *TaskExecution) Cleanup()
- func (t *TaskExecution) Execute() error
- func (t *TaskExecution) GetBytes() (inBytes, outBytes uint64)
- func (t *TaskExecution) GetBytesString() (s string)
- func (t *TaskExecution) GetCount() (count uint64)
- func (t *TaskExecution) GetRate(secWindow int) (rowRate, byteRate int64)
- func (t *TaskExecution) GetTotalBytes() (rcBytes, txBytes uint64)
- func (t *TaskExecution) IsStalled(window float64) bool
- func (t *TaskExecution) ReadFromAPI(cfg *Config, client *airbyte.Airbyte) (df *iop.Dataflow, err error)
- func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df *iop.Dataflow, err error)
- func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, err error)
- func (t *TaskExecution) SetProgress(progressText string, args ...interface{})
- func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn database.Connection) (cnt uint64, err error)
- func (t *TaskExecution) WriteToFile(cfg *Config, df *iop.Dataflow) (cnt uint64, err error)
Constants ¶
This section is empty.
Variables ¶
var AllowedProps = map[string]string{
"sheet": "Provided for Excel source files. Default is first sheet",
"range": "Optional for Excel source file. Default is largest table range",
}
AllowedProps lists the allowed properties, keyed by property name, with a description of each.
var ShowProgress = true
var SourceAPIOptionsDefault = SourceOptions{ EmptyAsNull: g.Bool(true), NullIf: g.String("NULL"), DatetimeFormat: "AUTO", MaxDecimals: g.Int(9), }
var SourceDBOptionsDefault = SourceOptions{ EmptyAsNull: g.Bool(true), NullIf: g.String("NULL"), DatetimeFormat: "AUTO", MaxDecimals: g.Int(9), }
var SourceFileOptionsDefault = SourceOptions{ TrimSpace: g.Bool(false), EmptyAsNull: g.Bool(true), Header: g.Bool(true), Flatten: g.Bool(false), Compression: iop.CompressorTypePtr(iop.AutoCompressorType), NullIf: g.String("NULL"), DatetimeFormat: "AUTO", SkipBlankLines: g.Bool(false), Delimiter: ",", MaxDecimals: g.Int(9), }
var TargetAPIOptionsDefault = TargetOptions{ FileMaxRows: lo.Ternary( os.Getenv("FILE_MAX_ROWS") != "", cast.ToInt64(os.Getenv("FILE_MAX_ROWS")), 0, ), UseBulk: g.Bool(true), AddNewColumns: g.Bool(true), DatetimeFormat: "auto", MaxDecimals: g.Int(-1), }
var TargetDBOptionsDefault = TargetOptions{ FileMaxRows: lo.Ternary( os.Getenv("FILE_MAX_ROWS") != "", cast.ToInt64(os.Getenv("FILE_MAX_ROWS")), 0, ), UseBulk: g.Bool(true), AddNewColumns: g.Bool(true), DatetimeFormat: "auto", MaxDecimals: g.Int(-1), ColumnCasing: (*ColumnCasing)(g.String(string(SourceColumnCasing))), }
var TargetFileOptionsDefault = TargetOptions{ Header: g.Bool(true), Compression: lo.Ternary( os.Getenv("COMPRESSION") != "", iop.CompressorTypePtr(iop.CompressorType(os.Getenv("COMPRESSION"))), iop.CompressorTypePtr(iop.AutoCompressorType), ), Concurrency: lo.Ternary( os.Getenv("CONCURRENCY") != "", cast.ToInt(os.Getenv("CONCURRENCY")), runtime.NumCPU(), ), FileMaxRows: lo.Ternary( os.Getenv("FILE_MAX_ROWS") != "", cast.ToInt64(os.Getenv("FILE_MAX_ROWS")), 0, ), FileMaxBytes: lo.Ternary( os.Getenv("FILE_MAX_BYTES") != "", cast.ToInt64(os.Getenv("FILE_MAX_BYTES")), 0, ), Format: filesys.FileTypeCsv, UseBulk: g.Bool(true), AddNewColumns: g.Bool(true), DatetimeFormat: "auto", Delimiter: ",", MaxDecimals: g.Int(-1), ColumnCasing: (*ColumnCasing)(g.String(string(SourceColumnCasing))), }
Functions ¶
func ClientDelete ¶
func ClientDelete(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)
ClientDelete sends a DELETE request
func ClientGet ¶
func ClientGet(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)
ClientGet sends a GET request
func ClientOptions ¶
func ClientOptions(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)
ClientOptions sends an OPTIONS request
func ClientPatch ¶
func ClientPatch(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)
ClientPatch sends a PATCH request
func ClientPost ¶
func ClientPost(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)
ClientPost sends a POST request
func ClientPut ¶
func ClientPut(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)
ClientPut sends a PUT request
func GetJWTFromKey ¶
GetJWTFromKey logs in and returns the JWT based on the provided key
func LoadProject ¶
func LoadProject(path string)
func ParseUUID ¶ added in v1.0.31
func ParseUUID(sp *iop.StreamProcessor, val string) (string, error)
func SetStreamDefaults ¶ added in v0.86.36
func SetStreamDefaults(stream *ReplicationStreamConfig, replicationCfg ReplicationConfig)
Types ¶
type ColumnCasing ¶ added in v1.0.50
type ColumnCasing string
ColumnCasing is the casing method to use
const ( SourceColumnCasing ColumnCasing = "source" // keeps source column name casing. The default. TargetColumnCasing ColumnCasing = "target" // converts casing according to target database. Lower-case for files. SnakeColumnCasing ColumnCasing = "snake" // converts snake casing according to target database. Lower-case for files. )
type Config ¶
type Config struct {
Source Source `json:"source,omitempty" yaml:"source,omitempty"`
Target Target `json:"target" yaml:"target"`
Mode Mode `json:"mode,omitempty" yaml:"mode,omitempty"`
Options ConfigOptions `json:"options,omitempty" yaml:"options,omitempty"`
Env map[string]string `json:"env,omitempty" yaml:"env,omitempty"`
SrcConn connection.Connection `json:"_src_conn,omitempty" yaml:"_src_conn,omitempty"`
TgtConn connection.Connection `json:"_tgt_conn,omitempty" yaml:"_tgt_conn,omitempty"`
Prepared bool `json:"_prepared,omitempty" yaml:"_prepared,omitempty"`
IncrementalVal string `json:"-" yaml:"-"`
ReplicationMode bool `json:"-" yaml:"-"`
MetadataLoadedAt bool `json:"-" yaml:"-"`
MetadataStreamURL bool `json:"-" yaml:"-"`
}
Config is the new config struct
func (*Config) DetermineType ¶ added in v0.84.9
func (*Config) FormatTargetObjectName ¶ added in v0.85.54
func (*Config) GetFormatMap ¶ added in v1.0.31
GetFormatMap returns a map of variables that can be used to format a string
type ConfigOptions ¶
type ConfigOptions struct {
Debug bool `json:"debug,omitempty" yaml:"debug,omitempty"`
StdIn bool `json:"-"` // whether stdin is passed
StdOut bool `json:"stdout,omitempty" yaml:"stdout,omitempty"` // whether to output to stdout
}
ConfigOptions are configuration options
type ExecStatus ¶
type ExecStatus string
ExecStatus is the status of an execution
const ( // ExecStatusCreated = created ExecStatusCreated ExecStatus = "created" // ExecStatusQueued = queued ExecStatusQueued ExecStatus = "queued" // ExecStatusStarted = started ExecStatusStarted ExecStatus = "started" // ExecStatusRunning = running ExecStatusRunning ExecStatus = "running" // ExecStatusSuccess = success ExecStatusSuccess ExecStatus = "success" // ExecStatusTerminated = terminated ExecStatusTerminated ExecStatus = "terminated" // ExecStatusInterrupted = interrupted ExecStatusInterrupted ExecStatus = "interrupted" // ExecStatusTimedOut = timed-out (when no heartbeat sent for 30 sec) ExecStatusTimedOut ExecStatus = "timed-out" // ExecStatusError = error ExecStatusError ExecStatus = "error" // ExecStatusSkipped = skipped ExecStatusSkipped ExecStatus = "skipped" // ExecStatusStalled = stalled (when still heartbeating, but rows are unchanged for a while) ExecStatusStalled ExecStatus = "stalled" )
func (ExecStatus) IsFailure ¶
func (s ExecStatus) IsFailure() bool
IsFailure returns true if an execution has failed
func (ExecStatus) IsFinished ¶
func (s ExecStatus) IsFinished() bool
IsFinished returns true if an execution is finished
func (ExecStatus) IsRunning ¶
func (s ExecStatus) IsRunning() bool
IsRunning returns true if an execution is running
func (ExecStatus) IsSuccess ¶
func (s ExecStatus) IsSuccess() bool
IsSuccess returns true if an execution is successful
type ExecutionStatus ¶
type ExecutionStatus struct {
JobID int `json:"job_id,omitempty"`
ExecID int64 `json:"exec_id,omitempty"`
Status ExecStatus `json:"status,omitempty"`
Text string `json:"text,omitempty"`
Rows uint64 `json:"rows,omitempty"`
Bytes uint64 `json:"bytes,omitempty"`
Percent int `json:"percent,omitempty"`
Stalled bool `json:"stalled,omitempty"`
Duration *int `json:"duration,omitempty"`
AvgDuration int `json:"avg_duration,omitempty"`
}
ExecutionStatus is an execution status object
type JobType ¶
type JobType string
JobType is an enum type for jobs
const APIToDb JobType = "api-db"
APIToDb is from api to db
const APIToFile JobType = "api-file"
APIToFile is from api to file
const ConnTest JobType = "conn-test"
ConnTest is for a connection test
const DbSQL JobType = "db-sql"
DbSQL is for a db sql query
const DbToDb JobType = "db-db"
DbToDb is from db to db
const DbToFile JobType = "db-file"
DbToFile is from db to file
const FileToDB JobType = "file-db"
FileToDB is from file to db
const FileToFile JobType = "file-file"
FileToFile is from file to file
type NotificationConfig ¶
type NotificationConfig struct {
Name string `json:"name"`
Emails []string `json:"emails"`
Slack bool `json:"slack"`
MsTeams bool `json:"msteams"`
WebhookURLs []string `json:"webhook_urls"` // urls
OnSuccess bool `json:"on_success"`
OnFailure bool `json:"on_failure"`
OnLinger bool `json:"on_linger"`
OnEmpty bool `json:"on_empty"`
}
type ProgressBar ¶
type ProgressBar struct {
// contains filtered or unexported fields
}
func (*ProgressBar) Finish ¶
func (pb *ProgressBar) Finish()
func (*ProgressBar) SetStatus ¶
func (pb *ProgressBar) SetStatus(status string)
SetStatus sets the progress bar status
func (*ProgressBar) Start ¶
func (pb *ProgressBar) Start()
type Project ¶
type Project struct {
Config ProjectConfig
TaskConfigs map[string]Config
}
type ProjectConfig ¶
type ProjectConfig struct {
Project string `json:"project" yaml:"project"`
TaskPaths []string `json:"task-paths" yaml:"task-paths"`
Defaults map[string]interface{} `json:"defaults" yaml:"defaults"`
NotificationTags map[string]NotificationConfig `json:"notification_tags" yaml:"notification_tags"`
}
type ReplicationConfig ¶ added in v0.86.36
type ReplicationConfig struct {
Source string `json:"source,omitempty" yaml:"source,omitempty"`
Target string `json:"target,omitempty" yaml:"target,omitempty"`
Defaults ReplicationStreamConfig `json:"defaults,omitempty" yaml:"defaults,omitempty"`
Streams map[string]*ReplicationStreamConfig `json:"streams,omitempty" yaml:"streams,omitempty"`
Env map[string]any `json:"env,omitempty" yaml:"env,omitempty"`
// contains filtered or unexported fields
}
func LoadReplicationConfig ¶ added in v0.87.17
func LoadReplicationConfig(cfgPath string) (config ReplicationConfig, err error)
func UnmarshalReplication ¶ added in v0.86.39
func UnmarshalReplication(replicYAML string) (config ReplicationConfig, err error)
UnmarshalReplication converts a YAML string to a replication config
func (*ReplicationConfig) ProcessWildcards ¶ added in v0.87.18
func (rd *ReplicationConfig) ProcessWildcards() (err error)
ProcessWildcards processes the streams that use wildcards, such as `my_schema.*`, `my_schema.my_prefix_*` or `my_schema.*_my_suffix`
func (*ReplicationConfig) Scan ¶ added in v0.86.36
func (rd *ReplicationConfig) Scan(value interface{}) error
Scan scans value into Jsonb; it implements the sql.Scanner interface
func (ReplicationConfig) StreamsOrdered ¶ added in v1.0.6
func (rd ReplicationConfig) StreamsOrdered() []string
StreamsOrdered returns the stream names as ordered in the YAML file
type ReplicationStreamConfig ¶ added in v0.86.36
type ReplicationStreamConfig struct {
Mode Mode `json:"mode,omitempty" yaml:"mode,omitempty"`
Object string `json:"object,omitempty" yaml:"object,omitempty"`
Columns []string `json:"columns,omitempty" yaml:"columns,flow,omitempty"`
PrimaryKeyI any `json:"primary_key,omitempty" yaml:"primary_key,flow,omitempty"`
UpdateKey string `json:"update_key,omitempty" yaml:"update_key,omitempty"`
SQL string `json:"sql,omitempty" yaml:"sql,omitempty"`
Schedule *string `json:"schedule,omitempty" yaml:"schedule,omitempty"`
SourceOptions *SourceOptions `json:"source_options,omitempty" yaml:"source_options,omitempty"`
TargetOptions *TargetOptions `json:"target_options,omitempty" yaml:"target_options,omitempty"`
Disabled bool `json:"disabled,omitempty" yaml:"disabled,omitempty"`
}
func (*ReplicationStreamConfig) PrimaryKey ¶ added in v0.86.36
func (s *ReplicationStreamConfig) PrimaryKey() []string
type RouteName ¶
type RouteName string
RouteName is the name of a route
const ( RouteStatus RouteName = "/status" RouteNotice RouteName = "/notice" RouteError RouteName = "/error" RouteSignUpUser RouteName = "/sign-up" RouteUser RouteName = "/user" RouteForgotPassword RouteName = "/forgot-password" RouteResetPassword RouteName = "/reset-password" RouteLogin RouteName = "/login" RouteLogout RouteName = "/logout" RouteProxy RouteName = "/p" RouteAppIndex RouteName = "/app" RouteAppLogin RouteName = "/app/login" RouteAppLogout RouteName = "/app/logout" RouteAppAPIKey RouteName = "/app/apikey" RouteAPI RouteName = "/api/v1" RouteMasterStatus RouteName = "/api/v1/master-status" RouteMasterDBReset RouteName = "/api/v1/master-db-reset" RouteUploads RouteName = "/api/v1/uploads" RouteAPIAccounts RouteName = "/api/v1/accounts" RouteAPIProjects RouteName = "/api/v1/projects" RouteAPIKey RouteName = "/api/v1/apikey" RouteAPIUsers RouteName = "/api/v1/users" RouteAPIJobs RouteName = "/api/v1/jobs" RouteAPILogs RouteName = "/api/v1/logs" RouteAPIExecutions RouteName = "/api/v1/executions" RouteAPIConnections RouteName = "/api/v1/connections" RouteAPIConnectionTest RouteName = "/api/v1/connection-test" RouteAPIResetPassword RouteName = "/api/v1/reset-password" RouteAPIDataRequest RouteName = "/api/v1/data-request" RouteAPIWorkers RouteName = "/api/v1/workers" RouteAPISettings RouteName = "/api/v1/settings" RouteAlertLog RouteName = "/alert/log" RouteWs RouteName = "/ws" RouteWsClient RouteName = "/ws/client" RouteWsWorker RouteName = "/ws/worker" )
type Source ¶
type Source struct {
Conn string `json:"conn" yaml:"conn"`
Stream string `json:"stream,omitempty" yaml:"stream,omitempty"`
Columns []string `json:"columns,omitempty" yaml:"columns,omitempty"`
PrimaryKeyI any `json:"primary_key,omitempty" yaml:"primary_key,omitempty"`
UpdateKey string `json:"update_key,omitempty" yaml:"update_key,omitempty"`
Options *SourceOptions `json:"options,omitempty" yaml:"options,omitempty"`
Data map[string]interface{} `json:"data,omitempty" yaml:"data,omitempty"`
}
Source is a source of data
func (*Source) HasPrimaryKey ¶ added in v1.0.50
func (*Source) HasUpdateKey ¶ added in v1.0.50
func (*Source) PrimaryKey ¶ added in v0.84.0
type SourceOptions ¶
type SourceOptions struct {
TrimSpace *bool `json:"trim_space,omitempty" yaml:"trim_space,omitempty"`
EmptyAsNull *bool `json:"empty_as_null,omitempty" yaml:"empty_as_null,omitempty"`
Header *bool `json:"header,omitempty" yaml:"header,omitempty"`
Flatten *bool `json:"flatten,omitempty" yaml:"flatten,omitempty"`
Compression *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`
Format *filesys.FileType `json:"format,omitempty" yaml:"format,omitempty"`
NullIf *string `json:"null_if,omitempty" yaml:"null_if,omitempty"`
DatetimeFormat string `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`
SkipBlankLines *bool `json:"skip_blank_lines,omitempty" yaml:"skip_blank_lines,omitempty"`
Delimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
MaxDecimals *int `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`
JmesPath *string `json:"jmespath,omitempty" yaml:"jmespath,omitempty"`
Sheet *string `json:"sheet,omitempty" yaml:"sheet,omitempty"`
Range *string `json:"range,omitempty" yaml:"range,omitempty"`
Limit *int `json:"limit,omitempty" yaml:"limit,omitempty"`
Columns any `json:"columns,omitempty" yaml:"columns,omitempty"`
Transforms []string `json:"transforms,omitempty" yaml:"transforms,omitempty"`
}
SourceOptions are connection and stream processing options
func (*SourceOptions) SetDefaults ¶ added in v1.0.31
func (o *SourceOptions) SetDefaults(sourceOptions SourceOptions)
type Target ¶
type Target struct {
Conn string `json:"conn" yaml:"conn"`
Object string `json:"object,omitempty" yaml:"object,omitempty"`
Options *TargetOptions `json:"options,omitempty" yaml:"options,omitempty"`
Data map[string]interface{} `json:"data,omitempty" yaml:"data,omitempty"`
TmpTableCreated bool `json:"-" yaml:"-"`
// contains filtered or unexported fields
}
Target is a target of data
type TargetOptions ¶
type TargetOptions struct {
Header *bool `json:"header,omitempty" yaml:"header,omitempty"`
Compression *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`
Concurrency int `json:"concurrency,omitempty" yaml:"concurrency,omitempty"`
DatetimeFormat string `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`
Delimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
FileMaxRows int64 `json:"file_max_rows,omitempty" yaml:"file_max_rows,omitempty"`
FileMaxBytes int64 `json:"file_max_bytes,omitempty" yaml:"file_max_bytes,omitempty"`
Format filesys.FileType `json:"format,omitempty" yaml:"format,omitempty"`
MaxDecimals *int `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`
UseBulk *bool `json:"use_bulk,omitempty" yaml:"use_bulk,omitempty"`
AddNewColumns *bool `json:"add_new_columns,omitempty" yaml:"add_new_columns,omitempty"`
AdjustColumnType *bool `json:"adjust_column_type,omitempty" yaml:"adjust_column_type,omitempty"`
ColumnCasing *ColumnCasing `json:"column_casing,omitempty" yaml:"column_casing,omitempty"`
TableTmp string `json:"table_tmp,omitempty" yaml:"table_tmp,omitempty"`
TableDDL string `json:"table_ddl,omitempty" yaml:"table_ddl,omitempty"`
PreSQL string `json:"pre_sql,omitempty" yaml:"pre_sql,omitempty"`
PostSQL string `json:"post_sql,omitempty" yaml:"post_sql,omitempty"`
}
TargetOptions are target connection and stream processing options
func (*TargetOptions) SetDefaults ¶ added in v1.0.31
func (o *TargetOptions) SetDefaults(targetOptions TargetOptions)
type TaskExecution ¶
type TaskExecution struct {
ExecID int64 `json:"exec_id"`
Config *Config `json:"config"`
Type JobType `json:"type"`
Status ExecStatus `json:"status"`
Err error `json:"error"`
StartTime *time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time"`
Bytes uint64 `json:"bytes"`
Context *g.Context `json:"-"`
Progress string `json:"progress"`
ProgressHist []string `json:"progress_hist"`
PBar *ProgressBar `json:"-"`
ProcStatsStart g.ProcStats `json:"-"` // process stats at beginning
// contains filtered or unexported fields
}
TaskExecution is a sling ELT task run, synonymous to an execution
func NewTask ¶
func NewTask(execID int64, cfg *Config) (t *TaskExecution)
NewTask creates a Sling task with given configuration
func (*TaskExecution) AddCleanupTask ¶ added in v0.84.3
func (t *TaskExecution) AddCleanupTask(f func())
func (*TaskExecution) Cleanup ¶ added in v0.84.3
func (t *TaskExecution) Cleanup()
func (*TaskExecution) Execute ¶
func (t *TaskExecution) Execute() error
Execute runs a Sling task. This may be a file/db to file/db transfer
func (*TaskExecution) GetBytes ¶
func (t *TaskExecution) GetBytes() (inBytes, outBytes uint64)
GetBytes returns the current total of bytes processed
func (*TaskExecution) GetBytesString ¶
func (t *TaskExecution) GetBytesString() (s string)
func (*TaskExecution) GetCount ¶
func (t *TaskExecution) GetCount() (count uint64)
GetCount returns the current count of rows processed
func (*TaskExecution) GetRate ¶
func (t *TaskExecution) GetRate(secWindow int) (rowRate, byteRate int64)
GetRate returns the speed of flow (rows / sec and bytes / sec). secWindow is how many seconds back to measure (0 is since beginning).
func (*TaskExecution) GetTotalBytes ¶
func (t *TaskExecution) GetTotalBytes() (rcBytes, txBytes uint64)
GetTotalBytes gets the inbound/outbound bytes of the task
func (*TaskExecution) IsStalled ¶
func (t *TaskExecution) IsStalled(window float64) bool
IsStalled determines if the task has stalled (no row increment)
func (*TaskExecution) ReadFromAPI ¶
func (t *TaskExecution) ReadFromAPI(cfg *Config, client *airbyte.Airbyte) (df *iop.Dataflow, err error)
ReadFromAPI reads from a source API
func (*TaskExecution) ReadFromDB ¶
func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df *iop.Dataflow, err error)
ReadFromDB reads from a source database
func (*TaskExecution) ReadFromFile ¶
func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, err error)
ReadFromFile reads from a source file
func (*TaskExecution) SetProgress ¶
func (t *TaskExecution) SetProgress(progressText string, args ...interface{})
SetProgress sets the progress
func (*TaskExecution) WriteToDb ¶
func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn database.Connection) (cnt uint64, err error)
WriteToDb writes to a target DB: create temp table, load into temp table, then insert / incremental / replace into target table
func (*TaskExecution) WriteToFile ¶
WriteToFile writes to a target file