connection

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 7, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

README

Connection Manager Package

The connection package provides WebSocket connection management for the Streamer system. It sends messages to connected clients via the AWS API Gateway Management API.

Overview

The ConnectionManager is responsible for:

  • Sending messages to individual WebSocket connections
  • Broadcasting messages to multiple connections
  • Checking connection health
  • Automatic cleanup of stale connections
  • Connection caching for performance

Usage

Basic Setup
import (
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/apigatewaymanagementapi"
    "github.com/pay-theory/streamer/internal/store"
    "github.com/pay-theory/streamer/pkg/connection"
)

// Initialize AWS config
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
    return err
}

// Create API Gateway Management API client
// The endpoint should be your WebSocket API endpoint
endpoint := "https://your-api-id.execute-api.region.amazonaws.com/stage"
apiClient := apigatewaymanagementapi.NewFromConfig(cfg, func(o *apigatewaymanagementapi.Options) {
    o.EndpointResolver = apigatewaymanagementapi.EndpointResolverFunc(
        func(region string, options apigatewaymanagementapi.EndpointResolverOptions) (aws.Endpoint, error) {
            return aws.Endpoint{
                URL: endpoint,
            }, nil
        })
})

// Create connection manager
connManager := connection.NewManager(connectionStore, apiClient, endpoint)
Sending Messages
// Send to a single connection
message := map[string]interface{}{
    "type": "progress",
    "request_id": "req_123",
    "percentage": 45.5,
    "message": "Processing...",
}

err := connManager.Send(ctx, connectionID, message)
if err != nil {
    if connection.IsConnectionGone(err) {
        // Connection no longer exists
        log.Printf("Connection %s is gone", connectionID)
    } else {
        // Other error
        return err
    }
}
Broadcasting Messages
// Broadcast to multiple connections
connectionIDs := []string{"conn1", "conn2", "conn3"}
notification := map[string]interface{}{
    "type": "announcement",
    "message": "System update in 5 minutes",
}

err := connManager.Broadcast(ctx, connectionIDs, notification)
if err != nil {
    // Broadcast error contains details about failed connections
    var broadcastErr *connection.BroadcastError
    if errors.As(err, &broadcastErr) {
        log.Printf("Broadcast failed for %d connections", len(broadcastErr.Errors))
    }
}
Checking Connection Status
// Check if a connection is active
if connManager.IsActive(ctx, connectionID) {
    // Connection is active
} else {
    // Connection is inactive or gone
}

Error Handling

The package provides specific error types:

  • ConnectionGoneError: Indicates the WebSocket connection no longer exists (410 Gone)
  • BroadcastError: Contains errors from failed broadcast attempts

Use the IsConnectionGone helper to check whether an error means the connection is gone:

if connection.IsConnectionGone(err) {
    // Handle gone connection
}

Performance Considerations

  1. Connection Caching: The manager caches connection status for 30 seconds to reduce DynamoDB lookups
  2. Parallel Broadcasts: Broadcasts are sent in parallel with a concurrency limit of 50
  3. Automatic Cleanup: Gone connections are automatically removed from DynamoDB
  4. Async Operations: Last ping updates and cleanup operations run asynchronously

Testing

The package includes comprehensive unit tests with mocked dependencies:

// Create mocks for testing
mockStore := new(MockConnectionStore)
mockAPI := new(MockAPIGatewayClient)

// Set up expectations
mockStore.On("Get", mock.Anything, "conn-123").Return(conn, nil)
mockAPI.On("PostToConnection", mock.Anything, mock.Anything).Return(output, nil)

// Create manager with mocks
manager := NewManager(mockStore, mockAPI, endpoint)

Integration with Team 2

Team 2 can use this ConnectionManager in their router and async processor:

In Router Lambda
// When sending sync responses
response := map[string]interface{}{
    "type": "response",
    "request_id": request.ID,
    "data": result,
}
err = connManager.Send(ctx, request.ConnectionID, response)
In Async Processor
// When sending progress updates
progressReporter := &progressReporter{
    connManager: connManager,
    connectionID: request.ConnectionID,
    requestID: request.ID,
}

// Use progress reporter during processing
progressReporter.Report(25.0, "Processing started")

Environment Variables

  • WEBSOCKET_ENDPOINT: The WebSocket API endpoint URL
  • AWS_REGION: AWS region for the services

Dependencies

  • AWS SDK v2 for Go
  • API Gateway Management API client
  • Internal store package (from Team 1)

Documentation

Overview

Example

Example demonstrates how to use the ConnectionManager

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/apigatewaymanagementapi"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/pay-theory/streamer/internal/store"
	"github.com/pay-theory/streamer/pkg/connection"
)

