sqsprocessor

package module
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2023 License: MIT Imports: 6 Imported by: 0

README

SQS Processor

Attempt to wrap AWS SQS client with functionality similar to that of the gcloud pub/sub client.

Usage

The processor will simply run a given ProcessFunc over any messages found on a given SQS queue. ProcessFunc takes the sqs message body in as a string and decides how to decode and action on the message.

The ProcessFunc must return either ProcessResultAck or ProcessResultNack. Ack implies a success and leads to the message being deleted from the queue, whereas Nack will re-publish the message to the queue.

Example
import (
    "context"
    "time"

    "github.com/barrett370/sqs-processor/middleware"
    sqsprocessor "github.com/barrett370/sqs-processor"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
)

type messageBody struct {
    ID string
    Action EnumType
}

func (s *service) process(ctx context.Context, message messageBody) (ret sqsprocessor.ProcessResult) {
    err := s.DoAction(message.ID, message.Action)
    if err == nil {
        ret = sqsprocessor.ProcessResultAck
    }
    return
}


func main() {
    // initialise v2 sqs client
    c := newClient() 

	config := sqsprocessor.ProcessorConfig{
		Receive: sqs.ReceiveMessageInput{
			WaitTimeSeconds:     10,
			MaxNumberOfMessages: 10,
			VisibilityTimeout:   2,
		},
		NumWorkers: 10,
		Backoff:    time.Second,
	}

	p := sqsprocessor.NewProcessor(c, config)
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})


    cleanup := func() {
        cancel()
        <- done
    }

	go func() {
		p.Process(ctx, middleware.JSONDecode(svc.process))
		close(done)
	}()

    // some other code

    cleanup()
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrMessageExpired

type ErrMessageExpired struct {
	// contains filtered or unexported fields
}

ErrMessageExpired is returned when a message with an expired deadline is encountered and processing is abandoned

func (ErrMessageExpired) Error

func (e ErrMessageExpired) Error() string

type Middleware added in v0.2.0

type Middleware func(ProcessFunc) ProcessFunc

type ProcessFunc

type ProcessFunc func(ctx context.Context, msg types.Message) ProcessResult

ProcessFunc is the signature of functions the Processor and its workers can run on received messages

type ProcessResult

type ProcessResult uint8

ProcessResult is an enum used to signal success or failure when processing a message in a ProcessFunc

const (
	// ProcessResultNack is the default value of ProcessResult
	// and indicates processing of a message failed and the
	// message should be re-published to the queue
	ProcessResultNack ProcessResult = iota
	// ProcessResultAck is the success variant of ProcessResult
	// and indicates that a message was successfully processed
	// and can be deleted from the queue
	ProcessResultAck
)

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is the struct which orchestrates polling for messages as well as starting and feeding a configured number of workers in a pool

func New added in v0.1.4

func New(c SQSClienter, config ProcessorConfig) *Processor

New returns a pointer to a new Processor given a config and sqs client

func (*Processor) Errors

func (p *Processor) Errors() <-chan error

Errors returns a channel on which errors encountered by a Processor or Worker are sent

func (*Processor) Process

func (p *Processor) Process(ctx context.Context, pf ProcessFunc)

Process starts all workers and polls for messages on the configured sqs queue Any message received will have the given ProcessFunc executed on it Process will exit when the provided context is cancelled

type ProcessorConfig

type ProcessorConfig struct {
	NumWorkers int
	// TODO abstract these?
	Receive        sqs.ReceiveMessageInput
	ReceiveOptions []func(*sqs.Options)
	Backoff        time.Duration
}

type SQSClienter

type SQSClienter interface {
	ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
	DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
	ChangeMessageVisibility(ctx context.Context, params *sqs.ChangeMessageVisibilityInput, optFns ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error)
}

SQSClienter encapsulates all sqs methods a Processor will use

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL