Documentation
¶
Overview ¶
Package sqsworker implements a SQS consumer that can process sqs messages from a SQS queue and optionally send the results to a result queue.
Overview ¶
The inenteded use of this package is for multiple consumers reading from the same queue. Consumers are represeted by concurrent handler functions hat are managed by the Worker type. This package only does long-polling based sqs recieves.
To use his package, first define a handler function. This can also be a closure:
var handlerFunction = func(ctx context.Context, m *sqs.Message) ([]byte, error) {
return []byte(strings.ToLower(*m.Body)), nil
}
The function must match the following type definition:
type Handler func(context.Context, *sqs.Message) ([]byte, error)
A Worker Struct can be initialized with the NewWorker method, and you may optionally define an outbound queue, and number of concurrent workers. If the number of workers is not set, the number of workers defaults to runtime.NumCPU(). A Timeout in seconds can be set where a handler will fail on a Timeout. The default, if this is not set, is 30 seconds.
sess := session.New(&aws.Config{Region: aws.String("us-east-1")})
w := sqsworker.NewWorker(sess, sqsworker.WorkerConfig{
QueueIn: "https://sqs.us-east-1.amazonaws.com/88888888888/In",
QueueOut: "https://sqs.us-east-1.amazonaws.com/88888888888/Out",
Workers: 4,
Timeout, 30, // Handler Timeout in seconds.
Handler: handlerFunction,
Name: "TestApp",
})
w.Run()
The worker will send messages to the QueuOut queue on succesfull runs.
Concurrency ¶
Handler function will be called concurrently by multiple workers depending on the configuration. It is best to ensure that handler functions can be executed concurrently, especially if they are closures and share state.
Index ¶
Examples ¶
Constants ¶
const DefaultMaxNumberOfMessages = 10
Default amount of messages recieved by each SQS request
const DefaultTimeout = 30
Timeout for each handler function in seconds
const DefaultVisibilityTimeout = 60
SQS visibility Timeout
const DefaultWaitTimeSeconds = 20
Long-polling interval for SQS
const DefaultWorkers = 1
Number of worker goroutines to spawn, each runs the handler function
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type HandlerTimeoutError ¶
type HandlerTimeoutError struct{}
func (HandlerTimeoutError) Error ¶
func (HandlerTimeoutError) Error() string
type Worker ¶
type Worker struct {
QueueInUrl string
QueueOutUrl string
Queue sqsiface.SQSAPI
Session *session.Session
Consumers int
Logger *zap.Logger
Handler Handler
Callback Callback
Name string
Timeout time.Duration
// contains filtered or unexported fields
}
Example ¶
package main
import (
"context"
"github.com/ajbeach2/sqsworker"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"strings"
)
func main() {
var handlerFunction = func(ctx context.Context, m *sqs.Message) ([]byte, error) {
return []byte(strings.ToLower(*m.Body)), nil
}
sess := session.New(&aws.Config{Region: aws.String("us-east-1")})
w := sqsworker.NewWorker(sess, sqsworker.WorkerConfig{
QueueIn: "https://sqs.us-east-1.amazonaws.com/88888888888/In",
QueueOut: "https://sqs.us-east-1.amazonaws.com/88888888888/Out",
Workers: 1,
Handler: handlerFunction,
Name: "TestApp",
})
w.Run()
}
Output: