Documentation
¶
Index ¶
- Constants
- func StartWrite(ctx context.Context, w BatchWriter, receiver plugin.RecordReceiver) (err error)
- type BaseBatchWriter
- func (b *BaseBatchWriter) BatchSize() int
- func (b *BaseBatchWriter) BatchTimeout() time.Duration
- func (b *BaseBatchWriter) BatchWrite(ctx context.Context, records []element.Record) (err error)
- func (b *BaseBatchWriter) JobID() int64
- func (b *BaseBatchWriter) TaskGroupID() int64
- func (b *BaseBatchWriter) TaskID() int64
- type BaseConfig
- func (b *BaseConfig) GetBaseTable() *database.BaseTable
- func (b *BaseConfig) GetBatchSize() int
- func (b *BaseConfig) GetBatchTimeout() time.Duration
- func (b *BaseConfig) GetColumns() (columns []dbmsreader.Column)
- func (b *BaseConfig) GetPassword() string
- func (b *BaseConfig) GetPostSQL() []string
- func (b *BaseConfig) GetPreSQL() []string
- func (b *BaseConfig) GetRetryStrategy(j schedule.RetryJudger) (schedule.RetryStrategy, error)
- func (b *BaseConfig) GetURL() string
- func (b *BaseConfig) GetUsername() string
- func (b *BaseConfig) GetWriteMode() string
- func (b *BaseConfig) IgnoreOneByOneError() bool
- type BaseDbHandler
- type BatchWriter
- type Config
- type DbHandler
- type Execer
- type Job
- func (j *Job) Destroy(ctx context.Context) (err error)
- func (j *Job) Init(ctx context.Context) (err error)
- func (j *Job) Post(ctx context.Context) (err error)
- func (j *Job) Prepare(ctx context.Context) (err error)
- func (j *Job) Split(ctx context.Context, number int) (confs []*config.JSON, err error)
- type Task
Constants ¶
const ( ExecModeNormal = "Normal" // Non-Transactional Execution ExecModeStmt = "Stmt" // prepare/exec without Transaction ExecModeTx = "Tx" // Transactional Execution ExecModeStmtTx = "StmtTx" // prepare/exec with Transaction )
Execution Mode
Variables ¶
This section is empty.
Functions ¶
func StartWrite ¶
func StartWrite(ctx context.Context, w BatchWriter, receiver plugin.RecordReceiver) (err error)
StartWrite - Begins the process of writing records to the database using the batch writer and record receiver.
Types ¶
type BaseBatchWriter ¶
type BaseBatchWriter struct {
Task *Task
// contains filtered or unexported fields
}
BaseBatchWriter - A basic implementation of a batch writer, providing the fundamental functionality for writing data in batches.
func NewBaseBatchWriter ¶
func NewBaseBatchWriter(task *Task, execMode string, opts *sql.TxOptions) *BaseBatchWriter
NewBaseBatchWriter - Creates a new instance of the basic batch writer based on the task, execution mode, and transaction options.
func (*BaseBatchWriter) BatchSize ¶
func (b *BaseBatchWriter) BatchSize() int
BatchSize - The number of records to be inserted in a single batch.
func (*BaseBatchWriter) BatchTimeout ¶
func (b *BaseBatchWriter) BatchTimeout() time.Duration
BatchTimeout - The maximum time allowed for a single batch insertion.
func (*BaseBatchWriter) BatchWrite ¶
BatchWrite - The process of writing data in batches.
func (*BaseBatchWriter) JobID ¶
func (b *BaseBatchWriter) JobID() int64
JobID - The unique identifier for a job.
func (*BaseBatchWriter) TaskGroupID ¶
func (b *BaseBatchWriter) TaskGroupID() int64
TaskGroupID - The unique identifier for a group of tasks.
func (*BaseBatchWriter) TaskID ¶
func (b *BaseBatchWriter) TaskID() int64
TaskID - The unique identifier for a specific task within a task group.
type BaseConfig ¶
type BaseConfig struct {
Username string `json:"username"` // Username
Password string `json:"password"` // Password
Column []string `json:"column"` // Column Information
Connection dbmsreader.ConnConfig `json:"connection"` // Connection Information
WriteMode string `json:"writeMode"` // Write Mode, e.g., Insert
BatchSize int `json:"batchSize"` // Batch Size for Single Write
BatchTimeout time2.Duration `json:"batchTimeout"` // Batch Timeout for Single Write
PreSQL []string `json:"preSQL"` // Prepared SQL Statement
PostSQL []string `json:"postSQL"` // Ending SQL Statement
// contains filtered or unexported fields
}
BaseConfig - Basic Relational Database Configuration for writers. Unless there are special requirements, this configuration can be used to quickly implement writers.
func NewBaseConfig ¶
func NewBaseConfig(conf *config.JSON) (c *BaseConfig, err error)
NewBaseConfig - Extract relational database configuration from the configuration file.
func (*BaseConfig) GetBaseTable ¶
func (b *BaseConfig) GetBaseTable() *database.BaseTable
GetBaseTable - Retrieve table information.
func (*BaseConfig) GetBatchSize ¶
func (b *BaseConfig) GetBatchSize() int
GetBatchSize - Retrieve the batch size for a single write.
func (*BaseConfig) GetBatchTimeout ¶
func (b *BaseConfig) GetBatchTimeout() time.Duration
GetBatchTimeout - Retrieve the batch timeout for a single write.
func (*BaseConfig) GetColumns ¶
func (b *BaseConfig) GetColumns() (columns []dbmsreader.Column)
GetColumns - Retrieve column information.
func (*BaseConfig) GetPassword ¶
func (b *BaseConfig) GetPassword() string
GetPassword - Retrieve the password.
func (*BaseConfig) GetPostSQL ¶
func (b *BaseConfig) GetPostSQL() []string
GetPostSQL - Retrieve the ending SQL statement.
func (*BaseConfig) GetPreSQL ¶
func (b *BaseConfig) GetPreSQL() []string
GetPreSQL - Retrieve the prepared SQL statement.
func (*BaseConfig) GetRetryStrategy ¶
func (b *BaseConfig) GetRetryStrategy(j schedule.RetryJudger) (schedule.RetryStrategy, error)
GetRetryStrategy - Retrieve the retry strategy.
func (*BaseConfig) GetURL ¶
func (b *BaseConfig) GetURL() string
GetURL - Retrieve the connection URL.
func (*BaseConfig) GetUsername ¶
func (b *BaseConfig) GetUsername() string
GetUsername - Retrieve the username.
func (*BaseConfig) GetWriteMode ¶
func (b *BaseConfig) GetWriteMode() string
GetWriteMode - Retrieve the write mode.
func (*BaseConfig) IgnoreOneByOneError ¶
func (b *BaseConfig) IgnoreOneByOneError() bool
IgnoreOneByOneError - Ignore individual retry errors.
type BaseDbHandler ¶
type BaseDbHandler struct {
// contains filtered or unexported fields
}
BaseDbHandler Basic Database Execution Handler Encapsulation
func NewBaseDbHandler ¶
func NewBaseDbHandler(newExecer func(name string, conf *config.JSON) (Execer, error), opts *sql.TxOptions) *BaseDbHandler
NewBaseDbHandler Create a database execution handler encapsulation using the executor function newExecer and database transaction execution options opts
func (*BaseDbHandler) Config ¶
func (d *BaseDbHandler) Config(conf *config.JSON) (Config, error)
Config Obtain relational database configuration through configuration
func (*BaseDbHandler) Execer ¶
Execer Obtain an executor through the database name and configuration
func (*BaseDbHandler) TableParam ¶
func (d *BaseDbHandler) TableParam(config Config, execer Execer) database.Parameter
TableParam Obtain table parameters through relational database configuration and executor
type BatchWriter ¶
type BatchWriter interface {
JobID() int64 // Job ID - A unique identifier for a job or task.
TaskGroupID() int64 // Task Group ID - A unique identifier for a group of tasks.
TaskID() int64 // Task ID - A unique identifier for a specific task within a task group.
BatchSize() int // Batch Size - The number of records to be written in a single batch.
BatchTimeout() time.Duration // Batch Timeout - The maximum time allowed for a single batch write operation.
BatchWrite(ctx context.Context, records []element.Record) error // Batch Write - The process of writing data in batches.
}
BatchWriter - A tool or component used for writing data in batches.
type Config ¶
type Config interface {
GetUsername() string // Get Username
GetPassword() string // Get Password
GetURL() string // Get Connection URL
GetColumns() []dbmsreader.Column // Get Column Information
GetBaseTable() *database.BaseTable // Get Table Information
GetWriteMode() string // Get Write Mode
GetBatchSize() int // Batch Size for Single Write
GetBatchTimeout() time.Duration // Batch Timeout for Single Write
GetRetryStrategy(j schedule.RetryJudger) (schedule.RetryStrategy, error) // Get Retry Strategy
IgnoreOneByOneError() bool // Ignore Individual Retry Errors
GetPreSQL() []string // Get Prepared SQL Statement
GetPostSQL() []string // Get Ending SQL Statement
}
Config - Relational Database Writer Configuration
type DbHandler ¶
type DbHandler interface {
Execer(name string, conf *config.JSON) (Execer, error) // Obtain an executor through the database name and configuration
Config(conf *config.JSON) (Config, error) // Obtain relational database configuration through configuration
TableParam(config Config, execer Execer) database.Parameter // Obtain table parameters through relational database configuration and executor
}
DbHandler Database Execution Handler Encapsulation
type Execer ¶
type Execer interface {
Table(*database.BaseTable) database.Table
// Obtain relational database configuration through configuration
PingContext(ctx context.Context) error
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
// BaseDbHandler Basic Database Execution Handler Encapsulation
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
FetchTableWithParam(ctx context.Context, param database.Parameter) (database.Table, error)
BatchExec(ctx context.Context, opts *database.ParameterOptions) (err error)
// NewBaseDbHandler Create a database execution handler encapsulation using the executor function newExecer and database transaction execution options opts
BatchExecStmt(ctx context.Context, opts *database.ParameterOptions) (err error)
BatchExecWithTx(ctx context.Context, opts *database.ParameterOptions) (err error)
BatchExecStmtWithTx(ctx context.Context, opts *database.ParameterOptions) (err error)
Close() error
}
type Job ¶
type Job struct {
*plugin.BaseJob
Handler DbHandler // Database handle
Execer Execer // Executor
// contains filtered or unexported fields
}
Job Work