disq

package module
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2022 License: MIT Imports: 15 Imported by: 0

README

disq

A custom job-queue library for Convoy. Provides in-memory (localstorage) and redis (stream and list) backends only for now.

Features

  • Redis (Stream and List), and in-memory backends.
  • Message Delay
  • Automatic retries

Usage

Create a broker

You can create a single broker for publishing and consuming messages from a queue. You'll need to first create a task and a message though.

import (
    "github.com/frain-dev/disq"
    redisBroker "github.com/frain-dev/disq/brokers/redis"
) 

//Create a Task
var CountHandler, _ = disq.RegisterTask(&disq.TaskOptions{
	Name: "CountHandler",
	Handler: func(name string) error {
		time.Sleep(time.Duration(10) * time.Second)
		fmt.Println("Hello", name)
		return nil
	},
	RetryLimit: 3,
})

//Create a Message
var value = fmt.Sprint("message_", uuid.NewString())
var ctx = context.Background()

var msg := &disq.Message{
    Ctx:      ctx,
    TaskName: CountHandler.Name(),
    Args:     []interface{}{value},
}

// Create a (redis stream) Broker
cfg := redisBroker.RedisConfig{
		Redis:       c, //redis client
		Name:        name, //name of queue
		Concurency:  int32(concurency),
		StreamGroup: "disq:",
	}

var broker = redisBroker.NewStream(&cfg)

// Publish and Consume with the broker
broker.publish(msg)
broker.Consume(ctx)
broker.Stats() //View consumer stats

Create multiple brokers and assign them to a worker

You can create multiple brokers, create a worker and manage those brokers with the worker.

import (
    "github.com/frain-dev/disq"
) 
//Create a worker
var brokers = []disq.Broker{broker1, broker2, broker3}
var w = disq.NewWorker(brokers)

//start processing messages
var err = w.Start(ctx)
if err != nil {
    log.Fatal(err)
}

//Get stats from all brokers
for i, b := range w.Brokers() {
    var len, _ = b.Len()
    log.Printf("Broker_%d Queue Size: %+v", i, len)
    log.Printf("Broker_%d Stats: %+v\n\n", i, b.Stats())
}

Full example

There is a full working example in test. To run it;

go run test/api/api.go 
go run test/worker/worker.go

Contributing

Please see CONTRIBUTING for details.

License

The MIT License (MIT). Please see License File for more information.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Logger *log.Logger

Functions

func ConsumerName

func ConsumerName() string

func Delay

func Delay(msg *Message, msgErr error) time.Duration

func DurEqual

func DurEqual(d1, d2 time.Duration, threshold int) bool

func Scheduler

func Scheduler(name string, c *redis.Client,
	fn func(ctx context.Context) (int, error))

func SetLogger

func SetLogger(logger *log.Logger)

func UnixMs

func UnixMs(tm time.Time) int64

func Version

func Version() string

func WithRedisLock

func WithRedisLock(
	ctx context.Context, name string, redis Redis, fn func(ctx context.Context) error,
) error

Types

type Broker

type Broker interface {
	Consume(context.Context)
	Publish(*Message) error
	Process(*Message) error
	FetchN(context.Context, int, time.Duration) ([]Message, error)
	Delete(*Message) error
	Stats() *Stats
	Status() bool
	Len() (int, error)
	Stop() error
}

type Config

type Config interface {
	Init() error
}

type Delayer

type Delayer interface {
	Delay() time.Duration
}

type Handler

type Handler interface {
	HandleMessage(msg *Message) error
}

Handler is an interface for processing messages.

func NewHandler

func NewHandler(fn interface{}) Handler

type HandlerFunc

type HandlerFunc func(*Message) error

func (HandlerFunc) HandleMessage

func (fn HandlerFunc) HandleMessage(msg *Message) error

type Message

type Message struct {
	Ctx context.Context

	ID string

	Name string

	// Delay specifies the duration the queue must wait
	// before executing the message.
	Delay time.Duration

	Args []interface{}

	ArgsBin []byte

	TaskName string

	RetryCount int

	//Execution time need for localstorage delays
	ExecutionTime time.Time

	Err error
}

