psrpc

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 24, 2022 License: Apache-2.0 Imports: 17 Imported by: 92

README

PubSub-RPC

Create custom protobuf-based golang RPCs built on pub/sub.

Supports:

  • Protobuf service definitions
  • Redis or Nats as a communication layer
  • Custom server selection for RPC handling based on user-defined affinity
  • RPC topics - any RPC can be divided into topics, (e.g. by region)
  • Single RPCs - one request is handled by one server, used for normal RPCs
  • Multi RPCs - one request is handled by every server, used for distributed updates or result aggregation
  • Queue Subscriptions - updates sent from the server will only be processed by a single client
  • Subscriptions - updates sent be the server will be processed by every client

Usage

Protobuf

PSRPC is generated from proto files, and we've added a few custom method options:

message Options {
  // For RPCs, each client request will receive a response from every server.
  // For subscriptions, every client will receive every update.
  bool multi = 1;

  // This method is a pub/sub.
  bool subscription = 2;

  // This method uses topics.
  bool topics = 3;

  // Your service will supply an affinity function for handler selection.
  bool affinity_func = 4;
}

Start with your service definition. Here's an example using different method options:

syntax = "proto3";

import "options.proto";

option go_package = "/api";

service MyService {
  // A normal RPC - one request, one response. The request will be handled by the first available server
  rpc NormalRPC(MyRequest) returns (MyResponse);
  
  // An RPC with a server affinity function for handler selection.
  rpc IntensiveRPC(MyRequest) returns (MyResponse) {
    option (psrpc.options).affinity_func = true;
  };
  
  // A multi-rpc - a client will send one request, and receive one response each from every server
  rpc GetStats(MyRequest) returns (MyResponse) {
    option (psrpc.options).multi = true;
  };
  
  // An RPC with topics - a client can send one request, and receive one response from each server in one region
  rpc GetRegionStats(MyRequest) returns (MyResponse) {
    option (psrpc.options).topics = true;
    option (psrpc.options).multi = true;
  }
  
  // A queue subscription - even if multiple clients are subscribed, only one will receive this update.
  // The request parameter (in this case, Ignored) will always be ignored when generating go files.
  rpc ProcessUpdate(Ignored) returns (MyUpdate) {
    option (psrpc.options).subscription = true;
  };
  
  // A normal subscription - every client will receive every update.
  // The request parameter (in this case, Ignored) will always be ignored when generating go files.
  rpc UpdateState(Ignored) returns (MyUpdate) {
    option (psrpc.options).subscription = true;
    option (psrpc.options).multi = true;
  };

  // A subscription with topics - every client subscribed to the topic will receive every update.
  // The request parameter (in this case, Ignored) will always be ignored when generating go files.
  rpc UpdateRegionState(Ignored) returns (MyUpdate) {
    option (psrpc.options).subscription = true;
    option (psrpc.options).topics = true;
    option (psrpc.options).multi = true;
  }
}

message Ignored {}
message MyRequest {}
message MyResponse {}
message MyUpdate {}
Generation

Install protoc-gen-psrpc by running go install github.com/livekit/psrpc/protoc-gen-psrpc.

If using the custom options above, you'll also need to download options.proto.

Use the --psrpc_out with protoc and include the options file.

protoc \ 
  --go_out=paths=source_relative:. \
  --psrpc_out=paths=source_relative:. \
  -I ./protoc-gen-psrpc/options \
  -I=. my_service.proto

This will create a my_service.psrpc.go file

Client

A MyServiceClient will be generated based on your rpc definitions:

type MyServiceClient interface {
    // A normal RPC - one request, one response. The request will be handled by the first available server
    NormalRPC(ctx context.Context, req *MyRequest, opts ...psrpc.RequestOpt) (*MyResponse, error)
    
    // An RPC with a server affinity function for handler selection.
    IntensiveRPC(ctx context.Context, req *MyRequest, opts ...psrpc.RequestOpt) (*MyResponse, error)
    
    // A multi-rpc - a client will send one request, and receive one response each from every server
    GetStats(ctx context.Context, req *MyRequest, opts ...psrpc.RequestOpt) (<-chan *psrpc.Response[*MyResponse], error)
    
    // An RPC with topics - a client can send one request, and receive one response from each server in one region
    GetRegionStats(ctx context.Context, topic string, req *Request, opts ...psrpc.RequestOpt) (<-chan *psrpc.Response[*MyResponse], error)
    
    // A queue subscription - even if multiple clients are subscribed, only one will receive this update.
    SubscribeProcessUpdate(ctx context.Context) (psrpc.Subscription[*MyUpdate], error)
    
    // A subscription with topics - every client subscribed to the topic will receive every update.
    SubscribeUpdateRegionState(ctx context.Context, topic string) (psrpc.Subscription[*MyUpdate], error)
}

