sqsprocessor

package module
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2023 License: MIT Imports: 5 Imported by: 0

README

SQS Processor (WIP)

An attempt to wrap the AWS SQS client with functionality similar to that of the Google Cloud Pub/Sub client.

Usage

The processor runs a given ProcessFunc over every message it receives from a given SQS queue. ProcessFunc receives the SQS message body as a string and decides how to decode and act on the message.

The ProcessFunc must return either ProcessResultAck or ProcessResultNack. Ack implies a success and leads to the message being deleted from the queue, whereas Nack will re-publish the message to the queue.

Example
// messageBody is the example payload decoded from the SQS message body.
// It is populated by json.Unmarshal in process.
type messageBody struct {
    // ID is passed to DoAction as its first argument.
    ID string
    // Action is passed to DoAction as its second argument.
    // EnumType is an application-defined type not shown in this example.
    Action EnumType
}

// process is the ProcessFunc used by this example: it decodes the SQS message
// body as JSON into a messageBody and performs the requested action.
//
// It returns sqsprocessor.ProcessResultAck only when both decoding and
// DoAction succeed. Any failure returns sqsprocessor.ProcessResultNack,
// which is also the zero value of ProcessResult. Note that a body that can
// never be decoded is therefore Nacked as well.
func (s *service) process(ctx context.Context, msgBody string) sqsprocessor.ProcessResult {
	var message messageBody
	// Each err is scoped to its own if statement; declaring err twice with :=
	// in the same scope does not compile.
	if err := json.Unmarshal([]byte(msgBody), &message); err != nil {
		return sqsprocessor.ProcessResultNack
	}
	if err := s.DoAction(message.ID, message.Action); err != nil {
		return sqsprocessor.ProcessResultNack
	}
	return sqsprocessor.ProcessResultAck
}


func main() {
    // initialise v2 sqs client
    c := newClient() 

	config := sqsprocessor.ProcessorConfig{
		Receive: sqs.ReceiveMessageInput{
			WaitTimeSeconds:     10,
			MaxNumberOfMessages: 10,
			VisibilityTimeout:   2,
		},
		NumWorkers: 10,
		Backoff:    time.Second,
	}

	p := sqsprocessor.NewProcessor(c, config)
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})


    cleanup := func() {
        cancel()
        <- done
    }

	go func() {
		p.Process(ctx, svc.process)
		close(done)
	}()

    ...

    cleanup()
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrMessageExpired

// ErrMessageExpired is an error type; it implements the error interface via
// its Error method.
// NOTE(review): presumably reported when a message expires before processing
// completes — confirm against the implementation.
type ErrMessageExpired struct {
	// contains filtered or unexported fields
}

func (ErrMessageExpired) Error

func (e ErrMessageExpired) Error() string

type ProcessFunc

// ProcessFunc handles a single message. It receives the SQS message body as a
// string, decides how to decode and act on it, and reports the outcome as a
// ProcessResult (ProcessResultAck or ProcessResultNack).
type ProcessFunc func(ctx context.Context, msgBody string) ProcessResult

type ProcessResult

// ProcessResult is the outcome a ProcessFunc reports for a message.
type ProcessResult uint8
const (
	// ProcessResultNack reports a failure; per the README the message is
	// re-published to the queue. It is the zero value of ProcessResult, so a
	// ProcessFunc that returns without setting a result reports Nack.
	ProcessResultNack ProcessResult = iota
	// ProcessResultAck reports success; per the README the message is
	// deleted from the queue.
	ProcessResultAck
)

type Processor

// Processor runs a ProcessFunc over messages found on an SQS queue.
// Create one with New and start it with Process.
type Processor struct {
	// contains filtered or unexported fields
}

func New added in v0.1.4

func New(c SQSClienter, config ProcessorConfig) *Processor

func (*Processor) Errors

func (p *Processor) Errors() <-chan error

func (*Processor) Process

func (p *Processor) Process(ctx context.Context, pf ProcessFunc)

type ProcessorConfig

// ProcessorConfig holds the settings passed to New when creating a Processor.
type ProcessorConfig struct {
	// NumWorkers is presumably the number of concurrent workers running the
	// ProcessFunc — confirm against the implementation.
	NumWorkers int
	// TODO abstract these?
	// Receive is the SQS ReceiveMessageInput; presumably used for each
	// receive call — confirm.
	Receive        sqs.ReceiveMessageInput
	// ReceiveOptions are per-call SQS option functions; presumably forwarded
	// to ReceiveMessage — confirm.
	ReceiveOptions []func(*sqs.Options)
	// Backoff is a wait duration.
	// NOTE(review): when it applies (e.g. after a failed receive) is not
	// visible here — confirm.
	Backoff        time.Duration
}

type SQSClienter

// SQSClienter is the set of SQS client operations a Processor needs; New
// accepts any value implementing it. The README example passes a v2 SQS
// client.
type SQSClienter interface {
	// ReceiveMessage receives messages as described by params.
	ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
	// DeleteMessage deletes the message identified by params.
	DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
	// ChangeMessageVisibility changes the visibility timeout of the message
	// identified by params.
	ChangeMessageVisibility(ctx context.Context, params *sqs.ChangeMessageVisibilityInput, optFns ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL