dynamorm

package
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 11, 2025 License: Apache-2.0 Imports: 8 Imported by: 0

README

DynamORM Implementation for Streamer

This directory contains the DynamORM v1.0.9 implementation for the Streamer project's data storage layer.

Overview

DynamORM provides a cleaner, more maintainable interface for DynamoDB operations compared to direct AWS SDK usage. This implementation maintains backward compatibility while offering improved developer experience.

Files

  • models.go - DynamORM model definitions with PK/SK pattern
  • connection_store.go - ConnectionStore interface implementation using DynamORM
  • factory.go - Factory pattern for creating DynamORM stores
  • connection_store_test.go - Unit tests for the connection store

Key Features

1. PK/SK Pattern

DynamORM uses a composite key pattern for better query flexibility:

  • PK: CONN#<ConnectionID>
  • SK: METADATA

This allows for future extensibility and efficient queries.

2. Automatic Index Management

Indexes are defined using struct tags:

UserID   string `dynamorm:"user_id" dynamorm-index:"user-index,pk"`
TenantID string `dynamorm:"tenant_id" dynamorm-index:"tenant-index,pk"`
3. Simplified Operations

Compare the difference:

Before (AWS SDK):

input := &dynamodb.PutItemInput{
    TableName: aws.String(s.tableName),
    Item:      item,
}
_, err = s.client.PutItem(ctx, input)

After (DynamORM):

err := s.db.Model(dynamormConn).Create()

Usage

Enable DynamORM in Lambda Functions

Set the environment variable:

USE_DYNAMORM=true
Initialize in Code
// Create DynamORM factory
factory, err := dynamorm.NewStoreFactory(session.Config{
    Region: "us-east-1",
})

// Get connection store
connStore := factory.ConnectionStore()

Migration Status

  • ✅ Connection Store implemented
  • ⏳ Request Queue (TODO)
  • ⏳ Subscription Store (TODO)

Testing

Run tests with:

go test ./internal/store/dynamorm/...

Performance Considerations

  • DynamORM is optimized for Lambda cold starts
  • Connection pooling is handled automatically
  • Built-in retry logic with exponential backoff

Next Steps

  1. Implement RequestQueue and SubscriptionStore
  2. Add integration tests with DynamoDB Local
  3. Create migration scripts for production data
  4. Update deployment infrastructure

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewConnectionStore

func NewConnectionStore(db core.DB) store.ConnectionStore

NewConnectionStore creates a new DynamORM-backed connection store

func NewRequestQueue

func NewRequestQueue(db core.DB) store.RequestQueue

NewRequestQueue creates a new DynamORM-backed request queue

Types

type AsyncRequest

type AsyncRequest struct {
	// DynamORM composite key pattern
	PK string `dynamorm:"pk"`
	SK string `dynamorm:"sk"`

	// Request data with proper attribute mapping
	RequestID    string                 `dynamorm:"attr:request_id" json:"requestId"`
	ConnectionID string                 `dynamorm:"attr:connection_id" json:"connectionId" dynamorm-index:"connection-index,pk"`
	Status       RequestStatus          `dynamorm:"attr:status" json:"status" dynamorm-index:"status-index,sk"`
	Action       string                 `dynamorm:"attr:action" json:"action"`
	Payload      map[string]interface{} `dynamorm:"attr:payload,omitempty" json:"payload,omitempty"`

	// Processing information
	ProcessingStarted *time.Time `dynamorm:"attr:processing_started,omitempty" json:"processingStarted,omitempty"`
	ProcessingEnded   *time.Time `dynamorm:"attr:processing_ended,omitempty" json:"processingEnded,omitempty"`

	// Result or error
	Result map[string]interface{} `dynamorm:"attr:result,omitempty" json:"result,omitempty"`
	Error  string                 `dynamorm:"attr:error,omitempty" json:"error,omitempty"`

	// Progress tracking
	Progress        float64                `dynamorm:"attr:progress" json:"progress"`
	ProgressMessage string                 `dynamorm:"attr:progress_message,omitempty" json:"progressMessage,omitempty"`
	ProgressDetails map[string]interface{} `dynamorm:"attr:progress_details,omitempty" json:"progressDetails,omitempty"`

	// Retry information
	RetryCount int       `dynamorm:"attr:retry_count" json:"retryCount"`
	MaxRetries int       `dynamorm:"attr:max_retries" json:"maxRetries"`
	RetryAfter time.Time `dynamorm:"attr:retry_after,omitempty" json:"retryAfter,omitempty"`

	// User and tenant for querying
	UserID   string `dynamorm:"attr:user_id" json:"userId" dynamorm-index:"user-index,sk"`
	TenantID string `dynamorm:"attr:tenant_id" json:"tenantId" dynamorm-index:"tenant-index,sk"`

	// DynamORM managed fields
	CreatedAt time.Time `dynamorm:"created_at" json:"createdAt"`
	UpdatedAt time.Time `dynamorm:"updated_at" json:"updatedAt"`
	Version   int       `dynamorm:"version" json:"version"`

	// TTL for automatic cleanup
	TTL int64 `dynamorm:"attr:ttl,omitempty" json:"ttl,omitempty"`
}

AsyncRequest represents a queued async request - single model serving both business and database needs

func (*AsyncRequest) FromStoreModel

func (r *AsyncRequest) FromStoreModel(req *store.AsyncRequest)

FromStoreModel converts store.AsyncRequest to DynamORM AsyncRequest

func (*AsyncRequest) SetKeys

func (r *AsyncRequest) SetKeys()

SetKeys sets the composite keys for the request

func (*AsyncRequest) TableName

func (r *AsyncRequest) TableName() string

TableName returns the DynamoDB table name

