r

package module
v0.0.0-...-ec76828 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 1, 2025 License: MIT Imports: 23 Imported by: 0

README

r

Documentation

Overview

Package r provides a high-performance web framework built on fasthttp

middleware.go

Package r provides a high-performance web framework built on fasthttp

File: route_metrics.go

wrap_handlers.go

File: ws_config.go

File: connection_cleanup.go

File: ws_read_pump.go

File: ws_write_message.go

File: ws_write_pump.go

Index

Constants

View Source
const (
	StateClosed   = iota // 0: Normal operation
	StateHalfOpen        // 1: Testing the waters
	StateOpen            // 2: Circuit is broken
)

Circuit breaker states.

Variables

View Source
var (
	ErrServerClosed = errors.New("server closed")
	ErrTimeout      = errors.New("timeout")
	ErrInvalidJSON  = errors.New("invalid JSON")
)
View Source
var (
	ErrBufferFull       = fmt.Errorf("message buffer is full")
	ErrConnectionClosed = fmt.Errorf("connection is closed")
	ErrRateLimited      = fmt.Errorf("rate limit exceeded")
)

Functions

func GetConnectionStats

func GetConnectionStats() map[string]interface{}

func NewWSConn

func NewWSConn(conn *websocket.Conn, logger Logger, handler WSHandler) *wsConnection

NewWSConn creates a wsConnection from conn, logger, and handler, and properly initializes the connection.

func UpdateRouteMetrics

func UpdateRouteMetrics(router *RouterImpl, method, path string, duration time.Duration, err error)

UpdateRouteMetrics updates metrics for the route identified by method and path. It increments call counts, error counts (if an error is passed), and updates the average latency using an exponential moving average. The function is modularized into helper functions for clarity and testability.

Types

type CORSConfig

type CORSConfig struct {
	Origins          []string
	AllowMethods     []string
	AllowHeaders     []string
	AllowCredentials bool
	MaxAge           int
}

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker represents a stateful circuit breaker.

func NewCircuitBreaker

func NewCircuitBreaker(threshold int64, resetTimeout time.Duration, metrics MetricsCollector) *CircuitBreaker

NewCircuitBreaker constructs a new CircuitBreaker, starts its monitoring loop, and returns it.

func (*CircuitBreaker) Close

func (cb *CircuitBreaker) Close() error

Close stops the monitor loop and cleans up resources.

func (*CircuitBreaker) GetState

func (cb *CircuitBreaker) GetState() int32

GetState returns the current state.

func (*CircuitBreaker) GetStatus

func (cb *CircuitBreaker) GetStatus() CircuitBreakerStatus

GetStatus returns a snapshot of the circuit breaker's current status.

func (*CircuitBreaker) IsOpen

func (cb *CircuitBreaker) IsOpen() bool

IsOpen returns true if the circuit breaker is open. When in the open state, it may try to transition to half-open if the reset timeout has elapsed.

func (*CircuitBreaker) MonitorState

func (cb *CircuitBreaker) MonitorState()

MonitorState records current error rate and failure counts.

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure increments the failure counter and, if the threshold is reached, transitions the breaker to the open state.

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess handles a successful request by updating counters and, if in half-open state, possibly transitioning to the closed state.

type CircuitBreakerConfig

type CircuitBreakerConfig struct {
	Threshold        int64
	ResetTimeout     time.Duration
	Timeout          time.Duration
	HalfOpenRequests int64
}

CircuitBreakerConfig holds configuration options for the circuit breaker.

type CircuitBreakerOption

type CircuitBreakerOption func(*CircuitBreakerConfig)

CircuitBreakerOption defines a function type to set circuit breaker configuration.

func WithFailureThreshold

func WithFailureThreshold(threshold int64) CircuitBreakerOption

WithFailureThreshold sets a custom failure threshold.

func WithResetTimeout

func WithResetTimeout(timeout time.Duration) CircuitBreakerOption

WithResetTimeout sets a custom reset timeout.

type CircuitBreakerStatus

