disq

package module
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2022 License: MIT Imports: 16 Imported by: 0

README

disq

A custom job-queue library for Convoy. It currently provides only in-memory (localstorage) and Redis (stream and list) backends.

Features

  • Redis (Stream and List), and in-memory backends.
  • Message Delay
  • Automatic retries

Usage

Create a broker

You can create a single broker for publishing and consuming messages from a queue. You'll need to create a task and a message first.

import (
    "github.com/frain-dev/disq"
    redisBroker "github.com/frain-dev/disq/brokers/redis"
) 

//Create a Task
var CountHandler, _ = disq.RegisterTask(&disq.TaskOptions{
	Name: "CountHandler",
	Handler: func(name string) error {
		time.Sleep(time.Duration(10) * time.Second)
		fmt.Println("Hello", name)
		return nil
	},
	RetryLimit: 3,
})

//Create a Message
var value = fmt.Sprint("message_", uuid.NewString())
var ctx = context.Background()

var msg = &disq.Message{
    Ctx:      ctx,
    TaskName: CountHandler.Name(),
    Args:     []interface{}{value},
}

// Create a (redis stream) Broker
cfg := redisBroker.RedisConfig{
		Redis:       c, //redis client
		Name:        name, //name of queue
		Concurency:  int32(concurency),
		StreamGroup: "disq:",
	}

var broker = redisBroker.NewStream(&cfg)

// Publish and Consume with the broker
broker.Publish(msg)
broker.Consume(ctx)
broker.Stats() //View consumer stats

Create multiple brokers and assign them to a worker

You can create multiple brokers, create a worker and manage those brokers with the worker.

import (
    "github.com/frain-dev/disq"
) 
//Create a worker
var brokers = []disq.Broker{broker1, broker2, broker3}
var w = disq.NewWorker(brokers)

//start processing messages
var err = w.Start(ctx)
if err != nil {
    log.Fatal(err)
}

//Get stats from all brokers
for i, b := range w.Brokers() {
    var len, _ = b.Len()
    log.Printf("Broker_%d Queue Size: %+v", i, len)
    log.Printf("Broker_%d Stats: %+v\n\n", i, b.Stats())
}

Full example

There is a full working example in the test directory. To run it:

go run test/api/api.go 
go run test/worker/worker.go

Contributing

Please see CONTRIBUTING for details.

License

The MIT License (MIT). Please see License File for more information.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Logger *log.Logger

Functions

func BytesToString added in v0.1.4

func BytesToString(b []byte) string

BytesToString converts byte slice to string.

func ConsumerName

func ConsumerName() string

func Delay

func Delay(msg *Message, msgErr error) time.Duration

func DurEqual

func DurEqual(d1, d2 time.Duration, threshold int) bool

func Scheduler

func Scheduler(name string, c *redis.Client,
	fn func(ctx context.Context) (int, error))

func SetLogger

func SetLogger(logger *log.Logger)

func StringToBytes added in v0.1.4

func StringToBytes(s string) []byte

StringToBytes converts string to byte slice.

func UnixMs

func UnixMs(tm time.Time) int64

func Version

func Version() string

func WithRedisLock

func WithRedisLock(
	ctx context.Context, name string, redis Redis, fn func(ctx context.Context) error,
) error

Types

type Broker

type Broker interface {
	Consume(context.Context)
	Publish(*Message) error
	Process(*Message) error
	FetchN(context.Context, int, time.Duration) ([]Message, error)
	Delete(*Message) error
	Stats() *Stats
	Status() bool
	Len() (int, error)
	Stop() error
}

type Config

type Config interface {
	Init() error
}

type Delayer

type Delayer interface {
	Delay() time.Duration
}

type Handler

type Handler interface {
	HandleMessage(msg *Message) error
}

Handler is an interface for processing messages.

func NewHandler

func NewHandler(fn interface{}) Handler

type HandlerFunc

type HandlerFunc func(*Message) error

func (HandlerFunc) HandleMessage

func (fn HandlerFunc) HandleMessage(msg *Message) error

type Message

type Message struct {
	Ctx context.Context

	ID string

	Name string

	// Delay specifies the duration the queue must wait
	// before executing the message.
	Delay time.Duration

	Args []interface{}

	ArgsBin []byte

	TaskName string

	RetryCount int

	// Execution time needed for localstorage delays
	ExecutionTime time.Time

	Err error
}

