Documentation ¶
Overview ¶
Package sqs provides an AWS SQS queue backend for the Slack Manager ecosystem. It implements FIFO queue consumption with automatic visibility timeout extension and a webhook handler that forwards HTTP callbacks to SQS queues.
Client ¶
Client reads messages from an SQS FIFO queue and delivers them to a caller-supplied channel as github.com/slackmgr/types.FifoQueueItem values. While a message is in flight, a background goroutine periodically extends its visibility timeout so it is not redelivered before processing completes. Callers signal completion by invoking the Ack or Nack callback on each item.
Create a client with New and initialize it with Client.Init:
	client, err := sqs.New(&awsCfg, "events.fifo", logger,
		sqs.WithSqsVisibilityTimeout(60),
	).Init(ctx)
	if err != nil {
		log.Fatalf("sqs client init: %v", err)
	}
Then start consuming:
	sinkCh := make(chan *types.FifoQueueItem)
	go client.Receive(ctx, sinkCh) // closes sinkCh before it returns
	for item := range sinkCh {
		process(item)
		item.Ack() // delete the message from SQS
	}
WebhookHandler ¶
WebhookHandler converts incoming HTTP webhook callbacks into SQS messages. It implements the github.com/slackmgr/types.WebhookHandler interface and supports both FIFO and standard queues. For FIFO queues the handler derives a per-message deduplication ID from a SHA-256 hash of the callback fields, preventing duplicate messages caused by webhook retries.
	handler, err := sqs.NewWebhookHandler(&awsCfg).Init(ctx)
	if err != nil {
		log.Fatalf("webhook handler init: %v", err)
	}
	if handler.ShouldHandleWebhook(ctx, target) {
		if err := handler.HandleWebhook(ctx, target, data, logger); err != nil {
			log.Printf("webhook error: %v", err)
		}
	}
Configuration ¶
Client accepts functional options that are passed to New and take effect before Client.Init is called. See the With* functions for available settings and their defaults. NewWebhookHandler, as declared on this page, takes only the AWS configuration.
Visibility Extension ¶
Visibility extension is best-effort. If an extension call fails (for example due to a transient network error or SQS throttling), the message is removed from extension tracking and will become visible in SQS again after the current timeout expires. Downstream processing should therefore be idempotent to handle potential duplicate deliveries gracefully.
A message is never extended beyond the duration set by WithMaxMessageExtension (default: 10 minutes). Once that limit is reached the message is dropped from tracking regardless of whether Ack or Nack has been called.
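The two drop rules above (a failed extension call, or reaching the maximum extension age) can be modelled in a few lines. This is a minimal sketch, not the package's implementation: tracker, extendAll, and demoTracking are hypothetical names, and the extend callback stands in for the SQS visibility-timeout call.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"time"
)

// tracker is a hypothetical stand-in for the client's extension bookkeeping.
type tracker struct {
	maxExtension time.Duration
	firstSeen    map[string]time.Time // message ID -> time first received
}

// extendAll runs one extension round at time now. A message is dropped from
// tracking when it has reached maxExtension or when its extend call fails.
// It returns the dropped IDs in sorted order.
func (t *tracker) extendAll(now time.Time, extend func(id string) error) []string {
	var dropped []string
	for id, seen := range t.firstSeen {
		if now.Sub(seen) >= t.maxExtension || extend(id) != nil {
			delete(t.firstSeen, id)
			dropped = append(dropped, id)
		}
	}
	sort.Strings(dropped)
	return dropped
}

// demoTracking tracks three messages and returns the dropped IDs and the
// number of messages still tracked after one round.
func demoTracking() ([]string, int) {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	t := &tracker{
		maxExtension: 10 * time.Minute,
		firstSeen: map[string]time.Time{
			"fresh": start.Add(9 * time.Minute),
			"flaky": start.Add(9 * time.Minute),
			"old":   start,
		},
	}
	extend := func(id string) error {
		if id == "flaky" {
			return errors.New("throttled") // simulated SQS throttling
		}
		return nil
	}
	dropped := t.extendAll(start.Add(10*time.Minute), extend)
	return dropped, len(t.firstSeen)
}

func main() {
	dropped, remaining := demoTracking()
	fmt.Println(dropped, remaining) // [flaky old] 1
}
```

In this model both dropped messages would become visible again once their current timeout expires, which is why idempotent processing matters.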
Index ¶
- type Client
- type Option
- func WithMaxMessageExtension(d time.Duration) Option
- func WithMaxOutstandingBytes(n int) Option
- func WithMaxOutstandingMessages(n int) Option
- func WithSQSClient(client sqsClient) Option
- func WithSqsAPIMaxRetryAttempts(n int) Option
- func WithSqsAPIMaxRetryBackoffDelay(d time.Duration) Option
- func WithSqsReceiveMaxNumberOfMessages(n int32) Option
- func WithSqsReceiveWaitTimeSeconds(seconds int32) Option
- func WithSqsVisibilityTimeout(seconds int32) Option
- type Options
- type WebhookHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is an SQS queue consumer for the Slack Manager ecosystem. It reads messages from a FIFO SQS queue, extends their visibility timeouts automatically while processing is in progress, and delivers each message to a caller-supplied sink channel as a github.com/slackmgr/types.FifoQueueItem.
Create a Client with New, then call Client.Init once before any other method. Init is not thread-safe; all other methods are safe for concurrent use after Init returns.
func New ¶
New creates a Client configured to consume from the named SQS FIFO queue. The queue name must end with ".fifo"; this constraint is enforced by Client.Init.
Functional options may be passed to override defaults (see With* functions). The logger is automatically enriched with "plugin" and "queue_name" fields.
New does not connect to AWS. Call Client.Init to resolve the queue URL and start the background visibility-extension goroutine.
func (*Client) Init ¶
Init initializes the Client: validates options, resolves the queue URL via GetQueueUrl, and starts the background visibility-extension goroutine. It returns the receiver so that initialization can be chained with New:
	client, err := sqs.New(&awsCfg, "events.fifo", logger).Init(ctx)
The provided context governs the background visibility-extension goroutine; cancelling it shuts the goroutine down cleanly.
Init is idempotent: subsequent calls on an already-initialized Client are no-ops. It is not thread-safe and must be called once during application startup before any concurrent access.
func (*Client) Receive ¶
Receive reads messages from the queue in a loop and sends each one to sinkCh as a github.com/slackmgr/types.FifoQueueItem. It closes sinkCh before returning.
Each delivered item exposes two callbacks:
- Ack deletes the message from SQS, signalling successful processing.
- Nack abandons the message without deleting it; SQS will redeliver it after the visibility timeout expires.
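A common way to use the two callbacks is to settle each item exactly once, depending on the processing result. The sketch below assumes a hypothetical queueItem type with Ack and Nack function fields; the real types.FifoQueueItem may expose its callbacks differently.

```go
package main

import (
	"errors"
	"fmt"
)

// queueItem is a hypothetical stand-in for types.FifoQueueItem; the real
// type's fields and callback signatures may differ.
type queueItem struct {
	Body string
	Ack  func()
	Nack func()
}

// handle processes one item and settles it exactly once.
func handle(item *queueItem, process func(body string) error) {
	if err := process(item.Body); err != nil {
		item.Nack() // leave the message in the queue for redelivery
		return
	}
	item.Ack() // processing succeeded
}

// settleAll feeds bodies through handle and counts the callbacks invoked.
func settleAll(bodies []string) (acks, nacks int) {
	process := func(body string) error {
		if body == "" {
			return errors.New("empty body")
		}
		return nil
	}
	for _, b := range bodies {
		item := &queueItem{
			Body: b,
			Ack:  func() { acks++ },
			Nack: func() { nacks++ },
		}
		handle(item, process)
	}
	return acks, nacks
}

func main() {
	acks, nacks := settleAll([]string{"a", "", "b"})
	fmt.Println(acks, nacks) // 2 1
}
```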
Receive pauses reads automatically when the number or total byte size of in-flight messages reaches the limits configured by WithMaxOutstandingMessages or WithMaxOutstandingBytes. On a transient receive error it logs the failure and retries after a 5-second backoff.
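The pause condition can be pictured as a pair of counters checked before each read. This is an illustrative sketch only: inflight and its methods are hypothetical names, and the sketch omits the locking a concurrent implementation would need.

```go
package main

import "fmt"

// inflight is a hypothetical counter for outstanding messages. It is not
// safe for concurrent use; a real implementation would need a mutex.
type inflight struct {
	maxMessages, maxBytes int
	messages, bytes       int
}

// add records a delivered message of the given size.
func (f *inflight) add(size int) { f.messages++; f.bytes += size }

// release records that a message of the given size was acked or nacked.
func (f *inflight) release(size int) { f.messages--; f.bytes -= size }

// shouldPause reports whether either limit has been reached.
func (f *inflight) shouldPause() bool {
	return f.messages >= f.maxMessages || f.bytes >= f.maxBytes
}

func main() {
	f := &inflight{maxMessages: 100, maxBytes: 1 << 20} // documented defaults
	f.add(600 * 1024)
	fmt.Println(f.shouldPause()) // false
	f.add(600 * 1024)
	fmt.Println(f.shouldPause()) // true: byte limit reached
	f.release(600 * 1024)
	fmt.Println(f.shouldPause()) // false
}
```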
Receive blocks until ctx is cancelled, at which point it returns ctx.Err(). It must be called in its own goroutine. Client.Init must have been called successfully before Receive is invoked.
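The shutdown sequence described above (cancel the context, keep draining until the channel is closed, then collect the returned error) might look like the following. The receive function here is a hypothetical stand-in that only mimics the documented contract of Client.Receive; it does not talk to SQS.

```go
package main

import (
	"context"
	"fmt"
)

// receive is a hypothetical stand-in for Client.Receive: it sends values
// until ctx is cancelled, closes sink, and returns ctx.Err().
func receive(ctx context.Context, sink chan<- int) error {
	defer close(sink)
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case sink <- i:
		}
	}
}

// consume runs receive in its own goroutine, requests shutdown after limit
// items, and keeps draining until sink is closed.
func consume(limit int) (processed int, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sink := make(chan int)
	errCh := make(chan error, 1)
	go func() { errCh <- receive(ctx, sink) }()

	for range sink {
		processed++
		if processed == limit {
			cancel() // request shutdown; items already in flight may still arrive
		}
	}
	return processed, <-errCh
}

func main() {
	processed, err := consume(3)
	fmt.Println(processed >= 3, err) // true context canceled
}
```

Draining until the channel closes, rather than breaking out of the loop on cancellation, avoids leaving the receiving goroutine blocked on a send.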
func (*Client) Send ¶
Send publishes a single message to the FIFO queue.
groupID is used as the SQS MessageGroupId, which determines message ordering within the queue. dedupID is used as the SQS MessageDeduplicationId; SQS will silently discard messages with a duplicate ID within the 5-minute deduplication window. Both fields are required and must be non-empty.
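For unit tests it can help to have a small in-memory model of the deduplication behaviour described above. The sketch below is an assumption-laden approximation: fakeFIFO and its send method are hypothetical, ordering by group ID is not modelled, and real SQS semantics may differ in detail.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// fakeFIFO is a hypothetical in-memory model of FIFO deduplication.
type fakeFIFO struct {
	window   time.Duration
	seen     map[string]time.Time // dedup ID -> time first accepted
	accepted []string
}

// send rejects empty IDs and silently drops a message whose dedup ID was
// accepted less than one window ago.
func (q *fakeFIFO) send(now time.Time, groupID, dedupID, body string) error {
	if groupID == "" || dedupID == "" {
		return errors.New("groupID and dedupID must be non-empty")
	}
	if first, ok := q.seen[dedupID]; ok && now.Sub(first) < q.window {
		return nil // duplicate inside the window: dropped without an error
	}
	q.seen[dedupID] = now
	q.accepted = append(q.accepted, body)
	return nil
}

// acceptedAfterRetries sends the same dedup ID three times and returns how
// many messages the model accepted.
func acceptedAfterRetries() int {
	q := &fakeFIFO{window: 5 * time.Minute, seen: map[string]time.Time{}}
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	_ = q.send(t0, "C123", "evt-1", "first")
	_ = q.send(t0.Add(time.Minute), "C123", "evt-1", "retry")  // inside window
	_ = q.send(t0.Add(6*time.Minute), "C123", "evt-1", "late") // window elapsed
	return len(q.accepted)
}

func main() {
	fmt.Println(acceptedAfterRetries()) // 2
	q := &fakeFIFO{window: 5 * time.Minute, seen: map[string]time.Time{}}
	fmt.Println(q.send(time.Now(), "", "evt-1", "x") != nil) // true
}
```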
Send requires Client.Init to have been called successfully.
type Option ¶
type Option func(*Options)
Option is a functional option for configuring a Client. Options are passed to New and applied before Client.Init is called.
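The functional-options pattern used here can be sketched independently of this package. In the example below every identifier (options, option, withVisibilityTimeout, withMaxOutstandingMessages, newOptions, validate) is a hypothetical lowercase stand-in; only the default values and ranges are taken from the documentation on this page.

```go
package main

import "fmt"

// options and option are hypothetical stand-ins for Options and Option.
type options struct {
	visibilityTimeout      int32 // seconds
	maxOutstandingMessages int
}

type option func(*options)

func withVisibilityTimeout(seconds int32) option {
	return func(o *options) { o.visibilityTimeout = seconds }
}

func withMaxOutstandingMessages(n int) option {
	return func(o *options) { o.maxOutstandingMessages = n }
}

// newOptions starts from the documented defaults and applies overrides in order.
func newOptions(opts ...option) *options {
	o := &options{visibilityTimeout: 30, maxOutstandingMessages: 100}
	for _, apply := range opts {
		apply(o)
	}
	return o
}

// validate checks the documented ranges, as an Init-style step might.
func (o *options) validate() error {
	if o.visibilityTimeout < 10 || o.visibilityTimeout > 3600 {
		return fmt.Errorf("visibility timeout %d outside [10, 3600]", o.visibilityTimeout)
	}
	if o.maxOutstandingMessages < 1 {
		return fmt.Errorf("max outstanding messages %d is below 1", o.maxOutstandingMessages)
	}
	return nil
}

func main() {
	o := newOptions(withVisibilityTimeout(60))
	fmt.Println(o.visibilityTimeout, o.maxOutstandingMessages, o.validate()) // 60 100 <nil>
	fmt.Println(newOptions(withVisibilityTimeout(5)).validate() != nil)      // true
}
```

Deferring validation to a separate step keeps the constructor infallible, which matches the documented split between New and Client.Init.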
func WithMaxMessageExtension ¶
func WithMaxMessageExtension(d time.Duration) Option
WithMaxMessageExtension sets the maximum total duration for which a message's visibility timeout may be extended after it was first received. Once a message reaches this age, it is dropped from extension tracking and becomes visible in SQS again after the current timeout expires. Must be between 1 minute and 1 hour. Default: 10 minutes.
func WithMaxOutstandingBytes ¶
func WithMaxOutstandingBytes(n int) Option
WithMaxOutstandingBytes sets the maximum total byte size of in-flight messages before Client.Receive pauses reading from the queue. Must be at least 10 KB (10240 bytes). Default: 1 MB (1048576 bytes).
func WithMaxOutstandingMessages ¶
func WithMaxOutstandingMessages(n int) Option
WithMaxOutstandingMessages sets the maximum number of in-flight messages before Client.Receive pauses reading from the queue. This prevents unbounded memory growth when processing is slower than delivery. Must be at least 1. Default: 100.
func WithSQSClient ¶
func WithSQSClient(client sqsClient) Option
WithSQSClient replaces the default AWS SQS client with a custom implementation of the internal sqsClient interface. This option is intended for testing with mock or stub clients.
func WithSqsAPIMaxRetryAttempts ¶
func WithSqsAPIMaxRetryAttempts(n int) Option
WithSqsAPIMaxRetryAttempts sets the maximum number of retry attempts for failed SQS API calls. Must be between 0 and 10. Default: 5.
func WithSqsAPIMaxRetryBackoffDelay ¶
func WithSqsAPIMaxRetryBackoffDelay(d time.Duration) Option
WithSqsAPIMaxRetryBackoffDelay sets the maximum backoff delay between consecutive SQS API retry attempts. Must be between 1 second and 30 seconds. Default: 10 seconds.
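To illustrate what a maximum backoff delay bounds, the sketch below doubles a delay per attempt and clamps it at a limit. This is a generic illustration rather than the retry algorithm the AWS SDK actually uses; the 1-second base is an assumption, and jitter, which SDK retryers commonly add, is omitted.

```go
package main

import (
	"fmt"
	"time"
)

// cappedBackoff doubles base once per attempt and clamps the result at limit.
func cappedBackoff(attempt int, base, limit time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt && d < limit; i++ {
		d *= 2
	}
	if d > limit {
		return limit
	}
	return d
}

// delaysSeconds returns the delay, in whole seconds, for each of the first
// n attempts with a 1-second base and the documented 10-second default cap.
func delaysSeconds(n int) []int {
	out := make([]int, 0, n)
	for a := 0; a < n; a++ {
		out = append(out, int(cappedBackoff(a, time.Second, 10*time.Second)/time.Second))
	}
	return out
}

func main() {
	fmt.Println(delaysSeconds(6)) // [1 2 4 8 10 10]
}
```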
func WithSqsReceiveMaxNumberOfMessages ¶
func WithSqsReceiveMaxNumberOfMessages(n int32) Option
WithSqsReceiveMaxNumberOfMessages sets the maximum number of messages returned by a single ReceiveMessage API call. Must be between 1 and 10. Default: 1.
func WithSqsReceiveWaitTimeSeconds ¶
func WithSqsReceiveWaitTimeSeconds(seconds int32) Option
WithSqsReceiveWaitTimeSeconds sets the long-poll wait duration, in seconds, for each ReceiveMessage API call. Longer values reduce empty responses and API costs. Must be between 3 and 20 seconds. Default: 20 seconds.
func WithSqsVisibilityTimeout ¶
func WithSqsVisibilityTimeout(seconds int32) Option
WithSqsVisibilityTimeout sets the visibility timeout, in seconds, applied to each received message. While a message is in flight, the background goroutine extends this timeout automatically. Must be between 10 and 3600 seconds. Default: 30 seconds.
type Options ¶
type Options struct {
// contains filtered or unexported fields
}
Options holds the resolved configuration for a Client. All fields are set to sensible defaults by New; use With* functions to override individual values.
type WebhookHandler ¶
type WebhookHandler struct {
// contains filtered or unexported fields
}
WebhookHandler converts incoming HTTP webhook callbacks into SQS messages. It implements the types.WebhookHandler interface and routes each callback to either a FIFO or a standard SQS queue based on the target URL.
For FIFO queues the handler sets the message group ID to the Slack channel ID and derives a deduplication ID from a SHA-256 hash of the callback fields, preventing duplicate messages caused by webhook retries.
Create a WebhookHandler with NewWebhookHandler and call WebhookHandler.Init once before handling any requests. Init is not thread-safe; all other methods are safe for concurrent use after Init returns.
func NewWebhookHandler ¶
func NewWebhookHandler(awsCfg *aws.Config) *WebhookHandler
NewWebhookHandler creates a WebhookHandler that uses awsCfg to construct its SQS client during WebhookHandler.Init.
NewWebhookHandler does not connect to AWS. Call WebhookHandler.Init before handling any requests.
func (*WebhookHandler) HandleWebhook ¶
func (c *WebhookHandler) HandleWebhook(ctx context.Context, queueURL string, data *types.WebhookCallback, logger types.Logger) error
HandleWebhook marshals data as JSON and sends it to the SQS queue at queueURL. The queue type is detected from the URL suffix:
- FIFO queues (URL ends with ".fifo"): the message group ID is set to data.ChannelID and a deduplication ID is derived from a SHA-256 hash of the channel ID, message ID, callback ID, and nanosecond timestamp.
- Standard queues: the message is sent without a group or dedup ID.
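The queue-type check and the hash-based deduplication ID described above could be sketched as follows. The helper names (isFIFO, dedupID), the field order, and the "|" separator are assumptions made for illustration; the package's actual encoding of the hashed fields is not shown on this page.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// isFIFO reports whether a queue URL names a FIFO queue (".fifo" suffix).
func isFIFO(queueURL string) bool {
	return strings.HasSuffix(queueURL, ".fifo")
}

// dedupID hashes the callback fields into a 64-character hex string. The
// field order and the "|" separator are assumptions made for this sketch.
func dedupID(channelID, messageID, callbackID string, timestampNanos int64) string {
	input := fmt.Sprintf("%s|%s|%s|%d", channelID, messageID, callbackID, timestampNanos)
	sum := sha256.Sum256([]byte(input))
	return hex.EncodeToString(sum[:])
}

func main() {
	id := dedupID("C123", "m-1", "cb-1", 1700000000000000000)
	fmt.Println(len(id)) // 64
	fmt.Println(isFIFO("https://sqs.eu-west-1.amazonaws.com/123456789012/events.fifo")) // true
	fmt.Println(isFIFO("https://sqs.eu-west-1.amazonaws.com/123456789012/events"))      // false
}
```

Because the hash is deterministic, a retried webhook carrying identical field values yields the same ID, which is what lets SQS discard the retry.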
HandleWebhook requires WebhookHandler.Init to have been called successfully.
func (*WebhookHandler) Init ¶
func (c *WebhookHandler) Init(_ context.Context) (*WebhookHandler, error)
Init initializes the WebhookHandler by constructing the underlying SQS client from the AWS configuration supplied to NewWebhookHandler. It returns the receiver so that initialization can be chained:
	handler, err := sqs.NewWebhookHandler(&awsCfg).Init(ctx)
Init is idempotent: subsequent calls on an already-initialized handler are no-ops. It is not thread-safe and must be called once during application startup before any concurrent access.
func (*WebhookHandler) ShouldHandleWebhook ¶
func (c *WebhookHandler) ShouldHandleWebhook(_ context.Context, target string) bool
ShouldHandleWebhook reports whether the handler should process a webhook with the given target URL. It returns true when target begins with "https://sqs.", matching any AWS SQS queue endpoint URL.
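The prefix rule described above amounts to a single string check. In this sketch shouldHandle is a hypothetical standalone function and the URLs are made-up examples using a placeholder account ID.

```go
package main

import (
	"fmt"
	"strings"
)

// shouldHandle mirrors the documented rule: accept targets that begin with
// "https://sqs.".
func shouldHandle(target string) bool {
	return strings.HasPrefix(target, "https://sqs.")
}

func main() {
	targets := []string{
		"https://sqs.eu-west-1.amazonaws.com/123456789012/events.fifo", // true
		"https://example.com/hook",                                     // false
		"http://sqs.eu-west-1.amazonaws.com/123456789012/events",       // false: not https
	}
	for _, t := range targets {
		fmt.Println(shouldHandle(t))
	}
}
```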