type CircuitBreakerStatus struct {
	State           string    `json:"state"`
	Failures        int64     `json:"failures"`
	LastFailure     time.Time `json:"last_failure"`
	SuccessStreak   int64     `json:"success_streak"`
	TotalRequests   int64     `json:"total_requests"`
	ErrorPercentage float64   `json:"error_percentage"`
}

CircuitBreakerStatus contains a snapshot of the circuit breaker's metrics.

type Config

type Config struct {
	ReadTimeout        time.Duration
	WriteTimeout       time.Duration
	IdleTimeout        time.Duration
	MaxRequestBodySize int
	CertFile           string
	KeyFile            string
	TLSConfig          *tls.Config
	WSConfig           WSConfig
	Handler            Router // Add this field
}

Config holds server configuration

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with sensible defaults

type ConfigProvider

type ConfigProvider interface {
	GetConfig() Config
	SetConfig(Config) error
	LoadFromFile(path string) error
	LoadFromEnv() error
}

type ConnectionManager

type ConnectionManager struct {
	// contains filtered or unexported fields
}

func NewConnectionManager

func NewConnectionManager(maxConns int32, metrics MetricsCollector, logger Logger) *ConnectionManager

func (*ConnectionManager) Add

func (cm *ConnectionManager) Add(conn *wsConnection) error

func (*ConnectionManager) GetActiveConnections

func (cm *ConnectionManager) GetActiveConnections() int32

func (*ConnectionManager) Remove

func (cm *ConnectionManager) Remove(conn *wsConnection)

type Context

type Context interface {
	context.Context
	RequestCtx() *fasthttp.RequestCtx
	Param(name string) string
	QueryParam(name string) string
	JSON(code int, v interface{}) error
	String(code int, s string) error
	Stream(code int, contentType string, reader io.Reader) error
	Redirect(code int, url string) error
	SetHeader(key, value string)
	GetHeader(key string) string
	Cookie(name string) string
	SetCookie(cookie *fasthttp.Cookie)
	RequestID() string
	RealIP() string
	Path() string
	Method() string
	IsWebSocket() bool
	Next()
	Abort()
	AbortWithError(code int, err error)
	Error() error
	Set(key string, value interface{})
	Get(key string) (interface{}, bool)
	SetStatus(code int)
	ErrorResponse(code int, msg string)
}

Context represents the enhanced request context

func TestContextWithHandlers

func TestContextWithHandlers(c *routing.Context, handlers []HandlerFunc) Context

TestContextWithHandlers creates a test context with the specified handler chain

func TestContextWithTimeout

func TestContextWithTimeout(parent Context, timeout time.Duration) (Context, context.CancelFunc)

TestContextWithTimeout returns a context with timeout for testing

type ContextFactory

type ContextFactory interface {
	CreateContext(*fasthttp.RequestCtx) Context
	WithStore(store Store) ContextFactory
}

type ContextImpl

type ContextImpl struct {
	*routing.Context
	// contains filtered or unexported fields
}

ContextImpl implements the enhanced Context interface

func NewTestContext

func NewTestContext(c *routing.Context) *ContextImpl

NewTestContext creates a new context implementation for testing

func (*ContextImpl) Abort

func (c *ContextImpl) Abort()

func (*ContextImpl) AbortWithError

func (c *ContextImpl) AbortWithError(code int, err error)

func (*ContextImpl) AddSpan

func (c *ContextImpl) AddSpan(name string, metadata map[string]string)

func (*ContextImpl) Cleanup

func (c *ContextImpl) Cleanup()

func (*ContextImpl) Cookie

func (c *ContextImpl) Cookie(name string) string

func (*ContextImpl) Deadline

func (c *ContextImpl) Deadline() (deadline time.Time, ok bool)

Deadline implements the Deadline method of context.Context, which the Context interface embeds.

func (*ContextImpl) Done

func (c *ContextImpl) Done() <-chan struct{}

func (*ContextImpl) EndSpan

func (c *ContextImpl) EndSpan(name string)

func (*ContextImpl) Err

func (c *ContextImpl) Err() error

func (*ContextImpl) Error

func (c *ContextImpl) Error() error

func (*ContextImpl) ErrorResponse

func (c *ContextImpl) ErrorResponse(code int, msg string)

func (*ContextImpl) Get

func (c *ContextImpl) Get(key string) (interface{}, bool)

Get returns the value stored under key and reports whether it was found. The lookup uses sync.Map methods.

func (*ContextImpl) GetHeader

func (c *ContextImpl) GetHeader(key string) string

func (*ContextImpl) GetRequestID

func (c *ContextImpl) GetRequestID() string

func (*ContextImpl) GetSpans

func (c *ContextImpl) GetSpans() []*tracingSpan

func (*ContextImpl) GetTraceID

func (c *ContextImpl) GetTraceID() string

func (*ContextImpl) IsWebSocket

func (c *ContextImpl) IsWebSocket() bool

func (*ContextImpl) JSON

func (c *ContextImpl) JSON(code int, v interface{}) error

func (*ContextImpl) Method

func (c *ContextImpl) Method() string

func (*ContextImpl) Next

func (c *ContextImpl) Next()

func (*ContextImpl) Param

func (c *ContextImpl) Param(name string) string

func (*ContextImpl) Path

func (c *ContextImpl) Path() string

func (*ContextImpl) QueryParam

func (c *ContextImpl) QueryParam(name string) string

func (*ContextImpl) RealIP

func (c *ContextImpl) RealIP() string

func (*ContextImpl) Redirect

func (c *ContextImpl) Redirect(code int, url string) error

func (*ContextImpl) Request

func (c *ContextImpl) Request() *http.Request

func (*ContextImpl) RequestCtx

func (c *ContextImpl) RequestCtx() *fasthttp.RequestCtx

func (*ContextImpl) RequestID

func (c *ContextImpl) RequestID() string

func (*ContextImpl) Reset

func (c *ContextImpl) Reset()

func (*ContextImpl) Set

func (c *ContextImpl) Set(key string, value interface{})

func (*ContextImpl) SetCookie

func (c *ContextImpl) SetCookie(cookie *fasthttp.Cookie)

func (*ContextImpl) SetHeader

func (c *ContextImpl) SetHeader(key, value string)

func (*ContextImpl) SetStatus

func (c *ContextImpl) SetStatus(code int)

SetStatus sets the response status code. It is a helper method that is part of the Context interface.

func (*ContextImpl) Stream

func (c *ContextImpl) Stream(code int, contentType string, reader io.Reader) error

func (*ContextImpl) String

func (c *ContextImpl) String(code int, s string) error

func (*ContextImpl) Value

func (c *ContextImpl) Value(key interface{}) interface{}

func (*ContextImpl) WithTimeout

func (c *ContextImpl) WithTimeout(timeout time.Duration) (Context, context.CancelFunc)

type ErrorContext

type ErrorContext struct {
	Error     error
	Stack     string
	RequestID string
	Timestamp time.Time
	Path      string
	Method    string
	ClientIP  string
	UserAgent string
}

type ErrorHandler

type ErrorHandler interface {
	HandleError(Context, error)
	HandlePanic(Context, interface{})
	WithLogger(Logger) ErrorHandler
}

ErrorHandler defines the interface for handling errors and panics for a given Context.

type HandlerFunc

type HandlerFunc = MiddlewareFunc

HandlerFunc defines a function to serve HTTP requests

type HealthCheck

type HealthCheck struct {
	Name     string
	Check    func() error
	Interval time.Duration
}

type HealthChecker

type HealthChecker interface {
	Check() error
	RegisterCheck(name string, check func() error)
	Start(context.Context) error
	Stop() error
}

HealthChecker defines the interface for registering and running health checks.

type LogConfig

type LogConfig struct {
	Output      io.Writer
	FilePath    string
	JsonFormat  bool
	AsyncWrite  bool
	BufferSize  int
	MaxFileSize int
	MaxBackups  int
	AddSource   bool
	Metrics     bool
	Level       LogLevel
}

LogConfig defines configuration options for logging

type LogEntry

type LogEntry struct {
	Level      LogLevel       `json:"level"`
	Message    string         `json:"message"`
	Time       time.Time      `json:"time"`
	Fields     map[string]any `json:"fields,omitempty"`
	Method     string         `json:"method,omitempty"`
	Status     int            `json:"status,omitempty"`
	Latency    time.Duration  `json:"latency,omitempty"`
	IP         string         `json:"ip,omitempty"`
	Path       string         `json:"path,omitempty"`
	StackTrace string         `json:"stack_trace,omitempty"`
}

type LogLevel

type LogLevel int

LogLevel represents the severity of a log entry

const (
	DebugLevel LogLevel = iota
	InfoLevel
	WarnLevel
	ErrorLevel
	FatalLevel
)

type Logger

type Logger interface {
	WithField(key string, value interface{}) Logger
	WithFields(fields map[string]interface{}) Logger
	WithError(err error) Logger
	With(key string, value interface{}) Logger
	Configure(config LogConfig) error
	Log(method string, status int, latency time.Duration, ip, path string)
	Info(msg string, args ...interface{})
	Error(msg string, args ...interface{})
	Debug(msg string, args ...interface{})
	Warn(msg string, args ...interface{})
}

Logger interface defines common logging methods

func NewDefaultLogger

func NewDefaultLogger() Logger

func NewStructuredLogger

func NewStructuredLogger(output io.Writer, level LogLevel) Logger

type MessageBuffer

type MessageBuffer struct {
	// contains filtered or unexported fields
}

MessageBuffer implements a fixed-size circular buffer for WebSocket messages

func (*MessageBuffer) Len

func (b *MessageBuffer) Len() int

func (*MessageBuffer) NotEmpty

func (b *MessageBuffer) NotEmpty() <-chan struct{}

func (*MessageBuffer) NotFull

func (b *MessageBuffer) NotFull() <-chan struct{}

func (*MessageBuffer) Read

func (b *MessageBuffer) Read() ([]byte, error)

func (*MessageBuffer) Write

func (b *MessageBuffer) Write(data []byte) error

type MetricsCollector

type MetricsCollector interface {
	IncrementCounter(name string, tags map[string]string)
	RecordTiming(name string, duration time.Duration, tags map[string]string)
	RecordValue(name string, value float64, tags map[string]string)
	GetMetrics() map[string]interface{}
	Close() error
}

MetricsCollector defines the interface for recording counters, timings, and values.

func NewDefaultMetricsCollector

func NewDefaultMetricsCollector() MetricsCollector

type MiddlewareFunc

type MiddlewareFunc func(Context)

MiddlewareFunc defines HTTP middleware

type MiddlewareOption

type MiddlewareOption func(*standardMiddleware)

func WithRateLimiter

func WithRateLimiter(rl RateLimiter) MiddlewareOption

WithRateLimiter sets a custom rate limiter implementation.

type MiddlewareProvider

type MiddlewareProvider interface {
	Logger(format string) MiddlewareFunc
	RateLimit(reqs int, per time.Duration) MiddlewareFunc
	Timeout(duration time.Duration) MiddlewareFunc
	Security() MiddlewareFunc
	Recovery(handler func(Context, interface{})) MiddlewareFunc
	Compression() MiddlewareFunc
	RequestID() MiddlewareFunc
	CORS(origins []string) MiddlewareFunc
	CircuitBreaker(opts ...CircuitBreakerOption) MiddlewareFunc
}

MiddlewareProvider defines the interface for middleware functionality

func NewMiddlewareProvider

func NewMiddlewareProvider(router *RouterImpl, opts ...MiddlewareOption) MiddlewareProvider

NewMiddlewareProvider creates a new middleware provider with optional dependencies

type PanicHandlerFunc

type PanicHandlerFunc func(Context, interface{})