func (*AsyncRequest) ToStoreModel

func (r *AsyncRequest) ToStoreModel() *store.AsyncRequest

ToStoreModel converts DynamORM AsyncRequest to store.AsyncRequest

type Connection

type Connection struct {
	// DynamORM composite key pattern
	PK string `dynamorm:"pk"`
	SK string `dynamorm:"sk"`

	// Connection data with proper attribute mapping
	ConnectionID string    `dynamorm:"attr:connection_id" json:"connectionId"`
	UserID       string    `dynamorm:"attr:user_id" json:"userId" dynamorm-index:"user-index,pk"`
	TenantID     string    `dynamorm:"attr:tenant_id" json:"tenantId" dynamorm-index:"tenant-index,pk"`
	Endpoint     string    `dynamorm:"attr:endpoint" json:"endpoint"`
	ConnectedAt  time.Time `dynamorm:"attr:connected_at" json:"connectedAt"`
	LastPing     time.Time `dynamorm:"attr:last_ping" json:"lastPing"`

	// Metadata for storing additional information
	Metadata map[string]string `dynamorm:"attr:metadata,omitempty" json:"metadata,omitempty"`

	// DynamORM managed fields
	CreatedAt time.Time `dynamorm:"created_at" json:"createdAt"`
	UpdatedAt time.Time `dynamorm:"updated_at" json:"updatedAt"`
	Version   int       `dynamorm:"version" json:"version"`

	// TTL for automatic cleanup
	TTL int64 `dynamorm:"attr:ttl,omitempty" json:"ttl,omitempty"`
}

Connection represents a WebSocket connection - single model serving both business and database needs

func (*Connection) FromStoreModel

func (c *Connection) FromStoreModel(conn *store.Connection)

FromStoreModel converts store.Connection to DynamORM Connection

func (*Connection) SetKeys

func (c *Connection) SetKeys()

SetKeys sets the composite keys for the connection

func (*Connection) TableName

func (c *Connection) TableName() string

TableName returns the DynamoDB table name

func (*Connection) ToStoreModel

func (c *Connection) ToStoreModel() *store.Connection

ToStoreModel converts DynamORM Connection to store.Connection

type RequestStatus added in v1.0.4

type RequestStatus string

RequestStatus represents the status of an async request

const (
	StatusPending    RequestStatus = "PENDING"
	StatusProcessing RequestStatus = "PROCESSING"
	StatusCompleted  RequestStatus = "COMPLETED"
	StatusFailed     RequestStatus = "FAILED"
	StatusCancelled  RequestStatus = "CANCELLED"
	StatusRetrying   RequestStatus = "RETRYING"
)

type StoreFactory

type StoreFactory struct {
	// contains filtered or unexported fields
}

StoreFactory creates all the necessary stores using DynamORM

func NewStoreFactory

func NewStoreFactory(config session.Config) (*StoreFactory, error)

NewStoreFactory creates a new DynamORM store factory

func NewStoreFactoryFromClient

func NewStoreFactoryFromClient(client *dynamodb.Client, region string) (*StoreFactory, error)

NewStoreFactoryFromClient creates a new factory from an existing DynamoDB client This is useful for Lambda functions that already have a client configured

func (*StoreFactory) ConnectionStore

func (f *StoreFactory) ConnectionStore() store.ConnectionStore

ConnectionStore returns the connection store

func (*StoreFactory) DB

func (f *StoreFactory) DB() *dynamorm.DB

DB returns the underlying DynamORM database instance

func (*StoreFactory) EnsureTables

func (f *StoreFactory) EnsureTables(ctx context.Context) error

EnsureTables creates the required DynamoDB tables if they don't exist This should only be used in development/testing environments

func (*StoreFactory) RequestQueue

func (f *StoreFactory) RequestQueue() store.RequestQueue

RequestQueue returns the request queue

func (*StoreFactory) SubscriptionStore

func (f *StoreFactory) SubscriptionStore() store.SubscriptionStore

SubscriptionStore returns the subscription store

type Subscription

type Subscription struct {
	// DynamORM composite key pattern
	PK string `dynamorm:"pk"`
	SK string `dynamorm:"sk"`

	// Subscription data with proper attribute mapping
	SubscriptionID string   `dynamorm:"attr:subscription_id" json:"subscriptionId"`
	ConnectionID   string   `dynamorm:"attr:connection_id" json:"connectionId" dynamorm-index:"connection-index,pk"`
	RequestID      string   `dynamorm:"attr:request_id" json:"requestId" dynamorm-index:"request-index,pk"`
	EventTypes     []string `dynamorm:"attr:event_types,stringset" json:"eventTypes"`

	// DynamORM managed fields
	CreatedAt time.Time `dynamorm:"created_at" json:"createdAt"`
	UpdatedAt time.Time `dynamorm:"updated_at" json:"updatedAt"`
	Version   int       `dynamorm:"version" json:"version"`

	// TTL for automatic cleanup
	TTL int64 `dynamorm:"attr:ttl,omitempty" json:"ttl,omitempty"`
}

Subscription represents a real-time update subscription - single model serving both business and database needs

func (*Subscription) FromStoreModel

func (s *Subscription) FromStoreModel(sub *store.Subscription)

FromStoreModel converts store.Subscription to DynamORM Subscription

func (*Subscription) SetKeys

func (s *Subscription) SetKeys()

SetKeys sets the composite keys for the subscription

func (*Subscription) TableName

func (s *Subscription) TableName() string

TableName returns the DynamoDB table name

func (*Subscription) ToStoreModel

func (s *Subscription) ToStoreModel() *store.Subscription

ToStoreModel converts DynamORM Subscription to store.Subscription

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL