Documentation ¶
Overview ¶
Package zrpc implements protobuf RPC over NATS.
Unary calls use NATS request-reply. Streaming calls open a session with inbox subjects and exchange framed messages (open/ack/data/end/error).
This package does not emit logs or metrics itself. Pass lava.Middleware values (typically accesslog, metric, and recovery) to Client or Server so that each request is recorded with its request_id, subject, latency, and errors.
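The frame kinds listed above suggest a small session state machine. The following is a minimal sketch of one plausible ordering, seen from a single direction of the stream: open, then ack, then any number of data frames, then end, with error allowed at any point after open. That ordering rule, the string values, and the checkSession helper are assumptions made for illustration; the wire format and the real session logic are not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
)

// Frame kinds named in the overview. The string values are illustrative only.
const (
	frameOpen  = "open"
	frameAck   = "ack"
	frameData  = "data"
	frameEnd   = "end"
	frameError = "error"
)

// checkSession (hypothetical) validates a frame sequence against the assumed
// rule: open, ack, zero or more data frames, then end. An error frame may
// terminate the session at any point after open.
func checkSession(frames []string) error {
	state := "start"
	for i, f := range frames {
		switch {
		case state == "start" && f == frameOpen:
			state = "opened"
		case state == "opened" && f == frameAck:
			state = "active"
		case state == "active" && f == frameData:
			// Data frames keep the session active.
		case state == "active" && f == frameEnd,
			state != "start" && state != "closed" && f == frameError:
			state = "closed"
		default:
			return fmt.Errorf("frame %d: unexpected %q in state %q", i, f, state)
		}
	}
	if state != "closed" {
		return errors.New("session did not terminate with end or error")
	}
	return nil
}

func main() {
	good := []string{frameOpen, frameAck, frameData, frameData, frameEnd}
	bad := []string{frameOpen, frameData, frameEnd} // data before ack

	fmt.Println("good:", checkSession(good))
	fmt.Println("bad: ", checkSession(bad))
}
```

Modeling the session as explicit states makes out-of-order frames easy to reject early, which is one reason a framed protocol usually acknowledges the open before any data flows.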
Index ¶
- Constants
- func ErrorFromMessage(msg *nats.Msg) error
- func Errorf(code Code, format string, args ...any) error
- func HandleStream(msg *nats.Msg, nc *nats.Conn, subject string, middlewares []lava.Middleware, ...)
- func HandleUnary[Req, Resp proto.Message](msg *nats.Msg, subject string, middlewares []lava.Middleware, ...)
- func RegisterService(srv *Server, svc *ServiceDescriptor, queue string) error
- func RegisterStream(s *Server, subject, queue string, ...) error
- func RegisterUnary[Req, Resp proto.Message](s *Server, subject, queue string, newReq func() Req, ...) error
- func ReplyError(msg *nats.Msg, code Code, text string)
- func ReplyErrorWithHeader(msg *nats.Msg, header nats.Header, code Code, text string)
- type Client
- type ClientStream
- type Code
- type Route
- type Server
- type ServerStream
- type ServiceDescriptor
- type Status
Constants ¶
const (
	HeaderStatusCode    = "Zrpc-Status-Code"
	HeaderStatusMessage = "Zrpc-Status-Message"
)
const (
	Name               = "zrpc"
	DefaultContentType = "application/protobuf"
	HeaderTimeout      = "Timeout" // propagated to server; also used for stream session deadline
	MethodNATS         = "NATS"
	HeaderStream       = "Zrpc-Stream"
	HeaderStreamFrame  = "Zrpc-Stream-Frame"
	HeaderStreamReqSub = "Zrpc-Stream-Req-Subject"
)
Functions ¶
func ErrorFromMessage ¶
func ErrorFromMessage(msg *nats.Msg) error
ErrorFromMessage converts a zrpc reply message to a typed error when needed.
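To make the conversion concrete, here is a minimal sketch of how the two status headers could be mapped to a typed error. It is an illustration, not the package's code: header stands in for nats.Header, statusError, errorFromHeader, and asStatus are hypothetical names, and the sketch assumes the code is a decimal integer with 0 (or a missing header) meaning success.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// header stands in for nats.Header, which has the same underlying type.
type header map[string][]string

// These mirror the HeaderStatusCode and HeaderStatusMessage constants.
const (
	headerStatusCode    = "Zrpc-Status-Code"
	headerStatusMessage = "Zrpc-Status-Message"
)

// statusError is a hypothetical typed error carrying a numeric status code.
type statusError struct {
	Code    int
	Message string
}

func (e *statusError) Error() string {
	return fmt.Sprintf("zrpc status %d: %s", e.Code, e.Message)
}

// first returns the first value stored under key, or "" if there is none.
func first(h header, key string) string {
	if v := h[key]; len(v) > 0 {
		return v[0]
	}
	return ""
}

// errorFromHeader returns nil when the reply carries no status code or a
// code of 0 (assumed here to mean success), and a *statusError otherwise.
func errorFromHeader(h header) error {
	raw := first(h, headerStatusCode)
	if raw == "" {
		return nil
	}
	code, err := strconv.Atoi(raw)
	if err != nil {
		return fmt.Errorf("malformed %s header %q: %w", headerStatusCode, raw, err)
	}
	if code == 0 {
		return nil
	}
	return &statusError{Code: code, Message: first(h, headerStatusMessage)}
}

// asStatus extracts a *statusError from err, even if err wraps it.
func asStatus(err error) (*statusError, bool) {
	var se *statusError
	ok := errors.As(err, &se)
	return se, ok
}

func main() {
	reply := header{
		headerStatusCode:    {"5"},
		headerStatusMessage: {"user not found"},
	}
	if se, ok := asStatus(errorFromHeader(reply)); ok {
		fmt.Println("code:", se.Code, "message:", se.Message)
	}
	fmt.Println("plain reply:", errorFromHeader(header{}))
}
```

Returning a typed error lets callers branch on the code with errors.As rather than matching on message text.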
func HandleStream ¶
func HandleStream(
	msg *nats.Msg,
	nc *nats.Conn,
	subject string,
	middlewares []lava.Middleware,
	handler func(context.Context, *ServerStream) error,
)
HandleStream handles one streaming session.
func HandleUnary ¶
func HandleUnary[Req, Resp proto.Message](
	msg *nats.Msg,
	subject string,
	middlewares []lava.Middleware,
	newReq func() Req,
	handler func(context.Context, Req) (Resp, error),
)
HandleUnary decodes a request, applies middlewares, calls the handler, and responds.
func RegisterService ¶
func RegisterService(srv *Server, svc *ServiceDescriptor, queue string) error
RegisterService binds all routes for svc on srv.
func RegisterStream ¶
func RegisterStream(
	s *Server,
	subject, queue string,
	handler func(context.Context, *ServerStream) error,
) error
RegisterStream binds one zrpc streaming method on subject + queue group.
func RegisterUnary ¶
func RegisterUnary[Req, Resp proto.Message](
	s *Server,
	subject, queue string,
	newReq func() Req,
	handler func(context.Context, Req) (Resp, error),
) error
RegisterUnary binds one protobuf unary method on subject + queue group.
func ReplyError ¶
func ReplyError(msg *nats.Msg, code Code, text string)
ReplyError sends a zrpc error reply on NATS. Server handlers use it when decoding or validation fails before middleware runs.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client sends zrpc unary and streaming requests through NATS. Request logging is handled by lava.Middleware attached at construction time.
func NewClient ¶
func NewClient(nc *nats.Conn, middlewares ...lava.Middleware) *Client
NewClient creates a zrpc client bound to a NATS connection.
func (*Client) CallUnary ¶
func (c *Client) CallUnary(ctx context.Context, subject string, timeout time.Duration, req, resp proto.Message) error
CallUnary sends a unary protobuf request and decodes the protobuf response.
func (*Client) OpenStream ¶
func (c *Client) OpenStream(ctx context.Context, subject string, timeout time.Duration) (*ClientStream, error)
OpenStream opens a bidirectional stream bound to a zrpc subject.
type ClientStream ¶
type ClientStream struct {
// contains filtered or unexported fields
}
ClientStream is a bidirectional zrpc message stream.
func (*ClientStream) Close ¶
func (s *ClientStream) Close() error
Close closes the stream and releases subscriptions.
func (*ClientStream) CloseSend ¶
func (s *ClientStream) CloseSend() error
CloseSend closes the request side of the stream.
type Code ¶
type Code int
Code is a zrpc status code.
func StatusFromError ¶
StatusFromError maps handler errors to a zrpc code.
type Route ¶
type Route struct {
	Subject string
	Queue   string
	// Register is called by generated code to bind this route on srv.
	Register func(srv *Server, queue string) error
}
Route describes one unary NATS binding.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server registers zrpc unary and streaming handlers on NATS queue subscriptions. Request logging is handled by lava.Middleware attached at construction time.
type ServerStream ¶
type ServerStream struct {
// contains filtered or unexported fields
}
ServerStream is a bidirectional stream used by zrpc streaming handlers.
func (*ServerStream) CloseSend ¶
func (s *ServerStream) CloseSend() error
CloseSend closes the response side of the stream.
type ServiceDescriptor ¶
ServiceDescriptor groups routes for one protobuf service.