type RateLimiter

type RateLimiter interface {
	Allow(key string) bool
	Reset(key string)
	SetDistributedClient(client RedisClient)
	GetQuota(key string) (remaining int, reset time.Time)
	GetDebugStats(key string) map[string]interface{} // Add this method

}

RateLimiter defines the interface for rate limiting strategies

func NewDefaultRateLimiter

func NewDefaultRateLimiter(ctx context.Context, reqs int, per time.Duration, opts ...RateLimiterOption) RateLimiter

NewDefaultRateLimiter creates a new rate limiter and starts its cleanup loop.

type RateLimiterOption

type RateLimiterOption func(*defaultRateLimiter)

func WithMetrics

func WithMetrics(metrics MetricsCollector) RateLimiterOption

type RedisClient

type RedisClient interface {
	Get(key string) (string, error)
	Set(key string, value string, expiration time.Duration) error
	IncrBy(key string, value int64) (int64, error)
	// Add new methods for rate limiting
	ZRemRangeByScore(key string, min, max string) error
	ZAdd(key string, score float64, member string) error
	ZCount(key string, min, max string) (int64, error)
}

type Route

type Route struct {
	Path        string
	Method      string
	Handler     string
	Description string
	Params      []RouteParam
	Added       time.Time
	Tags        []string
	// contains filtered or unexported fields
}

type RouteParam

type RouteParam struct {
	Name        string
	Type        string
	Required    bool
	Description string
	Validation  string // regex or validation rules
}

type RouteValidator

type RouteValidator interface {
	ValidateRoute(path, method string) error
	ValidateParams(params []RouteParam) error
}

type Router

type Router interface {
	GET(path string, handlers ...HandlerFunc) Router
	POST(path string, handlers ...HandlerFunc) Router
	PUT(path string, handlers ...HandlerFunc) Router
	DELETE(path string, handlers ...HandlerFunc) Router
	PATCH(path string, handlers ...HandlerFunc) Router
	HEAD(path string, handlers ...HandlerFunc) Router
	OPTIONS(path string, handlers ...HandlerFunc) Router
	WS(path string, handler WSHandler) Router
	Group(prefix string) Router
	Use(middleware ...MiddlewareFunc) Router
	Static(prefix, root string) Router
	FileServer(path, root string) Router
	NotFound(handler HandlerFunc)
	MethodNotAllowed(handler HandlerFunc)
	PanicHandler(handler PanicHandlerFunc)
	ServeHTTP(ctx *fasthttp.RequestCtx)
}

Router defines the interface for HTTP routing

func NewRouter

func NewRouter() Router

NewRouter creates a new Router instance

type RouterImpl

type RouterImpl struct {
	// contains filtered or unexported fields
}

RouterImpl implements the Router interface using fasthttp-routing

func (*RouterImpl) DELETE

func (r *RouterImpl) DELETE(path string, handlers ...HandlerFunc) Router

func (*RouterImpl) FileServer

func (r *RouterImpl) FileServer(path, root string) Router

func (*RouterImpl) GET

func (r *RouterImpl) GET(path string, handlers ...HandlerFunc) Router

GET implements Router.GET.

func (*RouterImpl) GetRoutes

func (r *RouterImpl) GetRoutes() []*Route

GetRoutes returns the router's routes. It is a helper for route introspection.

func (*RouterImpl) Group

func (r *RouterImpl) Group(prefix string) Router

Group implements Router.Group

func (*RouterImpl) HEAD

func (r *RouterImpl) HEAD(path string, handlers ...HandlerFunc) Router

func (*RouterImpl) MethodNotAllowed

func (r *RouterImpl) MethodNotAllowed(handler HandlerFunc)

func (*RouterImpl) NotFound

func (r *RouterImpl) NotFound(handler HandlerFunc)

func (*RouterImpl) OPTIONS

func (r *RouterImpl) OPTIONS(path string, handlers ...HandlerFunc) Router

func (*RouterImpl) PATCH

func (r *RouterImpl) PATCH(path string, handlers ...HandlerFunc) Router

func (*RouterImpl) POST

func (r *RouterImpl) POST(path string, handlers ...HandlerFunc) Router

func (*RouterImpl) PUT

func (r *RouterImpl) PUT(path string, handlers ...HandlerFunc) Router

func (*RouterImpl) PanicHandler

func (r *RouterImpl) PanicHandler(handler PanicHandlerFunc)

func (*RouterImpl) ServeHTTP

func (r *RouterImpl) ServeHTTP(ctx *fasthttp.RequestCtx)

func (*RouterImpl) Static

func (r *RouterImpl) Static(prefix, root string) Router

func (*RouterImpl) Use

func (r *RouterImpl) Use(middleware ...MiddlewareFunc) Router

Use implements Router.Use

func (*RouterImpl) WS

func (r *RouterImpl) WS(path string, handler WSHandler) Router

WS implements Router.WS. The implementation is intended to handle large messages.

type RouterProvider

type RouterProvider interface {
	Router
	WithMiddlewareProvider(provider MiddlewareProvider) RouterProvider
	WithErrorHandler(handler ErrorHandler) RouterProvider
	WithWSUpgrader(upgrader WSUpgrader) RouterProvider
}

type SecurityProvider

type SecurityProvider interface {
	SetHeaders(c Context)
}

SecurityProvider defines the interface for security header management

type Server

type Server interface {
	Start(address string) error
	Stop() error
	WithConfig(config Config) Server
	WithRouter(router Router) Server
	WithMiddleware(middleware ...MiddlewareFunc) Server
	ServeHTTP(ctx *fasthttp.RequestCtx)
}

Server defines the interface for the HTTP server

type ServerImpl

type ServerImpl struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(config Config) *ServerImpl

func (*ServerImpl) BuildHandler

func (s *ServerImpl) BuildHandler() fasthttp.RequestHandler

BuildHandler creates the final fasthttp.RequestHandler that wraps the entire request lifecycle. It prepares the request context, delegates the request to the router, and then preserves the status code.

func (*ServerImpl) ServeHTTP

func (s *ServerImpl) ServeHTTP(ctx *fasthttp.RequestCtx)

func (*ServerImpl) Start

func (s *ServerImpl) Start(address string) error

func (*ServerImpl) Stop

func (s *ServerImpl) Stop() error

Stop stops the server. Its implementation is split into smaller helper functions.

func (*ServerImpl) WithLogger

func (s *ServerImpl) WithLogger(logger Logger) *ServerImpl

func (*ServerImpl) WithMetricsCollector

func (s *ServerImpl) WithMetricsCollector(collector MetricsCollector) *ServerImpl

type Store

type Store interface {
	Get(key string) (interface{}, bool)
	Set(key string, value interface{})
	Delete(key string)
	Clear()
}

type WSConfig

type WSConfig struct {
	HandshakeTimeout  time.Duration
	ReadBufferSize    int
	WriteBufferSize   int
	EnableCompression bool
	Origins           []string
	Path              string
}

WSConfig holds WebSocket-specific configuration

type WSConnection

type WSConnection interface {
	WriteMessage(messageType int, data []byte) error
	Close() error
	ReadMessage() (messageType int, p []byte, err error)
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
	ID() string
	RemoteAddr() string
	CloseCode() (int, string)
}

WSConnection defines the interface for WebSocket connections

type WSErrorHandler

type WSErrorHandler interface {
	OnError(conn WSConnection, err error)
}

type WSHandler

type WSHandler interface {
	OnConnect(conn WSConnection)
	OnMessage(conn WSConnection, msg []byte)
	OnClose(conn WSConnection)
}

WSHandler defines the interface for WebSocket event handling

type WSUpgrader

type WSUpgrader interface {
	Upgrade(*fasthttp.RequestCtx, WSHandler) (WSConnection, error)
	WithConfig(WSConfig) WSUpgrader
}

Directories

Path Synopsis
middleware command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL