Documentation
¶
Index ¶
- type AckIndication
- type AckSeqNum
- type AlertOnErrorCB
- type ForwardMessageHandlerCB
- type JetStreamACKBroadcaster
- type JetStreamACKReceiver
- type JetStreamAckHandler
- type JetStreamInflightMsgProcessor
- type JetStreamPublisher
- type JetStreamPushSubscriber
- type MessageDispatcher
- type MsgToDeliver
- type MsgToDeliverSeq
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AckIndication ¶
type AckIndication struct {
// Stream is the name of the stream.
// The validate tag marks it required and restricts it to an alphanumeric
// string or a UUID.
Stream string `json:"stream" validate:"required,alphanum|uuid"`
// Consumer is the name of the consumer.
// The validate tag marks it required and restricts it to an alphanumeric
// string or a UUID.
Consumer string `json:"consumer" validate:"required,alphanum|uuid"`
// SeqNum holds the stream and consumer sequence numbers of the JetStream
// message (see AckSeqNum).
// NOTE(review): `dive` is normally used to validate the elements of a slice
// or map; confirm it has the intended effect on a struct-typed field.
SeqNum AckSeqNum `json:"seq_num" validate:"required,dive"`
}
AckIndication is the ACK of a NATS JetStream message; it carries the message's key parameters.
func (AckIndication) String ¶
func (m AckIndication) String() string
String returns the string representation of an AckIndication.
type AckSeqNum ¶
type AckSeqNum struct {
// Stream is the JetStream message sequence number for this stream.
// The `gte=0` constraint always holds because the field is unsigned.
// NOTE(review): if `required` rejects the zero value (as it does in
// go-playground/validator), a sequence number of 0 fails validation —
// confirm that is intended.
Stream uint64 `json:"stream" validate:"required,gte=0"`
// Consumer is the JetStream message sequence number for this consumer.
// The same validation remarks as for Stream apply here.
Consumer uint64 `json:"consumer" validate:"required,gte=0"`
}
AckSeqNum holds the sequence numbers of a NATS JetStream message.
type AlertOnErrorCB ¶
// AlertOnErrorCB is a callback that receives a single error and returns nothing.
type AlertOnErrorCB func(err error)
AlertOnErrorCB is a callback used to expose an internal error to an outer context for handling.
type ForwardMessageHandlerCB ¶
ForwardMessageHandlerCB is a callback used to forward new messages to the next pipeline stage.
type JetStreamACKBroadcaster ¶
type JetStreamACKBroadcaster interface {
// BroadcastACK broadcasts a JetStream message ACK.
//
// ctxt is the caller's context and ack is the ACK indication to broadcast.
// The returned error reports a failure to broadcast.
BroadcastACK(ctxt context.Context, ack AckIndication) error
}
JetStreamACKBroadcaster broadcasts JetStream message ACKs through NATS subjects.
func GetJetStreamACKBroadcaster ¶
func GetJetStreamACKBroadcaster( natsClient *core.NatsClient, instance string, ) (JetStreamACKBroadcaster, error)
GetJetStreamACKBroadcaster returns a new JetStreamACKBroadcaster.
type JetStreamACKReceiver ¶
type JetStreamACKReceiver interface {
// SubscribeForACKs starts receiving JetStream message ACKs.
//
// handler is the callback given each received ACK.
// NOTE(review): wg is presumably used to track goroutines started by the
// subscription so the caller can wait for them — confirm against the
// implementation.
SubscribeForACKs(wg *sync.WaitGroup, handler JetStreamAckHandler) error
}
JetStreamACKReceiver processes JetStream message ACKs being broadcast through NATS subjects.
type JetStreamAckHandler ¶
// JetStreamAckHandler is a callback that takes a context and the AckIndication
// to process; it returns nothing.
type JetStreamAckHandler func(context.Context, AckIndication)
JetStreamAckHandler is the function signature of the callback that processes a JetStream ACK.
type JetStreamInflightMsgProcessor ¶
type JetStreamInflightMsgProcessor interface {
// RecordInflightMessage records a new inflight JetStream message that is
// awaiting an ACK.
//
// NOTE(review): blocking presumably selects whether the call waits until the
// request has been accepted for processing — confirm against the
// implementation.
RecordInflightMessage(callCtxt context.Context, msg *nats.Msg, blocking bool) error
// HandlerMsgACK processes a new message ACK.
//
// NOTE(review): blocking presumably has the same meaning as in
// RecordInflightMessage — confirm against the implementation.
HandlerMsgACK(callCtxt context.Context, ack AckIndication, blocking bool) error
}
JetStreamInflightMsgProcessor processes inflight JetStream messages awaiting ACK
type JetStreamPublisher ¶
type JetStreamPublisher interface {
// Publish publishes a new message into JetStream on a subject.
//
// ctxt is the caller's context, subject is the subject to publish on, and
// msg is the raw message payload. The returned error reports a failure to
// publish.
Publish(ctxt context.Context, subject string, msg []byte) error
}
JetStreamPublisher publishes new messages into JetStream
func GetJetStreamPublisher ¶
func GetJetStreamPublisher( natsClient *core.NatsClient, instance string, ) (JetStreamPublisher, error)
GetJetStreamPublisher returns a new JetStreamPublisher.
type JetStreamPushSubscriber ¶
type JetStreamPushSubscriber interface {
// StartReading begins reading data from JetStream.
StartReading(
// forwardCB is the callback used to forward messages to the next
// pipeline stage.
forwardCB ForwardMessageHandlerCB,
// errorCB is the callback used to expose internal errors to the caller.
errorCB AlertOnErrorCB,
// NOTE(review): wg is presumably used to track the reader goroutine(s) so
// the caller can wait for them — confirm against the implementation.
wg *sync.WaitGroup,
) error
}
JetStreamPushSubscriber reads directly from JetStream using a push consumer.
type MessageDispatcher ¶
type MessageDispatcher interface {
// Start starts operations.
//
// msgOutput is the callback used to forward messages to the next pipeline
// stage, and errorCB is the callback used to expose internal errors to the
// caller. The returned error reports a failure to start.
Start(msgOutput ForwardMessageHandlerCB, errorCB AlertOnErrorCB) error
}
MessageDispatcher processes a consumer subscription request from a client and dispatches messages to that client.
func GetPushMessageDispatcher ¶
func GetPushMessageDispatcher( ctxt context.Context, natsClient *core.NatsClient, stream, subject, consumer string, deliveryGroup *string, maxInflightMsgs int, wg *sync.WaitGroup, ) (MessageDispatcher, error)
GetPushMessageDispatcher returns a new push MessageDispatcher.
type MsgToDeliver ¶
type MsgToDeliver struct {
// Stream is the name of the stream.
// The validate tag marks it required and restricts it to an alphanumeric
// string or a UUID.
Stream string `json:"stream" validate:"required,alphanum|uuid"`
// Subject is the name of the subject / subject filter.
// The validate tag marks it required, with no further format restriction.
Subject string `json:"subject" validate:"required"`
// Consumer is the name of the consumer.
// The validate tag marks it required and restricts it to an alphanumeric
// string or a UUID.
Consumer string `json:"consumer" validate:"required,alphanum|uuid"`
// Sequence holds the stream and consumer sequence numbers for this
// JetStream message (see MsgToDeliverSeq).
// NOTE(review): `dive` is normally used to validate the elements of a slice
// or map; confirm it has the intended effect on a struct-typed field.
Sequence MsgToDeliverSeq `json:"sequence" validate:"required,dive"`
// Message is the message body.
// It is serialized under the JSON key "b64_msg"; when marshaled with
// encoding/json a []byte is encoded as a base64 string, which matches the
// swaggertype/format annotations on this field.
Message []byte `json:"b64_msg" validate:"required" swaggertype:"string" format:"base64" example:"SGVsbG8gV29ybGQK"`
}
MsgToDeliver is a structure representing a message to send out to a subscribing client.
func ConvertJSMessageDeliver ¶
func ConvertJSMessageDeliver(subject string, msg *nats.Msg) (MsgToDeliver, error)
ConvertJSMessageDeliver converts a JetStream message into a MsgToDeliver for delivery.
func (MsgToDeliver) String ¶
func (m MsgToDeliver) String() string
String returns the string representation of a MsgToDeliver.
type MsgToDeliverSeq ¶
type MsgToDeliverSeq struct {
// Stream is the message sequence number within the stream.
// The `gte=0` constraint always holds because the field is unsigned.
// NOTE(review): if `required` rejects the zero value (as it does in
// go-playground/validator), a sequence number of 0 fails validation —
// confirm that is intended.
Stream uint64 `json:"stream" validate:"required,gte=0"`
// Consumer is the message sequence number for this consumer.
// The same validation remarks as for Stream apply here.
Consumer uint64 `json:"consumer" validate:"required,gte=0"`
}
MsgToDeliverSeq holds the sequence numbers for a JetStream message.