// main builds a connection manager backed by DynamoDB and the API Gateway
// Management API, then demonstrates a single send, a broadcast, and an
// activity check.
func main() {
	ctx := context.Background()

	// Load AWS configuration
	awsCfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// The connection store is backed by a DynamoDB client.
	connStore := store.NewConnectionStore(dynamodb.NewFromConfig(awsCfg), "streamer_connections")

	// The API Gateway Management API client must target the WebSocket API
	// endpoint, so every request is resolved to this fixed URL.
	wsEndpoint := "https://abc123.execute-api.us-east-1.amazonaws.com/production"
	resolveEndpoint := func(region string, options apigatewaymanagementapi.EndpointResolverOptions) (aws.Endpoint, error) {
		return aws.Endpoint{URL: wsEndpoint}, nil
	}
	gatewayClient := apigatewaymanagementapi.NewFromConfig(awsCfg, func(o *apigatewaymanagementapi.Options) {
		o.EndpointResolver = apigatewaymanagementapi.EndpointResolverFunc(resolveEndpoint)
	})

	// Create connection manager
	mgr := connection.NewManager(connStore, gatewayClient, wsEndpoint)

	// Optional: route the manager's log output through the standard logger.
	mgr.SetLogger(func(format string, args ...interface{}) {
		log.Printf("[ConnectionManager] "+format, args...)
	})

	// Example 1: Send a message to a single connection
	greeting := map[string]interface{}{
		"type": "notification",
		"data": map[string]string{
			"message":   "Hello from the server!",
			"timestamp": time.Now().Format(time.RFC3339),
		},
	}
	err = mgr.Send(ctx, "connection-123", greeting)
	switch {
	case err == nil:
		// Delivered; nothing to report.
	case errors.Is(err, connection.ErrConnectionNotFound):
		log.Printf("Connection not found")
	case errors.Is(err, connection.ErrConnectionStale):
		log.Printf("Connection is stale and has been removed")
	default:
		log.Printf("Failed to send message: %v", err)
	}

	// Example 2: Broadcast to multiple connections
	targets := []string{"conn-1", "conn-2", "conn-3"}
	announcement := map[string]interface{}{
		"type": "broadcast",
		"data": map[string]string{
			"announcement": "System maintenance in 5 minutes",
		},
	}
	if err = mgr.Broadcast(ctx, targets, announcement); err != nil {
		log.Printf("Broadcast had some failures: %v", err)
	}

	// Example 3: Check if a connection is active
	status := "Connection is not active"
	if mgr.IsActive(ctx, "connection-123") {
		status = "Connection is active"
	}
	fmt.Println(status)
}
Example (MockUsage)
package main

import (
	"context"
	"fmt"
)

// MockConnectionManager is a hand-written test double that shows how Team 2
// can mock the ConnectionManager for tests. Its Send method records each call
// and then delegates to SendFunc when one is set.
type MockConnectionManager struct {
	// SendFunc, when non-nil, is invoked by Send and supplies Send's result.
	SendFunc func(ctx context.Context, connectionID string, message interface{}) error
	// calls holds one entry per Send invocation, in call order.
	calls    []SendCall
}

// SendCall captures the arguments of a single Send invocation on
// MockConnectionManager so tests can inspect them afterwards.
type SendCall struct {
	// ConnectionID is the connection the message was addressed to.
	ConnectionID string
	// Message is the payload exactly as it was passed to Send.
	Message      interface{}
}

// Send records the call and then defers to SendFunc when one is configured.
// With no SendFunc set it reports success by returning nil.
func (m *MockConnectionManager) Send(ctx context.Context, connectionID string, message interface{}) error {
	// Record first so the call is captured even if SendFunc returns an error.
	recorded := SendCall{ConnectionID: connectionID, Message: message}
	m.calls = append(m.calls, recorded)

	if m.SendFunc == nil {
		return nil
	}
	return m.SendFunc(ctx, connectionID, message)
}

// main shows how a test would construct the mock and inspect the calls it
// recorded.
func main() {
	// In Team 2's tests:
	mockManager := &MockConnectionManager{
		SendFunc: func(ctx context.Context, connectionID string, message interface{}) error {
			// Simulate successful send
			return nil
		},
	}

	// Use mock in router tests
	// router := streamer.NewRouter(requestStore, mockManager)

	// Verify calls were made
	fmt.Printf("Number of sends: %d\n", len(mockManager.calls))

	// Nothing in this example invokes Send (the router wiring above is
	// commented out), so calls can be empty. Indexing calls[0] without this
	// guard panics with "index out of range".
	if len(mockManager.calls) > 0 {
		fmt.Printf("First call was to connection: %s\n", mockManager.calls[0].ConnectionID)
	}
}
Example (WithRouter)

