Documentation
¶
Index ¶
- Constants
- Variables
- func RegisterHandler[RequestType proto.Message, ResponseType proto.Message](s *RPCServer, rpc string, topic string, ...) error
- func RequestMulti[ResponseType proto.Message](ctx context.Context, c *RPCClient, rpc string, topic string, ...) (rChan <-chan *Response[ResponseType], err error)
- func RequestSingle[ResponseType proto.Message](ctx context.Context, c *RPCClient, rpc string, topic string, ...) (response ResponseType, err error)
- func SetLogger(l logr.Logger)
- type AffinityFunc
- type ClientOption
- type ClientRequestHook
- type ClientResponseHook
- type Error
- type ErrorCode
- type Handler
- type MessageBus
- type RPCClient
- type RPCInfo
- type RPCServer
- type RequestOption
- type Response
- type SelectionOpts
- type ServerInterceptor
- type ServerOption
- type Subscription
- func Join[ResponseType proto.Message](ctx context.Context, c *RPCClient, rpc string, topic string) (Subscription[ResponseType], error)
- func JoinQueue[ResponseType proto.Message](ctx context.Context, c *RPCClient, rpc string, topic string) (Subscription[ResponseType], error)
- func Subscribe[MessageType proto.Message](ctx context.Context, bus MessageBus, channel string, channelSize int) (Subscription[MessageType], error)
- func SubscribeQueue[MessageType proto.Message](ctx context.Context, bus MessageBus, channel string, channelSize int) (Subscription[MessageType], error)
Constants ¶
View Source
const (
DefaultChannelSize = 100
)
View Source
const (
DefaultClientTimeout = time.Second * 3
)
View Source
const (
DefaultServerTimeout = time.Second * 3
)
Variables ¶
View Source
var (
ErrRequestTimedOut = NewError(DeadlineExceeded, errors.New("request timed out"))
ErrNoResponse = NewError(Unavailable, errors.New("no response from servers"))
)
Functions ¶
func RegisterHandler ¶ added in v0.2.0
func RequestMulti ¶ added in v0.2.0
func RequestSingle ¶
Types ¶
type AffinityFunc ¶
type ClientOption ¶ added in v0.2.0
type ClientOption func(*clientOpts)
func WithClientChannelSize ¶ added in v0.2.0
func WithClientChannelSize(size int) ClientOption
func WithClientRequestHooks ¶ added in v0.2.3
func WithClientRequestHooks(hooks ...ClientRequestHook) ClientOption
func WithClientResponseHooks ¶ added in v0.2.3
func WithClientResponseHooks(hooks ...ClientResponseHook) ClientOption
func WithClientTimeout ¶
func WithClientTimeout(timeout time.Duration) ClientOption
type ClientRequestHook ¶ added in v0.2.3
Request hooks are called as soon as the request is made.
type ClientResponseHook ¶ added in v0.2.3
type ClientResponseHook func(ctx context.Context, req proto.Message, info RPCInfo, resp proto.Message, err error)
Response hooks are called just before responses are returned. For multi-requests, response hooks are called on every response, and block while executing.
type ErrorCode ¶ added in v0.2.3
type ErrorCode string
const (
OK ErrorCode = ""
// Request Canceled by client
Canceled ErrorCode = "canceled"
// Could not unmarshal request
MalformedRequest ErrorCode = "malformed_request"
// Could not unmarshal result
MalformedResponse ErrorCode = "malformed_result"
// Request timed out
DeadlineExceeded ErrorCode = "deadline_exceeded"
Unavailable ErrorCode = "unavailable"
// Unknown (server returned non-psrpc error)
Unknown ErrorCode = "unknown"
// Invalid argument in request
InvalidArgument ErrorCode = "invalid_argument"
// Entity not found
NotFound ErrorCode = "not_found"
// Duplicate creation attempted
AlreadyExists ErrorCode = "already_exists"
// Caller does not have required permissions
PermissionDenied ErrorCode = "permission_denied"
// Some resource has been exhausted, e.g. memory or quota
ResourceExhausted ErrorCode = "resource_exhausted"
// Inconsistent state to carry out request
FailedPrecondition ErrorCode = "failed_precondition"
// Request aborted
Aborted ErrorCode = "aborted"
// Operation was out of range
OutOfRange ErrorCode = "out_of_range"
// Operation is not implemented by the server
Unimplemented ErrorCode = "unimplemented"
// Operation failed due to an internal error
Internal ErrorCode = "internal"
// Irrecoverable loss or corruption of data
DataLoss ErrorCode = "data_loss"
// Similar to PermissionDenied, used when the caller is unidentified
Unauthenticated ErrorCode = "unauthenticated"
)
type MessageBus ¶
type MessageBus interface {
Publish(ctx context.Context, channel string, msg proto.Message) error
Subscribe(ctx context.Context, channel string, channelSize int) (subInternal, error)
SubscribeQueue(ctx context.Context, channel string, channelSize int) (subInternal, error)
}
func NewLocalMessageBus ¶ added in v0.2.2
func NewLocalMessageBus() MessageBus
func NewNatsMessageBus ¶
func NewNatsMessageBus(nc *nats.Conn) MessageBus
func NewRedisMessageBus ¶
func NewRedisMessageBus(rc redis.UniversalClient) MessageBus
type RPCClient ¶
type RPCClient struct {
// contains filtered or unexported fields
}
func NewRPCClient ¶
func NewRPCClient(serviceName, clientID string, bus MessageBus, opts ...ClientOption) (*RPCClient, error)
type RPCServer ¶
type RPCServer struct {
// contains filtered or unexported fields
}
func NewRPCServer ¶
func NewRPCServer(serviceName, serverID string, bus MessageBus, opts ...ServerOption) *RPCServer
func (*RPCServer) DeregisterHandler ¶
type RequestOption ¶ added in v0.2.0
type RequestOption func(*reqOpts)
func WithRequestTimeout ¶
func WithRequestTimeout(timeout time.Duration) RequestOption
func WithSelectionOpts ¶
func WithSelectionOpts(opts SelectionOpts) RequestOption
type SelectionOpts ¶
type SelectionOpts struct {
MinimumAffinity float32 // minimum affinity for a server to be considered a valid handler
AcceptFirstAvailable bool // go fast
AffinityTimeout time.Duration // server selection deadline
ShortCircuitTimeout time.Duration // deadline imposed after receiving first response
}
type ServerInterceptor ¶ added in v0.2.3
type ServerInterceptor func(ctx context.Context, req proto.Message, info RPCInfo, handler Handler) (proto.Message, error)
Server interceptors wrap the service implementation.
func WithServerRecovery ¶ added in v0.2.0
func WithServerRecovery() ServerInterceptor
WithServerRecovery recovers from server panics. It should always be the last interceptor.
type ServerOption ¶ added in v0.2.0
type ServerOption func(*serverOpts)
func WithServerChannelSize ¶ added in v0.2.0
func WithServerChannelSize(size int) ServerOption
func WithServerInterceptors ¶ added in v0.2.3
func WithServerInterceptors(interceptors ...ServerInterceptor) ServerOption
func WithServerTimeout ¶
func WithServerTimeout(timeout time.Duration) ServerOption
type Subscription ¶
type Subscription[MessageType proto.Message] interface {
Channel() <-chan MessageType
Close() error
}
func Subscribe ¶
func Subscribe[MessageType proto.Message]( ctx context.Context, bus MessageBus, channel string, channelSize int, ) (Subscription[MessageType], error)
func SubscribeQueue ¶
func SubscribeQueue[MessageType proto.Message]( ctx context.Context, bus MessageBus, channel string, channelSize int, ) (Subscription[MessageType], error)
Source Files
¶
Click to show internal directories.
Click to hide internal directories.