psrpc

package module
v0.2.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2023 License: Apache-2.0 Imports: 25 Imported by: 92

README

PubSub-RPC

Create custom protobuf-based golang RPCs built on pub/sub.

Supports:

  • Protobuf service definitions
  • Use Redis, Nats, or a local communication layer
  • Custom server selection for RPC handling based on user-defined affinity
  • RPC topics - any RPC can be divided into topics (e.g. by region)
  • Single RPCs - one request is handled by one server, used for normal RPCs
  • Multi RPCs - one request is handled by every server, used for distributed updates or result aggregation
  • Queue Subscriptions - updates sent from the server will only be processed by a single client
  • Subscriptions - updates sent by the server will be processed by every client

Usage

Protobuf

PSRPC is generated from proto files, and we've added a few custom method options:

message Options {
  // For RPCs, each client request will receive a response from every server.
  // For subscriptions, every client will receive every update.
  bool multi = 1;

  // This method is a pub/sub.
  bool subscription = 2;

  // This method uses topics.
  bool topics = 3;

  // Your service will supply an affinity function for handler selection.
  bool affinity_func = 4;

  // The method uses bidirectional streaming.
  bool stream = 5;
}

Start with your service definition. Here's an example using different method options:

syntax = "proto3";

import "options.proto";

option go_package = "/api";

service MyService {
  // A normal RPC - one request, one response. The request will be handled by the first available server
  rpc NormalRPC(MyRequest) returns (MyResponse);

  // An RPC with a server affinity function for handler selection.
  rpc IntensiveRPC(MyRequest) returns (MyResponse) {
    option (psrpc.options).affinity_func = true;
  };

  // A multi-rpc - a client will send one request, and receive one response each from every server
  rpc GetStats(MyRequest) returns (MyResponse) {
    option (psrpc.options).multi = true;
  };

  // A streaming RPC - a client opens a stream, the first server to respond accepts it and both send and
  // receive messages until one side closes the stream.
  rpc ExchangeUpdates(MyClientMessage) returns (MyServerMessage) {
    option (psrpc.options).stream = true;
  };

  // An RPC with topics - a client can send one request, and receive one response from each server in one region
  rpc GetRegionStats(MyRequest) returns (MyResponse) {
    option (psrpc.options).topics = true;
    option (psrpc.options).multi = true;
  }

  // A queue subscription - even if multiple clients are subscribed, only one will receive this update.
  // The request parameter (in this case, Ignored) will always be ignored when generating go files.
  rpc ProcessUpdate(Ignored) returns (MyUpdate) {
    option (psrpc.options).subscription = true;
  };

  // A normal subscription - every client will receive every update.
  // The request parameter (in this case, Ignored) will always be ignored when generating go files.
  rpc UpdateState(Ignored) returns (MyUpdate) {
    option (psrpc.options).subscription = true;
    option (psrpc.options).multi = true;
  };

  // A subscription with topics - every client subscribed to the topic will receive every update.
  // The request parameter (in this case, Ignored) will always be ignored when generating go files.
  rpc UpdateRegionState(Ignored) returns (MyUpdate) {
    option (psrpc.options).subscription = true;
    option (psrpc.options).topics = true;
    option (psrpc.options).multi = true;
  }
}

message Ignored {}
message MyRequest {}
message MyResponse {}
message MyUpdate {}
message MyClientMessage {}
message MyServerMessage {}
Generation

Install protoc-gen-psrpc by running go install github.com/livekit/psrpc/protoc-gen-psrpc.

If using the custom options above, you'll also need to include options.proto. The simplest way to do this is to include psrpc in your project, then run

go list -json -m github.com/livekit/psrpc

{
	"Path": "github.com/livekit/psrpc",
	"Version": "v0.2.2",
	"Time": "2022-12-27T21:40:05Z",
	"Dir": "/Users/dc/go/pkg/mod/github.com/livekit/psrpc@v0.2.2",
	"GoMod": "/Users/dc/go/pkg/mod/cache/download/github.com/livekit/psrpc/@v/v0.2.2.mod",
	"GoVersion": "1.18"
}

Use the --psrpc_out flag with protoc and include the options directory.

protoc \
  --go_out=paths=source_relative:. \
  --psrpc_out=paths=source_relative:. \
  -I /Users/dc/go/pkg/mod/github.com/livekit/psrpc@v0.2.2/protoc-gen-psrpc/options \
  -I=. my_service.proto

This will create a my_service.psrpc.go file.

Client

A MyServiceClient will be generated based on your rpc definitions:

type MyServiceClient interface {
    // A normal RPC - one request, one response. The request will be handled by the first available server
    NormalRPC(ctx context.Context, req *MyRequest, opts ...psrpc.RequestOpt) (*MyResponse, error)

    // An RPC with a server affinity function for handler selection.
    IntensiveRPC(ctx context.Context, req *MyRequest, opts ...psrpc.RequestOpt) (*MyResponse, error)

    // A multi-rpc - a client will send one request, and receive one response each from every server
    GetStats(ctx context.Context, req *MyRequest, opts ...psrpc.RequestOpt) (<-chan *psrpc.Response[*MyResponse], error)

    // A streaming RPC - a client opens a stream, the first server to respond accepts it and both send and
    // receive messages until one side closes the stream.
    ExchangeUpdates(ctx context.Context, opts ...psrpc.RequestOpt) (psrpc.ClientStream[*MyClientMessage, *MyServerMessage], error)

    // An RPC with topics - a client can send one request, and receive one response from each server in one region
    GetRegionStats(ctx context.Context, topic string, req *MyRequest, opts ...psrpc.RequestOpt) (<-chan *psrpc.Response[*MyResponse], error)

    // A queue subscription - even if multiple clients are subscribed, only one will receive this update.
    SubscribeProcessUpdate(ctx context.Context) (psrpc.Subscription[*MyUpdate], error)

    // A subscription with topics - every client subscribed to the topic will receive every update.
    SubscribeUpdateRegionState(ctx context.Context, topic string) (psrpc.Subscription[*MyUpdate], error)
}

// NewMyServiceClient creates a psrpc client that implements the MyServiceClient interface.
func NewMyServiceClient(clientID string, bus psrpc.MessageBus, opts ...psrpc.ClientOpt) (MyServiceClient, error) {
    ...
}

Multi-RPCs will return a chan *psrpc.Response, where you will receive an individual response or error from each server:

type Response[ResponseType proto.Message] struct {
    Result ResponseType
    Err    error
}

Streaming RPCs will return a psrpc.ClientStream. You can listen for updates from its channel, send updates, or close the stream.

Send blocks until the message has been received. When the stream closes the cause is available to both the server and client from Err.

type ClientStream[SendType, RecvType proto.Message] interface {
	Channel() <-chan RecvType
	Send(msg SendType, opts ...StreamOption) error
	Close(cause error) error
	Err() error
}

Subscription RPCs will return a psrpc.Subscription, where you can listen for updates on its channel:

type Subscription[MessageType proto.Message] interface {
    Channel() <-chan MessageType
    Close() error
}
ServerImpl

A <ServiceName>ServerImpl interface will also be generated from your rpcs. Your service will need to fulfill its interface:

type MyServiceServerImpl interface {
    // A normal RPC - one request, one response. The request will be handled by the first available server
    NormalRPC(ctx context.Context, req *MyRequest) (*MyResponse, error)

    // An RPC with a server affinity function for handler selection.
    IntensiveRPC(ctx context.Context, req *MyRequest) (*MyResponse, error)
    IntensiveRPCAffinity(req *MyRequest) float32

    // A multi-rpc - a client will send one request, and receive one response each from every server
    GetStats(ctx context.Context, req *MyRequest) (*MyResponse, error)

    // A streaming RPC - a client opens a stream, the first server to respond accepts it and both send and
    // receive messages until one side closes the stream.
    ExchangeUpdates(stream psrpc.ServerStream[*MyClientMessage, *MyServerMessage]) error

    // An RPC with topics - a client can send one request, and receive one response from each server in one region
    GetRegionStats(ctx context.Context, req *MyRequest) (*MyResponse, error)
}
Server

Finally, a <ServiceName>Server will be generated. This is used to start your rpc server, as well as register and deregister topics:

type MyServiceServer interface {
    // An RPC with topics - a client can send one request, and receive one response from each server in one region
    RegisterGetRegionStatsTopic(topic string) error
    DeregisterGetRegionStatsTopic(topic string) error

    // A queue subscription - even if multiple clients are subscribed, only one will receive this update.
    PublishProcessUpdate(ctx context.Context, msg *MyUpdate) error

    // A subscription with topics - every client subscribed to the topic will receive every update.
    PublishUpdateRegionState(ctx context.Context, topic string, msg *MyUpdate) error

    // Close and wait for pending RPCs to complete
    Shutdown()

    // Close immediately, without waiting for pending RPCs
    Kill()
}

// NewMyServiceServer builds a RPCServer that can be used to handle
// requests that are routed to the right method in the provided svc implementation.
func NewMyServiceServer(serverID string, svc MyServiceServerImpl, bus psrpc.MessageBus, opts ...psrpc.ServerOpt) (MyServiceServer, error) {
    ...
}

Affinity

AffinityFunc

The server can implement an affinity function for the client to decide which instance should take a SingleRequest. A higher affinity score is better, and a score of 0 means the server is not available.

For example, the following could be used to return an affinity based on cpu load:

rpc IntensiveRPC(MyRequest) returns (MyResponse) {
  option (psrpc.options).affinity_func = true;
};
func (s *MyService) IntensiveRPC(ctx context.Context, req *api.MyRequest) (*api.MyResponse, error) {
    ... // do something CPU intensive
}

func (s *MyService) IntensiveRPCAffinity(_ *MyRequest) float32 {
    return stats.GetIdleCPU()
}
SelectionOpts

On the client side, you can also set server selection options with single RPCs.

type SelectionOpts struct {
    MinimumAffinity      float32       // (default 0) minimum affinity for a server to be considered a valid handler
    AcceptFirstAvailable bool          // (default true)
    AffinityTimeout      time.Duration // (default 0 (none)) server selection deadline
    ShortCircuitTimeout  time.Duration // (default 0 (none)) deadline imposed after receiving first response
}
selectionOpts := psrpc.SelectionOpts{
    MinimumAffinity:      0.5,
    AffinityTimeout:      time.Second,
    ShortCircuitTimeout:  time.Millisecond * 250,
}

res, err := myClient.IntensiveRPC(ctx, req, psrpc.WithSelectionOpts(selectionOpts))

Error handling

PSRPC defines an error type (psrpc.Error). This error type can be used to wrap any other error using the psrpc.NewError function:

func NewError(code ErrorCode, err error) Error

The code parameter provides more context about the cause of the error. A variety of codes are defined for common error conditions. PSRPC errors are serialized by the PSRPC server implementation, and unmarshalled (with the original error code) on the client. By retrieving the code using the Code() method, the client can determine if the error was caused by a server failure, or a client error, such as a bad parameter. This can be used as an input to the retry logic, or success rate metrics.

The most appropriate HTTP status code for a given error can be retrieved using the ToHttp() method. This status code is generated from the associated error code. Similarly, a grpc status.Status can be obtained from a psrpc.Error using the GRPCStatus() method.

A psrpc.Error can also be converted easily to a twirp.Error using the errors.As function:

func As(err error, target any) bool

For instance:

func convertError(err error) error {
	var twErr twirp.Error

	if errors.As(err, &twErr) {
		return twErr
	}

	return err
}

This allows the twirp server implementations to interpret psrpc.Errors as native twirp.Errors. In particular, this means that twirp clients will also receive information about the error cause as a twirp.Code. This ensures that a psrpc.Error created by a psrpc server can be forwarded through PS and twirp RPC all the way to a twirp client error hook with the full associated context.

psrpc.Error implements the Unwrap() method, so the original error can be retrieved by users of PSRPC.

Interceptors

Interceptors allow writing middleware for RPC clients and servers. Interceptors can be used to run code during the call lifecycle such as logging, recording metrics, tracing, and retrying calls. PSRPC defines four interceptor types which allow intercepting requests on the client and server.

ServerInterceptor

ServerInterceptor are invoked by the server for calls to unary and multi RPCs.

type ServerInterceptor func(ctx context.Context, req proto.Message, info RPCInfo, handler Handler) (proto.Message, error)

The info parameter contains metadata about the method including the name and topic. Calling the handler parameter hands off execution to the next interceptor.

ServerInterceptor are added to new servers with WithServerInterceptors.

func WithServerInterceptors(interceptors ...ServerInterceptor) ServerOption

Interceptors run in the order they are added, so the first interceptor passed to WithServerInterceptors is the first to receive a new request. Calling the handler parameter invokes the second interceptor and so on until the service implementation receives the request and produces a response.

RPCInterceptor

RPCInterceptor are created by clients to process requests to unary RPCs.

type RPCInterceptor func(ctx context.Context, req proto.Message, opts ...RequestOption) (proto.Message, error)

RPCInterceptor are created by implementing RPCInterceptorFactory and passing the factory to new clients using WithClientRPCInterceptors.

type RPCInterceptorFactory func(info RPCInfo, next RPCInterceptor) RPCInterceptor

The next parameter received by RPCInterceptorFactory should be called by the implementation of RPCInterceptor to continue the call lifecycle.

MultiRPCInterceptor

MultiRPCInterceptor are created by clients to process requests to multi RPCs. Because MultiRPCInterceptor process several responses for the same request, implementations can define separate functions for each phase of the call lifecycle. The Send function is executed on outgoing request parameters. The Recv function is executed once for each response when it returns from a server. Close is called when the deadline is reached.

type MultiRPCInterceptor interface {
	Send(ctx context.Context, msg proto.Message, opts ...RequestOption) error
	Recv(msg proto.Message, err error)
	Close()
}

MultiRPCInterceptor are created by implementing MultiRPCInterceptorFactory and passing the factory to new clients using WithClientMultiRPCInterceptors.

type MultiRPCInterceptorFactory func(info RPCInfo, next MultiRPCInterceptor) MultiRPCInterceptor

Each function in a MultiRPCInterceptor should call the corresponding function in the interceptor received in the next parameter.

StreamInterceptor

StreamInterceptor are created by both clients and servers to process streaming RPCs. The Send function is executed once for each outgoing message. The Recv function is executed once for each incoming message. Close is called when either the local or remote host closes the stream or if the stream receives a malformed message.

type StreamInterceptor interface {
	Recv(msg proto.Message) error
	Send(msg proto.Message, opts ...StreamOption) error
	Close(cause error) error
}

StreamInterceptor are created by implementing StreamInterceptorFactory and passing the factory to new clients or servers using WithClientStreamInterceptors and WithServerStreamInterceptors.

type StreamInterceptorFactory func(info RPCInfo, next StreamInterceptor) StreamInterceptor

Each function in a StreamInterceptor should call the corresponding function in the interceptor received in the next parameter.

Documentation

Index

Constants

View Source
const (
	DefaultChannelSize = 100
)
View Source
const (
	DefaultClientTimeout = time.Second * 3
)
View Source
const (
	DefaultServerTimeout = time.Second * 3
)

Variables

View Source
var (
	ErrRequestCanceled = NewError(Canceled, errors.New("request canceled"))
	ErrRequestTimedOut = NewError(DeadlineExceeded, errors.New("request timed out"))
	ErrNoResponse      = NewError(Unavailable, errors.New("no response from servers"))
	ErrStreamEOF       = NewError(Unavailable, io.EOF)
	ErrStreamClosed    = NewError(Canceled, errors.New("stream closed"))
	ErrSlowConsumer    = NewError(Unavailable, errors.New("stream message discarded by slow consumer"))
)

Functions

func AppendMetadataToOutgoingContext added in v0.2.8

func AppendMetadataToOutgoingContext(ctx context.Context, kv ...string) context.Context

func NewContextWithIncomingHeader added in v0.2.8

func NewContextWithIncomingHeader(ctx context.Context, head *Header) context.Context

func NewContextWithOutgoingMetadata added in v0.2.8

func NewContextWithOutgoingMetadata(ctx context.Context, md Metadata) context.Context

func RegisterHandler added in v0.2.0

func RegisterHandler[RequestType proto.Message, ResponseType proto.Message](
	s *RPCServer,
	rpc string,
	topic []string,
	svcImpl func(context.Context, RequestType) (ResponseType, error),
	affinityFunc AffinityFunc[RequestType],
	requireClaim bool,
	multi bool,
) error

func RegisterStreamHandler added in v0.2.5

func RegisterStreamHandler[RequestType proto.Message, ResponseType proto.Message](
	s *RPCServer,
	rpc string,
	topic []string,
	svcImpl func(ServerStream[ResponseType, RequestType]) error,
	affinityFunc StreamAffinityFunc,
	requireClaim bool,
) error

func RequestMulti added in v0.2.0

func RequestMulti[ResponseType proto.Message](
	ctx context.Context,
	c *RPCClient,
	rpc string,
	topic []string,
	request proto.Message,
	opts ...RequestOption,
) (rChan <-chan *Response[ResponseType], err error)

func RequestSingle

func RequestSingle[ResponseType proto.Message](
	ctx context.Context,
	c *RPCClient,
	rpc string,
	topic []string,
	requireClaim bool,
	request proto.Message,
	opts ...RequestOption,
) (response ResponseType, err error)

func SetLogger

func SetLogger(l logr.Logger)

Types

type AffinityFunc

type AffinityFunc[RequestType proto.Message] func(RequestType) float32

type ClientOption added in v0.2.0

type ClientOption func(*clientOpts)

func WithClientChannelSize added in v0.2.0

func WithClientChannelSize(size int) ClientOption

func WithClientMultiRPCInterceptors added in v0.2.7

func WithClientMultiRPCInterceptors(interceptors ...MultiRPCInterceptorFactory) ClientOption

func WithClientOptions added in v0.2.10

func WithClientOptions(opts ...ClientOption) ClientOption

func WithClientRPCInterceptors added in v0.2.7

func WithClientRPCInterceptors(interceptors ...RPCInterceptorFactory) ClientOption

func WithClientRequestHooks added in v0.2.3

func WithClientRequestHooks(hooks ...ClientRequestHook) ClientOption

func WithClientResponseHooks added in v0.2.3

func WithClientResponseHooks(hooks ...ClientResponseHook) ClientOption

func WithClientStreamInterceptors added in v0.2.5

func WithClientStreamInterceptors(interceptors ...StreamInterceptorFactory) ClientOption

func WithClientTimeout

func WithClientTimeout(timeout time.Duration) ClientOption

type ClientRequestHook added in v0.2.3

type ClientRequestHook func(ctx context.Context, req proto.Message, info RPCInfo)

Request hooks are called as soon as the request is made

type ClientResponseHook added in v0.2.3

type ClientResponseHook func(ctx context.Context, req proto.Message, info RPCInfo, resp proto.Message, err error)

Response hooks are called just before responses are returned. For multi-requests, response hooks are called on every response, and block while executing.

type ClientStream added in v0.2.5

type ClientStream[SendType, RecvType proto.Message] interface {
	Stream[SendType, RecvType]
}

func OpenStream added in v0.2.5

func OpenStream[SendType, RecvType proto.Message](
	ctx context.Context,
	c *RPCClient,
	rpc string,
	topic []string,
	requireClaim bool,
	opts ...RequestOption,
) (ClientStream[SendType, RecvType], error)

type Error added in v0.2.3

type Error interface {
	error
	Code() ErrorCode

	// convenience methods
	ToHttp() int
	GRPCStatus() *status.Status
}

func NewError added in v0.2.3

func NewError(code ErrorCode, err error) Error

func NewErrorf added in v0.2.3

func NewErrorf(code ErrorCode, msg string, args ...interface{}) Error

type ErrorCode added in v0.2.3

type ErrorCode string
const (
	OK ErrorCode = ""

	// Request Canceled by client
	Canceled ErrorCode = "canceled"
	// Could not unmarshal request
	MalformedRequest ErrorCode = "malformed_request"
	// Could not unmarshal result
	MalformedResponse ErrorCode = "malformed_result"
	// Request timed out
	DeadlineExceeded ErrorCode = "deadline_exceeded"
	// Service unavailable due to load and/or affinity constraints
	Unavailable ErrorCode = "unavailable"
	// Unknown (server returned non-psrpc error)
	Unknown ErrorCode = "unknown"

	// Invalid argument in request
	InvalidArgument ErrorCode = "invalid_argument"
	// Entity not found
	NotFound ErrorCode = "not_found"
	// Duplicate creation attempted
	AlreadyExists ErrorCode = "already_exists"
	// Caller does not have required permissions
	PermissionDenied ErrorCode = "permission_denied"
	// Some resource has been exhausted, e.g. memory or quota
	ResourceExhausted ErrorCode = "resource_exhausted"
	// Inconsistent state to carry out request
	FailedPrecondition ErrorCode = "failed_precondition"
	// Request aborted
	Aborted ErrorCode = "aborted"
	// Operation was out of range
	OutOfRange ErrorCode = "out_of_range"
	// Operation is not implemented by the server
	Unimplemented ErrorCode = "unimplemented"
	// Operation failed due to an internal error
	Internal ErrorCode = "internal"
	// Irrecoverable loss or corruption of data
	DataLoss ErrorCode = "data_loss"
	// Similar to PermissionDenied, used when the caller is unidentified
	Unauthenticated ErrorCode = "unauthenticated"
)

type Handler

type Handler func(context.Context, proto.Message) (proto.Message, error)
type Header struct {
	RemoteID string
	SentAt   time.Time
	Metadata Metadata
}

func IncomingHeader added in v0.2.8

func IncomingHeader(ctx context.Context) *Header

type MessageBus

type MessageBus interface {
	Publish(ctx context.Context, channel string, msg proto.Message) error
	Subscribe(ctx context.Context, channel string, channelSize int) (subInternal, error)
	SubscribeQueue(ctx context.Context, channel string, channelSize int) (subInternal, error)
}

func NewLocalMessageBus added in v0.2.2

func NewLocalMessageBus() MessageBus

func NewNatsMessageBus

func NewNatsMessageBus(nc *nats.Conn) MessageBus

func NewRedisMessageBus

func NewRedisMessageBus(rc redis.UniversalClient) MessageBus

type Metadata added in v0.2.8

type Metadata map[string]string

func OutgoingContextMetadata added in v0.2.8

func OutgoingContextMetadata(ctx context.Context) Metadata

type MultiRPCInterceptor added in v0.2.7

type MultiRPCInterceptor interface {
	Send(ctx context.Context, msg proto.Message, opts ...RequestOption) error
	Recv(msg proto.Message, err error)
	Close()
}

type MultiRPCInterceptorFactory added in v0.2.7

type MultiRPCInterceptorFactory func(info RPCInfo, next MultiRPCInterceptor) MultiRPCInterceptor

type RPCClient

type RPCClient struct {
	// contains filtered or unexported fields
}

func NewRPCClient

func NewRPCClient(serviceName, clientID string, bus MessageBus, opts ...ClientOption) (*RPCClient, error)

func NewRPCClientWithStreams added in v0.2.5

func NewRPCClientWithStreams(serviceName, clientID string, bus MessageBus, opts ...ClientOption) (*RPCClient, error)

func (*RPCClient) Close

func (c *RPCClient) Close()

type RPCInfo added in v0.2.3

type RPCInfo struct {
	Service string
	Method  string
	Topic   []string
	Multi   bool
}

type RPCInterceptor added in v0.2.7

type RPCInterceptor func(ctx context.Context, req proto.Message, opts ...RequestOption) (proto.Message, error)

type RPCInterceptorFactory added in v0.2.7

type RPCInterceptorFactory func(info RPCInfo, next RPCInterceptor) RPCInterceptor

type RPCServer

type RPCServer struct {
	// contains filtered or unexported fields
}

func NewRPCServer

func NewRPCServer(serviceName, serverID string, bus MessageBus, opts ...ServerOption) *RPCServer

func (*RPCServer) Close

func (s *RPCServer) Close(force bool)

func (*RPCServer) DeregisterHandler

func (s *RPCServer) DeregisterHandler(rpc string, topic []string)

func (*RPCServer) Publish

func (s *RPCServer) Publish(ctx context.Context, rpc string, topic []string, msg proto.Message) error

type Registerer added in v0.2.8

type Registerer struct {
	// contains filtered or unexported fields
}

func NewRegisterer added in v0.2.8

func NewRegisterer(register, deregister any) Registerer

type RegistererSlice added in v0.2.8

type RegistererSlice []Registerer

func (RegistererSlice) Deregister added in v0.2.8

func (rs RegistererSlice) Deregister(params ...any)

func (RegistererSlice) Register added in v0.2.8

func (rs RegistererSlice) Register(params ...any) error

type RequestOption added in v0.2.0

type RequestOption func(*reqOpts)

func WithRequestTimeout

func WithRequestTimeout(timeout time.Duration) RequestOption

func WithSelectionOpts

func WithSelectionOpts(opts SelectionOpts) RequestOption

type Response

type Response[ResponseType proto.Message] struct {
	Result ResponseType
	Err    error
}

type SelectionOpts

type SelectionOpts struct {
	MinimumAffinity      float32       // minimum affinity for a server to be considered a valid handler
	AcceptFirstAvailable bool          // go fast
	AffinityTimeout      time.Duration // server selection deadline
	ShortCircuitTimeout  time.Duration // deadline imposed after receiving first response
}

type ServerInterceptor added in v0.2.3

type ServerInterceptor func(ctx context.Context, req proto.Message, info RPCInfo, handler Handler) (proto.Message, error)

Server interceptors wrap the service implementation

func WithServerRecovery added in v0.2.0

func WithServerRecovery() ServerInterceptor

Recover from server panics. Should always be the last interceptor

type ServerOption added in v0.2.0

type ServerOption func(*serverOpts)

func WithServerChannelSize added in v0.2.0

func WithServerChannelSize(size int) ServerOption

func WithServerInterceptors added in v0.2.3

func WithServerInterceptors(interceptors ...ServerInterceptor) ServerOption

func WithServerOptions added in v0.2.10

func WithServerOptions(opts ...ServerOption) ServerOption

func WithServerStreamInterceptors added in v0.2.5

func WithServerStreamInterceptors(interceptors ...StreamInterceptorFactory) ServerOption

func WithServerTimeout

func WithServerTimeout(timeout time.Duration) ServerOption

type ServerStream added in v0.2.5

type ServerStream[SendType, RecvType proto.Message] interface {
	Stream[SendType, RecvType]
	Hijack()
}

type Stream added in v0.2.5

type Stream[SendType, RecvType proto.Message] interface {
	Channel() <-chan RecvType
	Send(msg SendType, opts ...StreamOption) error
	Close(cause error) error
	Context() context.Context
	Err() error
}

type StreamAffinityFunc added in v0.2.5

type StreamAffinityFunc func() float32

type StreamInterceptor added in v0.2.5

type StreamInterceptor interface {
	Recv(msg proto.Message) error
	Send(msg proto.Message, opts ...StreamOption) error
	Close(cause error) error
}

type StreamInterceptorFactory added in v0.2.7

type StreamInterceptorFactory func(info RPCInfo, next StreamInterceptor) StreamInterceptor

type StreamOption added in v0.2.5

type StreamOption func(*streamOpts)

func WithTimeout added in v0.2.5

func WithTimeout(timeout time.Duration) StreamOption

type Subscription

type Subscription[MessageType proto.Message] interface {
	Channel() <-chan MessageType
	Close() error
}

func Join added in v0.2.0

func Join[ResponseType proto.Message](
	ctx context.Context,
	c *RPCClient,
	rpc string,
	topic []string,
) (Subscription[ResponseType], error)

func JoinQueue added in v0.2.0

func JoinQueue[ResponseType proto.Message](
	ctx context.Context,
	c *RPCClient,
	rpc string,
	topic []string,
) (Subscription[ResponseType], error)

func Subscribe

func Subscribe[MessageType proto.Message](
	ctx context.Context,
	bus MessageBus,
	channel string,
	channelSize int,
) (Subscription[MessageType], error)

func SubscribeQueue

func SubscribeQueue[MessageType proto.Message](
	ctx context.Context,
	bus MessageBus,
	channel string,
	channelSize int,
) (Subscription[MessageType], error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL