Documentation
¶
Index ¶
- Constants
- Variables
- func Decode(rawResp *Message, into any) error
- func EncodeMimetype(payload any, mimeType string) ([]byte, error)
- func NatsConnect(url string, options ...NatsOption) (*nats.Conn, error)
- func NatsDefaultServerOptions() *server.Options
- func NatsEmbeddedServer(opts *server.Options, startTimeout time.Duration) (*server.Server, error)
- func NewInbox() string
- type AnyServerHandler
- type Bus
- type Header
- type LatticeRequest
- type Message
- type NatsBus
- func (c *NatsBus) Publish(msg *Message) error
- func (c *NatsBus) QueueSubscribe(subject string, queue string, backlog int) (Subscription, error)
- func (c *NatsBus) Request(ctx context.Context, msg *Message) (*Message, error)
- func (c *NatsBus) Subscribe(subject string, backlog int) (Subscription, error)
- type NatsOption
- type NatsSubscription
- type RequestHandler
- type Server
- type ServerError
- type ServerHandlerFunc
- type Subscription
- type SubscriptionCallback
Constants ¶
const ( // PatternAll is a wildcard pattern that matches all subjects. PatternAll = "*" // NoBackLog is used to indicate that the subscription should not have a backlog. NoBackLog = 0 )
const NatsDefaultURL = nats.DefaultURL
Variables ¶
var ( // ErrEncode is returned when encoding a message fails. ErrEncode = errors.New("encode error") // ErrInternal is returned when an internal error occurs. ErrInternal = errors.New("internal error") // ErrDecode is returned when decoding a message fails. ErrDecode = errors.New("decode error") // ErrTransport is returned when a transport error occurs. ErrTransport = errors.New("transport error") // ErrOperation is returned when an operation error occurs. ErrOperation = errors.New("operation error") // ErrValidation is returned when a validation error occurs. ErrValidation = errors.New("validation error") )
Functions ¶
func Decode ¶
Decode unmarshals the raw response data into the provided struct. The content type is used to determine the unmarshaling format. If the content type is not supported, an error is returned. Acceptable content types are "application/json" and "application/yaml".
func NatsConnect ¶
func NatsConnect(url string, options ...NatsOption) (*nats.Conn, error)
NatsConnect connects to a NATS server at the given URL. The URL should be in the form of "nats://host:port". This helper function sets some default options and calls `nats.Connect`.
func NatsEmbeddedServer ¶
Types ¶
type AnyServerHandler ¶
AnyServerHandler is an interface that can be implemented by any handler that can be registered with a server. Primary implementations are `RequestHandler` and `ServerHandlerFunc`.
type Bus ¶
type Bus interface {
// Subscribe creates a subscription for the given subject.
// The backlog parameter is the maximum number of messages that can be buffered in memory.
Subscribe(subject string, backlog int) (Subscription, error)
// QueueSubscribe creates a subscription for the given subject and queue group.
// The backlog parameter is the maximum number of messages that can be buffered in memory.
QueueSubscribe(subject string, queue string, backlog int) (Subscription, error)
// Request sends a request message and waits for a reply.
// The context is used for the request timeout.
Request(ctx context.Context, msg *Message) (*Message, error)
// Publish sends a message to `msg.Subject`.
Publish(msg *Message) error
}
Bus is the interface for the message bus. It provides methods for subscribing to messages and sending messages. It doesn't hold any state and is safe for concurrent use. See `Subscription` for stateful operations. Modeled after the NATS API.
type LatticeRequest ¶
type LatticeRequest[T any, Y any] struct { // Request should be a reference to the request struct Request T // Response should be a struct that the response will be unmarshaled into // and should be passed by value Response Y Subject string Bus Bus PreRequest func(context.Context, T, *Message) error PostRequest func(context.Context, *Y, *Message) error }
LatticeRequest encodes the Roundtrip logic for a Bus Request This is a generic implementation that can be used with any Bus and any Request/Response pair. Requests are encoded in json and Responses can be either json or yaml. Use Pre & Post Request hooks to modify the request/response before/after. The `T` and `Y` types are used to define the Request and Response types. `T` should be passed by reference (pointer) and `Y` by value (struct).
func NewLatticeRequest ¶
func NewLatticeRequest[T any, Y any](bus Bus, subject string, in T, out Y) *LatticeRequest[T, Y]
NewLatticeRequest returns a `LatticeRequest` for a given type. The `T` and `Y` types are used to define the Request and Response types. `T` should be passed by reference (pointer) and `Y` by value (struct).
func (*LatticeRequest[T, Y]) Execute ¶
func (l *LatticeRequest[T, Y]) Execute(ctx context.Context) (*Y, error)
Execute sends the request to the bus and returns the response.
type Message ¶
type Message struct {
Subject string
Reply string
Header Header
Data []byte
// contains filtered or unexported fields
}
Message is the message type for the message bus. Modeled after the NATS message type.
func Encode ¶
Encode marshals the payload into a Message. The payload is encoded as json.
func NewMessage ¶
NewMessage creates a new message with the given subject.
func (*Message) Bus ¶
Bus returns the bus that the message was received on or sent to Might be null.
func (*Message) LastSubjectPart ¶
LastSubjectPart returns the last part of the subject. Example: "a.b.c" -> "c"
type NatsBus ¶
type NatsBus struct {
// contains filtered or unexported fields
}
NatsBus is a Bus implementation that uses NATS as the transport.
func NewNatsBus ¶
NewNatsBus creates a new NATS bus using the given NATS connection.
func (*NatsBus) Publish ¶
Publish implements `Bus.Publish` for NAT
func (*NatsBus) QueueSubscribe ¶
QueueSubscribe implements `Bus.QueueSubscribe` for NATS.
func (*NatsBus) Request ¶
Request implements `Bus.Request` for NATS.
type NatsOption ¶
NatsOption is an option for configuring a NATS connection.
type NatsSubscription ¶
type NatsSubscription struct {
// contains filtered or unexported fields
}
NatsSubscription is a Subscription implementation for NATS.
func (*NatsSubscription) Drain ¶
func (s *NatsSubscription) Drain() error
Drain implements `Subscription.Drain` for NATS.
func (*NatsSubscription) Handle ¶
func (s *NatsSubscription) Handle(callback SubscriptionCallback)
Handle implements `Subscription.Handle` for NATS.
type RequestHandler ¶
type RequestHandler[T any, Y any] struct { Request T Response Y PreRequest func(context.Context, *T, *Message) error PostRequest func(context.Context, *Y, *Message) error Handler func(context.Context, *T) (*Y, error) }
RequestHandler is a generic handler that can be used to implement a server handler. It encodes the logic for handling a message and sending a response.
func NewRequestHandler ¶
func NewRequestHandler[T any, Y any]( req T, resp Y, handler func(context.Context, *T) (*Y, error), ) *RequestHandler[T, Y]
NewRequestHandler returns a new server handler instance. The `T` and `Y` types are used to define the Request and Response types. Both should be structs. They will be used as template for request/responses.
type Server ¶
type Server struct {
Bus
// Lattice is an informative field containing the lattice name.
// It is NOT used when manipulating subjects.
Lattice string
// ContextFunc is a function that returns a new context for each message.
// Defaults to `context.Background`.
ContextFunc func() context.Context
// contains filtered or unexported fields
}
Server is a higher-level abstraction that can be used to register handlers for specific subjects. See `AnyServerHandler` for more information.
func NewServer ¶
NewServer returns a new server instance.
func (*Server) Drain ¶
Drain walks through all subscriptions and drains them. It also closes the error stream. This is a blocking operation.
func (*Server) ErrorStream ¶
func (s *Server) ErrorStream() <-chan *ServerError
ErrorStream returns a channel that can be used to listen for Transport / Encoding level errors. See `ServerError` for more information.
func (*Server) RegisterHandler ¶
func (s *Server) RegisterHandler(subject string, handler AnyServerHandler) error
RegisterHandler registers a handler for a given subject. Each handler gets their channel subscription with no backlog, and their own goroutine for queue consumption. Callers should handle concurrency and synchronization themselves.
type ServerError ¶
ServerError carries information about transport & encoding errors outside Request/Response scope.
type ServerHandlerFunc ¶
ServerHandlerFunc is a function type that can be used to implement a server handler from a function.
type Subscription ¶
type Subscription interface {
// Handle starts the subscription and calls the callback for each message.
// Does not block.
Handle(callback SubscriptionCallback)
// Drain stops the subscription and closes the channel.
// Blocks until all messages are processed, releasing the Subscription Thread.
Drain() error
}
Subscription is the interface for a message subscription. It provides methods for handling messages and draining the subscription. Subscriptions run in their own goroutine.
Source Files
¶
- bus.go
- client.go
- nats.go
- server.go