Documentation
¶
Index ¶
- Constants
- func NewJob(name string, steps ...Step) *jobBuilder
- func NewStep(name string, handler ...interface{}) *stepBuilder
- func Register(job Job) error
- func Restart(ctx context.Context, jobId interface{}) (int64, error)
- func RestartAsync(ctx context.Context, jobId interface{}) (int64, error)
- func SetDB(sqlDb *sql.DB)
- func SetLogger(l logs.Logger)
- func SetMaxRunningJobs(size int)
- func SetMaxRunningSteps(size int)
- func SetTransactionManager(txMgr TransactionManager)
- func Start(ctx context.Context, jobName string, params string) (int64, error)
- func StartAsync(ctx context.Context, jobName string, params string) (int64, error)
- func Stop(ctx context.Context, jobId interface{}) error
- func Unregister(job Job)
- type Aggregator
- type BatchContext
- func (ctx *BatchContext) DeepCopy() *BatchContext
- func (ctx *BatchContext) Exists(key string) bool
- func (ctx *BatchContext) Get(key string, def ...interface{}) interface{}
- func (ctx *BatchContext) GetBool(key string, def ...bool) (bool, error)
- func (ctx *BatchContext) GetInt(key string, def ...int) (int, error)
- func (ctx *BatchContext) GetInt64(key string, def ...int64) (int64, error)
- func (ctx *BatchContext) GetString(key string, def ...string) (string, error)
- func (ctx *BatchContext) MarshalJSON() ([]byte, error)
- func (ctx *BatchContext) Merge(other *BatchContext)
- func (ctx *BatchContext) Put(key string, value interface{})
- func (ctx *BatchContext) Remove(key string)
- func (ctx *BatchContext) UnmarshalJSON(b []byte) error
- type BatchError
- type ChunkContext
- type ChunkListener
- type DefaultTxManager
- type FilePath
- type Future
- type Handler
- type ItemReader
- type Job
- type JobExecution
- type JobListener
- type OpenCloser
- type PartitionListener
- type Partitioner
- type PartitionerFactory
- type Processor
- type Reader
- type Step
- type StepExecution
- type StepListener
- type Task
- type TransactionManager
- type Writer
Constants ¶
View Source
const ( ErrCodeRetry = "retry" ErrCodeStop = "stop" ErrCodeConcurrency = "concurrency" ErrCodeDbFail = "db_fail" ErrCodeGeneral = "general" )
View Source
const ( FileItemReaderHandleKey = "gobatch.FileItemReader.handle" FileItemReaderFileNameKey = "gobatch.FileItemWriter.fileName" FileItemReaderCurrentIndex = "gobatch.FileItemReader.current.index" FileItemReaderStart = "gobatch.FileItemReader.start" FileItemReaderEnd = "gobatch.FileItemReader.end" )
View Source
const ( FileItemWriterHandleKey = "gobatch.FileItemWriter.handle" FileItemWriterFileNameKey = "gobatch.FileItemWriter.fileName" )
View Source
const ( DefaultJobPoolSize = 10 DefaultStepTaskPoolSize = 1000 )
task pool
View Source
const ( ItemReaderKeyList = "gobatch.ItemReader.key.list" ItemReaderCurrentIndex = "gobatch.ItemReader.current.index" ItemReaderMaxIndex = "gobatch.ItemReader.max.index" )
View Source
const ( DefaultChunkSize = 10 DefaultPartitions = 1 DefaultMinPartitionSize = 1 DefaultMaxPartitionSize = 2147483647 )
View Source
const PartitionStepPartitionsKey = "gobatch.partitionStep.partitions"
Variables ¶
This section is empty.
Functions ¶
func SetMaxRunningJobs ¶
func SetMaxRunningJobs(size int)
func SetMaxRunningSteps ¶
func SetMaxRunningSteps(size int)
func SetTransactionManager ¶
func SetTransactionManager(txMgr TransactionManager)
func StartAsync ¶
func Unregister ¶
func Unregister(job Job)
Types ¶
type Aggregator ¶
type Aggregator interface {
//Aggregate aggregate result from all sub step executions
Aggregate(execution *StepExecution, subExecutions []*StepExecution) BatchError
}
type BatchContext ¶
type BatchContext struct {
// contains filtered or unexported fields
}
func NewBatchContext ¶
func NewBatchContext() *BatchContext
func (*BatchContext) DeepCopy ¶
func (ctx *BatchContext) DeepCopy() *BatchContext
func (*BatchContext) Exists ¶
func (ctx *BatchContext) Exists(key string) bool
func (*BatchContext) Get ¶
func (ctx *BatchContext) Get(key string, def ...interface{}) interface{}
func (*BatchContext) GetBool ¶
func (ctx *BatchContext) GetBool(key string, def ...bool) (bool, error)
func (*BatchContext) GetInt64 ¶
func (ctx *BatchContext) GetInt64(key string, def ...int64) (int64, error)
func (*BatchContext) GetString ¶
func (ctx *BatchContext) GetString(key string, def ...string) (string, error)
func (*BatchContext) MarshalJSON ¶
func (ctx *BatchContext) MarshalJSON() ([]byte, error)
func (*BatchContext) Merge ¶
func (ctx *BatchContext) Merge(other *BatchContext)
func (*BatchContext) Put ¶
func (ctx *BatchContext) Put(key string, value interface{})
func (*BatchContext) Remove ¶
func (ctx *BatchContext) Remove(key string)
func (*BatchContext) UnmarshalJSON ¶
func (ctx *BatchContext) UnmarshalJSON(b []byte) error
type BatchError ¶
func NewBatchError ¶
func NewBatchError(code string, msg string, args ...interface{}) BatchError
type ChunkContext ¶
type ChunkContext struct {
StepExecution *StepExecution
Tx interface{}
End bool
}
type ChunkListener ¶
type ChunkListener interface {
BeforeChunk(context *ChunkContext) BatchError
AfterChunk(context *ChunkContext) BatchError
OnError(context *ChunkContext, err BatchError)
}
type DefaultTxManager ¶
type DefaultTxManager struct {
// contains filtered or unexported fields
}
func (*DefaultTxManager) BeginTx ¶
func (tm *DefaultTxManager) BeginTx() (interface{}, BatchError)
func (*DefaultTxManager) Commit ¶
func (tm *DefaultTxManager) Commit(tx interface{}) BatchError
func (*DefaultTxManager) Rollback ¶
func (tm *DefaultTxManager) Rollback(tx interface{}) BatchError
type Handler ¶
type Handler interface {
Handle(execution *StepExecution) BatchError
}
type ItemReader ¶
type Job ¶
type Job interface {
Name() string
Start(ctx context.Context, execution *JobExecution) BatchError
Stop(ctx context.Context, execution *JobExecution) BatchError
GetSteps() []Step
}
type JobExecution ¶
type JobExecution struct {
JobExecutionId int64
JobInstanceId int64
JobName string
JobParams map[string]interface{}
JobStatus status.BatchStatus
StepExecutions []*StepExecution
JobContext *BatchContext
CreateTime time.Time
StartTime time.Time
EndTime time.Time
FailError BatchError
Version int64
}
func (*JobExecution) AddStepExecution ¶
func (e *JobExecution) AddStepExecution(execution *StepExecution)
type JobListener ¶
type JobListener interface {
BeforeJob(execution *JobExecution) BatchError
AfterJob(execution *JobExecution) BatchError
}
type OpenCloser ¶
type OpenCloser interface {
Open(execution *StepExecution) BatchError
Close(execution *StepExecution) BatchError
}
type PartitionListener ¶
type PartitionListener interface {
BeforePartition(execution *StepExecution) BatchError
AfterPartition(execution *StepExecution, subExecutions []*StepExecution) BatchError
OnError(execution *StepExecution, err BatchError)
}
type Partitioner ¶
type Partitioner interface {
//Partition generate sub step executions from specified step execution and partitions count
Partition(execution *StepExecution, partitions uint) ([]*StepExecution, BatchError)
//GetPartitionNames generate sub step names from specified step execution and partitions count
GetPartitionNames(execution *StepExecution, partitions uint) []string
}
type PartitionerFactory ¶
type PartitionerFactory interface {
GetPartitioner(minPartitionSize, maxPartitionSize uint) Partitioner
}
type Processor ¶
type Processor interface {
//Process process an item from reader and return a result item
Process(item interface{}, chunkCtx *ChunkContext) (interface{}, BatchError)
}
type Reader ¶
type Reader interface {
//Read each call of Read() will return a data item, if there is no more data, a nil item will be returned.
Read(chunkCtx *ChunkContext) (interface{}, BatchError)
}
type Step ¶
type Step interface {
Name() string
Exec(ctx context.Context, execution *StepExecution) BatchError
// contains filtered or unexported methods
}
Step step interface
type StepExecution ¶
type StepExecution struct {
StepExecutionId int64
StepName string
StepStatus status.BatchStatus
StepContext *BatchContext
StepContextId int64
StepExecutionContext *BatchContext
JobExecution *JobExecution
CreateTime time.Time
StartTime time.Time
EndTime time.Time
ReadCount int64
WriteCount int64
CommitCount int64
FilterCount int64
ReadSkipCount int64
WriteSkipCount int64
ProcessSkipCount int64
RollbackCount int64
FailError BatchError
LastUpdated time.Time
Version int64
}
type StepListener ¶
type StepListener interface {
BeforeStep(execution *StepExecution) BatchError
AfterStep(execution *StepExecution) BatchError
}
type Task ¶
type Task func(execution *StepExecution) BatchError
type TransactionManager ¶
type TransactionManager interface {
BeginTx() (tx interface{}, err BatchError)
Commit(tx interface{}) BatchError
Rollback(tx interface{}) BatchError
}
func NewTransactionManager ¶
func NewTransactionManager(db *sql.DB) TransactionManager
type Writer ¶
type Writer interface {
//Write write items generated by processor in a chunk
Write(items []interface{}, chunkCtx *ChunkContext) BatchError
}
Source Files
¶
Click to show internal directories.
Click to hide internal directories.

