zrpc

Version: v2.1.0
Published: Jun 17, 2026 | License: Apache-2.0 | Imports: 13 | Imported by: 0

Note: This package is not in the latest version of its module.

Documentation

Overview

Package zrpc implements protobuf RPC over NATS.

Unary calls use NATS request-reply. Streaming calls open a session with inbox subjects and exchange framed messages (open/ack/data/end/error).

This package does not emit logs or metrics itself. Pass lava.Middleware values (typically accesslog, metric, and recovery) to Client or Server so that each request is recorded with its request_id, subject, latency, and any error.

Constants

const (
	HeaderStatusCode    = "Zrpc-Status-Code"
	HeaderStatusMessage = "Zrpc-Status-Message"
)
const (
	Name               = "zrpc"
	DefaultContentType = "application/protobuf"
	HeaderTimeout      = "Timeout" // propagated to server; also used for stream session deadline
	MethodNATS         = "NATS"
	HeaderStream       = "Zrpc-Stream"
	HeaderStreamFrame  = "Zrpc-Stream-Frame"
	HeaderStreamReqSub = "Zrpc-Stream-Req-Subject"
)

Variables

This section is empty.

Functions

func ErrorFromMessage

func ErrorFromMessage(msg *nats.Msg) error

ErrorFromMessage converts a zrpc reply message into a typed error when the reply carries an error status.

func Errorf

func Errorf(code Code, format string, args ...any) error

Errorf builds a *Status for handler return values.

func HandleStream

func HandleStream(
	msg *nats.Msg,
	nc *nats.Conn,
	subject string,
	middlewares []lava.Middleware,
	handler func(context.Context, *ServerStream) error,
)

HandleStream handles one streaming session.

func HandleUnary

func HandleUnary[Req, Resp proto.Message](
	msg *nats.Msg,
	subject string,
	middlewares []lava.Middleware,
	newReq func() Req,
	handler func(context.Context, Req) (Resp, error),
)

HandleUnary decodes a request, applies middlewares, calls the handler, and responds.

func RegisterService

func RegisterService(srv *Server, svc *ServiceDescriptor, queue string) error

RegisterService binds all routes for svc on srv.

func RegisterStream

func RegisterStream(
	s *Server,
	subject, queue string,
	handler func(context.Context, *ServerStream) error,
) error

RegisterStream binds one zrpc streaming method on the given subject and queue group.

func RegisterUnary

func RegisterUnary[Req, Resp proto.Message](
	s *Server,
	subject, queue string,
	newReq func() Req,
	handler func(context.Context, Req) (Resp, error),
) error

RegisterUnary binds one protobuf unary method on the given subject and queue group.

func ReplyError

func ReplyError(msg *nats.Msg, code Code, text string)

ReplyError sends a zrpc error reply on NATS. Server handlers use it when decoding or validation fails before middleware runs.

func ReplyErrorWithHeader

func ReplyErrorWithHeader(msg *nats.Msg, header nats.Header, code Code, text string)

ReplyErrorWithHeader sends a zrpc error reply on NATS and preserves custom headers.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client sends zrpc unary and streaming requests through NATS. Request logging is handled by lava.Middleware attached at construction time.

func NewClient

func NewClient(nc *nats.Conn, middlewares ...lava.Middleware) *Client

NewClient creates a zrpc client bound to a NATS connection.

func (*Client) CallUnary

func (c *Client) CallUnary(ctx context.Context, subject string, timeout time.Duration, req, resp proto.Message) error

CallUnary sends a unary protobuf request and decodes the protobuf response.

func (*Client) Conn

func (c *Client) Conn() *nats.Conn

Conn returns the underlying NATS connection.

func (*Client) OpenStream

func (c *Client) OpenStream(ctx context.Context, subject string, timeout time.Duration) (*ClientStream, error)

OpenStream opens a bidirectional stream bound to a zrpc subject.

type ClientStream

type ClientStream struct {
	// contains filtered or unexported fields
}

ClientStream is a bidirectional zrpc message stream.

func (*ClientStream) Close

func (s *ClientStream) Close() error

Close closes the stream and releases subscriptions.

func (*ClientStream) CloseSend

func (s *ClientStream) CloseSend() error

CloseSend closes the request side of the stream.

func (*ClientStream) Recv

func (s *ClientStream) Recv(resp proto.Message) error

Recv receives one protobuf message from the response stream.

func (*ClientStream) Send

func (s *ClientStream) Send(msg proto.Message) error

Send sends one protobuf message into the request stream.

type Code

type Code int

Code is a zrpc status code.

const (
	CodeOK              Code = 0
	CodeInvalidArgument Code = 3
	CodeInternal        Code = 13
)

func StatusFromError

func StatusFromError(err error) (Code, string)

StatusFromError maps a handler error to a zrpc code and message.

func StatusFromMessage

func StatusFromMessage(msg *nats.Msg) (Code, string)

StatusFromMessage extracts the status code and message embedded in a zrpc reply message.
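The status presumably travels in the HeaderStatusCode and HeaderStatusMessage reply headers. The sketch below shows one way to read them, under stated assumptions: the code is encoded as a decimal integer, a missing code header means CodeOK, and an unparsable code is reported as CodeInternal. The `header` type stands in for nats.Header (which has the same underlying type, map[string][]string), and `statusFromHeader` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
)

// Header names as listed in the Constants section.
const (
	HeaderStatusCode    = "Zrpc-Status-Code"
	HeaderStatusMessage = "Zrpc-Status-Message"
)

type Code int

const (
	CodeOK              Code = 0
	CodeInvalidArgument Code = 3
	CodeInternal        Code = 13
)

// header is a local stand-in for nats.Header.
type header map[string][]string

// get returns the first value stored under key, or "" if there is none.
func (h header) get(key string) string {
	if v := h[key]; len(v) > 0 {
		return v[0]
	}
	return ""
}

// statusFromHeader reads the status code and message from reply headers.
// The decimal encoding and the fallback rules are assumptions.
func statusFromHeader(h header) (Code, string) {
	raw := h.get(HeaderStatusCode)
	if raw == "" {
		return CodeOK, "" // no status header: treat the reply as successful
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return CodeInternal, fmt.Sprintf("invalid status code %q", raw)
	}
	return Code(n), h.get(HeaderStatusMessage)
}

func main() {
	h := header{
		HeaderStatusCode:    {"3"},
		HeaderStatusMessage: {"bad id"},
	}
	fmt.Println(statusFromHeader(h))
	fmt.Println(statusFromHeader(header{}))
}
```

Keeping the status in headers leaves the message payload free to carry only the protobuf response body.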

type Route

type Route struct {
	Subject string
	Queue   string
	// Register is called by generated code to bind this route on srv.
	Register func(srv *Server, queue string) error
}

Route describes one unary NATS binding.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server registers zrpc unary and streaming handlers on NATS queue subscriptions. Request logging is handled by lava.Middleware attached at construction time.

func NewServer

func NewServer(nc *nats.Conn, middlewares ...lava.Middleware) *Server

NewServer creates a server bound to a NATS connection.

func (*Server) Close

func (s *Server) Close()

Close unsubscribes all registered handlers.

func (*Server) Conn

func (s *Server) Conn() *nats.Conn

Conn returns the underlying NATS connection.

type ServerStream

type ServerStream struct {
	// contains filtered or unexported fields
}

ServerStream is a bidirectional stream used by zrpc streaming handlers.

func (*ServerStream) Close

func (s *ServerStream) Close() error

Close closes the stream and releases its resources.

func (*ServerStream) CloseSend

func (s *ServerStream) CloseSend() error

CloseSend closes the response side of the stream.

func (*ServerStream) Recv

func (s *ServerStream) Recv(req proto.Message) error

Recv receives one protobuf message from the request stream.

func (*ServerStream) Send

func (s *ServerStream) Send(resp proto.Message) error

Send sends one protobuf message into the response stream.

type ServiceDescriptor

type ServiceDescriptor struct {
	ServicePath  string
	DefaultQueue string
	Routes       []Route
}

ServiceDescriptor groups routes for one protobuf service.

type Status

type Status struct {
	Code    Code
	Message string
}

Status is an application error returned from handlers.

func (*Status) Error

func (s *Status) Error() string
