Documentation
¶
Index ¶
- Constants
- Variables
- func GetGormFromContext(ctx context.Context) *gorm.DB
- func GetRedisClientFromContext(ctx context.Context) *redis.Client
- func LogFailedJob(ctx context.Context, j Job, err error)
- func MakeConsumer(jobFactory JobFactory, handlerFactory HandlerFactory, publisher Publisher, ...) *defaultConsumer
- func NewRelicGormWithTransaction(dbConn *gorm.DB) func(next HandleFunc) HandleFunc
- func NewRelicToGorm(dbConn *gorm.DB) func(next HandleFunc) HandleFunc
- func NewRelicToRedis(c *redis.Client) func(next HandleFunc) HandleFunc
- func NewRmqConn(redisConn *redis.Client) (rmq.Connection, error)
- func NewRmqConnFromRedisConfig(redisConfig *RedisConfig) (rmq.Connection, error)
- func SetGormToContext(ctx context.Context, dbConn *gorm.DB) context.Context
- func SetRedisClientToContext(ctx context.Context, c *redis.Client) context.Context
- func WithNewRelicForConsumer(nrApp newrelic.Application) func(next HandleFunc) HandleFunc
- type BaseHandler
- type ConsumerManager
- type ConsumerManagerMock
- type FailHandler
- type HandleFunc
- type Handler
- type HandlerFactory
- type HandlerMiddleWare
- type HandlerMock
- type Job
- type JobFactory
- type Publisher
- type PublisherHandlerFunc
- type PublisherHandlerMiddleWare
- type PublisherMock
- type RedisConfig
- type RedisJob
- func (job *RedisJob) Attempt()
- func (job *RedisJob) Delay() int
- func (job *RedisJob) Fail(err error)
- func (job *RedisJob) GetAttempts() int
- func (job *RedisJob) GetFailedError() string
- func (job *RedisJob) GetID() string
- func (job *RedisJob) GetMaxTries() int
- func (job *RedisJob) GetQueue() string
- func (job *RedisJob) GetTraceID() string
- func (job *RedisJob) GetUserID() string
- func (job *RedisJob) HasFailed() bool
- func (job *RedisJob) OnQueue(queue string)
- func (job *RedisJob) Retry(err error)
Constants ¶
View Source
const (
FieldJobPayload = "job"
)
Variables ¶
View Source
var ( ErrFailedWithUnknownData = errors.New("consume failed with unknown data") ErrJobExceedRetryTimes = errors.New("job exceeds retry times") ErrorInValidJobModel = errors.New("invalid job struct") ErrDbEmpty = errors.New("db connection invalid") ErrRedisConnectionEmpty = errors.New("Redis connection invalid") )
Functions ¶
func MakeConsumer ¶
func MakeConsumer( jobFactory JobFactory, handlerFactory HandlerFactory, publisher Publisher, failJobHandlers []FailHandler, middlewareList []HandlerMiddleWare, ) *defaultConsumer
func NewRelicGormWithTransaction ¶
func NewRelicGormWithTransaction(dbConn *gorm.DB) func(next HandleFunc) HandleFunc
func NewRelicToGorm ¶
func NewRelicToGorm(dbConn *gorm.DB) func(next HandleFunc) HandleFunc
func NewRelicToRedis ¶
func NewRelicToRedis(c *redis.Client) func(next HandleFunc) HandleFunc
func NewRmqConn ¶
func NewRmqConn(redisConn *redis.Client) (rmq.Connection, error)
NewRmqConn returns a connection to RedisQueue with redisClient
func NewRmqConnFromRedisConfig ¶
func NewRmqConnFromRedisConfig(redisConfig *RedisConfig) (rmq.Connection, error)
NewRmqConnFromRedisConfig returns a connection to RedisQueue using redisConfig
func SetRedisClientToContext ¶
func WithNewRelicForConsumer ¶
func WithNewRelicForConsumer(nrApp newrelic.Application) func(next HandleFunc) HandleFunc
Types ¶
type BaseHandler ¶
type BaseHandler struct{}
BaseHandler is the base handler; all handlers should embed it.
func (*BaseHandler) Handle ¶
func (handler *BaseHandler) Handle(_ context.Context, _ Job) error
Handle job
func (*BaseHandler) ShouldRejectOnFailure ¶
func (handler *BaseHandler) ShouldRejectOnFailure(err error) bool
Depending on the error, the job is either moved to the rejected queue or simply skipped (ignored).
func (*BaseHandler) ShouldRetryOnError ¶
func (handler *BaseHandler) ShouldRetryOnError(err error) bool
Determines whether a job that failed with this error should be retried or marked as failed.
type ConsumerManager ¶
type ConsumerManager interface {
Add(queueName string, consumer rmq.Consumer)
StartConsuming(queueName string, replicas int, pollDuration time.Duration)
StopConsuming(queueName string)
}
func NewConsumerManager ¶
func NewConsumerManager() (ConsumerManager, error)
NewConsumerManager returns a ConsumerManager
func NewConsumerManagerFromConfig ¶
func NewConsumerManagerFromConfig(conf *RedisConfig) (ConsumerManager, error)
func NewConsumerManagerWithConnection ¶
func NewConsumerManagerWithConnection(conn rmq.Connection) ConsumerManager
type ConsumerManagerMock ¶
type ConsumerManagerMock struct {
AddFn func(queueName string, consumer rmq.Consumer)
StartConsumingFn func(queueName string, replicas int, pollDuration time.Duration)
StopConsumingFn func(queueName string)
}
func (ConsumerManagerMock) Add ¶
func (p ConsumerManagerMock) Add(queueName string, consumer rmq.Consumer)
func (ConsumerManagerMock) StartConsuming ¶
func (p ConsumerManagerMock) StartConsuming(queueName string, replicas int, pollDuration time.Duration)
func (ConsumerManagerMock) StopConsuming ¶
func (p ConsumerManagerMock) StopConsuming(queueName string)
type HandleFunc ¶
type Handler ¶
type Handler interface {
// Handle processes the job. A nil error means the job was processed successfully;
// otherwise the job has failed and method FailJob will be called
Handle(ctx context.Context, job Job) error
// Should retry on error
ShouldRetryOnError(err error) bool
// Should be moved to the rejected queue in case of failure
ShouldRejectOnFailure(err error) bool
}
type HandlerFactory ¶
Makes a handler from the context. A brand-new handler is created for every request.
type HandlerMiddleWare ¶
type HandlerMiddleWare func(next HandleFunc) HandleFunc
type HandlerMock ¶
type HandlerMock struct {
HandleFn func(ctx context.Context, job Job) error
ShouldRetryOnErrorFn func(err error) bool
ShouldRejectOnFailureFn func(err error) bool
}
func (HandlerMock) ShouldRejectOnFailure ¶
func (handler HandlerMock) ShouldRejectOnFailure(err error) bool
func (HandlerMock) ShouldRetryOnError ¶
func (handler HandlerMock) ShouldRetryOnError(err error) bool
type Job ¶
type Job interface {
// Get job ID
GetID() string
// Get user who triggered job
GetUserID() string
// Get trace id of request which triggered job
GetTraceID() string
// Set Queue
OnQueue(queueName string)
// Increase number of attempt times
Attempt()
// Get the number of job's attempt times
GetAttempts() int
// Mark job as failed
Fail(err error)
// Retry on error
Retry(err error)
// Determine if the job has been marked as a failure.
HasFailed() bool
// Get the number of times to attempt a job. Default is 1.
GetMaxTries() int
// Get job's Queue name
GetQueue() string
// Get the delay time in seconds before the job is retried
Delay() int
// Return error string why job failed
GetFailedError() string
}
type JobFactory ¶
type JobFactory func() Job
Returns a new job instance for the consumer to decode the JSON payload into.
type Publisher ¶
type Publisher interface {
Publish(ctx context.Context, job Job) error
PublishOnDelay(ctx context.Context, job Job) error
PublishRejected(ctx context.Context, job Job) error
UseMiddlewares(m ...PublisherHandlerMiddleWare)
}
func NewPublisher ¶
func NewPublisherFromConfig ¶
func NewPublisherFromConfig(conf *RedisConfig) (Publisher, error)
func NewPublisherWithConnection ¶
func NewPublisherWithConnection(conn rmq.Connection) (Publisher, error)
type PublisherHandlerMiddleWare ¶
type PublisherHandlerMiddleWare func(next PublisherHandlerFunc) PublisherHandlerFunc
func WithNewRelicTransaction ¶
func WithNewRelicTransaction() PublisherHandlerMiddleWare
type PublisherMock ¶
type PublisherMock struct {
PublishFn func(queue string, job Job) error
PublishOnDelayFn func(queue string, job Job, delayAt time.Time) error
PublishRejectedFn func(job Job) error
}
func (PublisherMock) PublishOnDelay ¶
func (PublisherMock) PublishRejected ¶
func (p PublisherMock) PublishRejected(job Job) error
type RedisConfig ¶
type RedisConfig struct {
RedisMaster string `envconfig:"REDIS_MASTER" required:"true"`
SentinelHost string `envconfig:"SENTINEL_HOST" required:"true"`
SentinelPort string `envconfig:"SENTINEL_PORT" required:"true"`
RedisMaxActiveConnection int `envconfig:"REDIS_MAX_ACTIVE" required:"false"`
MaxIdle int `envconfig:"REDIS_MAX_IDLE" required:"false"`
}
func GetConfigFromEnv ¶
func GetConfigFromEnv() RedisConfig
func (*RedisConfig) GetSentinelAddress ¶
func (redisConfig *RedisConfig) GetSentinelAddress() string
type RedisJob ¶
type RedisJob struct {
ID string
UserID string
TraceID string
Queue string
Attempts int
Failed bool
Error string
}
func NewRedisJob ¶
func (*RedisJob) GetAttempts ¶
func (*RedisJob) GetFailedError ¶
func (*RedisJob) GetMaxTries ¶
func (*RedisJob) GetTraceID ¶
Click to show internal directories.
Click to hide internal directories.