Documentation
¶
Overview ¶
Package streamer provides the core interfaces and types for async request processing
Index ¶
- Constants
- type BaseHandler
- type ConnectionManager
- type DefaultRouter
- func (r *DefaultRouter) Handle(action string, handler Handler) error
- func (r *DefaultRouter) Route(ctx context.Context, event events.APIGatewayWebsocketProxyRequest) error
- func (r *DefaultRouter) SetAsyncThreshold(duration time.Duration)
- func (r *DefaultRouter) SetMiddleware(middleware ...Middleware)
- type DelayHandler
- type EchoHandler
- type Error
- type Handler
- type HandlerFunc
- type HandlerWithProgress
- type Middleware
- type ProgressReporter
- type ProgressUpdate
- type Request
- type RequestQueueAdapter
- func (a *RequestQueueAdapter) CompleteRequest(ctx context.Context, requestID string, result *Result) error
- func (a *RequestQueueAdapter) Enqueue(ctx context.Context, request *Request) error
- func (a *RequestQueueAdapter) FailRequest(ctx context.Context, requestID string, err error) error
- func (a *RequestQueueAdapter) GetAsyncRequest(ctx context.Context, requestID string) (*Request, error)
- func (a *RequestQueueAdapter) UpdateProgress(ctx context.Context, requestID string, progress float64, message string) error
- type RequestStore
- type Result
- type Router
- type ValidationExampleHandler
Constants ¶
const ( ErrCodeValidation = "VALIDATION_ERROR" ErrCodeNotFound = "NOT_FOUND" ErrCodeInternalError = "INTERNAL_ERROR" ErrCodeTimeout = "TIMEOUT" ErrCodeRateLimited = "RATE_LIMITED" ErrCodeInvalidAction = "INVALID_ACTION" )
Common error codes
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseHandler ¶
type BaseHandler struct {
// contains filtered or unexported fields
}
BaseHandler provides common functionality for handlers
func (*BaseHandler) EstimatedDuration ¶
func (h *BaseHandler) EstimatedDuration() time.Duration
EstimatedDuration returns the expected processing time
func (*BaseHandler) Validate ¶
func (h *BaseHandler) Validate(req *Request) error
Validate validates the request
type ConnectionManager ¶
type ConnectionManager interface {
Send(ctx context.Context, connectionID string, message interface{}) error
}
ConnectionManager defines the interface for managing WebSocket connections
type DefaultRouter ¶
type DefaultRouter struct {
// contains filtered or unexported fields
}
DefaultRouter implements the Router interface
func NewRouter ¶
func NewRouter(store RequestStore, connManager ConnectionManager) *DefaultRouter
NewRouter creates a new router instance
func (*DefaultRouter) Handle ¶
func (r *DefaultRouter) Handle(action string, handler Handler) error
Handle registers a handler for a specific action
func (*DefaultRouter) Route ¶
func (r *DefaultRouter) Route(ctx context.Context, event events.APIGatewayWebsocketProxyRequest) error
Route processes an incoming WebSocket event
func (*DefaultRouter) SetAsyncThreshold ¶
func (r *DefaultRouter) SetAsyncThreshold(duration time.Duration)
SetAsyncThreshold sets the duration threshold for async processing
func (*DefaultRouter) SetMiddleware ¶
func (r *DefaultRouter) SetMiddleware(middleware ...Middleware)
SetMiddleware adds middleware to the router
type DelayHandler ¶
type DelayHandler struct {
BaseHandler
// contains filtered or unexported fields
}
DelayHandler simulates a long-running operation
func NewDelayHandler ¶
func NewDelayHandler(delay time.Duration) *DelayHandler
NewDelayHandler creates a new delay handler
func (*DelayHandler) ProcessWithProgress ¶
func (h *DelayHandler) ProcessWithProgress(ctx context.Context, req *Request, reporter ProgressReporter) (*Result, error)
ProcessWithProgress implements processing with progress updates
type EchoHandler ¶
type EchoHandler struct {
BaseHandler
}
EchoHandler echoes back the request payload
type Error ¶
type Error struct {
Code string `json:"code"`
Message string `json:"message"`
Details map[string]interface{} `json:"details,omitempty"`
}
Error represents a structured error response
func (*Error) WithDetail ¶
WithDetail adds a detail to the error
type Handler ¶
type Handler interface {
// Validate checks if the request is valid
Validate(request *Request) error
// EstimatedDuration returns the expected processing time
// Used to determine if the request should be processed sync or async
EstimatedDuration() time.Duration
// Process executes the handler logic
Process(ctx context.Context, request *Request) (*Result, error)
}
Handler defines the interface for request handlers
type HandlerFunc ¶
type HandlerFunc struct {
BaseHandler
ProcessFunc func(context.Context, *Request) (*Result, error)
}
HandlerFunc is an adapter to allow the use of ordinary functions as handlers
type HandlerWithProgress ¶
type HandlerWithProgress interface {
Handler
// ProcessWithProgress executes the handler with progress reporting capability
ProcessWithProgress(ctx context.Context, request *Request, reporter ProgressReporter) (*Result, error)
}
HandlerWithProgress extends Handler to support progress reporting
type Middleware ¶
Middleware defines a function that wraps handler execution
func LoggingMiddleware ¶
func LoggingMiddleware(logger func(format string, args ...interface{})) Middleware
LoggingMiddleware adds logging to handler execution
type ProgressReporter ¶
type ProgressReporter interface {
// Report sends a progress update
Report(percentage float64, message string) error
// SetMetadata adds metadata to the progress update
SetMetadata(key string, value interface{}) error
}
ProgressReporter allows handlers to report progress during async execution
type ProgressUpdate ¶
type ProgressUpdate struct {
RequestID string `json:"request_id"`
Percentage float64 `json:"percentage"`
Message string `json:"message"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
ProgressUpdate represents a progress notification for async operations
type Request ¶
type Request struct {
ID string `json:"id"`
ConnectionID string `json:"connection_id"`
Action string `json:"action"`
Payload json.RawMessage `json:"payload"`
Metadata map[string]string `json:"metadata,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
Request represents an incoming request from a WebSocket connection
func ConvertAsyncRequestToRequest ¶
func ConvertAsyncRequestToRequest(asyncReq *store.AsyncRequest) (*Request, error)
ConvertAsyncRequestToRequest converts a store.AsyncRequest back to a Request
type RequestQueueAdapter ¶
type RequestQueueAdapter struct {
// contains filtered or unexported fields
}
RequestQueueAdapter adapts Team 1's RequestQueue to our RequestStore interface
func (*RequestQueueAdapter) CompleteRequest ¶
func (a *RequestQueueAdapter) CompleteRequest(ctx context.Context, requestID string, result *Result) error
CompleteRequest marks a request as completed
func (*RequestQueueAdapter) Enqueue ¶
func (a *RequestQueueAdapter) Enqueue(ctx context.Context, request *Request) error
Enqueue converts our Request to store.AsyncRequest and enqueues it
func (*RequestQueueAdapter) FailRequest ¶
FailRequest marks a request as failed
func (*RequestQueueAdapter) GetAsyncRequest ¶
func (a *RequestQueueAdapter) GetAsyncRequest(ctx context.Context, requestID string) (*Request, error)
GetAsyncRequest retrieves an async request and converts it to Request
func (*RequestQueueAdapter) UpdateProgress ¶
func (a *RequestQueueAdapter) UpdateProgress(ctx context.Context, requestID string, progress float64, message string) error
UpdateProgress updates the progress of an async request
type RequestStore ¶
RequestStore defines the interface for storing async requests
func NewRequestQueueAdapter ¶
func NewRequestQueueAdapter(queue store.RequestQueue) RequestStore
NewRequestQueueAdapter creates a new adapter instance
type Result ¶
type Result struct {
RequestID string `json:"request_id"`
Success bool `json:"success"`
Data interface{} `json:"data,omitempty"`
Error *Error `json:"error,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
Result represents the response from processing a request
type Router ¶
type Router interface {
// Handle registers a handler for a specific action
Handle(action string, handler Handler) error
// Route processes an incoming WebSocket event
Route(ctx context.Context, event events.APIGatewayWebsocketProxyRequest) error
// SetAsyncThreshold sets the duration threshold for async processing
SetAsyncThreshold(duration time.Duration)
// SetMiddleware adds middleware to the router
SetMiddleware(middleware ...Middleware)
}
Router handles incoming WebSocket messages and routes them to appropriate handlers
type ValidationExampleHandler ¶
type ValidationExampleHandler struct {
BaseHandler
}
ValidationExampleHandler demonstrates request validation
func NewValidationExampleHandler ¶
func NewValidationExampleHandler() *ValidationExampleHandler
NewValidationExampleHandler creates a handler that validates specific payload fields