Documentation
¶
Overview ¶
Package msgqueue implements an SQS & IronMQ client with rate limiting and call-once semantics.
Example (CustomRateLimit) ¶
package main
import (
"fmt"
"time"
"github.com/go-msgqueue/msgqueue"
"github.com/go-msgqueue/msgqueue/memqueue"
)
// RateLimitError is a string-backed error that also reports a fixed delay
// through its Delay method.
type RateLimitError string

// Error returns the underlying string as the error message.
func (rle RateLimitError) Error() string {
	msg := string(rle)
	return msg
}

// Delay always returns three seconds, regardless of the error text.
func (RateLimitError) Delay() time.Duration {
	const secs = 3
	return secs * time.Second
}
// main enqueues a single call whose handler always fails with a
// RateLimitError and prints how long after start each attempt ran.
func main() {
	start := time.Now()
	q := memqueue.NewQueue(&msgqueue.Options{
		Handler: func() error {
			// Truncate to whole seconds inline: this program declares no
			// timeSince helper and does not import math, so calling one
			// here would not compile.
			fmt.Println("retried in", time.Since(start).Truncate(time.Second))
			return RateLimitError("calm down")
		},
		RetryLimit: 2,
		MinBackoff: time.Millisecond,
	})
	defer q.Close()

	// Stop the processor, enqueue the call, then process explicitly.
	q.Processor().Stop()
	q.Call()
	q.Processor().ProcessAll()
}
Output: retried in 0s retried in 3s
Example (MaxWorkers) ¶
package main
import (
"fmt"
"math"
"time"
"github.com/go-redis/redis"
"github.com/go-msgqueue/msgqueue"
"github.com/go-msgqueue/msgqueue/memqueue"
)
// redisRing builds a redis.Ring with a single shard "0" at ":6379" and a pool
// size of 100, calls FlushDb on it, and panics if that call reports an error.
func redisRing() *redis.Ring {
	opt := &redis.RingOptions{
		Addrs:    map[string]string{"0": ":6379"},
		PoolSize: 100,
	}
	ring := redis.NewRing(opt)
	if err := ring.FlushDb().Err(); err != nil {
		panic(err)
	}
	return ring
}
// timeSince returns the time elapsed since start, rounded down to a whole
// number of seconds.
func timeSince(start time.Time) time.Duration {
	elapsed := time.Since(start)
	wholeSecs := math.Floor(float64(elapsed) / float64(time.Second))
	return time.Duration(wholeSecs) * time.Second
}
func main() {
start := time.Now()
q := memqueue.NewQueue(&msgqueue.Options{
Handler: func() {
fmt.Println(timeSince(start))
time.Sleep(time.Second)
},
Redis: redisRing(),
WorkerLimit: 1,
})
for i := 0; i < 3; i++ {
q.Call()
}
// Close queue to make sure all messages are processed.
_ = q.Close()
}
Output: 0s 1s 2s
Example (MessageDelay) ¶
package main
import (
"fmt"
"math"
"time"
"github.com/go-msgqueue/msgqueue"
"github.com/go-msgqueue/msgqueue/memqueue"
)
// timeSince reports how long ago start was, floored to whole seconds.
func timeSince(start time.Time) time.Duration {
	elapsed := time.Since(start)
	wholeSecs := math.Floor(float64(elapsed) / float64(time.Second))
	return time.Duration(wholeSecs) * time.Second
}
func main() {
start := time.Now()
q := memqueue.NewQueue(&msgqueue.Options{
Handler: func() {
fmt.Println("processed with delay", timeSince(start))
},
})
defer q.Close()
q.Processor().Stop()
msg := msgqueue.NewMessage()
msg.Delay = time.Second
q.Add(msg)
q.Processor().ProcessAll()
}
Output: processed with delay 1s
Example (Once) ¶
package main
import (
"fmt"
"time"
"github.com/go-redis/redis"
"golang.org/x/time/rate"
"github.com/go-msgqueue/msgqueue"
"github.com/go-msgqueue/msgqueue/memqueue"
)
// redisRing returns a redis.Ring configured with one shard ("0" -> ":6379")
// and PoolSize 100. It calls FlushDb first and panics if that fails.
func redisRing() *redis.Ring {
	opt := &redis.RingOptions{
		Addrs:    map[string]string{"0": ":6379"},
		PoolSize: 100,
	}
	ring := redis.NewRing(opt)
	if err := ring.FlushDb().Err(); err != nil {
		panic(err)
	}
	return ring
}
func main() {
q := memqueue.NewQueue(&msgqueue.Options{
Handler: func(name string) {
fmt.Println("hello", name)
},
Redis: redisRing(),
RateLimit: rate.Every(time.Second),
})
for _, name := range []string{"world", "adele"} {
for i := 0; i < 10; i++ {
// Call once in a second.
q.CallOnce(time.Second, name)
}
}
// Close queue to make sure all messages are processed.
_ = q.Close()
}
Output: hello world hello adele
Example (RateLimit) ¶
package main
import (
"fmt"
"math"
"time"
"github.com/go-redis/redis"
"golang.org/x/time/rate"
"github.com/go-msgqueue/msgqueue"
"github.com/go-msgqueue/msgqueue/memqueue"
)
// redisRing creates a one-shard redis.Ring ("0" at ":6379", PoolSize 100),
// invokes FlushDb on it, and panics when FlushDb returns an error.
func redisRing() *redis.Ring {
	opt := &redis.RingOptions{
		Addrs:    map[string]string{"0": ":6379"},
		PoolSize: 100,
	}
	ring := redis.NewRing(opt)
	if err := ring.FlushDb().Err(); err != nil {
		panic(err)
	}
	return ring
}
// timeSinceCeil returns the time elapsed since start, rounded up to a whole
// number of seconds.
func timeSinceCeil(start time.Time) time.Duration {
	elapsed := time.Since(start)
	wholeSecs := math.Ceil(float64(elapsed) / float64(time.Second))
	return time.Duration(wholeSecs) * time.Second
}
func main() {
start := time.Now()
q := memqueue.NewQueue(&msgqueue.Options{
Handler: func() {
fmt.Println(timeSinceCeil(start))
},
Redis: redisRing(),
RateLimit: rate.Every(time.Second),
})
for i := 0; i < 5; i++ {
q.Call()
}
// Close queue to make sure all messages are processed.
_ = q.Close()
}
Output: 1s 1s 2s 3s 4s
Example (RetryOnError) ¶
package main
import (
"errors"
"fmt"
"math"
"time"
"github.com/go-msgqueue/msgqueue"
"github.com/go-msgqueue/msgqueue/memqueue"
)
// timeSince gives the elapsed time since start, rounded down to whole seconds.
func timeSince(start time.Time) time.Duration {
	elapsed := time.Since(start)
	wholeSecs := math.Floor(float64(elapsed) / float64(time.Second))
	return time.Duration(wholeSecs) * time.Second
}
func main() {
start := time.Now()
q := memqueue.NewQueue(&msgqueue.Options{
Handler: func() error {
fmt.Println("retried in", timeSince(start))
return errors.New("fake error")
},
RetryLimit: 3,
MinBackoff: time.Second,
})
defer q.Close()
q.Processor().Stop()
q.Call()
q.Processor().ProcessAll()
}
Output: retried in 0s retried in 1s retried in 3s
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
// ErrDuplicate is the error value whose message reports that a message with
// the same name already exists.
var ErrDuplicate = errors.New("queue: message with such name already exists")
Functions ¶
This section is empty.
Types ¶
type Handler ¶
func NewHandler ¶
func NewHandler(fn interface{}) Handler
type HandlerFunc ¶
func (HandlerFunc) HandleMessage ¶
func (fn HandlerFunc) HandleMessage(msg *Message) error
type Message ¶
// Message is used to create and retrieve messages from a queue.
type Message struct {
// SQS/IronMQ message id.
Id string
// Optional name for the message. Messages with the same name
// are processed only once.
Name string
// Delay specifies the duration the queue must wait
// before executing the message.
Delay time.Duration
// Function args passed to the handler.
Args []interface{}
// Text representation of the Args.
Body string
// SQS/IronMQ reservation id that is used to release/delete the message.
ReservationId string
// The number of times the message has been reserved or released.
ReservedCount int
}
Message is used to create and retrieve messages from a queue.
func NewMessage ¶
func NewMessage(args ...interface{}) *Message
func (*Message) MarshalArgs ¶
func (*Message) SetDelayName ¶
SetDelayName sets delay and generates message name from the args.
type Options ¶
// Options holds the configuration of a queue: its name and group, the
// message handlers, worker and buffer sizing, reservation and retry/backoff
// settings, rate limiting, and the Redis/storage backends.
type Options struct {
// Queue name.
Name string
// Queue group name.
GroupName string
// Function called to process a message.
Handler interface{}
// Function called to process failed message.
FallbackHandler interface{}
// Number of goroutines processing messages.
WorkerNumber int
// Global limit of concurrently running workers. Overrides WorkerNumber.
WorkerLimit int
// Size of the buffer where reserved messages are stored.
BufferSize int
// Time after which the reserved message is returned to the queue.
ReservationTimeout time.Duration
// Number of tries/releases after which the message fails permanently
// and is deleted.
RetryLimit int
// Minimum backoff time between retries.
MinBackoff time.Duration
// Maximum backoff time between retries.
MaxBackoff time.Duration
// Processing rate limit.
RateLimit rate.Limit
// Redis client that is used for storing metadata.
Redis Redis
// Optional storage interface. The default is to use Redis.
Storage Storage
// Optional rate limiter interface. The default is to use Redis.
RateLimiter RateLimiter
// contains filtered or unexported fields
}
type RateLimiter ¶
type Redis ¶
// Redis is the set of Redis client methods accepted by Options.Redis, which
// Options documents as the client used for storing metadata.
type Redis interface {
Del(keys ...string) *redis.IntCmd
SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
SAdd(key string, members ...interface{}) *redis.IntCmd
SMembers(key string) *redis.StringSliceCmd
Pipelined(func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)
Eval(script string, keys []string, args ...interface{}) *redis.Cmd
Publish(channel, message string) *redis.IntCmd
}
Click to show internal directories.
Click to hide internal directories.