Documentation
¶
Overview ¶
Example ¶
Example demonstrates how to use the ConnectionManager
package main
import (
"context"
"errors"
"fmt"
"log"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/apigatewaymanagementapi"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/pay-theory/streamer/internal/store"
"github.com/pay-theory/streamer/pkg/connection"
)
func main() {
// Load AWS configuration
cfg, err := config.LoadDefaultConfig(context.Background())
if err != nil {
log.Fatal(err)
}
// Create DynamoDB client for the store
dynamoClient := dynamodb.NewFromConfig(cfg)
// Create connection store
connStore := store.NewConnectionStore(dynamoClient, "streamer_connections")
// Create API Gateway Management API client
// The endpoint should be your WebSocket API endpoint
endpoint := "https://abc123.execute-api.us-east-1.amazonaws.com/production"
apiGatewayClient := apigatewaymanagementapi.NewFromConfig(cfg, func(o *apigatewaymanagementapi.Options) {
o.EndpointResolver = apigatewaymanagementapi.EndpointResolverFunc(func(region string, options apigatewaymanagementapi.EndpointResolverOptions) (aws.Endpoint, error) {
return aws.Endpoint{
URL: endpoint,
}, nil
})
})
// Create connection manager
manager := connection.NewManager(connStore, apiGatewayClient, endpoint)
// Optional: Set custom logger
manager.SetLogger(func(format string, args ...interface{}) {
log.Printf("[ConnectionManager] "+format, args...)
})
// Example 1: Send a message to a single connection
err = manager.Send(context.Background(), "connection-123", map[string]interface{}{
"type": "notification",
"data": map[string]string{
"message": "Hello from the server!",
"timestamp": time.Now().Format(time.RFC3339),
},
})
if err != nil {
switch {
case errors.Is(err, connection.ErrConnectionNotFound):
log.Printf("Connection not found")
case errors.Is(err, connection.ErrConnectionStale):
log.Printf("Connection is stale and has been removed")
default:
log.Printf("Failed to send message: %v", err)
}
}
// Example 2: Broadcast to multiple connections
connectionIDs := []string{"conn-1", "conn-2", "conn-3"}
err = manager.Broadcast(context.Background(), connectionIDs, map[string]interface{}{
"type": "broadcast",
"data": map[string]string{
"announcement": "System maintenance in 5 minutes",
},
})
if err != nil {
log.Printf("Broadcast had some failures: %v", err)
}
// Example 3: Check if a connection is active
if manager.IsActive(context.Background(), "connection-123") {
fmt.Println("Connection is active")
} else {
fmt.Println("Connection is not active")
}
}
Example (MockUsage) ¶
package main
import (
"context"
"fmt"
)
// Example_mockForTesting shows how Team 2 can mock the ConnectionManager for tests
type MockConnectionManager struct {
SendFunc func(ctx context.Context, connectionID string, message interface{}) error
calls []SendCall
}
type SendCall struct {
ConnectionID string
Message interface{}
}
func (m *MockConnectionManager) Send(ctx context.Context, connectionID string, message interface{}) error {
m.calls = append(m.calls, SendCall{
ConnectionID: connectionID,
Message: message,
})
if m.SendFunc != nil {
return m.SendFunc(ctx, connectionID, message)
}
return nil
}
func main() {
// In Team 2's tests:
mockManager := &MockConnectionManager{
SendFunc: func(ctx context.Context, connectionID string, message interface{}) error {
// Simulate successful send
return nil
},
}
// Use mock in router tests
// router := streamer.NewRouter(requestStore, mockManager)
// Verify calls were made
fmt.Printf("Number of sends: %d\n", len(mockManager.calls))
fmt.Printf("First call was to connection: %s\n", mockManager.calls[0].ConnectionID)
}
Example (WithRouter) ¶
Example_withRouter shows integration with Team 2's router
package main
import (
"context"
"log"
"github.com/pay-theory/streamer/pkg/connection"
)
func main() {
// Setup connection manager (as shown above)
var manager *connection.Manager // ... initialized as above
// This is how Team 2 would use it in their router
// The manager implements the ConnectionManager interface expected by the router:
//
// type ConnectionManager interface {
// Send(ctx context.Context, connectionID string, message interface{}) error
// }
// In router.go:
// router := streamer.NewRouter(requestStore, manager)
// When router needs to send a response:
response := map[string]interface{}{
"type": "response",
"request_id": "req-123",
"success": true,
"data": map[string]string{
"result": "Operation completed",
},
}
err := manager.Send(context.Background(), "connection-456", response)
if err != nil {
log.Printf("Failed to send response: %v", err)
}
}
Index ¶
- Variables
- func IsConnectionGone(err error) bool
- type BroadcastError
- type CircuitBreaker
- type ConnectionError
- type ConnectionGoneError
- type LatencyTracker
- type Manager
- func (m *Manager) Broadcast(ctx context.Context, connectionIDs []string, message interface{}) error
- func (m *Manager) GetMetrics() map[string]interface{}
- func (m *Manager) IsActive(ctx context.Context, connectionID string) bool
- func (m *Manager) Send(ctx context.Context, connectionID string, message interface{}) error
- func (m *Manager) SetLogger(logger func(format string, args ...interface{}))
- func (m *Manager) Shutdown(ctx context.Context) error
- type Metrics
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrConnectionNotFound indicates the connection ID doesn't exist ErrConnectionNotFound = errors.New("connection not found") // ErrConnectionStale indicates the connection is no longer valid (410 Gone) ErrConnectionStale = errors.New("connection is stale") // ErrInvalidMessage indicates the message could not be marshaled ErrInvalidMessage = errors.New("invalid message format") // ErrBroadcastPartialFailure indicates some connections failed during broadcast ErrBroadcastPartialFailure = errors.New("broadcast partially failed") )
Common errors
Functions ¶
func IsConnectionGone ¶
IsConnectionGone checks if an error indicates the connection is gone
Types ¶
type BroadcastError ¶
type BroadcastError struct {
Failed []string // Connection IDs that failed
Errors []error // Corresponding errors
}
BroadcastError contains errors from broadcasting to multiple connections
func (*BroadcastError) Error ¶
func (e *BroadcastError) Error() string
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker tracks connection health
func (*CircuitBreaker) CountOpen ¶
func (cb *CircuitBreaker) CountOpen() int
func (*CircuitBreaker) IsOpen ¶
func (cb *CircuitBreaker) IsOpen(connectionID string) bool
func (*CircuitBreaker) RecordFailure ¶
func (cb *CircuitBreaker) RecordFailure(connectionID string)
func (*CircuitBreaker) RecordSuccess ¶
func (cb *CircuitBreaker) RecordSuccess(connectionID string)
type ConnectionError ¶
ConnectionError represents a connection-specific error
func (*ConnectionError) Error ¶
func (e *ConnectionError) Error() string
func (*ConnectionError) Unwrap ¶
func (e *ConnectionError) Unwrap() error
type ConnectionGoneError ¶
type ConnectionGoneError struct {
ConnectionID string
}
ConnectionGoneError indicates a WebSocket connection no longer exists
func (*ConnectionGoneError) Error ¶
func (e *ConnectionGoneError) Error() string
type LatencyTracker ¶
type LatencyTracker struct {
// contains filtered or unexported fields
}
LatencyTracker tracks latency percentiles
func (*LatencyTracker) Percentile ¶
func (lt *LatencyTracker) Percentile(p float64) time.Duration
func (*LatencyTracker) Record ¶
func (lt *LatencyTracker) Record(d time.Duration)
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager handles WebSocket connections through API Gateway
func NewManager ¶
func NewManager(store store.ConnectionStore, apiGateway *apigatewaymanagementapi.Client, endpoint string) *Manager
NewManager creates a new connection manager
func (*Manager) GetMetrics ¶
GetMetrics returns current performance metrics
type Metrics ¶
type Metrics struct {
SendLatency *LatencyTracker
BroadcastLatency *LatencyTracker
ErrorsByType map[string]*atomic.Int64
ActiveSends *atomic.Int32
// contains filtered or unexported fields
}
Metrics holds performance metrics for the connection manager