// NewMyServiceClient creates a psrpc client that implements the MyServiceClient interface.
func NewMyServiceClient(clientID string, bus psrpc.MessageBus, opts ...psrpc.ClientOpt) (MyServiceClient, error) {
    ...
}

Multi-RPCs will return a chan *psrpc.Response, where you will receive an individual response or error from each server:

type Response[ResponseType proto.Message] struct {
    Result ResponseType
    Err    error
}

Subscription RPCs will return a psrpc.Subscription, where you can listen for updates on its channel:

type Subscription[MessageType proto.Message] interface {
    Channel() <-chan MessageType
    Close() error
}
ServerImpl

A <ServiceName>ServerImpl interface will be also be generated from your rpcs. Your service will need to fulfill its interface:

type MyServiceServerImpl interface {
    // A normal RPC - one request, one response. The request will be handled by the first available server
    NormalRPC(ctx context.Context, req *MyRequest) (*MyResponse, error)
    
    // An RPC with a server affinity function for handler selection.
    IntensiveRPC(ctx context.Context, req *MyRequest) (*MyResponse, error)
    IntensiveRPCAffinity(req *MyRequest) float32
    
    // A multi-rpc - a client will send one request, and receive one response each from every server
    GetStats(ctx context.Context, req *MyRequest) (*MyResponse, error)
    
    // An RPC with topics - a client can send one request, and receive one response from each server in one region
    GetRegionStats(ctx context.Context, req *MyRequest) (*MyResponse, error)
}
Server

Finally, a <ServiceName>Server will be generated. This is used to start your rpc server, as well as register and deregister topics:

type MyServiceServer interface {
    // An RPC with topics - a client can send one request, and receive one response from each server in one region
    RegisterGetRegionStatsTopic(topic string) error
    DeregisterGetRegionStatsTopic(topic string) error
    
    // A queue subscription - even if multiple clients are subscribed, only one will receive this update.
    PublishProcessUpdate(ctx context.Context, msg *MyUpdate) error
    
    // A subscription with topics - every client subscribed to the topic will receive every update.
    PublishUpdateRegionState(ctx context.Context, topic string, msg *MyUpdate) error
}

// NewMyServiceServer builds a RPCServer that can be used to handle
// requests that are routed to the right method in the provided svc implementation.
func NewMyServiceServer(serverID string, svc MyServiceServerImpl, bus psrpc.MessageBus, opts ...psrpc.ServerOpt) (MyServiceServer, error) {
    ...
}

Affinity

AffinityFunc

The server can implement an affinity function for the client to decide which instance should take a SingleRequest. A higher affinity score is better, and a score of 0 means the server is not available.

For example, the following could be used to return an affinity based on cpu load:

rpc IntensiveRPC(MyRequest) returns (MyResponse) {
  option (psrpc.options).affinity_func = true;
};
func (s *MyService) IntensiveRPC(ctx context.Context, req *api.MyRequest) (*api.MyResponse, error) {
    ... // do something CPU intensive
}

func (s *MyService) IntensiveRPCAffinity(_ *MyRequest) float32 {
    return stats.GetIdleCPU()
}
SelectionOpts

On the client side, you can also set server selection options with single RPCs.

type SelectionOpts struct {
    MinimumAffinity      float32       // (default 0) minimum affinity for a server to be considered a valid handler
    AcceptFirstAvailable bool          // (default true)
    AffinityTimeout      time.Duration // (default 0 (none)) server selection deadline
    ShortCircuitTimeout  time.Duration // (default 0 (none)) deadline imposed after receiving first response
}
selectionOpts := psrpc.SelectionOpts{
    MinimumAffinity:      0.5,
    AffinityTimeout:      time.Second,
    ShortCircuitTimeout:  time.Millisecond * 250,
}

res, err := myClient.IntensiveRPC(ctx, req, psrpc.WithSelectionOpts(selectionOpts))

Documentation

Index

Constants

View Source
const (
	DefaultChannelSize = 100
)
View Source
const (
	DefaultClientTimeout = time.Second * 3
)
View Source
const (
	DefaultServerTimeout = time.Second * 3
)

Variables

View Source
var (
	ErrRequestTimedOut = errors.New("request timed out")
	ErrNoResponse      = errors.New("no response from servers")
)

Functions

func RegisterHandler added in v0.2.0

func RegisterHandler[RequestType proto.Message, ResponseType proto.Message](
	s *RPCServer,
	rpc string,
	topic string,
	svcImpl func(context.Context, RequestType) (ResponseType, error),
	affinityFunc AffinityFunc[RequestType],
) error

