godynamodb-queue

v0.11.0
Published: Feb 18, 2026 License: Apache-2.0

:author_name: Mario Toffia
:author_email: <use discussions on github>
:author: {author_name}
:email: {author_email}
:source-highlighter: highlightjs
:toc:
:toc-title: Table of Contents
:toclevels: 3
:sectnums:
:homepage: github.com/mariotoffia/godynamodb-queue
:stem: latexmath
:doctype: book
:imagesdir: ./assets
:icons: font

= DynamoDB as Queue in golang

.Status
|===
|Build Status |Documentation

|image:https://github.com/mariotoffia/godynamodb-queue/workflows/Integration%20Tests/badge.svg["Integration Tests"]

|link:https://pkg.go.dev/mod/github.com/mariotoffia/godynamodb-queue[image:https://pkg.go.dev/badge/mariotoffia/godynamodb-queue/repository.svg["Go Reference"]]
|===

== Introduction

A flexible message queue library using DynamoDB as the backend. Features:

* *Two queue modes*: Standard (best-effort ordering) and FIFO (strict ordering with message groups)
* *Multi-tenant support*: Isolated queues via `QueueName` and `ClientID` combinations
* *Concurrent consumers*: Safe for multiple processes/goroutines with visibility timeout locking
* *Message groups* (FIFO): Strict ordering within groups, parallel processing across groups
* *Automatic redelivery*: Unacknowledged messages become visible again after the visibility timeout
* *TTL support*: Automatic message expiration via DynamoDB TTL

The DynamoDB Table has the following schema:

[cols="1,1,1,1,1,1,1", options="header"]
|===
|PK |SK |hidden_until |owner |message_group |TTL |event

|`queueName\|clientID`
|`{timestamp_ns}&{random_digits}`
|`{now()+visibilityTimeout}`
|`{clientID}`
|`{group}` (FIFO only)
|`{ttl}`
|`{events.SQSMessage}`
|===

* *PK*: Partition key combining queue name and client ID
* *SK*: Sort key with nanosecond timestamp and random suffix for uniqueness
* *hidden_until*: Timestamp until which the message is invisible (visibility timeout)
* *owner*: Client ID recorded on the message
* *message_group*: (FIFO only) Groups messages for ordered delivery
* *TTL*: DynamoDB TTL for automatic message expiration
* *event*: The message payload, stored as an `events.SQSMessage`

When polling, messages are queried by PK (oldest first) where `hidden_until < now()`.

NOTE: TTL must be enabled on the DynamoDB table for the TTL column; otherwise expired messages are not deleted automatically.

The library provides convenience functions to create correctly configured tables at runtime (and to drop them).

.Create Table
[source,go]
----
ctx := context.Background()
cfg, _ := config.LoadDefaultConfig(ctx)

created, _ := dynamodbqueue.New(cfg, 0, dynamodbqueue.QueueStandard).
    UseTable("queue-table").
    CreateQueueTable(ctx)
----

== Quick Start

.Simple Example
[source,go]
----
ctx := context.Background()
cfg, _ := config.LoadDefaultConfig(ctx)

queue := dynamodbqueue.New(cfg, 0, dynamodbqueue.QueueStandard). // <1>
    UseTable("my-queue-table").
    UseQueueName("testQueue").
    UseClientID("testClientId")

// Push messages to the queue
msgs, _ := queue.PushMessages(ctx, 0, events.SQSMessage{ // <2>
    MessageAttributes: map[string]events.SQSMessageAttribute{
        "priority": {DataType: "String", StringValue: aws.String("high")},
    },
    Body: "test body",
})

// Check the number of messages in the queue
count, _ := queue.Count(ctx)

// Poll messages (minMessages=1, maxMessages=10)
msgs, _ = queue.PollMessages( // <3>
    ctx,
    time.Second*5,  // timeout - keep polling until min messages or timeout
    time.Minute*14, // visibilityTimeout - how long to "own" messages
    1,              // minMessages
    10,             // maxMessages
)

// Process and delete messages
for _, msg := range msgs {
    // Process the message...
    queue.DeleteMessages(ctx, msg.ReceiptHandle) // <4>
}
----
<1> Create a queue with Standard mode (use `QueueFIFO` for strict ordering)
<2> Push one or more messages to the queue
<3> Poll for messages with timeout and visibility settings
<4> Delete messages after successful processing

When `timeout` is positive, polling continues until at least `minMessages` messages have been received or the timeout expires. The `visibilityTimeout` determines how long the returned messages stay hidden from other consumers.

TIP: Use FIFO mode with message groups when you need strict ordering guarantees within a logical partition (e.g., per customer, per order).

== Queue Types

This library supports two queue types: *Standard* and *FIFO*.

=== Standard Queue

Standard queues provide best-effort ordering and at-least-once delivery. Messages are delivered in approximately the order they were sent, but strict ordering is not guaranteed.

.Standard Queue Example
[source,go]
----
ctx := context.Background()
cfg, _ := config.LoadDefaultConfig(ctx)

// Create a standard queue (default)
queue := dynamodbqueue.New(cfg, 0, dynamodbqueue.QueueStandard).
    UseTable("my-queue-table").
    UseQueueName("orders").
    UseClientID("processor-1")

// Push messages
queue.PushMessages(ctx, 0, events.SQSMessage{
    Body: `{"orderId": "12345", "amount": 99.99}`,
})

// Poll messages (up to 10, with 5-minute visibility timeout)
msgs, _ := queue.PollMessages(ctx, time.Second*5, time.Minute*5, 1, 10)

// Process and delete
for _, msg := range msgs {
    // Process message...
    queue.DeleteMessages(ctx, msg.ReceiptHandle)
}
----
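
Because delivery is at-least-once, a consumer may see the same message more than once, for example when processing outlasts the visibility timeout. One common way to cope is to make the handler idempotent by remembering processed message IDs. The sketch below is a generic pattern, not part of this library; `seenSet`, `newSeenSet`, and `firstTime` are hypothetical names, and the ID is assumed to be something stable such as the message's `MessageId`.

```go
package main

import "sync"

// seenSet remembers the IDs of messages that were already processed.
// It is safe for use from multiple goroutines.
type seenSet struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

// newSeenSet returns an empty set.
func newSeenSet() *seenSet {
	return &seenSet{ids: make(map[string]struct{})}
}

// firstTime records id and reports whether it had not been recorded before.
func (s *seenSet) firstTime(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.ids[id]; ok {
		return false
	}
	s.ids[id] = struct{}{}
	return true
}
```

A handler would call `firstTime` before doing any work and skip straight to deleting the message when it returns `false`. The set grows without bound in this form, so a long-running consumer would need eviction or a persistent store.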

=== FIFO Queue

FIFO queues guarantee strict ordering within message groups. Key properties:

* *Exactly one message in-flight per group* - While a message from group "A" is being processed, no other messages from group "A" can be delivered
* *Strict ordering* - Messages within a group are always delivered in the order they were sent
* *Parallel processing across groups* - Different groups can be processed concurrently

.FIFO Queue with Message Groups
[source,go]
----
ctx := context.Background()
cfg, _ := config.LoadDefaultConfig(ctx)

// Create a FIFO queue
queue := dynamodbqueue.New(cfg, 0, dynamodbqueue.QueueFIFO).
    UseTable("my-queue-table").
    UseQueueName("orders").
    UseClientID("processor-1")

// Type-assert to FifoQueue to access group-specific methods
fifo := queue.(dynamodbqueue.FifoQueue)

// Push messages to specific groups (e.g., by customer ID)
fifo.PushMessagesWithGroup(ctx, 0, "customer-123", events.SQSMessage{
    Body: `{"orderId": "A1", "action": "create"}`,
})
fifo.PushMessagesWithGroup(ctx, 0, "customer-123", events.SQSMessage{
    Body: `{"orderId": "A1", "action": "update"}`,
})
fifo.PushMessagesWithGroup(ctx, 0, "customer-456", events.SQSMessage{
    Body: `{"orderId": "B1", "action": "create"}`,
})

// Poll messages from multiple groups (up to 3 groups at once)
// Returns at most one message per group to maintain FIFO ordering
msgs, _ := fifo.PollMessages(ctx, time.Second*5, time.Minute*5, 1, 3)

// Process and delete - other messages in same group remain blocked
for _, msg := range msgs {
    // Process message...
    fifo.DeleteMessages(ctx, msg.ReceiptHandle)
    // Now the next message in this group becomes available
}
----

.FIFO Ordering Guarantee
[source,go]
----
// Messages pushed to the same group are ALWAYS delivered in order
fifo.PushMessagesWithGroup(ctx, 0, "user-1", events.SQSMessage{Body: "msg-1"})
fifo.PushMessagesWithGroup(ctx, 0, "user-1", events.SQSMessage{Body: "msg-2"})
fifo.PushMessagesWithGroup(ctx, 0, "user-1", events.SQSMessage{Body: "msg-3"})

// Even with multiple concurrent consumers, messages are delivered:
// msg-1 first, then msg-2, then msg-3
// This is guaranteed regardless of the number of consumers
----

== Concurrent Consumers

Multiple consumers can safely poll simultaneously: the visibility timeout keeps a message that one consumer has received hidden from the others.

.Multiple Consumers Example
[source,go]
----
// Start multiple consumers in goroutines
for i := 0; i < 5; i++ {
    go func(consumerID int) {
        queue := dynamodbqueue.New(cfg, 0, dynamodbqueue.QueueFIFO).
            UseTable("my-queue-table").
            UseQueueName("orders").
            UseClientID(fmt.Sprintf("consumer-%d", consumerID))

        fifo := queue.(dynamodbqueue.FifoQueue)

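        for ctx.Err() == nil { // stop once the context is cancelled
            msgs, err := fifo.PollMessages(ctx, time.Second, time.Minute*5, 1, 10)
            if err != nil {
                time.Sleep(time.Second) // back off instead of spinning on repeated errors
                continue
            }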

            for _, msg := range msgs {
                processMessage(msg)
                fifo.DeleteMessages(ctx, msg.ReceiptHandle)
            }
        }
    }(i)
}
----

== Visibility Timeout and Redelivery

Messages become invisible to other consumers during processing. If not deleted within the visibility timeout, they are automatically redelivered.

.Visibility Timeout Example
[source,go]
----
// Poll with 30-second visibility timeout
msgs, _ := queue.PollMessages(ctx, 0, time.Second*30, 1, 10)

// If processing takes longer than 30 seconds without deleting,
// the message becomes visible again and can be picked up by another consumer

// Always delete after successful processing
for _, msg := range msgs {
    if err := processMessage(msg); err == nil {
        queue.DeleteMessages(ctx, msg.ReceiptHandle)
    }
    // If processing fails, don't delete - message will be redelivered
}
----

== Table Management

.Create and Drop Tables
[source,go]
----
ctx := context.Background()
cfg, _ := config.LoadDefaultConfig(ctx)

queue := dynamodbqueue.New(cfg, time.Hour*24, dynamodbqueue.QueueStandard). // 24h default TTL
    UseTable("my-queue-table").
    UseQueueName("orders").
    UseClientID("admin")

// Create table (returns true if created, false if already exists)
created, err := queue.CreateQueueTable(ctx)

// Check if table exists
exists := queue.TableExists(ctx)

// Purge all messages from this queue/client combination
queue.Purge(ctx)

// Drop the entire table
queue.DropQueueTable(ctx)
----

== The Idea

This library was born when SQS queues turned out not to be flexible enough. I had a set of queues and a set of clients, and there was no way to dequeue only the messages matching, e.g., a discriminator. A client would therefore consume other clients' messages and push them back into the queue, which was very inefficient. I do love _SQS_, but it was not usable for that particular use case.
