sqsprocessor

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 27, 2023 License: MIT Imports: 7 Imported by: 0

README

SQS Processor

An attempt to wrap the AWS SQS client with functionality similar to that of the gcloud Pub/Sub client.

Usage

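The processor simply runs a given ProcessFunc over any messages found on a given SQS queue. A ProcessFunc receives the context and the SQS message (types.Message) and decides how to decode and act on the message; the middleware package can be used to adapt a handler that works on a decoded body, as in the example below.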

The ProcessFunc must return either ProcessResultAck or ProcessResultNack. Ack indicates success and leads to the message being deleted from the queue, whereas Nack will re-publish the message to the queue.

Example
import (
    "context"
    "time"

    "github.com/barrett370/sqs-processor/middleware"
    sqsprocessor "github.com/barrett370/sqs-processor"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
)

// messageBody is the example payload type handed to (*service).process.
// NOTE(review): presumably populated from the SQS message body by
// middleware.JSONDecode — confirm against the middleware package.
type messageBody struct {
    // ID is passed to DoAction as its first argument.
    ID string
    // Action is passed to DoAction as its second argument.
    Action EnumType
}

// process runs the action described by message and reports the outcome to the
// processor: ProcessResultAck when DoAction succeeds, otherwise the zero
// ProcessResult (ProcessResultNack).
func (s *service) process(ctx context.Context, message messageBody) (ret sqsprocessor.ProcessResult) {
    if err := s.DoAction(message.ID, message.Action); err != nil {
        // ret is still its zero value here, i.e. ProcessResultNack.
        return ret
    }
    return sqsprocessor.ProcessResultAck
}


// main shows the intended wiring: build a Processor around an SQS client, run
// it in a background goroutine, and on shutdown cancel its context and wait
// for the goroutine to finish.
func main() {
	// initialise v2 sqs client
	c := newClient()

	// svc provides the process method used as the message handler below;
	// construct it however your application requires.
	svc := &service{}

	config := sqsprocessor.ProcessorConfig{
		Receive: sqs.ReceiveMessageInput{
			WaitTimeSeconds:     10,
			MaxNumberOfMessages: 10,
			VisibilityTimeout:   2,
		},
		NumWorkers: 10,
		Backoff:    time.Second,
	}

	// New is the constructor exposed by this version of the package.
	p := sqsprocessor.New(c, config)
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})

	// cleanup cancels the processing context, then blocks until the
	// goroutine below has closed done.
	cleanup := func() {
		cancel()
		<-done
	}

	go func() {
		p.Process(ctx, middleware.JSONDecode(svc.process))
		close(done)
	}()

	// some other code

	cleanup()
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrMessageExpired

type ErrMessageExpired struct {
	// contains filtered or unexported fields
}

func (ErrMessageExpired) Error

func (e ErrMessageExpired) Error() string

type Middleware added in v0.2.0

type Middleware func(ProcessFunc) ProcessFunc

type ProcessFunc

type ProcessFunc func(ctx context.Context, msg types.Message) ProcessResult

type ProcessResult

// ProcessResult is the outcome a ProcessFunc reports for a single message.
type ProcessResult uint8
const (
	// ProcessResultNack is the zero value. Per the README, a Nack
	// re-publishes the message to the queue.
	ProcessResultNack ProcessResult = iota
	// ProcessResultAck signals success. Per the README, an Ack leads to the
	// message being deleted from the queue.
	ProcessResultAck
)

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

func New added in v0.1.4

func New(c SQSClienter, config ProcessorConfig) *Processor

func (*Processor) Errors

func (p *Processor) Errors() <-chan error

func (*Processor) Process

func (p *Processor) Process(ctx context.Context, pf ProcessFunc)

type ProcessorConfig

// ProcessorConfig is the configuration passed to New when building a Processor.
type ProcessorConfig struct {
	// NumWorkers is presumably the number of concurrent workers running the
	// ProcessFunc — TODO confirm against the implementation.
	NumWorkers int
	// TODO abstract these?
	// Receive is an SQS ReceiveMessageInput (the README example sets
	// WaitTimeSeconds, MaxNumberOfMessages and VisibilityTimeout on it).
	Receive        sqs.ReceiveMessageInput
	// ReceiveOptions has the same element type as the optFns accepted by
	// SQSClienter.ReceiveMessage; presumably forwarded to it — TODO confirm.
	ReceiveOptions []func(*sqs.Options)
	// Backoff is a duration; NOTE(review): looks like a delay applied between
	// retries or polls — confirm against the implementation.
	Backoff        time.Duration
}

type SQSClienter

// SQSClienter is the set of SQS client operations a Processor is built on; it
// is the client type accepted by New. The method signatures use the AWS SDK
// v2 sqs input, output and option types.
type SQSClienter interface {
	// ReceiveMessage takes a ReceiveMessageInput and returns a ReceiveMessageOutput.
	ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
	// DeleteMessage takes a DeleteMessageInput and returns a DeleteMessageOutput.
	DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
	// ChangeMessageVisibility takes a ChangeMessageVisibilityInput and returns
	// a ChangeMessageVisibilityOutput.
	ChangeMessageVisibility(ctx context.Context, params *sqs.ChangeMessageVisibilityInput, optFns ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL