sqsworker

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2019 License: MIT Imports: 9 Imported by: 0

README

sqsworker

Concurrent SQS Consumer written on Go

CircleCI GoDoc

Documentation

The Worker type represents a SQS consumer that can process sqs messages from a SQS queue and optionally send the results to a queue. The inenteded use is multiple concurrent consumers reading from the same queue which execute the hander function defined on the Worker struct.

To use his package, first define a handler function. This can also be a closure:

var handlerFunction = func(ctx context.Context, m *sqs.Message) ([]byte, error) {
	return []byte(strings.ToLower(*m.Body)), nil
}

The function must match the following type definition:

type Handler func(context.Context, *sqs.Message) ([]byte, error)

A Worker Struct can be initialized with the NewWorker method, and you may optionally define an outbound queue, and number of concurrent workers. If the number of workers is not set, the number of workers defaults to runtime.NumCPU(). A Timeout in seconds can be set where a handler will fail on a Timeout. The default, if this is not set, is 30 seconds.

sess := session.New(&aws.Config{Region: aws.String("us-east-1")})
w := sqsworker.NewWorker(sess, sqsworker.WorkerConfig{
	QueueIn:  "https:sqs.us-east-1.amazonaws.com/88888888888/In",
	QueueOut: "https:sqs.us-east-1.amazonaws.com/88888888888/Out",
	Workers:  4,
	Timeout,  30, // Handler Timeout in seconds.
	Handler:  handlerFunction,
	Name:     "TestApp",
})
w.Run()

The worker will send messages to the QueueOut queue on succesfull runs.

Concurrency

Handler function will be called concurrently by multiple workers depending on the configuration, and it is best to ensure that handler function can be executed concurrently, especially if it is a closure and there is shared state.

Performance

Real world performace will be dictated by latency to sqs. The benchmarks mock sqs calls to illustrate that the package adds very little overhead to consuming messages, and to ensure that memory is managed to not create more garbage collection than needed.

From the SQS documentation in AWS: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-throughput-horizontal-scaling-and-batching.html

Because you access Amazon SQS through an HTTP request-response protocol, the request latency (the interval between initiating a request and receiving a response) limits the throughput that you can achieve from a single thread using a single connection. For example, if the latency from an Amazon EC2-based client to Amazon SQS in the same region averages 20 ms, the maximum throughput from a single thread over a single connection averages 50 TPS.

->cat /proc/cpuinfo | grep "model name" | head -1
model name	: Intel(R) Core(TM) i5-6600K CPU @ 3.50GHz

->go test -bench .
goos: linux
goarch: amd64
pkg: github.com/ajbeach2/sqsworker
BenchmarkWorker-4   	 1000000	      1395 ns/op	       0 B/op	       0 allocs/op
PASS
ok  	github.com/ajbeach2/sqsworker	2.427s

Documentation

Overview

Package sqsworker implements a SQS consumer that can process sqs messages from a SQS queue and optionally send the results to a result queue.

Overview

The inenteded use of this package is for multiple consumers reading from the same queue. Consumers are represeted by concurrent handler functions hat are managed by the Worker type. This package only does long-polling based sqs recieves.

To use his package, first define a handler function. This can also be a closure:

var handlerFunction = func(ctx context.Context, m *sqs.Message) ([]byte, error) {
	return []byte(strings.ToLower(*m.Body)), nil
}

The function must match the following type definition:

type Handler func(context.Context, *sqs.Message) ([]byte, error)

A Worker Struct can be initialized with the NewWorker method, and you may optionally define an outbound queue, and number of concurrent workers. If the number of workers is not set, the number of workers defaults to runtime.NumCPU(). A Timeout in seconds can be set where a handler will fail on a Timeout. The default, if this is not set, is 30 seconds.

sess := session.New(&aws.Config{Region: aws.String("us-east-1")})
w := sqsworker.NewWorker(sess, sqsworker.WorkerConfig{
	QueueIn:  "https://sqs.us-east-1.amazonaws.com/88888888888/In",
	QueueOut: "https://sqs.us-east-1.amazonaws.com/88888888888/Out",
	Workers:  4,
	Timeout,  30, // Handler Timeout in seconds.
	Handler:  handlerFunction,
	Name:     "TestApp",
})
w.Run()

The worker will send messages to the QueuOut queue on succesfull runs.

Concurrency

Handler function will be called concurrently by multiple workers depending on the configuration. It is best to ensure that handler functions can be executed concurrently, especially if they are closures and share state.

Index

Examples

Constants

View Source
const DefaultMaxNumberOfMessages = 10

Default amount of messages recieved by each SQS request

View Source
const DefaultTimeout = 30

Timeout for each handler function in seconds

View Source
const DefaultVisibilityTimeout = 60

SQS visibility Timeout

View Source
const DefaultWaitTimeSeconds = 20

Long-polling interval for SQS

View Source
const DefaultWorkers = 1

Number of worker goroutines to spawn, each runs the handler function

Variables

This section is empty.

Functions

This section is empty.

Types

type Callback

type Callback func([]byte, error)

type Handler

type Handler func(context.Context, *sqs.Message) ([]byte, error)

type HandlerTimeoutError

type HandlerTimeoutError struct{}

func (HandlerTimeoutError) Error

func (HandlerTimeoutError) Error() string

type Worker

type Worker struct {
	QueueInUrl  string
	QueueOutUrl string
	Queue       sqsiface.SQSAPI
	Session     *session.Session
	Consumers   int
	Logger      *zap.Logger
	Handler     Handler
	Callback    Callback
	Name        string
	Timeout     time.Duration
	// contains filtered or unexported fields
}
Example
package main

import (
	"context"
	"github.com/ajbeach2/sqsworker"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"strings"
)

func main() {
	var handlerFunction = func(ctx context.Context, m *sqs.Message) ([]byte, error) {
		return []byte(strings.ToLower(*m.Body)), nil
	}

	sess := session.New(&aws.Config{Region: aws.String("us-east-1")})

	w := sqsworker.NewWorker(sess, sqsworker.WorkerConfig{
		QueueIn:  "https://sqs.us-east-1.amazonaws.com/88888888888/In",
		QueueOut: "https://sqs.us-east-1.amazonaws.com/88888888888/Out",
		Workers:  1,
		Handler:  handlerFunction,
		Name:     "TestApp",
	})

	w.Run()
}

func NewWorker

func NewWorker(sess *session.Session, wc WorkerConfig) *Worker

func (*Worker) Close

func (w *Worker) Close()

func (*Worker) Run

func (w *Worker) Run()

type WorkerConfig

type WorkerConfig struct {
	QueueIn  string
	QueueOut string
	// If the number of workers is 0, the number of workers defaults to runtime.NumCPU()
	Workers  int
	Region   string
	Handler  Handler
	Callback Callback
	Name     string
	Timeout  int
	Logger   *zap.Logger
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL