sqs

package
v0.0.0-...-84a1714
Published: May 8, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package sqs hosts a Murmur pipeline behind an AWS Lambda SQS trigger.

SQS is the third common Lambda event source after Kinesis and DynamoDB Streams. It's the right shape for:

  • Job-style ingest where producers POST aggregation events to a queue and the consumer is a Murmur pipeline (e.g., a webhook receiver buffers incoming events into SQS, the Lambda fans them into Murmur).
  • Cross-account ingest where multiple producer accounts deliver into a shared queue.
  • Burst absorption — SQS retains messages for up to 14 days; the consumer Lambda scales on visible-message count rather than shard count.

A symmetric peer of pkg/exec/lambda/kinesis and pkg/exec/lambda/dynamodbstreams: same retry / dedup / metrics / BatchItemFailures semantics; only the input event shape and the dedup-key derivation differ.

Wire it up:

func main() {
    pipe := buildPipeline()
    handler, err := sqs.NewHandler(pipe, sqs.JSONDecoder[Event](),
        sqs.WithMetrics(rec),
        sqs.WithDedup(deduper),
    )
    if err != nil {
        log.Fatal(err)
    }
    lambda.Start(handler)
}

Partial-batch failure handling

Records that exhaust their retry budget are reported via BatchItemFailures with the SQS message ID as ItemIdentifier. Configure your event-source mapping with `FunctionResponseTypes=["ReportBatchItemFailures"]` so SQS only redelivers the failures rather than retrying the whole batch. Without this flag, a single poison message would force the entire batch to redeliver until the message hit its maxReceiveCount and went to a DLQ.
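The response shape mirrors events.SQSEventResponse from aws-lambda-go. A minimal sketch of how exhausted records become BatchItemFailures — the struct types here are local stand-ins for the aws-lambda-go definitions, and reportFailures is an illustrative helper, not this package's API:

```go
package main

import "fmt"

// SQSBatchItemFailure and SQSEventResponse mirror the shapes defined in
// github.com/aws/aws-lambda-go/events (local stand-ins for this sketch).
type SQSBatchItemFailure struct {
	ItemIdentifier string `json:"itemIdentifier"`
}

type SQSEventResponse struct {
	BatchItemFailures []SQSBatchItemFailure `json:"batchItemFailures"`
}

// reportFailures converts the message IDs that exhausted their retry budget
// into the partial-batch response Lambda hands back to SQS. An empty slice
// means the whole batch succeeded, so SQS deletes every message.
func reportFailures(failedMessageIDs []string) SQSEventResponse {
	resp := SQSEventResponse{}
	for _, id := range failedMessageIDs {
		resp.BatchItemFailures = append(resp.BatchItemFailures,
			SQSBatchItemFailure{ItemIdentifier: id})
	}
	return resp
}

func main() {
	resp := reportFailures([]string{"msg-1", "msg-3"})
	fmt.Println(len(resp.BatchItemFailures)) // 2
}
```

With ReportBatchItemFailures enabled, only the listed message IDs become visible again for redelivery; everything else in the batch is deleted.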

Dedup key

Default EventID = "<event-source-ARN>/<MessageId>". SQS messages have stable IDs across redeliveries (the message ID is stable; the receipt handle is not). For FIFO queues with content-based dedup, you may prefer to derive the EventID from the message body's natural key — pass a custom EventIDFn via WithEventID.
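The default derivation is plain concatenation. A sketch (the real helper is internal to the package):

```go
package main

import "fmt"

// defaultEventID reproduces the documented default: the event-source ARN
// joined with the SQS message ID. Both components are stable across
// redeliveries, so the same message always yields the same dedup key.
func defaultEventID(eventSourceARN, messageID string) string {
	return eventSourceARN + "/" + messageID
}

func main() {
	fmt.Println(defaultEventID("arn:aws:sqs:us-east-1:123456789012:jobs", "a1b2"))
	// arn:aws:sqs:us-east-1:123456789012:jobs/a1b2
}
```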

Visibility timeout interactions

Lambda's event-source mapping does not extend the visibility timeout for in-flight messages, so configure the queue's visibility timeout well above the function timeout (AWS recommends at least six times the function timeout) to keep a long-running handler from triggering a redelivery mid-invoke. Keep the per-record retry budget reasonable so a stuck record doesn't burn the entire visibility window.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Decoder

type Decoder[T any] func(body string) (T, error)

Decoder converts a raw SQS message body to the pipeline's input type T.

func JSONDecoder

func JSONDecoder[T any]() Decoder[T]

JSONDecoder returns a Decoder that unmarshals the message body as JSON into T.
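A Decoder is just a function, so JSONDecoder can be pictured as a thin wrapper over encoding/json. A sketch of a plausible implementation, not the package source:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Decoder converts a raw SQS message body to the pipeline's input type T.
type Decoder[T any] func(body string) (T, error)

// jsonDecoder sketches what JSONDecoder plausibly does: unmarshal the body
// into a zero value of T and return the value along with any decode error.
func jsonDecoder[T any]() Decoder[T] {
	return func(body string) (T, error) {
		var v T
		err := json.Unmarshal([]byte(body), &v)
		return v, err
	}
}

// Event is an illustrative payload type.
type Event struct {
	Key string `json:"key"`
}

func main() {
	decode := jsonDecoder[Event]()
	ev, err := decode(`{"key":"orders"}`)
	fmt.Println(ev.Key, err) // orders <nil>
}
```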

type Handler

Handler is the Lambda handler signature for SQS triggers — pass the returned value directly to lambda.Start.

func NewHandler

func NewHandler[T any, V any](
	p *pipeline.Pipeline[T, V],
	decode Decoder[T],
	opts ...HandlerOption[T],
) (Handler, error)

NewHandler builds a Lambda handler that drives a Murmur pipeline from an SQS trigger. The pipeline must be Build-validated; the supplied Decoder converts each message body to the pipeline's input type T. The pipeline's Source field is unused — Lambda owns polling.

Returns an error if the pipeline's required fields (Key, Value, Aggregate, StoreIn) are not set, or if the Decoder is nil.

type HandlerOption

type HandlerOption[T any] func(*handlerConfig[T])

HandlerOption configures NewHandler.

func WithClock

func WithClock[T any](now func() time.Time) HandlerOption[T]

WithClock overrides time.Now for windowed-bucket assignment. Useful for tests with deterministic clocks.

func WithDecodeErrorCallback

func WithDecodeErrorCallback[T any](fn func(body string, messageID string, err error)) HandlerOption[T]

WithDecodeErrorCallback installs a callback for messages whose Decode returned an error. The default behavior is to drop them silently. Decode failures are NOT redelivered (redelivery would only create a poison-pill loop) — wire the callback to a DLQ producer to move poison messages off the hot path.

func WithDedup

func WithDedup[T any](d state.Deduper) HandlerOption[T]

WithDedup installs a state.Deduper. Each SQS message's EventID (default: "<event-source-ARN>/<MessageId>") is claimed via MarkSeen before the merge runs; on a duplicate, the merge is skipped and the message is counted as processed.

Strongly recommended in production: SQS at-least-once delivery means duplicate redeliveries are normal during visibility-timeout edge cases and during partial-batch redelivery.
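state.Deduper's interface isn't shown on this page; assuming the MarkSeen claim semantics described above, an in-memory version suitable for tests might look like the sketch below. The method signature is an assumption, and a production deduper needs shared, durable storage (e.g. DynamoDB conditional writes), since Lambda instances don't share memory:

```go
package main

import (
	"fmt"
	"sync"
)

// memDeduper is an in-memory sketch of MarkSeen semantics: the first claim
// of an EventID succeeds, every later claim reports a duplicate.
type memDeduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newMemDeduper() *memDeduper {
	return &memDeduper{seen: map[string]struct{}{}}
}

// MarkSeen claims id, returning true only for the first caller.
func (d *memDeduper) MarkSeen(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.seen[id]; dup {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

func main() {
	d := newMemDeduper()
	fmt.Println(d.MarkSeen("arn/msg-1")) // true  — first delivery merges
	fmt.Println(d.MarkSeen("arn/msg-1")) // false — redelivery is skipped
}
```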

func WithEventID

func WithEventID[T any](fn func(msg *events.SQSMessage, decoded T) string) HandlerOption[T]

WithEventID overrides the default EventID derivation. Default uses "<event-source-ARN>/<MessageId>" — globally unique across the queue's history and stable across redeliveries.

For FIFO queues with content-based dedup, or for cases where the message payload carries an upstream identifier (a Mongo `_id`, a Kafka offset reference, etc.), supply an extractor that returns that identifier so re-deliveries of the same logical change fold idempotently.

Return "" to disable dedup for that message (it will be processed as if no Deduper were configured).
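For instance, assuming the decoded payload carries an upstream identifier, a body-derived extractor could look like this. SQSMessage here is a local stand-in for events.SQSMessage, and the field names are illustrative:

```go
package main

import "fmt"

// SQSMessage stands in for events.SQSMessage from aws-lambda-go.
type SQSMessage struct {
	MessageId      string
	EventSourceARN string
}

// ChangeEvent is an illustrative decoded payload carrying an upstream ID.
type ChangeEvent struct {
	UpstreamID string `json:"upstream_id"`
}

// eventIDFromBody prefers the payload's natural key so redeliveries of the
// same logical change fold idempotently; it falls back to the documented
// default derivation when the payload carries no ID.
func eventIDFromBody(msg *SQSMessage, decoded ChangeEvent) string {
	if decoded.UpstreamID != "" {
		return decoded.UpstreamID
	}
	return msg.EventSourceARN + "/" + msg.MessageId
}

func main() {
	msg := &SQSMessage{MessageId: "m1", EventSourceARN: "arn:aws:sqs:us-east-1:123:q"}
	fmt.Println(eventIDFromBody(msg, ChangeEvent{UpstreamID: "mongo-64f1"})) // mongo-64f1
	fmt.Println(eventIDFromBody(msg, ChangeEvent{}))                         // arn:aws:sqs:us-east-1:123:q/m1
}
```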

func WithMaxAttempts

func WithMaxAttempts[T any](n int) HandlerOption[T]

WithMaxAttempts sets the per-record retry budget. Defaults to 3.

func WithMetrics

func WithMetrics[T any](r metrics.Recorder) HandlerOption[T]

WithMetrics installs a metrics.Recorder. Defaults to metrics.Noop{}. The handler records events under the pipeline's Name; retries under "<name>:retry"; dedup skips under "<name>:dedup_skip"; dead letters under "<name>:dead_letter".

func WithRetryBackoff

func WithRetryBackoff[T any](base, max time.Duration) HandlerOption[T]

WithRetryBackoff configures the per-attempt sleep schedule. Doubles after each failure starting from base, capped at max, with full jitter. Defaults to 50 ms / 5 s.