Message is used as a uniform object for publishing and consuming messages from a queue.

func NewMessage

func NewMessage(ctx context.Context, args ...interface{}) *Message

func (*Message) MarshalBinary added in v0.1.1

func (m *Message) MarshalBinary() ([]byte, error)

func (*Message) SetDelay

func (m *Message) SetDelay(delay time.Duration)

func (*Message) UnmarshalBinary added in v0.1.1

func (m *Message) UnmarshalBinary(b []byte) error

type MessageRaw

type MessageRaw Message

type Redis

type Redis interface {
	Del(ctx context.Context, keys ...string) *redis.IntCmd
	SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	Pipelined(ctx context.Context, fn func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)

	Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
	EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
	ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
	ScriptLoad(ctx context.Context, script string) *redis.StringCmd

	//Stream and ZSET methods
	TxPipeline() redis.Pipeliner
	XAdd(ctx context.Context, a *redis.XAddArgs) *redis.StringCmd
	XDel(ctx context.Context, stream string, ids ...string) *redis.IntCmd
	XLen(ctx context.Context, stream string) *redis.IntCmd
	XRangeN(ctx context.Context, stream, start, stop string, count int64) *redis.XMessageSliceCmd
	XGroupCreateMkStream(ctx context.Context, stream, group, start string) *redis.StatusCmd
	XReadGroup(ctx context.Context, a *redis.XReadGroupArgs) *redis.XStreamSliceCmd
	XAck(ctx context.Context, stream, group string, ids ...string) *redis.IntCmd
	XPendingExt(ctx context.Context, a *redis.XPendingExtArgs) *redis.XPendingExtCmd
	XTrim(ctx context.Context, key string, maxLen int64) *redis.IntCmd
	XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *redis.IntCmd
	ZAdd(ctx context.Context, key string, members ...*redis.Z) *redis.IntCmd
	ZRangeByScore(ctx context.Context, key string, opt *redis.ZRangeBy) *redis.StringSliceCmd
	ZRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
	XInfoConsumers(ctx context.Context, key string, group string) *redis.XInfoConsumersCmd

	//List methods
	LIndex(ctx context.Context, key string, index int64) *redis.StringCmd
	LLen(ctx context.Context, key string) *redis.IntCmd
	LPop(ctx context.Context, key string) *redis.StringCmd
	LPopCount(ctx context.Context, key string, count int) *redis.StringSliceCmd
	LPush(ctx context.Context, key string, values ...interface{}) *redis.IntCmd
	LRange(ctx context.Context, key string, start, stop int64) *redis.StringSliceCmd
	LRem(ctx context.Context, key string, count int64, value interface{}) *redis.IntCmd
	LTrim(ctx context.Context, key string, start, stop int64) *redis.StatusCmd
}

type Stats

type Stats struct {
	Name      string
	Processed uint32
	Retries   uint32
	Fails     uint32
}

type Task

type Task struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask(opt *TaskOptions) *Task

func RegisterTask

func RegisterTask(opt *TaskOptions) (*Task, error)

func (*Task) HandleMessage

func (t *Task) HandleMessage(msg *Message) error

func (*Task) Name

func (t *Task) Name() string

func (*Task) RetryLimit

func (t *Task) RetryLimit() int

type TaskMap

type TaskMap struct {
	// contains filtered or unexported fields
}
var Tasks TaskMap

func (*TaskMap) LoadTask

func (s *TaskMap) LoadTask(name string) (*Task, error)

func (*TaskMap) RegisterTasks

func (s *TaskMap) RegisterTasks(opts *TaskOptions) (*Task, error)

type TaskOptions

type TaskOptions struct {
	Name string

	Handler interface{}

	RetryLimit int
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(brokers []Broker) *Worker

func (*Worker) AddBroker

func (w *Worker) AddBroker(ctx context.Context, broker Broker)

func (*Worker) Brokers

func (w *Worker) Brokers() []Broker

func (*Worker) Start

func (w *Worker) Start(ctx context.Context) error

func (*Worker) Stats

func (w *Worker) Stats() []*Stats

func (*Worker) Stop

func (w *Worker) Stop() error

Directories

Path Synopsis
brokers
consumer command
publisher command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL