Documentation
¶
Index ¶
- func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error)
- func AddJobViaHTTPRequest(ctx context.Context, workerHost string, req *AddJobRequest) (jobID string, err error)
- func NewTaskQueueWorker(service factory.ServiceFactory, opts ...OptionFunc) factory.AppServerFactory
- func RecalculateSummary(ctx context.Context)
- func RetryJob(ctx context.Context, jobID string) error
- func StopJob(ctx context.Context, jobID string) error
- func StreamAllJob(ctx context.Context, filter *Filter, streamFunc func(job *Job))
- type AddJobInputResolver
- type AddJobRequest
- type ClientSubscriber
- type ConfigResolver
- type Filter
- type Job
- type JobListResolver
- type JobStatusEnum
- type MemstatsResolver
- type MetaJobList
- type MetaTaskResolver
- type OptionFunc
- func SetAutoRemoveClientInterval(d time.Duration) OptionFunc
- func SetDashboardBanner(banner string) OptionFunc
- func SetDashboardHTTPPort(port uint16) OptionFunc
- func SetDebugMode(debugMode bool) OptionFunc
- func SetExternalWorkerHost(host string) OptionFunc
- func SetLocker(locker candiutils.Locker) OptionFunc
- func SetMaxClientSubscriber(max int) OptionFunc
- func SetMaxConcurrentAddJob(max int) OptionFunc
- func SetPersistent(p Persistent) OptionFunc
- func SetQueue(q QueueStorage) OptionFunc
- func SetTracingDashboard(host string) OptionFunc
- type Persistent
- type QueueStorage
- type RetryHistory
- type Summary
- type SummaryDetail
- type TaglineResolver
- type Task
- type TaskListResolver
- type TaskResolver
- type TaskSummary
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddJob ¶
func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error)
AddJob is a public function for adding a new job within the same runtime.
func AddJobViaHTTPRequest ¶ added in v1.7.0
func AddJobViaHTTPRequest(ctx context.Context, workerHost string, req *AddJobRequest) (jobID string, err error)
AddJobViaHTTPRequest is a public function for adding a new job via an HTTP request to the given worker host.
func NewTaskQueueWorker ¶ added in v1.6.8
func NewTaskQueueWorker(service factory.ServiceFactory, opts ...OptionFunc) factory.AppServerFactory
NewTaskQueueWorker creates a new task queue worker.
func RecalculateSummary ¶ added in v1.11.4
func RecalculateSummary(ctx context.Context)
RecalculateSummary func
Types ¶
type AddJobInputResolver ¶ added in v1.11.0
AddJobInputResolver model
type AddJobRequest ¶ added in v1.11.0
type AddJobRequest struct {
TaskName string `json:"task_name"`
MaxRetry int `json:"max_retry"`
Args []byte `json:"args"`
RetryInterval time.Duration `json:"retry_interval"`
// contains filtered or unexported fields
}
AddJobRequest request model
func (*AddJobRequest) Validate ¶ added in v1.11.2
func (a *AddJobRequest) Validate() error
Validate method
type ClientSubscriber ¶ added in v1.10.16
type ClientSubscriber struct {
ClientID string
SubscribeList struct {
TaskDashboard bool
JobDetailID string
JobList *Filter
}
}
ClientSubscriber model
type ConfigResolver ¶ added in v1.11.6
type ConfigResolver struct {
WithPersistent bool
}
ConfigResolver resolver
type Filter ¶
type Filter struct {
Page, Limit int
Sort string
TaskName string
TaskNameList []string
Search, JobID *string
Status *string
Statuses []string
ExcludeStatus []string
ShowAll bool
ShowHistories *bool
StartDate, EndDate time.Time
}
Filter type
type Job ¶
type Job struct {
ID string `bson:"_id" json:"_id"`
TaskName string `bson:"task_name" json:"task_name"`
Arguments string `bson:"arguments" json:"arguments"`
Retries int `bson:"retries" json:"retries"`
MaxRetry int `bson:"max_retry" json:"max_retry"`
Interval string `bson:"interval" json:"interval"`
CreatedAt time.Time `bson:"created_at" json:"created_at"`
FinishedAt time.Time `bson:"finished_at" json:"finished_at"`
Status string `bson:"status" json:"status"`
Error string `bson:"error" json:"error"`
ErrorStack string `bson:"-" json:"error_stack"`
TraceID string `bson:"trace_id" json:"trace_id"`
RetryHistories []RetryHistory `bson:"retry_histories" json:"retry_histories"`
NextRetryAt string `bson:"-" json:"-"`
// contains filtered or unexported fields
}
Job model
type JobListResolver ¶
type JobListResolver struct {
Meta MetaJobList
Data []Job
}
JobListResolver resolver
type MemstatsResolver ¶ added in v1.6.12
MemstatsResolver resolver
type MetaJobList ¶ added in v1.6.8
type MetaJobList struct {
Page int
Limit int
TotalRecords int
TotalPages int
IsCloseSession bool
IsLoading bool
Detail SummaryDetail
}
MetaJobList resolver
type MetaTaskResolver ¶ added in v1.6.8
type MetaTaskResolver struct {
Page int
Limit int
TotalRecords int
TotalPages int
IsCloseSession bool
TotalClientSubscriber int
}
MetaTaskResolver meta resolver
type OptionFunc ¶ added in v1.6.8
type OptionFunc func(*option)
OptionFunc type
func SetAutoRemoveClientInterval ¶ added in v1.6.8
func SetAutoRemoveClientInterval(d time.Duration) OptionFunc
SetAutoRemoveClientInterval option func
func SetDashboardBanner ¶ added in v1.7.0
func SetDashboardBanner(banner string) OptionFunc
SetDashboardBanner option func
func SetDashboardHTTPPort ¶ added in v1.7.4
func SetDashboardHTTPPort(port uint16) OptionFunc
SetDashboardHTTPPort option func
func SetDebugMode ¶ added in v1.7.4
func SetDebugMode(debugMode bool) OptionFunc
SetDebugMode option func
func SetExternalWorkerHost ¶ added in v1.11.6
func SetExternalWorkerHost(host string) OptionFunc
SetExternalWorkerHost option func. Sets the worker host used when adding a job; if the host is not empty, jobs are added via HTTP request by default.
func SetLocker ¶ added in v1.8.8
func SetLocker(locker candiutils.Locker) OptionFunc
SetLocker option func
func SetMaxClientSubscriber ¶ added in v1.6.8
func SetMaxClientSubscriber(max int) OptionFunc
SetMaxClientSubscriber option func
func SetMaxConcurrentAddJob ¶ added in v1.11.4
func SetMaxConcurrentAddJob(max int) OptionFunc
SetMaxConcurrentAddJob option func
func SetPersistent ¶ added in v1.11.6
func SetPersistent(p Persistent) OptionFunc
SetPersistent option func
func SetTracingDashboard ¶ added in v1.10.11
func SetTracingDashboard(host string) OptionFunc
SetTracingDashboard option func
type Persistent ¶ added in v1.7.0
type Persistent interface {
SetSummary(Summary)
Summary() Summary
FindAllJob(ctx context.Context, filter *Filter) (jobs []Job)
FindJobByID(ctx context.Context, id string, excludeFields ...string) (job Job, err error)
CountAllJob(ctx context.Context, filter *Filter) int
AggregateAllTaskJob(ctx context.Context, filter *Filter) (result []TaskSummary)
SaveJob(ctx context.Context, job *Job, retryHistories ...RetryHistory)
UpdateJob(ctx context.Context, filter *Filter, updated map[string]interface{}, retryHistories ...RetryHistory) (matchedCount, affectedRow int64, err error)
CleanJob(ctx context.Context, filter *Filter) (affectedRow int64)
DeleteJob(ctx context.Context, id string) (job Job, err error)
}
Persistent abstraction
func NewMongoPersistent ¶ added in v1.7.0
func NewMongoPersistent(db *mongo.Database) Persistent
NewMongoPersistent creates a MongoDB-backed Persistent.
func NewNoopPersistent ¶ added in v1.11.6
func NewNoopPersistent() Persistent
NewNoopPersistent constructor
type QueueStorage ¶
type QueueStorage interface {
PushJob(ctx context.Context, job *Job)
PopJob(ctx context.Context, taskName string) (jobID string)
NextJob(ctx context.Context, taskName string) (jobID string)
Clear(ctx context.Context, taskName string)
}
QueueStorage abstraction for queue storage backend
func NewRedisQueue ¶
func NewRedisQueue(redisPool *redis.Pool) QueueStorage
NewRedisQueue initializes a Redis-backed queue storage using the given connection pool.
type RetryHistory ¶ added in v1.10.0
type RetryHistory struct {
ErrorStack string `bson:"error_stack" json:"error_stack"`
Status string `bson:"status" json:"status"`
Error string `bson:"error" json:"error"`
TraceID string `bson:"trace_id" json:"trace_id"`
StartAt time.Time `bson:"start_at" json:"start_at"`
EndAt time.Time `bson:"end_at" json:"end_at"`
}
RetryHistory model
type Summary ¶ added in v1.11.6
type Summary interface {
FindAllSummary(ctx context.Context, filter *Filter) (result []TaskSummary)
FindDetailSummary(ctx context.Context, taskName string) (result TaskSummary)
UpdateSummary(ctx context.Context, taskName string, updated map[string]interface{})
IncrementSummary(ctx context.Context, taskName string, incr map[string]interface{})
}
Summary abstraction
func NewInMemSummary ¶ added in v1.11.6
func NewInMemSummary() Summary
NewInMemSummary constructor; stores and reads summaries in memory.
type SummaryDetail ¶ added in v1.11.4
type SummaryDetail struct {
Failure, Retrying, Success, Queueing, Stopped int
}
SummaryDetail type
type TaglineResolver ¶
type TaglineResolver struct {
Banner string
Tagline string
Version string
GoVersion string
StartAt string
BuildNumber string
Config ConfigResolver
TaskListClientSubscribers []string
JobListClientSubscribers []string
MemoryStatistics MemstatsResolver
}
TaglineResolver resolver
type Task ¶ added in v1.9.0
type Task struct {
// contains filtered or unexported fields
}
Task model
type TaskListResolver ¶ added in v1.6.8
type TaskListResolver struct {
Meta MetaTaskResolver
Data []TaskResolver
}
TaskListResolver resolver
type TaskResolver ¶
type TaskResolver struct {
Name string
ModuleName string
TotalJobs int
IsLoading bool
Detail SummaryDetail
}
TaskResolver resolver
type TaskSummary ¶ added in v1.11.4
type TaskSummary struct {
ID string `bson:"_id"`
TaskName string `bson:"task_name"`
Success int `bson:"success"`
Queueing int `bson:"queueing"`
Retrying int `bson:"retrying"`
Failure int `bson:"failure"`
Stopped int `bson:"stopped"`
IsLoading bool `bson:"is_loading"`
}
TaskSummary model
func (*TaskSummary) CountTotalJob ¶ added in v1.11.4
func (s *TaskSummary) CountTotalJob() int
CountTotalJob method
func (*TaskSummary) SetValue ¶ added in v1.11.4
func (s *TaskSummary) SetValue(source map[string]int)
SetValue method
func (*TaskSummary) ToMapResult ¶ added in v1.11.4
func (s *TaskSummary) ToMapResult() map[string]int
ToMapResult method
func (*TaskSummary) ToSummaryDetail ¶ added in v1.11.4
func (s *TaskSummary) ToSummaryDetail() (detail SummaryDetail)
ToSummaryDetail method