Documentation ¶
Overview ¶
Package streamer provides WebSocket connection management for AWS API Gateway.
This package wraps the AWS API Gateway Management API behind a single Client interface for sending messages to, inspecting, and closing WebSocket connections. AWS error responses are mapped to typed Go errors.
Basic Usage ¶
Create a client using NewClient:
client, err := streamer.NewClient(ctx, streamer.ClientConfig{
	Endpoint: "https://abc123.execute-api.us-west-2.amazonaws.com/production",
	Region:   "us-west-2", // Optional: extracted from the endpoint if omitted
})
if err != nil {
	log.Fatal(err)
}
Send messages to connections:
err = client.PostToConnection(ctx, connectionID, data)
if err != nil {
	if errors.Is(err, streamer.ErrConnectionGone) {
		// Handle disconnected connection
	}
}
Get connection information:
info, err := client.GetConnection(ctx, connectionID)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("Active: %v, Age: %v\n", info.IsActive(), info.Age())
Error Handling ¶
The package provides structured error types that match AWS API Gateway responses:
- GoneError (410): Connection no longer exists
- ForbiddenError (403): Operation not permitted
- PayloadTooLargeError (413): Message exceeds size limit
- ThrottlingError (429): Rate limit exceeded
- InternalServerError (500): AWS service error
All errors implement the APIError interface with HTTP status codes and retry information.
Integration with Lift ¶
A client can be created from Lift's WebSocket context, which supplies the management endpoint and region:
func handler(ctx *lift.Context) error {
	wsCtx, err := ctx.AsWebSocket()
	if err != nil {
		return err
	}
	// Create streamer client from WebSocket context
	client, err := streamer.NewClient(ctx.Context, streamer.ClientConfig{
		Endpoint: wsCtx.ManagementEndpoint(),
		Region:   wsCtx.GetRegion(),
	})
	if err != nil {
		return err
	}
	// connectionID and data are placeholders supplied by the caller.
	return client.PostToConnection(ctx.Context, connectionID, data)
}
Region Resolution ¶
The client automatically resolves the AWS region from multiple sources:
- Explicit cfg.Region parameter
- Region extracted from endpoint URL (e.g., "us-west-2" from execute-api URL)
- AWS_REGION environment variable
- AWS_DEFAULT_REGION environment variable
- Default fallback to "us-east-1"
This ensures correct SigV4 request signing even when the region is not explicitly provided.
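The fallback chain can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: it assumes the sources are tried in the order listed, and resolveRegion and regionFromEndpoint are hypothetical helper names.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
	"strings"
)

// regionFromEndpoint extracts the region from a host of the form
// "<api-id>.execute-api.<region>.amazonaws.com". It returns "" for any
// other host, such as a custom domain.
func regionFromEndpoint(endpoint string) string {
	u, err := url.Parse(endpoint)
	if err != nil {
		return ""
	}
	parts := strings.Split(u.Hostname(), ".")
	for i, p := range parts {
		if p == "execute-api" && i+1 < len(parts) {
			return parts[i+1]
		}
	}
	return ""
}

// resolveRegion returns the first non-empty candidate, falling back to
// "us-east-1". getenv is injected so the lookup can be tested without
// touching the process environment.
func resolveRegion(explicit, endpoint string, getenv func(string) string) string {
	candidates := []string{
		explicit,
		regionFromEndpoint(endpoint),
		getenv("AWS_REGION"),
		getenv("AWS_DEFAULT_REGION"),
	}
	for _, c := range candidates {
		if c != "" {
			return c
		}
	}
	return "us-east-1"
}

func main() {
	endpoint := "https://abc123.execute-api.us-west-2.amazonaws.com/production"
	fmt.Println(resolveRegion("", endpoint, os.Getenv))
}
```

With a custom domain name the endpoint carries no region, so in that case setting Region explicitly or relying on the environment variables is what determines the signing region.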
Index ¶
- Variables
- type APIError
- type AWSClient
- func (c *AWSClient) DeleteConnection(ctx context.Context, connectionID string) error
- func (c *AWSClient) Endpoint() string
- func (c *AWSClient) GetConnection(ctx context.Context, connectionID string) (*ConnectionInfo, error)
- func (c *AWSClient) PostToConnection(ctx context.Context, connectionID string, data []byte) error
- func (c *AWSClient) Region() string
- type Client
- type ClientConfig
- type ConnectionInfo
- type ConnectionState
- type ForbiddenError
- type GoneError
- type InternalServerError
- type PayloadTooLargeError
- type ThrottlingError
Constants ¶
This section is empty.
Variables ¶
var (
	ErrConnectionGone    = errors.New("connection no longer exists")
	ErrForbidden         = errors.New("operation forbidden")
	ErrPayloadTooLarge   = errors.New("payload exceeds maximum size")
	ErrThrottled         = errors.New("request throttled")
	ErrInternalServer    = errors.New("internal server error")
	ErrInvalidConnection = errors.New("invalid connection ID")
)
Sentinel errors for common conditions
Functions ¶
This section is empty.
Types ¶
type AWSClient ¶
type AWSClient struct {
	// contains filtered or unexported fields
}
AWSClient implements the Client interface using AWS SDK v2.
func NewClient ¶
func NewClient(ctx context.Context, cfg ClientConfig) (*AWSClient, error)
NewClient creates a new WebSocket client with the given configuration.
func (*AWSClient) DeleteConnection ¶
func (c *AWSClient) DeleteConnection(ctx context.Context, connectionID string) error
DeleteConnection forcefully disconnects a WebSocket connection.
func (*AWSClient) GetConnection ¶
func (c *AWSClient) GetConnection(ctx context.Context, connectionID string) (*ConnectionInfo, error)
GetConnection retrieves information about a WebSocket connection.
func (*AWSClient) PostToConnection ¶
func (c *AWSClient) PostToConnection(ctx context.Context, connectionID string, data []byte) error
PostToConnection sends data to a specific WebSocket connection.
type Client ¶
type Client interface {
	// PostToConnection sends data to a specific WebSocket connection.
	PostToConnection(ctx context.Context, connectionID string, data []byte) error
	// DeleteConnection forcefully disconnects a WebSocket connection.
	DeleteConnection(ctx context.Context, connectionID string) error
	// GetConnection retrieves information about a WebSocket connection.
	GetConnection(ctx context.Context, connectionID string) (*ConnectionInfo, error)
}
Client defines the interface for WebSocket connection management.
type ClientConfig ¶
ClientConfig holds configuration for creating a new client.
type ConnectionInfo ¶
type ConnectionInfo struct {
	Identity     map[string]any
	ConnectedAt  time.Time
	LastActiveAt time.Time
	ConnectionID string
	SourceIP     string
	UserAgent    string
}
ConnectionInfo represents information about a WebSocket connection.
func (ConnectionInfo) Age ¶
func (c ConnectionInfo) Age() time.Duration
Age returns how long the connection has been alive.
func (ConnectionInfo) IdleDuration ¶
func (c ConnectionInfo) IdleDuration() time.Duration
IdleDuration returns how long the connection has been idle.
func (ConnectionInfo) IsActive ¶
func (c ConnectionInfo) IsActive() bool
IsActive returns true if the connection is in an active state.
type ConnectionState ¶
type ConnectionState string
ConnectionState represents the state of a WebSocket connection.
const (
	// ConnectionStateActive indicates an active, connected WebSocket
	ConnectionStateActive ConnectionState = "ACTIVE"
	// ConnectionStateDisconnected indicates a disconnected WebSocket
	ConnectionStateDisconnected ConnectionState = "DISCONNECTED"
	// ConnectionStateStale indicates a connection that has expired
	ConnectionStateStale ConnectionState = "STALE"
)
type ForbiddenError ¶
ForbiddenError indicates the operation is not permitted (403).
func (ForbiddenError) Error ¶
func (e ForbiddenError) Error() string
func (ForbiddenError) ErrorCode ¶
func (e ForbiddenError) ErrorCode() string
func (ForbiddenError) HTTPStatusCode ¶
func (e ForbiddenError) HTTPStatusCode() int
func (ForbiddenError) IsRetryable ¶
func (e ForbiddenError) IsRetryable() bool
func (ForbiddenError) Unwrap ¶
func (e ForbiddenError) Unwrap() error
type GoneError ¶
GoneError indicates a connection no longer exists (410).
func (GoneError) HTTPStatusCode ¶
func (GoneError) IsRetryable ¶
type InternalServerError ¶
type InternalServerError struct {
	Message string
}
InternalServerError indicates an AWS service error (500).
func (InternalServerError) Error ¶
func (e InternalServerError) Error() string
func (InternalServerError) ErrorCode ¶
func (e InternalServerError) ErrorCode() string
func (InternalServerError) HTTPStatusCode ¶
func (e InternalServerError) HTTPStatusCode() int
func (InternalServerError) IsRetryable ¶
func (e InternalServerError) IsRetryable() bool
func (InternalServerError) Unwrap ¶
func (e InternalServerError) Unwrap() error
type PayloadTooLargeError ¶
PayloadTooLargeError indicates the message payload exceeds the maximum size (413).
func (PayloadTooLargeError) Error ¶
func (e PayloadTooLargeError) Error() string
func (PayloadTooLargeError) ErrorCode ¶
func (e PayloadTooLargeError) ErrorCode() string
func (PayloadTooLargeError) HTTPStatusCode ¶
func (e PayloadTooLargeError) HTTPStatusCode() int
func (PayloadTooLargeError) IsRetryable ¶
func (e PayloadTooLargeError) IsRetryable() bool
func (PayloadTooLargeError) Unwrap ¶
func (e PayloadTooLargeError) Unwrap() error
type ThrottlingError ¶
type ThrottlingError struct {
	ConnectionID string
	Message      string
	RetryAfter   int // Seconds to wait before retrying
}
ThrottlingError indicates the request is being rate limited (429).
func (ThrottlingError) Error ¶
func (e ThrottlingError) Error() string
func (ThrottlingError) ErrorCode ¶
func (e ThrottlingError) ErrorCode() string
func (ThrottlingError) HTTPStatusCode ¶
func (e ThrottlingError) HTTPStatusCode() int
func (ThrottlingError) IsRetryable ¶
func (e ThrottlingError) IsRetryable() bool
func (ThrottlingError) Unwrap ¶
func (e ThrottlingError) Unwrap() error