streamer

package
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 11, 2025 License: Apache-2.0 Imports: 8 Imported by: 0

README

Streamer Router Package

The streamer package provides the core request routing functionality for the Streamer async request processing system. It handles WebSocket message routing, sync/async decision logic, request validation, and middleware support.

Overview

The router is designed to:

  • Receive WebSocket messages from API Gateway
  • Route requests to appropriate handlers based on action
  • Automatically decide between sync and async processing
  • Validate requests before processing
  • Support middleware for cross-cutting concerns
  • Handle errors gracefully with structured responses

Quick Start

import (
    "github.com/streamer/streamer/pkg/streamer"
)

// Create a router
router := streamer.NewRouter(requestStore, connectionManager)

// Register a handler
handler := streamer.NewEchoHandler()
router.Handle("echo", handler)

// Set async threshold (optional, default is 5 seconds)
router.SetAsyncThreshold(10 * time.Second)

// Process WebSocket events
err := router.Route(ctx, websocketEvent)

Core Concepts

Handlers

Handlers process incoming requests. They must implement the Handler interface:

// Handler defines the interface for request handlers.
type Handler interface {
    // Validate checks if the request is valid.
    Validate(request *Request) error
    // EstimatedDuration returns the expected processing time. The router
    // compares it with the async threshold to choose sync or async processing.
    EstimatedDuration() time.Duration
    // Process executes the handler logic.
    Process(ctx context.Context, request *Request) (*Result, error)
}
Sync vs Async Processing

The router automatically decides whether to process a request synchronously or asynchronously based on the handler's EstimatedDuration():

  • Sync: If duration < async threshold, process immediately and return response
  • Async: If duration >= async threshold, queue the request and return acknowledgment
Message Format
Request Message
{
    "action": "process-data",
    "id": "req-123",
    "payload": {
        "data": "example"
    },
    "metadata": {
        "user_id": "user-456"
    }
}
Response Types

Sync Response:

{
    "type": "response",
    "request_id": "req-123",
    "success": true,
    "data": {
        "result": "processed"
    }
}

Async Acknowledgment:

{
    "type": "acknowledgment",
    "request_id": "req-123",
    "status": "queued",
    "message": "Request queued for async processing"
}

Error Response:

{
    "type": "error",
    "error": {
        "code": "VALIDATION_ERROR",
        "message": "Invalid payload format",
        "details": {
            "field": "email"
        }
    }
}

Creating Handlers

Simple Handler

Use SimpleHandler for basic handlers:

handler := streamer.SimpleHandler("greet", func(ctx context.Context, req *streamer.Request) (*streamer.Result, error) {
    var payload struct {
        Name string `json:"name"`
    }
    
    if err := json.Unmarshal(req.Payload, &payload); err != nil {
        return nil, err
    }
    
    return &streamer.Result{
        RequestID: req.ID,
        Success:   true,
        Data: map[string]string{
            "message": fmt.Sprintf("Hello, %s!", payload.Name),
        },
    }, nil
})

router.Handle("greet", handler)
Custom Handler with Validation
// CreateUserHandler is a custom handler that embeds streamer.BaseHandler and
// keeps the UserService it needs to create users.
type CreateUserHandler struct {
    // BaseHandler is embedded so the handler picks up its methods.
    streamer.BaseHandler
    // userService is the dependency Process calls to create the user.
    userService UserService
}

// NewCreateUserHandler builds a CreateUserHandler backed by userService.
//
// The handler is configured with a 200ms estimated duration and a validator
// that rejects a request when its payload does not decode as
// CreateUserPayload, when Email is empty, or when isValidEmail reports the
// address as invalid.
//
// NOTE(review): the API index for this version documents BaseHandler as
// having only unexported fields, with EstimatedDuration as a method. Confirm
// that the EstimatedDuration field and the Validator assignment below exist in
// the BaseHandler version you build against.
func NewCreateUserHandler(userService UserService) *CreateUserHandler {
    h := &CreateUserHandler{
        BaseHandler: streamer.BaseHandler{
            EstimatedDuration: 200 * time.Millisecond,
        },
        userService: userService,
    }

    h.Validator = func(req *streamer.Request) error {
        var payload CreateUserPayload
        if err := json.Unmarshal(req.Payload, &payload); err != nil {
            // Keep the decode error in the chain so the cause is not lost.
            return fmt.Errorf("invalid payload format: %w", err)
        }

        switch {
        case payload.Email == "":
            return fmt.Errorf("email is required")
        case !isValidEmail(payload.Email):
            return fmt.Errorf("invalid email format")
        }

        return nil
    }

    return h
}

// Process decodes the request payload into a CreateUserPayload, creates the
// user through userService, and returns the created user as the result data.
//
// A payload that cannot be decoded yields a structured VALIDATION_ERROR with
// the decode failure attached as the "reason" detail. Any error returned by
// userService.Create is passed through unchanged.
func (h *CreateUserHandler) Process(ctx context.Context, req *streamer.Request) (*streamer.Result, error) {
    var payload CreateUserPayload
    // Check the decode result here as well: ignoring it would call Create with
    // a zero-valued payload whenever the body is malformed.
    if err := json.Unmarshal(req.Payload, &payload); err != nil {
        return nil, streamer.NewError(
            streamer.ErrCodeValidation,
            "Invalid payload format",
        ).WithDetail("reason", err.Error())
    }

    user, err := h.userService.Create(ctx, payload)
    if err != nil {
        return nil, err
    }

    return &streamer.Result{
        RequestID: req.ID,
        Success:   true,
        Data:      user,
    }, nil
}
Handler with Progress Reporting

For async handlers that support progress updates:

// ReportGeneratorHandler is an example handler that reports progress while it
// builds a report. It embeds streamer.BaseHandler.
//
// NOTE(review): only ProcessWithProgress is shown for this type. The Handler
// interface also requires Process, and BaseHandler is documented with just
// EstimatedDuration and Validate, so a Process method presumably still has to
// be added before this type can be registered with the router — confirm.
type ReportGeneratorHandler struct {
    streamer.BaseHandler
}

// ProcessWithProgress generates a report in three steps (load, process,
// generate) and sends a progress update through reporter before and after
// each step. On success the result data carries the report URL under
// "report_url".
//
// A payload that cannot be decoded into ReportParams yields a structured
// VALIDATION_ERROR with the decode failure attached as the "reason" detail.
func (h *ReportGeneratorHandler) ProcessWithProgress(
    ctx context.Context,
    req *streamer.Request,
    reporter streamer.ProgressReporter,
) (*streamer.Result, error) {
    // Parse request; stop here rather than build a report from zero-valued
    // parameters when the payload is malformed.
    var params ReportParams
    if err := json.Unmarshal(req.Payload, &params); err != nil {
        return nil, streamer.NewError(
            streamer.ErrCodeValidation,
            "Invalid payload format",
        ).WithDetail("reason", err.Error())
    }

    // Progress updates are best-effort in this example: Report returns an
    // error, but a failed update is not allowed to abort the report itself.

    // Step 1: Load data (30%)
    reporter.Report(10, "Loading data...")
    data := loadData(params)
    reporter.Report(30, "Data loaded")

    // Step 2: Process data (60%)
    reporter.Report(40, "Processing data...")
    processed := processData(data)
    reporter.Report(60, "Data processed")

    // Step 3: Generate report (100%)
    reporter.Report(70, "Generating report...")
    report := generateReport(processed)
    reporter.Report(100, "Report complete")

    return &streamer.Result{
        RequestID: req.ID,
        Success:   true,
        Data: map[string]string{
            "report_url": report.URL,
        },
    }, nil
}

Middleware

Middleware allows you to add cross-cutting concerns like logging, metrics, and authentication:

// Logging middleware: logs when a request starts and how it ended (with the
// elapsed time), and reuses the wrapped handler's duration estimate and
// validator unchanged.
loggingMiddleware := func(next streamer.Handler) streamer.Handler {
    logged := func(ctx context.Context, req *streamer.Request) (*streamer.Result, error) {
        began := time.Now()
        log.Printf("Processing request: %s, action: %s", req.ID, req.Action)

        result, err := next.Process(ctx, req)

        elapsed := time.Since(began)
        if err == nil {
            log.Printf("Request completed: %s, duration: %v", req.ID, elapsed)
            return result, nil
        }

        log.Printf("Request failed: %s, duration: %v, error: %v", req.ID, elapsed, err)
        return result, err
    }

    return streamer.NewHandlerFunc(logged, next.EstimatedDuration(), next.Validate)
}

// userIDKey is the context key under which the authenticated user ID is
// stored. A dedicated unexported type cannot collide with keys set by other
// packages, which a plain string key such as "user_id" can.
type userIDKey struct{}

// Authentication middleware: rejects requests that carry no "auth_token"
// metadata entry or whose token fails validateToken, and otherwise calls the
// wrapped handler with the user ID stored in the context.
authMiddleware := func(next streamer.Handler) streamer.Handler {
    return streamer.NewHandlerFunc(
        func(ctx context.Context, req *streamer.Request) (*streamer.Result, error) {
            // Extract auth token from metadata
            token, ok := req.Metadata["auth_token"]
            if !ok {
                return nil, streamer.NewError(
                    streamer.ErrCodeUnauthorized,
                    "Authentication required",
                )
            }

            // Validate token
            userID, err := validateToken(token)
            if err != nil {
                return nil, streamer.NewError(
                    streamer.ErrCodeUnauthorized,
                    "Invalid authentication token",
                )
            }

            // Add user ID to context; read it back with ctx.Value(userIDKey{})
            ctx = context.WithValue(ctx, userIDKey{}, userID)

            return next.Process(ctx, req)
        },
        next.EstimatedDuration(),
        next.Validate,
    )
}

// Apply middleware
// NOTE(review): the best-practices list says middleware is applied outer to
// inner, so presumably loggingMiddleware wraps authMiddleware here — confirm.
router.SetMiddleware(loggingMiddleware, authMiddleware)

Error Handling

Use structured errors for consistent error responses:

// Create an error with details
err := streamer.NewError(
    streamer.ErrCodeValidation,
    "Invalid input data",
).WithDetail("field", "email").WithDetail("reason", "invalid format")

// Common error codes. These are the string values carried in the "code"
// field of a structured error response.
const (
    ErrCodeValidation    = "VALIDATION_ERROR"
    ErrCodeNotFound      = "NOT_FOUND"
    ErrCodeUnauthorized  = "UNAUTHORIZED"
    ErrCodeInternalError = "INTERNAL_ERROR"
    ErrCodeTimeout       = "TIMEOUT"
    ErrCodeRateLimited   = "RATE_LIMITED"
    ErrCodeInvalidAction = "INVALID_ACTION"
)

Testing

The package includes comprehensive test utilities:

// Create mock dependencies
// NOTE(review): MockRequestStore and NewMockConnectionManager are not listed
// in the package's API index; presumably they live in test code — confirm.
store := &MockRequestStore{}
connManager := NewMockConnectionManager()
router := streamer.NewRouter(store, connManager)

// Register handler (Handle returns an error; a real test should check it)
router.Handle("test", handler)

// Create test event
event := events.APIGatewayWebsocketProxyRequest{
    Body: `{"action": "test", "id": "test-123"}`,
    RequestContext: events.APIGatewayWebsocketProxyRequestContext{
        ConnectionID: "conn-123",
    },
}

// Route and verify (assert on err before inspecting the recorded messages)
err := router.Route(context.Background(), event)

// Check responses
messages := connManager.Messages["conn-123"]
// ... verify messages

Best Practices

  1. Accurate Duration Estimates: Provide accurate EstimatedDuration() to ensure proper sync/async routing
  2. Comprehensive Validation: Validate requests thoroughly in the Validate() method
  3. Structured Errors: Use the provided error types for consistent error handling
  4. Context Propagation: Pass context through the entire request lifecycle
  5. Idempotent Handlers: Design handlers to be idempotent for retry safety
  6. Progress Reporting: For long operations, implement ProcessWithProgress
  7. Middleware Order: Apply middleware in the correct order (outer to inner)

Integration with Team 1

The router depends on interfaces that Team 1 will implement:

  • RequestStore: For queuing async requests to DynamoDB
  • ConnectionManager: For sending WebSocket messages via API Gateway

These interfaces allow for easy mocking and testing while Team 1 builds the infrastructure.

Documentation

Overview

Package streamer provides the core interfaces and types for async request processing.

Index

Constants

View Source
const (
	ErrCodeValidation    = "VALIDATION_ERROR"
	ErrCodeNotFound      = "NOT_FOUND"
	ErrCodeUnauthorized  = "UNAUTHORIZED"
	ErrCodeInternalError = "INTERNAL_ERROR"
	ErrCodeTimeout       = "TIMEOUT"
	ErrCodeRateLimited   = "RATE_LIMITED"
	ErrCodeInvalidAction = "INVALID_ACTION"
)

Common error codes

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseHandler

type BaseHandler struct {
	// contains filtered or unexported fields
}

BaseHandler provides common functionality for handlers

func (*BaseHandler) EstimatedDuration

func (h *BaseHandler) EstimatedDuration() time.Duration

EstimatedDuration returns the expected processing time

func (*BaseHandler) Validate

func (h *BaseHandler) Validate(req *Request) error

Validate validates the request

type ConnectionManager

type ConnectionManager interface {
	Send(ctx context.Context, connectionID string, message interface{}) error
}

ConnectionManager defines the interface for managing WebSocket connections

type DefaultRouter

type DefaultRouter struct {
	// contains filtered or unexported fields
}

DefaultRouter implements the Router interface

func NewRouter

func NewRouter(store RequestStore, connManager ConnectionManager) *DefaultRouter

NewRouter creates a new router instance

func (*DefaultRouter) Handle

func (r *DefaultRouter) Handle(action string, handler Handler) error

Handle registers a handler for a specific action

func (*DefaultRouter) Route

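func (r *DefaultRouter) Route(ctx context.Context, event events.APIGatewayWebsocketProxyRequest) error

Route processes an incoming WebSocket event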

func (*DefaultRouter) SetAsyncThreshold

func (r *DefaultRouter) SetAsyncThreshold(duration time.Duration)

SetAsyncThreshold sets the duration threshold for async processing

func (*DefaultRouter) SetMiddleware

func (r *DefaultRouter) SetMiddleware(middleware ...Middleware)

SetMiddleware adds middleware to the router

type DelayHandler

type DelayHandler struct {
	BaseHandler
	// contains filtered or unexported fields
}

DelayHandler simulates a long-running operation

func NewDelayHandler

func NewDelayHandler(delay time.Duration) *DelayHandler

NewDelayHandler creates a new delay handler

func (*DelayHandler) Process

func (h *DelayHandler) Process(ctx context.Context, req *Request) (*Result, error)

Process simulates processing with a delay

func (*DelayHandler) ProcessWithProgress

func (h *DelayHandler) ProcessWithProgress(ctx context.Context, req *Request, reporter ProgressReporter) (*Result, error)

ProcessWithProgress implements processing with progress updates

type EchoHandler

type EchoHandler struct {
	BaseHandler
}

EchoHandler echoes back the request payload

func NewEchoHandler

func NewEchoHandler() *EchoHandler

NewEchoHandler creates a new echo handler

func (*EchoHandler) Process

func (h *EchoHandler) Process(ctx context.Context, req *Request) (*Result, error)

Process echoes the request payload

type Error

type Error struct {
	Code    string                 `json:"code"`
	Message string                 `json:"message"`
	Details map[string]interface{} `json:"details,omitempty"`
}

Error represents a structured error response

func NewError

func NewError(code, message string) *Error

NewError creates a new Error instance

func (*Error) Error

func (e *Error) Error() string

Error implements the error interface

func (*Error) WithDetail

func (e *Error) WithDetail(key string, value interface{}) *Error

WithDetail adds a detail to the error

type Handler

type Handler interface {
	// Validate checks if the request is valid
	Validate(request *Request) error

	// EstimatedDuration returns the expected processing time
	// Used to determine if the request should be processed sync or async
	EstimatedDuration() time.Duration

	// Process executes the handler logic
	Process(ctx context.Context, request *Request) (*Result, error)
}

Handler defines the interface for request handlers

func NewHandlerFunc

func NewHandlerFunc(
	processFunc func(context.Context, *Request) (*Result, error),
	estimatedDuration time.Duration,
	validator func(*Request) error,
) Handler

NewHandlerFunc creates a new handler from a function

func SimpleHandler

func SimpleHandler(name string, processFunc func(context.Context, *Request) (*Result, error)) Handler

SimpleHandler creates a simple handler with minimal configuration

type HandlerFunc

type HandlerFunc struct {
	BaseHandler
	ProcessFunc func(context.Context, *Request) (*Result, error)
}

HandlerFunc is an adapter to allow the use of ordinary functions as handlers

func (*HandlerFunc) Process

func (h *HandlerFunc) Process(ctx context.Context, req *Request) (*Result, error)

Process executes the handler function

type HandlerWithProgress

type HandlerWithProgress interface {
	Handler

	// ProcessWithProgress executes the handler with progress reporting capability
	ProcessWithProgress(ctx context.Context, request *Request, reporter ProgressReporter) (*Result, error)
}

HandlerWithProgress extends Handler to support progress reporting

type Middleware

type Middleware func(Handler) Handler

Middleware defines a function that wraps handler execution

func LoggingMiddleware

func LoggingMiddleware(logger func(format string, args ...interface{})) Middleware

LoggingMiddleware adds logging to handler execution

type ProgressReporter

type ProgressReporter interface {
	// Report sends a progress update
	Report(percentage float64, message string) error

	// SetMetadata adds metadata to the progress update
	SetMetadata(key string, value interface{}) error
}

ProgressReporter allows handlers to report progress during async execution

type ProgressUpdate

type ProgressUpdate struct {
	RequestID  string                 `json:"request_id"`
	Percentage float64                `json:"percentage"`
	Message    string                 `json:"message"`
	Metadata   map[string]interface{} `json:"metadata,omitempty"`
	Timestamp  time.Time              `json:"timestamp"`
}

ProgressUpdate represents a progress notification for async operations

type Request

type Request struct {
	ID           string            `json:"id"`
	ConnectionID string            `json:"connection_id"`
	Action       string            `json:"action"`
	Payload      json.RawMessage   `json:"payload"`
	Metadata     map[string]string `json:"metadata,omitempty"`
	CreatedAt    time.Time         `json:"created_at"`
}

Request represents an incoming request from a WebSocket connection

func ConvertAsyncRequestToRequest

func ConvertAsyncRequestToRequest(asyncReq *store.AsyncRequest) (*Request, error)

ConvertAsyncRequestToRequest converts a store.AsyncRequest back to a Request

type RequestQueueAdapter

type RequestQueueAdapter struct {
	// contains filtered or unexported fields
}

RequestQueueAdapter adapts Team 1's RequestQueue to our RequestStore interface

func (*RequestQueueAdapter) CompleteRequest

func (a *RequestQueueAdapter) CompleteRequest(ctx context.Context, requestID string, result *Result) error

CompleteRequest marks a request as completed

func (*RequestQueueAdapter) Enqueue

func (a *RequestQueueAdapter) Enqueue(ctx context.Context, request *Request) error

Enqueue converts our Request to store.AsyncRequest and enqueues it

func (*RequestQueueAdapter) FailRequest

func (a *RequestQueueAdapter) FailRequest(ctx context.Context, requestID string, err error) error

FailRequest marks a request as failed

func (*RequestQueueAdapter) GetAsyncRequest

func (a *RequestQueueAdapter) GetAsyncRequest(ctx context.Context, requestID string) (*Request, error)

GetAsyncRequest retrieves an async request and converts it to Request

func (*RequestQueueAdapter) UpdateProgress

func (a *RequestQueueAdapter) UpdateProgress(ctx context.Context, requestID string, progress float64, message string) error

UpdateProgress updates the progress of an async request

type RequestStore

type RequestStore interface {
	Enqueue(ctx context.Context, request *Request) error
}

RequestStore defines the interface for storing async requests

func NewRequestQueueAdapter

func NewRequestQueueAdapter(queue store.RequestQueue) RequestStore

NewRequestQueueAdapter creates a new adapter instance

type Result

type Result struct {
	RequestID string            `json:"request_id"`
	Success   bool              `json:"success"`
	Data      interface{}       `json:"data,omitempty"`
	Error     *Error            `json:"error,omitempty"`
	Metadata  map[string]string `json:"metadata,omitempty"`
}

Result represents the response from processing a request

type Router

type Router interface {
	// Handle registers a handler for a specific action
	Handle(action string, handler Handler) error

	// Route processes an incoming WebSocket event
	Route(ctx context.Context, event events.APIGatewayWebsocketProxyRequest) error

	// SetAsyncThreshold sets the duration threshold for async processing
	SetAsyncThreshold(duration time.Duration)

	// SetMiddleware adds middleware to the router
	SetMiddleware(middleware ...Middleware)
}

Router handles incoming WebSocket messages and routes them to appropriate handlers

type ValidationExampleHandler

type ValidationExampleHandler struct {
	BaseHandler
}

ValidationExampleHandler demonstrates request validation

func NewValidationExampleHandler

func NewValidationExampleHandler() *ValidationExampleHandler

NewValidationExampleHandler creates a handler that validates specific payload fields

func (*ValidationExampleHandler) Process

func (h *ValidationExampleHandler) Process(ctx context.Context, req *Request) (*Result, error)

Process handles the validated request

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL