Documentation ¶
Overview ¶
Package dynamodb provides a DynamoDB-backed implementation of the github.com/slackmgr/types.DB interface for Slack Manager.
The package uses a single-table DynamoDB design. Every record is keyed by a Slack channel ID (partition key, "pk") combined with a type-prefixed composite sort key ("sk"):
- Alerts: ALERT#<timestamp>#<unique_id>
- Issues: ISSUE#<channel>#<base64_correlation_id>#<unique_id>
- Move mappings: MOVEMAPPING#<channel>#<base64_correlation_id>
- Processing state: PROCESSINGSTATE#<channel>
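Two Global Secondary Indexes support additional queries:
- GSIPostID: looks up issues by Slack post ID (partition key pk, sort key post_id)
- GSIIsOpen: finds open issues across channels (partition key is_open, sort key sk)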
Getting Started ¶
Create a Client with New, supplying an AWS config, the DynamoDB table name, and any Option values you need:
client := dynamodb.New(
&awsCfg,
tableName,
dynamodb.WithAlertsTimeToLive(30*24*time.Hour),
dynamodb.WithIssuesTimeToLive(180*24*time.Hour),
)
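By default, the Client uses an AWS SDK v2 DynamoDB client built from the supplied aws.Config. Supply WithAPI to inject a custom or mock implementation. Call Client.Connect before any other method, then Client.Init to validate the table schema.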
TTL Behaviour ¶
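Alert records expire after 30 days by default; closed issue records expire after 180 days. Both windows are configurable via WithAlertsTimeToLive and WithIssuesTimeToLive. TTL values are stored as Unix timestamps in the ttl attribute and rely on DynamoDB's built-in TTL feature for automatic deletion. DynamoDB removes expired items in the background, so a record can remain readable for a while after its TTL has passed.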
Concurrency ¶
Client is safe for concurrent use by multiple goroutines.
Index ¶
- Constants
- type API
- type Client
- func (c *Client) Connect() error
- func (c *Client) DeleteMoveMapping(ctx context.Context, channelID, correlationID string) error
- func (c *Client) DropAllData(ctx context.Context) error
- func (c *Client) FindActiveChannels(ctx context.Context) ([]string, error)
- func (c *Client) FindChannelProcessingState(ctx context.Context, channelID string) (*types.ChannelProcessingState, error)
- func (c *Client) FindIssueBySlackPostID(ctx context.Context, channelID, postID string) (string, json.RawMessage, error)
- func (c *Client) FindMoveMapping(ctx context.Context, channelID, correlationID string) (json.RawMessage, error)
- func (c *Client) FindOpenIssueByCorrelationID(ctx context.Context, channelID, correlationID string) (string, json.RawMessage, error)
- func (c *Client) Init(ctx context.Context, skipSchemaValidation bool) error
- func (c *Client) LoadOpenIssuesInChannel(ctx context.Context, channelID string) (map[string]json.RawMessage, error)
- func (c *Client) MoveIssue(ctx context.Context, issue types.Issue, ...) error
- func (c *Client) SaveAlert(ctx context.Context, alert *types.Alert) error
- func (c *Client) SaveChannelProcessingState(ctx context.Context, state *types.ChannelProcessingState) error
- func (c *Client) SaveIssue(ctx context.Context, issue types.Issue) error
- func (c *Client) SaveIssues(ctx context.Context, issues ...types.Issue) error
- func (c *Client) SaveMoveMapping(ctx context.Context, moveMapping types.MoveMapping) error
- type Option
- type Options
Constants ¶
const (
	// GSIPostID is the name of the Global Secondary Index used to query issues by
	// Slack post ID. Partition key: pk, sort key: post_id, projected attribute: sk.
	GSIPostID = "GSIPostID"

	// GSIIsOpen is the name of the Global Secondary Index used to query all open
	// issues across channels. Partition key: is_open, sort key: sk, projected
	// attribute: body.
	//
	// All open issues share the partition key value "true", which creates a hot
	// partition. At higher scale, consider sharding (e.g., "true#<shard>" where
	// shard = hash(channelID) % N) to distribute load. This would require schema
	// changes and a data migration.
	GSIIsOpen = "GSIIsOpen"

	// PartitionKey is the DynamoDB partition key attribute name.
	PartitionKey = "pk"

	// SortKey is the DynamoDB sort key attribute name.
	SortKey = "sk"

	// IsOpenAttr is the attribute name that marks an issue as open. It also
	// serves as the partition key for the GSIIsOpen index.
	IsOpenAttr = "is_open"

	// PostIDAttr is the attribute name used to store the Slack post ID. It also
	// serves as the sort key for the GSIPostID index.
	PostIDAttr = "post_id"

	// BodyAttr is the attribute name used to store the JSON-encoded body of a record.
	BodyAttr = "body"

	// TTLAttr is the attribute name used for DynamoDB TTL-based expiration. The
	// table must have TTL enabled on this attribute.
	TTLAttr = "ttl"

	// IsOpenValue is the partition key value written to the GSIIsOpen index for
	// open issues.
	IsOpenValue = "true"
)
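The sharding idea mentioned in the GSIIsOpen comment could look roughly like the sketch below. The package does not implement this; the function names and the choice of FNV-1a as the hash are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

const isOpenValue = "true"

// shardedIsOpenValue returns "true#<shard>", where shard is
// hash(channelID) % shards. ASSUMPTION: FNV-1a is used as the hash; any
// stable hash would work. With shards == 0 it falls back to the unsharded value.
func shardedIsOpenValue(channelID string, shards uint32) string {
	if shards == 0 {
		return isOpenValue
	}
	h := fnv.New32a()
	h.Write([]byte(channelID)) // hash.Hash.Write never returns an error
	return fmt.Sprintf("%s#%d", isOpenValue, h.Sum32()%shards)
}

// allShardValues lists every partition key value a cross-channel reader
// would have to query once the index is sharded.
func allShardValues(shards uint32) []string {
	out := make([]string, 0, shards)
	for i := uint32(0); i < shards; i++ {
		out = append(out, fmt.Sprintf("%s#%d", isOpenValue, i))
	}
	return out
}

func main() {
	fmt.Println(shardedIsOpenValue("C0123456789", 8))
	fmt.Println(allShardValues(8))
}
```

The trade-off is that writes spread across N partitions, while any query that spans channels has to fan out over all N shard values and merge the results.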
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type API ¶
type API interface {
PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error)
Query(ctx context.Context, params *dynamodb.QueryInput, optFns ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error)
GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error)
Scan(ctx context.Context, params *dynamodb.ScanInput, optFns ...func(*dynamodb.Options)) (*dynamodb.ScanOutput, error)
BatchWriteItem(ctx context.Context, params *dynamodb.BatchWriteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error)
TransactWriteItems(ctx context.Context, params *dynamodb.TransactWriteItemsInput, optFns ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error)
DescribeTable(ctx context.Context, params *dynamodb.DescribeTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DescribeTableOutput, error)
DescribeTimeToLive(ctx context.Context, params *dynamodb.DescribeTimeToLiveInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DescribeTimeToLiveOutput, error)
DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error)
}
API defines the interface for DynamoDB operations used by the Client. This interface is satisfied by the AWS SDK v2 *dynamodb.Client (github.com/aws/aws-sdk-go-v2/service/dynamodb) and allows for mocking in tests.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a DynamoDB-backed implementation of the types.DB interface. It uses a single-table design with composite sort keys to store alerts, issues, move mappings, and channel processing state.
Use New to create a Client, Client.Connect to initialize the underlying DynamoDB connection, and Client.Init to validate the table schema.
func New ¶
New creates a new Client configured with the given AWS config, table name, and any Option values. Call Client.Connect on the returned client before use.
func (*Client) Connect ¶
func (c *Client) Connect() error
Connect initializes the DynamoDB client from the AWS config provided to New. It must be called before any other Client methods, and must complete before the Client is used concurrently.
func (*Client) DeleteMoveMapping ¶
func (c *Client) DeleteMoveMapping(ctx context.Context, channelID, correlationID string) error
DeleteMoveMapping removes a move mapping from DynamoDB. It is a no-op if the mapping does not exist.
func (*Client) DropAllData ¶
func (c *Client) DropAllData(ctx context.Context) error
DropAllData deletes every item from the DynamoDB table. It scans the table in pages and removes each page using BatchWriteItem with exponential backoff for unprocessed items.
This method is intended for use in tests only. Do not call it in production.
func (*Client) FindActiveChannels ¶
func (c *Client) FindActiveChannels(ctx context.Context) ([]string, error)
FindActiveChannels returns the IDs of all channels that have at least one open issue, by querying the GSIIsOpen index. Results are sorted alphabetically. Returns an empty slice if no channels are active.
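Since an index query returns one row per open issue, the channel IDs have to be de-duplicated before sorting. A minimal sketch of that step, with a hypothetical helper name:

```go
package main

import (
	"fmt"
	"sort"
)

// uniqueSorted returns the distinct channel IDs in alphabetical order.
// It returns an empty, non-nil slice when the input is empty.
func uniqueSorted(channelIDs []string) []string {
	seen := make(map[string]struct{}, len(channelIDs))
	out := make([]string, 0, len(channelIDs))
	for _, id := range channelIDs {
		if _, ok := seen[id]; ok {
			continue
		}
		seen[id] = struct{}{}
		out = append(out, id)
	}
	sort.Strings(out)
	return out
}

func main() {
	// One entry per open issue, as a query over the index might yield.
	fmt.Println(uniqueSorted([]string{"C2", "C1", "C2", "C3", "C1"}))
}
```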
func (*Client) FindChannelProcessingState ¶
func (c *Client) FindChannelProcessingState(ctx context.Context, channelID string) (*types.ChannelProcessingState, error)
FindChannelProcessingState retrieves the processing state for a channel. Returns (nil, nil) if no state has been saved for the channel.
func (*Client) FindIssueBySlackPostID ¶
func (c *Client) FindIssueBySlackPostID(ctx context.Context, channelID, postID string) (string, json.RawMessage, error)
FindIssueBySlackPostID looks up an issue by channel ID and Slack post ID using the GSIPostID index. It returns the issue's unique ID and its JSON-encoded body. Returns ("", nil, nil) if no match is found.
Note: the GSI uses eventually consistent reads, so recent writes may not be immediately visible.
func (*Client) FindMoveMapping ¶
func (c *Client) FindMoveMapping(ctx context.Context, channelID, correlationID string) (json.RawMessage, error)
FindMoveMapping retrieves a move mapping by channel ID and correlation ID. Returns (nil, nil) if no move mapping is found.
func (*Client) FindOpenIssueByCorrelationID ¶
func (c *Client) FindOpenIssueByCorrelationID(ctx context.Context, channelID, correlationID string) (string, json.RawMessage, error)
FindOpenIssueByCorrelationID looks up an open issue by channel ID and correlation ID using the GSIIsOpen index. It returns the issue's unique ID and its JSON-encoded body. Returns ("", nil, nil) if no match is found. Returns an error if more than one open issue matches the correlation ID.
func (*Client) Init ¶
func (c *Client) Init(ctx context.Context, skipSchemaValidation bool) error
Init validates the DynamoDB table schema. It checks that the table exists, has the correct partition key (pk) and sort key (sk), has TTL enabled on the ttl attribute, and that both required Global Secondary Indexes (GSIPostID and GSIIsOpen) are present and correctly configured.
Set skipSchemaValidation to true to skip all checks and return immediately, which is useful when schema validation is managed separately.
func (*Client) LoadOpenIssuesInChannel ¶
func (c *Client) LoadOpenIssuesInChannel(ctx context.Context, channelID string) (map[string]json.RawMessage, error)
LoadOpenIssuesInChannel returns all open issues in the given channel as a map of unique ID to JSON-encoded body, queried from the GSIIsOpen index. Returns an empty map if the channel has no open issues.
func (*Client) MoveIssue ¶
func (c *Client) MoveIssue(ctx context.Context, issue types.Issue, sourceChannelID, targetChannelID string) error
MoveIssue atomically deletes the issue from the source channel and writes it to the target channel using a DynamoDB transaction. The source item must exist; if it does not, the transaction fails with a condition check error.
The issue must already have its channel ID set to targetChannelID before this method is called.
func (*Client) SaveAlert ¶
func (c *Client) SaveAlert(ctx context.Context, alert *types.Alert) error
SaveAlert persists an alert to DynamoDB. The record is stored as a JSON-encoded body with a TTL set to the current time plus the alerts time-to-live configured via WithAlertsTimeToLive (default: 30 days).
func (*Client) SaveChannelProcessingState ¶
func (c *Client) SaveChannelProcessingState(ctx context.Context, state *types.ChannelProcessingState) error
SaveChannelProcessingState persists a channel processing state to DynamoDB.
func (*Client) SaveIssue ¶
func (c *Client) SaveIssue(ctx context.Context, issue types.Issue) error
SaveIssue persists a single issue to DynamoDB. Open issues are indexed in GSIIsOpen for efficient lookup. Closed issues have a TTL set to the current time plus the issues time-to-live configured via WithIssuesTimeToLive (default: 180 days).
func (*Client) SaveIssues ¶
func (c *Client) SaveIssues(ctx context.Context, issues ...types.Issue) error
SaveIssues persists multiple issues to DynamoDB. A single issue is written with PutItem; two or more issues are batched in groups of up to 25 using BatchWriteItem, with exponential backoff for any unprocessed items.
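The grouping step can be sketched as a plain chunking function. The limit of 25 matches the batch size stated above; the function name and the use of string IDs in place of issues are simplifications for illustration.

```go
package main

import "fmt"

// maxBatchSize is the group size described above for BatchWriteItem.
const maxBatchSize = 25

// chunk splits items into consecutive groups of at most size elements.
// The groups share the backing array of items; nothing is copied.
func chunk(items []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var out [][]string
	for len(items) > 0 {
		n := size
		if len(items) < n {
			n = len(items)
		}
		out = append(out, items[:n])
		items = items[n:]
	}
	return out
}

func main() {
	ids := make([]string, 60)
	for i := range ids {
		ids[i] = fmt.Sprintf("issue-%d", i)
	}
	for i, batch := range chunk(ids, maxBatchSize) {
		fmt.Printf("batch %d: %d items\n", i, len(batch))
	}
}
```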
func (*Client) SaveMoveMapping ¶
func (c *Client) SaveMoveMapping(ctx context.Context, moveMapping types.MoveMapping) error
SaveMoveMapping persists a move mapping to DynamoDB.
type Option ¶
type Option func(*Options)
Option is a functional option for configuring a Client.
func WithAPI ¶
WithAPI sets a custom API implementation. This is useful when a custom DynamoDB configuration is required, or for injecting mocks in tests.
func WithAlertsTimeToLive ¶
WithAlertsTimeToLive sets the TTL applied to alert records. The default is 30 days. The duration must be greater than zero.
func WithClock ¶
WithClock sets a custom clock function used when computing TTL values. Defaults to time.Now. This is useful for controlling time in tests.
func WithIssuesTimeToLive ¶
WithIssuesTimeToLive sets the TTL applied to closed issue records. The default is 180 days. The duration must be greater than zero.
type Options ¶
type Options struct {
// contains filtered or unexported fields
}
Options holds the configuration for a Client. Use Option functions (such as WithAlertsTimeToLive or WithIssuesTimeToLive) to customise the defaults.