connection

package
v1.0.8 Latest

Warning: This package is not in the latest version of its module.

Published: Jul 12, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

README

Connection Manager Package

The connection package provides WebSocket connection management for the Streamer system. It sends messages to connected clients through the AWS API Gateway Management API.

Overview

The ConnectionManager is responsible for:

  • Sending messages to individual WebSocket connections
  • Broadcasting messages to multiple connections
  • Checking connection health
  • Automatic cleanup of stale connections
  • Connection caching for performance

Usage

Basic Setup

import (
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/apigatewaymanagementapi"
    "github.com/pay-theory/streamer/internal/store"
    "github.com/pay-theory/streamer/pkg/connection"
)

// Initialize AWS config
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
    return err
}

// Create API Gateway Management API client
// The endpoint should be your WebSocket API endpoint
endpoint := "https://your-api-id.execute-api.region.amazonaws.com/stage"
apiClient := apigatewaymanagementapi.NewFromConfig(cfg, func(o *apigatewaymanagementapi.Options) {
    o.EndpointResolver = apigatewaymanagementapi.EndpointResolverFunc(
        func(region string, options apigatewaymanagementapi.EndpointResolverOptions) (aws.Endpoint, error) {
            return aws.Endpoint{
                URL: endpoint,
            }, nil
        })
})

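// Create connection manager. NewManager takes the package's APIGatewayClient
// interface, so wrap the SDK client with the provided adapter first.
connManager := connection.NewManager(connectionStore, connection.NewAWSAPIGatewayAdapter(apiClient), endpoint)

Sending Messages

// Send to a single connection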
message := map[string]interface{}{
    "type": "progress",
    "request_id": "req_123",
    "percentage": 45.5,
    "message": "Processing...",
}

err := connManager.Send(ctx, connectionID, message)
if err != nil {
    if connection.IsConnectionGone(err) {
        // Connection no longer exists
        log.Printf("Connection %s is gone", connectionID)
    } else {
        // Other error
        return err
    }
}

Broadcasting Messages

// Broadcast to multiple connections
connectionIDs := []string{"conn1", "conn2", "conn3"}
notification := map[string]interface{}{
    "type": "announcement",
    "message": "System update in 5 minutes",
}

err := connManager.Broadcast(ctx, connectionIDs, notification)
if err != nil {
    // Broadcast error contains details about failed connections
    var broadcastErr *connection.BroadcastError
    if errors.As(err, &broadcastErr) {
        log.Printf("Broadcast failed for %d connections", len(broadcastErr.Errors))
    }
}

Checking Connection Status

// Check if a connection is active
if connManager.IsActive(ctx, connectionID) {
    // Connection is active
} else {
    // Connection is inactive or gone
}
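
The Performance Considerations section says connection status is cached for 30 seconds. The following is a minimal sketch of how such a time-bounded cache could work; statusCache, cacheEntry, and the injectable clock are hypothetical names for illustration and are not taken from the package.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cacheEntry holds one cached liveness result and its expiry time.
type cacheEntry struct {
	active  bool
	expires time.Time
}

// statusCache is a hypothetical TTL cache; it is not the package's own type.
type statusCache struct {
	mu      sync.Mutex
	ttl     time.Duration
	now     func() time.Time // injectable clock so tests need not sleep
	entries map[string]cacheEntry
}

func newStatusCache(ttl time.Duration, now func() time.Time) *statusCache {
	return &statusCache{ttl: ttl, now: now, entries: make(map[string]cacheEntry)}
}

// Set stores a status that stays valid for one TTL from now.
func (c *statusCache) Set(id string, active bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[id] = cacheEntry{active: active, expires: c.now().Add(c.ttl)}
}

// Get returns the cached status and whether an unexpired entry was found.
func (c *statusCache) Get(id string) (active, ok bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, found := c.entries[id]
	if !found || !c.now().Before(e.expires) {
		delete(c.entries, id) // drop expired entries lazily
		return false, false
	}
	return e.active, true
}

// demoCache reads one entry just before and just after the 30 second TTL,
// using a fake clock, and reports whether each read was a cache hit.
func demoCache() (hitFresh, hitExpired bool) {
	clock := time.Unix(0, 0)
	c := newStatusCache(30*time.Second, func() time.Time { return clock })
	c.Set("conn-123", true)
	clock = clock.Add(29 * time.Second)
	_, hitFresh = c.Get("conn-123")
	clock = clock.Add(2 * time.Second)
	_, hitExpired = c.Get("conn-123")
	return hitFresh, hitExpired
}

func main() {
	fresh, expired := demoCache()
	fmt.Println(fresh, expired)
}
```

On a cache miss the caller would fall back to the connection store and call Set with the result.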

Error Handling

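The package provides specific error types:

  • ConnectionGoneError: Indicates the WebSocket connection no longer exists (410 Gone)
  • BroadcastError: Lists the connection IDs that failed during a broadcast (Failed) and the corresponding errors (Errors)

It also exports sentinel errors that can be matched with errors.Is: ErrConnectionNotFound, ErrConnectionStale, ErrInvalidMessage, and ErrBroadcastPartialFailure.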

Use the helper function to check for specific errors:

if connection.IsConnectionGone(err) {
    // Handle gone connection
}
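
As an illustration of how wrapped errors can be matched, here is a small self-contained sketch. The sentinel values and ConnectionError are local stand-ins that mirror the declarations in the Variables and Types sections; the Error text and the classify helper are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors; real code would use
// connection.ErrConnectionNotFound and connection.ErrConnectionStale.
var (
	ErrConnectionNotFound = errors.New("connection not found")
	ErrConnectionStale    = errors.New("connection is stale")
)

// ConnectionError mirrors the documented struct. The message format below
// is an assumption, not the package's actual output.
type ConnectionError struct {
	ConnectionID string
	Err          error
}

func (e *ConnectionError) Error() string {
	return fmt.Sprintf("connection %s: %v", e.ConnectionID, e.Err)
}

// Unwrap exposes the underlying error to errors.Is and errors.As.
func (e *ConnectionError) Unwrap() error { return e.Err }

// classify is a hypothetical helper that maps an error to a handling decision.
func classify(err error) string {
	var connErr *ConnectionError
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrConnectionNotFound):
		return "not-found"
	case errors.Is(err, ErrConnectionStale):
		return "stale"
	case errors.As(err, &connErr):
		return "connection-error"
	default:
		return "other"
	}
}

// wrapStale builds a stale-connection error wrapped in a ConnectionError.
func wrapStale(id string) error {
	return &ConnectionError{ConnectionID: id, Err: ErrConnectionStale}
}

func main() {
	fmt.Println(classify(wrapStale("conn-1")))
	fmt.Println(classify(nil))
}
```

Because ConnectionError implements Unwrap, errors.Is sees through the wrapper, so callers do not need to compare error strings.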

Performance Considerations

  1. Connection Caching: The manager caches connection status for 30 seconds to reduce DynamoDB lookups
  2. Parallel Broadcasts: Broadcasts are sent in parallel with a concurrency limit of 50
  3. Automatic Cleanup: Gone connections are automatically removed from DynamoDB
  4. Async Operations: Last ping updates and cleanup operations run asynchronously

Testing

The package includes comprehensive unit tests with mocked dependencies:

// Create mocks for testing
mockStore := new(MockConnectionStore)
mockAPI := NewMockAPIGatewayClient()

// Set up expectations
mockStore.On("Get", mock.Anything, "conn-123").Return(conn, nil)
// PostToConnection takes (ctx, connectionID, data) and returns only an error
mockAPI.On("PostToConnection", mock.Anything, "conn-123", mock.Anything).Return(nil)

// Create manager with mocks
manager := NewManager(mockStore, mockAPI, endpoint)

Premade Mocks for Your Tests

We provide several ready-to-use mocks in testing.go:

SendOnlyMock

For packages that only need the Send method:

mock := connection.NewSendOnlyMock()
router := streamer.NewRouter(store, mock)

// Verify messages
messages := mock.GetMessages("conn-123")

ProgressReporterMock

For packages that need Send + IsActive:

mock := connection.NewProgressReporterMock()
mock.SetActive("conn-123", true)

reporter := progress.NewReporter("req-123", "conn-123", mock)

FailingMock

For error handling tests:

mock := connection.NewFailingMock(errors.New("network error"))
err := mock.Send(ctx, "any", "message")
// err will be "network error"

See TESTING_GUIDE.md for comprehensive testing documentation.

Integration with Team 2

Team 2 can use this ConnectionManager in their router and async processor:

In Router Lambda

// When sending sync responses
response := map[string]interface{}{
    "type": "response",
    "request_id": request.ID,
    "data": result,
}
err = connManager.Send(ctx, request.ConnectionID, response)

In Async Processor

// When sending progress updates
progressReporter := &progressReporter{
    connManager: connManager,
    connectionID: request.ConnectionID,
    requestID: request.ID,
}

// Use progress reporter during processing
progressReporter.Report(25.0, "Processing started")
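
The snippet above uses a progressReporter type that the README does not define. One possible shape is sketched below, assuming the reporter only needs the Send method. The sender interface, the range check, and captureSender are hypothetical, and Report here takes a context argument, unlike the call shown above.

```go
package main

import (
	"context"
	"fmt"
)

// sender is the minimal capability the reporter needs (hypothetical name).
type sender interface {
	Send(ctx context.Context, connectionID string, message interface{}) error
}

// progressReporter is a hypothetical implementation; the real type may differ.
type progressReporter struct {
	connManager  sender
	connectionID string
	requestID    string
}

// Report sends a message shaped like the README's "progress" example.
func (p *progressReporter) Report(ctx context.Context, percentage float64, message string) error {
	if percentage < 0 || percentage > 100 {
		return fmt.Errorf("percentage %v out of range [0, 100]", percentage)
	}
	return p.connManager.Send(ctx, p.connectionID, map[string]interface{}{
		"type":       "progress",
		"request_id": p.requestID,
		"percentage": percentage,
		"message":    message,
	})
}

// captureSender remembers the last message so the demo can inspect it.
type captureSender struct {
	last map[string]interface{}
}

func (c *captureSender) Send(_ context.Context, _ string, message interface{}) error {
	c.last, _ = message.(map[string]interface{})
	return nil
}

// demoReport reports once and returns the percentage that was actually sent.
func demoReport(percentage float64) (float64, error) {
	c := &captureSender{}
	r := &progressReporter{connManager: c, connectionID: "conn-123", requestID: "req-123"}
	err := r.Report(context.Background(), percentage, "Processing started")
	sent, _ := c.last["percentage"].(float64)
	return sent, err
}

func main() {
	fmt.Println(demoReport(25.0))
}
```

Depending on the narrow sender interface rather than the concrete manager keeps the reporter testable with any of the mocks described in the Testing section.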

Environment Variables

  • WEBSOCKET_ENDPOINT: The WebSocket API endpoint URL
  • AWS_REGION: AWS region for the services
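
A minimal sketch of reading and validating these two variables at startup follows. The settings type and loadSettings function are hypothetical, and the requirement that the endpoint be an https URL is an assumption based on the example endpoints shown in this README.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"os"
)

// settings holds the two documented environment values (hypothetical type).
type settings struct {
	Endpoint string
	Region   string
}

// loadSettings reads the variables through getenv so tests can supply values.
func loadSettings(getenv func(string) string) (settings, error) {
	s := settings{
		Endpoint: getenv("WEBSOCKET_ENDPOINT"),
		Region:   getenv("AWS_REGION"),
	}
	if s.Endpoint == "" {
		return settings{}, errors.New("WEBSOCKET_ENDPOINT is not set")
	}
	if s.Region == "" {
		return settings{}, errors.New("AWS_REGION is not set")
	}
	u, err := url.Parse(s.Endpoint)
	if err != nil {
		return settings{}, fmt.Errorf("WEBSOCKET_ENDPOINT is not a valid URL: %w", err)
	}
	// Assumption: the management endpoint is addressed over https, as in the
	// README examples, rather than the wss URL that clients connect to.
	if u.Scheme != "https" || u.Host == "" {
		return settings{}, fmt.Errorf("WEBSOCKET_ENDPOINT must be an https URL, got %q", s.Endpoint)
	}
	return s, nil
}

func main() {
	s, err := loadSettings(os.Getenv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Println("endpoint:", s.Endpoint, "region:", s.Region)
}
```

Failing fast on a missing or malformed endpoint gives a clearer message than a later send error.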

Dependencies

  • AWS SDK v2 for Go
  • API Gateway Management API client
  • Internal store package (from Team 1)

Documentation

Overview

Package connection provides mock implementations for testing.

This file consolidates all mock types for the connection package. We provide both manual mocks (with function fields) and testify-based mocks to support different testing styles across the codebase.

Package connection provides testing utilities for connection management.

These mocks are designed to work with any interface that has Send/IsActive methods, making them usable across different packages that define their own ConnectionManager interfaces.
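
The reason the mocks work across packages is Go's structural typing: a consumer can declare its own small interface, and any type with matching methods satisfies it. A minimal sketch, in which Sender, recordingSender, and notify are hypothetical names:

```go
package main

import (
	"context"
	"fmt"
)

// Sender is a hypothetical consumer-side interface. It asks only for Send,
// with the signature documented for ConnectionManager.Send.
type Sender interface {
	Send(ctx context.Context, connectionID string, message interface{}) error
}

// recordingSender is an illustrative stand-in for a mock such as SendOnlyMock.
type recordingSender struct {
	sent map[string][]interface{}
}

func (r *recordingSender) Send(_ context.Context, connectionID string, message interface{}) error {
	if r.sent == nil {
		r.sent = make(map[string][]interface{})
	}
	r.sent[connectionID] = append(r.sent[connectionID], message)
	return nil
}

// Compile-time check that recordingSender satisfies the narrow interface.
var _ Sender = (*recordingSender)(nil)

// notify depends on the narrow interface, not on a concrete manager type.
func notify(ctx context.Context, s Sender, connectionID, text string) error {
	return s.Send(ctx, connectionID, map[string]string{
		"type":    "notification",
		"message": text,
	})
}

// runDemo sends one notification and reports how many messages were recorded.
func runDemo() (int, error) {
	r := &recordingSender{}
	err := notify(context.Background(), r, "conn-123", "hello")
	return len(r.sent["conn-123"]), err
}

func main() {
	n, err := runDemo()
	fmt.Println(n, err)
}
```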

Example

Example demonstrates how to use the ConnectionManager

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/apigatewaymanagementapi"
	"github.com/pay-theory/dynamorm/pkg/session"
	"github.com/pay-theory/streamer/internal/store/dynamorm"
	"github.com/pay-theory/streamer/pkg/connection"
)

func main() {
	// Load AWS configuration
	cfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	// Create DynamORM factory
	dynamormConfig := session.Config{
		Region: cfg.Region,
	}

	storeFactory, err := dynamorm.NewStoreFactory(dynamormConfig)
	if err != nil {
		log.Fatalf("Failed to create DynamORM store factory: %v", err)
	}

	// Get connection store from factory
	connStore := storeFactory.ConnectionStore()

	// Create API Gateway Management API client
	// The endpoint should be your WebSocket API endpoint
	endpoint := "https://abc123.execute-api.us-east-1.amazonaws.com/production"
	apiGatewayClient := apigatewaymanagementapi.NewFromConfig(cfg, func(o *apigatewaymanagementapi.Options) {
		o.EndpointResolver = apigatewaymanagementapi.EndpointResolverFunc(func(region string, options apigatewaymanagementapi.EndpointResolverOptions) (aws.Endpoint, error) {
			return aws.Endpoint{
				URL: endpoint,
			}, nil
		})
	})

	// Wrap the AWS SDK client with our adapter
	apiGatewayAdapter := connection.NewAWSAPIGatewayAdapter(apiGatewayClient)

	// Create connection manager
	manager := connection.NewManager(connStore, apiGatewayAdapter, endpoint)

	// Optional: Set custom logger
	manager.SetLogger(func(format string, args ...interface{}) {
		log.Printf("[ConnectionManager] "+format, args...)
	})

	// Example 1: Send a message to a single connection
	err = manager.Send(context.Background(), "connection-123", map[string]interface{}{
		"type": "notification",
		"data": map[string]string{
			"message":   "Hello from the server!",
			"timestamp": time.Now().Format(time.RFC3339),
		},
	})
	if err != nil {
		switch {
		case errors.Is(err, connection.ErrConnectionNotFound):
			log.Printf("Connection not found")
		case errors.Is(err, connection.ErrConnectionStale):
			log.Printf("Connection is stale and has been removed")
		default:
			log.Printf("Failed to send message: %v", err)
		}
	}

	// Example 2: Broadcast to multiple connections
	connectionIDs := []string{"conn-1", "conn-2", "conn-3"}
	err = manager.Broadcast(context.Background(), connectionIDs, map[string]interface{}{
		"type": "broadcast",
		"data": map[string]string{
			"announcement": "System maintenance in 5 minutes",
		},
	})
	if err != nil {
		log.Printf("Broadcast had some failures: %v", err)
	}

	// Example 3: Check if a connection is active
	if manager.IsActive(context.Background(), "connection-123") {
		fmt.Println("Connection is active")
	} else {
		fmt.Println("Connection is not active")
	}
}

Example (MockUsage)

package main

import (
	"context"
	"fmt"
)

// Example_mockForTesting shows how Team 2 can mock the ConnectionManager for tests
type MockConnectionManager struct {
	SendFunc func(ctx context.Context, connectionID string, message interface{}) error
	calls    []SendCall
}

type SendCall struct {
	ConnectionID string
	Message      interface{}
}

func (m *MockConnectionManager) Send(ctx context.Context, connectionID string, message interface{}) error {
	m.calls = append(m.calls, SendCall{
		ConnectionID: connectionID,
		Message:      message,
	})

	if m.SendFunc != nil {
		return m.SendFunc(ctx, connectionID, message)
	}
	return nil
}

func main() {
	// In Team 2's tests:
	mockManager := &MockConnectionManager{
		SendFunc: func(ctx context.Context, connectionID string, message interface{}) error {
			// Simulate successful send
			return nil
		},
	}

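	// Use mock in router tests
	// router := streamer.NewRouter(requestStore, mockManager)

	// Exercise the mock directly so there is a call to verify
	_ = mockManager.Send(context.Background(), "conn-123", "hello")

	// Verify calls were made
	fmt.Printf("Number of sends: %d\n", len(mockManager.calls))
	fmt.Printf("First call went to %s\n", mockManager.calls[0].ConnectionID)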
}

Example (WithRouter)

Example_withRouter shows integration with Team 2's router

package main

import (
	"context"
	"log"

	"github.com/pay-theory/streamer/pkg/connection"
)

func main() {
	// Setup connection manager (as shown above)
	var manager *connection.Manager // ... initialized as above

	// This is how Team 2 would use it in their router
	// The manager implements the ConnectionManager interface expected by the router:
	//
	// type ConnectionManager interface {
	//     Send(ctx context.Context, connectionID string, message interface{}) error
	// }

	// In router.go:
	// router := streamer.NewRouter(requestStore, manager)

	// When router needs to send a response:
	response := map[string]interface{}{
		"type":       "response",
		"request_id": "req-123",
		"success":    true,
		"data": map[string]string{
			"result": "Operation completed",
		},
	}

	err := manager.Send(context.Background(), "connection-456", response)
	if err != nil {
		log.Printf("Failed to send response: %v", err)
	}
}

Variables

var (
	// ErrConnectionNotFound indicates the connection ID doesn't exist
	ErrConnectionNotFound = errors.New("connection not found")

	// ErrConnectionStale indicates the connection is no longer valid (410 Gone)
	ErrConnectionStale = errors.New("connection is stale")

	// ErrInvalidMessage indicates the message could not be marshaled
	ErrInvalidMessage = errors.New("invalid message format")

	// ErrBroadcastPartialFailure indicates some connections failed during broadcast
	ErrBroadcastPartialFailure = errors.New("broadcast partially failed")
)

Common errors

Functions

func IsConnectionGone

func IsConnectionGone(err error) bool

IsConnectionGone checks if an error indicates the connection is gone

Types

type APIError added in v1.0.1

type APIError interface {
	error
	// HTTPStatusCode returns the HTTP status code of the error
	HTTPStatusCode() int
	// ErrorCode returns the error code (e.g., "GoneException")
	ErrorCode() string
	// IsRetryable returns true if the error is retryable
	IsRetryable() bool
}

APIError represents an error from the API Gateway service
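
A caller can inspect any error in a chain for this interface with errors.As. The sketch below declares a local copy of the interface and a hypothetical throttled error type purely for illustration; the error code string it returns is a placeholder, not a value taken from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// APIError is a local copy of the interface documented above.
type APIError interface {
	error
	HTTPStatusCode() int
	ErrorCode() string
	IsRetryable() bool
}

// throttled is a hypothetical error type that exists only for this sketch.
type throttled struct{}

func (throttled) Error() string       { return "rate exceeded" }
func (throttled) HTTPStatusCode() int { return 429 }
func (throttled) ErrorCode() string   { return "Throttled" } // placeholder code
func (throttled) IsRetryable() bool   { return true }

// inspect reports the HTTP status and retryability of the first APIError
// found in err's chain, or (0, false) when there is none.
func inspect(err error) (status int, retry bool) {
	var apiErr APIError
	if errors.As(err, &apiErr) {
		return apiErr.HTTPStatusCode(), apiErr.IsRetryable()
	}
	return 0, false
}

// wrappedThrottle returns a throttled error wrapped with extra context.
func wrappedThrottle() error {
	return fmt.Errorf("send to conn-1: %w", throttled{})
}

func main() {
	fmt.Println(inspect(wrappedThrottle()))
}
```

Matching on the interface rather than on concrete types means new error types that implement it are handled without changing the caller.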

type APIGatewayClient added in v1.0.1

type APIGatewayClient interface {
	// PostToConnection sends data to a WebSocket connection
	PostToConnection(ctx context.Context, connectionID string, data []byte) error

	// DeleteConnection terminates a WebSocket connection
	DeleteConnection(ctx context.Context, connectionID string) error

	// GetConnection retrieves connection information
	GetConnection(ctx context.Context, connectionID string) (*ConnectionInfo, error)
}

APIGatewayClient defines the interface for API Gateway Management API operations

type AWSAPIGatewayAdapter added in v1.0.1

type AWSAPIGatewayAdapter struct {
	// contains filtered or unexported fields
}

AWSAPIGatewayAdapter adapts the AWS SDK client to our interface

func NewAWSAPIGatewayAdapter added in v1.0.1

func NewAWSAPIGatewayAdapter(client *apigatewaymanagementapi.Client) *AWSAPIGatewayAdapter

NewAWSAPIGatewayAdapter creates a new adapter

func (*AWSAPIGatewayAdapter) DeleteConnection added in v1.0.1

func (a *AWSAPIGatewayAdapter) DeleteConnection(ctx context.Context, connectionID string) error

DeleteConnection terminates a WebSocket connection

func (*AWSAPIGatewayAdapter) GetConnection added in v1.0.1

func (a *AWSAPIGatewayAdapter) GetConnection(ctx context.Context, connectionID string) (*ConnectionInfo, error)

GetConnection retrieves connection information

func (*AWSAPIGatewayAdapter) PostToConnection added in v1.0.1

func (a *AWSAPIGatewayAdapter) PostToConnection(ctx context.Context, connectionID string, data []byte) error

PostToConnection sends data to a WebSocket connection

type BroadcastError

type BroadcastError struct {
	Failed []string // Connection IDs that failed
	Errors []error  // Corresponding errors
}

BroadcastError contains errors from broadcasting to multiple connections

func (*BroadcastError) Error

func (e *BroadcastError) Error() string

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker tracks connection health

func (*CircuitBreaker) CountOpen

func (cb *CircuitBreaker) CountOpen() int

func (*CircuitBreaker) IsOpen

func (cb *CircuitBreaker) IsOpen(connectionID string) bool

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure(connectionID string)

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess(connectionID string)
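
The documentation does not say when a breaker opens or closes. As one possible reading of the four methods, the sketch below opens a connection's breaker after a fixed number of consecutive failures and closes it on the next success. The breaker type, the threshold parameter, and the reset rule are all assumptions; time-based recovery is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// breaker is a hypothetical per-connection failure tracker. It opens after
// threshold consecutive failures and closes again on the first success.
type breaker struct {
	mu        sync.Mutex
	threshold int
	failures  map[string]int
}

func newBreaker(threshold int) *breaker {
	if threshold < 1 {
		threshold = 1 // otherwise every connection would start open
	}
	return &breaker{threshold: threshold, failures: make(map[string]int)}
}

// RecordFailure counts one more consecutive failure for the connection.
func (b *breaker) RecordFailure(id string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failures[id]++
}

// RecordSuccess clears the failure count, closing the breaker.
func (b *breaker) RecordSuccess(id string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.failures, id)
}

// IsOpen reports whether the connection has reached the failure threshold.
func (b *breaker) IsOpen(id string) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.failures[id] >= b.threshold
}

// CountOpen returns how many connections currently have an open breaker.
func (b *breaker) CountOpen() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	open := 0
	for _, n := range b.failures {
		if n >= b.threshold {
			open++
		}
	}
	return open
}

func main() {
	b := newBreaker(3)
	for i := 0; i < 3; i++ {
		b.RecordFailure("conn-1")
	}
	fmt.Println(b.IsOpen("conn-1"), b.CountOpen())
	b.RecordSuccess("conn-1")
	fmt.Println(b.IsOpen("conn-1"), b.CountOpen())
}
```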

type ConnectionError

type ConnectionError struct {
	ConnectionID string
	Err          error
}

ConnectionError represents a connection-specific error

func (*ConnectionError) Error

func (e *ConnectionError) Error() string

func (*ConnectionError) Unwrap

func (e *ConnectionError) Unwrap() error

type ConnectionGoneError

type ConnectionGoneError struct {
	ConnectionID string
}

ConnectionGoneError indicates a WebSocket connection no longer exists

func (*ConnectionGoneError) Error

func (e *ConnectionGoneError) Error() string

type ConnectionInfo added in v1.0.1

type ConnectionInfo struct {
	ConnectionID string
	ConnectedAt  string
	LastActiveAt string
	SourceIP     string
	UserAgent    string
}

ConnectionInfo represents information about a WebSocket connection

type ConnectionManager added in v1.0.1

type ConnectionManager interface {
	// Send sends a message to a specific connection
	Send(ctx context.Context, connectionID string, message interface{}) error

	// Broadcast sends a message to multiple connections
	Broadcast(ctx context.Context, connectionIDs []string, message interface{}) error

	// IsActive checks if a connection is active
	IsActive(ctx context.Context, connectionID string) bool

	// GetMetrics returns current performance metrics
	GetMetrics() map[string]interface{}

	// Shutdown gracefully shuts down the manager
	Shutdown(ctx context.Context) error

	// SetLogger sets a custom logger function
	SetLogger(logger func(format string, args ...interface{}))
}

ConnectionManager defines the interface for managing WebSocket connections

type ConnectionMetrics added in v1.0.1

type ConnectionMetrics struct {
	SendCount      int
	BroadcastCount int
	FailureCount   int
	ActiveConns    map[string]bool
}

ConnectionMetrics tracks mock connection metrics

type FailingMock added in v1.0.1

type FailingMock struct {
	Error error
}

FailingMock always returns errors - useful for error handling tests

func NewFailingMock added in v1.0.1

func NewFailingMock(err error) *FailingMock

NewFailingMock creates a mock that always fails

func (*FailingMock) IsActive added in v1.0.1

func (m *FailingMock) IsActive(ctx context.Context, connectionID string) bool

IsActive always returns false

func (*FailingMock) Send added in v1.0.1

func (m *FailingMock) Send(ctx context.Context, connectionID string, message interface{}) error

Send always returns the configured error

type ForbiddenError added in v1.0.1

type ForbiddenError struct {
	ConnectionID string
	Message      string
}

ForbiddenError indicates access is forbidden (403 Forbidden)

func (ForbiddenError) Error added in v1.0.1

func (e ForbiddenError) Error() string

func (ForbiddenError) ErrorCode added in v1.0.1

func (e ForbiddenError) ErrorCode() string

func (ForbiddenError) HTTPStatusCode added in v1.0.1

func (e ForbiddenError) HTTPStatusCode() int

func (ForbiddenError) IsRetryable added in v1.0.1

func (e ForbiddenError) IsRetryable() bool

type GoneError added in v1.0.1

type GoneError struct {
	ConnectionID string
	Message      string
}

GoneError indicates the connection no longer exists (410 Gone)

func (GoneError) Error added in v1.0.1

func (e GoneError) Error() string

Error implementations

func (GoneError) ErrorCode added in v1.0.1

func (e GoneError) ErrorCode() string

func (GoneError) HTTPStatusCode added in v1.0.1

func (e GoneError) HTTPStatusCode() int

func (GoneError) IsRetryable added in v1.0.1

func (e GoneError) IsRetryable() bool

type InternalServerError added in v1.0.1

type InternalServerError struct {
	Message string
}

InternalServerError indicates a server error (500)

func (InternalServerError) Error added in v1.0.1

func (e InternalServerError) Error() string

func (InternalServerError) ErrorCode added in v1.0.1

func (e InternalServerError) ErrorCode() string

func (InternalServerError) HTTPStatusCode added in v1.0.1

func (e InternalServerError) HTTPStatusCode() int

func (InternalServerError) IsRetryable added in v1.0.1

func (e InternalServerError) IsRetryable() bool

type LatencyTracker

type LatencyTracker struct {
	// contains filtered or unexported fields
}

LatencyTracker tracks latency percentiles

func (*LatencyTracker) Percentile

func (lt *LatencyTracker) Percentile(p float64) time.Duration

func (*LatencyTracker) Record

func (lt *LatencyTracker) Record(d time.Duration)
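
How Percentile is computed is not documented. A common approach is the nearest-rank method over the recorded samples, sketched below. The latencyTracker type here is a hypothetical stand-in, and treating p as a value between 0 and 100 is an assumption.

```go
package main

import (
	"fmt"
	"math"
	"sort"
	"sync"
	"time"
)

// latencyTracker is a hypothetical stand-in that keeps every sample.
type latencyTracker struct {
	mu      sync.Mutex
	samples []time.Duration
}

// Record stores one latency sample.
func (lt *latencyTracker) Record(d time.Duration) {
	lt.mu.Lock()
	defer lt.mu.Unlock()
	lt.samples = append(lt.samples, d)
}

// Percentile returns the nearest-rank percentile for p in (0, 100].
// It returns 0 when no samples have been recorded.
func (lt *latencyTracker) Percentile(p float64) time.Duration {
	lt.mu.Lock()
	defer lt.mu.Unlock()
	n := len(lt.samples)
	if n == 0 {
		return 0
	}
	sorted := make([]time.Duration, n)
	copy(sorted, lt.samples) // sort a copy so recording order is preserved
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	rank := int(math.Ceil(p * float64(n) / 100))
	if rank < 1 {
		rank = 1
	}
	if rank > n {
		rank = n
	}
	return sorted[rank-1]
}

// demoPercentiles records 1..100 ms in descending order and returns the
// 50th and 99th percentiles in milliseconds.
func demoPercentiles() (p50, p99 int64) {
	var lt latencyTracker
	for i := 100; i >= 1; i-- {
		lt.Record(time.Duration(i) * time.Millisecond)
	}
	return lt.Percentile(50).Milliseconds(), lt.Percentile(99).Milliseconds()
}

func main() {
	fmt.Println(demoPercentiles())
}
```

Keeping every sample is simple but unbounded; a long-running tracker would more likely use a fixed-size window or a histogram.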

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager handles WebSocket connections through API Gateway

func NewManager

func NewManager(store store.ConnectionStore, apiGateway APIGatewayClient, endpoint string) *Manager

NewManager creates a new connection manager

func (*Manager) Broadcast

func (m *Manager) Broadcast(ctx context.Context, connectionIDs []string, message interface{}) error

Broadcast sends a message to multiple connections

func (*Manager) GetMetrics

func (m *Manager) GetMetrics() map[string]interface{}

GetMetrics returns current performance metrics

func (*Manager) IsActive

func (m *Manager) IsActive(ctx context.Context, connectionID string) bool

IsActive checks if a connection is active

func (*Manager) Send

func (m *Manager) Send(ctx context.Context, connectionID string, message interface{}) error

Send sends a message to a specific connection

func (*Manager) SetLogger

func (m *Manager) SetLogger(logger func(format string, args ...interface{}))

SetLogger sets a custom logger function

func (*Manager) Shutdown

func (m *Manager) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the manager

type MethodCall added in v1.0.1

type MethodCall struct {
	Method    string
	Arguments []interface{}
	Timestamp time.Time
}

MethodCall records a method invocation

type Metrics

type Metrics struct {
	SendLatency      *LatencyTracker
	BroadcastLatency *LatencyTracker
	ErrorsByType     map[string]*atomic.Int64
	ActiveSends      *atomic.Int32
	// contains filtered or unexported fields
}

Metrics holds performance metrics for the connection manager
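
The ErrorsByType field maps error names to atomic counters. A sketch of how such a map can be updated safely from many goroutines follows, assuming the set of keys is fixed at construction so the map itself is never written concurrently. errorCounters and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// errorCounters is a hypothetical wrapper around a map of atomic counters.
// Keys are fixed at construction, so only the counters change afterwards.
type errorCounters struct {
	byType map[string]*atomic.Int64
}

func newErrorCounters(types ...string) *errorCounters {
	m := make(map[string]*atomic.Int64, len(types))
	for _, t := range types {
		m[t] = new(atomic.Int64)
	}
	return &errorCounters{byType: m}
}

// Inc adds one to a known counter and reports whether the type was known.
func (c *errorCounters) Inc(errType string) bool {
	ctr, ok := c.byType[errType]
	if ok {
		ctr.Add(1)
	}
	return ok
}

// Snapshot copies the current values into a plain map.
func (c *errorCounters) Snapshot() map[string]int64 {
	out := make(map[string]int64, len(c.byType))
	for k, ctr := range c.byType {
		out[k] = ctr.Load()
	}
	return out
}

// demoCounters increments one counter from 100 goroutines and returns it.
func demoCounters() int64 {
	c := newErrorCounters("stale", "throttled")
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Inc("stale")
		}()
	}
	wg.Wait()
	return c.Snapshot()["stale"]
}

func main() {
	fmt.Println(demoCounters())
}
```

Concurrent reads of a Go map are safe only while nothing writes to it, which is why the sketch rejects unknown keys instead of adding them on the fly.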

type MockAPIGatewayClient added in v1.0.1

type MockAPIGatewayClient struct {
	mock.Mock
	// contains filtered or unexported fields
}

MockAPIGatewayClient is a testify-based mock of APIGatewayClient

func NewMockAPIGatewayClient added in v1.0.1

func NewMockAPIGatewayClient() *MockAPIGatewayClient

NewMockAPIGatewayClient creates a new mock API Gateway client

func (*MockAPIGatewayClient) AddConnection added in v1.0.1

func (m *MockAPIGatewayClient) AddConnection(connectionID string, info *ConnectionInfo)

Helper methods

func (*MockAPIGatewayClient) ClearMessages added in v1.0.1

func (m *MockAPIGatewayClient) ClearMessages()

func (*MockAPIGatewayClient) DeleteConnection added in v1.0.1

func (m *MockAPIGatewayClient) DeleteConnection(ctx context.Context, connectionID string) error

func (*MockAPIGatewayClient) GetConnection added in v1.0.1

func (m *MockAPIGatewayClient) GetConnection(ctx context.Context, connectionID string) (*ConnectionInfo, error)

func (*MockAPIGatewayClient) GetMessages added in v1.0.1

func (m *MockAPIGatewayClient) GetMessages(connectionID string) [][]byte

func (*MockAPIGatewayClient) PostToConnection added in v1.0.1

func (m *MockAPIGatewayClient) PostToConnection(ctx context.Context, connectionID string, data []byte) error

type MockConnectionManager added in v1.0.1

type MockConnectionManager struct {
	// Function fields for each method
	SendFunc       func(ctx context.Context, connectionID string, message interface{}) error
	BroadcastFunc  func(ctx context.Context, connectionIDs []string, message interface{}) error
	IsActiveFunc   func(ctx context.Context, connectionID string) bool
	GetMetricsFunc func() map[string]interface{}
	ShutdownFunc   func(ctx context.Context) error
	SetLoggerFunc  func(logger func(format string, args ...interface{}))
	// contains filtered or unexported fields
}

MockConnectionManager is a manual mock implementation of ConnectionManager. Use this when you need fine control over mock behavior or want to avoid testify.

func NewMockConnectionManager added in v1.0.1

func NewMockConnectionManager() *MockConnectionManager

NewMockConnectionManager creates a mock with sensible defaults

func (*MockConnectionManager) Broadcast added in v1.0.1

func (m *MockConnectionManager) Broadcast(ctx context.Context, connectionIDs []string, message interface{}) error

Broadcast implements ConnectionManager

func (*MockConnectionManager) CallCount added in v1.0.1

func (m *MockConnectionManager) CallCount(method string) int

Testing helpers

func (*MockConnectionManager) GetMetrics added in v1.0.1

func (m *MockConnectionManager) GetMetrics() map[string]interface{}

GetMetrics implements ConnectionManager

func (*MockConnectionManager) IsActive added in v1.0.1

func (m *MockConnectionManager) IsActive(ctx context.Context, connectionID string) bool

IsActive implements ConnectionManager

func (*MockConnectionManager) Reset added in v1.0.1

func (m *MockConnectionManager) Reset()

func (*MockConnectionManager) Send added in v1.0.1

func (m *MockConnectionManager) Send(ctx context.Context, connectionID string, message interface{}) error

Send implements ConnectionManager

func (*MockConnectionManager) SetLogger added in v1.0.1

func (m *MockConnectionManager) SetLogger(logger func(format string, args ...interface{}))

SetLogger implements ConnectionManager

func (*MockConnectionManager) Shutdown added in v1.0.1

func (m *MockConnectionManager) Shutdown(ctx context.Context) error

Shutdown implements ConnectionManager

type MockConnectionManagerTestify added in v1.0.1

type MockConnectionManagerTestify struct {
	mock.Mock
}

MockConnectionManagerTestify is a testify-based mock of ConnectionManager. Use this when you prefer testify's mocking style.

func (*MockConnectionManagerTestify) Broadcast added in v1.0.1

func (m *MockConnectionManagerTestify) Broadcast(ctx context.Context, connectionIDs []string, message interface{}) error

func (*MockConnectionManagerTestify) GetMetrics added in v1.0.1

func (m *MockConnectionManagerTestify) GetMetrics() map[string]interface{}

func (*MockConnectionManagerTestify) IsActive added in v1.0.1

func (m *MockConnectionManagerTestify) IsActive(ctx context.Context, connectionID string) bool

func (*MockConnectionManagerTestify) Send added in v1.0.1

func (m *MockConnectionManagerTestify) Send(ctx context.Context, connectionID string, message interface{}) error

func (*MockConnectionManagerTestify) SetLogger added in v1.0.1

func (m *MockConnectionManagerTestify) SetLogger(logger func(format string, args ...interface{}))

func (*MockConnectionManagerTestify) Shutdown added in v1.0.1

func (m *MockConnectionManagerTestify) Shutdown(ctx context.Context) error

type PayloadTooLargeError added in v1.0.1

type PayloadTooLargeError struct {
	ConnectionID string
	PayloadSize  int
	MaxSize      int
	Message      string
}

PayloadTooLargeError indicates the payload exceeds size limits

func (PayloadTooLargeError) Error added in v1.0.1

func (e PayloadTooLargeError) Error() string

func (PayloadTooLargeError) ErrorCode added in v1.0.1

func (e PayloadTooLargeError) ErrorCode() string

func (PayloadTooLargeError) HTTPStatusCode added in v1.0.1

func (e PayloadTooLargeError) HTTPStatusCode() int

func (PayloadTooLargeError) IsRetryable added in v1.0.1

func (e PayloadTooLargeError) IsRetryable() bool

type ProgressReporterMock added in v1.0.1

type ProgressReporterMock struct {
	SendOnlyMock // Embed for Send functionality

	// IsActiveFunc allows custom behavior for IsActive
	IsActiveFunc func(ctx context.Context, connectionID string) bool

	// ActiveConnections tracks which connections are active
	ActiveConnections map[string]bool
}

ProgressReporterMock implements Send and IsActive for pkg/progress

func NewProgressReporterMock added in v1.0.1

func NewProgressReporterMock() *ProgressReporterMock

NewProgressReporterMock creates a mock suitable for progress reporting

func (*ProgressReporterMock) IsActive added in v1.0.1

func (m *ProgressReporterMock) IsActive(ctx context.Context, connectionID string) bool

IsActive implements the IsActive method needed by pkg/progress

func (*ProgressReporterMock) SetActive added in v1.0.1

func (m *ProgressReporterMock) SetActive(connectionID string, active bool)

SetActive sets a connection's active status

type RecordingMock added in v1.0.1

type RecordingMock struct {
	SendOnlyMock
	IsActiveCalls []string // Track IsActive calls

	// Override for IsActive
	IsActiveFunc func(ctx context.Context, connectionID string) bool
}

RecordingMock provides detailed recording of all method calls

func NewRecordingMock added in v1.0.1

func NewRecordingMock() *RecordingMock

NewRecordingMock creates a mock that records all interactions

func (*RecordingMock) GetIsActiveCalls added in v1.0.1

func (m *RecordingMock) GetIsActiveCalls() []string

GetIsActiveCalls returns all IsActive calls

func (*RecordingMock) IsActive added in v1.0.1

func (m *RecordingMock) IsActive(ctx context.Context, connectionID string) bool

IsActive records the call and returns result

type SendOnlyMock added in v1.0.1

type SendOnlyMock struct {
	// SendFunc allows custom behavior for Send method
	SendFunc func(ctx context.Context, connectionID string, message interface{}) error

	// Messages stores all sent messages for verification
	Messages map[string][]interface{}
	// contains filtered or unexported fields
}

SendOnlyMock implements just the Send method, suitable for packages that only need Send. This works with pkg/streamer's ConnectionManager interface.

func NewSendOnlyMock added in v1.0.1

func NewSendOnlyMock() *SendOnlyMock

NewSendOnlyMock creates a mock that only implements Send

func (*SendOnlyMock) GetMessages added in v1.0.1

func (m *SendOnlyMock) GetMessages(connectionID string) []interface{}

GetMessages returns all messages sent to a connection (thread-safe)

func (*SendOnlyMock) Reset added in v1.0.1

func (m *SendOnlyMock) Reset()

Reset clears all stored messages

func (*SendOnlyMock) Send added in v1.0.1

func (m *SendOnlyMock) Send(ctx context.Context, connectionID string, message interface{}) error

Send implements the minimal ConnectionManager.Send method

type TestableAPIGatewayClient added in v1.0.1

type TestableAPIGatewayClient struct {
	// contains filtered or unexported fields
}

TestableAPIGatewayClient is a configurable mock for complex test scenarios. This provides more control than the testify mock for simulating various conditions.

func NewTestableAPIGatewayClient added in v1.0.1

func NewTestableAPIGatewayClient() *TestableAPIGatewayClient

NewTestableAPIGatewayClient creates a new testable client

func (*TestableAPIGatewayClient) AddConnection added in v1.0.1

func (t *TestableAPIGatewayClient) AddConnection(id string, sourceIP string)

Configuration methods

func (*TestableAPIGatewayClient) Clear added in v1.0.1

func (t *TestableAPIGatewayClient) Clear()

func (*TestableAPIGatewayClient) DeleteConnection added in v1.0.1

func (t *TestableAPIGatewayClient) DeleteConnection(ctx context.Context, connectionID string) error

func (*TestableAPIGatewayClient) GetConnection added in v1.0.1

func (t *TestableAPIGatewayClient) GetConnection(ctx context.Context, connectionID string) (*ConnectionInfo, error)

func (*TestableAPIGatewayClient) GetMessageCount added in v1.0.1

func (t *TestableAPIGatewayClient) GetMessageCount(connectionID string) int

func (*TestableAPIGatewayClient) GetMessages added in v1.0.1

func (t *TestableAPIGatewayClient) GetMessages(connectionID string) [][]byte

func (*TestableAPIGatewayClient) PostToConnection added in v1.0.1

func (t *TestableAPIGatewayClient) PostToConnection(ctx context.Context, connectionID string, data []byte) error

func (*TestableAPIGatewayClient) SetError added in v1.0.1

func (t *TestableAPIGatewayClient) SetError(connectionID string, err error)

func (*TestableAPIGatewayClient) SetLatency added in v1.0.1

func (t *TestableAPIGatewayClient) SetLatency(d time.Duration)

func (*TestableAPIGatewayClient) SimulateGoneError added in v1.0.1

func (t *TestableAPIGatewayClient) SimulateGoneError(connectionID string)

func (*TestableAPIGatewayClient) SimulateThrottling added in v1.0.1

func (t *TestableAPIGatewayClient) SimulateThrottling(connectionID string, retryAfter int)

type TestifyMock added in v1.0.1

type TestifyMock struct {
	// contains filtered or unexported fields
}

TestifyMock is a minimal testify-based mock for simple cases

func (*TestifyMock) IsActive added in v1.0.1

func (m *TestifyMock) IsActive(ctx context.Context, connectionID string) bool

IsActive for testify mocks (if needed)

func (*TestifyMock) Send added in v1.0.1

func (m *TestifyMock) Send(ctx context.Context, connectionID string, message interface{}) error

Send for testify mocks

type ThrottlingError added in v1.0.1

type ThrottlingError struct {
	ConnectionID string
	RetryAfter   int // seconds
	Message      string
}

ThrottlingError indicates rate limit exceeded (429 Too Many Requests)
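
One way a caller might use RetryAfter is to derive the wait before the next attempt, falling back to exponential backoff when no hint is given. The sketch below uses a local copy of the struct; retryDelay, the 100 ms base, the attempt clamp, and the Error text are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ThrottlingError is a local copy of the struct documented above. The
// Error text is an assumption.
type ThrottlingError struct {
	ConnectionID string
	RetryAfter   int // seconds
	Message      string
}

func (e ThrottlingError) Error() string {
	return fmt.Sprintf("throttled on %s: %s", e.ConnectionID, e.Message)
}

// retryDelay is a hypothetical helper. For a throttling error it returns how
// long to wait, capped at maxDelay; for any other error it returns false.
func retryDelay(err error, attempt int, maxDelay time.Duration) (time.Duration, bool) {
	var te ThrottlingError
	if !errors.As(err, &te) {
		return 0, false
	}
	d := time.Duration(te.RetryAfter) * time.Second
	if d <= 0 {
		// No server hint: fall back to 100 ms doubled per attempt.
		if attempt < 0 {
			attempt = 0
		}
		if attempt > 10 {
			attempt = 10
		}
		d = (100 * time.Millisecond) << uint(attempt)
	}
	if d > maxDelay {
		d = maxDelay
	}
	return d, true
}

func main() {
	err := fmt.Errorf("send failed: %w", ThrottlingError{ConnectionID: "conn-1", RetryAfter: 2})
	fmt.Println(retryDelay(err, 0, 5*time.Second))
}
```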

func (ThrottlingError) Error added in v1.0.1

func (e ThrottlingError) Error() string

func (ThrottlingError) ErrorCode added in v1.0.1

func (e ThrottlingError) ErrorCode() string

func (ThrottlingError) HTTPStatusCode added in v1.0.1

func (e ThrottlingError) HTTPStatusCode() int

func (ThrottlingError) IsRetryable added in v1.0.1

func (e ThrottlingError) IsRetryable() bool
