Documentation
¶
Index ¶
- Constants
- Variables
- func CreateTables(ctx context.Context, client *dynamodb.Client) error
- func DeleteTables(ctx context.Context, client *dynamodb.Client) error
- func IsAlreadyExists(err error) bool
- func IsNotFound(err error) bool
- func NewValidationError(field, message string) error
- type AsyncRequest
- type Connection
- type ConnectionStore
- type RequestQueue
- type RequestStatus
- type StoreError
- type Subscription
- type SubscriptionStore
- type TableDefinition
- type ValidationError
Constants ¶
// DynamoDB table names used by Streamer.
const (
	// ConnectionsTable is the name of the table holding Connection items.
	ConnectionsTable = "streamer_connections"
	// RequestsTable is the name of the table holding AsyncRequest items.
	RequestsTable = "streamer_requests"
	// SubscriptionsTable is the name of the table holding Subscription items.
	SubscriptionsTable = "streamer_subscriptions"
)
ConnectionsTable, RequestsTable and SubscriptionsTable define the DynamoDB table names
Variables ¶
// Common errors. Each one is a distinct sentinel value created with
// errors.New; the messages are part of the observable behavior and are kept
// exactly as declared.
var (
	// ErrNotFound is returned when an item is not found in DynamoDB
	ErrNotFound = errors.New("item not found")

	// ErrAlreadyExists is returned when trying to create an item that already exists
	ErrAlreadyExists = errors.New("item already exists")

	// ErrInvalidInput is returned when input validation fails
	ErrInvalidInput = errors.New("invalid input")

	// ErrConnectionClosed is returned when operating on a closed connection
	ErrConnectionClosed = errors.New("connection is closed")

	// ErrRequestNotPending is returned when trying to process a non-pending request
	ErrRequestNotPending = errors.New("request is not in pending state")

	// ErrConcurrentModification is returned when an item was modified concurrently
	ErrConcurrentModification = errors.New("item was modified concurrently")
)
Common errors
Functions ¶
func CreateTables ¶
CreateTables creates all required DynamoDB tables
func DeleteTables ¶
DeleteTables deletes all Streamer tables (for testing)
func IsAlreadyExists ¶
IsAlreadyExists checks if an error is an already exists error
func IsNotFound ¶
IsNotFound checks if an error is a not found error
func NewValidationError ¶
NewValidationError creates a new validation error
Types ¶
type AsyncRequest ¶
// AsyncRequest represents a queued async request.
//
// Every field carries a dynamodbav tag naming its DynamoDB attribute and a
// json tag naming its camelCase JSON key. Fields tagged omitempty are
// optional in the serialized forms.
type AsyncRequest struct {
// Primary key
RequestID string `dynamodbav:"RequestID" json:"requestId"`
// Connection that created this request
ConnectionID string `dynamodbav:"ConnectionID" json:"connectionId"`
// Status tracking
Status RequestStatus `dynamodbav:"Status" json:"status"`
CreatedAt time.Time `dynamodbav:"CreatedAt" json:"createdAt"`
// Request details
Action string `dynamodbav:"Action" json:"action"`
Payload map[string]interface{} `dynamodbav:"Payload,omitempty" json:"payload,omitempty"`
// Processing information
// Both timestamps are pointers, so an unset value is nil and is dropped by omitempty.
ProcessingStarted *time.Time `dynamodbav:"ProcessingStarted,omitempty" json:"processingStarted,omitempty"`
ProcessingEnded *time.Time `dynamodbav:"ProcessingEnded,omitempty" json:"processingEnded,omitempty"`
// Result or error
Result map[string]interface{} `dynamodbav:"Result,omitempty" json:"result,omitempty"`
Error string `dynamodbav:"Error,omitempty" json:"error,omitempty"`
// Progress tracking
// NOTE(review): the scale of Progress (0-1 vs 0-100) is not visible here - confirm against the implementation.
Progress float64 `dynamodbav:"Progress" json:"progress"`
ProgressMessage string `dynamodbav:"ProgressMessage,omitempty" json:"progressMessage,omitempty"`
ProgressDetails map[string]interface{} `dynamodbav:"ProgressDetails,omitempty" json:"progressDetails,omitempty"`
// Retry information
RetryCount int `dynamodbav:"RetryCount" json:"retryCount"`
MaxRetries int `dynamodbav:"MaxRetries" json:"maxRetries"`
// NOTE(review): RetryAfter is a non-pointer time.Time. encoding/json's omitempty
// never omits a struct value, so a zero RetryAfter is still written to JSON;
// confirm whether the dynamodbav encoder omits it. The other optional
// timestamps in this struct use *time.Time.
RetryAfter time.Time `dynamodbav:"RetryAfter,omitempty" json:"retryAfter,omitempty"`
// User and tenant for querying
UserID string `dynamodbav:"UserID" json:"userId"`
TenantID string `dynamodbav:"TenantID" json:"tenantId"`
// TTL for automatic cleanup
// NOTE(review): presumably a Unix epoch-seconds expiry as used by DynamoDB TTL - confirm.
TTL int64 `dynamodbav:"TTL,omitempty" json:"ttl,omitempty"`
}
AsyncRequest represents a queued async request
type Connection ¶
// Connection represents a WebSocket connection.
//
// Every field carries a dynamodbav tag naming its DynamoDB attribute and a
// json tag naming its camelCase JSON key.
type Connection struct {
// Primary key
ConnectionID string `dynamodbav:"ConnectionID" json:"connectionId"`
// User and tenant information for multi-tenancy
UserID string `dynamodbav:"UserID" json:"userId"`
TenantID string `dynamodbav:"TenantID" json:"tenantId"`
// WebSocket endpoint for sending messages
Endpoint string `dynamodbav:"Endpoint" json:"endpoint"`
// Timestamps
ConnectedAt time.Time `dynamodbav:"ConnectedAt" json:"connectedAt"`
LastPing time.Time `dynamodbav:"LastPing" json:"lastPing"`
// Metadata for storing additional information
// Optional: dropped from both serialized forms when empty (omitempty).
Metadata map[string]string `dynamodbav:"Metadata,omitempty" json:"metadata,omitempty"`
// TTL for automatic cleanup
// NOTE(review): presumably a Unix epoch-seconds expiry as used by DynamoDB TTL - confirm.
TTL int64 `dynamodbav:"TTL,omitempty" json:"ttl,omitempty"`
}
Connection represents a WebSocket connection
type ConnectionStore ¶
// ConnectionStore manages WebSocket connections in DynamoDB.
//
// Every method takes a context.Context first and reports failure through its
// error result.
// NOTE(review): which sentinel error (e.g. a not-found error) each method
// returns for a missing connection is not specified by this interface -
// confirm against the implementation.
type ConnectionStore interface {
// Save creates or updates a connection
Save(ctx context.Context, conn *Connection) error
// Get retrieves a connection by ID
Get(ctx context.Context, connectionID string) (*Connection, error)
// Delete removes a connection
Delete(ctx context.Context, connectionID string) error
// ListByUser returns all connections for a user
ListByUser(ctx context.Context, userID string) ([]*Connection, error)
// ListByTenant returns all connections for a tenant
ListByTenant(ctx context.Context, tenantID string) ([]*Connection, error)
// UpdateLastPing updates the last ping timestamp
// NOTE(review): no timestamp is passed in, so the implementation presumably uses the current time - confirm.
UpdateLastPing(ctx context.Context, connectionID string) error
// DeleteStale removes connections older than the specified time
// NOTE(review): whether "older" is judged by ConnectedAt or LastPing is not visible here - confirm.
DeleteStale(ctx context.Context, before time.Time) error
}
ConnectionStore manages WebSocket connections in DynamoDB
type RequestQueue ¶
// RequestQueue manages async requests in DynamoDB.
//
// Every method takes a context.Context first and reports failure through its
// error result. Requests are addressed by their RequestID string.
type RequestQueue interface {
// Enqueue adds a new request to the queue
Enqueue(ctx context.Context, req *AsyncRequest) error
// Dequeue retrieves and marks requests for processing
// This is mainly for testing - in production, DynamoDB Streams handle this
// limit bounds how many requests are returned.
Dequeue(ctx context.Context, limit int) ([]*AsyncRequest, error)
// UpdateStatus updates the status of a request
// NOTE(review): which AsyncRequest field the message argument is stored in is not visible here - confirm.
UpdateStatus(ctx context.Context, requestID string, status RequestStatus, message string) error
// UpdateProgress updates the progress of a request
// The progress, message and details arguments line up with the Progress,
// ProgressMessage and ProgressDetails fields of AsyncRequest.
UpdateProgress(ctx context.Context, requestID string, progress float64, message string, details map[string]interface{}) error
// CompleteRequest marks a request as completed with results
CompleteRequest(ctx context.Context, requestID string, result map[string]interface{}) error
// FailRequest marks a request as failed with an error
FailRequest(ctx context.Context, requestID string, errMsg string) error
// GetByConnection retrieves all requests for a connection
// limit bounds how many requests are returned.
GetByConnection(ctx context.Context, connectionID string, limit int) ([]*AsyncRequest, error)
// GetByStatus retrieves requests by status
// limit bounds how many requests are returned.
GetByStatus(ctx context.Context, status RequestStatus, limit int) ([]*AsyncRequest, error)
// Get retrieves a specific request
Get(ctx context.Context, requestID string) (*AsyncRequest, error)
// Delete removes a request
Delete(ctx context.Context, requestID string) error
}
RequestQueue manages async requests in DynamoDB
type RequestStatus ¶
// RequestStatus represents the status of an async request
type RequestStatus string

// Known RequestStatus values. The string literal is the value carried by a
// RequestStatus when it is serialized, so it must not be changed.
const (
	// StatusPending is the "PENDING" status.
	StatusPending RequestStatus = "PENDING"
	// StatusProcessing is the "PROCESSING" status.
	StatusProcessing RequestStatus = "PROCESSING"
	// StatusCompleted is the "COMPLETED" status.
	StatusCompleted RequestStatus = "COMPLETED"
	// StatusFailed is the "FAILED" status.
	StatusFailed RequestStatus = "FAILED"
	// StatusCancelled is the "CANCELLED" status.
	StatusCancelled RequestStatus = "CANCELLED"
	// StatusRetrying is the "RETRYING" status.
	StatusRetrying RequestStatus = "RETRYING"
)
type StoreError ¶
// StoreError wraps storage-related errors with additional context.
//
// It records which operation failed, on which table and item key, together
// with the underlying error.
type StoreError struct {
Op string // Operation that failed
Table string // DynamoDB table
Key string // Item key
Err error // Underlying error
Message string // Additional context
}
StoreError wraps storage-related errors with additional context
func NewStoreError ¶
func NewStoreError(op, table, key string, err error) *StoreError
NewStoreError creates a new storage error
func (*StoreError) Is ¶
func (e *StoreError) Is(target error) bool
Is checks if the error matches the target
type Subscription ¶
// Subscription represents a real-time update subscription.
//
// Every field carries a dynamodbav tag naming its DynamoDB attribute and a
// json tag naming its camelCase JSON key.
type Subscription struct {
// Composite key: ConnectionID#RequestID
SubscriptionID string `dynamodbav:"SubscriptionID" json:"subscriptionId"`
// Individual components for querying
ConnectionID string `dynamodbav:"ConnectionID" json:"connectionId"`
RequestID string `dynamodbav:"RequestID" json:"requestId"`
// Subscription details
// EventTypes is tagged with the stringset option.
// NOTE(review): DynamoDB string sets cannot be empty - confirm how a nil or
// empty EventTypes slice is handled when the item is written.
EventTypes []string `dynamodbav:"EventTypes,stringset" json:"eventTypes"`
CreatedAt time.Time `dynamodbav:"CreatedAt" json:"createdAt"`
// TTL for automatic cleanup
// NOTE(review): presumably a Unix epoch-seconds expiry as used by DynamoDB TTL - confirm.
TTL int64 `dynamodbav:"TTL,omitempty" json:"ttl,omitempty"`
}
Subscription represents a real-time update subscription
type SubscriptionStore ¶
// SubscriptionStore manages real-time update subscriptions.
//
// Every method takes a context.Context first and reports failure through its
// error result.
type SubscriptionStore interface {
// Subscribe creates a subscription for progress updates
Subscribe(ctx context.Context, sub *Subscription) error
// Unsubscribe removes a subscription
// The connectionID/requestID pair corresponds to the two components of
// Subscription's composite SubscriptionID.
Unsubscribe(ctx context.Context, connectionID, requestID string) error
// GetByConnection returns all subscriptions for a connection
GetByConnection(ctx context.Context, connectionID string) ([]*Subscription, error)
// GetByRequest returns all subscriptions for a request
GetByRequest(ctx context.Context, requestID string) ([]*Subscription, error)
// DeleteByConnection removes all subscriptions for a connection
DeleteByConnection(ctx context.Context, connectionID string) error
}
SubscriptionStore manages real-time update subscriptions
type TableDefinition ¶
// TableDefinition holds the configuration for a DynamoDB table.
//
// NOTE(review): the types.* field types presumably come from the AWS SDK's
// DynamoDB types package - confirm against the file's imports.
type TableDefinition struct {
// TableName is the name of the table.
TableName string
// AttributeDefinitions lists the table's attribute definitions.
AttributeDefinitions []types.AttributeDefinition
// KeySchema lists the table's key schema elements.
KeySchema []types.KeySchemaElement
// GlobalSecondaryIndexes lists the table's global secondary indexes.
GlobalSecondaryIndexes []types.GlobalSecondaryIndex
// BillingMode is the table's billing mode.
BillingMode types.BillingMode
}
TableDefinition holds the configuration for a DynamoDB table
func GetTableDefinitions ¶
func GetTableDefinitions() []TableDefinition
GetTableDefinitions returns all table definitions for Streamer
type ValidationError ¶
ValidationError represents input validation errors
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string