Example_withRouter shows integration with Team 2's router

package main

import (
	"context"
	"log"

	"github.com/pay-theory/streamer/pkg/connection"
)

// main sketches how Team 2's router hands a response to the connection
// manager. The manager is only declared here; real code initializes it as in
// the main package example.
func main() {
	// Setup connection manager (as shown above)
	var manager *connection.Manager // ... initialized as above

	// The router depends on a small interface that the manager satisfies:
	//
	// type ConnectionManager interface {
	//     Send(ctx context.Context, connectionID string, message interface{}) error
	// }
	//
	// In router.go:
	// router := streamer.NewRouter(requestStore, manager)

	// When the router needs to send a response, it builds the payload...
	result := map[string]string{
		"result": "Operation completed",
	}
	payload := map[string]interface{}{
		"type":       "response",
		"request_id": "req-123",
		"success":    true,
		"data":       result,
	}

	// ...and delivers it to the originating connection.
	if err := manager.Send(context.Background(), "connection-456", payload); err != nil {
		log.Printf("Failed to send response: %v", err)
	}
}

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrConnectionNotFound indicates the connection ID doesn't exist
	ErrConnectionNotFound = errors.New("connection not found")

	// ErrConnectionStale indicates the connection is no longer valid (410 Gone)
	ErrConnectionStale = errors.New("connection is stale")

	// ErrInvalidMessage indicates the message could not be marshaled
	ErrInvalidMessage = errors.New("invalid message format")

	// ErrBroadcastPartialFailure indicates some connections failed during broadcast
	ErrBroadcastPartialFailure = errors.New("broadcast partially failed")
)

Common errors

Functions

func IsConnectionGone

func IsConnectionGone(err error) bool

IsConnectionGone checks if an error indicates the connection is gone

Types

type BroadcastError

type BroadcastError struct {
	Failed []string // Connection IDs that failed
	Errors []error  // Corresponding errors
}

BroadcastError contains errors from broadcasting to multiple connections

func (*BroadcastError) Error

func (e *BroadcastError) Error() string

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker tracks connection health

func (*CircuitBreaker) CountOpen

func (cb *CircuitBreaker) CountOpen() int

func (*CircuitBreaker) IsOpen

func (cb *CircuitBreaker) IsOpen(connectionID string) bool

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure(connectionID string)

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess(connectionID string)

type ConnectionError

type ConnectionError struct {
	ConnectionID string
	Err          error
}

ConnectionError represents a connection-specific error

func (*ConnectionError) Error

func (e *ConnectionError) Error() string

func (*ConnectionError) Unwrap

func (e *ConnectionError) Unwrap() error

type ConnectionGoneError

type ConnectionGoneError struct {
	ConnectionID string
}

ConnectionGoneError indicates a WebSocket connection no longer exists

func (*ConnectionGoneError) Error

func (e *ConnectionGoneError) Error() string

type LatencyTracker

type LatencyTracker struct {
	// contains filtered or unexported fields
}

LatencyTracker tracks latency percentiles

func (*LatencyTracker) Percentile

func (lt *LatencyTracker) Percentile(p float64) time.Duration

func (*LatencyTracker) Record

func (lt *LatencyTracker) Record(d time.Duration)

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager handles WebSocket connections through API Gateway

func NewManager

func NewManager(store store.ConnectionStore, apiGateway *apigatewaymanagementapi.Client, endpoint string) *Manager

NewManager creates a new connection manager

func (*Manager) Broadcast

func (m *Manager) Broadcast(ctx context.Context, connectionIDs []string, message interface{}) error

Broadcast sends a message to multiple connections

func (*Manager) GetMetrics

func (m *Manager) GetMetrics() map[string]interface{}

GetMetrics returns current performance metrics

func (*Manager) IsActive

func (m *Manager) IsActive(ctx context.Context, connectionID string) bool

IsActive checks if a connection is active

func (*Manager) Send

func (m *Manager) Send(ctx context.Context, connectionID string, message interface{}) error

Send sends a message to a specific connection

func (*Manager) SetLogger

func (m *Manager) SetLogger(logger func(format string, args ...interface{}))

SetLogger sets a custom logger function

func (*Manager) Shutdown

func (m *Manager) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the manager

type Metrics

type Metrics struct {
	SendLatency      *LatencyTracker
	BroadcastLatency *LatencyTracker
	ErrorsByType     map[string]*atomic.Int64
	ActiveSends      *atomic.Int32
	// contains filtered or unexported fields
}

Metrics holds performance metrics for the connection manager

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL