roratelimit

package module
v0.0.0-...-a6ee939 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2026 License: Apache-2.0 Imports: 2 Imported by: 0

README

Native Rate Limiter Plugin

The native rate limiter plugin provides operators for rate limiting using Go's built-in time-based windowing.

Installation

go get github.com/samber/ro/plugins/ratelimit/native

Operators

NewRateLimiter

Creates a rate limiter operator that limits values based on a count and time interval.

import (
    "time"
    "github.com/samber/ro"
    roratelimit "github.com/samber/ro/plugins/ratelimit/native"
)

// Create a rate limiter: 5 items per second per key
observable := ro.Pipe1(
    ro.Just("user1", "user2", "user1", "user3", "user1", "user2", "user1"),
    roratelimit.NewRateLimiter[string](5, time.Second, func(userID string) string {
        return userID
    }),
)

subscription := observable.Subscribe(ro.PrintObserver[string]())
defer subscription.Unsubscribe()

Parameters

Count

The maximum number of items allowed in the time window:

// Allow 10 items per window
observable := ro.Pipe1(
    ro.Just("item1", "item2", "item3", "item4", "item5"),
    roratelimit.NewRateLimiter[string](10, time.Second, func(item string) string {
        return "default"
    }),
)
Interval

The time window for rate limiting:

// 5 items per second
observable := ro.Pipe1(
    ro.Just("item1", "item2", "item3", "item4", "item5"),
    roratelimit.NewRateLimiter[string](5, time.Second, func(item string) string {
        return "default"
    }),
)

// 100 items per minute
observable := ro.Pipe1(
    ro.Just("item1", "item2", "item3", "item4", "item5"),
    roratelimit.NewRateLimiter[string](100, time.Minute, func(item string) string {
        return "default"
    }),
)

// 1000 items per hour
observable := ro.Pipe1(
    ro.Just("item1", "item2", "item3", "item4", "item5"),
    roratelimit.NewRateLimiter[string](1000, time.Hour, func(item string) string {
        return "default"
    }),
)
Key Function

A function that extracts the key for rate limiting:

type Request struct {
    UserID string
    Action string
    Data   string
}

// Rate limit by user ID
observable := ro.Pipe1(
    ro.Just(
        Request{UserID: "user1", Action: "login", Data: "data1"},
        Request{UserID: "user2", Action: "login", Data: "data2"},
        Request{UserID: "user1", Action: "logout", Data: "data3"},
    ),
    roratelimit.NewRateLimiter[Request](3, time.Minute, func(req Request) string {
        return req.UserID
    }),
)

Key Generation Strategies

User-based Rate Limiting
type Request struct {
    UserID string
    Action string
    Data   string
}

observable := ro.Pipe1(
    ro.Just(
        Request{UserID: "user1", Action: "login", Data: "data1"},
        Request{UserID: "user2", Action: "login", Data: "data2"},
        Request{UserID: "user1", Action: "logout", Data: "data3"},
    ),
    roratelimit.NewRateLimiter[Request](5, time.Minute, func(req Request) string {
        return req.UserID
    }),
)
IP-based Rate Limiting
type APIRequest struct {
    IPAddress string
    Endpoint  string
    Method    string
}

observable := ro.Pipe1(
    ro.Just(
        APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "GET"},
        APIRequest{IPAddress: "192.168.1.2", Endpoint: "/api/users", Method: "GET"},
        APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/posts", Method: "POST"},
    ),
    roratelimit.NewRateLimiter[APIRequest](10, time.Minute, func(req APIRequest) string {
        return req.IPAddress
    }),
)
Endpoint-based Rate Limiting
observable := ro.Pipe1(
    ro.Just(
        APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "GET"},
        APIRequest{IPAddress: "192.168.1.2", Endpoint: "/api/posts", Method: "GET"},
        APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "POST"},
    ),
    roratelimit.NewRateLimiter[APIRequest](2, time.Second, func(req APIRequest) string {
        return req.Endpoint
    }),
)
Composite Key Rate Limiting
observable := ro.Pipe1(
    ro.Just(
        APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "GET"},
        APIRequest{IPAddress: "192.168.1.2", Endpoint: "/api/users", Method: "GET"},
        APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/posts", Method: "GET"},
    ),
    roratelimit.NewRateLimiter[APIRequest](5, time.Minute, func(req APIRequest) string {
        return req.IPAddress + ":" + req.Endpoint
    }),
)

Real-world Example

Here's a practical example that rate limits API requests:

import (
    "time"
    "github.com/samber/ro"
    roratelimit "github.com/samber/ro/plugins/ratelimit/native"
)

type APIRequest struct {
    UserID   string
    Endpoint string
    Method   string
    Data     string
}

// Process API requests with rate limiting
pipeline := ro.Pipe2(
    // Simulate API requests
    ro.Just(
        APIRequest{UserID: "user1", Endpoint: "/api/users", Method: "GET", Data: "data1"},
        APIRequest{UserID: "user2", Endpoint: "/api/posts", Method: "GET", Data: "data2"},
        APIRequest{UserID: "user1", Endpoint: "/api/users", Method: "POST", Data: "data3"},
        APIRequest{UserID: "user3", Endpoint: "/api/comments", Method: "GET", Data: "data4"},
        APIRequest{UserID: "user1", Endpoint: "/api/users", Method: "PUT", Data: "data5"},
    ),
    // Apply rate limiting: 10 requests per minute per user
    roratelimit.NewRateLimiter[APIRequest](10, time.Minute, func(req APIRequest) string {
        return req.UserID
    }),
)

subscription := pipeline.Subscribe(
    ro.NewObserver(
        func(req APIRequest) {
            // Process rate-limited request
            // Only requests within rate limit will be processed
        },
        func(err error) {
            // Handle errors
        },
        func() {
            // Handle completion
        },
    ),
)
defer subscription.Unsubscribe()

Performance Considerations

  • The plugin uses Go's built-in time-based windowing for rate limiting
  • Rate limiting is applied per key (user, IP, endpoint, etc.)
  • Memory usage scales with the number of unique keys
  • The algorithm uses sliding windows for accurate rate limiting
  • Consider the count and interval for your use case
  • The plugin automatically handles rate limit checking and filtering
  • Choose appropriate key generation strategies for your application
  • This implementation is suitable for single-instance applications

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewRateLimiter

func NewRateLimiter[T any](count int64, interval time.Duration, keyGetter func(T) string) func(destination ro.Observable[T]) ro.Observable[T]

NewRateLimiter creates a rate limiter that allows count items per interval for each key. Play: https://go.dev/play/p/YNhnGgrMWmj

Example (Basic)
// Basic rate limiting: 3 items per second
observable := ro.Pipe1(
	ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
	NewRateLimiter[int](3, time.Second, func(v int) string {
		return "default"
	}),
)

values, err := ro.Collect(observable)
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Rate limited values: %v\n", values)
Output:

Rate limited values: [1 2 3]
Example (CompositeKey)
type APIRequest struct {
	IPAddress string
	Endpoint  string
	Method    string
}

// Rate limit by IP + endpoint combination: 3 requests per minute per IP-endpoint pair
observable := ro.Pipe1(
	ro.Just(
		APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "GET"},
		APIRequest{IPAddress: "192.168.1.2", Endpoint: "/api/users", Method: "GET"},
		APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/posts", Method: "GET"},
		APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "POST"},
		APIRequest{IPAddress: "192.168.1.2", Endpoint: "/api/posts", Method: "GET"},
	),
	NewRateLimiter[APIRequest](3, time.Minute, func(req APIRequest) string {
		return req.IPAddress + ":" + req.Endpoint
	}),
)

values, err := ro.Collect(observable)
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Composite key rate limited requests: %d\n", len(values))
Output:

Composite key rate limited requests: 5
Example (EndpointBased)
type APIRequest struct {
	IPAddress string
	Endpoint  string
	Method    string
}

// Rate limit by endpoint: 2 requests per second per endpoint
observable := ro.Pipe1(
	ro.Just(
		APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "GET"},
		APIRequest{IPAddress: "192.168.1.2", Endpoint: "/api/posts", Method: "GET"},
		APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "POST"},
		APIRequest{IPAddress: "192.168.1.3", Endpoint: "/api/comments", Method: "GET"},
		APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/posts", Method: "PUT"},
	),
	NewRateLimiter[APIRequest](2, time.Second, func(req APIRequest) string {
		return req.Endpoint
	}),
)

values, err := ro.Collect(observable)
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Endpoint-based rate limited requests: %d\n", len(values))
Output:

Endpoint-based rate limited requests: 5
Example (IpBased)
type APIRequest struct {
	IPAddress string
	Endpoint  string
	Method    string
}

// Rate limit by IP address: 5 requests per minute per IP
observable := ro.Pipe1(
	ro.Just(
		APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "GET"},
		APIRequest{IPAddress: "192.168.1.2", Endpoint: "/api/posts", Method: "GET"},
		APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "POST"},
		APIRequest{IPAddress: "192.168.1.3", Endpoint: "/api/comments", Method: "GET"},
		APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/posts", Method: "PUT"},
	),
	NewRateLimiter[APIRequest](5, time.Minute, func(req APIRequest) string {
		return req.IPAddress
	}),
)

values, err := ro.Collect(observable)
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("IP-based rate limited requests: %d\n", len(values))
Output:

IP-based rate limited requests: 5
Example (RealWorld)
type LogEntry struct {
	UserID    string
	Action    string
	Timestamp time.Time
	Message   string
}

// Simulate log processing with rate limiting
// Limit: 100 logs per minute per user
logs := []LogEntry{
	{UserID: "user1", Action: "login", Timestamp: time.Now(), Message: "User logged in"},
	{UserID: "user2", Action: "login", Timestamp: time.Now(), Message: "User logged in"},
	{UserID: "user1", Action: "logout", Timestamp: time.Now(), Message: "User logged out"},
	{UserID: "user3", Action: "login", Timestamp: time.Now(), Message: "User logged in"},
	{UserID: "user1", Action: "profile", Timestamp: time.Now(), Message: "Profile updated"},
}

observable := ro.Pipe1(
	ro.Just(logs...),
	NewRateLimiter[LogEntry](100, time.Minute, func(log LogEntry) string {
		return log.UserID
	}),
)

values, err := ro.Collect(observable)
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Processed log entries: %d\n", len(values))
Output:

Processed log entries: 5
Example (UserBased)
type UserRequest struct {
	UserID string
	Action string
	Data   string
}

// Rate limit by user ID: 2 requests per minute per user
observable := ro.Pipe1(
	ro.Just(
		UserRequest{UserID: "user1", Action: "login", Data: "data1"},
		UserRequest{UserID: "user2", Action: "login", Data: "data2"},
		UserRequest{UserID: "user1", Action: "logout", Data: "data3"},
		UserRequest{UserID: "user1", Action: "profile", Data: "data4"},
		UserRequest{UserID: "user3", Action: "login", Data: "data5"},
	),
	NewRateLimiter[UserRequest](2, time.Minute, func(req UserRequest) string {
		return req.UserID
	}),
)

values, err := ro.Collect(observable)
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("User-based rate limited requests: %d\n", len(values))
Output:

User-based rate limited requests: 4

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL