Documentation
¶
Index ¶
- Variables
- func BytesToString(b []byte) string
- func ConsumerName() string
- func DurEqual(d1, d2 time.Duration, threshold int) bool
- func ErrorHandler(msg *Message, msgErr error, retries *uint32) error
- func FormatHandlerError(msg *Message, retrylimit int) error
- func Scheduler(name string, c *redis.Client, fn func(ctx context.Context) (int, error))
- func SetLogger(logger *log.Logger)
- func StringToBytes(s string) []byte
- func UnixMs(tm time.Time) int64
- func Version() string
- func WithRedisLock(ctx context.Context, name string, redis Redis, ...) error
- type Broker
- type Config
- type DisqError
- type Error
- type Handler
- type HandlerFunc
- type Message
- type MessageRaw
- type Redis
- type Stats
- type Task
- type TaskMap
- type TaskOptions
Constants ¶
This section is empty.
Variables ¶
View Source
var Logger *log.Logger
Functions ¶
func BytesToString ¶ added in v0.1.4
BytesToString converts a byte slice to a string.
func ConsumerName ¶
func ConsumerName() string
func ErrorHandler ¶ added in v0.1.6
func FormatHandlerError ¶ added in v0.1.6
func StringToBytes ¶ added in v0.1.4
StringToBytes converts a string to a byte slice.
Types ¶
type Error ¶ added in v0.1.6
func (*Error) GetRateLimit ¶ added in v0.1.6
type Handler ¶
Handler is an interface for processing messages.
func NewHandler ¶
func NewHandler(fn interface{}) Handler
type HandlerFunc ¶
func (HandlerFunc) HandleMessage ¶
func (fn HandlerFunc) HandleMessage(msg *Message) error
type Message ¶
// Message carries the data for a single queue message. Every field except
// Ctx has an explicit msgpack tag that names it in the encoded form.
type Message struct {
// Ctx is tagged `msgpack:"-"`; by the usual struct-tag convention this
// leaves it out of the msgpack encoding.
Ctx context.Context `msgpack:"-"`
ID string `msgpack:"ID"`
Name string `msgpack:"Name"`
// Delay specifies the duration the queue must wait
// before executing the message.
Delay time.Duration `msgpack:"Delay"`
// NOTE(review): presumably ArgsBin holds the encoded form of Args
// (see MarshalArgs) — confirm against the implementation.
Args []interface{} `msgpack:"Args"`
ArgsBin []byte `msgpack:"ArgsBin"`
TaskName string `msgpack:"TaskName"`
// NOTE(review): presumably the number of retries already attempted — confirm.
RetryCount int `msgpack:"RetryCount"`
// ExecutionTime is the execution time needed for local-storage delays.
ExecutionTime time.Time `msgpack:"ExecutionTime"`
// NOTE(review): Err is an interface value carrying a msgpack tag; confirm
// how (or whether) it round-trips through encoding.
Err error `msgpack:"Err"`
}
Message is used as a uniform object for publishing and consuming messages from a queue.
func NewMessage ¶
func (*Message) MarshalArgs ¶ added in v0.1.5
func (*Message) MarshalBinary ¶ added in v0.1.1
func (*Message) UnmarshalBinary ¶ added in v0.1.1
type MessageRaw ¶
// MessageRaw is a defined type with the same underlying struct as Message.
// As a distinct defined type it does not carry over the methods declared on
// Message (such as MarshalBinary and UnmarshalBinary).
type MessageRaw Message
type Redis ¶
// Redis is the set of Redis client methods accepted by this package (for
// example by WithRedisLock). The methods are grouped below into key/script
// commands, stream and sorted-set commands, and list commands; every method
// returns a command/result type from the redis package.
type Redis interface {
// Key, pipeline, and script methods.
Del(ctx context.Context, keys ...string) *redis.IntCmd
SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
Pipelined(ctx context.Context, fn func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)
Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
ScriptLoad(ctx context.Context, script string) *redis.StringCmd
// Stream (X*) and sorted-set (Z*) methods. TxPipeline is also declared in
// this group; unlike the other methods it takes no context argument.
TxPipeline() redis.Pipeliner
XAdd(ctx context.Context, a *redis.XAddArgs) *redis.StringCmd
XDel(ctx context.Context, stream string, ids ...string) *redis.IntCmd
XLen(ctx context.Context, stream string) *redis.IntCmd
XRangeN(ctx context.Context, stream, start, stop string, count int64) *redis.XMessageSliceCmd
XRange(ctx context.Context, stream, start, stop string) *redis.XMessageSliceCmd
XGroupCreateMkStream(ctx context.Context, stream, group, start string) *redis.StatusCmd
XReadGroup(ctx context.Context, a *redis.XReadGroupArgs) *redis.XStreamSliceCmd
XAck(ctx context.Context, stream, group string, ids ...string) *redis.IntCmd
XInfoStream(ctx context.Context, key string) *redis.XInfoStreamCmd
XPendingExt(ctx context.Context, a *redis.XPendingExtArgs) *redis.XPendingExtCmd
XPending(ctx context.Context, stream, group string) *redis.XPendingCmd
XTrim(ctx context.Context, key string, maxLen int64) *redis.IntCmd
XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *redis.IntCmd
ZAdd(ctx context.Context, key string, members ...*redis.Z) *redis.IntCmd
ZRangeByScore(ctx context.Context, key string, opt *redis.ZRangeBy) *redis.StringSliceCmd
ZRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
XInfoConsumers(ctx context.Context, key string, group string) *redis.XInfoConsumersCmd
// List (L*) methods.
LIndex(ctx context.Context, key string, index int64) *redis.StringCmd
LLen(ctx context.Context, key string) *redis.IntCmd
LPop(ctx context.Context, key string) *redis.StringCmd
LPopCount(ctx context.Context, key string, count int) *redis.StringSliceCmd
LPush(ctx context.Context, key string, values ...interface{}) *redis.IntCmd
LRange(ctx context.Context, key string, start, stop int64) *redis.StringSliceCmd
LRem(ctx context.Context, key string, count int64, value interface{}) *redis.IntCmd
LTrim(ctx context.Context, key string, start, stop int64) *redis.StatusCmd
}
type Task ¶
// Task exposes no exported fields. Values are obtained as *Task from NewTask
// or RegisterTask, both of which take a *TaskOptions.
type Task struct {
// contains filtered or unexported fields
}
func NewTask ¶
func NewTask(opt *TaskOptions) *Task
func RegisterTask ¶
func RegisterTask(opt *TaskOptions) (*Task, error)
func (*Task) HandleMessage ¶
func (*Task) RetryLimit ¶
type TaskMap ¶
// TaskMap exposes no exported fields. The package-level variable Tasks has
// this type, and RegisterTasks is declared on *TaskMap.
type TaskMap struct {
// contains filtered or unexported fields
}
var Tasks TaskMap
func (*TaskMap) RegisterTasks ¶
func (s *TaskMap) RegisterTasks(opts *TaskOptions) (*Task, error)
type TaskOptions ¶
Source Files
¶
Click to show internal directories.
Click to hide internal directories.