Message is used as a uniform object for publishing and consuming messages from a queue.

func NewMessage

func NewMessage(ctx context.Context, args ...interface{}) *Message

func (*Message) MarshalBinary added in v0.1.1

func (m *Message) MarshalBinary() ([]byte, error)

func (*Message) SetDelay

func (m *Message) SetDelay(delay time.Duration)

func (*Message) UnmarshalBinary added in v0.1.1

func (m *Message) UnmarshalBinary(b []byte) error

type MessageRaw

type MessageRaw Message

type Redis

type Redis interface {
	Del(ctx context.Context, keys ...string) *redis.IntCmd
	SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	Pipelined(ctx context.Context, fn func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)

	Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
	EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
	ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
	ScriptLoad(ctx context.Context, script string) *redis.StringCmd

	//Stream and ZSET methods
	TxPipeline() redis.Pipeliner
	XAdd(ctx context.Context, a *redis.XAddArgs) *redis.StringCmd
	XDel(ctx context.Context, stream string, ids ...string) *redis.IntCmd
	XLen(ctx context.Context, stream string) *redis.IntCmd
	XRangeN(ctx context.Context, stream, start, stop string, count int64) *redis.XMessageSliceCmd
	XGroupCreateMkStream(ctx context.Context, stream, group, start string) *redis.StatusCmd
	XReadGroup(ctx context.Context, a *redis.XReadGroupArgs) *redis.XStreamSliceCmd
	XAck(ctx context.Context, stream, group string, ids ...string) *redis.IntCmd
	XPendingExt(ctx context.Context, a *redis.XPendingExtArgs) *redis.XPendingExtCmd
	XTrim(ctx context.Context, key string, maxLen int64) *redis.IntCmd
	XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *redis.IntCmd
	ZAdd(ctx context.Context, key string, members ...*redis.Z) *redis.IntCmd
	ZRangeByScore(ctx context.Context, key string, opt *redis.ZRangeBy) *redis.StringSliceCmd
	ZRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
	XInfoConsumers(ctx context.Context, key string, group string) *redis.XInfoConsumersCmd

	//List methods
	LIndex(ctx context.Context, key string, index int64) *redis.StringCmd
	LLen(ctx context.Context, key string) *redis.IntCmd
	LPop(ctx context.Context, key string) *redis.StringCmd
	LPopCount(ctx context.Context, key string, count int) *redis.StringSliceCmd
	LPush(ctx context.Context, key string, values ...interface{}) *redis.IntCmd
	LRange(ctx context.Context, key string, start, stop int64) *redis.StringSliceCmd
	LRem(ctx context.Context, key string, count int64, value interface{}) *redis.IntCmd
	LTrim(ctx context.Context, key string, start, stop int64) *redis.StatusCmd
}

type Stats

type Stats struct {
	Name      string
	Processed uint32
	Retries   uint32
	Fails     uint32
}

type Task

type Task struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask(opt *TaskOptions) *Task

func RegisterTask

func RegisterTask(opt *TaskOptions) (*Task, error)

func (*Task) HandleMessage

func (t *Task) HandleMessage(msg *Message) error

func (*Task) Name

func (t *Task) Name() string

func (*Task) RetryLimit

func (t *Task) RetryLimit() int

type TaskMap

type TaskMap struct {
	// contains filtered or unexported fields
}
var Tasks TaskMap

func (*TaskMap) LoadTask

func (s *TaskMap) LoadTask(name string) (*Task, error)

func (*TaskMap) RegisterTasks

func (s *TaskMap) RegisterTasks(opts *TaskOptions) (*Task, error)

type TaskOptions

type TaskOptions struct {
	Name string

	Handler interface{}

	RetryLimit int
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(brokers []Broker) *Worker

func (*Worker) AddBroker

func (w *Worker) AddBroker(ctx context.Context, broker Broker)

func (*Worker) Brokers

func (w *Worker) Brokers() []Broker

func (*Worker) Start

func (w *Worker) Start(ctx context.Context) error

func (*Worker) Stats

func (w *Worker) Stats() []*Stats

func (*Worker) Stop

func (w *Worker) Stop() error

Directories

Path Synopsis
brokers
consumer command
publisher command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL