Documentation
¶
Index ¶
- Constants
- Variables
- func AppendMetadataToOutgoingContext(ctx context.Context, kv ...string) context.Context
- func NewContextWithIncomingHeader(ctx context.Context, head *Header) context.Context
- func NewContextWithOutgoingMetadata(ctx context.Context, md Metadata) context.Context
- func RegisterHandler[RequestType proto.Message, ResponseType proto.Message](s *RPCServer, rpc string, topic []string, ...) error
- func RegisterStreamHandler[RequestType proto.Message, ResponseType proto.Message](s *RPCServer, rpc string, topic []string, ...) error
- func RequestMulti[ResponseType proto.Message](ctx context.Context, c *RPCClient, rpc string, topic []string, ...) (rChan <-chan *Response[ResponseType], err error)
- func RequestSingle[ResponseType proto.Message](ctx context.Context, c *RPCClient, rpc string, topic []string, ...) (response ResponseType, err error)
- func SetLogger(l logr.Logger)
- type AffinityFunc
- type ClientOption
- func WithClientChannelSize(size int) ClientOption
- func WithClientMultiRPCInterceptors(interceptors ...MultiRPCInterceptorFactory) ClientOption
- func WithClientOptions(opts ...ClientOption) ClientOption
- func WithClientRPCInterceptors(interceptors ...RPCInterceptorFactory) ClientOption
- func WithClientRequestHooks(hooks ...ClientRequestHook) ClientOption
- func WithClientResponseHooks(hooks ...ClientResponseHook) ClientOption
- func WithClientStreamInterceptors(interceptors ...StreamInterceptorFactory) ClientOption
- func WithClientTimeout(timeout time.Duration) ClientOption
- type ClientRequestHook
- type ClientResponseHook
- type ClientStream
- type Error
- type ErrorCode
- type Handler
- type Header
- type MessageBus
- type Metadata
- type MultiRPCInterceptor
- type MultiRPCInterceptorFactory
- type RPCClient
- type RPCInfo
- type RPCInterceptor
- type RPCInterceptorFactory
- type RPCServer
- type Registerer
- type RegistererSlice
- type RequestOption
- type Response
- type SelectionOpts
- type ServerInterceptor
- type ServerOption
- func WithServerChannelSize(size int) ServerOption
- func WithServerInterceptors(interceptors ...ServerInterceptor) ServerOption
- func WithServerOptions(opts ...ServerOption) ServerOption
- func WithServerStreamInterceptors(interceptors ...StreamInterceptorFactory) ServerOption
- func WithServerTimeout(timeout time.Duration) ServerOption
- type ServerStream
- type Stream
- type StreamAffinityFunc
- type StreamInterceptor
- type StreamInterceptorFactory
- type StreamOption
- type Subscription
- func Join[ResponseType proto.Message](ctx context.Context, c *RPCClient, rpc string, topic []string) (Subscription[ResponseType], error)
- func JoinQueue[ResponseType proto.Message](ctx context.Context, c *RPCClient, rpc string, topic []string) (Subscription[ResponseType], error)
- func Subscribe[MessageType proto.Message](ctx context.Context, bus MessageBus, channel string, channelSize int) (Subscription[MessageType], error)
- func SubscribeQueue[MessageType proto.Message](ctx context.Context, bus MessageBus, channel string, channelSize int) (Subscription[MessageType], error)
Constants ¶
View Source
const (
DefaultChannelSize = 100
)
View Source
const (
DefaultClientTimeout = time.Second * 3
)
View Source
const (
DefaultServerTimeout = time.Second * 3
)
Variables ¶
View Source
var ( ErrRequestCanceled = NewError(Canceled, errors.New("request canceled")) ErrRequestTimedOut = NewError(DeadlineExceeded, errors.New("request timed out")) ErrNoResponse = NewError(Unavailable, errors.New("no response from servers")) ErrStreamEOF = NewError(Unavailable, io.EOF) ErrStreamClosed = NewError(Canceled, errors.New("stream closed")) ErrSlowConsumer = NewError(Unavailable, errors.New("stream message discarded by slow consumer")) )
Functions ¶
func AppendMetadataToOutgoingContext ¶ added in v0.2.8
func NewContextWithIncomingHeader ¶ added in v0.2.8
func NewContextWithOutgoingMetadata ¶ added in v0.2.8
func RegisterHandler ¶ added in v0.2.0
func RegisterStreamHandler ¶ added in v0.2.5
func RequestMulti ¶ added in v0.2.0
func RequestSingle ¶
Types ¶
type AffinityFunc ¶
type ClientOption ¶ added in v0.2.0
type ClientOption func(*clientOpts)
func WithClientChannelSize ¶ added in v0.2.0
func WithClientChannelSize(size int) ClientOption
func WithClientMultiRPCInterceptors ¶ added in v0.2.7
func WithClientMultiRPCInterceptors(interceptors ...MultiRPCInterceptorFactory) ClientOption
func WithClientOptions ¶ added in v0.2.10
func WithClientOptions(opts ...ClientOption) ClientOption
func WithClientRPCInterceptors ¶ added in v0.2.7
func WithClientRPCInterceptors(interceptors ...RPCInterceptorFactory) ClientOption
func WithClientRequestHooks ¶ added in v0.2.3
func WithClientRequestHooks(hooks ...ClientRequestHook) ClientOption
func WithClientResponseHooks ¶ added in v0.2.3
func WithClientResponseHooks(hooks ...ClientResponseHook) ClientOption
func WithClientStreamInterceptors ¶ added in v0.2.5
func WithClientStreamInterceptors(interceptors ...StreamInterceptorFactory) ClientOption
func WithClientTimeout ¶
func WithClientTimeout(timeout time.Duration) ClientOption
type ClientRequestHook ¶ added in v0.2.3
Request hooks are called as soon as the request is made
type ClientResponseHook ¶ added in v0.2.3
type ClientResponseHook func(ctx context.Context, req proto.Message, info RPCInfo, resp proto.Message, err error)
Response hooks are called just before responses are returned For multi-requests, response hooks are called on every response, and block while executing
type ClientStream ¶ added in v0.2.5
func OpenStream ¶ added in v0.2.5
type Error ¶ added in v0.2.3
type ErrorCode ¶ added in v0.2.3
type ErrorCode string
const ( OK ErrorCode = "" // Request Canceled by client Canceled ErrorCode = "canceled" // Could not unmarshal request MalformedRequest ErrorCode = "malformed_request" // Could not unmarshal result MalformedResponse ErrorCode = "malformed_result" // Request timed out DeadlineExceeded ErrorCode = "deadline_exceeded" Unavailable ErrorCode = "unavailable" // Unknown (server returned non-psrpc error) Unknown ErrorCode = "unknown" // Invalid argument in request InvalidArgument ErrorCode = "invalid_argument" // Entity not found NotFound ErrorCode = "not_found" // Duplicate creation attempted AlreadyExists ErrorCode = "already_exists" // Caller does not have required permissions PermissionDenied ErrorCode = "permission_denied" // Some resource has been exhausted, e.g. memory or quota ResourceExhausted ErrorCode = "resource_exhausted" // Inconsistent state to carry out request FailedPrecondition ErrorCode = "failed_precondition" // Request aborted Aborted ErrorCode = "aborted" // Operation was out of range OutOfRange ErrorCode = "out_of_range" // Operation is not implemented by the server Unimplemented ErrorCode = "unimplemented" // Operation failed due to an internal error Internal ErrorCode = "internal" // Irrecoverable loss or corruption of data DataLoss ErrorCode = "data_loss" // Similar to PermissionDenied, used when the caller is unidentified Unauthenticated ErrorCode = "unauthenticated" )
type Header ¶ added in v0.2.8
func IncomingHeader ¶ added in v0.2.8
type MessageBus ¶
type MessageBus interface {
Publish(ctx context.Context, channel string, msg proto.Message) error
Subscribe(ctx context.Context, channel string, channelSize int) (subInternal, error)
SubscribeQueue(ctx context.Context, channel string, channelSize int) (subInternal, error)
}
func NewLocalMessageBus ¶ added in v0.2.2
func NewLocalMessageBus() MessageBus
func NewNatsMessageBus ¶
func NewNatsMessageBus(nc *nats.Conn) MessageBus
func NewRedisMessageBus ¶
func NewRedisMessageBus(rc redis.UniversalClient) MessageBus
type Metadata ¶ added in v0.2.8
func OutgoingContextMetadata ¶ added in v0.2.8
type MultiRPCInterceptor ¶ added in v0.2.7
type MultiRPCInterceptorFactory ¶ added in v0.2.7
type MultiRPCInterceptorFactory func(info RPCInfo, next MultiRPCInterceptor) MultiRPCInterceptor
type RPCClient ¶
type RPCClient struct {
// contains filtered or unexported fields
}
func NewRPCClient ¶
func NewRPCClient(serviceName, clientID string, bus MessageBus, opts ...ClientOption) (*RPCClient, error)
func NewRPCClientWithStreams ¶ added in v0.2.5
func NewRPCClientWithStreams(serviceName, clientID string, bus MessageBus, opts ...ClientOption) (*RPCClient, error)
type RPCInterceptor ¶ added in v0.2.7
type RPCInterceptorFactory ¶ added in v0.2.7
type RPCInterceptorFactory func(info RPCInfo, next RPCInterceptor) RPCInterceptor
type RPCServer ¶
type RPCServer struct {
// contains filtered or unexported fields
}
func NewRPCServer ¶
func NewRPCServer(serviceName, serverID string, bus MessageBus, opts ...ServerOption) *RPCServer
func (*RPCServer) DeregisterHandler ¶
type Registerer ¶ added in v0.2.8
type Registerer struct {
// contains filtered or unexported fields
}
func NewRegisterer ¶ added in v0.2.8
func NewRegisterer(register, deregister any) Registerer
type RegistererSlice ¶ added in v0.2.8
type RegistererSlice []Registerer
func (RegistererSlice) Deregister ¶ added in v0.2.8
func (rs RegistererSlice) Deregister(params ...any)
func (RegistererSlice) Register ¶ added in v0.2.8
func (rs RegistererSlice) Register(params ...any) error
type RequestOption ¶ added in v0.2.0
type RequestOption func(*reqOpts)
func WithRequestTimeout ¶
func WithRequestTimeout(timeout time.Duration) RequestOption
func WithSelectionOpts ¶
func WithSelectionOpts(opts SelectionOpts) RequestOption
type SelectionOpts ¶
type SelectionOpts struct {
MinimumAffinity float32 // minimum affinity for a server to be considered a valid handler
AcceptFirstAvailable bool // go fast
AffinityTimeout time.Duration // server selection deadline
ShortCircuitTimeout time.Duration // deadline imposed after receiving first response
}
type ServerInterceptor ¶ added in v0.2.3
type ServerInterceptor func(ctx context.Context, req proto.Message, info RPCInfo, handler Handler) (proto.Message, error)
Server interceptors wrap the service implementation
func WithServerRecovery ¶ added in v0.2.0
func WithServerRecovery() ServerInterceptor
Recover from server panics. Should always be the last interceptor
type ServerOption ¶ added in v0.2.0
type ServerOption func(*serverOpts)
func WithServerChannelSize ¶ added in v0.2.0
func WithServerChannelSize(size int) ServerOption
func WithServerInterceptors ¶ added in v0.2.3
func WithServerInterceptors(interceptors ...ServerInterceptor) ServerOption
func WithServerOptions ¶ added in v0.2.10
func WithServerOptions(opts ...ServerOption) ServerOption
func WithServerStreamInterceptors ¶ added in v0.2.5
func WithServerStreamInterceptors(interceptors ...StreamInterceptorFactory) ServerOption
func WithServerTimeout ¶
func WithServerTimeout(timeout time.Duration) ServerOption
type ServerStream ¶ added in v0.2.5
type StreamAffinityFunc ¶ added in v0.2.5
type StreamAffinityFunc func() float32
type StreamInterceptor ¶ added in v0.2.5
type StreamInterceptorFactory ¶ added in v0.2.7
type StreamInterceptorFactory func(info RPCInfo, next StreamInterceptor) StreamInterceptor
type StreamOption ¶ added in v0.2.5
type StreamOption func(*streamOpts)
func WithTimeout ¶ added in v0.2.5
func WithTimeout(timeout time.Duration) StreamOption
type Subscription ¶
type Subscription[MessageType proto.Message] interface { Channel() <-chan MessageType Close() error }
func Subscribe ¶
func Subscribe[MessageType proto.Message]( ctx context.Context, bus MessageBus, channel string, channelSize int, ) (Subscription[MessageType], error)
func SubscribeQueue ¶
func SubscribeQueue[MessageType proto.Message]( ctx context.Context, bus MessageBus, channel string, channelSize int, ) (Subscription[MessageType], error)
Source Files
¶
Click to show internal directories.
Click to hide internal directories.