func RequestMulti added in v0.2.0

func RequestMulti[ResponseType proto.Message](
	ctx context.Context,
	c *RPCClient,
	rpc string,
	topic string,
	request proto.Message,
	opts ...RequestOption,
) (<-chan *Response[ResponseType], error)

func RequestSingle

func RequestSingle[ResponseType proto.Message](
	ctx context.Context,
	c *RPCClient,
	rpc string,
	topic string,
	request proto.Message,
	opts ...RequestOption,
) (ResponseType, error)

func SetLogger

func SetLogger(l logr.Logger)

Types

type AffinityFunc

type AffinityFunc[RequestType proto.Message] func(RequestType) float32

type ClientOption added in v0.2.0

type ClientOption func(*clientOpts)

func WithClientChannelSize added in v0.2.0

func WithClientChannelSize(size int) ClientOption

func WithClientTimeout

func WithClientTimeout(timeout time.Duration) ClientOption

type Handler

type Handler func(context.Context, proto.Message) (proto.Message, error)

type MessageBus

type MessageBus interface {
	Publish(ctx context.Context, channel string, msg proto.Message) error
	Subscribe(ctx context.Context, channel string, channelSize int) (subInternal, error)
	SubscribeQueue(ctx context.Context, channel string, channelSize int) (subInternal, error)
}

func NewNatsMessageBus

func NewNatsMessageBus(nc *nats.Conn) MessageBus

func NewRedisMessageBus

func NewRedisMessageBus(rc redis.UniversalClient) MessageBus

type RPCClient

type RPCClient struct {
	// contains filtered or unexported fields
}

func NewRPCClient

func NewRPCClient(serviceName, clientID string, bus MessageBus, opts ...ClientOption) (*RPCClient, error)

func (*RPCClient) Close

func (c *RPCClient) Close()

type RPCServer

type RPCServer struct {
	// contains filtered or unexported fields
}

func NewRPCServer

func NewRPCServer(serviceName, serverID string, bus MessageBus, opts ...ServerOption) *RPCServer

func (*RPCServer) Close

func (s *RPCServer) Close(force bool)

func (*RPCServer) DeregisterHandler

func (s *RPCServer) DeregisterHandler(rpc, topic string)

func (*RPCServer) Publish

func (s *RPCServer) Publish(ctx context.Context, rpc, topic string, msg proto.Message) error

type RequestOption added in v0.2.0

type RequestOption func(*reqOpts)

func WithRequestTimeout

func WithRequestTimeout(timeout time.Duration) RequestOption

func WithSelectionOpts

func WithSelectionOpts(opts SelectionOpts) RequestOption

type Response

type Response[ResponseType proto.Message] struct {
	Result ResponseType
	Err    error
}

type SelectionOpts

type SelectionOpts struct {
	MinimumAffinity      float32       // minimum affinity for a server to be considered a valid handler
	AcceptFirstAvailable bool          // go fast
	AffinityTimeout      time.Duration // server selection deadline
	ShortCircuitTimeout  time.Duration // deadline imposed after receiving first response
}

type ServerOption added in v0.2.0

type ServerOption func(*serverOpts)

func WithServerChannelSize added in v0.2.0

func WithServerChannelSize(size int) ServerOption

func WithServerTimeout

func WithServerTimeout(timeout time.Duration) ServerOption

func WithUnaryServerInterceptors added in v0.2.0

func WithUnaryServerInterceptors(interceptors ...UnaryServerInterceptor) ServerOption

type Subscription

type Subscription[MessageType proto.Message] interface {
	Channel() <-chan MessageType
	Close() error
}

func Join added in v0.2.0

func Join[ResponseType proto.Message](
	ctx context.Context,
	c *RPCClient,
	rpc string,
	topic string,
) (Subscription[ResponseType], error)

func JoinQueue added in v0.2.0

func JoinQueue[ResponseType proto.Message](
	ctx context.Context,
	c *RPCClient,
	rpc string,
	topic string,
) (Subscription[ResponseType], error)

func Subscribe

func Subscribe[MessageType proto.Message](
	ctx context.Context,
	bus MessageBus,
	channel string,
	channelSize int,
) (Subscription[MessageType], error)

func SubscribeQueue

func SubscribeQueue[MessageType proto.Message](
	ctx context.Context,
	bus MessageBus,
	channel string,
	channelSize int,
) (Subscription[MessageType], error)

type UnaryServerInterceptor added in v0.2.0

type UnaryServerInterceptor func(ctx context.Context, req proto.Message, handler Handler) (proto.Message, error)

func WithServerRecovery added in v0.2.0

func WithServerRecovery() UnaryServerInterceptor

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL