store

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 15, 2025 License: Apache-2.0 Imports: 7 Imported by: 0

README

Streamer Storage Layer

This package implements the storage layer for the Streamer project using AWS DynamoDB.

Overview

The storage layer provides interfaces and implementations for managing:

  • WebSocket connections
  • Async request queuing
  • Real-time subscriptions

Components

Models (models.go)
  • Connection: Represents a WebSocket connection with user/tenant information
  • AsyncRequest: Represents a queued request for async processing
  • Subscription: Represents a real-time update subscription
Interfaces (interfaces.go)
  • ConnectionStore: Manages WebSocket connections

    • Save, Get, Delete connections
    • Query by user or tenant
    • Update activity timestamps
    • Clean up stale connections
  • RequestQueue: Manages async requests

    • Enqueue new requests
    • Update status and progress
    • Query by connection or status
    • Complete or fail requests
  • SubscriptionStore: Manages real-time subscriptions

    • Subscribe/unsubscribe to updates
    • Query by connection or request
Implementations
  • connectionStore (connection_store.go): DynamoDB implementation of ConnectionStore
  • requestQueue (request_queue.go): DynamoDB implementation of RequestQueue
Table Definitions (migrations.go)

Defines DynamoDB table schemas with:

  • Primary keys and indexes
  • TTL configuration for automatic cleanup
  • Pay-per-request billing mode
Tables
  1. streamer_connections

    • Primary Key: ConnectionID
    • GSI: UserIndex (UserID)
    • GSI: TenantIndex (TenantID)
    • TTL: 24 hours
  2. streamer_requests

    • Primary Key: RequestID
    • GSI: ConnectionIndex (ConnectionID, CreatedAt)
    • GSI: StatusIndex (Status, CreatedAt)
    • TTL: 7 days
  3. streamer_subscriptions

    • Primary Key: SubscriptionID
    • GSI: ConnectionIndex (ConnectionID)
    • GSI: RequestIndex (RequestID)
    • TTL: 24 hours

Usage

import (
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/streamer/streamer/internal/store"
)

// Create DynamoDB client
cfg, _ := config.LoadDefaultConfig(context.Background())
client := dynamodb.NewFromConfig(cfg)

// Create tables (one-time setup)
err := store.CreateTables(context.Background(), client)

// Create stores
connStore := store.NewConnectionStore(client, "")
requestQueue := store.NewRequestQueue(client, "")

// Save a connection
conn := &store.Connection{
    ConnectionID: "conn-123",
    UserID:       "user-456",
    TenantID:     "tenant-789",
    Endpoint:     "wss://example.com/ws",
    ConnectedAt:  time.Now(),
    LastPing:     time.Now(),
}
err = connStore.Save(context.Background(), conn)

// Enqueue a request
req := &store.AsyncRequest{
    ConnectionID: "conn-123",
    Action:       "generate_report",
    Payload:      map[string]interface{}{"type": "monthly"},
}
err = requestQueue.Enqueue(context.Background(), req)

Testing

The storage layer includes comprehensive unit tests that can be run against a local DynamoDB instance:

# Start local DynamoDB
docker run -p 8000:8000 amazon/dynamodb-local

# Run tests
go test ./internal/store/...

# Run integration tests
go test ./internal/store/... -run Integration

Error Handling

The storage layer defines custom error types:

  • ErrNotFound: Item not found in DynamoDB
  • ErrAlreadyExists: Item already exists
  • ErrInvalidInput: Validation failed
  • ValidationError: Field-specific validation errors

Use the helper functions to check error types:

if store.IsNotFound(err) {
    // Handle not found
}

Performance Considerations

  • Uses DynamoDB batch operations where possible
  • Implements pagination for large result sets
  • Optimized indexes for common query patterns
  • TTL for automatic cleanup of old data
  • Pay-per-request billing for cost efficiency

Future Improvements

  • Add caching layer for frequently accessed items
  • Implement batch operations for bulk updates
  • Add metrics and monitoring
  • Support for DynamoDB transactions
  • Backup and restore functionality

Documentation

Index

Constants

View Source
const (
	ConnectionsTable   = "streamer_connections"
	RequestsTable      = "streamer_requests"
	SubscriptionsTable = "streamer_subscriptions"
)

TableNames defines the DynamoDB table names

Variables

View Source
var (
	// ErrNotFound is returned when an item is not found in DynamoDB
	ErrNotFound = errors.New("item not found")

	// ErrAlreadyExists is returned when trying to create an item that already exists
	ErrAlreadyExists = errors.New("item already exists")

	// ErrInvalidInput is returned when input validation fails
	ErrInvalidInput = errors.New("invalid input")

	// ErrConnectionClosed is returned when operating on a closed connection
	ErrConnectionClosed = errors.New("connection is closed")

	// ErrRequestNotPending is returned when trying to process a non-pending request
	ErrRequestNotPending = errors.New("request is not in pending state")

	// ErrConcurrentModification is returned when an item was modified concurrently
	ErrConcurrentModification = errors.New("item was modified concurrently")
)

Common errors

Functions

func CreateTables

func CreateTables(ctx context.Context, client *dynamodb.Client) error

CreateTables creates all required DynamoDB tables

func DeleteTables

func DeleteTables(ctx context.Context, client *dynamodb.Client) error

DeleteTables deletes all Streamer tables (for testing)

func IsAlreadyExists

func IsAlreadyExists(err error) bool

IsAlreadyExists checks if an error is an already exists error

func IsNotFound

func IsNotFound(err error) bool

IsNotFound checks if an error is a not found error

func NewValidationError

func NewValidationError(field, message string) error

NewValidationError creates a new validation error

Types

type AsyncRequest

type AsyncRequest struct {
	// Primary key
	RequestID string `dynamodbav:"RequestID" json:"requestId"`

	// Connection that created this request
	ConnectionID string `dynamodbav:"ConnectionID" json:"connectionId"`

	// Status tracking
	Status    RequestStatus `dynamodbav:"Status" json:"status"`
	CreatedAt time.Time     `dynamodbav:"CreatedAt" json:"createdAt"`

	// Request details
	Action  string                 `dynamodbav:"Action" json:"action"`
	Payload map[string]interface{} `dynamodbav:"Payload,omitempty" json:"payload,omitempty"`

	// Processing information
	ProcessingStarted *time.Time `dynamodbav:"ProcessingStarted,omitempty" json:"processingStarted,omitempty"`
	ProcessingEnded   *time.Time `dynamodbav:"ProcessingEnded,omitempty" json:"processingEnded,omitempty"`

	// Result or error
	Result map[string]interface{} `dynamodbav:"Result,omitempty" json:"result,omitempty"`
	Error  string                 `dynamodbav:"Error,omitempty" json:"error,omitempty"`

	// Progress tracking
	Progress        float64                `dynamodbav:"Progress" json:"progress"`
	ProgressMessage string                 `dynamodbav:"ProgressMessage,omitempty" json:"progressMessage,omitempty"`
	ProgressDetails map[string]interface{} `dynamodbav:"ProgressDetails,omitempty" json:"progressDetails,omitempty"`

	// Retry information
	RetryCount int       `dynamodbav:"RetryCount" json:"retryCount"`
	MaxRetries int       `dynamodbav:"MaxRetries" json:"maxRetries"`
	RetryAfter time.Time `dynamodbav:"RetryAfter,omitempty" json:"retryAfter,omitempty"`

	// User and tenant for querying
	UserID   string `dynamodbav:"UserID" json:"userId"`
	TenantID string `dynamodbav:"TenantID" json:"tenantId"`

	// TTL for automatic cleanup
	TTL int64 `dynamodbav:"TTL,omitempty" json:"ttl,omitempty"`
}

AsyncRequest represents a queued async request

type Connection

type Connection struct {
	// Primary key
	ConnectionID string `dynamodbav:"ConnectionID" json:"connectionId"`

	// User and tenant information for multi-tenancy
	UserID   string `dynamodbav:"UserID" json:"userId"`
	TenantID string `dynamodbav:"TenantID" json:"tenantId"`

	// WebSocket endpoint for sending messages
	Endpoint string `dynamodbav:"Endpoint" json:"endpoint"`

	// Timestamps
	ConnectedAt time.Time `dynamodbav:"ConnectedAt" json:"connectedAt"`
	LastPing    time.Time `dynamodbav:"LastPing" json:"lastPing"`

	// Metadata for storing additional information
	Metadata map[string]string `dynamodbav:"Metadata,omitempty" json:"metadata,omitempty"`

	// TTL for automatic cleanup
	TTL int64 `dynamodbav:"TTL,omitempty" json:"ttl,omitempty"`
}

Connection represents a WebSocket connection

type ConnectionStore

type ConnectionStore interface {
	// Save creates or updates a connection
	Save(ctx context.Context, conn *Connection) error

	// Get retrieves a connection by ID
	Get(ctx context.Context, connectionID string) (*Connection, error)

	// Delete removes a connection
	Delete(ctx context.Context, connectionID string) error

	// ListByUser returns all connections for a user
	ListByUser(ctx context.Context, userID string) ([]*Connection, error)

	// ListByTenant returns all connections for a tenant
	ListByTenant(ctx context.Context, tenantID string) ([]*Connection, error)

	// UpdateLastPing updates the last ping timestamp
	UpdateLastPing(ctx context.Context, connectionID string) error

	// DeleteStale removes connections older than the specified time
	DeleteStale(ctx context.Context, before time.Time) error
}

ConnectionStore manages WebSocket connections in DynamoDB

type RequestQueue

type RequestQueue interface {
	// Enqueue adds a new request to the queue
	Enqueue(ctx context.Context, req *AsyncRequest) error

	// Dequeue retrieves and marks requests for processing
	// This is mainly for testing - in production, DynamoDB Streams handle this
	Dequeue(ctx context.Context, limit int) ([]*AsyncRequest, error)

	// UpdateStatus updates the status of a request
	UpdateStatus(ctx context.Context, requestID string, status RequestStatus, message string) error

	// UpdateProgress updates the progress of a request
	UpdateProgress(ctx context.Context, requestID string, progress float64, message string, details map[string]interface{}) error

	// CompleteRequest marks a request as completed with results
	CompleteRequest(ctx context.Context, requestID string, result map[string]interface{}) error

	// FailRequest marks a request as failed with an error
	FailRequest(ctx context.Context, requestID string, errMsg string) error

	// GetByConnection retrieves all requests for a connection
	GetByConnection(ctx context.Context, connectionID string, limit int) ([]*AsyncRequest, error)

	// GetByStatus retrieves requests by status
	GetByStatus(ctx context.Context, status RequestStatus, limit int) ([]*AsyncRequest, error)

	// Get retrieves a specific request
	Get(ctx context.Context, requestID string) (*AsyncRequest, error)

	// Delete removes a request
	Delete(ctx context.Context, requestID string) error
}

RequestQueue manages async requests in DynamoDB

type RequestStatus

type RequestStatus string

RequestStatus represents the status of an async request

const (
	StatusPending    RequestStatus = "PENDING"
	StatusProcessing RequestStatus = "PROCESSING"
	StatusCompleted  RequestStatus = "COMPLETED"
	StatusFailed     RequestStatus = "FAILED"
	StatusCancelled  RequestStatus = "CANCELLED"
	StatusRetrying   RequestStatus = "RETRYING"
)

type StoreError

type StoreError struct {
	Op      string // Operation that failed
	Table   string // DynamoDB table
	Key     string // Item key
	Err     error  // Underlying error
	Message string // Additional context
}

StoreError wraps storage-related errors with additional context

func NewStoreError

func NewStoreError(op, table, key string, err error) *StoreError

NewStoreError creates a new storage error

func (*StoreError) Error

func (e *StoreError) Error() string

Error implements the error interface

func (*StoreError) Is

func (e *StoreError) Is(target error) bool

Is checks if the error matches the target

func (*StoreError) Unwrap

func (e *StoreError) Unwrap() error

Unwrap returns the underlying error

type Subscription

type Subscription struct {
	// Composite key: ConnectionID#RequestID
	SubscriptionID string `dynamodbav:"SubscriptionID" json:"subscriptionId"`

	// Individual components for querying
	ConnectionID string `dynamodbav:"ConnectionID" json:"connectionId"`
	RequestID    string `dynamodbav:"RequestID" json:"requestId"`

	// Subscription details
	EventTypes []string  `dynamodbav:"EventTypes,stringset" json:"eventTypes"`
	CreatedAt  time.Time `dynamodbav:"CreatedAt" json:"createdAt"`

	// TTL for automatic cleanup
	TTL int64 `dynamodbav:"TTL,omitempty" json:"ttl,omitempty"`
}

Subscription represents a real-time update subscription

type SubscriptionStore

type SubscriptionStore interface {
	// Subscribe creates a subscription for progress updates
	Subscribe(ctx context.Context, sub *Subscription) error

	// Unsubscribe removes a subscription
	Unsubscribe(ctx context.Context, connectionID, requestID string) error

	// GetByConnection returns all subscriptions for a connection
	GetByConnection(ctx context.Context, connectionID string) ([]*Subscription, error)

	// GetByRequest returns all subscriptions for a request
	GetByRequest(ctx context.Context, requestID string) ([]*Subscription, error)

	// DeleteByConnection removes all subscriptions for a connection
	DeleteByConnection(ctx context.Context, connectionID string) error
}

SubscriptionStore manages real-time update subscriptions

type TableDefinition

type TableDefinition struct {
	TableName              string
	AttributeDefinitions   []types.AttributeDefinition
	KeySchema              []types.KeySchemaElement
	GlobalSecondaryIndexes []types.GlobalSecondaryIndex
	BillingMode            types.BillingMode
}

TableDefinition holds the configuration for a DynamoDB table

func GetTableDefinitions

func GetTableDefinitions() []TableDefinition

GetTableDefinitions returns all table definitions for Streamer

type ValidationError

type ValidationError struct {
	Field   string
	Message string
}

ValidationError represents input validation errors

func (*ValidationError) Error

func (e *ValidationError) Error() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL