sqs

package module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 22, 2026 License: MIT Imports: 16 Imported by: 0

README

sqs

Go Reference Go Report Card CI

An AWS SQS queue backend for Slack Manager. Provides FIFO queue consumption with automatic visibility timeout extension and a webhook handler for forwarding HTTP callbacks to SQS queues. Uses AWS SDK v2.

Part of the slackmgr/plugins monorepo. Versioned independently using the sqs/vX.Y.Z tag convention.

Installation

go get github.com/slackmgr/plugins/sqs

Requires Go 1.25+.

Usage

Client

The Client reads messages from an SQS FIFO queue and delivers them to a sink channel. Each message is wrapped with Ack and Nack callbacks, and its visibility timeout is automatically extended in the background while processing is ongoing.

import (
    "github.com/aws/aws-sdk-go-v2/config"
    sqsplugin "github.com/slackmgr/plugins/sqs"
    "github.com/slackmgr/types"
)

awsCfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
    log.Fatal(err)
}

client, err := sqsplugin.New(&awsCfg, "my-queue.fifo", logger,
    sqsplugin.WithSqsVisibilityTimeout(30),
    sqsplugin.WithMaxMessageExtension(10*time.Minute),
).Init(ctx)
if err != nil {
    log.Fatal(err)
}

sinkCh := make(chan *types.FifoQueueItem)

go func() {
    if err := client.Receive(ctx, sinkCh); err != nil && !errors.Is(err, context.Canceled) {
        log.Fatal(err)
    }
}()

for item := range sinkCh {
    // process item...
    item.Ack()
}

Init validates options, resolves the queue URL, and starts the background visibility extender. Receive is a blocking loop that must be run in a goroutine; it closes sinkCh when it returns.

Call Ack() on a message to delete it from the queue after successful processing. Call Nack() to abandon it — the message will become visible again in SQS after the visibility timeout expires.

Webhook Handler

WebhookHandler converts incoming HTTP webhook callbacks into SQS messages. It implements the types.WebhookHandler interface and supports both FIFO and standard queues. For FIFO queues, it uses the channel ID as the message group and a SHA256 hash of the callback fields as the deduplication ID.

handler, err := sqsplugin.NewWebhookHandler(&awsCfg).Init(ctx)
if err != nil {
    log.Fatal(err)
}

// ShouldHandleWebhook returns true for targets starting with "https://sqs."
if handler.ShouldHandleWebhook(ctx, target) {
    if err := handler.HandleWebhook(ctx, queueURL, webhookData, logger); err != nil {
        log.Printf("webhook error: %v", err)
    }
}

Configuration

All options are provided via With* constructor functions passed to New.

Option Default Description
WithSqsVisibilityTimeout(int32) 30 seconds Visibility timeout applied to received messages
WithSqsReceiveMaxNumberOfMessages(int32) 1 Maximum messages per receive call (1–10)
WithSqsReceiveWaitTimeSeconds(int32) 20 seconds Long-poll wait time per receive call (3–20)
WithSqsAPIMaxRetryAttempts(int) 5 Maximum SQS API retry attempts (0–10)
WithSqsAPIMaxRetryBackoffDelay(time.Duration) 10s Maximum backoff delay between retries (1s–30s)
WithMaxMessageExtension(time.Duration) 10m Maximum total visibility extension per message (1m–1h)
WithMaxOutstandingMessages(int) 100 Maximum number of in-flight messages before pausing reads
WithMaxOutstandingBytes(int) 1048576 (1 MB) Maximum total in-flight message bytes before pausing reads
WithSQSClient(sqsClient) (AWS default) Custom SQS client implementation (useful for testing)

SQS Queue Setup

Client requirements

The Client requires an SQS FIFO queue. Init returns an error if the queue name does not end with .fifo.

Property Value
Queue type FIFO (name must end with .fifo)
Content-based deduplication Optional — the client always supplies an explicit deduplication ID

Messages are grouped by Slack channel ID using the SQS MessageGroupId attribute.

Visibility timeout extension

The background extender checks in-flight messages at an interval of one-third of the configured visibility timeout (minimum 5 seconds). It extends the timeout for any message that has consumed more than half of its current visibility window. Extension is best-effort — if an extension fails, the message is removed from tracking and may be redelivered. Downstream processing should be idempotent.

A message will not be extended beyond the duration set by WithMaxMessageExtension. After that limit is reached, the message is dropped from tracking and will become visible in SQS again.

Webhook handler requirements

The WebhookHandler supports both FIFO and standard queues. For FIFO queues, the target URL must end with .fifo. For standard queues, no deduplication ID is used.

License

This project is licensed under the MIT License — see the LICENSE file for details.

Copyright (c) 2026 Peter Aglen

Documentation

Overview

Package sqs provides an AWS SQS queue backend for the Slack Manager ecosystem. It implements FIFO queue consumption with automatic visibility timeout extension and a webhook handler that forwards HTTP callbacks to SQS queues.

Client

Client reads messages from an SQS FIFO queue and delivers them to a caller-supplied channel as github.com/slackmgr/types.FifoQueueItem values. While a message is in flight, a background goroutine periodically extends its visibility timeout so it is not redelivered before processing completes. Callers signal completion by invoking the Ack or Nack callback on each item.

Create a client with New and initialise it with Client.Init:

client, err := sqs.New(&awsCfg, "events.fifo", logger,
    sqs.WithSqsVisibilityTimeout(60),
).Init(ctx)

Then start consuming:

sinkCh := make(chan *types.FifoQueueItem)
go client.Receive(ctx, sinkCh)
for item := range sinkCh {
    process(item)
    item.Ack()
}

WebhookHandler

WebhookHandler converts incoming HTTP webhook callbacks into SQS messages. It implements the github.com/slackmgr/types.WebhookHandler interface and supports both FIFO and standard queues. For FIFO queues the handler derives a per-message deduplication ID from a SHA-256 hash of the callback fields, preventing duplicate messages caused by webhook retries.

handler, err := sqs.NewWebhookHandler(&awsCfg).Init(ctx)
if handler.ShouldHandleWebhook(ctx, target) {
    if err := handler.HandleWebhook(ctx, target, data, logger); err != nil {
        log.Printf("webhook error: %v", err)
    }
}

Configuration

Both Client and WebhookHandler accept functional options that are passed to the constructor and take effect before Client.Init or WebhookHandler.Init is called. See the With* functions for available settings and their defaults.

Visibility Extension

Visibility extension is best-effort. If an extension call fails (for example due to a transient network error or SQS throttling), the message is removed from extension tracking and will become visible in SQS again after the current timeout expires. Downstream processing should therefore be idempotent to handle potential duplicate deliveries gracefully.

A message is never extended beyond the duration set by WithMaxMessageExtension (default: 10 minutes). Once that limit is reached the message is dropped from tracking regardless of whether Ack or Nack has been called.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is an SQS queue consumer for the Slack Manager ecosystem. It reads messages from a FIFO SQS queue, extends their visibility timeouts automatically while processing is in progress, and delivers each message to a caller-supplied sink channel as a github.com/slackmgr/types.FifoQueueItem.

Create a Client with New, then call Client.Init once before any other method. Init is not thread-safe; all other methods are safe for concurrent use after Init returns.

func New

func New(awsCfg *aws.Config, queueName string, logger types.Logger, opts ...Option) *Client

New creates a Client configured to consume from the named SQS FIFO queue. The queue name must end with ".fifo"; this constraint is enforced by Client.Init.

Functional options may be passed to override defaults (see With* functions). The logger is automatically enriched with "plugin" and "queue_name" fields.

New does not connect to AWS. Call Client.Init to resolve the queue URL and start the background visibility-extension goroutine.

func (*Client) Init

func (c *Client) Init(ctx context.Context) (*Client, error)

Init initializes the Client: validates options, resolves the queue URL via GetQueueUrl, and starts the background visibility-extension goroutine. It returns the receiver so that initialization can be chained with New:

client, err := sqs.New(&awsCfg, "events.fifo", logger).Init(ctx)

The provided context governs the background visibility-extension goroutine; cancelling it shuts the goroutine down cleanly.

Init is idempotent — subsequent calls on an already-initialized Client are no-ops. It is not thread-safe and must be called once during application startup before any concurrent access.

func (*Client) Name

func (c *Client) Name() string

Name returns the SQS queue name supplied to New.

func (*Client) Receive

func (c *Client) Receive(ctx context.Context, sinkCh chan<- *types.FifoQueueItem) error

Receive reads messages from the queue in a loop and sends each one to sinkCh as a github.com/slackmgr/types.FifoQueueItem. It closes sinkCh before returning.

Each delivered item exposes two callbacks:

  • Ack deletes the message from SQS, signalling successful processing.
  • Nack abandons the message without deleting it; SQS will redeliver it after the visibility timeout expires.

Receive pauses reads automatically when the number or total byte size of in-flight messages reaches the limits configured by WithMaxOutstandingMessages or WithMaxOutstandingBytes. On a transient receive error it logs the failure and retries after a 5-second backoff.

Receive blocks until ctx is cancelled, at which point it returns ctx.Err(). It must be called in its own goroutine. Client.Init must have been called successfully before Receive is invoked.

func (*Client) Send

func (c *Client) Send(ctx context.Context, groupID, dedupID, body string) error

Send publishes a single message to the FIFO queue.

groupID is used as the SQS MessageGroupId, which determines message ordering within the queue. dedupID is used as the SQS MessageDeduplicationId; SQS will silently discard messages with a duplicate ID within the 5-minute deduplication window. Both fields are required and must be non-empty.

Send requires Client.Init to have been called successfully.

type Option

type Option func(*Options)

Option is a functional option for configuring a Client. Options are passed to New and applied before Client.Init is called.

func WithMaxMessageExtension

func WithMaxMessageExtension(d time.Duration) Option

WithMaxMessageExtension sets the maximum total duration for which a message's visibility timeout may be extended after it was first received. Once a message reaches this age it is dropped from extension tracking and will become visible in SQS again after the current timeout expires. Must be between 1 minute and 1 hour. Default: 10 minutes.

func WithMaxOutstandingBytes

func WithMaxOutstandingBytes(n int) Option

WithMaxOutstandingBytes sets the maximum total byte size of in-flight messages before Client.Receive pauses reading from the queue. Must be at least 10 KB (10240 bytes). Default: 1 MB (1048576 bytes).

func WithMaxOutstandingMessages

func WithMaxOutstandingMessages(n int) Option

WithMaxOutstandingMessages sets the maximum number of in-flight messages before Client.Receive pauses reading from the queue. This prevents unbounded memory growth when processing is slower than delivery. Must be at least 1. Default: 100.

func WithSQSClient

func WithSQSClient(client sqsClient) Option

WithSQSClient replaces the default AWS SQS client with a custom implementation of the internal sqsClient interface. This option is intended for testing with mock or stub clients.

func WithSqsAPIMaxRetryAttempts

func WithSqsAPIMaxRetryAttempts(n int) Option

WithSqsAPIMaxRetryAttempts sets the maximum number of retry attempts for failed SQS API calls. Must be between 0 and 10. Default: 5.

func WithSqsAPIMaxRetryBackoffDelay

func WithSqsAPIMaxRetryBackoffDelay(d time.Duration) Option

WithSqsAPIMaxRetryBackoffDelay sets the maximum backoff delay between consecutive SQS API retry attempts. Must be between 1 second and 30 seconds. Default: 10 seconds.

func WithSqsReceiveMaxNumberOfMessages

func WithSqsReceiveMaxNumberOfMessages(n int32) Option

WithSqsReceiveMaxNumberOfMessages sets the maximum number of messages returned by a single ReceiveMessage API call. Must be between 1 and 10. Default: 1.

func WithSqsReceiveWaitTimeSeconds

func WithSqsReceiveWaitTimeSeconds(seconds int32) Option

WithSqsReceiveWaitTimeSeconds sets the long-poll wait duration for each ReceiveMessage API call. Longer values reduce empty responses and API costs. Must be between 3 and 20 seconds. Default: 20.

func WithSqsVisibilityTimeout

func WithSqsVisibilityTimeout(seconds int32) Option

WithSqsVisibilityTimeout sets the visibility timeout applied to each received message. While a message is hidden from other consumers its timeout is extended automatically by the background goroutine. Must be between 10 and 3600 seconds. Default: 30.

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options holds the resolved configuration for a Client. All fields are set to sensible defaults by New; use With* functions to override individual values.

type WebhookHandler

type WebhookHandler struct {
	// contains filtered or unexported fields
}

WebhookHandler converts incoming HTTP webhook callbacks into SQS messages. It implements the types.WebhookHandler interface and routes each callback to either a FIFO or a standard SQS queue based on the target URL.

For FIFO queues the handler sets the message group ID to the Slack channel ID and derives a deduplication ID from a SHA-256 hash of the callback fields, preventing duplicate messages caused by webhook retries.

Create a WebhookHandler with NewWebhookHandler and call WebhookHandler.Init once before handling any requests. Init is not thread-safe; all other methods are safe for concurrent use after Init returns.

func NewWebhookHandler

func NewWebhookHandler(awsCfg *aws.Config) *WebhookHandler

NewWebhookHandler creates a WebhookHandler that uses awsCfg to construct its SQS client during WebhookHandler.Init.

NewWebhookHandler does not connect to AWS. Call WebhookHandler.Init before handling any requests.

func (*WebhookHandler) HandleWebhook

func (c *WebhookHandler) HandleWebhook(ctx context.Context, queueURL string, data *types.WebhookCallback, logger types.Logger) error

HandleWebhook marshals data as JSON and sends it to the SQS queue at queueURL. The queue type is detected from the URL suffix:

  • FIFO queues (URL ends with ".fifo"): the message group ID is set to data.ChannelID and a deduplication ID is derived from a SHA-256 hash of the channel ID, message ID, callback ID, and nanosecond timestamp.
  • Standard queues: the message is sent without a group or dedup ID.

HandleWebhook requires WebhookHandler.Init to have been called successfully.

func (*WebhookHandler) Init

Init initializes the WebhookHandler by constructing the underlying SQS client from the AWS configuration supplied to NewWebhookHandler. It returns the receiver so that initialization can be chained:

handler, err := sqs.NewWebhookHandler(&awsCfg).Init(ctx)

Init is idempotent — subsequent calls on an already-initialized handler are no-ops. It is not thread-safe and must be called once during application startup before any concurrent access.

func (*WebhookHandler) ShouldHandleWebhook

func (c *WebhookHandler) ShouldHandleWebhook(_ context.Context, target string) bool

ShouldHandleWebhook reports whether the handler should process a webhook with the given target URL. It returns true when target begins with "https://sqs.", matching any AWS SQS queue endpoint URL.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL