streams

package
v0.22.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 26, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package streams provides a shared event store for DynamoDB Streams. Both the dynamodb service (producer) and the dynamodbstreams service (consumer) reference this package to avoid circular imports.

Index

Constants

This section is empty.

Variables

View Source
var Global = NewStore()

Global is the default global Store instance shared between the dynamodb and dynamodbstreams services.

Functions

This section is empty.

Types

type AttributeValue

type AttributeValue struct {
	S    *string                    `json:"S,omitempty"`
	N    *string                    `json:"N,omitempty"`
	B    []byte                     `json:"B,omitempty"`
	SS   []string                   `json:"SS,omitempty"`
	NS   []string                   `json:"NS,omitempty"`
	BS   [][]byte                   `json:"BS,omitempty"`
	M    map[string]*AttributeValue `json:"M,omitempty"`
	L    []*AttributeValue          `json:"L,omitempty"`
	NULL *bool                      `json:"NULL,omitempty"`
	BOOL *bool                      `json:"BOOL,omitempty"`
}

AttributeValue mirrors DynamoDB's AttributeValue for stream records. We use a simplified representation to avoid importing the dynamodb package.

type KeySchemaElement

type KeySchemaElement struct {
	AttributeName string
	KeyType       string // HASH or RANGE
}

KeySchemaElement represents a key schema element.

type OperationType

type OperationType string

OperationType represents the type of DynamoDB stream event.

const (
	OperationTypeInsert OperationType = "INSERT"
	OperationTypeModify OperationType = "MODIFY"
	OperationTypeRemove OperationType = "REMOVE"
)

Operation type constants.

type ShardInfo

type ShardInfo struct {
	ShardID                  string
	ParentShardID            string
	SequenceNumberRangeStart string
}

ShardInfo holds metadata about a shard within a stream.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is a global, concurrency-safe store for DynamoDB stream events. DynamoDB storage writes events here; DynamoDB Streams reads them.

func NewStore

func NewStore() *Store

NewStore creates a new Store.

func (*Store) DescribeStream

func (s *Store) DescribeStream(streamARN string) (*StreamInfo, []ShardInfo, error)

DescribeStream returns stream info and shard list for the given ARN.

func (*Store) GetRecords

func (s *Store) GetRecords(streamARN, shardID string, position, limit int) ([]*StreamRecord, int, error)

GetRecords returns records from a given shard starting at the specified position.

func (*Store) PutRecord

func (s *Store) PutRecord(record *StreamRecord)

PutRecord adds a stream record. Called by DynamoDB storage on item mutations.

func (*Store) RegisterStream

func (s *Store) RegisterStream(info *StreamInfo)

RegisterStream registers a stream for a table. Called when a DynamoDB table with StreamSpecification is created.

func (*Store) ShardRecordCount

func (s *Store) ShardRecordCount(streamARN, shardID string) int

ShardRecordCount returns the number of records in a shard (used for iterator positioning).

type StreamInfo

type StreamInfo struct {
	StreamARN      string
	TableName      string
	StreamViewType string
	StreamLabel    string
	StreamStatus   string
	KeySchema      []KeySchemaElement
	CreationTime   time.Time
}

StreamInfo holds metadata about a stream.

type StreamRecord

type StreamRecord struct {
	EventID        string
	EventName      OperationType
	EventVersion   string
	EventSource    string
	AwsRegion      string
	StreamViewType string
	TableName      string
	StreamARN      string
	Keys           map[string]AttributeValue
	NewImage       map[string]AttributeValue
	OldImage       map[string]AttributeValue
	SequenceNumber string
	CreatedAt      time.Time
	SizeBytes      int64
}

StreamRecord represents a single change event in a DynamoDB stream.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL