streamer

package
v1.0.78 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 15, 2025 | License: Apache-2.0 | Imports: 10 | Imported by: 0

Documentation

Overview

Package streamer provides WebSocket connection management for AWS API Gateway.

It wraps the AWS API Gateway Management API behind a single Client interface for sending messages to connections, inspecting them, and disconnecting them. Failures are reported as typed errors that carry the HTTP status code and retry information.

Basic Usage

Create a client using NewClient:

client, err := streamer.NewClient(ctx, streamer.ClientConfig{
	Endpoint: "https://abc123.execute-api.us-west-2.amazonaws.com/production",
	Region:   "us-west-2", // Optional: will be extracted from endpoint if omitted
})
if err != nil {
	log.Fatal(err)
}

Send messages to connections:

err = client.PostToConnection(ctx, connectionID, data)
if err != nil {
	if errors.Is(err, streamer.ErrConnectionGone) {
		// Handle disconnected connection
	}
}

Get connection information:

info, err := client.GetConnection(ctx, connectionID)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("Active: %v, Age: %v\n", info.IsActive(), info.Age())

Error Handling

The package provides structured error types that match AWS API Gateway responses:

  • GoneError (410): Connection no longer exists
  • ForbiddenError (403): Operation not permitted
  • PayloadTooLargeError (413): Message exceeds size limit
  • ThrottlingError (429): Rate limit exceeded
  • InternalServerError (500): AWS service error

All of these types implement the APIError interface, which exposes the HTTP status code, an error code string, and whether the operation is retryable.

Integration with Lift

The client can be constructed from the management endpoint and region that Lift's WebSocket context provides:

func handler(ctx *lift.Context) error {
	wsCtx, err := ctx.AsWebSocket()
	if err != nil {
		return err
	}

	// Create streamer client from WebSocket context
	client, err := streamer.NewClient(ctx.Context, streamer.ClientConfig{
		Endpoint: wsCtx.ManagementEndpoint(),
		Region:   wsCtx.GetRegion(),
	})
	if err != nil {
		return err
	}

	// connectionID and data are not defined in this excerpt; the handler
	// is expected to obtain them before this call.
	return client.PostToConnection(ctx.Context, connectionID, data)
}

Region Resolution

The client automatically resolves the AWS region from multiple sources:

  1. Explicit cfg.Region parameter
  2. Region extracted from endpoint URL (e.g., "us-west-2" from execute-api URL)
  3. AWS_REGION environment variable
  4. AWS_DEFAULT_REGION environment variable
  5. Default fallback to "us-east-1"

This ensures correct SigV4 request signing even when the region is not explicitly provided.
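The resolution order above can be sketched as follows. This is not the package's implementation: resolveRegion and regionFromEndpoint are hypothetical helpers, and the parsing assumes the standard execute-api host shape {api-id}.execute-api.{region}.amazonaws.com. The environment lookup is passed in as a function so the order can be exercised without changing the process environment.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
	"strings"
)

// regionFromEndpoint returns the host label that follows "execute-api",
// or "" if the endpoint does not have that shape (for example, a custom domain).
func regionFromEndpoint(endpoint string) string {
	u, err := url.Parse(endpoint)
	if err != nil {
		return ""
	}
	parts := strings.Split(u.Hostname(), ".")
	for i, p := range parts {
		if p == "execute-api" && i+1 < len(parts) {
			return parts[i+1]
		}
	}
	return ""
}

// resolveRegion is a hypothetical helper that follows the documented order:
// explicit value, endpoint URL, AWS_REGION, AWS_DEFAULT_REGION, then "us-east-1".
func resolveRegion(explicit, endpoint string, getenv func(string) string) string {
	if explicit != "" {
		return explicit
	}
	if r := regionFromEndpoint(endpoint); r != "" {
		return r
	}
	if r := getenv("AWS_REGION"); r != "" {
		return r
	}
	if r := getenv("AWS_DEFAULT_REGION"); r != "" {
		return r
	}
	return "us-east-1"
}

func main() {
	endpoint := "https://abc123.execute-api.us-west-2.amazonaws.com/production"
	fmt.Println(resolveRegion("", endpoint, os.Getenv)) // us-west-2
}
```

With a custom domain name the endpoint carries no region, so the result then depends on the environment variables or the default.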

Constants

This section is empty.

Variables

var (
	ErrConnectionGone    = errors.New("connection no longer exists")
	ErrForbidden         = errors.New("operation forbidden")
	ErrPayloadTooLarge   = errors.New("payload exceeds maximum size")
	ErrThrottled         = errors.New("request throttled")
	ErrInternalServer    = errors.New("internal server error")
	ErrInvalidConnection = errors.New("invalid connection ID")
)

Sentinel errors for common conditions. Match them with errors.Is, as shown for ErrConnectionGone under Basic Usage.

Functions

This section is empty.

Types

type APIError

type APIError interface {
	error
	HTTPStatusCode() int
	ErrorCode() string
	IsRetryable() bool
}

APIError represents an error from the API Gateway Management API.

type AWSClient

type AWSClient struct {
	// contains filtered or unexported fields
}

AWSClient implements the Client interface using AWS SDK v2.

func NewClient

func NewClient(ctx context.Context, cfg ClientConfig) (*AWSClient, error)

NewClient creates a new WebSocket client with the given configuration.

func (*AWSClient) DeleteConnection

func (c *AWSClient) DeleteConnection(ctx context.Context, connectionID string) error

DeleteConnection forcefully disconnects a WebSocket connection.

func (*AWSClient) Endpoint

func (c *AWSClient) Endpoint() string

Endpoint returns the API Gateway endpoint.

func (*AWSClient) GetConnection

func (c *AWSClient) GetConnection(ctx context.Context, connectionID string) (*ConnectionInfo, error)

GetConnection retrieves information about a WebSocket connection.

func (*AWSClient) PostToConnection

func (c *AWSClient) PostToConnection(ctx context.Context, connectionID string, data []byte) error

PostToConnection sends data to a specific WebSocket connection.

func (*AWSClient) Region

func (c *AWSClient) Region() string

Region returns the AWS region.

type Client

type Client interface {
	// PostToConnection sends data to a specific WebSocket connection.
	PostToConnection(ctx context.Context, connectionID string, data []byte) error

	// DeleteConnection forcefully disconnects a WebSocket connection.
	DeleteConnection(ctx context.Context, connectionID string) error

	// GetConnection retrieves information about a WebSocket connection.
	GetConnection(ctx context.Context, connectionID string) (*ConnectionInfo, error)
}

Client defines the interface for WebSocket connection management.

type ClientConfig

type ClientConfig struct {
	AWSConfig *aws.Config
	Endpoint  string
	Region    string
}

ClientConfig holds configuration for creating a new client.

type ConnectionInfo

type ConnectionInfo struct {
	Identity     map[string]any
	ConnectedAt  time.Time
	LastActiveAt time.Time
	ConnectionID string
	SourceIP     string
	UserAgent    string
}

ConnectionInfo represents information about a WebSocket connection.

func (ConnectionInfo) Age

func (c ConnectionInfo) Age() time.Duration

Age returns how long the connection has been alive.

func (ConnectionInfo) IdleDuration

func (c ConnectionInfo) IdleDuration() time.Duration

IdleDuration returns how long the connection has been idle.

func (ConnectionInfo) IsActive

func (c ConnectionInfo) IsActive() bool

IsActive returns true if the connection is in an active state.

type ConnectionState

type ConnectionState string

ConnectionState represents the state of a WebSocket connection.

const (
	// ConnectionStateActive indicates an active, connected WebSocket
	ConnectionStateActive ConnectionState = "ACTIVE"

	// ConnectionStateDisconnected indicates a disconnected WebSocket
	ConnectionStateDisconnected ConnectionState = "DISCONNECTED"

	// ConnectionStateStale indicates a connection that has expired
	ConnectionStateStale ConnectionState = "STALE"
)

type ForbiddenError

type ForbiddenError struct {
	ConnectionID string
	Message      string
}

ForbiddenError indicates the operation is not permitted (403).

func (ForbiddenError) Error

func (e ForbiddenError) Error() string

func (ForbiddenError) ErrorCode

func (e ForbiddenError) ErrorCode() string

func (ForbiddenError) HTTPStatusCode

func (e ForbiddenError) HTTPStatusCode() int

func (ForbiddenError) IsRetryable

func (e ForbiddenError) IsRetryable() bool

func (ForbiddenError) Unwrap

func (e ForbiddenError) Unwrap() error

type GoneError

type GoneError struct {
	ConnectionID string
	Message      string
}

GoneError indicates a connection no longer exists (410).

func (GoneError) Error

func (e GoneError) Error() string

func (GoneError) ErrorCode

func (e GoneError) ErrorCode() string

func (GoneError) HTTPStatusCode

func (e GoneError) HTTPStatusCode() int

func (GoneError) IsRetryable

func (e GoneError) IsRetryable() bool

func (GoneError) Unwrap

func (e GoneError) Unwrap() error

type InternalServerError

type InternalServerError struct {
	Message string
}

InternalServerError indicates an AWS service error (500).

func (InternalServerError) Error

func (e InternalServerError) Error() string

func (InternalServerError) ErrorCode

func (e InternalServerError) ErrorCode() string

func (InternalServerError) HTTPStatusCode

func (e InternalServerError) HTTPStatusCode() int

func (InternalServerError) IsRetryable

func (e InternalServerError) IsRetryable() bool

func (InternalServerError) Unwrap

func (e InternalServerError) Unwrap() error

type PayloadTooLargeError

type PayloadTooLargeError struct {
	ConnectionID string
	Message      string
	PayloadSize  int
	MaxSize      int
}

PayloadTooLargeError indicates the message payload exceeds the maximum size (413).

func (PayloadTooLargeError) Error

func (e PayloadTooLargeError) Error() string

func (PayloadTooLargeError) ErrorCode

func (e PayloadTooLargeError) ErrorCode() string

func (PayloadTooLargeError) HTTPStatusCode

func (e PayloadTooLargeError) HTTPStatusCode() int

func (PayloadTooLargeError) IsRetryable

func (e PayloadTooLargeError) IsRetryable() bool

func (PayloadTooLargeError) Unwrap

func (e PayloadTooLargeError) Unwrap() error
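A caller can avoid a round trip by checking the payload size before sending. The sketch below is illustrative only: payloadTooLarge and checkSize are hypothetical, the limit is supplied by the caller rather than hard-coded, and whether the package performs a similar check itself is not stated on this page.

```go
package main

import "fmt"

// payloadTooLarge is a local stand-in with the same size fields as
// PayloadTooLargeError.
type payloadTooLarge struct {
	PayloadSize int
	MaxSize     int
}

func (e payloadTooLarge) Error() string {
	return fmt.Sprintf("payload is %d bytes, limit is %d", e.PayloadSize, e.MaxSize)
}

// checkSize rejects oversized payloads before any network call is made.
// maxSize is supplied by the caller; this sketch does not hard-code a limit.
func checkSize(data []byte, maxSize int) error {
	if len(data) > maxSize {
		return payloadTooLarge{PayloadSize: len(data), MaxSize: maxSize}
	}
	return nil
}

func main() {
	fmt.Println(checkSize(make([]byte, 10), 8)) // payload is 10 bytes, limit is 8
	fmt.Println(checkSize(make([]byte, 4), 8))  // <nil>
}
```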

type ThrottlingError

type ThrottlingError struct {
	ConnectionID string
	Message      string
	RetryAfter   int // Seconds to wait before retrying
}

ThrottlingError indicates the request is being rate limited (429).

func (ThrottlingError) Error

func (e ThrottlingError) Error() string

func (ThrottlingError) ErrorCode

func (e ThrottlingError) ErrorCode() string

func (ThrottlingError) HTTPStatusCode

func (e ThrottlingError) HTTPStatusCode() int

func (ThrottlingError) IsRetryable

func (e ThrottlingError) IsRetryable() bool

func (ThrottlingError) Unwrap

func (e ThrottlingError) Unwrap() error
