Documentation ¶
Overview ¶
Package nats implements the CloudEvents transport using NATS.
Index ¶
- Variables
- func NatsOptions(opts ...nats.Option) []nats.Option
- func WriteMsg(ctx context.Context, m binding.Message, writer io.ReaderFrom, ...) error
- type Consumer
- type ConsumerOption
- type Message
- type Protocol
- type ProtocolOption
- type QueueSubscriber
- type Receiver
- type RegularSubscriber
- type Sender
- type SenderOption
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber")
Functions ¶
func NatsOptions ¶
NatsOptions is a helper function that groups variadic nats.Option values into a []nats.Option that can be used by Sender, Consumer, or Protocol.
Types ¶
type Consumer ¶
type Consumer struct {
Receiver
Conn *nats.Conn
Subject string
Subscriber Subscriber
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumerFromConn ¶
type ConsumerOption ¶
func WithQueueSubscriber ¶
func WithQueueSubscriber(queue string) ConsumerOption
WithQueueSubscriber configures the Consumer to join a queue group when subscribing
type Message ¶
Message implements binding.Message by wrapping a *nats.Msg. This message *can* be read several times safely.
func NewMessage ¶
NewMessage wraps a *nats.Msg in a binding.Message. The returned message *can* be read several times safely.
func (*Message) ReadBinary ¶
func (*Message) ReadEncoding ¶
func (*Message) ReadStructured ¶
type Protocol ¶
type Protocol struct {
Conn *nats.Conn
Consumer *Consumer
Sender *Sender
// contains filtered or unexported fields
}
Protocol is a reference implementation for using the CloudEvents binding integration. Protocol acts as both a NATS client and a NATS handler.
func NewProtocol ¶
func NewProtocol(url, sendSubject, receiveSubject string, natsOpts []nats.Option, opts ...ProtocolOption) (*Protocol, error)
NewProtocol creates a new NATS protocol.
func NewProtocolFromConn ¶
type ProtocolOption ¶
ProtocolOption is the function signature that an option passed to NewProtocol must have.
func WithConsumerOptions ¶
func WithConsumerOptions(opts ...ConsumerOption) ProtocolOption
func WithSenderOptions ¶
func WithSenderOptions(opts ...SenderOption) ProtocolOption
type QueueSubscriber ¶
type QueueSubscriber struct {
Queue string
}
QueueSubscriber creates queue subscriptions
func (*QueueSubscriber) Subscribe ¶
func (s *QueueSubscriber) Subscribe(conn *nats.Conn, subject string, cb nats.MsgHandler) (*nats.Subscription, error)
Subscribe implements Subscriber.Subscribe
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
func NewReceiver ¶
func NewReceiver() *Receiver
func (*Receiver) MsgHandler ¶
MsgHandler implements nats.MsgHandler and publishes messages onto our internal incoming channel to be delivered via r.Receive(ctx)
type RegularSubscriber ¶
type RegularSubscriber struct {
}
RegularSubscriber creates regular subscriptions
func (*RegularSubscriber) Subscribe ¶
func (s *RegularSubscriber) Subscribe(conn *nats.Conn, subject string, cb nats.MsgHandler) (*nats.Subscription, error)
Subscribe implements Subscriber.Subscribe
type Sender ¶
func NewSender ¶
NewSender creates a new protocol.Sender responsible for opening and closing the NATS connection.
func NewSenderFromConn ¶
NewSenderFromConn creates a new protocol.Sender which leaves responsibility for opening and closing the NATS connection to the caller.
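The difference between the two constructors is who owns the connection. A minimal sketch of that ownership rule, using a hypothetical `conn` stand-in for *nats.Conn and an assumed `ownConn` flag (the real Sender's internals are not shown in this documentation):

```go
package main

import "fmt"

// conn is a hypothetical stand-in for *nats.Conn.
type conn struct{ closed bool }

func (c *conn) Close() { c.closed = true }

// sender records whether it created the connection itself.
type sender struct {
	conn    *conn
	ownConn bool // assumed field name, for illustration only
}

// newSender opens its own connection and therefore owns it.
func newSender() *sender {
	return &sender{conn: &conn{}, ownConn: true}
}

// newSenderFromConn borrows a connection that the caller manages.
func newSenderFromConn(c *conn) *sender {
	return &sender{conn: c}
}

// Close only closes the connection when the sender owns it.
func (s *sender) Close() {
	if s.ownConn {
		s.conn.Close()
	}
}

func main() {
	owned := newSender()
	owned.Close()
	fmt.Println("owned connection closed:", owned.conn.closed)

	shared := &conn{}
	borrowed := newSenderFromConn(shared)
	borrowed.Close()
	fmt.Println("shared connection closed:", shared.closed)
}
```

With a borrowed connection the caller remains responsible for closing it, which matters when one connection is shared by a sender and a consumer.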
type SenderOption ¶
type Subscriber ¶
type Subscriber interface {
Subscribe(conn *nats.Conn, subject string, cb nats.MsgHandler) (*nats.Subscription, error)
}
The Subscriber interface allows callers to configure how the subscription is created.
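The relationship between Subscriber, RegularSubscriber, and QueueSubscriber is a strategy pattern: the consumer calls one interface method and the chosen implementation decides which kind of subscription to create. The sketch below models this with a hypothetical `conn` stand-in whose methods only return a description string. The real interface takes a *nats.Conn and a nats.MsgHandler and returns a *nats.Subscription, and the delegation to the client's regular and queue subscribe calls is an assumption.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for *nats.Conn. Its methods describe
// the subscription they would create instead of contacting a server.
type conn struct{}

func (c *conn) Subscribe(subject string) string {
	return "regular:" + subject
}

func (c *conn) QueueSubscribe(subject, queue string) string {
	return "queue(" + queue + "):" + subject
}

// subscriber is a simplified form of the Subscriber interface.
type subscriber interface {
	Subscribe(c *conn, subject string) string
}

// regularSubscriber creates regular subscriptions.
type regularSubscriber struct{}

func (s *regularSubscriber) Subscribe(c *conn, subject string) string {
	return c.Subscribe(subject)
}

// queueSubscriber creates queue subscriptions for its Queue group.
type queueSubscriber struct{ Queue string }

func (s *queueSubscriber) Subscribe(c *conn, subject string) string {
	return c.QueueSubscribe(subject, s.Queue)
}

func main() {
	c := &conn{}
	strategies := []subscriber{
		&regularSubscriber{},
		&queueSubscriber{Queue: "workers"},
	}
	// The calling code is identical for both strategies.
	for _, s := range strategies {
		fmt.Println(s.Subscribe(c, "orders"))
	}
}
```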