Documentation
¶
Index ¶
- Variables
- func WithCodec(c options.Codec) options.Option
- func WithContext(ctx context.Context) options.Option
- func WithKnownIdentities(ids ...string) options.Option
- func WithKnownPublicKey(keys ...ed25519.PublicKey) options.Option
- func WithLogger(logger *slog.Logger) options.Option
- func WithNKeySeed(contents string) options.Option
- func WithName(name string) options.Option
- func WithNats(nc options.NatsConn) options.Option
- func WithParam(key string, val any) options.Option
- func WithPemPrivateKey(keyFile string) options.Option
- func WithPrefix(prefix string) options.Option
- func WithPrivateKey(pkey crypto.PrivateKey) options.Option
- func WithPubNats(nc options.NatsConn) options.Option
- func WithPublishQueueSize(n int) options.Option
- func WithReqNats(nc options.NatsConn) options.Option
- func WithStreamName(name string) options.Option
- func WithSubNats(nc options.NatsConn) options.Option
- func WithTelemetryPeriod(p time.Duration) options.Option
- func WithUserCreds(path string) options.Option
- func WithVerbose(v bool) options.Option
- type JetStreamer
- type Message
- type MessageHandler
- type MockJetStreamer
- type MockJetStreamer_Expecter
- type MockJetStreamer_JetStream_Call
- func (_c *MockJetStreamer_JetStream_Call) Return(_a0 nats.JetStreamContext, _a1 error) *MockJetStreamer_JetStream_Call
- func (_c *MockJetStreamer_JetStream_Call) Run(run func(opts ...nats.JSOpt)) *MockJetStreamer_JetStream_Call
- func (_c *MockJetStreamer_JetStream_Call) RunAndReturn(run func(...nats.JSOpt) (nats.JetStreamContext, error)) *MockJetStreamer_JetStream_Call
- type MockMessage
- func (_m *MockMessage) Ack(opts ...nats.AckOpt) error
- func (_m *MockMessage) AckSync(opts ...nats.AckOpt) error
- func (_m *MockMessage) Data() []byte
- func (_m *MockMessage) EXPECT() *MockMessage_Expecter
- func (_m *MockMessage) Equal(msg Message) bool
- func (_m *MockMessage) Header() nats.Header
- func (_m *MockMessage) InProgress(opts ...nats.AckOpt) error
- func (_m *MockMessage) Message() *nats.Msg
- func (_m *MockMessage) Metadata() (*nats.MsgMetadata, error)
- func (_m *MockMessage) Nak(opts ...nats.AckOpt) error
- func (_m *MockMessage) NakWithDelay(delay time.Duration, opts ...nats.AckOpt) error
- func (_m *MockMessage) QueueName() string
- func (_m *MockMessage) Reply() string
- func (_m *MockMessage) Respond(_a0 protoreflect.ProtoMessage) error
- func (_m *MockMessage) Subject() string
- func (_m *MockMessage) Term(opts ...nats.AckOpt) error
- type MockMessage_AckSync_Call
- type MockMessage_Ack_Call
- type MockMessage_Data_Call
- type MockMessage_Equal_Call
- type MockMessage_Expecter
- func (_e *MockMessage_Expecter) Ack(opts ...interface{}) *MockMessage_Ack_Call
- func (_e *MockMessage_Expecter) AckSync(opts ...interface{}) *MockMessage_AckSync_Call
- func (_e *MockMessage_Expecter) Data() *MockMessage_Data_Call
- func (_e *MockMessage_Expecter) Equal(msg interface{}) *MockMessage_Equal_Call
- func (_e *MockMessage_Expecter) Header() *MockMessage_Header_Call
- func (_e *MockMessage_Expecter) InProgress(opts ...interface{}) *MockMessage_InProgress_Call
- func (_e *MockMessage_Expecter) Message() *MockMessage_Message_Call
- func (_e *MockMessage_Expecter) Metadata() *MockMessage_Metadata_Call
- func (_e *MockMessage_Expecter) Nak(opts ...interface{}) *MockMessage_Nak_Call
- func (_e *MockMessage_Expecter) NakWithDelay(delay interface{}, opts ...interface{}) *MockMessage_NakWithDelay_Call
- func (_e *MockMessage_Expecter) QueueName() *MockMessage_QueueName_Call
- func (_e *MockMessage_Expecter) Reply() *MockMessage_Reply_Call
- func (_e *MockMessage_Expecter) Respond(_a0 interface{}) *MockMessage_Respond_Call
- func (_e *MockMessage_Expecter) Subject() *MockMessage_Subject_Call
- func (_e *MockMessage_Expecter) Term(opts ...interface{}) *MockMessage_Term_Call
- type MockMessage_Header_Call
- type MockMessage_InProgress_Call
- func (_c *MockMessage_InProgress_Call) Return(_a0 error) *MockMessage_InProgress_Call
- func (_c *MockMessage_InProgress_Call) Run(run func(opts ...nats.AckOpt)) *MockMessage_InProgress_Call
- func (_c *MockMessage_InProgress_Call) RunAndReturn(run func(...nats.AckOpt) error) *MockMessage_InProgress_Call
- type MockMessage_Message_Call
- type MockMessage_Metadata_Call
- func (_c *MockMessage_Metadata_Call) Return(_a0 *nats.MsgMetadata, _a1 error) *MockMessage_Metadata_Call
- func (_c *MockMessage_Metadata_Call) Run(run func()) *MockMessage_Metadata_Call
- func (_c *MockMessage_Metadata_Call) RunAndReturn(run func() (*nats.MsgMetadata, error)) *MockMessage_Metadata_Call
- type MockMessage_NakWithDelay_Call
- func (_c *MockMessage_NakWithDelay_Call) Return(_a0 error) *MockMessage_NakWithDelay_Call
- func (_c *MockMessage_NakWithDelay_Call) Run(run func(delay time.Duration, opts ...nats.AckOpt)) *MockMessage_NakWithDelay_Call
- func (_c *MockMessage_NakWithDelay_Call) RunAndReturn(run func(time.Duration, ...nats.AckOpt) error) *MockMessage_NakWithDelay_Call
- type MockMessage_Nak_Call
- type MockMessage_QueueName_Call
- type MockMessage_Reply_Call
- type MockMessage_Respond_Call
- func (_c *MockMessage_Respond_Call) Return(_a0 error) *MockMessage_Respond_Call
- func (_c *MockMessage_Respond_Call) Run(run func(_a0 protoreflect.ProtoMessage)) *MockMessage_Respond_Call
- func (_c *MockMessage_Respond_Call) RunAndReturn(run func(protoreflect.ProtoMessage) error) *MockMessage_Respond_Call
- type MockMessage_Subject_Call
- type MockMessage_Term_Call
- type Service
- func (b *Service) AddStatusCallback(callback StatusFunc)
- func (b *Service) AddStream(maxMsgs, maxBytes uint64, age time.Duration, subjects ...string) error
- func (b *Service) Close() error
- func (b *Service) ConcatenateStatus(path string, status, items map[string]string) map[string]string
- func (b *Service) Configure(opts ...options.Option) error
- func (b *Service) Fail(err error)
- func (b *Service) Publish(msg proto.Message, suffixes ...string) error
- func (b *Service) PublishBuf(buf []byte, suffixes ...string) error
- func (b *Service) PublishBufTo(buf []byte, tokens ...string) error
- func (b *Service) PublishBufToRpc(buf []byte, replyTo string, tokens ...string) error
- func (b *Service) PublishTo(msg proto.Message, tokens ...string) error
- func (b *Service) PublishToRpc(msg proto.Message, replyTo string, tokens ...string) error
- func (b *Service) RemoveStatusCallback(callback StatusFunc)
- func (b *Service) RemoveStream(subjects ...string) error
- func (b *Service) RequestBufFrom(ctx context.Context, buf []byte, tokens ...string) (Message, error)
- func (b *Service) RequestFrom(ctx context.Context, msg proto.Message, resp proto.Message, tokens ...string) (Message, error)
- func (b *Service) Respond(nmsg Message, msg proto.Message) error
- func (b *Service) RespondBuf(msg Message, buf []byte) error
- func (b *Service) RpcInbox(suffixes ...string) string
- func (b *Service) Serve(handler ServiceHandler, suffixes ...string) (*nats.Subscription, error)
- func (b *Service) Sign(msg []byte) (signature []byte, publicKey []byte, err error)
- func (b *Service) Start() context.Context
- func (b *Service) Subscribe(handler MessageHandler, suffixes ...string) (*nats.Subscription, error)
- func (b *Service) SubscribeTo(handler MessageHandler, tokens ...string) (*nats.Subscription, error)
- func (b *Service) Unmarshal(nmsg Message, msg proto.Message) (nats.Header, error)
- func (b *Service) Verify(nmsg Message) error
- type ServiceHandler
- type StatusFunc
- type Subject
- type SubjectMap
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidSignature = errors.New("invalid signature") ErrInvalidIdentity = errors.New("invalid identity") ErrUnknownIdentity = errors.New("unknown identity") ErrPubConnection = errors.New("publishing NATS connection is nil") ErrSubConnection = errors.New("subscribing NATS connection is nil") ErrReqConnection = errors.New("request NATS connection is nil") )
var ErrNotAvailable = fmt.Errorf("not available")
Functions ¶
func WithContext ¶
WithContext sets the context for the publisher.
func WithKnownIdentities ¶
WithKnownIdentities will add known identities as base58 encoded ed25519 public keys. Only Messages from publishers identified by these keys will be accepted.
func WithKnownPublicKey ¶
WithKnownPublicKey will add known public keys. Only Messages from publishers identified by these keys will be accepted.
func WithLogger ¶
WithLogger sets the logger for the publisher.
func WithNKeySeed ¶
WithNKeySeed will decode decorated NATS NKey and use it for identity.
func WithName ¶
WithName sets name of the publisher. The subject is in the form of {prefix}.{name}. Subscriptions and publishing will use the subject constructed from prefix and name.
func WithNats ¶
WithNats sets up preconfigured NATS connector for publishing, subscribing, and request/reply.
func WithPemPrivateKey ¶
WithPemPrivateKey will load ED25519 private key from a PEM file and use it for identity.
func WithPrefix ¶
WithPrefix sets the prefix for the subject. The subject is in the form of {prefix}.{name}. Subscriptions and publishing will use the subject constructed from prefix and name.
func WithPrivateKey ¶
func WithPrivateKey(pkey crypto.PrivateKey) options.Option
WithPrivateKey will configure identity using ED25519 crypto.PrivateKey.
func WithPubNats ¶
WithPubNats sets up preconfigured NATS connector specifically for publishing.
func WithPublishQueueSize ¶
WithPublishQueueSize will configure the size of the publish queue.
func WithReqNats ¶
WithReqNats sets up preconfigured NATS connector specifically for request/reply.
func WithStreamName ¶
WithStreamName sets name of the JetStream stream. If an empty string is used, then stream name will be "{prefix}-{name}".
func WithSubNats ¶
WithSubNats sets up preconfigured NATS connector specifically for subscribing.
NOTE: This will also set ReqNats for compatibility.
func WithTelemetryPeriod ¶
WithTelemetryPeriod will configure verbosity level.
func WithUserCreds ¶
WithUserCreds will load NKey from NATS User Credentials file and use it for identity.
func WithVerbose ¶
WithVerbose will configure verbosity level.
Types ¶
type JetStreamer ¶
type JetStreamer interface {
JetStream(opts ...nats.JSOpt) (nats.JetStreamContext, error)
}
type Message ¶
type Message interface { Ack(opts ...nats.AckOpt) error AckSync(opts ...nats.AckOpt) error Equal(msg Message) bool InProgress(opts ...nats.AckOpt) error Metadata() (*nats.MsgMetadata, error) Nak(opts ...nats.AckOpt) error NakWithDelay(delay time.Duration, opts ...nats.AckOpt) error Respond(proto.Message) error Term(opts ...nats.AckOpt) error Message() *nats.Msg Subject() string Reply() string Data() []byte Header() nats.Header QueueName() string }
type MessageHandler ¶
type MessageHandler func(msg Message)
type MockJetStreamer ¶ added in v0.4.3
MockJetStreamer is an autogenerated mock type for the JetStreamer type
func NewMockJetStreamer ¶ added in v0.4.3
func NewMockJetStreamer(t interface { mock.TestingT Cleanup(func()) }) *MockJetStreamer
NewMockJetStreamer creates a new instance of MockJetStreamer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.
func (*MockJetStreamer) EXPECT ¶ added in v0.4.3
func (_m *MockJetStreamer) EXPECT() *MockJetStreamer_Expecter
func (*MockJetStreamer) JetStream ¶ added in v0.4.3
func (_m *MockJetStreamer) JetStream(opts ...nats.JSOpt) (nats.JetStreamContext, error)
JetStream provides a mock function with given fields: opts
type MockJetStreamer_Expecter ¶ added in v0.4.3
type MockJetStreamer_Expecter struct {
// contains filtered or unexported fields
}
func (*MockJetStreamer_Expecter) JetStream ¶ added in v0.4.3
func (_e *MockJetStreamer_Expecter) JetStream(opts ...interface{}) *MockJetStreamer_JetStream_Call
JetStream is a helper method to define mock.On call
- opts ...nats.JSOpt
type MockJetStreamer_JetStream_Call ¶ added in v0.4.3
MockJetStreamer_JetStream_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'JetStream'
func (*MockJetStreamer_JetStream_Call) Return ¶ added in v0.4.3
func (_c *MockJetStreamer_JetStream_Call) Return(_a0 nats.JetStreamContext, _a1 error) *MockJetStreamer_JetStream_Call
func (*MockJetStreamer_JetStream_Call) Run ¶ added in v0.4.3
func (_c *MockJetStreamer_JetStream_Call) Run(run func(opts ...nats.JSOpt)) *MockJetStreamer_JetStream_Call
func (*MockJetStreamer_JetStream_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockJetStreamer_JetStream_Call) RunAndReturn(run func(...nats.JSOpt) (nats.JetStreamContext, error)) *MockJetStreamer_JetStream_Call
type MockMessage ¶ added in v0.4.3
MockMessage is an autogenerated mock type for the Message type
func NewMockMessage ¶ added in v0.4.3
func NewMockMessage(t interface { mock.TestingT Cleanup(func()) }) *MockMessage
NewMockMessage creates a new instance of MockMessage. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.
func (*MockMessage) Ack ¶ added in v0.4.3
func (_m *MockMessage) Ack(opts ...nats.AckOpt) error
Ack provides a mock function with given fields: opts
func (*MockMessage) AckSync ¶ added in v0.4.3
func (_m *MockMessage) AckSync(opts ...nats.AckOpt) error
AckSync provides a mock function with given fields: opts
func (*MockMessage) Data ¶ added in v0.4.3
func (_m *MockMessage) Data() []byte
Data provides a mock function with given fields:
func (*MockMessage) EXPECT ¶ added in v0.4.3
func (_m *MockMessage) EXPECT() *MockMessage_Expecter
func (*MockMessage) Equal ¶ added in v0.4.3
func (_m *MockMessage) Equal(msg Message) bool
Equal provides a mock function with given fields: msg
func (*MockMessage) Header ¶ added in v0.4.3
func (_m *MockMessage) Header() nats.Header
Header provides a mock function with given fields:
func (*MockMessage) InProgress ¶ added in v0.4.3
func (_m *MockMessage) InProgress(opts ...nats.AckOpt) error
InProgress provides a mock function with given fields: opts
func (*MockMessage) Message ¶ added in v0.4.3
func (_m *MockMessage) Message() *nats.Msg
Message provides a mock function with given fields:
func (*MockMessage) Metadata ¶ added in v0.4.3
func (_m *MockMessage) Metadata() (*nats.MsgMetadata, error)
Metadata provides a mock function with given fields:
func (*MockMessage) Nak ¶ added in v0.4.3
func (_m *MockMessage) Nak(opts ...nats.AckOpt) error
Nak provides a mock function with given fields: opts
func (*MockMessage) NakWithDelay ¶ added in v0.4.3
NakWithDelay provides a mock function with given fields: delay, opts
func (*MockMessage) QueueName ¶ added in v0.4.3
func (_m *MockMessage) QueueName() string
QueueName provides a mock function with given fields:
func (*MockMessage) Reply ¶ added in v0.4.3
func (_m *MockMessage) Reply() string
Reply provides a mock function with given fields:
func (*MockMessage) Respond ¶ added in v0.4.3
func (_m *MockMessage) Respond(_a0 protoreflect.ProtoMessage) error
Respond provides a mock function with given fields: _a0
func (*MockMessage) Subject ¶ added in v0.4.3
func (_m *MockMessage) Subject() string
Subject provides a mock function with given fields:
type MockMessage_AckSync_Call ¶ added in v0.4.3
MockMessage_AckSync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AckSync'
func (*MockMessage_AckSync_Call) Return ¶ added in v0.4.3
func (_c *MockMessage_AckSync_Call) Return(_a0 error) *MockMessage_AckSync_Call
func (*MockMessage_AckSync_Call) Run ¶ added in v0.4.3
func (_c *MockMessage_AckSync_Call) Run(run func(opts ...nats.AckOpt)) *MockMessage_AckSync_Call
func (*MockMessage_AckSync_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockMessage_AckSync_Call) RunAndReturn(run func(...nats.AckOpt) error) *MockMessage_AckSync_Call
type MockMessage_Ack_Call ¶ added in v0.4.3
MockMessage_Ack_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Ack'
func (*MockMessage_Ack_Call) Return ¶ added in v0.4.3
func (_c *MockMessage_Ack_Call) Return(_a0 error) *MockMessage_Ack_Call
func (*MockMessage_Ack_Call) Run ¶ added in v0.4.3
func (_c *MockMessage_Ack_Call) Run(run func(opts ...nats.AckOpt)) *MockMessage_Ack_Call
func (*MockMessage_Ack_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockMessage_Ack_Call) RunAndReturn(run func(...nats.AckOpt) error) *MockMessage_Ack_Call
type MockMessage_Data_Call ¶ added in v0.4.3
MockMessage_Data_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Data'
func (*MockMessage_Data_Call) Return ¶ added in v0.4.3
func (_c *MockMessage_Data_Call) Return(_a0 []byte) *MockMessage_Data_Call
func (*MockMessage_Data_Call) Run ¶ added in v0.4.3
func (_c *MockMessage_Data_Call) Run(run func()) *MockMessage_Data_Call
func (*MockMessage_Data_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockMessage_Data_Call) RunAndReturn(run func() []byte) *MockMessage_Data_Call
type MockMessage_Equal_Call ¶ added in v0.4.3
MockMessage_Equal_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Equal'
func (*MockMessage_Equal_Call) Return ¶ added in v0.4.3
func (_c *MockMessage_Equal_Call) Return(_a0 bool) *MockMessage_Equal_Call
func (*MockMessage_Equal_Call) Run ¶ added in v0.4.3
func (_c *MockMessage_Equal_Call) Run(run func(msg Message)) *MockMessage_Equal_Call
func (*MockMessage_Equal_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockMessage_Equal_Call) RunAndReturn(run func(Message) bool) *MockMessage_Equal_Call
type MockMessage_Expecter ¶ added in v0.4.3
type MockMessage_Expecter struct {
// contains filtered or unexported fields
}
func (*MockMessage_Expecter) Ack ¶ added in v0.4.3
func (_e *MockMessage_Expecter) Ack(opts ...interface{}) *MockMessage_Ack_Call
Ack is a helper method to define mock.On call
- opts ...nats.AckOpt
func (*MockMessage_Expecter) AckSync ¶ added in v0.4.3
func (_e *MockMessage_Expecter) AckSync(opts ...interface{}) *MockMessage_AckSync_Call
AckSync is a helper method to define mock.On call
- opts ...nats.AckOpt
func (*MockMessage_Expecter) Data ¶ added in v0.4.3
func (_e *MockMessage_Expecter) Data() *MockMessage_Data_Call
Data is a helper method to define mock.On call
func (*MockMessage_Expecter) Equal ¶ added in v0.4.3
func (_e *MockMessage_Expecter) Equal(msg interface{}) *MockMessage_Equal_Call
Equal is a helper method to define mock.On call
- msg Message
func (*MockMessage_Expecter) Header ¶ added in v0.4.3
func (_e *MockMessage_Expecter) Header() *MockMessage_Header_Call
Header is a helper method to define mock.On call
func (*MockMessage_Expecter) InProgress ¶ added in v0.4.3
func (_e *MockMessage_Expecter) InProgress(opts ...interface{}) *MockMessage_InProgress_Call
InProgress is a helper method to define mock.On call
- opts ...nats.AckOpt
func (*MockMessage_Expecter) Message ¶ added in v0.4.3
func (_e *MockMessage_Expecter) Message() *MockMessage_Message_Call
Message is a helper method to define mock.On call
func (*MockMessage_Expecter) Metadata ¶ added in v0.4.3
func (_e *MockMessage_Expecter) Metadata() *MockMessage_Metadata_Call
Metadata is a helper method to define mock.On call
func (*MockMessage_Expecter) Nak ¶ added in v0.4.3
func (_e *MockMessage_Expecter) Nak(opts ...interface{}) *MockMessage_Nak_Call
Nak is a helper method to define mock.On call
- opts ...nats.AckOpt
func (*MockMessage_Expecter) NakWithDelay ¶ added in v0.4.3
func (_e *MockMessage_Expecter) NakWithDelay(delay interface{}, opts ...interface{}) *MockMessage_NakWithDelay_Call
NakWithDelay is a helper method to define mock.On call
- delay time.Duration
- opts ...nats.AckOpt
func (*MockMessage_Expecter) QueueName ¶ added in v0.4.3
func (_e *MockMessage_Expecter) QueueName() *MockMessage_QueueName_Call
QueueName is a helper method to define mock.On call
func (*MockMessage_Expecter) Reply ¶ added in v0.4.3
func (_e *MockMessage_Expecter) Reply() *MockMessage_Reply_Call
Reply is a helper method to define mock.On call
func (*MockMessage_Expecter) Respond ¶ added in v0.4.3
func (_e *MockMessage_Expecter) Respond(_a0 interface{}) *MockMessage_Respond_Call
Respond is a helper method to define mock.On call
- _a0 protoreflect.ProtoMessage
func (*MockMessage_Expecter) Subject ¶ added in v0.4.3
func (_e *MockMessage_Expecter) Subject() *MockMessage_Subject_Call
Subject is a helper method to define mock.On call
func (*MockMessage_Expecter) Term ¶ added in v0.4.3
func (_e *MockMessage_Expecter) Term(opts ...interface{}) *MockMessage_Term_Call
Term is a helper method to define mock.On call
- opts ...nats.AckOpt
type MockMessage_Header_Call ¶ added in v0.4.3
MockMessage_Header_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Header'
func (*MockMessage_Header_Call) Return ¶ added in v0.4.3
func (_c *MockMessage_Header_Call) Return(_a0 nats.Header) *MockMessage_Header_Call
func (*MockMessage_Header_Call) Run ¶ added in v0.4.3
func (_c *MockMessage_Header_Call) Run(run func()) *MockMessage_Header_Call
func (*MockMessage_Header_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockMessage_Header_Call) RunAndReturn(run func() nats.Header) *MockMessage_Header_Call
type MockMessage_InProgress_Call ¶ added in v0.4.3
MockMessage_InProgress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InProgress'
func (*MockMessage_InProgress_Call) Return ¶ added in v0.4.3
func (_c *MockMessage_InProgress_Call) Return(_a0 error) *MockMessage_InProgress_Call
func (*MockMessage_InProgress_Call) Run ¶ added in v0.4.3
func (_c *MockMessage_InProgress_Call) Run(run func(opts ...nats.AckOpt)) *MockMessage_InProgress_Call
func (*MockMessage_InProgress_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockMessage_InProgress_Call) RunAndReturn(run func(...nats.AckOpt) error) *MockMessage_InProgress_Call
type MockMessage_Message_Call ¶ added in v0.4.3
MockMessage_Message_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Message'
func (*MockMessage_Message_Call) Return ¶ added in v0.4.3
func (_c *MockMessage_Message_Call) Return(_a0 *nats.Msg) *MockMessage_Message_Call
func (*MockMessage_Message_Call) Run ¶ added in v0.4.3
func (_c *MockMessage_Message_Call) Run(run func()) *MockMessage_Message_Call
func (*MockMessage_Message_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockMessage_Message_Call) RunAndReturn(run func() *nats.Msg) *MockMessage_Message_Call
type MockMessage_Metadata_Call ¶ added in v0.4.3
MockMessage_Metadata_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Metadata'
func (*MockMessage_Metadata_Call) Return ¶ added in v0.4.3
func (_c *MockMessage_Metadata_Call) Return(_a0 *nats.MsgMetadata, _a1 error) *MockMessage_Metadata_Call
func (*MockMessage_Metadata_Call) Run ¶ added in v0.4.3
func (_c *MockMessage_Metadata_Call) Run(run func()) *MockMessage_Metadata_Call
func (*MockMessage_Metadata_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockMessage_Metadata_Call) RunAndReturn(run func() (*nats.MsgMetadata, error)) *MockMessage_Metadata_Call
type MockMessage_NakWithDelay_Call ¶ added in v0.4.3
MockMessage_NakWithDelay_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NakWithDelay'
func (*MockMessage_NakWithDelay_Call) Return ¶ added in v0.4.3
func (_c *MockMessage_NakWithDelay_Call) Return(_a0 error) *MockMessage_NakWithDelay_Call
func (*MockMessage_NakWithDelay_Call) Run ¶ added in v0.4.3
func (_c *MockMessage_NakWithDelay_Call) Run(run func(delay time.Duration, opts ...nats.AckOpt)) *MockMessage_NakWithDelay_Call
func (*MockMessage_NakWithDelay_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockMessage_NakWithDelay_Call) RunAndReturn(run func(time.Duration, ...nats.AckOpt) error) *MockMessage_NakWithDelay_Call
type MockMessage_Nak_Call ¶ added in v0.4.3
MockMessage_Nak_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Nak'
func (*MockMessage_Nak_Call) Return ¶ added in v0.4.3
func (_c *MockMessage_Nak_Call) Return(_a0 error) *MockMessage_Nak_Call
func (*MockMessage_Nak_Call) Run ¶ added in v0.4.3
func (_c *MockMessage_Nak_Call) Run(run func(opts ...nats.AckOpt)) *MockMessage_Nak_Call
func (*MockMessage_Nak_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockMessage_Nak_Call) RunAndReturn(run func(...nats.AckOpt) error) *MockMessage_Nak_Call
type MockMessage_QueueName_Call ¶ added in v0.4.3
MockMessage_QueueName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueueName'
func (*MockMessage_QueueName_Call) Return ¶ added in v0.4.3
func (_c *MockMessage_QueueName_Call) Return(_a0 string) *MockMessage_QueueName_Call
func (*MockMessage_QueueName_Call) Run ¶ added in v0.4.3
func (_c *MockMessage_QueueName_Call) Run(run func()) *MockMessage_QueueName_Call
func (*MockMessage_QueueName_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockMessage_QueueName_Call) RunAndReturn(run func() string) *MockMessage_QueueName_Call
type MockMessage_Reply_Call ¶ added in v0.4.3
MockMessage_Reply_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Reply'
func (*MockMessage_Reply_Call) Return ¶ added in v0.4.3
func (_c *MockMessage_Reply_Call) Return(_a0 string) *MockMessage_Reply_Call
func (*MockMessage_Reply_Call) Run ¶ added in v0.4.3
func (_c *MockMessage_Reply_Call) Run(run func()) *MockMessage_Reply_Call
func (*MockMessage_Reply_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockMessage_Reply_Call) RunAndReturn(run func() string) *MockMessage_Reply_Call
type MockMessage_Respond_Call ¶ added in v0.4.3
MockMessage_Respond_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Respond'
func (*MockMessage_Respond_Call) Return ¶ added in v0.4.3
func (_c *MockMessage_Respond_Call) Return(_a0 error) *MockMessage_Respond_Call
func (*MockMessage_Respond_Call) Run ¶ added in v0.4.3
func (_c *MockMessage_Respond_Call) Run(run func(_a0 protoreflect.ProtoMessage)) *MockMessage_Respond_Call
func (*MockMessage_Respond_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockMessage_Respond_Call) RunAndReturn(run func(protoreflect.ProtoMessage) error) *MockMessage_Respond_Call
type MockMessage_Subject_Call ¶ added in v0.4.3
MockMessage_Subject_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Subject'
func (*MockMessage_Subject_Call) Return ¶ added in v0.4.3
func (_c *MockMessage_Subject_Call) Return(_a0 string) *MockMessage_Subject_Call
func (*MockMessage_Subject_Call) Run ¶ added in v0.4.3
func (_c *MockMessage_Subject_Call) Run(run func()) *MockMessage_Subject_Call
func (*MockMessage_Subject_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockMessage_Subject_Call) RunAndReturn(run func() string) *MockMessage_Subject_Call
type MockMessage_Term_Call ¶ added in v0.4.3
MockMessage_Term_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Term'
func (*MockMessage_Term_Call) Return ¶ added in v0.4.3
func (_c *MockMessage_Term_Call) Return(_a0 error) *MockMessage_Term_Call
func (*MockMessage_Term_Call) Run ¶ added in v0.4.3
func (_c *MockMessage_Term_Call) Run(run func(opts ...nats.AckOpt)) *MockMessage_Term_Call
func (*MockMessage_Term_Call) RunAndReturn ¶ added in v0.4.3
func (_c *MockMessage_Term_Call) RunAndReturn(run func(...nats.AckOpt) error) *MockMessage_Term_Call
type Service ¶
Service is the base publisher structure. It must be embedded in the publisher to benefit from the common implementation. Config method must be called on this embedded struct in order to properly set it up.
func (*Service) AddStatusCallback ¶
func (b *Service) AddStatusCallback(callback StatusFunc)
AddStatusCallback will register a status callback.
func (*Service) AddStream ¶
AddStream is an experimental feature that creates a durable stream. It is possible to subscribe to this durable stream using regular Subscribe or SubscribeTo methods given that the subject is included in the created stream.
The interface for this feature is experimental and it should be expected to change.
This method will create a stream with maxMsgs, maxBytes, and age for a list of subjects on JetStream if it does not exist. This is a temporary solution so that the stream doesn't have to be created manually. However, this will change in the near future, therefore users will have to make sure that the stream exists before calling this method(maxMsgs, maxBytes, and age parameters will be removed).
NOTE: Messages are automatically acknowledged after handler returns.
func (*Service) ConcatenateStatus ¶
Concatenate status copies (k,v) from items to status optionally prefixing the key with path.
func (*Service) Fail ¶
Fail is a convenience function that allows to asynchronously propagate errors.
func (*Service) Publish ¶
Publish will sign the message and publish it to a subject constructed from "{prefix}.{name}.{suffixes}". Publish will use PubNats connection.
func (*Service) PublishBuf ¶
PublishBuf is the same as Publish, but will publish the raw bytes.
func (*Service) PublishBufTo ¶
PublishBufTo is the same as PublishTo, but for raw bytes.
func (*Service) PublishBufToRpc ¶ added in v0.4.3
PublishBufToRpc is the same as PublishBufTo, but uses ReqNats.
func (*Service) PublishTo ¶
PublishTo will sign the message and publish it to a specific subject constructed from subject tokens. PublishTo will use PubNats connection.
func (*Service) PublishToRpc ¶ added in v0.4.3
PublishToRpc will sign the message and publish it to a specific subject constructed from subject tokens. PublishToRpc will use ReqNats connection.
func (*Service) RemoveStatusCallback ¶
func (b *Service) RemoveStatusCallback(callback StatusFunc)
RemoveStatusCallback will remove a status callback.
func (*Service) RemoveStream ¶
RemoveStream will attempt to remove consumers based on a list of subjects. List of subjects must be exactly the same as was used in AddStream since js Consumer names are based on the subjects.
func (*Service) RequestBufFrom ¶
func (b *Service) RequestBufFrom(ctx context.Context, buf []byte, tokens ...string) (Message, error)
RequestBufFrom requests a reply from a subject using ReqNats connection. This a synchronous operation that does not involve publisher queue.
func (*Service) RequestFrom ¶
func (b *Service) RequestFrom(ctx context.Context, msg proto.Message, resp proto.Message, tokens ...string) (Message, error)
RequestFrom requests a reply from a subject using ReqNats connection. The subject will be constructed from tokens. This a synchronous operation that does not involve publisher queue.
func (*Service) Respond ¶
Respond will respond to a message sent as a request. This is a helper function and Message.Respond should be used instead.
func (*Service) RespondBuf ¶
RespondBuf is the same as Respond, but will respond with raw bytes.
func (*Service) Serve ¶
func (b *Service) Serve(handler ServiceHandler, suffixes ...string) (*nats.Subscription, error)
Serve is a convenience method to serve a service subject. It acts the same as Subscribe, but takes `ServiceHandler` instead, and will respond either with Error type or response from the handler. Serve will use ReqNats connection.
func (*Service) Start ¶
Start will start the publisher base implementation that includes telemetry and other internal goroutines.
func (*Service) Subscribe ¶
func (b *Service) Subscribe(handler MessageHandler, suffixes ...string) (*nats.Subscription, error)
Subscribe will subscribe to a subject constructed from {prefix}.{name}.{...suffixes}, where suffixes are joined using ".". Subscribe will use SubNats connection.
func (*Service) SubscribeTo ¶
func (b *Service) SubscribeTo(handler MessageHandler, tokens ...string) (*nats.Subscription, error)
SubscribeTo will subscribe to a subject constructed as {...tokens}, where tokens are joined using ".".
Experimental: When a stream was registered with AddStream SubscribeTo will use durable stream instead of realtime.
type StatusFunc ¶
StatusFunc is a type for callback func. This will be called periodically to construct telemetry status. "uptime", "goroutines", "period" keys will be overriden internally.
type Subject ¶
type Subject string
Subject represents a NATS subject which can include wildcards.
func NewSubject ¶ added in v0.4.3
func (Subject) Match ¶
Match tries to pattern-match the subject against another subject. It considers NATS wildcard rules where '*' matches any token at a level, and '>' matches all subsequent tokens.
func (Subject) RemainingTokens ¶
RemainingTokens matches subject tokens until the end of subject tokens and returns the remaining tokens.
func (Subject) SymmetricMatch ¶
SymmetricMatch tries to pattern-match a subject against another subject in both directions. It considers a match if either subject matches the other according to NATS wildcard rules.
type SubjectMap ¶
SubjectMap maps subjects (as strings) to indices to an array that holds stream and consumer configs.
func (SubjectMap) Add ¶
func (m SubjectMap) Add(subject Subject, idx int) error
Add inserts a subject and its associated index into the map. It returns an error if the subject is invalid, but in the current implementation, it always succeeds.
func (SubjectMap) Get ¶
func (m SubjectMap) Get(subject Subject) (int, bool)
Get returns the index stored
func (SubjectMap) Search ¶
func (m SubjectMap) Search(subject Subject) (Subject, int, bool)
Search looks for a subject in the map that matches the given subject according to NATS pattern matching rules. Returns the matching subject, its associated index, and true if a match is found. If no match is found, it returns empty string, 0, and false.
func (SubjectMap) SymmetricSearch ¶
func (m SubjectMap) SymmetricSearch(subject Subject) (Subject, int, bool)
SymmetricSearch looks for a subject in the map that symmetrically matches the given subject. Symmetric matching means either the map's subject matches the given subject or vice versa. Returns the matching subject, its associated index, and true if a match is found. If no match is found, it returns empty string, 0, and false.