sqsworker

v1.0.1
Warning

This package is not in the latest version of its module.

Published: May 31, 2019 License: MIT Imports: 10 Imported by: 0

README

sqsworker

A concurrent SQS consumer written in Go. ⚠️WARNING⚠️: This repo is under active development, and there may be rapid and incompatible changes.


Documentation

The Worker type represents an SQS consumer that processes messages from an SQS queue and optionally sends the results to an SNS topic. The intended use is multiple concurrent consumers reading from the same queue, each executing the handler function defined on the Worker struct.

To use this package, first define a handler function. This can also be a closure:

var handlerFunction = func(ctx context.Context, m *sqs.Message, w *sns.PublishInput) error {
	*w.Message = strings.ToLower(*m.Body)
	return nil
}

The function must match the following type definition:

type Handler func(context.Context, *sqs.Message, *sns.PublishInput) error

A Worker struct can be initialized with the NewWorker function. You may optionally define an outbound topic and the number of concurrent workers. If the number of workers is not set, it defaults to runtime.NumCPU().

package main

import (
	"context"
	"github.com/ajbeach2/sqsworker"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sns"
	"github.com/aws/aws-sdk-go/service/sqs"
	"strings"
)

func ExampleWorker() {
	var handlerFunction = func(ctx context.Context, m *sqs.Message, w *sns.PublishInput) error {
		*w.Message = strings.ToLower(*m.Body)
		return nil
	}

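	// session.New is deprecated in aws-sdk-go; NewSession reports configuration
	// errors and session.Must panics if one occurs.
	sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-east-1")}))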

	w := sqsworker.NewWorker(sess, sqsworker.WorkerConfig{
		QueueUrl: "https://sqs.us-east-1.amazonaws.com/88888888888/In",
		TopicArn: "arn:aws:sns:us-east-1:88888888888:out",
		Workers:  1,
		Handler:  handlerFunction,
		Name:     "TestApp",
	})

	w.Run()
}

The worker will send messages to the TopicArn topic on successful runs.

Concurrency

The handler function is called concurrently by multiple workers, depending on the configuration. Make sure the handler function is safe for concurrent execution, especially if it is a closure that shares state.

Performance

Real-world performance will be dictated by latency to SQS. The benchmarks mock SQS and SNS calls to illustrate that the package adds very little overhead to consuming messages, and to ensure that memory is managed so as not to create more garbage collection than needed.

From the SQS documentation in AWS: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-throughput-horizontal-scaling-and-batching.html

Because you access Amazon SQS through an HTTP request-response protocol, the request latency (the interval between initiating a request and receiving a response) limits the throughput that you can achieve from a single thread using a single connection. For example, if the latency from an Amazon EC2-based client to Amazon SQS in the same region averages 20 ms, the maximum throughput from a single thread over a single connection averages 50 TPS.

->cat /proc/cpuinfo | grep "model name" | head -1
model name	: Intel(R) Core(TM) i5-6600K CPU @ 3.50GHz

->go test -bench .
goos: linux
goarch: amd64
pkg: github.com/ajbeach2/sqsworker
BenchmarkWorker-4   	 2000000	       840 ns/op	      64 B/op	       1 allocs/op
PASS
ok  	github.com/ajbeach2/sqsworker	2.510s

Documentation

Overview

Package sqsworker implements an SQS consumer that can process messages from an SQS queue and optionally send the results to a result topic.

The intended use of this package is for multiple consumers reading from the same queue. Consumers are represented by concurrent handler functions that are managed by the Worker type. This package only does long-polling based SQS receives.

To use this package, first define a handler function.

var handlerFunction = func(ctx context.Context, m *sqs.Message, w *sns.PublishInput) error {
	*w.Message = strings.ToLower(*m.Body)
	return nil
}

The function must match the following type definition:

type Handler func(context.Context, *sqs.Message, *sns.PublishInput) error

A Worker struct can be initialized with the NewWorker function. You may optionally define an outbound topic and the number of concurrent workers. If the number of workers is not set, it defaults to runtime.NumCPU().

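// session.New is deprecated in aws-sdk-go; NewSession reports configuration
// errors and session.Must panics if one occurs.
sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-east-1")}))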
w := sqsworker.NewWorker(sess, sqsworker.WorkerConfig{
	QueueUrl: "https://sqs.us-east-1.amazonaws.com/88888888888/In",
	TopicArn: "arn:aws:sns:us-east-1:88888888888:out",
	Workers:  4,
	Handler:  handlerFunction,
	Name:     "TestApp",
})
w.Run()

The worker will send messages to the TopicArn on successful runs.

Concurrency

The handler function is called concurrently by multiple workers, depending on the configuration. Make sure handler functions are safe for concurrent execution, especially if they are closures that share state.

Constants

const DefaultMaxNumberOfMessages = 10

DefaultMaxNumberOfMessages is the number of messages received by each SQS request.

const DefaultVisibilityTimeout = 60

DefaultVisibilityTimeout is the SQS visibility timeout.

const DefaultWaitTimeSeconds = 20

DefaultWaitTimeSeconds is the long-polling interval for SQS.

const DefaultWorkers = 1

DefaultWorkers is the number of worker goroutines to spawn; each runs the handler function.

Variables

This section is empty.

Functions

This section is empty.

Types

type Callback

type Callback func(*string, error)

Callback is passed the result from the handler on success.

type Handler

type Handler func(context.Context, *sqs.Message, *sns.PublishInput) error

Handler for SQS consumers

type Worker

type Worker struct {
	QueueUrl  string
	TopicArn  string
	Queue     sqsiface.SQSAPI
	Topic     snsiface.SNSAPI
	Session   *session.Session
	Consumers int
	Logger    *zap.Logger
	Handler   Handler
	Callback  Callback
	Name      string
	// contains filtered or unexported fields
}

Worker encapsulates the SQS consumer

Example
package main

import (
	"context"
	"github.com/ajbeach2/sqsworker"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sns"
	"github.com/aws/aws-sdk-go/service/sqs"
	"strings"
)

func main() {
	var handlerFunction = func(ctx context.Context, m *sqs.Message, w *sns.PublishInput) error {
		*w.Message = strings.ToLower(*m.Body)
		return nil
	}

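	// session.New is deprecated in aws-sdk-go; NewSession reports configuration
	// errors and session.Must panics if one occurs.
	sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-east-1")}))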

	w := sqsworker.NewWorker(sess, sqsworker.WorkerConfig{
		QueueUrl: "https://sqs.us-east-1.amazonaws.com/88888888888/In",
		TopicArn: "arn:aws:sns:us-east-1:88888888888:out",
		Workers:  1,
		Handler:  handlerFunction,
		Name:     "TestApp",
	})

	w.Run()
}

func NewWorker

func NewWorker(sess *session.Session, wc WorkerConfig) *Worker

NewWorker constructor for SQS Worker

func (*Worker) Close

func (w *Worker) Close()

Close function will send a signal to all workers to exit

func (*Worker) Run

func (w *Worker) Run()

Run does the main consumer/producer loop

type WorkerConfig

type WorkerConfig struct {
	QueueUrl string
	TopicArn string
	// If the number of workers is 0, the number of workers defaults to runtime.NumCPU()
	Workers  int
	Handler  Handler
	Callback Callback
	Name     string
	Logger   *zap.Logger
}

WorkerConfig holds the settings for a Worker, to be passed to the NewWorker constructor.
