dynamorm

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 15, 2025 License: Apache-2.0 Imports: 8 Imported by: 0

README

DynamORM Implementation for Streamer

This directory contains the DynamORM v1.0.9 implementation for the Streamer project's data storage layer.

Overview

DynamORM provides a cleaner, more maintainable interface for DynamoDB operations compared to direct AWS SDK usage. This implementation maintains backward compatibility while offering improved developer experience.

Files

  • models.go - DynamORM model definitions with PK/SK pattern
  • connection_store.go - ConnectionStore interface implementation using DynamORM
  • factory.go - Factory pattern for creating DynamORM stores
  • connection_store_test.go - Unit tests for the connection store

Key Features

1. PK/SK Pattern

DynamORM uses a composite key pattern for better query flexibility:

  • PK: CONN#<ConnectionID>
  • SK: METADATA

This allows for future extensibility and efficient queries.

2. Automatic Index Management

Indexes are defined using struct tags:

UserID   string `dynamorm:"user_id" dynamorm-index:"user-index,pk"`
TenantID string `dynamorm:"tenant_id" dynamorm-index:"tenant-index,pk"`
3. Simplified Operations

Compare the difference:

Before (AWS SDK):

input := &dynamodb.PutItemInput{
    TableName: aws.String(s.tableName),
    Item:      item,
}
_, err = s.client.PutItem(ctx, input)

After (DynamORM):

err := s.db.Model(dynamormConn).Create()

Usage

Enable DynamORM in Lambda Functions

Set the environment variable:

USE_DYNAMORM=true
Initialize in Code
// Create DynamORM factory
factory, err := dynamorm.NewStoreFactory(session.Config{
    Region: "us-east-1",
})

// Get connection store
connStore := factory.ConnectionStore()

Migration Status

  • ✅ Connection Store implemented
  • ⏳ Request Queue (TODO)
  • ⏳ Subscription Store (TODO)

Testing

Run tests with:

go test ./internal/store/dynamorm/...

Performance Considerations

  • DynamORM is optimized for Lambda cold starts
  • Connection pooling is handled automatically
  • Built-in retry logic with exponential backoff

Next Steps

  1. Implement RequestQueue and SubscriptionStore
  2. Add integration tests with DynamoDB Local
  3. Create migration scripts for production data
  4. Update deployment infrastructure

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewConnectionStore

func NewConnectionStore(db core.DB) store.ConnectionStore

NewConnectionStore creates a new DynamORM-backed connection store

func NewRequestQueue

func NewRequestQueue(db core.DB) store.RequestQueue

NewRequestQueue creates a new DynamORM-backed request queue

Types

type AsyncRequest

type AsyncRequest struct {
	// DynamORM composite key pattern
	PK string `dynamorm:"pk"`
	SK string `dynamorm:"sk"`

	// Request data
	RequestID    string                 `dynamorm:"request_id"`
	ConnectionID string                 `dynamorm:"connection_id" dynamorm-index:"connection-index,pk"`
	Status       store.RequestStatus    `dynamorm:"status" dynamorm-index:"status-index,sk"`
	CreatedAt    time.Time              `dynamorm:"created_at"`
	Action       string                 `dynamorm:"action"`
	Payload      map[string]interface{} `dynamorm:"payload,omitempty"`

	// Processing information
	ProcessingStarted *time.Time `dynamorm:"processing_started,omitempty"`
	ProcessingEnded   *time.Time `dynamorm:"processing_ended,omitempty"`

	// Result or error
	Result map[string]interface{} `dynamorm:"result,omitempty"`
	Error  string                 `dynamorm:"error,omitempty"`

	// Progress tracking
	Progress        float64                `dynamorm:"progress"`
	ProgressMessage string                 `dynamorm:"progress_message,omitempty"`
	ProgressDetails map[string]interface{} `dynamorm:"progress_details,omitempty"`

	// Retry information
	RetryCount int       `dynamorm:"retry_count"`
	MaxRetries int       `dynamorm:"max_retries"`
	RetryAfter time.Time `dynamorm:"retry_after,omitempty"`

	// User and tenant for querying
	UserID   string `dynamorm:"user_id" dynamorm-index:"user-index,sk"`
	TenantID string `dynamorm:"tenant_id" dynamorm-index:"tenant-index,sk"`

	// TTL for automatic cleanup
	TTL int64 `dynamorm:"ttl,omitempty"`
}

AsyncRequest represents a queued async request with DynamORM

func (*AsyncRequest) FromStoreModel

func (r *AsyncRequest) FromStoreModel(req *store.AsyncRequest)

FromStoreModel converts from the store.AsyncRequest model

func (*AsyncRequest) SetKeys

func (r *AsyncRequest) SetKeys()

SetKeys sets the composite keys for the request

func (*AsyncRequest) TableName

func (r *AsyncRequest) TableName() string

TableName returns the DynamoDB table name

func (*AsyncRequest) ToStoreModel

func (r *AsyncRequest) ToStoreModel() *store.AsyncRequest

ToStoreModel converts to the store.AsyncRequest model

type Connection

type Connection struct {
	// DynamORM composite key pattern
	PK string `dynamorm:"pk"`
	SK string `dynamorm:"sk"`

	// Connection data
	ConnectionID string    `dynamorm:"connection_id"`
	UserID       string    `dynamorm:"user_id" dynamorm-index:"user-index,pk"`
	TenantID     string    `dynamorm:"tenant_id" dynamorm-index:"tenant-index,pk"`
	Endpoint     string    `dynamorm:"endpoint"`
	ConnectedAt  time.Time `dynamorm:"connected_at"`
	LastPing     time.Time `dynamorm:"last_ping"`

	// Metadata for storing additional information
	Metadata map[string]string `dynamorm:"metadata,omitempty"`

	// TTL for automatic cleanup
	TTL int64 `dynamorm:"ttl,omitempty"`
}

Connection represents a WebSocket connection with DynamORM

func (*Connection) FromStoreModel

func (c *Connection) FromStoreModel(conn *store.Connection)

FromStoreModel converts from the store.Connection model

func (*Connection) SetKeys

func (c *Connection) SetKeys()

SetKeys sets the composite keys for the connection

func (*Connection) TableName

func (c *Connection) TableName() string

TableName returns the DynamoDB table name

func (*Connection) ToStoreModel

func (c *Connection) ToStoreModel() *store.Connection

ToStoreModel converts to the store.Connection model

type StoreFactory

type StoreFactory struct {
	// contains filtered or unexported fields
}

StoreFactory creates all the necessary stores using DynamORM

func NewStoreFactory

func NewStoreFactory(config session.Config) (*StoreFactory, error)

NewStoreFactory creates a new DynamORM store factory

func NewStoreFactoryFromClient

func NewStoreFactoryFromClient(client *dynamodb.Client, region string) (*StoreFactory, error)

NewStoreFactoryFromClient creates a new factory from an existing DynamoDB client This is useful for Lambda functions that already have a client configured

func (*StoreFactory) ConnectionStore

func (f *StoreFactory) ConnectionStore() store.ConnectionStore

ConnectionStore returns the connection store

func (*StoreFactory) DB

func (f *StoreFactory) DB() *dynamorm.DB

DB returns the underlying DynamORM database instance

func (*StoreFactory) EnsureTables

func (f *StoreFactory) EnsureTables(ctx context.Context) error

EnsureTables creates the required DynamoDB tables if they don't exist This should only be used in development/testing environments

func (*StoreFactory) RequestQueue

func (f *StoreFactory) RequestQueue() store.RequestQueue

RequestQueue returns the request queue

func (*StoreFactory) SubscriptionStore

func (f *StoreFactory) SubscriptionStore() store.SubscriptionStore

SubscriptionStore returns the subscription store

type Subscription

type Subscription struct {
	// DynamORM composite key pattern
	PK string `dynamorm:"pk"`
	SK string `dynamorm:"sk"`

	// Subscription data
	SubscriptionID string    `dynamorm:"subscription_id"`
	ConnectionID   string    `dynamorm:"connection_id" dynamorm-index:"connection-index,pk"`
	RequestID      string    `dynamorm:"request_id" dynamorm-index:"request-index,pk"`
	EventTypes     []string  `dynamorm:"event_types,stringset"`
	CreatedAt      time.Time `dynamorm:"created_at"`

	// TTL for automatic cleanup
	TTL int64 `dynamorm:"ttl,omitempty"`
}

Subscription represents a real-time update subscription with DynamORM

func (*Subscription) FromStoreModel

func (s *Subscription) FromStoreModel(sub *store.Subscription)

FromStoreModel converts from the store.Subscription model

func (*Subscription) SetKeys

func (s *Subscription) SetKeys()

SetKeys sets the composite keys for the subscription

func (*Subscription) TableName

func (s *Subscription) TableName() string

TableName returns the DynamoDB table name

func (*Subscription) ToStoreModel

func (s *Subscription) ToStoreModel() *store.Subscription

ToStoreModel converts to the store.Subscription model

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL