Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewRateLimiter ¶
func NewRateLimiter[T any](count int64, interval time.Duration, keyGetter func(T) string) func(destination ro.Observable[T]) ro.Observable[T]
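NewRateLimiter creates a rate-limiting operator that lets at most count items through per interval for each key returned by keyGetter. Items that exceed the limit for their key are dropped rather than delayed, as the Basic example below shows (ten inputs, only the first three are emitted). Play: https://go.dev/play/p/YNhnGgrMWmj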
Example (Basic) ¶
// Basic rate limiting: 3 items per second
observable := ro.Pipe1(
ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
NewRateLimiter[int](3, time.Second, func(v int) string {
return "default"
}),
)
values, err := ro.Collect(observable)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
fmt.Printf("Rate limited values: %v\n", values)
Output: Rate limited values: [1 2 3]
Example (CompositeKey) ¶
type APIRequest struct {
IPAddress string
Endpoint string
Method string
}
// Rate limit by IP + endpoint combination: 3 requests per minute per IP-endpoint pair
observable := ro.Pipe1(
ro.Just(
APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "GET"},
APIRequest{IPAddress: "192.168.1.2", Endpoint: "/api/users", Method: "GET"},
APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/posts", Method: "GET"},
APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "POST"},
APIRequest{IPAddress: "192.168.1.2", Endpoint: "/api/posts", Method: "GET"},
),
NewRateLimiter[APIRequest](3, time.Minute, func(req APIRequest) string {
return req.IPAddress + ":" + req.Endpoint
}),
)
values, err := ro.Collect(observable)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
fmt.Printf("Composite key rate limited requests: %d\n", len(values))
Output: Composite key rate limited requests: 5
Example (EndpointBased) ¶
type APIRequest struct {
IPAddress string
Endpoint string
Method string
}
// Rate limit by endpoint: 2 requests per second per endpoint
observable := ro.Pipe1(
ro.Just(
APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "GET"},
APIRequest{IPAddress: "192.168.1.2", Endpoint: "/api/posts", Method: "GET"},
APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "POST"},
APIRequest{IPAddress: "192.168.1.3", Endpoint: "/api/comments", Method: "GET"},
APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/posts", Method: "PUT"},
),
NewRateLimiter[APIRequest](2, time.Second, func(req APIRequest) string {
return req.Endpoint
}),
)
values, err := ro.Collect(observable)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
fmt.Printf("Endpoint-based rate limited requests: %d\n", len(values))
Output: Endpoint-based rate limited requests: 5
Example (IpBased) ¶
type APIRequest struct {
IPAddress string
Endpoint string
Method string
}
// Rate limit by IP address: 5 requests per minute per IP
observable := ro.Pipe1(
ro.Just(
APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "GET"},
APIRequest{IPAddress: "192.168.1.2", Endpoint: "/api/posts", Method: "GET"},
APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/users", Method: "POST"},
APIRequest{IPAddress: "192.168.1.3", Endpoint: "/api/comments", Method: "GET"},
APIRequest{IPAddress: "192.168.1.1", Endpoint: "/api/posts", Method: "PUT"},
),
NewRateLimiter[APIRequest](5, time.Minute, func(req APIRequest) string {
return req.IPAddress
}),
)
values, err := ro.Collect(observable)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
fmt.Printf("IP-based rate limited requests: %d\n", len(values))
Output: IP-based rate limited requests: 5
Example (RealWorld) ¶
type LogEntry struct {
UserID string
Action string
Timestamp time.Time
Message string
}
// Simulate log processing with rate limiting
// Limit: 100 logs per minute per user
logs := []LogEntry{
{UserID: "user1", Action: "login", Timestamp: time.Now(), Message: "User logged in"},
{UserID: "user2", Action: "login", Timestamp: time.Now(), Message: "User logged in"},
{UserID: "user1", Action: "logout", Timestamp: time.Now(), Message: "User logged out"},
{UserID: "user3", Action: "login", Timestamp: time.Now(), Message: "User logged in"},
{UserID: "user1", Action: "profile", Timestamp: time.Now(), Message: "Profile updated"},
}
observable := ro.Pipe1(
ro.Just(logs...),
NewRateLimiter[LogEntry](100, time.Minute, func(log LogEntry) string {
return log.UserID
}),
)
values, err := ro.Collect(observable)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
fmt.Printf("Processed log entries: %d\n", len(values))
Output: Processed log entries: 5
Example (UserBased) ¶
type UserRequest struct {
UserID string
Action string
Data string
}
// Rate limit by user ID: 2 requests per minute per user
observable := ro.Pipe1(
ro.Just(
UserRequest{UserID: "user1", Action: "login", Data: "data1"},
UserRequest{UserID: "user2", Action: "login", Data: "data2"},
UserRequest{UserID: "user1", Action: "logout", Data: "data3"},
UserRequest{UserID: "user1", Action: "profile", Data: "data4"},
UserRequest{UserID: "user3", Action: "login", Data: "data5"},
),
NewRateLimiter[UserRequest](2, time.Minute, func(req UserRequest) string {
return req.UserID
}),
)
values, err := ro.Collect(observable)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
fmt.Printf("User-based rate limited requests: %d\n", len(values))
Output: User-based rate limited requests: 4
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.