Documentation
¶
Overview ¶
Example (IpGRPCLimiter) ¶
// Set up a gRPC server.
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
defer lis.Close()
// Connect to etcd.
etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(os.Getenv("ETCD_ENDPOINTS"), ","),
DialTimeout: time.Second,
})
if err != nil {
log.Fatalf("could not connect to etcd: %v", err)
}
defer etcdClient.Close()
// Connect to Redis.
redisClient := redis.NewClient(&redis.Options{
Addr: os.Getenv("REDIS_ADDR"),
})
defer redisClient.Close()
logger := limiters.NewStdLogger()
// Registry is needed to keep track of previously created limiters. It can remove the expired limiters to free up
// memory.
registry := limiters.NewRegistry()
// The rate is used to define the token bucket refill rate and also the TTL for the limiters (both in Redis and in
// the registry).
rate := time.Second * 3
clock := limiters.NewSystemClock()
go func() {
// Garbage collect the old limiters to prevent memory leaks.
for {
<-time.After(rate)
registry.DeleteExpired(clock.Now())
}
}()
// Add a unary interceptor middleware to rate limit requests.
s := grpc.NewServer(grpc.UnaryInterceptor(
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
p, ok := peer.FromContext(ctx)
var ip string
if !ok {
log.Println("no peer info available")
ip = "unknown"
} else {
ip = p.Addr.String()
}
// Create an IP address based rate limiter.
bucket := registry.GetOrCreate(ip, func() interface{} {
return limiters.NewTokenBucket(
2,
rate,
limiters.NewLockEtcd(etcdClient, fmt.Sprintf("/lock/ip/%s", ip), logger),
limiters.NewTokenBucketRedis(
redisClient,
fmt.Sprintf("/ratelimiter/ip/%s", ip),
rate, false),
clock, logger)
}, rate, clock.Now())
w, err := bucket.(*limiters.TokenBucket).Limit(ctx)
if err == limiters.ErrLimitExhausted {
return nil, status.Errorf(codes.ResourceExhausted, "try again later in %s", w)
} else if err != nil {
// The limiter failed. This error should be logged and examined.
log.Println(err)
return nil, status.Error(codes.Internal, "internal error")
}
return handler(ctx, req)
}))
pb.RegisterGreeterServer(s, &server{})
go func() {
// Start serving.
if err = s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
defer s.GracefulStop()
// Set up a client connection to the server.
conn, err := grpc.Dial(fmt.Sprintf("localhost%s", port), grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// Contact the server and print out its response.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: "Alice"})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
fmt.Println(r.GetMessage())
r, err = c.SayHello(ctx, &pb.HelloRequest{Name: "Bob"})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
fmt.Println(r.GetMessage())
r, err = c.SayHello(ctx, &pb.HelloRequest{Name: "Peter"})
if err == nil {
log.Fatal("error expected, but got nil")
}
fmt.Println(err)
Output:

Hello Alice
Hello Bob
rpc error: code = ResourceExhausted desc = try again later in 3s
Example (SimpleGRPCLimiter) ¶
// Example body: a gRPC server whose unary interceptor applies one shared
// limiters.TokenBucket to all requests. The bucket is built from an etcd lock
// (limiters.NewLockEtcd) and a Redis backend (limiters.NewTokenBucketRedis).
// A client then sends three requests and the third one is expected to be
// rejected.

// Set up a gRPC server.
lis, err := net.Listen("tcp", port)
if err != nil {
	log.Fatalf("failed to listen: %v", err)
}
defer lis.Close()

// Connect to etcd.
etcdClient, err := clientv3.New(clientv3.Config{
	Endpoints:   strings.Split(os.Getenv("ETCD_ENDPOINTS"), ","),
	DialTimeout: time.Second,
})
if err != nil {
	log.Fatalf("could not connect to etcd: %v", err)
}
defer etcdClient.Close()

// Connect to Redis.
redisClient := redis.NewClient(&redis.Options{
	Addr: os.Getenv("REDIS_ADDR"),
})
defer redisClient.Close()

// A single limiter is shared by every request handled by the server.
logger := limiters.NewStdLogger()
rate := time.Second * 3
limiter := limiters.NewTokenBucket(
	2,
	rate,
	limiters.NewLockEtcd(etcdClient, "/ratelimiter_lock/simple/", logger),
	limiters.NewTokenBucketRedis(
		redisClient,
		"ratelimiter/simple",
		rate, false),
	limiters.NewSystemClock(), logger,
)

// Add a unary interceptor middleware to rate limit all requests.
s := grpc.NewServer(grpc.UnaryInterceptor(
	func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		w, err := limiter.Limit(ctx)
		if err == limiters.ErrLimitExhausted {
			return nil, status.Errorf(codes.ResourceExhausted, "try again later in %s", w)
		}
		if err != nil {
			// The limiter failed. This error should be logged and examined.
			log.Println(err)
			return nil, status.Error(codes.Internal, "internal error")
		}
		return handler(ctx, req)
	}))
pb.RegisterGreeterServer(s, &server{})

go func() {
	// Start serving. The error is scoped to this goroutine: assigning to the
	// outer err would race with the client calls below.
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}()
defer s.GracefulStop()

// Set up a client connection to the server.
conn, err := grpc.Dial(fmt.Sprintf("localhost%s", port), grpc.WithInsecure())
if err != nil {
	log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)

// Contact the server and print out its response.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

r, err := c.SayHello(ctx, &pb.HelloRequest{Name: "Alice"})
if err != nil {
	log.Fatalf("could not greet: %v", err)
}
fmt.Println(r.GetMessage())

r, err = c.SayHello(ctx, &pb.HelloRequest{Name: "Bob"})
if err != nil {
	log.Fatalf("could not greet: %v", err)
}
fmt.Println(r.GetMessage())

// The bucket was created with a capacity of 2, so the third request is
// expected to fail; only the error is of interest here.
_, err = c.SayHello(ctx, &pb.HelloRequest{Name: "Peter"})
if err == nil {
	log.Fatal("error expected, but got nil")
}
fmt.Println(err)
Output:

Hello Alice
Hello Bob
rpc error: code = ResourceExhausted desc = try again later in 3s
Click to show internal directories.
Click to hide internal directories.