Documentation ¶
Overview ¶
Package sqs hosts a Murmur pipeline behind an AWS Lambda SQS trigger.
SQS is the third common Lambda event source after Kinesis and DynamoDB Streams. It's the right shape for:
- Job-style ingest where producers POST aggregation events to a queue and the consumer is a Murmur pipeline (e.g., a webhook receiver buffers incoming events into SQS, the Lambda fans them into Murmur).
- Cross-account ingest where multiple producer accounts deliver into a shared queue.
- Burst absorption: SQS retains messages for up to 14 days, and the consumer Lambda scales on visible-message count rather than shard count.
This package is the symmetric peer of pkg/exec/lambda/kinesis and pkg/exec/lambda/dynamodbstreams: the retry, dedup, metrics, and BatchItemFailures semantics are the same; only the input event shape and the dedup key derivation differ.
Wire it up:
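func main() {
	// rec (a metrics.Recorder) and deduper (a state.Deduper) are assumed to be built elsewhere.
	pipe := buildPipeline()
	handler, err := sqs.NewHandler(pipe, sqs.JSONDecoder[Event](),
		// The options are generic over T; T cannot be inferred from their arguments, so name it explicitly.
		sqs.WithMetrics[Event](rec),
		sqs.WithDedup[Event](deduper),
	)
	if err != nil {
		log.Fatal(err)
	}
	lambda.Start(handler)
}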
Partial-batch failure handling ¶
Records that exhaust their retry budget are reported via BatchItemFailures with the SQS message ID as ItemIdentifier. Configure your event-source mapping with `FunctionResponseTypes=["ReportBatchItemFailures"]` so that SQS redelivers only the failed messages rather than retrying the whole batch. Without this setting, a single poison message forces the entire batch to redeliver until that message reaches its maxReceiveCount and moves to a DLQ.
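As an illustration of the response shape described above, the following sketch builds a partial-batch response in which only one message is reported as failed. The `batchItemFailure` and `batchResponse` types and the `reportFailures` helper are hypothetical local stand-ins that mirror the JSON Lambda expects; they are not this package's types, and the handler builds the real response for you.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// batchItemFailure and batchResponse are local stand-ins (assumptions for this
// sketch) mirroring the JSON shape of a partial-batch response.
type batchItemFailure struct {
	ItemIdentifier string `json:"itemIdentifier"`
}

type batchResponse struct {
	BatchItemFailures []batchItemFailure `json:"batchItemFailures"`
}

// reportFailures builds a response listing only the message IDs that
// exhausted their retry budget. An empty list means the whole batch succeeded.
func reportFailures(failedMessageIDs []string) batchResponse {
	resp := batchResponse{BatchItemFailures: []batchItemFailure{}}
	for _, id := range failedMessageIDs {
		resp.BatchItemFailures = append(resp.BatchItemFailures, batchItemFailure{ItemIdentifier: id})
	}
	return resp
}

func main() {
	out, err := json.Marshal(reportFailures([]string{"msg-2"}))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"batchItemFailures":[{"itemIdentifier":"msg-2"}]}
}
```

With ReportBatchItemFailures enabled, only "msg-2" would be made visible again; the other messages in the batch are deleted from the queue.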
Dedup key ¶
The default EventID is "<event-source-ARN>/<MessageId>". The SQS message ID is stable across redeliveries, whereas the receipt handle changes on every receive. For FIFO queues with content-based dedup, you may prefer to derive the EventID from the message body's natural key: pass a custom EventIDFn via WithEventID.
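A minimal sketch of the default derivation, assuming it is a plain concatenation of the two fields as the format string suggests. `sqsMessage` is a hypothetical local stand-in carrying only the fields used here (the real handler receives events.SQSMessage), and the ARN and IDs are made-up values.

```go
package main

import "fmt"

// sqsMessage is a local stand-in (an assumption for this sketch) with only
// the fields relevant to dedup key derivation.
type sqsMessage struct {
	MessageId      string
	ReceiptHandle  string
	EventSourceARN string
}

// defaultEventID mirrors the documented default format
// "<event-source-ARN>/<MessageId>". The receipt handle is deliberately unused
// because it changes on every receive.
func defaultEventID(m sqsMessage) string {
	return m.EventSourceARN + "/" + m.MessageId
}

func main() {
	arn := "arn:aws:sqs:us-east-1:123456789012:example-queue"
	first := sqsMessage{MessageId: "m-1", ReceiptHandle: "rh-a", EventSourceARN: arn}
	redelivered := sqsMessage{MessageId: "m-1", ReceiptHandle: "rh-b", EventSourceARN: arn}

	// Same message, different receipt handle: the dedup key is unchanged.
	fmt.Println(defaultEventID(first) == defaultEventID(redelivered)) // true
}
```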
Visibility timeout interactions ¶
The SQS event-source mapping does not extend a message's visibility timeout while the handler runs: if a batch is still being processed when the queue's visibility timeout expires, its messages become visible again and can be redelivered. AWS recommends setting the queue's visibility timeout to at least six times the function timeout. Keep the per-record retry budget reasonable so a stuck record doesn't burn the entire visibility window.
Index ¶
- type Decoder
- type Handler
- type HandlerOption
- func WithClock[T any](now func() time.Time) HandlerOption[T]
- func WithDecodeErrorCallback[T any](fn func(body string, messageID string, err error)) HandlerOption[T]
- func WithDedup[T any](d state.Deduper) HandlerOption[T]
- func WithEventID[T any](fn func(msg *events.SQSMessage, decoded T) string) HandlerOption[T]
- func WithMaxAttempts[T any](n int) HandlerOption[T]
- func WithMetrics[T any](r metrics.Recorder) HandlerOption[T]
- func WithRetryBackoff[T any](base, max time.Duration) HandlerOption[T]
Types ¶
type Decoder ¶
Decoder converts a raw SQS message body to the pipeline's input type T.
func JSONDecoder ¶
JSONDecoder returns a Decoder that unmarshals the message body as JSON into T.
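The sketch below shows what a generic JSON decoder of this kind could look like. The `decoder` type is an assumed shape (a function from the message body to T plus an error); the package's actual Decoder definition is not shown in this documentation, and `jsonDecoder` and `event` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decoder is an ASSUMED shape for a body-to-T decoder; the package's real
// Decoder type may differ.
type decoder[T any] func(body string) (T, error)

// jsonDecoder returns a decoder that unmarshals the message body as JSON into T.
func jsonDecoder[T any]() decoder[T] {
	return func(body string) (T, error) {
		var v T
		err := json.Unmarshal([]byte(body), &v)
		return v, err
	}
}

// event is a hypothetical pipeline input type.
type event struct {
	UserID string `json:"user_id"`
	Count  int64  `json:"count"`
}

func main() {
	decode := jsonDecoder[event]()

	ev, err := decode(`{"user_id":"u-1","count":3}`)
	fmt.Println(ev.UserID, ev.Count, err) // u-1 3 <nil>

	// A malformed body surfaces as a decode error rather than a zero event.
	_, err = decode("not json")
	fmt.Println(err != nil) // true
}
```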
type Handler ¶
Handler is the Lambda handler signature for SQS triggers — pass the returned value directly to lambda.Start.
func NewHandler ¶
func NewHandler[T any, V any]( p *pipeline.Pipeline[T, V], decode Decoder[T], opts ...HandlerOption[T], ) (Handler, error)
NewHandler builds a Lambda handler that drives a Murmur pipeline from an SQS trigger. The pipeline must be Build-validated; the supplied Decoder converts each message body to the pipeline's input type T. The pipeline's Source field is unused — Lambda owns polling.
Returns an error if the pipeline's required fields (Key, Value, Aggregate, StoreIn) are not set, or if decode is nil.
type HandlerOption ¶
type HandlerOption[T any] func(*handlerConfig[T])
HandlerOption configures NewHandler.
func WithClock ¶
func WithClock[T any](now func() time.Time) HandlerOption[T]
WithClock overrides time.Now for windowed-bucket assignment. Useful for tests with deterministic clocks.
func WithDecodeErrorCallback ¶
func WithDecodeErrorCallback[T any](fn func(body string, messageID string, err error)) HandlerOption[T]
WithDecodeErrorCallback installs a callback for messages whose Decode returned an error. The default behavior is to drop such messages silently. Decode failures are NOT redelivered, because redelivering an undecodable message would create a poison-pill loop; wire the callback to a DLQ producer to move poison messages off the hot path.
func WithDedup ¶
func WithDedup[T any](d state.Deduper) HandlerOption[T]
WithDedup installs a state.Deduper. Each SQS message's EventID (default: "<event-source-ARN>/<MessageId>") is claimed via MarkSeen before the merge runs; on a duplicate, the merge is skipped and the message is counted as processed.
Strongly recommended in production: SQS delivery is at-least-once, so duplicates are normal during visibility-timeout edge cases and during partial-batch redelivery.
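To make the claim-before-merge behavior concrete, here is an in-memory sketch. The `MarkSeen` signature (a context, an event ID, and a "first claim" boolean result) is an assumption, since the state.Deduper interface is not shown here; `memDeduper`, `processOnce`, and `countMerges` are hypothetical names. An in-memory map does not survive across Lambda invocations, so a production Deduper would need a shared store.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// memDeduper is an in-memory sketch. The MarkSeen signature is ASSUMED.
type memDeduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newMemDeduper() *memDeduper {
	return &memDeduper{seen: make(map[string]struct{})}
}

// MarkSeen claims eventID and reports whether this call was the first claim.
func (d *memDeduper) MarkSeen(_ context.Context, eventID string) (bool, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.seen[eventID]; dup {
		return false, nil
	}
	d.seen[eventID] = struct{}{}
	return true, nil
}

// processOnce claims the event before merging; on a duplicate the merge is
// skipped and the message is still treated as processed (nil error).
func processOnce(ctx context.Context, d *memDeduper, eventID string, merge func()) error {
	first, err := d.MarkSeen(ctx, eventID)
	if err != nil {
		return err
	}
	if first {
		merge()
	}
	return nil
}

// countMerges feeds a sequence of event IDs through one deduper and returns
// how many merges actually ran.
func countMerges(eventIDs []string) int {
	d := newMemDeduper()
	merges := 0
	for _, id := range eventIDs {
		_ = processOnce(context.Background(), d, id, func() { merges++ })
	}
	return merges
}

func main() {
	// "q/m-1" is delivered twice; only its first delivery triggers a merge.
	fmt.Println(countMerges([]string{"q/m-1", "q/m-1", "q/m-2"})) // 2
}
```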
func WithEventID ¶
func WithEventID[T any](fn func(msg *events.SQSMessage, decoded T) string) HandlerOption[T]
WithEventID overrides the default EventID derivation. The default is "<event-source-ARN>/<MessageId>", which is globally unique across the queue's history and stable across redeliveries.
For FIFO queues with content-based dedup, or for cases where the message payload carries an upstream identifier (a Mongo `_id`, a Kafka offset reference, etc.), supply an extractor that returns that identifier so re-deliveries of the same logical change fold idempotently.
Return "" to disable dedup for that message (it will be processed as if no Deduper were configured).
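A hedged sketch of such an extractor follows. `sqsMessage` is a hypothetical local stand-in for events.SQSMessage, `changeEvent` and its `upstream_id` field are made up for illustration, and the "change/" prefix is an arbitrary choice. The function has the parameter shape WithEventID expects, with the stand-in type substituted.

```go
package main

import "fmt"

// sqsMessage is a local stand-in (an assumption for this sketch) for the
// fields of events.SQSMessage used below.
type sqsMessage struct {
	MessageId      string
	EventSourceARN string
}

// changeEvent is a hypothetical decoded payload carrying an upstream identifier.
type changeEvent struct {
	UpstreamID string `json:"upstream_id"`
}

// eventIDFromPayload prefers the upstream identifier so that two SQS messages
// describing the same logical change share one dedup key. When the payload
// has no identifier it falls back to the message-ID form, which keeps dedup
// active; returning "" instead would disable dedup for that message.
func eventIDFromPayload(msg *sqsMessage, decoded changeEvent) string {
	if decoded.UpstreamID == "" {
		return msg.EventSourceARN + "/" + msg.MessageId
	}
	return "change/" + decoded.UpstreamID
}

func main() {
	msg := &sqsMessage{MessageId: "m-1", EventSourceARN: "arn:q"}
	fmt.Println(eventIDFromPayload(msg, changeEvent{UpstreamID: "abc123"})) // change/abc123
	fmt.Println(eventIDFromPayload(msg, changeEvent{}))                     // arn:q/m-1
}
```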
func WithMaxAttempts ¶
func WithMaxAttempts[T any](n int) HandlerOption[T]
WithMaxAttempts sets the per-record retry budget. Defaults to 3.
func WithMetrics ¶
func WithMetrics[T any](r metrics.Recorder) HandlerOption[T]
WithMetrics installs a metrics.Recorder. Defaults to metrics.Noop{}. The handler records events under the pipeline's Name; retries under "<name>:retry"; dedup skips under "<name>:dedup_skip"; dead letters under "<name>:dead_letter".
func WithRetryBackoff ¶
func WithRetryBackoff[T any](base, max time.Duration) HandlerOption[T]
WithRetryBackoff configures the per-attempt sleep schedule. The delay starts at base, doubles after each failure, is capped at max, and has full jitter applied. Defaults to base = 50 ms and max = 5 s.
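The schedule described above can be sketched as follows. This is an illustration of capped exponential backoff with full jitter under stated assumptions, not the package's implementation: the zero-based attempt index and the choice of a uniform draw from [0, ceiling] are assumptions, and `backoffCeiling` and `fullJitter` are hypothetical names.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffCeiling returns base * 2^attempt capped at max. attempt is
// zero-based (an assumption): attempt 0 is the delay after the first failure.
func backoffCeiling(base, max time.Duration, attempt int) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		if d >= max/2 {
			return max // doubling would reach or exceed the cap
		}
		d *= 2
	}
	if d > max {
		return max
	}
	return d
}

// fullJitter draws a uniformly random sleep in [0, ceiling].
func fullJitter(ceiling time.Duration) time.Duration {
	if ceiling <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(ceiling) + 1))
}

func main() {
	base, max := 50*time.Millisecond, 5*time.Second
	for attempt := 0; attempt < 8; attempt++ {
		ceiling := backoffCeiling(base, max, attempt)
		fmt.Printf("attempt %d: ceiling %v, sleep %v\n", attempt, ceiling, fullJitter(ceiling))
	}
}
```

With the documented defaults, the ceiling grows 50 ms, 100 ms, 200 ms, and so on until it reaches the 5 s cap at the eighth delay; the actual sleep is random below each ceiling, which spreads out retries from concurrent invocations.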